Message Brokers
Overview
Message brokers publish NotificationEvent and ECSTEvent events to external systems. The python-cqrs package provides several message broker implementations:
| Broker Type | Use Case | Production Ready |
|---|---|---|
| DevnullMessageBroker | Testing, development | ❌ No |
| KafkaMessageBroker | High-throughput, distributed systems | ✅ Yes |
| AMQPMessageBroker | RabbitMQ, traditional message queues | ✅ Yes |
Choosing a Message Broker
- DevnullMessageBroker: Use for testing and development when you don't need actual message publishing
- KafkaMessageBroker: Use for high-throughput, distributed systems that rely on Kafka's per-partition ordering guarantees
- AMQPMessageBroker: Use for traditional message queue patterns, RabbitMQ integration
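The guidance above can be encoded as a small startup check. This is a minimal sketch, not part of python-cqrs: the function name `validate_broker_choice`, the broker kind strings, and the `"production"` environment name are all hypothetical and chosen only for illustration.

```python
# Hypothetical broker kind names; they are not identifiers from python-cqrs.
KNOWN_BROKERS = {"devnull", "kafka", "amqp"}


def validate_broker_choice(app_env: str, broker_kind: str) -> str:
    """Return the normalized broker kind, or raise if it is not acceptable."""
    kind = broker_kind.strip().lower()
    if kind not in KNOWN_BROKERS:
        raise ValueError(f"Unknown broker kind: {broker_kind!r}")
    # The no-op broker drops every message, so refuse it in production.
    if app_env.strip().lower() == "production" and kind == "devnull":
        raise RuntimeError("DevnullMessageBroker must not be used in production")
    return kind


if __name__ == "__main__":
    print(validate_broker_choice("development", "devnull"))
    print(validate_broker_choice("production", "Kafka"))
```

Running a check like this before constructing the broker makes a misconfigured deployment fail at startup instead of silently dropping events.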
DevnullMessageBroker
The DevnullMessageBroker is a no-op broker intended for testing. It does not send messages anywhere; it only logs warnings.
Development Only
Never use DevnullMessageBroker in production. It's designed for testing and development only.
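import di
from cqrs.requests import bootstrap
from cqrs.message_brokers import devnull

# commands_mapper is defined elsewhere in your application
mediator = bootstrap.bootstrap(
    di_container=di.Container(),
    commands_mapper=commands_mapper,
    message_broker=devnull.DevnullMessageBroker(),
)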
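To illustrate what "no-op but logs warnings" means, here is a rough sketch of such a broker. It is not the library's implementation: `NoopBroker`, the `Message` dataclass, and its `payload` field are hypothetical stand-ins. Only the `send_message` method name and the `message_name` and `topic` attributes are taken from this page.

```python
import asyncio
import logging
from dataclasses import dataclass

logger = logging.getLogger("noop_broker_sketch")


@dataclass
class Message:
    """Hypothetical stand-in for the library's message type."""

    message_name: str
    topic: str
    payload: dict


class NoopBroker:
    """Accepts messages, logs a warning for each one, and sends nothing."""

    async def send_message(self, message: Message) -> None:
        logger.warning(
            "Dropping message %s for topic %s: no-op broker in use",
            message.message_name,
            message.topic,
        )


if __name__ == "__main__":
    logging.basicConfig(level=logging.WARNING)
    asyncio.run(
        NoopBroker().send_message(Message("UserCreated", "users", {"id": 1}))
    )
```

Because nothing leaves the process, a broker of this shape is safe for unit tests but would lose every event in a real deployment.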
KafkaMessageBroker
The KafkaMessageBroker publishes events to Apache Kafka topics.
import di
from cqrs.requests import bootstrap
from cqrs.message_brokers import kafka
from cqrs.adapters.kafka import KafkaProducerAdapter

# Create Kafka producer adapter
kafka_producer = KafkaProducerAdapter(
    bootstrap_servers=["localhost:9092"],
    client_id="my-app",
    acks="all",  # Wait for all in-sync replicas to acknowledge
    enable_idempotence=True,
)

# Create message broker
kafka_broker = kafka.KafkaMessageBroker(
    producer=kafka_producer,
    aiokafka_log_level="ERROR",  # Suppress verbose logging
)

# commands_mapper and events_mapper are defined elsewhere in your application
mediator = bootstrap.bootstrap(
    di_container=di.Container(),
    commands_mapper=commands_mapper,
    domain_events_mapper=events_mapper,
    message_broker=kafka_broker,
)
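In practice the broker addresses and client ID usually differ per environment. The following sketch shows one possible way to build the producer settings from environment variables. The helper `kafka_producer_settings` and the variable names `KAFKA_BOOTSTRAP_SERVERS` and `KAFKA_CLIENT_ID` are assumptions made for this example; the keys of the returned dictionary mirror the keyword arguments used in the example above.

```python
import os
from typing import Any, Mapping


def kafka_producer_settings(env: Mapping[str, str] = os.environ) -> dict[str, Any]:
    """Build producer keyword arguments from a mapping of environment variables."""
    # Hypothetical variable: a comma-separated list of host:port pairs.
    servers = env.get("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092")
    return {
        "bootstrap_servers": [s.strip() for s in servers.split(",") if s.strip()],
        "client_id": env.get("KAFKA_CLIENT_ID", "my-app"),
        "acks": "all",
        "enable_idempotence": True,
    }


if __name__ == "__main__":
    print(kafka_producer_settings({"KAFKA_BOOTSTRAP_SERVERS": "k1:9092, k2:9092"}))
```

Assuming the adapter accepts these keyword arguments as shown earlier, the result could be unpacked with `KafkaProducerAdapter(**kafka_producer_settings())`.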
AMQPMessageBroker
The AMQPMessageBroker publishes events to RabbitMQ or other AMQP-compatible brokers.
import di
from cqrs.requests import bootstrap
from cqrs.message_brokers import amqp
from cqrs.adapters.amqp import AMQPPublisherAdapter

# Create AMQP publisher
amqp_publisher = AMQPPublisherAdapter(
    dsn="amqp://user:password@localhost/",
)

# Create message broker
amqp_broker = amqp.AMQPMessageBroker(
    publisher=amqp_publisher,
    exchange_name="events",
    pika_log_level="ERROR",  # Suppress verbose logging
)

# commands_mapper and events_mapper are defined elsewhere in your application
mediator = bootstrap.bootstrap(
    di_container=di.Container(),
    commands_mapper=commands_mapper,
    domain_events_mapper=events_mapper,
    message_broker=amqp_broker,
)
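Credentials embedded in a DSN must be percent-encoded if they contain characters such as `@`, `:` or `/`, otherwise the URL is parsed incorrectly. A minimal sketch of a helper that does this with the standard library follows; `build_amqp_dsn` is a hypothetical name, and the default port 5672 is the conventional AMQP port rather than something stated on this page.

```python
from urllib.parse import quote


def build_amqp_dsn(
    user: str, password: str, host: str = "localhost", port: int = 5672
) -> str:
    """Return an amqp:// DSN with percent-encoded credentials."""
    # safe="" forces encoding of every reserved character, including "/" and "@".
    safe_user = quote(user, safe="")
    safe_password = quote(password, safe="")
    return f"amqp://{safe_user}:{safe_password}@{host}:{port}/"


if __name__ == "__main__":
    print(build_amqp_dsn("user", "p@ss/word"))
    # prints: amqp://user:p%40ss%2Fword@localhost:5672/
```

The resulting string can be passed as the `dsn` argument shown above.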
Custom Message Broker
You can create custom message brokers by implementing the MessageBroker protocol:
import di
from cqrs.requests import bootstrap
from cqrs.message_brokers import protocol


class CustomMessageBroker(protocol.MessageBroker):
    async def send_message(self, message: protocol.Message) -> None:
        # Custom implementation: replace the print with a call to your broker
        print(f"Sending {message.message_name} to {message.topic}")


# commands_mapper is defined elsewhere in your application
mediator = bootstrap.bootstrap(
    di_container=di.Container(),
    commands_mapper=commands_mapper,
    message_broker=CustomMessageBroker(),
)
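A common use of a custom broker is an in-memory implementation that records messages so tests can assert on them. The sketch below is self-contained and does not import python-cqrs: the `Message` dataclass and `MessageBroker` protocol are simplified stand-ins that only mirror the `send_message` method and the `message_name` and `topic` attributes used above, and `InMemoryMessageBroker` and `publish_all` are hypothetical names.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Protocol


@dataclass(frozen=True)
class Message:
    """Simplified stand-in for the library's message type."""

    message_name: str
    topic: str


class MessageBroker(Protocol):
    """Stand-in protocol: anything with an async send_message method."""

    async def send_message(self, message: Message) -> None: ...


@dataclass
class InMemoryMessageBroker:
    """Keeps every sent message in a list instead of publishing it."""

    sent: list[Message] = field(default_factory=list)

    async def send_message(self, message: Message) -> None:
        self.sent.append(message)


async def publish_all(broker: MessageBroker, messages: list[Message]) -> None:
    """Send messages one by one, preserving their order."""
    for message in messages:
        await broker.send_message(message)


if __name__ == "__main__":
    broker = InMemoryMessageBroker()
    asyncio.run(publish_all(broker, [Message("OrderPlaced", "orders")]))
    print(broker.sent)
```

With the real library, the same idea would apply to a class implementing `protocol.MessageBroker` as in the example above.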