Aggregates
waku supports two approaches to modeling event-sourced aggregates: OOP aggregates (mutable, class-based)
and functional deciders (immutable, function-based).
| | OOP Aggregate | Functional Decider |
|---|---|---|
| State | Mutable object | Immutable value |
| Testing | Assert aggregate properties | Given/When/Then DSL |
| Complexity | Simpler for basic CRUD | Better for complex decision logic |
| Snapshots | SnapshotEventSourcedRepository | SnapshotDeciderRepository |
Tip
Start with OOP aggregates. Move to deciders when decision logic is complex or you want
pure-function testability.
Domain Events
Both approaches share the same event definitions — frozen dataclasses implementing INotification:
```python
from dataclasses import dataclass

from waku.cqrs import INotification


@dataclass(frozen=True, kw_only=True)
class AccountOpened(INotification):
    account_id: str
    owner: str


@dataclass(frozen=True, kw_only=True)
class MoneyDeposited(INotification):
    account_id: str
    amount: int


@dataclass(frozen=True, kw_only=True)
class MoneyWithdrawn(INotification):
    account_id: str
    amount: int
```
OOP Aggregates
The classic approach — extend EventSourcedAggregate, raise events through command methods,
and apply them to mutate internal state. This walkthrough builds a complete bank account
example from aggregate to running application.
Defining the Aggregate
```python
from typing_extensions import override

from waku.cqrs import INotification
from waku.eventsourcing import EventSourcedAggregate

from app.events import AccountOpened, MoneyDeposited, MoneyWithdrawn


class BankAccount(EventSourcedAggregate):
    def __init__(self) -> None:
        super().__init__()
        # Placeholder defaults; real values are set when events are applied.
        self.account_id: str = ''
        self.owner: str = ''
        self.balance: int = 0

    # Command methods validate input and raise events; they never mutate state directly.
    def open(self, account_id: str, owner: str) -> None:
        self._raise_event(AccountOpened(account_id=account_id, owner=owner))

    def deposit(self, account_id: str, amount: int) -> None:
        if amount <= 0:
            msg = 'Deposit amount must be positive'
            raise ValueError(msg)
        self._raise_event(MoneyDeposited(account_id=account_id, amount=amount))

    def withdraw(self, account_id: str, amount: int) -> None:
        if amount > self.balance:
            msg = f'Insufficient funds: balance={self.balance}, requested={amount}'
            raise ValueError(msg)
        self._raise_event(MoneyWithdrawn(account_id=account_id, amount=amount))

    # All state changes happen here, for both new and replayed events.
    @override
    def _apply(self, event: INotification) -> None:
        match event:
            case AccountOpened(account_id=account_id, owner=owner):
                self.account_id = account_id
                self.owner = owner
            case MoneyDeposited(amount=amount):
                self.balance += amount
            case MoneyWithdrawn(amount=amount):
                self.balance -= amount
```
Why constructor fields have placeholder defaults
The aggregate is never used in its initial state — _apply() sets the real values when
replaying the creation event. The defaults ('', 0) exist only to satisfy the type
checker and provide a valid initial shape for the object.
Key points:
- _raise_event() first applies the event (state mutation), then queues it for persistence
- _apply() must handle every event type the aggregate can produce
- Use match statements for clean event routing
Repository
Subclass EventSourcedRepository with the aggregate type parameter:
```python
from waku.eventsourcing import EventSourcedRepository

from app.aggregate import BankAccount


class BankAccountRepository(EventSourcedRepository[BankAccount]):
    pass
```
Command Handler
EventSourcedCommandHandler coordinates loading, executing, saving, and publishing:
```python
from dataclasses import dataclass

from typing_extensions import override

from waku.cqrs import Request, Response
from waku.eventsourcing import EventSourcedCommandHandler

from app.aggregate import BankAccount


@dataclass(frozen=True, kw_only=True)
class OpenAccountResult(Response):
    account_id: str


@dataclass(frozen=True, kw_only=True)
class OpenAccountCommand(Request[OpenAccountResult]):
    account_id: str
    owner: str


class OpenAccountHandler(EventSourcedCommandHandler[OpenAccountCommand, OpenAccountResult, BankAccount]):
    @override
    def _aggregate_id(self, request: OpenAccountCommand) -> str:
        return request.account_id

    # Creation command: the handler starts from a new aggregate instead of loading one.
    @override
    def _is_creation_command(self, request: OpenAccountCommand) -> bool:
        return True

    @override
    async def _execute(self, request: OpenAccountCommand, aggregate: BankAccount) -> None:
        aggregate.open(request.account_id, request.owner)

    @override
    def _to_response(self, aggregate: BankAccount) -> OpenAccountResult:
        return OpenAccountResult(account_id=aggregate.account_id)


@dataclass(frozen=True, kw_only=True)
class DepositResult(Response):
    balance: int


@dataclass(frozen=True, kw_only=True)
class DepositCommand(Request[DepositResult]):
    account_id: str
    amount: int


# No _is_creation_command() override: the aggregate is loaded from the store first.
class DepositHandler(EventSourcedCommandHandler[DepositCommand, DepositResult, BankAccount]):
    @override
    def _aggregate_id(self, request: DepositCommand) -> str:
        return request.account_id

    @override
    async def _execute(self, request: DepositCommand, aggregate: BankAccount) -> None:
        aggregate.deposit(request.account_id, request.amount)

    @override
    def _to_response(self, aggregate: BankAccount) -> DepositResult:
        return DepositResult(balance=aggregate.balance)
```
Override _is_creation_command() to return True for commands that create new aggregates.
For all other commands, the handler loads the aggregate from the store.
EventSourcedVoidCommandHandler is available for commands that don't return a response.
Module Wiring
Register aggregates, event types, and command handlers with the module system:
```python
from waku import module
from waku.cqrs import MediatorExtension, MediatorModule
from waku.eventsourcing import EventSourcingConfig, EventSourcingExtension, EventSourcingModule
from waku.eventsourcing.store.in_memory import InMemoryEventStore

from app.commands import (
    DepositCommand,
    DepositHandler,
    OpenAccountCommand,
    OpenAccountHandler,
)
from app.events import AccountOpened, MoneyDeposited, MoneyWithdrawn
from app.repository import BankAccountRepository


@module(
    extensions=[
        EventSourcingExtension().bind_aggregate(
            repository=BankAccountRepository,
            event_types=[AccountOpened, MoneyDeposited, MoneyWithdrawn],
        ),
        MediatorExtension()
        .bind_request(OpenAccountCommand, OpenAccountHandler)
        .bind_request(DepositCommand, DepositHandler),
    ],
)
class BankModule:
    pass


@module(
    imports=[
        BankModule,
        EventSourcingModule.register(EventSourcingConfig(store=InMemoryEventStore)),
        MediatorModule.register(),
    ],
)
class AppModule:
    pass
```
Run
Wire everything together and send commands through the mediator:
```python
import asyncio

from waku import WakuFactory
from waku.cqrs import IMediator

from app.commands import DepositCommand, OpenAccountCommand
from app.modules import AppModule


async def main() -> None:
    app = WakuFactory(AppModule).create()

    async with app, app.container() as container:
        mediator = await container.get(IMediator)

        await mediator.send(OpenAccountCommand(account_id='acc-1', owner='dex'))

        result = await mediator.send(DepositCommand(account_id='acc-1', amount=500))
        print(f'Balance: {result.balance}')


if __name__ == '__main__':
    asyncio.run(main())
```
See Event Store for PostgreSQL setup.
Functional Deciders
The decider pattern separates state, decisions, and evolution into pure functions.
State is immutable — each event produces a new state value.
Defining State
```python
from dataclasses import dataclass


@dataclass(frozen=True)
class BankAccountState:
    owner: str = ''
    balance: int = 0
```
Defining the Decider
A decider implements three methods from the IDecider protocol:
- initial_state(): returns the starting state
- decide(command, state): validates the command and returns new events
- evolve(state, event): applies an event to produce a new state
```python
from dataclasses import dataclass, replace

from waku.eventsourcing import IDecider

from app.events import AccountOpened, MoneyDeposited
from app.state import BankAccountState


@dataclass(frozen=True, kw_only=True)
class OpenAccount:
    account_id: str
    owner: str


@dataclass(frozen=True, kw_only=True)
class DepositMoney:
    account_id: str
    amount: int


BankCommand = OpenAccount | DepositMoney
BankEvent = AccountOpened | MoneyDeposited


class BankAccountDecider(IDecider[BankAccountState, BankCommand, BankEvent]):
    def initial_state(self) -> BankAccountState:
        return BankAccountState()

    # Pure decision: inspects command and state, returns events, changes nothing.
    def decide(self, command: BankCommand, state: BankAccountState) -> list[BankEvent]:
        match command:
            case OpenAccount(account_id=aid, owner=owner):
                return [AccountOpened(account_id=aid, owner=owner)]
            case DepositMoney(account_id=aid, amount=amount):
                if amount <= 0:
                    msg = 'Deposit amount must be positive'
                    raise ValueError(msg)
                return [MoneyDeposited(account_id=aid, amount=amount)]

    # Pure evolution: returns a new state value instead of mutating the old one.
    def evolve(self, state: BankAccountState, event: BankEvent) -> BankAccountState:
        match event:
            case AccountOpened(owner=owner):
                return replace(state, owner=owner)
            case MoneyDeposited(amount=amount):
                return replace(state, balance=state.balance + amount)
        return state
```
Repository
DeciderRepository requires three type parameters: [State, Command, Event].
```python
from waku.eventsourcing import DeciderRepository

from app.decider import BankCommand, BankEvent
from app.state import BankAccountState


class BankAccountDeciderRepository(DeciderRepository[BankAccountState, BankCommand, BankEvent]):
    pass
```
Command Handler
DeciderCommandHandler adds a _to_command() step that converts the CQRS request into a domain command:
```python
from dataclasses import dataclass

from typing_extensions import override

from waku.cqrs import Request, Response
from waku.eventsourcing import DeciderCommandHandler

from app.decider import BankCommand, BankEvent, OpenAccount
from app.state import BankAccountState


@dataclass(frozen=True, kw_only=True)
class OpenAccountResult(Response):
    owner: str


@dataclass(frozen=True, kw_only=True)
class OpenAccountRequest(Request[OpenAccountResult]):
    account_id: str
    owner: str


class OpenAccountDeciderHandler(
    DeciderCommandHandler[
        OpenAccountRequest,
        OpenAccountResult,
        BankAccountState,
        BankCommand,
        BankEvent,
    ],
):
    @override
    def _aggregate_id(self, request: OpenAccountRequest) -> str:
        return request.account_id

    @override
    def _to_command(self, request: OpenAccountRequest) -> BankCommand:
        return OpenAccount(account_id=request.account_id, owner=request.owner)

    @override
    def _is_creation_command(self, request: OpenAccountRequest) -> bool:
        return True

    @override
    def _to_response(self, state: BankAccountState, version: int) -> OpenAccountResult:
        return OpenAccountResult(owner=state.owner)
```
DeciderVoidCommandHandler is available for commands that don't return a response.
Module Wiring
Use bind_decider() instead of bind_aggregate():
```python
from waku import module
from waku.cqrs import MediatorExtension, MediatorModule
from waku.eventsourcing import EventSourcingConfig, EventSourcingExtension, EventSourcingModule
from waku.eventsourcing.store.in_memory import InMemoryEventStore

from app.decider import BankAccountDecider
from app.events import AccountOpened, MoneyDeposited
from app.handler import OpenAccountDeciderHandler, OpenAccountRequest
from app.repository import BankAccountDeciderRepository


@module(
    extensions=[
        EventSourcingExtension().bind_decider(
            repository=BankAccountDeciderRepository,
            decider=BankAccountDecider,
            event_types=[AccountOpened, MoneyDeposited],
        ),
        MediatorExtension().bind_request(OpenAccountRequest, OpenAccountDeciderHandler),
    ],
)
class BankDeciderModule:
    pass


@module(
    imports=[
        BankDeciderModule,
        EventSourcingModule.register(EventSourcingConfig(store=InMemoryEventStore)),
        MediatorModule.register(),
    ],
)
class AppModule:
    pass
```
The bootstrap and run code is the same as the OOP example — swap the module import.
Shared Features
The following features apply to both OOP aggregates and functional deciders.
Idempotency
Command handlers support idempotent event appends through the _idempotency_key() hook.
Override it to extract a deduplication token from the incoming request:
When an idempotency_key is provided, the repository generates per-event keys in the format
{idempotency_key}:0, {idempotency_key}:1, etc. Retrying the same command with the same key
is safe — the event store returns the existing stream version without duplicating events.
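The key format and the retry behavior can be sketched with an in-memory stand-in. DedupStoreSketch is hypothetical and is not the waku event store; it also assumes the stream version equals the number of stored events, which may differ from the library's numbering.

```python
def per_event_keys(idempotency_key: str, event_count: int) -> list[str]:
    # One key per event, suffixed with the event's position in the batch.
    return [f'{idempotency_key}:{index}' for index in range(event_count)]


class DedupStoreSketch:
    """Hypothetical in-memory store holding a single stream."""

    def __init__(self) -> None:
        self.events: list[str] = []
        self.seen_keys: set[str] = set()

    def append(self, events: list[str], idempotency_key: str) -> int:
        keys = per_event_keys(idempotency_key, len(events))
        if self.seen_keys.issuperset(keys):
            # Same command retried: report the current version, write nothing.
            return len(self.events)
        self.events.extend(events)
        self.seen_keys.update(keys)
        return len(self.events)


store = DedupStoreSketch()
first = store.append(['MoneyDeposited', 'MoneyWithdrawn'], 'req-42')
retry = store.append(['MoneyDeposited', 'MoneyWithdrawn'], 'req-42')
```

Here the retry returns the same version as the first call and the stream still holds two events.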
See Event Store — Idempotency for deduplication semantics and error handling.
Stream Length Guard
Repositories can enforce a maximum stream length to prevent unbounded event replay. Set the
max_stream_length class variable on your repository:
When a stream exceeds the configured limit, load() raises StreamTooLargeError with a message
guiding you to configure snapshots.
Tip
The default is None (no limit). Use this as a safety valve for aggregates that
might accumulate many events — it catches unbounded growth before it impacts performance.
Note
Snapshot-aware repositories (SnapshotEventSourcedRepository, SnapshotDeciderRepository)
inherit the guard but only apply it during full replay. When a valid snapshot exists, the
repository replays only the events after the snapshot, which naturally stays within bounds.
Concurrency Control
Both repository types use ExpectedVersion for optimistic concurrency:
| Variant | Behavior |
|---|---|
| NoStream() | Stream must not exist (creation) |
| Exact(version=N) | Stream version must match exactly |
| StreamExists() | Stream must exist (any version) |
| AnyVersion() | No version check |
The repositories handle this automatically — NoStream for new aggregates,
Exact for existing ones. A ConcurrencyConflictError is raised on mismatch.
Automatic Retry
Both EventSourcedCommandHandler and DeciderCommandHandler include a built-in retry loop
for optimistic concurrency conflicts. When save() raises ConcurrencyConflictError, the
handler re-loads the aggregate from the store and re-executes the command with fresh state.
The default is 3 attempts (1 initial + 2 retries). Override max_attempts on your handler
subclass to change this:
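A sketch of the override and of the loop it controls, using a stand-in handler. HandlerSketch, handle(), and _load_execute_save() are hypothetical, and the error class is redefined locally; only the max_attempts name and its default of 3 come from the text.

```python
class ConcurrencyConflictError(Exception):
    """Local stand-in for the library error of the same name."""


class HandlerSketch:
    """Hypothetical stand-in for a command handler with a retry loop."""

    max_attempts: int = 3  # 1 initial attempt + 2 retries

    def __init__(self, conflicts_before_success: int) -> None:
        self.remaining_conflicts = conflicts_before_success
        self.attempts_made = 0

    def _load_execute_save(self) -> str:
        # Simulates load + execute + save; fails while conflicts remain.
        self.attempts_made += 1
        if self.remaining_conflicts > 0:
            self.remaining_conflicts -= 1
            raise ConcurrencyConflictError
        return 'saved'

    def handle(self) -> str:
        for attempt in range(1, self.max_attempts + 1):
            try:
                # Every attempt starts over from a fresh load; no backoff.
                return self._load_execute_save()
            except ConcurrencyConflictError:
                if attempt == self.max_attempts:
                    raise
        raise RuntimeError('max_attempts must be at least 1')


class PatientHandler(HandlerSketch):
    max_attempts = 5  # 1 initial attempt + 4 retries


default_handler = HandlerSketch(conflicts_before_success=3)
try:
    default_handler.handle()
    default_gave_up = False
except ConcurrencyConflictError:
    default_gave_up = True

patient_handler = PatientHandler(conflicts_before_success=3)
patient_result = patient_handler.handle()
```

With three consecutive conflicts, the default handler gives up after its third attempt, while the subclass succeeds on its fourth.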
Set max_attempts = 1 for no retries — only the initial attempt runs, and ConcurrencyConflictError propagates immediately.
Note
Creation commands (where _is_creation_command() returns True) are not retried.
A NoStream conflict means another process already created the stream — retrying with
a fresh aggregate would produce the same failure. Handle this case in your application
logic (e.g., load the existing aggregate and update it).
Tip
The retry loop re-reads state from the event store on each attempt, so it always works
with the latest version. No backoff is applied — the handler retries immediately with
fresh state.
Aggregate Naming
Both repository types auto-resolve aggregate_name from their type parameters. This name
determines the event stream prefix (e.g., BankAccount-acc-1).
Resolution rules
| Pattern | Source | Example |
|---|---|---|
| OOP | Aggregate class name, as-is | EventSourcedRepository[BankAccount] → "BankAccount" |
| Decider | State class name, State suffix stripped | DeciderRepository[BankAccountState, ...] → "BankAccount" |
For deciders, the canonical naming convention is {AggregateName}State (e.g., CounterState,
BankAccountState). The State suffix is automatically removed to derive the stream prefix.
If the state class has no State suffix, the full name is used as-is.
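The suffix rule for deciders can be expressed in a few lines. resolve_decider_name() is a hypothetical helper that illustrates the rule as described here; it is not the library's resolution code.

```python
def resolve_decider_name(state_class: type) -> str:
    # Strip one trailing "State"; names without the suffix pass through unchanged.
    return state_class.__name__.removesuffix('State')


class BankAccountState:
    pass


class Counter:
    pass


bank_name = resolve_decider_name(BankAccountState)
counter_name = resolve_decider_name(Counter)
stream_id = f'{bank_name}-acc-1'
```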
Explicit override
Set aggregate_name as a class variable to override auto-resolution:
```python
class LegacyAccountRepo(EventSourcedRepository[BankAccount]):
    aggregate_name = 'Account'
```
Uniqueness
aggregate_name must be unique across all repositories in the application.
Duplicate names are detected at startup and raise DuplicateAggregateNameError.
Two repositories with the same aggregate_name would write to the same event streams,
causing data corruption.
Warning
The stream ID format uses a hyphen separator (BankAccount-acc-1), so aggregate_name
must not contain hyphens. This is validated at StreamId construction time.
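One way to see why the restriction exists: if a stream ID is split on its first hyphen, the aggregate ID may contain hyphens but the name may not. StreamIdSketch below is a hypothetical stand-in, not the waku StreamId class, and its parse() method is an assumption added for illustration.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class StreamIdSketch:
    aggregate_name: str
    aggregate_id: str

    def __post_init__(self) -> None:
        # Validate at construction time, as the warning above describes.
        if '-' in self.aggregate_name:
            raise ValueError(f'aggregate_name must not contain hyphens: {self.aggregate_name!r}')

    def __str__(self) -> str:
        return f'{self.aggregate_name}-{self.aggregate_id}'

    @classmethod
    def parse(cls, value: str) -> 'StreamIdSketch':
        # Split on the first hyphen only, so IDs like "acc-1" stay intact.
        name, _, aggregate_id = value.partition('-')
        return cls(name, aggregate_id)


stream = StreamIdSketch('BankAccount', 'acc-1')
round_trip = StreamIdSketch.parse(str(stream))

try:
    StreamIdSketch('Bank-Account', 'acc-1')
    rejected = False
except ValueError:
    rejected = True
```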
Further reading
- Event Store — in-memory and PostgreSQL event persistence
- Projections — build read models from event streams
- Snapshots — optimize loading for long-lived aggregates
- Testing — Given/When/Then DSL for aggregates and deciders
- Schema Evolution — upcasting and event type registries