GraphQL Throttler Script

Just finished a process of writing a script to sync data between our Dgraph service and another database.

In the process I needed to throttle the GraphQL requests that were coming across to keep the Dgraph server from going OOM with so many reads/writes per ms.

In essence this is like a pool of available connections, but each connection is stateless because that is how GraphQL works. I decided to throttle the throughput instead of actually awaiting the response and releasing back into a pool. So this allows more than the limit of connections to be open at any given time but not more than the limit sent in any given ms.

I think this throttling method is called a “token bucket algorithm”.

I found that Dgraph does well with receiving many requests if they can just be spread out a tiny little bit. Going at it full force throws it into OOM for some configurations.

Here is what I came up with:

const dotenv = require('dotenv')
const fetch = require('node-fetch')

dotenv.config()

const dgraph_endpoint = process.env.dgraph_endpoint
const dgraph_token = process.env.dgraph_token

let limiter = 20
const addRate = 1 // every x seconds
const addAmount = 10 // make this many more available
const limit = 20 // not more than this limit at once though
const throttle = () => limiter > 0 && (limiter-- || true)
const addToBucket = (n) => {
  limiter = limiter + n > limit ? limit : limiter + n
}
setInterval(() => addToBucket(addAmount), addRate * 1000)
const graphql = async (query, variables, authToken) => {
  const q = () => fetch(dgraph_endpoint,{
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      '<your-header-key-here>': authToken || token,
    },
    body: JSON.stringify({
      query,
      variables: variables || {},
    }),
  })
    .then((res) => res.json())
    .then((res)) => {
      if (res.errors) {
        throw Error(JSON.stringify([res.errors, query, variables]))
      }
      return res.data || {}
    })
  return throttle() ? q() : new Promise((r) => {
    const t = setInterval(() => throttle() && (clearInterval(t) || r()), 100)
  }).then(() => q())
}

Then you can use it like:

const promises = []
// a loop that would be replaced by rows from a table for example
for (let i = 0; i < 100; i++) {
  promises.push(
    graphql(
      `query ($ids: [ID!]) { queryType(filter: { id: $ids }) { id, fields }
      }`,
      { ids: ['0x2','0x3','0x4'] }
    )
      .then(data => {
        // do something with `data.queryType` array like compare with rows from a table and do inserts/updates/mutations
      })
  )
}
return Promise.all(promises)
3 Likes