Refactor the way to create callback functions to follow the `(request: Request) -> Optional[Response]` pattern
Closed this issue · 0 comments
This issue is intended to resolve one of the biggest confusion causes while someone is using the minos.saga.Saga
class, that is: what is the function's prototype of each callback function?
Set of Changes
Firstly, I'll explain the current API of the minos.saga.Saga
methods, and then, once all of them are understood, a (request: Request) -> Optional[Response]
proposal will be discussed.
Invoke Participant → Step Constructor
Defines an operation (Command
or Query
) call that must be resolved by the same or another external microservice.
Current
Prototype
SagaStep.invoke_participant(
name: str,
callback: Callable[[SagaContext, ...], Union[Any, Awaitable[Any]]],
parameters: Optional[SagaContext],
)
The name
refers to the operation name (or topic in the broker's case), the callback
refers to a function that prepares the request content to be send to the corresponding operation handler and parameters
is an optional mapping of values to be passed to the callback
function. These values are very useful in cases in which the same callback
function is shared among multiple invoke_participant
or with_compensation
definitions.
Proposal
Prototype
SagaStep(
callback: Callable[[SagaContext, ...], Awaitable[SagaRequest]],
parameters: Optional[SagaContext],
)
Example
Saga()
.step(send_product_price_update)
...
async def send_product_price_update(context: SagaContext) -> SagaRequest:
return SagaRequest("UpdateProductPrice", {"uuid": context["product_uuid"], "price": context["price"]})
On Reply → On Success
Defines the handling function of a Response
received after the SagaStep.invoke_participant
is resolved.
Current
Prototype
SagaStep.on_reply(
name: str,
callback: Optional[Callable[[Any, ...], Union[Any, Awaitable[Any]]]],
parameters: Optional[SagaContext],
)
The name
refers to the SagaContext
entry in which the response's content of the invoke_participant
call will be stored. The callback
function refers to an optional transformation to be performed to the response before storing it into the SagaContext
and parameters
is an optional mapping of values to be passed to the callback
function.
Proposal
Prototype
SagaStep.on_success(
callback: Callable[[SagaContext, SagaResponse, ...], Awaitable[SagaContext]]
parameters: Optional[SagaContext],
)
Example
Saga()
.step(send_product_price_update)
.on_success(on_product_price_success)
...
async def on_product_price_success(context: SagaContext, response: SagaResponse) -> SagaContext:
content = await response.content()
context["product_version"] = content["version"]
return context
With Compensation → On Error
Defines an operation to be executed if the status code of the response contains an application level error (raised a ResponseException
)
Current
Prototype
SagaStep.with_compensation(
target: str,
callback: Callable[[SagaContext, ...], Union[Any, Awaitable[Any]]],
parameters: Optional[SagaContext],
)
The function prototype is the same as the SagaStep.invoke_participant
function: The name
refers to the operation name (or topic in the broker's case), the callback
refers to a function that prepares the request content to be send to the corresponding operation handler and parameters
is an optional mapping of values to be passed to the callback
function. These values are very useful in cases in which the same callback
function is shared among multiple invoke_participant
or with_compensation
definitions.
Proposal
Prototype
SagaStep.on_error(
callback: Callable[[SagaContext, SagaResponse, ...], Awaitable[SagaContext]]
parameters: Optional[SagaContext],
)
Example
Saga()
.step(send_product_price_update)
.on_success(on_product_price_success)
.on_error(on_product_price_error)
...
async def on_product_price_error(context: SagaContext, response: SagaResponse) -> SagaContext:
if not response.ok:
raise ValueError("Price could not be updated")
content = await response.content()
context["product_version"] = content["version"]
return context
On Failure
The purpose of this operation is to revert the changes performed by the step execution in case of failure.
Proposal
Prototype
SagaStep.on_failure(
callback: Callable[[SagaContext, ...], Awaitable[SagaRequest]],
parameters: Optional[SagaContext],
)
Example
Saga()
.step(send_product_price)
.on_success(on_product_price_success)
.on_error(on_product_price_error)
.on_failure(on_product_price_failure)
...
async def on_product_price_failure(context: SagaContext) -> SagaRequest:
return SagaRequest(
"UpdateProductPrice", {"uuid": context["product_uuid"], "price": context["previous_price"]}
)
Commit
Defines the ending step of a Saga
with an optional action to be executed before finishing the execution.
Current
Prototype
Saga.commit(
callback: Optional[Callable[[SagaContext, ...], Awaitable[SagaContext]]],
parameters: Optional[SagaContext],
)
The callback
argument refers to an optional function to be executed before ending the saga execution. This function gets a SagaContext
as input and optionally receives another updated SagaContext
as output. The parameters
argument is an optional mapping of values to be passed to the callback
function
Example
Saga()
.step(send_product_price_update)
.on_success(on_product_price_success)
.on_error(on_product_price_error)
.on_failure(on_product_price_failure)
.step(send_update_customers_wallet)
.on_success(on_update_customers_wallet_success)
.commit(on_commit)
async def on_commit(context: SagaContext) -> SagaContext:
context["finished_at"] = current_datetime()
return context
Extra: Composable Sagas
For complex sagas, the composable way to define them may be a good choice to increase readability:
saga = Saga(
steps=[
SagaStep(
send_product_price,
on_success=on_product_price_success,
on_error=on_product_price_error,
on_failure=on_product_price_failure
),
SagaStep(
send_update_customers_wallet,
on_success=on_update_customers_wallet_success
),
],
commit=on_commit
)