Clariteia/minos_microservice_saga

Refactor the way to create callback functions to follow the `(request: Request) -> Optional[Response]` pattern

Closed this issue · 0 comments

This issue aims to resolve one of the biggest sources of confusion when using the minos.saga.Saga class: what is the prototype of each callback function?

Set of Changes

For each of the minos.saga.Saga methods, I'll first explain the current API and then discuss a proposal that follows the (request: Request) -> Optional[Response] pattern.

Invoke Participant → Step Constructor

Defines an operation (Command or Query) call that must be resolved by the same or another external microservice.

Current

Prototype

SagaStep.invoke_participant(
    name: str, 
    callback: Callable[[SagaContext, ...], Union[Any, Awaitable[Any]]], 
    parameters: Optional[SagaContext],
)

The name refers to the operation name (or topic in the broker's case), the callback refers to a function that prepares the request content to be sent to the corresponding operation handler, and parameters is an optional mapping of values to be passed to the callback function. These values are especially useful when the same callback function is shared among multiple invoke_participant or with_compensation definitions.
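To illustrate why parameters helps when a callback is shared, here is a minimal sketch. It does not use the real minos classes: SagaContext is replaced by a plain dict, call_with_parameters is a hypothetical helper, and forwarding parameters as keyword arguments is an assumption about how the callback receives them.

```python
from typing import Any, Callable, Dict, Optional

SagaContext = dict  # Hypothetical stand-in for the real SagaContext.


def build_price_payload(context: SagaContext, field: str) -> Dict[str, Any]:
    """Prepare the request content, reading the price from the given context field."""
    return {"uuid": context["product_uuid"], "price": context[field]}


def call_with_parameters(
    callback: Callable[..., Any],
    context: SagaContext,
    parameters: Optional[SagaContext] = None,
) -> Any:
    """Forward the optional parameters to the callback as keyword arguments (assumed behaviour)."""
    return callback(context, **(parameters or {}))


context = SagaContext(product_uuid="abc", price=10, previous_price=8)

# The same callback serves both the forward call and its compensation.
forward = call_with_parameters(build_price_payload, context, {"field": "price"})
rollback = call_with_parameters(build_price_payload, context, {"field": "previous_price"})
```

Only the parameters mapping changes between the two calls, so a single callback could back both an invoke_participant and a with_compensation definition.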

Proposal

Prototype

SagaStep(
    callback: Callable[[SagaContext, ...], Awaitable[SagaRequest]], 
    parameters: Optional[SagaContext],
)

Example

Saga()
    .step(send_product_price_update)
    ...


async def send_product_price_update(context: SagaContext) -> SagaRequest:
    return SagaRequest("UpdateProductPrice", {"uuid": context["product_uuid"], "price": context["price"]})
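As a rough, self-contained sketch of what the proposed callback produces, the snippet below runs the example outside of any saga. The SagaRequest dataclass and its target and content field names are assumptions made for illustration, and SagaContext is again replaced by a plain dict.

```python
import asyncio
from dataclasses import dataclass
from typing import Any

SagaContext = dict  # Hypothetical stand-in for the real SagaContext.


@dataclass
class SagaRequest:
    """Hypothetical stand-in: the operation name plus the request content."""

    target: str
    content: Any = None


async def send_product_price_update(context: SagaContext) -> SagaRequest:
    # The callback now decides both the operation name and the content.
    return SagaRequest("UpdateProductPrice", {"uuid": context["product_uuid"], "price": context["price"]})


request = asyncio.run(send_product_price_update(SagaContext(product_uuid="abc", price=10)))
print(request.target, request.content)
```

The point of the design is that the operation name moves out of the step definition and into the returned request, so the step only needs the callback.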

On Reply → On Success

Defines the function that handles the response received once the SagaStep.invoke_participant call is resolved.

Current

Prototype

SagaStep.on_reply(
    name: str, 
    callback: Optional[Callable[[Any, ...], Union[Any, Awaitable[Any]]]], 
    parameters: Optional[SagaContext],
)

The name refers to the SagaContext entry in which the response content of the invoke_participant call will be stored. The callback function refers to an optional transformation to be applied to the response before storing it into the SagaContext, and parameters is an optional mapping of values to be passed to the callback function.
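The current on_reply behaviour described above can be approximated as follows. This is only a sketch of the semantics, not the library's code: apply_on_reply and pick are hypothetical helpers, SagaContext is a plain dict, only synchronous callbacks are covered, and parameters are assumed to arrive as keyword arguments.

```python
from typing import Any, Callable, Optional

SagaContext = dict  # Hypothetical stand-in for the real SagaContext.


def apply_on_reply(
    context: SagaContext,
    name: str,
    response_content: Any,
    callback: Optional[Callable[..., Any]] = None,
    parameters: Optional[SagaContext] = None,
) -> SagaContext:
    """Store the (optionally transformed) response content under ``name``."""
    if callback is not None:
        response_content = callback(response_content, **(parameters or {}))
    context[name] = response_content
    return context


def pick(content: dict, key: str) -> Any:
    """Illustrative transformation: keep a single field of the response."""
    return content[key]


reply = {"uuid": "abc", "version": 3}
context = apply_on_reply(SagaContext(), "product", reply)
context = apply_on_reply(context, "product_version", reply, pick, {"key": "version"})
```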

Proposal

Prototype

SagaStep.on_success(
    callback: Callable[[SagaContext, SagaResponse, ...], Awaitable[SagaContext]],
    parameters: Optional[SagaContext],
)

Example

Saga()
    .step(send_product_price_update)
        .on_success(on_product_price_success)
    ...


async def on_product_price_success(context: SagaContext, response: SagaResponse) -> SagaContext:
    content = await response.content()
    context["product_version"] = content["version"]
    return context

With Compensation → On Error

Defines an operation to be executed when the status code of the response signals an application-level error (that is, a ResponseException was raised).

Current

Prototype

SagaStep.with_compensation(
    target: str, 
    callback: Callable[[SagaContext, ...], Union[Any, Awaitable[Any]]], 
    parameters: Optional[SagaContext],
)

The function prototype is the same as that of SagaStep.invoke_participant, except that the first argument is called target: it refers to the operation name (or topic in the broker's case), the callback refers to a function that prepares the request content to be sent to the corresponding operation handler, and parameters is an optional mapping of values to be passed to the callback function. These values are especially useful when the same callback function is shared among multiple invoke_participant or with_compensation definitions.

Proposal

Prototype

SagaStep.on_error(
    callback: Callable[[SagaContext, SagaResponse, ...], Awaitable[SagaContext]],
    parameters: Optional[SagaContext],
)

Example

Saga()
    .step(send_product_price_update)
        .on_success(on_product_price_success)
        .on_error(on_product_price_error)
    ...


async def on_product_price_error(context: SagaContext, response: SagaResponse) -> SagaContext:
    if not response.ok:
        raise ValueError("Price could not be updated")

    content = await response.content()
    context["product_version"] = content["version"]
    return context
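One possible way for an executor to choose between on_success and on_error is sketched below. Everything here is an assumption made for illustration: the SagaResponse stub, the dispatch helper, the store_version and store_error callbacks, and the idea that response.ok is what separates the two branches.

```python
import asyncio
from typing import Any, Awaitable, Callable, Optional

SagaContext = dict  # Hypothetical stand-in for the real SagaContext.


class SagaResponse:
    """Hypothetical stub exposing the ``ok`` flag and an awaitable ``content()``."""

    def __init__(self, content: Any, ok: bool = True):
        self._content = content
        self.ok = ok

    async def content(self) -> Any:
        return self._content


Handler = Callable[[SagaContext, SagaResponse], Awaitable[SagaContext]]


async def dispatch(
    context: SagaContext,
    response: SagaResponse,
    on_success: Handler,
    on_error: Optional[Handler] = None,
) -> SagaContext:
    """Route the response to on_success or on_error depending on ``response.ok``."""
    if response.ok:
        return await on_success(context, response)
    if on_error is None:
        raise ValueError("The step returned an error and no on_error callback was defined.")
    return await on_error(context, response)


async def store_version(context: SagaContext, response: SagaResponse) -> SagaContext:
    content = await response.content()
    context["product_version"] = content["version"]
    return context


async def store_error(context: SagaContext, response: SagaResponse) -> SagaContext:
    context["error"] = await response.content()
    return context


succeeded = asyncio.run(dispatch(SagaContext(), SagaResponse({"version": 3}), store_version, store_error))
failed = asyncio.run(dispatch(SagaContext(), SagaResponse("price too low", ok=False), store_version, store_error))
```

Both handlers share the same (context, response) -> context shape, which is what makes this kind of routing straightforward.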

On Failure

The purpose of this operation is to revert the changes performed by the step execution in case of failure.

Proposal

Prototype

SagaStep.on_failure(
    callback: Callable[[SagaContext, ...], Awaitable[SagaRequest]], 
    parameters: Optional[SagaContext],
)

Example

Saga()
    .step(send_product_price_update)
        .on_success(on_product_price_success)
        .on_error(on_product_price_error)
        .on_failure(on_product_price_failure)
    ...


async def on_product_price_failure(context: SagaContext) -> SagaRequest:
    return SagaRequest(
        "UpdateProductPrice", {"uuid": context["product_uuid"], "price": context["previous_price"]}
    )
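To make the revert flow concrete, here is a minimal sketch of an executor that sends the on_failure requests when a later step fails. It is not the library's implementation: Step, execute and fake_send are hypothetical, the "UpdateCustomersWallet" operation name is invented for the example, and reverting the completed steps in reverse order is an assumption borrowed from the usual saga pattern.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Awaitable, Callable, List, Optional

SagaContext = dict  # Hypothetical stand-in for the real SagaContext.


@dataclass
class SagaRequest:
    """Hypothetical stand-in: the operation name plus the request content."""

    target: str
    content: Any = None


RequestBuilder = Callable[[SagaContext], Awaitable[SagaRequest]]


@dataclass
class Step:
    """Hypothetical step: a request builder plus an optional revert builder."""

    callback: RequestBuilder
    on_failure: Optional[RequestBuilder] = None


async def execute(steps: List[Step], context: SagaContext, send) -> bool:
    """Send each step's request; on failure, revert the completed steps, most recent first."""
    completed: List[Step] = []
    for step in steps:
        try:
            await send(await step.callback(context))
        except Exception:
            for previous in reversed(completed):
                if previous.on_failure is not None:
                    await send(await previous.on_failure(context))
            return False
        completed.append(step)
    return True


async def send_product_price_update(context: SagaContext) -> SagaRequest:
    return SagaRequest("UpdateProductPrice", {"uuid": context["product_uuid"], "price": context["price"]})


async def on_product_price_failure(context: SagaContext) -> SagaRequest:
    return SagaRequest("UpdateProductPrice", {"uuid": context["product_uuid"], "price": context["previous_price"]})


async def send_update_customers_wallet(context: SagaContext) -> SagaRequest:
    return SagaRequest("UpdateCustomersWallet", {"uuid": context["product_uuid"]})


sent: List[SagaRequest] = []


async def fake_send(request: SagaRequest) -> None:
    """Record every request and fail on the wallet update to trigger the revert."""
    if request.target == "UpdateCustomersWallet":
        raise RuntimeError("wallet service unavailable")
    sent.append(request)


context = SagaContext(product_uuid="abc", price=10, previous_price=8)
steps = [Step(send_product_price_update, on_product_price_failure), Step(send_update_customers_wallet)]
ok = asyncio.run(execute(steps, context, fake_send))
```

After the run, sent holds the original price update followed by the request that restores the previous price.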

Commit

Defines the ending step of a Saga with an optional action to be executed before finishing the execution.

Current

Prototype

Saga.commit(
    callback: Optional[Callable[[SagaContext, ...], Awaitable[SagaContext]]], 
    parameters: Optional[SagaContext],
)

The callback argument refers to an optional function to be executed before ending the saga execution. This function receives a SagaContext as input and may return an updated SagaContext as output. The parameters argument is an optional mapping of values to be passed to the callback function.

Example

saga = (
    Saga()
    .step(send_product_price_update)
        .on_success(on_product_price_success)
        .on_error(on_product_price_error)
        .on_failure(on_product_price_failure)
    .step(send_update_customers_wallet)
        .on_success(on_update_customers_wallet_success)
    .commit(on_commit)
)


async def on_commit(context: SagaContext) -> SagaContext:
    context["finished_at"] = current_datetime()
    return context
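The optional nature of the commit callback could be handled roughly like this. The run_commit helper is hypothetical, SagaContext is a plain dict, and two behaviours are assumptions: parameters are forwarded as keyword arguments, and a callback that returns None leaves the context unchanged.

```python
import asyncio
from datetime import datetime, timezone
from typing import Awaitable, Callable, Optional

SagaContext = dict  # Hypothetical stand-in for the real SagaContext.


async def run_commit(
    context: SagaContext,
    callback: Optional[Callable[..., Awaitable[Optional[SagaContext]]]] = None,
    parameters: Optional[SagaContext] = None,
) -> SagaContext:
    """Run the optional commit callback and return the resulting context."""
    if callback is None:
        return context
    result = await callback(context, **(parameters or {}))
    # Assumption: a callback that returns nothing keeps the current context.
    return context if result is None else result


async def on_commit(context: SagaContext) -> SagaContext:
    context["finished_at"] = datetime.now(timezone.utc)
    return context


untouched = asyncio.run(run_commit(SagaContext(price=10)))
finished = asyncio.run(run_commit(SagaContext(price=10), on_commit))
```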

Extra: Composable Sagas

For complex sagas, the composable way to define them may be a good choice to increase readability:

saga = Saga(
    steps=[
        SagaStep(
            send_product_price_update,
            on_success=on_product_price_success,
            on_error=on_product_price_error,
            on_failure=on_product_price_failure
        ),
        SagaStep(
            send_update_customers_wallet,
            on_success=on_update_customers_wallet_success
        ),
    ],
    commit=on_commit
)
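To show that the fluent and the composable styles can describe the same saga, here is a small sketch with stand-in classes. These Saga and SagaStep classes are hypothetical and far simpler than the real ones: they only store callbacks, and the callback names below are placeholders.

```python
from typing import Callable, List, Optional


class SagaStep:
    """Hypothetical stand-in that stores one callback per hook."""

    def __init__(self, callback: Callable, on_success=None, on_error=None, on_failure=None, saga=None):
        self.callback = callback
        self.hooks = {"on_success": on_success, "on_error": on_error, "on_failure": on_failure}
        self.saga = saga  # Set only when the step is created through the fluent API.

    def on_success(self, callback: Callable) -> "SagaStep":
        self.hooks["on_success"] = callback
        return self

    def on_error(self, callback: Callable) -> "SagaStep":
        self.hooks["on_error"] = callback
        return self

    def on_failure(self, callback: Callable) -> "SagaStep":
        self.hooks["on_failure"] = callback
        return self

    def step(self, callback: Callable) -> "SagaStep":
        # Chain back to the parent saga to start the next step.
        return self.saga.step(callback)

    def commit(self, callback: Optional[Callable] = None) -> "Saga":
        return self.saga.commit(callback)

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, SagaStep):
            return NotImplemented
        return (self.callback, self.hooks) == (other.callback, other.hooks)


class Saga:
    """Hypothetical stand-in accepting both the fluent and the composable style."""

    def __init__(self, steps: Optional[List[SagaStep]] = None, commit: Optional[Callable] = None):
        self.steps = list(steps or [])
        self.commit_callback = commit

    def step(self, callback: Callable) -> SagaStep:
        step = SagaStep(callback, saga=self)
        self.steps.append(step)
        return step

    def commit(self, callback: Optional[Callable] = None) -> "Saga":
        self.commit_callback = callback
        return self

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, Saga):
            return NotImplemented
        return (self.steps, self.commit_callback) == (other.steps, other.commit_callback)


# Placeholder callbacks; their bodies are irrelevant for the comparison.
def send_price(context): ...
def on_price_success(context, response): ...
def on_commit(context): ...


fluent = Saga().step(send_price).on_success(on_price_success).commit(on_commit)
composed = Saga(steps=[SagaStep(send_price, on_success=on_price_success)], commit=on_commit)
```

In this sketch both definitions compare equal, so the choice between them is purely about readability.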

Related issues:

  • #168 - Pass request instead of context to saga's on_reply().
  • #177 - Enrich SagaRequest and SagaResponse.
  • #179 - Rename SagaStep methods.
  • #180 - Add SagaStep.on_error method.