RxJava: A short Introduction
Posted on May 1, 2017 • 4 minutes • 667 words
Table of contents
This post is under construction
In the last weeks i made a few experiments with RxJava in Android apps. In this post i want to give a short introduction to the main concepts of RxJava.
What is RxJava?
RxJava stands for: Reactive Extensions for Java. It’s a library for composing asynchronous and event-based programs by using observable sequences. There are also equivalent libraries for other programming languages like Swift,JS or Kotlin
Why Reactive?
Users want their data now.
They want their movies now.
They want their music now.
They want their messages now.
What they don’t want is: to wait. And developers want to deliver the best user experience. This means you have to program your app in an asynchronous way. In a modern app you have to deal with a lot of events (Requesting database, requesting network, button clicks, textbox changes,..). If you need to start multiple async events, which are depending on the results of other ones, you often use callbacks within callbacks within callbacks. RxJava provides a more readable way to solve that problem.
Getting Started
Observables and Observers
Instead of callbacks RxJava is using the two main types Observable and Observer. Observables emits data and Observer can subscribe to the data from Observables. One Observable can have multiple subscribers.
How to create an Observable
Observable<String> helloObs = Observable.just("Hello", "World");
Here is one example on how to create an Observable that will emit the strings Hello and World.
This observable will only emit data when there is a Observer added to it.
Subscribe to an Observable
helloObs.subscribe(new Observer<String>() {
@Override public void onSubscribe(Disposable d) {
}
@Override public void onNext(String s) {
Log.i(TAG, "onNext: "+s);
}
@Override public void onError(Throwable e) {
Log.i(TAG, "onError: "+e.getMessage());
}
@Override public void onComplete() {
}
});
or with lamda expression
helloObs.subscribe(s -> {
Log.i(TAG, "onNext: "+s);
},error -> {
Log.i(TAG, "onError: "+error.getMessage());
});
To add a subscriber you have to use the subscribe method of an Observable.
-
onSubscribe
-
onError: If an error occurs with emitting items, this method will get called. By default the Observable will not emit any more items after this error.
-
onNext: This method will receive the emitted strings from helloObs.
-
onComplete: This method gets called when the last data from an Observable is emitted.
Operators
http://reactivex.io/documentation/operators.html
Just emitting and receiving data wouldn’t be that special. RxJava comes with a huge list of operators. These operators helps you to create, transform, filter or combine Observables.
Filter Observables
Observable.just("Hello", "World")
.filter(s -> s.startsWith("H"))
.subscribe(s -> Log.i(TAG, "onCreate: "+s));
Let’s say you want an Observable that only emits strings that start with “H”.
The filter operator can do that job. To use it you have to add it to the Observable before you subscribe to it.
Combine Observables
The zip operator combines two Observables
Observable<String> helloObs = Observable.just("Hello","YOU", "World");
Observable<String> helloObs2 = Observable.just("1", "2");
Observable<String> zipObs = Observable.zip(helloObs, helloObs2, (s, s2) -> s + s2);
zipObs.subscribe(s -> Log.i(TAG, "onCreate: "+s));
One operator to combine Observables is the zip operator.
The Observable zipObs combines the Observables helloObs and helloObs2.
zipObs will emit the strings “Hello1” and “YOU2”. This is because the zip operator will only combine the observables when both of them have emitted data. In this case it will not emit the word “World” because in the helloObs2 Observable is no third string to combine it with.
Shedulers
RxJava is synchronous by default, but work can be defined asynchronously using schedulers. For instance, we can define that the network call should be done on a background thread, but the callback should be done on the main UI thread.
Observable.from(Arrays.asList(1,2,3,4,5))
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread()))
.subscribe(s -> Log.d("emit", s));
Helpful Links
- http://reactivex.io/intro.html
- Episodes with RxJava from Fragmented Podcast
- http://rxmarbles.com/
- https://github.com/ReactiveX
- Whats different in RxJava 2.0
Youtube
- Christina Lee: Intro to RxJava
- Droidcon SF - Common RxJava Mistakes by Dan Lew
- Reactive Extensions: Beyond the Basics
Do you know any other solutions? Do you have any questions? Please write a comment or contact me on Twitter jklingenberg_