diff --git a/src/main/java/rx/Observable.java b/src/main/java/rx/Observable.java index 851506a85c..17c8f39b7e 100644 --- a/src/main/java/rx/Observable.java +++ b/src/main/java/rx/Observable.java @@ -175,20 +175,11 @@ public void call(Subscriber o) { * @see RxJava wiki: Implementing Your Own Operators */ @SuppressWarnings("unchecked") - public Observable compose(Transformer transformer) { + public Observable compose(Func1, Observable> transformer) { // Casting to Observable is type-safe because we know Observable is covariant. - return (Observable) transformer.call(this); + return (Observable) ((Func1) transformer).call(this); } - - /** - * Transformer function used by {@link #compose}. - * @warn more complete description needed - */ - public static interface Transformer extends Func1, Observable> { - // cover for generics insanity - } - - + /* ********************************************************************************************************* * Operators Below Here diff --git a/src/test/java/rx/CovarianceTest.java b/src/test/java/rx/CovarianceTest.java index 26f060b5d3..01f2da0295 100644 --- a/src/test/java/rx/CovarianceTest.java +++ b/src/test/java/rx/CovarianceTest.java @@ -16,10 +16,13 @@ package rx; import java.util.ArrayList; +import java.util.Arrays; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Set; import org.junit.Test; -import rx.Observable.Transformer; import rx.functions.Func1; import rx.functions.Func2; @@ -63,22 +66,22 @@ public Integer call(Media t1, Media t2) { @Test public void testCovarianceOfCompose() { Observable movie = Observable.just(new HorrorMovie()); - Observable movie2 = movie.compose(new Transformer() { + Observable movie2 = movie.compose(new Func1, Observable>() { @Override - public Observable call(Observable t1) { + public Observable call(Observable t1) { return Observable.just(new Movie()); } }); } - + @Test public void testCovarianceOfCompose2() { Observable movie = Observable. just(new HorrorMovie()); - Observable movie2 = movie.compose(new Transformer() { + Observable movie2 = movie.compose(new Func1, Observable>() { @Override - public Observable call(Observable t1) { + public Observable call(Observable t1) { return Observable.just(new HorrorMovie()); } }); @@ -87,9 +90,9 @@ public Observable call(Observable t1) { @Test public void testCovarianceOfCompose3() { Observable movie = Observable.just(new HorrorMovie()); - Observable movie2 = movie.compose(new Transformer() { + Observable movie2 = movie.compose(new Func1, Observable>() { @Override - public Observable call(Observable t1) { + public Observable call(Observable t1) { return Observable.just(new HorrorMovie()).map(new Func1() { @Override @@ -104,9 +107,9 @@ public HorrorMovie call(HorrorMovie horrorMovie) { @Test public void testCovarianceOfCompose4() { Observable movie = Observable.just(new HorrorMovie()); - Observable movie2 = movie.compose(new Transformer() { + Observable movie2 = movie.compose(new Func1, Observable>() { @Override - public Observable call(Observable t1) { + public Observable call(Observable t1) { return t1.map(new Func1() { @Override @@ -118,6 +121,52 @@ public HorrorMovie call(HorrorMovie horrorMovie) { }); } + @Test + public void testComposeWithDeltaLogic() { + List list1 = Arrays.asList(new Movie(), new HorrorMovie(), new ActionMovie()); + List list2 = Arrays.asList(new ActionMovie(), new Movie(), new HorrorMovie(), new ActionMovie()); + Observable> movies = Observable.just(list1, list2); + movies.compose(deltaTransformer); + } + + static Func1>, Observable> deltaTransformer = new Func1>, Observable>() { + @Override + public Observable call(Observable> movieList) { + return movieList + .startWith(new ArrayList()) + .buffer(2, 1) + .skip(1) + .flatMap(calculateDelta); + } + }; + + static Func1>, Observable> calculateDelta = new Func1>, Observable>() { + public Observable call(List> listOfLists) { + if (listOfLists.size() == 1) { + return Observable.from(listOfLists.get(0)); + } else { + // diff the two + List newList = listOfLists.get(1); + List oldList = new ArrayList(listOfLists.get(0)); + + Set delta = new LinkedHashSet(); + delta.addAll(newList); + // remove all that match in old + delta.removeAll(oldList); + + // filter oldList to those that aren't in the newList + oldList.removeAll(newList); + + // for all left in the oldList we'll create DROP events + for (Movie old : oldList) { + delta.add(new Movie()); + } + + return Observable.from(delta); + } + }; + }; + /* * Most tests are moved into their applicable classes such as [Operator]Tests.java */ diff --git a/src/test/java/rx/ObservableTests.java b/src/test/java/rx/ObservableTests.java index 33b1e9da02..856b2c8058 100644 --- a/src/test/java/rx/ObservableTests.java +++ b/src/test/java/rx/ObservableTests.java @@ -46,7 +46,6 @@ import org.mockito.MockitoAnnotations; import rx.Observable.OnSubscribe; -import rx.Observable.Transformer; import rx.exceptions.OnErrorNotImplementedException; import rx.functions.Action1; import rx.functions.Action2; @@ -1107,10 +1106,10 @@ public void call(List booleans) { @Test public void testCompose() { TestSubscriber ts = new TestSubscriber(); - Observable.just(1, 2, 3).compose(new Transformer() { + Observable.just(1, 2, 3).compose(new Func1, Observable>() { @Override - public Observable call(Observable t1) { + public Observable call(Observable t1) { return t1.map(new Func1() { @Override