Idea - Shard predicate / Edge across groups

PS: This is just an idea; please don't take it as a finished or validated proposal.

User requests:
1. A single predicate sharded across groups
2. Data distribution across servers

I'm not sure how good this idea is, since it could be done at the core level instead of via a "transpile" step. I believe this post serves mainly as an algorithm example. A transpiler would still be interesting, though, as it could take advantage of some Dgraph features, such as running queries concurrently per block.

PS: After thinking about it, I noticed that sharding at the predicate level is certainly hard. Nesting, for example, would produce really big internal queries.

The Schema

name: String @atomShard(3000)   # this will also add a count index
friend: [uid] @atomShard(3000)

type User {
   name : <name.dgraph.*>     # this alias will be generated internally
   friend : <friend.dgraph.*>
}

This means that:

The predicate “name” will be divided into further predicates every time its count reaches 3,000 nodes.
Each new division will have an internal name, e.g.:

<name.dgraph.0001>
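To make the naming rule concrete, here is a minimal Python sketch of how such an internal shard name could be derived from a predicate's node count. The helper name and the zero-padded four-digit suffix are assumptions based on the example above, not Dgraph internals.

```python
def shard_name(predicate: str, node_count: int, shard_size: int = 3000) -> str:
    """Return the hypothetical internal shard predicate that a newly added
    node would fall into, assuming shards are filled sequentially in
    blocks of `shard_size` nodes (3000 in the @atomShard example)."""
    shard_index = node_count // shard_size
    return f"<{predicate}.dgraph.{shard_index:04d}>"

# The first 3000 nodes land in shard 0000, the next 3000 in 0001, and so on:
print(shard_name("name", 0))      # <name.dgraph.0000>
print(shard_name("name", 3000))   # <name.dgraph.0001>
print(shard_name("name", 7500))   # <name.dgraph.0002>
```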

The DQL query

The user will query

{ 
    q(func: has(name)) @filter(eq(age, 31)) {
        uid
        name
    }
}

Dgraph has to check whether the predicate has the @atomShard directive. If so, it applies the transpiling rules.

Dgraph will translate internally to

Assuming it has only two atomic shards, <name.dgraph.0000> and <name.dgraph.0001>:

{ 
    name_dgraph_a as var(func: has(<name.dgraph.0000>)) @filter(eq(age, 31))
    name_dgraph_b as var(func: has(<name.dgraph.0001>)) @filter(eq(age, 31))

    q(func: uid(name_dgraph_a, name_dgraph_b)){
        uid
        name
    }
}

The predicate name would be resolved at the schema level as a shortcut to <name.dgraph.0000> and <name.dgraph.0001>, e.g. by adding aliases at the schema level (in the type definition).
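The rewrite rule can be sketched as a string-level transformation: one var block per atomic shard, then a root function that merges the uid sets. This is only an illustration of the transpiling idea, not Dgraph's actual query planner; the function name and variable-naming scheme (`_a`, `_b`, ...) are assumptions taken from the example above.

```python
def transpile_has(predicate: str, shard_count: int, filter_expr: str = "") -> str:
    """Rewrite a root `has(<predicate>)` query into one var block per
    atomic shard plus a merging uid() root block (sketch only)."""
    var_names, blocks = [], []
    for i in range(shard_count):
        var = f"{predicate}_dgraph_{chr(ord('a') + i)}"
        var_names.append(var)
        # One var block per shard, carrying the user's root filter along.
        blocks.append(
            f"    {var} as var(func: has(<{predicate}.dgraph.{i:04d}>)){filter_expr}"
        )
    merged = ", ".join(var_names)
    return (
        "{\n"
        + "\n".join(blocks)
        + f"\n\n    q(func: uid({merged})) {{\n        uid\n        {predicate}\n    }}\n}}"
    )

print(transpile_has("name", 2, " @filter(eq(age, 31))"))
```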

Edge example

{ 
    q(func: has(name)) @filter(eq(age, 31)) {
        uid
        name
        friend @filter(gt(age, "25")) {
           uid
           name
        }
    }
}

Dgraph will translate internally to

{ 
    name_dgraph_a as var(func: has(<name.dgraph.0000>)) @filter(eq(age, 31)) 
    name_dgraph_b as var(func: has(<name.dgraph.0001>)) @filter(eq(age, 31))

    var(func: uid(name_dgraph_a, name_dgraph_b)) {
        friend_dgraph_a as <friend.dgraph.0000> @filter(gt(age, "25")) 
        friend_dgraph_b as <friend.dgraph.0001> @filter(gt(age, "25"))
        friend_dgraph_c as <friend.dgraph.0002> @filter(gt(age, "25"))
        friend_dgraph_d as <friend.dgraph.0003> @filter(gt(age, "25"))
    }

    q(func: uid(name_dgraph_a, name_dgraph_b)){
        uid
        name
        friend @filter(uid(friend_dgraph_a, friend_dgraph_b, friend_dgraph_c, friend_dgraph_d)) {
           uid
           name
        }
    }
}
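One way to quantify the nesting concern: because each level is merged back into a single uid() set before the next edge is expanded, the number of internal var assignments grows as the sum of shard counts per level, not the product you would get if every nested shard were expanded under every parent shard. A quick sketch with hypothetical helper names:

```python
def var_blocks_merged(shards_per_level):
    """Var assignments when each level is collapsed into one uid() set
    before expanding the next edge (the rewrite shown above)."""
    return sum(shards_per_level)

def var_blocks_naive(shards_per_level):
    """Var assignments if every shard of a nested edge were expanded
    under every parent shard separately (no merge step)."""
    total, parents = 0, 1
    for shards in shards_per_level:
        total += parents * shards
        parents *= shards
    return total

levels = [2, 4]  # 2 `name` shards at the root, 4 `friend` shards nested
print(var_blocks_merged(levels))  # 6  (2 + 4, as in the rewritten query)
print(var_blocks_naive(levels))   # 10 (2 + 2*4)
```

Even with the merge step, the queries still grow with the total shard count, which is why deeply nested queries over heavily sharded predicates would get big.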

@MichelDiz - I’m interested in the idea of predicate sharding, and since you’ve already started the discussion of potential methods, thought I’d share some ideas I’ve had.

Multiple sharding methods

I think it makes sense to offer the ability to shard by several different methods as Dgraph develops. There could be a declaration of the shard method, like:

enum ShardMethod {
  block # block of 'n' predicates (like in your example)
  hash  # (or hash|id) hash of the ID of the subject of the predicate, distributed over 'n' buckets
  ...
}

directive @shard(method: ShardMethod, n: Int) on FIELD_DEFINITION

type BigDataType {
  name: String @shard(method: hash, n: 32)     # hashes ID, storing the value in the appropriate bucket
  dob: DateTime @shard(method: block, n: 1<<20) # blocks of ~1M keys
}

or the arguments could dictate the sharding method used, e.g.:

directive @shard(buckets: Int, keys: Int) on FIELD_DEFINITION

type BigDataType {
  name: String @shard(buckets: 32)
  dob: DateTime @shard(keys: 1<<20)
}

Personally, of the two I prefer the second, since it needs fewer parameters.

Sharding by allocating the predicate subject ID to a bucket

For lookup speed, rather than using blocks of ‘n’ keys, a predicate could be assigned to one of ‘n’ buckets based on the internal ID of its subject (which is invariant). This could be done either on the ID itself (ID mod $buckets) or on a hash of it (which might be distributed more evenly).
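A minimal sketch of the two bucket-assignment variants, assuming 64-bit internal IDs; the function names are made up for illustration, and SHA-256 merely stands in for whatever fast hash would actually be used:

```python
import hashlib

def bucket_by_mod(uid: int, buckets: int) -> int:
    """Assign by the subject's internal ID directly (ID mod $buckets)."""
    return uid % buckets

def bucket_by_hash(uid: int, buckets: int) -> int:
    """Hash the ID first, which scatters sequentially allocated IDs
    instead of placing neighbors in neighboring buckets."""
    digest = hashlib.sha256(uid.to_bytes(8, "little")).digest()
    return int.from_bytes(digest[:8], "little") % buckets

# Sequential IDs land in predictable, adjacent buckets under mod...
print([bucket_by_mod(uid, 32) for uid in range(4)])   # [0, 1, 2, 3]
# ...but are scattered (deterministically) under the hashed variant.
print([bucket_by_hash(uid, 32) for uid in range(4)])
```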

Obviously with very large datasets, we’ll want to make sure we create enough predicate-buckets so that we don’t have to rebalance them later on with a schema update.

It might be worth suggesting (or enforcing) that the number of buckets be a power of 2, so that rebalancing doesn't move all the keys around the cluster, only some of them. Another option is a virtual bucket system (say 4096 or more vbuckets) that keeps multiple vbuckets in the same group (or server shard). (I got the vbucket idea from Couchbase.)
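The power-of-2 point can be checked numerically: when an `id mod buckets` scheme doubles its bucket count, each key either stays put or moves to exactly one new bucket, so only half the keys move; an arbitrary resize perturbs far more of them. A small sketch (hypothetical helper, simulating IDs 0..n):

```python
def moved_fraction(old_buckets: int, new_buckets: int, n_keys: int = 100_000) -> float:
    """Fraction of simulated keys whose `id % buckets` bucket changes
    when the bucket count is resized."""
    moved = sum(1 for k in range(n_keys) if k % old_buckets != k % new_buckets)
    return moved / n_keys

# Doubling a power-of-2 bucket count moves exactly half the keys...
print(moved_fraction(16, 32))  # 0.5
# ...while a non-power-of-2 resize moves a much larger fraction.
print(moved_fraction(16, 24))
```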

Other possible sharding methods

  • Hash of some other invariant
  • Filtering of some value (probably too much of a headache with changes, and will add extra overhead)

Sharding of indexes

In addition to the sharding of the data, the indexes will almost certainly also need to be sharded. Some options here:

  • Automatically shard the index into the same number of buckets as the original dataset
  • Expose a [Type]Index or [Type]Search type that can also have its sharding bucket count set for fine-tuning of the index

Obviously the index tokens would be organized in a different way to the data values, and would periodically need to be shuffled around the shards as they’re rebalanced.

Any operations that utilize any of the indexes would obviously need to be done intelligently, either by testing boundaries and/or running queries in parallel and then merging them afterwards.
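The fan-out-and-merge pattern for a sharded index lookup could look roughly like this; the shard names follow the earlier examples, `query_shard` is a stand-in for querying one index shard's posting lists, and the hardcoded index data is fake, purely to make the sketch runnable:

```python
from concurrent.futures import ThreadPoolExecutor

# Fake per-shard index: value -> set of uids holding it (illustration only).
FAKE_INDEX = {
    "<name.dgraph.0000>": {"Alice": {1, 2}, "Bob": {3}},
    "<name.dgraph.0001>": {"Alice": {4}, "Carol": {5}},
}

def query_shard(shard: str, value: str) -> set:
    """Stand-in for running an indexed lookup against one shard."""
    return FAKE_INDEX.get(shard, {}).get(value, set())

def parallel_index_lookup(shards: list, value: str) -> set:
    """Fan the lookup out to every index shard in parallel, then
    merge (union) the resulting uid sets."""
    with ThreadPoolExecutor(max_workers=len(shards)) as pool:
        results = pool.map(lambda s: query_shard(s, value), shards)
    return set().union(*results)

shards = ["<name.dgraph.0000>", "<name.dgraph.0001>"]
print(sorted(parallel_index_lookup(shards, "Alice")))  # [1, 2, 4]
```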

As you say, the task of sharding predicates is not a trivial thing to do.
