
RxJava for easy concurrency and backpressure

RxJava is growing in popularity in Android application development, but it is just as capable on the server side. It makes concurrency easier, even though a seasoned Java developer will have to re-learn a few concepts to become comfortable with the new idioms.

In this post, I want to share how to use RxJava to process data concurrently and enable backpressure.

The use case

One of our backends is dedicated to data processing. The task is simple yet specific: the application should poll the storage and efficiently schedule the processing of the retrieved data. This sounds very much like a producer-consumer pattern and should be simple to implement.

The specific and important part to note is that polling for the data takes much less time than processing it. Hence, we need something between these two stages that can handle the backpressure, in case the data processing stage starts to bottleneck.

So the task involves periodic scheduling, data flow organization, concurrency, and backpressure. It would seem like a good candidate for RxJava.

The building blocks

Observable

Everything starts with an observable. In our case, the observable is the source of the data that we want to poll periodically. The Observable.create() method is a nice tool that can achieve this for us.

Observable.<Data>create(observer -> {
  Data data = datasource.get();   // poll the storage for the next piece of data
  if (data != null) {
    observer.onNext(data);        // pass the retrieved data downstream
  }
  observer.onCompleted();         // finish this polling cycle
});

By chaining doOnNext() and subscribe() calls onto the created observable, we get a fully operational flow for our program:

Observable.<Data>create(...)
  .doOnNext(this::process)
  .subscribe();

The created observable polls the data source and passes the data on via the onNext(data) call, which results in the process(data) invocation. The implementation details of the processing logic are not important at this stage, but let’s assume that the processing takes a few seconds.
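The actual body is irrelevant here, but as a purely illustrative stand-in, process() could look something like this (the sleep just simulates a few seconds of work):

private void process(Data data) {
  // Hypothetical stand-in for the real processing logic.
  try {
    TimeUnit.SECONDS.sleep(3); // simulate a few seconds of work on the data
  } catch (InterruptedException e) {
    Thread.currentThread().interrupt();
  }
}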

The next thing to do is reschedule the execution once the operation completes.

Loop this!

Luckily, the Observable class includes a few methods for organizing this kind of repetitive execution. In our case, we want to execute the same task again after a short delay, so the Observable.repeatWhen() method looks like our best option.

Observable.<Data>create(...)
  .repeatWhen(observable -> observable.delay(500, TimeUnit.MILLISECONDS))
  .doOnNext(this::process)
  .subscribe();

After the observer.onCompleted() call (that is, once the data processing has finished), the execution is rescheduled with a 500-millisecond delay. So simple!

Concurrency & Backpressure

Until now everything has been quite simple, and there isn’t much gain in using RxJava for the task just yet. The benefit so far is that we didn’t have to write much infrastructure code, but we could have implemented all of this without RxJava with only a little overhead.

Let’s introduce a new requirement: the data processing should happen concurrently to make better use of the machine’s resources. In other words, we want the process(data) calls inside doOnNext() to run on multiple threads.

In RxJava, there’s a beautiful hack that can be applied via a flatMap method call. The overloaded version of this method takes a maxConcurrent parameter which defines the maximum number of Observables that may be concurrently subscribed to.

Observable.<Data>create(...)
  .repeatWhen(observable -> observable.delay(500, TimeUnit.MILLISECONDS))
  .flatMap(data -> Observable.just(data)
               .subscribeOn(Schedulers.io())
               .doOnNext(this::process)
           , MAX_CONCURRENT)
  .subscribe();

The Schedulers.io() call schedules the work on a shared thread pool. That pool itself is unbounded, but this is not a problem here: the MAX_CONCURRENT argument we pass to the overloaded flatMap method caps how many inner Observables, and therefore how many concurrent process(data) calls, can be active at once.
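For completeness, MAX_CONCURRENT is just an int constant; its value isn’t specified here, but one plausible choice is to tie it to the number of available cores:

// Upper bound on concurrently processed items; sizing it to the CPU count is
// just one sensible default, not a requirement.
private static final int MAX_CONCURRENT = Runtime.getRuntime().availableProcessors();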

I also mentioned that processing of the data might take longer than polling. Hence backpressure support is another requirement.

RxJava provides backpressure support out of the box. However, the built-in strategies rely on either buffering or dropping events. Losing events would be totally fine if we were handling mouse events, for instance, but in our case losing data snapshots is not an option.
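For reference, these built-in strategies surface as operators such as onBackpressureBuffer() and onBackpressureDrop(). A dropping variant, sketched here purely for contrast, is exactly what we want to avoid:

// Built-in strategy: drop whatever the bounded downstream buffer cannot absorb.
// Acceptable for mouse events, but not for our data snapshots.
Observable.<Data>create(...)
    .onBackpressureDrop()
    .observeOn(Schedulers.computation()) // bounded buffer; overflow triggers the drop
    .doOnNext(this::process)
    .subscribe();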

The beautiful part here is that the flatMap implementation is already backpressure-aware. All we need to do is provide an OnSubscribe implementation that honors backpressure. Luckily, RxJava ships with the SyncOnSubscribe class, so we don’t even have to implement it ourselves.

Observable.create(SyncOnSubscribe.<Data>createStateless(...))
  .repeatWhen(observable -> observable.delay(500, TimeUnit.MILLISECONDS))
  .flatMap(data -> Observable.just(data)
               .subscribeOn(Schedulers.io())
               .doOnNext(this::process)
           , MAX_CONCURRENT)
  .subscribe();
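To make the elided createStateless(...) part concrete, here is one way it might be filled in, reusing the datasource.get() polling from the first snippet. The callback is invoked once per requested item and may emit at most one value per call:

// One possible way to fill in the elided generator: stateless, invoked per
// downstream request, emitting at most one value each time.
SyncOnSubscribe<Void, Data> polling = SyncOnSubscribe.<Data>createStateless(observer -> {
  Data data = datasource.get();   // poll the storage, as in the first snippet
  if (data != null) {
    observer.onNext(data);        // hand one snapshot downstream
  }
  observer.onCompleted();         // end this cycle; repeatWhen() reschedules the poll
});

Passing this polling instance to Observable.create() yields the same pipeline as above, except that the source now only emits when the downstream actually requests data.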

Voila! Instead of juggling threads ourselves, we now have our infrastructure code nicely decoupled from our business logic. Concurrency, backpressure, and scheduling are all handled by just a few lines of code. It may take a while to comprehend this code when you read it for the first time, but the solution looked insanely beautiful once I finally got my head around it!

Beyond the conclusion

RxJava looks like a nice framework to get acquainted with, and I hope this post will motivate you to do so. There’s also a nice book on the subject. You’re welcome!

