Sorting and indexing

To return results in sorted order, our plan is to bucket-index the values. For example, when we add an edge saying UID 123's dob is 1980-01-01, we want to add 123 to the posting list under the key $dob|1980. Here, dates are bucketed by year.
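To make the bucketing concrete, here is a minimal sketch of the key construction. The `indexKey` helper and the `$attr|year` layout are illustrative assumptions, not the actual Dgraph key format:

```go
package main

import (
	"fmt"
	"time"
)

// indexKey builds the bucketed index key for a date value, e.g.
// 1980-01-01 under attribute "dob" maps to "$dob|1980". Dates are
// bucketed by year, so all dobs in 1980 share one posting list.
func indexKey(attr string, t time.Time) string {
	return fmt.Sprintf("$%s|%d", attr, t.Year())
}

func main() {
	dob, _ := time.Parse("2006-01-02", "1980-01-01")
	fmt.Println(indexKey("dob", dob)) // $dob|1980
	// Adding the edge (123, dob, 1980-01-01) would then also add
	// UID 123 to the posting list stored under this key.
}
```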

When the user wants to order the results by dob, we will go through each bucket's posting list (in the worker package), intersect this PL with the UIDs we want to sort, sort the intersected results, and return the sorted UIDs / permutation, concatenated over buckets.
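As a rough sketch of that loop, with posting lists as plain uid slices and a `val` callback standing in for fetching the actual dob (all names here are illustrative):

```go
package indexsort

import (
	"sort"
	"time"
)

// sortByIndex walks the value buckets in ascending year order. For
// each bucket it intersects the bucket's posting list with the uids
// we want sorted, orders that intersection by the actual value, and
// appends it to the result; concatenating over buckets yields the
// full ordering.
func sortByIndex(buckets [][]uint64, want map[uint64]bool,
	val func(uid uint64) time.Time) []uint64 {
	var out []uint64
	for _, pl := range buckets {
		var hit []uint64
		for _, uid := range pl {
			if want[uid] {
				hit = append(hit, uid)
			}
		}
		// Within a bucket, uids are ordered by uid, not by value,
		// so the intersection still has to be sorted by dob here.
		sort.Slice(hit, func(i, j int) bool {
			return val(hit[i]).Before(val(hit[j]))
		})
		out = append(out, hit...)
	}
	return out
}
```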

One question is this: how do we know the smallest bucket to begin from? Do we just iterate from the prefix $dob? That sounds great, and RocksDB supports it. However, our in-memory representation (mutation layers, cache) does not keep keys sorted. It is a regular hash map, not a skip list.

Whether we want to transition to a skip list in memory for this is a bigger decision that we can leave for another day. Btw, RocksDB has a PlainTableFormat which is optimized for latency-critical applications and is fully in memory.

To implement this feature, I wonder if we can just hardcode or infer the range of values from somewhere, say the schema. It is not going to be as efficient as iterating directly, especially when the range of values is huge and a lot of intermediate buckets are empty. We would just iterate over this range of possible values, do lookups, and sort. Is this ok? @core-devs

Yeah, that's what I had in mind. Unless the query specifies the minimum year, we'd just iterate over $dob, find the min year from there, do the intersections, etc. Once we find the number of results we're looking for, we'll stop. But remember to always pick up all the results from each bucket in full, because the uids within the same bucket aren't sorted by dob; they're sorted as uint64s themselves.

Not sure why the hash map has anything to do with this. When we iterate over RocksDB, we can get the key, then ask for that PL. The optimization we could make here is to retrieve only keys from RocksDB, not values. That way, our existing logic would work just fine.
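For example, a key-only walk over the index prefix might look like the following, assuming a tecbot/gorocksdb-style iterator API; the `process` callback, which would do the PL lookup, is a stand-in:

```go
package indexscan

import "github.com/tecbot/gorocksdb"

// iterateIndexKeys walks all index keys under the given prefix,
// reading keys only: we never call it.Value(), so values are not
// pulled out of RocksDB. Each key is handed to process, which can
// fetch the posting list through the normal PL lookup path (which
// also sees the in-memory mutation layers).
func iterateIndexKeys(db *gorocksdb.DB, ro *gorocksdb.ReadOptions,
	prefix []byte, process func(key []byte)) {
	it := db.NewIterator(ro)
	defer it.Close()
	for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() {
		k := it.Key()
		process(k.Data())
		k.Free()
	}
}
```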

That is something our clients should decide. If they have small data which fits entirely in RAM, or deep enough pockets to fit all data in RAM, they could pass a flag to Dgraph which would instruct us to open the tables using that particular setting. This isn't something we should use by default.

Iteration seems like a simpler logic here. If you have other ideas, which are similar in terms of simplicity, we could also discuss them. But, try to avoid complexity early on. Iteration might just give us all we need, and/or we might be able to cache these keys in local memory to help us for further lookups, etc.

Note that we’ll stop iterating once we receive N results, where we’ll have a default max value of N, if the client doesn’t specify it. So, it still won’t be prohibitively expensive.

Thanks for the responses.

Why I was talking about the hash map: basically, I was worried about changes that are not yet committed to RocksDB and exist only in mutation layers. We could force a full commit (aggressive evict) after a certain time period, even if there is a lot of memory left.

PlainTableFormat: Yes, I don't really want to go there yet.

Combining pagination with sorting: I think one question is, when we sort, do we sort each PL in the UID matrix, or do we sort the merged results? When there is a lot of overlap between PLs in the UID matrix, sorting each PL can be wasteful. When there is little overlap, sorting the combined list is actually not too bad. I am leaning towards sorting the merged results, but that would complicate the pagination optimization. Let's discuss this later.

Recall we have a UID matrix A. Denote by A_i the i-th posting list. We want to sort all of them by some attribute, say dob. Let B be the merged list, N the total number of elements in the UID matrix, |A_i| the size of the i-th posting list, and |B| the size of B, i.e., the number of unique uids. Note N = sum_i |A_i|. Let |A| be the number of posting lists.

There are two extreme scenarios to keep in mind. In the first, the posting lists overlap completely, so |B| = N/|A|. In the second, the posting lists do not overlap at all, so |B| = N.

There are many ways we can sort. For now, don't worry about pagination.

Method 1: Worker sorts B and returns permutation. Build posting lists using permutation.

Here we do only one network call instead of |A| network calls where |A| is the number of posting lists. The complexity lies in how we use the ordering of B to order each posting list A_i.

Here is one way to do it. As we merge the posting lists to form B, keep enough information to be able to go from B back to each A_i. One option is that for each element of B, we store which posting lists this element comes from. For example, suppose A_0=(1,2,3) and A_1=(2,3,4), so B=(1,2,3,4). Then we will have a list of lists (or something equivalent and more memory-efficient) that goes Q := ((0), (0,1), (0,1), (1)). The first element of Q is just (0) because B_0=1 is contained in posting list A_0 only. On the other hand, the second element of Q is (0,1) because B_1=2 is contained in both A_0 and A_1.

Now, send B to be sorted. Say the final B sorted by dob goes (4,3,2,1). Then the worker will return (3,2,1,0), as B_3 is the first element in the sorted B. Now, loop over (3,2,1,0). First, we see a 3. We look up Q and see that B_3 is contained only in A_1. Hence, we append B_3=4 to C_1, where C is like A (a UID matrix) but will contain posting lists sorted by dob. Continuing over (3,2,1,0), the next element we see is 2. Look up Q and we see that B_2 is contained in both A_0 and A_1. Hence, we append B_2 to both C_0 and C_1. Now, C_0=(B_2)=(3) while C_1=(B_3,B_2)=(4,3). We can continue this process and obtain C_0=(3,2,1) and C_1=(4,3,2).
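A sketch of the whole method over plain uid slices; `mergeWithProvenance` and `applyPermutation` are illustrative names:

```go
package method1

import "math"

// mergeWithProvenance merges uid-sorted posting lists into B and
// builds Q, where Q[j] lists the indices of the posting lists that
// contain B[j].
func mergeWithProvenance(A [][]uint64) (B []uint64, Q [][]int) {
	pos := make([]int, len(A)) // cursor into each A_i
	for {
		min := uint64(math.MaxUint64)
		for i := range A {
			if pos[i] < len(A[i]) && A[i][pos[i]] < min {
				min = A[i][pos[i]]
			}
		}
		if min == math.MaxUint64 {
			return B, Q // all cursors exhausted
		}
		var from []int
		for i := range A {
			if pos[i] < len(A[i]) && A[i][pos[i]] == min {
				from = append(from, i)
				pos[i]++
			}
		}
		B = append(B, min)
		Q = append(Q, from)
	}
}

// applyPermutation replays the worker's permutation (indices into B
// in dob-sorted order) once, appending each element to every C_i
// whose A_i contained it.
func applyPermutation(A [][]uint64, B []uint64, Q [][]int, perm []int) [][]uint64 {
	C := make([][]uint64, len(A))
	for _, bi := range perm {
		for _, li := range Q[bi] {
			C[li] = append(C[li], B[bi])
		}
	}
	return C
}
```

With A_0=(1,2,3), A_1=(2,3,4) and perm=(3,2,1,0), this reproduces C_0=(3,2,1) and C_1=(4,3,2) from the walkthrough above.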

Total running time is O(|B| log |B| + N). The first term is due to sorting B in the worker. The second term is due to the “coordinator” sorting each posting list using the returned permutation.

One downside is that you need to allocate more memory to store Q, and garbage collection is not cheap.

Method 2: Like Method 1 but do binary search instead of keeping Q.

You could try this. Say we have B sorted by dob, which is (4,3,2,1). For each element, we can run through each posting list and do a binary search to see if it is inside. If it is, append it to C_i. This is actually quite expensive… The amount of work needed to sort each posting list (after getting the sorted B) is O(|A| N), ignoring log factors for the binary search. This looks bad. Maybe you all have other ways of using binary search?
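A sketch of this variant, with `contains` doing the binary search via sort.Search (names illustrative):

```go
package method2

import "sort"

// contains binary-searches a uid-sorted posting list for uid.
func contains(pl []uint64, uid uint64) bool {
	i := sort.Search(len(pl), func(k int) bool { return pl[k] >= uid })
	return i < len(pl) && pl[i] == uid
}

// sortByScan walks B (already sorted by dob) and, for each element,
// binary-searches every posting list; the nested loops are where
// the cost blows up.
func sortByScan(A [][]uint64, sortedB []uint64) [][]uint64 {
	C := make([][]uint64, len(A))
	for _, uid := range sortedB {
		for i, pl := range A {
			if contains(pl, uid) {
				C[i] = append(C[i], uid)
			}
		}
	}
	return C
}
```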

Method 3: Worker sorts B, then coordinator sorts each posting list by its position in sorted B

Each posting list will need to store its positions in B. In the above example, A_0=(1,2,3) will store (0,1,2), while A_1 will store (1,2,3). Then the worker returns the position of each B_i in the sorted list. In this case, it will return (3,2,1,0) as before, because B_0=1 is at position 3 of the sorted list (4,3,2,1), and B_1=2 is at position 2. Now, sort each posting list by this mapping.

There is actually no memory overhead here, because after this we have the ordering of each posting list. The disadvantage is that the coordinator still needs to sort per posting list, which removes the advantage of sorting B when there is a lot of overlap between posting lists. Running time is O(|B| log |B| + N log(N/|A|)), roughly.
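A sketch of the per-list sort: each posting list carries posInB, the positions of its elements in B, and rank is what the worker returns (all names illustrative):

```go
package method3

import "sort"

// byRank sorts a posting list in place by the position of each
// element in the dob-sorted B: rank[j] is the position of B_j in
// the sorted order, and posInB[i] is the position of pl[i] in B.
type byRank struct {
	pl     []uint64
	posInB []int
	rank   []int
}

func (s byRank) Len() int { return len(s.pl) }
func (s byRank) Less(i, j int) bool {
	return s.rank[s.posInB[i]] < s.rank[s.posInB[j]]
}
func (s byRank) Swap(i, j int) {
	s.pl[i], s.pl[j] = s.pl[j], s.pl[i]
	s.posInB[i], s.posInB[j] = s.posInB[j], s.posInB[i]
}

// sortByPosition reorders pl in place so it follows the dob order.
func sortByPosition(pl []uint64, posInB []int, rank []int) {
	sort.Sort(byRank{pl, posInB, rank})
}
```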

Method 4: Worker sorts each posting list (simplest)

This is the simplest method. Running time is O(N log(N/|A|)), roughly. The downside is that we need to make |A| network calls instead of 1, and the time spent doing actual sorting can be worse by a factor of |A| in the case of maximal overlap. And we can no longer tell people that Dgraph is more efficient because we make one network call per predicate… However, in terms of asymptotic behavior, it is actually not that bad, as method 1 still has that O(N) term for building sorted posting lists in the coordinator.

Pagination

Say we want just the top few entries of each posting list. If we use method 4, the worker can trim the results and return early. It can iterate over RocksDB until it gets enough results per posting list, and return.
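A sketch of that early return, same shape as the earlier bucket loop (names illustrative):

```go
package method4

import (
	"sort"
	"time"
)

// topN intersects each bucket with the wanted uids and sorts the
// intersection by value, as before, but stops as soon as it has n
// results. It stops only at bucket boundaries: a bucket is always
// consumed in full, because its uids are ordered by uid, not by
// dob. The final trim is safe since the last bucket is sorted.
func topN(buckets [][]uint64, want map[uint64]bool, n int,
	val func(uid uint64) time.Time) []uint64 {
	var out []uint64
	for _, b := range buckets {
		var hit []uint64
		for _, uid := range b {
			if want[uid] {
				hit = append(hit, uid)
			}
		}
		sort.Slice(hit, func(i, j int) bool {
			return val(hit[i]).Before(val(hit[j]))
		})
		out = append(out, hit...)
		if len(out) >= n {
			break
		}
	}
	if len(out) > n {
		out = out[:n]
	}
	return out
}
```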

However, for the other methods that sort B, this is a lot harder. You can’t really do any trimming. For example, A_0=(0,1,2,3,4,5,6,7,8,9,10) and A_1=(11). We ask for just one result. B=(0,1,2,3,4,5,6,7,8,9,10,11). Say when sorted by dob, the order doesn’t change. If we want the top result for A_1, we have to return the full sorted B. If the worker sorts each posting list, then it could have just returned (0) for sorting A_0 and (11) for sorting A_1.

Questions

Am I missing something basic here? Do you all have some other approaches in mind? I am leaning towards Method 4. It can be worse by a log factor, which I think is palatable given that pagination can sometimes save a lot of work.

What do you all think? @core-devs

(In another post, I can write about whether we want to keep keys of index in memory.)


So, in this case, the worker is not going to return just the sorted list; it's going to return the indices of the sorted list. It's not going to be quite that simple.

Sorting is essentially a 2-step process. The first step is to get a list of results, granular at the bucket level, i.e. we always pick up all intersections from a bucket, because entities in a bucket are sorted by uid and not by the specified sort order. So, we pick them all up, and then send them out to the worker which holds the dob values. This worker would then retrieve their dobs, sort them, and trim the list down to the requested number N. Note that this N will almost always be smaller than the list initially sent. So, we should not just return indices; we should return the actual final list.

Also, this particular step smells of an optimization that breaks the current semantics, which is that every query and reply contains uids, not indices or anything else. Something has to convert the list of uids to indices in this scheme, and I'd rather have that conversion done closer to the metal than by another worker.

Method 2.

Shouldn't this be N * log |B|? But you avoid creating an in-memory structure and allocating more memory to store indices etc. in Q. In fact, even when you do store indices, you still need to convert from a given UID in B to its index in Q. That effort is not being captured here; I think your assumption is that that part would be done elsewhere in the worker. See the above comments about doing that conversion here.

But we do, right? You just stored, for each posting list, another list which contains the index of each element. So you essentially created the entire UID matrix in memory again.

That basically kills our entire idea of keeping the number of network calls directly proportional to the complexity of the query, and not the results. So, discard that idea immediately.

I think you're applying the concept of pagination in the wrong place. You're starting off with a defined set of uids and then just re-arranging them. Once you do the re-arranging at each posting list in the UID matrix (A_i), you can do the pagination individually at the coordinator. So this doesn't seem like that much of an issue.

You're trying to save CPU and memory costs while adding the cost of |A| network calls and stragglers. CPU is always faster than network, any way you look at it. So, this approach is definitely not better.

I now remember the problem I was thinking about with pagination yesterday. Say the query is [first 10 actors by dob], and say there's a node which all actors connect to, so getting all actors would give us a UID matrix with one element, and that element contains, say, 100K actors. We don't really want to retrieve all their dobs before we pick the top 10.

So, what we can do is to execute this query in 2 stages:

  1. Send the entire UID matrix (not just the sorted unique uids) to the worker which serves the dob index. There, we intersect this UID matrix, PL by PL (i.e. each A_i), with the dob index, starting from the first year we have records for. I say PL by PL because it's simple and it's not going to be expensive: after the first RocksDB lookup, we keep the retrieved PL in memory.
    We retrieve the first 10 for each A_i (in this case i is 0, but assume the scenario where i can be greater than zero), and also ensure that we pick up each bucket's intersection in its entirety. We shouldn't stop after retrieving 10 intersection results in the middle of a bucket, because within a bucket the ordering is based on uids, not dobs.
    Return that result.

  2. Now that we have a trimmed version of the UID matrix, we do what we generally do, i.e. convert it into a list of unique uids and send it to the worker which serves the original date-of-birth predicate. We retrieve all the dobs, and either send them back, if asked for, or sort the uids and send those back. The coordinator then re-generates the UID matrix based on these results and trims each A_i individually.

So, note that there are 2 trims going on here, one in each step, applied at each A_i.

This avoids the |A| network calls, while still allowing pagination without dropping any results on the floor.

Note that one might want to make this a single-step process in case the UID matrix is small, but again, we shouldn't optimize for small result sets. Assume that the UID matrix is huge, and just focus on a solution for that. This solution assumes the UID matrix is huge, and that's why the 2-step process makes sense.
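A sketch of step 1 under these assumptions (plain uid slices; `toSet` and `intersect` are trivial helpers; step 2 is the usual unique-uids fetch plus the per-A_i trim at the coordinator):

```go
package twostep

// step1SortTrim runs on the worker serving the dob index: it
// intersects each A_i with the index buckets in ascending year
// order, keeping whole buckets, until it has at least n uids for
// that A_i, and returns the trimmed UID matrix.
func step1SortTrim(matrix [][]uint64, buckets [][]uint64, n int) [][]uint64 {
	trimmed := make([][]uint64, len(matrix))
	for i, ai := range matrix {
		want := toSet(ai)
		for _, b := range buckets { // bucket PLs stay cached after the first lookup
			trimmed[i] = append(trimmed[i], intersect(b, want)...)
			if len(trimmed[i]) >= n {
				break // whole buckets only; never split one
			}
		}
	}
	return trimmed
}

func toSet(uids []uint64) map[uint64]bool {
	s := make(map[uint64]bool, len(uids))
	for _, u := range uids {
		s[u] = true
	}
	return s
}

func intersect(bucket []uint64, want map[uint64]bool) []uint64 {
	var out []uint64
	for _, u := range bucket {
		if want[u] {
			out = append(out, u)
		}
	}
	return out
}
```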


Thanks for reading and thanks for the many suggestions.

Method 2: I should write a bit more. For each element x in sorted B, do a loop over the |A| lists. For each of the |A| lists, check whether x is inside via binary search. I did make a mistake here: the running time should be |B||A|, ignoring log factors for the binary search, which looks quite bad in the case where |B| = N, i.e., no overlap.

Method 4: I didn’t like |A| network calls either, but the other three seem really bad for pagination.

Clarification about what I wrote about method 4: I was thinking of what you proposed, except not batched up over all the lists. I thought it was obvious and didn't write much about it. What I meant by “worker can trim the results and return early. It can iterate over RocksDB until… and return” is to iterate over RocksDB, intersecting with the index buckets until we get enough results (hence implicitly trimming the results), and return early once we have enough.

Extra memory overhead for method 3: I was thinking that maybe we would still need to keep the sorted UIDs. In retrospect, we probably don't. That said, the sorting by position in sorted B can be done in place if we don't need to keep a copy of the UIDs sorted in the original order, leading to no memory overhead at all.

Overall method: I like what you propose. To me, it is method 4, but batched over the posting lists, which is great. As a first step, I might assume that the index and predicate values live on the same worker. Later on, I can update the code to do MutateOverNetwork.

Yeah, it is essentially your method 4, with the main difference being that it avoids the |A| network calls. Also, note the 2-step approach.

I'd say just start with the 2-step approach. You'd essentially have to rewrite the thing if you assume they're all on one machine. You'll realize that ProcessTaskOverNetwork automatically takes care of everything you need, so it's not really that much more effort. Also, it saves you from writing custom code for the same-worker-serving-both assumption.

One more thing I was thinking about: you might want a new RPC call for the first step of sort-trim, because it involves sending out a UID matrix, which is not what a typical Query expects. Might keep things clean to have that separation.
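For instance, something along these lines (a hypothetical structure, not an actual task proto):

```go
// SortQuery is a hypothetical task structure for the step-1
// sort-trim call: unlike task.Query, it carries a whole UID matrix.
type SortQuery struct {
	Attr   string     // attribute to sort by, e.g. "dob"
	Matrix [][]uint64 // one row per A_i
	Count  int        // number of results wanted per A_i
}
```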

Noted the 2-step approach and will do away with the “same worker serving both” assumption.

Regarding the new RPC call: can I add a new structure other than task.Query? Maybe that's what you were implying already.

Yup. That’s what I meant.

ProcessTaskOverNetwork is in the worker package, while the “indexing spawning new mutations” logic is in the posting package. This would form a dependency cycle.

There are a few ways around this. One way is to do some dependency inversion / extra abstraction. What do you all think?

You could just push the newly generated mutations into a channel which both these packages can access (maybe in a new indexing package), and then have the worker iterate over the channel, batch the mutations up, and send them to the right place.
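A minimal sketch of that decoupling, with all names hypothetical:

```go
// Package indexing is depended on by both posting and worker, so
// neither has to import the other, which breaks the cycle.
package indexing

// Mutation is an index edge generated by the posting package. (The
// real shape would mirror the mutation type used elsewhere.)
type Mutation struct {
	Key   []byte
	Value []byte
}

// Ch is the shared channel: posting sends, worker receives.
var Ch = make(chan Mutation, 10000)
```

On the worker side, something like this would drain and batch (sendMutations is a stand-in for the ProcessTaskOverNetwork-style dispatch; a real version would also flush on a ticker so a partial batch doesn't sit forever):

```go
func drainIndexMutations() {
	const batchSize = 100
	batch := make([]indexing.Mutation, 0, batchSize)
	for m := range indexing.Ch {
		batch = append(batch, m)
		if len(batch) == batchSize {
			sendMutations(batch) // route to the right group
			batch = make([]indexing.Mutation, 0, batchSize)
		}
	}
	if len(batch) > 0 {
		sendMutations(batch) // final partial batch after close
	}
}
```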

