RxJava1 -> RxJava2 Upgrade Notes/Tips

Ken Yee
Jan 1, 2018

This is a list of issues encountered/solved when upgrading a very large app (172 files that used RxJava) from RxJava 1.x to RxJava 2.x. The basic steps were:

  1. Migrate a portion of the app to RxJava2 using RxJavaInterop to find some of the patterns that would work; if one person is doing the upgrade, it helps for them to start with the part of the app they’re most familiar with
  2. Let the app run with RxJava2 for a sprint to see if there are other performance issues/bugs that weren’t expected
  3. Gradually migrate other portions of the app by functional area so testing could be concentrated on that area
  4. Migrate core RxJava services (OAuth Session refreshing, error handling, location services, etc.)
  5. Remove all RxJavaInterop/RxJava1 code
  6. Convert RxJava2 Flowable to Observable
  7. Celebrate because this was a lot more involved than a Retrofit2 upgrade ;-)

This took place over a month or two with typical PRs in the 60–70 file range because of all the interop code that was changed.

I made the decision to use RxJava2 Flowables initially because:
- it would act the same as RxJava1 Observables (which have backpressure)
- it’s a lot easier to see what portions of code have been migrated to RxJava2 already

Observable to Flowable/Single for REST APIs

If you use Retrofit2 like most projects, your network interfaces were probably written using Observables in RxJava 1.x, even though a Single was really the proper type for a REST call. Migration is still simplest if you swap Observables for Flowables, but you should review whether a Single or Completable would fit better. If you have a transformer that handles OAuth token renewals and only works on Observables/Flowables, you can’t use Single/Completable with it unless you call .toFlowable() on them.

You’ll also have to add the RxJava2 adapter to Retrofit2:

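Retrofit.Builder adapterBuilder = new Retrofit.Builder()
    .baseUrl(endpoint)
    // keep the RxJava1 adapter until the last RxJava1 interface is migrated
    .addCallAdapterFactory(RxJavaCallAdapterFactory.create())
    // handles interfaces that return RxJava2 types
    .addCallAdapterFactory(RxJava2CallAdapterFactory.create())
    .client(httpClient);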

CompositeSubscription vs. CompositeDisposable

RxJava 2.x renamed Subscription to Disposable because the Reactive Streams spec uses the name Subscription for something else. While you have both RxJava1 and RxJava2 in your code, you still do the usual adding of subscriptions to a composite and clearing it when your activity/fragment/view gets removed, but for RxJava2 streams you use CompositeDisposable instead of CompositeSubscription.
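Here is a minimal sketch of that lifecycle pattern, assuming RxJava 2.x is on the classpath. The Screen class and its onStart()/onStop() methods are hypothetical stand-ins for an activity, fragment, or view, and Flowable.never() stands in for a real stream.

```java
import io.reactivex.Flowable;
import io.reactivex.disposables.CompositeDisposable;

// Hypothetical stand-in for an activity/fragment/view with lifecycle callbacks.
class Screen {
    // RxJava 1 equivalent: rx.subscriptions.CompositeSubscription
    final CompositeDisposable disposables = new CompositeDisposable();

    void onStart() {
        // subscribe() returns a Disposable in RxJava 2 (a Subscription in RxJava 1).
        disposables.add(Flowable.never().subscribe());
    }

    void onStop() {
        // clear() disposes everything added so far but keeps the container reusable;
        // after dispose(), anything added later is disposed immediately.
        disposables.clear();
    }
}
```

Using clear() rather than dispose() matters if the same object can be started again after it was stopped.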

Observable.from() becomes Flowable.fromIterable()

If you’re creating an observable from a list or array, you need to use the specific fromArray(), fromIterable(), etc. methods instead of the generic from() method.

Flowable.blockingGet()

If you have code that is using RxJava as a replacement for the missing Java8/Kotlin Streams support, you’ll have to replace

.toList().toBlocking().first()

with

.toList().blockingGet()
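As a rough sketch of both renames together (assuming RxJava 2.x is on the classpath; the evens() helper is made up for illustration), a streams-style filter looks like this. Note that toList() returns a Single in RxJava 2, so blockingGet() is actually called on the Single.

```java
import io.reactivex.Flowable;

import java.util.Arrays;
import java.util.List;

class BlockingListExample {

    // Keeps the even numbers and returns them as a plain List.
    static List<Integer> evens(List<Integer> input) {
        return Flowable.fromIterable(input)   // RxJava 1: Observable.from(input)
                .filter(n -> n % 2 == 0)
                .toList()                     // Single<List<Integer>> in RxJava 2
                .blockingGet();               // RxJava 1: .toBlocking().first()
    }

    public static void main(String[] args) {
        System.out.println(evens(Arrays.asList(1, 2, 3, 4, 5, 6))); // prints [2, 4, 6]
    }
}
```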

.subscribe() becomes .subscribeWith() only if you need the third parameter or you use a Subscriber subclass

If you use the more typical .subscribe() with two parameters (onNext and onError lambdas), you can still use .subscribe(), which returns a Disposable.

You also can’t use “Actions.empty()” for the first parameter if you’re only handling errors; you have to use “unused -> {}” instead. It’s a good time to review whether what you’re subscribing to should really be a Completable.

Subscribers onComplete vs. onCompleted

If you define Subscriber classes for subscription handlers instead of using lambdas, the onCompleted() method has been renamed to onComplete(). Instead of extending Subscriber<T>, you now have to implement Subscriber<T> or CompletableObserver (in the case where you want to add a subscriber handler class to a Completable).

Note also that if you do .subscribe(<yourSubscriberClass>), it doesn’t return a Disposable; you have to use subscribeWith instead. Make sure yourSubscriberClass extends DisposableSubscriber<T> instead of implementing Subscriber<T>.

Also, if you override onStart() in DisposableSubscriber, be sure to call super.onStart() or weird things will happen (like your onNext never gets called), because the default onStart() is what requests items from upstream. That took a while to figure out because it was used so rarely :-(
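A small sketch of a Subscriber class that works with subscribeWith(), assuming RxJava 2.x is on the classpath. CollectingSubscriber and SubscribeWithExample are names invented for this example.

```java
import io.reactivex.Flowable;
import io.reactivex.disposables.Disposable;
import io.reactivex.subscribers.DisposableSubscriber;

import java.util.ArrayList;
import java.util.List;

class CollectingSubscriber extends DisposableSubscriber<String> {
    final List<String> received = new ArrayList<>();
    boolean completed;

    @Override
    protected void onStart() {
        // The default implementation requests Long.MAX_VALUE items.
        // Without this call nothing is requested and onNext() never fires.
        super.onStart();
    }

    @Override public void onNext(String value) { received.add(value); }
    @Override public void onError(Throwable t) { t.printStackTrace(); }
    @Override public void onComplete() { completed = true; }   // RxJava 1: onCompleted()
}

class SubscribeWithExample {
    static CollectingSubscriber run() {
        // subscribeWith() returns the subscriber it was given, which is also a Disposable.
        CollectingSubscriber subscriber =
                Flowable.just("a", "b").subscribeWith(new CollectingSubscriber());
        Disposable disposable = subscriber;
        disposable.dispose();
        return subscriber;
    }
}
```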

Observable<Void> should be Completable

If you have REST APIs that were of the Observable<Void> type, they should really be Completable. Note that if your .subscribe() used a lambda for onNext (1st parameter), that logic needs to move to the onComplete callback: the 3rd parameter for a Flowable, or the 1st parameter of Completable.subscribe(), which takes no argument.
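For illustration, here is a sketch of subscribing to a Completable, assuming RxJava 2.x is on the classpath. deleteItem() is a hypothetical stand-in for a REST call with no response body. Completable.subscribe() takes the completion callback first and the error callback second.

```java
import io.reactivex.Completable;
import io.reactivex.disposables.Disposable;

import java.util.concurrent.atomic.AtomicBoolean;

class CompletableExample {

    // Hypothetical stand-in for a REST call that returns no body.
    static Completable deleteItem() {
        return Completable.fromAction(() -> { /* perform the request here */ });
    }

    static boolean run() {
        AtomicBoolean done = new AtomicBoolean(false);
        Disposable d = deleteItem().subscribe(
                () -> done.set(true),                 // onComplete: takes no argument
                error -> error.printStackTrace());    // onError
        d.dispose();
        return done.get();
    }
}
```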

Crash handlers return Consumer<Throwable> instead of Action1<Throwable>

If you have a crash handler that you pass as the second parameter of a .subscribe() call, it now has to be a Consumer<Throwable> instead of an Action1<Throwable>.

Set a Global Error Handler

This is a significant difference from RxJava1. If your observable has a delayed error from a network call, it will crash without one set, even if the observable has been unsubscribed/disposed; in RxJava1, the error would have just been swallowed. See https://github.com/ReactiveX/RxJava/wiki/What%27s-different-in-2.0#error-handling

The last one listed is a best practice one and really should be done by default by RxJava2, IMHO.
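A sketch of such a handler, loosely following the approach described on the wiki page linked above and assuming RxJava 2.x is on the classpath. The GlobalErrorHandler class, its method names, and the choice of which exceptions to ignore are my own assumptions; adjust them to your app.

```java
import io.reactivex.exceptions.UndeliverableException;
import io.reactivex.plugins.RxJavaPlugins;

import java.io.IOException;

class GlobalErrorHandler {

    // Returns true when the error can be logged and dropped instead of crashing the app.
    static boolean isIgnorable(Throwable e) {
        if (e instanceof UndeliverableException && e.getCause() != null) {
            e = e.getCause();   // unwrap to the original failure
        }
        // Late network failures and interruptions caused by dispose() are expected.
        return e instanceof IOException || e instanceof InterruptedException;
    }

    // Call once at startup, e.g. from Application.onCreate().
    static void install() {
        RxJavaPlugins.setErrorHandler(e -> {
            if (isIgnorable(e)) {
                System.err.println("Undeliverable exception ignored: " + e);
                return;
            }
            // Anything else is likely a real bug: hand it to the uncaught exception handler.
            Thread t = Thread.currentThread();
            t.getUncaughtExceptionHandler().uncaughtException(t, e);
        });
    }
}
```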

Action0/Action1/Action2

These have been renamed Action, Consumer and BiConsumer, respectively. Their methods are also declared to throw Exception, so code that calls them directly has to handle checked exceptions; wrap the exception in an AssertionError or another unchecked exception of your choice and rethrow to simulate the old behavior.
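The wrap-and-rethrow idea can be sketched in plain Java. ThrowingConsumer below is a local stand-in with the same shape as io.reactivex.functions.Consumer, declared only so the example compiles without RxJava; callUnchecked() is a hypothetical helper name.

```java
class RethrowExample {

    // Local stand-in shaped like io.reactivex.functions.Consumer (accept() may throw).
    interface ThrowingConsumer<T> {
        void accept(T value) throws Exception;
    }

    // Calls the consumer the way RxJava 1 code called Action1.call():
    // no checked exception escapes to the caller.
    static <T> void callUnchecked(ThrowingConsumer<T> consumer, T value) {
        try {
            consumer.accept(value);
        } catch (RuntimeException e) {
            throw e;                          // already unchecked, pass through
        } catch (Exception e) {
            throw new AssertionError(e);      // or any unchecked exception of your choice
        }
    }
}
```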

You also can no longer subscribe using an Action1; implement FlowableOnSubscribe instead.

Func0/Func1

There is no RxJava2 version of Func0; use Callable instead. This is used to make Java look functional even though it’s not. Use Kotlin :-) Or you can copy the RxJava1 FuncN implementations and rename them in your project.

Func1 becomes Function. Func2 becomes BiFunction.
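A plain-Java sketch of both options for Func0. The local Func0 interface is a hypothetical project copy of the RxJava1 type, and invoke() stands in for any API that accepts a Callable.

```java
import java.util.concurrent.Callable;

class Func0Example {

    // Hypothetical project-local copy of RxJava 1's Func0:
    // a Callable whose call() declares no checked exception.
    interface Func0<R> extends Callable<R> {
        @Override
        R call();
    }

    // Stand-in for any API that accepts a Callable.
    static <R> R invoke(Callable<R> supplier) {
        try {
            return supplier.call();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    static String greeting() {
        Func0<String> legacy = () -> "hello";       // old-style call site keeps compiling
        Callable<String> modern = () -> "world";    // direct replacement
        return invoke(legacy) + " " + invoke(modern);
    }
}
```

Because the copied Func0 extends Callable, old call sites can be handed to Callable-based APIs unchanged while you migrate.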

Subjects now only take a single Generic Type parameter

private Subject<Something, Something> somethingSubject = PublishSubject.create();

becomes this after you switch to the io.reactivex.subjects package:

private Subject<Something> somethingSubject = PublishSubject.create();

When you do a share() from the subjects, they also return Observables instead of Flowables. RxBinding methods also do this.

BehaviorSubjects are Observables

If you’re doing the temporary Flowable renaming, you have to call .toFlowable() with a BackpressureStrategy on BehaviorSubjects to use them with your other Flowables.

The create(<defaultValue>) factory method has also been renamed createDefault().
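Putting the subject changes together in one sketch (assuming RxJava 2.x is on the classpath; the SubjectExample class is made up, and BackpressureStrategy.LATEST is just one reasonable choice of strategy):

```java
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.disposables.Disposable;
import io.reactivex.subjects.BehaviorSubject;
import io.reactivex.subjects.Subject;

import java.util.ArrayList;
import java.util.List;

class SubjectExample {

    static List<String> run() {
        List<String> seen = new ArrayList<>();

        // RxJava 1: Subject<String, String> subject = BehaviorSubject.create("initial");
        Subject<String> subject = BehaviorSubject.createDefault("initial");

        // Subjects are Observables in RxJava 2, so pick a backpressure strategy to get a Flowable.
        Flowable<String> flowable = subject.toFlowable(BackpressureStrategy.LATEST);

        Disposable d = flowable.subscribe(seen::add);
        subject.onNext("updated");
        d.dispose();
        return seen;
    }
}
```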

Transformers

You can’t do simple transformers like this now:

public static <T> Transformer<T, T> subscribeOnIOAndObserveOnMain() {
    return observable -> observable.subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread());
}

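because there’s no base Transformer interface. You have to create one for each of Single, Maybe, Completable, Flowable, and Observable, using SingleTransformer, MaybeTransformer, CompletableTransformer, FlowableTransformer, and ObservableTransformer respectively (https://github.com/ReactiveX/RxJava/issues/4796).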

If you replace Observables with Flowables, you can make new copies of the transformers with “2” appended to their names and change the return type appropriately for RxJava2 for interop during the migration.
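A sketch of what the per-type copies can look like, assuming RxJava 2.x is on the classpath. The class and method names are made up, and the schedulers are passed in as parameters (my assumption, not the original code) so the example does not depend on RxAndroid.

```java
import io.reactivex.FlowableTransformer;
import io.reactivex.ObservableTransformer;
import io.reactivex.Scheduler;
import io.reactivex.SingleTransformer;

class Transformers2 {

    // Flowable variant of subscribeOnIOAndObserveOnMain().
    static <T> FlowableTransformer<T, T> flowableSchedulers(Scheduler subscribeOn, Scheduler observeOn) {
        return upstream -> upstream.subscribeOn(subscribeOn).observeOn(observeOn);
    }

    // Same body, but the transformer type has to be repeated for every reactive base class.
    static <T> ObservableTransformer<T, T> observableSchedulers(Scheduler subscribeOn, Scheduler observeOn) {
        return upstream -> upstream.subscribeOn(subscribeOn).observeOn(observeOn);
    }

    static <T> SingleTransformer<T, T> singleSchedulers(Scheduler subscribeOn, Scheduler observeOn) {
        return upstream -> upstream.subscribeOn(subscribeOn).observeOn(observeOn);
    }
}
```

On Android you would typically pass Schedulers.io() and AndroidSchedulers.mainThread(), then apply the result with .compose(...).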

RxBinding2 returns Observables instead of Flowable

That’s one case where it wasn’t an obvious 1:1 mapping of V1 Observable to V2 Flowable. It makes sense because these are UI events so they don’t really have an understanding of backpressure. Usually, adding .toFlowable(BackpressureStrategy.LATEST) is enough if you want to map V1 Observables to Flowables.

Nulls Not Allowed

This can be the most painful part of the RxJava2 migration. In RxJava1, it was OK to pass null through a stream. In RxJava2, you cannot. You can use Optional if you have a minSDK of 24, but you’ll most likely need to add your own wrapper class like this:

public class NullableWrapper<T> {

    private final T optional;

    public NullableWrapper(@Nullable T optional) {
        this.optional = optional;
    }

    public boolean isEmpty() {
        return this.optional == null;
    }

    public T get() {
        if (optional == null) {
            throw new NoSuchElementException("No value present");
        }
        return optional;
    }
}
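To show how the wrapper gets used, here is a plain-Java sketch. It contains a condensed copy of the class above so that it compiles on its own; wrap() and describe() are hypothetical helpers.

```java
import java.util.NoSuchElementException;

class NullableWrapperExample {

    // Condensed copy of the wrapper above so this sketch compiles on its own.
    static final class NullableWrapper<T> {
        private final T value;
        NullableWrapper(T value) { this.value = value; }
        boolean isEmpty() { return value == null; }
        T get() {
            if (value == null) throw new NoSuchElementException("No value present");
            return value;
        }
    }

    // Wrap a possibly-null value before it enters a stream, e.g. inside a .map() or fromCallable().
    static <T> NullableWrapper<T> wrap(T maybeNull) {
        return new NullableWrapper<>(maybeNull);
    }

    // Downstream code checks isEmpty() instead of comparing against null.
    static String describe(NullableWrapper<String> wrapper) {
        return wrapper.isEmpty() ? "<none>" : wrapper.get();
    }
}
```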

What was a bit of a surprise is that Retrofit2 can also return nulls from REST API calls. In particular, what bit us is our server backend returned HTTP 204 for some responses and that returned a null object instead of a new blank object. To work around this, you can immediately .map() in RxJava2 or you can modify your REST interface definition to return Single<Response<yourResponseClass>> instead; the latter is what we ended up doing because you also get a copy of the initial request in Retrofit’s Response object.

TestSubscriber

If you’re using TestSubscriber in your unit tests, a few methods have been renamed:

.assertCompleted() becomes .assertComplete()
.assertReceivedOnNext() becomes .assertValueSequence()
.getOnNextEvents() becomes .values()
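A short sketch using the RxJava2 names, assuming RxJava 2.x is on the classpath (TestSubscriberExample is a made-up class; in a real project these calls would sit inside a unit test):

```java
import io.reactivex.Flowable;
import io.reactivex.subscribers.TestSubscriber;

import java.util.Arrays;
import java.util.List;

class TestSubscriberExample {

    static List<Integer> run() {
        // test() subscribes a new TestSubscriber and returns it.
        TestSubscriber<Integer> ts = Flowable.just(1, 2, 3).test();

        ts.assertComplete();                             // RxJava 1: assertCompleted()
        ts.assertNoErrors();
        ts.assertValueSequence(Arrays.asList(1, 2, 3));  // RxJava 1: assertReceivedOnNext(...)
        return ts.values();                              // RxJava 1: getOnNextEvents()
    }
}
```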

Exclude rxjava.properties in your build.gradle

Both RxJava 1.x and RxJava 2.x include this file, so packaging hits a duplicate while both are dependencies. It is not used at runtime, so you can exclude it along with the other duplicated META-INF files in your app’s build.gradle:

packagingOptions {
    exclude 'META-INF/rxjava.properties'
}

RxJavaV2 Observable is not a Publisher

Because mobile apps generally do not need Backpressure support, switching Flowables back to Observables is more efficient.

A Flowable is a Publisher but an Observable is not. Transformers whose signatures are written against Publisher will have to use Observable (or ObservableSource) instead.

If you’re using .subscribeWith with your Flowable, you had to use a DisposableSubscriber; this has to be a DisposableObserver when you switch to Observables.
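The Observable counterpart can be sketched like this, assuming RxJava 2.x is on the classpath. The class in question is io.reactivex.observers.DisposableObserver; ObserverExample is a made-up name.

```java
import io.reactivex.Observable;
import io.reactivex.observers.DisposableObserver;

import java.util.ArrayList;
import java.util.List;

class ObserverExample {

    static List<Integer> run() {
        List<Integer> received = new ArrayList<>();

        // Flowable version: Flowable.just(...).subscribeWith(new DisposableSubscriber<Integer>() {...})
        DisposableObserver<Integer> observer = Observable.just(1, 2, 3)
                .subscribeWith(new DisposableObserver<Integer>() {
                    @Override public void onNext(Integer value) { received.add(value); }
                    @Override public void onError(Throwable e) { e.printStackTrace(); }
                    @Override public void onComplete() { }
                });

        observer.dispose();
        return received;
    }
}
```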

Other Considerations

Does your app do a lot of I/O? RxJava1 and RxJava2 have separate thread pools. Using both in your app means you’ll have double the pools which may cause memory/CPU issues. It wasn’t an issue w/ our app, but if you’re memory constrained, it might affect yours.

