Improve throughput of bulk loader with distributed loading

Moved from GitHub dgraph/2628

Posted by danielmai:

Experience Report

The dgraph bulk loader is the fastest way to load data into Dgraph, at close to 1M edges/sec. This currently satisfies most users, but for extremely large data sets on the order of terabytes, it takes days if not weeks to finish bulk loading the entire data set.

What you wanted to do

Complete a bulk load of a multi-terabyte RDF triples data set in a timely manner.

What you actually did

Ran the bulk loader on a multi-terabyte RDF triples data set on an i3.metal AWS instance with 14 TB of SSD space.

Why that wasn’t great, with examples

The bulk loader job did not finish on the i3.metal instance. Disk space ran out during the mapping phase.

What could be improved

Since the bulk loader mapping phase can’t be completed on a single machine for large data sets, a distributed map-reduce bulk loader would at the very least make the load possible, while also increasing throughput to cut the wait time from weeks to days or hours.

srfrog commented :

I think some of the backup work will help with this, specifically the restore.

svmhdvn commented :

I would like to help with this feature! Is there any work currently being done on this already?

srfrog commented :

All yours! I’m still working on backup.

svmhdvn commented :

What interface into the CLI would be ideal for this? I.e. @danielmai what CLI options and flags would you have liked to use when running a distributed dgraph bulk with your multi-terabyte dataset?

danielmai commented :

A distributed dgraph bulk loader should work like the current single-machine bulk loader, but work distributed across multiple machines. For example, running a bulk load of the 21-million movie data set is currently done with this command:

dgraph bulk -r 21million.rdf.gz -s 21million.schema --map_shards=4 --reduce_shards=1 -z localhost:5080

This does the mapping and reducing on the caller’s machine. But for larger data sets this should be able to run across multiple machines.

svmhdvn commented :

That makes sense, but I was talking more about the machine discovery in the bulk command, since currently the cluster must be offline before running dgraph bulk .... I’m taking a look at the interface because it would be relevant to how I structure the shared XID->UID map, since now the mappings must be distributed across multiple machines. Would multiple Zero instances be set as options in the dgraph bulk ... command line and then used to coordinate the shared Xidmap?

mangalaman93 commented :

I think it would be useful to integrate with the existing Hadoop ecosystem through YARN/Mesos or AWS EMR. AWS EMR clusters are super easy to spawn and can be scaled up for the needed duration while data is ingested into dgraph (or prepared, like the current bulk loader). I will prepare a draft of my proposal.

relunctance commented :

Can bulk add a parameter to handle incremental data, and at the same time check whether the subject uid exists?
If the subject does not exist, add it with a new uid.

svmhdvn commented :

@mangalaman93 I think the hadoop ecosystem would be great for this. I’m currently prototyping the distributed bulk loader with the Gleam mapreduce framework in Go, but I should be able to translate my code over to the more stable hadoop system fairly easily. Did you already have a proposal in mind for the design?

So far, my plan is the following (please correct me if I’m wrong or if you think a better method is possible):

  1. Run a preliminary map-reduce job to assign uids to all xids
    • Map over RDFs, outputting subject xid and (if applicable) object xid as separate keys
    • Run a distinct reducer to output only distinct xids
    • Map over distinct xids and assign uids, and finally persist output to KV store
  2. Run the map-reduce job to convert RDFs to posting lists
    • Use the xid->uid KV store as a distributed cache
    • Map over RDFs, convert to map entries
    • Partition by predicate over number of requested shards
    • Reduce to posting lists and persist to distributed store
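
The two jobs in this plan can be sketched on a single machine as follows. This is only an illustration under stated assumptions, not the bulk loader’s actual code: the triple layout, the function names (`assign_uids`, `shard_for`, `build_posting_lists`), and the CRC32-based partitioner are all hypothetical, and a plain dict stands in for the xid->uid KV store.

```python
from collections import defaultdict
from zlib import crc32

# Hypothetical input: (subject xid, predicate, object, object_is_node).
TRIPLES = [
    ("alice", "follows", "bob", True),
    ("bob", "follows", "carol", True),
    ("alice", "name", "Alice", False),
]


def assign_uids(triples, first_uid=1):
    """Job 1: emit subject/object xids, keep distinct ones, assign uids."""
    xids = set()
    for subj, _pred, obj, obj_is_node in triples:
        xids.add(subj)
        if obj_is_node:  # literal values do not get a uid
            xids.add(obj)
    # Sorting only makes this sketch deterministic; it is not required.
    return {xid: first_uid + i for i, xid in enumerate(sorted(xids))}


def shard_for(predicate, num_shards):
    """Partition by predicate so each predicate lands in one shard."""
    return crc32(predicate.encode("utf-8")) % num_shards


def build_posting_lists(triples, xid_to_uid, num_shards):
    """Job 2: map triples to entries, partition by predicate, reduce."""
    shards = [defaultdict(list) for _ in range(num_shards)]
    for subj, pred, obj, obj_is_node in triples:
        value = xid_to_uid[obj] if obj_is_node else obj
        key = (pred, xid_to_uid[subj])
        shards[shard_for(pred, num_shards)][key].append(value)
    return [dict(shard) for shard in shards]


uids = assign_uids(TRIPLES)
posting_shards = build_posting_lists(TRIPLES, uids, num_shards=2)
```

In a real map-reduce job the dict lookup in `build_posting_lists` would be a read against the shared xid->uid store, which is where the memory and throughput concerns come from.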

mangalaman93 commented :

Such an approach can only ingest limited amount of data given that it would require a large enough memory/disk to be able to store the mapping of xids -> uids on each executor/machine. I had two approaches in mind -

  1. Store the mapping on a distributed key value store (Aerospike, redis, badger) etc. I think this could be chosen by the users themselves, and we can define the interface that we need. The problem I see with this approach is the dependency on an external key-value store, which may not provide fast enough throughput.
  2. We first map all the vertices and generate the UIDs. We, then, take the edge data, perform join twice to get the UIDs of each end of the edge and generate posting lists from there. In this approach, everything is done in the Map-Reduce framework itself, with no dependency on any external system. The problem I see with this approach is that performing the joins will be a huge job, and its chances of success may not be solid.
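
The second approach (two joins against the vertex table) can be sketched as below. This is a minimal single-process illustration of a reduce-side join, assuming hypothetical names (`join_on`, `edges_to_uid_edges`) and edges whose objects are all nodes; a real job would shuffle each grouping across machines.

```python
from collections import defaultdict


def join_on(rows, vertex_uids, key_index):
    """Reduce-side join: group rows and (xid, uid) pairs by xid, then pair them."""
    groups = defaultdict(lambda: ([], []))
    for row in rows:
        groups[row[key_index]][0].append(row)
    for xid, uid in vertex_uids:
        groups[xid][1].append(uid)
    for matched_rows, uids in groups.values():
        for row in matched_rows:
            for uid in uids:
                yield row, uid


def edges_to_uid_edges(edges, vertex_uids):
    """Resolve both ends of each (subject, predicate, object) edge to uids."""
    # First join: replace the subject xid with its uid.
    step1 = [
        (s_uid, pred, obj)
        for (_subj, pred, obj), s_uid in join_on(edges, vertex_uids, 0)
    ]
    # Second join: replace the object xid with its uid.
    step2 = [
        (s_uid, pred, o_uid)
        for (s_uid, pred, _obj), o_uid in join_on(step1, vertex_uids, 2)
    ]
    return sorted(step2)


edges = [("alice", "follows", "bob"), ("bob", "follows", "carol")]
vertex_uids = [("alice", 1), ("bob", 2), ("carol", 3)]
uid_edges = edges_to_uid_edges(edges, vertex_uids)
```

Each join regroups the full edge set by a different key, which is why the cost of this approach grows with the number of edges rather than the number of vertices.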

svmhdvn commented :

Such an approach can only ingest limited amount of data […]

I agree. I initially thought that using a badger store for the xids->uids DB would make disk space the limiting factor, and the disk would have to be large anyway to store the output posting list DBs, but yes, it’s not ideal.

Store the mapping on a distributed key value store (Aerospike, redis, badger) etc.

I tried using redis cluster when I first created the mapping phase prototype in Gleam and found that throughput was definitely a problem. I could try again with hadoop and see if the bottleneck was in the framework’s implementation. I agree that this would be a very flexible approach and would simplify the implementation of the distributed bulk loader since the only phase that really needs to change fundamentally would be UID mapping.
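
If only the UID mapping phase changes, the rest of the loader could talk to the mapping through a small interface, with the backing store chosen by the user. The sketch below is hypothetical: `XidStore`, `InMemoryXidStore`, and `get_or_assign` are illustrative names, not part of Dgraph, and the in-memory class only stands in for a Redis-, Aerospike-, or Badger-backed implementation.

```python
import threading
from abc import ABC, abstractmethod


class XidStore(ABC):
    """Hypothetical interface the mapping phase would depend on."""

    @abstractmethod
    def get_or_assign(self, xid: str) -> int:
        """Return the uid for xid, assigning a new one if it is unseen."""


class InMemoryXidStore(XidStore):
    """Stand-in backend; a distributed store would implement the same method."""

    def __init__(self, first_uid: int = 1):
        self._lock = threading.Lock()
        self._uids = {}
        self._next_uid = first_uid

    def get_or_assign(self, xid: str) -> int:
        # The lock keeps lookup-then-assign atomic across mapper threads.
        with self._lock:
            uid = self._uids.get(xid)
            if uid is None:
                uid = self._next_uid
                self._next_uid += 1
                self._uids[xid] = uid
            return uid


store = InMemoryXidStore()
first = store.get_or_assign("alice")
again = store.get_or_assign("alice")
```

With a remote backend, every `get_or_assign` call becomes a network round trip, so batching lookups per mapper would likely matter for throughput.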

We, then, take the edge data, perform join twice to get the UIDs of each end of the edge and generate posting lists from there.

I haven’t tried this yet, I could benchmark something quick and post back here.

manishrjain commented :

We already have an implementation of xid to uid mapping stored in Badger, which can be readily used. I don’t think disk would be a limiting factor to store just the entities in the graph.

We need a design review for this change. Let’s set up a call. Can you email me?

svmhdvn commented :

Sure, I’ll email you soon.

mangalaman93 commented :

Will you keep me in the loop too?

manishrjain commented :

Just submitted a PR which significantly improves the throughput of the bulk loader. I can see upwards of 4 million edges / sec on my desktop. Would be worth benchmarking on a large dataset, to see what kind of performance we can get.

campoy commented :

I wonder if this is still relevant, or whether we’ve achieved performance that meets our expectations.

What do you think, @danielmai and @manishrjain ?

mangalaman93 commented :

Two reasons that this is still relevant -

  • For large datasets, the bulk loader requires high-end machines with large SSDs and lots of memory, which may not be available all the time. A distributed bulk loader should potentially be able to run on commodity machines.
  • Horizontal scalability: if I want the bulk loader to run in a shorter time, I can increase the resources in my cluster and reduce the time taken to boot the Dgraph cluster. This could be useful in recovery scenarios.

campoy commented :

We should come up with a performance goal at which point the issue can be closed.

sikanrong commented :

+1 on this issue; we’re trying to import a large set: 6.5 billion nquads totalling roughly 270GB (uncompressed). Currently the only choice seems to be using a machine that really could only exist in a virtual cloud environment, meaning 100+ cores / some hundreds of GB of RAM (!).

To use dgraph in “big data” applications this ends up being a massive limitation; users really need a way to distribute the bulk loading across many commodity nodes instead of forcing large datasets to be bulk loaded on ludicrous specialty hardware.
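
For a rough sense of scale when setting a performance goal, the throughput figures quoted in this thread can be turned into idealized load times. This is back-of-the-envelope arithmetic only: it assumes one edge per nquad and a rate sustained for the whole load, and it ignores index edges, the reduce phase, and whether a machine with enough disk and memory is available at all. `load_hours` is a hypothetical helper.

```python
def load_hours(num_edges, edges_per_sec):
    """Idealized load time in hours at a constant edge rate."""
    return num_edges / edges_per_sec / 3600


NQUADS = 6_500_000_000  # data set size quoted above

for rate in (1_000_000, 4_000_000):  # rates quoted earlier in the thread
    print(f"{rate:>9,} edges/sec: {load_hours(NQUADS, rate):.2f} hours")
```

Under these assumptions the load takes under two hours at 1M edges/sec and under half an hour at 4M edges/sec, which suggests that for a data set of this size the hardware requirement, rather than raw throughput, is the main obstacle.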