diff --git a/src/main/java/rx/Observable.java b/src/main/java/rx/Observable.java
index 851506a85c..17c8f39b7e 100644
--- a/src/main/java/rx/Observable.java
+++ b/src/main/java/rx/Observable.java
@@ -175,20 +175,11 @@ public void call(Subscriber super R> o) {
* @see RxJava wiki: Implementing Your Own Operators
*/
@SuppressWarnings("unchecked")
- public Observable compose(Transformer super T, ? extends R> transformer) {
+ public Observable compose(Func1 extends Observable super T>, Observable extends R>> transformer) {
// Casting to Observable is type-safe because we know Observable is covariant.
- return (Observable) transformer.call(this);
+ return (Observable) ((Func1) transformer).call(this);
}
-
- /**
- * Transformer function used by {@link #compose}.
- * @warn more complete description needed
- */
- public static interface Transformer extends Func1, Observable extends R>> {
- // cover for generics insanity
- }
-
-
+
/* *********************************************************************************************************
* Operators Below Here
diff --git a/src/test/java/rx/CovarianceTest.java b/src/test/java/rx/CovarianceTest.java
index 26f060b5d3..01f2da0295 100644
--- a/src/test/java/rx/CovarianceTest.java
+++ b/src/test/java/rx/CovarianceTest.java
@@ -16,10 +16,13 @@
package rx;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
import org.junit.Test;
-import rx.Observable.Transformer;
import rx.functions.Func1;
import rx.functions.Func2;
@@ -63,22 +66,22 @@ public Integer call(Media t1, Media t2) {
@Test
public void testCovarianceOfCompose() {
Observable movie = Observable.just(new HorrorMovie());
- Observable movie2 = movie.compose(new Transformer() {
+ Observable movie2 = movie.compose(new Func1, Observable extends Movie>>() {
@Override
- public Observable extends Movie> call(Observable extends Movie> t1) {
+ public Observable extends Movie> call(Observable t1) {
return Observable.just(new Movie());
}
});
}
-
+
@Test
public void testCovarianceOfCompose2() {
Observable movie = Observable. just(new HorrorMovie());
- Observable movie2 = movie.compose(new Transformer() {
+ Observable movie2 = movie.compose(new Func1, Observable extends HorrorMovie>>() {
@Override
- public Observable extends HorrorMovie> call(Observable extends Movie> t1) {
+ public Observable extends HorrorMovie> call(Observable t1) {
return Observable.just(new HorrorMovie());
}
});
@@ -87,9 +90,9 @@ public Observable extends HorrorMovie> call(Observable extends Movie> t1) {
@Test
public void testCovarianceOfCompose3() {
Observable movie = Observable.just(new HorrorMovie());
- Observable movie2 = movie.compose(new Transformer() {
+ Observable movie2 = movie.compose(new Func1, Observable extends HorrorMovie>>() {
@Override
- public Observable extends HorrorMovie> call(Observable extends Movie> t1) {
+ public Observable extends HorrorMovie> call(Observable t1) {
return Observable.just(new HorrorMovie()).map(new Func1() {
@Override
@@ -104,9 +107,9 @@ public HorrorMovie call(HorrorMovie horrorMovie) {
@Test
public void testCovarianceOfCompose4() {
Observable movie = Observable.just(new HorrorMovie());
- Observable movie2 = movie.compose(new Transformer() {
+ Observable movie2 = movie.compose(new Func1, Observable extends HorrorMovie>>() {
@Override
- public Observable extends HorrorMovie> call(Observable extends HorrorMovie> t1) {
+ public Observable extends HorrorMovie> call(Observable t1) {
return t1.map(new Func1() {
@Override
@@ -118,6 +121,52 @@ public HorrorMovie call(HorrorMovie horrorMovie) {
});
}
+ @Test
+ public void testComposeWithDeltaLogic() {
+ List list1 = Arrays.asList(new Movie(), new HorrorMovie(), new ActionMovie());
+ List list2 = Arrays.asList(new ActionMovie(), new Movie(), new HorrorMovie(), new ActionMovie());
+ Observable> movies = Observable.just(list1, list2);
+ movies.compose(deltaTransformer);
+ }
+
+ static Func1>, Observable extends Movie>> deltaTransformer = new Func1>, Observable extends Movie>>() {
+ @Override
+ public Observable call(Observable> movieList) {
+ return movieList
+ .startWith(new ArrayList())
+ .buffer(2, 1)
+ .skip(1)
+ .flatMap(calculateDelta);
+ }
+ };
+
+ static Func1>, Observable> calculateDelta = new Func1>, Observable>() {
+ public Observable call(List> listOfLists) {
+ if (listOfLists.size() == 1) {
+ return Observable.from(listOfLists.get(0));
+ } else {
+ // diff the two
+ List newList = listOfLists.get(1);
+ List oldList = new ArrayList(listOfLists.get(0));
+
+ Set delta = new LinkedHashSet();
+ delta.addAll(newList);
+ // remove all that match in old
+ delta.removeAll(oldList);
+
+ // filter oldList to those that aren't in the newList
+ oldList.removeAll(newList);
+
+ // for all left in the oldList we'll create DROP events
+ for (Movie old : oldList) {
+ delta.add(new Movie());
+ }
+
+ return Observable.from(delta);
+ }
+ };
+ };
+
/*
* Most tests are moved into their applicable classes such as [Operator]Tests.java
*/
diff --git a/src/test/java/rx/ObservableTests.java b/src/test/java/rx/ObservableTests.java
index 33b1e9da02..856b2c8058 100644
--- a/src/test/java/rx/ObservableTests.java
+++ b/src/test/java/rx/ObservableTests.java
@@ -46,7 +46,6 @@
import org.mockito.MockitoAnnotations;
import rx.Observable.OnSubscribe;
-import rx.Observable.Transformer;
import rx.exceptions.OnErrorNotImplementedException;
import rx.functions.Action1;
import rx.functions.Action2;
@@ -1107,10 +1106,10 @@ public void call(List booleans) {
@Test
public void testCompose() {
TestSubscriber ts = new TestSubscriber();
- Observable.just(1, 2, 3).compose(new Transformer() {
+ Observable.just(1, 2, 3).compose(new Func1, Observable extends String>>() {
@Override
- public Observable call(Observable extends Integer> t1) {
+ public Observable call(Observable t1) {
return t1.map(new Func1() {
@Override