Loading close to 1M edges/sec into Dgraph - Dgraph Blog

We’re seeing more and more users who want to load massive data sets into Dgraph. Many users want to load billions of edges, and some even want to load up to 50 billion edges! When we heard about the size of these datasets, we knew we needed to have a solid data loading story so that we could support the most extreme demands from our users.

In a previous blog post, we discussed some of the challenges that we met on our journey towards loading massive datasets into Dgraph. In this post, we will discuss an alternate approach that has yielded significantly faster loading times.

TL;DR: We created a new tool called Bulkloader to populate Dgraph with an initial dataset. It’s highly performant and is able to load the entire Stack Overflow dataset in just over 1 hour on a single 64 core machine. The Stack Overflow dataset is massive and consists of ~2 billion RDFs. An additional ~1 billion edges are created for indexing and other internal usages. This gives ~3 billion edges in total. This equates to a loading rate of ~820k edges per second! Note that we had a significant skew in the predicate distribution which slowed the load down (all the text for questions, answers and comments was on the same predicate). Better results can likely be achieved on datasets with a more even predicate distribution.

Previous approach

All of our previous approaches have involved writing batch mutations to a running Dgraph instance. This is a convenient approach; we just have to iterate over the RDFs we want to load, batch them up, then send them to Dgraph.

We were able to achieve over 50k edges per second, which is really good throughput for a live system. But even that feels insufficient when you’re starting out with billions of edges and just want to bulk load them into a new Dgraph instance.

Our previous approaches to speeding up the loading process focused on performance improvements to Dgraph’s mutation handling. We made great strides here, facing some tough memory management challenges along the way. Ultimately, though, we hit some hard limits on speed. Every mutation had to be proposed via Raft, which writes to a WAL for durability (to deal with crashes) and achieves consensus for consistency. We also needed to read each key to merge in the new mutations coming in. Some of these keys (particularly index keys) were in such high demand that they became stragglers for the entire bulk load operation.

A key observation about loading massive datasets is that Dgraph does not have to be live while the loading happens. This is because loading large amounts of data is usually a step taken by users when migrating from another database technology stack (whether a graph database or otherwise).

Because Dgraph doesn’t have to be up and running while data is being loaded, this opens up lots of avenues for alternate (faster!) approaches.

This is where Bulkloader comes in. It’s a new program that performs the initial population of a Dgraph instance. It runs very quickly and scales well on high-end hardware.

The new approach

Bulkloader utilises the map/shuffle/reduce paradigm. The diagram below shows the data flow through the system, from RDFs in files all the way to posting lists stored in Badger (Dgraph’s key-value store).

![Bulkloader architecture: data flows from RDF files through the map, shuffle, and reduce phases to posting lists stored in Badger.](upload://wQeKuEPHe2VOpT6LRBOc5NnnyHc.png)

There’s a lot going on here, so let’s break it down:

  • The map phase takes RDFs as its input. It creates the edges that will eventually become part of the final Dgraph instance. Each RDF can correspond to one or more edges; the additional edges are for indexing and reverse edge traversal. Each edge is output along with the key of the posting list it will eventually become part of (this combination of edge and posting list key is referred to as a map entry). Map entries are bundled up into reasonably sized batches (e.g. 100MB), sorted, then written out to disk in map files. Sorting is only local to each map file, since edges can be split between map files in arbitrary ways. This local sorting is critical to the speed of the shuffler.

  • The shuffle phase combines all map files into a single globally sorted stream of map entries. Because each map file is sorted internally, a min-heap data structure can be used to quickly assemble the stream (a minimal sketch of this merge follows the list). The stream is then broken up into batches that are passed on to the reduce phase.

  • The reduce phase takes batches of sorted map entries from the shuffle phase. It collects up groups of map entries with the same posting list key and then builds the corresponding posting list. To do this, the reducer iterates through the batch and generates a posting list whenever the posting list key changes. The posting lists are then written directly to Badger, Dgraph’s key-value store.
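To make the shuffle concrete, here’s a minimal Go sketch of that min-heap merge. It assumes each map file has already been decoded into a sorted in-memory slice; names like `mapEntry` and `mergeSorted` are illustrative, not Bulkloader’s actual code.

```go
package main

import (
	"container/heap"
	"fmt"
)

// mapEntry pairs an edge payload with the posting list key it belongs to.
type mapEntry struct {
	key  string // posting list key
	edge string // simplified stand-in for the encoded edge
}

// heapItem remembers which source stream an entry came from, so the
// merge can refill the heap from the right file.
type heapItem struct {
	entry  mapEntry
	source int
}

type entryHeap []heapItem

func (h entryHeap) Len() int            { return len(h) }
func (h entryHeap) Less(i, j int) bool  { return h[i].entry.key < h[j].entry.key }
func (h entryHeap) Swap(i, j int)       { h[i], h[j] = h[j], h[i] }
func (h *entryHeap) Push(x interface{}) { *h = append(*h, x.(heapItem)) }
func (h *entryHeap) Pop() interface{} {
	old := *h
	item := old[len(old)-1]
	*h = old[:len(old)-1]
	return item
}

// mergeSorted emits a single globally sorted stream built from several
// locally sorted map file streams.
func mergeSorted(files [][]mapEntry, emit func(mapEntry)) {
	h := &entryHeap{}
	next := make([]int, len(files)) // next unread index per file
	// Seed the heap with the head of every file.
	for i, f := range files {
		if len(f) > 0 {
			heap.Push(h, heapItem{entry: f[0], source: i})
			next[i] = 1
		}
	}
	for h.Len() > 0 {
		item := heap.Pop(h).(heapItem)
		emit(item.entry)
		// Refill from the stream the popped entry came from.
		if next[item.source] < len(files[item.source]) {
			heap.Push(h, heapItem{entry: files[item.source][next[item.source]], source: item.source})
			next[item.source]++
		}
	}
}

func main() {
	files := [][]mapEntry{
		{{"a", "e1"}, {"c", "e3"}},
		{{"b", "e2"}, {"c", "e4"}},
	}
	mergeSorted(files, func(e mapEntry) { fmt.Println(e.key, e.edge) })
}
```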

There are some additional complexities in the architecture that aren’t shown in the diagram. These are all related to bottlenecks in the processing, where all data must flow through a single point or exclusive access to a single resource is needed. The bottlenecks don’t show themselves when running on regular quad-core machines, but must be removed in order to achieve full hardware utilization on high-end machines (e.g. machines with 64 cores).

Sharded UID Map

The first bottleneck is in the map phase. As part of this phase, blank nodes must be mapped to 64-bit UIDs. The first time a particular blank node (a string) is seen, a new UID must be assigned. When that blank node is seen again in a subsequent RDF, the same UID must be used. Since the input RDF files can be split up in any arbitrary way, the same blank node could be processed by two different mappers that are running concurrently. Because of this, the management of UID assignment must be shared between all mappers.

The original design was to use a map (from string to uint64) protected by a mutex. With many concurrent mappers (e.g. 64), profiling showed large amounts of contention on the lock.

We solved this problem by using a sharded map. The idea is to break the map into two levels. The first level is a fixed-size array, e.g. of length 256. A hash of the key determines which entry in the array to use. Each array element holds a mutex and a regular hash map, which stores the final string to uint64 mapping (i.e. a single shard of the sharded map). Assuming the hash function gives a reasonable key distribution, access is spread out evenly among the different map shards, and contention is no longer a problem.
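Here’s a minimal Go sketch of such a sharded map. The shard count and the atomic counter used for UID allocation are assumptions for illustration; the real code may allocate UIDs differently.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sync"
	"sync/atomic"
)

const numShards = 256 // fixed-size first level, as described above

type shard struct {
	mu   sync.Mutex
	uids map[string]uint64 // blank node -> UID
}

type shardedUIDMap struct {
	shards  [numShards]*shard
	nextUID uint64 // shared counter; atomic increments keep it cheap
}

func newShardedUIDMap() *shardedUIDMap {
	m := &shardedUIDMap{}
	for i := range m.shards {
		m.shards[i] = &shard{uids: make(map[string]uint64)}
	}
	return m
}

// assignUID returns the UID for a blank node, allocating a fresh one the
// first time the blank node is seen. A hash of the key picks the shard,
// so concurrent mappers rarely collide on the same lock.
func (m *shardedUIDMap) assignUID(blankNode string) uint64 {
	h := fnv.New32a()
	h.Write([]byte(blankNode))
	s := m.shards[h.Sum32()%numShards]

	s.mu.Lock()
	defer s.mu.Unlock()
	if uid, ok := s.uids[blankNode]; ok {
		return uid
	}
	uid := atomic.AddUint64(&m.nextUID, 1)
	s.uids[blankNode] = uid
	return uid
}

func main() {
	m := newShardedUIDMap()
	fmt.Println(m.assignUID("_:alice")) // 1
	fmt.Println(m.assignUID("_:bob"))   // 2
	fmt.Println(m.assignUID("_:alice")) // 1 again
}
```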

More Shufflers

Another bottleneck in the program is the shuffling. The architecture diagram above just shows a single shuffler for simplicity. This was the original design and showed up as a bottleneck when running on a machine with 64 cores. All data has to flow through a single code path in a single thread in order to get a globally sorted stream of map entries.

The reason we wanted a globally sorted stream was so that the reducer could quickly scan the stream and assemble posting lists from entries with the same posting list key.

The solution was to split up the entire Bulkloader into shards, separated by predicate. Since edges with different predicates can never appear in the same posting list, this is safe to do. There is a separate set of map entry files for each shard, which can be picked up by multiple shufflers, one per shard. If there are more predicates than the requested number of shards, then multiple predicates are placed in each shard. Automatic predicate balancing results in shards that are as evenly sized as possible (a sketch of one such balancing strategy follows).
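One simple way to achieve that balance is a greedy assignment: visit predicates from heaviest to lightest and place each on the least-loaded shard so far. This is an illustrative strategy, not necessarily the exact heuristic Bulkloader uses.

```go
package main

import (
	"fmt"
	"sort"
)

type predicate struct {
	name  string
	edges int64 // approximate edge count seen in the map phase
}

// balance assigns each predicate to the currently least-loaded shard,
// visiting the heaviest predicates first.
func balance(preds []predicate, numShards int) [][]string {
	sort.Slice(preds, func(i, j int) bool { return preds[i].edges > preds[j].edges })
	shards := make([][]string, numShards)
	load := make([]int64, numShards)
	for _, p := range preds {
		// Find the shard with the smallest total load so far.
		min := 0
		for s := 1; s < numShards; s++ {
			if load[s] < load[min] {
				min = s
			}
		}
		shards[min] = append(shards[min], p.name)
		load[min] += p.edges
	}
	return shards
}

func main() {
	preds := []predicate{
		{"text", 900}, {"author", 300}, {"score", 250}, {"tag", 100},
	}
	for i, s := range balance(preds, 2) {
		fmt.Printf("shard %d: %v\n", i, s)
	}
}
```

Note how a single heavy predicate (like all the Stack Overflow text mentioned in the TL;DR) ends up dominating its shard no matter how the rest are balanced, which is why skewed predicate distributions slow the load down.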

Multiple Badgers

For big datasets, users typically want to split up the data among multiple servers. Splitting the output from multiple shufflers into multiple Badgers allows us to produce multiple shards of Dgraph, which a user can then use to run a Dgraph cluster.

This also significantly improves the reduce phase, because multiple Badgers are concurrently being written to. So many badgers!
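The fan-out itself is conceptually simple: one reducer per shard, each owning its own database handle, so no two reducers ever contend on the same store. Here’s a minimal sketch using a toy in-memory stand-in for Badger; the real reducers write through Badger’s own API.

```go
package main

import (
	"fmt"
	"sync"
)

// kv is a posting list key and its serialized value.
type kv struct{ key, value []byte }

// store abstracts the key-value database a reducer writes into; Badger
// plays this role in Bulkloader.
type store interface {
	Set(key, value []byte) error
}

// memStore is a toy in-memory stand-in for a Badger instance.
type memStore struct {
	mu   sync.Mutex
	data map[string][]byte
}

func (s *memStore) Set(k, v []byte) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.data[string(k)] = v
	return nil
}

// reduceShards runs one reducer goroutine per shard, each writing its
// posting lists to its own store.
func reduceShards(shards [][]kv, stores []store) {
	var wg sync.WaitGroup
	for i := range shards {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			for _, e := range shards[i] {
				stores[i].Set(e.key, e.value) // error handling elided in this sketch
			}
		}(i)
	}
	wg.Wait()
}

func main() {
	stores := []store{
		&memStore{data: map[string][]byte{}},
		&memStore{data: map[string][]byte{}},
	}
	shards := [][]kv{
		{{[]byte("key-a"), []byte("posting-list-a")}},
		{{[]byte("key-b"), []byte("posting-list-b")}},
	}
	reduceShards(shards, stores)
	fmt.Printf("wrote %d shards to %d separate stores\n", len(shards), len(stores))
}
```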

Show me the numbers

We used Bulkloader to load the entire Stack Overflow dataset in just over 1 hour.

Machine details:

  • i3.16xlarge AWS machine.
  • 64 cores.
  • 488GB RAM.
  • NVMe SSD.

Bulkloader performance:

  • Total runtime: 1 hr and 4 min.
  • RDF count: 1.950 billion.
  • Edge count: 3.161 billion.
  • Map phase speed: 1.767 million edges per second.
  • Reduce phase speed: 1.525 million edges per second.
  • Total speed: 0.818 million edges per second.

Bulkloader is super fast, and we recommend it as the way to do the initial data loading for new Dgraph instances. It’s available from Dgraph v0.8.2 onwards.

There are lots of different knobs and dials that can be tweaked to get maximum performance out of different hardware setups (look at the output when using the --help flag). Documentation is also available. Give it a go and be amazed!

Top image: Cassini spacecraft near Saturn.


This is a companion discussion topic for the original entry at https://blog.dgraph.io/post/bulkloader/

@mrjn How much disk space was used on the machine after loading the entire dataset?

I don’t have the data handy anymore on disk, but from memory the size of the final posting directories was ~200GB.

For comparison, the compressed RDFs loaded into the bulk loader take up ~50GB.

Note that the bulk loader requires additional scratch space for its map/reduce phases, so that should be taken into account. I would advise having ~500GB of free disk to load datasets of similar size.

BTW, the link to the bulk loader at https://blog.dgraph.io/post/bulkloader/ is dead :wink:. It should be https://docs.dgraph.io/deploy/#bulk-loader.