Strange TxnConflictException in Java client

What version of Dgraph are you using?

20.07.02
Java client: 20.03.2 (I can't find a 20.07 version)

Have you tried reproducing the issue with the latest release?

This is the latest release

What is the hardware spec (RAM, OS)?

OS: Linux
Cluster info: 3 machines with 140 GB memory and 16 cores

Steps to reproduce the issue (command/config used to run Dgraph).

zero

dgraph zero --my xxxx:5080 --replicas 3

alpha

dgraph alpha --whitelist xxxx --lru_mb 42000 --my xxxx:7080 --zero xxxx:5080 --snapshot_after 1000 --graphql_debug

I spent a long time looking for an answer about TxnConflictException. Everyone says you get this error if you modify the same node concurrently, but I'm sure I am not modifying the same node concurrently. My scenario: I need to migrate a large set of user relations to Dgraph, about six hundred million users and six billion edges, so I must insert nodes concurrently to save time. To simplify the question, I just insert nodes concurrently in batches (1000 nodes per batch), and these nodes have only one predicate. The schema and code snippet are below:

schema

arith_name:int @index(int) @count @upsert .

type ARITH_U {
    arith_name
}

java client version

<dependency>
            <groupId>io.dgraph</groupId>
            <artifactId>dgraph4j</artifactId>
            <version>20.03.2</version>
</dependency>

test code

Divide the range from 0 to 100000000L into batches of 1000, then insert the batches concurrently using 10 threads.

public void test1() throws InterruptedException {
        long start = 0;
        long end = 100000000L; //
        int worker = 10;
        int page = 1000;
        ThreadPoolExecutor executorService = (ThreadPoolExecutor) Executors.newFixedThreadPool(worker);
        for (; start < end; ) {
            long taskStart = start;
            start += page;
            long taskEnd = start;
            executorService.execute(() -> {
                List<Long> names = new ArrayList<>();
                for (long k = taskStart; k < taskEnd; k++) {
                    names.add(k);
                }
                upsertAddUsersWithRDFFormatById(names, taskStart, taskEnd);
            });
        }
        executorService.shutdown();
        while (!executorService.awaitTermination(1, TimeUnit.MINUTES)) {
            System.out.println(executorService);
        }
    }

public void upsertAddUsersWithRDFFormatById(Collection<Long> ids, long start, long end) {
        try {
            if (ids.size() < 1) {
                return;
            }
            StringBuilder sbQuery = new StringBuilder();
            StringBuilder sbMutation = new StringBuilder();
            sbQuery.append("query ");
            sbQuery.append("{");
            int index = 1;
            for (Long id : ids) {
                sbQuery.append("user").append(index).append(" as var(func:eq(arith_name, ")
                        .append(id).append(")){\n");
                sbQuery.append("}\n");
                sbMutation.append(String.format("uid(%s) <arith_name> \"%s\" .\n", "user" + index, id));
                sbMutation.append(String.format("uid(%s) <dgraph.type> \"ARITH_U\" .\n", "user" + index));
                index++;
            }
            sbQuery.append("}");
        
            DgraphProto.Mutation mutation = DgraphProto.Mutation
                    .newBuilder()
                    .setSetNquads(ByteString.copyFromUtf8(sbMutation.toString()))
                    .build();
            DgraphProto.Request request = DgraphProto.Request
                    .newBuilder()
                    .setQuery(sbQuery.toString())
                    .addMutations(mutation)
                    .setCommitNow(true)
                    .build();
            Transaction transaction = dgraphClient.newTransaction();
            try {
                DgraphProto.Response response = transaction.doRequest(request);
//            System.out.println(response);
            } finally {
                transaction.discard();
                transaction.close();
            }
        } catch (Exception e) {
            if (NestedExceptionUtils.getRootCause(e) instanceof TxnConflictException) {
                logger.error(String.format("[CONFLICT] start:%d, end:%d, ids:%s", start, end, ids), e);
            } else {
                logger.error("unknown error", e);
            }
        }
    }
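
To make the size of each request concrete, here is a minimal sketch that reproduces only the string-building part of the method above, with no Dgraph client involved. The class and method names (UpsertBlockSketch, buildQuery, buildSetNquads) are hypothetical and introduced for illustration; the generated text follows the same format strings as the original code.

```java
import java.util.Arrays;
import java.util.List;

/** Sketch: rebuilds the query and N-Quad text that the batch method above sends. */
final class UpsertBlockSketch {

    /** One "userN as var(...)" block per id, in the same format as the original code. */
    static String buildQuery(List<Long> ids) {
        StringBuilder sb = new StringBuilder("query {");
        int index = 1;
        for (Long id : ids) {
            sb.append("user").append(index)
              .append(" as var(func:eq(arith_name, ").append(id).append(")){\n")
              .append("}\n");
            index++;
        }
        return sb.append("}").toString();
    }

    /** Two triples per id: the arith_name value and the dgraph.type. */
    static String buildSetNquads(List<Long> ids) {
        StringBuilder sb = new StringBuilder();
        int index = 1;
        for (Long id : ids) {
            String var = "user" + index;
            sb.append(String.format("uid(%s) <arith_name> \"%s\" .\n", var, id));
            sb.append(String.format("uid(%s) <dgraph.type> \"ARITH_U\" .\n", var));
            index++;
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        List<Long> ids = Arrays.asList(5L, 6L);
        System.out.println(buildQuery(ids));
        System.out.println(buildSetNquads(ids));
    }
}
```

Each id contributes one var block and two triples, so a batch of 1000 ids produces a single request with 1000 var blocks and 2000 N-Quads, all committed in one transaction.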

After a little while, I got the errors below. As you can see, the inserted nodes are unrelated to each other, so I think this is a serious bug in Dgraph or the Dgraph Java client.

 java.util.concurrent.CompletionException: io.dgraph.TxnConflictException: Transaction has been aborted. Please retry
        at io.dgraph.AsyncTransaction.lambda$doRequest$2(AsyncTransaction.java:187) ~[dgraph4j-20.03.2.jar:?]
        at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822) ~[?:1.8.0_191]
        at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797) ~[?:1.8.0_191]
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) ~[?:1.8.0_191]
        at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1595) ~[?:1.8.0_191]
        at java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1582) ~[?:1.8.0_191]
        at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) ~[?:1.8.0_191]
        at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) ~[?:1.8.0_191]
        at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) ~[?:1.8.0_191]
        at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157) ~[?:1.8.0_191]
Caused by: java.util.concurrent.CompletionException: io.dgraph.TxnConflictException: Transaction has been aborted. Please retry
        at io.dgraph.DgraphAsyncClient.lambda$runWithRetries$2(DgraphAsyncClient.java:210) ~[dgraph4j-20.03.2.jar:?]
        at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590) ~[?:1.8.0_191]
     ... 5 more
Caused by: io.dgraph.TxnConflictException: Transaction has been aborted. Please retry
        at io.dgraph.DgraphAsyncClient.lambda$runWithRetries$2(DgraphAsyncClient.java:210) ~[dgraph4j-20.03.2.jar:?]
        at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590) ~[?:1.8.0_191]
        ... 5 more

The nodes are all unrelated to each other, yet a conflict exception is thrown. Isn't that strange?
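
The exception message itself says "Please retry". A minimal sketch of a retry wrapper with exponential backoff follows. It does not explain or remove the cause of the conflicts; it only shows one way to act on that message. The names RetrySketch, callWithRetry, and rootCause are hypothetical, and the sketch uses no Dgraph classes so that it stays self-contained.

```java
import java.util.function.Predicate;
import java.util.function.Supplier;

/** Sketch: retry an operation with exponential backoff when its root cause is retryable. */
final class RetrySketch {

    static <T> T callWithRetry(Supplier<T> attempt, Predicate<Throwable> isRetryable,
                               int maxAttempts, long baseDelayMillis) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        long delay = baseDelayMillis;
        for (int attemptNo = 1; ; attemptNo++) {
            try {
                return attempt.get();
            } catch (RuntimeException e) {
                // Give up on the last attempt or when the root cause is not retryable.
                if (attemptNo >= maxAttempts || !isRetryable.test(rootCause(e))) {
                    throw e;
                }
            }
            sleep(delay);
            delay *= 2; // double the wait before the next attempt
        }
    }

    /** Walks the cause chain to its end; returns the throwable itself if it has no cause. */
    static Throwable rootCause(Throwable t) {
        Throwable cur = t;
        while (cur.getCause() != null && cur.getCause() != cur) {
            cur = cur.getCause();
        }
        return cur;
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while backing off", ie);
        }
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Simulated operation: fails twice with a wrapped "conflict", then succeeds.
        String result = callWithRetry(() -> {
            if (++calls[0] < 3) {
                throw new RuntimeException(new IllegalStateException("simulated conflict"));
            }
            return "committed";
        }, cause -> cause instanceof IllegalStateException, 5, 10L);
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```

In the code from this post, the predicate would presumably be cause -> cause instanceof TxnConflictException, and the supplier would create a new transaction on every attempt rather than reuse the aborted one.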

Hi @payne4handsome
Is there any particular reason you have an @upsert tag on the arith_name field (which is an int)? The @upsert directive enforces uniqueness on arith_name when transactions are concurrently executed. Could you please run your use case without @upsert and check the behavior? Please let us know.

@anand In my case, arith_name is the equivalent of a primary key in MySQL, so I must guarantee that arith_name is unique.

Hi @payne4handsome, could you please try again after removing the @count directive from arith_name? Similar code worked on my end after removing that index. @upsert uses indexes for its conflict detection mechanism, and I believe @count adds some indexes that are creating conflicts.

Please let us know.

@anand OK, I will try what you suggested now. Please wait for my reply.

@anand Good news: I took your advice and removed the @count directive, and there is no TxnConflictException error anymore. I still think this is a bug, though, and it cost me a lot of time. There is also another problem with the code above. With a batch size of 1000, the system CPU load and memory usage get so high that the Dgraph alpha crashes. The error log looks like this:

fatal error: runtime: out of memory

runtime stack:
runtime.throw(0x1c5981f, 0x16)
        /usr/local/go/src/runtime/panic.go:1116 +0x72
runtime.sysMap(0xdf70000000, 0x4000000, 0x2ecbaf8)
        /usr/local/go/src/runtime/mem_linux.go:169 +0xc5
runtime.(*mheap).sysAlloc(0x2c91800, 0x1400000, 0x2c91808, 0x801)
        /usr/local/go/src/runtime/malloc.go:715 +0x1cd
runtime.(*mheap).grow(0x2c91800, 0x801, 0x0)
        /usr/local/go/src/runtime/mheap.go:1286 +0x11c
runtime.(*mheap).allocSpan(0x2c91800, 0x801, 0xa20100, 0x2ecbb08, 0x200000001)
        /usr/local/go/src/runtime/mheap.go:1124 +0x6a0
runtime.(*mheap).alloc.func1()
        /usr/local/go/src/runtime/mheap.go:871 +0x64
runtime.(*mheap).alloc(0x2c91800, 0x801, 0xa50101, 0x2c91800)
        /usr/local/go/src/runtime/mheap.go:865 +0x81
runtime.largeAlloc(0x1000002, 0x101, 0x2c91800)
        /usr/local/go/src/runtime/malloc.go:1152 +0x92
runtime.mallocgc.func1()
        /usr/local/go/src/runtime/malloc.go:1047 +0x46
runtime.systemstack(0x0)
        /usr/local/go/src/runtime/asm_amd64.s:370 +0x66
runtime.mstart()
        /usr/local/go/src/runtime/proc.go:1041

goroutine 14913849 [running]:
runtime.systemstack_switch()
        /usr/local/go/src/runtime/asm_amd64.s:330 fp=0xc0809670e8 sp=0xc0809670e0 pc=0xa53440
runtime.mallocgc(0x1000002, 0x1a12320, 0x1, 0x1555574)
        /usr/local/go/src/runtime/malloc.go:1046 +0x895 fp=0xc080967188 sp=0xc0809670e8 pc=0x9f9915
runtime.makeslice(0x1a12320, 0x1000002, 0x1000002, 0x1ee69c0)
        /usr/local/go/src/runtime/slice.go:49 +0x6c fp=0xc0809671b8 sp=0xc080967188 pc=0xa3a48c
encoding/json.(*decodeState).literalStore(0xc1353eae70, 0xd17e23400d, 0x155555a, 0x1555ff3, 0x19e6120, 0xc134da67e0, 0x197, 0x0, 0x1555574, 0xc1353eae98)
        /usr/local/go/src/encoding/json/decode.go:961 +0x223d fp=0xc080967350 sp=0xc0809671b8 pc=0xb0992d
encoding/json.(*decodeState).value(0xc1353eae70, 0x19e6120, 0xc134da67e0, 0x197, 0x1, 0xc07b73c520)
        /usr/local/go/src/encoding/json/decode.go:401 +0x1de fp=0xc0809673b8 sp=0xc080967350 pc=0xb03c3e

This is a fatal out-of-memory (OOM) error. In Dgraph's main.go, GOMAXPROCS is hard-coded to 128. I think this makes the system load higher. I suggest letting users configure this value in the dgraph alpha start command.

rand.Seed(time.Now().UnixNano())
	// Setting a higher number here allows more disk I/O calls to be scheduled, hence considerably
	// improving throughput. The extra CPU overhead is almost negligible in comparison. The
	// benchmark notes are located in badger-bench/randread.
	runtime.GOMAXPROCS(128)

Hi @payne4handsome,
It is normal to see very high CPU/RAM load while doing a bulk load into Dgraph. My suggestion is to space out the upserts so that alpha does not get overwhelmed (this will avoid the OOM situation that you are seeing). If you are confident that your source does not have duplicates, you can switch off @upsert while doing the load, and then switch it back on when you have finished the bulk load.
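
One way to space out the upserts on the client side is to cap the number of batches in flight and pause briefly between submissions. The following is a minimal sketch under those assumptions; ThrottledLoaderSketch, BatchTask, and runBatches are hypothetical names, and the limits would need tuning against a real cluster.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/** Sketch: submit batches with a cap on in-flight work and a pause between submissions. */
final class ThrottledLoaderSketch {

    /** Stand-in for the real per-batch upsert call. */
    interface BatchTask {
        void run(long startInclusive, long endExclusive);
    }

    /** Splits [0, total) into batches of `page` ids and returns how many batches ran. */
    static int runBatches(long total, int page, int workers, int maxInFlight,
                          long pauseMillis, BatchTask task) {
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        Semaphore inFlight = new Semaphore(maxInFlight);
        int submitted = 0;
        try {
            for (long start = 0; start < total; start += page) {
                final long from = start;
                final long to = Math.min(start + page, total);
                inFlight.acquire(); // blocks the producer once maxInFlight batches are running
                pool.execute(() -> {
                    try {
                        task.run(from, to);
                    } finally {
                        inFlight.release();
                    }
                });
                submitted++;
                if (pauseMillis > 0) {
                    Thread.sleep(pauseMillis); // spacing between submissions
                }
            }
            pool.shutdown();
            if (!pool.awaitTermination(1, TimeUnit.HOURS)) {
                throw new IllegalStateException("batches did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while loading", e);
        } finally {
            pool.shutdownNow(); // no effect if the pool has already terminated
        }
        return submitted;
    }

    public static void main(String[] args) {
        int batches = runBatches(10_000, 1000, 4, 2, 10,
                (from, to) -> System.out.println("upsert ids [" + from + ", " + to + ")"));
        System.out.println("batches run: " + batches);
    }
}
```

Unlike the test code earlier in the thread, which queues every batch up front, this version makes the submitting thread wait, so at most maxInFlight requests are outstanding at any time.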

Also, please review the fast data loaders, which may be a more suitable option for bulk data loads.

OK, thanks for the suggestions. But as you know, altering the schema takes a lot of time (several hours, for example). Anyway, you answered the question in the title. Thanks again.

Hi @payne4handsome, you are welcome! :grinning: Please let us know if we can help in any other aspects.