
Modern digital applications generate an enormous volume of data, which often needs to be processed, analyzed, and sometimes stored for future use. In such scenarios, a powerful data streaming platform like Apache Kafka proves invaluable. Apache Kafka is a distributed event streaming platform capable of handling trillions of events a day.
To provide a hands-on understanding of how Kafka can be used in real-time data streaming applications, we are going to use the OpenWeatherMap API, which provides a wealth of meteorological data, in conjunction with Kafka.
We'll break down our exploration into three parts, each corresponding to a different Python script:
app.py fetches data from the OpenWeatherMap API and publishes it to a Kafka topic.
consumer_script.py consumes the weather data from the Kafka topic and prints it on the console.
storage_consumer.py consumes the same weather data and stores it in an SQLite database.
Fetching and Publishing Weather Data - app.py
The app.py script lies at the core of our system, acting as the data producer. Let's walk through the script:
Firstly, we import the necessary modules:
import requests
import json
import time
from kafka import KafkaProducer
from config import api_key
The requests module allows us to send HTTP requests.
The json module enables us to work with JSON data, converting between JSON strings and Python objects.
The time module provides various time-related functions. We will use it to pause our script.
KafkaProducer, from the kafka module, is a class that publishes data to Kafka topics.
api_key is imported from a separate config module and is used to authenticate our requests to the OpenWeatherMap API.
Next, we define the get_weather(api_key, city) function that retrieves weather data from the OpenWeatherMap API.
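def get_weather(api_key, city):
    # Current-weather endpoint of the OpenWeatherMap API
    base_url = "http://api.openweathermap.org/data/2.5/weather"
    params = {
        'q': city,
        'appid': api_key,
        'units': 'imperial'  # temperatures in Fahrenheit
    }
    # A timeout keeps the producer loop from hanging on a stalled request
    response = requests.get(base_url, params=params, timeout=10)
    weather_data = response.json()
    return weather_data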
Here, requests.get sends a GET request to the OpenWeatherMap API endpoint, passing the city name, API key, and temperature unit as parameters. The API responds with the weather data in JSON format, which is then converted to a Python dictionary using response.json().
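The function above assumes the request succeeds. When the API key is invalid or the city is unknown, the response body usually carries a status code and a message rather than a main section, so a small guard can help before the temperature is read. The following is a minimal sketch: extract_temperature is a hypothetical helper that is not part of the original script, and both payloads are made up for illustration (the error shape is an assumption).

```python
from typing import Optional


def extract_temperature(payload: dict) -> Optional[float]:
    """Return the temperature from a weather payload, or None if it is absent."""
    main = payload.get("main")
    if not isinstance(main, dict) or "temp" not in main:
        return None
    return float(main["temp"])


# Illustrative payloads (made up for this example, not real API output).
ok_payload = {"cod": 200, "name": "Cleveland", "main": {"temp": 71.6}}
error_payload = {"cod": 401, "message": "Invalid API key"}  # assumed error shape

print(extract_temperature(ok_payload))     # 71.6
print(extract_temperature(error_payload))  # None
```

Returning None instead of raising lets the caller decide whether to skip the cycle or stop.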
The main function in app.py initializes the Kafka producer and sets up a loop that fetches weather data and sends it to a Kafka topic:
def main():
    # Connect to the local broker and serialize every value as UTF-8 JSON
    producer = KafkaProducer(
        bootstrap_servers='localhost:9092',
        value_serializer=lambda v: json.dumps(v).encode('utf-8')
    )
    city = 'Cleveland,OH,USA'
    while True:
        weather_data = get_weather(api_key, city)
        print(f"Current temperature in {city} is {weather_data['main']['temp']}°F")
        # send() is asynchronous; flush() blocks until the message is delivered
        producer.send('weather_data', weather_data)
        producer.flush()
        time.sleep(1)
In the KafkaProducer initialization, we specify the Kafka broker location (in this case, running on the same machine) and define a value_serializer. The value_serializer is a function that turns our Python objects into a format that can be sent across the network. Here, we're converting the data to a JSON string and then encoding it to bytes.
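To see what the serializer does without a running broker, the same transformation can be applied by hand and then reversed, which is the step the consumers perform later. This is a small sketch; serialize and deserialize are illustrative names, and the sample dictionary is made up.

```python
import json


def serialize(value):
    """Mirror of the producer's value_serializer: object -> JSON -> UTF-8 bytes."""
    return json.dumps(value).encode("utf-8")


def deserialize(raw):
    """Inverse step a consumer performs: UTF-8 bytes -> JSON -> object."""
    return json.loads(raw.decode("utf-8"))


sample = {"name": "Cleveland", "main": {"temp": 71.6}}  # made-up sample
wire_bytes = serialize(sample)

print(type(wire_bytes).__name__)          # bytes
print(deserialize(wire_bytes) == sample)  # True
```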
We then define the city for which we want to fetch weather data and enter a continuous loop. Within this loop, we fetch the weather data, print the current temperature to the console, and send the data to a Kafka topic named 'weather_data'. Because send() is asynchronous, we call producer.flush() afterwards to block until all buffered messages have been delivered. We pause for a second before making the next request, providing a pseudo-real-time stream of weather data.
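The loop above runs forever and stops at the first exception, for example a network error during the fetch. One possible variation, sketched here under assumptions, takes the fetch and publish steps as parameters so that a failed cycle is skipped and the loop can be exercised offline. run_polling_loop, fake_fetch, and the stand-in publisher are hypothetical names, not part of the original script.

```python
import time


def run_polling_loop(fetch, publish, interval_seconds=1.0, max_iterations=None,
                     sleep=time.sleep):
    """Fetch and publish repeatedly; skip a cycle if the fetch fails."""
    sent = 0
    iteration = 0
    while max_iterations is None or iteration < max_iterations:
        iteration += 1
        try:
            record = fetch()
        except Exception as exc:  # broad on purpose: keep the loop alive
            print(f"fetch failed, skipping this cycle: {exc}")
        else:
            publish(record)
            sent += 1
        sleep(interval_seconds)
    return sent


# Stand-ins for get_weather and producer.send, so the sketch runs offline.
published = []
calls = {"n": 0}


def fake_fetch():
    calls["n"] += 1
    if calls["n"] == 2:
        raise RuntimeError("simulated network error")
    return {"main": {"temp": 70.0 + calls["n"]}}


sent_count = run_polling_loop(fake_fetch, published.append,
                              interval_seconds=0, max_iterations=3)
print(sent_count)  # 2
```

In the real script, fetch would wrap get_weather(api_key, city) and publish would wrap producer.send('weather_data', ...), with max_iterations left as None.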
Consuming Weather Data - consumer_script.py
The consumer_script.py script receives the weather data from the Kafka topic and prints it on the console. Let's dissect the script:
We begin by importing the required modules:
from confluent_kafka import Consumer, KafkaError
import json
Here, Consumer and KafkaError are imported from the confluent_kafka module (a different client library from the kafka module used in app.py), and we import the json module to handle JSON data.
The main function in consumer_script.py creates a Kafka Consumer instance and sets up a loop that continuously polls for new messages:
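def main():
    c = Consumer({
        'bootstrap.servers': 'localhost:9092',
        'group.id': 'mygroup',
        'auto.offset.reset': 'earliest',
        # Upper bound on a broker response, in bytes; must be an integer
        'receive.message.max.bytes': 100000000
    })
    c.subscribe(['weather_data'])
    while True:
        # Wait up to one second for the next message
        msg = c.poll(1.0)
        if msg is None:
            continue
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                # Reached the end of the partition; keep polling
                continue
            else:
                print(msg.error())
                break
        weather_data = json.loads(msg.value().decode('utf-8'))
        print(weather_data)
    # Leave the consumer group cleanly once the loop ends
    c.close()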
The Kafka Consumer configuration specifies the Kafka broker location, the consumer group id, the offset reset policy, and receive.message.max.bytes, which caps the size of a response the consumer will accept from a broker. A consumer group lets you divide the processing of a topic's messages among multiple consumer instances that belong to the same group. The offset reset policy is set to 'earliest' so that the consumer starts from the beginning of the topic if no previously committed offset is found for the group.
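The four settings described above can be gathered in a small helper that checks the size limit before the consumer is created. This is a sketch under assumptions: build_consumer_config is a hypothetical helper, and the accepted range of 1000 to 2147483647 for receive.message.max.bytes reflects our reading of the librdkafka configuration reference, so verify it against the version you run.

```python
MAX_INT32 = 2_147_483_647  # assumed upper bound for receive.message.max.bytes


def build_consumer_config(group_id, max_response_bytes=100_000_000):
    """Build a settings dict in the shape passed to confluent_kafka.Consumer."""
    if not isinstance(max_response_bytes, int):
        raise TypeError("max_response_bytes must be an int")
    if not 1000 <= max_response_bytes <= MAX_INT32:
        raise ValueError("max_response_bytes is outside the assumed valid range")
    return {
        "bootstrap.servers": "localhost:9092",
        "group.id": group_id,
        "auto.offset.reset": "earliest",
        "receive.message.max.bytes": max_response_bytes,
    }


config = build_consumer_config("mygroup")
print(config["group.id"])  # mygroup
```

Consumers created with different group ids each receive every message on the topic, while consumers sharing a group id split the topic's partitions among themselves.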
In the polling loop, c.poll(1.0) waits up to one second for a message and returns None if none arrives, in which case the loop simply polls again. If the returned message carries an error, the script checks whether it is an end-of-partition event, which signals that the consumer has read all messages currently in the partition and can keep polling. Any other error is printed, and the loop is broken. Otherwise, the message value is decoded from bytes to a JSON string and then converted into a Python dictionary.
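The decoding step assumes every message holds valid UTF-8 JSON. If other producers might write to the same topic, a defensive decoder can keep one malformed message from stopping the consumer. The sketch below uses a hypothetical helper, decode_weather_message, and made-up sample bytes.

```python
import json
from typing import Optional


def decode_weather_message(raw: bytes) -> Optional[dict]:
    """Decode a message payload; return None instead of raising on bad input."""
    try:
        decoded = json.loads(raw.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError):
        return None
    # Only JSON objects are useful to the consumers in this article.
    return decoded if isinstance(decoded, dict) else None


good = b'{"name": "Cleveland", "main": {"temp": 71.6}}'  # made-up sample
bad = b"not json at all"

print(decode_weather_message(good)["name"])  # Cleveland
print(decode_weather_message(bad))           # None
```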
Storing Weather Data - storage_consumer.py
The storage_consumer.py script functions similarly to consumer_script.py, but instead of printing the data, it stores it in an SQLite database. Let's look at the script in more detail:
The script begins by importing the necessary modules:
import sqlite3
from confluent_kafka import Consumer, KafkaError
import json
Here, sqlite3 is used to interact with an SQLite database, Consumer and KafkaError from confluent_kafka are used to consume messages from Kafka, and json is used to handle JSON data.
Next, the script establishes a connection to the SQLite database and creates a table if it doesn't exist:
def main():
    conn = sqlite3.connect('weather.db')
    c = conn.cursor()
    c.execute('''
        CREATE TABLE IF NOT EXISTS weather_data (
            city TEXT,
            weather TEXT,
            temperature REAL,
            timestamp INTEGER
        )
    ''')
The table weather_data has four columns: city, weather, temperature, and timestamp.
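The schema can be checked without touching weather.db by creating the same table in an in-memory database and asking SQLite to describe it. This is a small sketch for experimentation, separate from the original script.

```python
import sqlite3

conn = sqlite3.connect(":memory:")  # in-memory database, nothing written to disk
conn.execute("""
    CREATE TABLE IF NOT EXISTS weather_data (
        city TEXT,
        weather TEXT,
        temperature REAL,
        timestamp INTEGER
    )
""")

# PRAGMA table_info returns one row per column: (cid, name, type, ...).
columns = [(row[1], row[2]) for row in conn.execute("PRAGMA table_info(weather_data)")]
print(columns)
conn.close()
```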
The script then initializes a Kafka consumer, subscribing it to the 'weather_data' topic:
    consumer = Consumer({
        'bootstrap.servers': 'localhost:9092',
        'group.id': 'weather_group',
        'auto.offset.reset': 'earliest',
        # Upper bound on a broker response, in bytes; must be an integer
        'receive.message.max.bytes': 100000000
    })
    consumer.subscribe(['weather_data'])
The configurations are similar to those in consumer_script.py. The consumer subscribes to the 'weather_data' topic and is ready to consume the messages.
Next, the script enters a continuous loop, polling for new messages:
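    while True:
        # Wait up to one second for the next message
        msg = consumer.poll(1.0)
        if msg is None:
            continue
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                continue
            else:
                print(msg.error())
                break
        weather_data = json.loads(msg.value().decode('utf-8'))
        # Map payload fields to the city, weather, temperature, timestamp columns
        c.execute("INSERT INTO weather_data VALUES (?, ?, ?, ?)",
                  (weather_data['name'],
                   weather_data['weather'][0]['description'],
                   weather_data['main']['temp'],
                   weather_data['dt']))
        conn.commit()
    # Release the consumer and the database connection once the loop ends
    consumer.close()
    conn.close()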
Each call to consumer.poll(1.0) waits up to one second for a new message. If the returned message carries an error, the script checks whether it's an 'end of partition' event; if it isn't, it prints the error and breaks the loop. Otherwise, the message is parsed from a JSON string back into a Python dictionary.
The script extracts the city name, weather description, temperature, and timestamp from the weather data and inserts these into the 'weather_data' table in the SQLite database. After each insertion, the script commits the changes to the database to ensure that the data is saved.
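The mapping from payload to table row can be tried in isolation with an in-memory database. In this sketch, to_row is a hypothetical helper and the payload is made up, containing only the four fields the script reads.

```python
import sqlite3


def to_row(weather_data):
    """Map a weather payload to (city, weather, temperature, timestamp)."""
    return (
        weather_data["name"],
        weather_data["weather"][0]["description"],
        weather_data["main"]["temp"],
        weather_data["dt"],
    )


# Made-up payload containing only the fields the script reads.
sample = {
    "name": "Cleveland",
    "weather": [{"description": "clear sky"}],
    "main": {"temp": 71.6},
    "dt": 1700000000,
}

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE weather_data "
    "(city TEXT, weather TEXT, temperature REAL, timestamp INTEGER)"
)
with conn:  # commits on success, rolls back if the insert raises
    conn.execute("INSERT INTO weather_data VALUES (?, ?, ?, ?)", to_row(sample))

stored = conn.execute("SELECT * FROM weather_data").fetchall()
print(stored)  # [('Cleveland', 'clear sky', 71.6, 1700000000)]
conn.close()
```

Using the connection as a context manager is an alternative to calling conn.commit() explicitly after each insert.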
The SQLite database allows you to query the data later and perform various analyses. You can access the full code here.
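As an example of such a later analysis, the query below summarizes the stored temperatures per city. It is a sketch that runs against an in-memory table with made-up rows; against the real data you would connect to weather.db instead.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE weather_data "
    "(city TEXT, weather TEXT, temperature REAL, timestamp INTEGER)"
)
# Made-up rows for illustration only.
rows = [
    ("Cleveland", "clear sky", 70.0, 1700000000),
    ("Cleveland", "few clouds", 72.0, 1700000600),
    ("Cleveland", "light rain", 68.0, 1700001200),
]
conn.executemany("INSERT INTO weather_data VALUES (?, ?, ?, ?)", rows)

# Count, minimum, maximum, and average temperature per city.
summary = conn.execute("""
    SELECT city, COUNT(*), MIN(temperature), MAX(temperature), AVG(temperature)
    FROM weather_data
    GROUP BY city
""").fetchall()
print(summary)  # [('Cleveland', 3, 68.0, 72.0, 70.0)]
conn.close()
```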
Conclusion
This walkthrough demonstrates how to use Apache Kafka for real-time data processing, using the OpenWeatherMap API as the data source. The application fetches, publishes, consumes, and stores weather data in a near real-time manner.
This is a basic setup, and a real-world scenario might require more sophisticated configurations such as additional error handling, data validation, security considerations, and Kafka's more advanced features like partitioning and replication to handle large data volumes and ensure fault tolerance.
Nonetheless, this tutorial should serve as a solid foundation for developing real-time data processing applications using Apache Kafka and Python. As a next step, you could extend this system to support additional data sources, consumers, or processing stages, enhancing its capabilities according to your specific requirements.