redis-multi

2021-12-09

eventloop 的创建

/* State of an event based program */
typedef struct aeEventLoop {
    int maxfd;   /* highest file descriptor currently registered */
    int setsize; /* max number of file descriptors tracked */
    long long timeEventNextId;
    aeFileEvent *events; /* Registered events */
    aeFiredEvent *fired; /* Fired events */
    aeTimeEvent *timeEventHead;
    int stop;
    void *apidata; /* This is used for polling API specific data */
    aeBeforeSleepProc *beforesleep;
    aeBeforeSleepProc *aftersleep;
    int flags;
} aeEventLoop;
aeEventLoop *aeCreateEventLoop(int setsize) {
    aeEventLoop *eventLoop;
    int i;

    monotonicInit();    /* just in case the calling app didn't initialize */

    if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err;
    eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
    eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);
    if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err;
    eventLoop->setsize = setsize;
    eventLoop->timeEventHead = NULL;
    eventLoop->timeEventNextId = 0;
    eventLoop->stop = 0;
    eventLoop->maxfd = -1;
    eventLoop->beforesleep = NULL;
    eventLoop->aftersleep = NULL;
    eventLoop->flags = 0;
    if (aeApiCreate(eventLoop) == -1) goto err;
    /* Events with mask == AE_NONE are not set. So let's initialize the
     * vector with it. */
    for (i = 0; i < setsize; i++)
        eventLoop->events[i].mask = AE_NONE;
    return eventLoop;

err:
    if (eventLoop) {
        zfree(eventLoop->events);
        zfree(eventLoop->fired);
        zfree(eventLoop);
    }
    return NULL;
}

只有在initServer和redis-benchmark.c中才会被调用

void initServer(void) {
	server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);

其中传入的参数是默认是10000,然后还有128的aof、rdb等buffer,setsize 参数表示了 eventloop 可以监听的网络事件 fd 的个数(不包含超时事件),如果当前监听的 fd 个数超过了 setsize,eventloop 将不能继续注册。

而删除EventLoop则是在server.c中最后一个函数

int main(int argc, char **argv) {
	......
	aeMain(server.el);
    aeDeleteEventLoop(server.el);
    return 0;
}

void aeDeleteEventLoop(aeEventLoop *eventLoop) {
    aeApiFree(eventLoop);
    zfree(eventLoop->events);
    zfree(eventLoop->fired);

    /* Free the time events list. */
    aeTimeEvent *next_te, *te = eventLoop->timeEventHead;
    while (te) {
        next_te = te->next;
        zfree(te);
        te = next_te;
    }
    zfree(eventLoop);
}

我们知道,Linux 内核会给每个进程维护一个文件描述符表。而 POSIX 标准对于文件描述符进行了以下约束:

  1. fd 为 0、1、2 分别表示标准输入、标准输出和错误输出
  2. 每次新打开的 fd,必须使用当前进程中最小可用的文件描述符。

Redis 充分利用了文件描述符的这些特点,来存储每个 fd 对应的事件。

#define AE_NONE 0       /* No events registered. */
#define AE_READABLE 1   /* Fire when descriptor is readable. */
#define AE_WRITABLE 2   /* Fire when descriptor is writable. */
#define AE_BARRIER 4    /* With WRITABLE, never fire the event if the
                           READABLE event already fired in the same event
                           loop iteration. Useful when you want to persist
                           things to disk before sending replies, and want
                           to do that in a group fashion. */

eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
for (i = 0; i < setsize; i++)
    eventLoop->events[i].mask = AE_NONE;

不过也基于文件描述符的这些特点,意味着 events 数组的前三位一定不会有相应的 fd 赋值。

网络 IO(文件) 事件

/* File event structure */
typedef struct aeFileEvent {
    int mask; /* one of AE_(READABLE|WRITABLE|BARRIER) */
    aeFileProc *rfileProc;
    aeFileProc *wfileProc;
    void *clientData;
} aeFileEvent;

/* A fired event */
typedef struct aeFiredEvent {
    int fd;
    int mask;
} aeFiredEvent;

Redis 通过以下接口进行网络 IO 事件的注册和删除。

int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
        aeFileProc *proc, void *clientData)
{
    if (fd >= eventLoop->setsize) {
        errno = ERANGE;
        return AE_ERR;
    }
    aeFileEvent *fe = &eventLoop->events[fd];

    if (aeApiAddEvent(eventLoop, fd, mask) == -1)
        return AE_ERR;
    fe->mask |= mask;
    if (mask & AE_READABLE) fe->rfileProc = proc;
    if (mask & AE_WRITABLE) fe->wfileProc = proc;
    fe->clientData = clientData;
    if (fd > eventLoop->maxfd)
        eventLoop->maxfd = fd;
    return AE_OK;
}

aeCreateFileEvent 表示将某个 fd 的某些事件注册到 eventloop 中。

目前可注册的事件有三种:

  1. AE_READABLE 可读事件
  2. AE_WRITABLE 可写事件
  3. AE_BARRIER 该事件稍后在 “事件的等待和处理“ 一节详细讲到。

而 mask 就是这几个事件经过或运算后的掩码。

epoll:
typedef struct aeApiState {
    int epfd;
    struct epoll_event *events;
} aeApiState;

static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
    aeApiState *state = eventLoop->apidata;
    struct epoll_event ee = {0}; /* avoid valgrind warning */
    /* If the fd was already monitored for some event, we need a MOD
     * operation. Otherwise we need an ADD operation. */
    int op = eventLoop->events[fd].mask == AE_NONE ?
            EPOLL_CTL_ADD : EPOLL_CTL_MOD;

    ee.events = 0;
    mask |= eventLoop->events[fd].mask; /* Merge old events */
    if (mask & AE_READABLE) ee.events |= EPOLLIN;
    if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
    ee.data.fd = fd;
    if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;
    return 0;
}

aeCreateFileEvent 在 epoll 的实现中调用了 epoll_ctl 函数。Redis 会根据该事件对应之前的 mask 是否为 AE_NONE,来决定使用 EPOLL_CTL_ADD 还是 EPOLL_CTL_MOD。

同样的,aeDeleteFileEvent 也使用了 epoll_ctl,Redis 判断用户是否是要完全删除该 fd 上所有事件,来决定使用 EPOLL_CTL_DEL 还是 EPOLL_CTL_MOD。

定时器(时间事件)

/* Time event structure */
typedef struct aeTimeEvent {
    long long id; /* time event identifier. */
    monotime when;
    aeTimeProc *timeProc;
    aeEventFinalizerProc *finalizerProc;
    void *clientData;
    struct aeTimeEvent *prev;
    struct aeTimeEvent *next;
    int refcount; /* refcount to prevent timer events from being
  		   * freed in recursive time event calls. */
} aeTimeEvent;

AE 中最不值得分析的大概就是定时器了,因为实现的实在是太简单了,甚至可以说是简陋。

Redis 通过以下两个接口进行定时器的注册和取消。

long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
        aeTimeProc *proc, void *clientData,
        aeEventFinalizerProc *finalizerProc)
{
    long long id = eventLoop->timeEventNextId++;
    aeTimeEvent *te;

    te = zmalloc(sizeof(*te));
    if (te == NULL) return AE_ERR;
    te->id = id;
    te->when = getMonotonicUs() + milliseconds * 1000;
    te->timeProc = proc;
    te->finalizerProc = finalizerProc;
    te->clientData = clientData;
    te->prev = NULL;
    te->next = eventLoop->timeEventHead;
    te->refcount = 0;
    if (te->next)
        te->next->prev = te;
    eventLoop->timeEventHead = te;
    return id;
}

Redis 的定时器其实做的非常简陋,只是一个普通的双向链表,链表也并不是有序的。每次最新的超时事件,直接插入链表的最头部。 当 AE 要遍历当前时刻的超时事件时,也是直接暴力的从头到尾遍历链表,看看有没有超时的事件。


异常处理

虽然定时器做的这么简陋,但是对于一些时间上的异常情况,Redis 还是做了下基本的处理。具体可见如下代码:

https://www.cyhone.com/articles/analysis-of-redis-ae/