On both aforementioned machines, I tried running v1.2.2 and v20.03.0 with the same result: after successfully completing the MAP phase, the REDUCE phase failed at an edge count of ~98.5M (this edge count was consistent across the failures on both versions and both machines). Over a short period of time, the bulk loader would ramp its memory usage up to the entirety of the machine's RAM, then freeze and crash.
When I downgraded the Dgraph version to v1.1.1, the MAP and REDUCE phases completed successfully, and I was able to run a new Dgraph cluster on top of the resultant p directory.
Hey @dzygmundfelt, thanks for reporting this issue. How much data were you trying to insert (data size and RDF count)? Also, it would be very helpful if you could share a memory profile with us when usage is at its peak.
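For reference, a heap profile can be grabbed from any process that serves net/http/pprof and then inspected with go tool pprof. A minimal sketch (the address below is only an assumption; point it at whatever address the loader serves its pprof endpoint on):

package main

import (
    "io"
    "log"
    "net/http"
    "os"
)

// Fetch the in-use heap profile from a running process that exposes
// net/http/pprof and save it to disk, so it can later be examined with
// `go tool pprof <binary> heap.pb.gz`.
func main() {
    // Assumed address; replace with the pprof endpoint of the process
    // you are profiling.
    resp, err := http.Get("http://localhost:8080/debug/pprof/heap")
    if err != nil {
        log.Fatal(err)
    }
    defer resp.Body.Close()

    out, err := os.Create("heap.pb.gz")
    if err != nil {
        log.Fatal(err)
    }
    defer out.Close()

    if _, err := io.Copy(out, resp.Body); err != nil {
        log.Fatal(err)
    }
    log.Println("wrote heap.pb.gz")
}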
I’ve seen similar behavior running the bulk loader in v1.2.2 and v20.03.1. However, using v2.0.0-rc1 I’m able to run the reduce phase to completion without the process running out of memory. The memory stats below were captured from a bulk load whose reduce phase eventually crashed due to OOM.
// HEAP //
File: dgraph
Type: inuse_space
Entering interactive mode (type "help" for commands, "o" for options)
(pprof) top
Showing nodes accounting for 44487.54MB, 99.36% of 44774.33MB total
Dropped 234 nodes (cum <= 223.87MB)
Showing top 10 nodes out of 60
flat flat% sum% cum cum%
20127.60MB 44.95% 44.95% 20127.60MB 44.95% github.com/dgraph-io/ristretto/z.(*Bloom).Size
15454.99MB 34.52% 79.47% 23157.80MB 51.72% github.com/dgraph-io/badger/v2/pb.(*TableIndex).Unmarshal
7702.81MB 17.20% 96.67% 7702.81MB 17.20% github.com/dgraph-io/badger/v2/pb.(*BlockOffset).Unmarshal
924.56MB 2.06% 98.74% 924.56MB 2.06% github.com/DataDog/zstd.Decompress
259.16MB 0.58% 99.32% 259.16MB 0.58% github.com/dgraph-io/ristretto.newCmRow
15MB 0.034% 99.35% 940.57MB 2.10% github.com/dgraph-io/badger/v2/table.(*Table).block
1.78MB 0.004% 99.36% 43158.64MB 96.39% github.com/dgraph-io/badger/v2/table.OpenTable
0.64MB 0.0014% 99.36% 389.84MB 0.87% github.com/dgraph-io/ristretto.NewCache
0.50MB 0.0011% 99.36% 20128.10MB 44.95% github.com/dgraph-io/ristretto/z.NewBloomFilter
0.50MB 0.0011% 99.36% 633.83MB 1.42% github.com/dgraph-io/dgraph/worker.processTask
// ALLOCS //
File: dgraph
Type: alloc_space
Entering interactive mode (type "help" for commands, "o" for options)
(pprof) top
Showing nodes accounting for 1630.99GB, 76.04% of 2145.01GB total
Dropped 1124 nodes (cum <= 10.73GB)
Showing top 10 nodes out of 134
flat flat% sum% cum cum%
600.96GB 28.02% 28.02% 601.24GB 28.03% github.com/dgraph-io/badger/v2.(*safeRead).Entry
555.20GB 25.88% 53.90% 561.62GB 26.18% github.com/DataDog/zstd.Decompress
178.82GB 8.34% 62.24% 201.70GB 9.40% github.com/dgraph-io/dgraph/posting.(*List).Uids
53.67GB 2.50% 64.74% 53.67GB 2.50% github.com/dgraph-io/badger/v2.(*Iterator).newItem
49.83GB 2.32% 67.06% 49.83GB 2.32% github.com/dgraph-io/dgraph/query.(*fastJsonNode).New
48.06GB 2.24% 69.30% 55.58GB 2.59% github.com/dgraph-io/badger/v2/pb.(*TableIndex).Unmarshal
43.70GB 2.04% 71.34% 46.29GB 2.16% github.com/dgraph-io/dgraph/protos/pb.(*Group).Unmarshal
35.23GB 1.64% 72.98% 35.23GB 1.64% bytes.makeSlice
34.10GB 1.59% 74.57% 36.44GB 1.70% github.com/dgraph-io/dgraph/posting.(*pIterator).init
31.41GB 1.46% 76.04% 71.62GB 3.34% github.com/dgraph-io/dgraph/posting.ReadPostingList
Based on the information given (the issue is not in v2.0.0-rc1 but is present in v20.03.0), I took a log of the changes in the bulk loader between those two releases. Here they are:
commit 2ce4db1f046ea24ec3f74763a5f17ddfd355506e
Author: Daniel Mai <daniel@dgraph.io>
Date: Tue Mar 17 19:32:32 2020 -0700
tests: Fix flaky bulk loader systest. (#4953)
This fixes flaky tests with the following changes:
* Use the same "dgraph" Docker Compose project as the other tests.
* Wait for background indexing to finish (introduced in #4819).
test-bulk-schema.sh was creating its Dgraph cluster using its own
Docker Compose project, which doesn't work nicely with the other
custom cluster tests which all utilize the same "dgraph" project
name. Using the same name allows Docker Compose to cleanly
recreate or delete containers. This change makes
test-bulk-schema.sh use the same "dgraph" project.
Because Dgraph re-indexes in the background, the test must wait
for the update to finish to properly compare the schema before
and after. Otherwise, this error in the test can happen:
test-bulk-schema.sh: verifying schema is same before export and after bulk import
[00:23:41][./dgraph/cmd/bulk/systest/test-bulk-schema.sh] [Test Error Output]
test-bulk-schema.sh: schema incorrect
test-bulk-schema.sh: *** unexpected error ***
[00:23:41][./dgraph/cmd/bulk/systest/test-bulk-schema.sh] [Test Output]
7a8
> "index": true,
8a10,12
> "tokenizer": [
> "exact"
> ],
(cherry picked from commit 7d92d2e96ac9fa6a5247ad5ae3404efb6855ef9c)
commit 3c1ce0ef1b028c04ef35064f84dee5602463ecaa
Author: Martin Martinez Rivera <martinmr@users.noreply.github.com>
Date: Tue Mar 17 11:19:45 2020 -0700
Use a different stream writer id for split keys. (#4875)
commit 4486c803753b6c9e0dc94b218c1421e847bf7513
Author: Martin Martinez Rivera <martinmr@users.noreply.github.com>
Date: Fri Feb 28 13:13:48 2020 -0800
Remove size calculation from toList method in bulk loader. (#4869)
The total size of the key-value pairs is being calculated but it's not
being used anywhere.
commit 51d579a3175bf9f058bde3b4423a8cd03b4319d7
Author: Martin Martinez Rivera <martinmr@users.noreply.github.com>
Date: Tue Feb 25 16:56:08 2020 -0800
Fix spacing and comments in reduce.go (#4850)
No code changes.
commit f7d0371408bcb60d12d5b743736e33ac4d9e6765
Author: balaji <rbalajis25@gmail.com>
Date: Tue Feb 25 19:57:42 2020 +0530
Add partition key based iterator to the bulk loader (#4841)
commit 317e02e2094e4802b1068fd669c2dd3941c5d6aa
Author: Pawan Rawal <pawan0201@gmail.com>
Date: Tue Feb 25 18:09:36 2020 +0530
Change backup and export tests to use the new GraphQL admin API (#4845)
commit 6b7a339769b9fdb08782ed41598b43e3122f4802
Author: balaji <rbalajis25@gmail.com>
Date: Mon Feb 24 14:23:40 2020 +0530
Revert "add partition key based reducer (#4734)" (#4840)
This reverts commit fa400d60fd5bbf6a3e3bb120123c91197e7277af.
commit fa400d60fd5bbf6a3e3bb120123c91197e7277af
Author: balaji <rbalajis25@gmail.com>
Date: Mon Feb 24 12:59:19 2020 +0530
add partition key based reducer (#4734)
* add partition key based reducer
Signed-off-by: balaji <rbalajis25@gmail.com>
* wip
Signed-off-by: balaji <rbalajis25@gmail.com>
* wip
Signed-off-by: balaji <rbalajis25@gmail.com>
* go routine batcher
Signed-off-by: balaji <rbalajis25@gmail.com>
* wip
Signed-off-by: balaji <rbalajis25@gmail.com>
* Made the concurrent part of reducer work. No need for seq number. Ensures that we don't have too many pending requests.
* Lots of changes to make the reducer run nicely w.r.t. memory usage and speed.
* We should not Unmarshal MapEntry upfront. Instead, unmarshal only in encoder. We need a way to parse key easily for comparisons though.
* Some more ideas
* delay marshal
Signed-off-by: balaji <rbalajis25@gmail.com>
* remove fmt
Signed-off-by: balaji <rbalajis25@gmail.com>
* pool fix
Signed-off-by: balaji <rbalajis25@gmail.com>
* ]minor
Signed-off-by: balaji <rbalajis25@gmail.com>
* Manish's comments. Avoid allocating a lot of pb.MapEntries
* reduce allocation
Signed-off-by: balaji <rbalajis25@gmail.com>
* add global allocator
Signed-off-by: balaji <rbalajis25@gmail.com>
* remove arena
Signed-off-by: balaji <rbalajis25@gmail.com>
* remove all
Signed-off-by: balaji <rbalajis25@gmail.com>
* remove allocator
Signed-off-by: balaji <rbalajis25@gmail.com>
* add bulk
Signed-off-by: balaji <rbalajis25@gmail.com>
* add chagne
Signed-off-by: balaji <rbalajis25@gmail.com>
* fix sorting and writer
Signed-off-by: balaji <rbalajis25@gmail.com>
* fix my stupid mistake
Signed-off-by: balaji <rbalajis25@gmail.com>
* remove compression
Signed-off-by: balaji <rbalajis25@gmail.com>
* bring back
Signed-off-by: balaji <rbalajis25@gmail.com>
* wip
Signed-off-by: balaji <rbalajis25@gmail.com>
* wip
Signed-off-by: balaji <rbalajis25@gmail.com>
* wip
Signed-off-by: balaji <rbalajis25@gmail.com>
* Merge branch 'master' of https://github.com/dgraph-io/dgraph into balaji/new_reduce
Signed-off-by: balaji <rbalajis25@gmail.com>
commit 5ac56f0628ff36488110a0c530605f12c7e86803
Author: Pawan Rawal <pawan0201@gmail.com>
Date: Fri Feb 21 00:50:01 2020 +0530
Revert "Remove HTTP admin endpoints (#4754)" (#4822)
This reverts commit 8fa2fd943a362c588259a4c9b3e2216798a4e1c9.
Most of these commits are minor changes and I don’t expect them to change the memory usage too much. I think commit f7d0371408bcb60d12d5b743736e33ac4d9e6765 might be the cause. I’ll look at it to see if I can spot any changes that might have caused the memory to increase.
@balajijinnah can you look at the commit too? You probably have better context on this issue.
@danielmai I have read all the source code of the bulk loader, in both v1.1.1 and v20.03.1. I agree with @martinmr that commit f7d0371 caused the memory increase. I have tested v1.1.1, v1.2.1, v1.2.2, v20.03.1, and master: since v1.2.2, the bulk loader has consumed much more memory during the reduce stage and has taken more time to load the dataset.
There is also a bug in v1.2.2: the size variable declared as var size int is never updated, so toList always returns 0:
func (r *reducer) toList(mapEntries []*pb.MapEntry, list *bpb.KVList) int {
    var currentKey []byte
    var uids []uint64
    pl := new(pb.PostingList)
    var size int // Bug: size is never updated anywhere below.

    appendToList := func() {
        atomic.AddInt64(&r.prog.reduceKeyCount, 1)

        // For a UID-only posting list, the badger value is a delta packed UID
        // list. The UserMeta indicates to treat the value as a delta packed
        // list when the value is read by dgraph. For a value posting list,
        // the full pb.Posting type is used (which pb.y contains the
        // delta packed UID list).
        if len(uids) == 0 {
            return
        }

        // If the schema is of type uid and not a list but we have more than one uid in this
        // list, we cannot enforce the constraint without losing data. Inform the user and
        // force the schema to be a list so that all the data can be found when Dgraph is started.
        // The user should fix their data once Dgraph is up.
        parsedKey, err := x.Parse(currentKey)
        x.Check(err)
        if parsedKey.IsData() {
            schema := r.state.schema.getSchema(parsedKey.Attr)
            if schema.GetValueType() == pb.Posting_UID && !schema.GetList() && len(uids) > 1 {
                fmt.Printf("Schema for pred %s specifies that this is not a list but more than "+
                    "one UID has been found. Forcing the schema to be a list to avoid any "+
                    "data loss. Please fix the data to your specifications once Dgraph is up.\n",
                    parsedKey.Attr)
                r.state.schema.setSchemaAsList(parsedKey.Attr)
            }
        }

        pl.Pack = codec.Encode(uids, 256)
        shouldSplit := pl.Size() > (1<<20)/2 && len(pl.Pack.Blocks) > 1
        if shouldSplit {
            l := posting.NewList(y.Copy(currentKey), pl, r.state.writeTs)
            kvs, err := l.Rollup()
            x.Check(err)
            list.Kv = append(list.Kv, kvs...)
        } else {
            val, err := pl.Marshal()
            x.Check(err)
            kv := &bpb.KV{
                Key:      y.Copy(currentKey),
                Value:    val,
                UserMeta: []byte{posting.BitCompletePosting},
                Version:  r.state.writeTs,
            }
            list.Kv = append(list.Kv, kv)
        }

        uids = uids[:0]
        pl.Reset()
    }

    for _, mapEntry := range mapEntries {
        atomic.AddInt64(&r.prog.reduceEdgeCount, 1)

        if !bytes.Equal(mapEntry.Key, currentKey) && currentKey != nil {
            appendToList()
        }
        currentKey = mapEntry.Key

        uid := mapEntry.Uid
        if mapEntry.Posting != nil {
            uid = mapEntry.Posting.Uid
        }
        if len(uids) > 0 && uids[len(uids)-1] == uid {
            continue
        }
        uids = append(uids, uid)
        if mapEntry.Posting != nil {
            pl.Postings = append(pl.Postings, mapEntry.Posting)
        }
    }
    appendToList()
    return size // Always 0 because of the bug above.
}
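For context, a minimal, hypothetical sketch of what the size accounting was presumably meant to do (my reading, not the actual upstream code): accumulate the marshalled size of every KV appended to the list, so the caller can bound batch sizes. Inside toList this would be an incremental size += kv.Size() in appendToList; the standalone helper below just shows the idea.

package main

import (
    "fmt"

    bpb "github.com/dgraph-io/badger/v2/pb"
)

// sumKVSize adds up the marshalled size of every KV in the list. This is the
// kind of total the quoted toList appears to have intended to return, instead
// of a size that is always 0.
func sumKVSize(list *bpb.KVList) int {
    size := 0
    for _, kv := range list.Kv {
        size += kv.Size() // Size() is generated for the protobuf KV type.
    }
    return size
}

func main() {
    list := &bpb.KVList{
        Kv: []*bpb.KV{
            {Key: []byte("k1"), Value: []byte("v1")},
            {Key: []byte("k2"), Value: []byte("v2")},
        },
    }
    fmt.Println("approximate batch size in bytes:", sumKVSize(list))
}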
Hey @xiangzhao632, yep, that commit will increase the memory usage. The main reason we brought in that change was to bulk load large datasets; the heap-based method does not work well with big datasets.
Regarding performance, we’re tossing around new ideas to improve it (e.g., parallel sorting). I will update you once I land there.
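For anyone following along: generically, a heap-based k-way merge over pre-sorted streams keeps only one pending entry per stream in memory at a time, which is why that style of reducer has a bounded footprint. A minimal sketch of the pattern (illustrative names and types, not Dgraph's actual reducer code):

package main

import (
    "container/heap"
    "fmt"
)

// entry is a stand-in for a map entry: a sort key plus the index of the
// stream it came from.
type entry struct {
    key    string
    stream int
}

type entryHeap []entry

func (h entryHeap) Len() int            { return len(h) }
func (h entryHeap) Less(i, j int) bool  { return h[i].key < h[j].key }
func (h entryHeap) Swap(i, j int)       { h[i], h[j] = h[j], h[i] }
func (h *entryHeap) Push(x interface{}) { *h = append(*h, x.(entry)) }
func (h *entryHeap) Pop() interface{} {
    old := *h
    n := len(old)
    e := old[n-1]
    *h = old[:n-1]
    return e
}

// mergeSorted merges several already-sorted streams while holding only one
// pending entry per stream in the heap.
func mergeSorted(streams [][]string, emit func(key string)) {
    h := &entryHeap{}
    next := make([]int, len(streams)) // next unread position in each stream
    for i, s := range streams {
        if len(s) > 0 {
            heap.Push(h, entry{key: s[0], stream: i})
            next[i] = 1
        }
    }
    for h.Len() > 0 {
        e := heap.Pop(h).(entry)
        emit(e.key)
        if pos := next[e.stream]; pos < len(streams[e.stream]) {
            heap.Push(h, entry{key: streams[e.stream][pos], stream: e.stream})
            next[e.stream]++
        }
    }
}

func main() {
    streams := [][]string{
        {"a", "d", "f"},
        {"b", "c", "g"},
        {"e"},
    }
    mergeSorted(streams, func(k string) { fmt.Println(k) })
}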
Thank you @balajijinnah, I want to share some points:
I think memory usage is the bottleneck rather than loading speed. The largest memory on my machines is 192GB (many other teams have 128GB machines). That much memory can only cope with about 3 billion RDF triples, while my team has more than 50 billion. I know of teams that want to try Dgraph, but due to the OOM issue they can’t initialize a cluster with the bulk loader, so they turned to the live loader.
Less memory usage in the reduce stage also means that --reducer can be set to a larger value, which would increase the loading speed.
What do you think?
The reduce stage costs too much memory. We have 390GB of RDF files, and our machine with 256GB of memory crashed with OOM during the reduce stage even while edge_count was still 0.