Dgraph version : v20.07.0-12-g681fe9116
Dgraph codename : shuri-mod
Go version : go1.14.1
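@chewxy We use the Twitter follower-graph dataset:
wget http://an.kaist.ac.kr/~haewoon/release/twitter_social_graph/twitter_rv.tar.gz
# extract the archive, which contains the edge list twitter_rv.net
tar -xzf twitter_rv.tar.gz
# split it into chunks of 54535107 lines each so the chunks can be converted in parallel
split -l 54535107 twitter_rv.net
Then use the following Go program to convert the edge-list (CSV-like) chunks to RDF: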
package main
import (
"path/filepath"
"os"
"fmt"
"flag"
"bufio"
"io"
"strings"
)
// getFilelist walks the tree rooted at path and returns every non-directory
// entry it finds, in the lexical order filepath.Walk visits them. If the walk
// fails, the error is printed and the files collected so far are returned.
func getFilelist(path string) []string {
	collected := []string{}
	visit := func(p string, info os.FileInfo, walkErr error) error {
		switch {
		case info == nil:
			// Walk could not Lstat p; returning the error aborts the walk.
			return walkErr
		case info.IsDir():
			return nil
		default:
			collected = append(collected, p)
			return nil
		}
	}
	if err := filepath.Walk(path, visit); err != nil {
		fmt.Printf("filepath.Walk() returned %v\n", err)
	}
	return collected
}
// rdf converts the edge-list file f into Dgraph RDF N-Quads written to
// f+".rdf" and then sends exactly one status message on ch.
//
// Each input line must contain exactly two non-empty fields, src and dst,
// joined by separator; a trailing "\n" or "\r\n" is stripped. The first time
// an id is seen in this file, an <id> triple and a <dgraph.type> triple are
// written for it. Every valid line also yields a <followers> edge from src to
// dst. Malformed lines are skipped and reported on stdout.
//
// A message is sent on ch on success AND on failure: main waits for one
// message per input file, so returning without sending would block it forever.
func rdf(f string, separator string, ch chan string) {
	ch <- convertToRDF(f, separator)
}

// convertToRDF performs the conversion for rdf and returns the status message.
func convertToRDF(f string, separator string) string {
	in, err := os.Open(f)
	if err != nil {
		return fmt.Sprintf("open file err=%v", err)
	}
	defer in.Close()

	// O_TRUNC (rather than O_APPEND) so rerunning the tool replaces the
	// previous output instead of appending a duplicate copy to it.
	out, err := os.OpenFile(f+".rdf", os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
	if err != nil {
		return fmt.Sprintf("create %s.rdf err=%v", f, err)
	}
	convErr := writeRDF(bufio.NewReader(in), out, f, separator)
	if closeErr := out.Close(); convErr == nil {
		convErr = closeErr
	}
	if convErr != nil {
		return fmt.Sprintf("convert %s err=%v", f, convErr)
	}
	return f + ".rdf is ok!"
}

// writeRDF streams lines from r, writes the N-Quads for each valid edge to w,
// and returns the first read or write error. f is used only in log messages.
func writeRDF(r *bufio.Reader, w io.Writer, f string, separator string) error {
	const nodeTpl = "_:v%s <id> \"%s\" . \n_:v%s <dgraph.type> \"twitter_user\" .\n"
	bw := bufio.NewWriter(w)
	seen := make(map[string]bool)
	errcount := 0
	for {
		str, readErr := r.ReadString('\n')
		if readErr != nil && readErr != io.EOF {
			return readErr
		}
		// At EOF, str may still hold a final line without a trailing newline;
		// it has to be converted, not dropped.
		if str != "" {
			if src, dst, ok := parseEdge(str, separator); ok {
				for _, id := range [2]string{src, dst} {
					if !seen[id] {
						fmt.Fprintf(bw, nodeTpl, id, id, id)
						seen[id] = true
					}
				}
				fmt.Fprintf(bw, "_:v%s <followers> _:v%s . \n", src, dst)
			} else {
				errcount++
				fmt.Printf("from now on find %d errors in %s\n", errcount, f)
			}
		}
		if readErr == io.EOF {
			// bufio.Writer keeps the first write error; Flush reports it.
			return bw.Flush()
		}
	}
}

// parseEdge splits one input line into its src and dst ids. ok is false
// unless the line has exactly two non-empty fields.
func parseEdge(str, separator string) (src, dst string, ok bool) {
	fields := strings.Split(str, separator)
	if len(fields) != 2 {
		return "", "", false
	}
	src = strings.TrimRight(fields[0], "\r\n")
	dst = strings.TrimRight(fields[1], "\r\n")
	return src, dst, src != "" && dst != ""
}
// main converts every file under a folder from an edge list to RDF.
//
// Usage: gocsv2rdf <separator> <folder>
//
//	separator: s (space), t (tab) or c (comma)
//
// Each input file is converted by its own goroutine into <file>.rdf next to
// it; main then waits for one status message per file and prints it.
func main() {
	flag.Parse()
	separatorKey := flag.Arg(0)
	root := flag.Arg(1)
	separatorMap := map[string]string{"s": " ", "t": "\t", "c": ","}
	separator := separatorMap[separatorKey]
	if separator == "" {
		fmt.Printf("Please specify the separator!\n t for Tab \n s for Space \n c for comma \n such as: ./gocsv2rdf s ./test \n")
		return
	}
	if root == "" {
		fmt.Printf("Please specify the CSV folder!\n such as: ./gocsv2rdf s ./test \n")
		return
	}
	// Skip outputs of a previous run; otherwise a rerun would convert them
	// again into "*.rdf.rdf" files.
	var files []string
	for _, name := range getFilelist(root) {
		if !strings.HasSuffix(name, ".rdf") {
			files = append(files, name)
		}
	}
	fmt.Printf("%v\n", files)
	if len(files) == 0 {
		fmt.Printf("Please specify the correct CSV folder!\n")
		return
	}
	// Buffered to len(files) so no converter goroutine ever blocks on send.
	ch := make(chan string, len(files))
	for _, name := range files {
		fmt.Printf("Runninng for %s\n", name)
		// Every file is converted in parallel, and each conversion keeps its
		// own map of seen ids, so peak memory grows with the number of files.
		// Call rdf(name, separator, ch) without `go` to convert sequentially.
		go rdf(name, separator, ch)
	}
	for range files {
		fmt.Println(<-ch)
	}
	fmt.Println("Over!")
}
Then load the generated .rdf files into Dgraph with the bulk loader or the live loader.
The schema is:
id: int @index(int) .
followers: [uid] .
type twitter_user {
id
followers
}