Skip to content

Commit

Permalink
Add ConnectionOpen metric to websocket_server
Browse files Browse the repository at this point in the history
  • Loading branch information
esensar committed Jan 29, 2025
1 parent 0d074c0 commit 8ef766d
Showing 1 changed file with 14 additions and 3 deletions.
17 changes: 14 additions & 3 deletions src/sinks/websocket_server/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ use vector_lib::{
use crate::{
codecs::{Encoder, Transformer},
internal_events::{
WsListenerConnectionEstablished, WsListenerConnectionFailedError,
WsListenerConnectionShutdown, WsListenerSendError,
ConnectionOpen, OpenGauge, WsListenerConnectionEstablished,
WsListenerConnectionFailedError, WsListenerConnectionShutdown, WsListenerSendError,
},
sources::util::http::HttpSourceAuth,
};
Expand Down Expand Up @@ -85,9 +85,17 @@ impl WebSocketListenerSink {
peers: Arc<Mutex<HashMap<SocketAddr, UnboundedSender<Message>>>>,
mut listener: MaybeTlsListener,
) {
let open_gauge = OpenGauge::new();

while let Ok(stream) = listener.accept().await {
tokio::spawn(
Self::handle_connection(auth.clone(), Arc::clone(&peers), stream).in_current_span(),
Self::handle_connection(
auth.clone(),
Arc::clone(&peers),
stream,
open_gauge.clone(),
)
.in_current_span(),
);
}
}
Expand All @@ -96,6 +104,7 @@ impl WebSocketListenerSink {
auth: HttpSourceAuth,
peers: Arc<Mutex<HashMap<SocketAddr, UnboundedSender<Message>>>>,
stream: MaybeTlsIncomingStream<TcpStream>,
open_gauge: OpenGauge,
) -> Result<(), ()> {
let addr = stream.peer_addr();
debug!("Incoming TCP connection from: {}", addr);
Expand Down Expand Up @@ -125,6 +134,8 @@ impl WebSocketListenerSink {
})
})?;

let _open_token = open_gauge.open(|count| emit!(ConnectionOpen { count }));

// Insert the write part of this peer to the peer map.
let (tx, rx) = unbounded();

Expand Down

0 comments on commit 8ef766d

Please sign in to comment.