Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Emit errors #51

Open
wants to merge 13 commits into
base: master
Choose a base branch
from
3 changes: 1 addition & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@
"typings": "out/main.d.ts",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1",
"snyk-protect": "snyk protect",
"prepublish": "npm run build && yarn run snyk-protect",
"postinstall": "npm run build",
"build": "tsc -d"
},
"repository": {
Expand Down
15 changes: 5 additions & 10 deletions src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@ export class CommentStream extends Poll<Snoowrap.Comment> {
constructor(client: Snoowrap, options: SnooStormOptions = DefaultOptions) {
super({
frequency: options.pollTime || 2000,
get: async () => client.getNewComments(options.subreddit, options),
identifier: "id",
get: async () => client.getNewComments(options.subreddit, options)
});
}
}
Expand All @@ -27,8 +26,7 @@ export class SubmissionStream extends Poll<Snoowrap.Submission> {
constructor(client: Snoowrap, options: SnooStormOptions = DefaultOptions) {
super({
frequency: options.pollTime || 2000,
get: async () => client.getNew(options.subreddit, options),
identifier: "id",
get: async () => client.getNew(options.subreddit, options)
});
}
}
Expand Down Expand Up @@ -58,8 +56,7 @@ export class InboxStream extends Poll<
) {
super({
frequency: options.pollTime,
get: async () => client.getInbox(options),
identifier: "id",
get: async () => client.getInbox(options)
});
}
}
Expand All @@ -71,8 +68,7 @@ export class ModMailStream extends Poll<Snoowrap.PrivateMessage> {
) {
super({
frequency: options.pollTime || 2000,
get: async () => client.getModmail(options),
identifier: "id",
get: async () => client.getModmail(options)
});
}
}
Expand All @@ -86,8 +82,7 @@ export class ModQueueStream extends Poll<
options: SnooStormOptions & { subreddit: string }) {
super({
frequency: options.pollTime || 2000,
get: async () => client.getSubreddit(options.subreddit).getModqueue(options),
identifier: "id",
get: async () => client.getSubreddit(options.subreddit).getModqueue(options)
});
}
}
Expand Down
46 changes: 26 additions & 20 deletions src/util/Poll.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ interface PollEvents<T> {
item: (item: T) => void;
listing: (items: T[]) => void;
end: () => void;
error: (e: Error) => void;
}

export default interface Poll<T extends object> {
Expand All @@ -24,40 +25,45 @@ export default interface Poll<T extends object> {
export interface PollConfiguration<T> {
frequency: number;
get: () => Awaitable<T[]>;
identifier: keyof T;
}

export default class Poll<T extends object> extends EventEmitter {
frequency: number;
interval: NodeJS.Timeout;
interval?: NodeJS.Timeout;
getter: () => any = () => [];

processed: Set<T[keyof T]> = new Set();

constructor({ frequency, get, identifier }: PollConfiguration<T>) {
constructor({ frequency, get }: PollConfiguration<T>) {
super();
this.frequency = frequency || 2000;
this.getter = get;
}

this.interval = setInterval(async () => {
const batch = await get();
start() {
const fetch = async () => {
try {
const batch = await this.getter();

const newItems: T[] = [];
for (const item of batch) {
const id = item[identifier];
if (this.processed.has(id)) continue;
const newItems: T[] = [];
for (const item of batch) {
// Emit for new items and add it to the list
newItems.push(item);
this.emit("item", item);
}

// Emit for new items and add it to the list
newItems.push(item);
this.processed.add(id);
this.emit("item", item);
// Emit the new listing of all new items
this.emit("listing", newItems);
} catch (e) {
this.emit("error", e);
}

// Emit the new listing of all new items
this.emit("listing", newItems);
}, frequency);
};
this.interval = setInterval(fetch, this.frequency);
fetch();
}

end() {
clearInterval(this.interval);
if (this.interval) {
clearInterval(this.interval);
}
this.emit("end");
}
}