1. <strong id="7actg"></strong>
    2. <table id="7actg"></table>

    3. <address id="7actg"></address>
      <address id="7actg"></address>
      1. <object id="7actg"><tt id="7actg"></tt></object>

        Redis 6.0 IO線程功能分析

        共 5504字,需瀏覽 12分鐘

         ·

        2020-07-11 12:34

        Redis多線程原理

        Redis 6.0?的亮點之一就是支持多線程,Redis 分?主線程?和?IO線程,IO線程?只用于讀取客戶端的命令和發(fā)送回復(fù)數(shù)據(jù)給客戶端,處理客戶端命令還是在?主線程?進行,如下圖所示:

        8ead1de77cb71e408032f1338ed96a5e.webp


        從上圖可知,主線程?主要負責(zé)接收客戶端連接,并且分發(fā)到各個?IO線程,而?IO線程?負責(zé)讀取客戶端命令。命令讀取完成后,由?主線程?執(zhí)行命令。主線程?執(zhí)行完命令后,再由?IO線程?把回復(fù)數(shù)據(jù)發(fā)送給客戶端。

        讀者可能會問,為什么處理命令不在?IO線程?進行,我覺得主要有兩個原因:

        • 如果處理命令在?IO線程?進行,那么就會涉及到競爭的問題。因為 Redis 的數(shù)據(jù)庫是共享的,所以如果多個線程同時操作數(shù)據(jù)庫,那么就必須要對數(shù)據(jù)庫進行上鎖,而上鎖是一個比較耗時的操作(因為上鎖可能會導(dǎo)致線程上下文切換)。

        • 由于 Redis 6.0 以前一直都是由單線程執(zhí)行命令的,所以如果要改為多線程執(zhí)行命令,那么需要修改大量代碼,而且可能會引入新的問題(比如bug)。所以,為了穩(wěn)定性,繼續(xù)使用單線程執(zhí)行命令是最好的選擇。

        為什么要使用多線程呢?主要為了使用多核CPU的優(yōu)勢,下面是使用多線程的測試數(shù)據(jù)(數(shù)據(jù)來源網(wǎng)絡(luò)):

        e18a5cbeb62cde0b45c2e5d62fa3539d.webp


        549d08bbb9b8a2d5e91d4e08d22359bb.webp

        從上面的測試結(jié)果可以看出,多線程版本的 Redis 讀寫QPS都要比單線程版本的高。

        Redis 多線程實現(xiàn)

        要開啟 Redis 的?IO線程?功能,可以在配置文件中加入以下配置項:

        io-threads-do-reads yes     # 開啟IO線程
        io-threads 6 # 設(shè)置IO線程數(shù)

        Redis 在啟動時會根據(jù)配置文件中設(shè)置的?IO線程?數(shù)來啟動?IO線程,啟動?IO線程?在函數(shù)?initThreadedIO()?中完成,代碼如下:

        void initThreadedIO(void) {
        io_threads_active = 0;

        if (server.io_threads_num == 1) return;
        ...
        for (int i = 0; i < server.io_threads_num; i++) {
        /* Things we do for all the threads including the main thread. */
        io_threads_list[i] = listCreate();
        if (i == 0) continue; /* Thread 0 is the main thread. */

        /* Things we do only for the additional threads. */
        pthread_t tid;
        pthread_mutex_init(&io_threads_mutex[i],NULL);
        io_threads_pending[i] = 0;
        pthread_mutex_lock(&io_threads_mutex[i]); /* Thread will be stopped. */
        if (pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i) != 0) {
        serverLog(LL_WARNING,"Fatal: Can't initialize IO thread.");
        exit(1);
        }
        io_threads[i] = tid;
        }
        }

        initThreadedIO()?函數(shù)的主要工作是:

        • 為每個IO線程創(chuàng)建一個鏈表,用于放置要進行IO操作的客戶端連接。

        • 為每個IO線程創(chuàng)建一個鎖,用于主線程與IO線程的通信。

        • 調(diào)用?pthread_create()?系統(tǒng)調(diào)用來創(chuàng)建IO線程,IO線程的主體函數(shù)是?IOThreadMain()。

        下面我們來分析一下IO線程的主體函數(shù)主要完成的工作:

        void *IOThreadMain(void *myid) {
        long id = (unsigned long)myid;
        ...
        while (1) {
        /* Wait for start */
        for (int j = 0; j < 1000000; j++) {
        if (io_threads_pending[id] != 0) break;
        }

        if (io_threads_pending[id] == 0) { // 不等于0表示有客戶端連接需要處理
        pthread_mutex_lock(&io_threads_mutex[id]);
        pthread_mutex_unlock(&io_threads_mutex[id]);
        continue;
        }
        ...
        listIter li;
        listNode *ln;
        listRewind(io_threads_list[id],&li);
        while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        if (io_threads_op == IO_THREADS_OP_WRITE) {
        writeToClient(c,0);
        } else if (io_threads_op == IO_THREADS_OP_READ) {
        readQueryFromClient(c->conn);
        } else {
        serverPanic("io_threads_op value is unknown");
        }
        }
        listEmpty(io_threads_list[id]);
        io_threads_pending[id] = 0;
        ...
        }
        }

        IO線程的主體函數(shù)主要完成以下幾個操作:

        • 等待主線程分配客戶端連接(對應(yīng)IO線程的?io_threads_list?鏈表不為空)。

        • 判斷當(dāng)前是進行讀操作還是寫操作(io_threads_op?等于?IO_THREADS_OP_WRITE?表示要進行寫操作,而?io_threads_op?等于?IO_THREADS_OP_READ?表示要進行讀操作)。如果是進行寫操作,那么就調(diào)用?writeToClient()?函數(shù)向客戶端連接進行發(fā)送數(shù)據(jù)。如果是讀操作,那么調(diào)用?readQueryFromClient()?函數(shù)讀取客戶端連接的請求。

        • 完成對客戶端連接的讀寫操作后,需要清空對應(yīng)IO線程的?io_threads_list?鏈表和計數(shù)器?io_threads_pending,用于通知主線程已經(jīng)完成讀寫操作。

        那么,主線程是怎樣分配客戶端連接給各個IO線程的呢?

        主線程在接收到客戶端連接后,會把客戶端連接添加到事件驅(qū)動庫中監(jiān)聽其讀事件,讀事件的回調(diào)函數(shù)為?readQueryFromClient()。也就是說,當(dāng)客戶端連接可讀時會觸發(fā)調(diào)用?readQueryFromClient()?函數(shù),而?readQueryFromClient()?函數(shù)會調(diào)用?postponeClientRead()?函數(shù)判斷當(dāng)前 Redis 是否開啟了?IO線程?功能,代碼如下:

        int postponeClientRead(client *c) {
        if (io_threads_active &&
        server.io_threads_do_reads &&
        !ProcessingEventsWhileBlocked &&
        !(c->flags & (CLIENT_MASTER|CLIENT_SLAVE|CLIENT_PENDING_READ)))
        {
        c->flags |= CLIENT_PENDING_READ;
        listAddNodeHead(server.clients_pending_read,c);
        return 1;
        } else {
        return 0;
        }
        }

        postponeClientRead()?函數(shù)主要判斷 Redis 是否開啟了?IO線程?功能,如果開啟了就調(diào)用?listAddNodeHead()?函數(shù)把客戶端連接添加到?clients_pending_read?鏈表中,并且設(shè)置客戶端連接的?CLIENT_PENDING_READ?標(biāo)志位,表示當(dāng)前連接已經(jīng)在?clients_pending_read?鏈表中,防止二次添加。

        把客戶端連接添加到?clients_pending_read?鏈表后,主線程會在?handleClientsWithPendingReadsUsingThreads()?函數(shù)中把客戶端連接分配給各個?IO線程。代碼如下:

        int handleClientsWithPendingReadsUsingThreads(void) {
        ...
        /* 分配給各個IO線程 */
        listIter li;
        listNode *ln;
        listRewind(server.clients_pending_read,&li);
        int item_id = 0;
        while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        int target_id = item_id % server.io_threads_num;
        listAddNodeTail(io_threads_list[target_id],c);
        item_id++;
        }

        // 設(shè)置各個IO線程負責(zé)的客戶端連接數(shù)
        io_threads_op = IO_THREADS_OP_READ;
        for (int j = 1; j < server.io_threads_num; j++) {
        int count = listLength(io_threads_list[j]);
        io_threads_pending[j] = count;
        }

        // 主線程也要負責(zé)一部分客戶端連接的讀寫操作
        listRewind(io_threads_list[0],&li);
        while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        readQueryFromClient(c->conn);
        }
        listEmpty(io_threads_list[0]);

        // 等待所有IO線程完成
        while (1) {
        unsigned long pending = 0;
        for (int j = 1; j < server.io_threads_num; j++)
        pending += io_threads_pending[j];
        if (pending == 0) break;
        }
        ...
        // 執(zhí)行各個客戶端連接的命令
        while(listLength(server.clients_pending_read)) {
        ln = listFirst(server.clients_pending_read);
        client *c = listNodeValue(ln);
        c->flags &= ~CLIENT_PENDING_READ;
        listDelNode(server.clients_pending_read,ln);

        if (c->flags & CLIENT_PENDING_COMMAND) {
        c->flags &= ~CLIENT_PENDING_COMMAND;
        if (processCommandAndResetClient(c) == C_ERR) {
        continue;
        }
        }
        processInputBuffer(c);
        }
        return processed;
        }

        handleClientsWithPendingReadsUsingThreads()?函數(shù)主要完成以下幾個操作:

        • 分配客戶端連接給各個?IO線程(添加到對應(yīng)?IO線程?的?io_threads_list?鏈表中),分配策略為輪詢。

        • 設(shè)置各個?IO線程?負責(zé)的客戶端連接數(shù)?io_threads_pending。

        • 處理主線程負責(zé)那部分客戶端連接的讀寫操作。

        • 等待所有?IO線程?完成讀取客戶端連接請求的命令。

        • 執(zhí)行各個客戶端連接請求的命令。

        前面說過,IO線程?在完成讀取客戶端連接的請求后,會把?io_threads_pending?計數(shù)器清零,主線程就是通過檢測?io_threads_pending?計數(shù)器來判斷是否所有?IO線程?都完成了對客戶端連接的讀取命令操作。

        但這里要吐槽一下的是,在等待?IO線程?讀取客戶端請求時,居然用了一個死循環(huán)來等待,這樣有可能會導(dǎo)致CPU使用率飆升的問題,有可能影響其他服務(wù)的運行(不知道作者怎么想的)。我覺得比較合適的方式是,各個?IO線程?完成了讀取命令操作后,通過一個信號來通知主線程。


        瀏覽 70
        點贊
        評論
        收藏
        分享

        手機掃一掃分享

        分享
        舉報
        評論
        圖片
        表情
        推薦
        點贊
        評論
        收藏
        分享

        手機掃一掃分享

        分享
        舉報
        1. <strong id="7actg"></strong>
        2. <table id="7actg"></table>

        3. <address id="7actg"></address>
          <address id="7actg"></address>
          1. <object id="7actg"><tt id="7actg"></tt></object>
            女人扒开尿口让男人舔 | 国产精品久久久违 | 找个操逼网站 | 豆花视频AⅤ一区二区三区 | 色婷婷香蕉在线一区二区 | 激情视屏 | 天天久久夜夜一起射 | 人人插人人干在线观看 | 中文字幕在线观看av | 中文字幕A片 |