Messaging Patterns
This page describes the messaging patterns supported by the library and how they map to Azure Service Bus features. Examples use generic message types - replace them with your own domain contracts.
Fire-and-Forget
Send a message without waiting for a response. The sender doesn't know (or care) which consumer processes it.
Topology
| Entity | Type | Sessions | Notes |
|---|---|---|---|
| orders | Queue | No | Multiple producers, one or more competing consumers |
Sender
// IMessageSender<OrderPlaced> is auto-registered via MapQueue
await sender.SendAsync(new OrderPlaced { OrderId = orderId, Total = 99.95m }, ct);
Consumer (Azure Functions)
// Program.cs - register the queue and handler
builder.Services.AddMessagingFunctions(messaging =>
{
    messaging
        .MapQueue<OrderPlaced>("orders")
        .MapHandler(OrderFunctions.HandleOrderPlaced)
        .MapErrorHandler(async (ex, ctx, ct) =>
            await ctx.DeadLetterAsync("ProcessingFailed", ex.Message, ct));
});
// Function class
public class OrderFunctions(IMessageHandler<OrderPlaced> handler)
{
    [Function(nameof(ProcessOrder))]
    public Task ProcessOrder(
        [ServiceBusTrigger("orders", AutoCompleteMessages = false)]
        ServiceBusReceivedMessage message,
        ServiceBusMessageActions messageActions,
        CancellationToken ct
    ) => handler.HandleAsync(message, messageActions, ct);
    public static async Task HandleOrderPlaced(
        OrderPlaced order, MessageContext ctx, IOrderService orderService, CancellationToken ct)
    {
        await orderService.FulfillAsync(order.OrderId, ct);
        await ctx.CompleteAsync(ct);
    }
}
Consumer (Core library)
messaging.MapQueue<OrderPlaced>("orders")
    .MapHandler(async (OrderPlaced order, MessageContext ctx, IOrderService svc, CancellationToken ct) =>
    {
        await svc.FulfillAsync(order.OrderId, ct);
        await ctx.CompleteAsync(ct);
    });
When to use
- Event ingestion (telemetry, audit logs, analytics events)
- Background job dispatch (email, PDF generation, notifications)
- Any "produce and move on" workflow
Targeted Commands (Session Routing)
Route messages to a specific consumer using Azure Service Bus sessions. Each consumer opens a session receiver filtered to its own ID, so only it receives messages for that session.
IMPORTANT
Azure Service Bus prerequisite: The queue must be created with RequiresSession = true. This setting is immutable - you cannot change it after creation. See Sessions for details.
NOTE
Offline consumers: Messages queue up in the session until a consumer reconnects and accepts the session. Azure Service Bus retains messages until the queue's DefaultMessageTimeToLive expires (default: 14 days).
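Because RequiresSession cannot be changed after creation, it can help to provision the queue explicitly. The following is a minimal sketch using the administration client from the Azure.Messaging.ServiceBus package, not an API of this library. The environment variable name SERVICEBUS_CONNECTION_STRING is an assumption, and the connection string needs Manage rights.

```csharp
using System;
using Azure.Messaging.ServiceBus.Administration;

// Assumption: the connection string is supplied through this environment variable.
var connectionString = Environment.GetEnvironmentVariable("SERVICEBUS_CONNECTION_STRING")
    ?? throw new InvalidOperationException("SERVICEBUS_CONNECTION_STRING is not set.");

var admin = new ServiceBusAdministrationClient(connectionString);
const string queueName = "worker-commands";

// RequiresSession can only be set at creation time, so create the queue only if it is missing.
if (!(await admin.QueueExistsAsync(queueName)).Value)
{
    await admin.CreateQueueAsync(new CreateQueueOptions(queueName)
    {
        RequiresSession = true,
        // Explicit retention for messages waiting on an offline consumer.
        DefaultMessageTimeToLive = TimeSpan.FromDays(14),
    });
}
```

If the queue already exists without sessions, this sketch leaves it untouched; the queue would have to be deleted and recreated to enable them.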
Topology
| Entity | Type | Sessions | Notes |
|---|---|---|---|
| worker-commands | Queue | Yes | Session ID = target worker/device/tenant ID |
How it works
Sender
Set SessionId in SendOptions to target a specific consumer:
await sender.SendAsync(
    new RunTaskCommand { TaskId = taskId, Payload = "..." },
    opts => opts.SessionId = workerId,
    ct);
Consumer
Use MapSessionHandler - the handler receives SessionContext with the session ID:
messaging.MapQueue<RunTaskCommand>("worker-commands")
    .MapSessionHandler((RunTaskCommand cmd, SessionContext session, CancellationToken ct) =>
    {
        Console.WriteLine($"Session {session.SessionId} received task: {cmd.TaskId}");
    });
When multiple instances of the same process connect to the queue, add WithAcceptedSessionId so each instance only accepts sessions it owns:
var workerId = configuration.GetValue<string>("WorkerId")!;
messaging.MapQueue<RunTaskCommand>("worker-commands")
    .WithAcceptedSessionId(workerId) // filter to this instance's session only
    .MapSessionHandler((RunTaskCommand cmd, SessionContext session, CancellationToken ct) =>
    {
        Console.WriteLine($"Session {session.SessionId} received task: {cmd.TaskId}");
    });
WARNING
Without WithAcceptedSessionId, all competing consumers race to acquire any available session. The winning instance is non-deterministic - and it holds the session lock until expiry, so the mis-routing can appear consistent for minutes before shifting. See Sessions - Competing Consumers for details.
When to use
- Commands targeted at a specific device, worker, or agent
- Per-tenant or per-user message routing
- Ordered processing per entity (sessions guarantee FIFO within a session)
Request/Reply
Send a request and await a correlated response. The library handles session-based correlation automatically through a dedicated reply queue.
IMPORTANT
Azure Service Bus prerequisite: The reply queue must be created with RequiresSession = true. Each request uses a unique session ID so the requester only receives its own response. See Request/Reply for details.
Topology
| Entity | Type | Sessions | Notes |
|---|---|---|---|
| Request queue (varies) | Queue | Depends on pattern | Where the request is sent |
| replies | Queue | Yes | Shared reply queue - sessions correlate responses |
How it works
Requester
Use RequestAsync<TResponse> - the library sets ReplyTo and ReplyToSessionId on the outgoing message, waits on the reply queue, and deserializes the response:
// Simple request/reply (non-session request queue)
var response = await sender.RequestAsync<GetStatusResponse>(
    new GetStatusRequest { ResourceId = resourceId },
    ct);
// Combined with session routing - target a specific consumer
var targetedResponse = await sender.RequestAsync<GetStatusResponse>(
    new GetStatusRequest { ResourceId = resourceId },
    opts => opts.SessionId = workerId,
    ct);
Responder
Inject ReplyContext to send a typed response back. The library reads ReplyTo / ReplyToSessionId from the incoming message and sends the response to the correct session on the reply queue:
messaging.MapQueue<GetStatusRequest>("worker-commands")
    .MapSessionHandler(async (GetStatusRequest req, SessionContext session, ReplyContext reply, CancellationToken ct) =>
    {
        var status = await GetCurrentStatusAsync(req.ResourceId, ct);
        await reply.RespondAsync(new GetStatusResponse
        {
            ResourceId = req.ResourceId,
            Status = status,
            Timestamp = DateTimeOffset.UtcNow
        }, ct);
    });
Configuration
The reply queue is configured once in the messaging builder:
var serviceBus = builder.Services.AddMessaging(opts =>
{
    opts.ConnectionString = connectionString;
    opts.ReplyStrategy.ReplyQueue = "replies"; // Must be session-enabled
});
When to use
- Health checks and status polling
- Querying remote state on demand
- Any RPC-style interaction where the caller needs a response
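The session-based correlation described in this section can be pictured with a small in-memory model. This is an illustrative sketch only, not the library's implementation: BeginRequest and DeliverReply are hypothetical names, and plain strings stand in for real messages. Each request gets a fresh session ID, and a reply completes only the request registered under the session it arrives on.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Pending requests keyed by the reply session ID generated for each request.
var pending = new ConcurrentDictionary<string, TaskCompletionSource<string>>();

// Requester side: create a unique session ID (the value that would be sent as
// ReplyToSessionId) and a task that completes when the matching reply arrives.
(string ReplySessionId, Task<string> Reply) BeginRequest()
{
    var replySessionId = Guid.NewGuid().ToString("N");
    var tcs = new TaskCompletionSource<string>(TaskCreationOptions.RunContinuationsAsynchronously);
    pending[replySessionId] = tcs;
    return (replySessionId, tcs.Task);
}

// Reply-queue side: a response arrives on a session; complete the matching request.
// Replies for unknown sessions are ignored and reported as false.
bool DeliverReply(string sessionId, string body) =>
    pending.TryRemove(sessionId, out var tcs) && tcs.TrySetResult(body);

var first = BeginRequest();
var second = BeginRequest();

// Replies may arrive in any order; the session ID keeps them apart.
DeliverReply(second.ReplySessionId, "status: idle");
DeliverReply(first.ReplySessionId, "status: busy");

Console.WriteLine(await first.Reply);  // status: busy
Console.WriteLine(await second.Reply); // status: idle
```

A unique session per request is what lets many requesters share one reply queue without seeing each other's responses.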
Polymorphic Queue (Multiple Message Types)
Route multiple message types through a single queue using a MessageType discriminator. Each message type gets its own handler, but they share the same underlying Service Bus entity. This reduces queue proliferation when commands share the same routing strategy.
TIP
This pattern works especially well with session routing - all command types targeted at the same entity (gateway, device, tenant) flow through one session-enabled queue.
Topology
| Entity | Type | Sessions | Notes |
|---|---|---|---|
| gateway-commands | Queue | Yes | Single queue carrying multiple command types |
How it works
Sender (Platform/Cloud)
Register each message type as sender-only - each gets its own IMessageSender<T>:
var commands = messaging.MapPolymorphicQueue("gateway-commands", sessions: true);
// Sender-only registrations (no handlers on the cloud side)
commands.MapMessage<PingGatewayRequest>();
commands.MapMessage<SetBatterySetpointRequest>();
commands.MapMessage<RefreshConfigurationRequest>();
// Send with session routing + request/reply
var response = await sender.RequestAsync<SetBatterySetpointResponse>(
    new SetBatterySetpointRequest
    {
        GatewayId = gatewayId,
        BatteryName = "ess-1",
        SetpointWatts = 5000,
        Mode = BatterySetpointMode.Charging
    },
    opts => opts.SessionId = gatewayId,
    ct);
Consumer (Edge/Gateway)
Map handlers for each message type - the polymorphic dispatcher routes automatically:
var commands = messaging.MapPolymorphicQueue("gateway-commands", sessions: true);
commands.MapMessage<PingGatewayRequest>(
    async (PingGatewayRequest req, SessionContext session, ReplyContext reply, CancellationToken ct) =>
    {
        await reply.RespondAsync(new PingGatewayResponse
        {
            GatewayId = req.GatewayId,
            ResponseMessage = $"Pong: {req.Message}",
            ReceivedAt = DateTimeOffset.UtcNow,
            RespondedAt = DateTimeOffset.UtcNow
        }, ct);
    });
commands.MapMessage<SetBatterySetpointRequest>(
    async (SetBatterySetpointRequest req, SessionContext session, ReplyContext reply,
        IMediator mediator, CancellationToken ct) =>
    {
        await mediator.Send(new SetBatterySetpointCommand(req.BatteryName, req.SetpointWatts, req.Mode));
        await reply.RespondAsync(new SetBatterySetpointResponse { Success = true, ... }, ct);
    });
commands.MapMessage<RefreshConfigurationRequest>(
    async (RefreshConfigurationRequest req, SessionContext session, ReplyContext reply,
        IMediator mediator, CancellationToken ct) =>
    {
        await mediator.Send(new RefreshConfigurationCommand(req.Reason));
        await reply.RespondAsync(new RefreshConfigurationResponse { Success = true, ... }, ct);
    });
When to use
- Multiple related commands for the same target entity (device/gateway fleet management)
- Reducing Azure Service Bus queue count (one queue vs N queues per command type)
- Commands that share the same routing key (session ID) and delivery guarantees
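Discriminator-based routing can be sketched as a lookup from the message type name to a handler. This is a simplified model under stated assumptions, not the library's dispatcher: the Dispatch function and the handlers dictionary are hypothetical, the discriminator is passed in as a plain string (where the library actually stores it is not shown here), and bodies are assumed to be JSON.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

// Handlers keyed by the discriminator value that accompanies each message.
var handlers = new Dictionary<string, Func<JsonElement, string>>(StringComparer.Ordinal)
{
    ["PingGatewayRequest"] = body => $"Pong: {body.GetProperty("Message").GetString()}",
    ["RefreshConfigurationRequest"] = body => $"Refreshing: {body.GetProperty("Reason").GetString()}",
};

// Look up the handler for the discriminator and pass it the parsed body.
string Dispatch(string messageType, string json)
{
    if (!handlers.TryGetValue(messageType, out var handler))
        throw new InvalidOperationException($"No handler mapped for message type '{messageType}'.");

    using var document = JsonDocument.Parse(json);
    return handler(document.RootElement);
}

Console.WriteLine(Dispatch("PingGatewayRequest", "{\"Message\":\"hello\"}"));          // Pong: hello
Console.WriteLine(Dispatch("RefreshConfigurationRequest", "{\"Reason\":\"manual\"}")); // Refreshing: manual
```

Failing loudly on an unmapped type is one possible choice; a real consumer might dead-letter such messages instead.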
Pub/Sub (Fan-Out)
Publish a message once to a topic and have multiple independent subscribers each receive a copy. Each subscriber gets its own subscription with optional filtering.
NOTE
Azure Service Bus: Topics support up to 2,000 subscriptions. For higher fan-out, use session routing with enumerated sends instead.
TIP
Subscriptions support filters - use SQL or correlation filters in Azure to route specific messages to specific subscriptions without application-level filtering.
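A subscription filter can be attached when the subscription is created. The sketch below uses the administration client from the Azure.Messaging.ServiceBus package rather than this library. The environment variable name, the rule name BillingOnly, and the Category application property are all assumptions for illustration, and the notifications topic is assumed to exist already.

```csharp
using System;
using Azure.Messaging.ServiceBus.Administration;

// Assumption: the connection string (with Manage rights) is supplied through this environment variable.
var connectionString = Environment.GetEnvironmentVariable("SERVICEBUS_CONNECTION_STRING")
    ?? throw new InvalidOperationException("SERVICEBUS_CONNECTION_STRING is not set.");

var admin = new ServiceBusAdministrationClient(connectionString);

// Hypothetical filter: deliver only messages whose "Category" application property is 'billing'.
var subscription = new CreateSubscriptionOptions("notifications", "sub-billing");
var rule = new CreateRuleOptions("BillingOnly", new SqlRuleFilter("Category = 'billing'"));

if (!(await admin.SubscriptionExistsAsync("notifications", "sub-billing")).Value)
{
    // The rule passed here is used instead of the default match-all rule.
    await admin.CreateSubscriptionAsync(subscription, rule);
}
```

The filter only matches if the publisher sets the corresponding application property on each message.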
Topology
| Entity | Type | Sessions | Notes |
|---|---|---|---|
| Topic (e.g. notifications) | Topic | No | Publisher sends once |
| Subscription per consumer | Subscription | No | Each subscriber gets a copy |
How it works
Publisher
// Declare topic - registers IMessageSender<PriceChangedEvent>
messaging.MapTopic<PriceChangedEvent>("notifications");
// Send once - all subscriptions receive a copy
await sender.SendAsync(new PriceChangedEvent
{
    ProductId = productId,
    NewPrice = 29.99m,
    EffectiveFrom = DateTimeOffset.UtcNow
}, ct);
Subscriber
messaging.MapTopic<PriceChangedEvent>("notifications")
    .MapSubscription("sub-billing",
        async (PriceChangedEvent evt, CancellationToken ct) =>
        {
            await billingService.UpdatePriceAsync(evt.ProductId, evt.NewPrice, ct);
        });
When to use
- Broadcast notifications (config changes, price updates, system announcements)
- Multiple independent consumers need the same message
- Decoupled event-driven architecture (cross-service events)
Scheduled Delivery
Schedule a message for future delivery. The message is enqueued immediately but becomes visible to consumers only at the scheduled time. Useful for time-delayed commands or deferred retries.
Sender
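// Schedule a reminder for tomorrow at 09:00 UTC.
// DateTimeOffset.Date returns a DateTime with Kind = Unspecified, which is treated as
// local time when converted back, so build the value from DateTime.UtcNow with an explicit offset.
var scheduledTime = new DateTimeOffset(DateTime.UtcNow.Date, TimeSpan.Zero).AddDays(1).AddHours(9);
var sequenceNumber = await sender.ScheduleAsync(
    new SendReminderCommand { UserId = userId, Message = "Your trial expires today" },
    scheduledTime,
    ct);
// Cancel if no longer needed
await sender.CancelScheduledAsync(sequenceNumber, ct);
With send options (combine with session routing)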
await sender.SendAsync(
    new SendReminderCommand { UserId = userId, Message = "..." },
    opts =>
    {
        opts.SessionId = userId;
        opts.ScheduledEnqueueTime = DateTimeOffset.UtcNow.AddHours(6);
    },
    ct);
When to use
- Delayed commands (scheduled emails, maintenance windows, trial expirations)
- Retry with back-off (re-enqueue with a future delivery time)
- Expiring offers or time-gated workflows
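For the retry-with-back-off case, the future delivery time has to be computed per attempt. A minimal sketch of one common choice, exponential back-off with a cap: BackoffDelay and NextEnqueueTime are hypothetical helpers, and the 30-second base and 1-hour cap are arbitrary assumptions. The result is the kind of value that would be assigned to ScheduledEnqueueTime.

```csharp
using System;

// Exponential back-off: baseDelay * 2^(attempt - 1), capped at maxDelay.
// `attempt` is 1-based (1 = first retry).
static TimeSpan BackoffDelay(int attempt, TimeSpan baseDelay, TimeSpan maxDelay)
{
    if (attempt < 1) throw new ArgumentOutOfRangeException(nameof(attempt));

    // Clamp the exponent and compare as double before casting, so large attempts cannot overflow.
    var exponent = Math.Min(attempt - 1, 30);
    var ticks = baseDelay.Ticks * Math.Pow(2, exponent);
    return ticks >= maxDelay.Ticks ? maxDelay : TimeSpan.FromTicks((long)ticks);
}

// Assumed policy: 30 s base delay, 1 h ceiling.
static DateTimeOffset NextEnqueueTime(DateTimeOffset now, int attempt) =>
    now + BackoffDelay(attempt, TimeSpan.FromSeconds(30), TimeSpan.FromHours(1));

var now = DateTimeOffset.UtcNow;
for (var attempt = 1; attempt <= 4; attempt++)
    Console.WriteLine($"attempt {attempt}: retry at {NextEnqueueTime(now, attempt):O}");
```

With these values the delays are 30 s, 60 s, 120 s, 240 s, and so on until the 1-hour cap is reached. Adding random jitter is a common refinement that this sketch leaves out.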
Supplementary Patterns
These are cross-cutting strategies applied on top of the core patterns above.
Competing Consumers
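Multiple instances of the same service process messages from a single queue concurrently. Azure Service Bus handles lock-based (peek-lock) delivery: each message is locked by one consumer at a time and is redelivered to another consumer only if the lock expires or the message is abandoned.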
Azure Functions scales out automatically. For the core library, tune concurrency per handler:
messaging.MapQueue<OrderPlaced>("orders")
    .MapHandler(
        opts => opts.WithConcurrency(maxConcurrentCalls: 16),
        handler);
NOTE
Competing consumers and session routing are mutually exclusive per session. With sessions, each session is locked to one consumer at a time (FIFO guarantee). Different sessions can still be processed concurrently across consumers.
Dead Letter Processing
Messages that repeatedly fail processing are automatically moved to the dead-letter sub-queue after exceeding MaxDeliveryCount. Use MapDeadLetterHandler to process them - alert, resubmit, or archive.
messaging.MapQueue<OrderPlaced>("orders")
    .MapDeadLetterHandler(async (OrderPlaced msg, DeadLetterContext dlContext, CancellationToken ct) =>
    {
        logger.LogError("Dead-lettered: {Reason} - {Description}",
            dlContext.DeadLetterReason, dlContext.DeadLetterErrorDescription);
        // Resubmit, archive to blob storage, send alert, etc.
    });
See Error Handling for the full dead letter API.
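The interaction between redelivery and MaxDeliveryCount can be modelled in a few lines. This is a simplified in-memory sketch, not broker or library code: Deliver is a hypothetical helper, and it assumes a message is dead-lettered once MaxDeliveryCount deliveries have all failed.

```csharp
using System;

// Simplified model of redelivery: each failed attempt counts as one delivery, and the
// message is dead-lettered once maxDeliveryCount deliveries have all failed.
static (int Deliveries, bool DeadLettered) Deliver(Func<int, bool> tryHandle, int maxDeliveryCount)
{
    for (var deliveryCount = 1; deliveryCount <= maxDeliveryCount; deliveryCount++)
    {
        if (tryHandle(deliveryCount))
            return (deliveryCount, false); // handler completed the message
        // Otherwise the lock is released and the message is delivered again.
    }
    return (maxDeliveryCount, true); // moved to the dead-letter sub-queue
}

// Succeeds on the third delivery, so it never reaches the dead-letter sub-queue.
Console.WriteLine(Deliver(attempt => attempt == 3, maxDeliveryCount: 10)); // (3, False)

// Always fails, so it is dead-lettered after 10 deliveries.
Console.WriteLine(Deliver(_ => false, maxDeliveryCount: 10)); // (10, True)
```

Transient failures that clear within the delivery budget never reach the dead-letter handler, so that handler mostly sees persistent or poison messages.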