How to Build a Real-Time Data Pipeline with Redis Streams

Redis Streams provide a lightweight, self-hosted alternative to Apache Kafka for building event-driven data pipelines. They offer append-only log semantics, consumer groups with acknowledgment tracking, and sub-millisecond latency on a single Redis 7.4+ instance. You write producers that XADD events to a stream, create consumer groups with XGROUP CREATE, and read with XREADGROUP in Python using redis-py, getting at-least-once processing semantics through manual acknowledgment (XACK) and a pending entry list (PEL) that tracks unacknowledged messages.

What follows covers stream primitives, consumer groups with failure recovery, a complete producer/consumer pipeline with dead-letter queue handling, and the operational practices for keeping Redis Streams healthy in production.

Redis Streams vs. Kafka: When the Lightweight Option Wins

Kafka is the default answer when someone says “event streaming,” and for good reason. It handles multi-datacenter replication, petabyte-scale retention, and has a massive ecosystem of connectors through Kafka Connect. But Kafka also demands infrastructure: a minimum three-node cluster running KRaft (or the older ZooKeeper coordination), JVM tuning, topic partition planning, and dedicated operational expertise. For many teams, that is a lot of overhead when the actual requirement is “process 50K events per second between three microservices.”

Redis Streams, introduced in Redis 5.0 and mature in Redis 7.4+, implement an append-only log data structure with entry IDs based on <millisecondsTime>-<sequenceNumber>, similar to Kafka’s offset model. The core abstraction is the same: producers append events, consumers read them in order, and consumer groups distribute the work across multiple workers.

A single Redis instance handles roughly 100,000 to 500,000 XADD operations per second depending on entry size and hardware. Kafka handles 1M+ messages per second per partition, but that number comes from a multi-broker cluster, not a single process. Benchmark results from real-world comparisons show Redis delivering roughly 3x higher throughput than Kafka for 100K message batches, though Kafka’s batching and compression advantages emerge at higher volumes.

Latency characteristics differ too. Redis Streams deliver sub-millisecond median latency for XADD and XREAD, but tail latency can spike under memory pressure - one benchmark measured a p99 spread of 0.56ms to 810ms. Kafka’s median latency sits higher (5-50ms typically) but its tail latency is tighter, with a measured spread of 19.68ms to 265.92ms. Redis optimizes for raw speed; Kafka optimizes for consistency.

On the storage side, Redis Streams use a radix tree of macro-nodes with memory-efficient encoding for fields that repeat across entries. A stream of 10 million events with 5 fields each typically consumes 2-4 GB of RAM depending on field sizes. That is manageable on modern hardware, but it means retention is bounded by available memory rather than disk.

| Feature | Redis Streams | Apache Kafka |
| --- | --- | --- |
| Minimum infrastructure | Single Redis instance | 3-node KRaft cluster |
| Throughput (single node) | 100K-500K ops/sec | N/A (requires cluster) |
| Median latency | Sub-millisecond | 5-50ms |
| Storage | In-memory (RAM-bounded) | Disk-based (unlimited retention) |
| Consumer groups | Built-in with PEL tracking | Built-in with offset tracking |
| Multi-datacenter replication | Manual (Redis Sentinel/Cluster) | Built-in (MirrorMaker) |
| Connector ecosystem | None | Kafka Connect (hundreds of connectors) |

Choose Redis Streams when you already run Redis, your throughput stays under 500K events per second, you do not need multi-day retention, and you want to avoid operating a separate Kafka cluster. Choose Kafka when you need cross-datacenter replication, long-term event retention, or the Kafka Connect integration ecosystem.

Valkey (the open-source Redis fork, currently at version 8.1.6) is a drop-in replacement with identical Streams support and no licensing concerns.

Stream Fundamentals: XADD, XREAD, and Stream IDs

Before building consumer groups, you need to understand the three primitives: adding entries, reading entries, and how stream IDs work.

XADD appends an entry to a stream:

XADD mystream * sensor temp value 22.5 unit celsius

The * tells Redis to auto-generate the entry ID using the current timestamp and a sequence number. The returned ID looks like 1679000000000-0 - milliseconds since epoch, dash, sequence number. You can specify an explicit ID for deterministic replay, but auto-generation is the standard approach.
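Because auto-generated IDs embed the creation timestamp, you can recover an entry's age directly from its ID without storing a separate timestamp field. A small stdlib-only helper (the function name is illustrative):

```python
from datetime import datetime, timezone

def entry_id_to_datetime(entry_id: str) -> datetime:
    """Decode the millisecond timestamp embedded in a stream entry ID.

    The portion before the dash is milliseconds since the Unix epoch;
    the portion after is a per-millisecond sequence number.
    """
    millis, _, _seq = entry_id.partition("-")
    return datetime.fromtimestamp(int(millis) / 1000, tz=timezone.utc)

# 1679000000000-0 decodes to a UTC datetime in March 2023
print(entry_id_to_datetime("1679000000000-0"))
```

This is handy for computing event age in consumers or for building time-based MINID trim boundaries.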

Unlike Kafka messages, which are opaque byte arrays that require external serialization, Redis Stream entries are ordered dictionaries. Each entry has an ID and one or more field-value pairs with built-in structure. This means you can inspect entries directly from redis-cli without a deserializer.

[Figure: A producer appends structured entries to a Redis stream, each identified by a millisecond-precision timestamp ID. Image: Redis Documentation]

XREAD retrieves entries from one or more streams:

XREAD COUNT 10 BLOCK 5000 STREAMS mystream 0

This reads up to 10 entries starting from ID 0 (the beginning of the stream), blocking for up to 5 seconds if no entries are available. Use $ instead of 0 to read only entries that arrive after the command is issued.

For range queries and stream inspection:

XLEN mystream                              # total entry count
XRANGE mystream - +                        # all entries (start to end)
XRANGE mystream 1679000000000-0 + COUNT 100 # paginated from a specific ID

Trimming prevents unbounded growth:

XTRIM mystream MAXLEN ~ 1000000

The ~ allows Redis to trim efficiently by removing whole macro-nodes from the radix tree rather than entry by entry. The actual count may slightly exceed 1 million. You can also use MINID to trim entries older than a specific timestamp-based ID.

In Python with redis-py:

import redis

r = redis.Redis(host="localhost", port=6379, decode_responses=True)

# Produce an entry
entry_id = r.xadd("mystream", {"sensor": "temp", "value": "22.5"})
print(f"Added entry: {entry_id}")

# Read entries from the beginning
entries = r.xread({"mystream": "0-0"}, count=10, block=5000)
for stream_name, stream_entries in entries:
    for entry_id, fields in stream_entries:
        print(f"  {entry_id}: {fields}")

Consumer Groups: Distributed Processing with Acknowledgments

Consumer groups are what turn Redis Streams from a simple log into a real data pipeline. They provide the same core abstraction as Kafka consumer groups: multiple consumers share the work of processing a stream, each message is delivered to exactly one consumer in the group, and unacknowledged messages are tracked for redelivery.

Create a consumer group:

XGROUP CREATE mystream mygroup $ MKSTREAM

The $ means start consuming only from entries that arrive after group creation. Use 0 to process the entire stream history. MKSTREAM creates the stream if it does not exist yet.

Read as a consumer within the group:

XREADGROUP GROUP mygroup consumer1 COUNT 10 BLOCK 5000 STREAMS mystream >

The > is significant - it means “give me entries that have not been delivered to any consumer in this group.” Each entry goes to exactly one consumer, distributing the workload automatically.

[Figure: Consumer groups distribute entries across workers via XREADGROUP while independent consumers read the full stream via XREAD. Image: Redis Documentation]

After processing an entry, acknowledge it:

XACK mystream mygroup 1679000000000-0

This removes the entry from the consumer’s pending entry list (PEL). Until acknowledged, the entry remains pending and can be claimed by another consumer if the original one fails.

The PEL is central to reliability. Inspect it with:

XPENDING mystream mygroup

This returns the total pending count, the minimum and maximum pending entry IDs, and per-consumer counts. When a consumer crashes without acknowledging its entries, those entries sit in the PEL waiting for recovery.
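The same inspection in redis-py uses xpending_range, which returns one dict per pending entry with its ID, owning consumer, idle time, and delivery count. A sketch that flags entries stuck past a threshold (the helper name and 100-entry page size are arbitrary; r is the redis-py client from earlier):

```python
def stalled_entries(r, stream: str, group: str, min_idle_ms: int = 60_000):
    """List pending entries idle longer than min_idle_ms.

    Each item from xpending_range carries message_id, consumer,
    time_since_delivered (ms), and times_delivered.
    """
    stalled = []
    for item in r.xpending_range(stream, group, min="-", max="+", count=100):
        if item["time_since_delivered"] >= min_idle_ms:
            stalled.append(
                (item["message_id"], item["consumer"], item["times_delivered"])
            )
    return stalled
```

A high times_delivered count is a strong hint that an entry is a poison message rather than the victim of a crashed consumer.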

Recovery happens through XAUTOCLAIM:

XAUTOCLAIM mystream mygroup consumer2 60000 0-0

This transfers entries that have been pending for more than 60 seconds (60000 milliseconds) to consumer2. It is the Redis Streams equivalent of Kafka’s consumer rebalance, but explicit rather than automatic - you decide when and how stale entries get reassigned.

The Python pattern for consumer group reading:

# Create the group (run once)
try:
    r.xgroup_create("mystream", "mygroup", id="$", mkstream=True)
except redis.exceptions.ResponseError as e:
    if "BUSYGROUP" not in str(e):
        raise
    # BUSYGROUP just means the group already exists, which is fine

# Read as a consumer
results = r.xreadgroup(
    "mygroup", "consumer1",
    {"mystream": ">"},
    count=10,
    block=5000
)

for stream_name, entries in results:
    for entry_id, fields in entries:
        print(f"Processing {entry_id}: {fields}")
        # ... process the entry ...
        r.xack("mystream", "mygroup", entry_id)

Building the Pipeline: Producer, Consumer, and Dead-Letter Queue

With the primitives in place, here is a complete pipeline with a producer, consumer workers, retry logic, and a dead-letter queue for poison messages.

The producer appends events with automatic stream trimming:

import json
import redis

r = redis.Redis(host="localhost", port=6379, decode_responses=True)

def produce_event(event_type: str, payload: dict):
    entry_id = r.xadd(
        "events",
        {
            "type": event_type,
            "version": "1",
            "payload": json.dumps(payload),
        },
        maxlen=1_000_000,  # cap stream at ~1M entries
    )
    return entry_id

# Example: order created event
produce_event("order.created", {"order_id": "12345", "total": 99.99})

Including a version field in every entry is a simple schema evolution strategy. Consumers check the version and apply the appropriate deserialization logic. When you need to change the payload structure, bump the version and update consumers to handle both old and new formats. If your producers are HTTP services, leaning on type-safe payload validation at the edge keeps malformed entries out of the stream in the first place. For more formal schema management, consider integrating Protobuf or Avro serialization with a schema registry, though for many Redis Streams use cases the version-field approach is sufficient.
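As a sketch of the version-dispatch idea, here is a decoder that normalizes two hypothetical payload schemas (the v1/v2 field names are invented for illustration: suppose v1 stored a float dollar total and v2 switched to integer cents):

```python
import json

def decode_payload(fields: dict) -> dict:
    """Normalize payloads across schema versions.

    Hypothetical evolution: v1 stored `total` as a float of dollars,
    v2 stores integer cents in `total_cents`. Both normalize to cents.
    """
    payload = json.loads(fields["payload"])
    version = fields.get("version", "1")
    if version == "1":
        payload["total_cents"] = int(round(payload.pop("total") * 100))
    elif version != "2":
        raise ValueError(f"unknown schema version: {version}")
    return payload

entry = {"version": "1", "payload": '{"order_id": "12345", "total": 99.99}'}
print(decode_payload(entry))  # {'order_id': '12345', 'total_cents': 9999}
```

Raising on unknown versions (rather than guessing) pushes genuinely incompatible entries into the retry-then-dead-letter path, where they can be inspected.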

The consumer worker runs an infinite loop with error handling:

import json
import logging
import redis

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

r = redis.Redis(host="localhost", port=6379, decode_responses=True)

MAX_RETRIES = 3
CONSUMER_NAME = "consumer1"
GROUP_NAME = "mygroup"
STREAM_NAME = "events"

def process_entry(entry_id: str, fields: dict):
    """Your business logic here."""
    event_type = fields["type"]
    payload = json.loads(fields["payload"])
    logger.info(f"Processing {event_type}: {payload}")
    # ... actual processing ...

def move_to_dlq(entry_id: str, fields: dict, error: str):
    """Move a poison message to the dead-letter queue."""
    r.xadd("dead_letters", {
        "original_id": entry_id,
        "original_stream": STREAM_NAME,
        "type": fields.get("type", "unknown"),
        "payload": fields.get("payload", "{}"),
        "error": error,
    })
    r.xack(STREAM_NAME, GROUP_NAME, entry_id)
    r.delete(f"retries:{entry_id}")
    logger.warning(f"Moved {entry_id} to dead-letter queue: {error}")

def consumer_loop():
    # Ensure group exists
    try:
        r.xgroup_create(STREAM_NAME, GROUP_NAME, id="$", mkstream=True)
    except redis.exceptions.ResponseError as e:
        if "BUSYGROUP" not in str(e):
            raise  # only swallow "group already exists"

    while True:
        results = r.xreadgroup(
            GROUP_NAME, CONSUMER_NAME,
            {STREAM_NAME: ">"},
            count=10,
            block=5000,
        )
        if not results:
            continue

        for stream_name, entries in results:
            for entry_id, fields in entries:
                # Check retry count
                retry_count = int(r.hget(f"retries:{entry_id}", "count") or 0)
                if retry_count >= MAX_RETRIES:
                    move_to_dlq(entry_id, fields, "max retries exceeded")
                    continue

                try:
                    process_entry(entry_id, fields)
                    r.xack(STREAM_NAME, GROUP_NAME, entry_id)
                    r.delete(f"retries:{entry_id}")
                except Exception as e:
                    logger.error(f"Failed to process {entry_id}: {e}")
                    r.hincrby(f"retries:{entry_id}", "count", 1)
                    # Do NOT ack - entry stays in PEL for XAUTOCLAIM

if __name__ == "__main__":
    consumer_loop()

On failure, the entry remains unacknowledged in the PEL. A retry counter stored in a Redis hash tracks how many times processing has been attempted. After MAX_RETRIES failures, the entry gets moved to a dead_letters stream where it can be inspected and reprocessed manually.
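When a dead-lettered entry turns out to be reprocessable (say, after a bug fix), a small replayer can move it back into the source stream. A sketch, assuming the field layout written by move_to_dlq above (the replay_dead_letters name is mine, and r is the redis-py client; note the DLQ entries do not carry the original version field, so a default is assumed):

```python
def replay_dead_letters(r, batch: int = 100) -> int:
    """Move dead-lettered events back into the source stream.

    Reads up to `batch` entries from dead_letters, re-adds each
    original type/payload to the events stream, then deletes the
    DLQ entry. Returns how many entries were replayed.
    """
    replayed = 0
    for dlq_id, fields in r.xrange("dead_letters", "-", "+", count=batch):
        r.xadd("events", {
            "type": fields["type"],
            "payload": fields["payload"],
            "version": fields.get("version", "1"),  # DLQ entries may lack it
        })
        r.xdel("dead_letters", dlq_id)  # remove only after the re-add succeeds
        replayed += 1
    return replayed
```

Replayed entries get fresh IDs, so the per-entry retry counters start from zero again.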

Reclaiming stale entries requires a separate process (or a periodic task within each consumer):

def reclaim_stale_entries():
    """Claim entries pending for more than 60 seconds."""
    next_start_id, claimed, deleted_ids = r.xautoclaim(
        STREAM_NAME, GROUP_NAME, CONSUMER_NAME,
        min_idle_time=60000,  # 60 seconds
        start_id="0-0",
        count=50,
    )
    if claimed:
        logger.info(f"Reclaimed {len(claimed)} stale entries")
        # Claimed entries now belong to this consumer: run them through
        # the same process/XACK path as fresh XREADGROUP deliveries.

To scale, run multiple instances of the consumer worker with different consumer names (consumer1, consumer2, consumer3) in the same group. Redis distributes entries across them automatically. If one instance dies, XAUTOCLAIM in the surviving instances picks up its pending work. To verify this failover behavior before shipping, spin up a real Redis container in your test suite using the patterns from our guide on integration tests with Testcontainers.
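One detail worth pinning down when scaling out: Redis treats same-named consumers as a single consumer, so two workers both reading as consumer1 would share one indistinguishable PEL. A hostname-plus-PID naming scheme is one simple convention for keeping instances distinct:

```python
import os
import socket

# Each worker instance registers under its own name in the group;
# hostname + PID is unique per process on a given host.
CONSUMER_NAME = f"{socket.gethostname()}-{os.getpid()}"
print(CONSUMER_NAME)
```

If workers restart with fresh names, remember to run XAUTOCLAIM (or XGROUP DELCONSUMER) so the old names' pending entries do not linger forever.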

[Figure: Redis Insight provides a visual interface for inspecting stream data, consumer groups, and pending entries. Image: Redis Insight Documentation]

Redis Streams natively provide at-least-once delivery through the PEL and XACK mechanism; exactly-once is not built in. To get closer to exactly-once, make your consumers idempotent by checking a processed-IDs set before processing:

# inside the consumer loop, before calling process_entry:
if r.sismember("processed", entry_id):
    r.xack(STREAM_NAME, GROUP_NAME, entry_id)
    continue
# ... process ...
r.sadd("processed", entry_id)
r.xack(STREAM_NAME, GROUP_NAME, entry_id)
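The processed set above grows without bound. One way to bound it, assuming redeliveries always arrive within a known window, is a per-entry key with SET NX EX instead of SADD (the process_once name and the 24-hour TTL are illustrative; r is the redis-py client):

```python
def process_once(r, entry_id: str, fields: dict, handler,
                 ttl: int = 86_400) -> bool:
    """Process an entry at most once within a TTL window.

    SET NX EX claims a per-entry marker key atomically; the marker
    expires after `ttl` seconds, so dedup state cleans itself up.
    Returns False if the entry was already handled.
    """
    if not r.set(f"processed:{entry_id}", "1", nx=True, ex=ttl):
        return False  # duplicate delivery; caller should still XACK
    handler(entry_id, fields)
    return True
```

Note the remaining gap: if the worker crashes between `handler` and XACK, the entry is redelivered but the marker already exists, so the side effect still runs exactly once only if `handler` itself is atomic with the marker (e.g. via a transaction).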

Redis Pub/Sub vs. Redis Streams

Streams and Redis Pub/Sub both handle messaging but serve different purposes.

Pub/Sub is fire-and-forget broadcasting. Messages are not persisted. If a subscriber disconnects and reconnects, every message published during the disconnect is lost permanently. There is no acknowledgment, no replay, and no consumer groups. Latency is extremely low (sub-millisecond), making Pub/Sub ideal for real-time notifications where message loss is acceptable - chat presence indicators, cache invalidation signals, live dashboard updates.

Streams are persistent and acknowledged. Entries remain in the stream until explicitly trimmed. Consumers can disconnect, reconnect, and resume from where they left off. The PEL ensures nothing gets silently dropped. The tradeoff is slightly higher latency (typically 1-2ms overhead versus Pub/Sub) and memory consumption for the persisted entries.

| Aspect | Redis Pub/Sub | Redis Streams |
| --- | --- | --- |
| Persistence | None (fire-and-forget) | Append-only log (until trimmed) |
| Delivery guarantee | None | At-least-once (with XACK) |
| Consumer groups | No | Yes |
| Message replay | Not possible | Read from any point in history |
| Disconnect handling | Messages lost | Resume from last position |
| Typical latency | Sub-millisecond | 1-2ms |

Use Pub/Sub for ephemeral notifications. Use Streams for anything that requires reliable processing.

Monitoring, Trimming, and Production Operations

A data pipeline that runs unmonitored will eventually fill memory or silently drop events. Here are the operational practices that keep Redis Streams healthy in production.

The two numbers that matter most are XLEN (total entries in the stream) and the pending count from XPENDING (delivered but unacknowledged entries). If XLEN grows beyond your expected retention window, consumers are falling behind. If the pending count grows, consumers are crashing without acknowledging.

Use XINFO to inspect stream and group state:

> XINFO STREAM mystream
 1) "length"
 2) (integer) 524387
 3) "radix-tree-keys"
 4) (integer) 1043
 5) "radix-tree-nodes"
 6) (integer) 2089
 7) "last-generated-id"
 8) "1679500000000-0"
 9) "entries-added"
10) (integer) 1250000
11) "first-entry"
12) 1) "1679400000000-0"
    2) 1) "sensor"  2) "temp"  3) "value"  4) "22.5"
13) "last-entry"
14) 1) "1679500000000-0"
    2) 1) "sensor"  2) "humidity"  3) "value"  4) "65.2"

> XINFO GROUPS mystream
1) 1) "name"           2) "mygroup"
   3) "consumers"      4) (integer) 3
   5) "pending"        6) (integer) 42
   7) "last-delivered-id"  8) "1679499990000-0"
   9) "entries-read"  10) (integer) 1249958
  11) "lag"           12) (integer) 42

The lag field in XINFO GROUPS is calculated as the difference between total entries added and entries read by the group. Alert when lag exceeds a threshold - 10,000 entries or 60 seconds of wall-clock time are reasonable starting points.
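A minimal lag check along those lines, written against redis-py's xinfo_groups (the threshold value and helper name are placeholders; redis-py returns the group info as dicts keyed by the reply field names, including "lag"):

```python
LAG_THRESHOLD = 10_000  # entries; tune to your pipeline

def check_group_lag(r, stream: str):
    """Return (group_name, lag) for every group worth alerting on."""
    alerts = []
    for group in r.xinfo_groups(stream):
        lag = group.get("lag")
        # lag can be None when Redis cannot compute it (e.g. the stream
        # was trimmed past entries the group never read); treat unknown
        # lag as alertable rather than silently skipping it
        if lag is None or lag > LAG_THRESHOLD:
            alerts.append((group["name"], lag))
    return alerts
```

Run this on a timer alongside the consumers and push the results into whatever alerting channel you already use.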

Memory management requires active trimming on the Redis side, and on the Python side it is worth profiling consumer memory usage so long-lived workers do not leak under sustained load. Run XTRIM either in a cron job or a dedicated trimmer process:

# Cap at approximately 500K entries
XTRIM mystream MAXLEN ~ 500000

# Or retain exactly 24 hours of data by ID
XTRIM mystream MINID ~ 1679400000000-0

The MINID approach is often more useful because it provides time-based retention regardless of throughput volume.
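A time-based trimmer only needs wall-clock arithmetic, since entry IDs begin with milliseconds since the epoch. A sketch suitable for a cron job (the function name is illustrative; r is the redis-py client):

```python
import time

def trim_to_window(r, stream: str, hours: int = 24) -> int:
    """Trim entries older than `hours`; returns the number removed.

    A MINID of "<cutoff_ms>-0" marks the retention boundary, and
    approximate trimming lets Redis drop whole macro-nodes cheaply.
    """
    cutoff_ms = int((time.time() - hours * 3600) * 1000)
    return r.xtrim(stream, minid=f"{cutoff_ms}-0", approximate=True)
```

Because the cutoff is computed from time rather than entry count, the retention window stays constant whether the pipeline pushes 1K or 500K events per second.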

Expose metrics to Prometheus using redis_exporter or by querying INFO directly. The key metrics from INFO are used_memory, connected_clients, and the stream-specific statistics. Build Grafana dashboards that show stream length over time, consumer lag, pending entry count, and memory usage.

[Figure: The redis_exporter community Grafana dashboard provides at-a-glance Redis health metrics. Image: redis_exporter]

Persistence matters for critical pipelines. Redis Streams are persisted through RDB snapshots and AOF logging like any other Redis data structure. For pipelines where data loss is unacceptable, configure:

appendonly yes
appendfsync everysec

This limits potential data loss to approximately one second of events on a crash. For even stronger guarantees, use appendfsync always at the cost of write throughput.

For high availability, use either Redis Sentinel (automatic failover of a single primary) or Redis Cluster (horizontal sharding). In Cluster mode, use hash tags to pin a stream and its related keys to the same shard:

XADD {pipeline1}:events * type order.created ...
SET {pipeline1}:retries:1679000000000-0 3

The {pipeline1} hash tag ensures both keys land on the same node, allowing multi-key operations. Be mindful that concentrating too much data behind a single hash tag creates a hot shard - balance data locality needs against even distribution across the cluster.

When the producers and consumers ship as containers, follow a container security checklist to run them as non-root with minimal base images so a compromised worker cannot pivot into the Redis instance.

A well-monitored Redis Streams pipeline with proper trimming, consumer group management, and alerting on lag and pending counts is a reliable, low-maintenance alternative to Kafka for moderate-scale event processing. Start with a single Redis instance, add Sentinel for failover, and you have a production-grade data pipeline running on infrastructure you probably already operate.