Source Code: https://github.com/endrekrohn/fastws
FastWS is a wrapper around FastAPI to create better WebSocket applications with auto-documentation using AsyncAPI, in a similar fashion as FastAPIs existing use of OpenAPI.
The current supported AsyncAPI verison is 2.4.0
. Once version 3.0.0
is released the plan is to upgrade to this standard.
If you are familiar with FastAPI and want to look at an example project using FastWS look here👨💻
Python 3.11+
FastWS
uses Pydantic v2 and FastAPI.
$ pip install fastws
You will also need an ASGI server, for production such as Uvicorn or Hypercorn.
$ pip install "uvicorn[standard]"
- Create a file
main.py
with:
from contextlib import asynccontextmanager
from typing import Annotated
from fastapi import Depends, FastAPI
from fastws import Client, FastWS
service = FastWS()
@service.send("ping", reply="ping")
async def send_event_a():
return
@asynccontextmanager
async def lifespan(app: FastAPI):
service.setup(app)
yield
app = FastAPI(lifespan=lifespan)
@app.websocket("/")
async def fastws_stream(client: Annotated[Client, Depends(service.manage)]):
await service.serve(client)
We can look at the generated documentation at http://localhost:<port>/asyncapi
.
First we import and initialize the service.
from fastws import Client, FastWS
service = FastWS()
Next up we connect an operation (a WebSocket message) to the service, using the decorator @service.send(...)
. We need to define the operation using a string similar to how we define an HTTP-endpoint using a path.
The operation-identificator is in this case "ping"
, meaning we will use this string to identify what type of message we are receiving.
@service.send("ping", reply="ping")
async def send_event_a():
return
If we want to define an payload
for the operation we can extend the example:
from pydantic import BaseModel
class PingPayload(BaseModel):
foo: str
@service.send("ping", reply="ping")
async def send_event_a(payload: PingPayload):
return
An incoming message should now have the following format. (We will later view this in the generated AsyncAPI-documentation).
{
"type": "ping",
"payload": {
"foo": "bar"
}
}
Next up we connect the service to our running FastAPI application.
@asynccontextmanager
async def lifespan(app: FastAPI):
service.setup(app)
yield
app = FastAPI(lifespan=lifespan)
@app.websocket("/")
async def fastws_stream(client: Annotated[Client, Depends(service.manage)]):
await service.serve(client)
The function service.setup(app)
inside FastAPIs lifespan registers two endpoints
/asyncapi.json
, to retrieve our API definition/asyncapi
, to view the AsyncAPI documentation UI.
You can override both of these URLs when initializing the service, or set them to None
to avoid registering the endpoints at all.
To spread out our service we can use the OperationRouter
-class.
# feature_1.py
from fastws import Client, OperationRouter
from pydantic import BaseModel
router = OperationRouter(prefix="user.")
class SubscribePayload(BaseModel):
topic: str
class SubscribeResponse(BaseModel):
detail: str
topics: set[str]
@router.send("subscribe", reply="subscribe.response")
async def subscribe_to_topic(
payload: SubscribePayload,
client: Client,
) -> SubscribeResponse:
client.subscribe(payload.topic)
return SubscribeResponse(
detail=f"Subscribed to {payload.topic}",
topics=client.topics,
)
We can then include the router in our main service.
# main.py
from fastws import Client, FastWS
from feature_1 import router
service = FastWS()
service.include_router(router)
The service enables two types of operations. Let us define these operations clearly:
-
send
: An operation where API user sends a message to the API server.Note: Up to AsyncAPI version
2.6.0
this refers to apublish
-operation, but is changing tosend
in version3.0.0
. -
recv
: An operation where API server sends a message to the API user.Note: Up to AsyncAPI version
2.6.0
this refers to asubscribe
-operation, but is changing toreceive
in version3.0.0
.
The above examples have only displayed the use of send
-operations.
When using the functions FastWS.client_send(message, client)
or FastWS.serve(client)
, we implicitly send some arguments. These keyword-arguments have the following keywords and types:
client
with typefastws.application.Client
app
with typefastws.application.FastWS
payload
, optional with type defined in the function processing the message.
A send
-operation can therefore access the following arguments:
from fastws import Client, FastWS
from pydantic import BaseModel
class Something(BaseModel):
foo: str
class Thing(BaseModel):
bar: str
@router.send("foo", reply="bar")
async def some_function(
payload: Something,
client: Client,
app: FastWS,
) -> Thing:
print(f"{app.connections=}")
print(f"{client.uid=}")
return Thing(bar=client.uid)
When using the function FastWS.server_send(message, topic)
, we implicitly send some arguments. These keyword-arguments have the keywords and types:
app
with typefastws.application.FastWS
- Optional
payload
with type defined in the function processing the message.
A recv
-operation can therefore access the following arguments:
from fastws import FastWS
from pydantic import BaseModel
class AlertPayload(BaseModel):
message: str
@router.recv("alert")
async def recv_client(payload: AlertPayload, app: FastWS) -> str:
return "hey there!"
If we want create a message on the server side we can do the following:
from fastapi import FastAPI
from fastws import FastWS
service = FastWS()
app = FastAPI()
@app.post("/")
async def alert_on_topic_foobar(message: str):
await service.server_send(
Message(type="alert", payload={"message": message}),
topic="foobar",
)
return "ok"
In the example above all connections subscribed to the topic foobar
will recieve a message the the payload "hey there!"
.
In this way you can on the server-side choose to publish messages from anywhere to any topic. This is especially useful if you have a persistent connection to Redis or similar that reads messages from some channel and want to propagate these to your users.
There are to ways to tackle authentication using FastWS
.
One way is to provide a custom auth_handler
when initializing the service. Below is an example where the API user must provide a secret message within a timeout to authenticate.
import asyncio
import logging
from fastapi import WebSocket
from fastws import FastWS
def custom_auth(to_wait: float = 5):
async def handler(ws: WebSocket) -> bool:
await ws.accept()
try:
initial_msg = await asyncio.wait_for(
ws.receive_text(),
timeout=to_wait,
)
return initial_msg == "SECRET_HUSH_HUSH"
except asyncio.exceptions.TimeoutError:
logging.info("Took to long to provide authentication")
return False
return handler
service = FastWS(auth_handler=custom_auth())
If you want to use your own FastAPI dependency to handle authentication before it enters the FastWS service you will have to set auto_ws_accept
to False
.
import asyncio
from typing import Annotated
from fastapi import Depends, FastAPI, WebSocket, WebSocketException, status
from fastws import Client, FastWS
service = FastWS(auto_ws_accept=False)
app = FastAPI()
async def custom_dep(ws: WebSocket):
await ws.accept()
initial_msg = await asyncio.wait_for(
ws.receive_text(),
timeout=5,
)
if initial_msg == "SECRET_HUSH_HUSH":
return
raise WebSocketException(
code=status.WS_1008_POLICY_VIOLATION,
reason="Not authenticated",
)
@app.websocket("/")
async def fastws_stream(
client: Annotated[Client, Depends(service.manage)],
_=Depends(custom_dep),
):
await service.serve(client)
To handle a WebSocket's lifespan at an application level, FastWS tries to help you by using asyncio.timeout()
-context managers in its serve(client)
-function.
You can set the both:
heartbeat_interval
: Meaning a client needs to send a message within this time.max_connection_lifespan
: Meaning all connections will disconnect when exceeding this time.
These must set during initialization:
from fastws import FastWS
service = FastWS(
heartbeat_interval=10,
max_connection_lifespan=300,
)
Both heartbeat_interval
and max_connection_lifespan
can be set to None to disable any restrictions. Note this is the default.
Please note that you can also set restrictions in your ASGI-server. These restrictions apply at a protocol/server-level and are different from the restrictions set by your application. Applicable settings for uvicorn:
--ws-ping-interval
INTEGER--ws-ping-timeout
INTEGER--ws-max-size
INTEGER