Introduce Flow.all/any/none operators
CLOVIS-AI opened this issue · 3 comments
Use case
I have a complex Flow
that possesses many elements. I have a business rule that is, literally, to do something if any of them satisfies a condition.
If I were using List
or Set
, I would use any { theCondition(it) }
. However, Flow
doesn't seem to have any
.
There is an old issue (#2239) that asks for this feature, and is closed because of alternative implementations:
suspend fun <T : Any> Flow<T>.any(predicate: suspend (T) -> Boolean): Boolean { return this.firstOrNull { predicate(it) } != null } suspend fun <T> Flow<T>.all(predicate: suspend (T) -> Boolean): Boolean { return this.count { !predicate(it) } == 0 }
I dislike this solution because:
- The first one doesn't work with a
Flow<Foo?>
becausefirstOrNull
is only available on non-nullable types. - Both options obfuscate the operation I am actually attempting to perform.
I believe it is worth having any
/all
/none
directly in the library because the proposed implementation have downsides.
The Shape of the API
suspend fun <T> Flow<T>.any(predicate: suspend (T) -> Boolean): Boolean
suspend fun <T> Flow<T>.all(predicate: suspend (T) -> Boolean): Boolean
suspend fun <T> Flow<T>.none(predicate: suspend (T) -> Boolean): Boolean
Prior Art
These three functions are already available on Iterable
and Sequence
. The behavior should be the same.
all
and none
can use the count
operator under the hood, since it already shortcuts. I believe this is a possible implementation of any
, though I haven't tested it yet:
suspend fun <T> Flow<T>.any(predicate: suspend (T) -> Boolean) = this
.transformWhile {
if (predicate(it)) {
emit(true)
false // ONE element MATCHES, no need to go further
} else {
true // continue
}
}
.onEmpty { emit(false) }
.first()
I can submit a PR if you think this is worth pursuing.
Simplified implementation:
suspend fun <T> Flow<T>.any(predicate: (T) -> Boolean) = this
.filter { predicate(it) }
.map { true }
.onEmpty { emit(false) }
.first()
We had a Slack discussion about the use case which prompted this (https://kotlinlang.slack.com/archives/C1CFAFJSK/p1723480063142319).
What I took away from that discussion surprised me, but I now believe that Flow
is applicable everywhere Sequence
is applicable, meaning that we should add Sequence
API to Flow
on demand without asking for use cases.
The use case is completely linear, with few hints at asynchronous behavior. A list is taken, converted to a Flow
, then map { aSuspendingFunction() }
is performed, and last, any { }
collects the result.
Conceptually, Sequence
is a good choice here: the use case is not to create cold streams of values, it's to enable short-circuiting behavior while traversing the list as a sequence. Calling a suspending function in map
, however, breaks this nice concept and forces Flow
into the code, even if we attempt to utilize the sequence
function that allows running suspend
code:
- Structured concurrency is broken if we call
suspend fun
→fun
→suspend fun
. The middlefun
can berunBlocking
, or it can besequence { }.any { }
—in any case, cancellation stops working unless you write bespoke code to preserve it. - The thread is hogged by
fun
if we callsuspend fun
→fun
→suspend fun
, so functions that callsuspend fun
ctions are a bit safer to use if the middlefun
becomes asuspend fun
as well. - If we have
suspend fun
running in aSequence
, the fairly natural desire to actually make the code run asynchronously will not work, asSequence
lacks the necessary facilities. The edit distance for adding buffering to aFlow
is tiny, but adding buffering to aSequence
means rewriting everything toFlow
.
It seems to me like kotlinx.coroutines
users should be able to use Flow
whenever a Sequence
is idiomatic, if only to break the suspend fun
→ fun
→ suspend fun
call chain.
That's a really nice angle!
Originally, we thought of a Flow
as an asynchronous cold (push-based) stream. It was never supposed to be a short-circuit analogy of channels or a suspendable sequence. Thus, not only was operator parity not a concern, but we also were extremely cautious about adding new ones as we made this mistake with channel operators. That's why we asked for use cases even for the trivial operators (also, the idea of "Flow operators are trivial, you can write your own in a few lines of code" is still here). We wanted to minimize API surface, minimize the potential of misuse and nudge people to a sequence where they needed a sequence.
Yet the reality begs to differ -- among other things, Flow
is indeed used as a suspendable sequence, people expect all the batteries to be included and, if somethings looks like a sequence, quacks like a sequence and walks like a sequence [in trivial scenarios] it's better to have operators like a sequence.
There are not really many upsides of keeping the status quo, I think