I just finished writing a script to sync data between our Dgraph service and another database.
Along the way, I needed to throttle the outgoing GraphQL requests to keep the Dgraph server from running out of memory (OOM) under so many reads/writes per millisecond.
In essence this is like a pool of available connections, except that each connection is stateless, because that is how GraphQL over HTTP works. Instead of awaiting each response and releasing the connection back into a pool, I decided to throttle the throughput. This means more than `limit` requests can be in flight at any given time, but no more than `limit` can be sent in a single burst; after that, only `addAmount` more requests are allowed every `addRate` seconds.
This throttling method is essentially the “token bucket” algorithm.
I found that Dgraph handles many requests well as long as they are spread out a little. Sending them all at full force runs it out of memory in some configurations.
Here is what I came up with:
const dotenv = require('dotenv')
const fetch = require('node-fetch')
// Populate process.env from a .env file, then pull out the Dgraph settings.
dotenv.config()
const { dgraph_endpoint, dgraph_token } = process.env
// Token-bucket rate limiter: each request spends one token, and the bucket
// is refilled with `addAmount` tokens every `addRate` seconds, capped at `limit`.
const addRate = 1 // every x seconds
const addAmount = 10 // make this many more available
const limit = 20 // not more than this limit at once though
let limiter = limit // tokens currently available; the bucket starts full

/**
 * Try to take one token from the bucket.
 * @returns {boolean} true if a token was taken, false if the bucket is empty.
 */
const throttle = () => {
  if (limiter <= 0) return false
  limiter -= 1
  return true
}

/**
 * Add `n` tokens to the bucket without exceeding `limit`.
 * @param {number} n - number of tokens to add.
 */
const addToBucket = (n) => {
  limiter = Math.min(limit, limiter + n)
}

// Refill periodically. unref() the timer so that it alone does not keep the
// Node process alive once all requests are done; otherwise the script
// never exits.
const refillTimer = setInterval(() => addToBucket(addAmount), addRate * 1000)
if (typeof refillTimer.unref === 'function') refillTimer.unref()
/**
 * POST a GraphQL request to the Dgraph endpoint. If the rate limiter has
 * no token available, it first waits for one, so requests are spread out
 * over time.
 *
 * @param {string} query - GraphQL query/mutation document.
 * @param {object} [variables] - GraphQL variables; `{}` when omitted.
 * @param {string} [authToken] - value for the auth header; falls back to the
 *   `dgraph_token` environment setting.
 * @returns {Promise<object>} the response's `data` field, or `{}` if absent.
 * @throws {Error} when the response contains `errors`; the message is a JSON
 *   dump of `[errors, query, variables]`.
 */
const graphql = async (query, variables, authToken) => {
  if (!throttle()) {
    // No token available: poll every 100 ms until one frees up. Each waiter
    // polls independently, so waiters are not necessarily served in order.
    await new Promise((resolve) => {
      const poll = setInterval(() => {
        if (throttle()) {
          clearInterval(poll)
          resolve()
        }
      }, 100)
    })
  }

  const res = await fetch(dgraph_endpoint, {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      // Placeholder: set this to the auth header your Dgraph deployment expects.
      '<your-header-key-here>': authToken || dgraph_token,
    },
    body: JSON.stringify({
      query,
      variables: variables || {},
    }),
  })
  const json = await res.json()
  if (json.errors) {
    throw new Error(JSON.stringify([json.errors, query, variables]))
  }
  return json.data || {}
}
Then you can use it like this:
const promises = []
// a loop that would be replaced by rows from a table for example
for (let i = 0; i < 100; i++) {
promises.push(
graphql(
`query ($ids: [ID!]) { queryType(filter: { id: $ids }) { id, fields }
}`,
{ ids: ['0x2','0x3','0x4'] }
)
.then(data => {
// do something with `data.queryType` array like compare with rows from a table and do inserts/updates/mutations
})
)
}
return Promise.all(promises)