A Journey to a Better Stream

We’re very proud of our Rally Android app, which maintains a 4.6 rating across 21,000 reviews. One of the technologies we rely on to keep the app running smoothly (and delight users) is RxJava. We use RxJava everywhere, from operations on the UI layer to queuing and executing network requests. It is therefore in our interest to stay on the latest version.

Towards the end of 2016, RxJava 2 was released. RxJava 2 was rewritten from scratch to comply with the Reactive Streams specification, and the rewrite considerably improved memory consumption and performance. This alone was a good enough reason for us to migrate our app to RxJava 2. In addition, the authors of RxJava did not plan to support RxJava 1 beyond the near future. But perhaps the biggest reason for us to perform the migration was to avoid errors resulting from back pressure. Back pressure becomes an issue when an Observable emits items faster than a Consumer or Subscriber can handle them. In RxJava 2, Observables are not back pressure enabled and therefore never raise back pressure errors; streams that need back pressure use a separate type, Flowable.

The Two Strategies

Because RxJava 2 was a complete rewrite, we knew this migration would not be a simple swap of the RxJava libraries, but would entail a significant rewrite of our app code base. The names of a majority of the components within RxJava 2 changed, and so did the behavior and functionality of some of them. Whereas RxJava 1 had Action1 and Action2, the equivalent interfaces in RxJava 2 are Consumer and BiConsumer, which declare accept instead of call as the method to implement. There were some straightforward cases where we could use find/replace across the entire project to perform the upgrade. However, there were other cases where the implementation or the behavior of a component had changed; a simple find/replace would not work there, and we first needed to understand the change in behavior. One such case was Subscription. RxJava 2 replaced Subscription with Disposable (the handle used to stop observing a source) to be more in line with the Reactive Streams specification, although Subscription still exists in a different package and with a different role. Disposable is quite similar to the old Subscription, but there are a few minor differences in the way one gets a handle on it.

RxJava 2 resides under the package name io.reactivex, while RxJava 1 remains under rx. This was done so that both versions can coexist in the same project, which allows app developers to migrate code to RxJava 2 gradually while keeping some parts of the app on RxJava 1. However, we decided to switch over to RxJava 2 completely and leave no trace of RxJava 1 in our code base, making the migration fast and eliminating the need to include two similar libraries. Just as in poker, you can slow play or go all in. We chose to go all in.

The Purge of RxJava 1

While performing the migration, we learned some interesting things about RxJava 2 that forced us to change our code base to work better with it:

Null, Maybe, and Optional

RxJava 2 no longer supports null values. Passing null either throws a NullPointerException immediately or results in a call to onError, depending on where the null enters the stream. We had to look at all the places where we were passing null, or where a null might be produced as the result of some operation. Almost all of these situations fell into two broad cases.

In the first case, we were interested in knowing whether the returned value was null. Here we modified the Observable to emit objects of type Optional. Optional is a Java 8 feature, so in order to use it on Android we relied on an external library, retro-optional. We then check for the presence of a value in the onNext method using isPresent().

In the second case, we had a single operation (such as a network request) that would return either null or some response. RxJava 2 introduced a new reactive type called Maybe. Maybe is a combination of Single and Completable, i.e. it emits either zero items or one item. Consequently, its callback methods are onSuccess, onComplete and onError. If the network request returns a response, onSuccess is called; in the case of null, onComplete is called. These two approaches covered the majority of the null handling in our code base.
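The two cases can be modelled in plain Java. The sketch below deliberately avoids any RxJava dependency: it uses the JDK's own java.util.Optional in place of the backport library, and a hypothetical MaybeCallbacks interface that only mirrors the three callback names mentioned above. NullHandlingSketch, wrap, deliver and describe are illustrative names, not RxJava API, so treat this as a picture of the null-to-callback mapping rather than of how Maybe is implemented.

```java
import java.util.Optional;
import java.util.concurrent.Callable;

final class NullHandlingSketch {

    /** Hypothetical interface mirroring the three callback names of a Maybe consumer. */
    interface MaybeCallbacks<T> {
        void onSuccess(T value);
        void onComplete();
        void onError(Throwable error);
    }

    /** Case 1: wrap a possibly-null value so that null itself is never emitted. */
    static <T> Optional<T> wrap(T possiblyNull) {
        return Optional.ofNullable(possiblyNull);
    }

    /** Case 2: map a possibly-null result onto exactly one of the three callbacks. */
    static <T> void deliver(Callable<T> source, MaybeCallbacks<T> callbacks) {
        T result;
        try {
            result = source.call();
        } catch (Exception e) {
            callbacks.onError(e);      // the operation itself failed
            return;
        }
        if (result != null) {
            callbacks.onSuccess(result); // one item
        } else {
            callbacks.onComplete();      // zero items, but not an error
        }
    }

    /** Runs deliver() and reports which callback fired (for demonstration only). */
    static String describe(Callable<String> source) {
        StringBuilder outcome = new StringBuilder();
        deliver(source, new MaybeCallbacks<String>() {
            @Override public void onSuccess(String value) {
                outcome.append("onSuccess(").append(value).append(")");
            }
            @Override public void onComplete() {
                outcome.append("onComplete");
            }
            @Override public void onError(Throwable error) {
                outcome.append("onError(").append(error.getMessage()).append(")");
            }
        });
        return outcome.toString();
    }

    public static void main(String[] args) {
        System.out.println(wrap("token").isPresent());  // true
        System.out.println(wrap(null).isPresent());     // false
        System.out.println(describe(() -> "200 OK"));   // onSuccess(200 OK)
        System.out.println(describe(() -> null));       // onComplete
    }
}
```

The point of the second half is that "no value" becomes an ordinary completion signal rather than a null that has to be checked for, which is why the consumer needs no null checks of its own.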

Disposable Versus Subscription

RxJava 2 introduced a new base class called Flowable specifically to handle back pressure, so Observables in RxJava 2 are not back pressure enabled. If a developer uses Flowables, typically suggested for streams of more than 10,000 items, back pressure needs to be accounted for.

RxJava 2 also took the Subscription name for a different purpose: org.reactivestreams.Subscription specifies an interaction point between a source and a consumer. Consequently, a new interface was introduced, io.reactivex.disposables.Disposable, which is now responsible for stream and resource lifecycle management. Subscribing to an Observable can be done with a DisposableObserver, and unsubscribing happens through the dispose() method. The subscribe method that takes an Observer returns void, so in order to keep a handle on the DisposableObserver one uses the subscribeWith method, which returns the observer that was passed in; since DisposableObserver implements Disposable, dispose() can then be called on the returned object to stop observing the Observable. In some cases we were extending Observer directly to create a custom observer. The Observer interface in RxJava 2 comes with a handy method called onSubscribe, which receives the Disposable that should be used to stop observing the Observable the custom observer is subscribed to. Stopping observation therefore works differently in these two cases, whereas in RxJava 1 we were only concerned with the Subscription for managing the lifecycle of an observable.
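To make the Reactive Streams meaning of Subscription more concrete, here is a minimal sketch that uses java.util.concurrent.Flow, the JDK 9+ interfaces that mirror the Reactive Streams ones, instead of RxJava itself. The consumer pulls items with request(n) and walks away with cancel(), which plays a role comparable to dispose(). RangePublisher and BatchingSubscriber are hypothetical names, and the publisher is single-threaded and simplified; it is an illustration of the request/cancel contract, not a spec-complete implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

final class FlowSubscriptionSketch {

    /** Emits start, start+1, ... but never more items than the subscriber has requested. */
    static final class RangePublisher implements Flow.Publisher<Integer> {
        private final int start;
        private final int count;

        RangePublisher(int start, int count) {
            this.start = start;
            this.count = count;
        }

        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private int next = start;
                private long demand;       // items requested but not yet delivered
                private boolean emitting;  // guards against re-entrant request() calls
                private boolean done;

                @Override
                public void request(long n) {
                    if (done) return;
                    if (n <= 0) {
                        done = true;
                        subscriber.onError(new IllegalArgumentException("request must be positive"));
                        return;
                    }
                    demand = (demand + n < 0) ? Long.MAX_VALUE : demand + n; // cap on overflow
                    if (emitting) return; // called from inside onNext; the running loop picks it up
                    emitting = true;
                    while (!done && demand > 0 && next < start + count) {
                        demand--;
                        subscriber.onNext(next++);
                    }
                    emitting = false;
                    if (!done && next == start + count) {
                        done = true;
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() {
                    done = true; // the consumer walks away; nothing more is emitted
                }
            });
        }
    }

    /** Pulls items in batches and cancels once it has seen 'limit' items. */
    static final class BatchingSubscriber implements Flow.Subscriber<Integer> {
        final List<Integer> received = new ArrayList<>();
        boolean completed;
        Throwable error;
        private final int batchSize;
        private final int limit;
        private Flow.Subscription subscription;

        BatchingSubscriber(int batchSize, int limit) {
            this.batchSize = batchSize;
            this.limit = limit;
        }

        @Override public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            s.request(batchSize); // nothing flows until the consumer asks
        }

        @Override public void onNext(Integer item) {
            received.add(item);
            if (received.size() == limit) {
                subscription.cancel();
            } else if (received.size() % batchSize == 0) {
                subscription.request(batchSize);
            }
        }

        @Override public void onError(Throwable t) { error = t; }

        @Override public void onComplete() { completed = true; }
    }

    public static void main(String[] args) {
        BatchingSubscriber subscriber = new BatchingSubscriber(2, 5);
        new RangePublisher(1, 10).subscribe(subscriber);
        // prints: [1, 2, 3, 4, 5] completed=false
        System.out.println(subscriber.received + " completed=" + subscriber.completed);
    }
}
```

Because the source only emits what has been requested, the consumer controls the pace. That is the back pressure contract Flowable participates in and Observable opts out of.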

Error Handling

An important design requirement for RxJava 2 is that no Throwable errors should be swallowed. There are cases where an error can’t be emitted because the downstream’s lifecycle has already reached its terminal state, or because the downstream cancelled a sequence that was about to emit an error. These errors are routed to the RxJavaPlugins.onError handler, which by default prints the Throwable’s stack trace to the console and calls the current thread’s uncaught exception handler. Android terminates the application in these uncaught exception cases. To avoid such crashes with RxJava 2, a no-op handler should be installed via RxJavaPlugins.setErrorHandler(Consumer<Throwable>) in the main Application class of the Android application.
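The routing described above can be pictured with a small stand-alone model. This is not RxJava's code: UndeliverableErrorSketch, setErrorHandler and onUndeliverable are hypothetical names for a simplified global hook that behaves the way the paragraph describes, forwarding to the current thread's uncaught exception handler unless a handler has been installed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class UndeliverableErrorSketch {

    /** Hypothetical global hook; null means "no handler installed". */
    private static volatile Consumer<Throwable> errorHandler;

    static void setErrorHandler(Consumer<Throwable> handler) {
        errorHandler = handler;
    }

    /** Called when an error has nowhere left to go downstream. */
    static void onUndeliverable(Throwable error) {
        Consumer<Throwable> handler = errorHandler;
        if (handler != null) {
            handler.accept(error); // an installed handler takes over completely
            return;
        }
        // Default path as described above: print, then hand over to the thread's handler.
        error.printStackTrace();
        Thread current = Thread.currentThread();
        current.getUncaughtExceptionHandler().uncaughtException(current, error);
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        // Stand-in for the platform handler that would terminate an Android app.
        Thread.currentThread().setUncaughtExceptionHandler(
                (thread, error) -> log.add("uncaught: " + error.getMessage()));

        onUndeliverable(new IllegalStateException("late error 1"));

        setErrorHandler(error -> log.add("handled: " + error.getMessage()));
        onUndeliverable(new IllegalStateException("late error 2"));

        // prints: [uncaught: late error 1, handled: late error 2]
        System.out.println(log);
    }
}
```

The no-op handler mentioned above corresponds to installing a handler with an empty body. The sketch records the error instead so that the difference between the two paths is visible.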

Observables Free From Back Pressure

This was a change we enjoyed making. With RxJava 1, we had put in checks for back pressure errors on observables. We removed all of those checks, since back pressure no longer needs to be accounted for in Observables.

Minor Changes

There were some other minor changes that we encountered during the migration. Most of these were a result of aligning RxJava 2 with the Reactive Streams specification:

  • Predicate: Filtering emissions is still done with the filter method. In RxJava 2, however, filter takes a Predicate, whose test method decides whether an item passes, instead of the Func1 used in RxJava 1.
  • TestSubject: RxJava 2 no longer supports TestSubject. Instead, its functionality can be achieved via TestScheduler, PublishProcessor/PublishSubject and observeOn(testScheduler)/scheduler parameter.
  • SerializedSubject: The SerializedSubject is no longer a public class. To make a Subject safe to call from multiple threads, RxJava 2 provides the toSerialized() method, which returns a serialized version of the Subject.

Conclusion

Overall, we feel the migration to RxJava 2 has been beneficial for us. We no longer need to worry about back pressure errors while listening for emissions from Observables. Also, because RxJava 2 follows the standard Reactive Streams specification, we no longer have to learn RxJava-specific terminology that differs from the Reactive Streams implementations in other languages.