Concurrent upserts creating aborted transactions

In a cluster with multiple Zero and Alpha nodes, where should Jaeger be run?

It depends on what you want: all of them or only one.

Instances simply send data to Jaeger; there are no specific rules for clusters or groups. For each instance you will see that instance's specific activities, not anything aggregated per group. So I would recommend pointing all of them at Jaeger.

Meanwhile, I added the @upsert directive to all of the predicates in my schema (except the ones for [uid] since I don’t know if those can have the directive), but I’m still getting a very large quantity of these messages:
java.util.concurrent.CompletionException: io.dgraph.TxnConflictException: Transaction has been aborted. Please retry
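
(For context, we already catch these aborts and retry them client-side. Below is a minimal sketch of the kind of wrapper we use with the dgraph4j client; the class name, the Consumer-based shape, and the backoff values are illustrative, not our exact production code.)

import io.dgraph.DgraphClient;
import io.dgraph.Transaction;
import io.dgraph.TxnConflictException;
import java.util.function.Consumer;

public final class RetryingUpsert {
    // Runs `work` (which should build the upsert request and commit, e.g. via doRequest
    // with commitNow) and retries with exponential backoff whenever the transaction aborts.
    public static void runWithRetry(DgraphClient client, Consumer<Transaction> work, int maxAttempts)
            throws InterruptedException {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            Transaction txn = client.newTransaction();
            try {
                work.accept(txn);
                return; // committed successfully
            } catch (TxnConflictException e) {
                // Aborted because a concurrent transaction touched the same conflict keys.
                Thread.sleep(50L << attempt); // back off before retrying
            } finally {
                txn.discard(); // no-op if the transaction already committed
            }
        }
        throw new RuntimeException("upsert still aborting after " + maxAttempts + " attempts");
    }
}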

If we can’t establish parallelism, then my company will not be able to use Dgraph going forward.

I ran tcpdump on port 14268, and there is no traffic, so it appears that Dgraph is not sending the data to that port.

Please share your full cluster configs and machine stats so I can understand your context.

Dunno what it could be; I can run it just fine. It must be something very specific to your context.

Have you started the cluster with the --jaeger.collector flag? Did you set the right port the first time?

We’re running Dgraph Zero and Alpha on a 5 node VM cluster.
Each node is running an instance of Dgraph Zero and an instance of Dgraph Alpha. We’re running the production Dgraph containers.

Here is the script we run to start Dgraph Zero as a docker container on the host named app12. (app08 is our leader):

/usr/local/bin/dgraph-zero :

#!/bin/bash

. /usr/local/bin/docker-functions

DATA=/opt/data/dgraph-zero
IMAGE=docker.*obfuscated*.com/dgraph/dgraph:v1.1.0
OFFSET=2
REPLICAS=3
ZERO_HOST=app08.*obfuscated*.com
ZERO_PORT=5080
ZERO_OFFSET=`expr ${ZERO_PORT} + ${OFFSET}`
IDX=5
JAEGER_COLLECTOR=http://app08.*obfuscated*.com:14268

export DOCKER_RUN=" -d \
--name dgraph-zero \
--memory-swappiness=0 \
--stop-timeout=300 \
--restart=unless-stopped \
--net=host \
-v ${DATA}:/dgraph \
${IMAGE} dgraph zero \
--replicas ${REPLICAS} \
--port_offset ${OFFSET} \
--my $(hostname -f):${ZERO_OFFSET} \
--jaeger.collector=${JAEGER_COLLECTOR} \
--expose_trace \
--idx ${IDX}"

if [ ${IDX} -gt 1 ]; then
    DOCKER_RUN+=" --peer ${ZERO_HOST}:${ZERO_OFFSET}"
fi

run "$@"

Here is the script we run to start Dgraph Alpha as a docker container:

/usr/local/bin/dgraph-alpha :

#!/bin/bash

. /usr/local/bin/docker-functions

DATA=/opt/data/dgraph-alpha
IMAGE=docker.*obfuscated*.com/dgraph/dgraph:v1.1.0
OFFSET=2
ZERO_HOST=app08.*obfuscated*.com
ZERO_PORT=5080
ALPHA_PORT=7080
LRU_MB=20000
ALPHA_OFFSET=`expr ${ALPHA_PORT} + ${OFFSET}`
ZERO_OFFSET=`expr ${ZERO_PORT} + ${OFFSET}`
JAEGER_COLLECTOR=http://app08.*obfuscated*.com:14268

export DOCKER_RUN=" -d \
--name dgraph-alpha \
--memory-swappiness=0 \
--stop-timeout=300 \
--restart=unless-stopped \
--net=host \
-v ${DATA}:/dgraph \
${IMAGE} dgraph alpha \
--port_offset ${OFFSET} \
--lru_mb ${LRU_MB} \
--zero ${ZERO_HOST}:${ZERO_OFFSET} \
--jaeger.collector=${JAEGER_COLLECTOR} \
--expose_trace \
--my $(hostname -f):${ALPHA_OFFSET}"

run "$@"

We’re running Jaeger jaegertracing/all-in-one:1.14.0 on the app08 host.
The script we’re running to start Jaeger is here:

/usr/local/bin/jaeger :

#!/bin/bash
. /usr/local/bin/docker-functions

export DOCKER_RUN=" -d \
--memory-swappiness=0 \
--stop-timeout=300 \
--restart=unless-stopped \
--net=host \
-v /etc/jaeger:/opt/jaeger/config \
-e "COLLECTOR_ZIPKIN_HTTP_PORT=9411" \
docker.*obfuscated*.com/jaegertracing/all-in-one:1.14.0"

run "$@" 

That Jaeger instance is only running on app08. We have Dgraph Zero and Alpha running on all instances in the cluster (app08 - app12).

The hosts are running CentOS 7 (Core).
The Linux Kernel is: Linux 3.10.0-957.10.1.el7.x86_64
The architecture is: x86-64

Is there any additional information that will be helpful?

Are you exposing port 14268 in Docker?

Below is a comment I made in another issue. Try applying it to your case.

If you have a lot of RAM available, try running your cluster with these flags (just for this situation):

--badger.tables=ram
--badger.vlog=mmap

dgraph alpha -h

 --badger.tables string  [ram, mmap, disk] 
Specifies how Badger LSM tree is stored. Option sequence consume most to least RAM while providing best to worst read performance respectively. (default "mmap")
 --badger.vlog string  [mmap, disk] 
Specifies how Badger Value log is stored. mmap consumes more RAM, but provides better performance. (default "mmap")

What I have suggested above is a way of assessing whether the bottleneck could be I/O.

I would then recommend that you run an actual cluster, where each group has its own resources. A single instance with a slow HDD tends to have poor performance. But if you have multiple instances with their own resources and well-defined groups, even with slow HDDs, there is no performance loss.

Also, Dgraph/BadgerDB works better with SSD/NVMe.

In general, Docker can introduce an I/O bottleneck.


Yes, we’re exposing port 14268 in Docker.
I actually just configured and deployed an entire Jaeger cluster with a Jaeger Agent on each Dgraph host, several Jaeger Collector nodes, and a Jaeger Query node with the Cassandra backend in an attempt to resolve any possible I/O issues.
However, we are still not seeing any Dgraph data appear in the Jaeger cluster.

I think there is a bug in Dgraph.

It can’t be, otherwise it would happen to me too. However, I am running it locally via binaries; I am not using Docker to test it. If anything, it’s something related to Docker or to your config, which is why I asked about the port.

Go through a checklist to make sure.

@mrjn You mentioned:

“In general, all folks here, if you have slow queries, please post them in Discuss, and tag me or @dmai. We’re looking for ways to optimize Dgraph”

(Query Performance - #5 by MichelDiz)

There are two issues that I’ve mentioned in this thread, but the root issue is related to a concurrency problem.

We have data structured like this in Dgraph:

In the diagram, blue nodes are existing nodes, and red nodes/edges represent nodes that can be created or edited via an upsert.

My concern is that it appears the entire root node is getting locked by all of the upsert cases (as per the diagram). That behavior would be equivalent to locking an entire table when only specific rows need to be edited or added, as in the diagram below:

Can someone please confirm that this is not the case? If it is the case, then it’s a serious architectural/design flaw in Dgraph that needs to be urgently corrected.

Dgraph does not do locking for transactions; instead, it executes the mutations and, during commit, checks whether there was a conflict. As part of those commit checks, we create conflict keys. If all the nodes you’re operating on are connected to one root node in your dataset, it’s possible for multiple concurrently executing transactions to conflict with each other.

Adding the @upsert directive results in broader conflict keys, because it is designed to include indices in conflict detection as well.
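
For example, with @upsert on an indexed predicate, two concurrent transactions that write the same indexed value would share a conflict key, so whichever commit happens second gets aborted. A rough sketch of that with the Java client (class and helper names are just for illustration):

import com.google.protobuf.ByteString;
import io.dgraph.DgraphClient;
import io.dgraph.DgraphProto.Mutation;
import io.dgraph.Transaction;
import io.dgraph.TxnConflictException;

public final class ConflictSketch {
    // Assumes a schema like: <productId>: int @index(int) @upsert .
    // Both transactions write the same indexed value, so they share a conflict key
    // and the commit that happens second is aborted.
    static void demo(DgraphClient client) {
        Transaction t1 = client.newTransaction();
        Transaction t2 = client.newTransaction();
        try {
            t1.mutate(setNquads("_:a <productId> \"19610626\" ."));
            t2.mutate(setNquads("_:b <productId> \"19610626\" ."));
            t1.commit();
            t2.commit(); // expected to fail with TxnConflictException
        } catch (TxnConflictException e) {
            System.out.println("second commit aborted: " + e.getMessage());
        } finally {
            t1.discard();
            t2.discard();
        }
    }

    private static Mutation setNquads(String nquads) {
        return Mutation.newBuilder().setSetNquads(ByteString.copyFromUtf8(nquads)).build();
    }
}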

No, that should not be the case. Can you share your schema?


Thanks for the response and detailed explanation.
Thank you also for correcting my statement about “locks.” It does appear that we’re both referring to transaction conflicts and conflict keys.
The query was included in the first post of this thread: Concurrent upserts creating aborted transactions
and the schema is here: Upsert with multiple UIDs - #43 by devinbost

I’ll quote it here for your convenience.

type Products { 
    products: [Product] 
} 
type Product { 
    productId: string 
    options: [Option] 
} 
type Option { 
    optionId: string 
    color: string 
}
<collectionId>: int @index(int) .
<color>: string .
<optionId>: int @index(int) .
<options>: [uid] .
<productId>: int @index(int) .
<products>: [uid] .

I’ll also quote the upsert mutation here for your convenience:

upsert {
  query {
    getVals(func: has(products)) {
      productsUid as uid
      products @filter(eq(productId, 19610626)) {
        productUid as uid
        options @filter(eq(optionId, 32661491)) {
          optionUid as uid
        }
      }
    }
  }

  mutation {
    set {
      uid(productsUid) <products> uid(productUid) .
      uid(productsUid) <dgraph.type> "Products" .
      uid(productUid) <productId> "19610626" .
      uid(productUid) <options> uid(optionUid) .
      uid(productUid) <dgraph.type> "Product" .
      uid(optionUid) <color> "blue" .
      uid(optionUid) <dgraph.type> "Option" .
      uid(optionUid) <optionId> "32661491" .
    }
  }
}
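
For reference, we submit that upsert from the Java client by putting the query and the mutation into a single request with commitNow, roughly like the sketch below (the hostname is a placeholder for our obfuscated one; 9082 is the default gRPC port 9080 plus our --port_offset of 2):

import com.google.protobuf.ByteString;
import io.dgraph.DgraphClient;
import io.dgraph.DgraphGrpc;
import io.dgraph.DgraphProto.Mutation;
import io.dgraph.DgraphProto.Request;
import io.dgraph.Transaction;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;

public final class UpsertExample {
    public static void main(String[] args) {
        // Placeholder endpoint; each Alpha serves gRPC on 9080 + offset in our setup.
        ManagedChannel channel =
            ManagedChannelBuilder.forAddress("app08.example.com", 9082).usePlaintext().build();
        DgraphClient client = new DgraphClient(DgraphGrpc.newStub(channel));

        String query =
            "query {\n"
          + "  getVals(func: has(products)) {\n"
          + "    productsUid as uid\n"
          + "    products @filter(eq(productId, 19610626)) {\n"
          + "      productUid as uid\n"
          + "      options @filter(eq(optionId, 32661491)) {\n"
          + "        optionUid as uid\n"
          + "      }\n"
          + "    }\n"
          + "  }\n"
          + "}";

        String nquads =
            "uid(productsUid) <products> uid(productUid) .\n"
          + "uid(productsUid) <dgraph.type> \"Products\" .\n"
          + "uid(productUid) <productId> \"19610626\" .\n"
          + "uid(productUid) <options> uid(optionUid) .\n"
          + "uid(productUid) <dgraph.type> \"Product\" .\n"
          + "uid(optionUid) <color> \"blue\" .\n"
          + "uid(optionUid) <dgraph.type> \"Option\" .\n"
          + "uid(optionUid) <optionId> \"32661491\" .\n";

        Mutation mu = Mutation.newBuilder()
            .setSetNquads(ByteString.copyFromUtf8(nquads))
            .build();
        Request request = Request.newBuilder()
            .setQuery(query)
            .addMutations(mu)
            .setCommitNow(true)
            .build();

        Transaction txn = client.newTransaction();
        try {
            txn.doRequest(request); // runs the query and mutation as one upsert and commits
        } finally {
            txn.discard();
        }
        channel.shutdown();
    }
}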

Initially, we left the @upsert directive off of all predicates in the schema. After we added the @upsert directive to our predicates, we noticed a decrease in the share of transactions throwing the TxnConflictException, from approximately 80% of all transactions to approximately 60%.

Most of the incoming messages for the upsert involve different ProductId values, so it’s very surprising that we would see so many transaction conflicts.

If Dgraph meets our performance requirements, we plan to run it at scale. Our initial production requirements call for processing approximately 1,700 transactions per second with near-real-time latency (preferably under 300 ms), and we plan to process significantly more (one to two orders of magnitude more) if Dgraph passes our phase 1 tests. So meeting our scaling requirements will be critical for our use cases.
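
On the client side we also spread the load across the Alphas; dgraph4j accepts multiple stubs (one per Alpha) in both the sync and async clients, so each transaction can go to any Alpha. We build ours roughly like this sketch (the class name is illustrative; hosts and port match our cluster):

import io.dgraph.DgraphAsyncClient;
import io.dgraph.DgraphGrpc;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import java.util.ArrayList;
import java.util.List;

public final class ClientFactory {
    // Build one stub per Alpha so the client can spread transactions across the cluster.
    public static DgraphAsyncClient buildClient() {
        String[] alphas = {"app08", "app09", "app10", "app11", "app12"};
        List<DgraphGrpc.DgraphStub> stubs = new ArrayList<>();
        for (String host : alphas) {
            // 9082 = default gRPC port 9080 plus our --port_offset of 2.
            ManagedChannel channel =
                ManagedChannelBuilder.forAddress(host, 9082).usePlaintext().build();
            stubs.add(DgraphGrpc.newStub(channel));
        }
        return new DgraphAsyncClient(stubs.toArray(new DgraphGrpc.DgraphStub[0]));
    }
}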

You mentioned:

If all the nodes you’re operating on are connected to one root node in your dataset, it’s possible that multiple concurrently executing transactions can conflict with each other.

How might that happen?

@mrjn I’m happy to dig into the Dgraph source code as well, but it would be helpful to get some pointers regarding where I should look.

@mrjn @dmai
I modified the upsert to omit the products node like this:

upsert {
  query {
    getVals(func: eq(productId, 19610626)) {
      productUid as uid
      options @filter(eq(optionId, 32661491)) {
        optionUid as uid
      }
    }
  }

  mutation {
    set {
      uid(productUid) <productId> "19610626" .
      uid(productUid) <options> uid(optionUid) .
      uid(productUid) <dgraph.type> "Product" .
      uid(optionUid) <color> "blue" .
      uid(optionUid) <dgraph.type> "Option" .
      uid(optionUid) <optionId> "32661491" .
    }
  }
}

(where the integer and string values are provided by incoming messages.)

That dropped my transaction error rate down to 20.4% and dropped the average latency from ~300 ms to 41 ms.
I then deleted the Products node from the Dgraph schema.
After waiting a while, I started my function again and noticed that the transaction error rate (java.util.concurrent.CompletionException: io.dgraph.TxnConflictException: Transaction has been aborted. Please retry) went back up to 57.1%.

So, I’m at a loss for what could be going on here…

Now we’re just getting:

java.lang.RuntimeException: java.util.concurrent.CompletionException: java.lang.RuntimeException: The doRequest encountered an execution exception:
at io.dgraph.AsyncTransaction.lambda$doRequest$2(AsyncTransaction.java:173) ~[functions.jar:?]
at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822) ~[?:1.8.0_212]
at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797) ~[?:1.8.0_212]
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) ~[?:1.8.0_212]
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1595) ~[?:1.8.0_212]
at java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1582) ~[?:1.8.0_212]
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) ~[?:1.8.0_212]
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) ~[?:1.8.0_212]
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) ~[?:1.8.0_212]
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157) ~[?:1.8.0_212]
Caused by: java.util.concurrent.CompletionException: java.lang.RuntimeException: The doRequest encountered an execution exception:
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273) ~[?:1.8.0_212]
at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280) ~[?:1.8.0_212]
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1592) ~[?:1.8.0_212]
… 5 more
Caused by: java.lang.RuntimeException: The doRequest encountered an execution exception:
at io.dgraph.DgraphAsyncClient.lambda$runWithRetries$2(DgraphAsyncClient.java:212) ~[functions.jar:?]
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590) ~[?:1.8.0_212]
… 5 more
Caused by: java.util.concurrent.ExecutionException: io.grpc.StatusRuntimeException: DEADLINE_EXCEEDED: deadline exceeded after 9999658649ns
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) ~[?:1.8.0_212]
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) ~[?:1.8.0_212]
at io.dgraph.DgraphAsyncClient.lambda$runWithRetries$2(DgraphAsyncClient.java:180) ~[functions.jar:?]
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590) ~[?:1.8.0_212]
… 5 more
Caused by: io.grpc.StatusRuntimeException: DEADLINE_EXCEEDED: deadline exceeded after 9999658649ns
at io.grpc.Status.asRuntimeException(Status.java:533) ~[functions.jar:?]
at io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onClose(ClientCalls.java:442) ~[functions.jar:?]
at io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39) ~[functions.jar:?]
at io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23) ~[functions.jar:?]
at io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40) ~[functions.jar:?]
at io.grpc.internal.CensusStatsModule$StatsClientInterceptor$1$1.onClose(CensusStatsModule.java:700) ~[functions.jar:?]
at io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39) ~[functions.jar:?]
at io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23) ~[functions.jar:?]
at io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40) ~[functions.jar:?]
at io.grpc.internal.CensusTracingModule$TracingClientInterceptor$1$1.onClose(CensusTracingModule.java:399) ~[functions.jar:?]
at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:507) ~[functions.jar:?]
at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:66) ~[functions.jar:?]
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.close(ClientCallImpl.java:627) ~[functions.jar:?]
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.access$700(ClientCallImpl.java:515) ~[functions.jar:?]
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:686) ~[functions.jar:?]
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:675) ~[functions.jar:?]
at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37) ~[functions.jar:?]
at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123) ~[functions.jar:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_212]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_212]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_212]
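
The trace shows a roughly 10-second gRPC deadline on the call. For reference, a per-call deadline can be set on the Dgraph stub with a gRPC interceptor along these lines (a sketch; the 30-second value is arbitrary, and raising the deadline only hides whatever is making the cluster slow):

import io.dgraph.DgraphClient;
import io.dgraph.DgraphGrpc;
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.MethodDescriptor;
import java.util.concurrent.TimeUnit;

public final class DeadlineExample {
    public static DgraphClient clientWithDeadline(String host, int port) {
        ManagedChannel channel =
            ManagedChannelBuilder.forAddress(host, port).usePlaintext().build();

        // Apply a fresh deadline to every RPC rather than a single deadline on the stub.
        ClientInterceptor perCallDeadline = new ClientInterceptor() {
            @Override
            public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
                    MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
                return next.newCall(method, callOptions.withDeadlineAfter(30, TimeUnit.SECONDS));
            }
        };

        DgraphGrpc.DgraphStub stub =
            DgraphGrpc.newStub(channel).withInterceptors(perCallDeadline);
        return new DgraphClient(stub);
    }
}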

I restarted the jaeger-collector nodes, and they’re giving me:

    {"level":"info","ts":1573584984.721451,"caller":"flags/admin.go:108","msg":"Mounting health check on admin server","route":"/"}
    {"level":"info","ts":1573584984.7215617,"caller":"flags/admin.go:114","msg":"Starting admin HTTP server","http-port":14269}
    {"level":"info","ts":1573584984.721596,"caller":"flags/admin.go:100","msg":"Admin server started","http-port":14269,"health-status":"unavailable"}
    2019/11/12 18:56:24 gocql: unable to dial control conn 10.1.2.3: dial tcp 10.1.2.3:9042: connect: connection refused
    {"level":"fatal","ts":1573584984.7342262,"caller":"collector/main.go:91","msg":"Failed to init storage factory","error":"gocql: unable to create session: control: unable to connect to initial hosts: dial tcp 10.1.2.3:9042: connect: connection refused","stacktrace":"main.main.func1\n\t/home/travis/gopath/src/github.com/jaegertracing/jaeger/cmd/collector/main.go:91\ngithub.com/jaegertracing/jaeger/vendor/github.com/spf13/cobra.(*Command).execute\n\t/home/travis/gopath/src/github.com/jaegertracing/jaeger/vendor/github.com/spf13/cobra/command.go:762\ngithub.com/jaegertracing/jaeger/vendor/github.com/spf13/cobra.(*Command).ExecuteC\n\t/home/travis/gopath/src/github.com/jaegertracing/jaeger/vendor/github.com/spf13/cobra/command.go:852\ngithub.com/jaegertracing/jaeger/vendor/github.com/spf13/cobra.(*Command).Execute\n\t/home/travis/gopath/src/github.com/jaegertracing/jaeger/vendor/github.com/spf13/cobra/command.go:800\nmain.main\n\t/home/travis/gopath/src/github.com/jaegertracing/jaeger/cmd/collector/main.go:182\nruntime.main\n\t/home/travis/.gimme/versions/go1.12.1.linux.amd64/src/runtime/proc.go:200"}```

Also, I restarted the `dgraph-zero` nodes, and there's nothing in their logs about Jaeger at all. 
I was able to successfully deploy a new cluster to another environment, so it seems like something is in a bad state. I'm not sure what else to check.

Just an update:

I’ve created a way to reproduce issues in this repo: GitHub - MichelDiz/ItisTimetoReproduce: This is a personal repository with tests to be reproduced.
There you can run scripts that spin up Dgraph (single or multiple instances) and also run Jaeger, all locally. It runs out of the box.

So… It turns out that all we needed to do was stop Dgraph, remove the Dgraph docker containers, and then redeploy them, and Jaeger started working in production.
