Snapshots/Backup of RocksDB

I was reading about ways in which we could transfer posting list data from one instance to another using an RPC. This would be required in RAFT to move data around.

RocksDB recommends Checkpoints for efficient Snapshots but gorocksdb doesn’t support them. So we would have to use normal Snapshots for now.

The idea is to create a snapshot, iterate over the K-V pairs using an iterator, and stream them to the other instance, which can then apply them with a Write or a BatchWrite. Does that sound good @minions?
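To make the idea concrete, here is a minimal sketch of the flow, assuming an in-memory stand-in for the store and a Go channel in place of the RPC stream. The names memStore, snapshot, batchWrite and transfer are hypothetical and are not part of gorocksdb or our store package.

```go
package main

import (
	"fmt"
	"sort"
)

// KV is one key-value pair sent to the receiving instance.
type KV struct {
	Key, Value []byte
}

// memStore is a hypothetical in-memory stand-in for the RocksDB-backed store.
type memStore struct {
	data map[string][]byte
}

func newMemStore() *memStore { return &memStore{data: make(map[string][]byte)} }

// snapshot returns a point-in-time copy of all pairs, sorted by key.
// Copying is a simplification; a real snapshot would not copy the data.
func (s *memStore) snapshot() []KV {
	out := make([]KV, 0, len(s.data))
	for k, v := range s.data {
		out = append(out, KV{Key: []byte(k), Value: append([]byte(nil), v...)})
	}
	sort.Slice(out, func(i, j int) bool { return string(out[i].Key) < string(out[j].Key) })
	return out
}

// batchWrite applies a whole batch of pairs in one call.
func (s *memStore) batchWrite(batch []KV) {
	for _, kv := range batch {
		s.data[string(kv.Key)] = kv.Value
	}
}

// transfer iterates over a snapshot of src and streams it to dst in batches
// of at most batchSize pairs (batchSize must be positive). The channel stands
// in for the RPC stream. It returns the number of pairs the receiver applied.
func transfer(src, dst *memStore, batchSize int) int {
	stream := make(chan []KV)
	done := make(chan int)
	go func() { // receiving instance
		n := 0
		for batch := range stream {
			dst.batchWrite(batch)
			n += len(batch)
		}
		done <- n
	}()
	snap := src.snapshot()
	for start := 0; start < len(snap); start += batchSize {
		end := start + batchSize
		if end > len(snap) {
			end = len(snap)
		}
		stream <- snap[start:end]
	}
	close(stream)
	return <-done
}

func main() {
	src, dst := newMemStore(), newMemStore()
	for i := 0; i < 10; i++ {
		src.data[fmt.Sprintf("key%02d", i)] = []byte(fmt.Sprintf("val%d", i))
	}
	fmt.Printf("transferred %d pairs\n", transfer(src, dst, 3))
}
```

Writes that reach src after snapshot() returns are not part of the transfer, which is the property we want from a snapshot here.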

One doubt I had: do we want to transfer data for one predicate at a time, or all of it at once? I guess one predicate at a time makes sense. In that case, will the requesting instance know which predicates to request data for?

I might have forgotten some things you mentioned yesterday, @mrjn, though I should now be able to understand better what we are trying to do here.


Links
http://rocksdb.org/blog/2609/use-checkpoints-for-efficient-snapshots/

I think we use very few functions of RocksDB and gorocksdb gives us a lot more than we need.

I wonder if it makes sense to maintain our own wrapper over RocksDB, as I believe CockroachDB does. Writing this wrapper would probably be easier than trying to SWIG RocksDB classes, and we can extend it as we need more of RocksDB, e.g., checkpoints.

Would you all support this direction?


I would be happy to move away from gorocksdb. Also, if we can rely upon the C++ version of RocksDB instead of the C one, that’d be pure awesomeness. The C++ API is a lot more readable than the C one and, as you figured out @jchiu, also a lot more up to date.

Async BatchWrites are the most performant. We should totally use them.

I think we should use checkpoints. While @jchiu figures out a way to provide those, just add a TODO for yourself to switch to them, and do the iteration in the meantime.

We want to transfer all data stored in a single server for one predicate. That predicate would be known to the caller and provided in the request to the callee. Note that one request would be for only one predicate, and not for a list of predicates. At least, that’s how we’d be implementing RAFT, based upon my plan so far.
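A minimal sketch of what a one-predicate request could look like on the callee side, assuming (purely for illustration) that keys are laid out as "<predicate>|<uid>" and kept in sorted order. PredicateRequest and keysForPredicate are hypothetical names, and the key layout is an assumption rather than our actual format.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// PredicateRequest is a hypothetical request type: exactly one predicate per call.
type PredicateRequest struct {
	Predicate string
}

// keysForPredicate returns the keys in sortedKeys that belong to req.Predicate,
// assuming keys of the form "<predicate>|<uid>". sortedKeys must be sorted.
func keysForPredicate(sortedKeys []string, req PredicateRequest) []string {
	prefix := req.Predicate + "|"
	// Jump to the first key >= prefix, the way an iterator Seek would.
	i := sort.SearchStrings(sortedKeys, prefix)
	var out []string
	// Keys sharing a prefix are contiguous in sorted order, so stop at the
	// first key that no longer has the prefix.
	for ; i < len(sortedKeys) && strings.HasPrefix(sortedKeys[i], prefix); i++ {
		out = append(out, sortedKeys[i])
	}
	return out
}

func main() {
	keys := []string{"name|2", "friend|1", "name|1", "friend|2", "name|3"}
	sort.Strings(keys)
	fmt.Println(keysForPredicate(keys, PredicateRequest{Predicate: "name"}))
}
```

With a layout like this, serving one predicate is a single seek plus a bounded scan, so the callee never has to walk data for other predicates.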

Figure out which makes more sense: a unidirectional streaming RPC, or many RPCs of roughly equal size (say a couple of MB each).
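For the second option, a rough sketch of how pairs could be grouped into chunks of bounded size. chunkBySize and the KV type are hypothetical helpers, and the size counted is just key plus value bytes, ignoring any RPC framing overhead.

```go
package main

import "fmt"

// KV is one key-value pair to be sent.
type KV struct {
	Key, Value []byte
}

// chunkBySize groups pairs, in order, into chunks whose payload is at most
// maxBytes. A single pair larger than maxBytes gets a chunk of its own.
func chunkBySize(pairs []KV, maxBytes int) [][]KV {
	var chunks [][]KV
	var cur []KV
	size := 0
	for _, kv := range pairs {
		n := len(kv.Key) + len(kv.Value)
		// Flush the current chunk if adding this pair would exceed the limit.
		if len(cur) > 0 && size+n > maxBytes {
			chunks = append(chunks, cur)
			cur, size = nil, 0
		}
		cur = append(cur, kv)
		size += n
	}
	if len(cur) > 0 {
		chunks = append(chunks, cur)
	}
	return chunks
}

func main() {
	var pairs []KV
	for i := 0; i < 5; i++ {
		// Each pair is 4 bytes: a 2-byte key and a 2-byte value.
		pairs = append(pairs, KV{Key: []byte(fmt.Sprintf("k%d", i)), Value: []byte(fmt.Sprintf("v%d", i))})
	}
	for i, c := range chunkBySize(pairs, 10) {
		fmt.Printf("chunk %d: %d pairs\n", i, len(c))
	}
}
```

The same loop works for a streaming RPC too: each chunk simply becomes one message on the stream instead of one request.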

Just wanted to provide an update on checkpointing. A PR from last week already adds checkpointing support, but there was a question about whether we can still write to the original store while a checkpoint is being taken. The short answer is: Yes.

Details:
I tried reading the RocksDB code but decided that it would be better to run an experiment to find out. The experiment starts a goroutine that writes to the store while checkpointing is happening. I found that we can write during checkpointing. In addition, it seems that what is written during checkpointing can still make it into the checkpoint. BUT, the value might be stale.

Here is the code for the experiment.

package main

import (
	"fmt"
	"math/rand"
	"strconv"
	"time"

	"github.com/dgraph-io/dgraph/store"
	"github.com/dgraph-io/dgraph/x"
)

const (
	dbPath1 = "/tmp/testdbn"
	dbPath2 = "/tmp/testdbn2"
)

func randStr() []byte {
	return []byte(strconv.Itoa(rand.Int()))
}

func main() {
	st, err := store.NewStore(dbPath1)
	x.Check(err)
	fmt.Println("Initial population")
	for i := 0; i < 1000000; i++ {
		st.SetOne(randStr(), randStr())
	}

	fmt.Println("NewCheckpoint")
	checkpoint, err := st.NewCheckpoint()
	x.Check(err)

	go func() {
		// Sleep briefly so the writes start after checkpoint.Save has begun.
		time.Sleep(10 * time.Millisecond)
		for i := 0; i < 123456; i++ {
			if (i % 10000) == 0 {
				fmt.Printf("SetOne %d\n", i)
			}
			if i >= 110000 {
				st.SetOne([]byte("aaaaa"), []byte("old"))
			} else {
				st.SetOne([]byte("aaaaa"), []byte("new"))
			}
		}
	}()
	fmt.Println("Start saving checkpoint")
	checkpoint.Save(dbPath2)
	fmt.Println("Done saving checkpoint")

	checkpoint.Destroy()

	st2, err := store.NewStore(dbPath2)
	x.Check(err)
	result, err := st2.Get([]byte("aaaaa"))
	x.Check(err)
	if result != nil {
		fmt.Printf("Result: [%s]\n", string(result))
	} else {
		fmt.Println("Not found")
	}
}

Here is the output.

Initial population
NewCheckpoint
Start saving checkpoint
SetOne 0
SetOne 10000
SetOne 20000
SetOne 30000
SetOne 40000
SetOne 50000
SetOne 60000
SetOne 70000
SetOne 80000
SetOne 90000
SetOne 100000
SetOne 110000
SetOne 120000
Done saving checkpoint
Result: [old]

Note that we start calling SetOne AFTER we start saving checkpoint. Note that the final result is “old” not “new” and we start writing “new” only towards the end of checkpointing.

Any further questions about this? @mrjn @pawan

Shall we resolve this PR?

Thanks!

So we start writing old towards the end and not new, right? Here it almost seems like Save blocks until the goroutine above has finished writing. Is this behavior predictable across multiple runs (that is, saving the checkpoint doesn’t finish until all keys are written)?

I doubt that saving the checkpoint is blocked by writing. In a previous run (not documented here), I issued a lot more SetOne calls and could see “Done saving checkpoint” printed before a few more SetOne lines were printed.

What I think is happening is that checkpoint doesn’t block anything. It just saves whatever it can as it finds them. So it saves “old” because it happens very early right after checkpointing starts. However, it didn’t get to save the latest value “new”.

Ok, but to me you seem to be setting old at the end, hence my doubt about whether it blocks. Though, as you said, if you do more SetOne calls, it saves an intermediate state. So that clears it up.

Errr yes, I made a mistake… got the signs flipped. Thanks for being alert, @pawan :)

Here is an update.

Code:

package main

import (
	"fmt"
	"math/rand"
	"strconv"
	"time"

	"github.com/dgraph-io/dgraph/store"
	"github.com/dgraph-io/dgraph/x"
)

const (
	dbPath1 = "/tmp/testdbs"
	dbPath2 = "/tmp/testdbs2"
)

func randStr() []byte {
	return []byte(strconv.Itoa(rand.Int()))
}

func main() {
	st, err := store.NewStore(dbPath1)
	x.Check(err)
	fmt.Println("Initial population")
	for i := 0; i < 1000000; i++ {
		st.SetOne(randStr(), randStr())
	}

	fmt.Println("NewCheckpoint")
	checkpoint, err := st.NewCheckpoint()
	x.Check(err)

	go func() {
		// Sleep briefly so the writes start after checkpoint.Save has begun.
		time.Sleep(10 * time.Millisecond)
		for i := 0; i < 2000000; i++ {
			if (i % 10000) == 0 {
				fmt.Printf("SetOne %d\n", i)
			}
			if i < 230000 {
				st.SetOne([]byte("aaaaa"), []byte("old"))
			} else {
				st.SetOne([]byte("aaaaa"), []byte("new"))
			}
		}
	}()
	fmt.Println("Start saving checkpoint")
	checkpoint.Save(dbPath2)
	fmt.Println("Done saving checkpoint")

	checkpoint.Destroy()

	st2, err := store.NewStore(dbPath2)
	x.Check(err)
	result, err := st2.Get([]byte("aaaaa"))
	x.Check(err)
	if result != nil {
		fmt.Printf("Result: [%s]\n", string(result))
	} else {
		fmt.Println("Not found")
	}
}

Output

Initial population
NewCheckpoint
Start saving checkpoint
SetOne 0
SetOne 10000
SetOne 20000
SetOne 30000
SetOne 40000
SetOne 50000
SetOne 60000
SetOne 70000
SetOne 80000
SetOne 90000
SetOne 100000
SetOne 110000
SetOne 120000
SetOne 130000
SetOne 140000
SetOne 150000
SetOne 160000
SetOne 170000
SetOne 180000
SetOne 190000
SetOne 200000
SetOne 210000
SetOne 220000
SetOne 230000
Done saving checkpoint
SetOne 240000
SetOne 250000
SetOne 260000
SetOne 270000
Result: [old]

Notice that right at i=230000, we print fmt.Printf("SetOne %d\n", i) and then we set the value to new. However, the checkpoint has old instead of new.

Another point is that we continue writing past i=230000 but the checkpointing is completed. So I don’t think checkpoint is blocked by the writing.

Summary of this finding and how Checkpoint and snapshot work, @pawan, @jchiu?

  1. Writes are possible to the DB while a checkpoint is in progress.
  2. A checkpoint definitely contains the writes that happened before the checkpoint started, and usually some of the writes that happened while the checkpoint was being saved.
  3. From the blog: Checkpoint is a feature in RocksDB which provides the ability to take a snapshot of a running RocksDB database in a separate directory.

We don’t have any information about Snapshots yet. Perhaps a similar test for snapshots would help us understand.


@pawan, thanks for the summary.

Interestingly snapshot seems to behave differently from checkpoint. It seems that once we start the snapshot, the writes are blocked until the snapshot is done.

Code:

package main

import (
	"fmt"
	"math/rand"
	"strconv"
	"time"

	"github.com/dgraph-io/dgraph/store"
	"github.com/dgraph-io/dgraph/x"
)

const (
	dbPath1 = "/tmp/testdba"
)

func randStr() []byte {
	return []byte(strconv.Itoa(rand.Int()))
}

func main() {
	st, err := store.NewStore(dbPath1)
	x.Check(err)
	fmt.Println("Initial population")
	for i := 0; i < 1000000; i++ {
		st.SetOne(randStr(), randStr())
	}

	go func() {
		fmt.Println("~~~Start updating")
		for i := 0; i < 5000000; i++ {
			if (i % 10000) == 0 {
				fmt.Printf("SetOne %d\n", i)
			}
			if i < 230000 {
				st.SetOne([]byte("aaaaa"), []byte("old"))
			} else {
				st.SetOne([]byte("aaaaa"), []byte("new"))
			}
		}
	}()
	fmt.Println("Start saving snapshot")
	start := time.Now()
	snapshot := st.NewSnapshot()
	fmt.Printf("Done saving snapshot; time elapsed %v\n", time.Since(start))
	time.Sleep(10 * time.Millisecond)

	fmt.Println("Before using snapshot")
	result, err := st.Get([]byte("aaaaa"))
	x.Check(err)
	if result != nil {
		fmt.Printf("Result: [%s]\n", string(result))
	} else {
		fmt.Println("Not found")
	}

	fmt.Println("After using snapshot")
	st.SetSnapshot(snapshot)
	result, err = st.Get([]byte("aaaaa"))
	x.Check(err)
	if result != nil {
		fmt.Printf("Result: [%s]\n", string(result))
	} else {
		fmt.Println("Not found")
	}
}

Output:

Initial population
Start saving snapshot
Done saving snapshot; time elapsed 6.786µs
~~~Start updating
SetOne 0
Before using snapshot
Result: [old]
After using snapshot
Not found

Notice that the goroutine is launched before NewSnapshot is called, so it could have started running right away. However, it did not start running until the snapshot was done, even though the snapshot call itself took only 6.786µs.
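One way to make sure the writer is really running when the snapshot is taken, rather than relying on when the scheduler starts the goroutine, would be to wait for a signal from the writer first. A minimal sketch, with no-op stand-ins for st.SetOne and st.NewSnapshot; runWithActiveWriter is a hypothetical helper, not something in our store package.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// runWithActiveWriter starts a goroutine that calls write in a loop, waits
// until at least one write has completed, then calls action (for example,
// taking a snapshot). It returns how many writes completed while action ran.
func runWithActiveWriter(write func(), action func()) int64 {
	var count int64
	started := make(chan struct{})
	stop := make(chan struct{})
	finished := make(chan struct{})
	go func() {
		defer close(finished)
		write()
		atomic.AddInt64(&count, 1)
		close(started) // signal: the writer is definitely running now
		for {
			select {
			case <-stop:
				return
			default:
				write()
				atomic.AddInt64(&count, 1)
			}
		}
	}()
	<-started // do not begin the action until the writer has written once
	before := atomic.LoadInt64(&count)
	action()
	after := atomic.LoadInt64(&count)
	close(stop)
	<-finished
	return after - before
}

func main() {
	var sink int64
	n := runWithActiveWriter(
		func() { atomic.AddInt64(&sink, 1) },        // stand-in for st.SetOne
		func() { time.Sleep(5 * time.Millisecond) }, // stand-in for st.NewSnapshot
	)
	fmt.Printf("writes completed during the action: %d\n", n)
}
```

If the count stays at zero with the real snapshot call as the action, that would support the blocking explanation; a non-zero count would point to scheduling instead.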

Before restoring the snapshot, we query the value and get “old”. Then we restore the snapshot, query again, and get “Not found”. None of the writes made it into the snapshot, which is expected since they didn’t start until the snapshot was done.
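The “Not found” result fits a model where a snapshot is a fixed point-in-time view: reads through it ignore anything written later. Here is a toy sketch of that model using sequence numbers. versionedStore is hypothetical and only illustrates the read semantics; it is not how our store or RocksDB is implemented.

```go
package main

import "fmt"

// version is one write to a key, tagged with the sequence number it got.
type version struct {
	seq   uint64
	value string
}

// versionedStore keeps every version of every key.
type versionedStore struct {
	seq  uint64
	data map[string][]version
}

func newVersionedStore() *versionedStore {
	return &versionedStore{data: make(map[string][]version)}
}

// set appends a new version of key with the next sequence number.
func (s *versionedStore) set(key, value string) {
	s.seq++
	s.data[key] = append(s.data[key], version{seq: s.seq, value: value})
}

// snapshot only records the current sequence number; it copies no data.
func (s *versionedStore) snapshot() uint64 { return s.seq }

// getAt returns the newest value of key written at or before snap.
func (s *versionedStore) getAt(key string, snap uint64) (string, bool) {
	vs := s.data[key]
	for i := len(vs) - 1; i >= 0; i-- {
		if vs[i].seq <= snap {
			return vs[i].value, true
		}
	}
	return "", false
}

// get reads the latest value of key.
func (s *versionedStore) get(key string) (string, bool) {
	return s.getAt(key, s.seq)
}

func main() {
	st := newVersionedStore()
	st.set("other", "x")
	snap := st.snapshot()
	st.set("aaaaa", "old") // written after the snapshot was taken

	latest, _ := st.get("aaaaa")
	_, found := st.getAt("aaaaa", snap)
	fmt.Printf("latest=%s foundInSnapshot=%v\n", latest, found)
}
```

In this model, taking the snapshot is cheap and writers never wait for it; the key simply does not exist as of the recorded sequence number.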


To summarize:

  • Checkpointing doesn’t block writes. Its output can include some updates (but not necessarily all) during the checkpointing.
  • In contrast, snapshotting blocks updates until it is done.
