Redis所實現(xiàn)的Reactor模型設計方案
寫在文章開頭
我們都知道解決C10k問題的最好方案就是通過在IO多路復用的基礎上通過reactor模型實現(xiàn)高性能的網(wǎng)絡并發(fā)程序,借助這個設計,redis的主線程也是基于IO多路復用以reactor模型的思路實現(xiàn)了一個高性能的單線程內存數(shù)據(jù),本文將帶領讀者從源碼的角度來查看redis關于reactor模型的設計。
詳解Redis中的Reactor模型
Reactor模型掃盲
在此之前我們先來了解一下Reactor模型,在高性能網(wǎng)絡并發(fā)程序的設計中,Reactor模型通過reactor接收用戶連接事件、讀事件、寫事件這些網(wǎng)絡事件,得到連接事件之后通過acceptor為其分配handler,后續(xù)的這些客戶端的讀寫事件都會交由handler完成讀寫事件的處理,由此實現(xiàn)盡可能少的線程處理盡可能多的連接。
詳解reactor的實現(xiàn)
上文我們簡單的對Reactor模型進行了簡單的掃盲,接下來我們將從redis的源碼來了解redis對于Reactor模型的實現(xiàn),我們都知道Reactor模型是通過reactor接收連接、讀、寫三種事件的,這一點我們可以直接在main方法看到aeMain的調用,該方法內部本質就是通過epoll模型進行非阻塞獲取就的網(wǎng)絡事件:
int main(int argc, char **argv) {
//前置初始化步驟
//......
//事件循環(huán)輪詢前置操作
aeSetBeforeSleepProc(server.el,beforeSleep);
//執(zhí)行事件驅動框架,循環(huán)處理各種觸發(fā)的事件
aeMain(server.el);
//事件循環(huán)后置操作
aeDeleteEventLoop(server.el);
return 0;
}我們步入aeMain方法,可以看到只要eventLoop沒有停止就會無限循環(huán)調用aeProcessEvents獲取并處理就緒的事件:
void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
while (!eventLoop->stop) {
//......
//輪詢并處理就緒的事件
aeProcessEvents(eventLoop, AE_ALL_EVENTS);
}
}步入aeProcessEvents方法,我們就可以看到redis通過對于epoll的封裝函數(shù)aeApiPoll非阻塞獲取就緒的IO事件,注意筆者所強調的非阻塞獲取,這也就是為什么redis僅僅用一個主線程即可實現(xiàn)Reactor模型的原因所在。
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
//......
//非阻塞獲取就緒事件
numevents = aeApiPoll(eventLoop, tvp);
for (j = 0; j < numevents; j++) {
//......
//處理事件
processed++;
}
}
/* Check time events */
if (flags & AE_TIME_EVENTS)
processed += processTimeEvents(eventLoop);
return processed; /* return the number of processed file/time events */
}對此我們再次步入aeApiPoll實現(xiàn)可以看到redis對于epoll的調用epoll_wait,得到事件數(shù)retval 之后,直接基于retval遍歷eventLoop的events這里面存儲的就是所有收到的事件aeFiredEvent,redis會根據(jù)其事件類型累加對應的事件mask值,例如如果是得到的事件類型是EPOLLIN則mask值會加上AE_READABLE(1),若是標準輸出事件EPOLLOUT則累加AE_WRITABLE即2:

對應的我們給出這段基于epoll實現(xiàn)reacor的實現(xiàn),可以看到其reactor通過事件輪詢獲取對應的事件類型再將其封裝為aeFileEvent存到事件數(shù)組eventLoop->fired中:
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
aeApiState *state = eventLoop->apidata;
int retval, numevents = 0;
retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
if (retval > 0) {
int j;
numevents = retval;
//遍歷事件
for (j = 0; j < numevents; j++) {
int mask = 0;
struct epoll_event *e = state->events+j;
//根據(jù)事件類型累加讀寫的mask值
if (e->events & EPOLLIN) mask |= AE_READABLE;
if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
if (e->events & EPOLLERR) mask |= AE_WRITABLE;
if (e->events & EPOLLHUP) mask |= AE_WRITABLE;
//將該事件存到fired數(shù)組中
eventLoop->fired[j].fd = e->data.fd;
eventLoop->fired[j].mask = mask;
}
}
//返回事件數(shù)
return numevents;
}詳解事件的封裝
上文我們提到一個aeFileEvent 事件的概念,該個事件結構如下圖所示,它通過mask標記當前IO事件類型,在epoll輪詢到事件時,它并通過rfileProc讀事件處理指針和wfileProc寫文件處理保存針對網(wǎng)絡IO事件的處理函數(shù),注意這個處理函數(shù)我們完全可以直接理解為reactor模型中的handler,最后用clientData記錄客戶端私有數(shù)據(jù)的指針:
typedef struct aeFileEvent {
//記錄事件讀寫類型,如果是讀事件READABLE則mask+1,若是寫事件WRITABLE則加2
int mask; /* one of AE_(READABLE|WRITABLE) */
//讀事件處理器指針指向讀事件處理函數(shù)handler
aeFileProc *rfileProc;
//寫事件處理器指針指向讀事件處理函數(shù)handler
aeFileProc *wfileProc;
//記錄客戶端私有數(shù)據(jù)指針
void *clientData;
} aeFileEvent;這里我們以服務端socket初始化階段為例展示一下aeFileEvent對應處理器的初始化過程,我們在redis服務端啟動的main函數(shù)可以看到initServer的調用,該方法會為當前服務端socket套接字的文件描述符綁定讀事件的處理器acceptTcpHandler:

對應的我們給出這一段事件綁定handler的邏輯的核心代碼段:
int main(int argc, char **argv) {
//......
//server初始化,其內部會完成數(shù)據(jù)結構、鍵值對數(shù)據(jù)庫初始化、網(wǎng)絡框架初始化工作
initServer();
}
void initServer(void) {
//......
for (j = 0; j < server.ipfd_count; j++) {
//為每一個監(jiān)聽服務端socket的讀事件綁定對應的TCP處理器acceptTcpHandler,并將其注冊到eventLoop中
if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
acceptTcpHandler,NULL) == AE_ERR)
{
redisPanic(
"Unrecoverable error creating server.ipfd file event.");
}
}
//......
}輪詢并分發(fā)到handler
上述步驟完成redis server的事件注冊之后,main方法的aeMain函數(shù)就會通過epoll輪詢eventLoop中是否有就緒的IO事件,如果redis server的fd的讀事件就緒就會交給當前對應的讀處理器完成redis客戶端初始化工作,后續(xù)redis客戶端套接字的fd也會將讀寫事件注冊到eventLoop中,如此一來所有的服務端和客戶端socket的讀寫事件都會注冊到epoll上,讓epoll作為reactor進行輪詢,然后根據(jù)讀寫事件分配到各自的handler即rfileProc/wfileProc 指針所指向的函數(shù)上。
這里我們補充的一下rfileProc/wfileProc指針指向的函數(shù)列表:
rfileProc:如果是redis服務端則該指針指向acceptTcpHandler處理新連接,如果是客戶端則指向readQueryFromClient處理客戶端的命令。wfileProc:該指針服務端和客戶端都一樣,指向sendReplyToClient用于將響應結果發(fā)送給客戶端。

對應的我們給出上述描述的核心代碼段,可以看到main方法會調用aeMain開始事件輪詢:
int main(int argc, char **argv) {
//前置初始化步驟
//......
//事件循環(huán)輪詢前置操作
aeSetBeforeSleepProc(server.el,beforeSleep);
//執(zhí)行事件驅動框架,循環(huán)處理各種觸發(fā)的事件
aeMain(server.el);
//事件循環(huán)后置操作
aeDeleteEventLoop(server.el);
return 0;
}步入aeMain即可看到無限循環(huán)傳入eventLoop查看是否有就緒的事件:
void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
while (!eventLoop->stop) {
//......
//傳入eventLoop查看是否有socket的事件就緒
aeProcessEvents(eventLoop, AE_ALL_EVENTS);
}
}繼續(xù)步入aeProcessEvents即看到輪詢就緒事件、acceptor調用acceptTcpHandler分發(fā)到讀寫的處理器handler上、后續(xù)客戶端都會基于讀寫handler完成事件處理這樣一套核心的reactor模型設計:
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
//......
//調用epoll獲取所有就緒的socket的讀寫事件
numevents = aeApiPoll(eventLoop, tvp);
for (j = 0; j < numevents; j++) {
//獲取當前事件的讀寫類型為mask賦值
aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
int mask = eventLoop->fired[j].mask;
int fd = eventLoop->fired[j].fd;
int rfired = 0;
//如果是讀事件則交給rfileProc指向的函數(shù),可以是服務端socket的連接處理器acceptTcpHandler,也可能是客戶端的命令處理器readQueryFromClient
if (fe->mask & mask & AE_READABLE) {
rfired = 1;
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
}
//如果是寫事件則調用wfileProc指向的sendReplyToClient將結果發(fā)送給客戶端
if (fe->mask & mask & AE_WRITABLE) {
if (!rfired || fe->wfileProc != fe->rfileProc)
fe->wfileProc(eventLoop,fd,fe->clientData,mask);
}
processed++;
}
}
//......
}小結
自此我們將redis單線程的reactor模型設計都分析完成了,希望對你有幫助。
到此這篇關于Redis所實現(xiàn)的Reactor模型的文章就介紹到這了,更多相關Redis Reactor模型內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
Redis連接池監(jiān)控(連接池是否已滿)與優(yōu)化方法
本文詳細講解了如何在Linux系統(tǒng)中監(jiān)控Redis連接池的使用情況,以及如何通過連接池參數(shù)配置、系統(tǒng)資源使用情況、Redis命令監(jiān)控、外部監(jiān)控工具等多種方法進行檢測和優(yōu)化,以確保系統(tǒng)在高并發(fā)場景下的性能和穩(wěn)定性,討論了連接池的概念、工作原理、參數(shù)配置,以及優(yōu)化策略等內容2024-09-09
Redis從單點到集群部署模式(單機模式?主從模式?哨兵模式)
這篇文章主要為大家介紹了Redis從單點集群部署模式(單機模式?主從模式?哨兵模式)詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2023-11-11
Redis高并發(fā)防止秒殺超賣實戰(zhàn)源碼解決方案
本文主要介紹了Redis高并發(fā)防止秒殺超賣實戰(zhàn)源碼解決方案,文中通過示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下2021-10-10

