Backpressure: Publish #1732

Closed
benjchristensen opened this issue Oct 7, 2014 · 7 comments

@benjchristensen
Member

Is it possible in limited cases to support reactive pull backpressure on multicast use cases such as .publish()?

In the general case of a ConnectableObservable it wouldn't be, as those are "hot". I'm interested in exploring if there are cases when it will work, such as when using refCount to address use cases such as discussed in #1649 (comment)

As per the example from @headinthebox

import rx.lang.scala._

object MainScala {
  def main(args: Array[String]): Unit = {
    val xs = Observable.items(0, 1, 2, 3, 4)
    // publish with a selector shares one subscription to xs across both uses inside it
    val zs = xs.publish[Int]((xs: Observable[Int]) => xs.takeUntil(xs.dropWhile(x => x < 3).tail))
    zs.subscribe(z => println(z)) // 0,1,2,3
    readLine()
  }
}
@headinthebox
Contributor

+1

I have especially good hopes for xs.publish(_xs->ys).zs since the sharing of xs is all encapsulated.

@benjchristensen benjchristensen changed the title Backpressure: Publish? Backpressure: Multicast? Oct 15, 2014
@benjchristensen
Member Author

I don't think this should try to generically solve for use of Subjects, but for use of the multicast operator.

In that case I propose we adopt a strategy similar to the one groupBy employs, which is that the flow goes at the rate of the slowest subscriber. It is the most obvious and expected result, and if someone needs to speed things up they can opt in to sampling/throttling/etc. Right now, multicasting very non-obviously breaks composition of backpressure: it causes backpressure exceptions and requires buffering or dropping data.
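
As a rough illustration of "the flow goes at the rate of the slowest subscriber", the sketch below models only the bookkeeping and is not RxJava code. The names `SlowestSubscriberDemand` and `emittable` are hypothetical, and each map entry stands in for one subscriber's outstanding request(n) total.

```scala
// Hypothetical model, not RxJava internals: each entry is one subscriber's
// outstanding demand, i.e. what it asked for via request(n) minus what it
// has already received.
object SlowestSubscriberDemand {
  // The shared source may emit only as many items as every current
  // subscriber can accept, which is the minimum outstanding demand.
  def emittable(outstanding: Map[String, Long]): Long =
    if (outstanding.isEmpty) 0L else outstanding.values.min
}
```

For instance, `SlowestSubscriberDemand.emittable(Map("fast" -> 128L, "slow" -> 2L))` evaluates to 2, so the fast subscriber is held to the slow one's pace unless it opts in to sampling or throttling.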

@akarnokd
Member

The main difficulty I see is how to manage the dynamic combining of request(n) calls of client subscribers as they may come and go at will.

For example, given a multicast source, a subscriber S1 enters and calls request(10); soon after, a different subscriber S2 enters and calls request(5). Since the first request has already started to execute, S2 shouldn't receive more than 5. Either the multicast somehow needs to buffer the remaining 5 until S2 asks for more, or S1 doesn't receive more elements until S2 asks for more. In addition, if S2 unsubscribes, S1 may now receive its remaining requested elements.
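
The S1/S2 scenario can be traced with a small model. This is only a sketch under assumptions: `MulticastDemand`, `drain`, and `receivedBy` are hypothetical names, no items are actually emitted, and it models the second option above (S1 stalls until S2 asks for more) rather than buffering on S2's behalf.

```scala
import scala.collection.mutable

// Hypothetical bookkeeping for a multicast source; not the RxJava implementation.
final class MulticastDemand {
  private val outstanding = mutable.LinkedHashMap.empty[String, Long]
  private val received = mutable.Map.empty[String, Long]

  // A subscriber calls request(n): add n to its outstanding demand.
  def request(id: String, n: Long): Unit =
    outstanding(id) = outstanding.getOrElse(id, 0L) + n

  // A subscriber leaves: its demand no longer constrains the others.
  def unsubscribe(id: String): Unit = { outstanding.remove(id); () }

  // Deliver as many items as every current subscriber can accept
  // and return that count.
  def drain(): Long = {
    val n = if (outstanding.isEmpty) 0L else outstanding.values.min
    for (id <- outstanding.keys.toList) {
      outstanding(id) = outstanding(id) - n
      received(id) = received.getOrElse(id, 0L) + n
    }
    n
  }

  def receivedBy(id: String): Long = received.getOrElse(id, 0L)
}
```

With `request("S1", 10)` followed by `request("S2", 5)`, the first `drain()` delivers 5 to each subscriber and the next delivers nothing. After `unsubscribe("S2")`, another `drain()` delivers S1's remaining 5.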

@benjchristensen
Member Author

I agree that it's a challenge. I wonder though if we can solve this in a similar way as groupBy works, as it's a similar challenge. groupBy is effectively multicasting a single source into n sources where each can come and go and request different sizes at different rates.

It maintains a single buffer where it puts notifications if it can't emit and ensures it never requests more upstream than its buffer.

The nuance of this is that publish will have some replay type behavior when it has slow consumers, but I think that's correct considering the expected behavior of composing reactive pull backpressure. If someone wants truly unbounded emission they can opt into that by using terminal operators or onBackpressure* strategies.
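
The single-buffer idea described above, where the operator never requests more upstream than its buffer can hold, might look roughly like this. It is a sketch under assumptions, not groupBy's or publish's actual code: `BoundedRelay`, `requestUpstream`, and `drain` are hypothetical names, and threading is ignored entirely.

```scala
import scala.collection.mutable

// Hypothetical bounded buffer between an upstream source and slower
// downstream consumers; not RxJava's groupBy or publish implementation.
final class BoundedRelay[A](capacity: Int) {
  private val buffer = mutable.Queue.empty[A]
  private var inFlight = 0 // requested upstream but not yet arrived

  // How many more items may be requested upstream right now without
  // ever overflowing the buffer. Calling this records the request.
  def requestUpstream(): Int = {
    val n = capacity - buffer.size - inFlight
    inFlight += n
    n
  }

  // Upstream delivers an item that was previously requested.
  def onNext(item: A): Unit = {
    require(inFlight > 0, "upstream emitted more than was requested")
    inFlight -= 1
    buffer += item
  }

  // Downstream is ready for up to n items; hand over what is buffered.
  def drain(n: Int): List[A] = {
    val out = List.newBuilder[A]
    var i = 0
    while (i < n && buffer.nonEmpty) { out += buffer.dequeue(); i += 1 }
    out.result()
  }
}
```

Slow consumers leave items sitting in the buffer, which is the replay-like behavior mentioned above. Upstream demand only reopens as `drain` frees space.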

This approach has worked well for groupBy this past week in the stream processing system we're building. The publish/share use case is, I believe, the last blocker we have.

@benjchristensen benjchristensen changed the title Backpressure: Multicast? Backpressure: Publish? Oct 17, 2014
@benjchristensen benjchristensen changed the title Backpressure: Publish? Backpressure: Publish Oct 17, 2014
@benjchristensen
Member Author

After reviewing this with @neerajrj, it seems not to make sense to try to make multicast support backpressure, as it allows passing any Subject into it. A Subject is by definition "hot" and not appropriate for trying to compose backpressure.

The publish (and cache/share) operator however has no specific reliance on the use of a Subject even though it currently leverages multicast and the Subject implementations (PublishSubject and ReplaySubject).

We can achieve the multicast behavior of publish() without a PublishSubject and then be able to compose the backpressure from all subscribers very similar to how groupBy works without needing to modify how subjects work.

This was referenced Oct 20, 2014
@benjchristensen
Member Author

Implemented and merged in #1784

@JakeWharton
Contributor

Is there a non-trivial or non-contrived use case for this code to help me understand its applicability? I'm having a hard time conceptualizing what it would be used for when the example can be rewritten as .takeWhile(Predicates.not(x => x > 3)) (or, of course, simply .takeWhile(x => x <= 3)).
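
For the finite input in the original example, the two rewrites mentioned here do give the same items. The sketch below checks that with a plain Scala `List` instead of an Observable, and uses `!` in place of `Predicates.not`; both substitutions are assumptions made to keep it free of library dependencies.

```scala
// Plain-collections stand-in for the Observable example; TakeWhileRewrite
// is a hypothetical name used only for this illustration.
object TakeWhileRewrite {
  val xs: List[Int] = List(0, 1, 2, 3, 4)

  // Negated predicate, mirroring takeWhile(not(x => x > 3)).
  val viaNegation: List[Int] = xs.takeWhile(x => !(x > 3))

  // Direct predicate.
  val direct: List[Int] = xs.takeWhile(x => x <= 3)
}
```

Both values are `List(0, 1, 2, 3)`, matching the `0,1,2,3` noted in the publish-based example.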
