Sinking Kafka Topic to MongoDB using Pymongo
Introduction
One way to store non-relational data, such as tweets from Twitter, is to use a non-relational database such as MongoDB or Cassandra. In this article, the author demonstrates how data received by Apache Kafka can be saved to MongoDB with the help of the pymongo package.
Prerequisites
· Apache Kafka
· Jupyter Notebook
· pymongo
Getting Started
Before starting the integration between Kafka and MongoDB, install pymongo, for example by running pip install pymongo.
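Start by running the Apache Kafka installation on your system. To do this, open an Ubuntu terminal, change into the Kafka folder, and start ZooKeeper (with the standard Kafka distribution: bin/zookeeper-server-start.sh config/zookeeper.properties).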
Then, in a second terminal tab, start the Kafka server from the same Kafka folder (bin/kafka-server-start.sh config/server.properties).
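Next, create a Kafka topic. This tutorial uses a topic called pizza-orders, which, assuming the broker listens on the default localhost:9092, can be created with bin/kafka-topics.sh --create --topic pizza-orders --bootstrap-server localhost:9092.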
Once the topic exists, run the Python producer program that the author explained on this page. It generates artificial data and publishes it to the Kafka topic created above.
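The author's producer is not reproduced here. For readers who want something to test with, the following is a minimal, hypothetical stand-in that builds orders containing the five fields the consumer later reads (name, shop, phoneNumber, address, pizzas). Every sample value, and the choice of a plain list of pizza names for pizzas, is an assumption rather than the author's data format.

```python
import json
import random

# Hypothetical sample values; the author's producer may generate different data.
NAMES = ["Alice Smith", "Budi Santoso", "Carla Gomez"]
SHOPS = ["Mario's Pizza", "Luigi's Slices"]
PIZZAS = ["Margherita", "Marinara", "Diavola"]


def make_fake_order(rng: random.Random) -> dict:
    """Build one artificial pizza order with the fields the consumer expects."""
    return {
        "name": rng.choice(NAMES),
        "shop": rng.choice(SHOPS),
        "phoneNumber": f"555-{rng.randint(1000, 9999)}",
        "address": f"{rng.randint(1, 200)} Example Street",
        # Assumed shape: a list of one to three pizza names.
        "pizzas": [rng.choice(PIZZAS) for _ in range(rng.randint(1, 3))],
    }


def serialize(order: dict) -> bytes:
    """Encode an order as UTF-8 JSON bytes, the form sent to Kafka."""
    return json.dumps(order).encode("utf-8")


if __name__ == "__main__":
    rng = random.Random(42)  # fixed seed so repeated runs print the same orders
    for _ in range(3):
        print(serialize(make_fake_order(rng)))
```

With kafka-python installed, each payload could then be published with KafkaProducer(bootstrap_servers="localhost:9092").send("pizza-orders", serialize(order)), followed by a call to flush() so buffered messages are actually sent.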
Then, start MongoDB and MongoDB Compass, which should already be installed on your system. Installation instructions are available here for MongoDB and here for MongoDB Compass.
Apache Kafka and MongoDB integration with Pymongo
Next, create a new Jupyter notebook and fill it in with the code below.
The first code block imports the modules Python needs to act as an intermediary between Kafka and MongoDB. MongoClient is the pymongo class that connects Python to the MongoDB server, while KafkaConsumer is the class that lets Python read the data published to a Kafka topic.
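As a rough sketch of what this setup block might look like, the snippet below creates both connections. It assumes a Kafka broker on localhost:9092, MongoDB on its default port 27017, and that KafkaConsumer comes from the kafka-python package; the database name pizza_db, collection name orders, and consumer group mongodb-sink are hypothetical placeholders. The imports sit inside the function only so the snippet can be loaded without either package installed; in the notebook they would normally go at the top of the cell.

```python
# Connection settings assumed for a local, single-machine setup.
KAFKA_BOOTSTRAP = "localhost:9092"
MONGO_URI = "mongodb://localhost:27017/"
TOPIC = "pizza-orders"
# Hypothetical database and collection names; the original does not give them.
DB_NAME = "pizza_db"
COLLECTION_NAME = "orders"


def open_connections():
    """Return a (KafkaConsumer, pymongo Collection) pair for the pizza-orders topic."""
    # Imported lazily so this module loads even without the packages installed.
    from kafka import KafkaConsumer  # kafka-python package (assumed)
    from pymongo import MongoClient

    consumer = KafkaConsumer(
        TOPIC,
        bootstrap_servers=KAFKA_BOOTSTRAP,
        # Where to start reading when this group has no committed offset yet.
        auto_offset_reset="earliest",
        # Hypothetical group name; committed offsets let a rerun resume
        # instead of re-reading (and re-inserting) every old message.
        group_id="mongodb-sink",
    )
    # The constructor returns immediately; connection errors surface on the
    # first database operation.
    client = MongoClient(MONGO_URI)
    collection = client[DB_NAME][COLLECTION_NAME]
    return consumer, collection
```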
The second code block receives the messages published to the Kafka topic. Each message is parsed from JSON, and the name, shop, phoneNumber, address, and pizzas fields are extracted from the received data. These values are then assembled into a dictionary and inserted into MongoDB as a document.
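A minimal sketch of this parse-and-insert step follows. It assumes each message value is the raw UTF-8 JSON sent by the producer and keeps only the five fields named above; to_document and sink are hypothetical helper names, not the author's code. Passing the collection in as a parameter lets the loop be exercised with a stand-in object instead of a live database.

```python
import json
from typing import Any, Iterable

# Fields named in the article; anything else in a message is dropped.
FIELDS = ("name", "shop", "phoneNumber", "address", "pizzas")


def to_document(raw_value: bytes) -> dict:
    """Parse one Kafka message value (JSON bytes) into a MongoDB document."""
    order = json.loads(raw_value)
    # .get() stores None for a missing field instead of raising KeyError.
    return {field: order.get(field) for field in FIELDS}


def sink(messages: Iterable[Any], collection) -> int:
    """Insert every message into `collection` and return how many were written.

    `messages` should yield records whose `.value` holds raw bytes, as a
    kafka-python KafkaConsumer does when no value_deserializer is set.
    `collection` only needs an `insert_one` method, e.g. a pymongo Collection.
    """
    inserted = 0
    for message in messages:
        collection.insert_one(to_document(message.value))
        inserted += 1
    return inserted
```

In the notebook, messages would be a KafkaConsumer subscribed to pizza-orders and collection a pymongo collection such as MongoClient("mongodb://localhost:27017/")["pizza_db"]["orders"] (hypothetical names). Because iterating a KafkaConsumer waits indefinitely for new records, sink keeps running until the cell is interrupted, unless the consumer is created with consumer_timeout_ms, which ends the iteration after that many milliseconds without a message.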
Happy coding :)