Retry Logic Getting Exhausted

I have been reading up on upserts and implemented this for one of our datasets. In this scenario I am trying to upsert a source and target node and then an edge between them. This works perfectly when run in a single thread, but if I run the load in parallel I get “Transaction has been aborted. Please retry” errors.

This is not unexpected, so I added retry logic to my code. But even with retries, when loading 2 million edges in batches of 500 the retry budget (30 attempts, 1000 ms sleep between attempts) eventually gets exhausted.
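One variation worth noting: since every worker sleeps exactly 1000 ms, they may retry in near-lockstep and collide again. A sketch of exponential backoff with jitter instead (commitBatch() is a hypothetical stand-in for the transaction code shown further down; needs java.util.concurrent.ThreadLocalRandom):

private void commitWithBackoff(List<EdgeRecord> batch) throws Exception
{
  long baseDelayMs = 100;
  for (int attempt = 0; ; attempt++)
  {
    try
    {
      commitBatch(batch); // hypothetical: start txn, addEdges, commit
      return;
    }
    catch (Exception e)
    {
      if (attempt >= NUM_RETRIES)
      {
        throw e; // retries exhausted
      }
      // Double the delay each attempt (capped at 10s) and add random jitter
      // so concurrent workers back off on different schedules.
      long delay = Math.min(baseDelayMs << Math.min(attempt, 6), 10_000L);
      Thread.sleep(delay / 2 + ThreadLocalRandom.current().nextLong(delay / 2 + 1));
    }
  }
}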

I have tried many things including:

  • Removing the upsert of the nodes and loading them prior to the edges
  • Creating separate mutations for each record
  • Creating separate mutations for each source/target/edge
  • Playing around with different batch sizes

But each time the retries are eventually exhausted.

The data looks like this:

sourceid|sourcetype|label|targetid|targettype|start|end|date|release_date
/m/01000l5|Artist|Released|/m/011ygnw|Album||||2005-02-22
/m/01000l5|Artist|Released|/m/01n30lt|Album||||2003-09-09
/m/01000l5|Artist|Released|/m/01n30mg|Album||||2005-06
/m/01000l5|Artist|Released|/m/01n30p3|Album||||2004-10-19
/m/01000l5|Artist|Released|/m/01n30sc|Album||||2002-03-01
/m/01000l5|Artist|Released|/m/01n30ss|Album||||2003-05-20
/m/01000l5|Artist|Released|/m/01n30tl|Album||||2006-07-31
/m/01000l5|Artist|Released|/m/01n30v4|Album||||2005-05-12
/m/01000l5|Artist|Released|/m/01n30vh|Album||||2004-06-22
/m/01000l5|Artist|Released|/m/02z72zd|Album||||2006
/m/01000l5|Artist|Released|/m/03cn54r|Album||||
/m/01000l5|Artist|Released|/m/03g205s|Album||||
/m/01000l5|Artist|Released|/m/05mvlyz|Album||||2009-05-19
/m/01000l5|Artist|Released|/m/063_yr1|Album||||2002
/m/01000l5|Artist|Released|/m/07wq9j|Album||||2004
/m/01000l5|Artist|Released|/m/09vm2v|Album||||2004-03-24
/m/01000l5|Artist|Released|/m/0dn4g77|Album||||
/m/01000l5|Artist|Released|/m/0ds3mj|Album||||2006-02-07
/m/01000l5|Artist|Released|/m/0dz66q4|Album||||
/m/01000l5|Artist|Released|/m/0fgk2l4|Album||||
/m/01000l5|Artist|Released|/m/0ft5s0j|Album||||
/m/01000l5|Artist|Released|/m/0g1pwnv|Album||||2011-01-25
/m/01000l5|Artist|Released|/m/0g7tk5y|Album||||
/m/01000l5|Artist|Released|/m/0gfy52|Album||||2006
/m/01000l5|Artist|Released|/m/0nn4vj|Album||||2002-09-24
/m/01000l5|Artist|Released|/m/0pw38x|Album||||2002
/m/01000mq|Artist|Released|/m/01000mq|Album||||
/m/01000mq|Artist|Released|/m/01000pg|Album||||
/m/01000mq|Artist|Released|/m/01k7vwr|Album||||
/m/01001g5|Artist|Released|/m/01001g5|Album||||
/m/01001g5|Artist|Released|/m/01d8_ql|Album||||2005-05-24
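For completeness, the parse step boils down to splitting on the delimiter against the header above; a hypothetical sketch (the EdgeRecord constructor and setters here are illustrative, not the exact API):

private EdgeRecord parseEdgeLine(String line)
{
  // Split with limit -1 so trailing empty columns (start/end/date/...) are kept.
  String[] f = line.split("\\|", -1);
  EdgeRecord rec = new EdgeRecord(); // illustrative
  rec.setFromId(f[0]);    // sourceid,   e.g. /m/01000l5
  rec.setFromLabel(f[1]); // sourcetype, e.g. Artist
  rec.setName(f[2]);      // label,      e.g. Released
  rec.setToId(f[3]);      // targetid
  rec.setToLabel(f[4]);   // targettype
  if (!f[8].isEmpty())
  {
    rec.getProperties().put("release_date", f[8]); // emitted later as an edge facet
  }
  return rec;
}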

And the code:

private static final int BATCH_SIZE = 500;
protected static final int NUM_RECORDS = 2_000_000;
private static final int NUM_RETRIES = 30;

public void process(String line) throws Exception
{
  batch.add(parser.parseEdge(line, AddEdgesBatchRepo.EDGE_COLUMNS));
  if (batch.size() >= BATCH_SIZE)
  {
    int retryNum = 0;
    while (true)
    {
      try
      {
        repo.startTransaction();
        repo.addEdges(batch);
        repo.commit();
        batch.clear();
        break;
      }
      catch (Exception e)
      {
        // Discard the aborted transaction before retrying.
        repo.rollback();
        if (++retryNum > NUM_RETRIES)
        {
          throw e; // retries exhausted
        }
        System.out.println("Retrying Transaction [" + retryNum + "]");
        Thread.sleep(1000);
      }
    }
  }
}
public void addEdges(List<EdgeRecord> edgeRecords) throws Exception
{
  Set<String> idFuncs = new HashSet<>();
  StringBuilder query = new StringBuilder();
  StringBuilder nodeUpsertBuilder = new StringBuilder();
  StringBuilder edgeUpsertBuilder = new StringBuilder();
  query.append("query {\n");
  for (EdgeRecord record : edgeRecords)
  {
    String sourceLabel = escapeValue(record.getFromId()).toString();
    String targetLabel = escapeValue(record.getToId()).toString();
    String sourceId = record.getFromLabel() + ":" + sourceLabel;
    String targetId = record.getToLabel() + ":" + targetLabel;
    String sourceIdFunc = "ID_" + sourceId.replaceAll("[:/]", "_");
    String targetIdFunc = "ID_" + targetId.replaceAll("[:/]", "_");

    // One var block in the query and one set of identity triples per
    // distinct node in the batch.
    if (!idFuncs.contains(sourceIdFunc))
    {
      nodeUpsertBuilder.append("uid(").append(sourceIdFunc).append(") <_stp_id> \"").append(sourceId).append("\" .\n");
      nodeUpsertBuilder.append("uid(").append(sourceIdFunc).append(") <_stp_label> \"").append(sourceLabel).append("\" .\n");
      nodeUpsertBuilder.append("uid(").append(sourceIdFunc).append(") <dgraph.type> \"").append(record.getFromLabel()).append("\" .\n");

      query.append("\tvar(func: eq(<_stp_id>, \"").append(sourceId).append("\")) { ").append(sourceIdFunc).append(" as uid }\n");
      idFuncs.add(sourceIdFunc);
    }

    if (!idFuncs.contains(targetIdFunc))
    {
      nodeUpsertBuilder.append("uid(").append(targetIdFunc).append(") <_stp_id> \"").append(targetId).append("\" .\n");
      nodeUpsertBuilder.append("uid(").append(targetIdFunc).append(") <_stp_label> \"").append(targetLabel).append("\" .\n");
      nodeUpsertBuilder.append("uid(").append(targetIdFunc).append(") <dgraph.type> \"").append(record.getToLabel()).append("\" .\n");

      query.append("\tvar(func: eq(<_stp_id>, \"").append(targetId).append("\")) { ").append(targetIdFunc).append(" as uid }\n");
      idFuncs.add(targetIdFunc);
    }

    // Edge triple, with any record properties attached as facets.
    edgeUpsertBuilder.append("uid(").append(sourceIdFunc).append(") <").append(record.getName()).append("> uid(").append(targetIdFunc).append(")");
    if (!record.getProperties().isEmpty())
    {
      edgeUpsertBuilder.append("(");
      boolean first = true;
      for (String name : record.getProperties().keySet())
      {
        if (!first)
        {
          edgeUpsertBuilder.append(", ");
        }
        edgeUpsertBuilder.append(name).append("=\"").append(record.getProperties().get(name)).append("\"");
        first = false;
      }
      edgeUpsertBuilder.append(")");
    }
    edgeUpsertBuilder.append(" .\n");
  }
  query.append("}\n");

  if (edgeUpsertBuilder.length() > 0)
  {
    Mutation mu = Mutation.newBuilder()
            .setSetNquads(ByteString.copyFromUtf8(nodeUpsertBuilder.append(edgeUpsertBuilder).toString()))
            .build();

    Request request = Request.newBuilder()
            .setQuery(query.toString())
            .addMutations(mu)
            .build();
    doRequest(request);
  }
  else
  {
    rollback();
  }
}
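For completeness, the repo's transaction methods are thin wrappers over the dgraph4j client, roughly like this (a reconstructed sketch, not the exact code; dgraphClient is the dgraph4j client instance):

private Transaction txn; // io.dgraph.Transaction

public void startTransaction()
{
  txn = dgraphClient.newTransaction();
}

public Response doRequest(Request request)
{
  // Executes the upsert block: the query runs first, then the mutation,
  // atomically within the open transaction.
  return txn.doRequest(request);
}

public void commit()
{
  txn.commit();
}

public void rollback()
{
  txn.discard(); // safe to call on an aborted transaction
}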

@MichelDiz any thoughts on this?

Note that I also tried adding @upsert to the appropriate predicates, as well as using hash instead of exact for indexing (according to the docs, I believe these should accomplish the same thing).
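For reference, the @upsert variant of the schema looked roughly like this, applied via the dgraph4j alter API (a sketch, not the exact schema; the _stp_label line is assumed from the mutations above):

Operation op = Operation.newBuilder() // io.dgraph.DgraphProto.Operation
        .setSchema("_stp_id: string @index(hash) @upsert .\n"
                 + "_stp_label: string .")
        .build();
dgraphClient.alter(op);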

Sorry, I’m not a Java developer. I’ll see if there’s anyone available to look at this.

OK, thanks. As noted in other posts, I am evaluating your product (including customer support; if there is a more appropriate channel to direct those queries, let me know) against other databases, so I am looking for solutions or explanations for the issues I encounter. If the problems are on my side, I need to understand what I am doing wrong…

Hi Luke,
We can give you access to our trial support, which is done via a different (dedicated) channel.
I am sending you a private message.
Thanks,

Hi @luke.daugherty, thanks for sharing the above information with us. Please also share your schema with us. Indexing can be one of the reasons for transaction aborts, so you could try inserting records that share the same indexed value within a single transaction and compare your retry counts.
If the insertion is a one-time task, you can try inserting the records without any schema and then changing the schema afterwards, so that indexing is performed after the insertion.
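For example, something along these lines (a sketch using the dgraph4j alter API; note it assumes the load itself is plain inserts that do not rely on the eq() index used by the upsert query):

// Phase 1: load with no index on _stp_id, so no index keys are touched.
Operation noIndex = Operation.newBuilder()
        .setSchema("_stp_id: string .")
        .build();
dgraphClient.alter(noIndex);

// ... run the batched load ...

// Phase 2: add the index (and @upsert) once the bulk insert is done,
// so indexing is performed after the insertion.
Operation withIndex = Operation.newBuilder()
        .setSchema("_stp_id: string @index(hash) @upsert .")
        .build();
dgraphClient.alter(withIndex);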
Let us know your findings.

Note: a trial support ticket has been opened on this topic and is being looked at by Alvin Khaled.
