Category Archives: Programming

Messing around with MediatR

MediatR is an implementation of the Mediator pattern. It doesn’t match the pattern exactly, but as the creator, Jimmy Bogard states that “It matches the problem description (reducing chaotic dependencies), the implementation doesn’t exactly match…”. It’s worth reading his post You Probably Don’t Need to Worry About MediatR.

This pattern is aimed at decoupling the likes of business logic from a UI layer or request/response’s.

There are several ways we can already achieve this in our code, for example, using interfaces to decouple the business logic from the UI or API layers as “services” as we’ve probably all done for years. The only drawback of this approach is it requires the interfaces to be either passed around in our code or via DI and is a great way to do things. Another way to do this is, as used within UI, using WPF, Xamarin Forms, MAUI and others where we often use in-process message queues to send messages around our application tell it to undertake some task and this is essentially what MediatR is giving us.

Let’s have a look at using MediatR. I’m going to create an ASP.NET web API (obviously you could use MediatR in other types of solutions)

  • Create an ASP.NET Core Web API. I’m using Minimal API, so feel free to check that or stick with controllers as you prefer.
  • Add the nuget package MediatR
  • To the Program.cs file add
    builder.Services.AddMediatR(cfg => 
      cfg.RegisterServicesFromAssembly(typeof(Program).Assembly));
    

At this point we have MediatR registering services for us at startup. We can passing multiple assemblies to the RegisterServicesFromAssembly method, so if we have all our reqeust/response code in multiple assemblies we can supply just those assemblies. Obviously this makes our life simpler but at the cost of reflecting across our code at startup.

The ASP.NET Core Web API creates the WeatherForecast example, we’ll just use this for our sample code as well.

The first thing you’ll notice is that the route to the weatherforecast is tightly coupled to the sample code. Ofcourse it’s an example, so this is fine, but we’re going to clean things up here and move the implementation into a file named GetWeatherForecastHandler but before we do that…

Note: Ofcourse we could just move the weather forecast code into an WeatherForecastService, create an IWeatherForecastService interface and there’s no reason not to do that, MediatR just offers and alternative way of doing things.

MediatR will try to find a matching handler for your request. In this example we have no request parameters. This begs the question as to how MediatR will match against our GetWeatherForecastHandler. It needs a unique request type to map to our handler, in this case the simplest thing to do is create yourself the request type. Mine’s named GetWeatherForecast and looks like this

public record GetWeatherForecast : IRequest<WeatherForecast[]>
{
    public static GetWeatherForecast Default { get; } = new();
}

Note: I’ve created a static method so we’re not creating an instance for every call, however this is not required and obviously when you are passing parameters you will be creating an instance of a type each time – this does obviously concern me a little if we need high performance and are trying to write allocation free code, but then we’d do lots differently then including probably not using MediatR.

Now we’ll create the GetWeatherForecastHandler file and the code looks like this

public class GetWeatherForecastHandler : IRequestHandler<GetWeatherForecast, WeatherForecast[]>
{
  private static readonly string[] Summaries = new[]
  {
    "Freezing", "Bracing", "Chilly", "Cool", "Mild", "Warm", "Balmy", "Hot", "Sweltering", "Scorching"
  };

  public Task<WeatherForecast[]> Handle(GetWeatherForecast request, CancellationToken cancellationToken)
  {
    var forecast = Enumerable.Range(1, 5).Select(index =>
      new WeatherForecast
      {
        Date = DateOnly.FromDateTime(DateTime.Now.AddDays(index)),
        TemperatureC = Random.Shared.Next(-20, 55),
        Summary = Summaries[Random.Shared.Next(Summaries.Length)]
      })
    .ToArray();

    return Task.FromResult(forecast);
  }
}

At this point we’ve created a way for MediatR to find the required handler (i.e. using the GetWeatherForecast type) and we’ve created a handler to create the response. In this example we’re not doing any async work, so we just wrap the result in a Task.FromResult.

Next go back to the Program.cs or if you’ve used controllers, go to your controller. If using controller you’ll need the constructor to take the parameters IMediator mediator and assign to a readonly field in the usually way.

For our minimal API example, go back to the Program.cs file remove the summaries variable/code and then change the route code to look like this

app.MapGet("/weatherforecast",  (IMediator mediator) => 
  mediator.Send(GetWeatherForecast.Default))
.WithName("GetWeatherForecast")
.WithOpenApi();

We’re not really playing too nice in the code above, in that we’re not returning results code, so let’s add some basic result handling

app.MapGet("/weatherforecast",  async (IMediator mediator) => 
  await mediator.Send(GetWeatherForecast.Default) is var results 
    ? Results.Ok(results) 
    : Results.NotFound())
  .WithName("GetWeatherForecast")
  .WithOpenApi();

Now for each new HTTP method call, we would create a request object and a handler object. In this case we send no parameters, but as you can no doubt see, for a request that takes (for example) a string for your location, we’d create a specific type for wrapping that parameter and the handler can then be mapped to that request type.

In our example we used the MediatR Send method. This sends a request to a single handler and expects a response of some type, but MediatR also has the ability to Publish to multiple handlers. These types of handlers are different, firstly they need to implement the INotificationHandler interface and secondly no response is expected when using Publish. These sorts of handlers are more like event broadcasts, so you might use then to send a message to an email service or database code which sends out an email upon request or updates a database.

Or WeatherForecast sample doesn’t give me any good ideas for using Publish in it’s current setup, so let’s just assume we have a way to set the current location. Like I said this example’s a little contrived as we’re going to essentially set the location for everyone connecting to this service, but you get the idea.

We’re going to add a SetLocation request type that looks like this

public record SetLocation(string Location) : INotification;

Notice that for publish our type is implementing the INotification interface. Our handles look like this (my file is named SetLocationHandler.cs but I’ll put both handlers in there just to be a little lazy)

public class UpdateHandler1 : INotificationHandler<SetLocation>
{
  public Task Handle(SetLocation notification, CancellationToken cancellationToken)
  {
    Console.WriteLine(nameof(UpdateHandler1));
    return Task.CompletedTask;
  }
}

public class UpdateHandler2 : INotificationHandler<SetLocation>
{
  public Task Handle(SetLocation notification, CancellationToken cancellationToken)
  {
    Console.WriteLine(nameof(UpdateHandler2));
    return Task.CompletedTask;
  }
}

As you can see, the handlers need to implement INotificationHandler with the correct request type. In this sample we’ll just write messages to console, but you might have a more interesting set of handlers in mind.

Finally let’s add the following to the Program.cs to publish a message

app.MapGet("/setlocation", (IMediator mediator, string location) =>
  mediator.Publish(new SetLocation(location)))
.WithName("SetLocation")
.WithOpenApi();

When you run up your server and use Swagger or call the setlocation method via it’s URL you’ll see that all your handlers that handle the request get called.

Ofcourse we can also Send and Post messages/request from our handlers, so maybe we get the weather forecast data then publish a message for some logging system to update the logs.

MediatR also includes the ability to stream from a requests where our request type implements the IStreamRequest and our handlers implement IStreamRequestHandler.

If we create a simple request type but this one implements IStreamRequest for example

public record GetWeatherStream : IStreamRequest<WeatherForecast>;

and now add a handler which implements IStreamRequestHandler, something like this (which delay’s to just give a feel of getting data from somewhere else)

public class GetWeatherStreamHandler : IStreamRequestHandler<GetWeatherStream, WeatherForecast>
{
  public async IAsyncEnumerable<WeatherForecast> Handle(GetWeatherStream request, 
    [EnumeratorCancellation] CancellationToken cancellationToken)
  {
    var index = 0;
    while (!cancellationToken.IsCancellationRequested)
    {
      await Task.Delay(500, cancellationToken);
      yield return new WeatherForecast
      {
        Date = DateOnly.FromDateTime(DateTime.Now.AddDays(index)),
        TemperatureC = Random.Shared.Next(-20, 55),
        Summary = Data.Summaries[Random.Shared.Next(Data.Summaries.Length)]
      };

      index++;
      if(index > 10)
        break;
    }
  }
}

Finally we can declare our streaming route using Minimal API very simply, for example

app.MapGet("/stream", (IMediator mediator) =>
  mediator.CreateStream(new GetWeatherStream()))
.WithName("Stream")
.WithOpenApi();

Collection Expressions in C# 12

C# 12 includes something called Collection Expressions. These offer more generic way to create our collections from array-like syntax.

Let’s look first at the old style creation of an array of integers

var array = new [] { 1, 2, 3 };

This is simple enough array is of type int[]?. This way of creation arrays is not going away, but what if we want to change the array to a different collection then we end up using collection initializers like this

var list = new List<int> { 1, 2, 3 };

There’s nothing much wrong with this, but essentially we’re sort of doing the same thing, just with different syntax.

Collection expressions now allow us to use syntax such as (below) to create our collection regardless of type

int[] array = [1, 2, 3 ];
List<int> list = [1, 2, 3 ];

On the surface this may not seem a big deal, but imagine you’ve a class that accepts an int[] and maybe you change the type to a List, passing the values via the collection expression [] syntax means that part of your code remains unchanged, it just remains as [1, 2, 3].

Along with this we get to use the spread operator .. for example

List<int> list = [1, 2, 3 ];
int[] array = [.. list];

In this example we’ve created a list then basically copied the items to the array, but a spread operator can be used to concatenate values (or collections), such as

int[] array = [-3, -2, -1, 0, .. list];

Creating your own collections to use collection expressions

For many of the original types, such as List<T> the collection expression code is built in. But newer collections and, if we want, our own collection can take advantage of this syntax by following a minimal set of rules.

All we need to do is create our collection type and add the CollectionBuilderAttribute to it like this

[CollectionBuilder(typeof(MyCollection), nameof(MyCollection.Create))]
public class MyCollection<T>
{
   // our code
}

Now this is not going to work, the typeof expects a non-generic type, so we create a simple non-generic version of this class to handle the creation of the generic version. Also notice the CollectionBuilder expects the name of the method to call and expects a method that takes a single parameter of type ReadOnlySpan and returns the collection type, now initialized, like this

public class MyCollection
{
  public static MyCollection<T> Create<T>(ReadOnlySpan<T> items)
  {
     // returns a MyCollection<T>
  }
}

Let’s look at potential bare minimum implementation of this collection type which can be used with the collection expression syntax. Notice we will also need to implement IEnumerable and/or IEnumerable<T>

[CollectionBuilder(typeof(MyCollection), nameof(MyCollection.Create))]
public class MyCollection<T> : IEnumerable<T>
{
  public static readonly MyCollection<T> Empty = new(Array.Empty<T>());

  private readonly List<T> _innerCollection;

  internal MyCollection(T[]? items)
  {
    _innerCollection = items == null ? new List<T>() : [..items];
  }

  public T this[int index] => _innerCollection[index];
  public IEnumerator<T> GetEnumerator() => _innerCollection.GetEnumerator();
  IEnumerator IEnumerable.GetEnumerator() => _innerCollection.GetEnumerator();
}

public class MyCollection
{
  public static MyCollection<T> Create<T>(ReadOnlySpan<T> items)
  {
    return items.IsEmpty ? 
      MyCollection<T>.Empty : 
      new MyCollection<T>(items.ToArray());
  }
}

Ofcourse this is a silly example as we’re not adding anything that the inner List<T> cannot supply, but you get the idea. Now we can use the collection expression syntax on our new collection type

MyCollection<int> collection = [1, 2, 6, 7];

C# interop with F#

Note: I’m going through draft posts that go back to 2014 and publishing where they still may have value. They may not be 100% upto date but better published late than never.

The intention of this post is to demonstrate how various bits of F# code are viewed from C# code. Obviously as both are .NET languages compiling to IL they can call one another’s code.

F# modules

Let’s start at the top, an F# module. So let’s look at a simple module

module MyModule

let squared x = x * x

The module will appears to C# as a static class and the function squared will become a static method, for example

public static class MyModule
{
  public static int squared(int x);
}

F# inferred the type to be an int

Ofcourse from C# we’ll call this function like any other static function

int result = MyModule.squared(4);

Null

The preference within F# is to use the Option type. But if you are working in C# and not wanting to include F# specific types you might prefer to still return a null. However if you are doing something like the following

match results with
| null -> null
| a -> new Thing(a)

This will fail to compiler with an error such as “The type ‘Thing’ does not have ‘null’ as a proper value.”

We can solve this by marking the Thing type with the attribute AllowNullLiteral, for example

[<AllowNullLiteral>]
type Thing() =
   // members etc.

or we might change the origina F# code to

match results with
| null -> Operators.Unchecked.defaultof<Scale>
| a -> new Thing(a)

Currying and Partial applications in F#

Note: I’m going through draft posts that go back to 2014 and publishing where they still may have value. They may not be 100% upto date but better published late than never.

Currying

Currying leads to the ability to create partial applications.

Currying is the process of taking a function with more than one arguments and turning it into single argument functions, for example

let add a b = a + b

// becomes

let add a = 
    let add' b = 
        a + b
    add'

This results in a function syntax which looks like this

val add : a:int -> (int -> int)

The bracketed int -> int shows the function which takes an int and returns and int.

Partial Applications

A partial application is a way of creating functions which have some of their arguments supplied and thus creating new functions. For example in it’s simplest form we might have

let add a b = a + b

let partialAdd a = add a 42

So, nothing too exciting there but what we can also do is, if we supply the first n arguments as per the following example

let add a b = a + b

let partialAdd = add 42

notice how we’ve removed the arguments to partialAdd but we can call the function thus

partialAdd 10 
|> printfn "%d"

we’re still supplying an argument because the function being called (the add function) requires an extra argument. The partialAdd declaration creates a new function which is partially implemented.

By declaring functions so that the last element(s) are to be supplied by a calling function we can better combine and/reuse functions.

Specflow/Gherkin tags

We’re going to take a look at tags.

We add tags to our features like this, using the @ to prefix a name

@Calculator
Scenario: Calculate two values
# Given/When/Then steps

We can have multiple tags for a scenario, just comma separate them, like this

@Calculator, @Math
Scenario: Calculate two values
# Given/When/Then steps

Great, so what use do I have for tags?

Tags can be used to create documentation, they can be used to for start up and clean up code and they can be uses within the test runners to run groups of tests via their category, you guessed it, denoted by the tag, for example

// run tests on anything tagged Math
dotnet test MyTests.dll --filter Category=Math

// to run tests with both Calculator and Math tags
dotnet test MyTests.dll --filter "Category=Calculator & Category=Math"

// to run tests with either Calculator or Math tags
dotnet test MyTests.dll --filter "Category=Calculator | Category=Math"

The “Custom” control type and WinAppDriver/Appium

So you’ve and application that you want to UI automation test using WinAppDriver/Appium. You’ve got a property grid with the left hand being the text/label and the right hand being the editor. You decided that a cool way to change values on the edit controls is to inspect what the ControlType is, then customise the code to SendKeys or Click or whatever on those controls.

Sound fair?

Well all this is great if your controls are not (as the title of this post suggests) “Custom” controls. So for WPF this is a UserControl or Control. This is fine if we have a single custom control but no so good if we have multiple custom control types.

This issue raise it’s head due to a HorizontalToggle control which we’re importing into our application via a NuGet package. The control derives from Control and is pretty much invisible to the UI Automation code apart from one Automation Id “SwithThumb”. So to fix this I wrapped the control in a UserControl and added an AutomationProperties.AutomationId attached property. Ofcourse, we could get the source if it’s available and change the code ourselves, but then we’ll have to handle upgrades etc. which may or may not be an issue in the future.

That’s great, now I can see the control but I have some generic code that wants to know the control type, so what can we do on this front?

The truth is we’re still quite limited in what we can do, if we’re getting all elements and trying to decide what to do based upon the ControlType. TextBoxes are Edit control types, Buttons are Button control types, but UserControls are still Custom control types.

Whilst this is NOT a perfect solutions, we can derive a class from a UserControl (which will still be used to wrap the original control), let’s call ours HorizontalToggleControl and it looks like this

public class HorizontalToggleControl : UserControl
{
   protected override AutomationPeer OnCreateAutomationPeer() => 
      new HorizontalToggleControlAutomationPeer(this);
}

What we’re doing here is taking over the OnCreateAutomationPeer and supplying our own automation peer, which will itself allow us to override some of the automation properties, specifically in our case the GetAutomationControlTypeCore.

My HorizontalToggleControlAutomationPeer class looks like this

internal class HorizontalToggleControlAutomationPeer : 
   UserControlAutomationPeer
{
   public HorizontalToggleControlAutomationPeer(UserControl owner) :
      base(owner)
   {
   }

   protected override AutomationControlType GetAutomationControlTypeCore() => 
      AutomationControlType.Thumb;

   protected override string GetLocalizedControlTypeCore() =>
      nameof(HorizontalToggleControl);

}

Now what’s happening in the above code is the we’re creating a localized control name “HorizontalToggleControl”, ofcourse this could literally be localised and read from the resources, but in our case we’re sticking with the actual control name. This, unfortunately is still no use to us as the ControlType in an element will still read as Custom. Changing the GetAutomationControlTypeCore return value fixes this but at the expense of only being able to set the control type to one of the AutomationControlType enums. So it’s of limited use, but as mentioned previously, we only really see the SwitchThumb automation id on the original control and so, Thumb seemed like a possible control type. In reality we might prefer CheckBox, but ofcourse the downside here is if we have check box elements, we’d need to ensure we also look at the automation name or property to determine what type of check box this is, a real Windows one or one that acts like a check box. Either way of doing this is fine.

Is your Universal Windows application running on a device which supports this hardware ?

Just going through some old draft posts and found this one, which might be of use to somebody. Let’s call it a Quick Post as there’s not too much substance…

When writing a Universal Windows application we’re basically trying to write code that will work on multiple devices. But different devices have different capabilities. For example a mobile phone has a back button, so we might want to handle the back button BackPressed event in some way, but this event is not available when the application is run on a desktop machine.

Obviously it’d be no good using #define to enable/disable code as we want the application’s code to be universal and run “as-is” on multiple devices. So we need a method call at runtime to tell us whether the device supports the BackButton. Or more specifically whether it supports the HardwareButtons input mechanism.

So to check whether we can hook up code to the BackPressed event we might code the following

if(ApiInformation.IsTypePresent("Windows.Phone.UI.Input.HardwareButtons"))
{
   HardwareButtons.BackPressed += HandleBackPressed;
}

Azure Functions

Azure functions (like AWS lambdas and GCP cloud functions) allow us to write serverless code literally just as functions, i.e. no need to fire up a web application or VM. Ofcourse just like Azure containers, there is a server component but we, the developer, need not concerns ourselves with handling configuration etc.

Azure functions will be spun up as and when required, meaning we will only be charged when they’re used. The downside of this is they have to spin up from a “cold” state. In other words the first person to hit your function will likely incur a performance hit whilst the function is started then invoked.

The other thing to remember is Azure functions are stateless. You might store state with a DB like CosmoDB, but essentially a function is invoked, does something then after a timeout period it’s shut back down.

Let’s create an example function and see how things work…

  • Create a new Azure Functions project
  • When you get to the options for the Function, select Http trigger and select Amonymous Authorization level
  • Complete the wizard by clicking the Create button

The Authorization level allows the function to be triggered without providing a key. The HTTP trigger, as it sounds, means the function is triggered by an HTTP request.

The following is basically the code that’s created from the Azure Function template

public static class ExampleFunction
{
  [FunctionName("Example")]
  public static async Task<IActionResult> Run(
        [HttpTrigger(AuthorizationLevel.Anonymous, "get", "post", Route = null)] HttpRequest req,
        ILogger log)
  {
    log.LogInformation("HTTP trigger function processed a request.");

    string name = req.Query["name"];

    var requestBody = await new StreamReader(req.Body).ReadToEndAsync();
    dynamic data = JsonConvert.DeserializeObject(requestBody);
    name = name ?? data?.name;

    var responseMessage = string.IsNullOrEmpty(name) 
      ? "Pass a name in the query string or in the request body for a personalized response."
            : $"Hello, {name}. This HTTP triggered function executed successfully.";

    return new OkObjectResult(responseMessage);
  }
}

We can actually run this and debug via Visual Studio in the normal way. We’ll get a URL supplied, something like this http://localhost:7071/api/Example to access our function.

As you can see from the above code, we’ll get passed an ILogger and an HttpRequest. From this we can get query parameters, so this URL above would be used like this http://localhost:7071/api/Example?name=PutridParrot

Ofcourse the whole purpose of the Azure Function is for it to run on Azure. To publish it…

  • From Visual Studio, right mouse click on the project and select Publish
  • For the target, select Azure. Click Next
  • Select Azure Function App (Windows) or Linux if you prefer. Click Next again
  • Either select a Function instance if one already exist or you can create a new instance from this wizard page

If you’re creating a new instance, select the resource group etc. as usual and then click Create when ready.

Note: I chose Consumption plan, which is the default when creating an Azure Functions instance. This is basically a “pay only for executions of your functions app”, so should be the cheapest plan.

The next step is to Finish the publish process. If all went well you’ll see everything configures and you can close the Publish dialog.

From the Azure dashboard you can simply type into the search textbox Function App and you should see the published function with a status of Running. If you click on the function name it will show you the current status of the function as well as it’s URL which we can access like we did with localhost, i.e.

https://myfunctionssomewhere.azurewebsites.net/api/Example?name=PutridParrot

Blazor and the GetFromJsonAsync exception TypeError: Failed to Fetch

I have an Azure hosted web api. I also have a simple Blazor standalone application that’s meant to call the API to get a list of categories to display. i.e. the Blazor app is meant to call the Azure web api, fetch the data and display it – should be easy enough, right ?

The web api can easily accessed via a web browser or a console app using the .NET HttpClient, but the Blazor code using the following simply kept throwing exception with the cryptic message “TypeError: Failed to Fetch”

@inject HttpClient Http

// Blazor and other code

protected override async Task OnInitializedAsync()
{
   try
   {
      _categories = await Http.GetFromJsonAsync<string[]>("categories");
   }
   catch (Exception e)
   {
      Debug.WriteLine(e);
   }
}

What was happening is I was actually getting a CORS error, sadly not really reported via the exception so no exactly obvious.

If you get this error interacting with your web api via Blazor then go to the Azure dashboard. I’m running my web api as a container app, type CORS into the left search bar of the resource (in my case a Container App). you should see the Settings section CORS subsection.

Add * to the Allowed Origins and click apply.

Now your Blazor app should be able to interact with the Azure web api app.

Working with Kafka host via Docker from a C# client

Let’s take a look at running a test instance (as single instance) of Kafka and write a producer and consumer application in C# to interact with it. As is my preference, we’ll use docker to run up our instance of Kafka and in my case this is running on an Ubuntu server.

Kafka running in Docker

We’ll start with the simplest docker compose file we can. So create the file docker-compose.yml and paste the following into it

version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - 29092:29092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://192.168.0.1:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

This is very simple, it’s running a single instance of Kafka (which is only really likely to be something we’d use for testing). Kafka uses Zookeeper (although I believe that dependency may have gone or easy potentially going away), so we have Zookeeper running as well.

In the above file we’re setting the PLAINTEXT_HOST to the machine running the instance of Kafka, obvious this is not ideal so we can change this first to allow the environment to be supplied by either an environment variable of via a .env file. For this example let’s change that line to

KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://${HOST}:29092

just add a .env file in the same location as the docker compose file, and have something like this in it

HOST=192.168.0.1

Now we can run the Kakfa and Zookeeper up using

docker-compose up -d

Remove the -d if you want to watch the log, which I would recommend to at least feel like things are running as expected. Also you can always run docker-compose ps to check that the services are running successfully

C# Producer

We’ll create a console application that will simply send some messages to a topic, it’s our producer. Here’s my Producer.csproj

<Project Sdk="Microsoft.NET.Sdk">

  <PropertyGroup>
    <OutputType>Exe</OutputType>
    <TargetFramework>net8.0</TargetFramework>
    <ImplicitUsings>enable</ImplicitUsings>
    <Nullable>enable</Nullable>
  </PropertyGroup>

  <ItemGroup>
    <PackageReference Include="Confluent.Kafka" Version="2.3.0" />
  </ItemGroup>

</Project>

Whilst we can read configuration for Kafka from an INI file or the like’s but for simplicity we’ll handle these in code. So here’s a very basic sample of a producer (this is heavily based on the Confluent Kafka example)

using Confluent.Kafka;

var config = new List<KeyValuePair<string, string>>
{
    new("bootstrap.servers", "192.168.0.1:19092"),
    new("client.id", "my-producer")
};

const string topic = "my-topic";

string[] tickers = { "AAPL", "GOOGL", "MSFT", "AMZN", "META", "TSLA", "GS" };
string[] trades = { "Buy 100", "Sell 1000", "Buy 9090", "Sell 45", "Buy 900000", "Sell 123", "Buy 8901" };

using var producer = new ProducerBuilder<string, string>(config).Build();

var rnd = new Random();

for (var i = 0; i < 10; ++i)
{
    var ticker = tickers[rnd.Next(tickers.Length)];
    var trade = trades[rnd.Next(trades.Length)];

    producer.Produce(topic, new Message<string, string> { Key = ticker, Value = trade },
        deliveryReport =>
        {
            if (deliveryReport.Error.Code != ErrorCode.NoError)
            {
                Console.WriteLine($"Error sending event: {deliveryReport.Error.Reason}");
            }
            else
            {
                Console.WriteLine($"Sent event topic = {topic}: key = {ticker} value = {trade}");
            }
        });
}

producer.Flush(TimeSpan.FromSeconds(10));

In the above we’re creating a configuration, with reference to our bootstrap server with a unique client.id. We also need a topic which should be unique and will need to be known by the consumers who want to fetch events for a given topic.

In this example we create a batch of simple string key, string value events and the build the producer object. Then we just randomly pick a ticker and assign a trade against it and send that event to Kakfa.

C# Consumer

Obviously we’re going to want to fetch these events at some point. We do this via a consumer. Once events are added to Kafka (and depending upon it’s setup/configuration) these event will “play” to a consumer that attaches to the correct topic. Once the events are received by the consumer they will not be replayed again, unless we explicitly force Kafka to do so.

Again this example is based heavily on the Confluent Kafka C# consumer. Create a Console application and replace the contents of the .csproj with the same csproj listed earlier for the Producer – this is just adding the relevant client package. Here’s the code for our Console based consumer

using Confluent.Kafka;

var config = new List<KeyValuePair<string, string>>
        {
            new("bootstrap.servers", "192.168.0.1:19092"),
            new("group.id", "my-group"),
            new("auto.offset.reset", "earliest")
        };

const string topic = "my-topic";

var cts = new CancellationTokenSource();
Console.CancelKeyPress += (_, e) =>
{
    e.Cancel = true; // prevent the process from terminating.
    cts.Cancel();
};

using var consumer = new ConsumerBuilder<string, string>(config).Build();

consumer.Subscribe(topic);
try
{
    while (true)
    {
        var cr = consumer.Consume(cts.Token);
        Console.WriteLine($"Consumed event, topic {topic}: key = {cr.Message.Key} value = {cr.Message.Value}");
    }
}
catch (OperationCanceledException)
{
    // Ctrl-C was pressed.
}
finally
{
    consumer.Close();
}

There’s a little more here than required, just to keep the consumer running and watching for events. In a service we ofcourse wouldn’t need half of this code.

Essentially we create a configurations which tells Kafka that consumer has a group.id (this is mandatory) and where we want the offset to reset to, for playing the events from. In other words, this example will connect to Kafka and only consume events it hasn’t already consumed. It will not replay events from the first to last.

If, and I’ve found it useful in some debugging situations, but it may be required in real world application, we wish to get ALL events, then we change the ConsumerBuilder line to the following

using var consumer = new ConsumerBuilder<string, string>(config)
    .SetPartitionsAssignedHandler((c, partitions) =>
    {
        // reset the offsets for this client
        var offsets = partitions.Select(tp => new TopicPartitionOffset(tp, Offset.Beginning));
        return offsets;
    })
    .Build();

Multiple brokers

A single Kafka broker is fine for testing, but Kafa was designed for multiple brokers, here’s a docker compose file that takes out single instance and add’s two more to create three Kafka brokers (I think this is often viewed as the minimal for production, but don’t quote me on that)

version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka-broker1:
    image: confluentinc/cp-kafka:latest
    hostname: kafka-broker1
    depends_on:
      - zookeeper
    ports:
      - 19092:19092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-broker1:9092,PLAINTEXT_HOST://${HOST}:19092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
  kafka-broker2:
    image: confluentinc/cp-kafka:latest
    hostname: kafka-broker2
    depends_on:
      - zookeeper
    ports:
      - 29092:29092
    environment:
      KAFKA_BROKER_ID: 2
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-broker2:9092,PLAINTEXT_HOST://${HOST}:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
  kafka-broker3:
    image: confluentinc/cp-kafka:latest
    hostname: kafka-broker3
    depends_on:
      - zookeeper
    ports:
      - 39092:39092
    environment:
      KAFKA_BROKER_ID: 3
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-broker3:9092,PLAINTEXT_HOST://${HOST}:39092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3

We’ve added the following, a KAFKA_BROKER_ID and the KAFKA_ADVERTISED_LISTENERS which references the newly added hostname. Just run up in docker-compose and the previous client code should work happily against this setup.

Code etc.

Code and docker compose files are available as part of my github blog-projects repo.