Sure!
- Start Dgraph Zero with command
./dgraph zero
- Start Dgraph Alpha with command
./dgraph alpha --lru_mb=32784 --tls_dir=tls --acl_secret_file=acl_secret_file --encryption_key_file=enc_key_file
- Start Go program with command
go run main.go
My main.go file look’s something like this:
package main
import (
"bufio"
"bytes"
"context"
"encoding/json"
"flag"
"fmt"
"log"
"math/rand"
"net/http"
"os"
"sort"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/snoculars/api/test"
"github.com/snoculars/api/x"
"github.com/dgraph-io/dgo/v200"
"github.com/dgraph-io/dgo/v200/protos/api"
"github.com/spf13/viper"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/keepalive"
)
var (
users = flag.Int("users", 1000, "Number of accounts.")
conc = flag.Int("txns", 100, "Number of concurrent transactions per client.")
dur = flag.String("dur", "24h", "How long to run the transactions.")
alpha = flag.String("alpha", "localhost:9080", "Address of Dgraph alpha.")
verbose = flag.Bool("verbose", false, "Output all logs in verbose mode.")
login = flag.Bool("login", true, "Login as groot. Used for ACL-enabled cluster.")
)
var startBal = 10
var kacp = keepalive.ClientParameters{
Time: 10 * time.Second,
Timeout: time.Second,
PermitWithoutStream: true,
}
type account struct {
UID string `json:"uid"`
Key int `json:"key,omitempty"`
Bal int `json:"bal,omitempty"`
Typ string `json:"typ"`
}
type state struct {
aborts int32
runs int32
}
func (s *state) createAccounts(dg *dgo.Dgraph) {
op := api.Operation{DropAll: true}
x.Check(dg.Alter(context.Background(), &op))
op.DropAll = false
op.Schema = `
key: int @index(int) @upsert .
bal: int .
typ: string @index(exact) @upsert .
`
x.Check(dg.Alter(context.Background(), &op))
var all []account
for i := 1; i <= *users; i++ {
a := account{
Key: i,
Bal: startBal,
Typ: "ba",
}
all = append(all, a)
}
data, err := json.Marshal(all)
x.Check(err)
txn := dg.NewTxn()
defer func() {
if err := txn.Discard(context.Background()); err != nil {
log.Fatalf("Discarding transaction failed: %+v\n", err)
}
}()
var mu api.Mutation
mu.SetJson = data
if *verbose {
log.Printf("mutation: %s\n", mu.SetJson)
}
_, err = txn.Mutate(context.Background(), &mu)
x.Check(err)
x.Check(txn.Commit(context.Background()))
}
func (s *state) runTotal(dg *dgo.Dgraph) error {
query := `
{
q(func: eq(typ, "ba")) {
uid
key
bal
}
}
`
txn := dg.NewReadOnlyTxn()
defer func() {
if err := txn.Discard(context.Background()); err != nil {
log.Fatalf("Discarding transaction failed: %+v\n", err)
}
}()
resp, err := txn.Query(context.Background(), query)
if err != nil {
return err
}
m := make(map[string][]account)
if err := json.Unmarshal(resp.Json, &m); err != nil {
return err
}
accounts := m["q"]
sort.Slice(accounts, func(i, j int) bool {
return accounts[i].Key < accounts[j].Key
})
var total int
for _, a := range accounts {
total += a.Bal
}
if *verbose {
log.Printf("Read: %v. Total: %d\n", accounts, total)
}
if len(accounts) > *users {
log.Fatalf("len(accounts) = %d", len(accounts))
}
if total != *users*startBal {
log.Fatalf("Total = %d", total)
}
return nil
}
func (s *state) findAccount(txn *dgo.Txn, key int) (account, error) {
query := fmt.Sprintf(`{ q(func: eq(key, %d)) { key, uid, bal, typ }}`, key)
resp, err := txn.Query(context.Background(), query)
if err != nil {
return account{}, err
}
m := make(map[string][]account)
if err := json.Unmarshal(resp.Json, &m); err != nil {
log.Fatal(err)
}
accounts := m["q"]
if len(accounts) > 1 {
log.Printf("Query: %s. Response: %s\n", query, resp.Json)
log.Fatal("Found multiple accounts")
}
if len(accounts) == 0 {
if *verbose {
log.Printf("Unable to find account for K_%02d. JSON: %s\n", key, resp.Json)
}
return account{Key: key, Typ: "ba"}, nil
}
return accounts[0], nil
}
func (s *state) transferBalance(dg *dgo.Dgraph, buf *bytes.Buffer) error {
w := bufio.NewWriter(buf)
fmt.Fprintf(w, "==>\n")
defer func() {
fmt.Fprintf(w, "---\n")
_ = w.Flush()
}()
ctx := context.Background()
txn := dg.NewTxn()
defer func() {
if err := txn.Discard(context.Background()); err != nil {
log.Fatalf("Discarding transaction failed: %+v\n", err)
}
}()
var sk, sd int
for {
sk = rand.Intn(*users + 1)
sd = rand.Intn(*users + 1)
if sk == 0 || sd == 0 { // Don't touch zero.
continue
}
if sk != sd {
break
}
}
src, err := s.findAccount(txn, sk)
if err != nil {
return err
}
dst, err := s.findAccount(txn, sd)
if err != nil {
return err
}
if src.Key == dst.Key {
return nil
}
amount := rand.Intn(10)
if src.Bal-amount <= 0 {
amount = src.Bal
}
fmt.Fprintf(w, "Transferring [$%d, K_%02d -> K_%02d]. Src:%+v. Dst: %+v\n",
amount, src.Key, dst.Key, src, dst)
src.Bal -= amount
dst.Bal += amount
var mu api.Mutation
if len(src.UID) > 0 {
// If there was no src.UID, then don't run any mutation.
if src.Bal == 0 {
pb, err := json.Marshal(src)
x.Check(err)
mu.DeleteJson = pb
fmt.Fprintf(w, "Deleting K_%02d: %s\n", src.Key, mu.DeleteJson)
} else {
data, err := json.Marshal(src)
x.Check(err)
mu.SetJson = data
}
_, err := txn.Mutate(ctx, &mu)
if err != nil {
fmt.Fprintf(w, "Error while mutate: %v", err)
return err
}
}
mu = api.Mutation{}
data, err := json.Marshal(dst)
x.Check(err)
mu.SetJson = data
assigned, err := txn.Mutate(ctx, &mu)
if err != nil {
fmt.Fprintf(w, "Error while mutate: %v", err)
return err
}
if err := txn.Commit(ctx); err != nil {
return err
}
if len(assigned.GetUids()) > 0 {
fmt.Fprintf(w, "Created K_%02d: %+v for %+v\n", dst.Key, assigned.GetUids(), dst)
for _, uid := range assigned.GetUids() {
dst.UID = uid
}
}
fmt.Fprintf(w, "Transferred [$%d, K_%02d -> K_%02d]. Src:%+v. Dst: %+v\n",
amount, src.Key, dst.Key, src, dst)
return nil
}
func (s *state) loop(dg *dgo.Dgraph, wg *sync.WaitGroup) {
defer wg.Done()
dur, err := time.ParseDuration(*dur)
x.Check(err)
end := time.Now().Add(dur)
var buf bytes.Buffer
for i := 0; ; i++ {
if i%5 == 0 {
if err := s.runTotal(dg); err != nil {
log.Printf("Error while runTotal: %v", err)
}
continue
}
buf.Reset()
err := s.transferBalance(dg, &buf)
if *verbose {
log.Printf("Balance transfer failed: %v. %s", err, buf.String())
}
if err != nil {
atomic.AddInt32(&s.aborts, 1)
} else {
r := atomic.AddInt32(&s.runs, 1)
if r%100 == 0 {
a := atomic.LoadInt32(&s.aborts)
fmt.Printf("Runs: %d. Aborts: %d\n", r, a)
}
if time.Now().After(end) {
return
}
}
}
}
func dgraphClientWithCert(serviceAddr string, conf *viper.Viper) (*dgo.Dgraph, error) {
tlsCfg, err := x.LoadClientTLSConfig(conf)
if err != nil {
return nil, err
}
dialOpts := []grpc.DialOption{grpc.WithBlock(), grpc.WithKeepaliveParams(kacp), grpc.WithTimeout(10 * time.Second)}
if tlsCfg != nil {
dialOpts = append(dialOpts, grpc.WithTransportCredentials(credentials.NewTLS(tlsCfg)))
} else {
dialOpts = append(dialOpts, grpc.WithInsecure())
}
conn, err := grpc.Dial(serviceAddr, dialOpts...)
if err != nil {
return nil, err
}
dg := dgo.NewDgraphClient(api.NewDgraphClient(conn))
return dg, nil
}
func helloServer(w http.ResponseWriter, r *http.Request) {
fmt.Fprintf(w, "Hello, %s!", r.URL.Path[1:])
}
func main() {
flag.Parse()
all := strings.Split(*alpha, ",")
test.PrintHello()
go func() {
http.HandleFunc("/", helloServer)
http.ListenAndServe(":5000", nil)
}()
var clients []*dgo.Dgraph
for _, one := range all {
conf := viper.New()
conf.Set("tls_cacert", "../../dgraph/dgraph/tls/ca.crt")
hostname, err := os.Hostname()
if err != nil {
log.Fatalf("Unable to get hostname %s", err.Error())
}
conf.Set("tls_server_name", hostname)
dg, err := dgraphClientWithCert(one, conf)
if err != nil {
log.Fatalf("Unable to get Dgraph client: %s", err.Error())
}
if *login {
// login as groot to perform the DropAll operation later
x.Check(dg.Login(context.Background(), "groot", "password"))
}
clients = append(clients, dg)
}
s := state{}
s.createAccounts(clients[0])
var wg sync.WaitGroup
for i := 0; i < *conc; i++ {
for _, dg := range clients {
wg.Add(1)
go s.loop(dg, &wg)
}
}
wg.Wait()
fmt.Println()
fmt.Println("Total aborts", s.aborts)
fmt.Println("Total success", s.runs)
if err := s.runTotal(clients[0]); err != nil {
log.Fatal(err)
}
}
Note: This is based off the example here dgraph/main.go at 06ea4c545bc3cf0ed730327557dbff96406e75f8 · dgraph-io/dgraph · GitHub
As you can see my users
and txns
values are pretty high… and the higher I make them the more errors I get like the following:
18446744073709551615 from list with key 00000362616c0000000000040b4cab: readTs: 438907294 less than minTs: 438907295 for key: "\x00\x00\x03bal\x00\x00\x00\x00\x00\x04\vL\xab"
2020/07/02 07:00:29 Error while runTotal: rpc error: code = Unknown desc = : cannot find value without language tag from list with key 0000036b65790000000000040b3158: cannot retrieve posting for UID 18446744073709551615 from list with key 0000036b65790000000000040b3158: readTs: 438907786 less than minTs: 438907790 for key: "\x00\x00\x03key\x00\x00\x00\x00\x00\x04\v1X"
2020/07/02 07:00:29 Error while runTotal: rpc error: code = Unknown desc = : cannot find value without language tag from list with key 0000036b65790000000000040b3158: cannot retrieve posting for UID 18446744073709551615 from list with key 0000036b65790000000000040b3158: readTs: 438907786 less than minTs: 438907790 for key: "\x00\x00\x03key\x00\x00\x00\x00\x00\x04\v1X"
2020/07/02 07:00:29 Error while runTotal: rpc error: code = Unknown desc = : cannot find value without language tag from list with key 0000036b65790000000000040b3158: cannot retrieve posting for UID 18446744073709551615 from list with key 0000036b65790000000000040b3158: readTs: 438907786 less than minTs: 438907790 for key: "\x00\x00\x03key\x00\x00\x00\x00\x00\x04\v1X"
2020/07/02 07:00:29 Error while runTotal: rpc error: code = Unknown desc = : cannot find value without language tag from list with key 0000036b65790000000000040b3158: cannot retrieve posting for UID 18446744073709551615 from list with key 0000036b65790000000000040b3158: readTs: 438907789 less than minTs: 438907790 for key: "\x00\x00\x03key\x00\x00\x00\x00\x00\x04\v1X"
Here’s a screen recording! Dgraph on Vimeo
Does anyone have any ideas? Is this normal? Thanks in advance!