Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
266 views
in Technique[技术] by (71.8m points)

java - My subscriber's onNext and onComplete functions do not run when I call onNext within my FlowableOnSubscribe class

In an Android project that uses RxJava 2, I create a Flowable like this in the onCreate of my initial activity:

Flowable.create(new MyFlowableOnSubscribe1(), BackpressureStrategy.BUFFER)
        .doOnComplete(new MyAction())
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new MySubscriber());

The implementation of the FlowableOnSubscribe is:

public class MyFlowableOnSubscribe1 implements FlowableOnSubscribe<String> {
    public static final String TAG = "XX MyFlOnSub1";

    @Override
    public void subscribe(FlowableEmitter<String> emitter) {
        Log.i(TAG, "subscribe");

        emitter.onNext("hello");
        emitter.onComplete();
    }
}

This is the subscriber implementation:

public class MySubscriber implements Subscriber<String> {
    public static final String TAG = "XX MySubscriber";

    @Override
    public void onSubscribe(Subscription s) {
        Log.i(TAG, "onSubscribe");
    }

    @Override
    public void onComplete() {
        Log.i(TAG, "onComplete");
    }

    @Override
    public void onError(Throwable e) {
        Log.i(TAG, "onError");
    }

    @Override
    public void onNext(String s) {
        Log.i(TAG, "onNext: " + s);
    }
}

And the action implementation is:

public class MyAction implements Action {
    public static final String TAG = "XX MyAction";

    @Override
    public void run() {
        Log.i(TAG, "run");
    }
}

In my output, I'm expecting to a log statement from onNext, but I don't see one. Instead, this is my entire output:

02-23 17:56:31.334 24176-24176/com.ebelinski.rxjavaexperimentproject I/XX?MySubscriber: onSubscribe
02-23 17:56:31.334 24176-24219/com.ebelinski.rxjavaexperimentproject I/XX?MyFlOnSub1: subscribe
02-23 17:56:31.334 24176-24219/com.ebelinski.rxjavaexperimentproject I/XX?MyAction: run

This indicates that onNext never runs, and onComplete doesn't even run either. But MyAction runs successfully.

Here's what happens when I comment out the call to onNext:

02-23 17:58:31.572 24176-24176/com.ebelinski.rxjavaexperimentproject I/XX?MySubscriber: onSubscribe
02-23 17:58:31.572 24176-26715/com.ebelinski.rxjavaexperimentproject I/XX?MyFlOnSub1: subscribe
02-23 17:58:31.572 24176-26715/com.ebelinski.rxjavaexperimentproject I/XX?MyAction: run
02-23 17:58:31.652 24176-24176/com.ebelinski.rxjavaexperimentproject I/XX?MySubscriber: onComplete

In this case onNext of course doesn't run, but at least onComplete runs.

I expected that I would see onComplete run in both cases, and onNext run when I call emitter.onNext. What am I doing wrong here?

See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Answer

0 votes
by (71.8m points)

You need to manually issue a request otherwise no data will be emitted when extending Subscriber directly:

@Override
public void onSubscribe(Subscription s) {
    Log.i(TAG, "onSubscribe");
    s.request(Long.MAX_VALUE);
}

Alternatively, you could extend DisposableSubscriber or ResourceSubscriber.


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...