Recently I had a problem at work: come up with a queuing system for inter-application data transfer. So I did a POC on the major queuing systems available in the market.

RabbitMQ 

RabbitMQ is a queuing system supporting both publish/subscribe and point-to-point (P2P) messaging. RabbitMQ works on top of the AMQP protocol. Some of the jargon related to RabbitMQ is as follows :

 

  • Producer : The producer is the RabbitMQ client that puts data onto the queue. There may be multiple producers putting data onto the same queue.
  • Exchange : The exchange is the mediator between the producer and the queue. Every message that is put on the queue is routed through an exchange. It's just like a telephone exchange or an Internet Service Provider routing our message to the correct host or connecting our line to the correct telephone. Its various types are discussed later.
  • Channel : A channel is a lightweight virtual connection multiplexed over a single TCP connection between the client and the broker; publishing and consuming happen over channels. Multiple channels can be open on the same connection, and multiple channels can feed the same queue. (A minimal code sketch of these pieces follows.)
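To make the jargon concrete, here is a minimal producer sketch using the official RabbitMQ Java client (amqp-client). The host, queue name, and payload are assumptions for illustration; publishing to the default (empty-name) exchange routes a message to the queue whose name equals the routing key.

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class MinimalProducer {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost"); // assumed broker host

        Connection connection = factory.newConnection(); // one TCP connection
        Channel channel = connection.createChannel();    // one lightweight channel on it

        // durable = true, exclusive = false, autoDelete = false
        channel.queueDeclare("demo-queue", true, false, false, null);

        // The default exchange "" routes on routing key == queue name
        channel.basicPublish("", "demo-queue", null, "hello".getBytes("UTF-8"));

        channel.close();
        connection.close();
    }
}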

 

Different Types of Exchanges :

    • Direct : A direct exchange is one in which messages are directed to the correct queue on the basis of the routing key. The message is published along with a routing key and is routed to the queue whose binding key matches that routing key.
    • Fanout Exchange : In a fanout exchange, all messages are sent to all the queues bound to the exchange.

 

  • Topic Exchange : In a topic exchange one can decide which queues a message is delivered to by using wildcards in the binding keys, which work as follows (a code sketch follows this list) :

 

    • ‘*’ : exactly one word. For instance, *.hello.* means one word before hello and one word after hello; a message whose routing key fits this pattern goes to the queues bound with it.
    • ‘#’ : any number of words. For instance, hello.# means any number of words after hello, which implies that hello must be the first word in the routing key.
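Here is a minimal, illustrative sketch of topic-exchange bindings with the Java client; the exchange name, queue names, and binding keys are assumptions, not taken from the POC.

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class TopicExchangeDemo {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost"); // assumed broker host
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare("demo-topic", "topic");
        channel.queueDeclare("hello-middle", false, false, false, null);
        channel.queueDeclare("hello-first", false, false, false, null);

        // '*' matches exactly one word; '#' matches zero or more words
        channel.queueBind("hello-middle", "demo-topic", "*.hello.*");
        channel.queueBind("hello-first", "demo-topic", "hello.#");

        // Matches "*.hello.*" only: one word before hello, one after
        channel.basicPublish("demo-topic", "uk.hello.eu", null, "m1".getBytes("UTF-8"));
        // Matches "hello.#" only: hello is the first word
        channel.basicPublish("demo-topic", "hello.world.again", null, "m2".getBytes("UTF-8"));

        channel.close();
        connection.close();
    }
}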

I set up a cluster in a master/slave a.k.a. active/active configuration with full synchronization between both queues. The jargon for this in RabbitMQ terms is mirroring of a queue.

[Screenshot: RabbitMQ management UI]

I ran a test with 5 million messages of 1 KB each. Here are the metrics for the run :

  • Configuration : Mirrored (Master/Slave)
  • Producer Threads : 10
  • Consumer Threads : 10
  • Producer Channels : 10
  • Consumer Channels : 1
  • Time Taken : 13 minutes 35 seconds
  • Prefetch Count : 5000
  • Sync. : Full
  • Messages : 5 million
  • Size : 1 KB
  • Auto Acknowledgement : FALSE
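The prefetch count and manual acknowledgements from the run above map onto basicQos and basicAck in the Java client. A minimal consumer sketch, with the queue name assumed:

import java.io.IOException;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class PrefetchConsumer {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost"); // assumed broker host
        Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();

        // Prefetch count 5000: at most 5000 unacknowledged messages in flight per consumer
        channel.basicQos(5000);

        // autoAck = false: acknowledge explicitly after processing each message
        channel.basicConsume("demo-queue", false, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                // ... process body here, then acknowledge this single delivery
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });
    }
}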

Kafka

 

Kafka offers highly scalable persistent queues, developed at LinkedIn and now powered by Apache. It is easy to spawn multiple instances reading from and writing to the same queue. Replication strategies and smooth master/slave transitions are a USP of Kafka. The time to keep data in the broker is configurable. The multiple instances are managed by ZooKeeper, the workhorse instance manager of the Hadoop stack. Kafka works on a publish/subscribe model, with producers publishing to a topic and consumers subscribing to and polling from it. It stores the messages in flat files, and consumers ask for messages based on an offset.
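To illustrate the subscribe-and-poll model, here is a minimal consumer sketch using the Kafka Java client; the broker address, group id, and topic name are assumptions.

import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class MinimalConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker
        props.put("group.id", "demo-group");              // assumed consumer group
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("demo-topic")); // assumed topic
        try {
            while (true) {
                // Each poll returns the records after this consumer's current offsets
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.offset() + " : " + record.value());
                }
            }
        } finally {
            consumer.close();
        }
    }
}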

Architecture Used :

 

 

[Diagram: Kafka architecture used in the POC]

    

Video Demonstration : [embedded video]

Features and Benchmarks :

  1. Ease of configuration : Pretty easy. Just set up ZooKeeper and Kafka and run their instances from the command line or a shell. Then set up a multi-partitioned topic and we are ready to go. If one has to customize and tweak network parameters like io.threads, the server configuration files need to be changed, which requires a thorough understanding of the Kafka documentation.
  2. Insertion and retrieval time : I did my POC using a custom Notification object, writing a custom encoder and decoder for marshalling and unmarshalling the objects while writing to and reading from the Kafka cluster (a sketch of such an encoder/decoder follows this list). Here are the insights from the analysis :
  • Number of partitions : 4, each with a replication factor of 2.
  • Number of Kafka servers/brokers : 2, so each server had 4 partitions.
  • Producer : single threaded. The producer is thread safe; Kafka takes care of this internally.
  • Consumers : 1 consumer group with 4 consumers, which equals the number of partitions, as recommended by the Kafka folks.
  • Payload : 1.1 KB of custom JSON, read as a Java object, marshalled into JSON when writing to the queue, then unmarshalled into a Java object at the consumer end and written to a flat text file as JSON using the Gson library.
  • Total number of records : 10 million.
  • Writing from producer started at : Mon May 02 18:58:35 IST 2016
  • Writing from producer ended at : Mon May 02 19:38:04 IST 2016
  • Reading at consumer end and writing to file started at : Mon May 02 18:58:35 IST 2016
  • Reading at consumer end and writing to file ended at : Mon May 02 20:23:31 IST 2016
  • Time taken to write : ~40 minutes
  • Time taken to read at consumer end and write to a text file : ~83 minutes
  • Total data written : ~11 GB
  • Machine used : Core i5 with 8 GB of RAM
  • CPU usage : 70% to 100%, RAM usage : 7.36 GB while the simultaneous write and read were going on.
  • CPU usage : 44% to 52%, RAM usage : 6.22 GB while only the write operation was going on.
  • The rest of the params I configured can be seen in the attached source code.
  • Partitioner used : round-robin partitioner.
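For reference, here is a minimal sketch of what such a Gson-based encoder/decoder can look like against Kafka's Serializer/Deserializer interfaces. The Notification class and its fields here are placeholders standing in for my custom payload, not the POC code itself.

import java.nio.charset.StandardCharsets;
import java.util.Map;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import com.google.gson.Gson;

// Placeholder payload type; the real POC object has its own fields
class Notification {
    String id;
    String message;
}

// Marshals Notification objects into JSON bytes on the way into Kafka
class NotificationSerializer implements Serializer<Notification> {
    private final Gson gson = new Gson();

    @Override public void configure(Map<String, ?> configs, boolean isKey) { }

    @Override
    public byte[] serialize(String topic, Notification data) {
        return gson.toJson(data).getBytes(StandardCharsets.UTF_8);
    }

    @Override public void close() { }
}

// Unmarshals JSON bytes back into Notification objects on the way out
class NotificationDeserializer implements Deserializer<Notification> {
    private final Gson gson = new Gson();

    @Override public void configure(Map<String, ?> configs, boolean isKey) { }

    @Override
    public Notification deserialize(String topic, byte[] data) {
        return gson.fromJson(new String(data, StandardCharsets.UTF_8), Notification.class);
    }

    @Override public void close() { }
}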
  3. Fault tolerance / quality of service / reliability : Load balancing and failover mechanisms come out of the box with Apache Kafka. The number of retries and the retry limit can be configured in the producer configuration. I still need to explore more on the acknowledgement of messages from the consumer to the producer. Multiple servers and partitions make the mechanism scalable as well as pretty reliable.
  4. Accessibility from different JVMs : Need to configure a single dependency in pom.xml. Once done, just mention the port and host on which the ZooKeeper and Kafka instances are running, and then these can be accessed from pretty much any JVM in the world.
  5. Maintenance factor : Needs a little monitoring. Kafka generates a lot of logs, which have to be cleaned or suppressed. On the other hand this is also an advantage, as everything is logged well and every exception on each instance can be retrieved.
  6. Optimization : Can be made incredibly fast by increasing the number of partitions and spawning more servers; this is the USP of Apache Kafka.
  7. Need to go through the documentation thoroughly once more to understand the parameters, so as to achieve the best possible configuration if we use Kafka for production purposes.
  8. Seems resource intensive while simultaneous reads and writes are going on. Once they end, there is no lag or system slowness.
  9. User interface : Kafka Tool is a package available as freeware which, when used with Kafka, provides the following features :
  • Quickly view all your Kafka clusters, including their brokers, topics and consumers
  • View contents of messages in your partitions and add new messages
  • View offsets of the Kafka consumers, including Apache Storm Kafka spout consumers
  • Show JSON and XML messages in a pretty-printed format
  • Add and drop topics plus other management features
  • Save individual messages from your Kafka partitions to local hard drive
  • Kafka Tool runs on Windows, Linux and Mac OS
  10. Acknowledgement :
  • The first way is to get an acknowledgement at the producer end by using the Future object returned by send() and checking the metadata of the sent message :
// send() returns a Future; blocking on get() confirms the write and exposes its metadata
Future<RecordMetadata> ack = producer.send(new ProducerRecord<String, Notification>("part2", notification));
try {
    RecordMetadata m = ack.get();
    System.out.println("Message produced, offset: " + m.offset());
    System.out.println("Message produced, partition: " + m.partition());
    System.out.println("Message produced, topic: " + m.topic());
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}

 

  • The second is to set request.required.acks = 1 (acknowledgement from one replica) or -1 (from all replicas), so that each broker notifies the producer of successful delivery.
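As a sketch, this is roughly how those reliability knobs are wired up with the newer Java producer, where acks plays the role of request.required.acks; the broker address is an assumption, and NotificationSerializer is the placeholder class sketched earlier.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092"); // assumed broker
props.put("acks", "all"); // "1" = leader only; "all" (i.e. -1) = every in-sync replica must confirm
props.put("retries", 3);  // resend on transient failures up to this many times
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "NotificationSerializer"); // placeholder class from the earlier sketch

KafkaProducer<String, Notification> producer = new KafkaProducer<>(props);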
  • The Apache Kafka documentation on this point is pretty clear. It's a bit long, but really important to read through, guys :

 

Keeping track of what has been consumed is, surprisingly, one of the key performance points of a messaging system.

Most messaging systems keep metadata about what messages have been consumed on the broker. That is, as a message is handed out to a consumer, the broker either records that fact locally immediately or it may wait for acknowledgement from the consumer. This is a fairly intuitive choice, and indeed for a single machine server it is not clear where else this state could go. Since the data structures used for storage in many messaging systems scale poorly, this is also a pragmatic choice: since the broker knows what is consumed, it can immediately delete it, keeping the data size small.

What is perhaps not obvious, is that getting the broker and consumer to come into agreement about what has been consumed is not a trivial problem. If the broker records a message as consumed immediately every time it is handed out over the network, then if the consumer fails to process the message (say because it crashes or the request times out or whatever) that message will be lost. To solve this problem, many messaging systems add an acknowledgement feature which means that messages are only marked as sent not consumed when they are sent; the broker waits for a specific acknowledgement from the consumer to record the message as consumed. This strategy fixes the problem of losing messages, but creates new problems.

First of all, if the consumer processes the message but fails before it can send an acknowledgement then the message will be consumed twice.

The second problem is around performance, now the broker must keep multiple states about every single message (first to lock it so it is not given out a second time, and then to mark it as permanently consumed so that it can be removed).

Tricky problems must be dealt with, like what to do with messages that are sent but never acknowledged.

Kafka handles this differently. Our topic is divided into a set of totally ordered partitions, each of which is consumed by one consumer at any given time. This means that the position of a consumer in each partition is just a single integer, the offset of the next message to consume. This makes the state about what has been consumed very small, just one number for each partition. This state can be periodically checkpointed. This makes the equivalent of message acknowledgements very cheap.

There is a side benefit of this decision. A consumer can deliberately rewind back to an old offset and re-consume data. This violates the common contract of a queue, but turns out to be an essential feature for many consumers. For example, if the consumer code has a bug and is discovered after some messages are consumed, the consumer can re-consume those messages once the bug is fixed.
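The rewind described above maps onto the consumer's seek API. A minimal sketch, assuming a topic named demo-topic with partition 0 and a manually assigned consumer:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092"); // assumed broker
props.put("group.id", "replay-group");            // assumed group
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
TopicPartition partition = new TopicPartition("demo-topic", 0);
consumer.assign(Collections.singletonList(partition)); // manual assignment, no group rebalancing
consumer.seek(partition, 0L); // rewind to an earlier offset (0 here for illustration)

// The next poll re-delivers messages from the rewound offset onwards
ConsumerRecords<String, String> replayed = consumer.poll(1000);
for (ConsumerRecord<String, String> record : replayed) {
    System.out.println("replayed offset " + record.offset() + " : " + record.value());
}
consumer.close();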

Final Statistics

|  | Kafka (4 partitions) | Kafka (2 partitions) | Kafka (1 partition) | ActiveMQ (Non-Persistent) | ActiveMQ (Persistent, customized settings) | RabbitMQ (Non-Persistent) | RabbitMQ (Persistent) | RabbitMQ Re-run (Non-Persistent) |
|---|---|---|---|---|---|---|---|---|
| Number of Partitions | 4 | 2 | 1 | N/A | N/A | N/A | 1 channel | N/A |
| No. of Brokers in Cluster | — | 2 | 2 | N/A | N/A | N/A | 1 queue | N/A |
| Replication | — | 1 | 1 | N/A | N/A | N/A | No | N/A |
| Producer Threads | 1 | 1 | 1 | 1 | 1 | 1 | 1 | 1 |
| Consumer Threads | 4 | 4 | 4 | 1 | 1 | 1 | 1 | 1 |
| Payload | 1.1 KB | 1.1 KB | 1.1 KB | 1.0 KB | 1.0 KB | 1.0 KB | 1.0 KB | 1.0 KB |
| Total Records | 10 million | 10 million | 10 million | 10 million | 10 million | 10 million | 10 million | 10 million |
| Time taken to write | ~40 minutes | ~20 minutes | ~15 minutes | ~25 minutes | ~45 minutes | ~39 minutes | ~30 minutes | ~2 hours 20 minutes |
| Time taken to read | ~83 minutes | ~20 minutes | ~20 minutes | ~40 minutes | ~86 minutes | ~2 hours 15 minutes | ~30 minutes | ~2 hours 15 minutes |
| CPU usage (read/write) | 70% to 100% | 70% to 100% | 70% to 100% | 61% to 90% | 35% to 72% | 39% to 70% | 39% to 70% | 39% to 70% |
| CPU usage (write only) | 44% to 52% | 44% to 52% | 44% to 52% | 55% to 80% | 20% to 30% | 16% to 32% | 18% to 48% | 16% to 32% |
| RAM usage (read/write) | 7.36 GB | 7.36 GB | 7.36 GB | 6.59 GB | 6.52 GB | 4.45 GB | 3.77 GB | 4.45 GB |
| RAM usage (write only) | 6.22 GB | 6.22 GB | 6.22 GB | 6.50 GB | 6.10 GB | 4.17 GB | 4.60 GB | 4.17 GB |

Comments :

  • ActiveMQ (Persistent, customized settings) : used the following attributes in conf/activemq.xml :

    <persistenceAdapter>
        <kahaDB ... enableJournalDiskSyncs="false" preallocationStrategy="zeros"/>
    </persistenceAdapter>

  • RabbitMQ Re-run : used custom acknowledgements and failover handling at both the consumer and the producer side.

System info and performance : every run used a Core i5 64-bit machine with 8 GB of RAM.

Source Code :

Queue POCs

Verdict :

While RabbitMQ and Kafka are the front runners among the persistent queue alternatives, I personally recommend RabbitMQ due to its great monitoring support. RabbitMQ is also less CPU intensive as compared to Kafka.