gRPC message limit for a single transaction

Hey there,

Trying to ingest a large number of quads in a single transaction hits the gRPC message size limit of 4 MB.

I’ve added a sample program below that will error out with 100k quads, but in my application code this could be well over 10 million generated quads.

I’m guessing the transaction gets built up client side and doesn’t send anything to the dgraph server until commit is called.

A short-term workaround is to increase the gRPC server message limit, or to split the ingest into multiple transactions (e.g. 50k quads each), but that would defeat the point of having transactions.

Any suggestions on how to approach this problem?

Dgraph version is: bbff31c

Cheers,
Emerson

package main

import (
	"context"
	"fmt"
	"log"
	"strconv"

	"github.com/dgraph-io/dgraph/client"
	"github.com/dgraph-io/dgraph/protos"
	"github.com/dgraph-io/dgraph/x"
	"google.golang.org/grpc"
)

const targetAddr = "localhost:9080"

func main() {

	// Setup dgraph client
	ctx := context.Background()
	conn, err := grpc.Dial(targetAddr, grpc.WithInsecure())
	if err != nil {
		log.Fatal(err)
	}
	pc := protos.NewDgraphClient(conn)
	c := client.NewDgraphClient(pc)

	// Set schema
	op := &protos.Operation{}
	op.Schema = `name: string @index(fulltext) .`
	x.Check(c.Alter(ctx, op))

	// Ingest
	TestInsertQuads(ctx, c)
	conn.Close()
}

func TestInsertQuads(ctx context.Context, c *client.Dgraph) {
	txn := c.NewTxn()
	mu := &protos.Mutation{}

	for i := 1; i <= 100000; i++ {
		quad := &protos.NQuad{
			Subject:     strconv.Itoa(i),
			Predicate:   "name",
			ObjectValue: &protos.Value{Val: &protos.Value_StrVal{StrVal: fmt.Sprintf("ok %d", i)}},
		}
		mu.Set = append(mu.Set, quad)
	}

	_, err := txn.Mutate(ctx, mu)
	x.Check(err)
	x.Check(txn.Commit(ctx))
}

This seems like a client-side limitation. You can increase the size that the client can send like so:

conn, err := grpc.Dial(addr,
		grpc.WithDefaultCallOptions(
			grpc.MaxCallRecvMsgSize(256 << 20),
			grpc.MaxCallSendMsgSize(256 << 20)),
		grpc.WithInsecure())

Hi Emerson,

When you call txn.Mutate, the client will communicate those nquads to the server, but they won’t be committed until you call Commit. So what you could do in your case is use multiple Mutate calls, then finally Commit once you have sent your 10 million nquads (which will apply all the nquads sent in the transaction as a single unit).

EDIT: Or do what Pawan suggested :grinning:

Hey, thanks for the replies.

@pawan that seems more like a workaround? It could mean gigabytes of data being sent across the connection as a single message, which would work, but I think that would take a pretty big performance hit compared to streaming.

Ideally the millions of quads get streamed into dgraph as they’re received, which could be over the course of several minutes, and committed once they’ve all been received.

@peter I tried out your suggestion (attached below), breaking the transaction up into multiple mutates, but I still hit the gRPC message limit. In this case, it errors on the txn.Mutate() call after ~72,000 quads. I’ve tried different maxMutationSize limits, but it always fails around 72k.

My guess is that it’s sending the whole transaction across each time txn.Mutate() is called. I expected it to only send the current mutation set, which would be at most 1000 quads in this example.

Any thoughts?

package main

import (
	"context"
	"fmt"
	"log"
	"strconv"

	"github.com/dgraph-io/dgraph/client"
	"github.com/dgraph-io/dgraph/protos"
	"github.com/dgraph-io/dgraph/x"
	"google.golang.org/grpc"
)

const (
	targetAddr      = "localhost:9080"
	maxMutationSize = 1000
)

func main() {

	// Setup dgraph client
	ctx := context.Background()
	conn, err := grpc.Dial(targetAddr, grpc.WithInsecure())
	if err != nil {
		log.Fatal(err)
	}
	pc := protos.NewDgraphClient(conn)
	c := client.NewDgraphClient(pc)

	// Set schema
	op := &protos.Operation{}
	op.Schema = `name: string @index(fulltext) .`
	x.Check(c.Alter(ctx, op))

	// Ingest
	TestInsertQuads(ctx, c)
	conn.Close()
}

func TestInsertQuads(ctx context.Context, c *client.Dgraph) {
	txn := c.NewTxn()
	mu := &protos.Mutation{}

	for i := 1; i <= 100000; i++ {
		quad := &protos.NQuad{
			Subject:     strconv.Itoa(i),
			Predicate:   "name",
			ObjectValue: &protos.Value{Val: &protos.Value_StrVal{StrVal: fmt.Sprintf("ok %d", i)}},
		}
		mu.Set = append(mu.Set, quad)

		if len(mu.Set) >= maxMutationSize {
			_, err := txn.Mutate(ctx, mu)
			x.Check(err)

			mu = &protos.Mutation{}
		}
	}

	if len(mu.Set) > 0 {
		_, err := txn.Mutate(ctx, mu)
		x.Check(err)
	}
	x.Check(txn.Commit(ctx))
}

Hey @emersonwood,

Just to take a step back here, what’s the reasoning behind trying to send such huge mutations in one call? Our live loader is also designed for loading a lot of data, but we set it to send batches of 1000 mutations per call, parallelizing the calls to achieve throughput.

You could break up your data into multiple transactions and just run them concurrently.
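For reference, here’s a rough sketch of that pattern using the same client API as the programs above. The batch size, worker count, and synthetic data are arbitrary placeholders (this is not how the live loader itself is implemented), just one way to run batched transactions concurrently:

package main

import (
	"context"
	"fmt"
	"log"
	"strconv"
	"sync"

	"github.com/dgraph-io/dgraph/client"
	"github.com/dgraph-io/dgraph/protos"
	"google.golang.org/grpc"
)

const (
	targetAddr = "localhost:9080"
	batchSize  = 1000 // quads per transaction
	numWorkers = 8    // concurrent transactions
)

func main() {
	ctx := context.Background()
	conn, err := grpc.Dial(targetAddr, grpc.WithInsecure())
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()
	c := client.NewDgraphClient(protos.NewDgraphClient(conn))

	// A pool of workers, each committing one small transaction per batch.
	batches := make(chan []*protos.NQuad)
	var wg sync.WaitGroup
	for w := 0; w < numWorkers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for set := range batches {
				txn := c.NewTxn()
				if _, err := txn.Mutate(ctx, &protos.Mutation{Set: set}); err != nil {
					txn.Discard(ctx)
					log.Println("mutate failed:", err)
					continue
				}
				if err := txn.Commit(ctx); err != nil {
					log.Println("commit failed:", err)
				}
			}
		}()
	}

	// Producer: build batches of synthetic quads, as in the examples above.
	set := make([]*protos.NQuad, 0, batchSize)
	for i := 1; i <= 100000; i++ {
		set = append(set, &protos.NQuad{
			Subject:     strconv.Itoa(i),
			Predicate:   "name",
			ObjectValue: &protos.Value{Val: &protos.Value_StrVal{StrVal: fmt.Sprintf("ok %d", i)}},
		})
		if len(set) == batchSize {
			batches <- set
			set = make([]*protos.NQuad, 0, batchSize)
		}
	}
	if len(set) > 0 {
		batches <- set
	}
	close(batches)
	wg.Wait()
}

Each batch commits on its own, so a failed batch here is only logged; in a real load you’d want to retry it or abort, and the batch size and worker count are just numbers to tune.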

Hey Manish,

I’ve got very large trees being generated that can exceed 10 million quads each. Each of these trees is independent of the others, but they share a common root. I was hoping to write each tree as a single transaction so that either the whole tree gets ingested or none of it does.

I agree. I’ll go with this approach, breaking each tree up into multiple transactions and only connecting the tree to the common root if all of the transactions commit without error. If any of them do error, I’ll try to recover or clean up the tree. A rough sketch of the pattern is below.
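This reuses the client setup and imports from the programs above; commonRootUID, treeRootUID, and the child predicate are placeholders for my real data, and error handling is simplified:

// linkTreeIfComplete ingests one tree in independent batch transactions and
// only attaches it to the shared root once every batch has committed.
func linkTreeIfComplete(ctx context.Context, c *client.Dgraph, commonRootUID, treeRootUID string, batches [][]*protos.NQuad) error {
	for _, set := range batches {
		txn := c.NewTxn()
		if _, err := txn.Mutate(ctx, &protos.Mutation{Set: set}); err != nil {
			txn.Discard(ctx)
			return err // tree stays disconnected; caller can retry or clean up
		}
		if err := txn.Commit(ctx); err != nil {
			return err
		}
	}

	// All batches are in, so one final small transaction links the tree's root
	// to the common root. Until this commits, the tree is unreachable from it.
	txn := c.NewTxn()
	edge := &protos.NQuad{
		Subject:   commonRootUID,
		Predicate: "child", // placeholder predicate
		ObjectId:  treeRootUID,
	}
	if _, err := txn.Mutate(ctx, &protos.Mutation{Set: []*protos.NQuad{edge}}); err != nil {
		txn.Discard(ctx)
		return err
	}
	return txn.Commit(ctx)
}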

Thanks for everyone’s help.

