1. TDSQL for PG 并行框架原理解析

        共 17954字,需瀏覽 36分鐘

         ·

        2024-07-17 21:36

        并行方式簡介

        查詢并行

        查詢并行是指將一個查詢分解為多個子查詢,在多個處理器上同時執(zhí)行這些子查詢。查詢并行通常用于處理計算密集型和IO密集型的查詢,例如,涉及多個表連接、聚合、表掃描等操作的查詢。查詢并行可以有效地提高查詢性能,因為每個處理器只需要處理查詢的一部分。

        這種并行方式在傳統(tǒng)數(shù)據(jù)庫中使用比較多,比如Oracle、PostgreSQL,TDSQL for PG 也采用的是這種并行方式。這種方式的好處是能將查詢?nèi)蝿?wù)分解為多個任務(wù),分布在多個處理器(甚至跨服務(wù)器的處理器)上并行執(zhí)行,最終通過 Gather 節(jié)點將結(jié)果匯總。

        相比其他的并行方式,查詢并行的調(diào)度更簡單,正因為如此,資源的使用效率不是最高的。另外,這種并行方式需要在處理器之間傳輸和同步數(shù)據(jù),系統(tǒng)開銷較大。

        pipeline并行

        管道 (pipeline) 并行是指將一個操作的輸出作為另一個操作的輸入,這樣多個操作可以同時進行。這種并行方式通常用于數(shù)據(jù)庫查詢處理中的多個階段,例如,從磁盤讀取數(shù)據(jù)、過濾數(shù)據(jù)、排序數(shù)據(jù)等。

        pipeline并行可以提高資源利用率,因為 pipeline 中的各個階段、pipeline 之間可以并行、異步執(zhí)行,而不是等待前一個階段完成。

        ClickHouse、Doris 等使用的就是這種并行方式。pipeline 并行的好處是能充分的利用資源,結(jié)合線程池技術(shù),可以非常精細的調(diào)度任務(wù),目的是提升數(shù)據(jù)處理的吞吐量。

        但是這種并行方式不夠靈活,因為每個處理階段的輸入輸出是固定的,限制了處理階段之間的交互和協(xié)作,同時還需要管理和協(xié)調(diào)好各個處理階段,提升了調(diào)度的復(fù)雜度。與之對應(yīng)的是 DAG(Directed Acyclic Graph) 方式,典型的產(chǎn)品就是 Spark。

        任務(wù)并行

        任務(wù)并行是指在多個處理器上同時執(zhí)行不同的任務(wù)。這種并行方式通常用于處理多個獨立的查詢或事務(wù)。任務(wù)并行可以提高系統(tǒng)的吞吐量,因為多個查詢或事務(wù)可以同時進行。

        TDSQL for PG 的后臺任務(wù),比如 autovacuum、checkpointer 等就是這種并行方式,任務(wù)之間獨立執(zhí)行,互不干擾。

        數(shù)據(jù)并行

        數(shù)據(jù)并行是指在多個處理器上同時對數(shù)據(jù)集的不同部分執(zhí)行相同的操作。這通常是通過將數(shù)據(jù)劃分為多個分區(qū)來實現(xiàn)的,每個處理器負責(zé)處理一個分區(qū)。

        數(shù)據(jù)并行可以有效地提高查詢性能,因為每個處理器只需要處理數(shù)據(jù)的一部分。通常來說,上面的并行方式都會結(jié)合數(shù)據(jù)并行來執(zhí)行。

        指令并行

        本文指的指令并行是利用SIMD指令的并行,SIMD指令可以減少分支預(yù)測的開銷,提高內(nèi)存訪問的局部性、cache的命中率。數(shù)據(jù)庫中的排序算法可以利用 SIMD 指令進行并行比較和交換,join 也可以使用 SIMD 進行并行的匹配,最常用的是壓縮和編碼用 SIMD 提升性能。

        TDSQL for PG 主要使用了查詢并行、數(shù)據(jù)并行、任務(wù)并行這幾種方式,本文重點要分析的是查詢并行的框架和原理。

        并行框架概述

        TDSQL for PG 并行框架總體流程

        在并行框架中有三種進程角色,分部是 server 進程,backend 進程(也稱作 leader 進程)和 Background Worker 進程。
        • server 進程是資源調(diào)度進程,負責(zé)進程的分配
        • backend 是并行任務(wù)的發(fā)起進程,負責(zé)并行執(zhí)行環(huán)境的初始化,也負責(zé)通過 Gather 和 GatherMerge 節(jié)點匯總結(jié)果
        • Background Worker 進程是任務(wù)的具體執(zhí)行者,并返回結(jié)果給backend 進程。

        執(zhí)行的流程跟單進程時一樣,都會依次調(diào)用 CreateQueryDesc(), ExecutorStart() , ExecutorRun(), ExecutorFinish(), ExecutorEnd() 函數(shù)。

        區(qū)別在于 Background Worker 需要先從動態(tài)共享內(nèi)存中恢復(fù)執(zhí)行需要的環(huán)境,以及執(zhí)行結(jié)束后清理動態(tài)內(nèi)存。

        TDSQL for PG 的并行框架主要流程如下圖所示:

        1.  Client 連接到 server 以后 server 進程為其創(chuàng)建一個 backend 進程,banckend 進程在生成執(zhí)行計劃的過程中識別出是否需要并行執(zhí)行,如果能并行執(zhí)行就會創(chuàng)建 Background Worker 進程。

        2.  如果并行執(zhí)行,backend 進程先調(diào)用ExecInitParallelPlan()函數(shù)初始化并行執(zhí)行需要的環(huán)境。
        包括執(zhí)行計劃的序列化(ExecSerializePlan()),動態(tài)共享內(nèi)存初始化InitializeParallelDSM(), 動態(tài)共享內(nèi)存初始化又包含動態(tài)共享內(nèi)存段的創(chuàng)建,library、GUC、snapshot 等的序列化和拷貝。
        3.  接著后端進程調(diào)用LaunchParallelWorkers()注冊 Background Worker。
        注冊的方式是調(diào)用RegisterDynamicBackgroundWorker()查找可用的 Background Worker 槽位,如果找到就向 server 進程發(fā)送PMSIGNAL_BACKGROUND_WORKER_CHANGE信號。
        4.  server 進程處理信號(sigusr1_handler())
        調(diào)用BackgroundWorkerStateChange() 遍歷所有的 Background Worker 槽位,找到剛注冊的槽位,實例化一個RegisteredBgWorker,并 push 到全局變量中。
        5.  接下來 server 進程調(diào)用maybe_start_bgworkers()遍歷BackgroundWorkerList。
        為里面的每個RegisteredBgWorker fork進程。fork 出來的進程執(zhí)行ParallelWorkerMain(),ParallelWorkerMain()就是 background worker 的入口函數(shù)。
        并行框架的使用的大致流程如下:
        /* 進入并行模式,阻止不安全的狀態(tài)修改 */
            
        EnterParallelMode();

        /* 創(chuàng)建并行執(zhí)行的上下文,并插入到全局鏈表 pcxt_list 中 */
        pcxt = CreateParallelContext();

        /* 估算變量占用的 DSM 的大小,包括變量本身的大小和 key 的大學(xué). */
        shm_toc_estimate_chunk(&pcxt->estimator, size);
        shm_toc_estimate_keys(&pcxt->estimator, keys);

        /* 創(chuàng)建 DSM 并拷貝數(shù)據(jù) */
        InitializeParallelDSM(pcxt);

        /* 在 DSM 中申請空間. */
        space = shm_toc_allocate(pcxt->toc, size);
        shm_toc_insert(pcxt->toc, key, space);

        /* 注冊 background worker */
        LaunchParallelWorkers(pcxt);

        /* 執(zhí)行并行任務(wù)(計劃) */

        /* 等待并行 worker 執(zhí)行結(jié)束 */

        /* 讀取共享內(nèi)存中的結(jié)果 */

        DestroyParallelContext(pcxt);

        /* 退出并行執(zhí)行模式 */
        ExitParallelMode();

        通信機制

        并行執(zhí)行避免不了進程或線程之間的通信,TDSQL for PG 的并行框架采用的是進程模型,主要用到了兩種通信機制,一個是信號,一個是共享內(nèi)存。

        1.  信號

        信號主要是控制流,在并行框架中,后端進程注冊 background worker 時向 server 進程發(fā)送信號SendPostmasterSignal(PMSIGNAL_BACKGROUND_WORKER_CHANGE)),server 進程調(diào)用 sigusr1_handler()處理信號,并創(chuàng)建 background worker 進程。

        當(dāng) background worker 執(zhí)行結(jié)束,會通過信號通知 backend 進程。

        2.  動態(tài)共享內(nèi)存

        在 TDSQL for PG 的并行框架中,動態(tài)共享內(nèi)存主要用來傳遞狀態(tài)和數(shù)據(jù)。
        從 backend 進程傳遞給 background worker 的狀態(tài)主要有執(zhí)行計劃、GUC、事務(wù)信息、snapshot 信息等,這部分使用的動態(tài)共享內(nèi)存在啟動并行執(zhí)行的初始化階段調(diào)用InitializeParallelDSM()來完成。
        數(shù)據(jù)主要是從 background Worker 返回給 backend 的執(zhí)行結(jié)果和錯誤信息,這些結(jié)果通過基于共享內(nèi)存的消息隊列shm_mq來傳遞,也就是 tuple queue, 每個 background Worker 和 backend 之間都有一個消息隊列,是多對一的關(guān)系。
        ExecParallelCreateReaders()函數(shù)負責(zé)為每個background Worker 創(chuàng)建 tuple queue 。同樣的也會創(chuàng)建多對一的錯誤消息隊列,用于 background Worker 傳遞具體的錯誤信息給 backend。
        對于普通的 SELECT 語句,background Worker 寫數(shù)據(jù)到 tuple queue,backend 進程從 tuple queue 中讀取結(jié)果。
        TDSQL for PG 還實現(xiàn)了 INSERT 和 UPDATE 的并行執(zhí)行,此時 background Worker 通過共享內(nèi)存中的變量把結(jié)果傳給 backend 進程,而不需要通過 tuple queue。

        關(guān)鍵數(shù)據(jù)結(jié)構(gòu)分析

        ParallelContext

        typedef struct ParallelContext
            
        {
        dlist_node node; /* 雙向鏈表的掛載點 */
        SubTransactionId subid; /* 調(diào)用GetCurrentSubTransactionId獲取子事務(wù)ID */
        int nworkers; /* 計劃的Worker數(shù)量 */
        int nworkers_launched; /* 實際發(fā)起的Worker數(shù)量 */
        bool leader_participate; /* 主進程是否參與執(zhí)行 */
        char *library_name; /* 庫的名稱,一般是postgres */
        char *function_name; /* background Worker的執(zhí)行函數(shù),用戶自定義,select對應(yīng)的是ParallelQueryMain */
        ErrorContextCallback *error_context_stack; /* 錯誤上下文棧 */
        shm_toc_estimator estimator; /* 共享內(nèi)存大小估算 */
        dsm_segment *seg; /* 動態(tài)共享內(nèi)存的狀態(tài)信息 */
        void *private_memory; /* 動態(tài)共享內(nèi)存申請失敗后回退到非并行執(zhí)行是使用的內(nèi)存。*/
        shm_toc *toc; /* Shared Memory Table of Contents */
        ParallelWorkerInfo *worker; /* 是一個數(shù)組,每個Worker一個,記錄Worker的信息 */
        int nknown_attached_workers; /* attach到error queue的Worker數(shù)量 */
        bool *known_attached_workers; /* 數(shù)組,標記每個Worker attach的狀態(tài) */
        } ParallelContext;

        次創(chuàng)建ParallelContext后都會插入到雙向鏈表pcxt_list中,這個雙向鏈表用于記錄活躍的并行上下文。

        ParallelExecutorInfo

        typedef struct ParallelExecutorInfo
            
        {
        PlanState *planstate; /* plan subtree we're running in parallel */
        ParallelContext *pcxt; /* parallel context we're using */
        BufferUsage *buffer_usage; /* points to bufusage area in DSM */
        uint64 *processed_count; /* processed tuple count area in DSM */
        SharedExecutorInstrumentation *instrumentation; /* optional */
        struct SharedJitInstrumentation *jit_instrumentation; /* optional */
        dsa_area *area; /* points to DSA area in DSM */
        dsa_pointer param_exec; /* serialized PARAM_EXEC parameters */
        bool finished; /* set true by ExecParallelFinish */
        /* These two arrays have pcxt->nworkers_launched entries: */
        shm_mq_handle **tqueue; /* tuple queues for worker output */
        struct TupleQueueReader **reader; /* tuple reader/writer support */
        } ParallelExecutorInfo;

        這個數(shù)據(jù)結(jié)構(gòu)記錄了并行執(zhí)行時的各種信息,由函數(shù)mq_bytes_readmq_bytes_written按 8 bytes 讀寫,必須用 memory barrier 同步。

        shm_mq

        struct shm_mq
        {
                slock_t mq_mutex;
        PGPROC *mq_receiver;
        PGPROC *mq_sender;
        pg_atomic_uint64 mq_bytes_read;
        pg_atomic_uint64 mq_bytes_written;
        Size mq_ring_size;
        bool mq_detached;
        uint8 mq_ring_offset;
                char mq_ring[FLEXIBLE_ARRAY_MEMBER];
        };

        共享內(nèi)存中的隊列。

         mq_receivermq_bytes_read只能被 receiver 改變。同理mq_sendermq_bytes_writte 只能被 sender 改變。

         mq_receivermq_sendermq_mutex保護,一旦設(shè)置就不能改變,所以設(shè)置以后可以無鎖的讀。

         mq_bytes_readmq_bytes_written按 8 bytes 讀寫,必須用 memory barrier 同步。

        shm_mq_handle

        struct shm_mq_handle
            
        {
        shm_mq *mqh_queue;
        dsm_segment *mqh_segment;
        BackgroundWorkerHandle *mqh_handle;
        char *mqh_buffer;
        Size mqh_buflen;
        Size mqh_consume_pending;
        Size mqh_send_pending;
        Size mqh_partial_bytes;
        Size mqh_expected_bytes;
        bool mqh_length_word_complete;
        bool mqh_counterparty_attached;
        MemoryContext mqh_context;
        };
        用于管理共享隊列的數(shù)據(jù)結(jié)構(gòu)。

        ● mqh_queue

        指向關(guān)聯(lián)的消息隊列

        ● mqh_segment

        指向包含該消息隊列的動態(tài)共享內(nèi)存

        ● mqh_handle

        與該消息隊列綁定的后臺工作進程,由shm_mq_attach()綁定。

        ● mqh_buffer

        對于超過ring buffer大小的數(shù)據(jù),或者出現(xiàn)了回卷的數(shù)據(jù),就把隊列中的chunk拷貝到mqh_buffer。

        ● mqh_buflen

        mqh_buflen 是mqh_buffer的長度。

        ● mqh_consume_pending

        當(dāng)mqh_consume_pending超過環(huán)形緩沖區(qū)1/4大小時,說明數(shù)據(jù)已經(jīng)消費掉了,需要更新共享內(nèi)存中的數(shù)據(jù)。
        ● mqh_send_pending

        已經(jīng)寫到queue中,但是還沒有更新到共享內(nèi)存的數(shù)據(jù)大小。只有當(dāng)數(shù)據(jù)大小超過 ring buffer 的 1/4,或者tuple queue慢了的時候才更新共享內(nèi)存。

        ●mqh_partial_bytes、mqh_expected_bytes、and mqh_length_word_complete

        這三個變量用來跟蹤非阻塞操作的狀態(tài),記錄的是length word的發(fā)送情況。
        當(dāng)調(diào)用者嘗試非阻塞操作時,但是返回了SHM_MQ_WOULD_BLOCK,那么需要稍后用相同的參數(shù)重新調(diào)用這個參數(shù),所以需要記錄狀態(tài)還有多少數(shù)據(jù)沒有被發(fā)送。
        發(fā)送數(shù)據(jù)時shm_mq_sendv()),先發(fā)送要發(fā)送的字節(jié)數(shù)nbytes(類型是Size),mqh_length_word_complete 就是記錄nbytes的幾個字節(jié)是否都發(fā)送完了。
        此時 mqh_partial_bytes 表示已經(jīng)發(fā)生了幾個字節(jié),也可以用于記錄 payload 發(fā)送了多少字節(jié)。
        ● mqh_length_word_complete
        用于跟蹤是否完整的接收或者發(fā)送了所有的數(shù)據(jù)。
        mqh_partial_bytes 記錄了讀或者寫了多少bytes數(shù)據(jù),而mqh_expected_bytes 只記錄期望讀的負載數(shù)據(jù)的總大小。

        ● mqh_counterparty_attached

        用于記錄對手(sender 或者 receiver)是否已經(jīng)掛載到queue。從而不必要的 mutex 申請。

        ● mqh_context

        shm_mq_handle所在的上下文,所有內(nèi)存的申請都要在這個上下文內(nèi)進行。


        關(guān)鍵函數(shù)分析

        ExecInitParallelPlan()

        該函數(shù)主要初始化并行執(zhí)行需要的一些基礎(chǔ)信息,在并行的發(fā)起節(jié)點調(diào)用,比如Gather、GatherMerge、RemoteFragment(分布式場景下也支持節(jié)點內(nèi)并行)等。

        這個函數(shù)的核心流程如下:

             

        ParallelExecutorInfo *
            
        ExecInitParallelPlan(PlanState *planstate, EState *estate,
        Bitmapset *sendParams, int nworkers,
        int64 tuples_needed)
        {
        ...

        /* 序列化執(zhí)行計劃 */
        pstmt_data = ExecSerializePlan(planstate->plan, estate);

        /* 為返回值申請空間 */
        pei = palloc0(sizeof(ParallelExecutorInfo));

        /* 創(chuàng)建并行框架上下文 */
        pcxt = CreateParallelContext("postgres", "ParallelQueryMain", nworkers);
        pei->pcxt = pcxt;

        /* 動態(tài)共享內(nèi)存大小估算,為每個需要傳遞給background worker的變量估算內(nèi)存大小,包括執(zhí)行計劃、BufferUsage、tuple queues等。*/
        shm_toc_estimate_chunk(&pcxt->estimator, ...);
        shm_toc_estimate_keys(&pcxt->estimator, 1);
        ...

        /* 為每個可并行的node估算其需要的共享內(nèi)存大小 */
        ExecParallelEstimate(planstate, &e);

        ...

        /* 為DSA估算空間。DSA的大小可以在執(zhí)行過程中改變,所以可能會更新的狀態(tài)放到這個區(qū)域。DSA會綁定多個DSM,當(dāng)DSA大小不夠時,可以創(chuàng)建新的DSM。*/
        shm_toc_estimate_chunk(&pcxt->estimator, dsa_minsize);
        shm_toc_estimate_keys(&pcxt->estimator, 1);

        /* 為并行框架建立動態(tài)共享內(nèi)存段,并將Worker需要的狀態(tài) copy 到共享內(nèi)存。*/
        InitializeParallelDSM(pcxt);

        /* 在DSM中為并行執(zhí)行需要的狀態(tài)信息申請共享內(nèi)存并插入到toc中。*/
        shm_toc_allocate(pcxt->toc, ...);
        shm_toc_insert(pcxt->toc, ..., ...);
        ...

        /* 為每個Worker創(chuàng)建一個tuple queue,用于leader和Worker之間傳遞執(zhí)行結(jié)果。*/
        pei->tqueue = ExecParallelSetupTupleQueues(pcxt, false);

        ...

        /* 遍歷planstate中所有的node,為其初始化共享內(nèi)存,把狀態(tài)信息拷貝的共享內(nèi)存。*/
        ExecParallelInitializeDSM(planstate, &d);

        ...

        return pei;
        }

        ExecSerializePlan

        用于執(zhí)行計劃的序列化,序列化以后放入共享內(nèi)存,傳遞給background worker,再經(jīng)過反序列化。
        static char *
            
        ExecSerializePlan(Plan *plan, EState *estate)
        {
        /* 實際調(diào)用copyObjectImpl()對執(zhí)行計劃中的算子、表達式進行深度拷貝,會遞歸調(diào)用一些列以“_copy”開頭的函數(shù)。*/
        plan = copyObject(plan);

        /* 復(fù)制PlannedStmt,復(fù)制background worker必要的信息,最后序列化返回*/
        pstmt = makeNode(PlannedStmt);
        ...
        return nodeToString(pstmt);
        }

        CreateParallelContext

        ParallelContext *
            
        CreateParallelContext(const char *library_name, const char *function_name, int nworkers);

        創(chuàng)建并行框架的上下文,library_name 是要加載的庫的名稱,通常為“postgres”, function_name 是并行執(zhí)行函數(shù)的名稱。

        在background worker進程的入口函數(shù)ParallelWorkerMain()中會通過這個函數(shù)名從library中加載函數(shù)并執(zhí)行,nworkers 是并行執(zhí)行的進程數(shù)。

        ExecParallelSetupTupleQueues()

        Worker 進程執(zhí)行完 plan segment 后,結(jié)果通過共享內(nèi)存消息隊列傳遞給 leader 進程,這個函數(shù)就是為每個 worker 創(chuàng)建一個共享隊列shm_mq_handle。

             

        static shm_mq_handle **
            
        ExecParallelSetupTupleQueues(ParallelContext *pcxt, bool reinitialize)
        {
        /* 為每個worker的shm_mq_handle申請內(nèi)存 */
        responseq = (shm_mq_handle **)
        palloc(pcxt->nworkers * sizeof(shm_mq_handle *));

        /* 如果不需要重新初始化,那么就在DSM中為每一個worker的tuple queue申請空間;否則就直接查找。 */
        if (!reinitialize)
        tqueuespace =
        shm_toc_allocate(pcxt->toc, mul_size(PARALLEL_TUPLE_QUEUE_SIZE, pcxt->nworkers));
        else
        tqueuespace = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_TUPLE_QUEUE, false);

        /* 為每個 worker 創(chuàng)建消息隊列,并將 leader 進程設(shè)置為接收者,然后將 mq、dsm_segment 關(guān)聯(lián)起來。*/
        for (i = 0; i < pcxt->nworkers; ++i)
        {
        shm_mq *mq;

        mq = shm_mq_create(tqueuespace +
        ((Size) i) * PARALLEL_TUPLE_QUEUE_SIZE,
        (Size) PARALLEL_TUPLE_QUEUE_SIZE);

        shm_mq_set_receiver(mq, MyProc);
        responseq[i] = shm_mq_attach(mq, pcxt->seg, NULL);
        }

        /* 插入到toc中, 方便在 background worker 中查表恢復(fù) */
        if (!reinitialize)
        shm_toc_insert(pcxt->toc, PARALLEL_KEY_TUPLE_QUEUE, tqueuespace);

        return responseq;
        }

        LaunchParallelWorkers()

        發(fā)起background worker的函數(shù),主要是調(diào)用以下代碼來完成:RegisterDynamicBackgroundWorker()

             

        void
            
        LaunchParallelWorkers(ParallelContext *pcxt)
        {
        BackgroundWorker worker;
        ...

        /* 誰發(fā)起worker誰就是leader */
        BecomeLockGroupLeader();

        ...

        // 注冊worker
        for (i = 0; i < pcxt->nworkers; ++i)
        {
        memcpy(worker.bgw_extra, &i, sizeof(int));
        if (!any_registrations_failed &&
        RegisterDynamicBackgroundWorker(&worker,
        &pcxt->worker[i].bgwhandle))
        {
                                /* 注冊成功 */
        }
        else
        {
        /* 當(dāng)超過了max_worker_processes的限制,則注冊失敗。設(shè)置any_registrations_failed = true,防止繼續(xù)注冊。
        any_registrations_failed = true;
        ...
        }

        ...

        }

        RegisterDynamicBackgroundWorker()

        這個函數(shù)主要是從BackgroundWorkerData中獲取一個可用的BackgroundWorkerSlot將其設(shè)置為已經(jīng)占用。

        然后給 server 發(fā)送一個PMSIGNAL_BACKGROUND_WORKER_CHANGE信號,通知server ,background worker 的狀態(tài)有變化。

        此時 server 遍歷BackgroundWorkerSlot,找到剛注冊的 background worker,為其創(chuàng)建進程。

             

        bool
            
        RegisterDynamicBackgroundWorker(BackgroundWorker *worker,
        BackgroundWorkerHandle **handle)
        {
        ...

        for (slotno = 0; slotno < BackgroundWorkerData->total_slots; ++slotno)
        {

        if (!slot->in_use)
        {
        ...
        }
        }

        /* If we found a slot, tell the postmaster to notice the change. */
        if (success)
        SendPostmasterSignal(PMSIGNAL_BACKGROUND_WORKER_CHANGE);

        if (success && handle)
        {
        *handle = palloc(sizeof(BackgroundWorkerHandle));
        (*handle)->slot = slotno;
        (*handle)->generation = generation;
        }
        }

        ParallelWorkerMain

        background worker 進程的入口函數(shù),屬于并行框架內(nèi)固定的函數(shù)。
        由這個函數(shù)調(diào)用實際的執(zhí)行函數(shù),對于select、update、Insert 語句,執(zhí)行函數(shù)就是ParallelQueryMain(),對于并行創(chuàng)建索引,執(zhí)行函數(shù)就是_bt_parallel_build_main()。
        調(diào)用CreateParallelContext()創(chuàng)建ParallelContext時,執(zhí)行函數(shù)的名稱作為參數(shù)傳遞,例如CreateParallelContext("postgres", "ParallelQueryMain", nworkers)
        這個函數(shù)的主要任務(wù)就是從共享內(nèi)存中反操作讀取信息,準備 background worker 執(zhí)行需要的環(huán)境。

             

        void
            
        ParallelWorkerMain(Datum main_arg)
        {
        ...

        /* attach 共享內(nèi)存,讀取 toc中的內(nèi)容 */
        seg = dsm_attach(DatumGetUInt32(main_arg));
        toc = shm_toc_attach(PARALLEL_MAGIC, dsm_segment_address(seg));

        /* 注冊退出時的回調(diào)函數(shù) */
        on_shmem_exit(ParallelWorkerShutdown, (Datum) 0);

        /* 設(shè)置錯誤消息隊列,將當(dāng)前worker設(shè)為發(fā)送者 */
        error_queue_space = shm_toc_lookup(toc, PARALLEL_KEY_ERROR_QUEUE, false);
        mq = (shm_mq *) (error_queue_space +
        ParallelWorkerNumber * PARALLEL_ERROR_QUEUE_SIZE);
        shm_mq_set_sender(mq, MyProc);
        mqh = shm_mq_attach(mq, seg, NULL);

        /* 從庫中查找background worker要執(zhí)行的函數(shù),例如ParallelQueryMain。*/
        entrypointstate = shm_toc_lookup(toc, PARALLEL_KEY_ENTRYPOINT, false);
        library_name = entrypointstate;
        function_name = entrypointstate + strlen(library_name) + 1;

        entrypt = LookupParallelWorkerFunction(library_name, function_name);

        ...

        /* 多次調(diào)用 shm_toc_lookup(shm_toc *toc, uint64 key, bool noError), 從 toc 中讀取狀態(tài)、參數(shù)等 */
        ...

        /* 執(zhí)行 ParallelQueryMain、_bt_parallel_build_main 等。*/
        entrypt(seg, toc);

        /* 退出并行模式 */
        ExitParallelMode();

        PopActiveSnapshot();

        EndParallelWorkerTransaction();

        DetachSession();
        ...
        }

        ParallelQueryMain

        并行 query 的入口函數(shù),在ParallelWorkerMain()中被調(diào)用。

             

        void
            
        ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
        {
        ...

        /* 設(shè)置tuple的接收者 */
        receiver = ExecParallelGetReceiver(seg, toc);

        /* 反序列化 PlannedStmt,ParamListInfo,SQL,并創(chuàng)建 QueryDesc */
        queryDesc = ExecParallelGetQueryDesc(toc, receiver, instrument_options);

        ...

        ExecutorStart(queryDesc, fpes->eflags);

        ...

        /* 初始化 PlanState,根據(jù)node類型調(diào)用不同的ExecXXXInitializeWorker();*/
        ExecParallelInitializeWorker(queryDesc->planstate, &pwcxt);

        ...

        ExecutorRun(queryDesc,
        ForwardScanDirection,
        fpes->tuples_needed < 0 ? (int64) 0 : fpes->tuples_needed,
        true);

        ....

        ExecutorEnd(queryDesc);

        ...
        }


        性能分析與優(yōu)化

        使用并行框架能提升執(zhí)行的效率,但是也帶來了額外的開銷,經(jīng)過實際的測試,這個開銷大概在5 ~ 10毫秒,也就是說啟動平行框架需要5毫秒以上。其中這個開銷的主要有以下幾部分組成:

        1.  序列化

        序列化包括 plan,planstate,snap,GUC、library 等。GUC 的序列化耗時最高。
        GUC序列化耗時包括兩部分,一部分是遍歷所有的 GUC 參數(shù),估算參數(shù)占用的內(nèi)存大小,一部分是將所有參數(shù)序列化。因為 TDSQL for PG 的參數(shù)有超過 800 個,內(nèi)存大小估算耗時大概 100 微秒,序列化耗時更高。
        GUC參數(shù)占用內(nèi)存的大小是不變的。因此可以不用每次啟動并行框架時計算一次,可以放在系統(tǒng)啟動階段時完成,比如放到build_guc_variables()函數(shù)中。
        因為每次執(zhí)行的參數(shù)可能會不一樣,所以不能在系統(tǒng)啟動階段完成,可以在系統(tǒng)啟動時序列化,啟動并行框架時判斷參數(shù)是否有變化,如果有就重新序列化并保存。
        2.  動態(tài)共享內(nèi)存的申請
        動態(tài)共享內(nèi)存的申請主要耗時點在函數(shù)dsm_create(),并行框架初始化過程中有兩個地方調(diào)用這個函數(shù)。
        一個是 GetSessionDsmHandle() 中調(diào)用,用來申請 session 內(nèi)部進程共享的內(nèi)存,每個session只需要調(diào)用一次;一次是為并行上下文申請共享內(nèi)存。每一次調(diào)用100微秒以上。
        3.  進程間數(shù)據(jù)傳輸
        background worker 進程將結(jié)果傳遞給 leader 進程的耗時也不可避免,跟數(shù)據(jù)量成正比。
        調(diào)整shmq 的緩沖區(qū)大小并不能提升性能。因此需要結(jié)合優(yōu)化器把這部分代價也加入到執(zhí)行計劃之中。


        -- 更多精彩 --

        技術(shù)干貨丨TDSQL 列存引擎 LibraDB 計算模型的設(shè)計與思考


        技術(shù)干貨丨 TDSQL for MySQL DDL執(zhí)行框架


        分布式數(shù)據(jù)庫時代,需要什么樣的產(chǎn)品?



        瀏覽 111
        點贊
        評論
        收藏
        分享

        手機掃一掃分享

        分享
        舉報
        評論
        圖片
        表情
        推薦
        點贊
        評論
        收藏
        分享

        手機掃一掃分享

        分享
        舉報
          
          

            1. 亚洲性情| 影音先锋日韩精品 | 中文在线观看免费视频 | 国产成人精品无码免费看夜聊软件 | 18国产片 |