Concurrent upserts creating aborted transactions

java-client
mutation

(Michel Conrado) #21

It can’t be, otherwise it would happen to me too. However, I am using it locally. Via binaries. I am not using docker to test it. In this case, it’s something related to Docker if so. Your config, that’s why I’ve asked about the port.

Do a check list to make sure.


(Devin Bost) #22

@mrjn You mentioned:

“In general, all folks here, if you have slow queries, please post them in Discuss, and tag me or @dmai. We’re looking for ways to optimize Dgraph”

(Query Performance)

There are two issues that I’ve mentioned in this thread, but the root issue is related to a concurrency problem.

We have data structured like this in Dgraph:

In the diagram, blue nodes are existing nodes, and red nodes/edges represent nodes that can be created or edited via an upsert.

My concern is that it appears that the entire root node is getting locked from all of the upsert cases (as per the diagram). This behavior would be equivalent to locking an entire table when only specific rows need to be edited or added, like in the diagram below:

Can someone please confirm if this is not the case? If it’s the case, then it’s a serious architectural/design flaw in Dgraph that needs to be urgently corrected.


(Manish R Jain) #23

Dgraph does not do locking for transactions, instead it executes the mutations and during commit checks if there was a conflict or not. These commit checks involve checking for conflicts. So, we do create conflict keys. If all the nodes you’re operating on are connected to one root node in your dataset, it’s possible that multiple concurrently executing transactions can conflict with each other.

Adding @upsert directive would allow for broader conflict keys, because it is designed to include indices in conflict detection as well.

No, that should not be the case. Can you share your schema?


(Devin Bost) #24

Thanks for the response and detailed explanation.
Thank you also for correcting my statement about “locks.” It does appear that we’re both referring to transaction conflicts and conflict keys.
The query was included in the first post of this thread: Concurrent upserts creating aborted transactions
and the schema is here: Upsert with multiple UIDs

I’ll quote it here for your convenience.

type Products { 
    products: [Product] 
} 
type Product { 
    productId: string 
    options: [Option] 
} 
type Option { 
    optionId: string 
    color: string 
}
<collectionId>: int @index(int) .
<color>: string .
<optionId>: int @index(int) .
<options>: [uid] .
<productId>: int @index(int) .
<products>: [uid] .

I’ll also quote the upsert mutation here for your convenience:

upsert {
  query {
  getVals(func: has(products)) {
    productsUid as uid
  	products @filter(eq(productId, 19610626)) {
      productUid as uid
      options @filter(eq(optionId, 32661491)) {
        optionUid as uid
      }
    }
  }
}

  mutation {
    set {
      uid(productsUid) <products> uid(productUid) .
      uid(productsUid) <dgraph.type> "Products" .
      uid(productUid) <productId> "19610626" .
      uid(productUid) <options> uid(optionUid) .
      uid(productUid) <dgraph.type> "Product" .
      uid(optionUid) <color> "blue" .
      uid(optionUid) <dgraph.type> "Option" .
      uid(optionUid) <optionId> "32661491" .
    }
  }
}

Initially, we excluded the @upsert directive on all fields in the schema. After we included the @upsert directive on our predicates, however, we actually noticed a slight decrease in the ratio of transactions that were throwing the TxnConflictException (to approximately 60% of all transactions, a noticeable decrease from approximately 80% of all transactions).

Most of the incoming messages for the upsert involve different ProductId values, so it’s very surprising that we would see so many transaction conflicts.

If Dgraph will meet our performance requirements, we are planning on running Dgraph at-scale. Our initial production requirements require us to process approximately 1,700 transactions per second with near-time latency (preferably under 300 ms), and we plan to process significantly more transactions per second (one to two orders of magnitude more) if Dgraph passes our phase 1 tests. So, meeting our scaling requirements will be critical for our use-cases.

You mentioned:

If all the nodes you’re operating on are connected to one root node in your dataset, it’s possible that multiple concurrently executing transactions can conflict with each other.

How might that happen?


(Devin Bost) #25

@mrjn I’m happy to dig into the Dgraph source code as well, but it would be helpful to get some pointers regarding where I should look.


(Devin Bost) #26

@mrjn @dmai
I modified the upsert to omit the products node like this:

upsert {
  query {
getVals(func: eq(productId, 19610626)) {
  productUid  as uid
    options @filter(eq(optionId, 32661491)) {
    optionUid as uid
    }
  }
}

  mutation {
    set {
      uid(productUid) <productId> "19610626" .
      uid(productUid) <options> uid(optionUid) .
      uid(productUid) <dgraph.type> "Product" .
      uid(optionUid) <color> "blue" .
      uid(optionUid) <dgraph.type> "Option" .
      uid(optionUid) <optionId> "32661491" .
    }
  }
}

(where the integer and string values are provided by incoming messages.)

That dropped my transaction error rate down to 20.4% and dropped the average latency from ~300 ms to 41 ms.
I then deleted the Products node from the Dgraph schema.
After I waited a while, I then started my function again, and I noticed that the transaction error rate (java.util.concurrent.CompletionException: io.dgraph.TxnConflictException: Transaction has been aborted. Please retry) went back up to 57.1%.

So, I’m at a loss for what could be going on here…


(Devin Bost) #27

Now we’re just getting:

java.lang.RuntimeException: java.util.concurrent.CompletionException: java.lang.RuntimeException: The doRequest encountered an execution exception:
at io.dgraph.AsyncTransaction.lambda$doRequest$2(AsyncTransaction.java:173) ~[functions.jar:?]
at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822) ~[?:1.8.0_212]
at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797) ~[?:1.8.0_212]
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) ~[?:1.8.0_212]
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1595) ~[?:1.8.0_212]
at java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1582) ~[?:1.8.0_212]
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) ~[?:1.8.0_212]
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) ~[?:1.8.0_212]
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) ~[?:1.8.0_212]
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157) ~[?:1.8.0_212]
Caused by: java.util.concurrent.CompletionException: java.lang.RuntimeException: The doRequest encountered an execution exception:
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273) ~[?:1.8.0_212]
at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280) ~[?:1.8.0_212]
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1592) ~[?:1.8.0_212]
… 5 more
Caused by: java.lang.RuntimeException: The doRequest encountered an execution exception:
at io.dgraph.DgraphAsyncClient.lambda$runWithRetries$2(DgraphAsyncClient.java:212) ~[functions.jar:?]
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590) ~[?:1.8.0_212]
… 5 more
Caused by: java.util.concurrent.ExecutionException: io.grpc.StatusRuntimeException: DEADLINE_EXCEEDED: deadline exceeded after 9999658649ns
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) ~[?:1.8.0_212]
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) ~[?:1.8.0_212]
at io.dgraph.DgraphAsyncClient.lambda$runWithRetries$2(DgraphAsyncClient.java:180) ~[functions.jar:?]
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590) ~[?:1.8.0_212]
… 5 more
Caused by: io.grpc.StatusRuntimeException: DEADLINE_EXCEEDED: deadline exceeded after 9999658649ns
at io.grpc.Status.asRuntimeException(Status.java:533) ~[functions.jar:?]
at io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onClose(ClientCalls.java:442) ~[functions.jar:?]
at io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39) ~[functions.jar:?]
at io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23) ~[functions.jar:?]
at io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40) ~[functions.jar:?]
at io.grpc.internal.CensusStatsModule$StatsClientInterceptor$1$1.onClose(CensusStatsModule.java:700) ~[functions.jar:?]
at io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39) ~[functions.jar:?]
at io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23) ~[functions.jar:?]
at io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40) ~[functions.jar:?]
at io.grpc.internal.CensusTracingModule$TracingClientInterceptor$1$1.onClose(CensusTracingModule.java:399) ~[functions.jar:?]
at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:507) ~[functions.jar:?]
at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:66) ~[functions.jar:?]
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.close(ClientCallImpl.java:627) ~[functions.jar:?]
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.access$700(ClientCallImpl.java:515) ~[functions.jar:?]
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:686) ~[functions.jar:?]
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:675) ~[functions.jar:?]
at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37) ~[functions.jar:?]
at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123) ~[functions.jar:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_212]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_212]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_212]