Why use RxJava
- Wrap database operation as async without callback, such as SQL Brite.
- Wrap network call as async without callback, such as Retrofit RxJava Adapter.
- Simplify writing of AsyncTask (Android).
- Elegant solution to subscribe stream of results, filter or map results, merge or chain multiple stream of results, etc.
- Avoid nested callbacks.
- Callback replacement (don't have to declare interface).
RxJava2 setup for Android
Edit your application module’s build.gradle to add the following dependencies.
dependencies {
    ...
    // https://github.com/ReactiveX/RxAndroid/releases
    compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
    // https://github.com/ReactiveX/RxJava/releases
    compile 'io.reactivex.rxjava2:rxjava:2.1.2'
}RxJava vs RxJava2
Learning RxJava2 could be confusing because RxJava 2.x is released on Oct 2016 while many online tutorials/questions are about 1.x. Part of the RxJava official documentation still referring to 1.x.
Some of the highlight of coding changes from 1.x to 2.x are:
- Action1is renamed to- Consumer.
- Action2is renamed to- BiConsumer.
- Func1<T, Boolean>is renamed to- Predicate<T>.
- Observable.fromEmitterrenamed to- Observable.create.
Note: many advised against using Observable.create in 1.x as it is prone to mistakes, but it's no longer the case for 2.x.
RxJava2 base class
The first thing is to select which base class to use.
- Observable: return multiple results
- Flowable: return multiple results, with backpressure support and overhead.
- Single: return 1 result only
- Completable: don't return any result
- Maybe: return 0 or 1 result
I would recommend use Single if you only plan to return 1 result, or use Observable if you want flexibility with multiple results.
Single.fromCallable(new Callable<String>() {    @Override    public String call() throws Exception {        return "Hello";    }}).subscribe(new Consumer<String>() {    @Override    public void accept(String s) throws Exception {        // output single result: Hello        Log.d(TAG, "result="+s);    }});Observable.create(new ObservableOnSubscribe<String>() {    @Override    public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Exception {        emitter.onNext("Hello");        emitter.onNext("Big");        emitter.onNext("World");        emitter.onComplete();    }}).subscribe(new Consumer<String>() {    @Override    public void accept(String s) throws Exception {        // output 3 times        Log.d(TAG, "result="+s);    }});Returning results
There are a few ways for RxJava to return results.
- just: return one or more object
- range: return a range of number
- fromIterable: return multiple results
- fromCallable: a function returning single result
- create: a function returning multiple result
- others
See above for example of fromCallable and create.
just
Observable.just("Hello").subscribe(new Consumer<String>() {    @Override    public void accept(String s) throws Exception {        // output single result: Hello        Log.d(TAG, "result="+s);    }})range
Observable.range(1, 10).subscribe(new Consumer<Integer>() {    @Override    public void accept(Integer integer) throws Exception {        // output 10 times: 1-10        Log.d(TAG, "result="+integer);    }});fromIterable
List<String> words = new ArrayList<>();words.add("Hello");words.add("Big");words.add("World");Observable.fromIterable(words).subscribe(new Consumer<String>() {    @Override    public void accept(String s) throws Exception {        // output 3 times        Log.d(TAG, "result="+s);    }});Operator to transform results
- filter: emit only results passing the test
- map: change each result to another
- others
filter
Observable.create(new ObservableOnSubscribe<String>() {    @Override    public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Exception {        emitter.onNext("Hello");        emitter.onNext("Big");        emitter.onNext("World");        emitter.onComplete();    }}).filter(new Predicate<String>() {    @Override    public boolean test(@NonNull String s) throws Exception {        // return only string with length > 3        return s.length() > 3;    }}).subscribe(new Consumer<String>() {    @Override    public void accept(String s) throws Exception {        // output 2 times: Hello, World        Log.d(TAG, "result="+s);    }})map
Observable.create(new ObservableOnSubscribe<String>() {    @Override    public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Exception {        emitter.onNext("Hello");        emitter.onNext("Big");        emitter.onNext("World");        emitter.onComplete();    }}).map(new Function<String, Integer>() {    @Override    public Integer apply(@NonNull String s) throws Exception {        // map string to length        return s.length();    }}).subscribe(new Consumer<Integer>() {    @Override    public void accept(Integer integer) throws Exception {        // output 3 results: 5, 3, 5        Log.d(TAG, "result="+integer);    }});Subscribe
Observable is not executed until subscribe is called.
Consumer is convinient for simplicity, but Observer is recommened for better event notification and error handling.
// the code is not executed yetObservable<String> wordObservable = Observable.create(new ObservableOnSubscribe<String>() {    @Override    public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Exception {        emitter.onNext("Hello");        emitter.onNext("Big");        emitter.onNext("World");        emitter.onComplete();    }});// code execution beginwordObservable.subscribe(new Observer<String>() {    @Override    public void onSubscribe(@NonNull Disposable d) {        Log.d(TAG, "started");    }    @Override    public void onNext(@NonNull String s) {        Log.d(TAG, "result="+s);    }    @Override    public void onError(@NonNull Throwable e) {        e.printStackTrace();    }    @Override    public void onComplete() {        Log.d(TAG, "finished");    }});AsyncTask on Android
The following code can be used to simulate AsyncTask on Android.
final Activity activity = this;Observable.create(new ObservableOnSubscribe<String>() {    @Override    public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Exception {        // background thread: computation intensive or blocking operation        emitter.onNext("Hello");        emitter.onNext("Big");        emitter.onNext("World");        emitter.onComplete();    }}).subscribeOn(Schedulers.newThread())    .observeOn(AndroidSchedulers.mainThread())        .subscribe(new Consumer<String>() {    @Override    public void accept(String s) throws Exception {        // UI output on main thread        Toast.makeText(activity, s, Toast.LENGTH_LONG).show();    }});