Running Scheduled Tasks in Websockets or Scaling with Multiple Workers
suryanarayanan035 opened this issue · 1 comments
Scenario
Hello there,
Thanks for creating this cool and elegant library. I am building a chat application using this library.
In my application I want to ping the client every 10 seconds to verify that it is still connected. The problem is that I cannot find a way to do so.
What I have tried
I tried to create an async function that sleeps for 10 seconds and then calls the websocket's ping method. But this can't be used, because asyncio.sleep has to be awaited, and the await blocks the code below the function call.
Expectation
So I thought of moving the pinging work to a separate worker: my main server would handle the connections and my worker would do the pinging. To do so, I would have to serialize/pickle the websocket object, which is not possible.
So I would like to know a way to scale websockets with multiple workers, or to run interval-based tasks without blocking the main process.
Here is my server code:
    async def handler(websocket):
        user = save_user_data_on_connect(websocket)
        CURRENT_CONNECTIONS[user.get_id()] = websocket
        status, pair = pair_user(user)
        if status == 'paired':
            message = json.dumps(construct_message_dict('connected'))
            await CURRENT_CONNECTIONS[pair].send(message)
            await websocket.send(message)
            # *** this is where the logic to start the scheduled job will go ***
            logging.info(f"connected msg sent to user {websocket.id}")
        async for message in websocket:
            receiver = get_user_pair(str(websocket.id))
            try:
                print(f"Message Received: {message}")
                message = parse_message(message)
                message['type'] = 'user_sent'
                if receiver and 'content' in message:
                    message = save_message_to_db(str(websocket.id), receiver, message)
                    await CURRENT_CONNECTIONS[receiver].send(message)
            except websockets.exceptions.ConnectionClosed:
                cleanup_disconnected_connection(receiver)
                message = json.dumps(construct_message_dict('disconnected'))
                await CURRENT_CONNECTIONS[str(websocket.id)].send(message)
            except Exception as e:
                print(e)
Happy to provide more information if needed. Thanks in advance!
> So, I thought of moving the pinging work to a separate worker. So my main server, will handle the connections and my worker will do the pinging. To do so, I have to serialize/pickle the websocket object which is not possible.
A "worker" in this context is an asyncio coroutine. Coroutines running in the same event loop share the same memory space, so you don't need to serialize/pickle anything.
(Besides, you cannot (easily) share sockets between processes. Therefore, generally speaking, serializing/pickling a network connection with the intent of sharing it never makes sense. If it's the same process, you don't need to serialize. If it's a different process, you cannot share.)
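For illustration, here is a minimal sketch of running the ping as a background coroutine next to the message loop. It is not code from the library: FakeConnection, ping_forever and demo are hypothetical names, and FakeConnection only stands in for a real websocket so that the sketch runs without a network. The point is that asyncio.create_task() returns immediately, so the await inside the ping loop does not block the handler.

```python
import asyncio


class FakeConnection:
    """Hypothetical stand-in for a websocket connection (assumption, not the library's class)."""

    def __init__(self):
        self.pings = 0

    async def ping(self):
        # A real connection would send a ping frame here; this one only counts calls.
        self.pings += 1


async def ping_forever(connection, interval):
    """Ping the connection every `interval` seconds until the task is cancelled."""
    while True:
        await asyncio.sleep(interval)
        await connection.ping()


async def handler(connection, interval=10):
    # create_task() schedules the coroutine and returns immediately,
    # so the code below keeps running while the pings happen in the background.
    ping_task = asyncio.create_task(ping_forever(connection, interval))
    try:
        # Placeholder for the real work, e.g. `async for message in websocket:`.
        await asyncio.sleep(interval * 3.5)
    finally:
        # Stop the background task when the handler exits.
        ping_task.cancel()
        try:
            await ping_task
        except asyncio.CancelledError:
            pass


async def demo():
    connection = FakeConnection()
    # A short interval keeps the demo fast; the issue asks for 10 seconds.
    await handler(connection, interval=0.05)
    return connection.pings
```

Both coroutines see the same connection object in memory, which is why no pickling is involved.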
Are you aware that websockets already does this for you, and that you can simply tune the parameters as you wish?
E.g. serve(ping_interval=10, ping_timeout=5) if you think that a latency > 5s means the connection is dead.
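Spelled out as a complete server, that could look like the sketch below. The echo handler, the host "localhost" and the port 8765 are assumptions for illustration only; ping_interval and ping_timeout are the parameters mentioned above.

```python
import asyncio

import websockets


async def handler(websocket):
    # Echo messages back; keepalive pings are sent by the library itself.
    async for message in websocket:
        await websocket.send(message)


async def main():
    # Send a ping every 10 s; close the connection if no pong arrives within 5 s.
    async with websockets.serve(
        handler, "localhost", 8765, ping_interval=10, ping_timeout=5
    ):
        await asyncio.Future()  # run until cancelled


if __name__ == "__main__":
    asyncio.run(main())
```

With this configuration no hand-written ping loop is needed in the handler.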