Wednesday, June 5, 2019

Learning RxJava: Thoughts on Chapter 6


Concurrency and Parallelization

Simply put, to use the full power and speed of the CPU, you need to be able to run things concurrently. Concurrency (in Java, usually achieved through multithreading) is essentially multitasking: working on more than one thing at the same time. Modern CPUs have multiple cores, and for a single program, multithreading is the way to put more than one of them to work. Without multithreading, you are limited to a single core.

There are a number of gotchas around concurrency, which RxJava tries to smooth out. For instance, creating a thread carries a fair amount of overhead, so a best practice is to use a thread pool instead. The threads are created ahead of time. When a thread is needed, it is taken from the pool, and when it is no longer needed, it is returned to the pool rather than destroyed. RxJava makes this process simple through the use of Schedulers.
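
To make the thread pool idea concrete, here is a minimal sketch in plain Java (no RxJava yet) using java.util.concurrent. The class and method names are made up for illustration. It pushes ten small jobs through a pool of two threads and records which threads did the work, showing that the same few threads are reused instead of one being created per job.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ThreadPoolDemo {

    // Submits `tasks` small jobs to a fixed pool of `poolSize` threads and
    // returns the names of the threads that actually ran them.
    static Set<String> runTasks(int poolSize, int tasks) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        Set<String> usedThreads = ConcurrentHashMap.newKeySet();
        for (int i = 0; i < tasks; i++) {
            // Each job records which pooled thread picked it up.
            pool.execute(() -> usedThreads.add(Thread.currentThread().getName()));
        }
        pool.shutdown(); // accept no new jobs; already-queued jobs still run
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return usedThreads;
    }

    public static void main(String[] args) {
        Set<String> names = runTasks(2, 10);
        // Ten jobs, but at most two distinct threads did the work.
        System.out.println(names.size() + " thread(s) used: " + names);
    }
}
```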

Schedulers

Schedulers are essentially predefined thread pool managers. There are several of them, each suited to a different kind of task.

Computation

The computation scheduler is designed for computation-heavy tasks, the kind that need the full use of a CPU core. These tasks usually involve math, algorithms, or complex logic. To handle them, the computation scheduler limits the number of threads in its pool based on the number of processors available to the JVM.
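
A minimal sketch of handing CPU-bound work to the computation scheduler. It assumes RxJava 2.x (the io.reactivex packages) is on the classpath; ComputationDemo and sumOfSquares are hypothetical names, and blockingGet() is only there so the demo waits for its result before exiting.

```java
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

public class ComputationDemo {

    // Sums the squares of 1..n, running the work on the computation scheduler.
    static long sumOfSquares(int n) {
        return Observable.range(1, n)
                .subscribeOn(Schedulers.computation())
                .map(i -> (long) i * i)               // CPU-bound work
                .reduce(0L, (acc, x) -> acc + x)      // Single<Long>
                .blockingGet();                       // wait for the result (demo only)
    }

    public static void main(String[] args) {
        // By default, the computation pool is sized from this value.
        System.out.println("processors: " + Runtime.getRuntime().availableProcessors());
        System.out.println("sum of squares: " + sumOfSquares(10));
    }
}
```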

IO

The io scheduler is generally used for waiting on slow devices or protocols, where the thread spends much of its time sitting idle, waiting for a response. This is commonly the case with disk reads or network calls. To handle these, the io scheduler tries to match the number of threads in its pool to the number of tasks that need a thread, dynamically growing or shrinking the pool as demand changes.
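
Here is a similar sketch for the io scheduler, under the same assumptions (RxJava 2.x; hypothetical class and method names). The Thread.sleep() call is only a stand-in for a slow disk or network read. The point is that the io thread spends its time waiting rather than computing.

```java
import io.reactivex.Single;
import io.reactivex.schedulers.Schedulers;

public class IoDemo {

    // Hypothetical slow read: sleeps to stand in for disk or network latency,
    // then reports which thread did the waiting.
    static String slowRead(String resource) {
        return Single.fromCallable(() -> {
                    Thread.sleep(100); // the thread mostly just waits here
                    return resource + " read on " + Thread.currentThread().getName();
                })
                .subscribeOn(Schedulers.io())
                .blockingGet(); // wait for the result (demo only)
    }

    public static void main(String[] args) {
        System.out.println(slowRead("config.txt"));
    }
}
```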

New Thread

The new thread scheduler is straightforward. It creates a new thread for each Observer and destroys the thread when it is done; there is no thread pool. This is useful for the occasional situation where it makes sense to dedicate an individual thread to a very specific purpose.
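
A quick way to see the new thread scheduler at work, again assuming RxJava 2.x and using hypothetical names. The same Observable is subscribed to twice, and because each subscription gets a brand-new thread, the two printed thread names differ.

```java
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

public class NewThreadDemo {

    // Emits the name of whatever thread the source runs on.
    // Each subscription to this Observable gets a brand-new thread.
    static final Observable<String> THREAD_NAME =
            Observable.fromCallable(() -> Thread.currentThread().getName())
                    .subscribeOn(Schedulers.newThread());

    public static void main(String[] args) {
        // Two separate subscriptions, so two separate threads.
        System.out.println(THREAD_NAME.blockingFirst());
        System.out.println(THREAD_NAME.blockingFirst());
    }
}
```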

Single

The single scheduler has a single thread in its pool. This can be useful "to isolate fragile, non-threadsafe operations to a single thread."
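
The single scheduler can be checked the same way (RxJava 2.x assumed, hypothetical names). This time both subscriptions report the same thread name, because every task scheduled on Schedulers.single() shares its one thread.

```java
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

public class SingleSchedulerDemo {

    // Emits the name of the thread the source runs on.
    // All subscriptions share the single scheduler's one thread.
    static final Observable<String> THREAD_NAME =
            Observable.fromCallable(() -> Thread.currentThread().getName())
                    .subscribeOn(Schedulers.single());

    public static void main(String[] args) {
        // Two separate subscriptions, but the same thread both times.
        System.out.println(THREAD_NAME.blockingFirst());
        System.out.println(THREAD_NAME.blockingFirst());
    }
}
```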

Trampoline

"In practicality, you will not invoke [the trampoline scheduler] often as it is used primarily in RxJava's internal implementation. [...] It is just like the default scheduler on the immediate thread, but it prevents cases of recursive scheduling where a task schedules a task while on the same thread. Instead of causing a stack overflow error, it will allow the current task to finish and then execute that new scheduled task afterward."

ExecutorService

You can build a custom scheduler from an ExecutorService. This gives you fine-grained control over the thread pool and the rules that govern it, which is useful when the defaults don't fit your needs.
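
A minimal sketch of wrapping your own ExecutorService with Schedulers.from(), assuming RxJava 2.x. The pool size of 4 and the custom-pool- thread names are arbitrary choices for illustration. Since the executor's threads belong to you rather than to RxJava, you also have to shut the executor down yourself.

```java
import io.reactivex.Observable;
import io.reactivex.Scheduler;
import io.reactivex.schedulers.Schedulers;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class CustomSchedulerDemo {

    static String runOnCustomPool() {
        AtomicInteger count = new AtomicInteger();
        // Our own rules: exactly 4 threads, with names we choose.
        ExecutorService executor = Executors.newFixedThreadPool(4,
                runnable -> new Thread(runnable, "custom-pool-" + count.incrementAndGet()));
        Scheduler scheduler = Schedulers.from(executor);
        try {
            return Observable.fromCallable(() -> Thread.currentThread().getName())
                    .subscribeOn(scheduler)
                    .blockingFirst(); // wait for the result (demo only)
        } finally {
            // The executor, not RxJava, owns these threads, so shut it down ourselves.
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        // Prints the name of one of our custom-pool-N threads.
        System.out.println(runOnCustomPool());
    }
}
```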

Starting and Stopping Schedulers

Each of the default schedulers is lazily instantiated. At any time, you can call shutdown() on any of them to stop its threads, or call Schedulers.shutdown() to stop them all at once. Afterward, you can call start() to restart any of them, or Schedulers.start() to restart them all at once.
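
A short sketch of stopping and restarting the standard schedulers, assuming RxJava 2.x, where Schedulers.shutdown() and Schedulers.start() are static methods. The class name is hypothetical, and the final range is only there to show that work runs again after the restart.

```java
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

public class RestartDemo {

    static long countAfterRestart() {
        Schedulers.shutdown(); // stop the standard schedulers' threads
        Schedulers.start();    // bring them back up
        return Observable.range(1, 5)
                .subscribeOn(Schedulers.computation())
                .count()       // Single<Long>
                .blockingGet(); // wait for the result (demo only)
    }

    public static void main(String[] args) {
        System.out.println(countAfterRestart());
    }
}
```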

Using Schedulers

To apply schedulers, you use the subscribeOn(), observeOn(), and unsubscribeOn() methods.

subscribeOn()

The subscribeOn() method suggests which scheduler should be used to start the Observable chain, that is, which thread the source Observable emits on. Its placement in the chain has no effect: wherever it appears, it suggests that the chain be started on the specified scheduler. Even so, best practice is to keep it as close to the source Observable as possible. (If a chain contains more than one subscribeOn(), the one closest to the source wins.) Note that subscribeOn() has no practical effect on certain Observable factories, such as Observable.interval(), which already emit on a scheduler of their own. For these, the factory has an overload that lets you pass the scheduler directly.
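
The sketch below (RxJava 2.x assumed, hypothetical names) checks both points. sourceThread() places subscribeOn() either right after the source or at the end of the chain, and in both cases the source emits on an io thread. firstTicks() uses the factory overload of Observable.interval() that takes the scheduler as its third argument.

```java
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class SubscribeOnDemo {

    // Reports which thread the source emitted on, with subscribeOn() placed
    // either right after the source or at the end of the chain.
    static String sourceThread(boolean nearSource) {
        Observable<String> source =
                Observable.fromCallable(() -> Thread.currentThread().getName());
        Observable<String> chain = nearSource
                ? source.subscribeOn(Schedulers.io()).map(name -> "emitted on " + name)
                : source.map(name -> "emitted on " + name).subscribeOn(Schedulers.io());
        return chain.blockingFirst(); // wait for the result (demo only)
    }

    // interval() runs on its own scheduler, so pass the scheduler to the factory.
    static List<Long> firstTicks() {
        return Observable.interval(10, TimeUnit.MILLISECONDS, Schedulers.newThread())
                .take(3)
                .toList()       // Single<List<Long>>
                .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(sourceThread(true));  // an io thread either way
        System.out.println(sourceThread(false));
        System.out.println(firstTicks());        // [0, 1, 2]
    }
}
```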

observeOn()

The observeOn() method switches to a different scheduler at that point in the Observable chain: the operations above the observeOn() call run on one thread, and the operations below it run on another. observeOn() acts as a bridge, passing emissions from one thread to the next.
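
To see the bridge in action, this sketch (RxJava 2.x assumed, hypothetical names) records the thread used just above and just below an observeOn() call. The source starts on the io scheduler because of subscribeOn(), and observeOn() moves everything after it onto the computation scheduler.

```java
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

public class ObserveOnDemo {

    // Records the thread used above and below observeOn().
    static String[] threadsAroundObserveOn() {
        String[] threads = new String[2];
        Observable.just("data")
                .subscribeOn(Schedulers.io())
                .doOnNext(s -> threads[0] = Thread.currentThread().getName()) // above
                .observeOn(Schedulers.computation())
                .doOnNext(s -> threads[1] = Thread.currentThread().getName()) // below
                .blockingSubscribe(); // wait for completion (demo only)
        return threads;
    }

    public static void main(String[] args) {
        String[] t = threadsAroundObserveOn();
        System.out.println("above observeOn: " + t[0]);
        System.out.println("below observeOn: " + t[1]);
    }
}
```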

This is particularly useful in applications with a dedicated UI thread: you can switch off the UI thread to perform computations, then switch back to it to update the user interface. In this way, you keep the UI from freezing.

unsubscribeOn()

If unsubscribing from an Observable is particularly costly, the unsubscribeOn() method lets you run the unsubscribe code on a different thread. This can be useful when, for instance, database connections need to be closed on unsubscribe.
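
A minimal sketch of unsubscribeOn(), assuming RxJava 2.x, where unsubscribing is done by calling dispose() on a Disposable. The doOnDispose() action stands in for expensive cleanup such as closing a database connection (no real database is involved), and it records which thread it ran on. Because unsubscribeOn(Schedulers.io()) sits below it, that cleanup runs on an io thread instead of the thread that called dispose().

```java
import io.reactivex.Observable;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class UnsubscribeOnDemo {

    static String cleanupThread() {
        CountDownLatch cleanedUp = new CountDownLatch(1);
        AtomicReference<String> thread = new AtomicReference<>();
        Disposable disposable = Observable.interval(10, TimeUnit.MILLISECONDS)
                .doOnDispose(() -> {
                    // Stand-in for costly cleanup, e.g. closing a database connection.
                    thread.set(Thread.currentThread().getName());
                    cleanedUp.countDown();
                })
                .unsubscribeOn(Schedulers.io()) // dispose upstream on an io thread
                .subscribe(tick -> { });
        disposable.dispose(); // returns quickly; the cleanup is scheduled on io
        try {
            cleanedUp.await(5, TimeUnit.SECONDS); // wait for cleanup (demo only)
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return thread.get();
    }

    public static void main(String[] args) {
        System.out.println("cleanup ran on: " + cleanupThread());
    }
}
```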

Parallelization with flatMap()

If you have many emissions that need to go through intensive or time-consuming operations, and you'd like to speed things up by spreading that work across multiple threads, you can take advantage of flatMap(). flatMap() merges multiple Observables together, and it is designed to work even if those Observables emit on different threads.

So, to run these operations in parallel, flatMap each emission (or group of emissions) to an Observable that uses subscribeOn() or observeOn() to move its work to another thread. It's as simple as that.
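
Here is a minimal sketch of that pattern, assuming RxJava 2.x. slowSquare() is a hypothetical stand-in for an intensive operation (it simply sleeps), and the subscribeOn(Schedulers.computation()) inside the flatMap() is what spreads the emissions across threads. Because the merged results can arrive in any order, the sketch sorts them before returning.

```java
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;
import java.util.List;

public class FlatMapParallelDemo {

    // Hypothetical stand-in for an intensive operation.
    static int slowSquare(int i) throws InterruptedException {
        Thread.sleep(100);
        return i * i;
    }

    static List<Integer> parallelSquares(int n) {
        return Observable.range(1, n)
                // Each emission gets its own Observable, subscribed on the
                // computation scheduler, so the slow work runs in parallel.
                .flatMap(i -> Observable.just(i)
                        .subscribeOn(Schedulers.computation())
                        .map(FlatMapParallelDemo::slowSquare))
                .toSortedList() // results arrive in any order; sort them
                .blockingGet(); // wait for the result (demo only)
    }

    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        System.out.println(parallelSquares(8));
        System.out.println("took " + (System.currentTimeMillis() - start) + " ms");
    }
}
```

On a machine with several cores, the eight 100 ms operations should finish in well under the roughly 800 ms a sequential run would need, since the computation scheduler can run as many of them at once as it has threads.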

Major Takeaways

One of the things that makes Rx extremely powerful is the ability to take sequential code and, in just one or two lines, make it run on a separate thread or across multiple threads in parallel. It greatly simplifies concurrency and parallelization.