I was reading about ways in which we could transfer posting list data from one instance to another using an RPC. This would be required in RAFT to move data around.
RocksDB recommends Checkpoints for efficient Snapshots but gorocksdb doesn’t support them. So we would have to use normal Snapshots for now.
The idea is to create a snapshot, iterate over the K-V pairs using an iterator, and stream them to the other instance, which can then apply them with a Write or a BatchWrite. Does that sound good @minions?
One doubt I had: do we want to transfer data for one predicate at a time, or all of it at once? I'd guess one predicate at a time makes sense. In that case, how will the requesting instance know which predicates to request data for?
I might have forgotten some things you mentioned yesterday @mrjn, though I should now be able to understand better what we are trying to do here.
I think we use very few functions of RocksDB and gorocksdb gives us a lot more than we need.
I wonder if it makes sense to maintain our own wrapper over RocksDB, much like what CockroachDB does, I believe. Writing this wrapper would probably be easier than trying to SWIG the RocksDB classes. And we can add more as we need more of RocksDB, e.g., checkpoints.
I would be happy moving away from gorocksdb. Also, if we can rely upon the C++ version of RocksDB, instead of C, that'd be pure awesomeness. The C++ API is a lot more readable than the C one, and as you figured out @jchiu, also a lot more up to date.
Async BatchWrites are the most performant. We should totally use them.
I think we should use checkpoints. While @jchiu figures out a way to provide those, just add a TODO for yourself to switch to them, and do the iteration in the meantime.
We want to transfer all data stored in a single server for one predicate. That predicate would be known to the caller and provided in the request to the callee. Note that one request would be for only one predicate, and not for a list of predicates. At least, that’s how we’d be implementing RAFT, based upon my plan so far.
Figure out what would make sense, using a unidirectional stream RPC or many RPCs of equal size (say a couple of MBs).
Just wanted to provide an update on checkpointing. A PR went up last week to support checkpointing, but there was a question about whether we can still write to the original store while checkpointing. The short answer is: yes.
Details:
I tried reading the RocksDB code but decided it was better to run an experiment to find out. I started a goroutine that writes to the store while checkpointing is happening, and found that we can indeed write during checkpointing. In addition, writes made during checkpointing can still make it into the checkpoint, BUT the value might be stale.
Here is the code for the experiment.
```go
package main

import (
	"fmt"
	"math/rand"
	"strconv"
	"time"

	"github.com/dgraph-io/dgraph/store"
	"github.com/dgraph-io/dgraph/x"
)

const (
	dbPath1 = "/tmp/testdbn"
	dbPath2 = "/tmp/testdbn2"
)

func randStr() []byte {
	return []byte(strconv.Itoa(rand.Int()))
}

func main() {
	st, err := store.NewStore(dbPath1)
	x.Check(err)
	fmt.Println("Initial population")
	for i := 0; i < 1000000; i++ {
		st.SetOne(randStr(), randStr())
	}

	fmt.Println("NewCheckpoint")
	checkpoint, err := st.NewCheckpoint()
	x.Check(err)

	go func() {
		// Make sure we start writing only after checkpoint saving has begun.
		time.Sleep(10 * time.Millisecond)
		for i := 0; i < 123456; i++ {
			if i%10000 == 0 {
				fmt.Printf("SetOne %d\n", i)
			}
			if i >= 110000 {
				st.SetOne([]byte("aaaaa"), []byte("old")) // tail end of the writes
			} else {
				st.SetOne([]byte("aaaaa"), []byte("new")) // bulk of the writes
			}
		}
	}()

	fmt.Println("Start saving checkpoint")
	checkpoint.Save(dbPath2)
	fmt.Println("Done saving checkpoint")
	checkpoint.Destroy()

	st2, err := store.NewStore(dbPath2)
	x.Check(err)
	result, err := st2.Get([]byte("aaaaa"))
	if result != nil {
		fmt.Printf("Result: [%s]\n", string(result))
	} else {
		fmt.Println("Not found")
	}
}
```
Note that we start calling SetOne AFTER we start saving checkpoint. Note that the final result is “old” not “new” and we start writing “new” only towards the end of checkpointing.
So we start writing old towards the end, not new, right? Here it almost seems like Save blocks until the goroutine above has finished writing. Is this behavior predictable (i.e., that saving the checkpoint doesn't finish until all keys are written) across multiple runs?
I doubt saving the checkpoint is blocked by the writes. In a previous run (not documented here), I did many more SetOnes and saw "Done saving checkpoint" get printed before a few more SetOnes got printed.
What I think is happening is that checkpoint doesn’t block anything. It just saves whatever it can as it finds them. So it saves “old” because it happens very early right after checkpointing starts. However, it didn’t get to save the latest value “new”.
Ok, but to me you seem to be setting old at the end, hence my doubt about whether it blocks. Though as you said, with more SetOnes it saves an intermediate state, so that clears it up.
Writes are possible to the DB while a checkpoint is in progress.
The checkpoint will definitely contain the writes that happened before checkpointing started, and usually some of the writes that happened while the checkpoint was being saved.
From the blog: "Checkpoint is a feature in RocksDB which provides the ability to take a snapshot of a running RocksDB database in a separate directory."
We don’t have any information about Snapshots yet. Perhaps a similar test for snapshots would help us understand.
Interestingly, a snapshot behaves differently from a checkpoint. Creating it is nearly instantaneous, and none of the writes that happen after it is created show up when reading through it.
Code:
```go
package main

import (
	"fmt"
	"math/rand"
	"strconv"
	"time"

	"github.com/dgraph-io/dgraph/store"
	"github.com/dgraph-io/dgraph/x"
)

const dbPath1 = "/tmp/testdba"

func randStr() []byte {
	return []byte(strconv.Itoa(rand.Int()))
}

func main() {
	st, err := store.NewStore(dbPath1)
	x.Check(err)
	fmt.Println("Initial population")
	for i := 0; i < 1000000; i++ {
		st.SetOne(randStr(), randStr())
	}

	go func() {
		fmt.Println("~~~Start updating")
		for i := 0; i < 5000000; i++ {
			if i%10000 == 0 {
				fmt.Printf("SetOne %d\n", i)
			}
			if i < 230000 {
				st.SetOne([]byte("aaaaa"), []byte("old"))
			} else {
				st.SetOne([]byte("aaaaa"), []byte("new"))
			}
		}
	}()

	fmt.Println("Start saving snapshot")
	start := time.Now()
	snapshot := st.NewSnapshot()
	fmt.Printf("Done saving snapshot; time elapsed %v\n", time.Since(start))
	time.Sleep(10 * time.Millisecond)

	fmt.Println("Before using snapshot")
	result, err := st.Get([]byte("aaaaa"))
	if result != nil {
		fmt.Printf("Result: [%s]\n", string(result))
	} else {
		fmt.Println("Not found")
	}

	fmt.Println("After using snapshot")
	st.SetSnapshot(snapshot)
	result, err = st.Get([]byte("aaaaa"))
	if result != nil {
		fmt.Printf("Result: [%s]\n", string(result))
	} else {
		fmt.Println("Not found")
	}
}
```
Output:
```
Initial population
Start saving snapshot
Done saving snapshot; time elapsed 6.786µs
~~~Start updating
SetOne 0
Before using snapshot
Result: [old]
After using snapshot
Not found
```
Notice from the elapsed time that creating the snapshot takes only microseconds, so the goroutine never got a chance to run before it was done.
Before restoring the snapshot, we query the value and get "old". Then we restore the snapshot, query again, and get "value not found". None of the writes made it to the snapshot, which is expected since they didn't start until the snapshot was done.