Upsert in parallel creates duplicates

I am using an upsert operation, as described in the dgraph-js documentation, to get or create a node with a given external id (called did) and return its uid.

My frontend application makes two requests in parallel, both of which call the upsertProfile function with the same did. The problem is that each call creates a new uid, so I end up with a duplicate entry (two uids pointing to the same did).

If I call the same function again after the first two calls have had enough time to finish, no new uid is created.

I am not sure this is a bug, as I don’t know whether the upsert operation is expected to work across parallel operations, or how Dgraph handles conflicting parallel mutations.

Here is the function I use:

async upsertProfile(did: string): Promise<void> {
    await this.ready();

    const mu = new dgraph.Mutation();
    const req = new dgraph.Request();

    // Query block: bind the uid of the node that already has this did (if any) to `profile`.
    const query = `profile as var(func: eq(did, "${did}"))`;
    req.setQuery(`query{${query}}`);

    // Mutation block: uid(profile) is the matched node, or a new node if the query matched nothing.
    let nquads = `uid(profile) <did> "${did}" .`;
    nquads = nquads.concat(`\nuid(profile) <dgraph.type> "${PROFILE_SCHEMA_NAME}" .`);

    mu.setSetNquads(nquads);
    req.setMutationsList([mu]);
    req.setCommitNow(true);

    const result = await this.client.newTxn().doRequest(req);
    console.log('[DGRAPH] upsertProfile', {query}, {nquads}, result.getUidsMap().toArray());
}

Here is the log I got:

Run in parallel:

[DGRAPH] upsertProfile { query: 'profile as var(func: eq(did, "anonymous:02"))' } {
  nquads: 'uid(profile) <did> "anonymous:02" .\nuid(profile) <dgraph.type> "Profile" .'
} [ [ 'uid(profile)', '0x1adc1' ] ]
[DGRAPH] upsertProfile { query: 'profile as var(func: eq(did, "anonymous:02"))' } {
  nquads: 'uid(profile) <did> "anonymous:02" .\nuid(profile) <dgraph.type> "Profile" .'
} [ [ 'uid(profile)', '0x1adc2' ] ]

Run afterwards:

[DGRAPH] upsertProfile { query: 'profile as var(func: eq(did, "anonymous:02"))' } {
  nquads: 'uid(profile) <did> "anonymous:02" .\nuid(profile) <dgraph.type> "Profile" .'
} []

Did you use @upsert in the schema for the predicate? https://docs.dgraph.io/query-language/#upsert-directive
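
For reference, here is a minimal sketch of what adding the directive could look like from dgraph-js. The index type (exact) is an assumption on my side; any index that supports eq should do, and @upsert needs the predicate to be indexed. PROFILE_SCHEMA and applyProfileSchema are hypothetical names, and the helper is typed structurally so it does not depend on how you import dgraph-js.

```typescript
// Schema fragment: @upsert makes concurrent transactions that read and write
// the same indexed value conflict at commit time.
// Assumption: `exact` index; pick whichever index your eq() queries need.
const PROFILE_SCHEMA = `did: string @index(exact) @upsert .`;

// Hypothetical helper. `client` is expected to behave like a DgraphClient and
// `newOperation` like `() => new dgraph.Operation()`.
async function applyProfileSchema<Op extends { setSchema(schema: string): unknown }>(
  client: { alter(op: Op): Promise<unknown> },
  newOperation: () => Op,
): Promise<void> {
  const op = newOperation();
  op.setSchema(PROFILE_SCHEMA);
  await client.alter(op);
}
```

With dgraph-js this would be called as something like applyProfileSchema(this.client, () => new dgraph.Operation()), once, before any upserts run.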

Nope! Thanks for pointing it out! Maybe adding a reference to it in the Upsert Block section of the documentation (outside of the example) would help others.

I have just added it, and now the second upsert throws an error:

Error: Transaction has been aborted. Please retry

At least it no longer creates the duplicate. However, because the upsert throws, some other transactions that I run afterward are never executed.

Any ideas on how to handle this? I am thinking either

  • add a time delay and retry this.client.newTxn().doRequest(req) if I get that error (maybe do this for all requests now that I am at it)
  • put the upsertProfile function inside a try catch block and ignore that error.

I think this shouldn’t happen often: why would the user upsert the same profile in parallel? I don’t know your use case, but I would handle such cases by retrying once and returning an error to the user if the retry also fails.
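
A rough sketch of that retry, assuming the aborted transaction can be recognised by the message shown above. withRetry, isAbortedError and sleep are hypothetical helpers, not part of dgraph-js, and the default delay is arbitrary. I believe dgraph-js also exports an ERR_ABORTED constant you could compare against instead of matching on the message, but check that against the version you use.

```typescript
// Hypothetical helper: detects the abort by its message text.
function isAbortedError(err: unknown): boolean {
  const message = err instanceof Error ? err.message : String(err);
  return message.indexOf("Transaction has been aborted") !== -1;
}

function sleep(ms: number): Promise<void> {
  return new Promise<void>((resolve) => setTimeout(resolve, ms));
}

// Runs `run`, retrying up to `maxRetries` times when the transaction was
// aborted, with a delay that doubles on each retry. Any other error, or an
// abort after the last retry, is rethrown to the caller.
async function withRetry<T>(
  run: () => Promise<T>,
  maxRetries = 1,
  baseDelayMs = 50,
): Promise<T> {
  for (let attempt = 0; ; attempt++) {
    try {
      return await run();
    } catch (err) {
      if (!isAbortedError(err) || attempt >= maxRetries) {
        throw err;
      }
      await sleep(baseDelayMs * Math.pow(2, attempt));
    }
  }
}
```

In upsertProfile the last call would become something like const result = await withRetry(() => this.client.newTxn().doRequest(req)). The callback should create a new transaction on every attempt, since an aborted one cannot be reused. On the retry the query block finds the node written by the other request, so no second uid is created.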
