Hi!
We are currently on 1.0.8 with 3 nodes, all running as replicas with a single Zero instance. They sit behind nginx, using its gRPC proxy, so that the client only deals with one unified endpoint. This has been working for us for the most part.
On write-heavy days, I see many transactions being aborted. Here is our schema:
id: int @index(int) .
name: string @lang @index(term) .
match: [string] @index(term) .
potentialEmployers: [string] .
title: string @lang @index(term) .
group: int @index(int) .
seniority: int @index(int) .
location: geo @index(geo) .
when: dateTime .
knows: uid @reverse .
prospect: uid .
recommended: uid .
applied: uid .
contacted: uid .
hired: uid .
skipped: uid .
favorited: uid .
attached: uid @reverse .
One particular node we update often is one we call profile. It generally has the following data, and when we merge these profiles we often re-insert the same data instead of sending only the difference.
{
  _profile: "", // type system suggested by dgraph docs
  uid: "0xabcd", // dgraph uid
  id: 234, // our id in database
  name: "John Smith",
  match: ["term1", "term2", "term3"],
  title: "Software Developer",
  group: 85,
  seniority: 2,
  ...
}
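To make "sending the difference" concrete, here is a minimal sketch of what that could look like. It is not code we run today: `changedFields` and `NodeData` are hypothetical names, it assumes the currently stored values have already been read back, and it compares values with `JSON.stringify`, which is only adequate for plain scalars and arrays whose order is stable.

```typescript
// Hypothetical shape for a node payload; not part of dgraph-js.
type NodeData = Record<string, unknown>;

// Returns only the fields of `incoming` whose values differ from `existing`.
// Assumption: values are JSON-serializable scalars or arrays with stable order.
function changedFields(existing: NodeData, incoming: NodeData): NodeData {
  const diff: NodeData = {};
  for (const [key, value] of Object.entries(incoming)) {
    if (JSON.stringify(existing[key]) !== JSON.stringify(value)) {
      diff[key] = value;
    }
  }
  return diff;
}

const stored = { id: 234, name: "John Smith", match: ["term1", "term2"], group: 85 };
const update = { id: 234, name: "John Smith", match: ["term1", "term2", "term3"], group: 85 };

// Only `match` differs, so only `match` would need to be sent in the mutation.
console.log(changedFields(stored, update));
```

If the resulting object is empty, the mutation could be skipped entirely.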
Can anyone help me understand why I would be seeing many transaction abortions here? I’d say we update any set of these nodes thousands of times a minute during peak traffic.
Here is the function we use to grab a “base node” that has an ID and type:
/**
 * Given an ID and type, upsert a given node (no conflict), returning the UID
 * NOTE: #1 - Use this for initial creation, then update fields of UID to avoid transaction abortions
 * NOTE: #2 - This doesn't take transaction because it should always be separate
 * @param id
 * @param type
 * @returns {Promise<string>} the UID of the existing or newly created node
 */
async function upsertBaseNode(id: number, type: string): Promise<string> {
  const transaction = dgraphClient.leaseClient().newTxn();
  const nodeUID = await dgraphUtils.getUidByType(id, type, transaction);
  if (nodeUID) {
    Logger.info(`upsertNode(uid=${nodeUID}, type=${type}) EXISTS`);
    await transaction.discard();
    return nodeUID;
  } else {
    const mu = new dgraph.Mutation();
    mu.setSetJson({ id, ['_' + type]: '' });
    const result = await transaction.mutate(mu);
    await transaction.commit();
    const uids = result.getUidsMap();
    const newUID = uids.get('blank-0');
    Logger.info(`upsertNode(uid=${newUID}, type=${type}) CREATED`);
    return newUID;
  }
}
The function used to grab the UID of a node:
async function getUidByType(id: number, type: string, txn?: Object) {
  // NOTE: No need for discard(), query only doesn't need discard(), and if we pass in txn, parent function handles discard
  const transaction = txn || dgraphClient.leaseClient().newTxn();
  const query = `{exists(func: eq(id, ${id})) @filter(has(_${type})) { uid }}`;
  let res;
  try {
    res = await transaction.query(query);
    const result = res.getJson();
    if (result && result.exists && result.exists.length > 0) {
      return Promise.resolve(result.exists[0].uid);
    }
    return Promise.resolve(null);
  } catch (err) {
    Logger.error(`getUidByType(${id}, "${type}") FAILED, ${err.message}`);
    return Promise.reject(err);
  }
}
Finally, our function to merge any node of any type:
/**
 * Generically merge a node with some data
 * @param type
 * @param nodeData
 * @param txn optional; if omitted, a new transaction is created and committed here
 * @returns {Promise<string>} the UID of the merged node
 */
async function mergeNode(type: string, nodeData: { id: number }, txn?: dgraph.Txn) {
  if (!nodeData || !nodeData.id) {
    throw new Error('No ID specified while merging node');
  }
  // Only create a new transaction if we need to
  const typeLogStr = _.capitalize(type);
  const transaction = txn || dgraphClient.leaseClient().newTxn();
  // Always create the node, return immediately if there is nothing but the id
  const nodeUID = await upsertBaseNode(nodeData.id, type);
  const values = _.omit(nodeData, ['id']);
  const definedValues = _.pickBy(values, _.identity);
  if (Object.keys(definedValues).length === 0) {
    Logger.info(`merge${typeLogStr}(id=${nodeData.id}, uid=${nodeUID}) QUICK_RETURN`);
    return nodeUID;
  }
  // Time to update, since we have created the node already, run ignoring index conflict
  const mu = new dgraph.Mutation();
  mu.setIgnoreIndexConflict(true);
  mu.setSetJson({ uid: nodeUID, ...values });
  // Run the query and grab the UID used
  try {
    await transaction.mutate(mu);
    if (!txn) await transaction.commit();
    Logger.info(`merge${typeLogStr}(uid=${nodeUID}, id=${nodeData.id}) COMMITTED`);
  } catch (err) {
    Logger.error(`merge${typeLogStr}(uid=${nodeUID}, id=${nodeData.id}) FAILED, ${err.message}`);
    return Promise.reject(err);
  } finally {
    if (!txn) await transaction.discard(); // if commit is successful, discard() is a no-op
  }
  // Return the UID
  return nodeUID;
}
As far as I can tell, this is not related to nginx. We allow 8000 connections per worker, and the worker count is set to auto, so it equals the number of CPUs (roughly 32), which gives us about 256,000 max connections. As far as I know, the gRPC proxy itself does not deal with transactions; that is a Dgraph concept.
We already have a retry strategy that tries up to 10 times (increasing the delay by 100ms after each failure), and we are still seeing many transactions aborted at the end of 10 tries.
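For reference, a retry strategy of this shape can be sketched as below. This is a minimal illustration rather than our production code: `withRetry`, `Sleep`, and the parameter names are hypothetical, and the `operation` callback is assumed to open a fresh transaction on every attempt, since an aborted transaction cannot be reused.

```typescript
// Hypothetical sleep signature, injectable so the delay can be stubbed in tests.
type Sleep = (ms: number) => Promise<void>;

const defaultSleep: Sleep = (ms) => new Promise<void>((resolve) => setTimeout(resolve, ms));

// Retries `operation` up to `maxAttempts` times, waiting attempt * stepMs
// (100ms, 200ms, 300ms, ...) between failed attempts.
async function withRetry<T>(
  operation: () => Promise<T>,
  maxAttempts = 10,
  stepMs = 100,
  sleep: Sleep = defaultSleep,
): Promise<T> {
  let lastError: unknown;
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      return await operation();
    } catch (err) {
      lastError = err;
      // No point sleeping after the final failed attempt.
      if (attempt < maxAttempts) {
        await sleep(attempt * stepMs);
      }
    }
  }
  // All attempts failed: surface the last error to the caller.
  throw lastError;
}
```

A call would look like `withRetry(() => mergeNode('profile', data))`, with `mergeNode` called without a `txn` argument so that each attempt creates its own transaction.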
In general, I would like to better understand how writes work with Dgraph and Badger: how they behave with transactions, without transactions, and so on. Do we need to queue writes ourselves? For example, when we insert a record into psql, other records can be written to the same table simultaneously. Are we correct in assuming we can do this with Dgraph as well?
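To clarify what "queue writes ourselves" would mean in practice, here is a minimal sketch of serializing writes per node inside a single Node.js process. It is not something we run today: `enqueue` and `tails` are hypothetical names, and it would not coordinate writers across multiple processes or machines.

```typescript
// Tail of the pending promise chain for each key (for example, a node UID).
const tails = new Map<string, Promise<unknown>>();

// Runs `task` only after every previously enqueued task for `key` has settled,
// so two writes to the same node never overlap within this process.
function enqueue<T>(key: string, task: () => Promise<T>): Promise<T> {
  const previous = tails.get(key) ?? Promise.resolve();
  // Start the task whether the previous one succeeded or failed.
  const next = previous.then(task, task);
  tails.set(key, next);
  // Drop the entry once the chain is idle so the map does not grow forever.
  const cleanup = () => {
    if (tails.get(key) === next) tails.delete(key);
  };
  next.then(cleanup, cleanup);
  return next;
}
```

With this, `enqueue(uid, () => mergeNode('profile', data))` would let writes to different UIDs proceed concurrently while writes to the same UID run one at a time.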
Any advice would be greatly appreciated.