Real-Time Streaming Analytics using Kafka

Even the best machine learning model is not helpful if it finds fraud ten minutes after the money is gone. In real life, data is always moving, coming from things like user clicks, IoT sensors, and financial transactions. Today, we’ll close that gap by building a real-time streaming analytics pipeline with Apache Kafka and Python.

What is Kafka?

Apache Kafka is a distributed event streaming platform. When I talk to junior engineers, I tell them to think of Kafka not as a database, but as a fast, reliable conveyor belt for data.

With Kafka, systems can talk to each other right away, instead of one app saving data to a table and another app reading it later.

Here are the three main components you need to understand:

  1. Producer: The system generating the data. In our case, a Python script simulates live financial transactions.
  2. Topic: This is the channel, or conveyor belt, where data is sent. Producers send data to topics, and consumers read from them.
  3. Consumer: This is the system that reads the data. It’s where analytics or machine learning inference takes place.

Why use Kafka instead of sending data straight from one app to another? Kafka decouples your systems. If your machine learning API goes down for a few minutes, the Producer keeps running: Kafka retains the messages in the Topic, and the Consumer picks up right where it left off once it's back online.

Building Your Real-Time Streaming Analytics Pipeline using Kafka

To get started, make sure you have Docker and Python set up on your computer. We’ll use the confluent_kafka library, a fast Python client for Kafka. Install it with:

pip install confluent-kafka

Step 1: Setting up Kafka with Docker

First, let’s start a local Kafka broker with Docker Compose. Create a file called docker-compose.yml:

services:
  kafka:
    image: apache/kafka:3.7.0
    container_name: kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:9093

      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,CONTROLLER://:9093
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://127.0.0.1:9092   

      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT

We’re using a recent Kafka image (v3.7.0) running in KRaft (Kafka Raft) mode. Older Kafka versions needed a separate ZooKeeper cluster to manage cluster metadata; KRaft moves that job into Kafka itself, so a single container is all you need.

To start the broker, open your terminal and run:

docker compose up -d
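To confirm the broker actually came up before moving on, you can check the container status and peek at its logs (both commands assume the container name kafka from the compose file above):

```shell
# Show the state of the services defined in docker-compose.yml
docker compose ps

# Tail the broker logs; look for a line indicating the server has started
docker logs kafka --tail 20
```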

Step 2: Creating the Kafka Topic

Before sending data, we need a topic to store it. Let’s create a topic named transactions. Run this command inside your Kafka container:

docker exec -it kafka /opt/kafka/bin/kafka-topics.sh \
--create \
--topic transactions \
--bootstrap-server localhost:9092 \
--partitions 1 \
--replication-factor 1

In production, you’d use more partitions and higher replication to protect your data if a server fails. For local testing, using 1 is fine.
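If you want to double-check that the topic was created with the settings you asked for, you can describe it (same container and script path as above):

```shell
# Print partition count, replication factor, and leader assignment for the topic
docker exec -it kafka /opt/kafka/bin/kafka-topics.sh \
  --describe \
  --topic transactions \
  --bootstrap-server localhost:9092
```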

Step 3: Writing the Producer

Next, let’s make some data. Write a Python script to simulate users making purchases and send those events to Kafka. Name the file producer.py:

import json
import time
import random
from confluent_kafka import Producer

# Configuration for the Kafka Producer connecting to our local broker
conf = {'bootstrap.servers': 'localhost:9092'}
producer = Producer(conf)

def delivery_report(err, msg):
    """ Callback triggered when a message is successfully delivered or fails. """
    if err is not None:
        print(f"Message delivery failed: {err}")
    else:
        print(f"Produced to topic {msg.topic()} [{msg.partition()}] at offset {msg.offset()}")

# Simulate a live data stream
topic_name = "transactions"
user_ids = [101, 102, 103, 104, 105]

print("Starting transaction stream...")
try:
    while True:
        # Create a mock transaction dictionary
        transaction = {
            "user_id": random.choice(user_ids),
            "amount": round(random.uniform(5.0, 1500.0), 2),
            "timestamp": time.time()
        }
        
        # Convert dictionary to JSON string, encode to bytes, and send to Kafka
        producer.produce(
            topic=topic_name,
            value=json.dumps(transaction).encode('utf-8'),
            callback=delivery_report
        )
        
        # Serve delivery callbacks from earlier produce() calls
        producer.poll(0)
        time.sleep(0.5) # Send 2 messages per second
        
except KeyboardInterrupt:
    print("Stopping stream.")
finally:
    # Ensure all messages are sent before closing
    producer.flush()
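Before writing the Python consumer, a quick sanity check: Kafka ships with a console consumer you can use to peek at the topic. Run this in another terminal while producer.py is running (Ctrl+C to stop); you should see the raw JSON messages scroll by:

```shell
docker exec -it kafka /opt/kafka/bin/kafka-console-consumer.sh \
  --topic transactions \
  --bootstrap-server localhost:9092 \
  --from-beginning
```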

Step 4: Writing the Consumer

Right now, our Producer is sending data, but nothing is reading it. We need a Consumer to read the stream and apply some logic. In this script, we’ll listen to the transactions topic and alert if a transaction is over $1,000. Create a file called consumer.py:

import json
from confluent_kafka import Consumer, KafkaException

# Configure the Consumer
conf = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'analytics-group-v2',   # consumer group used to track offsets
    'auto.offset.reset': 'earliest',    # start from the beginning if no committed offset exists
    'enable.auto.commit': True          # commit offsets automatically in the background
}

consumer = Consumer(conf)
consumer.subscribe(['transactions'])

print("Listening for transactions...")

try:
    while True:
        # Wait up to 1 second for a new message
        msg = consumer.poll(1.0)

        if msg is None:
            continue

        if msg.error():
            raise KafkaException(msg.error())

        try:
            # Decode the bytes back into a JSON object
            data = json.loads(msg.value().decode('utf-8'))

            user_id = data.get('user_id')
            amount = data.get('amount')

            print(f"Processed: User {user_id} spent ${amount}")

            # Simple anomaly detection logic
            if amount and amount > 1000:
                print(f"⚠️ ANOMALY DETECTED: Large transaction of ${amount} by User {user_id}!")

        except json.JSONDecodeError:
            print("Invalid JSON received")
        except Exception as e:
            print(f"Processing error: {e}")

except KeyboardInterrupt:
    print("Shutting down consumer...")

finally:
    # Always close the consumer cleanly to release resources
    consumer.close()

Pay attention to group.id and auto.offset.reset in the settings. Kafka tracks your position in each partition with offsets, stored per consumer group. If your consumer restarts with the same group ID, it resumes from the last committed offset instead of re-reading the whole topic; auto.offset.reset only kicks in when the group has no committed offset yet (earliest means "start from the beginning" in that case). One caveat: with auto-commit enabled, a crash between processing a message and the next offset commit can replay a few messages, so this setup gives you at-least-once delivery, not exactly-once.
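If you need tighter control over when offsets are committed, for example committing only after your analytics logic has actually succeeded, you can disable auto-commit and commit manually. A minimal sketch of that change (the process() function is a hypothetical stand-in for your own logic):

```python
from confluent_kafka import Consumer

conf = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'analytics-group-v2',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': False   # we decide when an offset is committed
}

consumer = Consumer(conf)
consumer.subscribe(['transactions'])

def process(raw_bytes):
    # Hypothetical stand-in for your real analytics logic
    print(raw_bytes)

try:
    while True:
        msg = consumer.poll(1.0)
        if msg is None:
            continue
        if msg.error():
            continue
        process(msg.value())
        # Commit only after processing succeeded (at-least-once delivery)
        consumer.commit(msg)
finally:
    consumer.close()
```

The trade-off is throughput: committing after every message is slow, so in practice you'd commit in batches or asynchronously.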

Step 5: Running the Pipeline

Open two terminal windows. In the first one, start your Consumer. It will connect to Kafka and wait for data:

python consumer.py

In the second terminal, start your Producer:

python producer.py

Go back to Terminal 1, and you’ll see data streaming in live. The anomaly detection will trigger right away when it finds a match:

Processed: User 104 spent $45.20
Processed: User 101 spent $340.50
Processed: User 103 spent $1250.75
⚠️ ANOMALY DETECTED: Large transaction of $1250.75 by User 103!
Processed: User 105 spent $12.99

Closing Thoughts

That’s how you set up a real-time streaming analytics pipeline with Apache Kafka and Python.

In this tutorial, we used a simple if statement for anomaly detection. Swap that threshold check for a scikit-learn or PyTorch model, and you'll have the skeleton of a real-time ML inference pipeline.
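As a small step between the hard-coded threshold and a full ML model, here's a sketch of a streaming detector that flags amounts far from the running mean. It's pure Python with no Kafka dependency, so you can drop it into the consumer loop in place of the if statement; the 3-sigma threshold and warm-up count below are arbitrary choices, not anything from this tutorial's code:

```python
import math

class StreamingAnomalyDetector:
    """Flags values more than `threshold` standard deviations from the running mean.

    Uses Welford's online algorithm, so it needs O(1) memory per stream.
    """

    def __init__(self, threshold=3.0, warmup=30):
        self.threshold = threshold
        self.warmup = warmup   # don't flag anything until we've seen this many values
        self.n = 0
        self.mean = 0.0
        self.m2 = 0.0          # running sum of squared deviations from the mean

    def update(self, x):
        """Returns True if x looks anomalous, then folds x into the statistics."""
        is_anomaly = False
        if self.n >= self.warmup:
            std = math.sqrt(self.m2 / (self.n - 1))
            if std > 0 and abs(x - self.mean) > self.threshold * std:
                is_anomaly = True
        # Welford's incremental update of mean and variance
        self.n += 1
        delta = x - self.mean
        self.mean += delta / self.n
        self.m2 += delta * (x - self.mean)
        return is_anomaly

detector = StreamingAnomalyDetector()
# Feed in 100 "normal" amounts clustered around $50, then one extreme outlier
for amount in [50.0, 52.0, 48.0, 51.0, 49.0] * 20:
    detector.update(amount)
print(detector.update(5000.0))  # prints True
```

Unlike the fixed $1,000 rule, this adapts to whatever "normal" looks like in the stream, which is closer in spirit to what a trained model would give you.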

If you found this article helpful, you can follow me on Instagram for daily AI tips and practical resources. You may also be interested in my latest book, Hands-On GenAI, LLMs & AI Agents, a step-by-step guide to prepare you for careers in today’s AI industry.

Aman Kharwal

AI/ML Engineer | Published Author. My aim is to decode data science for the real world in the most simple words.