Skip to content

Routing & Endpoints

Every send() and publish() call dispatches through an endpoint. When no explicit routing is configured, waku auto-creates a default local queue endpoint and assigns all registered handlers to it. Only invoke() is truly inline — it always executes in the caller's DI scope and returns a typed response.

Routing lets you control which endpoint a message goes to. You declare endpoints (local queues or external transports) and map message types or entire modules to them.

graph LR
    Invoke["bus.invoke(req)"] --> Inline[Inline Handler]
    Send["bus.send(msg)"] --> EP[Endpoint]
    Publish["bus.publish(msg)"] --> EP
    EP --> Worker[Background Worker]
    Worker --> Handler[Handler in fresh scope]

Default Behavior

Without routing configuration, send() and publish() dispatch through an auto-created default local queue endpoint:

1
2
3
4
from waku.messaging import MessagingConfig, MessagingModule

# No explicit endpoints — a default local queue is created automatically.
MessagingModule.register(MessagingConfig())

This is equivalent to calling MessagingModule.register() with no arguments. Handlers run asynchronously in a background worker with a fresh DI scope. If your application only needs synchronous, in-process handling, use invoke().


Local Queue Endpoints

A local queue is a buffered, in-process endpoint backed by an anyio memory stream. Messages sent to a local queue are enqueued and return immediately. A background worker task drains the queue and processes each message in a fresh DI scope.

local_queue() creates an endpoint descriptor that MessagingModule uses to construct a live LocalQueueEndpoint during application initialization:

from waku.messaging.endpoints.base import local_queue

entry = local_queue('domain-events')

The string argument is the endpoint URI — a logical name you reference in route declarations.

When to use local queues

Local queues are useful when you want fire-and-forget semantics without leaving the process. Common cases: sending emails, updating projections, recording analytics. They decouple the handler's execution time from the caller's response time.


Per-Type Routing

Use route(MessageType).to('endpoint-uri') to route a specific message type to an endpoint:

1
2
3
4
5
6
7
8
9
from waku.messaging import MessagingConfig, MessagingModule
from waku.messaging.endpoints.base import local_queue
from waku.messaging.router import route

config = MessagingConfig(
    endpoints=[local_queue('domain-events')],
    routing=[route(OrderPlaced).to('domain-events')],
)
MessagingModule.register(config)

When bus.publish(OrderPlaced(...)) is called, handlers for OrderPlaced are dispatched to the domain-events endpoint. Handlers in other modules that are not covered by the route are dispatched to the default endpoint (see Additive Routing).


Module-Level Routing

Use route_module(Module).to('endpoint-uri') to route all message types registered in a module to an endpoint. This is more maintainable than per-type routing when a module owns many message types:

1
2
3
4
5
6
7
8
9
from waku.messaging import MessagingConfig, MessagingModule
from waku.messaging.endpoints.base import local_queue
from waku.messaging.router import route_module

config = MessagingConfig(
    endpoints=[local_queue('domain-events')],
    routing=[route_module(PaymentModule).to('domain-events')],
)
MessagingModule.register(config)

Every message type bound via MessagingExtension().bind(...) inside PaymentModule is routed to the domain-events endpoint.


Additive Routing

When a message has handlers in multiple modules and only some are explicitly routed, routing is additive:

  1. Explicitly routed handlers dispatch to the specified endpoint.
  2. Remaining handlers dispatch to the default endpoint.
graph TD
    Publish["bus.publish(OrderPlaced)"] --> Check{Explicitly routed?}
    Check -->|Module A handler routed| EP[Named Endpoint]
    EP --> WorkerA[Worker: Module A handler]
    Check -->|Module B handler not routed| Default[Default Endpoint]
    Default --> WorkerB[Worker: Module B handler]

Consider an example where OrderPlaced has handlers in two modules:

  • Module A — handler is routed to a named local queue endpoint.
  • Module B — handler has no explicit route, so it goes to the default endpoint.

When you call bus.publish(OrderPlaced(...)), both handlers run asynchronously in their respective endpoints.


Routing Precedence

Routes are evaluated in this order:

Source Example
Per-type route route(OrderPlaced).to('events')
Module-level route route_module(OrdersModule).to('events')
Default endpoint No explicit route configured

A per-type route overrides a module-level route for the same message type. If route(OrderPlaced).to('priority') and route_module(OrdersModule).to('events') both match OrderPlaced, only the per-type route applies. Unrouted handlers go to the default endpoint.


Endpoint Lifecycle

Endpoints are managed automatically by EndpointLifecycleExtension, which is registered internally by MessagingModule. You do not need to start or stop endpoints manually.

Phase Action
After app init All endpoints are started (background workers spawn)
On app shutdown All endpoints are stopped (queues drain and close)

Endpoints start after all modules have been initialized and stop in reverse order during shutdown.


Method Semantics with Routing

Each dispatch method interacts with routing differently:

Method Routable Behavior
invoke() No Always inline. Returns a typed response.
send() Yes Always endpoint-dispatched. Raises NoRouteError if no handler registered.
publish() Yes Always endpoint-dispatched. Silent no-op if no handlers registered.

invoke() is never routed

invoke() always executes inline because it returns a typed response to the caller. Routing is inherently asynchronous — there is no way to return a response from a background worker. Use send() if you want a routable fire-and-forget dispatch.


Further reading