[GitHub] Spark connector for Dgraph

Moved from GitHub dgraph/2123

Posted by jimanvlad:

Hi,

It would be great if, at some point, a Spark connector were created.

Neo4j uses its connector both to preprocess and load data in, and to get sub-graphs out for processing in Spark.


https://spark.apache.org/docs/0.9.0/graphx-programming-guide.html

Vlad

EshwarSR commented :

Hi. I’m also looking for such a feature. I guess many people would benefit from this.

For now, is there any way to get this done faster than querying all the data from dgraph and populating the spark dataframes?

srfrog commented :

Damn, sorry for the wrong reference. Butter fingers

EnricoMi commented :

I am currently looking into this. I have created a very simple connector that can load data from DGraph into Spark. I have a few fundamental questions / requirements to get to a robust solution.

Spark is a massively parallel cluster, so hundreds or thousands of concurrent reads would hit the DGraph cluster. For this to work, the Spark connector 1) needs to know the exact partitioning and the alphas available in DGraph, and 2) needs to be able to query / read non-overlapping and deterministic parts (partitions) of the data. Is there a way to get access to the partitioning and cluster setup through the grpc endpoint? What I would need is the groups, the predicate range per group and the alphas per group. Would I at all be able to query a given alpha directly? Handling a change of this partitioning will be challenging.

The next thing is to be able to read a fraction of the graph. Let's consider two use-cases: I) read the entire graph and II) given a GraphQL query, read the entire result set. For each of these use-cases there needs to be a way to retrieve a fixed fraction (in Spark called a partition; this is not 1:1 a DGraph partition, but for simplicity and performance a Spark partition is a well-defined subset of a DGraph partition). How could I read a single partition of DGraph data? The simplest approach would be has(PROPERTY) for the set of properties in a partition.

Spark partitions are immutable. Can we, for the lifetime of a Spark partition that is mapped to a DGraph partition, guarantee we see an immutable graph / a snapshot?

In use-case II), a Spark partition is not mapped to a DGraph partition anymore, because the data potentially come from multiple DGraph partitions already. How could the result set be split into Spark partitions? With pagination? That would require knowing the exact number of results in the first place. Alternatively, the result uid space could be split into partition ranges. Are range queries like 0x1 <= uid < 0x9 supported and efficient on an arbitrary GraphQL query? Can I get the space of possible uids / the min and max uid existing in the DGraph?

Finally, given that a single Spark partition is ideally in the order of 1 million rows / predicates / triples, a streaming JSON result from grpc would be desirable. Using the example of the official Java client, client.newReadOnlyTransaction().query(query) loads the entire response JSON into memory, then parses it into a JsonObject or something similar before I can provide it to the iterative Spark API. Does the grpc support a JSON stream? Do you know if the Java client supports that stream of results if grpc does?

EnricoMi commented :

inviting @mangalaman93

danielmai commented :

Hey @EnricoMi. It’s great to hear you’re looking into Spark and Dgraph integration.

> Is there a way to get access to the partitioning and cluster setup through the grpc endpoint? What I would need is the groups, the predicate range per group and the alphas per group.

You can check Zero’s /state page for cluster membership info. (docs)
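
As a rough illustration of how a connector might use that page, the sketch below parses a /state-style JSON document into a map from group to its alpha addresses and predicates. The sample payload is hand-written and heavily trimmed, and the field names (`groups`, `members`, `tablets`, `addr`) reflect my understanding of Zero's response, so treat them as assumptions and check them against your Dgraph version. `cluster_layout` is a hypothetical helper, not part of any client.

```python
import json

# Hand-written, trimmed sample of what Zero's /state page may return.
# Field names are assumptions; verify them against your Dgraph version.
SAMPLE_STATE = """
{
  "groups": {
    "1": {
      "members": {
        "1": {"id": "1", "groupId": 1, "addr": "alpha1:7080", "leader": true},
        "2": {"id": "2", "groupId": 1, "addr": "alpha2:7080"}
      },
      "tablets": {
        "name": {"groupId": 1, "predicate": "name"},
        "friend": {"groupId": 1, "predicate": "friend"}
      }
    },
    "2": {
      "members": {
        "3": {"id": "3", "groupId": 2, "addr": "alpha3:7080", "leader": true}
      },
      "tablets": {
        "age": {"groupId": 2, "predicate": "age"}
      }
    }
  },
  "maxLeaseId": "10000"
}
"""


def cluster_layout(state_json):
    """Return {group_id: {"alphas": [...], "predicates": [...]}}."""
    state = json.loads(state_json)
    layout = {}
    for group_id, group in state.get("groups", {}).items():
        members = group.get("members", {})
        tablets = group.get("tablets", {})
        layout[group_id] = {
            # Addresses as reported by Zero; sorted for deterministic output.
            "alphas": sorted(m["addr"] for m in members.values()),
            "predicates": sorted(tablets.keys()),
        }
    return layout


layout = cluster_layout(SAMPLE_STATE)
for group_id, info in sorted(layout.items()):
    print(group_id, info["alphas"], info["predicates"])
```

In a real connector the JSON would come from an HTTP request to Zero rather than a constant, and the layout would have to be re-read whenever predicates move between groups.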

> How could I read a single partition of DGraph data?

You can use queries to retrieve a specific subgraph of your dataset.
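
For the whole-graph case, one way to read this is the has(PROPERTY) approach suggested earlier in the thread: one query block per predicate in the partition. The sketch below only builds the query text and assumes scalar-valued predicates; `has_query` is a hypothetical helper and the block names `p0`, `p1`, ... are arbitrary.

```python
def has_query(predicates):
    """Build one query with a has() block per predicate."""
    blocks = []
    for i, pred in enumerate(predicates):
        # Angle brackets keep predicate names with special characters valid.
        blocks.append(
            "  p%d(func: has(<%s>)) {\n    uid\n    <%s>\n  }" % (i, pred, pred)
        )
    return "{\n" + "\n".join(blocks) + "\n}"


query = has_query(["name", "age"])
print(query)
```

Each Spark partition would then run the query for its own set of predicates, so the sets must not overlap if every triple is to be read exactly once.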

> Can we, for the lifetime of a Spark partition that is mapped to a DGraph partition, guarantee we see an immutable graph / a snapshot?

Dgraph has transactions. So, you can create a new transaction, and queries within that transaction will be for a snapshot of the data at a particular timestamp.

> Can I get the space of possible uids / the min and max uid existing in the DGraph?

In /state there’s a field called maxLeaseId. That is the highest possible UID that could be in Dgraph at that moment. For example, maxLeaseId could be 10000 while the actual highest UID already assigned to a node is 123; it can never be 10001.
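
Because maxLeaseId is only an upper bound, a connector could split the range 1..maxLeaseId into fixed-size chunks and accept that some chunks come back sparse or empty. A minimal sketch of that arithmetic follows; `uid_ranges` is a hypothetical helper, and the ranges are half-open as in the `0x1 <= uid < 0x9` example from the question.

```python
def uid_ranges(max_lease_id, chunk_size):
    """Split uids 1..max_lease_id into half-open [start, end) ranges."""
    if max_lease_id < 1 or chunk_size < 1:
        return []
    ranges = []
    start = 1
    while start <= max_lease_id:
        # The last range is cut off just past max_lease_id.
        end = min(start + chunk_size, max_lease_id + 1)
        ranges.append((start, end))
        start = end
    return ranges


ranges = uid_ranges(10000, 4000)
for start, end in ranges:
    print(hex(start), "<= uid <", hex(end))
```

The ranges are contiguous and non-overlapping, which is the property a Spark partitioning needs; how evenly the actual nodes fall into them depends on how UIDs were assigned.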

> a streaming JSON result from grpc would be desirable

Queries currently aren’t streamed. If this is something you need, please file a separate feature request.
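
Until results can be streamed, a client can approximate streaming by fetching fixed-size pages and handing rows on one at a time, so that only one page is held in memory. The sketch below shows the shape of such an iterator. `fetch_page` is a hypothetical callback standing in for a real paginated query (for example one using `first` and `offset`); here it is backed by an in-memory list.

```python
def iter_rows(fetch_page, page_size):
    """Yield rows lazily, requesting one page at a time."""
    offset = 0
    while True:
        page = fetch_page(offset, page_size)
        for row in page:
            yield row
        if len(page) < page_size:
            return  # a short (or empty) page means there is nothing left
        offset += page_size


# Stand-in data and fetch function; a real one would query Dgraph.
DATA = [{"uid": hex(i)} for i in range(1, 8)]


def fake_fetch(offset, first):
    return DATA[offset:offset + first]


rows = list(iter_rows(fake_fetch, 3))
print(len(rows))
```

Offset-based paging only gives a consistent result if every page sees the same data, so in practice all pages would need to be read against the same snapshot.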

EnricoMi commented :

Hi @danielmai, thanks for the pointers!

How does the concept of subgraphs work in DGraph? Can you point me to docs or examples? Can you elaborate on that a bit more?

The rest is clear and gives me a lot to work through. I’ll let you know how it goes or if I need something else.

Enrico

EnricoMi commented :

I have released the Spark Dgraph Connector as uk.co.gresearch.spark:spark-dgraph-connector_2.12:0.3.0-3.0 for Spark 3.0. There is also a Spark 2.4 version available. Sources can be found at https://github.com/G-Research/spark-dgraph-connector.

The connector properly partitions the Dgraph so that the whole Spark cluster can be used to read from the Dgraph cluster. Performance and scalability are still being worked on, but in general the connector is in good enough shape to test against real graphs.

At which point would you agree to add this connector to the list of unofficial Dgraph Clients?

danielmai commented :

Thanks for sharing, @EnricoMi! I’ve created PR #5859 to add your Spark connector to the docs.

Thanks for adding the connector to the list of clients. I suggest marking this discussion as “solved”.
For reference, there is also this related discussion: Spark Connector for dgraph