Projections

Projections build read models from event streams. waku provides two types:

  • Inline projections run synchronously during append_to_stream, guaranteeing immediate consistency between writes and reads.
  • Catch-up projections process events asynchronously in a background loop, eventually catching up with the event store.
graph LR
    subgraph Inline
        direction LR
        W[append_to_stream] --> P1[Projection]
        P1 --> RM1[Read Model]
    end
    subgraph Catch-Up
        direction LR
        ES[Event Store] -->|poll| R[Runner]
        R -->|batch| P2[Projection]
        P2 --> RM2[Read Model]
        P2 -->|checkpoint| CP[(Checkpoint Store)]
    end

Inline Projections

Implement IProjection to create an inline projection. It runs inside the same scope as append_to_stream, so the read model is always consistent with the write model.

Every projection must define a projection_name class attribute.

from collections.abc import Sequence

from waku.eventsourcing.contracts.event import StoredEvent
from waku.eventsourcing.contracts.stream import StreamId
from waku.eventsourcing.projection.interfaces import IProjection

from app.events import MoneyDeposited, MoneyWithdrawn


class AccountBalanceProjection(IProjection):
    projection_name = 'account_balance'

    def __init__(self) -> None:
        self.balances: dict[StreamId, int] = {}

    async def project(self, events: Sequence[StoredEvent], /) -> None:
        for event in events:
            match event.data:
                case MoneyDeposited(amount=amount):
                    self.balances[event.stream_id] = self.balances.get(event.stream_id, 0) + amount
                case MoneyWithdrawn(amount=amount):
                    self.balances[event.stream_id] = self.balances.get(event.stream_id, 0) - amount

Register inline projections through bind_aggregate() (or bind_decider()):

EventSourcingExtension().bind_aggregate(
    repository=BankAccountRepository,
    event_types=[AccountOpened, MoneyDeposited, MoneyWithdrawn],
    projections=[AccountBalanceProjection],
)

Warning

Inline projections add latency to every write because they execute within the same operation. Keep them lightweight or use catch-up projections for expensive read model updates.

Catch-Up Projections

Implement ICatchUpProjection for projections that run asynchronously in a background process. Catch-up projections poll the event store, process events in batches, and checkpoint their progress.

At-least-once delivery

The checkpoint is saved after project() processes a batch. If the process crashes after projecting a batch but before the checkpoint is saved, that batch is re-delivered on restart.

project() must be idempotent.

Error handling is configured per-projection via bind_catch_up_projection() (defaults to ErrorPolicy.STOP with no retries — see Error Policies).

Each catch-up projection also has two optional hooks:

  • teardown() — called during rebuilds to clean up existing state
  • on_skip(events, error) — called when a batch is skipped due to errors
from collections.abc import Sequence
from dataclasses import dataclass

from waku.eventsourcing.contracts.event import StoredEvent
from waku.eventsourcing.projection.interfaces import ICatchUpProjection

from app.events import AccountOpened, MoneyDeposited, MoneyWithdrawn


@dataclass
class AccountSummary:
    owner: str = ''
    balance: int = 0
    transaction_count: int = 0


class AccountSummaryProjection(ICatchUpProjection):
    projection_name = 'account_summary'

    def __init__(self) -> None:
        self.summaries: dict[str, AccountSummary] = {}

    async def project(self, events: Sequence[StoredEvent], /) -> None:
        for event in events:
            stream_key = event.stream_id.stream_key
            summary = self.summaries.setdefault(stream_key, AccountSummary())

            match event.data:
                case AccountOpened(owner=owner):
                    summary.owner = owner
                case MoneyDeposited(amount=amount):
                    summary.balance += amount
                    summary.transaction_count += 1
                case MoneyWithdrawn(amount=amount):
                    summary.balance -= amount
                    summary.transaction_count += 1

    async def teardown(self) -> None:
        self.summaries.clear()

Register catch-up projections via bind_catch_up_projection():

from waku.eventsourcing.projection.interfaces import ErrorPolicy

(
    EventSourcingExtension()
    .bind_aggregate(
        repository=BankAccountRepository,
        event_types=[AccountOpened, MoneyDeposited, MoneyWithdrawn],
    )
    .bind_catch_up_projection(
        AccountSummaryProjection,
        error_policy=ErrorPolicy.SKIP,
        max_retry_attempts=3,
    )
)

Event Type Filtering

Catch-up projections can declare which event types they handle via the event_types class variable. When set, the projection only receives events of those types — the event store filters at the query level, avoiding unnecessary reads.

class OrderSummaryProjection(ICatchUpProjection):
    projection_name = 'order_summary'
    event_types = [OrderPlaced, OrderShipped]
    ...

When event_types is None (the default), all events are delivered.

Tip

Always set event_types on projections that only care about a few event types. In systems with hundreds of event types, this avoids reading and discarding irrelevant events on every polling cycle.

Error Policies

Policy            Behavior
ErrorPolicy.STOP  Stop the projection (default)
ErrorPolicy.SKIP  Skip failed batch and continue; calls on_skip() hook before advancing

Both policies retry first when max_retry_attempts > 0. The policy only applies after retries are exhausted.

Fail-fast by default

The default is STOP with zero retries — projection failures surface immediately rather than silently losing events. Opt into retries or SKIP explicitly when your projection can tolerate partial gaps or transient errors.
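
The retry delay grows exponentially from base_retry_delay_seconds up to the max_retry_delay_seconds cap. The exact curve is not documented here, so the doubling schedule below is an assumption for illustration:

```python
def retry_delay(attempt: int, base: float = 10.0, cap: float = 300.0) -> float:
    """Delay before retry number `attempt` (0-based), capped at `cap`.

    Doubling per attempt is an illustrative assumption; waku's exact
    backoff curve may differ.
    """
    return min(base * (2 ** attempt), cap)
```

With the defaults this yields 10 s, 20 s, 40 s, and so on, never exceeding 300 s.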

CatchUpProjectionRunner

CatchUpProjectionRunner polls the event store and dispatches event batches to registered catch-up projections. Each projection runs in its own concurrent task.

Use the create() classmethod to build a runner from a DI container:

runner = await CatchUpProjectionRunner.create(
    container,
    lock=InMemoryProjectionLock(),
    projections=[AccountSummaryProjection],  # optional filter; None = all registered
    polling=PollingConfig(),                 # optional; defaults to sensible values
)
await runner.run()

create() resolves the CatchUpProjectionRegistry from the container to discover bindings. When projections is provided, only those projection classes are included.

from waku.di import AsyncContainer
from waku.eventsourcing.projection.lock.in_memory import InMemoryProjectionLock
from waku.eventsourcing.projection.runner import CatchUpProjectionRunner


async def run_projections(container: AsyncContainer) -> None:
    runner = await CatchUpProjectionRunner.create(
        container=container,
        lock=InMemoryProjectionLock(),
    )
    await runner.run()

The runner listens for SIGTERM and SIGINT to shut down. Call request_shutdown() for programmatic shutdown when running inside another async context.

Use rebuild(projection_name) to reprocess all events from the beginning. This calls teardown() on the projection, resets the checkpoint to -1 (nothing processed), and replays every event through the projection.
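
The rebuild sequence above can be sketched with in-memory stand-ins (illustrative only; the real runner resolves the projection, checkpoint store, and event reader from the container):

```python
class ToyCheckpoints:
    """Illustrative in-memory checkpoint positions, keyed by projection name."""

    def __init__(self) -> None:
        self.positions: dict[str, int] = {}


async def rebuild(projection, checkpoints: ToyCheckpoints, all_events) -> None:
    # 1. Clear the existing read-model state.
    await projection.teardown()
    # 2. Reset the checkpoint: -1 means nothing processed yet.
    checkpoints.positions[projection.projection_name] = -1
    # 3. Replay every stored event through the projection.
    await projection.project(all_events)
    checkpoints.positions[projection.projection_name] = len(all_events) - 1
```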

Tip

Run the projection runner as a separate process (e.g., a dedicated worker or sidecar) so it does not block your main application.

Configuration

All per-projection behavior — batch size, error handling, retry, and gap detection — is configured through bind_catch_up_projection():

Parameter                 Default           Description
projection                (required)        The ICatchUpProjection class
error_policy              ErrorPolicy.STOP  What to do after retries are exhausted
max_retry_attempts        0                 Retry count before applying the error policy
base_retry_delay_seconds  10.0              Initial delay between retries (exponential backoff)
max_retry_delay_seconds   300.0             Maximum delay cap for retries
batch_size                100               Maximum events per batch
gap_detection_enabled     False             Enable contiguity checks (see Gap Detection)
gap_timeout_seconds       10.0              Seconds before a gap is considered permanent and skipped

The runner's polling interval is configured globally via PollingConfig (passed to the runner constructor, defaults to sensible values if omitted):

Field                        Default  Description
poll_interval_min_seconds    0.5      Minimum polling interval when events are available
poll_interval_max_seconds    5.0      Maximum polling interval when idle
poll_interval_step_seconds   1.0      Increment per idle cycle
poll_interval_jitter_factor  0.1      Random jitter factor applied to the interval
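
The field names suggest an adaptive schedule: back off by one step per idle cycle, snap back to the minimum when events arrive. A sketch under that assumption (the exact formula is not waku's documented behavior):

```python
import random


def next_poll_interval(
    current: float,
    had_events: bool,
    *,
    min_s: float = 0.5,
    max_s: float = 5.0,
    step_s: float = 1.0,
    jitter: float = 0.1,
) -> float:
    """Reset to the minimum after a busy cycle; otherwise grow by one step
    toward the maximum. Jitter spreads polling across multiple runners."""
    base = min_s if had_events else min(current + step_s, max_s)
    return base * (1 + random.uniform(-jitter, jitter))
```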

Gap Detection

When multiple writers append to the event store concurrently, a projection may read events with non-contiguous global positions — a gap appears when a concurrent transaction has not yet committed. Advancing the checkpoint past a gap would permanently skip that event.

Enable gap detection per-projection via bind_catch_up_projection():

(
    EventSourcingExtension()
    .bind_aggregate(...)
    .bind_catch_up_projection(
        AccountSummaryProjection,
        gap_detection_enabled=True,
        gap_timeout_seconds=10.0,
    )
)

When active, the processor queries committed positions from the event store and only advances the checkpoint to the last contiguous position. Gaps are tracked with a timeout — if a gap persists beyond gap_timeout_seconds, it is assumed permanent (e.g., a rolled-back transaction) and skipped.

Single-writer deployments

If your event store has a single writer process, gaps cannot occur and gap detection adds unnecessary overhead. Leave it disabled (the default).
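
The contiguity check amounts to: walk forward from the checkpoint and stop at the first hole. A sketch assuming dense integer positions (waku's internals may differ):

```python
def last_contiguous_position(checkpoint: int, committed: list[int]) -> int:
    """Highest position reachable from `checkpoint` without crossing a gap.

    `committed` holds the global positions read this cycle; a hole may be an
    in-flight transaction, so the checkpoint must not advance past it yet.
    """
    position = checkpoint
    for p in sorted(committed):
        if p == position + 1:
            position = p
        elif p > position + 1:
            break  # gap: wait for it to fill, or time out and skip
    return position
```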

Distributed Locking

IProjectionLock ensures only one instance of each catch-up projection runs at a time across multiple processes. This prevents duplicate processing and checkpoint conflicts.

class IProjectionLock(abc.ABC):
    @contextlib.asynccontextmanager
    async def acquire(self, projection_name: str) -> AsyncIterator[bool]:
        """Yields True if the lock was acquired, False if held by another instance."""

Choosing a Lock

graph TD
    Q1{Single process?}
    Q1 -->|Yes| IM[InMemoryProjectionLock]
    Q1 -->|No| Q2{Using PgBouncer<br/>in transaction mode?}
    Q2 -->|Yes| LB[PostgresLeaseProjectionLock]
    Q2 -->|No| Q3{Long-running projections<br/>with many connections?}
    Q3 -->|Yes| LB
    Q3 -->|No| ADV[PostgresAdvisoryProjectionLock]
                      InMemoryProjectionLock   PostgresLeaseProjectionLock  PostgresAdvisoryProjectionLock
Use case              Single process, testing  Multi-process production     Multi-process, simple setups
Connection held       None                     Only during heartbeats       Entire lock duration
PgBouncer compatible  N/A                      Yes                          No (session-bound)
Extra table required  No                       Yes (es_projection_leases)   No
Lock granularity      Per process              Per lease TTL                Per DB session
Failure detection     Instant                  Heartbeat interval           Connection drop

InMemoryProjectionLock

Always acquires the lock immediately. Tracks held lock names for testing. Use this for single-process deployments and in tests.

PostgresLeaseProjectionLock

Uses a database table with TTL-based leases for multi-process coordination. A background heartbeat task renews the lease periodically. If the heartbeat detects the lease was stolen (e.g., by another instance after TTL expiry), it cancels the projection task.

Configured via LeaseConfig:

Field                   Default    Description
ttl_seconds             30.0       How long the lease is valid before expiring
renew_interval_factor   1/3        Fraction of TTL at which the lease is renewed
renew_interval_seconds  (derived)  ttl_seconds * renew_interval_factor — read-only property

With the defaults, the lease renews every 10 seconds and expires after 30 seconds of silence.
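
The lease mechanics reduce to an atomic "claim if unclaimed, expired, or already mine" rule. In PostgreSQL this would be a single conditional upsert on es_projection_leases; the dict-backed sketch below only illustrates the rule:

```python
class ToyLeaseStore:
    """TTL leases in a dict: (holder_id, expires_at) keyed by projection name."""

    def __init__(self) -> None:
        self._leases: dict[str, tuple[str, float]] = {}

    def try_acquire(self, name: str, holder_id: str, ttl: float, now: float) -> bool:
        current = self._leases.get(name)
        # Claim if unclaimed, expired, or renewing our own lease (heartbeat).
        if current is None or current[1] <= now or current[0] == holder_id:
            self._leases[name] = (holder_id, now + ttl)
            return True
        return False
```

A holder that stops heartbeating simply lets its lease expire, after which any other instance can claim it.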

Consistency guarantees

There is no fencing token mechanism — a stalled holder (e.g., GC pause) can briefly overlap with a new holder until its next heartbeat fires.

In practice this is safe because the runner resolves the projection, event reader, and checkpoint store from a single DI scope. When using SqlAlchemyCheckpointStore with a scoped AsyncSession, the projection writes and checkpoint save share the same transaction (the checkpoint store calls flush(), not commit()). This means either both succeed atomically or both roll back — duplicate processing from a brief overlap will not corrupt the read model.

PostgresAdvisoryProjectionLock

Uses PostgreSQL advisory locks via pg_try_advisory_lock(hashtext(name)). The lock is session-bound — it holds a database connection for the entire duration.

Warning

Advisory locks are not compatible with PgBouncer in transaction-pooling mode because the lock is tied to the database session, not the transaction. Releasing the connection back to the pool releases the lock. Use PostgresLeaseProjectionLock instead.

Checkpoint Store

ICheckpointStore tracks each catch-up projection's last processed position so it resumes from where it left off after restarts.

class ICheckpointStore(abc.ABC):
    async def load(self, projection_name: str, /) -> Checkpoint | None: ...
    async def save(self, checkpoint: Checkpoint, /) -> None: ...

The Checkpoint dataclass carries the projection name, last processed global position, and timestamp.

Built-in implementations:

  • InMemoryCheckpointStore — dictionary-backed, suitable for single-process deployments and testing
  • SqlAlchemyCheckpointStore — PostgreSQL-backed via SQLAlchemy async session

Configure the checkpoint store through EventSourcingConfig:

from waku.eventsourcing import EventSourcingConfig
from waku.eventsourcing.projection.sqlalchemy import make_sqlalchemy_checkpoint_store

es_config = EventSourcingConfig(
    checkpoint_store=make_sqlalchemy_checkpoint_store(checkpoints_table),
)

make_sqlalchemy_checkpoint_store() works the same way as make_sqlalchemy_event_store() — it returns a factory that Dishka uses to construct the store with its AsyncSession dependency injected automatically.

Table Schema Reference

es_checkpoints

Column           Type                      Constraints    Description
projection_name  Text                      PK             Unique projection identifier
position         BigInteger                NOT NULL       Last processed global position
updated_at       TIMESTAMP WITH TIME ZONE  NOT NULL       Last checkpoint update time
created_at       TIMESTAMP WITH TIME ZONE  default now()  First checkpoint time

Bind with bind_checkpoint_tables(metadata) from waku.eventsourcing.projection.sqlalchemy.

es_projection_leases

Only required when using PostgresLeaseProjectionLock.

Column           Type                      Constraints    Description
projection_name  Text                      PK             Projection being locked
holder_id        Text                      NOT NULL       UUID of the lock holder instance
acquired_at      TIMESTAMP WITH TIME ZONE  default now()  When the lease was first acquired
renewed_at       TIMESTAMP WITH TIME ZONE  default now()  Last heartbeat renewal time
expires_at       TIMESTAMP WITH TIME ZONE  NOT NULL       When the lease expires if not renewed

Bind with bind_lease_tables(metadata) from waku.eventsourcing.projection.lock.sqlalchemy.

The Event Store as Outbox

In event sourcing, the event store is already a durable, ordered log of everything that happened — it naturally serves as a transactional outbox.

The publisher.publish() call in command handlers is an in-process convenience — it dispatches mediator notifications to other handlers within the same process. It is not a reliability mechanism. If the process crashes after saving events but before publishing, the in-process notifications are lost.

For reliable cross-service messaging (e.g., publishing domain events to Kafka, RabbitMQ, or another service), write a catch-up projection that reads from the event store and publishes to your message broker:

from collections.abc import Sequence

from waku.eventsourcing.contracts.event import StoredEvent
from waku.eventsourcing.projection.interfaces import ICatchUpProjection


class OrderEventPublisher(ICatchUpProjection):
    projection_name = 'order_event_publisher'

    def __init__(self, broker: MessageBroker) -> None:
        self._broker = broker

    async def project(self, events: Sequence[StoredEvent], /) -> None:
        for event in events:
            await self._broker.publish(
                topic='orders',
                key=str(event.stream_id),
                value=event.data,
            )

The same at-least-once semantics apply.

Further reading

  • Snapshots — optimize loading for long-lived aggregates
  • Schema Evolution — handling evolved events in projections
  • Testing — in-memory stores and projection wait utilities