Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
207 views
in Technique[技术] by (71.8m points)

android - How to combine multiple RxJava chains non-blocking in error case

My requirements:

  • N Retrofit calls in parallel
  • Wait for all calls to finish (success or failure)
  • If k (0<= k < N) calls fail, they should NOT block the others. Imagine failed calls can return null or error objects while successful ones return the ResponseBody objects.
  • Optional: if RxJava has an easy way to distinguish which call is which for each success or failure, great, if not, I'll parse the responses and figure it out myself

What I have:

        Observable<ResponseBody> api1Call = api1.fetchData();
        Observable<ResponseBody> api2Call = api2.fetchData();
        Observable<ResponseBody> api3Call = api3.fetchData();

        Observable.combineLatest(api1Call, api2Call, api3Call, new Func2<ResponseBody, ResponseBody, ResponseBody, Object>() {
            @Override
            public Object call(ResponseBody responseBody1, ResponseBody responseBody2, ResponseBody responseBody3) {
                Logger.i("what does this do? - '%s', '%s', '%s'", responseBody1, responseBody2, responseBody3);
                return null;
            }
        }).onErrorResumeNext(new Func1<Throwable, Observable<?>>() {
            @Override
            public Observable<?> call(Throwable throwable) {
                Logger.e(throwable, "some error with one of the apis?");
                return Observable.empty();
            }
        }).subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Observer<Object>() {
                    @Override
                    public void onCompleted() {
                        Logger.i("onCompleted");
                    }

                    @Override
                    public void onError(Throwable e) {
                        Logger.e(e, "onError");
                    }

                    @Override
                    public void onNext(Object o) {
                        Logger.i("onNext " + o);
                    }
                });

The output I got:

some error with one of the apis?
// stacktrace of the error
onCompleted

I'm new to RxJava and very confused. I found some answers on StackOverflow saying zip does similar thing but it's even further from my requirements. I'm guessing one of the "combine" operators + proper exception handing will do what I need. It's just been really hard to figure it out so far

Versions I'm using:

compile 'io.reactivex:rxjava:1.3.0'
compile 'io.reactivex:rxandroid:1.2.1'
compile 'com.squareup.retrofit2:adapter-rxjava:2.3.0'
See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Answer

0 votes
by (71.8m points)
  1. You can not achieve parallel via combineLast or zip, rxjava will execute and emit your items in sequence in my testing.

  2. If one of your task fail, your Func2#call will not get called and onError will submitted instead. You even can not get the results of other successful tasks in this way.

  3. The solution is flatMap, it's the traditional way to achieve concurrent in rxjava. It also meet your other requirements.

Here is a small but completed example.

I use a simple website service to test.

I use a Semaphore to wait for all task done, you can completely ignore it. And I add logging to the http request for better understanding, you can complete ignore it also.

public interface WebsiteService {

    @GET
    Observable<ResponseBody> website(@Url String url);

}

Then I use the following to test the result with rxjava.

   HttpLoggingInterceptor loggingInterceptor = new HttpLoggingInterceptor();
    loggingInterceptor.setLevel(HttpLoggingInterceptor.Level.BASIC);

    Retrofit retrofit = new Retrofit.Builder().baseUrl("https://www.google.com")
            .addCallAdapterFactory(RxJava2CallAdapterFactory.create())
            .client(new OkHttpClient.Builder().addInterceptor(loggingInterceptor).build())
            .build();
    WebsiteService websiteService = retrofit.create(WebsiteService.class);

    final Semaphore s = new Semaphore(1);
    try {
        s.acquire();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }

    Observable<ResponseBody> first = websiteService.website("http://github.com");
    Observable<ResponseBody> second = websiteService.website("http://stackoverflow.com");
    Observable<ResponseBody> third = websiteService.website("http://notexisting.com");

    final int numberOfCalls = 3; // testing for three calls

    Observable.just(first, second, third)
            .flatMap(new Function<Observable<ResponseBody>, ObservableSource<ResponseBody>>() {
                @Override
                public ObservableSource<ResponseBody> apply(@NonNull Observable<ResponseBody> responseBodyObservable) throws Exception {
                    return responseBodyObservable.subscribeOn(Schedulers.computation());
                }
            })
            .subscribeOn(Schedulers.computation())
            .subscribe(new Observer<ResponseBody>() {

                private int currentDoneCalls = 0;

                private void checkShouldReleaseSemaphore() {
                    if (currentDoneCalls >= numberOfCalls) {
                        s.release();
                    }
                }

                @Override
                public void onSubscribe(@NonNull Disposable d) {

                }

                @Override
                public void onNext(@NonNull ResponseBody responseBody) {
                    System.out.println("Retrofit call success " + responseBody.contentType());
                    synchronized (this) {
                        currentDoneCalls++;
                    }
                    checkShouldReleaseSemaphore();
                }

                @Override
                public void onError(@NonNull Throwable e) {
                    System.out.println("Retrofit call failed " + e.getMessage());
                    synchronized (this) {
                        currentDoneCalls++;
                    }
                    checkShouldReleaseSemaphore();
                }

                @Override
                public void onComplete() {
                    System.out.println("onComplete, All request success");
                    checkShouldReleaseSemaphore();
                }

            });

    try {
        s.acquire();
    } catch (InterruptedException e) {
        e.printStackTrace();
    } finally {
        System.out.println("All request done");
        s.release();
    }

I use rxjava2 and retrofit adapter-rxjava2 for testing.

compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
compile 'io.reactivex.rxjava2:rxjava:2.1.0'
compile 'com.squareup.retrofit2:retrofit:2.3.0'
compile 'com.squareup.retrofit2:adapter-rxjava2:2.3.0'
compile 'com.squareup.okhttp3:logging-interceptor:3.8.1'

Updated

The introduction page of RxJava2 from github has pointed out the practical way to implement paralellism.

Practically, paralellism in RxJava means running independent flows and merging their results back into a single flow. The operator flatMap does this...

Although this example is based on RxJava2, the operation flatMap is already existing in RxJava.


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...