How to share websocket context in quart?
arpantechparticle opened this issue · 0 comments
arpantechparticle commented
i have quart app with with websocket method.
charge_points = {}
@app.websocket("/ws/<charge_point_id>")
async def on_connect(charge_point_id: str):
logging.info("charge_point_id: %s", charge_point_id)
try:
await websocket.accept()
cp = SChargePoint(charge_point_id, websocket)
charge_points[charge_point_id] = cp
except asyncio.CancelledError:
print('charger WebSocketDisconnect')
logging.info("charger %s WebSocketDisconnect with code %s", charge_point_id, e.code)
Here SChargePoint which extended ChargePoint class.
class SChargePoint(ChargePoint):
def __init__(self, charge_point_id, connection):
super().__init__(charge_point_id, connection)
self.order_id = None
@on(Action.BootNotification)
def on_boot_notification(
self, charge_point_vendor: str, charge_point_model: str, **kwargs
):
return call_result.BootNotificationPayload(
current_time=datetime.utcnow().isoformat(),
interval=55,
status=RegistrationStatus.accepted,
)
async def remote_start_transaction(self, user, unit_rate, remote_start_item):
#send message to connection.
}
ChargePoint class handle send, receive message of web socket.
when new connection is made i store it in charge_points. it store SChargePoint instance.
when need to send message to specific charger ponit id. i called
@app.post("/fleet/start-charging/<string:charge_point_id>")
async def remote_start(charge_point_id):
remote_start_item = await request.get_json()
schargepoint_instance = charge_points.get(charger_id)
schargepoint_instance.remote_start_transaction(user, 21, remote_start_item)
it class remote_start_transaction method of SChargePoint class. SChargePoint already has websocket connection. but when it try to send message using send then get error ⇒ Not within a websocket context.