From efd859562c0cb34b0ae0100d705a35372228702d Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Wed, 23 Oct 2024 14:34:00 -0300 Subject: [PATCH] feat(kafka-like producers): add queuing bytes metric telemetry support https://github.com/kafka4beam/wolff/pull/82 --- apps/emqx_bridge_azure_event_hub/rebar.config | 2 +- apps/emqx_bridge_confluent/rebar.config | 2 +- apps/emqx_bridge_kafka/rebar.config | 2 +- .../src/emqx_bridge_kafka_impl_producer.erl | 8 ++++++++ .../test/emqx_bridge_v2_kafka_producer_SUITE.erl | 2 ++ changes/ce/feat-14065.en.md | 2 ++ mix.exs | 2 +- 7 files changed, 16 insertions(+), 4 deletions(-) create mode 100644 changes/ce/feat-14065.en.md diff --git a/apps/emqx_bridge_azure_event_hub/rebar.config b/apps/emqx_bridge_azure_event_hub/rebar.config index 618d3a8c35..55b13fbc68 100644 --- a/apps/emqx_bridge_azure_event_hub/rebar.config +++ b/apps/emqx_bridge_azure_event_hub/rebar.config @@ -2,7 +2,7 @@ {erl_opts, [debug_info]}. {deps, [ - {wolff, "4.0.2"}, + {wolff, "4.0.3"}, {kafka_protocol, "4.1.8"}, {brod_gssapi, "0.1.3"}, {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.18.0"}}}, diff --git a/apps/emqx_bridge_confluent/rebar.config b/apps/emqx_bridge_confluent/rebar.config index 44f9875eac..4954c0fcda 100644 --- a/apps/emqx_bridge_confluent/rebar.config +++ b/apps/emqx_bridge_confluent/rebar.config @@ -2,7 +2,7 @@ {erl_opts, [debug_info]}. {deps, [ - {wolff, "4.0.2"}, + {wolff, "4.0.3"}, {kafka_protocol, "4.1.8"}, {brod_gssapi, "0.1.3"}, {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.18.0"}}}, diff --git a/apps/emqx_bridge_kafka/rebar.config b/apps/emqx_bridge_kafka/rebar.config index d5a5c25f8d..f15788f369 100644 --- a/apps/emqx_bridge_kafka/rebar.config +++ b/apps/emqx_bridge_kafka/rebar.config @@ -2,7 +2,7 @@ {erl_opts, [debug_info]}. {deps, [ - {wolff, "4.0.2"}, + {wolff, "4.0.3"}, {kafka_protocol, "4.1.8"}, {brod_gssapi, "0.1.3"}, {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.18.0"}}}, diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl index dce32d7872..021541becc 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl @@ -928,6 +928,13 @@ handle_telemetry_event( #{bridge_id := ID} ) when is_integer(Val) -> emqx_resource_metrics:queuing_set(ID, PartitionID, Val); +handle_telemetry_event( + [wolff, queuing_bytes], + #{gauge_set := Val}, + #{bridge_id := ID, partition_id := PartitionID}, + #{bridge_id := ID} +) when is_integer(Val) -> + emqx_resource_metrics:queuing_bytes_set(ID, PartitionID, Val); handle_telemetry_event( [wolff, retried], #{counter_inc := Val}, @@ -965,6 +972,7 @@ maybe_install_wolff_telemetry_handlers(TelemetryId) -> [ [wolff, dropped_queue_full], [wolff, queuing], + [wolff, queuing_bytes], [wolff, retried], [wolff, inflight] ], diff --git a/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl index 350dd66929..3317d9c752 100644 --- a/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl +++ b/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl @@ -714,6 +714,8 @@ t_create_connector_while_connection_is_down(Config) -> ), ?assertEqual(PreviousFailed, emqx_resource_metrics:failed_get(ActionId)), ?assertEqual(1, emqx_resource_metrics:queuing_get(ActionId)), + QueuingBytes = emqx_resource_metrics:queuing_bytes_get(ActionId), + ?assert(QueuingBytes > 0, #{bytes => QueuingBytes}), ?assertEqual(0, emqx_resource_metrics:inflight_get(ActionId)), ?assertEqual(0, emqx_resource_metrics:dropped_get(ActionId)), ?assertEqual(0, emqx_resource_metrics:success_get(ActionId)), diff --git a/changes/ce/feat-14065.en.md b/changes/ce/feat-14065.en.md new file mode 100644 index 0000000000..df1233745a --- /dev/null +++ b/changes/ce/feat-14065.en.md @@ -0,0 +1,2 @@ +Added the new `queuing_bytes` to available data integration metrics. This metric indicates how much RAM and/or disk resources the buffering of a given action is consuming. +Currently, only Pulsar Producer action lacks support for this metric. diff --git a/mix.exs b/mix.exs index 14c91c8afb..218553922d 100644 --- a/mix.exs +++ b/mix.exs @@ -274,7 +274,7 @@ defmodule EMQXUmbrella.MixProject do def common_dep(:influxdb), do: {:influxdb, github: "emqx/influxdb-client-erl", tag: "1.1.13", override: true} - def common_dep(:wolff), do: {:wolff, "4.0.2"} + def common_dep(:wolff), do: {:wolff, "4.0.3"} def common_dep(:brod_gssapi), do: {:brod_gssapi, "0.1.3"} def common_dep(:kafka_protocol),