diff --git a/rxjava-core/src/main/java/rx/internal/operators/OperatorOnErrorResumeNextViaFunction.java b/rxjava-core/src/main/java/rx/internal/operators/OperatorOnErrorResumeNextViaFunction.java index 6ca01eca3e..81388089b2 100644 --- a/rxjava-core/src/main/java/rx/internal/operators/OperatorOnErrorResumeNextViaFunction.java +++ b/rxjava-core/src/main/java/rx/internal/operators/OperatorOnErrorResumeNextViaFunction.java @@ -18,6 +18,7 @@ import rx.Observable; import rx.Observable.Operator; import rx.Subscriber; +import rx.exceptions.Exceptions; import rx.functions.Func1; import rx.plugins.RxJavaPlugins; @@ -49,17 +50,29 @@ public OperatorOnErrorResumeNextViaFunction(Func1 call(final Subscriber child) { - return new Subscriber(child) { + Subscriber parent = new Subscriber() { + private boolean done = false; + @Override public void onCompleted() { + if (done) { + return; + } + done = true; child.onCompleted(); } @Override public void onError(Throwable e) { + if (done) { + Exceptions.throwIfFatal(e); + return; + } + done = true; try { RxJavaPlugins.getInstance().getErrorHandler().handleError(e); + unsubscribe(); Observable resume = resumeFunction.call(e); resume.unsafeSubscribe(child); } catch (Throwable e2) { @@ -69,10 +82,15 @@ public void onError(Throwable e) { @Override public void onNext(T t) { + if (done) { + return; + } child.onNext(t); } }; + child.add(parent); + return parent; } } diff --git a/rxjava-core/src/main/java/rx/internal/operators/OperatorOnErrorResumeNextViaObservable.java b/rxjava-core/src/main/java/rx/internal/operators/OperatorOnErrorResumeNextViaObservable.java index c49207fc18..ac0c16ae2a 100644 --- a/rxjava-core/src/main/java/rx/internal/operators/OperatorOnErrorResumeNextViaObservable.java +++ b/rxjava-core/src/main/java/rx/internal/operators/OperatorOnErrorResumeNextViaObservable.java @@ -18,6 +18,7 @@ import rx.Observable; import rx.Observable.Operator; import rx.Subscriber; +import rx.exceptions.Exceptions; import rx.plugins.RxJavaPlugins; /** @@ -51,13 +52,24 @@ public OperatorOnErrorResumeNextViaObservable(Observable resumeSequ public Subscriber call(final Subscriber child) { // shared subscription won't work here Subscriber s = new Subscriber() { + + private boolean done = false; + @Override public void onNext(T t) { + if (done) { + return; + } child.onNext(t); } @Override public void onError(Throwable e) { + if (done) { + Exceptions.throwIfFatal(e); + return; + } + done = true; RxJavaPlugins.getInstance().getErrorHandler().handleError(e); unsubscribe(); resumeSequence.unsafeSubscribe(child); @@ -65,6 +77,10 @@ public void onError(Throwable e) { @Override public void onCompleted() { + if (done) { + return; + } + done = true; child.onCompleted(); } diff --git a/rxjava-core/src/main/java/rx/internal/operators/OperatorOnErrorReturn.java b/rxjava-core/src/main/java/rx/internal/operators/OperatorOnErrorReturn.java index c0ec388a91..b7f477c068 100644 --- a/rxjava-core/src/main/java/rx/internal/operators/OperatorOnErrorReturn.java +++ b/rxjava-core/src/main/java/rx/internal/operators/OperatorOnErrorReturn.java @@ -16,9 +16,11 @@ package rx.internal.operators; import java.util.Arrays; + import rx.Observable.Operator; import rx.Subscriber; import rx.exceptions.CompositeException; +import rx.exceptions.Exceptions; import rx.functions.Func1; import rx.plugins.RxJavaPlugins; @@ -50,7 +52,7 @@ public OperatorOnErrorReturn(Func1 resultFunction) { @Override public Subscriber call(final Subscriber child) { - return new Subscriber(child) { + Subscriber parent = new Subscriber() { private boolean done = false; @@ -65,13 +67,14 @@ public void onNext(T t) { @Override public void onError(Throwable e) { if (done) { + Exceptions.throwIfFatal(e); return; } done = true; try { RxJavaPlugins.getInstance().getErrorHandler().handleError(e); + unsubscribe(); T result = resultFunction.call(e); - child.onNext(result); } catch (Throwable x) { child.onError(new CompositeException(Arrays.asList(e, x))); @@ -90,5 +93,7 @@ public void onCompleted() { } }; + child.add(parent); + return parent; } } diff --git a/rxjava-core/src/main/java/rx/internal/operators/OperatorOnExceptionResumeNextViaObservable.java b/rxjava-core/src/main/java/rx/internal/operators/OperatorOnExceptionResumeNextViaObservable.java index 9de09bfc2e..247348aac7 100644 --- a/rxjava-core/src/main/java/rx/internal/operators/OperatorOnExceptionResumeNextViaObservable.java +++ b/rxjava-core/src/main/java/rx/internal/operators/OperatorOnExceptionResumeNextViaObservable.java @@ -18,6 +18,7 @@ import rx.Observable; import rx.Observable.Operator; import rx.Subscriber; +import rx.exceptions.Exceptions; import rx.plugins.RxJavaPlugins; /** @@ -56,13 +57,23 @@ public Subscriber call(final Subscriber child) { // needs to independently unsubscribe so child can continue with the resume Subscriber s = new Subscriber() { + private boolean done = false; + @Override public void onNext(T t) { + if (done) { + return; + } child.onNext(t); } @Override public void onError(Throwable e) { + if (done) { + Exceptions.throwIfFatal(e); + return; + } + done = true; if (e instanceof Exception) { RxJavaPlugins.getInstance().getErrorHandler().handleError(e); unsubscribe(); @@ -74,6 +85,10 @@ public void onError(Throwable e) { @Override public void onCompleted() { + if (done) { + return; + } + done = true; child.onCompleted(); } diff --git a/rxjava-core/src/test/java/rx/internal/operators/OperatorOnErrorResumeNextViaFunctionTest.java b/rxjava-core/src/test/java/rx/internal/operators/OperatorOnErrorResumeNextViaFunctionTest.java index d25af29886..457557d283 100644 --- a/rxjava-core/src/test/java/rx/internal/operators/OperatorOnErrorResumeNextViaFunctionTest.java +++ b/rxjava-core/src/test/java/rx/internal/operators/OperatorOnErrorResumeNextViaFunctionTest.java @@ -35,6 +35,7 @@ import rx.Subscription; import rx.functions.Func1; import rx.observers.TestSubscriber; +import rx.schedulers.Schedulers; public class OperatorOnErrorResumeNextViaFunctionTest { @@ -47,6 +48,8 @@ public void testResumeNextWithSynchronousExecution() { public void call(Subscriber observer) { observer.onNext("one"); observer.onError(new Throwable("injected failure")); + observer.onNext("two"); + observer.onNext("three"); } }); @@ -226,6 +229,47 @@ public Observable call(Throwable t1) { System.out.println(ts.getOnNextEvents()); ts.assertReceivedOnNext(Arrays.asList("success")); } + + @Test + public void testMapResumeAsyncNext() { + // Trigger multiple failures + Observable w = Observable.just("one", "fail", "two", "three", "fail"); + + // Introduce map function that fails intermittently (Map does not prevent this when the observer is a + // rx.operator incl onErrorResumeNextViaObservable) + w = w.map(new Func1() { + @Override + public String call(String s) { + if ("fail".equals(s)) + throw new RuntimeException("Forced Failure"); + System.out.println("BadMapper:" + s); + return s; + } + }); + + Observable observable = w.onErrorResumeNext(new Func1>() { + + @Override + public Observable call(Throwable t1) { + return Observable.just("twoResume", "threeResume").subscribeOn(Schedulers.computation()); + } + + }); + + @SuppressWarnings("unchecked") + Observer observer = mock(Observer.class); + TestSubscriber ts = new TestSubscriber(observer); + observable.subscribe(ts); + ts.awaitTerminalEvent(); + + verify(observer, Mockito.never()).onError(any(Throwable.class)); + verify(observer, times(1)).onCompleted(); + verify(observer, times(1)).onNext("one"); + verify(observer, Mockito.never()).onNext("two"); + verify(observer, Mockito.never()).onNext("three"); + verify(observer, times(1)).onNext("twoResume"); + verify(observer, times(1)).onNext("threeResume"); + } private static class TestObservable implements Observable.OnSubscribe { diff --git a/rxjava-core/src/test/java/rx/internal/operators/OperatorOnErrorReturnTest.java b/rxjava-core/src/test/java/rx/internal/operators/OperatorOnErrorReturnTest.java index 7068f38edd..7ee89a6ead 100644 --- a/rxjava-core/src/test/java/rx/internal/operators/OperatorOnErrorReturnTest.java +++ b/rxjava-core/src/test/java/rx/internal/operators/OperatorOnErrorReturnTest.java @@ -31,6 +31,8 @@ import rx.Observer; import rx.Subscriber; import rx.functions.Func1; +import rx.observers.TestSubscriber; +import rx.schedulers.Schedulers; public class OperatorOnErrorReturnTest { @@ -104,6 +106,46 @@ public String call(Throwable e) { verify(observer, times(0)).onCompleted(); assertNotNull(capturedException.get()); } + + @Test + public void testMapResumeAsyncNext() { + // Trigger multiple failures + Observable w = Observable.just("one", "fail", "two", "three", "fail"); + + // Introduce map function that fails intermittently (Map does not prevent this when the observer is a + // rx.operator incl onErrorResumeNextViaObservable) + w = w.map(new Func1() { + @Override + public String call(String s) { + if ("fail".equals(s)) + throw new RuntimeException("Forced Failure"); + System.out.println("BadMapper:" + s); + return s; + } + }); + + Observable observable = w.onErrorReturn(new Func1() { + + @Override + public String call(Throwable t1) { + return "resume"; + } + + }); + + @SuppressWarnings("unchecked") + Observer observer = mock(Observer.class); + TestSubscriber ts = new TestSubscriber(observer); + observable.subscribe(ts); + ts.awaitTerminalEvent(); + + verify(observer, Mockito.never()).onError(any(Throwable.class)); + verify(observer, times(1)).onCompleted(); + verify(observer, times(1)).onNext("one"); + verify(observer, Mockito.never()).onNext("two"); + verify(observer, Mockito.never()).onNext("three"); + verify(observer, times(1)).onNext("resume"); + } private static class TestObservable implements Observable.OnSubscribe {