I use dgraph bulk (compiled from release v1.2.2) to load data, but it always runs out of memory.
Reading the source, I found the following in https://github.com/dgraph-io/dgraph/blob/release/v1.2.2/dgraph/cmd/bulk/reduce.go, in the function func (r *reducer) toList:
func (r *reducer) toList(mapEntries []*pb.MapEntry, list *bpb.KVList) int {
    var currentKey []byte
    var uids []uint64
    pl := new(pb.PostingList)
    var size int
    appendToList := func() {
        atomic.AddInt64(&r.prog.reduceKeyCount, 1)
        // For a UID-only posting list, the badger value is a delta packed UID
        // list. The UserMeta indicates to treat the value as a delta packed
        // list when the value is read by dgraph. For a value posting list,
        // the full pb.Posting type is used (which pb.y contains the
        // delta packed UID list).
        if len(uids) == 0 {
            return
        }
        // If the schema is of type uid and not a list but we have more than one uid in this
        // list, we cannot enforce the constraint without losing data. Inform the user and
        // force the schema to be a list so that all the data can be found when Dgraph is started.
        // The user should fix their data once Dgraph is up.
        parsedKey, err := x.Parse(currentKey)
        x.Check(err)
        if parsedKey.IsData() {
            schema := r.state.schema.getSchema(parsedKey.Attr)
            if schema.GetValueType() == pb.Posting_UID && !schema.GetList() && len(uids) > 1 {
                fmt.Printf("Schema for pred %s specifies that this is not a list but more than "+
                    "one UID has been found. Forcing the schema to be a list to avoid any "+
                    "data loss. Please fix the data to your specifications once Dgraph is up.\n",
                    parsedKey.Attr)
                r.state.schema.setSchemaAsList(parsedKey.Attr)
            }
        }
        pl.Pack = codec.Encode(uids, 256)
        shouldSplit := pl.Size() > (1<<20)/2 && len(pl.Pack.Blocks) > 1
        if shouldSplit {
            l := posting.NewList(y.Copy(currentKey), pl, r.state.writeTs)
            kvs, err := l.Rollup()
            x.Check(err)
            list.Kv = append(list.Kv, kvs...)
        } else {
            val, err := pl.Marshal()
            x.Check(err)
            kv := &bpb.KV{
                Key:      y.Copy(currentKey),
                Value:    val,
                UserMeta: []byte{posting.BitCompletePosting},
                Version:  r.state.writeTs,
            }
            list.Kv = append(list.Kv, kv)
        }
        uids = uids[:0]
        pl.Reset()
    }
    for _, mapEntry := range mapEntries {
        atomic.AddInt64(&r.prog.reduceEdgeCount, 1)
        if !bytes.Equal(mapEntry.Key, currentKey) && currentKey != nil {
            appendToList()
        }
        currentKey = mapEntry.Key
        uid := mapEntry.Uid
        if mapEntry.Posting != nil {
            uid = mapEntry.Posting.Uid
        }
        if len(uids) > 0 && uids[len(uids)-1] == uid {
            continue
        }
        uids = append(uids, uid)
        if mapEntry.Posting != nil {
            pl.Postings = append(pl.Postings, mapEntry.Posting)
        }
    }
    appendToList()
    return size
}
The local variable size is never updated, so toList always returns zero. As a result, in encodeAndWrite the variable listSize always stays zero:
func (r *reducer) encodeAndWrite(
    writer *badger.StreamWriter, entryCh chan []*pb.MapEntry, closer *y.Closer) {
    defer closer.Done()
    var listSize int
    ……
    for batch := range entryCh {
        listSize += r.toList(batch, list)
        if listSize > 4<<20 {
            for _, kv := range list.Kv {
                setStreamId(kv)
                if prevSID != 0 && (prevSID != kv.StreamId) {
                    doneStreams = append(doneStreams, prevSID)
                }
                prevSID = kv.StreamId
            }
            addDone(doneStreams, list)
            x.Check(writer.Write(list))
            doneStreams = doneStreams[:0]
            list = &bpb.KVList{}
            listSize = 0
        }
    }
    ……
}
The condition listSize > 4<<20 is therefore always false, so list is never flushed to the StreamWriter inside the loop and just keeps accumulating KVs, which is presumably what causes the out-of-memory.
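For reference, a minimal sketch of the missing accounting, assuming the intent is for toList to report roughly how many bytes it appended to list. The names are taken from the snippet above, and kv.Size() is assumed to be the protobuf-generated size method on bpb.KV; this is only an illustration, not the upstream fix:

    // Sketch: inside appendToList, accumulate the serialized size of every KV
    // appended to list, so that `return size` reports real bytes and the
    // caller's listSize > 4<<20 flush check can actually trigger.
    if shouldSplit {
        l := posting.NewList(y.Copy(currentKey), pl, r.state.writeTs)
        kvs, err := l.Rollup()
        x.Check(err)
        for _, kv := range kvs {
            size += kv.Size() // assumed protobuf-generated Size() on bpb.KV
        }
        list.Kv = append(list.Kv, kvs...)
    } else {
        val, err := pl.Marshal()
        x.Check(err)
        kv := &bpb.KV{
            Key:      y.Copy(currentKey),
            Value:    val,
            UserMeta: []byte{posting.BitCompletePosting},
            Version:  r.state.writeTs,
        }
        size += kv.Size() // same accounting for the non-split case
        list.Kv = append(list.Kv, kv)
    }

With size accumulated like this, encodeAndWrite would flush list roughly every 4 MB instead of holding the whole reducer output in memory.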