Dgraph4j support for custom executor

Hi,

We are running into an issue where the CompletableFutures consume too many resources because they run on the default ForkJoinPool#commonPool(). I would like DgraphAsyncClient to accept an Executor at construction so that we can limit the resources used by the AsyncTxn calls.

Thanks,

We have worked out an extended Client that can get around the issue for now.

package co.treelab.dgraph.util;

import io.dgraph.DgraphAsyncClient;
import io.dgraph.DgraphGrpc;
import io.dgraph.ExceptionUtil;
import io.dgraph.TxnConflictException;
import io.grpc.Context;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;

public class DgraphAsyncWithExecutorClient extends DgraphAsyncClient {
    private static final Logger LOG = LoggerFactory.getLogger(DgraphAsyncWithExecutorClient.class);

    private final Executor executor;

    public DgraphAsyncWithExecutorClient(Executor executor, DgraphGrpc.DgraphStub... stubs) {
        super(stubs);
        this.executor = executor;
    }

    @Override
    protected <T> CompletableFuture<T> runWithRetries(String operation, Callable<CompletableFuture<T>> callable) {
        final Callable<CompletableFuture<T>> ctxCallable = Context.current().wrap(callable);

        return CompletableFuture.supplyAsync(
                () -> {
                    try {
                        return ctxCallable.call().get();
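                    } catch (InterruptedException e) {
                        // Restore the interrupt flag so the executor's thread can still observe it.
                        Thread.currentThread().interrupt();
                        LOG.error("The " + operation + " got interrupted:", e);
                        throw new RuntimeException(e);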
                    } catch (ExecutionException e) {
                        if (ExceptionUtil.isJwtExpired(e.getCause())) {
                            try {
                                // retry the login
                                retryLogin().get();
                                // retry the supplied logic
                                return ctxCallable.call().get();
                            } catch (InterruptedException ie) {
                                // Restore the interrupt flag before propagating.
                                Thread.currentThread().interrupt();
                                LOG.error("The retried " + operation + " got interrupted:", ie);
                                throw new RuntimeException(ie);
                            } catch (ExecutionException ee) {
                                LOG.error("The retried " + operation + " encountered an execution exception:", ee);
                                throw new RuntimeException(ee);
                            } catch (Exception ex) {
                                LOG.error("The retried " + operation + " failed with an unexpected exception:", ex);
                                throw new CompletionException(ex);
                            }
                        } else if (e.getCause() instanceof StatusRuntimeException) {
                            StatusRuntimeException ex1 = (StatusRuntimeException) e.getCause();
                            Status.Code code = ex1.getStatus().getCode();
                            String desc = ex1.getStatus().getDescription();

                            if (code.equals(Status.Code.ABORTED)
                                    || code.equals(Status.Code.FAILED_PRECONDITION)) {
                                throw new CompletionException(new TxnConflictException(desc));
                            }
                        }
                        // Neither a JWT expiry nor a transaction conflict: surface the failure to the caller.
                        throw new RuntimeException(
                                "The " + operation + " encountered an execution exception:", e);
                    } catch (Exception e) {
                        throw new CompletionException(e);
                    }
                },
                executor
        );
    }
}
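
For anyone skimming the workaround above, the key change is the second argument to CompletableFuture.supplyAsync. A minimal sketch using only the JDK, with no dgraph4j dependency, is below. The class name, the "dgraph-async-" thread-name prefix and the pool size of 2 are illustrative assumptions, not part of dgraph4j.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class BoundedExecutorSketch {
    /** Builds a fixed-size pool whose threads carry a recognizable (hypothetical) name prefix. */
    public static ExecutorService newBoundedPool(int size) {
        AtomicInteger counter = new AtomicInteger();
        return Executors.newFixedThreadPool(size, r -> {
            Thread t = new Thread(r, "dgraph-async-" + counter.incrementAndGet());
            t.setDaemon(true); // do not keep the JVM alive for idle workers
            return t;
        });
    }

    /** Runs a trivial task on the given pool and returns the worker thread's name. */
    public static String threadNameOn(ExecutorService pool) {
        return CompletableFuture
                .supplyAsync(() -> Thread.currentThread().getName(), pool)
                .join();
    }

    /** Runs the same task with no executor argument, so the JDK picks the default async executor. */
    public static String threadNameOnDefault() {
        return CompletableFuture
                .supplyAsync(() -> Thread.currentThread().getName())
                .join();
    }

    public static void main(String[] args) {
        ExecutorService pool = newBoundedPool(2); // pool size chosen arbitrarily for the demo
        try {
            System.out.println("with executor:    " + threadNameOn(pool));
            System.out.println("without executor: " + threadNameOnDefault());
        } finally {
            pool.shutdown();
        }
    }
}
```

With an explicit pool, the number of threads doing the blocking get() calls is capped at the pool size, and the work no longer competes with everything else in the JVM that shares the common pool.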

Thanks @lhr0909,

We are accepting this improvement. It should ship in about a week, in the dgraph4j v20.03.3 release.

Hi @lhr0909,

I have raised a PR for this. Please have a look if possible: feat: add client constructor with executor (DGRAPH-2746) by abhimanyusinghgaur · Pull Request #161 · dgraph-io/dgraph4j · GitHub

I will merge it and release a new version of the client, if it looks good to you.

Thanks


Hey @abhimanyusinghgaur, the PR looks good. I left a small comment, but it shouldn't be a blocking issue.

Thanks @lhr0909.

I have released version 20.03.3 of the client, which includes the PR. It should be visible on Maven Central within 24 hours.
