Why use RxJava
- Wrap database operations as async calls without callbacks, as SQLBrite does.
- Wrap network calls as async calls without callbacks, as the Retrofit RxJava adapter does.
- Simplify code that would otherwise use AsyncTask (Android).
- Subscribe to a stream of results, filter or map the results, and merge or chain multiple streams.
- Avoid nested callbacks.
- Replace callbacks without having to declare an interface.
RxJava2 setup for Android
Edit your application module’s build.gradle to add the following dependencies.
    dependencies {
        ...
        // https://github.com/ReactiveX/RxAndroid/releases
        compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
        // https://github.com/ReactiveX/RxJava/releases
        compile 'io.reactivex.rxjava2:rxjava:2.1.2'
    }
RxJava vs RxJava2
Learning RxJava2 can be confusing because RxJava 2.x was released in October 2016, while many online tutorials and questions are still about 1.x. Parts of the official RxJava documentation still refer to 1.x as well.
Some of the notable coding changes from 1.x to 2.x are:
- Action1 is renamed to Consumer.
- Action2 is renamed to BiConsumer.
- Func1<T, Boolean> is renamed to Predicate<T>.
- Observable.fromEmitter is renamed to Observable.create.
Note: many advised against using Observable.create in 1.x because it was prone to mistakes, but this is no longer the case in 2.x.
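As a rough illustration of the renamed interfaces, the sketch below uses the 2.x types Predicate, Consumer and BiConsumer in plain Java. The class name RenamedInterfacesSketch and the methods longWords and joined are made up for this example, and the choice of collect to show BiConsumer is just one convenient place where that type appears.

```java
import io.reactivex.Observable;
import io.reactivex.functions.BiConsumer;
import io.reactivex.functions.Consumer;
import io.reactivex.functions.Predicate;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

// Hypothetical class name, used only for this sketch.
public class RenamedInterfacesSketch {

    // Collects the words longer than three characters.
    static List<String> longWords() {
        final List<String> results = new ArrayList<>();

        // Func1<String, Boolean> in 1.x corresponds to Predicate<String> in 2.x.
        Predicate<String> longerThanThree = new Predicate<String>() {
            @Override
            public boolean test(String s) {
                return s.length() > 3;
            }
        };

        // Action1<String> in 1.x corresponds to Consumer<String> in 2.x.
        Consumer<String> collect = new Consumer<String>() {
            @Override
            public void accept(String s) {
                results.add(s);
            }
        };

        // No scheduler is involved, so this runs synchronously.
        Observable.just("Hello", "Big", "World")
                .filter(longerThanThree)
                .subscribe(collect);
        return results;
    }

    // Concatenates all words into one string.
    static String joined() {
        // Action2<StringBuilder, String> in 1.x corresponds to BiConsumer in 2.x.
        BiConsumer<StringBuilder, String> append = new BiConsumer<StringBuilder, String>() {
            @Override
            public void accept(StringBuilder sb, String s) {
                sb.append(s);
            }
        };

        return Observable.just("Hello", "Big", "World")
                .collect(new Callable<StringBuilder>() {
                    @Override
                    public StringBuilder call() {
                        return new StringBuilder();
                    }
                }, append)
                .blockingGet()
                .toString();
    }
}
```

Both methods block until the result is available, which keeps the sketch easy to run outside Android.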
RxJava2 base class
The first thing is to select which base class to use.
- Observable: returns multiple results
- Flowable: returns multiple results, with backpressure support and its overhead
- Single: returns exactly 1 result
- Completable: doesn't return any result
- Maybe: returns 0 or 1 result
I would recommend using Single if you only plan to return 1 result, or Observable if you want the flexibility of multiple results.
    Single.fromCallable(new Callable<String>() {
        @Override
        public String call() throws Exception {
            return "Hello";
        }
    }).subscribe(new Consumer<String>() {
        @Override
        public void accept(String s) throws Exception {
            // output single result: Hello
            Log.d(TAG, "result=" + s);
        }
    });

    Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Exception {
            emitter.onNext("Hello");
            emitter.onNext("Big");
            emitter.onNext("World");
            emitter.onComplete();
        }
    }).subscribe(new Consumer<String>() {
        @Override
        public void accept(String s) throws Exception {
            // output 3 times
            Log.d(TAG, "result=" + s);
        }
    });
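The two remaining base classes, Completable and Maybe, can be sketched in the same spirit. The class name CompletableMaybeSketch, the helper methods and the "(empty)" default value are assumptions made for this example. It uses blocking calls instead of Log.d so that it runs as plain Java.

```java
import io.reactivex.Completable;
import io.reactivex.Maybe;
import io.reactivex.functions.Action;

import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical class name, used only for this sketch.
public class CompletableMaybeSketch {

    // Completable: runs the work and signals only completion or error, never a value.
    static boolean runCompletable() {
        final AtomicBoolean ran = new AtomicBoolean(false);
        Completable.fromAction(new Action() {
            @Override
            public void run() {
                ran.set(true);
            }
        }).blockingAwait(); // waits until the Completable has finished
        return ran.get();
    }

    // Maybe: emits the value if the callable returns non-null,
    // otherwise completes without a result.
    static String valueOrDefault(final String value) {
        return Maybe.fromCallable(new Callable<String>() {
            @Override
            public String call() {
                return value; // null is treated as "no result"
            }
        }).blockingGet("(empty)"); // the default is returned when the Maybe is empty
    }
}
```

Calling valueOrDefault("Hello") yields the value itself, while valueOrDefault(null) falls back to the default, which covers the "0 or 1 result" cases.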
Returning results
There are a few ways for RxJava to return results.
- just: returns one or more objects
- range: returns a range of numbers
- fromIterable: returns multiple results from an Iterable
- fromCallable: a function returning a single result
- create: a function returning multiple results
- others
See above for examples of fromCallable and create.
just
    Observable.just("Hello").subscribe(new Consumer<String>() {
        @Override
        public void accept(String s) throws Exception {
            // output single result: Hello
            Log.d(TAG, "result=" + s);
        }
    });
range
    Observable.range(1, 10).subscribe(new Consumer<Integer>() {
        @Override
        public void accept(Integer integer) throws Exception {
            // output 10 times: 1-10
            Log.d(TAG, "result=" + integer);
        }
    });
fromIterable
    List<String> words = new ArrayList<>();
    words.add("Hello");
    words.add("Big");
    words.add("World");
    Observable.fromIterable(words).subscribe(new Consumer<String>() {
        @Override
        public void accept(String s) throws Exception {
            // output 3 times
            Log.d(TAG, "result=" + s);
        }
    });
Operator to transform results
- filter: emits only the results that pass a test
- map: changes each result into another
- others
filter
    Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Exception {
            emitter.onNext("Hello");
            emitter.onNext("Big");
            emitter.onNext("World");
            emitter.onComplete();
        }
    }).filter(new Predicate<String>() {
        @Override
        public boolean test(@NonNull String s) throws Exception {
            // return only string with length > 3
            return s.length() > 3;
        }
    }).subscribe(new Consumer<String>() {
        @Override
        public void accept(String s) throws Exception {
            // output 2 times: Hello, World
            Log.d(TAG, "result=" + s);
        }
    });
map
    Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Exception {
            emitter.onNext("Hello");
            emitter.onNext("Big");
            emitter.onNext("World");
            emitter.onComplete();
        }
    }).map(new Function<String, Integer>() {
        @Override
        public Integer apply(@NonNull String s) throws Exception {
            // map string to length
            return s.length();
        }
    }).subscribe(new Consumer<Integer>() {
        @Override
        public void accept(Integer integer) throws Exception {
            // output 3 results: 5, 3, 5
            Log.d(TAG, "result=" + integer);
        }
    });
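Operators can also be chained. The following is a minimal sketch that applies filter and then map to the same three words and gathers the output with toList. The class name ChainedOperatorsSketch and the method lengthsOfLongWords are invented for this example.

```java
import io.reactivex.Observable;
import io.reactivex.functions.Function;
import io.reactivex.functions.Predicate;

import java.util.List;

// Hypothetical class name, used only for this sketch.
public class ChainedOperatorsSketch {

    // Keeps the words longer than three characters, then maps each to its length.
    static List<Integer> lengthsOfLongWords() {
        return Observable.just("Hello", "Big", "World")
                .filter(new Predicate<String>() {
                    @Override
                    public boolean test(String s) {
                        return s.length() > 3;
                    }
                })
                .map(new Function<String, Integer>() {
                    @Override
                    public Integer apply(String s) {
                        return s.length();
                    }
                })
                .toList()        // gathers all results into a Single<List<Integer>>
                .blockingGet();  // waits for the list; avoid blocking on the Android main thread
    }
}
```

Each operator returns a new Observable, so the order of the calls is the order in which results are processed: "Big" is dropped by filter before map ever sees it.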
Subscribe
An Observable is not executed until subscribe is called.
Consumer is convenient for simple cases, but Observer is recommended for better event notification and error handling.
    // the code is not executed yet
    Observable<String> wordObservable = Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Exception {
            emitter.onNext("Hello");
            emitter.onNext("Big");
            emitter.onNext("World");
            emitter.onComplete();
        }
    });

    // code execution begins
    wordObservable.subscribe(new Observer<String>() {
        @Override
        public void onSubscribe(@NonNull Disposable d) {
            Log.d(TAG, "started");
        }

        @Override
        public void onNext(@NonNull String s) {
            Log.d(TAG, "result=" + s);
        }

        @Override
        public void onError(@NonNull Throwable e) {
            e.printStackTrace();
        }

        @Override
        public void onComplete() {
            Log.d(TAG, "finished");
        }
    });
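The deferred execution can be checked with a counter. This is only a sketch: the class name LazySubscribeSketch and the method countRuns are made up, and the counter exists purely to make the behaviour visible. It also shows that an Observable built with create runs its body again for every subscriber.

```java
import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.functions.Consumer;

import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical class name, used only for this sketch.
public class LazySubscribeSketch {

    // Returns how often the source body ran: before subscribing,
    // after the first subscribe and after the second subscribe.
    static int[] countRuns() {
        final AtomicInteger runs = new AtomicInteger(0);

        Observable<String> words = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) {
                runs.incrementAndGet(); // counts each execution of the source
                emitter.onNext("Hello");
                emitter.onComplete();
            }
        });
        int beforeSubscribe = runs.get(); // nothing has run yet

        Consumer<String> ignore = new Consumer<String>() {
            @Override
            public void accept(String s) {
                // results are not needed for this check
            }
        };

        words.subscribe(ignore);
        int afterFirst = runs.get();

        words.subscribe(ignore);
        int afterSecond = runs.get();

        return new int[] {beforeSubscribe, afterFirst, afterSecond};
    }
}
```

The returned counts are 0, 1 and 2: creating the Observable does no work, and each subscribe call triggers one run.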
AsyncTask on Android
The following code can be used to simulate AsyncTask on Android.
    final Activity activity = this;
    Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Exception {
            // background thread: computation intensive or blocking operation
            emitter.onNext("Hello");
            emitter.onNext("Big");
            emitter.onNext("World");
            emitter.onComplete();
        }
    }).subscribeOn(Schedulers.newThread())
      .observeOn(AndroidSchedulers.mainThread())
      .subscribe(new Consumer<String>() {
          @Override
          public void accept(String s) throws Exception {
              // UI output on main thread
              Toast.makeText(activity, s, Toast.LENGTH_LONG).show();
          }
      });
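The effect of subscribeOn can also be observed without an Android device. The sketch below is plain Java, so it leaves out AndroidSchedulers.mainThread() and blocks for the result instead. The class name BackgroundThreadSketch and the method workerThreadName are assumptions made for this example.

```java
import io.reactivex.Single;
import io.reactivex.schedulers.Schedulers;

import java.util.concurrent.Callable;

// Hypothetical class name, used only for this sketch.
public class BackgroundThreadSketch {

    // Returns the name of the thread that executed the callable.
    static String workerThreadName() {
        return Single.fromCallable(new Callable<String>() {
            @Override
            public String call() {
                // runs on the thread chosen by subscribeOn, not on the caller's thread
                return Thread.currentThread().getName();
            }
        }).subscribeOn(Schedulers.newThread())
          .blockingGet(); // blocks the caller until the background work is done
    }
}
```

The returned name differs from the caller's thread name, which confirms that the work was moved to a background thread. In an Android app, observeOn(AndroidSchedulers.mainThread()) would replace the blocking call so that the UI thread is never blocked.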