Predicate Moves using RAFT

Predicate moves

Every predicate would be part of a 3-node cluster. When a mutation arrives, it would be directed to the master of the cluster. The master would append the mutation to its write-ahead log (and sync it to disk in parallel with the network calls coming next). It would then send this mutation off to its followers, and once they confirm, the mutation would be applied to the posting list.
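
To make the flow concrete, here is a rough sketch against the etcd raft package; send and applyToPostingList are placeholders, not our actual code, and error handling is elided:

package replication

import (
	"context"

	"github.com/coreos/etcd/raft"
	"github.com/coreos/etcd/raft/raftpb"
)

// Placeholders, not the actual Dgraph code.
func send(msgs []raftpb.Message)     {} // ship raft messages to the other two nodes
func applyToPostingList(data []byte) {} // apply a committed mutation to the posting list

// Propose hands a mutation to raft. On the leader, raft appends it to the log,
// replicates it to the followers, and later surfaces it as a committed entry.
func Propose(ctx context.Context, n raft.Node, mutation []byte) error {
	return n.Propose(ctx, mutation)
}

// Run is the loop each member drives: persist new entries (the write-ahead log,
// which can be synced in parallel with sending messages to followers), then
// apply only the entries raft reports as committed.
func Run(n raft.Node, storage *raft.MemoryStorage) {
	for {
		rd := <-n.Ready()
		_ = storage.Append(rd.Entries) // persist locally before acting on them
		send(rd.Messages)
		for _, e := range rd.CommittedEntries {
			if e.Type == raftpb.EntryNormal && len(e.Data) > 0 {
				applyToPostingList(e.Data) // only committed entries touch the PLs
			}
		}
		n.Advance()
	}
}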

Periodically, we need to delete older logs to keep their size in check. To do this, we’ll run this algorithm independently on each server in the cluster.

// Do the heavy lifting of merging here
While number of dirty PLs for predicate Q > threshold:
	for each PL given a predicate Q:
		PL.Merge()

STOP mutations for all Q on that server
Note down the latest commit entry term
{
	// Merge again for the updates which happened during the merging step above.
	for each PL given a predicate Q:
		PL.Merge()
	Start checkpoint for RocksDB for all Q prefixed keys
}
START mutations for all Q on that server

Once the checkpoint finishes, we can delete all logs earlier than the commit entry term noted above.

When we need to bring up a new server for predicate Q, we can first stream the checkpoint to the new server, and then replay the logs. This should give us a clean predicate move.
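
As a sketch of that bootstrap path (the Peer type and the streamCheckpoint/replayLogs helpers are hypothetical placeholders, not actual Dgraph APIs):

package predmove

import "context"

// Placeholders: these names and signatures are assumptions, not Dgraph APIs.
type Peer struct{ Addr string }

// streamCheckpoint ships the merged PLs for a predicate to the new server and
// returns the raft index the checkpoint corresponds to.
func streamCheckpoint(ctx context.Context, predicate string, to *Peer) (uint64, error) { return 0, nil }

// replayLogs re-sends the raft log entries from the given index onwards.
func replayLogs(ctx context.Context, predicate string, to *Peer, fromIndex uint64) error { return nil }

// BootstrapFollower brings up a new server for predicate Q: stream the
// checkpoint first, then replay only the logs newer than it.
func BootstrapFollower(ctx context.Context, predicate string, to *Peer) error {
	idx, err := streamCheckpoint(ctx, predicate, to)
	if err != nil {
		return err
	}
	return replayLogs(ctx, predicate, to, idx+1)
}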


P.S. Need to send this off to xiang.li so he can confirm that the above algorithm would work.

1 Like

I think the algo should work. Just want to make sure I understand you correctly.

Basically, we have two steps before we can clean up raft logs on each server independently:

  1. do merging
  2. do checkpointing

Then we start to clean up the raft log.

The only thing we need to make sure of is that the checkpoint can be used to recover the entire state correctly. We also need to ensure the checkpoint covers the logs we are going to clean up.

One quick question: do you know how large the checkpoint would be? ~10MB? ~100MB? ~GB? And how frequently would we need to do this? Do we need incremental checkpoints?

3 Likes

That’s correct. Merging and then checkpointing for that particular predicate shard. Note that all predicate shards lie within the same RocksDB instance.

Regarding checkpointing: I just noticed that RocksDB doesn’t allow you to specify a key prefix when checkpointing. Without specifying the prefix, we would have to checkpoint all the predicate shards on the server, which would be slow. So maybe we’ll have to get a RocksDB snapshot, iterate over the keys with the predicate shard’s prefix, write them to disk, and then release the snapshot.
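
Something along these lines, assuming the tecbot/gorocksdb bindings (the key layout and the writeEntry sink are placeholders, not our actual store API):

package checkpoint

import "github.com/tecbot/gorocksdb"

// CheckpointPredicate reads every key under the predicate's prefix from a
// consistent snapshot, hands each pair to writeEntry, then releases the snapshot.
func CheckpointPredicate(db *gorocksdb.DB, prefix []byte, writeEntry func(k, v []byte) error) error {
	snap := db.NewSnapshot()
	defer db.ReleaseSnapshot(snap)

	ro := gorocksdb.NewDefaultReadOptions()
	ro.SetSnapshot(snap) // all reads below see one point-in-time view
	defer ro.Destroy()

	it := db.NewIterator(ro)
	defer it.Close()

	for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() {
		k, v := it.Key(), it.Value()
		err := writeEntry(k.Data(), v.Data())
		k.Free()
		v.Free()
		if err != nil {
			return err
		}
	}
	return it.Err()
}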

The size of the checkpoint would depend on the size of the predicate shard. We haven’t set a threshold yet, but we’d limit it to a few gigabytes.

Not sure. Open to advice here. One rough idea would be to do this when the size of the RAFT logs >= X% of the size of the predicate shard (the snapshot, in RAFT terms).
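
Concretely, the heuristic would look something like this (the ratio below is an arbitrary placeholder, not a decided threshold):

// Sketch only: the ratio is an assumption; the "X%" above has not been decided.
func shouldCompactLogs(raftLogBytes, predicateShardBytes int64) bool {
	const ratio = 0.25 // compact once the logs reach 25% of the shard's size
	return float64(raftLogBytes) >= ratio*float64(predicateShardBytes)
}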

I think the RocksDB checkpoint API might not work for us, now that I’ve looked at it again. Because all predicate shards lie in the same RocksDB instance but belong to different RAFT groups, we’d want to snapshot them independently of each other. So, I think we’ll just do a full snapshot each time for the predicate shard.

One more question: in case of a network partition, it’s possible for the leader to overwrite the RAFT entries in a follower which had made progress independently under a stale leader. This poses an interesting problem for us. Once we apply an update to our posting list, we have no way to determine the last state it was in. So, we can’t just revert an applied mutation.

How does the CoreOS RAFT implementation deal with these situations? Does it require the node’s state to be reverted? So far our idea has been to abandon that RAFT node, create a new one, and rebuild it from the master’s snapshot and by replaying the master’s RAFT logs.

Hey @xiang90

Would be great to hear your comments on this.

I asked because this might affect how you send the snapshot to a slow follower. If a follower dies for a while, it might require a snapshot once it restarts. If you look at the raft proto, you might notice that the snapshot message includes a data field. It is useful to put the entire checkpoint into the data field when the checkpoint is small (<100MB). Once your checkpoint gets larger, you do not want to marshal the entire checkpoint into memory and then put it into the data field. You probably want to use a side channel to send the checkpoint data as a stream. We do this in etcd.
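
For the small-checkpoint case, the idea is roughly this (a sketch against the etcd raftpb types; buildCheckpoint is just a placeholder):

package snapmsg

import "github.com/coreos/etcd/raft/raftpb"

// Placeholder: marshal the predicate checkpoint into a byte slice.
func buildCheckpoint() ([]byte, error) { return nil, nil }

// MakeSnapshot embeds a small checkpoint directly in the snapshot's Data field.
// For large checkpoints, put only a small marker here and stream the real data
// over a separate channel instead.
func MakeSnapshot(index, term uint64, cs raftpb.ConfState) (raftpb.Snapshot, error) {
	data, err := buildCheckpoint()
	if err != nil {
		return raftpb.Snapshot{}, err
	}
	return raftpb.Snapshot{
		Data: data,
		Metadata: raftpb.SnapshotMetadata{
			ConfState: cs,
			Index:     index,
			Term:      term,
		},
	}, nil
}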

I do not think this is a good idea in practice. If you have GBs of checkpoint data, you will end up writing a huge amount of data to disk when creating a checkpoint. It is better to just use RocksDB’s API somehow. I know both TiKV and CockroachDB do this.

Raft allows a stale leader. However, the stale leader cannot really move any followers forward. If an entry is committed on a node, it will be committed on other nodes at the exact same index, no matter what happens.

3 Likes

Yeah, that’s what we’re doing as well. The receiver gets an indication to ask for the checkpoint, and then initiates a separate channel to ask for that checkpoint from the leader.

That’s a good point about the data write to disk. We’ll figure out a way to avoid checkpointing. FYI: @pawan

Quoting from RAFT paper https://raft.github.io/raft.pdf:

Section 5.3, Log Replication: “In Raft, the leader handles inconsistencies by forcing the followers’ logs to duplicate its own. This means that conflicting entries in follower logs will be overwritten with entries from the leader’s log.”

These are the overwrites which are hard for us to handle, because we would have to revert state, which we currently can’t do.

So, does that sort of overwrite NOT happen with your implementation of RAFT?

Raft replication has two phases: replicated, then committed. Replicated data can be overwritten; committed data cannot. Your application should only see committed entries, not merely replicated ones. It should not be a problem for you at all.

2 Likes

Right, checkpoints anyway don’t work for us because we can’t do checkpoints on limited data (e.g. data for a predicate). What we can do is store a RocksDB snapshot (which is a pointer that allows us to read data from a state) after doing the PL.Merge() as suggested above. Once we store the snapshot, we can delete (compact) old logs. Now, whenever we boot up a new instance, we can stream data by reading from the snapshot and then copy over the logs.

We have to decide on the frequency of snapshotting. As taking a RocksDB snapshot blocks, we might not want to do it often, or else it would affect availability.

I think if you are storing the snapshot, then it’s the same problem. You have to write all this data to disk. We want to avoid doing that.

Replaying logs is an idempotent op for us. So, we could just not store a state and stream directly from RocksDB, which is what we were talking about earlier. It’s not an exact science, but it should give us the same state. Also, we can use the posting list timestamp to avoid rewriting older mutations.

In fact, here’s an idea. We can go through committed posting lists and pick the min commit timestamp. Then use that ts to figure out where to stream updates from. Discard older logs. Possibly, make this a periodic process.

Unlike checkpoints, snapshots don’t write all data to disk. They just provide a point-in-time view of the database (and are not persisted across restarts), unlike a checkpoint, which stores the state in another directory.

From RocksDB FAQ · facebook/rocksdb Wiki · GitHub

Q: Does RocksDB hold SST files and memtables for a snapshot?

A: No. See Home · facebook/rocksdb Wiki · GitHub for how snapshots work.

Now, coming to our idea:

Are you talking about the leader streaming to its followers which already exist here? What happens when we want to move all the predicate data to a new shard (follower)?

Sure. My point was about what you’re doing with the snapshot. Iterate and stream to the client or store it on disk? Either way, a blocking snapshot would kill our throughput, so we should just do the iteration over the live system.

No, that happens continuously and without involving RocksDB specifically.

I was talking about this scenario, where we need to catch up a new node. We can stream out the state, i.e. the PLs stored in RocksDB, while also keeping track of the min commit ts they have. And then replay logs from that min commit ts.
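
A sketch of that catch-up flow; streamPostingLists, indexForCommitTs and replayFrom are all hypothetical placeholders for the steps described above, not real APIs:

package catchup

import "context"

// Placeholders: these names and signatures are assumptions, not Dgraph APIs.
type Peer struct{ Addr string }

// streamPostingLists streams the live PLs for a predicate and returns the
// smallest commit ts among the PLs it streamed.
func streamPostingLists(ctx context.Context, predicate string, to *Peer) (uint64, error) { return 0, nil }

// indexForCommitTs maps a commit ts back to a raft log index.
func indexForCommitTs(predicate string, ts uint64) uint64 { return 0 }

// replayFrom replays the raft logs from the given index; replay is idempotent,
// and the per-PL commit ts lets the receiver skip mutations it already has.
func replayFrom(ctx context.Context, predicate string, to *Peer, idx uint64) error { return nil }

// CatchUpNewNode streams the current state first, then replays only the logs
// newer than the smallest commit ts that was streamed.
func CatchUpNewNode(ctx context.Context, predicate string, to *Peer) error {
	minTs, err := streamPostingLists(ctx, predicate, to)
	if err != nil {
		return err
	}
	return replayFrom(ctx, predicate, to, indexForCommitTs(predicate, minTs))
}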

1 Like

That makes sense; we can just iterate over the live system when we want to move data to a new shard, like we do now as part of our RPC.

Right, so there would be a regular log compaction (preceded by merging the posting lists for a predicate), and then any time a new instance comes up, we can stream the predicate data and only replay newer logs.

1 Like

Hey @xiang90,

I’ve started implementing RAFT, and @pawan suggested that we could use the rafthttp package. It does look nice, but govendor shows a whole bunch of associated deps:

$ govendor list +e
 e  github.com/beorn7/perks/quantile                                    
 e  github.com/coreos/etcd/etcdserver/stats                             
 e  github.com/coreos/etcd/pkg/fileutil                                 
 e  github.com/coreos/etcd/pkg/httputil                                 
 e  github.com/coreos/etcd/pkg/ioutil                                   
 e  github.com/coreos/etcd/pkg/logutil                                  
 e  github.com/coreos/etcd/pkg/pbutil                                   
 e  github.com/coreos/etcd/pkg/tlsutil                                  
 e  github.com/coreos/etcd/pkg/transport                                
 e  github.com/coreos/etcd/pkg/types                                    
 e  github.com/coreos/etcd/raft/raftpb                                  
 e  github.com/coreos/etcd/snap                                         
 e  github.com/coreos/etcd/snap/snappb                                  
 e  github.com/coreos/etcd/version                                      
 e  github.com/coreos/go-semver/semver                                  
 e  github.com/coreos/go-systemd/journal                                
 e  github.com/coreos/pkg/capnslog                                      
 e  github.com/matttproud/golang_protobuf_extensions/pbutil             
 e  github.com/prometheus/client_golang/prometheus                      
 e  github.com/prometheus/client_model/go                               
 e  github.com/prometheus/common/expfmt                                 
 e  github.com/prometheus/common/internal/bitbucket.org/ww/goautoneg    
 e  github.com/prometheus/common/model                                  
 e  github.com/prometheus/procfs                                        
 e  github.com/xiang90/probing     

Is this expected? Or are we doing something wrong? If possible, I just want to pull in the fewest packages from coreos - ideally just raft and the raftpb stuff.
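
For reference, if we end up skipping rafthttp entirely, the minimal surface would be roughly this: depend only on raft and raft/raftpb and move raftpb.Message over our own RPC layer. A sketch (sendOverRPC is a placeholder for our existing RPC, not a real package):

package transport

import (
	"context"

	"github.com/coreos/etcd/raft"
	"github.com/coreos/etcd/raft/raftpb"
)

// Placeholder for Dgraph's own RPC layer.
func sendOverRPC(ctx context.Context, to uint64, payload []byte) error { return nil }

// Send marshals outgoing raft messages and ships them over our own RPC,
// so nothing from etcd beyond raft and raft/raftpb is needed.
func Send(ctx context.Context, msgs []raftpb.Message) {
	for _, m := range msgs {
		data, err := m.Marshal() // raftpb messages are protobufs
		if err != nil {
			continue
		}
		_ = sendOverRPC(ctx, m.To, data)
	}
}

// Receive is what the RPC handler calls on the receiving side: unmarshal the
// payload and feed it into the local raft node.
func Receive(ctx context.Context, n raft.Node, payload []byte) error {
	var m raftpb.Message
	if err := m.Unmarshal(payload); err != nil {
		return err
	}
	return n.Step(ctx, m)
}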

This topic was automatically closed 30 days after the last reply. New replies are no longer allowed.