Tuesday, May 19, 2015

Operator concurrency primitives: subscription-containers (part 1)


Writing complex operators is likely to involve subscription management and thus the various standard subscription-containers.

It is worth reiterating that subscription-containers have a terminal state, reached by calling unsubscribe() on them. Once a container is in this state, any Subscription you try to add to it, or set as its new content, is unsubscribed immediately. This may seem odd, but the following scenario explains the reason behind this design decision.

Let's assume your operator schedules some work with a delay on a Scheduler.Worker instance. Concurrently, the chain is unsubscribed, but the schedule() call hasn't yet returned the Subscription that lets you cancel the scheduled work. Without the terminal state property, a container that receives the returned Subscription after it has been unsubscribed would never unsubscribe it, and you would have a memory leak. With the terminal state property, setting that Subscription on the already unsubscribed container unsubscribes it immediately and the resources are freed. This matters most notoriously on Android where, once your application has been told to pause, you don't want a background task hanging around deep in the operator chain.
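
To make the race concrete, here is a minimal sketch of a single-slot container with a terminal state. It is not RxJava code: the Subscription interface is a local stand-in for rx.Subscription so the example compiles on its own, and TaskHandle and TerminalSlot are hypothetical names I made up for the illustration.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Stand-in for rx.Subscription so the sketch compiles without RxJava. */
interface Subscription {
    void unsubscribe();
    boolean isUnsubscribed();
}

/** Hypothetical cancellable handle, like the one schedule() would return. */
final class TaskHandle implements Subscription {
    private final AtomicBoolean cancelled = new AtomicBoolean();
    @Override public void unsubscribe() { cancelled.set(true); }
    @Override public boolean isUnsubscribed() { return cancelled.get(); }
}

/** Hypothetical single-slot container with a terminal state. */
final class TerminalSlot implements Subscription {
    private boolean unsubscribed;   // guarded by this
    private Subscription current;   // guarded by this

    /** Stores s, or cancels it right away if the container is terminated. */
    public void set(Subscription s) {
        synchronized (this) {
            if (!unsubscribed) {
                current = s;        // still active: keep the resource (old value is simply dropped)
                return;
            }
        }
        s.unsubscribe();            // terminal state: cancel the late arrival
    }

    @Override public void unsubscribe() {
        Subscription s;
        synchronized (this) {
            if (unsubscribed) {
                return;             // idempotent
            }
            unsubscribed = true;
            s = current;
            current = null;
        }
        if (s != null) {
            s.unsubscribe();        // called outside the lock
        }
    }

    @Override public synchronized boolean isUnsubscribed() {
        return unsubscribed;
    }
}

final class TerminalStateDemo {
    public static void main(String[] args) {
        TerminalSlot slot = new TerminalSlot();
        slot.unsubscribe();                  // the chain is unsubscribed first
        TaskHandle late = new TaskHandle();  // schedule() returns only afterwards
        slot.set(late);
        System.out.println(late.isUnsubscribed()); // true
    }
}
```

Without the check in set(), the late handle would sit in the slot forever and the delayed task would still run.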

Further things one should already know: 
  • all operations on a container should be thread-safe and atomic from the perspective of a concurrent unsubscription,
  • the unsubscribe() call itself must be idempotent (multiple calls should be no-ops) and
  • one should avoid calling unsubscribe() while holding a lock (especially if you have control over the lock-acquisition) to avoid any chance of a deadlock.
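
The idempotence requirement is the easiest one to illustrate. The following is a small sketch, assuming a local stand-in for rx.Subscription and a hypothetical OnceSubscription class that wraps a cleanup action; a compare-and-set on an AtomicBoolean is one common way to guarantee that the action runs at most once, without holding any lock.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Stand-in for rx.Subscription so the sketch compiles without RxJava. */
interface Subscription {
    void unsubscribe();
    boolean isUnsubscribed();
}

/** Hypothetical wrapper that runs a cleanup action at most once. */
final class OnceSubscription implements Subscription {
    private final AtomicBoolean once = new AtomicBoolean();
    private final Runnable cleanup;

    OnceSubscription(Runnable cleanup) {
        this.cleanup = cleanup;
    }

    @Override public void unsubscribe() {
        // Only the first caller wins the CAS; later or concurrent calls are no-ops.
        if (once.compareAndSet(false, true)) {
            cleanup.run();
        }
    }

    @Override public boolean isUnsubscribed() {
        return once.get();
    }
}

final class IdempotentDemo {
    public static void main(String[] args) {
        int[] runs = { 0 };
        OnceSubscription s = new OnceSubscription(() -> runs[0]++);
        s.unsubscribe();
        s.unsubscribe();               // second call does nothing
        System.out.println(runs[0]);   // 1
    }
}
```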

I'll briefly touch on these classes, their use in operators and their notable properties, but this blog post is mainly about the concepts behind the containers so one can build custom containers that can support the complex lifecycle found in Subjects and in conversion operators such as publish().

Standard subscription containers


The most frequently used container type is the CompositeSubscription. Any operator that has to deal with resources (Subscribers, wrapped Futures) that come and go is likely to use this class. The current implementation uses a synchronized block to achieve thread-safety and puts the subscription instances into a HashSet.

(Sidenote: The reason for this structure instead of a copy-on-write non-blocking approach was that copy-on-write generates a lot of garbage and some particular usage pattern of RxJava at Netflix made them worry about GC.)

One notable property of this composite is that remove() will unsubscribe the subscription it successfully removes from the container (which is sometimes unnecessary, for example, when one removes a wrapped Future from the tracking composite at the end of the task itself).

CompositeSubscription is well suited for operators where the resources come and go non-deterministically and its internal HashSet can shine with the O(1) add and remove complexity (example: merge()).
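
As a rough illustration of the behavior described above, here is a simplified composite built on a synchronized HashSet. This is my own sketch and not the RxJava source: Subscription is a local stand-in for rx.Subscription, SimpleComposite and Flag are hypothetical names, and error aggregation is left out for brevity.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Stand-in for rx.Subscription so the sketch compiles without RxJava. */
interface Subscription {
    void unsubscribe();
    boolean isUnsubscribed();
}

/** Hypothetical resource that just remembers whether it was unsubscribed. */
final class Flag implements Subscription {
    private volatile boolean done;
    @Override public void unsubscribe() { done = true; }
    @Override public boolean isUnsubscribed() { return done; }
}

/** Hypothetical, simplified composite; error aggregation is omitted. */
final class SimpleComposite implements Subscription {
    private final Set<Subscription> set = new HashSet<>();  // guarded by this
    private boolean unsubscribed;                           // guarded by this

    public void add(Subscription s) {
        synchronized (this) {
            if (!unsubscribed) {
                set.add(s);
                return;
            }
        }
        s.unsubscribe();          // terminal state: reject and cancel
    }

    public void remove(Subscription s) {
        boolean removed;
        synchronized (this) {
            removed = !unsubscribed && set.remove(s);
        }
        if (removed) {
            s.unsubscribe();      // a successful remove also unsubscribes
        }
    }

    @Override public void unsubscribe() {
        List<Subscription> copy;
        synchronized (this) {
            if (unsubscribed) {
                return;
            }
            unsubscribed = true;
            copy = new ArrayList<>(set);
            set.clear();
        }
        for (Subscription s : copy) {
            s.unsubscribe();      // outside the lock
        }
    }

    @Override public synchronized boolean isUnsubscribed() {
        return unsubscribed;
    }
}

final class CompositeDemo {
    public static void main(String[] args) {
        SimpleComposite cs = new SimpleComposite();
        Flag a = new Flag();
        Flag b = new Flag();
        Flag c = new Flag();
        cs.add(a);
        cs.add(b);
        cs.remove(a);
        System.out.println(a.isUnsubscribed() + " " + b.isUnsubscribed()); // true false
        cs.unsubscribe();
        cs.add(c);                // too late: c is unsubscribed immediately
        System.out.println(b.isUnsubscribed() + " " + c.isUnsubscribed()); // true true
    }
}
```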


The SerialSubscription contains only a single Subscription at a time and replacing it with another Subscription will unsubscribe the previous Subscription. Internally, it uses the copy-on-write approach with an extra state object.

This type of container is employed in cases when one needs to track one resource at a time, such as the current Subscriber to a source in a concatMap() scenario.


The MultipleAssignmentSubscription is a variant of the SerialSubscription (unrelated class hierarchy though) where the replacement of the current contained Subscription doesn't unsubscribe that instance.

Its use is less frequent and needs some forethought because of the said behavior. One can use it when the reference to the subscription can be safely 'lost' because an unsubscribe() call would only be a wasteful no-op. This container is used in the algorithms that perform periodic scheduling through the Scheduler.Worker instance by splitting it into independent delayed schedules. The result of schedulePeriodically() will be a MultipleAssignmentSubscription that holds onto the current individual delayed schedule of the underlying task. Since the replacement happens at the very end of the repeated task, it is unnecessary and even unwanted for the task to unsubscribe itself.
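
The difference between the two single-slot containers comes down to what happens to the old value on replacement. Here is a minimal sketch of that idea, assuming a local stand-in for rx.Subscription and a hypothetical SwapSlot class whose constructor flag selects the behavior. It swaps an immutable state object with compareAndSet, in the spirit of the copy-on-write approach mentioned above, but it is not the RxJava source.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Stand-in for rx.Subscription so the sketch compiles without RxJava. */
interface Subscription {
    void unsubscribe();
    boolean isUnsubscribed();
}

/** Hypothetical resource that just remembers whether it was unsubscribed. */
final class Flag implements Subscription {
    private volatile boolean done;
    @Override public void unsubscribe() { done = true; }
    @Override public boolean isUnsubscribed() { return done; }
}

/** Hypothetical single-slot container using an immutable state object and CAS. */
final class SwapSlot implements Subscription {
    private static final class State {
        final boolean unsubscribed;
        final Subscription current;
        State(boolean unsubscribed, Subscription current) {
            this.unsubscribed = unsubscribed;
            this.current = current;
        }
    }

    private final AtomicReference<State> state =
            new AtomicReference<>(new State(false, null));
    /** true: serial-like replacement, false: multiple-assignment-like. */
    private final boolean unsubscribePrevious;

    SwapSlot(boolean unsubscribePrevious) {
        this.unsubscribePrevious = unsubscribePrevious;
    }

    public void set(Subscription s) {
        for (;;) {
            State old = state.get();
            if (old.unsubscribed) {
                s.unsubscribe();              // terminal state: cancel the new value
                return;
            }
            if (state.compareAndSet(old, new State(false, s))) {
                if (unsubscribePrevious && old.current != null) {
                    old.current.unsubscribe();
                }
                return;
            }
            // lost a race with another set() or unsubscribe(): retry
        }
    }

    @Override public void unsubscribe() {
        for (;;) {
            State old = state.get();
            if (old.unsubscribed) {
                return;                       // idempotent
            }
            if (state.compareAndSet(old, new State(true, null))) {
                if (old.current != null) {
                    old.current.unsubscribe();
                }
                return;
            }
        }
    }

    @Override public boolean isUnsubscribed() {
        return state.get().unsubscribed;
    }
}

final class SwapSlotDemo {
    public static void main(String[] args) {
        Flag a = new Flag();
        Flag c = new Flag();
        SwapSlot serial = new SwapSlot(true);
        serial.set(a);
        serial.set(new Flag());
        System.out.println(a.isUnsubscribed());   // true
        SwapSlot multi = new SwapSlot(false);
        multi.set(c);
        multi.set(new Flag());
        System.out.println(c.isUnsubscribed());   // false
    }
}
```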


The SubscriptionList is an internal class, similar to CompositeSubscription, that uses a LinkedList instead of a HashSet to speed up scenarios where the subscriptions are just added to the container and never removed. Generally, one shouldn't rely on an internal class, but if you want to submit a PR to RxJava, I'd expect you to use this class if appropriate (and if it is still available at that time).

Lately, it has been retrofitted with a remove() method to support an optimization on the default computation() scheduler that speeds up non-delayed task addition and removal, because these tasks are usually scheduled in the same order in which they get executed and removed from the internal tracking structure. We end up removing the very first item from the LinkedList, which has less overhead than a remove from a HashMap (according to our benchmarks).

This container appears in various places, most notably in the Subscriber itself, but the internal ScheduledAction uses it too to 'track the trackers'.


I'd say RefCountSubscription is a remnant from the Rx.NET set of resource containers. It holds onto a single and unchangeable Subscription and 'hands out' derived subscriptions; when all of them get unsubscribed, the main subscription gets unsubscribed. We haven't used it in RxJava for some time now; the original Rx.NET RefCountDisposable works much better with the standard .NET resource management built on IDisposable.

(Sidenote: BooleanSubscription is not a container because it doesn't hold onto another Subscription but it is aimed at wrapping arbitrary actions to be executed on unsubscription.)

Implementing blocking containers

Let's assume you need a very specific container that can hold onto exactly two Subscriptions which can be freely replaced (unsubscribing the earlier value in the process). Let's call the container TwoSubscribers and have the following class skeleton:

import java.util.ArrayList;
import java.util.List;
import rx.Subscription;
import rx.exceptions.Exceptions;

public final class TwoSubscribers 
implements Subscription {
    private volatile boolean isUnsubscribed;          // (1)
    Subscription s1;                                  // (2)
    Subscription s2;
    public boolean isUnsubscribed() {
        return isUnsubscribed;                        // (3)
    }
    public void set(boolean first, Subscription s) {
        // implement
    }
    public void unsubscribe() {
        // implement
    }
}

The class isn't complicated at this stage. We keep the unsubscription indicator as a volatile boolean value to avoid using a synchronized block unnecessarily (1)(3). The class needs to hold onto two subscriptions which, because they are guarded by 'external' synchronization, are plain instance variables (2).

The set() method takes a boolean argument to determine which of the two contained Subscriptions has to be replaced. The implementation is as follows:

    // ...
    public void set(boolean first, Subscription s) {
        if (!isUnsubscribed) {                       // (1)
            synchronized (this) {
                if (!isUnsubscribed) {               // (2)
                    Subscription temp;               // (3)
                    if (first) {
                        temp = s1;
                        s1 = s;
                    } else {
                        temp = s2;
                        s2 = s;
                    }
                    s = temp;                        // (4)
                }
            }
        }
        if (s != null) {                             // (5)
            s.unsubscribe();
        }
    }
    // ...

After all those complicated Producers, it looks quite simple:

  1. We eagerly check the unsubscription status of the container so if it is already unsubscribed, we can skip the synchronized block and just unsubscribe the parameter directly.
  2. Otherwise, we double-check the unsubscription status and if the container is still not unsubscribed, we commence with the replacement.
  3. Because the replaced subscription should be unsubscribed, we get the current subscription and replace it with the one from the parameter.
  4. We reuse the parameter and let it store the previous subscription on the way out.
  5. We reach this point either because (1) or (2) fell through with the parameter s intact, or with s holding onto the previous subscription from one of the slots. Since the latter can be null, we call unsubscribe() after a null check.

What remains is the unsubscribe() method of the container itself:

    // ...
    public void unsubscribe() {
        if (!isUnsubscribed) {                  // (1)
            Subscription one;                   // (2)
            Subscription two;
            synchronized (this) {
                if (isUnsubscribed) {           // (3)
                    return;
                }
                isUnsubscribed = true;          // (4)
                one = s1;                       // (5)
                two = s2;
                s1 = null;
                s2 = null;
            }
            List<Throwable> errors = null;      // (6)
            try {
                if (one != null) {
                    one.unsubscribe();          // (7)
                }
            } catch (Throwable t) {
                errors = new ArrayList<>();     // (8)
                errors.add(t);
            }
            try {
                if (two != null) {
                    two.unsubscribe();
                }
            } catch (Throwable t) {
                if (errors == null) {
                    errors = new ArrayList<>();
                }
                errors.add(t);
            }

            Exceptions.throwIfAny(errors);      // (9)
        }
    }

Unfortunately, the method doesn't look too elegant, but for a good reason:

  1. We check if the container is already unsubscribed, in which case there is nothing else to do.
  2. If we find the container active, we will need to get the current subscriptions out because we can't unsubscribe them while holding a lock.
  3. In the synchronized block, we double-check the unsubscription status and quit if the container is already unsubscribed.
  4. We set the isUnsubscribed field as early as possible so concurrent set() calls can see it as soon as possible and not try to enter the synchronized block.
  5. We read the current values and replace them with null to avoid retaining anything.
  6. It is good practice to be defensive about exceptions thrown from unsubscriptions, and since we have two subscriptions, we need to make sure they both get unsubscribed while the exception(s) are still propagated.
  7. If the specific subscription is not null, we call unsubscribe() on it.
  8. In case of an exception, we dynamically allocate the errors list and add the Throwable to it.
  9. After applying the same logic to the second subscription, we invoke a helper method that throws the accumulated exceptions, if any, or does nothing. If both subscriptions threw, throwIfAny() throws a single CompositeException capturing both.
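
To see the pieces working together, here is a condensed, runnable variant of the container with a tiny demo. Treat it as a sketch under a few assumptions: Subscription is a local stand-in for rx.Subscription, the two try-catch blocks are folded into a loop, the private throwIfAny() helper is a hypothetical replacement for Exceptions.throwIfAny() (it wraps the first error and attaches the rest as suppressed instead of building a CompositeException), and Flag is a made-up test resource.

```java
import java.util.ArrayList;
import java.util.List;

/** Stand-in for rx.Subscription so the sketch compiles without RxJava. */
interface Subscription {
    void unsubscribe();
    boolean isUnsubscribed();
}

/** The container from this post, condensed into one runnable unit. */
final class TwoSubscribers implements Subscription {
    private volatile boolean isUnsubscribed;
    private Subscription s1;   // guarded by this
    private Subscription s2;   // guarded by this

    @Override public boolean isUnsubscribed() { return isUnsubscribed; }

    public void set(boolean first, Subscription s) {
        if (!isUnsubscribed) {
            synchronized (this) {
                if (!isUnsubscribed) {
                    Subscription temp;
                    if (first) {
                        temp = s1;
                        s1 = s;
                    } else {
                        temp = s2;
                        s2 = s;
                    }
                    s = temp;
                }
            }
        }
        if (s != null) {
            s.unsubscribe();   // the previous value, or the parameter if terminated
        }
    }

    @Override public void unsubscribe() {
        Subscription one;
        Subscription two;
        synchronized (this) {
            if (isUnsubscribed) { return; }
            isUnsubscribed = true;
            one = s1;
            two = s2;
            s1 = null;
            s2 = null;
        }
        List<Throwable> errors = new ArrayList<>();
        for (Subscription s : new Subscription[] { one, two }) {
            try {
                if (s != null) {
                    s.unsubscribe();   // outside the lock
                }
            } catch (Throwable t) {
                errors.add(t);         // remember it, but still visit the other slot
            }
        }
        throwIfAny(errors);
    }

    /** Hypothetical stand-in for Exceptions.throwIfAny(). */
    private static void throwIfAny(List<Throwable> errors) {
        if (!errors.isEmpty()) {
            RuntimeException ex = new RuntimeException("unsubscribe failed", errors.get(0));
            errors.subList(1, errors.size()).forEach(ex::addSuppressed);
            throw ex;
        }
    }
}

/** Hypothetical test resource that can be told to throw on unsubscribe. */
final class Flag implements Subscription {
    private final boolean fail;
    private volatile boolean done;
    Flag(boolean fail) { this.fail = fail; }
    @Override public boolean isUnsubscribed() { return done; }
    @Override public void unsubscribe() {
        done = true;
        if (fail) { throw new IllegalStateException("boom"); }
    }
}

final class TwoSubscribersDemo {
    public static void main(String[] args) {
        TwoSubscribers ts = new TwoSubscribers();
        Flag a = new Flag(false), b = new Flag(true), c = new Flag(false);
        ts.set(true, a);
        ts.set(true, b);    // replaces a, so a is unsubscribed
        ts.set(false, c);
        try {
            ts.unsubscribe();   // b throws, c must still be unsubscribed
        } catch (RuntimeException ex) {
            System.out.println(ex.getCause().getMessage());   // boom
        }
        System.out.println(a.isUnsubscribed() + " " + c.isUnsubscribed());   // true true
    }
}
```

The important part is that the failing first slot doesn't prevent the second slot from being unsubscribed, and the caller still learns about the failure.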


In this relatively short post, I've recapped the existing subscription-container classes, including the underlying requirements of containers and the properties and use of each standard RxJava subscription-container. I've demonstrated, through a simple container, how one can create a conformant container by using blocking operations.

In the next part, I'm going to create a non-blocking variant of the example TwoSubscribers container and show how one can extend it to hold onto any number of subscriptions and still remain non-blocking.

2 comments:

  1. In the `unsubscribe()` implementation, no need to check whether `errors` is null in the first `catch` clause, because it will always be null.