Aryan Singh's World

Data Science, Statistics, ML, Deep Learning

Comparing Different Queue Mechanisms

Recently, at work, I had to come up with a queuing system for inter-application data transfer, so I did a POC on the major queuing systems available in the market.

RabbitMQ 

RabbitMQ is a message broker that supports both publish/subscribe and point-to-point (p2p) messaging. It is built on top of the AMQP protocol. Some of the RabbitMQ jargon is as follows :

 

  • Producer : The producer is the RabbitMQ client that puts data into the queue. There may be multiple producers putting data into the same queue.
  • Exchange : The exchange is the mediator between the producer and the queue. Every message that is put on a queue is routed through an exchange. It works much like a telephone exchange connecting our line to the correct telephone, or an Internet Service Provider routing our messages to the correct host. Its various types are discussed below.
  • Channel : A channel is a lightweight virtual connection, multiplexed over a single TCP connection, through which a client (producer or consumer) talks to the broker. One connection can carry many channels, and consumers on multiple channels can read from the same queue.

 

Different Types Of Exchanges :

    • Direct Exchange : In a direct exchange, each message is published with a routing key and delivered to the queues whose binding key exactly matches that routing key. With RabbitMQ's default exchange, every queue is automatically bound using its own name, so there the routing key is simply the queue name.
    • Fanout Exchange : In a fanout exchange, every message is sent to all the queues bound to the exchange; the routing key is ignored.
    • Topic Exchange : In a topic exchange, queues are bound with patterns, and a message is delivered to every queue whose pattern matches its routing key. Routing keys and patterns are words separated by dots, and the patterns can use the following wildcards :
        • ‘*’ : Matches exactly one word. For instance, *.hello.* matches routing keys with exactly one word before hello and one word after it, such as say.hello.world. The message goes to the queues bound with such a pattern.
        • ‘#’ : Matches zero or more words. For instance, hello.# matches any routing key whose first word is hello, such as hello, hello.world or hello.big.world.
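To make the three routing rules concrete, here is a small in-memory Java sketch (Java 14+ for the switch expression). It is not the RabbitMQ client API: the class, the queue names and the one-binding-per-queue simplification are hypothetical, chosen only to show how a direct, fanout or topic exchange decides which queues receive a message.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ExchangeRoutingSketch {

    enum ExchangeType { DIRECT, FANOUT, TOPIC }

    // Returns the queues that receive a message with this routing key.
    // bindings maps queue name -> binding key (one binding per queue, for simplicity).
    static List<String> route(ExchangeType type, String routingKey, Map<String, String> bindings) {
        List<String> targets = new ArrayList<>();
        for (Map.Entry<String, String> b : bindings.entrySet()) {
            boolean matches = switch (type) {
                case DIRECT -> b.getValue().equals(routingKey);     // exact match only
                case FANOUT -> true;                                 // routing key is ignored
                case TOPIC -> topicMatches(b.getValue().split("\\."), 0, routingKey.split("\\."), 0);
            };
            if (matches) {
                targets.add(b.getKey());
            }
        }
        return targets;
    }

    // '*' matches exactly one word, '#' matches zero or more words.
    static boolean topicMatches(String[] pattern, int p, String[] words, int w) {
        if (p == pattern.length) {
            return w == words.length;
        }
        if (pattern[p].equals("#")) {
            // Let '#' swallow 0, 1, 2, ... words and see if the rest still matches.
            for (int k = w; k <= words.length; k++) {
                if (topicMatches(pattern, p + 1, words, k)) {
                    return true;
                }
            }
            return false;
        }
        if (w == words.length) {
            return false;
        }
        if (pattern[p].equals("*") || pattern[p].equals(words[w])) {
            return topicMatches(pattern, p + 1, words, w + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        Map<String, String> bindings = new LinkedHashMap<>();
        bindings.put("q.oneWordAround", "*.hello.*");
        bindings.put("q.helloPrefix", "hello.#");
        bindings.put("q.exact", "hello");

        System.out.println(route(ExchangeType.TOPIC, "say.hello.world", bindings)); // [q.oneWordAround]
        System.out.println(route(ExchangeType.TOPIC, "hello.big.world", bindings)); // [q.helloPrefix]
        System.out.println(route(ExchangeType.TOPIC, "hello", bindings));           // [q.helloPrefix, q.exact]
        System.out.println(route(ExchangeType.DIRECT, "hello", bindings));          // [q.exact]
        System.out.println(route(ExchangeType.FANOUT, "anything", bindings).size()); // 3
    }
}
```

Note that a topic message routed with plain hello matches both hello.# (because # may match zero words) and the exact binding hello, while say.hello.world only matches *.hello.*.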

I set up a cluster in a master/slave configuration with full synchronisation between the two copies of the queue. In RabbitMQ terms this is called queue mirroring.


I ran a test with 5 million messages of 1 KB each. Here are the results :

  • Configuration : Mirrored (master/slave)
  • Producer threads : 10
  • Consumer threads : 10
  • Producer channels : 10
  • Consumer channels : 1
  • Prefetch count : 5000
  • Sync : Fully synchronised
  • Messages : 5 million
  • Message size : 1 KB
  • Auto acknowledgement : FALSE
  • Time taken : 13 minutes 35 seconds

Kafka

 

Kafka provides highly scalable, persistent queues. It was developed at LinkedIn and is now an Apache project. It is easy to spawn multiple instances reading from and writing to the same queue, and its replication strategy and smooth failover between master and slave replicas are Kafka's USP. The time for which the broker retains data is configurable. The broker instances are coordinated by ZooKeeper, a workhorse coordination service from the Hadoop ecosystem. Kafka works on a publish/subscribe model: producers publish to a topic, and consumers subscribe to it and poll for messages. Messages are stored in flat (log) files on disk, and consumers request messages by offset.
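The log-and-offset model can be illustrated with a toy, in-memory Java sketch. It is not Kafka's API: the Topic class and its publish and poll methods are hypothetical. Each partition is an append-only list whose indices play the role of offsets, keyless messages are spread across partitions round-robin (the POC below configured a round-robin partitioner), and a consumer reads by telling the topic which offset to start from, so the topic itself keeps no per-consumer state.

```java
import java.util.ArrayList;
import java.util.List;

public class PartitionedLogSketch {

    // Toy topic: each partition is an append-only list, and a message's offset is its index.
    static final class Topic {
        private final List<List<String>> partitions = new ArrayList<>();
        private int nextPartition = 0; // round-robin cursor for messages published without a key

        Topic(int partitionCount) {
            for (int i = 0; i < partitionCount; i++) {
                partitions.add(new ArrayList<>());
            }
        }

        // Producer side: choose the next partition round-robin and append.
        // Returns {partition, offset} of the stored message.
        int[] publish(String message) {
            int p = nextPartition;
            nextPartition = (nextPartition + 1) % partitions.size();
            List<String> log = partitions.get(p);
            log.add(message);
            return new int[] {p, log.size() - 1};
        }

        // Consumer side: the caller supplies the offset to read from.
        List<String> poll(int partition, int fromOffset, int maxRecords) {
            List<String> log = partitions.get(partition);
            int start = Math.min(fromOffset, log.size());
            int end = Math.min(start + maxRecords, log.size());
            return new ArrayList<>(log.subList(start, end));
        }
    }

    public static void main(String[] args) {
        Topic topic = new Topic(4);
        for (int i = 0; i < 10; i++) {
            topic.publish("msg-" + i);
        }
        // Round-robin over 4 partitions puts msg-1, msg-5 and msg-9 in partition 1.
        System.out.println(topic.poll(1, 0, 100)); // [msg-1, msg-5, msg-9]
        // Asking again from offset 2 returns only what comes after it.
        System.out.println(topic.poll(1, 2, 100)); // [msg-9]
    }
}
```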

Architecture Used :

 

 

[Figure: Kafka design]

    


Features and Benchmarks :

  1. Ease of configuration : Pretty easy. Just set up ZooKeeper and Kafka and run their instances from the command line or a shell script. Then create a multi-partition topic and we are ready to go. Customizing network parameters such as num.io.threads requires changing the server configuration files, which needs a thorough understanding of the Kafka documentation.
  2. Insertion and retrieval time : I did my POC using a custom Notification object, and wrote a custom encoder and decoder for marshalling and unmarshalling the objects while writing to and reading from the Kafka cluster. Here are the insights from the analysis :
  • Number of partitions : 4, each with a replication factor of two.
  • Number of Kafka servers/brokers : 2. With 4 partitions × 2 replicas spread over 2 brokers, each server held 4 partition replicas.
  • Producer : Single threaded. The Kafka producer is thread safe; Kafka takes care of this internally.
  • Consumers : 1 consumer group with 4 consumers, equal to the number of partitions, as recommended by the Kafka folks.
  • Payload : 1.1 KB of custom JSON. A Java object was marshalled into JSON when writing to the queue, unmarshalled back into a Java object at the consumer end, and written to a flat text file as JSON using the Gson library.
  • Total number of records : 10 million
  • Writing from producer started at : Mon May 02 18:58:35 IST 2016
  • Writing from producer ended at : Mon May 02 19:38:04 IST 2016
  • Reading at consumer end and writing to file started at : Mon May 02 18:58:35 IST 2016
  • Reading at consumer end and writing to file ended at : Mon May 02 20:23:31 IST 2016
  • Time taken to write : ~40 minutes
  • Time taken to read at the consumer and write to a text file : ~83 minutes
  • Total data written : ~11 GB
  • Machine used : Core i5 with 8 GB of RAM.
  • CPU usage : 70% to 100%; RAM usage : 7.36 GB while simultaneous writes and reads were going on.
  • CPU usage : 44% to 52%; RAM usage : 6.22 GB when only writes were going on.
  • The rest of the parameters I configured can be seen in the attached source code.
  • Partitioner used : Round-robin partitioner.
  3. Fault Tolerance / Quality Of Service / Reliability : Load balancing and failover come out of the box with Apache Kafka. The number of retries can be configured in the producer configuration. I still need to explore the acknowledgement of messages from the consumer to the producer further. Multiple servers and partitions make the mechanism scalable as well as pretty reliable.
  4. Accessibility from different JVMs : Only a single dependency needs to be configured in pom.xml. Once that is done, just specify the host and port on which the ZooKeeper and Kafka instances are running, and they can be accessed from pretty much any JVM that can reach them over the network.
  5. Maintenance factor : Needs a little monitoring. Kafka generates a lot of logs, which have to be cleaned up or suppressed. On the other hand, this is also an advantage, as everything is logged well and every exception on each instance can be retrieved.
  6. Optimization : Kafka scales by increasing the number of partitions and spawning more servers. This is the USP of Apache Kafka.
  7. I need to go through the documentation thoroughly once more to understand the parameters, so as to achieve the best possible configuration if we use Kafka in production.
  8. It seems resource intensive while simultaneous reads and writes are going on. Once they end, it does not cause any lag or system slowness.
  9. User Interface : Kafka Tool is a freeware GUI which, used with Kafka, provides the following features :
  • Quickly view all your Kafka clusters, including their brokers, topics and consumers
  • View contents of messages in your partitions and add new messages
  • View offsets of the Kafka consumers, including Apache Storm Kafka spout consumers
  • Show JSON and XML messages in a pretty-printed format
  • Add and drop topics plus other management features
  • Save individual messages from your Kafka partitions to local hard drive
  • Kafka Tool runs on Windows, Linux and Mac OS
  10. Acknowledgement :
  • The first way is to get an acknowledgement at the producer end by using the Future object returned by send() and checking the metadata of the sent message.
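import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

// producer is the KafkaProducer<String, Notification> configured with the custom encoder;
// Notification is the POC's own message class. send() is asynchronous and returns a Future.
Future<RecordMetadata> ack = producer.send(new ProducerRecord<String, Notification>("part2", notification));

try {
    // get() blocks until the broker acknowledges the record (or the send fails).
    RecordMetadata m = ack.get();
    System.out.println("Message produced, offset: " + m.offset());
    System.out.println("Message produced, partition: " + m.partition());
    System.out.println("Message produced, topic: " + m.topic());
} catch (InterruptedException e) {
    Thread.currentThread().interrupt(); // restore the interrupt flag
} catch (ExecutionException e) {
    e.printStackTrace(); // the send failed; e.getCause() holds the underlying error
}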

 

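  • The second way is to set the producer acknowledgement setting (request.required.acks in the old producer, acks in the new Java producer) to 1 or -1. With 1, the partition leader acknowledges once it has written the message; with -1 (all), the leader waits until all in-sync replicas have the message before acknowledging the producer.
  • The Apache Kafka documentation is pretty clear about this. It is a bit long, but it is really important to read it through :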

 

Keeping track of what has been consumed, is, surprisingly, one of the key performance points of a messaging system.

Most messaging systems keep metadata about what messages have been consumed on the broker. That is, as a message is handed out to a consumer, the broker either records that fact locally immediately or it may wait for acknowledgement from the consumer. This is a fairly intuitive choice, and indeed for a single machine server it is not clear where else this state could go. Since the data structure used for storage in many messaging systems scale poorly, this is also a pragmatic choice–since the broker knows what is consumed it can immediately delete it, keeping the data size small.

What is perhaps not obvious, is that getting the broker and consumer to come into agreement about what has been consumed is not a trivial problem. If the broker records a message as consumed immediately every time it is handed out over the network, then if the consumer fails to process the message (say because it crashes or the request times out or whatever) that message will be lost. To solve this problem, many messaging systems add an acknowledgement feature which means that messages are only marked as sent not consumed when they are sent; the broker waits for a specific acknowledgement from the consumer to record the message as consumed. This strategy fixes the problem of losing messages, but creates new problems.

First of all, if the consumer processes the message but fails before it can send an acknowledgement then the message will be consumed twice.

The second problem is around performance, now the broker must keep multiple states about every single message (first to lock it so it is not given out a second time, and then to mark it as permanently consumed so that it can be removed).

Tricky problems must be dealt with, like what to do with messages that are sent but never acknowledged.

Kafka handles this differently. Our topic is divided into a set of totally ordered partitions, each of which is consumed by one consumer at any given time. This means that the position of consumer in each partition is just a single integer, the offset of the next message to consume. This makes the state about what has been consumed very small, just one number for each partition. This state can be periodically checkpointed. This makes the equivalent of message acknowledgements very cheap.

There is a side benefit of this decision. A consumer can deliberately rewind back to an old offset and re-consume data. This violates the common contract of a queue, but turns out to be an essential feature for many consumers. For example, if the consumer code has a bug and is discovered after some messages are consumed, the consumer can re-consume those messages once the bug is fixed.
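To make the offset idea from the quoted documentation concrete, here is a toy Java model. It is not the Kafka consumer API: PartitionConsumer and the committed map (standing in for wherever checkpoints are stored) are hypothetical. In a consumer group each partition is owned by one consumer at a time, which is why one integer per partition is all the state needed.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class OffsetCheckpointSketch {

    // Hypothetical checkpoint store: partition -> committed offset.
    static final Map<Integer, Integer> committed = new HashMap<>();

    static final class PartitionConsumer {
        private final int partition;
        private final List<String> log;   // the partition's messages; offset == list index
        private int position;             // offset of the next message to consume
        final List<String> processed = new ArrayList<>();

        PartitionConsumer(int partition, List<String> log) {
            this.partition = partition;
            this.log = log;
            this.position = committed.getOrDefault(partition, 0); // resume from last checkpoint
        }

        void consume(int maxRecords) {
            for (int i = 0; i < maxRecords && position < log.size(); i++) {
                processed.add(log.get(position++));
            }
        }

        // The equivalent of acknowledging everything so far: store one integer.
        void checkpoint() {
            committed.put(partition, position);
        }

        // Deliberately move back and re-consume, e.g. after fixing a bug.
        void rewind(int offset) {
            position = offset;
        }

        int position() {
            return position;
        }
    }

    public static void main(String[] args) {
        List<String> partition0 = List.of("m0", "m1", "m2", "m3", "m4");

        PartitionConsumer c = new PartitionConsumer(0, partition0);
        c.consume(3);      // processes m0, m1, m2
        c.checkpoint();    // committed offset for partition 0 is now 3
        c.consume(1);      // processes m3, then "crashes" before the next checkpoint

        // A replacement consumer resumes from the checkpoint, so m3 is delivered again.
        PartitionConsumer replacement = new PartitionConsumer(0, partition0);
        replacement.consume(10);
        System.out.println(replacement.processed); // [m3, m4]

        replacement.rewind(1);
        replacement.consume(10);
        System.out.println(replacement.processed); // [m3, m4, m1, m2, m3, m4]
    }
}
```

The run shows both sides of the design: messages processed after the last checkpoint are delivered again after a crash (at-least-once delivery), and rewinding is just moving one integer back.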

Final Statistics

All runs were made on the same machine : Core i5 (64-bit) with 8 GB RAM.

| Metric | Kafka (4 partitions) | Kafka (2 partitions) | Kafka (1 partition) | ActiveMQ (non-persistent) | ActiveMQ (persistent, customized settings) | RabbitMQ (non-persistent) | RabbitMQ (persistent) | RabbitMQ re-run (non-persistent) |
|---|---|---|---|---|---|---|---|---|
| Number of partitions | 4 | 2 | 1 | N/A | N/A | N/A | 1 channel | N/A |
| No. of brokers in cluster | - | 2 | 2 | N/A | N/A | N/A | 1 queue | N/A |
| Replication | - | 1 | 1 | N/A | N/A | N/A | No | N/A |
| Producer threads | 1 | 1 | 1 | 1 | 1 | 1 | 1 | 1 |
| Consumer threads | 4 | 4 | 4 | 1 | 1 | 1 | 1 | 1 |
| Payload | 1.1 KB | 1.1 KB | 1.1 KB | 1.0 KB | 1.0 KB | 1.0 KB | 1.0 KB | 1.0 KB |
| Total records | 10 million | 10 million | 10 million | 10 million | 10 million | 10 million | 10 million | 10 million |
| Time taken to write | ~40 min | ~20 min | ~15 min | ~25 min | ~45 min | ~39 min | ~30 min | ~2 h 20 min |
| Time taken to read | ~83 min | ~20 min | ~20 min | ~40 min | ~86 min | ~2 h 15 min | ~30 min | ~2 h 15 min |
| CPU usage (read/write) | 70% to 100% | 70% to 100% | 70% to 100% | 61% to 90% | 35% to 72% | 39% to 70% | 39% to 70% | 39% to 70% |
| CPU usage (write only) | 44% to 52% | 44% to 52% | 44% to 52% | 55% to 80% | 20% to 30% | 16% to 32% | 18% to 48% | 16% to 32% |
| RAM usage (read/write) | 7.36 GB | 7.36 GB | 7.36 GB | 6.59 GB | 6.52 GB | 4.45 GB | 3.77 GB | 4.45 GB |
| RAM usage (write only) | 6.22 GB | 6.22 GB | 6.22 GB | 6.50 GB | 6.10 GB | 4.17 GB | 4.60 GB | 4.17 GB |

Comments :
  • ActiveMQ (persistent, customized settings) : used the following attributes in conf/activemq.xml :
<persistenceAdapter>
    <kahaDB .. enableJournalDiskSyncs="false" preallocationStrategy="zeros"/>
</persistenceAdapter>
  • Used custom acknowledgements and failover handling on both the consumer and producer side.
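To compare the runs on a common scale, the approximate times can be converted into average messages per second. This is a minimal Java sketch of that arithmetic; the durations are the rounded (~) values from the table, so the rates are only rough averages, and at least in the Kafka run the reads overlapped the writes (both started at 18:58:35), so the read figure covers the whole test rather than reading alone.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ThroughputSketch {

    // Average messages per second for `records` messages processed in `minutes` minutes.
    static double messagesPerSecond(long records, double minutes) {
        return records / (minutes * 60.0);
    }

    public static void main(String[] args) {
        long records = 10_000_000L;

        // {write minutes, read minutes}, rounded values copied from the Final Statistics table.
        Map<String, double[]> runs = new LinkedHashMap<>();
        runs.put("Kafka (4 partitions)", new double[] {40, 83});
        runs.put("Kafka (2 partitions)", new double[] {20, 20});
        runs.put("Kafka (1 partition)", new double[] {15, 20});
        runs.put("ActiveMQ (non-persistent)", new double[] {25, 40});
        runs.put("ActiveMQ (persistent, customized)", new double[] {45, 86});
        runs.put("RabbitMQ (non-persistent)", new double[] {39, 135});
        runs.put("RabbitMQ (persistent)", new double[] {30, 30});
        runs.put("RabbitMQ re-run (non-persistent)", new double[] {140, 135});

        runs.forEach((name, t) -> System.out.printf("%-34s write ~%.0f msg/s, read ~%.0f msg/s%n",
                name, messagesPerSecond(records, t[0]), messagesPerSecond(records, t[1])));

        // The earlier mirrored RabbitMQ test: 5 million messages in 13 min 35 s.
        System.out.printf("RabbitMQ mirrored (5 million messages): ~%.0f msg/s%n",
                messagesPerSecond(5_000_000L, 13 + 35 / 60.0));
    }
}
```

For example, 10 million records written in ~40 minutes (2,400 s) averages about 4,167 messages per second, and the mirrored RabbitMQ test (5 million messages in 815 s) averages about 6,135 messages per second.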

Source Code :

Queue POCs

Verdict :

RabbitMQ and Kafka are the front runners among the persistent queue alternatives. I personally recommend RabbitMQ because of its great monitoring support. RabbitMQ is also less CPU intensive than Kafka.

Gloria : An IoT Enabled Chatbot Ecosystem.

Over the years, inanimate things talking has been an intriguing subject. Movies like Beauty and the Beast have added an uncanny whimsy around it. Five years ago, if somebody had told me that my fridge would talk to me one day and tell me when I need to buy milk and vegetables, I would have shrugged my shoulders and laughed it off too. But advances in artificial intelligence and natural language processing have brought this dream within reach. At the same time, developments in the Internet of Things have provided a medium of communication for all the devices surrounding us. Gloria is an ecosystem that lies at the crossroads of these three incredible phenomena. It combines AI, NLP and IoT into a compact system that lets you control everything around you with your voice.

 


Gloria Architecture and Flow

An Android application listens for voice commands from the user and converts them to text using the android.speech library. A server hosts a Mosquitto MQTT message broker and a Java application with an Artificial Intelligence Markup Language (AIML) component. The text command is transferred to the Java application via the MQTT broker. The AIML component processes the message, and the corresponding reply is sent back to the smartphone over the MQTT broker. The conversation is also saved into a Cassandra database for future reference. Depending on the reply received, the smartphone either calls a Google API to fetch the information requested by the user, such as weather and news, or controls one of the appliances via Bluetooth Low Energy and an Arduino circuit. Simultaneously, the Android application converts the textual reply to speech and conveys it back to the user. Thus Gloria helps users get the information they need and control their environment just by giving voice commands to a mobile application.
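The request/reply loop described above can be sketched in plain Java, with an in-memory broker standing in for Mosquitto and a few prefix rules standing in for the AIML component. Everything here is hypothetical and greatly simplified: the topic names gloria/commands and gloria/replies, the ACTION:... reply convention and the rules are illustrative only, not taken from Gloria's source, and the list stands in for the Cassandra table. Real MQTT clients and AIML interpreters differ in detail.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.function.Consumer;

public class GloriaFlowSketch {

    // Minimal in-memory stand-in for the MQTT broker: topic -> subscribers.
    static final class Broker {
        private final Map<String, List<Consumer<String>>> subscribers = new HashMap<>();

        void subscribe(String topic, Consumer<String> handler) {
            subscribers.computeIfAbsent(topic, t -> new ArrayList<>()).add(handler);
        }

        void publish(String topic, String payload) {
            subscribers.getOrDefault(topic, List.of()).forEach(h -> h.accept(payload));
        }
    }

    // Hypothetical AIML-like rules: pattern (optional trailing "*") -> reply template.
    static final Map<String, String> RULES = new LinkedHashMap<>();
    static {
        RULES.put("TURN ON THE *", "ACTION:BLE:ON:<star>");
        RULES.put("WHAT IS THE WEATHER*", "ACTION:FETCH:weather");
        RULES.put("HELLO*", "Hello, I am Gloria.");
    }

    static String reply(String text) {
        String input = text.trim().toUpperCase(Locale.ROOT);
        for (Map.Entry<String, String> rule : RULES.entrySet()) {
            String pattern = rule.getKey();
            if (pattern.endsWith("*")) {
                String prefix = pattern.substring(0, pattern.length() - 1);
                if (input.startsWith(prefix)) {
                    String star = input.substring(prefix.length()).trim().toLowerCase(Locale.ROOT);
                    return rule.getValue().replace("<star>", star);
                }
            } else if (input.equals(pattern)) {
                return rule.getValue();
            }
        }
        return "Sorry, I did not understand that.";
    }

    public static void main(String[] args) {
        Broker broker = new Broker();
        List<String> conversationLog = new ArrayList<>(); // stands in for the Cassandra table

        // Server side: answer each command, save the exchange, publish the reply.
        broker.subscribe("gloria/commands", command -> {
            String answer = reply(command);
            conversationLog.add(command + " => " + answer);
            broker.publish("gloria/replies", answer);
        });

        // Phone side: control an appliance, fetch information, or just speak the reply.
        broker.subscribe("gloria/replies", answer -> {
            if (answer.startsWith("ACTION:BLE:")) {
                System.out.println("Send to Arduino over BLE: " + answer);
            } else if (answer.startsWith("ACTION:FETCH:")) {
                System.out.println("Call a web API for: " + answer.substring("ACTION:FETCH:".length()));
            } else {
                System.out.println("Speak: " + answer);
            }
        });

        broker.publish("gloria/commands", "Turn on the fan");
        broker.publish("gloria/commands", "What is the weather today");
        broker.publish("gloria/commands", "Hello Gloria");
        System.out.println(conversationLog.size() + " exchanges saved");
    }
}
```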


Gloria, being an ecosystem, has various applications and use cases :

  1. Helping differently abled and elderly people control appliances without any hindrance.
  2. Helping retail business owners add a touch of personalization for their customers. A customer could ask for information like the store location, merchandise availability and various other FAQs, and get detailed answers in a human voice.
  3. Moreover, such devices could be placed in stores, where a customer, instead of looking for an employee, asks about the product he/she wants at one of the Android/iOS devices placed on a kiosk. The device not only points the customer to the exact location of the product but also tells them about the offers on it.
  4. Smart homes and offices are two of the major applications.
  5. Voice-controlled security solutions with theft monitoring.
  6. Setting reminders for meetings and alarms to wake up.
  7. Retrieving any information from the internet.
  8. Learning what the user tells it and then giving suggestions on that basis.
  9. Auto-replying to calls with customised messages.

Awards :

  1. Winner of Global POC Challenge 2016.


  2. Fourth most powerful smart city idea at Nasscom’s TechNgage Smart Cities Hackathon.


Source Code :

Gloria Source Code.

Electrack : Electricity Tracking Powered By DS and IoT

Electrack is a smart electricity management ecosystem which helps to track the electricity consumption, predict the future consumption and track down the losses and theft of electricity. It uses IoT paired with Big Data analytics to achieve this.

  1. A smart meter measures the units of electricity consumed and transfers them to the server using IoT. Based on this data, the server calculates the bill to be paid for the month and reports it to the user's mobile app. Users can also pay their bill through the mobile application.
  2. Apart from this, users can track their electricity consumption for the current as well as past months, and visualize their daily consumption based on real-time data.
  3. The USP of Electrack is its analytics algorithm, which predicts future consumption based on current and past usage. Another interesting feature is anomaly detection, which detects theft, electrical faults and wastage through real-time data analysis.
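The post does not describe Electrack's prediction and anomaly-detection algorithms, so the following Java sketch swaps in two deliberately simple stand-ins to show the idea: a moving-average forecast of the next day's units, and a rule that flags a reading more than k standard deviations away from the historical mean. The method names, the 3-day window, k = 3 and the readings are all hypothetical, not Electrack's actual method or data.

```java
import java.util.Arrays;

public class ConsumptionAnalyticsSketch {

    // Stand-in forecast: next day's units = mean of the last `window` daily readings.
    static double movingAverageForecast(double[] dailyUnits, int window) {
        int n = Math.min(window, dailyUnits.length);
        if (n <= 0) {
            throw new IllegalArgumentException("need at least one reading and window >= 1");
        }
        double sum = 0;
        for (int i = dailyUnits.length - n; i < dailyUnits.length; i++) {
            sum += dailyUnits[i];
        }
        return sum / n;
    }

    // Stand-in anomaly rule: flag a reading more than k standard deviations from the mean.
    static boolean isAnomaly(double[] history, double reading, double k) {
        double mean = Arrays.stream(history).average().orElse(0);
        double variance = Arrays.stream(history).map(x -> (x - mean) * (x - mean)).average().orElse(0);
        double std = Math.sqrt(variance);
        // With a perfectly flat history, any change at all is treated as unusual.
        return std > 0 ? Math.abs(reading - mean) > k * std : reading != mean;
    }

    public static void main(String[] args) {
        // Hypothetical daily consumption in units (kWh), not real Electrack data.
        double[] week = {8.0, 9.0, 7.5, 8.5, 9.0, 8.0, 8.5};
        System.out.printf("Forecast for tomorrow: %.2f units%n", movingAverageForecast(week, 3));
        System.out.println("12.0 units anomalous? " + isAnomaly(week, 12.0, 3.0)); // true
        System.out.println("8.7 units anomalous? " + isAnomaly(week, 8.7, 3.0));   // false
    }
}
```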

Architectural Flow Diagram :

 

[Figure: Electrack architecture]

Check out the source code at: https://github.com/aryancodify/Electrack

To run the app : Download the source code zip. It contains Electrack.apk; install it on the device.

Go to the sign-in screen and sign in with the following credentials :

User module : Username : U1, Password : electra1234
Admin module : Username : U3, Password : electra1234

P.S. The first time you install and run the app, it takes 30 seconds to 1 minute to start, as it fetches the location and does some background processing, so give it a minute. After the first run, it starts instantaneously.

Awards :

Honourable mention at Nasscom’s TechNgage hackathon.
