rxjava and a synchronous main thread
Jul 24, 2016 · 5 minute read · Commentscode
androidrxjava
not too long ago, someone pointed me to this blog post about keeping your main thread synchronous. it referenced Ray Ryan’s excellent talk about the matter. the talk and blog post lead me to a set of investigations, which prompted me to write this blog post.
quick note: why keep the main thread synchronous
if you are curious as to the kinds of problems that can occur if the main thread isn’t synchronous (or why it’s not synchronous when you use observeOn
), see this post about the main thread for a good explanation.
the summary is that everytime you use handler.post()
, you post something to be run later, and you don’t have any guarantees as to when it will be run. a common case is when you schedule something to update the ui, but before it actually runs, an onDestroy
comes in, causing the code to update the ui after the destruction of the activity.
in rx, specifically, observeOn(AndroidSchedulers.mainThread())
causes a handler.sendMessageDelayed
(see LooperScheduler.java), which could cause code to run at a point after we are thought to have unsubscribed, thus causing issues.
basic rules of rxjava threading
in many of the talks about rxjava1, we find a set of repeated rules about rxjava and threading:
- rxjava is single threaded by default unless you explicitly ask it otherwise
subscribeOn
only affects upstream- only the
subscribeOn
closest to the source matters observeOn
only affects downstream
while these are all true, there are a few minor points that were not immediately obvious to me early on about the first three, so i would like to elaborate a bit on those.
rxjava is single threaded by default
as long as you do not use observeOn
, subscribeOn
, or an operator that runs on a particular scheduler (ex timer
), the callback will be receieved on the thread subscribe
happened on.
subscribeOn only affects upstream
one subtle point - consider:
Observable.just(1, 2, 3)
.subscribeOn(Schedulers.io())
.subscribe(integer -> {
Log.d(TAG, "got value on " + Thread.currentThread().getName());
});
despite subscribeOn
only affecting upstream, this will always print the result on an io thread, irrespective of the thread on which we called this code. this is because subscribeOn
subscribes to the observable on the thread passed in, which means that onNext
will be called on that particular thread.
only the subscribeOn closest to the source matters
consider this code:
Observable.just(1, 2, 3)
.doOnSubscribe(() ->
Log.d(TAG, "subscribe to just on " +
Thread.currentThread().getName()))
.subscribeOn(Schedulers.io())
.filter(integer -> integer % 2 == 0)
.doOnSubscribe(() ->
Log.d(TAG, "subscribe to filter on " +
Thread.currentThread().getName()))
.subscribeOn(Schedulers.computation())
.subscribe(integer -> {
Log.d(TAG, "got value on " + Thread.currentThread().getName());
});
running this example results in:
D/are: subscribe to filter on RxComputationScheduler-1
D/are: subscribe to just on RxIoScheduler-2
D/are: got value on RxIoScheduler-2
if, however, we changed doOnSubscribe
with doOnNext
in the code block above, we’d instead get:
D/are: onNext from just with RxIoScheduler-2
D/are: onNext from just with RxIoScheduler-2
D/are: onNext from filter with RxIoScheduler-2
D/are: got value on RxIoScheduler-2
D/are: onNext from just with RxIoScheduler-2
the caveat here is that the subscribeOn
closest to the source is the one that determines which thread onNext
will get called on (but subscriptions still happen on the thread specified by subscribeOn
).
the reason for this is that each subscribeOn
subscribes to the upstream observable on that particular thread.
let’s take an example - given:
Observable.just(1,2,3)
.subscribeOn(Schedulers.io())
.subscribeOn(Schedulers.computation())
.subscribe();
let’s break down what happens:
-
Observable.just(1,2,3)
internally callsObservable.create(new OnSubscribeFromArray())
, so we have anObservable
which will callOnSubscribeFromArray
on subscribe. -
we then call
subscribeOn(Schedulers.io())
, which callsObservable.create(new OperatorSubscribeOn(this, scheduler))
, wherethis
is theObservable
from above, andscheduler
isio
. In other words, we now have anObservable
, which has anonSubscribe
that willsubscribe
to theObservable
from the step above on theio
thread. -
we then call
subscribeOn(Schedulers.computation())
, which callsObservable.create(new OperatorSubscribeOn(this, scheduler))
, wherethis
is theObservable
from above, andscheduler
iscomputation
. -
finally, we call
subscribe
, which callsObservable.subscribe
, which calls the method fromOperatorSubscribeOn
from point 3 - thissubscribe
s to theObservable
from point 2 on the computation thread. Ultimately, this causes theOperatorSubscribeOn
from point 2 to be called, which then callssubscribe
to theObservable
from point 1 on theio
thread.OnSubscribeFromArray
produces values on the same thread, thus causing all the items to be emitted on theio
thread.
see the source for subscribeOn
for more details.
running things in parallel with rxjava
in order to run things in parallel, we use flatMap
or concatMap
, with multiple observers that can then subscribeOn
whatever scheduler they want to. the difference between concatMap
and flatMap
is that flatMap
can emit items out of order, whereas concatMap
will always emit items in order.
so what does this do? flatMap
is essentially a merge
, which “combines multiple Observables into one by merging their emissions”2. note that the observable contract stipulates that “Observables must issue notifications to observers serially (not in parallel).” this means that onNext
will not be called concurrently, and part of merge
’s job is to make sure that onNext
is only called by one thread at a time.
for more on this, see Thomas Nield’s article about achieving parallelization, and also, see David Karnok’s article about FlatMap.
special thanks to Michael Evans for proofreading this.
-
both of the aforementioned talks are definitely worth watching if you haven’t already seen them! ↩︎
-
quote from rx merge docs ↩︎