Even the best machine learning model is not helpful if it finds fraud ten minutes after the money is gone. In production, data never sits still: it arrives continuously as user clicks, IoT sensor readings, and financial transactions. Today, we’ll close that gap by building a real-time streaming analytics pipeline with Apache Kafka and Python.
What is Kafka?
Apache Kafka is a distributed event streaming platform. When I talk to junior engineers, I tell them to think of Kafka not as a database, but as a fast, reliable conveyor belt for data.
With Kafka, systems can talk to each other right away, instead of one app saving data to a table and another app reading it later.
Here are the three main components you need to understand:
- Producer: The system generating the data. In our case, a Python script simulates live financial transactions.
- Topic: This is the channel, or conveyor belt, where data is sent. Producers send data to topics, and consumers read from them.
- Consumer: This is the system that reads the data. It’s where analytics or machine learning inference takes place.
Why use Kafka instead of sending data straight from one app to another? Kafka decouples your systems. If your machine learning API stops working for a few minutes, the Producer keeps running. Kafka saves the messages in the Topic, and the Consumer can continue from where it stopped once it’s back online.
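The conveyor-belt idea can be sketched in plain Python before we touch Kafka itself: a topic is an append-only log, and each consumer group keeps its own read position (an offset). The `ToyTopic` class below is purely illustrative, an in-memory analogy only; real Kafka persists the log to disk and distributes it across brokers.

```python
class ToyTopic:
    """In-memory analogy for a Kafka topic: an append-only log
    plus a stored read position (offset) per consumer group."""
    def __init__(self):
        self.log = []       # append-only message log
        self.offsets = {}   # consumer group -> next index to read

    def produce(self, message):
        self.log.append(message)

    def consume(self, group):
        pos = self.offsets.get(group, 0)
        if pos >= len(self.log):
            return None     # nothing new yet
        self.offsets[group] = pos + 1
        return self.log[pos]

topic = ToyTopic()
topic.produce({"user_id": 101, "amount": 42.0})
topic.produce({"user_id": 102, "amount": 1250.0})

# The consumer was "offline" while both messages arrived; it still
# reads them in order, starting from its stored offset.
print(topic.consume("analytics"))   # first message
print(topic.consume("analytics"))   # second message
print(topic.consume("analytics"))   # None -- caught up
```

Note how producing and consuming never touch each other directly: the log in the middle is what lets either side go down without losing data.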
Building Your Real-Time Streaming Analytics Pipeline using Kafka
To get started, make sure you have Docker and Python set up on your computer. We’ll use the confluent-kafka library, a high-performance Python client for Kafka built on librdkafka. Install it with:
pip install confluent-kafka
Step 1: Setting up Kafka with Docker
First, let’s start a local Kafka broker with Docker Compose. Create a file called docker-compose.yml:
services:
  kafka:
    image: apache/kafka:3.7.0
    container_name: kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:9093
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,CONTROLLER://:9093
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://127.0.0.1:9092
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT

We’re using a recent Kafka image (v3.7.0) with Kafka Raft (KRaft). In the past, Kafka needed ZooKeeper to manage cluster state. KRaft removes that extra dependency, so setup is simpler.
To start the broker, open your terminal and run:
docker compose up -d
Step 2: Creating the Kafka Topic
Before sending data, we need a topic to store it. Let’s create a topic named transactions. Run this command inside your Kafka container:
docker exec -it kafka /opt/kafka/bin/kafka-topics.sh \
  --create \
  --topic transactions \
  --bootstrap-server localhost:9092 \
  --partitions 1 \
  --replication-factor 1
In production, you’d use more partitions for parallelism and a higher replication factor so the data survives a broker failure. For local testing, 1 of each is fine.
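Partitions also determine ordering: Kafka’s default partitioner hashes the message key and takes it modulo the partition count, so all events with the same key land on the same partition and stay in order relative to each other. A simplified sketch of that idea (real Kafka uses a murmur2 hash of the key bytes; `zlib.crc32` stands in here purely for illustration):

```python
import zlib

def pick_partition(key: bytes, num_partitions: int) -> int:
    """Simplified stand-in for Kafka's key-based partitioner.
    Kafka actually uses murmur2; crc32 is used here only to
    illustrate the hash-then-modulo routing."""
    return zlib.crc32(key) % num_partitions

# Events keyed by the same user always route to the same partition,
# which preserves per-user ordering.
for user_id in (101, 101, 102, 103):
    key = str(user_id).encode("utf-8")
    print(user_id, "-> partition", pick_partition(key, 3))
```

This is why keying messages by `user_id` is a common choice for transaction streams: per-user order is preserved even though the topic as a whole is processed in parallel.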
Step 3: Writing the Producer
Next, let’s make some data. Write a Python script to simulate users making purchases and send those events to Kafka. Name the file producer.py:
import json
import time
import random

from confluent_kafka import Producer

# Configuration for the Kafka Producer connecting to our local broker
conf = {'bootstrap.servers': 'localhost:9092'}
producer = Producer(conf)

def delivery_report(err, msg):
    """Callback triggered when a message is successfully delivered or fails."""
    if err is not None:
        print(f"Message delivery failed: {err}")
    else:
        print(f"Produced to topic {msg.topic()} [{msg.partition()}] at offset {msg.offset()}")

# Simulate a live data stream
topic_name = "transactions"
user_ids = [101, 102, 103, 104, 105]

print("Starting transaction stream...")
try:
    while True:
        # Create a mock transaction dictionary
        transaction = {
            "user_id": random.choice(user_ids),
            "amount": round(random.uniform(5.0, 1500.0), 2),
            "timestamp": time.time()
        }

        # Convert the dictionary to a JSON string, encode to bytes, and send to Kafka
        producer.produce(
            topic=topic_name,
            value=json.dumps(transaction).encode('utf-8'),
            callback=delivery_report
        )

        # Serve delivery callbacks for previously sent messages
        producer.poll(0)
        time.sleep(0.5)  # Send 2 messages per second
except KeyboardInterrupt:
    print("Stopping stream.")
finally:
    # Block until all buffered messages are actually sent before exiting
    producer.flush()

Step 4: Writing the Consumer
Right now, our Producer is sending data, but nothing is reading it. We need a Consumer to read the stream and apply some logic. In this script, we’ll listen to the transactions topic and alert if a transaction is over $1,000. Create a file called consumer.py:
import json

from confluent_kafka import Consumer, KafkaException

# Configure the Consumer
conf = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'analytics-group-v2',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': True
}

consumer = Consumer(conf)
consumer.subscribe(['transactions'])

print("Listening for transactions...")
try:
    while True:
        # Wait up to 1 second for a new message
        msg = consumer.poll(1.0)
        if msg is None:
            continue
        if msg.error():
            raise KafkaException(msg.error())

        try:
            # Decode the bytes back into a JSON object
            data = json.loads(msg.value().decode('utf-8'))
            user_id = data.get('user_id')
            amount = data.get('amount')
            print(f"Processed: User {user_id} spent ${amount}")

            # Simple anomaly detection logic
            if amount and amount > 1000:
                print(f"⚠️ ANOMALY DETECTED: Large transaction of ${amount} by User {user_id}!")
        except json.JSONDecodeError:
            print("Invalid JSON received")
        except Exception as e:
            print(f"Processing error: {e}")
except KeyboardInterrupt:
    print("Shutting down consumer...")
finally:
    # Always close the consumer cleanly to release resources
    consumer.close()

Pay attention to group.id and auto.offset.reset in the settings. Kafka tracks each consumer group’s position in a topic with offsets. If your consumer restarts, it uses its group ID to look up the last committed offset and resume from there, instead of re-reading the whole topic. The auto.offset.reset setting only applies when no committed offset exists yet; 'earliest' means start from the beginning of the topic.
Step 5: Running the Pipeline
Open two terminal windows. In the first one, start your Consumer. It will connect to Kafka and wait for data:
python consumer.py
In the second terminal, start your Producer:
python producer.py
Go back to Terminal 1, and you’ll see data streaming in live. The anomaly detection will trigger right away when it finds a match:
Processed: User 104 spent $45.20
Processed: User 101 spent $340.50
Processed: User 103 spent $1250.75
⚠️ ANOMALY DETECTED: Large transaction of $1250.75 by User 103!
Processed: User 105 spent $12.99
Closing Thoughts
That’s how you set up a real-time streaming analytics pipeline with Apache Kafka and Python.
In this tutorial, we used a simple if statement for anomaly detection. But if you swap that out for a trained Scikit-Learn or PyTorch model, you’ll have the skeleton of a real-time ML inference pipeline.
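Concretely, that swap only touches the per-message logic inside the consumer loop. A sketch with a stand-in model (the `FraudModel` class below is a hypothetical placeholder for a trained model; its `predict` signature mirrors the common scikit-learn convention of taking a batch of feature rows):

```python
class FraudModel:
    """Hypothetical stand-in for a trained model. A real pipeline would
    load a fitted Scikit-Learn or PyTorch model here instead."""
    def predict(self, X):
        # A hand-written rule standing in for learned weights
        return [1 if amount > 1000 else 0 for (amount,) in X]

model = FraudModel()

def score_transaction(model, data: dict) -> bool:
    """Replaces the consumer's `if amount > 1000` check with model inference."""
    features = [(data["amount"],)]           # build a single feature row
    return bool(model.predict(features)[0])  # 1 = fraud

print(score_transaction(model, {"user_id": 103, "amount": 1250.75}))  # True
print(score_transaction(model, {"user_id": 105, "amount": 12.99}))    # False
```

In the consumer, you would call `score_transaction(model, data)` where the threshold check currently sits; loading the model once at startup (not per message) keeps per-event latency low.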
If you found this article helpful, you can follow me on Instagram for daily AI tips and practical resources. You may also be interested in my latest book, Hands-On GenAI, LLMs & AI Agents, a step-by-step guide to prepare you for careers in today’s AI industry.