For the last 2 years, I've found myself so much into MSA and distributed transactions.
Dealing with edge cases and failover, I realized, unlike what people often say, performance is the key
To reduce technical failure because at the end of the day, you don't get to make a mess on your application code
But rather, failure happens when network is not stable and instances go down and so on, meaning that
the faster you handle traffics, the better and more reliable the system gets.
On top of this performance, I got interested in Rust's ownership system as it is very must different in handling
pointers and lifetime of values. Added to that, I got to learn that Rust type system is by far the saftest
with its low level performance.
So the last question remaining was, how fast I could make system which assumes both performance and safety.
And here is the experiment.
In this project, I deliberately chose Event Driven Architecture
with adoption of Domain Driven Design
.
Each transaction that may require subsequent transaction both internally and externally will be handled
by leaving outbox
message together in one transaction. So, there are three key architectural concerns covered
- Event Driven Architecture
- Domain Driven Design
- Outbox Pattern
Due to the immaturity of asynchronous ORM, I decided not to use Diesel(or its async variant) but SQLx.
As claimed, the notion of Aggregate
taked from DDD community, transaction boundary is made strictly by
consistency boundary that is set by aggregate.
pub trait Aggregate: Send + Sync {
fn collect_events(&mut self) -> VecDeque<Box<dyn Message>> {
if !self.events().is_empty() {
self.take_events()
} else {
VecDeque::new()
}
}
fn events(&self) -> &VecDeque<Box<dyn Message>>;
fn take_events(&mut self) -> VecDeque<Box<dyn Message>>;
fn raise_event(&mut self, event: Box<dyn Message>);
}
And each event that's raised after handling command is internally registerd.
Repository
is to keep persistence concerns outside of the domain model.
Its usability really shines in this project as I didn't use any ORM model.
If your business logic is entangled with the query logic, it easily gets messy.
You can use Repository
pattern to steer clear of that situation.
In this project, one repository for one aggregate
priciple is strictly observed.
So, every access to persistence layer is done through repository and its generic type, Aggregate
as follows:
#[async_trait]
pub trait TRepository<A: Aggregate + 'static>{
...
}
To insert, fetch and update aggregate, the following method must be implemented
pub trait TRepository {
...
async fn add(&mut self, aggregate: &mut A) -> Result<String, ApplicationError> {
self._collect_events(aggregate);
self._add(aggregate).await
}
async fn _add(&mut self, aggregate: &A) -> Result<String, ApplicationError>;
async fn get(&self, aggregate_id: &str) -> Result<A, ApplicationError>;
async fn update(&mut self, aggregate: &mut A) -> Result<(), ApplicationError> {
self._collect_events(aggregate);
self._update(aggregate).await
}
async fn _update(&mut self, aggregate: &A) -> Result<(), ApplicationError>;
}
As you can see, _collect_events
event hook is followed by the query operations such as _add
and _update
.
This is to collect events that are raised in domain logic so it can trigger the subsequent event handling logics.
pub trait TRepository {
...
fn get_events(&mut self) -> VecDeque<Box<dyn Message>>;
fn set_events(&mut self, events: VecDeque<Box<dyn Message>>);
fn _collect_events(&mut self, aggregate: &mut A) {
self.set_events(aggregate.collect_events())
}
}
Here, we have get_events
and set_events
methods that must be implemeted on any concrete repository that implements TRepository
trait. If then, _collect_event
is called on every add
and update
operation, as those operations are what requires data changes.
Notice, however, that these events are for internal handling that happens to be side effects of command handling but not necessarily as important as command call. But what if the following event is as important so you can't just ignore the failure? Then we need to take more strigent data saving mechanism. The choice in this project is outbox pattern.
For that, we need higher level of abstraction for transaction management - UnitOfWork
.
Unit of work is what guarantees an atomic transaction WITHIN a service handler. Service handler is what provides a certain service to the end user which is also referred to as application handler. So, it must hold a connection to storage of your interest and access to repository:
// library::services::unit_of_work.rs
pub struct UnitOfWork<R, A>
where
R: TRepository<A>,
A: Aggregate + 'static,
{
executor: Arc<RwLock<Executor>>,
context: AtomicContextManager,
repository: R,
_aggregate: PhantomData<A>,
}
Here, UnitOfWork
has generic parameter R
and A
which represents Repository
and Aggregate
respectively and holds context
and executor
. Leaving aside context
and executor
, every time a client needs to access repository, it needs to go through UnitOfWork
as follows:
...
pub fn repository(&mut self) -> &mut R {
&mut self.repository
}
More importantly, you may have multiple access to repository for various reasons. Regardlessly, every operation should be within one and only one atomic transaction. For that, UnitOfWork
assumes a number of methods that emulates database-related operations:
pub async fn begin(&mut self) -> Result<(), ApplicationError> {
// TODO Need to be simplified
let mut executor = self.executor.write().await;
executor.begin().await
}
pub async fn commit(mut self) -> ApplicationResult<()> {
// To drop uow itself!
self._commit_hook().await?;
self._commit().await
}
async fn _commit(&mut self) -> ApplicationResult<()> {
let mut executor = self.executor.write().await;
executor.commit().await
}
pub async fn rollback(self) -> ApplicationResult<()> {
let mut executor = self.executor.write().await;
executor.rollback().await
}
Also note that everytime commit or rollback is invoked, that's the end of UnitOfWork
too. You can tell it by the signature thereof.
Again, events must be persisted. And the probably the most appropriate place for that would be right before the commit
operation. As you may have noticed, we have _commit_hook
just for that.
pub async fn _commit_hook(&mut self) -> ApplicationResult<()> {
let event_sender = &mut self.context.write().await.sender;
let mut outboxes = vec![];
for e in self.repository.get_events() {
if e.externally_notifiable() {
outboxes.push(e.outbox());
};
if e.internally_notifiable() {
event_sender
.send(e.message_clone())
.await
.expect("Event Collecting failed!")
}
}
Outbox::add(self.executor(), outboxes).await
}
Here, context
shows up again and it actually has sender out of channel and its other paring receiver is listening to the channel. We will talk about that later. The rest of the codes is about sorting out the event type and deciding whether or not it is saved in the form of outbox
or sent to channel.
We've come a long way. Let's look at how this whole service is used from bird's eye view:
pub struct ServiceHandler;
impl ServiceHandler {
pub fn create_board(
cmd: CreateBoard,
context: AtomicContextManager,
) -> Future<ServiceResponse> {
Box::pin(async move {
let mut uow = UnitOfWork::<Repository<BoardAggregate>, BoardAggregate>::new(context.clone()).await;
uow.begin().await.unwrap();
let builder = BoardAggregate::builder();
let mut board_aggregate: BoardAggregate = builder.build();
board_aggregate.create_board(cmd);
let res = uow.repository().add(&mut board_aggregate).await?;
uow.commit().await?;
Ok(res.into())
})
}
...
}
Inside async block, we firstly initialize UnitOfWork
given the context with context
which plays a role of managing whole lifespan of local session and sending message to receiver. Transaction begin
follows, and commit
is placed right before returning the result. Between the begin
and commit
operation, we have lines of code that processes business logic.
It wouldn't be really useful to have such a rather complicated system if the whole thing was for simple request-and-response system. At the heart of event driven architecture, there is 'what if' mindset. For example, you want to implement checkout service which calls payment service
. Within a payment service however, you often have a number of steps we need to go through such as:
- PG transaction
- Point system call
- Coupon service call
- Notifying delivery service upon the finalization of the process
None of them is not business critical, which means that you want to gear yourself up for failover. Even without failover, the simple call for payment service
is not actually one-off call. That means, you want to have a central controller that manages:
- handling
command
, the first call from enduser - collecting of events
- assigning the events to the appropriate handlers
- collecting subsequent events about both happy and fail cases
To achieve that, we can think of things like Saga
or Process Manager
but those are one of these things you can think of only when the service grows beyond the certain level. For this project, I assume everything is handled within the application. Nonethless, the architecture adopted in this project just suffices enough to explain how it should work even for the larger system. And here, we have MessageBus
// library::services::messagebus.rs
pub struct MessageBus {
#[cfg(test)]
pub book_keeper: AtomicI32,
command_handler: &'static CommandHandler<AtomicContextManager>,
event_handler: &'static EventHandler<AtomicContextManager>,
}
Okay, MessageBus
holds static reference to both command handler(s) and event handler(s). Firstly let's see handle
method, the entry to any request to the any service in your application.
impl MessageBus {
...
pub async fn handle<C>(&self, message: C) -> ApplicationResult<ServiceResponse>
where
C: Command + AnyTrait,
{
let (context_manager, mut event_receiver) = ContextManager::new().await;
let res = self
.command_handler
.get(&message.type_id())
.ok_or_else(|| {
eprintln!("Unprocessable Command Given!");
ApplicationError::CommandNotFound
})?(message.as_any(), context_manager.clone())
.await?;
'event_handling_loop: loop {
// * use of try_recv is to stop blocking it when all events are drained.
match event_receiver.try_recv() {
// * Logging!
Ok(msg) => {
if let Err(ApplicationError::EventNotFound) =
self.handle_event(msg, context_manager.clone()).await
{
continue;
};
}
Err(TryRecvError::Empty) => {
if Arc::strong_count(&context_manager) == 1 {
break 'event_handling_loop;
} else {
continue;
}
}
Err(TryRecvError::Disconnected) => break 'event_handling_loop,
};
}
drop(context_manager);
Ok(res)
}
}
If, ContextManager
shows up again, and as explained, the initialization of this object is PER TRAFFIC, so it wouldn't cause 'logical' race condition and the like. And the initialization returns not only ContextManager
but also event_receiver(or listener). With the command argument you pass to this method, you process command first and in the service, as we investigated before, events are collected, sorted out and some of them are sent to the receiver. The next step is therefore taking out the events and calling handle_event
method. If nothing is sent, it ends the loop and return the result you get from command_handler
.
``
async fn handle_event(
&self,
msg: Box<dyn Message>,
context_manager: AtomicContextManager,
) -> ApplicationResult<()> {
// ! msg.topic() returns the name of event. It is crucial that it corresponds to the key registered on Event Handler.
let handlers = self
.event_handler
.get(&msg.metadata().topic)
.ok_or_else(|| {
eprintln!("Unprocessable Event Given! {:?}", msg);
ApplicationError::EventNotFound
})?;
for handler in handlers.iter() {
match handler(msg.message_clone(), context_manager.clone()).await {
Err(ApplicationError::StopSentinel) => {
eprintln!("Stop Sentinel Reached!");
break;
}
Err(err) => {
eprintln!("Error Occurred While Handling Event! Error:{}", err);
}
Ok(_val) => {
println!("Event Handling Succeeded!");
}
};
#[cfg(test)]
{
self.book_keeper
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
}
}
drop(context_manager);
Ok(())
}
Unlike command, event may have a number of attatched handlers so iterating over the handlers given the message topic is required. For the specific handler, you may go through failure after which you should stop the subsequent event handling. For that, Your application may want to have StopSentinel
error and when that is caught, you just break out of the loop. Logically, before sending StopSentinel
, if failover handling is necessary, the sentinel raising handler should issue another event for those cases, which will be handled with another set of handlers attached to overcome the failure.