Basic Operators
Chapter 3 covers basic operators, that is, the operators that don't involve combining multiple observables. It groups these operators into the categories Suppressing, Transforming, Reducing, Collection, Error Recovery, and Action. While it makes mention of a couple of the operations for combining observables, by and large that information is saved for the next chapter.
An interesting note that this chapter makes before jumping into the different types of operators is that "You should strive to execute as much logic as possible using RxJava operators," and that when you do so, "you can easily leverage the benefits of reactive programming, such as lower memory usage, flexible concurrency, and disposability."
Suppressing Operators
Suppressing operators will prevent certain emissions from continuing down the chain, while letting the others through unchanged.
filter()
The filter function takes as its one parameter a lambda that receives the incoming emission and returns a boolean value. If the value returned is true, the emission is sent on. If it is false, the emission is prevented from continuing.
In RxJava 2, filtering an Observable will return an Observable, filtering a Single will return a Maybe, and filtering a Maybe will return a Maybe.
    Observable<String> filterResult = Observable.just("first", "second", "third", "fourth", "fifth")
        .filter(s -> s.length() == 5);
    filterResult.subscribe(System.out::println);
Output:
    first
    third
    fifth

    Maybe<String> filterResult = Single.just("first")
        .filter(s -> s.length() == 5);
    filterResult.subscribe(System.out::println);
Output:
    first

    Maybe<String> filterResult = Maybe.just("first")
        .filter(s -> s.length() == 5);
    filterResult.subscribe(System.out::println);
Output:
    first
In RxJava 1, filtering an Observable will return an Observable, but since there is no Maybe, a Single can't be filtered directly and must first be converted to an Observable. Note that this applies to most of the suppressing operators, so I won't mention it again unless the behavior is different.
    Observable.just("first", "second", "third", "fourth", "fifth")
        .filter(s -> s.length() == 5)
        .subscribe(System.out::println);
Output:
    first
    third
    fifth

    Single.just("first").toObservable()
        .filter(s -> s.length() == 5)
        .subscribe(System.out::println);
Output:
    first
take()
The book states that take has two overloads. In recent versions of both RxJava 1 and 2, there are now three. take also only applies to Observables, and can't be called on Singles or Maybes, in either RxJava 1 or 2.
The first overload has a single long parameter called count, and for a count value of x, it will pass on the first x emissions and then call onComplete.
    Observable.just("first", "second", "third", "fourth", "fifth")
        .take(3)
        .subscribe(System.out::println);
Output:
    first
    second
    third
The second overload has a long parameter for time, and a TimeUnit parameter. It will pass on all emissions in the specified time period, and then call onComplete.
    public static void main(String[] args) {
        Observable.interval(1, TimeUnit.SECONDS)
            .take(3, TimeUnit.SECONDS)
            .subscribe(System.out::println);
        // interval() emits on a background thread, so keep main alive long enough to see the output.
        sleep(5000);
    }

    private static void sleep(int millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
Output:
    0
    1
    2
The third overload is like the second, but adds a third parameter for specifying a scheduler. Since schedulers haven't been covered by the book yet, I won't cover this here.
takeLast()
takeLast is similar to take, but takes from the end of the emissions instead of from the beginning. Under the hood, this means that it has to hold emissions back until the onComplete call is made, and only then sends on the last x of them. This could potentially result in high memory usage (for a large count or time window) and/or long delays, depending on the emissions coming through. There are also a number of extra overloads that allow a count and a time period to be used together, options around delaying errors, and the option to give a hint at what the buffer size will need to be. This shouldn't be used with observables that never call onComplete.
skip()
skip works the same as take, but flips what is kept and what is removed. Beyond that, it has the same options and overloads.
    Observable.just("first", "second", "third", "fourth", "fifth")
        .skip(3)
        .subscribe(System.out::println);
Output:
    fourth
    fifth
    public static void main(String[] args) {
        Observable.interval(1, TimeUnit.SECONDS)
            .skip(3, TimeUnit.SECONDS)
            .subscribe(System.out::println);
        sleep(5000);
    }

    private static void sleep(int millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
Output:
    3
    4
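To make the count-based behavior of take, takeLast, and skip concrete, here is a minimal plain-Java model that works on a List instead of an Observable. It is only a sketch of the semantics, not how RxJava implements these operators, and the class and method names (CountOpsSketch and friends) are made up for illustration. Note that this takeLast model keeps just the most recent n items in its buffer, which is enough to produce the right result, but it still can't hand anything on until the source has ended.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Hypothetical helper class: a List-based model of the count overloads, not RxJava code.
final class CountOpsSketch {

    // take(n): pass on the first n items, then stop (where onComplete would be called).
    static <T> List<T> take(List<T> source, int n) {
        List<T> out = new ArrayList<>();
        for (T item : source) {
            if (out.size() == n) {
                break;
            }
            out.add(item);
        }
        return out;
    }

    // skip(n): drop the first n items and pass on everything after them.
    static <T> List<T> skip(List<T> source, int n) {
        List<T> out = new ArrayList<>();
        int seen = 0;
        for (T item : source) {
            if (seen >= n) {
                out.add(item);
            }
            seen++;
        }
        return out;
    }

    // takeLast(n): buffer items while the source is running and only
    // release the buffer once the source has ended.
    static <T> List<T> takeLast(List<T> source, int n) {
        Deque<T> buffer = new ArrayDeque<>();
        for (T item : source) {
            buffer.addLast(item);
            if (buffer.size() > n) {
                buffer.removeFirst(); // too old to be among the last n
            }
        }
        return new ArrayList<>(buffer);
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("first", "second", "third", "fourth", "fifth");
        System.out.println(take(words, 3));     // [first, second, third]
        System.out.println(skip(words, 3));     // [fourth, fifth]
        System.out.println(takeLast(words, 2)); // [fourth, fifth]
    }
}
```

The loop in takeLast is the reason it should not be used with a source that never completes: the return statement is never reached.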
skipLast()
skipLast is to skip what takeLast is to take.
takeWhile()
takeWhile is a variant on take that takes a lambda that returns a boolean. So long as the lambda returns true for an emission, that emission is sent on. As soon as an emission comes through that causes the lambda to return false, onComplete is called, and no other emissions will be sent, regardless of whether or not they would have passed the lambda's condition.
skipWhile()
skipWhile also takes a lambda that returns a boolean. So long as the lambda returns true, the emissions will be skipped, until an emission is encountered for which it returns false. That emission and all emissions from that point on are sent, without regard for the lambda condition.
takeUntil()
takeUntil is like takeWhile, but uses an observable instead. It will pass emissions through up until the point where the other observable emits its own emission, at which point takeUntil calls onComplete and sends no more emissions.
skipUntil()
Like takeUntil, skipUntil uses an observable, but it won't send any emissions until the other observable emits something, after which it will send on all emissions.
distinct()
distinct will send each unique emission and filter out subsequent duplicates, based on the equals (and hashCode) methods of the object, or, if a lambda is supplied, on those of whatever key object the lambda returns. To do this, it must keep a running set of the distinct keys that have already come through, and as such could become memory intensive, depending on the number of emissions. Note that RxJava 2 has an extra option, not present in RxJava 1, that allows you to specify what kind of container the unique emission keys are kept in.
distinctUntilChanged()
distinctUntilChanged is essentially the same as distinct, except it only keeps track of one key for comparison, that being the latest one. So the emissions 1,1,2,2,3,3,1,1 would come out as 1,2,3,1.
elementAt()
elementAt is essentially an index lookup into the emissions sent. It'll emit the one at that index and no others. If the index is out of bounds, it will send no emissions and just call onComplete.
You can supply a default emission for when it is out of bounds. Note that in RxJava 1, this is done with a separate method called elementAtOrDefault, whereas in RxJava 2, it's an overloaded option for the elementAt method.
Also, RxJava 2 supports an elementAtOrError method, which RxJava 1 doesn't have. This method allows you to throw an error if the index is out of bounds, instead of having no emissions.
With both RxJava 1 and 2, this call can only be made on an Observable. In RxJava 1, it will return an Observable. In RxJava 2, the basic elementAt returns a Maybe, whereas if it is given a default value or if elementAtOrError is used, then the return type is Single.
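Looking back over the predicate- and key-based operators in this section (takeWhile, skipWhile, distinct, and distinctUntilChanged), the differences are easier to see side by side. The following is a plain-Java sketch that models them on a List; it is an illustration of the semantics only, not RxJava's implementation, and SuppressingSketch and its method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical helper class: a List-based model, not RxJava code.
final class SuppressingSketch {

    // takeWhile: stop for good at the first item that fails the condition.
    static <T> List<T> takeWhile(List<T> source, Predicate<T> condition) {
        List<T> out = new ArrayList<>();
        for (T item : source) {
            if (!condition.test(item)) {
                break;
            }
            out.add(item);
        }
        return out;
    }

    // skipWhile: once one item fails the condition, the condition is never checked again.
    static <T> List<T> skipWhile(List<T> source, Predicate<T> condition) {
        List<T> out = new ArrayList<>();
        boolean skipping = true;
        for (T item : source) {
            if (skipping && condition.test(item)) {
                continue;
            }
            skipping = false;
            out.add(item);
        }
        return out;
    }

    // distinct: remembers every key seen so far, so memory grows with the number of unique keys.
    static <T, K> List<T> distinct(List<T> source, Function<T, K> keySelector) {
        Set<K> seenKeys = new HashSet<>();
        List<T> out = new ArrayList<>();
        for (T item : source) {
            if (seenKeys.add(keySelector.apply(item))) {
                out.add(item);
            }
        }
        return out;
    }

    // distinctUntilChanged: remembers only the previous item.
    static <T> List<T> distinctUntilChanged(List<T> source) {
        List<T> out = new ArrayList<>();
        boolean first = true;
        T previous = null;
        for (T item : source) {
            if (first || !Objects.equals(previous, item)) {
                out.add(item);
            }
            previous = item;
            first = false;
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(1, 2, 3, 1);
        System.out.println(takeWhile(numbers, i -> i < 3)); // [1, 2]
        System.out.println(skipWhile(numbers, i -> i < 3)); // [3, 1]

        List<String> words = Arrays.asList("first", "second", "third", "fourth", "fifth");
        System.out.println(distinct(words, String::length)); // [first, second]

        System.out.println(distinctUntilChanged(Arrays.asList(1, 1, 2, 2, 3, 3, 1, 1))); // [1, 2, 3, 1]
    }
}
```

The trailing 1 in the numbers list is what separates the two While operators from a plain filter: takeWhile never sees it, and skipWhile lets it through even though it would satisfy the condition.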
Transforming Operators
Transforming operators will take the emissions and transform them (either each of them individually or all of them as a whole) to something else according to some rule.
map()
map does a one-to-one conversion of each emission based on the lambda passed in.
cast()
cast is really just a specialized type of map. cast(String.class) is essentially shorthand for map(obj -> (String) obj).
startWith()
startWith allows you to prepend an initial emission to an Observable, to be sent out before the rest of the emissions that will come through that Observable.
startWithArray()
startWithArray works the same as startWith, but allows you to specify multiple starting emissions instead of just one.
defaultIfEmpty()
If an Observable sends no emissions before calling onComplete (an empty observable), defaultIfEmpty will allow for this empty observable to be replaced with an observable with a single emission, specified by the input value to the method.
switchIfEmpty()
switchIfEmpty is similar to defaultIfEmpty in that it will replace an empty observable with another observable, but in the case of switchIfEmpty, the observable to be used instead is specified completely by the input parameter, and as such, it could be an observable with multiple emissions, if that's what you need.
sorted()
sorted will take all the emissions and sort them before sending them on. It will do this according to their natural ordering (which requires the emissions to be Comparable), or according to a custom Comparator or lambda that you pass in. Note that this requires collecting all emissions until the upstream Observable calls onComplete, then sorting the emissions and sending them on. This could potentially mean high memory usage and delays.
delay()
This will delay each emission by a specified amount of time.
delaySubscription()
This will delay subscribing to the observable, as opposed to delaying each emission.
repeat()
When the observable that repeat is subscribed to calls onComplete, repeat will resubscribe to the observable. You can specify how many times it should repeat. If no parameter is given, this will continue indefinitely.
repeatUntil()
Similar to repeat, but takes a BooleanSupplier lambda, and will continue to repeat until the lambda returns true.
scan()
scan is essentially the same as reduce, but instead of waiting for the onComplete event and returning a single emission based on all previous emissions, it sends on a running total, of sorts, after each emission. This is very useful for Observables that never complete.
Reducing Operators
Reducing operators will take all emissions and combine them into one emission. Think summing numbers to get a total or counting all emissions.
count()
This counts the number of emissions and sends on a single emission with that total.
reduce()
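Since scan was described earlier in terms of reduce, a side-by-side model of the two may help before the RxJava examples. This is a plain-Java sketch on a List, not RxJava's implementation; ScanVsReduceSketch and its methods are hypothetical names, and the Optional return type is just my stand-in for the fact that an empty source has nothing to reduce.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.function.BinaryOperator;

// Hypothetical helper class: a List-based model, not RxJava code.
final class ScanVsReduceSketch {

    // reduce: fold every item into one value, available only after the source has ended.
    static <T> Optional<T> reduce(List<T> source, BinaryOperator<T> accumulator) {
        boolean first = true;
        T running = null;
        for (T item : source) {
            running = first ? item : accumulator.apply(running, item);
            first = false;
        }
        return first ? Optional.empty() : Optional.of(running);
    }

    // scan: the same fold, but every intermediate result is passed on as it is produced.
    static <T> List<T> scan(List<T> source, BinaryOperator<T> accumulator) {
        List<T> out = new ArrayList<>();
        boolean first = true;
        T running = null;
        for (T item : source) {
            running = first ? item : accumulator.apply(running, item);
            first = false;
            out.add(running);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
        System.out.println(reduce(numbers, (total, n) -> total + n)); // Optional[15]
        System.out.println(scan(numbers, (total, n) -> total + n));   // [1, 3, 6, 10, 15]
    }
}
```

The only difference between the two loops is where the running value goes: scan hands it on inside the loop, while reduce has to wait until the loop is over.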
reduce is used to combine all incoming emissions into a single emission. For instance:
    Observable.just(1, 2, 3, 4, 5)
        .reduce((total, number) -> total + number)
        .subscribe(System.out::println);
Output:
    15
Note that if you want the output to be of a different type than the inputs, then you'll need to supply a starting value. Compare the two following examples:
    Observable.just("first", "second", "third", "fourth", "fifth")
        .reduce((total, word) -> total + word.length())
        .subscribe(System.out::println);
Output:
    first6565
    Observable.just("first", "second", "third", "fourth", "fifth")
        .reduce(0, (total, word) -> total + word.length())
        .subscribe(System.out::println);
Output:
    27
Also, when the starting value is a mutable object, like a list or other collection, collect should be used instead of reduce. Otherwise, you can run into issues with multiple subscribers, because they all share the same starting object. Here's a comparison of reduce vs collect in this scenario:
    Single<ArrayList<String>> listObs = Observable.just("first", "second", "third", "fourth", "fifth")
        .reduce(new ArrayList<>(), (list, value) -> {
            list.add(value);
            return list;
        });
    listObs.subscribe(System.out::println);
    listObs.subscribe(System.out::println);
Output:
    [first, second, third, fourth, fifth]
    [first, second, third, fourth, fifth, first, second, third, fourth, fifth]

    Single<ArrayList<String>> listObs2 = Observable.just("first", "second", "third", "fourth", "fifth")
        .collect(ArrayList::new, (list, value) -> list.add(value));
    listObs2.subscribe(System.out::println);
    listObs2.subscribe(System.out::println);
Output:
    [first, second, third, fourth, fifth]
    [first, second, third, fourth, fifth]
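The difference between the two outputs above comes down to when the container is created. With reduce, the ArrayList is built once, when the chain is assembled, and every subscriber adds to that same object. With collect, ArrayList::new is called again for each subscriber. Here is a plain-Java sketch of that idea, where calling get() on the returned Supplier stands in for subscribing. It is a model under that assumption rather than RxJava code, and SeedSharingSketch, reduceLike, and collectLike are made-up names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical helper class: get() on the returned Supplier plays the role of subscribe().
final class SeedSharingSketch {

    static final List<String> WORDS = Arrays.asList("first", "second", "third");

    // Models reduce(seed, ...): one seed object, captured once and reused on every "subscription".
    static Supplier<List<String>> reduceLike(List<String> seed) {
        return () -> {
            for (String word : WORDS) {
                seed.add(word);
            }
            return seed;
        };
    }

    // Models collect(supplier, ...): a fresh container is requested on every "subscription".
    static Supplier<List<String>> collectLike(Supplier<List<String>> containerSupplier) {
        return () -> {
            List<String> container = containerSupplier.get();
            for (String word : WORDS) {
                container.add(word);
            }
            return container;
        };
    }

    public static void main(String[] args) {
        Supplier<List<String>> shared = reduceLike(new ArrayList<>());
        System.out.println(shared.get()); // [first, second, third]
        System.out.println(shared.get()); // [first, second, third, first, second, third]

        Supplier<List<String>> fresh = collectLike(ArrayList::new);
        System.out.println(fresh.get()); // [first, second, third]
        System.out.println(fresh.get()); // [first, second, third]
    }
}
```

An immutable starting value like the 0 in the earlier word-length example doesn't have this problem, since each step produces a new value instead of changing the starting one.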