Skip to content

Commit 651429a

Browse files
Merge pull request #1738 from benjchristensen/issue-1724
Delay Operator with Reactive Pull Backpressure
2 parents 9ff74d3 + 0a229e4 commit 651429a

File tree

7 files changed

+444
-258
lines changed

7 files changed

+444
-258
lines changed

src/main/java/rx/Observable.java

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3823,7 +3823,7 @@ public final Observable<T> defaultIfEmpty(T defaultValue) {
38233823
public final <U, V> Observable<T> delay(
38243824
Func0<? extends Observable<U>> subscriptionDelay,
38253825
Func1<? super T, ? extends Observable<V>> itemDelay) {
3826-
return create(new OnSubscribeDelayWithSelector<T, U, V>(this, subscriptionDelay, itemDelay));
3826+
return delaySubscription(subscriptionDelay).lift(new OperatorDelayWithSelector<T, V>(this, itemDelay));
38273827
}
38283828

38293829
/**
@@ -3851,7 +3851,7 @@ public final <U, V> Observable<T> delay(
38513851
* @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.delay.aspx">MSDN: Observable.Delay</a>
38523852
*/
38533853
public final <U> Observable<T> delay(Func1<? super T, ? extends Observable<U>> itemDelay) {
3854-
return create(new OnSubscribeDelayWithSelector<T, U, U>(this, itemDelay));
3854+
return lift(new OperatorDelayWithSelector<T, U>(this, itemDelay));
38553855
}
38563856

38573857
/**
@@ -3897,7 +3897,7 @@ public final Observable<T> delay(long delay, TimeUnit unit) {
38973897
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229280.aspx">MSDN: Observable.Delay</a>
38983898
*/
38993899
public final Observable<T> delay(long delay, TimeUnit unit, Scheduler scheduler) {
3900-
return create(new OnSubscribeDelay<T>(this, delay, unit, scheduler));
3900+
return lift(new OperatorDelay<T>(this, delay, unit, scheduler));
39013901
}
39023902

39033903
/**
@@ -3943,6 +3943,26 @@ public final Observable<T> delaySubscription(long delay, TimeUnit unit) {
39433943
public final Observable<T> delaySubscription(long delay, TimeUnit unit, Scheduler scheduler) {
39443944
return create(new OnSubscribeDelaySubscription<T>(this, delay, unit, scheduler));
39453945
}
3946+
3947+
/**
3948+
* Returns an Observable that delays the subscription to the source Observable by a given amount of time.
3949+
* <p>
3950+
* <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/delaySubscription.png" alt="">
3951+
* <dl>
3952+
* <dt><b>Scheduler:</b></dt>
3953+
* <dd>This version of {@code delay} operates by default on the {@code compuation} {@link Scheduler}.</dd>
3954+
* </dl>
3955+
*
3956+
* @param delay
3957+
* the time to delay the subscription
3958+
* @param unit
3959+
* the time unit of {@code delay}
3960+
* @return an Observable that delays the subscription to the source Observable by the given amount
3961+
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Observable-Utility-Operators#delaysubscription">RxJava wiki: delaySubscription</a>
3962+
*/
3963+
public final <U> Observable<T> delaySubscription(Func0<? extends Observable<U>> subscriptionDelay) {
3964+
return create(new OnSubscribeDelaySubscriptionWithSelector<T, U>(this, subscriptionDelay));
3965+
}
39463966

39473967
/**
39483968
* Returns an Observable that reverses the effect of {@link #materialize materialize} by transforming the

src/main/java/rx/internal/operators/OnSubscribeDelay.java

Lines changed: 0 additions & 106 deletions
This file was deleted.
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
package rx.internal.operators;
17+
18+
import rx.Observable;
19+
import rx.Observable.OnSubscribe;
20+
import rx.Subscriber;
21+
import rx.functions.Func0;
22+
23+
/**
24+
* Delays the subscription until the Observable<U> emits an event.
25+
*
26+
* @param <T>
27+
* the value type
28+
*/
29+
public final class OnSubscribeDelaySubscriptionWithSelector<T, U> implements OnSubscribe<T> {
30+
final Observable<? extends T> source;
31+
final Func0<? extends Observable<U>> subscriptionDelay;
32+
33+
public OnSubscribeDelaySubscriptionWithSelector(Observable<? extends T> source, Func0<? extends Observable<U>> subscriptionDelay) {
34+
this.source = source;
35+
this.subscriptionDelay = subscriptionDelay;
36+
}
37+
38+
@Override
39+
public void call(final Subscriber<? super T> child) {
40+
try {
41+
subscriptionDelay.call().take(1).unsafeSubscribe(new Subscriber<U>() {
42+
43+
@Override
44+
public void onCompleted() {
45+
// subscribe to actual source
46+
source.unsafeSubscribe(child);
47+
}
48+
49+
@Override
50+
public void onError(Throwable e) {
51+
child.onError(e);
52+
}
53+
54+
@Override
55+
public void onNext(U t) {
56+
// ignore as we'll complete immediately because of take(1)
57+
}
58+
59+
});
60+
} catch (Throwable e) {
61+
child.onError(e);
62+
}
63+
}
64+
65+
}

src/main/java/rx/internal/operators/OnSubscribeDelayWithSelector.java

Lines changed: 0 additions & 134 deletions
This file was deleted.

0 commit comments

Comments
 (0)