Skip to content

Commit 286733a

Browse files
committed
add: scheduler config option for cluster
scheduler let's use a custom scheduler function for scheduling workers.
1 parent 457567f commit 286733a

File tree

3 files changed

+40
-17
lines changed

3 files changed

+40
-17
lines changed

lib/internal/cluster/primary.js

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ cluster.SCHED_RR = SCHED_RR; // Primary distributes connections.
4242
let ids = 0;
4343
let debugPortOffset = 1;
4444
let initialized = false;
45+
let scheduler = RoundRobinHandle.scheduler;
4546

4647
// XXX(bnoordhuis) Fold cluster.schedulingPolicy into cluster.settings?
4748
let schedulingPolicy = process.env.NODE_CLUSTER_SCHED_POLICY;
@@ -58,6 +59,19 @@ else if (process.platform === 'win32') {
5859

5960
cluster.schedulingPolicy = schedulingPolicy;
6061

62+
function validateAndReturnScheduler(scheduler, schedulingPolicy) {
63+
if (scheduler !== undefined) {
64+
if (typeof scheduler !== 'function') {
65+
throw new TypeError('scheduler must be a function');
66+
}
67+
return scheduler;
68+
} else if (schedulingPolicy === SCHED_RR) {
69+
return RoundRobinHandle.scheduler;
70+
} else if (schedulingPolicy !== SCHED_NONE) {
71+
assert(false, `Bad cluster.schedulingPolicy: ${schedulingPolicy}`);
72+
}
73+
}
74+
6175
cluster.setupPrimary = function(options) {
6276
const settings = {
6377
args: ArrayPrototypeSlice(process.argv, 2),
@@ -89,6 +103,7 @@ cluster.setupPrimary = function(options) {
89103
assert(schedulingPolicy === SCHED_NONE || schedulingPolicy === SCHED_RR,
90104
`Bad cluster.schedulingPolicy: ${schedulingPolicy}`);
91105

106+
scheduler = validateAndReturnScheduler(cluster.settings.scheduler, schedulingPolicy);
92107
process.nextTick(setupSettingsNT, settings);
93108

94109
process.on('internalMessage', (message) => {
@@ -310,6 +325,8 @@ function queryServer(worker, message) {
310325
message.addressType === 'udp6') {
311326
handle = new SharedHandle(key, address, message);
312327
} else {
328+
// FIXME: Better structure to code
329+
message.scheduler = scheduler;
313330
handle = new RoundRobinHandle(key, address, message);
314331
}
315332

lib/internal/cluster/round_robin_handle.js

Lines changed: 21 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,13 @@ const { constants } = internalBinding('tcp_wrap');
1515

1616
module.exports = RoundRobinHandle;
1717

18-
function RoundRobinHandle(key, address, { port, fd, flags, backlog }) {
18+
function RoundRobinHandle(key, address, { port, fd, flags, backlog, scheduler }) {
1919
this.key = key;
2020
this.all = new SafeMap();
21-
this.free = new SafeMap();
21+
this.workers = new SafeMap();
2222
this.handles = init(ObjectCreate(null));
2323
this.handle = null;
24+
this.scheduler = scheduler;
2425
this.server = net.createServer(assert.fail);
2526

2627
if (fd >= 0)
@@ -47,6 +48,7 @@ function RoundRobinHandle(key, address, { port, fd, flags, backlog }) {
4748
RoundRobinHandle.prototype.add = function(worker, send) {
4849
assert(this.all.has(worker.id) === false);
4950
this.all.set(worker.id, worker);
51+
this.workers.set(worker.id, worker);
5052

5153
const done = () => {
5254
if (this.handle.getsockname) {
@@ -58,7 +60,7 @@ RoundRobinHandle.prototype.add = function(worker, send) {
5860
send(null, null, null); // UNIX socket.
5961
}
6062

61-
this.handoff(worker); // In case there are connections pending.
63+
this.handoff(); // In case there are connections pending.
6264
};
6365

6466
if (this.server === null)
@@ -77,7 +79,7 @@ RoundRobinHandle.prototype.remove = function(worker) {
7779
if (!existed)
7880
return false;
7981

80-
this.free.delete(worker.id);
82+
this.workers.delete(worker.id);
8183

8284
if (this.all.size !== 0)
8385
return false;
@@ -95,28 +97,32 @@ RoundRobinHandle.prototype.remove = function(worker) {
9597

9698
RoundRobinHandle.prototype.distribute = function(err, handle) {
9799
append(this.handles, handle);
98-
// eslint-disable-next-line node-core/no-array-destructuring
99-
const [ workerEntry ] = this.free; // this.free is a SafeMap
100+
this.handoff();
101+
};
102+
103+
RoundRobinHandle.scheduler = function(workers) {
104+
const [ workerEntry ] = workers;
100105

101106
if (ArrayIsArray(workerEntry)) {
102107
const { 0: workerId, 1: worker } = workerEntry;
103-
this.free.delete(workerId);
104-
this.handoff(worker);
108+
workers.delete(workerId);
109+
workers.set(workerId, worker)
110+
return worker;
105111
}
106112
};
107113

108-
RoundRobinHandle.prototype.handoff = function(worker) {
109-
if (!this.all.has(worker.id)) {
110-
return; // Worker is closing (or has closed) the server.
111-
}
112-
114+
RoundRobinHandle.prototype.handoff = function () {
113115
const handle = peek(this.handles);
114116

115117
if (handle === null) {
116-
this.free.set(worker.id, worker); // Add to ready queue again.
117118
return;
118119
}
119120

121+
const worker = this.scheduler(this.workers);
122+
if (typeof worker === 'undefined' || !this.all.has(worker.id)) {
123+
return; // Worker is closing (or has closed) the server.
124+
}
125+
120126
remove(handle);
121127

122128
const message = { act: 'newconn', key: this.key };
@@ -127,6 +133,6 @@ RoundRobinHandle.prototype.handoff = function(worker) {
127133
else
128134
this.distribute(0, handle); // Worker is shutting down. Send to another.
129135

130-
this.handoff(worker);
136+
this.handoff();
131137
});
132138
};

lib/internal/cluster/worker.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,9 @@ function Worker(options) {
2929
this.process.on('error', (code, signal) =>
3030
this.emit('error', code, signal)
3131
);
32-
this.process.on('message', (message, handle) =>
32+
this.process.on('message', (message, handle) => {
3333
this.emit('message', message, handle)
34-
);
34+
});
3535
}
3636
}
3737

0 commit comments

Comments
 (0)