Tuesday, April 19, 2016

Operator fusion (part 2 - final)

Introduction


In the previous part, I introduced the concepts around operator fusion. In this post, I'll detail the API and protocols required to make operator fusion happen.

In its current form, operator fusion works between two subsequent operators. It relies on their ability to identify each other and, in the case of micro-fusion, to switch to a protocol other than Reactive-Streams (RS) if both agree.

Macro-fusion constructs


The primary targets of macro-fusion are the single-element sources: just(), empty() and fromCallable(). Firing up the complete RS infrastructure for a single element is quite expensive, yet half of the API use in RxJava and Reactor comes from these. Therefore, RxJava introduced Single and Reactor introduced Mono to help as much as possible and to offer ever more optimized operators on them.

However, knowing at assembly time that a source will generate 0 or 1 element is also a great help in regular Observable / Flux uses. In addition, knowing that the source is a constant helps inline it via some custom operator.

Creating 0 or 1 element synchronous sources


To indicate that a source returns a single value, the Reactive-Streams-Commons (Rsc) project (and Reactor, which builds on it) established a contract:

If a Publisher implements java.util.concurrent.Callable, it is considered a 0 or 1 element source.

You can implement Callable and return a non-null value that can be computed synchronously. You can also return null which indicates an empty result. (Remember, RS doesn't allow null values over onNext.) The call to call() will happen during subscription time.

public class MySingleSource implements Publisher<Object>, Callable<Object> {
    @Override
    public void subscribe(Subscriber<? super Object> s) {
        s.onSubscribe(new ScalarSubscription<>(s, System.currentTimeMillis()));
    }

    @Override
    public Object call() throws Exception {
        return System.currentTimeMillis();
    }
}
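ScalarSubscription is an Rsc class that isn't shown here. To give a rough idea of its contract, here is a minimal, hypothetical stand-in (not Rsc's actual code) that holds one value and emits it, followed by onComplete, on the first valid request. It uses java.util.concurrent.Flow (Java 9+), whose interfaces mirror the Reactive-Streams ones, so it runs without extra dependencies; Recorder and ScalarDemo are illustration-only helpers.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for Rsc's ScalarSubscription: emits one value plus
// onComplete on the first request, and nothing before a request arrives.
final class ScalarSubscription<T> implements Flow.Subscription {
    private final Flow.Subscriber<? super T> actual;
    private final T value;
    private final AtomicBoolean once = new AtomicBoolean(); // guards the single emission
    private volatile boolean cancelled;

    ScalarSubscription(Flow.Subscriber<? super T> actual, T value) {
        this.actual = actual;
        this.value = value;
    }

    @Override
    public void request(long n) {
        if (!once.compareAndSet(false, true)) {
            return;                              // already emitted (or errored) once
        }
        if (n <= 0) {                            // RS rule 3.9: non-positive requests are errors
            actual.onError(new IllegalArgumentException("n > 0 required but it was " + n));
            return;
        }
        if (!cancelled) {
            actual.onNext(value);
        }
        if (!cancelled) {
            actual.onComplete();
        }
    }

    @Override
    public void cancel() {
        cancelled = true;
    }
}

// Records every signal; requests nothing on its own.
final class Recorder<T> implements Flow.Subscriber<T> {
    final List<Object> events = new ArrayList<>();
    Flow.Subscription subscription;

    @Override public void onSubscribe(Flow.Subscription s) { subscription = s; }
    @Override public void onNext(T item) { events.add(item); }
    @Override public void onError(Throwable e) { events.add(e); }
    @Override public void onComplete() { events.add("complete"); }
}

final class ScalarDemo {
    static List<Object> run() {
        Recorder<Integer> r = new Recorder<>();
        Flow.Publisher<Integer> just42 = s -> s.onSubscribe(new ScalarSubscription<Integer>(s, 42));
        just42.subscribe(r);
        if (!r.events.isEmpty()) {
            throw new IllegalStateException("emitted before any request");
        }
        r.subscription.request(1);
        r.subscription.request(1);               // a second request must not emit again
        return r.events;
    }
}
```

The value is produced eagerly at subscription time but delivered lazily, which is what makes the single element backpressure-aware.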

If the 0 or 1 element source is known to be constant, it can be the subject of assembly-time optimizations. For example, if it returns null, indicating emptiness (like empty()), only a handful of operators (those that don't work on items) are relevant to it, and for the rest the assembly process can just return empty().

We can extend Callable with a new interface ScalarCallable to indicate a 0 or 1 element constant source.

public interface ScalarCallable<T> extends Callable<T> {
    @Override
    T call();
}


By extending Callable, any place that expects a 0 or 1 element dynamic source can work with a constant source as well. The reverse is not true: places expecting a constant source won't execute an arbitrary Callable (which could block or trigger side effects) at assembly time:

public class MyScalarSource implements Publisher<Object>, ScalarCallable<Object> {
    @Override
    public void subscribe(Subscriber<? super Object> s) {
        s.onSubscribe(new ScalarSubscription<>(s, 1));
    }

    @Override
    public Object call() {
        return 1;
    }
}

Note that ScalarCallable overrides the call() method and removes the throws Exception clause: scalar constants should not throw, and consumers should not need to wrap call() in a try-catch.

Consuming 0 or 1 element synchronous sources

Consuming Callable and ScalarCallable is a matter of instanceof checks, performed at subscription time or at assembly time respectively, followed by extracting the single value through call().

For example, a macro-fusion on the operator count() could check for a scalar value and return a constant 0 for an empty or 1 for a single value:


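public final Flux<Long> count() {
    if (this instanceof ScalarCallable) {

       @SuppressWarnings("unchecked")
       T value = ((ScalarCallable<T>)this).call();

       // a constant source counts to a constant: 0 if empty, 1 otherwise
       return just(value == null ? 0L : 1L);
    }
    return new FluxCount<>(this);
}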
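To see the count() shortcut end to end, here is a self-contained sketch against java.util.concurrent.Flow (Java 9+). ScalarCallable follows the contract described above; Just, CountFusion and constantOf() are hypothetical names, and the general counting operator (FluxCount above) is deliberately left out, so the non-scalar path simply throws.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.Flow;

// The contract from above: a constant 0..1 element source; null means "empty".
interface ScalarCallable<T> extends Callable<T> {
    @Override
    T call();                                   // no checked exception: constants never throw
}

// Hypothetical constant source in the spirit of just()/empty(); single-threaded sketch.
final class Just<T> implements Flow.Publisher<T>, ScalarCallable<T> {
    private final T value;

    Just(T value) { this.value = value; }

    @Override
    public T call() { return value; }

    @Override
    public void subscribe(Flow.Subscriber<? super T> s) {
        s.onSubscribe(new Flow.Subscription() {
            boolean finished;

            @Override
            public void request(long n) {
                if (finished || n <= 0) {
                    return;                     // error signalling for n <= 0 omitted
                }
                finished = true;
                if (value != null) {
                    s.onNext(value);
                }
                s.onComplete();
            }

            @Override
            public void cancel() { finished = true; }
        });
    }
}

final class CountFusion {
    // Assembly-time macro-fusion: the count of a constant source is itself a constant.
    static <T> Flow.Publisher<Long> count(Flow.Publisher<T> source) {
        if (source instanceof ScalarCallable) {
            Object v = ((ScalarCallable<?>) source).call();   // no try-catch needed
            return new Just<>(v == null ? 0L : 1L);
        }
        // The general, subscription-time counting operator is out of scope here.
        throw new UnsupportedOperationException("non-scalar count() omitted in this sketch");
    }

    // Reads the constant back out if the publisher is a ScalarCallable, else null.
    @SuppressWarnings("unchecked")
    static Long constantOf(Flow.Publisher<Long> p) {
        return p instanceof ScalarCallable ? ((ScalarCallable<Long>) p).call() : null;
    }
}
```

Because the decision happens at assembly time, no Subscriber is involved at all: the result of count() is itself a constant that further operators can inspect.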


Another example is a shortcut in flatMap(), concatMap() or switchMap() for 0 or 1 element sources. In this case, there is no need to run the full infrastructure; it is enough to subscribe to the Publisher returned by the mapping function.

Note that since the mapper function itself may have side effects, assembly-time optimization can't be applied here, and a new source operator has to be introduced instead:

public final <R> Px<R> flatMap(
        Function<? super T, ? extends Publisher<? extends R>> mapper) {

    if (this instanceof Callable) {
        // 0 or 1 element source: skip the full flatMap machinery
        return new PublisherCallableMap<>((Callable<T>)this, mapper);
    }

    return new PublisherFlatMap<>(this, mapper, ...);
}

(Remark: Px stands for Publisher Extensions in Rsc and is the base type for Rsc's fluent API: more of a convenience in tests and benchmarks, to avoid spelling out all those PublisherXXX classes, than a fully fledged API entry point.)


public final class PublisherCallableMap<T, R> implements Publisher<R> {
    final Callable<? extends T> source;
    final Function<? super T, ? extends Publisher<? extends R>> mapper;

    public PublisherCallableMap(
            Callable<? extends T> source,
            Function<? super T, ? extends Publisher<? extends R>> mapper) {
        this.source = source;
        this.mapper = mapper;
    }

    @Override
    public void subscribe(Subscriber<? super R> s) {
        T value;

        try {
            value = source.call();                                    // (1)
        } catch (Throwable ex) {
            ExceptionHelper.throwIfFatal(ex);
            EmptySubscription.error(s, ex);
            return;
        }

        if (value == null) {
            EmptySubscription.complete(s);
            return;
        }

        Publisher<? extends R> p;

        try {
            p = mapper.apply(value);                                  // (2)
        } catch (Throwable ex) {
            ExceptionHelper.throwIfFatal(ex);
            EmptySubscription.error(s, ex);
            return;
        }

        if (p == null) {
            EmptySubscription.error(s, 
                new NullPointerException("The mapper returned null"));
            return;
        }

        if (p instanceof Callable) {                                  // (3)
            R result;

            try {
                result = ((Callable<R>)p).call();
            } catch (Throwable ex) {
                ExceptionHelper.throwIfFatal(ex);
                EmptySubscription.error(s, ex);
                return;
            }

            if (result == null) {
                EmptySubscription.complete(s);
                return;
            }

            s.onSubscribe(new ScalarSubscription<>(s, result));

            return;
        }

        p.subscribe(s);
    }
}

First (1), we extract the single value from the underlying Callable. If it is null, we complete the Subscriber immediately. Otherwise, we call the mapper, which returns a Publisher (2). Since this Publisher could also be a Callable, we do the extraction again (3) and either complete the Subscriber or set a backpressure-enabled ScalarSubscription on it. Because call() and the mapper can throw, we catch any exception, rethrow the fatal ones in some library-specific way (ExceptionHelper.throwIfFatal here) and signal the non-fatal ones to the Subscriber (setting its Subscription at the right time as well).

Caution with Callable

Since Callable is an established interface, one must be careful with classes implementing both Publisher and Callable where the Callable part means something other than a shortcut to a 0 or 1 element source.

My hope is that since RS is relatively new and only a few people have actually implemented operators with it, we can avoid any pitfalls related to this combined interface approach.


Micro-fusion constructs

Unlike macro-fusion, micro-fusion requires a protocol switch between two subsequent operators: instead of the standard RS method calls, some or all of them get replaced by other method calls. This allows the two operators to share internal structures or state.

In theory, in a pair of operators, the upstream operator can be the initiator and work with the internals of the downstream operator. In practice, so far, we implemented fusion the other way around: the downstream operator works with the internals of the upstream operator.

However, going for a full custom interaction is not advised because that may lead to a complete custom implementation and duplication of a lot of code. (That being said, unfortunately, ConditionalSubscriber requires code duplication to avoid casting.)

Currently, Rsc and Reactor can do two kinds of micro-fusion: conditional and queue-based. Along a second dimension, we can think of 3 kinds of operators:

  • sources that support fusion (range(), UnicastProcessor)
  • intermediate operators that may support fusion (concatMap, observeOn, groupBy, window)
    • front fusion (concatMap)
    • back fusion (groupBy)
    • transitive fusion (map, filter)
  • consumers (flatMap inner, zip)
The third dimension appears with queue-based fusion, where the source can be synchronous (e.g., fromArray) or asynchronous (UnicastProcessor).


Conditional micro-fusion


The conditional micro-fusion ability is indicated by an interface: ConditionalSubscriber extending Subscriber with one extra method:

public interface ConditionalSubscriber<T> extends Subscriber<T> {
    boolean onNextIf(T t);
}

If a source or intermediate operator sees that its consumer is a ConditionalSubscriber it may call the onNextIf method. (By nature, this means a synchronous execution and response, thus conditional fusion is for synchronous cases only.)

If the method returns true, the value has been consumed as usual. If the method returns false, it means the value was dropped and a new value can be sent immediately. This avoids a request(1) call for a replenishment in filter and other operators as well.

Sidenote: You may ask, why is this important? A call to request() usually ends up in an atomic CAS, costing 21-45 cycles for each dropped element.

To work with ConditionalSubscribers in source operators, you may have to first switch on the incoming Subscriber's type and do a different implementation to avoid casting the downstream Subscriber all the time.


@Override
public void subscribe(Subscriber<? super Integer> s) {
    if (s instanceof ConditionalSubscriber) {

        s.onSubscribe(new RangeConditionalSubscription(
            (ConditionalSubscriber<? super Integer>)s, start, count));

    } else {
        s.onSubscribe(new RangeSubscription(s, start, count));
    }
}

The implementation can then use the onNextIf method during emissions. For example, the fast path can be rewritten as follows:


for (long i = start; i < (long)start + count; i++) {
    if (cancelled) {
        return;
    }
    s.onNextIf((int)i);
}
if (!cancelled) {
    s.onComplete();
}

You may ask, why call onNextIf if we don't care about the return value? For composition reasons: even though this path in range() doesn't need the return value, if the downstream also calls onNextIf further down, this can avoid a whole chain of unnecessary request(1) calls.

The slow path is more interesting in this regard:


long i = index;
long end = (long)start + count;
long r = requested;
long e = 0L;

while (i != end && e != r) {
    if (cancelled) {
       return;
    }
    
    if (s.onNextIf((int)i)) {
        e++;
    }
    i++;
}

if (i == end) {
    if (!cancelled) {
        s.onComplete();
    }
    return;
}

if (e != 0L) {
    index = i;
    REQUESTED.addAndGet(this, -e);
}

In the while loop, if the onNextIf returns false, we don't increment the emission count which means the next integer value can come immediately. If a downstream consumer requests only 1 and then drops all values, the loop can exhaust the available integers and not call the atomic addAndGet even once.

Since filter is one of the most common operators in a chain, one should be prepared to work with ConditionalSubscriber even in operators that don't interfere with the number of events flowing through. For example, map() and filter() often appear together, and it is advisable that map() also support conditional fusion by switching on the Subscriber's type just like above and using a ConditionalSubscriber-based Subscriber:

static final class MapConditionalSubscriber<T, R> implements ConditionalSubscriber<T> {
    final ConditionalSubscriber<? super R> actual;
    
    final Function<? super T, ? extends R> mapper;

    boolean done;

    Subscription s;

    // ...

    @Override
    public boolean onNextIf(T t) {
        if (done) {
            return true;    // already terminated: treat as consumed, no replenishment
        }

        R v;
        
        try {
            v = mapper.apply(t);
        } catch (Throwable ex) {
            ExceptionHelper.throwIfFatal(ex);
            s.cancel();
            onError(ex);
            return true;    // upstream is cancelled, don't ask for more
        }

        if (v == null) {
            s.cancel();
            onError(new NullPointerException("..."));
            return true;
        }

        return actual.onNextIf(v);
    }

    // ...
}


The final case for conditional micro-fusion is the "terminal" operator or consumer implementation. Luckily, one usually doesn't have to provide two implementations, one ConditionalSubscriber and one Subscriber, but can combine them into one: upstreams that can work with the ConditionalSubscriber part will do so, others will just use the regular Subscriber methods:


static final class FilterSubscriber<T> implements ConditionalSubscriber<T> {
    final Subscriber<? super T> actual;

    final Predicate<? super T> predicate;

    boolean done;

    Subscription s;

    // ...

    @Override
    public void onNext(T t) {

        if (!onNextIf(t)) {
           s.request(1);
        }
    }

    @Override
    public boolean onNextIf(T t) {
        if (done) {
            return true;    // already terminated: nothing to replenish
        }
        
        boolean pass;

        try {
            pass = predicate.test(t);
        } catch (Throwable ex) {
            ExceptionHelper.throwIfFatal(ex);
            s.cancel();
            onError(ex);
            return true;    // upstream is cancelled, don't ask for more
        }

        if (pass) {
            actual.onNext(t);
            return true;
        }
        return false;
    }

    // ...
}

In conclusion, conditional micro-fusion is a relatively simple but sometimes verbose way of avoiding request(1) calls and the resulting per-item overhead.
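The following sketch puts the pieces of this section together to make the saving measurable. Assumptions: java.util.concurrent.Flow (Java 9+) stands in for the RS interfaces, and CountingRange, Filter and ConditionalDemo are hypothetical, single-threaded simplifications (no overflow handling, re-entrancy handled with a plain flag). The range counts the request() calls it receives, with and without using onNextIf.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.function.Predicate;

interface ConditionalSubscriber<T> extends Flow.Subscriber<T> {
    boolean onNextIf(T t);   // true: consumed; false: dropped, the next value may follow at once
}

// Combined filter: plain onNext replenishes drops with request(1); onNextIf just reports them.
final class Filter<T> implements ConditionalSubscriber<T> {
    private final Flow.Subscriber<? super T> actual;
    private final Predicate<? super T> predicate;
    private Flow.Subscription s;

    Filter(Flow.Subscriber<? super T> actual, Predicate<? super T> predicate) {
        this.actual = actual;
        this.predicate = predicate;
    }
    @Override public void onSubscribe(Flow.Subscription s) {
        this.s = s;
        actual.onSubscribe(s);          // forwarded for brevity: no queue fusion in this sketch
    }
    @Override public void onNext(T t) {
        if (!onNextIf(t)) {
            s.request(1);               // replenish the dropped item
        }
    }
    @Override public boolean onNextIf(T t) {
        if (predicate.test(t)) {
            actual.onNext(t);
            return true;
        }
        return false;
    }
    @Override public void onError(Throwable e) { actual.onError(e); }
    @Override public void onComplete() { actual.onComplete(); }
}

// Single-threaded range that counts request() calls; 'conditional' switches onNextIf on/off.
final class CountingRange implements Flow.Publisher<Integer> {
    final int start, count;
    final boolean conditional;
    int requestCalls;

    CountingRange(int start, int count, boolean conditional) {
        this.start = start;
        this.count = count;
        this.conditional = conditional;
    }
    @Override
    @SuppressWarnings("unchecked")
    public void subscribe(Flow.Subscriber<? super Integer> s) {
        ConditionalSubscriber<? super Integer> cs = conditional && s instanceof ConditionalSubscriber
                ? (ConditionalSubscriber<? super Integer>) s : null;
        s.onSubscribe(new Flow.Subscription() {
            long requested;
            int index = start;
            boolean emitting, finished;

            @Override public void request(long n) {
                requestCalls++;
                requested += n;                 // overflow handling omitted
                if (emitting) return;           // re-entrant call: the running loop sees the new amount
                emitting = true;
                int end = start + count;
                while (requested > 0 && index < end && !finished) {
                    int v = index++;
                    if (cs != null) {
                        if (cs.onNextIf(v)) requested--;   // drops don't use up the request
                    } else {
                        requested--;
                        s.onNext(v);
                    }
                }
                emitting = false;
                if (index == end && !finished) { finished = true; s.onComplete(); }
            }
            @Override public void cancel() { finished = true; }
        });
    }
}

final class ConditionalDemo {
    static String run(boolean conditional) {
        List<Integer> received = new ArrayList<>();
        boolean[] completed = {false};
        Flow.Subscriber<Integer> sink = new Flow.Subscriber<Integer>() {
            @Override public void onSubscribe(Flow.Subscription sub) { sub.request(100); }
            @Override public void onNext(Integer v) { received.add(v); }
            @Override public void onError(Throwable e) { }
            @Override public void onComplete() { completed[0] = true; }
        };
        CountingRange range = new CountingRange(0, 10, conditional);
        range.subscribe(new Filter<Integer>(sink, v -> v % 2 == 0));
        return received + " requests=" + range.requestCalls + " completed=" + completed[0];
    }
}
```

With ten values and an even-number filter, the plain path sees the initial request plus one request(1) per dropped odd value (6 calls), while the conditional path sees only the initial request.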


Queue-based micro-fusion

Believe me when I say this is, so far, the most complicated thing in the reactive landscape. Not because it requires complicated structures or algorithms, but because of its implications for operators and the combinatorial explosion of what happens when op1 is followed by op2 and whether they can or can't fuse.

Queue-based micro-fusion builds on the observation that many operators employ a queue to handle backpressure- or asynchrony-related cases when notifying the downstream, and that subsequent operators' queues often happen to face each other.

For example, UnicastProcessor has a backend-queue that holds values until the downstream requests them whereas concatMap has a front-queue that holds the source values to be mapped into Publishers. When subscribed, a value goes from one queue into the other, forming a dequeue-enqueue pair without anything functional between the two other than the atomics overhead of request management and wip-accounting.

Clearly, if we could use a single queue between the two and thereby decrease the atomics overhead, we'd have a much lower overhead in terms of both computation and memory usage.

However, what if there is an operator between the two that does something with the values? What if the fusion shouldn't happen in this case?

To solve this coordination problem, we can reuse the onSubscribe(Subscription) rail in RS and extend the protocol. Enter QueueSubscription.


public interface QueueSubscription<T> extends Queue<T>, Subscription {

    int NONE = 0;
    int SYNC = 1;
    int ASYNC = 2;
    int ANY = SYNC | ASYNC;
    int THREAD_BOUNDARY = 4;

    int requestFusion(int mode);

    @Override
    default boolean offer(T t) {
        throw new UnsupportedOperationException();
    }

    // ...
}

QueueSubscription combines the Queue and Subscription interfaces and adds a new requestFusion() method. Apart from the following methods of the base interfaces, all other methods default to throwing UnsupportedOperationException, as we won't need them. (Note for Java 7 and earlier: without default methods, you have to do this manually in classes that can't extend a common base class.):


  • void request(long n)
  • void cancel()
  • T poll()
  • boolean isEmpty()
  • void clear()
(Some libraries may choose to implement size() as well, for diagnostic purposes.)
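With default methods, the defaulting described above could look like the following sketch (Java 9+, because of the private helper and java.util.concurrent.Flow standing in for RS). The constants mirror the interface shown earlier, but the method bodies, the unsupported() helper and the RangeQueue example are assumptions for illustration, not Rsc's source.

```java
import java.util.Collection;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.Flow;

// Only poll(), isEmpty(), clear(), request(), cancel() and requestFusion() stay
// abstract; every other Queue/Collection method throws by default.
interface QueueSubscription<T> extends Queue<T>, Flow.Subscription {
    int NONE = 0;
    int SYNC = 1;
    int ASYNC = 2;
    int ANY = SYNC | ASYNC;
    int THREAD_BOUNDARY = 4;

    int requestFusion(int mode);

    private static UnsupportedOperationException unsupported() {
        return new UnsupportedOperationException("Not part of the fusion protocol");
    }

    @Override default boolean offer(T t) { throw unsupported(); }
    @Override default boolean add(T t) { throw unsupported(); }
    @Override default T remove() { throw unsupported(); }
    @Override default T element() { throw unsupported(); }
    @Override default T peek() { throw unsupported(); }
    @Override default int size() { throw unsupported(); }
    @Override default boolean contains(Object o) { throw unsupported(); }
    @Override default Iterator<T> iterator() { throw unsupported(); }
    @Override default Object[] toArray() { throw unsupported(); }
    @Override default <U> U[] toArray(U[] a) { throw unsupported(); }
    @Override default boolean remove(Object o) { throw unsupported(); }
    @Override default boolean containsAll(Collection<?> c) { throw unsupported(); }
    @Override default boolean addAll(Collection<? extends T> c) { throw unsupported(); }
    @Override default boolean removeAll(Collection<?> c) { throw unsupported(); }
    @Override default boolean retainAll(Collection<?> c) { throw unsupported(); }
}

// Hypothetical SYNC-only range acting as the queue; the unfused path is omitted.
final class RangeQueue implements QueueSubscription<Integer> {
    private final long end;
    private long index;

    RangeQueue(int start, int count) {
        this.index = start;
        this.end = (long) start + count;
    }
    @Override public Integer poll() {
        if (index == end) {
            return null;                          // SYNC mode: null means completion
        }
        return (int) index++;
    }
    @Override public boolean isEmpty() { return index == end; }
    @Override public void clear() { index = end; }
    @Override public int requestFusion(int mode) { return mode & SYNC; }
    @Override public void request(long n) { }     // not used in SYNC mode
    @Override public void cancel() { clear(); }
}
```

Implementors then only write the handful of methods fusion actually uses, and an accidental call to anything else fails loudly instead of silently misbehaving.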


When a source supports queue-based fusion, it can send a QueueSubscription implementation through onSubscribe. Those who can deal with it can act on it, the rest will simply see it as a regular Subscription.

The idea is that those who can deal with it can use it as a Queue instead of instantiating their own, saving on allocation and overhead at the same time. In addition, a source such as range() can itself pretend to be a queue, returning the next value through poll() or null if no more integers remain.

Since there are cases where fusion can't or should not happen, we need to perform a protocol switch during the subscription phase of a flow. This switch is requested via the requestFusion() method, which takes and returns the constants defined in the interface.

(Sidenote: I know enums would be more readable, but EnumSet has a nice additional overhead you know...)

As an input, it can take:


  • SYNC - indicates the consumer wants to work with a synchronous upstream, with often known length
  • ASYNC - indicates the consumer wants to work with an asynchronous upstream with often unknown length and emission timing
  • ANY - indicates a consumer can work with both SYNC and ASYNC upstream
  • (SYNC, ASYNC) | THREAD_BOUNDARY - indicates that the consumer goes over a thread boundary and poll() happens on some other thread.

It can return:


  • NONE - fusion can't happen/rejected
  • SYNC - synchronous fusion mode activated
  • ASYNC - asynchronous fusion mode activated
If the upstream is unable to work in the requested mode or is sensitive to thread-boundary effects, it can return NONE. In this case, the flow behaves just like a regular, non-fused RS stream would. (Note that conditional fusion may still be an option.)

Because fusion is optional, a successfully negotiated mode requires a different mode of execution in one or both parties. In addition, this mode switch has to happen before any events flow through the chain; therefore, onSubscribe is the ideal place for it, thanks to the underlying RS protocol rules.

Both SYNC and ASYNC modes have extra rules implementors must adhere to.

In SYNC mode, consumers should never call request(), and producers should never return null from poll() unless they mean completion. Since the only interaction between the two is through poll() and isEmpty(), sources have no opportunity to call onError and must instead throw a runtime exception from these two methods. On the other side, consumers now have to wrap these calls in try-catch and handle/unwrap the exceptions there.

In ASYNC mode, the producer enqueues events in its own queue and has to signal their availability to the consumer. The best way to do this is through onNext: one can signal either the value itself or null (this is the only place where a null onNext is allowed). On the consumer side, the value received in onNext is meaningless in ASYNC mode and should be ignored. The other methods, onError, onComplete, request and cancel, should be used as in regular RS cases. In this mode, poll() can return null, indicating a temporary lack of values; termination is indicated by onError and onComplete as usual.


Implementing fusion-enabled sources

Now let's see the API in action. First, let's make range() fusion enabled:

static final class RangeSubscription implements QueueSubscription<Integer> {
    
    // ... the Subscription part is the same

    @Override
    public Integer poll() {
        long i = index;
        if (i == (long)start + count) {
            return null;
        }
        index = i + 1;
        return (int)i;
    }

    @Override
    public boolean isEmpty() {
        return index == (long)start + count;
    }

    @Override
    public void clear() {
        index = (long)start + count;
    }

    @Override
    public int requestFusion(int mode) {
        return mode & SYNC;    // grant SYNC only if the consumer asked for it
    }
}

No sign of request accounting whatsoever: in SYNC mode, range() works as a synchronous pull source, and the consumer applies backpressure simply by calling poll() only when it needs a new value.
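A minimal sketch of the consumer side of SYNC fusion against such a range: the consumer negotiates the mode, never calls request(), and treats null from poll() as completion. The QueueSubscription here is trimmed down (the real one also extends Queue), java.util.concurrent.Flow (Java 9+) stands in for RS, and SyncConsumer.drainSync() is a hypothetical helper.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

// Trimmed-down QueueSubscription (the full one also extends Queue).
interface QueueSubscription<T> extends Flow.Subscription {
    int NONE = 0, SYNC = 1, ASYNC = 2, ANY = SYNC | ASYNC, THREAD_BOUNDARY = 4;
    int requestFusion(int mode);
    T poll();            // in SYNC mode, null means the sequence has ended
    boolean isEmpty();
    void clear();
}

final class RangeSubscription implements QueueSubscription<Integer> {
    private final long end;
    private long index;

    RangeSubscription(int start, int count) {
        this.index = start;
        this.end = (long) start + count;
    }
    @Override public Integer poll() {
        long i = index;
        if (i == end) {
            return null;                  // exhausted: completion in SYNC mode
        }
        index = i + 1;
        return (int) i;
    }
    @Override public boolean isEmpty() { return index == end; }
    @Override public void clear() { index = end; }
    // Grant SYNC only if it was asked for; range() runs no user code, so this
    // sketch ignores THREAD_BOUNDARY.
    @Override public int requestFusion(int mode) { return mode & SYNC; }
    @Override public void request(long n) { }   // regular RS emission path omitted in this sketch
    @Override public void cancel() { clear(); }
}

final class SyncConsumer {
    // Drains a SYNC-fused upstream: no request() calls, null from poll() ends the loop.
    static <T> List<T> drainSync(QueueSubscription<T> qs) {
        if (qs.requestFusion(QueueSubscription.ANY) != QueueSubscription.SYNC) {
            throw new IllegalStateException("SYNC not granted; the regular RS path would be needed");
        }
        List<T> out = new ArrayList<>();
        for (T v = qs.poll(); v != null; v = qs.poll()) {
            out.add(v);
        }
        return out;
    }
}
```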

UnicastProcessor (which is somewhat like onBackpressureBuffer()) can support fusion in ASYNC mode specifically:


public final class UnicastProcessor<T> implements Processor<T, T>, QueueSubscription<T> {

    volatile Subscriber<? super T> actual;

    final Queue<T> queue;

    int mode;

    // ...

    @Override
    public void onNext(T t) {
        queue.offer(t);                 // the value always goes into the queue
        Subscriber<? super T> a = actual;
        if (mode == ASYNC && a != null) {
            a.onNext(null);             // fused: only signal availability
        } else {
            drain();
        }
    }

    @Override
    public int requestFusion(int m) {
        if ((m & ASYNC) != 0) {
            mode = ASYNC;
            return ASYNC;
        }
        return NONE;
    }

    @Override
    public T poll() {
        return queue.poll();
    }

    @Override
    public boolean isEmpty() {
        return queue.isEmpty();
    }

    @Override
    public void clear() {
        queue.clear();
    }

    @Override
    public void subscribe(Subscriber<? super T> s) {
        if (ONCE.compareAndSet(this, 0, 1)) {
            s.onSubscribe(this);
            actual = s;
            if (cancelled) {
                actual = null;
            } else {
                if (mode != NONE) {
                    if (done) {
                        if (error != null) {
                            s.onError(error);
                        } else {
                            s.onComplete();
                        }
                    } else {
                        s.onNext(null);
                    }
                } else {
                    drain();
                }
            }
        } else {
            EmptySubscription.error(s, new IllegalStateException("..."));
        }
    }
}

The fusion mode requires the following behavior changes:


  • onNext has to call actual.onNext instead of drain(),
  • requestFusion has to see if the downstream actually wants ASYNC fusion,
  • the queue methods have to be delegated to the instance queue,
  • the subscribe() has to call actual.onNext instead of drain() as well.

Doesn't look too complicated, does it? At this point, you can check your understanding of supporting fusion through an exercise: can UnicastProcessor support SYNC fusion and if so, when and how; if not, why not?
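Here is a single-threaded sketch of the ASYNC handshake described above, with the unfused drain() path left out. MiniUnicast and AsyncFusedConsumer are hypothetical, cut-down stand-ins (not Reactor's or Rsc's UnicastProcessor), using java.util.concurrent.Flow (Java 9+) and a trimmed QueueSubscription. Values always go into the processor's queue; the consumer only receives onNext(null) as a wake-up call and polls the shared queue.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.Flow;

// Trimmed-down QueueSubscription (the full one also extends Queue).
interface QueueSubscription<T> extends Flow.Subscription {
    int NONE = 0, SYNC = 1, ASYNC = 2;
    int requestFusion(int mode);
    T poll();            // in ASYNC mode, null only means "nothing available right now"
    boolean isEmpty();
    void clear();
}

// Hypothetical, single-threaded cut-down of UnicastProcessor's ASYNC fusion path.
final class MiniUnicast<T> implements Flow.Publisher<T>, QueueSubscription<T> {
    private final Queue<T> queue = new ArrayDeque<>();
    private Flow.Subscriber<? super T> actual;
    private int mode = NONE;

    @Override public void subscribe(Flow.Subscriber<? super T> s) {
        s.onSubscribe(this);                 // the consumer negotiates the mode in here
        actual = s;
        if (mode == ASYNC && !queue.isEmpty()) {
            s.onNext(null);                  // announce values buffered before subscription
        }
    }
    // Producer side (a Processor's onNext/onComplete in the real thing).
    void onNext(T t) {
        queue.offer(t);                      // the value always lands in the shared queue
        if (mode == ASYNC && actual != null) {
            actual.onNext(null);             // availability signal only
        }                                    // the unfused drain() path is omitted here
    }
    void onComplete() {
        if (actual != null) {
            actual.onComplete();
        }
    }
    @Override public int requestFusion(int m) {
        if ((m & ASYNC) != 0) {
            mode = ASYNC;
            return ASYNC;
        }
        return NONE;
    }
    @Override public T poll() { return queue.poll(); }
    @Override public boolean isEmpty() { return queue.isEmpty(); }
    @Override public void clear() { queue.clear(); }
    @Override public void request(long n) { }   // unbounded queue, consumer pulls: nothing tracked
    @Override public void cancel() { queue.clear(); }
}

// Consumer that asks for ASYNC fusion and drains the shared queue on every signal.
final class AsyncFusedConsumer<T> implements Flow.Subscriber<T> {
    final List<T> received = new ArrayList<>();
    boolean completed;
    private QueueSubscription<T> qs;

    @Override @SuppressWarnings("unchecked")
    public void onSubscribe(Flow.Subscription s) {
        if (s instanceof QueueSubscription) {
            QueueSubscription<T> q = (QueueSubscription<T>) s;
            if (q.requestFusion(QueueSubscription.ASYNC) == QueueSubscription.ASYNC) {
                qs = q;
            }
        }
        s.request(Long.MAX_VALUE);           // requests still go upstream in ASYNC mode
    }
    @Override public void onNext(T t) {
        if (qs == null) {
            received.add(t);                 // unfused: a real value
        } else {
            drain();                         // fused: t is meaningless, just poll
        }
    }
    @Override public void onError(Throwable e) { completed = true; }
    @Override public void onComplete() {
        if (qs != null) drain();             // pick up anything still queued
        completed = true;
    }
    private void drain() {
        for (T v = qs.poll(); v != null; v = qs.poll()) received.add(v);
    }
}

final class AsyncFusionDemo {
    static List<Integer> run() {
        MiniUnicast<Integer> processor = new MiniUnicast<>();
        AsyncFusedConsumer<Integer> consumer = new AsyncFusedConsumer<>();
        processor.onNext(1);                 // buffered before anyone subscribes
        processor.subscribe(consumer);
        processor.onNext(2);
        processor.onNext(3);
        processor.onComplete();
        return consumer.received;
    }
}
```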


Implementing fusion-enabled intermediate operators

In practice, there are usually some intermediate operators between a fuseable source and a fusion-enabled consumer. Unfortunately, these can break fusion (reverting to the classical RS mode) or, worse, the data may skip the intermediate operator altogether, causing all sorts of failures.

The latter manifests itself when an operator forwards the Subscription it received via its onSubscribe method. Now imagine if map() does this; what would be the output of the following sequence:

range(0, 10).map(v -> v + 1).concatMap(v -> just(v)).subscribe(System.out::println);

In a classical flow, you'd get values 1 through 10 printed to the console. If both range() and concatMap() do fusion but map() forwards its Subscription, the surprising output is 0 through 9! This can affect any operator.

The solution is to require all operators that don't want to participate in fusion to never forward the upstream's Subscription verbatim. One way to follow this rule is to implement Subscription yourself:


static final class MapSubscriber<T, R> implements Subscriber<T>, Subscription {
    // ...
     
    @Override
    public void onSubscribe(Subscription s) {
        this.s = s;

        actual.onSubscribe(this);
    }

    @Override
    public void request(long n) {
        s.request(n);
    }

    @Override
    public void cancel() {
        s.cancel();
    }

    // ...
}

In practice, many operators that manipulate requests or cancellation already do this, so the extra indirection is an acceptable trade-off for the benefit of a lower-overhead dataflow in general.
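The range(0, 10).map(v -> v + 1) scenario from above can be reproduced in miniature. In this sketch (hypothetical classes, java.util.concurrent.Flow instead of RS, a simplified non-reentrant range), the downstream fuses whenever it sees a QueueSubscription, standing in for concatMap. A map() that forwards the upstream Subscription lets the consumer poll range() directly and skip the mapper, while a map() that passes itself as the Subscription does not.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.function.Function;

// Trimmed-down QueueSubscription (the full one also extends Queue).
interface QueueSubscription<T> extends Flow.Subscription {
    int NONE = 0, SYNC = 1;
    int requestFusion(int mode);
    T poll();
}

// Single-threaded range: SYNC fusion plus a simplified, non-reentrant regular path.
final class RangeSub implements QueueSubscription<Integer> {
    private final Flow.Subscriber<? super Integer> actual;
    private final long end;
    private long index;
    private boolean done;

    RangeSub(Flow.Subscriber<? super Integer> actual, int start, int count) {
        this.actual = actual;
        this.index = start;
        this.end = (long) start + count;
    }
    @Override public int requestFusion(int mode) { return mode & SYNC; }
    @Override public Integer poll() {
        if (index == end) return null;
        return (int) index++;
    }
    @Override public void request(long n) {
        for (long e = 0; e < n && index < end; e++) actual.onNext((int) index++);
        if (index == end && !done) { done = true; actual.onComplete(); }
    }
    @Override public void cancel() { index = end; done = true; }
}

// map() that wrongly hands the upstream Subscription straight to its downstream.
class LeakyMap<T, R> implements Flow.Subscriber<T> {
    final Flow.Subscriber<? super R> actual;
    final Function<? super T, ? extends R> mapper;

    LeakyMap(Flow.Subscriber<? super R> actual, Function<? super T, ? extends R> mapper) {
        this.actual = actual;
        this.mapper = mapper;
    }
    @Override public void onSubscribe(Flow.Subscription s) { actual.onSubscribe(s); } // leaks identity
    @Override public void onNext(T t) { actual.onNext(mapper.apply(t)); }
    @Override public void onError(Throwable e) { actual.onError(e); }
    @Override public void onComplete() { actual.onComplete(); }
}

// Same map(), but it puts itself in between as the Subscription.
final class HidingMap<T, R> extends LeakyMap<T, R> implements Flow.Subscription {
    private Flow.Subscription upstream;

    HidingMap(Flow.Subscriber<? super R> actual, Function<? super T, ? extends R> mapper) {
        super(actual, mapper);
    }
    @Override public void onSubscribe(Flow.Subscription s) {
        upstream = s;
        actual.onSubscribe(this);
    }
    @Override public void request(long n) { upstream.request(n); }
    @Override public void cancel() { upstream.cancel(); }
}

// Downstream that fuses (SYNC) whenever its Subscription is a QueueSubscription.
final class FusingConsumer implements Flow.Subscriber<Object> {
    final List<Object> out = new ArrayList<>();

    @Override public void onSubscribe(Flow.Subscription s) {
        if (s instanceof QueueSubscription) {
            QueueSubscription<?> qs = (QueueSubscription<?>) s;
            if (qs.requestFusion(QueueSubscription.SYNC) == QueueSubscription.SYNC) {
                for (Object v = qs.poll(); v != null; v = qs.poll()) out.add(v);
                return;
            }
        }
        s.request(Long.MAX_VALUE);
    }
    @Override public void onNext(Object v) { out.add(v); }
    @Override public void onError(Throwable e) { }
    @Override public void onComplete() { }
}

final class ForwardingDemo {
    static List<Object> run(boolean leaky) {
        FusingConsumer consumer = new FusingConsumer();
        Function<Integer, Integer> plusOne = v -> v + 1;
        LeakyMap<Integer, Integer> map;
        if (leaky) {
            map = new LeakyMap<>(consumer, plusOne);
        } else {
            map = new HidingMap<>(consumer, plusOne);
        }
        map.onSubscribe(new RangeSub(map, 0, 10));   // range(0, 10).map(v -> v + 1)
        return consumer.out;
    }
}
```

The leaky variant yields 0 through 9 because the mapper never runs; the hiding variant falls back to the regular path and yields 1 through 10.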

Unfortunately, this rule affects cross-library behavior as well. Even though other libraries may not speak the same fusion protocol, they could end up forwarding Subscriptions; thus, if you go into and out of some other library, the same problem may appear again. Generally, libraries are supposed to have a method hide() or asObservable() that hides the identity of a source and prevents the propagation of unwanted internal features.

Luckily, map() can participate in the fusion: it only has to be fuseable itself, mediate the requestFusion between its upstream and downstream, plus place itself at the exit point: poll().


static final class MapSubscriber<T, R> implements Subscriber<T>, QueueSubscription<R> {
    final Subscriber<? super R> actual;
    
    final Function<? super T, ? extends R> mapper;

    QueueSubscription<T> qs;

    Subscription s;

    int mode;

    // ...

    @Override
    public void onSubscribe(Subscription s) {
        this.s = s;
        if (s instanceof QueueSubscription) {
            qs = (QueueSubscription<T>)s;
        }

        actual.onSubscribe(this);
    }

    @Override
    public void onNext(T t) {
        if (mode == NONE) {
            
            // error handling omitted for brevity

            actual.onNext(mapper.apply(t));

        } else {
            actual.onNext(null);
        }
    }

    @Override
    public int requestFusion(int m) {
        if (qs == null || (m & THREAD_BOUNDARY) != 0) {
            return NONE;
        }
        int u = qs.requestFusion(m);
        mode = u;
        return u;
    }

    @Override
    public R poll() {
        T t = qs.poll();
        if (t == null) {
            return null;
        }
        return mapper.apply(t);
    }

    @Override
    public boolean isEmpty() {
        return qs.isEmpty();
    }

    @Override
    public void clear() {
        qs.clear();
    }
}

The operator map() can implement QueueSubscription itself and have a field for the potential upstream's QueueSubscription as well. In requestFusion, if the upstream does support fusion and the downstream isn't a boundary, the request is forwarded to upstream; rejected otherwise.

Now poll() can't just forward to the upstream because the types are different. Here comes the mapper function that is applied to the upstream's value. Note that null indicates termination or temporary lack of values and should not be mapped.

The main reason THREAD_BOUNDARY was introduced as a flag is map(), or, in a broader sense, the restriction on where user-supplied computations happen. In fusion mode, the mapper function is executed on the exit side of the queue, which could be on some other thread. Now imagine you have a heavy computation in map() that would run off the main thread before reaching an observeOn. When unfused, the result of the computation would be queued up in observeOn and then dequeued on the target thread (let's say the main thread). However, if fusion is allowed, the target thread is the one calling poll(), and the heavy calculation now runs on the main thread.
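A sketch of the queue side of a fusion-aware map(), including the THREAD_BOUNDARY rule just described. The Subscriber side (onSubscribe/onNext) is omitted; RangeQs, MapQs and MapFusionDemo are hypothetical, and java.util.concurrent.Flow (Java 9+) stands in for RS.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.function.Function;

// Trimmed-down QueueSubscription (the full one also extends Queue).
interface QueueSubscription<T> extends Flow.Subscription {
    int NONE = 0, SYNC = 1, ASYNC = 2, ANY = SYNC | ASYNC, THREAD_BOUNDARY = 4;
    int requestFusion(int mode);
    T poll();
    boolean isEmpty();
    void clear();
}

// SYNC-only range exposed as a queue; its regular emission path is omitted.
final class RangeQs implements QueueSubscription<Integer> {
    private final long end;
    private long index;

    RangeQs(int start, int count) {
        this.index = start;
        this.end = (long) start + count;
    }
    @Override public int requestFusion(int mode) { return mode & SYNC; }
    @Override public Integer poll() {
        if (index == end) return null;
        return (int) index++;
    }
    @Override public boolean isEmpty() { return index == end; }
    @Override public void clear() { index = end; }
    @Override public void request(long n) { }   // regular path omitted in this sketch
    @Override public void cancel() { clear(); }
}

// The queue side of a fusion-aware map(); its Subscriber side is left out.
final class MapQs<T, R> implements QueueSubscription<R> {
    private final QueueSubscription<T> upstream;
    private final Function<? super T, ? extends R> mapper;

    MapQs(QueueSubscription<T> upstream, Function<? super T, ? extends R> mapper) {
        this.upstream = upstream;
        this.mapper = mapper;
    }
    @Override public int requestFusion(int mode) {
        if ((mode & THREAD_BOUNDARY) != 0) {
            return NONE;             // keep the mapper on the producer side of the boundary
        }
        return upstream.requestFusion(mode);
    }
    @Override public R poll() {
        T t = upstream.poll();
        return t == null ? null : mapper.apply(t);   // null is a signal and is never mapped
    }
    @Override public boolean isEmpty() { return upstream.isEmpty(); }
    @Override public void clear() { upstream.clear(); }
    @Override public void request(long n) { upstream.request(n); }
    @Override public void cancel() { upstream.cancel(); }
}

final class MapFusionDemo {
    static <T> List<T> drainSync(QueueSubscription<T> qs) {
        List<T> out = new ArrayList<>();
        for (T v = qs.poll(); v != null; v = qs.poll()) out.add(v);
        return out;
    }
}
```

Rejecting the boundary case costs the fused queue, but it keeps map() running where the user wrote it.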


The operator filter() can be implemented in a similar fashion, but, unfortunately, our old friend request(1) comes back:


static final class FilterSubscriber<T> implements Subscriber<T>, QueueSubscription<T> {
    // ...

    @Override
    public T poll() {
        for (;;) {
            T v = qs.poll();

            if (v == null || cancelled) {
                return null;
            }

            if (predicate.test(v)) {
                return v;
            }

            if (mode == ASYNC) {
                qs.request(1);
            }
        }
    }

    @Override
    public boolean isEmpty() {
        return qs.isEmpty();
    }

    // ...
}

Since filter() drops values, we need to loop in poll() until the predicate matches or no more upstream values are available for some reason. If the predicate doesn't match, we have to replenish our ASYNC source (remember, you are not supposed to call request() in sync mode!).
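The following sketch makes this trade-off visible by counting how many request(1) calls reach the upstream in ASYNC versus SYNC mode. CountingUpstream, FilterQs and FilterFusionDemo are hypothetical, trimmed-down classes built on java.util.concurrent.Flow (Java 9+); FilterQs also rejects THREAD_BOUNDARY, on the assumption that a user-supplied predicate deserves the same treatment as map()'s mapper.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.Flow;
import java.util.function.Predicate;

// Trimmed-down QueueSubscription (the full one also extends Queue).
interface QueueSubscription<T> extends Flow.Subscription {
    int NONE = 0, SYNC = 1, ASYNC = 2, THREAD_BOUNDARY = 4;
    int requestFusion(int mode);
    T poll();
}

// Test upstream: grants the given mode (if asked for) and counts request() calls.
final class CountingUpstream implements QueueSubscription<Integer> {
    private final Queue<Integer> queue = new ArrayDeque<>();
    private final int grant;
    int requestCalls;

    CountingUpstream(int grant, Integer... values) {
        this.grant = grant;
        Collections.addAll(queue, values);
    }
    @Override public int requestFusion(int mode) { return mode & grant; }
    @Override public Integer poll() { return queue.poll(); }
    @Override public void request(long n) { requestCalls++; }
    @Override public void cancel() { queue.clear(); }
}

// The queue side of a fusion-aware filter(): poll() loops past rejected values.
final class FilterQs<T> implements QueueSubscription<T> {
    private final QueueSubscription<T> upstream;
    private final Predicate<? super T> predicate;
    private int mode = NONE;
    private boolean cancelled;

    FilterQs(QueueSubscription<T> upstream, Predicate<? super T> predicate) {
        this.upstream = upstream;
        this.predicate = predicate;
    }
    @Override public int requestFusion(int m) {
        if ((m & THREAD_BOUNDARY) != 0) {
            return NONE;                 // the predicate is user code, like map()'s mapper
        }
        mode = upstream.requestFusion(m);
        return mode;
    }
    @Override public T poll() {
        for (;;) {
            T v = upstream.poll();
            if (v == null || cancelled) {
                return null;
            }
            if (predicate.test(v)) {
                return v;
            }
            if (mode == ASYNC) {
                upstream.request(1);     // replenish the drop; never request() in SYNC mode
            }
        }
    }
    @Override public void request(long n) { upstream.request(n); }
    @Override public void cancel() { cancelled = true; upstream.cancel(); }
}

final class FilterFusionDemo {
    static String run(int grantedMode) {
        CountingUpstream up = new CountingUpstream(grantedMode, 1, 2, 3, 4, 5, 6);
        FilterQs<Integer> evens = new FilterQs<>(up, v -> v % 2 == 0);
        int mode = evens.requestFusion(QueueSubscription.SYNC | QueueSubscription.ASYNC);
        List<Integer> out = new ArrayList<>();
        for (Integer v = evens.poll(); v != null; v = evens.poll()) out.add(v);
        return "mode=" + mode + " " + out + " requests=" + up.requestCalls;
    }
}
```

In ASYNC mode each dropped odd value costs one request(1); in SYNC mode the same data flows with no request() calls at all.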


Implementing fusion-enabled consumers

Generally, operator fusion is not very useful with (and rarely happens with) end-subscribers, such as your favorite Subscriber subclass or subscribe(System.out::println).

The consumers I'm talking about can be considered intermediate operators as well, but since all operators are basically custom Subscribers that are subscribed to the upstream, they are consumers as well.

As I mentioned, many operators feature some internal queue on their front side (e.g., concatMap, observeOn) or when they consume some inner Publisher (e.g., flatMap, zip). These are the primary consumers and drivers of the fusion lifecycle.

Now that we are familiar with how observeOn is implemented, let's see how we can enable fusion in it:


static final class ObserveOnSubscriber<T> implements Subscriber<T>, Subscription {

    Queue<T> queue;

    int mode;

    Subscription s;

    // ...

    @Override
    public void onSubscribe(Subscription s) {
         this.s = s;

         if (s instanceof QueueSubscription) {
             QueueSubscription<T> qs = (QueueSubscription<T>)s;

             int m = qs.requestFusion(QueueSubscription.ANY
                  | QueueSubscription.THREAD_BOUNDARY);

             if (m == QueueSubscription.SYNC) {
                 queue = qs;
                 mode = m;
                 done = true;
                 
                 actual.onSubscribe(this);
                 
                 return;
             }

             if (m == QueueSubscription.ASYNC) {
                 queue = qs;
                 mode = m;

                 actual.onSubscribe(this);

                 s.request(prefetch);

                 return;
             }       
         }

         queue = new SpscArrayQueue<>(prefetch);
         
         actual.onSubscribe(this);

         s.request(prefetch);
    }

    @Override
    public void onNext(T t) {
        if (mode == QueueSubscription.NONE) {
            queue.offer(t);
        }
        
        drain();
    }
    
    void drain() {

        // ...
             
            if (mode != QueueSubscription.SYNC) {
                request(p);
            }

        // ...

    }

    // ...

}

Enabling fusion has two implications: 1) queue can no longer be final but has to be created in onSubscribe, 2) onNext should not offer if fusion is enabled.

The fusion mode is requested in onSubscribe after identifying the upstream as a QueueSubscription. Since the algorithm inside drain() only sees the Queue interface and doesn't particularly care when values become available in the queue, we request ANY mode from the upstream, while also indicating that the consumer is a THREAD_BOUNDARY. This should prevent a fused poll() from unexpectedly changing the thread on which some user-defined function runs.

If SYNC mode is granted, we assign the QueueSubscription to our queue field and call onSubscribe on the downstream Subscriber. In this mode, the prefetch amount is not requested, in accordance with the synchronous fusion protocol. The big win in SYNC mode is that poll() returning null is an indication of termination. We already exploit this in the standard queue-drain algorithm: if the done flag is set and the queue reports null/empty, we have completed. Note, however, that we have to adjust the drain algorithm a bit because we can't call request in SYNC mode anymore.

If ASYNC mode is granted, we store the queue again, but can't set the done flag as we don't know when the upstream finishes - poll() returning null is just the indication of unavailability of values at the time. In addition, once the downstream Subscriber is notified, we still have to signal a prefetch-request to upstream, so it can trigger its own sources even further up.

Note that once requestFusion returns SYNC or ASYNC, there is no going back (you may try to call requestFusion() again which may change the mode, but that's undefined behavior at the moment; it may be forbidden entirely in the future), definitely not after elements have been delivered already in any mode.

General warnings around micro-fusion

In my experience, some of my colleagues tend to become enthusiastic about micro-fusion; they want to apply it everywhere. Whenever an operator has any queue, they see fusion happening.

I must warn against such relentlessness because fusion has some requirements and implications and is generally subject to cost-benefit trade-offs:


  • If an operator is a thread boundary, my current understanding is that you can't fuse both its front and back side at the same time.
  • Fusion can shift computations in time and sometimes in location (even without an explicit boundary).
  • The fact an operator has a queue doesn't mean it can be exposed/replaced. A good example of this is combineLatest: my current understanding is that the post-processing of the queue elements makes this infeasible for back side fusion. Another example is flatMap where I'm not convinced the collector logic can be integrated into a poll()/isEmpty() back-side fusion.
  • Some sources, such as 0 or 1 element sources, are likely not worth it and are better off with macro-fusion.
  • Fusion is an extra behavior that can itself be buggy or, in fact, hide a bug on the regular path (e.g., groupBy), and it requires extra care. In addition, it increases the test method count because now you have to test with and without fusion (see hide()).

To cheer you up, there is a great counter-example operator that supports full fusion: front and back side at the same time: flattenIterable, or as you may know it, concatMapIterable/flatMapIterable.

Conclusion

In this post, I've detailed the structures and protocols of operator fusion and shown some examples how it can be utilized in source, intermediate and terminal operators.

Since operator fusion is an active research area, I can't say these are all the forms it can take, and we are eager to hear about interesting chains of operators where fusion can happen or, in contrast, where fusion should not happen. See the Rsc repository for examples of all kinds of fusion.

In addition, I hope these fusion protocols will be standardized and be part of Reactive-Streams 2.0, allowing a full, cross-library efficient operation that maintains fusion as long as possible.

My next topic will be to finish up the series about ConnectableObservables.
