Skip to content

Messaging Patterns

This page describes the messaging patterns supported by the library and how they map to Azure Service Bus features. Examples use generic message types - replace them with your own domain contracts.

Fire-and-Forget

Send a message without waiting for a response. The sender doesn't know (or care) which consumer processes it.

Topology

EntityTypeSessionsNotes
ordersQueueNoMultiple producers, one or more competing consumers

Sender

csharp
// IMessageSender<OrderPlaced> is auto-registered via MapQueue
await sender.SendAsync(new OrderPlaced { OrderId = orderId, Total = 99.95m }, ct);

Consumer (Azure Functions)

csharp
// Program.cs - register the queue and handler
builder.Services.AddMessagingFunctions(messaging =>
{
    messaging
        .MapQueue<OrderPlaced>("orders")
        .MapHandler(OrderFunctions.HandleOrderPlaced)
        .MapErrorHandler(async (ex, ctx, ct) =>
            await ctx.DeadLetterAsync("ProcessingFailed", ex.Message, ct));
});

// Function class
public class OrderFunctions(IMessageHandler<OrderPlaced> handler)
{
    [Function(nameof(ProcessOrder))]
    public Task ProcessOrder(
        [ServiceBusTrigger("orders", AutoCompleteMessages = false)]
        ServiceBusReceivedMessage message,
        ServiceBusMessageActions messageActions,
        CancellationToken ct
    ) => handler.HandleAsync(message, messageActions, ct);

    public static async Task HandleOrderPlaced(
        OrderPlaced order, MessageContext ctx, IOrderService orderService, CancellationToken ct)
    {
        await orderService.FulfillAsync(order.OrderId, ct);
        await ctx.CompleteAsync(ct);
    }
}

Consumer (Core library)

csharp
messaging.MapQueue<OrderPlaced>("orders")
    .MapHandler(async (OrderPlaced order, MessageContext ctx, IOrderService svc, CancellationToken ct) =>
    {
        await svc.FulfillAsync(order.OrderId, ct);
        await ctx.CompleteAsync(ct);
    });

When to use

  • Event ingestion (telemetry, audit logs, analytics events)
  • Background job dispatch (email, PDF generation, notifications)
  • Any "produce and move on" workflow

Targeted Commands (Session Routing)

Route messages to a specific consumer using Azure Service Bus sessions. Each consumer opens a session receiver filtered to its own ID, so only it receives messages for that session.

IMPORTANT

Azure Service Bus prerequisite: The queue must be created with RequiresSession = true. This is immutable - you cannot change it after creation. See Sessions for details. [!NOTE] Offline consumers: Messages queue up in the session until a consumer reconnects and accepts the session. Azure Service Bus retains messages until the queue's DefaultMessageTimeToLive expires (default: 14 days).

Topology

EntityTypeSessionsNotes
worker-commandsQueueYesSession ID = target worker/device/tenant ID

How it works

Sender

Set SessionId in SendOptions to target a specific consumer:

csharp
await sender.SendAsync(
    new RunTaskCommand { TaskId = taskId, Payload = "..." },
    opts => opts.SessionId = workerId,
    ct);

Consumer

Use MapSessionHandler - the handler receives SessionContext with the session ID:

csharp
messaging.MapQueue<RunTaskCommand>("worker-commands")
    .MapSessionHandler((RunTaskCommand cmd, SessionContext session, CancellationToken ct) =>
    {
        Console.WriteLine($"Session {session.SessionId} received task: {cmd.TaskId}");
    });

When multiple instances of the same process connect to the queue, add WithAcceptedSessionId so each instance only accepts sessions it owns:

csharp
var workerId = configuration.GetValue<string>("WorkerId")!;

messaging.MapQueue<RunTaskCommand>("worker-commands")
    .WithAcceptedSessionId(workerId)  // filter to this instance's session only
    .MapSessionHandler((RunTaskCommand cmd, SessionContext session, CancellationToken ct) =>
    {
        Console.WriteLine($"Session {session.SessionId} received task: {cmd.TaskId}");
    });

WARNING

Without WithAcceptedSessionId, all competing consumers race to acquire any available session. The winning instance is non-deterministic - and it holds the session lock until expiry, so the mis-routing can appear consistent for minutes before shifting. See Sessions - Competing Consumers for details.

When to use

  • Commands targeted at a specific device, worker, or agent
  • Per-tenant or per-user message routing
  • Ordered processing per entity (sessions guarantee FIFO within a session)

Request/Reply

Send a request and await a correlated response. The library handles session-based correlation automatically through a dedicated reply queue.

IMPORTANT

Azure Service Bus prerequisite: The reply queue must be created with RequiresSession = true. Each request uses a unique session ID so the requester only receives its own response. See Request/Reply for details.

Topology

EntityTypeSessionsNotes
Request queue (varies)QueueDepends on patternWhere the request is sent
repliesQueueYesShared reply queue - sessions correlate responses

How it works

Requester

Use RequestAsync<TResponse> - the library sets ReplyTo, ReplyToSessionId, waits on the reply queue, and deserializes the response:

csharp
// Simple request/reply (non-session request queue)
var response = await sender.RequestAsync<GetStatusResponse>(
    new GetStatusRequest { ResourceId = resourceId },
    ct);

// Combined with session routing - target a specific consumer
var response = await sender.RequestAsync<GetStatusResponse>(
    new GetStatusRequest { ResourceId = resourceId },
    opts => opts.SessionId = workerId,
    ct);

Responder

Inject ReplyContext to send a typed response back. The library reads ReplyTo / ReplyToSessionId from the incoming message and sends the response to the correct session on the reply queue:

csharp
messaging.MapQueue<GetStatusRequest>("worker-commands")
    .MapSessionHandler(async (GetStatusRequest req, SessionContext session, ReplyContext reply, CancellationToken ct) =>
    {
        var status = await GetCurrentStatusAsync(req.ResourceId, ct);

        await reply.RespondAsync(new GetStatusResponse
        {
            ResourceId = req.ResourceId,
            Status = status,
            Timestamp = DateTimeOffset.UtcNow
        }, ct);
    });

Configuration

The reply queue is configured once in the messaging builder:

csharp
var serviceBus = builder.Services.AddMessaging(opts =>
{
    opts.ConnectionString = connectionString;
    opts.ReplyStrategy.ReplyQueue = "replies"; // Must be session-enabled
});

When to use

  • Health checks and status polling
  • Querying remote state on demand
  • Any RPC-style interaction where the caller needs a response

Polymorphic Queue (Multiple Message Types)

Route multiple message types through a single queue using a MessageType discriminator. Each message type gets its own handler, but they share the same underlying Service Bus entity. This reduces queue proliferation when commands share the same routing strategy.

TIP

This pattern works especially well with session routing - all command types targeted at the same entity (gateway, device, tenant) flow through one session-enabled queue.

Topology

EntityTypeSessionsNotes
gateway-commandsQueueYesSingle queue carrying multiple command types

How it works

Sender (Platform/Cloud)

Register each message type as sender-only - each gets its own IMessageSender<T>:

csharp
var commands = messaging.MapPolymorphicQueue("gateway-commands", sessions: true);

// Sender-only registrations (no handlers on the cloud side)
commands.MapMessage<PingGatewayRequest>();
commands.MapMessage<SetBatterySetpointRequest>();
commands.MapMessage<RefreshConfigurationRequest>();

// Send with session routing + request/reply
var response = await sender.RequestAsync<SetBatterySetpointResponse>(
    new SetBatterySetpointRequest
    {
        GatewayId = gatewayId,
        BatteryName = "ess-1",
        SetpointWatts = 5000,
        Mode = BatterySetpointMode.Charging
    },
    opts => opts.SessionId = gatewayId,
    ct);

Consumer (Edge/Gateway)

Map handlers for each message type - the polymorphic dispatcher routes automatically:

csharp
var commands = messaging.MapPolymorphicQueue("gateway-commands", sessions: true);

commands.MapMessage<PingGatewayRequest>(
    async (PingGatewayRequest req, SessionContext session, ReplyContext reply, CancellationToken ct) =>
    {
        await reply.RespondAsync(new PingGatewayResponse
        {
            GatewayId = req.GatewayId,
            ResponseMessage = $"Pong: {req.Message}",
            ReceivedAt = DateTimeOffset.UtcNow,
            RespondedAt = DateTimeOffset.UtcNow
        }, ct);
    });

commands.MapMessage<SetBatterySetpointRequest>(
    async (SetBatterySetpointRequest req, SessionContext session, ReplyContext reply,
           IMediator mediator, CancellationToken ct) =>
    {
        await mediator.Send(new SetBatterySetpointCommand(req.BatteryName, req.SetpointWatts, req.Mode));
        await reply.RespondAsync(new SetBatterySetpointResponse { Success = true, ... }, ct);
    });

commands.MapMessage<RefreshConfigurationRequest>(
    async (RefreshConfigurationRequest req, SessionContext session, ReplyContext reply,
           IMediator mediator, CancellationToken ct) =>
    {
        await mediator.Send(new RefreshConfigurationCommand(req.Reason));
        await reply.RespondAsync(new RefreshConfigurationResponse { Success = true, ... }, ct);
    });

When to use

  • Multiple related commands for the same target entity (device/gateway fleet management)
  • Reducing Azure Service Bus queue count (one queue vs N queues per command type)
  • Commands that share the same routing key (session ID) and delivery guarantees

Pub/Sub (Fan-Out)

Publish a message once to a topic and have multiple independent subscribers each receive a copy. Each subscriber gets its own subscription with optional filtering.

NOTE

Azure Service Bus: Topics support up to 2,000 subscriptions. For higher fan-out, use session routing with enumerated sends instead. [!TIP] Subscriptions support filters - use SQL or correlation filters in Azure to route specific messages to specific subscriptions without application-level filtering.

Topology

EntityTypeSessionsNotes
Topic (e.g. notifications)TopicNoPublisher sends once
Subscription per consumerSubscriptionNoEach subscriber gets a copy

How it works

Publisher

csharp
// Declare topic - registers IMessageSender<PriceChangedEvent>
messaging.MapTopic<PriceChangedEvent>("notifications");

// Send once - all subscriptions receive a copy
await sender.SendAsync(new PriceChangedEvent
{
    ProductId = productId,
    NewPrice = 29.99m,
    EffectiveFrom = DateTimeOffset.UtcNow
}, ct);

Subscriber

csharp
messaging.MapTopic<PriceChangedEvent>("notifications")
    .MapSubscription("sub-billing",
        async (PriceChangedEvent evt, CancellationToken ct) =>
        {
            await billingService.UpdatePriceAsync(evt.ProductId, evt.NewPrice, ct);
        });

When to use

  • Broadcast notifications (config changes, price updates, system announcements)
  • Multiple independent consumers need the same message
  • Decoupled event-driven architecture (cross-service events)

Scheduled Delivery

Schedule a message for future delivery. The message is enqueued immediately but becomes visible to consumers only at the scheduled time. Useful for time-delayed commands or deferred retries.

Sender

csharp
// Schedule a reminder for tomorrow at 9 AM
var scheduledTime = DateTimeOffset.UtcNow.Date.AddDays(1).AddHours(9);

var sequenceNumber = await sender.ScheduleAsync(
    new SendReminderCommand { UserId = userId, Message = "Your trial expires today" },
    scheduledTime,
    ct);

// Cancel if no longer needed
await sender.CancelScheduledAsync(sequenceNumber, ct);

With send options (combine with session routing)

csharp
await sender.SendAsync(
    new SendReminderCommand { UserId = userId, Message = "..." },
    opts =>
    {
        opts.SessionId = userId;
        opts.ScheduledEnqueueTime = DateTimeOffset.UtcNow.AddHours(6);
    },
    ct);

When to use

  • Delayed commands (scheduled emails, maintenance windows, trial expirations)
  • Retry with back-off (re-enqueue with a future delivery time)
  • Expiring offers or time-gated workflows

Supplementary Patterns

These are cross-cutting strategies applied on top of the core patterns above.

Competing Consumers

Multiple instances of the same service process messages from a single queue concurrently. Azure Service Bus handles lock-based delivery - each message is delivered to exactly one consumer.

Azure Functions scales out automatically. For the core library, tune concurrency per handler:

csharp
messaging.MapQueue<OrderPlaced>("orders")
    .MapHandler(
        opts => opts.WithConcurrency(maxConcurrentCalls: 16),
        handler);

NOTE

Competing consumers and session routing are mutually exclusive per session. With sessions, each session is locked to one consumer at a time (FIFO guarantee). Different sessions can still be processed concurrently across consumers.

Dead Letter Processing

Messages that repeatedly fail processing are automatically moved to the dead-letter sub-queue after exceeding MaxDeliveryCount. Use MapDeadLetterHandler to process them - alert, resubmit, or archive.

csharp
messaging.MapQueue<OrderPlaced>("orders")
    .MapDeadLetterHandler(async (OrderPlaced msg, DeadLetterContext dlContext, CancellationToken ct) =>
    {
        logger.LogError("Dead-lettered: {Reason} - {Description}",
            dlContext.DeadLetterReason, dlContext.DeadLetterErrorDescription);

        // Resubmit, archive to blob storage, send alert, etc.
    });

See Error Handling for the full dead letter API.