HomeNikola Knezevic

In this article

Banner

Pub/Sub and Request-Response Patterns with MassTransit

25 Dec 2024
7 min

Communication is essential in distributed systems, where various services must collaborate seamlessly to accomplish shared objectives.

Asynchronous communication is a popular choice because it decouples services, enhances fault tolerance and supports scalable communication. This can be achieved using platforms like Kafka, RabbitMQ and other similar services.

Messages are sent and processed independently, leading to improved throughput and reduced latency for non-blocking operations.

One popular library for implementing asynchronous messaging in .NET applications is MassTransit.

What is MassTransit?

MassTransit is a popular open-source library in .NET for building distributed applications.

The interfaces offered by MassTransit simplify message-based application development, enabling developers to concentrate on delivering business value.

It offers a unified messaging abstraction across many supported message transports, including:

  • RabbitMQ
  • Azure Service Bus
  • Amazon SQS
  • Kafka

MassTransit supports multiple messaging patterns, including:

  • Publish/Subscribe: Publisher sends messages to multiple subscribers which can listen and react.
  • Request/Response: Service sends a request and waits for a response.
  • Point-to-Point Messaging: Messages are sent to a specific consumer to process them.

In today’s blog, we’ll explore the implementation of pub/sub and request/response patterns. Let’s dive right into the details.

Getting started with MassTransit

To get started with MassTransit, you need to install the NuGet package. You can do this via the NuGet Package Manager or by running the following command in the Package Manager Console:

bash
Install-Package MassTransit

Depending on your message transport, you may need to install additional libraries and make slight adjustments to the configuration.

Since it offers a unified API, switching between different transports is straightforward.

Configuring In-Memory

MassTransit also supports in-memory transport, which can be particularly useful for development while other for staging and production environments.

csharp
builder.Services.AddMassTransit(busConfigurator =>
{
    busConfigurator.SetKebabCaseEndpointNameFormatter();

    busConfigurator.UsingInMemory((context, configurator) =>
    {
        configurator.ConfigureEndpoints(context);
    });
});
  • AddMassTransit: Registers and configures MassTransit services in the application.
  • SetKebabCaseEndpointNameFormatter: Sets the default naming convention for endpoints to kebab-case, it's common in distributed systems.
  • ConfigureEndpoints: Dynamically create and bind message queues and exchanges based on the configured consumers.

In-Memory transport has its downsides. It's not distributed, meaning it can only handle communication within a single application instance.

Additionally, in-memory transport does not provide durability or persistence. If the application crashes or restarts, the message queue is lost and some advanced features are also limited as well.

Configuring RabbitMQ

RabbitMQ is an open-source message broker that enables communication between different applications or services by managing messages through queues and exchanges.

To configure RabbitMQ, you need to add an additional package:

bash
Install-Package MassTransit.RabbitMQ

After installing the package, you can configure MassTransit to use RabbitMQ as the message transport by adding the following configuration in your application:

csharp
builder.Services.AddMassTransit(busConfigurator =>
{
    busConfigurator.SetKebabCaseEndpointNameFormatter();

    busConfigurator.UsingRabbitMq((context, configurator) =>
    {
        configurator.Host(new Uri("amqp://rabbitmq:5672"), h =>
        {
            h.Username("guest");
            h.Password("guest");
        });

        configurator.ConfigureEndpoints(context);
    });
});

UsingRabbitMq method configures RabbitMQ as the transport mechanism for MassTransit. Inside the lambda you can configure your RabbitMQ-specific transport options.

Configuring Azure Service Bus

Azure Service Bus is a fully managed enterprise message broker with message queues and publish-subscribe topics.

To configure Azure Service Bus, you need to add an additional package:

bash
Install-Package MassTransit.Azure.ServiceBus.Core

After installing the package, you can configure MassTransit to use Azure Service Bus as the message transport by adding the following configuration in your application:

csharp
builder.Services.AddMassTransit(busConfigurator =>
{
    busConfigurator.SetKebabCaseEndpointNameFormatter();

    busConfigurator.UsingAzureServiceBus((context, configurator) =>
    {
        configurator.Host("ConnectionString");

        configurator.ConfigureEndpoints(context);
    });
});

UsingAzureServiceBus method configures Azure Servic Bus as the transport mechanism for MassTransit. Inside the lambda you can configure your Azure Service Bus-specific transport options.

Pub/Sub Messaging

Publish-Subscribe (Pub/Sub) Pattern enables one-to-many communication between components. Publishers send messages without knowing the recipients, while subscribers receive messages without knowing the senders.

Our message broker acts as an intermediary, receiving messages from publishers and forwarding them to the appropriate subscribers.

The first step is to define a message that represents the event to be published.

In MassTransit, message contracts are defined code-first by creating a .NET type. Messages must be reference types and can be defined using records, interfaces or classes. Here's an example of the message we'll publish:

csharp
public record OrderSubmitted(Guid OrderId, string SubmittedBy);

Next, we define a consumer. Consumers process messages from the message broker and implement the IConsumer interface, where T is the type of message they handle.

Here's an example of a consumer that processes the OrderSubmitted message:

csharp
public class OrderSubmittedConsumer : IConsumer<OrderSubmitted>
{
    public Task Consume(ConsumeContext<OrderSubmitted> context)
    {
        var message = context.Message;

        Console.WriteLine($"Order with Id: {message.OrderId} is submitted by: {message.SubmittedBy}");

        return Task.CompletedTask;
    }
}

Since MassTransit doesn't automatically register consumers, we need to manually register each one using the AddConsumer method inside the AddMassTransit configuration.

csharp
builder.Services.AddMassTransit(busConfigurator =>
{
    busConfigurator.SetKebabCaseEndpointNameFormatter();

    busConfigurator.AddConsumer<OrderSubmittedConsumer>()
        .Endpoint(e => e.Name = "order-submitted-consumer");

    busConfigurator.UsingRabbitMq((context, configurator) =>
    {
        configurator.Host(new Uri("amqp://rabbitmq:5672"), h =>
        {
            h.Username("guest");
            h.Password("guest");
        });

        configurator.ConfigureEndpoints(context);
    });
});

When a consumer is registered in the MassTransit configuration, it is automatically subscribed to the appropriate queue or topic in the message broker.

Finally, to publish a message, you can inject and use the IPublishEndpoint. Publish method sends the event to the broker, which then routes it to all queues or topics with consumers subscribed to that message type.

Here's how you can publish the OrderSubmitted message:

csharp
app.MapPut("/orders/submit", async (string submittedBy, IPublishEndpoint publisher) =>
{
    var order = GetOrder();

    order.Submit();

    await publisher.Publish(new OrderSubmitted(order.Id, submittedBy));

    return Results.NoContent();
});

Request-Response Messaging

Request-Response Pattern is a messaging pattern where one component sends a request message to another component and waits for a corresponding response message.

The first step is to define both the request and the response messages. Here’s an example of the request and the response messages:

csharp
public record UserDetailsRequest(Guid UserId);

public record UserDetailsResponse(string FirstName, string LastName);

Next, define a consumer responsible for processing request and sending a response. RespondAsync method is used to send a response back to the requester.

Here’s an example of a consumer that handles the UserDetailsRequest and sends a response:

csharp
public class UserDetailsRequestConsumer : IConsumer<UserDetailsRequest>
{
    public async Task Consume(ConsumeContext<UserDetailsRequest> context)
    {
        var user = GetUser(context.Message.UserId);

        var response = new UserDetailsResponse(
            user.FirstName,
            user.LastName);

        await context.RespondAsync<UserDetailsResponse>(response);
    }
}

The application responsible for consuming messages must register the consumer:

csharp
busConfigurator.AddConsumer<UserDetailsRequestConsumer>()
    .Endpoint(e => e.Name = "user-details-request-consumer");

Application sending requests needs to register a client using the AddRequestClient method to send request messages to the consumer:

csharp
busConfigurator.AddRequestClient<UserDetailsRequest>();

Once the consumers and request client are set up, the final step is to trigger the client request:

csharp
app.MapGet("/orders", async (IRequestClient<UserDetailsRequest> requestClient, CancellationToken cancellationToken) =>
{
    var order = GetOrder();

    var request = new UserDetailsRequest(order.OwnerId);

    var response = await requestClient.GetResponse<UserDetailsResponse>(request, cancellationToken);

    var result = new OrderResponse(
        order.Id,
        order.OwnerId,
        $"{response.Message.FirstName} {response.Message.LastName}",
        order.Price);

    return Results.Ok(result);
});

To do this, inject the IRequestClient interface, create the request message and use the GetResponse method to send the request.

Conclusion

Asynchronous messaging offers enhanced scalability, reliability and flexibility, making it an ideal choice for complex, distributed systems.

With MassTransit, implementing various messaging patterns, such as publish/subscribe and request/response, is straightforward. It also facilitates easy integration with multiple message transports like RabbitMQ, Azure Service Bus and others.

Whether used for development with in-memory transport or for production environments with scalable options, MassTransit simplifies building distributed systems.

If you want to check out examples I created, you can find the source code here:

Source Code

I hope you enjoyed it, subscribe and get a notification when a new blog is up!

Subscribe

Stay tuned for valuable insights every Thursday morning.