graphql-python/gql

EuropeanTour websockets API using gql

HJA24 opened this issue · 4 comments

HJA24 commented

Hi,

I want to communicate with a GraphQL server that, I assume, uses the Apollo websockets transport protocol.

My code is copied from the example in the documentation:

import asyncio
import logging
from gql import gql, Client
from gql.transport.websockets import WebsocketsTransport


logging.basicConfig(level=logging.INFO)


async def main():
    transport = WebsocketsTransport(url='wss://btec-websocket.services.imgarena.com',
                                    init_payload={'accept-language': 'en',
                                                  'ec-version': '5.1.88',
                                                  'operator': 'europeantour',
                                                  'referrer': 'https://www.europeantour.com/',
                                                  'sport': 'GOLF'})
    async with Client(transport=transport,
                      fetch_schema_from_transport=False) as session:
        # do something
        pass


asyncio.run(main())

When I inspect the traffic between the client and the server on the EuropeanTour website (click on a player to see the course), I see the following:

client -> server

{"type":"connection_init","payload":{"accept-language":"en","ec-version":"5.1.88","sport":"GOLF","referrer":"https://www.europeantour.com/","operator":"europeantour"}}

client <- server

{"wsIdentity":"vBeXw1roT6CBTf9EvEEfyQ/5535528"}

client -> server

{"id":"1073897396","type":"start","operationName":"SubscribeToGolfTournamentStatus","eventId":488}

As you can see, the last message does not contain a query.
How can I reproduce this message within gql and send it to the server?

Thank you

  • OS: mac
  • Python version: 3.8
  • gql version: 3.4.0
  • graphql-core: 3.2.3

The EuropeanTour website does not use the Apollo websocket protocol for its API, so you cannot use the standard WebsocketsTransport class.

What I found out:

  • instead of returning a connection_ack message after the connection_init, the server returns a wsIdentity message
  • for a subscription, the client first sends a start message containing an operationName and an eventId, without any GraphQL query
  • the server then answers with a request-full-subscription message, asking the client to send the full subscription query
  • the client then has to send a full-subscription message containing the full requested query, including the operationName and a new field, subscriptionName
  • the id is not actually random: it is generated by hashing together the subscriptionName and the variables, serialized as JSON with the dictionary keys sorted. It took me a while to figure this out, as no error is generated if the id is incorrect
  • the server seems to cache the GraphQL subscription (probably per IP address), so it will not send a request-full-subscription message if it has already received that subscription previously
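The id rule above can be reproduced on its own, which makes it easy to compare a computed id with the ids visible in the browser's devtools. This is a minimal sketch, assuming the server hashes the compact, key-sorted JSON of `subscriptionName` and `variables` with the same `t * 33 ^ c` loop (walking the string from its last character) that the transport code below uses, and assuming ASCII-only names and variables (JavaScript hashes UTF-16 code units and its `JSON.stringify` does not escape non-ASCII characters, so results could differ otherwise). The function names `djb2_xor_hash` and `calc_subscription_id` are hypothetical.

```python
import json
from typing import Any, Dict, Optional


def djb2_xor_hash(text: str) -> int:
    """djb2-style hash (t = t * 33 ^ c), processing the string from its end.

    Truncates to an unsigned 32-bit value at every step; this gives the same
    result as truncating once at the end, because the low 32 bits of
    t * 33 ^ c only depend on the low 32 bits of t.
    """
    t = 5381
    for ch in reversed(text):
        t = ((t * 33) ^ ord(ch)) & 0xFFFFFFFF
    return t


def calc_subscription_id(
    subscription_name: str, variables: Optional[Dict[str, Any]]
) -> str:
    # Assumption: compact separators and sorted keys (nested keys included),
    # so the id does not depend on the insertion order of the dictionaries
    serialized = json.dumps(
        {"subscriptionName": subscription_name, "variables": variables},
        separators=(",", ":"),
        sort_keys=True,
    )
    # The id is sent as a string in the "start" message
    return str(djb2_xor_hash(serialized))


if __name__ == "__main__":
    print(
        calc_subscription_id(
            "subscribeToGolfTournamentGroupScores",
            {"input": {"teamId": 21, "tournamentId": 488, "roundNo": 4}},
        )
    )
```

Because 33 is odd, each step is a bijection on 32-bit values, so two serialized strings of the same length that differ in a single character always get different ids.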

Here is a Chrome screenshot showing the request-full-subscription messages:

[Screenshot: Chrome websocket messages]

You can then create your own transport by inheriting from the WebsocketsTransport class and making the necessary modifications.
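Before wiring this into a gql transport, the message sequence can be checked without any network. The following is a minimal, network-free sketch of the client side of the exchange described in the list above; the class `EuropeanTourProtocolSketch` and its methods are hypothetical, and the assumption that all other messages (such as `data`) follow the Apollo format comes from the observations above, not from any documentation of this server.

```python
import json
from typing import Any, Dict, Optional


class EuropeanTourProtocolSketch:
    """Hypothetical helper: builds and decides messages, never opens a socket."""

    def __init__(self) -> None:
        self.identified = False
        # Full subscription payloads, kept until the server asks for them
        self.pending: Dict[str, Dict[str, Any]] = {}

    def start_message(
        self, sub_id: str, operation_name: str, event_id: int, payload: Dict[str, Any]
    ) -> str:
        # Remember the full query, but announce only the operation name first
        self.pending[sub_id] = payload
        return json.dumps(
            {
                "id": sub_id,
                "type": "start",
                "operationName": operation_name,
                "eventId": event_id,
            }
        )

    def on_server_message(self, raw: str) -> Optional[str]:
        """Return the message the client should send back, or None."""
        msg = json.loads(raw)
        if "wsIdentity" in msg:
            # This server sends wsIdentity instead of the Apollo connection_ack
            self.identified = True
            return None
        if msg.get("type") == "request-full-subscription":
            # The server asks for the query announced by the "start" message
            sub_id = str(msg["id"])
            return json.dumps(
                {"id": sub_id, "type": "full-subscription", "payload": self.pending[sub_id]}
            )
        # Assumption: everything else ("data", "complete", ...) is Apollo-style
        return None
```

In the real transport below, the same three decisions live in `_wait_ack`, `_send_query` and `_handle_answer`.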

Here is an example of working code:

import asyncio
import json
import logging
from typing import Any, AsyncGenerator, Dict, Optional, Tuple

from graphql import DocumentNode, ExecutionResult, print_ast

from gql import Client, gql
from gql.transport.exceptions import TransportProtocolError
from gql.transport.websockets import WebsocketsTransport

logging.basicConfig(level=logging.INFO)


class EuropeanTourWebsocketsTransport(WebsocketsTransport):
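    def _hash(self, e):
        # djb2-style hash (t = t * 33 ^ c) processing the string from its end.
        # Note: * binds tighter than ^ in Python, so this is (t * 33) ^ c.
        # Python ints are unbounded, so the result is truncated to 32 bits at the end.
        t = 5381
        r = len(e)
        while r:
            r -= 1
            t = t * 33 ^ ord(e[r])
        return t & 0xFFFFFFFF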

    def _calc_id(self, subscription_name, variables):

        obj = {
            "subscriptionName": subscription_name,
            "variables": variables,
        }

        obj_stringified = json.dumps(
            obj,
            separators=(",", ":"),
            sort_keys=True,
        )

        hashed_value = self._hash(obj_stringified)

        return hashed_value

    async def _send_query(
        self,
        document: DocumentNode,
        variable_values: Optional[Dict[str, Any]] = None,
        operation_name: Optional[str] = None,
    ) -> int:

        # Calculate the id by hashing the subscription name and the variables
        query_id = self._calc_id(self.latest_subscription_name, variable_values)

        # Creating the payload for the full subscription
        payload: Dict[str, Any] = {"query": print_ast(document)}
        if variable_values:
            payload["variables"] = variable_values
        if operation_name:
            payload["operationName"] = operation_name
            payload["subscriptionName"] = self.latest_subscription_name

        # Saving the full query first and waiting for the server to request it later
        self.saved_full_subscriptions[str(query_id)] = payload

        # Then first start to request the subscription only with the operation name
        query_str = json.dumps(
            {
                "id": str(query_id),
                "type": "start",
                "operationName": operation_name,
                "eventId": self.latest_event_id,
            }
        )

        await self._send(query_str)

        return query_id

    async def subscribe(
        self,
        document: DocumentNode,
        *,
        variable_values: Optional[Dict[str, Any]] = None,
        operation_name: str,
        subscription_name: str,
        event_id: int,
        send_stop: Optional[bool] = True,
    ) -> AsyncGenerator[ExecutionResult, None]:

        self.latest_event_id = event_id
        self.latest_subscription_name = subscription_name

        async for result in super().subscribe(
            document,
            variable_values=variable_values,
            operation_name=operation_name,
            send_stop=send_stop,
        ):
            yield result

    async def _wait_ack(self) -> None:

        # Storage for the full subscriptions, until the server requests them
        self.saved_full_subscriptions = {}

        # This server answers connection_init with a wsIdentity message
        # instead of the Apollo connection_ack
        init_answer = await self._receive()

        answer_type, _, _ = self._parse_answer(init_answer)

        if answer_type != "wsIdentity":
            raise TransportProtocolError(
                "Websocket server did not return a wsIdentity response"
            )

    def _parse_answer(
        self, answer: str
    ) -> Tuple[str, Optional[int], Optional[ExecutionResult]]:
        try:
            json_answer = json.loads(answer)
        except ValueError:
            raise TransportProtocolError(
                f"Server did not return a GraphQL result: {answer}"
            )

        if "wsIdentity" in json_answer:
            return ("wsIdentity", json_answer["wsIdentity"], None)

        elif (
            "type" in json_answer and json_answer["type"] == "request-full-subscription"
        ):
            return ("request-full-subscription", json_answer["id"], None)

        else:

            return self._parse_answer_apollo(json_answer)

    async def send_full_subscription(self, answer_id: str):

        # Saved payloads are keyed by the string id, so normalize it first
        answer_id = str(answer_id)

        if answer_id not in self.saved_full_subscriptions:
            raise TransportProtocolError(
                f"Full subscription not found for id {answer_id}"
            )

        payload = self.saved_full_subscriptions[answer_id]

        query_str = json.dumps(
            {"id": answer_id, "type": "full-subscription", "payload": payload}
        )

        await self._send(query_str)

    async def _handle_answer(
        self,
        answer_type: str,
        answer_id: Optional[int],
        execution_result: Optional[ExecutionResult],
    ) -> None:

        if answer_type == "request-full-subscription":
            await self.send_full_subscription(answer_id)

        else:
            await super()._handle_answer(answer_type, answer_id, execution_result)


async def main():

    transport = EuropeanTourWebsocketsTransport(
        url="wss://btec-websocket.services.imgarena.com",
        init_payload={
            "accept-language": "en",
            "ec-version": "5.1.88",
            "operator": "europeantour",
            "referrer": "https://www.europeantour.com/",
            "sport": "GOLF",
        },
    )

    async with Client(
        transport=transport, fetch_schema_from_transport=False
    ) as session:

        query = gql(
            """
subscription ShotTrackerSubscribeToGolfTournamentGroupScores($input: SubscribeToGolfTournamentGroupScoresInput!) {
  subscribeToGolfTournamentGroupScores(input: $input) {
    groupId
    l1Course
    teamId
    players {
      id
      lastName
      firstName
    }
    roundScores {
      courseId
      roundNo
      toParToday {
        value
      }
      holesThrough {
        value
      }
      startHole
      holes {
        holePar
        holeStrokes
        holeOrder
        holeNumber
      }
      isPlayoff
    }
    toPar {
      value
    }
    tournamentPosition {
      format
      value
      displayValue
    }
    status
  }
}
"""
        )

        variables = {
            "input": {
                "teamId": 21,
                "tournamentId": 488,
                "roundNo": 4,
            },
        }

        async for result in session.subscribe(
            query,
            operation_name="ShotTrackerSubscribeToGolfTournamentGroupScores",
            variable_values=variables,
            subscription_name="subscribeToGolfTournamentGroupScores",
            event_id=488,
        ):

            print(result)


asyncio.run(main())

HJA24 commented

Again I would like to thank you very much for the time and effort!

You're welcome!

HJA24 commented

@leszekhanusz, sorry for reopening this topic. Could you take a look at the following SO question if you're available? It's closely related to this topic, but focuses on queries instead of subscriptions.