Kafka Python Tutorial for Fast Data Architecture
In this Kafka Python tutorial, we will create a Python application that will publish data to a Kafka topic and another app that will consume the messages.
Jul. 08, 18 · Big Data Zone
Fast Data Series Articles
Kafka Tutorial for Fast Data Architecture
Kafka Python Tutorial for Fast Data Architecture
This is the third article in my Fast Data Architecture series that walks you through implementing Big Data using a SMACK Stack. This article builds on the others, so if you have not read through those, I highly suggest you do, so that you have the infrastructure you need to follow along in this tutorial.
This article will walk you through pulling website metrics from Clicky.com. I have another article where we pull metrics from Google Analytics and publish them to Apache Kafka: Kafka Python and Google Analytics.
In order to demonstrate how to analyze your big data, we will be configuring a big data pipeline that will pull site metrics from Clicky.com and push those metrics to a Kafka topic on our Kafka Cluster.
This is just one pipeline that you might want to implement in your Big Data implementation. Website statistics can be a valuable part of your data, since they tell you about your website visitors, the pages they visited, and so on. Combine this data with other data, like social media shares, when you perform your data analytics, and you could make some pretty neat business decisions about when the best time is to post site updates to social media in order to attract the most visitors. That is the main benefit of implementing big data: not necessarily the raw data itself, but the knowledge you can extract from that raw data in order to make more informed decisions.
In this example, we will pull the 'pages' statistics from the Clicky.com API and push them to the admintome-pages Kafka topic. This will give us JSON data from AdminTome's top pages.
Clicky Web Analytics
In order to fully follow along in this article, you will need to have a website linked to Clicky.com. It's free, so why not? Register your site at clicky.com. I personally use it because it has better metrics reporting for blogs (like abandon rate) than Google Analytics does. You will need to add some code to your page so that Clicky can start collecting metrics.
After your page is sending metrics to Clicky, you will need to get some values in order to use the Clicky API and pull metrics from your site in our Python application. Go to the preferences for your site and you will see two values that we will need:
Site ID
Site key
Don't publish these anywhere because they could give anyone access to your website data. We will need these numbers later when we connect to the API and pull our site statistics.
Preparing Kafka
First, we need to prepare our Kafka cluster by adding the topic that we will send messages to. Our topic in Kafka is going to be admintome-pages.
Log in to the Mesos master you ran kafka-mesos from. If you followed the previous article, the master I used was mesos1.admintome.lab. Next, we will create the topic using the kafka-mesos.sh script:
$ cd kafka/
$ ./kafka-mesos.sh topic add admintome-pages --broker=0 --api=http://mslave2.admintome.lab:7000
Notice that the API parameter points to the Kafka scheduler we created using kafka-mesos in the last article. You can verify that you now have the correct topics:
$ ./kafka-mesos.sh topic list --api=http://mslave2.admintome.lab:7000
topics:
  name: __consumer_offsets
  partitions: 0:[0], 1:[0], 2:[0], 3:[0], 4:[0], 5:[0], 6:[0], 7:[0], 8:[0], 9:[0], 10:[0], 11:[0], 12:[0], 13:[0], 14:[0], 15:[0], 16:[0], 17:[0], 18:[0], 19:[0], 20:[0], 21:[0], 22:[0], 23:[0], 24:[0], 25:[0], 26:[0], 27:[0], 28:[0], 29:[0], 30:[0], 31:[0], 32:[0], 33:[0], 34:[0], 35:[0], 36:[0], 37:[0], 38:[0], 39:[0], 40:[0], 41:[0], 42:[0], 43:[0], 44:[0], 45:[0], 46:[0], 47:[0], 48:[0], 49:[0]
  options: segment.bytes=104857600,cleanup.policy=compact,compression.type=producer

  name: admintome
  partitions: 0:[0]

  name: admintome-pages
  partitions: 0:[0]
And there is our new topic ready to go! Now it's time to get to the fun stuff and start developing our Python application.
Now that we have Kafka ready to go, we will start to develop our Kafka producer. The producer will get page metrics from the Clicky API and push those metrics, in JSON form, to the topic that we created earlier.
I assume that you have Python 3 installed on your system and virtualenv installed as well.
To get started, we will need to set up our environment.
$ mkdir ~/Development/python/venvs
$ mkdir ~/Development/python/site-stats-intake
$ cd ~/Development/python/site-stats-intake
$ virtualenv ../venvs/intake
$ source ../venvs/intake/bin/activate
(intake) $ pip install kafka-python requests
(intake) $ pip freeze > requirements.txt
Next, we need to create our classes.
Clicky Class
We will create a new Python class called Clicky that we will use to interact with the Clicky API. Create a new file called clicky.py and add the following content:
import requests
import json


class Clicky(object):

    def __init__(self, site_id, sitekey):
        self.site_id = site_id
        self.sitekey = sitekey
        self.output = "json"

    def get_data(self, data_type):
        click_api_url = "https://api.clicky.com/api/stats/4"
        payload = {"site_id": self.site_id,
                   "sitekey": self.sitekey,
                   "type": data_type,
                   "output": self.output}
        response = requests.get(click_api_url, params=payload)
        raw_stats = response.text
        return raw_stats

    def get_pages_data(self):
        data = self.get_data("pages")
        return json.loads(data)
Save the file and exit.
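If you want to sanity-check the class on its own, you could run something like the following snippet. This is just an illustration; the site ID and key values are placeholders for the numbers you copied from your Clicky preferences:

import pprint

from clicky import Clicky

# Placeholder credentials -- substitute your real Site ID and Site key.
clicky = Clicky("12345", "0123456789abcdef")

# Fetch and pretty-print the parsed 'pages' statistics returned by the API.
pprint.pprint(clicky.get_pages_data())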
In order to get our metrics, we need to send an HTTP GET request to the Clicky API URL which is
https://api.clicky.com/api/stats/4
We also need to include several parameters:
site_id: This is the Site ID number that we got earlier.
sitekey: This is the Site key that we also got earlier.
type: To get our top pages we set the type to 'pages.'
output: We set this to "json" so that the API will return JSON data.
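Assembled, the GET request that requests sends ends up looking roughly like this (the ID and key below are placeholders):

https://api.clicky.com/api/stats/4?site_id=12345&sitekey=0123456789abcdef&type=pages&output=json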
Finally, we use the requests library to perform an HTTP GET against our API URL with the parameters we specified. In the get_pages_data method, we return a dict that represents our JSON data. Next, we will code our Kafka class implementation.
MyKafka Class
This class will interact with our Kafka cluster and push website metrics to our topic for us. Create a new file called mykafka.py and add the following content:
from kafka import KafkaProducer
import json


class MyKafka(object):

    def __init__(self, kafka_brokers):
        self.producer = KafkaProducer(
            value_serializer=lambda v: json.dumps(v).encode('utf-8'),
            bootstrap_servers=kafka_brokers
        )

    def send_page_data(self, json_data):
        self.producer.send('admintome-pages', json_data)
First, we import the kafka-python library, specifically the KafkaProducer class, that will let us code a Kafka producer and publish messages to our Kafka Topic.
from kafka import KafkaProducer
We now define our MyKafka class and create the constructor for it:
class MyKafka(object):

    def __init__(self, kafka_brokers):
The constructor takes an argument that represents the Kafka brokers that will be used to connect to our Kafka cluster. This is a list of strings in the form of:
[ "broker_host:port", "broker_host:port" ]
We will use only one broker, the one we created in the last article (mslave1.admintome.lab:31000):
[ "mslave1.admintome.lab:31000" ]
We next instantiate a new KafkaProducer object named producer. Since we will be sending data to Kafka in the form of JSON, we use the value_serializer parameter to tell the KafkaProducer to serialize each message with json.dumps and encode it as UTF-8. We also tell it to use our brokers with the bootstrap_servers parameter.
self.producer = KafkaProducer(
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    bootstrap_servers=kafka_brokers
)
Finally, we create a new method that we will use to send the messages to our admintome-pages topic:
def send_page_data(self, json_data):
    self.producer.send('admintome-pages', json_data)
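For illustration only, using the class from another module would look roughly like this; the broker is the one from our cluster and the payload is a made-up placeholder:

from mykafka import MyKafka

# Hypothetical quick test of MyKafka with a placeholder payload.
kafka = MyKafka(["mslave1.admintome.lab:31000"])
kafka.send_page_data({"example": "placeholder payload"})

# send() is asynchronous; flush() blocks until the buffered message is actually delivered.
kafka.producer.flush()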
That's all there is to it. Now we will write our Main class that will control everything.
Main Class
Create a new file called main.py and add the following content:
from clicky import Clicky
from mykafka import MyKafka
import logging
import time
import os
from logging.config import dictConfig


class Main(object):

    def __init__(self):
        if 'KAFKA_BROKERS' in os.environ:
            kafka_brokers = os.environ['KAFKA_BROKERS'].split(',')
        else:
            raise ValueError('KAFKA_BROKERS environment variable not set')

        if 'SITE_ID' in os.environ:
            self.site_id = os.environ['SITE_ID']
        else:
            raise ValueError('SITE_ID environment variable not set')

        if 'SITEKEY' in os.environ:
            self.sitekey = os.environ['SITEKEY']
        else:
            raise ValueError('SITEKEY environment variable not set')

        logging_config = dict(
            version=1,
            formatters={
                'f': {'format': '%(asctime)s %(name)-12s %(levelname)-8s %(message)s'}
            },
            handlers={
                'h': {'class': 'logging.StreamHandler',
                      'formatter': 'f',
                      'level': logging.DEBUG}
            },
            root={
                'handlers': ['h'],
                'level': logging.DEBUG,
            },
        )
        self.logger = logging.getLogger()
        dictConfig(logging_config)
        self.logger.info("Initializing Kafka Producer")
        self.logger.info("KAFKA_BROKERS={0}".format(kafka_brokers))
        self.mykafka = MyKafka(kafka_brokers)

    def init_clicky(self):
        self.clicky = Clicky(self.site_id, self.sitekey)
        self.logger.info("Clicky Stats Polling Initialized")

    def run(self):
        self.init_clicky()
        starttime = time.time()
        while True:
            data = self.clicky.get_pages_data()
            self.logger.info("Successfully polled Clicky pages data")
            self.mykafka.send_page_data(data)
            self.logger.info("Published page data to Kafka")
            time.sleep(300.0 - ((time.time() - starttime) % 300.0))


if __name__ == "__main__":
    logging.info("Initializing Clicky Stats Polling")
    main = Main()
    main.run()
The end state of this example is to build a Docker container that we will then run on Marathon. With that in mind, we don't want to hardcode our sensitive information (like our Clicky site ID and site key) in our code. We want to be able to pull those from environment variables. If they are not set, then we throw an exception and exit.
if 'KAFKA_BROKERS' in os.environ:
    kafka_brokers = os.environ['KAFKA_BROKERS'].split(',')
else:
    raise ValueError('KAFKA_BROKERS environment variable not set')

if 'SITE_ID' in os.environ:
    self.site_id = os.environ['SITE_ID']
else:
    raise ValueError('SITE_ID environment variable not set')

if 'SITEKEY' in os.environ:
    self.sitekey = os.environ['SITEKEY']
else:
    raise ValueError('SITEKEY environment variable not set')
We also configure logging so that we can see what is going on with our application. I have coded an infinite loop that will poll Clicky and push the metrics to our Kafka topic every five minutes. The sleep arithmetic (300 seconds minus the elapsed time modulo 300) keeps each iteration aligned to a five-minute schedule regardless of how long the poll and publish take.
def run(self):
    self.init_clicky()
    starttime = time.time()
    while True:
        data = self.clicky.get_pages_data()
        self.logger.info("Successfully polled Clicky pages data")
        self.mykafka.send_page_data(data)
        self.logger.info("Published page data to Kafka")
        time.sleep(300.0 - ((time.time() - starttime) % 300.0))
Save the file and exit.
Running Our Application
To test that everything works you can try running the application after you set your environment variables:
(intake) $ export KAFKA_BROKERS="mslave1.admintome.lab:31000"
(intake) $ export SITE_ID="{your site id}"
(intake) $ export SITEKEY="{your sitekey}"
(intake) $ python main.py
2018-06-25 15:34:32,259 root INFO Initializing Kafka Producer
2018-06-25 15:34:32,259 root INFO KAFKA_BROKERS=['mslave1.admintome.lab:31000']
2018-06-25 15:34:32,374 root INFO Clicky Stats Polling Initialized
2018-06-25 15:34:32,754 root INFO Successfully polled Clicky pages data
2018-06-25 15:34:32,755 root INFO Published page data to Kafka
We are now sending messages to our Kafka Topic! We will build our Docker container next and deploy it to Marathon. Finally, we will wrap up by writing a test consumer that will get our messages from our topic.
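As a preview of that test consumer, a minimal version built with kafka-python could look something like the sketch below. This is my own illustration using the same broker and topic as above, not the article's final consumer code:

from kafka import KafkaConsumer
import json

# Subscribe to the admintome-pages topic and decode each message back from JSON.
consumer = KafkaConsumer(
    'admintome-pages',
    bootstrap_servers=["mslave1.admintome.lab:31000"],
    auto_offset_reset='earliest',
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)

for message in consumer:
    print(message.value)  # the page metrics published by the producer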
I have created a GitHub repository for all the code used in this article: https://github.com/admintome/clicky-state-intake
Now that we have our application code written, we can create a Docker container so that we can deploy it to Marathon. Create a Dockerfile file in your application directory with the following contents:
FROM python:3

WORKDIR /usr/src/app

COPY requirements.txt ./
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

CMD [ "python", "./main.py" ]
Build the container:
$ docker build -t {your docker hub username}/site-stats-intake .
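Before pushing the image, you can optionally sanity-check it locally by passing the same environment variables to docker run (the image name and values here are placeholders):

$ docker run --rm \
    -e KAFKA_BROKERS="mslave1.admintome.lab:31000" \
    -e SITE_ID="{your site id}" \
    -e SITEKEY="{your sitekey}" \
    {your docker hub username}/site-stats-intake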
After the Docker build is completed, you will want to push it to your Docker repository that your Mesos Slaves have access to. For me, this is Docker Hub:
$ docker push admintome/site-stats-intake
Then log in to each of your Mesos slaves and pull the image down:
$ docker pull admintome/site-stats-intake
We are now ready to create a Marathon application deployment for our application.
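The next step uses the Marathon GUI, but purely as an illustration, an equivalent JSON application definition might look roughly like this; the resource sizes are assumptions and the environment values are placeholders:

{
  "id": "/site-stats-intake",
  "cpus": 0.5,
  "mem": 256,
  "instances": 1,
  "container": {
    "type": "DOCKER",
    "docker": {
      "image": "admintome/site-stats-intake"
    }
  },
  "env": {
    "KAFKA_BROKERS": "mslave1.admintome.lab:31000",
    "SITE_ID": "{your site id}",
    "SITEKEY": "{your sitekey}"
  }
}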
Go to your Marathon GUI.
http://mesos1.admintome.lab:8080

