Tuesday, May 28, 2019

Learning RxJava: Thoughts on Chapter 5


Multicasting

Multicasting is sending the same emissions to multiple observers. This is done through the use of hot observables, such as the ConnectableObservable returned from the publish method. When using hot observables, there is a key point that needs to be understood: a hot observable does not stay hot through subsequent calls in the chain. If the publish method is called in the middle of a call chain, then the emissions up to the publish call will be shared between all observers, but any further transformations past the publish point will be redone for each observer, even if the observers share the same code.
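
The split between shared work and per-observer work can be modeled in plain Java, without depending on any particular RxJava version. This is only a rough sketch of the idea, not RxJava code: PublishModel, its emit and subscribe methods, the two counters, and the "multiply by 10" upstream step are all hypothetical names and choices made for illustration. Work before the publish point runs once per item, while work after it runs once per item per observer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Plain-Java model (not RxJava): one shared upstream step, then a
// per-observer downstream step, mimicking publish() in mid-chain.
class PublishModel {
    int upstreamCalls = 0;    // work done before the publish point
    int downstreamCalls = 0;  // work done after the publish point

    private final List<Consumer<Integer>> observers = new ArrayList<>();

    // Each observer gets its own copy of the downstream transformation.
    void subscribe(Function<Integer, String> downstream, Consumer<String> sink) {
        observers.add(value -> {
            downstreamCalls++;
            sink.accept(downstream.apply(value));
        });
    }

    // The upstream transformation runs once; its result is fanned out.
    void emit(int raw) {
        upstreamCalls++;
        int shared = raw * 10; // hypothetical upstream transformation
        for (Consumer<Integer> observer : observers) {
            observer.accept(shared);
        }
    }

    public static void main(String[] args) {
        PublishModel model = new PublishModel();
        model.subscribe(v -> "A:" + v, System.out::println);
        model.subscribe(v -> "B:" + v, System.out::println);
        for (int i = 1; i <= 3; i++) {
            model.emit(i);
        }
        // Three items, two observers: upstream=3 downstream=6
        System.out.println("upstream=" + model.upstreamCalls
                + " downstream=" + model.downstreamCalls);
    }
}
```

If the downstream step is expensive, this is the reason to move the publish call as far down the chain as the observers' shared logic allows.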

Generally speaking, it is best to keep Observables cold, unless you have a reason to make them hot. Reasons to do this include "to increase performance, reducing memory and CPU usage, or simply because your business logic requires pushing the same emissions to all Observers."

A ConnectableObservable won't start sending emissions until the connect call is made, and you'll typically want to make sure that all Observers are subscribed before calling connect so that the Observers don't miss any emissions. There are a number of convenience methods to make connecting more straightforward under the right circumstances.

autoConnect()

The autoConnect method will automatically connect after the specified number of Observers have subscribed. It defaults to one if no number is given. You can also specify zero if you want it to connect immediately without waiting for subscribers. Also note that autoConnect will persist its connection, even when there are no subscribers, and it will not resubscribe to the Observable if it is disposed.

refCount() and share()

refCount() is similar to autoConnect(1), but with the difference that when there are no more Observers, it will dispose of itself, and resubscribe if future Observers subscribe, essentially starting over for the new subscriber. The share() method is essentially a shortcut to writing publish().refCount().

replay() and cache()

The replay operator allows you to store emissions within certain parameters so that they can be replayed for future Observers. replay() with no parameters specified will store all emissions, and for future Observers it will immediately replay all emissions until the Observer is caught up, and then start sending emissions as they arrive. This has the potential to be very memory intensive.

If you only care about the last handful of emissions, you can specify a buffer size or a time window or both.
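
To make the buffer-size variant concrete, here is a minimal plain-Java model of the behavior. It is a sketch under stated assumptions, not how RxJava implements replay: ReplayModel and its methods are hypothetical names, and the time-window variant is not modeled. A late observer first receives whatever is still in the bounded buffer, then live emissions.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

// Plain-Java model (not RxJava) of replaying only the last N emissions.
class ReplayModel<T> {
    private final int bufferSize;
    private final Deque<T> buffer = new ArrayDeque<>();
    private final List<Consumer<T>> observers = new ArrayList<>();

    ReplayModel(int bufferSize) {
        this.bufferSize = bufferSize;
    }

    // Store the emission, evicting the oldest once the buffer is full.
    void emit(T item) {
        if (bufferSize > 0) {
            if (buffer.size() == bufferSize) {
                buffer.removeFirst();
            }
            buffer.addLast(item);
        }
        for (Consumer<T> observer : observers) {
            observer.accept(item);
        }
    }

    // A late observer first receives the buffered items, then live ones.
    void subscribe(Consumer<T> observer) {
        for (T item : buffer) {
            observer.accept(item);
        }
        observers.add(observer);
    }

    public static void main(String[] args) {
        ReplayModel<String> replay = new ReplayModel<>(2);
        replay.emit("first");
        replay.emit("second");
        replay.emit("third");
        // Prints "late: second" and "late: third" from the buffer...
        replay.subscribe(s -> System.out.println("late: " + s));
        // ...and then "late: fourth" as a live emission.
        replay.emit("fourth");
    }
}
```

With an unbounded buffer the deque would simply keep growing, which is the memory concern mentioned above.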

The cache() operator is essentially just a shortcut for replay().autoConnect(). It is meant for storing all emissions indefinitely.

Subjects

Subjects are an advanced tool that really shouldn't be used unless you've ruled out other possibilities first. They are a mix between an Observer and an Observable, and are frequently used for bridging the gap between reactive and imperative programming. Another use case is to "use Subjects to eagerly subscribe to an unknown number of multiple source Observables and consolidate their emissions as a single Observable." There are also some possible use cases in decoupling.

A Subject can be subscribed to by Observers, it can subscribe to other Observables, and you can manually call onNext(), onError(), and onComplete() on it. Note that the first Observable to call onComplete() on the Subject will cease further emissions from other Observables, as well. It is also important to note that the onSubscribe(), onNext(), onError(), and onComplete() methods on a Subject are not threadsafe. If you're not careful, you could easily "break the Observable contract, which demands that emissions happen sequentially."

Learning RxJava: Thoughts on Chapter 4


Merge

Merge is used to combine the emissions from multiple Observables into one Observable.
Observable.merge(
    Observable.just("First", "Second", "Third"),
    Observable.just("Alpha", "Beta", "Delta"),
    Observable.just("A", "B", "C"),
    Observable.just("1", "2", "3")
).subscribe(System.out::println);

Output:
First
Second
Third
Alpha
Beta
Delta
A
B
C
1
2
3
At first glance it seems that merge maintains the order of the observables, but that is not guaranteed. For simple Observables it will maintain order simply because of how it is implemented under the hood, but it is not part of its contract. Take the following example:
Observable.merge(
    Observable.interval(1, TimeUnit.SECONDS).map(t -> "Obs 1: " + t),
    Observable.interval(300, TimeUnit.MILLISECONDS)
        .map(t -> "Obs 2: " + t)
).subscribe(System.out::println);

sleep(3000);

Output:
Obs 2: 0
Obs 2: 1
Obs 2: 2
Obs 1: 0
Obs 2: 3
Obs 2: 4
Obs 2: 5
Obs 1: 1
Obs 2: 6
Obs 2: 7
Obs 2: 8
Obs 1: 2
Obs 2: 9
Here, the emissions are coming through in the order that they arrive, which intermixes emissions from Observable 1 and Observable 2. As such, if you care about Observable order, then you should use concat instead, but if you don't care about Observable order, then merge is the best choice.

Note that the merge factory only accepts up to 4 Observables as separate arguments. If you need more than that, then use mergeArray:
Observable.mergeArray(
    Observable.just("First", "Second", "Third"),
    Observable.just("Alpha", "Beta", "Delta"),
    Observable.just("A", "B", "C"),
    Observable.just("1", "2", "3"),
    Observable.just("a", "b", "c"),
    Observable.just("i", "ii", "iii"),
    Observable.just("x", "y", "z")
).subscribe(System.out::println);

Output:
First
Second
Third
Alpha
Beta
Delta
A
B
C
1
2
3
a
b
c
i
ii
iii
x
y
z
Merging can also be done with an operator instead of a factory by using mergeWith.
Observable.just("First", "Second", "Third")
    .mergeWith(Observable.just("Alpha", "Beta", "Delta"))
    .mergeWith(Observable.just("A", "B", "C"))
    .mergeWith(Observable.just("1", "2", "3"))
    .subscribe(System.out::println);

Output:
First
Second
Third
Alpha
Beta
Delta
A
B
C
1
2
3
Observable.interval(1, TimeUnit.SECONDS)
    .map(t -> "Obs 1: " + t)
    .mergeWith(Observable.interval(300, TimeUnit.MILLISECONDS)
        .map(t -> "Obs 2: " + t)
    ).subscribe(System.out::println);

sleep(3000);

Output:
Obs 2: 0
Obs 2: 1
Obs 2: 2
Obs 1: 0
Obs 2: 3
Obs 2: 4
Obs 2: 5
Obs 1: 1
Obs 2: 6
Obs 2: 7
Obs 2: 8
Obs 1: 2

Flatmap

flatMap works in the same way that merge does, except it determines what Observables to merge and how many based off of the incoming emissions. For instance, I can recreate the merge examples from above by doing the following:
List<Observable<String>> list = asList(
    Observable.just("First", "Second", "Third"),
    Observable.just("Alpha", "Beta", "Delta"),
    Observable.just("A", "B", "C"),
    Observable.just("1", "2", "3"));

Observable.range(0, 4)
    .flatMap(i -> list.get(i))
    .subscribe(System.out::println);

Output:
First
Second
Third
Alpha
Beta
Delta
A
B
C
1
2
3
List<Observable<String>> list = asList(
    Observable.interval(1, TimeUnit.SECONDS).map(t -> "Obs 1: " + t),
    Observable.interval(300, TimeUnit.MILLISECONDS)
        .map(t -> "Obs 2: " + t));

Observable.range(0, 2)
    .flatMap(i-> list.get(i))
    .subscribe(System.out::println);

sleep(3000);

Output:
Obs 2: 0
Obs 2: 1
Obs 2: 2
Obs 1: 0
Obs 2: 3
Obs 2: 4
Obs 2: 5
Obs 1: 1
Obs 2: 6
Obs 2: 7
Obs 2: 8
Obs 1: 2
Obs 2: 9
Take note that it handles Observable ordering in the same way that merge does. If you care about Observable order, then concatMap should be used instead.

Concat

Concat works similarly to merge, but it waits until onComplete is called on the first Observable before moving on to the next, and so on. Note that if there is an infinite Observable, it will never move on to the Observable following it.
Observable.concat(
    Observable.interval(1, TimeUnit.SECONDS).map(t -> "Obs 1: " + t),
    Observable.interval(300, TimeUnit.MILLISECONDS)
        .map(t -> "Obs 2: " + t)
).subscribe(System.out::println);

sleep(3000);

Output:
Obs 1: 0
Obs 1: 1
Obs 1: 2
But if I take this example and modify it so that the first Observable isn't infinite, we can see that it maintains order, as expected:
Observable.concat(
    Observable.interval(1, TimeUnit.SECONDS)
        .map(t -> "Obs 1: " + t)
        .take(2),
    Observable.interval(300, TimeUnit.MILLISECONDS)
        .map(t -> "Obs 2: " + t)
).subscribe(System.out::println);

sleep(3000);

Output:
Obs 1: 0
Obs 1: 1
Obs 2: 0
Obs 2: 1
Obs 2: 2
You should also note that it didn't start Observable 2 until after Observable 1 had finished. Though if we modify Observable 2 and make it a hot observable:
Observable<String> obs1 = Observable.interval(1, TimeUnit.SECONDS)
    .map(t -> "Obs 1: " + t)
    .take(2);
ConnectableObservable<String> obs2 = 
    Observable.interval(300, TimeUnit.MILLISECONDS)
    .map(t -> "Obs 2: " + t)
    .publish();
Observable.concat(obs1, obs2)
    .subscribe(System.out::println);
obs2.connect();

sleep(3000);

Output:
Obs 1: 0
Obs 1: 1
Obs 2: 6
Obs 2: 7
Obs 2: 8
Obs 2: 9
You'll notice that it missed the first 6 emissions of Observable 2 (0 through 5). So while it will wait to start following Observables, it won't cache emissions, and anything that comes while it is waiting for the first Observable to finish will be lost.

Just like merge has mergeArray and mergeWith alternatives, there is also a concatArray and a concatWith. Syntactically, they are similar to mergeArray and mergeWith, but with the concat functionality under the hood. There is also concatMap, which is the concat equivalent to flatMap.

Ambiguous

The amb method (short for ambiguous) waits for one of the Observables to send an emission. Whichever Observable emits first wins, and from then on only that Observable's emissions are sent on. Also, just like the other methods here, there are ambArray and ambWith alternatives.

Zip

The zip method is used to combine emissions from multiple Observables into joint emissions on a single Observable. Note that the number of emissions sent out by a zip will be the same as the number of emissions sent by the Observable with the fewest emissions. This means that if you have three Observables, where the first one emits a, b, c, the second one emits 1, 2, 3, 4, 5, and the third one emits i, ii, iii, iv, and you zip these three together, then the zip will send out the combinations (a, 1, i), (b, 2, ii), and (c, 3, iii), while the emissions 4 and 5 on the second Observable and the emission iv on the third Observable will be ignored.
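
The pairing rule can be sketched in plain Java. This is a model of the semantics only, not RxJava code: ZipModel and its zip method are hypothetical names, and lists stand in for Observables. It pairs items by index and stops as soon as the shortest source runs out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model (not RxJava) of zip: emit one tuple per index until
// the shortest source runs out.
class ZipModel {
    static List<String> zip(List<String> a, List<String> b, List<String> c) {
        int count = Math.min(a.size(), Math.min(b.size(), c.size()));
        List<String> out = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            out.add("(" + a.get(i) + ", " + b.get(i) + ", " + c.get(i) + ")");
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> zipped = zip(
                Arrays.asList("a", "b", "c"),
                Arrays.asList("1", "2", "3", "4", "5"),
                Arrays.asList("i", "ii", "iii", "iv"));
        // Prints (a, 1, i), (b, 2, ii), (c, 3, iii), one per line.
        zipped.forEach(System.out::println);
    }
}
```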

Just like the other methods here, there are zipArray and zipWith alternatives.

CombineLatest

combineLatest works similarly to zip, except that each emission that comes through on any Observable will cause combineLatest to send an emission, and that emission will be a combination of the latest emissions of all Observables. Note that it won't emit anything until all Observables have emitted at least once. This is frequently useful when dealing with UI elements. For example, let's say that we had a simple UI with a checkbox and a dropdown menu with three options (1, 2, 3). Let's say the checkbox defaults to unchecked and the dropdown defaults to 1. We could wrap each element in an Observable and then use combineLatest. The first emission would combine (unchecked, 1). If a user then changes the dropdown value to 2, combineLatest would then emit (unchecked, 2). If the checkbox was then checked, it would emit (checked, 2), and so on.
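
The checkbox and dropdown scenario can be walked through with a small plain-Java model. This is a sketch of the rule only, not RxJava code: CombineLatestModel, its checkbox and dropdown methods, and the emitted list are hypothetical names, and each method call stands in for an emission from one source.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model (not RxJava) of combineLatest for two sources.
class CombineLatestModel {
    private String latestCheckbox;  // null until the first emission
    private String latestDropdown;  // null until the first emission
    final List<String> emitted = new ArrayList<>();

    void checkbox(String state) {
        latestCheckbox = state;
        emitIfReady();
    }

    void dropdown(String value) {
        latestDropdown = value;
        emitIfReady();
    }

    // Nothing is emitted until every source has produced a value.
    private void emitIfReady() {
        if (latestCheckbox != null && latestDropdown != null) {
            emitted.add("(" + latestCheckbox + ", " + latestDropdown + ")");
        }
    }

    public static void main(String[] args) {
        CombineLatestModel ui = new CombineLatestModel();
        ui.checkbox("unchecked"); // only one source has emitted: no output
        ui.dropdown("1");         // (unchecked, 1)
        ui.dropdown("2");         // (unchecked, 2)
        ui.checkbox("checked");   // (checked, 2)
        ui.emitted.forEach(System.out::println);
    }
}
```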

There isn't a combineLatestArray method, but there is an overload for combineLatest that will take in a list of Observables. The with operator equivalent is instead called withLatestFrom.

Grouping

The groupBy method allows you to take emissions and group them into GroupedObservables. The GroupedObservable has ready access to the key that you grouped on, so as you're processing the emissions in each GroupedObservable, you can easily reference the key as need be.

Wednesday, May 22, 2019

Learning RxJava: Thoughts on Chapter 3


Basic Operators

Chapter 3 covers basic operators, or the operators that don't involve combining multiple observables together. It groups these operators into the categories Suppressing, Transforming, Reducing, Collection, Error Recovery, and Action. And while it makes mention of a couple of the operations for combining observables, by and large that information is saved for the next chapter.

An interesting note that this chapter makes before jumping into the different types of operators is that "You should strive to execute as much logic as possible using RxJava operators," and that when you do so, "you can easily leverage the benefits of reactive programming, such as lower memory usage, flexible concurrency, and disposability."

Suppressing Operators

Suppressing operators will prevent certain emissions from continuing down the chain, while letting the others through, unchanged.

filter()

The filter function takes as its one parameter a lambda that takes the incoming emission as a parameter and outputs a boolean value. If the boolean value returned is true, the emission is sent on. If it is false, the emission is prevented from continuing.

In RxJava 2, filtering an Observable will return an Observable, filtering a Single will return a Maybe, and filtering a Maybe will return a Maybe.
Observable<String> filterResult =
    Observable.just("first", "second", "third", "fourth", "fifth")
    .filter(s -> s.length() == 5);
filterResult.subscribe(System.out::println);

Output:
first
third
fifth
Maybe<String> filterResult = Single.just("first")
    .filter(s -> s.length() == 5);
filterResult.subscribe(System.out::println);

Output:
first
Maybe<String> filterResult = Maybe.just("first")
    .filter(s -> s.length() == 5);
filterResult.subscribe(System.out::println);

Output:
first

In RxJava 1, filtering an Observable will return an Observable, but since there is no Maybe, a Single can't be filtered directly and must first be converted to an Observable. Note that this applies to most of the suppressing operators, so I won't make further mention of it unless it is different.
Observable.just("first", "second", "third", "fourth", "fifth")
    .filter(s -> s.length() == 5)
    .subscribe(System.out::println);

Output:
first
third
fifth
Single.just("first").toObservable()
    .filter(s -> s.length() == 5)
    .subscribe(System.out::println);

Output:
first

take()

The book states that take has two overloads. In recent versions of both RxJava 1 and 2, there are now three overloads. Take also only applies to Observables, and can't be called on Singles or Maybes, for both RxJava 1 and 2.

The first overload has a single long parameter called count, and for count value x, it will pass on the first x emissions and then call onComplete.
Observable.just("first", "second", "third", "fourth", "fifth")
    .take(3)
    .subscribe(System.out::println);

Output:
first
second
third
The second overload has a long parameter for time, and a TimeUnit parameter. It will pass on all emissions in the specified time period, and then call onComplete.
public static void main(String[] args) {
  Observable.interval(1, TimeUnit.SECONDS)
      .take(3, TimeUnit.SECONDS)
      .subscribe(System.out::println);

  sleep(5000);
}

private static void sleep(int millis) {
  try {
    Thread.sleep(millis);
  } catch (InterruptedException e) {
    e.printStackTrace();
  }
}

Output:
0
1
2
The third overload is like the second, but gives a third parameter for specifying a scheduler. Since schedulers haven't been covered by the book yet, I won't cover this here.

takeLast()

takeLast is similar to take, but takes from the end of the emissions instead of from the beginning. Under the hood, this means that it hangs onto emissions until the onComplete call is made, and then sends on the last x emissions. This could potentially result in high memory usage and/or long delays, depending on the emissions coming through. There are also a number of extra overloads that allow a count and a time period to be used together, options around delaying errors, and the option to give a hint at what the buffer size will need to be. This shouldn't be used with observables that never call onComplete.

skip()

skip works the same as take, but flips what is kept and what is removed. Past that it has the same options and overloads.
Observable.just("first", "second", "third", "fourth", "fifth")
    .skip(3)
    .subscribe(System.out::println);

Output:
fourth
fifth
public static void main(String[] args) {
  Observable.interval(1, TimeUnit.SECONDS)
      .skip(3, TimeUnit.SECONDS)
      .subscribe(System.out::println);

  sleep(5000);
}

private static void sleep(int millis) {
  try {
    Thread.sleep(millis);
  } catch (InterruptedException e) {
    e.printStackTrace();
  }
}

Output:
3
4

skipLast()

skipLast is to skip what takeLast is to take.

takeWhile()

takeWhile is a variant on take that takes in a lambda that returns a boolean. So long as the lambda returns true for each emission, the emissions will be sent on. As soon as an emission comes through that causes the lambda to return false, onComplete is called, and no other emissions will be sent, regardless of whether or not they would have passed the lambda.

skipWhile()

skipWhile also takes a lambda that returns a boolean. So long as the lambda returns true for each emission, the emissions will be skipped, until an emission is encountered for which the lambda returns false. Then all emissions from that point on are sent, without regard for the lambda condition.
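
The point that both operators stop consulting the lambda after its first false result is easy to miss, so here is a small plain-Java model of the two. It is a sketch of the semantics rather than RxJava code: WhileModel and its static methods are hypothetical names, and lists stand in for Observables.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Plain-Java model (not RxJava) of takeWhile and skipWhile.
class WhileModel {
    // Pass items on until the predicate first fails, then stop for good.
    static <T> List<T> takeWhile(List<T> items, Predicate<T> test) {
        List<T> out = new ArrayList<>();
        for (T item : items) {
            if (!test.test(item)) {
                break; // corresponds to calling onComplete
            }
            out.add(item);
        }
        return out;
    }

    // Drop items until the predicate first fails, then pass everything.
    static <T> List<T> skipWhile(List<T> items, Predicate<T> test) {
        List<T> out = new ArrayList<>();
        boolean skipping = true;
        for (T item : items) {
            if (skipping && test.test(item)) {
                continue;
            }
            skipping = false; // the predicate is never consulted again
            out.add(item);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> source = Arrays.asList(1, 2, 5, 1, 2);
        System.out.println(takeWhile(source, n -> n < 3)); // [1, 2]
        System.out.println(skipWhile(source, n -> n < 3)); // [5, 1, 2]
    }
}
```

Note that the trailing 1 and 2 are dropped by takeWhile and kept by skipWhile, even though both would satisfy n < 3 on their own.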

takeUntil()

takeUntil is like takeWhile, but uses an observable instead. It will pass emissions through up until the point where the observable emits its own emission, at which point takeUntil calls onComplete and sends no more emissions.

skipUntil()

Like takeUntil, skipUntil uses an observable, but it won't send any emissions until the observable emits something, after which it will send on all emissions.

distinct()

distinct will send each unique emission and filter out subsequent duplicates, based on the equals method for the object, or, if a lambda is supplied, according to the equals method for whatever object the lambda returns. To do this, it must keep a running list of what distinct emissions have already come through, and as such could become memory intensive, depending on the number of emissions. Note that in RxJava 2 there's an extra option that allows you to specify what kind of container the unique emission keys are kept in that is not present in RxJava 1.

distinctUntilChanged()

distinctUntilChanged is essentially the same as distinct, except it only keeps track of one key for comparison, that being the latest one. So the emissions 1,1,2,2,3,3,1,1 would come out as 1,2,3,1.
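
The difference in bookkeeping between the two operators can be shown with a plain-Java model. This is a sketch of the semantics, not RxJava's implementation: DistinctModel and its methods are hypothetical names, and the key-selector lambda variants are not modeled.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;

// Plain-Java model (not RxJava) of distinct and distinctUntilChanged.
class DistinctModel {
    // Remembers every item seen so far, so memory grows with unique items.
    static <T> List<T> distinct(List<T> items) {
        Set<T> seen = new HashSet<>();
        List<T> out = new ArrayList<>();
        for (T item : items) {
            if (seen.add(item)) { // add returns false for a duplicate
                out.add(item);
            }
        }
        return out;
    }

    // Remembers only the previous item, so memory use stays constant.
    static <T> List<T> distinctUntilChanged(List<T> items) {
        List<T> out = new ArrayList<>();
        boolean first = true;
        T previous = null;
        for (T item : items) {
            if (first || !Objects.equals(previous, item)) {
                out.add(item);
            }
            previous = item;
            first = false;
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> source = Arrays.asList(1, 1, 2, 2, 3, 3, 1, 1);
        System.out.println(distinct(source));             // [1, 2, 3]
        System.out.println(distinctUntilChanged(source)); // [1, 2, 3, 1]
    }
}
```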

elementAt()

elementAt is essentially an index lookup into the emissions sent. It'll emit the one at that index and no others. If the index is out of bounds, it will send no emissions and just call onComplete.

You can supply a default emission for when it is out of bounds. Note that in RxJava 1, this is done with a separate method called elementAtOrDefault, whereas in RxJava 2, it's an overloaded option for the elementAt method.

Also, RxJava 2 supports an elementAtOrError method, which RxJava 1 doesn't have. This method allows you to throw an error if the index is out of bounds, instead of having no emissions.

With both RxJava 1 and 2, this call can only be made on an Observable. In RxJava 1, it will return an Observable. In RxJava 2, the basic elementAt returns a Maybe, whereas if it is given a default value or if elementAtOrError is used, then the return type is Single.

Transforming Operators

Transforming operators will take the emissions and transform them (either each of them individually or all of them as a whole) to something else according to some rule.

map()

map does a one-to-one conversion of each emission based off the lambda passed in.

cast()

cast is really just a specialized type of map. cast(String.class) is essentially shorthand for map(obj -> (String) obj).

startWith()

startWith allows you to prepend an initial emission to an Observable, to be sent out before the rest of the emissions that will come through that Observable.

startWithArray()

startWithArray works the same as startWith, but allows you to specify multiple starting emissions instead of just one.

defaultIfEmpty()

If an Observable emits nothing before calling onComplete (an empty observable), defaultIfEmpty will allow for this empty observable to be replaced with an observable with a single emission, specified by the input value to the method.

switchIfEmpty()

switchIfEmpty is similar to defaultIfEmpty in that it will replace an empty observable with another observable, but in the case of switchIfEmpty, the observable to be used instead is specified completely by the input parameter, and as such, it could be an observable with multiple emissions, if that's what you need.

sorted()

sorted will take all the emissions and sort them before sending them on. It will do this according to the default Comparator, or according to a custom Comparator or lambda that you pass in. Note that this requires collecting all emissions until the upstream Observable calls onComplete, then sorting the emissions and sending them on. This could potentially mean high memory usage and delays.

delay()

This will delay each emission by a specified amount of time.

delaySubscription()

This will delay subscribing to the observable, as opposed to delaying each emission.

repeat()

When the observable that repeat is subscribed to calls onComplete, repeat will resubscribe to the observable. You can specify how many times it should repeat. If no parameter is given, this will continue indefinitely.

repeatUntil()

Similar to repeat, but takes a BooleanSupplier lambda, and will continue to repeat until the lambda returns true.

scan()

scan is essentially the same as reduce, but instead of waiting for the onComplete event and returning a single emission based off of all previous emissions, it will do a running total, of sorts. This is very useful for Observables that never complete.
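
A plain-Java model makes the relationship between the two explicit. It is a sketch of the semantics only, not RxJava code: ScanModel and its methods are hypothetical names, lists stand in for Observables, and only the variant without a starting value is modeled. Here scan returns every intermediate accumulation, and reduce is just the last of those.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.function.BinaryOperator;

// Plain-Java model (not RxJava) of scan versus reduce, without a seed.
class ScanModel {
    // Emits the running accumulation after every incoming item.
    static <T> List<T> scan(List<T> items, BinaryOperator<T> accumulator) {
        List<T> out = new ArrayList<>();
        T running = null;
        boolean first = true;
        for (T item : items) {
            running = first ? item : accumulator.apply(running, item);
            first = false;
            out.add(running);
        }
        return out;
    }

    // Emits only the final accumulation, once the source has completed.
    static <T> Optional<T> reduce(List<T> items, BinaryOperator<T> accumulator) {
        List<T> steps = scan(items, accumulator);
        return steps.isEmpty()
                ? Optional.empty()
                : Optional.of(steps.get(steps.size() - 1));
    }

    public static void main(String[] args) {
        List<Integer> source = Arrays.asList(1, 2, 3, 4, 5);
        System.out.println(scan(source, (total, n) -> total + n)); // [1, 3, 6, 10, 15]
        reduce(source, (total, n) -> total + n)
                .ifPresent(System.out::println);                   // 15
    }
}
```

Because reduce needs the last step, it can only produce a value for a source that completes, whereas the scan results are available as each item arrives.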

Reducing Operators

Reducing operators will take all emissions and combine them into one emission. Think summing numbers to get a total or counting all emissions.

count()

This counts the number of emissions and sends on a single emission with that total.

reduce()

reduce is used to combine all incoming emissions into a single emission. For instance:
Observable.just(1, 2, 3, 4, 5)
    .reduce((total, number) -> total + number)
    .subscribe(System.out::println);

Output:
15
Note that if you want the output to be of a different type than the inputs, then you'll need to supply a starting value. Compare the two following examples:
Observable.just("first", "second", "third", "fourth", "fifth")
    .reduce((total, word) -> total + word.length())
    .subscribe(System.out::println);

Output:
first6565
Observable.just("first", "second", "third", "fourth", "fifth")
    .reduce(0, (total, word) -> total + word.length())
    .subscribe(System.out::println);

Output:
27
Also, when working with a mutable variable, like a list or collection, collect should be used instead of reduce. Otherwise, you can run into issues with multiple subscribers. Here's a comparison of reduce vs collect in this scenario:
Single<ArrayList<String>> listObs = 
    Observable.just("first", "second", "third", "fourth", "fifth")
    .reduce(new ArrayList<>(), (list, value) -> {
      list.add(value);
      return list;
    });

listObs.subscribe(System.out::println);
listObs.subscribe(System.out::println);

Output:
[first, second, third, fourth, fifth]
[first, second, third, fourth, fifth, first, second, third, fourth, fifth]
Single<ArrayList<String>> listObs2 = 
    Observable.just("first", "second", "third", "fourth", "fifth")
    .collect(ArrayList::new, (list, value) -> list.add(value));

listObs2.subscribe(System.out::println);
listObs2.subscribe(System.out::println);

Output:
[first, second, third, fourth, fifth]
[first, second, third, fourth, fifth]

all()

all will return a single emission of either true or false. It will be true if the lambda returns true for all the incoming emissions, otherwise, it will be false. If all is called on an empty observable, it will emit true.

any()

any will return a single emission of either true or false. It will be true if the lambda returns true for any of the incoming emissions, otherwise, it will be false. If any is called on an empty observable, it will emit false.
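
The results for an empty observable follow naturally from how the two checks work, as this plain-Java model shows. It is a sketch of the logic, not RxJava code: AllAnyModel and its methods are hypothetical names, and lists stand in for Observables. all looks for a counterexample and any looks for a match, and an empty source has neither.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Predicate;

// Plain-Java model (not RxJava) of all and any.
class AllAnyModel {
    // False as soon as one item fails; true if no item ever fails.
    static <T> boolean all(List<T> items, Predicate<T> test) {
        for (T item : items) {
            if (!test.test(item)) {
                return false;
            }
        }
        return true; // also reached when there are no items at all
    }

    // True as soon as one item passes; false if no item ever passes.
    static <T> boolean any(List<T> items, Predicate<T> test) {
        for (T item : items) {
            if (test.test(item)) {
                return true;
            }
        }
        return false; // also reached when there are no items at all
    }

    public static void main(String[] args) {
        List<Integer> empty = Collections.emptyList();
        List<Integer> numbers = Arrays.asList(1, 2, 3);
        System.out.println(all(empty, n -> n > 0));   // true
        System.out.println(any(empty, n -> n > 0));   // false
        System.out.println(all(numbers, n -> n > 1)); // false
        System.out.println(any(numbers, n -> n > 2)); // true
    }
}
```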

contains()

contains will return true if any of the incoming emissions matches the given value.

Collection Operators

Collection Operators are actually a specific type of reduce operator, but there are enough of them to warrant pulling them out into their own group. Like reducing operators, these operators take multiple emissions and reduce them down to one emission, but in the case of collection operators, this one emission is a collection of everything (or most everything) that was emitted upstream, whether that be a list, map, set, or some other type of collection.

toList()

Takes the incoming emissions and puts them in a list, and then emits that list. The default list type used is ArrayList, but any Object that implements the list interface can be given instead.

toSortedList()

This works the same as toList, but it additionally will sort the emissions. A custom comparator or lambda can be specified for the sorting.

toMap()

This will take all emissions and put them into a map and emit that map. The key is determined by a lambda function. The value is either the emission itself, or specified by a lambda function. The default map implementation used is the HashMap, but any object that implements the map interface can be used. If two emissions have the same key, the later one will overwrite the former.

toMultimap()

Works similarly to toMap, but multiple emissions can map to the same key without overwriting each other. The values are put into lists.
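
The overwrite-versus-accumulate difference can be seen side by side in a plain-Java model. This is a sketch of the semantics, not RxJava code: MapModel and its methods are hypothetical names, lists stand in for Observables, and LinkedHashMap is used here (instead of the HashMap default mentioned above) only to keep the printed order predictable.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Plain-Java model (not RxJava) of toMap versus toMultimap.
class MapModel {
    // One value per key: a later item with the same key replaces the earlier one.
    static <T, K> Map<K, T> toMap(List<T> items, Function<T, K> keyOf) {
        Map<K, T> map = new LinkedHashMap<>();
        for (T item : items) {
            map.put(keyOf.apply(item), item);
        }
        return map;
    }

    // A list of values per key: items with the same key accumulate.
    static <T, K> Map<K, List<T>> toMultimap(List<T> items, Function<T, K> keyOf) {
        Map<K, List<T>> map = new LinkedHashMap<>();
        for (T item : items) {
            map.computeIfAbsent(keyOf.apply(item), k -> new ArrayList<>()).add(item);
        }
        return map;
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("first", "second", "third", "fourth", "fifth");
        Map<Integer, String> single = toMap(words, String::length);
        Map<Integer, List<String>> multi = toMultimap(words, String::length);
        System.out.println(single); // {5=fifth, 6=fourth}
        System.out.println(multi);  // {5=[first, third, fifth], 6=[second, fourth]}
    }
}
```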

collect()

collect is primarily used to collect emissions into a collection, particularly for any collection that doesn't have a more straightforward method available. See reduce for a comparison between reduce and collect.

Error Recovery Operators

Error recovery operators are just that, operators that allow you to recover from an error.

onErrorReturn()

If an error is emitted, onErrorReturn can be used to intercept that error, and then use the given lambda to supply an emission to be sent on.

onErrorReturnItem()

This works similarly to onErrorReturn, but instead of using a lambda, you simply specify the object that you want emitted in place of the error.

onErrorResumeNext()

This works similarly to the two previous, but instead you can supply an observable that will be used in place of the error.

retry()

If an error is received, this will resubscribe to the observable. It will reattempt the number of times specified. If the number of retries isn't given, it will retry indefinitely until it is successful.
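
The resubscribe-on-error loop can be modeled in plain Java. This is a rough sketch under stated assumptions, not RxJava code: RetryModel and its retry method are hypothetical names, a Supplier call stands in for subscribing to the source, and a thrown RuntimeException stands in for an onError event. The assumption here is that a limit of n means up to n resubscriptions after the initial attempt.

```java
import java.util.function.Supplier;

// Plain-Java model (not RxJava) of retry with a bounded number of retries.
class RetryModel {
    // Runs the source again after each failure, up to maxRetries extra attempts.
    static <T> T retry(Supplier<T> source, int maxRetries) {
        int failures = 0;
        while (true) {
            try {
                return source.get(); // "subscribe" to the source
            } catch (RuntimeException e) {
                failures++;
                if (failures > maxRetries) {
                    throw e; // retries exhausted: pass the error downstream
                }
            }
        }
    }

    public static void main(String[] args) {
        int[] attempts = {0};
        String result = retry(() -> {
            attempts[0]++;
            if (attempts[0] < 3) {
                throw new IllegalStateException("attempt " + attempts[0] + " failed");
            }
            return "ok after " + attempts[0] + " attempts";
        }, 3);
        System.out.println(result); // ok after 3 attempts
    }
}
```

An unbounded retry would drop the failure counter entirely, which is why retrying indefinitely against a source that never recovers will loop forever.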

retryUntil()

retryUntil is like retry, but a lambda can be supplied to determine whether or not to retry.

retryWhen()

This supports advanced composition that will allow things such as delaying retries.

Action Operators

Action operators are useful for when you want to cause a side effect. One particularly useful way to use these operators is when debugging and you need to get some insight into how things look at a particular point in the chain.

doOnNext()

This allows a lambda to be called on each emission. This lambda does not have a return value, and the emissions sent on by the doOnNext are the same emissions that it receives. This is useful for creating side effects or for debugging.

doOnComplete()

This allows a lambda to be called when the onComplete is called in this part of the chain.

doOnError()

This allows for a lambda to be called if the onError is called in this part of the chain. Useful for debugging and determining where an error originates from.

doOnEach()

doOnEach takes a single lambda that receives each event wrapped in a Notification, whether it is an onNext, onError, or onComplete (alternatively, it can be given a full Observer). It's essentially doOnNext, doOnError, and doOnComplete all wrapped into one.

doOnTerminate()

This allows you to give a single lambda that you want executed on either onComplete or onError.

doOnSubscribe()

This allows you to give a single lambda that you want executed on the initial subscription.

doOnDispose()

If you unsubscribe from an observable before it completes, this will allow you to call a lambda on that event.

doFinally()

This is like doOnTerminate, but also takes into account the dispose/unsubscribe event.

doOnSuccess()

This is specific to the Single and Maybe types, and should be treated similarly to doOnNext.

Wednesday, May 15, 2019

Learning RxJava: Thoughts on Chapter 2


The Observable and the Observer

The two most fundamental components in Rx are the observable and the observer. "For a given Observable<T>, it pushes items (called emissions) of type T through a series of operators until it finally arrives at a final Observer, which consumes the items."

The observable works by passing three types of events: onNext, onError, and onComplete (note in RxJava 1.0, it's onCompleted), and the observer works by intercepting these three types of events. A very basic example of this is as follows:

Observable.create(emitter -> {
  try {
    emitter.onNext("First");
    emitter.onNext("Second");
    emitter.onNext("Third");
    emitter.onNext("Fourth");
    emitter.onNext("Fifth");
    emitter.onComplete();
  } catch (Throwable e) {
    emitter.onError(e);
  }
}).subscribe(
    next -> System.out.println("Next: " + next),
    error -> System.out.println("Error: " + error),
    () -> System.out.println("Complete"));

Side Note: RxJava 1

Note that in the Learning RxJava book, it states that in RxJava 1.0, you would need to use the fromEmitter method, like so:

Observable.fromEmitter(emitter -> {
  try {
    emitter.onNext("First");
    emitter.onNext("Second");
    emitter.onNext("Third");
    emitter.onNext("Fourth");
    emitter.onNext("Fifth");
    emitter.onCompleted();
  } catch (Throwable e) {
    emitter.onError(e);
  }
}, Emitter.BackpressureMode.BUFFER).subscribe(
    next -> System.out.println("Next: " + next),
    error -> System.out.println("Error: " + error),
    () -> System.out.println("Complete"));

And this was accurate for the version of RxJava 1.0 that is listed in the book, namely 1.2.3. But in version 1.2.7, a create method was added to mirror RxJava 2.0, and the fromEmitter method was marked as deprecated, and then subsequently removed in version 1.3.0. In the latest version of RxJava 1.0, this example should be written as follows:

Observable.create(emitter -> {
  try {
    emitter.onNext("First");
    emitter.onNext("Second");
    emitter.onNext("Third");
    emitter.onNext("Fourth");
    emitter.onNext("Fifth");
    emitter.onCompleted();
  } catch (Throwable e) {
    emitter.onError(e);
  }
}, Emitter.BackpressureMode.BUFFER).subscribe(
    next -> System.out.println("Next: " + next),
    error -> System.out.println("Error: " + error),
    () -> System.out.println("Complete"));

Operations are Observables and Observers

Now this example doesn't really do anything interesting, since it's often in the operations between the observable and observer where the interesting stuff happens, but one interesting note that the book does point out is that each intermediate operator is both an observer and an observable. In essence, the following code:

Observable.<String>create(emitter -> {
  try {
    emitter.onNext("First");
    emitter.onNext("Second");
    emitter.onNext("Third");
    emitter.onNext("Fourth");
    emitter.onNext("Fifth");
    emitter.onComplete();
  } catch (Throwable e) {
    emitter.onError(e);
  }
})
    .map(String::length)
    .subscribe(
        next -> System.out.println("Next: " + next),
        error -> System.out.println("Error: " + error),
        () -> System.out.println("Complete"));

Could be written as follows:

Observable<String> obs = Observable.create(emitter -> {
  try {
    emitter.onNext("First");
    emitter.onNext("Second");
    emitter.onNext("Third");
    emitter.onNext("Fourth");
    emitter.onNext("Fifth");
    emitter.onComplete();
  } catch (Throwable e) {
    emitter.onError(e);
  }
});

Observable<Integer> mapStringToLength = Observable.create(emitter -> {
  obs.subscribe(
      next -> emitter.onNext(next.length()),
      error -> emitter.onError(error),
      () -> emitter.onComplete());
});

mapStringToLength.subscribe(
    next -> System.out.println("Next: " + next),
    error -> System.out.println("Error: " + error),
    () -> System.out.println("Complete"));

Now, that's not to say that we should be doing this. It just makes for an interesting angle to think about it from.

Observers and Disposables

At this point the book takes a detour to talk about observers, though it saves the topic of disposables until the end of the chapter. I'll talk about both observers and disposables here so as to keep my thoughts together.

An observer is essentially just the termination point for handling onNext, onError, and onComplete events. New to RxJava 2 is the onSubscribe method in observers. This method gives you access to the disposable. The disposable gives the observer a way to unsubscribe from the observable early, which is important for observables of infinite length. The onSubscribe method allows you to expose the disposable to the onNext, onError, and onComplete methods for use there.

If you don't pass an observer to the subscribe method, but instead use lambdas, then the disposable is returned by the subscribe method.

In RxJava 1, things work a bit differently. The observer is instead typically a Subscriber, and the disposable is instead called a Subscription. The Subscriber does not have an onSubscribe method, and as such using the subscription in the onNext, onError, or onCompleted methods isn't as straightforward, though it can still be done. (A Subscriber also implements Subscription itself, so calling its own unsubscribe() from within onNext is another option.) See the following example for how this can be done in RxJava 1:

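import java.util.concurrent.TimeUnit;

import rx.Observable;
import rx.Subscriber;
import rx.Subscription;

public class RxJava1Samples {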

  public static void main(String[] args) {
    System.out.println("Start");
    MySubscriber subscriber = new MySubscriber();
    Subscription subscription = Observable.interval(1, TimeUnit.SECONDS)
        .subscribe(subscriber);
    subscriber.setSubscription(subscription);
    sleep(10000);
    System.out.println("Finish");
  }

  private static void sleep(int millis) {
    try {
      Thread.sleep(millis);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }

  private static class MySubscriber extends Subscriber<Long> {

    // Set from the main thread and read on the interval's thread, hence volatile.
    private volatile Subscription subscription;

    @Override
    public void onCompleted() {
      System.out.println("Complete");
    }

    @Override
    public void onError(Throwable e) {
      e.printStackTrace();
    }

    @Override
    public void onNext(Long next) {
      System.out.println("Next: " + next);
      // Stop listening once the value passes 5.
      if (next > 5 && subscription != null) {
        subscription.unsubscribe();
      }
    }

    private void setSubscription(Subscription subscription) {
      this.subscription = subscription;
    }
  }
}

If this code is run it gives the following output:

Start
Next: 0
Next: 1
Next: 2
Next: 3
Next: 4
Next: 5
Next: 6
Finish

Cold Observables vs Hot Observables

The book next jumps into cold observables vs hot observables. In essence, a cold observable doesn't send any emissions until it is subscribed to, and it sends the full set of emissions to each subscriber, regardless of when that subscriber subscribed. Hot observables, on the other hand, start sending emissions immediately and don't repeat earlier emissions for late subscribers.
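To make the distinction concrete, here is a minimal plain-Java sketch. It deliberately avoids RxJava so that it can run on its own, which means ColdSource and HotSource are hypothetical stand-ins I made up for illustration and not RxJava types. The cold one replays its full data to every subscriber, while the hot one only forwards values that are pushed after a subscriber has joined.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

public class ColdVsHotSketch {

  // Hypothetical "cold" source: every subscriber gets the full sequence from the start.
  static class ColdSource {
    private final List<String> items;

    ColdSource(List<String> items) {
      this.items = items;
    }

    void subscribe(Consumer<String> observer) {
      items.forEach(observer);
    }
  }

  // Hypothetical "hot" source: a value only reaches whoever is subscribed when it is pushed.
  static class HotSource {
    private final List<Consumer<String>> observers = new ArrayList<>();

    void subscribe(Consumer<String> observer) {
      observers.add(observer);
    }

    void push(String item) {
      observers.forEach(observer -> observer.accept(item));
    }
  }

  // Returns what an early and a late subscriber each receive from the cold source.
  static List<List<String>> runCold() {
    ColdSource cold = new ColdSource(Arrays.asList("First", "Second", "Third"));
    List<String> early = new ArrayList<>();
    List<String> late = new ArrayList<>();
    cold.subscribe(early::add);
    cold.subscribe(late::add); // subscribing later still replays everything
    return Arrays.asList(early, late);
  }

  // Returns what an early and a late subscriber each receive from the hot source.
  static List<List<String>> runHot() {
    HotSource hot = new HotSource();
    List<String> early = new ArrayList<>();
    List<String> late = new ArrayList<>();
    hot.subscribe(early::add);
    hot.push("First");
    hot.subscribe(late::add); // joins after "First" has already gone out
    hot.push("Second");
    hot.push("Third");
    return Arrays.asList(early, late);
  }

  public static void main(String[] args) {
    System.out.println("Cold: " + runCold());
    System.out.println("Hot:  " + runHot());
  }
}
```

In this sketch both cold subscribers end up with all three values, while the late hot subscriber only sees "Second" and "Third".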

It would seem that cold observables are frequently used for data and hot observables for events, though that's not always the case, and there can be advantages to using hot observables with data in certain situations. This will be covered in more depth in a later chapter, so I don't feel a need to dig too far into it here. It is worth noting, though, that a cold observable can be turned into a hot one by calling the publish method, which returns a ConnectableObservable.

Other Observable Factories and Types

At this point the book covers a number of other observable factories. I'll give a quick summary for each.

Observable.create(): Used at the beginning of this article for illustrative purposes, but it honestly doesn't need to be used very often because other observable factories are usually sufficient and much more succinct. But if there isn't an observable factory that fits your needs, this can be used.

Observable.just(): Just emits what it's given. The Observable.create from the beginning of this article could be rewritten as follows: Observable.just("First", "Second", "Third", "Fourth", "Fifth").

Observable.range(): Perhaps the most important thing to note here is that the parameters are start and count, not first and last, so range(5, 10) will emit: 5, 6, 7, 8, 9, 10, 11, 12, 13, 14.

Observable.interval(): Emits at a specified interval. This runs on a separate thread, but is still a cold observable. Each subscriber gets its own timer, so with multiple subscribers the emissions will not be in sync unless you first make it into a hot observable.

Observable.fromFuture(): Easy way to convert futures from other libraries into observables.

Observable.empty(): Emits nothing. Immediately calls onComplete. Useful for various scenarios.

Observable.never(): Emits nothing. Doesn't complete. Mainly used for testing. Would be useful for testing timeouts when waiting on resources, I imagine.

Observable.error(): Immediately calls onError with the exception that you give it. Mainly used for testing. Useful for testing exception handling.

Observable.defer(): Creates a fresh observable for each observer at subscription time. Good for picking up state changes that happen between subscriptions from different observers.

Observable.fromCallable(): Good for wrapping code that would otherwise run eagerly and could throw an exception, when you want to keep that exception in the observable chain.
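Two of the entries above, range() and defer(), are easier to see with a small example. What follows is a plain-Java analogy rather than RxJava code, so that it runs without the library: the range helper is a hypothetical method that mirrors the start and count contract, and a Supplier stands in for defer() by rebuilding the sequence each time it is asked. The emissionCount field is likewise just an illustrative piece of state.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;

public class DeferSketch {

  // Illustrative state that changes between two "subscriptions".
  static int emissionCount = 3;

  // Hypothetical helper with the same contract as range(): count values beginning at start.
  static List<Integer> range(int start, int count) {
    List<Integer> values = new ArrayList<>();
    for (int i = 0; i < count; i++) {
      values.add(start + i);
    }
    return values;
  }

  // Returns the eagerly built sequence and the deferred one, in that order.
  static List<List<Integer>> run() {
    emissionCount = 3;

    // Built once, right now: emissionCount is read a single time.
    List<Integer> eager = range(1, emissionCount);

    // Built on demand: emissionCount is read again on every get().
    Supplier<List<Integer>> deferred = () -> range(1, emissionCount);

    emissionCount = 5; // the state changes before the next consumer arrives

    return Arrays.asList(eager, deferred.get());
  }

  public static void main(String[] args) {
    System.out.println("range(5, 10): " + range(5, 10));
    List<List<Integer>> results = run();
    System.out.println("Eager:    " + results.get(0));
    System.out.println("Deferred: " + results.get(1));
  }
}
```

The eager list still holds three values after the change, while the deferred one sees the new count of five. That is the same reason to reach for defer() when an observable is built from state that can change between observers.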

Some specialized observable types are Single, Maybe, and Completable. A Single has exactly one emission, which is delivered through onSuccess instead of onNext followed by onComplete. A Maybe has either 0 or 1 emissions: it calls onSuccess if there is a value and onComplete if there isn't. A Completable runs a process that doesn't produce an emission, and then calls onComplete.

Note on RxJava 1 to RxJava 2 Upgrade

RxJava 1 allows nulls. RxJava 2 does not. This will probably be one of the hardest parts of an upgrade. One way to keep the problem from getting worse is to avoid emitting nulls in RxJava 1 in the first place.

Learning RxJava: Thoughts on Chapter 1

This blog post was written while reading chapter 1 of the Learning RxJava book by Thomas Nield. While I've tried to write it in such a way that it is understandable on its own, many points will still make reference to the book itself.

A Brief History

A number of years ago, object oriented programming took the programming world by storm, because it promised to help us deal with the ever growing complexity. But as time goes on, we find that while many of the principles taught in object oriented programming can be quite useful, there are a growing number of situations where they are inadequate.

In time, functional programming began to make a comeback, not to replace object oriented programming, but rather in many cases to complement and enhance it. In particular, reactive programming, a "functional event-driven programming approach", began to grow in popularity.

While there are multiple reactive frameworks available, including Akka and Sodium, the one that we will be focusing on here is called Reactive Extensions (also known as ReactiveX or Rx). Reactive Extensions was initially developed at Microsoft by Erik Meijer for .NET, but over the years it has been ported to many other languages.

The port for Java, called RxJava, was developed primarily by Ben Christensen from Netflix and David Karnok. RxJava 1.0 was released in November 2014 and RxJava 2.0 was released in November 2016. RxJava has been the core that other JVM based Rx ports were built off of.

What is Rx?

The core idea behind Rx is that "events are data and data are events". While the world is full of objects, the objects themselves are less important than the actions they perform, and the actions performed against them. In particular, how these events act and interact when they come together tends to be where we place the greatest value. Reactive programming strives to clearly model these events.

Even static sources of data, such as a book or a music CD, can be thought of in terms of events. While the information in a book does not change, the act of reading it is an action, an event occurring between the book and the person reading it. The words being read form a stream of information flowing to the reader. Playing music from a CD works in a similar fashion. Data is read from the disc sequentially and played for the listener to hear. Rx uses this concept to treat data as events, in turn allowing us to use the same powerful tools in both cases.

Now RxJava is fairly similar to Java 8 streams, at least in syntax. So what's the major difference between the two? In essence, Java 8 streams pull, whereas RxJava observables push. Why is this important? Because both data and events can be pushed. It means that the observable is in control of when the data is sent. In this way it more closely resembles the listener pattern, with a syntax similar to streams.
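The pull vs push difference can be sketched in plain Java without any library. In the example below, pull uses an ordinary Iterator, where the consumer decides when to ask for the next value. The push method is a hypothetical stand-in for an observable: the source owns the loop and calls the listener whenever it has something. The class and method names are mine, not from RxJava or the book.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

public class PullVsPushSketch {

  // Pull: the consumer drives the loop and asks for each value when it is ready for it.
  static List<String> pull(List<String> data) {
    List<String> received = new ArrayList<>();
    Iterator<String> iterator = data.iterator();
    while (iterator.hasNext()) {
      received.add(iterator.next().toUpperCase());
    }
    return received;
  }

  // Push: the source drives the loop and calls the listener whenever it has a value.
  static void push(List<String> data, Consumer<String> listener) {
    for (String item : data) {
      listener.accept(item);
    }
  }

  // The consumer only registers a callback; it has no say in when values arrive.
  static List<String> runPush(List<String> data) {
    List<String> received = new ArrayList<>();
    push(data, item -> received.add(item.toUpperCase()));
    return received;
  }

  public static void main(String[] args) {
    List<String> data = Arrays.asList("First", "Second", "Third");
    System.out.println("Pulled: " + pull(data));
    System.out.println("Pushed: " + runPush(data));
  }
}
```

Both versions produce the same values here. The difference is only in who controls the timing, which is what lets the push style handle events as well as data.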

RxJava 1 vs RxJava 2

Where I work, we currently use RxJava 1.0, but we hope to upgrade to RxJava 2.0 at some point. As such, as I go through this material, I will be looking to cover both RxJava 1.0 and RxJava 2.0, as well as potential upgrade paths between the two. Of note in this regard, the book mentions the RxJava2Interop project to help bridge between the two versions, which looks promising and will be worth exploring in more depth at a later time.

Book Review: Learning RxJava by Thomas Nield

Where I work, we use RxJava heavily, but we are limiting ourselves to only a handful of features and ignoring a great deal of what RxJava has to offer. To this end, I've chosen to go through the Learning RxJava book with the intention of better utilizing the tools at my disposal. In the following blog posts I'll be reviewing each chapter in the book and looking at how I can apply what's being taught. I intend to update this post with links to each chapter summary as I write them, to serve as a table of contents, of sorts.

Learning RxJava: Thoughts on Chapter 1

Learning RxJava: Thoughts on Chapter 2

Learning RxJava: Thoughts on Chapter 3

Learning RxJava: Thoughts on Chapter 4

Learning RxJava: Thoughts on Chapter 5

Learning RxJava: Thoughts on Chapter 6

Learning RxJava: Thoughts on Chapter 7

Learning RxJava: Thoughts on Chapter 8

Learning RxJava: Thoughts on Chapter 9

Learning RxJava: Thoughts on Chapter 10

Learning RxJava: Thoughts on Chapter 11

Learning RxJava: Thoughts on Chapter 12