Mark Elston (132)
#1
I just finished section 10.1.2 and am a bit confused. It seems to me that there isn't any real difference between the use of schedulers in the Observable creation functions discussed in this section and the use of SubscribeOn discussed earlier in the book. Don't they both do the same thing?
Tamir Dresher (35)
#2
Hi Mark,

Thanks for the question.
The answer is that it depends on the observable/operator implementation.
When an operator lets you pass a scheduler, it means the operator will introduce concurrency at some point in the future, but not necessarily at the time of subscription.
Take, for example, the Interval operator, which by default uses the DefaultScheduler (whose implementation can differ between platforms).
It doesn't matter that you subscribe to the observable it creates using the ImmediateScheduler; it will still emit its values on different threads (when running on Windows):
Observable.Interval(TimeSpan.FromSeconds(0.5), DefaultScheduler.Instance)
    .Take(5)
    .SubscribeOn(ImmediateScheduler.Instance)
    .Subscribe(x => Console.WriteLine("1: "+Thread.CurrentThread.ManagedThreadId));

When I ran this code in LINQPad, it produced the following output:
1: 19
1: 10
1: 15
1: 32
1: 20


But if you pass the ImmediateScheduler to the Interval operator, then no matter which scheduler you use to subscribe to the observable, the emissions will always be on the same thread. And yes, in this case that will be the thread on which the subscription took place, but only because we used the ImmediateScheduler and not some other scheduler:
Observable.Interval(TimeSpan.FromSeconds(0.5), ImmediateScheduler.Instance)
    .Take(5)
    .SubscribeOn(DefaultScheduler.Instance)
    .Subscribe(x => Console.WriteLine("2: "+Thread.CurrentThread.ManagedThreadId));


output:
2: 28
2: 28
2: 28
2: 28
2: 28
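A small sketch can make this distinction visible by recording two things separately: the thread on which the subscription runs (captured inside Observable.Defer) and the threads on which values arrive. It assumes the System.Reactive NuGet package and C# 9 top-level statements, and it swaps in an EventLoopScheduler for Interval so that every tick lands on that scheduler's single dedicated thread; the particular schedulers are illustrative choices, not the only way to show this.

```csharp
// Sketch only: assumes the System.Reactive NuGet package and C# 9+ top-level statements.
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

// EventLoopScheduler owns one dedicated thread and runs all of its work there.
using var loop = new EventLoopScheduler();
int loopThread = Observable.Start(() => Environment.CurrentManagedThreadId, loop).Wait();

int subscribeThread = -1;
var emitThreads = new ConcurrentQueue<int>();

Observable
    .Defer(() =>
    {
        // Runs at subscription time, i.e. on the scheduler given to SubscribeOn.
        subscribeThread = Environment.CurrentManagedThreadId;
        // The scheduler passed here decides where the ticks are emitted.
        return Observable.Interval(TimeSpan.FromMilliseconds(100), loop);
    })
    .Take(3)
    .SubscribeOn(NewThreadScheduler.Default)
    .Do(_ => emitThreads.Enqueue(Environment.CurrentManagedThreadId))
    .Wait(); // blocks until Take(3) completes

int[] emitted = emitThreads.ToArray();
Console.WriteLine($"loop thread: {loopThread}, subscribed on: {subscribeThread}, " +
                  $"emitted on: {string.Join(", ", emitted.Distinct())}");
```

If the reasoning above holds, the subscription runs on the thread created by NewThreadScheduler, while all three values arrive on the event-loop thread: SubscribeOn decided where subscribing happened, and the scheduler passed to Interval decided where emitting happened.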
Mark Elston (132)
#3
Thanks for the explanation. I think I will have to see more examples to really get a feel for when to use one approach versus the other. I have a feeling the implications should be obvious, but they just aren't, yet.
Rohit (7)
#4
Mark Elston (132)
#5
Actually, now that I look at your example a bit more closely, I am even more confused. My understanding was that SubscribeOn instructs the observable to do its work using a given scheduler, regardless of what it would normally use. Why does providing an explicit scheduler prevent this from happening?
Rohit (7)
#6
See http://stackoverflow.com/questions/15341864/what-are-the-default-schedulers-for-each-observable-operator


All time-based operators use the DefaultScheduler if no scheduler is provided (for operators such as Buffer, Skip and Take, this applies to their time-based overloads). For such operators, SubscribeOn adds nothing but overhead:

Buffer, Delay, DelaySubscription, Generate, Interval, Sample, Skip, SkipLast
SkipUntil, Take, TakeLast, TakeLastBuffer, TakeUntil, Throttle, TimeInterval,
Timeout, Timer, Timestamp, Window



Apologies if I am cross-posting from another book, but I found the following quote very useful:
Reactive Programming with RxJava

"Observable should only be responsible for production logic, whereas it is only the client code that can make judicious decision about concurrency. Remember that Observable is lazy but also immutable, in the sense that subscribeOn() affects only downstream subscribers, if someone subscribes to the exact same Observable without subscribeOn() in between, no concurrency will be involved by default."

So for Interval and the other operators listed above, the concurrency comes from the scheduler we pass as a parameter; otherwise they fall back to the platform-specific default scheduler.
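To see that fallback in action, the sketch below (assuming the System.Reactive package and C# 9 top-level statements) runs Interval twice: once without a scheduler, so it uses its default, and once with an explicitly supplied EventLoopScheduler. It only records whether each tick arrived on a thread-pool thread, since on .NET the DefaultScheduler is expected to run timer callbacks on the thread pool, whereas an EventLoopScheduler uses its own dedicated thread.

```csharp
// Sketch only: assumes the System.Reactive NuGet package and C# 9+ top-level statements.
using System;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;

// No scheduler argument: Interval falls back to its default (DefaultScheduler.Instance).
bool[] defaultOnPool = Observable.Interval(TimeSpan.FromMilliseconds(50))
    .Take(3)
    .Select(_ => Thread.CurrentThread.IsThreadPoolThread)
    .ToArray()
    .Wait();

// Explicit scheduler argument: the caller decides where the ticks are produced.
using var loop = new EventLoopScheduler();
bool[] loopOnPool = Observable.Interval(TimeSpan.FromMilliseconds(50), loop)
    .Take(3)
    .Select(_ => Thread.CurrentThread.IsThreadPoolThread)
    .ToArray()
    .Wait();

Console.WriteLine($"default scheduler, on thread pool: {string.Join(", ", defaultOnPool)}");
Console.WriteLine($"event loop scheduler, on thread pool: {string.Join(", ", loopOnPool)}");
```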

"Once you grasp reactive extensions and begin using it on large scale, the value of subscribeOn() diminishes. In entirely reactive software stacks, as found for example at Netflix, subscribeOn() is almost never used, yet all Observables are asynchronous. Most of the time Observables come from asynchronous sources and they are treated as asynchronous by default. Therefore using subscribeOn() is very limited, mostly when retrofitting existing APIs or libraries."
Mark Elston (132)
#7
Hi Rohit. I'm not sure what your point is here. My question wasn't what the default schedulers were but why SubscribeOn had no effect when providing a scheduler to, for example, the Interval operation (as in Tamir's example).
Rohit (7)
#8
See this

  
var observable = Observable.Defer<int>(() =>
{
    return Observable.Create<int>(obs =>
    {
        Task.Run(() =>
        {
            obs.OnNext(1);
            obs.OnNext(2);
            obs.OnNext(3);
            obs.OnCompleted();
        });
        return Disposable.Empty;
    });
});

// WhateverScheduler is a placeholder for any IScheduler implementation
observable.SubscribeOn(new WhateverScheduler()).Subscribe(Console.WriteLine);


- The scheduler you pass via SubscribeOn (WhateverScheduler) is the one used when subscribing to the observable, i.e. to run the Defer factory and the lambda passed to Observable.Create, so that creation code executes in the context of WhateverScheduler.

- But, as you can see, OnNext and OnCompleted are called from a Task running on a separate thread, not in WhateverScheduler's context. So no matter which scheduler you pass via SubscribeOn, it has no effect on where the values are emitted.

- Likewise, when we pass a scheduler directly to Observable.Interval as a parameter, it uses that scheduler to dispatch its items via OnNext. Any scheduler you pass via SubscribeOn to an Interval only adds overhead. This is why Tamir said earlier that it depends on the operator's implementation.
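A runnable variant of the snippet above, under the assumptions of the System.Reactive package and C# 9 top-level statements, records the thread on which the Create lambda runs and the thread from which the values are pushed. An EventLoopScheduler stands in for the placeholder WhateverScheduler; any other scheduler should lead to the same conclusion.

```csharp
// Sketch only: assumes the System.Reactive NuGet package and C# 9+ top-level statements.
using System;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Threading.Tasks;

int subscribeThread = -1;
int emitThread = -1;

var source = Observable.Create<int>(obs =>
{
    // This lambda runs on the scheduler given to SubscribeOn...
    subscribeThread = Environment.CurrentManagedThreadId;
    Task.Run(() =>
    {
        // ...but the values are pushed from a thread-pool task the source starts itself.
        emitThread = Environment.CurrentManagedThreadId;
        obs.OnNext(1);
        obs.OnNext(2);
        obs.OnNext(3);
        obs.OnCompleted();
    });
    return Disposable.Empty;
});

// EventLoopScheduler stands in for the placeholder "WhateverScheduler".
using var whatever = new EventLoopScheduler();
source.SubscribeOn(whatever).Wait(); // blocks until OnCompleted

Console.WriteLine($"subscribe lambda ran on {subscribeThread}, values pushed from {emitThread}");
```

The two recorded IDs differ: the scheduler handed to SubscribeOn controls only where the subscribe lambda runs, while the values come from whatever thread the source itself chooses.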

SubscribeOn is used to keep the work done at subscription time (the creation code) from blocking the caller.
When we create an observable, we should not hardcode the scheduler it runs on; the scheduler should be passed as a parameter (as with Observable.Interval). Observable.Interval and the other operators listed above are time-based and use a default scheduler if one is not provided, so when it comes to emitting values they basically ignore any scheduler you pass via SubscribeOn.
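The advice to take the scheduler as a parameter can be sketched with a hypothetical Countdown factory (the name and signature are illustrative, not an Rx API). Like Interval, it accepts an optional IScheduler and falls back to DefaultScheduler.Instance when none is given. The sketch assumes the System.Reactive package and C# 9 top-level statements.

```csharp
// Sketch only: assumes the System.Reactive NuGet package and C# 9+ top-level statements.
using System;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

// Hypothetical factory (not part of Rx): it decides *what* is produced, a countdown,
// while the caller may decide *where*. Like Interval, it falls back to a default scheduler.
IObservable<int> Countdown(int from, TimeSpan period, IScheduler scheduler = null) =>
    Observable.Interval(period, scheduler ?? DefaultScheduler.Instance)
        .Take(from)
        .Select(i => from - (int)i);

using var loop = new EventLoopScheduler();
int loopThread = Observable.Start(() => Environment.CurrentManagedThreadId, loop).Wait();

var ticks = Countdown(3, TimeSpan.FromMilliseconds(50), loop)
    .Select(v => (Value: v, ThreadId: Environment.CurrentManagedThreadId))
    .ToArray()
    .Wait();

foreach (var t in ticks)
    Console.WriteLine($"{t.Value} on thread {t.ThreadId} (loop thread is {loopThread})");
```

Because the caller supplies an EventLoopScheduler, every value should arrive on that scheduler's thread; calling Countdown(3, period) without the last argument would leave the choice to the default.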

Please re-read the quotes below:

"Observable should only be responsible for production logic, whereas it is only the client code that can make judicious decision about concurrency. Remember that Observable is lazy but also immutable, in the sense that subscribeOn() affects only downstream subscribers, if someone subscribes to the exact same Observable without subscribeOn() in between, no concurrency will be involved by default."

"Once you grasp reactive extensions and begin using it on large scale, the value of subscribeOn() diminishes. In entirely reactive software stacks, as found for example at Netflix, subscribeOn() is almost never used, yet all Observables are asynchronous. Most of the time Observables come from asynchronous sources and they are treated as asynchronous by default. Therefore using subscribeOn() is very limited, mostly when retrofitting existing APIs or libraries."



Hope this helps.
Mark Elston (132)
#9
Thanks, Rohit. Your example implementation helps a lot.

Regarding the quote:

... if someone subscribes to the exact same Observable without subscribeOn() in between, no concurrency will be involved by default.


This was misleading to me. The implication here is that only by passing a scheduler to subscribeOn() would we see any concurrency. But, as your example shows, that just isn't true. And that implication put the following paragraph in a different context that just didn't make any sense to me, especially the comment that:

... yet all Observables are asynchronous.


How could they be asynchronous when "no concurrency will be involved by default"? In fact, many of the Observables created in the previous examples didn't show any concurrency at all. For example:

IEnumerable<string> names = new []{"Shira", "Yonatan", "Gabi", "Tamir"};
IObservable<string> observable = names.ToObservable();
observable.SubscribeConsole("names");


Hence, my confusion.

I suspect that this section might be better worded to express the intent more clearly.
Rohit (7)
#10
This was misleading to me. The implication here is that only by passing a scheduler to subscribeOn() would we see any concurrency.


Sorry, that was in the context of normal observables, or the ones we create ourselves. Not all built-in operators let you override their concurrency with SubscribeOn, as is the case with Observable.Interval.
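For an ordinary synchronous source like the names.ToObservable() example, the sketch below (assuming the System.Reactive package and C# 9 top-level statements) subscribes to the same observable twice: once through SubscribeOn(NewThreadScheduler.Default) and once directly. It is meant to illustrate both halves of the quote: SubscribeOn is what introduces concurrency for such a source, and it affects only the subscriber that asked for it, not the underlying observable.

```csharp
// Sketch only: assumes the System.Reactive NuGet package and C# 9+ top-level statements.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

IEnumerable<string> names = new[] { "Shira", "Yonatan", "Gabi", "Tamir" };
IObservable<string> observable = names.ToObservable(); // default: CurrentThreadScheduler
int mainThread = Environment.CurrentManagedThreadId;

// One subscriber asks for concurrency: SubscribeOn moves the subscription, and with it
// the synchronous iteration, onto a new thread.
int[] moved = observable
    .SubscribeOn(NewThreadScheduler.Default)
    .Select(_ => Environment.CurrentManagedThreadId)
    .ToArray()
    .Wait();

// Another subscriber uses the very same observable without SubscribeOn:
// the values are produced synchronously on the subscribing (main) thread.
var direct = new List<int>();
observable.Subscribe(_ => direct.Add(Environment.CurrentManagedThreadId));

Console.WriteLine($"main: {mainThread}, with SubscribeOn: {string.Join(",", moved.Distinct())}, " +
                  $"direct: {string.Join(",", direct.Distinct())}");
```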