RxJava2: Use Output Of Observable As Input Parameter For Another Observable

August 25, 2017
Pass output as input parameter when chaining Observables

Use flatMap

The example below will use a Observable to generate a String, where the String output is used as input parameter for another Observable to split the string within flatMap.

Since there is only one result, we could use Single instead.

Note: flatMap return Observable, while map return actual object.

public Observable<String[]> splitWords(final String phrase) {
    return Observable.fromCallable(new Callable<String[]>() {
        @Override
        public String[] call() throws Exception {
            return phrase.split("\\s+");
        }
    });
}

public void testPassParameter() {
    Observable.fromCallable(new Callable<String>() {
        @Override
        public String call() throws Exception {
            return "I am here";
        }
    }).flatMap(new Function<String, ObservableSource<String[]>>() {
        @Override
        public ObservableSource<String[]> apply(@NonNull String s) throws Exception {
            return splitWords(s);
        }
    }).subscribe(new Consumer<String[]>() {
        @Override
        public void accept(String[] strings) throws Exception {
            // 3 outputs: I, am, here
            for (String text : strings) {
                Log.d(TAG, text);
            }
        }
    });
}

Use blocking call

You can execute Observable sequentially using blocking call.

public Observable<String[]> splitWords(final String phrase) {
    return Observable.fromCallable(new Callable<String[]>() {
        @Override
        public String[] call() throws Exception {
            return phrase.split("\\s+");
        }
    });
}

public Observable<String> generateString() {
    return Observable.fromCallable(new Callable<String>() {
        @Override
        public String call() throws Exception {
            return "I am here";
        }
    });
}

public void testPassParameter() {
    Observable.fromCallable(new Callable<String[]>() {
        @Override
        public String[] call() throws Exception {
            // String phrase = "I am here";
            String phrase = generateString().blockingSingle();
            return splitWords(phrase).blockingSingle();
        }
    }).subscribe(new Consumer<String[]>() {
        @Override
        public void accept(String[] strings) throws Exception {
            // 3 outputs: I, am, here
            for (String text : strings) {
                Log.d(TAG, text);
            }
        }
    });
}
This work is licensed under a
Creative Commons Attribution-NonCommercial 4.0 International License.