Bulk uploader not making equal shards

Version: v21.03.2
Compressed data size before ingestion: 375GB
Total output size: 894GB
Zero: 32 cores, 256 GB RAM, 2TB SSD
Run time: 31 hours for the map phase, 11 hours for the reduce phase

Command

dgraph bulk -f /coldstartinput/upload/pending_predicates -s /coldstartinput/upload/rdf_schema/patient.rdf --out /coldstartoutput/out --replace_out --num_go_routines=20 --reducers=7 --format=rdf --store_xids --map_shards=14 --reduce_shards=7 > check.log &

Last line of the bulk loader output:

[04:23:44Z] REDUCE 16h19m42s 100.00% edge_count:81.27G edge_speed:1.383M/sec plist_count:49.18G plist_speed:836.6k/sec. Num Encoding MBs: 0. jemalloc: 0 B

Distribution:

  Shard 1: 139GB
  Shard 2: 116GB
  Shard 3: 158GB
  Shard 4: 153GB
  Shard 5: 103GB
  Shard 6: 104GB
  Shard 7: 124GB

K9S screen capture

Questions:

  • Can someone help interpret the last line of the bulk loader output above?
  • What can I do to get a more even distribution of data across the shards?
  • Once the Alpha nodes were up, there were heavy predicate moves between the Alpha nodes (circled in the k9s screen capture above)

I set Zero’s rebalance_interval flag to a very large value, which effectively disables auto rebalancing.

@Valdanito thanks for the tip, I will try that.

  • We run the bulk loader in a separate cluster and copy the ‘p’ directories to the working cluster (see the sketch after this list)
  • Per the official docs, increasing --map_shards is supposed to distribute data more evenly across shards; I’m not seeing that happen
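
For context, a rough sketch of that copy step (the alpha-$i host names and target path are placeholders, not the real setup): with --reduce_shards=7 the bulk loader writes one output directory per group under --out, and each group’s p directory is copied to the Alpha that will serve that group before it starts.

ls /coldstartoutput/out                 # expect one directory per reduce shard: 0 1 2 3 4 5 6
for i in 0 1 2 3 4 5 6; do
  # copy shard i's posting directory to the (placeholder) Alpha host that will serve group i+1
  scp -r /coldstartoutput/out/$i/p alpha-$i:/dgraph/
done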

Dgraph shards data by predicate: all data for the same predicate lives in a single group. You can view which predicates are in each group on the Cluster page of Ratel.
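
As an alternative to Ratel, Zero’s /state endpoint exposes the same group-to-predicate (tablet) mapping. A rough example, assuming the default Zero HTTP port 6080 and that jq is installed:

# List each group and the predicates (tablets) it currently serves.
curl -s localhost:6080/state | jq '.groups | to_entries[] | {group: .key, predicates: (.value.tablets | keys)}'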

@Valdanito thanks for the reply.

Cluster info:
Alpha nodes: 7
Zero nodes: 1
Shard replicas: 1
(Essentially 7 groups with no replication)

That was my understanding too, and I confirmed it in Ratel! But what I don’t understand is:

  • This cluster was stood up about 72 hrs ago
  • Based on the K8s logs, predicates have been on the move between two Alpha nodes the whole time (see the attached screen capture, highlighted with a red circle), which I don’t understand
  • Below is a typical log entry:
I0314 17:45:14.678083      20 schema.go:496] Setting schema for attr Encounter.timestamp: datetime, tokenizer: [year], directive: INDEX, count: false
I0314 17:45:15.057332      20 predicate_move.go:48] Writing 335545 keys
I0314 17:45:15.658652      20 log.go:34] L0 was stalled for 5.479s
I0314 17:45:15.721073      20 log.go:34] [0] [E] LOG Compact 0->5 (8, 55 -> 58 tables with 5 splits). [297058 297069 297075 297086 297092 297102 297110 297118 . 297066 297072 297079 297083 297089 297095 297099 297
I0314 17:45:17.902581      20 schema.go:496] Setting schema for attr Encounter.timestamp: datetime, tokenizer: [year], directive: INDEX, count: false

Same here. Dgraph Zero always thinks the shards are not balanced enough, so it keeps moving predicates; that’s why I set rebalance_interval to disable auto balancing.
You can also rebalance the shards manually in Ratel.
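
If you prefer the command line, Zero also has a /moveTablet HTTP endpoint that asks it to move one predicate to a specific group. A rough sketch, assuming the default Zero HTTP port 6080; the predicate and group number are examples only, and depending on the Dgraph version the request may need to be a plain GET rather than a POST:

# Ask Zero to move the tablet for one predicate to group 2 (example values).
curl -X POST "localhost:6080/moveTablet?tablet=Encounter.timestamp&group=2"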

@Valdanito thanks so much for suggesting the --rebalance_interval trick.

  • I completely rebuilt my cluster, setting --rebalance_interval 1200000h when starting the Zeros (see the sketch after this list)
  • My cluster is looking beautiful now
  • Although the predicates are a bit imbalanced
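
For reference, a minimal sketch of that Zero startup; the --my address is a placeholder and every other flag is left at its default:

# Start Zero with a huge rebalance interval so it never triggers automatic predicate moves.
dgraph zero --my=dgraph-zero-0:5080 --rebalance_interval 1200000h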

For now, my biggest unresolved issue is bulk loading more than 350GB (compressed) of data. Below is the command I run; let me know if you have any similar tricks :slight_smile:

dgraph bulk -f /coldstart/upload/pending_predicates -s /coldstart/upload/rdf_schema/patient.rdf --replace_out --num_go_routines=20 --reducers=7 --format=rdf --store_xids --map_shards=14 --reduce_shards=7 

I don’t know whether your problem is that the imported files are too large. You can try the --badger flag, which is available in both the alpha and bulk commands.

@Valdanito thanks for the reply.

  • Each of our data files is ~350 MB (compressed) - is this too large?
  • The --badger default is “compression=snappy; numgoroutines=8;”
  • What should I set this to? (See the sketch after this list.)
    – “zstd:1”
    – “compression=snappy; numgoroutines=20;”
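
For what it’s worth, here is how either candidate would be passed via --badger, with the rest of the command unchanged (a sketch of the two options above, not a recommendation):

# Variant 1: switch Badger compression to zstd level 1.
dgraph bulk -f /coldstart/upload/pending_predicates -s /coldstart/upload/rdf_schema/patient.rdf --replace_out --num_go_routines=20 --reducers=7 --format=rdf --store_xids --map_shards=14 --reduce_shards=7 --badger "compression=zstd:1"

# Variant 2: keep snappy compression but raise the Badger goroutine count.
dgraph bulk -f /coldstart/upload/pending_predicates -s /coldstart/upload/rdf_schema/patient.rdf --replace_out --num_go_routines=20 --reducers=7 --format=rdf --store_xids --map_shards=14 --reduce_shards=7 --badger "compression=snappy; numgoroutines=20"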

As the doc says:

Using this compression setting (Snappy) provides a good compromise between the need for a high compression ratio and efficient CPU usage.

I think it depends on the performance of your machine, the size of the dataset, and which indexes are used. There are no definitive numbers; you need to run the import several times to build up a feel for it.

I can’t find documentation for --num_go_routines. As I remember, it is positively correlated with both import speed and memory consumption.