Saga pattern for microservices example

It's a demo application for saga-framework that shows how the latter can be used to implement the CreateOrderSaga from Chris Richardson's book on microservices.


Running the app

First, run all the infrastructure:

docker-compose up --build --remove-orphans

Then you can visit the following URLs:

Local development

First, run RabbitMQ and Redis:

docker-compose  --file docker-compose.local.yaml up 

To run each service, see readme.md files in each service folder.

Architecture and implementation details

TL;DR

For impatient readers: here are all the main solution components. saga-deployment-diagram

Also, here's how the first two steps of the Create Order Saga work. create-order-saga-first-2-steps-explained

See a more detailed description of the Saga pattern at https://github.com/absent1706/saga-framework

Let's now describe the current application in more detail. The whole ecosystem for this app includes the following main components.

Orchestrator entrypoint

The order_service Flask app is the saga entrypoint; it's needed only to initiate saga runs:

def _run_saga(input_data):
    order = Order.create(**input_data)
    saga_state = CreateOrderSagaState.create(order_id=order.id)
    CreateOrderSaga(main_celery_app, saga_state.id).execute()
    return f'Scheduled saga #{saga_state.id}. ' \
           f'See its progress in order_service worker'

Here's the saga definition:

class CreateOrderSaga(StatefulSaga):
    saga_state_repository = CreateOrderSagaRepository()

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        self.steps = [
            SyncStep(
                name='reject_order',
                compensation=self.reject_order
            ),
            
            AsyncStep(
                name='verify_consumer_details',
                action=self.verify_consumer_details,

                base_task_name=verify_consumer_details_message.TASK_NAME,
                queue=consumer_service_messaging.COMMANDS_QUEUE,

                on_success=self.verify_consumer_details_on_success,
                on_failure=self.verify_consumer_details_on_failure
            ),

            AsyncStep(
                name='create_restaurant_ticket',
                action=self.create_restaurant_ticket,
                compensation=self.reject_restaurant_ticket,

                base_task_name=create_ticket_message.TASK_NAME,
                queue=restaurant_service_messaging.COMMANDS_QUEUE,

                on_success=self.create_restaurant_ticket_on_success,
                on_failure=self.create_restaurant_ticket_on_failure
            ),
            # other steps ...
        ]
    
    # method implementation goes next ...

Initiating a saga simply means sending the first Celery task to a Saga Handler Service:

class CreateOrderSaga(StatefulSaga):
    # other code ...
    
    def verify_consumer_details(self, current_step: AsyncStep):
        logging.info(f'Verifying consumer #{self.saga_state.order.consumer_id} ...')

        message_id = self.send_message_to_other_service(
            current_step,
            asdict(
                verify_consumer_details_message.Payload(
                    consumer_id=self.saga_state.order.consumer_id
                )
            )
        )

        self.saga_state_repository.update(self.saga_id, last_message_id=message_id)

Saga Step Handler Services

consumer_service, restaurant_service, accounting_service.

They all have Celery workers that handle saga steps and report results back to the Orchestrator.

Results are sent as Celery tasks, for example:

@command_handlers_celery_app.task(bind=True, name=create_ticket_message.TASK_NAME)
@saga_step_handler(response_queue=CREATE_ORDER_SAGA_RESPONSE_QUEUE)
def create_ticket_task(self: Task, saga_id: int, payload: dict) -> dict:
    request_data = create_ticket_message.Payload(**payload)

    # in real world, we would create a ticket in restaurant service DB
    # here, we will just generate some fake ID of just created ticket
    ticket_id = random.randint(200, 300)
    logging.info(f'Restaurant ticket {request_data} created')
    logging.info(f'Ticket details: {payload}')

    return asdict(create_ticket_message.Response(
        ticket_id=ticket_id
    ))

Here, the saga_step_handler decorator sends the payload (that our function returns) to CREATE_ORDER_SAGA_RESPONSE_QUEUE.
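Conceptually, such a decorator wraps the task body, reports success or failure to the response queue, and derives the response task name from the request task name. The sketch below re-implements that idea with an in-memory list standing in for the queue; it is NOT the saga-framework implementation, and all names in it are illustrative assumptions:

```python
import functools

sent_messages = []  # stands in for the real response queue


def saga_step_handler_sketch(response_queue):
    """Illustrative re-implementation of a saga-step-handler-style decorator."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(task_name, saga_id, payload):
            try:
                result = func(task_name, saga_id, payload)
                # on success, forward the returned payload to the orchestrator
                sent_messages.append(
                    (response_queue, f'{task_name}.response.success', saga_id, result))
                return result
            except Exception as exc:
                # on failure, report the error so the saga can be rolled back
                sent_messages.append(
                    (response_queue, f'{task_name}.response.failure', saga_id,
                     {'error': str(exc)}))
                raise
        return wrapper
    return decorator


@saga_step_handler_sketch('create_order_saga.responses')
def create_ticket(task_name, saga_id, payload):
    return {'ticket_id': 123}


create_ticket('restaurant_service.create_ticket', saga_id=1, payload={})
```

After the call, `sent_messages` contains one success message addressed to the response queue, with the task name suffixed by `.response.success`.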

Response task names are computed from the initial task names (see the success_task_name and failure_task_name functions). For example, for a "request" Celery task named restaurant_service.create_ticket, the corresponding "response" Celery task (which will be handled by the Orchestrator) will be named restaurant_service.create_ticket.response.success
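That naming rule can be sketched with two tiny helpers (the real success_task_name and failure_task_name functions live in saga-framework; these are illustrative re-implementations of the convention described above):

```python
# Illustrative re-implementations of the response-task naming convention;
# the actual helpers come from saga-framework.
def success_task_name(base_task_name: str) -> str:
    # 'restaurant_service.create_ticket' ->
    # 'restaurant_service.create_ticket.response.success'
    return f'{base_task_name}.response.success'


def failure_task_name(base_task_name: str) -> str:
    return f'{base_task_name}.response.failure'


print(success_task_name('restaurant_service.create_ticket'))
# restaurant_service.create_ticket.response.success
```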

Orchestrator worker

It's the order_service worker, the heart of saga orchestration.

It listens to replies from Saga Handler Services and performs actions like

def create_restaurant_ticket_on_success(self, step: BaseStep, payload: dict):
    response = create_ticket_message.Response(**payload)
    logging.info(f'Restaurant ticket # {response.ticket_id} created')

    self.saga_state.order.update(restaurant_ticket_id=response.ticket_id)

and launches the next saga step (or rolls back the saga if an error occurred).
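The rollback behaviour can be illustrated in miniature: when a step fails, the compensations of the already-completed steps run in reverse order. This is a generic, self-contained sketch of the pattern, not saga-framework code:

```python
def run_saga_sketch(steps):
    """Each step is (name, action, compensation); action returns True on success."""
    completed = []
    log = []
    for name, action, compensation in steps:
        if action():
            log.append(f'done:{name}')
            completed.append((name, compensation))
        else:
            log.append(f'failed:{name}')
            # roll back: compensate completed steps in reverse order
            for done_name, comp in reversed(completed):
                if comp is not None:
                    comp()
                    log.append(f'compensated:{done_name}')
            break
    return log


# Hypothetical run mirroring CreateOrderSaga: the third step fails,
# so only the first step (which has a compensation) is compensated.
log = run_saga_sketch([
    ('create_order', lambda: True, lambda: None),                 # compensation: reject_order
    ('verify_consumer', lambda: True, None),                      # no compensation
    ('create_restaurant_ticket', lambda: False, lambda: None),    # fails
])
print(log)
# ['done:create_order', 'done:verify_consumer',
#  'failed:create_restaurant_ticket', 'compensated:create_order']
```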

See more details at https://github.com/absent1706/saga-framework

Handling replies from Saga Handler Services is also implemented with Celery. The corresponding Celery tasks are registered automatically via CreateOrderSaga.register_async_step_handlers(); see the create_order_saga_worker.py file.

AsyncAPI documentation

In REST, we have Swagger / OpenAPI. In async messaging, the alternative is the AsyncAPI standard, which provides its own IDL (interface definition language) for describing messages between services.

Each service has its own AsyncAPI spec (see the asyncapi_specification.py files), generated using the asyncapi-python library.

The asyncapi-python library is also used to describe message schemas as dataclasses. For example, the create_ticket_message.py file looks like this:

import dataclasses
from typing import List

import asyncapi

from saga_framework.asyncapi_utils import \
    asyncapi_message_for_success_response

TASK_NAME = 'restaurant_service.create_ticket'


@dataclasses.dataclass
class OrderItem:
    name: str
    quantity: int


@dataclasses.dataclass
class Payload:
    order_id: int
    customer_id: int
    items: List[OrderItem]


@dataclasses.dataclass
class Response:
    ticket_id: int


message = asyncapi.Message(
    name=TASK_NAME,
    title='Create restaurant ticket',
    summary='This command creates a ticket so the restaurant knows the order details. \n'
            'In the real world, the ticket may be created automatically or after a restaurant manager '
            'approves it (confirming that they will be able to cook the desired dishes)',
    payload=Payload,
)

success_response = asyncapi_message_for_success_response(
    TASK_NAME,
    title='Ticket ID is returned',
    payload_dataclass=Response
)
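As an aside, these payload dataclasses are what actually travels over Celery, via dataclasses.asdict (used in the snippets above). A self-contained sketch, mirroring the Payload and OrderItem classes, shows that asdict recurses into nested dataclasses:

```python
import dataclasses
from dataclasses import asdict
from typing import List


# Mirrors the Payload/OrderItem dataclasses above so this sketch runs standalone;
# the field values are made up.
@dataclasses.dataclass
class OrderItem:
    name: str
    quantity: int


@dataclasses.dataclass
class Payload:
    order_id: int
    customer_id: int
    items: List[OrderItem]


payload = Payload(order_id=1, customer_id=42,
                  items=[OrderItem(name='Pizza', quantity=2)])
# asdict converts nested dataclasses (and lists of them) into plain dicts,
# which is the form that gets serialized into the Celery message
print(asdict(payload))
# {'order_id': 1, 'customer_id': 42, 'items': [{'name': 'Pizza', 'quantity': 2}]}
```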

These schemas are used to generate the AsyncAPI specification (see restaurant_service/restaurant_service/asyncapi_specification.py):

channels = dict([
    message_to_channel(create_ticket_message.message,
                       create_ticket_message.success_response),
    message_to_channel(reject_ticket_message.message),  # compensation step has no response
    message_to_channel(approve_ticket_message.message,
                       approve_ticket_message.success_response),
])

spec = asyncapi.Specification(
    info=asyncapi.Info(
        title='Restaurant service', version='1.0.0',
        description=f'Takes command messages from "{restaurant_service_messaging.COMMANDS_QUEUE}" queue',
    ),
    channels=channels,
    components=asyncapi_components_from_asyncapi_channels(channels.values()),
    servers=fake_asyncapi_servers,
)

if __name__ == '__main__':
    import yaml
    from asyncapi.docs import spec_asjson

    print(yaml.dump(spec_asjson(spec)))

Common files

Async messages sent between services are used by both the orchestrator (order_service) and the handler services (consumer_service, restaurant_service, accounting_service).

So, the messages are described in the app_common folder that's shared between the services.

Ideally, there would be no common folder; the shared code would live in a sub-repository or, even better, in an internal Python package with its own versioning. However, in this demo project we simply use a shared folder.