Skip to content

Commit 26ffe5e

Browse files
Merge pull request #1776 from benjchristensen/compose-generics
Observable.compose Generics
2 parents 502405d + 4e88e56 commit 26ffe5e

File tree

3 files changed

+101
-9
lines changed

3 files changed

+101
-9
lines changed

src/main/java/rx/Observable.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -177,14 +177,14 @@ public void call(Subscriber<? super R> o) {
177177
@SuppressWarnings("unchecked")
178178
public <R> Observable<R> compose(Transformer<? super T, ? extends R> transformer) {
179179
// Casting to Observable<R> is type-safe because we know Observable is covariant.
180-
return (Observable<R>) transformer.call(this);
180+
return (Observable<R>) ((Transformer<T, ? extends R>) transformer).call(this);
181181
}
182-
182+
183183
/**
184184
* Transformer function used by {@link #compose}.
185185
* @warn more complete description needed
186186
*/
187-
public static interface Transformer<T, R> extends Func1<Observable<? extends T>, Observable<? extends R>> {
187+
public static interface Transformer<T, R> extends Func1<Observable<T>, Observable<? extends R>> {
188188
// cover for generics insanity
189189
}
190190

src/test/java/rx/CovarianceTest.java

Lines changed: 97 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,21 @@
1515
*/
1616
package rx;
1717

18+
import static org.junit.Assert.assertEquals;
19+
1820
import java.util.ArrayList;
21+
import java.util.Arrays;
22+
import java.util.LinkedHashSet;
23+
import java.util.List;
24+
import java.util.Set;
1925

2026
import org.junit.Test;
2127

2228
import rx.Observable.Transformer;
2329
import rx.functions.Func1;
2430
import rx.functions.Func2;
31+
import rx.observables.GroupedObservable;
32+
import rx.observers.TestSubscriber;
2533

2634
/**
2735
* Test super/extends of generics.
@@ -59,14 +67,52 @@ public Integer call(Media t1, Media t2) {
5967
o2.toSortedList(SORT_FUNCTION);
6068
}
6169

62-
70+
@Test
71+
public void testGroupByCompose() {
72+
Observable<Movie> movies = Observable.just(new HorrorMovie(), new ActionMovie(), new Movie());
73+
TestSubscriber<String> ts = new TestSubscriber<String>();
74+
movies.groupBy(new Func1<Movie, Class<? extends Movie>>() {
75+
76+
@Override
77+
public Class<? extends Movie> call(Movie m) {
78+
return m.getClass();
79+
}
80+
81+
}).flatMap(new Func1<GroupedObservable<Class<? extends Movie>, Movie>, Observable<String>>() {
82+
83+
@Override
84+
public Observable<String> call(GroupedObservable<Class<? extends Movie>, Movie> g) {
85+
return g.compose(new Transformer<Movie, Movie>() {
86+
87+
@Override
88+
public Observable<? extends Movie> call(Observable<Movie> m) {
89+
return m.concatWith(Observable.just(new ActionMovie()));
90+
}
91+
92+
}).map(new Func1<Movie, String>() {
93+
94+
@Override
95+
public String call(Movie m) {
96+
return m.toString();
97+
}
98+
99+
});
100+
}
101+
102+
}).subscribe(ts);
103+
ts.assertTerminalEvent();
104+
ts.assertNoErrors();
105+
// System.out.println(ts.getOnNextEvents());
106+
assertEquals(6, ts.getOnNextEvents().size());
107+
}
108+
63109
@Test
64110
public void testCovarianceOfCompose() {
65111
Observable<HorrorMovie> movie = Observable.just(new HorrorMovie());
66112
Observable<Movie> movie2 = movie.compose(new Transformer<Movie, Movie>() {
67113

68114
@Override
69-
public Observable<? extends Movie> call(Observable<? extends Movie> t1) {
115+
public Observable<? extends Movie> call(Observable<Movie> t1) {
70116
return Observable.just(new Movie());
71117
}
72118

@@ -78,7 +124,7 @@ public void testCovarianceOfCompose2() {
78124
Observable<Movie> movie = Observable.<Movie> just(new HorrorMovie());
79125
Observable<HorrorMovie> movie2 = movie.compose(new Transformer<Movie, HorrorMovie>() {
80126
@Override
81-
public Observable<? extends HorrorMovie> call(Observable<? extends Movie> t1) {
127+
public Observable<? extends HorrorMovie> call(Observable<Movie> t1) {
82128
return Observable.just(new HorrorMovie());
83129
}
84130
});
@@ -89,7 +135,7 @@ public void testCovarianceOfCompose3() {
89135
Observable<Movie> movie = Observable.<Movie>just(new HorrorMovie());
90136
Observable<HorrorMovie> movie2 = movie.compose(new Transformer<Movie, HorrorMovie>() {
91137
@Override
92-
public Observable<? extends HorrorMovie> call(Observable<? extends Movie> t1) {
138+
public Observable<? extends HorrorMovie> call(Observable<Movie> t1) {
93139
return Observable.just(new HorrorMovie()).map(new Func1<HorrorMovie, HorrorMovie>() {
94140

95141
@Override
@@ -106,7 +152,7 @@ public void testCovarianceOfCompose4() {
106152
Observable<HorrorMovie> movie = Observable.just(new HorrorMovie());
107153
Observable<HorrorMovie> movie2 = movie.compose(new Transformer<HorrorMovie, HorrorMovie>() {
108154
@Override
109-
public Observable<? extends HorrorMovie> call(Observable<? extends HorrorMovie> t1) {
155+
public Observable<? extends HorrorMovie> call(Observable<HorrorMovie> t1) {
110156
return t1.map(new Func1<HorrorMovie, HorrorMovie>() {
111157

112158
@Override
@@ -118,6 +164,52 @@ public HorrorMovie call(HorrorMovie horrorMovie) {
118164
});
119165
}
120166

167+
@Test
168+
public void testComposeWithDeltaLogic() {
169+
List<Movie> list1 = Arrays.asList(new Movie(), new HorrorMovie(), new ActionMovie());
170+
List<Movie> list2 = Arrays.asList(new ActionMovie(), new Movie(), new HorrorMovie(), new ActionMovie());
171+
Observable<List<Movie>> movies = Observable.just(list1, list2);
172+
movies.compose(deltaTransformer);
173+
}
174+
175+
static Transformer<List<Movie>, Movie> deltaTransformer = new Transformer<List<Movie>, Movie>() {
176+
@Override
177+
public Observable<Movie> call(Observable<List<Movie>> movieList) {
178+
return movieList
179+
.startWith(new ArrayList<Movie>())
180+
.buffer(2, 1)
181+
.skip(1)
182+
.flatMap(calculateDelta);
183+
}
184+
};
185+
186+
static Func1<List<List<Movie>>, Observable<Movie>> calculateDelta = new Func1<List<List<Movie>>, Observable<Movie>>() {
187+
public Observable<Movie> call(List<List<Movie>> listOfLists) {
188+
if (listOfLists.size() == 1) {
189+
return Observable.from(listOfLists.get(0));
190+
} else {
191+
// diff the two
192+
List<Movie> newList = listOfLists.get(1);
193+
List<Movie> oldList = new ArrayList<Movie>(listOfLists.get(0));
194+
195+
Set<Movie> delta = new LinkedHashSet<Movie>();
196+
delta.addAll(newList);
197+
// remove all that match in old
198+
delta.removeAll(oldList);
199+
200+
// filter oldList to those that aren't in the newList
201+
oldList.removeAll(newList);
202+
203+
// for all left in the oldList we'll create DROP events
204+
for (Movie old : oldList) {
205+
delta.add(new Movie());
206+
}
207+
208+
return Observable.from(delta);
209+
}
210+
};
211+
};
212+
121213
/*
122214
* Most tests are moved into their applicable classes such as [Operator]Tests.java
123215
*/

src/test/java/rx/ObservableTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1110,7 +1110,7 @@ public void testCompose() {
11101110
Observable.just(1, 2, 3).compose(new Transformer<Integer, String>() {
11111111

11121112
@Override
1113-
public Observable<String> call(Observable<? extends Integer> t1) {
1113+
public Observable<String> call(Observable<Integer> t1) {
11141114
return t1.map(new Func1<Integer, String>() {
11151115

11161116
@Override

0 commit comments

Comments
 (0)