Advanced Topics: Redis Streams for Event Sourcing

In the “Publish/Subscribe” chapter, we learned about real-time, fire-and-forget messaging. While powerful for certain use cases, traditional Pub/Sub has a limitation: messages are not persisted. If a subscriber is offline, it misses messages. This is where Redis Streams come in.

Redis Streams, introduced in Redis 5.0, are a more robust, persistent, and highly scalable messaging solution. They are append-only data structures that act as a continuously growing log, similar in concept to Apache Kafka. Streams are ideal for:

  • Event Sourcing: Storing a complete, ordered sequence of all state-changing events in an application.
  • Persistent Queues: Ensuring messages are not lost and can be processed by consumers at their own pace.
  • Real-time Data Pipelines: Building complex data processing workflows.
  • Activity Feeds: Storing and retrieving user activities.

In this chapter, we’ll dive deep into Redis Streams, covering:

  • The fundamental concepts of Streams and Stream Entries.
  • How to add data to a Stream (XADD).
  • Different ways to read data from a Stream (XRANGE, XREAD).
  • The power of Consumer Groups (XGROUP, XREADGROUP) for distributed and fault-tolerant consumption.
  • Acknowledging messages (XACK) and handling pending entries.
  • Trimming Streams to manage memory (XTRIM).

1. Understanding Redis Streams

A Redis Stream is an append-only log where each new entry is assigned a unique, time-based ID. Once an entry is added, its contents cannot be modified; entries can only be removed wholesale, either individually by ID with XDEL or in bulk by trimming the stream (XTRIM, or MAXLEN/MINID on XADD).

Each Stream Entry is essentially a small dictionary or hash, containing one or more field-value pairs.

key -> [ (ID1, {field1: value1, field2: value2}), (ID2, {fieldA: valueA}), ... ]

Key characteristics:

  • Append-only: New entries are always added to the end.
  • Immutable Entries: Once an entry is written, its content cannot be changed.
  • Unique IDs: Each entry has a unique, monotonically increasing ID, by default of the form <millisecondsTime>-<sequenceNumber>. You can let Redis generate this ID automatically using *.
  • Persistence: Stream data is stored in memory and can be persisted to disk using RDB and AOF.
  • Multiple Consumers: Supports various consumption patterns, including Consumer Groups.
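
To make the ID scheme concrete, here is a minimal sketch (demo_stream is a throwaway key used only for illustration):

// id_demo.js
const Redis = require('ioredis');
const redis = new Redis();

async function idDemo() {
  await redis.del('demo_stream'); // start clean so the explicit ID below is valid

  // '*' asks Redis to generate the ID: <millisecondsTime>-<sequenceNumber>
  const autoId = await redis.xadd('demo_stream', '*', 'event', 'created');
  console.log(`Auto-generated ID: ${autoId}`); // e.g. 1678886400000-0

  // Explicit IDs are allowed, but must be greater than the last entry's ID
  const explicitId = await redis.xadd('demo_stream', `${Date.now() + 1000}-0`, 'event', 'updated');
  console.log(`Explicit ID: ${explicitId}`);

  await redis.quit();
}

idDemo();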

2. Producing to a Stream: XADD

The XADD command is used to add new entries to a stream.

XADD key [MAXLEN [~] count | MINID [~] ID] *|ID field value [field value ...]

  • key: The name of the stream.
  • MAXLEN [~] count: (Optional) Limits the stream to approximately count elements. The ~ makes the limit approximate, which lets Redis trim whole internal nodes at once and is significantly faster than exact trimming.
  • MINID [~] ID: (Optional) Evicts entries with IDs smaller than ID.
  • *|ID: The entry ID. Use * to let Redis automatically generate a unique, monotonically increasing ID. You can also specify a custom ID, but it must be greater than the last entry’s ID.
  • field value [field value ...]: One or more field-value pairs that make up the entry.

Node.js Example (Producer):

// stream_producer.js
const Redis = require('ioredis');
const redis = new Redis();

const sensorStreamKey = 'sensor_readings';

async function produceSensorData() {
  let sensorId = 1;
  setInterval(async () => {
    const temperature = (Math.random() * 50 + 10).toFixed(2); // 10.00 to 60.00
    const humidity = (Math.random() * 80 + 20).toFixed(2);   // 20.00 to 100.00
    const location = `room_${(sensorId % 3) + 1}`;

    try {
      // XADD key * field value [field value ...]
      // MAXLEN ~ 1000: Keep approx. 1000 entries to prevent unbounded growth
      const entryId = await redis.xadd(
        sensorStreamKey, 'MAXLEN', '~', 1000,
        '*', // Let Redis generate a unique ID
        'sensorId', sensorId++,
        'temperature', temperature,
        'humidity', humidity,
        'location', location
      );
      console.log(`Produced entry ${entryId}: temp=${temperature}, hum=${humidity}, loc=${location}`);
    } catch (err) {
      console.error('Error producing to stream:', err);
    }
  }, 1000); // Produce a new entry every second
}

// produceSensorData(); // Run this in one terminal
// To stop: Ctrl+C

Python Example (Producer):

# stream_producer.py
import redis
import time
import random

r = redis.Redis(host='localhost', port=6379, db=0)
sensor_stream_key = 'sensor_readings_py'

def produce_sensor_data():
    sensor_id = 1
    print("Python producer started. Producing sensor data...")
    try:
        while True:
            temperature = round(random.uniform(10.0, 60.0), 2)
            humidity = round(random.uniform(20.0, 100.0), 2)
            location = f"room_{((sensor_id % 3) + 1)}"

            # XADD key * field value [field value ...]
            # MAXLEN ~ 1000: Keep approx. 1000 entries
            entry_id = r.xadd(
                sensor_stream_key,
                {'sensorId': sensor_id, 'temperature': temperature, 'humidity': humidity, 'location': location},
                maxlen=1000,
                approximate=True # This corresponds to '~' in CLI
            )
            print(f"Produced entry {entry_id.decode('utf-8')}: temp={temperature}, hum={humidity}, loc={location}")
            sensor_id += 1
            time.sleep(1)
    except KeyboardInterrupt:
        print("\nPython producer stopped.")
    finally:
        r.close()

# produce_sensor_data() # Run this in one terminal
# To stop: Ctrl+C

3. Consuming from a Stream: XRANGE and XREAD

For simple, non-grouped consumption, there are two primary ways to read data from a stream.

a. XRANGE key start end [COUNT count] (Range Read)

Reads a range of entries from the stream, typically used for fetching historical data or a segment of the stream.

  • start, end: Entry IDs. Use - for the smallest possible ID and + for the largest possible ID to read the entire stream. You can also specify an exact ID to start from.
  • COUNT count: Limits the number of entries returned.

Node.js Example (XRANGE):

// stream_consumer_xrange.js
const Redis = require('ioredis');
const redis = new Redis();

const sensorStreamKey = 'sensor_readings';

async function readStreamRange() {
  try {
    console.log(`\n--- Reading all entries from ${sensorStreamKey} ---`);
    let entries = await redis.xrange(sensorStreamKey, '-', '+', 'COUNT', 10); // Get first 10 entries

    if (entries.length === 0) {
      console.log('No entries in stream yet. Run a producer first!');
    } else {
      console.log(`Fetched ${entries.length} entries:`);
      entries.forEach(([id, fields]) => {
        // fields is a flat array [field1, value1, field2, value2, ...]; pair it up
        const decodedFields = {};
        for (let i = 0; i < fields.length; i += 2) decodedFields[fields[i]] = fields[i + 1];
        console.log(`  ID: ${id}, Data: ${JSON.stringify(decodedFields)}`);
      });
    }

    console.log('\n--- Reading the last 5 entries ---');
    // XRANGE walks the stream oldest-to-newest, so it cannot directly give you
    // "the latest N". XREVRANGE walks newest-to-oldest, making it the efficient
    // way to fetch just the most recent entries (note the swapped '+' and '-').
    let lastFiveEntries = await redis.xrevrange(sensorStreamKey, '+', '-', 'COUNT', 5);
    console.log(`Fetched ${lastFiveEntries.length} latest entries (reversed order):`);
    lastFiveEntries.forEach(([id, fields]) => {
      const decodedFields = {};
      for (let i = 0; i < fields.length; i += 2) decodedFields[fields[i]] = fields[i + 1];
      console.log(`  ID: ${id}, Data: ${JSON.stringify(decodedFields)}`);
    });


  } catch (err) {
    console.error('Error reading stream with XRANGE:', err);
  } finally {
    await redis.quit();
  }
}

// Ensure producer is running in another terminal
// readStreamRange(); // Run this in another terminal
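
The example above fetches a fixed COUNT, but a stream often holds more entries than you want in a single reply. A common pattern is to page through it with repeated XRANGE calls, resuming from just past the last ID seen. The sketch below assumes Redis 6.2 or newer, where prefixing the start ID with ( makes the range exclusive:

// stream_paginate.js
const Redis = require('ioredis');
const redis = new Redis();

async function paginateStream(streamKey, batchSize = 100) {
  let cursor = '-'; // begin at the oldest entry
  while (true) {
    const batch = await redis.xrange(streamKey, cursor, '+', 'COUNT', batchSize);
    if (batch.length === 0) break; // reached the end of the stream
    for (const [id, fields] of batch) {
      console.log(`Entry ${id}: ${JSON.stringify(fields)}`);
    }
    // '(' makes the next start exclusive, so the last entry isn't re-read (Redis >= 6.2)
    cursor = `(${batch[batch.length - 1][0]}`;
  }
  await redis.quit();
}

// paginateStream('sensor_readings');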

b. XREAD [COUNT count] [BLOCK milliseconds] STREAMS key ID [key ID ...] (Blocking Read)

Reads new entries from one or more streams, optionally blocking if no new entries are available.

  • COUNT count: Limits the number of entries per stream.
  • BLOCK milliseconds: If specified, blocks until new data arrives or the timeout (in milliseconds) elapses. 0 means block indefinitely.
  • STREAMS key ID [key ID ...]: Specifies which streams to read from and the ID from which to start reading. Use $ to read only new messages that arrive after the XREAD call.

Node.js Example (XREAD - simple consumer):

// stream_consumer_xread.js
const Redis = require('ioredis');
const redis = new Redis();

const sensorStreamKey = 'sensor_readings';
let lastId = '$'; // Start from the latest entry when connecting

async function consumeStream() {
  console.log(`Consumer started for stream '${sensorStreamKey}'. Waiting for new entries...`);
  try {
    while (true) {
      // XREAD BLOCK 0 STREAMS key ID
      const result = await redis.xread('BLOCK', 0, 'COUNT', 1, 'STREAMS', sensorStreamKey, lastId);

      if (result) {
        // Result format: [[streamName, [[id, [field1, value1, ...]], ...]]]
        const streamData = result[0];
        const entries = streamData[1];
        if (entries.length > 0) {
          entries.forEach(([id, fields]) => {
            const decodedFields = {};
            for (let i = 0; i < fields.length; i += 2) decodedFields[fields[i]] = fields[i + 1];
            console.log(`  New Entry [${id}]: ${JSON.stringify(decodedFields)}`);
            lastId = id; // Update lastId to read subsequent entries
          });
        }
      }
    }
  } catch (err) {
    console.error('Error consuming stream with XREAD:', err);
  } finally {
    await redis.quit();
  }
}

// Ensure producer is running in another terminal
// consumeStream(); // Run this in another terminal

4. Consumer Groups: Scalable and Fault-Tolerant Consumption

For serious production use cases, especially with distributed services, Consumer Groups are essential. A Consumer Group allows multiple consumers to cooperate in processing a stream, where each message is delivered to only one consumer in the group, ensuring distributed and fault-tolerant processing.

Key concepts for Consumer Groups:

  • Consumer Group: A named entity that tracks the last delivered message ID for its consumers.
  • Consumer: An instance (e.g., a microservice replica) that belongs to a Consumer Group.
  • Pending Entries List (PEL): For each consumer in a group, Redis keeps track of messages that have been delivered but not yet acknowledged. This is crucial for recovery.

Commands for Consumer Groups:

1. XGROUP CREATE key groupname ID|$ [MKSTREAM]

Creates a new Consumer Group for a stream.

  • key: The stream name.
  • groupname: The name of the consumer group.
  • ID|$: The ID from which the consumer group should start reading. Use 0 to start from the very first entry in the stream. $ means from now on (only new messages).
  • MKSTREAM: (Optional, recommended) Creates the stream if it doesn’t exist.

2. XREADGROUP GROUP groupname consumername [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key ID [key ID ...]

Reads messages from a stream as part of a Consumer Group.

  • groupname: The name of the consumer group.
  • consumername: The unique name of the consumer (e.g., my_app_instance_1).
  • STREAMS key ID: Specifies the stream and the ID to read from. Use > to mean “messages never delivered to any consumer in this group before.” Use a specific ID (e.g., 0) to re-read messages that were already delivered to this consumer but not yet acknowledged, i.e., its pending entries (see the sketch after this list).
  • NOACK: (Optional) Skips adding messages to the PEL. Not recommended for most use cases as it loses durability.
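
The distinction between > and an explicit ID is worth seeing in code. Below is a minimal sketch of a consumer that, on startup, first drains its own pending entries (for example, after a crash) before switching to new messages; it reuses the sensor_readings stream from earlier, and the group and consumer names are illustrative:

// reread_pending.js
const Redis = require('ioredis');
const redis = new Redis();

const streamKey = 'sensor_readings';
const groupName = 'analytics_group';

async function drainOwnPending(consumerName) {
  // An explicit ID of '0' returns entries already delivered to THIS consumer
  // but never acknowledged with XACK
  const result = await redis.xreadgroup(
    'GROUP', groupName, consumerName, 'COUNT', 100, 'STREAMS', streamKey, '0'
  );
  const leftovers = result && result[0] ? result[0][1] : [];
  for (const [id, fields] of leftovers) {
    console.log(`Re-processing leftover entry ${id}:`, fields);
    await redis.xack(streamKey, groupName, id); // acknowledge after re-processing
  }
  // From here on, pass '>' instead to receive only never-delivered entries,
  // as the full consumer group example below does.
  await redis.quit();
}

// drainOwnPending('consumer_alpha');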

3. XACK key groupname ID [ID ...]

Acknowledges successful processing of messages. This removes the messages from the consumer’s Pending Entries List.

4. XPENDING key groupname [start end count] [consumername]

Shows pending messages for a Consumer Group or a specific consumer within it. Essential for monitoring and debugging.

Node.js Example (Consumer Group - requires multiple scripts/processes):

Script 1: Producer (stream_producer.js - use the same one from above)

Script 2: Consumer Group Consumer (stream_consumer_group.js)

// stream_consumer_group.js
const Redis = require('ioredis');
// Each consumer should use its own connection: blocking reads monopolize it while waiting
const redis = new Redis();

const streamKey = 'sensor_readings';
const groupName = 'analytics_group';
const consumerName = process.argv[2] || `consumer_${process.pid}`; // Unique name for each consumer

async function setupConsumerGroup() {
  try {
    // Try to create the consumer group. If it exists, Redis will return an error, which we can ignore.
    await redis.xgroup('CREATE', streamKey, groupName, '$', 'MKSTREAM').catch(err => {
      if (err.message.includes('BUSYGROUP')) {
        console.log(`Consumer group '${groupName}' already exists.`);
      } else {
        throw err;
      }
    });
    console.log(`Consumer '${consumerName}' joined group '${groupName}' for stream '${streamKey}'.`);
    await consumeMessages();
  } catch (err) {
    console.error('Consumer group setup error:', err);
    redis.quit();
  }
}

async function consumeMessages() {
  try {
    while (true) {
      // Read new messages with XREADGROUP, blocking indefinitely
      // Use '>' for ID to get new messages never delivered to this group before
      const result = await redis.xreadgroup(
        'GROUP', groupName, consumerName,
        'COUNT', 1, // Read one message at a time
        'BLOCK', 0, // Block indefinitely
        'STREAMS', streamKey, '>'
      );

      if (result) {
        const streamData = result[0]; // Example: [['stream_name', [[id, [field, value, ...]], ...]]]
        const entries = streamData[1];

        for (const [id, fields] of entries) {
          const decodedFields = {};
          for (let i = 0; i < fields.length; i += 2) decodedFields[fields[i]] = fields[i + 1];
          console.log(`Consumer '${consumerName}' processing entry [${id}]: ${JSON.stringify(decodedFields)}`);
          
          // Simulate some processing time
          await new Promise(resolve => setTimeout(resolve, Math.random() * 500)); 

          // Acknowledge the message
          await redis.xack(streamKey, groupName, id);
          console.log(`Consumer '${consumerName}' acknowledged entry [${id}]`);
        }
      }
    }
  } catch (err) {
    console.error(`Error consuming messages for '${consumerName}':`, err);
  } finally {
    redis.quit();
  }
}

// To run:
// 1. Start producer: node stream_producer.js
// 2. Start multiple consumers in separate terminals:
//    node stream_consumer_group.js consumer_alpha
//    node stream_consumer_group.js consumer_beta
//    node stream_consumer_group.js consumer_gamma
// Observe how messages are distributed among consumers, and how they acknowledge.
// Try stopping a consumer (Ctrl+C) and restarting it.
// To inspect pending messages: redis-cli> XPENDING sensor_readings analytics_group
// You'll see messages that were delivered to a consumer but not acknowledged yet.
// To reclaim messages from a crashed consumer: XCLAIM key groupname newconsumer min-idle-time id1 [id2 ...]
// Example: XCLAIM sensor_readings analytics_group consumer_alpha 0 1678886400000-0
setupConsumerGroup();

5. Stream Management: Trimming and Pending Entries

Streams can grow indefinitely, consuming memory. XTRIM (or MAXLEN in XADD) is crucial for managing this.

XTRIM key MAXLEN [~] count or XTRIM key MINID [~] ID

  • MAXLEN: Limits the stream size to count elements.
  • MINID: Trims elements with IDs lower than ID.
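
As a sketch, trimming from application code might look like this with ioredis (MINID requires Redis 6.2 or newer):

// stream_trim.js
const Redis = require('ioredis');
const redis = new Redis();

async function trimStream() {
  // Keep roughly the newest 1000 entries; '~' trades precision for speed
  const removedByLen = await redis.xtrim('sensor_readings', 'MAXLEN', '~', 1000);
  console.log(`MAXLEN trim removed ${removedByLen} entries`);

  // Alternatively, drop everything older than a cutoff, using a timestamp-based ID
  const oneHourAgo = Date.now() - 60 * 60 * 1000;
  const removedByAge = await redis.xtrim('sensor_readings', 'MINID', '~', `${oneHourAgo}-0`);
  console.log(`MINID trim removed ${removedByAge} entries`);

  await redis.quit();
}

// trimStream();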

XPENDING helps in monitoring message processing within consumer groups. If a consumer crashes, its pending messages remain in the PEL. You can use XCLAIM to transfer these pending messages to another consumer for processing.
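
XCLAIM requires you to pass explicit entry IDs, which means pairing it with XPENDING. Since Redis 6.2, XAUTOCLAIM performs the scan-and-claim in a single step. Here is a sketch of a “janitor” process that rescues messages left pending for more than a minute (the janitor consumer name is illustrative):

// stream_janitor.js
const Redis = require('ioredis');
const redis = new Redis();

const streamKey = 'sensor_readings';
const groupName = 'analytics_group';

async function reclaimStale() {
  // XAUTOCLAIM key group consumer min-idle-time start [COUNT count]
  // Claims up to 10 entries that have been pending for over 60s,
  // reassigning them to the 'janitor' consumer
  const [nextCursor, claimed] = await redis.xautoclaim(
    streamKey, groupName, 'janitor', 60000, '0-0', 'COUNT', 10
  );
  for (const [id, fields] of claimed) {
    console.log(`Reclaimed entry ${id}:`, fields);
    // ... re-process here, then acknowledge so it leaves the PEL ...
    await redis.xack(streamKey, groupName, id);
  }
  console.log(`Next scan cursor: ${nextCursor}`);
  await redis.quit();
}

// reclaimStale();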

Node.js Example (Inspecting Pending Entries):

// stream_pending_inspector.js
const Redis = require('ioredis');
const redis = new Redis();

const streamKey = 'sensor_readings';
const groupName = 'analytics_group';

async function inspectPending() {
  try {
    console.log(`\n--- Inspecting Pending Entries for group '${groupName}' on stream '${streamKey}' ---`);

    // Basic XPENDING: provides summary for the group
    let pendingSummary = await redis.xpending(streamKey, groupName);
    console.log(`\nPending Summary for group '${groupName}':`);
    console.log(`Total pending messages: ${pendingSummary[0]}`);
    console.log(`First pending ID: ${pendingSummary[1]}`);
    console.log(`Last pending ID: ${pendingSummary[2]}`);
    console.log(`Consumers with pending messages: ${JSON.stringify(pendingSummary[3])}`);

    // Detailed XPENDING: lists specific pending messages
    console.log('\nDetailed Pending Entries:');
    // XPENDING key groupname start end count [consumername]
    let detailedPending = await redis.xpending(streamKey, groupName, '-', '+', 10); // Get first 10
    if (detailedPending.length === 0) {
      console.log('No detailed pending messages found.');
    } else {
      detailedPending.forEach(entry => {
        console.log(`  ID: ${entry[0]}, Consumer: ${entry[1]}, Idle Time: ${entry[2]}ms, Deliveries: ${entry[3]}`);
      });
    }

  } catch (err) {
    console.error('Error inspecting pending messages:', err);
  } finally {
    await redis.quit();
  }
}

// To use:
// 1. Run the stream_producer.js
// 2. Run at least one stream_consumer_group.js, let it process some messages, then kill it WITHOUT acknowledging.
// 3. Run: node stream_pending_inspector.js
inspectPending();

Full Python Example with Redis Streams and Consumer Groups

# full_streams_operations.py
import redis
import time
import threading
import random

# Global flag to signal threads to stop
stop_flag = threading.Event()

REDIS_HOST = 'localhost'
REDIS_PORT = 6379
REDIS_DB = 0

STREAM_KEY = 'iot_data_stream'
GROUP_NAME = 'data_processors'

def producer_thread_func():
    r = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, db=REDIS_DB)
    print("Producer: Starting to send IoT data...")
    try:
        device_id = 100
        while not stop_flag.is_set():
            temperature = round(random.uniform(20.0, 35.0), 2)
            pressure = round(random.uniform(900.0, 1100.0), 2)
            timestamp = int(time.time() * 1000)

            entry_id = r.xadd(
                STREAM_KEY,
                {'device_id': device_id, 'temperature': temperature, 'pressure': pressure, 'timestamp': timestamp},
                maxlen=5000, # Keep max 5000 entries
                approximate=True
            )
            print(f"Producer: Added entry {entry_id.decode('utf-8')} from device {device_id}")
            device_id += 1
            time.sleep(0.1) # Send data frequently
    except Exception as e:
        print(f"Producer error: {e}")
    finally:
        r.close()
        print("Producer: Shut down.")

def consumer_thread_func(consumer_name):
    r = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, db=REDIS_DB)
    try:
        # Create consumer group if it doesn't exist
        r.xgroup_create(STREAM_KEY, GROUP_NAME, mkstream=True)
        print(f"Consumer '{consumer_name}': Joined group '{GROUP_NAME}'.")
    except redis.exceptions.ResponseError as e:
        if "BUSYGROUP" in str(e):
            print(f"Consumer '{consumer_name}': Group '{GROUP_NAME}' already exists.")
        else:
            raise

    try:
        while not stop_flag.is_set():
            # XREADGROUP GROUP groupname consumername BLOCK milliseconds STREAMS key ID
            # '>' means read new messages never delivered to this group before
            messages = r.xreadgroup(
                GROUP_NAME,
                consumer_name,
                {STREAM_KEY: '>'}, # '>' = entries never delivered to any consumer in this group
                count=1,
                block=2000 # Block for 2 seconds if no new messages
            )

            if messages:
                for stream_name, entries in messages:
                    for entry_id, data_fields in entries:
                        decoded_data = {k.decode('utf-8'): v.decode('utf-8') for k, v in data_fields.items()}
                        print(f"Consumer '{consumer_name}': Processing entry {entry_id.decode('utf-8')}: {decoded_data}")
                        
                        # Simulate processing time
                        time.sleep(random.uniform(0.1, 0.5))

                        # Acknowledge the message
                        r.xack(STREAM_KEY, GROUP_NAME, entry_id)
                        print(f"Consumer '{consumer_name}': Acknowledged entry {entry_id.decode('utf-8')}")
            else:
                print(f"Consumer '{consumer_name}': No new messages in 2 seconds. Checking pending...")
                # Check for pending messages if nothing new
                pending = r.xpending(STREAM_KEY, GROUP_NAME)
                # redis-py returns a summary dict: {'pending': n, 'min': ..., 'max': ..., 'consumers': [...]}
                if pending and pending['pending'] > 0:
                    print(f"Consumer '{consumer_name}': Found {pending['pending']} pending messages. Attempting to claim oldest...")
                    # Claim the oldest message from other consumers that have been idle
                    # XAUTOCLAIM key groupname consumername minIdleTime startID [COUNT count]
                    claimed_messages = r.xautoclaim(STREAM_KEY, GROUP_NAME, consumer_name, 5000, '0-0', count=1)
                    if claimed_messages[1]: # claimed_messages[1] contains list of messages
                        for entry_id, data_fields in claimed_messages[1]:
                            decoded_data = {k.decode('utf-8'): v.decode('utf-8') for k, v in data_fields.items()}
                            print(f"Consumer '{consumer_name}': Claimed and processed {entry_id.decode('utf-8')}: {decoded_data}")
                            r.xack(STREAM_KEY, GROUP_NAME, entry_id)
                
    except Exception as e:
        print(f"Consumer '{consumer_name}' error: {e}")
    finally:
        r.close()
        print(f"Consumer '{consumer_name}': Shut down.")

def run_all_streams_examples():
    print("--- Running Redis Streams Examples ---")
    
    # Clean up previous stream and group for a fresh start
    r = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, db=REDIS_DB)
    try:
        r.delete(STREAM_KEY)
        # Attempt to delete group, ignore if it doesn't exist
        r.xgroup_destroy(STREAM_KEY, GROUP_NAME)
        print(f"Cleaned up stream '{STREAM_KEY}' and group '{GROUP_NAME}'.")
    except Exception as e:
        print(f"Cleanup initial error (expected if not exists): {e}")
    r.close()

    producer_thread = threading.Thread(target=producer_thread_func)
    consumer1_thread = threading.Thread(target=consumer_thread_func, args=('worker-1',))
    consumer2_thread = threading.Thread(target=consumer_thread_func, args=('worker-2',))

    producer_thread.start()
    consumer1_thread.start()
    consumer2_thread.start()

    # Let them run for a while
    time.sleep(15)

    # Signal stop to all threads
    stop_flag.set()

    # Wait for threads to finish
    producer_thread.join()
    consumer1_thread.join()
    consumer2_thread.join()

    print("--- All Redis Streams Examples Complete ---")

# run_all_streams_examples()

Exercises / Mini-Challenges

  1. Distributed Order Processing:

    • Create a stream orders:new.
    • A “front-end” publisher adds new order events (e.g., productId, userId, quantity, timestamp) to this stream. Use XADD with MAXLEN to keep, say, the last 10,000 orders.
    • Create a consumer group order_processors.
    • Run two or three “worker” consumers, each belonging to order_processors.
    • Each worker should XREADGROUP orders, simulate processing (e.g., print order details), and XACK them.
    • Challenge: Simulate a worker crashing. Use XPENDING to see its unacknowledged messages. How would you manually or automatically reclaim these messages for another worker? (Hint: XAUTOCLAIM).
  2. User Notification System:

    • Create a stream user:notifications.
    • Whenever a new notification needs to be sent (e.g., “new_follower”, “product_update”), XADD an entry with userId, type, message, timestamp.
    • Create multiple consumer groups, e.g., email_notifier_group, sms_notifier_group, push_notifier_group.
    • Each group has its own set of workers. The email_notifier_group workers only process email notifications, etc.
    • Challenge: How would you ensure each group processes all messages from the beginning, but within a group, messages are distributed? (Hint: XGROUP CREATE with 0 for all groups, and > for individual consumers in XREADGROUP).
  3. Basic Event Sourcing for a Shopping Cart:

    • Create a stream cart:events:<userId>. Each user has their own cart event stream.
    • When a user adds an item (item:add), removes an item (item:remove), or checks out (cart:checkout), XADD an event to their respective stream, including timestamp, eventType, itemId, quantity (for add/remove).
    • Create a consumer group cart_projectors that processes these events to maintain a “current state” of the shopping cart for each user in a Redis Hash (cart:current:<userId>).
    • Challenge: When a new cart_projector comes online, it should be able to rebuild the current state of all carts by re-reading the entire stream (XREADGROUP from 0) and projecting the events onto the hash. How would you coordinate which user’s stream each new projector processes efficiently without stepping on each other’s toes?

Redis Streams are a cornerstone for building modern, resilient, and scalable event-driven architectures. By mastering them, you equip yourself with the tools to handle complex real-time data challenges. In our next chapter, we’ll look into Redis Modules, which expand Redis’s capabilities even further.