Description
Is your feature request related to a problem? Please describe.
No - this is about a convenience function.
Describe the solution you'd like
A function that allows for calling a stream of functions with a stream of args, kwargs.
rx.call_latest(rx.just(dict))(
a=a,
b=b,
c=c,
)
Describe alternatives you've considered
There is an alternative using combine_latest and map:
rx.combine_latest(a, b, c).pipe(
ops.map(lambda *args: dict(a=args[0], b=args[1], c=args[2])
)
The downside to this is the loss of context between argument order and name. This is error-prone. Especially, inconvenient code constructs often invite bad habits and can lead to even more bugs or less readable code.
Implementation / Solution
I happen to have an implementation ready as I use it myself. The interface is makes it pipeable (for the function, if it is a stream).
def call_latest(*args, **kwargs):
def map(fn):
kwargs_keys, kwargs_vals = zip(*kwargs.items()) if kwargs else (tuple(), tuple())
combined_args = [fn, *kwargs_vals, *args]
def caller(input: tuple):
iterator = iter(input)
fun = next(iterator)
kwargs = dict(zip(kwargs_keys, iterator))
return fun(*iterator, **kwargs)
return rxo.map(caller)(rx.combine_latest(*combined_args))
return map
Let me know if anyone else is interested in this. It's not, as far as I know, part of any specifications - but I have had many uses for it over the months.