Dgraph can't horizontally scale write speeds

Raft says 2n + 1, which means the cluster has to have an odd number of nodes,
and it can cope with the failure of n nodes, i.e. the failure of less than half of the nodes.

The n in my example isn't the same n as in the Raft paper. If it makes it easier, I'll change my formula to
(N + x)/2 + 1

where
N is the original size of the cluster = 3
x is the number of new nodes added = 2
The majority for this cluster of size 5 is 5/2 + 1 = (3 + 2)/2 + 1 = (N + x)/2 + 1 = 3 (integer division).

Hope my formula makes more sense now
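To make the arithmetic concrete, here is a minimal Go sketch of the quorum rule described above (the helper name quorum is just illustrative, not anything from Dgraph):

package main

import "fmt"

// quorum returns the number of nodes that must agree for a Raft cluster
// of the given size to make progress: floor(size/2) + 1.
func quorum(size int) int {
        return size/2 + 1
}

func main() {
        // Original cluster of N = 3 nodes, then x = 2 nodes added.
        N, x := 3, 2
        fmt.Println(quorum(N))     // 2: a 3-node cluster tolerates 1 failure
        fmt.Println(quorum(N + x)) // 3: a 5-node cluster tolerates 2 failures
}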

@vikram
Thank you for the clarification. I was already suspecting something like this, so chances are very high that this limitation wasn’t really exposed earlier because you need a fairly large cluster to bump into this problem in practice.

However, I ended up selecting anzograph for our OLAP use case because it actually scales, today.

I haven’t followed the whole long thread, but I wanted to clarify a few things:

  • In no system do replicas and leaders all have independent writes. This is true for any system, whether Raft-based (writes go through the leader) or one using the traditional primary-secondary model (all writes go to the primary first).
  • To achieve better write throughputs, one needs sharding, not more replication.
  • To achieve better read throughputs, one needs replication (and could also use sharding).

Now, I’m not sure whether the write throughput failed to scale well despite sharding the data. Is that the case here?

I’m not sure what @marvin-hansen tried.
But what I understood, and was trying to explain in this long chain, is:

  1. A given predicate X can’t be sharded. (At least not yet.)
  2. No sharding means no horizontal scaling for that particular predicate.

Also, since a given predicate can’t be sharded yet, it might lead to OOM: Supernode Problem - #3 by vikram

There’s one point that was completely missed. I decided to add it here as a reference for anyone who is reading this thread.

To achieve better write throughputs, Zero has to be horizontally scaled as well. (This needs to be combined with sharding of data, which has its own limitations as explained above.)

Reason

  • Transaction start and commit timestamps are handed out by Zero.
  • The transaction abort/commit decision is taken by Zero, depending on the presence/absence of conflicts.

Both these steps are executed by the Zero leader only! The other Zeros in the cluster act as hot standbys. (This statement holds true for txn handling, but I’m not sure if it applies to predicate moves as well.)
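For readers who want to see where these two Zero interactions surface from the client side, here is a minimal dgo sketch (the same client library as the benchmark further down in the thread). The address and the nquad are placeholders, and the comments reflect my reading of the explanation above, not inspected Dgraph internals:

package main

import (
        "context"
        "log"

        "github.com/dgraph-io/dgo/v2"
        "github.com/dgraph-io/dgo/v2/protos/api"
        "google.golang.org/grpc"
)

func main() {
        // Placeholder Alpha address; adjust for your cluster.
        conn, err := grpc.Dial("localhost:9080", grpc.WithInsecure())
        if err != nil {
                log.Fatal(err)
        }
        defer conn.Close()
        dg := dgo.NewDgraphClient(api.NewDgraphClient(conn))

        ctx := context.Background()
        txn := dg.NewTxn()
        defer txn.Discard(ctx)

        // The mutation goes to an Alpha; the start timestamp it runs under
        // is handed out by the Zero leader (per the explanation above).
        _, err = txn.Mutate(ctx, &api.Mutation{
                SetNquads: []byte(`_:n <name> "example" .`),
        })
        if err != nil {
                log.Fatal(err)
        }

        // Commit is where the commit/abort decision is made and the commit
        // timestamp is assigned; a conflict surfaces here as an aborted txn.
        if err := txn.Commit(ctx); err != nil {
                log.Fatal(err)
        }
}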

So what all this means is: to scale write throughput, try the two things below.

  • 1. Start with data sharding (a hedged topology sketch follows after this list).
    • If you start hitting the limits of sharding (one shard = one predicate),
      • you will need to scale up at least the Alpha group leader, if this group is handling a hot predicate (or you might as well scale up all Alphas in the group).
    • Predicate sharding is on the 2020 roadmap, so you may be able to escape this, but the schedule is TBD :frowning:

And

  • 2. Scale up at least the Zero leader (or scale up all Zeros) to improve txn throughput.
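For reference, a hedged sketch of what such a sharded-plus-replicated topology could look like on the command line. Hostnames and ports are placeholders, and the flags should be double-checked against dgraph zero --help / dgraph alpha --help for your version:

dgraph zero --my=zero1:5080 --replicas 3

dgraph alpha --my=alpha1:7080 --zero=zero1:5080 --lru_mb=2048
dgraph alpha --my=alpha2:7080 --zero=zero1:5080 --lru_mb=2048
dgraph alpha --my=alpha3:7080 --zero=zero1:5080 --lru_mb=2048

With --replicas 3, Zero places every three Alphas into one replica group; starting more Alphas beyond the first three creates additional groups, and Zero assigns predicates across those groups (one shard per predicate, as discussed above).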

** All this discussion is meant to shed light on the internals of Dgraph as I understood them, so that other devs can make educated choices.

Correction

  • To achieve better write throughputs, Zero has to be vertically scaled as well.

Hey,

Here is how we figured out whether Zero or Alpha is the bottleneck (the details are below). Your assumption was that the Zero timestamps would be the bottleneck, but we observed something different: under full load, one Zero could hand out timestamps at 7x the speed at which an Alpha could ingest data.

We ran some benchmarks to see how many start and commit timestamps could be generated. The throughput was around 70k transactions per second. We also ran the live loader with 1 nquad per transaction (the default is 1000 nquads per transaction) to see the max throughput of Alpha. It was around ~10k transactions per second. So theoretically, 1 Zero could serve up to 7 Alphas without being the bottleneck.

Data sharding, Query Planning and Predicate sharding are on the roadmap for this year.

Machine Specs:

  • 1 Zero server, 1 Alpha server
  • 16 GB DDR4 RAM
  • AMD Ryzen 7 3700X 8-core processor
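The live-loader invocation (note -b1, i.e. one nquad per transaction) and the Go benchmark used for the Zero-side numbers are below: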
dgraph live -s 21million-noindex.schema -f 21million.rdf --alpha localhost:9180 --zero localhost:5180 -c500 -b1
package main

import (
        "context"
        "log"
        "testing"

        "github.com/dgraph-io/dgo/v2"
        "github.com/dgraph-io/dgo/v2/protos/api"
        "google.golang.org/grpc"
)

func BenchmarkZeroProposal(b *testing.B) {
        // Connect to the Alpha's gRPC endpoint (the same address the live
        // loader above points at with --alpha).
        conn, err := grpc.Dial("localhost:9180", grpc.WithInsecure())
        if err != nil {
                log.Fatal(err)
        }
        defer conn.Close()
        dgraphClient := dgo.NewDgraphClient(api.NewDgraphClient(conn))

        // A trivial query; the point is the transaction overhead, not the data.
        q := `
        {
           me(func: uid(0x01)) {
              uid
           }
        }
        `

        ctx := context.Background()

        // Each iteration opens a fresh transaction, runs the query, and
        // commits it, all under parallel load.
        b.RunParallel(func(pb *testing.PB) {
                for pb.Next() {
                        txn := dgraphClient.NewTxn()
                        _, err := txn.Query(ctx, q)
                        if err != nil {
                                panic(err)
                        }
                        err = txn.Commit(ctx)
                        if err != nil {
                                panic(err)
                        }
                }
        })

}
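For anyone who wants to reproduce this: the snippet above is a standard Go benchmark, so after adjusting the Alpha address it can be run with the usual tooling, e.g.

go test -bench=BenchmarkZeroProposal -benchtime=10s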


The just-published paper explains a lot of the ins & outs of the underlying concept. This is very much appreciated and helps a LOT to understand important details.

From the paper, it occurs to me that Alphas and Zero can scale to whatever number you set, because there aren’t any limitations worth mentioning. However, the really tricky part comes from the interaction between Zero and Alpha due to the timestamp-based lock-less transaction model.

As @harshil_goel pointed out, one Zero can serve up to 7 Alphas because Alphas ingest data at a much lower rate than the timestamp-creation throughput of Zero. I did notice, however, that adding more Zeros seems to add more (Raft?) overhead, so technically, keeping the number of Zeros below the number of Alphas ensures that throughput never throttles below the capacity determined by the Alphas. Adding more Alphas, however, seems to add more throughput and lower overall latency because of the linear scaling of the data sharding.

However, here is the big question I still cannot fully answer, even after having read the paper from cover to cover:

If Zero is essentially a single leader elected through Raft, does that mean that timestamp generation is equivalent to the throughput of the leader node?

The reason I am asking is that, when planning resources, I want to know whether adding more Zero nodes adds more performance (horizontal scaling) or whether just adding more CPU to a Zero adds more tx performance (vertical scaling). The way I read the paper, Zero is an elected leader with everyone else on standby, so I had better add more cores to fewer Zero nodes to increase throughput while keeping overhead low.

Eventually, after some more testing, I ended up with a 1 Zero & 5 Alpha configuration, which really gives very high performance at constantly very low (~5 ms) latency, and that seems to support the notion that more Zeros just add more overhead but don’t contribute much in terms of total throughput.

For the time being, I am using Dgraph on a much smaller subset of data and still observing its run-time characteristics, and, as it turned out, the 1-Zero & 5-Alpha config just flies on write ops, meaning writing ~80k records is done in maybe a second or two. That seems to be in line with the observation that timestamp generation really is the only thing to keep an eye on. I might be totally wrong, but that is what I observe.

I still need to test out a 3x10 config to see whether these write ops keep up as they approach the timestamp throughput, in order to answer the only hard question left: how do you go well above and beyond the ~70k timestamps/sec?

After having read the paper, the only practical way I can imagine is simply adding a ton more CPU and memory to the Zero nodes to pump up timestamp generation.

Is that correct or do I miss something important here?

I am asking because my next use case starts out with some ~50M op/sec and can go as high as ~500M op/sec, and obviously I have already factored Dgraph out of that particular use case. But I am still eager to figure out the truth about the Zero/timestamp issue: is it a hard limit bound to the resources of a single Zero leader, or is there anything else I might have missed?


Hey,

Yeah, your understanding is correct: timestamp generation is equivalent to the throughput of the leader node. Adding additional Zero nodes doesn’t affect timestamp generation; however, in case of a node failure, an additional Zero can pick up the role of the leader. Vertical scaling of the Zero leader would give better results than horizontal scaling. The role of multiple Zero nodes isn’t performance (yet), but handling crashes.

Could you please elaborate on your use case, and on what other databases you are considering that suit your needs? I ask because we are working on “Ludicrous mode”, in which we would provide higher write speed, but with weaker consistency guarantees. So it would go a long way for us to get an idea of what users want.


And actually, IIRC, the timestamp generation that @harshil_goel tried included doing start ts → commit ts writes to Raft. In ludicrous mode, those might not be required, in which case the cost of timestamp generation would be as fast as network + RAM access. In general, I doubt timestamp generation would ever be the bottleneck. A good way to know whether one needs to vertically scale the Zero server is to use htop and see how much CPU it is using – I’d bet it’s not using much, considering the costs of everything else that’s involved with txn commits.

@harshil_goel @mrjn

Thanks for the clarification. In this case, just throwing more CPU & memory at Zero is the way to go. With the new AMD EPYC machines available on GKE, we can go all the way up to 96 cores & 768 GB of memory per node, which should be sufficient for quite some time. Also, very good to know that Zeros don’t scale horizontally, which saves us some cash because there really is no point in running more than 3 Zeros for fail-over.

All that means that scaling Zero vertically determines the ultimate performance of the entire Dgraph cluster, by design. I need to measure performance for different CPU & memory configurations, but I am relatively certain that a high-end multi-core CPU node as Zero will get us >100k op/sec, maybe up to 250k op/sec, and that is way more than enough for OLTP.

If I remember correctly from the paper, the timestamps are generated by a strictly monotonic function, and as such there must be a total order (as opposed to the partial order of weakly monotonic functions), so technically the timestamp series can be sliced and each slice can be processed in parallel while still yielding correct results, thanks to the total-order property.

Does the Zero node slice the timestamp series to do tx processing and conflict resolution in parallel?
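To be clear, this is emphatically not Dgraph’s implementation, just a toy Go sketch of the property I mean: a strictly monotonic source gives a total order, so disjoint slices of the series handed to independent workers stay ordered and could be merged back deterministically.

package main

import (
        "fmt"
        "sync"
        "sync/atomic"
)

func main() {
        // A strictly monotonic source: every call returns a strictly larger
        // value, so all handed-out values are totally ordered.
        var ts uint64
        next := func() uint64 { return atomic.AddUint64(&ts, 1) }

        // Hand disjoint slices of the series to independent workers. Because
        // the order is total, each worker's slice is internally ordered and
        // the union can be merged back deterministically by timestamp.
        const workers = 4
        results := make([][]uint64, workers)
        var wg sync.WaitGroup
        for w := 0; w < workers; w++ {
                wg.Add(1)
                go func(w int) {
                        defer wg.Done()
                        for i := 0; i < 5; i++ {
                                results[w] = append(results[w], next())
                        }
                }(w)
        }
        wg.Wait()
        fmt.Println(results)
}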

As for the “Ludicrous mode”, I am not convinced. If you sacrifice consistency for performance, you end up losing customers who need both and don’t compromise on either.

I already suggested an in-memory mode, that is, keeping the entire graph in memory and syncing it to disk for persistence. It’s way cheaper than GPU acceleration, relatively easy to implement, and you get crazy performance (network + RAM bound) while preserving whatever consistency level you target.
On GCP, you already get nodes with 0.7 TB of memory for a reasonable price, and up to ~3 TB for some (unreasonably) more money, so adequate resources are certainly a given. Details in this ticket:

My current use case doesn’t need much except a scalable OLTP graph DB with a GraphQL endpoint, so Dgraph in version 2 works just fine for that. I had a lot of hassles with version 1, but 2.0-rc1 ironed out a lot of glitches, added better documentation, and actually works quite well. Thank you for the substantial improvement!

My next use case deals with real-time analytics on very large (~trillion nodes) RDF graphs with real-time data ingestion. Previously, we couldn’t tackle this due to resource constraints, but Google just sponsored us 20k worth of cloud credits with the option to add another 100k, so I am preparing that use case right now. There is no way I can consider Dgraph because, for one, I don’t have the trust that Dgraph can be operated with interruption-free updates (another set of open issues), and, more importantly, doing it anyway would require an ETL process to extract the entire graph continuously. I believe I already made the case last month that in such a scenario you simply cannot move around humongous amounts of data; you have to process them in place. There is really nothing worth discussing here.

Eventually, I separated OLTP from OLAP and settled on AnzoGraph from Cambridge Semantics, here in Boston, because it’s one of the very few available systems that already comes with all the important graph ML algorithms by default and can execute them in real time on up to 500 billion nodes. The last time I spoke with them, they offered us engineering assistance to get us started as fast as possible, and that’s welcome because I am still learning RDF data modeling & SPARQL querying myself.

After some design sketches, I came to realize that with our unified graph, adding just one more database really is a non-event and should be treated as such.

Anyway, just my 2cents.
