Skip to content

Commit bcf2a39

Browse files
Merge pull request #1789 from benjchristensen/1668-groupedObservable
GroupedObservable.from/create
2 parents 0741bb4 + cff4962 commit bcf2a39

File tree

2 files changed

+58
-6
lines changed

2 files changed

+58
-6
lines changed

src/main/java/rx/internal/operators/OperatorGroupBy.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,7 @@ public void onNext(T t) {
188188
private GroupState<K, T> createNewGroup(final K key) {
189189
final GroupState<K, T> groupState = new GroupState<K, T>();
190190

191-
GroupedObservable<K, R> go = new GroupedObservable<K, R>(key, new OnSubscribe<R>() {
191+
GroupedObservable<K, R> go = GroupedObservable.create(key, new OnSubscribe<R>() {
192192

193193
@Override
194194
public void call(final Subscriber<? super R> o) {

src/main/java/rx/observables/GroupedObservable.java

Lines changed: 57 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,16 @@
1616
package rx.observables;
1717

1818
import rx.Observable;
19+
import rx.Scheduler;
20+
import rx.Subscriber;
1921
import rx.functions.Func1;
2022

2123
/**
22-
* An {@link Observable} that has been grouped by key, the value of which can be obtained with
23-
* {@link #getKey()}.
24+
* An {@link Observable} that has been grouped by key, the value of which can be obtained with {@link #getKey()}.
2425
* <p>
2526
* <em>Note:</em> A {@link GroupedObservable} will cache the items it is to emit until such time as it
26-
* is subscribed to. For this reason, in order to avoid memory leaks, you should not simply ignore those
27-
* {@code GroupedObservable}s that do not concern you. Instead, you can signal to them that they may
27+
* is subscribed to. For this reason, in order to avoid memory leaks, you should not simply ignore those {@code GroupedObservable}s that do not concern you. Instead, you can signal to them that they
28+
* may
2829
* discard their buffers by applying an operator like {@link Observable#take}{@code (0)} to them.
2930
*
3031
* @param <K>
@@ -37,7 +38,58 @@
3738
public class GroupedObservable<K, T> extends Observable<T> {
3839
private final K key;
3940

40-
public GroupedObservable(K key, OnSubscribe<T> onSubscribe) {
41+
/**
42+
* Converts an {@link Observable} into a {@code GroupedObservable} with a particular key.
43+
*
44+
* @param key
45+
* the key to identify the group of items emitted by this {@code GroupedObservable}
46+
* @param o
47+
* the {@link Observable} to convert
48+
* @return a {@code GroupedObservable} representation of {@code o}, with key {@code key}
49+
*/
50+
public static <K, T> GroupedObservable<K, T> from(K key, final Observable<T> o) {
51+
return new GroupedObservable<K, T>(key, new OnSubscribe<T>() {
52+
53+
@Override
54+
public void call(Subscriber<? super T> s) {
55+
o.unsafeSubscribe(s);
56+
}
57+
});
58+
}
59+
60+
/**
61+
* Returns an Observable that will execute the specified function when a {@link Subscriber} subscribes to
62+
* it.
63+
* <p>
64+
* <img width="640" height="200" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/create.png" alt="">
65+
* <p>
66+
* Write the function you pass to {@code create} so that it behaves as an Observable: It should invoke the
67+
* Subscriber's {@link Subscriber#onNext onNext}, {@link Subscriber#onError onError}, and {@link Subscriber#onCompleted onCompleted} methods appropriately.
68+
* <p>
69+
* A well-formed Observable must invoke either the Subscriber's {@code onCompleted} method exactly once or
70+
* its {@code onError} method exactly once.
71+
* <p>
72+
* See <a href="http://go.microsoft.com/fwlink/?LinkID=205219">Rx Design Guidelines (PDF)</a> for detailed
73+
* information.
74+
* <dl>
75+
* <dt><b>Scheduler:</b></dt>
76+
* <dd>{@code create} does not operate by default on a particular {@link Scheduler}.</dd>
77+
* </dl>
78+
*
79+
* @param <K>
80+
* the type of the key
81+
* @param <T>
82+
* the type of the items that this Observable emits
83+
* @param f
84+
* a function that accepts an {@code Subscriber<T>}, and invokes its {@code onNext}, {@code onError}, and {@code onCompleted} methods as appropriate
85+
* @return a GroupedObservable that, when a {@link Subscriber} subscribes to it, will execute the specified
86+
* function
87+
*/
88+
public final static <K, T> GroupedObservable<K, T> create(K key, OnSubscribe<T> f) {
89+
return new GroupedObservable<K, T>(key, f);
90+
}
91+
92+
protected GroupedObservable(K key, OnSubscribe<T> onSubscribe) {
4193
super(onSubscribe);
4294
this.key = key;
4395
}

0 commit comments

Comments
 (0)