The Missing RxJava 2 Guide to Supercharge Your Android Development (Part 2)

Aritra Roy
Published in Aritra's Musings
10 min read · Oct 27, 2017

Today, let us continue with the second part of the series. If you have not yet read the first part, drop everything and read that first. Once you are clear with everything in the first part of the series, we are good to proceed.

In this article, we will continue with what Observers and Operators are and how we can use them to consume and manipulate items emitted from the source Observable. We will also discuss the immense multi-threading possibilities in RxJava 2 and try to build a good understanding of the Schedulers and the observeOn()/subscribeOn() operators.

So, without wasting any more time, let’s get started.

Observer

So now, as you have understood the concept of an Observable, let’s move ahead and understand what an Observer actually is.

Anything that is interested in an Observable and subscribes to it can be called an Observer. Whenever the source Observable emits some items, all the subscribed Observers are notified about it.

In our contrived conference example, all the attendees who were sitting there listening to the speaker are nothing but Observers. Whenever the speaker says something from the stage, all the attendees can listen and react to it.

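There can be a single Observer or multiple Observers subscribed to the source Observable, and all of them will get notified whenever the source Observable emits something. Sharing one stream of emissions among several Observers is not the default behavior, though (typically, each subscription makes the source start its work afresh for that Observer), but it is fairly simple to achieve with just a few lines of code.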
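
One way to let several Observers share a single run of the source, assuming that is the behavior referred to above, is the publish()/connect() pair. Here is a minimal sketch that runs on a plain JVM with RxJava 2.x on the classpath. The class name and the sentences are made up for illustration.

```java
import io.reactivex.Observable;
import io.reactivex.observables.ConnectableObservable;
import java.util.ArrayList;
import java.util.List;

class SharedSpeakerDemo {

    // Returns what each attendee heard, in arrival order.
    static List<String> run() {
        List<String> heard = new ArrayList<>();

        // publish() wraps the source in a ConnectableObservable, which
        // waits for connect() before it starts emitting anything.
        ConnectableObservable<String> speaker =
                Observable.just("Hello", "Welcome").publish();

        speaker.subscribe(sentence -> heard.add("attendee1:" + sentence));
        speaker.subscribe(sentence -> heard.add("attendee2:" + sentence));

        // Both Observers are in place, so start the single shared run now.
        speaker.connect();
        return heard;
    }

    public static void main(String[] args) {
        // Prints [attendee1:Hello, attendee2:Hello, attendee1:Welcome, attendee2:Welcome]
        System.out.println(run());
    }
}
```

Each sentence reaches both attendees before the next one is spoken, which shows that there is only one run of the source.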

There are primarily three types of events that your Observer can receive -

  • onNext(T) — It will be invoked when the source Observable emits a particular item (of type T). It will get called multiple times if the source Observable has got multiple items to emit.
  • onComplete() — It will be invoked when the source Observable completes emitting all of its items. This is a terminal event and there can be no more items emitted after this point. This simply indicates that the Observable has completed successfully.
  • onError(Throwable) — It will be invoked when the source Observable encounters an error while emitting items. This is also a terminal event and there can be no more emissions after this point as well. It indicates that the Observable has encountered a failure while executing and provides a Throwable which contains the details of the error.
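
To see these events in action, here is a small sketch that records everything an Observer receives. It assumes RxJava 2.x on the classpath; the class name and the "boom" error are made up. Note that the RxJava 2 Observer interface also declares onSubscribe(Disposable), which is simply ignored here.

```java
import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
import java.util.ArrayList;
import java.util.List;

class ObserverEventsDemo {

    // Subscribes a recording Observer and returns the events it received.
    static List<String> record(Observable<String> source) {
        List<String> events = new ArrayList<>();
        source.subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                // RxJava 2 hands over the Disposable here; unused in this sketch.
            }

            @Override
            public void onNext(String item) {
                events.add("onNext:" + item);
            }

            @Override
            public void onError(Throwable e) {
                events.add("onError:" + e.getMessage());
            }

            @Override
            public void onComplete() {
                events.add("onComplete");
            }
        });
        return events;
    }

    public static void main(String[] args) {
        // Successful stream: prints [onNext:A, onNext:B, onComplete]
        System.out.println(record(Observable.just("A", "B")));

        // Failing stream: prints [onNext:A, onError:boom]
        Observable<String> failing = Observable.concat(
                Observable.just("A"),
                Observable.<String>error(new RuntimeException("boom")));
        System.out.println(record(failing));
    }
}
```

In both runs exactly one terminal event arrives, and nothing follows it.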

Get, Set, Code

Now let us have a look at some real code,

DisposableObserver<Movie> disposable = movieObservable
        .subscribeWith(new DisposableObserver<Movie>() {
            @Override
            public void onNext(Movie movie) {
                // Access your Movie object here
            }
            @Override
            public void onError(Throwable e) {
                // Show the user that an error has occurred
            }
            @Override
            public void onComplete() {
                // Show the user that the operation is complete
            }
        });

This might seem like a lot at once, but it’s actually pretty simple. Let me explain.

All we are doing here is using the subscribeWith() method to subscribe to our movie observable. We are using a DisposableObserver which has three methods for the three different event types discussed earlier.

The onNext(Movie) method will be called every time a Movie object is fetched by the observable. It can be fetched from the network, database, files or from any other source (which doesn’t really matter). All that matters is that the Observer subscribed to it will always be notified of the events happening in the stream.

So, if you have 10 movies in your data store, then the onNext() method will be called 10 times with 10 Movie objects. After the operation succeeds, the onComplete() method will be called, but if it fails, the onError() method will be called with the appropriate error object.

Dispose, but what?

Everything about observables and observers should be crystal clear to you by now, except the DisposableObserver thing, isn’t it?

DisposableObserver is nothing but an Observer which can be disposed of, as it implements the Disposable interface. Whenever an Observer subscribes to an Observable, a connection is formed between them which needs to be cleared (or terminated) when it is no longer needed; otherwise, it can lead to resource leaks.

For example, a listener should not be listening to events eternally; it needs to stop sometime, right? It is similar to scenarios where you need to close a database cursor or a file input/output stream when you are done using them, otherwise, those unreleased resources can increase the memory footprint or cause leaks.

Your observers are nothing more than garbage after they have done their job and need to be disposed of.

@Override
public void onDestroy() {
    if (disposable != null && !disposable.isDisposed()) {
        disposable.dispose();
    }
    super.onDestroy();
}

You can also use a CompositeDisposable to add up all your disposables and get rid of all of them at once.
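
Here is a minimal sketch of that idea, assuming RxJava 2.x on the classpath. The class name and the do-nothing observer are made up for illustration, and Observable.never() is used only because it keeps the subscriptions alive until we dispose of them.

```java
import io.reactivex.Observable;
import io.reactivex.disposables.CompositeDisposable;
import io.reactivex.observers.DisposableObserver;

class CompositeDisposableDemo {

    // Builds an observer that ignores every event; enough for this sketch.
    static DisposableObserver<Long> quietObserver() {
        return new DisposableObserver<Long>() {
            @Override public void onNext(Long tick) { }
            @Override public void onError(Throwable e) { }
            @Override public void onComplete() { }
        };
    }

    // Returns {both active before clear(), both disposed after clear()}.
    static boolean[] run() {
        CompositeDisposable disposables = new CompositeDisposable();

        // never() neither emits nor terminates, so both subscriptions
        // stay active until something disposes of them.
        DisposableObserver<Long> first =
                Observable.<Long>never().subscribeWith(quietObserver());
        DisposableObserver<Long> second =
                Observable.<Long>never().subscribeWith(quietObserver());
        disposables.add(first);
        disposables.add(second);

        boolean activeBefore = !first.isDisposed() && !second.isDisposed();

        // clear() disposes of everything collected so far in one call.
        disposables.clear();

        boolean disposedAfter = first.isDisposed() && second.isDisposed();
        return new boolean[] {activeBefore, disposedAfter};
    }

    public static void main(String[] args) {
        boolean[] result = run();
        System.out.println(result[0] + " " + result[1]); // prints "true true"
    }
}
```

In an Activity or Fragment you would typically keep the CompositeDisposable as a field and call clear() (or dispose(), if you never plan to reuse the container) from onDestroy().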

Operator

The real power of RxJava comes with the huge arsenal of operators it brings to the table. The operators are basically pure functions which transform or modify the observable streams.

Going back to our conference example, suppose a new speaker comes up and starts speaking completely in German but all the attendees sitting there understand nothing other than English.

This is where a translator can come into play who can translate each and every sentence that the speaker speaks into something meaningful that all the attendees can understand.

Let’s talk about some RxJava operators now,

  • filter() — We can use this operator to refine the items emitted by the source Observable and create a new Observable containing only those items that match the required condition.

Suppose, our movie observable emits lots of movies having different ratings (from 1 to 5), but we want to show only those movies that have a 5-star rating,

movieObservable.filter(new Predicate<Movie>() {
    @Override
    public boolean test(@NonNull Movie movie) throws Exception {
        return movie.getRating() == 5;
    }
});

And the mighty lambdas can turn it into something like this,

movieObservable.filter(movie -> movie.getRating() == 5);

  • map() — We can use this operator to transform items emitted by the source Observable into something completely different and create a new Observable containing those modified items.

Suppose, we have a requirement that the synopsis of the movies to be shown cannot be more than 500 characters, then we can do something like this,

Observable<Movie> movieObservable = getMoviesFromDatabase();
movieObservable.map(new Function<Movie, Movie>() {
    @Override
    public Movie apply(@NonNull Movie movie) throws Exception {
        return new Movie(StringUtils.truncate(movie.getSynopsis(), 500));
    }
});

After using lambdas and static imports, things get better,

movieObservable.map(movie -> new Movie(truncate(movie.getSynopsis(), 500)));

Now we have created a completely new stream of movies where each of them contains a synopsis no longer than 500 characters.

  • skip() — We can use this operator to skip some items from the beginning of the source Observable and create a new Observable which doesn’t have these items.

Suppose, we know that our movie observable always emits a dummy Movie object (containing some metadata) in the beginning of the stream which should not be shown to the user,

movieObservable.skip(1);

  • concat() — We can use this operator to concatenate items from multiple Observables one after the other without interleaving them.

Suppose, we have two data sources to fetch movies from — database and network and we want to show the movies fetched from the database first followed by the ones fetched from the network.

Observable<Movie> database = getMoviesFromDatabase();
Observable<Movie> network = getMoviesFromNetwork();
Observable<Movie> resultObservable = Observable.concat(database, network);

We can now subscribe to this resultObservable having movies from the database being emitted before the movies from the network.
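
To convince yourself of the ordering, here is a runnable sketch assuming RxJava 2.x on the classpath. The two source methods are hypothetical stand-ins that emit made-up titles, and the database source is deliberately delayed to show that concat() still emits its items first.

```java
import io.reactivex.Observable;
import java.util.List;
import java.util.concurrent.TimeUnit;

class ConcatDemo {

    // Hypothetical stand-in for the database source; deliberately slow.
    static Observable<String> getMoviesFromDatabase() {
        return Observable.just("db-1", "db-2").delay(50, TimeUnit.MILLISECONDS);
    }

    // Hypothetical stand-in for the network source; emits immediately.
    static Observable<String> getMoviesFromNetwork() {
        return Observable.just("net-1", "net-2");
    }

    static List<String> run() {
        // concat() subscribes to the second source only after the first
        // one has completed, so the items cannot interleave.
        return Observable.concat(getMoviesFromDatabase(), getMoviesFromNetwork())
                .toList()
                .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [db-1, db-2, net-1, net-2]
    }
}
```

The toList() and blockingGet() calls are there only to collect the result in a console program; in an app you would subscribe as usual.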

As you may have already observed, the return type of these operator methods is also an Observable, which enables us to chain multiple operators one after the other and perform crazy things.

movieObservable
        .skip(1)
        .filter(movie -> movie.getRating() == 5)
        .map(movie -> truncate(movie.getSynopsis(), 500));

In this example, we are skipping the first item emitted in the stream, then filtering only those movies having a rating of 5 and then truncating the synopsis of each movie item to 500 characters. All of this in just 4 lines of code.
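
If you want to run this chain yourself, here is a self-contained sketch assuming RxJava 2.x on the classpath. The Movie class, the truncate() helper and the sample data are all hypothetical, and the limit is lowered to 10 characters so that the effect is visible.

```java
import io.reactivex.Observable;
import java.util.List;

class OperatorChainDemo {

    // Minimal hypothetical Movie model, just enough for this sketch.
    static final class Movie {
        private final String synopsis;
        private final int rating;

        Movie(String synopsis, int rating) {
            this.synopsis = synopsis;
            this.rating = rating;
        }

        String getSynopsis() { return synopsis; }
        int getRating() { return rating; }
    }

    // Hypothetical helper standing in for the truncate() used above.
    static String truncate(String text, int maxLength) {
        return text.length() <= maxLength ? text : text.substring(0, maxLength);
    }

    static List<String> run() {
        Observable<Movie> movieObservable = Observable.just(
                new Movie("metadata placeholder", 5), // dummy first item
                new Movie("A long synopsis of a five-star film", 5),
                new Movie("A three-star film", 3),
                new Movie("Short", 5));

        return movieObservable
                .skip(1)                                  // drop the dummy item
                .filter(movie -> movie.getRating() == 5)  // keep 5-star movies
                .map(movie -> truncate(movie.getSynopsis(), 10))
                .toList()
                .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [A long syn, Short]
    }
}
```

Notice that the dummy item also has a rating of 5, so it is skip(1) and not filter() that keeps it out of the result.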

There are hundreds of operators available in RxJava and it is absolutely impossible to explain each of them in this article. So let’s keep it for some other time.

But here is a complete list of all the operators available for you to use. So go ahead, and try them out.

Bonus Concepts

We are almost going towards the end of the article. But it would be incomplete (and quite unfair) if I don’t touch upon two really important concepts you should know about — observeOn/subscribeOn and Schedulers.

ObserveOn/SubscribeOn

These are two operators just like the other operators we had discussed before but with some special behavior. These operators can be used to control multi-threading in RxJava insanely easily.

subscribeOn()

You can use this operator to specify which thread the source Observable will emit its data on. Remember the word “source Observable” carefully as this operator works only on it.

Suppose your source Observable is supposed to fetch data from the network. It is then quite obvious that you will need to perform this operation on a background thread, as Android doesn’t allow you to perform network operations on the main thread.

This is where the subscribeOn() operator will come into play. Remember that you can use this operator only once. Even if you use it multiple times, only the one defined closest to the source will take effect.

movieObservable
        .subscribeOn(Schedulers.io())
        .subscribe(movie -> {
            // Use your movie item here
        }, throwable -> {
            // Handle the error here
        });

This will make the movie Observable operate on the I/O thread. Now let us have a look at another variant,

movieObservable
        .subscribeOn(Schedulers.computation())
        .subscribeOn(Schedulers.io())
        .subscribe(movie -> {
            // Use your movie item here
        }, throwable -> {
            // Handle error here
        });

You might be thinking that the second subscribeOn() (with Schedulers.io()) will take its effect. But no, only the subscribeOn() defined closest to the source will play its magic and nothing else.
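
You can verify this on a plain JVM with a sketch like the one below, assuming RxJava 2.x on the classpath. The class name is made up, and the thread-name prefix it relies on is an implementation detail of RxJava 2.x rather than a documented contract.

```java
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

class SubscribeOnDemo {

    // Returns the name of the thread the source actually ran on.
    static String sourceThreadName() {
        return Observable.fromCallable(() -> Thread.currentThread().getName())
                .subscribeOn(Schedulers.computation()) // closest to the source
                .subscribeOn(Schedulers.io())          // does not change where the source runs
                .blockingFirst();
    }

    public static void main(String[] args) {
        // In RxJava 2.x this prints a name starting with "RxComputationThreadPool".
        System.out.println(sourceThreadName());
    }
}
```

Swap the two subscribeOn() lines and the printed name should come from the I/O scheduler instead.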

observeOn()

You can use this operator to specify which thread the Observer will observe the emissions on.

Suppose, your source Observable emits items on the I/O thread but in Android, in order to show something in the UI you need to consume data on Android’s main thread only. So this is where the mighty observeOn() comes into play.

movieObservable
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(movie -> {
            // Use your movie item here
        }, throwable -> {
            // Handle error here
        });

Now you will be able to access and show your movie items in the UI, as they are all observed on the UI thread. In the previous example, since observeOn() was not used, the items were both emitted and observed on the I/O thread.

Unlike subscribeOn(), we can use observeOn() multiple times for seamless thread switching.

movieObservable
        .subscribeOn(Schedulers.io())
        .observeOn(Schedulers.computation())
        .map(movie -> new Movie(truncate(movie.getSynopsis(), 500)))
        .observeOn(Schedulers.io())
        .filter(movie -> isInDatabase(movie))
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(movie -> {
            // Use your movie item here
        }, throwable -> {
            // Handle error here
        });

This is a contrived example to make you understand how you might use it in real scenarios. As you can see, we have used observeOn() three times. The source observable emits items on the I/O thread, then we switch to the computation thread to perform the truncate operation (let’s assume it demands complicated computation) specified just after that.

After that, we again switch to the I/O thread to perform the filtering operation where we only take those items which are already present in the database. Now, we switch threads for the last time to the main thread so that the sink observer specified just after that is able to observe and consume items on Android’s main thread.

So, in just 7 lines of code, we have done so much multi-threading which would have taken a lot of time and effort if we had to do it in the traditional way.
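
To watch the thread hops happen, here is a sketch that runs on a plain JVM, assuming RxJava 2.x on the classpath. Since there is no Android main thread outside a device, Schedulers.single() stands in for AndroidSchedulers.mainThread(), and a doOnNext() stands in for the Observer. The class and helper names are made up, and the thread-name prefixes are implementation details of RxJava 2.x.

```java
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

class ObserveOnDemo {

    // Tags a stage name with the thread it is currently running on.
    static String where(String stage) {
        return stage + ":" + Thread.currentThread().getName();
    }

    static List<String> run() {
        List<String> log = new CopyOnWriteArrayList<>();

        Observable.just("movie")
                .subscribeOn(Schedulers.io())
                .doOnNext(m -> log.add(where("source")))
                .observeOn(Schedulers.computation())
                .map(m -> { log.add(where("map")); return m; })
                .observeOn(Schedulers.io())
                .filter(m -> { log.add(where("filter")); return true; })
                // Stand-in for AndroidSchedulers.mainThread() on a plain JVM.
                .observeOn(Schedulers.single())
                .doOnNext(m -> log.add(where("consume")))
                // Block the caller until the stream terminates, so the
                // log is complete before it is returned.
                .blockingSubscribe();

        return log;
    }

    public static void main(String[] args) {
        // One line per stage, each tagged with the thread that ran it.
        run().forEach(System.out::println);
    }
}
```

Each observeOn() only affects the operators below it, which is why the four stages report four different hops.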

Schedulers

You have already seen some schedulers used in the previous examples and might have been wondering what these things actually are. Don’t worry, we will get that sorted as well.

In simple words, a Scheduler represents the thread (or pool of threads) on which operations are performed.

There are several Schedulers available in RxJava. Let’s have a quick look at some of them -

  • Schedulers.io() — Use this scheduler for operations to be performed on an I/O thread, like network calls, database transactions, file access, etc.
  • Schedulers.computation() — Use this scheduler for operations which are computationally heavy and need to be performed on a thread which is optimized for this purpose, like parsing a huge dataset of a few million phone numbers.
  • Schedulers.newThread() — Use this scheduler for operations to be performed on a completely new thread. You should probably restrict yourself from creating new threads all the time and use the other schedulers available so that the framework can work on improving the threading performance using techniques like thread pooling.
  • AndroidSchedulers.mainThread() — Use this scheduler for operations to be performed on Android’s main thread, like showing data on the UI and stuff like that. You need to use the RxAndroid library for it though.
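
A quick way to get a feel for the first three schedulers is to ask each one which thread it hands you. This is a small sketch assuming RxJava 2.x on the classpath. The class name is made up, the thread names are implementation details, and AndroidSchedulers is left out because it needs an Android runtime.

```java
import io.reactivex.Observable;
import io.reactivex.Scheduler;
import io.reactivex.schedulers.Schedulers;

class SchedulersDemo {

    // Runs a tiny task on the given scheduler and reports the thread used.
    static String threadNameOn(Scheduler scheduler) {
        return Observable.fromCallable(() -> Thread.currentThread().getName())
                .subscribeOn(scheduler)
                .blockingFirst();
    }

    public static void main(String[] args) {
        // In RxJava 2.x the names start with "RxCachedThreadScheduler",
        // "RxComputationThreadPool" and "RxNewThreadScheduler" respectively.
        System.out.println(threadNameOn(Schedulers.io()));
        System.out.println(threadNameOn(Schedulers.computation()));
        System.out.println(threadNameOn(Schedulers.newThread()));
    }
}
```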

I hope by now you will have a pretty good grasp of the concepts of reactive programming and how you can start using RxJava 2 to supercharge your Android development.

If there is something that is still unclear in your mind, consider giving this article (and the previous one) another read. The fundamental concepts revolving around Observable, Observer, Operator and Schedulers should be crystal clear in your mind to make proper use of RxJava in your app.

Also, try firing up your IDE and start playing around with the APIs. Only then can you have a solid grasp of this immensely powerful tool. Once you fall in love with it, there is no looking back.

This article was originally published on TechBeacon.

Design-focused Engineer | Android Developer | Open-Source Enthusiast | Part-time Blogger | Catch him at https://about.me/aritra.roy