Monday, September 11, 2017

Java 9 Flow API: ordered merge

Introduction

Sometimes, one has several ordered sequences of events and would like to merge them into one single flow. Since one element from a sequence should come before another element in another sequence, we need a way to keep comparing elements with each other from different sequences.

Unfortunately, zip() doesn't work because it takes a row of available items, and item #2 from sequence #2 may come before item #1 from sequence #3. Plus, if one sequence is shorter than the others, the output sequence stops with it. Similarly, flatMap() doesn't work because it takes the next item from any inner source sequence the moment it is available, without any ordering considerations at that point. At least it emits all items from all sources (provided there are no errors, of course).

Therefore, we need something between the two operators: one that collects up a row of items from the sources, decides which is the smallest/largest of them based on some comparison logic and only emits that. It then awaits a fresh item from that specific source (or completion) and repeats the picking of the smallest/largest item as long as there are requests for it.
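To make the idea concrete before going reactive, here is a minimal synchronous sketch of the same row-based picking over plain lists. The RowMergeSketch class and its orderedMerge() method are hypothetical names used only for illustration; they are not the operator built in this post, and null items are assumed not to occur because null marks an empty slot.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;

// Hypothetical, pull-based analogue of the operator (illustration only).
final class RowMergeSketch {

    @SafeVarargs
    static <T> List<T> orderedMerge(
            Comparator<? super T> comparator,
            List<? extends T>... sources) {
        int n = sources.length;
        List<Iterator<? extends T>> iterators = new ArrayList<>();
        for (List<? extends T> source : sources) {
            iterators.add(source.iterator());
        }
        // The "row": at most one pending item per source, null means empty.
        Object[] row = new Object[n];
        List<T> result = new ArrayList<>();

        for (;;) {
            T min = null;
            int minIndex = -1;
            for (int i = 0; i < n; i++) {
                // Refill an empty slot from its own source only.
                if (row[i] == null && iterators.get(i).hasNext()) {
                    row[i] = iterators.get(i).next();
                }
                if (row[i] != null) {
                    @SuppressWarnings("unchecked")
                    T candidate = (T) row[i];
                    if (minIndex < 0 || comparator.compare(min, candidate) > 0) {
                        min = candidate;
                        minIndex = i;
                    }
                }
            }
            if (minIndex < 0) {
                // Every source is exhausted and every slot is empty.
                return result;
            }
            // Emit the smallest item and free its slot for the next round.
            row[minIndex] = null;
            result.add(min);
        }
    }
}
```

Merging the lists (1, 4, 7), (2, 5) and (3, 6, 8, 9) with natural ordering this way yields 1 through 9 in order. The reactive version has to do the same thing without blocking and while honoring backpressure.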

Such an operator, let's call it orderedMerge(), has an implication about the number of its inner source sequences: it has to be fixed. The reason is that it has to pick the smallest/largest of the available items in order for the output to be in order. If an item from one source is still missing, it can't know for sure that the items already available are smaller/larger than whatever that missing source will produce next.

The second implication is, what happens if the sources themselves are not ordered? The logic presented in this post still works, but the end output won't be totally ordered. It will act like some priority queue instead: picking important items first before turning to less important ones.


The inner consumer

Operators handling multiple sources often need a way to prefetch items from these sources and hand them out on demand to some joining logic. This mainly happens by prefetching a fixed amount, putting items in a queue, calling the parent coordinator's drain() method, and batching up replenishing calls from the coordinator if the so-called stable-prefetch backpressure is required.

For this purpose, let's see what the inner consumer of orderedMerge(), OrderedMergeInnerSubscriber, could look like:


static final class OrderedMergeInnerSubscriber<T> 
extends AtomicReference<Flow.Subscription>
implements Flow.Subscriber<T>, Flow.Subscription {

    // the coordinator, implemented by OrderedMergeSubscription below
    final OrderedMergeSubscription<T> parent;

    final int prefetch;

    final int limit;

    final Queue<T> queue;

    int consumed;

    volatile boolean done;

    OrderedMergeInnerSubscriber(
        OrderedMergeSubscription<T> parent,
        int prefetch
    ) {
        this.parent = parent;
        this.prefetch = prefetch;
        this.limit = prefetch - (prefetch >> 2);
        this.queue = new ConcurrentLinkedQueue<>();
    }

    @Override
    public void onSubscribe(Flow.Subscription s) {
        // TODO implement
    }

    @Override
    public void onNext(T item) {
        // TODO implement
    }

    @Override
    public void onError(Throwable throwable) {
        // TODO implement
    }

    @Override
    public void onComplete() {
        // TODO implement
    }

    @Override
    public void request(long n) {
        // TODO implement
    }

    @Override
    public void cancel() {
        // TODO implement
    }
}

We'll need a reference to the coordinator of the operator, the prefetch amount (75% of which will be used for replenishing requests), a queue (ConcurrentLinkedQueue for simplicity, but a bounded SpscArrayQueue from JCTools works here as well), and a counter of how many items have been consumed so far to know when to replenish.


    @Override
    public void onSubscribe(Flow.Subscription s) {
        if (compareAndSet(null, s)) {
            s.request(prefetch);
        } else {
            s.cancel();
        }
    }

    @Override
    public void onNext(T item) {
        queue.offer(item);
        parent.drain();
    }

    @Override
    public void onError(Throwable throwable) {
        parent.onInnerError(this, throwable);
    }

    @Override
    public void onComplete() {
        done = true;
        parent.drain();
    }

    @Override
    public void request(long n) {
        int c = consumed + 1;
        if (c == limit) {
            consumed = 0;
            Flow.Subscription s = get();
            if (s != this) {
                s.request(c);
            }
        } else {
            consumed = c;
        }
    }

    @Override
    public void cancel() {
        Flow.Subscription s = getAndSet(this);
        if (s != null && s != this) {
            s.cancel();
        }
    }


I'd say there is nothing too complicated here.

  • onSubscribe() saves the upstream Flow.Subscription in the AtomicReference the operator extends if not already cancelled. If successful, the prefetch amount is requested.
  • onNext() stores the item in the queue and calls drain() on parent to handle that case.
  • onError() defers the error signal to be handled by the parent coordinator: the parent may save up the errors or cancel the whole flow at once.
  • onComplete() sets the complete indicator, which tells the parent this particular source will not produce more values and thus can be skipped when looking for the next smallest/largest items to emit.
  • request() will only be called by the parent to replenish one item from its perspective once the previous item has been successfully chosen as the next item to be emitted towards downstream. Since replenishing one by one is costly, we batch up those via the consumed counter. Once that counter reaches the limit (75% of prefetch), a request is issued to the upstream. Since the AtomicReference will hold itself as a cancellation indicator, we don't want to call request on ourselves. It's important to state that request() is guaranteed to be called from one thread at a time by virtue of the queue-drain approach the coordinator implements below.
  • cancel() atomically swaps in this as a terminal indicator and cancels the non-null, non-this Flow.Subscription if present.
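To see the replenishing arithmetic in isolation, here is a small single-threaded sketch that only mirrors the consumed/limit bookkeeping of request() and records what would be requested from upstream. The ReplenishSketch class and its method names are made up for this illustration, and there is no real Flow.Subscription involved.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the counter logic of the inner subscriber.
final class ReplenishSketch {

    final int prefetch;

    final int limit;

    int consumed;

    // Records every amount that would be requested from upstream.
    final List<Integer> upstreamRequests = new ArrayList<>();

    ReplenishSketch(int prefetch) {
        this.prefetch = prefetch;
        // 75% of prefetch, same formula as in the inner subscriber
        this.limit = prefetch - (prefetch >> 2);
        // corresponds to s.request(prefetch) in onSubscribe()
        upstreamRequests.add(prefetch);
    }

    // Corresponds to one request(1) call from the coordinator.
    void itemConsumed() {
        int c = consumed + 1;
        if (c == limit) {
            consumed = 0;
            upstreamRequests.add(c);
        } else {
            consumed = c;
        }
    }

    static List<Integer> simulate(int prefetch, int items) {
        ReplenishSketch sketch = new ReplenishSketch(prefetch);
        for (int i = 0; i < items; i++) {
            sketch.itemConsumed();
        }
        return sketch.upstreamRequests;
    }
}
```

With a prefetch of 8 the limit is 6, so consuming 13 items results in the upstream requests 8, 6 and 6: three calls instead of fourteen.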

The coordinator

Since there is no primary source in this orderedMerge() operator, it acts somewhat like a plain source of events. Therefore, we have to implement it on top of Flow.Subscription to interact with the downstream. For convenience in this blog, we'll define the operator to take a variable number of Flow.Publisher sources (which at runtime always ends up as a fixed-size array):


@SafeVarargs
public static <T> Flow.Publisher<T> orderedMerge(
        Comparator<? super T> comparator, 
        int prefetch,
        Flow.Publisher<? extends T>... sources) {
    return new OrderedMergePublisher<>(sources, prefetch, comparator);
}

final class OrderedMergePublisher<T> implements Flow.Publisher<T> {

    final Flow.Publisher<? extends T>[] sources;

    final int prefetch;

    final Comparator<? super T> comparator;

    OrderedMergePublisher(
            Flow.Publisher<? extends T>[] sources,
            int prefetch,
            Comparator<? super T> comparator) {
        this.sources = sources;
        this.prefetch = prefetch;
        this.comparator = comparator;
    }

    @Override
    public void subscribe(Flow.Subscriber<? super T> s) {
         // TODO implement
    }
}


The boilerplate of writing an operator is nothing special: save the parameters to be used by the implementation. We allow customization via the Comparator interface. If T is self-comparable, you can use Comparator.naturalOrder() from Java itself.
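As a quick illustration of what could be passed in as the comparator, here is a sketch with a few standard java.util.Comparator factories plus a small check that a source is ordered consistently with the chosen comparator. The ComparatorSketch class and the isOrdered() helper are hypothetical; the operator itself does not perform such a check.

```java
import java.util.Comparator;
import java.util.List;

// Hypothetical examples of comparators one might hand to orderedMerge().
final class ComparatorSketch {

    // smallest first, for self-comparable items
    static final Comparator<Integer> ASCENDING = Comparator.naturalOrder();

    // largest first
    static final Comparator<Integer> DESCENDING = Comparator.reverseOrder();

    // order by a derived key
    static final Comparator<String> BY_LENGTH =
            Comparator.comparingInt(String::length);

    // True if no item is "larger" than its successor per the comparator.
    static <T> boolean isOrdered(
            List<? extends T> items, Comparator<? super T> comparator) {
        for (int i = 1; i < items.size(); i++) {
            if (comparator.compare(items.get(i - 1), items.get(i)) > 0) {
                return false;
            }
        }
        return true;
    }
}
```

The output can only be totally ordered if every source is ordered with respect to the same comparator the operator receives; an ascending source merged with a descending comparator falls into the priority-queue-like behavior mentioned in the introduction.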

The coordinator implementation has to hold onto the inner OrderedMergeInnerSubscribers for mass cancellation support, subscribe them to the sources and work out the emissions from them. Let's see the non-exciting parts of it:

static final class OrderedMergeSubscription<T>
extends AtomicInteger implements Flow.Subscription {

    final Flow.Subscriber<? super T> downstream;

    final OrderedMergeInnerSubscriber<T>[] subscribers;

    final Comparator<? super T> comparator;

    final Object[] values;

    static final Object DONE = new Object();

    Throwable error;

    boolean cancelled;

    long requested;

    long emitted;

    // -------------------------------------------------

    static final VarHandle ERROR;

    static final VarHandle CANCELLED;

    static final VarHandle REQUESTED;

    static {
        MethodHandles.Lookup lk = MethodHandles.lookup();
        try {
            ERROR = lk.findVarHandle(
                OrderedMergeSubscription.class, "error", Throwable.class);
            CANCELLED = lk.findVarHandle(
                OrderedMergeSubscription.class, "cancelled", boolean.class);
            REQUESTED = lk.findVarHandle(
                OrderedMergeSubscription.class, "requested", long.class);
        } catch (Throwable ex) {
            throw new InternalError(ex);
        }
    }

    OrderedMergeSubscription(
            Flow.Subscriber<? super T> downstream,
            Comparator<? super T> comparator,
            int prefetch,
            int n) {
        this.downstream = downstream;
        this.comparator = comparator;
        this.subscribers = new OrderedMergeInnerSubscriber[n];
        for (int i = 0; i < n; i++) {
             this.subscribers[i] = new OrderedMergeInnerSubscriber<>(this, prefetch);
        }
        this.values = new Object[n];
    }

    void subscribe(Flow.Publisher<? extends T>[] sources) {
        // TODO implement
    }
     
    @Override
    public void request(long n) {
        // TODO implement
    }

    @Override
    public void cancel() {
        // TODO implement
    }

    void drain() {
        // TODO implement
    }

    void onInnerError(OrderedMergeInnerSubscriber<T> sender, Throwable ex) {
        // TODO implement
    }

    void updateError(Throwable ex) {
        // TODO implement
    }
}

We have a couple of fields and methods here, some of which should be familiar in their naming and intended purpose:


  • downstream will receive the ordered sequence of items
  • subscribers holds onto the fixed set of inner OrderedMergeInnerSubscribers, each will be subscribed to a particular Flow.Publisher and the total number of them won't ever change in this operator.
  • comparator will compare elements from various sources
  • values holds onto the next available value from each source. This allows the merging algorithm to work with queues that don't support peek() (such as RxJava 2's own queue implementations) and otherwise has nice properties such as locality, not having to access the internals of the inner subscribers' queues, and avoiding the overhead of a peek()-poll() pair all the time.
  • The DONE constant will indicate a particular source has no further elements and can be ignored (without looking at its subscriber).
  • error will gather the errors signalled by the sources, which are then emitted together once all sources have terminated. There is an ERROR VarHandle for concurrent access to this field.
  • cancelled indicates the downstream has issued a cancel() call to stop the flow. The CANCELLED VarHandle will allow us to use compareAndSet() to cancel at most once.
  • requested accumulates the requests done by the downstream via its REQUESTED VarHandle.
  • emitted counts how many items were emitted and will be compared against requested to detect when to pause emitting.

There is no separate done indicator field because we will deduce this state by detecting that all entries of values are marked as DONE.

Now let's see the shorter methods implemented:


    // ...

    void subscribe(Flow.Publisher<? extends T>[] sources) {
        for (int i = 0; i < sources.length; i++) {
            sources[i].subscribe(subscribers[i]);
        }
    }
     
    @Override
    public void request(long n) {
        if (n <= 0L) {
            updateError(new IllegalArgumentException("positive request expected"));
        } else {
            for (;;) {
                long current = (long)REQUESTED.getAcquire(this);
                long next = current + n;
                if (next < 0L) {
                    next = Long.MAX_VALUE;
                }
                if (REQUESTED.compareAndSet(this, current, next)) {
                    break;
                }
            }
        }
        drain();
    }

    @Override
    public void cancel() {
        if (CANCELLED.compareAndSet(this, false, true)) {
            for (OrderedMergeInnerSubscriber<T> inner : subscribers) {
                inner.cancel();
            }

            if (getAndIncrement() == 0) {
                Arrays.fill(values, null);

                for (OrderedMergeInnerSubscriber<T> inner : subscribers) {
                    inner.queue.clear();
                }
            }
        }
    }

    void onInnerError(OrderedMergeInnerSubscriber<T> sender, Throwable ex) {
        updateError(ex);
        sender.done = true;
        drain();
    }

    void updateError(Throwable ex) {
        for (;;) {
            Throwable current = (Throwable)ERROR.getAcquire(this);
            Throwable next;
            if (current == null) {
                next = ex;
            } else {
                next = new Throwable();
                next.addSuppressed(current);
                next.addSuppressed(ex);
            }
            if (ERROR.compareAndSet(this, current, next)) {
                break;
            }
        }
    }

    void drain() {
        // TODO implement
    }

}


The subscribe() method simply subscribes to all sources with the prepared array of OrderedMergeInnerSubscribers. The cancel() method cancels all inner subscribers and then enters a half-open drain mode where both the values array and each queue of the inner subscribers are cleared in order to help the GC. Both request() and updateError() should be familiar from the previous post of the series.
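For readers who skipped the previous post, the request accumulation boils down to a capped atomic addition. The sketch below swaps the VarHandle for a plain AtomicLong purely to keep it self-contained; RequestCapSketch and addCap() are names chosen for this illustration.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical helper: same CAS loop as request(), on an AtomicLong.
final class RequestCapSketch {

    // Adds n to the counter, capping at Long.MAX_VALUE on overflow.
    static long addCap(AtomicLong requested, long n) {
        for (;;) {
            long current = requested.get();
            long next = current + n;
            if (next < 0L) {
                // the sum overflowed: treat it as unbounded demand
                next = Long.MAX_VALUE;
            }
            if (requested.compareAndSet(current, next)) {
                return next;
            }
            // another thread changed the value in between, retry
        }
    }
}
```

Capping matters because a downstream may legitimately call request(Long.MAX_VALUE) more than once; without the cap, the sum would wrap around to a negative number and the drain loop would misjudge the demand.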

What's left is the drain() logic itself.


void drain() {
    if (getAndIncrement() != 0) {
        return;
    }

    int missed = 1;
    Flow.Subscriber<? super T> downstream = this.downstream;

    Comparator<? super T> comparator = this.comparator;

    OrderedMergeInnerSubscriber<T>[] subscribers = this.subscribers;
    int n = subscribers.length;

    Object[] values = this.values;

    long e = emitted;

    for (;;) {
         long r = (long)REQUESTED.getAcquire(this);

         for (;;) {
              // TODO implement
         }

         emitted = e;
         missed = addAndGet(-missed);
         if (missed == 0) {
             break;
         }
    }
}


We start out with the usual drain-exclusion logic: transitioning the work-in-progress counter of this (by extending AtomicInteger) from zero to one allows one thread to enter and perform the emissions. We load the frequently accessed components into local variables and do an almost-classical for-loop with missed accounting to determine when to leave the loop.
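If the missed accounting is new to you, here is a stripped-down sketch of the same drain-exclusion pattern with a single queue and no backpressure. QueueDrainSketch and its members are hypothetical and exist only to show how a reentrant call is turned into another loop iteration instead of a recursive emission.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

// Hypothetical minimal queue-drain; the AtomicInteger is the wip counter.
final class QueueDrainSketch<T> extends AtomicInteger {

    private static final long serialVersionUID = 1L;

    final Queue<T> queue = new ConcurrentLinkedQueue<>();

    final Consumer<? super T> consumer;

    QueueDrainSketch(Consumer<? super T> consumer) {
        this.consumer = consumer;
    }

    void offer(T item) {
        queue.offer(item);
        drain();
    }

    void drain() {
        // Only the caller that moves the counter from 0 to 1 may emit.
        if (getAndIncrement() != 0) {
            return;
        }
        int missed = 1;
        for (;;) {
            T item;
            while ((item = queue.poll()) != null) {
                consumer.accept(item);
            }
            // Subtract the drain() calls handled so far; a non-zero
            // result means more calls arrived while we were emitting.
            missed = addAndGet(-missed);
            if (missed == 0) {
                break;
            }
        }
    }

    // The consumer offers more items while item 1 is being emitted.
    static List<Integer> reentrantDemo() {
        List<Integer> seen = new ArrayList<>();
        AtomicReference<QueueDrainSketch<Integer>> self = new AtomicReference<>();
        self.set(new QueueDrainSketch<Integer>(v -> {
            seen.add(v);
            if (v == 1) {
                self.get().offer(2);
                self.get().offer(3);
            }
        }));
        self.get().offer(1);
        return seen;
    }
}
```

In reentrantDemo(), the two nested offer() calls only bump the counter and return; the outer loop then picks up items 2 and 3 in order, and the counter is back at zero when it leaves.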

Note that in the loop, after reading the current request amount we don't have the usual while (e != r) and if (e == r) cases. The reason for this is that we can have one shared loop for both the cases when backpressure is applied and when there are no further source items to merge and can terminate the sequence without a request from downstream.


// inner for(;;) {

if ((boolean)CANCELLED.getAcquire(this)) {
    Arrays.fill(values, null);

    for (OrderedMergeInnerSubscriber<T> inner : subscribers) {
        inner.queue.clear();
    }
    return;
}

int done = 0;
int nonEmpty = 0;
for (int i = 0; i < n; i++) {
    Object o = values[i];
    if (o == DONE) {
        done++;
        nonEmpty++;
    } else
    if (o == null) {
        boolean innerDone = subscribers[i].done;
        o = subscribers[i].queue.poll();
        if (o != null) {
            values[i] = o;
            nonEmpty++;
        } else if (innerDone) {
            values[i] = DONE;
            done++;
            nonEmpty++;
        }
    } else {
        nonEmpty++;
    }
}


The first part checks whether there was a cancellation from downstream. If so, we clear the internal state of the coordinator and each queue, then quit. Next, for each subscriber of a source, we have to poll the next available item into the common values array if there is not already an item available there. In addition, we count how many of those slots indicate a completed source and how many of them hold actual items (or a DONE marker).

Note that checking a source for completion has to happen before polling for the next item from its queue. As explained before on this blog, since Flow.Subscriber methods are invoked in a strict protocol order where an onComplete always happens after any previous onNext calls, if we detect done to be true and then get a null from the queue, we know there can't be any further items from that source. Otherwise, polling and seeing an empty queue first then checking done opens a window when the source quickly produces items and completes between these two checks.
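The read order can be captured in a tiny helper. This is a sketch under the assumption that each source exposes a queue and a volatile done flag, like the inner subscriber above; DoneBeforePollSketch, Source and pollSlot() are hypothetical names.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical helper showing the done-then-poll read order.
final class DoneBeforePollSketch {

    static final Object DONE = new Object();

    // Stand-in for the queue and done flag of an inner subscriber.
    static final class Source {
        final Queue<Object> queue = new ConcurrentLinkedQueue<>();
        volatile boolean done;
    }

    // Returns the next item, DONE if the source is finished and empty,
    // or null if nothing is available yet.
    static Object pollSlot(Source source) {
        // Read done first: if it is true here, every onNext has already
        // happened, so an empty queue afterwards really means "no more".
        boolean innerDone = source.done;
        Object o = source.queue.poll();
        if (o != null) {
            return o;
        }
        return innerDone ? DONE : null;
    }
}
```

A source that has an item queued and is already done still hands out the item first; only the next call reports DONE.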

Next, we handle the overall state of the operator:

if (done == n) {
    Throwable ex = (Throwable)ERROR.getAcquire(this);
    if (ex == null) {
        downstream.onComplete();
    } else {
        downstream.onError(ex);
    }
    return;
}

if (nonEmpty != n || e == r) {
    break;
}


If all of the elements turn out to be DONE, that means we exhausted all sources and can terminate the downstream accordingly (considering if there was any error along the way). If not all values slots have something, or the downstream is not ready to receive an item (the emitted count has caught up with the requested amount), we break out of this inner loop and the outer loop will see if more work has to be done or not.

Finally, we find the smallest item from the available values:


    T min = null;
    int minIndex = -1;

    int i = 0;
    for (Object o : values) {
        if (o != DONE) {
            if (min == null || comparator.compare(min, (T)o) > 0) {
                min = (T)o;
                minIndex = i;
            }      
        }
        i++;
    }

    values[minIndex] = null;

    downstream.onNext(min);

    e++;
    subscribers[minIndex].request(1);

} // of the inner for (;;)


Once we know we can emit an item, we'll find the smallest one among the non-DONE entries along with its index. Since we checked that not all entries are DONE before, min must end up non-null and minIndex non-negative. We clear the appropriate values entry, indicating the next cycle should poll for more items from that particular source, then we emit the found minimum item, increment the emission counter and signal the winning source to produce one more item.
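The selection step on its own can be sketched as a standalone method over a row of values. It assumes the same precondition as the drain loop, namely that every slot holds either an item or the DONE marker; MinSlotSketch and indexOfMin() are names invented for this illustration.

```java
import java.util.Comparator;

// Hypothetical extraction of the minimum search from the drain loop.
final class MinSlotSketch {

    static final Object DONE = new Object();

    // Returns the index of the smallest non-DONE entry, or -1 if all
    // entries are DONE. Assumes there are no null (empty) slots.
    @SuppressWarnings("unchecked")
    static <T> int indexOfMin(Object[] values, Comparator<? super T> comparator) {
        T min = null;
        int minIndex = -1;
        for (int i = 0; i < values.length; i++) {
            Object o = values[i];
            if (o != DONE) {
                // strict comparison: on ties the earlier source wins
                if (min == null || comparator.compare(min, (T) o) > 0) {
                    min = (T) o;
                    minIndex = i;
                }
            }
        }
        return minIndex;
    }
}
```

For the row (5, DONE, 3, 3) with natural ordering, the result is index 2: the DONE slot is skipped and the first of the two equal items wins, which keeps the merge stable with respect to source order.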

Conclusion

The orderedMerge() operator shown in this post is perhaps one of the shorter and more comprehensible operators, even considering the lack of infrastructure with Java 9 Flows (i.e., a cancelled indicator). The queue-drain approach present in many of the typical reactive operators can be observed here as well.

Since the operator collects a row of available values for you, it can be turned into a zip() operator relatively easily:


  • done != 0 indicates one or more sources have run out of items, thus the sequence can be completed. Note that the non-done inner subscribers have to be cancelled and cleared before the downstream gets the terminal event.
  • instead of the loop that compares items, one copies the values array, clears the original one, applies a function to the copy and emits the result of that function call.
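The second bullet could look roughly like the sketch below, which only covers the copy-clear-apply step on a full row and leaves out cancellation and termination. ZipRowSketch, tryZip() and the zipper parameter are hypothetical names, not part of the operator in this post.

```java
import java.util.Arrays;
import java.util.function.Function;

// Hypothetical sketch of the row-to-result step of a zip() variant.
final class ZipRowSketch {

    static final Object DONE = new Object();

    // Returns the zipped result, or null if the row is not complete
    // (a slot is still empty or its source is DONE).
    static <R> R tryZip(
            Object[] values,
            Function<? super Object[], ? extends R> zipper) {
        for (Object o : values) {
            if (o == null || o == DONE) {
                return null;
            }
        }
        // Hand out a copy so the shared array can be refilled safely.
        Object[] row = values.clone();
        Arrays.fill(values, null);
        return zipper.apply(row);
    }
}
```

Copying before clearing matters if the zipper function keeps a reference to the array: the next drain round will start writing fresh items into values right away.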


You can also turn it into a (rather clumsy) merge() operator that uses a round-robin collection strategy to pick the item to be emitted: an index (also saved into a field for subsequent drain rounds) that indicates which next slot to consider for emission if nonEmpty != 0, skipping over the DONE entries along the way.
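A sketch of that round-robin pick, assuming the same values array with null for empty slots and a DONE marker, might look like this. RoundRobinSketch and nextIndex() are hypothetical; the caller would store the returned index plus one in a field for the next drain round.

```java
// Hypothetical slot picker for a round-robin merge() variant.
final class RoundRobinSketch {

    static final Object DONE = new Object();

    // Returns the index of the first slot at or after start (wrapping
    // around) that holds an actual item, or -1 if there is none.
    static int nextIndex(Object[] values, int start) {
        int n = values.length;
        for (int k = 0; k < n; k++) {
            int i = (start + k) % n;
            Object o = values[i];
            if (o != null && o != DONE) {
                return i;
            }
        }
        return -1;
    }
}
```

Starting each search one past the previous winner prevents a fast source in a low slot from starving the sources after it.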

However, there is one likely problem that troubles such value-collecting-and-emitting operators: they do what Stephane Maldini, Project Reactor lead, once said our RxJava 2 / Reactor-Core 3 algorithms do, which is thread-stealing. Of all the sources, one of them will end up collecting and emitting the smallest item on behalf of all the other sources, while that thread itself likely won't make any progress of its own unless it finds a small pause in the onslaught of source items so the drain loop can quit.

This may be undesirable at times and there is a solution for it. To get there, we will investigate how thread switching in mid-flow can be implemented within the Java 9 Flow API in the next post.
