2016. március 31., csütörtök

SubscribeOn and ObserveOn

Introduction


One of the most confused operator pair of the reactive ecosystem is the subscribeOn and observeOn operators. The source of confusion may be rooted in a few causes:


  • they sound alike,
  • they sometimes show similar behavior when looked at from downstream and
  • they are duals in some sense.

It appears the name-confusion isn't local to RxJava. Project Reactor faces a similar issue with their publishOn and dispatchOn operators. Apparently, it doesn't matter what they are called and people will confuse them anyhow.

When I started learning about Rx.NET back in 2010, I never experienced this confusion; subscribeOn affects subscribe() and observeOn affects onXXX()

(Remark: I've searched Channel 9 for the early videos but couldn't really find the talk where they build up these operators just like I'm about to do. The closest thing was this.)

My "thesis" is that the confusion may be resolved by walking through how one can implement these operators and thus showing the internal method-call flow.


SubscribeOn


The purpose of subscribeOn() is to make sure side-effects from calling subscribe() happens on some other thread. However, almost no standard RxJava source does side-effects on its own; you can have side-effects with custom Observables, wrapped subscription-actions via create() or as of lately, the with the SyncOnSubscribe and fromCallable() APIs.

Why would one move the side-effects? The main use cases are doing network calls or database access on the current thread or anything that involves blocking wait. Holding off a Tomcat worker thread hasn't been much of a programming problem (that doesn't mean we can't improve the stack with reactive) but holding off the Event Dispatch Thread in a Swing application or the Main thread in an Android application has adverse effect on the user experience.

(Sidenote: it's a funny thing that blocking the EDT is basically a convenience backpressure strategy in the GUI world to prevent the user from changing the application state while some activity was happening.)

Therefore, if the source does something immediately when a child subscribes, we'd want it to happen somewhere off the precious current thread. Naturally, we could just submit the whole sequence and the call to subscribe() to an ExecutorService, but then we'd be faced with the problem of cancellation being separate from the Subscriber. As more and more (complex) sequences requiring this asynchronous subscription behavior, the inconvenient it becomes to manage all those in this manner.

Luckily, we can include this behavior into an operator we call subscribeOn().

For simplicity, let's build this operator on a much simpler reactive base type: the original IObservable from Rx.NET:

@FunctionalInterface
interface IObservable<T> {
    IDisposable subscribe(IObserver<T> observer);
}

@FunctionalInterface
interface IDisposable {
    void dispose();
}

interface IObserver<T> {
    void onNext(T t);
    void onError(Throwable e);
    void onCompleted();
}

Don't worry about synchronous cancellation and backpressure for now.

Let's assume we have a source which just sleeps for 10 seconds:


IObservable<Object> sleeper = o -> {
    try {
        Thread.sleep(1000);
        o.onCompleted();
    } catch (InterruptedException ex) {
        o.onError(ex);
    }
};


which will obviously go to sleep if we call sleeper.subscribe(new IObserver ... ); Let's now create an operator that moves this sleep to some other thread:


ExecutorService exec = Executors.newSingleThreadedExecutor();

IObservable<Object> subscribeOn = o -> {
    Future<?> f = exec.submit(() -> sleeper.subscribe(o));
    return () -> f.cancel(true);
}


The subscribeOn instance will submit the action that subscribes to the actual IObservable to the executor and returns a disposable that will cancel the resulting Future from the submission.

Of course, one would instead have this in some static method or on a wrapper around the IObservable (as Java doesn't support extension methods):




public static <T> IObservable<T> subscribeOn(IObservable<T> source, 
    ExecutorService executor);

public Observable<T> subscribeOn(Scheduler scheduler);

Two of the common questions regarding subscribeOn are what happens when one applies it twice (directly or some regular operators in between) and why can't one change the original thread with a second subscribeOn. I hope the answer becomes apparent from the simplified structure above. Let's apply the operator a second time:

ExecutorService exec2 = Executors.newSingleThreadedExecutor();

IObservable<Object> subscribeOn2 = o -> {
    Future<?> f2 = exec2.submit(() -> subscribeOn.subscribe(o));
    return () -> f2.cancel(true);
};

Now let's expand subscribeOn.subscribe() in place:


IObservable<Object> subscribeOn2 = o -> {
    Future<?> f2 = exec2.submit(() -> {
       Future<?> f = exec.submit(() -> {
          sleeper.subscribe(o);
       });
    });
};

We can simply read this from top to bottom. When o arrives, a task is scheduled on exec2 which when executes, another task is scheduled on exec which when executes subscribes to sleeper with the original o. Becasue the subscribeOn2 was the last, it gets executed first and no matter where it runs the task, it gets rescheduled by subscribeOn anyway on its thread. Therefore, that subscribeOn() operator's thread will matter which is closest to the source and one can't use another subscribeOn() application to change this. This is why APIs built on top of Rx either should not pre-apply subscribeOn() when they return an Observable or give the option to specify a scheduler.

Unfortunately, the subscribeOn operator above doesn't handle unsubscription properly: the result of the sleeper.subscribe() is not wired up to that external IDisposable instance and thus won't dispose the "real" subscription. Of course, this can be resolved by having a composite IDisposable and adding all relevant resources to it. In RxJava 1, however, we don't need this kind of juggling and the operator can be written with less work:


Observable.create(subscriber -> {
    Worker worker = scheduler.createWorker();
    subscriber.add(worker);
    worker.schedule(
        () -> source.unsafeSubscribe(Schedulers.wrap(subscriber))
    )
});

This makes sure the unsubscribe() call on the subscriber will affect the schedule() and whatever resources the upstream source would use. We can use unsafeSubscribe() to avoid the unnecessary wrapping into a SafeSubscriber but we have to wrap the subscriber anyway because both subscribe() and unsafeSubscribe() call onStart() on the incoming Subscriber, which has already been called by the outer Observable. This avoids repeating any effects inside the user's Subscriber.onStart() method.

The structure above composes backpressure as well, but we are not done.

Before RxJava got backpressure, the subscribeOn() implementation above made sure that an otherwise synchronous source would emit all of its events on the same thread:


Observable.create(s -> {
    for (int i = 0; i < 1000; i++) {
        if (s.isUnsubscribed()) return;
       
        s.onNext(i);
    }

    if (s.isUnsubscribed()) return;

    s.onCompleted();
});

Users started to implicitly rely on this property. Backpressure breaks this property because usually the thread that calls request() will end up running the fragment of the loop above (see range()), causing potential thread-hopping. Therefore, to keep the same property, calls to request() has to go to the very same Worker that did the original subscription.

The actual operator thus is more involved:


subscriber -> {
    Worker worker = scheduler.createWorker();
    subscriber.add(worker);
    
    worker.schedule(() -> {
       Subscriber<T> s = new Subscriber<T>(subscriber) {
           @Override
           public void onNext(T v) {
               subscriber.onNext(v);
           }

           @Override
           public void onError(Throwable e) {
               subscriber.onError(e);
           }

           @Override
           public void onCompleted() {
               subscriber.onCompleted();
           }

           @Override
           public void setProducer(Producer p) {
               subscriber.setProducer(n -> {
                   worker.schedule(() -> p.request(n));
               });
           }
       };

       source.unsafeSubscribe(s);
    });
}

Other than forwarding the onXXX() methods to the child subscriber, we set a custom producer on the child where the request() method schedules an action that calls the original producer with the same amount on the scheduler, ensuring that if there is an emission tied to the request, that happens on the same thread every time.

This can be optimized a bit by capturing the current thread in the outer schedule() action, comparing it to the caller thread in the custom Producer and then calling p.request(n) directly instead of scheduling it:


    Thread current = Thread.currentThread();

    // ...

    subscriber.setProducer(n -> {
        if (Thread.currentThread() == current) {
            p.request(n);
        } else {
            worker.schedule(() -> p.request(n));
        }
    });



ObserveOn


The purpose of observeOn is to make sure values coming from any thread are received or observed on the proper thread. RxJava is by default synchronous, which technically means that onXXX() methods are called in sequence on the same thread:


for (int i = 0; i < 1000; i++) {
    MapSubscriber.onNext(i) {
       FilterSubscriber.onNext(i) {
           TakeSubscriber.onNext(i) {
               MySubscriber.onNext(i);
           }
       }
    }
}

There are several use cases for moving this onNext() call (and any subsequent calls chained after) to another thread. For example, generating the input to a map() operation is cheap but the calculation itself is expensive and would hold off the GUI thread. Another example is when there is a background activity (database, network or the previous heavy computation), the results should be presented on the GUI and that requires the programmer to only interact with the GUI framework on the specific thread.

In concept, observeOn works by scheduling a task for each onXXX() calls from the source on a specific scheduler where the original parameter value is handed to the downstream's onXXX() methods:


ExecutorService exec = Executors.newSingleThreadedExecutor();

IObservable<T> observeOn = o -> {
    source.subscribe(new Observer<T>() {
        @Override
        public void onNext(T t) {
            exec.submit(() -> o.onNext(t));
        }
        
        @Override
        public void onError(Throwable e) {
            exec.submit(() -> o.onError(e));
        }

        @Override
        public void onCompleted() {
            exec.submit(() -> o.onCompleted());
        }            
    });
};

This pattern only works if the executor is single threaded or otherwise ensures FIFO behavior and doesn't execute multiple tasks from the same "client" at the same time.

Unsubscription here is more complicated, because one has to keep track all the pending tasks, remove them when they have finished and make sure every pending task can be mass-cancelled.

I believe the Rx.NET has some complicated machinery for this, but luckily, RxJava has a simple solution in the form of the Scheduler.Worker, taking care of all the required unsubscription behavior:


Observable.create(subscriber -> {
    Worker worker = scheduler.createWorker();
    subscriber.add(worker);

    source.unsafeSubscribe(new Subscriber<T>(subscriber) {
        @Override
        public void onNext(T t) {
            worker.schedule(() -> subscriber.onNext(t));
        }
        
        @Override
        public void onError(Throwable e) {
            worker.schedule(() -> subscriber.onError(e));
        }

        @Override
        public void onCompleted() {
            worker.schedule(() -> subscriber.onCompleted());
        }            
    });
});


Now if we compare subscribeOn and observeOn, one can see that subscribeOn schedules the entire source.subscribe(...) part whereas observeOn schedules the individual subscriber.onXXX() calls onto another thread.

You can now see if observeOn is applied twice, that inner scheduled task expands to another lever of scheduling:

worker.schedule(() -> worker2.schedule(() -> subscriber.onNext(t)));

thus it overrides the emission thread in the chain, therefore, functionally, the closest observeOn to the consumer will win. From the expanded call above, you can see that worker is now wasted as a resource while providing no functional value to the sequence.

The observeOn with the given structure has a drawback. If the source is some trivial Observable such as range(0, 1M); that will emit all of its values and suddenly, we have a large amount of pending task in the underlying threadpool of the scheduler. This can overwhelm the downstream consumer and also consumes lot of memory.

Backpressure was introduced mostly to handle such cases, preventing internal buffer bloat and unbounded memory usage due to an asynchronous boundary. Consumers specifying the items they can consume via request() makes sure the producer side will only emit that many elements onNext(). Once the consumer is ready, it will issue another request(). The observeOn() above, with its new Subscriber<T>(subscriber) wrapping, already composes backpressure and relays the request() calls to the upstream source. However, this doesn't prevent the consumer from requesting everything via Long.MAX_VALUE and now we have the same bloat problem again.

Unfortunately, RxJava discovered the backpressure problem too late and the mandatory requesting would have required a lot of user code changes. Instead, backpressure was introduced as an optional behavior and made the resposibility of the operators such as observeOn to handle it while maintaining transparency with bounded Subscribers and unbounded Observers alike.

The way it can be handled is via a queue, request tracking for the child Subscriber, fixed request amount towards the source and a queue-drain loop.


Observable.create(subscriber -> {
    Worker worker = scheduler.createWorker();
    subscriber.add(worker);

    source.unsafeSubscribe(new Subscriber<T>(subscriber) {
        final Queue<T> queue = new SpscAtomicArrayQueue<T>(128);

        final AtomicLong requested = new AtomicLong();

        final AtomicInteger wip = new AtomicInteger();

        Producer p;

        volatile boolean done;
        Throwable error;

        @Override
        public void onNext(T t) {
            queue.offer(t);
            trySchedule();
        }
        
        @Override
        public void onError(Throwable e) {
            error = e;
            done = true;
            trySchedule();
        }

        @Override
        public void onCompleted() {
            done = true;
            trySchedule();
        }            
        
        @Override
        public void setProducer(Producer p) {
            this.p = p;
            subscriber.setProducer(n -> {
                BackpressureUtils.addAndGetRequested(requested, n);
                trySchedule();
            }); 
            p.request(128);
        }

        void trySchedule() {
            if (wip.getAndIncrement() == 0) {
                worker.schedule(this::drain);
            }
        }

        void drain() {
            int missed = 1;
            for (;;) {
                long r = requested.get();
                long e = 0L;

                while (e != r) {
                    boolean d = done;
                    T v = queue.poll();
                    boolean empty = v == null;
                    
                    if (checkTerminated(d, empty)) {
                        return;
                    }

                    if (empty) {
                        break;
                    }

                    subscriber.onNext(v);
                    e++;
                }

                if (e == r && checkTerminated(done, queue.isEmpty())) {
                    break;
                }

                if (e != 0) {
                    BackpressureHelper.produced(requested, e);
                    p.request(e);
                }

                missed = wip.addAndGet(-missed);
                if (missed == 0) {
                    break;
                }
            }
        }

        boolean checkTerminated(boolean d, boolean empty) {
            if (subscriber.isUnsubscribed()) {
                queue.clear();
                return true;
            }
            if (d) {
                Throwable e = error;
                if (e != null) {
                    subscriber.onError(e);
                    return true;
                } else
                if (empty) {
                    subscriber.onCompleted();
                    return true;
                }
            }
            return false;
       }
    });
});

By now, the pattern should be quite familiar. We queue up the item or save the exception, then increment the wip counter and schedule the draining of the queue. This is necessary as values may arrive the same time the downstream issues a request. Issuing a request has to schedule the drain as well because values may be available in the queue already. The drain loop emits what it can and asks for replenishment from the upstream Producer it got through the setProducer() call.

Naturally, one can extend this with additional safeguards, error-delay capability, parametric initial request amount and even stable replenishment amounts. This trySchedule setup has the property that it doesn't require a single threaded scheduler to begin with as it self-trampolines: due to the getAndIncrement, only a single thread will issue the drain task at a time and then only when the wip counter is decremented to zero will open the opportunity for scheduling another drain task by somebody.


Conclusion


In this post, I've tried to clear up the confusion around the subscribeOn and observeOn operators by showing a simplified, clutter free way of implementing them.

We saw then that the complication in RxJava comes from the need for handling backpressure somewhat transparently for consumers that do or don't directly drive a sequence through it.

Now that the inner workings and structures have been clarified, let's continue with the discussion about operator fusion where I can now use subscribeOn and observeOn as an example how macro- and micro-fusion can help around the asynchronous boundaries they provide.

Nincsenek megjegyzések:

Megjegyzés küldése