Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enhancement(fluent source): Add support for forwarding over unix socket #22212

Open
wants to merge 3 commits into
base: master
Choose a base branch
from

Conversation

tustvold
Copy link
Contributor

@tustvold tustvold commented Jan 15, 2025

Summary

Change Type

  • Bug fix
  • New feature
  • Non-functional (chore, refactoring, docs)
  • Performance

Is this a breaking change?

  • Yes
  • No

How did you test this PR?

I couldn't work out how to get the integration tests working, but I tested by running fluent-bit with the following config

[SERVICE]
    Flush      5
    Daemon     off
    Log_Level  info

[INPUT]
    Name       cpu
    Tag        cpu_usage

[OUTPUT]
    Name          forward
    Match         *
    unix_path     /tmp/map/fluent.sock

And then running vector with

sources:
  fluent:
    type: fluent
    path: /tmp/map/fluent.sock

sinks:
  out:
    inputs:
      - fluent
    type: console
    encoding:
      codec: json

And seeing the metrics being transferred.

I think it should be a relatively straightforward lift for someone with familiarity with the existing integration testing harness to hook this up, but I got all sorts of errors.

Does this PR include user facing changes?

  • Yes. Please add a changelog fragment based on our guidelines.
  • No. A maintainer will apply the "no-changelog" label to this PR.

Checklist

  • Please read our Vector contributor resources.
    • make check-all is a good command to run locally. This check is
      defined here. Some of these
      checks might not be relevant to your PR. For Rust changes, at the very least you should run:
      • cargo fmt --all
      • cargo clippy --workspace --all-targets -- -D warnings
      • cargo nextest run --workspace (alternatively, you can run cargo test --all)
  • If this PR introduces changes Vector dependencies (modifies Cargo.lock), please
    run dd-rust-license-tool write to regenerate the license inventory and commit the changes (if any). More details here.

References

@tustvold tustvold requested a review from a team as a code owner January 15, 2025 13:28
@bits-bot
Copy link

bits-bot commented Jan 15, 2025

CLA assistant check
All committers have signed the CLA.

@github-actions github-actions bot added the domain: sources Anything related to the Vector's sources label Jan 15, 2025
Copy link
Contributor Author

@tustvold tustvold left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ultimately this change is largely mechanical and is just gluing together the existing decoder with the existing unix stream machinery, but there is definitely the possibility I have missed something subtle.

src/sources/fluent/mod.rs Show resolved Hide resolved
/// Listening mode for the `fluent` source.
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(untagged, rename_all = "snake_case")]
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unlike the socket data source I used untagged, to avoid this being a breaking change requiring introducing a "mode" tag. As the fields path vs address are unlikely to ever overlap, I think this is probably fine

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any thoughts on this? cc @jszwedko

Copy link
Member

@jszwedko jszwedko Jan 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd suggest we maintain compatibility while also moving towards the desired configuration schema. That is, we add mode but default it to tcp so that if users only specify address it continues to work. If users want to use a unix socket, they should specify mode = ... and then also path. I'd suggest we also call this mode unix_stream to match recent changes to the socket sink.

Ideally we'd go further and start to evolve the configuration into what is described in the configuration spec. This would mean configuration would look like:

mode:
  type: tcp
  tcp:
    address: "localhost:1234"

But I can see an argument for not going that far just yet since it would make it inconsistent with the existing configuration for the socket source/sink. So I'd be happy to just see mode added.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks Jesse, I am happy with both recommendations i.e. I am OK with diverging from socket as long as an example is provided.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps you could advise on how to achieve this, I'm not aware of a way to add a serde tag attribute with a default - https://serde.rs/enum-representations.html#internally-tagged

Copy link
Member

@pront pront Jan 28, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This might be the easiest way to do this way: https://github.com/vectordotdev/vector/blob/master/src/sources/socket/mod.rs#L20-L48

Which will look like:

mode: tcp
address: localhost:123

Jesse shared this with me: serde-rs/serde#2231. I guess the ideal way will need some custom deserialization.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This might be the easiest way to do this way: https://github.com/vectordotdev/vector/blob/master/src/sources/socket/mod.rs#L20-L48

That would force mode to be specified, as linked in the serde issue, and would therefore be a breaking change.

I could manually implement Serialize but this would likely be a fairly chunky piece of code...

Just let me know how you would like to proceed, I'm not sure atm what is being asked for

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think ideally we'd make this change in a non-breaking fashion while implementing the config UX we want. That is, allow for:

address: localhost:1234

to continue to work and default to tcp while we add the mode option so that:

mode: tcp
address: localhost:1234
mode: unix_streams
path: /dev/socket/foo

Also works.

To your point, I'm not seeing a way to default mode using an adjacently tagged enum in serde so I think the options are:

  1. Write a custom deserializer for the enum (I'm not 100% sure what this would look like or if it is actually possible)
  2. Don't use an enum to model the config, but instead have separate mode,address, and path options. When initializing the component, check mode, defaulting to tcp, and then look for the appropriate secondary field (address or path), raising an error if it isn't set
  3. Make this a breaking change

I think path (2) would be best since it maintains compatibility and is only slightly hacky to implement. What do you think?

}

fn can_acknowledge(&self) -> bool {
true
matches!(self.mode, FluentMode::Tcp(_))
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't really understand what this does, I just matched the socket source

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please share a link to the code you are referring to?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This basically determines if this source supports acknowledgments. This is currently true. And it can stay as true if the new mode also supports acks.

https://vector.dev/docs/about/under-the-hood/architecture/end-to-end-acknowledgements/

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think the new mode does, it behaves similarly to the socket source which also does not

listen_path: PathBuf,
socket_file_mode: Option<u32>,
decoder: Decoder,
decoder: D,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change is necessary to allow using the FluentDecoder. As the fluent protocol doesn't have an explicit framing mechanism that I could ascertain, there wasn't a good way to make this work with the codecs::Decoder setup which separates framing and decoding.

This, and better discoverability, is also the reason I opted not to implement this as a codec on the socket source, as suggested on the ticket

Cargo.lock Outdated Show resolved Hide resolved
@pront pront self-assigned this Jan 23, 2025
@pront

This comment was marked as outdated.

Copy link
Member

@pront pront left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for this PR @tustvold! Left a couple of review comments.

}

fn can_acknowledge(&self) -> bool {
true
matches!(self.mode, FluentMode::Tcp(_))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please share a link to the code you are referring to?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
domain: sources Anything related to the Vector's sources
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Source: fluent, allow sending events to unix socket
4 participants