[21.03.2] Inconsistent edge/reverse edge generation with large mutations

Hi @matthewmcneely ,
thanks again for your time today!
As discussed, this is the schema/queries we used to replicate the issue:

type BookCollection {
	nid
	collection.books
}

type Book {
	nid
	book_name
}

nid: string @index(hash) @upsert .
collection.books: [uid] @count @reverse .
book_name: string .

Read query:

{
  coll(func:eq(nid, "BookColl_13_a")) {
    xid
    count(collection.books)
  }
  books(func:type(Book)) @cascade{
    count(uid)
    ~collection.books @filter(eq(nid, "BookColl_13_a"))
  }  
  // these 2 queries should always return the same count
}

Upsert query:

upsert {
  query{
    target as var(func:eq(nid, "BookColl_13_a")) {
      uid
    }
    books as var(func:type(Book), first: 1000, offset: 0) {
      uid
    }
  }

  mutation {
    delete {
      uid(target) <collection.books> * .
    } // delete first all old edges in the collection
  }
  mutation {
    set {
      uid(target) <nid> "BookColl_13_a" .
      uid(target) <dgraph.type> "BookCollection" .
      uid(target) <collection.books> uid(books) .
    } // update the collection with new book edges
  }  
}

To add the books to the graph I used a simple golang script that adds books in batches like this:

func addBooks(dgraph *dgo.Dgraph) error {
	for i := 0; i < 25; i++ {
		txn := dgraph.NewTxn()
		batchSize := 1000
		_, err := txn.Do(context.Background(), setupBooksRequest(batchSize, i*batchSize))
		if err != nil {
			txn.Discard(context.Background())
			fmt.Println(err)
			return err
		}

		err = txn.Commit(context.Background())
		if err != nil {
			return err
		}
	}
	return nil
}

func setupBooksRequest(limit int, offset int) *api.Request {
	fmt.Println("Limit: " + strconv.Itoa(limit) + ", offset: " + strconv.Itoa(offset))
	var setMutation strings.Builder
	for i := 0; i < limit; i++ {
		fmt.Fprintf(&setMutation, "_:book_%d <nid> %q .\n", i, strconv.Itoa(i+offset))
		fmt.Fprintf(&setMutation, "_:book_%d <book_name> %q .\n", i, uuid.NewString())
		fmt.Fprintf(&setMutation, "_:book_%d <dgraph.type> %q .\n", i, "Book")
	}
	mutations := []*api.Mutation{}
	mutations = append(mutations, &api.Mutation{SetNquads: []byte(setMutation.String())})
	request := api.Request{
		Mutations: mutations,
	}
	return &request
}

The issue seems to happen from the second upsert on a collection. When a new collection is created, the count matches correctly, when it’s updated again the counts start to diverge.
Since the delete is not really executed the first time a collection is created, the issue might have to do with the delete mutation interfering with the set mutation.