There's a lot of misleading info out there about SubscribeOn and ObserveOn.
Summary
- SubscribeOn intercepts calls to the single method of IObservable<T>, which is Subscribe, and calls to Dispose on the IDisposable handle returned by Subscribe.
- ObserveOn intercepts calls to the methods of IObserver<T>, which are OnNext, OnCompleted and OnError.
- Both methods cause the respective calls to be made on the specified scheduler.
Analysis & Demonstrations
The statement "ObserveOn sets where the code in the Subscribe handler is executed" is more confusing than helpful. What you are referring to as the "Subscribe handler" is really an OnNext handler. Remember, the Subscribe method of IObservable accepts an IObserver that has OnNext, OnCompleted and OnError methods, but it is extension methods that provide the convenience overloads that accept lambdas and build an IObserver implementation for you.
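To make that concrete, here is a small sketch comparing the lambda overload with passing an explicit IObserver. The class and method names (SubscribeOverloads, Run) are just illustrative; Observer.Create is the Rx factory for building an observer from delegates, and the two calls should behave roughly the same way.

```csharp
using System;
using System.Reactive;          // Observer.Create
using System.Reactive.Linq;     // Observable.Return

public static class SubscribeOverloads
{
    public static void Run()
    {
        var source = Observable.Return(1);

        // Convenience overload: an extension method wraps the lambdas
        // in an IObserver<int> for you.
        source.Subscribe(
            x => Console.WriteLine($"OnNext({x})"),
            ex => Console.WriteLine($"OnError({ex.Message})"),
            () => Console.WriteLine("OnCompleted()"));

        // Roughly the same thing spelled out: build the IObserver<int> yourself
        // and pass it to the one method IObservable<int> actually declares.
        IObserver<int> observer = Observer.Create<int>(
            x => Console.WriteLine($"OnNext({x})"),
            ex => Console.WriteLine($"OnError({ex.Message})"),
            () => Console.WriteLine("OnCompleted()"));
        source.Subscribe(observer);
    }
}
```

Either way, the lambdas are OnNext, OnError and OnCompleted handlers on an observer, not a "Subscribe handler".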
Let me appropriate the term though; I think of the "Subscribe handler" as being the code in the observable that is invoked when Subscribe is called. In this way, the description above more closely resembles the purpose of SubscribeOn.
SubscribeOn
SubscribeOn causes the Subscribe method of an observable to be executed asynchronously on the specified scheduler or context. You use it when you don't want to call the Subscribe method on an observable from whatever thread you are running on - typically because it can be long-running and you don't want to block the calling thread.
When you call Subscribe, you are calling an observable that may be part of a long chain of observables. SubscribeOn only affects the observable it is applied to. Now it may be the case that all the observables in the chain will be subscribed to immediately and on the same thread - but it doesn't have to be the case. Think about Concat for example - that only subscribes to each successive stream once the preceding stream has finished, and typically this will take place on whatever thread the preceding stream called OnCompleted from.
So SubscribeOn sits between your call to Subscribe and the observable you are subscribing to, intercepting the call and making it asynchronous.
It also affects disposal of subscriptions. Subscribe returns an IDisposable handle which is used to unsubscribe. SubscribeOn ensures calls to Dispose are scheduled on the supplied scheduler.
A common point of confusion when trying to understand what SubscribeOn does is that the Subscribe handler of an observable may well call OnNext, OnCompleted or OnError on the same thread. However, the purpose of SubscribeOn is not to affect these calls. It's not uncommon for a stream to complete before the Subscribe method returns. Observable.Return does this, for example. Let's take a look.
If you use the Spy method I wrote, and run the following code:
Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId);
var source = Observable.Return(1).Spy("Return");
source.Subscribe();
Console.WriteLine("Subscribe returned");
You get this output (thread id may vary of course):
Calling from Thread: 1
Return: Observable obtained on Thread: 1
Return: Subscribed to on Thread: 1
Return: OnNext(1) on Thread: 1
Return: OnCompleted() on Thread: 1
Return: Subscription completed.
Subscribe returned
You can see that the entire subscription handler ran on the same thread, and finished before returning.
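The Spy helper itself isn't reproduced here. As a rough sketch only (a reconstruction inferred from the output above, not necessarily the original code), something like the following would log in that format. The class name SpyExtensions and the Tid helper are made up for illustration.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading;

public static class SpyExtensions
{
    // Hypothetical reconstruction; the message formats mirror the output shown above.
    public static IObservable<T> Spy<T>(this IObservable<T> source, string name)
    {
        // Runs when the operator is applied, i.e. when the IObservable is obtained.
        Console.WriteLine($"{name}: Observable obtained on Thread: {Tid()}");

        return Observable.Create<T>(observer =>
        {
            Console.WriteLine($"{name}: Subscribed to on Thread: {Tid()}");
            try
            {
                // Log each notification, then forward it to the real observer.
                return source
                    .Do(x => Console.WriteLine($"{name}: OnNext({x}) on Thread: {Tid()}"),
                        ex => Console.WriteLine($"{name}: OnError({ex.Message}) on Thread: {Tid()}"),
                        () => Console.WriteLine($"{name}: OnCompleted() on Thread: {Tid()}"))
                    .Subscribe(observer);
            }
            finally
            {
                // Runs when the inner Subscribe call returns, not when the stream ends.
                Console.WriteLine($"{name}: Subscription completed.");
            }
        });
    }

    private static int Tid() => Thread.CurrentThread.ManagedThreadId;
}
```

Note that "Subscription completed." is printed when the call to Subscribe returns, which is why its position relative to the OnNext and OnCompleted lines is so informative in these traces.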
Let's use SubscribeOn to run this asynchronously. We will Spy on both the Return observable and the SubscribeOn observable:
Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId);
var source = Observable.Return(1).Spy("Return");
source.SubscribeOn(Scheduler.Default).Spy("SubscribeOn").Subscribe();
Console.WriteLine("Subscribe returned");
This outputs (line numbers added by me):
01 Calling from Thread: 1
02 Return: Observable obtained on Thread: 1
03 SubscribeOn: Observable obtained on Thread: 1
04 SubscribeOn: Subscribed to on Thread: 1
05 SubscribeOn: Subscription completed.
06 Subscribe returned
07 Return: Subscribed to on Thread: 2
08 Return: OnNext(1) on Thread: 2
09 SubscribeOn: OnNext(1) on Thread: 2
10 Return: OnCompleted() on Thread: 2
11 SubscribeOn: OnCompleted() on Thread: 2
12 Return: Subscription completed.
01 - The main method is running on thread 1.
02 - The Return observable is evaluated on the calling thread. We're just getting the IObservable here, nothing is subscribing yet.
03 - The SubscribeOn observable is evaluated on the calling thread.
04 - Now finally we call the Subscribe method of SubscribeOn.
05 - The Subscribe method completes asynchronously...
06 - ... and thread 1 returns to the main method. This is the effect of SubscribeOn in action!
07 - Meanwhile, SubscribeOn scheduled a call on the default scheduler to Return. Here it is received on thread 2.
08 - And as Return does, it calls OnNext on the Subscribe thread...
09 - ... and SubscribeOn is just a pass-through now.
10, 11 - Same for OnCompleted.
12 - And last of all, the Return subscription handler is done.
Hopefully that clears up the purpose and effect of SubscribeOn!
ObserveOn
If you think of SubscribeOn as an interceptor for the Subscribe method that passes the call on to a different thread, then ObserveOn does the same job, but for the OnNext, OnCompleted and OnError calls.
Recall our original example:
Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId);
var source = Observable.Return(1).Spy("Return");
source.Subscribe();
Console.WriteLine("Subscribe returned");
Which gave this output:
Calling from Thread: 1
Return: Observable obtained on Thread: 1
Return: Subscribed to on Thread: 1
Return: OnNext(1) on Thread: 1
Return: OnCompleted() on Thread: 1
Return: Subscription completed.
Subscribe returned
Now let's alter this to use ObserveOn:
Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId);
var source = Observable.Return(1).Spy("Return");
source.ObserveOn(Scheduler.Default).Spy("ObserveOn").Subscribe();
Console.WriteLine("Subscribe returned");
We get the following output:
01 Calling from Thread: 1
02 Return: Observable obtained on Thread: 1
03 ObserveOn: Observable obtained on Thread: 1
04 ObserveOn: Subscribed to on Thread: 1
05 Return: Subscribed to on Thread: 1
06 Return: OnNext(1) on Thread: 1
07 ObserveOn: OnNext(1) on Thread: 2
08 Return: OnCompleted() on Thread: 1
09 Return: Subscription completed.
10 ObserveOn: Subscription completed.
11 Subscribe returned
12 ObserveOn: OnCompleted() on Thread: 2
01 - The main method is running on Thread 1.
02 - As before, the Return observable is evaluated on the calling thread. We're just getting the IObservable here, nothing is subscribing yet.
03 - The ObserveOn observable is evaluated on the calling thread too.
04 - Now we subscribe, again on the calling thread, first to the ObserveOn observable...
05 - ... which then passes the call through to the Return observable.
06 - Now Return calls OnNext in its Subscribe handler.
07 - Here is the effect of ObserveOn. We can see that the OnNext is scheduled asynchronously on Thread 2.
08 - Meanwhile Return calls OnCompleted on Thread 1...
09 - ... and Return's subscription handler completes...
10 - ... and then so does ObserveOn's subscription handler...
11 - ... so control is returned to the main method.
12 - Meanwhile, ObserveOn has shuttled Return's OnCompleted call over to Thread 2. This could have happened at any time during 09-11 because it is running asynchronously. It just so happens that it's finally called now.
What are the typical use cases?
You will most often see SubscribeOn used in a GUI when you need to Subscribe to a long-running observable and want to get off the dispatcher thread as soon as possible - maybe because you know it's one of those observables that does all its work in the subscription handler. Apply it at the end of the observable chain, because this is the first observable called when you subscribe.
You will most often see ObserveOn used in a GUI when you want to ensure OnNext, OnCompleted and OnError calls are marshalled back to the dispatcher thread. Apply it at the end of the observable chain to transition back as late as possible.
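Here is a rough console sketch of how the two are often combined. It is only an illustration under some assumptions: SlowSource is a made-up observable that blocks inside its subscription handler, and an EventLoopScheduler (a scheduler with its own dedicated thread) stands in for the dispatcher, since there is no real UI thread in a console program. In an actual GUI app you would observe on the dispatcher instead.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Threading;

public static class GuiPatternSketch
{
    // Hypothetical source that does all of its work inside the subscription handler.
    public static IObservable<int> SlowSource() =>
        Observable.Create<int>(observer =>
        {
            Console.WriteLine($"Working on Thread: {Environment.CurrentManagedThreadId}");
            Thread.Sleep(200);              // simulated blocking work
            observer.OnNext(42);
            observer.OnCompleted();
            return Disposable.Empty;
        });

    public static void Run()
    {
        // Stand-in for the dispatcher: a scheduler that owns one dedicated
        // background thread. Real code would dispose it when finished.
        var uiLike = new EventLoopScheduler();
        var done = new ManualResetEventSlim();

        Console.WriteLine($"Calling from Thread: {Environment.CurrentManagedThreadId}");
        SlowSource()
            .SubscribeOn(TaskPoolScheduler.Default)  // Subscribe runs off the calling thread
            .ObserveOn(uiLike)                       // notifications hop to the stand-in thread
            .Subscribe(
                x => Console.WriteLine($"OnNext({x}) on Thread: {Environment.CurrentManagedThreadId}"),
                () => done.Set());
        Console.WriteLine("Subscribe returned");

        done.Wait();                                 // keep the demo alive until completion
    }
}
```

Calling GuiPatternSketch.Run() should show the blocking work happening away from the calling thread, with the OnNext handler running on the stand-in thread; the actual thread ids will vary.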
Hopefully you can see that the answer to your question is that ObserveOnDispatcher won't make any difference to the threads that Where and SelectMany are executed on - it all depends on what thread stream is calling them from! The subscription handler of stream will be invoked on the calling thread, but it's impossible to say where Where and SelectMany will run without knowing how stream is implemented.
Observables with lifetimes that outlive the Subscribe call
Up until now, we've been looking exclusively at Observable.Return. Return completes its stream within the Subscribe handler. That's not atypical, but it's equally common for streams to outlive the Subscribe handler. Look at Observable.Timer for example:
Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId);
var source = Observable.Timer(TimeSpan.FromSeconds(1)).Spy("Timer");
source.Subscribe();
Console.WriteLine("Subscribe returned");
This returns the following:
Calling from Thread: 1
Timer: Observable obtained on Thread: 1
Timer: Subscribed to on Thread: 1
Timer: Subscription completed.
Subscribe returned
Timer: OnNext(0) on Thread: 2
Timer: OnCompleted() on Thread: 2
You can clearly see the subscription complete and then OnNext and OnCompleted