Spark Connector for Dgraph


(Eshwar) #1

Hi team,

I have a use case where I need to run graph algorithms on graphs stored in Dgraph. I'm using Spark and Scala for graph processing and would like a Spark connector for Dgraph that loads graphs from Dgraph into Spark.

Is such a connector already available?
If not, I would like to help create one. Please help me get started.

Thanks,
Eshwar


(Aman Mangal) #2

Hi Eshwar,

We currently do not have a Spark connector available. We do have a Java client, https://github.com/dgraph-io/dgraph4j, which could serve as the base of a Spark connector. I have some experience building Spark connectors, so let me know if you need any help.

I would start by looking at the Spark DataSource V2 APIs, which are pretty well designed, and go from there.
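To give a rough idea of the read path, this is approximately what querying Dgraph from Scala via dgraph4j looks like (a minimal sketch against dgraph4j 1.7.x, not tested here; the endpoint and query are placeholders):

import io.dgraph.{DgraphClient, DgraphGrpc}
import io.grpc.ManagedChannelBuilder

// Connect to a local Dgraph alpha over gRPC (address is a placeholder).
val channel = ManagedChannelBuilder.forAddress("localhost", 9080).usePlaintext().build()
val client = new DgraphClient(DgraphGrpc.newStub(channel))

// Placeholder query: fetch uid and name of all nodes with a `name` predicate.
val query =
  """{
    |  nodes(func: has(name)) {
    |    uid
    |    name
    |  }
    |}""".stripMargin

val txn = client.newTransaction()
try {
  val response = txn.query(query)
  // The result comes back as JSON; a connector would parse this into rows/RDDs.
  println(response.getJson.toStringUtf8)
} finally {
  txn.discard()
}

A connector would essentially wrap calls like these behind the DataSource APIs and handle schema inference and partitioning on top.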

Thanks


(Eshwar) #3

Hi @amanmangal,

Thanks for the reply. I am using Spark's GraphX and would like to create a graph from RDDs of vertices and edges (using the Graph.apply method):

https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.graphx.Graph%24@apply
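For reference, the target shape is roughly this (a minimal GraphX sketch with placeholder vertex and edge attributes, not real Dgraph data):

import org.apache.spark.SparkContext
import org.apache.spark.graphx.{Edge, Graph}
import org.apache.spark.rdd.RDD

// Placeholder attributes; in practice these would come from Dgraph query results.
def toGraph(sc: SparkContext): Graph[String, String] = {
  val vertices: RDD[(Long, String)] = sc.parallelize(Seq((1L, "alice"), (2L, "bob")))
  val edges: RDD[Edge[String]] = sc.parallelize(Seq(Edge(1L, 2L, "follows")))
  Graph(vertices, edges) // Graph.apply builds the property graph from the two RDDs
}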

So what is the best way to get all the data from Dgraph into that format?

Thanks


(Aman Mangal) #4

If this is a one-time activity, you can convert the data into JSON format using Spark. Once you have the JSON data, you can run the Dgraph bulk loader and very quickly have Dgraph up and running, skipping the Spark connector entirely. How much data do you have right now?
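As a rough illustration of the Spark side of that export (paths and sample data are placeholders; note that Spark writes JSON lines, so the output may still need reshaping into whatever JSON form the Dgraph loader expects):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("export-to-dgraph").getOrCreate()

// Toy data standing in for whatever you already have in Spark.
val people = spark.createDataFrame(Seq(
  ("_:alice", "alice"),
  ("_:bob", "bob")
)).toDF("uid", "name")

// Each output line becomes a JSON object, e.g. {"uid":"_:alice","name":"alice"}
people.write.json("/tmp/dgraph-export")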


(Eshwar) #5

It's not a one-time activity.
And I need it to work both ways, i.e. load from Dgraph into Spark and write back to Dgraph from Spark.

Also, I’m running into dependency errors after including the dgraph4j dependency in my project.

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/tmp/sbt_dedcb9ab/target/54c6dd23/slf4j-log4j12-1.7.16.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/tmp/sbt_dedcb9ab/target/be4b3c56/slf4j-simple-1.7.21.jar!/org/slf4j/impl/StaticLoggerBinder.class]


(Aman Mangal) #6

It's not a one-time activity.
And I need it to work both ways, i.e. load from Dgraph into Spark and write back to Dgraph from Spark.

In that case, a Spark connector of sorts would make sense.
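For the write-back direction, a sink would essentially wrap the dgraph4j mutation API, roughly like this (a sketch against dgraph4j 1.7.x, not tested; the endpoint and JSON are placeholders, and in Spark this would typically run inside foreachPartition so each executor keeps its own client):

import com.google.protobuf.ByteString
import io.dgraph.DgraphProto.Mutation
import io.dgraph.{DgraphClient, DgraphGrpc}
import io.grpc.ManagedChannelBuilder

val channel = ManagedChannelBuilder.forAddress("localhost", 9080).usePlaintext().build()
val client = new DgraphClient(DgraphGrpc.newStub(channel))

val txn = client.newTransaction()
try {
  // Set one node via a JSON mutation (blank node "_:alice" is a placeholder).
  val mutation = Mutation.newBuilder()
    .setSetJson(ByteString.copyFromUtf8("""{"uid": "_:alice", "name": "alice"}"""))
    .build()
  txn.mutate(mutation)
  txn.commit()
} finally {
  txn.discard()
}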

Also, I’m running into dependency errors after including the dgraph4j dependency in my project.

Spark generally has conflicts with gRPC dependencies, but this looks different. Are you getting these errors at compile time?
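For reference, this kind of classpath conflict is usually handled with an exclusion in build.sbt, roughly like this (illustrative only, not a verified fix for this particular error; check the dependency tree first to see which module actually brings in the extra binding):

libraryDependencies += "io.dgraph" % "dgraph4j" % "1.7.0" exclude("org.slf4j", "slf4j-simple")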


(Eshwar) #7

I guess that's a warning. But I get the following error at runtime.

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/tmp/sbt_8fbaec8d/target/54c6dd23/slf4j-log4j12-1.7.16.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/tmp/sbt_8fbaec8d/target/be4b3c56/slf4j-simple-1.7.21.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
[error] (run-main-0) java.lang.IllegalAccessError: tried to access method com.google.common.base.Stopwatch.<init>()V from class org.apache.hadoop.mapred.FileInputFormat
[error] java.lang.IllegalAccessError: tried to access method com.google.common.base.Stopwatch.<init>()V from class org.apache.hadoop.mapred.FileInputFormat
[error] at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:312)
[error] at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:204)
[error] at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:253)
[error] at scala.Option.getOrElse(Option.scala:138)
[error] at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
[error] at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
[error] at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:253)
[error] at scala.Option.getOrElse(Option.scala:138)
[error] at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
[error] at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
[error] at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:253)
[error] at scala.Option.getOrElse(Option.scala:138)
[error] at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
[error] at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
[error] at org.apache.spark.rdd.RDD.count(RDD.scala:1168)
[error] at org.apache.spark.graphx.GraphLoader$.edgeListFile(GraphLoader.scala:94)
[error] at example.MyPPRPlay$.main(MyPRDriver.scala:26)
[error] at example.MyPPRPlay.main(MyPRDriver.scala)
[error] at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
[error] at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
[error] at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
[error] at java.lang.reflect.Method.invoke(Method.java:498)

19/04/07 12:51:55 ERROR Utils: uncaught error in thread spark-listener-group-appStatus, stopping SparkContext
java.lang.InterruptedException
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2048)
at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
at org.apache.spark.scheduler.AsyncEventQueue.$anonfun$dispatch$1(AsyncEventQueue.scala:97)
at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.java:23)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:87)
at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.$anonfun$run$1(AsyncEventQueue.scala:83)
at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1302)
at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.run(AsyncEventQueue.scala:83)

Thanks,
Eshwar


(Eshwar) #8

Hi @amanmangal,

Can you please suggest how to go about this?

Thanks,
Eshwar


(Aman Mangal) #9

The above log trace points to an issue with GraphX. I am not sure what the issue is. Is there a way I can reproduce it?


(Eshwar) #10

Yeah, just create a sample sbt project and add the following dependencies:

libraryDependencies += "org.apache.spark" %% "spark-core" % "2.4.0"
libraryDependencies += "org.apache.spark" %% "spark-mllib" % "2.4.0"
libraryDependencies += "io.dgraph" % "dgraph4j" % "1.7.0"

In the main scala file, just create a spark context and try to load a graph.

val conf = new SparkConf().setAppName("dgraph debug").setMaster("local[*]")
val sc = new SparkContext(conf)
val graph = GraphLoader.edgeListFile(sc, "edges.txt")

Sample edges.txt: https://github.com/apache/spark/blob/master/data/graphx/followers.txt

My project compiles successfully without the dgraph4j dependency.
It's failing with the above error as soon as I add it.


(Aman Mangal) #11

What versions of spark-core and spark-mllib are you using? Also, please share your sbt version.


(Eshwar) #12

sbt version 1.2.7
scala version 2.12.8
spark-core version 2.4.0
spark-mllib version 2.4.0


(Aman Mangal) #13

What is GraphLoader here?


(Eshwar) #14

import org.apache.spark.graphx.GraphLoader


(Aman Mangal) #15

Which version of GraphX are you using, in that case?


(Eshwar) #16

It's part of spark-core itself; no need to include it separately.


(Aman Mangal) #17

Could you share your complete build.sbt? For spark-core, I only added the following dependency, and it doesn't include graphx:

// https://mvnrepository.com/artifact/org.apache.spark/spark-core
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.4.0"


(Eshwar) #18

ThisBuild / scalaVersion     := "2.12.8"
ThisBuild / version          := "0.1.0-SNAPSHOT"
ThisBuild / organization     := "com.example"
ThisBuild / organizationName := "example"

lazy val root = (project in file("."))
  .settings(
    name := "spark-ppr",
    libraryDependencies += "org.scalatest" %% "scalatest" % "3.0.5" % Test,
    libraryDependencies += "org.apache.spark" %% "spark-core" % "2.4.0",
    libraryDependencies += "org.apache.spark" %% "spark-mllib" % "2.4.0",

    libraryDependencies += "io.dgraph" % "dgraph4j" % "1.7.0" // exclude("org.slf4j", "*")
  )

(Eshwar) #19

My bad. Looks like GraphX is not part of spark-core; it comes in as a transitive dependency of spark-mllib.
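For reference, GraphX also has its own artifact that can be added explicitly, kept at the same version as the other Spark modules:

libraryDependencies += "org.apache.spark" %% "spark-graphx" % "2.4.0"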


(Aman Mangal) #20

Thanks, I can reproduce the issue here now. Let me look into it.