Intermediate Topics: Publish/Subscribe (Pub/Sub)

Redis is not just a data store; it’s also a powerful message broker through its Publish/Subscribe (Pub/Sub) mechanism. Pub/Sub allows different parts of your application (or even entirely separate applications) to communicate in a decoupled, real-time fashion.

In Pub/Sub:

  • Publishers send messages to a channel.
  • Subscribers listen for messages on specific channels.
  • When a message is published to a channel, all subscribers to that channel immediately receive the message.

Key characteristics:

  • Decoupled communication: Publishers don’t know who the subscribers are, and subscribers don’t know who the publishers are. This promotes modular and scalable architectures.
  • Real-time: Messages are pushed to active subscribers as soon as they are published; subscribers do not poll.
  • Fire-and-forget: Redis Pub/Sub does not persist messages. A subscriber that is not connected when a message is published misses that message. (For persistent messaging, Redis Streams, which we’ll cover later, are a better fit.)
  • High performance: Messages are routed in memory and never written to disk, so throughput is high. The cost of each PUBLISH grows with the number of subscribers and patterns it has to match.
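
The delivery semantics listed above can be illustrated with a toy in-memory model. This is only a sketch, not how Redis is implemented and not a Redis client: the class name ToyBroker and its methods are hypothetical, chosen to show fan-out to current subscribers and the loss of messages published before anyone subscribes.

```python
from collections import defaultdict


class ToyBroker:
    """In-memory stand-in for Pub/Sub delivery semantics (not a Redis client)."""

    def __init__(self):
        # channel name -> list of subscriber inboxes (plain lists)
        self._subscribers = defaultdict(list)

    def subscribe(self, channel):
        """Register a new inbox on a channel and return it."""
        inbox = []
        self._subscribers[channel].append(inbox)
        return inbox

    def publish(self, channel, message):
        """Copy the message to every current subscriber; nothing is stored."""
        inboxes = self._subscribers.get(channel, [])
        for inbox in inboxes:
            inbox.append(message)
        return len(inboxes)  # number of subscribers reached


broker = ToyBroker()

# Nobody is listening yet, so this message is simply dropped.
missed = broker.publish("news_updates", "published too early")

alice = broker.subscribe("news_updates")
bob = broker.subscribe("news_updates")
delivered = broker.publish("news_updates", "hello subscribers")

print(missed, delivered)
print(alice, bob)
```

Neither alice nor bob ever sees the first message, which is the fire-and-forget behavior described above.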

In this chapter, we’ll cover:

  • Basic Pub/Sub operations using PUBLISH and SUBSCRIBE.
  • Pattern matching subscriptions with PSUBSCRIBE.
  • Practical examples in Node.js and Python for building real-time features.

Basic Pub/Sub: PUBLISH and SUBSCRIBE

1. SUBSCRIBE channel [channel ...]

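Subscribes the client to the specified channels. Once a connection is in subscribe mode, it receives messages and subscription confirmations, and under the default RESP2 protocol it may only issue a small set of commands (such as SUBSCRIBE, PSUBSCRIBE, UNSUBSCRIBE, PUNSUBSCRIBE, PING, and QUIT) until it has unsubscribed from everything. This is why subscribers typically use a dedicated connection.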

2. UNSUBSCRIBE [channel ...]

Unsubscribes the client from the specified channels, or from all channels if none are given.

3. PUBLISH channel message

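Publishes message to channel. Returns the number of clients that received the message, counting both channel and pattern subscribers. A return value of 0 is not an error; it means nobody was listening.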
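
Because PUBLISH reports how many clients received the message, a publisher can detect that nobody was listening. The following is a minimal sketch under stated assumptions: publish_with_check is a hypothetical helper, and client is assumed to expose publish(channel, message) returning an integer, as a redis-py redis.Redis instance does.

```python
def publish_with_check(client, channel, message):
    """Publish and report whether anyone was subscribed at that moment.

    `client` is assumed to expose publish(channel, message) -> int,
    as a redis-py `redis.Redis` instance does.
    """
    receivers = client.publish(channel, message)
    if receivers == 0:
        # Not an error: Pub/Sub simply had nobody to deliver to.
        print(f"No subscribers on '{channel}'; message was dropped.")
    return receivers


# Example wiring against a local server (assumed host and port):
# import redis
# r = redis.Redis(host="localhost", port=6379)
# publish_with_check(r, "news_updates", "hello")
```

If losing the message is unacceptable, the zero count is a hint that a durable structure (such as a Stream) may be a better fit than Pub/Sub.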

Node.js Example (Conceptual - requires two scripts):

Script 1: Subscriber (subscriber.js)

// subscriber.js
const Redis = require('ioredis');
// It's a good practice to use a separate Redis client instance for Pub/Sub
// as it enters a special "subscribe" mode and cannot perform other commands.
const subscriber = new Redis();

const channelName = 'news_updates';

subscriber.on('connect', () => {
  console.log(`Subscriber connected to Redis. Subscribing to '${channelName}'...`);
  subscriber.subscribe(channelName, (err, count) => {
    if (err) {
      console.error('Failed to subscribe:', err.message);
    } else {
      console.log(`Subscribed to ${count} channel(s).`);
    }
  });
});

// Event listener for messages
subscriber.on('message', (channel, message) => {
  console.log(`[${channel}] Received message: ${message}`);
  // Example: if message is 'quit', unsubscribe
  if (message === 'quit') {
    console.log('Received "quit" message. Unsubscribing...');
    subscriber.unsubscribe(channelName);
    subscriber.quit();
  }
});

subscriber.on('error', (err) => {
  console.error('Subscriber Redis error:', err);
});

subscriber.on('end', () => {
  console.log('Subscriber connection closed.');
});

// Run this script in one terminal: node subscriber.js

Script 2: Publisher (publisher.js)

// publisher.js
const Redis = require('ioredis');
const publisher = new Redis(); // Use a normal client for publishing

const channelName = 'news_updates';

async function publishMessages() {
  console.log('Publisher started. Sending messages...');
  let messageCount = 0;

  const intervalId = setInterval(async () => {
    const message = `Breaking News! Update #${messageCount++} at ${new Date().toLocaleTimeString()}`;
    const receivers = await publisher.publish(channelName, message);
    console.log(`Published: "${message}" to '${channelName}'. Received by ${receivers} subscribers.`);

    if (messageCount >= 5) {
      clearInterval(intervalId);
      // Send a 'quit' message to instruct subscribers to stop gracefully
      await publisher.publish(channelName, 'quit');
      console.log('Finished publishing. Sent "quit" message.');
      publisher.quit();
    }
  }, 2000); // Publish every 2 seconds
}

// Run this script in a separate terminal: node publisher.js
// Make sure subscriber.js is running first!
publishMessages();

Python Example (Conceptual - requires two scripts):

Script 1: Subscriber (subscriber.py)

# subscriber.py
import redis

# In redis-py, each PubSub object takes its own dedicated connection
# from the client's connection pool.
subscriber_r = redis.Redis(host='localhost', port=6379, db=0)

channel_name = 'stock_prices'

def subscriber_main():
    print(f"Python Subscriber connected. Subscribing to '{channel_name}'...")
    pubsub = subscriber_r.pubsub()
    pubsub.subscribe(channel_name)

    try:
        # listen() blocks until the next message arrives, so no sleep is needed
        for message in pubsub.listen():
            # Subscription confirmations have type 'subscribe'; skip them
            if message['type'] == 'message':
                channel = message['channel'].decode('utf-8')
                data = message['data'].decode('utf-8')
                print(f"[{channel}] Received message: {data}")
                if data == 'stop':
                    print("Received 'stop' message. Exiting subscriber.")
                    break
    except KeyboardInterrupt:
        print("\nSubscriber interrupted.")
    finally:
        pubsub.close()  # Unsubscribes and releases the Pub/Sub connection
        subscriber_r.close()
        print("Subscriber connection closed.")

# Run this script in one terminal: python subscriber.py
if __name__ == '__main__':
    subscriber_main()
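
The subscriber checks message['type'] because redis-py yields subscription confirmations as well as published messages. The sketch below shows one way to tell them apart. The function name describe is hypothetical, and the sample dictionaries are hand-written to mirror the keys redis-py uses (type, pattern, channel, data); they are not captured from a live server.

```python
def describe(message):
    """Return a readable summary of one dict yielded by pubsub.listen()."""
    kind = message["type"]
    channel = message["channel"]
    if isinstance(channel, bytes):  # bytes unless decode_responses=True
        channel = channel.decode("utf-8")

    if kind in ("subscribe", "psubscribe", "unsubscribe", "punsubscribe"):
        # Confirmation: data holds the connection's current subscription count
        return f"{kind} confirmed for {channel} (active subscriptions: {message['data']})"

    if kind in ("message", "pmessage"):
        data = message["data"]
        if isinstance(data, bytes):
            data = data.decode("utf-8")
        return f"{channel}: {data}"

    return f"unhandled message type: {kind}"


# Sample dicts shaped like the ones redis-py produces
confirmation = {"type": "subscribe", "pattern": None, "channel": b"stock_prices", "data": 1}
payload = {"type": "message", "pattern": None, "channel": b"stock_prices", "data": b"AAPL:187.5"}

print(describe(confirmation))
print(describe(payload))
```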

Script 2: Publisher (publisher.py)

# publisher.py
import redis
import time
import random

publisher_r = redis.Redis(host='localhost', port=6379, db=0) # Use a normal client

channel_name = 'stock_prices'
stocks = ['AAPL', 'GOOG', 'MSFT', 'AMZN']

def publisher_main():
    print("Python Publisher started. Sending stock price updates...")
    message_count = 0

    try:
        while message_count < 10: # Publish 10 messages
            stock_symbol = random.choice(stocks)
            price = round(random.uniform(100.0, 1000.0), 2)
            message = f"{stock_symbol}:{price}"
            
            receivers = publisher_r.publish(channel_name, message)
            print(f"Published: '{message}' to '{channel_name}'. Received by {receivers} subscribers.")
            
            message_count += 1
            time.sleep(1) # Publish every 1 second
            
        # Send a 'stop' message to instruct subscribers to stop gracefully
        publisher_r.publish(channel_name, 'stop')
        print("Finished publishing. Sent 'stop' message.")

    except KeyboardInterrupt:
        print("\nPublisher interrupted.")
    finally:
        publisher_r.close()

# Run this script in a separate terminal: python publisher.py
# Make sure subscriber.py is running first!
if __name__ == '__main__':
    publisher_main()

Pattern Matching Subscriptions: PSUBSCRIBE

Instead of subscribing to exact channel names, you can subscribe to glob-style patterns. This is useful when you have many related channels (e.g., user:1:feed, user:2:feed) and want a single subscriber to listen to all of them, or to a subset.

  • PSUBSCRIBE pattern [pattern ...]: Subscribes the client to the given patterns.

    • * (asterisk) matches any sequence of zero or more characters, including the ':' separator.
    • ? (question mark) matches any single character.
    • [abc] (brackets) matches any one of the listed characters.
  • PUNSUBSCRIBE [pattern ...]: Unsubscribes from the given patterns.

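When a message is received via PSUBSCRIBE, the handler gets three values: the pattern that matched, the channel the message was actually published to, and the message itself. A client subscribed to both a channel and a pattern that matches it receives the same message twice, once for each subscription.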
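
To get a feel for which channels a pattern covers, the matching can be approximated locally. This sketch uses Python's fnmatch module as a stand-in for Redis's own matcher, which is an assumption that holds only for patterns built from * and ?; bracket negation and escaping follow different rules in the two implementations. The helper name matches is hypothetical.

```python
from fnmatch import fnmatchcase


def matches(pattern, channel):
    """Approximate Redis glob matching for patterns that use only * and ?."""
    return fnmatchcase(channel, pattern)


checks = [
    ("app:notifications:*", "app:notifications:admin"),
    ("app:notifications:*", "app:notifications:user:123"),  # * also spans ':'
    ("app:notifications:*", "other:channel"),
    ("user:?:feed", "user:1:feed"),
    ("user:?:feed", "user:42:feed"),  # ? matches exactly one character
]
for pattern, channel in checks:
    print(f"{pattern!r} vs {channel!r}: {matches(pattern, channel)}")
```

Checking patterns this way before deploying can catch cases where a wildcard is broader or narrower than intended, such as user:?:feed silently ignoring two-digit user IDs.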

Node.js Example:

// subscriber-pattern.js
const Redis = require('ioredis');
const subscriber = new Redis();

const channelPattern = 'app:notifications:*'; // Matches 'app:notifications:user1', 'app:notifications:admin' etc.

subscriber.on('connect', () => {
  console.log(`Subscriber connected. Subscribing to pattern '${channelPattern}'...`);
  subscriber.psubscribe(channelPattern, (err, count) => {
    if (err) {
      console.error('Failed to psubscribe:', err.message);
    } else {
      console.log(`Subscribed to ${count} pattern(s).`);
    }
  });
});

subscriber.on('pmessage', (pattern, channel, message) => {
  console.log(`[Pattern: ${pattern}] [Channel: ${channel}] Received message: ${message}`);
  if (message === 'shutdown') {
    console.log('Received "shutdown" message. Punsubscribing...');
    subscriber.punsubscribe(channelPattern);
    subscriber.quit();
  }
});

subscriber.on('error', (err) => {
  console.error('Subscriber Redis error:', err);
});

// Run this script in one terminal: node subscriber-pattern.js

// publisher-pattern.js
const Redis = require('ioredis');
const publisher = new Redis();

async function publishPatternMessages() {
  console.log('Publisher started. Sending patterned messages...');
  
  await publisher.publish('app:notifications:user:123', 'Hello User 123, welcome!');
  console.log("Published to 'app:notifications:user:123'");
  await publisher.publish('app:notifications:admin', 'Admin alert: Server usage high!');
  console.log("Published to 'app:notifications:admin'");
  await publisher.publish('app:notifications:user:456', 'Your subscription is expiring soon.');
  console.log("Published to 'app:notifications:user:456'");
  
  // A channel not matching the pattern won't be received
  await publisher.publish('other:channel', 'This message will not be seen by pattern subscriber.');
  console.log("Published to 'other:channel' (not matching pattern)");

  await new Promise(resolve => setTimeout(resolve, 1000)); // Give time for messages to process
  await publisher.publish('app:notifications:general', 'shutdown'); // Tell subscriber to shutdown
  publisher.quit();
  console.log('Publisher finished.');
}

// Run this script in a separate terminal: node publisher-pattern.js
// Make sure subscriber-pattern.js is running first!
publishPatternMessages();

Full Python Example with Pub/Sub

# full_pubsub_operations.py
import redis
import time
import threading
import random

# Global flag to signal the subscriber to stop
stop_subscriber = threading.Event()

def run_subscriber_thread(channel_to_subscribe, pattern_to_psubscribe):
    # Use a separate Redis connection for Pub/Sub
    sub_r = redis.Redis(host='localhost', port=6379, db=0)
    pubsub = sub_r.pubsub()

    if channel_to_subscribe:
        print(f"Subscriber: Subscribing to channel '{channel_to_subscribe}'")
        pubsub.subscribe(channel_to_subscribe)
    if pattern_to_psubscribe:
        print(f"Subscriber: Subscribing to pattern '{pattern_to_psubscribe}'")
        pubsub.psubscribe(pattern_to_psubscribe)

    print("Subscriber: Waiting for messages...")
    try:
        # Poll with a timeout so the stop flag is checked even when no
        # messages arrive (listen() would block until the next message)
        while not stop_subscriber.is_set():
            message = pubsub.get_message(timeout=0.5)
            if message is None:
                continue # Timed out with nothing to read

            if message['type'] == 'message':
                channel = message['channel'].decode('utf-8')
                data = message['data'].decode('utf-8')
                print(f"Subscriber [CHANNEL: {channel}] received: {data}")
            elif message['type'] == 'pmessage':
                pattern = message['pattern'].decode('utf-8')
                channel = message['channel'].decode('utf-8')
                data = message['data'].decode('utf-8')
                print(f"Subscriber [PATTERN: {pattern}] [CHANNEL: {channel}] received: {data}")

        print("Subscriber: Stop signal received, unsubscribing...")

    except KeyboardInterrupt:
        print("\nSubscriber: Keyboard interrupt, stopping.")
    finally:
        pubsub.close() # Unsubscribes from everything and releases the connection
        sub_r.close()
        print("Subscriber: Connection closed.")

def run_publisher_thread():
    # Use a regular Redis connection for publishing
    pub_r = redis.Redis(host='localhost', port=6379, db=0)
    
    print("Publisher: Starting to publish messages...")
    try:
        # Publish to exact channel
        pub_r.publish('app:system:events', 'Server starting up...')
        print("Publisher: Published to 'app:system:events'")
        time.sleep(0.5)

        # Publish to a channel that matches a pattern
        pub_r.publish('app:user:1:notification', 'Welcome new user!')
        print("Publisher: Published to 'app:user:1:notification'")
        time.sleep(0.5)

        pub_r.publish('app:user:2:notification', 'Your order #123 is shipped!')
        print("Publisher: Published to 'app:user:2:notification'")
        time.sleep(0.5)
        
        # This channel matches neither 'app:system:events' nor the
        # 'app:*:notification' pattern, so the subscriber never sees it
        pub_r.publish('app:general:info', f"System load is {random.randint(40, 90)}%")
        print("Publisher: Published to 'app:general:info'")
        time.sleep(0.5)

        # Announce shutdown on each channel. The subscriber itself stops
        # once stop_subscriber is set in the finally block below.
        pub_r.publish('app:system:events', 'stop_signal')
        pub_r.publish('app:user:1:notification', 'stop_signal')
        pub_r.publish('app:user:2:notification', 'stop_signal')
        pub_r.publish('app:general:info', 'stop_signal') # Not delivered: no matching subscription
        
    except KeyboardInterrupt:
        print("\nPublisher: Keyboard interrupt, stopping.")
    finally:
        pub_r.close()
        print("Publisher: Connection closed.")
        stop_subscriber.set() # Signal the subscriber thread to stop

def run_all_pubsub_examples():
    print("--- Running Pub/Sub Examples ---")

    # Start the subscriber thread in the background
    # This subscriber will listen to 'app:system:events' channel
    # AND to 'app:*:notification' pattern
    subscriber_thread = threading.Thread(
        target=run_subscriber_thread,
        args=('app:system:events', 'app:*:notification',)
    )
    subscriber_thread.start()

    time.sleep(1) # Give subscriber time to connect and subscribe

    # Start the publisher thread
    publisher_thread = threading.Thread(target=run_publisher_thread)
    publisher_thread.start()

    # Wait for both threads to finish
    publisher_thread.join()
    subscriber_thread.join()

    print("--- Pub/Sub Examples Complete ---")

# To run this, uncomment the line below and execute:
# python full_pubsub_operations.py
# run_all_pubsub_examples()

Exercises / Mini-Challenges

  1. Real-time Stock Update Notifier:

    • Create a publisher that randomly picks a stock symbol (e.g., AAPL, GOOG, MSFT) and generates a random price.
    • It should publish this price to a channel named stock:<SYMBOL> (e.g., stock:AAPL).
    • Create a subscriber that uses PSUBSCRIBE to listen to stock:*.
    • Whenever a price update comes in, the subscriber should print which stock changed and its new price.
    • Challenge: Modify the subscriber to also maintain the last known price for each stock in a Redis Hash. So when a message comes in, it updates the hash and prints both the old and new price.
  2. User Activity Feed:

    • When a user performs an action (e.g., likes_post, comments_on_photo, follows_user), a publisher sends a message to a channel like user_activity:<ACTING_USER_ID>. The message could be a JSON string describing the event.
    • Create a subscriber that PSUBSCRIBEs to user_activity:*.
    • For each message, parse the JSON and store the activity in a Redis List user_feed:<TARGET_USER_ID> for the user who is meant to see this activity (e.g., if A comments on B’s photo, B’s feed gets updated). Use LPUSH and LTRIM to keep the feed to, say, the last 100 items.
    • Challenge: How would you handle a scenario where so many users are active that a single subscriber gets overloaded? (This is a design question rather than a Redis command exercise; think about sharding channels across dedicated subscribers.)
  3. Chat Room Broadcast:

    • Implement a simple chat room using Redis Pub/Sub.
    • Have multiple “clients” (separate scripts) that can PUBLISH messages to a common channel (e.g., chat:general).
    • Each client also SUBSCRIBEs to this chat:general channel to receive messages from others.
    • Messages should include the sender’s username.
    • Challenge: How would you implement a “private message” feature where one user sends a message only to another specific user using Pub/Sub? (Hint: user:private:<receiver_id> channel).
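
As a starting point for the feed-capping step in exercise 2, the LPUSH plus LTRIM combination might look like the sketch below. It is one possible shape, not a reference solution: store_activity, FEED_LIMIT, and the JSON field target_user_id are hypothetical names, and client is assumed to provide lpush and ltrim with Redis semantics, as a redis-py redis.Redis instance does.

```python
import json

FEED_LIMIT = 100  # keep only the newest 100 activities per user


def store_activity(client, raw_message):
    """Parse one activity event and push it onto the target user's feed.

    `client` is assumed to provide lpush(key, value) and ltrim(key, start, end)
    with Redis semantics. The field name `target_user_id` is a hypothetical
    choice for the JSON payload.
    """
    event = json.loads(raw_message)
    feed_key = f"user_feed:{event['target_user_id']}"
    client.lpush(feed_key, raw_message)        # newest item goes to the head
    client.ltrim(feed_key, 0, FEED_LIMIT - 1)  # keep indexes 0..99 only
    return feed_key
```

The two calls are not atomic as written; wrapping them in a transaction or pipeline is a reasonable refinement once the basic flow works.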

Pub/Sub is a fundamental building block for real-time applications and event-driven architectures. By mastering it, you’re well on your way to building responsive and interactive systems. Next, we’ll shift our focus to Persistence and Data Durability, learning how Redis ensures your data isn’t lost when the server restarts.