Skip to content

Commit

Permalink
add Filter and List
Browse files Browse the repository at this point in the history
  • Loading branch information
KatagiriSo committed Nov 11, 2019
1 parent 6e0d951 commit 6cef564
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 29 deletions.
56 changes: 56 additions & 0 deletions src/ope/Filter.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import { ObserverType } from "../Base/Observer";
import { Sink } from "../Base/Sink";
import { Cancelable, Disposable } from "../Base/Disposable";
import { Next, Completed } from "../common/Event";
import { Producer } from "../Base/Producer";
import { Observable, ObservableType } from "../Base/Observable";
import { RDEvent, RDError } from "../common/Event";


export class FilterSink<SourceType, Observer extends ObserverType<SourceType>, ResultType> extends Sink<Observer, SourceType> implements ObserverType<SourceType> {
private m_predicate: (source: SourceType) => boolean
constructor(predicate: (source: SourceType) => boolean, observer: Observer, cancel: Cancelable) {
super(observer, cancel);
this.m_predicate = predicate;
}
on(ev: RDEvent<SourceType>): void {
switch (ev.kind) {
case "next":
if (this.m_predicate(ev.next)) {
this.forwardOn(ev);
}
return;
case "error":
this.forwardOn(ev)
this.dispose();
return;
case "completed":
this.forwardOn(ev);
this.dispose();
return;
}
}
}

export class Filter<SourceType> extends Producer<SourceType> {
private source: Observable<SourceType>
private predicate: (s: SourceType) => boolean;

constructor(source: Observable<SourceType>, predicate: (s: SourceType) => boolean) {
super();
this.source = source;
this.predicate = predicate;
}

run<O extends ObserverType<SourceType>>(observer: O, cancel: Cancelable): { sink: Disposable, subscription: Disposable } {
const sink = new FilterSink(this.predicate, observer, cancel);
const subscription = this.source.subscribe(sink);
return { sink: sink, subscription: subscription };
}

}


export function filter<Elem>(predicate: (elm: Elem) => boolean): (ob: ObservableType<Elem>) => Observable<Elem> {
return (ob) => new Filter(ob, predicate);
}
29 changes: 0 additions & 29 deletions src/ope/Just.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,32 +31,3 @@ import { Next, Completed } from "../common/Event";
}

}


export function list<Elem>(elems: Elem[]): Observable<Elem> {
return new List(elems);
}


class List<Elem> extends Producer<Elem> {
private elems: Elem[];
constructor(elems: Elem[]) {
super();
this.elems = elems;
}

subscribe<T extends ObserverType<Elem>>(observer: T): Disposable {
// thread...
for (let el of this.elems) {
observer.on(new Next(el));
}
observer.on(new Completed());
return Disposables.create();
}

run<O extends ObserverType<Elem>>(observer: O, cancel: Cancelable): { sink: Disposable, subscription: Disposable } {
return { sink: Disposables.create(), subscription: Disposables.create() };
}

}

33 changes: 33 additions & 0 deletions src/ope/List.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import { Observable } from "../Base/Observable";
import { Producer } from "../Base/Producer";
import { ObserverType } from "../Base/Observer";
import { Disposable, Disposables, Cancelable } from "../Base/Disposable";
import { Next, Completed } from "../common/Event";

export function list<Elem>(elems: Elem[]): Observable<Elem> {
return new List(elems);
}


export class List<Elem> extends Producer<Elem> {
private elems: Elem[];
constructor(elems: Elem[]) {
super();
this.elems = elems;
}

subscribe<T extends ObserverType<Elem>>(observer: T): Disposable {
// thread...
for (let el of this.elems) {
observer.on(new Next(el));
}
observer.on(new Completed());
return Disposables.create();
}

run<O extends ObserverType<Elem>>(observer: O, cancel: Cancelable): { sink: Disposable, subscription: Disposable } {
return { sink: Disposables.create(), subscription: Disposables.create() };
}

}

0 comments on commit 6cef564

Please sign in to comment.