1. <strong id="7actg"></strong>
    2. <table id="7actg"></table>

    3. <address id="7actg"></address>
      <address id="7actg"></address>
      1. <object id="7actg"><tt id="7actg"></tt></object>

        Redis 6 中的多線程是如何實(shí)現(xiàn)的?。?/h1>

        共 17331字,需瀏覽 35分鐘

         ·

        2022-06-08 00:46

        大家好,我是魚皮,今天給大家分享Redis 6 中的多線程是如何實(shí)現(xiàn)的。

        Redis 是一個(gè)高性能服務(wù)端的典范。它通過多路復(fù)用 epoll 來管理海量的用戶連接,只使用一個(gè)線程來通過事件循環(huán)來處理所有用戶請(qǐng)求,就可以達(dá)到每秒數(shù)萬 QPS 的處理能力。下圖是單線程版本 Redis 工作的核心原理圖。

        單線程的 Redis 雖然性能很高,但是卻有兩個(gè)問題。一個(gè)問題是沒有辦法充分發(fā)揮現(xiàn)代 CPU 的多核處理能力,一個(gè)實(shí)例只能使用一個(gè)核的能力。二是如果某個(gè)用戶請(qǐng)求的處理過程卡住一段時(shí)間,會(huì)導(dǎo)致其它所有的請(qǐng)求都會(huì)出現(xiàn)超時(shí)的情況。所以,在線上的 redis 使用過程時(shí)是明確禁止使用 keys * 等長(zhǎng)耗時(shí)的操作的。

        那如何改進(jìn)呢,思路和方向其實(shí)很明確。那就是和其它的主流程序一樣引入多線程,用更多的線程來分擔(dān)這些可能耗時(shí)的操作。事實(shí)上 Redis 也確實(shí)這么干了,在 6.0 以后的版本里,開始支持了多線程。我們今天就來領(lǐng)略一下 Redis 的多線程是如何實(shí)現(xiàn)的。

        一、多線程 Redis 服務(wù)啟動(dòng)

        首先獲取多線程版本 Redis 的源碼

        #?git?clone?https://github.com/redis/redis
        #?cd?redis
        #?git?checkout?-b?6.2.0?6.2.0

        默認(rèn)情況下多線程是默認(rèn)關(guān)閉的。如果想要啟動(dòng)多線程,需要在配置文件中做適當(dāng)?shù)男薷?。相關(guān)的配置項(xiàng)是 io-threads 和 io-threads-do-reads 兩個(gè)。

        #vi?/usr/local/soft/redis6/conf/redis.conf?
        io-threads?4?#啟用的?io?線程數(shù)量
        io-threads-do-reads?yes?#讀請(qǐng)求也使用io線程

        其中 io-threads 表示要啟動(dòng)的 io 線程的數(shù)量。io-threads-do-reads 表示是否在讀階段也使用 io 線程,默認(rèn)是只在寫階段使用 io 線程的。

        現(xiàn)在假設(shè)我們已經(jīng)打開了如上兩項(xiàng)多線程配置。帶著這個(gè)假設(shè),讓我們進(jìn)入到 Redis 的 main 入口函數(shù)。

        //file:?src/server.c
        int?main(int?argc,?char?**argv)?{
        ????......

        ????//?1.1?主線程初始化
        ????initServer();

        ????//?1.2?啟動(dòng)?io?線程
        ????InitServerLast();

        ????//?進(jìn)入事件循環(huán)
        ????aeMain(server.el);
        }

        1.1 主線程初始化

        在 initServer 這個(gè)函數(shù)內(nèi),Redis 主線程做了這么幾件重要的事情。

        • 初始化讀任務(wù)隊(duì)列、寫任務(wù)隊(duì)列
        • 創(chuàng)建一個(gè) epoll 對(duì)象
        • 對(duì)配置的監(jiān)聽端口進(jìn)行 listen
        • 把 listen socket 讓 epoll 給管理起來
        //file:?src/server.c
        void?initServer()?{

        ????//?1?初始化?server?對(duì)象
        ????server.clients_pending_write?=?listCreate();
        ????server.clients_pending_read?=?listCreate();
        ????......

        ????//?2?初始化回調(diào)?events,創(chuàng)建?epoll
        ????server.el?=?aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);

        ????//?3?綁定監(jiān)聽服務(wù)端口
        ????listenToPort(server.port,server.ipfd,&server.ipfd_count);

        ????//?4?注冊(cè)?accept?事件處理器
        ????for?(j?=?0;?j?????????aeCreateFileEvent(server.el,?server.ipfd[j],?AE_READABLE,
        ????????????acceptTcpHandler,NULL);
        ????}
        ????...
        }

        接下來我們分別來看。

        初始化 server 對(duì)象

        在 initServer 的一開頭,先是對(duì) server 的各種成員變量進(jìn)行初始化。值得注意的是 clients_pending_write 和 clients_pending_read 這兩個(gè)成員,它們分別是寫任務(wù)隊(duì)列和讀任務(wù)隊(duì)列。將來主線程產(chǎn)生的任務(wù)都會(huì)放在放在這兩個(gè)任務(wù)隊(duì)列里。

        主線程會(huì)根據(jù)這兩個(gè)任務(wù)隊(duì)列來進(jìn)行任務(wù)哈希散列,以將任務(wù)分配到多個(gè)線程中進(jìn)行處理。

        aeCreateEventLoop 處理

        我們來看 aeCreateEventLoop 詳細(xì)邏輯。它會(huì)初始化事件回調(diào) event,并且創(chuàng)建了一個(gè) epoll 對(duì)象出來。

        //file:src/ae.c
        aeEventLoop?*aeCreateEventLoop(int?setsize)?{
        ????aeEventLoop?*eventLoop;
        ????eventLoop?=?zmalloc(sizeof(*eventLoop);

        ????//將來的各種回調(diào)事件就都會(huì)存在這里
        ????eventLoop->events?=?zmalloc(sizeof(aeFileEvent)*setsize);
        ????......

        ????aeApiCreate(eventLoop);
        ????return?eventLoop;
        }

        我們注意一下 eventLoop->events,將來在各種事件注冊(cè)的時(shí)候都會(huì)保存到這個(gè)數(shù)組里。

        //file:src/ae.h
        typedef?struct?aeEventLoop?{
        ????......
        ????aeFileEvent?*events;?/*?Registered?events?*/
        }

        具體創(chuàng)建 epoll 的過程在 ae_epoll.c 文件下的 aeApiCreate 中。在這里,真正調(diào)用了 epoll_create

        //file:src/ae_epoll.c
        static?int?aeApiCreate(aeEventLoop?*eventLoop)?{
        ????aeApiState?*state?=?zmalloc(sizeof(aeApiState));
        ????state->epfd?=?epoll_create(1024);?
        ????eventLoop->apidata?=?state;
        ????return?0;
        }

        綁定監(jiān)聽服務(wù)端口

        我們?cè)賮砜?Redis 中的 listen 過程,它在 listenToPort 函數(shù)中。調(diào)用鏈條很長(zhǎng),依次是 listenToPort => anetTcpServer => _anetTcpServer => anetListen。在 anetListen 中,就是簡(jiǎn)單的 bind 和 listen 的調(diào)用。

        //file:src/anet.c
        static?int?anetListen(......)?{
        ????bind(s,sa,len);
        ????listen(s,?backlog);
        ????......
        }

        注冊(cè)事件回調(diào)函數(shù)

        前面我們調(diào)用 aeCreateEventLoop 創(chuàng)建了 epoll,調(diào)用 listenToPort 進(jìn)行了服務(wù)端口的 bind 和 listen。接著就調(diào)用的 aeCreateFileEvent 就是來注冊(cè)一個(gè) accept 事件處理器。

        我們來看 aeCreateFileEvent 具體代碼。

        //file:?src/ae.c
        int?aeCreateFileEvent(aeEventLoop?*eventLoop,?int?fd,?int?mask,
        ????????aeFileProc?*proc,?void?*clientData)

        {
        ????//?取出一個(gè)文件事件結(jié)構(gòu)
        ????aeFileEvent?*fe?=?&eventLoop->events[fd];

        ????//?監(jiān)聽指定?fd?的指定事件
        ????aeApiAddEvent(eventLoop,?fd,?mask);

        ????//?設(shè)置文件事件類型,以及事件的處理器
        ????fe->mask?|=?mask;
        ????if?(mask?&?AE_READABLE)?fe->rfileProc?=?proc;
        ????if?(mask?&?AE_WRITABLE)?fe->wfileProc?=?proc;

        ????//?私有數(shù)據(jù)
        ????fe->clientData?=?clientData;
        }

        函數(shù) aeCreateFileEvent 一開始,從 eventLoop->events 獲取了一個(gè) aeFileEvent 對(duì)象。

        接下來調(diào)用 aeApiAddEvent。這個(gè)函數(shù)其實(shí)就是對(duì) epoll_ctl 的一個(gè)封裝。主要就是實(shí)際執(zhí)行 epoll_ctl EPOLL_CTL_ADD。

        //file:src/ae_epoll.c
        static?int?aeApiAddEvent(aeEventLoop?*eventLoop,?int?fd,?int?mask)?{
        ????//?add?or?mod
        ????int?op?=?eventLoop->events[fd].mask?==?AE_NONE??
        ????????????EPOLL_CTL_ADD?:?EPOLL_CTL_MOD;
        ????......

        ????//?epoll_ctl?添加事件
        ????epoll_ctl(state->epfd,op,fd,&ee);
        ????return?0;
        }

        每一個(gè) eventLoop->events 元素都指向一個(gè) aeFileEvent 對(duì)象。在這個(gè)對(duì)象上,設(shè)置了三個(gè)關(guān)鍵東西

        • rfileProc:讀事件回調(diào)
        • wfileProc:寫事件回調(diào)
        • clientData:一些額外的擴(kuò)展數(shù)據(jù)

        將來 當(dāng) epoll_wait 發(fā)現(xiàn)某個(gè) fd 上有事件發(fā)生的時(shí)候,這樣 redis 首先根據(jù) fd 到 eventLoop->events 中查找 aeFileEvent 對(duì)象,然后再看 rfileProc、wfileProc 就可以找到讀、寫回調(diào)處理函數(shù)。

        回頭看 initServer 調(diào)用 aeCreateFileEvent 時(shí)傳參來看。

        //file:?src/server.c
        void?initServer()?{
        ????......

        ????for?(j?=?0;?j?????????aeCreateFileEvent(server.el,?server.ipfd[j],?AE_READABLE,
        ????????????acceptTcpHandler,NULL);
        ????}
        }

        listen fd 對(duì)應(yīng)的讀回調(diào)函數(shù) rfileProc 事實(shí)上就被設(shè)置成了 acceptTcpHandler,寫回調(diào)沒有設(shè)置,私有數(shù)據(jù) client_data 也為 null。

        1.2 io 線程啟動(dòng)

        在主線程啟動(dòng)以后,會(huì)調(diào)用 InitServerLast => initThreadedIO 來創(chuàng)建多個(gè) io 線程。

        將來這些 IO 線程會(huì)配合主線程一起共同來處理所有的 read 和 write 任務(wù)。

        我們來看 InitServerLast 創(chuàng)建 IO 線程的過程。

        //file:src/server.c
        void?InitServerLast()?{
        ????initThreadedIO();
        ????......
        }
        //file:src/networking.c
        void?initThreadedIO(void)?{
        ????//如果沒開啟多?io?線程配置就不創(chuàng)建了
        ????if?(server.io_threads_num?==?1)?return;

        ????//開始?io?線程的創(chuàng)建
        ????for?(int?i?=?0;?i?????????pthread_t?tid;
        ????????pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i)
        ????????io_threads[i]?=?tid;
        ????}
        }

        在 initThreadedIO 中調(diào)用 pthread_create 庫(kù)函數(shù)創(chuàng)建線程,并且注冊(cè)線程回調(diào)函數(shù) IOThreadMain。

        //file:src/networking.c
        void?*IOThreadMain(void?*myid)?{
        ????long?id?=?(unsigned?long)myid;

        ????while(1)?{
        ????????//循環(huán)等待任務(wù)
        ????????for?(int?j?=?0;?j?1000000;?j++)?{
        ????????????if?(getIOPendingCount(id)?!=?0)?break;
        ????????}

        ????????//允許主線程來關(guān)閉自己
        ????????......

        ????????//遍歷當(dāng)前線程等待隊(duì)列里的請(qǐng)求?client
        ????????listIter?li;
        ????????listNode?*ln;
        ????????listRewind(io_threads_list[id],&li);
        ????????while((ln?=?listNext(&li)))?{
        ????????????client?*c?=?listNodeValue(ln);
        ????????????if?(io_threads_op?==?IO_THREADS_OP_WRITE)?{
        ????????????????writeToClient(c,0);
        ????????????}?else?if?(io_threads_op?==?IO_THREADS_OP_READ)?{
        ????????????????readQueryFromClient(c->conn);
        ????????????}?else?{
        ????????????????serverPanic("io_threads_op?value?is?unknown");
        ????????????}
        ????????}
        ????????listEmpty(io_threads_list[id]);
        ????}
        }

        是將當(dāng)前線程等待隊(duì)列 io_threads_list[id] 里所有的請(qǐng)求 client,依次取出處理。其中讀操作通過 readQueryFromClient 處理, 寫操作通過 writeToClient 處理。其中 io_threads_list[id] 中的任務(wù)是主線程分配過來的,后面我們將會(huì)看到。

        二、主線程事件循環(huán)

        接著我們進(jìn)入到 Redis 最重要的 aeMain,這個(gè)函數(shù)就是一個(gè)死循環(huán)(Redis 不退出的話),不停地執(zhí)行 aeProcessEvents 函數(shù)。

        void?aeMain(aeEventLoop?*eventLoop)?{
        ????eventLoop->stop?=?0;
        ????while?(!eventLoop->stop)?{
        ????????aeProcessEvents(eventLoop,?AE_ALL_EVENTS|
        ???????????????????????????????????AE_CALL_BEFORE_SLEEP|
        ???????????????????????????????????AE_CALL_AFTER_SLEEP);
        ????}
        }

        其中 aeProcessEvents 就是所謂的事件分發(fā)器。它通過調(diào)用 epoll_wait 來發(fā)現(xiàn)所發(fā)生的各種事件,然后調(diào)用事先注冊(cè)好的處理函數(shù)進(jìn)行處理。

        接著看 aeProcessEvents 函數(shù)。

        //file:src/ae.c
        int?aeProcessEvents(aeEventLoop?*eventLoop,?int?flags)
        {
        ????// 2.3 事件循環(huán)處理3:epoll_wait 前進(jìn)行讀寫任務(wù)隊(duì)列處理
        ????if?(eventLoop->beforesleep?!=?NULL?&&?flags?&?AE_CALL_BEFORE_SLEEP)
        ????????????eventLoop->beforesleep(eventLoop);

        ????//epoll_wait發(fā)現(xiàn)事件并進(jìn)行處理
        ????numevents?=?aeApiPoll(eventLoop,?tvp);

        ????//?從已就緒數(shù)組中獲取事件
        ????aeFileEvent?*fe?=?&eventLoop->events[eventLoop->fired[j].fd];

        ????//如果是讀事件,并且有讀回調(diào)函數(shù)
        ????//2.1?如果是?listen?socket?讀事件,則處理新連接請(qǐng)求
        ????//2.2?如果是客戶連接socket?讀事件,處理客戶連接上的讀請(qǐng)求
        ????fe->rfileProc()

        ????//如果是寫事件,并且有寫回調(diào)函數(shù)
        ????fe->wfileProc()
        ????......
        }

        其中 aeApiPoll 就是對(duì) epoll_wait 的一個(gè)封裝而已。

        //file:?src/ae_epoll.c
        static?int?aeApiPoll(aeEventLoop?*eventLoop,?struct?timeval?*tvp)?{
        ????//?等待事件
        ????aeApiState?*state?=?eventLoop->apidata;
        ????epoll_wait(state->epfd,state->events,eventLoop->setsize,
        ????????????tvp???(tvp->tv_sec*1000?+?tvp->tv_usec/1000)?:?-1);
        ????...
        }

        aeProcessEvents 就是調(diào)用 epoll_wait 來發(fā)現(xiàn)事件。當(dāng)發(fā)現(xiàn)有某個(gè) fd 上事件發(fā)生以后,則調(diào)為其事先注冊(cè)的事件處理器函數(shù) rfileProc 和 wfileProc。

        2.1 事件循環(huán)處理1:新連接到達(dá)

        在 1.1 節(jié)中我們看到,主線程初始化的時(shí)候,將 listen socket 上的讀事件處理函數(shù)注冊(cè)成了 acceptTcpHandler。也就是說如果有新連接到達(dá)的時(shí)候,acceptTcpHandler 將會(huì)被執(zhí)行到。

        在這個(gè)函數(shù)內(nèi),主要完成如下幾件事情。

        • 調(diào)用 accept 接收連接
        • 創(chuàng)建一個(gè) redisClient對(duì)象
        • 添加到 epoll
        • 注冊(cè)讀事件處理函數(shù)

        接下來讓我們進(jìn)入 acceptTcpHandler 源碼。

        //file:src/networking.c
        void?acceptTcpHandler(aeEventLoop?*el,?int?fd,?void?*privdata,?int?mask)?{
        ????......
        ????cfd?=?anetTcpAccept(server.neterr,?fd,?cip,?sizeof(cip),?&cport);
        ????acceptCommonHandler(connCreateAcceptedSocket(cfd),0,cip);
        }

        其中 netTcpAccept 調(diào)用 accept 系統(tǒng)調(diào)用獲取連接,就不展開了。我們看 acceptCommonHandler。

        //file:?src/networking.c
        static?void?acceptCommonHandler(int?fd,?int?flags)?{
        ????//?創(chuàng)建客戶端
        ????redisClient?*c;
        ????if?((c?=?createClient(fd))?==?NULL)?{
        ????}
        }

        client?*createClient(connection?*conn)?{
        ????client?*c?=?zmalloc(sizeof(client));

        ????//?為用戶連接注冊(cè)讀事件處理器
        ????if?(conn)?{
        ????????...
        ????????connSetReadHandler(conn,?readQueryFromClient);
        ????????connSetPrivateData(conn,?c);
        ????}

        ????selectDb(c,0);
        ????c->id?=?client_id;
        ????c->resp?=?2;
        ????c->conn?=?conn;
        ????......
        }

        在上面的代碼中,我們重點(diǎn)關(guān)注 connSetReadHandler(conn, readQueryFromClient), 這一行是將這個(gè)新連接的讀事件處理函數(shù)設(shè)置成了 readQueryFromClient。

        2.2 事件循環(huán)處理2:用戶命令請(qǐng)求到達(dá)

        在上面我們看到了, Redis 把用戶連接上的讀請(qǐng)求處理函數(shù)設(shè)置成了 readQueryFromClient,這意味著當(dāng)用戶連接上有命令發(fā)送過來的時(shí)候,會(huì)進(jìn)入 readQueryFromClient 開始執(zhí)行。

        在多線程版本的 readQueryFromClient 中,處理邏輯非常簡(jiǎn)單,僅僅只是將發(fā)生讀時(shí)間的 client 放到了任務(wù)隊(duì)列里而已。

        來詳細(xì)看 readQueryFromClient 代碼。

        //file:src/networking.c
        void?readQueryFromClient(connection?*conn)?{
        ????client?*c?=?connGetPrivateData(conn);

        ????//如果啟動(dòng) threaded I/O 的話,直接入隊(duì)
        ????if?(postponeClientRead(c))?return;

        ????//處理用戶連接讀請(qǐng)求
        ????......
        ????c->querybuf?=?sdsMakeRoomFor(c->querybuf,?readlen);
        ????nread?=?connRead(c->conn,?c->querybuf+qblen,?readlen);
        ????processInputBuffer(c);
        }

        在 postponeClientRead 中判斷,是不是開啟了多 io 線程,如果開啟了的話,那就將有請(qǐng)求數(shù)據(jù)到達(dá)的 client 直接放到讀任務(wù)隊(duì)列(server.clients_pending_read)中就算是完事。我們看下 postponeClientRead。

        //file:src/networking.c
        int?postponeClientRead(client?*c)?{
        ????if?(server.io_threads_active?&&
        ????????server.io_threads_do_reads?&&
        ????????!ProcessingEventsWhileBlocked?&&
        ????????!(c->flags?&?(CLIENT_MASTER|CLIENT_SLAVE|CLIENT_PENDING_READ)))
        ????{
        ????????c->flags?|=?CLIENT_PENDING_READ;
        ????????listAddNodeHead(server.clients_pending_read,c);
        ????????return?1;
        ????}?else?{
        ????????return?0;
        ????}
        }

        listAddNodeHead 就是把這個(gè) client 對(duì)象添加到 server.clients_pending_read 而已。

        2.3 事件循環(huán)處理3:epoll_wait 前進(jìn)行任務(wù)處理

        在 aeProcessEvents 中假如 aeApiPoll(epoll_wait)中的事件都處理完了以后,則會(huì)進(jìn)入下一次的循環(huán)再次進(jìn)入 aeProcessEvents。

        而這一次中 beforesleep 將會(huì)處理前面讀事件處理函數(shù)添加的讀任務(wù)隊(duì)列了。

        //file:src/ae.c
        int?aeProcessEvents(aeEventLoop?*eventLoop,?int?flags)
        {
        ????//?參見 2.4 事件循環(huán)處理3:epoll_wait 前進(jìn)行任務(wù)處理
        ????if?(eventLoop->beforesleep?!=?NULL?&&?flags?&?AE_CALL_BEFORE_SLEEP)
        ????????????eventLoop->beforesleep(eventLoop);

        ????//epoll_wait發(fā)現(xiàn)事件并進(jìn)行處理
        ????numevents?=?aeApiPoll(eventLoop,?tvp);
        ????......
        }

        在 beforeSleep 里會(huì)依次處理兩個(gè)任務(wù)隊(duì)列。先處理讀任務(wù)隊(duì)列,解析其中的請(qǐng)求,并處理之。然后將處理結(jié)果寫到緩存中,同時(shí)寫到寫任務(wù)隊(duì)列中。緊接著 beforeSleep 會(huì)進(jìn)入寫任務(wù)隊(duì)列處理,會(huì)將處理結(jié)果寫到 socket 里,進(jìn)行真正的數(shù)據(jù)發(fā)送。

        我們來看 beforeSleep 的代碼,這個(gè)函數(shù)中最重要的兩個(gè)調(diào)用是 handleClientsWithPendingReadsUsingThreads(處理讀任務(wù)隊(duì)列),handleClientsWithPendingWritesUsingThreads(處理寫任務(wù)隊(duì)列)

        //file:src/server.c
        void?beforeSleep(struct?aeEventLoop?*eventLoop)?{
        ????//處理讀任務(wù)隊(duì)列
        ????handleClientsWithPendingReadsUsingThreads();
        ????//處理寫任務(wù)隊(duì)列
        ????handleClientsWithPendingWritesUsingThreads();
        ????......
        }

        值得注意的是,如果開啟了多 io 線程的話,handleClientsWithPendingReadsUsingThreads 和 handleClientsWithPendingWritesUsingThreads 中將會(huì)是主線程、io 線程一起配合來處理的。所以我們單獨(dú)分兩個(gè)小節(jié)來闡述。

        三、主線程 && io 線程處理讀請(qǐng)求

        在 handleClientsWithPendingReadsUsingThreads 中,主線程會(huì)遍歷讀任務(wù)隊(duì)列 server.clients_pending_read,把其中的請(qǐng)求分配到每個(gè) io 線程的處理隊(duì)列 io_threads_list[target_id] 中。然后通知各個(gè) io 線程開始處理。

        3.1 主線程分配任務(wù)

        我們來看 handleClientsWithPendingReadsUsingThreads 詳細(xì)代碼。

        //file:src/networking.c
        //當(dāng)開啟了?reading?+?parsing?多線程?I/O?
        //read?handler?僅僅只是把?clients?推到讀隊(duì)列里
        //而這個(gè)函數(shù)開始處理該任務(wù)隊(duì)列
        int?handleClientsWithPendingReadsUsingThreads(void)?{

        ????//訪問讀任務(wù)隊(duì)列?server.clients_pending_read
        ????listRewind(server.clients_pending_read,&li);

        ????//把每一個(gè)任務(wù)取出來
        ????//添加到指定線程的任務(wù)隊(duì)列里?io_threads_list[target_id]
        ????while((ln?=?listNext(&li)))?{
        ????????client?*c?=?listNodeValue(ln);
        ????????int?target_id?=?item_id?%?server.io_threads_num;
        ????????listAddNodeTail(io_threads_list[target_id],c);
        ????????item_id++;
        ????}

        ????//啟動(dòng)Worker線程,處理讀請(qǐng)求
        ????io_threads_op?=?IO_THREADS_OP_READ;
        ????for?(int?j?=?1;?j?????????int?count?=?listLength(io_threads_list[j]);
        ????????setIOPendingCount(j,?count);
        ????}

        ????//主線程處理?0?號(hào)任務(wù)隊(duì)列
        ????listRewind(io_threads_list[0],&li);
        ????while((ln?=?listNext(&li)))?{
        ????????//需要先干掉?CLIENT_PENDING_READ?標(biāo)志
        ????????//否則?readQueryFromClient?并不處理,而是入隊(duì)
        ????????client?*c?=?listNodeValue(ln);
        ????????readQueryFromClient(c->conn);
        ????}

        ????//主線程等待其它線程處理完畢
        ????while(1)?{
        ????????unsigned?long?pending?=?0;
        ????????for?(int?j?=?1;?j?????????????pending?+=?getIOPendingCount(j);
        ????????if?(pending?==?0)?break;
        ????}

        ????//再跑一遍任務(wù)隊(duì)列,目的是處理輸入
        ????while(listLength(server.clients_pending_read))?{
        ????????......
        ????????processInputBuffer(c);
        ????????if?(!(c->flags?&?CLIENT_PENDING_WRITE)?&&?clientHasPendingReplies(c))
        ????????????clientInstallWriteHandler(c);
        ????}
        }

        在主線程中將任務(wù)分別放到了 io_threads_list 的第 0 到第 N 個(gè)元素里。并對(duì) 1 : N 號(hào)線程通過 setIOPendingCount 發(fā)消息,告訴他們起來處理。這時(shí)候 io 線程將會(huì)在 IOThreadMain 中收到消息并開始處理讀任務(wù)。

        //file:src/networking.c
        void?*IOThreadMain(void?*myid)?{
        ????while(1)?{
        ????????//遍歷當(dāng)前線程等待隊(duì)列里的請(qǐng)求?client
        ????????listRewind(io_threads_list[id],&li);
        ????????while((ln?=?listNext(&li)))?{
        ????????????client?*c?=?listNodeValue(ln);
        ????????????if?(io_threads_op?==?IO_THREADS_OP_WRITE)?{
        ????????????????writeToClient(c,0);
        ????????????}?else?if?(io_threads_op?==?IO_THREADS_OP_READ)?{
        ????????????????readQueryFromClient(c->conn);
        ????????????}?else?{
        ????????????????serverPanic("io_threads_op?value?is?unknown");
        ????????????}
        ????????}
        ????}
        }

        在 io 線程中,從自己的 io_threads_list[id] 中遍歷獲取待處理的 client。如果發(fā)現(xiàn)是讀請(qǐng)求處理,則進(jìn)入 readQueryFromClient 開始處理特定的 client。

        而主線程在分配完 1 :N 任務(wù)隊(duì)列讓其它 io 線程處理后,自己則開始處理第 0 號(hào)任務(wù)池。同樣是會(huì)進(jìn)入到 readQueryFromClient 中來執(zhí)行。

        //file:src/networking.c
        int?handleClientsWithPendingReadsUsingThreads(void)?{
        ????......
        ????//主線程處理?0?號(hào)任務(wù)隊(duì)列
        ????listRewind(io_threads_list[0],&li);
        ????while((ln?=?listNext(&li)))?{
        ????????//需要先干掉?CLIENT_PENDING_READ?標(biāo)志
        ????????//否則?readQueryFromClient?并不處理,而是入隊(duì)
        ????????client?*c?=?listNodeValue(ln);
        ????????readQueryFromClient(c->conn);
        ????}
        ????......
        }

        所以無論是主線程還是 io 線程,處理客戶端的讀事件都是會(huì)進(jìn)入 readQueryFromClient。我們來看其源碼。

        3.2 讀請(qǐng)求處理

        //file:src/networking.c
        void?readQueryFromClient(connection?*conn)?{

        ????//讀取請(qǐng)求
        ????nread?=?connRead(c->conn,?c->querybuf+qblen,?readlen);

        ????//處理請(qǐng)求
        ????processInputBuffer(c);
        }

        在 connRead 中就是調(diào)用 read 將 socket 中的命令讀取出來,就不展開看了。接著在 processInputBuffer 中將輸入緩沖區(qū)中的數(shù)據(jù)解析成對(duì)應(yīng)的命令。解析完命令后真正開始處理它。

        //file:src/networking.c
        void?processInputBuffer(client?*c)?{
        ????while(c->qb_pos?querybuf))?{
        ????????//解析命令
        ????????......
        ????????//真正開始處理?command
        ????????processCommandAndResetClient(c);
        ????}
        }

        函數(shù) processCommandAndResetClient 會(huì)調(diào)用 processCommand,查詢命令并開始執(zhí)行。執(zhí)行的核心方法是 call 函數(shù),我們直接看它。

        //file:src/server.c
        void?call(client?*c,?int?flags)?{

        ????//?查找處理命令,
        ????struct?redisCommand?*real_cmd?=?c->cmd;

        ????//?調(diào)用命令處理函數(shù)
        ????c->cmd->proc(c);

        ????......
        }

        在 server.c 中定義了每一個(gè)命令對(duì)應(yīng)的處理函數(shù)

        //file:src/server.c
        struct?redisCommand?redisCommandTable[]?=?{
        ????{"module",moduleCommand,-2,"as",0,NULL,0,0,0,0,0},
        ????{"get",getCommand,2,"rF",0,NULL,1,1,1,0,0},
        ????{"set",setCommand,-3,"wm",0,NULL,1,1,1,0,0},
        ????{"setnx",setnxCommand,3,"wmF",0,NULL,1,1,1,0,0},
        ????{"setex",setexCommand,4,"wm",0,NULL,1,1,1,0,0},
        ????......

        ????{"mget",mgetCommand,-2,"rF",0,NULL,1,-1,1,0,0},
        ????{"rpush",rpushCommand,-3,"wmF",0,NULL,1,1,1,0,0},
        ????{"lpush",lpushCommand,-3,"wmF",0,NULL,1,1,1,0,0},
        ????{"rpushx",rpushxCommand,-3,"wmF",0,NULL,1,1,1,0,0},
        ????......
        }

        對(duì)于 get 命令來說,其對(duì)應(yīng)的命令處理函數(shù)就是 getCommand。也就是說當(dāng)處理 GET 命令執(zhí)行到 c->cmd->proc 的時(shí)候會(huì)進(jìn)入到 getCommand 函數(shù)中來。

        //file:?src/t_string.c
        void?getCommand(client?*c)?{
        ????getGenericCommand(c);
        }
        int?getGenericCommand(client?*c)?{
        ????robj?*o;

        ????if?((o?=?lookupKeyReadOrReply(c,c->argv[1],shared.null[c->resp]))?==?NULL)
        ????????return?C_OK;
        ????...
        ????addReplyBulk(c,o);
        ????return?C_OK;
        }

        getGenericCommand 方法會(huì)調(diào)用 lookupKeyReadOrReply 來從內(nèi)存中查找對(duì)應(yīng)的 key值。如果找不到,則直接返回 C_OK;如果找到了,調(diào)用 addReplyBulk 方法將值添加到輸出緩沖區(qū)中。

        //file:?src/networking.c
        void?addReplyBulk(client?*c,?robj?*obj)?{
        ????addReplyBulkLen(c,obj);
        ????addReply(c,obj);
        ????addReply(c,shared.crlf);
        }

        3.3 寫處理結(jié)果到發(fā)送緩存區(qū)

        其主體是調(diào)用 addReply 來設(shè)置回復(fù)數(shù)據(jù)。在 addReply 方法中做了兩件事情:

        • prepareClientToWrite 判斷是否需要返回?cái)?shù)據(jù),并且將當(dāng)前 client 添加到等待寫返回?cái)?shù)據(jù)隊(duì)列中。
        • 調(diào)用 _addReplyToBuffer 和 _addReplyObjectToList 方法將返回值寫入到輸出緩沖區(qū)中,等待寫入 socekt
        //file:src/networking.c
        void?addReply(client?*c,?robj?*obj)?{
        ????if?(prepareClientToWrite(c)?!=?C_OK)?return;

        ????if?(sdsEncodedObject(obj))?{
        ????????if?(_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr))?!=?C_OK)
        ????????????_addReplyStringToList(c,obj->ptr,sdslen(obj->ptr));
        ????}?else?{
        ????????......????????
        ????}
        }

        先來看 prepareClientToWrite 的詳細(xì)實(shí)現(xiàn),

        //file:?src/networking.c
        int?prepareClientToWrite(client?*c)?{
        ????......

        ????if?(!clientHasPendingReplies(c)?&&?!(c->flags?&?CLIENT_PENDING_READ))
        ????????clientInstallWriteHandler(c);
        }

        //file:src/networking.c
        void?clientInstallWriteHandler(client?*c)?{
        ????c->flags?|=?CLIENT_PENDING_WRITE;
        ????listAddNodeHead(server.clients_pending_write,c);
        }

        其中 server.clients_pending_write 就是我們說的任務(wù)隊(duì)列,隊(duì)列中的每一個(gè)元素都是有待寫返回?cái)?shù)據(jù)的 client 對(duì)象。在 prepareClientToWrite 函數(shù)中,把 client 添加到任務(wù)隊(duì)列 server.clients_pending_write 里就算完事。

        接下再來 _addReplyToBuffer,該方法是向固定緩存中寫,如果寫不下的話就繼續(xù)調(diào)用 _addReplyStringToList 往鏈表里寫。簡(jiǎn)單起見,我們只看 _addReplyToBuffer 的代碼。

        //file:src/networking.c
        int?_addReplyToBuffer(client?*c,?const?char?*s,?size_t?len)?{
        ????......
        ????//?拷貝到?client?對(duì)象的?Response?buffer?中
        ????memcpy(c->buf+c->bufpos,s,len);
        ????c->bufpos+=len;
        ????return?C_OK;
        }

        要注意的是,本節(jié)的讀請(qǐng)求處理過程是主線程和 io 線程在并行執(zhí)行的。主線程在處理完后會(huì)等待其它的 io 線程處理。在所有的讀請(qǐng)求都處理完后,主線程 beforeSleep 中對(duì) handleClientsWithPendingReadsUsingThreads 的調(diào)用就結(jié)束了。

        四、主線程 && io 線程配合處理寫請(qǐng)求

        當(dāng)所有的讀請(qǐng)求處理完后,handleClientsWithPendingReadsUsingThreads 會(huì)退出。主線程會(huì)緊接著進(jìn)入 handleClientsWithPendingWritesUsingThreads 中來處理。

        //file:src/server.c
        void?beforeSleep(struct?aeEventLoop?*eventLoop)?{
        ????//處理讀任務(wù)隊(duì)列
        ????handleClientsWithPendingReadsUsingThreads();
        ????//處理寫任務(wù)隊(duì)列
        ????handleClientsWithPendingWritesUsingThreads();
        ????......
        }

        4.1 主線程分配任務(wù)

        //file:src/networking.c
        int?handleClientsWithPendingWritesUsingThreads(void)?{
        ????//沒有開啟多線程的話,仍然是主線程自己寫
        ????if?(server.io_threads_num?==?1?||?stopThreadedIOIfNeeded())?{
        ????????return?handleClientsWithPendingWrites();
        ????}

        ????......

        ????//獲取待寫任務(wù)
        ????int?processed?=?listLength(server.clients_pending_write);

        ????//在N個(gè)任務(wù)列表中分配該任務(wù)
        ????listIter?li;
        ????listNode?*ln;
        ????listRewind(server.clients_pending_write,&li);
        ????int?item_id?=?0;
        ????while((ln?=?listNext(&li)))?{
        ????????client?*c?=?listNodeValue(ln);
        ????????c->flags?&=?~CLIENT_PENDING_WRITE;

        ????????/*?Remove?clients?from?the?list?of?pending?writes?since
        ?????????*?they?are?going?to?be?closed?ASAP.?*/

        ????????if?(c->flags?&?CLIENT_CLOSE_ASAP)?{
        ????????????listDelNode(server.clients_pending_write,?ln);
        ????????????continue;
        ????????}

        ????????//hash的方式進(jìn)行分配
        ????????int?target_id?=?item_id?%?server.io_threads_num;
        ????????listAddNodeTail(io_threads_list[target_id],c);
        ????????item_id++;
        ????}

        ????//告訴對(duì)應(yīng)的線程該開始干活了
        ????io_threads_op?=?IO_THREADS_OP_WRITE;
        ????for?(int?j?=?1;?j?????????int?count?=?listLength(io_threads_list[j]);
        ????????setIOPendingCount(j,?count);
        ????}

        ????//主線程自己也會(huì)處理一些
        ????listRewind(io_threads_list[0],&li);
        ????while((ln?=?listNext(&li)))?{
        ????????client?*c?=?listNodeValue(ln);
        ????????writeToClient(c,0);
        ????}
        ????listEmpty(io_threads_list[0]);

        ????//循環(huán)等待其它線程結(jié)束處理
        ????while(1)?{
        ????????unsigned?long?pending?=?0;
        ????????for?(int?j?=?1;?j?????????????pending?+=?getIOPendingCount(j);
        ????????if?(pending?==?0)?break;
        ????}
        ????......
        }

        在 io 線程中收到消息后,開始遍歷自己的任務(wù)隊(duì)列 io_threads_list[id],并將其中的 client 挨個(gè)取出來開始處理。

        //file:src/networking.c
        void?*IOThreadMain(void?*myid)?{
        ????while(1)?{
        ????????//遍歷當(dāng)前線程等待隊(duì)列里的請(qǐng)求?client
        ????????listRewind(io_threads_list[id],&li);
        ????????while((ln?=?listNext(&li)))?{
        ????????????client?*c?=?listNodeValue(ln);
        ????????????if?(io_threads_op?==?IO_THREADS_OP_WRITE)?{
        ????????????????writeToClient(c,0);
        ????????????}?else?if?(io_threads_op?==?IO_THREADS_OP_READ)?{
        ????????????????readQueryFromClient(c->conn);
        ????????????}?
        ????????}
        ????????listEmpty(io_threads_list[id]);
        ????}
        }

        4.2 寫請(qǐng)求處理

        由于這次任務(wù)隊(duì)列里都是寫請(qǐng)求,所以 io 線程會(huì)進(jìn)入 writeToClient。而主線程在分配完任務(wù)以后,自己開始處理起了 io_threads_list[0],并也進(jìn)入到 writeToClient。

        //file:src/networking.c
        int?writeToClient(int?fd,?client?*c,?int?handler_installed)?{
        ????while(clientHasPendingReplies(c))?{
        ????????//?先發(fā)送固定緩沖區(qū)
        ????????if?(c->bufpos?>?0)?{
        ????????????nwritten?=?write(fd,c->buf+c->sentlen,c->bufpos-c->sentlen);
        ????????????if?(nwritten?<=?0)?break;
        ????????????......

        ????????//?再發(fā)送回復(fù)鏈表中數(shù)據(jù)
        ????????}?else?{
        ????????????o?=?listNodeValue(listFirst(c->reply));
        ????????????nwritten?=?write(fd,?o->buf?+?c->sentlen,?objlen?-?c->sentlen);
        ????????????......
        ????????}
        ????}
        }

        writeToClient 中的主要邏輯就是調(diào)用 write 系統(tǒng)調(diào)用讓內(nèi)核幫其把數(shù)據(jù)發(fā)送出去即可。由于每個(gè)命令的處理結(jié)果大小是不固定的。所以 Redis 采用的做法用固定的 buf + 可變鏈表來儲(chǔ)存結(jié)果字符串。這里自然發(fā)送的時(shí)候就需要分別對(duì)固定緩存區(qū)和鏈表來進(jìn)行發(fā)送了。

        當(dāng)所有的寫請(qǐng)求也處理完后,beforeSleep 就退出了。主線程將會(huì)再次調(diào)用 epoll_wait 來發(fā)現(xiàn)請(qǐng)求,進(jìn)入下一輪的用戶請(qǐng)求處理。

        五、總結(jié)

        //file:?src/server.c
        int?main(int?argc,?char?**argv)?{
        ????......

        ????//?1.1?主線程初始化
        ????initServer();

        ????//?1.2?啟動(dòng)?io?線程
        ????InitServerLast();

        ????//?進(jìn)入事件循環(huán)
        ????aeMain(server.el);
        }

        在 initServer 這個(gè)函數(shù)內(nèi),Redis 做了這么三件重要的事情。

        • 創(chuàng)建一個(gè) epoll 對(duì)象
        • 對(duì)配置的監(jiān)聽端口進(jìn)行 listen
        • 把 listen socket 讓 epoll 給管理起來

        在 initThreadedIO 中調(diào)用 pthread_create 庫(kù)函數(shù)創(chuàng)建線程,并且注冊(cè)線程回調(diào)函數(shù) IOThreadMain。在 IOThreadMain 中等待其隊(duì)列 io_threads_list[id] 產(chǎn)生請(qǐng)求,當(dāng)有請(qǐng)求到達(dá)的時(shí)候取出 client,依次處理。其中讀操作通過 readQueryFromClient 處理, 寫操作通過 writeToClient 處理。

        主線程在 aeMain 函數(shù)中,是一個(gè)無休止的循環(huán),它是 Redis 中最重要的部分。它先是調(diào)用事件分發(fā)器發(fā)現(xiàn)事件。如果有新連接請(qǐng)求到達(dá)的時(shí)候,執(zhí)行 accept 接收新連接,并為其注冊(cè)事件處理函數(shù)。

        當(dāng)用戶連接上有命令請(qǐng)求到達(dá)的時(shí)候,主線程在 read 處理函數(shù)中將其添加到讀發(fā)送隊(duì)列中。然后接著在 beforeSleep 中開啟對(duì)讀任務(wù)隊(duì)列和寫任務(wù)隊(duì)列的處理??傮w工作過程如下圖所示。

        在這個(gè)處理過程中,對(duì)讀任務(wù)隊(duì)列和寫任務(wù)隊(duì)列的處理都是多線程并行進(jìn)行的(前提是開篇我們開啟了多 IO 線程并且也并發(fā)處理讀)。

        當(dāng)讀任務(wù)隊(duì)列和寫任務(wù)隊(duì)列的都處理完的時(shí)候,主線程再一次調(diào)用 epoll_wait 去發(fā)現(xiàn)新的待處理事件,如此往復(fù)循環(huán)進(jìn)行處理。

        至此,多線程版本的 Redis 的工作原理就介紹完了。坦白講,我覺得這種多線程模型實(shí)現(xiàn)的并不足夠的好。

        原因是主線程是在處理讀、寫任務(wù)隊(duì)列的時(shí)候還要等待其它的 io 線程處理完才能進(jìn)入下一步。假設(shè)這時(shí)有 10 個(gè)用戶請(qǐng)求到達(dá),其中 9 個(gè)處理耗時(shí)需要 1 ms,而另外一個(gè)命令需要 1 s。則這時(shí)主線程仍然會(huì)等待這個(gè) io 線程處理 1s 結(jié)束后才能進(jìn)入后面的處理。整個(gè) Redis 服務(wù)還是被一個(gè)耗時(shí)的命令給 block 住了。

        我倒是希望我的理解哪里有問題。因?yàn)檫@種方式真的是沒能很好地并發(fā)起來。


        最后,歡迎加入?魚皮的編程知識(shí)星球(點(diǎn)擊了解詳情),和 8200 多名小伙伴們一起交流學(xué)習(xí),向魚皮和大廠同學(xué) 1 對(duì) 1 提問、幫你制定學(xué)習(xí)計(jì)劃不迷茫、跟著魚皮直播做項(xiàng)目(往期項(xiàng)目可無限回看)領(lǐng)取魚皮原創(chuàng)編程學(xué)習(xí)/求職資料等。





        往期推薦

        幾個(gè)對(duì)程序員的誤解,害人不淺!

        編程導(dǎo)航,火了!

        Gitee 很無奈!

        我的 IP 歸屬地,是咋被挖出來的?

        我造了個(gè)輪子,完整開源!



        瀏覽 60
        點(diǎn)贊
        評(píng)論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報(bào)

        評(píng)論
        圖片
        表情
        推薦
        點(diǎn)贊
        評(píng)論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報(bào)
        1. <strong id="7actg"></strong>
        2. <table id="7actg"></table>

        3. <address id="7actg"></address>
          <address id="7actg"></address>
          1. <object id="7actg"><tt id="7actg"></tt></object>
            国内精品国产三级国产在线专 | www.狠狠 | 国产又粗又长又硬又爽的在线视频播放 | 婷婷爱爱 | 日韩美女性交 | 国产一级婬乱A片 | 小浪货都湿透了痒 | 亚洲一区二区三区中文字幕日韩电影 | 天天操夜夜操天天 | 和漂亮的女老板做爰 |