Jens Klingenberg

RxJava: A short Introduction

Posted on May 1, 2017  •  4 minutes  • 667 words
Table of contents

This post is under construction

In the last weeks i made a few experiments with RxJava in Android apps. In this post i want to give a short introduction to the main concepts of RxJava.

What is RxJava?

RxJava stands for: Reactive Extensions for Java. It’s a library for composing asynchronous and event-based programs by using observable sequences. There are also equivalent libraries for other programming languages like Swift,JS or Kotlin

Why Reactive?

Nobody likes to wait.

Users want their data now.

They want their movies now.

They want their music now.

They want their messages now.

What they don’t want is: to wait. And developers want to deliver the best user experience. This means you have to program your app in an asynchronous way. In a modern app you have to deal with a lot of events (Requesting database, requesting network, button clicks, textbox changes,..). If you need to start multiple async events, which are depending on the results of other ones, you often use callbacks within callbacks within callbacks. RxJava provides a more readable way to solve that problem.

Getting Started

Observables and Observers

Observable and Observer

Instead of callbacks RxJava is using the two main types Observable and Observer. Observables emits data and Observer can subscribe to the data from Observables. One Observable can have multiple subscribers.

How to create an Observable

Observable<String> helloObs = Observable.just("Hello", "World");

Here is one example on how to create an Observable that will emit the strings Hello and World.

This observable will only emit data when there is a Observer added to it.

Subscribe to an Observable

helloObs.subscribe(new Observer&lt;String&gt;() {
      @Override public void onSubscribe(Disposable d) {
        
      }

      @Override public void onNext(String s) {
         Log.i(TAG, &quot;onNext: &quot;+s);
      }

      @Override public void onError(Throwable e) {
         Log.i(TAG, &quot;onError: &quot;+e.getMessage());
      }

      @Override public void onComplete() {

      }
    });

or with lamda expression

helloObs.subscribe(s -> {
        Log.i(TAG, "onNext: "+s);
        },error -> {
        Log.i(TAG, "onError: "+error.getMessage());
        });

To add a subscriber you have to use the subscribe method of an Observable.

Operators

http://reactivex.io/documentation/operators.html

Just emitting and receiving data wouldn’t be that special. RxJava comes with a huge list of operators. These operators helps you to create, transform, filter or combine Observables.

Filter Observables

Observable.just("Hello", "World")
.filter(s -> s.startsWith("H"))
.subscribe(s -> Log.i(TAG, "onCreate: "+s));

Let’s say you want an Observable that only emits strings that start with “H”.

The filter operator can do that job. To use it you have to add it to the Observable before you subscribe to it.

Combine Observables

The zip operator combines two Observables

The zip operator combines two Observables

Observable<String> helloObs = Observable.just("Hello","YOU", "World");
Observable<String> helloObs2 = Observable.just("1", "2");
Observable<String> zipObs = Observable.zip(helloObs, helloObs2, (s, s2) -> s + s2);
zipObs.subscribe(s -> Log.i(TAG, "onCreate: "+s));

One operator to combine Observables is the zip operator.

The Observable zipObs combines the Observables helloObs and helloObs2.

zipObs will emit the strings “Hello1” and “YOU2”. This is because the zip operator will only combine the observables when both of them have emitted data. In this case it will not emit the word “World” because in the helloObs2 Observable is no third string to combine it with.

Shedulers

RxJava is synchronous by default, but work can be defined asynchronously using schedulers. For instance, we can define that the network call should be done on a background thread, but the callback should be done on the main UI thread.

Observable.from(Arrays.asList(1,2,3,4,5))
        .subscribeOn(Schedulers.newThread())
        .observeOn(AndroidSchedulers.mainThread()))
        .subscribe(s ->	Log.d("emit", s));

Helpful Links

Youtube

Do you know any other solutions? Do you have any questions? Please write a comment or contact me on Twitter jklingenberg_

Let's connect: