diff --git a/src/ope/Filter.ts b/src/ope/Filter.ts new file mode 100644 index 0000000..1807166 --- /dev/null +++ b/src/ope/Filter.ts @@ -0,0 +1,56 @@ +import { ObserverType } from "../Base/Observer"; +import { Sink } from "../Base/Sink"; +import { Cancelable, Disposable } from "../Base/Disposable"; +import { Next, Completed } from "../common/Event"; +import { Producer } from "../Base/Producer"; +import { Observable, ObservableType } from "../Base/Observable"; +import { RDEvent, RDError } from "../common/Event"; + + +export class FilterSink, ResultType> extends Sink implements ObserverType { + private m_predicate: (source: SourceType) => boolean + constructor(predicate: (source: SourceType) => boolean, observer: Observer, cancel: Cancelable) { + super(observer, cancel); + this.m_predicate = predicate; + } + on(ev: RDEvent): void { + switch (ev.kind) { + case "next": + if (this.m_predicate(ev.next)) { + this.forwardOn(ev); + } + return; + case "error": + this.forwardOn(ev) + this.dispose(); + return; + case "completed": + this.forwardOn(ev); + this.dispose(); + return; + } + } +} + +export class Filter extends Producer { + private source: Observable + private predicate: (s: SourceType) => boolean; + + constructor(source: Observable, predicate: (s: SourceType) => boolean) { + super(); + this.source = source; + this.predicate = predicate; + } + + run>(observer: O, cancel: Cancelable): { sink: Disposable, subscription: Disposable } { + const sink = new FilterSink(this.predicate, observer, cancel); + const subscription = this.source.subscribe(sink); + return { sink: sink, subscription: subscription }; + } + +} + + +export function filter(predicate: (elm: Elem) => boolean): (ob: ObservableType) => Observable { + return (ob) => new Filter(ob, predicate); +} \ No newline at end of file diff --git a/src/ope/Just.ts b/src/ope/Just.ts index 29d1efb..c3238e1 100644 --- a/src/ope/Just.ts +++ b/src/ope/Just.ts @@ -31,32 +31,3 @@ import { Next, Completed } from "../common/Event"; } } - - -export function list(elems: Elem[]): Observable { - return new List(elems); -} - - -class List extends Producer { - private elems: Elem[]; - constructor(elems: Elem[]) { - super(); - this.elems = elems; - } - - subscribe>(observer: T): Disposable { - // thread... - for (let el of this.elems) { - observer.on(new Next(el)); - } - observer.on(new Completed()); - return Disposables.create(); - } - - run>(observer: O, cancel: Cancelable): { sink: Disposable, subscription: Disposable } { - return { sink: Disposables.create(), subscription: Disposables.create() }; - } - -} - diff --git a/src/ope/List.ts b/src/ope/List.ts new file mode 100644 index 0000000..2d1a102 --- /dev/null +++ b/src/ope/List.ts @@ -0,0 +1,33 @@ +import { Observable } from "../Base/Observable"; +import { Producer } from "../Base/Producer"; +import { ObserverType } from "../Base/Observer"; +import { Disposable, Disposables, Cancelable } from "../Base/Disposable"; +import { Next, Completed } from "../common/Event"; + +export function list(elems: Elem[]): Observable { + return new List(elems); +} + + +export class List extends Producer { + private elems: Elem[]; + constructor(elems: Elem[]) { + super(); + this.elems = elems; + } + + subscribe>(observer: T): Disposable { + // thread... + for (let el of this.elems) { + observer.on(new Next(el)); + } + observer.on(new Completed()); + return Disposables.create(); + } + + run>(observer: O, cancel: Cancelable): { sink: Disposable, subscription: Disposable } { + return { sink: Disposables.create(), subscription: Disposables.create() }; + } + +} +