Friday, December 11, 2015

The new Completable API (part 1)

Introduction


If you are following the day-to-day RxJava GitHub activity, you might have noticed a PR about a new and mysterious rx.Completable class. This PR has been merged into the 1.x branch (in @Experimental fashion) and will most likely be part of RxJava 1.1.1.

In this two-part series, I'm first going to introduce the usage of this class and its relation to the existing Observable and Single classes, then I'll explain the internals and development practices of Completable operators.

Note that as the @Experimental tag indicates, method names and their availability may change at any time before (or after) 1.1.1 is released.

What is this Completable class?

We can think of a Completable object as a stripped-down version of Observable where only the terminal events, onError and onCompleted, are ever emitted. It may look like an Observable.empty() captured in a concrete class, but unlike empty(), Completable is an active class: it mandates side effects when subscribed to, and that is indeed its main purpose. A Completable contains some deferred computation with side effects and only notifies about the success or failure of that computation.

Similar to Single, the Completable behavior can be emulated with Observable<?> to some extent, but many API designers think codifying the valuelessness in a separate type is more expressive than messing with wildcards (and usually type-variance problems) in an Observable chain.

Completable doesn't stream any values; therefore, it doesn't need backpressure, which simplifies the internal structure. On the other hand, optimizing these internals requires more knowledge of lock-free atomics.


Hello World!

Let's see how one can build a (side-effecting) Hello World Completable:

Completable.fromAction(() -> System.out.println("Hello World!"))
.subscribe();

Quite straightforward. We have a set of fromXXX methods which can take many sources: Action, Callable, Single and even Observable (stripping any values generated by the latter three, of course). On the receiving end, we have the usual subscribe capabilities: empty, lambdas for the terminal events, an rx.Subscriber and an rx.Completable.CompletableSubscriber, the main intended receiver for Completables.

Reactive-Empty-Streams?

The definition of the CompletableSubscriber looks quite similar to a Reactive-Streams Subscriber and was chosen over the rx.Subscriber for performance reasons:


public interface CompletableSubscriber {

    void onCompleted();

    void onError(Throwable e);

    void onSubscribe(Subscription d);
}

It features the usual onCompleted() and onError(), but instead of extending Subscription and having an add() method like rx.Subscriber, it receives the unsubscription-enabling Subscription via the onSubscribe call, as in the Reactive-Streams API. This setup has the following benefits:

  • Each CompletableSubscriber implementor can decide if it wants to expose the unsubscription capability to external users unlike rx.Subscriber where anybody can unsubscribe it.
  • In rx.Subscriber, a mandatory (and maybe shared) SubscriptionList container is created to support resource association with any Subscriber instance. However, many Observable operators don't use (or require) resources themselves and still incur the unnecessary allocation and instance size overhead.

The terminal event semantics are also the same as in Reactive-Streams. When onError or onCompleted is called, the formerly received Subscription should be considered already unsubscribed.

Thus, the protocol looks as follows:

onSubscribe (onError | onCompleted)?

It contains a mandatory onSubscribe call with a non-null argument followed by, optionally, either an onError with a non-null Throwable or an onCompleted. As within Reactive-Streams, the methods can't throw any checked exceptions or unchecked exceptions other than NullPointerException. This doesn't mean methods shouldn't fail; it means methods should fail in the downstream direction. There are many cases, however, where one can't really put the received exception anywhere (e.g., exceptions arriving after onCompleted); the last resort is to sink it into RxJavaPlugins.getInstance().getErrorHandler().handleError(e).
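To make the protocol above concrete, here is a minimal, JDK-only sketch. It does not use RxJava: the Subscription and CompletableSubscriber interfaces are local stand-ins mirroring the shapes described in this post, and runAction and record are hypothetical helpers of mine, not library methods.

```java
import java.util.ArrayList;
import java.util.List;

public class ProtocolSketch {

    /** Stand-in for rx.Subscription, reduced to its two methods. */
    public interface Subscription {
        void unsubscribe();
        boolean isUnsubscribed();
    }

    /** Same shape as the CompletableSubscriber interface shown earlier. */
    public interface CompletableSubscriber {
        void onCompleted();
        void onError(Throwable e);
        void onSubscribe(Subscription d);
    }

    /** Hypothetical source: runs the action, then signals exactly one terminal event. */
    public static void runAction(Runnable action, CompletableSubscriber cs) {
        // Step 1: the mandatory onSubscribe call with a non-null Subscription.
        cs.onSubscribe(new Subscription() {
            @Override public void unsubscribe() { }
            @Override public boolean isUnsubscribed() { return true; }
        });
        try {
            action.run();
        } catch (Throwable ex) {
            // Step 2a: fail in the downstream direction instead of throwing.
            cs.onError(ex);
            return;
        }
        // Step 2b: normal completion.
        cs.onCompleted();
    }

    /** Hypothetical helper: returns the signals a subscriber saw, in order. */
    public static List<String> record(Runnable action) {
        List<String> events = new ArrayList<>();
        runAction(action, new CompletableSubscriber() {
            @Override public void onSubscribe(Subscription d) { events.add("onSubscribe"); }
            @Override public void onCompleted() { events.add("onCompleted"); }
            @Override public void onError(Throwable e) { events.add("onError: " + e.getMessage()); }
        });
        return events;
    }

    public static void main(String[] args) {
        System.out.println(record(() -> { }));
        // prints [onSubscribe, onCompleted]
        System.out.println(record(() -> { throw new IllegalStateException("boom"); }));
        // prints [onSubscribe, onError: boom]
    }
}
```

Note that the try block covers only the action itself, so a failure is reported once through onError and is never followed by onCompleted.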

Create, Lift and Transform

The Completable class has three additional standard helper interfaces, now becoming common with all RxJava base classes:

The first defines a way to specify the deferred computation and send the terminal notifications out to a CompletableSubscriber:

public interface CompletableOnSubscribe
    extends Action1<CompletableSubscriber> { }

CompletableOnSubscribe complete = cs -> {
    cs.onSubscribe(Subscriptions.unsubscribed());
    cs.onCompleted();
};

It is practically a named alias of an Action1 parametrized by the CompletableSubscriber. Creating an instance via a lambda expression is also straightforward (but one has to remember to call onSubscribe before calling the other onXXX methods).

The second interface allows lifting into a Completable sequence by specifying a CompletableSubscriber-level transformation.


public interface CompletableOperator 
    extends Func1<CompletableSubscriber, CompletableSubscriber> { }

CompletableOperator swap = child -> new CompletableSubscriber() {
    @Override
    public void onSubscribe(Subscription s) {
        child.onSubscribe(s);
    }
    @Override
    public void onCompleted() {
        child.onError(new RuntimeException());
    }
    @Override
    public void onError(Throwable e) {
        child.onCompleted();
    }
};

Again, the CompletableOperator is an alias for a Func1 instance that lets you wrap, replace and enrich the downstream's CompletableSubscriber. The example implementation shows how one can turn one terminal event into the other via an operator.

The final helper interface allows preparing entire chains of operators to be included in an existing chain:


public interface CompletableTransformer
    extends Func1<Completable, Completable> { }

CompletableTransformer schedule = c -> 
    c.subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread());


With this alias, one can pre-compose common operations and apply them to a stream through the usual compose() method. The example shows the canonical case of making sure the async computation starts on the IO scheduler and the completion is observed on the main thread.


Entering into the Completable world

As with Observables, Completable offers static factory methods that let you start a stream from various sources:

  • create(CompletableOnSubscribe): lets you write your custom deferred computation that receives a CompletableSubscriber as its observer. The CompletableOnSubscribe is invoked for each CompletableSubscriber separately.
  • complete(): returns a constant instance of a Completable which calls onCompleted without doing anything else.
  • defer(Func0<Completable>): calls the function for each incoming CompletableSubscriber which should create the actual Completable instance said subscriber will subscribe to.
  • error(Throwable): emits the given constant Throwable to each incoming CompletableSubscriber.
  • error(Func0<Throwable>): for each incoming CompletableSubscriber, the Func0 is called individually and the returned Throwable is emitted through onError.
  • fromAction(Action0): lets you execute an action for each CompletableSubscriber, which then receives onCompleted (or onError if the action throws an unchecked exception).
  • fromCallable(Callable): unfortunately, Java doesn't have a standard interface for an action which returns void and can throw a checked exception (not even in 8). The closest thing is the Callable interface. This lets you write an action that doesn't require you to wrap the computation into a try-catch but mandates the return of some arbitrary value (ignored). Returning null is acceptable here.
  • fromFuture(Future): lets you attach to a future and wait for its completion, literally. This blocks the subscriber's thread, so you will have to use subscribeOn().
  • fromObservable(Observable): lets you skip all values of the source and just react to its terminal events. The Observable is observed in an unbounded backpressure mode and the unsubscription (naturally) composes through.
  • fromSingle(Single): lets you turn the onSuccess call coming from the Single into an onCompleted call.
  • never(): does nothing other than setting an empty Subscription via onSubscribe.
  • timer(long, TimeUnit): completes after the specified time elapsed.

In addition, both the Observable and Single classes feature a toCompletable() method for convenience.

The naming of the fromXXX methods is deliberately specific: the Java 8 compiler likes to get into ambiguity problems due to the similarly appearing functional-interfaces.
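The per-subscriber behavior of these factories can be illustrated without RxJava. The sketch below is a JDK-only model: Mini is a hypothetical miniature of Completable, Subscriber is a reduced stand-in for CompletableSubscriber (onSubscribe is left out for brevity), and the fromCallable and defer methods only imitate the behavior described in the list; they are not the library's code.

```java
import java.util.concurrent.Callable;
import java.util.function.Supplier;

public class FactorySketch {

    /** Reduced stand-in for CompletableSubscriber (onSubscribe omitted for brevity). */
    public interface Subscriber {
        void onCompleted();
        void onError(Throwable e);
    }

    /** Hypothetical miniature of Completable: just a deferred computation. */
    public interface Mini {
        void subscribe(Subscriber s);
    }

    /** Runs the callable once per subscriber; its return value is ignored. */
    public static Mini fromCallable(Callable<?> callable) {
        return s -> {
            try {
                callable.call();      // may throw a checked exception
            } catch (Throwable ex) {
                s.onError(ex);
                return;
            }
            s.onCompleted();
        };
    }

    /** Asks the supplier for the actual Mini once per subscriber. */
    public static Mini defer(Supplier<Mini> supplier) {
        return s -> supplier.get().subscribe(s);
    }

    /** Subscribes and reports the terminal event as text. */
    public static String outcome(Mini m) {
        String[] result = { "no event" };
        m.subscribe(new Subscriber() {
            @Override public void onCompleted() { result[0] = "completed"; }
            @Override public void onError(Throwable e) { result[0] = "error: " + e.getMessage(); }
        });
        return result[0];
    }

    public static void main(String[] args) {
        int[] runs = { 0 };
        Mini counting = fromCallable(() -> { runs[0]++; return null; });
        outcome(counting);
        outcome(counting);
        System.out.println(runs[0]);           // prints 2: one run per subscriber

        Mini failing = fromCallable(() -> { throw new java.io.IOException("disk"); });
        System.out.println(outcome(failing));  // prints error: disk

        // The choice between the two is made at subscription time.
        Mini deferred = defer(() -> runs[0] > 1 ? counting : failing);
        System.out.println(outcome(deferred)); // prints completed
    }
}
```

Nothing happens when the Mini instances are created; the side effects run only inside subscribe, which is the "deferred computation" aspect mentioned earlier.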


Leaving the Completable world

One has to, eventually, leave the Completable world and observe the terminal event in some fashion. The Completable offers some familiar methods to make this happen: subscribe(...).

We can group the subscribe() overloads into two sets. The first set returns a Subscription that allows external cancellation and the second relies on the provided class to allow/manage unsubscriptions.

The first group consists of the lambda-form subscriptions:

  • subscribe(): runs the Completable and relays any onError call to the RxJavaPlugins.
  • subscribe(Action0): runs the Completable and calls the given Action0 on successful completion. The onError calls are still relayed to RxJavaPlugins.
  • subscribe(Action1, Action0): runs the Completable and calls Action1 if it ended with an onError or calls Action0 if it ended with a normal onCompleted.

Since the lambda callbacks don't have access to the underlying Subscription sent through onSubscribe, these methods return a Subscription themselves to allow external unsubscription to happen. Without it, there wouldn't be any way of cancelling such subscriptions.


The second group of subscribe methods takes the multi-method Subscriber instances:

  • subscribe(CompletableSubscriber): runs the Completable and calls the appropriate onXXX methods on the supplied CompletableSubscriber instance.
  • subscribe(Subscriber<T>): runs the Completable and calls the appropriate onXXX methods on the supplied rx.Subscriber instance.


Sometimes, one wants to wait for the completion on the current thread. Observable has a set of methods accessible through toBlocking() for this purpose. Since there are not many ways one can await the result of a Completable, the blocking methods are part of the Completable class itself:

  • await(): await the termination of the Completable indefinitely and rethrow any exception it received (wrapped into a RuntimeException if necessary).
  • await(long, TimeUnit): same as await() but with a bounded wait time, after which a TimeoutException is thrown.
  • get(): await the termination of the Completable indefinitely, return null for successful completion or return the Throwable received via onError.
  • get(long, TimeUnit): same as get() but with a bounded wait time, after which a TimeoutException is thrown.
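One plausible way to build such a blocking get on top of the terminal events is a CountDownLatch. The following is a JDK-only sketch under my own assumptions, not the actual RxJava implementation: Subscriber is a reduced stand-in for CompletableSubscriber, a source is modeled as a Consumer of that subscriber, and the timeout is reported as a RuntimeException wrapping a TimeoutException.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

public class BlockingSketch {

    /** Reduced stand-in for CompletableSubscriber (onSubscribe omitted for brevity). */
    public interface Subscriber {
        void onCompleted();
        void onError(Throwable e);
    }

    /**
     * Blocks until the source terminates. Returns null on normal completion or
     * the Throwable received via onError. In this sketch, a timeout surfaces as
     * a RuntimeException wrapping a TimeoutException.
     */
    public static Throwable get(Consumer<Subscriber> source, long timeout, TimeUnit unit) {
        CountDownLatch done = new CountDownLatch(1);
        AtomicReference<Throwable> error = new AtomicReference<>();
        source.accept(new Subscriber() {
            @Override public void onCompleted() { done.countDown(); }
            @Override public void onError(Throwable e) { error.set(e); done.countDown(); }
        });
        try {
            if (!done.await(timeout, unit)) {
                throw new RuntimeException(new TimeoutException());
            }
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();   // preserve the interrupt flag
            throw new RuntimeException(ex);
        }
        return error.get();
    }

    public static void main(String[] args) {
        // Completes on another thread after a short delay.
        Consumer<Subscriber> async = s -> new Thread(() -> {
            try { Thread.sleep(50); } catch (InterruptedException ignored) { }
            s.onCompleted();
        }).start();
        System.out.println(get(async, 5, TimeUnit.SECONDS));                // prints null

        Consumer<Subscriber> failing = s -> s.onError(new IllegalStateException("boom"));
        System.out.println(get(failing, 5, TimeUnit.SECONDS).getMessage()); // prints boom
    }
}
```

An await-style variant would rethrow the stored Throwable instead of returning it.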


Completable operators

Finally, let's see what operators are available to work with a Completable. Unsurprisingly, many of them match their counterparts in Observable; however, a lot of them are missing because they don't make sense in a valueless stream. These include the familiar map, take, skip, flatMap, concatMap, switchMap, etc. operators.

The first set of operators are accessible as static methods and usually deal with a set of Completables. Many of them have overloads for varargs and Iterable sequences.

  • amb(Completable...): terminates as soon as any of the source Completables terminates, cancelling the rest.
  • concat(Completable...): runs the Completables one after another until all complete successfully or one fails.
  • merge(Completable...): runs the Completable instances "in parallel" and completes once all of them have completed or any of them has failed (cancelling the rest).
  • mergeDelayError(Completable...): runs all Completable instances "in parallel" and terminates once all of them terminate; if all succeeded, it terminates with onCompleted, otherwise, the failure Throwables are collected and emitted in onError.
  • using(Func0, Func1, Action1): opens, uses and closes a resource for the duration of the Completable returned by Func1.
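To illustrate the merge semantics from the list above ("complete once all complete, fail as soon as any fails"), here is a JDK-only sketch. It is a simplified model under my own assumptions, not RxJava's operator: Subscriber is a reduced stand-in for CompletableSubscriber, sources are modeled as Consumers, and cancelling already running sources is left out because the stand-in has no Subscription.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

public class MergeSketch {

    /** Reduced stand-in for CompletableSubscriber (onSubscribe omitted for brevity). */
    public interface Subscriber {
        void onCompleted();
        void onError(Throwable e);
    }

    public static Consumer<Subscriber> merge(List<Consumer<Subscriber>> sources) {
        return child -> {
            if (sources.isEmpty()) {
                child.onCompleted();
                return;
            }
            AtomicInteger remaining = new AtomicInteger(sources.size());
            AtomicBoolean terminated = new AtomicBoolean();
            for (Consumer<Subscriber> source : sources) {
                if (terminated.get()) {
                    break; // a failure already ended the merge: don't start the rest
                }
                source.accept(new Subscriber() {
                    @Override public void onCompleted() {
                        // Only the last completing source may signal completion.
                        if (remaining.decrementAndGet() == 0
                                && terminated.compareAndSet(false, true)) {
                            child.onCompleted();
                        }
                    }
                    @Override public void onError(Throwable e) {
                        // The first failure wins; later errors are dropped in this sketch.
                        if (terminated.compareAndSet(false, true)) {
                            child.onError(e);
                        }
                    }
                });
            }
        };
    }

    /** Subscribes and reports the terminal event as text. */
    public static String outcome(Consumer<Subscriber> c) {
        String[] result = { "no event" };
        c.accept(new Subscriber() {
            @Override public void onCompleted() { result[0] = "completed"; }
            @Override public void onError(Throwable e) { result[0] = "error: " + e.getMessage(); }
        });
        return result[0];
    }

    public static void main(String[] args) {
        int[] started = { 0 };
        Consumer<Subscriber> ok = s -> { started[0]++; s.onCompleted(); };
        Consumer<Subscriber> fail = s -> { started[0]++; s.onError(new IllegalStateException("boom")); };

        System.out.println(outcome(merge(Arrays.asList(ok, ok))));       // prints completed
        started[0] = 0;
        System.out.println(outcome(merge(Arrays.asList(ok, fail, ok)))); // prints error: boom
        System.out.println(started[0]);                                  // prints 2
    }
}
```

The atomic counter and the once-flag are used so that the same counting logic would still hold if the sources terminated on different threads.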

The second set of operators are the usual (valueless) transformations:

  • ambWith(Completable): completes once either this or the other Completable terminates, cancelling the still running Completable.
  • concatWith(Completable): runs the current and the other Completable in sequence.
  • delay(long, TimeUnit): delays the delivery of the terminal events by a given time amount.
  • endWith(...): continues the execution with another Completable, Single or Observable.
  • lift(CompletableOperator): lifts a custom operator into the sequence that allows manipulating the downstream CompletableSubscriber's lifecycle and event delivery in some manner before continuing to subscribe upstream.
  • mergeWith(Completable): completes once both this and the other Completable complete normally.
  • observeOn(Scheduler): moves the observation of the terminal events (or just onCompleted) to the specified Scheduler.
  • onErrorComplete(): If this Completable terminates with an onError, the exception is dropped and downstream receives just onCompleted.
  • onErrorComplete(Func1): The supplied predicate will receive the exception and should return true if the exception should be dropped and replaced by an onCompleted event.
  • onErrorResumeNext(Func1): If this Completable fails, the supplied function will receive the exception and it should return another Completable to resume with.
  • repeat(): repeatedly executes this Completable (or a number of times in another overload).
  • repeatWhen(Func1): repeatedly executes this Completable if the Observable returned by the function emits a value, or terminates if this Observable emits a terminal event.
  • retry(): retries this Completable indefinitely if it failed (or after checking some condition in other overloads).
  • retryWhen(Func1): retries this Completable if it failed and the Observable returned by the function emits a value in response to the current exception or terminates if this Observable emits a terminal event.
  • startWith(...): begins the execution with the given Completable, Single or Observable and resumes with the current Completable.
  • timeout(long, TimeUnit, Completable): switches to another Completable if this completable doesn't terminate within the specified time window.
  • to(Func1): allows fluent conversion by calling a function with this Completable instance and returning the result.
  • toObservable(): converts this Completable into an empty Observable that terminates if this Completable terminates.
  • toSingle(Func0<T>): converts this Completable into a Single in a way that when the Completable completes normally, the value provided by the Func0 is emitted as onSuccess while an onError just passes through.
  • toSingleDefault(T): converts this Completable into a Single in a way that when the Completable completes normally, the value provided is emitted as onSuccess while an onError just passes through.
  • unsubscribeOn(Scheduler): when the downstream calls unsubscribe on the supplied Subscription via onSubscribe, the action will be executed on the specified scheduler (and will propagate upstream).
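Because a Completable is a deferred computation, an operator such as retry simply subscribes again and thereby re-runs the side effects. The following JDK-only sketch models that idea with a bounded retry count. The names (Subscriber, retry, attempt, flaky) are stand-ins of mine and the recursion is a simplification; this is not how the library implements it.

```java
import java.util.function.Consumer;

public class RetrySketch {

    /** Reduced stand-in for CompletableSubscriber (onSubscribe omitted for brevity). */
    public interface Subscriber {
        void onCompleted();
        void onError(Throwable e);
    }

    /** Resubscribes to the source after a failure, at most maxRetries times. */
    public static Consumer<Subscriber> retry(Consumer<Subscriber> source, int maxRetries) {
        return child -> attempt(source, child, maxRetries);
    }

    private static void attempt(Consumer<Subscriber> source, Subscriber child, int retriesLeft) {
        source.accept(new Subscriber() {
            @Override public void onCompleted() {
                child.onCompleted();
            }
            @Override public void onError(Throwable e) {
                if (retriesLeft > 0) {
                    attempt(source, child, retriesLeft - 1); // run the side effects again
                } else {
                    child.onError(e);                        // out of retries: fail downstream
                }
            }
        });
    }

    /** Hypothetical flaky source: fails for the first 'failures' subscriptions. */
    public static Consumer<Subscriber> flaky(int failures, int[] attempts) {
        return s -> {
            if (++attempts[0] <= failures) {
                s.onError(new IllegalStateException("attempt " + attempts[0]));
            } else {
                s.onCompleted();
            }
        };
    }

    /** Subscribes and reports the terminal event as text. */
    public static String outcome(Consumer<Subscriber> c) {
        String[] result = { "no event" };
        c.accept(new Subscriber() {
            @Override public void onCompleted() { result[0] = "completed"; }
            @Override public void onError(Throwable e) { result[0] = "error: " + e.getMessage(); }
        });
        return result[0];
    }

    public static void main(String[] args) {
        int[] attempts = { 0 };
        System.out.println(outcome(retry(flaky(2, attempts), 3))); // prints completed
        System.out.println(attempts[0]);                           // prints 3

        int[] attempts2 = { 0 };
        System.out.println(outcome(retry(flaky(5, attempts2), 1))); // prints error: attempt 2
    }
}
```

With a synchronously failing source, the recursive resubscription grows the call stack, so an unbounded retry would need a loop or trampoline instead.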

The final set of operators support executing callbacks at various lifecycle stages (which can be used for debugging or other similar side-effecting purposes):

  • doAfterTerminate(Action0): executes the action after the terminal event has been sent to the downstream CompletableSubscriber.
  • doOnComplete(Action0): executes an action just before the completion event is sent downstream.
  • doOnError(Action1): calls the action with the exception in a failed Completable just before the error is sent downstream.
  • doOnTerminate(Action0): executes the action just before any terminal event is sent downstream.
  • doOnSubscribe(Action1): calls the action with the Subscription instance received during the subscription phase.
  • doOnUnsubscribe(Action0): executes the action if the downstream unsubscribed the Subscription connecting the stages.
  • doOnLifecycle(...): combines the previous operators into a single operator and calls the appropriate action.
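The difference between the "just before" and "after" callbacks is easiest to see in the order of events. Here is a JDK-only sketch of that ordering. The peek method is a hypothetical helper modeling doOnTerminate and doAfterTerminate together, and Subscriber is a reduced stand-in for CompletableSubscriber; neither is the library's code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class LifecycleSketch {

    /** Reduced stand-in for CompletableSubscriber (onSubscribe omitted for brevity). */
    public interface Subscriber {
        void onCompleted();
        void onError(Throwable e);
    }

    /** Hypothetical helper: runs one callback before and one after the terminal event. */
    public static Consumer<Subscriber> peek(Consumer<Subscriber> source,
                                            Runnable onTerminate,
                                            Runnable afterTerminate) {
        return child -> source.accept(new Subscriber() {
            @Override public void onCompleted() {
                onTerminate.run();       // before downstream sees the event
                child.onCompleted();
                afterTerminate.run();    // after downstream has seen it
            }
            @Override public void onError(Throwable e) {
                onTerminate.run();
                child.onError(e);
                afterTerminate.run();
            }
        });
    }

    /** Runs a completing source through peek and returns the observed order. */
    public static List<String> demo() {
        List<String> log = new ArrayList<>();
        Consumer<Subscriber> source = s -> s.onCompleted();
        peek(source, () -> log.add("doOnTerminate"), () -> log.add("doAfterTerminate"))
            .accept(new Subscriber() {
                @Override public void onCompleted() { log.add("onCompleted"); }
                @Override public void onError(Throwable e) { log.add("onError"); }
            });
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo());
        // prints [doOnTerminate, onCompleted, doAfterTerminate]
    }
}
```

The sketch ignores what should happen if a callback itself throws; a real operator has to route such failures somewhere, as discussed in the protocol section.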


Currently, there are neither equivalent Subject implementations nor publish/replay/cache methods available. Depending on the need for these, they can be added later on. Note, however, that since Completable deals only with terminal events, all Observable-based Subject implementations have just a single equivalent Completable-based Subject implementation, and there is only one way to implement the publish/replay/cache methods.

It is likely the existing Completable operators can be extended or other existing Observable operators matched. Until then, you can use the

toObservable().operator.toCompletable()

conversion pattern to reach out to these unavailable operators. In addition, I didn't list all overloads, so please consult the source code of the class (or the Javadoc once it becomes available online).


Conclusion

In this post, I've introduced the new Completable base class and detailed the available methods and operators on it. Its usage pattern greatly resembles the use of Observable or Single with the difference that it doesn't deal with values at all but only with the terminal events and as such, many operators are meaningless for Completable.

In the next part, I'm going to talk about how one can create source and transformative operators for Completable by implementing the CompletableOnSubscribe and CompletableOperator interfaces respectively.

1 comment:

  1. Hey! Thanks for this! Very helpful.

    Now there is a CompletableSubject in RxJava 2.1. http://reactivex.io/RxJava/2.x/javadoc/io/reactivex/subjects/CompletableSubject.html
