refactor: add s3 timeout to session batch writer #29415
✅ No documentation updates required.
PR Summary
This PR adds timeout handling and health checks to the S3 session batch writer to prevent indefinite hangs during uploads and ensure storage accessibility before processing.
- Added a configurable timeout mechanism (default 5s) in `S3SessionBatchFileWriter` with proper stream cleanup and error propagation
- Implemented a `checkHealth()` method that verifies S3 bucket accessibility via a HEAD request
- Added storage health verification before starting the session recording consumer
- Created comprehensive test coverage for timeout behavior, including custom timeout values and error scenarios
- Introduced a robust error handling system using callbacks to properly propagate errors to all pending operations
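As a rough illustration of the timeout idea in the summary above, here is a minimal sketch. It is not the code from this PR: `withTimeout` is a hypothetical helper name, and the real writer also has to clean up streams, which this sketch leaves out.

```typescript
// Hypothetical helper (not from the PR): rejects if `operation` has not
// settled within `timeoutMs` milliseconds, otherwise passes its result through.
function withTimeout<T>(operation: () => Promise<T>, timeoutMs: number): Promise<T> {
    return new Promise<T>((resolve, reject) => {
        const timeoutId = setTimeout(() => {
            reject(new Error(`Operation timed out after ${timeoutMs}ms`))
        }, timeoutMs)

        // Start the operation asynchronously so a synchronous throw is also caught.
        Promise.resolve()
            .then(operation)
            .then(
                (value) => {
                    clearTimeout(timeoutId) // always clear the timer once settled
                    resolve(value)
                },
                (error) => {
                    clearTimeout(timeoutId)
                    reject(error)
                }
            )
    })
}
```

Note that the timer only stops the caller from waiting; the underlying upload would still need to be aborted separately.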
7 file(s) reviewed, 2 comment(s)
try {
    return await Promise.race([operation(), errorPromise])
} finally {
    this.rejectCallbacks = this.rejectCallbacks.filter((cb) => !this.rejectCallbacks.includes(cb))
logic: This filter logic has a potential issue. Using `this.rejectCallbacks.includes(cb)` will always be true since you're iterating over the same array. This should likely be `this.rejectCallbacks = this.rejectCallbacks.filter((callback) => callback !== cb)`.
Suggested change:
-    this.rejectCallbacks = this.rejectCallbacks.filter((cb) => !this.rejectCallbacks.includes(cb))
+    this.rejectCallbacks = this.rejectCallbacks.filter((callback) => callback !== cb)
Not sure if this is true, but I agree it does look error-prone...
I missed that – the greptile suggestion is good, though the reasoning behind it seems off.
I generally don't like this error barrier code, but couldn't find a better solution for mixing event listeners and promises, plus this weird `done` API. At least it's encapsulated in this class.
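To make the difference between the two filter expressions quoted in this thread concrete, here is a small standalone sketch. The names `first` and `second` are made up for illustration. Because `includes` is true for every element of the array being filtered, the negated predicate is always false, so the original expression drops every callback rather than only the one belonging to the finished operation.

```typescript
type RejectCallback = (error: Error) => void

// Two hypothetical callbacks registered by two pending operations.
const first: RejectCallback = () => {}
const second: RejectCallback = () => {}
const rejectCallbacks: RejectCallback[] = [first, second]

// Original expression: the arrow parameter `cb` shadows any outer `cb`, and
// every element is trivially included in the array it came from, so
// `!includes(cb)` is always false and nothing is kept.
const afterOriginal = rejectCallbacks.filter((cb) => !rejectCallbacks.includes(cb))

// Suggested expression: removes only the callback of the finished operation
// (here `first`) and keeps the others.
const afterSuggested = rejectCallbacks.filter((callback) => callback !== first)

console.log(afterOriginal.length) // 0
console.log(afterSuggested.length) // 1
```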
if (this.timeoutId) {
    clearTimeout(this.timeoutId)
}
style: The timeoutId is cleared here but the timeoutId variable isn't set to null, unlike in the handleError method. For consistency, consider setting it to null here too.
Small nit comments but otherwise looks good
plugin-server/src/config/config.ts
@@ -233,6 +233,7 @@ export function getDefaultConfig(): PluginsServerConfig {
         SESSION_RECORDING_V2_S3_REGION: 'us-east-1',
         SESSION_RECORDING_V2_S3_ACCESS_KEY_ID: 'object_storage_root_user',
         SESSION_RECORDING_V2_S3_SECRET_ACCESS_KEY: 'object_storage_root_password',
+        SESSION_RECORDING_V2_S3_TIMEOUT_MS: 5000,
No idea but could this be too short? Like could a valid upload take that long?
Good point – had a look at the metrics, I'll change it – 30 seconds should be fine.
@@ -112,19 +114,20 @@ export class SessionRecordingIngester {

         const offsetManager = new KafkaOffsetManager(this.commitOffsets.bind(this), this.topic)
         const metadataStore = new SessionMetadataStore(producer)
-        const fileStorage = s3Client
+        this.fileStorage = s3Client
             ? new S3SessionBatchFileStorage(
qq - why not just pass in the config here? It feels like a lot of your classes take hyper-specific options, which makes for a lot of verbose code compared with just referencing the config. You could still use Pick<> to only get the values you want, and it makes for less churn.
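A minimal sketch of what the `Pick<>` approach could look like, under stated assumptions: the `PluginsServerConfig` interface below is a cut-down stand-in, the `..._S3_BUCKET` and `..._S3_PREFIX` key names are guesses that do not appear in the diff, and `ExampleFileStorage` is a hypothetical class, not the one from this PR.

```typescript
// Stand-in for the real server config. Only the REGION and TIMEOUT_MS keys
// appear in the diff; the BUCKET and PREFIX key names are assumptions.
interface PluginsServerConfig {
    SESSION_RECORDING_V2_S3_REGION: string
    SESSION_RECORDING_V2_S3_BUCKET: string
    SESSION_RECORDING_V2_S3_PREFIX: string
    SESSION_RECORDING_V2_S3_TIMEOUT_MS: number
}

// The class declares exactly which config values it depends on.
type S3StorageConfig = Pick<
    PluginsServerConfig,
    'SESSION_RECORDING_V2_S3_BUCKET' | 'SESSION_RECORDING_V2_S3_PREFIX' | 'SESSION_RECORDING_V2_S3_TIMEOUT_MS'
>

// Hypothetical storage class: takes the config slice instead of separate
// positional options, so the default timeout lives only in the config.
class ExampleFileStorage {
    constructor(private readonly config: S3StorageConfig) {}

    describe(): string {
        const { SESSION_RECORDING_V2_S3_BUCKET: bucket, SESSION_RECORDING_V2_S3_PREFIX: prefix } = this.config
        return `${bucket}/${prefix} (timeout ${this.config.SESSION_RECORDING_V2_S3_TIMEOUT_MS}ms)`
    }
}
```

A caller can pass the full config object directly, since it structurally satisfies the picked type.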
private readonly s3: S3Client,
private readonly bucket: string,
private readonly prefix: string,
private readonly timeout: number = 5000
Rather leave the default to the config (otherwise it makes it look like there are two places you can set the default, which is misleading).
Problem
Due to lack of timeouts, S3 uploads can hang indefinitely. We also don't check if the S3 bucket is accessible when starting the server.
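The startup check described here could be sketched roughly as follows. This is not the PR's implementation: `BucketProbe`, `checkHealth` as a free function, and `startConsumer` are hypothetical names, and the probe is injected so the sketch stays self-contained. With the AWS SDK v3, such a probe would typically send a `HeadBucketCommand`.

```typescript
// Hypothetical probe: resolves if the bucket answers, rejects otherwise.
type BucketProbe = () => Promise<void>

// Returns true when the probe succeeds and false when it fails.
async function checkHealth(probe: BucketProbe): Promise<boolean> {
    try {
        await probe()
        return true
    } catch {
        return false
    }
}

// Refuses to start the consumer when storage is not reachable.
async function startConsumer(probe: BucketProbe, start: () => void): Promise<void> {
    if (!(await checkHealth(probe))) {
        throw new Error('Object storage is not accessible')
    }
    start()
}
```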
Changes
Does this work well for both Cloud and self-hosted?
Yes
How did you test this code?
Added unit tests, tested locally.