Saturday, September 9, 2017

Rewriting RxJava with Kotlin Coroutines?

Introduction


Someone influential stated that RxJava should be rewritten with Kotlin Coroutines. I haven't seen any attempt at it so far, and declaring such a thing to be worthwhile (or not) without actually trying is irresponsible.

As we saw in the earlier post and the response in the comment section, following up on the imperative-reactive promise leads to some boilerplate and questionable cancellation management, and the idiomatic Kotlin/Coroutine enhancement suggested is to ... factor out the imperative control structures into common routines and have the user specify lambda callback(s); thus it can become declarative-reactive, just like RxJava interpreted from a higher level viewpoint. Kind of defeats one of the premises in my understanding.

This doesn't diminish the power of coroutine-based abstraction but certainly implies a relevant question: who is supposed to write these abstract operators?

One possible answer is, of course, library writers who not only have experience with abstracting away control structures but perhaps wield deeper knowledge about how the coroutine infrastructure can be utilized in certain complicated situations.

If this assumption of mine is true, that somewhat defeats another premise of coroutines: the end user will likely have to stick to writing suspendable functionals and discover operators provided by a library most of the time.

So what's mainly left is to see whether implementing a declarative-reactive library on top of coroutines gives benefits to the library developer (i.e., ease of writing) over hand-crafted state machines, and (reasonable) performance to the user of the library itself.

The library implementation


Perhaps one of the more attractive properties of RxJava is the deferred, lazy execution of a reactive flow (cold). One sets up a template of transformations and issues a subscribe() call to begin execution. In contrast, CompletableFuture and imperative Coroutines can be thought of as eager executions: in order to retry them one has to recreate the whole chain, plus their execution may be ongoing while one is still busy applying operators on top of them.

Base interfaces


Since the former structure is more enabling at little to no overhead, we'll define our base types as follows:


interface CoFlow<out T> {
    suspend fun subscribe(consumer: CoConsumer<T>)
}


The main interface, CoFlow, matches the usual pattern of the Reactive-Streams Publisher.

interface CoConsumer<in T> {

    suspend fun onSubscribe(connection: CoConnection)

    suspend fun onNext(t: T)

    suspend fun onError(t: Throwable)

    suspend fun onComplete()
}


The consumer type, CoConsumer, is also matching the Reactive-Streams Subscriber pattern.

interface CoConnection {
    suspend fun close()
}


The final type, CoConnection, is responsible for cancelling a flow. Unlike the Reactive-Streams Subscription, there is no request() method because we will follow up on the non-blocking suspension promise of the coroutines: the sender will be suspended if the receiver is not in the position to receive, thus there should be no need for request accounting as the state machine generated by the compiler will implicitly do it for us.

Those with deeper understanding of how cancellation works with coroutines may object to this connection object. Indeed, there are probably better ways of including cancellation support, however, my limited understanding of the coroutine infrastructure didn't yield any apparent concept-match between the two. Suggestions welcome.

Entering the CoFlow world

Perhaps the most basic way of creating a flow of values is the Just(T) operator that when subscribed to, emits its single item followed by a completion signal. Since we don't have to deal with a backpressure state machine, this should be relatively short to write:


class Just<out T>(private val value: T) : CoFlow<T> {
    override suspend fun subscribe(consumer: CoConsumer<T>) {
        consumer.onSubscribe(???)
        consumer.onNext(value)
        consumer.onComplete()
    }
}

In order to allow the downstream to indicate cancellation, we have to send something along onSubscribe. Since coroutines appear as synchronous execution, we would have the same synchronous cancellation problem that the Reactive-Streams Subscription (and RxJava before it) solves: inversion of control by sending down something cancellable first, then checking if the consumer had enough.


class BooleanConnection : CoConnection {

   @Volatile var cancelled : Boolean = false

   override suspend fun close() {
       cancelled = true
   }
}


Which we now can use with Just(T):

class Just<out T>(private val value: T) : CoFlow<T> {
    override suspend fun subscribe(consumer: CoConsumer<T>) {
        val conn = BooleanConnection()
        consumer.onSubscribe(conn)

        if (conn.cancelled) {
            return
        }
        consumer.onNext(value)

        if (conn.cancelled) {
            return
        }
        consumer.onComplete()
    }
}

Since everything is declared suspend, we should have no problem interacting with an operator downstream that suspends execution in case of an immediate backpressure.

Let's see a source that emits multiple items, but for a (predictable) twist, we implement an uncommon source: Chars(String), which emits the characters of a string as Ints:


class Chars(private val string: String) : CoFlow<Int> {
    override suspend fun subscribe(consumer: CoConsumer<Int>) {
        val conn = BooleanConnection()
        consumer.onSubscribe(conn)
  
        for (v in 0 until string.length) {
            if (conn.cancelled) {
                return
            }
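            // v is the index; emit the character at that index as an Int
            // (on Kotlin 1.5+, string[v].code is the non-deprecated form)
            consumer.onNext(string[v].toInt())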
        }
        if (conn.cancelled) {
            return
        }
        consumer.onComplete()
    }
}

And lastly for this subsection, we will implement FromIterable(T):


class FromIterable<T>(private val source: Iterable<T>) : CoFlow<T> {
    override suspend fun subscribe(consumer: CoConsumer<T>) {
        val conn = BooleanConnection()
        consumer.onSubscribe(conn)
  
        for (v in source) {
            if (conn.cancelled) {
                return
            }
            consumer.onNext(v)
        }
        if (conn.cancelled) {
            return
        }
        consumer.onComplete()
    }
}


So far, these sources look pretty much like how the non-backpressured RxJava 2 Observable is implemented. I'm sure there are more concise ways of expressing them; unfortunately, I have only limited knowledge of Kotlin's syntax improvements over Java. However, since I think this blog's audience is mainly Java programmers, something familiar-looking should be "less alien" at this point.
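To see one of these sources actually run, here is a minimal, self-contained sketch. It repeats the base types and FromIterable from above so it compiles on its own. RecordingConsumer, runToEnd and record are hypothetical helpers that are not part of the library, and the sketch assumes the stable kotlin.coroutines API of Kotlin 1.3 or later (not the experimental package available when this post was written). Because nothing in this flow ever really suspends, a bare startCoroutine call is enough to drive it to the end.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.startCoroutine

interface CoConnection { suspend fun close() }

interface CoConsumer<in T> {
    suspend fun onSubscribe(connection: CoConnection)
    suspend fun onNext(t: T)
    suspend fun onError(t: Throwable)
    suspend fun onComplete()
}

interface CoFlow<out T> { suspend fun subscribe(consumer: CoConsumer<T>) }

class BooleanConnection : CoConnection {
    @Volatile var cancelled: Boolean = false
    override suspend fun close() { cancelled = true }
}

class FromIterable<T>(private val source: Iterable<T>) : CoFlow<T> {
    override suspend fun subscribe(consumer: CoConsumer<T>) {
        val conn = BooleanConnection()
        consumer.onSubscribe(conn)
        for (v in source) {
            if (conn.cancelled) return
            consumer.onNext(v)
        }
        if (conn.cancelled) return
        consumer.onComplete()
    }
}

// Hypothetical consumer: records every signal and closes the
// connection once `limit` signals have been recorded.
class RecordingConsumer<T>(private val limit: Int) : CoConsumer<T> {
    val events = mutableListOf<String>()
    private var upstream: CoConnection? = null

    override suspend fun onSubscribe(connection: CoConnection) { upstream = connection }
    override suspend fun onNext(t: T) {
        events += "next($t)"
        if (events.size == limit) upstream!!.close()
    }
    override suspend fun onError(t: Throwable) { events += "error" }
    override suspend fun onComplete() { events += "complete" }
}

// Hypothetical runner: starts the block and rethrows its failure, if any.
// It only works for blocks that finish without a real suspension.
fun runToEnd(block: suspend () -> Unit) {
    var failure: Throwable? = null
    block.startCoroutine(object : Continuation<Unit> {
        override val context: CoroutineContext = EmptyCoroutineContext
        override fun resumeWith(result: Result<Unit>) {
            failure = result.exceptionOrNull()
        }
    })
    failure?.let { throw it }
}

// Hypothetical helper: subscribes to FromIterable(1, 2, 3) and returns the signals seen.
fun record(limit: Int): List<String> {
    val consumer = RecordingConsumer<Int>(limit)
    runToEnd { FromIterable(listOf(1, 2, 3)).subscribe(consumer) }
    return consumer.events
}

fun main() {
    println(record(limit = Int.MAX_VALUE)) // [next(1), next(2), next(3), complete]
    println(record(limit = 2))             // [next(1), next(2)]
}
```

The second call shows the inversion of control described earlier: the consumer closes the connection from inside onNext, and the source notices it on its next cancelled check, so neither the third item nor onComplete is delivered.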

Transformations

What is the most common transformation in the reactive world? Mapping, of course! Therefore, let's see what the instance extension method Map(T -> R) looks like.


suspend fun <T, R> CoFlow<T>.map(mapper: suspend (T) -> R): CoFlow<R> {
    val source = this
    
    return object: CoFlow<R> {
        override suspend fun subscribe(consumer: CoConsumer<R>) {

            source.subscribe(object: CoConsumer<T> {

                var upstream: CoConnection? = null
                var done: Boolean = false

                override suspend fun onSubscribe(conn: CoConnection) {
                    upstream = conn
                    consumer.onSubscribe(conn)
                }

                override suspend fun onNext(t: T) {
                    val v: R;
                    try {
                        v = mapper(t)
                    } catch (ex: Throwable) {
                        done = true
                        upstream!!.close()
                        consumer.onError(ex)
                        return
                    }
                    consumer.onNext(v)
                }

                override suspend fun onError(t: Throwable) {
                    if (!done) {
                        consumer.onError(t)
                    }
                }

                override suspend fun onComplete() {
                    if (!done) {
                        consumer.onComplete()
                    }
                }
            })
        }
    }
}

Perhaps what I envy most about Kotlin is the extension method support. I can only hope for it in Java now that Oracle is switching to a 6-month feature enhancement cycle. The val source = this may seem odd to a Kotlin developer; maybe there is a syntax that makes the outer this accessible from the anonymous inner class (object: CoFlow<R>) in some other way. Note also the suspend (T) -> R signature: we will, of course, mainly support suspendable functions.

The logic, again, resembles RxJava's own map() implementation. We save and forward the upstream connection instance to the consumer as there is no real need to intercept the close call. We apply the mapper function to the upstream's value and forward the result to the consumer. If the mapper function crashes, we stop the upstream and emit the error. This may happen for the very last item, and the upstream may still emit a regular onComplete(), which should be avoided just like with Reactive-Streams.
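On the val source = this workaround: Kotlin has a qualified this expression, where this@name refers to the receiver of the enclosing function or class called name. The sketch below uses a deliberately tiny, hypothetical Source type (non-suspending, and not part of CoFlow) just to show the syntax inside an object expression.

```kotlin
// Hypothetical minimal type, only used to demonstrate the qualified-this syntax.
interface Source<out T> {
    fun first(): T
}

fun <T, R> Source<T>.map(mapper: (T) -> R): Source<R> {
    return object : Source<R> {
        // Inside the object expression, plain `this` is the new Source<R>;
        // `this@map` is the extension receiver of the enclosing map() function.
        override fun first(): R = mapper(this@map.first())
    }
}

fun main() {
    val one = object : Source<Int> {
        override fun first() = 1
    }
    println(one.map { it + 1 }.first()) // 2
}
```

The same spelling should work for the suspend version of map above, replacing source.subscribe(...) with this@map.subscribe(...), although keeping the explicit val is arguably just as readable.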

The next common operator is Filter(T):


suspend fun <T> CoFlow<T>.filter(predicate: suspend (T) -> Boolean): CoFlow<T> {
    val source = this
    
    return object: CoFlow<T> {
        override suspend fun subscribe(consumer: CoConsumer<T>) {
            source.subscribe(object: CoConsumer<T> {

                var upstream: CoConnection? = null
                var done: Boolean = false

                override suspend fun onSubscribe(conn: CoConnection) {
                    upstream = conn
                    consumer.onSubscribe(conn)
                }

                override suspend fun onNext(t: T) {
                    val v: Boolean;
                    try {
                        v = predicate(t)
                    } catch (ex: Throwable) {
                        done = true
                        upstream!!.close()
                        consumer.onError(ex)
                        return
                    }
                    if (v) {
                        consumer.onNext(t)
                    }
                }

                override suspend fun onError(t: Throwable) {
                    if (!done) {
                        consumer.onError(t)
                    }
                }

                override suspend fun onComplete() {
                    if (!done) {
                        consumer.onComplete()
                    }
                }
            })
        }
    }
}

I guess the pattern is now obvious. Let's see a couple of other operators.

Take

suspend fun <T> CoFlow<T>.take(n: Long): CoFlow<T> {

// ...

     var remaining = n

     override suspend fun onNext(t: T) {
         var r = remaining
         if (r != 0L) {
             remaining = --r
             consumer.onNext(t)
             if (r == 0L) {
                 upstream!!.close()
                 consumer.onComplete()
             }
         }
     }

// ...

     override suspend fun onComplete() {
         if (remaining != 0L) {
             consumer.onComplete()
         }
     }
}

Skip


suspend fun <T> CoFlow<T>.skip(n: Long): CoFlow<T> {

// ...

     var remaining = n

     override suspend fun onNext(t: T) {
         val r = remaining
         if (r == 0L) {
             consumer.onNext(t)
         } else {
             remaining = r - 1
         }
     }

     // ...
}

Collect


suspend fun <T, R> CoFlow<T>.collect(
         collectionSupplier: suspend () -> R,
         collector: suspend (R, T) -> Unit
): CoFlow<R> {
    val source = this
    
    return object: CoFlow<R> {

        override suspend fun subscribe(consumer: CoConsumer<R>) {

            val coll : R

            try {
                coll = collectionSupplier()
            } catch (ex: Throwable) {
                consumer.onSubscribe(BooleanConnection())
                consumer.onError(ex)
                return
            }                     

            source.subscribe(object: CoConsumer<T> {

                var upstream: CoConnection? = null
                var done: Boolean = false
                val collection: R = coll

                override suspend fun onSubscribe(conn: CoConnection) {
                    upstream = conn
                    consumer.onSubscribe(conn)
                }

                override suspend fun onNext(t: T) {
                    try {
                        collector(collection, t)
                    } catch (ex: Throwable) {
                        done = true
                        upstream!!.close()
                        consumer.onError(ex)
                        return
                    }
                }

                override suspend fun onError(t: Throwable) {
                    if (!done) {
                        consumer.onError(t)
                    }
                }

                override suspend fun onComplete() {
                    if (!done) {
                        consumer.onNext(collection)
                        consumer.onComplete()
                    }
                }
            })
         
        }
    }
}


Sum


suspend fun <T: Number> CoFlow<T>.sumInt(): CoFlow<Int> {


    // ...
    var sum: Int = 0
    var hasValue: Boolean = false

    override suspend fun onNext(t: T) {
        if (!hasValue) {
            hasValue = true
        }
        sum += t.toInt()
    }

    // ...

    override suspend fun onComplete() {
        if (hasValue) {
            consumer.onNext(sum)
        }
        consumer.onComplete()
    }
}

Max


suspend fun <T: Comparable<T>> CoFlow<T>.max(): CoFlow<T> {

    // ...
    var value: T? = null

    override suspend fun onNext(t: T) {
        val v = value
        if (v == null || v < t) {
            value = t
        }               
    }

    // ...

    override suspend fun onComplete() {
        val v = value
        if (v != null) {
            consumer.onNext(v)
        }
        consumer.onComplete()
    }
}

Flatten


suspend fun <T, R> CoFlow<T>.flatten(mapper: suspend (T) -> Iterable<R>): CoFlow<R> {

    // ...

    override suspend fun onNext(t: T) {

        try {
            for (v in mapper(t)) {
                consumer.onNext(v)
            }
        } catch (ex: Throwable) {
            done = true
            upstream!!.close()
            consumer.onError(ex)
            return
        }
    }

}

Concat


suspend fun <T> CoFlow<T>.concat(vararg sources: CoFlow<T>): CoFlow<T> {
    return object: CoFlow<T> {
        suspend override fun subscribe(consumer: CoConsumer<T>) {
            val closeToken = SequentialConnection()
            consumer.onSubscribe(closeToken)
            launch(Unconfined) {
                val ch = Channel<Unit>(1);

                for (source in sources) {

                    source.subscribe(object: CoConsumer<T> {
                        suspend override fun onSubscribe(conn: CoConnection) {
                            closeToken.replace(conn)
                        }

                        suspend override fun onNext(t: T) {
                            consumer.onNext(t)
                        }

                        suspend override fun onError(t: Throwable) {
                            consumer.onError(t)
                            ch.close()
                        }

                        suspend override fun onComplete() {
                            ch.send(Unit)
                        }

                    })

                    try {
                        ch.receive()
                    } catch (ex: Throwable) {
                        // ignored
                        return@launch
                    }
                }

                consumer.onComplete()
            }
        }
    }
}


Before concat, we did not have to interact with the cancellation mechanism of the coroutine world. Here, if one wants to avoid unbounded recursion when switching to the next source, some trampolining is necessary. The launch(Unconfined), as I understand it, should do just that. Note that the returned Job is not joined into the CoConnection rail, partly to avoid writing a CompositeCoConnection, partly because I don't know how such a contextual component should generally interact with our CoFlow setup. Suggestions welcome.

As for the use of Channel(1), I encountered two problems:

  • I don't know how to hold off the loop otherwise as suspendCoroutine { } doesn't allow its block to be suspendable and we have subscribe() as suspendable.
  • The plain Channel() is a so-called rendezvous primitive where send() and receive() have to meet. Unfortunately, a synchronously executed CoFlow will livelock because send() suspends - because there is no matching receive() call on the same thread - which would resume receive(). A one element channel solved this.


The (simpler) SequentialConnection is implemented as follows:


class SequentialConnection : AtomicReference<CoConnection?>(), CoConnection {

    object Disconnected : CoConnection {
        suspend override fun close() {
        }
    }

    suspend fun replace(conn: CoConnection?) : Boolean {
        while (true) {
            val a = get()
            if (a == Disconnected) {
                conn?.close()
                return false
            }
            if (compareAndSet(a, conn)) {
                return true
            }
        }
    }

    suspend override fun close() {
        getAndSet(Disconnected)?.close()
    }
}

It uses the same atomics logic as the SequentialDisposable in RxJava.
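To make the replace/close semantics concrete, here is a small self-contained sketch that repeats SequentialConnection and exercises it. NamedConnection, runToEnd and demo are hypothetical helpers for illustration only, and the runner assumes the stable kotlin.coroutines API of Kotlin 1.3 or later.

```kotlin
import java.util.concurrent.atomic.AtomicReference
import kotlin.coroutines.Continuation
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.startCoroutine

interface CoConnection { suspend fun close() }

class SequentialConnection : AtomicReference<CoConnection?>(), CoConnection {

    object Disconnected : CoConnection {
        override suspend fun close() {}
    }

    suspend fun replace(conn: CoConnection?): Boolean {
        while (true) {
            val a = get()
            if (a == Disconnected) {
                conn?.close()
                return false
            }
            if (compareAndSet(a, conn)) {
                return true
            }
        }
    }

    override suspend fun close() {
        getAndSet(Disconnected)?.close()
    }
}

// Hypothetical connection that logs when it gets closed.
class NamedConnection(
    private val name: String,
    private val log: MutableList<String>
) : CoConnection {
    override suspend fun close() { log += "closed $name" }
}

// Hypothetical runner for blocks that finish without a real suspension.
fun runToEnd(block: suspend () -> Unit) {
    var failure: Throwable? = null
    block.startCoroutine(object : Continuation<Unit> {
        override val context: CoroutineContext = EmptyCoroutineContext
        override fun resumeWith(result: Result<Unit>) {
            failure = result.exceptionOrNull()
        }
    })
    failure?.let { throw it }
}

fun demo(): List<String> {
    val log = mutableListOf<String>()
    runToEnd {
        val token = SequentialConnection()
        token.replace(NamedConnection("first", log))   // stored
        token.replace(NamedConnection("second", log))  // swapped in; "first" is not closed
        token.close()                                  // closes the current one: "second"
        // After close(), any late connection is closed immediately and rejected.
        val accepted = token.replace(NamedConnection("third", log))
        log += "accepted=$accepted"
    }
    return log
}

fun main() {
    println(demo()) // [closed second, closed third, accepted=false]
}
```

This is the property concat relies on: swapping in the next source's connection does not close the previous (already finished) one, while a close() from downstream reaches whichever source is current and also rejects any connection that arrives later.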

Leaving the reactive world

Eventually, we'd like to return to the plain coroutine world and resume our imperative code section after a CoFlow has run. One case is to actually ignore any emission and just wait for the CoFlow to terminate. Let's write an await() operator for that:


suspend fun <T> CoFlow<T>.await() {
    val source = this

    val ch = Channel<T>(1)

    source.subscribe(object : CoConsumer<T> {
        var upstream : CoConnection? = null

        suspend override fun onSubscribe(conn: CoConnection) {
            upstream = conn
        }

        suspend override fun onNext(t: T) {
        }

        suspend override fun onError(t: Throwable) {
            ch.close(t)
        }

        suspend override fun onComplete() {
            ch.close()
        }
    })

    try {
        ch.receive()
    } catch (ex: ClosedReceiveChannelException) {
        // expected closing
    }
}

The same Channel(1) trick is used here. Again, I don't know how to attach the CoConnection to the caller's context.

Sometimes, we are interested in the first or last item generated through the CoFlow. Let's see how to get to the first item via an awaitFirst():


suspend fun <T> CoFlow<T>.awaitFirst() : T {
    val source = this

    val ch = Channel<T>(1)

    source.subscribe(object : CoConsumer<T> {
        var upstream : CoConnection? = null
        var done : Boolean = false

        suspend override fun onSubscribe(conn: CoConnection) {
            upstream = conn
        }

        suspend override fun onNext(t: T) {
            done = true
            upstream!!.close()
            ch.send(t)
        }

        suspend override fun onError(t: Throwable) {
            if (!done) {
                ch.close(t)
            }
        }

        suspend override fun onComplete() {
            if (!done) {
                ch.close(NoSuchElementException())
            }
        }
    })

    return ch.receive()
}


The benchmark


Since benchmarking concurrent performance would be somewhat unfair at this point, the next best benchmark I can think of is our standard Shakespeare Plays Scrabble. It can show the infrastructure overhead of a solution without any explicitly stated concurrency need from the solution.

Rather than showing the somewhat long Kotlin source code adapted for CoFlow, you can find the benchmark code in my repository. The environment: i7 4770K, Windows 7 x64, Java 8u144, Kotlin 1.1.4-3, Coroutines 0.18, RxJava 2.1.3 for comparison:

    RxJava Flowable: 26 milliseconds / op
    Coroutines CoFlow: 52.4 milliseconds / op

Not bad for a first try with limited knowledge. I can only speculate about the source of the 2x slowdown in the CoFlow implementation: Channel. As far as I can tell, it is meant to support multiple senders and multiple receivers, so its internal queue performs far more atomic operations than necessary for our single-producer, single-consumer CoFlow/Reactive-Streams architecture.

Conclusion


As demonstrated, it is possible to rewrite (a set of) RxJava operators with coroutines and depending on the use case, even this (unoptimized) 2x overhead could be acceptable. Does this mean the rest of the 180 operators can be (reasonably) well translated?

I don't know yet; flatMap(), groupBy() and window() are the most notoriously difficult operators due to the increased concurrency and backpressure interaction:


  • flatMap has to manage a dynamic set of sources which each have to be backpressured. Should each of them use the same Channel.send() or go round robin in some way?
  • groupBy is prone to livelock if the groups as whole and individually are not consumed.
  • window has a peculiar operation mode (also true for groupBy): if one takes only one window, the upstream should not be cancelled until the items aimed at that window have been emitted by the upstream or the consumption of the window is cancelled.

Can RxJava be ported to Kotlin Coroutines: yes. Should the next RxJava rather be written in Kotlin Coroutines: I don't think so. The reasons I'm still not for "Coroutines everywhere" despite all the code shown in this post are:

  • I had to do this porting myself, which hardly constitutes an unbiased and independent verification.
  • The coroutine concept is great, but tied to Kotlin as a compiler and its standard library. What should happen with the non-Kotlin, non-Android reactive users? What about other JVM languages?
  • Building the state machine is hidden from the developer by the compiler. There is always the risk that the compiler doesn't do a reasonable optimization job and/or introduces certain bugs you can't easily work around from the user level. How often is the Kotlin language/standard library updated to fix issues? How is that SAM issue doing?

Solving problems developers face is great; hyping about "burying Reactive programming as obsolete" without supporting evidence is not.

1 comment:

  1. Accessing an outer receiver is done via this@map or @whatever the outer scope is called.
