Linearizability
We want to implement linearizability. What that means is that each read and write can be assigned a point in wall-clock time such that, based on those time-points, each read sees the result of its last preceding write. Also, each operation’s time-point has to fall within the interval between when that operation began and when it ended.
Each “read” and “write” only involves modifying the data related to a single (<uid> <predicate>) pair. Notably, that unit of information is contained within a single Raft group. There are no multi-reads or multi-writes. Linearizability is a property maintained separately on each (<uid> <predicate>) pair – if each (<uid> <predicate>) pair is treated independently with linearizable semantics, then we have linearizable semantics in general.
Let’s look at things from a read’s perspective: There exists a sequence of writes, w_1, …, w_n, at times s_1, …, s_n. Our read begins at time t_b and ends at time t_e. Given i, j such that s_i <= t_b < s_{i+1} < s_{i+2} < … < s_{j-1} < s_j < t_e <= s_{j+1}, the set of “valid writes” for that read is {w_k : i <= k <= j}. The read may return the value of any valid write.
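To make that concrete, here is a minimal sketch in Go (the function and its inputs are hypothetical, purely for illustration) that computes the range [i, j] of valid writes from the write times and the read interval:

```go
package main

import "fmt"

// validWrites returns the 1-based range [i, j] of writes a read
// spanning [tb, te] may legally return, given write times s[0..n-1]
// in increasing order: w_i is the last write at or before tb, and
// w_j is the last write strictly before te.
func validWrites(s []float64, tb, te float64) (i, j int) {
	for k, t := range s {
		if t <= tb {
			i = k + 1 // last write that happened before the read began
		}
		if t < te {
			j = k + 1 // last write that happened before the read ended
		}
	}
	return i, j
}

func main() {
	s := []float64{1, 3, 5, 9} // write times s_1..s_4
	i, j := validWrites(s, 4, 6)
	fmt.Printf("valid writes: w_%d .. w_%d\n", i, j) // w_2 .. w_3
}
```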
Implementation I
Suppose we’re just looking at one <uid> <predicate> pair.
When a write happens, it happens as part of a particular Raft commit. Its log index describes the point of logical time at which it happened. We can assign each log index some point in wall-clock time too. Also, because we’re just looking at one <uid> <predicate> pair, let’s suppose, for conversational purposes, that every log index has exactly one write. (Some might be no-op writes.) A “valid commit [index]” is one whose write is a valid write.
Suppose we perform a read that starts at time t_0. We can ask every node what its latest committed log index is. At some later time t_1, we’ve gotten a few responses. Every response reports an index whose write happened before t_1. Any committed index greater than or equal to the responses of a majority of nodes must also have been so as of t_0 (committed indexes only grow), which means it’s a valid log index. Let’s call the smallest such index J.
For example, if there are 7 nodes, and we’ve received 4 responses {11, 12, 13, 15}, then J = 15. If we received a 5th response, making our set {11, 12, 13, 15, 14}, then J is now 14.
We can read from any node that has committed index J or greater, and we’ll receive a valid write. (There’s no maximum – if our read returns data from a committed index, the write was valid.) We might say the read procedure is as follows:
-1. Let n be the number of nodes. A majority constitutes m = ceil((n+1)/2) nodes.
-2. Ask every node what its latest committed log index is.
-3. After m or more have responded, let J be the maximum of some subset of responses of size m.
-4. Read a value from log index J or later.
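As a sketch of steps 1–3 (assuming the fixed membership of n nodes we’ve been supposing), the smallest candidate J is just the m-th smallest response received so far:

```go
package main

import (
	"fmt"
	"sort"
)

// smallestJ returns the smallest index J that is the maximum of some
// size-m subset of the responses: i.e., the m-th smallest response.
// ok is false until at least m responses have arrived.
func smallestJ(responses []uint64, m int) (J uint64, ok bool) {
	if len(responses) < m {
		return 0, false
	}
	sorted := append([]uint64(nil), responses...)
	sort.Slice(sorted, func(a, b int) bool { return sorted[a] < sorted[b] })
	return sorted[m-1], true
}

func main() {
	n := 7
	m := (n + 2) / 2 // ceil((n+1)/2) = 4
	J, _ := smallestJ([]uint64{11, 12, 13, 15}, m)
	fmt.Println(J) // 15
	J, _ = smallestJ([]uint64{11, 12, 13, 15, 14}, m)
	fmt.Println(J) // 14
}
```

Any larger subset’s maximum would also do; the m-th smallest is simply the smallest J the rule allows, which maximizes the set of nodes we can read from in step 4.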
However, there’s a fatal flaw in our reasoning – there might have been a cluster reconfiguration. We don’t know how many nodes it really takes to reach a majority, as of log index J, or later. So instead, we have the following algorithm:
-1. Ask every node (that we know of) what its latest committed log index is, and the full membership list of the Raft group as of that log index.
-2. Wait until the following criterion is met: there exists a response (index: i, members: [node_1, ..., node_k]) such that i is the maximum log index of the responses of some majority of [node_1, ..., node_k]. Let J be the log index of such a response.
-3. Read a value from log index J or later, any that has been committed.
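Here is a sketch of that criterion check in Go, with hypothetical response types – we look for a response whose index is corroborated, as a maximum, by a majority of its own reported member list:

```go
package reads

// Response is one node's answer: its latest committed log index and
// the group membership as of that index. (Hypothetical types.)
type Response struct {
	Index   uint64
	Members []string
}

// criterionMet looks for a response, from some node `from`, whose
// index is the maximum over the responses of a majority of that
// response's own member list, and returns that index as J.
func criterionMet(responses map[string]Response) (J uint64, ok bool) {
	for from, r := range responses {
		members := make(map[string]bool, len(r.Members))
		for _, m := range r.Members {
			members[m] = true
		}
		if !members[from] {
			continue // responder not in its own member list
		}
		// Count members whose responses are <= r.Index. Any
		// majority-sized subset of them that includes `from` has
		// maximum log index exactly r.Index.
		count := 0
		for _, m := range r.Members {
			if mr, seen := responses[m]; seen && mr.Index <= r.Index {
				count++
			}
		}
		if count >= len(r.Members)/2+1 {
			return r.Index, true
		}
	}
	return 0, false
}
```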
Implementation II
We might look at Implementation I and think it’s rather slow. We have two round trips – one to gather log indexes, another to perform the read. We can speed it up as follows.
-1. Let L be the node which we think is the leader.
-2-a. Same as before. Ask every node (that we know of) what its latest committed log index is, and the full membership list of the Raft group as of that log index.
-2-b. Also ask node L to perform the read.
-3. Wait until the responses from 2-a meet the aforementioned criterion.
-4-a. If L’s read returns with a sufficiently high log index (at least J), we’ve successfully read the value.
-4-b. If L’s read returns with too low a log index, it must no longer be the leader. Perform a backup read (using a log index J) from a different node.
-4-c. If enough time has passed without a response from L, perform a backup read (using a log index J) from a different node.
Now in the typical case, we perform reads in one round trip – but still one that needs a majority of nodes to respond.
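A control-flow sketch of the above, where the Node interface, the jCh channel (fed by the 2-a gathering), the backup read, and the timeout value are all hypothetical stand-ins for real plumbing:

```go
package reads

import (
	"context"
	"time"
)

// Node is a hypothetical handle to one member of the Raft group.
type Node interface {
	// Read returns the value along with the committed log index at
	// which it was read.
	Read(ctx context.Context) (data []byte, index uint64, err error)
}

// readOptimistic races a read at the presumed leader L against the
// index-gathering round of Implementation I. jCh yields J once the
// membership criterion is met; backup performs a read at index J or
// later from some other node.
func readOptimistic(ctx context.Context, L Node, jCh <-chan uint64,
	backup func(J uint64) ([]byte, error)) ([]byte, error) {

	type result struct {
		data  []byte
		index uint64
	}
	lCh := make(chan result, 1)
	go func() { // 2-b: ask L to perform the read right away
		if data, index, err := L.Read(ctx); err == nil {
			lCh <- result{data, index}
		}
	}()

	J := <-jCh // 3: wait until the responses meet the criterion
	select {
	case r := <-lCh:
		if r.index >= J { // 4-a: L's read is recent enough
			return r.data, nil
		}
		// 4-b: too low an index – L must no longer be the leader.
	case <-time.After(100 * time.Millisecond): // 4-c: arbitrary timeout
	}
	return backup(J)
}
```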
Nested graph queries
We’re doing a bit more than accessing one <uid> <predicate> pair. We’ve got multiple predicates, nested queries, funcs, filters, and the like.
We have two decent ways (without going full-bore transactions) of approaching such a query:
-1. Declare the query starts at t_0, and say, “We’re happy to use any writes valid as of time t_0 or later.”
-2. Apply an ordering relation – when we dip into a nested query, we only allow writes whose write times are greater than or equal to the write time of the graph edge we followed.
Option 2 seems like it might be useful for some purposes – but let’s go with option 1. A problem with option 2 is that because predicates are in different groups, we’d need some cross-group ordering relation.
The consequence of option 1 is that we only need to ask nodes what their latest log index is, and their membership list, once. We can then use that information to read from replicas whenever we have the opportunity. We can optimistically read from the leaders for the first network hop we make, then use replicas for subsequent hops.
So we do this:
-1. Request last committed log index and membership lists from each node of groups whose predicates are involved in our query. We do that up-front.
-2. Query optimistically or not, depending on whether we’ve gotten enough responses to query replicas non-optimistically yet.
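A sketch of the bookkeeping this implies – one J per involved group, filled in as each group’s criterion is met, and consulted on each hop (the types here are hypothetical):

```go
package reads

import "sync"

// GroupID identifies a Raft group (hypothetical).
type GroupID uint32

// QueryReadState caches, per group, the valid read index J gathered
// up-front in step 1. Hops consult J() to decide whether a replica
// read is already safe, or whether to read optimistically through
// the group's leader instead.
type QueryReadState struct {
	mu        sync.Mutex
	readIndex map[GroupID]uint64 // group -> J, once the criterion is met
}

func (s *QueryReadState) SetJ(g GroupID, J uint64) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.readIndex == nil {
		s.readIndex = make(map[GroupID]uint64)
	}
	s.readIndex[g] = J
}

// J reports the valid read index for group g, if known yet.
func (s *QueryReadState) J(g GroupID) (uint64, bool) {
	s.mu.Lock()
	defer s.mu.Unlock()
	J, ok := s.readIndex[g]
	return J, ok
}
```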
A note about in-group optimistic reads
Suppose we’re a member of the group whose leader we optimistically read from.
In that case, the leader might as well respond, “read the data from yourself, from a log index not less than K” instead of sending us the data – the data is already in-flight to us via ordinary replication. Once we’ve verified that K is a valid log index, we wait for our copy of the data to reach K (and become committed), and then read it.
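A sketch, assuming a hypothetical LocalStore with a way to wait on our own committed index:

```go
package reads

import "context"

// LocalStore is a hypothetical view of our own replica's state.
type LocalStore interface {
	// WaitCommitted blocks until our copy has committed (and applied)
	// log index K.
	WaitCommitted(ctx context.Context, K uint64) error
	Read() ([]byte, error)
}

// localRead handles the leader's "read it from yourself, at index K
// or later" reply: wait for our own copy to catch up to K, then
// serve the read locally.
func localRead(ctx context.Context, K uint64, store LocalStore) ([]byte, error) {
	if err := store.WaitCommitted(ctx, K); err != nil {
		return nil, err
	}
	return store.Read() // now reflects at least log index K
}
```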
A note about replica reads
When we ask nodes for their last committed log index and membership list, we could also ask for their un-committed log history (log index numbers and their election terms) and the election term of the last committed log index. This increases the set of replicas it’s reasonable to do an up-to-date read from – some nodes already have the latest committed log index in their logs; they just haven’t yet heard that it’s committed.
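A sketch of the resulting check, leaning on Raft’s log-matching property (same index and term implies the same entry):

```go
package reads

// Entry is one log position: its index and the term it was written in.
type Entry struct {
	Index, Term uint64
}

// canServe reports whether a replica can serve an up-to-date read at
// index J, where (J, termOfJ) is known (from elsewhere) to be
// committed. committedIndex and tail are what the replica reported:
// its last committed index and its un-committed log history. Holding
// the entry (J, termOfJ) means holding the same committed write – the
// replica just hasn't heard that it's committed yet.
func canServe(committedIndex uint64, tail []Entry, J, termOfJ uint64) bool {
	if committedIndex >= J {
		return true // already knows index J is committed
	}
	for _, e := range tail {
		if e.Index == J && e.Term == termOfJ {
			return true
		}
	}
	return false
}
```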
One more thing
I’m not completely on top of how query evaluation operates. Based on a quick look, I believe the node processing the query sends network requests and gets responses. But there are no recursive network requests. Is that right?
Edit:
Leases
If the group leader has a lease on being leader, we can just query the leader and know that it’s the leader. Then we don’t have to send messages to the other nodes. That saves network traffic and latency, because we’re just talking to one node. We know that it’s the leader because a new leader can’t be elected until the current lease’s time interval has passed – the intervals are disjoint.
Using this does require some system clock accuracy. (We don’t want the leader to mistakenly think it still has the lease because its clock is too slow.) This might also require a closer look at etcd. We should know this possibility is out there.
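A sketch of what the leader-side check might look like, with maxSkew as an assumed bound on clock inaccuracy:

```go
package reads

import "time"

// canServeAlone is a sketch of the lease check on the leader: serve a
// read without contacting other nodes only while the lease surely
// holds. maxSkew is an assumed bound on how slow our clock can run
// relative to the rest of the cluster over the lease window.
func canServeAlone(leaseExpiry time.Time, maxSkew time.Duration) bool {
	// Pad by maxSkew so a slow local clock can't convince us we still
	// hold a lease that has really expired.
	return time.Now().Add(maxSkew).Before(leaseExpiry)
}
```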