(Simon Liang)
November 19, 2020, 9:43am
We are running into issues where CompletableFutures use too many resources because they run on the default ForkJoinPool#commonPool(). I would like the AsyncClient to accept an Executor at construction time so that we can limit the resources used for the AsyncTxn calls.
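To illustrate the request, here is a minimal, standard-library-only sketch of the difference between the two `supplyAsync` overloads. The class name, the helper names, the pool size of 2, and the thread name `dgraph-async` are all made up for illustration; nothing here comes from dgraph4j itself.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of dgraph4j.
final class BoundedExecutorSketch {

    // Builds a fixed-size pool of daemon threads that all carry the given name.
    static ExecutorService newBoundedPool(int threads, String threadName) {
        return Executors.newFixedThreadPool(threads, runnable -> {
            Thread t = new Thread(runnable, threadName);
            t.setDaemon(true);
            return t;
        });
    }

    // Runs a trivial task on the supplied executor and reports which thread ran it.
    static CompletableFuture<String> runOn(Executor executor) {
        return CompletableFuture.supplyAsync(() -> Thread.currentThread().getName(), executor);
    }

    public static void main(String[] args) throws Exception {
        // Assumed pool size; choose one that fits the workload.
        ExecutorService bounded = newBoundedPool(2, "dgraph-async");
        try {
            // One-argument overload: the task goes to the JDK default async executor,
            // which is normally ForkJoinPool.commonPool().
            String defaultThread = CompletableFuture
                .supplyAsync(() -> Thread.currentThread().getName())
                .get(5, TimeUnit.SECONDS);

            // Two-argument overload: the task is confined to the bounded pool.
            String boundedThread = runOn(bounded).get(5, TimeUnit.SECONDS);

            System.out.println("default executor thread: " + defaultThread);
            System.out.println("bounded executor thread: " + boundedThread);
        } finally {
            bounded.shutdown();
        }
    }
}
```

With an Executor passed in at construction time, the client could hand it to every `supplyAsync` call in the same way, so the caller decides how many threads the async transactions may occupy.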
(Simon Liang)
November 19, 2020, 10:20am
We have worked out an extended Client that can get around the issue for now.
package co.treelab.dgraph.util;

import io.dgraph.DgraphAsyncClient;
import io.dgraph.DgraphGrpc;
import io.dgraph.ExceptionUtil;
import io.dgraph.TxnConflictException;
import io.grpc.Context;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;

public class DgraphAsyncWithExecutorClient extends DgraphAsyncClient {
  private static final Logger LOG = LoggerFactory.getLogger(DgraphAsyncWithExecutorClient.class);
  private final Executor executor;

  public DgraphAsyncWithExecutorClient(Executor executor, DgraphGrpc.DgraphStub... stubs) {
    super(stubs);
    this.executor = executor;
  }

  @Override
  protected <T> CompletableFuture<T> runWithRetries(String operation, Callable<CompletableFuture<T>> callable) {
    final Callable<CompletableFuture<T>> ctxCallable = Context.current().wrap(callable);
    // Run the call on the supplied executor rather than the default ForkJoinPool#commonPool().
    return CompletableFuture.supplyAsync(
        () -> {
          try {
            return ctxCallable.call().get();
          } catch (InterruptedException e) {
            LOG.error("The " + operation + " got interrupted:", e);
            throw new RuntimeException(e);
          } catch (ExecutionException e) {
            if (ExceptionUtil.isJwtExpired(e.getCause())) {
              try {
                // retry the login (assumes DgraphAsyncClient#retryLogin() is accessible to subclasses)
                retryLogin().get();
                // retry the supplied logic
                return ctxCallable.call().get();
              } catch (InterruptedException ie) {
                LOG.error("The retried " + operation + " got interrupted:", ie);
                throw new RuntimeException(ie);
              } catch (ExecutionException ie) {
                LOG.error("The retried " + operation + " encounters an execution exception:", ie);
                throw new RuntimeException(ie);
              } catch (Exception ie) {
                LOG.error("The retried " + operation + " encounters a completion exception:", ie);
                throw new CompletionException(ie);
              }
            } else if (e.getCause() instanceof StatusRuntimeException) {
              StatusRuntimeException ex1 = (StatusRuntimeException) e.getCause();
              Status.Code code = ex1.getStatus().getCode();
              String desc = ex1.getStatus().getDescription();
              if (code.equals(Status.Code.ABORTED)
                  || code.equals(Status.Code.FAILED_PRECONDITION)) {
                throw new CompletionException(new TxnConflictException(desc));
              }
            }
            // Handle the case when the outer exception is not caused by JWT expiration
            throw new RuntimeException(
                "The " + operation + " encountered an execution exception:", e);
          } catch (Exception e) {
            throw new CompletionException(e);
          }
        },
        executor);
  }
}
Thanks @lhr0909,
Accepting this improvement. We will have it in dgraph4j v20.03.3 in a week’s time.
Hi @lhr0909,
I have raised a PR for this; please have a look if possible: feat: add client constructor with executor (DGRAPH-2746) by abhimanyusinghgaur · Pull Request #161 · dgraph-io/dgraph4j · GitHub
If it looks good to you, I will merge it and release a new version of the client.
(Simon Liang)
November 25, 2020, 2:34am
Hey @abhimanyusinghgaur, the PR looks good. I left a small comment, but it shouldn’t be a blocking issue.
Thanks @lhr0909.
I have released version 20.03.3 of the client with the PR. It should be visible on Maven Central within 24 hours.