Skip to content
Rit Li edited this page May 5, 2019 · 8 revisions

Types

Time

type Time = number
type Interval = number

Time is a monotonic number. It represents the current time according to a scheduler. Time can be any monotonic number, and does not have to mean Date.now.

Interval is a number greater or equal to zero. It represents a time interval, like a delay.

Stream

type Stream<A> = { source: Source<A> }

Stream is just a wrapper around a source, and provides a prototype on which public-facing APIs live.

Source

type Source<A> = {
  run: (sink: Sink<A>, scheduler: Scheduler) => Disposable
}

A source represents a view of events over time. Its single method, run, arranges to propagate events to the provided sink. If it needs to deal with time, it can use scheduler, which has methods for knowing the current time, and scheduling future tasks in a more efficient way than using setTimeout directly.

A source's run method must return a disposable.

Some sources are simple and just store parameters which get passed wholesale to a sink. Other sources do more, especially in the case of sources that need to combine multiple streams, or deal with higher order streams.

Some sources ultimately produce events, such as from DOM events. A producer source must never produce an event in the same call stack as its run method is called. It must begin producing items asynchronously. In some cases, this comes for free, such as with DOM events. In other cases, it must be done explicitly, such as with values from an array. A scheduler provides several ways to schedule asynchronous tasks.

Sink

type Sink<A> = {
  event: (t:Time, a:A) => void
  end:   (t:Time, a:?A) => void
  error: (t:Time, e:Error) => void
}

A sink receives events, typically does something with them, such as transforming or filtering them, and then propagates them to another sink. It has 3 methods, each corresponding to a type of channel: event, end, and error.

Typically a combinator will be implemented as a source and a sink. The source is usually stateless/immutable, and creates a new sink for each new stream observer. In most cases, the relationship of stream to source is 1-1, but source to sink is 1-many.

Scheduler

type Scheduler = {
  now: () => Time
  asap: (task:Task) => ScheduledTask
  delay: (delay:Interval, task:Task) => ScheduledTask
  periodic: (period:Interval, task:Task) => ScheduledTask
  schedule: (delay:Interval, period:Interval, task:Task) => ScheduledTask
  cancel: (task:ScheduledTask) => void
}

type Task = {
  run: (t:Time) => void
  error: (t:Time, e:Error) => void
  dispose: () => ?Promise<any>
}

type ScheduledTask {
  cancel: () => ?Promise<any>
}

Disposable

type Disposable = {
  dispose: () => ?Promise<any>
}

Source and sink chains

Applying combinators to a stream composes a source chain that defines the behavior of the stream. When an observer begins observing a stream, a "run" message is sent "backwards" through the source chain, to the ultimate producer source--the one that will produce events in the first place.

As it travels, that message composes a sink chain analogous to the source chain. When the messages reaches the producer, it begins producing events. With the exception of a few combinators (such as delay), events propagate synchronously "forward" through the sink chain.

Note: a producer must not begin producing events synchronously. It must schedule the start of its production, using the scheduler passed to its run method. However, once it does begin, it may then produce events synchronously.

Event propagation

Each event propagation is synchronous by default. One sink calls the event method of the next, forming a synchronous call stack.

Some combinators, like delay, introduce asynchrony into the sink chain.

Error propagation

If an exception is thrown during event propagation, it will stop the propagation and travel "backwards" through the sink chain, by unwinding the call stack. If that exception is not caught, it will reach the producer, and finally, the scheduler. The scheduler will catch it and send the error "forward" again synchronously, using the error channel of the sink chain.

Clone this wiki locally