2015. május 15., péntek

Operator concurrency primitives: producers (part 5)

Introduction 

Dealing with multiple sources and backpressure at the same time is difficult when one writes an operator. Even a simple task such as continuing with another observable sequence once the first has completed puts a challenge on how request accounting and propagation should be performed for correct behavior.

Generally in RxJava and especially with reactive-streams, one must set the Producer on a Subscriber at most/exactly once. Don't let the thread-safety in Subscriber.setProducer() fool you into thinking that you can change the producer because the Subscriber doesn't do (nor should it IMO) proper request accounting but just holds onto your initial request amount(s) until a Producer is set on it. If you have outstanding requests and you change set the new Producer, that producer's request will be called with the value of the latest 'private' request() call instead of with a request amount equal to the undelivered amount remaining from the previous producer.

In this blog post, I'll introduce a kind of producer which can help overcome this particular challenging situation.

The producer-arbiter

Let's assume we need to write an operator with following behavior (and we don't like concatWith()): Given two source Observables, we need an operator that observes the first one and once it completes normally, it starts observing the other Observable till completion and itself complete normally.


public final class ThenObserve<T> implements Operator<T, T> {
    final Observable<? extends T> other;
    public ThenObserve(Observable<? extends T> other) {
        this.other = other;
    }

    @Override
    public Subscriber<? super T> call(Subscriber<? super T> child) {
        Subscriber<T> parent = new Subscriber<T>(child, false) {
            @Override
            public void onNext(T t) {
                child.onNext(t);
            }
            @Override
            public void onError(Throwable e) {
                child.onError(e);
            }
            @Override
            public void onCompleted() {
                other.unsafeSubscribe(child);
            }
        };
        child.add(parent);
        return parent;
    }
}

Observable<Integer> source = Observable
    .range(1, 10)
    .lift(new ThenObserve<>(Observable.range(11, 90)));

source.subscribe(System.out::println);

System.out.println("---");

TestSubscriber<Integer> ts = new TestSubscriber<>();
ts.requestMore(20);

source.subscribe(ts);

ts.getOnNextEvents().forEach(System.out::println);

If you run the example, the first observation of the source prints, as expected, values from 1 to 100. However, the second case, where we request 20 elements only, it prints values from 1 to 30! Clearly, the other range observable didn't just produce the remaining 10 elements but thought it can produce up to 20 elements.

Therefore, we need to track the request amounts from downstream and the production amount from upstream and once there has to be a source change, make the new source continue with the remaining value. To accomplish this, we need a Producer to manage the request and source changes in a thread-safe manner.

Let's call it ProducerArbiter and have the following base class structure:


public final class ProducerArbiter 
implements Producer {
    long requested;                                   // (1)   
    Producer currentProducer;

    boolean emitting;                                 // (2)
    long missedRequested;
    long missedProduced;
    Producer missedProducer;
    
    static final Producer NULL_PRODUCER = n -> { };   // (3)
    
    @Override
    public void request(long n) {
        // to implement
    }
    
    public void produced(long n) {
        // to implement
    }
    
    public void set(Producer newProducer) {
        // to implement
    }

    public void emitLoop() {
        // to implement
    }
}


The ProducerArbiter looks quite normal, so far, with the following properties:

  1. We keep track of the current outstanding request amount and the current producer. This producer can be null during which we'll still keep aggregating the requested amounts and request them together once a producer arrives.
  2. We will use the emitter-loop serialization construct because we should allow concurrent request() calls and changing the producer concurrently. Note that this doesn't serialize in respect of the onNext events travelling from upstream to downstream and a concurrent producer change may trigger a concurrent emission of onNext events from two sources. I'll handle this case in the next part of the series. Luckily, this won't be a problem with our ThenObserve operator's use case.
  3. We'd like to clear the current producer in some situations to avoid retention, but we'll use null in missedProducer to indicate there was no attempt to set a producer at that time.
With the basic structure set up, let's start implementing the methods one by one:


    // ... same as before
    @Override
    public void request(long n) {
        if (n < 0) {                                   // (1)
            throw new IllegalArgumentException();
        }
        if (n == 0) {
            return;
        }
        synchronized (this) {
            if (emitting) {
                missedRequested += n;                  // (2)
                return;
            }
            emitting = true;
        }
        boolean skipFinal = false;
        try {
            long r = requested;
            long u = r + n;
            if (u < 0) {
                u = Long.MAX_VALUE;
            }
            requested = u;                             // (3)
            
            Producer p = currentProducer;
            if (p != null) {
                p.request(n);                          // (4)
            }
            
            emitLoop();                                // (5)
            skipFinal = true;
        } finally {
            if (!skipFinal) {                          // (6)
                synchronized (this) {
                    emitting = false;
                }
            }
        }
    }

This won't be the longest method I'm going to explain:

  1. We perform the usual request amount checks.
  2. We track the missed requests separately from the missed production because if combined, we couldn't distinguish between an allowed request 'overflow' (capped at Long.MAX_VALUE) and overproduction.
  3. We are performing the capped update to the requested amount, as usual.
  4. If there is a producer attached, request the amount n (and not the requested amount!) because requests are cumulative on every producers.
  5. There could have been concurrent method calls so we enter the loop-phase of the emitter-loop approach.
  6. If emitLoop() returns normally, we skip the finally, otherwise, we 'unlock' the producer to allow using the producer-arbiter again with perhaps a different producer (see Observable.retry()).

Production accounting looks as follows:

    // ... continued
    public void produced(long n) {
        if (n <= 0) {                                    // (1)
            throw new IllegalArgumentException();
        }
        synchronized (this) {
            if (emitting) {
                missedProduced += n;                     // (2)
                return;
            }
            emitting = true;
        }
        
        boolean skipFinal = false;
        try {
            long r = requested;
            long u = r - n;
            if (u < 0) {
                throw new IllegalStateException();       // (3)
            }
            requested = u;
        
            emitLoop();                                  // (4)
            skipFinal = true;
        } finally {
            if (!skipFinal) {
                synchronized (this) {
                    emitting = false;
                }
            }
        }
    }

Quite a similar structure:

  1. We treat production of 0 or negative as bugs.
  2. We track the missed production amount separately (see the explanation in request() above).
  3. We subtract the produced amount from the current requested amount and check for an underflow. Such underflow is either a bug or the lack of backpressure-support from the upstream.
  4. We now try to process any missed actions.


    // ... continued
    public void set(Producer newProducer) {
        synchronized (this) {
            if (emitting) {
                missedProducer = newProducer == null ? 
                    NULL_PRODUCER : newProducer;          // (1)
                return;
            }
            emitting = true;
        }
        boolean skipFinal = false;
        try {
            currentProducer = newProducer;
            if (newProducer != null) {
                newProducer.request(requested);           // (2)
            }
            
            emitLoop();                                   // (3)
            skipFinal = true;
        } finally {
            if (!skipFinal) {
                synchronized (this) {
                    emitting = false;
                }
            }
        }
    }

The structure and behavior should be quite familiar now:

  1. We indicate the current producer should be updated to a new producer or cleared. We overwrite any previous missed producer because it means they didn't really get the chance of emitting anything anyway.
  2. If we got a new, replacement producer, we request the entire requested amount. This is what accomplishes the the requirement that new producers should receive the remaining requested amounts not produced by the previous ProducerNote that an asynchronous producer change run concurrently with an onNext event has the potential to over-request: the new producer receives n but the previous producer may just produce 1 before 'stopped' and now we have an unwanted plus 1 coming down and the potential for MissingBackpressureException. As I mentioned before, the situation can't happen in the current scenario and I'll have a separate and full post about how to resolve it.
  3. The rest is the usual: enter the loop to handle any missed actions and quit by 'unlocking' in normal or exception cases.
The final method remaining is the popular emitLoop() itself:

    // ... continued
    public void emitLoop() {
        for (;;) {
            long localRequested;
            long localProduced;
            Producer localProducer;
            synchronized (this) {
                localRequested = missedRequested;
                localProduced = missedProduced;
                localProducer = missedProducer;
                if (localRequested == 0L 
                        && localProduced == 0L
                        && localProducer == null) {       // (1)
                    emitting = false;
                    return;
                }
                missedRequested = 0L;
                missedProduced = 0L;
                missedProducer = null;                    // (2)
            }
            
            long r = requested;
            
            if (r != Long.MAX_VALUE) {                    // (3)
                long u = r + localRequested;
                if (u < 0 || u == Long.MAX_VALUE) {       // (4)
                    r = Long.MAX_VALUE;
                    requested = r;
                } else {
                    long v = u - localProduced;           // (5)
                    if (v < 0) {
                        throw new IllegalStateException();
                    }
                    r = v;
                    requested = v;
                }
            }
            if (localProducer != null) {                  // (6)
                if (localProducer == NULL_PRODUCER) {
                    currentProducer = null;
                } else {
                    currentProducer = localProducer;
                    localProducer.request(r);             // (7)
                }
            } else {
                Producer p = currentProducer;
                if (p != null && localRequested != 0L) {
                    p.request(localRequested);            // (8)
                }
            }
        }
    }
}

Lots of state and possibilities to cover with the emitLoop():

  1. The condition to leave the emitter-loop is to not have any missed requests, productions and producer changes at all.
  2. Once we discover we missed something, we clear each indicator fields.
  3. If the current requested amount is Long.MAX_VALUE, there is no point in doing too much request accounting because the downstream is in 'infinite' or 'unlimited' mode
  4. But even if not, the missed request amount could just put us into this 'infinite' mode where no further request accounting is necessary.
  5. Otherwise, we need to subtract the missed production amount, update the requested amount locally and in the instance field.
  6. If there was a producer change, we update the currentProducer (or clear it).
  7. If there new producer isn't the null-producer, we update the currentProducer and request the entire amount of the currently known outstanding requests.
  8. Otherwise, if there weren't any new Producers and there were some missed requests, we only request that many from the current producer (if there is one).
With the fully functional ProducerArbiter in hand, we can modify the original ThenObserve operator to do the right thing:

public final class ThenObserve<T> implements Operator<T, T> {
    final Observable<? extends T> other;
    public ThenObserve(Observable<? extends T> other) {
        this.other = other;
    }

    @Override
    public Subscriber<? super T> call(
            Subscriber<? super T> child) {
        ProducerArbiter pa = new ProducerArbiter();         // (1)
        
        Subscriber<T> parent = new Subscriber<T>() {
            @Override
            public void onNext(T t) {
                child.onNext(t);
                pa.produced(1);                             // (2)
            }
            @Override
            public void onError(Throwable e) {
                pa.set(null);                               // (3)
                child.onError(e);
            }
            @Override
            public void onCompleted() {
                pa.set(null);                               // (4)
                
                Subscriber<T> parent2 = create2(pa, child); // (5)
                child.add(parent2);
                
                other.unsafeSubscribe(parent2);             // (6)
            }
            @Override
            public void setProducer(Producer producer) {
                pa.set(producer);                           // (7)
            }
        };
        child.add(parent);
        child.setProducer(pa);
        return parent;
    }
    
    Subscriber<T> create2(ProducerArbiter pa, 
            Subscriber<? super T> child) {                  // (8)
        return new Subscriber<T>() {
            @Override
            public void onNext(T t) {
                child.onNext(t);
                pa.produced(1);
            }
            @Override
            public void onError(Throwable e) {
                pa.set(null);
                child.onError(e);
            }
            @Override
            public void onCompleted() {
                pa.set(null);
                child.onCompleted();
            }
            @Override
            public void setProducer(Producer producer) {
                pa.set(producer);
            }
        };
    }
}

This extension doesn't look too elegant, what's going on?

  1. We create an instance of our new ProducerArbiter.
  2. We acknowledge the production of a single onNext value.
  3. We release the current producer on receipt of an error to avoid unnecessary object retention.
  4. Again, since the upstream (the first Observable) completed, we get rid of its producer.
  5. Subscribers shouldn't and often can't be reused so we need to create another subscriber for the second source. Besides, using this would end up in infinite resubscription due to the recursion (and stopped by a StackOverflowError).
  6. With the new subscriber, added to the child for unsubscription propagation, we subscribe to the other Observable and complete the first subscriber.
  7. We capture the potential producer from upstream and use it with the arbiter.
  8. For convenience, I've moved the creation of the second Subscriber into a helper method. Its structure looks quite the same to the first Subscriber, with the exception that in the onCompleted method, we complete the child instead of doing any other rounds with any other Observable.

Conclusion

With a producer structured like our new ProducerArbiter, we can change the producer over the head of a child subscriber and retain the correct amount of requested and thus produced values across producers of various upstream sources.

However, as I called for caution in the detailed explanation, this producer-arbiter runs independently of the onXXX event delivery and if one is not in 'control' of the event emission, strange and unwanted behavior may arise. With our example, this wasn't the case because when we switched producers in the first Subscriber.onCompleted() method, we could be sure no more onNext or onError events will be fired from the current upstream ever and thus make the swap safely.

In fact, most very advanced operators, such as switchOnNext() and others, experience such concurrent requesting-delivering so that to make them behave properly, we need a more advanced arbiter: a producer-observer-arbiter; the topic of the next post in the series.

6 megjegyzés:

  1. "If you run the example, the first observation of the source prints, as expected, values from 1 to 100."

    Should it only print from 1 to 90?

    VálaszTörlés
    Válaszok
    1. range's second parameter is the count, thus 10 + 90 = 100

      Törlés
  2. Ezt a megjegyzést eltávolította a szerző.

    VálaszTörlés
  3. “Its structure looks quite the same to the first Subscriber, with the exception that in the onNext method, we complete the child instead of doing any other rounds with any other Observable.”

    The exception should be onCompleted, right?

    VálaszTörlés