RxJava2 (Java 7) For Android: Introduction And Basic Usage

August 25, 2017

Why use RxJava

  • Wrap database operation as async without callback, such as SQL Brite.
  • Wrap network call as async without callback, such as Retrofit RxJava Adapter.
  • Simplify writing of AsyncTask (Android).
  • Elegant solution to subscribe stream of results, filter or map results, merge or chain multiple stream of results, etc.
  • Avoid nested callbacks.
  • Callback replacement (don’t have to declare interface).

RxJava2 setup for Android

Edit your application module’s build.gradle to add the following dependencies.

dependencies {
    ...
    // https://github.com/ReactiveX/RxAndroid/releases
    compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
    // https://github.com/ReactiveX/RxJava/releases
    compile 'io.reactivex.rxjava2:rxjava:2.1.2'
}

RxJava vs RxJava2

Learning RxJava2 could be confusing because RxJava 2.x is released on Oct 2016 while many online tutorials/questions are about 1.x. Part of the RxJava official documentation still referring to 1.x.

Some of the highlight of coding changes from 1.x to 2.x are:

  • Action1 is renamed to Consumer.
  • Action2 is renamed to BiConsumer.
  • Func1<T, Boolean> is renamed to Predicate<T>.
  • Observable.fromEmitter renamed to Observable.create.

Note: many advised against using Observable.create in 1.x as it is prone to mistakes, but it’s no longer the case for 2.x.

RxJava2 base class

The first thing is to select which base class to use.

  • Observable: return multiple results
  • Flowable: return multiple results, with backpressure support and overhead.
  • Single: return 1 result only
  • Completable: don’t return any result
  • Maybe: return 0 or 1 result

I would recommend use Single if you only plan to return 1 result, or use Observable if you want flexibility with multiple results.

Single.fromCallable(new Callable<String>() {
    @Override
    public String call() throws Exception {
        return "Hello";
    }
}).subscribe(new Consumer<String>() {
    @Override
    public void accept(String s) throws Exception {
        // output single result: Hello
        Log.d(TAG, "result="+s);
    }
});
Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Exception {
        emitter.onNext("Hello");
        emitter.onNext("Big");
        emitter.onNext("World");
        emitter.onComplete();
    }
}).subscribe(new Consumer<String>() {
    @Override
    public void accept(String s) throws Exception {
        // output 3 times
        Log.d(TAG, "result="+s);
    }
});

Returning results

There are a few ways for RxJava to return results.

  • just: return one or more object
  • range: return a range of number
  • fromIterable: return multiple results
  • fromCallable: a function returning single result
  • create: a function returning multiple result
  • others

See above for example of fromCallable and create.

just

Observable.just("Hello").subscribe(new Consumer<String>() {
    @Override
    public void accept(String s) throws Exception {
        // output single result: Hello
        Log.d(TAG, "result="+s);
    }
})

range

Observable.range(1, 10).subscribe(new Consumer<Integer>() {
    @Override
    public void accept(Integer integer) throws Exception {
        // output 10 times: 1-10
        Log.d(TAG, "result="+integer);
    }
});

fromIterable

List<String> words = new ArrayList<>();
words.add("Hello");
words.add("Big");
words.add("World");
Observable.fromIterable(words).subscribe(new Consumer<String>() {
    @Override
    public void accept(String s) throws Exception {
        // output 3 times
        Log.d(TAG, "result="+s);
    }
});

Operator to transform results

  • filter: emit only results passing the test
  • map: change each result to another
  • others

filter

Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Exception {
        emitter.onNext("Hello");
        emitter.onNext("Big");
        emitter.onNext("World");
        emitter.onComplete();
    }
}).filter(new Predicate<String>() {
    @Override
    public boolean test(@NonNull String s) throws Exception {
        // return only string with length > 3
        return s.length() > 3;
    }
}).subscribe(new Consumer<String>() {
    @Override
    public void accept(String s) throws Exception {
        // output 2 times: Hello, World
        Log.d(TAG, "result="+s);
    }
})

map

Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Exception {
        emitter.onNext("Hello");
        emitter.onNext("Big");
        emitter.onNext("World");
        emitter.onComplete();
    }
}).map(new Function<String, Integer>() {
    @Override
    public Integer apply(@NonNull String s) throws Exception {
        // map string to length
        return s.length();
    }
}).subscribe(new Consumer<Integer>() {
    @Override
    public void accept(Integer integer) throws Exception {
        // output 3 results: 5, 3, 5
        Log.d(TAG, "result="+integer);
    }
});

Subscribe

Observable is not executed until subscribe is called.

Consumer is convinient for simplicity, but Observer is recommened for better event notification and error handling.

// the code is not executed yet
Observable<String> wordObservable = Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Exception {
        emitter.onNext("Hello");
        emitter.onNext("Big");
        emitter.onNext("World");
        emitter.onComplete();
    }
});

// code execution begin
wordObservable.subscribe(new Observer<String>() {
    @Override
    public void onSubscribe(@NonNull Disposable d) {
        Log.d(TAG, "started");
    }

    @Override
    public void onNext(@NonNull String s) {
        Log.d(TAG, "result="+s);
    }

    @Override
    public void onError(@NonNull Throwable e) {
        e.printStackTrace();
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "finished");
    }
});

AsyncTask on Android

The following code can be used to simulate AsyncTask on Android.

final Activity activity = this;
Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Exception {
        // background thread: computation intensive or blocking operation
        emitter.onNext("Hello");
        emitter.onNext("Big");
        emitter.onNext("World");
        emitter.onComplete();
    }
}).subscribeOn(Schedulers.newThread())
    .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Consumer<String>() {
    @Override
    public void accept(String s) throws Exception {
        // UI output on main thread
        Toast.makeText(activity, s, Toast.LENGTH_LONG).show();
    }
});
This work is licensed under a
Creative Commons Attribution-NonCommercial 4.0 International License.