Rx is based around two fundamental types, while several others expand the functionality around the core types. Those two core types are the Observable
and the Observer
, which will be introduced in this chapter. We will also introduce Subject
s, which ease the learning curve.
Rx builds upon the Observer pattern. It is not unique in doing so. Event handling already exists in Java (e.g. JavaFX's EventHandler). Those are simpler approaches, which suffer in comparison to Rx:
- Events through event handlers are hard to compose.
- They cannot be queried over time
- They can lead to memory leaks
- These is no standard way of signaling completion.
- Require manual handling of concurrency and multithreading.
Observable is the first core element that we will see. This class contains a lot of the implementation of Rx, including all of the core operators. We will be examining it step by step throughout this book. For now, we must understand the Subscribe
method. Here is one key overload of the method:
public final Subscription subscribe(Subscriber<? super T> subscriber)
This is the method that you use to receive the values emitted by the observable. As the values come to be pushed (through policies that we will discuss throughout this book), they are pushed to the subscriber, which is then responsible for the behaviour intended by the consumer. The Subscriber
here is an implementation of the Observer
interface.
An observable pushes 3 kinds of events
- Values
- Completion, which indicates that no more values will be pushed.
- Errors, if something caused the sequence to fail. These events also imply termination.
We already saw one abstract implementation of the Observer, Subscriber
. Subscriber
implements some extra functionality and should be used as the basis for our implementations of Observer
. For now, it is simpler to first understand the interface.
interface Observer<T> {
void onCompleted();
void onError(java.lang.Throwable e);
void onNext(T t);
}
Those three methods are the behaviour that is executed every time the observable pushes a value. The observer will have its onNext
called zero or more times, optionally followed by an onCompleted
or an onError
. No calls happen after a call to onError
or onCompleted
.
When developing Rx code, you'll see a lot of Observable
, but not so much of Observer
. While it is important to understand the Observer
, there are shorthands that remove the need to instantiate it yourself.
You could manually implement Observer
or extend Observable
. In reality that will usually be unnecessary, since Rx already provides all the building blocks you need. It is also dangerous, as interaction between parts of Rx includes conventions and internal plumbing that are not obvious to a beginner. It is both simpler and safer to use the many tools that Rx gives you for generating the functionality that you need.
To subscribe to an observable, it is not necessary to provide instances of Observer
at all. There are overloads to subscribe
that simply take the functions to be executed for onNext
, onError
and onSubscribe
, hiding away the instantiation of the corresponding Observer
. It is not even necessary to provide each of those functions. You can provide a subset of them, i.e. just onNext
or just onNext
and onError
.
The introduction of lambda functions in Java 1.8 makes these overloads very convenient for the short examples that exist in this book.
Subjects are an extension of the Observable
that also implements the Observer
interface. The idea may sound odd at first, but they make things a lot simpler in some cases. They can have events pushed to them (like observers), which they then push further to their own subscribers (like observables). This makes them ideal entry points into Rx code: when you have values coming in from outside of Rx, you can push them into a Subject
, turning them into an observable. You can think of them as entry points to an Rx pipeline.
Subject
has two parameter types: the input type and the output type. This was designed so for the sake of abstraction and not because the common uses for subjects involve transforming values. There are transformation operators to do that, which we will see later.
There are a few different implementations of Subject
. We will now examine the most important ones and their differences.
PublishSubject
is the most straight-forward kind of subject. When a value is pushed into a PublishSubject
, the subject pushes it to every subscriber that is subscribed to it at that moment.
public static void main(String[] args) {
PublishSubject<Integer> subject = PublishSubject.create();
subject.onNext(1);
subject.subscribe(System.out::println);
subject.onNext(2);
subject.onNext(3);
subject.onNext(4);
}
2
3
4
As we can see in the example, 1
isn't printed because we weren't subscribed when it was pushed. After we subscribed, we began receiving the values that were pushed to the subject.
This is the first time we see subscribe
being used, so it is worth paying attention to how it was used. In this case, we used the overload which expects one Function for the case of onNext. That function takes an argument of type Integer
and returns nothing. Functions without a return type are also called actions. We can provide that function in different ways:
- we can supply an instance of
Action1<Integer>
, - implicitly create one using a lambda expression or
- pass a reference to an existing method that fits the signature.
In this case,
System.out::println
has an overload that acceptsObject
, so we passed a reference to it.subscribe
will callprintln
with the arriving values as the argument.
ReplaySubject
has the special feature of caching all the values pushed to it. When a new subscription is made, the event sequence is replayed from the start for the new subscriber. After catching up, every subscriber receives new events as they come.
ReplaySubject<Integer> s = ReplaySubject.create();
s.subscribe(v -> System.out.println("Early:" + v));
s.onNext(0);
s.onNext(1);
s.subscribe(v -> System.out.println("Late: " + v));
s.onNext(2);
Early:0
Early:1
Late: 0
Late: 1
Early:2
Late: 2
All the values are received by the subscribers, even though one was late. Also notice that the late subscriber had everything replayed to it before proceeding to the next value.
Caching everything isn't always a good idea, as an observable sequence can run for a long time. There are ways to limit the size of the internal buffer. ReplaySubject.createWithSize
limits the size of the buffer, while ReplaySubject.createWithTime
limits how long an object can stay cached.
ReplaySubject<Integer> s = ReplaySubject.createWithSize(2);
s.onNext(0);
s.onNext(1);
s.onNext(2);
s.subscribe(v -> System.out.println("Late: " + v));
s.onNext(3);
Late: 1
Late: 2
Late: 3
Our late subscriber now missed the first value, which fell off the buffer of size 2. Similarily, old values fall off the buffer as time passes, when the subject is created with createWithTime
ReplaySubject<Integer> s = ReplaySubject.createWithTime(150, TimeUnit.MILLISECONDS,
Schedulers.immediate());
s.onNext(0);
Thread.sleep(100);
s.onNext(1);
Thread.sleep(100);
s.onNext(2);
s.subscribe(v -> System.out.println("Late: " + v));
s.onNext(3);
Late: 1
Late: 2
Late: 3
Creating a ReplaySubject
with time requires a Scheduler
, which is Rx's way of keeping time. Feel free to ignore this for now, as we will properly introduce schedulers in the chapter about concurrency.
ReplaySubject.createWithTimeAndSize
limits both, which ever comes first.
BehaviorSubject
only remembers the last value. It is similar to a ReplaySubject
with a buffer of size 1. An initial value can be provided on creation, therefore guaranteeing that a value always will be available immediately on subscription.
BehaviorSubject<Integer> s = BehaviorSubject.create();
s.onNext(0);
s.onNext(1);
s.onNext(2);
s.subscribe(v -> System.out.println("Late: " + v));
s.onNext(3);
Late: 2
Late: 3
The following example just completes, since that is the last event.
BehaviorSubject<Integer> s = BehaviorSubject.create();
s.onNext(0);
s.onNext(1);
s.onNext(2);
s.onCompleted();
s.subscribe(
v -> System.out.println("Late: " + v),
e -> System.out.println("Error"),
() -> System.out.println("Completed")
);
An initial value is provided to be available if anyone subscribes before the first value is pushed.
BehaviorSubject<Integer> s = BehaviorSubject.create(0);
s.subscribe(v -> System.out.println(v));
s.onNext(1);
0
1
Since the defining role of a BehaviorSubject
is to always have a value readily available, it is unusual to create one without an initial value. It is also unusual to terminate one.
AsyncSubject
also caches the last value. The difference now is that it doesn't emit anything until the sequence completes. Its use is to emit a single value and immediately complete.
AsyncSubject<Integer> s = AsyncSubject.create();
s.subscribe(v -> System.out.println(v));
s.onNext(0);
s.onNext(1);
s.onNext(2);
s.onCompleted();
2
Note that, if we didn't do s.onCompleted();
, this example would have printed nothing.
As we already mentioned, there are contracts in Rx that are not obvious in the code. An important one is that no events are emitted after a termination event (onError
or onCompleted
). The implemented subjects respect that, and the subscribe
method also prevents some violations of the contract.
Subject<Integer, Integer> s = ReplaySubject.create();
s.subscribe(v -> System.out.println(v));
s.onNext(0);
s.onCompleted();
s.onNext(1);
s.onNext(2);
0
Safety nets like these are not guaranteed in the entirety of the implementation of Rx. It is best that you are mindful not to violate the contract, as this may lead to undefined behaviour.
Previous | Next |
---|---|
Why Rx | Lifetime management |