Support Batch Upserts in Live Loader

Moved from GitHub dgraph/4829

Posted by larvinloy:

What you wanted to do

Given an existing cluster, we want to be able to insert relatively big data-sets (multiple files) that might contain nodes that already exist in the graph.

What you actually did and why it wasn’t great

To do this we decided to use upsert blocks, but unfortunately it appears that you cannot pass an array of upsert blocks to the live loader the way you can with `set`.

Any external references to support your case

The only other option seems to be to adopt the approach described here, but the number of connections we need to make between the nodes is huge, and having to make each connection through an individual call to the mutate endpoint is not feasible.

MichelDiz commented:

@larvinloy you can use the last approach I shared on that topic: use literal uids instead of blank nodes.
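Concretely, a sketch of what that looks like in N-Quads (the uids `0x186a0` and `0x186a1` here are made-up values): instead of blank nodes such as `_:alice`, each triple references a fixed uid directly:

```
<0x186a0> <name> "Alice" .
<0x186a0> <knows> <0x186a1> .
<0x186a1> <name> "Bob" .
```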

As for your request: it would be a pretty big change to the Live Loader. I'm not sure you'll see this done as fast as you need it.

The Live Loader and Bulk Loader are built tightly around RDF/JSON. It would be hard to introduce a new player (a query) into this context, because there is no way to embed query logic in RDF or JSON themselves. I can't see an easy way to make this fit cleanly with the RDF/JSON formats.

We would need logic that reads your RDF and splits it into small pieces to run in transaction batches. But it is hard to predict in code what users want; things can get out of control fast.

That said, nothing is impossible given enough work to change the whole thing.

For sure, your load would no longer be pure RDF. You would have to convert all of your RDF to the upsert block format, which means manual work on your side, or a script that converts your RDF to upsert blocks.

hackintoshrao commented:

Hey @larvinloy ,

Could you give us more information, like the size of the dataset and the nature of the application?
From your description it looks like we're looking at a feasibility problem. I'll check with the team on how to do it and how painful it would be, and see if we can ease the process.

larvinloy commented:

@hackintoshrao We’ve got 60 batches of ~150 million triples. Each batch usually corresponds to a month’s worth of data, and up to a fifth of the nodes could already exist in the graph.

@MichelDiz We’ve looked into using literal numbers, but that doesn’t work for us either. The idea was to somehow hash our IDs to uint64 values, but that would require us to lease out all the possible uids from Dgraph. I tried hitting the lease endpoint with a really large number, but it still leases out only a small number, somewhere around 300k in my case.

larvinloy commented:

The same data on Neptune is sitting at ~12 Terabytes

MichelDiz commented:

Wow, that’s huge! Do you have more than 2⁶⁴ entities?
Even if each entity had its own UID, that’s about 12 billion UIDs to lease, which is less than 0.00001% of the available uid space. (I could be off by a zero either way.)

> I tried hitting the lease endpoint with a really large number, but it still leases out only a small number

You can hit that endpoint as many times as you want, with big numbers, and the lease will keep growing.

BTW, you should use the Bulk Loader instead.

Cheers.

UPDATE:

Since you can hash the IDs from your dataset, you can create unique blank nodes instead. Just add `_:` in front of your hash and you’re done; no need to lease UIDs manually.
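For example (the hash values here are made up), the converted triples would look like:

```
_:6a7f9c1d22e450b8 <name> "Alice" .
_:6a7f9c1d22e450b8 <knows> _:0b3de1f4a9c87765 .
```

The loader then assigns uids itself, and identical blank nodes within one load map to the same node.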

larvinloy commented:

@MichelDiz We can’t use the bulk loader because it’s a live cluster. We can’t use the live loader (with set operations) because we can’t afford duplicate nodes in a dataset of that size. Also, having to store an xid mapping somewhere else adds unnecessary additional complexity.

The idea of leasing all the uids was that we’d hash our IDs to some uid in the entire uid space (2⁶⁴). That would only work if we had access to all 2⁶⁴ uids at once.

This is why upsert support in the live loader would be so useful to us.

arijitAD commented:

You can run queries like this if you want to perform upsert operations in batches.

```
upsert {
  query {
    q1(func: eq(username, "Arijit")) {
      u1 as uid
    }
    q2(func: eq(username, "Arijit1")) {
      u2 as uid
    }
    q3(func: eq(username, "Arijit2")) {
      u3 as uid
    }
    q4(func: eq(username, "Arijit3")) {
      u4 as uid
    }
  }

  mutation {
    set {
      uid(u1) <email> "arijit@dgraph.io" .
      uid(u1) <knows> uid(u2) .
      uid(u2) <email> "arijit1@dgraph.io" .
      uid(u2) <knows> uid(u3) .
      uid(u3) <email> "arijit2@dgraph.io" .
      uid(u3) <knows> uid(u4) .
      uid(u4) <email> "arijit3@dgraph.io" .
      uid(u4) <knows> uid(u1) .
    }
  }
}
```

You can use a Dgraph client to perform this operation.

You can also batch transactions by starting a new transaction, sending upsert requests and then calling commit.

```go
txn := dgraphClient.NewTxn()
defer txn.Discard(ctx)

for ... { // Perform multiple upsert queries in the same transaction.
	query := `
		query {
			user as var(func: eq(email, "wrong_email@dgraph.io"))
		}`
	mu := &api.Mutation{
		// Update the email only if a matching uid is found.
		SetNquads: []byte(`uid(user) <email> "correct_email@dgraph.io" .`),
	}
	req := &api.Request{
		Query:     query,
		Mutations: []*api.Mutation{mu},
		CommitNow: false, // keep the transaction open across requests
	}
	if _, err := txn.Do(ctx, req); err != nil {
		log.Fatal(err)
	}
}

// Commit all the batched upserts at once.
if err := txn.Commit(ctx); err == dgo.ErrAborted {
	// Retry or handle the error.
}
```