diff --git a/src/main/java/rx/Observable.java b/src/main/java/rx/Observable.java index 12fcb57551..95ffbdd1b9 100644 --- a/src/main/java/rx/Observable.java +++ b/src/main/java/rx/Observable.java @@ -3823,7 +3823,7 @@ public final Observable defaultIfEmpty(T defaultValue) { public final Observable delay( Func0> subscriptionDelay, Func1> itemDelay) { - return create(new OnSubscribeDelayWithSelector(this, subscriptionDelay, itemDelay)); + return delaySubscription(subscriptionDelay).lift(new OperatorDelayWithSelector(this, itemDelay)); } /** @@ -3851,7 +3851,7 @@ public final Observable delay( * @see MSDN: Observable.Delay */ public final Observable delay(Func1> itemDelay) { - return create(new OnSubscribeDelayWithSelector(this, itemDelay)); + return lift(new OperatorDelayWithSelector(this, itemDelay)); } /** @@ -3897,7 +3897,7 @@ public final Observable delay(long delay, TimeUnit unit) { * @see MSDN: Observable.Delay */ public final Observable delay(long delay, TimeUnit unit, Scheduler scheduler) { - return create(new OnSubscribeDelay(this, delay, unit, scheduler)); + return lift(new OperatorDelay(this, delay, unit, scheduler)); } /** @@ -3943,6 +3943,26 @@ public final Observable delaySubscription(long delay, TimeUnit unit) { public final Observable delaySubscription(long delay, TimeUnit unit, Scheduler scheduler) { return create(new OnSubscribeDelaySubscription(this, delay, unit, scheduler)); } + + /** + * Returns an Observable that delays the subscription to the source Observable by a given amount of time. + *

+ * + *

+ *
Scheduler:
+ *
This version of {@code delay} operates by default on the {@code compuation} {@link Scheduler}.
+ *
+ * + * @param delay + * the time to delay the subscription + * @param unit + * the time unit of {@code delay} + * @return an Observable that delays the subscription to the source Observable by the given amount + * @see RxJava wiki: delaySubscription + */ + public final Observable delaySubscription(Func0> subscriptionDelay) { + return create(new OnSubscribeDelaySubscriptionWithSelector(this, subscriptionDelay)); + } /** * Returns an Observable that reverses the effect of {@link #materialize materialize} by transforming the diff --git a/src/main/java/rx/internal/operators/OnSubscribeDelay.java b/src/main/java/rx/internal/operators/OnSubscribeDelay.java deleted file mode 100644 index ce83bf0f4d..0000000000 --- a/src/main/java/rx/internal/operators/OnSubscribeDelay.java +++ /dev/null @@ -1,106 +0,0 @@ -/** - * Copyright 2014 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package rx.internal.operators; - -import java.util.concurrent.TimeUnit; - -import rx.Observable; -import rx.Observable.OnSubscribe; -import rx.Scheduler; -import rx.Scheduler.Worker; -import rx.Subscriber; -import rx.functions.Action0; -import rx.functions.Func1; - -/** - * Delays the emission of onNext events by a given amount of time. - * @param the value type - */ -public final class OnSubscribeDelay implements OnSubscribe { - - final Observable source; - final long delay; - final TimeUnit unit; - final Scheduler scheduler; - - public OnSubscribeDelay(Observable source, long delay, TimeUnit unit, Scheduler scheduler) { - this.source = source; - this.delay = delay; - this.unit = unit; - this.scheduler = scheduler; - } - - @Override - public void call(Subscriber child) { - final Worker worker = scheduler.createWorker(); - child.add(worker); - - Observable.concat(source.map(new Func1>() { - @Override - public Observable call(T x) { - Emitter e = new Emitter(x); - worker.schedule(e, delay, unit); - return Observable.create(e); - } - })).unsafeSubscribe(child); - } - - /** - * Emits a value once the call() is invoked. - * Only one subscriber can wait for the emission. - * @param the value type - */ - public static final class Emitter implements OnSubscribe, Action0 { - final T value; - - final Object guard; - /** Guarded by guard. */ - Subscriber child; - /** Guarded by guard. */ - boolean done; - - public Emitter(T value) { - this.value = value; - this.guard = new Object(); - } - - @Override - public void call(Subscriber s) { - synchronized (guard) { - if (!done) { - child = s; - return; - } - } - s.onNext(value); - s.onCompleted(); - } - - @Override - public void call() { - Subscriber s; - synchronized (guard) { - done = true; - s = child; - child = null; - } - if (s != null) { - s.onNext(value); - s.onCompleted(); - } - } - } -} diff --git a/src/main/java/rx/internal/operators/OnSubscribeDelaySubscriptionWithSelector.java b/src/main/java/rx/internal/operators/OnSubscribeDelaySubscriptionWithSelector.java new file mode 100644 index 0000000000..64e0997474 --- /dev/null +++ b/src/main/java/rx/internal/operators/OnSubscribeDelaySubscriptionWithSelector.java @@ -0,0 +1,65 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package rx.internal.operators; + +import rx.Observable; +import rx.Observable.OnSubscribe; +import rx.Subscriber; +import rx.functions.Func0; + +/** + * Delays the subscription until the Observable emits an event. + * + * @param + * the value type + */ +public final class OnSubscribeDelaySubscriptionWithSelector implements OnSubscribe { + final Observable source; + final Func0> subscriptionDelay; + + public OnSubscribeDelaySubscriptionWithSelector(Observable source, Func0> subscriptionDelay) { + this.source = source; + this.subscriptionDelay = subscriptionDelay; + } + + @Override + public void call(final Subscriber child) { + try { + subscriptionDelay.call().take(1).unsafeSubscribe(new Subscriber() { + + @Override + public void onCompleted() { + // subscribe to actual source + source.unsafeSubscribe(child); + } + + @Override + public void onError(Throwable e) { + child.onError(e); + } + + @Override + public void onNext(U t) { + // ignore as we'll complete immediately because of take(1) + } + + }); + } catch (Throwable e) { + child.onError(e); + } + } + +} diff --git a/src/main/java/rx/internal/operators/OnSubscribeDelayWithSelector.java b/src/main/java/rx/internal/operators/OnSubscribeDelayWithSelector.java deleted file mode 100644 index c975c4e902..0000000000 --- a/src/main/java/rx/internal/operators/OnSubscribeDelayWithSelector.java +++ /dev/null @@ -1,134 +0,0 @@ -/** - * Copyright 2014 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ -package rx.internal.operators; - -import rx.Observable; -import rx.Observable.OnSubscribe; -import rx.Subscriber; -import rx.functions.Func0; -import rx.functions.Func1; -import rx.internal.operators.OnSubscribeDelay.Emitter; -import rx.observers.SerializedSubscriber; -import rx.subscriptions.CompositeSubscription; - -/** - * Delay the subscription and emission of the source items by a per-item observable that fires its first element. - * @param the item type - * @param the value type of the subscription-delaying observable - * @param the value type of the item-delaying observable - */ -public final class OnSubscribeDelayWithSelector implements OnSubscribe { - final Observable source; - final Func0> subscriptionDelay; - final Func1> itemDelay; - - public OnSubscribeDelayWithSelector(Observable source, Func1> itemDelay) { - this.source = source; - this.subscriptionDelay = new Func0>() { - @Override - public Observable call() { - return Observable.just(null); - } - }; - this.itemDelay = itemDelay; - } - - public OnSubscribeDelayWithSelector(Observable source, Func0> subscriptionDelay, Func1> itemDelay) { - this.source = source; - this.subscriptionDelay = subscriptionDelay; - this.itemDelay = itemDelay; - } - - @Override - public void call(Subscriber child) { - final SerializedSubscriber s = new SerializedSubscriber(child); - final CompositeSubscription csub = new CompositeSubscription(); - child.add(csub); - - Observable osub; - try { - osub = subscriptionDelay.call(); - } catch (Throwable e) { - s.onError(e); - return; - } - - Observable> seqs = source.map(new Func1>() { - @Override - public Observable call(final T x) { - final Emitter e = new Emitter(x); - Observable itemObs = itemDelay.call(x); - - Subscriber itemSub = new Subscriber() { - boolean once = true; - @Override - public void onNext(V t) { - emit(); - } - - @Override - public void onError(Throwable e) { - s.onError(e); - s.unsubscribe(); - } - - @Override - public void onCompleted() { - emit(); - } - void emit() { - if (once) { - once = false; - e.call(); - csub.remove(this); - } - } - }; - csub.add(itemSub); - itemObs.unsafeSubscribe(itemSub); - - return Observable.create(e); - } - }); - final Observable delayed = Observable.merge(seqs); - - Subscriber osubSub = new Subscriber(child) { - boolean subscribed; - @Override - public void onNext(U ignored) { - onCompleted(); - } - - @Override - public void onError(Throwable e) { - if (!subscribed) { - s.onError(e); - unsubscribe(); - } - } - - @Override - public void onCompleted() { - if (!subscribed) { - subscribed = true; - delayed.unsafeSubscribe(s); - } - } - }; - - osub.unsafeSubscribe(osubSub); - } -} diff --git a/src/main/java/rx/internal/operators/OperatorDelay.java b/src/main/java/rx/internal/operators/OperatorDelay.java new file mode 100644 index 0000000000..48b8454dc8 --- /dev/null +++ b/src/main/java/rx/internal/operators/OperatorDelay.java @@ -0,0 +1,85 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.internal.operators; + +import java.util.concurrent.TimeUnit; + +import rx.Observable; +import rx.Observable.Operator; +import rx.Scheduler; +import rx.Scheduler.Worker; +import rx.Subscriber; +import rx.functions.Action0; + +/** + * Delays the emission of onNext events by a given amount of time. + * + * @param + * the value type + */ +public final class OperatorDelay implements Operator { + + final Observable source; + final long delay; + final TimeUnit unit; + final Scheduler scheduler; + + public OperatorDelay(Observable source, long delay, TimeUnit unit, Scheduler scheduler) { + this.source = source; + this.delay = delay; + this.unit = unit; + this.scheduler = scheduler; + } + + @Override + public Subscriber call(final Subscriber child) { + final Worker worker = scheduler.createWorker(); + child.add(worker); + return new Subscriber(child) { + + @Override + public void onCompleted() { + worker.schedule(new Action0() { + + @Override + public void call() { + child.onCompleted(); + } + + }, delay, unit); + } + + @Override + public void onError(Throwable e) { + child.onError(e); + } + + @Override + public void onNext(final T t) { + worker.schedule(new Action0() { + + @Override + public void call() { + child.onNext(t); + } + + }, delay, unit); + } + + }; + } + +} diff --git a/src/main/java/rx/internal/operators/OperatorDelayWithSelector.java b/src/main/java/rx/internal/operators/OperatorDelayWithSelector.java new file mode 100644 index 0000000000..dd90ec5e43 --- /dev/null +++ b/src/main/java/rx/internal/operators/OperatorDelayWithSelector.java @@ -0,0 +1,96 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package rx.internal.operators; + +import rx.Observable; +import rx.Observable.Operator; +import rx.Subscriber; +import rx.functions.Func1; +import rx.observers.SerializedSubscriber; +import rx.subjects.PublishSubject; + +/** + * Delay the subscription and emission of the source items by a per-item observable that fires its first element. + * + * @param + * the item type + * @param + * the value type of the item-delaying observable + */ +public final class OperatorDelayWithSelector implements Operator { + final Observable source; + final Func1> itemDelay; + + public OperatorDelayWithSelector(Observable source, Func1> itemDelay) { + this.source = source; + this.itemDelay = itemDelay; + } + + @Override + public Subscriber call(final Subscriber _child) { + final SerializedSubscriber child = new SerializedSubscriber(_child); + final PublishSubject> delayedEmissions = PublishSubject.create(); + + _child.add(Observable.merge(delayedEmissions).unsafeSubscribe(new Subscriber() { + + @Override + public void onCompleted() { + child.onCompleted(); + } + + @Override + public void onError(Throwable e) { + child.onError(e); + } + + @Override + public void onNext(T t) { + child.onNext(t); + } + + })); + + return new Subscriber(_child) { + + @Override + public void onCompleted() { + delayedEmissions.onCompleted(); + } + + @Override + public void onError(Throwable e) { + child.onError(e); + } + + @Override + public void onNext(final T t) { + try { + delayedEmissions.onNext(itemDelay.call(t).take(1).defaultIfEmpty(null).map(new Func1() { + + @Override + public T call(V v) { + return t; + } + + })); + } catch (Throwable e) { + onError(e); + } + } + + }; + } +} diff --git a/src/test/java/rx/internal/operators/OnSubscribeDelayTest.java b/src/test/java/rx/internal/operators/OperatorDelayTest.java similarity index 79% rename from src/test/java/rx/internal/operators/OnSubscribeDelayTest.java rename to src/test/java/rx/internal/operators/OperatorDelayTest.java index 3f28de6d51..9f80f0dc73 100644 --- a/src/test/java/rx/internal/operators/OnSubscribeDelayTest.java +++ b/src/test/java/rx/internal/operators/OperatorDelayTest.java @@ -15,6 +15,7 @@ */ package rx.internal.operators; +import static org.junit.Assert.assertEquals; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.anyLong; @@ -26,6 +27,7 @@ import static org.mockito.MockitoAnnotations.initMocks; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.concurrent.TimeUnit; @@ -34,16 +36,22 @@ import org.mockito.InOrder; import org.mockito.Mock; +import rx.Notification; import rx.Observable; import rx.Observer; import rx.Subscription; import rx.exceptions.TestException; +import rx.functions.Action1; import rx.functions.Func0; import rx.functions.Func1; +import rx.internal.util.RxRingBuffer; +import rx.observers.TestObserver; +import rx.observers.TestSubscriber; +import rx.schedulers.Schedulers; import rx.schedulers.TestScheduler; import rx.subjects.PublishSubject; -public class OnSubscribeDelayTest { +public class OperatorDelayTest { @Mock private Observer observer; @Mock @@ -546,20 +554,20 @@ public Observable call(Integer t1) { verify(o, never()).onError(any(Throwable.class)); verify(o, never()).onCompleted(); } - + @Test public void testDelayWithObservableAsTimed() { Observable source = Observable.interval(1L, TimeUnit.SECONDS, scheduler).take(3); - + final Observable delayer = Observable.timer(500L, TimeUnit.MILLISECONDS, scheduler); - + Func1> delayFunc = new Func1>() { @Override public Observable call(Long t1) { return delayer; } }; - + Observable delayed = source.delay(delayFunc); delayed.subscribe(observer); @@ -596,7 +604,7 @@ public Observable call(Long t1) { verify(observer, times(1)).onCompleted(); verify(observer, never()).onError(any(Throwable.class)); } - + @Test public void testDelayWithObservableReorder() { int n = 3; @@ -604,9 +612,9 @@ public void testDelayWithObservableReorder() { PublishSubject source = PublishSubject.create(); final List> subjects = new ArrayList>(); for (int i = 0; i < n; i++) { - subjects.add(PublishSubject.create()); + subjects.add(PublishSubject. create()); } - + Observable result = source.delay(new Func1>() { @Override @@ -614,28 +622,180 @@ public Observable call(Integer t1) { return subjects.get(t1); } }); - + @SuppressWarnings("unchecked") Observer o = mock(Observer.class); InOrder inOrder = inOrder(o); - + result.subscribe(o); - + for (int i = 0; i < n; i++) { source.onNext(i); } source.onCompleted(); - + inOrder.verify(o, never()).onNext(anyInt()); inOrder.verify(o, never()).onCompleted(); - + for (int i = n - 1; i >= 0; i--) { subjects.get(i).onCompleted(); inOrder.verify(o).onNext(i); } - + inOrder.verify(o).onCompleted(); - + verify(o, never()).onError(any(Throwable.class)); } + + @Test + public void testDelayEmitsEverything() { + Observable source = Observable.range(1, 5); + Observable delayed = source.delay(500L, TimeUnit.MILLISECONDS, scheduler); + delayed = delayed.doOnEach(new Action1>() { + + @Override + public void call(Notification t1) { + System.out.println(t1); + } + + }); + TestObserver observer = new TestObserver(); + delayed.subscribe(observer); + // all will be delivered after 500ms since range does not delay between them + scheduler.advanceTimeBy(500L, TimeUnit.MILLISECONDS); + observer.assertReceivedOnNext(Arrays.asList(1, 2, 3, 4, 5)); + } + + @Test + public void testBackpressureWithTimedDelay() { + TestSubscriber ts = new TestSubscriber(); + Observable.range(1, RxRingBuffer.SIZE * 2) + .delay(100, TimeUnit.MILLISECONDS) + .observeOn(Schedulers.computation()) + .map(new Func1() { + + int c = 0; + + @Override + public Integer call(Integer t) { + if (c++ <= 0) { + try { + Thread.sleep(500); + } catch (InterruptedException e) { + } + } + return t; + } + + }).subscribe(ts); + + ts.awaitTerminalEvent(); + ts.assertNoErrors(); + assertEquals(RxRingBuffer.SIZE * 2, ts.getOnNextEvents().size()); + } + + @Test + public void testBackpressureWithSubscriptionTimedDelay() { + TestSubscriber ts = new TestSubscriber(); + Observable.range(1, RxRingBuffer.SIZE * 2) + .delaySubscription(100, TimeUnit.MILLISECONDS) + .delay(100, TimeUnit.MILLISECONDS) + .observeOn(Schedulers.computation()) + .map(new Func1() { + + int c = 0; + + @Override + public Integer call(Integer t) { + if (c++ <= 0) { + try { + Thread.sleep(500); + } catch (InterruptedException e) { + } + } + return t; + } + + }).subscribe(ts); + + ts.awaitTerminalEvent(); + ts.assertNoErrors(); + assertEquals(RxRingBuffer.SIZE * 2, ts.getOnNextEvents().size()); + } + + @Test + public void testBackpressureWithSelectorDelay() { + TestSubscriber ts = new TestSubscriber(); + Observable.range(1, RxRingBuffer.SIZE * 2) + .delay(new Func1>() { + + @Override + public Observable call(Integer i) { + return Observable.timer(100, TimeUnit.MILLISECONDS); + } + + }) + .observeOn(Schedulers.computation()) + .map(new Func1() { + + int c = 0; + + @Override + public Integer call(Integer t) { + if (c++ <= 0) { + try { + Thread.sleep(500); + } catch (InterruptedException e) { + } + } + return t; + } + + }).subscribe(ts); + + ts.awaitTerminalEvent(); + ts.assertNoErrors(); + assertEquals(RxRingBuffer.SIZE * 2, ts.getOnNextEvents().size()); + } + + @Test + public void testBackpressureWithSelectorDelayAndSubscriptionDelay() { + TestSubscriber ts = new TestSubscriber(); + Observable.range(1, RxRingBuffer.SIZE * 2) + .delay(new Func0>() { + + @Override + public Observable call() { + return Observable.timer(500, TimeUnit.MILLISECONDS); + } + }, new Func1>() { + + @Override + public Observable call(Integer i) { + return Observable.timer(100, TimeUnit.MILLISECONDS); + } + + }) + .observeOn(Schedulers.computation()) + .map(new Func1() { + + int c = 0; + + @Override + public Integer call(Integer t) { + if (c++ <= 0) { + try { + Thread.sleep(500); + } catch (InterruptedException e) { + } + } + return t; + } + + }).subscribe(ts); + + ts.awaitTerminalEvent(); + ts.assertNoErrors(); + assertEquals(RxRingBuffer.SIZE * 2, ts.getOnNextEvents().size()); + } }