Skip to content

feat(groupBy): add seed option to eagerly create groups #6425

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 50 additions & 4 deletions spec/operators/groupBy-spec.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import { expect } from 'chai';
import { groupBy, delay, tap, map, take, mergeMap, materialize, skip, ignoreElements } from 'rxjs/operators';
import { TestScheduler } from 'rxjs/testing';
import { ReplaySubject, of, Observable, Operator, Observer, interval, Subject } from 'rxjs';
import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing';
import { Observable, Observer, of, Operator, ReplaySubject, Subject } from 'rxjs';
import { createNotification } from 'rxjs/internal/NotificationFactories';
import { delay, groupBy, ignoreElements, map, materialize, mergeMap, skip, take, tap } from 'rxjs/operators';
import { TestScheduler } from 'rxjs/testing';
import { cold, expectObservable, expectSubscriptions, hot } from '../helpers/marble-testing';
import { observableMatcher } from '../helpers/observableMatcher';

declare const rxTestScheduler: TestScheduler;

Expand Down Expand Up @@ -1456,6 +1457,51 @@ describe('groupBy operator', () => {

expect(sideEffects).to.deep.equal([0, 1, 2]);
});

describe('seed', () => {
let rxTest: TestScheduler;

beforeEach(() => {
rxTest = new TestScheduler(observableMatcher);
});

it('should open groups when seed emits', () => {
rxTest.run(({ cold, expectObservable, expectSubscriptions }) => {
const source = cold('(--)---a---| ');
const sourceSubs = ' (^-)-------! ';
const seed = cold(' (ab)----------|');
const seedSubs = ' (^-)-------! ';
const expected = ' (ab)-------| ';
const a = cold(' (--)---a---| ');
const b = cold(' (--)-------| ');
const expectedValues = { a, b };

const grouped = source.pipe(groupBy((letter) => letter, { seed: () => seed }));

expectObservable(grouped).toBe(expected, expectedValues);
expectSubscriptions(source.subscriptions).toBe(sourceSubs);
expectSubscriptions(seed.subscriptions).toBe(seedSubs);
});
});

it('should emit complete when the seed completes', () => {
rxTest.run(({ cold, expectObservable, expectSubscriptions }) => {
const source = cold('(--)---a---|');
const sourceSubs = ' (^-)-------!';
const seed = cold(' (a|) ');
const seedSubs = ' (^!) ';
const expected = ' (a|) ';
const a = cold(' (--)---a---|');
const expectedValues = { a };

const grouped = source.pipe(groupBy((letter) => letter, { seed: () => seed }));

expectObservable(grouped).toBe(expected, expectedValues);
expectSubscriptions(source.subscriptions).toBe(sourceSubs);
expectSubscriptions(seed.subscriptions).toBe(seedSubs);
});
});
})
});

/**
Expand Down
96 changes: 56 additions & 40 deletions src/internal/operators/groupBy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,14 @@ interface BasicGroupByOptions<K, T> {
element?: undefined;
duration?: (grouped: GroupedObservable<K, T>) => ObservableInput<any>;
connector?: () => SubjectLike<T>;
seed?: () => ObservableInput<K>;
}

interface GroupByOptionsWithElement<K, E, T> {
element: (value: T) => E;
duration?: (grouped: GroupedObservable<K, E>) => ObservableInput<any>;
connector?: () => SubjectLike<E>;
seed?: () => ObservableInput<K>;
}

export function groupBy<T, K>(key: (value: T) => K, options: BasicGroupByOptions<K, T>): OperatorFunction<T, GroupedObservable<K, T>>;
Expand Down Expand Up @@ -155,10 +157,11 @@ export function groupBy<T, K, R>(
): OperatorFunction<T, GroupedObservable<K, R>> {
return operate((source, subscriber) => {
let element: ((value: any) => any) | void;
let seed: (() => ObservableInput<K>) | undefined;
if (!elementOrOptions || typeof elementOrOptions === 'function') {
element = elementOrOptions;
} else {
({ duration, element, connector } = elementOrOptions);
({ duration, element, connector, seed } = elementOrOptions);
}

// A lookup for the groups that we have so far.
Expand Down Expand Up @@ -189,45 +192,7 @@ export function groupBy<T, K, R>(
// OperatorSubscriber will only send the error to the main subscriber.
try {
const key = keySelector(value);

let group = groups.get(key);
if (!group) {
// Create our group subject
groups.set(key, (group = connector ? connector() : new Subject<any>()));

// Emit the grouped observable. Note that we can't do a simple `asObservable()` here,
// because the grouped observable has special semantics around reference counting
// to ensure we don't sever our connection to the source prematurely.
const grouped = createGroupedObservable(key, group);
subscriber.next(grouped);

if (duration) {
const durationSubscriber = new OperatorSubscriber(
// Providing the group here ensures that it is disposed of -- via `unsubscribe` --
// wnen the duration subscription is torn down. That is important, because then
// if someone holds a handle to the grouped observable and tries to subscribe to it
// after the connection to the source has been severed, they will get an
// `ObjectUnsubscribedError` and know they can't possibly get any notifications.
group as any,
() => {
// Our duration notified! We can complete the group.
// The group will be removed from the map in the teardown phase.
group!.complete();
durationSubscriber?.unsubscribe();
},
// Completions are also sent to the group, but just the group.
undefined,
// Errors on the duration subscriber are sent to the group
// but only the group. They are not sent to the main subscription.
undefined,
// Teardown: Remove this group from our map.
() => groups.delete(key)
);

// Start our duration notifier.
groupBySourceSubscriber.add(innerFrom(duration(grouped)).subscribe(durationSubscriber));
}
}
const group = groups.get(key) ?? createGroup(key);

// Send the value to our group.
group.next(element ? element(value) : value);
Expand All @@ -246,9 +211,60 @@ export function groupBy<T, K, R>(
() => groups.clear()
);

if (seed !== undefined) {
groupBySourceSubscriber.add(
innerFrom(seed()).subscribe({
next: (value) => void (groups.has(value) || createGroup(value)),
complete: () => subscriber.complete(),
error: handleError,
})
);
}

// Subscribe to the source
source.subscribe(groupBySourceSubscriber);

function createGroup(key: K): SubjectLike<T> {
// Create our group subject
const group = connector ? connector() : new Subject<any>();
groups.set(key, group);

// Emit the grouped observable. Note that we can't do a simple `asObservable()` here,
// because the grouped observable has special semantics around reference counting
// to ensure we don't sever our connection to the source prematurely.
const grouped = createGroupedObservable(key, group);
subscriber.next(grouped);

if (duration) {
const durationSubscriber = new OperatorSubscriber(
// Providing the group here ensures that it is disposed of -- via `unsubscribe` --
// when the duration subscription is torn down. That is important, because then
// if someone holds a handle to the grouped observable and tries to subscribe to it
// after the connection to the source has been severed, they will get an
// `ObjectUnsubscribedError` and know they can't possibly get any notifications.
group as any,
() => {
// Our duration notified! We can complete the group.
// The group will be removed from the map in the teardown phase.
group.complete();
durationSubscriber.unsubscribe();
},
// Completions are also sent to the group, but just the group.
undefined,
// Errors on the duration subscriber are sent to the group
// but only the group. They are not sent to the main subscription.
undefined,
// Teardown: Remove this group from our map.
() => groups.delete(key)
);

// Start our duration notifier.
groupBySourceSubscriber.add(innerFrom(duration(grouped)).subscribe(durationSubscriber));
}

return group;
}

/**
* Creates the actual grouped observable returned.
* @param key The key of the group
Expand Down