Skip to content

Instantly share code, notes, and snippets.

@likecyber
Last active July 12, 2024 10:55
Show Gist options
  • Save likecyber/21999ee4d97a697b9e2ac286378933b2 to your computer and use it in GitHub Desktop.
Save likecyber/21999ee4d97a697b9e2ac286378933b2 to your computer and use it in GitHub Desktop.
Node.js's Cluster polyfill module for Bun (shared socket is not supported, but IPC should work)
import assert from 'node:assert';
import cluster from 'node:cluster';
import { EventEmitter } from 'node:events';
import { fork } from 'node:child_process';
try {
new cluster.Worker();
} catch (err) {
if (err.code === 'ERR_NOT_IMPLEMENTED') {
const callbacks = new Map();
let seq = 0;
function sendHelper(proc, message, handle, cb) {
if (!proc.connected) {
return false;
}
message = {
cmd: 'NODE_CLUSTER',
...message,
seq
};
if (typeof cb === 'function') {
callbacks.set(seq, cb);
}
seq += 1;
return proc.send(message, handle);
}
function internal(worker, cb) {
return function (message, handle) {
if (message.cmd !== 'NODE_CLUSTER') {
return;
}
let fn = cb;
if (message.ack !== undefined) {
const callback = callbacks.get(message.ack);
if (callback !== undefined) {
fn = callback;
callbacks.delete(message.ack);
}
}
fn.apply(worker, arguments);
};
}
const Worker = function Worker(options) {
if (!(this instanceof Worker)) {
return new Worker(options);
}
EventEmitter.apply(this, []);
if (options === null || typeof options !== 'object') {
options = {};
}
this.exitedAfterDisconnect = undefined;
this.state = options.state || 'none';
this.id = options.id | 0;
if (options.process) {
this.process = options.process;
this.process.on('error', (code, signal) => this.emit('error', code, signal));
this.process.on('message', (message, handle) => {
if (typeof message === 'object' && message.cmd === 'NODE_CLUSTER') {
this.process.emit('internalMessage', message, handle);
} else {
this.emit('message', message, handle)
}
});
}
};
Object.setPrototypeOf(Worker.prototype, EventEmitter.prototype);
Object.setPrototypeOf(Worker, EventEmitter);
Worker.prototype.kill = function () {
this.destroy.apply(this, arguments);
};
Worker.prototype.send = function () {
return this.process.send.apply(this.process, arguments);
};
Worker.prototype.isDead = function () {
return this.process.exitCode != null || this.process.signalCode != null;
};
Worker.prototype.isConnected = function () {
return this.process.connected;
};
cluster.Worker = Worker;
cluster.isPrimary = cluster.isMaster = process.env.NODE_UNIQUE_ID === undefined;
cluster.isWorker = !cluster.isPrimary;
if (cluster.isPrimary) {
let ids = 0;
let initialized = false;
let schedulingPolicy = process.env.NODE_CLUSTER_SCHED_POLICY === 'none' || process.platform === 'win32' ? cluster.SCHED_NONE : cluster.SCHED_RR;
const intercom = new EventEmitter();
function removeWorker(worker) {
assert(worker);
delete cluster.workers[worker.id];
if (Object.keys(cluster.workers).length === 0) {
intercom.emit('disconnect');
}
}
Worker.prototype.disconnect = function () {
this.exitedAfterDisconnect = true;
sendHelper(this.process, {
act: 'disconnect'
});
removeWorker(this);
return this;
};
Worker.prototype.destroy = function (signo) {
const proc = this.process;
const signal = signo || 'SIGTERM';
proc.kill(signal);
};
cluster.disconnect = function (cb) {
const workers = Object.keys(cluster.workers);
if (workers.length === 0) {
process.nextTick(() => intercom.emit('disconnect'));
} else {
for (const worker of Object.values(cluster.workers)) {
if (worker.isConnected()) {
worker.disconnect();
}
}
}
if (typeof cb === 'function') {
intercom.once('disconnect', cb);
}
};
cluster.fork = function (env) {
cluster.setupPrimary();
const id = ++ids;
const workerProcess = fork(cluster.settings.exec, cluster.settings.args, {
cwd: cluster.settings.cwd,
env: {
...process.env,
...env,
NODE_UNIQUE_ID: id
},
serialization: cluster.settings.serialization,
silent: cluster.settings.silent,
windowsHide: cluster.settings.windowsHide,
execArgv: cluster.settings.execArgv,
stdio: cluster.settings.stdio,
gid: cluster.settings.gid,
uid: cluster.settings.uid,
});
const worker = new Worker({
id: id,
process: workerProcess,
});
worker.on('message', function (message, handle) {
cluster.emit('message', this, message, handle);
});
worker.process.once('exit', (exitCode, signalCode) => {
if (!worker.isConnected()) {
removeWorker(worker);
}
worker.exitedAfterDisconnect = !!worker.exitedAfterDisconnect;
worker.state = 'dead';
worker.emit('exit', exitCode, signalCode);
cluster.emit('exit', worker, exitCode, signalCode);
});
worker.process.once('disconnect', () => {
if (worker.isDead()) {
removeWorker(worker);
}
worker.exitedAfterDisconnect = !!worker.exitedAfterDisconnect;
worker.state = 'disconnected';
worker.emit('disconnect');
cluster.emit('disconnect', worker);
});
worker.process.on('internalMessage', internal(worker, function (message, handle) {
if (message.act === 'exitedAfterDisconnect') {
this.exitedAfterDisconnect = true;
sendHelper(this.process, {
ack: message.seq
});
} else if (message.act === 'online') {
this.state = 'online';
this.emit('online');
cluster.emit('online', this);
}
}));
process.nextTick(() => cluster.emit('fork', worker));
cluster.workers[worker.id] = worker;
return worker;
};
cluster.schedulingPolicy = schedulingPolicy;
cluster.settings = {};
cluster.setupPrimary = cluster.setupMaster = function (options) {
const settings = {
args: process.argv.slice(2),
exec: process.argv[1],
execArgv: process.execArgv,
silent: false,
...cluster.settings,
...options,
};
cluster.settings = settings;
if (initialized === true) {
return process.nextTick(() => cluster.emit('setup', settings));
}
initialized = true;
schedulingPolicy = cluster.schedulingPolicy;
assert(schedulingPolicy === cluster.SCHED_NONE || schedulingPolicy === cluster.SCHED_RR, `Bad cluster.schedulingPolicy: ${schedulingPolicy}`);
process.nextTick(() => cluster.emit('setup', settings));
};
delete cluster.worker;
cluster.workers = {};
} else {
function _disconnect(primaryInitiated) {
this.exitedAfterDisconnect = true;
let waitingCount = 1;
function checkWaitingCount() {
waitingCount--;
if (waitingCount === 0) {
if (primaryInitiated) {
process.disconnect();
} else {
sendHelper(process, {
act: 'exitedAfterDisconnect'
}, null, () => process.disconnect());
}
}
}
checkWaitingCount();
}
Worker.prototype.disconnect = function () {
if (this.state !== 'disconnecting' && this.state !== 'destroying') {
this.state = 'disconnecting';
_disconnect.apply(this, []);
}
return this;
};
Worker.prototype.destroy = function () {
if (this.state === 'destroying') {
return;
}
this.exitedAfterDisconnect = true;
if (!this.isConnected()) {
process.exit(0);
} else {
this.state = 'destroying';
sendHelper(process, {
act: 'exitedAfterDisconnect'
}, null, () => process.disconnect());
process.once('disconnect', () => process.exit(0));
}
};
delete cluster.disconnect;
delete cluster.fork;
delete cluster.settings;
delete cluster.setupMaster;
delete cluster.setupPrimary;
cluster.worker = null;
delete cluster.workers;
cluster._setupWorker = function() {
const worker = new Worker({
id: +process.env.NODE_UNIQUE_ID | 0,
process: process,
state: 'online',
});
cluster.worker = worker;
process.once('disconnect', () => {
worker.emit('disconnect');
if (!worker.exitedAfterDisconnect) {
process.exit(0);
}
});
process.on('internalMessage', internal(worker, function (message, handle) {
if (message.act === 'disconnect') {
_disconnect.apply(this, [true]);
}
}));
sendHelper(process, {
act: 'online'
}, null);
};
cluster._getServer = function(obj, options, cb) {
// Not Implemented
};
cluster._setupWorker();
}
delete process.env.NODE_UNIQUE_ID;
} else {
throw err;
}
}
export default cluster;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment