如何利用 JavaScript 實(shí)現(xiàn)并發(fā)控制
一、前言
??在開發(fā)過程中,有時(shí)會(huì)遇到需要控制任務(wù)并發(fā)執(zhí)行數(shù)量的需求。
??例如一個(gè)爬蟲程序,可以通過限制其并發(fā)任務(wù)數(shù)量來(lái)降低請(qǐng)求頻率,從而避免由于請(qǐng)求過于頻繁被封禁問題的發(fā)生。
??接下來(lái),本文介紹如何實(shí)現(xiàn)一個(gè)并發(fā)控制器。
二、示例
??const?task?=?timeout?=>?new?Promise((resolve)?=>?setTimeout(()?=>?{
????resolve(timeout);
??},?timeout))
??const?taskList?=?[1000,?3000,?200,?1300,?800,?2000];
??async?function?startNoConcurrentControl()?{
????console.time(NO_CONCURRENT_CONTROL_LOG);
????await?Promise.all(taskList.map(item?=>?task(item)));
????console.timeEnd(NO_CONCURRENT_CONTROL_LOG);
??}
??startNoConcurrentControl();
??上述示例代碼利用 Promise.all 方法模擬6個(gè)任務(wù)并發(fā)執(zhí)行的場(chǎng)景,執(zhí)行完所有任務(wù)的總耗時(shí)為 3000 毫秒。
??下面會(huì)采用該示例來(lái)驗(yàn)證實(shí)現(xiàn)方法的正確性。
三、實(shí)現(xiàn)
??由于任務(wù)并發(fā)執(zhí)行的數(shù)量是有限的,那么就需要一種數(shù)據(jù)結(jié)構(gòu)來(lái)管理不斷產(chǎn)生的任務(wù)。
??隊(duì)列的「先進(jìn)先出」特性可以保證任務(wù)并發(fā)執(zhí)行的順序,在 JavaScript 中可以通過「數(shù)組來(lái)模擬隊(duì)列」:
??class?Queue?{
????constructor()?{
??????this._queue?=?[];
????}
????push(value)?{
??????return?this._queue.push(value);
????}
????shift()?{
??????return?this._queue.shift();
????}
????isEmpty()?{
??????return?this._queue.length?===?0;
????}
??}
??對(duì)于每一個(gè)任務(wù),需要管理其執(zhí)行函數(shù)和參數(shù):
??class?DelayedTask?{
????constructor(resolve,?fn,?args)?{
??????this.resolve?=?resolve;
??????this.fn?=?fn;
??????this.args?=?args;
????}
??}
??接下來(lái)實(shí)現(xiàn)核心的 TaskPool 類,該類主要用來(lái)控制任務(wù)的執(zhí)行:
??class?TaskPool?{
????constructor(size)?{
??????this.size?=?size;
??????this.queue?=?new?Queue();
????}
????addTask(fn,?args)?{
??????return?new?Promise((resolve)?=>?{
????????this.queue.push(new?DelayedTask(resolve,?fn,?args));
????????if?(this.size)?{
??????????this.size--;
??????????const?{?resolve:?taskResole,?fn,?args?}?=?this.queue.shift();
??????????taskResole(this.runTask(fn,?args));
????????}
??????})
????}
????pullTask()?{
??????if?(this.queue.isEmpty())?{
????????return;
??????}
??????if?(this.size?===?0)?{
????????return;
??????}
??????this.size++;
??????const?{?resolve,?fn,?args?}?=?this.queue.shift();
??????resolve(this.runTask(fn,?args));
????}
????runTask(fn,?args)?{
??????const?result?=?Promise.resolve(fn(...args));
??????result.then(()?=>?{
????????this.size--;
????????this.pullTask();
??????}).catch(()?=>?{
????????this.size--;
????????this.pullTask();
??????})
??????return?result;
????}
??}
??TaskPool 包含三個(gè)關(guān)鍵方法:
addTask: 將新的任務(wù)放入隊(duì)列當(dāng)中,并觸發(fā)任務(wù)池狀態(tài)檢測(cè),如果當(dāng)前任務(wù)池非滿載狀態(tài),則從隊(duì)列中取出任務(wù)放入任務(wù)池中執(zhí)行。 runTask: 執(zhí)行當(dāng)前任務(wù),任務(wù)執(zhí)行完成之后,更新任務(wù)池狀態(tài),此時(shí)觸發(fā)主動(dòng)拉取新任務(wù)的機(jī)制。 pullTask: 如果當(dāng)前隊(duì)列不為空,且任務(wù)池不滿載,則主動(dòng)取出隊(duì)列中的任務(wù)執(zhí)行。

??接下來(lái),將前面示例的并發(fā)數(shù)控制為2個(gè):
??const?cc?=?new?ConcurrentControl(2);
??async?function?startConcurrentControl()?{
????console.time(CONCURRENT_CONTROL_LOG);
????await?Promise.all(taskList.map(item?=>?cc.addTask(task,?[item])))
????console.timeEnd(CONCURRENT_CONTROL_LOG);
??}
??startConcurrentControl();
??執(zhí)行流程如下:

??最終執(zhí)行任務(wù)的總耗時(shí)為 5000 毫秒。
四、高階函數(shù)優(yōu)化參數(shù)傳遞
??await?Promise.all(taskList.map(item?=>?cc.addTask(task,?[item])))
??手動(dòng)傳遞每個(gè)任務(wù)的參數(shù)的方式顯得非常繁瑣,這里可以通過「高階函數(shù)實(shí)現(xiàn)參數(shù)的自動(dòng)透?jìng)鳌?/strong>:
??addTask(fn)?{
????return?(...args)?=>?{
??????return?new?Promise((resolve)?=>?{
????????this.queue.push(new?DelayedTask(resolve,?fn,?args));
????????if?(this.size)?{
??????????this.size--;
??????????const?{?resolve:?taskResole,?fn:?taskFn,?args:?taskArgs?}?=?this.queue.shift();
??????????taskResole(this.runTask(taskFn,?taskArgs));
????????}
??????})
????}
??}
??改造之后的代碼顯得簡(jiǎn)潔了很多:
??await?Promise.all(taskList.map(cc.addTask(task)))
五、優(yōu)化出隊(duì)操作
??數(shù)組一般都是基于一塊「連續(xù)內(nèi)存」來(lái)存儲(chǔ),當(dāng)調(diào)用數(shù)組的 shift 方法時(shí),首先是刪除頭部元素(時(shí)間復(fù)雜度 O(1)),然后需要將未刪除元素左移一位(時(shí)間復(fù)雜度 O(n)),所以 shift 操作的時(shí)間復(fù)雜度為 O(n)。

??由于 JavaScript 語(yǔ)言的特性,V8 在實(shí)現(xiàn) JSArray 的時(shí)候給出了一種空間和時(shí)間權(quán)衡的解決方案,在不同的場(chǎng)景下,JSArray 會(huì)在 FixedArray 和 HashTable 兩種模式間切換。
??在 hashTable 模式下,shift 操作省去了左移的時(shí)間復(fù)雜度,其時(shí)間復(fù)雜度可以降低為 O(1),即使如此,shift 仍然是一個(gè)耗時(shí)的操作。
??在數(shù)組元素比較多且需要頻繁執(zhí)行 shift 操作的場(chǎng)景下,可以通過 「reverse + pop」 的方式優(yōu)化。
??const?Benchmark?=?require('benchmark');
??const?suite?=?new?Benchmark.Suite;
??suite.add('shift',?function()?{
????let?count?=?10;
????const?arr?=?generateArray(count);
????while?(count--)?{
??????arr.shift();
????}
??})
??.add('reverse?+?pop',?function()?{
????let?count?=?10;
????const?arr?=?generateArray(count);
????arr.reverse();
????while?(count--)?{
??????arr.pop();
????}
??})
??.on('cycle',?function(event)?{
????console.log(String(event.target));
??})
??.on('complete',?function()?{
????console.log('Fastest?is?'?+?this.filter('fastest').map('name'));
????console.log('\n')
??})
??.run({
????async:?true
??})
??通過 benchmark.js 跑出的基準(zhǔn)測(cè)試數(shù)據(jù),可以很容易地看出哪種方式的效率更高:

??回顧之前 Queue 類的實(shí)現(xiàn),由于只有一個(gè)數(shù)組來(lái)存儲(chǔ)任務(wù),直接使用 reverse + pop 的方式,必然會(huì)影響任務(wù)執(zhí)行的次序。
??這里就需要引入雙數(shù)組的設(shè)計(jì),一個(gè)數(shù)組負(fù)責(zé)入隊(duì)操作,一個(gè)數(shù)組負(fù)責(zé)出隊(duì)操作。
??class?HighPerformanceQueue?{
????constructor()?{
??????this.q1?=?[];?//?用于?push?數(shù)據(jù)
??????this.q2?=?[];?//?用于?shift?數(shù)據(jù)
????}
????push(value)?{
??????return?this.q1.push(value);
????}
????shift()?{
??????let?q2?=?this.q2;
??????if?(q2.length?===?0)?{
????????const?q1?=?this.q1;
????????if?(q1.length?===?0)?{
??????????return;
????????}
????????q2?=?this.q2?=?q1.reverse();
??????}
??????return?q2.pop();
????}
????isEmpty()?{
??????if?(this.q1.length?===?0?&&?this.q2.length?===?0)?{
????????return?true;
??????}
??????return?false;
????}
??}
??最后通過基準(zhǔn)測(cè)試來(lái)驗(yàn)證優(yōu)化的效果:

