Skip to content

Commit b3cc256

Browse files
committed
feat(groupBy): add seed option to eagerly create groups
1 parent 6bd1c5f commit b3cc256

File tree

2 files changed

+106
-44
lines changed

2 files changed

+106
-44
lines changed

spec/operators/groupBy-spec.ts

Lines changed: 50 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
import { expect } from 'chai';
2-
import { groupBy, delay, tap, map, take, mergeMap, materialize, skip, ignoreElements } from 'rxjs/operators';
3-
import { TestScheduler } from 'rxjs/testing';
4-
import { ReplaySubject, of, Observable, Operator, Observer, interval, Subject } from 'rxjs';
5-
import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing';
2+
import { Observable, Observer, of, Operator, ReplaySubject, Subject } from 'rxjs';
63
import { createNotification } from 'rxjs/internal/NotificationFactories';
4+
import { delay, groupBy, ignoreElements, map, materialize, mergeMap, skip, take, tap } from 'rxjs/operators';
5+
import { TestScheduler } from 'rxjs/testing';
6+
import { cold, expectObservable, expectSubscriptions, hot } from '../helpers/marble-testing';
7+
import { observableMatcher } from '../helpers/observableMatcher';
78

89
declare const rxTestScheduler: TestScheduler;
910

@@ -1456,6 +1457,51 @@ describe('groupBy operator', () => {
14561457

14571458
expect(sideEffects).to.deep.equal([0, 1, 2]);
14581459
});
1460+
1461+
describe('seed', () => {
1462+
let rxTest: TestScheduler;
1463+
1464+
beforeEach(() => {
1465+
rxTest = new TestScheduler(observableMatcher);
1466+
});
1467+
1468+
it('should open groups when seed emits', () => {
1469+
rxTest.run(({ cold, expectObservable, expectSubscriptions }) => {
1470+
const source = cold('(--)---a---| ');
1471+
const sourceSubs = ' (^-)-------! ';
1472+
const seed = cold(' (ab)----------|');
1473+
const seedSubs = ' (^-)-------! ';
1474+
const expected = ' (ab)-------| ';
1475+
const a = cold(' (--)---a---| ');
1476+
const b = cold(' (--)-------| ');
1477+
const expectedValues = { a, b };
1478+
1479+
const grouped = source.pipe(groupBy((letter) => letter, { seed: () => seed }));
1480+
1481+
expectObservable(grouped).toBe(expected, expectedValues);
1482+
expectSubscriptions(source.subscriptions).toBe(sourceSubs);
1483+
expectSubscriptions(seed.subscriptions).toBe(seedSubs);
1484+
});
1485+
});
1486+
1487+
it('should emit complete when the seed completes', () => {
1488+
rxTest.run(({ cold, expectObservable, expectSubscriptions }) => {
1489+
const source = cold('(--)---a---|');
1490+
const sourceSubs = ' (^-)-------!';
1491+
const seed = cold(' (a|) ');
1492+
const seedSubs = ' (^!) ';
1493+
const expected = ' (a|) ';
1494+
const a = cold(' (--)---a---|');
1495+
const expectedValues = { a };
1496+
1497+
const grouped = source.pipe(groupBy((letter) => letter, { seed: () => seed }));
1498+
1499+
expectObservable(grouped).toBe(expected, expectedValues);
1500+
expectSubscriptions(source.subscriptions).toBe(sourceSubs);
1501+
expectSubscriptions(seed.subscriptions).toBe(seedSubs);
1502+
});
1503+
});
1504+
})
14591505
});
14601506

14611507
/**

src/internal/operators/groupBy.ts

Lines changed: 56 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,14 @@ interface BasicGroupByOptions<K, T> {
99
element?: undefined;
1010
duration?: (grouped: GroupedObservable<K, T>) => ObservableInput<any>;
1111
connector?: () => SubjectLike<T>;
12+
seed?: () => ObservableInput<K>;
1213
}
1314

1415
interface GroupByOptionsWithElement<K, E, T> {
1516
element: (value: T) => E;
1617
duration?: (grouped: GroupedObservable<K, E>) => ObservableInput<any>;
1718
connector?: () => SubjectLike<E>;
19+
seed?: () => ObservableInput<K>;
1820
}
1921

2022
export function groupBy<T, K>(key: (value: T) => K, options: BasicGroupByOptions<K, T>): OperatorFunction<T, GroupedObservable<K, T>>;
@@ -155,10 +157,11 @@ export function groupBy<T, K, R>(
155157
): OperatorFunction<T, GroupedObservable<K, R>> {
156158
return operate((source, subscriber) => {
157159
let element: ((value: any) => any) | void;
160+
let seed: (() => ObservableInput<K>) | undefined;
158161
if (!elementOrOptions || typeof elementOrOptions === 'function') {
159162
element = elementOrOptions;
160163
} else {
161-
({ duration, element, connector } = elementOrOptions);
164+
({ duration, element, connector, seed } = elementOrOptions);
162165
}
163166

164167
// A lookup for the groups that we have so far.
@@ -189,45 +192,7 @@ export function groupBy<T, K, R>(
189192
// OperatorSubscriber will only send the error to the main subscriber.
190193
try {
191194
const key = keySelector(value);
192-
193-
let group = groups.get(key);
194-
if (!group) {
195-
// Create our group subject
196-
groups.set(key, (group = connector ? connector() : new Subject<any>()));
197-
198-
// Emit the grouped observable. Note that we can't do a simple `asObservable()` here,
199-
// because the grouped observable has special semantics around reference counting
200-
// to ensure we don't sever our connection to the source prematurely.
201-
const grouped = createGroupedObservable(key, group);
202-
subscriber.next(grouped);
203-
204-
if (duration) {
205-
const durationSubscriber = new OperatorSubscriber(
206-
// Providing the group here ensures that it is disposed of -- via `unsubscribe` --
207-
// wnen the duration subscription is torn down. That is important, because then
208-
// if someone holds a handle to the grouped observable and tries to subscribe to it
209-
// after the connection to the source has been severed, they will get an
210-
// `ObjectUnsubscribedError` and know they can't possibly get any notifications.
211-
group as any,
212-
() => {
213-
// Our duration notified! We can complete the group.
214-
// The group will be removed from the map in the teardown phase.
215-
group!.complete();
216-
durationSubscriber?.unsubscribe();
217-
},
218-
// Completions are also sent to the group, but just the group.
219-
undefined,
220-
// Errors on the duration subscriber are sent to the group
221-
// but only the group. They are not sent to the main subscription.
222-
undefined,
223-
// Teardown: Remove this group from our map.
224-
() => groups.delete(key)
225-
);
226-
227-
// Start our duration notifier.
228-
groupBySourceSubscriber.add(innerFrom(duration(grouped)).subscribe(durationSubscriber));
229-
}
230-
}
195+
const group = groups.get(key) ?? createGroup(key);
231196

232197
// Send the value to our group.
233198
group.next(element ? element(value) : value);
@@ -246,9 +211,60 @@ export function groupBy<T, K, R>(
246211
() => groups.clear()
247212
);
248213

214+
if (seed !== undefined) {
215+
groupBySourceSubscriber.add(
216+
innerFrom(seed()).subscribe({
217+
next: (value) => void (groups.has(value) || createGroup(value)),
218+
complete: () => subscriber.complete(),
219+
error: handleError,
220+
})
221+
);
222+
}
223+
249224
// Subscribe to the source
250225
source.subscribe(groupBySourceSubscriber);
251226

227+
function createGroup(key: K): SubjectLike<T> {
228+
// Create our group subject
229+
const group = connector ? connector() : new Subject<any>();
230+
groups.set(key, group);
231+
232+
// Emit the grouped observable. Note that we can't do a simple `asObservable()` here,
233+
// because the grouped observable has special semantics around reference counting
234+
// to ensure we don't sever our connection to the source prematurely.
235+
const grouped = createGroupedObservable(key, group);
236+
subscriber.next(grouped);
237+
238+
if (duration) {
239+
const durationSubscriber = new OperatorSubscriber(
240+
// Providing the group here ensures that it is disposed of -- via `unsubscribe` --
241+
// when the duration subscription is torn down. That is important, because then
242+
// if someone holds a handle to the grouped observable and tries to subscribe to it
243+
// after the connection to the source has been severed, they will get an
244+
// `ObjectUnsubscribedError` and know they can't possibly get any notifications.
245+
group as any,
246+
() => {
247+
// Our duration notified! We can complete the group.
248+
// The group will be removed from the map in the teardown phase.
249+
group.complete();
250+
durationSubscriber.unsubscribe();
251+
},
252+
// Completions are also sent to the group, but just the group.
253+
undefined,
254+
// Errors on the duration subscriber are sent to the group
255+
// but only the group. They are not sent to the main subscription.
256+
undefined,
257+
// Teardown: Remove this group from our map.
258+
() => groups.delete(key)
259+
);
260+
261+
// Start our duration notifier.
262+
groupBySourceSubscriber.add(innerFrom(duration(grouped)).subscribe(durationSubscriber));
263+
}
264+
265+
return group;
266+
}
267+
252268
/**
253269
* Creates the actual grouped observable returned.
254270
* @param key The key of the group

0 commit comments

Comments
 (0)