I have been reading up on upserts and implemented one for a dataset of ours. In this scenario I am trying to upsert a source node and a target node, and then an edge between them. This works perfectly when run in a single thread, but if I run the load in parallel I get “Transaction has been aborted. Please retry” errors.
This is not unexpected, so I added retry logic to my code. But even with the retries, when loading 2 million edges in batches of 500, the retries (30 attempts, 1000 ms sleep between them) eventually get exhausted.
I have tried many things including:
- Removing the upsert of the nodes and loading them prior to the edges
- Creating separate mutations for each record
- Creating separate mutations for each of source/target/edge
- Playing around w/ different batch sizes
But each time the retries are eventually exhausted.
The data looks like this:
sourceid|sourcetype|label|targetid|targettype|start|end|date|release_date
/m/01000l5|Artist|Released|/m/011ygnw|Album||||2005-02-22
/m/01000l5|Artist|Released|/m/01n30lt|Album||||2003-09-09
/m/01000l5|Artist|Released|/m/01n30mg|Album||||2005-06
/m/01000l5|Artist|Released|/m/01n30p3|Album||||2004-10-19
/m/01000l5|Artist|Released|/m/01n30sc|Album||||2002-03-01
/m/01000l5|Artist|Released|/m/01n30ss|Album||||2003-05-20
/m/01000l5|Artist|Released|/m/01n30tl|Album||||2006-07-31
/m/01000l5|Artist|Released|/m/01n30v4|Album||||2005-05-12
/m/01000l5|Artist|Released|/m/01n30vh|Album||||2004-06-22
/m/01000l5|Artist|Released|/m/02z72zd|Album||||2006
/m/01000l5|Artist|Released|/m/03cn54r|Album||||
/m/01000l5|Artist|Released|/m/03g205s|Album||||
/m/01000l5|Artist|Released|/m/05mvlyz|Album||||2009-05-19
/m/01000l5|Artist|Released|/m/063_yr1|Album||||2002
/m/01000l5|Artist|Released|/m/07wq9j|Album||||2004
/m/01000l5|Artist|Released|/m/09vm2v|Album||||2004-03-24
/m/01000l5|Artist|Released|/m/0dn4g77|Album||||
/m/01000l5|Artist|Released|/m/0ds3mj|Album||||2006-02-07
/m/01000l5|Artist|Released|/m/0dz66q4|Album||||
/m/01000l5|Artist|Released|/m/0fgk2l4|Album||||
/m/01000l5|Artist|Released|/m/0ft5s0j|Album||||
/m/01000l5|Artist|Released|/m/0g1pwnv|Album||||2011-01-25
/m/01000l5|Artist|Released|/m/0g7tk5y|Album||||
/m/01000l5|Artist|Released|/m/0gfy52|Album||||2006
/m/01000l5|Artist|Released|/m/0nn4vj|Album||||2002-09-24
/m/01000l5|Artist|Released|/m/0pw38x|Album||||2002
/m/01000mq|Artist|Released|/m/01000mq|Album||||
/m/01000mq|Artist|Released|/m/01000pg|Album||||
/m/01000mq|Artist|Released|/m/01k7vwr|Album||||
/m/01001g5|Artist|Released|/m/01001g5|Album||||
/m/01001g5|Artist|Released|/m/01d8_ql|Album||||2005-05-24
And the code:
private static final int BATCH_SIZE = 500;
protected static final int NUM_RECORDS = 2_000_000;
private static final int NUM_RETRIES = 30;
// Base delay for the first retry; doubled (capped) on each subsequent attempt.
private static final long BASE_BACKOFF_MS = 1000L;

/**
 * Buffers one parsed edge record and, once {@code BATCH_SIZE} records have
 * accumulated, flushes the batch to the repository inside a transaction,
 * retrying up to {@code NUM_RETRIES} times on conflict aborts.
 *
 * @param line one pipe-delimited input record
 * @throws Exception the last transaction failure once retries are exhausted,
 *                   or any parse/interrupt error
 */
public void process(String line) throws Exception
{
    batch.add(parser.parseEdge(line, AddEdgesBatchRepo.EDGE_COLUMNS));
    if (batch.size() >= BATCH_SIZE)
    {
        int retryNum = 0;
        while (true)
        {
            try
            {
                repo.startTransaction();
                repo.addEdges(batch);
                repo.commit();
                batch.clear();
                break;
            }
            catch (Exception e)
            {
                // Discard the aborted transaction before retrying. Without this the
                // failed txn is left open server-side and subsequent attempts keep
                // conflicting on the same keys.
                try
                {
                    repo.rollback();
                }
                catch (Exception ignored)
                {
                    // Best-effort cleanup; the original failure is what matters.
                }
                if (++retryNum > NUM_RETRIES)
                {
                    throw e;
                }
                System.out.println("Retrying Transaction [" + retryNum + "]");
                // Exponential backoff with jitter: parallel loaders that aborted on
                // the same keys must not all wake up simultaneously, or they simply
                // re-conflict and exhaust the retry budget. Cap the doubling so the
                // worst-case sleep stays bounded (~30 s).
                long backoff = Math.min(BASE_BACKOFF_MS << Math.min(retryNum, 5), 30_000L);
                long jitter = java.util.concurrent.ThreadLocalRandom.current().nextLong(backoff / 2 + 1);
                Thread.sleep(backoff / 2 + jitter);
            }
        }
    }
}
/**
 * Builds a single Dgraph upsert request for a batch of edges: one var query
 * block resolving each distinct node's uid by {@code _stp_id}, plus N-Quads
 * that (re)assert every node's identity triples and create each edge (with
 * its properties as facets). Sends the request, or rolls back if the batch
 * produced no edge N-Quads.
 *
 * @param edgeRecords the batch of parsed edges to upsert
 * @throws Exception propagated from the underlying request
 */
public void addEdges(List<EdgeRecord> edgeRecords) throws Exception
{
    Set<String> idFuncs = new HashSet<>();
    StringBuilder query = new StringBuilder("query {\n");
    StringBuilder nodeUpsertBuilder = new StringBuilder();
    StringBuilder edgeUpsertBuilder = new StringBuilder();
    for (EdgeRecord record : edgeRecords)
    {
        String sourceLabel = escapeValue(record.getFromId()).toString();
        String targetLabel = escapeValue(record.getToId()).toString();
        String sourceId = record.getFromLabel() + ":" + sourceLabel;
        String targetId = record.getToLabel() + ":" + targetLabel;
        // Variable names must be valid identifiers: strip the ':' and '/' that
        // appear in ids like "Artist:/m/01000l5".
        String sourceIdFunc = "ID_" + sourceId.replaceAll("[:/]", "_");
        String targetIdFunc = "ID_" + targetId.replaceAll("[:/]", "_");
        // Set.add returns false for duplicates, so each node is declared at most
        // once per batch (single lookup instead of contains()+add()).
        if (idFuncs.add(sourceIdFunc))
        {
            appendNodeUpsert(query, nodeUpsertBuilder, sourceIdFunc, sourceId, sourceLabel, record.getFromLabel());
        }
        if (idFuncs.add(targetIdFunc))
        {
            appendNodeUpsert(query, nodeUpsertBuilder, targetIdFunc, targetId, targetLabel, record.getToLabel());
        }
        edgeUpsertBuilder.append("uid(").append(sourceIdFunc)
                .append(") <").append(record.getName())
                .append("> uid(").append(targetIdFunc).append(")");
        if (!record.getProperties().isEmpty())
        {
            // Edge properties are emitted as facets: <pred> uid(X) (k="v", ...) .
            edgeUpsertBuilder.append("(");
            boolean first = true;
            for (String name : record.getProperties().keySet())
            {
                if (!first)
                {
                    edgeUpsertBuilder.append(", ");
                }
                // NOTE(review): facet values are appended unescaped — a quote in a
                // property value would break the N-Quad; consider escapeValue here.
                edgeUpsertBuilder.append(name).append("=\"").append(record.getProperties().get(name)).append("\"");
                first = false;
            }
            edgeUpsertBuilder.append(")");
        }
        edgeUpsertBuilder.append(" .\n");
    }
    query.append("}\n");
    if (edgeUpsertBuilder.length() > 0)
    {
        // Node triples must precede edge triples so uid() vars reference the
        // same upserted nodes within one mutation.
        Mutation mu = Mutation.newBuilder()
                .setSetNquads(ByteString.copyFromUtf8(nodeUpsertBuilder.append(edgeUpsertBuilder).toString()))
                .build();
        Request request = Request.newBuilder()
                .setQuery(query.toString())
                .addMutations(mu)
                .build();
        doRequest(request);
    }
    else
    {
        // Nothing to mutate; release the transaction the caller opened.
        rollback();
    }
}

/**
 * Declares the upsert variable for one node: a var query line binding the
 * node's uid (matched by {@code _stp_id}) plus the three identity N-Quads.
 */
private void appendNodeUpsert(StringBuilder query, StringBuilder nodeUpsertBuilder,
                              String idFunc, String id, String label, String dgraphType)
{
    nodeUpsertBuilder.append("uid(").append(idFunc).append(") <_stp_id> \"").append(id).append("\" .\n");
    nodeUpsertBuilder.append("uid(").append(idFunc).append(") <_stp_label> \"").append(label).append("\" .\n");
    nodeUpsertBuilder.append("uid(").append(idFunc).append(") <dgraph.type> \"").append(dgraphType).append("\" .\n");
    query.append("\tvar(func: eq(<_stp_id>, \"").append(id).append("\")) { ").append(idFunc).append(" as uid }\n");
}
}