Let’s take a look at running a test (single) instance of Kafka and writing producer and consumer applications in C# to interact with it. As is my preference, we’ll use Docker to run our instance of Kafka; in my case this is running on an Ubuntu server.
Kafka running in Docker
We’ll start with the simplest docker compose file we can. So create the file docker-compose.yml and paste the following into it
version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - 29092:29092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://192.168.0.1:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
This is very simple, it’s running a single instance of Kafka (which is only really likely to be something we’d use for testing). Kafka uses Zookeeper (although I believe that dependency is going away with the newer KRaft mode), so we have Zookeeper running as well.
In the above file we’re hard-coding the PLAINTEXT_HOST address to the machine running the instance of Kafka. Obviously this is not ideal, so let’s first change it so the host can be supplied either via an environment variable or via a .env file. For this example, change that line to
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://${HOST}:29092
Then just add a .env file in the same location as the docker compose file, with something like this in it
HOST=192.168.0.1
Now we can run Kafka and Zookeeper up using
docker-compose up -d
Remove the -d if you want to watch the logs, which I’d recommend doing at least once to confirm things are running as expected. You can also run docker-compose ps to check that the services are running successfully.
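By default Kafka will usually auto-create a topic the first time a client uses it, but if you’d prefer to create the topic explicitly (or auto-creation has been disabled) the cp-kafka image bundles the standard Kafka CLI tools. Assuming the service is named kafka as in the compose file above, something like this should do it
docker-compose exec kafka kafka-topics --create --topic my-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1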
C# Producer
We’ll create a console application that will simply send some messages to a topic; this is our producer. Here’s my Producer.csproj
<Project Sdk="Microsoft.NET.Sdk">

  <PropertyGroup>
    <OutputType>Exe</OutputType>
    <TargetFramework>net8.0</TargetFramework>
    <ImplicitUsings>enable</ImplicitUsings>
    <Nullable>enable</Nullable>
  </PropertyGroup>

  <ItemGroup>
    <PackageReference Include="Confluent.Kafka" Version="2.3.0" />
  </ItemGroup>

</Project>
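If you prefer the command line, creating the project and adding the package is just
dotnet new console -n Producer
cd Producer
dotnet add package Confluent.Kafka --version 2.3.0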
Whilst we could read the Kafka configuration from an INI file or the like, for simplicity we’ll handle it in code. So here’s a very basic sample of a producer (this is heavily based on the Confluent Kafka example)
using Confluent.Kafka;

var config = new List<KeyValuePair<string, string>>
{
    new("bootstrap.servers", "192.168.0.1:29092"),
    new("client.id", "my-producer")
};

const string topic = "my-topic";

string[] tickers = { "AAPL", "GOOGL", "MSFT", "AMZN", "META", "TSLA", "GS" };
string[] trades = { "Buy 100", "Sell 1000", "Buy 9090", "Sell 45", "Buy 900000", "Sell 123", "Buy 8901" };

using var producer = new ProducerBuilder<string, string>(config).Build();

var rnd = new Random();
for (var i = 0; i < 10; ++i)
{
    // randomly pair a ticker with a trade and send it as a key/value event
    var ticker = tickers[rnd.Next(tickers.Length)];
    var trade = trades[rnd.Next(trades.Length)];
    producer.Produce(topic, new Message<string, string> { Key = ticker, Value = trade },
        deliveryReport =>
        {
            if (deliveryReport.Error.Code != ErrorCode.NoError)
            {
                Console.WriteLine($"Error sending event: {deliveryReport.Error.Reason}");
            }
            else
            {
                Console.WriteLine($"Sent event topic = {topic}: key = {ticker} value = {trade}");
            }
        });
}

// make sure any outstanding messages are delivered before exiting
producer.Flush(TimeSpan.FromSeconds(10));
In the above we’re creating a configuration with a reference to our bootstrap server and a unique client.id. We also need a topic name, which should be unique and will need to be known by any consumers that want to fetch events from it.
In this example we create a batch of simple string key, string value events and then build the producer object. Then we just randomly pick a ticker, assign a trade against it and send that event to Kafka.
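As an aside, the client also offers strongly typed configuration classes and an awaitable produce method. Here’s a minimal sketch (assuming the same topic and broker address as above) using ProducerConfig and ProduceAsync
using Confluent.Kafka;

var config = new ProducerConfig
{
    BootstrapServers = "192.168.0.1:29092",
    ClientId = "my-producer"
};

using var producer = new ProducerBuilder<string, string>(config).Build();

// await the delivery result rather than supplying a callback
var result = await producer.ProduceAsync("my-topic",
    new Message<string, string> { Key = "AAPL", Value = "Buy 100" });

Console.WriteLine($"Sent event to {result.TopicPartitionOffset}");
The callback-based Produce is generally higher throughput as it doesn’t wait on each message; ProduceAsync is convenient when you want the delivery result inline.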
C# Consumer
Obviously we’re going to want to fetch these events at some point; we do this via a consumer. Once events are added to Kafka (and depending upon its setup/configuration) these events will “play” to a consumer that attaches to the correct topic. Once the events are received by the consumer they will not be replayed again, unless we explicitly force Kafka to do so.
Again this example is based heavily on the Confluent Kafka C# consumer. Create a Console application and replace the contents of the .csproj with the same csproj listed earlier for the Producer – this is just adding the relevant client package. Here’s the code for our Console based consumer
using Confluent.Kafka;

var config = new List<KeyValuePair<string, string>>
{
    new("bootstrap.servers", "192.168.0.1:29092"),
    new("group.id", "my-group"),
    new("auto.offset.reset", "earliest")
};

const string topic = "my-topic";

// allow Ctrl-C to break out of the consume loop cleanly
var cts = new CancellationTokenSource();
Console.CancelKeyPress += (_, e) =>
{
    e.Cancel = true; // prevent the process from terminating.
    cts.Cancel();
};

using var consumer = new ConsumerBuilder<string, string>(config).Build();
consumer.Subscribe(topic);
try
{
    while (true)
    {
        var cr = consumer.Consume(cts.Token);
        Console.WriteLine($"Consumed event, topic {topic}: key = {cr.Message.Key} value = {cr.Message.Value}");
    }
}
catch (OperationCanceledException)
{
    // Ctrl-C was pressed.
}
finally
{
    // leave the group cleanly and commit final offsets
    consumer.Close();
}
There’s a little more here than required, just to keep the consumer running and watching for events; in a service we of course wouldn’t need half of this code.
Essentially we create a configuration which tells Kafka that the consumer has a group.id (this is mandatory) and where we want the offset to reset to when the group has no committed offset, i.e. where to start playing events from. In other words, this example will connect to Kafka and only consume events it hasn’t already consumed; it will not replay events from first to last.
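The same settings can also be expressed with the strongly typed ConsumerConfig class, which the builder accepts just the same; a minimal sketch
using Confluent.Kafka;

var config = new ConsumerConfig
{
    BootstrapServers = "192.168.0.1:29092",
    GroupId = "my-group",
    // only applies when the group has no committed offset yet
    AutoOffsetReset = AutoOffsetReset.Earliest
};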
If we wish to get ALL events (I’ve found this useful in some debugging situations, but it may also be required in a real world application), then we change the ConsumerBuilder line to the following
using var consumer = new ConsumerBuilder<string, string>(config)
    .SetPartitionsAssignedHandler((c, partitions) =>
    {
        // reset the offsets for this client back to the beginning of each assigned partition
        var offsets = partitions.Select(tp => new TopicPartitionOffset(tp, Offset.Beginning));
        return offsets;
    })
    .Build();
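Note that this handler runs on every (re)assignment, so events will replay each time the consumer (re)joins the group. Alternatively, if you know exactly which partition you’re interested in (a hypothetical example, assuming partition 0), you can skip the subscription and assign the consumer directly
consumer.Assign(new TopicPartitionOffset(topic, new Partition(0), Offset.Beginning));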
Multiple brokers
A single Kafka broker is fine for testing, but Kafka was designed for multiple brokers. Here’s a docker compose file that takes our single instance and adds two more to create three Kafka brokers (I think this is often viewed as the minimum for production, but don’t quote me on that)
version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  kafka-broker1:
    image: confluentinc/cp-kafka:latest
    hostname: kafka-broker1
    depends_on:
      - zookeeper
    ports:
      - 19092:19092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-broker1:9092,PLAINTEXT_HOST://${HOST}:19092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
  kafka-broker2:
    image: confluentinc/cp-kafka:latest
    hostname: kafka-broker2
    depends_on:
      - zookeeper
    ports:
      - 29092:29092
    environment:
      KAFKA_BROKER_ID: 2
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-broker2:9092,PLAINTEXT_HOST://${HOST}:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
  kafka-broker3:
    image: confluentinc/cp-kafka:latest
    hostname: kafka-broker3
    depends_on:
      - zookeeper
    ports:
      - 39092:39092
    environment:
      KAFKA_BROKER_ID: 3
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-broker3:9092,PLAINTEXT_HOST://${HOST}:39092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
Each broker now gets a unique KAFKA_BROKER_ID, its own hostname, and a KAFKA_ADVERTISED_LISTENERS entry that references that hostname and a unique host port. Just run it up with docker-compose and the previous client code should work happily against this setup.
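One small improvement to the client code: with multiple brokers available, list more than one in bootstrap.servers so that the initial connection doesn’t depend on any single broker being up, for example
new("bootstrap.servers", "192.168.0.1:19092,192.168.0.1:29092,192.168.0.1:39092"),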
Code etc.
Code and docker compose files are available as part of my github blog-projects repo.