Skip to content

Commit e117022

Browse files
committed
Merge 'upstream/master' into fix-hasWritePending-in-op-callback
2 parents 448535f + 68bde00 commit e117022

File tree

12 files changed

+379
-52
lines changed

12 files changed

+379
-52
lines changed

.gitignore

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,9 @@
22
*.swp
33
*.DS_Store
44

5+
# WebStorm
6+
.idea/
7+
58
# Emacs
69
\#*\#
710

.travis.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
language: node_js
22
node_js:
3+
- "10"
34
- "9"
45
- "8"
56
- "6"
6-
- "4"
77
script: "npm run jshint && npm run test-cover"
88
# Send coverage data to Coveralls
99
after_script: "cat ./coverage/lcov.info | ./node_modules/coveralls/bin/coveralls.js"

README.md

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -123,14 +123,15 @@ Register a new middleware.
123123
One of:
124124
* `'connect'`: A new client connected to the server.
125125
* `'op'`: An operation was loaded from the database.
126-
* `'doc'`: A snapshot was loaded from the database.
126+
* `'doc'`: DEPRECATED: A snapshot was loaded from the database. Please use 'readSnapshots'
127+
* `'readSnapshots'`: Snapshot(s) were loaded from the database for a fetch or subscribe of a query or document
127128
* `'query'`: A query is about to be sent to the database
128-
* `'submit'`: An operation is about to be submited to the database
129+
* `'submit'`: An operation is about to be submitted to the database
129130
* `'apply'`: An operation is about to be applied to a snapshot
130131
before being committed to the database
131132
* `'commit'`: An operation was applied to a snapshot; The operation
132133
and new snapshot are about to be written to the database.
133-
* `'after submit'`: An operation was successfully submitted to
134+
* `'afterSubmit'`: An operation was successfully submitted to
134135
the database.
135136
* `'receive'`: Received a message from a client
136137
* `fn` _(Function(request, callback))_
@@ -141,6 +142,7 @@ Register a new middleware.
141142
* `req`: The HTTP request being handled
142143
* `collection`: The collection name being handled
143144
* `id`: The document id being handled
145+
* `snapshots`: The retrieved snapshots for the `readSnapshots` action
144146
* `query`: The query object being handled
145147
* `op`: The op being handled
146148

lib/backend.js

Lines changed: 75 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,63 @@ function Backend(options) {
3232
// The number of open agents for monitoring and testing memory leaks
3333
this.agentsCount = 0;
3434
this.remoteAgentsCount = 0;
35+
36+
// The below shims are for backwards compatibility. These options will be
37+
// removed in a future major version
38+
if (!options.disableDocAction) {
39+
this._shimDocAction();
40+
}
41+
if (!options.disableSpaceDelimitedActions) {
42+
this._shimAfterSubmit();
43+
}
3544
}
3645
module.exports = Backend;
3746
emitter.mixin(Backend);
3847

48+
Backend.prototype.MIDDLEWARE_ACTIONS = {
49+
// An operation was successfully submitted to the database.
50+
afterSubmit: 'afterSubmit',
51+
// DEPRECATED: Synonym for 'afterSubmit'
52+
'after submit': 'after submit',
53+
// An operation is about to be applied to a snapshot before being committed to the database
54+
apply: 'apply',
55+
// An operation was applied to a snapshot; The operation and new snapshot are about to be written to the database.
56+
commit: 'commit',
57+
// A new client connected to the server.
58+
connect: 'connect',
59+
// DEPRECATED: A snapshot was loaded from the database.
60+
doc: 'doc',
61+
// An operation was loaded from the database
62+
op: 'op',
63+
// A query is about to be sent to the database
64+
query: 'query',
65+
// Received a message from a client
66+
receive: 'receive',
67+
// Snapshot(s) were received from the database and are about to be returned to a client
68+
readSnapshots: 'readSnapshots',
69+
// An operation is about to be submitted to the database
70+
submit: 'submit'
71+
};
72+
73+
Backend.prototype._shimDocAction = function() {
74+
var backend = this;
75+
this.use(this.MIDDLEWARE_ACTIONS.readSnapshots, function(request, callback) {
76+
async.each(request.snapshots, function(snapshot, eachCb) {
77+
var docRequest = {collection: request.collection, id: snapshot.id, snapshot: snapshot};
78+
backend.trigger(backend.MIDDLEWARE_ACTIONS.doc, request.agent, docRequest, eachCb);
79+
}, callback);
80+
});
81+
};
82+
83+
// Shim for backwards compatibility with deprecated middleware action name.
84+
// The action 'after submit' is now 'afterSubmit'.
85+
Backend.prototype._shimAfterSubmit = function() {
86+
var backend = this;
87+
this.use(backend.MIDDLEWARE_ACTIONS.afterSubmit, function(request, callback) {
88+
backend.trigger(backend.MIDDLEWARE_ACTIONS['after submit'], request.agent, request, callback);
89+
});
90+
};
91+
3992
Backend.prototype.close = function(callback) {
4093
var wait = 3;
4194
var backend = this;
@@ -82,7 +135,7 @@ Backend.prototype.connect = function(connection, req) {
82135
*/
83136
Backend.prototype.listen = function(stream, req) {
84137
var agent = new Agent(this, stream);
85-
this.trigger('connect', agent, {stream: stream, req: req}, function(err) {
138+
this.trigger(this.MIDDLEWARE_ACTIONS.connect, agent, {stream: stream, req: req}, function(err) {
86139
if (err) return agent.close(err);
87140
agent._open();
88141
});
@@ -155,11 +208,11 @@ Backend.prototype.submit = function(agent, index, id, op, options, callback) {
155208
if (err) return callback(err);
156209
var request = new SubmitRequest(this, agent, index, id, op, options);
157210
var backend = this;
158-
backend.trigger('submit', agent, request, function(err) {
211+
backend.trigger(backend.MIDDLEWARE_ACTIONS.submit, agent, request, function(err) {
159212
if (err) return callback(err);
160213
request.submit(function(err) {
161214
if (err) return callback(err);
162-
backend.trigger('after submit', agent, request, function(err) {
215+
backend.trigger(backend.MIDDLEWARE_ACTIONS.afterSubmit, agent, request, function(err) {
163216
if (err) return callback(err);
164217
backend._sanitizeOps(agent, request.projection, request.collection, id, request.ops, function(err) {
165218
if (err) return callback(err);
@@ -179,7 +232,7 @@ Backend.prototype._sanitizeOp = function(agent, projection, collection, id, op,
179232
return callback(err);
180233
}
181234
}
182-
this.trigger('op', agent, {collection: collection, id: id, op: op}, callback);
235+
this.trigger(this.MIDDLEWARE_ACTIONS.op, agent, {collection: collection, id: id, op: op}, callback);
183236
};
184237
Backend.prototype._sanitizeOps = function(agent, projection, collection, id, ops, callback) {
185238
var backend = this;
@@ -194,33 +247,31 @@ Backend.prototype._sanitizeOpsBulk = function(agent, projection, collection, ops
194247
}, callback);
195248
};
196249

197-
Backend.prototype._sanitizeSnapshot = function(agent, projection, collection, id, snapshot, callback) {
250+
Backend.prototype._sanitizeSnapshots = function(agent, projection, collection, snapshots, callback) {
198251
if (projection) {
199252
try {
200-
projections.projectSnapshot(projection.fields, snapshot);
253+
projections.projectSnapshots(projection.fields, snapshots);
201254
} catch (err) {
202255
return callback(err);
203256
}
204257
}
205-
this.trigger('doc', agent, {collection: collection, id: id, snapshot: snapshot}, callback);
206-
};
207-
Backend.prototype._sanitizeSnapshots = function(agent, projection, collection, snapshots, callback) {
208-
var backend = this;
209-
async.each(snapshots, function(snapshot, eachCb) {
210-
backend._sanitizeSnapshot(agent, projection, collection, snapshot.id, snapshot, eachCb);
211-
}, callback);
212-
};
213-
Backend.prototype._sanitizeSnapshotBulk = function(agent, projection, collection, snapshotMap, callback) {
214-
var backend = this;
215-
async.forEachOf(snapshotMap, function(snapshot, id, eachCb) {
216-
backend._sanitizeSnapshot(agent, projection, collection, id, snapshot, eachCb);
217-
}, callback);
258+
var request = {collection: collection, snapshots: snapshots};
259+
this.trigger(this.MIDDLEWARE_ACTIONS.readSnapshots, agent, request, callback);
218260
};
219261

220262
Backend.prototype._getSnapshotProjection = function(db, projection) {
221263
return (db.projectsSnapshots) ? null : projection;
222264
};
223265

266+
Backend.prototype._getSnapshotsFromMap = function(ids, snapshotMap) {
267+
var snapshots = new Array(ids.length);
268+
for (var i = 0; i < ids.length; i++) {
269+
var id = ids[i];
270+
snapshots[i] = snapshotMap[id];
271+
}
272+
return snapshots;
273+
};
274+
224275
// Non inclusive - gets ops from [from, to). Ie, all relevant ops. If to is
225276
// not defined (null or undefined) then it returns all ops.
226277
Backend.prototype.getOps = function(agent, index, id, from, to, callback) {
@@ -283,7 +334,8 @@ Backend.prototype.fetch = function(agent, index, id, callback) {
283334
backend.db.getSnapshot(collection, id, fields, null, function(err, snapshot) {
284335
if (err) return callback(err);
285336
var snapshotProjection = backend._getSnapshotProjection(backend.db, projection);
286-
backend._sanitizeSnapshot(agent, snapshotProjection, collection, id, snapshot, function(err) {
337+
var snapshots = [snapshot];
338+
backend._sanitizeSnapshots(agent, snapshotProjection, collection, snapshots, function(err) {
287339
if (err) return callback(err);
288340
backend.emit('timing', 'fetch', Date.now() - start, request);
289341
callback(null, snapshot);
@@ -306,7 +358,8 @@ Backend.prototype.fetchBulk = function(agent, index, ids, callback) {
306358
backend.db.getSnapshotBulk(collection, ids, fields, null, function(err, snapshotMap) {
307359
if (err) return callback(err);
308360
var snapshotProjection = backend._getSnapshotProjection(backend.db, projection);
309-
backend._sanitizeSnapshotBulk(agent, snapshotProjection, collection, snapshotMap, function(err) {
361+
var snapshots = backend._getSnapshotsFromMap(ids, snapshotMap);
362+
backend._sanitizeSnapshots(agent, snapshotProjection, collection, snapshots, function(err) {
310363
if (err) return callback(err);
311364
backend.emit('timing', 'fetchBulk', Date.now() - start, request);
312365
callback(null, snapshotMap);
@@ -479,7 +532,7 @@ Backend.prototype._triggerQuery = function(agent, index, query, options, callbac
479532
snapshotProjection: null,
480533
};
481534
var backend = this;
482-
backend.trigger('query', agent, request, function(err) {
535+
backend.trigger(backend.MIDDLEWARE_ACTIONS.query, agent, request, function(err) {
483536
if (err) return callback(err);
484537
// Set the DB reference for the request after the middleware trigger so
485538
// that the db option can be changed in middleware

lib/projections.js

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
var json0 = require('ot-json0').type;
22

33
exports.projectSnapshot = projectSnapshot;
4+
exports.projectSnapshots = projectSnapshots;
45
exports.projectOp = projectOp;
56
exports.isSnapshotAllowed = isSnapshotAllowed;
67
exports.isOpAllowed = isOpAllowed;
@@ -15,6 +16,13 @@ function projectSnapshot(fields, snapshot) {
1516
snapshot.data = projectData(fields, snapshot.data);
1617
}
1718

19+
function projectSnapshots(fields, snapshots) {
20+
for (var i = 0; i < snapshots.length; i++) {
21+
var snapshot = snapshots[i];
22+
projectSnapshot(fields, snapshot);
23+
}
24+
}
25+
1826
function projectOp(fields, op) {
1927
if (op.create) {
2028
projectSnapshot(fields, op.create);

lib/pubsub/index.js

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,12 @@
1+
var emitter = require('../emitter');
12
var OpStream = require('../op-stream');
23
var ShareDBError = require('../error');
34
var util = require('../util');
45

56
function PubSub(options) {
7+
if (!(this instanceof PubSub)) return new PubSub(options);
8+
emitter.EventEmitter.call(this);
9+
610
this.prefix = options && options.prefix;
711
this.nextStreamId = 1;
812
this.streamsCount = 0;
@@ -13,8 +17,14 @@ function PubSub(options) {
1317
// isn't complete until the callback returns from Redis
1418
// Maps channel -> true
1519
this.subscribed = {};
20+
21+
var pubsub = this;
22+
this._defaultCallback = function(err) {
23+
if (err) return pubsub.emit('error', err);
24+
};
1625
}
1726
module.exports = PubSub;
27+
emitter.mixin(PubSub);
1828

1929
PubSub.prototype.close = function(callback) {
2030
for (var channel in this.streams) {
@@ -23,22 +33,29 @@ PubSub.prototype.close = function(callback) {
2333
map[id].destroy();
2434
}
2535
}
26-
if (callback) callback();
36+
if (callback) process.nextTick(callback);
2737
};
2838

2939
PubSub.prototype._subscribe = function(channel, callback) {
30-
callback(new ShareDBError(5015, '_subscribe PubSub method unimplemented'));
40+
process.nextTick(function() {
41+
callback(new ShareDBError(5015, '_subscribe PubSub method unimplemented'));
42+
});
3143
};
3244

3345
PubSub.prototype._unsubscribe = function(channel, callback) {
34-
callback(new ShareDBError(5016, '_unsubscribe PubSub method unimplemented'));
46+
process.nextTick(function() {
47+
callback(new ShareDBError(5016, '_unsubscribe PubSub method unimplemented'));
48+
});
3549
};
3650

3751
PubSub.prototype._publish = function(channels, data, callback) {
38-
callback(new ShareDBError(5017, '_publish PubSub method unimplemented'));
52+
process.nextTick(function() {
53+
callback(new ShareDBError(5017, '_publish PubSub method unimplemented'));
54+
});
3955
};
4056

4157
PubSub.prototype.subscribe = function(channel, callback) {
58+
if (!callback) callback = this._defaultCallback;
4259
if (this.prefix) {
4360
channel = this.prefix + ' ' + channel;
4461
}
@@ -60,6 +77,7 @@ PubSub.prototype.subscribe = function(channel, callback) {
6077
};
6178

6279
PubSub.prototype.publish = function(channels, data, callback) {
80+
if (!callback) callback = this._defaultCallback;
6381
if (this.prefix) {
6482
for (var i = 0; i < channels.length; i++) {
6583
channels[i] = this.prefix + ' ' + channels[i];
@@ -109,9 +127,7 @@ PubSub.prototype._removeStream = function(channel, stream) {
109127
// complete before we can count on being subscribed again
110128
delete this.subscribed[channel];
111129

112-
this._unsubscribe(channel, function(err) {
113-
if (err) throw err;
114-
});
130+
this._unsubscribe(channel, this._defaultCallback);
115131
};
116132

117133
function shallowCopy(object) {

lib/query-emitter.js

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,8 @@ QueryEmitter.prototype.queryPoll = function(callback) {
185185
if (inserted.length) {
186186
emitter.db.getSnapshotBulk(emitter.collection, inserted, emitter.fields, null, function(err, snapshotMap) {
187187
if (err) return emitter._finishPoll(err, callback, pending);
188-
emitter.backend._sanitizeSnapshotBulk(emitter.agent, emitter.snapshotProjection, emitter.collection, snapshotMap, function(err) {
188+
var snapshots = emitter.backend._getSnapshotsFromMap(inserted, snapshotMap);
189+
emitter.backend._sanitizeSnapshots(emitter.agent, emitter.snapshotProjection, emitter.collection, snapshots, function(err) {
189190
if (err) return emitter._finishPoll(err, callback, pending);
190191
emitter._emitTiming('queryEmitter.pollGetSnapshotBulk', start);
191192
var diff = mapDiff(idsDiff, snapshotMap);
@@ -232,10 +233,10 @@ QueryEmitter.prototype.queryPollDoc = function(id, callback) {
232233
// delay in sending to the client anyway
233234
emitter.db.getSnapshot(emitter.collection, id, emitter.fields, null, function(err, snapshot) {
234235
if (err) return callback(err);
235-
emitter.backend._sanitizeSnapshot(emitter.agent, emitter.snapshotProjection, emitter.collection, id, snapshot, function(err) {
236+
var snapshots = [snapshot];
237+
emitter.backend._sanitizeSnapshots(emitter.agent, emitter.snapshotProjection, emitter.collection, snapshots, function(err) {
236238
if (err) return callback(err);
237-
var values = [snapshot];
238-
emitter.onDiff([new arraydiff.InsertDiff(index, values)]);
239+
emitter.onDiff([new arraydiff.InsertDiff(index, snapshots)]);
239240
emitter._emitTiming('queryEmitter.pollDocGetSnapshot', start);
240241
callback();
241242
});

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "sharedb",
3-
"version": "1.0.0-beta.8",
3+
"version": "1.0.0-beta.9",
44
"description": "JSON OT database backend",
55
"main": "lib/index.js",
66
"dependencies": {

0 commit comments

Comments
 (0)