Pubsub概述
Redis的发布和订阅功能由PUBLISH、SUBSCRIBE和PSUBSCRIBE等命令组成,要想理解源码,必须首先熟悉这些命令的形式和功能。可以看到之前的notify的分析文章中对这里的应用“
void notifyKeyspaceEvent(int type, char *event, robj *key, int dbid) {
sds chan;
robj *chanobj, *eventobj;
int len = -1;
char buf[24];
/* If any modules are interested in events, notify the module system now.
* This bypasses the notifications configuration, but the module engine
* will only call event subscribers if the event type matches the types
* they are interested in. */
//模块订阅的通知,redis模块是redis4.0引入,目前也不太清楚是做什么。后续分析
moduleNotifyKeyspaceEvent(type, event, key, dbid);
/* If notifications for this class of events are off, return ASAP. */
// 通知功能关闭,直接退出
if (!(server.notify_keyspace_events & type)) return;
eventobj = createStringObject(event,strlen(event));
/* __keyspace@<db>__:<key> <event> notifications. */
// 键空间通知,格式为__keyspace@<db>__:<key> <event>
if (server.notify_keyspace_events & NOTIFY_KEYSPACE) {
chan = sdsnewlen("__keyspace@",11);
len = ll2string(buf,sizeof(buf),dbid);
chan = sdscatlen(chan, buf, len);
chan = sdscatlen(chan, "__:", 3);
chan = sdscatsds(chan, key->ptr);
chanobj = createObject(OBJ_STRING, chan);
pubsubPublishMessage(chanobj, eventobj);
decrRefCount(chanobj);
}
/* __keyevent@<db>__:<event> <key> notifications. */
// 键时间通知,格式为__keyevente@<db>__:<event> <key>
if (server.notify_keyspace_events & NOTIFY_KEYEVENT) {
chan = sdsnewlen("__keyevent@",11);
if (len == -1) len = ll2string(buf,sizeof(buf),dbid);
chan = sdscatlen(chan, buf, len);
chan = sdscatlen(chan, "__:", 3);
chan = sdscatsds(chan, eventobj->ptr);
chanobj = createObject(OBJ_STRING, chan);
// 调用pub/sub命令
pubsubPublishMessage(chanobj, key);
decrRefCount(chanobj);
}
decrRefCount(eventobj);
}
看下server.h中对pubsub数据结构的使用
struct redisClient {
dict *pubsub_channels; /* channels a client is interested in (SUBSCRIBE) */
list *pubsub_patterns; /* patterns a client is interested in (SUBSCRIBE) */
}
111
struct redisServer {
...
dict *pubsub_channels; /* Map channels to list of subscribed clients */
dict *pubsub_patterns; /* A dict of pubsub_patterns */
...
}
订阅
订阅频道
当客户端执行订阅频道命令的时候,客户端和服务器需要执行两个步骤:
- 向客户端的
pubsub_channels
字典中添加该频道 - 向服务器的
pubsub_channels
字典中添加该频道及其对应的客户端
上述两个步骤由subscribeCommand函数完成,其源码如下:
/* SUBSCRIBE channel [channel ...] */
void subscribeCommand(client *c) {
int j;
if ((c->flags & CLIENT_DENY_BLOCKING) && !(c->flags & CLIENT_MULTI)) {
/**
* A client that has CLIENT_DENY_BLOCKING flag on
* expect a reply per command and so can not execute subscribe.
*
* Notice that we have a special treatment for multi because of
* backward compatibility
*/
addReplyError(c, "SUBSCRIBE isn't allowed for a DENY BLOCKING client");
return;
}
for (j = 1; j < c->argc; j++)
pubsubSubscribeChannel(c,c->argv[j]);
c->flags |= CLIENT_PUBSUB;
}
底层实现:
/* Subscribe a client to a channel. Returns 1 if the operation succeeded, or
* 0 if the client was already subscribed to that channel. */
int pubsubSubscribeChannel(client *c, robj *channel) {
dictEntry *de;
list *clients = NULL;
int retval = 0;
/* Add the channel to the client -> channels hash table */
if (dictAdd(c->pubsub_channels,channel,NULL) == DICT_OK) { //添加到c的pubsub_channels中
retval = 1;
incrRefCount(channel);
/* Add the client to the channel -> list of clients hash table */
de = dictFind(server.pubsub_channels,channel);
if (de == NULL) {
clients = listCreate();
dictAdd(server.pubsub_channels,channel,clients); //添加server中去:channel:client
incrRefCount(channel);
} else {
clients = dictGetVal(de);
}
listAddNodeTail(clients,c); 添加到这个channel对应的clinet的list的尾端
}
/* Notify the client */
addReplyPubsubSubscribed(c,channel);
return retval;
}
订阅模式
当客户端执行订阅模式的指令时,同样需要对服务器和客户端的pubsub_patterns链表进行操作。其源码如下:
/* PSUBSCRIBE pattern [pattern ...] */
void psubscribeCommand(client *c) {
int j;
if ((c->flags & CLIENT_DENY_BLOCKING) && !(c->flags & CLIENT_MULTI)) {
/**
* A client that has CLIENT_DENY_BLOCKING flag on
* expect a reply per command and so can not execute subscribe.
*
* Notice that we have a special treatment for multi because of
* backward compatibility
*/
addReplyError(c, "PSUBSCRIBE isn't allowed for a DENY BLOCKING client");
return;
}
for (j = 1; j < c->argc; j++)
pubsubSubscribePattern(c,c->argv[j]);
c->flags |= CLIENT_PUBSUB;
}
源码如下:
/* Subscribe a client to a pattern. Returns 1 if the operation succeeded, or 0 if the client was already subscribed to that pattern. */
int pubsubSubscribePattern(client *c, robj *pattern) {
dictEntry *de;
list *clients;
int retval = 0;
if (listSearchKey(c->pubsub_patterns,pattern) == NULL) { //如果客户端里没有这个pattern
retval = 1;
listAddNodeTail(c->pubsub_patterns,pattern);
incrRefCount(pattern);
/* Add the client to the pattern -> list of clients hash table */
de = dictFind(server.pubsub_patterns,pattern); //如果服务端没有这个pattern
if (de == NULL) {
clients = listCreate();
dictAdd(server.pubsub_patterns,pattern,clients);
incrRefCount(pattern);
} else {
clients = dictGetVal(de);
}
listAddNodeTail(clients,c);
}
/* Notify the client */
addReplyPubsubPatSubscribed(c,pattern);
return retval;
}
退订
退订的操作就放在一节里面讲了,无非就是从结构体中删除一些节点,事实就是如此,以退订频道为例:
/* UNSUBSCRIBE [channel [channel ...]] */
void unsubscribeCommand(client *c) {
if (c->argc == 1) {
pubsubUnsubscribeAllChannels(c,1);
} else {
int j;
for (j = 1; j < c->argc; j++)
pubsubUnsubscribeChannel(c,c->argv[j],1);
}
if (clientSubscriptionsCount(c) == 0) c->flags &= ~CLIENT_PUBSUB;
}
底层实现
/* Unsubscribe from all the channels. Return the number of channels the
* client was subscribed to. */
int pubsubUnsubscribeAllChannels(client *c, int notify) {
int count = 0;
if (dictSize(c->pubsub_channels) > 0) {
dictIterator *di = dictGetSafeIterator(c->pubsub_channels);
dictEntry *de;
while((de = dictNext(di)) != NULL) {
robj *channel = dictGetKey(de);
count += pubsubUnsubscribeChannel(c,channel,notify);
}
dictReleaseIterator(di);
}
/* We were subscribed to nothing? Still reply to the client. */
if (notify && count == 0) addReplyPubsubUnsubscribed(c,NULL);
return count;
}
发布消息
当客户端调用发布消息的命令时,需要进行如下两个操作:
- 查找服务器的pubsub_channels字典下该频道对应的客户端链表,然后遍历,一一发送
- 查找服务器的pubsub_patterns字典,遍历模式串,如果匹配就发送,反之不作处理
发布消息的命令由publishCommand函数实现,其源码如下:
/* PUBLISH <channel> <message> */
void publishCommand(client *c) {
if (server.sentinel_mode) {
sentinelPublishCommand(c);
return;
}
int receivers = pubsubPublishMessage(c->argv[1],c->argv[2]);
if (server.cluster_enabled)
clusterPropagatePublish(c->argv[1],c->argv[2]);
else
forceCommandPropagation(c,PROPAGATE_REPL);
addReplyLongLong(c,receivers);
}
底层实现
/* Publish a message */
int pubsubPublishMessage(robj *channel, robj *message) {
int receivers = 0;
dictEntry *de;
dictIterator *di;
listNode *ln;
listIter li;
// 发送到订阅该频道的所有客户端
/* Send to clients listening for that channel */
de = dictFind(server.pubsub_channels,channel);
if (de) {
// 如果存在该频道,则获取客户端链表
list *list = dictGetVal(de);
listNode *ln;
listIter li;
listRewind(list,&li);
// 遍历,发送消息
while ((ln = listNext(&li)) != NULL) {
client *c = ln->value;
// 发送消息
addReplyPubsubMessage(c,channel,message);
updateClientMemUsage(c);
receivers++;
}
}
// 发送到订阅该频道pattern的所有客户端
/* Send to clients listening to matching channels */
di = dictGetIterator(server.pubsub_patterns);
if (di) {
channel = getDecodedObject(channel);
while((de = dictNext(di)) != NULL) {
robj *pattern = dictGetKey(de);
list *clients = dictGetVal(de);
if (!stringmatchlen((char*)pattern->ptr,
sdslen(pattern->ptr),
(char*)channel->ptr,
sdslen(channel->ptr),0)) continue;
//如果匹配到
listRewind(clients,&li);
while ((ln = listNext(&li)) != NULL) {
client *c = listNodeValue(ln);
//发消息
addReplyPubsubPatMessage(c,pattern,channel,message);
updateClientMemUsage(c);
receivers++;
}
}
decrRefCount(channel);
dictReleaseIterator(di);
}
return receivers;
}
自提的相关issue:
https://github.com/redis/redis/issues/9897