Wednesday, May 22, 2019

Learning Rxjava: Thoughts on Chapter 3


Basic Operators

Chapter 3 covers basic operators, or the operators that don't involve combining multiple observables together. It categorizes these operators into the categories Suppressing, Transforming, Reducing, Collection, Error Recovery, and Action. And while it make mention of a couple of the operations for combining observables, by and large that information is saved for the next chapter.

An interesting note that this chapter makes before jumping into the different types of operators is that "You should strive to execute as much logic as possible using RxJava operators," and that when you do so, "you can easily leverage the benefits of reactive programming, such as lower memory usage, flexible concurrency, and disposability."

Suppressing Operators

Suppressing operators will prevent certain emission from continuing down the chain, while letting the others through, unchanged.

filter()

The filter function takes for it's one parameter a lambda that takes incoming emission as a parameter and outputs a boolean value. If the boolean value returned is true, the emission is sent on. If it is false, the emission is prevented from continuing.

In RxJava 2, filtering an Observable with return an Observable, filtering a Single will return a Maybe, and filtering a Maybe will return a Maybe.
Observable<string> filterResult =
    Observable.just("first", "second", "third", "fourth", "fifth")
    .filter(s -> s.length() == 5);
filterResult.subscribe(System.out::println);

Output:
first
third
fifth
Maybe<String> filterResult = Single.just("first")
    .filter(s -> s.length() == 5);
filterResult.subscribe(System.out::println);

Output:
first
Maybe<String> filterResult = Maybe.just("first")
    .filter(s -> s.length() == 5);
filterResult.subscribe(System.out::println);

Output:
first

In RxJava 1, filtering an Observable will return an Observable, but since there is no Maybe, a Single can't be filtered directly, but must be first converted to an Observable. Note that this applies to most of the suppressing operators, and so I won't make further mention on it unless it is different.
Observable.just("first", "second", "third", "fourth", "fifth")
    .filter(s -> s.length() == 5)
    .subscribe(System.out::println);

Output:
first
third
fifth
Single.just("first").toObservable()
    .filter(s -> s.length() == 5)
    .subscribe(System.out::println);

Output:
first

take()

The book states that take has two overloads. In recent version of both RxJava 1 and 2, there are now three overloads. Take also only applies to Observables, and can't be called on Singles or Maybes, for both RxJava 1 and 2.

The first overload has a single long parameter called count, and for count value x, it will pass on the first x emissions and then call onComplete.
Observable.just("first", "second", "third", "fourth", "fifth")
    .take(3)
    .subscribe(System.out::println);

Output:
first
second
third
The second overload has a long parameter for time, and a TimeUnit parameter. It will pass on all emissions in the specified time period, and then call onComplete.
public static void main(String[] args) {
  Observable.interval(1, TimeUnit.SECONDS)
      .take(3, TimeUnit.SECONDS)
      .subscribe(System.out::println);

  sleep(5000);
}

private static void sleep(int millis) {
  try {
    Thread.sleep(millis);
  } catch (InterruptedException e) {
    e.printStackTrace();
  }
}

Output:
0
1
2
The third overload is like the second, but gives a third parameter for specifying a scheduler. Since schedulers haven't been covered by the book yet, I won't cover this here.

takeLast()

takeLast is similar to take, but will instead take from the end of the emissions instead of from the beginning. Under the hood, this means that it hangs onto all the emissions until the onComplete call is made, and then chooses the last x emissions to send on. This could potentially result in high memory usage and/or long delays, depending on the emissions coming through. There are also a number of extra overloads that will allow a count and a time period to be used together, options around delaying errors, and the option to give a hint at what the buffer size will need to be. This shouldn't be used with observables that never call onComplete.

skip()

skip works the same as take, but flips what is kept and what is removed. Past that it has the same options and overloads.
Observable.just("first", "second", "third", "fourth", "fifth")
    .skip(3)
    .subscribe(System.out::println);

Output:
fourth
fifth
public static void main(String[] args) {
  Observable.interval(1, TimeUnit.SECONDS)
      .skip(3, TimeUnit.SECONDS)
      .subscribe(System.out::println);

  sleep(5000);
}

private static void sleep(int millis) {
  try {
    Thread.sleep(millis);
  } catch (InterruptedException e) {
    e.printStackTrace();
  }
}

Output:
3
4

skipLast()

skipLast is to skip what takeLast is to take.

takeWhile()

takeWhile is a variant on take that takes in a lambda that returns a boolean. So long as each emission is true, each emission will be sent on. As soon as an emission comes through that causes the lambda to return false, onComplete is called, and no other emissions will be sent, regardless of whether or not they would've passed through the lambda successfully.

skipWhile()

skipWhile also takes a lambda that returns a boolean. So long as each emission is true, the emissions will be skipped, until an emission is encountered that comes through as false. Then all emissions from that point on are sent, without regard for the lambda condition.

takeUntil()

takeUntil is like takeWhile, but uses an observable instead. It will pass emissions through up until the point where the observable emits it's own emission, and which point takeUntil calls onComplete and sends no more emissions.

skipUntil()

Like takeUntil, skipUntil uses an observable, but it won't send any emissions until the observable emits something, after which it will send on all emissions.

distinct()

distinct will send each unique emission and filter out subsequent duplicates, based on the equals method for the object, or, if a lambda is supplied, according to the equals method for whatever object the lambda returns. To do this, it must keep a running list of what distinct emissions have already come through, and as such could become memory intensive, depending on the number of emissions. Note that in RxJava 2 there's an extra option that allows you to specify what kind of container the unique emission keys are kept in that is not present in RxJava 1.

distinctUntilChanged()

distinctUntilChanged is essentially the same as distinct, except it only keeps track of one key for comparison, that being the latest one. So the emissions 1,1,2,2,3,3,1,1 would come out as 1,2,3,1.

elementAt()

elementAt is essentially an index lookup into the emissions sent. It'll emit the one at that index and no others. If the index is out of bounds, it will send no emissions and just call onComplete.

You can supply a default emission for when it is out of bounds. Note that in RxJava 1, this is done with a separate method called elementAtOrDefault, whereas in RxJava 2, it's an overloaded option for the elementAt method.

Also, RxJava 2 supports an elementAtOrError method, which RxJava 1 doesn't have. This method allows you to throw an error if the index is out of bounds, instead of having no emissions.

With both RxJava 1 and 2, this call can only be made on an Observable. In RxJava 1, it will return an Observable. In RxJava 2, the basic elementAt returns a Maybe, whereas if it is given a default value or if elementAtOrError is used, then the return type is Single.

Transforming Operators

Transforming operators will take the emissions and transform them (either each of them individually or all of them as a whole) to something else according to some rule.

map()

map does a one-to-one conversion of each emission based off the lambda passed in.

cast()

cast is really just a specialized type of map. cast(String.class) is essentially shorthand for map(obj -> (String) obj).

startWith()

startWith allows you to append an initial emission to an Observable, to be sent out before the rest of the emissions that will come through that Observable.

startWithArray()

startWithArray works the same as startWith, but allows you to specify multiple starting emissions instead of just one.

defaultIfEmpty()

If an Observable emits no emissions before calling on complete (an empty observable), defaultIfEmpty will allow for this empty observable to be replaced with an observable with a single emission, specified by the input value to the method.

switchIfEmpty()

switchIfEmpty is similar to defaultIfEmpty in that it will replace an empty observable with another observable, but in the case of switchIfEmpty, the observable to be used instead is specified completely by the input parameter, and as such, it could be an observable with multiple emissions, if that's what you need.

sorted()

sorted will take all the emissions and sort them before sending them on. It will do this according to the default Comparator, or according to a custom Comparator or lambda that you pass in. Note that this requires collecting all emissions until the previous Observer calls onComplete, then sorting the emissions and sending them on. This could potentially mean high memory usage and delays.

delay()

This will delay each emission by a specified amount of time.

delaySubscription()

This will delay subscribing to the observable, as opposed to delaying each emission.

repeat()

When the observable that repeat is subscribed to calls onComplete, repeat will resubscribe to the observable. You can specify how many times it should repeat. If no parameter is given, this will continue indefinitely.

repeatUntil()

Similar to repeat, but takes a Boolean Subscriber lambda, and will continue to repeat until the lambda returns true.

scan()

scan is essentially the same as reduce, but instead of waiting for the onComplete event and returning a single emission based off of all previous emissions, it will do a running total, of sorts. This is very useful for Observables that never complete.

Reducing Operators

Reducing operators will take all emissions and combine them into one emission. Think summing numbers to get a total or counting all emissions.

count()

This counts the number of emissions and sends on a single emission with that total.

reduce()

reduce is used to combine all incoming emissions into a single emission. For instance:
Observable.just(1, 2, 3, 4, 5)
    .reduce((total, number) -> total + number)
    .subscribe(System.out::println);

Output:
15
Note that if you want the output to be of a different type than the inputs, then you'll need to supply a starting value. Compare the two following examples:
Observable.just("first", "second", "third", "fourth", "fifth")
    .reduce((total, word) -> total + word.length())
    .subscribe(System.out::println);

Output:
first6565
Observable.just("first", "second", "third", "fourth", "fifth")
    .reduce(0, (total, word) -> total + word.length())
    .subscribe(System.out::println);

Output:
27
Also, when working with a mutable variable, like a list or collection, collect should be used instead of reduce. Otherwise, you can run into issues with multiple subscribers. Here's a comparison of reduce vs collect in this scenario:
Single<ArrayList<String>> listObs = 
    Observable.just("first", "second", "third", "fourth", "fifth")
    .reduce(new ArrayList<>(), (list, value) -> {
      list.add(value);
      return list;
    });

listObs.subscribe(System.out::println);
listObs.subscribe(System.out::println);

Output:
[first, second, third, fourth, fifth]
[first, second, third, fourth, fifth, first, second, third, fourth, fifth]
Single<ArrayList<String>> listObs2 = 
    Observable.just("first", "second", "third", "fourth", "fifth")
    .collect(ArrayList::new, (list, value) -> list.add(value));

listObs2.subscribe(System.out::println);
listObs2.subscribe(System.out::println);

Output:
[first, second, third, fourth, fifth]
[first, second, third, fourth, fifth]

all()

all will return a single emission of either true or false. It will be true if the lambda returns true for all the incoming emissions, otherwise, it will be false. If all is called on an empty observable, it will emit true.

any()

any will return a single emission of either true or false. It will be true if the lambda returns true for any of the incoming emissions, otherwise, it will be false. If any is called on an empty observable, it will emit false.

contains()

contains will return true if any of the incoming emissions matches the given value.

Collection Operators

Collection Operators are actually a specific type of reduce operator, but there's enough of them to warrant pulling them out into their own group. Like reducing operators, these operators take multiple emissions and reduce them down to one emission, but in the case of collection operators, this one emission is a collection of everything (or most everything) that was emitting by the incoming emissions, whether that be a list, map, set, or some other type of collection.

toList()

Takes the incoming emissions and puts them in a list, and then emits that list. The default list type used is ArrayList, but any Object that implements the list interface can be given instead.

toSortedList()

This works the same as toList, but it additionally will sort the emissions. A custom comparator or lambda can be specified for the sorting.

toMap()

This will take all emissions and put them into a map and emit that map. The key is determined by a lambda function. The value is either the emission itself, or specified by a lambda function. The default map implementation used is the HashMap, but any object that implements the map interface can be used. If two emissions have the same key, the later one will overwrite the former.

toMultiMap()

Works similarly to toMap, but the multiple emissions can match to the same key without overwriting each other. The values are put into lists.

collect()

collect is primarily used to collect emissions into a collection, particularly for any collection that doesn't have a more straightforward method available. See reduce for a comparison between reduce and collect.

Error Recovery Operators

Error recovery operators are just that, operators that allow you to recover from an error.

onErrorReturn()

If an error is emitted, onErrorReturn can be used to intercept that error, and then use the given lambda to supply an emission to be sent on.

onErrorReturnItem()

This works similarly to onErrorReturn, but instead of using a lambda, you simply specify the object that you want emitted in place of the error.

onErrorResumeNext()

This works similarly to the two previous, but instead you can supply an observable that will be used in place of the error.

retry()

If an error is received, this will resubscribe to the observable. It will reattempt the number of times specified. If the number of retries isn't given, it will retry indefinitely until it is successful.

retryUntil()

retryUntil is like retry, but a lambda can be supplied to determine whether or not to retry.

retryWhen()

This supports advanced composition that will all things such as delaying retries.

Action Operators

Action operators are useful for when you want to cause a side effect. One particularly useful way to use these operators is when debugging and you need to get some insight into how things look at a particular point in the chain.

doOnNext()

This allows a lambda to be called on each emission. This lambda does not have a return value, and the emissions sent on by the doOnNext are the same emissions that it receives. This is useful for creating side effects or for debugging.

doOnComplete()

This allows a lambda to be called when the onComplete is called in this part of the chain.

doOnError()

This allows for a lamdba to be called if the onError is called in this part of the chain. Useful for debugging and determining where an error originates from.

doOnEach()

doOnEach takes in three lambdas, one  for onNext, one for onError, and one for onComplete. It's essentially the doOnNext, doOnError, and doOnComplete all wrapped into one.

doOnTerminate()

This allows you to give a single lambda that you want executed on either onComplete or onError.

doOnSubscribe()

This allows you to give a single lambda that you want executed on the initial subscription.

doOnDispose()

If you unsubscribe from an observable before it completes, this will allow you to call a lambda on that event.

doFinally()

This is like doOnTerminate, but also takes into account the dispose/unsubscribe event.

doOnSuccess()

This is specific to the Single and Maybe types, and should be treated similarly to the doOnNextEvent.