Skip to content

Commit

Permalink
feat(kafka-like producers): add queuing bytes metric telemetry support
Browse files Browse the repository at this point in the history
  • Loading branch information
thalesmg committed Oct 23, 2024
1 parent 88a8fb4 commit efd8595
Show file tree
Hide file tree
Showing 7 changed files with 16 additions and 4 deletions.
2 changes: 1 addition & 1 deletion apps/emqx_bridge_azure_event_hub/rebar.config
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

{erl_opts, [debug_info]}.
{deps, [
{wolff, "4.0.2"},
{wolff, "4.0.3"},
{kafka_protocol, "4.1.8"},
{brod_gssapi, "0.1.3"},
{brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.18.0"}}},
Expand Down
2 changes: 1 addition & 1 deletion apps/emqx_bridge_confluent/rebar.config
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

{erl_opts, [debug_info]}.
{deps, [
{wolff, "4.0.2"},
{wolff, "4.0.3"},
{kafka_protocol, "4.1.8"},
{brod_gssapi, "0.1.3"},
{brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.18.0"}}},
Expand Down
2 changes: 1 addition & 1 deletion apps/emqx_bridge_kafka/rebar.config
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

{erl_opts, [debug_info]}.
{deps, [
{wolff, "4.0.2"},
{wolff, "4.0.3"},
{kafka_protocol, "4.1.8"},
{brod_gssapi, "0.1.3"},
{brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.18.0"}}},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -928,6 +928,13 @@ handle_telemetry_event(
#{bridge_id := ID}
) when is_integer(Val) ->
emqx_resource_metrics:queuing_set(ID, PartitionID, Val);
handle_telemetry_event(
[wolff, queuing_bytes],
#{gauge_set := Val},
#{bridge_id := ID, partition_id := PartitionID},
#{bridge_id := ID}
) when is_integer(Val) ->
emqx_resource_metrics:queuing_bytes_set(ID, PartitionID, Val);
handle_telemetry_event(
[wolff, retried],
#{counter_inc := Val},
Expand Down Expand Up @@ -965,6 +972,7 @@ maybe_install_wolff_telemetry_handlers(TelemetryId) ->
[
[wolff, dropped_queue_full],
[wolff, queuing],
[wolff, queuing_bytes],
[wolff, retried],
[wolff, inflight]
],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -714,6 +714,8 @@ t_create_connector_while_connection_is_down(Config) ->
),
?assertEqual(PreviousFailed, emqx_resource_metrics:failed_get(ActionId)),
?assertEqual(1, emqx_resource_metrics:queuing_get(ActionId)),
QueuingBytes = emqx_resource_metrics:queuing_bytes_get(ActionId),
?assert(QueuingBytes > 0, #{bytes => QueuingBytes}),
?assertEqual(0, emqx_resource_metrics:inflight_get(ActionId)),
?assertEqual(0, emqx_resource_metrics:dropped_get(ActionId)),
?assertEqual(0, emqx_resource_metrics:success_get(ActionId)),
Expand Down
2 changes: 2 additions & 0 deletions changes/ce/feat-14065.en.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Added the new `queuing_bytes` to available data integration metrics. This metric indicates how much RAM and/or disk resources the buffering of a given action is consuming.
Currently, only Pulsar Producer action lacks support for this metric.
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ defmodule EMQXUmbrella.MixProject do
def common_dep(:influxdb),
do: {:influxdb, github: "emqx/influxdb-client-erl", tag: "1.1.13", override: true}

def common_dep(:wolff), do: {:wolff, "4.0.2"}
def common_dep(:wolff), do: {:wolff, "4.0.3"}
def common_dep(:brod_gssapi), do: {:brod_gssapi, "0.1.3"}

def common_dep(:kafka_protocol),
Expand Down

0 comments on commit efd8595

Please sign in to comment.