Skip to content

Commit f8eaf7e

Browse files
author
Alec Gibson
committed
[WIP] Refactor
1 parent 3952db9 commit f8eaf7e

15 files changed

+2205
-1421
lines changed

lib/agent.js

Lines changed: 77 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,12 @@ function Agent(backend, stream) {
2727
this.subscribedQueries = {};
2828

2929
// Track which documents are subscribed to presence by the client. This is a
30-
// map of collection -> id -> stream
30+
// map of channel -> stream
3131
this.subscribedPresences = {};
32+
// Highest seq received for a subscription request. Any seq lower than this
33+
// value is stale, and should be ignored. Used for keeping the subscription
34+
// state in sync with the client's desired state
35+
this.presenceSubscriptionSeq = 0;
3236

3337
// We need to track this manually to make sure we don't reply to messages
3438
// after the stream was closed.
@@ -78,12 +82,8 @@ Agent.prototype._cleanup = function() {
7882
}
7983
this.subscribedDocs = {};
8084

81-
for (var collection in this.subscribedPresences) {
82-
var streams = this.subscribedPresences[collection];
83-
for (var id in streams) {
84-
var stream = streams[id];
85-
stream.destroy();
86-
}
85+
for (var channel in this.subscribedPresences) {
86+
this.subscribedPresences[channel].destroy();
8787
}
8888
this.subscribedPresences = {};
8989

@@ -130,22 +130,19 @@ Agent.prototype._subscribeToStream = function(collection, id, stream) {
130130
});
131131
};
132132

133-
Agent.prototype._subscribeToPresenceStream = function(collection, id, stream) {
133+
Agent.prototype._subscribeToPresenceStream = function(channel, stream) {
134134
if (this.closed) return stream.destroy();
135135

136136
stream.on('data', function(data) {
137137
if (data.error) {
138-
logger.error('Presence subscription stream error', collection, id, data.error);
138+
logger.error('Presence subscription stream error', channel, data.error);
139139
}
140140
this._handlePresenceData(data);
141141
}.bind(this));
142142

143143
stream.on('end', function() {
144-
var streams = this.subscribedPresences[collection];
145-
if (!streams || !streams[id] !== stream) return;
146-
delete streams[id];
147-
if (util.hasKeys(streams)) return;
148-
delete agent.subscribedPresences[collection];
144+
if (this.subscribedPresences[channel] !== stream) return;
145+
delete this.subscribedPresences[channel];
149146
}.bind(this));
150147
};
151148

@@ -394,10 +391,14 @@ Agent.prototype._handleMessage = function(request, callback) {
394391
return this._fetchSnapshotByTimestamp(request.c, request.d, request.ts, callback);
395392
case 'p':
396393
var presence = this._createPresence(request);
397-
if (!util.supportsPresence(types.map[presence.t])) {
394+
if (presence.t && !util.supportsPresence(types.map[presence.t])) {
398395
return callback({code: 9999, message: 'Type does not support presence: ' + presence.t});
399396
}
400-
return this._broadcastPresence(request.c, request.d, presence, callback);
397+
return this._broadcastPresence(presence, callback);
398+
case 'ps':
399+
return this._subscribePresence(request.ch, request.seq, callback);
400+
case 'pu':
401+
return this._unsubscribePresence(request.ch, request.seq, callback);
401402
default:
402403
callback({code: 4000, message: 'Invalid or unknown message'});
403404
}
@@ -666,77 +667,82 @@ Agent.prototype._fetchSnapshotByTimestamp = function(collection, id, timestamp,
666667
this.backend.fetchSnapshotByTimestamp(this, collection, id, timestamp, callback);
667668
};
668669

669-
Agent.prototype._broadcastPresence = function(collection, id, presence, callback) {
670-
var wantsSubscribe = presence.s;
671-
this._handlePresenceSubscription(collection, id, wantsSubscribe, function(error) {
670+
Agent.prototype._broadcastPresence = function(presence, callback) {
671+
this.backend.transformPresenceToLatestVersion(this, presence, function(error, presence) {
672672
if (error) return callback(error);
673-
this.backend.transformPresenceToLatestVersion(this, presence, function(error, presence) {
673+
var channel = this._getPresenceChannel(presence.ch);
674+
this.backend.pubsub.publish([channel], presence, function(error) {
674675
if (error) return callback(error);
675-
var channel = this.backend.getPresenceChannel(collection, id);
676-
this.backend.pubsub.publish([channel], presence, function(error) {
677-
if (error) return callback(error);
678-
callback(null, presence);
679-
});
680-
}.bind(this));
681-
}.bind(this));
682-
};
683-
684-
Agent.prototype._handlePresenceSubscription = function(collection, id, wantsSubscribe, callback) {
685-
var streams = this.subscribedPresences[collection] || (this.subscribedPresences[collection] = {});
686-
var stream = streams[id];
687-
688-
if (stream) {
689-
if (wantsSubscribe) {
690-
return callback();
691-
}
692-
stream.destroy();
693-
return callback();
694-
}
695-
696-
if (!wantsSubscribe) return callback();
697-
698-
var channel = this.backend.getPresenceChannel(collection, id);
699-
this.backend.pubsub.subscribe(channel, function(error, stream) {
700-
if (error) return callback(error);
701-
streams[id] = stream;
702-
this._subscribeToPresenceStream(collection, id, stream);
703-
callback();
676+
callback(null, presence);
677+
});
704678
}.bind(this));
705679
};
706680

707681
Agent.prototype._createPresence = function(request) {
708-
// src can be provided if it is not the same as the current agent,
709-
// such as a resubmission after a reconnect, but it usually isn't needed
710-
var src = request.src || this.clientId;
711682
return {
712683
a: 'p',
713-
src: src,
684+
ch: request.ch,
685+
src: this.clientId,
714686
seq: request.seq,
687+
id: request.id,
688+
p: request.p,
715689
c: request.c,
716690
d: request.d,
717-
id: request.id,
718691
v: request.v,
719-
p: request.p,
720-
t: request.t,
721-
r: !!request.r,
722-
s: !!request.s
692+
t: request.t
723693
};
724694
};
725695

696+
Agent.prototype._subscribePresence = function(channel, seq, callback) {
697+
var presenceChannel = this._getPresenceChannel(channel);
698+
this.backend.pubsub.subscribe(presenceChannel, function(error, stream) {
699+
if (error) return callback(error);
700+
if (seq < this.presenceSubscriptionSeq) return callback(null, {ch: channel, seq: seq});
701+
this.presenceSubscriptionSeq = seq;
702+
this.subscribedPresences[channel] = stream;
703+
this._subscribeToPresenceStream(channel, stream);
704+
this._requestPresence(channel, function(error) {
705+
callback(error, {ch: channel, seq: seq});
706+
});
707+
}.bind(this));
708+
};
709+
710+
Agent.prototype._unsubscribePresence = function(channel, seq, callback) {
711+
if (seq < this.presenceSubscriptionSeq) return;
712+
this.presenceSubscriptionSeq = seq;
713+
var stream = this.subscribedPresences[channel];
714+
if (stream) stream.destroy();
715+
callback(null, {ch: channel, seq: seq});
716+
};
717+
718+
Agent.prototype._getPresenceChannel = function(channel) {
719+
// TODO: May need to namespace this further if we want to have automatic Doc channels that don't
720+
// clash with arbitrary user input (eg if a user decides to name their channel the same as a doc collection)
721+
// TODO: What if a user creates a collection called _presence?
722+
return '_presence.' + channel;
723+
};
724+
725+
Agent.prototype._requestPresence = function(channel, callback) {
726+
var presenceChannel = this._getPresenceChannel(channel);
727+
this.backend.pubsub.publish([presenceChannel], {ch: channel, r: true, src: this.clientId}, callback);
728+
};
729+
726730
Agent.prototype._handlePresenceData = function(presence) {
727-
if (presence.src !== this.clientId) {
728-
var backend = this.backend;
729-
var context = {
730-
collection: presence.c,
731-
presence: presence
732-
};
733-
backend.trigger(backend.MIDDLEWARE_ACTIONS.sendPresence, this, context, function(error) {
734-
if (error) {
735-
return this.send({a: 'p', c: presence.c, d: presence.d, id: presence.id, error: getReplyErrorObject(error)});
736-
}
737-
this.send(presence);
738-
}.bind(this));
739-
}
731+
if (presence.src === this.clientId) return;
732+
733+
if (presence.r) return this.send({a: 'pr', ch: presence.ch});
734+
735+
var backend = this.backend;
736+
var context = {
737+
collection: presence.c,
738+
presence: presence
739+
};
740+
backend.trigger(backend.MIDDLEWARE_ACTIONS.sendPresence, this, context, function(error) {
741+
if (error) {
742+
return this.send({a: 'p', ch: presence.ch, id: presence.id, error: getReplyErrorObject(error)});
743+
}
744+
this.send(presence);
745+
}.bind(this));
740746
};
741747

742748
function createClientOp(request, clientId) {

lib/backend.js

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -682,10 +682,6 @@ Backend.prototype.getDocChannel = function(collection, id) {
682682
return collection + '.' + id;
683683
};
684684

685-
Backend.prototype.getPresenceChannel = function(collection, id) {
686-
return this.getDocChannel(collection, id) + '.presence';
687-
};
688-
689685
Backend.prototype.getChannels = function(collection, id) {
690686
return [
691687
this.getCollectionChannel(collection),
@@ -807,6 +803,7 @@ Backend.prototype._buildSnapshotFromOps = function(id, startingSnapshot, ops, ca
807803
};
808804

809805
Backend.prototype.transformPresenceToLatestVersion = function(agent, presence, callback) {
806+
if (!presence.c || !presence.d) return callback(null, presence);
810807
this.getOps(agent, presence.c, presence.d, presence.v, null, function(error, ops) {
811808
if (error) return callback(error);
812809
for (var i = 0; i < ops.length; i++) {

0 commit comments

Comments
 (0)