From e78d92033766af3ba6b18aa3a06c2e4843eeb554 Mon Sep 17 00:00:00 2001 From: Half-Shot Date: Tue, 6 Feb 2024 11:58:17 +0000 Subject: [PATCH 01/31] Backoff RSS requests if a url repeatedly fails. --- src/feeds/FeedReader.ts | 25 +++++++++++++++++++++++-- 1 file changed, 23 insertions(+), 2 deletions(-) diff --git a/src/feeds/FeedReader.ts b/src/feeds/FeedReader.ts index 46ccadb2c..ffb551034 100644 --- a/src/feeds/FeedReader.ts +++ b/src/feeds/FeedReader.ts @@ -10,6 +10,10 @@ import { readFeed } from "../libRs"; import { IBridgeStorageProvider } from "../Stores/StorageProvider"; import UserAgent from "../UserAgent"; +const FEED_BACKOFF_TIME_MS = 5 * 1000; +const FEED_BACKOFF_POW = 1.05; +const FEED_BACKOFF_TIME_MAX_MS = 60 * 60 * 1000; + const log = new Logger("FeedReader"); export class FeedError extends Error { constructor( @@ -89,6 +93,9 @@ export class FeedReader { private feedQueue: string[] = []; + private feedBackoff: Map = new Map(); + private feedLastBackoff: Map = new Map(); + // A set of last modified times for each url. private cacheTimes: Map = new Map(); @@ -249,6 +256,8 @@ export class FeedReader { // Clear any feed failures this.feedsFailingHttp.delete(url); this.feedsFailingParsing.delete(url); + this.feedLastBackoff.delete(url); + this.feedQueue.push(url); } catch (err: unknown) { // TODO: Proper Rust Type error. if ((err as Error).message.includes('Failed to fetch feed due to HTTP')) { @@ -256,12 +265,14 @@ export class FeedReader { } else { this.feedsFailingParsing.add(url); } + const backoffDuration = Math.min(FEED_BACKOFF_TIME_MAX_MS, ( + Math.ceil((Math.random() + 0.5) * FEED_BACKOFF_TIME_MS)) + Math.pow(this.feedLastBackoff.get(url) ?? 0, FEED_BACKOFF_POW)); + this.feedBackoff.set(url, Date.now() + backoffDuration); const error = err instanceof Error ? err : new Error(`Unknown error ${err}`); const feedError = new FeedError(url.toString(), error, fetchKey); - log.error("Unable to read feed:", feedError.message); + log.error("Unable to read feed:", feedError.message, `backing off for ${backoffDuration}ms`); this.queue.push({ eventName: 'feed.error', sender: 'FeedReader', data: feedError}); } finally { - this.feedQueue.push(url); } return seenEntriesChanged; } @@ -308,5 +319,15 @@ export class FeedReader { } void this.pollFeeds(workerId); }, sleepFor); + + // Reinsert any feeds that we may have backed off. + for (const [feedUrl, retryAfter] of this.feedBackoff.entries()) { + if (retryAfter < Date.now()) { + log.debug(`Adding back ${feedUrl} from backoff set`); + this.feedQueue.push(feedUrl); + // Store the last backoff time. + this.feedBackoff.delete(feedUrl) + } + } } } From f5568e2fb9adbe12f712352aba87ea2856cd6af9 Mon Sep 17 00:00:00 2001 From: Half-Shot Date: Tue, 6 Feb 2024 11:58:58 +0000 Subject: [PATCH 02/31] Increase max backoff time to a day --- src/feeds/FeedReader.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/feeds/FeedReader.ts b/src/feeds/FeedReader.ts index ffb551034..8cce8566b 100644 --- a/src/feeds/FeedReader.ts +++ b/src/feeds/FeedReader.ts @@ -12,7 +12,7 @@ import UserAgent from "../UserAgent"; const FEED_BACKOFF_TIME_MS = 5 * 1000; const FEED_BACKOFF_POW = 1.05; -const FEED_BACKOFF_TIME_MAX_MS = 60 * 60 * 1000; +const FEED_BACKOFF_TIME_MAX_MS = 24 * 60 * 60 * 1000; const log = new Logger("FeedReader"); export class FeedError extends Error { From 2ee501291c55e3f6dd42241817c9d23b0f83aa53 Mon Sep 17 00:00:00 2001 From: Half-Shot Date: Tue, 6 Feb 2024 12:08:15 +0000 Subject: [PATCH 03/31] Add backoff for failing feeds. --- changelog.d/890.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/890.misc diff --git a/changelog.d/890.misc b/changelog.d/890.misc new file mode 100644 index 000000000..23b8fb1c1 --- /dev/null +++ b/changelog.d/890.misc @@ -0,0 +1 @@ +Failing RSS/atom feeds are now backed off before being retried. This should result in a speedup for large public deployments where failing feeds may result in a slowdown. \ No newline at end of file From b7cd4fec5d3f1655ec3628ca62fc7e881baaa1cb Mon Sep 17 00:00:00 2001 From: Half-Shot Date: Tue, 6 Feb 2024 12:09:04 +0000 Subject: [PATCH 04/31] Remove unused finally --- src/feeds/FeedReader.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/src/feeds/FeedReader.ts b/src/feeds/FeedReader.ts index 8cce8566b..2c1cd1c7f 100644 --- a/src/feeds/FeedReader.ts +++ b/src/feeds/FeedReader.ts @@ -272,7 +272,6 @@ export class FeedReader { const feedError = new FeedError(url.toString(), error, fetchKey); log.error("Unable to read feed:", feedError.message, `backing off for ${backoffDuration}ms`); this.queue.push({ eventName: 'feed.error', sender: 'FeedReader', data: feedError}); - } finally { } return seenEntriesChanged; } From 18429cef94e660306451d483b3c9e2cff9f48320 Mon Sep 17 00:00:00 2001 From: Half-Shot Date: Tue, 6 Feb 2024 12:10:04 +0000 Subject: [PATCH 05/31] Add this.feedLastBackoff --- src/feeds/FeedReader.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/src/feeds/FeedReader.ts b/src/feeds/FeedReader.ts index 2c1cd1c7f..d4909ba72 100644 --- a/src/feeds/FeedReader.ts +++ b/src/feeds/FeedReader.ts @@ -268,6 +268,7 @@ export class FeedReader { const backoffDuration = Math.min(FEED_BACKOFF_TIME_MAX_MS, ( Math.ceil((Math.random() + 0.5) * FEED_BACKOFF_TIME_MS)) + Math.pow(this.feedLastBackoff.get(url) ?? 0, FEED_BACKOFF_POW)); this.feedBackoff.set(url, Date.now() + backoffDuration); + this.feedLastBackoff.set(url, backoffDuration); const error = err instanceof Error ? err : new Error(`Unknown error ${err}`); const feedError = new FeedError(url.toString(), error, fetchKey); log.error("Unable to read feed:", feedError.message, `backing off for ${backoffDuration}ms`); From 80d4b9badcbaf871cdb83c1a891245ea24e3f30f Mon Sep 17 00:00:00 2001 From: Half-Shot Date: Tue, 6 Feb 2024 12:58:37 +0000 Subject: [PATCH 06/31] Rewrite in rust. --- Cargo.lock | 1 + Cargo.toml | 1 + src/feeds/FeedReader.ts | 48 ++++--------------- src/lib.rs | 1 + src/util/mod.rs | 100 ++++++++++++++++++++++++++++++++++++++++ 5 files changed, 113 insertions(+), 38 deletions(-) create mode 100644 src/util/mod.rs diff --git a/Cargo.lock b/Cargo.lock index 7d8fdad65..e844d1cc9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -681,6 +681,7 @@ dependencies = [ "napi", "napi-build", "napi-derive", + "rand", "reqwest", "rgb", "rss", diff --git a/Cargo.toml b/Cargo.toml index 4d75e4089..ab6e33d66 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,6 +21,7 @@ rss = "2.0" atom_syndication = "0.12" ruma = { version = "0.9", features = ["events", "html"] } reqwest = "0.11" +rand = "0.8.5" [build-dependencies] napi-build = "2" diff --git a/src/feeds/FeedReader.ts b/src/feeds/FeedReader.ts index d4909ba72..28fcf7a9e 100644 --- a/src/feeds/FeedReader.ts +++ b/src/feeds/FeedReader.ts @@ -9,10 +9,7 @@ import { randomUUID } from "crypto"; import { readFeed } from "../libRs"; import { IBridgeStorageProvider } from "../Stores/StorageProvider"; import UserAgent from "../UserAgent"; - -const FEED_BACKOFF_TIME_MS = 5 * 1000; -const FEED_BACKOFF_POW = 1.05; -const FEED_BACKOFF_TIME_MAX_MS = 24 * 60 * 60 * 1000; +import { QueueWithBackoff } from "../libRs"; const log = new Logger("FeedReader"); export class FeedError extends Error { @@ -77,24 +74,13 @@ function normalizeUrl(input: string): string { return url.toString(); } -function shuffle(array: T[]): T[] { - for (let i = array.length - 1; i > 0; i--) { - const j = Math.floor(Math.random() * (i + 1)); - [array[i], array[j]] = [array[j], array[i]]; - } - return array; -} - export class FeedReader { private connections: FeedConnection[]; // ts should notice that we do in fact initialize it in constructor, but it doesn't (in this version) private observedFeedUrls: Set = new Set(); - private feedQueue: string[] = []; - - private feedBackoff: Map = new Map(); - private feedLastBackoff: Map = new Map(); + private feedQueue = new QueueWithBackoff(); // A set of last modified times for each url. private cacheTimes: Map = new Map(); @@ -111,7 +97,7 @@ export class FeedReader { get sleepingInterval() { return ( // Calculate the number of MS to wait in between feeds. - (this.config.pollIntervalSeconds * 1000) / (this.feedQueue.length || 1) + (this.config.pollIntervalSeconds * 1000) / (this.feedQueue.length() || 1) // And multiply by the number of concurrent readers ) * this.config.pollConcurrency; } @@ -156,16 +142,16 @@ export class FeedReader { private calculateFeedUrls(): void { // just in case we got an invalid URL somehow - const normalizedUrls = []; + const observedFeedUrls = new Set(); for (const conn of this.connections) { try { - normalizedUrls.push(normalizeUrl(conn.feedUrl)); + observedFeedUrls.add(normalizeUrl(conn.feedUrl)); } catch (err: unknown) { log.error(`Invalid feedUrl for connection ${conn.connectionId}: ${conn.feedUrl}. It will not be tracked`); } } - this.observedFeedUrls = new Set(normalizedUrls); - this.feedQueue = shuffle([...this.observedFeedUrls.values()]); + observedFeedUrls.forEach(url => this.feedQueue.push(url)); + this.feedQueue.shuffle(); Metrics.feedsCount.set(this.observedFeedUrls.size); Metrics.feedsCountDeprecated.set(this.observedFeedUrls.size); @@ -256,7 +242,6 @@ export class FeedReader { // Clear any feed failures this.feedsFailingHttp.delete(url); this.feedsFailingParsing.delete(url); - this.feedLastBackoff.delete(url); this.feedQueue.push(url); } catch (err: unknown) { // TODO: Proper Rust Type error. @@ -265,10 +250,7 @@ export class FeedReader { } else { this.feedsFailingParsing.add(url); } - const backoffDuration = Math.min(FEED_BACKOFF_TIME_MAX_MS, ( - Math.ceil((Math.random() + 0.5) * FEED_BACKOFF_TIME_MS)) + Math.pow(this.feedLastBackoff.get(url) ?? 0, FEED_BACKOFF_POW)); - this.feedBackoff.set(url, Date.now() + backoffDuration); - this.feedLastBackoff.set(url, backoffDuration); + const backoffDuration = this.feedQueue.backoff(url); const error = err instanceof Error ? err : new Error(`Unknown error ${err}`); const feedError = new FeedError(url.toString(), error, fetchKey); log.error("Unable to read feed:", feedError.message, `backing off for ${backoffDuration}ms`); @@ -288,11 +270,11 @@ export class FeedReader { Metrics.feedsFailingDeprecated.set({ reason: "http" }, this.feedsFailingHttp.size ); Metrics.feedsFailingDeprecated.set({ reason: "parsing" }, this.feedsFailingParsing.size); - log.debug(`Checking for updates in ${this.observedFeedUrls.size} RSS/Atom feeds (worker: ${workerId})`); + log.debug(`Checking for updates in ${this.feedQueue.length()} RSS/Atom feeds (worker: ${workerId})`); const fetchingStarted = Date.now(); - const [ url ] = this.feedQueue.splice(0, 1); + const url = this.feedQueue.next(); let sleepFor = this.sleepingInterval; if (url) { @@ -319,15 +301,5 @@ export class FeedReader { } void this.pollFeeds(workerId); }, sleepFor); - - // Reinsert any feeds that we may have backed off. - for (const [feedUrl, retryAfter] of this.feedBackoff.entries()) { - if (retryAfter < Date.now()) { - log.debug(`Adding back ${feedUrl} from backoff set`); - this.feedQueue.push(feedUrl); - // Store the last backoff time. - this.feedBackoff.delete(feedUrl) - } - } } } diff --git a/src/lib.rs b/src/lib.rs index 1d03f680a..3a15bd657 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -3,6 +3,7 @@ pub mod feeds; pub mod format_util; pub mod github; pub mod jira; +pub mod util; #[macro_use] extern crate napi_derive; diff --git a/src/util/mod.rs b/src/util/mod.rs new file mode 100644 index 000000000..42e453538 --- /dev/null +++ b/src/util/mod.rs @@ -0,0 +1,100 @@ +use std::collections::LinkedList; +use std::collections::HashMap; +use std::time::{SystemTime, UNIX_EPOCH}; +use rand::prelude::*; + +const BACKOFF_TIME_MAX_MS: f32 = 24f32 * 60f32 * 60f32 * 1000f32; +const BACKOFF_POW: f32 = 1.05f32; +const BACKOFF_TIME_MS: f32 = 5f32 * 1000f32; + +#[napi] + +pub struct QueueWithBackoff { + queue: LinkedList, + backoff: HashMap, + last_backoff: HashMap +} + +#[napi] + +impl QueueWithBackoff { + #[napi(constructor)] + pub fn new() -> Self { + QueueWithBackoff { + queue: LinkedList::new(), + backoff: HashMap::new(), + last_backoff: HashMap::new(), + } + } + + + #[napi] + pub fn next(&mut self) -> Option { + let start = SystemTime::now(); + let since_the_epoch = start + .duration_since(UNIX_EPOCH).unwrap(); + + let mut items_to_rm: Vec = vec![]; + for item in self.backoff.iter() { + if *item.1 < since_the_epoch.as_millis() { + self.queue.push_back(item.0.clone()); + items_to_rm.push(item.0.clone()); + } + } + + for item in items_to_rm { + self.backoff.remove(&item); + } + + return self.queue.pop_front() + } + + + #[napi] + pub fn push(&mut self, item: String) { + self.last_backoff.remove(&item); + self.queue.push_back(item); + } + + + #[napi] + pub fn backoff(&mut self, item: String) -> u32 { + let last_backoff = (*self.last_backoff.get(&item).unwrap_or(&0)) as f32; + + let mut rng = rand::thread_rng(); + let y: f32 = rng.gen::() + 0.5f32; // generates a float between 0 and 1 + + let backoff_duration = ((y * BACKOFF_TIME_MS) + f32::from(last_backoff).powf(BACKOFF_POW)).min(BACKOFF_TIME_MAX_MS) as u32; + let backoff_item = item.clone(); + self.last_backoff.insert(item, backoff_duration); + + let start = SystemTime::now(); + let since_the_epoch = start + .duration_since(UNIX_EPOCH).unwrap(); + + let time = since_the_epoch.as_millis() + backoff_duration as u128; + + self.backoff.insert(backoff_item, time); + return backoff_duration; + } + + + #[napi] + pub fn length(&self) -> u32 { + self.queue.len() as u32 + } + + #[napi] + pub fn shuffle(&mut self) { + let mut rng = rand::thread_rng(); + let old_queue = self.queue.clone(); + self.queue.clear(); + for item in old_queue { + if rng.gen_bool(0.5) { + self.queue.push_front(item); + } else { + self.queue.push_back(item); + } + } + } +} \ No newline at end of file From eff30b8f36ca29593103dbe2e314173f523ce8be Mon Sep 17 00:00:00 2001 From: Half-Shot Date: Tue, 6 Feb 2024 13:02:04 +0000 Subject: [PATCH 07/31] linting --- src/util/mod.rs | 32 ++++++++++++++++---------------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/src/util/mod.rs b/src/util/mod.rs index 42e453538..b2d7963a1 100644 --- a/src/util/mod.rs +++ b/src/util/mod.rs @@ -1,7 +1,7 @@ -use std::collections::LinkedList; +use rand::prelude::*; use std::collections::HashMap; +use std::collections::LinkedList; use std::time::{SystemTime, UNIX_EPOCH}; -use rand::prelude::*; const BACKOFF_TIME_MAX_MS: f32 = 24f32 * 60f32 * 60f32 * 1000f32; const BACKOFF_POW: f32 = 1.05f32; @@ -12,9 +12,14 @@ const BACKOFF_TIME_MS: f32 = 5f32 * 1000f32; pub struct QueueWithBackoff { queue: LinkedList, backoff: HashMap, - last_backoff: HashMap + last_backoff: HashMap, } +impl Default for QueueWithBackoff { + fn default() -> Self { + Self::new() + } +} #[napi] impl QueueWithBackoff { @@ -27,12 +32,10 @@ impl QueueWithBackoff { } } - #[napi] - pub fn next(&mut self) -> Option { + pub fn pop(&mut self) -> Option { let start = SystemTime::now(); - let since_the_epoch = start - .duration_since(UNIX_EPOCH).unwrap(); + let since_the_epoch = start.duration_since(UNIX_EPOCH).unwrap(); let mut items_to_rm: Vec = vec![]; for item in self.backoff.iter() { @@ -46,17 +49,15 @@ impl QueueWithBackoff { self.backoff.remove(&item); } - return self.queue.pop_front() + self.queue.pop_front() } - #[napi] pub fn push(&mut self, item: String) { self.last_backoff.remove(&item); self.queue.push_back(item); } - #[napi] pub fn backoff(&mut self, item: String) -> u32 { let last_backoff = (*self.last_backoff.get(&item).unwrap_or(&0)) as f32; @@ -64,21 +65,20 @@ impl QueueWithBackoff { let mut rng = rand::thread_rng(); let y: f32 = rng.gen::() + 0.5f32; // generates a float between 0 and 1 - let backoff_duration = ((y * BACKOFF_TIME_MS) + f32::from(last_backoff).powf(BACKOFF_POW)).min(BACKOFF_TIME_MAX_MS) as u32; + let backoff_duration = ((y * BACKOFF_TIME_MS) + last_backoff.powf(BACKOFF_POW)) + .min(BACKOFF_TIME_MAX_MS) as u32; let backoff_item = item.clone(); self.last_backoff.insert(item, backoff_duration); let start = SystemTime::now(); - let since_the_epoch = start - .duration_since(UNIX_EPOCH).unwrap(); + let since_the_epoch = start.duration_since(UNIX_EPOCH).unwrap(); let time = since_the_epoch.as_millis() + backoff_duration as u128; self.backoff.insert(backoff_item, time); - return backoff_duration; + backoff_duration } - #[napi] pub fn length(&self) -> u32 { self.queue.len() as u32 @@ -97,4 +97,4 @@ impl QueueWithBackoff { } } } -} \ No newline at end of file +} From 517d04482cea9454e3ce7edcf0f80aa31d3d2f89 Mon Sep 17 00:00:00 2001 From: Half-Shot Date: Tue, 6 Feb 2024 13:03:24 +0000 Subject: [PATCH 08/31] pop --- src/feeds/FeedReader.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/feeds/FeedReader.ts b/src/feeds/FeedReader.ts index 28fcf7a9e..dd8ec8a38 100644 --- a/src/feeds/FeedReader.ts +++ b/src/feeds/FeedReader.ts @@ -274,7 +274,7 @@ export class FeedReader { const fetchingStarted = Date.now(); - const url = this.feedQueue.next(); + const url = this.feedQueue.pop(); let sleepFor = this.sleepingInterval; if (url) { From ea7af887841dff02bebc1c62210aea7675cd1d38 Mon Sep 17 00:00:00 2001 From: Half-Shot Date: Tue, 6 Feb 2024 16:13:11 +0000 Subject: [PATCH 09/31] Optimise backoff function further --- src/util/mod.rs | 33 +++++++++++++++++++-------------- 1 file changed, 19 insertions(+), 14 deletions(-) diff --git a/src/util/mod.rs b/src/util/mod.rs index b2d7963a1..78cfa6483 100644 --- a/src/util/mod.rs +++ b/src/util/mod.rs @@ -1,4 +1,5 @@ use rand::prelude::*; +use std::collections::BTreeMap; use std::collections::HashMap; use std::collections::LinkedList; use std::time::{SystemTime, UNIX_EPOCH}; @@ -11,7 +12,7 @@ const BACKOFF_TIME_MS: f32 = 5f32 * 1000f32; pub struct QueueWithBackoff { queue: LinkedList, - backoff: HashMap, + backoff: BTreeMap, last_backoff: HashMap, } @@ -27,7 +28,7 @@ impl QueueWithBackoff { pub fn new() -> Self { QueueWithBackoff { queue: LinkedList::new(), - backoff: HashMap::new(), + backoff: BTreeMap::new(), last_backoff: HashMap::new(), } } @@ -35,19 +36,17 @@ impl QueueWithBackoff { #[napi] pub fn pop(&mut self) -> Option { let start = SystemTime::now(); - let since_the_epoch = start.duration_since(UNIX_EPOCH).unwrap(); - - let mut items_to_rm: Vec = vec![]; - for item in self.backoff.iter() { - if *item.1 < since_the_epoch.as_millis() { - self.queue.push_back(item.0.clone()); - items_to_rm.push(item.0.clone()); + let since_the_epoch = start.duration_since(UNIX_EPOCH).unwrap().as_millis(); + + // We only need to check this once, as we won't be adding to the backoff queue + // as often as we pull from it. + if let Some(item) = self.backoff.first_entry() { + if *item.key() < since_the_epoch { + let v = item.remove(); + self.queue.push_back(v); } } - for item in items_to_rm { - self.backoff.remove(&item); - } self.queue.pop_front() } @@ -73,9 +72,15 @@ impl QueueWithBackoff { let start = SystemTime::now(); let since_the_epoch = start.duration_since(UNIX_EPOCH).unwrap(); - let time = since_the_epoch.as_millis() + backoff_duration as u128; + let mut time = since_the_epoch.as_millis() + backoff_duration as u128; - self.backoff.insert(backoff_item, time); + // If the backoff queue contains this time (unlikely, but we don't) + // want to overwrite, then add an extra ms. + while self.backoff.contains_key(&time) { + time = time + 1; + } + + self.backoff.insert(time, backoff_item); backoff_duration } From 965c30bc01489c271a8c9883bc587407e77ebebe Mon Sep 17 00:00:00 2001 From: Half-Shot Date: Tue, 6 Feb 2024 16:14:48 +0000 Subject: [PATCH 10/31] Drop only! --- spec/github.spec.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spec/github.spec.ts b/spec/github.spec.ts index 57a431c5b..132674298 100644 --- a/spec/github.spec.ts +++ b/spec/github.spec.ts @@ -58,7 +58,7 @@ describe('GitHub', () => { return testEnv?.tearDown(); }); - it.only('should be able to handle a GitHub event', async () => { + it('should be able to handle a GitHub event', async () => { const user = testEnv.getUser('user'); const bridgeApi = await getBridgeApi(testEnv.opts.config?.widgets?.publicUrl!, user); const testRoomId = await user.createRoom({ name: 'Test room', invite:[testEnv.botMxid] }); From e3d085e5d6ad05db94035ce48798b0c19a5ca9fe Mon Sep 17 00:00:00 2001 From: Half-Shot Date: Tue, 6 Feb 2024 16:14:53 +0000 Subject: [PATCH 11/31] fix test --- tests/connections/GithubRepoTest.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/connections/GithubRepoTest.ts b/tests/connections/GithubRepoTest.ts index 85af40c62..e686f2a5c 100644 --- a/tests/connections/GithubRepoTest.ts +++ b/tests/connections/GithubRepoTest.ts @@ -23,6 +23,7 @@ const GITHUB_ISSUE = { }, html_url: `https://github.com/${GITHUB_ORG_REPO.org}/${GITHUB_ORG_REPO.repo}/issues/1234`, title: "My issue", + assignees: [] }; const GITHUB_ISSUE_CREATED_PAYLOAD = { @@ -137,7 +138,7 @@ describe("GitHubRepoConnection", () => { intent.expectEventBodyContains(GITHUB_ISSUE_CREATED_PAYLOAD.issue.html_url, 0); intent.expectEventBodyContains(GITHUB_ISSUE_CREATED_PAYLOAD.issue.title, 0); }); - it.only("will handle assignees on issue creation", async () => { + it("will handle assignees on issue creation", async () => { const { connection, intent } = createConnection(); await connection.onIssueCreated({ ...GITHUB_ISSUE_CREATED_PAYLOAD, From 4274eb41a6c9df2127540ad8fe9785fe2d80a899 Mon Sep 17 00:00:00 2001 From: Half-Shot Date: Tue, 6 Feb 2024 16:15:40 +0000 Subject: [PATCH 12/31] lint --- src/util/mod.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/util/mod.rs b/src/util/mod.rs index 78cfa6483..2246a0e3d 100644 --- a/src/util/mod.rs +++ b/src/util/mod.rs @@ -39,7 +39,7 @@ impl QueueWithBackoff { let since_the_epoch = start.duration_since(UNIX_EPOCH).unwrap().as_millis(); // We only need to check this once, as we won't be adding to the backoff queue - // as often as we pull from it. + // as often as we pull from it. if let Some(item) = self.backoff.first_entry() { if *item.key() < since_the_epoch { let v = item.remove(); @@ -47,7 +47,6 @@ impl QueueWithBackoff { } } - self.queue.pop_front() } @@ -79,7 +78,7 @@ impl QueueWithBackoff { while self.backoff.contains_key(&time) { time = time + 1; } - + self.backoff.insert(time, backoff_item); backoff_duration } From 64ab8081798f48764f8a0caa4ca4a9ef4412ca9a Mon Sep 17 00:00:00 2001 From: Half-Shot Date: Tue, 6 Feb 2024 16:16:01 +0000 Subject: [PATCH 13/31] lint further --- src/util/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/util/mod.rs b/src/util/mod.rs index 2246a0e3d..5b535ef72 100644 --- a/src/util/mod.rs +++ b/src/util/mod.rs @@ -76,7 +76,7 @@ impl QueueWithBackoff { // If the backoff queue contains this time (unlikely, but we don't) // want to overwrite, then add an extra ms. while self.backoff.contains_key(&time) { - time = time + 1; + time += 1; } self.backoff.insert(time, backoff_item); From 92322131595a627bf35600b5cc1ec029d54d1c33 Mon Sep 17 00:00:00 2001 From: Half-Shot Date: Tue, 6 Feb 2024 16:20:21 +0000 Subject: [PATCH 14/31] Better comments --- src/util/mod.rs | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/src/util/mod.rs b/src/util/mod.rs index 5b535ef72..00944d82f 100644 --- a/src/util/mod.rs +++ b/src/util/mod.rs @@ -12,8 +12,14 @@ const BACKOFF_TIME_MS: f32 = 5f32 * 1000f32; pub struct QueueWithBackoff { queue: LinkedList, + /** + * A map of absolute backoff timestamps mapped to the value. + */ backoff: BTreeMap, - last_backoff: HashMap, + /** + * The last duration applied when a value was backed off. + */ + last_backoff_duration: HashMap, } impl Default for QueueWithBackoff { @@ -29,7 +35,7 @@ impl QueueWithBackoff { QueueWithBackoff { queue: LinkedList::new(), backoff: BTreeMap::new(), - last_backoff: HashMap::new(), + last_backoff_duration: HashMap::new(), } } @@ -52,13 +58,13 @@ impl QueueWithBackoff { #[napi] pub fn push(&mut self, item: String) { - self.last_backoff.remove(&item); + self.last_backoff_duration.remove(&item); self.queue.push_back(item); } #[napi] pub fn backoff(&mut self, item: String) -> u32 { - let last_backoff = (*self.last_backoff.get(&item).unwrap_or(&0)) as f32; + let last_backoff = (*self.last_backoff_duration.get(&item).unwrap_or(&0)) as f32; let mut rng = rand::thread_rng(); let y: f32 = rng.gen::() + 0.5f32; // generates a float between 0 and 1 @@ -66,7 +72,7 @@ impl QueueWithBackoff { let backoff_duration = ((y * BACKOFF_TIME_MS) + last_backoff.powf(BACKOFF_POW)) .min(BACKOFF_TIME_MAX_MS) as u32; let backoff_item = item.clone(); - self.last_backoff.insert(item, backoff_duration); + self.last_backoff_duration.insert(item, backoff_duration); let start = SystemTime::now(); let since_the_epoch = start.duration_since(UNIX_EPOCH).unwrap(); From d0846b132541c417d6d5174aad506a0d76e5228d Mon Sep 17 00:00:00 2001 From: Half-Shot Date: Wed, 7 Feb 2024 10:21:17 +0000 Subject: [PATCH 15/31] Fix urls calculation --- src/feeds/FeedReader.ts | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/src/feeds/FeedReader.ts b/src/feeds/FeedReader.ts index dd8ec8a38..b2460dd41 100644 --- a/src/feeds/FeedReader.ts +++ b/src/feeds/FeedReader.ts @@ -77,8 +77,6 @@ function normalizeUrl(input: string): string { export class FeedReader { private connections: FeedConnection[]; - // ts should notice that we do in fact initialize it in constructor, but it doesn't (in this version) - private observedFeedUrls: Set = new Set(); private feedQueue = new QueueWithBackoff(); @@ -113,7 +111,7 @@ export class FeedReader { this.timeouts.fill(undefined); Object.seal(this.timeouts); this.connections = this.connectionManager.getAllConnectionsOfType(FeedConnection); - this.calculateFeedUrls(); + const initialFeeds = this.calculateFeedUrls(); connectionManager.on('new-connection', c => { if (c instanceof FeedConnection) { log.debug('New connection tracked:', c.connectionId); @@ -128,7 +126,7 @@ export class FeedReader { } }); - log.debug('Loaded feed URLs:', this.observedFeedUrls); + log.debug('Loaded feed URLs:', [...initialFeeds].join(', ')); for (let i = 0; i < config.pollConcurrency; i++) { void this.pollFeeds(i); @@ -140,7 +138,7 @@ export class FeedReader { this.timeouts.forEach(t => clearTimeout(t)); } - private calculateFeedUrls(): void { + private calculateFeedUrls(): Set { // just in case we got an invalid URL somehow const observedFeedUrls = new Set(); for (const conn of this.connections) { @@ -150,11 +148,14 @@ export class FeedReader { log.error(`Invalid feedUrl for connection ${conn.connectionId}: ${conn.feedUrl}. It will not be tracked`); } } + observedFeedUrls.add("http://example.com/not-an-rss-feed"); observedFeedUrls.forEach(url => this.feedQueue.push(url)); this.feedQueue.shuffle(); - Metrics.feedsCount.set(this.observedFeedUrls.size); - Metrics.feedsCountDeprecated.set(this.observedFeedUrls.size); + + Metrics.feedsCount.set(observedFeedUrls.size); + Metrics.feedsCountDeprecated.set(observedFeedUrls.size); + return observedFeedUrls; } /** From 5659d889da891b4e11980a2d3cfa02b1f2d9a2f8 Mon Sep 17 00:00:00 2001 From: Half-Shot Date: Wed, 7 Feb 2024 10:27:13 +0000 Subject: [PATCH 16/31] Remove testing URL --- src/feeds/FeedReader.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/src/feeds/FeedReader.ts b/src/feeds/FeedReader.ts index b2460dd41..373f77724 100644 --- a/src/feeds/FeedReader.ts +++ b/src/feeds/FeedReader.ts @@ -148,7 +148,6 @@ export class FeedReader { log.error(`Invalid feedUrl for connection ${conn.connectionId}: ${conn.feedUrl}. It will not be tracked`); } } - observedFeedUrls.add("http://example.com/not-an-rss-feed"); observedFeedUrls.forEach(url => this.feedQueue.push(url)); this.feedQueue.shuffle(); From e6ddcb624fb8efefe9654b0805b4d795ede1782e Mon Sep 17 00:00:00 2001 From: Half-Shot Date: Wed, 7 Feb 2024 10:27:21 +0000 Subject: [PATCH 17/31] Add some variance to speed up while loop --- src/util/mod.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/util/mod.rs b/src/util/mod.rs index 00944d82f..e6520bb8d 100644 --- a/src/util/mod.rs +++ b/src/util/mod.rs @@ -80,9 +80,9 @@ impl QueueWithBackoff { let mut time = since_the_epoch.as_millis() + backoff_duration as u128; // If the backoff queue contains this time (unlikely, but we don't) - // want to overwrite, then add an extra ms. + // want to overwrite, then add some variance. while self.backoff.contains_key(&time) { - time += 1; + time += (y * BACKOFF_TIME_MS) as u128; } self.backoff.insert(time, backoff_item); From 4838ba1fab8fefcd90e64ca8e0a94be3edfcf807 Mon Sep 17 00:00:00 2001 From: Half-Shot Date: Wed, 7 Feb 2024 10:27:34 +0000 Subject: [PATCH 18/31] correct comment --- src/util/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/util/mod.rs b/src/util/mod.rs index e6520bb8d..9a6b6a4ca 100644 --- a/src/util/mod.rs +++ b/src/util/mod.rs @@ -67,7 +67,7 @@ impl QueueWithBackoff { let last_backoff = (*self.last_backoff_duration.get(&item).unwrap_or(&0)) as f32; let mut rng = rand::thread_rng(); - let y: f32 = rng.gen::() + 0.5f32; // generates a float between 0 and 1 + let y: f32 = rng.gen::() + 0.5f32; // generates a float between 0.5 and 1.1 let backoff_duration = ((y * BACKOFF_TIME_MS) + last_backoff.powf(BACKOFF_POW)) .min(BACKOFF_TIME_MAX_MS) as u32; From 1614a2ab96707053a239dbfa55875e66e7142750 Mon Sep 17 00:00:00 2001 From: Half-Shot Date: Wed, 7 Feb 2024 10:44:40 +0000 Subject: [PATCH 19/31] Follow the advice and use a VecDeque as it's slightly faster. --- src/util/mod.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/util/mod.rs b/src/util/mod.rs index 9a6b6a4ca..3fd9dff24 100644 --- a/src/util/mod.rs +++ b/src/util/mod.rs @@ -1,7 +1,7 @@ use rand::prelude::*; use std::collections::BTreeMap; use std::collections::HashMap; -use std::collections::LinkedList; +use std::collections::VecDeque; use std::time::{SystemTime, UNIX_EPOCH}; const BACKOFF_TIME_MAX_MS: f32 = 24f32 * 60f32 * 60f32 * 1000f32; @@ -11,7 +11,7 @@ const BACKOFF_TIME_MS: f32 = 5f32 * 1000f32; #[napi] pub struct QueueWithBackoff { - queue: LinkedList, + queue: VecDeque, /** * A map of absolute backoff timestamps mapped to the value. */ @@ -33,7 +33,7 @@ impl QueueWithBackoff { #[napi(constructor)] pub fn new() -> Self { QueueWithBackoff { - queue: LinkedList::new(), + queue: VecDeque::new(), backoff: BTreeMap::new(), last_backoff_duration: HashMap::new(), } From 958bdcb26d6bbffb704aa7ad45bdaf5f6f2cc1f0 Mon Sep 17 00:00:00 2001 From: Half-Shot Date: Wed, 7 Feb 2024 11:09:55 +0000 Subject: [PATCH 20/31] Vastly better shuffle method --- src/util/mod.rs | 10 +--------- 1 file changed, 1 insertion(+), 9 deletions(-) diff --git a/src/util/mod.rs b/src/util/mod.rs index 3fd9dff24..5e0d9621f 100644 --- a/src/util/mod.rs +++ b/src/util/mod.rs @@ -97,14 +97,6 @@ impl QueueWithBackoff { #[napi] pub fn shuffle(&mut self) { let mut rng = rand::thread_rng(); - let old_queue = self.queue.clone(); - self.queue.clear(); - for item in old_queue { - if rng.gen_bool(0.5) { - self.queue.push_front(item); - } else { - self.queue.push_back(item); - } - } + self.queue.make_contiguous().shuffle(&mut rng); } } From bec670f71213afcebcdc5a638981d3d2ea3caf72 Mon Sep 17 00:00:00 2001 From: Half-Shot Date: Wed, 7 Feb 2024 11:53:21 +0000 Subject: [PATCH 21/31] Speed up checking for previous guids. --- src/Stores/MemoryStorageProvider.ts | 4 ++-- src/Stores/RedisStorageProvider.ts | 20 ++++++++++++++------ src/Stores/StorageProvider.ts | 6 +++--- src/feeds/FeedReader.ts | 17 +++++++---------- src/feeds/parser.rs | 2 +- 5 files changed, 27 insertions(+), 22 deletions(-) diff --git a/src/Stores/MemoryStorageProvider.ts b/src/Stores/MemoryStorageProvider.ts index 257c1da3c..d44add7f0 100644 --- a/src/Stores/MemoryStorageProvider.ts +++ b/src/Stores/MemoryStorageProvider.ts @@ -35,8 +35,8 @@ export class MemoryStorageProvider extends MSP implements IBridgeStorageProvider return this.feedGuids.has(url); } - async hasSeenFeedGuid(url: string, guid: string): Promise { - return this.feedGuids.get(url)?.includes(guid) ?? false; + async hasSeenFeedGuids(url: string, ...guids: string[]): Promise { + return this.feedGuids.get(url)?.filter((existingGuid) => guids.includes(existingGuid)) ?? []; } public async setGithubIssue(repo: string, issueNumber: string, data: IssuesGetResponseData, scope = "") { diff --git a/src/Stores/RedisStorageProvider.ts b/src/Stores/RedisStorageProvider.ts index 4f2343ce5..e1dc77433 100644 --- a/src/Stores/RedisStorageProvider.ts +++ b/src/Stores/RedisStorageProvider.ts @@ -29,11 +29,10 @@ const WIDGET_USER_TOKENS = "widgets.user-tokens."; const FEED_GUIDS = "feeds.guids."; - - const log = new Logger("RedisASProvider"); export class RedisStorageContextualProvider implements IStorageProvider { + constructor(protected readonly redis: Redis, protected readonly contextSuffix = '') { } public setSyncToken(token: string|null){ @@ -216,9 +215,9 @@ export class RedisStorageProvider extends RedisStorageContextualProvider impleme await this.redis.set(key, JSON.stringify(value)); } - public async storeFeedGuids(url: string, ...guid: string[]): Promise { + public async storeFeedGuids(url: string, ...guids: string[]): Promise { const feedKey = `${FEED_GUIDS}${url}`; - await this.redis.lpush(feedKey, ...guid); + await this.redis.lpush(feedKey, ...guids); await this.redis.ltrim(feedKey, 0, MAX_FEED_ITEMS); } @@ -226,7 +225,16 @@ export class RedisStorageProvider extends RedisStorageContextualProvider impleme return (await this.redis.exists(`${FEED_GUIDS}${url}`)) === 1; } - public async hasSeenFeedGuid(url: string, guid: string): Promise { - return (await this.redis.lpos(`${FEED_GUIDS}${url}`, guid)) != null; + public async hasSeenFeedGuids(url: string, ...guids: string[]): Promise { + let multi = this.redis.multi(); + for (const guid of guids) { + multi = multi.lpos(`${FEED_GUIDS}${url}`, guid); + } + const res = await multi.exec(); + if (res === null) { + // Just assume we've seen none. + return []; + } + return guids.filter((_guid, index) => res[index][1] !== null); } } diff --git a/src/Stores/StorageProvider.ts b/src/Stores/StorageProvider.ts index 73790ff95..50175d75e 100644 --- a/src/Stores/StorageProvider.ts +++ b/src/Stores/StorageProvider.ts @@ -25,7 +25,7 @@ export interface IBridgeStorageProvider extends IAppserviceStorageProvider, ISto setStoredTempFile(key: string, value: string): Promise; getGitlabDiscussionThreads(connectionId: string): Promise; setGitlabDiscussionThreads(connectionId: string, value: SerializedGitlabDiscussionThreads): Promise; - storeFeedGuids(url: string, ...guid: string[]): Promise; - hasSeenFeed(url: string, ...guid: string[]): Promise; - hasSeenFeedGuid(url: string, guid: string): Promise; + storeFeedGuids(url: string, ...guids: string[]): Promise; + hasSeenFeed(url: string): Promise; + hasSeenFeedGuids(url: string, ...guids: string[]): Promise; } \ No newline at end of file diff --git a/src/feeds/FeedReader.ts b/src/feeds/FeedReader.ts index 373f77724..04dcda996 100644 --- a/src/feeds/FeedReader.ts +++ b/src/feeds/FeedReader.ts @@ -196,22 +196,20 @@ export class FeedReader { if (feed) { // If undefined, we got a not-modified. log.debug(`Found ${feed.items.length} entries in ${url}`); - + const seenItems = await this.storage.hasSeenFeedGuids(url, ...feed.items.filter(item => !!item.hashId).map(item => item.hashId!)) for (const item of feed.items) { // Some feeds have a nasty habit of leading a empty tag there, making us parse it as garbage. if (!item.hashId) { log.error(`Could not determine guid for entry in ${url}, skipping`); continue; } - const hashId = `md5:${item.hashId}`; - newGuids.push(hashId); - - if (initialSync) { - log.debug(`Skipping entry ${item.id ?? hashId} since we're performing an initial sync`); + if (seenItems.includes(item.hashId)) { continue; } - if (await this.storage.hasSeenFeedGuid(url, hashId)) { - log.debug('Skipping already seen entry', item.id ?? hashId); + newGuids.push(item.hashId); + + if (initialSync) { + log.debug(`Skipping entry ${item.id ?? item.hashId} since we're performing an initial sync`); continue; } const entry = { @@ -236,7 +234,6 @@ export class FeedReader { if (seenEntriesChanged && newGuids.length) { await this.storage.storeFeedGuids(url, ...newGuids); } - } this.queue.push({ eventName: 'feed.success', sender: 'FeedReader', data: { url } }); // Clear any feed failures @@ -291,7 +288,7 @@ export class FeedReader { log.warn(`It took us longer to update the feeds than the configured pool interval`); } } else { - // It may be possible that we have more workers than feeds. This will cause the worker to just sleep. + // It is possible that we have more workers than feeds. This will cause the worker to just sleep. log.debug(`No feeds available to poll for worker ${workerId}`); } diff --git a/src/feeds/parser.rs b/src/feeds/parser.rs index 0dcbc7d16..0bb332c57 100644 --- a/src/feeds/parser.rs +++ b/src/feeds/parser.rs @@ -70,7 +70,7 @@ fn parse_channel_to_js_result(channel: &Channel) -> JsRssChannel { .map(|f| f.value) .or(item.link.clone()) .or(item.title.clone()) - .and_then(|f| hash_id(f).ok()), + .and_then(|f| Some(format!("md5:{:?}", hash_id(f)))), }) .collect(), } From be4a46894afa037154edd5bc19be985b8639f954 Mon Sep 17 00:00:00 2001 From: Half-Shot Date: Wed, 7 Feb 2024 11:58:58 +0000 Subject: [PATCH 22/31] fix hasher function --- src/feeds/parser.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/feeds/parser.rs b/src/feeds/parser.rs index 0bb332c57..3624c3c1b 100644 --- a/src/feeds/parser.rs +++ b/src/feeds/parser.rs @@ -70,7 +70,8 @@ fn parse_channel_to_js_result(channel: &Channel) -> JsRssChannel { .map(|f| f.value) .or(item.link.clone()) .or(item.title.clone()) - .and_then(|f| Some(format!("md5:{:?}", hash_id(f)))), + .and_then(|f| hash_id(f).ok()) + .and_then(|f| Some(format!("md5:{}", f))) }) .collect(), } From bd0822ee9585e96a5b288c5339fe4e37b36f9660 Mon Sep 17 00:00:00 2001 From: Half-Shot Date: Wed, 7 Feb 2024 12:59:30 +0000 Subject: [PATCH 23/31] lint --- src/feeds/parser.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/feeds/parser.rs b/src/feeds/parser.rs index 3624c3c1b..f6e845035 100644 --- a/src/feeds/parser.rs +++ b/src/feeds/parser.rs @@ -71,7 +71,7 @@ fn parse_channel_to_js_result(channel: &Channel) -> JsRssChannel { .or(item.link.clone()) .or(item.title.clone()) .and_then(|f| hash_id(f).ok()) - .and_then(|f| Some(format!("md5:{}", f))) + .and_then(|f| Some(format!("md5:{}", f))), }) .collect(), } From db23748dba1d317b6f486b220d40f581d9e44fbe Mon Sep 17 00:00:00 2001 From: Half-Shot Date: Wed, 7 Feb 2024 15:11:32 +0000 Subject: [PATCH 24/31] Content doesn't need to be calculated twice. --- src/Connections/FeedConnection.ts | 17 +++++++++-------- src/feeds/FeedReader.ts | 2 -- 2 files changed, 9 insertions(+), 10 deletions(-) diff --git a/src/Connections/FeedConnection.ts b/src/Connections/FeedConnection.ts index d1aa9b248..26f427f38 100644 --- a/src/Connections/FeedConnection.ts +++ b/src/Connections/FeedConnection.ts @@ -220,15 +220,16 @@ export class FeedConnection extends BaseConnection implements IConnection { // We want to retry these sends, because sometimes the network / HS // craps out. + const content = { + msgtype: 'm.notice', + format: "org.matrix.custom.html", + formatted_body: md.renderInline(message), + body: message, + external_url: entry.link ?? undefined, + "uk.half-shot.matrix-hookshot.feeds.item": entry, + }; await retry( - () => this.intent.sendEvent(this.roomId, { - msgtype: 'm.notice', - format: "org.matrix.custom.html", - formatted_body: md.renderInline(message), - body: message, - external_url: entry.link ?? undefined, - "uk.half-shot.matrix-hookshot.feeds.item": entry, - }), + () => this.intent.sendEvent(this.roomId, content), SEND_EVENT_MAX_ATTEMPTS, SEND_EVENT_INTERVAL_MS, // Filter for showstopper errors like 4XX errors, but otherwise diff --git a/src/feeds/FeedReader.ts b/src/feeds/FeedReader.ts index 04dcda996..81ed74755 100644 --- a/src/feeds/FeedReader.ts +++ b/src/feeds/FeedReader.ts @@ -150,8 +150,6 @@ export class FeedReader { } observedFeedUrls.forEach(url => this.feedQueue.push(url)); this.feedQueue.shuffle(); - - Metrics.feedsCount.set(observedFeedUrls.size); Metrics.feedsCountDeprecated.set(observedFeedUrls.size); return observedFeedUrls; From 605139f6fdb2610db7d7f1b5cd3d1d115d684bb5 Mon Sep 17 00:00:00 2001 From: Half-Shot Date: Wed, 7 Feb 2024 15:12:54 +0000 Subject: [PATCH 25/31] Slightly more efficient iteration --- src/Stores/MemoryStorageProvider.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/Stores/MemoryStorageProvider.ts b/src/Stores/MemoryStorageProvider.ts index d44add7f0..52b5bf545 100644 --- a/src/Stores/MemoryStorageProvider.ts +++ b/src/Stores/MemoryStorageProvider.ts @@ -36,7 +36,8 @@ export class MemoryStorageProvider extends MSP implements IBridgeStorageProvider } async hasSeenFeedGuids(url: string, ...guids: string[]): Promise { - return this.feedGuids.get(url)?.filter((existingGuid) => guids.includes(existingGuid)) ?? []; + const existing = this.feedGuids.get(url); + return existing ? guids.filter((existingGuid) => existing.includes(existingGuid)) : []; } public async setGithubIssue(repo: string, issueNumber: string, data: IssuesGetResponseData, scope = "") { From ffd3881fbf6002146621f476a350d402c6275059 Mon Sep 17 00:00:00 2001 From: Half-Shot Date: Wed, 7 Feb 2024 15:50:49 +0000 Subject: [PATCH 26/31] Improve performance of backoff insertion --- src/util/mod.rs | 39 ++++++++++++++++++++++++--------------- 1 file changed, 24 insertions(+), 15 deletions(-) diff --git a/src/util/mod.rs b/src/util/mod.rs index 5e0d9621f..4b3adeef0 100644 --- a/src/util/mod.rs +++ b/src/util/mod.rs @@ -4,9 +4,9 @@ use std::collections::HashMap; use std::collections::VecDeque; use std::time::{SystemTime, UNIX_EPOCH}; -const BACKOFF_TIME_MAX_MS: f32 = 24f32 * 60f32 * 60f32 * 1000f32; -const BACKOFF_POW: f32 = 1.05f32; -const BACKOFF_TIME_MS: f32 = 5f32 * 1000f32; +const DEFAULT_BACKOFF_TIME_MAX_MS: f64 = 24f64 * 60f64 * 60f64 * 1000f64; +const DEFAULT_BACKOFF_POW: f64 = 1.05f64; +const DEFAULT_BACKOFF_TIME_MS: f64 = 5f64 * 1000f64; #[napi] @@ -15,34 +15,41 @@ pub struct QueueWithBackoff { /** * A map of absolute backoff timestamps mapped to the value. */ - backoff: BTreeMap, + backoff: BTreeMap, /** * The last duration applied when a value was backed off. */ last_backoff_duration: HashMap, + + backoff_time: f64, + backoff_exponent: f64, + backoff_max: f64, } impl Default for QueueWithBackoff { fn default() -> Self { - Self::new() + Self::new(DEFAULT_BACKOFF_TIME_MS, DEFAULT_BACKOFF_POW, DEFAULT_BACKOFF_TIME_MAX_MS) } } #[napi] impl QueueWithBackoff { #[napi(constructor)] - pub fn new() -> Self { + pub fn new(backoff_time: f64, backoff_exponent: f64, backoff_max: f64) -> Self { QueueWithBackoff { queue: VecDeque::new(), backoff: BTreeMap::new(), last_backoff_duration: HashMap::new(), + backoff_time, + backoff_exponent, + backoff_max, } } #[napi] pub fn pop(&mut self) -> Option { let start = SystemTime::now(); - let since_the_epoch = start.duration_since(UNIX_EPOCH).unwrap().as_millis(); + let since_the_epoch = start.duration_since(UNIX_EPOCH).unwrap().as_millis() as u64; // We only need to check this once, as we won't be adding to the backoff queue // as often as we pull from it. @@ -64,25 +71,27 @@ impl QueueWithBackoff { #[napi] pub fn backoff(&mut self, item: String) -> u32 { - let last_backoff = (*self.last_backoff_duration.get(&item).unwrap_or(&0)) as f32; + let last_backoff = (*self.last_backoff_duration.get(&item).unwrap_or(&0)) as f64; let mut rng = rand::thread_rng(); - let y: f32 = rng.gen::() + 0.5f32; // generates a float between 0.5 and 1.1 + let y: f64 = rng.gen::() + 0.5f64; // generates a float between 0.5 and 1.1 - let backoff_duration = ((y * BACKOFF_TIME_MS) + last_backoff.powf(BACKOFF_POW)) - .min(BACKOFF_TIME_MAX_MS) as u32; + let backoff_duration = ((y * self.backoff_time) + last_backoff.powf(self.backoff_exponent)) + .min(self.backoff_max) as u32; let backoff_item = item.clone(); self.last_backoff_duration.insert(item, backoff_duration); let start = SystemTime::now(); let since_the_epoch = start.duration_since(UNIX_EPOCH).unwrap(); - let mut time = since_the_epoch.as_millis() + backoff_duration as u128; + let mut time = since_the_epoch.as_millis() as u64 + backoff_duration as u64; - // If the backoff queue contains this time (unlikely, but we don't) - // want to overwrite, then add some variance. + // If the backoff queue contains this time (likely) + // then we want to increase the backoff time slightly + // to allow for it. + let incr: f64 = (rng.gen::() * 2f64) + 2f64; while self.backoff.contains_key(&time) { - time += (y * BACKOFF_TIME_MS) as u128; + time += (incr * self.backoff_time) as u64; } self.backoff.insert(time, backoff_item); From 6649143ee5c70f429c3fdaf2764c7654bfa0cfae Mon Sep 17 00:00:00 2001 From: Half-Shot Date: Thu, 8 Feb 2024 10:13:44 +0000 Subject: [PATCH 27/31] Configure feed reader --- src/feeds/FeedReader.ts | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/feeds/FeedReader.ts b/src/feeds/FeedReader.ts index 81ed74755..405b2eb99 100644 --- a/src/feeds/FeedReader.ts +++ b/src/feeds/FeedReader.ts @@ -12,6 +12,11 @@ import UserAgent from "../UserAgent"; import { QueueWithBackoff } from "../libRs"; const log = new Logger("FeedReader"); + +const BACKOFF_TIME_MAX_MS = 24 * 60 * 60 * 1000; +const BACKOFF_POW = 1.05; +const BACKOFF_TIME_MS = 5 * 1000; + export class FeedError extends Error { constructor( public url: string, @@ -78,7 +83,7 @@ export class FeedReader { private connections: FeedConnection[]; - private feedQueue = new QueueWithBackoff(); + private feedQueue = new QueueWithBackoff(BACKOFF_TIME_MS, BACKOFF_POW, BACKOFF_TIME_MAX_MS); // A set of last modified times for each url. private cacheTimes: Map = new Map(); From 149ca517c76f09bffecd2185723ed8f8abffd769 Mon Sep 17 00:00:00 2001 From: Half-Shot Date: Thu, 8 Feb 2024 11:41:52 +0000 Subject: [PATCH 28/31] lint --- src/util/mod.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/util/mod.rs b/src/util/mod.rs index 4b3adeef0..215600ad7 100644 --- a/src/util/mod.rs +++ b/src/util/mod.rs @@ -28,7 +28,11 @@ pub struct QueueWithBackoff { impl Default for QueueWithBackoff { fn default() -> Self { - Self::new(DEFAULT_BACKOFF_TIME_MS, DEFAULT_BACKOFF_POW, DEFAULT_BACKOFF_TIME_MAX_MS) + Self::new( + DEFAULT_BACKOFF_TIME_MS, + DEFAULT_BACKOFF_POW, + DEFAULT_BACKOFF_TIME_MAX_MS, + ) } } #[napi] From 5b87dc046eddcb342a1072d51489f80252c64a2b Mon Sep 17 00:00:00 2001 From: Half-Shot Date: Thu, 8 Feb 2024 14:24:58 +0000 Subject: [PATCH 29/31] Start porting the reader to Rust. --- Cargo.lock | 10 +++ Cargo.toml | 1 + src/feeds/mod.rs | 1 + src/feeds/reader.rs | 144 ++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 156 insertions(+) create mode 100644 src/feeds/reader.rs diff --git a/Cargo.lock b/Cargo.lock index e844d1cc9..c694af749 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -690,6 +690,7 @@ dependencies = [ "serde_derive", "serde_json", "url", + "uuid", ] [[package]] @@ -1774,6 +1775,15 @@ version = "0.7.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" +[[package]] +name = "uuid" +version = "1.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f00cc9702ca12d3c81455259621e676d0f7251cec66a21e98fe2e9a37db93b2a" +dependencies = [ + "getrandom", +] + [[package]] name = "vcpkg" version = "0.2.15" diff --git a/Cargo.toml b/Cargo.toml index ab6e33d66..3185c0d45 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,6 +22,7 @@ atom_syndication = "0.12" ruma = { version = "0.9", features = ["events", "html"] } reqwest = "0.11" rand = "0.8.5" +uuid = { version = "1.7.0", features = ["v4"] } [build-dependencies] napi-build = "2" diff --git a/src/feeds/mod.rs b/src/feeds/mod.rs index 67c567fa0..ca527719c 100644 --- a/src/feeds/mod.rs +++ b/src/feeds/mod.rs @@ -1 +1,2 @@ pub mod parser; +pub mod reader; diff --git a/src/feeds/reader.rs b/src/feeds/reader.rs new file mode 100644 index 000000000..9f7bbe0cf --- /dev/null +++ b/src/feeds/reader.rs @@ -0,0 +1,144 @@ +use std::collections::{HashMap, HashSet}; +use crate::util::QueueWithBackoff; +use std::time::Instant; +use napi::bindgen_prelude::{Error as JsError, Status}; +use uuid::Uuid; +use crate::feeds::parser::{js_read_feed, ReadFeedOptions}; + +const BACKOFF_TIME_MAX_MS: f64 = 24f64 * 60f64 * 60f64 * 1000f64; +const BACKOFF_POW: f64 = 1.05f64; +const BACKOFF_TIME_MS: f64 = 5f64 * 1000f64; + +struct CacheTime { + etag: Option, + last_modified: Option, +} + +impl CacheTime { + fn new() -> Self { + CacheTime { + etag: None, + last_modified: None, + } + } +} + +#[napi] + +pub struct FeedReader { + queue: QueueWithBackoff, + feeds_to_retain: HashSet, + cache_times: HashMap, + poll_interval_seconds: f64, + poll_concurrency: u8, + poll_timeout_seconds: i64, +} + + +#[napi] +pub struct FeedReaderMetrics { + feeds_failing_http: usize, + feeds_failing_parsing: usize, +} + +#[napi] + +impl FeedReader { + #[napi(constructor)] + pub fn new(poll_interval_seconds: f64, poll_concurrency: u8, poll_timeout_seconds: i64) -> Self { + FeedReader { + queue: QueueWithBackoff::new( + BACKOFF_TIME_MS, + BACKOFF_POW, + BACKOFF_TIME_MAX_MS, + ), + feeds_to_retain: HashSet::new(), + cache_times: HashMap::new(), + poll_interval_seconds, + poll_concurrency, + poll_timeout_seconds, + } + } + + #[napi] + pub fn get_metrics(&self) -> FeedReaderMetrics { + FeedReaderMetrics { + feeds_failing_http: 0, + feeds_failing_parsing: 0, + } + } + + + pub fn on_new_url(&mut self, url: String) { + self.queue.push(url); + } + + pub fn on_removed_url(&mut self) { + + } + + async fn poll_feed(&mut self, url: &String) -> Result { + self.feeds_to_retain.insert(url.clone()); + let seen_entries_changed = false; + let fetch_key = Uuid::new_v4().to_string(); + let cache_time = self.cache_times.get(url); + + if let Ok(result) = js_read_feed(url.clone(), ReadFeedOptions { + poll_timeout_seconds: self.poll_timeout_seconds, + etag: cache_time.and_then(|c| c.etag.clone()).or(None), + last_modified: cache_time.and_then(|c| c.last_modified.clone()).or(None), + user_agent: "faked user agent".to_string(), + }).await { + self.cache_times.insert(url.clone(), CacheTime { + etag: result.etag, + last_modified: result.last_modified, + }); + + let initial_sync = false; // TODO: Implement + let seen_items: HashSet = HashSet::new(); // TODO: Implement + let mut new_guids: Vec = Vec::new(); + + if let Some(feed) = result.feed { + for item in feed.items { + if let Some(hash_id) = item.hash_id { + if seen_items.contains(&hash_id) { + continue; + } + // TODO: Drop unwrap + new_guids.push(hash_id); + + if initial_sync { + // Skip. + continue; + } + + } + } + } else { + // TODO: Implement + } + + + } // TODO: Handle error. + Ok(true) + } + + fn sleeping_interval(&self) -> f64 { + return (self.poll_interval_seconds * 1000.0) / self.queue.length() as f64; + } + + #[napi] + pub async unsafe fn poll_feeds(&mut self) -> Result { + let now = Instant::now(); + + if let Some(url) = self.queue.pop() { + self.poll_feed(&url).await?; + let elapsed = now.elapsed(); + let sleepFor = (self.sleeping_interval() - (elapsed.as_millis() as f64)).max(0.0); + return Ok(sleepFor); + } else { + + } + return Ok(self.sleeping_interval()); + } +} \ No newline at end of file From 4b626ee420d392a5046e51588cdb0e97987253f3 Mon Sep 17 00:00:00 2001 From: Half-Shot Date: Fri, 9 Feb 2024 00:02:05 +0000 Subject: [PATCH 30/31] Try to implement a trait for storage. --- Cargo.lock | 492 +++++++++++++++++- Cargo.toml | 2 + src/App/ResetCryptoStore.ts | 2 +- src/Bridge.ts | 2 +- src/ConnectionManager.ts | 2 +- src/Connections/FigmaFileConnection.ts | 2 +- src/Connections/GitlabRepo.ts | 2 +- src/Connections/IConnection.ts | 2 +- src/NotificationsProcessor.ts | 2 +- src/Widgets/BridgeWidgetApi.ts | 2 +- src/appservice.ts | 6 +- src/feeds/FeedReader.ts | 2 +- src/feeds/reader.rs | 97 +++- src/lib.rs | 2 + .../MemoryStorageProvider.ts | 0 .../RedisStorageProvider.ts | 0 src/{Stores => stores}/StorageProvider.ts | 0 src/stores/memory.rs | 44 ++ src/stores/mod.rs | 3 + src/stores/redis.rs | 20 + src/stores/traits.rs | 5 + tests/FeedReader.spec.ts | 2 +- tests/connections/GitlabRepoTest.ts | 2 +- 23 files changed, 652 insertions(+), 41 deletions(-) rename src/{Stores => stores}/MemoryStorageProvider.ts (100%) rename src/{Stores => stores}/RedisStorageProvider.ts (100%) rename src/{Stores => stores}/StorageProvider.ts (100%) create mode 100644 src/stores/memory.rs create mode 100644 src/stores/mod.rs create mode 100644 src/stores/redis.rs create mode 100644 src/stores/traits.rs diff --git a/Cargo.lock b/Cargo.lock index c694af749..29141553a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -38,6 +38,161 @@ version = "1.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5f093eed78becd229346bf859eec0aa4dd7ddde0757287b2b4107a1f09c80002" +[[package]] +name = "async-channel" +version = "1.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "81953c529336010edd6d8e358f886d9581267795c61b19475b71314bffa46d35" +dependencies = [ + "concurrent-queue", + "event-listener 2.5.3", + "futures-core", +] + +[[package]] +name = "async-channel" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f28243a43d821d11341ab73c80bed182dc015c514b951616cf79bd4af39af0c3" +dependencies = [ + "concurrent-queue", + "event-listener 5.0.0", + "event-listener-strategy 0.5.0", + "futures-core", + "pin-project-lite", +] + +[[package]] +name = "async-executor" +version = "1.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "17ae5ebefcc48e7452b4987947920dac9450be1110cadf34d1b8c116bdbaf97c" +dependencies = [ + "async-lock 3.3.0", + "async-task", + "concurrent-queue", + "fastrand 2.0.1", + "futures-lite 2.2.0", + "slab", +] + +[[package]] +name = "async-global-executor" +version = "2.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05b1b633a2115cd122d73b955eadd9916c18c8f510ec9cd1686404c60ad1c29c" +dependencies = [ + "async-channel 2.2.0", + "async-executor", + "async-io 2.3.1", + "async-lock 3.3.0", + "blocking", + "futures-lite 2.2.0", + "once_cell", +] + +[[package]] +name = "async-io" +version = "1.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0fc5b45d93ef0529756f812ca52e44c221b35341892d3dcc34132ac02f3dd2af" +dependencies = [ + "async-lock 2.8.0", + "autocfg", + "cfg-if", + "concurrent-queue", + "futures-lite 1.13.0", + "log", + "parking", + "polling 2.8.0", + "rustix 0.37.27", + "slab", + "socket2 0.4.10", + "waker-fn", +] + +[[package]] +name = "async-io" +version = "2.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f97ab0c5b00a7cdbe5a371b9a782ee7be1316095885c8a4ea1daf490eb0ef65" +dependencies = [ + "async-lock 3.3.0", + "cfg-if", + "concurrent-queue", + "futures-io", + "futures-lite 2.2.0", + "parking", + "polling 3.4.0", + "rustix 0.38.28", + "slab", + "tracing", + "windows-sys 0.52.0", +] + +[[package]] +name = "async-lock" +version = "2.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "287272293e9d8c41773cec55e365490fe034813a2f172f502d6ddcf75b2f582b" +dependencies = [ + "event-listener 2.5.3", +] + +[[package]] +name = "async-lock" +version = "3.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d034b430882f8381900d3fe6f0aaa3ad94f2cb4ac519b429692a1bc2dda4ae7b" +dependencies = [ + "event-listener 4.0.3", + "event-listener-strategy 0.4.0", + "pin-project-lite", +] + +[[package]] +name = "async-std" +version = "1.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62565bb4402e926b29953c785397c6dc0391b7b446e45008b0049eb43cec6f5d" +dependencies = [ + "async-channel 1.9.0", + "async-global-executor", + "async-io 1.13.0", + "async-lock 2.8.0", + "crossbeam-utils", + "futures-channel", + "futures-core", + "futures-io", + "futures-lite 1.13.0", + "gloo-timers", + "kv-log-macro", + "log", + "memchr", + "once_cell", + "pin-project-lite", + "pin-utils", + "slab", + "wasm-bindgen-futures", +] + +[[package]] +name = "async-task" +version = "4.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fbb36e985947064623dbd357f727af08ffd077f93d696782f3c56365fa2e2799" + +[[package]] +name = "async-trait" +version = "0.1.76" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "531b97fb4cd3dfdce92c35dedbfdc1f0b9d8091c8ca943d6dae340ef5012d514" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.42", +] + [[package]] name = "atom_syndication" version = "0.12.2" @@ -51,6 +206,12 @@ dependencies = [ "quick-xml", ] +[[package]] +name = "atomic-waker" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0" + [[package]] name = "autocfg" version = "1.1.0" @@ -99,6 +260,22 @@ dependencies = [ "generic-array", ] +[[package]] +name = "blocking" +version = "1.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a37913e8dc4ddcc604f0c6d3bf2887c995153af3611de9e23c352b44c1b9118" +dependencies = [ + "async-channel 2.2.0", + "async-lock 3.3.0", + "async-task", + "fastrand 2.0.1", + "futures-io", + "futures-lite 2.2.0", + "piper", + "tracing", +] + [[package]] name = "bumpalo" version = "3.14.0" @@ -141,6 +318,29 @@ dependencies = [ "num-traits", ] +[[package]] +name = "combine" +version = "4.6.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "35ed6e9d84f0b51a7f52daf1c7d71dd136fd7a3f41a8462b8cdb8c78d920fad4" +dependencies = [ + "bytes", + "futures-core", + "memchr", + "pin-project-lite", + "tokio", + "tokio-util", +] + +[[package]] +name = "concurrent-queue" +version = "2.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d16048cd947b08fa32c24458a22f5dc5e835264f689f4f5653210c69fd107363" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "contrast" version = "0.1.0" @@ -176,6 +376,12 @@ version = "0.8.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "06ea2b9bc92be3c2baa9334a323ebca2d6f074ff852cd1d7b11064035cd3868f" +[[package]] +name = "crossbeam-utils" +version = "0.8.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "248e3bacc7dc6baa3b21e405ee045c3047101a49145e7e9eca583ab4c2ca5345" + [[package]] name = "crypto-common" version = "0.1.6" @@ -306,6 +512,63 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "event-listener" +version = "2.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0" + +[[package]] +name = "event-listener" +version = "4.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "67b215c49b2b248c855fb73579eb1f4f26c38ffdc12973e20e07b91d78d5646e" +dependencies = [ + "concurrent-queue", + "parking", + "pin-project-lite", +] + +[[package]] +name = "event-listener" +version = "5.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b72557800024fabbaa2449dd4bf24e37b93702d457a4d4f2b0dd1f0f039f20c1" +dependencies = [ + "concurrent-queue", + "parking", + "pin-project-lite", +] + +[[package]] +name = "event-listener-strategy" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "958e4d70b6d5e81971bebec42271ec641e7ff4e170a6fa605f2b8a8b65cb97d3" +dependencies = [ + "event-listener 4.0.3", + "pin-project-lite", +] + +[[package]] +name = "event-listener-strategy" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "feedafcaa9b749175d5ac357452a9d41ea2911da598fde46ce1fe02c37751291" +dependencies = [ + "event-listener 5.0.0", + "pin-project-lite", +] + +[[package]] +name = "fastrand" +version = "1.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e51093e27b0797c359783294ca4f0a911c270184cb10f85783b118614a1501be" +dependencies = [ + "instant", +] + [[package]] name = "fastrand" version = "2.0.1" @@ -367,6 +630,40 @@ version = "0.3.29" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "eb1d22c66e66d9d72e1758f0bd7d4fd0bee04cad842ee34587d68c07e45d088c" +[[package]] +name = "futures-io" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1" + +[[package]] +name = "futures-lite" +version = "1.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "49a9d51ce47660b1e808d3c990b4709f2f415d928835a17dfd16991515c46bce" +dependencies = [ + "fastrand 1.9.0", + "futures-core", + "futures-io", + "memchr", + "parking", + "pin-project-lite", + "waker-fn", +] + +[[package]] +name = "futures-lite" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "445ba825b27408685aaecefd65178908c36c6e96aaf6d8599419d46e624192ba" +dependencies = [ + "fastrand 2.0.1", + "futures-core", + "futures-io", + "parking", + "pin-project-lite", +] + [[package]] name = "futures-sink" version = "0.3.29" @@ -386,6 +683,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a19526d624e703a3179b3d322efec918b6246ea0fa51d41124525f00f1cc8104" dependencies = [ "futures-core", + "futures-sink", "futures-task", "pin-project-lite", "pin-utils", @@ -418,6 +716,18 @@ version = "0.28.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4271d37baee1b8c7e4b708028c57d816cf9d2434acb33a549475f78c181f6253" +[[package]] +name = "gloo-timers" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b995a66bb87bebce9a0f4a95aed01daca4872c050bfcb21653361c03bc35e5c" +dependencies = [ + "futures-channel", + "futures-core", + "js-sys", + "wasm-bindgen", +] + [[package]] name = "h2" version = "0.3.24" @@ -520,7 +830,7 @@ dependencies = [ "httpdate", "itoa", "pin-project-lite", - "socket2", + "socket2 0.5.5", "tokio", "tower-service", "tracing", @@ -567,6 +877,26 @@ dependencies = [ "serde", ] +[[package]] +name = "instant" +version = "0.1.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a5bbe824c507c5da5956355e86a746d82e0e1464f65d862cc5e71da70e94b2c" +dependencies = [ + "cfg-if", +] + +[[package]] +name = "io-lifetimes" +version = "1.0.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eae7b9aee968036d54dce06cebaefd919e4472e753296daccd6d344e3e2df0c2" +dependencies = [ + "hermit-abi", + "libc", + "windows-sys 0.48.0", +] + [[package]] name = "ipnet" version = "2.9.0" @@ -606,6 +936,15 @@ dependencies = [ "serde", ] +[[package]] +name = "kv-log-macro" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0de8b303297635ad57c9f5059fd9cee7a47f8e8daa09df0fcd07dd39fb22977f" +dependencies = [ + "log", +] + [[package]] name = "lazy_static" version = "1.4.0" @@ -628,6 +967,12 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "linux-raw-sys" +version = "0.3.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ef53942eb7bf7ff43a617b3e2c1c4a5ecf5944a7c1bc12d7ee39bbb15e5c1519" + [[package]] name = "linux-raw-sys" version = "0.4.12" @@ -649,6 +994,9 @@ name = "log" version = "0.4.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f" +dependencies = [ + "value-bag", +] [[package]] name = "mac" @@ -674,6 +1022,7 @@ dependencies = [ name = "matrix-hookshot" version = "5.1.2" dependencies = [ + "async-std", "atom_syndication", "contrast", "hex", @@ -682,6 +1031,7 @@ dependencies = [ "napi-build", "napi-derive", "rand", + "redis", "reqwest", "rgb", "rss", @@ -903,6 +1253,12 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "parking" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bb813b8af86854136c6922af0598d719255ecb2179515e6e7730d468f05c9cae" + [[package]] name = "parking_lot" version = "0.12.1" @@ -1024,12 +1380,53 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" +[[package]] +name = "piper" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "668d31b1c4eba19242f2088b2bf3316b82ca31082a8335764db4e083db7485d4" +dependencies = [ + "atomic-waker", + "fastrand 2.0.1", + "futures-io", +] + [[package]] name = "pkg-config" version = "0.3.28" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "69d3587f8a9e599cc7ec2c00e331f71c4e69a5f9a4b8a6efd5b07466b9736f9a" +[[package]] +name = "polling" +version = "2.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4b2d323e8ca7996b3e23126511a523f7e62924d93ecd5ae73b333815b0eb3dce" +dependencies = [ + "autocfg", + "bitflags 1.3.2", + "cfg-if", + "concurrent-queue", + "libc", + "log", + "pin-project-lite", + "windows-sys 0.48.0", +] + +[[package]] +name = "polling" +version = "3.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30054e72317ab98eddd8561db0f6524df3367636884b7b21b703e4b280a84a14" +dependencies = [ + "cfg-if", + "concurrent-queue", + "pin-project-lite", + "rustix 0.38.28", + "tracing", + "windows-sys 0.52.0", +] + [[package]] name = "ppv-lite86" version = "0.2.17" @@ -1110,6 +1507,27 @@ dependencies = [ "getrandom", ] +[[package]] +name = "redis" +version = "0.24.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c580d9cbbe1d1b479e8d67cf9daf6a62c957e6846048408b80b43ac3f6af84cd" +dependencies = [ + "async-trait", + "bytes", + "combine", + "futures-util", + "itoa", + "percent-encoding", + "pin-project-lite", + "ryu", + "sha1_smol", + "socket2 0.4.10", + "tokio", + "tokio-util", + "url", +] + [[package]] name = "redox_syscall" version = "0.4.1" @@ -1315,6 +1733,20 @@ version = "0.1.23" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76" +[[package]] +name = "rustix" +version = "0.37.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fea8ca367a3a01fe35e6943c400addf443c0f57670e6ec51196f71a4b8762dd2" +dependencies = [ + "bitflags 1.3.2", + "errno", + "io-lifetimes", + "libc", + "linux-raw-sys 0.3.8", + "windows-sys 0.48.0", +] + [[package]] name = "rustix" version = "0.38.28" @@ -1324,7 +1756,7 @@ dependencies = [ "bitflags 2.4.1", "errno", "libc", - "linux-raw-sys", + "linux-raw-sys 0.4.12", "windows-sys 0.52.0", ] @@ -1443,6 +1875,12 @@ dependencies = [ "serde", ] +[[package]] +name = "sha1_smol" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ae1a47186c03a32177042e55dbc5fd5aee900b8e0069a8d70fba96a9375cd012" + [[package]] name = "siphasher" version = "0.3.11" @@ -1464,6 +1902,16 @@ version = "1.11.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4dccd0940a2dcdf68d092b8cbab7dc0ad8fa938bf95787e1b916b0e3d0e8e970" +[[package]] +name = "socket2" +version = "0.4.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9f7916fc008ca5542385b89a3d3ce689953c143e9304a9bf8beec1de48994c0d" +dependencies = [ + "libc", + "winapi", +] + [[package]] name = "socket2" version = "0.5.5" @@ -1556,9 +2004,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7ef1adac450ad7f4b3c28589471ade84f25f731a7a0fe30d71dfa9f60fd808e5" dependencies = [ "cfg-if", - "fastrand", + "fastrand 2.0.1", "redox_syscall", - "rustix", + "rustix 0.38.28", "windows-sys 0.48.0", ] @@ -1620,7 +2068,7 @@ dependencies = [ "mio", "num_cpus", "pin-project-lite", - "socket2", + "socket2 0.5.5", "windows-sys 0.48.0", ] @@ -1784,6 +2232,12 @@ dependencies = [ "getrandom", ] +[[package]] +name = "value-bag" +version = "1.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "126e423afe2dd9ac52142e7e9d5ce4135d7e13776c529d27fd6bc49f19e3280b" + [[package]] name = "vcpkg" version = "0.2.15" @@ -1796,6 +2250,12 @@ version = "0.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" +[[package]] +name = "waker-fn" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f3c4517f54858c779bbcbf228f4fca63d121bf85fbecb2dc578cdf4a39395690" + [[package]] name = "want" version = "0.3.1" @@ -1893,6 +2353,28 @@ version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ffa44a4268d649eba546544ed45fd9591059d9653a0e584efe030b56d8172b58" +[[package]] +name = "winapi" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419" +dependencies = [ + "winapi-i686-pc-windows-gnu", + "winapi-x86_64-pc-windows-gnu", +] + +[[package]] +name = "winapi-i686-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" + +[[package]] +name = "winapi-x86_64-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" + [[package]] name = "windows-sys" version = "0.48.0" diff --git a/Cargo.toml b/Cargo.toml index 3185c0d45..8ef64aef0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,6 +23,8 @@ ruma = { version = "0.9", features = ["events", "html"] } reqwest = "0.11" rand = "0.8.5" uuid = { version = "1.7.0", features = ["v4"] } +async-std = "1.12.0" +redis = { version = "0.24.0", features = ["aio", "tokio-comp"] } [build-dependencies] napi-build = "2" diff --git a/src/App/ResetCryptoStore.ts b/src/App/ResetCryptoStore.ts index 1fddf1458..b9b5c77c5 100644 --- a/src/App/ResetCryptoStore.ts +++ b/src/App/ResetCryptoStore.ts @@ -5,7 +5,7 @@ import { Logger } from "matrix-appservice-bridge"; import { LogService, MatrixClient } from "matrix-bot-sdk"; import { getAppservice } from "../appservice"; import BotUsersManager from "../Managers/BotUsersManager"; -import { IBridgeStorageProvider } from "../Stores/StorageProvider"; +import { IBridgeStorageProvider } from "../stores/StorageProvider"; const log = new Logger("ResetCryptoStore"); diff --git a/src/Bridge.ts b/src/Bridge.ts index c92517458..79a1e5a73 100644 --- a/src/Bridge.ts +++ b/src/Bridge.ts @@ -8,7 +8,7 @@ import { CommentProcessor } from "./CommentProcessor"; import { ConnectionManager } from "./ConnectionManager"; import { GetIssueResponse, GetIssueOpts } from "./Gitlab/Types" import { GithubInstance } from "./github/GithubInstance"; -import { IBridgeStorageProvider } from "./Stores/StorageProvider"; +import { IBridgeStorageProvider } from "./stores/StorageProvider"; import { IConnection, GitHubDiscussionSpace, GitHubDiscussionConnection, GitHubUserSpace, JiraProjectConnection, GitLabRepoConnection, GitHubIssueConnection, GitHubProjectConnection, GitHubRepoConnection, GitLabIssueConnection, FigmaFileConnection, FeedConnection, GenericHookConnection, WebhookResponse } from "./Connections"; import { IGitLabWebhookIssueStateEvent, IGitLabWebhookMREvent, IGitLabWebhookNoteEvent, IGitLabWebhookPushEvent, IGitLabWebhookReleaseEvent, IGitLabWebhookTagPushEvent, IGitLabWebhookWikiPageEvent } from "./Gitlab/WebhookTypes"; diff --git a/src/ConnectionManager.ts b/src/ConnectionManager.ts index c475aa778..aac180fc7 100644 --- a/src/ConnectionManager.ts +++ b/src/ConnectionManager.ts @@ -13,7 +13,7 @@ import { FigmaFileConnection, FeedConnection } from "./Connections"; import { GetConnectionTypeResponseItem } from "./provisioning/api"; import { GitLabClient } from "./Gitlab/Client"; import { GithubInstance } from "./github/GithubInstance"; -import { IBridgeStorageProvider } from "./Stores/StorageProvider"; +import { IBridgeStorageProvider } from "./stores/StorageProvider"; import { JiraProject, JiraVersion } from "./jira/Types"; import { Logger } from "matrix-appservice-bridge"; import { MessageSenderClient } from "./MatrixSender"; diff --git a/src/Connections/FigmaFileConnection.ts b/src/Connections/FigmaFileConnection.ts index 1a662dede..526404456 100644 --- a/src/Connections/FigmaFileConnection.ts +++ b/src/Connections/FigmaFileConnection.ts @@ -4,7 +4,7 @@ import { FigmaPayload } from "../figma/types"; import { BaseConnection } from "./BaseConnection"; import { IConnection, IConnectionState } from "."; import { Logger } from "matrix-appservice-bridge"; -import { IBridgeStorageProvider } from "../Stores/StorageProvider"; +import { IBridgeStorageProvider } from "../stores/StorageProvider"; import { BridgeConfig } from "../config/Config"; import { Connection, InstantiateConnectionOpts, ProvisionConnectionOpts } from "./IConnection"; import { ConfigGrantChecker, GrantChecker } from "../grants/GrantCheck"; diff --git a/src/Connections/GitlabRepo.ts b/src/Connections/GitlabRepo.ts index c3936a4c5..5115c0a83 100644 --- a/src/Connections/GitlabRepo.ts +++ b/src/Connections/GitlabRepo.ts @@ -16,7 +16,7 @@ import { CommandError } from "../errors"; import QuickLRU from "@alloc/quick-lru"; import { HookFilter } from "../HookFilter"; import { GitLabClient } from "../Gitlab/Client"; -import { IBridgeStorageProvider } from "../Stores/StorageProvider"; +import { IBridgeStorageProvider } from "../stores/StorageProvider"; import axios from "axios"; import { GitLabGrantChecker } from "../Gitlab/GrantChecker"; diff --git a/src/Connections/IConnection.ts b/src/Connections/IConnection.ts index 9fe63a9a3..c69247cb8 100644 --- a/src/Connections/IConnection.ts +++ b/src/Connections/IConnection.ts @@ -6,7 +6,7 @@ import { BridgeConfig, BridgePermissionLevel } from "../config/Config"; import { UserTokenStore } from "../UserTokenStore"; import { CommentProcessor } from "../CommentProcessor"; import { MessageSenderClient } from "../MatrixSender"; -import { IBridgeStorageProvider } from "../Stores/StorageProvider"; +import { IBridgeStorageProvider } from "../stores/StorageProvider"; import { GithubInstance } from "../github/GithubInstance"; import "reflect-metadata"; diff --git a/src/NotificationsProcessor.ts b/src/NotificationsProcessor.ts index 8b4ea8013..eced03553 100644 --- a/src/NotificationsProcessor.ts +++ b/src/NotificationsProcessor.ts @@ -1,5 +1,5 @@ import { MessageSenderClient } from "./MatrixSender"; -import { IBridgeStorageProvider } from "./Stores/StorageProvider"; +import { IBridgeStorageProvider } from "./stores/StorageProvider"; import { UserNotificationsEvent } from "./Notifications/UserNotificationWatcher"; import { Logger } from "matrix-appservice-bridge"; import { AdminRoom } from "./AdminRoom"; diff --git a/src/Widgets/BridgeWidgetApi.ts b/src/Widgets/BridgeWidgetApi.ts index 385557890..6cec32246 100644 --- a/src/Widgets/BridgeWidgetApi.ts +++ b/src/Widgets/BridgeWidgetApi.ts @@ -5,7 +5,7 @@ import { ApiError, ErrCode } from "../api"; import { BridgeConfig } from "../config/Config"; import { GetAuthPollResponse, GetAuthResponse, GetConnectionsForServiceResponse } from "./BridgeWidgetInterface"; import { ProvisioningApi, ProvisioningRequest } from "matrix-appservice-bridge"; -import { IBridgeStorageProvider } from "../Stores/StorageProvider"; +import { IBridgeStorageProvider } from "../stores/StorageProvider"; import { ConnectionManager } from "../ConnectionManager"; import BotUsersManager, {BotUser} from "../Managers/BotUsersManager"; import { assertUserPermissionsInRoom, GetConnectionsResponseItem } from "../provisioning/api"; diff --git a/src/appservice.ts b/src/appservice.ts index 2bd97742d..5134989cc 100644 --- a/src/appservice.ts +++ b/src/appservice.ts @@ -2,9 +2,9 @@ import { Logger } from "matrix-appservice-bridge"; import { Appservice, IAppserviceCryptoStorageProvider, IAppserviceRegistration, RustSdkAppserviceCryptoStorageProvider, RustSdkCryptoStoreType } from "matrix-bot-sdk"; import { BridgeConfig } from "./config/Config"; import Metrics from "./Metrics"; -import { MemoryStorageProvider } from "./Stores/MemoryStorageProvider"; -import { RedisStorageProvider } from "./Stores/RedisStorageProvider"; -import { IBridgeStorageProvider } from "./Stores/StorageProvider"; +import { MemoryStorageProvider } from "./stores/MemoryStorageProvider"; +import { RedisStorageProvider } from "./stores/RedisStorageProvider"; +import { IBridgeStorageProvider } from "./stores/StorageProvider"; const log = new Logger("Appservice"); export function getAppservice(config: BridgeConfig, registration: IAppserviceRegistration) { diff --git a/src/feeds/FeedReader.ts b/src/feeds/FeedReader.ts index 405b2eb99..69e1df5c9 100644 --- a/src/feeds/FeedReader.ts +++ b/src/feeds/FeedReader.ts @@ -7,7 +7,7 @@ import axios from "axios"; import Metrics from "../Metrics"; import { randomUUID } from "crypto"; import { readFeed } from "../libRs"; -import { IBridgeStorageProvider } from "../Stores/StorageProvider"; +import { IBridgeStorageProvider } from "../stores/StorageProvider"; import UserAgent from "../UserAgent"; import { QueueWithBackoff } from "../libRs"; diff --git a/src/feeds/reader.rs b/src/feeds/reader.rs index 9f7bbe0cf..bce483e2f 100644 --- a/src/feeds/reader.rs +++ b/src/feeds/reader.rs @@ -1,14 +1,37 @@ use std::collections::{HashMap, HashSet}; use crate::util::QueueWithBackoff; -use std::time::Instant; +use std::time::{Duration, Instant}; use napi::bindgen_prelude::{Error as JsError, Status}; +use napi::tokio::sync::RwLock; +use std::sync::Arc; use uuid::Uuid; use crate::feeds::parser::{js_read_feed, ReadFeedOptions}; +use crate::stores::memory::MemoryStorageProvider; +use crate::stores::traits::StorageProvider; const BACKOFF_TIME_MAX_MS: f64 = 24f64 * 60f64 * 60f64 * 1000f64; const BACKOFF_POW: f64 = 1.05f64; const BACKOFF_TIME_MS: f64 = 5f64 * 1000f64; +#[derive(Serialize, Debug, Deserialize)] +#[napi(object)] +struct HookshotFeedInfo { + pub title: String, + pub url: String, + pub entries: Vec, + pub fetch_key: String, +} + +#[derive(Serialize, Debug, Deserialize)] +#[napi(object)] +struct HookshotFeedEntry { + pub title: Option, + pub pubdate: Option, + pub summary: Option, + pub author: Option, + pub link: Option, +} + struct CacheTime { etag: Option, last_modified: Option, @@ -24,17 +47,16 @@ impl CacheTime { } #[napi] - pub struct FeedReader { queue: QueueWithBackoff, feeds_to_retain: HashSet, - cache_times: HashMap, + cache_times: Arc>>, + storage_provider: Box, poll_interval_seconds: f64, poll_concurrency: u8, poll_timeout_seconds: i64, } - #[napi] pub struct FeedReaderMetrics { feeds_failing_http: usize, @@ -46,17 +68,21 @@ pub struct FeedReaderMetrics { impl FeedReader { #[napi(constructor)] pub fn new(poll_interval_seconds: f64, poll_concurrency: u8, poll_timeout_seconds: i64) -> Self { + let mut cache_times: HashMap = HashMap::new(); + let mut lock = Arc::new(RwLock::new(cache_times)); + let mut storage_provider = MemoryStorageProvider::new(); FeedReader { queue: QueueWithBackoff::new( BACKOFF_TIME_MS, BACKOFF_POW, BACKOFF_TIME_MAX_MS, ), + storage_provider, feeds_to_retain: HashSet::new(), - cache_times: HashMap::new(), poll_interval_seconds, poll_concurrency, poll_timeout_seconds, + cache_times: lock, } } @@ -68,38 +94,56 @@ impl FeedReader { } } - + + #[napi] pub fn on_new_url(&mut self, url: String) { self.queue.push(url); } + #[napi] pub fn on_removed_url(&mut self) { } - async fn poll_feed(&mut self, url: &String) -> Result { - self.feeds_to_retain.insert(url.clone()); + async fn poll_feed(&self, url: &String, cache_times: Arc>>) -> Result, JsError> { let seen_entries_changed = false; let fetch_key = Uuid::new_v4().to_string(); - let cache_time = self.cache_times.get(url); + + let c_t = cache_times.read().await; + let cache_time = c_t.get(url); + let etag = cache_time.and_then(|c| c.etag.clone()).or(None); + let last_modified = cache_time.and_then(|c| c.last_modified.clone()).or(None); + drop(c_t); if let Ok(result) = js_read_feed(url.clone(), ReadFeedOptions { poll_timeout_seconds: self.poll_timeout_seconds, - etag: cache_time.and_then(|c| c.etag.clone()).or(None), - last_modified: cache_time.and_then(|c| c.last_modified.clone()).or(None), + etag, + last_modified, user_agent: "faked user agent".to_string(), }).await { - self.cache_times.insert(url.clone(), CacheTime { + let mut c_t_w = cache_times.write().await; + c_t_w.insert(url.clone(), CacheTime { etag: result.etag, last_modified: result.last_modified, }); + drop(c_t_w); let initial_sync = false; // TODO: Implement let seen_items: HashSet = HashSet::new(); // TODO: Implement let mut new_guids: Vec = Vec::new(); + let new_entries: Vec = Vec::new(); + if let Some(feed) = result.feed { + println!("Got feed result!"); + let mut feed_info = HookshotFeedInfo { + title: feed.title, + url: url.clone(), + entries: vec![], + fetch_key, + }; for item in feed.items { + println!("Got feed result! {:?}", item); if let Some(hash_id) = item.hash_id { if seen_items.contains(&hash_id) { continue; @@ -111,16 +155,23 @@ impl FeedReader { // Skip. continue; } - + feed_info.entries.push(HookshotFeedEntry { + title: item.title, + pubdate: item.pubdate, + summary: item.summary, + author: item.author, + link: item.link, + }); } } + return Ok(Some(feed_info)); } else { // TODO: Implement } } // TODO: Handle error. - Ok(true) + Ok(None) } fn sleeping_interval(&self) -> f64 { @@ -128,17 +179,19 @@ impl FeedReader { } #[napi] - pub async unsafe fn poll_feeds(&mut self) -> Result { - let now = Instant::now(); - + pub async unsafe fn poll_feeds(&mut self) -> Result<(), JsError> { + let mut sleep_for = self.sleeping_interval(); if let Some(url) = self.queue.pop() { - self.poll_feed(&url).await?; + self.feeds_to_retain.insert(url.clone()); + let now = Instant::now(); + let result = self.poll_feed(&url, self.cache_times.clone()).await?; + self.feeds_to_retain.remove(&url); let elapsed = now.elapsed(); - let sleepFor = (self.sleeping_interval() - (elapsed.as_millis() as f64)).max(0.0); - return Ok(sleepFor); + sleep_for = (sleep_for - (elapsed.as_millis() as f64)).max(0.0); } else { - + println!("No feeds available"); } - return Ok(self.sleeping_interval()); + async_std::task::sleep(Duration::from_millis(sleep_for as u64)).await; + Ok(()) } } \ No newline at end of file diff --git a/src/lib.rs b/src/lib.rs index 3a15bd657..81e1cf4b7 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -5,6 +5,8 @@ pub mod github; pub mod jira; pub mod util; +mod stores; + #[macro_use] extern crate napi_derive; diff --git a/src/Stores/MemoryStorageProvider.ts b/src/stores/MemoryStorageProvider.ts similarity index 100% rename from src/Stores/MemoryStorageProvider.ts rename to src/stores/MemoryStorageProvider.ts diff --git a/src/Stores/RedisStorageProvider.ts b/src/stores/RedisStorageProvider.ts similarity index 100% rename from src/Stores/RedisStorageProvider.ts rename to src/stores/RedisStorageProvider.ts diff --git a/src/Stores/StorageProvider.ts b/src/stores/StorageProvider.ts similarity index 100% rename from src/Stores/StorageProvider.ts rename to src/stores/StorageProvider.ts diff --git a/src/stores/memory.rs b/src/stores/memory.rs new file mode 100644 index 000000000..afdce24ca --- /dev/null +++ b/src/stores/memory.rs @@ -0,0 +1,44 @@ +use std::collections::{HashMap, HashSet}; +use crate::stores::traits::StorageProvider; + +pub struct MemoryStorageProvider { + guids: HashMap>, +} + +impl MemoryStorageProvider { + pub fn new() -> Self { + MemoryStorageProvider { + guids: HashMap::new(), + } + } +} + +impl StorageProvider for MemoryStorageProvider { + async fn store_feed_guids(&mut self, url: &String, guids: &Vec) -> Result<(), Err> { + let mut guid_set = self.guids.get(url).or_else(|| { + let new = HashSet::new(); + self.guids.insert(url.clone(), new); + Some(&new) + }).unwrap(); + for guid in guids { + guid_set.insert(guid.clone()); + } + Ok(()) + } + + async fn has_seen_feed(&self, url: &String, guids: &Vec) -> Result> { + Ok(self.guids.contains_key(url)) + } + + async fn has_seen_feed_guids(&self,url: &String, guids: &Vec) -> Result, Err> { + let mut seen_guids = Vec::default(); + if let Some(existing_guids) = self.guids.get(url) { + for guid in guids { + if existing_guids.contains(guid) { + seen_guids.push(guid.clone()); + } + } + } + Ok(seen_guids) + } +} \ No newline at end of file diff --git a/src/stores/mod.rs b/src/stores/mod.rs new file mode 100644 index 000000000..2f649380a --- /dev/null +++ b/src/stores/mod.rs @@ -0,0 +1,3 @@ +pub mod traits; +// pub mod redis; +pub mod memory; \ No newline at end of file diff --git a/src/stores/redis.rs b/src/stores/redis.rs new file mode 100644 index 000000000..df28d3f6b --- /dev/null +++ b/src/stores/redis.rs @@ -0,0 +1,20 @@ +use redis::{Commands, ConnectionInfo}; +use crate::stores::traits::StorageProvider; + +pub struct RedisStorageProvider { + client: redis::Client, +} + +impl RedisStorageProvider { + pub fn new(self, host: String, port: u16) -> Self { + let client = redis::Client::open((host, port))?; + + RedisStorageProvider { + client, + } + } +} + +impl StorageProvider for RedisStorageProvider { + +} \ No newline at end of file diff --git a/src/stores/traits.rs b/src/stores/traits.rs new file mode 100644 index 000000000..aa8418222 --- /dev/null +++ b/src/stores/traits.rs @@ -0,0 +1,5 @@ +trait StorageProvider { + async fn store_feed_guids(&mut self, url: &String, guids: &Vec) -> Result>; + async fn has_seen_feed(&self, url: &String, guids: &Vec) -> Result>; + async fn has_seen_feed_guids(&self, url: &String, guids: &Vec) -> Result, Err>; +} \ No newline at end of file diff --git a/tests/FeedReader.spec.ts b/tests/FeedReader.spec.ts index 9b2de2fb4..e30613b51 100644 --- a/tests/FeedReader.spec.ts +++ b/tests/FeedReader.spec.ts @@ -5,7 +5,7 @@ import { ConnectionManager } from "../src/ConnectionManager"; import { IConnection } from "../src/Connections"; import { FeedEntry, FeedReader } from "../src/feeds/FeedReader"; import { MessageQueue, MessageQueueMessage } from "../src/MessageQueue"; -import { MemoryStorageProvider } from "../src/Stores/MemoryStorageProvider"; +import { MemoryStorageProvider } from "../src/stores/MemoryStorageProvider"; import { Server, createServer } from 'http'; import { AddressInfo } from "net"; diff --git a/tests/connections/GitlabRepoTest.ts b/tests/connections/GitlabRepoTest.ts index ee87d7658..e56e1265a 100644 --- a/tests/connections/GitlabRepoTest.ts +++ b/tests/connections/GitlabRepoTest.ts @@ -5,7 +5,7 @@ import { ApiError, ErrCode, ValidatorApiError } from "../../src/api"; import { GitLabRepoConnection, GitLabRepoConnectionState } from "../../src/Connections"; import { expect } from "chai"; import { BridgeConfigGitLab } from "../../src/config/Config"; -import { IBridgeStorageProvider } from "../../src/Stores/StorageProvider"; +import { IBridgeStorageProvider } from "../../src/stores/StorageProvider"; import { IntentMock } from "../utils/IntentMock"; const ROOM_ID = "!foo:bar"; From bf88aab67d9663353402968f119be9e2d9699132 Mon Sep 17 00:00:00 2001 From: Will Hunt Date: Mon, 12 Feb 2024 10:54:37 +0000 Subject: [PATCH 31/31] More cleanup --- Cargo.lock | 64 ++++++++++++++++++++++++++----- Cargo.toml | 1 + src/feeds/parser.rs | 7 ++-- src/feeds/reader.rs | 89 ++++++++++++++++++++++++++------------------ src/stores/memory.rs | 18 ++++----- src/stores/traits.rs | 8 ++-- 6 files changed, 122 insertions(+), 65 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 29141553a..3fd4e7910 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -615,20 +615,47 @@ dependencies = [ "new_debug_unreachable", ] +[[package]] +name = "futures" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "645c6916888f6cb6350d2550b80fb63e734897a8498abe35cfb732b6487804b0" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + [[package]] name = "futures-channel" -version = "0.3.29" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ff4dd66668b557604244583e3e1e1eada8c5c2e96a6d0d6653ede395b78bbacb" +checksum = "eac8f7d7865dcb88bd4373ab671c8cf4508703796caa2b1985a9ca867b3fcb78" dependencies = [ "futures-core", + "futures-sink", ] [[package]] name = "futures-core" -version = "0.3.29" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eb1d22c66e66d9d72e1758f0bd7d4fd0bee04cad842ee34587d68c07e45d088c" +checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d" + +[[package]] +name = "futures-executor" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a576fc72ae164fca6b9db127eaa9a9dda0d61316034f33a0a0d4eda41f02b01d" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] [[package]] name = "futures-io" @@ -664,29 +691,45 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "futures-macro" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.42", +] + [[package]] name = "futures-sink" -version = "0.3.29" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e36d3378ee38c2a36ad710c5d30c2911d752cb941c00c72dbabfb786a7970817" +checksum = "9fb8e00e87438d937621c1c6269e53f536c14d3fbd6a042bb24879e57d474fb5" [[package]] name = "futures-task" -version = "0.3.29" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "efd193069b0ddadc69c46389b740bbccdd97203899b48d09c5f7969591d6bae2" +checksum = "38d84fa142264698cdce1a9f9172cf383a0c82de1bddcf3092901442c4097004" [[package]] name = "futures-util" -version = "0.3.29" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a19526d624e703a3179b3d322efec918b6246ea0fa51d41124525f00f1cc8104" +checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48" dependencies = [ + "futures-channel", "futures-core", + "futures-io", + "futures-macro", "futures-sink", "futures-task", + "memchr", "pin-project-lite", "pin-utils", + "slab", ] [[package]] @@ -1025,6 +1068,7 @@ dependencies = [ "async-std", "atom_syndication", "contrast", + "futures", "hex", "md-5", "napi", diff --git a/Cargo.toml b/Cargo.toml index 8ef64aef0..76273c94a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,6 +25,7 @@ rand = "0.8.5" uuid = { version = "1.7.0", features = ["v4"] } async-std = "1.12.0" redis = { version = "0.24.0", features = ["aio", "tokio-comp"] } +futures = "0.3.30" [build-dependencies] napi-build = "2" diff --git a/src/feeds/parser.rs b/src/feeds/parser.rs index f6e845035..b0c4d1e09 100644 --- a/src/feeds/parser.rs +++ b/src/feeds/parser.rs @@ -20,7 +20,7 @@ pub struct FeedItem { pub pubdate: Option, pub summary: Option, pub author: Option, - pub hash_id: Option, + pub hash_id: String, } #[derive(Serialize, Debug, Deserialize)] @@ -71,7 +71,8 @@ fn parse_channel_to_js_result(channel: &Channel) -> JsRssChannel { .or(item.link.clone()) .or(item.title.clone()) .and_then(|f| hash_id(f).ok()) - .and_then(|f| Some(format!("md5:{}", f))), + .and_then(|f| Some(format!("md5:{}", f))) + .unwrap(), // TODO: Handle error }) .collect(), } @@ -118,7 +119,7 @@ fn parse_feed_to_js_result(feed: &Feed) -> JsRssChannel { .map(|date| date.to_rfc2822()), summary: item.summary().map(|v| v.value.clone()), author: authors_to_string(item.authors()), - hash_id: hash_id(item.id.clone()).ok(), + hash_id: hash_id(item.id.clone()).unwrap(), }) .collect(), } diff --git a/src/feeds/reader.rs b/src/feeds/reader.rs index bce483e2f..148f38e49 100644 --- a/src/feeds/reader.rs +++ b/src/feeds/reader.rs @@ -1,10 +1,12 @@ use std::collections::{HashMap, HashSet}; +use std::future; use crate::util::QueueWithBackoff; use std::time::{Duration, Instant}; -use napi::bindgen_prelude::{Error as JsError, Status}; +use napi::bindgen_prelude::Error as JsError; use napi::tokio::sync::RwLock; use std::sync::Arc; use uuid::Uuid; +use futures::future::{Future, select_all}; use crate::feeds::parser::{js_read_feed, ReadFeedOptions}; use crate::stores::memory::MemoryStorageProvider; use crate::stores::traits::StorageProvider; @@ -51,7 +53,6 @@ pub struct FeedReader { queue: QueueWithBackoff, feeds_to_retain: HashSet, cache_times: Arc>>, - storage_provider: Box, poll_interval_seconds: f64, poll_concurrency: u8, poll_timeout_seconds: i64, @@ -70,14 +71,12 @@ impl FeedReader { pub fn new(poll_interval_seconds: f64, poll_concurrency: u8, poll_timeout_seconds: i64) -> Self { let mut cache_times: HashMap = HashMap::new(); let mut lock = Arc::new(RwLock::new(cache_times)); - let mut storage_provider = MemoryStorageProvider::new(); FeedReader { queue: QueueWithBackoff::new( BACKOFF_TIME_MS, BACKOFF_POW, BACKOFF_TIME_MAX_MS, ), - storage_provider, feeds_to_retain: HashSet::new(), poll_interval_seconds, poll_concurrency, @@ -105,7 +104,7 @@ impl FeedReader { } - async fn poll_feed(&self, url: &String, cache_times: Arc>>) -> Result, JsError> { + async fn poll_feed(&self, url: &String, cache_times: Arc>>, mut storage: &impl StorageProvider) -> Result, JsError> { let seen_entries_changed = false; let fetch_key = Uuid::new_v4().to_string(); @@ -128,13 +127,17 @@ impl FeedReader { }); drop(c_t_w); - let initial_sync = false; // TODO: Implement - let seen_items: HashSet = HashSet::new(); // TODO: Implement + let initial_sync = storage.has_seen_feed(url).await; let mut new_guids: Vec = Vec::new(); let new_entries: Vec = Vec::new(); if let Some(feed) = result.feed { + let items = feed.items.iter().map(|x| x.hash_id.clone()).collect::>(); + let seen_items = storage.has_seen_feed_guids( + url, + &items, + ).await; println!("Got feed result!"); let mut feed_info = HookshotFeedInfo { title: feed.title, @@ -144,26 +147,25 @@ impl FeedReader { }; for item in feed.items { println!("Got feed result! {:?}", item); - if let Some(hash_id) = item.hash_id { - if seen_items.contains(&hash_id) { - continue; - } - // TODO: Drop unwrap - new_guids.push(hash_id); - - if initial_sync { - // Skip. - continue; - } - feed_info.entries.push(HookshotFeedEntry { - title: item.title, - pubdate: item.pubdate, - summary: item.summary, - author: item.author, - link: item.link, - }); + if seen_items.contains(&item.hash_id) { + continue; + } + + new_guids.push(item.hash_id.clone()); + + if initial_sync { + // Skip. + continue; } + feed_info.entries.push(HookshotFeedEntry { + title: item.title, + pubdate: item.pubdate, + summary: item.summary, + author: item.author, + link: item.link, + }); } + storage.store_feed_guids(&url, &new_guids).await; return Ok(Some(feed_info)); } else { // TODO: Implement @@ -178,20 +180,33 @@ impl FeedReader { return (self.poll_interval_seconds * 1000.0) / self.queue.length() as f64; } + async unsafe fn poll_feed_int(&mut self, url: &String, mut storage_provider: &impl StorageProvider) { + let mut sleep_for = self.sleeping_interval(); + self.feeds_to_retain.insert(url.clone()); + let now = Instant::now(); + let result = self.poll_feed(url, self.cache_times.clone(), &storage_provider).await; + self.feeds_to_retain.remove(url); + let elapsed = now.elapsed(); + sleep_for = (sleep_for - (elapsed.as_millis() as f64)).max(0.0); + async_std::task::sleep(Duration::from_millis(sleep_for as u64)).await; + } + #[napi] pub async unsafe fn poll_feeds(&mut self) -> Result<(), JsError> { - let mut sleep_for = self.sleeping_interval(); - if let Some(url) = self.queue.pop() { - self.feeds_to_retain.insert(url.clone()); - let now = Instant::now(); - let result = self.poll_feed(&url, self.cache_times.clone()).await?; - self.feeds_to_retain.remove(&url); - let elapsed = now.elapsed(); - sleep_for = (sleep_for - (elapsed.as_millis() as f64)).max(0.0); - } else { - println!("No feeds available"); + let concurrency = self.poll_concurrency as usize; + let mut storage_provider = MemoryStorageProvider::new(); + let mut future_set: Vec<_> = Vec::new(); + loop { + if let Some(url) = self.queue.pop() { + let result = Box::pin(self.poll_feed_int(&url, &storage_provider)); + future_set.push(result); + } else { + async_std::task::sleep(Duration::from_millis(self.sleeping_interval() as u64)).await; + } + if future_set.len() >= concurrency { + let (item_resolved, ready_future_index, _remaining_futures) = + select_all(future_set).await; + } } - async_std::task::sleep(Duration::from_millis(sleep_for as u64)).await; - Ok(()) } } \ No newline at end of file diff --git a/src/stores/memory.rs b/src/stores/memory.rs index afdce24ca..1293d00f6 100644 --- a/src/stores/memory.rs +++ b/src/stores/memory.rs @@ -14,23 +14,19 @@ impl MemoryStorageProvider { } impl StorageProvider for MemoryStorageProvider { - async fn store_feed_guids(&mut self, url: &String, guids: &Vec) -> Result<(), Err> { - let mut guid_set = self.guids.get(url).or_else(|| { - let new = HashSet::new(); - self.guids.insert(url.clone(), new); - Some(&new) - }).unwrap(); + async fn store_feed_guids(&mut self, url: &String, guids: &Vec) { + let guid_set = self.guids.entry(url.clone()).or_insert(HashSet::default()); + for guid in guids { guid_set.insert(guid.clone()); } - Ok(()) } - async fn has_seen_feed(&self, url: &String, guids: &Vec) -> Result> { - Ok(self.guids.contains_key(url)) + async fn has_seen_feed(&self, url: &String) -> bool { + self.guids.contains_key(url) } - async fn has_seen_feed_guids(&self,url: &String, guids: &Vec) -> Result, Err> { + async fn has_seen_feed_guids(&self,url: &String, guids: &Vec) -> Vec { let mut seen_guids = Vec::default(); if let Some(existing_guids) = self.guids.get(url) { for guid in guids { @@ -39,6 +35,6 @@ impl StorageProvider for MemoryStorageProvider { } } } - Ok(seen_guids) + seen_guids } } \ No newline at end of file diff --git a/src/stores/traits.rs b/src/stores/traits.rs index aa8418222..f44fcc818 100644 --- a/src/stores/traits.rs +++ b/src/stores/traits.rs @@ -1,5 +1,5 @@ -trait StorageProvider { - async fn store_feed_guids(&mut self, url: &String, guids: &Vec) -> Result>; - async fn has_seen_feed(&self, url: &String, guids: &Vec) -> Result>; - async fn has_seen_feed_guids(&self, url: &String, guids: &Vec) -> Result, Err>; +pub trait StorageProvider { + async fn store_feed_guids(&mut self, url: &String, guids: &Vec); + async fn has_seen_feed(&self, url: &String) -> bool; + async fn has_seen_feed_guids(&self, url: &String, guids: &Vec) -> Vec; } \ No newline at end of file