Wednesday, May 15, 2019

Learning RxJava: Thoughts on Chapter 2


The Observable and the Observer

The two most fundamental components in Rx are the observable and the observer. "For a given Observable<T>, it pushes items (called emissions) of type T through a series of operators until it finally arrives at a final Observer, which consumes the items."

The observable works by passing three types of events: onNext, onError, and onComplete (called onCompleted in RxJava 1.x). The observer works by handling these three types of events. A very basic example is as follows:

Observable.create(emitter -> {
  try {
    emitter.onNext("First");
    emitter.onNext("Second");
    emitter.onNext("Third");
    emitter.onNext("Fourth");
    emitter.onNext("Fifth");
    emitter.onComplete();
  } catch (Throwable e) {
    emitter.onError(e);
  }
}).subscribe(
    next -> System.out.println("Next: " + next),
    error -> System.out.println("Error: " + error),
    () -> System.out.println("Complete"));

Side Note: RxJava 1

The Learning RxJava book states that in RxJava 1.x you would need to use the fromEmitter method, like so:

Observable.fromEmitter(emitter -> {
  try {
    emitter.onNext("First");
    emitter.onNext("Second");
    emitter.onNext("Third");
    emitter.onNext("Fourth");
    emitter.onNext("Fifth");
    emitter.onCompleted();
  } catch (Throwable e) {
    emitter.onError(e);
  }
}, Emitter.BackpressureMode.BUFFER).subscribe(
    next -> System.out.println("Next: " + next),
    error -> System.out.println("Error: " + error),
    () -> System.out.println("Complete"));

This was accurate for the version of RxJava 1.x listed in the book, namely 1.2.3. In version 1.2.7, however, a create overload was added to mirror RxJava 2, and fromEmitter was marked as deprecated; it was then removed in version 1.3.0. In the latest versions of RxJava 1.x, this example should be written as follows:

Observable.create(emitter -> {
  try {
    emitter.onNext("First");
    emitter.onNext("Second");
    emitter.onNext("Third");
    emitter.onNext("Fourth");
    emitter.onNext("Fifth");
    emitter.onCompleted();
  } catch (Throwable e) {
    emitter.onError(e);
  }
}, Emitter.BackpressureMode.BUFFER).subscribe(
    next -> System.out.println("Next: " + next),
    error -> System.out.println("Error: " + error),
    () -> System.out.println("Complete"));

Operations are Observables and Observers

This example doesn't do anything interesting on its own, since the interesting work usually happens in the operators between the observable and the observer. One point the book does make, though, is that each intermediate operator is both an observer and an observable. In essence, the following code:

Observable.<String>create(emitter -> {
  try {
    emitter.onNext("First");
    emitter.onNext("Second");
    emitter.onNext("Third");
    emitter.onNext("Fourth");
    emitter.onNext("Fifth");
    emitter.onComplete();
  } catch (Throwable e) {
    emitter.onError(e);
  }
})
    .map(String::length)
    .subscribe(
        next -> System.out.println("Next: " + next),
        error -> System.out.println("Error: " + error),
        () -> System.out.println("Complete"));

Could be written as follows:

Observable<String> obs = Observable.create(emitter -> {
  try {
    emitter.onNext("First");
    emitter.onNext("Second");
    emitter.onNext("Third");
    emitter.onNext("Fourth");
    emitter.onNext("Fifth");
    emitter.onComplete();
  } catch (Throwable e) {
    emitter.onError(e);
  }
});

Observable<Integer> mapStringToLength = Observable.create(emitter -> {
  obs.subscribe(
      next -> emitter.onNext(next.length()),
      error -> emitter.onError(error),
      () -> emitter.onComplete());
});

mapStringToLength.subscribe(
    next -> System.out.println("Next: " + next),
    error -> System.out.println("Error: " + error),
    () -> System.out.println("Complete"));

That's not to say that we should write operators this way. It's just an interesting angle from which to think about them.

Observers and Disposables

At this point the book takes a detour to talk about observers, though it saves the topic of disposables until the end of the chapter. I'll cover observers and disposables together here to keep my thoughts in one place.

An observer is essentially the termination point of the chain, where onNext, onError, and onComplete events are handled. New in RxJava 2 is the observer's onSubscribe method, which gives you access to the disposable. The disposable gives the observer a way to unsubscribe from the observable early, which is important for observables of infinite length. Storing the disposable received in onSubscribe makes it available to the onNext, onError, and onComplete methods.

If you don't pass an observer to the subscribe method, but instead pass lambdas, then the disposable is returned by the subscribe method.
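To make the role of onSubscribe concrete, here is a small library-free sketch of the idea. SimpleDisposable, SimpleObserver, and countForever are hypothetical names invented for this illustration, not RxJava types; the sketch only models the flow described above, where the observer receives the disposable first and later uses it from onNext to stop an endless source.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

/** Library-free model of onSubscribe and early disposal; not RxJava's implementation. */
final class DisposableSketch {

  /** Hypothetical stand-in for a disposable: a flag that the source checks. */
  static final class SimpleDisposable {
    private final AtomicBoolean disposed = new AtomicBoolean(false);

    void dispose() {
      disposed.set(true);
    }

    boolean isDisposed() {
      return disposed.get();
    }
  }

  /** Hypothetical observer with the callback shape the text describes. */
  interface SimpleObserver<T> {
    void onSubscribe(SimpleDisposable disposable);

    void onNext(T item);
  }

  /** An endless source: emits 0, 1, 2, ... until the observer disposes. */
  static void countForever(SimpleObserver<Long> observer) {
    SimpleDisposable disposable = new SimpleDisposable();
    observer.onSubscribe(disposable); // the disposable is handed over first
    for (long i = 0; !disposable.isDisposed(); i++) {
      observer.onNext(i);
    }
  }

  /** Subscribes an observer that disposes once it sees a value above the threshold. */
  static List<Long> collectUntilAbove(long threshold) {
    List<Long> received = new ArrayList<>();
    countForever(new SimpleObserver<Long>() {
      private SimpleDisposable disposable;

      @Override
      public void onSubscribe(SimpleDisposable disposable) {
        this.disposable = disposable; // keep it for use in onNext
      }

      @Override
      public void onNext(Long item) {
        received.add(item);
        if (item > threshold) {
          disposable.dispose(); // stop the otherwise endless source
        }
      }
    });
    return received;
  }

  public static void main(String[] args) {
    System.out.println(collectUntilAbove(5)); // prints [0, 1, 2, 3, 4, 5, 6]
  }
}
```

Without the dispose call the loop in countForever would never end, which is the same reason disposal matters for infinite observables.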

In RxJava 1, things work a bit differently. An observer is typically written as a Subscriber, and the disposable is instead called a Subscription. The Subscriber does not have an onSubscribe method, so the Subscription returned by subscribe isn't directly available in the onNext, onError, or onCompleted methods, though it can be handed to the subscriber afterwards. (Subscriber also implements Subscription itself, so calling unsubscribe() on the subscriber is another option.) See the following example for how this can be done in RxJava 1:

import java.util.concurrent.TimeUnit;

import rx.Observable;
import rx.Subscriber;
import rx.Subscription;

public class RxJava1Samples {

  public static void main(String[] args) {
    System.out.println("Start");
    MySubscriber subscriber = new MySubscriber();
    // interval emits on a background thread, so subscribe returns immediately
    Subscription subscription = Observable.interval(1, TimeUnit.SECONDS)
        .subscribe(subscriber);
    subscriber.setSubscription(subscription);
    // keep the main thread alive long enough to see the emissions
    sleep(10000);
    System.out.println("Finish");
  }

  private static void sleep(int millis) {
    try {
      Thread.sleep(millis);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }

  private static class MySubscriber extends Subscriber<Long> {

    // written by the main thread, read by the emitting thread
    private volatile Subscription subscription;

    @Override
    public void onCompleted() {
      System.out.println("Complete");
    }

    @Override
    public void onError(Throwable e) {
      e.printStackTrace();
    }

    @Override
    public void onNext(Long next) {
      System.out.println("Next: " + next);
      // stop listening once a value above 5 has been seen
      if (next > 5 && subscription != null) {
        subscription.unsubscribe();
      }
    }

    private void setSubscription(Subscription subscription) {
      this.subscription = subscription;
    }
  }
}

Running this code gives the following output:

Start
Next: 0
Next: 1
Next: 2
Next: 3
Next: 4
Next: 5
Next: 6
Finish

Cold Observables vs Hot Observables

The book next jumps into cold observables vs hot observables. In essence, a cold observable doesn't send any emissions until it is subscribed to, and it sends the full set of emissions to each subscriber, regardless of when that subscriber subscribed. Hot observables, on the other hand, send emissions regardless of whether anyone has subscribed yet, and don't repeat earlier emissions for late subscribers.

It would seem that cold observables are frequently used for data and hot observables for events, though that's not always the case, and there can be advantages to using hot observables with data in certain situations. This will be covered in more depth in a later chapter, so I don't feel a need to dig too much into it here. It's worth noting, though, that a cold observable can be turned into a hot one with a call to the publish method, which returns a ConnectableObservable that starts emitting once connect is called.
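The difference is easiest to see from the point of view of a late subscriber. The following is a library-free sketch, not RxJava code: ColdSource and HotSource are hypothetical classes made up for this illustration, and they model only the replay behavior described above, not threading or completion.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

/** Library-free model of cold versus hot sources; not RxJava's implementation. */
final class ColdVsHotSketch {

  /** Cold: nothing happens until subscribe, and each subscriber gets the full replay. */
  static final class ColdSource<T> {
    private final List<T> items;

    ColdSource(List<T> items) {
      this.items = new ArrayList<>(items);
    }

    void subscribe(Consumer<T> observer) {
      items.forEach(observer);
    }
  }

  /** Hot: items are pushed when they happen, only to observers subscribed at that moment. */
  static final class HotSource<T> {
    private final List<Consumer<T>> observers = new ArrayList<>();

    void subscribe(Consumer<T> observer) {
      observers.add(observer);
    }

    void emit(T item) {
      observers.forEach(observer -> observer.accept(item));
    }
  }

  /** What a second, late subscriber to a cold source receives. */
  static List<String> lateSubscriberToCold() {
    ColdSource<String> cold = new ColdSource<>(Arrays.asList("First", "Second", "Third"));
    cold.subscribe(item -> { }); // first subscriber; its items are ignored here
    List<String> late = new ArrayList<>();
    cold.subscribe(late::add);   // still receives everything
    return late;
  }

  /** What a late subscriber to a hot source receives. */
  static List<String> lateSubscriberToHot() {
    HotSource<String> hot = new HotSource<>();
    hot.emit("First");           // nobody is listening yet, so the item is lost
    hot.subscribe(item -> { });  // early subscriber
    hot.emit("Second");
    List<String> late = new ArrayList<>();
    hot.subscribe(late::add);    // subscribes after two emissions
    hot.emit("Third");
    return late;
  }

  public static void main(String[] args) {
    System.out.println(lateSubscriberToCold()); // prints [First, Second, Third]
    System.out.println(lateSubscriberToHot());  // prints [Third]
  }
}
```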

Other Observable Factories and Types

At this point the book covers a number of other observable factories. I'll give a quick summary for each.

Observable.create(): Used at the beginning of this article for illustrative purposes, but it honestly doesn't need to be used very often, because the other observable factories are usually sufficient and much more succinct. If there isn't an observable factory that fits your needs, though, this can be used.

Observable.just(): Just emits what it's given. The Observable.create example from the beginning of this article could be rewritten as follows: Observable.just("First", "Second", "Third", "Fourth", "Fifth").

Observable.range(): Perhaps the most important thing to note here is that the parameters are start and count, not first and last, so range(5, 10) will emit: 5, 6, 7, 8, 9, 10, 11, 12, 13, 14.

Observable.interval(): Emits at a specified interval. This runs on a separate thread (the computation scheduler by default), but it is still a cold observable, which means that if you have multiple subscribers, each gets its own timer, and the emissions sent to the subscribers will not be in sync unless you first make it into a hot observable.

Observable.fromFuture(): An easy way to convert futures from other libraries into observables.

Observable.empty(): Emits nothing and immediately calls onComplete. Useful wherever an empty result needs to be represented.

Observable.never(): Emits nothing. Doesn't complete. Mainly used for testing. Would be useful for testing timeouts when waiting on resources, I imagine.

Observable.error(): Emits the error that you give it. Mainly used for testing. Useful for testing exception handling.

Observable.defer(): Creates a fresh observable for each observer, which makes it good for picking up state changes between subscriptions from different observers.

Observable.fromCallable(): Good for wrapping code that could throw an exception while producing a value, when you want that exception to be kept in the observable chain and delivered to onError.
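Two of the points above, the start-and-count arithmetic of range and the way defer picks up state changes, can be checked with a few lines of plain Java. This is only a sketch under stated assumptions: rangeValues and the two "AfterChange" methods are hypothetical helpers written for this article, and a Supplier stands in for the per-observer factory that defer takes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

/** Library-free model of range(start, count) and of eager versus deferred creation. */
final class RangeAndDeferSketch {

  /** The values such a range covers: start, start + 1, ..., start + count - 1. */
  static List<Integer> rangeValues(int start, int count) {
    List<Integer> values = new ArrayList<>();
    for (int i = 0; i < count; i++) {
      values.add(start + i);
    }
    return values;
  }

  /** Mutable state that changes between creation and use. */
  private int count = 5;

  /** Eager: the values are fixed when the source is built, so a later change is missed. */
  int eagerSizeAfterChange() {
    List<Integer> eager = rangeValues(1, count);
    count = 10;
    return eager.size();
  }

  /** Deferred: the values are rebuilt on every get(), so the change is picked up. */
  int deferredSizeAfterChange() {
    Supplier<List<Integer>> deferred = () -> rangeValues(1, count);
    count = 10;
    return deferred.get().size();
  }

  public static void main(String[] args) {
    // prints [5, 6, 7, 8, 9, 10, 11, 12, 13, 14]
    System.out.println(rangeValues(5, 10));
    System.out.println(new RangeAndDeferSketch().eagerSizeAfterChange());    // prints 5
    System.out.println(new RangeAndDeferSketch().deferredSizeAfterChange()); // prints 10
  }
}
```

The deferred variant re-reads count each time it is asked for values, which is the behavior that makes defer useful when state changes between observers.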

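Some specialized observable types are Single, Maybe, and Completable. A Single has exactly one emission, which is delivered through onSuccess rather than through onNext followed by onComplete. A Maybe has either 0 or 1 emissions: it calls onSuccess with the value, or onComplete if there is none. A Completable runs a process that doesn't produce an emission, and then calls onComplete. Each of them can signal onError instead.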

Note on RxJava 1 to RxJava 2 Upgrade

RxJava 1 allows nulls. RxJava 2 does not. This will probably be one of the hardest parts of an upgrade. One way to keep from making the problem worse is to choose not to emit nulls in RxJava 1.