From c89c8d98b252371a7cbf2e058755b8223fa00a65 Mon Sep 17 00:00:00 2001 From: Jeremy Gordon Date: Mon, 13 Aug 2018 16:59:34 -0700 Subject: [PATCH 01/31] Removing the effective 'shattering' of remotely received ops. --- lib/client/doc.js | 44 ++++++++++++++++++++++---------------------- 1 file changed, 22 insertions(+), 22 deletions(-) diff --git a/lib/client/doc.js b/lib/client/doc.js index 71f4e2041..2415988ba 100644 --- a/lib/client/doc.js +++ b/lib/client/doc.js @@ -530,28 +530,28 @@ Doc.prototype._otApply = function(op, source) { // that the snapshot only include updates from the particular op component // at the time of emission. Eliminating this would require rethinking how // such external bindings are implemented. - if (!source && this.type === types.defaultType && op.op.length > 1) { - if (!this.applyStack) this.applyStack = []; - var stackLength = this.applyStack.length; - for (var i = 0; i < op.op.length; i++) { - var component = op.op[i]; - var componentOp = {op: [component]}; - // Transform componentOp against any ops that have been submitted - // sychronously inside of an op event handler since we began apply of - // our operation - for (var j = stackLength; j < this.applyStack.length; j++) { - var transformErr = transformX(this.applyStack[j], componentOp); - if (transformErr) return this._hardRollback(transformErr); - } - // Apply the individual op component - this.emit('before op', componentOp.op, source); - this.data = this.type.apply(this.data, componentOp.op); - this.emit('op', componentOp.op, source); - } - // Pop whatever was submitted since we started applying this op - this._popApplyStack(stackLength); - return; - } + // if (!source && this.type === types.defaultType && op.op.length > 1) { + // if (!this.applyStack) this.applyStack = []; + // var stackLength = this.applyStack.length; + // for (var i = 0; i < op.op.length; i++) { + // var component = op.op[i]; + // var componentOp = {op: [component]}; + // // Transform componentOp against any ops that have been submitted + // // sychronously inside of an op event handler since we began apply of + // // our operation + // for (var j = stackLength; j < this.applyStack.length; j++) { + // var transformErr = transformX(this.applyStack[j], componentOp); + // if (transformErr) return this._hardRollback(transformErr); + // } + // // Apply the individual op component + // this.emit('before op', componentOp.op, source); + // this.data = this.type.apply(this.data, componentOp.op); + // this.emit('op', componentOp.op, source); + // } + // // Pop whatever was submitted since we started applying this op + // this._popApplyStack(stackLength); + // return; + // } // The 'before op' event enables clients to pull any necessary data out of // the snapshot before it gets changed From f191eefe5ce409dfdfbaefe0dc36d46ac89198f6 Mon Sep 17 00:00:00 2001 From: Sidhu Alluri Date: Wed, 29 Aug 2018 11:59:56 -0700 Subject: [PATCH 02/31] Add 'publish' hook to sharedb to allow modifying the op before it is transmitted to all listeners --- lib/submit-request.js | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/lib/submit-request.js b/lib/submit-request.js index 5ff3d1997..0da36a1cf 100644 --- a/lib/submit-request.js +++ b/lib/submit-request.js @@ -157,7 +157,11 @@ SubmitRequest.prototype.commit = function(callback) { // Needed for agent to detect if it can ignore sending the op back to // the client that submitted it in subscriptions if (request.collection !== request.index) op.i = request.index; - backend.pubsub.publish(request.channels, op); + + backend.trigger('publish', this.agent, this, function(err) { + if (err) return callback(err); + backend.pubsub.publish(request.channels, op); + }) } callback(); }); From e56e0e3edee9231eaae34df95109ff244d456bdb Mon Sep 17 00:00:00 2001 From: Sidhu Alluri Date: Wed, 29 Aug 2018 13:48:55 -0700 Subject: [PATCH 03/31] pass correct context to publish trigger --- lib/submit-request.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/submit-request.js b/lib/submit-request.js index 0da36a1cf..7570814d6 100644 --- a/lib/submit-request.js +++ b/lib/submit-request.js @@ -158,7 +158,7 @@ SubmitRequest.prototype.commit = function(callback) { // the client that submitted it in subscriptions if (request.collection !== request.index) op.i = request.index; - backend.trigger('publish', this.agent, this, function(err) { + backend.trigger('publish', this.agent, request, function(err) { if (err) return callback(err); backend.pubsub.publish(request.channels, op); }) From 894bf7e8a9786c7418961b9c18cdf7cf3519c6a3 Mon Sep 17 00:00:00 2001 From: Sidhu Alluri Date: Thu, 30 Aug 2018 14:35:24 -0700 Subject: [PATCH 04/31] prevent invert attempt on error since 'rich-text' subtype does not support it --- lib/client/doc.js | 45 +++++++++++++++++++++++++-------------------- 1 file changed, 25 insertions(+), 20 deletions(-) diff --git a/lib/client/doc.js b/lib/client/doc.js index 2415988ba..8dbea10d8 100644 --- a/lib/client/doc.js +++ b/lib/client/doc.js @@ -851,26 +851,31 @@ Doc.prototype._rollback = function(err) { // working state, then call back var op = this.inflightOp; - if (op.op && op.type.invert) { - op.op = op.type.invert(op.op); - - // Transform the undo operation by any pending ops. - for (var i = 0; i < this.pendingOps.length; i++) { - var transformErr = transformX(this.pendingOps[i], op); - if (transformErr) return this._hardRollback(transformErr); - } - - // ... and apply it locally, reverting the changes. - // - // This operation is applied to look like it comes from a remote source. - // I'm still not 100% sure about this functionality, because its really a - // local op. Basically, the problem is that if the client's op is rejected - // by the server, the editor window should update to reflect the undo. - this._otApply(op, false); - - this._clearInflightOp(err); - return; - } + /** + * 'json0' is technically an invertible type but we register 'rich-text' as a subtype + * Since 'rich-text' is not invertible, it makes the parent type not invertible as well + */ + + // if (op.op && op.type.invert) { + // op.op = op.type.invert(op.op); + + // // Transform the undo operation by any pending ops. + // for (var i = 0; i < this.pendingOps.length; i++) { + // var transformErr = transformX(this.pendingOps[i], op); + // if (transformErr) return this._hardRollback(transformErr); + // } + + // // ... and apply it locally, reverting the changes. + // // + // // This operation is applied to look like it comes from a remote source. + // // I'm still not 100% sure about this functionality, because its really a + // // local op. Basically, the problem is that if the client's op is rejected + // // by the server, the editor window should update to reflect the undo. + // this._otApply(op, false); + + // this._clearInflightOp(err); + // return; + // } this._hardRollback(err); }; From c2d8b685beae524ab71431900de0a85558e09afc Mon Sep 17 00:00:00 2001 From: Sidhu Alluri Date: Thu, 30 Aug 2018 15:13:05 -0700 Subject: [PATCH 05/31] use newer json0 in sharedb fork --- package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/package.json b/package.json index a49f0a088..ed1344756 100644 --- a/package.json +++ b/package.json @@ -9,7 +9,7 @@ "deep-is": "^0.1.3", "hat": "0.0.3", "make-error": "^1.1.1", - "ot-json0": "^1.0.1" + "ot-json0": "ottypes/json0#9df44f0c7958d2c98164e0682ef52f53e886ac45" }, "devDependencies": { "coveralls": "^2.11.8", From f749c99617858db1ad89dcec0fa4aec590b599ee Mon Sep 17 00:00:00 2001 From: Sidhu Alluri Date: Thu, 30 Aug 2018 15:20:40 -0700 Subject: [PATCH 06/31] use 1.1.0 json0 explicitly instead of referring to github commit --- package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/package.json b/package.json index ed1344756..edeb46402 100644 --- a/package.json +++ b/package.json @@ -9,7 +9,7 @@ "deep-is": "^0.1.3", "hat": "0.0.3", "make-error": "^1.1.1", - "ot-json0": "ottypes/json0#9df44f0c7958d2c98164e0682ef52f53e886ac45" + "ot-json0": "^1.1.0" }, "devDependencies": { "coveralls": "^2.11.8", From 212873b70f2a25eefe3b10a28dd1a87d4ed38315 Mon Sep 17 00:00:00 2001 From: Sidhu Alluri Date: Sun, 24 Mar 2019 23:52:57 -0700 Subject: [PATCH 07/31] prelim add presense transmit --- lib/agent.js | 47 +++++++++++++++++++++++++++++++++++++++++++ lib/backend.js | 5 +++++ lib/client/doc.js | 5 +++++ lib/submit-request.js | 21 +++++++++++-------- 4 files changed, 70 insertions(+), 8 deletions(-) diff --git a/lib/agent.js b/lib/agent.js index 3ef558362..9f1754edf 100644 --- a/lib/agent.js +++ b/lib/agent.js @@ -98,6 +98,12 @@ Agent.prototype._subscribeToStream = function(collection, id, stream) { console.error('Doc subscription stream error', collection, id, data.error); return; } + if (data.a === 'pr') { + // Received a presense publish from another client + if (data.src !== agent.clientId) agent.send(data); + return + } + if (agent._isOwnOp(collection, data)) return; agent._sendOp(collection, id, data); }); @@ -300,6 +306,9 @@ Agent.prototype._handleMessage = function(request, callback) { var op = this._createOp(request); if (!op) return callback({code: 4000, message: 'Invalid op message'}); return this._submit(request.c, request.d, op, callback); + case 'pr': { + return this._sendPresence(request, callback) + } case 'nf': return this._fetchSnapshot(request.c, request.d, request.v, callback); default: @@ -496,6 +505,40 @@ Agent.prototype._subscribe = function(collection, id, version, callback) { }); }; +Agent.prototype._sendPresence = function(request, callback) { + const collection = request.c + const id = request.d + const version = request.v + const source = request.src || this.clientId + const from = version + this.backend.db.getOpsToSnapshot(collection, id, from, snapshot, null, function(err, ops) { + if (err) return callback(err); + + if (ops.length !== snapshot.v - from) { + return callback(request.missingOpsError()); + } + + this.opsToSnapshot = ops + backend.trigger('transformPresence', this.agent, request, function(err) { + if (err) return callback(err); + + var message = { + a: 'pr', + c: collection, + d: id, + v: version, + src: source, + pr: request.pr + }; + + // Send presense to other clients + this.backend.sendPresence(message, function(err) { + if (err) return callback(err); + }); + }) + }) +} + Agent.prototype._subscribeBulk = function(collection, versions, callback) { var agent = this; this.backend.subscribeBulk(this, collection, versions, function(err, streams, snapshotMap) { @@ -588,3 +631,7 @@ Agent.prototype._createOp = function(request) { Agent.prototype._fetchSnapshot = function (collection, id, version, callback) { this.backend.fetchSnapshot(this, collection, id, version, callback); }; + +Agent.prototype.missingOpsError = function() { + return {code: 5019, message: 'Presence send failed. DB missing ops needed to transform it up to the current snapshot version'}; +}; \ No newline at end of file diff --git a/lib/backend.js b/lib/backend.js index 60e6d72ed..3ccaf7fd3 100644 --- a/lib/backend.js +++ b/lib/backend.js @@ -239,6 +239,11 @@ Backend.prototype.submit = function(agent, index, id, op, options, callback) { }); }; +Backend.prototype.sendPresence = function(presence, callback) { + var channels = [ this.getDocChannel(presence.c, presence.d) ]; + this.pubsub.publish(channels, presence, callback); +}; + Backend.prototype._sanitizeOp = function(agent, projection, collection, id, op, callback) { if (projection) { try { diff --git a/lib/client/doc.js b/lib/client/doc.js index 8dbea10d8..dfa464a65 100644 --- a/lib/client/doc.js +++ b/lib/client/doc.js @@ -752,6 +752,11 @@ Doc.prototype.submitOp = function(component, options, callback) { options = null; } var op = {op: component}; + // Append any transient presence data to the top level op + if (options && options.pr) { + op.pr = options.pr + } + var source = options && options.source; this._submit(op, source, callback); }; diff --git a/lib/submit-request.js b/lib/submit-request.js index 7570814d6..af5c92d06 100644 --- a/lib/submit-request.js +++ b/lib/submit-request.js @@ -98,16 +98,21 @@ SubmitRequest.prototype.submit = function(callback) { return callback(request.missingOpsError()); } - err = request._transformOp(ops); - if (err) return callback(err); + this.opsToSnapshot = ops + backend.trigger('transformOp', this.agent, this, function(err) { + if (err) return callback(err); - if (op.v !== snapshot.v) { - // This shouldn't happen, but is just a final sanity check to make - // sure we have transformed the op to the current snapshot version - return callback(request.versionAfterTransformError()); - } + err = request._transformOp(ops); + if (err) return callback(err); + + if (op.v !== snapshot.v) { + // This shouldn't happen, but is just a final sanity check to make + // sure we have transformed the op to the current snapshot version + return callback(request.versionAfterTransformError()); + } - request.apply(callback); + request.apply(callback); + }) }); }); }; From 1b908a443f00a7e45767274488bd1b024c7eceb5 Mon Sep 17 00:00:00 2001 From: Sidhu Alluri Date: Mon, 25 Mar 2019 11:20:06 -0700 Subject: [PATCH 08/31] update connection.js to transmit presence alongside op --- lib/client/connection.js | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/client/connection.js b/lib/client/connection.js index da51948be..f5d21fcfe 100644 --- a/lib/client/connection.js +++ b/lib/client/connection.js @@ -418,6 +418,7 @@ Connection.prototype.sendOp = function(doc, op) { if (op.op) message.op = op.op; if (op.create) message.create = op.create; if (op.del) message.del = op.del; + if (op.pr) message.pr = op.pr this.send(message); }; From 2387957b7d50cb655c143d8532046e3173d3c494 Mon Sep 17 00:00:00 2001 From: Sidhu Alluri Date: Mon, 25 Mar 2019 13:19:57 -0700 Subject: [PATCH 09/31] Add sendPresence method to client --- lib/client/connection.js | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/lib/client/connection.js b/lib/client/connection.js index f5d21fcfe..a49659f67 100644 --- a/lib/client/connection.js +++ b/lib/client/connection.js @@ -422,6 +422,17 @@ Connection.prototype.sendOp = function(doc, op) { this.send(message); }; +Connection.prototype.sendPresence = function(doc, presence) { + var message = { + a: 'pr', + c: doc.collection, + d: doc.id, + v: doc.version, + src: op.src, + pr: presence + }; + this.send(message); +} /** * Sends a message down the socket From 32641bbeaafcbd93cfc9c15439863e044660a2cc Mon Sep 17 00:00:00 2001 From: Sidhu Alluri Date: Mon, 25 Mar 2019 14:12:06 -0700 Subject: [PATCH 10/31] add sendPresence to client/doc --- lib/client/doc.js | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/lib/client/doc.js b/lib/client/doc.js index dfa464a65..0e4fdc848 100644 --- a/lib/client/doc.js +++ b/lib/client/doc.js @@ -761,6 +761,11 @@ Doc.prototype.submitOp = function(component, options, callback) { this._submit(op, source, callback); }; +Doc.prototype.sendPresence = function(presence) { + this.connection.sendPresence(this, presence) +}; + + // Create the document, which in ShareJS semantics means to set its type. Every // object implicitly exists in the database but has no data and no type. Create // sets the type of the object and can optionally set some initial data on the From 9b40a5208ff42fddf5cc8268ffd0e786975635e3 Mon Sep 17 00:00:00 2001 From: Sidhu Alluri Date: Mon, 25 Mar 2019 14:22:28 -0700 Subject: [PATCH 11/31] use source id from the correct place --- lib/client/connection.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/client/connection.js b/lib/client/connection.js index a49659f67..2d50fa0ff 100644 --- a/lib/client/connection.js +++ b/lib/client/connection.js @@ -428,7 +428,7 @@ Connection.prototype.sendPresence = function(doc, presence) { c: doc.collection, d: doc.id, v: doc.version, - src: op.src, + src: this.id, pr: presence }; this.send(message); From 4c53d85ac3db4fb318e389d510a7d6d6e5bc38a0 Mon Sep 17 00:00:00 2001 From: Sidhu Alluri Date: Mon, 25 Mar 2019 15:58:32 -0700 Subject: [PATCH 12/31] Fix sendPresence unknown reference --- lib/agent.js | 47 +++++++++++++++++++++++++++++------------------ 1 file changed, 29 insertions(+), 18 deletions(-) diff --git a/lib/agent.js b/lib/agent.js index 9f1754edf..d823c0917 100644 --- a/lib/agent.js +++ b/lib/agent.js @@ -505,36 +505,47 @@ Agent.prototype._subscribe = function(collection, id, version, callback) { }); }; + Agent.prototype._sendPresence = function(request, callback) { const collection = request.c const id = request.d const version = request.v const source = request.src || this.clientId const from = version - this.backend.db.getOpsToSnapshot(collection, id, from, snapshot, null, function(err, ops) { - if (err) return callback(err); + const backend = this.backend + const agent = this.agent + // Send a special projection so that getSnapshot knows to return all fields. + // With a null projection, it strips document metadata + var fields = {$submit: true}; - if (ops.length !== snapshot.v - from) { - return callback(request.missingOpsError()); - } + backend.db.getSnapshot(collection, id, fields, null, function(err, snapshot) { + if (err) return callback(err); - this.opsToSnapshot = ops - backend.trigger('transformPresence', this.agent, request, function(err) { + backend.db.getOpsToSnapshot(collection, id, from, snapshot, null, function(err, ops) { if (err) return callback(err); - var message = { - a: 'pr', - c: collection, - d: id, - v: version, - src: source, - pr: request.pr - }; + if (ops.length !== snapshot.v - from) { + return callback(request.missingOpsError()); + } - // Send presense to other clients - this.backend.sendPresence(message, function(err) { + request.opsToSnapshot = ops + backend.trigger('transformPresence', agent, request, function(err) { if (err) return callback(err); - }); + + var message = { + a: 'pr', + c: collection, + d: id, + v: version, + src: source, + pr: request.pr + }; + + // Send presense to other clients + backend.sendPresence(message, function(err) { + if (err) return callback(err); + }); + }) }) }) } From 717733a57948b71d98a8a763b9f736feacd24d70 Mon Sep 17 00:00:00 2001 From: Sidhu Alluri Date: Mon, 25 Mar 2019 16:13:40 -0700 Subject: [PATCH 13/31] pass through presence data through op submit --- lib/backend.js | 4 ++-- lib/client/doc.js | 4 ++++ lib/submit-request.js | 4 +++- 3 files changed, 9 insertions(+), 3 deletions(-) diff --git a/lib/backend.js b/lib/backend.js index 3ccaf7fd3..600b77e5d 100644 --- a/lib/backend.js +++ b/lib/backend.js @@ -218,10 +218,10 @@ Backend.prototype.trigger = function(action, agent, request, callback) { // Submit an operation on the named collection/docname. op should contain a // {op:}, {create:} or {del:} field. It should probably contain a v: field (if // it doesn't, it defaults to the current version). -Backend.prototype.submit = function(agent, index, id, op, options, callback) { +Backend.prototype.submit = function(agent, index, id, op, presence, options, callback) { var err = ot.checkOp(op); if (err) return callback(err); - var request = new SubmitRequest(this, agent, index, id, op, options); + var request = new SubmitRequest(this, agent, index, id, op, presence, options); var backend = this; backend.trigger(backend.MIDDLEWARE_ACTIONS.submit, agent, request, function(err) { if (err) return callback(err); diff --git a/lib/client/doc.js b/lib/client/doc.js index 0e4fdc848..b876cac4e 100644 --- a/lib/client/doc.js +++ b/lib/client/doc.js @@ -733,6 +733,10 @@ Doc.prototype._tryCompose = function(op) { // support compose must be able to compose any two ops together if (last.op && op.op && this.type.compose) { last.op = this.type.compose(last.op, op.op); + // Use the most recent presence data for the op + if (op.pr) { + last.pr = op.pr + } return last; } }; diff --git a/lib/submit-request.js b/lib/submit-request.js index af5c92d06..e4c1a6d40 100644 --- a/lib/submit-request.js +++ b/lib/submit-request.js @@ -1,7 +1,7 @@ var ot = require('./ot'); var projections = require('./projections'); -function SubmitRequest(backend, agent, index, id, op, options) { +function SubmitRequest(backend, agent, index, id, op, presence, options) { this.backend = backend; this.agent = agent; // If a projection, rewrite the call into a call against the collection @@ -12,6 +12,7 @@ function SubmitRequest(backend, agent, index, id, op, options) { this.id = id; this.op = op; this.options = options; + this.presence = presence this.start = Date.now(); this._addOpMeta(); @@ -159,6 +160,7 @@ SubmitRequest.prototype.commit = function(callback) { op.c = request.collection; op.d = request.id; op.m = undefined; + if (request.presence) op.pr = request.presence // Needed for agent to detect if it can ignore sending the op back to // the client that submitted it in subscriptions if (request.collection !== request.index) op.i = request.index; From 9297aa02eec9da119e6fb210e5dfc51fa8eb35f4 Mon Sep 17 00:00:00 2001 From: Sidhu Alluri Date: Mon, 25 Mar 2019 16:25:22 -0700 Subject: [PATCH 14/31] correctly call submit with the .. correct parameters --- lib/agent.js | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/agent.js b/lib/agent.js index d823c0917..cd1c26609 100644 --- a/lib/agent.js +++ b/lib/agent.js @@ -305,7 +305,7 @@ Agent.prototype._handleMessage = function(request, callback) { case 'op': var op = this._createOp(request); if (!op) return callback({code: 4000, message: 'Invalid op message'}); - return this._submit(request.c, request.d, op, callback); + return this._submit(request.c, request.d, op, request.pr, callback); case 'pr': { return this._sendPresence(request, callback) } @@ -585,9 +585,9 @@ Agent.prototype._unsubscribeBulk = function(collection, ids, callback) { process.nextTick(callback); }; -Agent.prototype._submit = function(collection, id, op, callback) { +Agent.prototype._submit = function(collection, id, op, presence, callback) { var agent = this; - this.backend.submit(this, collection, id, op, null, function(err, ops) { + this.backend.submit(this, collection, id, op, presence, null, function(err, ops) { // Message to acknowledge the op was successfully submitted var ack = {src: op.src, seq: op.seq, v: op.v}; if (err) { From afefc13c3e2b2f1c552e7a67f7e61a5ccc9e0898 Mon Sep 17 00:00:00 2001 From: Sidhu Alluri Date: Mon, 25 Mar 2019 16:41:23 -0700 Subject: [PATCH 15/31] attach presence when sending op in agent.js --- lib/agent.js | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/agent.js b/lib/agent.js index cd1c26609..16d016a70 100644 --- a/lib/agent.js +++ b/lib/agent.js @@ -182,6 +182,7 @@ Agent.prototype._sendOp = function(collection, id, op) { if (op.op) message.op = op.op; if (op.create) message.create = op.create; if (op.del) message.del = true; + if (op.pr) message.pr = op.pr this.send(message); }; From 8c7df469c4c1e1f0df5832a8c417080399229aa0 Mon Sep 17 00:00:00 2001 From: Sidhu Alluri Date: Mon, 25 Mar 2019 17:17:09 -0700 Subject: [PATCH 16/31] fix referencing global this instead of request 'this' in submit-request --- lib/submit-request.js | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/lib/submit-request.js b/lib/submit-request.js index e4c1a6d40..73dcd4142 100644 --- a/lib/submit-request.js +++ b/lib/submit-request.js @@ -36,6 +36,7 @@ module.exports = SubmitRequest; SubmitRequest.prototype.submit = function(callback) { var request = this; var backend = this.backend; + const agent = this.agent var collection = this.collection; var id = this.id; var op = this.op; @@ -99,8 +100,8 @@ SubmitRequest.prototype.submit = function(callback) { return callback(request.missingOpsError()); } - this.opsToSnapshot = ops - backend.trigger('transformOp', this.agent, this, function(err) { + request.opsToSnapshot = ops + backend.trigger('transformOp', agent, request, function(err) { if (err) return callback(err); err = request._transformOp(ops); From 9edd8851443129492246eb9fa83b0a3cc1ca1f58 Mon Sep 17 00:00:00 2001 From: Sidhu Alluri Date: Mon, 25 Mar 2019 19:52:05 -0700 Subject: [PATCH 17/31] update behavior of send presence to take into account inflight ops --- lib/client/doc.js | 27 +++++++++++++++++++++++++-- 1 file changed, 25 insertions(+), 2 deletions(-) diff --git a/lib/client/doc.js b/lib/client/doc.js index b876cac4e..926831687 100644 --- a/lib/client/doc.js +++ b/lib/client/doc.js @@ -62,6 +62,7 @@ function Doc(connection, collection, id) { this.inflightSubscribe = []; this.inflightUnsubscribe = []; this.pendingFetch = []; + this.pendingPresence = null // Whether we think we are subscribed on the server. Synchronously set to // false on calls to unsubscribe and disconnect. Should never be true when @@ -756,6 +757,8 @@ Doc.prototype.submitOp = function(component, options, callback) { options = null; } var op = {op: component}; + // Clear any pending presence since we'll be sending the latest presence along with the op + this.pendingPresence = null // Append any transient presence data to the top level op if (options && options.pr) { op.pr = options.pr @@ -766,10 +769,29 @@ Doc.prototype.submitOp = function(component, options, callback) { }; Doc.prototype.sendPresence = function(presence) { - this.connection.sendPresence(this, presence) + if (this.pendingOps.length) { + // If there are pending ops, attach the updated presence to the most recent pending op + const lastOp = this.pendingOps[this.pendingOps.length - 1] + lastOp.pr = presence + this.pendingPresence = null + } else if (!this.hasPending()) { + // If there is nothing pending (no inflight or pending ops), then broadcast the presence + this.pendingPresence = null + this.connection.sendPresence(this, presence) + } else { + // If there are inflight ops, wait till they are acknowledged before sending presence + this.whenNothingPending(() => { + if (this.pendingPresence) { + // Send pendingPresence if still necessary + // This might not be necessary if an op has been submitted with newer presence before the + // 'whenNothingPending' callback has fired + this.connection.sendPresence(this, this.pendingFetch) + this.pendingPresence = null + } + }) + } }; - // Create the document, which in ShareJS semantics means to set its type. Every // object implicitly exists in the database but has no data and no type. Create // sets the type of the object and can optionally set some initial data on the @@ -902,6 +924,7 @@ Doc.prototype._hardRollback = function(err) { this.version = null; this.inflightOp = null; this.pendingOps = []; + this.pendingPresence = null // Fetch the latest from the server to get us back into a working state var doc = this; From b9636bd200f6c847cdec586fd6b5b633e446e144 Mon Sep 17 00:00:00 2001 From: Sidhu Alluri Date: Mon, 25 Mar 2019 23:25:54 -0700 Subject: [PATCH 18/31] Add support for receiving presence --- lib/client/connection.js | 5 ++++- lib/client/doc.js | 36 ++++++++++++++++++++++++++++++++---- 2 files changed, 36 insertions(+), 5 deletions(-) diff --git a/lib/client/connection.js b/lib/client/connection.js index 2d50fa0ff..ee32a720e 100644 --- a/lib/client/connection.js +++ b/lib/client/connection.js @@ -250,7 +250,10 @@ Connection.prototype.handleMessage = function(message) { var doc = this.getExisting(message.c, message.d); if (doc) doc._handleOp(err, message); return; - + case 'pr': + var doc = this.getExisting(message.c, message.d); + if (doc) doc._handlePresence(err, message) + return; default: console.warn('Ignoring unrecognized message', message); } diff --git a/lib/client/doc.js b/lib/client/doc.js index 926831687..06f68d1a0 100644 --- a/lib/client/doc.js +++ b/lib/client/doc.js @@ -32,10 +32,11 @@ var types = require('../types'); * ------ * * You can use doc.on(eventName, callback) to subscribe to the following events: - * - `before op (op, source)` Fired before a partial operation is applied to the data. + * - `before op (op, source, presence)` Fired before a partial operation is applied to the data. * It may be used to read the old data just before applying an operation - * - `op (op, source)` Fired after every partial operation with this operation as the + * - `op (op, source, presence)` Fired after every partial operation with this operation as the * first argument + * - `presence (presence)` Fired when presence information is received independent of an op * - `create (source)` The document was created. That means its type was * set and it has some initial data. * - `del (data, source)` Fired after the document is deleted, that is @@ -63,6 +64,8 @@ function Doc(connection, collection, id) { this.inflightUnsubscribe = []; this.pendingFetch = []; this.pendingPresence = null + this.presenceTransformer = null + // Whether we think we are subscribed on the server. Synchronously set to // false on calls to unsubscribe and disconnect. Should never be true when @@ -275,6 +278,22 @@ Doc.prototype._handleUnsubscribe = function(err) { this._emitNothingPending(); }; +Doc.prototype._handlePresence = function(err, message) { + if (err) { + return this.emit('error', err); + } + + this.emit('on presence', this._transformPresence(presence)) +} + +Doc.prototype._transformPresence = function(presence) { + if (this.pendingOps.length && this.presenceTransformer) { + return this.presenceTransformer(presence, this.pendingOps) + } else { + return presence + } +} + Doc.prototype._handleOp = function(err, message) { if (err) { if (this.inflightOp) { @@ -554,9 +573,11 @@ Doc.prototype._otApply = function(op, source) { // return; // } + const presence = op.pr ? this._transformPresence(op.pr) : undefined + // The 'before op' event enables clients to pull any necessary data out of // the snapshot before it gets changed - this.emit('before op', op.op, source); + this.emit('before op', op.op, source, presence); // Apply the operation to the local data, mutating it in place this.data = this.type.apply(this.data, op.op); // Emit an 'op' event once the local data includes the changes from the @@ -564,7 +585,7 @@ Doc.prototype._otApply = function(op, source) { // submission and before the server or other clients have received the op. // For ops from other clients, this will be after the op has been // committed to the database and published - this.emit('op', op.op, source); + this.emit('op', op.op, source, presence); return; } @@ -792,6 +813,13 @@ Doc.prototype.sendPresence = function(presence) { } }; +// Set a function (presence, ops) => presence that transforms +// the provided presence by the provided ops +// This is needed when you receive presence while there are pending ops +Doc.prototype.setPresenceTransformer = function(presenceTransformer) { + this.presenceTransformer = presenceTransformer +} + // Create the document, which in ShareJS semantics means to set its type. Every // object implicitly exists in the database but has no data and no type. Create // sets the type of the object and can optionally set some initial data on the From 82dff80cf32d7a76e3f7b6530aa5f2f97c8d1dc1 Mon Sep 17 00:00:00 2001 From: Sidhu Alluri Date: Mon, 25 Mar 2019 23:56:54 -0700 Subject: [PATCH 19/31] fix unknown reference --- lib/client/doc.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/client/doc.js b/lib/client/doc.js index 06f68d1a0..133b93a34 100644 --- a/lib/client/doc.js +++ b/lib/client/doc.js @@ -283,7 +283,7 @@ Doc.prototype._handlePresence = function(err, message) { return this.emit('error', err); } - this.emit('on presence', this._transformPresence(presence)) + this.emit('on presence', this._transformPresence(message.pr)) } Doc.prototype._transformPresence = function(presence) { From 5d58406801e34b775d685fbd1a27214979e96d9b Mon Sep 17 00:00:00 2001 From: Sidhu Alluri Date: Wed, 27 Mar 2019 11:23:05 -0700 Subject: [PATCH 20/31] correctly persist presence when delaying send' --- lib/client/doc.js | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/lib/client/doc.js b/lib/client/doc.js index 133b93a34..0714b4a43 100644 --- a/lib/client/doc.js +++ b/lib/client/doc.js @@ -801,12 +801,13 @@ Doc.prototype.sendPresence = function(presence) { this.connection.sendPresence(this, presence) } else { // If there are inflight ops, wait till they are acknowledged before sending presence + this.pendingPresence = presence this.whenNothingPending(() => { if (this.pendingPresence) { // Send pendingPresence if still necessary // This might not be necessary if an op has been submitted with newer presence before the // 'whenNothingPending' callback has fired - this.connection.sendPresence(this, this.pendingFetch) + this.connection.sendPresence(this, this.pendingPresence) this.pendingPresence = null } }) From 4cb14a82bbc4ca6f920ff1e3e8f4bd1289b21c55 Mon Sep 17 00:00:00 2001 From: Sidhu Alluri Date: Thu, 28 Mar 2019 20:41:49 -0700 Subject: [PATCH 21/31] transform incoming presense by inflight op as well if available; do not transform presence on own op --- lib/client/doc.js | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/lib/client/doc.js b/lib/client/doc.js index 0714b4a43..781ca0dab 100644 --- a/lib/client/doc.js +++ b/lib/client/doc.js @@ -287,8 +287,9 @@ Doc.prototype._handlePresence = function(err, message) { } Doc.prototype._transformPresence = function(presence) { - if (this.pendingOps.length && this.presenceTransformer) { - return this.presenceTransformer(presence, this.pendingOps) + const ops = [this.inflightOp, ...this.pendingOps].filter(Boolean) + if (ops.length && this.presenceTransformer) { + return this.presenceTransformer(presence, ops) } else { return presence } @@ -573,7 +574,8 @@ Doc.prototype._otApply = function(op, source) { // return; // } - const presence = op.pr ? this._transformPresence(op.pr) : undefined + // Transform the presence only if it's a remote op + const presence = op.pr && !source ? this._transformPresence(op.pr) : undefined // The 'before op' event enables clients to pull any necessary data out of // the snapshot before it gets changed From 9f4a3afed823f592ad4c6ee69d22f4becee59285 Mon Sep 17 00:00:00 2001 From: Sidhu Alluri Date: Wed, 3 Apr 2019 11:37:14 -0700 Subject: [PATCH 22/31] avoid fetching database ops when transforming presence if presence already at latest version --- lib/agent.js | 52 +++++++++++++++++++++++++++++++--------------------- 1 file changed, 31 insertions(+), 21 deletions(-) diff --git a/lib/agent.js b/lib/agent.js index 16d016a70..5e233e3ea 100644 --- a/lib/agent.js +++ b/lib/agent.js @@ -522,32 +522,42 @@ Agent.prototype._sendPresence = function(request, callback) { backend.db.getSnapshot(collection, id, fields, null, function(err, snapshot) { if (err) return callback(err); - backend.db.getOpsToSnapshot(collection, id, from, snapshot, null, function(err, ops) { - if (err) return callback(err); - - if (ops.length !== snapshot.v - from) { - return callback(request.missingOpsError()); - } - - request.opsToSnapshot = ops - backend.trigger('transformPresence', agent, request, function(err) { - if (err) return callback(err); - - var message = { - a: 'pr', - c: collection, - d: id, - v: version, - src: source, - pr: request.pr - }; - + if (version === snapshot.v) { // Send presense to other clients + // We don't need to transform because there is no newer version of the document to transform to backend.sendPresence(message, function(err) { if (err) return callback(err); }); + } else { + // The version of the presence doesn't match the snapshot so fetch the ops + // and give middleware chance to transform the presence + backend.db.getOpsToSnapshot(collection, id, from, snapshot, null, function(err, ops) { + if (err) return callback(err); + + if (ops.length !== snapshot.v - from) { + return callback(request.missingOpsError()); + } + + request.opsToSnapshot = ops + backend.trigger('transformPresence', agent, request, function(err) { + if (err) return callback(err); + + var message = { + a: 'pr', + c: collection, + d: id, + v: version, + src: source, + pr: request.pr + }; + + // Send presense to other clients + backend.sendPresence(message, function(err) { + if (err) return callback(err); + }); + }) }) - }) + } }) } From 3786df62a2140e39801a224417b5c9b9532187c8 Mon Sep 17 00:00:00 2001 From: Sidhu Alluri Date: Wed, 3 Apr 2019 11:53:39 -0700 Subject: [PATCH 23/31] declare variable before using it ... --- lib/agent.js | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/lib/agent.js b/lib/agent.js index 5e233e3ea..065b8d954 100644 --- a/lib/agent.js +++ b/lib/agent.js @@ -525,6 +525,15 @@ Agent.prototype._sendPresence = function(request, callback) { if (version === snapshot.v) { // Send presense to other clients // We don't need to transform because there is no newer version of the document to transform to + var message = { + a: 'pr', + c: collection, + d: id, + v: version, + src: source, + pr: request.pr + }; + backend.sendPresence(message, function(err) { if (err) return callback(err); }); From 8c662515c311ae5d669e3a32787e79fe76ffdadb Mon Sep 17 00:00:00 2001 From: Sidhu Alluri Date: Thu, 4 Apr 2019 03:04:58 -0700 Subject: [PATCH 24/31] retry submit op on commit error as well as commit failure --- lib/submit-request.js | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/lib/submit-request.js b/lib/submit-request.js index 73dcd4142..36a9f141c 100644 --- a/lib/submit-request.js +++ b/lib/submit-request.js @@ -150,11 +150,10 @@ SubmitRequest.prototype.commit = function(callback) { // Try committing the operation and snapshot to the database atomically backend.db.commit(request.collection, request.id, request.op, request.snapshot, request.options, function(err, succeeded) { - if (err) return callback(err); - if (!succeeded) { + if (err || !succeeded) { // Between our fetch and our call to commit, another client committed an // operation. We expect this to be relatively infrequent but normal. - return request.retry(callback); + return request.retry(callback, err); } if (!request.suppressPublish) { var op = request.op; @@ -176,10 +175,10 @@ SubmitRequest.prototype.commit = function(callback) { }); }; -SubmitRequest.prototype.retry = function(callback) { +SubmitRequest.prototype.retry = function(callback, err) { this.retries++; if (this.maxRetries != null && this.retries > this.maxRetries) { - return callback(this.maxRetriesError()); + return callback(this.maxRetriesError(err)); } this.backend.emit('timing', 'submit.retry', Date.now() - this.start, this); this.submit(callback); @@ -255,6 +254,10 @@ SubmitRequest.prototype.versionDuringTransformError = function() { SubmitRequest.prototype.versionAfterTransformError = function() { return {code: 5003, message: 'Op submit failed. Op version mismatches snapshot after op transform'}; }; -SubmitRequest.prototype.maxRetriesError = function() { - return {code: 5004, message: 'Op submit failed. Maximum submit retries exceeded'}; +SubmitRequest.prototype.maxRetriesError = function(err) { + if (err) { + return {code: 5004, message: `Op submit failed. Maximum submit retries exceeded. Attempt failed with following error ${err}`}; + } else { + return {code: 5004, message: 'Op submit failed. Maximum submit retries exceeded'}; + } }; From 97b2358e60f6d96a81c84ee1a02eec39e39e68e6 Mon Sep 17 00:00:00 2001 From: Sidhu Alluri Date: Mon, 8 Apr 2019 14:06:14 -0700 Subject: [PATCH 25/31] transmit a connection closed message when an agent disconnects --- lib/agent.js | 8 ++++++++ lib/backend.js | 9 +++++++++ 2 files changed, 17 insertions(+) diff --git a/lib/agent.js b/lib/agent.js index 065b8d954..2cfad2df0 100644 --- a/lib/agent.js +++ b/lib/agent.js @@ -57,10 +57,16 @@ Agent.prototype.close = function(err) { Agent.prototype._cleanup = function() { this.closed = true; + // The channels that we want to let know that the agent + // has disconnected + const channelsToInform = [] + // Clean up doc subscription streams for (var collection in this.subscribedDocs) { var docs = this.subscribedDocs[collection]; for (var id in docs) { + channelsToInform.push(this.backend.getDocChannel(collection, id)) + var stream = docs[id]; stream.destroy(); } @@ -73,6 +79,8 @@ Agent.prototype._cleanup = function() { emitter.destroy(); } this.subscribedQueries = {}; + + this.backend.sendConnectionClosed(this.clientId, channelsToInform) }; /** diff --git a/lib/backend.js b/lib/backend.js index 600b77e5d..a43c1ba35 100644 --- a/lib/backend.js +++ b/lib/backend.js @@ -244,6 +244,15 @@ Backend.prototype.sendPresence = function(presence, callback) { this.pubsub.publish(channels, presence, callback); }; +Backend.prototype.sendConnectionClosed = function(clientId, channels, callback) { + const message = { + a: 'cl', + src: clientId + } + this.pubsub.publish(channels, message, callback) + +} + Backend.prototype._sanitizeOp = function(agent, projection, collection, id, op, callback) { if (projection) { try { From a37ed29e2b2e3565a38b5104fd0954c85db647be Mon Sep 17 00:00:00 2001 From: Sidhu Alluri Date: Mon, 8 Apr 2019 15:11:38 -0700 Subject: [PATCH 26/31] rename action to notifyUnsubscribe; notify unsubscribe on either unsubscribe or close --- lib/agent.js | 22 ++++++++++++++++++---- lib/backend.js | 5 ++--- 2 files changed, 20 insertions(+), 7 deletions(-) diff --git a/lib/agent.js b/lib/agent.js index 2cfad2df0..cb683f652 100644 --- a/lib/agent.js +++ b/lib/agent.js @@ -58,7 +58,7 @@ Agent.prototype._cleanup = function() { this.closed = true; // The channels that we want to let know that the agent - // has disconnected + // is no longer subscribed to. const channelsToInform = [] // Clean up doc subscription streams @@ -80,7 +80,9 @@ Agent.prototype._cleanup = function() { } this.subscribedQueries = {}; - this.backend.sendConnectionClosed(this.clientId, channelsToInform) + if (channelsToInform.length) { + this.backend.notifyUnsubscribe(this.clientId, channelsToInform) + } }; /** @@ -111,6 +113,10 @@ Agent.prototype._subscribeToStream = function(collection, id, stream) { if (data.src !== agent.clientId) agent.send(data); return } + if (data.a === 'nus') { + // Another client has unsubscribed from this document + if (data.src !== agent.clientId) agent.send(data) + } if (agent._isOwnOp(collection, data)) return; agent._sendOp(collection, id, data); @@ -598,7 +604,11 @@ Agent.prototype._unsubscribe = function(collection, id, callback) { // stream or an inflight subscribing state var docs = this.subscribedDocs[collection]; var stream = docs && docs[id]; - if (stream) stream.destroy(); + if (stream) { + stream.destroy(); + const docChannel = this.backend.getDocChannel(collection, id) + this.backend.notifyUnsubscribe(this.clientId, [docChannel]) + } process.nextTick(callback); }; @@ -608,7 +618,11 @@ Agent.prototype._unsubscribeBulk = function(collection, ids, callback) { for (var i = 0; i < ids.length; i++) { var id = ids[i]; var stream = docs[id]; - if (stream) stream.destroy(); + if (stream) { + const docChannel = this.backend.getDocChannel(collection, id) + this.backend.notifyUnsubscribe(this.clientId, [docChannel]) + stream.destroy(); + } } process.nextTick(callback); }; diff --git a/lib/backend.js b/lib/backend.js index a43c1ba35..543c1ebaa 100644 --- a/lib/backend.js +++ b/lib/backend.js @@ -244,13 +244,12 @@ Backend.prototype.sendPresence = function(presence, callback) { this.pubsub.publish(channels, presence, callback); }; -Backend.prototype.sendConnectionClosed = function(clientId, channels, callback) { +Backend.prototype.notifyUnsubscribe = function(clientId, channels, callback) { const message = { - a: 'cl', + a: 'nus', src: clientId } this.pubsub.publish(channels, message, callback) - } Backend.prototype._sanitizeOp = function(agent, projection, collection, id, op, callback) { From 18d9e3d02cdba1614557f7f9a478d9383b19d6b7 Mon Sep 17 00:00:00 2001 From: Sidhu Alluri Date: Mon, 8 Apr 2019 16:34:39 -0700 Subject: [PATCH 27/31] emit notifyUnsubscribe message with collection and doc id; add unsub notification event listener to sharedb document --- lib/agent.js | 18 +++--------------- lib/backend.js | 7 +++++-- lib/client/connection.js | 4 ++++ lib/client/doc.js | 11 ++++++++++- 4 files changed, 22 insertions(+), 18 deletions(-) diff --git a/lib/agent.js b/lib/agent.js index cb683f652..36ef229b9 100644 --- a/lib/agent.js +++ b/lib/agent.js @@ -56,17 +56,11 @@ Agent.prototype.close = function(err) { Agent.prototype._cleanup = function() { this.closed = true; - - // The channels that we want to let know that the agent - // is no longer subscribed to. - const channelsToInform = [] - // Clean up doc subscription streams for (var collection in this.subscribedDocs) { var docs = this.subscribedDocs[collection]; for (var id in docs) { - channelsToInform.push(this.backend.getDocChannel(collection, id)) - + this.backend.notifyUnsubscribe(this.clientId, collection, id) var stream = docs[id]; stream.destroy(); } @@ -79,10 +73,6 @@ Agent.prototype._cleanup = function() { emitter.destroy(); } this.subscribedQueries = {}; - - if (channelsToInform.length) { - this.backend.notifyUnsubscribe(this.clientId, channelsToInform) - } }; /** @@ -606,8 +596,7 @@ Agent.prototype._unsubscribe = function(collection, id, callback) { var stream = docs && docs[id]; if (stream) { stream.destroy(); - const docChannel = this.backend.getDocChannel(collection, id) - this.backend.notifyUnsubscribe(this.clientId, [docChannel]) + this.backend.notifyUnsubscribe(this.clientId, collection, id) } process.nextTick(callback); }; @@ -619,8 +608,7 @@ Agent.prototype._unsubscribeBulk = function(collection, ids, callback) { var id = ids[i]; var stream = docs[id]; if (stream) { - const docChannel = this.backend.getDocChannel(collection, id) - this.backend.notifyUnsubscribe(this.clientId, [docChannel]) + this.backend.notifyUnsubscribe(this.clientId, collection, id) stream.destroy(); } } diff --git a/lib/backend.js b/lib/backend.js index 543c1ebaa..2657331ca 100644 --- a/lib/backend.js +++ b/lib/backend.js @@ -244,12 +244,15 @@ Backend.prototype.sendPresence = function(presence, callback) { this.pubsub.publish(channels, presence, callback); }; -Backend.prototype.notifyUnsubscribe = function(clientId, channels, callback) { +Backend.prototype.notifyUnsubscribe = function(clientId, collectionId, docId, callback) { const message = { a: 'nus', + c: collectionId, + d: docId, src: clientId } - this.pubsub.publish(channels, message, callback) + + this.pubsub.publish([this.getDocChannel(collectionId, docId)], message, callback) } Backend.prototype._sanitizeOp = function(agent, projection, collection, id, op, callback) { diff --git a/lib/client/connection.js b/lib/client/connection.js index ee32a720e..5dc3e2324 100644 --- a/lib/client/connection.js +++ b/lib/client/connection.js @@ -254,6 +254,10 @@ Connection.prototype.handleMessage = function(message) { var doc = this.getExisting(message.c, message.d); if (doc) doc._handlePresence(err, message) return; + case 'nus': + var doc = this.getExisting(message.c, message.d) + if (doc) doc._handleUnsubscribeNotification(err, message) + return; default: console.warn('Ignoring unrecognized message', message); } diff --git a/lib/client/doc.js b/lib/client/doc.js index 781ca0dab..3bd8a668c 100644 --- a/lib/client/doc.js +++ b/lib/client/doc.js @@ -36,7 +36,8 @@ var types = require('../types'); * It may be used to read the old data just before applying an operation * - `op (op, source, presence)` Fired after every partial operation with this operation as the * first argument - * - `presence (presence)` Fired when presence information is received independent of an op + * - `on presence (presence)` Fired when presence information is received independent of an op + * - `unsub notification (clientId)` Fired when another client unsubscribes from this document * - `create (source)` The document was created. That means its type was * set and it has some initial data. * - `del (data, source)` Fired after the document is deleted, that is @@ -286,6 +287,14 @@ Doc.prototype._handlePresence = function(err, message) { this.emit('on presence', this._transformPresence(message.pr)) } +Doc.prototype._handleUnsubscribeNotification = function(err, message) { + if (err) { + return this.emit('error', err) + } + + this.emit('unsub notification', message.src) +} + Doc.prototype._transformPresence = function(presence) { const ops = [this.inflightOp, ...this.pendingOps].filter(Boolean) if (ops.length && this.presenceTransformer) { From 0211facfa9563eb5677ad125fd88fd0c050268bb Mon Sep 17 00:00:00 2001 From: Sidhu Alluri Date: Mon, 8 Apr 2019 17:02:41 -0700 Subject: [PATCH 28/31] inject the src param into the presence object upon receipt --- lib/client/doc.js | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/lib/client/doc.js b/lib/client/doc.js index 3bd8a668c..e0940dbe1 100644 --- a/lib/client/doc.js +++ b/lib/client/doc.js @@ -284,7 +284,12 @@ Doc.prototype._handlePresence = function(err, message) { return this.emit('error', err); } - this.emit('on presence', this._transformPresence(message.pr)) + const presence = { + ...message.pr, + src: message.src + } + + this.emit('on presence', this._transformPresence(presence)) } Doc.prototype._handleUnsubscribeNotification = function(err, message) { From d8305a0f910afc663e92a1b5b13b6f436475b38d Mon Sep 17 00:00:00 2001 From: Sidhu Alluri Date: Mon, 8 Apr 2019 17:32:19 -0700 Subject: [PATCH 29/31] fix not returning after broadcasting the notify unsub message --- lib/agent.js | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/agent.js b/lib/agent.js index 36ef229b9..69cd03f66 100644 --- a/lib/agent.js +++ b/lib/agent.js @@ -106,6 +106,7 @@ Agent.prototype._subscribeToStream = function(collection, id, stream) { if (data.a === 'nus') { // Another client has unsubscribed from this document if (data.src !== agent.clientId) agent.send(data) + return } if (agent._isOwnOp(collection, data)) return; From fe76e78e2ba40c59b184d3e27d6f1ae4d0a9828e Mon Sep 17 00:00:00 2001 From: Sidhu Alluri Date: Mon, 8 Apr 2019 17:56:02 -0700 Subject: [PATCH 30/31] attach src info to presence that piggybacks on ops --- lib/client/doc.js | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/lib/client/doc.js b/lib/client/doc.js index e0940dbe1..1fb29d63c 100644 --- a/lib/client/doc.js +++ b/lib/client/doc.js @@ -590,10 +590,14 @@ Doc.prototype._otApply = function(op, source) { // Transform the presence only if it's a remote op const presence = op.pr && !source ? this._transformPresence(op.pr) : undefined + const presenceWithSource = { + ...presence, + src: op.src + } // The 'before op' event enables clients to pull any necessary data out of // the snapshot before it gets changed - this.emit('before op', op.op, source, presence); + this.emit('before op', op.op, source, presenceWithSource); // Apply the operation to the local data, mutating it in place this.data = this.type.apply(this.data, op.op); // Emit an 'op' event once the local data includes the changes from the @@ -601,7 +605,7 @@ Doc.prototype._otApply = function(op, source) { // submission and before the server or other clients have received the op. // For ops from other clients, this will be after the op has been // committed to the database and published - this.emit('op', op.op, source, presence); + this.emit('op', op.op, source, presenceWithSource); return; } From d6ccf47b9117e691c8f9699c564ad88f49d9bafe Mon Sep 17 00:00:00 2001 From: Sidhu Alluri Date: Tue, 9 Apr 2019 13:01:52 -0700 Subject: [PATCH 31/31] only create presence object if presence received --- lib/client/doc.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/client/doc.js b/lib/client/doc.js index 1fb29d63c..ed1185302 100644 --- a/lib/client/doc.js +++ b/lib/client/doc.js @@ -590,10 +590,10 @@ Doc.prototype._otApply = function(op, source) { // Transform the presence only if it's a remote op const presence = op.pr && !source ? this._transformPresence(op.pr) : undefined - const presenceWithSource = { + const presenceWithSource = presence ? { ...presence, src: op.src - } + } : undefined // The 'before op' event enables clients to pull any necessary data out of // the snapshot before it gets changed