Skip to content

Commit 21d893d

Browse files
committed
added test for custom scheduling and restructed
1 parent 22a734a commit 21d893d

File tree

3 files changed

+96
-3
lines changed

3 files changed

+96
-3
lines changed

lib/internal/cluster/primary.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,12 +61,12 @@ cluster.schedulingPolicy = schedulingPolicy;
6161

6262
function validateAndReturnScheduler(scheduler, schedulingPolicy) {
6363
if (scheduler !== undefined) {
64-
if (typeof scheduler !== 'function') {
64+
if (typeof scheduler.execute !== 'function') {
6565
throw new TypeError('scheduler must be a function');
6666
}
6767
return scheduler;
6868
} else if (schedulingPolicy === SCHED_RR) {
69-
return RoundRobinHandle.scheduler;
69+
return { execute: RoundRobinHandle.scheduler };
7070
} else if (schedulingPolicy !== SCHED_NONE) {
7171
assert(false, `Bad cluster.schedulingPolicy: ${schedulingPolicy}`);
7272
}

lib/internal/cluster/round_robin_handle.js

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,17 @@ RoundRobinHandle.prototype.handoff = function () {
114114
return;
115115
}
116116

117-
const worker = this.scheduler(this.workers);
117+
let socket;
118+
if (this.scheduler.exposeSocket === true) {
119+
socket = new net.Socket({
120+
handle,
121+
readable: false,
122+
writable: false,
123+
pauseOnCreate: true
124+
});
125+
}
126+
127+
const worker = this.scheduler.execute(this.workers, socket);
118128
if (typeof worker === 'undefined') {
119129
return;
120130
}
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
'use strict';
2+
3+
const common = require('../common');
4+
const assert = require('assert');
5+
const cluster = require('cluster');
6+
const http = require('http');
7+
const net = require('net');
8+
9+
if (cluster.isMaster) {
10+
const numWorkers = 2;
11+
const pattern = [2, 1, 2, 2, 1, 2, 1, 1, 2];
12+
let index = 0;
13+
let readyCount = 0;
14+
15+
// The scheduler moves through pattern. Each request is scheduled to the
16+
// worker id specified in the current pattern index.
17+
const execute = (workers, socket) => {
18+
const id = pattern[index];
19+
const worker = workers.get(id);
20+
21+
if (id === 2) {
22+
assert.strictEqual(scheduler.exposeSocket, true);
23+
assert(socket instanceof net.Socket);
24+
} else {
25+
assert.strictEqual(scheduler.exposeSocket, false);
26+
assert.strictEqual(socket, undefined);
27+
}
28+
29+
if (worker !== undefined)
30+
index++;
31+
32+
return worker;
33+
};
34+
35+
const scheduler = { execute };
36+
37+
// Create a getter for exposeSocket. If the current item in the pattern is 2,
38+
// then expose the socket. Otherwise, hide it.
39+
Object.defineProperty(scheduler, 'exposeSocket', {
40+
get() { return pattern[index] === 2; }
41+
});
42+
43+
function onMessage(msg) {
44+
// Once both workers send a 'ready' signal, indicating that their servers
45+
// are listening, begin making HTTP requests.
46+
assert.strictEqual(msg.cmd, 'ready');
47+
readyCount++;
48+
49+
if (readyCount === numWorkers)
50+
makeRequest(0, msg.port);
51+
}
52+
53+
function makeRequest(reqCount, port) {
54+
// Make one request for each element in pattern and then shut down the
55+
// workers.
56+
if (reqCount >= pattern.length) {
57+
for (const id in cluster.workers)
58+
cluster.workers[id].disconnect();
59+
60+
return;
61+
}
62+
63+
http.get({ port }, (res) => {
64+
res.on('data', (data) => {
65+
assert.strictEqual(+data.toString(), pattern[reqCount]);
66+
reqCount++;
67+
makeRequest(reqCount, port);
68+
});
69+
});
70+
}
71+
72+
cluster.setupMaster({ scheduler });
73+
74+
for (let i = 0; i < numWorkers; i++)
75+
cluster.fork().on('message', common.mustCall(onMessage));
76+
77+
} else {
78+
const server = http.createServer((req, res) => {
79+
res.end(cluster.worker.id + '');
80+
}).listen(0, () => {
81+
process.send({ cmd: 'ready', port: server.address().port });
82+
});
83+
}

0 commit comments

Comments
 (0)