Skip to content

Commit 1b95587

Browse files
committed
Simplify signatures of retryWhen/repeatWhen as discussed in ReactiveX#24
- If RxJava is updated with the same signature changes with ReactiveX/RxJava#1720, the conversion from RxScala to RxJava can be simplified, without any external signature changes in RxScala.
1 parent a9c96b7 commit 1b95587

File tree

2 files changed

+96
-29
lines changed

2 files changed

+96
-29
lines changed

examples/src/test/scala/rx/lang/scala/examples/RxScalaDemo.scala

Lines changed: 36 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -554,7 +554,7 @@ class RxScalaDemo extends JUnitSuite {
554554
println(doubleAverage(Observable.empty).toBlocking.single)
555555
println(doubleAverage(List(0.0).toObservable).toBlocking.single)
556556
println(doubleAverage(List(4.44).toObservable).toBlocking.single)
557-
println(doubleAverage(List(1, 2, 3.5).toObservable).toBlocking.single)
557+
println(doubleAverage(List(1.0, 2.0, 3.5).toObservable).toBlocking.single)
558558
}
559559

560560
@Test def testSum() {
@@ -1132,20 +1132,51 @@ class RxScalaDemo extends JUnitSuite {
11321132
Observable[String]({ subscriber =>
11331133
println("subscribing")
11341134
subscriber.onError(new RuntimeException("always fails"))
1135-
}).retryWhen(attempts => {
1136-
attempts.zipWith(Observable.from(1 to 3))((n, i) => i).flatMap(i => {
1135+
}).retryWhen({ throwableObservable =>
1136+
throwableObservable.zipWith(Observable.from(1 to 3))((t, i) => i).flatMap(i => {
11371137
println("delay retry by " + i + " second(s)")
11381138
Observable.timer(Duration(i, TimeUnit.SECONDS))
11391139
})
11401140
}).toBlocking.foreach(s => println(s))
11411141
}
11421142

1143+
@Test def retryWhenDifferentExceptionsExample(): Unit = {
1144+
var observableCreateCount = 1 // Just to support switching which Exception is produced
1145+
Observable[String]({ subscriber =>
1146+
println("subscribing")
1147+
if (observableCreateCount <= 2) {
1148+
subscriber.onError(new IOException("IO Fail"))
1149+
} else {
1150+
subscriber.onError(new RuntimeException("Other failure"))
1151+
}
1152+
observableCreateCount += 1
1153+
}).retryWhen({ throwableObservable =>
1154+
throwableObservable.zip(Observable.from(1 to 3)).flatMap({ case (error, retryCount) =>
1155+
error match {
1156+
// Only retry 2 times if we get a IOException and then error out with the third IOException.
1157+
// Let the other Exception's pass through and complete the Observable.
1158+
case _: IOException =>
1159+
if (retryCount <= 3) {
1160+
println("IOException delay retry by " + retryCount + " second(s)")
1161+
Observable.timer(Duration(retryCount, TimeUnit.SECONDS))
1162+
} else {
1163+
Observable.error(error)
1164+
}
1165+
1166+
case _ =>
1167+
println("got error " + error + ", will stop retrying")
1168+
Observable.empty
1169+
}
1170+
})
1171+
}).toBlocking.foreach(s => println(s))
1172+
}
1173+
11431174
@Test def repeatWhenExample(): Unit = {
11441175
Observable[String]({ subscriber =>
11451176
println("subscribing")
11461177
subscriber.onCompleted()
1147-
}).repeatWhen(attempts => {
1148-
attempts.zipWith(Observable.from(1 to 3))((n, i) => i).flatMap(i => {
1178+
}).repeatWhen({ unitObservable =>
1179+
unitObservable.zipWith(Observable.from(1 to 3))((u, i) => i).flatMap(i => {
11491180
println("delay repeat by " + i + " second(s)")
11501181
Observable.timer(Duration(i, TimeUnit.SECONDS))
11511182
})

src/main/scala/rx/lang/scala/Observable.scala

Lines changed: 60 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -3265,7 +3265,7 @@ trait Observable[+T]
32653265
/**
32663266
* Returns an Observable that emits the same values as the source observable with the exception of an
32673267
* `onError`. An `onError` notification from the source will result in the emission of a
3268-
* [[Notification]] to the Observable provided as an argument to the `notificationHandler`
3268+
* [[Throwable]] to the Observable provided as an argument to the `notificationHandler`
32693269
* function. If the Observable returned `onCompletes` or `onErrors` then `retry` will call
32703270
* `onCompleted` or `onError` on the child subscription. Otherwise, this Observable will
32713271
* resubscribe to the source Observable.
@@ -3276,54 +3276,60 @@ trait Observable[+T]
32763276
*
32773277
* This retries 3 times, each time incrementing the number of seconds it waits.
32783278
*
3279-
* <pre>
3279+
* @example
3280+
*
3281+
* This retries 3 times, each time incrementing the number of seconds it waits.
3282+
*
3283+
* {{{
32803284
* Observable[String]({ subscriber =>
32813285
* println("subscribing")
32823286
* subscriber.onError(new RuntimeException("always fails"))
3283-
* }).retryWhen(attempts => {
3284-
* attempts.zipWith(Observable.from(1 to 3))((n, i) => i).flatMap(i => {
3287+
* }).retryWhen({ throwableObservable =>
3288+
* throwableObservable.zipWith(Observable.from(1 to 3))((t, i) => i).flatMap(i => {
32853289
* println("delay retry by " + i + " second(s)")
32863290
* Observable.timer(Duration(i, TimeUnit.SECONDS))
32873291
* })
32883292
* }).toBlocking.foreach(s => println(s))
3289-
* </pre>
3293+
* }}}
32903294
*
32913295
* Output is:
32923296
*
3293-
* <pre>
3297+
* {{{
32943298
* subscribing
32953299
* delay retry by 1 second(s)
32963300
* subscribing
32973301
* delay retry by 2 second(s)
32983302
* subscribing
32993303
* delay retry by 3 second(s)
33003304
* subscribing
3301-
* </pre>
3305+
* }}}
3306+
*
33023307
* <dl>
33033308
* <dt><b>Scheduler:</b></dt>
33043309
* <dd>`retryWhen` operates by default on the `trampoline` [[Scheduler]].</dd>
33053310
* </dl>
33063311
*
3307-
* @param notificationHandler receives an Observable of notifications with which a user can complete or error, aborting the
3312+
* @param notificationHandler receives an Observable of a Throwable with which a user can complete or error, aborting the
33083313
* retry
33093314
* @return the source Observable modified with retry logic
33103315
* @see <a href="https://github.com/Netflix/RxJava/wiki/Error-Handling-Operators#retrywhen">RxJava Wiki: retryWhen()</a>
3316+
* @see RxScalaDemo.retryWhenDifferentExceptionsExample for a more intricate example
33113317
* @since 0.20
33123318
*/
3313-
def retryWhen(notificationHandler: Observable[Notification[Any]] => Observable[Any]): Observable[T] = {
3319+
def retryWhen(notificationHandler: Observable[Throwable] => Observable[Any]): Observable[T] = {
33143320
val f: Func1[_ >: rx.Observable[_ <: rx.Notification[_ <: Any]], _ <: rx.Observable[_ <: Any]] =
33153321
(jOn: rx.Observable[_ <: rx.Notification[_ <: Any]]) => {
33163322
val on = toScalaObservable[rx.Notification[_ <: Any]](jOn).map({ jN => toScalaNotification[Any](jN) })
3317-
notificationHandler(on).asJavaObservable
3323+
notificationHandler(on.map({ case Notification.OnError(error) => error })).asJavaObservable
33183324
}
33193325

33203326
toScalaObservable[T](asJavaObservable.retryWhen(f))
33213327
}
33223328

33233329
/**
33243330
* Returns an Observable that emits the same values as the source observable with the exception of an `onError`.
3325-
* An onError will emit a [[Notification]] to the observable provided as an argument to the notificationHandler
3326-
* func. If the observable returned `onCompletes` or `onErrors` then retry will call `onCompleted`
3331+
* An onError will emit a [[Throwable]] to the Observable provided as an argument to the notificationHandler
3332+
* func. If the Observable returned `onCompletes` or `onErrors` then retry will call `onCompleted`
33273333
* or `onError` on the child subscription. Otherwise, this observable will resubscribe to the source observable, on a particular Scheduler.
33283334
* <p>
33293335
* <img width="640" height="430" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/retryWhen.f.png" alt="">
@@ -3333,18 +3339,19 @@ trait Observable[+T]
33333339
* <dd>you specify which [[Scheduler]] this operator will use</dd>
33343340
* </dl>
33353341
*
3336-
* @param notificationHandler receives an Observable of notifications with which a user can complete or error, aborting the
3337-
* retry
3342+
* @param notificationHandler receives an Observable of a Throwable with which a user can complete or error, aborting
3343+
* the retry
33383344
* @param scheduler the Scheduler on which to subscribe to the source Observable
33393345
* @return the source Observable modified with retry logic
33403346
* @see <a href="https://github.com/Netflix/RxJava/wiki/Error-Handling-Operators#retrywhen">RxJava Wiki: retryWhen()</a>
3347+
* @see RxScalaDemo.retryWhenDifferentExceptionsExample for a more intricate example
33413348
* @since 0.20
33423349
*/
3343-
def retryWhen(notificationHandler: Observable[Notification[Any]] => Observable[Any], scheduler: Scheduler): Observable[T] = {
3350+
def retryWhen(notificationHandler: Observable[Throwable] => Observable[Any], scheduler: Scheduler): Observable[T] = {
33443351
val f: Func1[_ >: rx.Observable[_ <: rx.Notification[_ <: Any]], _ <: rx.Observable[_ <: Any]] =
33453352
(jOn: rx.Observable[_ <: rx.Notification[_ <: Any]]) => {
33463353
val on = toScalaObservable[rx.Notification[_ <: Any]](jOn).map({ jN => toScalaNotification[Any](jN) })
3347-
notificationHandler(on).asJavaObservable
3354+
notificationHandler(on.map({ case Notification.OnError(error) => error })).asJavaObservable
33483355
}
33493356

33503357
toScalaObservable[T](asJavaObservable.retryWhen(f, scheduler))
@@ -3415,7 +3422,7 @@ trait Observable[+T]
34153422
/**
34163423
* Returns an Observable that emits the same values as the source Observable with the exception of an
34173424
* `onCompleted`. An `onCompleted` notification from the source will result in the emission of
3418-
* a [[Notification]] to the Observable provided as an argument to the `notificationHandler`
3425+
* a [[scala.Unit]] to the Observable provided as an argument to the `notificationHandler`
34193426
* function. If the Observable returned `onCompletes` or `onErrors` then `repeatWhen` will
34203427
* call `onCompleted` or `onError` on the child subscription. Otherwise, this Observable will
34213428
* resubscribe to the source Observable, on a particular Scheduler.
@@ -3426,18 +3433,18 @@ trait Observable[+T]
34263433
* <dd>you specify which [[Scheduler]] this operator will use</dd>
34273434
* </dl>
34283435
*
3429-
* @param notificationHandler receives an Observable of notifications with which a user can complete or error, aborting the repeat.
3436+
* @param notificationHandler receives an Observable of a Unit with which a user can complete or error, aborting the repeat.
34303437
* @param scheduler the Scheduler to emit the items on
34313438
* @return the source Observable modified with repeat logic
34323439
* @see <a href="https://github.com/Netflix/RxJava/wiki/Creating-Observables#repeatwhen">RxJava Wiki: repeatWhen()</a>
34333440
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229428.aspx">MSDN: Observable.Repeat</a>
34343441
* @since 0.20
34353442
*/
3436-
def repeatWhen(notificationHandler: Observable[Notification[Any]] => Observable[Any], scheduler: Scheduler): Observable[T] = {
3443+
def repeatWhen(notificationHandler: Observable[Unit] => Observable[Any], scheduler: Scheduler): Observable[T] = {
34373444
val f: Func1[_ >: rx.Observable[_ <: rx.Notification[_ <: Any]], _ <: rx.Observable[_ <: Any]] =
34383445
(jOn: rx.Observable[_ <: rx.Notification[_ <: Any]]) => {
34393446
val on = toScalaObservable[rx.Notification[_ <: Any]](jOn).map({ jN => toScalaNotification[Any](jN) })
3440-
notificationHandler(on).asJavaObservable
3447+
notificationHandler(on.map( _ => Unit )).asJavaObservable
34413448
}
34423449

34433450
toScalaObservable[T](asJavaObservable.repeatWhen(f, scheduler))
@@ -3446,28 +3453,57 @@ trait Observable[+T]
34463453
/**
34473454
* Returns an Observable that emits the same values as the source Observable with the exception of an
34483455
* `onCompleted`. An `onCompleted` notification from the source will result in the emission of
3449-
* a [[Notification]] to the Observable provided as an argument to the `notificationHandler`
3456+
* a [[scala.Unit]] to the Observable provided as an argument to the `notificationHandler`
34503457
* function. If the Observable returned `onCompletes` or `onErrors` then `repeatWhen` will
34513458
* call `onCompleted` or `onError` on the child subscription. Otherwise, this Observable will
34523459
* resubscribe to the source observable.
34533460
* <p>
34543461
* <img width="640" height="430" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/repeatWhen.f.png" alt="">
3462+
*
3463+
* @example
3464+
*
3465+
* This repeats 3 times, each time incrementing the number of seconds it waits.
3466+
*
3467+
* {{{
3468+
* Observable[String]({ subscriber =>
3469+
* println("subscribing")
3470+
* subscriber.onCompleted()
3471+
* }).repeatWhen({ unitObservable =>
3472+
* unitObservable.zipWith(Observable.from(1 to 3))((u, i) => i).flatMap(i => {
3473+
* println("delay repeat by " + i + " second(s)")
3474+
* Observable.timer(Duration(i, TimeUnit.SECONDS))
3475+
* })
3476+
* }).toBlocking.foreach(s => println(s))
3477+
* }}}
3478+
*
3479+
* Output is:
3480+
*
3481+
* {{{
3482+
* subscribing
3483+
* delay repeat by 1 second(s)
3484+
* subscribing
3485+
* delay repeat by 2 second(s)
3486+
* subscribing
3487+
* delay repeat by 3 second(s)
3488+
* subscribing
3489+
* }}}
3490+
*
34553491
* <dl>
34563492
* <dt><b>Scheduler:</b></dt>
34573493
* <dd>`repeatWhen` operates by default on the `trampoline` [[Scheduler]].</dd>
34583494
* </dl>
34593495
*
3460-
* @param notificationHandler receives an Observable of notifications with which a user can complete or error, aborting the repeat.
3496+
* @param notificationHandler receives an Observable of a Unit with which a user can complete or error, aborting the repeat.
34613497
* @return the source Observable modified with repeat logic
34623498
* @see <a href="https://github.com/Netflix/RxJava/wiki/Creating-Observables#repeatwhen">RxJava Wiki: repeatWhen()</a>
34633499
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229428.aspx">MSDN: Observable.Repeat</a>
34643500
* @since 0.20
34653501
*/
3466-
def repeatWhen(notificationHandler: Observable[Notification[Any]] => Observable[Any]): Observable[T] = {
3502+
def repeatWhen(notificationHandler: Observable[Unit] => Observable[Any]): Observable[T] = {
34673503
val f: Func1[_ >: rx.Observable[_ <: rx.Notification[_ <: Any]], _ <: rx.Observable[_ <: Any]] =
34683504
(jOn: rx.Observable[_ <: rx.Notification[_ <: Any]]) => {
34693505
val on = toScalaObservable[rx.Notification[_ <: Any]](jOn).map({ jN => toScalaNotification[Any](jN) })
3470-
notificationHandler(on).asJavaObservable
3506+
notificationHandler(on.map( _ => Unit )).asJavaObservable
34713507
}
34723508

34733509
toScalaObservable[T](asJavaObservable.repeatWhen(f))

0 commit comments

Comments
 (0)