Implementation of "nearest" geo-query

I’m working on implementing the nearest geo-query in Dgraph. It will be similar to the near query, except that instead of passing a distance, we pass N to get the N results nearest to the provided point. The query would look something like this:

 tourist(nearest("loc", "[-122.4140288, 37.8083504]", "10")) {
   ...
 }

with the last parameter being N. We could also accept an optional parameter maxD to serve as an additional termination condition.

I’m following the approach taken in this MongoDB post. The logic for the nearest query would have to wrap around the near query, and recurse in growing intervals. The current logic for the near query is a pipeline:

  1. processTask gets tokens from GetGeoTokens
  2. processTask then gets or creates the posting list
  3. processTask then calls FilterGeoUids
  4. FilterGeoUids iterates over the values in the PL, and for each one, checks whether it is contained by the cap (created from the provided point)

The recursive algorithm would need to wrap this whole pipeline in a function that is called repeatedly until a termination condition is met: either N results have been found, or the search radius has grown past maxD. How performant does v1 of this query need to be? If the answer is very, the pipeline will either have to be refactored or partly duplicated inside the recursive algorithm. If the answer is not very, we can simply run multiple iterations of the full pipeline, picking a large enough initial search radius.
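The growing-interval loop described above can be sketched as follows. Everything here is a hypothetical stand-in for the real pipeline: `nearPipeline`, `nearest`, and the step size are made up for illustration, assuming the pipeline returns every UID inside the given radius.

```go
package main

import "fmt"

// nearPipeline stands in for the full near-query pipeline described above:
// given a search radius, it returns every UID whose location falls inside
// the cap of that radius. (hypothetical type; the real pipeline is
// processTask -> FilterGeoUids)
type nearPipeline func(radiusMeters int) []uint64

// nearest re-runs the full pipeline with a growing radius until either n
// results have been found or the radius exceeds maxD.
func nearest(run nearPipeline, n, maxD, step int) []uint64 {
	var uids []uint64
	for radius := step; radius <= maxD; radius += step {
		// The pipeline returns everything inside the current cap, so each
		// iteration supersedes the previous one rather than appending.
		uids = run(radius)
		if len(uids) >= n {
			return uids[:n]
		}
	}
	return uids // fewer than n results exist within maxD
}

func main() {
	// Fake pipeline for illustration: one UID per 100m of radius.
	fake := func(radius int) []uint64 {
		uids := make([]uint64, radius/100)
		for i := range uids {
			uids[i] = uint64(i + 1)
		}
		return uids
	}
	fmt.Println(len(nearest(fake, 10, 5000, 200))) // prints 10
}
```

Note the trade-off this sketch makes explicit: each iteration re-fetches everything inside the new, larger cap, which is the "not very performant" branch of the dichotomy above.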

That’s the way I’ve been thinking about it. I’d appreciate any elucidation you can provide on that thought process @mrjn .

I’m working on implementing a recursive algorithm that wraps the whole pipeline and accumulates results (the non-performant version in the dichotomy laid out above). Once I have a working version I’ll update this post.

Hey @dmonay,

Agreed. Do the least change required first to achieve this functionality, and then we can iterate over it.

Hey @mrjn I’ve got a version implemented to wrap my head around how the whole query is executed. I wanted to recurse at the ProcessGraph level but had trouble accumulating results since it’s launched in a new goroutine from processRequest and also calls itself in a new goroutine for each of the children. So to just get something working I recursed at the processRequest level. Here’s the code delta in main.go:

var recurseOut = &query.SubGraph{
	DestUIDs: &task.List{Uids: []uint64{}},
}
var recurseN uint64
var recurseRadius int
var cyclesRan = 0

func processRequest(ctx context.Context, gq *gql.GraphQuery,
	l *query.Latency) (*query.SubGraph, wrappedErr) {
	if gq == nil || (len(gq.UID) == 0 && gq.Func == nil) {
		return &query.SubGraph{}, wrappedErr{nil, x.ErrorOk}
	}

	sg, err := query.ToSubGraph(ctx, gq)
	if err != nil {
		x.TraceError(ctx, x.Wrapf(err, "Error while conversion to internal format"))
		return &query.SubGraph{}, wrappedErr{err, x.ErrorInvalidRequest}
	}

	l.Parsing = time.Since(l.Start)
	x.Trace(ctx, "Query parsed")

	rch := make(chan error)

	// Transform the nearest query into a near query.
	if sg.SrcFunc[0] == "nearest" {
		if cyclesRan == 0 {
			// We want N results - grab it just once.
			recurseN, _ = strconv.ParseUint(sg.SrcFunc[2], 10, 64)
		}
		recurseRadius += 200 // interval is 200
		cyclesRan++

		sg.SrcFunc[0] = "near"
		sg.SrcFunc[2] = strconv.Itoa(recurseRadius)
	}

	go query.ProcessGraph(ctx, sg, nil, rch)
	err = <-rch
	if err != nil {
		x.TraceError(ctx, x.Wrapf(err, "Error while executing query"))
		return &query.SubGraph{}, wrappedErr{err, x.Error}
	}

	// Build the results.
	recurseOut.DestUIDs.Uids = append(recurseOut.DestUIDs.Uids, sg.DestUIDs.Uids...)

	// Recurse if the termination condition is not met.
	if cyclesRan > 0 && uint64(len(recurseOut.DestUIDs.Uids)) < recurseN {
		return processRequest(ctx, gq, l)
	}

	sg.DestUIDs.Uids = recurseOut.DestUIDs.Uids

	l.Processing = time.Since(l.Start) - l.Parsing
	x.Trace(ctx, "Graph processed")

	if len(*dumpSubgraph) > 0 {
		x.Checkf(os.MkdirAll(*dumpSubgraph, 0700), *dumpSubgraph)
		s := time.Now().Format("20060102.150405.000000.gob")
		filename := path.Join(*dumpSubgraph, s)
		f, err := os.Create(filename)
		x.Checkf(err, filename)
		enc := gob.NewEncoder(f)
		x.Check(enc.Encode(sg))
		x.Checkf(f.Close(), filename)
	}
	return sg, wrappedErr{nil, ""}
}

You can try it with the query from the original post, and it does return >10 results. It almost works (performance and style aside), but directly accumulating the DestUIDs and then sticking them into the final result comes back to bite us in preTraverse. I added the following check to prevent the index-out-of-range error, but it filters out some legitimate answers, returning fewer results than the query actually found:

func (sg *SubGraph) preTraverse(uid uint64, dst outputNode) error {
	invalidUids := make(map[uint64]bool)
	// We go through all predicate children of the subgraph.
	for _, pc := range sg.Children {
		idx := algo.IndexOf(pc.SrcUIDs, uid)
		if idx < 0 {
			continue
		}
		// There are many more UIDs that are filtered out here than there
		// should be: the indices don't match up.
		if len(pc.uidMatrix) <= idx {
			continue
		}
		// ... rest of preTraverse unchanged
	}
	// ...
}
One solution is to export the uidMatrix and values fields of the SubGraph struct. Then I could accumulate them directly, the way I do with DestUIDs. My intuition, though, is that there’s a reason they were unexported, and that they should stay that way.
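The index mismatch comes from an alignment invariant: row i of uidMatrix holds the results for the i-th source UID, so the two must grow together. A minimal, stdlib-only sketch with made-up data (indexOf is a simplified stand-in for what algo.IndexOf does) shows how appending to one list without the other leaves indices with no row behind them:

```go
package main

import "fmt"

// indexOf is a simplified stand-in for algo.IndexOf: the position of uid
// in a UID list, or -1 if absent.
func indexOf(uids []uint64, uid uint64) int {
	for i, u := range uids {
		if u == uid {
			return i
		}
	}
	return -1
}

func main() {
	// uidMatrix row i holds the result UIDs for srcUIDs[i]; the two slices
	// must stay positionally aligned. (made-up data for illustration)
	srcUIDs := []uint64{10, 20, 30}
	uidMatrix := [][]uint64{{100}, {200, 201}, {300}}

	// Appending an accumulated UID to the source list without appending a
	// matching row breaks the invariant.
	srcUIDs = append(srcUIDs, 40)

	idx := indexOf(srcUIDs, 40)
	fmt.Println(idx, len(uidMatrix) <= idx) // prints: 3 true
}
```

The `len(pc.uidMatrix) <= idx` guard above trips in exactly this situation, which is why legitimate answers get filtered out.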

Another solution would be to recurse at the ProcessGraph level. I think that’s much preferable, and ideally we could go even further down the chain and recurse at the processTask level, but both options are complicated by the fact that these functions are launched in their own goroutines. I think I’d need to add or modify some fields of the SubGraph struct to accumulate data across recursive calls running in separate goroutines.

This is where I’m at now. What are your thoughts?

@mrjn Have you been able to look into this?

Hey @dmonay – I think the approach had a few issues, and I haven’t been able to expand on them. @ashwin95r can guide you here, he’s the author and maintainer of geo-query stuff.

@ashwin95r: Can you look into this?

Hey @dmonay

Thanks for trying this out!

As you mentioned, yes, your approach kind of works. But modifying just the DestUIDs won’t propagate the results correctly, as you observed in preTraverse: the uidMatrix entries must also be correct.

Also, variables like cyclesRan and recurseRadius are global, which they shouldn’t be.

I think the right place to handle this would be around here:

But the problem with nearest is that we don’t know how many buckets to check before we actually fetch the elements. So we can add special handling for nearest in processTask and run an iterative loop (similar to your current recursive method) that fetches the required number of entities.
(Also, later we should optimize this so that we don’t just keep growing the radius, since that fetches some buckets repeatedly, which we can try to avoid.)
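That optimization, fetching each bucket only once, could look something like the sketch below. Everything here is hypothetical: buckets are modelled as UID lists keyed by their distance from the query point, and `fetchRing`/`nearestIterative` are made-up names, not real Dgraph functions.

```go
package main

import "fmt"

// fetchRing stands in for fetching only the index buckets that fall inside
// the new radius but outside the previous one, so no bucket is read twice.
// (hypothetical model; real buckets would come from the geo index tokens)
func fetchRing(prev, cur int, buckets map[int][]uint64) []uint64 {
	var out []uint64
	for dist, uids := range buckets {
		if dist > prev && dist <= cur {
			out = append(out, uids...)
		}
	}
	return out
}

// nearestIterative grows the radius in steps, fetching each ring exactly
// once, until n results are collected or maxD is reached.
func nearestIterative(n, step, maxD int, buckets map[int][]uint64) []uint64 {
	var results []uint64
	for prev, cur := 0, step; cur <= maxD && len(results) < n; prev, cur = cur, cur+step {
		results = append(results, fetchRing(prev, cur, buckets)...)
	}
	if len(results) > n {
		results = results[:n]
	}
	return results
}

func main() {
	buckets := map[int][]uint64{100: {1, 2}, 300: {3, 4, 5}, 500: {6}}
	fmt.Println(nearestIterative(4, 200, 1000, buckets)) // prints [1 2 3 4]
}
```

Unlike the re-run-the-whole-pipeline approach, each iteration here only touches the annulus between the previous and current radius, so total work stays proportional to the area actually searched.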

