Port feed reader to Rust #894

Draft
Wants to merge 32 commits into base: main
Changes from all commits
32 commits
e78d920
Backoff RSS requests if a url repeatedly fails.
Half-Shot Feb 6, 2024
f5568e2
Increase max backoff time to a day
Half-Shot Feb 6, 2024
2ee5012
Add backoff for failing feeds.
Half-Shot Feb 6, 2024
b7cd4fe
Remove unused finally
Half-Shot Feb 6, 2024
18429ce
Add this.feedLastBackoff
Half-Shot Feb 6, 2024
80d4b9b
Rewrite in rust.
Half-Shot Feb 6, 2024
eff30b8
linting
Half-Shot Feb 6, 2024
517d044
pop
Half-Shot Feb 6, 2024
ea7af88
Optimise backoff function further
Half-Shot Feb 6, 2024
965c30b
Drop only!
Half-Shot Feb 6, 2024
e3d085e
fix test
Half-Shot Feb 6, 2024
4274eb4
lint
Half-Shot Feb 6, 2024
64ab808
lint further
Half-Shot Feb 6, 2024
9232213
Better comments
Half-Shot Feb 6, 2024
d0846b1
Fix urls calculation
Half-Shot Feb 7, 2024
92842fd
Merge remote-tracking branch 'origin/main' into hs/add-sensible-rss-b…
Half-Shot Feb 7, 2024
5659d88
Remove testing URL
Half-Shot Feb 7, 2024
e6ddcb6
Add some variance to speed up while loop
Half-Shot Feb 7, 2024
4838ba1
correct comment
Half-Shot Feb 7, 2024
1614a2a
Follow the advice and use a VecDeque as it's slightly faster.
Half-Shot Feb 7, 2024
958bdcb
Vastly better shuffle method
Half-Shot Feb 7, 2024
bec670f
Speed up checking for previous guids.
Half-Shot Feb 7, 2024
be4a468
fix hasher function
Half-Shot Feb 7, 2024
bd0822e
lint
Half-Shot Feb 7, 2024
db23748
Content doesn't need to be calculated twice.
Half-Shot Feb 7, 2024
605139f
Slightly more efficient iteration
Half-Shot Feb 7, 2024
ffd3881
Improve performance of backoff insertion
Half-Shot Feb 7, 2024
6649143
Configure feed reader
Half-Shot Feb 8, 2024
149ca51
lint
Half-Shot Feb 8, 2024
5b87dc0
Start porting the reader to Rust.
Half-Shot Feb 8, 2024
4b626ee
Try to implement a trait for storage.
Half-Shot Feb 9, 2024
bf88aab
More cleanup
Half-Shot Feb 12, 2024

567 changes: 552 additions & 15 deletions Cargo.lock

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions Cargo.toml
@@ -21,6 +21,11 @@ rss = "2.0"
atom_syndication = "0.12"
ruma = { version = "0.9", features = ["events", "html"] }
reqwest = "0.11"
rand = "0.8.5"
uuid = { version = "1.7.0", features = ["v4"] }
async-std = "1.12.0"
redis = { version = "0.24.0", features = ["aio", "tokio-comp"] }
futures = "0.3.30"

[build-dependencies]
napi-build = "2"
1 change: 1 addition & 0 deletions changelog.d/890.misc
@@ -0,0 +1 @@
Failing RSS/atom feeds are now backed off before being retried. This should result in a speedup for large public deployments where failing feeds may result in a slowdown.
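For context only: the constants introduced in src/feeds/FeedReader.ts further down (a 5 second base delay, a 1.05 growth factor, and a 24 hour cap) imply a gently growing retry delay. The sketch below is an assumption about how those constants might combine per consecutive failure; the real computation lives inside the Rust QueueWithBackoff and is not shown in this diff view.

```rust
// Illustrative only: assumes the delay grows geometrically with each
// consecutive failure, using the constants from src/feeds/FeedReader.ts.
// The actual QueueWithBackoff in this PR may compute the delay differently.
const BACKOFF_TIME_MS: f64 = 5.0 * 1000.0;
const BACKOFF_POW: f64 = 1.05;
const BACKOFF_TIME_MAX_MS: f64 = 24.0 * 60.0 * 60.0 * 1000.0;

fn assumed_backoff_ms(consecutive_failures: u32) -> u64 {
    // First failure waits ~5s, then ~5.25s, ~5.5s, ..., capped at one day.
    (BACKOFF_TIME_MS * BACKOFF_POW.powi(consecutive_failures.saturating_sub(1) as i32))
        .min(BACKOFF_TIME_MAX_MS) as u64
}

fn main() {
    for failures in [1u32, 10, 50, 100] {
        println!("{failures} failures -> {} ms", assumed_backoff_ms(failures));
    }
}
```

Under that assumption a feed has to keep failing for a long time before it approaches the one-day cap, so healthy feeds are barely affected while persistently broken ones stop dominating the polling loop.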
2 changes: 1 addition & 1 deletion spec/github.spec.ts
@@ -58,7 +58,7 @@ describe('GitHub', () => {
return testEnv?.tearDown();
});

it.only('should be able to handle a GitHub event', async () => {
it('should be able to handle a GitHub event', async () => {
const user = testEnv.getUser('user');
const bridgeApi = await getBridgeApi(testEnv.opts.config?.widgets?.publicUrl!, user);
const testRoomId = await user.createRoom({ name: 'Test room', invite:[testEnv.botMxid] });
2 changes: 1 addition & 1 deletion src/App/ResetCryptoStore.ts
@@ -5,7 +5,7 @@ import { Logger } from "matrix-appservice-bridge";
import { LogService, MatrixClient } from "matrix-bot-sdk";
import { getAppservice } from "../appservice";
import BotUsersManager from "../Managers/BotUsersManager";
import { IBridgeStorageProvider } from "../Stores/StorageProvider";
import { IBridgeStorageProvider } from "../stores/StorageProvider";

const log = new Logger("ResetCryptoStore");

2 changes: 1 addition & 1 deletion src/Bridge.ts
@@ -8,7 +8,7 @@ import { CommentProcessor } from "./CommentProcessor";
import { ConnectionManager } from "./ConnectionManager";
import { GetIssueResponse, GetIssueOpts } from "./Gitlab/Types"
import { GithubInstance } from "./github/GithubInstance";
import { IBridgeStorageProvider } from "./Stores/StorageProvider";
import { IBridgeStorageProvider } from "./stores/StorageProvider";
import { IConnection, GitHubDiscussionSpace, GitHubDiscussionConnection, GitHubUserSpace, JiraProjectConnection, GitLabRepoConnection,
GitHubIssueConnection, GitHubProjectConnection, GitHubRepoConnection, GitLabIssueConnection, FigmaFileConnection, FeedConnection, GenericHookConnection, WebhookResponse } from "./Connections";
import { IGitLabWebhookIssueStateEvent, IGitLabWebhookMREvent, IGitLabWebhookNoteEvent, IGitLabWebhookPushEvent, IGitLabWebhookReleaseEvent, IGitLabWebhookTagPushEvent, IGitLabWebhookWikiPageEvent } from "./Gitlab/WebhookTypes";
2 changes: 1 addition & 1 deletion src/ConnectionManager.ts
@@ -13,7 +13,7 @@ import { FigmaFileConnection, FeedConnection } from "./Connections";
import { GetConnectionTypeResponseItem } from "./provisioning/api";
import { GitLabClient } from "./Gitlab/Client";
import { GithubInstance } from "./github/GithubInstance";
import { IBridgeStorageProvider } from "./Stores/StorageProvider";
import { IBridgeStorageProvider } from "./stores/StorageProvider";
import { JiraProject, JiraVersion } from "./jira/Types";
import { Logger } from "matrix-appservice-bridge";
import { MessageSenderClient } from "./MatrixSender";
17 changes: 9 additions & 8 deletions src/Connections/FeedConnection.ts
@@ -220,15 +220,16 @@ export class FeedConnection extends BaseConnection implements IConnection {

// We want to retry these sends, because sometimes the network / HS
// craps out.
const content = {
msgtype: 'm.notice',
format: "org.matrix.custom.html",
formatted_body: md.renderInline(message),
body: message,
external_url: entry.link ?? undefined,
"uk.half-shot.matrix-hookshot.feeds.item": entry,
};
await retry(
() => this.intent.sendEvent(this.roomId, {
msgtype: 'm.notice',
format: "org.matrix.custom.html",
formatted_body: md.renderInline(message),
body: message,
external_url: entry.link ?? undefined,
"uk.half-shot.matrix-hookshot.feeds.item": entry,
}),
() => this.intent.sendEvent(this.roomId, content),
SEND_EVENT_MAX_ATTEMPTS,
SEND_EVENT_INTERVAL_MS,
// Filter for showstopper errors like 4XX errors, but otherwise
2 changes: 1 addition & 1 deletion src/Connections/FigmaFileConnection.ts
@@ -4,7 +4,7 @@ import { FigmaPayload } from "../figma/types";
import { BaseConnection } from "./BaseConnection";
import { IConnection, IConnectionState } from ".";
import { Logger } from "matrix-appservice-bridge";
import { IBridgeStorageProvider } from "../Stores/StorageProvider";
import { IBridgeStorageProvider } from "../stores/StorageProvider";
import { BridgeConfig } from "../config/Config";
import { Connection, InstantiateConnectionOpts, ProvisionConnectionOpts } from "./IConnection";
import { ConfigGrantChecker, GrantChecker } from "../grants/GrantCheck";
2 changes: 1 addition & 1 deletion src/Connections/GitlabRepo.ts
@@ -16,7 +16,7 @@ import { CommandError } from "../errors";
import QuickLRU from "@alloc/quick-lru";
import { HookFilter } from "../HookFilter";
import { GitLabClient } from "../Gitlab/Client";
import { IBridgeStorageProvider } from "../Stores/StorageProvider";
import { IBridgeStorageProvider } from "../stores/StorageProvider";
import axios from "axios";
import { GitLabGrantChecker } from "../Gitlab/GrantChecker";

2 changes: 1 addition & 1 deletion src/Connections/IConnection.ts
@@ -6,7 +6,7 @@ import { BridgeConfig, BridgePermissionLevel } from "../config/Config";
import { UserTokenStore } from "../UserTokenStore";
import { CommentProcessor } from "../CommentProcessor";
import { MessageSenderClient } from "../MatrixSender";
import { IBridgeStorageProvider } from "../Stores/StorageProvider";
import { IBridgeStorageProvider } from "../stores/StorageProvider";
import { GithubInstance } from "../github/GithubInstance";
import "reflect-metadata";

2 changes: 1 addition & 1 deletion src/NotificationsProcessor.ts
@@ -1,5 +1,5 @@
import { MessageSenderClient } from "./MatrixSender";
import { IBridgeStorageProvider } from "./Stores/StorageProvider";
import { IBridgeStorageProvider } from "./stores/StorageProvider";
import { UserNotificationsEvent } from "./Notifications/UserNotificationWatcher";
import { Logger } from "matrix-appservice-bridge";
import { AdminRoom } from "./AdminRoom";
2 changes: 1 addition & 1 deletion src/Widgets/BridgeWidgetApi.ts
@@ -5,7 +5,7 @@ import { ApiError, ErrCode } from "../api";
import { BridgeConfig } from "../config/Config";
import { GetAuthPollResponse, GetAuthResponse, GetConnectionsForServiceResponse } from "./BridgeWidgetInterface";
import { ProvisioningApi, ProvisioningRequest } from "matrix-appservice-bridge";
import { IBridgeStorageProvider } from "../Stores/StorageProvider";
import { IBridgeStorageProvider } from "../stores/StorageProvider";
import { ConnectionManager } from "../ConnectionManager";
import BotUsersManager, {BotUser} from "../Managers/BotUsersManager";
import { assertUserPermissionsInRoom, GetConnectionsResponseItem } from "../provisioning/api";
6 changes: 3 additions & 3 deletions src/appservice.ts
@@ -2,9 +2,9 @@ import { Logger } from "matrix-appservice-bridge";
import { Appservice, IAppserviceCryptoStorageProvider, IAppserviceRegistration, RustSdkAppserviceCryptoStorageProvider, RustSdkCryptoStoreType } from "matrix-bot-sdk";
import { BridgeConfig } from "./config/Config";
import Metrics from "./Metrics";
import { MemoryStorageProvider } from "./Stores/MemoryStorageProvider";
import { RedisStorageProvider } from "./Stores/RedisStorageProvider";
import { IBridgeStorageProvider } from "./Stores/StorageProvider";
import { MemoryStorageProvider } from "./stores/MemoryStorageProvider";
import { RedisStorageProvider } from "./stores/RedisStorageProvider";
import { IBridgeStorageProvider } from "./stores/StorageProvider";
const log = new Logger("Appservice");

export function getAppservice(config: BridgeConfig, registration: IAppserviceRegistration) {
69 changes: 31 additions & 38 deletions src/feeds/FeedReader.ts
@@ -7,10 +7,16 @@ import axios from "axios";
import Metrics from "../Metrics";
import { randomUUID } from "crypto";
import { readFeed } from "../libRs";
import { IBridgeStorageProvider } from "../Stores/StorageProvider";
import { IBridgeStorageProvider } from "../stores/StorageProvider";
import UserAgent from "../UserAgent";
import { QueueWithBackoff } from "../libRs";

const log = new Logger("FeedReader");

const BACKOFF_TIME_MAX_MS = 24 * 60 * 60 * 1000;
const BACKOFF_POW = 1.05;
const BACKOFF_TIME_MS = 5 * 1000;

export class FeedError extends Error {
constructor(
public url: string,
@@ -73,21 +79,11 @@ function normalizeUrl(input: string): string {
return url.toString();
}

function shuffle<T>(array: T[]): T[] {
for (let i = array.length - 1; i > 0; i--) {
const j = Math.floor(Math.random() * (i + 1));
[array[i], array[j]] = [array[j], array[i]];
}
return array;
}

export class FeedReader {

private connections: FeedConnection[];
// ts should notice that we do in fact initialize it in constructor, but it doesn't (in this version)
private observedFeedUrls: Set<string> = new Set();

private feedQueue: string[] = [];
private feedQueue = new QueueWithBackoff(BACKOFF_TIME_MS, BACKOFF_POW, BACKOFF_TIME_MAX_MS);

// A set of last modified times for each url.
private cacheTimes: Map<string, { etag?: string, lastModified?: string}> = new Map();
@@ -104,7 +100,7 @@ export class FeedReader {
get sleepingInterval() {
return (
// Calculate the number of MS to wait in between feeds.
(this.config.pollIntervalSeconds * 1000) / (this.feedQueue.length || 1)
(this.config.pollIntervalSeconds * 1000) / (this.feedQueue.length() || 1)
// And multiply by the number of concurrent readers
) * this.config.pollConcurrency;
}
@@ -120,7 +116,7 @@ export class FeedReader {
this.timeouts.fill(undefined);
Object.seal(this.timeouts);
this.connections = this.connectionManager.getAllConnectionsOfType(FeedConnection);
this.calculateFeedUrls();
const initialFeeds = this.calculateFeedUrls();
connectionManager.on('new-connection', c => {
if (c instanceof FeedConnection) {
log.debug('New connection tracked:', c.connectionId);
Expand All @@ -135,7 +131,7 @@ export class FeedReader {
}
});

log.debug('Loaded feed URLs:', this.observedFeedUrls);
log.debug('Loaded feed URLs:', [...initialFeeds].join(', '));

for (let i = 0; i < config.pollConcurrency; i++) {
void this.pollFeeds(i);
@@ -147,21 +143,21 @@ export class FeedReader {
this.timeouts.forEach(t => clearTimeout(t));
}

private calculateFeedUrls(): void {
private calculateFeedUrls(): Set<string> {
// just in case we got an invalid URL somehow
const normalizedUrls = [];
const observedFeedUrls = new Set<string>();
for (const conn of this.connections) {
try {
normalizedUrls.push(normalizeUrl(conn.feedUrl));
observedFeedUrls.add(normalizeUrl(conn.feedUrl));
} catch (err: unknown) {
log.error(`Invalid feedUrl for connection ${conn.connectionId}: ${conn.feedUrl}. It will not be tracked`);
}
}
this.observedFeedUrls = new Set(normalizedUrls);
this.feedQueue = shuffle([...this.observedFeedUrls.values()]);

Metrics.feedsCount.set(this.observedFeedUrls.size);
Metrics.feedsCountDeprecated.set(this.observedFeedUrls.size);
observedFeedUrls.forEach(url => this.feedQueue.push(url));
this.feedQueue.shuffle();
Metrics.feedsCount.set(observedFeedUrls.size);
Metrics.feedsCountDeprecated.set(observedFeedUrls.size);
return observedFeedUrls;
}

/**
Expand Down Expand Up @@ -203,22 +199,20 @@ export class FeedReader {
if (feed) {
// If undefined, we got a not-modified.
log.debug(`Found ${feed.items.length} entries in ${url}`);

const seenItems = await this.storage.hasSeenFeedGuids(url, ...feed.items.filter(item => !!item.hashId).map(item => item.hashId!))
for (const item of feed.items) {
// Some feeds have a nasty habit of leading a empty tag there, making us parse it as garbage.
if (!item.hashId) {
log.error(`Could not determine guid for entry in ${url}, skipping`);
continue;
}
const hashId = `md5:${item.hashId}`;
newGuids.push(hashId);

if (initialSync) {
log.debug(`Skipping entry ${item.id ?? hashId} since we're performing an initial sync`);
if (seenItems.includes(item.hashId)) {
continue;
}
if (await this.storage.hasSeenFeedGuid(url, hashId)) {
log.debug('Skipping already seen entry', item.id ?? hashId);
newGuids.push(item.hashId);

if (initialSync) {
log.debug(`Skipping entry ${item.id ?? item.hashId} since we're performing an initial sync`);
continue;
}
const entry = {
@@ -243,25 +237,24 @@ export class FeedReader {
if (seenEntriesChanged && newGuids.length) {
await this.storage.storeFeedGuids(url, ...newGuids);
}

}
this.queue.push<FeedSuccess>({ eventName: 'feed.success', sender: 'FeedReader', data: { url } });
// Clear any feed failures
this.feedsFailingHttp.delete(url);
this.feedsFailingParsing.delete(url);
this.feedQueue.push(url);
} catch (err: unknown) {
// TODO: Proper Rust Type error.
if ((err as Error).message.includes('Failed to fetch feed due to HTTP')) {
this.feedsFailingHttp.add(url);
} else {
this.feedsFailingParsing.add(url);
}
const backoffDuration = this.feedQueue.backoff(url);
const error = err instanceof Error ? err : new Error(`Unknown error ${err}`);
const feedError = new FeedError(url.toString(), error, fetchKey);
log.error("Unable to read feed:", feedError.message);
log.error("Unable to read feed:", feedError.message, `backing off for ${backoffDuration}ms`);
this.queue.push<FeedError>({ eventName: 'feed.error', sender: 'FeedReader', data: feedError});
} finally {
this.feedQueue.push(url);
}
return seenEntriesChanged;
}
@@ -277,11 +270,11 @@ export class FeedReader {
Metrics.feedsFailingDeprecated.set({ reason: "http" }, this.feedsFailingHttp.size );
Metrics.feedsFailingDeprecated.set({ reason: "parsing" }, this.feedsFailingParsing.size);

log.debug(`Checking for updates in ${this.observedFeedUrls.size} RSS/Atom feeds (worker: ${workerId})`);
log.debug(`Checking for updates in ${this.feedQueue.length()} RSS/Atom feeds (worker: ${workerId})`);

const fetchingStarted = Date.now();

const [ url ] = this.feedQueue.splice(0, 1);
const url = this.feedQueue.pop();
let sleepFor = this.sleepingInterval;

if (url) {
Expand All @@ -298,7 +291,7 @@ export class FeedReader {
log.warn(`It took us longer to update the feeds than the configured pool interval`);
}
} else {
// It may be possible that we have more workers than feeds. This will cause the worker to just sleep.
// It is possible that we have more workers than feeds. This will cause the worker to just sleep.
log.debug(`No feeds available to poll for worker ${workerId}`);
}

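The FeedReader changes above call into a QueueWithBackoff exported from libRs, whose Rust implementation is not included in this diff view. Purely as a sketch of the interface the TypeScript side relies on (length(), push(), pop(), backoff(), shuffle()), and assuming a VecDeque plus a parking map as the commit messages hint, it might look something like the following; the field names and the exact pop/backoff semantics here are guesses, not the PR's actual code. The shuffle uses the rand dependency added in Cargo.toml.

```rust
use std::collections::{HashMap, VecDeque};
use std::time::{SystemTime, UNIX_EPOCH};

use rand::seq::SliceRandom;

/// Sketch only: the real QueueWithBackoff exported via napi may differ.
pub struct QueueWithBackoff {
    /// URLs eligible for polling, in round-robin order.
    queue: VecDeque<String>,
    /// URL -> consecutive failure count (cleared when push() re-adds the URL after a success).
    failures: HashMap<String, u32>,
    /// URL -> unix ms after which the URL may be polled again.
    parked: HashMap<String, u128>,
    base_ms: f64,
    growth: f64,
    max_ms: f64,
}

fn now_ms() -> u128 {
    SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis()
}

impl QueueWithBackoff {
    pub fn new(base_ms: f64, growth: f64, max_ms: f64) -> Self {
        Self {
            queue: VecDeque::new(),
            failures: HashMap::new(),
            parked: HashMap::new(),
            base_ms,
            growth,
            max_ms,
        }
    }

    /// Number of URLs currently in active rotation.
    pub fn length(&self) -> usize {
        self.queue.len()
    }

    /// (Re-)add a URL to the back of the rotation and clear its failure count.
    pub fn push(&mut self, url: String) {
        self.failures.remove(&url);
        self.queue.push_back(url);
    }

    /// Return the next URL to poll, preferring any parked URL whose backoff has expired.
    pub fn pop(&mut self) -> Option<String> {
        let now = now_ms();
        if let Some(url) = self
            .parked
            .iter()
            .find(|&(_, &until)| until <= now)
            .map(|(url, _)| url.clone())
        {
            self.parked.remove(&url);
            return Some(url);
        }
        self.queue.pop_front()
    }

    /// Record a failure for `url`, park it, and return the computed delay in milliseconds.
    pub fn backoff(&mut self, url: String) -> u64 {
        let count = *self
            .failures
            .entry(url.clone())
            .and_modify(|n| *n += 1)
            .or_insert(1);
        let delay = (self.base_ms * self.growth.powi(count as i32 - 1)).min(self.max_ms);
        self.parked.insert(url, now_ms() + delay as u128);
        delay as u64
    }

    /// Randomise the active rotation so a restart does not poll feeds in config order.
    pub fn shuffle(&mut self) {
        let mut rng = rand::thread_rng();
        self.queue.make_contiguous().shuffle(&mut rng);
    }
}
```

A VecDeque keeps push() and pop() at O(1) for the round-robin loop in pollFeeds, and parking failing URLs outside the deque is what lets sleepingInterval shrink as broken feeds drop out of this.feedQueue.length().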
1 change: 1 addition & 0 deletions src/feeds/mod.rs
@@ -1 +1,2 @@
pub mod parser;
pub mod reader;
8 changes: 5 additions & 3 deletions src/feeds/parser.rs
@@ -20,7 +20,7 @@ pub struct FeedItem {
pub pubdate: Option<String>,
pub summary: Option<String>,
pub author: Option<String>,
pub hash_id: Option<String>,
pub hash_id: String,
}

#[derive(Serialize, Debug, Deserialize)]
@@ -70,7 +70,9 @@ fn parse_channel_to_js_result(channel: &Channel) -> JsRssChannel {
.map(|f| f.value)
.or(item.link.clone())
.or(item.title.clone())
.and_then(|f| hash_id(f).ok()),
.and_then(|f| hash_id(f).ok())
.and_then(|f| Some(format!("md5:{}", f)))
.unwrap(), // TODO: Handle error
})
.collect(),
}
@@ -117,7 +119,7 @@ fn parse_feed_to_js_result(feed: &Feed) -> JsRssChannel {
.map(|date| date.to_rfc2822()),
summary: item.summary().map(|v| v.value.clone()),
author: authors_to_string(item.authors()),
hash_id: hash_id(item.id.clone()).ok(),
hash_id: hash_id(item.id.clone()).unwrap(),
})
.collect(),
}