01-Redis 发布订阅 - PubSub

Redis 中的发布/订阅功能

发布/订阅系统 是 Web 系统中比较常用的一个功能。简单说就是 发布者发布消息,订阅者接受消息

虽然可以用一个 list 列表结构结合 lpushrpop 来实现消息队列的功能,但是很难实现实现 消息多播 的功能。

为了支持消息多播,Redis 不能再依赖于那 5 种基础的数据结构了,它单独使用了一个模块来支持消息多播,这个模块就是 PubSub,也就是 **Publisher/Subscriber(发布者/订阅者模式)**。

PubSub 简介

基于 list 结构的消息队列,是一种 Publisher 与 Consumer 点对点的强关联关系,Redis 为了消除这样的强关联,引入了另一种概念:频道 (channel)。

当 Publisher 往 channel 中发布消息时,关注了相应 channel 的 Consumer 就能够同时受到消息。

需要注意的是,消费者订阅一个频道是必须 明确指定频道名称 的,这意味着,如果我们想要 订阅多个 频道,那么就必须 显式地关注多个 名称。

为了简化订阅的繁琐操作,Redis 提供了 模式订阅 的功能 Pattern Subscribe,这样就可以 一次性关注多个频道 了,即使生产者新增了同模式的频道,消费者也可以立即受到消息:

例如上图中,Publisherwmyskxz.chat 这个 channel 中发送了一条消息,关注了这个频道的 Consumer 1Consumer 2 能够收到消息,wmyskxz.chat 和模式 wmyskxz.* 匹配,所以 Redis 此时同样发送消息给订阅了 wmyskxz.* 这个模式的 Consumer 3

相同的,如果接收消息的频道是 wmyskxz.log,那么 Consumer 3 也会受到消息。

快速体验

在 Redis 中,PubSub 模块的使用非常简单,常用的命令就下面几条:

1
2
3
4
5
6
7
8
9
10
# 订阅频道:
SUBSCRIBE channel [channel ....] # 订阅给定的一个或多个频道的信息
PSUBSCRIBE pattern [pattern ....] # 订阅一个或多个符合给定 模式 的频道

# 发布频道:
PUBLISH channel message # 将消息发送到指定的频道

# 退订频道:
UNSUBSCRIBE [channel [channel ....]] # 退订指定的频道
PUNSUBSCRIBE [pattern [pattern ....]] # 退订所有给定模式的频道
  1. consumer1 订阅 abc.com 频道

    1
    2
    3
    4
    5
    127.0.0.1:6379> SUBSCRIBE abc.com
    Reading messages... (press Ctrl-C to quit)
    1) "subscribe"
    2) "abc.com"
    3) (integer) 1
  2. consumer2 模式订阅 abc.* 频道

    1
    2
    3
    4
    5
    127.0.0.1:6379> PSUBSCRIBE abc.*
    Reading messages... (press Ctrl-C to quit)
    1) "psubscribe"
    2) "abc.*"
    3) (integer) 1
  3. publisher 发布消息到 abc.com 频道

    1
    2
    127.0.0.1:6379> PUBLISH abc.com "hello"
    (integer) 2
  4. 此时 consumer1 和 consumer2 均收到消息

  5. 新增 consumer3,订阅 abc.net 频道

    1
    2
    3
    4
    5
    127.0.0.1:6379> SUBSCRIBE abc.net
    Reading messages... (press Ctrl-C to quit)
    1) "subscribe"
    2) "abc.net"
    3) (integer) 1
  6. publisher 发布消息到 abc.net 频道

    1
    2
    127.0.0.1:6379> PUBLISH abc.net "hello2"
    (integer) 2
  7. 此时 consumer2 和 consumer3 均收到消息,consumer1 不会收到消息。

实现原理

我们通过简单的两条命令,就可以简单使用一个 发布/订阅系统 了,但具体是怎么样实现的呢?

每个 Redis 服务器进程维持着一个标识服务器状态redis.h/redisServer 结构,其中 保存着有订阅的频道 以及 订阅模式 的信息:

1
2
3
4
5
6
struct redisServer {
// ...
dict *pubsub_channels; // 订阅频道
list *pubsub_patterns; // 订阅模式
// ...
};

订阅频道原理

当客户端订阅某一个频道之后,Redis 就会往 pubsub_channels 这个字典中新添加一条数据,实际上这个 dict 字典维护的是一张链表,比如,下图展示的 pubsub_channels 示例中,client 1client 2 就订阅了 channel 1,而其他频道也分别被其他客户端订阅:

SUBSCRIBE 命令

SUBSCRIBE 命令的行为可以用下列的伪代码表示:

1
2
3
4
5
def SUBSCRIBE(client, channels):
# 遍历所有输入频道
for channel in channels:
# 将客户端添加到链表的末尾
redisServer.pubsub_channels[channel].append(client)

通过 pubsub_channels 字典,程序只要检查某个频道是否为字典的键,就可以知道该频道是否正在被客户端订阅;只要取出某个键的值,就可以得到所有订阅该频道的客户端的信息。

PUBLISH 命令

了解了 SUBSCRIBE 那么 PUBLISH 命令的实现也变得十分简单了,只需要通过上述字典定位到具体的客户端,再把消息发送给它们就好了,伪代码实现如下

1
2
3
4
5
def PUBLISH(channel, message):
# 遍历所有订阅频道 channel 的客户端
for client in server.pubsub_channels[channel]:
# 将信息发送给它们
send_message(client, message)

UNSUBSCRIBE 命令

使用 UNSUBSCRIBE 命令可以退订指定的频道,这个命令执行的是订阅的反操作:它从 pubsub_channels 字典的给定频道(键)中,删除关于当前客户端的信息,这样被退订频道的信息就不会再发送给这个客户端。

订阅模式原理

当发送一条消息到 wmyskxz.chat 这个频道时,Redis 不仅仅会发送到当前的频道,还会发送到匹配于当前模式的所有频道,实际上,pubsub_patterns 背后还维护了一个 redis.h/pubsubPattern 结构:

1
2
3
4
typedefstruct pubsubPattern {
redisClient *client; // 订阅模式的客户端
robj *pattern; // 订阅的模式
} pubsubPattern;

每当调用 PSUBSCRIBE 命令订阅一个模式时,程序就创建一个包含客户端信息和被订阅模式的 pubsubPattern 结构,并将该结构添加到 redisServer.pubsub_patterns 链表中。

我们来看一个 pusub_patterns 链表的示例:

这个时候客户端 client 3 执行 PSUBSCRIBE wmyskxz.java.*,那么 pubsub_patterns 链表就会被更新成这样:

通过遍历整个 pubsub_patterns 链表,程序可以检查所有正在被订阅的模式,以及订阅这些模式的客户端。

PUBLISH 命令

上面给出的伪代码并没有 完整描述 PUBLISH 命令的行为,因为 PUBLISH 除了将 message 发送到 所有订阅 channel 的客户端 之外,它还会将 channel 和 pubsub_patterns 中的 模式 进行对比,如果 channel 和某个模式匹配的话,那么也将 message 发送到 订阅那个模式的客户端

完整描述 PUBLISH 功能的伪代码定于如下:

1
2
3
4
5
6
7
8
9
10
11
12
def PUBLISH(channel, message):
# 遍历所有订阅频道 channel 的客户端
for client in server.pubsub_channels[channel]:
# 将信息发送给它们
send_message(client, message)

# 取出所有模式,以及订阅模式的客户端
for pattern, client in server.pubsub_patterns:
# 如果 channel 和模式匹配
if match(channel, pattern):
# 那么也将信息发给订阅这个模式的客户端
send_message(client, message)

PUNSUBSCRIBE 命令

使用 PUNSUBSCRIBE 命令可以退订指定的模式,这个命令执行的是订阅模式的反操作:序会删除 redisServer.pubsub_patterns 链表中,所有和被退订模式相关联的 pubsubPattern 结构,这样客户端就不会再收到和模式相匹配的频道发来的信息。

PubSub 的缺点

尽管 Redis 实现了 PubSub 模式来达到了 多播消息队列 的目的,但在实际的消息队列的领域,几乎 找不到特别合适的场景,因为它的缺点十分明显:

  • 没有 Ack 机制,也不保证数据的连续: PubSub 的生产者传递过来一个消息,Redis 会直接找到相应的消费者传递过去。如果没有一个消费者,那么消息会被直接丢弃。如果开始有三个消费者,其中一个突然挂掉了,过了一会儿等它再重连时,那么重连期间的消息对于这个消费者来说就彻底丢失了。

  • 不持久化消息: 如果 Redis 停机重启,PubSub 的消息是不会持久化的,毕竟 Redis 宕机就相当于一个消费者都没有,所有的消息都会被直接丢弃。

基于上述缺点,Redis 的作者甚至单独开启了一个 Disque 的项目来专门用来做多播消息队列,不过该项目目前好像都没有成熟。不过后来在 2018 年 6 月,Redis 5.0 新增了 Stream 数据结构,这个功能给 Redis 带来了 持久化消息队列,从此 PubSub 作为消息队列的功能可以说是就消失了。

参考资料

  1. https://www.cnblogs.com/wmyskxz/p/12499532.html
  2. Introduction to Redis Streams【官方文档】 - https://redis.io/topics/streams-intro

01-Redis 发布订阅 - PubSub
https://flepeng.github.io/041-Redis-41-核心概念-01-Redis-发布订阅-PubSub/
作者
Lepeng
发布于
2021年1月1日
许可协议