Topic-based subscription extension for python aiohttp web applications.
Features:
- Automatically created webhook aiohttp handlers based on definitions
- Unified R4B/R5 API for automatic registration (managed subscriptions)
- Optional managed subscriptions
- Optional authentication using
X-Api-Key
id-only
/full-resource
support
Install fhir-tbs[r4b]
or fhir-tbs[r5]
using poetry/pipenv/pip.
- Instantiate R4BTBS/R5TBS class with optionally passing predefined subscriptions using
subscriptions
arg andsubscription_defaults
with default subscription parameters (e.g.payload_content
,timeout
orheartbeat_period
):tbs = R4BTBS(subscription_defaults={"payload_content": "full-resource"})
- Define subscriptions using decorator
tbs.define
:@tbs.define( topic="https://example.com/SubscriptionTopic/new-appointment-event", filter_by=[ { "resource_type": "Appointment", "filter_parameter": "status", "value": "booked" } ], webhook_id="new-appointment" ) async def new_appointment_handler( app: web.Application, reference: str, _included_resources: list[r4b.AnyResource], _timestamp: str | None, ) -> None: logging.info("New appointment %s", reference)
- Invoke
setup_tbs
on app initialization passing needed parameters (see specification below):setup_tbs(app, tbs, webhook_path_prefix="webhook")
fhir_tbs.r4b.R4BTBS/fhir_tbs.r5.R5TBS
- subscriptions (list[fhir_tbs.SubscriptionDefinitionWithHandler], optional) - predefined list of subscriptions.
- subscription_defaults (optional) - default parameters for all subscription definitions.
- payload_content (str, optional):
id-only
/full-resource
(default isid-only
) - timeout (int, optional): default is
60
- heartbeat_period (int, optional): default is
20
- payload_content (str, optional):
tbs_instance.define
- topic (str): URL of SubscriptionTopic to subscribe.
- webhook_id (optional): Optional webhook id that will be part of webhook URL.
- filter_by (list[FilterBy], optional): Optional list of filters applied to topic.
- payload_content (str, optional):
id-only
/full-resource
(default isid-only
) - timeout (int, optional): default is
60
- heartbeat_period (int, optional): default is
20
setup_tbs
- app (web.Application): aiohttp application.
- tbs (R4BTBS/R5TBS): TBS class instance.
- webhook_path_prefix (str): Prefix for the generated aiohttp routes.
- webhook_token (str, optional): The authorization token that is checked in X-Api-Token header.
- manage_subscriptions (bool, optional): The flag that indicates whether subscription registration/population should be enabled.
- handle_delivery_errors (bool, optional): WIP The flag that indicated whether subscription delivery errors (e.g. broken connection or missing events) should be handled.
- app_url (str, optional): Application url that is used when
manage_subscriptions
/handle_delivery_errors
are set. - get_fhir_client (Callable[[web.Application], AsyncFHIRClient], optional): Getter for web.Application that returns AsyncFHIRClient that further used when
manage_subscriptions
/handle_delivery_errors
are set.
Create subscriptions.py
with the following content:
import logging
from fhirpy import AsyncFHIRClient
import fhirpy_types_r4b as r4b
from fhir_tbs import SubscriptionDefinitionWithHandler
from fhir_tbs.r4b import R4BTBS
from aiohttp import web
# Make sure that app has fhir_client_key
fhir_client_key = web.AppKey("fhir_client_key", AsyncFHIRClient)
async def new_appointment_sub(
app: web.Application,
appointment_ref: str,
included_resources: list[r4b.AnyResource],
_timestamp: str
) -> None:
fhir_client = app[fhir_client_key]
# For id-only use fhir_client to fetch the resource
appointment = r4b.Appointment(**(await fhir_client.get(appointment_ref)))
# For full-resource find in in included resources by reference (straightforward example)
appointment = [
resource for resource in included_resources
if appointment_ref == f"{resource.resourceType}/{resource.id}"
][0]
logging.info("New appointment %s", appointment.model_dump())
tbs = R4BTBS()
@tbs.define(
topic="https://example.com/SubscriptionTopic/new-appointment-event",
filter_by=[
{
"resource_type": "Appointment",
"filter_parameter": "status",
"value": "booked"
}
],
webhook_id="new-appointment"
)
async def new_appointment_handler(
app: web.Application,
reference: str,
_included_resources: list[r4b.AnyResource],
_timestamp: str | None,
) -> None:
logging.info("New appointment %s", reference)
def create_app() -> web.Application:
app = web.Application()
app[fhir_client_key] = AsyncFHIRClient(...)
setup_tbs(
app,
tbs,
webhook_path_prefix="webhook",
app_url="http://app:8080",
get_fhir_client=lambda app: app[fhir_client_key],
manage_subscriptions=True,
handle_delivery_errors=True
)
external_webhook_path_prefix_parts = ["external-webhook"]
external_tbs = R4BTBS(subscriptions=subscriptions)
def setup_external_tbs(app: web.Application) -> None:
setup_tbs(
app,
external_tbs,
webhook_prefix_path="/".join(external_webhook_path_prefix_parts),
app_url="http://aidbox.example.com",
get_fhir_client=lambda app: app[fhir_client_key],
manage_subscriptions=True,
handle_delivery_errors=True
)
@sdk.operation(
methods=["POST"],
path=[*external_webhook_path_prefix_parts, {"name": "webhook-name"}],
public=True,
)
async def external_webhook_proxy_op(
_operation: SDKOperation, request: SDKOperationRequest
) -> web.Response:
session = request["app"][ak.session]
app_url = str(request["app"][ak.settings].APP_URL).rstrip("/")
webhook_name = request["route-params"]["webhook-name"]
path = "/".join([*external_webhook_path_prefix_parts, webhook_name])
token = request["headers"].get("x-api-key")
async with session.post(
f"{app_url}/{path}",
headers={"X-Api-Key": token} if token else {},
json=request["resource"],
) as response:
return web.Response(
body=await response.text(),
status=response.status,
content_type=response.content_type,
)
def create_app() -> web.Application:
app = web.Application()
app[fhir_client_key] = AsyncFHIRClient(...)
setup_external_tbs(app)