Kafka is a high-performance publish-subscribe messaging system, implemented as a distributed commit log. Message streams are organized into topics, and topics are broken down further into partitions.
The system is suitable for real-time applications and uses ZooKeeper as its strong consistency provider. The high-level APIs are fairly easy to use but often misunderstood. I spent some time trying to figure out why the existing libraries did not work the way I expected; after a while I realized it was better to write a very simple library that wraps just the functionality I need.
Shovel is a minimal wrapper around the Kafka client APIs. The code is mostly documented and type hinted for better performance. By default Kafka tolerates service outages on the consumer side: the consumer resumes operation when the broker comes back. The producer, however, simply throws a connection refused exception. This behavior lets us consume messages with a simple blocking stream that blocks execution when there are no new messages or the broker is down. Let's have a closer look at how the consumer works.
The Kafka consumer consists of a few things. First we need a ConsumerConnector to connect to the broker. I use a hash map for the configuration and convert it to java.util.Properties to create a ConsumerConfig, from which the ConsumerConnector is created and returned.
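A minimal sketch of what this can look like, assuming Kafka's old high-level consumer API; the helper name and the config values are illustrative, not necessarily Shovel's actual API:

```clojure
(ns shovel-example.consumer
  (:import [java.util Properties]
           [kafka.consumer Consumer ConsumerConfig]))

;; hypothetical helper: convert a Clojure map of strings
;; into java.util.Properties
(defn hashmap->properties ^Properties [m]
  (doto (Properties.)
    (.putAll m)))

;; build a ConsumerConnector from a plain config map
(defn consumer-connector [conf]
  (Consumer/createJavaConsumerConnector
    (ConsumerConfig. (hashmap->properties conf))))

;; example configuration (values are illustrative)
(def config
  {"zookeeper.connect" "localhost:2181"
   "group.id"          "shovel-example"
   "auto.offset.reset" "smallest"})
```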
The ConsumerConnector can then be used to get the message streams (a java.util.ArrayList).
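Roughly like this — createMessageStreams takes a map of topic name to stream count and returns a map of topic name to a list of KafkaStreams (the function name is mine, for illustration):

```clojure
;; ask the connector for a given number of streams for one topic,
;; returning the list of KafkaStreams for that topic
(defn message-streams
  [^kafka.javaapi.consumer.ConsumerConnector connector topic stream-count]
  (.get (.createMessageStreams connector {topic (int stream-count)})
        topic))
```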
The message streams are then consumed by a simple iterator. There is the Kafka way of doing this, but also a more idiomatic way in Clojure; the latter is how I implemented it.
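The contrast looks roughly like this: the Kafka way drives the iterator by hand, while the Clojure way wraps the same blocking iterator in a lazy seq with iterator-seq (the process function is a placeholder):

```clojure
;; the Kafka way: explicit, hand-driven iterator
;; (let [it (.iterator ^kafka.consumer.KafkaStream stream)]
;;   (while (.hasNext it)
;;     (process (.next it))))

;; the idiomatic Clojure way: expose the blocking iterator as a seq
(defn stream-seq [^kafka.consumer.KafkaStream stream]
  (iterator-seq (.iterator stream)))

;; consuming it blocks when there are no new messages:
;; (doseq [message (stream-seq stream)]
;;   (process message))
```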
Each message is a kafka.message.MessageAndMetadata. The key and the message are byte arrays that can easily be converted to strings.
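One way to process such a message is to pull its fields into a plain map, converting the byte arrays to strings along the way (a sketch; the key may be nil when messages are sent without one):

```clojure
;; turn a MessageAndMetadata into a Clojure map, decoding the
;; key and message byte arrays into strings
(defn message->map [^kafka.message.MessageAndMetadata m]
  {:topic     (.topic m)
   :partition (.partition m)
   :offset    (.offset m)
   :key       (when-let [k (.key m)] (String. ^bytes k))
   :message   (String. ^bytes (.message m))})
```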
A Kafka producer is similar to a consumer: there is a producer connector that can be used to send messages.
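A sketch with the same shape as the consumer side, using the old kafka.javaapi.producer API; function names and the broker address are illustrative:

```clojure
(ns shovel-example.producer
  (:import [java.util Properties]
           [kafka.javaapi.producer Producer]
           [kafka.producer KeyedMessage ProducerConfig]))

;; build a Producer from a plain config map
(defn producer-connector [conf]
  (Producer. (ProducerConfig. (doto (Properties.) (.putAll conf)))))

;; send a single message (no key) to a topic
(defn send-message [^Producer producer ^String topic ^String message]
  (.send producer (KeyedMessage. topic (.getBytes message))))

;; example configuration (values are illustrative)
(def config
  {"metadata.broker.list" "localhost:9092"
   "serializer.class"     "kafka.serializer.DefaultEncoder"})
```

Note that, unlike the consumer, this throws a connection refused exception if the broker is down, which is the asymmetry described above.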
The kudos go to @nikore.