redis-notify

2021-11-29

之前解读代码的时候,经常有一个notify关键字出现,如下:

/* Delete the specified expired key and propagate expire. */
void deleteExpiredKeyAndPropagate(redisDb *db, robj *keyobj) {
    mstime_t expire_latency;
    latencyStartMonitor(expire_latency);
    if (server.lazyfree_lazy_expire)
        dbAsyncDelete(db,keyobj);
    else
        dbSyncDelete(db,keyobj);
    latencyEndMonitor(expire_latency);
    latencyAddSampleIfNeeded("expire-del",expire_latency);
    notifyKeyspaceEvent(NOTIFY_EXPIRED,"expired",keyobj,db->id);
    signalModifiedKey(NULL, db, keyobj);
    propagateExpire(db,keyobj,server.lazyfree_lazy_expire);
    server.stat_expiredkeys++;
}

大概的意思就是做了一件事情,去通知一下订阅了该事件的客户端。先看下这个具体怎么使用:https://redis.io/topics/notifications

概述:
redis2.8之后就添加了keyspace通知的功能(keyspace包括keyevent),可以让客户端订阅Pub/sub的channels去接收到redis的数据变化。因为Pub和Sub不支持持久化,所以断开的时间段的信息收不到。

事件类型:
键空间通知通过影响redis数据的操作发送两种类型的事件:比如使用DEL操作触发两条信息传递
PUBLISH __keyspace@0__:mykey del
PUBLISH __keyevent@0__:del mykey
keyspace通道中带有前缀的第一种事件称为键空间通知,而带有keyevent前缀的第二种事件称为键事件通知。

配置:因为要使用一定CPU能力,所以默认是关掉的。可以修改notify-keyspace-eventsredis.conf 或通过CONFIG SET启用。
由多个字符组成的非空字符串,其中每个字符都具有特殊含义,如下表所示:

image-20211129123342574

至少K或E应该出现在字符串中,例如,要仅为列表启用键空间事件,必须将配置参数设置为Kl,等等。该字符串KEA可用于启用每个可能的事件。
集群:
如上所述,Redis 集群的每个节点都会生成有关其自己的键空间子集的事件。但是,与集群中的常规 Pub/Sub 通信不同,事件的通知不会广播到所有节点。换句话说,键空间事件是特定于节点的。这意味着要接收集群的所有键空间事件,客户端需要订阅每个节点。

使用例子:

客户端0
127.0.0.1:6379> SET notify-keyspace-events KEA
OK
127.0.0.1:6379> PSUBSCRIBE *
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"
2) "*"
3) (integer) 1
先把redis设置为kea,然后在让这个客户订阅所有频道

客户端1
127.0.0.1:6379> set a b
OK

客户端0
1) "pmessage"
2) "*"
3) "__keyspace@0__:a"
4) "set"
1) "pmessage"
2) "*"
3) "__keyevent@0__:set"
4) "a"
可以看到客户端1设置一个键值对后,客户端0收到两条命令,一个是keyspace_a的键空间通知,说a被set了,令一个是关于set的键事件通知,说set命令被使用了

因为是基于pub和sub功能,源码比较简单,notify.c算是redis中最短的功能实现了于是就把整个notify.c贴上分析下:

notify.c
#include "server.h"

/* Turn a string representing notification classes into an integer
 * representing notification classes flags xored.
 *
 * The function returns -1 if the input contains characters not mapping to
 * any class. */
 //将Notify设置参数由字符串转换成标识量flag
int keyspaceEventsStringToFlags(char *classes) {
    char *p = classes;
    int c, flags = 0;

    while((c = *p++) != '\0') {
        switch(c) {
        case 'A': flags |= NOTIFY_ALL; break;
        case 'g': flags |= NOTIFY_GENERIC; break;
        case '$': flags |= NOTIFY_STRING; break;
        case 'l': flags |= NOTIFY_LIST; break;
        case 's': flags |= NOTIFY_SET; break;
        case 'h': flags |= NOTIFY_HASH; break;
        case 'z': flags |= NOTIFY_ZSET; break;
        case 'x': flags |= NOTIFY_EXPIRED; break;
        case 'e': flags |= NOTIFY_EVICTED; break;
        case 'K': flags |= NOTIFY_KEYSPACE; break;
        case 'E': flags |= NOTIFY_KEYEVENT; break;
        case 't': flags |= NOTIFY_STREAM; break;
        case 'm': flags |= NOTIFY_KEY_MISS; break;
        case 'd': flags |= NOTIFY_MODULE; break;
        default: return -1;
        }
    }
    return flags;
}

/* This function does exactly the reverse of the function above: it gets
 * as input an integer with the xored flags and returns a string representing
 * the selected classes. The string returned is an sds string that needs to
 * be released with sdsfree(). */
 //上面的逆向函数
sds keyspaceEventsFlagsToString(int flags) {
    sds res;

    res = sdsempty();
    if ((flags & NOTIFY_ALL) == NOTIFY_ALL) {
        res = sdscatlen(res,"A",1);
    } else {
        if (flags & NOTIFY_GENERIC) res = sdscatlen(res,"g",1);
        if (flags & NOTIFY_STRING) res = sdscatlen(res,"$",1);
        if (flags & NOTIFY_LIST) res = sdscatlen(res,"l",1);
        if (flags & NOTIFY_SET) res = sdscatlen(res,"s",1);
        if (flags & NOTIFY_HASH) res = sdscatlen(res,"h",1);
        if (flags & NOTIFY_ZSET) res = sdscatlen(res,"z",1);
        if (flags & NOTIFY_EXPIRED) res = sdscatlen(res,"x",1);
        if (flags & NOTIFY_EVICTED) res = sdscatlen(res,"e",1);
        if (flags & NOTIFY_STREAM) res = sdscatlen(res,"t",1);
        if (flags & NOTIFY_MODULE) res = sdscatlen(res,"d",1);
    }
    if (flags & NOTIFY_KEYSPACE) res = sdscatlen(res,"K",1);
    if (flags & NOTIFY_KEYEVENT) res = sdscatlen(res,"E",1);
    if (flags & NOTIFY_KEY_MISS) res = sdscatlen(res,"m",1);
    return res;
}

/* The API provided to the rest of the Redis core is a simple function:
 *
 * notifyKeyspaceEvent(int type, char *event, robj *key, int dbid);
 *
 * 'type' is the notification class we define in `server.h`.
 * 'event' is a C string representing the event name.
 * 'key' is a Redis object representing the key name.
 * 'dbid' is the database ID where the key lives.  */
void notifyKeyspaceEvent(int type, char *event, robj *key, int dbid) {
    sds chan;
    robj *chanobj, *eventobj;
    int len = -1;
    char buf[24];

    /* If any modules are interested in events, notify the module system now.
     * This bypasses the notifications configuration, but the module engine
     * will only call event subscribers if the event type matches the types
     * they are interested in. */
     //模块订阅的通知,redis模块是redis4.0引入,目前也不太清楚是做什么。后续分析
     moduleNotifyKeyspaceEvent(type, event, key, dbid);

    /* If notifications for this class of events are off, return ASAP. */
    // 通知功能关闭,直接退出
    if (!(server.notify_keyspace_events & type)) return;

    eventobj = createStringObject(event,strlen(event));

    /* __keyspace@<db>__:<key> <event> notifications. */
    // 键空间通知,格式为__keyspace@<db>__:<key> <event>
    if (server.notify_keyspace_events & NOTIFY_KEYSPACE) {
        chan = sdsnewlen("__keyspace@",11);
        len = ll2string(buf,sizeof(buf),dbid);
        chan = sdscatlen(chan, buf, len);
        chan = sdscatlen(chan, "__:", 3);
        chan = sdscatsds(chan, key->ptr);
        chanobj = createObject(OBJ_STRING, chan);
        pubsubPublishMessage(chanobj, eventobj);
        decrRefCount(chanobj);
    }

    /* __keyevent@<db>__:<event> <key> notifications. */
    // 键时间通知,格式为__keyevente@<db>__:<event> <key>
    if (server.notify_keyspace_events & NOTIFY_KEYEVENT) {
        chan = sdsnewlen("__keyevent@",11);
        if (len == -1) len = ll2string(buf,sizeof(buf),dbid);
        chan = sdscatlen(chan, buf, len);
        chan = sdscatlen(chan, "__:", 3);
        chan = sdscatsds(chan, eventobj->ptr);
        chanobj = createObject(OBJ_STRING, chan);
        // 调用pub/sub命令
        pubsubPublishMessage(chanobj, key);
        decrRefCount(chanobj);
    }
    decrRefCount(eventobj);
}

1590744310779