What is MassTransit?
MassTransit is an abstraction over various messaging transports, such as RabbitMQ, Azure Service Bus, Amazon SQS and others. We can use MassTransit in place of these transports and, ofcourse, change the underlying transport without affecting our code. It also includes some microservice based patterns, as Sagas, Routing Slip etc.
MassTransit offers an in-memory transport for testing code against.
Creating a simple test application
Note: I’m basically going to reproduce the code from In Memory, so I’d suggest going there to check the latest steps.
First off let’s add the MassTransit project templates to our machine
dotnet new --install MassTransit.Templates
Now this will add a bunch of useful templates that we can use via the dotnet CLI or via an IDE such as Visual Studio. In this case we’ll follow the In Memory tutorial by running the following command to create our project
dotnet new mtworker -n HelloWorld
or from Visual Studio or Rider you can create a MassTransit project from the MassTransit Worker project template.
Note: I’m differing from the MassTransit tutorial by naming my project HelloWorld, for no other reason that I pretty much always start these sorts of things with a HelloWorld project. So just in case you’re following along both the Mass Transit tutorial and this post, this is the only real difference in code.
Default project
At the time of writing, the default project targets .NET 6.0 so I’ve changed that to .NET 7.0 and language support to C# 11 in Rider, so my .csproj has the following
<PropertyGroup>
<TargetFramework>net7.0</TargetFramework>
<LangVersion>11</LangVersion>
</PropertyGroup>
All the code is currenting within the Program.cs, the code that matters at the moment is
services.AddMassTransit(x =>
{
x.SetKebabCaseEndpointNameFormatter();
// By default, sagas are in-memory, but should be changed to a durable
// saga repository.
x.SetInMemorySagaRepositoryProvider();
var entryAssembly = Assembly.GetEntryAssembly();
x.AddConsumers(entryAssembly);
x.AddSagaStateMachines(entryAssembly);
x.AddSagas(entryAssembly);
x.AddActivities(entryAssembly);
x.UsingInMemory((context, cfg) => { cfg.ConfigureEndpoints(context); });
});
As you can see we add MassTransit and at the end of the code we set MassTransit to use the in memory transport. Whilst this will build and run, it’s not really doing anything for us as we need to add at least one contract.
Adding a contract
From the MassTransit tutorial we simply run the following command in the CLI from the .csproj folder
dotnet new mtconsumer
This will add the Consumers folder along with the Contracts folder along with the consumer and contract code. Within the Consumer folder we have two files, the HelloWorldConsumer.cs file looks like this
Note: I’m not include the using and namespace lines in these files so that we can concentrate on the implementation code.
public class HelloWorldConsumer :
IConsumer<HelloWorld>
{
public Task Consume(ConsumeContext<HelloWorld> context)
{
return Task.CompletedTask;
}
}
The other file in the HelloWorldConsumerDefinition.cs looks like this
public class HelloWorldConsumerDefinition :
ConsumerDefinition<HelloWorldConsumer>
{
protected override void ConfigureConsumer(IReceiveEndpointConfigurator endpointConfigurator, IConsumerConfigurator<HelloWorldConsumer> consumerConfigurator)
{
endpointConfigurator.UseMessageRetry(r => r.Intervals(500, 1000));
}
}
The Contracts folder includes a single file, HelloWorld.cs which looks like this
public record HelloWorld
{
public string Value { get; init; }
}
At this point we now have a consumer and a message/contract.
When a message arrives on the transport, our consumer, Consume message is called. So let’s add some logging to the consumer, exactly like the MassTransit example, so add the following to the HelloWorldConsumer class
private readonly ILogger<HelloWorldConsumer> _logger;
public HelloWorldConsumer(ILogger<HelloWorldConsumer> logger)
{
_logger = logger;
}
Now in the Consume method we’ll just add a line of code to log information from the message so the method looks like this
public Task Consume(ConsumeContext<HelloWorld> context)
{
_logger.LogInformation("Hello {Name}", context.Message.Value);
return Task.CompletedTask;
}
The HelloWorldConsumerDefinition class can be used for setting up out configuration, we’re not going to do anything with this at the moment. Instead we need some code to publish methods. We are going to add a background service.
Background service
Add a new file/class to the root folder, named Worker. Here’s the one in the MassTransit tutorial
public class Worker : BackgroundService
{
private readonly IBus _bus;
public Worker(IBus bus)
{
_bus = bus;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
await _bus.Publish(new Contracts.HelloWorld(value: $"The time is {DateTimeOffset.Now}"), stoppingToken);
await Task.Delay(1000, stoppingToken);
}
}
}
Next, we need to register our work, by adding the following code to the Program.cs
services.AddHostedService<Worker>();
Does it run?
If all’s gone to plan, we can either
dotnet run
or run via Visual Studio or Rider. This will run our service and we’ll see output to the console from our logging code.
What next?
This is fun, but let’s face it we really want to see this working with a “real” transport, so let’s make changes to work with RabbitMQ. To host an instance of RabbitMQ within Docker we can use the MassTransit image
docker run -p 15672:15672 -p 5672:5672 masstransit/rabbitmq
To check whether RabbitMQ is up and running, use your browser to connect to the machine hosting it, on port 15672 (i.e. http://192.168.1.123:15672/). To login, the default username and password are both guest.
Now we need to add the MassTransit RabbitMQ transport to our project, so add MassTransit.RabbitMQ via the nuget package manager or from the CLI use
dotnet add package MassTransit.RabbitMQ
Finally, replace everything x.UsingInMemory((context, cfg) => { cfg.ConfigureEndpoints(context); }); within Program.cs with
x.UsingRabbitMq((context,cfg) =>
{
cfg.Host("192.168.1.123", "/", h => {
h.Username("guest");
h.Password("guest");
});
cfg.ConfigureEndpoints(context);
});
Note: If you’ve changed the user or password, obviously replace those in the code above. Also replace 192.168.1.123 with localhost or your remote server name/ip.
Run your MassTransit service.
Next, access your RabbitMQ dashboard and you should see messages coming into RabbitMQ (note: if you’ve changed the port, then the port number goes into the Host code like this, we’re using port 1234 to demonstrate a different port number)
x.UsingRabbitMq((context,cfg) =>
{
cfg.Host("192.168.1.123", 1234, "/", h => {
h.Username("guest");
h.Password("guest");
});
cfg.ConfigureEndpoints(context);
});
Note: remember to allow access to the remote server’s port 5672, if it’s not already accessible.