Redis is not just a data store; it’s also a powerful message broker through its Publish/Subscribe (Pub/Sub) mechanism. Pub/Sub allows different parts of your application (or even entirely separate applications) to communicate in a decoupled, real-time fashion.
In Pub/Sub:
- Publishers send messages to a channel.
- Subscribers listen for messages on specific channels.
- When a message is published to a channel, all subscribers to that channel immediately receive the message.
Key characteristics:
- Decoupled communication: Publishers don’t know who the subscribers are, and subscribers don’t know who the publishers are. This promotes modular and scalable architectures.
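- Real-time: Messages are pushed to active subscribers as soon as they are published; subscribers do not poll.
- Fire-and-forget: Redis Pub/Sub does not persist messages. If a subscriber is not connected when a message is published, it misses that message. (For persistent messaging, Redis Streams, which we’ll cover later, are a better fit.)
- High performance: PUBLISH runs in O(N+M) time, where N is the number of clients subscribed to the channel and M is the total number of subscribed patterns, so a single Redis instance can push a very large volume of messages per second.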
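The fan-out and fire-and-forget behavior described above can be sketched with a toy in-memory broker. This is only a model for building intuition, not Redis itself: the class `InMemoryBroker` and its plain-list "inboxes" are hypothetical names introduced for illustration, and no network or server is involved.

```python
from collections import defaultdict


class InMemoryBroker:
    """Toy stand-in for a Pub/Sub broker. It is not Redis and keeps no history."""

    def __init__(self):
        # channel name -> list of subscriber inboxes (plain Python lists)
        self._subscribers = defaultdict(list)

    def subscribe(self, channel):
        """Register a new subscriber and return its inbox."""
        inbox = []
        self._subscribers[channel].append(inbox)
        return inbox

    def publish(self, channel, message):
        """Deliver to current subscribers only; return how many received it."""
        inboxes = self._subscribers.get(channel, [])
        for inbox in inboxes:
            inbox.append(message)
        return len(inboxes)


broker = InMemoryBroker()
lost = broker.publish("news", "nobody is listening yet")  # dropped: 0 receivers
early = broker.subscribe("news")
first_count = broker.publish("news", "first")             # 1 receiver
late = broker.subscribe("news")
second_count = broker.publish("news", "second")           # 2 receivers

print(lost, first_count, second_count)  # 0 1 2
print(early)  # ['first', 'second']
print(late)   # ['second']
```

The late subscriber never sees "first", and the very first message is lost entirely because nothing is stored for later delivery. That is the trade-off Redis Pub/Sub makes as well.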
In this chapter, we’ll cover:
- Basic Pub/Sub operations using PUBLISH and SUBSCRIBE.
- Pattern matching subscriptions with PSUBSCRIBE.
- Practical examples in Node.js and Python for building real-time features.
Basic Pub/Sub: PUBLISH and SUBSCRIBE
1. SUBSCRIBE channel [channel ...]
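Subscribes the client to the specified channels. Once a connection is in subscribe mode (under the default RESP2 protocol), it receives only messages and subscription confirmations, and it can issue only a small set of subscription-related commands such as SUBSCRIBE, PSUBSCRIBE, UNSUBSCRIBE, PUNSUBSCRIBE, PING, and QUIT. Any other Redis command is rejected until the connection has unsubscribed from every channel and pattern.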
2. UNSUBSCRIBE [channel ...]
Unsubscribes the client from the specified channels.
3. PUBLISH channel message
Publishes message to channel. Returns the number of clients that received the message.
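Because PUBLISH returns the receiver count, a publisher can notice when a message went to nobody. The sketch below assumes a client object with a redis-py style `publish(channel, message)` method that returns an integer. The helper `publish_with_count` and the `FakeClient` stand-in are hypothetical names used so the example runs without a Redis server.

```python
def publish_with_count(client, channel, message):
    """Publish a message and warn when no subscriber received it.

    `client` is assumed to expose publish(channel, message) -> int,
    as redis-py's Redis client does.
    """
    receivers = client.publish(channel, message)
    if receivers == 0:
        print(f"warning: no subscribers on {channel!r}; message was dropped")
    return receivers


class FakeClient:
    """Hypothetical stand-in that reports a fixed subscriber count per channel."""

    def __init__(self, subscriber_counts):
        self._counts = subscriber_counts

    def publish(self, channel, message):
        return self._counts.get(channel, 0)


fake = FakeClient({"news_updates": 2})
delivered = publish_with_count(fake, "news_updates", "Breaking News!")
dropped = publish_with_count(fake, "empty_channel", "anyone there?")
print(delivered, dropped)  # 2 0
```

With a real connection you would pass `redis.Redis(...)` instead of `FakeClient`. A count of zero is not an error in Redis; it only tells you that the message was not delivered to anyone.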
Node.js Example (Conceptual - requires two scripts):
Script 1: Subscriber (subscriber.js)
// subscriber.js
const Redis = require('ioredis');

// It's a good practice to use a separate Redis client instance for Pub/Sub
// as it enters a special "subscribe" mode and cannot perform other commands.
const subscriber = new Redis();
const channelName = 'news_updates';

subscriber.on('connect', () => {
  console.log(`Subscriber connected to Redis. Subscribing to '${channelName}'...`);
  subscriber.subscribe(channelName, (err, count) => {
    if (err) {
      console.error('Failed to subscribe:', err.message);
    } else {
      console.log(`Subscribed to ${count} channel(s).`);
    }
  });
});

// Event listener for messages
subscriber.on('message', (channel, message) => {
  console.log(`[${channel}] Received message: ${message}`);
  // Example: if message is 'quit', unsubscribe
  if (message === 'quit') {
    console.log('Received "quit" message. Unsubscribing...');
    subscriber.unsubscribe(channelName);
    subscriber.quit();
  }
});

subscriber.on('error', (err) => {
  console.error('Subscriber Redis error:', err);
});

subscriber.on('end', () => {
  console.log('Subscriber connection closed.');
});

// Run this script in one terminal: node subscriber.js
Script 2: Publisher (publisher.js)
// publisher.js
const Redis = require('ioredis');

const publisher = new Redis(); // Use a normal client for publishing
const channelName = 'news_updates';

async function publishMessages() {
  console.log('Publisher started. Sending messages...');
  let messageCount = 0;
  const intervalId = setInterval(async () => {
    const message = `Breaking News! Update #${messageCount++} at ${new Date().toLocaleTimeString()}`;
    const receivers = await publisher.publish(channelName, message);
    console.log(`Published: "${message}" to '${channelName}'. Received by ${receivers} subscribers.`);
    if (messageCount >= 5) {
      clearInterval(intervalId);
      // Send a 'quit' message to instruct subscribers to stop gracefully
      await publisher.publish(channelName, 'quit');
      console.log('Finished publishing. Sent "quit" message.');
      publisher.quit();
    }
  }, 2000); // Publish every 2 seconds
}

// Run this script in a separate terminal: node publisher.js
// Make sure subscriber.js is running first!
// publishMessages();
Python Example (Conceptual - requires two scripts):
Script 1: Subscriber (subscriber.py)
# subscriber.py
import redis

# Use a separate Redis client instance for Pub/Sub
subscriber_r = redis.Redis(host='localhost', port=6379, db=0)
channel_name = 'stock_prices'


def subscriber_main():
    print(f"Python Subscriber connected. Subscribing to '{channel_name}'...")
    pubsub = subscriber_r.pubsub()
    pubsub.subscribe(channel_name)
    try:
        # listen() blocks until the next message arrives, so no sleep is needed
        for message in pubsub.listen():
            # Skip subscribe/unsubscribe confirmations
            if message['type'] == 'message':
                channel = message['channel'].decode('utf-8')
                data = message['data'].decode('utf-8')
                print(f"[{channel}] Received message: {data}")
                if data == 'stop':
                    print("Received 'stop' message. Exiting subscriber.")
                    break
    except KeyboardInterrupt:
        print("\nSubscriber interrupted.")
    finally:
        pubsub.unsubscribe(channel_name)
        subscriber_r.close()
        print("Subscriber connection closed.")


# Run this script in one terminal: python subscriber.py
# subscriber_main()
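The subscriber above filters on `message['type'] == 'message'` because `listen()` also yields the subscription confirmations. The sketch below formats the dictionaries redis-py produces when `decode_responses` is left at its default, so channel names and payloads are bytes. The helper name `describe` and the sample dictionaries are illustrative; the samples are written by hand rather than captured from a server.

```python
def describe(message):
    """Turn one redis-py pub/sub message dict into a readable line."""
    kind = message["type"]
    channel = message["channel"]
    if isinstance(channel, bytes):
        channel = channel.decode("utf-8")
    data = message["data"]

    if kind in ("subscribe", "unsubscribe", "psubscribe", "punsubscribe"):
        # For confirmations, data is the number of subscriptions still active.
        return f"{kind} confirmed for {channel!r}; active subscriptions: {data}"

    if isinstance(data, bytes):
        data = data.decode("utf-8")
    return f"[{channel}] {data}"


# Hand-written samples in the shape redis-py uses for a plain channel.
samples = [
    {"type": "subscribe", "pattern": None, "channel": b"stock_prices", "data": 1},
    {"type": "message", "pattern": None, "channel": b"stock_prices", "data": b"AAPL:187.25"},
    {"type": "unsubscribe", "pattern": None, "channel": b"stock_prices", "data": 0},
]
lines = [describe(m) for m in samples]
for line in lines:
    print(line)
```

If you would rather not see confirmations at all, redis-py accepts `ignore_subscribe_messages=True` when creating the `pubsub()` object.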
Script 2: Publisher (publisher.py)
# publisher.py
import redis
import time
import random

publisher_r = redis.Redis(host='localhost', port=6379, db=0)  # Use a normal client
channel_name = 'stock_prices'
stocks = ['AAPL', 'GOOG', 'MSFT', 'AMZN']


def publisher_main():
    print("Python Publisher started. Sending stock price updates...")
    message_count = 0
    try:
        while message_count < 10:  # Publish 10 messages
            stock_symbol = random.choice(stocks)
            price = round(random.uniform(100.0, 1000.0), 2)
            message = f"{stock_symbol}:{price}"
            receivers = publisher_r.publish(channel_name, message)
            print(f"Published: '{message}' to '{channel_name}'. Received by {receivers} subscribers.")
            message_count += 1
            time.sleep(1)  # Publish every 1 second
        # Send a 'stop' message to instruct subscribers to stop gracefully
        publisher_r.publish(channel_name, 'stop')
        print("Finished publishing. Sent 'stop' message.")
    except KeyboardInterrupt:
        print("\nPublisher interrupted.")
    finally:
        publisher_r.close()


# Run this script in a separate terminal: python publisher.py
# Make sure subscriber.py is running first!
# publisher_main()
Pattern Matching Subscriptions: PSUBSCRIBE
Instead of subscribing to exact channel names, you can subscribe to patterns. This is incredibly powerful for scenarios where you have many related channels (e.g., user:1:feed, user:2:feed) and want a single subscriber to listen to all of them, or a subset.
PSUBSCRIBE pattern [pattern ...]: Subscribes the client to the given patterns.
- * (asterisk) matches any sequence of zero or more characters.
- ? (question mark) matches any single character.
PUNSUBSCRIBE [pattern ...]: Unsubscribes from the given patterns.
When a message is received via PSUBSCRIBE, the handler will get (pattern, channel, message).
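To get a feel for which channels a pattern would catch, you can experiment locally. The sketch below uses Python's `fnmatch.fnmatchcase` as an approximation of Redis glob-style matching. The two agree for `*` and `?`, but they differ in other details (for example, Redis negates a character class with `[^...]` while `fnmatch` uses `[!...]`, and Redis supports backslash escapes), so treat this as a rough local check, not as the matcher Redis itself uses. The helper name `matching_patterns` is hypothetical.

```python
from fnmatch import fnmatchcase


def matching_patterns(channel, patterns):
    """Return the patterns (using * and ? only) that match a channel name."""
    return [p for p in patterns if fnmatchcase(channel, p)]


patterns = ["app:notifications:*", "app:*:notification", "user:?:feed"]
channels = [
    "app:notifications:user:123",
    "app:user:1:notification",
    "user:7:feed",
    "user:42:feed",   # '?' matches exactly one character, so this has no match
    "other:channel",
]

results = {channel: matching_patterns(channel, patterns) for channel in channels}
for channel, matched in results.items():
    print(f"{channel} -> {matched}")
```

Note that `*` also matches the `:` separator, so `app:notifications:*` catches `app:notifications:user:123` as well as `app:notifications:admin`. Also keep in mind that a client subscribed both to a channel and to a pattern matching that channel receives each message twice: once as a regular message and once as a pattern message.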
Node.js Example:
// subscriber-pattern.js
const Redis = require('ioredis');

const subscriber = new Redis();
const channelPattern = 'app:notifications:*'; // Matches 'app:notifications:user1', 'app:notifications:admin' etc.

subscriber.on('connect', () => {
  console.log(`Subscriber connected. Subscribing to pattern '${channelPattern}'...`);
  subscriber.psubscribe(channelPattern, (err, count) => {
    if (err) {
      console.error('Failed to psubscribe:', err.message);
    } else {
      console.log(`Subscribed to ${count} pattern(s).`);
    }
  });
});

subscriber.on('pmessage', (pattern, channel, message) => {
  console.log(`[Pattern: ${pattern}] [Channel: ${channel}] Received message: ${message}`);
  if (message === 'shutdown') {
    console.log('Received "shutdown" message. Punsubscribing...');
    subscriber.punsubscribe(channelPattern);
    subscriber.quit();
  }
});

subscriber.on('error', (err) => {
  console.error('Subscriber Redis error:', err);
});

// Run this script in one terminal: node subscriber-pattern.js

// publisher-pattern.js
const Redis = require('ioredis');

const publisher = new Redis();

async function publishPatternMessages() {
  console.log('Publisher started. Sending patterned messages...');
  await publisher.publish('app:notifications:user:123', 'Hello User 123, welcome!');
  console.log("Published to 'app:notifications:user:123'");
  await publisher.publish('app:notifications:admin', 'Admin alert: Server usage high!');
  console.log("Published to 'app:notifications:admin'");
  await publisher.publish('app:notifications:user:456', 'Your subscription is expiring soon.');
  console.log("Published to 'app:notifications:user:456'");
  // A channel not matching the pattern won't be received
  await publisher.publish('other:channel', 'This message will not be seen by pattern subscriber.');
  console.log("Published to 'other:channel' (not matching pattern)");
  await new Promise(resolve => setTimeout(resolve, 1000)); // Give time for messages to process
  await publisher.publish('app:notifications:general', 'shutdown'); // Tell subscriber to shutdown
  publisher.quit();
  console.log('Publisher finished.');
}

// Run this script in a separate terminal: node publisher-pattern.js
// Make sure subscriber-pattern.js is running first!
// publishPatternMessages();
Full Python Example with Pub/Sub
# full_pubsub_operations.py
import redis
import time
import threading
import random

# Global flag to signal the subscriber to stop
stop_subscriber = threading.Event()


def run_subscriber_thread(channel_to_subscribe, pattern_to_psubscribe):
    # Use a separate Redis connection for Pub/Sub
    sub_r = redis.Redis(host='localhost', port=6379, db=0)
    pubsub = sub_r.pubsub()
    if channel_to_subscribe:
        print(f"Subscriber: Subscribing to channel '{channel_to_subscribe}'")
        pubsub.subscribe(channel_to_subscribe)
    if pattern_to_psubscribe:
        print(f"Subscriber: Subscribing to pattern '{pattern_to_psubscribe}'")
        pubsub.psubscribe(pattern_to_psubscribe)
    print("Subscriber: Waiting for messages...")
    try:
        # Poll with a timeout so the stop flag is checked even when no message
        # arrives; a plain listen() loop would block until the next message.
        while not stop_subscriber.is_set():
            message = pubsub.get_message(timeout=1.0)
            if message is None:
                continue  # Timed out with nothing to read
            if message['type'] == 'message':
                channel = message['channel'].decode('utf-8')
                data = message['data'].decode('utf-8')
                print(f"Subscriber [CHANNEL: {channel}] received: {data}")
            elif message['type'] == 'pmessage':
                pattern = message['pattern'].decode('utf-8')
                channel = message['channel'].decode('utf-8')
                data = message['data'].decode('utf-8')
                print(f"Subscriber [PATTERN: {pattern}] [CHANNEL: {channel}] received: {data}")
        print("Subscriber: Stop signal received, unsubscribing...")
    except KeyboardInterrupt:
        print("\nSubscriber: Keyboard interrupt, stopping.")
    finally:
        pubsub.unsubscribe()   # Unsubscribe from all channels
        pubsub.punsubscribe()  # Unsubscribe from all patterns
        sub_r.close()
        print("Subscriber: Connection closed.")


def run_publisher_thread():
    # Use a regular Redis connection for publishing
    pub_r = redis.Redis(host='localhost', port=6379, db=0)
    print("Publisher: Starting to publish messages...")
    try:
        # Publish to exact channel
        pub_r.publish('app:system:events', 'Server starting up...')
        print("Publisher: Published to 'app:system:events'")
        time.sleep(0.5)
        # Publish to channels that match the 'app:*:notification' pattern
        pub_r.publish('app:user:1:notification', 'Welcome new user!')
        print("Publisher: Published to 'app:user:1:notification'")
        time.sleep(0.5)
        pub_r.publish('app:user:2:notification', 'Your order #123 is shipped!')
        print("Publisher: Published to 'app:user:2:notification'")
        time.sleep(0.5)
        # This channel matches neither subscription, so the subscriber
        # in this demo will not receive the message
        pub_r.publish('app:general:info', f"System load is {random.randint(40, 90)}%")
        print("Publisher: Published to 'app:general:info'")
        time.sleep(0.5)
        # Publish a final 'stop_signal' message on every channel. These are
        # informational only; the subscriber actually stops when the
        # stop_subscriber event is set below.
        pub_r.publish('app:system:events', 'stop_signal')
        pub_r.publish('app:user:1:notification', 'stop_signal')
        pub_r.publish('app:user:2:notification', 'stop_signal')
        pub_r.publish('app:general:info', 'stop_signal')
        time.sleep(0.5)  # Give the subscriber time to read the last messages
    except KeyboardInterrupt:
        print("\nPublisher: Keyboard interrupt, stopping.")
    finally:
        pub_r.close()
        print("Publisher: Connection closed.")
        stop_subscriber.set()  # Signal the subscriber thread to stop


def run_all_pubsub_examples():
    print("--- Running Pub/Sub Examples ---")
    # Start the subscriber thread in the background
    # This subscriber will listen to 'app:system:events' channel
    # AND to 'app:*:notification' pattern
    subscriber_thread = threading.Thread(
        target=run_subscriber_thread,
        args=('app:system:events', 'app:*:notification',)
    )
    subscriber_thread.start()
    time.sleep(1)  # Give subscriber time to connect and subscribe
    # Start the publisher thread
    publisher_thread = threading.Thread(target=run_publisher_thread)
    publisher_thread.start()
    # Wait for both threads to finish
    publisher_thread.join()
    subscriber_thread.join()
    print("--- Pub/Sub Examples Complete ---")


# To run this, uncomment the line below and execute:
# python full_pubsub_operations.py
# run_all_pubsub_examples()
Exercises / Mini-Challenges
Real-time Stock Update Notifier:
- Create a publisher that randomly picks a stock symbol (e.g., AAPL, GOOG, MSFT) and generates a random price.
- It should publish this price to a channel named stock:<SYMBOL> (e.g., stock:AAPL).
- Create a subscriber that uses PSUBSCRIBE to listen to stock:*.
- Whenever a price update comes in, the subscriber should print which stock changed and its new price.
- Challenge: Modify the subscriber to also maintain the last known price for each stock in a Redis Hash. So when a message comes in, it updates the hash and prints both the old and new price.
User Activity Feed:
- When a user performs an action (e.g., likes_post, comments_on_photo, follows_user), a publisher sends a message to a channel like user_activity:<ACTING_USER_ID>. The message could be a JSON string describing the event.
- Create a subscriber that PSUBSCRIBEs to user_activity:*.
- For each message, parse the JSON and store the activity in a Redis List user_feed:<TARGET_USER_ID> for the user who is meant to see this activity (e.g., if A comments on B’s photo, B’s feed gets updated). Use LPUSH and LTRIM to keep the feed to, say, the last 100 items.
- Challenge: How would you handle a scenario where many users are active, and a single subscriber gets overloaded? (This is a design question rather than a Redis command challenge; think about sharding or dedicated subscribers.)
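As a starting point for the feed-storage step only, the LPUSH plus LTRIM idiom might look like the sketch below. It assumes a client exposing redis-py style `lpush(name, *values)` and `ltrim(name, start, end)` methods. The helper `push_activity`, the event fields, and the `FakeListClient` stand-in (which handles only non-negative indices) are hypothetical and exist so the sketch runs without a server.

```python
import json

FEED_LIMIT = 100  # Keep only the most recent 100 activities per user


def push_activity(client, target_user_id, event, limit=FEED_LIMIT):
    """Prepend an event to the target user's feed and cap the feed length."""
    key = f"user_feed:{target_user_id}"
    client.lpush(key, json.dumps(event))  # Newest item goes to the head
    client.ltrim(key, 0, limit - 1)       # Drop everything past the cap
    return key


class FakeListClient:
    """Hypothetical in-memory stand-in for the two list commands used here."""

    def __init__(self):
        self.lists = {}

    def lpush(self, key, *values):
        items = self.lists.setdefault(key, [])
        for value in values:
            items.insert(0, value)
        return len(items)

    def ltrim(self, key, start, end):
        # Redis treats `end` as inclusive; only non-negative indices handled.
        self.lists[key] = self.lists.get(key, [])[start:end + 1]
        return True


fake = FakeListClient()
for seq in range(105):
    push_activity(fake, "B", {"actor": "A", "action": "comments_on_photo", "seq": seq})

feed = fake.lists["user_feed:B"]
newest = json.loads(feed[0])["seq"]
oldest = json.loads(feed[-1])["seq"]
print(len(feed), newest, oldest)  # 100 104 5
```

Against a real server, the two commands could be sent together in a redis-py `pipeline()` so that the feed is never observed above the cap between the push and the trim.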
Chat Room Broadcast:
- Implement a simple chat room using Redis Pub/Sub.
- Have multiple “clients” (separate scripts) that can PUBLISH messages to a common channel (e.g., chat:general).
- Each client also SUBSCRIBEs to this chat:general channel to receive messages from others.
- Messages should include the sender’s username.
- Challenge: How would you implement a “private message” feature where one user sends a message only to another specific user using Pub/Sub? (Hint: user:private:<receiver_id> channel).
Pub/Sub is a fundamental building block for real-time applications and event-driven architectures. By mastering it, you’re well on your way to building responsive and interactive systems. Next, we’ll shift our focus to Persistence and Data Durability, learning how Redis ensures your data isn’t lost when the server restarts.