2015. május 12., kedd

Pitfalls of operator implementations (part 1)

Introduction

In the early days, implementing an Operator was a relatively simple task: just provide a 'manipulation' subscriber to upstream and perform the designated logic in the onXXX methods. 

However, it was soon realized that this approach doesn't really work if the upstream is synchronous and infinite and one uses take(n) on it. The solution was to inject an unsubscription token (i.e., an Observer and Subscription) into the flow and upstream 'producers' should check on its isUnsubscribed() method and stop their activity. It was kind of strange as the original Iterator-dualization Erik Meijer described in the Channel 9 videos seem to have failed to anticipate this problem (as he put it "that's where the heavy handwaving starts").

The second 'complexity shock' happened when the need for backpressure came in and the Producer interface was introduced. If one thought unsubscription was complicated, backpressure is far more difficult. It is so difficult even I (who wrote about 27% of RxJava code lines according to git blame) can't get it right 100% of the time.

Since dealing with these two features is part of almost all operators, I'll give a break from explaining producers and show some of the most common pitfalls operator writers fall into when they post a pull request or ask about problems on StackOverflow.

#1: Breaking the chain of unsubscription and backpressure

Some operators, such as map(), alter the stream of values on a 1:1 basis but they themselves don't directly affect unsubscription or backpressure.

For example, if one wants to implement an 'optimized' operator which turns integer values into boolean values based on their oddity, he/she may come up with the following operator:

Operator<Boolean, Integer> isOdd = child -> {
    return new Subscriber<Integer>() {                    // (1)
        @Override
        public void onNext(Integer value) {
            child.onNext((value & 1) != 0);
        }
        @Override
        public void onError(Throwable e) {
            child.onError(e);
        }
        @Override
        public void onCompleted() {
            child.onCompleted();
        }
    };
};

Observable.range(1, 2_000_000_000)
    .lift(isOdd)
    .take(2)
    .subscribe(System.out::println);

If we run this example, we will find that after printing true and false, the program keeps running for bunch of seconds (or more on a slower machine). The unsubscription request of take() didn't reach the range() operator. In this case, only took some extra few seconds to complete but there could be other, expensive operators before lift that may hog the CPU and resources much longer than needed.

The problem is in the operator at (1): the child is disconnected from our subscriber and it has no way of signaling the upstream production should stop. To fix the broken chain, we can use the Subscribe(Subscriber<?> op) which will establish the necessarily linking between our subscriber and the child subscriber:

Operator<Boolean, Integer> isOdd = child -> {

    return new Subscriber<Integer>(child) {

    // the rest is the same

#2: Unsubscribing the downstream

Some operators, such as take(), may need to terminate before the upstream has the chance of emitting onCompleted().

For example, one may try to implement a takeNone() operator which simply completes when the very first onNext event arrives and dutifully unsubscribes itself to stop the upstream. Of course, we learned to chain the Subscribers to keep downstream unsubscription and backpressure working:

Operator<Integer, Integer> takeNone = child -> {
    return new Subscriber<Integer>(child) {          // (1)
        @Override
        public void onNext(Integer t) {
            child.onCompleted();
            unsubscribe();                           // (2)
        }

        @Override
        public void onError(Throwable e) {
            child.onError(e);
        }

        @Override
        public void onCompleted() {
            child.onCompleted();
        }
    };
};

TestSubscriber<Integer> ts = new TestSubscriber<>();

Subscription importantResource = Subscriptions.empty();
ts.add(importantResource);

Observable.range(1, 100).lift(takeNone).unsafeSubscribe(ts);

if (importantResource.isUnsubscribed()) {
    System.err.println("Somebody unsubscribed our resource!");
}

In the example, even though we used unsafeSubscribe() to prevent auto-unsubscription of ts, somehow our associated resource got unsubscribed. The problem is now the 'over-chaining'. In this case, the fact that Subscriber(Subscriber<?> op) at (1) shares a single underlying composite to hold associated resources fires back and even though it seems we unsubscribe our Subscriber only at (2), we unsubscribe the child as well. Some may think this is a contrived example, however, this kind of bug affected many of RxJava's own operators especially if certain operators could start async work on their onCompleted() path, such as toList().observeOn().

To fix this kind of an issue, we can simply use another constructor of Subscriber: Subscriber(Subscriber<?> op, boolean shareSubscriptions).


Operator<Integer, Integer> takeNone = child -> {
    Subscriber<Integer> parent = 
            new Subscriber<Integer>(child, false) {   // (1)
        @Override
        public void onNext(Integer t) {
            child.onCompleted();
            unsubscribe();
        }

        @Override
        public void onError(Throwable e) {
            child.onError(e);
        }

        @Override
        public void onCompleted() {
            child.onCompleted();
        }
    };
    child.add(parent);                             // (2)
    return parent;
};

In this resolution, first we break the automatic chaining of unsubscriptions (1) but keep the backpressure-chain intact, then add our subscriber as resource to the child subscriber (2). This will reestablish the unsubscription chain; if the child unsubscribes, it will unsubscribe our parent subscriber and in turn the upstream, whereas we call unsubscribe() in onNext(), it will only unsubscribe us and the upstream but leave the child 'subscribed'.

Sidenote: I argued about this pattern to be the default for all operators - until proven unnecessary - back then, but my arguments and examples didn't come through and RxJava ended up with operators like the initial takeNone. You can guess how many issue reports about missed events and unexpected cancellations was traced back to this type of a bug.

#3: Forgetting to request more

Many operators participating in backpressure expect the requested amount of values to be delivered to them before they request more. If one writes an operator which, for example, has to filter out the odd values, he/she may implement this in the following manner:

Operator<Integer, Integer> evenFilter = child -> {
    return new Subscriber<Integer>(child) {
        @Override
        public void onNext(Integer t) {
            if ((t & 1) == 0) {
                child.onNext(t);
            }
                                                        // (1)
        }

        @Override
        public void onError(Throwable e) {
            child.onError(e);
        }

        @Override
        public void onCompleted() {
            child.onCompleted();
        }
    };
};

Observable.range(1, 2).lift(evenFilter).subscribe(
        System.out::println,
        Throwable::printStackTrace,
        () -> System.out.println("Done"));
        
Observable.range(1, 2).lift(evenFilter).take(1).subscribe(
        System.out::println,
        Throwable::printStackTrace,
        () -> System.out.println("Done"));


The first observation prints 2 and Done as expected, but the second doesn't print anything even though it should be functionally equivalent as there is only one even number in the range. Again the bug is marked at location (1): when our operator drops a value, it doesn't ask for a replacement. The upstream can't know if we ignored one of its values and since the downstream's request for 1 was passed up directly, the upstream won't do anything else unless requested. Therefore, the fix is to request one more value if our operator drops one value:

        // ... same as before
        @Override
        public void onNext(Integer t) {
            if ((t & 1) == 0) {
                child.onNext(t);
            } else {
                request(1);
            }
        }
        // ... same as before

With the fix, both observations print 2 and Done initially as expected.

#4: Completing again

Some operators work on two streams where a secondary stream affects the state of the primary stream, such as takeUntil(). For example, one would want to write an operator that relays events until some other stream emits a zero or just completes:


Observable<Integer> other = Observable.<Integer>create(o -> {
    try {
        Thread.sleep(100);
    } catch (Throwable e) {
        o.onError(e);
        return;
    }
    o.onNext(0);
    o.onCompleted();
}).subscribeOn(Schedulers.io());

Operator<Integer> takeUntilZero = child -> {
    Subscriber<Integer> main = 
            new Subscriber<Integer>(child, false) {
        @Override
        public void onNext(Integer t) {
            child.onNext(t);
        }
        @Override
        public void onError(Throwable e) {
            child.onError(e);
        }
        @Override
        public void onCompleted() {
            child.onCompleted();
        }
    };
    Subscriber<Integer> secondary = new Subscriber<Integer>() {
        @Override
        public void onNext(Integer t) {
            if (t == 0) {
                onCompleted();
            }
        }
        @Override
        public void onError(Throwable e) {
            child.onError(e);
        }
        @Override
        public void onCompleted() {                  // (1)
            child.onCompleted();
            main.unsubscribe();
            unsubscribe();
        }
    };
    child.add(main);
    child.add(secondary);
    
    other.unsafeSubscribe(secondary);
    
    return main;
};

Observable<Integer> source = 
        Observable.timer(30, 30, TimeUnit.MILLISECONDS)
        .map(v -> v.intValue());

source.lift(takeUntilZero).unsafeSubscribe(
        Subscribers.create(
            System.out::println,
            Throwable::printStackTrace,
            () -> System.out.println("Done")
        )
);

Thread.sleep(1000);

If the example is run, you'll find that it prints Done twice! The problem is at (1) again and how we implemented the requirement of "other stream emits a zero or just completes". Since streams may legally ignore unsubscription just before they would send an onCompleted(), our other stream will emit a single zero directly followed by an onCompleted() call, which will then trigger two onCompleted() calls in secondary and eventually in child. By default, RxJava has some safeguards against this built in that protects end-users, but since we want to write efficient operators, we opt to forego such safeguards, hence the use of unsafeSubscribe() in the example. The fix is to introduce a boolean flag, let's say done, in the secondary subscriber which is set to true on the first call to onComplete() and won't let any further events through:


    // ... same as before
    Subscriber<Integer> secondary = new Subscriber<Integer>() {
        boolean done;
        @Override
        public void onNext(Integer t) {
            if (t == 0) {
                onCompleted();
            }
        }
        @Override
        public void onError(Throwable e) {
            child.onError(e);
        }
        @Override
        public void onCompleted() {
            if (!done) {
                done = true;
                child.onCompleted();
                main.unsubscribe();
                unsubscribe();
            }
        }
    };
    // ... same as before

#5: Forgetting to serialize

In the completing again example above there another bug hidden which is hard to demonstrate by some simple runs: accessing the child's onXXX methods can happen from any thread at any time which breaks the RxJava contract: one must serialize access to the Observer's methods.

To prevent it, we need to add some serialization but applying the techniques from the serialized access post is an overkill; RxJava has the convenient SerializedSubscriber for this purpose:


        // ... same as before
        Operator<Integer, Integer> takeUntilZero = c -> {
            Subscriber<Integer> child = new SerializedSubscriber<>(c);
        // ... same as before


By simply renaming the lambda parameter to c and wrapping it with SerializedSubscriber, the serialized access requirement is now satisfied.

Conclusion

In this post, I've demonstrated the most common and most basic types of pitfalls one can implement in an Operator and showed how to detect and fix them. Indeed, the small runnable code sequence below each operator is the kind of operator unit test we expect one to write, especially when proposing a new operator in RxJava.

There are other kinds of pitfalls possible, especially with backpressure, but they are more likely caused by the lack of deep understanding how backpressure and Producers should work in a more complex operator.

After this small detour, I'll continue explaining some typical but advanced producers: the single-producer and single-delayed-producer.

2 megjegyzés:

  1. another awesome post David, thanks, great blog!

    VálaszTörlés
  2. I should have read this before I started creating custom Operators. I didn't realize simply manipulating the onXXX calls between parent and child subscribers is pretty naive, even for seemingly simple Operators.

    VálaszTörlés