Spark Connector for Dgraph

Hi team,

I have a use case where I need to run graph algorithms on graphs stored in Dgraph. I'm using Spark and Scala for graph processing and would like a Spark connector for Dgraph that loads the graph from Dgraph into Spark.

Is such a connector already available?
If not, I would like to help create one. Please help me get started.

Thanks,
Eshwar


Hi Eshwar,

We currently do not have a Spark connector available. We do have an official Java client, dgraph4j (https://github.com/dgraph-io/dgraph4j), which could become the base of a Spark connector. I have some experience building Spark connectors, so let me know if you need any help.

I would consider looking at the Spark DataSource V2 APIs, which are pretty well designed, and go from there.
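To give a sense of the shape, here is a rough, untested sketch against the Spark 2.4 DataSource V2 read interfaces. Everything Dgraph-specific in it (the DgraphDataSource name, the dgraph.target option, the triple-style schema) is hypothetical; a real reader would issue queries through dgraph4j inside each partition:

import java.util.{List => JList}
import scala.collection.JavaConverters._

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport}
import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, InputPartition, InputPartitionReader}
import org.apache.spark.sql.types._

// Hypothetical entry point, e.g. spark.read.format("...DgraphDataSource").load()
class DgraphDataSource extends DataSourceV2 with ReadSupport {
  override def createReader(options: DataSourceOptions): DataSourceReader =
    new DgraphReader(options.get("dgraph.target").orElse("localhost:9080"))
}

class DgraphReader(target: String) extends DataSourceReader {
  // Simplified schema: one row per (subject uid, predicate, object uid) edge.
  override def readSchema(): StructType = StructType(Seq(
    StructField("subject", LongType),
    StructField("predicate", StringType),
    StructField("objectUid", LongType)))

  // A real implementation would plan one partition per predicate or tablet;
  // this stub plans a single empty partition.
  override def planInputPartitions(): JList[InputPartition[InternalRow]] =
    Seq[InputPartition[InternalRow]](new DgraphPartition(target)).asJava
}

class DgraphPartition(target: String) extends InputPartition[InternalRow] {
  override def createPartitionReader(): InputPartitionReader[InternalRow] =
    new InputPartitionReader[InternalRow] {
      // A real reader would open a dgraph4j client against `target` here and
      // iterate over query results, converting each edge into an InternalRow.
      private val rows = Iterator.empty
      override def next(): Boolean = rows.hasNext
      override def get(): InternalRow = rows.next()
      override def close(): Unit = ()
    }
}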

Thanks

Hi @amanmangal,

Thanks for the reply. I am using Spark's GraphX and would like to create a graph from RDDs of vertices and edges (using the Graph.apply method):

https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.graphx.Graph%24@apply

So what is the best way to get all the data from Dgraph into that format?
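For reference, this is the shape I need to end up with (a tiny sketch with made-up vertex and edge data; sc is an existing SparkContext):

import org.apache.spark.graphx.{Edge, Graph}
import org.apache.spark.rdd.RDD

// Made-up data just to show the target shape; the real RDDs would be
// built from whatever comes back from Dgraph.
val vertices: RDD[(Long, String)] =
  sc.parallelize(Seq((1L, "alice"), (2L, "bob")))
val edges: RDD[Edge[String]] =
  sc.parallelize(Seq(Edge(1L, 2L, "follows")))
val graph: Graph[String, String] = Graph(vertices, edges)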

Thanks

If this is a one-time activity, you can convert the data into JSON format using Spark. Once you have JSON data, you can run the Dgraph bulk loader and very quickly have Dgraph up and running, and you can skip writing a Spark connector for Dgraph. How much data do you have right now?
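Roughly, the Spark side of that is just writing JSON out, something like the sketch below (the input path and DataFrame here are made up; the output would still need to be shaped into the JSON structure the bulk loader expects, including uid fields):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("dgraph-export").getOrCreate()

// Hypothetical source data; replace with whatever holds your graph today.
val edgesDf = spark.read.parquet("/data/edges.parquet")

// Write one JSON object per line; these files can then be fed to the
// Dgraph bulk loader once they match its expected format.
edgesDf.write.json("/data/edges-json")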

It's not a one-time activity, and I need it to work both ways: load from Dgraph into Spark and write back to Dgraph from Spark.

Also, I’m running into dependency errors after including the dgraph4j dependency in my project.

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/tmp/sbt_dedcb9ab/target/54c6dd23/slf4j-log4j12-1.7.16.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/tmp/sbt_dedcb9ab/target/be4b3c56/slf4j-simple-1.7.21.jar!/org/slf4j/impl/StaticLoggerBinder.class]

It's not a one-time activity, and I need it to work both ways: load from Dgraph into Spark and write back to Dgraph from Spark.

In that case, a Spark connector of sorts would make sense.

Also, I’m running into dependency errors after including the dgraph4j dependency in my project.

Spark generally has conflicts with gRPC dependencies, but this looks different. Are you getting the errors at compile time?

I guess that's just a warning, but I get the following error at run time.

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/tmp/sbt_8fbaec8d/target/54c6dd23/slf4j-log4j12-1.7.16.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/tmp/sbt_8fbaec8d/target/be4b3c56/slf4j-simple-1.7.21.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
[error] (run-main-0) java.lang.IllegalAccessError: tried to access method com.google.common.base.Stopwatch.<init>()V from class org.apache.hadoop.mapred.FileInputFormat
[error] java.lang.IllegalAccessError: tried to access method com.google.common.base.Stopwatch.<init>()V from class org.apache.hadoop.mapred.FileInputFormat
[error] at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:312)
[error] at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:204)
[error] at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:253)
[error] at scala.Option.getOrElse(Option.scala:138)
[error] at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
[error] at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
[error] at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:253)
[error] at scala.Option.getOrElse(Option.scala:138)
[error] at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
[error] at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
[error] at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:253)
[error] at scala.Option.getOrElse(Option.scala:138)
[error] at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
[error] at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
[error] at org.apache.spark.rdd.RDD.count(RDD.scala:1168)
[error] at org.apache.spark.graphx.GraphLoader$.edgeListFile(GraphLoader.scala:94)
[error] at example.MyPPRPlay$.main(MyPRDriver.scala:26)
[error] at example.MyPPRPlay.main(MyPRDriver.scala)
[error] at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
[error] at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
[error] at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
[error] at java.lang.reflect.Method.invoke(Method.java:498)
19/04/07 12:51:55 ERROR Utils: uncaught error in thread spark-listener-group-appStatus, stopping SparkContext
java.lang.InterruptedException
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2048)
at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
at org.apache.spark.scheduler.AsyncEventQueue.$anonfun$dispatch$1(AsyncEventQueue.scala:97)
at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.java:23)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:87)
at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.$anonfun$run$1(AsyncEventQueue.scala:83)
at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1302)
at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.run(AsyncEventQueue.scala:83)

Thanks,
Eshwar

Hi @amanmangal,

Can you please suggest how to go about this?

Thanks,
Eshwar

The above log trace points to an issue with GraphX. I am not sure what the issue is here. Is there a way I can reproduce it?

Yeah, just create a sample sbt project and add the following dependencies:

libraryDependencies += sparkCore,
libraryDependencies += sparkMLlib,
libraryDependencies += "io.dgraph" % "dgraph4j" % "1.7.0"

In the main Scala file, just create a Spark context and try to load a graph:

val conf = new SparkConf().setAppName("dgraph debug").setMaster("local[*]")
val sc = new SparkContext(conf)
val graph = GraphLoader.edgeListFile(sc, "edges.txt")

Sample edges.txt: the followers.txt file in the apache/spark GitHub repo.

My project compiles and runs successfully without the dgraph4j dependency.
It fails with the above error as soon as I add it.

What versions of spark-core and spark-mllib are you using? Also, please share your sbt version.

sbt version 1.2.7
scala version 2.12.8
spark-core version 2.4.0
spark-mllib version 2.4.0


What is GraphLoader here?

import org.apache.spark.graphx.GraphLoader

What version of GraphX are you using, in that case?

It's part of spark-core itself. No need to include it separately.

Will you share your complete build.sbt? For spark-core, I only added the following dependency, and it doesn't pull in GraphX:

// https://mvnrepository.com/artifact/org.apache.spark/spark-core
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.4.0"


ThisBuild / scalaVersion     := "2.12.8"
ThisBuild / version          := "0.1.0-SNAPSHOT"
ThisBuild / organization     := "com.example"
ThisBuild / organizationName := "example"

lazy val root = (project in file("."))
  .settings(
    name := "spark-ppr",
    libraryDependencies += "org.scalatest" %% "scalatest" % "3.0.5" % Test,
    libraryDependencies += "org.apache.spark" %% "spark-core" % "2.4.0",
    libraryDependencies += "org.apache.spark" %% "spark-mllib" % "2.4.0",

    libraryDependencies += "io.dgraph" % "dgraph4j" % "1.7.0" // exclude("org.slf4j", "*")
  )
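If the SLF4J multiple-bindings warning needs fixing, one option (assuming slf4j-simple is the binding that comes in transitively via dgraph4j, which is what the classpath in my earlier log suggests) would be to exclude just that artifact rather than all of org.slf4j:

    // Hypothetical narrower exclusion, keeping Spark's slf4j-log4j12 binding.
    libraryDependencies += "io.dgraph" % "dgraph4j" % "1.7.0" exclude("org.slf4j", "slf4j-simple")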

My bad. It looks like GraphX is not part of spark-core but comes in as a dependency of spark-mllib.

Thanks, I can reproduce the issue here now. Let me look into it.