feat(sink: amqp): add priority property #22243

Open
wants to merge 4 commits into
base: master

@aramperes aramperes commented Jan 20, 2025

Summary

Provides the option to set the priority of messages sent by the amqp sink. See the RabbitMQ documentation.

Priority is a useful way to order messages when they are sent to the same exchange/queue through different sinks.

The property is a numeric value (u8) that also allows templating. The template must resolve to an integer between 0 and 255; otherwise the priority is not set on the message (RabbitMQ then treats it as priority 0). An invalid template logs an error and drops the event.
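
A minimal sketch of the three outcomes described above, assuming the rendered template text is parsed as a u8. `PriorityOutcome` and `resolve_priority` are hypothetical names used for illustration, not identifiers from this PR, and template rendering is modelled as an `Option` (`None` meaning that rendering failed).

```rust
/// Outcome of resolving `properties.priority` for one event (hypothetical type).
#[derive(Debug, PartialEq)]
enum PriorityOutcome {
    /// The template rendered and parsed as a u8: set the AMQP priority.
    Set(u8),
    /// The template rendered, but not to an integer in 0..=255:
    /// publish the message without a priority.
    Unset,
    /// The template failed to render (for example, a missing field):
    /// drop the event.
    DropEvent,
}

/// `rendered` is `None` when template rendering failed, otherwise the rendered text.
fn resolve_priority(rendered: Option<&str>) -> PriorityOutcome {
    match rendered {
        None => PriorityOutcome::DropEvent,
        Some(text) => match text.parse::<u8>() {
            Ok(priority) => PriorityOutcome::Set(priority),
            Err(_) => PriorityOutcome::Unset,
        },
    }
}
```

Under these assumptions, `resolve_priority(Some("7"))` sets priority 7, `resolve_priority(Some("256"))` leaves the priority unset, and `resolve_priority(None)` drops the event.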

Change Type

  • Bug fix
  • New feature
  • Non-functional (chore, refactoring, docs)
  • Performance

Is this a breaking change?

  • Yes
  • No

How did you test this PR?

  1. Declare a RabbitMQ queue with the x-max-priority argument set (values up to 5 recommended).
  2. Declare an exchange, and bind the queue.
  3. Configure an amqp sink in Vector, publishing to the exchange and with properties.priority configured. It can be a template: ("{{ .priority }}"), or a constant: 1.
sources:
  in:
    type: stdin

transforms:
  prioritize:
    type: remap
    inputs:
      - in
    source: |
      .priority = random_int(1, 10)

sinks:
  sink_amqp:
    type: amqp
    inputs:
      - prioritize
    connection_string: amqp://user:password@127.0.0.1:5672/%2f?timeout=10
    exchange: example
    encoding:
      codec: json
    properties:
      priority: "{{ .priority }}"

Does this PR include user facing changes?

  • Yes. Please add a changelog fragment based on our guidelines.
  • No. A maintainer will apply the "no-changelog" label to this PR.

Checklist

  • Please read our Vector contributor resources.
    • make check-all is a good command to run locally. This check is defined here. Some of these checks might not be relevant to your PR. For Rust changes, at the very least you should run:
      • cargo fmt --all
      • cargo clippy --workspace --all-targets -- -D warnings
      • cargo nextest run --workspace (alternatively, you can run cargo test --all)
  • If this PR introduces changes to Vector dependencies (modifies Cargo.lock), please run dd-rust-license-tool write to regenerate the license inventory and commit the changes (if any). More details here.

References

@aramperes aramperes requested review from a team as code owners January 20, 2025 00:48
bits-bot commented Jan 20, 2025

CLA assistant check
All committers have signed the CLA.

@github-actions github-actions bot added domain: sinks Anything related to the Vector's sinks domain: external docs Anything related to Vector's external, public documentation labels Jan 20, 2025
@pront pront enabled auto-merge January 21, 2025 19:57
@jszwedko jszwedko changed the title feat(amqp sink): add priority property feat(sink: amqp): add priority property Jan 21, 2025
@pront pront added this pull request to the merge queue Jan 21, 2025
@aramperes
Author

Let me see if I can make Templating work here... It would prevent a future breaking change here

@pront pront removed this pull request from the merge queue due to a manual request Jan 21, 2025
@pront
Member

pront commented Jan 21, 2025

> Let me see if I can make Templating work here... It would prevent a future breaking change here

Sounds good. (Although, I don't think the change would be breaking since the config would accept both constants and templated values.)

@jszwedko
Member

> Let me see if I can make Templating work here... It would prevent a future breaking change here
>
> Sounds good. (Although, I don't think the change would be breaking since the config would accept both constants and templated values.)

Yeah, exactly. Templates can just be raw literals like foo so I think this could be added after and wouldn't be breaking. It would be useful to be able to use templates for non-string literals for sure.

@aramperes
Author

Fair enough; my thinking currently is to make it default to 0 (same as no priority) if the templating fails to produce a u8. That would be kinda consistent with RabbitMQ behaviour being permissive:

> Messages without a priority property are treated as if their priority were 0. Messages with a priority which is higher than the queue's maximum are treated as if they were published with the maximum priority.

Would that be better than a runtime error, in the event that the template renders an invalid type?

I'm working on an integration test (with templating) right now.
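
The permissive RabbitMQ rule quoted above can be written as a one-line function. This is only a sketch of the broker-side behaviour as the quote describes it; `effective_priority` and `queue_max` are hypothetical names, with `queue_max` standing in for the queue's x-max-priority argument.

```rust
/// Priority a queue would apply to a message, following the quoted rule:
/// a missing priority counts as 0, and anything above the queue's maximum
/// is treated as the maximum.
fn effective_priority(published: Option<u8>, queue_max: u8) -> u8 {
    published.unwrap_or(0).min(queue_max)
}
```

For a queue declared with a maximum of 5, a message without a priority is handled as 0 and a message published with priority 9 is handled as 5.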

@jszwedko
Member

> Messages without a priority property are treated as if their priority were 0. Messages with a priority which is higher than the queue's maximum are treated as if they were published with the maximum priority.
>
> Would that be better than a runtime error, in the event that the template renders an invalid type?

Good question 🤔 Typically for "template errors" we consider them fatal and drop the event, but I can see the argument here that there is a safe default. I'd be 👍 to falling back to that default and just emitting a warning if the template fails to render correctly (rather than an error).

@aramperes aramperes marked this pull request as draft January 22, 2025 00:24
@aramperes
Author

aramperes commented Jan 22, 2025

I updated the configuration to allow templating of properties.priority. Example:

sources:
  in:
    type: stdin

transforms:
  prioritize:
    type: remap
    inputs:
      - in
    source: |
      .priority = random_int(1, 10)

sinks:
  sink_amqp:
    type: amqp
    inputs:
      - prioritize
    connection_string: amqp://user:password@127.0.0.1:5672/%2f?timeout=10
    exchange: example
    encoding:
      codec: json
    properties:
      priority: "{{ .priority }}"

Because it is treated as a string, it must be "quoted" in YAML configurations, which may be counterintuitive. The error thrown in the case priority: 1 is:

Configuration error. error=invalid type: integer `1`, expected a string
in `sinks.sink_amqp`

I also added integration tests for the various cases. I did not cover the case where an invalid template drops the event; if you could point me to an example on how to catch dropped events through instrumentation/counters I could add a case for that.

@aramperes aramperes marked this pull request as ready for review January 22, 2025 02:09
@aramperes aramperes requested review from kayayarai and pront January 22, 2025 02:09
@pront
Member

pront commented Jan 22, 2025

First of all, thanks for extending this and including the tests 👍

> Because it is treated as a string, it must be "quoted" in YAML configurations, which may be counterintuitive. The error thrown in the case priority: 1 is:
>
> Configuration error. error=invalid type: integer `1`, expected a string
> in `sinks.sink_amqp`

Hmm, this is not great UX, especially since the RabbitMQ docs clearly ask for a number. Still, if we extend templating to accept numbers as well, it will improve the UX automatically. Maybe worth adding a note to the docs so users aren't surprised. cc @jszwedko

@jszwedko
Member

> First of all, thanks for extending this and including the tests 👍
>
> Because it is treated as a string, it must be "quoted" in YAML configurations, which may be counterintuitive. The error thrown in the case priority: 1 is:
>
> Configuration error. error=invalid type: integer `1`, expected a string
> in `sinks.sink_amqp`
>
> Hmm, this is not great UX, especially since the RabbitMQ docs clearly ask for a number. Still, if we extend templating to accept numbers as well, it will improve the UX automatically. Maybe worth adding a note to the docs so users aren't surprised. cc @jszwedko

Agreed, the UX here isn't great for literals. Could we change the way Template deserializes to allow numeric literals in addition to string literals? That would need to be a change to the Template struct (https://github.com/vectordotdev/vector/blob/21776bbd15113e34841dc6269358fc69b9166546/src/template.rs). You can already see there how it handles strings.

Again, happy to see the original PR go in without template support if you don't want to take that on right now 🙂

@aramperes
Author

Agreed, I will give it a shot!

@aramperes
Author

aramperes commented Jan 25, 2025

I pushed a solution for the configuration of numeric values as optional templates:

Template now deserializes from a new enum TemplateSource, which supports numeric values (i64, u64, f64) and String. The TemplateSource is then treated as a String and the template is parsed/rendered as before.
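
As a rough illustration of that idea, the sketch below models a source value that is either numeric or a string and collapses every variant to the text a template parser would consume. It uses only the standard library and leaves out deserialization entirely; the variant names and the `template_text` helper are assumptions made for this example, not the definitions in the PR.

```rust
use std::fmt;

/// Hypothetical stand-in for the `TemplateSource` enum described above.
#[derive(Debug, Clone, PartialEq)]
enum TemplateSource {
    Signed(i64),
    Unsigned(u64),
    Float(f64),
    String(String),
}

impl fmt::Display for TemplateSource {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            TemplateSource::Signed(n) => write!(f, "{}", n),
            TemplateSource::Unsigned(n) => write!(f, "{}", n),
            TemplateSource::Float(n) => write!(f, "{}", n),
            TemplateSource::String(s) => f.write_str(s),
        }
    }
}

/// Every variant is reduced to its string form before template parsing.
fn template_text(source: &TemplateSource) -> String {
    source.to_string()
}
```

With this shape, a config value of `1` and a config value of `"1"` end up as the same template text, which is why the numeric literal no longer needs quoting.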

There could be a performance optimization done for numeric values to skip the Number -> TemplateSource -> String -> Template -> String -> u8 cycle when evaluating the AMQP template, but that would require some more changes to the Template struct.

For serialization, it is simply converting to a TemplateSource::String regardless of the original type. I'm not clear if that causes any UX issues; otherwise Template could be updated to maintain TemplateSource information. That would be a bit more work to get done.

Tested with both configs:

sinks:
  sink_amqp:
    type: amqp
    inputs:
      - prioritize
    connection_string: amqp://user:password@127.0.0.1:5672/%2f?timeout=10
    exchange: example
    encoding:
      codec: json
    properties:
      priority: 1

sinks:
  sink_amqp:
    type: amqp
    inputs:
      - prioritize
    connection_string: amqp://user:password@127.0.0.1:5672/%2f?timeout=10
    exchange: example
    encoding:
      codec: json
    properties:
      priority: "{{ .priority }}"

Also, here is the warning message when the template evaluates to a non-numeric or negative value:

WARN sink{component_kind="sink" component_id=sink_amqp component_type=amqp}: vector::sinks::amqp::config: Failed to convert to numeric value for "properties.priority" error=invalid digit found in string error_type="conversion_failed" stage="processing" internal_log_rate_limit=true

And when it evaluates to an out-of-bounds value above 255:

WARN sink{component_kind="sink" component_id=sink_amqp component_type=amqp}: vector::sinks::amqp::config: Failed to convert to numeric value for "properties.priority" error=number too large to fit in target type error_type="conversion_failed" stage="processing" internal_log_rate_limit=true

On an invalid template, such as a missing field (dropped):

ERROR sink{component_kind="sink" component_id=sink_amqp component_type=amqp}: vector::internal_events::template: Failed to render template for "properties.priority". error=Missing fields on event: [".unknown"] error_type="template_failed" stage="processing" internal_log_rate_limit=true
ERROR sink{component_kind="sink" component_id=sink_amqp component_type=amqp}: vector_common::internal_event::component_events_dropped: Events dropped intentional=false count=1 reason="Failed to render template." internal_log_rate_limit=true
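
The error text in the two conversion warnings matches what Rust's standard integer parsing reports for a u8. The sketch below assumes the conversion behaves like a plain `str::parse::<u8>()`, which the PR text does not confirm; `to_priority` is a hypothetical helper name.

```rust
use std::num::ParseIntError;

/// Converts rendered template text to a u8 priority, keeping the parse error
/// so that it can be attached to a warning.
fn to_priority(rendered: &str) -> Result<u8, ParseIntError> {
    rendered.parse::<u8>()
}
```

Under that assumption, `to_priority("high")` and `to_priority("-1")` both fail with "invalid digit found in string", and `to_priority("256")` fails with "number too large to fit in target type".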
