Getting Started with Apache Kafka and Python: A Simple Example
Apache Kafka is a distributed streaming platform that allows you to build real-time data pipelines and streaming applications. It is designed for high throughput, low latency, and fault tolerance. In this article, we will explore how to create a simple Kafka producer and consumer using the confluent-kafka-python library. We will generate random names using the Faker library and send them to a Kafka topic. Then, we will create a Kafka consumer that reads these names from the topic and prints them.
Prerequisites
Before diving into the code, make sure you have the following prerequisites:
Apache Kafka installed and running on your local machine or a remote server (a quick-start option is shown after this list).
Python 3.x installed on your local machine.
confluent-kafka-python library installed. You can install it using pip:
pip install confluent-kafka
Faker library installed. You can install it using pip:
pip install Faker
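If you don't already have a broker to point at, one quick way to run a single-node Kafka on localhost:9092 is the official Docker image (this assumes Docker is installed; the image and tag are just one convenient option, not a requirement of this article):
docker run -d --name kafka -p 9092:9092 apache/kafka:latest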
Kafka Producer
Let's start by creating a Kafka producer that generates random names using the Faker library and sends them to a Kafka topic called realtime_messaging. Here's the complete code for the Kafka producer:
# KafkaProducer.py
from confluent_kafka import Producer
import json
import time
from faker import Faker

fake = Faker()

conf = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'names_group',
    'auto.offset.reset': 'earliest',
    'max.partition.fetch.bytes': 10 * 1024 * 1024  # 10 MB
}

producer = Producer(conf)

def delivery_report(err, msg):
    if err is not None:
        print(f"Message delivery failed: {err}")
    else:
        print(f"Message delivered to {msg.topic()} [{msg.partition()}]")

while True:
    name = fake.name()
    message = {'name': name, 'timestamp': time.time()}
    print(f"Generated name: {name}")
    producer.produce('realtime_messaging', key=None, value=json.dumps(message), callback=delivery_report)
    producer.flush()
    time.sleep(1)
Importing Libraries
We start by importing the necessary libraries:
from confluent_kafka import Producer
import json
import time
from faker import Faker
confluent_kafka provides the Producer class, which we will use to create a Kafka producer. We also import json to serialize the messages, time to generate timestamps, and Faker to generate random names.
Configuration
Next, we define the configuration for the Kafka producer:
conf = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'names_group',
    'auto.offset.reset': 'earliest',
    'max.partition.fetch.bytes': 10 * 1024 * 1024  # 10 MB
}
The configuration includes the following parameters:
bootstrap.servers: The address of the Kafka broker(s). In this example, we use a local broker running on the default port 9092.
group.id: A unique identifier for the consumer group. This is a consumer setting and is not required for the producer, but it is included here for consistency with the consumer configuration.
auto.offset.reset: Determines where a consumer starts reading when there is no committed offset or the current offset no longer exists on the server. We set it to earliest, which means reading from the beginning of the topic. Like group.id, this is a consumer setting that the producer ignores.
max.partition.fetch.bytes: The maximum amount of data per partition the server returns for a fetch request. We set it to 10 MB to handle larger messages. This is also a consumer setting.
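Since only bootstrap.servers actually matters for the producer, a leaner producer-only configuration is possible. The following is a minimal sketch; the extra tuning properties (acks, linger.ms, compression.type) are standard librdkafka producer settings shown purely for illustration and are not part of the original example:
producer_conf = {
    'bootstrap.servers': 'localhost:9092',  # the only required producer setting
    'acks': 'all',                          # wait for all in-sync replicas to acknowledge
    'linger.ms': 5,                         # batch messages for up to 5 ms before sending
    'compression.type': 'gzip',             # compress batches on the wire
}
producer = Producer(producer_conf)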
Creating the Producer
We create a Kafka producer using the Producer class and the configuration we defined earlier:
producer = Producer(conf)
Delivery Report Callback
We define a callback function called delivery_report that will be called once the message is successfully delivered or if an error occurs:
def delivery_report(err, msg):
    if err is not None:
        print(f"Message delivery failed: {err}")
    else:
        print(f"Message delivered to {msg.topic()} [{msg.partition()}]")
This function takes two arguments: err and msg. If err is not None, it means there was an error during message delivery, and we print the error message. If err is None, the message was successfully delivered, and we print the topic and partition where the message was delivered.
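The msg object passed to the callback also exposes the offset at which the message was stored, so a slightly more detailed callback is possible. This variant is a sketch and not part of the original code:
def delivery_report(err, msg):
    if err is not None:
        print(f"Message delivery failed: {err}")
    else:
        # topic(), partition() and offset() are all available on the delivered message
        print(f"Delivered to {msg.topic()} [{msg.partition()}] at offset {msg.offset()}")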
Main Loop
In the main loop, we generate random names using the Faker library, create a message with the name and a timestamp, and send the message to the realtime_messaging topic:
while True:
    name = fake.name()
    message = {'name': name, 'timestamp': time.time()}
    print(f"Generated name: {name}")
    producer.produce('realtime_messaging', key=None, value=json.dumps(message), callback=delivery_report)
    producer.flush()
    time.sleep(1)
We use the produce method to send the message to the Kafka topic. The key parameter is set to None because we don't need a key for this example. The value parameter contains the serialized message, and the callback parameter specifies the delivery_report function to be called once the message is delivered or if an error occurs. We call the flush method to ensure that all messages in the producer queue are sent before the loop continues. Finally, we use time.sleep(1) to pause the loop for one second before generating the next name.
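Calling flush() after every message blocks until that single message is acknowledged, which is fine for a demo but limits throughput. A common alternative, sketched below and not part of the original code, is to call producer.poll(0) inside the loop to serve delivery callbacks and flush() only once on shutdown; passing the name as the key would also route repeated names to the same partition:
try:
    while True:
        name = fake.name()
        message = {'name': name, 'timestamp': time.time()}
        # key=name keeps all messages for the same name on the same partition
        producer.produce('realtime_messaging', key=name,
                         value=json.dumps(message), callback=delivery_report)
        producer.poll(0)  # serve delivery callbacks without blocking
        time.sleep(1)
except KeyboardInterrupt:
    pass
finally:
    producer.flush()  # wait for any outstanding messages before exiting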
Kafka Consumer
Now that we have a Kafka producer generating and sending random names to the realtime_messaging topic, let's create a Kafka consumer that reads these names from the topic and prints them. Here's the complete code for the Kafka consumer:
# KafkaConsumer.py
from confluent_kafka import Consumer, KafkaError
import json

conf = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'names_group',
    'auto.offset.reset': 'earliest',
    'max.partition.fetch.bytes': 10 * 1024 * 1024,
    'fetch.message.max.bytes': 10 * 1024 * 1024,  # 10 MB
}

consumer = Consumer(conf)
consumer.subscribe(['realtime_messaging'])

while True:
    msg = consumer.poll(1.0)
    if msg is None:
        continue
    if msg.error():
        print(f"Consumer error: {msg.error()}")
    else:
        message_value = json.loads(msg.value().decode('utf-8'))
        name = message_value['name']
        print(f"Received name: {name}")
Importing Libraries
We start by importing the necessary libraries:
from confluent_kafka import Consumer, KafkaError
import json
confluent_kafka provides the Consumer class, which we will use to create a Kafka consumer (KafkaError is imported alongside it but is not used directly in this simple example). We also import json to deserialize the messages.
Configuration
Next, we define the configuration for the Kafka consumer:
conf = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'names_group',
    'auto.offset.reset': 'earliest',
    'max.partition.fetch.bytes': 10 * 1024 * 1024,
    'fetch.message.max.bytes': 10 * 1024 * 1024,  # 10 MB
}
The configuration includes the following parameters:
bootstrap.servers: The address of the Kafka broker(s). In this example, we use a local broker running on the default port 9092.
group.id: A unique identifier for the consumer group. This is required for the consumer to participate in a consumer group.
auto.offset.reset: Determines where the consumer starts reading when there is no committed offset or the current offset no longer exists on the server. We set it to earliest, which means the consumer will start reading from the beginning of the topic.
max.partition.fetch.bytes: The maximum amount of data per partition the server should return for a fetch request. We set it to 10 MB to handle larger messages.
fetch.message.max.bytes: In librdkafka this is an alias for max.partition.fetch.bytes, so setting both is redundant; either one raises the per-partition fetch limit to 10 MB.
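By default, confluent-kafka commits offsets automatically in the background. If you want offsets committed only after a message has actually been processed (at-least-once delivery), you can disable auto-commit and commit explicitly. The following is a sketch and not part of the original example:
conf = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'names_group',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': False,  # commit manually after processing
}
consumer = Consumer(conf)
consumer.subscribe(['realtime_messaging'])

msg = consumer.poll(1.0)
if msg is not None and not msg.error():
    # ... process the message ...
    consumer.commit(msg)  # commit the offset of this specific message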
Creating the Consumer
We create a Kafka consumer using the Consumer class and the configuration we defined earlier:
consumer = Consumer(conf)
We also subscribe to the realtime_messaging topic using the subscribe method:
consumer.subscribe(['realtime_messaging'])
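subscribe also accepts an optional on_assign callback that is invoked when the group coordinator assigns partitions to this consumer, which is useful for logging rebalances. A small sketch, not part of the original code:
def on_assign(consumer, partitions):
    # called on rebalance with the list of TopicPartition objects assigned to this consumer
    print(f"Assigned partitions: {[p.partition for p in partitions]}")

consumer.subscribe(['realtime_messaging'], on_assign=on_assign)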
Main Loop
In the main loop, we use the poll method to read messages from the Kafka topic:
while True:
    msg = consumer.poll(1.0)
    if msg is None:
        continue
    if msg.error():
        print(f"Consumer error: {msg.error()}")
    else:
        message_value = json.loads(msg.value().decode('utf-8'))
        name = message_value['name']
        print(f"Received name: {name}")
We use a timeout of 1 second (1.0) for the poll method to avoid blocking indefinitely. If msg is None, it means there are no messages to read, so we continue to the next iteration of the loop. If msg.error() is not None, it means there was an error during message consumption, and we print the error message. If msg.error() is None, the message was successfully consumed, and we deserialize the message using json.loads. We extract the name field from the message and print it.
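One practical refinement: closing the consumer on shutdown lets it leave the consumer group cleanly and commit its final offsets. Here is a sketch of the same loop with a graceful shutdown:
try:
    while True:
        msg = consumer.poll(1.0)
        if msg is None:
            continue
        if msg.error():
            print(f"Consumer error: {msg.error()}")
            continue
        message_value = json.loads(msg.value().decode('utf-8'))
        print(f"Received name: {message_value['name']}")
except KeyboardInterrupt:
    pass
finally:
    consumer.close()  # leave the group cleanly and commit final offsets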
You can access the code here.
Conclusion
In this article, we explored how to create a simple Kafka producer and consumer using the confluent-kafka-python library. We generated random names using the Faker library and sent them to a Kafka topic. Then, we created a Kafka consumer that read these names from the topic and printed them.
This is just a simple example of what you can do with Apache Kafka and Python. You can use Kafka to build real-time data pipelines, stream processing applications, and more. With the confluent-kafka-python library, you can easily integrate Kafka into your Python applications and take advantage of Kafka's scalability, fault tolerance, and low latency. I hope this article was helpful in getting you started with Kafka and Python. If you have any questions or feedback, feel free to leave a comment below.