RxJava2 (Java 7) For Android: Introduction And Basic Usage

Aug 25, 2017

Why use RxJava

  • Wrap database operations as asynchronous calls without callbacks, as SQLBrite does.
  • Wrap network calls as asynchronous calls without callbacks, as the Retrofit RxJava adapter does.
  • Simplify background work that would otherwise be written with AsyncTask (Android).
  • Subscribe to a stream of results, filter or map the results, and merge or chain multiple streams in a readable way.
  • Avoid nested callbacks.
  • Replace callbacks without having to declare an interface.

RxJava2 setup for Android

Edit your application module’s build.gradle to add the following dependencies.

dependencies {
    ...
    // https://github.com/ReactiveX/RxAndroid/releases
    compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
    // https://github.com/ReactiveX/RxJava/releases
    compile 'io.reactivex.rxjava2:rxjava:2.1.2'
}

RxJava vs RxJava2

Learning RxJava2 can be confusing because RxJava 2.x was released in October 2016, while many online tutorials and questions are still about 1.x. Parts of the official RxJava documentation still refer to 1.x as well.

Some highlights of the API changes from 1.x to 2.x:

  • Action1 is renamed to Consumer.
  • Action2 is renamed to BiConsumer.
  • Func1<T, Boolean> is renamed to Predicate<T>.
  • Observable.fromEmitter is renamed to Observable.create.

Note: many advised against using Observable.create in 1.x because it was easy to misuse, but that is no longer the case in 2.x.
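
To see the renamed types side by side, here is a minimal sketch that runs on a plain JVM with RxJava 2.x on the classpath (no Android needed). The class name RenamedTypesDemo and the method longWords are made up for illustration; only the io.reactivex types come from the library.

```java
import io.reactivex.Observable;
import io.reactivex.functions.BiConsumer;
import io.reactivex.functions.Consumer;
import io.reactivex.functions.Predicate;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

// Hypothetical demo class; only the io.reactivex types belong to RxJava 2.x.
final class RenamedTypesDemo {

    static List<String> longWords() {
        final List<String> result = new ArrayList<String>();
        Observable.just("Hello", "Big", "World")
                // 1.x: Func1<String, Boolean>  ->  2.x: Predicate<String>
                .filter(new Predicate<String>() {
                    @Override
                    public boolean test(String s) throws Exception {
                        return s.length() > 3;
                    }
                })
                // 1.x: Action2<List<String>, String>  ->  2.x: BiConsumer<List<String>, String>
                .collect(new Callable<List<String>>() {
                    @Override
                    public List<String> call() throws Exception {
                        return new ArrayList<String>();
                    }
                }, new BiConsumer<List<String>, String>() {
                    @Override
                    public void accept(List<String> list, String s) throws Exception {
                        list.add(s);
                    }
                })
                // 1.x: Action1<List<String>>  ->  2.x: Consumer<List<String>>
                .subscribe(new Consumer<List<String>>() {
                    @Override
                    public void accept(List<String> list) throws Exception {
                        result.addAll(list);
                    }
                });
        return result;
    }

    public static void main(String[] args) {
        System.out.println(longWords()); // prints [Hello, World]
    }
}
```

No scheduler is involved, so the whole chain runs synchronously on the calling thread and the list is filled before longWords returns.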

RxJava2 base class

The first step is to choose which base class to use.

  • Observable: returns multiple results.
  • Flowable: returns multiple results, with backpressure support at the cost of some overhead.
  • Single: returns exactly 1 result.
  • Completable: returns no result, it only signals completion or an error.
  • Maybe: returns 0 or 1 result.

I would recommend using Single if you only plan to return 1 result, or Observable if you want the flexibility of multiple results.

Single.fromCallable(new Callable<String>() {
    @Override
    public String call() throws Exception {
        return "Hello";
    }
}).subscribe(new Consumer<String>() {
    @Override
    public void accept(String s) throws Exception {
        // output single result: Hello
        Log.d(TAG, "result="+s);
    }
});

Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Exception {
        emitter.onNext("Hello");
        emitter.onNext("Big");
        emitter.onNext("World");
        emitter.onComplete();
    }
}).subscribe(new Consumer<String>() {
    @Override
    public void accept(String s) throws Exception {
        // output 3 times
        Log.d(TAG, "result="+s);
    }
});
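
Completable and Maybe are not shown above, so here is a rough sketch of how they might be used. It assumes a plain JVM with RxJava 2.x on the classpath and collects messages in a list instead of calling Log.d. The class name BaseClassDemo is hypothetical.

```java
import io.reactivex.Completable;
import io.reactivex.Maybe;
import io.reactivex.functions.Action;
import io.reactivex.functions.Consumer;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

// Hypothetical demo class; messages go to a list so the result is easy to inspect.
final class BaseClassDemo {

    static List<String> run() {
        final List<String> log = new ArrayList<String>();

        // Completable: runs some work and only reports completion (or an error).
        Completable.fromAction(new Action() {
            @Override
            public void run() throws Exception {
                log.add("work done");
            }
        }).subscribe(new Action() {
            @Override
            public void run() throws Exception {
                log.add("completed");
            }
        });

        // Maybe: a null from the Callable is treated as "no result", not as an error.
        Maybe.fromCallable(new Callable<String>() {
            @Override
            public String call() throws Exception {
                return null;
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                log.add("value=" + s);
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(Throwable e) throws Exception {
                log.add("error");
            }
        }, new Action() {
            @Override
            public void run() throws Exception {
                log.add("empty");
            }
        });

        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [work done, completed, empty]
    }
}
```

Returning a non-null value from the Callable would route the result to the first Consumer instead of the final Action.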

Returning results

There are a few ways for RxJava to return results.

  • just: return one or more given objects
  • range: return a range of numbers
  • fromIterable: return multiple results from an Iterable such as a List
  • fromCallable: a function returning a single result
  • create: a function returning multiple results
  • others

See above for examples of fromCallable and create.

just

Observable.just("Hello").subscribe(new Consumer<String>() {
    @Override
    public void accept(String s) throws Exception {
        // output single result: Hello
        Log.d(TAG, "result="+s);
    }
});

range

Observable.range(1, 10).subscribe(new Consumer<Integer>() {
    @Override
    public void accept(Integer integer) throws Exception {
        // output 10 times: 1-10
        Log.d(TAG, "result="+integer);
    }
});

fromIterable

List<String> words = new ArrayList<>();
words.add("Hello");
words.add("Big");
words.add("World");

Observable.fromIterable(words).subscribe(new Consumer<String>() {
    @Override
    public void accept(String s) throws Exception {
        // output 3 times
        Log.d(TAG, "result="+s);
    }
});

Operators to transform results

  • filter: emit only results passing the test
  • map: change each result to another
  • others

filter

Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Exception {
        emitter.onNext("Hello");
        emitter.onNext("Big");
        emitter.onNext("World");
        emitter.onComplete();
    }
}).filter(new Predicate<String>() {
    @Override
    public boolean test(@NonNull String s) throws Exception {
        // return only string with length > 3
        return s.length() > 3;
    }
}).subscribe(new Consumer<String>() {
    @Override
    public void accept(String s) throws Exception {
        // output 2 times: Hello, World
        Log.d(TAG, "result="+s);
    }
});

map

Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Exception {
        emitter.onNext("Hello");
        emitter.onNext("Big");
        emitter.onNext("World");
        emitter.onComplete();
    }
}).map(new Function<String, Integer>() {
    @Override
    public Integer apply(@NonNull String s) throws Exception {
        // map string to length
        return s.length();
    }
}).subscribe(new Consumer<Integer>() {
    @Override
    public void accept(Integer integer) throws Exception {
        // output 3 results: 5, 3, 5
        Log.d(TAG, "result="+integer);
    }
});

Subscribe

An Observable is not executed until subscribe is called.

Consumer is convenient for simple cases, but Observer is recommended for better event notification and error handling.

// the code is not executed yet
Observable<String> wordObservable = Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Exception {
        emitter.onNext("Hello");
        emitter.onNext("Big");
        emitter.onNext("World");
        emitter.onComplete();
    }
});

// code execution begins
wordObservable.subscribe(new Observer<String>() {
    @Override
    public void onSubscribe(@NonNull Disposable d) {
        Log.d(TAG, "started");
    }

    @Override
    public void onNext(@NonNull String s) {
        Log.d(TAG, "result="+s);
    }

    @Override
    public void onError(@NonNull Throwable e) {
        e.printStackTrace();
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "finished");
    }
});
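
If a full Observer feels heavy, a middle ground is the two-argument subscribe, which takes one Consumer for results and one for errors. With the single-Consumer overload, an error has no handler and is forwarded to RxJava's global error hook as an OnErrorNotImplementedException. The sketch below assumes a plain JVM with RxJava 2.x on the classpath; the class name ErrorHandlingDemo and the "boom" exception are made up for illustration.

```java
import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.functions.Consumer;

import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class; messages go to a list instead of Log.d.
final class ErrorHandlingDemo {

    static List<String> run() {
        final List<String> log = new ArrayList<String>();

        Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                emitter.onNext("Hello");
                // Simulated failure: the stream ends here, onComplete is never sent.
                emitter.onError(new IllegalStateException("boom"));
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                log.add("result=" + s);
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(Throwable e) throws Exception {
                log.add("error=" + e.getMessage());
            }
        });

        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [result=Hello, error=boom]
    }
}
```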

AsyncTask on Android

The following code can be used to simulate AsyncTask on Android.

final Activity activity = this;

Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Exception {
        // background thread: computation intensive or blocking operation
        emitter.onNext("Hello");
        emitter.onNext("Big");
        emitter.onNext("World");
        emitter.onComplete();
    }
}).subscribeOn(Schedulers.newThread())
  .observeOn(AndroidSchedulers.mainThread())
  .subscribe(new Consumer<String>() {
    @Override
    public void accept(String s) throws Exception {
        // UI output on main thread
        Toast.makeText(activity, s, Toast.LENGTH_LONG).show();
    }
});
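
To check what subscribeOn does without an Android device, the following sketch runs on a plain JVM with RxJava 2.x on the classpath. AndroidSchedulers.mainThread() is not available there, so the sketch waits for the result with blockingGet instead. That is fine in a test or a command-line program, but should be avoided on the Android main thread. The class name SubscribeOnDemo and the method workerThreadName are hypothetical.

```java
import io.reactivex.Single;
import io.reactivex.schedulers.Schedulers;

import java.util.concurrent.Callable;

// Hypothetical demo class showing which thread runs the subscribed work.
final class SubscribeOnDemo {

    static String workerThreadName() {
        return Single.fromCallable(new Callable<String>() {
            @Override
            public String call() throws Exception {
                // Runs on the thread chosen by subscribeOn, not on the caller.
                return Thread.currentThread().getName();
            }
        })
        .subscribeOn(Schedulers.newThread())
        // Blocks the calling thread until the Single delivers its result.
        .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println("caller: " + Thread.currentThread().getName());
        System.out.println("worker: " + workerThreadName());
    }
}
```

The two printed names should differ, which shows that the Callable ran on a thread supplied by the scheduler rather than on the caller.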

✨ By Desmond Lua
