diff --git a/lib/agent.js b/lib/agent.js index 3ef558362..69cd03f66 100644 --- a/lib/agent.js +++ b/lib/agent.js @@ -56,11 +56,11 @@ Agent.prototype.close = function(err) { Agent.prototype._cleanup = function() { this.closed = true; - // Clean up doc subscription streams for (var collection in this.subscribedDocs) { var docs = this.subscribedDocs[collection]; for (var id in docs) { + this.backend.notifyUnsubscribe(this.clientId, collection, id) var stream = docs[id]; stream.destroy(); } @@ -98,6 +98,17 @@ Agent.prototype._subscribeToStream = function(collection, id, stream) { console.error('Doc subscription stream error', collection, id, data.error); return; } + if (data.a === 'pr') { + // Received a presense publish from another client + if (data.src !== agent.clientId) agent.send(data); + return + } + if (data.a === 'nus') { + // Another client has unsubscribed from this document + if (data.src !== agent.clientId) agent.send(data) + return + } + if (agent._isOwnOp(collection, data)) return; agent._sendOp(collection, id, data); }); @@ -176,6 +187,7 @@ Agent.prototype._sendOp = function(collection, id, op) { if (op.op) message.op = op.op; if (op.create) message.create = op.create; if (op.del) message.del = true; + if (op.pr) message.pr = op.pr this.send(message); }; @@ -299,7 +311,10 @@ Agent.prototype._handleMessage = function(request, callback) { case 'op': var op = this._createOp(request); if (!op) return callback({code: 4000, message: 'Invalid op message'}); - return this._submit(request.c, request.d, op, callback); + return this._submit(request.c, request.d, op, request.pr, callback); + case 'pr': { + return this._sendPresence(request, callback) + } case 'nf': return this._fetchSnapshot(request.c, request.d, request.v, callback); default: @@ -496,6 +511,70 @@ Agent.prototype._subscribe = function(collection, id, version, callback) { }); }; + +Agent.prototype._sendPresence = function(request, callback) { + const collection = request.c + const id = request.d + const version = request.v + const source = request.src || this.clientId + const from = version + const backend = this.backend + const agent = this.agent + // Send a special projection so that getSnapshot knows to return all fields. + // With a null projection, it strips document metadata + var fields = {$submit: true}; + + backend.db.getSnapshot(collection, id, fields, null, function(err, snapshot) { + if (err) return callback(err); + + if (version === snapshot.v) { + // Send presense to other clients + // We don't need to transform because there is no newer version of the document to transform to + var message = { + a: 'pr', + c: collection, + d: id, + v: version, + src: source, + pr: request.pr + }; + + backend.sendPresence(message, function(err) { + if (err) return callback(err); + }); + } else { + // The version of the presence doesn't match the snapshot so fetch the ops + // and give middleware chance to transform the presence + backend.db.getOpsToSnapshot(collection, id, from, snapshot, null, function(err, ops) { + if (err) return callback(err); + + if (ops.length !== snapshot.v - from) { + return callback(request.missingOpsError()); + } + + request.opsToSnapshot = ops + backend.trigger('transformPresence', agent, request, function(err) { + if (err) return callback(err); + + var message = { + a: 'pr', + c: collection, + d: id, + v: version, + src: source, + pr: request.pr + }; + + // Send presense to other clients + backend.sendPresence(message, function(err) { + if (err) return callback(err); + }); + }) + }) + } + }) +} + Agent.prototype._subscribeBulk = function(collection, versions, callback) { var agent = this; this.backend.subscribeBulk(this, collection, versions, function(err, streams, snapshotMap) { @@ -516,7 +595,10 @@ Agent.prototype._unsubscribe = function(collection, id, callback) { // stream or an inflight subscribing state var docs = this.subscribedDocs[collection]; var stream = docs && docs[id]; - if (stream) stream.destroy(); + if (stream) { + stream.destroy(); + this.backend.notifyUnsubscribe(this.clientId, collection, id) + } process.nextTick(callback); }; @@ -526,14 +608,17 @@ Agent.prototype._unsubscribeBulk = function(collection, ids, callback) { for (var i = 0; i < ids.length; i++) { var id = ids[i]; var stream = docs[id]; - if (stream) stream.destroy(); + if (stream) { + this.backend.notifyUnsubscribe(this.clientId, collection, id) + stream.destroy(); + } } process.nextTick(callback); }; -Agent.prototype._submit = function(collection, id, op, callback) { +Agent.prototype._submit = function(collection, id, op, presence, callback) { var agent = this; - this.backend.submit(this, collection, id, op, null, function(err, ops) { + this.backend.submit(this, collection, id, op, presence, null, function(err, ops) { // Message to acknowledge the op was successfully submitted var ack = {src: op.src, seq: op.seq, v: op.v}; if (err) { @@ -588,3 +673,7 @@ Agent.prototype._createOp = function(request) { Agent.prototype._fetchSnapshot = function (collection, id, version, callback) { this.backend.fetchSnapshot(this, collection, id, version, callback); }; + +Agent.prototype.missingOpsError = function() { + return {code: 5019, message: 'Presence send failed. DB missing ops needed to transform it up to the current snapshot version'}; +}; \ No newline at end of file diff --git a/lib/backend.js b/lib/backend.js index 60e6d72ed..2657331ca 100644 --- a/lib/backend.js +++ b/lib/backend.js @@ -218,10 +218,10 @@ Backend.prototype.trigger = function(action, agent, request, callback) { // Submit an operation on the named collection/docname. op should contain a // {op:}, {create:} or {del:} field. It should probably contain a v: field (if // it doesn't, it defaults to the current version). -Backend.prototype.submit = function(agent, index, id, op, options, callback) { +Backend.prototype.submit = function(agent, index, id, op, presence, options, callback) { var err = ot.checkOp(op); if (err) return callback(err); - var request = new SubmitRequest(this, agent, index, id, op, options); + var request = new SubmitRequest(this, agent, index, id, op, presence, options); var backend = this; backend.trigger(backend.MIDDLEWARE_ACTIONS.submit, agent, request, function(err) { if (err) return callback(err); @@ -239,6 +239,22 @@ Backend.prototype.submit = function(agent, index, id, op, options, callback) { }); }; +Backend.prototype.sendPresence = function(presence, callback) { + var channels = [ this.getDocChannel(presence.c, presence.d) ]; + this.pubsub.publish(channels, presence, callback); +}; + +Backend.prototype.notifyUnsubscribe = function(clientId, collectionId, docId, callback) { + const message = { + a: 'nus', + c: collectionId, + d: docId, + src: clientId + } + + this.pubsub.publish([this.getDocChannel(collectionId, docId)], message, callback) +} + Backend.prototype._sanitizeOp = function(agent, projection, collection, id, op, callback) { if (projection) { try { diff --git a/lib/client/connection.js b/lib/client/connection.js index da51948be..5dc3e2324 100644 --- a/lib/client/connection.js +++ b/lib/client/connection.js @@ -250,7 +250,14 @@ Connection.prototype.handleMessage = function(message) { var doc = this.getExisting(message.c, message.d); if (doc) doc._handleOp(err, message); return; - + case 'pr': + var doc = this.getExisting(message.c, message.d); + if (doc) doc._handlePresence(err, message) + return; + case 'nus': + var doc = this.getExisting(message.c, message.d) + if (doc) doc._handleUnsubscribeNotification(err, message) + return; default: console.warn('Ignoring unrecognized message', message); } @@ -418,9 +425,21 @@ Connection.prototype.sendOp = function(doc, op) { if (op.op) message.op = op.op; if (op.create) message.create = op.create; if (op.del) message.del = op.del; + if (op.pr) message.pr = op.pr this.send(message); }; +Connection.prototype.sendPresence = function(doc, presence) { + var message = { + a: 'pr', + c: doc.collection, + d: doc.id, + v: doc.version, + src: this.id, + pr: presence + }; + this.send(message); +} /** * Sends a message down the socket diff --git a/lib/client/doc.js b/lib/client/doc.js index 71f4e2041..ed1185302 100644 --- a/lib/client/doc.js +++ b/lib/client/doc.js @@ -32,10 +32,12 @@ var types = require('../types'); * ------ * * You can use doc.on(eventName, callback) to subscribe to the following events: - * - `before op (op, source)` Fired before a partial operation is applied to the data. + * - `before op (op, source, presence)` Fired before a partial operation is applied to the data. * It may be used to read the old data just before applying an operation - * - `op (op, source)` Fired after every partial operation with this operation as the + * - `op (op, source, presence)` Fired after every partial operation with this operation as the * first argument + * - `on presence (presence)` Fired when presence information is received independent of an op + * - `unsub notification (clientId)` Fired when another client unsubscribes from this document * - `create (source)` The document was created. That means its type was * set and it has some initial data. * - `del (data, source)` Fired after the document is deleted, that is @@ -62,6 +64,9 @@ function Doc(connection, collection, id) { this.inflightSubscribe = []; this.inflightUnsubscribe = []; this.pendingFetch = []; + this.pendingPresence = null + this.presenceTransformer = null + // Whether we think we are subscribed on the server. Synchronously set to // false on calls to unsubscribe and disconnect. Should never be true when @@ -274,6 +279,36 @@ Doc.prototype._handleUnsubscribe = function(err) { this._emitNothingPending(); }; +Doc.prototype._handlePresence = function(err, message) { + if (err) { + return this.emit('error', err); + } + + const presence = { + ...message.pr, + src: message.src + } + + this.emit('on presence', this._transformPresence(presence)) +} + +Doc.prototype._handleUnsubscribeNotification = function(err, message) { + if (err) { + return this.emit('error', err) + } + + this.emit('unsub notification', message.src) +} + +Doc.prototype._transformPresence = function(presence) { + const ops = [this.inflightOp, ...this.pendingOps].filter(Boolean) + if (ops.length && this.presenceTransformer) { + return this.presenceTransformer(presence, ops) + } else { + return presence + } +} + Doc.prototype._handleOp = function(err, message) { if (err) { if (this.inflightOp) { @@ -530,32 +565,39 @@ Doc.prototype._otApply = function(op, source) { // that the snapshot only include updates from the particular op component // at the time of emission. Eliminating this would require rethinking how // such external bindings are implemented. - if (!source && this.type === types.defaultType && op.op.length > 1) { - if (!this.applyStack) this.applyStack = []; - var stackLength = this.applyStack.length; - for (var i = 0; i < op.op.length; i++) { - var component = op.op[i]; - var componentOp = {op: [component]}; - // Transform componentOp against any ops that have been submitted - // sychronously inside of an op event handler since we began apply of - // our operation - for (var j = stackLength; j < this.applyStack.length; j++) { - var transformErr = transformX(this.applyStack[j], componentOp); - if (transformErr) return this._hardRollback(transformErr); - } - // Apply the individual op component - this.emit('before op', componentOp.op, source); - this.data = this.type.apply(this.data, componentOp.op); - this.emit('op', componentOp.op, source); - } - // Pop whatever was submitted since we started applying this op - this._popApplyStack(stackLength); - return; - } + // if (!source && this.type === types.defaultType && op.op.length > 1) { + // if (!this.applyStack) this.applyStack = []; + // var stackLength = this.applyStack.length; + // for (var i = 0; i < op.op.length; i++) { + // var component = op.op[i]; + // var componentOp = {op: [component]}; + // // Transform componentOp against any ops that have been submitted + // // sychronously inside of an op event handler since we began apply of + // // our operation + // for (var j = stackLength; j < this.applyStack.length; j++) { + // var transformErr = transformX(this.applyStack[j], componentOp); + // if (transformErr) return this._hardRollback(transformErr); + // } + // // Apply the individual op component + // this.emit('before op', componentOp.op, source); + // this.data = this.type.apply(this.data, componentOp.op); + // this.emit('op', componentOp.op, source); + // } + // // Pop whatever was submitted since we started applying this op + // this._popApplyStack(stackLength); + // return; + // } + + // Transform the presence only if it's a remote op + const presence = op.pr && !source ? this._transformPresence(op.pr) : undefined + const presenceWithSource = presence ? { + ...presence, + src: op.src + } : undefined // The 'before op' event enables clients to pull any necessary data out of // the snapshot before it gets changed - this.emit('before op', op.op, source); + this.emit('before op', op.op, source, presenceWithSource); // Apply the operation to the local data, mutating it in place this.data = this.type.apply(this.data, op.op); // Emit an 'op' event once the local data includes the changes from the @@ -563,7 +605,7 @@ Doc.prototype._otApply = function(op, source) { // submission and before the server or other clients have received the op. // For ops from other clients, this will be after the op has been // committed to the database and published - this.emit('op', op.op, source); + this.emit('op', op.op, source, presenceWithSource); return; } @@ -733,6 +775,10 @@ Doc.prototype._tryCompose = function(op) { // support compose must be able to compose any two ops together if (last.op && op.op && this.type.compose) { last.op = this.type.compose(last.op, op.op); + // Use the most recent presence data for the op + if (op.pr) { + last.pr = op.pr + } return last; } }; @@ -752,10 +798,49 @@ Doc.prototype.submitOp = function(component, options, callback) { options = null; } var op = {op: component}; + // Clear any pending presence since we'll be sending the latest presence along with the op + this.pendingPresence = null + // Append any transient presence data to the top level op + if (options && options.pr) { + op.pr = options.pr + } + var source = options && options.source; this._submit(op, source, callback); }; +Doc.prototype.sendPresence = function(presence) { + if (this.pendingOps.length) { + // If there are pending ops, attach the updated presence to the most recent pending op + const lastOp = this.pendingOps[this.pendingOps.length - 1] + lastOp.pr = presence + this.pendingPresence = null + } else if (!this.hasPending()) { + // If there is nothing pending (no inflight or pending ops), then broadcast the presence + this.pendingPresence = null + this.connection.sendPresence(this, presence) + } else { + // If there are inflight ops, wait till they are acknowledged before sending presence + this.pendingPresence = presence + this.whenNothingPending(() => { + if (this.pendingPresence) { + // Send pendingPresence if still necessary + // This might not be necessary if an op has been submitted with newer presence before the + // 'whenNothingPending' callback has fired + this.connection.sendPresence(this, this.pendingPresence) + this.pendingPresence = null + } + }) + } +}; + +// Set a function (presence, ops) => presence that transforms +// the provided presence by the provided ops +// This is needed when you receive presence while there are pending ops +Doc.prototype.setPresenceTransformer = function(presenceTransformer) { + this.presenceTransformer = presenceTransformer +} + // Create the document, which in ShareJS semantics means to set its type. Every // object implicitly exists in the database but has no data and no type. Create // sets the type of the object and can optionally set some initial data on the @@ -851,26 +936,31 @@ Doc.prototype._rollback = function(err) { // working state, then call back var op = this.inflightOp; - if (op.op && op.type.invert) { - op.op = op.type.invert(op.op); + /** + * 'json0' is technically an invertible type but we register 'rich-text' as a subtype + * Since 'rich-text' is not invertible, it makes the parent type not invertible as well + */ - // Transform the undo operation by any pending ops. - for (var i = 0; i < this.pendingOps.length; i++) { - var transformErr = transformX(this.pendingOps[i], op); - if (transformErr) return this._hardRollback(transformErr); - } + // if (op.op && op.type.invert) { + // op.op = op.type.invert(op.op); - // ... and apply it locally, reverting the changes. - // - // This operation is applied to look like it comes from a remote source. - // I'm still not 100% sure about this functionality, because its really a - // local op. Basically, the problem is that if the client's op is rejected - // by the server, the editor window should update to reflect the undo. - this._otApply(op, false); + // // Transform the undo operation by any pending ops. + // for (var i = 0; i < this.pendingOps.length; i++) { + // var transformErr = transformX(this.pendingOps[i], op); + // if (transformErr) return this._hardRollback(transformErr); + // } - this._clearInflightOp(err); - return; - } + // // ... and apply it locally, reverting the changes. + // // + // // This operation is applied to look like it comes from a remote source. + // // I'm still not 100% sure about this functionality, because its really a + // // local op. Basically, the problem is that if the client's op is rejected + // // by the server, the editor window should update to reflect the undo. + // this._otApply(op, false); + + // this._clearInflightOp(err); + // return; + // } this._hardRollback(err); }; @@ -883,6 +973,7 @@ Doc.prototype._hardRollback = function(err) { this.version = null; this.inflightOp = null; this.pendingOps = []; + this.pendingPresence = null // Fetch the latest from the server to get us back into a working state var doc = this; diff --git a/lib/submit-request.js b/lib/submit-request.js index 5ff3d1997..36a9f141c 100644 --- a/lib/submit-request.js +++ b/lib/submit-request.js @@ -1,7 +1,7 @@ var ot = require('./ot'); var projections = require('./projections'); -function SubmitRequest(backend, agent, index, id, op, options) { +function SubmitRequest(backend, agent, index, id, op, presence, options) { this.backend = backend; this.agent = agent; // If a projection, rewrite the call into a call against the collection @@ -12,6 +12,7 @@ function SubmitRequest(backend, agent, index, id, op, options) { this.id = id; this.op = op; this.options = options; + this.presence = presence this.start = Date.now(); this._addOpMeta(); @@ -35,6 +36,7 @@ module.exports = SubmitRequest; SubmitRequest.prototype.submit = function(callback) { var request = this; var backend = this.backend; + const agent = this.agent var collection = this.collection; var id = this.id; var op = this.op; @@ -98,16 +100,21 @@ SubmitRequest.prototype.submit = function(callback) { return callback(request.missingOpsError()); } - err = request._transformOp(ops); - if (err) return callback(err); + request.opsToSnapshot = ops + backend.trigger('transformOp', agent, request, function(err) { + if (err) return callback(err); - if (op.v !== snapshot.v) { - // This shouldn't happen, but is just a final sanity check to make - // sure we have transformed the op to the current snapshot version - return callback(request.versionAfterTransformError()); - } + err = request._transformOp(ops); + if (err) return callback(err); - request.apply(callback); + if (op.v !== snapshot.v) { + // This shouldn't happen, but is just a final sanity check to make + // sure we have transformed the op to the current snapshot version + return callback(request.versionAfterTransformError()); + } + + request.apply(callback); + }) }); }); }; @@ -143,31 +150,35 @@ SubmitRequest.prototype.commit = function(callback) { // Try committing the operation and snapshot to the database atomically backend.db.commit(request.collection, request.id, request.op, request.snapshot, request.options, function(err, succeeded) { - if (err) return callback(err); - if (!succeeded) { + if (err || !succeeded) { // Between our fetch and our call to commit, another client committed an // operation. We expect this to be relatively infrequent but normal. - return request.retry(callback); + return request.retry(callback, err); } if (!request.suppressPublish) { var op = request.op; op.c = request.collection; op.d = request.id; op.m = undefined; + if (request.presence) op.pr = request.presence // Needed for agent to detect if it can ignore sending the op back to // the client that submitted it in subscriptions if (request.collection !== request.index) op.i = request.index; - backend.pubsub.publish(request.channels, op); + + backend.trigger('publish', this.agent, request, function(err) { + if (err) return callback(err); + backend.pubsub.publish(request.channels, op); + }) } callback(); }); }); }; -SubmitRequest.prototype.retry = function(callback) { +SubmitRequest.prototype.retry = function(callback, err) { this.retries++; if (this.maxRetries != null && this.retries > this.maxRetries) { - return callback(this.maxRetriesError()); + return callback(this.maxRetriesError(err)); } this.backend.emit('timing', 'submit.retry', Date.now() - this.start, this); this.submit(callback); @@ -243,6 +254,10 @@ SubmitRequest.prototype.versionDuringTransformError = function() { SubmitRequest.prototype.versionAfterTransformError = function() { return {code: 5003, message: 'Op submit failed. Op version mismatches snapshot after op transform'}; }; -SubmitRequest.prototype.maxRetriesError = function() { - return {code: 5004, message: 'Op submit failed. Maximum submit retries exceeded'}; +SubmitRequest.prototype.maxRetriesError = function(err) { + if (err) { + return {code: 5004, message: `Op submit failed. Maximum submit retries exceeded. Attempt failed with following error ${err}`}; + } else { + return {code: 5004, message: 'Op submit failed. Maximum submit retries exceeded'}; + } }; diff --git a/package.json b/package.json index a49f0a088..edeb46402 100644 --- a/package.json +++ b/package.json @@ -9,7 +9,7 @@ "deep-is": "^0.1.3", "hat": "0.0.3", "make-error": "^1.1.1", - "ot-json0": "^1.0.1" + "ot-json0": "^1.1.0" }, "devDependencies": { "coveralls": "^2.11.8",