diff --git a/src/main/java/rx/Observable.java b/src/main/java/rx/Observable.java index e706cf5d2e..76f92b2214 100644 --- a/src/main/java/rx/Observable.java +++ b/src/main/java/rx/Observable.java @@ -20,7 +20,6 @@ import rx.internal.operators.*; import rx.internal.util.ScalarSynchronousObservable; import rx.internal.util.UtilityFunctions; - import rx.observables.*; import rx.observers.SafeSubscriber; import rx.plugins.*; @@ -3459,8 +3458,8 @@ public final Observable cast(final Class klass) { *
{@code collect} does not operate by default on a particular {@link Scheduler}.
* * - * @param state - * the mutable data structure that will collect the items + * @param stateFactory + * factory for the mutable data structure that will collect the items * @param collector * a function that accepts the {@code state} and an emitted item, and modifies {@code state} * accordingly @@ -3468,7 +3467,7 @@ public final Observable cast(final Class klass) { * into a single mutable data structure * @see RxJava wiki: collect */ - public final Observable collect(R state, final Action2 collector) { + public final Observable collect(Func0 stateFactory, final Action2 collector) { Func2 accumulator = new Func2() { @Override @@ -3478,7 +3477,7 @@ public final R call(R state, T value) { } }; - return reduce(state, accumulator); + return reduce(stateFactory, accumulator); } /** @@ -3565,7 +3564,14 @@ public final Boolean call(T t1) { * @see #countLong() */ public final Observable count() { - return reduce(0, new Func2() { + return reduce(new Func0() { + + @Override + public Integer call() { + return 0; + } + + }, new Func2() { @Override public final Integer call(Integer t1, T t2) { return t1 + 1; @@ -3592,7 +3598,14 @@ public final Integer call(Integer t1, T t2) { * @see #count() */ public final Observable countLong() { - return reduce(0L, new Func2() { + return reduce(new Func0() { + + @Override + public Long call() { + return 0L; + } + + }, new Func2() { @Override public final Long call(Long t1, T t2) { return t1 + 1; @@ -5260,39 +5273,6 @@ public final Observable reduce(Func2 accumulator) { return scan(accumulator).last(); } - /** - * Returns an Observable that applies a specified accumulator function to the first item emitted by a source - * Observable and a specified seed value, then feeds the result of that function along with the second item - * emitted by an Observable into the same function, and so on until all items have been emitted by the - * source Observable, emitting the final result from the final call to your function as its sole item. - *

- * - *

- * This technique, which is called "reduce" here, is sometimec called "aggregate," "fold," "accumulate," - * "compress," or "inject" in other programming contexts. Groovy, for instance, has an {@code inject} method - * that does a similar operation on lists. - *

- *
Backpressure Support:
- *
This operator does not support backpressure because by intent it will receive all values and reduce - * them to a single {@code onNext}.
- *
Scheduler:
- *
{@code reduce} does not operate by default on a particular {@link Scheduler}.
- *
- * - * @param initialValue - * the initial (seed) accumulator value - * @param accumulator - * an accumulator function to be invoked on each item emitted by the source Observable, the - * result of which will be used in the next accumulator call - * @return an Observable that emits a single item that is the result of accumulating the output from the - * items emitted by the source Observable - * @see RxJava wiki: reduce - * @see Wikipedia: Fold (higher-order function) - */ - public final Observable reduce(R initialValue, Func2 accumulator) { - return scan(initialValue, accumulator).takeLast(1); - } - /** * Returns an Observable that applies a specified accumulator function to the first item emitted by a source * Observable and a specified seed value, then feeds the result of that function along with the second item @@ -6329,37 +6309,6 @@ public final Observable scan(Func2 accumulator) { return lift(new OperatorScan(accumulator)); } - /** - * Returns an Observable that applies a specified accumulator function to the first item emitted by a source - * Observable and a seed value, then feeds the result of that function along with the second item emitted by - * the source Observable into the same function, and so on until all items have been emitted by the source - * Observable, emitting the result of each of these iterations. - *

- * - *

- * This sort of function is sometimes called an accumulator. - *

- * Note that the Observable that results from this method will emit {@code initialValue} as its first - * emitted item. - *

- *
Scheduler:
- *
{@code scan} does not operate by default on a particular {@link Scheduler}.
- *
- * - * @param initialValue - * the initial (seed) accumulator item - * @param accumulator - * an accumulator function to be invoked on each item emitted by the source Observable, whose - * result will be emitted to {@link Observer}s via {@link Observer#onNext onNext} and used in the - * next accumulator call - * @return an Observable that emits {@code initialValue} followed by the results of each call to the - * accumulator function - * @see RxJava wiki: scan - */ - public final Observable scan(R initialValue, Func2 accumulator) { - return lift(new OperatorScan(initialValue, accumulator)); - } - /** * Returns an Observable that applies a specified accumulator function to the first item emitted by a source * Observable and a seed value, then feeds the result of that function along with the second item emitted by diff --git a/src/main/java/rx/internal/operators/OnSubscribeRedo.java b/src/main/java/rx/internal/operators/OnSubscribeRedo.java index 3a795aafd0..d58565ea2d 100644 --- a/src/main/java/rx/internal/operators/OnSubscribeRedo.java +++ b/src/main/java/rx/internal/operators/OnSubscribeRedo.java @@ -45,6 +45,7 @@ import rx.Scheduler; import rx.Subscriber; import rx.functions.Action0; +import rx.functions.Func0; import rx.functions.Func1; import rx.functions.Func2; import rx.schedulers.Schedulers; @@ -105,7 +106,14 @@ public RetryWithPredicate(Func2 predicate) { @Override public Observable> call(Observable> ts) { - return ts.scan(Notification.createOnNext(0), new Func2, Notification, Notification>() { + return ts.scan(new Func0>() { + + @Override + public Notification call() { + return Notification.createOnNext(0); + } + + }, new Func2, Notification, Notification>() { @SuppressWarnings("unchecked") @Override public Notification call(Notification n, Notification term) { diff --git a/src/test/java/rx/ObservableTests.java b/src/test/java/rx/ObservableTests.java index 9dec4e616f..4549aaa1ad 100644 --- a/src/test/java/rx/ObservableTests.java +++ b/src/test/java/rx/ObservableTests.java @@ -50,6 +50,7 @@ import rx.exceptions.OnErrorNotImplementedException; import rx.functions.Action1; import rx.functions.Action2; +import rx.functions.Func0; import rx.functions.Func1; import rx.functions.Func2; import rx.observables.ConnectableObservable; @@ -277,7 +278,7 @@ public void call(Integer t1) { @Test public void testReduceWithEmptyObservableAndSeed() { Observable observable = Observable.range(1, 0); - int value = observable.reduce(1, new Func2() { + int value = observable.startWith(1).reduce(new Func2() { @Override public Integer call(Integer t1, Integer t2) { @@ -292,7 +293,7 @@ public Integer call(Integer t1, Integer t2) { @Test public void testReduceWithInitialValue() { Observable observable = Observable.just(1, 2, 3, 4); - observable.reduce(50, new Func2() { + observable.startWith(50).reduce(new Func2() { @Override public Integer call(Integer t1, Integer t2) { @@ -965,7 +966,14 @@ public void testRangeWithScheduler() { @Test public void testCollectToList() { - List list = Observable.just(1, 2, 3).collect(new ArrayList(), new Action2, Integer>() { + List list = Observable.just(1, 2, 3).collect(new Func0>() { + + @Override + public List call() { + return new ArrayList(); + } + + }, new Action2, Integer>() { @Override public void call(List list, Integer v) { @@ -981,7 +989,14 @@ public void call(List list, Integer v) { @Test public void testCollectToString() { - String value = Observable.just(1, 2, 3).collect(new StringBuilder(), new Action2() { + String value = Observable.just(1, 2, 3).collect(new Func0() { + + @Override + public StringBuilder call() { + return new StringBuilder(); + } + + }, new Action2() { @Override public void call(StringBuilder sb, Integer v) { diff --git a/src/test/java/rx/ScanTests.java b/src/test/java/rx/ScanTests.java index 36cb429255..ad8f33645c 100644 --- a/src/test/java/rx/ScanTests.java +++ b/src/test/java/rx/ScanTests.java @@ -22,6 +22,7 @@ import rx.EventStream.Event; import rx.functions.Action1; +import rx.functions.Func0; import rx.functions.Func2; public class ScanTests { @@ -30,7 +31,14 @@ public class ScanTests { public void testUnsubscribeScan() { EventStream.getEventStream("HTTP-ClusterB", 20) - .scan(new HashMap(), new Func2, Event, Map>() { + .scan(new Func0>() { + + @Override + public Map call() { + return new HashMap(); + } + + }, new Func2, Event, Map>() { @Override public Map call(Map accum, Event perInstanceEvent) { diff --git a/src/test/java/rx/ZipTests.java b/src/test/java/rx/ZipTests.java index 5d35ea94bf..6d9e2ab34a 100644 --- a/src/test/java/rx/ZipTests.java +++ b/src/test/java/rx/ZipTests.java @@ -35,6 +35,7 @@ import rx.CovarianceTest.Result; import rx.EventStream.Event; import rx.functions.Action1; +import rx.functions.Func0; import rx.functions.Func1; import rx.functions.Func2; import rx.functions.FuncN; @@ -57,7 +58,14 @@ public String call(Event e) { @Override public Observable> call(final GroupedObservable ge) { - return ge.scan(new HashMap(), new Func2, Event, Map>() { + return ge.scan(new Func0>() { + + @Override + public Map call() { + return new HashMap(); + } + + }, new Func2, Event, Map>() { @Override public Map call(Map accum, Event perInstanceEvent) { diff --git a/src/test/java/rx/internal/operators/OperatorReduceTest.java b/src/test/java/rx/internal/operators/OperatorReduceTest.java index c550c835ea..c5f450a119 100644 --- a/src/test/java/rx/internal/operators/OperatorReduceTest.java +++ b/src/test/java/rx/internal/operators/OperatorReduceTest.java @@ -30,6 +30,7 @@ import rx.Observable; import rx.Observer; import rx.exceptions.TestException; +import rx.functions.Func0; import rx.functions.Func1; import rx.functions.Func2; import rx.internal.util.UtilityFunctions; @@ -53,7 +54,7 @@ public Integer call(Integer t1, Integer t2) { @Test public void testAggregateAsIntSum() { - Observable result = Observable.just(1, 2, 3, 4, 5).reduce(0, sum).map(UtilityFunctions. identity()); + Observable result = Observable.just(1, 2, 3, 4, 5).reduce(sum).map(UtilityFunctions. identity()); result.subscribe(observer); @@ -66,7 +67,7 @@ public void testAggregateAsIntSum() { public void testAggregateAsIntSumSourceThrows() { Observable result = Observable.concat(Observable.just(1, 2, 3, 4, 5), Observable. error(new TestException())) - .reduce(0, sum).map(UtilityFunctions. identity()); + .reduce(sum).map(UtilityFunctions. identity()); result.subscribe(observer); @@ -85,7 +86,7 @@ public Integer call(Integer t1, Integer t2) { }; Observable result = Observable.just(1, 2, 3, 4, 5) - .reduce(0, sumErr).map(UtilityFunctions. identity()); + .reduce(sumErr).map(UtilityFunctions. identity()); result.subscribe(observer); @@ -106,7 +107,7 @@ public Integer call(Integer t1) { }; Observable result = Observable.just(1, 2, 3, 4, 5) - .reduce(0, sum).map(error); + .reduce(sum).map(error); result.subscribe(observer); @@ -127,7 +128,14 @@ public void testBackpressureWithNoInitialValue() throws InterruptedException { @Test public void testBackpressureWithInitialValue() throws InterruptedException { Observable source = Observable.just(1, 2, 3, 4, 5, 6); - Observable reduced = source.reduce(0, sum); + Observable reduced = source.reduce(new Func0() { + + @Override + public Integer call() { + return 0; + } + + }, sum); Integer r = reduced.toBlocking().first(); assertEquals(21, r.intValue()); diff --git a/src/test/java/rx/internal/operators/OperatorScanTest.java b/src/test/java/rx/internal/operators/OperatorScanTest.java index c382e4da20..5b995bfd5d 100644 --- a/src/test/java/rx/internal/operators/OperatorScanTest.java +++ b/src/test/java/rx/internal/operators/OperatorScanTest.java @@ -56,7 +56,14 @@ public void testScanIntegersWithInitialValue() { Observable observable = Observable.just(1, 2, 3); - Observable m = observable.scan("", new Func2() { + Observable m = observable.scan(new Func0() { + + @Override + public String call() { + return ""; + } + + }, new Func2() { @Override public String call(String s, Integer n) { @@ -131,7 +138,7 @@ public Integer call(Integer t1, Integer t2) { @Test public void shouldNotEmitUntilAfterSubscription() { TestSubscriber ts = new TestSubscriber(); - Observable.range(1, 100).scan(0, new Func2() { + Observable.range(1, 100).scan(new Func2() { @Override public Integer call(Integer t1, Integer t2) { @@ -155,7 +162,14 @@ public Boolean call(Integer t1) { public void testBackpressureWithInitialValue() { final AtomicInteger count = new AtomicInteger(); Observable.range(1, 100) - .scan(0, new Func2() { + .scan(new Func0() { + + @Override + public Integer call() { + return 0; + } + + }, new Func2() { @Override public Integer call(Integer t1, Integer t2) { @@ -237,7 +251,14 @@ public void onNext(Integer t) { public void testNoBackpressureWithInitialValue() { final AtomicInteger count = new AtomicInteger(); Observable.range(1, 100) - .scan(0, new Func2() { + .scan(new Func0() { + + @Override + public Integer call() { + return 0; + } + + }, new Func2() { @Override public Integer call(Integer t1, Integer t2) {