Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
149 views
in Technique[技术] by (71.8m points)

c# - ObserveOn and SubscribeOn - where the work is being done

Based on reading this question: What's the difference between SubscribeOn and ObserveOn

ObserveOn sets where the code is in the Subscribe handler is is executed:

stream.Subscribe(_ => { // this code here });

The SubscribeOn method sets which thread the setup of the stream is done on.

I'm led to understand that if these aren't explicitly set, then the TaskPool is used.

Now my question is, lets say I do something like this:

Observable.Interval(new Timespan(0, 0, 1))
          .Where(t => predicate(t))
          .SelectMany(t => lots_of(t))
          .ObserveOnDispatcher()
          .Subscribe(t => some_action(t));

Where are the Where predicate and SelectMany lots_of being executed, given that some_action is being executed on the dispatcher?

Question&Answers:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Answer

0 votes
by (71.8m points)

There's a lot of misleading info out there about SubscribeOn and ObserveOn.

Summary

  • SubscribeOn intercepts calls to the single method of IObservable<T>, which is Subscribe, and calls to Dispose on the IDisposable handle returned by Subscribe.
  • ObserveOn intercepts calls to the methods of IObserver<T>, which are OnNext, OnCompleted & OnError.
  • Both methods cause the respective calls to be made on the specified scheduler.

Analysis & Demonstrations

The statement

ObserveOn sets where the code in the Subscribe handler is executed:

is more confusing than helpful. What you are referring to as the "Subscribe handler" is really an OnNext handler. Remember, the Subscribe method of IObservable accepts an IObserver that has OnNext, OnCompleted and OnError methods, but it is extension methods that provide the convenience overloads that accept lambdas and build an IObserver implementation for you.

Let me appropriate the term though; I think of the "Subscribe handler" being the code in the observable that is invoked when Subscribe is called. In this way, the description above more closely resembles the purpose of SubscribeOn.

SubscribeOn

SubscribeOn causes the Subscribe method of an observable to be executed asynchronously on the specified scheduler or context. You use it when you don't want to call the Subscribe method on an observable from whatever thread you are running on - typically because it can be long-running and you don't want to block the calling thread.

When you call Subscribe, you are calling an observable that may be part of a long chain of observables. It's only the observable that SubscribeOn is applied to that it effects. Now it may be the case that all the observables in the chain will be subscribed to immediately and on the same thread - but it doesn't have to be the case. Think about Concat for example - that only subscribes to each successive stream once the preceding stream has finished, and typically this will take place on whatever thread the preceding stream called OnCompleted from.

So SubscribeOn sits between your call to Subscribe and the observable you are subscribing to, intercepting the call and making it asynchronous.

It also affects disposal of subscriptions. Subscribe returns an IDisposable handle which is used to unsubscribe. SubscribeOn ensures calls to Dispose are scheduled on the supplied scheduler.

A common point of confusion when trying to understand what SubscribeOn does is that the Subscribe handler of an observable may well call OnNext, OnCompleted or OnError on this same thread. However, its purpose is not to affect these calls. It's not uncommon for a stream to complete before the Subscribe method returns. Observable.Return does this, for example. Let's take a look.

If you use the Spy method I wrote, and run the following code:

Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId);
var source = Observable.Return(1).Spy("Return");
source.Subscribe();
Console.WriteLine("Subscribe returned");

You get this output (thread id may vary of course):

Calling from Thread: 1
Return: Observable obtained on Thread: 1
Return: Subscribed to on Thread: 1
Return: OnNext(1) on Thread: 1
Return: OnCompleted() on Thread: 1
Return: Subscription completed.
Subscribe returned

You can see that the entire subscription handler ran on the same thread, and finished before returning.

Let's use SubscribeOn to run this asynchronously. We will Spy on both the Return observable and the SubscribeOn observable:

Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId);
var source = Observable.Return(1).Spy("Return");
source.SubscribeOn(Scheduler.Default).Spy("SubscribeOn").Subscribe();
Console.WriteLine("Subscribe returned");

This outputs (line numbers added by me):

01 Calling from Thread: 1
02 Return: Observable obtained on Thread: 1
03 SubscribeOn: Observable obtained on Thread: 1
04 SubscribeOn: Subscribed to on Thread: 1
05 SubscribeOn: Subscription completed.
06 Subscribe returned
07 Return: Subscribed to on Thread: 2
08 Return: OnNext(1) on Thread: 2
09 SubscribeOn: OnNext(1) on Thread: 2
10 Return: OnCompleted() on Thread: 2
11 SubscribeOn: OnCompleted() on Thread: 2
12 Return: Subscription completed.

01 - The main method is running on thread 1.

02 - the Return observable is evaluated on the calling thread. We're just getting the IObservable here, nothing is subscribing yet.

03 - the SubscribeOn observable is evaluated on the calling thread.

04 - Now finally we call the Subscribe method of SubscribeOn.

05 - The Subscribe method completes asynchronously...

06 - ... and thread 1 returns to the main method. This is the effect of SubscribeOn in action!

07 - Meanwhile, SubscribeOn scheduled a call on the default scheduler to Return. Here it is received on thread 2.

08 - And as Return does, it calls OnNext on the Subscribe thread...

09 - and SubscribeOn is just a pass through now.

10,11 - Same for OnCompleted

12 - And last of all the Return subscription handler is done.

Hopefully that clears up the purpose and effect of SubscribeOn!

ObserveOn

If you think of SubscribeOn as an interceptor for the Subscribe method that passes the call on to a different thread, then ObserveOn does the same job, but for the OnNext, OnCompleted and OnError calls.

Recall our original example:

Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId);
var source = Observable.Return(1).Spy("Return");
source.Subscribe();
Console.WriteLine("Subscribe returned");

Which gave this output:

Calling from Thread: 1
Return: Observable obtained on Thread: 1
Return: Subscribed to on Thread: 1
Return: OnNext(1) on Thread: 1
Return: OnCompleted() on Thread: 1
Return: Subscription completed.
Subscribe returned

Now lets alter this to use ObserveOn:

Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId);
var source = Observable.Return(1).Spy("Return");
source.ObserveOn(Scheduler.Default).Spy("ObserveOn").Subscribe();
Console.WriteLine("Subscribe returned");

We get the following output:

01 Calling from Thread: 1
02 Return: Observable obtained on Thread: 1
03 ObserveOn: Observable obtained on Thread: 1
04 ObserveOn: Subscribed to on Thread: 1
05 Return: Subscribed to on Thread: 1
06 Return: OnNext(1) on Thread: 1
07 ObserveOn: OnNext(1) on Thread: 2
08 Return: OnCompleted() on Thread: 1
09 Return: Subscription completed.
10 ObserveOn: Subscription completed.
11 Subscribe returned
12 ObserveOn: OnCompleted() on Thread: 2

01 - The main method is running on Thread 1.

02 - As before, the Return observable is evaluated on the calling thread. We're just getting the IObservable here, nothing is subscribing yet.

03 - The ObserveOn observable is evaluated on the calling thread too.

04 - Now we subscribe, again on the calling thread, first to the ObserveOn observable...

05 - ... which then passes the call through to the Return observable.

06 - Now Return calls OnNext in its Subscribe handler.

07 - Here is the effect of ObserveOn. We can see that the OnNext is scheduled asynchronously on Thread 2.

08 - Meanwhile Return calls OnCompleted on Thread 1...

09 - And Return's subscription handler completes...

10 - and then so does ObserveOn's subscription handler...

11 - so control is returned to the main method

12 - Meanwhile, ObserveOn has shuttled Return's OnCompleted call this over to Thread 2. This could have happened at any time during 09-11 because it is running asynchronously. Just so happens it's finally called now.

What are the typical use cases?

You will most often see SubscribeOn used in a GUI when you need to Subscribe to a long running observable and want to get off the dispatcher thread as soon as possible - maybe because you know it's one of those observables that does all it's work in the subscription handler. Apply it at the end of the observable chain, because this is the first observable called when you subscribe.

You will most often see ObserveOn used in a GUI when you want to ensure OnNext, OnCompleted and OnError calls are marshalled back to the dispatcher thread. Apply it at the end of the observable chain to transition back as late as possible.

Hopefully you can see that the answer to your question is that ObserveOnDispatcher won't make any difference to the threads that Where and SelectMany are executed on - it all depends what thread stream is calling them from! stream's subscription handler will be invoked on the calling thread, but it's impossible to say where Where and SelectMany will run without knowing how stream is implemented.

Observables with lifetimes that outlive the Subscribe call

Up until now, we've been looking exclusively at Observable.Return. Return completes its stream within the Subscribe handler. That's not atypical, but it's equally common for streams to outlive the Subscribe handler. Look at Observable.Timer for example:

Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId);
var source = Observable.Timer(TimeSpan.FromSeconds(1)).Spy("Timer");
source.Subscribe();
Console.WriteLine("Subscribe returned");

This returns the following:

Calling from Thread: 1
Timer: Observable obtained on Thread: 1
Timer: Subscribed to on Thread: 1
Timer: Subscription completed.
Subscribe returned
Timer: OnNext(0) on Thread: 2
Timer: OnCompleted() on Thread: 2

You can clearly see the subscription to complete and then OnNext and OnCompleted</c


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...