We want to implement linearizability. What that means is, each read and write can be assigned a point in wall-clock time such that, based on those time-points, each read sees the result of its last preceding write. Also, those time-points have to be contained in the time interval after the read or write operation began and before it ended.

Each “read” and “write” only involves modifying the data related to a single (<uid> <predicate>) pair. Notably, that unit of information is contained within a single Raft group. There are no multi-reads or multi-writes. Linearizability is a property maintained separately on each (<uid> <predicate>) pair – if for each (<uid> <predicate>) pair is treated independently with linearizable semantics, then we have linearizable semantics in general.

Let’s look at things from a read’s perspective: There exists a sequence of writes, w_1, …, w_n, at times s_1, …, s_n. Our read begins at time t_b and ends at time t_e. Given i, j such that s_i <= t_b < s_{i+1} < s_{i+2} < … < s_{j-1} < s_j < t_b <= s_{j+1}, the set of “valid writes” for that read is {w_k : i <= k <= j}. The read may return the value of any valid write.

Implementation I

Suppose we’re just looking at one <uid> <predicate> pair.

When a write happens, it happens as part of a particular Raft commit. Its log index describes the point of logical time that it happened. We can assign each log index some point in wall-clock time too. Also, because we’re just looking at one <uid> <predicate>, let’s suppose, for conversational purposes, every log index has exactly one write. (Some might be no-op writes.) A “valid commit [index]” is that whose write is a valid write.

Suppose we perform a read that starts at time t_0. We can ask every node what its latest committed log index is. At some later time t_1, we’ve gotten a few responses. Every response is one whose write time happened before t_1. Any committed index greater than or equal to that received from a majority of nodes’must also have been such as of t_0, which means it’s a valid log index. Let’s call the smallest such index J.

For example, if there are 7 nodes, and we’ve received 4 responses {11, 12, 13, 15}, then J = 15. If we received a 5th response, making our set {11, 12, 13, 15, 14}, then J is now 14.

We can read from any node that has committed index J or greater, and we’ll receive a valid write. (There’s no maximum – if our read returns data from a committed index, the write was valid.) We might say the read procedure is as follows:

-1. Let n be the number of nodes. A majority constitutes m = ceil((n+1)/2) nodes.

-2. Ask every node what its latest committed log index is.

-3. After m or more have responded, let J be the maximum of some subset of responses of size m.

-4. Read a value from log index J or later.

However, there’s a fatal flaw in our reasoning – there might have been a cluster reconfiguration. We don’t know how many nodes it really takes to reach a majority, as of log index J, or later. So instead, we have the following algorithm:

-1. Ask every node (that we know of) what its latest committed log index is, and the full membership list of the Raft group as of that log index.

-2. Wait until we get the following criterion. There exists a response (index: i, members: [node_1, ..., node_k]) such that i is the maximum log index of the responses of some majority of [node_1, ..., node_k]. Let J be the log index of such a response.

-3. Read a value from log index J or later, any that has been committed.

Implementation II

We might look at Implementation I and think it’s rather slow. We have two round trips – one to gather log indexes, another to perform the read. We can speed it up as follows.

-1. Let L be the node which we think is the leader.

-2-a. Same as before. Ask every node (that we know of) what its latest committed log index is, and the full membership list of the Raft group as of that log index.

-2-b. Also ask node L to perform the read.

-3. Wait until the responses from 2-a meet the forementioned criterion.

-4-a. If L’s read returns with a sufficiently high log index, we’ve successfully read the value.

-4-b. If L’s read returns with too low a log index, it must no longer be the leader. Perform a backup read (using a log index J) from a different node.

-4-c. If enough time has passed without a response from L, perform a backup read (using a log index J) from a different node.

Now in the typical case, we perform reads in one round trip – but still one that needs a majority of nodes to respond.

Nested graph queries

We’re doing a bit more than accessing one <uid> <predicate> pair. We’ve got multiple predicates, nested queries, funcs, filters, and the like.

We have two decent ways (without going full-bore transactions) of approaching such a query:

-1. Declare the query starts at t_0, and say, “We’re happy to use any writes valid as of time t_0 or later.”

-2. Apply an ordering relation – when we dip into a nested query, we only allow writes which are greater than or equal to the write time of the graph edge we followed.

Option 2 seems like it might be useful for some purposes – but let’s go with option 1. A problem with option 2 is that because predicates are in different groups, we’d need some cross-group ordering relation.

The consequence of option 1 is that we only need to ask nodes what their latest log index is, and their membership list, once. We can then use that information to read from replicas whenever we have the opportunity. We can optimistically read from the leaders for the first network hop we make, then use replicas for subsequent hops.

So we do this:

-1. Request last committed log index and membership lists from each node of groups whose predicates are involved in our query. We do that up-front.

-2. Query optimistically or not, depending on whether we’ve gotten enough responses to query replicas non-optimistically yet.

A note about in-group optimistic reads

Suppose we’re a member of the group whose leader we optimistically read from.

In that case, the leader might as well respond, “read the data from yourself, from a log index not less than K” instead of sending us the data. It’s already in-flight. Once we’ve verified that K is a valid log index, we wait for our copy of the data to reach K (and become committed), and then read it.

A note about replica reads

When we ask nodes for their last committed log index and membership list, we could also ask for their un-committed log history (log index numbers and their election terms) and the election term of the last committed log index too. This increases the set of replicas it’s reasonable to do an up-to-date read from – some nodes already have the latest committed log index, they just haven’t yet heard that it’s committed.

One more thing

I’m not completely on top of how query evaluation operates. Based on a quick look, I believe the node processing the query sends network requests and gets responses. But there are no recursive network requests. Is that right?



If the group leader has a lease on being leader, we can just query the leader and know that it’s the leader. Then we don’t have to send messages to the other nodes. That saves network traffic and latency, because we’re just talking to one node. We know that it’s the leader because a new leader can’t be elected until a disjoint time interval.

To use this does require some system clock accuracy. (We don’t want the leader to mistakenly think it still has the lease because its clock is too slow.) This also might require some closer look at etcd. We should know this possibility is out there.

We don’t need to. Raft index is sufficient for our purposes. We already track a watermark which specifies until what raft index have we applied all the mutations for.

There’s a function in Raft library to do this:

That’s something dealt with by the Raft library; not something we need to be concerned about.

True. Though note that this request has already arrived at a server serving this raft group. What we intend to do here is to wait until this server catches up to J. Once it does (and our watermark would tell us about it), we can execute this particular portion of the query.

The rest of the implementation seems to overlap with Raft implementation itself. We shouldn’t need to reimplement Raft mechanics – we’re using etcd’s raft library for this purpose.

I think you’re seeing this Raft group from outside – the situation that we need to solve for is from within. The query has already landed on one of the servers which is serving this particular group. We either do this read locally, or we wait until our context times out. So, all the information that we need is to figure out if this particular Raft node has advanced beyond ReadIndex (which is calculated from the quorum).

Also, we shouldn’t execute all reads from the leader. That would hinder our throughput design.

We don’t have a global view of time in a real world cluster. Each server might be slightly ahead or behind another. We can only do this in the context of each Raft group involved.

Overall, I think a better understanding of the available and well integrated Raft library in use is necessary to implement this feature. It requires lot fewer changes than what the proposal entails; because you already have many pieces in place which can give you what you need to fit this in.

My references to wall-clock time are for the purpose of describing how we meet the abstract requirements of linearizability, not something which is used in the implementation itself.

I am indeed assuming that we want to read from the group, with a query from outside the group. The reason is, queries can generally start from outside the group. If we get linearizability by using ReadIndex from inside the group, we have to make an extra network round-trip – first from outside the group to a group member, then inside.

Note that with our vendored etcd version, using ReadIndex requires CheckQuorum==true in the etcd config. ReadIndex uses leases, and given the way etcd uses an external ticker to keep time, it might not be a reliable way to operate. This is not true in a later etcd version.

Executing all reads from the leader would be perfectly fine if we created finer-grained raft groups. So, in the short run, while we’re sharding by predicate, we might want to avoid that. But in the long run, we might prefer fine-grained sharding, and to optimistically read from the leader. Something to keep in mind.

If we update our etcd version, we can get a “safe” non-lease-based ReadIndex implementation. See in particular, to see what we get. Then we have two options. The lease-based ReadIndex is the same as before – the follower sends a message to the leader, and the leader sends a response. For the “safe” ReadIndex, a message is sent to the leader, and the leader then broadcasts a message (a heartbeat) to all nodes (that it’s aware of). A majority respond, and then the leader responds to the follower.

So, when querying from inside the raft group, we have the options of 1 round trip with the unsafe lease-based ReadIndex, or 2 round trips with the safe ReadIndex implementation.

When querying from outside the raft group, we could use ReadIndex in the following manner:

-1. Send a message to the leader telling it to reply (1st time) with its lastIndex right away, but also to run ReadIndex (with the “safe” option), and then later reply (2nd time) confirming the value.
-2. Once we get the 1st reply, perform the read (optimistically) on whatever replica we want (telling it to wait for that watermark).
-3. Wait for the 2nd reply from the leader confirming that the read was valid.

This lets us use 2 round trips (typically) with the safe ReadIndex instead of 3. We could do the same from within the group, just to start the actual read operation faster.

Regardless, we could just say, hey, we’re going to accept 3 round-trips from outside the group, and 2 from within.

In any case, we will also need to change the implementation of Watermark so that we can wait for a watermark without using time.Sleep.

This topic was automatically closed 30 days after the last reply. New replies are no longer allowed.