The Dgraph bulk loader is the fastest way to load data into Dgraph, at close to 1M edges/sec. This currently satisfies most users, but for extremely large data sets on the order of terabytes, it can take days if not weeks to finish bulk loading the entire data set.
What you wanted to do
Complete a bulk load of a multi-terabyte RDF triples data set in a timely manner.
What you actually did
Ran the bulk loader on a multi-terabyte RDF triples data set on an i3.metal AWS instance with 14 TB of SSD space.
Why that wasn’t great, with examples
The bulk loader job did not finish on the i3.metal instance. Disk space ran out during the mapping phase.
What could be improved
Since the bulk loader mapping phase can’t be completed on a single machine for large data sets, a distributed map-reduce bulk loader would at the very least make it possible, while also increasing throughput to cut the wait time from weeks to days or hours.
What interface into the CLI would be ideal for this? I.e. @danielmai what CLI options and flags would you have liked to use when running a distributed dgraph bulk with your multi-terabyte dataset?
A distributed dgraph bulk loader should behave like the current single-machine bulk loader, but run distributed across multiple machines. For example, a bulk load of the 21-million movie data set is currently done with a single dgraph bulk command.
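From memory, the invocation looks roughly like this (flag names vary across Dgraph versions, so check dgraph bulk --help rather than copying this verbatim):

```sh
# Illustrative single-machine bulk load of the 21million data set.
# Flag names may differ slightly depending on the Dgraph version.
dgraph bulk -f 21million.rdf.gz -s 21million.schema \
  --map_shards=4 --reduce_shards=2 \
  --zero=localhost:5080
```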
That makes sense, but I was talking more about the machine discovery in the bulk command, since currently the cluster must be offline before running dgraph bulk .... I’m taking a look at the interface because it would be relevant to how I structure the shared XID->UID map, since now the mappings must be distributed across multiple machines. Would multiple Zero instances be set as options in the dgraph bulk ... command line and then used to coordinate the shared Xidmap?
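To make that concrete, I was picturing something along these lines (entirely hypothetical flags and behavior, nothing that exists today):

```sh
# Hypothetical invocation: hand the bulk workers several Zero addresses so
# they can lease UID ranges and coordinate a shared xidmap. The
# comma-separated --zero list and the worker coordination implied here do
# not exist yet.
dgraph bulk -f data.rdf.gz -s data.schema \
  --zero=zero1:5080,zero2:5080,zero3:5080 \
  --map_shards=8 --reduce_shards=4
```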
I think it would be useful to integrate with the existing Hadoop ecosystem through YARN/Mesos or AWS EMR. AWS EMR clusters are super easy to spawn and can be scaled up for the needed duration while data is ingested into Dgraph (or prepared, as the current bulk loader does). I will prepare a draft of my proposal.
@mangalaman93 I think the Hadoop ecosystem would be great for this. I’m currently prototyping the distributed bulk loader with the Gleam map-reduce framework in Go, but I should be able to translate my code over to the more stable Hadoop system fairly easily. Did you already have a proposal in mind for the design?
So far, my plan is the following (please correct me if I’m wrong or if you think a better method is possible); a rough Go sketch of the first job follows the list:
1. Run a preliminary map-reduce job to assign UIDs to all xids:
   - Map over the RDFs, outputting the subject xid and (if applicable) the object xid as separate keys.
   - Run a distinct reducer to output only the distinct xids.
   - Map over the distinct xids, assign UIDs, and finally persist the output to a KV store.
2. Run the map-reduce job to convert RDFs to posting lists:
   - Use the xid → uid KV store as a distributed cache.
   - Map over the RDFs, converting them to map entries.
   - Partition by predicate over the number of requested shards.
   - Reduce to posting lists and persist them to a distributed store.
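Here’s the rough Go sketch mentioned above for the first job’s logic: pulling xids out of RDF lines, deduplicating them, and assigning UIDs. It runs on a single machine with naive N-Quad parsing, and a local counter stands in for UID ranges leased from Zero, so it only illustrates the shape of the map/distinct/assign steps, not the real Gleam or Hadoop pipeline:

```go
package main

import (
	"bufio"
	"fmt"
	"os"
	"strings"
)

// emitXids is the "map" step of the first job: for each RDF line it emits
// the subject xid and, when the object is a node rather than a literal,
// the object xid. The parsing here is deliberately naive.
func emitXids(line string, emit func(xid string)) {
	fields := strings.Fields(line)
	if len(fields) < 3 {
		return
	}
	emit(fields[0]) // subject
	if strings.HasPrefix(fields[2], "<") || strings.HasPrefix(fields[2], "_:") {
		emit(fields[2]) // object is a node, not a literal
	}
}

// assignUids stands in for the "distinct + assign" steps: deduplicate the
// xids and hand each one a monotonically increasing UID. In the real job
// the UID ranges would be leased from Zero and the result persisted to a
// KV store shared with the second (posting-list) job.
func assignUids(xids <-chan string) map[string]uint64 {
	next := uint64(1)
	uids := make(map[string]uint64)
	for xid := range xids {
		if _, ok := uids[xid]; !ok {
			uids[xid] = next
			next++
		}
	}
	return uids
}

func main() {
	xids := make(chan string, 1024)
	go func() {
		defer close(xids)
		sc := bufio.NewScanner(os.Stdin)
		for sc.Scan() {
			emitXids(sc.Text(), func(x string) { xids <- x })
		}
	}()
	for xid, uid := range assignUids(xids) {
		fmt.Printf("%s\t%d\n", xid, uid)
	}
}
```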
Such an approach can only ingest a limited amount of data, given that it would require enough memory/disk on each executor/machine to store the mapping of xids → uids. I had two approaches in mind:
1. Store the mapping in a distributed key-value store (Aerospike, Redis, Badger, etc.). I think this could be chosen by the users themselves, and we can define the interface that we need (a rough sketch of such an interface follows the two approaches). The problem I see with this approach is the dependency on an external key-value store, which may not provide us with fast enough throughput.
2. We first map all the vertices and generate the UIDs. We then take the edge data, perform a join twice to get the UIDs of each end of the edge, and generate posting lists from there. In this approach everything is done in the map-reduce framework itself, with no dependency on any external system. The problem I see here is that performing the joins will be a huge job, and the chances of the job succeeding may not be solid.
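For the first approach, the interface we would need could be quite small; something like this, perhaps (a rough Go sketch, not a settled API):

```go
// XidUidStore is a hypothetical interface the distributed bulk loader
// could program against; Aerospike, Redis, Badger, etc. would each be an
// implementation chosen by the user.
type XidUidStore interface {
	// AssignUid returns the UID for xid, allocating one if it has not been
	// seen before. Implementations must be safe for concurrent use by
	// mappers running on different machines.
	AssignUid(xid string) (uint64, error)

	// AssignUids batches lookups/allocations to amortize network round trips.
	AssignUids(xids []string) ([]uint64, error)
}
```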
> Such an approach can only ingest a limited amount of data […]
I agree. I initially thought that using a Badger store for the xid → uid DB would make disk space the limiting factor, and the disks would have to be large anyway to store the output posting-list DBs, but yes, it’s not ideal.
> Store the mapping in a distributed key-value store (Aerospike, Redis, Badger, etc.)
I tried using Redis Cluster when I first created the mapping-phase prototype in Gleam and found that throughput was definitely a problem. I could try again with Hadoop and see whether the bottleneck was in the framework’s implementation. I agree that this would be a very flexible approach and would simplify the implementation of the distributed bulk loader, since the only phase that really needs to change fundamentally is the UID mapping.
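For what it’s worth, much of the overhead in that Redis prototype came from doing one round trip per key; pipelining the lookups helped, though not enough at the time. A minimal sketch of the batched lookup with the go-redis client (the addresses and key prefix are made up):

```go
package main

import (
	"context"
	"fmt"

	"github.com/redis/go-redis/v9"
)

// lookupUids fetches the uid for each xid in one pipelined round trip
// instead of one GET per key. Error handling is kept minimal for brevity.
func lookupUids(ctx context.Context, rdb *redis.ClusterClient, xids []string) (map[string]string, error) {
	pipe := rdb.Pipeline()
	cmds := make([]*redis.StringCmd, len(xids))
	for i, xid := range xids {
		cmds[i] = pipe.Get(ctx, "xid:"+xid)
	}
	// Exec returns redis.Nil if any key is missing; that's fine here.
	if _, err := pipe.Exec(ctx); err != nil && err != redis.Nil {
		return nil, err
	}
	out := make(map[string]string, len(xids))
	for i, cmd := range cmds {
		if uid, err := cmd.Result(); err == nil {
			out[xids[i]] = uid
		}
	}
	return out, nil
}

func main() {
	rdb := redis.NewClusterClient(&redis.ClusterOptions{
		Addrs: []string{"127.0.0.1:7000", "127.0.0.1:7001"},
	})
	ctx := context.Background()
	uids, err := lookupUids(ctx, rdb, []string{"_:alice", "_:bob"})
	if err != nil {
		fmt.Println("lookup failed:", err)
		return
	}
	fmt.Println(uids)
}
```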
> We then take the edge data, perform a join twice to get the UIDs of each end of the edge, and generate posting lists from there.
I haven’t tried this yet; I could benchmark something quick and post back here.
We already have an implementation of xid to uid mapping stored in Badger, which can be readily used. I don’t think disk would be a limiting factor to store just the entities in the graph.
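For context, the idea there is essentially a Badger-backed lookup table keyed by xid. A stripped-down sketch of the read-or-assign path (not the actual xidmap package API; the allocator below stands in for UID ranges leased from Zero):

```go
package main

import (
	"encoding/binary"
	"log"

	badger "github.com/dgraph-io/badger/v4"
)

// assignUid returns the UID stored for xid, or persists a newly allocated
// one. In the real implementation the new UIDs come from ranges leased
// from Zero; here a caller-supplied allocator stands in for that.
func assignUid(db *badger.DB, xid string, alloc func() uint64) (uint64, error) {
	var uid uint64
	err := db.Update(func(txn *badger.Txn) error {
		item, err := txn.Get([]byte(xid))
		if err == nil {
			// Already assigned: decode and return the existing UID.
			return item.Value(func(val []byte) error {
				uid = binary.BigEndian.Uint64(val)
				return nil
			})
		}
		if err != badger.ErrKeyNotFound {
			return err
		}
		// First time we see this xid: allocate and persist a UID.
		uid = alloc()
		var buf [8]byte
		binary.BigEndian.PutUint64(buf[:], uid)
		return txn.Set([]byte(xid), buf[:])
	})
	return uid, err
}

func main() {
	db, err := badger.Open(badger.DefaultOptions("/tmp/xidmap"))
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	next := uint64(0)
	alloc := func() uint64 { next++; return next }

	uid, err := assignUid(db, "_:alice", alloc)
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("_:alice -> %#x", uid)
}
```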
We need a design review for this change. Let’s set up a call. Can you email me?
Just submitted a PR which significantly improves the throughput of the bulk loader. I can see upwards of 4 million edges / sec on my desktop. Would be worth benchmarking on a large dataset, to see what kind of performance we can get.
For large datasets, the bulk loader requires high-end machines with large SSDs and lots of memory, which may not always be available. A distributed bulk loader should be able to run on commodity machines.
Horizontal scalability: if I want the bulk loader to run in less time, I can add resources to my cluster and reduce the time it takes to bring up the Dgraph cluster. This could be useful in recovery scenarios.
+1 on this issue; we’re trying to import a large set: 6.5 billion nquads totalling roughly 270GB (uncompressed). Currently the only choice seems to be using a machine that really could only exist in a virtual cloud environment, meaning 100+ cores / some hundreds of GB of RAM (!).
To use Dgraph in “big data” applications this ends up being a massive limitation; users really need a way to distribute the bulk loading across many commodity nodes instead of being forced to bulk load large datasets on ludicrous specialty hardware.