Dgraph can't horizontally scale write speeds

@MichelDiz,

My comments about horizontal scaling still hold, I think; I will run tests on an HA setup to verify my theory.
The fact that a given predicate always goes to the same Alpha group, and that every write needs majority consensus, means writes to the same predicate can’t be scaled.

The reason for me to open this discussion thread was to understand, in theory, how Dgraph can horizontally scale writes to the same predicate. Could you please explain how, and whether, it can be scaled?

About testing with a single Alpha + Zero, I think that went off topic. I will open a new discussion post if needed to discuss it further. I shared these numbers because @marvin-hansen was interested in what numbers I had. The JanusGraph setup was the same: a single JanusGraph + Cassandra + ES setup. And when I ran Dgraph, there were no limits set on the Docker containers. But yeah, I will do these tests on an HA setup, since HA is what gets deployed in production, not a single Alpha + Zero.

That’s right. The write speeds don’t get better by adding more Alphas to the same group. They are replicas, so the same data has to be copied (done via Raft proposals).

Instead, you should try to:

  1. Do more upserts concurrently.
  2. Do more writes per upsert (if possible).
  3. If V1 and V2 don’t already exist, other DBs make you create them first, then connect them. Dgraph can do all of that in one mutation (create and connect) and avoid upserts entirely. That might or might not be applicable in your case.

@mrjn, Thanks for the clarification

I understand points 2 and 3.
I have a few clarifications about 1.

Let’s say I have a predicate xid on person.
If I want to add a lot of person vertices, concurrent upserts will help because:

  1. There is no global lock. So all txns proceed without blocking each other.
  2. Txns are resolved by Zero, to either commit or abort (aborted if the txns conflict, committed otherwise).

Is this the reason as to why concurrent upserts would scale well?

Yeah, no global lock. Queries / mutations can all work concurrently. 2 is correct as well.
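A minimal sketch of point 1 (many upserts in flight at once), with the actual dgo transaction replaced by a stub so the fan-out pattern is runnable without a cluster; the worker count and function names are illustrative, not from the thread:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// runConcurrentUpserts fans n upserts out over the given number of
// workers. upsert stands in for one dgo transaction (query for the
// xid, insert the person if absent). Returns how many succeeded.
func runConcurrentUpserts(n, workers int, upsert func(xid string) error) int64 {
	xids := make(chan string, workers)
	var done int64
	var wg sync.WaitGroup

	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for xid := range xids {
				// No global lock: workers never block each other.
				// Zero aborts only genuinely conflicting txns, so a
				// real caller would retry when upsert returns an
				// "aborted" error.
				if upsert(xid) == nil {
					atomic.AddInt64(&done, 1)
				}
			}
		}()
	}

	for i := 0; i < n; i++ {
		xids <- fmt.Sprintf("person-%d", i)
	}
	close(xids)
	wg.Wait()
	return done
}

func main() {
	n := runConcurrentUpserts(1000, 8, func(string) error { return nil })
	fmt.Println(n, "upserts completed")
}
```

The throughput gain comes purely from keeping many transactions in flight; each one still pays the same Raft cost individually.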

@vikram

Just adding more Alphas doesn’t do terribly much, and I guess you already figured that out. You have to add a load balancer to dispatch between Alphas, and then you see better performance whenever you increase the concurrent workload. At least that is what I see in my cluster.

Txn => 88,000 upserts performed sequentially.
=> each txn searched for V1 and V2, then added an edge between the two: V1 -> V2

Are V1 & V2 indexed?
Are you using the has operator, or equality operations, when adding edges?

It might be possible that the search operation does something equivalent to a full table scan on non-indexed data; that would explain a lot, and is most likely unrelated to scaling.

When I tested the above scenario, I didn’t do concurrent upserts,
which means neither adding Alphas nor an LB would have helped.
I did some more tests comparing sequential upserts vs concurrent upserts:

  • janusgraph (backed by cassandra) has better sequential performance
  • dgraph easily beats janusgraph with concurrent upserts.

Dgraph can handle really high concurrent write QPS, but once we reach those limits, adding more Alphas, even with an LB in front, will NOT increase the write QPS.

However, once predicate splitting is supported (splitting a predicate across multiple groups), we might be able to scale write speeds by splitting the same predicate into multiple groups. But this could introduce a new bottleneck by increasing network calls between Alpha groups to serve the same predicate.

I said might because it depends on how the predicate splitting feature is implemented, and on the increased network calls. @mrjn

Would you please elaborate on your reasoning behind the conclusion that writes don’t actually scale with nodes?

I haven’t had time to do pressure testing to hit whatever limits might exist, but Dgraph’s concurrent write throughput, from my observation, is pretty good. At least 100X better compared to what we had before, so in this department I really have nothing to complain about.

However, I would love to know why the load-balanced solution works so much faster and better than the unbalanced HA deployment?

TLDR:

  • writes need to go through the Raft consensus process
  • a given Alpha in a group can NOT act/decide/do anything independently or on its own

Here’s the detailed explanation

For the purpose of this discussion, we assume we will not make any change to the hardware, i.e. we are not vertically scaling.

Let’s assume the following

  • Alpha group1 has N Alphas, and stores predicate P, on given hardware
  • we max out at W writes per second with this setup.

Now, let’s say we add n Alphas to the same group, i.e. we end up with N+n Alphas in group1.
After this change is done, writes per second will not increase, i.e. W will not increase.

How writes happen

  • each Alpha group is a Raft group with 1 leader and (N+n-1) followers
  • each of the Alphas stores a copy of the same predicate P, i.e. they are all replicas
  • to complete a given write w, a proposer needs to propose it to the rest of the Alphas in the same group
  • this proposal needs to be replicated on a majority of the Alphas (this is the Raft consensus algorithm)
  • i.e. (N+n)/2 + 1 Alphas need to ack the write

What does all this mean?

  • any/all writes need to go through the Raft process to complete
  • simply adding a new Alpha only increases the number of followers that need to ack writes

@vikram

Where do you get the (N+n)/2 + 1 from?

By definition, Raft only requires 2n + 1 nodes to cope with n failing nodes, so how do you arrive at that?

For the scaling, after having read the somewhat updated design / concept docs, I noticed that:

  1. Lockless transactions don’t need distributed lock management, and I believe this explains the very high throughput I see when load balancing.

  2. Sequential consistency preserves total order, as opposed to the weaker partial order, and thus gives much stronger concurrency guarantees than, say, causal consistency. That seems to be a direct consequence of the chosen timestamp-based isolation level.

  3. Snapshot Isolation with write conflict resolution either aborts or commits a write transaction, which again, ensures cluster-wide consistency.

Based on these few bits, I can’t really determine why, by design or theory, write ops can’t scale with the number of nodes, assuming a fast network is available to handle the write ops and the Raft overhead.

There aren’t terribly many alternatives to Raft. The most useful I have found is called MultiRaft, which brings order by partitioning Raft groups to reduce the cluster-internal overhead required to scale further. And it seems to work, because MultiRaft was eventually merged into the storage engine of another DB.

However, what really bugs me is that the underlying Badger can do anywhere between 70k and 135k Put ops/sec and up to 200k Get ops/sec, but we don’t see anything like this in Dgraph.

According to Raft, a majority needs to ack writes, and one of them must be the leader.

If you look at my example, I have assumed:
An Alpha group has N nodes, and n new nodes are added.

Let’s say N = 3.
(N / 2) + 1 = 2 (assuming integer division), i.e. writes need to be acked by at least 2 nodes, including the leader.

Let’s say I add 2 nodes to group 1, so n = 2.
The majority would be (N + n) / 2 + 1 = (3 + 2) / 2 + 1 = 3, i.e. writes need to be acked by one extra Alpha, including the leader.

If the total number of nodes in a group is (N + n), then the majority is (N + n)/2 + 1.

So irrespective of the number of nodes added to an Alpha group, a majority needs to ack the writes, including the leader, which becomes the bottleneck for horizontal scaling. I.e. the leader will need to be vertically scaled, since every write must go through the leader.

Raft says 2n + 1 because that guarantees an odd number of nodes,
and such a cluster can cope with the failure of n nodes, i.e. of fewer than half the nodes.

The n in my example isn’t the same as the n in the Raft paper; if it makes things easier, I will change my formula to
(N+x)/2 + 1

Where
N is the original size of the cluster = 3
x is the number of new nodes added = 2
the majority for this cluster of size 5 is 5/2 + 1 = (3+2)/2 + 1 = (N+x)/2 + 1 = 3

Hope my formula makes more sense now
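The formula is easy to sanity-check in a few lines of Go (this only sketches the majority arithmetic, not Dgraph internals):

```go
package main

import "fmt"

// quorum returns how many acks a Raft group of the given size needs
// before a write commits: floor(total/2) + 1, leader included.
func quorum(total int) int {
	return total/2 + 1
}

func main() {
	// N = 3 Alphas: 2 acks needed.
	fmt.Println(quorum(3)) // 2
	// Add x = 2 Alphas (N+x = 5): 3 acks needed. Growing the group
	// only raises the ack count; it never speeds up writes.
	fmt.Println(quorum(3 + 2)) // 3
}
```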

@vikram
Thank you for the clarification. I was already suspecting something like this, so chances are very high that this limitation wasn’t really exposed earlier because you need a fairly large cluster to bump into the problem in practice.

However, I ended up selecting anzograph for our OLAP use case because it actually scales, today.

Haven’t followed the whole long thread, but wanted to clarify a few things:

  • In no system do replicas and leaders all accept writes independently. This is true for any system: Raft-based (writes go through the leader) or the traditional primary-secondary setup (all writes go to the primary first).
  • To achieve better write throughputs, one needs sharding, not more replication.
  • To achieve better read throughputs, one needs replication (and could also use sharding).

Now, I’m not sure: did the write throughput fail to scale well despite sharding the data? Is that the case here?

I’m not sure what @marvin-hansen tried.
But what I understood, and have been trying to explain in this long chain, is:

  1. Given a predicate X, it can’t be sharded (at least not yet).
  2. No sharding means no horizontal scaling for that particular predicate.

Also, since a given predicate can’t be sharded yet, it might lead to OOM: the Supernode Problem.

There’s one point that was completely missed. I decided to add it here as a reference for anyone reading this thread.

To achieve better write throughput, Zero has to be horizontally scaled as well. (This needs to be combined with sharding of data, which has its own limitations, as explained above.)

Reason

  • Transaction start and commit timestamps are handed out by Zero.
  • The transaction abort/commit decision is taken by Zero, depending on the presence/absence of conflicts.

Both of these steps are executed by the Zero leader only! The other Zeros in the cluster act as hot standbys. (This holds true for txn handling, but I’m not sure if it applies to predicate moves as well.)

So what all this means is: to scale write throughput, try the two things below.

  • 1. Start with data sharding.
    • If you start hitting the limits of sharding (one shard = one predicate),
      • you will need to scale up at least the Alpha group leader if this group is handling a hot predicate (or you might as well scale up all Alphas in the group).
    • Predicate sharding is on the 2020 roadmap, so you may be able to escape this, but the schedule is TBD :frowning:

And

  • 2. Scale up at least the Zero leader (or all Zeros) to improve txn throughput.

** All this discussion is meant to shed light on the internals of Dgraph as I understood them, so that other devs can make educated choices.

Correction:

  • To achieve better write throughput, Zero has to be vertically scaled as well.

Hey,

We ran some benchmarks to figure out whether Zero or Alpha is the bottleneck (details below). Your assumption was that the Zero timestamps would be the bottleneck, but we observed differently: under full load, one Zero could hand out timestamps at 7x the speed at which an Alpha could ingest data.

We ran some benchmarks to see how many start and commit timestamps could be generated. The throughput was around 70k transactions per second. We also ran the live loader with 1 nquad per transaction (the default is 1000 nquads per transaction) to see the max throughput of Alpha; it was around ~10k transactions per second. So, theoretically, 1 Zero could serve up to 7 Alphas without being the bottleneck.
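That 7x headroom figure is just integer division over the two measured throughputs; a trivial check (numbers taken from the benchmark above, helper name illustrative):

```go
package main

import "fmt"

// alphasPerZero estimates how many Alphas one Zero can feed before its
// timestamp generation saturates, from the two measured rates.
func alphasPerZero(zeroTxnPerSec, alphaTxnPerSec int) int {
	return zeroTxnPerSec / alphaTxnPerSec
}

func main() {
	// ~70k timestamps/sec from Zero vs ~10k txns/sec into one Alpha.
	fmt.Println(alphasPerZero(70000, 10000)) // 7
}
```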

Data sharding, Query Planning and Predicate sharding are on the roadmap for this year.

Machine Specs:

  • 1 Zero server, 1 Alpha server
  • 16GB Ram DDR4
  • AMD Ryzen 7 3700X 8-Core Processor

dgraph live -s 21million-noindex.schema -f 21million.rdf --alpha localhost:9180 --zero localhost:5180 -c500 -b1

package main

import (
        "context"
        "log"
        "testing"

        "github.com/dgraph-io/dgo/v2"
        "github.com/dgraph-io/dgo/v2/protos/api"
        "google.golang.org/grpc"
)

// BenchmarkZeroProposal opens, queries, and commits a txn per parallel
// iteration; every commit forces Zero to hand out a start and a commit
// timestamp, so this measures Zero's timestamp throughput.
func BenchmarkZeroProposal(b *testing.B) {
        conn, err := grpc.Dial("localhost:9180", grpc.WithInsecure())
        if err != nil {
                log.Fatal(err)
        }
        defer conn.Close()
        dgraphClient := dgo.NewDgraphClient(api.NewDgraphClient(conn))

        q := `
        {
           me(func: uid(0x01)) {
              uid
           }
        }
        `

        ctx := context.Background()

        b.RunParallel(func(pb *testing.PB) {
                for pb.Next() {
                        txn := dgraphClient.NewTxn()
                        _, err := txn.Query(ctx, q)
                        if err != nil {
                                panic(err)
                        }
                        err = txn.Commit(ctx)
                        if err != nil {
                                panic(err)
                        }
                }
        })

}


The just-published paper explains a lot of the ins & outs of the underlying concept. This is very much appreciated and helps a LOT to understand important details.

https://github.com/dgraph-io/dgraph/blob/master/paper/dgraph.pdf

From the paper, it occurs to me that Alphas and Zeros can scale to whatever number you set, because there aren’t any limitations worth mentioning. However, the really tricky part comes from the interaction between Zero and Alpha due to the timestamp-based lockless transaction model.

As @harshil_goel pointed out, one Zero can serve up to 7 Alphas, because Alphas ingest data at a much lower rate than the timestamp-creation throughput on Zero. I did notice, however, that adding more Zeros seems to add more (Raft?) overhead, so technically, keeping the number of Zeros below the number of Alphas ensures that throughput never throttles below the capacity determined by the Alphas. Adding more Alphas, however, does seem to add more throughput and lower overall latency, because of the linear scaling of the data sharding.

However, here is the big question I still cannot fully answer, even after having read the paper from cover to cover:

If Zero is essentially a single leader elected through Raft, does that mean timestamp generation is limited to the throughput of the leader node?

The reason I am asking is, when planning resources I want to know whether adding more nodes to Zero adds more performance (horizontal scaling) or whether adding more CPU to Zero adds more tx performance (vertical scaling). The way I read the paper, Zero is an elected leader with everyone else on standby, so I’d better add more cores to fewer Zero nodes to increase throughput while keeping overhead low.

Eventually, after some more testing, I ended up with a 1-Zero & 5-Alpha configuration, which gives very high performance at constantly very low (~5ms) latency. That seems to support the notion that more Zeros just add more overhead but don’t contribute much in terms of total throughput.

For the time being, I am using Dgraph on a much smaller subset of data and still observing its run-time characteristics, and, as it turned out, the 1-Zero & 5-Alpha config just flies on write ops, meaning writing ~80k records is done in maybe a second or two. That seems to be in line with the observation that timestamp generation really is the only thing to keep an eye on. I might be totally wrong, but that is what I observe.

I still need to test a 3x10 config to see if these write ops keep approaching the timestamp throughput, to answer the only hard question left: how do you go well above and beyond the ~70k timestamps/sec?

After having read the paper, the only practical way I can imagine is just adding a ton more CPU and memory to the Zero nodes to pump up timestamp generation.

Is that correct, or am I missing something important here?

I am asking because my next use case starts out at some ~50M op/sec and can go as high as ~500M op/sec, and obviously I have already factored Dgraph out of that particular use case. But I am still eager to figure out the truth about the Zero/timestamp issue: is it a hard limit bound to the resources of a single Zero leader, or is there something else I might have missed?


Hey,

Yeah, your understanding is correct: timestamp generation is equivalent to the throughput of the leader node. Adding additional Zero nodes doesn’t affect timestamp generation; however, in case of a node failure, an additional Zero can pick up the role of the leader. Vertical scaling of the Zero leader would give better results than horizontal scaling. The role of multiple Zero nodes isn’t performance (yet), but handling crashes.

Could you please elaborate on your use case, and on what other databases you are considering that suit your needs? I ask because we are working on “Ludicrous mode”, which would provide higher write speeds with weaker consistency guarantees. So it would go a long way for us to get an idea of what users want.


And actually, IIRC, the timestamp generation that @harshil_goel tried included doing start ts -> commit ts writes to Raft. In ludicrous mode, those might not be required; then the cost of timestamp generation would be as fast as network + RAM access. In general, I doubt timestamp generation would ever be the bottleneck. A good way to know whether you need to vertically scale the Zero server is to use htop and see how much CPU it is using – I’d bet it’s not using much, considering the costs of everything else involved in txn commits.