Out-of-memory problem on branch release/v1.2.2

I use dgraph bulk (compiled from release v1.2.2) to produce Dgraph data, but it always runs out of memory.

Reading the source, I found the problem in https://github.com/dgraph-io/dgraph/blob/release/v1.2.2/dgraph/cmd/bulk/reduce.go, in the function func (r *reducer) toList:

func (r *reducer) toList(mapEntries []*pb.MapEntry, list *bpb.KVList) int {
	var currentKey []byte
	var uids []uint64
	pl := new(pb.PostingList)
	var size int

	appendToList := func() {
		atomic.AddInt64(&r.prog.reduceKeyCount, 1)

		// For a UID-only posting list, the badger value is a delta packed UID
		// list. The UserMeta indicates to treat the value as a delta packed
		// list when the value is read by dgraph.  For a value posting list,
		// the full pb.Posting type is used (which internally contains the
		// delta packed UID list).
		if len(uids) == 0 {
			return
		}

		// If the schema is of type uid and not a list but we have more than one uid in this
		// list, we cannot enforce the constraint without losing data. Inform the user and
		// force the schema to be a list so that all the data can be found when Dgraph is started.
		// The user should fix their data once Dgraph is up.
		parsedKey, err := x.Parse(currentKey)
		x.Check(err)
		if parsedKey.IsData() {
			schema := r.state.schema.getSchema(parsedKey.Attr)
			if schema.GetValueType() == pb.Posting_UID && !schema.GetList() && len(uids) > 1 {
				fmt.Printf("Schema for pred %s specifies that this is not a list but more than  "+
					"one UID has been found. Forcing the schema to be a list to avoid any "+
					"data loss. Please fix the data to your specifications once Dgraph is up.\n",
					parsedKey.Attr)
				r.state.schema.setSchemaAsList(parsedKey.Attr)
			}
		}

		pl.Pack = codec.Encode(uids, 256)
		shouldSplit := pl.Size() > (1<<20)/2 && len(pl.Pack.Blocks) > 1
		if shouldSplit {
			l := posting.NewList(y.Copy(currentKey), pl, r.state.writeTs)
			kvs, err := l.Rollup()
			x.Check(err)
			list.Kv = append(list.Kv, kvs...)
		} else {
			val, err := pl.Marshal()
			x.Check(err)
			kv := &bpb.KV{
				Key:      y.Copy(currentKey),
				Value:    val,
				UserMeta: []byte{posting.BitCompletePosting},
				Version:  r.state.writeTs,
			}
			list.Kv = append(list.Kv, kv)
		}

		uids = uids[:0]
		pl.Reset()
	}

	for _, mapEntry := range mapEntries {
		atomic.AddInt64(&r.prog.reduceEdgeCount, 1)

		if !bytes.Equal(mapEntry.Key, currentKey) && currentKey != nil {
			appendToList()
		}
		currentKey = mapEntry.Key

		uid := mapEntry.Uid
		if mapEntry.Posting != nil {
			uid = mapEntry.Posting.Uid
		}
		if len(uids) > 0 && uids[len(uids)-1] == uid {
			continue
		}
		uids = append(uids, uid)
		if mapEntry.Posting != nil {
			pl.Postings = append(pl.Postings, mapEntry.Posting)
		}
	}
	appendToList()
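	// NOTE: size is never incremented anywhere above, so this always returns 0.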
	return size
}

The return value size is never updated anywhere in the function, so toList always returns zero. As a result, in encodeAndWrite, listSize stays at zero:

func (r *reducer) encodeAndWrite(
	writer *badger.StreamWriter, entryCh chan []*pb.MapEntry, closer *y.Closer) {
	defer closer.Done()
	var listSize int
	……
	for batch := range entryCh {
		listSize += r.toList(batch, list)
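		// toList always returns 0 on this branch, so listSize never grows.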
		if listSize > 4<<20 {
			for _, kv := range list.Kv {
				setStreamId(kv)
				if prevSID != 0 && (prevSID != kv.StreamId) {
					doneStreams = append(doneStreams, prevSID)
				}
				prevSID = kv.StreamId
			}
			addDone(doneStreams, list)
			x.Check(writer.Write(list))
			doneStreams = doneStreams[:0]
			list = &bpb.KVList{}
			listSize = 0
		}
	}
	……
}
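
To see the consequence in isolation, here is a minimal standalone model (my own simplification, not dgraph code) of the loop above:

package main

import "fmt"

// toListStub models r.toList on release/v1.2.2: it buffers KVs on every
// call but always returns 0, because size is never incremented.
func toListStub(kvsHeld *int) int {
	*kvsHeld += 100 // list.Kv grows with every batch
	return 0
}

func main() {
	var listSize, kvsHeld int
	for batch := 0; batch < 10000; batch++ {
		listSize += toListStub(&kvsHeld)
		if listSize > 4<<20 { // 0 > 4194304 never holds
			kvsHeld = 0 // the flush-and-reset never happens
			listSize = 0
		}
	}
	fmt.Printf("listSize=%d, KVs still buffered: %d\n", listSize, kvsHeld)
}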

In short: the condition listSize > 4<<20 is always false, list is never written to the badger.StreamWriter and reset inside the loop, and the accumulated KVs stay in memory until the bulk loader runs out of memory.
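
One possible fix (my own sketch, not a confirmed upstream patch) is to have appendToList accumulate the encoded size of every KV it appends, so that toList returns the number of bytes actually buffered. Assuming bpb.KV is a gogo protobuf message whose Size() method returns its encoded length, the tail of appendToList would become:

if shouldSplit {
	l := posting.NewList(y.Copy(currentKey), pl, r.state.writeTs)
	kvs, err := l.Rollup()
	x.Check(err)
	for _, kv := range kvs {
		size += kv.Size() // account for each rolled-up KV
	}
	list.Kv = append(list.Kv, kvs...)
} else {
	val, err := pl.Marshal()
	x.Check(err)
	kv := &bpb.KV{
		Key:      y.Copy(currentKey),
		Value:    val,
		UserMeta: []byte{posting.BitCompletePosting},
		Version:  r.state.writeTs,
	}
	size += kv.Size() // account for the single KV
	list.Kv = append(list.Kv, kv)
}

With a change like this, listSize in encodeAndWrite would periodically cross the 4<<20 threshold, the KVList would be flushed to the StreamWriter, and memory usage would stay bounded.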