其他文章:
Redis源码解析(1) 动态字符串与链表
Redis源码解析(2) 字典与迭代器
Redis源码解析(3) 跳跃表
Redis源码解析(4) 整数集合
Redis源码解析(5) 压缩列表
Redis源码解析(6) 键的过期处理策略
Redis源码解析(7) 发布订阅机制
Redis源码解析(8) AOF持久化
Redis源码解析(9) RDB持久化
Redis源码解析(10) 网络框架
Redis源码解析(11) 内存淘汰策略
Redis源码解析(12) 命令执行过程
Redis源码解析(13) 主从复制
Redis源码解析(14) 哨兵机制[1] 结构与初始化
Redis源码解析(15) 哨兵机制[2] 信息同步与TILT模式
Redis源码解析(16) 哨兵机制[3] 判断下线
Redis源码解析(17) 哨兵机制[4] 故障转移
Redis源码解析(18) 集群[1]初始化,握手与心跳检测
Redis源码解析(19) 集群[2] 主从复制,故障检测与故障转移
Redis源码解析(20) 集群[3] 键的存储,重新分片与重定向
Redis源码解析(21) 集群[4] 故障转移failover与slave迁移
Redis源码解析(22) 事务
Redis源码解析(23) SCAN命令实现
引言
redis在2.8版本以后实现了一个发布订阅机制,这使得redis在我们需要一个基本的发布订阅功能的时候可以充当一个消息队列.Redis一共为我们提供了六个命令,两种匹配方法来实现发布订阅
- 客户端可以一次性订阅一个或者多个channel,
SUBSCRIBE
channel1 channel2 channel3 PUBSUB
返回当前publish/subscribe 系统的内部命令的活动状态,包含三个内部命令,分别为:channels
(列出当前活跃的channel),NUMSUB
(返回指定channel的订阅数目),NUMPAT
(返回订阅pattern的订阅数,不需要参数)- 订阅多个channel,也就是我们所说的模式匹配,我们可以使用正则表达式来充当模式串 PSUBSCRIBE chan*
- 消息发布,
PUBLISH
channel2 hello - 取消某一个channel消息订阅,
UNSUBSCRIBE
channel1 - 取消某个pattern的消息订阅,
PUNSUBSCRIBE
chan* ;
两种匹配方法其实上面已经提到了,就是精确的频道匹配
与使用正则表达式的模式匹配
,它们的实现并不相同,频道匹配使用字典
作为数据结构,其中的节点key为频道,value为用户链表.而模式匹配中则使用链表
来作为底层的数据结构,节点结构为pubsubPattern{模式串,用户信息},这也使得使用模式匹配是搜索的复杂度为O(N).至于模式匹配不使用字典的原因应该是模式串重复的远远小于频道的重复吧,毕竟模式匹配是支持正则表达式的.
其实下面要说的pubsubPublishMessage函数也是PUBLISH的内部实现,关于Redis的发布订阅模式,请点击这里
解析部分
notifyKeyspaceEvent是消息通知的实现函数,也就是当有消息到来时,可以是一条对键的命令,也可以是PUBLISH,如果是对键的消息,就会去构造一个字符串,因为这个过程,当我们监视某个关联键的频道时我们需要遵循redis的协议构造一个频道名,协议如下
命令 __keyspace@仓库序号__:要监控的命令名
SUBSCRIBE __keyspace@0__:key
命令 __keyevent@仓库序号__:要监控的命令名
SUBSCRIBE __keyevent@0__:del
void notifyKeyspaceEvent(int type, char *event, robj *key, int dbid) {
sds chan;
robj *chanobj, *eventobj;
int len = -1;
char buf[24];
/* If notifications for this class of events are off, return ASAP. */
// 如果服务器配置为不发送 type 类型的通知,那么直接返回 就是配置文件中notify-keyspace-events的值
if (!(server.notify_keyspace_events & type)) return;
// 事件的名字
eventobj = createStringObject(event,strlen(event)); //根据事件名称创建一个robj对象
/* __keyspace@<db>__:<key> <event> notifications. */
// 发送键空间通知
if (server.notify_keyspace_events & REDIS_NOTIFY_KEYSPACE) {
// 构建频道对象
chan = sdsnewlen("__keyspace@",11);
len = ll2string(buf,sizeof(buf),dbid); //把数据库的号码转换成字符串
chan = sdscatlen(chan, buf, len);
chan = sdscatlen(chan, "__:", 3);
chan = sdscatsds(chan, key->ptr); //已执行的命令名称
chanobj = createObject(REDIS_STRING, chan); //构建一个key对象
// 通过 publish 命令发送通知
pubsubPublishMessage(chanobj, eventobj);
// 释放频道对象
decrRefCount(chanobj);
}
/* __keyevente@<db>__:<event> <key> notifications. */
// 发送键事件通知
if (server.notify_keyspace_events & REDIS_NOTIFY_KEYEVENT) {
// 构建频道对象
chan = sdsnewlen("__keyevent@",11);
// 如果在前面发送键空间通知的时候计算了 len ,那么它就不会是 -1
// 这可以避免计算两次 buf 的长度
if (len == -1) len = ll2string(buf,sizeof(buf),dbid);
chan = sdscatlen(chan, buf, len);
chan = sdscatlen(chan, "__:", 3);
chan = sdscatsds(chan, eventobj->ptr);
chanobj = createObject(REDIS_STRING, chan);
// 通过 publish 命令发送通知
pubsubPublishMessage(chanobj, key);
// 释放频道对象
decrRefCount(chanobj);
}
// 释放事件对象
decrRefCount(eventobj);
}
这里面我们可以看到真正的执行发送消息的函数是pubsubPublishMessage,也就是PUBLISH的内部实现.
首先会在模式匹配的字典中寻找匹配项,然后去模式匹配的链表中寻找匹配
int pubsubPublishMessage(robj *channel, robj *message) {
int receivers = 0;
dictEntry *de;
listNode *ln;
listIter li;
/* Send to clients listening for that channel */
// 取出包含所有订阅频道 channel 的客户端的链表
// 并将消息发送给它们
// 这里从字典中取成员也意味着事件通知的注册其实就是利用服务端输入的命令构建一个key
// 如
de = dictFind(server.pubsub_channels,channel);
if (de) {
list *list = dictGetVal(de); //值为一个链表 其中记录着客户信息 结构为redisClient
listNode *ln;
listIter li;
// 遍历客户端链表,将 message 发送给它们
listRewind(list,&li); //设置迭代器,开始遍历
while ((ln = listNext(&li)) != NULL) {
redisClient *c = ln->value;
// 回复客户端。
// 示例:
// 1) "message"
// 2) "xxx"
// 3) "hello"
addReply(c,shared.mbulkhdr[3]);
// "message" 字符串
addReply(c,shared.messagebulk); //前两个都一样
// 消息的来源频道
addReplyBulk(c,channel); //就是我们前面构造的字符串
// 消息内容
addReplyBulk(c,message);
// 接收客户端计数
receivers++;
}
}
/* Send to clients listening to matching channels */
// 将消息也发送给那些和频道匹配的模式
if (listLength(server.pubsub_patterns)) {
// 遍历模式链表
listRewind(server.pubsub_patterns,&li); //设置迭代器
channel = getDecodedObject(channel);
while ((ln = listNext(&li)) != NULL) {
// 取出 pubsubPattern 存储的结构是客户端信息和模式串本身
pubsubPattern *pat = ln->value;
// 如果 channel 和 pattern 匹配 模式串为正则表达式
// 就给所有订阅该 pattern 的客户端发送消息
// 这个下面有一个样例来解释
if (stringmatchlen((char*)pat->pattern->ptr, //模式串
sdslen(pat->pattern->ptr),
(char*)channel->ptr, //从到来的消息中取出要匹配的串
sdslen(channel->ptr),0)) {
addReply(pat->client,shared.mbulkhdr[4]);
addReply(pat->client,shared.pmessagebulk); //模式串,也就是我们注册时带*的那个
addReplyBulk(pat->client,pat->pattern);
addReplyBulk(pat->client,channel); //匹配的串
addReplyBulk(pat->client,message); //这也就是我们要发送的消息了
// 对接收消息的客户端进行计数
receivers++;
}
}
decrRefCount(channel);
}
// 返回计数
return receivers;
}
其中有一个匹配的函数stringmatchlen,我们来写个简单的小demo测试一个这个函数
int main(){
string partern="*t[ea]st.*";
string one="sdatest.asdasd";
string two="tast.asd.";
cout << stringmatchlen(partern.c_str(), partern.size(), one.c_str(), one.size(),0) << endl;
cout << stringmatchlen(partern.c_str(), partern.size(), two.c_str(), two.size(),0) << endl;
return 0;
}
输出为:
1
1
stringmatchlen所做的事情其实就是去匹配正则表达式,所以模式串中其实我们是可以去写一个正则表达式的.