RxJava2 Synchronous Blocking Call

August 25, 2017

RxJava2 is asynchronous by default, but you still can make synchronous blocking call using blocking methods.

public Observable<String> generateWords() {
    return Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Exception {
            emitter.onNext("I");
            emitter.onNext("Am");
            emitter.onNext("Home");
            emitter.onComplete();
        }
    });
}

public Single<String> mergeWords(final Collection<String> words) {
    return Single.fromCallable(new Callable<String>() {
        @Override
        public String call() throws Exception {
            return TextUtils.join(" ", words);
        }
    });
}

public void testBlocking() {
    Observable.fromCallable(new Callable<String>() {
        @Override
        public String call() throws Exception {
            Observable<String> wordsObservable = generateWords();
            Iterator<String> it = wordsObservable.blockingIterable().iterator();
            List<String> words = new ArrayList<String>();
            while (it.hasNext()) {
                String word = it.next();
                words.add(word);
            }
            return mergeWords(words).blockingGet();
        }
    }).subscribe(new Consumer<String>() {
        @Override
        public void accept(String s) throws Exception {
            Log.d(TAG, "result="+s);
        }
    });
}
This work is licensed under a
Creative Commons Attribution-NonCommercial 4.0 International License.