Skip to content

Commit 9dec344

Browse files
committed
cluster: scheduler config option for cluster
scheduler let's use a custom scheduler function for scheduling workers.
1 parent b481bee commit 9dec344

File tree

5 files changed

+147
-21
lines changed

5 files changed

+147
-21
lines changed

doc/api/errors.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3304,6 +3304,17 @@ removed: v10.0.0
33043304

33053305
Used when a given value is out of the accepted range.
33063306

3307+
<a id="ERR_CLUSTER_INVALID_SCHEDULER"></a>
3308+
3309+
### `ERR_CLUSTER_INVALID_SCHEDULER`
3310+
3311+
<!-- YAML
3312+
added: v9.0.0
3313+
removed: v10.0.0
3314+
-->
3315+
3316+
Used when scheduler is not a function type.
3317+
33073318
<a id="ERR_VM_MODULE_NOT_LINKED"></a>
33083319

33093320
### `ERR_VM_MODULE_NOT_LINKED`

lib/internal/cluster/primary.js

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,9 @@ const SCHED_RR = 2;
2626
const minPort = 1024;
2727
const maxPort = 65535;
2828
const { validatePort } = require('internal/validators');
29+
const {
30+
ERR_CLUSTER_INVALID_SCHEDULER
31+
} = require('internal/errors').codes;
2932

3033
module.exports = cluster;
3134

@@ -42,6 +45,7 @@ cluster.SCHED_RR = SCHED_RR; // Primary distributes connections.
4245
let ids = 0;
4346
let debugPortOffset = 1;
4447
let initialized = false;
48+
let scheduler = RoundRobinHandle.scheduler;
4549

4650
// XXX(bnoordhuis) Fold cluster.schedulingPolicy into cluster.settings?
4751
let schedulingPolicy = process.env.NODE_CLUSTER_SCHED_POLICY;
@@ -58,6 +62,19 @@ else if (process.platform === 'win32') {
5862

5963
cluster.schedulingPolicy = schedulingPolicy;
6064

65+
function validateAndReturnScheduler(scheduler, schedulingPolicy) {
66+
if (scheduler !== undefined) {
67+
if (typeof scheduler.execute !== 'function') {
68+
throw new ERR_CLUSTER_INVALID_SCHEDULER('scheduler.execute');
69+
}
70+
return scheduler;
71+
} else if (schedulingPolicy === SCHED_RR) {
72+
return { execute: RoundRobinHandle.scheduler };
73+
} else if (schedulingPolicy !== SCHED_NONE) {
74+
assert(false, `Bad cluster.schedulingPolicy: ${schedulingPolicy}`);
75+
}
76+
}
77+
6178
cluster.setupPrimary = function(options) {
6279
const settings = {
6380
args: ArrayPrototypeSlice(process.argv, 2),
@@ -89,6 +106,7 @@ cluster.setupPrimary = function(options) {
89106
assert(schedulingPolicy === SCHED_NONE || schedulingPolicy === SCHED_RR,
90107
`Bad cluster.schedulingPolicy: ${schedulingPolicy}`);
91108

109+
scheduler = validateAndReturnScheduler(cluster.settings.scheduler, schedulingPolicy);
92110
process.nextTick(setupSettingsNT, settings);
93111

94112
process.on('internalMessage', (message) => {
@@ -310,7 +328,7 @@ function queryServer(worker, message) {
310328
message.addressType === 'udp6') {
311329
handle = new SharedHandle(key, address, message);
312330
} else {
313-
handle = new RoundRobinHandle(key, address, message);
331+
handle = new RoundRobinHandle(key, address, { ...message, scheduler });
314332
}
315333

316334
handles.set(key, handle);

lib/internal/cluster/round_robin_handle.js

Lines changed: 33 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,12 @@ const { constants } = internalBinding('tcp_wrap');
1515

1616
module.exports = RoundRobinHandle;
1717

18-
function RoundRobinHandle(key, address, { port, fd, flags, backlog }) {
18+
function RoundRobinHandle(key, address, { port, fd, flags, backlog, scheduler }) {
1919
this.key = key;
20-
this.all = new SafeMap();
21-
this.free = new SafeMap();
20+
this.workers = new SafeMap();
2221
this.handles = init(ObjectCreate(null));
2322
this.handle = null;
23+
this.scheduler = scheduler;
2424
this.server = net.createServer(assert.fail);
2525

2626
if (fd >= 0)
@@ -45,8 +45,8 @@ function RoundRobinHandle(key, address, { port, fd, flags, backlog }) {
4545
}
4646

4747
RoundRobinHandle.prototype.add = function(worker, send) {
48-
assert(this.all.has(worker.id) === false);
49-
this.all.set(worker.id, worker);
48+
assert(this.workers.has(worker.id) === false);
49+
this.workers.set(worker.id, worker);
5050

5151
const done = () => {
5252
if (this.handle.getsockname) {
@@ -58,7 +58,7 @@ RoundRobinHandle.prototype.add = function(worker, send) {
5858
send(null, null, null); // UNIX socket.
5959
}
6060

61-
this.handoff(worker); // In case there are connections pending.
61+
this.handoff(); // In case there are connections pending.
6262
};
6363

6464
if (this.server === null)
@@ -72,14 +72,12 @@ RoundRobinHandle.prototype.add = function(worker, send) {
7272
};
7373

7474
RoundRobinHandle.prototype.remove = function(worker) {
75-
const existed = this.all.delete(worker.id);
75+
const existed = this.workers.delete(worker.id);
7676

7777
if (!existed)
7878
return false;
7979

80-
this.free.delete(worker.id);
81-
82-
if (this.all.size !== 0)
80+
if (this.workers.size !== 0)
8381
return false;
8482

8583
while (!isEmpty(this.handles)) {
@@ -95,25 +93,40 @@ RoundRobinHandle.prototype.remove = function(worker) {
9593

9694
RoundRobinHandle.prototype.distribute = function(err, handle) {
9795
append(this.handles, handle);
96+
this.handoff();
97+
};
98+
99+
RoundRobinHandle.scheduler = function(workers) {
98100
// eslint-disable-next-line node-core/no-array-destructuring
99-
const [ workerEntry ] = this.free; // this.free is a SafeMap
101+
const [ workerEntry ] = workers;
100102

101103
if (ArrayIsArray(workerEntry)) {
102104
const { 0: workerId, 1: worker } = workerEntry;
103-
this.free.delete(workerId);
104-
this.handoff(worker);
105+
workers.delete(workerId);
106+
workers.set(workerId, worker);
107+
return worker;
105108
}
106109
};
107110

108-
RoundRobinHandle.prototype.handoff = function(worker) {
109-
if (!this.all.has(worker.id)) {
110-
return; // Worker is closing (or has closed) the server.
111-
}
112-
111+
RoundRobinHandle.prototype.handoff = function() {
113112
const handle = peek(this.handles);
114113

115114
if (handle === null) {
116-
this.free.set(worker.id, worker); // Add to ready queue again.
115+
return;
116+
}
117+
118+
let socket;
119+
if (this.scheduler.exposeSocket === true) {
120+
socket = new net.Socket({
121+
handle,
122+
readable: false,
123+
writable: false,
124+
pauseOnCreate: true
125+
});
126+
}
127+
128+
const worker = this.scheduler.execute(this.workers, socket);
129+
if (typeof worker === 'undefined') {
117130
return;
118131
}
119132

@@ -127,6 +140,6 @@ RoundRobinHandle.prototype.handoff = function(worker) {
127140
else
128141
this.distribute(0, handle); // Worker is shutting down. Send to another.
129142

130-
this.handoff(worker);
143+
this.handoff();
131144
});
132145
};

lib/internal/errors.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -920,6 +920,7 @@ E('ERR_CHILD_PROCESS_IPC_REQUIRED',
920920
Error);
921921
E('ERR_CHILD_PROCESS_STDIO_MAXBUFFER', '%s maxBuffer length exceeded',
922922
RangeError);
923+
E('ERR_CLUSTER_INVALID_SCHEDULER', '%s is not a valid function', TypeError);
923924
E('ERR_CONSOLE_WRITABLE_STREAM',
924925
'Console expects a writable stream instance for %s', TypeError);
925926
E('ERR_CONTEXT_NOT_INITIALIZED', 'context used is not initialized', Error);
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
'use strict';
2+
3+
const common = require('../common');
4+
const assert = require('assert');
5+
const cluster = require('cluster');
6+
const http = require('http');
7+
const net = require('net');
8+
9+
if (cluster.isMaster) {
10+
const numWorkers = 2;
11+
const pattern = [2, 1, 2, 2, 1, 2, 1, 1, 2];
12+
let index = 0;
13+
let readyCount = 0;
14+
15+
// The scheduler moves through pattern. Each request is scheduled to the
16+
// worker id specified in the current pattern index.
17+
const execute = (workers, socket) => {
18+
const id = pattern[index];
19+
const worker = workers.get(id);
20+
21+
if (id === 2) {
22+
assert.strictEqual(scheduler.exposeSocket, true);
23+
assert(socket instanceof net.Socket);
24+
} else {
25+
assert.strictEqual(scheduler.exposeSocket, false);
26+
assert.strictEqual(socket, undefined);
27+
}
28+
29+
if (worker !== undefined)
30+
index++;
31+
32+
return worker;
33+
};
34+
35+
const scheduler = { execute };
36+
37+
// Create a getter for exposeSocket. If the current item in the pattern is 2,
38+
// then expose the socket. Otherwise, hide it.
39+
Object.defineProperty(scheduler, 'exposeSocket', {
40+
get() { return pattern[index] === 2; }
41+
});
42+
43+
function onMessage(msg) {
44+
// Once both workers send a 'ready' signal, indicating that their servers
45+
// are listening, begin making HTTP requests.
46+
assert.strictEqual(msg.cmd, 'ready');
47+
readyCount++;
48+
49+
if (readyCount === numWorkers)
50+
makeRequest(0, msg.port);
51+
}
52+
53+
function makeRequest(reqCount, port) {
54+
// Make one request for each element in pattern and then shut down the
55+
// workers.
56+
if (reqCount >= pattern.length) {
57+
for (const id in cluster.workers)
58+
cluster.workers[id].disconnect();
59+
60+
return;
61+
}
62+
63+
http.get({ port }, (res) => {
64+
res.on('data', (data) => {
65+
assert.strictEqual(+data.toString(), pattern[reqCount]);
66+
reqCount++;
67+
makeRequest(reqCount, port);
68+
});
69+
});
70+
}
71+
72+
cluster.setupMaster({ scheduler });
73+
74+
for (let i = 0; i < numWorkers; i++)
75+
cluster.fork().on('message', common.mustCall(onMessage));
76+
77+
} else {
78+
const server = http.createServer((req, res) => {
79+
res.end(cluster.worker.id + '');
80+
}).listen(0, () => {
81+
process.send({ cmd: 'ready', port: server.address().port });
82+
});
83+
}

0 commit comments

Comments
 (0)