
我們都知道Node.js 採用的是單執行緒、基於事件驅動的非同步I/O 模型,其特性決定了它無法利用CPU 多核心的優勢,也不善於完成一些非I/O 類型的操作(例如執行腳本、AI 計算、圖像處理等),為了解決此類問題,Node.js 提供了常規的多進(線程)方案(關於進程、線程的討論,可參見筆者的另一篇文章Node.js 與並發模型),本文為大家介紹Node.js 的多進(線)程機制。
我們可以使用child_process模組來建立Node.js 的子進程,來完成一些特殊的任務(例如執行腳本),該模組主要提供了exec 、 execFile 、 fork 、 spwan等方法,下面我們就簡單介紹下這些方法的使用。
const { exec } = require('child_process');
exec('ls -al', (error, stdout, stderr) => {
console.log(stdout);
});此方法根據options.shell指定的可執行檔處理命令字串,在命令的執行過程中快取其輸出,直到命令執行完成後,再將執行結果以回調函數參數的形式傳回。
此方法的參數解釋如下:
command :將要執行的指令(如ls -al );
options :參數設定(可不指定),相關屬性如下:
cwd :子行程的目前工作目錄,預設取process.cwd()的值;
env :環境變數設定(為鍵值對物件),預設取process.env的值;
encoding :字元編碼,預設值為: utf8 ;
shell :處理指令字串的可執行文件, Unix上預設值為/bin/sh , Windows上預設值取process.env.ComSpec的值(如為空則為cmd.exe );例如:
const { exec } = require('child_process');
exec("print('Hello World!')", { shell: 'python' }, (error, stdout, stderr) => {
console.log(stdout);
});運行上面的範例將輸出Hello World! ,這等同於子程序執行了python -c "print('Hello World!')"命令,因此在使用該屬性時需要注意,所指定的可執行文件必須支援透過-c選項來執行相關語句。
註:碰巧Node.js也支援-c選項,但它等同於--check選項,只用來偵測指定的腳本是否有語法錯誤,並不會執行相關腳本。
signal :使用指定的AbortSignal 終止子進程,該屬性在v14.17.0 以上可用,例如:
const { exec } = require('child_process');
const ac = new AbortController();
exec('ls -al', { signal: ac.signal }, (error, stdout, stderr) => {});上例中,我們可透過呼叫ac.abort()來提前終止子程序。
timeout :子程序的逾時時間(如果該屬性的值大於0 ,那麼當子程序運行時間超過指定值時,將會給子程序發送屬性killSignal指定的終止訊號),單位毫米,預設值為0 ;
maxBuffer :stdout 或stderr 所允許的最大快取(二進位),如果超出,子程序將會被殺死,並且將會截斷任何輸出,預設值為1024 * 1024 ;
killSignal :子程序終止訊號,預設值為SIGTERM ;
uid :執行子程序的uid ;
gid :執行子程序的gid ;
windowsHide :是否隱藏子程序的控制台窗口,常用於Windows系統,預設值為false ;
callback :回呼函數,包含error 、 stdout 、 stderr三個參數:
error :如果命令列執行成功,值為null ,否則值為Error 的實例,其中error.code為子進程的退出的錯誤碼, error.signal為子進程終止的訊號;stdout和stderr :子進程的stdout和stderr ,依照encoding屬性的值編碼,如果encoding的值為buffer ,或stdout 、 stderr的值是一個無法辨識的字串,將依照buffer編碼。const { execFile } = require('child_process');
execFile('ls', ['-al'], (error, stdout, stderr) => {
console.log(stdout);
});該方法的功能類似於exec ,唯一的區別是execFile在預設情況下直接用指定的可執行檔(即參數file的值)處理命令,這使得其效率略高於exec (如果查看shell 的處理邏輯,筆者覺得這效率可忽略不計)。
此方法的參數解釋如下:
file :執行檔的名字或路徑;
args :執行檔的參數清單;
options :參數設定(可不指定),相關屬性如下:
shell :值為false時表示直接使用指定的可執行檔(即參數file的值)處理指令,值為true或其它字串時,作用等同於exec中的shell ,預設值為false ;windowsVerbatimArguments :在Windows中是否對參數進行引號或轉義處理,在Unix中將忽略該屬性,預設值為false ;cwd 、 env 、 encoding 、 timeout 、 maxBuffer 、 killSignal 、 uid 、 gid 、 windowsHide 、 signal在上文中已介紹,不再重述。callback :回呼函數,等同於exec中的callback ,此處不再闡述。
const { fork } = require('child_process');
const echo = fork('./echo.js', {
silent: true
});
echo.stdout.on('data', (data) => {
console.log(`stdout: ${data}`);
});
echo.stderr.on('data', (data) => {
console.error(`stderr: ${data}`);
});
echo.on('close', (code) => {
console.log(`child process exited with code ${code}`);
});該方法用於建立新的Node.js 實例以執行指定的Node.js 腳本,與父進程之間以IPC 方式進行通訊。
此方法的參數解釋如下:
modulePath :要執行的Node.js 腳本路徑;
args :傳遞給Node.js 腳本的參數清單;
options :參數設定(可不指定),相關屬性如:
detached :請參閱下文對spwan中options.detached的說明;
execPath :建立子程序的執行檔;
execArgv :傳遞給執行檔的字串參數列表,預設取process.execArgv的值;
serialization :進程間訊息的序號類型,可用值為json和advanced ,預設值為json ;
slient : 如果為true ,子進程的stdin 、 stdout和stderr將透過管道傳遞給父進程,否則將繼承父進程的stdin 、 stdout和stderr ;預設值為false ;
stdio :請參閱下文對spwan中options.stdio的說明。這裡要注意的是:
slient的值;ipc的選項(例如[0, 1, 2, 'ipc'] ),否則將拋出例外。屬性cwd 、 env 、 uid 、 gid 、 windowsVerbatimArguments 、 signal 、 timeout 、 killSignal在上文中已介紹,此處不再重述。
const { spawn } = require('child_process');
const ls = spawn('ls', ['-al']);
ls.stdout.on('data', (data) => {
console.log(`stdout: ${data}`);
});
ls.stderr.on('data', (data) => {
console.error(`stderr: ${data}`);
});
ls.on('close', (code) => {
console.log(`child process exited with code ${code}`);
});此方法為child_process模組的基礎方法, exec 、 execFile 、 fork最終都會呼叫spawn來建立子程序。
此方法的參數解釋如下:
command :可執行檔的名字或路徑;
args :傳遞給執行檔的參數清單;
options :參數設定(可不指定),相關屬性如下:
argv0 :傳送給子程序argv[0 ] 的值,預設取參數command的值;
detached :是否允許子進程可以獨立於父進程運行(即父進程退出後,子進程可以繼續運行),預設值為false ,其值為true時,各平台的效果如下所述:
Windows系統中,父進程退出後,子程序可以繼續運行,並且子程序擁有自己的控制台視窗(該特性一旦啟動後,在運行過程中將無法更改);Windows系統中,子進程將作為新進程會話組的組長,此刻不管子進程是否與父進程分離,子進程都可以在父進程退出後繼續運行。需要注意的是,如果子進程需要執行長時間的任務,並且想要父進程提前退出,需要同時滿足以下幾點:調用
unref方法從而將子進程從父進程的事件循環中剔除;detached設定為true ;stdio為ignore 。例如下面的例子:
// hello.js
const fs = require('fs');
let index = 0;
function run() {
setTimeout(() => {
fs.writeFileSync('./hello', `index: ${index}`);
if (index < 10) {
index += 1;
run();
}
}, 1000);
}
run();
// main.js
const { spawn } = require('child_process');
const child = spawn('node', ['./hello.js'], {
detached: true,
stdio: 'ignore'
});
child.unref();stdio :子程序標準輸入輸出配置,預設值為pipe ,值為字串或陣列:
pipe被轉換為['pipe', 'pipe', 'pipe'] ),可用值為pipe 、 overlapped 、 ignore 、 inherit ;stdin 、 stdout和stderr的配置,每一項目的可用值為pipe 、 overlapped 、 ignore 、 inherit 、 ipc 、Stream 物件、正整數(在父行程開啟的檔案描述子)、 null (如位於陣列的前三項,等同於pipe ,否則等同於ignore ) , undefined (如位於數組的前三項,等同於pipe ,否則等同於ignore )。屬性cwd 、 env 、 uid 、 gid 、 serialization 、 shell (值為boolean或string )、 windowsVerbatimArguments 、 windowsHide 、 signal 、 timeout 、 killSignal在上文中已介紹,此處不再重述。
上文對child_process模組中主要方法的使用進行了簡短介紹,由於execSync 、 execFileSync 、 forkSync 、 spwanSync方法是exec 、 execFile 、 spwan的同步版本,其參數並無任何差異,故不再重述。
透過cluster模組我們可以建立Node.js 進程集群,透過Node.js 進程進群,我們可以更加充分地利用多核心的優勢,將程式任務分發到不同的進程中以提高程式的執行效率;下面將透過範例為大家介紹cluster模組的使用:
const http = require('http');
const cluster = require('cluster');
const numCPUs = require('os').cpus().length;
if (cluster.isPrimary) {
for (let i = 0; i < numCPUs; i++) {
cluster.fork();
}
} else {
http.createServer((req, res) => {
res.writeHead(200);
res.end(`${process.pid}n`);
}).listen(8000);
}上例透過cluster.isPrimary屬性判斷(即判斷當前進程是否為主進程)將其分為兩個部分:
cluster.fork呼叫來建立相應數量的子進程;8000 )。運行上面的例子,並在瀏覽器中訪問http://localhost:8000/ ,我們會發現每次訪問返回的pid都不一樣,這說明了請求確實被分發到了各個子進程。 Node.js 預設採用的負載平衡策略是輪詢調度,可透過環境變數NODE_CLUSTER_SCHED_POLICY或cluster.schedulingPolicy屬性來修改其負載平衡策略:
NODE_CLUSTER_SCHED_POLICY = rr // 或none cluster.schedulingPolicy = cluster.SCHED_RR; // 或cluster.SCHED_NONE
另外需要注意的是,雖然每個子進程都創建了HTTP server,並都監聽了同一個端口,但並不代表由這些子進程自由競爭用戶請求,因為這樣無法保證所有子程序的負載達到平衡。所以正確的流程應該是由主進程監聽端口,然後將用戶請求根據分發策略轉發到具體的子進程進行處理。
由於進程之間是相互隔離的,因此進程之間一般透過共享記憶體、訊息傳遞、管道等機制進行通訊。 Node.js 則是透過消息传递來完成父子進程之間的通信,例如下面的例子:
const http = require('http');
const cluster = require('cluster');
const numCPUs = require('os').cpus().length;
if (cluster.isPrimary) {
for (let i = 0; i < numCPUs; i++) {
const worker = cluster.fork();
worker.on('message', (message) => {
console.log(`I am primary(${process.pid}), I got message from worker: "${message}"`);
worker.send(`Send message to worker`)
});
}
} else {
process.on('message', (message) => {
console.log(`I am worker(${process.pid}), I got message from primary: "${message}"`)
});
http.createServer((req, res) => {
res.writeHead(200);
res.end(`${process.pid}n`);
process.send('Send message to primary');
}).listen(8000);
}運行上面的例子,並訪問http://localhost:8000/ ,再查看終端,我們會看到類似下面的輸出:
I am primary(44460), I got message from worker: "Send message to primary" I am worker(44461), I got message from primary: "Send message to worker" I am primary(44460), I got message from worker: "Send message to primary" I am worker(44462), I got message from primary: "Send message to worker"
利用該機制,我們可以監聽各子進程的狀態,以便在某個子進程出現意外後,能夠及時對其進行幹預,以保證服務的可用性。
cluster模組的介面非常簡單,為了節省篇幅,這裡只對cluster.setupPrimary方法做一些特別聲明,其它方法請查看官方文件:
cluster.setupPrimary調用後,相關設定將同步到在cluster.settings屬性中,並且每次呼叫都基於目前cluster.settings屬性的值;cluster.setupPrimary呼叫後,對已執行的子程序沒有影響,只影響後續的cluster.fork呼叫;cluster.setupPrimary呼叫後,不影響後續傳遞給cluster.fork呼叫的env參數;cluster.setupPrimary只能在主程序中使用。前文我們對cluster模組進行了介紹,透過它我們可以創建Node.js 進程集群以提高程式的運行效率,但cluster基於多進程模型,進程間高成本的切換以及進程間資源的隔離,會隨著子進程數量的增加,容易導致因係統資源緊張而無法回應的問題。為了解決此類問題,Node.js 提供worker_threads ,以下我們透過具體的範例對此模組的使用進行簡單介紹:
// server.js
const http = require('http');
const { Worker } = require('worker_threads');
http.createServer((req, res) => {
const httpWorker = new Worker('./http_worker.js');
httpWorker.on('message', (result) => {
res.writeHead(200);
res.end(`${result}n`);
});
httpWorker.postMessage('Tom');
}).listen(8000);
// http_worker.js
const { parentPort } = require('worker_threads');
parentPort.on('message', (name) => {
parentPort.postMessage(`Welcone ${name}!`);
});上例展示了worker_threads的簡單使用,在使用worker_threads的過程中,需要注意以下幾點:
透過worker_threads.Worker建立Worker 實例,其中Worker 腳本既可以為一個獨立的JavaScript文件,也可以為字符串,例如上例可修改為:
const code = "const { parentPort } = require('worker_threads'); parentPort.on('message', (name) => {parentPort.postMessage(`Welcone ${name}!` );})";
const httpWorker = new Worker(code, { eval: true });透過worker_threads.Worker建立Worker 實例時,可以透過指定workerData的值來設定Worker 子執行緒的初始元數據,例如:
// server.js
const { Worker } = require('worker_threads');
const httpWorker = new Worker('./http_worker.js', { workerData: { name: 'Tom'} });
// http_worker.js
const { workerData } = require('worker_threads');
console.log(workerData);透過worker_threads.Worker建立Worker 實例時,可透過設定SHARE_ENV以實現在Worker 子執行緒與主執行緒之間共用環境變數的需求,例如:
const { Worker, SHARE_ENV } = require('worker_threads ');
const worker = new Worker('process.env.SET_IN_WORKER = "foo"', { eval: true, env: SHARE_ENV });
worker.on('exit', () => {
console.log(process.env.SET_IN_WORKER);
});不同於cluster中進程間的通訊機制, worker_threads所採用的MessageChannel 來進行線程間的通訊: W
parentPort.postMessage方法傳送訊息給主線程,並透過監聽parentPort的message事件來處理來自主執行緒的訊息;httpWorker ,以下皆以此取代Worker 子執行緒)的postMessage方法傳送訊息給httpWorker ,並透過監聽httpWorker的message事件來處理來自Worker 子執行緒的訊息。在Node.js 中,無論是cluster創建的子進程,還是worker_threads創建的Worker 子線程,它們都擁有屬於自己的V8 實例以及事件循環,所不同的是:
儘管看起來Worker 子執行緒比子程序更有效率,但Worker 子執行緒也有不足的地方,即cluster提供了負載平衡,而worker_threads則需要我們自行完成負載平衡的設計與實作。
本文介紹了Node.js 中child_process 、 cluster和worker_threads三個模組的使用,透過這三個模組,我們可以充分利用CPU 多核心的優勢,並以多進(線)程的模式來高效地解決一些特殊任務(如AI、圖片處理等)的運作效率。每個模組都有其適用的場景,文中僅對其基本使用進行了說明,如何結合自己的問題進行高效地運用,還需要大家自行摸索。最後,本文若有紕漏之處,也望大家能指正,祝大家快樂編碼每一天。