Able To Be Aggregated
Written by Ivan Dugalic - December 2021
In the previous blog post - Types And Functions, we learned how to structure the various data of our domain model by using algebraic data types (commands, states, and events), and how to achieve a clear separation of the entity’s structure and functions/behavior of the entity.
In this blog post, we are going to focus on behavior. Fmodel library1 offers generic and abstract components to specialize in for your specific case/expected behavior:
- Decider
- View
- Saga
These components are effectively modeling the processes of your business. Notice, processes
are plural. You can have more than one, and they can depend on each other.
Can we combine/aggregate these components into one, and why should we consider this?
Discovery process
To better understand this pattern, we need to go back to the first blog post (The Template) and focus on the discovery process at first.
We are building an imaginary information system / a fast-food, online ordering system.
At this stage, our focus should be on modeling the flow of information and how we produce new facts/events over time.
The events are arranged against the timeline without branching.
A single Decider component is producing these events, representing a decision-making algorithm for the whole information system.
The Decider and its state are represented by the yellow sticky note (Restaurant
).
Based on the current State and the Command it received, the Decider will make new decisions/Events.
New Events will update/evolve the State of the Decider (yellow), and the View (green).
Decider
fun restaurantDecider() = RestaurantDecider(
// Initial state of the Restaurant is `null`. It does not exist.
initialState = null,
// Exhaustive command handler(s): for each type of RestaurantCommand you are going to publish specific events/facts, as required by the current state/s of the Restaurant.
decide = { c, s ->
when (c) {
is CreateRestaurantCommand ->
// ** positive flow **
if (s == null) flowOf(RestaurantCreatedEvent(c.identifier, c.name, c.menu))
// ** negative flow 1 (publishing business error events) **
else flowOf(RestaurantNotCreatedEvent(c.identifier, c.name, c.menu, "Restaurant already exists"))
is ChangeRestaurantMenuCommand ->
if (s != null) flowOf(RestaurantMenuChangedEvent(c.identifier, c.menu))
else flowOf(RestaurantMenuNotChangedEvent(c.identifier, c.menu, "Restaurant does not exist"))
is ActivateRestaurantMenuCommand ->
if (s != null) flowOf(RestaurantMenuActivatedEvent(c.identifier, c.menuId))
else flowOf(RestaurantMenuNotActivatedEvent(c.identifier, c.menuId, "Restaurant does not exist"))
is PassivateRestaurantMenuCommand ->
if (s != null) flowOf(RestaurantMenuPassivatedEvent(c.identifier, c.menuId))
else flowOf(RestaurantMenuNotPassivatedEvent(c.identifier, c.menuId, "Restaurant does not exist"))
is PlaceRestaurantOrderCommand ->
if ((s != null && s.isValid(c))) flowOf(RestaurantOrderPlaced(c.identifier, c.lineItems, c.restaurantOrderIdentifier))
else if ((s != null && !s.isValid(c))) flowOf(RestaurantOrderRejectedEvent(c.identifier, c.restaurantOrderIdentifier, "Not on the menu"))
else flowOf(RestaurantOrderRejectedEvent(c.identifier, c.restaurantOrderIdentifier, "Restaurant does not exist"))
null -> emptyFlow() // We ignore the `null` command by emitting the empty flow. Only the Decider that can handle `null` command can be combined with other Deciders.
}
},
// Exhaustive event handler(s): for each event of type RestaurantEvent you are going to evolve from the current state/s of the [Restaurant] to a new state of the Restaurant.
evolve = { s, e ->
when (e) {
is RestaurantCreatedEvent -> Restaurant(e.identifier, e.name, RestaurantMenu(e.menu.menuId, e.menu.menuItems.map { MenuItem(it.id, it.menuItemId, it.name, it.price) }.toImmutableList(), e.menu.cuisine), Status.OPEN)
is RestaurantMenuChangedEvent ->
s?.copy(
menu = RestaurantMenu(
e.menu.menuId,
e.menu.menuItems.map { MenuItem(it.id, it.menuItemId, it.name, it.price) }.toImmutableList(),
e.menu.cuisine
)
)
is RestaurantMenuActivatedEvent -> s?.copy(menu = s.menu.copy(status = ACTIVE))
is RestaurantMenuPassivatedEvent -> s?.copy(menu = s.menu.copy(status = PASSIVE))
is RestaurantOrderPlaced -> s
is RestaurantErrorEvent -> s // Error events are not changing the state / We return current state instead.
null -> s // Null events are not changing the state / We return current state instead. Only the Decider that can handle `null` event can be combined (Monoid) with other Deciders.
}
}
)
We have a straightforward, linear model of the process, and this just might work for you.
Divide and conquer?
In more complex situations, we tend to split single process into multiple processes, so we can:
- Distribute them and scale them independently
- Archive greater degree of parallelization and concurrency
- Understand them better
- Test them easier
- Achieve Decider As A Service (DaaS) or Function As A Service (FaaS)
Our first/naive attempt could look like this:
Now we have two deciders Restaurant
and RestaurantOrder
.
We are breaking the rule/requirement/invariant: “You can place an order only if the order items are on the restaurant menu.”
Notice the PlaceRestaruantOrder
command.
It is handled by the RestaurantOrder
decider to produce the RestaurantOrderRejected
event.
The RestaurantOrder
decider can not decide this because it is no longer aware of the Restaurant menu.
It has to communicate with the Restaurant
decider to do this.
Solution space
A Saga component does the communication between these two deciders. It is acting like a dumb stateless pipe, in which case the Deciders are smart endpoints.
Notice the new commands and events we just added to the flow to support the splitting! It is also reflected in the source code by
- splitting the
RestaruantCommand
abstract/sealed class intoRestaruantCommand
andRestaruantOrderCommand
- splitting the
RestaruantEvent
abstract/sealed class intoRestaruantEvent
andRestaruantOrderEvent
- creating new Saga components of type
Saga<RestaurantOrderEvent?, RestaurantCommand?>
andSaga<RestaurantEvent?, RestaurantOrderCommand?>
- splitting the
Decider<RestaurantCommand?, Restaurant?, RestaurantEvent?>
intoDecider<RestaurantCommand?, Restaurant?, RestaurantEvent?>
andDecider<RestaurantOrderCommand?, RestaurantOrder?, RestaurantOrderEvent?>
The price of splitting a single process into multiple processes is obvious now. Sometimes, splitting is easy / does not cost much, as there are no business invariants between the processes. We should have two deciders modeling them (no new sagas or new events or commands).
Now we can run our logic in different ways:
- Distributing the logic by respectfully injecting the deciders into dedicated aggregate components:
EventSourcingAggregate<RestaurantCommand?, Restaurant?, RestaurantEvent?>
andEventSourcingAggregate<RestaurantOrderCommand?, RestaurantOrder?, RestaurantOrderEvent?>
. These two aggregates can be deployed individually, asynchronously communicating over the network, each in its transaction boundary. See the demo application. - Combining the logic by injecting all deciders into a single aggregate component:
EventSourcingAggregate<Command?, Pair<RestaurantOrder?, Restaurant?>, Event?>
. Notice thatRestaurantCommand
andRestaurantOrderCommand
are extending theCommand
, andRestaurantEvent
andRestaurantOrderEvent
are extending theEvent
. The computations and effects (saving and fetching the data) are now synchronously executed in a single transaction. See the demo application
By distributing the logic, we increase complexity on the operational side. Usually, we need message brokers to route messages between processes, and we need to adapt to the asynchronous way of communication.
By combining the logic, we decrease complexity on the operational side, and we can benefit from the simplicity of having a single transaction executing the logic synchronously.
Notice that the domain model is not changing whatever option you choose, distributed or combined. This is extremely valuable, as we now can switch between monolithic and microservices styles, and more importantly, from synchronous to asynchronous or the other way around.
Your architecture can evolve now, flattening the cost curve of the overall solution.
Combine/Aggregate
The combine
is a binary operation over the decider
, satisfying associativity and having an identity/empty element. Associativity facilitates parallelization by giving us the freedom to break problems into chunks that can be computed in parallel.
Functional paradigm and category theory define this algebra as a Monoid
.
Stated tersely, a monoid
is a type together with a binary operation (combine) over that type, satisfying associativity and having an identity element (zero/empty).
(decider1 + decider2) + decider3 = decider1 + (decider2 + decider3)
It is like you are adding numbers / +
is combine
and numbers are deciders.
By combining two or more deciders you get the new decider.
This is a formal signature of the combine
extension function defined on the decider
:
inline fun <reified C1 : C_SUPER, S1, reified E1 : E_SUPER, reified C2 : C_SUPER, S2, reified E2 : E_SUPER, C_SUPER, E_SUPER> Decider<C1?, S1, E1?>.combine(y: Decider<C2?, S2, E2?>): Decider<C_SUPER, Pair<S1, S2>, E_SUPER>
Type parameters are restricted by generic constraints. Notice the upper bound C1 : C_SUPER
.
It is only possible to use the combine
function when:
E1
andE2
have common superclassE_SUPER
/ in general, we need a SUM/OR relationship here; inheritance is just one way of modeling it:E_SUPER =
E1OR
E2`C1
andC2
have common superclassC_SUPER
/ in general, we need a SUM/OR relationship here; inheritance is just one way of modeling it:C_SUPER =
C1OR
C2`C1?
andC2?
andE1?
andE2?
are nullable.
If the constraints are not met, the function will not be available for usage!
val restaurantDecider: Decider<RestaurantCommand?, Restaurant?, RestaurantEvent?> = ...
val restaurantOrderDecider: Decider<RestaurantOrderCommand?, RestaurantOrder?, RestaurantOrderEvent?> = ...
val restaurantSaga: Saga<RestaurantOrderEvent?, RestaurantCommand?> = ...
val restaurantOrderSaga: Saga<RestaurantEvent?, RestaurantOrderCommand?> = ...
val aggregate: EventSourcingOrchestratingAggregate<Command?, Pair<RestaurantOrder?, Restaurant?>, Event?>
= eventSourcingOrchestratingAggregate(
// Combining two deciders into one.
decider = restaurantOrderDecider.combine(restaurantDecider),
// Fetch and Store events.
eventRepository = eventRepository,
// Combining individual `choreography` Sagas into single `orchestrating` Saga.
saga = restaurantOrderSaga.combine(restaurantSaga)
)
Duality
The question is rising!
Is implementing the monolith, sync system dual to implementing a distributed, async system?
In other words:
Is this statement
Decider<RestaurantCommand?, Restaurant?, RestaurantEvent?> `combine` Decider<RestaurantOrderCommand?, RestaurantOrder?, RestaurantOrderEvent?> = Decider<Command?, Pair<RestaurantOrder?, Restaurant?>, Event?>;
Decider<Command?, Pair<RestaurantOrder?, Restaurant?>, Event?> == unique model of the business
dual to
Decider<RestaurantCommand?, Restaurant?, RestaurantEvent?> == 1st part of the unique model of the business;
Decider<RestaurantOrderCommand?, RestaurantOrder?, RestaurantOrderEvent?> == 2nd part of the unique model of the business;
1st part of the unique model of our business + 2nd part of the unique model of the business == unique model of our business.
The short answer is YES, and both statements are TRUE because of that. Check the tweet please.
The combine
algebra is in the middle of it, enabling us to switch sides by aggregating processes (deciders
, views
and sagas
) in different ways!
Special credits to Jérémie Chassaing for sharing his research. It usually takes a lot of time and thinking to find and formulate these abstractions and signatures.
In the next blog post we are going to discuss other functions that are defined on the decider
, like map
and apply
.
Happy coding!