Node.js 是如何工作的
// Asynchronously read the package.json that sits next to this module and
// print it; the callback runs once the read has finished. A failed read is
// deliberately ignored.
fs.readFile(path.join(__dirname, './package.json'), (readError, buffer) => {
  if (!readError) {
    console.log(buffer.toString());
  }
});
简介:worker_threads
/** Node-style callback: `err` is set on failure, `result` carries a worker message. */
type WorkerCallback = (err: any, result?: any) => any;

/**
 * Starts a worker thread and routes its outcome to a Node-style callback.
 *
 * @param path - Script the worker executes.
 * @param cb - Called as `cb(null, message)` for every message the worker
 *   posts, and as `cb(error)` once if the worker fails.
 * @param workerData - Initial data handed to the worker (defaults to `null`).
 * @returns The created worker, so the caller can post messages or terminate it.
 */
export function runWorker(path: string, cb: WorkerCallback, workerData: object | null = null) {
  const worker = new Worker(path, { workerData });

  // An uncaught exception in the worker emits 'error' and then 'exit' with a
  // non-zero code. Remember that the failure was already delivered so the
  // caller is not notified twice for the same crash.
  let errorReported = false;

  worker.on('message', cb.bind(null, null));
  worker.on('error', (error) => {
    errorReported = true;
    cb(error);
  });
  worker.on('exit', (exitCode) => {
    // A zero exit code is a normal shutdown and is not reported.
    if (exitCode !== 0 && !errorReported) {
      cb(new Error(`Worker has stopped with code ${exitCode}`));
    }
  });

  return worker;
}
// Events a Worker instance emits; the handlers are empty placeholders.
worker
  // The worker threw an uncaught exception.
  .on('error', (workerError) => {})
  // The worker stopped; the handler receives its exit code.
  .on('exit', (code) => {})
  // The worker started executing its script.
  .on('online', () => {})
  // The worker posted a message to the parent.
  .on('message', (payload) => {});
在线程之间交换数据
port.postMessage(data[, transferList])
数据参数
data 参数会按照结构化克隆算法(structured clone algorithm)进行复制:该算法递归遍历输入对象来完成克隆,同时维护一份已访问引用的映射,以避免在循环引用中无限遍历。
在线程之间共享内存
import { parentPort } from 'worker_threads';
// On every message from the parent: allocate a shared buffer of 100 int32
// slots, fill it with random integers in [0, 30], and send the typed-array
// view back to the parent.
parentPort.on('message', () => {
  const elementCount = 100;
  const shared = new SharedArrayBuffer(elementCount * Int32Array.BYTES_PER_ELEMENT);
  const arr = new Int32Array(shared);
  arr.forEach((_, index) => {
    arr[index] = Math.round(Math.random() * 30);
  });
  parentPort.postMessage({ arr });
});
import path from 'path';
import { runWorker } from '../run-worker';
// Start the worker and, once it answers, write into the array it sent back.
const worker = runWorker(path.join(__dirname, 'worker.js'), (err, result) => {
  // On failure the callback receives only the error, so the result must not
  // be destructured in the parameter list: destructuring `undefined` would
  // throw a TypeError before this check could run.
  if (err) {
    return null;
  }
  // `arr` is expected to be the typed array the worker posted.
  result.arr[0] = 5;
});
// The worker only does its work in response to a message, so send an empty one.
worker.postMessage({});
transferList参数
创建通信渠道
import { parentPort } from 'worker_threads';
// Placeholder payload; whatever is put here is sent to the parent thread.
const payload = {
  // ...
};
parentPort.postMessage(payload);
import path from 'path';
import { Worker, MessageChannel } from 'worker_threads';
// Create the worker plus a dedicated two-ended channel for talking to it.
const worker = new Worker(path.join(__dirname, 'worker.js'));
const channel = new MessageChannel();
const port1 = channel.port1;
const port2 = channel.port2;

// This side keeps port1 and logs whatever arrives on it.
port1.on('message', (incoming) => {
  console.log('message from worker:', incoming);
});

// port2 is handed to the worker; it is listed in the transfer list as well
// as in the message itself.
worker.postMessage({ port: port2 }, [port2]);
import { parentPort, MessagePort } from 'worker_threads';
// The parent sends a message carrying a MessagePort; reply through that port.
parentPort.on('message', ({ port }: { port: MessagePort }) => {
  port.postMessage('heres your message!');
});
使用 worker 的两种方式
import { parentPort } from 'worker_threads';
// Build [0, 1, ..., 9] as soon as the worker starts and send it to the
// parent without waiting for any incoming message.
const collection = Array.from({ length: 10 }, (_, index) => index);
parentPort.postMessage(collection);
import { parentPort } from 'worker_threads';
// Request/response worker: each incoming message is processed and the
// outcome is posted straight back to the parent.
parentPort.on('message', (data: any) => {
  parentPort.postMessage(doSomething(data));
});
worker_threads 模块中可用的重要属性
isMainThread
import { isMainThread } from 'worker_threads';
/** Throws when this module is loaded on the main thread instead of inside a worker. */
function ensureWorkerThread(): void {
  if (isMainThread) {
    throw new Error('Its not a worker');
  }
}
ensureWorkerThread();
workerData
// Initial data is handed to the worker through the `workerData` option.
const workerOptions = { workerData };
const worker = new Worker(path, workerOptions);
import { workerData } from 'worker_threads';
// Inside the worker, `workerData` is read from the worker_threads module.
const { property } = workerData;
console.log(property);
parentPort
threadId
实现 setTimeout
import { parentPort, workerData } from 'worker_threads';
// Busy-wait: spin on the clock until `workerData.time` milliseconds have
// passed since start-up, then notify the parent with an empty message.
const startedAt = Date.now();
let elapsed = false;
while (!elapsed) {
  elapsed = Date.now() >= startedAt + workerData.time;
}
parentPort.postMessage({});
/** Workers backing the timeouts that are still pending, keyed by timeout id. */
const timeoutState: { [key: string]: Worker } = {};

/**
 * Worker-backed timeout: starts `timeout-worker.js` with `{ time }` as its
 * worker data and invokes `callback` when that worker reports back.
 *
 * @param callback - Called with `null` when the worker posts its message, or
 *   with the error if the worker fails first.
 * @param time - Delay passed to the worker (the worker script is expected to
 *   treat it as milliseconds).
 * @returns An id that can be passed to `cancelTimeout`.
 */
export function setTimeout(callback: (err: any) => any, time: number) {
  const id = uuidv4();
  const worker = runWorker(
    path.join(__dirname, './timeout-worker.js'),
    (err) => {
      // Nothing registered under this id any more: the timeout already fired
      // or was cancelled, so late events from the worker are ignored.
      if (!timeoutState[id]) {
        return null;
      }
      // Remove the entry instead of overwriting it with null, so finished
      // timeouts do not leave keys behind in the map.
      delete timeoutState[id];
      if (err) {
        return callback(err);
      }
      return callback(null);
    },
    {
      time,
    },
  );
  timeoutState[id] = worker;
  return id;
}

/**
 * Cancels a pending timeout created by `setTimeout`.
 *
 * @param id - Id returned by `setTimeout`.
 * @returns `true` if a pending timeout was found and its worker terminated,
 *   `false` otherwise.
 */
export function cancelTimeout(id: string) {
  // Own-property check: ids such as 'toString' must not resolve to members
  // inherited from Object.prototype.
  if (!Object.prototype.hasOwnProperty.call(timeoutState, id) || !timeoutState[id]) {
    return false;
  }
  timeoutState[id].terminate();
  // Drop the key entirely, matching how setTimeout clears fired timeouts.
  delete timeoutState[id];
  return true;
}
native setTimeout { ms: 7004, averageCPUCost: 0.1416 }
worker setTimeout { ms: 7046, averageCPUCost: 0.308 }
实现工作池
// A pool of 8 workers, all running the test-worker script next to this module.
const poolWorkerScript = path.join(__dirname, './test-worker.js');
const pool = new WorkerPool(poolWorkerScript, 8);
/**
 * Pool of worker threads that all run the same script.
 *
 * @typeParam T - Payload type produced for a task.
 * @typeParam N - Result type a task completes with.
 */
export class WorkerPool<T, N> {
  /** Tasks waiting for a worker. */
  private queue: Array<QueueItem<T, N>> = [];

  /** Worker instances, keyed by their numeric id. */
  private workersById: Record<number, Worker> = {};

  /** Busy flag per worker id. */
  private activeWorkersById: Record<number, boolean> = {};

  /**
   * @param workerPath - Script path stored on the pool.
   * @param numberOfThreads - Pool size stored on the pool.
   */
  public constructor(
    public workerPath: string,
    public numberOfThreads: number,
  ) {
    this.init();
  }
}
/** Node-style completion callback: `err` on failure, otherwise `result`. */
type QueueCallback<N> = (err: any, result?: N) => void;

/** One unit of work handled by the pool. */
interface QueueItem<T, N> {
  /** Produces the payload for the task. */
  getData: () => T;
  /** Receives the outcome of the task. */
  callback: QueueCallback<N>;
}
/**
 * Creates `numberOfThreads` workers running `workerPath`, registers each
 * under ids 0..n-1 and marks it idle. Does nothing when the configured
 * thread count is below 1.
 */
private init() {
  if (this.numberOfThreads < 1) {
    return null;
  }
  let workerId = 0;
  while (workerId < this.numberOfThreads) {
    this.workersById[workerId] = new Worker(this.workerPath);
    this.activeWorkersById[workerId] = false;
    workerId += 1;
  }
}
/**
 * Schedules a task on the pool.
 *
 * @param getData - Produces the payload for the task.
 * @returns A promise that resolves with the task result or rejects with the
 *   error passed to the task callback.
 */
public run(getData: () => T) {
  return new Promise<N>((resolve, reject) => {
    const queueItem: QueueItem<T, N> = {
      getData,
      // Bridge the Node-style callback onto the promise.
      callback: (error, result) => (error ? reject(error) : resolve(result)),
    };

    const workerId = this.getInactiveWorkerId();
    if (workerId !== -1) {
      // A worker is idle: start right away.
      this.runWorker(workerId, queueItem);
      return;
    }
    // Every worker is busy: park the task until one is released.
    this.queue.push(queueItem);
  });
}
/**
 * Finds the lowest worker id that is not currently marked active.
 *
 * @returns The id of an idle worker, or -1 when every worker is busy.
 */
private getInactiveWorkerId(): number {
  let candidate = 0;
  while (candidate < this.numberOfThreads) {
    if (this.activeWorkersById[candidate]) {
      candidate += 1;
      continue;
    }
    return candidate;
  }
  return -1;
}
/**
 * Runs one queued task on the given worker and, once it settles, releases
 * the worker and starts the next queued task (if any) on it.
 *
 * The task callback is invoked at most once: with the worker's first message
 * on success, or with the error if the worker emits 'error', `getData()`
 * throws or rejects, or the payload cannot be posted.
 *
 * @param workerId - Id of the worker to use.
 * @param queueItem - Task to execute.
 */
private async runWorker(workerId: number, queueItem: QueueItem<T, N>) {
  const worker = this.workersById[workerId];
  this.activeWorkersById[workerId] = true;

  // Guards against reporting the same task twice, e.g. when the worker
  // fails while `getData()` is still pending.
  let settled = false;

  const cleanUp = () => {
    worker.removeAllListeners('message');
    worker.removeAllListeners('error');
    this.activeWorkersById[workerId] = false;
    const next = this.queue.shift();
    if (next) {
      void this.runWorker(workerId, next);
    }
  };
  const messageCallback = (result: N) => {
    settled = true;
    queueItem.callback(null, result);
    cleanUp();
  };
  const errorCallback = (error: unknown) => {
    settled = true;
    queueItem.callback(error);
    cleanUp();
  };

  // NOTE(review): after an 'error' event the worker is handed the next task
  // as-is; confirm whether a failed worker should be replaced first.
  worker.once('message', messageCallback);
  worker.once('error', errorCallback);

  try {
    const data = await queueItem.getData();
    if (settled) {
      // The worker already failed while the payload was being produced.
      return;
    }
    worker.postMessage(data);
  } catch (error) {
    // Without this, a failing getData()/postMessage would leave the task
    // promise pending forever and the worker permanently marked active.
    if (!settled) {
      errorCallback(error);
    }
  }
}
import { isMainThread, parentPort } from 'worker_threads';
// This script is only meant to run inside a worker; fail fast if it is
// loaded on the main thread.
const assertIsWorker = (runningOnMainThread: boolean): void => {
  if (runningOnMainThread) {
    throw new Error('Its not a worker');
  }
};
assertIsWorker(isMainThread);
/**
 * Synthetic CPU-heavy task: fills an array with random integers in
 * [0, 100000] and returns it sorted in ascending order.
 *
 * @param data - Payload received from the parent; currently unused.
 * @param size - Number of elements to generate (defaults to 1000000).
 * @returns The sorted array.
 */
const doCalcs = (data: unknown, size: number = 1000000): number[] => {
  const collection: number[] = [];
  for (let i = 0; i < size; i += 1) {
    collection.push(Math.round(Math.random() * 100000));
  }
  // Subtraction returns 0 for equal elements, which keeps the comparator
  // consistent; the previous one answered -1 for both (a, b) and (b, a).
  return collection.sort((a, b) => a - b);
};
// For every message from the parent, run the calculation and post the
// result back.
parentPort.on('message', (data: any) => {
  parentPort.postMessage(doCalcs(data));
});
// Push 100 tasks through an 8-worker pool, logging each completion and a
// final line once all of them are done.
const pool = new WorkerPool<{ i: number }, number>(path.join(__dirname, './test-worker.js'), 8);
const items = Array.from({ length: 100 }, () => null);

const pending = items.map(async (_, i) => {
  await pool.run(() => ({ i }));
  console.log('finished', i);
});

Promise.all(pending).then(() => {
  console.log('finished all');
});
结论
本文首发微信公众号:jingchengyideng