aeEventLoop初始化
在server.c文件的initServer函数中,对aeEventLoop进行了初始化:
- 调用aeCreateEventLoop函数创建aeEventLoop结构体,初始化结构体中的各个变量,之后调用aeApiCreate函数创建epoll实例
- 调用aeCreateFileEvent函数向内核注册监听事件。由参数可知,注册的是TCP监听套接字文件描述符(server.ipfd[j])上的可读事件,回调函数是acceptTcpHandler;当内核检测到该文件描述符上有可读事件时,Redis将调用acceptTcpHandler函数对事件进行处理
/* Excerpt of initServer (server.c): creates the main event loop and
 * registers a readable-event handler on every listening TCP socket. */
void initServer(void) {
// Create the event loop with room for maxclients descriptors plus
// CONFIG_FDSET_INCR extra slots; exit if it cannot be created.
server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);
if (server.el == NULL) {
serverLog(LL_WARNING,
"Failed creating the event loop. Error message: '%s'",
strerror(errno));
exit(1);
}
// ... other code omitted (NOTE: presumably declares j and opens the
// listening sockets in server.ipfd) ...
for (j = 0; j < server.ipfd_count; j++) {
// Watch each listening socket fd server.ipfd[j] for AE_READABLE;
// acceptTcpHandler is the callback run when it becomes readable.
// Failure to register is fatal.
if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
acceptTcpHandler,NULL) == AE_ERR)
{
serverPanic(
"Unrecoverable error creating server.ipfd file event.");
}
}
// ... other code omitted ...
}
调用aeCreateEventLoop函数时,传入的最大文件描述符个数为客户端最大连接数(server.maxclients)加上宏CONFIG_FDSET_INCR的值。CONFIG_FDSET_INCR定义在server.h中,其值为32+96=128:
/* Minimum number of file descriptors kept in reserve (presumably for
 * Redis' own descriptors rather than clients -- confirm in server.c). */
#define CONFIG_MIN_RESERVED_FDS 32
/* Extra event-loop slots added on top of server.maxclients when the
 * event loop is created: the reserved fds plus 96 more (128 in total). */
#define CONFIG_FDSET_INCR (CONFIG_MIN_RESERVED_FDS+96)
aeEventLoop结构体创建
aeEventLoop结构体定义在ae.h中:
/* State of one event loop (ae.h). */
typedef struct aeEventLoop {
int maxfd; /* highest fd registered so far; -1 when none has been registered */
int setsize; /* max number of tracked fds; length of the events and fired arrays */
long long timeEventNextId; /* starts at 0; presumably the id handed to the next time event */
time_t lastTime; /* set to time(NULL) at creation; presumably used by time-event processing (not shown) */
aeFileEvent *events; /* registered IO events indexed by fd: each fd's mask, callbacks and clientData */
aeFiredEvent *fired; /* ready events (fd + mask) filled in by aeApiPoll */
aeTimeEvent *timeEventHead; /* time events; NULL when there are none */
int stop; /* aeMain keeps looping while this is 0 */
void *apidata; /* multiplexing-backend state, e.g. aeApiState for epoll */
aeBeforeSleepProc *beforesleep;/* called by aeMain before each aeProcessEvents call, i.e. before blocking in the poll */
aeBeforeSleepProc *aftersleep;/* called by aeProcessEvents right after aeApiPoll returns, when AE_CALL_AFTER_SLEEP is set */
} aeEventLoop;
aeCreateEventLoop
aeEventLoop结构体在aeCreateEventLoop函数(ae.c文件)中创建:
- 分配aeEventLoop结构体所需的内存
- 为结构体中的events、fired数组分配内存(各setsize个元素),并初始化其他成员变量
- 调用aeApiCreate函数创建epoll实例
- 将IO事件集合events中每个元素的mask掩码初始化为AE_NONE,表示当前没有监听任何事件
/* Allocate and initialise an event loop that can track file descriptors
 * in the range [0, setsize).
 *
 * Returns the new loop, or NULL if an allocation or the backend
 * initialisation (aeApiCreate) fails. On failure, everything allocated
 * here is released before returning. */
aeEventLoop *aeCreateEventLoop(int setsize) {
    aeEventLoop *loop = zmalloc(sizeof(*loop));
    int fd;

    if (loop == NULL) goto err;

    /* Per-fd registration table, plus the buffer aeApiPoll fills with
     * ready descriptors; both hold setsize entries. */
    loop->events = zmalloc(sizeof(aeFileEvent)*setsize);
    loop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);
    if (loop->events == NULL || loop->fired == NULL) goto err;

    loop->setsize = setsize;
    loop->maxfd = -1;               /* nothing registered yet */
    loop->stop = 0;
    loop->lastTime = time(NULL);
    loop->timeEventHead = NULL;
    loop->timeEventNextId = 0;
    loop->beforesleep = NULL;
    loop->aftersleep = NULL;

    /* Create the multiplexing backend state (e.g. the epoll instance
     * when built with ae_epoll.c). */
    if (aeApiCreate(loop) == -1) goto err;

    /* AE_NONE marks a slot with no registered events. */
    for (fd = 0; fd < setsize; fd++)
        loop->events[fd].mask = AE_NONE;
    return loop;

err:
    if (loop != NULL) {
        zfree(loop->events);
        zfree(loop->fired);
        zfree(loop);
    }
    return NULL;
}
创建epoll实例
aeApiState结构体定义在ae_epoll.c中:
- epfd:创建的epoll实例的文件描述符
- events:epoll_wait返回时用于存放就绪事件的数组
/* epoll backend state (ae_epoll.c), stored in aeEventLoop.apidata. */
typedef struct aeApiState {
int epfd; // file descriptor of the epoll instance
struct epoll_event *events; // buffer of eventLoop->setsize entries that epoll_wait fills with ready events
} aeApiState;
aeApiCreate
epoll实例在aeApiCreate函数(ae_epoll.c文件)中创建,处理逻辑如下:
- 为aeApiState结构体分配内存空间
- 为aeApiState中的events分配内存空间,数组元素个数为eventLoop中的最大文件描述符个数setsize
- 调用epoll_create函数创建epoll实例,将返回的epoll文件描述符保存在epfd中
- 将eventLoop的apidata指向创建的aeApiState,之后就可以通过eventLoop获取epoll实例并注册监听事件了
/* Create the epoll backend state for eventLoop and store it in
 * eventLoop->apidata.
 *
 * Allocates an aeApiState, an epoll_event buffer with one slot per
 * tracked descriptor (eventLoop->setsize), and a new epoll instance.
 * Returns 0 on success. On any failure it releases whatever it allocated,
 * leaves apidata untouched, and returns -1. */
static int aeApiCreate(aeEventLoop *eventLoop) {
    aeApiState *st = zmalloc(sizeof(aeApiState));

    if (st == NULL)
        return -1;

    st->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);
    if (st->events == NULL)
        goto fail_state;

    /* NOTE: per epoll_create(2), the size argument is only a historical
     * hint on modern kernels; it just has to be positive. */
    st->epfd = epoll_create(1024);
    if (st->epfd == -1)
        goto fail_events;

    eventLoop->apidata = st;
    return 0;

fail_events:
    zfree(st->events);
fail_state:
    zfree(st);
    return -1;
}
注册事件
IO事件的数据结构是aeFileEvent结构体,定义在ae.h中:
- mask:事件类型掩码,共有AE_READABLE、AE_WRITABLE、AE_BARRIER三种,分别表示可读事件、可写事件和屏障事件(设置屏障后,同一轮处理中先执行写回调、再执行读回调)
- rfileProc:读事件回调函数
- wfileProc:写事件回调函数
- clientData:注册事件时传入的私有数据,调用回调函数时原样传回
/* Per-fd registration record; aeEventLoop.events is indexed by fd. */
typedef struct aeFileEvent {
int mask; /* registered events: AE_READABLE | AE_WRITABLE | AE_BARRIER (AE_NONE if none); with AE_BARRIER, aeProcessEvents runs the write callback before the read callback */
aeFileProc *rfileProc; /* read (AE_READABLE) callback */
aeFileProc *wfileProc; /* write (AE_WRITABLE) callback */
void *clientData; /* opaque pointer passed back to the callbacks */
} aeFileEvent;
aeCreateFileEvent
aeCreateFileEvent函数在ae.c文件中,主要处理逻辑如下:
- 检查文件描述符fd是否超出setsize范围,超出则返回AE_ERR
- 根据传入的文件描述符,在eventLoop的events中获取对应的IO事件aeFileEvent fe
- 调用aeApiAddEvent函数注册要监听的事件
- 更新fe的mask,并根据mask设置读、写事件的回调函数以及clientData
/* Register interest in the events in `mask` on fd, and install `proc` as
 * the handler for each event type present in mask.
 *
 * Masks accumulate: bits already registered for fd are kept (aeApiAddEvent
 * merges them into the kernel registration). clientData is stored on the
 * fd's record and passed back to the handler when it is invoked.
 *
 * Returns AE_OK on success. Returns AE_ERR with errno = ERANGE when fd is
 * outside [0, setsize), or AE_ERR when aeApiAddEvent fails. */
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
        aeFileProc *proc, void *clientData)
{
    aeFileEvent *fe;

    /* fd indexes eventLoop->events directly, so reject anything outside
     * the table -- including negative values, which would otherwise index
     * before the start of the array. */
    if (fd < 0 || fd >= eventLoop->setsize) {
        errno = ERANGE;
        return AE_ERR;
    }
    fe = &eventLoop->events[fd];

    /* Update the kernel registration first; record the new interest only
     * if that succeeded. aeApiAddEvent reads fe->mask (still the old value)
     * to decide between adding and modifying. */
    if (aeApiAddEvent(eventLoop, fd, mask) == -1)
        return AE_ERR;
    fe->mask |= mask;
    if (mask & AE_READABLE) fe->rfileProc = proc;   /* read handler */
    if (mask & AE_WRITABLE) fe->wfileProc = proc;   /* write handler */
    fe->clientData = clientData;
    if (fd > eventLoop->maxfd)
        eventLoop->maxfd = fd;
    return AE_OK;
}
aeApiAddEvent
aeApiAddEvent用于注册事件(ae_epoll.c文件中):
- 从eventLoop中获取aeApiState,因为aeApiState的epfd记录了epoll实例
- 创建epoll_event类型的变量ee,用于记录要监听的事件类型和文件描述符,在调用epoll_ctl时使用
- 根据fd当前的掩码判断操作类型:如果该文件描述符还未设置监听事件(mask为AE_NONE),操作类型为添加,否则为修改。epoll_ctl的操作类型有如下三种:
  - EPOLL_CTL_ADD:向epoll添加监听事件
  - EPOLL_CTL_MOD:修改已经注册过的监听事件
  - EPOLL_CTL_DEL:删除监听事件
- 将新掩码与已注册的掩码合并,并把Redis的可读、可写事件类型转换为epoll的类型(可读为EPOLLIN,可写为EPOLLOUT),设置到ee的events中
- 调用epoll_ctl函数注册文件描述符的监听事件,参数分别为epoll实例、操作类型、要监听的文件描述符、epoll_event类型变量ee
/* Start, or extend, epoll monitoring of fd.
 *
 * The requested mask is merged with whatever is already registered for fd
 * in eventLoop->events, so a call never drops an existing interest. The
 * first registration of fd uses EPOLL_CTL_ADD; later ones use EPOLL_CTL_MOD.
 * Returns 0 on success, -1 if epoll_ctl fails. */
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
    aeApiState *state = eventLoop->apidata;
    int current = eventLoop->events[fd].mask;
    int wanted = mask | current;
    int op = (current == AE_NONE) ? EPOLL_CTL_ADD : EPOLL_CTL_MOD;
    /* Zero-initialised so no uninitialised bytes reach the kernel
     * (avoids valgrind warnings). */
    struct epoll_event ee = {0};

    if (wanted & AE_READABLE) ee.events |= EPOLLIN;
    if (wanted & AE_WRITABLE) ee.events |= EPOLLOUT;
    ee.data.fd = fd;

    return (epoll_ctl(state->epfd, op, fd, &ee) == -1) ? -1 : 0;
}
总结
Redis在启动时,调用aeCreateEventLoop创建aeEventLoop结构体和epoll实例,之后调用aeCreateFileEvent函数向内核注册TCP监听套接字文件描述符的可读事件。当有客户端连接Redis服务时,该文件描述符产生可读事件,通过epoll可以获取产生事件的文件描述符,Redis就可以对连接请求进行处理。
// server.el: the main event loop (aeEventLoop)
// server.ipfd[j]: file descriptor of a listening socket
// AE_READABLE: the read event to watch for
// acceptTcpHandler: callback invoked when the event fires
aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE, acceptTcpHandler, NULL);
事件处理
aeMain函数在ae.c文件中,里面是一个while循环,处理逻辑如下:
1. 通过eventLoop的stop判断是否处于停止状态,如果不是停止状态则进入第2步
2. 判断eventLoop的beforesleep是否为空,如果不为空,调用beforesleep函数
3. 调用aeProcessEvents函数处理IO事件和时间事件,然后回到第1步
/* Run the event loop until eventLoop->stop becomes non-zero.
 *
 * Each iteration first calls the optional beforesleep hook, then lets
 * aeProcessEvents wait for and dispatch events of all kinds
 * (AE_ALL_EVENTS). AE_CALL_AFTER_SLEEP makes aeProcessEvents run the
 * aftersleep hook once the poll returns. */
void aeMain(aeEventLoop *eventLoop) {
    for (eventLoop->stop = 0; !eventLoop->stop; ) {
        aeBeforeSleepProc *hook = eventLoop->beforesleep;

        if (hook != NULL)
            hook(eventLoop);
        aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
    }
}
aeProcessEvents
aeProcessEvents函数在ae.c文件中,处理逻辑如下:
- 调用aeApiPoll函数等待就绪的事件,返回就绪的文件描述符个数;aeApiPoll会把就绪的文件描述符及其事件掩码放在eventLoop的fired中
- 在for循环中处理就绪的事件:通过fired获取每一个产生事件的文件描述符fd,再根据fd在eventLoop的events中获取对应的aeFileEvent(其中记录了事件的回调函数),之后根据事件类型调用对应的回调函数,回调函数的入参分别为eventLoop、文件描述符、aeFileEvent的clientData、事件类型掩码
- 如果flags中包含AE_TIME_EVENTS,最后调用processTimeEvents处理时间事件
/* Process ready file (IO) events and, if requested, time events.
 * This is an excerpt: the computation of the poll timeout (tvp) is omitted.
 * flags selects the work (AE_FILE_EVENTS, AE_TIME_EVENTS, AE_DONT_WAIT,
 * AE_CALL_AFTER_SLEEP).
 * Returns the number of file and time events processed. */
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
int processed = 0, numevents;
/* Nothing to do if neither file nor time events were requested. */
if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;
/* Poll if any fd has been registered, or if time events were requested
 * without AE_DONT_WAIT. */
if (eventLoop->maxfd != -1 ||
((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
int j;
aeTimeEvent *shortest = NULL;
struct timeval tv, *tvp;
// ... code omitted (tvp, the poll timeout, must be set here) ...
// Wait for events; returns the number of ready file descriptors.
numevents = aeApiPoll(eventLoop, tvp);
/* After sleep callback. */
if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
eventLoop->aftersleep(eventLoop);
// Dispatch every ready event.
for (j = 0; j < numevents; j++) {
// aeApiPoll stored the ready fds in fired; look up each fd's
// registration (mask + callbacks) in eventLoop->events.
aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
int mask = eventLoop->fired[j].mask;
// The ready file descriptor.
int fd = eventLoop->fired[j].fd;
int fired = 0; /* Number of events fired for current fd. */
/* AE_BARRIER inverts the usual order: write callback first, then read. */
int invert = fe->mask & AE_BARRIER;
/* Normal order: run the read callback first. */
if (!invert && fe->mask & mask & AE_READABLE) {
// Callback arguments: eventLoop, fd, the registration's clientData, the ready mask.
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
}
/* Write callback; skipped if the same function already ran as the
 * read callback for this fd in this iteration. */
if (fe->mask & mask & AE_WRITABLE) {
if (!fired || fe->wfileProc != fe->rfileProc) {
// Same arguments as the read callback.
fe->wfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
}
}
/* If we have to invert the call, fire the readable event now
* after the writable one. */
if (invert && fe->mask & mask & AE_READABLE) {
if (!fired || fe->wfileProc != fe->rfileProc) {
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
}
}
processed++;
}
}
/* Process time events if requested. */
if (flags & AE_TIME_EVENTS)
processed += processTimeEvents(eventLoop);
return processed; /* return the number of processed file/time events */
}
aeApiPoll
aeApiPoll负责等待并收集就绪的事件:
- 调用IO多路复用函数epoll_wait等待事件的产生。epoll_wait需要传入epoll实例和用于存放就绪事件的epoll_event数组,这两个参数分别保存在aeApiState的epfd和events中;当监听的文件描述符有事件产生时,epoll_wait返回就绪的文件描述符个数
- 对epoll_wait返回的就绪事件进行处理:遍历events中每一个就绪的事件,将epoll事件类型转换为Redis的事件掩码,并把事件对应的文件描述符和掩码设置到eventLoop的fired中,后续通过fired对事件进行处理
/* Wait for ready events and translate them into eventLoop->fired.
 *
 * tvp: maximum time to block; NULL blocks indefinitely (timeout -1).
 * Returns the number of entries written to eventLoop->fired. This is 0
 * on timeout, and also when epoll_wait fails. */
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
    aeApiState *state = eventLoop->apidata;
    int retval, numevents = 0;

    /* Ready events land in state->events, at most setsize of them. */
    retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
            tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
    if (retval > 0) {
        int j;

        numevents = retval;
        for (j = 0; j < numevents; j++) {
            int mask = 0;
            struct epoll_event *e = state->events+j;

            if (e->events & EPOLLIN) mask |= AE_READABLE;
            if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
            /* epoll reports EPOLLERR/EPOLLHUP even when they were not
             * requested, possibly without EPOLLIN/EPOLLOUT. Report them as
             * both readable and writable so that whichever handler is
             * registered runs and can observe the condition. Mapping them
             * to AE_WRITABLE only would leave a read-only fd unserviced.
             * Since the registration is level-triggered (no EPOLLET),
             * epoll_wait would then keep returning that fd. */
            if (e->events & EPOLLERR) mask |= AE_READABLE|AE_WRITABLE;
            if (e->events & EPOLLHUP) mask |= AE_READABLE|AE_WRITABLE;
            eventLoop->fired[j].fd = e->data.fd;
            eventLoop->fired[j].mask = mask;
        }
    }
    return numevents;
}
处理客户端连接
acceptTcpHandler
由上面的调用可知,Redis在启动时注册了AE_READABLE读事件,回调函数为acceptTcpHandler(networking.c文件中),用于处理客户端连接。当有客户端连接Redis时,epoll返回就绪的文件描述符,Redis在处理就绪事件时调用acceptTcpHandler进行处理:
- 调用anetTcpAccept接受连接,并返回已连接套接字的文件描述符cfd
- 调用acceptCommonHandler(networking.c文件中)函数,它又调用了createClient函数,在createClient函数中调用了aeCreateFileEvent,向内核注册已连接套接字的可读事件监听
/* Read handler for a listening TCP socket (fd).
 *
 * Accepts up to MAX_ACCEPTS_PER_CALL pending connections and passes each
 * connected socket to acceptCommonHandler. Stops early when accepting
 * fails. Running out of pending connections is expected; any other
 * failure is logged. */
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
    char cip[NET_IP_STR_LEN];
    UNUSED(el);
    UNUSED(mask);
    UNUSED(privdata);

    while(max--) {
        /* Accept one connection; cfd is the connected socket. */
        cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
        if (cfd == ANET_ERR) {
            /* POSIX allows accept() on a drained non-blocking socket to
             * fail with either EAGAIN or EWOULDBLOCK, and the two may have
             * different values. Neither is worth logging. */
            if (errno != EWOULDBLOCK && errno != EAGAIN)
                serverLog(LL_WARNING,
                    "Accepting client connection: %s", server.neterr);
            return;
        }
        serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport);
        /* Hand the connected socket (not the listening one) over. */
        acceptCommonHandler(cfd,0,cip);
    }
}
/* Excerpt: create a client object for the accepted connection fd.
 * If createClient fails, the error is logged and fd is closed. */
static void acceptCommonHandler(int fd, int flags, char *ip) {
client *c;
// Create the client; NULL means it could not be set up.
if ((c = createClient(fd)) == NULL) {
serverLog(LL_WARNING,
"Error registering fd event for the new client: %s (fd=%d)",
strerror(errno),fd);
close(fd); /* May be already closed, just ignore errors */
return;
}
// ... rest omitted in this excerpt ...
}
createClient
createClient函数中调用了aeCreateFileEvent函数向内核注册可读事件。由上文可知,传入的描述符是已连接套接字cfd,回调函数为readQueryFromClient。此时事件驱动框架增加了对客户端已连接套接字的监听,当客户端有数据发送到服务端时,Redis调用readQueryFromClient函数处理读事件:
/* Excerpt: allocate a client for connection fd. For fd != -1, this
 * function:
 *   - makes the socket non-blocking (anetNonBlock);
 *   - calls anetEnableTcpNoDelay;
 *   - sets keepalive when server.tcpkeepalive is non-zero;
 *   - registers readQueryFromClient as the AE_READABLE handler, with the
 *     client as clientData.
 * If that registration fails, it closes fd, frees the client and returns
 * NULL. */
client *createClient(int fd) {
client *c = zmalloc(sizeof(client));
// fd == -1 skips socket setup and event registration.
if (fd != -1) {
anetNonBlock(NULL,fd);
anetEnableTcpNoDelay(NULL,fd);
if (server.tcpkeepalive)
anetKeepAlive(NULL,fd,server.tcpkeepalive);
// Register the readable event on the connected socket; readQueryFromClient handles it.
if (aeCreateFileEvent(server.el,fd,AE_READABLE,
readQueryFromClient, c) == AE_ERR)
{
close(fd);
zfree(c);
return NULL;
}
}
// ... rest omitted in this excerpt ...
}
处理读事件
readQueryFromClient
readQueryFromClient函数在networking.c文件中,是可读事件的回调函数,用于处理已连接套接字上的读事件,处理逻辑如下:
- 从已连接的套接字中读取客户端的请求数据到输入缓冲区
- 调用processInputBufferAndReplicate函数处理输入缓冲区的数据
// Read handler for a client's connected socket (excerpt). aeProcessEvents
// calls it with: the event loop, the connected socket fd, the clientData
// registered with the event (the client), and the ready-event mask.
void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
client *c = (client*) privdata;
int nread, readlen;
size_t qblen;
UNUSED(el);
UNUSED(mask);
// Default read size; may be reduced below.
readlen = PROTO_IOBUF_LEN;
// While a large multibulk argument (bulklen >= PROTO_MBULK_BIG_ARG) is
// being received, read at most what is still missing for it
// (bulklen + 2, presumably the trailing CRLF).
if (c->reqtype == PROTO_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1
&& c->bulklen >= PROTO_MBULK_BIG_ARG)
{
ssize_t remaining = (size_t)(c->bulklen+2)-sdslen(c->querybuf);
if (remaining > 0 && remaining < readlen) readlen = remaining;
}
qblen = sdslen(c->querybuf);
// Track the largest query buffer length seen.
if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;
// Make room for readlen more bytes.
c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
// Read the client's request data from the socket, appending it to the query buffer.
nread = read(fd, c->querybuf+qblen, readlen);
if (nread == -1) {
// EAGAIN: nothing to read right now, wait for the next readable event.
// NOTE(review): only EAGAIN is checked; where EWOULDBLOCK differs it would be treated as an error.
if (errno == EAGAIN) {
return;
} else {
serverLog(LL_VERBOSE, "Reading from client: %s",strerror(errno));
freeClient(c);
return;
}
} else if (nread == 0) {
// The peer closed the connection.
serverLog(LL_VERBOSE, "Client closed connection");
freeClient(c);
return;
} else if (c->flags & CLIENT_MASTER) {
// For the master connection, also append the bytes just read to pending_querybuf.
c->pending_querybuf = sdscatlen(c->pending_querybuf,
c->querybuf+qblen,nread);
}
// ... code omitted ...
/* Process the data in the query buffer. */
processInputBufferAndReplicate(c);
}
处理写事件
aeMain在调用aeProcessEvents之前,会先调用beforesleep函数(即beforeSleep)。beforeSleep中又调用了handleClientsWithPendingWrites,它会将Redis Server缓冲区中的数据写回到客户端:
/* Excerpt of beforeSleep. It is presumably installed as
 * eventLoop->beforesleep, so aeMain runs it before each aeProcessEvents
 * call. Among other work, it flushes clients that have pending output. */
void beforeSleep(struct aeEventLoop *eventLoop) {
    /* ... omitted ... */

    /* Handle writes with pending output buffers. */
    handleClientsWithPendingWrites();

    /* ... omitted ... */
}
handleClientsWithPendingWrites
Redis Server收到客户端的请求命令后,需要处理请求,然后将要返回的数据写回客户端。写回客户端的逻辑在handleClientsWithPendingWrites函数中,处理逻辑如下:
- 获取待写回数据的客户端列表
- 遍历每一个待写回数据的客户端,调用writeToClient函数将缓冲区的数据写到客户端socket中,然后调用clientHasPendingReplies函数判断数据是否已全部写回;如果没有全部写回,则调用aeCreateFileEvent向内核注册该客户端文件描述符的可写事件监听,交由回调函数sendReplyToClient处理
int handleClientsWithPendingWrites(void) {
listIter li;
listNode *ln;
int processed = listLength(server.clients_pending_write);
// 獲取待寫(xiě)回?cái)?shù)據(jù)的客戶(hù)端列表
listRewind(server.clients_pending_write,&li);
// 遍歷每一個(gè)待寫(xiě)回?cái)?shù)據(jù)的客戶(hù)端
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
c->flags &= ~CLIENT_PENDING_WRITE;
listDelNode(server.clients_pending_write,ln);
if (c->flags & CLIENT_PROTECTED) continue;
/* 將緩沖區(qū)的數(shù)據(jù)寫(xiě)到客戶(hù)端socket中 */
if (writeToClient(c->fd,c,0) == C_ERR) continue;
/* 如果數(shù)據(jù)未全部寫(xiě)回到客戶(hù)端 */
if (clientHasPendingReplies(c)) {
int ae_flags = AE_WRITABLE;
if (server.aof_state == AOF_ON &&
server.aof_fsync == AOF_FSYNC_ALWAYS)
{
ae_flags |= AE_BARRIER;
}
// 調(diào)用aeCreateFileEvent方法,向內(nèi)核注冊(cè)客戶(hù)端文件描述符的可寫(xiě)事件監(jiān)聽(tīng),交由回調(diào)函數(shù)sendReplyToClient處理
if (aeCreateFileEvent(server.el, c->fd, ae_flags,
sendReplyToClient, c) == AE_ERR)
{
freeClientAsync(c);
}
}
}
return processed;
}
clientHasPendingReplies
由于网络等原因,一次写操作可能只发送出去部分数据。例如客户端一直不从套接字读取数据,导致发送缓冲区已满,此时服务端就无法继续向客户端发送数据。因此需要调用clientHasPendingReplies函数判断数据是否已全部写回;如果还未写完,就注册可写事件,交由事件循环驱动处理,而不是阻塞等待,从而提高处理效率。
整体流程图
总结
参考
极客时间 - Redis源码剖析与实战(蒋德钧)
【osc_avxkth26】Redis 网络通信模块源码分析(3)
Redis版本:redis-5.0.8
本文摘自:https://www.cnblogs.com/