Skip to content

Delay Operator with Reactive Pull Backpressure #1738

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Oct 10, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 23 additions & 3 deletions src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -3823,7 +3823,7 @@ public final Observable<T> defaultIfEmpty(T defaultValue) {
public final <U, V> Observable<T> delay(
Func0<? extends Observable<U>> subscriptionDelay,
Func1<? super T, ? extends Observable<V>> itemDelay) {
return create(new OnSubscribeDelayWithSelector<T, U, V>(this, subscriptionDelay, itemDelay));
return delaySubscription(subscriptionDelay).lift(new OperatorDelayWithSelector<T, V>(this, itemDelay));
}

/**
Expand Down Expand Up @@ -3851,7 +3851,7 @@ public final <U, V> Observable<T> delay(
* @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.delay.aspx">MSDN: Observable.Delay</a>
*/
public final <U> Observable<T> delay(Func1<? super T, ? extends Observable<U>> itemDelay) {
return create(new OnSubscribeDelayWithSelector<T, U, U>(this, itemDelay));
return lift(new OperatorDelayWithSelector<T, U>(this, itemDelay));
}

/**
Expand Down Expand Up @@ -3897,7 +3897,7 @@ public final Observable<T> delay(long delay, TimeUnit unit) {
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229280.aspx">MSDN: Observable.Delay</a>
*/
public final Observable<T> delay(long delay, TimeUnit unit, Scheduler scheduler) {
return create(new OnSubscribeDelay<T>(this, delay, unit, scheduler));
return lift(new OperatorDelay<T>(this, delay, unit, scheduler));
}

/**
Expand Down Expand Up @@ -3943,6 +3943,26 @@ public final Observable<T> delaySubscription(long delay, TimeUnit unit) {
public final Observable<T> delaySubscription(long delay, TimeUnit unit, Scheduler scheduler) {
return create(new OnSubscribeDelaySubscription<T>(this, delay, unit, scheduler));
}

/**
* Returns an Observable that delays the subscription to the source Observable by a given amount of time.
* <p>
* <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/delaySubscription.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>This version of {@code delay} operates by default on the {@code compuation} {@link Scheduler}.</dd>
* </dl>
*
* @param delay
* the time to delay the subscription
* @param unit
* the time unit of {@code delay}
* @return an Observable that delays the subscription to the source Observable by the given amount
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Observable-Utility-Operators#delaysubscription">RxJava wiki: delaySubscription</a>
*/
public final <U> Observable<T> delaySubscription(Func0<? extends Observable<U>> subscriptionDelay) {
return create(new OnSubscribeDelaySubscriptionWithSelector<T, U>(this, subscriptionDelay));
}

/**
* Returns an Observable that reverses the effect of {@link #materialize materialize} by transforming the
Expand Down
106 changes: 0 additions & 106 deletions src/main/java/rx/internal/operators/OnSubscribeDelay.java

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/**
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package rx.internal.operators;

import rx.Observable;
import rx.Observable.OnSubscribe;
import rx.Subscriber;
import rx.functions.Func0;

/**
* Delays the subscription until the Observable<U> emits an event.
*
* @param <T>
* the value type
*/
public final class OnSubscribeDelaySubscriptionWithSelector<T, U> implements OnSubscribe<T> {
final Observable<? extends T> source;
final Func0<? extends Observable<U>> subscriptionDelay;

public OnSubscribeDelaySubscriptionWithSelector(Observable<? extends T> source, Func0<? extends Observable<U>> subscriptionDelay) {
this.source = source;
this.subscriptionDelay = subscriptionDelay;
}

@Override
public void call(final Subscriber<? super T> child) {
try {
subscriptionDelay.call().take(1).unsafeSubscribe(new Subscriber<U>() {

@Override
public void onCompleted() {
// subscribe to actual source
source.unsafeSubscribe(child);
}

@Override
public void onError(Throwable e) {
child.onError(e);
}

@Override
public void onNext(U t) {
// ignore as we'll complete immediately because of take(1)
}

});
} catch (Throwable e) {
child.onError(e);
}
}

}
134 changes: 0 additions & 134 deletions src/main/java/rx/internal/operators/OnSubscribeDelayWithSelector.java

This file was deleted.

Loading