Tuesday, May 12, 2015

Operator concurrency primitives: producers (part 3)

Introduction

In this blog post, I'm going to implement and describe a single kind of producer, one that is capable of emitting multiple values on demand while respecting the downstream's backpressure.

I'll call this producer a queued-producer because it involves queuing up work and replaying part of it based on the downstream's requested amounts.

There are several possible uses for such a producer:

  1. It can be used for disconnecting a bursty producer from a slow consumer, i.e., onBackpressureBuffer().
  2. You can hold the last n events in the queue and allow the emission only when the upstream has called your onCompleted(), i.e., takeLast().
  3. You want to produce multiple values for each value emitted by the upstream, i.e., some kind of merge().

The value-only queued-producer

Dealing with terminal events in queued producers can be quite confusing at first, so I'll first implement and detail a simpler version that deals with values only. One can use Notification or NotificationLite to wrap onError and onCompleted events as values, although that doesn't really allow an onError to jump ahead of the queue unless you are willing to pay the performance penalty of a ConcurrentLinkedDeque.
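For reference, here is a minimal sketch of that wrapping idea (my own illustration with hypothetical sentinel types, not RxJava's actual NotificationLite API); terminal events become ordinary objects so they can travel through a value-only Queue&lt;Object&gt;:

// Hypothetical sentinel wrappers (illustration only): the queue stores plain values
// plus these markers, and the drain loop unwraps them when emitting.
final class TerminalSentinels {
    /** Marker object for a normal completion. */
    static final Object COMPLETED = new Object();

    /** Wrapper that lets an error travel through the queue as a value. */
    static final class ErrorSentinel {
        final Throwable error;
        ErrorSentinel(Throwable error) { this.error = error; }
    }

    /** Called by the drain loop for each polled object; returns true if the stream ended. */
    @SuppressWarnings("unchecked")
    static <T> boolean accept(Object o, Subscriber<? super T> child) {
        if (o == COMPLETED) {
            child.onCompleted();
            return true;
        }
        if (o instanceof ErrorSentinel) {
            child.onError(((ErrorSentinel) o).error);
            return true;
        }
        child.onNext((T) o);
        return false;
    }
}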

As usual, we can start by extending AtomicLong which will store our downstream's current requested amount:

public final class QueuedProducer<T> 
extends AtomicLong implements Producer {
    private static final long serialVersionUID = -1;
    
    final Subscriber<? super T> child;
    final Queue<T> queue;
    final AtomicInteger wip;
    
    public QueuedProducer(Subscriber<? super T> child) {
        this.child = child;
        this.queue = new SpscLinkedQueue<>();
        this.wip = new AtomicInteger();
    }
    
    @Override
    public void request(long n) {                              // (1)
        // handle request
    }
    
    public void offer(T value) {                               // (2)
        // handle a value offered
    }
    
    private void drain() {                                     // (3)
        // attempt to drain
    }
}

Observable<Integer> source = Observable.create(child -> {
    QueuedProducer<Integer> qp = new QueuedProducer<>(child);
    child.setProducer(qp);
    for (int i = 0; i < 200; i++) {
        qp.offer(i);
    }
});

source.take(150).subscribe(System.out::println);


The underlying class structure is straightforward and resembles many of the producers shown so far. We have the entry point for downstream requests (1) and the entry point for the upstream's values (2). The new element is the drain() method (3), named as such because the queued-producer is implemented with the queue-drain approach to serialized access I blogged about earlier. For simplicity, I'll implement it without any fast-path optimizations.

First, let's implement the body of the request() method:


    // ...
    @Override
    public void request(long n) {
        if (n < 0) {
            throw new IllegalArgumentException();   // (1)
        }
        if (n > 0) {
            BackpressureUtils
                .getAndAddRequest(this, n);         // (2)
            drain();                                // (3)
        }
    }
    // ...

It doesn't look too complicated, which means something else must get complicated. After the sanity check (1), we atomically add the requested amount to the underlying AtomicLong (2) and then call drain() (3) to attempt delivery.
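For context, the capped addition performed by getAndAddRequest boils down to a CAS loop that treats Long.MAX_VALUE as "unbounded". Here is a rough sketch of that idea (my own approximation, not the actual RxJava source):

    // A rough sketch (not the actual RxJava implementation) of a capped atomic add:
    // the requested amount must never overflow past Long.MAX_VALUE, which by
    // convention means "unbounded".
    static long getAndAddRequestSketch(AtomicLong requested, long n) {
        for (;;) {
            long current = requested.get();
            long next = current + n;
            if (next < 0) {                              // overflow: clamp to "unbounded"
                next = Long.MAX_VALUE;
            }
            if (requested.compareAndSet(current, next)) {
                return current;                          // CAS succeeded, return previous value
            }
            // CAS failed due to a concurrent update; retry with the fresh value
        }
    }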

Luckily, offer() won't be any more complicated than request() was:


    // ...
    public void offer(T value) {
        queue.offer(Objects.requireNonNull(value));  // (1)
        drain();                                     // (2)
    }
    // ...

After queuing up a non-null value at (1) (JCTools' queues don't accept nulls), we simply call drain() at (2) and we're done. Note that the use of a single-producer single-consumer queue implies that offer() is accessed in a serialized manner, which is usually the case; however, the logic of the producer works just as well in a multi-producer scenario if you switch to an MpscLinkedQueue instead.
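If offer() may be called from multiple threads, one minimal way to make the queue choice explicit (a variation of my own, not part of the original code) is to let the caller supply the queue:

    // Hypothetical constructor overload: the caller picks the queue implementation,
    // e.g. an SPSC linked queue when a single thread calls offer(), or an MPSC
    // linked queue when several threads do. The drain logic is unaffected.
    public QueuedProducer(Subscriber<? super T> child, Queue<T> queue) {
        this.child = child;
        this.queue = queue;
        this.wip = new AtomicInteger();
    }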

Finally, the only remaining method now is the drain() itself:


    // ...
    private void drain() {
        if (wip.getAndIncrement() == 0) {                // (1)
            do {
                if (child.isUnsubscribed()) {            // (2)
                    return;
                }

                wip.lazySet(1);                          // (3)
                
                long r = get();                          // (4)
                long e = 0;
                T v;
                
                while (r != 0 &&                         // (5)
                        (v = queue.poll()) != null) {    // (6)
                    child.onNext(v);
                    if (child.isUnsubscribed()) {        // (7)
                        return;
                    }
                    r--;
                    e++;
                }
                
                if (e != 0) {                            // (8)
                    addAndGet(-e);
                }
            } while (wip.decrementAndGet() != 0);        // (9)
        }
    }
}

The structure should be quite familiar because it is basically the drain loop from the previous blog post, although it handles more state than before:


  1. We enter the drain loop on a 0 to 1 transition. Any higher value indicates 'extra work' for the drain loop.
  2. We can eagerly check if the child is still interested in receiving values.
  3. We overwrite the wip count with 1, indicating that we will deal with any 'extra work' from now on; this usually reduces the number of times the outer loop iterates over a state that can't really emit. Using an ordered write here saves some cycles because the subsequent atomic operations act as full barriers and we don't really care about the exact value beyond 2 anyway.
  4. We read the current requested amount since we can only emit that many values.
  5. In this inner loop, we try to emit as many values as possible, but we are limited by the requested amount and the number of available values in the queue.
  6. We could also use !queue.isEmpty() here, but poll() either returns a value or tells us the queue is empty in a single step (a benefit of a null-disallowing queue).
  7. Again, a quick eager check if the child subscriber is still interested.
  8. We aggregate the number of emitted values locally and subtract it from the requested amount in one step (instead of on every iteration of the inner loop).
  9. Finally, we decrement the wip counter and if it reached zero, the drain can safely quit.
Sidenote: there might be further optimization possibilities for the emission loop above, e.g., adding a fast path that skips the queue entirely, or having request() call drain() only on a 0 to n transition; I haven't explored the concurrency properties of these options yet.
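As a quick usage sketch (my own example, not from the original post), here is a child that requests in batches of 10; the QueuedProducer keeps the remaining values queued until the next request arrives:

Observable<Integer> source = Observable.create(child -> {
    QueuedProducer<Integer> qp = new QueuedProducer<>(child);
    child.setProducer(qp);
    for (int i = 0; i < 100; i++) {
        qp.offer(i);                       // values pile up until they are requested
    }
});

source.subscribe(new Subscriber<Integer>() {
    int received;
    @Override
    public void onStart() {
        request(10);                       // initial batch
    }
    @Override
    public void onNext(Integer v) {
        System.out.println(v);
        if (++received % 10 == 0) {
            request(10);                   // ask for the next batch
        }
    }
    @Override
    public void onError(Throwable e) {
        e.printStackTrace();
    }
    @Override
    public void onCompleted() {
        // never called: the value-only QueuedProducer has no terminal events
    }
});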

The full queued-producer

Now that the value-only queued producer has been explained, we'd like to extend it so that it can handle onError and onCompleted, plus allow onError to skip ahead of any values queued and make the producer terminate early.

To better match the names of the events in RxJava's Observers, the FullQueuedProducer will implement the Observer interface in addition to the Producer and still extend AtomicLong:


public final class FullQueuedProducer<T> 
extends AtomicLong implements Producer, Observer<T> {
    private static final long serialVersionUID = -1L;
    
    final Subscriber<? super T> child;
    final Queue<T> queue;
    final AtomicInteger wip;
    
    Throwable error;                                         // (1)
    volatile boolean done;                                   // (2)
    
    public FullQueuedProducer(Subscriber<? super T> child) {
        this.child = child;
        this.queue = new SpscLinkedQueue<>();
        this.wip = new AtomicInteger();
    }
    
    @Override
    public void request(long n) {
        if (n < 0) {
            throw new IllegalArgumentException();
        }
        if (n > 0) {
            BackpressureUtils.getAndAddRequest(this, n);
            drain();
        }
    }
    
    @Override
    public void onNext(T value) {                             // (3)
        queue.offer(Objects.requireNonNull(value));
        drain();
    }
    
    @Override
    public void onError(Throwable e) {                        // (4)
        error = e;
        done = true;
        drain();
    }
    
    @Override
    public void onCompleted() {                               // (5)
        done = true;
        drain();
    }
    
    private boolean checkTerminated(boolean isDone, 
            boolean isEmpty) {
        // implement
    }

    private void drain() {
        // different implementation
    }
}


The base class structure has some differences and we'll need a different drain loop:

  1. We store the Throwable instance in this field; any non-null value indicates an exceptional termination and a null indicates a normal termination once done is set.
  2. The flag indicates there won't be any further values or events coming from the 'upstream'. This also assumes the producer's onXXX methods are called in a sequential manner. Note also that done can't be a plain field here because the drain() method needs to eagerly check its value to allow an exception to skip the queue; the usual barriers provided by wip's atomic methods aren't in play at those points.
  3. The offer() method from QueuedProducer is just renamed here.
  4. In case of an exception, we store its reference, set the done flag to true and call drain.
  5. If the upstream terminates normally, we just set the done flag and call drain.
The new drain() method now has more things to check and act upon, but first, I'll implement a helper method that does the necessary checks to determine if the producer has to complete in some manner:



    // ...
    private boolean checkTerminated(
            boolean isDone, boolean isEmpty) {
        if (child.isUnsubscribed()) {                  // (1)
            return true;
        }
        if (isDone) {                                  // (2)
            Throwable e = error;                       
            if (e != null) {                           // (3)
                queue.clear();
                child.onError(e);
                return true;
            } else
            if (isEmpty) {                             // (4)
                child.onCompleted();
                return true;
            }
        }
        return false;                                  // (5)
    }
    // ...


  1. First, we check if the child has unsubscribed and return true, which will quit the drain loop.
  2. Second, we check if the isDone parameter is true and, if so, we need to determine what kind of termination happened. The reason for using a parameter instead of reading the done flag directly is that the check whether the queue is empty has to happen after the read of done; I'll explain this a bit later.
  3. Since I assumed the upstream calls are serialized, reading the error field plainly is safe here: the single assignment to this field can only happen before the write to the volatile done, and the read of error happens after the read of done, thus establishing a proper acquire-release pair over done. If the error is not null, it is emitted and the method returns true to indicate the drain loop can quit. The queue is also cleared here to avoid retaining the values 'jumped over' by the error for too long.
  4. If we established earlier (but only after the read of done) that the queue is empty, we emit onCompleted and indicate the drain loop can quit.
  5. In any other case, we indicate the drain loop can continue.
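To make the ordering argument in (3) more tangible, here is a minimal stand-alone sketch (my own illustration, with hypothetical names) of the write/read pairing over done that makes the plain error field safe to read:

// Illustration only: the same field pattern as in FullQueuedProducer, reduced
// to the two members involved in the visibility argument.
class TerminationVisibility {
    Throwable error;                 // plain field, written at most once
    volatile boolean done;           // volatile field, always written after 'error'

    // called by the (serialized) upstream
    void terminate(Throwable e) {
        error = e;                   // (a) plain write
        done = true;                 // (b) volatile write releases (a)
    }

    // called by the drain loop
    Throwable errorIfDone() {
        if (done) {                  // (c) volatile read acquires (b)
            return error;            // (d) plain read is guaranteed to observe (a)
        }
        return null;                 // not terminated yet
    }
}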
The last step for the new FullQueuedProducer is to implement the drain() method:



    // ...
    private void drain() {
        if (wip.getAndIncrement() == 0) {
            do {
                if (checkTerminated(done, queue.isEmpty())) {    // (1)
                    return;
                }
                
                wip.lazySet(1);
                
                long r = get();
                long e = 0;

                while (r != 0) {
                    boolean d = done;                            // (2)
                    T v = queue.poll();
                    if (checkTerminated(d, v == null)) {         // (3)
                        return;
                    } else
                    if (v == null) {                             // (4)
                        break;
                    }
                    
                    child.onNext(v);
                    r--;
                    e++;
                }
                
                if (e != 0) {
                    addAndGet(-e);
                }
            } while (wip.decrementAndGet() != 0);
        }
    }
}

At first glance, it pretty much looks like the previous QueuedProducer's implementation, but it has some important differences:

  1. Instead of just a plain isUnsubscribed() check, we need to see whether an error occurred or whether there won't be any more values. Note that the check for the queue being empty happens after the read of done.
  2. We read done again before checking the queue for a value (and checking its empty status as well).
  3. Now we call checkTerminated() eagerly with the locally read done flag d and with a null check on the polled value (a null return indicates an empty queue).
  4. Even if the terminal state hasn't been reached, the queue could still be empty, so we quit the inner loop.
Why is it so important to read done before checking the emptiness of the queue? The reason is that the drain loop may run concurrently with the onXXX methods (for example, when drain() is triggered from request() on another thread), and if the checks were reversed, a stall in drain() between the emptiness check and the done check would give a concurrent upstream a window to emit more values and complete. Once drain() is unblocked again, it would find the done flag set but still think the queue is empty and terminate, missing those events.
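To see the error 'jumping the queue' in action, here is a small usage sketch of my own (not from the original post); the child requests a single value up front, so value 2 is still queued when the error arrives and gets dropped:

Observable<Integer> source = Observable.create(child -> {
    FullQueuedProducer<Integer> qp = new FullQueuedProducer<>(child);
    child.setProducer(qp);
    qp.onNext(1);
    qp.onNext(2);
    qp.onError(new RuntimeException("failure"));
});

source.subscribe(new Subscriber<Integer>() {
    @Override
    public void onStart() {
        request(1);                            // ask for a single value only
    }
    @Override
    public void onNext(Integer v) {
        System.out.println("Value: " + v);     // prints "Value: 1"
    }
    @Override
    public void onError(Throwable e) {
        System.out.println("Error: " + e);     // the queued 2 is cleared and never delivered
    }
    @Override
    public void onCompleted() {
        System.out.println("Done");
    }
});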

Conclusion

Implementing a backpressure-respecting multi-value producer may seem complicated at first, but if you have read the earlier posts and are familiar with the basic Java concurrency primitives and some concepts of its memory model, writing one isn't that difficult after all, is it? I think it is important to understand the inner workings of this kind of producer because the most advanced operators in RxJava, such as publish(), are built around the same queue-drain logic and terminal-state management as this FullQueuedProducer.

In the next post, I'll return to the more basic RangeProducer and talk about how one can add a fast-path emission loop to it in order to avoid the state management if the downstream isn't actually exercising backpressure and simply requests Long.MAX_VALUE upfront.
