Skip to content

Commit

Permalink
feat: support ping
Browse files Browse the repository at this point in the history
  • Loading branch information
yuanchaoa committed Feb 17, 2025
1 parent cf2eaff commit 847a4e6
Show file tree
Hide file tree
Showing 19 changed files with 384 additions and 44 deletions.
3 changes: 3 additions & 0 deletions agent/crates/public/src/l7_protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ pub enum L7Protocol {
// INFRA
DNS = 120,
TLS = 121,
Ping = 122,

Custom = 127,

Expand All @@ -91,6 +92,7 @@ impl L7Protocol {
| Self::Dubbo
| Self::SofaRPC
| Self::SomeIp
| Self::Ping
| Self::Custom => true,
_ => false,
}
Expand Down Expand Up @@ -127,6 +129,7 @@ impl From<String> for L7Protocol {
"dns" => Self::DNS,
"oracle" => Self::Oracle,
"tls" => Self::TLS,
"ping" => Self::Ping,
"some/ip" | "someip" => Self::SomeIp,
_ => Self::Unknown,
}
Expand Down
6 changes: 4 additions & 2 deletions agent/src/common/l7_protocol_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ use crate::{
protocol_logs::{
fastcgi::FastCGIInfo, pb_adapter::L7ProtocolSendLog, AmqpInfo, BrpcInfo, DnsInfo,
DubboInfo, HttpInfo, KafkaInfo, MemcachedInfo, MongoDBInfo, MqttInfo, MysqlInfo,
NatsInfo, OpenWireInfo, PostgreInfo, PulsarInfo, RedisInfo, RocketmqInfo, SofaRpcInfo,
TarsInfo, ZmtpInfo,
NatsInfo, OpenWireInfo, PingInfo, PostgreInfo, PulsarInfo, RedisInfo, RocketmqInfo,
SofaRpcInfo, TarsInfo, ZmtpInfo,
},
AppProtoHead, LogMessageType, Result,
},
Expand Down Expand Up @@ -84,6 +84,7 @@ cfg_if::cfg_if! {
PostgreInfo(PostgreInfo),
OpenWireInfo(OpenWireInfo),
SofaRpcInfo(SofaRpcInfo),
PingInfo(PingInfo),
CustomInfo(CustomInfo),
// add new protocol info below
);
Expand Down Expand Up @@ -112,6 +113,7 @@ cfg_if::cfg_if! {
SofaRpcInfo(SofaRpcInfo),
TlsInfo(crate::flow_generator::protocol_logs::tls::TlsInfo),
SomeIpInfo(crate::flow_generator::protocol_logs::rpc::SomeIpInfo),
PingInfo(PingInfo),
CustomInfo(CustomInfo),
// add new protocol info below
);
Expand Down
20 changes: 18 additions & 2 deletions agent/src/common/l7_protocol_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use super::flow::{L7PerfStats, PacketDirection};
use super::l7_protocol_info::L7ProtocolInfo;
use super::MetaPacket;

use crate::common::meta_packet::{IcmpData, ProtocolData};
use crate::config::handler::LogParserConfig;
use crate::config::OracleConfig;
use crate::flow_generator::flow_map::FlowMapCounter;
Expand All @@ -39,8 +40,8 @@ use crate::flow_generator::protocol_logs::plugin::get_custom_log_parser;
use crate::flow_generator::protocol_logs::sql::ObfuscateCache;
use crate::flow_generator::protocol_logs::{
AmqpLog, BrpcLog, DnsLog, DubboLog, HttpLog, KafkaLog, MemcachedLog, MongoDBLog, MqttLog,
MysqlLog, NatsLog, OpenWireLog, PostgresqlLog, PulsarLog, RedisLog, RocketmqLog, SofaRpcLog,
TarsLog, ZmtpLog,
MysqlLog, NatsLog, OpenWireLog, PingLog, PostgresqlLog, PulsarLog, RedisLog, RocketmqLog,
SofaRpcLog, TarsLog, ZmtpLog,
};

use crate::flow_generator::{LogMessageType, Result};
Expand Down Expand Up @@ -185,6 +186,7 @@ cfg_if::cfg_if! {
ZMTP(ZmtpLog),
RocketMQ(RocketmqLog),
OpenWire(OpenWireLog),
Ping(PingLog),
// add protocol below
}
}
Expand Down Expand Up @@ -215,6 +217,7 @@ cfg_if::cfg_if! {
OpenWire(OpenWireLog),
TLS(crate::flow_generator::protocol_logs::tls::TlsLog),
SomeIp(crate::flow_generator::protocol_logs::rpc::SomeIpLog),
Ping(PingLog),
// add protocol below
}
}
Expand Down Expand Up @@ -290,6 +293,13 @@ pub trait L7ProtocolParserInterface {
true
}

// l4即不是udp也不是tcp,用于快速过滤协议
// ==============================
// L4 is neither UDP nor TCP and is used to quickly filter protocols
fn parsable_on_other(&self) -> bool {
false
}

// is parse default? use for config init.
fn parse_default(&self) -> bool {
true
Expand Down Expand Up @@ -373,6 +383,7 @@ pub struct ParseParam<'a> {
pub port_src: u16,
pub port_dst: u16,
pub flow_id: u64,
pub icmp_data: Option<&'a IcmpData>,

// parse info
pub direction: PacketDirection,
Expand Down Expand Up @@ -430,6 +441,11 @@ impl<'a> ParseParam<'a> {
ip_dst: packet.lookup_key.dst_ip,
port_src: packet.lookup_key.src_port,
port_dst: packet.lookup_key.dst_port,
icmp_data: if let ProtocolData::IcmpData(icmp_data) = &packet.protocol_data {
Some(icmp_data)
} else {
None
},
flow_id: packet.flow_id,

direction: packet.lookup_key.direction,
Expand Down
46 changes: 40 additions & 6 deletions agent/src/common/meta_packet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -518,11 +518,22 @@ impl<'a> MetaPacket<'a> {
}
}

// 目前仅支持获取UDP或TCP的Payload
pub fn get_l4_payload(&self) -> Option<&[u8]> {
if self.lookup_key.proto != IpProtocol::TCP && self.lookup_key.proto != IpProtocol::UDP {
fn get_l3_payload(&self) -> Option<&[u8]> {
if self.tap_port.is_from(TapPort::FROM_EBPF) {
return None;
}

let packet_header_size = self.header_type.min_packet_size() + self.l2_l3_opt_size as usize;
if let Some(raw) = self.raw.as_ref() {
if raw.len() > packet_header_size {
return Some(&raw[packet_header_size..]);
}
}
None
}

// 目前仅支持获取UDP或TCP的Payload
pub fn get_l4_payload(&self) -> Option<&[u8]> {
if self.tap_port.is_from(TapPort::FROM_EBPF) {
return Some(&self.raw_from_ebpf[self.raw_from_ebpf_offset..]);
}
Expand All @@ -538,6 +549,18 @@ impl<'a> MetaPacket<'a> {
None
}

pub fn get_l7(&self) -> Option<&[u8]> {
if self.lookup_key.proto == IpProtocol::TCP || self.lookup_key.proto == IpProtocol::UDP {
return self.get_l4_payload();
}
if self.lookup_key.eth_type == EthernetType::IPV4
|| self.lookup_key.eth_type == EthernetType::IPV6
{
return self.get_l3_payload();
}
None
}

pub fn update<P: AsRef<[u8]> + Into<RawPacket<'a>>>(
&mut self,
raw_packet: P,
Expand Down Expand Up @@ -973,9 +996,20 @@ impl<'a> MetaPacket<'a> {
return self.packet_len as usize - 54;
}

let packet_header_size = self.header_type.min_packet_size()
+ self.l2_l3_opt_size as usize
+ self.l4_opt_size as usize;
let packet_header_size = if self.lookup_key.proto == IpProtocol::UDP
&& self.lookup_key.proto == IpProtocol::TCP
{
self.header_type.min_packet_size()
+ self.l2_l3_opt_size as usize
+ self.l4_opt_size as usize
} else if self.lookup_key.eth_type == EthernetType::IPV4 {
HeaderType::Ipv4.min_packet_size() + self.l2_l3_opt_size as usize
} else if self.lookup_key.eth_type == EthernetType::IPV6 {
HeaderType::Ipv6.min_packet_size() + self.l2_l3_opt_size as usize
} else {
return 0;
};

if let Some(raw) = self.raw.as_ref() {
if raw.len() > packet_header_size {
return raw.len() - packet_header_size;
Expand Down
2 changes: 2 additions & 0 deletions agent/src/config/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1568,6 +1568,7 @@ impl Default for Filters {
("RocketMQ".to_string(), "1-65535".to_string()),
("DNS".to_string(), "53,5353".to_string()),
("TLS".to_string(), "443,6443".to_string()),
("PING".to_string(), "1-65535".to_string()),
("Custom".to_string(), "1-65535".to_string()),
]),
tag_filters: HashMap::from([
Expand Down Expand Up @@ -1596,6 +1597,7 @@ impl Default for Filters {
("RocketMQ".to_string(), vec![]),
("DNS".to_string(), vec![]),
("TLS".to_string(), vec![]),
("PING".to_string(), vec![]),
]),
unconcerned_dns_nxdomain_response_suffixes: Default::default(),
}
Expand Down
2 changes: 2 additions & 0 deletions agent/src/flow_generator/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,8 @@ pub enum Error {
InsufficientPayloadLength,
#[error("unsupported SOME/IP message type")]
SomeIpUnsupportedMessageType,
#[error("ping header parse failed")]
PingHeaderParseFailed,
}

pub type Result<T, E = Error> = std::result::Result<T, E>;
Expand Down
57 changes: 40 additions & 17 deletions agent/src/flow_generator/flow_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -971,8 +971,8 @@ impl FlowMap {
{
node.timeout = config.flow.flow_timeout.established_rst;
}
if let Some(meta_flow_log) = node.meta_flow_log.as_mut() {
let _ = meta_flow_log.parse_l3(meta_packet);
if config.flow.collector_enabled {
self.collect_metric(config, node, meta_packet, true, true);
}
false
}
Expand Down Expand Up @@ -1429,7 +1429,7 @@ impl FlowMap {
match meta_packet.lookup_key.proto {
IpProtocol::TCP => flow_config.rrt_tcp_timeout,
IpProtocol::UDP => flow_config.rrt_udp_timeout,
_ => 0,
_ => flow_config.rrt_udp_timeout,
},
flow_config.l7_protocol_inference_ttl as u64,
last,
Expand Down Expand Up @@ -1673,20 +1673,37 @@ impl FlowMap {
} else {
(0, local_epc_id)
};
let ip_protocol = meta_packet.lookup_key.proto;

for packet in meta_packet {
match log.parse(
flow_config,
log_parser_config,
packet,
is_first_packet_direction,
Self::l7_metrics_enabled(flow_config),
Self::l7_log_parse_enabled(flow_config, &packet.lookup_key),
&mut self.app_table,
local_epc,
remote_epc,
&self.l7_protocol_checker,
) {
let ret = if ip_protocol == IpProtocol::UDP || ip_protocol == IpProtocol::TCP {
log.parse(
flow_config,
log_parser_config,
packet,
is_first_packet_direction,
Self::l7_metrics_enabled(flow_config),
Self::l7_log_parse_enabled(flow_config, &packet.lookup_key),
&mut self.app_table,
local_epc,
remote_epc,
&self.l7_protocol_checker,
)
} else {
log.parse_l3(
flow_config,
log_parser_config,
packet,
Self::l7_metrics_enabled(flow_config),
Self::l7_log_parse_enabled(flow_config, &packet.lookup_key),
&mut self.app_table,
local_epc,
remote_epc,
&self.l7_protocol_checker,
)
};

match ret {
Ok(info) => {
if let Some(perf_stats) = node.tagged_flow.flow.flow_perf_stats.as_mut() {
perf_stats.l7_protocol = log.l7_protocol_enum.get_l7_protocol();
Expand Down Expand Up @@ -1839,8 +1856,14 @@ impl FlowMap {
node.flow_state = FlowState::Established;
// opening timeout
node.timeout = config.flow.flow_timeout.opening;
if let Some(meta_flow_log) = node.meta_flow_log.as_mut() {
let _ = meta_flow_log.parse_l3(meta_packet);
if config.flow.collector_enabled {
self.collect_metric(
config,
&mut node,
meta_packet,
meta_packet.lookup_key.direction == PacketDirection::ClientToServer,
true,
);
}
node
}
Expand Down
Loading

0 comments on commit 847a4e6

Please sign in to comment.