From 847a4e617e86f6e72594e8868a92c5052d7d8101 Mon Sep 17 00:00:00 2001 From: yuanchao Date: Fri, 24 Jan 2025 19:52:56 +0800 Subject: [PATCH] feat: support ping --- agent/crates/public/src/l7_protocol.rs | 3 + agent/src/common/l7_protocol_info.rs | 6 +- agent/src/common/l7_protocol_log.rs | 20 +- agent/src/common/meta_packet.rs | 46 +++- agent/src/config/config.rs | 2 + agent/src/flow_generator/error.rs | 2 + agent/src/flow_generator/flow_map.rs | 57 +++-- agent/src/flow_generator/perf/mod.rs | 55 ++++- .../src/flow_generator/protocol_logs/http.rs | 1 + agent/src/flow_generator/protocol_logs/mod.rs | 2 + .../src/flow_generator/protocol_logs/ping.rs | 210 ++++++++++++++++++ agent/src/plugin/shared_obj/test.rs | 2 + agent/src/plugin/wasm/test.rs | 2 + .../ingester/flow_log/log_data/l4_flow_log.go | 2 +- .../ingester/flow_log/log_data/l7_flow_log.go | 4 +- .../ingester/flow_log/log_data/otel_import.go | 6 +- server/libs/datatype/protocol_logs.go | 4 +- .../clickhouse/tag/enum/response_status.ch | 2 +- .../clickhouse/tag/enum/response_status.en | 2 +- 19 files changed, 384 insertions(+), 44 deletions(-) create mode 100644 agent/src/flow_generator/protocol_logs/ping.rs diff --git a/agent/crates/public/src/l7_protocol.rs b/agent/crates/public/src/l7_protocol.rs index 8325d13f516..444840cf9f3 100644 --- a/agent/crates/public/src/l7_protocol.rs +++ b/agent/crates/public/src/l7_protocol.rs @@ -74,6 +74,7 @@ pub enum L7Protocol { // INFRA DNS = 120, TLS = 121, + Ping = 122, Custom = 127, @@ -91,6 +92,7 @@ impl L7Protocol { | Self::Dubbo | Self::SofaRPC | Self::SomeIp + | Self::Ping | Self::Custom => true, _ => false, } @@ -127,6 +129,7 @@ impl From for L7Protocol { "dns" => Self::DNS, "oracle" => Self::Oracle, "tls" => Self::TLS, + "ping" => Self::Ping, "some/ip" | "someip" => Self::SomeIp, _ => Self::Unknown, } diff --git a/agent/src/common/l7_protocol_info.rs b/agent/src/common/l7_protocol_info.rs index 518ee8fd586..24e46366528 100644 --- a/agent/src/common/l7_protocol_info.rs +++ b/agent/src/common/l7_protocol_info.rs @@ -27,8 +27,8 @@ use crate::{ protocol_logs::{ fastcgi::FastCGIInfo, pb_adapter::L7ProtocolSendLog, AmqpInfo, BrpcInfo, DnsInfo, DubboInfo, HttpInfo, KafkaInfo, MemcachedInfo, MongoDBInfo, MqttInfo, MysqlInfo, - NatsInfo, OpenWireInfo, PostgreInfo, PulsarInfo, RedisInfo, RocketmqInfo, SofaRpcInfo, - TarsInfo, ZmtpInfo, + NatsInfo, OpenWireInfo, PingInfo, PostgreInfo, PulsarInfo, RedisInfo, RocketmqInfo, + SofaRpcInfo, TarsInfo, ZmtpInfo, }, AppProtoHead, LogMessageType, Result, }, @@ -84,6 +84,7 @@ cfg_if::cfg_if! { PostgreInfo(PostgreInfo), OpenWireInfo(OpenWireInfo), SofaRpcInfo(SofaRpcInfo), + PingInfo(PingInfo), CustomInfo(CustomInfo), // add new protocol info below ); @@ -112,6 +113,7 @@ cfg_if::cfg_if! { SofaRpcInfo(SofaRpcInfo), TlsInfo(crate::flow_generator::protocol_logs::tls::TlsInfo), SomeIpInfo(crate::flow_generator::protocol_logs::rpc::SomeIpInfo), + PingInfo(PingInfo), CustomInfo(CustomInfo), // add new protocol info below ); diff --git a/agent/src/common/l7_protocol_log.rs b/agent/src/common/l7_protocol_log.rs index 07995d0e894..87c1e63be03 100644 --- a/agent/src/common/l7_protocol_log.rs +++ b/agent/src/common/l7_protocol_log.rs @@ -30,6 +30,7 @@ use super::flow::{L7PerfStats, PacketDirection}; use super::l7_protocol_info::L7ProtocolInfo; use super::MetaPacket; +use crate::common::meta_packet::{IcmpData, ProtocolData}; use crate::config::handler::LogParserConfig; use crate::config::OracleConfig; use crate::flow_generator::flow_map::FlowMapCounter; @@ -39,8 +40,8 @@ use crate::flow_generator::protocol_logs::plugin::get_custom_log_parser; use crate::flow_generator::protocol_logs::sql::ObfuscateCache; use crate::flow_generator::protocol_logs::{ AmqpLog, BrpcLog, DnsLog, DubboLog, HttpLog, KafkaLog, MemcachedLog, MongoDBLog, MqttLog, - MysqlLog, NatsLog, OpenWireLog, PostgresqlLog, PulsarLog, RedisLog, RocketmqLog, SofaRpcLog, - TarsLog, ZmtpLog, + MysqlLog, NatsLog, OpenWireLog, PingLog, PostgresqlLog, PulsarLog, RedisLog, RocketmqLog, + SofaRpcLog, TarsLog, ZmtpLog, }; use crate::flow_generator::{LogMessageType, Result}; @@ -185,6 +186,7 @@ cfg_if::cfg_if! { ZMTP(ZmtpLog), RocketMQ(RocketmqLog), OpenWire(OpenWireLog), + Ping(PingLog), // add protocol below } } @@ -215,6 +217,7 @@ cfg_if::cfg_if! { OpenWire(OpenWireLog), TLS(crate::flow_generator::protocol_logs::tls::TlsLog), SomeIp(crate::flow_generator::protocol_logs::rpc::SomeIpLog), + Ping(PingLog), // add protocol below } } @@ -290,6 +293,13 @@ pub trait L7ProtocolParserInterface { true } + // l4即不是udp也不是tcp,用于快速过滤协议 + // ============================== + // L4 is neither UDP nor TCP and is used to quickly filter protocols + fn parsable_on_other(&self) -> bool { + false + } + // is parse default? use for config init. fn parse_default(&self) -> bool { true @@ -373,6 +383,7 @@ pub struct ParseParam<'a> { pub port_src: u16, pub port_dst: u16, pub flow_id: u64, + pub icmp_data: Option<&'a IcmpData>, // parse info pub direction: PacketDirection, @@ -430,6 +441,11 @@ impl<'a> ParseParam<'a> { ip_dst: packet.lookup_key.dst_ip, port_src: packet.lookup_key.src_port, port_dst: packet.lookup_key.dst_port, + icmp_data: if let ProtocolData::IcmpData(icmp_data) = &packet.protocol_data { + Some(icmp_data) + } else { + None + }, flow_id: packet.flow_id, direction: packet.lookup_key.direction, diff --git a/agent/src/common/meta_packet.rs b/agent/src/common/meta_packet.rs index e1a188b7c8b..9b7a675813a 100644 --- a/agent/src/common/meta_packet.rs +++ b/agent/src/common/meta_packet.rs @@ -518,11 +518,22 @@ impl<'a> MetaPacket<'a> { } } - // 目前仅支持获取UDP或TCP的Payload - pub fn get_l4_payload(&self) -> Option<&[u8]> { - if self.lookup_key.proto != IpProtocol::TCP && self.lookup_key.proto != IpProtocol::UDP { + fn get_l3_payload(&self) -> Option<&[u8]> { + if self.tap_port.is_from(TapPort::FROM_EBPF) { return None; } + + let packet_header_size = self.header_type.min_packet_size() + self.l2_l3_opt_size as usize; + if let Some(raw) = self.raw.as_ref() { + if raw.len() > packet_header_size { + return Some(&raw[packet_header_size..]); + } + } + None + } + + // 目前仅支持获取UDP或TCP的Payload + pub fn get_l4_payload(&self) -> Option<&[u8]> { if self.tap_port.is_from(TapPort::FROM_EBPF) { return Some(&self.raw_from_ebpf[self.raw_from_ebpf_offset..]); } @@ -538,6 +549,18 @@ impl<'a> MetaPacket<'a> { None } + pub fn get_l7(&self) -> Option<&[u8]> { + if self.lookup_key.proto == IpProtocol::TCP || self.lookup_key.proto == IpProtocol::UDP { + return self.get_l4_payload(); + } + if self.lookup_key.eth_type == EthernetType::IPV4 + || self.lookup_key.eth_type == EthernetType::IPV6 + { + return self.get_l3_payload(); + } + None + } + pub fn update + Into>>( &mut self, raw_packet: P, @@ -973,9 +996,20 @@ impl<'a> MetaPacket<'a> { return self.packet_len as usize - 54; } - let packet_header_size = self.header_type.min_packet_size() - + self.l2_l3_opt_size as usize - + self.l4_opt_size as usize; + let packet_header_size = if self.lookup_key.proto == IpProtocol::UDP + && self.lookup_key.proto == IpProtocol::TCP + { + self.header_type.min_packet_size() + + self.l2_l3_opt_size as usize + + self.l4_opt_size as usize + } else if self.lookup_key.eth_type == EthernetType::IPV4 { + HeaderType::Ipv4.min_packet_size() + self.l2_l3_opt_size as usize + } else if self.lookup_key.eth_type == EthernetType::IPV6 { + HeaderType::Ipv6.min_packet_size() + self.l2_l3_opt_size as usize + } else { + return 0; + }; + if let Some(raw) = self.raw.as_ref() { if raw.len() > packet_header_size { return raw.len() - packet_header_size; diff --git a/agent/src/config/config.rs b/agent/src/config/config.rs index 86ffcbfa768..7dd3b8014fd 100644 --- a/agent/src/config/config.rs +++ b/agent/src/config/config.rs @@ -1568,6 +1568,7 @@ impl Default for Filters { ("RocketMQ".to_string(), "1-65535".to_string()), ("DNS".to_string(), "53,5353".to_string()), ("TLS".to_string(), "443,6443".to_string()), + ("PING".to_string(), "1-65535".to_string()), ("Custom".to_string(), "1-65535".to_string()), ]), tag_filters: HashMap::from([ @@ -1596,6 +1597,7 @@ impl Default for Filters { ("RocketMQ".to_string(), vec![]), ("DNS".to_string(), vec![]), ("TLS".to_string(), vec![]), + ("PING".to_string(), vec![]), ]), unconcerned_dns_nxdomain_response_suffixes: Default::default(), } diff --git a/agent/src/flow_generator/error.rs b/agent/src/flow_generator/error.rs index 7a83319c1d0..5c546caced9 100644 --- a/agent/src/flow_generator/error.rs +++ b/agent/src/flow_generator/error.rs @@ -114,6 +114,8 @@ pub enum Error { InsufficientPayloadLength, #[error("unsupported SOME/IP message type")] SomeIpUnsupportedMessageType, + #[error("ping header parse failed")] + PingHeaderParseFailed, } pub type Result = std::result::Result; diff --git a/agent/src/flow_generator/flow_map.rs b/agent/src/flow_generator/flow_map.rs index 343e72d0f30..9436e2a0d5c 100644 --- a/agent/src/flow_generator/flow_map.rs +++ b/agent/src/flow_generator/flow_map.rs @@ -971,8 +971,8 @@ impl FlowMap { { node.timeout = config.flow.flow_timeout.established_rst; } - if let Some(meta_flow_log) = node.meta_flow_log.as_mut() { - let _ = meta_flow_log.parse_l3(meta_packet); + if config.flow.collector_enabled { + self.collect_metric(config, node, meta_packet, true, true); } false } @@ -1429,7 +1429,7 @@ impl FlowMap { match meta_packet.lookup_key.proto { IpProtocol::TCP => flow_config.rrt_tcp_timeout, IpProtocol::UDP => flow_config.rrt_udp_timeout, - _ => 0, + _ => flow_config.rrt_udp_timeout, }, flow_config.l7_protocol_inference_ttl as u64, last, @@ -1673,20 +1673,37 @@ impl FlowMap { } else { (0, local_epc_id) }; + let ip_protocol = meta_packet.lookup_key.proto; for packet in meta_packet { - match log.parse( - flow_config, - log_parser_config, - packet, - is_first_packet_direction, - Self::l7_metrics_enabled(flow_config), - Self::l7_log_parse_enabled(flow_config, &packet.lookup_key), - &mut self.app_table, - local_epc, - remote_epc, - &self.l7_protocol_checker, - ) { + let ret = if ip_protocol == IpProtocol::UDP || ip_protocol == IpProtocol::TCP { + log.parse( + flow_config, + log_parser_config, + packet, + is_first_packet_direction, + Self::l7_metrics_enabled(flow_config), + Self::l7_log_parse_enabled(flow_config, &packet.lookup_key), + &mut self.app_table, + local_epc, + remote_epc, + &self.l7_protocol_checker, + ) + } else { + log.parse_l3( + flow_config, + log_parser_config, + packet, + Self::l7_metrics_enabled(flow_config), + Self::l7_log_parse_enabled(flow_config, &packet.lookup_key), + &mut self.app_table, + local_epc, + remote_epc, + &self.l7_protocol_checker, + ) + }; + + match ret { Ok(info) => { if let Some(perf_stats) = node.tagged_flow.flow.flow_perf_stats.as_mut() { perf_stats.l7_protocol = log.l7_protocol_enum.get_l7_protocol(); @@ -1839,8 +1856,14 @@ impl FlowMap { node.flow_state = FlowState::Established; // opening timeout node.timeout = config.flow.flow_timeout.opening; - if let Some(meta_flow_log) = node.meta_flow_log.as_mut() { - let _ = meta_flow_log.parse_l3(meta_packet); + if config.flow.collector_enabled { + self.collect_metric( + config, + &mut node, + meta_packet, + meta_packet.lookup_key.direction == PacketDirection::ClientToServer, + true, + ); } node } diff --git a/agent/src/flow_generator/perf/mod.rs b/agent/src/flow_generator/perf/mod.rs index 958d4a6ea60..e9ebc682716 100644 --- a/agent/src/flow_generator/perf/mod.rs +++ b/agent/src/flow_generator/perf/mod.rs @@ -124,6 +124,7 @@ pub type L7ProtocolTuple = (L7Protocol, Option); pub struct L7ProtocolChecker { tcp: Vec, udp: Vec, + other: Vec, } impl L7ProtocolChecker { @@ -133,11 +134,16 @@ impl L7ProtocolChecker { ) -> Self { let mut tcp = vec![]; let mut udp = vec![]; + let mut other = vec![]; for parser in get_all_protocol() { let protocol = parser.protocol(); if !protocol_bitmap.is_enabled(protocol) { continue; } + if parser.parsable_on_other() { + other.push((protocol, port_bitmap.get(&protocol).map(|m| m.clone()))); + continue; + } if parser.parsable_on_tcp() { tcp.push((protocol, port_bitmap.get(&protocol).map(|m| m.clone()))); } @@ -146,7 +152,7 @@ impl L7ProtocolChecker { } } - L7ProtocolChecker { tcp, udp } + L7ProtocolChecker { tcp, udp, other } } pub fn possible_protocols( @@ -158,7 +164,7 @@ impl L7ProtocolChecker { iter: match l4_protocol { L4Protocol::Tcp => self.tcp.iter(), L4Protocol::Udp => self.udp.iter(), - _ => [].iter(), + _ => self.other.iter(), }, port, } @@ -240,7 +246,7 @@ impl FlowLog { local_epc: i32, remote_epc: i32, ) -> Result { - if let Some(payload) = packet.get_l4_payload() { + if let Some(payload) = packet.get_l7() { let mut parse_param = ParseParam::new( &*packet, self.perf_cache.clone(), @@ -326,7 +332,7 @@ impl FlowLog { remote_epc: i32, checker: &L7ProtocolChecker, ) -> Result { - if let Some(payload) = packet.get_l4_payload() { + if let Some(payload) = packet.get_l7() { let pkt_size = flow_config.l7_log_packet_size as usize; let cut_payload = if pkt_size > payload.len() { @@ -466,7 +472,11 @@ impl FlowLog { ); } - if packet.l4_payload_len() < 2 { + let Some(payload) = packet.get_l7() else { + return Err(Error::L7ProtocolUnknown); + }; + + if payload.len() < 2 { return Err(Error::L7ProtocolUnknown); } @@ -584,11 +594,42 @@ impl FlowLog { Ok(L7ParseResult::None) } - pub fn parse_l3(&mut self, packet: &mut MetaPacket) -> Result<()> { + pub fn parse_l3( + &mut self, + flow_config: &FlowConfig, + log_parser_config: &LogParserConfig, + packet: &mut MetaPacket, + l7_performance_enabled: bool, + l7_log_parse_enabled: bool, + app_table: &mut AppTable, + local_epc: i32, + remote_epc: i32, + checker: &L7ProtocolChecker, + ) -> Result { if let Some(l4) = self.l4.as_mut() { l4.parse(packet, false)?; } - Ok(()) + + if packet.signal_source == SignalSource::EBPF { + return Ok(L7ParseResult::None); + } + + if l7_performance_enabled || l7_log_parse_enabled { + // 抛出错误由flowMap.FlowPerfCounter处理 + return self.l7_parse( + flow_config, + log_parser_config, + packet, + app_table, + l7_performance_enabled, + l7_log_parse_enabled, + local_epc, + remote_epc, + checker, + ); + } + + Ok(L7ParseResult::None) } pub fn copy_and_reset_l4_perf_data(&mut self, flow_reversed: bool, flow: &mut Flow) { diff --git a/agent/src/flow_generator/protocol_logs/http.rs b/agent/src/flow_generator/protocol_logs/http.rs index f7122147874..8cf63530793 100755 --- a/agent/src/flow_generator/protocol_logs/http.rs +++ b/agent/src/flow_generator/protocol_logs/http.rs @@ -1921,6 +1921,7 @@ mod tests { buf_size: 0, captured_byte: 1000, oracle_parse_conf: OracleConfig::default(), + icmp_data: None, }; //测试长度不正确 diff --git a/agent/src/flow_generator/protocol_logs/mod.rs b/agent/src/flow_generator/protocol_logs/mod.rs index e97df20a2d5..531500f944e 100644 --- a/agent/src/flow_generator/protocol_logs/mod.rs +++ b/agent/src/flow_generator/protocol_logs/mod.rs @@ -21,6 +21,7 @@ pub(crate) mod http; pub(crate) mod mq; mod parser; pub mod pb_adapter; +pub(crate) mod ping; pub(crate) mod plugin; pub(crate) mod rpc; pub(crate) mod sql; @@ -34,6 +35,7 @@ pub use mq::{ }; use num_enum::TryFromPrimitive; pub use parser::{AppProto, MetaAppProto, PseudoAppProto, SessionAggregator}; +pub use ping::{PingInfo, PingLog}; pub use rpc::{ decode_new_rpc_trace_context_with_type, BrpcInfo, BrpcLog, DubboInfo, DubboLog, SofaRpcInfo, SofaRpcLog, TarsInfo, TarsLog, SOFA_NEW_RPC_TRACE_CTX_KEY, diff --git a/agent/src/flow_generator/protocol_logs/ping.rs b/agent/src/flow_generator/protocol_logs/ping.rs new file mode 100644 index 00000000000..cc38a4f23fc --- /dev/null +++ b/agent/src/flow_generator/protocol_logs/ping.rs @@ -0,0 +1,210 @@ +/* + * Copyright (c) 2024 Yunshan Networks + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use pnet::packet::icmp::{IcmpType, IcmpTypes}; +use serde::Serialize; + +use crate::{ + common::{ + enums::IpProtocol, + flow::{L7PerfStats, L7Protocol}, + l7_protocol_info::{L7ProtocolInfo, L7ProtocolInfoInterface}, + l7_protocol_log::{L7ParseResult, L7ProtocolParserInterface, ParseParam}, + }, + flow_generator::error::{Error, Result}, + flow_generator::protocol_logs::{ + pb_adapter::{ExtendedInfo, L7ProtocolSendLog, L7Request, L7Response}, + set_captured_byte, AppProtoHead, L7ResponseStatus, LogMessageType, + }, +}; + +const PING_HEADER_SIZE: u32 = 8; + +#[derive(Serialize, Debug, Default, Clone)] +pub struct PingInfo { + proto: L7Protocol, + + msg_type: LogMessageType, + + sequence: u16, + id: u16, + status: L7ResponseStatus, + rrt: u64, + + captured_request_byte: u32, + captured_response_byte: u32, +} + +impl L7ProtocolInfoInterface for PingInfo { + fn session_id(&self) -> Option { + Some(((self.id as u32) << 16) | self.sequence as u32) + } + + fn merge_log(&mut self, other: &mut L7ProtocolInfo) -> Result<()> { + if let L7ProtocolInfo::PingInfo(other) = other { + return self.merge(other); + } + Ok(()) + } + + fn app_proto_head(&self) -> Option { + Some(AppProtoHead { + proto: self.proto, + msg_type: self.msg_type, + rrt: self.rrt, + }) + } + + fn is_tls(&self) -> bool { + false + } +} + +impl PingInfo { + pub fn merge(&mut self, other: &mut Self) -> Result<()> { + self.msg_type = LogMessageType::Session; + match other.msg_type { + LogMessageType::Response => { + self.rrt = other.rrt; + self.captured_response_byte = other.captured_response_byte; + self.status = L7ResponseStatus::Ok; + } + LogMessageType::Request => { + self.captured_request_byte = other.captured_request_byte; + self.status = L7ResponseStatus::Ok; + } + _ => {} + } + + Ok(()) + } +} + +impl From for L7ProtocolSendLog { + fn from(f: PingInfo) -> Self { + L7ProtocolSendLog { + req_len: if f.captured_request_byte >= PING_HEADER_SIZE { + Some(f.captured_request_byte - PING_HEADER_SIZE) + } else { + None + }, + resp_len: if f.captured_response_byte >= PING_HEADER_SIZE { + Some(f.captured_response_byte - PING_HEADER_SIZE) + } else { + None + }, + captured_request_byte: f.captured_request_byte, + captured_response_byte: f.captured_response_byte, + req: L7Request { + resource: f.id.to_string(), + ..Default::default() + }, + resp: L7Response { + status: f.status, + ..Default::default() + }, + ext_info: Some(ExtendedInfo { + request_id: Some(f.sequence as u32), + ..Default::default() + }), + ..Default::default() + } + } +} + +#[derive(Default)] +pub struct PingLog { + proto: L7Protocol, + perf_stats: Option, +} + +impl L7ProtocolParserInterface for PingLog { + fn check_payload(&mut self, _: &[u8], param: &ParseParam) -> bool { + if param.l4_protocol != IpProtocol::ICMPV4 && param.l4_protocol != IpProtocol::ICMPV6 { + return false; + } + + let Some(icmp_data) = param.icmp_data else { + return false; + }; + + icmp_data.icmp_type == IcmpTypes::EchoRequest.0 + } + + fn parse_payload(&mut self, _: &[u8], param: &ParseParam) -> Result { + let Some(icmp_data) = param.icmp_data else { + return Err(Error::PingHeaderParseFailed); + }; + + if icmp_data.icmp_type != IcmpTypes::EchoRequest.0 + && icmp_data.icmp_type != IcmpTypes::EchoReply.0 + { + return Err(Error::PingHeaderParseFailed); + } + + match IcmpType::new(icmp_data.icmp_type) { + IcmpTypes::EchoRequest => { + let mut info = PingInfo { + msg_type: LogMessageType::Request, + proto: L7Protocol::Ping, + sequence: icmp_data.echo_id_seq as u16, + id: (icmp_data.echo_id_seq >> 16) as u16, + status: L7ResponseStatus::NotExist, + ..Default::default() + }; + set_captured_byte!(info, param); + self.perf_stats.as_mut().map(|p| p.inc_req()); + info.cal_rrt(param).map(|rrt| { + info.rrt = rrt; + self.perf_stats.as_mut().map(|p| p.update_rrt(rrt)); + }); + + Ok(L7ParseResult::Single(L7ProtocolInfo::PingInfo(info))) + } + IcmpTypes::EchoReply => { + let mut info = PingInfo { + msg_type: LogMessageType::Response, + proto: L7Protocol::Ping, + sequence: icmp_data.echo_id_seq as u16, + id: (icmp_data.echo_id_seq >> 16) as u16, + status: L7ResponseStatus::Ok, + ..Default::default() + }; + set_captured_byte!(info, param); + self.perf_stats.as_mut().map(|p| p.inc_resp()); + info.cal_rrt(param).map(|rrt| { + info.rrt = rrt; + self.perf_stats.as_mut().map(|p| p.update_rrt(rrt)); + }); + + Ok(L7ParseResult::Single(L7ProtocolInfo::PingInfo(info))) + } + _ => Err(Error::PingHeaderParseFailed), + } + } + + fn protocol(&self) -> L7Protocol { + L7Protocol::Ping + } + + fn perf_stats(&mut self) -> Option { + self.perf_stats.take() + } + + fn parsable_on_other(&self) -> bool { + true + } +} diff --git a/agent/src/plugin/shared_obj/test.rs b/agent/src/plugin/shared_obj/test.rs index 3a7f532a3f3..2a038247582 100644 --- a/agent/src/plugin/shared_obj/test.rs +++ b/agent/src/plugin/shared_obj/test.rs @@ -85,6 +85,7 @@ fn get_req_param<'a>( buf_size: 0, captured_byte: 0, oracle_parse_conf: OracleConfig::default(), + icmp_data: None, } } @@ -122,6 +123,7 @@ fn get_resp_param<'a>( buf_size: 0, captured_byte: 0, oracle_parse_conf: OracleConfig::default(), + icmp_data: None, } } diff --git a/agent/src/plugin/wasm/test.rs b/agent/src/plugin/wasm/test.rs index e822fd1e51d..12180fc9f0a 100644 --- a/agent/src/plugin/wasm/test.rs +++ b/agent/src/plugin/wasm/test.rs @@ -77,6 +77,7 @@ fn get_req_param<'a>( buf_size: 999, captured_byte: 999, oracle_parse_conf: OracleConfig::default(), + icmp_data: None, } } @@ -115,6 +116,7 @@ fn get_resq_param<'a>( buf_size: 999, captured_byte: 999, oracle_parse_conf: OracleConfig::default(), + icmp_data: None, } } diff --git a/server/ingester/flow_log/log_data/l4_flow_log.go b/server/ingester/flow_log/log_data/l4_flow_log.go index bb4c33d0660..9487751a90c 100644 --- a/server/ingester/flow_log/log_data/l4_flow_log.go +++ b/server/ingester/flow_log/log_data/l4_flow_log.go @@ -727,7 +727,7 @@ func getStatus(t datatype.CloseType, p layers.IPProtocol) datatype.LogMessageSta } else if p == layers.IPProtocolTCP && t.IsServerError() { return datatype.STATUS_SERVER_ERROR } else { - return datatype.STATUS_NOT_EXIST + return datatype.STATUS_TIMEOUT } } diff --git a/server/ingester/flow_log/log_data/l7_flow_log.go b/server/ingester/flow_log/log_data/l7_flow_log.go index 962f7f0be6b..dde8b7167f7 100644 --- a/server/ingester/flow_log/log_data/l7_flow_log.go +++ b/server/ingester/flow_log/log_data/l7_flow_log.go @@ -309,7 +309,7 @@ func (h *L7FlowLog) Fill(l *pb.AppProtoLogsData, platformData *grpc.PlatformInfo h.L7ProtocolStr = datatype.L7Protocol(h.L7Protocol).String(h.IsTLS == 1) } - h.ResponseStatus = uint8(datatype.STATUS_NOT_EXIST) + h.ResponseStatus = uint8(datatype.STATUS_TIMEOUT) h.ResponseDuration = l.Base.Head.Rrt / uint64(time.Microsecond) // 协议结构统一, 不再为每个协议定义单独结构 h.fillL7FlowLog(l, cfg) @@ -393,7 +393,7 @@ func (h *L7FlowLog) fillL7FlowLog(l *pb.AppProtoLogsData, cfg *flowlogCfg.Config case datatype.L7_PROTOCOL_KAFKA: if l.Req != nil { if h.responseCode == 0 && !IsKafkaSupportedCommand(l.Req.ReqType) { - h.ResponseStatus = uint8(datatype.STATUS_NOT_EXIST) + h.ResponseStatus = uint8(datatype.STATUS_TIMEOUT) h.ResponseCode = nil } h.RequestId = &h.requestId diff --git a/server/ingester/flow_log/log_data/otel_import.go b/server/ingester/flow_log/log_data/otel_import.go index 851c1b1d4fc..72af74a170f 100644 --- a/server/ingester/flow_log/log_data/otel_import.go +++ b/server/ingester/flow_log/log_data/otel_import.go @@ -74,7 +74,7 @@ func spanKindToTapSide(spanKind v1.Span_SpanKind) flow_metrics.TAPSideEnum { func spanStatusToResponseStatus(status *v1.Status) datatype.LogMessageStatus { if status == nil { - return datatype.STATUS_NOT_EXIST + return datatype.STATUS_TIMEOUT } switch status.Code { case v1.Status_STATUS_CODE_OK: @@ -82,9 +82,9 @@ func spanStatusToResponseStatus(status *v1.Status) datatype.LogMessageStatus { case v1.Status_STATUS_CODE_ERROR: return datatype.STATUS_SERVER_ERROR case v1.Status_STATUS_CODE_UNSET: - return datatype.STATUS_NOT_EXIST + return datatype.STATUS_TIMEOUT } - return datatype.STATUS_NOT_EXIST + return datatype.STATUS_TIMEOUT } func HttpCodeToResponseStatus(code int32) datatype.LogMessageStatus { diff --git a/server/libs/datatype/protocol_logs.go b/server/libs/datatype/protocol_logs.go index 0fd413edab1..766553e57e3 100644 --- a/server/libs/datatype/protocol_logs.go +++ b/server/libs/datatype/protocol_logs.go @@ -60,7 +60,7 @@ type LogMessageStatus uint8 const ( STATUS_OK LogMessageStatus = iota STATUS_ERROR - STATUS_NOT_EXIST + STATUS_TIMEOUT STATUS_SERVER_ERROR STATUS_CLIENT_ERROR ) @@ -76,7 +76,7 @@ func (t LogMessageStatus) String() string { case STATUS_CLIENT_ERROR: return "Client Error" default: - return "Unknown" + return "Timeout" } } diff --git a/server/querier/db_descriptions/clickhouse/tag/enum/response_status.ch b/server/querier/db_descriptions/clickhouse/tag/enum/response_status.ch index 0a861032cfd..894e8b9c807 100644 --- a/server/querier/db_descriptions/clickhouse/tag/enum/response_status.ch +++ b/server/querier/db_descriptions/clickhouse/tag/enum/response_status.ch @@ -1,5 +1,5 @@ # Value , DisplayName , Description 0 , 正常 , -2 , 未知 , +2 , 超时 , 3 , 服务端异常 , 4 , 客户端异常 , diff --git a/server/querier/db_descriptions/clickhouse/tag/enum/response_status.en b/server/querier/db_descriptions/clickhouse/tag/enum/response_status.en index 4199fd26373..2e9c7439aa9 100644 --- a/server/querier/db_descriptions/clickhouse/tag/enum/response_status.en +++ b/server/querier/db_descriptions/clickhouse/tag/enum/response_status.en @@ -1,5 +1,5 @@ # Value , DisplayName , Description 0 , Success , -2 , Unknown , +2 , Timeout , 3 , Server Error , 4 , Client Error ,