Kafka Sharp

The beginning

It all started with Fred George's µService Architecture presentation at Öredev (http://vimeo.com/79866979).

Watching the presentation gave me one of those moments when I think: This is COOL, where can I use this concept? It has to be an application that produces data continuously. Why not use it for my home data collection and control system? Well, it's not 250k messages per second, but it's a steady stream of data.

So now I had an application to test the concept on. I headed down to the cellar to find a box to run Linux on, because the Kafka server runs best on Linux. As Linux is my second operating system, I had to struggle a bit to get the Kafka service to actually run as a service together with the ZooKeeper service (the configuration service). I decided to go for the latest version at the time, 0.8, and that gave me some more work later on, but I will get back to that. Once everything is up and running, you can start a couple of PuTTY terminals and run commands to set up a topic, produce data and consume it:

kafka-create-topic.sh --zookeeper localhost:2181 --replica 1 --partition 1 --topic kafkatopic
kafka-console-consumer.sh --zookeeper localhost:2181 --topic kafkatopic --from-beginning

Then in another terminal:

kafka-console-producer.sh --broker-list localhost:9092 --topic kafkatopic

A C# client please

Well, this was exciting for a while, but there was a project to do. The data logger runs as a service written in C#, so we need a C# client to connect it to the bus. There is a Java client implementation bundled with the Kafka packages, and there are clients in several languages here.

But there is no C# client! I found one on GitHub, but it's written for version 0.7. No problem: I started with that code (thanks to that programmer!) and modified it to use the Kafka 0.8 protocol. Well, the changes between 0.7 and 0.8 were not minor! There is a good description of the protocol on the Apache Kafka wiki. As the work went on I did a lot of refactoring, and large parts of the code are new.

After getting all the code in place I managed to connect to the bus on the Linux box and both produce and consume messages from my C# client, but I hadn't found any way to create a new topic like kafka-create-topic.sh does. There is a configuration on the broker to make it auto-create a topic, but that didn't work. OK, I had the Java client running, so why not start up the old reliable Wireshark? After some digging in the TCP conversations I found out that the Java client made two requests for metadata on the topic before it produced to it. The first request returned an error saying that there is no leader broker for this topic, but according to the server log it also triggered topic creation on the leader broker. The next request then tells which broker is the leader and which partition the topic can be found on. After changing the producer to do that dance at startup, it worked just fine.
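
The dance can be sketched roughly like this. It is a minimal illustration and not the actual client code: ITopicMetadataSource, TopicMetadata, LeaderLookup and FakeBroker are hypothetical names made up for the example, and the retry count and delay are arbitrary. It assumes only what the Wireshark trace showed: the first metadata request may come back without a leader, and asking again returns one.

```csharp
using System;
using System.Threading;

// Hypothetical result of a topic metadata request (not a real client type).
public sealed class TopicMetadata
{
    public TopicMetadata(bool leaderAvailable, int leaderBrokerId, int partition)
    {
        LeaderAvailable = leaderAvailable;
        LeaderBrokerId = leaderBrokerId;
        Partition = partition;
    }

    public bool LeaderAvailable { get; private set; }
    public int LeaderBrokerId { get; private set; }
    public int Partition { get; private set; }
}

// Hypothetical abstraction over "send a metadata request for this topic".
public interface ITopicMetadataSource
{
    TopicMetadata RequestMetadata(string topic);
}

public static class LeaderLookup
{
    // Repeats the metadata request until a leader is reported,
    // or throws after maxAttempts requests.
    public static TopicMetadata WaitForLeader(
        ITopicMetadataSource source, string topic, int maxAttempts, TimeSpan delay)
    {
        for (var attempt = 1; attempt <= maxAttempts; attempt++)
        {
            var metadata = source.RequestMetadata(topic);
            if (metadata.LeaderAvailable)
            {
                return metadata;
            }
            // The request that came back without a leader is the one that
            // showed up as a topic creation in the server log, so wait a
            // little and ask again.
            if (attempt < maxAttempts)
            {
                Thread.Sleep(delay);
            }
        }
        throw new TimeoutException("No leader reported for topic '" + topic + "'.");
    }
}

// Stand-in for a real broker connection: the first request reports
// no leader, every later request reports broker 0, partition 0.
public sealed class FakeBroker : ITopicMetadataSource
{
    public int Requests { get; private set; }

    public TopicMetadata RequestMetadata(string topic)
    {
        Requests++;
        return Requests == 1
            ? new TopicMetadata(false, -1, -1)
            : new TopicMetadata(true, 0, 0);
    }
}
```

A producer would call something like LeaderLookup.WaitForLeader(connection, "kafkatopic", 3, TimeSpan.FromMilliseconds(500)) once at startup and only then send its first message to the leader it got back.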

First µ-service

At last it was time for a µservice! I decided to pick up data from sensors on the 1-Wire bus and put the data on the Kafka bus. Digging out the 1-Wire bus access code from the old logger (glad I wrote nicely structured code there) and whipping up some code to connect it to the Kafka bus resulted in less than 100 lines of code!

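using System;
using System.Threading;
using System.Web.Script.Serialization; // JavaScriptSerializer

// KafkaBusConnector, OwReader and KafkaOneWireData come from my own projects.
class Program
{
   private const string TopicName = "OneWireSensor";
   static readonly JavaScriptSerializer JavaScriptSerializer = new JavaScriptSerializer();
   static readonly KafkaBusConnector BusConnector = new KafkaBusConnector("192.168.0.105", 9092, "KafkaConsole");
   private const int UsbPortNumber = 1;

   // Called by the reader for every sensor reading: serialize to JSON and put it on the bus.
   static void HandleSensorData(KafkaOneWireData data)
   {
      var message = JavaScriptSerializer.Serialize(data);
      BusConnector.Produce(TopicName, -1, message);
   }

   static void Main(string[] args)
   {
      // Open the 1-Wire USB adapter and start delivering readings to the handler.
      var reader = new OwReader("{DS9490}", "USB" + UsbPortNumber);
      reader.Start(HandleSensorData);

      // Keep running until a key is pressed.
      while (!Console.KeyAvailable)
      {
         Thread.Sleep(100);
      }
      reader.Close();
   }
}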

The future

My plan is to build a new logging and control ecosystem based on a couple of µ-services that each handle a task like:

  • Read sensor data
  • Process data, e.g. averaging and filtering
  • React to a value change and send out a command on the bus
  • React to a command and turn on the floor heating
  • Collect data from the bus and store it in the RRD
  • Etc.
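
As an illustration of how small such a service can be, the averaging step could look something like this. It is only a sketch: MovingAverage and its window size are hypothetical, and the wiring to a Kafka consumer and producer is left out.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper: averages the most recent windowSize readings.
public sealed class MovingAverage
{
    private readonly int _windowSize;
    private readonly Queue<double> _window = new Queue<double>();
    private double _sum;

    public MovingAverage(int windowSize)
    {
        if (windowSize <= 0)
        {
            throw new ArgumentOutOfRangeException("windowSize");
        }
        _windowSize = windowSize;
    }

    // Adds a reading and returns the average of the current window.
    public double Add(double value)
    {
        _window.Enqueue(value);
        _sum += value;
        if (_window.Count > _windowSize)
        {
            // Drop the oldest reading once the window is full.
            _sum -= _window.Dequeue();
        }
        return _sum / _window.Count;
    }
}
```

A filtering service would feed every value it consumes from the sensor topic through Add and produce the returned average on a topic of its own.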

To be continued…