Tuesday, May 28, 2019

Learning RxJava: Thoughts on Chapter 4


Merge

Merge is used to combine the emissions from multiple Observables into one Observable.
Observable.merge(
    Observable.just("First", "Second", "Third"),
    Observable.just("Alpha", "Beta", "Delta"),
    Observable.just("A", "B", "C"),
    Observable.just("1", "2", "3")
).subscribe(System.out::println);

Output:
First
Second
Third
Alpha
Beta
Delta
A
B
C
1
2
3
At first glance it seems that merge maintains the order of the observables, but that is not guaranteed. For simple Observables it will maintain order simply because of how it is implemented under the hood, but it is not part of it's contract. Take the following example:
Observable.merge(
    Observable.interval(1, TimeUnit.SECONDS).map(t -> "Obs 1: " + t),
    Observable.interval(300, TimeUnit.MILLISECONDS)
        .map(t -> "Obs 2: " + t)
).subscribe(System.out::println);

sleep(3000);

Output:
Obs 2: 0
Obs 2: 1
Obs 2: 2
Obs 1: 0
Obs 2: 3
Obs 2: 4
Obs 2: 5
Obs 1: 1
Obs 2: 6
Obs 2: 7
Obs 2: 8
Obs 1: 2
Obs 2: 9
Here, the emissions are coming through in the order that they arrive, which intermixes emissions from Observable 1 and Observable 2. As such, if you care about Observable order, then you should use concat instead, but if you don't care about Observable order, then merge is the best choice.

Note that merge can only merge up to 4 Observables. If you need to do more than that, then use mergeArray:
Observable.mergeArray(
    Observable.just("First", "Second", "Third"),
    Observable.just("Alpha", "Beta", "Delta"),
    Observable.just("A", "B", "C"),
    Observable.just("1", "2", "3"),
    Observable.just("a", "b", "c"),
    Observable.just("i", "ii", "iii"),
    Observable.just("x", "y", "z")
).subscribe(System.out::println);

Output:
First
Second
Third
Alpha
Beta
Delta
A
B
C
1
2
3
a
b
c
i
ii
iii
x
y
z
Merging can also be done with an operator instead of a factory by using mergeWith.
Observable.just("First", "Second", "Third")
    .mergeWith(Observable.just("Alpha", "Beta", "Delta"))
    .mergeWith(Observable.just("A", "B", "C"))
    .mergeWith(Observable.just("1", "2", "3"))
    .subscribe(System.out::println);

Output:
First
Second
Third
Alpha
Beta
Delta
A
B
C
1
2
3
Observable.interval(1, TimeUnit.SECONDS)
    .map(t -> "Obs 1: " + t)
    .mergeWith(Observable.interval(300, TimeUnit.MILLISECONDS)
        .map(t -> "Obs 2: " + t)
    ).subscribe(System.out::println);

sleep(3000);

Output:
Obs 2: 0
Obs 2: 1
Obs 2: 2
Obs 1: 0
Obs 2: 3
Obs 2: 4
Obs 2: 5
Obs 1: 1
Obs 2: 6
Obs 2: 7
Obs 2: 8
Obs 1: 2

Flatmap

flatMap works in the same way that merge does, except it determines what Observables to merge and how many based off of the incoming emissions. For instance, I can recreate the merge examples from above by doing the following:
List<Observable<String>> list = asList(
    Observable.just("First", "Second", "Third"),
    Observable.just("Alpha", "Beta", "Delta"),
    Observable.just("A", "B", "C"),
    Observable.just("1", "2", "3"));

Observable.range(0, 4)
    .flatMap(i -> list.get(i))
    .subscribe(System.out::println);

Output:
First
Second
Third
Alpha
Beta
Delta
A
B
C
1
2
3
List<Observable<String>> list = asList(
    Observable.interval(1, TimeUnit.SECONDS).map(t -> "Obs 1: " + t),
    Observable.interval(300, TimeUnit.MILLISECONDS)
        .map(t -> "Obs 2: " + t));

Observable.range(0, 2)
    .flatMap(i-> list.get(i))
    .subscribe(System.out::println);

sleep(3000);

Output:
Obs 2: 0
Obs 2: 1
Obs 2: 2
Obs 1: 0
Obs 2: 3
Obs 2: 4
Obs 2: 5
Obs 1: 1
Obs 2: 6
Obs 2: 7
Obs 2: 8
Obs 1: 2
Obs 2: 9
Take note that it handles Observable ordering in the same way that merge does. If you care about Observable order, then concatMap should be used instead.

Concat

Concat works similarly to merge, but it waits until onComplete is called on the first Observable before moving on to the next, and so on. Note that if there is an infinite Observable, it will never move on to the Observable following it.
Observable.concat(
    Observable.interval(1, TimeUnit.SECONDS).map(t -> "Obs 1: " + t),
    Observable.interval(300, TimeUnit.MILLISECONDS)
        .map(t -> "Obs 2: " + t)
).subscribe(System.out::println);

sleep(3000);

Output:
Obs 1: 0
Obs 1: 1
Obs 1: 2
But if I take this example and modify it so that the first Observable isn't infinite, we can see that it maintains order, as expected:
Observable.concat(
    Observable.interval(1, TimeUnit.SECONDS)
        .map(t -> "Obs 1: " + t)
        .take(2),
    Observable.interval(300, TimeUnit.MILLISECONDS)
        .map(t -> "Obs 2: " + t)
).subscribe(System.out::println);

sleep(3000);

Output:
bs 1: 0
Obs 1: 1
Obs 2: 0
Obs 2: 1
Obs 2: 2
You should also note that it didn't start Observable 2 until after Observable 1 had finished. Though if we modify Observable 2 and make it a hot observable:
Observable<String> obs1 = Observable.interval(1, TimeUnit.SECONDS)
    .map(t -> "Obs 1: " + t)
    .take(2);
ConnectableObservable<String> obs2 = 
    Observable.interval(300, TimeUnit.MILLISECONDS)
    .map(t -> "Obs 2: " + t)
    .publish();
Observable.concat(obs1, obs2)
    .subscribe(System.out::println);
obs2.connect();

sleep(3000);

Output:
Obs 1: 0
Obs 1: 1
Obs 2: 6
Obs 2: 7
Obs 2: 8
Obs 2: 9
You'll notice that it missed the first 5 emissions of Observable 2. So while it will wait to start following Observables, it won't cache emissions, and anything that comes while it is waiting for the first Observable to finish will be lost.

Just like merge has mergeArray and mergeWith alternatives, there is also a concatArray and a concatWith. Syntactically, they are similar to mergeArray and mergeWith, but with the concat functionality under the hood. There is also concatMap, which is the concat equivalent to flatMap.

Ambiguous

The amb method (stands for ambiguous) waits for one of the observables to send an emission, and whichever Observable is first, it will then only send on that Observable's emissions. Also, just like the other methods here, there are ambArray and ambWith alternatives.

Zip

The zip method is used to combine emissions from multiple Observables into joint emissions on a single Observable. Note that the number of emissions sent out by a zip will be the same as the number of emissions sent by the Observable with the least number of emissions. This means that if you have three Observables, and the first one emits a, b, c, and the second one emits 1, 2, 3, 4, 5, and the third one emits i, ii, iii, iv, and you zip these three together, then the zip will send out emissions with the combinations (a, 1, i), (b, 2, ii), and (c, 3, iii), while the emissions 4 and 5 on the second Observable and the emission iv on the third Observable will be ignored.

Just like the other methods here, there are zipArray and zipWith alternatives.

CombineLatest

combineLatest works similarly to zip, except that for each emission that comes through on any observable will cause the combineLatest to send an emission. and that emission will be a combination of the latest emissions of all Observables. Note that it won't emit anything until all Observables have emitted at least once. This is frequently useful when dealing with UI elements. For example, let's say that we had a simple UI with a checkbox and a dropdown menu with three options (1, 2, 3). Let's say the checkbox defaults to uncheck and the dropdown defaults to 1. We could wrap each element in an Observable and then use combineLatest. The first emission would combine (unchecked, 1). If a user then changes the dropdown value to 2, combineLatest would then emit (unchecked, 2). If the checkbox was then checked, it would emit (checked, 2), and so on.

There isn't a combineLatestArray method, but there is an overload for combineLatest that will take in a list of Observables. The with operator equivalent is instead called withLatestFrom.

Grouping

The groupBy method allows you to take emissions and group them into GroupedObserables. The GroupedObserable has ready access to the key that you grouped on, so as you're processing the emissions in each GroupedObservable, you can easily reference the key as need be.