[DGraph Cluster] 600 ms vs 6 s: Query response times differ greatly depending on which shard executes the query

Hello there!

I have a DGraph cluster deployment using 1 Zero and 6 Alphas, split into 2 shards with 3 replicas each. All of them run on separate VMs (gcloud n1-standard-4: 4 vCPUs, 15 GB memory, and an SSD).

Here is my compose file:

version: "3.3"

services:
  dgraphzero:
    image: dgraph/dgraph:v21.03.2
    command: dgraph zero --my=dgraphzero:5080 --replicas 3 --telemetry "sentry=false"
    ports:
      - "6080"
      - "5080"
    deploy:
      placement:
        constraints:
          - "node.labels.graphdb1==true"
  dgraphalpha1:
    image: dgraph/dgraph:v21.03.2
    command: dgraph alpha --my=dgraphalpha1:7080 --zero=dgraphzero:5080 -o 0 --security whitelist=0.0.0.0/0 --telemetry "sentry=false" --limit "query-edge=1000000"
    ports:
      - "9080"
    deploy:
      placement:
        constraints:
          - "node.labels.graphdbsupport1==true"
  dgraphalpha2:
    image: dgraph/dgraph:v21.03.2
    command: dgraph alpha --my=dgraphalpha2:7081 --zero=dgraphzero:5080 -o 1 --security whitelist=0.0.0.0/0 --telemetry "sentry=false" --limit "query-edge=1000000"
    ports:
      - "9081"
    deploy:
      placement:
        constraints:
          - "node.labels.graphdbsupport2==true"
  dgraphalpha3:
    image: dgraph/dgraph:v21.03.2
    command: dgraph alpha --my=dgraphalpha3:7082 --zero=dgraphzero:5080 -o 2 --security whitelist=0.0.0.0/0 --telemetry "sentry=false" --limit "query-edge=1000000"
    ports:
      - "9082"
    deploy:
      placement:
        constraints:
          - "node.labels.graphdbsupport3==true"
  dgraphalpha4:
    image: dgraph/dgraph:v21.03.2
    command: dgraph alpha --my=dgraphalpha4:7083 --zero=dgraphzero:5080 -o 3 --security whitelist=0.0.0.0/0 --telemetry "sentry=false" --limit "query-edge=1000000"
    ports:
      - "9083"
    deploy:
      placement:
        constraints:
          - "node.labels.graphdbsupport4==true"
  dgraphalpha5:
    image: dgraph/dgraph:v21.03.2
    command: dgraph alpha --my=dgraphalpha5:7084 --zero=dgraphzero:5080 -o 4 --security whitelist=0.0.0.0/0 --telemetry "sentry=false" --limit "query-edge=1000000"
    ports:
      - "9084"
    deploy:
      placement:
        constraints:
          - "node.labels.graphdbsupport5==true"
  dgraphalpha6:
    image: dgraph/dgraph:v21.03.2
    command: dgraph alpha --my=dgraphalpha6:7085 --zero=dgraphzero:5080 -o 5 --security whitelist=0.0.0.0/0 --telemetry "sentry=false" --limit "query-edge=1000000"
    ports:
      - "9085"
    deploy:
      placement:
        constraints:
          - "node.labels.graphdbsupport6==true"

networks:
  default:
    attachable: true
    driver: overlay

The issue
I send a query to the cluster using the dgo Go client, which randomly picks one alpha to issue the request to. Depending on which shard the selected alpha belongs to, the response time differs immensely: 6 seconds versus 600 ms.

Simplified: if the predicate that I @filter on is hosted on the remote shard, response times take a nosedive.
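
For reference, the timing is measured roughly like this (a sketch, not the exact benchmark code; pointing it at an alpha from each shard in turn reproduces the two numbers):

// Sketch (not the exact benchmark code): time one query against a single
// alpha; point it at an alpha from each shard to compare the two cases.
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/dgraph-io/dgo/v210"
	"github.com/dgraph-io/dgo/v210/protos/api"
	"google.golang.org/grpc"
)

func main() {
	conn, err := grpc.Dial("dgraphalpha1:9080", grpc.WithInsecure())
	if err != nil {
		log.Fatalf("dial: %v", err)
	}
	client := dgo.NewDgraphClient(api.NewDgraphClient(conn))

	// Shortened version of the query shown below; the real one ORs ~250 uuids.
	const query = `{ q(func: eq(o.tenant, "TheOneAndOnly")) @filter(eq(o.uuid, "a") OR eq(o.uuid, "b")) { expand(_all_) } }`

	txn := client.NewReadOnlyTxn()
	defer txn.Discard(context.Background())

	start := time.Now()
	resp, err := txn.Query(context.Background(), query)
	if err != nil {
		log.Fatalf("query: %v", err)
	}
	fmt.Printf("%d bytes in %s\n", len(resp.Json), time.Since(start))
}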

The schema
I have the following schema as an example to reproduce the issue

type Object {
  o.tenant
  o.color
  o.shape
  o.material
  o.city
  o.name
  o.beer
  o.cc
  o.email
  o.year
  o.uuid
  o.hip
  o.country
  o.animal
  o.word
}

type Material {
  m.name
}

o.tenant: string @index(exact) .
o.color: string @index(exact) .
o.shape: string @index(exact) .
o.city: string @index(exact) .
o.name: string @index(exact) .
o.year: int @index(int) .
o.beer: string @index(exact) .
o.cc: string @index(exact) .
o.email: string @index(exact) .
o.hip: string @index(exact) .
o.material: uid .
o.uuid: string @index(exact) .
m.name: string @index(term) .
o.country: string @index(exact) .
o.animal: string @index(exact) .
o.word: string @index(exact) .

Some predicates have low cardinality (o.tenant), some have medium (o.country) or high (o.year) cardinality, and o.uuid is unique. In total, I provision 500,000 objects.

The query
The query is not ideal for a graph database, I admit. However, one of our use cases requires it: we select 250 objects by their ID that belong to a specific tenant.

query q(){
	q(func: eq(o.tenant, "TheOneAndOnly")) @filter(eq(o.uuid, "a") OR eq(o.uuid, "b") OR eq(o.uuid, "c") OR eq(o.uuid, "d") OR *.... x250* ){
		expand(_all_)
	}
}

Let’s say the

  • o.tenant predicate is hosted on shard 1
  • o.uuid predicate is hosted on shard 2

If the request is sent to shard 1: ~6 seconds
If the request is sent to shard 2: ~600 ms

I assume this is due to how filters are executed
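
To verify which group actually serves o.tenant and o.uuid, Zero exposes a /state endpoint on port 6080 that lists the tablets per group. A minimal sketch in Go (the JSON layout is what I'd expect from v21.03; treat the field names as assumptions and adjust to your actual /state output):

// Sketch: print which group (shard) currently serves each predicate, using
// Zero's /state endpoint. The "groups" -> "tablets" layout is assumed from
// v21.03; adjust the struct if your /state output differs.
package main

import (
	"encoding/json"
	"fmt"
	"log"
	"net/http"
)

type zeroState struct {
	Groups map[string]struct {
		Tablets map[string]json.RawMessage `json:"tablets"`
	} `json:"groups"`
}

func main() {
	resp, err := http.Get("http://dgraphzero:6080/state")
	if err != nil {
		log.Fatalf("fetching /state: %v", err)
	}
	defer resp.Body.Close()

	var state zeroState
	if err := json.NewDecoder(resp.Body).Decode(&state); err != nil {
		log.Fatalf("decoding /state: %v", err)
	}
	for group, g := range state.Groups {
		for predicate := range g.Tablets {
			fmt.Printf("group %s serves %s\n", group, predicate)
		}
	}
}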

Expectation
Same response time no matter which alpha in the cluster receives the request

For completeness, the provisioning code:

package main

import (
	"context"
	"encoding/json"
	"fmt"
	"github.com/alexflint/go-arg"
	"github.com/brianvoe/gofakeit/v6"
	"github.com/dgraph-io/dgo/v210"
	"github.com/dgraph-io/dgo/v210/protos/api"
	"github.com/google/uuid"
	"google.golang.org/grpc"
	"math/rand"
	"os"
	"strings"
)

var args struct {
	QueryHost string `arg:"-t" default:"dgraphalpha1:9080"`
}

func main() {
	arg.MustParse(&args)

	hostList := []string{"dgraphalpha1:9080",
		"dgraphalpha2:9081",
		"dgraphalpha3:9082",
		"dgraphalpha4:9083",
		"dgraphalpha5:9084",
		"dgraphalpha6:9085"}

	var connections []api.DgraphClient
	for _, host := range hostList {
		grpcConn, err := grpc.Dial(host, grpc.WithInsecure())
		if err != nil {
			fmt.Printf("error connecting to host %s: %s\n", host, err.Error())
			os.Exit(1)
		}
		connections = append(connections, api.NewDgraphClient(grpcConn))
	}
	client := dgo.NewDgraphClient(connections...)
	provision(client)
}

func provision(client *dgo.Dgraph) {
	schema := `type Object {
			o.tenant
			o.color
			o.shape
			o.material
			o.city
			o.name
			o.beer
			o.cc
			o.email
			o.year
			o.uuid
			o.hip
			o.country
			o.animal
			o.word
		}
		type Material {
			m.name
		}

		o.tenant: string @index(exact) .
		o.color: string @index(exact) .
		o.shape: string @index(exact) .
		o.city: string @index(exact) .
		o.name: string @index(exact) .
		o.year: int @index(int) .
		o.beer: string @index(exact) .
		o.cc: string @index(exact) .
		o.email: string @index(exact) .
		o.hip: string @index(exact) .
		o.material: uid .
		o.uuid: string @index(exact) .
		m.name: string @index(term) .
		o.country: string @index(exact) .
		o.animal: string @index(exact) .
		o.word: string @index(exact) .
	`

	if err := client.Alter(context.Background(), &api.Operation{
		Schema:          schema,
		RunInBackground: false,
	}); err != nil {
		fmt.Printf("error creating schema: %s\n", err.Error())
		os.Exit(1)
	}
	fmt.Println("Provisioning")
	materialNQuads := []byte(`_:material1 <m.name> "steel" .
		_:material2 <m.name> "paper" .
		_:material3 <m.name> "wood" .
	`)
	txn := client.NewTxn()
	resp, err := txn.Mutate(context.Background(), &api.Mutation{
		SetNquads: materialNQuads,
		CommitNow: true,
	})
	if err != nil {
		fmt.Printf("error committing materials: %s\n", err.Error())
		os.Exit(1)
	}
	material = append(material, resp.Uids["material1"])
	material = append(material, resp.Uids["material2"])
	material = append(material, resp.Uids["material3"])

	for i := 0; i < 500; i++ {

		func() {
			nQuadBatch := generateBatch(1000)
			txn := client.NewTxn()
			ctx := context.Background()
			//defer txn.Discard(ctx)
			if _, err := txn.Mutate(ctx, &api.Mutation{
				SetNquads: nQuadBatch,
				CommitNow: true,
			}); err != nil {
				fmt.Printf("error comitting batch %d: %s\n", i, err.Error())
			} else {
				fmt.Println("batch complete")
			}
		}()
	}
	fmt.Println("Done")
}

var colors = []string{"red", "yellow", "green", "blue", "purple", "orange"}
var shapes = []string{"square", "circle", "triangle"}
var material []string

func generateBatch(size int) []byte {
	var nQuads string
	for i := 0; i < size; i++ {
		nQuads = nQuads + fmt.Sprintf("_:object_%d <o.tenant> \"%s\" . \n", i, "TheOneAndOnly")
		nQuads = nQuads + fmt.Sprintf("_:object_%d <o.color> \"%s\" . \n", i, getRandomString(colors))
		nQuads = nQuads + fmt.Sprintf("_:object_%d <o.shape> \"%s\" . \n", i, getRandomString(shapes))
		nQuads = nQuads + fmt.Sprintf("_:object_%d <o.material> <%s> . \n", i, material[0])
		nQuads = nQuads + fmt.Sprintf("_:object_%d <o.city> \"%s\" . \n", i, gofakeit.City())
		nQuads = nQuads + fmt.Sprintf("_:object_%d <o.name> \"%s\" . \n", i, gofakeit.Name())
		nQuads = nQuads + fmt.Sprintf("_:object_%d <o.year> \"%d\" . \n", i, gofakeit.Year())
		nQuads = nQuads + fmt.Sprintf("_:object_%d <o.beer> \"%s\" . \n", i, gofakeit.BeerName())
		nQuads = nQuads + fmt.Sprintf("_:object_%d <o.hip> \"%s\" . \n", i, gofakeit.HipsterWord())
		nQuads = nQuads + fmt.Sprintf("_:object_%d <o.country> \"%s\" . \n", i, gofakeit.Country())
		nQuads = nQuads + fmt.Sprintf("_:object_%d <o.animal> \"%s\" . \n", i, gofakeit.Animal())
		nQuads = nQuads + fmt.Sprintf("_:object_%d <o.word> \"%s\" . \n", i, gofakeit.Word())
		nQuads = nQuads + fmt.Sprintf("_:object_%d <o.cc> \"%s\" . \n", i, gofakeit.CreditCard())
		nQuads = nQuads + fmt.Sprintf("_:object_%d <o.email> \"%s\" . \n", i, gofakeit.Email())
		nQuads = nQuads + fmt.Sprintf("_:object_%d <o.uuid> \"%s\" . \n", i, uuid.NewString())
		nQuads = nQuads + fmt.Sprintf("_:object_%d <dgraph.type> \"Object\" . \n", i)

	}
	return []byte(nQuads)
}
func getRandomString(src []string) string {
	return src[rand.Intn(len(src))]
}

Any tips to improve my schema are also appreciated. It should be noted that in the real schema, we have other types with relationships between each other as well as relationships to objects, hence the need for a graph. The above is a reduced version to show the issue with a specific query.

Thank you so much!

Is it possible to construct the query with multiple query blocks? Like this:

query q() {
	TENANT as var(func: eq(o.tenant, "TheOneAndOnly"))
	U001 as var(func: eq(o.uuid, "a"))
	U002 as var(func: eq(o.uuid, "b"))
	U003 as var(func: eq(o.uuid, "c"))
	...
	q(func: uid(U001,U002,U003)) @filter(uid(TENANT)) {
		expand(_all_)
	}
}

Query blocks run in parallel afaik. I'm interested in the difference in performance, and whether the same issue occurs.
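
Typing 250 blocks by hand isn't practical, so the query would probably be generated, roughly like this (illustrative sketch only; buildQuery is a made-up helper, not part of dgo):

// Illustrative sketch: generate the multi-block query from a slice of UUIDs,
// one var block per UUID, then merge them with uid() and filter by tenant.
package main

import (
	"fmt"
	"strings"
)

func buildQuery(tenant string, uuids []string) string {
	var b strings.Builder
	b.WriteString("query q() {\n")
	fmt.Fprintf(&b, "\tTENANT as var(func: eq(o.tenant, %q))\n", tenant)

	names := make([]string, 0, len(uuids))
	for i, u := range uuids {
		name := fmt.Sprintf("U%03d", i+1)
		names = append(names, name)
		fmt.Fprintf(&b, "\t%s as var(func: eq(o.uuid, %q))\n", name, u)
	}

	fmt.Fprintf(&b, "\tq(func: uid(%s)) @filter(uid(TENANT)) {\n", strings.Join(names, ","))
	b.WriteString("\t\texpand(_all_)\n\t}\n}\n")
	return b.String()
}

func main() {
	fmt.Println(buildQuery("TheOneAndOnly", []string{"a", "b", "c"}))
}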


Hey @Benjamin_Rupp, I'd like to replicate this setup; it seems like a good harness.

When you say ‘separate VMs’ above, are you saying there are seven (7) n1-standard VMs in your setup, each running Dgraph (1 zero and 6 alphas)? If so, I wasn't aware gcloud could do that from a docker-compose yaml. (I'm not too familiar with gcloud.)

@matthewmcneely Thank you for checking this out.

We install Docker Swarm on the gcloud instances once they are up, label the nodes, and then run the compose.

We use Terraform for that, but I'm on vacation for a week now, so I don't have access to the Terraform config right now.

We gave the multi-block query a try and it is much faster and more consistent!

The numbers:

filter with 250x OR eq(o.uuid, "a")

UUID on remote shard

  • minimum: ~5.5 sec
  • average: ~6 sec
  • maximum: 8 sec

UUID on local shard

  • minimum: 115 ms
  • average: ~120 ms
  • maximum: 155 ms

filter with 250 query blocks

UUID on remote shard

  • minimum: 55 ms
  • average: ~60 ms
  • maximum: 70 ms

UUID on local shard

  • minimum: 50 ms
  • average: ~55 ms
  • maximum: 60 ms

The approach with query blocks executed in parallel is working great!

Thank you!
