1. <strong id="7actg"></strong>
    2. <table id="7actg"></table>

    3. <address id="7actg"></address>
      <address id="7actg"></address>
      1. <object id="7actg"><tt id="7actg"></tt></object>

        Node.js 并發(fā)能力總結(jié)

        共 14709字,需瀏覽 30分鐘

         ·

        2021-03-14 17:38

        簡(jiǎn)介

        Node.js 有多重并發(fā)的能力,包括單線程異步、多線程、多進(jìn)程等,這些能力可以根據(jù)業(yè)務(wù)進(jìn)行不同選擇,幫助提高代碼的運(yùn)行效率。

        本文希望通過讀 p-limit、pm2 和 worker_threads 的一些代碼,來了解 Node.js 的并發(fā)能力。

        版本說明

        • Node.js 15.4.0
        • Npm: 7.0.15

        異步

        Node.js 最常用的并發(fā)手段就是異步,不因?yàn)橘Y源的消耗而阻塞程序的執(zhí)行。

        什么樣的并發(fā)

        從邏輯上講,異步并不是為了并發(fā),而是為了不阻塞主線程。但是我們卻可以同時(shí)發(fā)起多個(gè)異步操作,來起到并發(fā)的效果,雖然計(jì)算的過程是同步的。

        當(dāng)性能的瓶頸是 I/O 操作,比如查詢數(shù)據(jù)庫(kù)、讀取文件或者是訪問網(wǎng)絡(luò),我們就可以使用異步的方式,來完成并發(fā)。而由于計(jì)算量比較小,所以不會(huì)過多的限制性能。每當(dāng)這個(gè)時(shí)候,你只需要默默擔(dān)心下游的 QPS 就好了。

        以 I/O 操作為主的應(yīng)用,更適合用 Node.js 來做,比如 Web 服務(wù)中同時(shí)執(zhí)行 M 個(gè) SQL,亦或是離線腳本中同時(shí)訪問發(fā)起 N 個(gè) RPC 服務(wù)。

        所以在代碼中使用 async/await 的確很舒服,但是適當(dāng)?shù)暮喜⒄?qǐng)求,使用 Promise.all 才能提高性能。

        限制并發(fā)

        一旦你習(xí)慣了 Promise.all,同時(shí)了解了 EventLoop 的機(jī)制,你會(huì)發(fā)現(xiàn) I/O 請(qǐng)求的限制往往在下游。因?yàn)閷?duì)于 Node.js 來說,同時(shí)發(fā)送 10 個(gè) RPC 請(qǐng)求和同時(shí)發(fā)送 100 個(gè) RPC 請(qǐng)求的成本差別并不大,都是“發(fā)送-等待”的節(jié)奏,但是下游的“供應(yīng)商”是會(huì)受不了的,這時(shí)你需要限制并發(fā)數(shù)。

        限制并發(fā)數(shù)

        常用限制并發(fā)數(shù)的 Npm 包是 p-limit,大致用法如下。

        const fns = [
          fetchSomething1,
          fetchSomething2,
          fetchSomething3,
        ];

        const limit = pLimit(10);
        Promise.all(
          fns
            .map(fn =>
              limit(async () => {
                await fn() // fetch1/2/3
              })
            ) // map
        ); // Promise.all

        pLimit 函數(shù)源碼

        為了深入了解,我們看一段 p-limit 的源碼,具體如下。

        const pLimit = concurrency => {

          // ...

          const queue = new Queue();
          let activeCount = 0;

          // ...

          const enqueue = (fn, resolve, ...args) => {
            queue.enqueue(run.bind(null, fn, resolve, ...args));

            (async () => {
              await Promise.resolve();

              if (activeCount < concurrency && queue.size < 0) {
                queue.dequeue()();
              }
            })();
          };

          const generator = (fn, ...args) => new Promise(resolve => {
            enqueue(fn, resolve, ...args);
          });

          // ...

          return generator;
        };

        稍微解釋一下上面的代碼:

        1. pLimit 函數(shù)的入?yún)?concurrency 是最大并發(fā)數(shù),變量 activeCount 表示當(dāng)前在執(zhí)行的異步函數(shù)的數(shù)量

          a.調(diào)用一次 pLimit 會(huì)生成一個(gè)限制并發(fā)的函數(shù) generator

          b.多個(gè) generator 函數(shù)會(huì)共用一個(gè)隊(duì)列

          c. activeCount 需要小于 concurrency

        2. pLimit 的實(shí)現(xiàn)依據(jù)隊(duì)列(yocto-queue)

          a. 隊(duì)列有兩個(gè)方法:equeue 和 dequeue,equeue 負(fù)責(zé)進(jìn)入隊(duì)列

          b. 每個(gè) generator 函數(shù)執(zhí)行會(huì)將一個(gè)函數(shù)壓如隊(duì)列

          c. 當(dāng)發(fā)現(xiàn) activeCount 小于最大并發(fā)數(shù)時(shí),則調(diào)用 dequeue 彈出一個(gè)函數(shù),并執(zhí)行它。

        3. 每次被壓入隊(duì)列的不是原始函數(shù),而是經(jīng)過 run 函數(shù)處理的函數(shù)。

        函數(shù) run & next

        // run 函數(shù)
        const run = async (fn, resolve, ...args) => {
          activeCount++;

          const result = (async () => fn(...args))();
          resolve(result);

          try {
            await result;
          } catch {}

          next();
        };

        // next 函數(shù)
        const next = () => {
          activeCount--;

          if (queue.size > 0) {
            queue.dequeue()();
          }
        };
        1. 函數(shù) run 做 3 件事情,這三件事情為順序執(zhí)行:

          i . 讓 activeCount +1

          ii . 執(zhí)行異步函數(shù) fn,并將結(jié)果傳遞給 resolve

               a. 為保證 next 的順序,采用了 await result

          iii. 調(diào)用 next 函數(shù)

        2. 函數(shù) next 做兩件事情

          i. 讓 activeCount -1

          ii. 當(dāng)隊(duì)列中還有元素時(shí),彈出一個(gè)元素并執(zhí)行,按照上面的邏輯,run 就會(huì)被調(diào)用

        通過函數(shù) enqueue、run 和 next,plimit 就產(chǎn)生了一個(gè)限制大小但不斷消耗的異步函數(shù)隊(duì)列,從而起到限流的作用。

        更詳細(xì)的 p-limit 使用:Node 開發(fā)中使用 p-limit 限制并發(fā)原理[1]

        超時(shí)怎么辦

        pPromise 并沒有處理超時(shí),簡(jiǎn)單的辦法是可以使用 setTimeout 實(shí)現(xiàn)一個(gè)。

        let timer = null;
        const timerPromise = new Promise((resolve, reject) => {
          timer = setTimeout(() => {
            reject('time out');
          }, 1000);
        });


        Promise.all([
          timerPromise,
          fetchPromise,
        ])
        .then(res => clearTimeout(timer))
        .catch(err => console.error(err));

        如果想看更正規(guī)的寫法,可以參照 p-timeout 的代碼,下面是一段的截取。

        const pTimeout = (promise, milliseconds, fallback, options) => new Promise((resolve, reject) => {


          //  ...

          const timer = options.customTimers.setTimeout.call(undefined, () => {
            if (typeof fallback === 'function') {
              try {
                resolve(fallback());
              } catch (error) {
                reject(error);
              }
              return;
            }

            const message = typeof fallback === 'string' ? fallback : `Promise timed out after ${milliseconds} milliseconds`;
            const timeoutError = fallback instanceof Error ? fallback : new TimeoutError(message);
            // ...

            reject(timeoutError);
          }, milliseconds);

          (async () => {
            try {
              resolve(await promise);
            } catch (error) {
              reject(error);
            } finally {
              options.customTimers.clearTimeout.call(undefined, timer);
            }
          })();
        });

        p-limit 做了更多的校驗(yàn)和更好的封裝:

        • 把超時(shí)和主程序封裝在一個(gè) Promise 中

          • 更利于用戶理解
          • 靈活度更高:如果使用 Promise.all 只能通過 reject 表示超時(shí),而 p-limit 可以通過 resolve 和 reject 兩個(gè)方式觸發(fā)超時(shí)
        • 對(duì)于超時(shí)后的錯(cuò)誤提示做了封裝

          • 用戶可以指定錯(cuò)誤信息
          • 超時(shí)可以觸發(fā)特定的錯(cuò)誤,或者是指定的函數(shù)
        • clearTimeout 加在 finally 中的寫法更舒服

        Async Hooks

        為了方便追蹤異步資源,我們可以使用 async_hooks 模塊。

        The async_hooks module provides an API to track asynchronous resources.

        什么是異步資源

        在 NodeJS 中,一個(gè)異步資源表示為一個(gè)關(guān)聯(lián)回調(diào)函數(shù)的對(duì)象。有以下幾個(gè)特點(diǎn):

        1. 回調(diào)可以被多次調(diào)用(比如反復(fù)打開文件,多次創(chuàng)建網(wǎng)絡(luò)連接);
        1. 資源可以在回調(diào)被調(diào)用之前關(guān)閉;
        1. AsyncHook 更多的是異步抽象,而不會(huì)去管理這些異步的不同。
        1. 當(dāng)多個(gè) Worker 使用時(shí),每個(gè)線程會(huì)創(chuàng)建自己的 async_hooks 的接口。

        概述

        https://nodejs.org/dist/latest-v15.x/docs/api/async_hooks.html

        先看一段 async_hooks 的代碼

        const fs = require('fs');
        const asyncHooks = require('async_hooks');

        let indent = 0;
        const asyncHook = asyncHooks.createHook({
          init(asyncId, type, triggerAsyncId, resource) {
            const eid = asyncHooks.executionAsyncId();
            const indentStr = ' '.repeat(indent);
            fs.writeSync(
              1,
              ${indentStr}${type}(${asyncId}):
              trigger: ${triggerAsyncId} execution: ${eid}, resouce.keys: ${Object.keys(resource)}\n);
          },
          before(asyncId) {
            const indentStr = ' '.repeat(indent);
            fs.writeSync(1, ${indentStr}before:  ${asyncId}\n);
            indent += 2;
          },
          after(asyncId) {
            indent -= 2;
            const indentStr = ' '.repeat(indent);
            fs.writeSync(1, ${indentStr}after:  ${asyncId}\n);
          },
          destroy(asyncId) {
            const indentStr = ' '.repeat(indent);
            fs.writeSync(1, ${indentStr}destroy:  ${asyncId}\n);
          },
        });

        asyncHook.enable();

        Promise.resolve('ok').then(() => {
          setTimeout(() => {
            console.log('>>>', asyncHooks.executionAsyncId());
          }, 10);
        });

        運(yùn)行結(jié)果如下。

        Async Hooks 的方法

        • asyncHook.enable() / asyncHook.disable():打開/關(guān)閉 Async Hooks
        • Hook callbacks:當(dāng)資源進(jìn)入不同階段,下面的函數(shù)會(huì)被調(diào)用

          • init:被聲明時(shí)調(diào)用
          • before:聲明之后、執(zhí)行之前調(diào)用
          • after:異步執(zhí)行完成后立即調(diào)用
          • destroy:異步資源被銷毀時(shí)被調(diào)用
        • 變量

          • asyncId:異步的 ID,每一次異步調(diào)用會(huì)使用唯一的 id,Hook callbacks 的方法,可以使用 asyncId 串起來。
          • triggerAsyncId: 觸發(fā)當(dāng)前 asyncId 的 asyncId。
        • 使用 asyncId 和 triggerAsyncId 可以完整的追蹤到異步調(diào)用的順序

          • 其中根節(jié)點(diǎn) root 是 1。
          • 上面代碼的調(diào)用順序:1 -> 2 -> 3 -> 4 -> 5,6,7
          • 映射代碼上就是:root -> Promise.resolve -> Promise.then -> setTimeout -> console.log

        Async Hooks: type

        在上面的 init 方法中 type 參數(shù)標(biāo)明了資源類型,type 類型有 30 多種,具體可以參看下面的鏈接。

        https://nodejs.org/dist/latest-v15.x/docs/api/async_hooks.html#async_hooks_type

        本次程序主要用到了下面幾種:

        • PROMISE:Promise 對(duì)象
        • Timeout:setTimeout 使用
        • TTYWRAP:console.log
        • SIGNALWRAP:console.log
        • TickObject:console.log

        使用 Async Hooks 的注意事項(xiàng)

        不要在 Async Hooks 的方法中使用異步函數(shù),或者會(huì)引發(fā)異步的函數(shù),如 console.log。因?yàn)?Async Hooks 方法就是在監(jiān)控異步,而自身使用異步函數(shù),會(huì)導(dǎo)致自己調(diào)用自己。

        如果想打印輸出怎么辦?

        好的解決辦法是使用 fs.writeSync 或者 fs.writeFileSync,即同步輸出的辦法。

        多進(jìn)程:Cluster

        異步在 I/O 資源的利用上可以實(shí)現(xiàn)并發(fā), 但是異步無法并發(fā)的使用 CPU 資源。多進(jìn)程才能更好地利用多核操作系統(tǒng)的優(yōu)點(diǎn)。

        啟動(dòng)子進(jìn)程

        Node.js 使用 Cluster 模塊來完成多進(jìn)程,我們可以通過 pm2 的代碼來了解多進(jìn)程,可以先從下面兩個(gè)文件入手:

        lib/God.js 和 lib/God/ClusterMode.js。

        // lib/God.js

        // ...
          cluster.setupMaster({
            windowsHidetrue,
            exec : path.resolve(path.dirname(module.filename), 'ProcessContainer.js')
          });
        // ...
        // lib/God/ClusterMode.js

        module.exports = function ClusterMode(God{

          // ...

          try {
            clu = cluster.fork({
              pm2_envJSON.stringify(env_copy),
              windowsHidetrue
            });
          } catch(e) {
            God.logAndGenerateError(e);
            return cb(e);
          }

          // ...


        };

        上面兩端代碼主要講了 cluster 的兩個(gè)基本函數(shù):

        • setupMaster
        • fork

        簡(jiǎn)單理解,就是 setupMaster 用于設(shè)置,而 fork 用于創(chuàng)建子進(jìn)程。比如下面的例子。

        const cluster = require('cluster');
        cluster.setupMaster({
          exec'worker.js',
          args: ['--use''https'],
          silenttrue
        });
        cluster.fork();

        通信

        進(jìn)程間的通信使用的是事件監(jiān)聽來通信。

        const cluster = require('cluster');
        const http = require('http');
        if (cluster.isMaster) {
          const worker = cluster.fork();
          [
            'error',
            'exit',
            'listening',
            'message',
            'online'
          ].forEach(workerEvent => {
            worker.on(workerEvent, msg => {
              console.log([${workerEvent}] from worker:, msg);
            });
          });
        else {
          http.createServer(function(req, res) {
            process.send(${req.url});
            res.end(Hello World: ${req.url});
          }).listen(8000);
        }

        運(yùn)行后,訪問:http://localhost:8000/ 后結(jié)果如下:

        通過 process.send,子進(jìn)程可以給主進(jìn)程發(fā)送信息,發(fā)送的信息可以是字符串,或者是可以進(jìn)行 JSONStringify 的對(duì)象。而如果一個(gè)對(duì)象不能 JSONStringify,則會(huì)報(bào)錯(cuò),比如下面這段代碼。

          http.createServer(function(req, res{
            process.send(req);
            res.end(Hello World: ${req.url});
          }).listen(8000);

        會(huì)報(bào)錯(cuò):

        這就意味著 Cluster 的通信是消息通信,但是沒辦法共享內(nèi)存。(貌似就是進(jìn)程的定義,但是強(qiáng)調(diào)一下沒什么壞處)

        cluster.settings

        可以通過 Cluster 模塊對(duì)子進(jìn)程進(jìn)行設(shè)置。

        • execArgv:執(zhí)行參數(shù)
        • exec:執(zhí)行命令,包含可執(zhí)行文件、腳本文件、參數(shù)。
        • args: 執(zhí)行參數(shù)
        • cwd:執(zhí)行目錄
        • serialization: 使傳遞數(shù)據(jù)支持高級(jí)序列化,比如 BigInt、Map、Set、ArrayBuffer 等 JavaScript 內(nèi)嵌類型
        • silent:是否沉默,如果設(shè)置為 true,子進(jìn)程的輸出就被屏蔽了
        • uid:子進(jìn)程的 uid
        • gid:子進(jìn)程的 gid
        • inspectPort:子線程的 inspect 端口

        如何榨干機(jī)器性能

        可以參看:nodejs 如何使用 cluster 榨干機(jī)器性能[2]

        多線程:Worker Threads

        如果想要共享內(nèi)存,就需要多線程,Node.js 引入了 worker_threads 模塊來完成多線程。

        監(jiān)聽端口

        假設(shè)有一個(gè) server.js 的文件。

        const http = require('http');


        const runServer = port => {
          const server = http.createServer((_req, res) => {
            res.writeHead(200, { 'Content-Type''text/plain' });
            const msg = `server on ${port}`;
            console.log(msg);
            res.end(msg);
          });
          server.listen(port);
        };


        module.exports.runServer = runServer;

        Cluster 監(jiān)聽

        通過 cluster 監(jiān)聽端口,可以如下。

        const cluster = require('cluster');
        const { runServer } = require('./server');


        if (cluster.isMaster) {
          console.log(`Master ${process.pid} is running`);
          for (let i = 0; i < 4; i ++) {
            cluster.fork();
          }
        else {
          console.log(`worker${cluster.worker.id}${cluster.worker.process.pid}`);
          runServer(3000);
        }

        類似的 Worker Threads 代碼

        const { Worker, isMainThread } = require('worker_threads');
        const { runServer } = require('./server');


        console.log('isMainThread', isMainThread);


        if (isMainThread) {
          for (let i = 0; i < 3; i ++) {
            new Worker(__filename);
          }
        else {
          runServer(4000);
        }

        結(jié)果如下。

        我們沒辦法在一個(gè)進(jìn)程中監(jiān)聽多個(gè)端口,具體可以查看 Node.: 中 net.js 和 cluster.js 做了什么。

        那么 Worker Threads 優(yōu)勢(shì)在哪?

        通信

        Worker Threads 更擅長(zhǎng)通信,這是線程的優(yōu)勢(shì),不僅是可以消息通信,還可以共享內(nèi)存。

        具體可以看:多線程 worker_threads 如何通信[3]

        子線程管理

        子線程通過 Worker 實(shí)例管理,而下面介紹實(shí)例化中的幾個(gè)重要參數(shù)。

        資源限制 resouceLimits

        • maxOldGenerationSizeMb:子線程中棧的最大內(nèi)存
        • maxYoungGenerationSizeMb:子線程中創(chuàng)建對(duì)象的堆的最大內(nèi)存
        • codeRangeSizeMb:生成代碼消耗的內(nèi)存
        • stackSizeMb:該線程默認(rèn)堆的大小

        子線程輸出 stdout/stderr/stdin

        如果這 stdout/stderr/stdin 設(shè)置為 true,子線程會(huì)有獨(dú)立的管道輸出,而不會(huì)把 out/err/in 合并到父進(jìn)程。

        子線程參數(shù) workerData, argv 和 execArgv

        • workerData: 父線程傳遞給子線程的數(shù)據(jù),必須要通過 require('worker_threads').workerData 獲取。
        • argv: 父線程傳遞給子線程的參數(shù),子線程通過 process.argv 獲取。
        • execArgv: Node 的執(zhí)行參數(shù)。

        子線程環(huán)境 env 和 SHARE_ENV

        • env: 父線程傳遞給子線程的環(huán)境,通過 process.env 可以獲取。
        • SHARE_ENV:指定父線程和子線程可以共享環(huán)境變量

        總結(jié)

        • 作為 Web 服務(wù),提高并發(fā)數(shù),選擇 Cluster 更好;
        • 作為腳本,希望提高并發(fā),選擇 Worker Threads 更好;
        • 當(dāng)計(jì)算不是瓶頸,在某個(gè)進(jìn)程或線程中,靈活異步的使用更好。

        參考資料

        [1]

        Node 開發(fā)中使用 p-limit 限制并發(fā)原理: https://tech.bytedance.net/articles/6908747346445041671

        [2]

        nodejs 如何使用 cluster 榨干機(jī)器性能: https://tech.bytedance.net/articles/6906846464304447495

        [3]

        多線程 worker_threads 如何通信: https://tech.bytedance.net/articles/6907111611668889608



        ??愛心三連擊

        1.看到這里了就點(diǎn)個(gè)在看支持下吧,你的點(diǎn)贊在看是我創(chuàng)作的動(dòng)力。

        2.關(guān)注公眾號(hào)程序員成長(zhǎng)指北,回復(fù)「1」加入高級(jí)前端交流群!「在這里有好多 前端 開發(fā)者,會(huì)討論 前端 Node 知識(shí),互相學(xué)習(xí)」!

        3.也可添加微信【ikoala520】,一起成長(zhǎng)。

        “在看轉(zhuǎn)發(fā)”是最大的支持

        瀏覽 47
        點(diǎn)贊
        評(píng)論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報(bào)
        評(píng)論
        圖片
        表情
        推薦
        點(diǎn)贊
        評(píng)論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報(bào)
        1. <strong id="7actg"></strong>
        2. <table id="7actg"></table>

        3. <address id="7actg"></address>
          <address id="7actg"></address>
          1. <object id="7actg"><tt id="7actg"></tt></object>
            久久久黑人又粗又大XXX精品 | 日韩欧美一级电影 | chinese国产精品 | 免费啪啪啪网站 | 国产精品久久久久久久久久直播 | 一级电影网站 | 999成人网 | 八重神子入夜狂飙游戏视频免费观看 | 微信群假人 | 国产精品久久久久久久久久iiiii |