Monday, September 25, 2017

Java 9 Flow API: timing out events

Introduction


One of the main properties of reactive programming is that events may arrive over time instead of being immediately available to a consumer. In traditional Future-based programming, one could wait for the result in a blocking manner via Future.get(long, TimeUnit). Other data sources, such as a network InputStream, either have their own built-in timeout facility or one has to use external means to close the stream after a certain period of time to unblock the reader. Java 8 Streams also have no direct timeout support.

In the reactive mindset, one can consider timing out events (items) as requesting an element and racing its arrival against the clock. If the item arrives in time, we should ignore the clock. If the clock fires first, we should stop the sender of the items and somehow notify the consumer of the situation. Perhaps the simplest way is to signal onError with a TimeoutException. Since there could be multiple items in a flow, we have to do this racing for each potential item over and over until the flow terminates.
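This per-item race can be sketched outside the Flow API with CompletableFuture.orTimeout (available since Java 9); the class, the method name and the "timeout" fallback value below are illustrative, not part of any API discussed later:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Race a single item's arrival against the clock: if the item arrives in
// time, we get it; if the clock fires first, we observe a TimeoutException.
class SingleItemRace {
    static String awaitOrTimeout(CompletableFuture<String> item, long millis) {
        try {
            return item.orTimeout(millis, TimeUnit.MILLISECONDS).join();
        } catch (CompletionException ex) {
            if (ex.getCause() instanceof TimeoutException) {
                return "timeout"; // the clock won the race
            }
            throw ex;
        }
    }
}
```

In the operator developed below, the same race has to be re-run for every item, which is why a single Future-style race doesn't directly scale to flows.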


The timeout operator


Since there is "time" in timeout, we'll need a source of time that can be started and stopped at will. The first tool that comes to mind is the java.util.Timer class; however, even its Javadoc suggests using a ScheduledExecutorService instead. If one has to deal with a lot of timed operations besides timing out flows, having control over such signals via a (set of) ScheduledExecutorServices is desirable. Therefore, let's define our timeout API with it:


public static <T> Flow.Publisher<T> timeout(
        Flow.Publisher<T> source,
        long timeout, TimeUnit unit,
        ScheduledExecutorService timer) {

    return new TimeoutPublisher<>(source, timeout, unit, timer);
}


(Note that if one creates the timer via the Executors factory methods, such as Executors.newSingleThreadScheduledExecutor(), it has to be shut down at some point, otherwise its worker thread, non-daemon by default, would prevent the JVM from quitting.)
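A minimal lifecycle sketch of such a timer (class and method names are made up for this illustration):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Create a dedicated timer, schedule one task on it, then shut it down so
// its worker thread - non-daemon by default - doesn't keep the JVM alive.
class TimerLifecycle {
    static boolean runOnce() {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        try {
            CountDownLatch fired = new CountDownLatch(1);
            timer.schedule(fired::countDown, 10, TimeUnit.MILLISECONDS);
            try {
                return fired.await(5, TimeUnit.SECONDS);
            } catch (InterruptedException ex) {
                return false;
            }
        } finally {
            timer.shutdown(); // without this, the non-daemon thread lingers
        }
    }
}
```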

One primary responsibility of this type of operator is to make sure the downstream's onXXX methods are called in sequence and without overlapping. However, a timeout may happen at the same time the main source signals an event, which could lead to onNext and onError being called concurrently - violating the Flow protocol.

The natural non-blocking solution would be the serialization primitive described some time ago on this blog, but we can take a leaner approach due to the special nature of this operator.

Thinking about the possible states of such an operator, there are two state transitions we should consider: receiving and emitting the next upstream signal, and receiving the timeout signal and emitting an error for it. In concurrency land, this means using a state variable and compare-and-set to transition from state to state in an atomic fashion.

There is a small problem though: how does a timeout task know it fired because the right item didn't arrive? Relying on the accuracy of Future.cancel() for this might not be the best option. Luckily, there is a way to have both proper mutual exclusion and serialization: by using a long field, index, to track the index of the upstream item as well as indicate a terminal state via Long.MAX_VALUE - an index unlikely to be reached.

The idea is as follows: a regular onNext signal from the upstream increments this long field unless it is Long.MAX_VALUE, at which point the event is ignored. An upstream onError or onComplete will atomically swap in Long.MAX_VALUE and, if the field wasn't already at Long.MAX_VALUE, emit the respective signal to the downstream. At the same time, a thread executing the Runnable.run() on the timeout side will try to swap the last event's index for Long.MAX_VALUE too. If the current index hasn't changed during the wait (no onXXX signal from the main source), the atomic swap will ensure that any subsequent onXXX call will ignore its event, thus only one thread at a time will emit a signal to the downstream. This may sound complicated when written in plain text but the code should look much clearer in a couple of paragraphs.
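The index-based exclusion can be sketched in isolation, with an AtomicLong standing in for the index field and its VarHandle; the class and method names are illustrative:

```java
import java.util.concurrent.atomic.AtomicLong;

// Whichever side first swaps Long.MAX_VALUE into the index gets to emit the
// terminal signal; every other attempt - stale timeouts, late items - is ignored.
class IndexGate {
    final AtomicLong index = new AtomicLong();

    // Upstream onNext: advance the index unless the gate is terminated.
    boolean tryNext() {
        long idx = index.get();
        return idx != Long.MAX_VALUE && index.compareAndSet(idx, idx + 1);
    }

    // Timeout task scheduled for item index idx: wins only if no item arrived.
    boolean tryTimeout(long idx) {
        return index.compareAndSet(idx, Long.MAX_VALUE);
    }
}
```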

When implementing the parent Flow.Publisher of TimeoutPublisher, one has to consider a peculiar case: when should the timeout for the very first item start? Remember, we have to call onSubscribe on the downstream to provide it with the ability to request and cancel the flow. If the timeout started after downstream.onSubscribe(), a synchronous and blocking source responding to the downstream's request at that point might not give back control at all, rendering the timeout operator non-operational. If the timeout started before the call to downstream.onSubscribe(), we'd risk emitting the TimeoutException before or while downstream.onSubscribe() is executing. Since we also have to intercept a cancel() call from the downstream to stop an ongoing timeout, we have to call downstream.onSubscribe() from within the Flow.Publisher.subscribe() method before we subscribe to the upstream or even start the timeout for the very first item:


    @Override
    public void subscribe(Flow.Subscriber<? super T> s) {
        TimeoutSubscriber<T> parent = new TimeoutSubscriber<>(
            s, timeout, unit, timer);

        s.onSubscribe(parent);

        parent.startTimeout(0L);

        source.subscribe(parent);
    }


We create the in-sequence Flow.Subscriber first (which implements Flow.Subscription), send it to the downstream, start the timeout for the first item (index is zero at this point) and we subscribe to the upstream source.

The next step is to write up the skeleton of the TimeoutSubscriber:


static final class TimeoutSubscriber<T> implements
Flow.Subscriber<T>, Flow.Subscription {
   
    final Flow.Subscriber<? super T> downstream;

    final long timeout;

    final TimeUnit unit;

    final ScheduledExecutorService timer;

    Flow.Subscription upstream;
    static final VarHandle UPSTREAM =
        VH.find(MethodHandles.lookup(), TimeoutSubscriber.class,
            "upstream", Flow.Subscription.class);

    long requested;
    static final VarHandle REQUESTED =
        VH.find(MethodHandles.lookup(), TimeoutSubscriber.class,
            "requested", long.class);

    long index;
    static final VarHandle INDEX =
        VH.find(MethodHandles.lookup(), TimeoutSubscriber.class,
            "index", long.class);

    Future<?> task;
    static final VarHandle TASK =
        VH.find(MethodHandles.lookup(), TimeoutSubscriber.class,
            "task", Future.class);

    static final Future<Object> CANCELLED_TASK = 
        new FutureTask<>(() -> { }, null);
    
    TimeoutSubscriber(
            Flow.Subscriber<? super T> downstream,
            long timeout,
            TimeUnit unit,
            ScheduledExecutorService timer) {
        this.downstream = downstream;
        this.timeout = timeout;
        this.unit = unit;
        this.timer = timer;
    }

    @Override
    public void onSubscribe(Flow.Subscription s) {
        // TODO implement
    }

    @Override
    public void onNext(T item) {
        // TODO implement
    }

    @Override
    public void onError(Throwable throwable) {
        // TODO implement
    }

    @Override
    public void onComplete() {
        // TODO implement
    }
   
    @Override
    public void request(long n) {
        // TODO implement
    }
    
    @Override
    public void cancel() {
        // TODO implement
    }

    void run(long idx) {
        // TODO implement
    }

    void startTimeout(long idx) {
        // TODO implement
    }
}

The fields can be described as follows:


  • downstream is the Flow.Subscriber we'd like to signal events to.
  • timeout and unit have to be stored as a new timeout task will have to be started after each in-time event.
  • timer is the ScheduledExecutorService we will submit tasks delayed by the timeout and unit.
  • upstream will hold the source's Flow.Subscription. Its VarHandle UPSTREAM will allow us to defer any cancellation coming from the downstream until there is an instance of it from the upstream. In addition, we have to defer requesting from downstream due to the same reason as with cancellation: since we call the downstream.onSubscribe before subscribing to the upstream, the downstream may issue cancel and/or request to a not-yet available Flow.Subscription.
  • requested will hold onto any downstream requests until there is an upstream Flow.Subscription available to the operator. This is part of the so-called deferred requesting pattern.
  • index holds the state that indicates if the operator has reached its terminal state naturally or due to timeout.
  • task holds the current Future for the timeout task submitted to the timer ScheduledExecutorService or the CANCELLED_TASK indicator if the operator has been cancelled concurrently to its operation and no further scheduling of any tasks should happen. Since only one such task has to run at a time, the actual parallelism level of the ScheduledExecutorService doesn't matter. The value has to be changed atomically, hence the TASK VarHandle after it.

The implementation of each method, although relatively short, is a bit more involved as they perform direct, inlined atomic state changes. The onSubscribe method has to provide the deferred cancellation and deferred requesting behavior:


    @Override
    public void onSubscribe(Flow.Subscription s) {
        if (UPSTREAM.compareAndSet(null, s)) {
            long r = (long)REQUESTED.getAndSet(this, 0L);
            if (r != 0L) {
                s.request(r);
            }
        } else {
            s.cancel();
        }
    }


We atomically try to set the upstream's Flow.Subscription and, if successful, atomically take the accumulated request amount from the downstream. If there was any, we request it from the upstream. Otherwise, a failed CAS means the operator has been cancelled and we cancel the incoming Flow.Subscription as well.

Before we look into the other onXXX methods, let's see the partners of onSubscribe to show the other sides of the deferred requesting and cancellation logic:

    @Override
    public void request(long n) {
        Flow.Subscription current = (Flow.Subscription)UPSTREAM.getAcquire(this);
        if (current != null) {
            current.request(n);
        } else {
            for (;;) {
                long r = (long)REQUESTED.getAcquire(this);
                long u = r + n;
                if (u < 0L) {
                    u = Long.MAX_VALUE;
                }
                if (REQUESTED.compareAndSet(this, r, u)) {
                    break;
                }
            }

            current = (Flow.Subscription)UPSTREAM.getAcquire(this);
            if (current != null && current != this) {
                long r = (long)REQUESTED.getAndSet(this, 0L);
                if (r != 0L) {
                    current.request(r);
                }
            }
        }
    }

When the downstream calls request(), we could be in two states: the upstream Flow.Subscription is available, in which case we simply forward the request amount to it; or the upstream Flow.Subscription is not available and we have to temporarily accumulate downstream requests (which could happen on any thread and any time). This happens via our standard bounded atomic add operation. Once this succeeds, the upstream may have just called onSubscribe with a valid Flow.Subscription (the value of this indicates cancel() has been invoked, see down below). In that case, we atomically take all the accumulated requests, swapping in a zero in exchange, and if that value was non-zero, we issue the request to this fresh upstream. Concurrent interleaving will find requested zero or non-zero and issue any excess request amount accordingly.
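The deferred requesting idea, stripped of the capped addition and the cancelled-subscription marker, can be sketched as follows (the names are illustrative and a plain String stands in for the Flow.Subscription):

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

// Requests accumulate until a subscription arrives; the accumulated amount
// is then handed over exactly once via getAndSet(0).
class DeferredRequest {
    final AtomicReference<String> upstream = new AtomicReference<>();
    final AtomicLong requested = new AtomicLong();
    long forwarded; // counts what was actually requested from the upstream

    void request(long n) {
        if (upstream.get() != null) {
            forwarded += n; // upstream available: forward directly
            return;
        }
        requested.addAndGet(n); // accumulate while there is no upstream
        if (upstream.get() != null) {
            // onSubscribe raced with us: take over what we just accumulated
            forwarded += requested.getAndSet(0L);
        }
    }

    void onSubscribe(String s) {
        upstream.set(s);
        forwarded += requested.getAndSet(0L); // drain the accumulated amount
    }
}
```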


    @Override
    public void cancel() {
        if ((long)INDEX.getAndSet(this, Long.MAX_VALUE) != Long.MAX_VALUE) {

            Flow.Subscription current = (Flow.Subscription)UPSTREAM.getAndSet(this, this);
            if (current != null && current != this) {
                current.cancel();
            }

            Future<?> future = (Future<?>)TASK.getAndSet(this, CANCELLED_TASK);
            if (future != null) {
                future.cancel(false);
            }
        }
    }


First, we atomically swap in the terminal index Long.MAX_VALUE, locking out both the onXXX methods and the timeout task. Then, we atomically swap in the cancelled Flow.Subscription indicator (this) and cancel any available upstream. The same thing has to happen to the timeout task.

Now let's see the onNext implementation:

    @Override
    public void onNext(T item) {

        long idx = (long)INDEX.getAcquire(this);
        if (idx == Long.MAX_VALUE) {
            return;
        }
        if (!INDEX.compareAndSet(this, idx, idx + 1)) {
            return;
        }

        Future<?> future = (Future<?>)TASK.getAcquire(this);
        if (future == CANCELLED_TASK) {
            return;
        }
        if (future != null) {
            future.cancel(false);
        }

        downstream.onNext(item);

        startTimeout(idx + 1);
    }


First we get the current index and if already at Long.MAX_VALUE, we quit. Next, we try to atomically update the index to the next value and if that fails (due to cancellation or a racy timeout), we quit as well. Once the index has been updated successfully, we cancel the ongoing timeout task, emit the current item and start a new task with the subsequent index value.

The onError and onComplete methods are relatively similar by cancelling the timeout task and signalling the appropriate terminal event:


    @Override
    public void onError(Throwable throwable) {
        if ((long)INDEX.getAndSet(this, Long.MAX_VALUE) != Long.MAX_VALUE) {

            Future<?> future = (Future<?>)TASK.getAndSet(this, CANCELLED_TASK);
            if (future != null) {
                future.cancel(false);
            }

            downstream.onError(throwable);
        }
    }

    @Override
    public void onComplete() {
        if ((long)INDEX.getAndSet(this, Long.MAX_VALUE) != Long.MAX_VALUE) {

            Future<?> future = (Future<?>)TASK.getAndSet(this, CANCELLED_TASK);
            if (future != null) {
                future.cancel(false);
            }

            downstream.onComplete();
        }
    }


They are pretty similar to how cancel() is implemented: we atomically swap in the terminal index Long.MAX_VALUE and cancel the timeout task. Note that cancelling the upstream is not necessary at this point as it is considered cancelled in accordance with the Flow (Reactive-Streams) specification.

    void run(long idx) {
        if (INDEX.compareAndSet(this, idx, Long.MAX_VALUE)) {

            Flow.Subscription current = (Flow.Subscription)UPSTREAM.getAndSet(this, this);
            if (current != null && current != this) {
                current.cancel();
            }

            downstream.onError(new TimeoutException());
        }
    }


Given the last known item index, we atomically try to swap in the terminal index indicator and, if successful, we cancel the upstream (directly or in a deferred manner), followed by signalling the TimeoutException. Since the executing task will end either way, there is no need or reason to cancel the Future tracking the timeout task itself. If the index has changed (either due to the arrival of an onNext item or cancellation), the timeout task will do nothing.

Finally, let's see how the timeout tasks are scheduled:


    void startTimeout(long idx) {
        Future<?> old = (Future<?>)TASK.getAcquire(this);
        if (old != CANCELLED_TASK) { 
            Future<?> future = timer.schedule(() -> run(idx), timeout, unit);
            if (!TASK.compareAndSet(this, old, future)) {
                future.cancel(false);
            }
        }
    }


First, we get the previous, old task instance so we can conditionally swap in the new task if there was no cancellation in between the two. Then, we schedule the execution of the run() method with the current item index (provided by the subscribe() or the onNext() method). If the actual index doesn't change until the run() executes, it will trigger the timeout logic. Then, we atomically try to replace the old Future with the new one and if that fails (due to cancellation), we cancel the new Future task too.
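The schedule-versus-cancel race around the task field can be sketched in isolation (an AtomicReference stands in for the TASK VarHandle; the names are illustrative):

```java
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicReference;

// If cancel() swaps in the CANCELLED marker while a fresh task is being
// installed, the losing side cancels the fresh task itself.
class TaskSlot {
    static final Future<Object> CANCELLED = new FutureTask<>(() -> { }, null);
    final AtomicReference<Future<?>> task = new AtomicReference<>();

    // Returns true if the fresh task was installed, false if it was cancelled.
    boolean install(Future<?> fresh) {
        Future<?> old = task.get();
        if (old == CANCELLED || !task.compareAndSet(old, fresh)) {
            fresh.cancel(false); // lost the race against cancel()
            return false;
        }
        return true;
    }

    void cancel() {
        Future<?> old = task.getAndSet(CANCELLED);
        if (old != null) {
            old.cancel(false);
        }
    }
}
```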


Conclusion


As demonstrated in this post, writing a basic timeout operator - one which just signals a TimeoutException - can be done in relatively few lines. The complication comes from understanding why the atomic index changes are actually correct in various race scenarios and how they also ensure proper event serialization towards the downstream.

The same pattern can be used for writing a set of other operators which use a one-shot signal to "interrupt" a main sequence, for example takeUntil(Flow.Publisher), where the Future tracking is replaced by tracking the Flow.Subscription of the other Flow.Publisher.

There is, however, a natural need for doing something other than signalling a TimeoutException: switching to another Flow.Publisher, for example. One would think: let's subscribe the TimeoutSubscriber to that fallback Flow.Publisher! Unfortunately, this doesn't work, partly because we probably wouldn't want to time out the fallback Flow.Publisher's consumption (and even if we did), and partly because we have to make sure the switch-over preserves the number of outstanding downstream requests.

The deferred requesting logic in the operator shown in this post is not suitable for such transitions; it requires a peculiar Flow.Subscription management logic I call Subscription Arbitration. It enables a whole suite of other operators to work across sources, such as the typical RxJava operators concat(Map), repeat(When), retry(When), timeout with fallback and onErrorResumeNext. In the next blog post, we'll see how the arbitration logic and most of these operators can be implemented.

Thursday, September 21, 2017

Java 9 Flow API: switching threads

Introduction


Ensuring certain computations happen on the right thread, usually off the main thread, is a very common development task when dealing with reactive flows. When building up tools for Java 9's Flow API, one can decide to add this thread-switching support to each operator directly - see the range() operator from the start of the series - or have a standalone stage for this purpose.

This is a tradeoff. Inlining the thread switching avoids bogging down the source thread with the thread-stealing behavior of most of the queue-drain approaches presented so far. A separate operator allows better composition and may even allow working with exotic asynchrony-providing components.


The observeOn operator


In Java, threading support is provided via the Executor, ExecutorService and ScheduledExecutorService-based APIs. Executor is the most basic of them, providing only a single execute(Runnable) method. This allows creating an Executor from a lambda or method reference:


Executor trampoline = Runnable::run;

Executor swing = SwingUtilities::invokeLater;

Executor pool = ForkJoinPool.commonPool();


As the least common denominator, we'll use Executor in defining our observeOn operator:


public static <T> Flow.Publisher<T> observeOn(
        Flow.Publisher<T> source, 
        Executor exec,
        int prefetch) {
    return new ObserveOnPublisher<>(source, exec, prefetch);
}


Crossing an asynchronous boundary requires the temporary storage of an event until the other side can pick it up. The queue-drain approach can provide a nice bounded queue we can size with prefetch. In addition, the so-called stable-prefetch request management (shown in the mapFilter operator before) allows minimizing the overhead of requesting more items.

First, let's see the skeleton of the operator's main Flow.Subscriber implementation:

static final class ObserveOnSubscriber<T> implements
Flow.Subscriber<T>, Flow.Subscription, Runnable {

    final Flow.Subscriber<? super T> downstream;

    final Executor exec;

    final int prefetch;

    final Queue<T> queue;

    Flow.Subscription upstream;

    int wip;
    static final VarHandle WIP = 
        VH.find(MethodHandles.lookup(), ObserveOnSubscriber.class, 
                "wip", int.class);

    long requested;
    static final VarHandle REQUESTED =
        VH.find(MethodHandles.lookup(), ObserveOnSubscriber.class, 
                "requested", long.class);

    boolean done;
    static final VarHandle DONE =
        VH.find(MethodHandles.lookup(), ObserveOnSubscriber.class, 
                "done", boolean.class);

    boolean cancelled;
    static final VarHandle CANCELLED =
        VH.find(MethodHandles.lookup(), ObserveOnSubscriber.class, 
                "cancelled", boolean.class);
    
    Throwable error;

    long emitted;

    int consumed;

    ObserveOnSubscriber(
            Flow.Subscriber<? super T> downstream,
            Executor exec,
            int prefetch
    ) {
        this.downstream = downstream;
        this.exec = exec;
        this.prefetch = prefetch;
        this.queue = new SpscArrayQueue<>(prefetch);
    }

    @Override
    public void onSubscribe(Flow.Subscription s) {
        // TODO implement
    }

    @Override
    public void onNext(T item) {
        // TODO implement
    }

    @Override
    public void onError(Throwable throwable) {
        // TODO implement
    }

    @Override
    public void onComplete() {
        // TODO implement
    }

    @Override
    public void request(long n) {
        // TODO implement
    }

    @Override
    public void cancel() {
        // TODO implement
    }

    void schedule() {
        // TODO implement
    }

    @Override
    public void run() {
        // TODO implement
    }
}


The SpscArrayQueue is available from the JCTools library. VH is a shorthand utility class for getting a VarHandle for a particular field:

public final class VH {
    public static VarHandle find(
            MethodHandles.Lookup lookup, 
            Class clazz,
            String field,
            Class type) {
        try {
            return lookup.findVarHandle(clazz, field, type);
        } catch (Throwable ex) {
            throw new InternalError(ex);
        }
    }
}


If you are using IntelliJ IDEA, the latest version has nice parameter matching support for findVarHandle and highlights the call if the field name or type doesn't match the target class' member definition. Unfortunately, the method throws checked exceptions, which cannot be wrapped in a try-catch at the field initializer itself. By factoring the logic out into VH.find, we lose that IDE support but gain some convenience. Needless to say, if the field names or types are off, we'll get a nice InternalError during unit tests anyway. Note that the MethodHandles.Lookup instance has to be provided by the caller, because otherwise the find method could not access the fields of the target class due to the visibility restrictions enforced by the JVM.

Now let's describe the fields quickly:


  • downstream comes from the parent Flow.Publisher's subscribe method as usual.
  • exec is the Executor the operator will use.
  • prefetch defines the number of items to request from the upstream when the connection is established. A specific proportion (75%) will be used for replenishing items.
  • queue will hold up to the given prefetch amount of items. Since there is only one thread calling onNext and one thread draining the queue, we'll use a bounded single-producer single-consumer queue.
  • upstream is received through onSubscribe from the upstream source that allows requesting and cancelling the flow.
  • wip will ensure there is only one drain running at a time, executing on the given Executor. This is the same trampolining logic as with other concurrent operators: a transition from 0 to 1 will start the drain process and any further increments to wip will indicate additional work has to be done by the drain process.
  • requested will track the items requested by the downstream as the requesting is decoupled by the operator. The reason for this is that the operator uses a bounded buffer and there is no way to expect the downstream to request only as much as the buffer can hold at a given moment.
  • done indicates the upstream has finished emitting items. Calling onComplete on the downstream immediately is not an option as there could be items still queued up.
  • cancelled indicates the downstream issued a cancel() call and both the drain loop and the upstream has to stop producing events.
  • error holds the Throwable from the upstream. We'll only read it if done == true ensuring proper cross-thread visibility.
  • emitted counts how many items have been emitted towards the downstream. Decrementing the requested field is possible but generally expensive due to unavoidable atomics (and may not auto-batch decrements at all, thus each item incurs the cost).
  • consumed counts how many items have been taken from the queue. It can be reset to zero when a certain limit is reached (75% of prefetch for example) and issue a request() towards the upstream, asking for a fixed number of items periodically.
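The 75% replenishing threshold mentioned in the last two bullets comes from a shift-based computation; a quick check of the arithmetic (the prefetch values below are illustrative):

```java
// The replenish threshold is prefetch minus a quarter of it, i.e. 75%.
class PrefetchLimit {
    static int limit(int prefetch) {
        return prefetch - (prefetch >> 2);
    }
}
```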

Implementing the Flow.Subscriber methods is straightforward as they have little to do: store values and try starting the drain process:


    @Override
    public void onSubscribe(Flow.Subscription s) {
        upstream = s;
        downstream.onSubscribe(this);
        s.request(prefetch);
    }

    @Override
    public void onNext(T item) {
        queue.offer(item);
        schedule();
    }

    @Override
    public void onError(Throwable throwable) {
        error = throwable;
        DONE.setRelease(this, true);
        schedule();
    }

    @Override
    public void onComplete() {
        DONE.setRelease(this, true);
        schedule();
    }


The implementation of the Flow.Subscription part is similarly not that complicated. The handling of non-positive request amounts is left out for brevity. We have to trigger the draining logic when the downstream requests, in case the upstream has items queued up already due to a possible speed difference.

    @Override
    public void request(long n) {
        for (;;) {
            long a = (long)REQUESTED.getAcquire(this);
            long b = a + n;
            if (b < 0L) {
                b = Long.MAX_VALUE;
            }
            if (REQUESTED.compareAndSet(this, a, b)) {
                schedule();
                break;
            }
        }
    }

    @Override
    public void cancel() {
        if (CANCELLED.compareAndSet(this, false, true)) {
            upstream.cancel();
            if ((int)WIP.getAndAdd(this, 1) == 0) {
                queue.clear();
            }
        }
    }


The request() uses the typical, capped atomic addition of the requested amount. The cancel() method sets the cancelled flag once, cancels the upstream and if there is no draining happening at the moment, clears the queue to help the GC.
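That capped addition can be isolated into a small helper for a sanity check (the class name is made up for this sketch):

```java
// Caps the sum of two non-negative request amounts at Long.MAX_VALUE,
// treating overflow (a negative sum) as "effectively unbounded".
class RequestCap {
    static long addCap(long a, long n) {
        long b = a + n;
        return b < 0L ? Long.MAX_VALUE : b;
    }
}
```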

The schedule() method conceptually works like any drain() method we've implemented so far. The difference is that the drain loop inside it has to be run by the Executor (potentially) on another thread:


    void schedule() {
        if ((int)WIP.getAndAdd(this, 1) == 0) {
            exec.execute(this);
        }
    }



By implementing Runnable directly, we save creating the drain task every time; here, all the state that needs to be communicated between the caller of schedule() and the run() method goes through fields. Since the transition from 0 to 1 and, later on, from N back to 0 happens on a single thread, the underlying Executor can be a pool of any number of threads; this type of trampolining will make the Executor pick one thread from the pool and, while the drain lasts, prevent any other thread from the same pool from starting another drain run (which would violate the Flow protocol and/or reorder events).
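That 0 -> 1 / N -> 0 transition can be sketched single-threadedly (an AtomicInteger stands in for the WIP VarHandle; in the real operator, run() would be handed to the Executor instead of being called directly):

```java
import java.util.concurrent.atomic.AtomicInteger;

// Only the 0 -> 1 transition starts a drain pass; further increments merely
// record pending work, which the decrement at the end of run() detects.
class Trampoline {
    final AtomicInteger wip = new AtomicInteger();
    int drainPasses; // counts how many drain passes actually ran

    void schedule() {
        if (wip.getAndIncrement() == 0) {
            run(); // observeOn would do exec.execute(this) here
        }
    }

    void run() {
        int missed = 1;
        for (;;) {
            drainPasses++; // one pass over the queued work
            missed = wip.addAndGet(-missed);
            if (missed == 0) {
                break; // no schedule() happened while draining
            }
        }
    }
}
```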

Finally, let's see the implementation of run() that holds the rest of a typical drain-loop:

    @Override
    public void run() {
        int missed = 1;
        
        Flow.Subscriber<? super T> a = downstream;
        Queue<T> q = queue;
        long e = emitted;
        int c = consumed;
        int limit = prefetch - (prefetch >> 2);

        for (;;) {

             long r = (long)REQUESTED.getAcquire(this);

             while (e != r) {
                 // TODO implement
             }

             if (e == r) {
                 // TODO implement
             }

             emitted = e;
             consumed = c;
             missed = (int)WIP.getAndAdd(this, -missed) - missed;
             if (missed == 0) {
                 break;
             }
        }
    }


The pattern for the drain loop is pretty standard: the missed counter detects if there is more work to be done, and we load fields into local variables to avoid fetching them from the last-level cache due to all the atomics around. Lastly, we have the usual while loop that keeps running until we run out of requests or upstream events, followed by a check whether, upon reaching a terminal state in the operator, the terminal events can be emitted without any outstanding requests.

There is nothing unusual at this point in the inner while loop and the if statements:

    while (e != r) {
        if ((boolean)CANCELLED.getAcquire(this)) {
            q.clear();
            return;
        }
       
        boolean d = (boolean)DONE.getAcquire(this);
        T v = q.poll();
        boolean empty = v == null;

        if (d && empty) {
            Throwable ex = error;
            if (ex == null) {
                a.onComplete();
            } else {
                a.onError(ex);
            }
            return;
        }    

        if (empty) {
            break;
        }

        a.onNext(v);

        e++;
        if (++c == limit) {
            c = 0;
            upstream.request(limit);
        }
    }

    if (e == r) {
        if ((boolean)CANCELLED.getAcquire(this)) {
            q.clear();
            return;
        }
       
        boolean d = (boolean)DONE.getAcquire(this);
        boolean empty = q.isEmpty();

        if (d && empty) {
            Throwable ex = error;
            if (ex == null) {
                a.onComplete();
            } else {
                a.onError(ex);
            }
            return;
        } 
    }


Refitting operators with async drain


As hinted during the conclusion of the orderedMerge() operator, sometimes we'd want to drain the events of a multi-source operator on another thread. If one thinks about the structure shown in the previous section, the solution to the problem should be clear: propagate an Executor instance into the operator, implement Runnable, keep the WIP increment in drain() and move the rest of the method into the run() method:


    final Executor exec;

    // ...

    void drain() {
        if (getAndIncrement() != 0) {
            return;
        }

        exec.execute(this);
    }

    @Override
    public void run() {
 
        int missed = 1;
        Flow.Subscriber downstream = this.downstream;
 
        Comparator comparator = this.comparator;
 
        OrderedMergeInnerSubscriber[] subscribers = this.subscribers;
        int n = subscribers.length;
 
        Object[] values = this.values;
 
        long e = emitted;
 
        for (;;) {
             long r = (long)REQUESTED.getAcquire(this);
 
             for (;;) {
                 /* unchanged, left out for brevity */
             }
 
             emitted = e;
             missed = addAndGet(-missed);
             if (missed == 0) {
                 break;
             }
        }
    }


Cancelling the drain task


So far, we used the void execute(Runnable) method and relied upon the cancelled flag to stop the drain process itself. However, there are a couple of problems:


  • What if the onNext call on downstream blocks or takes a long time?
  • What if the Executor is so busy the drain doesn't execute within a timeout?

One way of dealing with this situation is to use the more advanced ExecutorService API and work with instances provided by the java.util.concurrent.Executors utility class (or the ForkJoinPool.commonPool()). Their submit() method returns a Future instance we can call cancel() on. The benefit is that all the Thread interruption infrastructure is provided and we don't have to write that ourselves:


    final ExecutorService exec;

    // ...

    void schedule() {
        if ((int)WIP.getAndAdd(this, 1) == 0) {
            Future<?> future = exec.submit(this);
            // do something with "future"
        }
    }


At this point we have to make future available to the cancellation routine in some thread-safe fashion because cancel() can be called at any time from any thread, even while the schedule() method is executing. The solution is to use the deferred cancellation pattern. First we have to define a field to store the current Future of the running drain task and an associated VarHandle to perform atomic operations with the reference:

    Future<?> future;
    static final VarHandle FUTURE =
        VH.find(MethodHandles.lookup(), ObserveOnSubscriber.class,
            "future", Future.class);
    static final Future<?> CANCELLED_TASK = new FutureTask<>(() -> { }, null);


Next, we have to update the schedule() method to store the Future obtained from the ExecutorService.submit call not just atomically, but also avoiding a peculiar race.

    void schedule() {
        if ((int)WIP.getAndAdd(this, 1) == 0) {

            Future<?> old = (Future<?>)FUTURE.getAcquire(this);
            if (old != CANCELLED_TASK) {
                Future<?> future = exec.submit(this);
                if (!FUTURE.compareAndSet(this, old, future)) {
                    old = (Future<?>)FUTURE.getAcquire(this);
                    if (old == CANCELLED_TASK) {
                        future.cancel(true);
                    }
                }
            }
        }
    }



Once submit() returns, we might no longer be under the protection of wip != 0, and thus a concurrent call to schedule() (via request(), for example) may have triggered another drain run with another Future instance. If we updated the future reference unconditionally, that newer Future instance would be knocked out and task cancellation would no longer work properly.

In addition, a concurrent cancellation may have swapped in the CANCELLED_TASK indicator in which case we should cancel the Future right away. There is no need to loop this construct because if the CAS fails, it is either due to a cancellation or a newer Future task. The former requires cancelling the returned Future and the latter can be ignored because the returned Future is practically finished anyway.

    @Override
    public void cancel() {
        if (CANCELLED.compareAndSet(this, false, true)) {
            upstream.cancel();

            Future<?> future = (Future<?>)FUTURE.getAndSet(this, CANCELLED_TASK);
            if (future != null && future != CANCELLED_TASK) {
                future.cancel(true);
            }

            if ((int)WIP.getAndAdd(this, 1) == 0) {
                queue.clear();
            }
        }
    }

The cancel() method requires fewer changes: we atomically swap in the CANCELLED_TASK indicator and, if the operator hasn't been cancelled already, cancel the non-null Future instance and carry on.

One can, of course, go to extra lengths and make sure the future field is nulled out between drain runs to avoid leaking references with certain ExecutorService implementations.
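A minimal sketch of that idea, using a plain AtomicReference instead of the VarHandle for brevity (the DrainTaskHolder class is hypothetical): once a drain run finishes, the still-current Future is swapped back to null, while a concurrently swapped-in CANCELLED_TASK is left alone.

```java
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch: clear the task reference once a drain run finishes,
// so a completed Future isn't retained between runs.
class DrainTaskHolder implements Runnable {
    static final Future<?> CANCELLED_TASK = new FutureTask<>(() -> { }, null);

    final AtomicReference<Future<?>> future = new AtomicReference<>();

    @Override
    public void run() {
        // ... drain the queue here ...

        // After the run, swap the current Future back to null;
        // if a cancellation raced in, leave CANCELLED_TASK in place.
        Future<?> current = future.get();
        if (current != CANCELLED_TASK) {
            future.compareAndSet(current, null);
        }
    }
}
```

The compareAndSet ensures a Future swapped in by a newer, concurrent schedule() call isn't accidentally knocked out either.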


Conclusion


In this post I've shown how one can implement a thread-switching operator, observeOn in RxJava terms, with the Java 9 Flow API and the standard asynchronous execution options Java provides, such as Executors. The presented algorithm, which automatically trampolines the draining of the queued events, allows using all sorts of Executors, including a completely synchronous Runnable::run Executor that executes the task on the caller thread. Since the operator's logic is largely independent of how the asynchrony is established, it can be tested quite easily in a synchronous unit test, by itself or as part of a complicated chain of potentially asynchronous flows.
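The synchronous Runnable::run Executor mentioned above is nothing more than a method reference satisfying the Executor interface; submitting a task through it runs the task immediately on the caller thread:

```java
import java.util.concurrent.Executor;

class SyncExecutorDemo {
    public static void main(String[] args) {
        // A method reference to Runnable.run is a valid Executor:
        // execute(task) simply invokes task.run() on the caller thread.
        Executor sync = Runnable::run;

        Thread caller = Thread.currentThread();
        sync.execute(() -> {
            // Runs synchronously, before execute() returns.
            if (Thread.currentThread() != caller) {
                throw new AssertionError("expected the caller thread");
            }
        });
        System.out.println("done");
    }
}
```

This is what makes the operator testable without any real concurrency involved.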

In practice, one may have to consider handling the RejectedExecutionException thrown by the execute/submit methods of certain ExecutorServices. The trouble is that even though downstream.onError can be called - as no drain() is or can be running at that time - the downstream may really expect errors to be delivered on the same background thread as the onNext items. In addition, one has to suppress further onXXX signals from the upstream, similar to how a failed user-provided function in an onNext has to suppress any subsequent onError or onComplete call by the upstream, because the upstream is not required to stop emitting events right away. This part is left to the library developer to decide on his/her own.
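One possible way to deal with the rejection is to catch it in the scheduling method and route it to an error callback directly, since no drain task can be running at that point. A hedged sketch (the class and the Consumer-based error hook are made up for illustration, not part of the post's operator):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical sketch: if the executor rejects the drain task,
// signal the failure to the downstream's error path.
class RejectingScheduleSketch {
    final AtomicInteger wip = new AtomicInteger();

    void schedule(ExecutorService exec, Runnable drainTask,
                  Consumer<Throwable> onError) {
        if (wip.getAndIncrement() == 0) {
            try {
                exec.submit(drainTask);
            } catch (RejectedExecutionException ex) {
                // No drain is running, so it is safe to deliver the error
                // directly - though possibly not on the expected thread.
                onError.accept(ex);
            }
        }
    }
}
```

A shut-down ExecutorService rejects new tasks by default, which is the typical way this exception surfaces.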

In the next post, we'll see how one can stop a flow if items didn't arrive in a timely manner.

Monday, September 11, 2017

Interoperation between RxJava and Kotlin Coroutines

Introduction


Writing imperative-looking code with Kotlin Coroutines is certainly an attractive property of it, but I'd think things can get quite convoluted pretty fast once, for example, Selectors are involved.

I haven't gotten around to looking at what Selectors are; I've only read that they can help you implement a flatMap-like stream combiner. We are not going to do that now; RxJava can do it for us after all.

However, the reasonable question arises: if I have a coroutine generator, a coroutine transformation or simply want to receive items from a Flowable, how can I make RxJava work with these coroutines?

Easily, with the combined magic of Kotlin Coroutines and RxJava!


Suspendable Emitter


A generator is a source-like construct that emits items followed by a terminal signal. It should be familiar from RxJava as the Flowable.generate() operator. It gives you a FlowableEmitter and the usual onNext, onError and onComplete calls on it.

One limitation is that you can call onNext only once per invocation of your (Bi)Consumer lambda that receives the emitter. The reason is that we can't block a second call to onNext and we don't want to buffer it either; therefore, RxJava cooperates with the developer.

Compiler-supported suspension and the state machine built by it, however, allow us to prevent a second call from getting through by suspending it until there is demand from the downstream, which then resumes the coroutine where it left off. Therefore, we can lift the single-onNext restriction for our coroutine-based generator.

So let's define the SuspendEmitter interface:


interface SuspendEmitter<in T> : CoroutineScope {

    suspend fun onNext(t: T)

    suspend fun onError(t: Throwable)

    suspend fun onComplete()
}


By extending CoroutineScope, we provide useful infrastructure (i.e., coroutineContext, isActive) to the block that will target our SuspendEmitter. One can ask why have onError and onComplete at all when a coroutine can simply throw or end. The reason is that this way a coroutine can terminate the sequence from within a transformation, as we'll see later, just like our recent mapFilter operator allows it.


The flow-producer

Given our context providing interface for a generator coroutine, let's define the generator method the user will call:


fun <T> produceFlow(generator: suspend SuspendEmitter<T>.() -> Unit) : Flowable<T> {
    return Produce(generator)
}


(For those unfamiliar with the Kotlin syntax, SuspendEmitter<T>.() -> Unit is practically a one-parameter lambda of signature (param: SuspendEmitter<T>) -> Unit where, inside the lambda, methods of param don't need to be qualified by it; thus you can write onNext(1) instead of param.onNext(1).)

We have to implement a Flowable that interacts with a suspendable generator function in some fashion. When implementing source-like operators, one usually has to write a Subscription instance and call Subscriber.onSubscribe() with it.

class Produce<T>(private val generator: suspend SuspendEmitter<T>.() -> Unit) : 
        Flowable<T>() {
    override fun subscribeActual(s: Subscriber<in T>) {
        launch(Unconfined) {
            val parent = ProduceSubscription(s, coroutineContext)
            parent.setJob(coroutineContext[Job])
            s.onSubscribe(parent)
            generator(parent)
        }
    }
}


Since the generator is a suspendable coroutine, we need a context where it can run. The Unconfined context gives us a trampolined execution environment where resumptions of suspended coroutines are not confined to any particular thread, as if you'd run with the trampoline() Scheduler in RxJava.

We create our Subscription, attach the Job of the coroutine context itself to bridge the cancellation from a downstream Subscription.cancel(), signal the custom Subscription to the downstream and then execute the provided producer block by supplying it the parent which also implements SuspendEmitter.

So far, nothing is too hairy or convoluted, however, the interaction between regular trampolined coroutines of RxJava and the Kotlin Coroutine infrastructure is more involved.

Non-blocking await/notify

We will need a way to get the generator coroutine suspended if there are no downstream requests and we have to resume that coroutine when the downstream does request an amount. This resembles the wait-notify pair of a typical BlockingQueue implementation where a blocking emission due to a full queue gets unblocked by a notification by a concurrent take()/poll() invocation. Since we don't want to block and the coroutine infrastructure supports programmatic resuming of a coroutine, we'll use this feature in two helper methods establishing a non-blocking wait-notify exchange:


typealias Cont = Continuation<Unit>

fun notify(ref: AtomicReference<Cont?>) {
    while (true) {
        val cont = ref.get()
        val next : Cont?
        if (cont != null && cont != TOKEN) {
            if (ref.compareAndSet(cont, null)) {
                cont.resume(Unit)
                break
            }
        } else {
            if (ref.compareAndSet(cont, TOKEN)) {
                break;
            }
        }
    }
}


We will use a valueless Continuation<Unit>, Cont for short, and atomics to place either an indicator or an actual continuation object in an AtomicReference. notify() atomically performs the following logic: if there is a real continuation in the reference, we clear it and then call resume on it to trigger the resumption. Otherwise, we set it to the shared TOKEN object, indicating that when the other side, await, wants to continue, it can do so immediately.

fun await(ref: AtomicReference<Cont?>, cont: Cont) {
    while (true) {
        val a = ref.get()
        if (a == TOKEN) {
            if (ref.compareAndSet(a, null)) {
                cont.resume(Unit)
                break
            }
        } else {
            if (ref.compareAndSet(a, cont)) {
                break;
            }
        }

    }
}


The await() method uses the same reference and the continuation instance provided by a suspendCoroutine in its code block. The method atomically checks if there is a TOKEN and if so, it calls resume on the continuation parameter after clearing the TOKEN from the reference. Otherwise, it stores the continuation in the reference and quits.

val TOKEN: Cont = object: Cont {
    override val context: CoroutineContext
        get() = throw UnsupportedOperationException()

    override fun resume(value: Unit) {
        throw UnsupportedOperationException()
    }

    override fun resumeWithException(exception: Throwable) {
        throw UnsupportedOperationException()
    }

}


Finally, TOKEN is just an empty implementation of a Continuation - we should never call its methods, as the object reference itself serves only as an indicator for an immediate resumption.
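For readers more comfortable with Java, the same token-based handoff can be sketched with an AtomicReference where a plain Runnable stands in for the Kotlin Continuation (the names Handoff, notifyWaiter and awaitSignal are made up for this illustration):

```java
import java.util.concurrent.atomic.AtomicReference;

// Java analogue of the non-blocking await/notify pair: a Runnable plays
// the role of the continuation, TOKEN marks an early notification.
final class Handoff {
    static final Runnable TOKEN = () -> { };

    static void notifyWaiter(AtomicReference<Runnable> ref) {
        for (;;) {
            Runnable cont = ref.get();
            if (cont != null && cont != TOKEN) {
                if (ref.compareAndSet(cont, null)) {
                    cont.run();          // resume the parked side
                    break;
                }
            } else if (ref.compareAndSet(cont, TOKEN)) {
                break;                   // remember the notification
            }
        }
    }

    static void awaitSignal(AtomicReference<Runnable> ref, Runnable cont) {
        for (;;) {
            Runnable a = ref.get();
            if (a == TOKEN) {
                if (ref.compareAndSet(a, null)) {
                    cont.run();          // notification already arrived
                    break;
                }
            } else if (ref.compareAndSet(a, cont)) {
                break;                   // park the continuation
            }
        }
    }
}
```

Either side may arrive first; the CAS loops guarantee exactly one resumption per notify, with no blocking.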



The ProduceSubscription  

Now we can implement the ProduceSubscription class. First, let's see the skeleton with the relevant fields:

open class ProduceSubscription<T>(
        private val actual: Subscriber<in T>,
        private val ctx : CoroutineContext
) : Subscription, SuspendEmitter<T> {

    companion object {
        val CANCELLED = Object()
    }

    @Suppress("DEPRECATION")
    override val context: CoroutineContext
        get() = ctx!!

    override val isActive: Boolean
        get() = job.get() != CANCELLED

    private val job = AtomicReference<Any>()

    private val requested = AtomicLong()

    private val resume = AtomicReference<Cont?>()

    private var done: Boolean = false

    override suspend fun onNext(t: T) {
        // TODO implement
    }

    override suspend fun onError(t: Throwable) {
        // TODO implement
    }

    override suspend fun onComplete() {
        // TODO implement
    }

    override fun cancel() {
        // TODO implement
    }

    override fun request(n: Long) {
        // TODO implement
    }

    fun setJob(j: Job?) {
        // TODO implement
    }
}

We see the methods of both Subscription and SuspendEmitter along with a couple of fields/properties:


  • It takes the downstream's Subscriber and the CoroutineContext it will provide to the produce callback in the operator.
  • We will use the companion object's CANCELLED value to indicate that the parent Job we get from the coroutineContext has been cancelled, exactly once.
  • The subscription considers itself active while the job reference doesn't hold the CANCELLED indicator.
  • That Job itself is stored in the job AtomicReference.
  • We have to track the requested amount from the downstream via an AtomicLong.
  • The resume AtomicReference stores the continuation to be used with the non-blocking await-notify shown in the previous section.
  • Finally, we have the done flag ensuring the generator coroutine's onError or onComplete takes effect at most once.
Perhaps the main difficulty lies in the implementation of the onNext method, as it is the primary interaction point between the downstream's request accounting and a coroutine that has to be suspended when there are no requests:


    override suspend fun onNext(t: T) {
        if (job.get() == CANCELLED) {
            suspendCoroutine { }
        }
        val r = requested.get()
        if (r == 0L) {
            suspendCoroutine { cont -> await(resume, cont)  }
        }

        actual.onNext(t)

        if (job.get() == CANCELLED) {
            suspendCoroutine { }
        }
        if (resume.get() == TOKEN) {
            resume.compareAndSet(TOKEN, null)
        }
        if (r != Long.MAX_VALUE) {
            requested.decrementAndGet()
        }
    }


First we check if the downstream has cancelled the generator, in which case we should get out of the coroutine entirely. I'm not sure if there is a more appropriate way of doing this other than suspending indefinitely.

Next, we check the request amount and if it is zero, we suspend the current coroutine by using our non-blocking await mechanism. Once notified, or there was at least one requested item, the code should continue with the emission of the item. This could trigger an in-sequence cancellation and we suspend the coroutine indefinitely again.

Since the downstream can immediately request some amount due to the s.onSubscribe(parent) call in the operator - before the generator can even run and call onNext - we may have a TOKEN in the resume field that would otherwise incorrectly indicate to the next call to await that it can resume immediately, violating the backpressure we expect. I know this sounds convoluted, but I learned it the hard way...

Finally, we decrement the request amount if not unbounded.

The onError and onComplete look pretty much alike:


    override suspend fun onError(t: Throwable) {
        if (!done) {
            done = true
            actual.onError(t)
            cancel()
        }
        suspendCoroutine { }
    }

    override suspend fun onComplete() {
        if (!done) {
            done = true
            actual.onComplete()
            cancel()
        }
        suspendCoroutine { }
    }


We set the done flag to true, emit the relevant event to the downstream and then cancel the job/Subscription we are running with. I defensively suspend the coroutine afterwards.

Next, let's see how cancel() and setJob() work:

    override fun cancel() {
        val o = job.getAndSet(CANCELLED)
        if (o != CANCELLED) {
            (o as Job).cancel()
        }
    }

    fun setJob(j: Job?) {
        while (true) {
            val o = job.get()
            if (o == CANCELLED) {
                j?.cancel()
                break
            }
            if (job.compareAndSet(o, j)) {
                break
            }
        }
    }


They are implemented pretty much along the lines of RxJava's typical deferred-cancellation mechanism. cancel() atomically swaps in the CANCELLED indicator and calls cancel on the Job it contained. setJob() atomically sets the Job instance, or cancels it if cancel() swapped in the CANCELLED indicator just before that.

Lastly, the request() implementation is responsible for accounting the downstream requests and resuming the generator if it is suspended inside onNext().

    override fun request(n: Long) {
        if (BackpressureHelper.add(requested, n) == 0L) {
            notify(resume)
        }
    }


In the RxJava world, a transition from 0 to n triggers the emission loop in a range() operator for example. Here, we notify a possibly suspended coroutine that will resume from the await() method we implemented.
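Assuming BackpressureHelper.add follows its usual RxJava semantics - atomically adding the request while capping the sum at Long.MAX_VALUE and returning the amount before the addition - the mechanism can be sketched in plain Java (addCap is a hypothetical stand-in name):

```java
import java.util.concurrent.atomic.AtomicLong;

final class Backpressure {
    // Atomically add n, capping the sum at Long.MAX_VALUE ("unbounded"),
    // and return the amount *before* the addition - the 0 -> n transition
    // tells the caller to kick off or resume the emission.
    static long addCap(AtomicLong requested, long n) {
        for (;;) {
            long current = requested.get();
            if (current == Long.MAX_VALUE) {
                return Long.MAX_VALUE;       // already unbounded
            }
            long update = current + n;
            if (update < 0L) {               // overflow -> saturate
                update = Long.MAX_VALUE;
            }
            if (requested.compareAndSet(current, update)) {
                return current;
            }
        }
    }
}
```

Only the caller that observes the previous amount of 0 triggers the resumption, so concurrent request() calls can't resume the coroutine twice.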

Testing it is simple with RxJava:


val f = produceFlow {
    for (i in 0 until 10) {
         println("Generating $i")
         onNext(i)
    }
    onComplete()
}

f.test(0)
.assertEmpty()
.requestMore(5)
.assertValues(0, 1, 2, 3, 4)
.requestMore(5)
.assertResult(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)


Outstanding!

The flow-transformer

Now that we have a way to emit items, we would like to emit an item in response to an upstream value, like the map() operator but with a suspendable coroutine function. RxJava's map is confined to return one item in exchange for one upstream item.

With coroutines and the ProduceSubscription described in the previous section, we could emit any number of items without overflowing a Subscriber!

Let's define our API and a skeleton implementation for it first:


fun <T, R> Flowable<T>.transform(
        transformer: suspend SuspendEmitter<R>.(T) -> Unit)
 : Flowable<R> {
    return Transform(this, transformer)
}

class Transform<T, R>(
        private val source: Flowable<T>, 
        private val transformer: suspend SuspendEmitter<R>.(T) -> Unit)
 : Flowable<R>() {
    override fun subscribeActual(s: Subscriber<in R>) {
        // TODO implement
    }
}


We define a transform extension method on Flowable with a suspendable transformer that takes our SuspendEmitter, the upstream's value and returns nothing.

This time, we have an upstream we have to subscribe to via a regular FlowableSubscriber from RxJava, call the coroutine in some way and make sure we keep calling the upstream for more values as we have to deal with the backpressure of the coroutine itself transitively.

The first step in this direction is handling the upstream's own Subscription we get through Subscriber.onSubscribe. We have to attach it to the Subscription we show to the downstream Subscriber. Since we will use the ProduceSubscription anyway, we extend it and override its cancel() for this purpose:


class ProduceWithResource<T>(
        actual: Subscriber<in T>,
        ctx : CoroutineContext
) : ProduceSubscription<T>(actual, ctx) {
    private val resource = AtomicReference<Subscription>()

    fun setResource(s: Subscription) {
        SubscriptionHelper.replace(resource, s)
    }

    override fun cancel() {
        SubscriptionHelper.cancel(resource)
        super.cancel()
    }
}


We simply use the deferred cancellation helper for Subscriptions.

Now let's see how we can prepare the context for running the coroutine inside the transform operator's subscribeActual() method:

    val ctx = newCoroutineContext(Unconfined)
    val parent = ProduceWithResource(s, ctx)
    s.onSubscribe(parent)
    source.subscribe(object: FlowableSubscriber<T> {

        var upstream : Subscription? = null

        val wip = AtomicInteger()
        var error: Throwable? = null

        override fun onSubscribe(s: Subscription) {
            // TODO implement
        }

        override fun onNext(t: T) {
            // TODO implement
        }

        override fun onError(t: Throwable) {
            // TODO implement
        }

        override fun onComplete() {
            // TODO implement
        }
    })


First we create an unconfined context in which each invocation of the transformer coroutine will execute and suspend. We create the producer that can hold an additional Subscription and send it to the downstream Subscriber. Finally, we subscribe to the upstream with a FlowableSubscriber.

In this custom FlowableSubscriber, we will have to request from the upstream, thus we save the Subscription we'll get from it. The wip and error fields will be used to achieve something similar to half-serialization. I'll explain it once the methods are implemented.

Handling onSubscribe() is straightforward and typical for an RxJava operator:


    override fun onSubscribe(s: Subscription) {
        upstream = s
        parent.setResource(s)
        s.request(1)
    }


We store the upstream's subscription locally and in the ProduceWithResource to link up the cancellation across the operator. Then we request one item; this partly simplifies the interaction between a suspended coroutine and the upstream producer. Using a larger prefetch would require some intermediate queue - possible, but left as an exercise for the reader. (Finally, we found a use for request(1)!)

Next, onNext():

    override fun onNext(t: T) {
        launch(ctx) {
           parent.setJob(coroutineContext[Job])

           wip.getAndIncrement()

           transformer(parent, t)

           if (wip.decrementAndGet() == 0) {
               upstream!!.request(1)
           } else {
               val ex = error;
               if (ex == null) {
                   s.onComplete()
               } else {
                   s.onError(ex)
               }
               parent.cancel()
           }
       }
    }

First, the Job of the actual coroutineContext has to be stored so a downstream cancellation can call its Job.cancel() method. We have to do this because we will go in and out of launch() every time the upstream sends an item.

Next, the wip counter is incremented, which may seem odd. The reason is that if the transformer coroutine gets suspended, the execution returns to the caller of onNext(), a regular RxJava producer of some sort. If this producer has reached its end, it will call onError or onComplete, as these can be issued without a request. As we'll see a bit later, forwarding these signals cuts out any pending emission from the suspended coroutine; therefore, we use the pattern of a half-serializer to save this terminal indication.

The transformer is executed with the parent ProduceWithResource instance that handles the suspendable onNext emissions towards the downstream.

Once the transformer's job is done, the execution resumes with the atomic decrement of the wip counter. If it successfully decrements to 0, no terminal event was signalled from the upstream while the transformer was suspended, thus we can request the next item from the upstream RxJava source.

The onError and onComplete are much simpler fortunately:


    override fun onError(t: Throwable) {
        error = t
        if (wip.getAndIncrement() == 0) {
            s.onError(t)
            parent.cancel()
        }
    }

    override fun onComplete() {
        if (wip.getAndIncrement() == 0) {
            s.onComplete()
            parent.cancel()
        }
    }


We store the Throwable (in onError only), then atomically increment the wip counter. If there was no ongoing coroutine, we are safe to emit the terminal event and cleanup/cancel the contextual Job we may still be referencing. If the original wip value was 1, the increment bumps it to 2 and the decrement in onNext() will detect the terminal condition and act accordingly.
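The wip-based half-serialization above can be modeled in isolation. Below is a single-threaded Java sketch (the HalfSerializer class and its string-based output are hypothetical, for illustration only): the emitting side brackets its work with an increment/decrement, and a terminal signal passes through directly only when no emission is in progress; otherwise the emitter delivers the deferred terminal event itself after finishing.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Standalone model of the half-serializer used in the transform operator.
final class HalfSerializer {
    final AtomicInteger wip = new AtomicInteger();
    Throwable error;
    String output = "";

    void emit(String item) {
        wip.getAndIncrement();
        output += item;                       // the "onNext" work
        if (wip.decrementAndGet() != 0) {     // a terminal raced in meanwhile
            output += error == null ? "|complete" : "|error";
        }
    }

    void complete() {
        if (wip.getAndIncrement() == 0) {     // no emission in progress
            output += "|complete";
        }
        // otherwise emit() will deliver the terminal event when it finishes
    }
}
```

The increment in complete() bumps wip so that an in-flight emit() sees a non-zero value after its own decrement and picks up the pending terminal signal.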

Let's test it (by reusing the generator for fun)!

    f.transform({
        if (it % 2 == 0) {
            onNext(it)
        }
    })
    .test()
    .assertResult(0, 2, 4, 6, 8)

    f.transform({
        onNext(it)
        onNext(it + 1)
    })
    .test()
    .assertResult(0, 1, 1, 2, 2, 3, 3, 4, 4,
            5, 5, 6, 6, 7, 7, 8, 8, 9, 9, 10)

    f.transform({
        launch(CommonPool) {
            onNext(it + 1)
        }
    })
    .test()
    .awaitDone(5, TimeUnit.SECONDS)
    .assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)


We can filter or amplify a source, synchronously or asynchronously if necessary with a single operator! Excellent!

The receiver

The last operation we'd like is, given a Flowable flow, to return to the coroutine world and consume the flow. For that, a ReceiveChannel seems to be an appropriate output type as it can be for-each looped nicely.

Let's define the extension method toReceiver() with a skeleton as well:


suspend fun <T> Flowable<T>.toReceiver(capacityHint: Int = 128) : ReceiveChannel<T> {
    val queue = Channel<T>(capacityHint)

    val upstream = AtomicReference<Subscription>()
    val error = AtomicReference<Throwable>()
    val wip = AtomicInteger()

    subscribe(object: FlowableSubscriber<T> {

        override fun onSubscribe(s: Subscription) {
            // TODO implement
        }

        override fun onNext(t: T) {
            // TODO implement
        }

        override fun onComplete() {
            // TODO implement
        }

        override fun onError(t: Throwable) {
            // TODO implement
        }

    })

    return // TODO implement
}


First, a Channel of type T with the given capacity is created. It is followed by the AtomicReference that will hold the source Flowable's Subscription, which will have to be linked up with the consumer to propagate cancellation. Next, since the upstream may signal terminal events while the channel is suspended in the send() we'll use - similar to the situation in the transform operator's onNext() - we will use the same AtomicInteger-based technique. The error AtomicReference will serve as the intermediary when handing over the terminal event to the channel.

Let's see the FlowableSubscriber implementation first:

        override fun onSubscribe(s: Subscription) {
            if (SubscriptionHelper.setOnce(upstream, s)) {
                s.request(1)
            }
        }

        override fun onNext(t: T) {
            launch (Unconfined) {
                wip.getAndIncrement()

                queue.send(t);

                if (wip.decrementAndGet() == 0) {
                    upstream.get().request(1)
                } else {
                    queue.cancel(error.get());
                }
            }
        }

        override fun onComplete() {
            if (wip.getAndIncrement() == 0) {
                launch(Unconfined) {
                    queue.cancel();
                }
            }
        }

        override fun onError(t: Throwable) {
            error.lazySet(t)
            if (wip.getAndIncrement() == 0) {
                launch(Unconfined) {
                    queue.cancel(t);
                }
            }
        }


The FlowableSubscriber implementation practically performs the same bookkeeping as the transform() operator did, with the exception that the closing of the channel has to happen in a launch-provided context.

However, this is only the producer half of the channel; we still need the consumer part, more specifically, the consumer-reemitter. Luckily, the built-in produce() operator of the Coroutines library helps with it. Why not return the channel directly? Because we need a way to detect if the channel is closed from the consumer's end, and Channel doesn't allow us to register a completion handler for it. However, the Job inside the coroutineContext of produce() does:

    return produce(Unconfined) {
        coroutineContext[Job]?.invokeOnCompletion { 
            SubscriptionHelper.cancel(upstream) 
        }

        for (v in queue) send(v)
    }


Let's test this last operator:

runBlocking {
    for (i in f.toReceiver()) {
         println(i)
    }
    println("Done")

    for (i in f.subscribeOn(Schedulers.single()).toReceiver()) {
         println("Async $i")
    }
    println("Async Done")
}


Well done!

Conclusion

In this blog post, I demonstrated how one can write three operators, produceFlow, transform and toReceiver, that can interoperate with RxJava's own, backpressure enabled Flowable type reasonably well.

This should prove that, in the end, both technologies can be combined by the developer as seen fit for the target domain or business requirements.

This was a somewhat heated week for me, so for now, until something interesting comes up in this topic, my writing about Kotlin Coroutines will be ... suspended.