RxJava2 Exception Handling

Aug 25, 2017

Subscribe Consumer<Throwable>

Use the 2nd parameter of subsribe to specify an error handler with Consumer<Throwable>.

Note: If an error handler is not specified, an exception of io.reactivex.exceptions.OnErrorNotImplementedException will be thrown.

Single.fromCallable(new Callable<String>() {    @Override    public String call() throws Exception {        String output = "";        for (int i=0; i<5; i++) {            if (i == 2) {                throw new Exception("Test");            }            output += "Hello " + i + "\n";        }        return output;    }}).subscribe(new Consumer<String>() {    @Override    public void accept(String s) throws Exception {        Log.d(TAG, s);    }}, new Consumer<Throwable>() {    @Override    public void accept(Throwable throwable) throws Exception {        Log.d(TAG, "Error: " + throwable.getMessage());        throwable.printStackTrace();    }});

Subscribe Observer

You can subsribe using Observer which include error handling as well.

Observable.create(new ObservableOnSubscribe<String>() {    @Override    public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Exception {        for (int i=0; i<5; i++) {            if (i == 2) {                throw new Exception("Test");            }            emitter.onNext("Hello " + i);        }        emitter.onComplete();    }}).subscribe(new Observer<String>() {    @Override    public void onSubscribe(@NonNull Disposable d) {        Log.d(TAG, "started");    }    @Override    public void onNext(@NonNull String s) {        Log.d(TAG, s);    }    @Override    public void onError(@NonNull Throwable e) {        Log.d(TAG, "Error: " + e.getMessage());        e.printStackTrace();    }    @Override    public void onComplete() {        Log.d(TAG, "ended");    }});

Use RxJavaPlugins.setErrorHandler

You can specifiy a global exception handler using RxJavaPlugins.setErrorHandler.

Note: if you already specified an error handler using subscribe, RxJavaPlugins.setErrorHandler will not be called.

// Handle unhandled errorRxJavaPlugins.setErrorHandler(new Consumer<Throwable>() {    @Override    public void accept(Throwable throwable) throws Exception {        Log.e(TAG, throwable.getClass().getName());             // io.reactivex.exceptions.OnErrorNotImplementedException        Log.e(TAG, throwable.getCause().getClass().getName());  // java.lang.Exception        Log.e(TAG, throwable.getMessage());                     // "Test"        throwable.printStackTrace();    }});Observable.create(new ObservableOnSubscribe<String>() {    @Override    public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Exception {        for (int i=0; i<5; i++) {            if (i == 2) {                throw new Exception("Test");            }            emitter.onNext("Hello " + i);        }        emitter.onComplete();    }}).subscribe(new Consumer<String>() {    @Override    public void accept(String s) throws Exception {        Log.d(TAG, s);    }});

Use doOnError to capture error

doOnError will not handle exception, but it will always be called when an exception happened, no matter the exception is handled or not.

Observable.create(new ObservableOnSubscribe<String>() {    @Override    public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Exception {        for (int i=0; i<5; i++) {            if (i == 2) {                throw new Exception("Test");            }            emitter.onNext("Hello " + i);        }        emitter.onComplete();    }}).doOnError(new Consumer<Throwable>() {    @Override    public void accept(Throwable throwable) throws Exception {        Log.e(TAG, "Error: " + throwable.getMessage());        throwable.printStackTrace();    }}).subscribe(new Consumer<String>() {    @Override    public void accept(String s) throws Exception {        Log.d(TAG, s);    }});

