Query/mutation via websocket does not terminate

I try to send my queries and mutations via websocket to dgraph. This works till I reach a specific amount of requests quite good. Then it starts to lag. So the responses got delayed.

In my opinion, the reason for this is the handing of the requests.
Accorting the apollo protocol a request starts with the message type “start” and ends with the meassage type “end”. As far as I can analyse it, the “stop” message get ignored.

So dgraph start a subscription when it get the “start” message but does not terminate it with the “stop” message.

For further investigation I created a minimal code example:

import json
import websocket
import uuid


def connect():
    return websocket.create_connection(
        "ws://10.60.2.4:8080/graphql",
        subprotocols=["graphql-ws"]
    )


def init_graph_connection():
    data = json.dumps({
        'type': 'connection_init',
        'payload': {
            "headers": ""
        }
    })
    ws.send(data)


def run_query(query_string):
    payload = {
        "headers": {"Content-Type": "application/json"},
        "query": query_string
    }
    query_id = uuid.uuid4().hex

    data_to_send = json.dumps(
        {"id": query_id,
         "type": 'start',
         "payload": payload
         }
    )
    ws.send(data_to_send)
    res = json.loads(ws.recv())
    print(res)
    ws.send(json.dumps({
        'id': query_id,
        'type': 'stop'
    }))

    res = json.loads(ws.recv())
    print(res)


ws = connect()
init_graph_connection()

query = 'query {queryLocation(){locationId}}'

for _ in range(10):
    run_query(query)

The log file look like this:

dgraph_1          | I1029 07:21:02.558528      82 poller.go:111] Subscription polling is started for the ID 10234
dgraph_1          | I1029 07:21:02.564463      82 poller.go:111] Subscription polling is started for the ID 10235
dgraph_1          | I1029 07:21:02.571085      82 poller.go:111] Subscription polling is started for the ID 10236
dgraph_1          | I1029 07:21:02.577385      82 poller.go:111] Subscription polling is started for the ID 10237
dgraph_1          | I1029 07:21:02.583112      82 poller.go:111] Subscription polling is started for the ID 10238
dgraph_1          | I1029 07:21:02.589003      82 poller.go:111] Subscription polling is started for the ID 10239
dgraph_1          | I1029 07:21:02.596125      82 poller.go:111] Subscription polling is started for the ID 10240
dgraph_1          | I1029 07:21:02.603510      82 poller.go:111] Subscription polling is started for the ID 10241
dgraph_1          | I1029 07:21:02.609195      82 poller.go:111] Subscription polling is started for the ID 10242
dgraph_1          | I1029 07:21:02.615683      82 poller.go:111] Subscription polling is started for the ID 10243
dgraph_1          | I1029 07:21:02.629777      82 poller.go:255] Terminating subscription for the subscription ID 10234
dgraph_1          | I1029 07:21:02.629861      82 poller.go:255] Terminating subscription for the subscription ID 10236
dgraph_1          | I1029 07:21:02.629892      82 poller.go:255] Terminating subscription for the subscription ID 10238
dgraph_1          | I1029 07:21:02.629925      82 poller.go:255] Terminating subscription for the subscription ID 10235
dgraph_1          | I1029 07:21:02.630313      82 poller.go:255] Terminating subscription for the subscription ID 10237
dgraph_1          | I1029 07:21:02.630495      82 poller.go:255] Terminating subscription for the subscription ID 10240
dgraph_1          | I1029 07:21:02.630756      82 poller.go:255] Terminating subscription for the subscription ID 10241
dgraph_1          | I1029 07:21:02.630863      82 poller.go:255] Terminating subscription for the subscription ID 10242
dgraph_1          | I1029 07:21:02.630991      82 poller.go:255] Terminating subscription for the subscription ID 10243
dgraph_1          | I1029 07:21:02.631153      82 poller.go:255] Terminating subscription for the subscription ID 10239

As you can see, dgraph open a subscription but does not close it.
The subscripition get termineated when the program itselve quit.

I use the latest docker/standalone image.

Do I make anything wrong?

I extended my tests by adding the @withSubscription flag to a test type in my schema.

I also update the minimal example in this way:

import json
import websocket
import uuid
from threading import Thread


def connect():
    return websocket.create_connection(
        "ws://10.60.2.4:8080/graphql",
        subprotocols=["graphql-ws"]
    )


def init_graph_connection():
    data = json.dumps({
        'type': 'connection_init',
        'payload': {
            "headers": ""
        }
    })
    ws.send(data)


def recieve_task():
    while True:
        res = json.loads(ws.recv())
        print(res)


def run_query(query_string):
    payload = {
        "headers": {"Content-Type": "application/json"},
        "query": query_string
    }
    query_id = uuid.uuid4().hex

    data_to_send = json.dumps(
        {"id": query_id,
         "type": 'start',
         "payload": payload
         }
    )
    ws.send(data_to_send)

    ws.send(json.dumps({
        'id': query_id,
        'type': 'stop'
    }))


def run_subscription(query_string):
    payload = {
        "headers": {"Content-Type": "application/json"},
        "query": query_string
    }
    query_id = uuid.uuid4().hex

    data_to_send = json.dumps(
        {"id": query_id,
         "type": 'start',
         "payload": payload
         }
    )
    ws.send(data_to_send)


ws = connect()
init_graph_connection()

query = ''' {
  queryLocation(){
    locationId
    friendlyName
  }
}'''

recieve_thread = Thread(target=recieve_task)
recieve_thread.start()
run_query(query)
run_subscription(query)

Now I’m able to do subscriptions.

This is the output of the program:

{'type': 'connection_ack'}
{'type': 'data', 'payload': {'data': {'queryLocation': [{'friendlyName': 'office1', 'locationId': 'office'}]}, 'extensions': {'touched_uids': 3, 'tracing': {'duration': 1902376, 'execution': {'resolvers': [{'duration': 1833960, 'startOffset': 40554, 'dgraph': [{'duration': 1734515, 'label': 'query', 'startOffset': 87212}], 'returnType': '[Location]', 'parentType': 'Query', 'path': ['queryLocation'], 'fieldName': 'queryLocation'}]}, 'startTime': '2020-10-29T10:21:42.797721237Z', 'endTime': '2020-10-29T10:21:42.799623626Z', 'version': 1}}}, 'id': 'c84f7b64e7a64617b6a0739d259ce4ff'}
{'type': 'complete', 'id': 'c84f7b64e7a64617b6a0739d259ce4ff'}
{'type': 'data', 'payload': {'data': {'queryLocation': [{'friendlyName': 'office1', 'locationId': 'office'}]}, 'extensions': {'touched_uids': 3, 'tracing': {'duration': 2359163, 'execution': {'resolvers': [{'duration': 2262678, 'startOffset': 72834, 'dgraph': [{'duration': 2110487, 'label': 'query', 'startOffset': 173557}], 'returnType': '[Location]', 'parentType': 'Query', 'path': ['queryLocation'], 'fieldName': 'queryLocation'}]}, 'startTime': '2020-10-29T10:21:42.800147641Z', 'endTime': '2020-10-29T10:21:42.802506835Z', 'version': 1}}}, 'id': '6541f04ba9e943a2b2ced579a5f641d2'}

according the specification I got a measage back with type ‘complete’.
The subscription should be terminated.

When I run a mutation, I get several responses. Several for the the query and several for the subscription.
So the query is also an open not canceled subscription.

Output after mutation:

{'type': 'connection_ack'}
{'type': 'data', 'payload': {'data': {'queryLocation': [{'friendlyName': 'office1', 'locationId': 'office'}]}, 'extensions': {'touched_uids': 3, 'tracing': {'duration': 1902376, 'execution': {'resolvers': [{'duration': 1833960, 'startOffset': 40554, 'dgraph': [{'duration': 1734515, 'label': 'query', 'startOffset': 87212}], 'returnType': '[Location]', 'parentType': 'Query', 'path': ['queryLocation'], 'fieldName': 'queryLocation'}]}, 'startTime': '2020-10-29T10:21:42.797721237Z', 'endTime': '2020-10-29T10:21:42.799623626Z', 'version': 1}}}, 'id': 'c84f7b64e7a64617b6a0739d259ce4ff'}
{'type': 'complete', 'id': 'c84f7b64e7a64617b6a0739d259ce4ff'}
{'type': 'data', 'payload': {'data': {'queryLocation': [{'friendlyName': 'office1', 'locationId': 'office'}]}, 'extensions': {'touched_uids': 3, 'tracing': {'duration': 2359163, 'execution': {'resolvers': [{'duration': 2262678, 'startOffset': 72834, 'dgraph': [{'duration': 2110487, 'label': 'query', 'startOffset': 173557}], 'returnType': '[Location]', 'parentType': 'Query', 'path': ['queryLocation'], 'fieldName': 'queryLocation'}]}, 'startTime': '2020-10-29T10:21:42.800147641Z', 'endTime': '2020-10-29T10:21:42.802506835Z', 'version': 1}}}, 'id': '6541f04ba9e943a2b2ced579a5f641d2'}
{'type': 'data', 'payload': {'data': {'queryLocation': [{'friendlyName': 'office2', 'locationId': 'office'}]}, 'extensions': {'touched_uids': 3, 'tracing': {'duration': 3216314, 'execution': {'resolvers': [{'duration': 2869655, 'startOffset': 320361, 'dgraph': [{'duration': 2684096, 'label': 'query', 'startOffset': 460951}], 'returnType': '[Location]', 'parentType': 'Query', 'path': ['queryLocation'], 'fieldName': 'queryLocation'}]}, 'startTime': '2020-10-29T10:21:49.818045846Z', 'endTime': '2020-10-29T10:21:49.821262205Z', 'version': 1}}}, 'id': '6541f04ba9e943a2b2ced579a5f641d2'}
{'type': 'data', 'payload': {'data': {'queryLocation': [{'friendlyName': 'office2', 'locationId': 'office'}]}, 'extensions': {'touched_uids': 3, 'tracing': {'duration': 3216314, 'execution': {'resolvers': [{'duration': 2869655, 'startOffset': 320361, 'dgraph': [{'duration': 2684096, 'label': 'query', 'startOffset': 460951}], 'returnType': '[Location]', 'parentType': 'Query', 'path': ['queryLocation'], 'fieldName': 'queryLocation'}]}, 'startTime': '2020-10-29T10:21:49.818045846Z', 'endTime': '2020-10-29T10:21:49.821262205Z', 'version': 1}}}, 'id': 'c84f7b64e7a64617b6a0739d259ce4ff'}
{'type': 'data', 'payload': {'data': {'queryLocation': [{'friendlyName': 'office2', 'locationId': 'office'}]}, 'extensions': {'touched_uids': 3, 'tracing': {'duration': 3168727, 'execution': {'resolvers': [{'duration': 2990981, 'startOffset': 153429, 'dgraph': [{'duration': 2847034, 'label': 'query', 'startOffset': 252671}], 'returnType': '[Location]', 'parentType': 'Query', 'path': ['queryLocation'], 'fieldName': 'queryLocation'}]}, 'startTime': '2020-10-29T10:21:49.911864914Z', 'endTime': '2020-10-29T10:21:49.915033721Z', 'version': 1}}}, 'id': 'c84f7b64e7a64617b6a0739d259ce4ff'}
{'type': 'data', 'payload': {'data': {'queryLocation': [{'friendlyName': 'office2', 'locationId': 'office'}]}, 'extensions': {'touched_uids': 3, 'tracing': {'duration': 3168727, 'execution': {'resolvers': [{'duration': 2990981, 'startOffset': 153429, 'dgraph': [{'duration': 2847034, 'label': 'query', 'startOffset': 252671}], 'returnType': '[Location]', 'parentType': 'Query', 'path': ['queryLocation'], 'fieldName': 'queryLocation'}]}, 'startTime': '2020-10-29T10:21:49.911864914Z', 'endTime': '2020-10-29T10:21:49.915033721Z', 'version': 1}}}, 'id': '6541f04ba9e943a2b2ced579a5f641d2'}
{'type': 'data', 'payload': {'data': {'queryLocation': [{'friendlyName': 'office2', 'locationId': 'office'}]}, 'extensions': {'touched_uids': 3, 'tracing': {'duration': 4390070, 'execution': {'resolvers': [{'duration': 4190194, 'startOffset': 170317, 'dgraph': [{'duration': 3737908, 'label': 'query', 'startOffset': 557567}], 'returnType': '[Location]', 'parentType': 'Query', 'path': ['queryLocation'], 'fieldName': 'queryLocation'}]}, 'startTime': '2020-10-29T10:21:49.914058243Z', 'endTime': '2020-10-29T10:21:49.918448261Z', 'version': 1}}}, 'id': '6541f04ba9e943a2b2ced579a5f641d2'}
{'type': 'data', 'payload': {'data': {'queryLocation': [{'friendlyName': 'office2', 'locationId': 'office'}]}, 'extensions': {'touched_uids': 3, 'tracing': {'duration': 4390070, 'execution': {'resolvers': [{'duration': 4190194, 'startOffset': 170317, 'dgraph': [{'duration': 3737908, 'label': 'query', 'startOffset': 557567}], 'returnType': '[Location]', 'parentType': 'Query', 'path': ['queryLocation'], 'fieldName': 'queryLocation'}]}, 'startTime': '2020-10-29T10:21:49.914058243Z', 'endTime': '2020-10-29T10:21:49.918448261Z', 'version': 1}}}, 'id': 'c84f7b64e7a64617b6a0739d259ce4ff'}
{'type': 'data', 'payload': {'data': {'queryLocation': [{'friendlyName': 'office2', 'locationId': 'office'}]}, 'extensions': {'touched_uids': 3, 'tracing': {'duration': 3263691, 'execution': {'resolvers': [{'duration': 3039078, 'startOffset': 196924, 'dgraph': [{'duration': 2854201, 'label': 'query', 'startOffset': 309736}], 'returnType': '[Location]', 'parentType': 'Query', 'path': ['queryLocation'], 'fieldName': 'queryLocation'}]}, 'startTime': '2020-10-29T10:21:50.522073703Z', 'endTime': '2020-10-29T10:21:50.525337628Z', 'version': 1}}}, 'id': '6541f04ba9e943a2b2ced579a5f641d2'}
{'type': 'data', 'payload': {'data': {'queryLocation': [{'friendlyName': 'office2', 'locationId': 'office'}]}, 'extensions': {'touched_uids': 3, 'tracing': {'duration': 3263691, 'execution': {'resolvers': [{'duration': 3039078, 'startOffset': 196924, 'dgraph': [{'duration': 2854201, 'label': 'query', 'startOffset': 309736}], 'returnType': '[Location]', 'parentType': 'Query', 'path': ['queryLocation'], 'fieldName': 'queryLocation'}]}, 'startTime': '2020-10-29T10:21:50.522073703Z', 'endTime': '2020-10-29T10:21:50.525337628Z', 'version': 1}}}, 'id': 'c84f7b64e7a64617b6a0739d259ce4ff'}

Hi @AndreasSchwalb, We previously had this issue that when we send a stop message to the server it wasn’t terminating the subscription. We have resolved that issue, Please make sure you are working on the recent master image or try recreating your docker image. And if you still face it, please let us know.

When I run the above code, I got the below response. I have changed the query.

{'type': 'connection_ack'}
{'id': '1851ccf41ed14920a68d6c6d3cfe9440', 'type': 'complete'}
{'id': '1851ccf41ed14920a68d6c6d3cfe9440', 'payload': {'data': {'queryTodo': [{'text': 'jatin1', 'owner': 'jatin'}]}, 'extensions': {'touched_uids': 3, 'tracing': {'version': 1, 'startTime': '2020-11-05T17:21:33.039953842+05:30', 'endTime': '2020-11-05T17:21:33.040777559+05:30', 'duration': 823694, 'execution': {'resolvers': [{'path': ['queryTodo'], 'parentType': 'Query', 'fieldName': 'queryTodo', 'returnType': '[Todo]', 'startOffset': 24727, 'duration': 789637, 'dgraph': [{'label': 'query', 'startOffset': 65409, 'duration': 732577}]}]}}}}, 'type': 'data'}
{'id': '35bca9eac96748e2a9566a08ea058d1f', 'type': 'complete'}
{'id': '35bca9eac96748e2a9566a08ea058d1f', 'payload': {'data': {'queryTodo': [{'text': 'jatin1', 'owner': 'jatin'}]}, 'extensions': {'touched_uids': 3, 'tracing': {'version': 1, 'startTime': '2020-11-05T17:21:33.041047321+05:30', 'endTime': '2020-11-05T17:21:33.041652263+05:30', 'duration': 604927, 'execution': {'resolvers': [{'path': ['queryTodo'], 'parentType': 'Query', 'fieldName': 'queryTodo', 'returnType': '[Todo]', 'startOffset': 20891, 'duration': 572857, 'dgraph': [{'label': 'query', 'startOffset': 54359, 'duration': 518697}]}]}}}}, 'type': 'data'}
{'id': '35bca9eac96748e2a9566a08ea058d1f', 'type': 'complete'}
{'id': '7365b3e8302d400b909a455b6c1226f6', 'type': 'complete'}
{'id': '7365b3e8302d400b909a455b6c1226f6', 'payload': {'data': {'queryTodo': [{'text': 'jatin1', 'owner': 'jatin'}]}, 'extensions': {'touched_uids': 3, 'tracing': {'version': 1, 'startTime': '2020-11-05T17:21:33.041933289+05:30', 'endTime': '2020-11-05T17:21:33.042491075+05:30', 'duration': 557780, 'execution': {'resolvers': [{'path': ['queryTodo'], 'parentType': 'Query', 'fieldName': 'queryTodo', 'returnType': '[Todo]', 'startOffset': 20661, 'duration': 529098, 'dgraph': [{'label': 'query', 'startOffset': 46993, 'duration': 487943}]}]}}}}, 'type': 'data'}
{'id': '7365b3e8302d400b909a455b6c1226f6', 'type': 'complete'}
{'id': 'ab13048330144368a9c9233e0d1a34e2', 'type': 'complete'}
{'id': '16b35c1e90be4b218fa5c8d5fdd62c03', 'type': 'complete'}
{'id': '16b35c1e90be4b218fa5c8d5fdd62c03', 'payload': {'data': {'queryTodo': [{'text': 'jatin1', 'owner': 'jatin'}]}, 'extensions': {'touched_uids': 3, 'tracing': {'version': 1, 'startTime': '2020-11-05T17:21:33.043162846+05:30', 'endTime': '2020-11-05T17:21:33.043709715+05:30', 'duration': 546858, 'execution': {'resolvers': [{'path': ['queryTodo'], 'parentType': 'Query', 'fieldName': 'queryTodo', 'returnType': '[Todo]', 'startOffset': 20738, 'duration': 518645, 'dgraph': [{'label': 'query', 'startOffset': 46963, 'duration': 478435}]}]}}}}, 'type': 'data'}
{'id': '96127d4212dc4076a79c3694d3058ac1', 'type': 'complete'}
{'id': '96127d4212dc4076a79c3694d3058ac1', 'payload': {'data': {'queryTodo': [{'text': 'jatin1', 'owner': 'jatin'}]}, 'extensions': {'touched_uids': 3, 'tracing': {'version': 1, 'startTime': '2020-11-05T17:21:33.043794226+05:30', 'endTime': '2020-11-05T17:21:33.044404902+05:30', 'duration': 610665, 'execution': {'resolvers': [{'path': ['queryTodo'], 'parentType': 'Query', 'fieldName': 'queryTodo', 'returnType': '[Todo]', 'startOffset': 23067, 'duration': 578313, 'dgraph': [{'label': 'query', 'startOffset': 49909, 'duration': 537708}]}]}}}}, 'type': 'data'}
{'id': '040daef2518e49e7930f810bc8fa95da', 'type': 'complete'}
{'id': '040daef2518e49e7930f810bc8fa95da', 'payload': {'data': {'queryTodo': [{'text': 'jatin1', 'owner': 'jatin'}]}, 'extensions': {'touched_uids': 3, 'tracing': {'version': 1, 'startTime': '2020-11-05T17:21:33.044482353+05:30', 'endTime': '2020-11-05T17:21:33.044988446+05:30', 'duration': 506088, 'execution': {'resolvers': [{'path': ['queryTodo'], 'parentType': 'Query', 'fieldName': 'queryTodo', 'returnType': '[Todo]', 'startOffset': 17985, 'duration': 480300, 'dgraph': [{'label': 'query', 'startOffset': 47881, 'duration': 435989}]}]}}}}, 'type': 'data'}
{'id': '755c0ef1af5d4ccc868132cc0ec8a4e8', 'type': 'complete'}
{'id': '25c4fcb1624d4a25885d163a6946c527', 'type': 'complete'}
{'id': '25c4fcb1624d4a25885d163a6946c527', 'payload': {'data': {'queryTodo': [{'text': 'jatin1', 'owner': 'jatin'}]}, 'extensions': {'touched_uids': 3, 'tracing': {'version': 1, 'startTime': '2020-11-05T17:21:33.045747749+05:30', 'endTime': '2020-11-05T17:21:33.046283279+05:30', 'duration': 535521, 'execution': {'resolvers': [{'path': ['queryTodo'], 'parentType': 'Query', 'fieldName': 'queryTodo', 'returnType': '[Todo]', 'startOffset': 18169, 'duration': 509955, 'dgraph': [{'label': 'query', 'startOffset': 53547, 'duration': 457809}]}]}}}}, 'type': 'data'}
{'id': 'c110a4a8c0e946a3be3d94305165e712', 'type': 'complete'}

And also the subscriptions gets terminated as we see in below alpha logs. I have also printed the message type received from the client.

I1105 17:18:51.376232  501912 connection.go:161] connection_init
I1105 17:18:51.376263  501912 connection.go:161] start
I1105 17:18:51.377248  501912 poller.go:111] Subscription polling is started for the ID 1
I1105 17:18:51.377272  501912 connection.go:161] stop
I1105 17:18:51.377309  501912 poller.go:255] Terminating subscription for the subscription ID 1
I1105 17:18:51.377504  501912 connection.go:161] start
I1105 17:18:51.378201  501912 poller.go:111] Subscription polling is started for the ID 2
I1105 17:18:51.378627  501912 connection.go:161] stop
I1105 17:18:51.378658  501912 poller.go:255] Terminating subscription for the subscription ID 2
I1105 17:18:51.378819  501912 connection.go:161] start
I1105 17:18:51.379436  501912 poller.go:111] Subscription polling is started for the ID 3
I1105 17:18:51.379788  501912 connection.go:161] stop
I1105 17:18:51.379815  501912 poller.go:255] Terminating subscription for the subscription ID 3
I1105 17:18:51.379986  501912 connection.go:161] start
I1105 17:18:51.380630  501912 poller.go:111] Subscription polling is started for the ID 4
I1105 17:18:51.380986  501912 connection.go:161] stop
I1105 17:18:51.381015  501912 poller.go:255] Terminating subscription for the subscription ID 4
I1105 17:18:51.381187  501912 connection.go:161] start
I1105 17:18:51.381752  501912 poller.go:111] Subscription polling is started for the ID 5
I1105 17:18:51.382106  501912 connection.go:161] stop
I1105 17:18:51.382130  501912 poller.go:255] Terminating subscription for the subscription ID 5
I1105 17:18:51.382312  501912 connection.go:161] start
I1105 17:18:51.382912  501912 poller.go:111] Subscription polling is started for the ID 6
I1105 17:18:51.383304  501912 connection.go:161] stop
I1105 17:18:51.383333  501912 poller.go:255] Terminating subscription for the subscription ID 6
I1105 17:18:51.383516  501912 connection.go:161] start
I1105 17:18:51.384141  501912 poller.go:111] Subscription polling is started for the ID 7
I1105 17:18:51.384502  501912 connection.go:161] stop
I1105 17:18:51.384529  501912 poller.go:255] Terminating subscription for the subscription ID 7
I1105 17:18:51.384705  501912 connection.go:161] start
I1105 17:18:51.385272  501912 poller.go:111] Subscription polling is started for the ID 8
I1105 17:18:51.385637  501912 connection.go:161] stop
I1105 17:18:51.385662  501912 poller.go:255] Terminating subscription for the subscription ID 8
I1105 17:18:51.385823  501912 connection.go:161] start
I1105 17:18:51.386410  501912 poller.go:111] Subscription polling is started for the ID 9
I1105 17:18:51.386771  501912 connection.go:161] stop
I1105 17:18:51.386808  501912 poller.go:255] Terminating subscription for the subscription ID 9
I1105 17:18:51.386986  501912 connection.go:161] start
I1105 17:18:51.387686  501912 poller.go:111] Subscription polling is started for the ID 10
I1105 17:18:51.388068  501912 connection.go:161] stop
I1105 17:18:51.388097  501912 poller.go:255] Terminating subscription for the subscription ID 10

And there is already an issue as you mention that for a mutation update, we get duplicated subscription updates. We are working on this issue and will get back to you soon.