{"id":10135,"date":"2023-11-19T11:54:18","date_gmt":"2023-11-19T11:54:18","guid":{"rendered":"https:\/\/putridparrot.com\/blog\/?p=10135"},"modified":"2023-11-19T12:03:10","modified_gmt":"2023-11-19T12:03:10","slug":"working-with-kafka-host-via-docker-from-a-c-client","status":"publish","type":"post","link":"https:\/\/putridparrot.com\/blog\/working-with-kafka-host-via-docker-from-a-c-client\/","title":{"rendered":"Working with Kafka host via Docker from a C# client"},"content":{"rendered":"<p>Let&#8217;s take a look at running a test instance (as single instance) of Kafka and write a producer and consumer application in C# to interact with it. As is my preference, we&#8217;ll use docker to run up our instance of Kafka and in my case this is running on an Ubuntu server.<\/p>\n<p><strong>Kafka running in Docker<\/strong><\/p>\n<p>We&#8217;ll start with the simplest docker compose file we can. So create the file docker-compose.yml and paste the following into it<\/p>\n<pre class=\"brush: plain; title: ; notranslate\" title=\"\">\r\nversion: '3'\r\nservices:\r\n  zookeeper:\r\n    image: confluentinc\/cp-zookeeper:latest\r\n    environment:\r\n      ZOOKEEPER_CLIENT_PORT: 2181\r\n      ZOOKEEPER_TICK_TIME: 2000\r\n\r\n  kafka:\r\n    image: confluentinc\/cp-kafka:latest\r\n    depends_on:\r\n      - zookeeper\r\n    ports:\r\n      - 29092:29092\r\n    environment:\r\n      KAFKA_BROKER_ID: 1\r\n      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181\r\n      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT:\/\/kafka:9092,PLAINTEXT_HOST:\/\/192.168.0.1:29092\r\n      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT\r\n      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT\r\n      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1\r\n<\/pre>\n<p>This is very simple, it&#8217;s running a single instance of Kafka (which is only really likely to be something we&#8217;d use for testing). Kafka uses Zookeeper (although I believe that dependency may have gone or easy potentially going away), so we have Zookeeper running as well.<\/p>\n<p>In the above file we&#8217;re setting the PLAINTEXT_HOST to the machine running the instance of Kafka, obvious this is not ideal so we can change this first to allow the environment to be supplied by either an environment variable of via a .env file. For this example let&#8217;s change that line to<\/p>\n<pre class=\"brush: plain; title: ; notranslate\" title=\"\">\r\nKAFKA_ADVERTISED_LISTENERS: PLAINTEXT:\/\/kafka:9092,PLAINTEXT_HOST:\/\/${HOST}:29092\r\n<\/pre>\n<p>just add a .env file in the same location as the docker compose file, and have something like this in it<\/p>\n<pre class=\"brush: plain; title: ; notranslate\" title=\"\">\r\nHOST=192.168.0.1\r\n<\/pre>\n<p>Now we can run the Kakfa and Zookeeper up using<\/p>\n<pre class=\"brush: plain; title: ; notranslate\" title=\"\">\r\ndocker-compose up -d\r\n<\/pre>\n<p><em>Remove the -d if you want to watch the log, which I would recommend to at least feel like things are running as expected. Also you can always run docker-compose ps to check that the services are running successfully<\/em><\/p>\n<p><strong>C# Producer<\/strong><\/p>\n<p>We&#8217;ll create a console application that will simply send some messages to a topic, it&#8217;s our producer. Here&#8217;s my Producer.csproj<\/p>\n<pre class=\"brush: plain; title: ; notranslate\" title=\"\">\r\n&lt;Project Sdk=&quot;Microsoft.NET.Sdk&quot;&gt;\r\n\r\n  &lt;PropertyGroup&gt;\r\n    &lt;OutputType&gt;Exe&lt;\/OutputType&gt;\r\n    &lt;TargetFramework&gt;net8.0&lt;\/TargetFramework&gt;\r\n    &lt;ImplicitUsings&gt;enable&lt;\/ImplicitUsings&gt;\r\n    &lt;Nullable&gt;enable&lt;\/Nullable&gt;\r\n  &lt;\/PropertyGroup&gt;\r\n\r\n  &lt;ItemGroup&gt;\r\n    &lt;PackageReference Include=&quot;Confluent.Kafka&quot; Version=&quot;2.3.0&quot; \/&gt;\r\n  &lt;\/ItemGroup&gt;\r\n\r\n&lt;\/Project&gt;\r\n<\/pre>\n<p>Whilst we can read configuration for Kafka from an INI file or the like&#8217;s but for simplicity we&#8217;ll handle these in code. So here&#8217;s a very basic sample of a producer (this is heavily based on the Confluent Kafka example)<\/p>\n<pre class=\"brush: csharp; title: ; notranslate\" title=\"\">\r\nusing Confluent.Kafka;\r\n\r\nvar config = new List&lt;KeyValuePair&lt;string, string&gt;&gt;\r\n{\r\n    new(&quot;bootstrap.servers&quot;, &quot;192.168.0.1:19092&quot;),\r\n    new(&quot;client.id&quot;, &quot;my-producer&quot;)\r\n};\r\n\r\nconst string topic = &quot;my-topic&quot;;\r\n\r\nstring&#x5B;] tickers = { &quot;AAPL&quot;, &quot;GOOGL&quot;, &quot;MSFT&quot;, &quot;AMZN&quot;, &quot;META&quot;, &quot;TSLA&quot;, &quot;GS&quot; };\r\nstring&#x5B;] trades = { &quot;Buy 100&quot;, &quot;Sell 1000&quot;, &quot;Buy 9090&quot;, &quot;Sell 45&quot;, &quot;Buy 900000&quot;, &quot;Sell 123&quot;, &quot;Buy 8901&quot; };\r\n\r\nusing var producer = new ProducerBuilder&lt;string, string&gt;(config).Build();\r\n\r\nvar rnd = new Random();\r\n\r\nfor (var i = 0; i &lt; 10; ++i)\r\n{\r\n    var ticker = tickers&#x5B;rnd.Next(tickers.Length)];\r\n    var trade = trades&#x5B;rnd.Next(trades.Length)];\r\n\r\n    producer.Produce(topic, new Message&lt;string, string&gt; { Key = ticker, Value = trade },\r\n        deliveryReport =&gt;\r\n        {\r\n            if (deliveryReport.Error.Code != ErrorCode.NoError)\r\n            {\r\n                Console.WriteLine($&quot;Error sending event: {deliveryReport.Error.Reason}&quot;);\r\n            }\r\n            else\r\n            {\r\n                Console.WriteLine($&quot;Sent event topic = {topic}: key = {ticker} value = {trade}&quot;);\r\n            }\r\n        });\r\n}\r\n\r\nproducer.Flush(TimeSpan.FromSeconds(10));\r\n<\/pre>\n<p>In the above we&#8217;re creating a configuration, with reference to our bootstrap server with a unique client.id. We also need a topic which should be unique and will need to be known by the consumers who want to fetch events for a given topic.<\/p>\n<p>In this example we create a batch of simple <em>string key, string value<\/em> events and the build the producer object. Then we just randomly pick a ticker and assign a trade against it and send that event to Kakfa.<\/p>\n<p><strong>C# Consumer<\/strong><\/p>\n<p>Obviously we&#8217;re going to want to fetch these events at some point. We do this via a consumer. Once events are added to Kafka (and depending upon it&#8217;s setup\/configuration) these event will &#8220;play&#8221; to a consumer that attaches to the correct topic. Once the events are received by the consumer they will not be replayed again, unless we explicitly force Kafka to do so.<\/p>\n<p>Again this example is based heavily on the Confluent Kafka C# consumer. Create a Console application and replace the contents of the .csproj with the same csproj listed earlier for the Producer &#8211; this is just adding the relevant client package. Here&#8217;s the code for our Console based consumer<\/p>\n<pre class=\"brush: csharp; title: ; notranslate\" title=\"\">\r\nusing Confluent.Kafka;\r\n\r\nvar config = new List&lt;KeyValuePair&lt;string, string&gt;&gt;\r\n        {\r\n            new(&quot;bootstrap.servers&quot;, &quot;192.168.0.1:19092&quot;),\r\n            new(&quot;group.id&quot;, &quot;my-group&quot;),\r\n            new(&quot;auto.offset.reset&quot;, &quot;earliest&quot;)\r\n        };\r\n\r\nconst string topic = &quot;my-topic&quot;;\r\n\r\nvar cts = new CancellationTokenSource();\r\nConsole.CancelKeyPress += (_, e) =&gt;\r\n{\r\n    e.Cancel = true; \/\/ prevent the process from terminating.\r\n    cts.Cancel();\r\n};\r\n\r\nusing var consumer = new ConsumerBuilder&lt;string, string&gt;(config).Build();\r\n\r\nconsumer.Subscribe(topic);\r\ntry\r\n{\r\n    while (true)\r\n    {\r\n        var cr = consumer.Consume(cts.Token);\r\n        Console.WriteLine($&quot;Consumed event, topic {topic}: key = {cr.Message.Key} value = {cr.Message.Value}&quot;);\r\n    }\r\n}\r\ncatch (OperationCanceledException)\r\n{\r\n    \/\/ Ctrl-C was pressed.\r\n}\r\nfinally\r\n{\r\n    consumer.Close();\r\n}\r\n<\/pre>\n<p>There&#8217;s a little more here than required, just to keep the consumer running and watching for events. In a service we ofcourse wouldn&#8217;t need half of this code.<\/p>\n<p>Essentially we create a configurations which tells Kafka that consumer has a group.id (this is mandatory) and where we want the offset to reset to, for playing the events from. In other words, this example will connect to Kafka and only consume events it hasn&#8217;t already consumed. It will not replay events from the first to last.<\/p>\n<p>If, and I&#8217;ve found it useful in some debugging situations, but it may be required in real world application, we wish to get ALL events, then we change the ConsumerBuilder line to the following<\/p>\n<pre class=\"brush: csharp; title: ; notranslate\" title=\"\">\r\nusing var consumer = new ConsumerBuilder&lt;string, string&gt;(config)\r\n    .SetPartitionsAssignedHandler((c, partitions) =&gt;\r\n    {\r\n        \/\/ reset the offsets for this client\r\n        var offsets = partitions.Select(tp =&gt; new TopicPartitionOffset(tp, Offset.Beginning));\r\n        return offsets;\r\n    })\r\n    .Build();\r\n<\/pre>\n<p><strong>Multiple brokers<\/strong><\/p>\n<p>A single Kafka broker is fine for testing, but Kafa was designed for multiple brokers, here&#8217;s a docker compose file that takes out single instance and add&#8217;s two more to create three Kafka brokers (I think this is often viewed as the minimal for production, but don&#8217;t quote me on that)<\/p>\n<pre class=\"brush: plain; title: ; notranslate\" title=\"\">\r\nversion: '3'\r\nservices:\r\n  zookeeper:\r\n    image: confluentinc\/cp-zookeeper:latest\r\n    environment:\r\n      ZOOKEEPER_CLIENT_PORT: 2181\r\n      ZOOKEEPER_TICK_TIME: 2000\r\n\r\n  kafka-broker1:\r\n    image: confluentinc\/cp-kafka:latest\r\n    hostname: kafka-broker1\r\n    depends_on:\r\n      - zookeeper\r\n    ports:\r\n      - 19092:19092\r\n    environment:\r\n      KAFKA_BROKER_ID: 1\r\n      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181\r\n      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT:\/\/kafka-broker1:9092,PLAINTEXT_HOST:\/\/${HOST}:19092\r\n      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT\r\n      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT\r\n      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3\r\n  kafka-broker2:\r\n    image: confluentinc\/cp-kafka:latest\r\n    hostname: kafka-broker2\r\n    depends_on:\r\n      - zookeeper\r\n    ports:\r\n      - 29092:29092\r\n    environment:\r\n      KAFKA_BROKER_ID: 2\r\n      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181\r\n      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT:\/\/kafka-broker2:9092,PLAINTEXT_HOST:\/\/${HOST}:29092\r\n      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT\r\n      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT\r\n      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3\r\n  kafka-broker3:\r\n    image: confluentinc\/cp-kafka:latest\r\n    hostname: kafka-broker3\r\n    depends_on:\r\n      - zookeeper\r\n    ports:\r\n      - 39092:39092\r\n    environment:\r\n      KAFKA_BROKER_ID: 3\r\n      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181\r\n      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT:\/\/kafka-broker3:9092,PLAINTEXT_HOST:\/\/${HOST}:39092\r\n      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT\r\n      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT\r\n      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3\r\n<\/pre>\n<p>We&#8217;ve added the following, a KAFKA_BROKER_ID and the KAFKA_ADVERTISED_LISTENERS which references the newly added hostname. Just run up in docker-compose and the previous client code should work happily against this setup.<\/p>\n<p><strong>Code etc.<\/strong><\/p>\n<p>Code and docker compose files are available as part of my <a href=\"https:\/\/github.com\/putridparrot\/blog-projects\/tree\/master\/kafka-test\" rel=\"noopener\" target=\"_blank\">github blog-projects<\/a> repo.<\/p>\n","protected":false},"excerpt":{"rendered":"<p>Let&#8217;s take a look at running a test instance (as single instance) of Kafka and write a producer and consumer application in C# to interact with it. As is my preference, we&#8217;ll use docker to run up our instance of Kafka and in my case this is running on an Ubuntu server. Kafka running in [&hellip;]<\/p>\n","protected":false},"author":1,"featured_media":0,"comment_status":"closed","ping_status":"open","sticky":false,"template":"","format":"standard","meta":{"_jetpack_memberships_contains_paid_content":false,"footnotes":""},"categories":[49,3,337],"tags":[],"class_list":["post-10135","post","type-post","status-publish","format-standard","hentry","category-net","category-c","category-kafka"],"jetpack_sharing_enabled":true,"jetpack_featured_media_url":"","_links":{"self":[{"href":"https:\/\/putridparrot.com\/blog\/wp-json\/wp\/v2\/posts\/10135","targetHints":{"allow":["GET"]}}],"collection":[{"href":"https:\/\/putridparrot.com\/blog\/wp-json\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/putridparrot.com\/blog\/wp-json\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/putridparrot.com\/blog\/wp-json\/wp\/v2\/users\/1"}],"replies":[{"embeddable":true,"href":"https:\/\/putridparrot.com\/blog\/wp-json\/wp\/v2\/comments?post=10135"}],"version-history":[{"count":5,"href":"https:\/\/putridparrot.com\/blog\/wp-json\/wp\/v2\/posts\/10135\/revisions"}],"predecessor-version":[{"id":10154,"href":"https:\/\/putridparrot.com\/blog\/wp-json\/wp\/v2\/posts\/10135\/revisions\/10154"}],"wp:attachment":[{"href":"https:\/\/putridparrot.com\/blog\/wp-json\/wp\/v2\/media?parent=10135"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/putridparrot.com\/blog\/wp-json\/wp\/v2\/categories?post=10135"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/putridparrot.com\/blog\/wp-json\/wp\/v2\/tags?post=10135"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}