2015. június 1., hétfő

Schedulers (part 2)


In the previous post, I've show how one can build a custom scheduler mainly based on existing RxJava classes.

In this post, I'm going deeper and show how one can control the interaction between the underlying ExecutorService and RxJava constructs at a level which is inaccessible through NewThreadWorker.

The ScheduledAction class

In the pre-Scheduler/Worker days, interacting with a Future was straightforward; we just wrapped its cancel() method with a Subscription and they were added to various subscription-containers.

However, with the Scheduler/Worker API, this doesn't work anymore due to the need to track and mass-cancel such tasks. Once the Future's get tracked, they need to be un-tracked if they complete or get cancelled, or else face memory leaks. This extra administration means that one can't just submit an Action0/Runnable directly to the ExecutorService but he/she needs to decorate it so either the cancellation or regular completion cleans up after the task.

The solution comes in the form of the ScheduledAction class. Every regular Action0 is wrapped into these by NewThreadWorker.scheduleActual() methods. The ScheduledAction class contains a SubscriptionList as the holder for 'actions' that need to be executed on a normal completion and on general unsubscription:

public final class ScheduledAction 
implements Runnable, Subscription {
    final Action0 action;                       // (1)
    final SubscriptionList slist;               // (2)

    public ScheduledAction(Action0 action) {
        this.action = action;
        this.slist = new SubscriptionList();
    public void run() {
        try {
            action.call();                      // (3)
        } finally {
            unsubscribe();                      // (4)
    public boolean isUnsubscribed() {
        return slist.isUnsubscribed();
    public void unsubscribe() {
    public void add(Subscription s) {           // (5)

The class is relatively straightforward:

  1. We hold onto the real action to be executed.
  2. We need a composite to store all the unsubscription actions. Since this list will be add-only, the SubscriptionList will suffice.
  3. Since the ExecutorService requires a Runnable instance, the class extends this interface and in the run() method, we delegate to the actual action.
  4. Whether or not the call succeeds, we call unsubscribe() on ourselves which should trigger the necessary cleanup actions.
  5. However, these cleanup actions need to be registered with this ScheduledAction thus we need to expose the add() method of the SubscriptionList.
The next step is to wire up all the tracking and cleanup actions before the action gets submitted to an ExecutorService. For simplicity, I'll assume said service is a single-threaded service. I'll deal with a multi-threaded service later. Let's start with the skeleton of a new custom Worker:

public final class CustomWorker 
extends Scheduler.Worker {
    final ExecutorService exec;                             // (1)
    final CompositeSubscription tracking;                   // (2)
    final boolean shutdown;                                 // (3)
    public CustomWorker() {
        exec = Executors.newSingleThreadExecutor();
        tracking = new CompositeSubscription();
        shutdown = true;
    public CustomWorker(ExecutorService exec) {
        this.exec = exec;
        tracking = new CompositeSubscription();
        shutdown = false;                                   // (4)
    public Subscription schedule(Action0 action) {
        return schedule(action, 0, null);                   // (5)
    public Subscription schedule(Action0 action,
            long delayTime, TimeUnit unit) {
        // implement
    public boolean isUnsubscribed() {
        return tracking.isUnsubscribed();                   // (6)
    public void unsubscribe() {
        if (shutdown) {
            exec.shutdownNow();                             // (7)

At this point, the skeleton isn't complicated either:

  1. We store the reference to the actual thread pool.
  2. We also need to track the tasks submitted in order to mass-cancel them.
  3. We plan to allow users to submit their own single-threaded executor services, in which case shutting it down should be the responsibility of the caller. 
  4. We'll only shut down our own services but not those received through the constructor.
  5. We delegate the non-delayed scheduling to the delayed-scheduling via a zero delayTime value.
  6. The tracking structure doubles as the way of telling if the worker has been unsubscribed.
  7. In case the ExecutorService was created by this class, we'll shut it down, then all tracked tasks are unsubscribed. (Note that if the service is our own, we don't really need to track the tasks because the service itself already tracks all submitted tasks the call to shutdownNow() will mass-cancel them.)
Finally, let's see the implementation of the delayed schedule() method:

    // ...
    public Subscription schedule(Action0 action, 
            long delayTime, TimeUnit unit) {
        if (isUnsubscribed()) {                                // (1)
            return Subscriptions.unsubscribed();
        ScheduledAction sa = new ScheduledAction(action);      // (2)
        tracking.add(sa);                                      // (3)
            () -> tracking.remove(sa)));
        Future<?> f;
        if (delayTime <= 0) {                                  // (4)
            f = exec.submit(sa);
        } else 
        if (exec instanceof ScheduledExecutorService) {        // (5)
            f = ((ScheduledExecutorService)exec)
                 .schedule(sa, delayTime, unit);
        } else {
            f = genericScheduler.schedule(() -> {              // (6)
                Future<?> g = exec.submit(sa);
                sa.add(Subscriptions.create(                   // (7)
                    () -> g.cancel(false)));
            }, delayTime, unit);
        sa.add(Subscriptions.create(                           // (8)
            () -> f.cancel(false)));

        return sa;                                             // (9)
    // ...

Our first truly complicated logic in this blog post works as follows:

  1. If the worker is unsubscribed, the method will return with a constant unsubscribed subscription. Note that schedule calls slipping past this will likely receive an unchecked RejectedExecutionException from the underlying thread pool. You can wrap the rest of the method with try-catch and return the same constant as necessary.
  2. We wrap the action into our ScheduledAction.
  3. Before the action gets even scheduled, we add it to the tracking structure and add callback that will remove the ScheduledAction from the tracking structure if it completes or gets unsubscribed directly. Note that due to idempotence, the remove() can't get into an infinite loop by calling unsubscribe() on the ScheduledAction again.
  4. In case there wasn't any delay time, we schedule the action directly and taking hold on the returned Future for it.
  5. In case the ExecutorService is also a ScheduledExecutorService, we can call schedule() on it with the delay parameters directly.
  6. Otherwise, we'll need some ScheduledExecutorService instance that will do the delaying for us, but we can't just schedule the action directly on it: it would run on the wrong thread. Instead, we need to schedule an intermediate task which when the time has come, will perform the actual scheduling, without delay, on the proper thread pool.
  7. We need to wire up the returned future to be able to cancel the tasks via a unsubscribe() call. Here we'll add the inner future to the ScheduledAction instance.
  8. Either scheduled directly or indirectly, we need a way to communicate the cancellation to the thread pool, therefore, we wrap the action of cancelling the future via cancel() into a Subscription and add it to the ScheduledAction. Here, you have the option to do an interrupting cancel or just a regular cancel. (RxJava does both depending on what thread issues the cancel call: if it is the same thread that runs the action, there is no need for interrupts.)
  9. The wrapper itself will be the cancellation token directly.
Because of the terminal-state-nature of the subscription-containers, even if (7) or (8) happens after the unsubscription of the ScheduledAction, they will get immediately cancelled. For extra eagerness, you can modify the ScheduledAction.run() to explicitly check isUnsubscribed() before calling the wrapped action.

The last tiny bit of code missing is an example for the genericScheduler. You can add a static final field to the worker and set it up like this:

    // ...
    static final ScheduledExecutorService genericScheduler;
    static {
        genericScheduler = Executors.newScheduledThreadPool(1, r -> {
            Thread t = new Thread(r, "GenericScheduler");
            return t;
    // ...


In this post, I've shown how one can implement a wrapper for the actions to be scheduled and how it can be wired up properly to work with unsubscription at an individual task or at a worker level.

In the final part of this series, I'll talk about how to handle the case when the ExecutorService has multiple threads since we can't let non-delayed tasks reordered or run in parallel with each other.

Nincsenek megjegyzések:

Megjegyzés küldése