EuropeanTour websockets API using gql
HJA24 opened this issue · 4 comments
Hi,
I want to communicate with a GraphQL server that, I assume, uses the Apollo websockets transport protocol.
My code is copied from the example in the documentation:
import asyncio
import logging
from gql import gql, Client
from gql.transport.websockets import WebsocketsTransport
logging.basicConfig(level=logging.INFO)
async def main():
    """Open a session against the EuropeanTour websocket endpoint.

    Uses the stock ``WebsocketsTransport`` and sends the site-specific
    connection parameters as the ``connection_init`` payload.
    """
    transport = WebsocketsTransport(
        url='wss://btec-websocket.services.imgarena.com',
        init_payload={
            'accept-language': 'en',
            'ec-version': '5.1.88',
            'operator': 'europeantour',
            'referrer': 'https://www.europeantour.com/',
            'sport': 'GOLF',
        },
    )
    async with Client(
        transport=transport, fetch_schema_from_transport=False
    ) as session:
        # do something
        pass  # placeholder: a comment alone is not a valid block body
When I inspect the traffic between the client and the server here (click on a player to see the course), I see the following:
client -> server
{"type":"connection_init","payload":{"accept-language":"en","ec-version":"5.1.88","sport":"GOLF","referrer":"https://www.europeantour.com/","operator":"europeantour"}}
client <- server
{"wsIdentity":"vBeXw1roT6CBTf9EvEEfyQ/5535528"}
client -> server
{"id":"1073897396","type":"start","operationName":"SubscribeToGolfTournamentStatus","eventId":488}
As you can see, the last message doesn't contain a query.
How can I reproduce this message within gql and send it to the server?
Thank you
- OS: mac
- Python version: 3.8
- gql version: 3.4.0
- graphql-core: 3.2.3
The EuropeanTour website does not use the Apollo websocket protocol for its API, so you cannot use the standard `WebsocketsTransport` class.
What I found out:
- instead of returning a `connection_ack` message after the `connection_init`, it returns a `wsIdentity` message
- for a subscription, the client will first send a `start` message containing an `operationName` and an `eventId`, without any GraphQL query
- the server will then answer with a `request-full-subscription` message, requiring the client to send the full subscription query
- the client then has to send a `full-subscription` message containing the full requested query, including the `operationName` and a new field `subscriptionName`
- the id is not actually random: it is generated by hashing together the `subscriptionName` and the variables, serialized as JSON with the dictionary keys sorted. It took me a while to figure it out, as no error is generated if the id is not correct
- the server seems to cache the GraphQL subscription (probably based on the IP address), so it will not request the subscription with a `request-full-subscription` message if it has already received it previously
Here is a Chrome screenshot showing the `request-full-subscription` messages:
You can then create your own transport by inheriting from the `WebsocketsTransport` class and making the necessary modifications.
Here is an example of working code:
import asyncio
import json
import logging
from typing import Any, AsyncGenerator, Dict, Optional, Tuple
from graphql import DocumentNode, ExecutionResult, print_ast
from gql import Client, gql
from gql.transport.exceptions import TransportProtocolError
from gql.transport.websockets import WebsocketsTransport
logging.basicConfig(level=logging.INFO)
class EuropeanTourWebsocketsTransport(WebsocketsTransport):
    """Websockets transport for the EuropeanTour API.

    The server deviates from the Apollo websockets protocol:

    * it answers ``connection_init`` with a ``wsIdentity`` message,
    * a subscription is started with a ``start`` message carrying only an
      ``operationName`` and an ``eventId`` (no GraphQL query),
    * the server may then send ``request-full-subscription``, to which the
      client replies with a ``full-subscription`` message holding the query,
    * the message id is a hash of the subscription name and the variables.

    NOTE(review): ``subscribe`` stores the event id and subscription name on
    the instance for ``_send_query`` to pick up; this presumably relies on
    the parent ``subscribe`` calling ``_send_query`` before yielding control
    to another subscription -- confirm against the gql version in use.
    """

    def _hash(self, e):
        """Return the unsigned 32-bit hash of the string *e*.

        This is a multiply-by-33-and-XOR string hash seeded with 5381 that
        consumes the characters from last to first.
        """
        t = 5381
        for char in reversed(e):
            # Masking at every step keeps the intermediate value within
            # 32 bits; the low 32 bits are the same as masking once at the
            # end because multiplication and XOR never propagate high bits
            # downwards.
            t = ((t * 33) ^ ord(char)) & 0xFFFFFFFF
        return t

    def _calc_id(self, subscription_name, variables):
        """Compute the message id for a subscription.

        The subscription name and variables are serialized to compact JSON
        with sorted keys, and that string is hashed with ``_hash``.
        """
        obj = {
            "subscriptionName": subscription_name,
            "variables": variables,
        }
        obj_stringified = json.dumps(
            obj,
            separators=(",", ":"),
            sort_keys=True,
        )
        return self._hash(obj_stringified)

    async def _send_query(
        self,
        document: DocumentNode,
        variable_values: Optional[Dict[str, Any]] = None,
        operation_name: Optional[str] = None,
    ) -> int:
        """Send the ``start`` message and remember the full query for later.

        Returns:
            The id computed from the latest subscription name and the
            variables; it is sent to the server as a string.
        """
        # Calculate the id by hashing the subscription name and the variables
        query_id = self._calc_id(self.latest_subscription_name, variable_values)

        # Creating the payload for the full subscription
        payload: Dict[str, Any] = {"query": print_ast(document)}
        if variable_values:
            payload["variables"] = variable_values
        if operation_name:
            payload["operationName"] = operation_name
        payload["subscriptionName"] = self.latest_subscription_name

        # Saving the full query first and waiting for the server to request
        # it later
        self.saved_full_subscriptions[str(query_id)] = payload

        # Then first start to request the subscription only with the
        # operation name
        query_str = json.dumps(
            {
                "id": str(query_id),
                "type": "start",
                "operationName": operation_name,
                "eventId": self.latest_event_id,
            }
        )

        await self._send(query_str)

        return query_id

    async def subscribe(
        self,
        document: DocumentNode,
        *,
        variable_values: Optional[Dict[str, Any]] = None,
        operation_name: str,
        subscription_name: str,
        event_id: int,
        send_stop: Optional[bool] = True,
    ) -> AsyncGenerator[ExecutionResult, None]:
        """Subscribe to *document* and yield each result from the server.

        ``subscription_name`` and ``event_id`` are specific to this API; they
        are stored on the instance so that ``_send_query`` can use them.
        """
        self.latest_event_id = event_id
        self.latest_subscription_name = subscription_name

        async for result in super().subscribe(
            document,
            variable_values=variable_values,
            operation_name=operation_name,
            send_stop=send_stop,
        ):
            yield result

    async def _wait_ack(self) -> None:
        """Wait for the ``wsIdentity`` message that acknowledges the init.

        Also resets the store of saved full subscriptions.

        Raises:
            TransportProtocolError: if the first answer is not ``wsIdentity``.
        """
        self.saved_full_subscriptions = {}

        init_answer = await self._receive()
        answer_type, _answer_id, _execution_result = self._parse_answer(
            init_answer
        )

        if answer_type != "wsIdentity":
            raise TransportProtocolError(
                "Websocket server did not return a wsIdentity response"
            )

    def _parse_answer(
        self, answer: str
    ) -> Tuple[str, Optional[int], Optional[ExecutionResult]]:
        """Parse a raw server message into (type, id, execution result).

        ``wsIdentity`` and ``request-full-subscription`` messages are handled
        here; anything else is delegated to the Apollo parser of the parent
        class. For the two custom message types the id element is the value
        sent by the server, unconverted.

        Raises:
            TransportProtocolError: if *answer* is not valid JSON.
        """
        try:
            json_answer = json.loads(answer)
        except ValueError as err:
            raise TransportProtocolError(
                f"Server did not return a GraphQL result: {answer}"
            ) from err

        # Only a JSON object can carry the custom message types; other JSON
        # values go straight to the Apollo parser.
        if isinstance(json_answer, dict):
            if "wsIdentity" in json_answer:
                return ("wsIdentity", json_answer["wsIdentity"], None)
            if json_answer.get("type") == "request-full-subscription":
                return ("request-full-subscription", json_answer["id"], None)

        return self._parse_answer_apollo(json_answer)

    async def send_full_subscription(self, answer_id: str):
        """Send the saved full subscription query requested by the server.

        Raises:
            TransportProtocolError: if no query was saved under *answer_id*.
        """
        try:
            # Queries are saved under the string form of the id.
            payload = self.saved_full_subscriptions[str(answer_id)]
        except KeyError:
            raise TransportProtocolError(
                f"Full subscription not found for id {answer_id}"
            ) from None

        query_str = json.dumps(
            {"id": answer_id, "type": "full-subscription", "payload": payload}
        )

        await self._send(query_str)

    async def _handle_answer(
        self,
        answer_type: str,
        answer_id: Optional[int],
        execution_result: Optional[ExecutionResult],
    ) -> None:
        """Answer ``request-full-subscription``; defer the rest to the parent."""
        if answer_type == "request-full-subscription":
            await self.send_full_subscription(answer_id)
        else:
            await super()._handle_answer(answer_type, answer_id, execution_result)
async def main():
    """Subscribe to the group scores of one team and print every update."""
    connection_params = {
        "accept-language": "en",
        "ec-version": "5.1.88",
        "operator": "europeantour",
        "referrer": "https://www.europeantour.com/",
        "sport": "GOLF",
    }
    ws_transport = EuropeanTourWebsocketsTransport(
        url="wss://btec-websocket.services.imgarena.com",
        init_payload=connection_params,
    )
    client = Client(transport=ws_transport, fetch_schema_from_transport=False)

    async with client as session:
        # The document text is kept exactly as written, hence the
        # unindented lines inside the literal.
        group_scores_query = gql(
            """
subscription ShotTrackerSubscribeToGolfTournamentGroupScores($input: SubscribeToGolfTournamentGroupScoresInput!) {
subscribeToGolfTournamentGroupScores(input: $input) {
groupId
l1Course
teamId
players {
id
lastName
firstName
}
roundScores {
courseId
roundNo
toParToday {
value
}
holesThrough {
value
}
startHole
holes {
holePar
holeStrokes
holeOrder
holeNumber
}
isPlayoff
}
toPar {
value
}
tournamentPosition {
format
value
displayValue
}
status
}
}
"""
        )

        subscription_input = {
            "teamId": 21,
            "tournamentId": 488,
            "roundNo": 4,
        }
        query_variables = {"input": subscription_input}

        updates = session.subscribe(
            group_scores_query,
            operation_name="ShotTrackerSubscribeToGolfTournamentGroupScores",
            variable_values=query_variables,
            subscription_name="subscribeToGolfTournamentGroupScores",
            event_id=488,
        )
        async for update in updates:
            print(update)
# Run the example only when executed as a script, not when imported.
if __name__ == "__main__":
    asyncio.run(main())
Again I would like to thank you very much for the time and effort!
You're welcome!
@leszekhanusz sorry for opening this topic again. Could you take a look at the following SO question if you're available? It's closely related to this topic, but focuses on queries instead of subscriptions.