Posted by weickdev:
Following the document, I operation transaction like this:
Transaction txn = dgraphClient.newTransaction();
try {
// Do something here
// ...
} finally {
txn.discard();
}
It works well if I run single thread.
But if I run this in multi-threads, it throws below exception:
Exception in thread "pool-2-thread-5" io.dgraph.TxnConflictException: Transaction has been aborted. Please retry.
at io.dgraph.DgraphClient$Transaction.checkAndThrowException(DgraphClient.java:315)
at io.dgraph.DgraphClient$Transaction.commit(DgraphClient.java:256)
This is my code:
private void saveVertices(List<Map<String, Object>> itemList) {
long startTime = System.currentTimeMillis();
DgraphClient.Transaction transaction = dgraphClient.newTransaction();
try {
try {
String json = JSON.toJSONString(itemList);
DgraphProto.Mutation mutation = DgraphProto.Mutation.newBuilder().setSetJson(ByteString.copyFromUtf8(json)).build();
transaction.mutate(mutation);
} catch (Exception e) {
logger.error("Node create error: ", e);
}
transaction.commit();
} finally {
transaction.discard();
}
logger.info("save {} items costs {}ms", itemList.size(), System.currentTimeMillis() - startTime);
}
private void ansySaveVertex(List<Map<String, Object>> dataList) {
List<Map<String, Object>> dataListCopy = new ArrayList<>();
dataListCopy.addAll(dataList);
executorService.execute(() -> {
saveVertices(dataListCopy);
});
dataList.clear();
}
public void init() {
ManagedChannel channel = ManagedChannelBuilder.forAddress("192.168.1.74", 9080).usePlaintext(true).build();
DgraphGrpc.DgraphBlockingStub blockingStub = DgraphGrpc.newBlockingStub(channel);
dgraphClient = new DgraphClient(Collections.singletonList(blockingStub));
executorService = new ThreadPoolExecutor(
threadNum, threadNum, 0, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(threadNum),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.CallerRunsPolicy());
}