之前解读代码的时候,经常有一个notify关键字出现,如下:
/* Delete the specified expired key and propagate expire. */
void deleteExpiredKeyAndPropagate(redisDb *db, robj *keyobj) {
mstime_t expire_latency;
latencyStartMonitor(expire_latency);
if (server.lazyfree_lazy_expire)
dbAsyncDelete(db,keyobj);
else
dbSyncDelete(db,keyobj);
latencyEndMonitor(expire_latency);
latencyAddSampleIfNeeded("expire-del",expire_latency);
notifyKeyspaceEvent(NOTIFY_EXPIRED,"expired",keyobj,db->id);
signalModifiedKey(NULL, db, keyobj);
propagateExpire(db,keyobj,server.lazyfree_lazy_expire);
server.stat_expiredkeys++;
}
大概的意思就是做了一件事情,去通知一下订阅了该事件的客户端。先看下这个具体怎么使用:https://redis.io/topics/notifications
概述:
redis2.8之后就添加了keyspace通知的功能(keyspace包括keyevent),可以让客户端订阅Pub/sub的channels去接收到redis的数据变化。因为Pub和Sub不支持持久化,所以断开的时间段的信息收不到。
事件类型:
键空间通知通过影响redis数据的操作发送两种类型的事件:比如使用DEL操作触发两条信息传递
PUBLISH __keyspace@0__:mykey del
PUBLISH __keyevent@0__:del mykey
keyspace通道中带有前缀的第一种事件称为键空间通知,而带有keyevent前缀的第二种事件称为键事件通知。
配置:因为要使用一定CPU能力,所以默认是关掉的。可以修改notify-keyspace-eventsredis.conf 或通过CONFIG SET启用。
由多个字符组成的非空字符串,其中每个字符都具有特殊含义,如下表所示:
至少K或E应该出现在字符串中,例如,要仅为列表启用键空间事件,必须将配置参数设置为Kl,等等。该字符串KEA可用于启用每个可能的事件。
集群:
如上所述,Redis 集群的每个节点都会生成有关其自己的键空间子集的事件。但是,与集群中的常规 Pub/Sub 通信不同,事件的通知不会广播到所有节点。换句话说,键空间事件是特定于节点的。这意味着要接收集群的所有键空间事件,客户端需要订阅每个节点。
使用例子:
客户端0
127.0.0.1:6379> SET notify-keyspace-events KEA
OK
127.0.0.1:6379> PSUBSCRIBE *
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"
2) "*"
3) (integer) 1
先把redis设置为kea,然后在让这个客户订阅所有频道
客户端1
127.0.0.1:6379> set a b
OK
客户端0
1) "pmessage"
2) "*"
3) "__keyspace@0__:a"
4) "set"
1) "pmessage"
2) "*"
3) "__keyevent@0__:set"
4) "a"
可以看到客户端1设置一个键值对后,客户端0收到两条命令,一个是keyspace_a的键空间通知,说a被set了,令一个是关于set的键事件通知,说set命令被使用了
因为是基于pub和sub功能,源码比较简单,notify.c算是redis中最短的功能实现了于是就把整个notify.c贴上分析下:
notify.c
#include "server.h"
/* Turn a string representing notification classes into an integer
* representing notification classes flags xored.
*
* The function returns -1 if the input contains characters not mapping to
* any class. */
//将Notify设置参数由字符串转换成标识量flag
int keyspaceEventsStringToFlags(char *classes) {
char *p = classes;
int c, flags = 0;
while((c = *p++) != '\0') {
switch(c) {
case 'A': flags |= NOTIFY_ALL; break;
case 'g': flags |= NOTIFY_GENERIC; break;
case '$': flags |= NOTIFY_STRING; break;
case 'l': flags |= NOTIFY_LIST; break;
case 's': flags |= NOTIFY_SET; break;
case 'h': flags |= NOTIFY_HASH; break;
case 'z': flags |= NOTIFY_ZSET; break;
case 'x': flags |= NOTIFY_EXPIRED; break;
case 'e': flags |= NOTIFY_EVICTED; break;
case 'K': flags |= NOTIFY_KEYSPACE; break;
case 'E': flags |= NOTIFY_KEYEVENT; break;
case 't': flags |= NOTIFY_STREAM; break;
case 'm': flags |= NOTIFY_KEY_MISS; break;
case 'd': flags |= NOTIFY_MODULE; break;
default: return -1;
}
}
return flags;
}
/* This function does exactly the reverse of the function above: it gets
* as input an integer with the xored flags and returns a string representing
* the selected classes. The string returned is an sds string that needs to
* be released with sdsfree(). */
//上面的逆向函数
sds keyspaceEventsFlagsToString(int flags) {
sds res;
res = sdsempty();
if ((flags & NOTIFY_ALL) == NOTIFY_ALL) {
res = sdscatlen(res,"A",1);
} else {
if (flags & NOTIFY_GENERIC) res = sdscatlen(res,"g",1);
if (flags & NOTIFY_STRING) res = sdscatlen(res,"$",1);
if (flags & NOTIFY_LIST) res = sdscatlen(res,"l",1);
if (flags & NOTIFY_SET) res = sdscatlen(res,"s",1);
if (flags & NOTIFY_HASH) res = sdscatlen(res,"h",1);
if (flags & NOTIFY_ZSET) res = sdscatlen(res,"z",1);
if (flags & NOTIFY_EXPIRED) res = sdscatlen(res,"x",1);
if (flags & NOTIFY_EVICTED) res = sdscatlen(res,"e",1);
if (flags & NOTIFY_STREAM) res = sdscatlen(res,"t",1);
if (flags & NOTIFY_MODULE) res = sdscatlen(res,"d",1);
}
if (flags & NOTIFY_KEYSPACE) res = sdscatlen(res,"K",1);
if (flags & NOTIFY_KEYEVENT) res = sdscatlen(res,"E",1);
if (flags & NOTIFY_KEY_MISS) res = sdscatlen(res,"m",1);
return res;
}
/* The API provided to the rest of the Redis core is a simple function:
*
* notifyKeyspaceEvent(int type, char *event, robj *key, int dbid);
*
* 'type' is the notification class we define in `server.h`.
* 'event' is a C string representing the event name.
* 'key' is a Redis object representing the key name.
* 'dbid' is the database ID where the key lives. */
void notifyKeyspaceEvent(int type, char *event, robj *key, int dbid) {
sds chan;
robj *chanobj, *eventobj;
int len = -1;
char buf[24];
/* If any modules are interested in events, notify the module system now.
* This bypasses the notifications configuration, but the module engine
* will only call event subscribers if the event type matches the types
* they are interested in. */
//模块订阅的通知,redis模块是redis4.0引入,目前也不太清楚是做什么。后续分析
moduleNotifyKeyspaceEvent(type, event, key, dbid);
/* If notifications for this class of events are off, return ASAP. */
// 通知功能关闭,直接退出
if (!(server.notify_keyspace_events & type)) return;
eventobj = createStringObject(event,strlen(event));
/* __keyspace@<db>__:<key> <event> notifications. */
// 键空间通知,格式为__keyspace@<db>__:<key> <event>
if (server.notify_keyspace_events & NOTIFY_KEYSPACE) {
chan = sdsnewlen("__keyspace@",11);
len = ll2string(buf,sizeof(buf),dbid);
chan = sdscatlen(chan, buf, len);
chan = sdscatlen(chan, "__:", 3);
chan = sdscatsds(chan, key->ptr);
chanobj = createObject(OBJ_STRING, chan);
pubsubPublishMessage(chanobj, eventobj);
decrRefCount(chanobj);
}
/* __keyevent@<db>__:<event> <key> notifications. */
// 键时间通知,格式为__keyevente@<db>__:<event> <key>
if (server.notify_keyspace_events & NOTIFY_KEYEVENT) {
chan = sdsnewlen("__keyevent@",11);
if (len == -1) len = ll2string(buf,sizeof(buf),dbid);
chan = sdscatlen(chan, buf, len);
chan = sdscatlen(chan, "__:", 3);
chan = sdscatsds(chan, eventobj->ptr);
chanobj = createObject(OBJ_STRING, chan);
// 调用pub/sub命令
pubsubPublishMessage(chanobj, key);
decrRefCount(chanobj);
}
decrRefCount(eventobj);
}