[24785200.409547] Out of memory: Kill process 18677 (dgraph) score 712 or sacrifice child
[24785200.411802] Killed process 18677 (dgraph) total-vm:97476656kB, anon-rss:93715760kB, file-rss:0kB, shmem-rss:0kB
@Test
public void testUpsertAddUsersWithRDFFormatById() throws InterruptedException {
long start = 0;
long end = 10000000L; //
int worker = 10;
int page = 1000;
ThreadPoolExecutor executorService = (ThreadPoolExecutor) Executors.newFixedThreadPool(worker);
for (; start < end; ) {
long taskStart = start;
start += page;
long taskEnd = start;
executorService.execute(() -> {
System.out.println(String.format("start: start:%d, end:%d", taskStart, taskEnd));
List<Long> ids = new ArrayList<>();
for (long i = taskStart; i < taskEnd; i++) {
ids.add(i);
}
upsertAddUsersWithRDFFormatById(ids, taskStart, taskEnd);
System.out.println(String.format("end: start:%d, end:%d", taskStart, taskEnd));
});
}
Thread.sleep(1000000000);
}
public void upsertAddUsersWithRDFFormatById(Collection<Long> ids, long start, long end) {
try {
if (ids.size() < 1) {
return;
}
StringBuilder sbQuery = new StringBuilder();
StringBuilder sbMutation = new StringBuilder();
sbQuery.append("query ");
sbQuery.append("{");
int index = 1;
for (Long id : ids) {
sbQuery.append("user").append(index).append(" as var(func:eq(arith_name, ")
.append(id).append(")){\n");
sbQuery.append("}\n");
sbMutation.append(String.format("uid(%s) <arith_name> \"%s\" .\n", "user" + index, id));
sbMutation.append(String.format("uid(%s) <dgraph.type> \"ARITH_U\" .\n", "user" + index));
index++;
}
sbQuery.append("}");
// 此处的mutation可以使用json写,也可以使用rdf的格式写
// System.out.println(sbQuery.toString());
// System.out.println(sbMutation.toString());
DgraphProto.Mutation mutation = DgraphProto.Mutation
.newBuilder()
.setSetNquads(ByteString.copyFromUtf8(sbMutation.toString()))
.build();
DgraphProto.Request request = DgraphProto.Request
.newBuilder()
.setQuery(sbQuery.toString())
.addMutations(mutation)
.setCommitNow(true)
.build();
Transaction transaction = dgraphClient.newTransaction();
try {
DgraphProto.Response response = transaction.doRequest(request);
System.out.println(response);
} finally {
transaction.discard();
transaction.close();
}
} catch (Exception e) {
if (NestedExceptionUtils.getRootCause(e) instanceof TxnConflictException) {
System.out.printf(String.format("[CONFLICT] start:%d, end:%d, ids:%s\n", start, end, ids), e);
} else {
System.out.printf("unknown error %s\n", e);
e.printStackTrace();
}
try {
Thread.sleep(1000);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
}
}