Inconsistent bulk loader failures

Moved from GitHub dgraph/5361

Posted by dzygmundfelt:

What version of Dgraph are you using?

v1.1.1, v1.2.2, v20.03.0

Have you tried reproducing the issue with the latest release?

No. Latest is currently v20.03.1.

What is the hardware spec (RAM, OS)?

Two different machines running Ubuntu 16.04: one with 4 CPUs/30 GB RAM, the other with 8 CPUs/64 GB RAM.

Steps to reproduce the issue (command/config used to run Dgraph).

dgraph bulk -f {directory with rdf files} -s {schema file} --map_shards=2 --reduce_shards=1 --http=localhost:8000 --zero=localhost:5080 --format=rdf

Expected behaviour and actual result.

On both of the aforementioned machines, I tried running v1.2.2 and v20.03.0 with the same result: the MAP phase completed successfully, but the REDUCE phase failed at an edge count of ~98.5M (note that this edge count was consistent across the failures on both versions and both machines). Over a short period of time, the bulk loader ramped its memory usage up to the entirety of the machine’s RAM, then froze and crashed.

When I downgraded the dgraph version to v1.1.1, the MAP and REDUCE phases completed successfully, and I was able to run a new Dgraph cluster on top of the resultant p directory.

ashish-goswami commented :

Hey @dzygmundfelt, thanks for reporting this issue. How much data were you trying to insert (data size and RDF count)? Also, it would be very helpful if you could share a memory profile with us when usage is at its peak.

dzygmundfelt commented :

@ashish-goswami Size of the data is 37GB, with about 425 million nquads. I’ll see about setting up some memory profiling and rerunning.
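
For reference, the bulk loader's --http address serves Go's pprof endpoints, so a heap snapshot can be pulled while the REDUCE phase is running. A minimal sketch in Go, assuming --http=localhost:8000 as in the command above (the output filename is just a placeholder):

// Sketch: fetch the current heap profile from the bulk loader's pprof
// endpoint and save it to disk for later analysis.
package main

import (
	"io"
	"net/http"
	"os"
)

func main() {
	// The bulk loader's --http port exposes /debug/pprof/*.
	resp, err := http.Get("http://localhost:8000/debug/pprof/heap")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	// heap.pb.gz is a placeholder output path.
	out, err := os.Create("heap.pb.gz")
	if err != nil {
		panic(err)
	}
	defer out.Close()

	if _, err := io.Copy(out, resp.Body); err != nil {
		panic(err)
	}
}

The saved profile can then be inspected with go tool pprof heap.pb.gz, which matches the interactive (pprof) top listings shown later in this thread.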

n3integration commented :

I’ve seen similar behavior running the bulk loader in v1.2.2 and v20.03.1. However, I’m able to run the reduce phase to completion without the process running out of memory using v2.0.0-rc1. The memory stats below were captured from a bulk process whose reduce phase eventually crashed due to OOM.

// HEAP //
File: dgraph
Type: inuse_space
Entering interactive mode (type "help" for commands, "o" for options)
(pprof) top
Showing nodes accounting for 44487.54MB, 99.36% of 44774.33MB total
Dropped 234 nodes (cum <= 223.87MB)
Showing top 10 nodes out of 60
      flat  flat%   sum%        cum   cum%
20127.60MB 44.95% 44.95% 20127.60MB 44.95%  github.com/dgraph-io/ristretto/z.(*Bloom).Size
15454.99MB 34.52% 79.47% 23157.80MB 51.72%  github.com/dgraph-io/badger/v2/pb.(*TableIndex).Unmarshal
 7702.81MB 17.20% 96.67%  7702.81MB 17.20%  github.com/dgraph-io/badger/v2/pb.(*BlockOffset).Unmarshal
  924.56MB  2.06% 98.74%   924.56MB  2.06%  github.com/DataDog/zstd.Decompress
  259.16MB  0.58% 99.32%   259.16MB  0.58%  github.com/dgraph-io/ristretto.newCmRow
      15MB 0.034% 99.35%   940.57MB  2.10%  github.com/dgraph-io/badger/v2/table.(*Table).block
    1.78MB 0.004% 99.36% 43158.64MB 96.39%  github.com/dgraph-io/badger/v2/table.OpenTable
    0.64MB 0.0014% 99.36%   389.84MB  0.87%  github.com/dgraph-io/ristretto.NewCache
    0.50MB 0.0011% 99.36% 20128.10MB 44.95%  github.com/dgraph-io/ristretto/z.NewBloomFilter
    0.50MB 0.0011% 99.36%   633.83MB  1.42%  github.com/dgraph-io/dgraph/worker.processTask

// ALLOCS //
File: dgraph
Type: alloc_space
Entering interactive mode (type "help" for commands, "o" for options)
(pprof) top
Showing nodes accounting for 1630.99GB, 76.04% of 2145.01GB total
Dropped 1124 nodes (cum <= 10.73GB)
Showing top 10 nodes out of 134
      flat  flat%   sum%        cum   cum%
  600.96GB 28.02% 28.02%   601.24GB 28.03%  github.com/dgraph-io/badger/v2.(*safeRead).Entry
  555.20GB 25.88% 53.90%   561.62GB 26.18%  github.com/DataDog/zstd.Decompress
  178.82GB  8.34% 62.24%   201.70GB  9.40%  github.com/dgraph-io/dgraph/posting.(*List).Uids
   53.67GB  2.50% 64.74%    53.67GB  2.50%  github.com/dgraph-io/badger/v2.(*Iterator).newItem
   49.83GB  2.32% 67.06%    49.83GB  2.32%  github.com/dgraph-io/dgraph/query.(*fastJsonNode).New
   48.06GB  2.24% 69.30%    55.58GB  2.59%  github.com/dgraph-io/badger/v2/pb.(*TableIndex).Unmarshal
   43.70GB  2.04% 71.34%    46.29GB  2.16%  github.com/dgraph-io/dgraph/protos/pb.(*Group).Unmarshal
   35.23GB  1.64% 72.98%    35.23GB  1.64%  bytes.makeSlice
   34.10GB  1.59% 74.57%    36.44GB  1.70%  github.com/dgraph-io/dgraph/posting.(*pIterator).init
   31.41GB  1.46% 76.04%    71.62GB  3.34%  github.com/dgraph-io/dgraph/posting.ReadPostingList

martinmr commented :

Based on the information given (the issue is not present in v2.0.0-rc1 but is present in v20.03.0), I took a log of the changes to the bulk loader between those two releases. Here they are:

commit 2ce4db1f046ea24ec3f74763a5f17ddfd355506e
Author: Daniel Mai <daniel@dgraph.io>
Date:   Tue Mar 17 19:32:32 2020 -0700

    tests: Fix flaky bulk loader systest. (#4953)
    
    This fixes flaky tests with the following changes:
    
    * Use the same "dgraph" Docker Compose project as the other tests.
    * Wait for background indexing to finish (introduced in #4819).
    
    test-bulk-schema.sh was creating its Dgraph cluster using its own
    Docker Compose project, which doesn't work nicely with the other
    custom cluster tests which all utilize the same "dgraph" project
    name. Using the same name allows Docker Compose to cleanly
    recreate or delete containers. This change makes
    test-bulk-schema.sh use the same "dgraph" project.
    
    Because Dgraph re-indexes in the background, the test must wait
    for the update to finish to properly compare the schema before
    and after. Otherwise, this error in the test can happen:
    
        test-bulk-schema.sh: verifying schema is same before export and after bulk import
        [00:23:41][./dgraph/cmd/bulk/systest/test-bulk-schema.sh] [Test Error Output]
        test-bulk-schema.sh: schema incorrect
        test-bulk-schema.sh: *** unexpected error ***
        [00:23:41][./dgraph/cmd/bulk/systest/test-bulk-schema.sh] [Test Output]
        7a8
        >       "index": true,
        8a10,12
        >       "tokenizer": [
        >         "exact"
        >       ],
    
    (cherry picked from commit 7d92d2e96ac9fa6a5247ad5ae3404efb6855ef9c)

commit 3c1ce0ef1b028c04ef35064f84dee5602463ecaa
Author: Martin Martinez Rivera <martinmr@users.noreply.github.com>
Date:   Tue Mar 17 11:19:45 2020 -0700

    Use a different stream writer id for split keys. (#4875)

commit 4486c803753b6c9e0dc94b218c1421e847bf7513
Author: Martin Martinez Rivera <martinmr@users.noreply.github.com>
Date:   Fri Feb 28 13:13:48 2020 -0800

    Remove size calculation from toList method in bulk loader. (#4869)
    
    The total size of the key-value pairs is being calculated but it's not
    being used anywhere.

commit 51d579a3175bf9f058bde3b4423a8cd03b4319d7
Author: Martin Martinez Rivera <martinmr@users.noreply.github.com>
Date:   Tue Feb 25 16:56:08 2020 -0800

    Fix spacing and comments in reduce.go (#4850)
    
    No code changes.

commit f7d0371408bcb60d12d5b743736e33ac4d9e6765
Author: balaji <rbalajis25@gmail.com>
Date:   Tue Feb 25 19:57:42 2020 +0530

    Add partition key based iterator to the bulk loader (#4841)

commit 317e02e2094e4802b1068fd669c2dd3941c5d6aa
Author: Pawan Rawal <pawan0201@gmail.com>
Date:   Tue Feb 25 18:09:36 2020 +0530

    Change backup and export tests to use the new GraphQL admin API (#4845)

commit 6b7a339769b9fdb08782ed41598b43e3122f4802
Author: balaji <rbalajis25@gmail.com>
Date:   Mon Feb 24 14:23:40 2020 +0530

    Revert "add partition key based reducer (#4734)" (#4840)
    
    This reverts commit fa400d60fd5bbf6a3e3bb120123c91197e7277af.

commit fa400d60fd5bbf6a3e3bb120123c91197e7277af
Author: balaji <rbalajis25@gmail.com>
Date:   Mon Feb 24 12:59:19 2020 +0530

    add partition key based reducer (#4734)
    
    * add partition key based reducer
    
    Signed-off-by: balaji <rbalajis25@gmail.com>
    
    * wip
    
    Signed-off-by: balaji <rbalajis25@gmail.com>
    
    * wip
    
    Signed-off-by: balaji <rbalajis25@gmail.com>
    
    * go routine batcher
    
    Signed-off-by: balaji <rbalajis25@gmail.com>
    
    * wip
    
    Signed-off-by: balaji <rbalajis25@gmail.com>
    
    * Made the concurrent part of reducer work. No need for seq number. Ensures that we don't have too many pending requests.
    
    * Lots of changes to make the reducer run nicely w.r.t. memory usage and speed.
    
    * We should not Unmarshal MapEntry upfront. Instead, unmarshal only in encoder. We need a way to parse key easily for comparisons though.
    
    * Some more ideas
    
    * delay marshal
    
    Signed-off-by: balaji <rbalajis25@gmail.com>
    
    * remove fmt
    
    Signed-off-by: balaji <rbalajis25@gmail.com>
    
    * pool fix
    
    Signed-off-by: balaji <rbalajis25@gmail.com>
    
    * ]minor
    
    Signed-off-by: balaji <rbalajis25@gmail.com>
    
    * Manish's comments. Avoid allocating a lot of pb.MapEntries
    
    * reduce allocation
    
    Signed-off-by: balaji <rbalajis25@gmail.com>
    
    * add global allocator
    
    Signed-off-by: balaji <rbalajis25@gmail.com>
    
    * remove arena
    
    Signed-off-by: balaji <rbalajis25@gmail.com>
    
    * remove all
    
    Signed-off-by: balaji <rbalajis25@gmail.com>
    
    * remove allocator
    
    Signed-off-by: balaji <rbalajis25@gmail.com>
    
    * add bulk
    
    Signed-off-by: balaji <rbalajis25@gmail.com>
    
    * add chagne
    
    Signed-off-by: balaji <rbalajis25@gmail.com>
    
    * fix sorting and writer
    
    Signed-off-by: balaji <rbalajis25@gmail.com>
    
    * fix my stupid mistake
    
    Signed-off-by: balaji <rbalajis25@gmail.com>
    
    * remove compression
    
    Signed-off-by: balaji <rbalajis25@gmail.com>
    
    * bring back
    
    Signed-off-by: balaji <rbalajis25@gmail.com>
    
    * wip
    
    Signed-off-by: balaji <rbalajis25@gmail.com>
    
    * wip
    
    Signed-off-by: balaji <rbalajis25@gmail.com>
    
    * wip
    
    Signed-off-by: balaji <rbalajis25@gmail.com>
    
    * Merge branch 'master' of https://github.com/dgraph-io/dgraph into balaji/new_reduce
    
    Signed-off-by: balaji <rbalajis25@gmail.com>

commit 5ac56f0628ff36488110a0c530605f12c7e86803
Author: Pawan Rawal <pawan0201@gmail.com>
Date:   Fri Feb 21 00:50:01 2020 +0530

    Revert "Remove HTTP admin endpoints (#4754)" (#4822)
    
    This reverts commit 8fa2fd943a362c588259a4c9b3e2216798a4e1c9.

Most of these commits are minor changes, and I don’t expect them to affect memory usage much. I think commit f7d0371408bcb60d12d5b743736e33ac4d9e6765 might be the cause. I’ll look at it to see if I can spot any changes that might have caused the memory usage to increase.

@balajijinnah can you look at the commit as well? You probably have better context on this issue.

balajijinnah commented :

Hey @dzygmundfelt, could you provide the number of predicates and their counts?

jarifibrahim commented :

Two of the four items in the heap profile below can be optimized:

// HEAP //
File: dgraph
Type: inuse_space
Entering interactive mode (type "help" for commands, "o" for options)
(pprof) top
Showing nodes accounting for 44487.54MB, 99.36% of 44774.33MB total
Dropped 234 nodes (cum <= 223.87MB)
Showing top 10 nodes out of 60
      flat  flat%   sum%        cum   cum%
20127.60MB 44.95% 44.95% 20127.60MB 44.95%  github.com/dgraph-io/ristretto/z.(*Bloom).Size
15454.99MB 34.52% 79.47% 23157.80MB 51.72%  github.com/dgraph-io/badger/v2/pb.(*TableIndex).Unmarshal
 7702.81MB 17.20% 96.67%  7702.81MB 17.20%  github.com/dgraph-io/badger/v2/pb.(*BlockOffset).Unmarshal
  924.56MB  2.06% 98.74%   924.56MB  2.06%  github.com/DataDog/zstd.Decompress
  1. Bloom.Size can be reduced by setting badger.BfCacheSize (a sketch of setting this is shown after this list). By default, badger stores all the bloom filters in memory; when there are many SST files, the bloom filters can take up a lot of memory (see badger/options.go at 62b7a10a949e77b33d43cd7438979833c32cc865 in dgraph-io/badger).
  2. pb.TableIndex and pb.BlockOffset are both in-memory indices used by badger to perform lookups (see badger/table.go at 62b7a10a949e77b33d43cd7438979833c32cc865 in dgraph-io/badger). We should be able to reduce memory usage by optimizing that code and by using the bloom filter cache to store the table index as well. As the data grows, the number of SSTs grows, and so does the memory used by these in-memory indices. I’ve created an issue in badger to track this: “Optimize memory usage of table.TableIndex” (dgraph-io/badger #1335).
  3. Memory used by Decompress should be lower in newer versions of dgraph. This has been fixed in badger via “Buffer pool for decompression” (dgraph-io/badger #1308).
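
For anyone who wants to experiment with point 1 before a fix lands in Dgraph itself, here is a minimal sketch of capping badger v2’s caches. The option names (WithMaxCacheSize, WithMaxBfCacheSize) are assumptions that depend on the exact badger v2 minor version (later releases renamed these to block/index cache options), so verify them against the options.go you are actually building against.

// Illustrative sketch only, not Dgraph's configuration: cap badger's
// in-memory caches so bloom filters don't grow without bound as the
// number of SST files increases. The option names below are assumed and
// version-dependent; check options.go for your badger v2 release.
package main

import (
	badger "github.com/dgraph-io/badger/v2"
)

func main() {
	opts := badger.DefaultOptions("/tmp/badger-data").
		WithMaxCacheSize(1 << 30).    // block cache: ~1 GiB
		WithMaxBfCacheSize(512 << 20) // bloom-filter cache: ~512 MiB

	db, err := badger.Open(opts)
	if err != nil {
		panic(err)
	}
	defer db.Close()
}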

danielmai commented :

This is fixed via #5537. It’s part of v20.03.3 and v1.2.5.

xiangzhao632 commented :

@danielmai I have read all the bulk loader source code in both v1.1.1 and v20.03.1. I agree with @martinmr that commit f7d0371 caused the memory increase. I have tested v1.1.1, v1.2.1, v1.2.2, v20.03.1, and master; since v1.2.2 the bulk loader consumes much more memory during the reduce stage and takes more time to load the dataset.
There is also a bug in v1.2.2: the var size int is never updated:

func (r *reducer) toList(mapEntries []*pb.MapEntry, list *bpb.KVList) int {
	var currentKey []byte
	var uids []uint64
	pl := new(pb.PostingList)
	var size int
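	// NOTE: size is declared here but never updated anywhere in this
	// function, so toList always returns 0 to its caller.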

	appendToList := func() {
		atomic.AddInt64(&r.prog.reduceKeyCount, 1)

		// For a UID-only posting list, the badger value is a delta packed UID
		// list. The UserMeta indicates to treat the value as a delta packed
		// list when the value is read by dgraph.  For a value posting list,
		// the full pb.Posting type is used (which in turn contains the
		// delta packed UID list).
		if len(uids) == 0 {
			return
		}

		// If the schema is of type uid and not a list but we have more than one uid in this
		// list, we cannot enforce the constraint without losing data. Inform the user and
		// force the schema to be a list so that all the data can be found when Dgraph is started.
		// The user should fix their data once Dgraph is up.
		parsedKey, err := x.Parse(currentKey)
		x.Check(err)
		if parsedKey.IsData() {
			schema := r.state.schema.getSchema(parsedKey.Attr)
			if schema.GetValueType() == pb.Posting_UID && !schema.GetList() && len(uids) > 1 {
				fmt.Printf("Schema for pred %s specifies that this is not a list but more than  "+
					"one UID has been found. Forcing the schema to be a list to avoid any "+
					"data loss. Please fix the data to your specifications once Dgraph is up.\n",
					parsedKey.Attr)
				r.state.schema.setSchemaAsList(parsedKey.Attr)
			}
		}

		pl.Pack = codec.Encode(uids, 256)
		shouldSplit := pl.Size() > (1<<20)/2 && len(pl.Pack.Blocks) > 1
		if shouldSplit {
			l := posting.NewList(y.Copy(currentKey), pl, r.state.writeTs)
			kvs, err := l.Rollup()
			x.Check(err)
			list.Kv = append(list.Kv, kvs...)
		} else {
			val, err := pl.Marshal()
			x.Check(err)
			kv := &bpb.KV{
				Key:      y.Copy(currentKey),
				Value:    val,
				UserMeta: []byte{posting.BitCompletePosting},
				Version:  r.state.writeTs,
			}
			list.Kv = append(list.Kv, kv)
		}

		uids = uids[:0]
		pl.Reset()
	}

	for _, mapEntry := range mapEntries {
		atomic.AddInt64(&r.prog.reduceEdgeCount, 1)

		if !bytes.Equal(mapEntry.Key, currentKey) && currentKey != nil {
			appendToList()
		}
		currentKey = mapEntry.Key

		uid := mapEntry.Uid
		if mapEntry.Posting != nil {
			uid = mapEntry.Posting.Uid
		}
		if len(uids) > 0 && uids[len(uids)-1] == uid {
			continue
		}
		uids = append(uids, uid)
		if mapEntry.Posting != nil {
			pl.Postings = append(pl.Postings, mapEntry.Posting)
		}
	}
	appendToList()
	return size
}

martinmr commented :

The toList function no longer returns an int, so the bug is no longer relevant, but thanks for pointing it out.

I’ll let @balajijinnah answer questions about the commit since he has more context as the author.

balajijinnah commented :

Hey @xiangzhao632, yes, that commit will increase memory usage. The main reason we made that change was to be able to bulk load very large datasets; the heap-based method does not work well with big datasets.
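
For readers who haven’t looked at the old reducer: the heap-based method is essentially a k-way merge of the sorted map shards through a min-heap. The sketch below is a conceptual illustration with hypothetical types, not Dgraph’s actual reducer code:

// Conceptual sketch of a heap-based k-way merge of pre-sorted shards.
package main

import (
	"container/heap"
	"fmt"
)

// item is one entry pulled from a sorted map shard (hypothetical type).
type item struct {
	key   string
	shard int
}

// minHeap orders items by key so the smallest key across shards pops first.
type minHeap []item

func (h minHeap) Len() int           { return len(h) }
func (h minHeap) Less(i, j int) bool { return h[i].key < h[j].key }
func (h minHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }

func (h *minHeap) Push(x interface{}) { *h = append(*h, x.(item)) }
func (h *minHeap) Pop() interface{} {
	old := *h
	n := len(old)
	it := old[n-1]
	*h = old[:n-1]
	return it
}

// mergeShards merges already-sorted shards into one globally sorted stream.
func mergeShards(shards [][]string) []string {
	h := &minHeap{}
	next := make([]int, len(shards)) // next index to read from each shard
	for i, s := range shards {
		if len(s) > 0 {
			heap.Push(h, item{key: s[0], shard: i})
			next[i] = 1
		}
	}

	var out []string
	for h.Len() > 0 {
		it := heap.Pop(h).(item)
		out = append(out, it.key)
		if next[it.shard] < len(shards[it.shard]) {
			heap.Push(h, item{key: shards[it.shard][next[it.shard]], shard: it.shard})
			next[it.shard]++
		}
	}
	return out
}

func main() {
	// Two pre-sorted "map shards" merged into one sorted stream.
	fmt.Println(mergeShards([][]string{{"a", "d", "e"}, {"b", "c", "f"}}))
	// Output: [a b c d e f]
}

Roughly speaking, the downside is that every entry passes through the heap one at a time; the partition-key approach instead slices the data into key ranges that can be read and processed as whole batches, which scales better but holds more in memory at once.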

Regarding performance, we’re tossing around new ideas to improve it (e.g. parallel sorting). I will update you once I get there.

xiangzhao632 commented :

Thank you @balajijinnah, I want to share some points:

  1. I think memory usage is the bottleneck rather than loading speed. The largest machine I have has 192 GB of memory (many other teams have 128 GB). That much memory can only cope with about 3 billion RDFs, while my team has more than 50 billion. I know some teams that wanted to try Dgraph, but due to the OOM issue they couldn’t initialize a cluster with the bulk loader and turned to the live loader instead.
  2. Less memory usage in the reduce stage also means that --reducer can be set to a larger value, which can in turn increase loading speed.

What do you think?

BigMurry commented :

The reduce stage costs too much memory. We have 390 GB of RDF files, and our 256 GB machine crashed with an OOM in the reduce stage even while edge_count was still 0.

This issue has been fixed in the v20.11.0 release of Dgraph. Please try running your dataset on the latest release.
