Skip to content

Strict nullability with : Any, and made RxBoxImp a bit more efficient #12

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jan 24, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@

## [Unreleased]
### Changed
- **BREAKING** Replace `RxJava Disposable` with `Kotlin Job`, and remove `rxjava` completely. ([#10](https://github.com/diffplug/durian-rx/pull/10))
- Add strict nullability to RxBox and improve efficiency. ([#12](https://github.com/diffplug/durian-rx/pull/12))
- Bump required java from 11 to 17. ([#9](https://github.com/diffplug/durian-rx/pull/9))
- Replace `RxJava Disposable` with `Kotlin Job`, and remove `rxjava` completely. ([#10](https://github.com/diffplug/durian-rx/pull/10))

## [4.0.1] - 2022-12-20
### Fixed
Expand Down
10 changes: 5 additions & 5 deletions src/main/java/com/diffplug/common/rx/ForwardingBox.kt
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import kotlinx.coroutines.flow.Flow
*
* Especially useful for overridding set().
*/
open class ForwardingBox<T, BoxType : Box<T>>
open class ForwardingBox<T : Any, BoxType : Box<T>>
protected constructor(protected val delegate: BoxType) : Box<T> {
override fun get(): T {
return delegate.get()
Expand All @@ -37,7 +37,7 @@ protected constructor(protected val delegate: BoxType) : Box<T> {
delegate.set(value)
}

class Cas<T> protected constructor(delegate: CasBox<T>) :
class Cas<T : Any> protected constructor(delegate: CasBox<T>) :
ForwardingBox<T, CasBox<T>>(delegate), CasBox<T> {
override fun compareAndSet(expect: T, update: T): Boolean {
return delegate.compareAndSet(expect, update)
Expand All @@ -48,21 +48,21 @@ protected constructor(protected val delegate: BoxType) : Box<T> {
}
}

class Lock<T> protected constructor(delegate: LockBox<T>) :
class Lock<T : Any> protected constructor(delegate: LockBox<T>) :
ForwardingBox<T, LockBox<T>>(delegate), LockBox<T> {
override fun lock(): Any {
return delegate.lock()
}
}

open class Rx<T> protected constructor(delegate: RxBox<T>) :
open class Rx<T : Any> protected constructor(delegate: RxBox<T>) :
ForwardingBox<T, RxBox<T>>(delegate), RxBox<T> {
override fun asFlow(): Flow<T> {
return delegate.asFlow()
}
}

class RxLock<T> protected constructor(delegate: RxLockBox<T>) :
class RxLock<T : Any> protected constructor(delegate: RxLockBox<T>) :
ForwardingBox<T, RxLockBox<T>>(delegate), RxLockBox<T> {
override fun lock(): Any {
return delegate.lock()
Expand Down
6 changes: 2 additions & 4 deletions src/main/java/com/diffplug/common/rx/GuardedExecutor.kt
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import java.util.concurrent.Executor
import java.util.function.Supplier
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.flow.Flow

/**
Expand All @@ -33,12 +32,11 @@ import kotlinx.coroutines.flow.Flow
*/
open class GuardedExecutor(val delegate: RxExecutor, val guard: Chit) : Executor, RxSubscriber {
override fun execute(command: Runnable) {
delegate.executor().execute(guard.guard(command))
delegate.executor.execute(guard.guard(command))
}

/** Creates a runnable which runs on this Executor iff the guard widget is not disposed. */
fun wrap(delegate: Runnable): Runnable {
Objects.requireNonNull(delegate)
return Runnable { execute(guard.guard(delegate)) }
}

Expand All @@ -48,7 +46,7 @@ open class GuardedExecutor(val delegate: RxExecutor, val guard: Chit) : Executor
guard.runWhenDisposed { job.cancel() }
job
} else {
SupervisorJob().apply { cancel() }
Rx.sentinelJob
}
}

Expand Down
58 changes: 0 additions & 58 deletions src/main/java/com/diffplug/common/rx/MappedImp.java

This file was deleted.

43 changes: 43 additions & 0 deletions src/main/java/com/diffplug/common/rx/MappedImp.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright 2020 DiffPlug
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.diffplug.common.rx

import com.diffplug.common.base.Box
import com.diffplug.common.base.Converter
import java.util.function.Function

internal open class MappedImp<T : Any, R : Any, BoxType : Box<T>>(
@JvmField protected val delegate: BoxType,
@JvmField protected val converter: Converter<T, R>
) : Box<R> {
override fun get(): R = converter.convertNonNull(delegate.get())

override fun set(value: R) = delegate.set(converter.revertNonNull(value))

/** Shortcut for doing a set() on the result of a get(). */
override fun modify(mutator: Function<in R, out R>): R {
val result = Box.Nullable.ofNull<R>()
delegate.modify { input: T ->
val unmappedResult = mutator.apply(converter.convertNonNull(input))
result.set(unmappedResult)
converter.revertNonNull(unmappedResult)
}
return result.get()
}

override fun toString(): String =
"[" + delegate + " mapped to " + get() + " by " + converter + "]"
}
8 changes: 6 additions & 2 deletions src/main/java/com/diffplug/common/rx/MultiSelectModel.kt
Original file line number Diff line number Diff line change
Expand Up @@ -171,10 +171,14 @@ class MultiSelectModel<T : Any>(

companion object {
/** Creates an Optional<Either> from an Either<Optional>. </Optional></Either> */
fun <T, U> optEitherFrom(either: Either<Optional<T>, Optional<U>>): Optional<Either<T, U>> {
fun <T : Any, U : Any> optEitherFrom(
either: Either<Optional<T>, Optional<U>>
): Optional<Either<T, U>> {
return either.fold({ leftOpt: Optional<T> ->
leftOpt.map { l: T -> Either.createLeft(l) }
}) { rightOpt: Optional<U> -> rightOpt.map { r: U -> Either.createRight(r) } }
}) { rightOpt: Optional<U> ->
rightOpt.map { r: U -> Either.createRight(r) }
}
}
}
}
8 changes: 5 additions & 3 deletions src/main/java/com/diffplug/common/rx/Rx.kt
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ import kotlinx.coroutines.flow.merge
* (https://diffplug.github.io/durian-swt/javadoc/snapshot/com/diffplug/common/swt/SwtExec.html)
*/
object Rx {
@JvmStatic
fun <T> createEmitFlow() =
MutableSharedFlow<T>(replay = 0, extraBufferCapacity = 1, BufferOverflow.SUSPEND)

Expand Down Expand Up @@ -132,6 +133,7 @@ object Rx {
* Creates an Rx instance which will call the given consumer whenever the followed stream or
* future completes, whether with an error or not, and the error (if present) will be logged.
*/
@JvmStatic
fun <T> onTerminateLogError(onTerminate: Consumer<Optional<Throwable>>): RxListener<T> {
return RxListener(Consumers.doNothing(), DefaultTerminate(onTerminate))
}
Expand Down Expand Up @@ -371,7 +373,7 @@ object Rx {

/** Reliable way to sync two RxBox to each other. */
@JvmStatic
fun <T> sync(left: RxBox<T>, right: RxBox<T>) {
fun <T : Any> sync(left: RxBox<T>, right: RxBox<T>) {
sync(sameThreadExecutor(), left, right)
}

Expand All @@ -380,7 +382,7 @@ object Rx {
* changes
*/
@JvmStatic
fun <T> sync(subscriber: RxSubscriber, left: RxBox<T>, right: RxBox<T>) {
fun <T : Any> sync(subscriber: RxSubscriber, left: RxBox<T>, right: RxBox<T>) {
val firstChange = Box.Nullable.ofNull<Either<T, T>?>()
subscriber.subscribe(left) { leftVal: T ->
// the left changed before we could acknowledge it
Expand Down Expand Up @@ -449,5 +451,5 @@ object Rx {
}
}

val sentinelJob: Job = Job().apply { cancel() }
@JvmStatic val sentinelJob: Job = Job().apply { cancel() }
}
36 changes: 15 additions & 21 deletions src/main/java/com/diffplug/common/rx/RxBox.kt
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,12 @@ import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.map

/** [RxGetter] and [Box] combined in one: a value you can set, get, and subscribe to. */
interface RxBox<T> : RxGetter<T>, Box<T> {
interface RxBox<T : Any> : RxGetter<T>, Box<T> {
/** Returns a read-only version of this `RxBox`. */
fun readOnly(): RxGetter<T> {
return this
}
fun readOnly(): RxGetter<T> = this

/** Maps one `RxBox` to another `RxBox`. */
override fun <R> map(converter: Converter<T, R>): RxBox<R> {
override fun <R : Any> map(converter: Converter<T, R>): RxBox<R> {
return RxBoxImp.Mapped(this, converter)
}

Expand Down Expand Up @@ -70,30 +68,26 @@ interface RxBox<T> : RxGetter<T>, Box<T> {

companion object {
/** Creates an `RxBox` with the given initial value. */
@JvmStatic
fun <T> of(initial: T): RxBox<T> {
return RxBoxImp(initial)
}
@JvmStatic fun <T : Any> of(initial: T): RxBox<T> = RxBoxImp(initial)

/**
* Creates an `RxBox` which implements the "getter" part with `RxGetter`, and the setter part
* with `Consumer`.
*/
@JvmStatic
fun <T> from(getter: RxGetter<T>, setter: Consumer<T>): RxBox<T> {
return object : RxBox<T> {
override fun asFlow(): Flow<T> {
return getter.asFlow()
}
fun <T : Any> from(getter: RxGetter<T>, setter: Consumer<T>): RxBox<T> =
object : RxBox<T> {
override fun asFlow(): Flow<T> {
return getter.asFlow()
}

override fun get(): T {
return getter.get()
}
override fun get(): T {
return getter.get()
}

override fun set(value: T) {
setter.accept(value)
override fun set(value: T) {
setter.accept(value)
}
}
}
}
}
}
18 changes: 6 additions & 12 deletions src/main/java/com/diffplug/common/rx/RxBoxImp.kt
Original file line number Diff line number Diff line change
Expand Up @@ -21,28 +21,22 @@ import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.map

internal open class RxBoxImp<T> private constructor(initial: T, subject: MutableStateFlow<T>) :
RxBox<T> {
private var value: T = initial
private val subject: MutableStateFlow<T> = subject

constructor(initial: T) : this(initial, MutableStateFlow(initial)) {}
internal open class RxBoxImp<T : Any>(initial: T) : RxBox<T> {
private val subject = MutableStateFlow(initial)

override fun set(newValue: T) {
if (newValue != value) {
value = newValue
if (subject.value != newValue) {
subject.value = newValue
}
}

override fun get(): T = value
override fun get(): T = subject.value

override fun asFlow(): Flow<T> = subject

internal class Mapped<T, R>(delegate: RxBox<T>, converter: Converter<T, R>) :
internal class Mapped<T : Any, R : Any>(delegate: RxBox<T>, converter: Converter<T, R>) :
MappedImp<T, R, RxBox<T>>(delegate, converter), RxBox<R> {
val flow: Flow<R> =
delegate.asFlow().map { a: T -> converter.convertNonNull(a) }.distinctUntilChanged()
val flow: Flow<R> = delegate.asFlow().map(converter::convertNonNull).distinctUntilChanged()

override fun asFlow(): Flow<R> = flow
}
Expand Down
5 changes: 1 addition & 4 deletions src/main/java/com/diffplug/common/rx/RxExecutor.kt
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,13 @@ import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch

class RxExecutor
internal constructor(private val executor: Executor, val dispatcher: CoroutineDispatcher) :
class RxExecutor internal constructor(val executor: Executor, val dispatcher: CoroutineDispatcher) :
RxSubscriber {

interface Has : Executor {
val rxExecutor: RxExecutor
}

fun executor() = executor

override fun <T> subscribe(flow: Flow<T>, listener: RxListener<T>) {
subscribeDisposable(flow, listener)
}
Expand Down
8 changes: 4 additions & 4 deletions src/main/java/com/diffplug/common/rx/RxGetter.kt
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import kotlinx.coroutines.flow.map
* not change (e.g. a field is set to its current value, which produces no change) then the
* `Observable` will not fire.
*/
interface RxGetter<T> : IFlowable<T>, Supplier<T> {
interface RxGetter<T : Any> : IFlowable<T>, Supplier<T> {
/**
* Maps an `RxGetter` to a new `RxGetter` by applying the `mapper` function to all of its values.
*
Expand All @@ -46,7 +46,7 @@ interface RxGetter<T> : IFlowable<T>, Supplier<T> {
* * Incorrect: `("A", "B", "C") -> map(String::length) = (1, 1, 1)`
* * Correct: `("A", "B", "C") -> map(String::length) = (1)`
*/
fun <R> map(mapper: Function<in T, out R>): RxGetter<R> {
fun <R : Any> map(mapper: Function<in T, out R>): RxGetter<R> {
val src = this
val mapped = src.asFlow().map { t: T -> mapper.apply(t) }
val observable = mapped.distinctUntilChanged()
Expand All @@ -70,7 +70,7 @@ interface RxGetter<T> : IFlowable<T>, Supplier<T> {
* recorded by a non-volatile field.
*/
@JvmStatic
fun <T> from(observable: Flow<T>, initialValue: T): RxGetter<T> {
fun <T : Any> from(observable: Flow<T>, initialValue: T): RxGetter<T> {
val box = Box.of(initialValue)
subscribe(observable) { value: T -> box.set(value) }
return object : RxGetter<T> {
Expand All @@ -90,7 +90,7 @@ interface RxGetter<T> : IFlowable<T>, Supplier<T> {
* As with [.map], the observable only emits a new value if its value has changed.
*/
@JvmStatic
fun <T1, T2, R> combineLatest(
fun <T1 : Any, T2 : Any, R : Any> combineLatest(
t: RxGetter<out T1>,
u: RxGetter<out T2>,
combine: BiFunction<in T1, in T2, out R>
Expand Down
Loading
Loading