使用 async_hooks 模塊進行請求追蹤

來源 | https://juejin.cn/post/6922274745622724616??
async_hooks 提供了追蹤異步資源的 API,這種異步資源是具有關聯(lián)回調的對象。
簡而言之,async_hooks 模塊可以用來追蹤異步回調。那么如何使用這種追蹤能力,使用的過程中又有什么問題呢?
認識 async_hooks
v8.x.x 版本下的 async_hooks 主要有兩部分組成,一個是 createHook 用以追蹤生命周期,一個是 AsyncResource 用于創(chuàng)建異步資源。
const { createHook, AsyncResource, executionAsyncId } = require('async_hooks')const hook = createHook({init (asyncId, type, triggerAsyncId, resource) {},before (asyncId) {},after (asyncId) {},destroy (asyncId) {}})hook.enable()function fn () {console.log(executionAsyncId())}const asyncResource = new AsyncResource('demo')asyncResource.run(fn)asyncResource.run(fn)asyncResource.emitDestroy()
創(chuàng)建一個包含在每個異步操作的 init、before、after、destroy 聲明周期執(zhí)行的鉤子函數(shù)的 hooks 實例。 啟用這個 hooks 實例。 手動創(chuàng)建一個類型為 demo 的異步資源。此時觸發(fā)了 init 鉤子,異步資源 id 為 asyncId,類型為 type(即 demo),異步資源的創(chuàng)建上下文 id 為 triggerAsyncId,異步資源為 resource。 使用此異步資源執(zhí)行 fn 函數(shù)兩次,此時會觸發(fā) before 兩次、after 兩次,異步資源 id 為 asyncId,此 asyncId 與 fn 函數(shù)內通過 executionAsyncId 取到的值相同。 手動觸發(fā) destroy 生命周期鉤子。
請求追蹤
通過 init 鉤子使得在同一條調用鏈上的異步資源共用一個存儲對象。 解析請求頭中 request-id,添加到當前異步調用鏈對應的存儲上。 改寫 http、https 模塊的 request 方法,在請求執(zhí)行時獲取當前當前的調用鏈對應存儲中的 request-id。
const http = require('http')const { createHook, executionAsyncId } = require('async_hooks')const fs = require('fs')// 追蹤調用鏈并創(chuàng)建調用鏈存儲對象const cache = {}const hook = createHook({init (asyncId, type, triggerAsyncId, resource) {if (type === 'TickObject') return// 由于在 Node.js 中 console.log 也是異步行為,會導致觸發(fā) init 鉤子,所以我們只能通過同步方法記錄日志fs.appendFileSync('log.out', `init ${type}(${asyncId}: trigger: ${triggerAsyncId})\n`);// 判斷調用鏈存儲對象是否已經初始化if (!cache[triggerAsyncId]) {cache[triggerAsyncId] = {}}// 將父節(jié)點的存儲與當前異步資源通過引用共享cache[asyncId] = cache[triggerAsyncId]}})hook.enable()// 改寫 httpconst httpRequest = http.requesthttp.request = (options, callback) => {const client = httpRequest(options, callback)// 獲取當前請求所屬異步資源對應存儲的 request-id 寫入 headerconst requestId = cache[executionAsyncId()].requestIdconsole.log('cache', cache[executionAsyncId()])client.setHeader('request-id', requestId)return client}function timeout () {return new Promise((resolve, reject) => {setTimeout(resolve, Math.random() * 1000)})}// 創(chuàng)建服務http.createServer(async (req, res) => {// 獲取當前請求的 request-id 寫入存儲cache[executionAsyncId()].requestId = req.headers['request-id']// 模擬一些其他耗時操作await timeout()// 發(fā)送一個請求http.request('http://www.baidu.com', (res) => {})res.write('hello\n')res.end()}).listen(3000)
陷阱
同時,我們也需要注意到一點,init 是異步資源創(chuàng)建的鉤子,不是異步回調函數(shù)創(chuàng)建的鉤子,只會在異步資源創(chuàng)建的時候執(zhí)行一次。
存儲初始化部分將 triggerAsyncId 保存下來,方便觀察異步調用的追蹤關系:
if (!cache[triggerAsyncId]) {cache[triggerAsyncId] = {id: triggerAsyncId}}
timeout 函數(shù)改為先進行一次長耗時再進行一次短耗時操作:
function timeout () {return new Promise((resolve, reject) => {setTimeout(resolve, [1000, 5000].pop())})}
重啟服務后,使用 postman (不用 curl 是因為 curl 每次請求結束會關閉連接,導致不能復現(xiàn))連續(xù)的發(fā)送兩次請求,可以觀察到以下輸出:
{ id: 1, requestId: '第二次請求的id' }{ id: 1, requestId: '第二次請求的id' }
即可發(fā)現(xiàn)在多并發(fā)且寫讀存儲的操作之間有耗時不固定的其他操作情況下,先到達服務器的請求存儲的值會被后到達服務器的請求執(zhí)行復寫掉,使得前一次請求讀取到錯誤的值。
當然,你可以保證在寫和讀之間不插入其他的耗時操作,但在復雜的服務中這種靠腦力維護的保障方式明顯是不可靠的。
此時,我們就需要使每次讀寫前,JS 都能進入一個全新的異步資源上下文,即獲得一個全新的 asyncId,避免這種復用。需要將調用鏈存儲的部分做以下幾方面修改:
const http = require('http')const { createHook, executionAsyncId } = require('async_hooks')const fs = require('fs')const cache = {}const httpRequest = http.requesthttp.request = (options, callback) => {const client = httpRequest(options, callback)const requestId = cache[executionAsyncId()].requestIdconsole.log('cache', cache[executionAsyncId()])client.setHeader('request-id', requestId)return client}// 將存儲的初始化提取為一個獨立的方法async function cacheInit (callback) {// 利用 await 操作使得 await 后的代碼進入一個全新的異步上下文await Promise.resolve()cache[executionAsyncId()] = {}// 使用 callback 執(zhí)行的方式,使得后續(xù)操作都屬于這個新的異步上下文return callback()}const hook = createHook({init (asyncId, type, triggerAsyncId, resource) {if (!cache[triggerAsyncId]) {// init hook 不再進行初始化return fs.appendFileSync('log.out', `未使用 cacheInit 方法進行初始化`)}cache[asyncId] = cache[triggerAsyncId]}})hook.enable()function timeout () {return new Promise((resolve, reject) => {setTimeout(resolve, [1000, 5000].pop())})}http.createServer(async (req, res) => {// 將后續(xù)操作作為 callback 傳入 cacheInitawait cacheInit(async function fn() {cache[executionAsyncId()].requestId = req.headers['request-id']await timeout()http.request('http://www.baidu.com', (res) => {})res.write('hello\n')res.end()})}).listen(3000)
async function middleware (ctx, next) {await Promise.resolve()cache[executionAsyncId()] = {}return next()}
NodeJs v14
這種使用 await Promise.resolve() 創(chuàng)建全新異步上下文的方式看起來總有些 “歪門邪道” 的感覺。好在 NodeJs v9.x.x 版本中提供了創(chuàng)建異步上下文的官方實現(xiàn)方式 asyncResource.runInAsyncScope。
更好的是,NodeJs v14.x.x 版本直接提供了異步調用鏈數(shù)據(jù)存儲的官方實現(xiàn),它會直接幫你完成異步調用關系追蹤、創(chuàng)建新的異步上線文、管理數(shù)據(jù)這三項工作!API 就不再詳細介紹,我們直接使用新 API 改造之前的實現(xiàn)
const { AsyncLocalStorage } = require('async_hooks')// 直接創(chuàng)建一個 asyncLocalStorage 存儲實例,不再需要管理 async 生命周期鉤子const asyncLocalStorage = new AsyncLocalStorage()const storage = {enable (callback) {// 使用 run 方法創(chuàng)建全新的存儲,且需要讓后續(xù)操作作為 run 方法的回調執(zhí)行,以使用全新的異步資源上下文asyncLocalStorage.run({}, callback)},get (key) {return asyncLocalStorage.getStore()[key]},set (key, value) {asyncLocalStorage.getStore()[key] = value}}// 改寫 httpconst httpRequest = http.requesthttp.request = (options, callback) => {const client = httpRequest(options, callback)// 獲取異步資源存儲的 request-id 寫入 headerclient.setHeader('request-id', storage.get('requestId'))return client}// 使用http.createServer((req, res) => {storage.enable(async function () {// 獲取當前請求的 request-id 寫入存儲storage.set('requestId', req.headers['request-id'])http.request('http://www.baidu.com', (res) => {})res.write('hello\n')res.end()})}).listen(3000)
可以看到,官方實現(xiàn)的 asyncLocalStorage.run API 和我們的第二版實現(xiàn)在結構上也很一致。
于是,在 Node.js v14.x.x 版本下,使用 async_hooks 模塊進行請求追蹤的功能很輕易的就實現(xiàn)了。

