Skip to content

Implement file processor#690

Open
henrikschristensen wants to merge 4 commits intowarpstreamlabs:mainfrom
henrikschristensen:implement-file-processor
Open

Implement file processor#690
henrikschristensen wants to merge 4 commits intowarpstreamlabs:mainfrom
henrikschristensen:implement-file-processor

Conversation

@henrikschristensen
Copy link
Copy Markdown

resolves #649

Copy link
Copy Markdown
Collaborator

@jem-davies jem-davies left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see that you have altered it to draft after I started to take a look...

But as well as the comments - what about thinking about the shutdown procedure - i.e. reading from a big file and Bento is shutdown - will that happen gracefully -

What about the atomicity of the operations too?

@henrikschristensen
Copy link
Copy Markdown
Author

@jem-davies should the read method use a scanner like the file input? I seems that scanner impl handles the prob with large files. Reading in chunks etc.

@jem-davies
Copy link
Copy Markdown
Collaborator

@jem-davies should the read method use a scanner like the file input? I seems that scanner impl handles the prob with large files. Reading in chunks etc.

Yeah that's what I was thinking 👍

@henrikschristensen henrikschristensen force-pushed the implement-file-processor branch 2 times, most recently from 2627bd5 to 53f70d5 Compare February 23, 2026 16:28
@henrikschristensen
Copy link
Copy Markdown
Author

For gracefull shutdown during large file reads i changed the read method to use a scanner.
For atomicity of write and move i implemented steps like using temporary intermediate files to make sure we dont alter data if we fail during the operation.

@henrikschristensen henrikschristensen marked this pull request as ready for review February 28, 2026 11:01
@jem-davies jem-davies self-assigned this Mar 23, 2026
Comment on lines +62 to +63
Description("The file operation to perform.").
Examples("read", "write", "delete", "move", "rename", "stat"),
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Description("The file operation to perform.").
Examples("read", "write", "delete", "move", "rename", "stat"),
Description("The file operation to perform."),

nit: remove these examples - the options are already displayed on the docs page.

Description("The file operation to perform.").
Examples("read", "write", "delete", "move", "rename", "stat"),
service.NewInterpolatedStringField(fileProcessorFieldPath).
Description("The source file path. Supports interpolation for dynamic paths.").
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Description("The source file path. Supports interpolation for dynamic paths.").
Description("The source file path.").

It is already mentioned automatically for NewInterpolatedStringFields.

"/tmp/${! json(\"document.id\") }.txt",
).LintRule(`if this == "" { [ "'path' must be set to a non-empty string" ] }`),
service.NewInterpolatedStringField(fileProcessorFieldDest).
Description("The destination path for 'move' and 'rename' operations. Supports interpolation.").
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Description("The destination path for 'move' and 'rename' operations. Supports interpolation.").
Description("The destination path for 'move' and 'rename' operations.").

fileProcessorFieldDest = "dest"
fileProcessorFieldScanner = "scanner"
)

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Add const for the operations options - and then reference them later

Comment on lines +19 to +20
fileProcessorFieldPath = "path"
fileProcessorFieldDest = "dest"
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
fileProcessorFieldPath = "path"
fileProcessorFieldDest = "dest"
fileProcessorFieldPath = "source_path"
fileProcessorFieldDest = "destination_path"

nit

Comment on lines +233 to +235
if err := ackFn(ctx, nil); err != nil {
p.log.Warnf("Failed to acknowledge scanner batch: %v", err)
}
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if err := ackFn(ctx, nil); err != nil {
p.log.Warnf("Failed to acknowledge scanner batch: %v", err)
}

Replace with the defer file.Close()

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will all scanners move on to NextBatch if we fail to ack the one we are processing?

}

func (p *fileProcessor) processWrite(_ context.Context, msg *service.Message) (service.MessageBatch, error) {
path, err := p.conf.Path.TryString(msg)
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just noting again regarding path / dest or source_path / destination_path - here we are writing to path and not dest - which is fine. But worth documenting this in the operations section of the docs imo.

}

// Use atomic write pattern: write to temp file, then rename
tempFile := path + ".tmp"
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would add some sort of random prefix to ".tmp" - so to make sure we don't get a collision.


// atomicCopyAndDelete performs an atomic copy from src to dest and then deletes src
// This ensures that either the operation completes fully or leaves the source intact
func (p *fileProcessor) atomicCopyAndDelete(_ context.Context, srcPath, destPath string, msg *service.Message) (service.MessageBatch, error) {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have context in the func signature but we aren't going to use it, and it's an unexported func so unclear why.

}
defer srcFile.Close()

content, err := io.ReadAll(srcFile)
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is potentially going to lead to OOM issues - thinking that this could be re-worked using a streaming approach to writing to the tempFile - and then we could also observe context cancellation.

@jem-davies jem-davies removed their assignment Mar 23, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Add watcher capability for Input / file component

2 participants