How parallel execution of blocking “side-effect only” (aka void) tasks became easier with the Completable abstraction introduced in RxJava 1.1.1.

As you may have noticed reading my blog, I primarily specialize in Software Craftsmanship and automatic code testing. In addition, I am an enthusiast of Continuous Delivery and broadly defined concurrency. The latter ranges from pure threads and semaphores in C to higher-level solutions such as ReactiveX and the actor model. This time, a use case for a very convenient (in specific cases) feature introduced in the brand new RxJava 1.1.1 – rx.Completable. Like many of my blog entries, this one reflects an actual situation I encountered while working on real tasks and use cases.

A task to do

Imagine a system with quite complex processing of asynchronous events coming from different sources: filtering, merging, transforming, grouping, enriching and more. RxJava fits very well here, especially if we want to be reactive. Let’s assume we have already implemented it (it looks and works nicely) and there is only one more thing left. Before we start processing, we are required to tell 3 external systems that we are ready to receive messages. That means 3 synchronous calls to legacy systems (via RMI, JMX or SOAP). Each of them can last for a number of seconds and we need to wait for all of them before we start. Luckily, they are already implemented and we treat them as black boxes which may succeed (or fail with an exception). We just need to call them (preferably concurrently) and wait for them to finish.
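To make the starting point concrete, here is a minimal sketch of the naive sequential version. The listings in this post only show that a job exposes a void execute() method, so the Job interface below is an assumed shape, and SequentialStart and announceReady are hypothetical names used for illustration only.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Assumed shape of a legacy call: blocks, returns nothing,
// may fail with an unchecked exception.
interface Job {
    void execute();
}

class SequentialStart {

    // Naive baseline: calls each job on the caller's thread, one after another.
    // The total wait is the sum of all call durations, and the first failure
    // stops the loop and propagates to the caller.
    static void announceReady(List<Job> jobs) {
        for (Job job : jobs) {
            job.execute();
        }
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        Job job1 = () -> log.add("system 1 notified");
        Job job2 = () -> log.add("system 2 notified");
        Job job3 = () -> log.add("system 3 notified");

        announceReady(Arrays.asList(job1, job2, job3));
        System.out.println(log);
    }
}
```

With three calls of a few seconds each, this version waits for the sum of their durations. Running the calls concurrently should bring the wait down to roughly the duration of the slowest call.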

rx.Observable – approach 1

With RxJava at our fingertips, Observable looks like the obvious approach. First, job execution can be wrapped in an Observable:

private Observable<Void> rxJobExecute(Job job) {
    return Observable.fromCallable(() -> { 
        job.execute();
        return null;
    });
}

Unfortunately (in our case) Observable expects some element(s) to be emitted. We need to use Void and an awkward return null (instead of just the method reference job::execute).

Next, we can use the subscribeOn() method to execute our job on another thread (and not block the main/current thread – we don’t want to execute our jobs sequentially). Schedulers.io() provides a scheduler with a set of threads intended for IO-bound work.

Observable<Void> run1 = rxJobExecute(job1).subscribeOn(Schedulers.io());
Observable<Void> run2 = rxJobExecute(job2).subscribeOn(Schedulers.io());

Finally, we need to wait for all of them to finish (all Observables to complete). To do that, the zip operator can be adapted. It combines items emitted by the zipped Observables by their sequence number. In our case we are interested only in the first pseudo-item from each job Observable (we emit only null to satisfy the API) and we wait for them in a blocking way. The zip function passed to the zip operator needs to return something, hence we need to repeat the workaround with null.

Observable.zip(run1, run2, (r1, r2) -> null)
         .toBlocking()
         .single();

It is pretty clear that Observable was designed to work with streams of values, and some additional work is required to adapt it to side-effect only (returning nothing) operations. The situation gets even worse when we need to combine (e.g. merge) our side-effect only operation with others returning some value(s) – an even uglier cast is required. See the real use case from the RxNetty API.

public void execute() {
    Observable<Void> run1 = rxJobExecute(job1).subscribeOn(Schedulers.io());
    Observable<Void> run2 = rxJobExecute(job2).subscribeOn(Schedulers.io());

    Observable.zip(run1, run2, (r1, r2) -> null)
        .toBlocking()
        .single();
}

private Observable<Void> rxJobExecute(Job job) {
    return Observable.fromCallable(() -> { 
        job.execute();
        return null;
    });
}

rx.Observable – approach 2

Another approach is possible. Instead of generating an artificial item, we can use an empty Observable with our task executed as an onCompleted action. This forces us to switch from the zip operation to merge (zip needs an item from every source and finishes as soon as one of them completes empty, so it cannot be used to wait for the others). As a result we need to provide an onNext action (which is never executed for an empty Observable), which only strengthens the conviction that we are trying to hack the system.

public void execute() {
    Observable<Object> run1 = rxJobExecute(job1).subscribeOn(Schedulers.io());
    Observable<Object> run2 = rxJobExecute(job2).subscribeOn(Schedulers.io());

    Observable.merge(run1, run2)
            .toBlocking()
            .subscribe(next -> {});
}

private Observable<Object> rxJobExecute(Job job) {
    return Observable.empty()
            .doOnCompleted(job::execute);
}

rx.Completable

Better support for Observables that don’t return any value was added in RxJava 1.1.1. Completable can be considered a stripped-down version of Observable which can either finish successfully (an onCompleted event is emitted) or fail (onError). The easiest way to create a Completable instance is the fromAction method, which takes an Action0 that doesn’t return any value (like Runnable).

Completable completable1 = Completable.fromAction(job1::execute)
        .subscribeOn(Schedulers.io());
Completable completable2 = Completable.fromAction(job2::execute)
        .subscribeOn(Schedulers.io());

Next, we can use the merge() method, which returns a Completable instance that subscribes to all source Completables at once and completes when all of them complete (or one of them fails). As we used the subscribeOn method with an external scheduler, all jobs are executed in parallel (in different threads).

Completable.merge(completable1, completable2)
        .await();

The await() method blocks until all jobs finish (in case of an error an exception is rethrown). Pure and simple.

public void execute() {
    Completable completable1 = Completable.fromAction(job1::execute)
            .subscribeOn(Schedulers.io());
    Completable completable2 = Completable.fromAction(job2::execute)
            .subscribeOn(Schedulers.io());

    Completable.merge(completable1, completable2)
        .await();
}

java.util.concurrent.CompletableFuture

Some of you could ask: why not just use CompletableFuture? It would be a good question. Whereas the plain Future introduced in Java 5 could require additional work on our side, ListenableFuture (from Guava) and CompletableFuture (from Java 8) make it quite trivial.

First, we need to run/schedule the execution of our jobs. Next, using the CompletableFuture.allOf() method we can create a new CompletableFuture which is completed the moment all jobs complete (haven’t we seen that concept before?). The get() method just blocks waiting for that.

public void execute() {
    try {
        CompletableFuture<Void> run1 = CompletableFuture.runAsync(job1::execute);
        CompletableFuture<Void> run2 = CompletableFuture.runAsync(job2::execute);

        CompletableFuture.allOf(run1, run2)
            .get();

    } catch (InterruptedException | ExecutionException e) {
        throw new RuntimeException("Jobs execution failed", e);
    }
}
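
A variation on the listing above, offered as a sketch rather than a recommendation. When no executor is given, runAsync(Runnable) runs the task on ForkJoinPool.commonPool(), which is sized for CPU-bound work and shared across the JVM. For blocking calls that last for seconds, an Executor can be passed as the second argument instead. The names DedicatedPoolExample and executeAll are hypothetical, and the jobs are modeled as plain Runnables.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class DedicatedPoolExample {

    // Runs every job on its own thread from a dedicated pool and blocks
    // until all of them finish. The pool is shut down afterwards.
    static void executeAll(Runnable... jobs) {
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, jobs.length));
        try {
            CompletableFuture<?>[] runs = new CompletableFuture<?>[jobs.length];
            for (int i = 0; i < jobs.length; i++) {
                // The two-argument overload runs the job on the given executor.
                runs[i] = CompletableFuture.runAsync(jobs[i], pool);
            }
            CompletableFuture.allOf(runs).get();
        } catch (InterruptedException e) {
            // Restore the interrupt flag before translating the exception.
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted while waiting for jobs", e);
        } catch (ExecutionException e) {
            // get() reports the job's own exception as the cause.
            throw new RuntimeException("Jobs execution failed", e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        executeAll(
                () -> System.out.println("job 1 on " + Thread.currentThread().getName()),
                () -> System.out.println("job 2 on " + Thread.currentThread().getName()));
    }
}
```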

We need to do something with checked exceptions (very often we don’t want to pollute our API with them), but in general it looks sensible. However, it is worth remembering that CompletableFuture falls short when more complex chain processing is required. In addition, with RxJava already used in our project, it is often useful to use the same (or similar) API instead of introducing something completely new.
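
One possible way to sidestep the checked exceptions, shown as a minimal sketch: CompletableFuture also offers join(), which waits like get() but reports failures with the unchecked CompletionException. JoinExample and executeAll are hypothetical names, the jobs are modeled as plain Runnables, and wrapping the failure in IllegalStateException is just one choice among many.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class JoinExample {

    // Waits for both jobs with join(), which declares no checked exceptions.
    static void executeAll(Runnable job1, Runnable job2) {
        CompletableFuture<Void> run1 = CompletableFuture.runAsync(job1);
        CompletableFuture<Void> run2 = CompletableFuture.runAsync(job2);
        try {
            CompletableFuture.allOf(run1, run2).join();
        } catch (CompletionException e) {
            // The exception thrown by the failing job is available as the cause.
            throw new IllegalStateException("Jobs execution failed", e.getCause());
        }
    }

    public static void main(String[] args) {
        executeAll(
                () -> System.out.println("job 1 done"),
                () -> System.out.println("job 2 done"));
        try {
            executeAll(
                    () -> { throw new IllegalArgumentException("legacy system unavailable"); },
                    () -> { });
        } catch (IllegalStateException e) {
            System.out.println("failed because of: " + e.getCause());
        }
    }
}
```

The try/catch is optional here. Without it the CompletionException simply propagates to the caller as an unchecked exception.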

Summary

Thanks to rx.Completable, executing side-effect only (returning nothing) tasks with RxJava is much more comfortable. In a codebase already using RxJava it could be preferred over CompletableFuture even for simple cases. However, Completable also provides many more advanced operators and techniques and can be easily mixed with Observable, which makes it even more powerful.

To read more about Completable you may want to see the release notes. For those who want a deeper insight into the topic there is a very detailed introduction to the Completable API on the Advanced RxJava blog (part 1 and 2).

The source code for code examples is available from GitHub.

Btw, if you are interested in RxJava in general, I can with a clear conscience recommend a book which is currently being written by Tomasz Nurkiewicz and Ben Christensen – Reactive Programming with RxJava.
