Monday, May 2, 2016

Async Iterable/Enumerable vs. Reactive-Streams

Introduction


Backpressure is essential if one wants to avoid buffer bloat and excessive memory usage when two stages in a reactive pipeline consume events at different speeds. RxJava and Reactive-Streams developed a non-blocking, request-coordinating protocol to solve this problem, but you may have heard there are alternatives to it. One alternative that comes up from time to time is Async Iterables (Java terminology) or Async Enumerables (C# terminology).

In fact, Rx.NET has an Ix.NET (Interactive Extensions) sub-project which contains the Async Enumerables library. It solves the backpressure problem by returning a Task (~ CompletableFuture, ~ Promise) from its MoveNext() (~ hasNext()) method; when that Task fires, you can consume the Current property (~ next() method). The backpressure behavior comes from the fact that you call MoveNext() again only after you have processed the current element.

Unfortunately, I haven't found a Java implementation of IAsyncEnumerable (I haven't really looked beyond a few Google searches), so I decided to implement it myself in Java 8, to see what it takes to get data across with it and how performant it is compared to my current cutting-edge understanding of reactive flows: the Reactive-Streams-Commons library.


Base API


Since Async Enumerables are designed with deferred execution in mind, the base API consists of two interfaces:

interface IAsyncEnumerable<T> {
    IAsyncEnumerator<T> enumerator();
}

interface IAsyncEnumerator<T> {

    CompletionStage<Boolean> moveNext(CompositeSubscription cancel);

    T current();
}

The IAsyncEnumerable is the equivalent of Iterable and hands out IAsyncEnumerators. IAsyncEnumerator has a moveNext method which returns a CompletionStage indicating whether a value is available via current() (signals true) or the sequence has ended (signals false). C#'s CancellationToken resembles RxJava's CompositeSubscription, so I'm reusing the latter for cancellation.

(Sidenote: I'm not sure how cancellation composes yet, the original Ix.NET IAsyncEnumerator is an IDisposable plus their Task can also be disposed, unlike CompletionStage. Luckily, I don't need this feature too extensively in this post.)


Consuming an IAsyncEnumerable


Consuming such an IAsyncEnumerator is straightforward, although not as easy as with C#'s async/await. If we are only interested in exactly one value, we can write:


IAsyncEnumerable<T> source = ...

IAsyncEnumerator<T> enumerator = source.enumerator();

enumerator.moveNext(new CompositeSubscription())
.whenComplete((b, e) -> {
    if (e != null) {
        e.printStackTrace();
    } else
    if (b) {
        System.out.println(enumerator.current());
    } else {
        System.out.println("Empty!");
    }
});

Of course, given the CompletionStage API, you are free to process the single result as you see fit.
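For instance, the single-value case can be packaged into a reusable helper that maps the Boolean of moveNext() into an Optional. The sketch below is hypothetical (the names AsyncFirst, first() and of() are mine), it uses local copies of the interfaces above, and it swaps RxJava's CompositeSubscription for a plain java.util.concurrent.atomic.AtomicBoolean cancellation flag so that it compiles without RxJava:

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicBoolean;

// Local copies of the post's interfaces; AtomicBoolean replaces CompositeSubscription (assumption).
interface IAsyncEnumerable<T> {
    IAsyncEnumerator<T> enumerator();
}

interface IAsyncEnumerator<T> {
    CompletionStage<Boolean> moveNext(AtomicBoolean cancelled);

    T current();
}

final class AsyncFirst {

    // Hypothetical helper: a single moveNext(), mapped to Optional.of(current()) or Optional.empty().
    // thenApply only runs on success, so an upstream error propagates to the returned stage unchanged.
    // Assumes the source never emits null (Optional.of would throw).
    static <T> CompletionStage<Optional<T>> first(IAsyncEnumerable<T> source) {
        IAsyncEnumerator<T> enumerator = source.enumerator();
        return enumerator.moveNext(new AtomicBoolean())
                .thenApply(hasValue -> hasValue
                        ? Optional.of(enumerator.current())
                        : Optional.<T>empty());
    }

    // Hypothetical synchronous source over the given values, used to exercise first().
    @SafeVarargs
    static <T> IAsyncEnumerable<T> of(T... values) {
        return () -> new IAsyncEnumerator<T>() {
            int index = -1; // moveNext() is called before current()

            @Override
            public CompletionStage<Boolean> moveNext(AtomicBoolean cancelled) {
                if (cancelled.get() || index + 1 >= values.length) {
                    return CompletableFuture.completedFuture(false);
                }
                index++;
                return CompletableFuture.completedFuture(true);
            }

            @Override
            public T current() {
                return values[index];
            }
        };
    }
}
```

Because first() returns a plain CompletionStage, the caller can keep composing on it (thenAccept, exceptionally, and so on) instead of writing the if/else chain of whenComplete by hand.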

Consuming more than one value from an IAsyncEnumerator is more involved. You have to recursively call moveNext until it errors or completes:


public <T> void consumeAll(IAsyncEnumerator<T> enumerator, CompositeSubscription csub) {
    if (csub == null) {
        csub = new CompositeSubscription();
    }

    // effectively final copy so the lambda can capture it
    CompositeSubscription fcsub = csub;

    // pass the shared cancellation token along instead of a fresh one
    enumerator.moveNext(fcsub)
    .whenComplete((b, e) -> {
        if (e != null) {
            e.printStackTrace();
        } else
        if (b) {
            System.out.println(enumerator.current());

            // go recursive
            consumeAll(enumerator, fcsub);

        } else {
            System.out.println("Done!");
        }
    });
}

Unfortunately, there is a slight problem: if the CompletionStage is already completed (synchronous), whenComplete runs the callback immediately on the current thread, so the recursive call to consumeAll may end in a StackOverflowError. Therefore, to be safe, we have to trampoline the call to consumeAll so that the stack depth doesn't grow too large:


public final class AsyncConsumer<T> implements Subscription {

    final Consumer<? super T> onNext;

    final Consumer<Throwable> onError;

    final Runnable onComplete;

    final IAsyncEnumerator<T> enumerator;

    final AtomicInteger wip;

    final Queue<CompletionStage<Boolean>> queue;

    final CompositeSubscription csub;

    final CountDownLatch cdl;

    public AsyncConsumer(
         IAsyncEnumerator<T> enumerator,
         Consumer<? super T> onNext,
         Consumer<Throwable> onError,
         Runnable onComplete
    ) {
         this.enumerator = enumerator;
         this.onNext = onNext;
         this.onError = onError;
         this.onComplete = onComplete;
         this.wip = new AtomicInteger();
         this.queue = new SpscLinkedArrayQueue<>(16);
         this.csub = new CompositeSubscription();
         this.cdl = new CountDownLatch(1);
    }

    public void consumeAll() {
         if (csub.isUnsubscribed()) {
             cdl.countDown();
             return;
         }
         CompletionStage<Boolean> stage = enumerator.moveNext(csub);
         queue.offer(stage);
         if (wip.getAndIncrement() == 0) {
             do {
                 stage = queue.poll();
                 stage.whenComplete((b, e) -> {
                     if (csub.isUnsubscribed()) {
                         cdl.countDown();
                         return;
                     } else
                     if (e != null) {
                         onError.accept(e);
                         cdl.countDown();
                     } else
                     if (b) {
                         onNext.accept(enumerator.current());
                         consumeAll();
                     } else {
                         onComplete.run();
                         cdl.countDown();
                     }
                 });
             } while (wip.decrementAndGet() != 0);
         }
    }

    public void await() throws InterruptedException {
        cdl.await();
    }

    public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
        return cdl.await(timeout, unit);
    }

    @Override
    public void unsubscribe() {
         csub.unsubscribe();
    }

    @Override
    public boolean isUnsubscribed() {
         return csub.isUnsubscribed();
    }
}

Looks intriguing. Apart from the callbacks for the signal types and the enumerator instance, we need the work-in-progress wip counter and a queue, just like in our typical queue-drain approach back in Reactive-Streams land. For cancellation we have the CompositeSubscription, and for blocking waits we have a CountDownLatch. The consumeAll() method moves the enumerator one element forward, and the trampoline loop makes sure there is only one whenComplete() active at a time. Inside the handler, we call the appropriate functional interface and, in case of a value signal, call consumeAll recursively. The trampolining makes sure we don't get reentrant behavior, regardless of whether consumeAll is called synchronously or asynchronously.
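A queue-free alternative is also possible: stages that are already complete are consumed in a plain loop, and only an incomplete stage hands the loop over to its completion callback. The sketch below assumes every stage supports toCompletableFuture() (true for CompletableFuture, but optional for other CompletionStage implementations); LoopingConsumer and RangeEnumerator are hypothetical names, and AtomicBoolean again stands in for CompositeSubscription:

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// Local copy of the post's interface; AtomicBoolean replaces CompositeSubscription (assumption).
interface IAsyncEnumerator<T> {
    CompletionStage<Boolean> moveNext(AtomicBoolean cancelled);

    T current();
}

// Synchronous range enumerator, equivalent to AsyncRangeEnumerator in the post.
final class RangeEnumerator implements IAsyncEnumerator<Integer> {
    private final long end;
    private long index;

    RangeEnumerator(int start, int count) {
        this.index = (long) start - 1;
        this.end = (long) start + count;
    }

    @Override
    public CompletionStage<Boolean> moveNext(AtomicBoolean cancelled) {
        if (index + 1 == end) {
            return CompletableFuture.completedFuture(false);
        }
        index++;
        return CompletableFuture.completedFuture(true);
    }

    @Override
    public Integer current() {
        return (int) index;
    }
}

// Hypothetical consumer: completed stages are handled in a loop (no recursion);
// an incomplete stage ends the loop and its callback restarts it on the completing thread.
final class LoopingConsumer<T> {
    private final IAsyncEnumerator<T> enumerator;
    private final Consumer<? super T> onNext;
    private final AtomicBoolean cancelled = new AtomicBoolean();
    final CompletableFuture<Void> done = new CompletableFuture<>();

    LoopingConsumer(IAsyncEnumerator<T> enumerator, Consumer<? super T> onNext) {
        this.enumerator = enumerator;
        this.onNext = onNext;
    }

    void cancel() {
        cancelled.set(true);
    }

    void drain() {
        while (!cancelled.get()) {
            CompletableFuture<Boolean> f = enumerator.moveNext(cancelled).toCompletableFuture();
            if (!f.isDone()) {
                // If f completes just before whenComplete, the callback runs here (one extra level).
                f.whenComplete((b, e) -> {
                    if (handle(f)) {
                        drain();
                    }
                });
                return;
            }
            if (!handle(f)) {
                return;
            }
        }
        done.complete(null); // cancelled: stop quietly
    }

    // Signals the outcome of a completed stage; returns true if more elements may follow.
    private boolean handle(CompletableFuture<Boolean> f) {
        if (cancelled.get()) {
            done.complete(null);
            return false;
        }
        boolean hasValue;
        try {
            hasValue = f.join();
        } catch (CompletionException | CancellationException ex) {
            done.completeExceptionally(ex.getCause() != null ? ex.getCause() : ex);
            return false;
        }
        if (!hasValue) {
            done.complete(null);
            return false;
        }
        onNext.accept(enumerator.current());
        return true;
    }
}
```

With a synchronous source such as a range, the stack depth stays constant no matter how many elements there are; with an asynchronous source, the loop resumes on whichever thread completes the pending stage.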


Writing an IAsyncEnumerable source


Of course, we need some data source to work with. Perhaps the most basic and most standard source is the range() operator in all reactive libraries - range is the counted for-loop of the reactive world.


public final class AsyncRange implements IAsyncEnumerable<Integer> {
    final int start;
    final int count;

    public AsyncRange(int start, int count) {
        this.start = start;
        this.count = count;
    }

    @Override
    public IAsyncEnumerator<Integer> enumerator() {
        return new AsyncRangeEnumerator(start, count);
    }

    static final class AsyncRangeEnumerator implements IAsyncEnumerator<Integer> {
        final long end;
        long index;

        static final CompletionStage<Boolean> TRUE = 
            CompletableFuture.completedFuture(true);

        static final CompletionStage<Boolean> FALSE = 
            CompletableFuture.completedFuture(false);

        public AsyncRangeEnumerator(int start, int count) {
            this.index = start - 1;
            this.end = (long)start + count;
        }

        @Override
        public CompletionStage<Boolean> moveNext(CompositeSubscription csub) {
            long i = index + 1;
            if (i == end) {
                return FALSE;
            }
            index = i;
            return TRUE;
        }

        @Override
        public Integer current() {
            return (int)index;
        }
    }
}

The operation itself is pretty synchronous. As I understand it, CompletableFuture is like an AsyncSubject, and because we only ever return a constant true or false stage, we can use shared, already-completed instances (which should be stateless and non-interfering). Because moveNext() is called before current(), we start the index from start - 1 and increment it by one in moveNext(). If the incremented value equals end, we return the false stage; otherwise, we update the index field and return the true stage.


Going asynchronous


Of course, we are here for the asynchronous possibilities, so let's write the observeOn and subscribeOn operators. The first one makes sure default continuations on the CompletionStage<Boolean> happen on a specific thread, whereas the second makes sure the actual call to moveNext() happens on a specific thread (so you can do blocking IO in moveNext() or in enumerator()). Plus, we are proficient in writing these operators, aren't we?

observeOn

If you have to implement an unfamiliar operator, the best advice I got from Erik Meijer's Channel 9 videos is: follow the types. We know we have an upstream source and some source of asynchrony. For simplicity, let's use Executor, since the CompletionStage xxxAsync methods accept one directly.


public final class AsyncObserveOn<T> implements IAsyncEnumerable<T> {
    final IAsyncEnumerable<T> source;

    final Executor executor;

    public AsyncObserveOn(IAsyncEnumerable<T> source, Executor executor) {
        this.source = source;
        this.executor = executor;
    }

    @Override
    public IAsyncEnumerator<T> enumerator() {
        return new AsyncObserveOnEnumerator<>(source.enumerator(), executor);
    }

    // ... 
}

A very familiar pattern so far. The real work, however, happens inside AsyncObserveOnEnumerator:


    static final class AsyncObserveOnEnumerator<T> implements IAsyncEnumerator<T> {

        final IAsyncEnumerator<T> enumerator;

        final Executor executor;

        public AsyncObserveOnEnumerator(IAsyncEnumerator<T> enumerator, Executor executor) {
            this.enumerator = enumerator;
            this.executor = executor;
        }
        
        @Override
        public CompletionStage<Boolean> moveNext(CompositeSubscription csub) {
            return enumerator.moveNext(csub).thenApplyAsync(v -> v, executor);
        }

        @Override
        public T current() {
            return enumerator.current();
        }
    }

I admit I'm not too familiar with CompletionStage, but thenApplyAsync appeared to be the closest thing to moving the value delivery onto a specific executor. Otherwise, this looks quite straightforward and much shorter than our Rx-style observeOn().
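To see what this costs per element, the following sketch counts how many tasks thenApplyAsync submits. CountingExecutor and ObserveOnEnumerator are hypothetical names, and AtomicBoolean stands in for CompositeSubscription. Each moveNext() call, including the final one signalling false, hands one task to the executor, so a 5-element range schedules 6 tasks. One caveat: a non-async continuation attached after the returned stage has already completed may run on the attaching thread rather than on the executor.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Local copy of the post's interface; AtomicBoolean replaces CompositeSubscription (assumption).
interface IAsyncEnumerator<T> {
    CompletionStage<Boolean> moveNext(AtomicBoolean cancelled);

    T current();
}

// Synchronous range enumerator, equivalent to AsyncRangeEnumerator in the post.
final class RangeEnumerator implements IAsyncEnumerator<Integer> {
    private final long end;
    private long index;

    RangeEnumerator(int start, int count) {
        this.index = (long) start - 1;
        this.end = (long) start + count;
    }

    @Override
    public CompletionStage<Boolean> moveNext(AtomicBoolean cancelled) {
        if (index + 1 == end) {
            return CompletableFuture.completedFuture(false);
        }
        index++;
        return CompletableFuture.completedFuture(true);
    }

    @Override
    public Integer current() {
        return (int) index;
    }
}

// The observeOn enumerator from above, with AtomicBoolean in place of CompositeSubscription.
final class ObserveOnEnumerator<T> implements IAsyncEnumerator<T> {
    private final IAsyncEnumerator<T> upstream;
    private final Executor executor;

    ObserveOnEnumerator(IAsyncEnumerator<T> upstream, Executor executor) {
        this.upstream = upstream;
        this.executor = executor;
    }

    @Override
    public CompletionStage<Boolean> moveNext(AtomicBoolean cancelled) {
        // Identity function; the *Async variant completes the returned stage from a task on executor.
        return upstream.moveNext(cancelled).thenApplyAsync(v -> v, executor);
    }

    @Override
    public T current() {
        return upstream.current();
    }
}

// Hypothetical executor that runs tasks inline and counts how many it was given.
final class CountingExecutor implements Executor {
    final AtomicInteger tasks = new AtomicInteger();

    @Override
    public void execute(Runnable command) {
        tasks.incrementAndGet();
        command.run();
    }
}
```

This one-task-per-element pattern is one concrete source of the scheduling overhead that shows up in the benchmark later.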

subscribeOn

Since we can't be sure an IAsyncEnumerable won't perform synchronous (possibly blocking) work in its moveNext() method, we need a way, via subscribeOn, to make sure moveNext() is called on some other thread:


public final class AsyncSubscribeOn<T> implements IAsyncEnumerable<T> {
    final IAsyncEnumerable<T> source;

    final Executor executor;

    public AsyncSubscribeOn(IAsyncEnumerable<T> source, Executor executor) {
        this.source = source;
        this.executor = executor;
    }

    @Override
    public IAsyncEnumerator<T> enumerator() {
        AxSubscribeOnEnumerator<T> enumerator = new AxSubscribeOnEnumerator<>(executor);
        executor.execute(() -> {
            IAsyncEnumerator<T> ae = source.enumerator();
            enumerator.setEnumerator(ae);
        });
        return enumerator;
    }

    // ... 
}

Instead of directly calling source.enumerator(), we offload the call to the executor, which, when it runs, obtains a valid IAsyncEnumerator. Obtaining it is now deferred from the consumer's perspective, but we still have to return an IAsyncEnumerator ourselves right away. The difficulty is how to allow calling moveNext() when we don't have the upstream's enumerator yet. Luckily, CompletionStage comes to our rescue:


    static final class AxSubscribeOnEnumerator<T> implements IAsyncEnumerator<T> {

        final Executor executor;
        
        final CompletableFuture<IAsyncEnumerator<T>> onEnumerator;
        
        public AxSubscribeOnEnumerator(Executor executor) {
            this.executor = executor;
            this.onEnumerator = new CompletableFuture<>();
        }
        
        void setEnumerator(IAsyncEnumerator<T> enumerator) {
            onEnumerator.complete(enumerator); 
        }
        
        @Override
        public CompletionStage<Boolean> moveNext(CompositeSubscription token) {
            return onEnumerator.thenComposeAsync(ae -> ae.moveNext(token), executor);
        }

        @Override
        public T current() {
            IAsyncEnumerator<T> ae = onEnumerator.getNow(null);
            return ae != null ? ae.current() : null;
        }
        
    }

We set up an onEnumerator CompletableFuture which is completed via setEnumerator upon receiving the actual upstream IAsyncEnumerator. The big trick is how we use composition over this deferred onEnumerator value to call moveNext() on the upstream's enumerator, on the executor, once it is available. The operator thenComposeAsync is basically flatMap. The method current() needs some extra logic: we try to get the upstream's enumerator and, if it's not yet available, we simply return null; one shouldn't call current() before the corresponding CompletionStage fires anyway.
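The deferral can be observed deterministically with a hand-driven executor (an ArrayDeque of Runnables in the test). This is a compact, hypothetical variant of AsyncSubscribeOn: it folds AxSubscribeOnEnumerator into an anonymous class, additionally completes the future exceptionally if source.enumerator() throws, and uses AtomicBoolean in place of CompositeSubscription:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;

// Local copies of the post's interfaces; AtomicBoolean replaces CompositeSubscription (assumption).
interface IAsyncEnumerable<T> {
    IAsyncEnumerator<T> enumerator();
}

interface IAsyncEnumerator<T> {
    CompletionStage<Boolean> moveNext(AtomicBoolean cancelled);

    T current();
}

// Synchronous range enumerator, equivalent to AsyncRangeEnumerator in the post.
final class RangeEnumerator implements IAsyncEnumerator<Integer> {
    private final long end;
    private long index;

    RangeEnumerator(int start, int count) {
        this.index = (long) start - 1;
        this.end = (long) start + count;
    }

    @Override
    public CompletionStage<Boolean> moveNext(AtomicBoolean cancelled) {
        if (index + 1 == end) {
            return CompletableFuture.completedFuture(false);
        }
        index++;
        return CompletableFuture.completedFuture(true);
    }

    @Override
    public Integer current() {
        return (int) index;
    }
}

// Hypothetical compact variant of AsyncSubscribeOn.
final class SubscribeOn<T> implements IAsyncEnumerable<T> {
    private final IAsyncEnumerable<T> source;
    private final Executor executor;

    SubscribeOn(IAsyncEnumerable<T> source, Executor executor) {
        this.source = source;
        this.executor = executor;
    }

    @Override
    public IAsyncEnumerator<T> enumerator() {
        CompletableFuture<IAsyncEnumerator<T>> onEnumerator = new CompletableFuture<>();
        // Obtain the upstream enumerator on the executor; a failure completes the future exceptionally.
        executor.execute(() -> {
            try {
                onEnumerator.complete(source.enumerator());
            } catch (RuntimeException ex) {
                onEnumerator.completeExceptionally(ex);
            }
        });
        return new IAsyncEnumerator<T>() {
            @Override
            public CompletionStage<Boolean> moveNext(AtomicBoolean cancelled) {
                // flatMap over the deferred enumerator; the upstream moveNext() runs on executor
                return onEnumerator.thenComposeAsync(ae -> ae.moveNext(cancelled), executor);
            }

            @Override
            public T current() {
                // Only meaningful after a moveNext() stage has signalled true.
                IAsyncEnumerator<T> ae = onEnumerator.getNow(null);
                return ae != null ? ae.current() : null;
            }
        };
    }
}
```

Here moveNext() returns an incomplete stage right away; only after the executor has run both the enumerator() task and the task scheduled by thenComposeAsync does the stage complete with true.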


Benchmark


Now that we have the three most basic operators available, let's benchmark our IAsyncEnumerable implementation against the cutting-edge equivalent in Reactive-Streams-Commons. For the source code, please refer to the benchmark implementation in my repository. For convenience, I've implemented the operators above in a fluent way where the base type is Ax (Async Extensions).

Results of the throughput benchmark (bigger is better): (i7 4790, Windows 7 x64, Java 8u92)


The range benchmark is just the basic range(1, count), rangeAsync is range(1, count).observeOn(executor), and rangePipeline is range(1, count).subscribeOn(executor1).observeOn(executor2). In the columns, ax is my implementation of IAsyncEnumerable and px is the Reactive-Streams-Commons (Rsc) Publisher Extensions fluent API entry point. Since Rsc uses operator fusion, rangeAsync() is run both with and without operator fusion enabled (the other benchmarks don't fuse in Rsc); the latter is in the pxf column.

Evaluation

Looks like Rsc outperforms the IAsyncEnumerable implementation considerably, both in synchronous and asynchronous use. Without an independent library to compare against, I can only speculate why IAsyncEnumerable has so much overhead. Naturally, my limited experience with CompletionStage could explain some of it, but I doubt that's the main reason. Since both libraries use the same single-threaded Executor in the benchmark, we can rule out the executor overhead itself.

What remains is the architectural and conceptual differences:


  • We possibly have one CompletionStage allocation plus a continuation stage per value; Rsc doesn't allocate anything per value.
  • CompletionStage is actually between hot and cold and acts like an AsyncSubject: when one attaches a continuation to it, it could still be running or already be completed. Determining this and acting accordingly adds overhead, whereas Rsc calls onNext as directly as possible.
  • The longer the pipeline, the more temporary CompletionStages get involved, which means allocation and individual task scheduling; Rsc exploits the emergent batching property of the streams over an async boundary.


Conclusion

I believe what we have here as IAsyncEnumerable is a corner case of the reactive-flow approach where each stage basically does request(1), plus some allocation overhead, which makes it more costly than the highly optimized flow approach.

It certainly looks simpler and implements shorter operators, but I have to ask, what's the benefit over the Reactive-Streams approach?

If somebody has some tips for optimizing our IAsyncEnumerable implementation or can point me to an independent implementation, I'd be glad to benchmark and compare it and re-evaluate my position on the topic!

Sunday, April 24, 2016

Google Agera vs. ReactiveX

Introduction


If you are following events around Android development, or just happen to follow all things reactive, there was a "big" announcement from Google: they've released their reactive programming library targeting Android specifically: Agera. Of course, one has to look into the details to get an accurate picture.

"By Google" means a team at Google working on Google Play Movies. Certainly, it sounds more amplified to say Google than to give the full path to the team. I happen to do this as well when someone asks where I work: I say in a lab at the Hungarian Academy of Sciences instead of at the Engineering and Management Intelligence Research Laboratory at the Institute for Computer Science and Control of the Hungarian Academy of Sciences. (Plus, you don't get tired and lost while I'm emitting these words :)

It doesn't really matter who released it; all that matters is what they released and how it relates to the well-established reactive libraries RxJava, Reactor and Akka-Streams.


The Core API

The Agera library is built around the valueless Observer pattern: Observables take Updatables and signal change via update() calls. It is then the responsibility of those Updatables to figure out what changed. This is practically a zero argument reactive dataflow which relies on side-effects per update().


interface Updatable {
    void update();
}

interface Observable {
   void addUpdatable(Updatable u);
   void removeUpdatable(Updatable u);
}


They look innocent and reactive, right? Unfortunately, they run into the same issues as the original java.util.Observable and the other addListener/removeListener-based reactive APIs (which I categorized as 0th generation).

Agera Observable


The problem with this pair of methods is that every Observable that adds behavior over an incoming Updatable has to remember the original Updatable in some way, for the case when the same Updatable is removed:


public final class DoOnUpdate implements Observable {
    final Observable source;

    final Runnable action;

    final ConcurrentHashMap<Updatable, DoOnUpdatable> map;

    public DoOnUpdate(Observable source, Runnable action) {
         this.source = source;
         this.action = action;
         this.map = new ConcurrentHashMap<>();
    }

    @Override
    public void addUpdatable(Updatable u) {
        DoOnUpdatable wrapper = new DoOnUpdatable(u, action);
        if (map.putIfAbsent(u, wrapper) != null) {
            throw new IllegalStateException("Updatable already registered");
        }
        source.addUpdatable(wrapper);
    }

    @Override
    public void removeUpdatable(Updatable u) {
        DoOnUpdatable wrapper = map.remove(u);
        if (wrapper == null) {
            throw new IllegalStateException("Updatable already removed");
        }
        source.removeUpdatable(wrapper);
    }

    static final class DoOnUpdatable implements Updatable {
        final Updatable actual;

        final Runnable run;

        public DoOnUpdatable(Updatable actual, Runnable run) {
            this.actual = actual;
            this.run = run;
        }

        @Override
        public void update() {
            run.run();
            actual.update();
        }
    }
}



The shared map creates a contention point between independent downstream Updatables at every stage of a pipeline.

True, a similar contention point can be found with RxJava's Subjects and ConnectableObservables, but operators chained after them don't have such contention. Unfortunately, the Reactive-Streams spec, in its current version, mandates something similar from Publishers. RxJava 2.x, Rsc and Reactor ignore this requirement because it turned out to be over-restrictive in practice, and we are pushing back to relax the spec instead.

The second problem, although minor, is that you can't add the same Updatable multiple times: first, because the Map can't distinguish between the different "subscriptions", and second, because the spec mandates throwing an exception. In practice this rarely matters, because most end-consumers register only once.

The third problem is a bigger issue: throwing when the Updatable is no longer registered with the Observable. This creates an unfortunate race condition between end-consumers triggering removal while some intermediate operator such as take also triggers it; one of them will get an exception. This is why modern reactive libraries have idempotent cancellation.

The fourth problem is that, in theory, addUpdatable and removeUpdatable can race with each other: some downstream operator may want to disconnect before an upstream operator has actually called addUpdatable. A possible outcome is that the removeUpdatable chain throws, yet addUpdatable succeeds, so the signals flow anyway and all associated objects are unintentionally retained.

Agera Updatable

Let's see the API from the consumer's perspective. Updatable is a single method functional interface which makes it easy to attach a listener to an Observable:


Observable source = ...

source.addUpdatable(() -> System.out.println("Something happened"));


Simple enough, now let's remove our listener:


source.removeUpdatable(() -> System.out.println("Something happened"));


Which yields a nice Exception: the two lambdas are not the same instance/reference. This is a very common problem with addListener/removeListener based APIs. The solution is to store the lambda in a reference and use that when needed:


Updatable u = () -> System.out.println("Something happened");

source.addUpdatable(u);

// ...

source.removeUpdatable(u);

A small inconvenience indeed, but it gets worse. What if you have many Observables and many Updatables? You have to remember which one is registered with which and keep references to them in some fields. One of the great ideas of the original Rx.NET design was to reduce this necessity to a single reference:


interface Removable extends Closeable {
    @Override
    void close(); // remove the necessity of try-catch around close()
}

public static Removable registerWith(Observable source, Updatable consumer) {
    source.addUpdatable(consumer);
    return () -> source.removeUpdatable(consumer);
}


Of course, we have to consider idempotence of calling close() here as well:


public static Removable registerWith(Observable source, Updatable consumer) {
    source.addUpdatable(consumer);
    final AtomicBoolean once = new AtomicBoolean();
    return () -> {
        if (once.compareAndSet(false, true)) {
            source.removeUpdatable(consumer);
        }
    };
}

Agera MutableRepository

The Agera MutableRepository holds a value and signals update() to the registered Updatables if the value changes. This somewhat resembles our BehaviorSubject, with the distinction that the new value doesn't flow to the consumers (remember, update() has no arguments) but has to be retrieved from the repository via get():


MutableRepository<Integer> repo = Repositories.mutableRepository(0);

repo.addUpdatable(() -> System.out.println("Value: " + repo.get()));

new Thread(() -> {
    repo.accept(1);
}).start();


When created via the factory method, it has the interesting property that update() is observed on the Looper of the thread where the repository was created. (A Looper is like a per-thread trampoline scheduler/Executor that lets one execute code on a specific thread, such as the Android main thread.)

This out-of-band property creates an interesting case:


Set<Integer> set = new HashSet<>();

MutableRepository<Integer> repo = Repositories.mutableRepository(0);

repo.addUpdatable(() -> set.add(repo.get()));

new Thread(() -> {
    for (int i = 0; i < 100_000; i++) {
        repo.accept(i);
    }
}).start();

Thread.sleep(20_000);

System.out.println(set.size());


Assuming 20 seconds is enough, what is the final size of the Set? One would expect it to contain all 100,000 integers. In reality, the size could be anywhere between 1 and 100,000! The reason is that accept() and get() run concurrently, and if the consumer is slower, accept() simply overwrites the current value in the repository before the consumer gets to read it.

In some cases this may be acceptable (e.g., similar to applying onBackpressureLatest in RxJava); sometimes it's not, and you may end up spending a lot of time hunting for lost values.
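The value loss can be reproduced deterministically with a simplified model. The class below is hypothetical and is not Agera's implementation: it keeps the latest value in a volatile field, uses an Executor to stand in for the Looper, and coalesces notifications so that at most one update() dispatch is pending at a time (an assumption; values get lost even without coalescing, because each Updatable reads get() only when it finally runs):

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;

interface Updatable {
    void update();
}

// Hypothetical model of a "latest value" repository (NOT Agera's source code).
final class LatestValueRepository<T> {
    private volatile T value;
    private final Executor looper; // stands in for the Looper of the creating thread
    private final List<Updatable> updatables = new CopyOnWriteArrayList<>();
    private final AtomicBoolean updatePending = new AtomicBoolean();

    LatestValueRepository(T initialValue, Executor looper) {
        this.value = initialValue;
        this.looper = looper;
    }

    void addUpdatable(Updatable u) {
        updatables.add(u);
    }

    T get() {
        return value;
    }

    void accept(T newValue) {
        value = newValue; // silently replaces a value no Updatable has observed yet
        // Schedule at most one pending notification; further accept() calls ride on it.
        if (updatePending.compareAndSet(false, true)) {
            looper.execute(() -> {
                updatePending.set(false);
                for (Updatable u : updatables) {
                    u.update(); // each Updatable reads whatever get() returns *now*
                }
            });
        }
    }
}
```

If the producer writes 100,000 values before the looper gets a chance to run, the Updatable observes only the last one; draining the looper after every accept() lets it see all of them.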

Error handling

Being asynchronous usually means you have asynchronous errors. RxJava and the others compose nicely in this regard: somebody errors out, the whole processing graph is cleaned up automatically unless the programmer wishes otherwise by suppressing, replacing or retrying the flow. The error and cleanup can be very complicated in some cases, but we library developers put in a lot of effort so you don't have to worry about it most of the time.

The Agera base API doesn't handle errors by itself; you have to deal with them out-of-band, just like with values. If you have multiple services composed via Agera, you have to establish the same error-management "framework" you'd need in callback-hell situations. This is very cumbersome and error-prone by itself due to concurrency and terminal-state considerations.


Termination

Again, Agera doesn't have a notion of a completed stream; you have to figure out when that happens on your own. This might not be an issue in GUI cases where your consumer starts and ends with your activity and signals are delivered continuously. However, asynchronous background Observables now have to somehow communicate how many signals they will emit; otherwise, how would you know whether an update() simply hasn't happened yet or never will because there is no more data?

How to design a modern zero-parameter reactive API

First of all, perhaps you shouldn't bother with one and just use an existing library for this:


rx.Observable<Void> signaller = ...

rx.Observer<Void> consumer = ...

Subscription s = signaller.subscribe(consumer);

// ...

s.unsubscribe();


You get all the infrastructure, operators and performance from them at basically no additional cost. Better yet, if you generally want to deal with signals of values, you can use the appropriate type instead of Void.

If an existing library feels too cumbersome to learn due to its many operators, you can perhaps fork it, delete the unnecessary stuff and use that. Of course, then you have to keep up with bugfixes and performance enhancements yourself.

If forking and pruning doesn't sound attractive, you can develop your own library on top of the Reactive-Streams specification; Publisher<Void>, Subscriber<Void> and all the things between them you need. You get practically free interop with other Reactive-Streams libraries and consumers, plus, you can test your solution via its Test Compatibility Kit (TCK).

Of course, writing a reactive library is hard, writing a reactive library over Reactive-Streams is even harder. As a final resort, you may decide to write a barebone API from scratch.

If you really want to do a zero-argument reactive flow, here are a few tips you should consider:

1) Don't have separate addListener and removeListener. A single entry point simplifies the development of intermediate operators:


interface Observable {
    Removable register(Updatable u);
}

interface Removable {
    void remove();
}
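A sketch of such a single entry point, with hypothetical names (UpdateDispatcher, Registration): each register() call creates its own registration object, so no Updatable-to-wrapper map is needed, the same Updatable can be registered several times, and removal is idempotent and never throws. This sidesteps the second and third problems listed earlier.

```java
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;

interface Updatable {
    void update();
}

interface Removable {
    void remove();
}

interface Observable {
    Removable register(Updatable u);
}

// Hypothetical hot source: every register() call gets its own Registration.
final class UpdateDispatcher implements Observable {
    private final CopyOnWriteArrayList<Registration> registrations = new CopyOnWriteArrayList<>();

    @Override
    public Removable register(Updatable u) {
        Registration r = new Registration(u);
        registrations.add(r);
        return r;
    }

    // Signals every currently registered Updatable once.
    void dispatch() {
        for (Registration r : registrations) {
            r.signal();
        }
    }

    final class Registration implements Removable {
        private final Updatable actual;
        private final AtomicBoolean removed = new AtomicBoolean();

        Registration(Updatable actual) {
            this.actual = actual;
        }

        void signal() {
            if (!removed.get()) {
                actual.update();
            }
        }

        @Override
        public void remove() {
            // Idempotent: only the first call does anything, later calls are silent no-ops.
            if (removed.compareAndSet(false, true)) {
                registrations.remove(this);
            }
        }
    }
}
```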

2) Consider injecting the cancellation/remove support instead of returning a cancellation token or remover action:


interface Observable {
    void register(Updatable u);
}

interface Updatable {
    void onRegister(Removable remover);
    void update();
}

// or

interface Updatable {
    void update(Removable remover);
}
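The injected variant matters when a source signals synchronously from within register(): a returned Removable only reaches the consumer after register() has finished, whereas an injected one is usable from the very first update(). The hypothetical SyncSignals source and TakeUpdates consumer below illustrate this with the onRegister flavor:

```java
import java.util.concurrent.atomic.AtomicBoolean;

interface Removable {
    void remove();
}

interface Updatable {
    void onRegister(Removable remover);

    void update();
}

interface Observable {
    void register(Updatable u);
}

// Hypothetical source that signals synchronously, up to 'times' times, inside register().
final class SyncSignals implements Observable {
    private final int times;

    SyncSignals(int times) {
        this.times = times;
    }

    @Override
    public void register(Updatable u) {
        AtomicBoolean removed = new AtomicBoolean();
        u.onRegister(() -> removed.set(true)); // the remover arrives before any update()
        for (int i = 0; i < times && !removed.get(); i++) {
            u.update();
        }
    }
}

// Hypothetical consumer that removes itself after 'limit' updates.
final class TakeUpdates implements Updatable {
    private final int limit;
    private Removable remover;
    private int received;

    TakeUpdates(int limit) {
        this.limit = limit;
    }

    @Override
    public void onRegister(Removable remover) {
        this.remover = remover;
    }

    @Override
    public void update() {
        if (++received == limit) {
            remover.remove();
        }
    }

    int received() {
        return received;
    }
}
```

Registering a TakeUpdates(3) with a SyncSignals(1_000_000) stops the source after three updates; with a returned-Removable design, all one million updates would be delivered before the consumer could call remove().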

3) Consider adding error signal delivery at least:

Certainly, this complicates the lives of the library writers, but it can save a lot of trouble for your library's users.

interface Updatable {
    void onRegister(Removable remover);
    void update();
    void error(Throwable ex);
}
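With an error signal available, intermediate operators can turn failures into terminal events instead of leaving every consumer to its own out-of-band error handling. The sketch below uses a hypothetical synchronous source (Ticks) and a hypothetical operator (Guarded) that runs a check before forwarding each update():

```java
import java.util.concurrent.atomic.AtomicBoolean;

interface Removable {
    void remove();
}

// Tip 3's Updatable, with an error signal.
interface Updatable {
    void onRegister(Removable remover);

    void update();

    void error(Throwable ex);
}

interface Observable {
    void register(Updatable u);
}

// Hypothetical synchronous source: signals update() up to 'times' times unless removed.
final class Ticks implements Observable {
    private final int times;

    Ticks(int times) {
        this.times = times;
    }

    @Override
    public void register(Updatable u) {
        AtomicBoolean removed = new AtomicBoolean();
        u.onRegister(() -> removed.set(true));
        for (int i = 0; i < times && !removed.get(); i++) {
            u.update();
        }
    }
}

// Hypothetical operator: a thrown exception from 'check' becomes a single, terminal error()
// downstream, and the upstream registration is removed.
final class Guarded implements Observable {
    private final Observable source;
    private final Runnable check;

    Guarded(Observable source, Runnable check) {
        this.source = source;
        this.check = check;
    }

    @Override
    public void register(Updatable downstream) {
        source.register(new Updatable() {
            private Removable upstream;
            private boolean terminated;

            @Override
            public void onRegister(Removable remover) {
                upstream = remover;
                downstream.onRegister(remover);
            }

            @Override
            public void update() {
                if (terminated) {
                    return;
                }
                try {
                    check.run();
                } catch (RuntimeException ex) {
                    terminated = true;
                    upstream.remove(); // the error is terminal: stop the source
                    downstream.error(ex);
                    return;
                }
                downstream.update();
            }

            @Override
            public void error(Throwable ex) {
                if (!terminated) {
                    terminated = true;
                    downstream.error(ex);
                }
            }
        });
    }
}
```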


4) Consider offering an asynchronous boundary as an option in the sequence.

For example, with the MutableRepository, you may want to react to the new value on the caller's thread before moving back to the main thread. This means observeOn, and perhaps subscribeOn if you intend to have cold sources.
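For a zero-argument flow, such a boundary can be as small as re-dispatching update() onto an Executor. In the hypothetical sketch below, the Executor stands in for whatever posts to the main thread's Looper:

```java
import java.util.concurrent.Executor;

interface Updatable {
    void update();
}

// Hypothetical asynchronous boundary: each update() is re-dispatched as a task on 'executor'.
final class ObserveOnUpdatable implements Updatable {
    private final Updatable actual;
    private final Executor executor;

    ObserveOnUpdatable(Updatable actual, Executor executor) {
        this.actual = actual;
        this.executor = executor;
    }

    @Override
    public void update() {
        executor.execute(actual::update);
    }
}
```

An Updatable registered on the caller's thread can do its immediate work and then call the boundary's update(), so the rest of the reaction happens later on the target thread.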


Conclusion

Writing a reactive library is not an easy task, and one can make a lot of mistakes if one is not familiar with the history and evolution of the field. In many companies, the "not invented here" or "we can do better" mentality is so strong that they would rather start from scratch than learn from or build upon somebody else's working solution.

(Funny thing: I sometimes suggest RxJava for an in-house project and I still get raised eyebrows, even though it was, mostly, practically "developed here".)

You may ask, why do I care what Google/Agera does? Am I not confident in RxJava? Of course I am, and Agera's existence doesn't really bother me.

However, my experience shows that if you have a big-name banner over your head, unchallenged self-confidence and a sub-par outcome may be forced upon an entire community. I don't really want to give anyone ideas here, but imagine the next Android version mandating Agera, in its current form, as the standard for asynchronous programming!

(In addition, interop is inevitable at some point, and I really don't want to get complaints on the main RxJava issue list if they don't work together properly.)

Let me finish with a piece of wisdom I came up with (as there are now 2 cases to back it up):

You want to write a reactive library? Please don't (just yet)!

Tuesday, April 19, 2016

Operator fusion (part 2 - final)

Introduction


In the previous part, I introduced the concepts around operator fusion. In this post, I'll detail the API and protocols required to make operator fusion happen.

In its current form, operator fusion works between two subsequent operators and is based on the ability to identify each other and, in case of micro-fusion, switch to a different protocol than Reactive-Streams (RS) if both agree.

Macro-fusion constructs


The primary targets of macro-fusion are the single-element sources: just(), empty(), fromCallable(). Firing up the complete RS infrastructure for such single elements is quite expensive, yet half of the API use in RxJava and Reactor comes from these. Therefore, RxJava introduced Single and Reactor introduced Mono to help as much as possible and to offer (ever increasingly) optimized operators on them.

However, knowing at assembly time that a source will generate 0 or 1 element is also a great help in regular Observable / Flux uses. In addition, knowing that the source is also a constant helps inlining it via some custom operator.

Creating 0 or 1 element synchronous sources


To indicate a source returns a single value, the Reactive-Streams-Commons (Rsc) project (and Reactor off it) established a contract:

If a Publisher implements java.util.concurrent.Callable, it is considered a 0 or 1 element source.

You can implement Callable and return a non-null value that can be computed synchronously. You can also return null which indicates an empty result. (Remember, RS doesn't allow null values over onNext.) The call to call() will happen during subscription time.

public class MySingleSource implements Publisher<Object>, Callable<Object> {
    @Override
    public void subscribe(Subscriber<? super Object> s) {
        s.onSubscribe(new ScalarSubscription<>(s, System.currentTimeMillis()));
    }

    @Override
    public Object call() throws Exception {
        return System.currentTimeMillis();
    }
}

If the 0 or 1 element source is known to be constant, it can be the subject of assembly-time optimizations. For example, if it returns null, indicating emptiness (like empty()), only a handful of operators (those that don't work on items) can meaningfully be applied to it, and the assembly process can just return empty().

We can extend Callable with a new interface ScalarCallable to indicate a 0 or 1 element constant source.

public interface ScalarCallable<T> extends Callable<T> {
    @Override
    T call();
}


By extending Callable, any place that expects a 0 or 1 element dynamic source can work with a constant source. The reverse is not true: those expecting a constant source won't execute an arbitrary Callable (which could block or trigger side-effects) during assembly time:

public class MyScalarSource implements Publisher<Object>, ScalarCallable<Object> {
    @Override
    public void subscribe(Subscriber<? super Object> s) {
        s.onSubscribe(new ScalarSubscription<>(s, 1));
    }

    @Override
    public Object call() {
        return 1;
    }
}

Note that ScalarCallable overrides the call() method and removes the throws Exception clause: scalar constants should not throw, so consumers don't need to wrap call() into a try-catch.

Consuming 0 or 1 element synchronous sources

Consuming Callable and ScalarCallable is a matter of instanceof checks, performed at subscription time and at assembly time respectively, followed by the extraction of the single value through call().

For example, a macro-fusion on the operator count() could check for a scalar source and return a constant 0 for an empty source or 1 for a single value:


public final Flux<Long> count() {
    if (this instanceof ScalarCallable) {

       T value = ((ScalarCallable<T>)this).call();

       return just(value == null ? 0L : 1L);
    }
    return new FluxCount<>(this);
}
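The assembly-time check can be tried out without any library. What follows is a minimal sketch under stated assumptions: ScalarCallable is re-declared locally, and JustSource, EmptySource and CountFusion.countAtAssembly are hypothetical names standing in for just(), empty() and the check inside count(). It returns a sentinel of -1 where a real library would instead assemble its regular counting operator.

```java
import java.util.concurrent.Callable;

// Local re-declaration of the ScalarCallable idea: constant value, call() never throws.
interface ScalarCallable<T> extends Callable<T> {
    @Override
    T call();
}

// Hypothetical constant 1-element source, in the spirit of just(value).
final class JustSource<T> implements ScalarCallable<T> {
    private final T value;

    JustSource(T value) {
        this.value = value;
    }

    @Override
    public T call() {
        return value;
    }
}

// Hypothetical constant 0-element source, in the spirit of empty(): null means "no element".
final class EmptySource<T> implements ScalarCallable<T> {
    @Override
    public T call() {
        return null;
    }
}

final class CountFusion {
    // Returns the element count if it can be decided at assembly time,
    // or -1 if a regular counting operator would have to be assembled instead.
    static long countAtAssembly(Object source) {
        if (source instanceof ScalarCallable) {
            // Safe to invoke now: scalar constants are side-effect free and don't throw.
            Object value = ((ScalarCallable<?>) source).call();
            return value == null ? 0L : 1L;
        }
        // A plain Callable may block or have side effects, so it is left alone here.
        return -1L;
    }
}
```

Note how a plain Callable is deliberately left unexecuted: only the ScalarCallable marker promises that calling it during assembly is cheap and side-effect free.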


Another example is a shortcut in flatMap(), concatMap() or switchMap() for 0 or 1 element sources. In this case, there is no need to run the full infrastructure; we can just subscribe to the Publisher returned by the mapping function.

Note that since the mapper function itself may have side effects, this can't be an assembly-time optimization; instead, a new source operator has to be introduced that does the work at subscription time.

public final <R> Px<R> flatMap(
        Function<? super T, ? extends Publisher<? extends R>> mapper) {

    if (this instanceof Callable) {

        return new PublisherCallableMap<>((Callable<T>)this, mapper);
    }

    return new PublisherFlatMap<>(this, mapper, ...);
}

(Remark: Px stands for Publisher Extensions in Rsc and is the base type for Rsc's fluent API. It's more of a convenience in tests and perf tests, to avoid spelling out all those PublisherXXX classes, than a fully fledged API entry point.)


public final class PublisherCallableMap<T, R> implements Publisher<R> {
    final Callable<? extends T> source;
    final Function<? super T, ? extends Publisher<? extends R>> mapper;

    public PublisherCallableMap(
            Callable<? extends T> source,
            Function<? super T, ? extends Publisher<? extends R>> mapper) {
        this.source = source;
        this.mapper = mapper;
    }

    @Override
    public void subscribe(Subscriber<? super R> s) {
        T value;

        try {
            value = source.call();                                    // (1)
        } catch (Throwable ex) {
            ExceptionHelper.throwIfFatal(ex);
            EmptySubscription.error(s, ex);
            return;
        }

        if (value == null) {
            EmptySubscription.complete(s);
            return;
        }

        Publisher<? extends R> p;

        try {
            p = mapper.apply(value);                                  // (2)
        } catch (Throwable ex) {
            ExceptionHelper.throwIfFatal(ex);
            EmptySubscription.error(s, ex);
            return;
        }

        if (p == null) {
            EmptySubscription.error(s,
                new NullPointerException("The mapper returned null"));
            return;
        }

        if (p instanceof Callable) {                                  // (3)
            R result;

            try {
                result = ((Callable<R>)p).call();
            } catch (Throwable ex) {
                ExceptionHelper.throwIfFatal(ex);
                EmptySubscription.error(s, ex);
                return;
            }

            if (result == null) {
                EmptySubscription.complete(s);
                return;
            }

            s.onSubscribe(new ScalarSubscription<>(s, result));

            return;
        }

        p.subscribe(s);
    }
}

First (1), we extract the single value from the underlying Callable instance. If it is null, we complete the Subscriber immediately. Otherwise, we call the mapper that returns a Publisher (2). Since this Publisher could also be a Callable, we do the extraction again (3) and either complete or set a backpressure-enabled ScalarSubscription on the Subscriber. Because call() and the mapper can throw, we catch any exception, rethrow the fatal ones in some library-specific way (here, ExceptionHelper.throwIfFatal) and signal non-fatal exceptions to the Subscriber, setting its Subscription at the right time as well (EmptySubscription.error does both).

Caution with Callable

Since Callable is an established interface, one must be careful with classes implementing both Publisher and Callable where, functionally, the Callable part means something other than a shortcut to a 0 or 1 element value.

My hope is that since RS is relatively new and only a few people have actually implemented operators with it, we can avoid any pitfalls related to this combined interface approach.


Micro-fusion constructs

Unlike macro-fusion, micro-fusion requires a protocol switch between two subsequent operators: instead of using the standard RS method calls, some or all of them get replaced by other method calls. This allows sharing internal structures or state between the two.

In theory, in a pair of operators, the upstream operator can be the initiator and work with the internals of the downstream operator. In practice, so far, we have implemented fusion the other way around: the downstream operator works with the internals of the upstream operator.

However, going for a full custom interaction is not advised because that may lead to a complete custom implementation and duplication of a lot of code. (That being said, unfortunately, ConditionalSubscriber requires code duplication to avoid casting.)

Currently, Rsc and Reactor can do two kinds of micro-fusion: conditional and queue-based. Along a second dimension, we can distinguish three kinds of operators:

  • sources that support fusion (range(), UnicastProcessor)
  • intermediate operators that may support fusion (concatMap, observeOn, groupBy, window)
    • front fusion (concatMap)
    • back fusion (groupBy)
    • transitive fusion (map, filter)
  • consumers (flatMap inner, zip)
The third dimension appears with queue-based fusion, where the source can be synchronous (e.g., fromArray) or asynchronous (e.g., UnicastProcessor).


Conditional micro-fusion


The conditional micro-fusion ability is indicated by an interface: ConditionalSubscriber extending Subscriber with one extra method:

public interface ConditionalSubscriber<T> extends Subscriber<T> {
    boolean onNextIf(T t);
}

If a source or intermediate operator sees that its consumer is a ConditionalSubscriber it may call the onNextIf method. (By nature, this means a synchronous execution and response, thus conditional fusion is for synchronous cases only.)

If the method returns true, the value has been consumed as usual. If the method returns false, it means the value was dropped and a new value can be sent immediately. This avoids the replenishing request(1) call in filter() and other operators that drop values.

Sidenote: You may ask, why is this important? A call to request() usually ends up in an atomic CAS, costing 21-45 cycles for each dropped element.

To work with ConditionalSubscribers in source operators, you may have to switch on the incoming Subscriber's type first and provide a separate implementation, to avoid casting the downstream Subscriber all the time.


@Override
public void subscribe(Subscriber<? super Integer> s) {
    if (s instanceof ConditionalSubscriber) {

        s.onSubscribe(new RangeConditionalSubscription(
            (ConditionalSubscriber<? super Integer>)s, start, count));

    } else {
        s.onSubscribe(new RangeSubscription(s, start, count));
    }
}

The implementation can then use the onNextIf method during emissions. For example, the fast-path can be rewritten as follows:


for (long i = start; i < (long)start + count; i++) {
    if (cancelled) {
        return;
    }
    s.onNextIf((int)i);
}
if (!cancelled) {
    s.onComplete();
}

You may ask, why call onNextIf if we don't care about the return value? For composition reasons: even though this path in range() doesn't need the return value, the downstream may itself be calling onNextIf further down, and this avoids a whole chain of unnecessary request(1) calls.

The slow path is more interesting in this regard:


long i = index;
long end = (long)start + count;
long r = requested;
long e = 0L;

while (i != end && e != r) {
    if (cancelled) {
       return;
    }
    
    if (s.onNextIf((int)i)) {
        e++;
    }
    i++;
}

if (i == end) {
    if (!cancelled) {
        s.onComplete();
    }
    return;
}

if (e != 0L) {
    index = i;
    REQUESTED.addAndGet(this, -e);
}

In the while loop, if the onNextIf returns false, we don't increment the emission count which means the next integer value can come immediately. If a downstream consumer requests only 1 and then drops all values, the loop can exhaust the available integers and not call the atomic addAndGet even once.
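The effect of onNextIf on the request accounting can be simulated in a single thread. This sketch is a simplification: IntConditionalSubscriber is a hypothetical int-specialized stand-in for ConditionalSubscriber<Integer>, there is no cancellation, re-entrance protection or request overflow capping, and the atomicUpdates field exists only to count how often the shared counter gets decremented.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical, int-specialized stand-in for ConditionalSubscriber<Integer>.
interface IntConditionalSubscriber {
    boolean onNextIf(int value); // true = consumed, false = dropped

    void onComplete();
}

// Simplified range() slow path: only consumed values are charged against "requested".
final class RangeSlowPath {
    private final long end;
    private long index;
    final AtomicLong requested = new AtomicLong();
    int atomicUpdates; // diagnostic: number of atomic decrements performed

    RangeSlowPath(int start, int count) {
        this.index = start;
        this.end = (long) start + count;
    }

    void request(long n, IntConditionalSubscriber s) {
        long r = requested.addAndGet(n); // no overflow capping in this sketch
        long i = index;
        long e = 0L;
        while (i != end && e != r) {
            if (s.onNextIf((int) i)) {
                e++; // dropped values are free, so the next one can follow at once
            }
            i++;
        }
        index = i;
        if (i == end) {
            s.onComplete();
            return;
        }
        if (e != 0L) {
            requested.addAndGet(-e);
            atomicUpdates++;
        }
    }
}

// A filter-like consumer that keeps even numbers only.
final class EvenCollector implements IntConditionalSubscriber {
    final List<Integer> received = new ArrayList<>();
    boolean completed;

    @Override
    public boolean onNextIf(int value) {
        if (value % 2 == 0) {
            received.add(value);
            return true;
        }
        return false;
    }

    @Override
    public void onComplete() {
        completed = true;
    }
}
```

With range(0, 10), a request of 3 delivers 0, 2 and 4 while skipping the odd values, and only one atomic decrement happens for the whole batch. A consumer that drops everything can drain the full range after a single request(1) without any decrement at all.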

Since filter() is one of the most common operators in a chain, one should be prepared to work with ConditionalSubscriber even if one doesn't interfere with the number of events flowing through. For example, map() and filter() often appear together, and it is advised that map() also supports conditional fusion by switching on the Subscriber's type just like above and using a ConditionalSubscriber-based Subscriber:

static final class MapConditionalSubscriber<T, R> implements ConditionalSubscriber<T> {
    final ConditionalSubscriber<? super R> actual;
    
    final Function<? super T, ? extends R> mapper;

    boolean done;

    Subscription s;

    // ...

    @Override
    public boolean onNextIf(T t) {
        if (done) {
            return false;
        }

        R v;
        
        try {
            v = mapper.apply(t);
        } catch (Throwable ex) {
            ExceptionHelper.throwIfFatal(ex);
            s.cancel();
            onError(ex);
            return false;
        }

        if (v == null) {
            s.cancel();
            onError(new NullPointerException("..."));
            return false;
        }

        return actual.onNextIf(v);
    }

    // ...
}


The final case for conditional micro-fusion is the "terminal" operator or consumer implementation. Luckily, one usually doesn't have to provide two implementations, one ConditionalSubscriber and one Subscriber, but can combine them in a single class. Upstream operators that can work with the ConditionalSubscriber part will do so; others will just use the regular Subscriber methods:


static final class FilterSubscriber<T> implements ConditionalSubscriber<T> {
    final Subscriber<? super T> actual;

    final Predicate<? super T> predicate;

    boolean done;

    Subscription s;

    // ...

    @Override
    public void onNext(T t) {

        if (!onNextIf(t)) {
            s.request(1);
        }
    }

    @Override
    public boolean onNextIf(T t) {
        if (done) {
            return false;
        }
        
        boolean pass;

        try {
            pass = predicate.test(t);
        } catch (Throwable ex) {
            ExceptionHelper.throwIfFatal(ex);
            s.cancel();
            onError(ex);
            return false;
        }

        if (pass) {
            actual.onNext(t);
            return true;
        }
        return false;
    }

    // ...
}

In conclusion, conditional micro-fusion is a relatively simple but sometimes verbose way of avoiding request(1) calls and the resulting per-item overhead.


Queue-based micro-fusion

Believe me when I say this is the most complicated thing in the reactive landscape so far. Not because it requires complicated structures or algorithms, but because of its implications for operators and the combinatorial explosion of what happens if op1 is followed by op2 and how they can or can't fuse.

Queue-based micro-fusion is built upon the idea that many operators employ a queue to handle backpressure-related or asynchrony-related cases when notifying the downstream, and that subsequent operators often happen to have their queues facing each other.

For example, UnicastProcessor has a backend-queue that holds values until the downstream requests them whereas concatMap has a front-queue that holds the source values to be mapped into Publishers. When subscribed, a value goes from one queue into the other, forming a dequeue-enqueue pair without anything functional between the two other than the atomics overhead of request management and wip-accounting.

Clearly, if we could use a single queue between the two and, through it, decrease the atomics overhead, we'd have a much lower overhead in terms of computation and memory usage.

However, what if there is an operator between the two that does something with the values? What if the fusion shouldn't happen in this case?

To solve this coordination problem, we can reuse the onSubscribe(Subscription) rail in RS and extend the protocol. Enter QueueSubscription.


public interface QueueSubscription<T> extends Queue<T>, Subscription {

    int NONE = 0;
    int SYNC = 1;
    int ASYNC = 2;
    int ANY = SYNC | ASYNC;
    int THREAD_BOUNDARY = 4;

    int requestFusion(int mode);

    @Override
    default boolean offer(T t) {
        throw new UnsupportedOperationException();
    }

    // ...
}

The QueueSubscription is a combination of the Queue and Subscription interfaces, adding a new requestFusion() method. Other than the following methods, which are kept from the base interfaces, all other methods default to throwing UnsupportedOperationException because we won't need them (Java 7 and earlier: yes, you may have to do this manually for classes that can't extend a base class):


  • void request(long n)
  • void cancel()
  • T poll()
  • boolean isEmpty()
  • void clear()
(Some libraries may choose to implement size() as well, for diagnostic purposes.)
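With Java 8 default methods, the "everything else throws" part can live in the interface itself. The sketch below is one possible shape, not the Rsc source: to stay dependency-free, it uses the JDK 9+ java.util.concurrent.Flow.Subscription in place of the Reactive-Streams Subscription, and EmptyQueueSubscription is a hypothetical implementor that only writes the five methods listed above plus requestFusion().

```java
import java.util.Collection;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.Flow;

// Sketch: a Queue + Subscription hybrid where the unused Queue methods are defaulted.
interface QueueSubscription<T> extends Queue<T>, Flow.Subscription {
    int NONE = 0;
    int SYNC = 1;
    int ASYNC = 2;
    int ANY = SYNC | ASYNC;
    int THREAD_BOUNDARY = 4;

    int requestFusion(int mode);

    // Left abstract on purpose: request(long), cancel(), poll(), isEmpty(), clear().

    @Override default boolean offer(T t) { throw new UnsupportedOperationException(); }
    @Override default boolean add(T t) { throw new UnsupportedOperationException(); }
    @Override default T remove() { throw new UnsupportedOperationException(); }
    @Override default T element() { throw new UnsupportedOperationException(); }
    @Override default T peek() { throw new UnsupportedOperationException(); }
    @Override default int size() { throw new UnsupportedOperationException(); }
    @Override default boolean contains(Object o) { throw new UnsupportedOperationException(); }
    @Override default Iterator<T> iterator() { throw new UnsupportedOperationException(); }
    @Override default Object[] toArray() { throw new UnsupportedOperationException(); }
    @Override default <U> U[] toArray(U[] a) { throw new UnsupportedOperationException(); }
    @Override default boolean remove(Object o) { throw new UnsupportedOperationException(); }
    @Override default boolean containsAll(Collection<?> c) { throw new UnsupportedOperationException(); }
    @Override default boolean addAll(Collection<? extends T> c) { throw new UnsupportedOperationException(); }
    @Override default boolean removeAll(Collection<?> c) { throw new UnsupportedOperationException(); }
    @Override default boolean retainAll(Collection<?> c) { throw new UnsupportedOperationException(); }
}

// Hypothetical implementor: an always-empty, already-completed fused source.
final class EmptyQueueSubscription implements QueueSubscription<Object> {
    @Override public void request(long n) { }
    @Override public void cancel() { }
    @Override public Object poll() { return null; } // in SYNC mode, null means completion
    @Override public boolean isEmpty() { return true; }
    @Override public void clear() { }
    @Override public int requestFusion(int mode) { return mode & SYNC; } // SYNC only if asked
}
```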


When a source supports queue-based fusion, it can send a QueueSubscription implementation through onSubscribe. Those who can deal with it can act on it, the rest will simply see it as a regular Subscription.

The idea is that those who can deal with it can use it as a Queue instead of instantiating their own, saving on allocation and overhead at the same time. In addition, a source such as range() can itself pretend to be a queue, returning the next value through poll() or null if no more integers remain.

Since there are cases where fusion can't or should not happen, we need to perform a protocol switch during the subscription phase of a flow. This switch can be requested via the requestFusion() method, which takes and returns the constants from the interface.

(Sidenote: I know enums would be more readable, but EnumSet has a nice additional overhead you know...)

As an input, it can take:


  • SYNC - indicates the consumer wants to work with a synchronous upstream, with often known length
  • ASYNC - indicates the consumer wants to work with an asynchronous upstream with often unknown length and emission timing
  • ANY - indicates a consumer can work with both SYNC and ASYNC upstream
  • SYNC, ASYNC or ANY combined with THREAD_BOUNDARY (e.g., ANY | THREAD_BOUNDARY) - indicates that the consumer goes over a thread boundary and poll() happens on some other thread.

It can return:


  • NONE - fusion can't happen/rejected
  • SYNC - synchronous fusion mode activated
  • ASYNC - asynchronous fusion mode activated
If the upstream is unable to work in the requested mode or is sensitive to thread-boundary effects, it can return NONE. In this case, the flow behaves just like a regular, non-fused RS stream would. (Note that conditional fusion may still be an option.)
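The bit-flag negotiation can be sketched with plain int arithmetic. The helper names below (syncOnlySource, asyncOnlySource, userCodeIntermediate) are hypothetical and only illustrate how a range()-like source, a UnicastProcessor-like source and a map()-like intermediate might answer a request; they are not taken from Rsc.

```java
import java.util.function.IntUnaryOperator;

// Mode constants mirroring the QueueSubscription interface.
final class FusionModes {
    static final int NONE = 0;
    static final int SYNC = 1;
    static final int ASYNC = 2;
    static final int ANY = SYNC | ASYNC;
    static final int THREAD_BOUNDARY = 4;

    // A purely synchronous source (range()-like) grants SYNC only if it was asked for.
    static int syncOnlySource(int requested) {
        return (requested & SYNC) != 0 ? SYNC : NONE;
    }

    // An asynchronous source (UnicastProcessor-like) grants ASYNC only if it was asked for.
    static int asyncOnlySource(int requested) {
        return (requested & ASYNC) != 0 ? ASYNC : NONE;
    }

    // An intermediate running user code in poll() (map()-like) refuses thread boundaries
    // and otherwise lets its upstream decide.
    static int userCodeIntermediate(int requested, IntUnaryOperator upstream) {
        if ((requested & THREAD_BOUNDARY) != 0) {
            return NONE;
        }
        return upstream.applyAsInt(requested);
    }
}
```

For example, an observeOn-like consumer asking for ANY | THREAD_BOUNDARY through a map()-like stage gets NONE, while the same request made directly to an asynchronous source gets ASYNC.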

Because fusion is optional, a successfully negotiated mode requires a different mode of execution in one or both parties. In addition, this mode switch has to happen before any events fly through the chain; onSubscribe is therefore an ideal place for it, since the underlying RS protocol requires onSubscribe to be called before any other signal.

Both SYNC and ASYNC modes have extra rules implementors must adhere to.

In SYNC mode, consumers should never call request(), and producers should never return null from poll() unless they mean completion. Since the only interaction between the two is through poll() and isEmpty(), sources have no opportunity to call onError and must throw a runtime exception from these two methods instead. On the other side, consumers now have to wrap these calls into try-catches and handle/unwrap exceptions there.
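On the consumer side, a SYNC-fused drain loop reduces to polling until null while treating a thrown exception as the error signal. This is a simplified sketch: SyncFusedQueue and SyncDrain are hypothetical names, the returned strings stand in for onComplete/onError on the downstream, and a real implementation would catch Throwable and filter out fatal errors first.

```java
import java.util.List;

// Hypothetical minimal view of a SYNC-fused upstream: only poll() is used.
interface SyncFusedQueue<T> {
    T poll(); // null = completed; failures are thrown, not signalled via onError
}

final class SyncDrain {
    // Collects values until poll() returns null; a thrown exception becomes the error outcome.
    // No request() calls are made: in SYNC mode the consumer simply pulls.
    static <T> String drain(SyncFusedQueue<T> q, List<T> out) {
        for (;;) {
            T v;
            try {
                v = q.poll();
            } catch (RuntimeException ex) {
                return "error: " + ex.getMessage(); // stands in for downstream onError
            }
            if (v == null) {
                return "complete"; // stands in for downstream onComplete
            }
            out.add(v);
        }
    }
}
```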

In ASYNC mode, the producer enqueues events in its own queue and has to signal their availability to the consumer. The best way for this is through onNext: one can signal either the value itself or null (this is the only place where null is allowed in onNext). On the consumer side, the value received through onNext in ASYNC mode is meaningless and should be ignored. The other methods, onError, onComplete, request and cancel, should be used as in regular RS cases. In this mode, poll() can return null, indicating a temporary lack of values; termination is indicated by onError and onComplete as usual.
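The ASYNC handshake can be illustrated in a single thread, ignoring the atomics a real implementation needs. In this hypothetical sketch, AsyncFusedProducer enqueues into its own queue and signals onNext(null); AsyncFusedConsumer drains that same queue and treats a null from poll() as completion only if the done flag was already set before polling.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Single-threaded sketch of ASYNC fusion (no atomics, no request accounting).
final class AsyncFusedConsumer {
    private final Queue<Integer> upstreamQueue; // the producer's queue, used directly
    final List<Integer> received = new ArrayList<>();
    boolean done;
    boolean completed;

    AsyncFusedConsumer(Queue<Integer> upstreamQueue) {
        this.upstreamQueue = upstreamQueue;
    }

    // In ASYNC mode the argument is meaningless: it only says "values may be available".
    void onNext(Integer ignored) {
        drain();
    }

    // Termination still arrives through the regular onComplete path.
    void onComplete() {
        done = true;
        drain();
    }

    private void drain() {
        for (;;) {
            boolean d = done; // read the flag before polling
            Integer v = upstreamQueue.poll();
            if (v == null) {
                if (d) {
                    completed = true;
                }
                return; // null alone only means "nothing right now"
            }
            received.add(v);
        }
    }
}

final class AsyncFusedProducer {
    final Queue<Integer> queue = new ArrayDeque<>();
    AsyncFusedConsumer consumer;

    void emit(int value) {
        queue.offer(value);    // enqueue first...
        consumer.onNext(null); // ...then signal availability with a null item
    }

    void complete() {
        consumer.onComplete();
    }
}
```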


Implementing fusion-enabled sources

Now let's see the API in action. First, let's make range() fusion enabled:

static final class RangeSubscription implements QueueSubscription<Integer> {
    
    // ... the Subscription part is the same

    @Override
    public Integer poll() {
        long i = index;
        if (i == (long)start + count) {
            return null;
        }
        index = i + 1;
        return (int)i;
    }

    @Override
    public boolean isEmpty() {
        return index == (long)start + count;
    }

    @Override
    public void clear() {
        index = (long)start + count;
    }

    @Override
    public int requestFusion(int mode) {
        return (mode & SYNC) != 0 ? SYNC : NONE; // grant SYNC only if it was requested
    }
}

There is no sign of request accounting in the fused path because range() then works in synchronous pull mode: the consumer applies backpressure simply by calling poll() only when it needs a new value.
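Stripped of its Subscription half, the queue side of range() can be exercised on its own. This is a simplified stand-alone sketch (RangeQueue is a hypothetical name) that mirrors poll(), isEmpty() and clear() above; the long arithmetic keeps start + count from overflowing near Integer.MAX_VALUE.

```java
// Sketch of range() acting as its own SYNC-fused queue (no Subscription part).
final class RangeQueue {
    private final long end;
    private long index;

    RangeQueue(int start, int count) {
        this.index = start;
        this.end = (long) start + count;
    }

    // Next integer, or null once exhausted (which means completion in SYNC mode).
    Integer poll() {
        long i = index;
        if (i == end) {
            return null;
        }
        index = i + 1;
        return (int) i;
    }

    boolean isEmpty() {
        return index == end;
    }

    // Used on cancellation: drop whatever is left.
    void clear() {
        index = end;
    }
}
```

Draining it is just a loop of poll() calls until null, with no request() anywhere.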

UnicastProcessor (which is somewhat like onBackpressureBuffer()) can support fusion in ASYNC mode specifically:


public final class UnicastProcessor<T> implements Processor<T, T>, QueueSubscription<T> {

    volatile Subscriber<? super T> actual;

    final Queue<T> queue;

    int mode;

    // ...

    @Override
    public void onNext(T t) {
        queue.offer(t);                       // always enqueue first
        Subscriber<? super T> a = actual;
        if (mode == ASYNC && a != null) {
            a.onNext(null);                   // only signal availability
        } else {
            drain();
        }
    }

    @Override
    public int requestFusion(int m) {
        if ((m & ASYNC) != 0) {
            mode = ASYNC;
            return ASYNC;
        }
        return NONE;
    }

    @Override
    public T poll() {
        return queue.poll();
    }

    @Override
    public boolean isEmpty() {
        return queue.isEmpty();
    }

    @Override
    public void clear() {
        queue.clear();
    }

    @Override
    public void subscribe(Subscriber<? super T> s) {
        if (ONCE.compareAndSet(this, 0, 1)) {
            s.onSubscribe(this);
            actual = s;
            if (cancelled) {
                actual = null;
            } else {
                if (mode != NONE) {
                    if (done) {
                        if (error != null) {
                            s.onError(error);
                        } else {
                            s.onComplete();
                        }
                    } else {
                        s.onNext(null);
                    }
                } else {
                    drain();
                }
            }
        } else {
            EmptySubscription.error(s, new IllegalStateException("..."));
        }
    }
}

The fusion mode requires the following behavior changes:


  • onNext has to call actual.onNext instead of drain(),
  • requestFusion has to see if the downstream actually wants ASYNC fusion,
  • the queue methods have to be delegated to the instance queue,
  • the subscribe() has to call actual.onNext instead of drain() as well.

Doesn't look too complicated, does it? At this point, you can check your understanding of supporting fusion through an exercise: can UnicastProcessor support SYNC fusion and if so, when and how; if not, why not?


Implementing fusion-enabled intermediate operators

In practice, there are usually some intermediate operators between a fuseable source and a fusion-enabled consumer. Unfortunately, these can break fusion (reverting to the classical RS mode) or, worse, the data may skip the intermediate operator altogether, causing all sorts of failures.

The latter manifests itself when an operator forwards the Subscription it received via its onSubscribe method. Now imagine if map() does this; what would be the output of the following sequence:

range(0, 10).map(v -> v + 1).concatMap(v -> just(v)).subscribe(System.out::println);

In a classical flow, you'd get values 1 through 10 printed to the console. If both range() and concatMap() do fusion but map() forwards its Subscription, the surprising output is 0 through 9! This can affect any operator.
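The failure can be reproduced with a handful of hypothetical, heavily simplified types that model only the SYNC-fused path: Sub plays the role of Subscription, FusedQueue that of a SYNC QueueSubscription, and FusingConsumer.drain() that of concatMap's front side. A map() that forwards the upstream Subscription lets the consumer poll range() directly, so the mapper never runs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntUnaryOperator;

interface Sub { }                                    // plays the role of Subscription
interface FusedQueue extends Sub { Integer poll(); } // plays the role of a SYNC QueueSubscription

// range(): its Subscription doubles as a fused queue.
final class RangeSource {
    static FusedQueue subscription(int start, int count) {
        int[] index = {start};
        int end = start + count;
        return () -> index[0] == end ? null : index[0]++;
    }
}

final class MapOperators {
    // Buggy map(): hands the upstream Subscription downstream verbatim,
    // so a fusing consumer polls range() directly and the mapper is skipped.
    static Sub forwarding(Sub upstream, IntUnaryOperator mapper) {
        return upstream;
    }

    // Fusion-aware map(): exposes its own queue view and applies the mapper in poll().
    // Assumes a fuseable upstream; a real operator also supports the regular path.
    static Sub fused(Sub upstream, IntUnaryOperator mapper) {
        FusedQueue q = (FusedQueue) upstream;
        FusedQueue mapped = () -> {
            Integer v = q.poll();
            return v == null ? null : mapper.applyAsInt(v); // null is never mapped
        };
        return mapped;
    }
}

// A concatMap-like front side: if handed a fused queue, it drains it via poll().
final class FusingConsumer {
    static List<Integer> drain(Sub s) {
        List<Integer> out = new ArrayList<>();
        if (s instanceof FusedQueue) {
            FusedQueue q = (FusedQueue) s;
            for (Integer v = q.poll(); v != null; v = q.poll()) {
                out.add(v);
            }
        }
        return out;
    }
}
```

Running range(0, 10) through the forwarding variant yields 0 through 9, while the fusion-aware variant yields the expected 1 through 10.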

The solution is to require all operators that don't want to participate in fusion to never forward the upstream's Subscription verbatim. One way to follow this rule is to have the operator's Subscriber implement Subscription itself:


static final class MapSubscriber<T, R> implements Subscriber<T>, Subscription {
    // ...
     
    @Override
    public void onSubscribe(Subscription s) {
        this.s = s;

        actual.onSubscribe(this);
    }

    @Override
    public void request(long n) {
        s.request(n);
    }

    @Override
    public void cancel() {
        s.cancel();
    }

    // ...
}

In practice, many operators that manipulate requests or cancellation already do this, so the indirection is an acceptable trade-off for the benefit of a lower-overhead dataflow in general.

This rule unfortunately affects cross-library behavior. Even though other libraries may not speak the same fusion protocol, they could end up forwarding Subscriptions; thus, if you go into and out of some other library, the same problem may appear again. Generally, libraries are supposed to have a method such as hide() or asObservable() to hide the identity of a source and to prevent the propagation of unwanted internal features.

Luckily, map() can participate in the fusion: it only has to be fuseable itself, mediate the requestFusion between its upstream and downstream, plus place itself at the exit point: poll().


static final class MapSubscriber<T, R> implements Subscriber<T>, QueueSubscription<R> {
    final Subscriber<? super R> actual;
    
    final Function<? super T, ? extends R> mapper;

    QueueSubscription<T> qs;

    Subscription s;

    int mode;

    // ...

    @Override
    public void onSubscribe(Subscription s) {
        this.s = s;
        if (s instanceof QueueSubscription) {
            qs = (QueueSubscription<T>)s;
        }

        actual.onSubscribe(this);
    }

    @Override
    public void onNext(T t) {
        if (mode == NONE) {
            
            // error handling omitted for brevity

            actual.onNext(mapper.apply(t));

        } else {
            actual.onNext(null);
        }
    }

    @Override
    public int requestFusion(int m) {
        if (qs == null || (m & THREAD_BOUNDARY) != 0) {
            return NONE;
        }
        int u = qs.requestFusion(m);
        mode = u;
        return u;
    }

    @Override
    public R poll() {
        T t = qs.poll();
        if (t == null) {
            return null;
        }
        return mapper.apply(t);
    }

    @Override
    public boolean isEmpty() {
        return qs.isEmpty();
    }

    @Override
    public void clear() {
        qs.clear();
    }
}

The operator map() can implement QueueSubscription itself and have a field for the potential upstream's QueueSubscription as well. In requestFusion, if the upstream does support fusion and the downstream isn't a boundary, the request is forwarded to upstream; rejected otherwise.

Now poll() can't just forward to the upstream because the types are different. Here comes the mapper function that is applied to the upstream's value. Note that null indicates termination or temporary lack of values and should not be mapped.

The main reason THREAD_BOUNDARY was introduced as a flag is map(), or, in a broader sense, the restriction on where user-supplied computations happen. In fusion mode, the mapper function executes on the exit side of the queue, which could be on some other thread. Now imagine you have a heavy computation in map() which would run off the main thread before reaching an observeOn. When unfused, the result of the computation would be queued up in observeOn, then dequeued on the target thread (let's say the main thread). However, if fusion is allowed, the target thread is doing the poll() and now the heavy calculation runs on the main thread.


The operator filter() can be implemented in a similar fashion, but our old request(1) comes back unfortunately:


static final class FilterSubscriber<T> implements Subscriber<T>, QueueSubscription<T> {
    // ...

    @Override
    public T poll() {
        for (;;) {
            T v = qs.poll();

            if (v == null || cancelled) {
                return null;
            }

            if (predicate.test(v)) {
                return v;
            }

            if (mode == ASYNC) {
                qs.request(1);
            }
        }
    }

    @Override
    public boolean isEmpty() {
        return qs.isEmpty();
    }

    // ...
}

Since filter() drops values, we need to loop in poll() until the predicate matches or no more upstream values are available for some reason. If the predicate doesn't match, we have to replenish our ASYNC source (remember, you are not supposed to call request() in sync mode!).
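The cost of that replenishment can be made visible with a counting upstream. In this sketch, CountingUpstream and FusedFilter are hypothetical simplified types (no Subscription or cancellation handling), and requestCalls is a diagnostic counter; the poll() loop itself follows the code above.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.function.Predicate;

// Hypothetical upstream view: a fused queue that also receives request() calls.
final class CountingUpstream<T> {
    private final ArrayDeque<T> items;
    long requestCalls; // diagnostic: how many request(n) calls reached the upstream

    @SafeVarargs
    CountingUpstream(T... values) {
        items = new ArrayDeque<>(Arrays.asList(values));
    }

    T poll() {
        return items.poll();
    }

    void request(long n) {
        requestCalls++;
    }
}

// Fused filter(): loops in poll() until a value passes or nothing is available right now.
final class FusedFilter<T> {
    static final int SYNC = 1;
    static final int ASYNC = 2;

    private final CountingUpstream<T> upstream;
    private final Predicate<? super T> predicate;
    private final int mode;

    FusedFilter(CountingUpstream<T> upstream, Predicate<? super T> predicate, int mode) {
        this.upstream = upstream;
        this.predicate = predicate;
        this.mode = mode;
    }

    T poll() {
        for (;;) {
            T v = upstream.poll();
            if (v == null) {
                return null;
            }
            if (predicate.test(v)) {
                return v;
            }
            if (mode == ASYNC) {
                upstream.request(1); // replenish the dropped item; forbidden in SYNC mode
            }
        }
    }
}
```

Filtering 1..6 for even numbers in ASYNC mode triggers three request(1) calls, one per dropped odd value, while the SYNC variant triggers none.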


Implementing fusion-enabled consumers

Generally, operator fusion is not very useful with (and rarely happens in) end-subscribers, such as your favorite Subscriber subclass or subscribe(System.out::println).

The consumers I'm talking about can be considered intermediate operators as well, but since all operators are basically custom Subscribers that are subscribed to the upstream, they are consumers as well.

As I mentioned, many operators feature some internal queue on their front side (e.g., concatMap, observeOn) or use one when they consume some inner Publisher (e.g., flatMap, zip). These are the primary consumers and drivers of the fusion lifecycle.

Now that we are familiar with how observeOn is implemented, let's see how we can enable fusion in it:


static final class ObserveOnSubscriber<T> implements Subscriber<T>, Subscription {

    Queue<T> queue;

    int mode;

    Subscription s;

    // ...

    @Override
    public void onSubscribe(Subscription s) {
         this.s = s;

         if (s instanceof QueueSubscription) {
             QueueSubscription<T> qs = (QueueSubscription<T>)s;

             int m = qs.requestFusion(QueueSubscription.ANY
                  | QueueSubscription.THREAD_BOUNDARY);

             if (m == QueueSubscription.SYNC) {
                 queue = qs;
                 mode = m;
                 done = true;
                 
                 actual.onSubscribe(this);
                 
                 return;
             }

             if (m == QueueSubscription.ASYNC) {
                 queue = qs;
                 mode = m;

                 actual.onSubscribe(this);

                 s.request(prefetch);

                 return;
             }       
         }

         queue = new SpscArrayQueue<>(prefetch);
         
         actual.onSubscribe(this);

         s.request(prefetch);
    }

    @Override
    public void onNext(T t) {
        if (mode == QueueSubscription.NONE) {
            queue.offer(t);
        }
        
        drain();
    }
    
    void drain() {

        // ...
             
            if (mode != QueueSubscription.SYNC) {
                s.request(p); // replenish the upstream; never in SYNC mode
            }

        // ...

    }

    // ...

}

Enabling fusion has two implications: 1) queue can no longer be final but has to be created in onSubscribe, 2) onNext should not offer if fusion is enabled.

The fusion mode is requested in onSubscribe after identifying the upstream as a QueueSubscription. Since the algorithm inside drain() only sees the Queue interface and doesn't particularly care when values become available in the queue, we request the ANY mode from upstream, in addition to indicating that the consumer is a THREAD_BOUNDARY. This should prevent the poll() side from unexpectedly changing the location of some user-defined function.

If SYNC mode is granted, we assign the QueueSubscription to our queue and call onSubscribe on the downstream Subscriber. In this mode, the prefetch amount is not requested, in accordance with the synchronous fusion protocol. The big win in SYNC mode is the fact that if poll() returns null, that is an indication of termination. We already exploit this in the standard queue-drain algorithm: if the done flag is set and the queue reports null/empty, we have completed. Note, however, that we have to adjust the drain algorithm a bit because we can't call request in SYNC mode anymore.

If ASYNC mode is granted, we store the queue again, but can't set the done flag as we don't know when the upstream finishes - poll() returning null is just the indication of unavailability of values at the time. In addition, once the downstream Subscriber is notified, we still have to signal a prefetch-request to upstream, so it can trigger its own sources even further up.

Note that once requestFusion returns SYNC or ASYNC, there is no going back (you may try to call requestFusion() again which may change the mode, but that's undefined behavior at the moment; it may be forbidden entirely in the future), definitely not after elements have been delivered already in any mode.
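The decision made in onSubscribe can be isolated into a small, self-contained sketch. Everything here is hypothetical and simplified: Subscription and QueueSubscription are re-declared with only the methods needed, PollQueue lets the drain loop stay agnostic of the queue's origin, and both the downstream onSubscribe call and the drain loop itself are omitted.

```java
import java.util.ArrayDeque;

// Hypothetical, trimmed-down protocol types (not the Reactive-Streams interfaces).
interface Subscription {
    void request(long n);
}

interface QueueSubscription<T> extends Subscription {
    int NONE = 0, SYNC = 1, ASYNC = 2, ANY = SYNC | ASYNC, THREAD_BOUNDARY = 4;

    int requestFusion(int mode);

    T poll();
}

// Minimal queue view, so the drain loop doesn't care where values come from.
interface PollQueue<T> {
    T poll();
}

// observeOn-like front side: decides in onSubscribe which queue the drain loop uses.
final class FrontSide<T> {
    final int prefetch;
    PollQueue<T> queue;     // no longer final: chosen in onSubscribe
    ArrayDeque<T> ownQueue; // only allocated when fusion is not established
    int mode = QueueSubscription.NONE;
    boolean done;

    FrontSide(int prefetch) {
        this.prefetch = prefetch;
    }

    void onSubscribe(Subscription s) {
        if (s instanceof QueueSubscription) {
            @SuppressWarnings("unchecked")
            QueueSubscription<T> qs = (QueueSubscription<T>) s;
            int m = qs.requestFusion(QueueSubscription.ANY | QueueSubscription.THREAD_BOUNDARY);
            if (m == QueueSubscription.SYNC) {
                mode = m;
                queue = qs::poll;
                done = true;         // poll() returning null now means completion
                return;              // and no request() in SYNC mode
            }
            if (m == QueueSubscription.ASYNC) {
                mode = m;
                queue = qs::poll;
                s.request(prefetch); // still needed to get the upstream going
                return;
            }
        }
        ownQueue = new ArrayDeque<>(prefetch);
        queue = ownQueue::poll;
        s.request(prefetch);
    }

    void onNext(T t) {
        if (mode == QueueSubscription.NONE) {
            ownQueue.offer(t);       // only the unfused path enqueues
        }
        // drain() would run here
    }
}
```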

General warnings around micro-fusion

In my experience, some of my colleagues tend to become enthusiastic about micro-fusion; they want to apply it everywhere. Whenever an operator has any queue, they see fusion happening.

I must warn against such relentlessness because fusion has some requirements and implications, and is generally subject to cost-benefit trade-offs:


  • If an operator is a thread boundary, my current understanding is that you can't fuse both its front and back side at the same time.
  • Fusion can shift computations in time and sometimes in location (even without an explicit boundary).
  • The fact an operator has a queue doesn't mean it can be exposed/replaced. A good example of this is combineLatest: my current understanding is that the post-processing of the queue elements makes this infeasible for back side fusion. Another example is flatMap where I'm not convinced the collector logic can be integrated into a poll()/isEmpty() back-side fusion.
  • Some sources, such as 0 or 1 element ones, are likely not worth it and are better off with macro-fusion.
  • Fusion is an extra behavior which can also be buggy or, in fact, hide a bug on the regular path (e.g., groupBy), and requires extra care. In addition, it increases the number of test methods because now you have to test with and without fusion (see hide()).

To cheer you up, there is a great counter-example operator that supports full fusion: front and back side at the same time: flattenIterable, or as you may know it, concatMapIterable/flatMapIterable.

Conclusion

In this post, I've detailed the structures and protocols of operator fusion and shown some examples of how it can be utilized in source, intermediate and terminal operators.

Since operator fusion is an active research area, I can't say this is all that can happen, and we are eager to hear about interesting chains of operators where fusion can happen or, in contrast, where fusion should not happen. See the Rsc repository for examples of all kinds of fusion.

In addition, I hope these fusion protocols will be standardized and be part of Reactive-Streams 2.0, allowing a full, cross-library efficient operation that maintains fusion as long as possible.

My next topic will be to finish up the series about ConnectableObservables.