IntroductionOperator-fusion, one of the cutting-edge research topics in the reactive programming world, is the aim to have two of more subsequent operators combined in a way that reduces overhead (time, memory) of the dataflow.
(Other cutting-edge topics are: 1) reactive IO, 2) more native parallel async sequences and 3) transparent remote queries.)
The key insight with operator-fusion is threefold:
- many sequences are started from constant or quasi-constant sources such as just(), from(T), from(Iterable), fromCallable() which don't really need the thread-safety dance in a sequence of operators,
- some pairs of operators can share internal components such as Queues and
- some operators can tell if they consumed the value or dropped it, avoiding request(1) call overhead.
In this mini-series, I'll describe the hows and whys of operator-fusion, as we currently understand it. By "we", I mean the joint research effort on optimizing Reactive-Streams operators beyond what's there in RxJava 2.x and has been in previous versions of Project Reactor.
The experimentation happens in the reactive-streams-commons, Rsc for short, GitHub repository. The results of the Rsc is now driving Project Reactor 2.5 (currently in milestone 2) and verified by a large user base. Hopefully, RxJava can benefit from the results as well (but maybe not before 3.x).
If you are following Akka-Streams, you might have read/head about operator-fusion there as well. As far as I could understand their approach, their objective is to make sure more stages of the pipeline run on the same Actor, avoiding the previous, very likely, thread-hopping with their sequences. Essentially, there is now a mode where the developer can define the async boundaries in the pipeline. Does this sound familiar? From day 1, Rx-based libraries let you do this.
GenerationsReactive libraries and associated concepts evolved over time. What we had 7 years ago in Rx.NET, requirements and implementation-wise is significantly different what we'll have tomorrow with libraries such as Project Reactor.
With my experience with the history of "modern" reactive programming, I categorize the libraries into generations.
0th generationThe very first generation of reactive programming tools mainly consist of java.util.Observable API and its cousins in other languages and almost any callback-based API such as addXXXListener in Swing/AWT/Android.
The Observable API was most likely derived from the Gang-of-four design patterns book (or the other way around, who knows) and has the drawback of being inconvenient to use and non-composable. In today's terms, it is a limited PublishSubject where you have only one stage: publisher-subscriber.
The addXXXListener style of APIs suffer, although facilitate push-based eventing, from composability deficiencies. The lack of common base concept would require you to implement a composable library for each of them one-by-one; or have one common abstraction like RxJava and build adapter for each addXXXListener/removeXXXListener entry point.
1st generationOnce the deficiencies were recognized and addressed by Erik Meijer & Team at Microsoft, the first generation of reactive programming libraries were born: Rx.NET around 2010, Reactive4Java in 2011 and early versions of RxJava in 2013.
The others followed the Rx.NET architecture closely, but soon turned out there are problems with this architecture. When the original IObservable/IObserver is implemented with purely same-thread manner, the sequences can't be cancelled in progress with operators such as take(). Rx.NET sidestepped the issue by using mandatory asnycrony in sources such as range().
The second problem was the case when the producer side is separated by an implicit or explicit asynchronous boundary from a consumer that can't do its job fast enough. This can happen with trivial consumers as well because of the infrastructure overhead of crossing the asynchronous boundary. This is what we call the backpressure problem.
2nd generationThe new deficiencies of synchronous cancellation and the lack of backpressure were recognized by the RxJava team (I wasn't really involved) and a new architecture has been designed.
The class Subscriber was introduced which could tell if it was interested in more events or not via isUnsubscribed() that had to be checked by each source or operator emitting events.
The backpressure problem was addressed by using co-routines to signal the amount of items a Subscriber can process at a time through a Producer interface.
The third addition was the method lift() which allows a functional transformation between Subscribers directly. Almost all instance operators have been rewritten to run with lift() through the new Operator interface.
3rd generationApart from being clumsy and limiting some optimizations, the problem with RxJava's solution was that it was incompatible with the viewpoints of other (upcoming) reactive libraries at the time. Recognizing the advent of (backpressure enabled) reactive programming, engineers from various companies got together and created the Reactive-Streams specification. The main output is a set of 4 interfaces and 30 rules regarding them and their 7 total methods.
The Reactive-Streams specification allows library implementors to be compatible with each other and compose the sequences, cancellation and backpressure across library boundaries while allowing the end-user to switch between implementations at will.
Reactive-Streams, and thus 3rd generation, libraries are, for example, RxJava 2.x, Project Reactor and Akka-Streams.
4th generationImplementing a fluent library on top of Reactive-Streams requires quite a different internal architecture, thus RxJava 2.x had to be rewritten from scratch. While I was doing this reimplementation, I recognized some operators could be combined in an external or internal fashion, saving on various overheads such as queueing, concurrency-atomics and requesting more.
Since RxJava 2.x development crawled to halt due to lack of serious interest from certain parties, I set RxJava 2.x aside until Stephane Maldini (one of the contributors to Reactive-Streams and main contributor to Project Reactor) and I started talking about a set of foundational operators that both RxJava 2.x and Project Reactor 2.5+ (and eventually Akka-Streams) could use and incorporate them into the respective libraries.
With active communication, we established the reactive-streams-commons library, built the foundational operators and designed the components of optimizations that we call now operator-fusion.
Thus, a 4th generation reactive library may look like a 3rd generation from the outside, but the internals of many operators change significantly to support overhead reduction even further.
5+ generationI think, at this point, we are at half point in what operator-fusion can achieve, but there are signs the architecture of Reactive-Streams will need extensions to support reactive IO operations in the form of bi-directional sequences (or channels). In addition, transparent remote reactive queries may require changes as well (see QBservable in Rx.NET). I don't see the full extent of possibilities and requirements at this point and all is open for discussion.
The Rx lifecycleBefore jumping into operator-fusion, I'd like to define the major points (thus the terminology I'll be using) of the lifecycle of an Rx sequence. This applies to any version of RxJava and any Reactive-Streams based libraries as well.
The lifecycle can be split into 3 main points:
- Assembly-time. This is the time when you write up just().subscribeOn().map() and assign that to a field or variable of type Observable/Publisher. This is the main difference between Future-based APIs (Promise, CompletableFuture, etc.) which if support some fluent API where there isn't a separate assembly time but some form of interleaving among the 3 points.
- Subscription-time. This is the time when a Subscriber subscribes to a sequence at its very end and triggers a "storm" of subscriptions inside the various operators. On one hand, it has an upstream-directed edge and on the other hand, a downstream-directed edge of calls to setProducer/onSubscribe. This is when subscription-sideeffects are triggered and generally no value is flowing through the pipeline.
- Runtime. This is the time when items are generated followed by zero or one terminal event of error/completion.
Each distinct point in the lifecycle enables a different set of optimization possibilities.
Operator-fusionI admit, I took the term operator-fusion from some Intel CPU documentation describing their internal architecture doing macro- and micro-fusions on assembly-level operators. It kinda sounded cool and the concepts behind it could be expanded up the language level and reach the operators of reactive dataflows.
The idea, on the reactive level, is to modify the sequence the user created at various lifecycle points to remove overhead mandated by the general architecture of the reactive library.
As with the assembly-level fusion, we can define two kinds of reactive operator-fusion.
Macro-fusionMacro-fusion happens mainly in the assembly-time in the form of replacing two or more subsequent operators with a single operator, thus reducing the subscription-time overhead (and sometimes the runtime overhead in case the JIT would be overwhelmed) of the sequence. There are several ways this can happen.
1) Replacing an operator with another operator
In this form of fusion, the operator applied looks at the upstream source (this is why I mentioned lift() causes trouble) and instead of instantiating its own implementation, it calls/instantiates a different operator.
One example of this is when you try to amb()/concat()/merge() an array of sources which has only one element. In this case, it would be unnecessary to instantiate the implementation and one can avoid the overhead by returning that single element directly. This kind of optimization is already part of RxJava 1.x.
The second example is when one uses a constant source, such as range() and applies subscribeOn(). However, there is little-to-no behavioral difference between applying observeOn() in the same situation. Thus subscribeOn() detecting a range() can switch to observeOn() and perhaps benefit from other optimizations that observeOn() itself can provide.
2) Replacing an operator with a custom operator
The exist operator-pairs that come up often and may work better if they were combined into a single operator. A very common operator-pair that is used for jump-starting some asynchronous computation is just().subscribeOn() or the equivalent just().observeOn().
Such sequences have quite a large overhead compared to the single value they emit: internal queues get created, workers get instantiated and released, several atomic variables are modified.
Therefore, replacing the pair with a custom operator that combines the scheduling and emission into a single value into one single operator is a win.
This approach, especially involving just(), can be extended to other operators, such as flatMap() where all the internal complexities can be avoided by invoking the mapper function once and running with the single Observable/Publisher directly, without buffering or extra synchronization.
Again, RxJava 1.x already has optimizations such as these examples above.
3) Replacing during subscription-time
There are cases when the previous two cases may happen during subscription-time instead of assembly-time.
I can see two reasons for moving the optimization into the subscription-time: 1) safety-net in case the fluent API is bypassed and 2) convenience if the fused and non-fused version doesn't differ that much to warrant a full-independed class as operator.
4) Replacing with the same operator but with modified parameters
Users of the libraries tend to apply certain operator types multiple times in a sequence, such as map() and filter():
Observable.range(1, 10) .filter(v -> v % 3 == 0) .filter(v -> v % 2 == 0) .map(v -> v + 1) .map(v -> v * v) .subscribe(System.out::println);
This is quite convenient to look at one can more easily understand what's happening. Unfortunately, if you have a range of 1M or resubscribe to the sequence a million times, the structure has quite a measurable overhead compared to a flatter structure.
The idea with this macro-fusion is to detect if an operator of the same type was applied before, take the original source and apply the operator where the parameters get combined. In our example, that means range() is followed, internally, by a single filter() application where the two lambda functions (in their reference form) are combined:
Predicate<Integer> p1 = v -> v % 3 == 0; Predicate<Integer> p2 = v -> v % 2 == 0; Predicate<Integer> p3 = v -> p1.test(v) && p2.test(v);
A similar fusion happens with the lambda of the map() operations, with the difference that the output of the first lambda is going to be the input of the second lambda:
Function<Integer, Integer> f1 = v -> v + 1; Function<Integer, Integer> f2 = v -> v * v; Function<Integer, Integer> f3 = v -> f2.apply(f1.apply(v));
Micro-fusionMicro-fusion happens when two or more operators share their resources or internal structures and thus bypassing some overhead of the general wired-up structure. Micro-fusion can mostly happen in subscription-time.
The original idea of micro-fusion was the recognition that operators that end in an output queue and operators starting with a front-queue could share the same Queue instance, saving on allocation and saving on the drain-loop work-in-progress serialization atomics. Later, the concept has been extended to sources that could pose as Queues and thus avoiding the creation of SpscArrayQueue instances completely.
There are several forms of micro-fusion that can happen in operators.
1) Conditional Subscriber
When filtering an (upstream) source with filter() or distinct(), if that source features a drain-loop with request accounting, there is the likely scenario that filter() will request(1) if the last value has been dropped by the operator. Lots of request(1) calls, which all trigger some atomic increment or CAS loop adds up overhead relatively quickly.
The idea behind a conditional subscriber is to have an extra method, boolean onNextIf(T v), that would indicate if it didn't really consume the value. In that case, the usual drain-loop would then skip incrementing its emission counter and keep emitting until the request limit is reached by successful consumptions.
This saves a lot on request management overhead and some operators in RxJava 2.x support it, but there are some drawbacks as well, mostly affecting the library writers themselves:
a) The source and filter may be separated by other operators so those operators have to offer a conditional Subscriber version of themselves to pass along the onNextIf() calls.
b) By returning non-void, the onNextIf() implementation is forced to be synchronous in nature. However, since it just returns a boolean, it can still behave as the regular onNext() method by claiming it consumed the value even though it dropped it; therefore, it has to request(1) manually again.
Since this is an internal affair, conditional Subscribers of operators still have to implement the regular onNext() behavior in case the upstream doesn't support conditional emission and/or is from some other reactive library with different internals.
We call synchronous micro-fusion the cases when the source to an operator is synchronous in nature, and can pretend to be a Queue itself.
Typical sources of such nature are range(), fromIterable, fromArray, fromStream and fromCallable. You could count just() here as well but usually, it is involved more in macro-fusion cases.
Operators that use an internal queues are, for example, observeOn(), flatMap() in its inner sources, publish(), zip(), etc.
The idea is for the source's Subscription to also implement Queue, and during the subscription time, the onSubscribe() can check for it and use it instead of newing up its internal Queue implementation.
This requires a different operation mode (a mode switch) from both upstream and the operator itself, namely, calling request() is forbidded and one has to remember the mode itself in some field variable. In addition, when the Queue.poll() returns null, that should indicate no more values will ever come, unlike regular poll()s in operators where null means no values available but there could be in the future.
Unfortunately for the RxJava 1.x, this fusion works better with the Reactive-Streams architecture because a) setting a Producer is optional, b) the lifecycle-related behaviors are too unreliable and c) discovery difficulties and too much indirection.
When benchmarked in Rsc, this form of fusion makes a range().observeOn() sequence go from 55M Ops/s to 200M Ops/s in throughput, giving a ~4x overhead reduction in this trivial sequence.
Again, there are downsides of this kind of API "hacking":
a) In short sequences, the mode switch inside the operator may not be worth it.
b) This optimization is library local at the moment so unless there is a standard API like with Reactive-Streams interfaces, library A implementing micro-fusion may not cross-fuse with library B.
c) There are situations where this queue-fusion optimization is invalid, mainly due to thread-boundary violations (or other effects we haven't discovered yet that create invalid fused sequences).
d) This optimization has also some library-spanning effect, because intermediate operators have to support, or at least not interfere with the setup protocol of the mode-switch.
e) This also has the effect that in a Reactive-Streams architecture, an operator can't just pass along the Subscription from upstream to its downstream because if they fuse, the intermediate operator is cut out.
There are other situations when the source has its own internal, downstream facing queue which is drained by requests, but the timing and count of the items are not known upfront.
In this situation, the source can also implement the Queue interface and the operator use it instead of a fresh queue, but the protocol has to change, especially if the same operator wants to support synchronous fusion.
Therefore, in Rsc, instead of checking if Subscription implements Queue received in onSubscribe(), we established a custom interface, QueueSubscription, that implements Subscription, Queue and a method called requestFusion().
The method requestFusion() takes a int-flag telling the upstream what kind of fusion the the current operator wants or supports and the upstream should respond what kind of fusion mode it has activated.
For example, flatMap() would request a synchronous fusion from the inner source which could answer with, sorry-no, yes-synchronous or instead-asynchronous mode and act according to them. Generally, one can "downgrade" from a synchronous mode to asynchronous or none, but one can't "upgrade" to synchronous mode from asychronous mode requests.
In asynchronous-fusion mode, downstream has to still issue request() calls, but instead of enqueueing the value twice, the value gets generated into the shared queue and the upstream calls onNext() indicating its availability. The value of this call is irrelevant, we use null as a type-neutral value, and can trigger the usual drain() call directly.
Since fusion happens in subscription time, there is too late to change the Subscriber instance itself, therefore, one needs a mode flag in the operator and do a conditional check for the fusion mode. Therefore, the same class can work with regular and fuseable sources alike.
This is the point when the complexity rises 50% above the complexity of a classical backpressured operator and requires quite an in-detail knowledge of all the operators and their behavior in various situations.
Invalid fusionsBefore one goes ahead and fuses every queue in every operation, a problem comes up in the form of invalid fusion.
Operators tend to have some barriers associated with them. These are somewhat analogous to memory barriers and have a similar effect: 1) prevent certain reorderings and 2) prevent certain optimizations altogether.
For example, mapping from String to Integer and then Integer to Double can't be reordered because of the type mismatch. Reordering a filter() with map() may be invalid when the map changes types or by introducing side-effects in map that would have been avoided because filter didn't let the causing value through in the first place.
On one hand, these functional barriers mainly affect the macro-fusion operators and somewhat easier the detect and understand.
On the other hand, when asynchrony is involved, in the form of a thread-jumping behavior provided by observeOn(), micro-fusion can become invalid.
For example, if you have a sequence of
source.flatMap(u -> range(1, 2).map(v -> heavyComputation(v)) .observeOn(AndroidSchedulers.mainThread())) .subscribe(...)
The inner sequence of range-map-observeOn-flatMap would have a single fused queue, where the map()'s behavior has been reordered to the output side of the shared queue, now executes the heavy computation on the main thread.
On a side note, classical observeOn can also drag the emission to its thread due to how backpressure triggers emission, thus in the example above, if you have a longer range(), the range's emission and so the map()'s computation would end up on the main thread anyway. This is why one needs subscribeOn()/observeOn() before map to ensure it runs on the correct thread.
This required a slight change to the protocol of the requestFusion() call by introducing a bit indicating if the caller (chain) acts as an asynchronous boundary, that is, the endpoint of the fused queue would be in another thread. Intermediate operators such as map() intercept this method all and simply respond with no-fusion.
Finally, there might be a subscription-time related barrier as well that prevents reordering/optimization due to subscription side-effects. We are not sure of this yet but here are a few hands-on cases that requires further study:
1) Is it valid to turn a range().subscribeOn(s1).observeOn(s2) chain, which I call strongly-pipelined sequence because of the forced thread-boundary switch by default, into a fused range().observeOn(s2)? The tail-emission pattern is the same, you get events on Scheduler s2, but now we've lost the strong pipelining effect.
2) Subscribing to a Subject may take some in case there are lots of Subscribers there thus subscribeOn() may be a valid use to offset the overhead, but generally, there are no other side-effects happening when one subscribes to a PublishSubject. Is it valid to drop/replace subscribeOn() here?
ConclusionOperator-fusion is a great opportunity, but also a great responsibility, to reduce overhead in reactive dataflows, and sometimes, get pretty close (+50% overhead with Project Reactor 2.5 M1 instead of +200% overhead with RxJava 2.x) to a regular Java Streams sequence's overhead while still supporting asynchronous (parts of) sequences with the same API (and similar internals).
However, adding fusion to every operator over zealously may not worth it and one should focus on operators doing the heavy lifting in user's code most of the time: flatMap(), observeOn(), zip(), just(), from() etc. In addition, one could say every operator pair is macro-fuseable because a custom operator can be written for it, but then you now have a combinatorial explosion of operators that now have to interact with the regular operators and with each other.
Of course, on the other side, there are operators that don't look like they could be (micro-) fused but may turn up fuseable after all. But instead of building a huge operator cross-fusion matrix, there might be a possibility to automatically discover which operators can be fused by modelling them and the sequences in some way and applying graph algorithms on the network - a topic for further research.
Anyway, the in the next part, I'll dive deeper into how operator-fusion in Rsc has been implemented, but before that, I'd like to describe the in-depth technicalities and differences of subscribeOn() and observeOn() operators in an intermediate post for two reasons:
1) I think showing how to implement them clears up the confusion around them because I learned about subscribeOn() and observeOn()the same in-depth technical way in the first place (and I was never confused).
2) Knowing their structure and exact behavior helps in understanding the fusion-related changes applied to them later on.
As for where you can play with this fusion thing (as an end-user), check out Project Reactor 2.5, who have extensively (unit-) tested the solutions I have described in the post. Of course, since this is an ongoing research, the Rsc project itself welcomes feedback or tips on what operator combinations we should optimize for.