What I want to do
I want to ingest my data into Dgraph using live loader
What I did
My data is in mongoDB
using spark I am creating json array for every 100 records and storing that array in a file and trying to ingest that file using live loader.
I am seeing this below error while loading some files
Found 1 data file(s) to process
Processing data file "/home/part.json"
2021/07/23 10:54:29 Not all of JSON file consumed
If I observe that file , I am seeing multiple json arrays in it.
I am new to both spark and Dgraph.This is the scala code I implemented
object Profile {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.master("local")
.appName("ProfilesMigration")
.getOrCreate()
val sc = spark.sparkContext
val db = Constant.ETG + Config.getString("client")
val df = MongoSpark.load(sc, ReadConfig(Map("uri" -> Config.getString("mongo.uri"),
"collection" -> Config.getString("mongo.collection"), "database" -> db,
"batchSize" -> Config.getString("mongo.batch.size"), "sampleSize" -> Config.getString("mongo.sample.size"))))
val profiles = df.map(rec=> ProfileProcessor.parseData(rec)).map(rec=>rec.toString().concat(","))
//val profiles = _profiles.map(rec=> rec.concat("\n"))
val destBasePath = Config.getString("destBasePath") +
Config.getString("client") + Constant.SLASH + Config.getString("mongo.collection") + Constant.SLASH
var profDF = spark.read.json(profiles)
val count = profDF.count()
val recsPerFile = Config.getString("records.per.file").toInt
val parts = (count/recsPerFile).toInt
profDF = profDF.repartition(parts)
val df2 = profDF.select(to_json(struct(profDF.columns.map(x=>profDF("`"+x+"`")):_*)).alias("json")).groupBy(spark_partition_id())
.agg(collect_list("json").alias("json_list")).select(col("json_list").cast("string"))
df2.write.mode(SaveMode.Overwrite).text(destBasePath)
}
}
Is anything wrong with this code implementation?
All my files are generating with .txt extension , how can i generate with .json extension?
How can I get single array of 100 records per file?