Skip to content

Commit 0c1fdd2

Browse files
authored
Populate client side trace's local address via tcp kprobes (#1989)
Summary: Populate client side trace's local address via tcp kprobes This change populates client side trace's `local_addr` and `local_port` columns for the following use cases: 1. To provide more consistency for the protocol data tables. Having columns that are empty make it difficult for end users to understand what is being traced and make them less useful 2. To facilitate addressing a portion of the short lived process problems (#1638) For 2, the root of the issue is that `df.ctx["pod"]` syntax relies on the [px.upid_to_pod_name](https://docs.px.dev/reference/pxl/udf/upid_to_pod_name/) function. If a PEM misses the short lived process during its metadata update, this function fails to resolve the pod name. For client side traces where the pod is making an outbound connection (non localhost), the `local_addr` column provides an alternative pod name lookup for short lived processes when the pod is long lived. This means the following would be equivalent to the `df.ctx["pod"]` lookup: `px.pod_id_to_pod_name(px.ip_to_pod_id(df.local_addr))`. I intend to follow this PR with a compiler change that will make `df.ctx["pod"]` try both methods should `px.upid_to_pod_name` fail to resolve. This will allow the existing pxl scripts to display the previously missed short lived processes. **Alternatives** Another approach I considered was expanding our use of the `sock_alloc` kprobe. I used ftrace on a simple curl command to see what other options could be used (`sudo trace-cmd record -F -p function_graph http://google.com`). The `socket` syscall calls `sock_alloc`, which would be another mechanism for accessing the `struct sock`. I decided against this approach because I don't think its viable to assume that the same thread/process that calls `socket` will be the one that does the later syscalls (how our BPF maps are set up). It's common to have a forking web server model, which means a different process/thread can call `socket` than the ones that later read/write to it. **Probe stability** These probes appear to be stable from our oldest and newest supported kernel. These functions exist in the [tcp_prot](https://elixir.bootlin.com/linux/v4.14.336/source/net/ipv4/tcp_ipv4.c#L2422), [tcpv6_prot](https://elixir.bootlin.com/linux/v4.14.336/source/net/ipv6/tcp_ipv6.c#L1941) structs and I've seen that other projects and bcc tools use these probes. This makes me believe that these functions have a pretty well defined interface. Relevant Issues: #1829, #1638 Type of change: /kind feature Test Plan: New tests verify that ipv4 and ipv6 cases work - [x] Ran `for i in $(seq 0 1000); do curl http://google.com/$i; sleep 2; done` within a pod and verified that `local_addr` is populated with this change and `px.pod_id_to_pod_name(px.ip_to_pod_id(df.local_addr))` works for pod name resolution. - [x] Verified the above curl test results in traces without `local_addr` without this change ![local-addr-testing](https://github.com/user-attachments/assets/344be022-97a0-4096-8af7-8de20d741e40) - Tested on the following k8s offerings and machine images - [x] GKE COS and Ubuntu - [x] EKS Amazon Linux 2 Changelog Message: Populate socket tracer data table `local_addr` and `local_port` column for client side traces. --------- Signed-off-by: Dom Del Nano <ddelnano@gmail.com>
1 parent afffb8e commit 0c1fdd2

File tree

5 files changed

+253
-15
lines changed

5 files changed

+253
-15
lines changed

src/stirling/source_connectors/socket_tracer/bcc_bpf/socket_trace.c

Lines changed: 67 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,11 @@ BPF_HASH(active_accept_args_map, uint64_t, struct accept_args_t);
8181
// Key is {tgid, pid}.
8282
BPF_HASH(active_connect_args_map, uint64_t, struct connect_args_t);
8383

84+
// Map from thread to its sock* struct. This facilitates capturing
85+
// the local address of a tcp socket during connect() syscalls.
86+
// Key is {tgid, pid}.
87+
BPF_HASH(tcp_connect_args_map, uint64_t, struct sock*);
88+
8489
// Map from thread to its ongoing write() syscall's input argument.
8590
// Tracks write() call from entry -> exit.
8691
// Key is {tgid, pid}.
@@ -345,19 +350,17 @@ static __inline void update_traffic_class(struct conn_info_t* conn_info,
345350
* Perf submit functions
346351
***********************************************************/
347352

348-
static __inline void read_sockaddr_kernel(struct conn_info_t* conn_info,
349-
const struct socket* socket) {
350-
// Use BPF_PROBE_READ_KERNEL_VAR since BCC cannot insert them as expected.
351-
struct sock* sk = NULL;
352-
BPF_PROBE_READ_KERNEL_VAR(sk, &socket->sk);
353-
354-
struct sock_common* sk_common = &sk->__sk_common;
353+
static __inline void read_sockaddr_kernel(struct conn_info_t* conn_info, const struct sock* sk) {
354+
const struct sock_common* sk_common = &sk->__sk_common;
355355
uint16_t family = -1;
356356
uint16_t lport = -1;
357357
uint16_t rport = -1;
358358

359359
BPF_PROBE_READ_KERNEL_VAR(family, &sk_common->skc_family);
360360
BPF_PROBE_READ_KERNEL_VAR(lport, &sk_common->skc_num);
361+
// skc_num is stored in host byte order. The rest of our user space processing
362+
// assumes network byte order so convert it here.
363+
lport = htons(lport);
361364
BPF_PROBE_READ_KERNEL_VAR(rport, &sk_common->skc_dport);
362365

363366
conn_info->laddr.sa.sa_family = family;
@@ -377,12 +380,12 @@ static __inline void read_sockaddr_kernel(struct conn_info_t* conn_info,
377380
}
378381

379382
static __inline void submit_new_conn(struct pt_regs* ctx, uint32_t tgid, int32_t fd,
380-
const struct sockaddr* addr, const struct socket* socket,
383+
const struct sockaddr* addr, const struct sock* sock,
381384
enum endpoint_role_t role, enum source_function_t source_fn) {
382385
struct conn_info_t conn_info = {};
383386
init_conn_info(tgid, fd, &conn_info);
384-
if (socket != NULL) {
385-
read_sockaddr_kernel(&conn_info, socket);
387+
if (sock != NULL) {
388+
read_sockaddr_kernel(&conn_info, sock);
386389
} else if (addr != NULL) {
387390
conn_info.raddr = *((union sockaddr_t*)addr);
388391
}
@@ -585,6 +588,52 @@ int conn_cleanup_uprobe(struct pt_regs* ctx) {
585588
return 0;
586589
}
587590

591+
// These probes are used to capture the *sock struct during client side tracing
592+
// of connect() syscalls. This is necessary to capture the socket's local address,
593+
// which is not accessible via the connect() and later syscalls.
594+
//
595+
// This function requires that the function being probed receives a struct sock* as its
596+
// first argument and that the active_connect_args_map is populated when this probe fires.
597+
// This means the function being probed must be part of the connect() syscall path or similar
598+
// syscall path.
599+
//
600+
// Using the struct sock* for capturing a socket's local address only works for TCP sockets.
601+
// The equivalent UDP functions (udp_v4_connect, udp_v6_connect and upd_sendmsg) always receive a
602+
// sock struct with a 0.0.0.0 or ::1 local address. This is deemed acceptable since our local
603+
// address population for server side tracing relies on accept/accept4, which only applies for TCP.
604+
//
605+
// int tcp_v4_connect(struct sock *sk, struct sockaddr *uaddr, int addr_len);
606+
// static int tcp_v6_connect(struct sock *sk, struct sockaddr *uaddr, int addr_len);
607+
// int tcp_sendmsg(struct sock *sk, struct msghdr *msg, size_t size);
608+
int probe_entry_populate_active_connect_sock(struct pt_regs* ctx) {
609+
uint64_t id = bpf_get_current_pid_tgid();
610+
611+
const struct connect_args_t* connect_args = active_connect_args_map.lookup(&id);
612+
if (connect_args == NULL) {
613+
return 0;
614+
}
615+
struct sock* sk = (struct sock*)PT_REGS_PARM1(ctx);
616+
tcp_connect_args_map.update(&id, &sk);
617+
618+
return 0;
619+
}
620+
621+
int probe_ret_populate_active_connect_sock(struct pt_regs* ctx) {
622+
uint64_t id = bpf_get_current_pid_tgid();
623+
624+
struct sock** sk = tcp_connect_args_map.lookup(&id);
625+
if (sk == NULL) {
626+
return 0;
627+
}
628+
struct connect_args_t* connect_args = active_connect_args_map.lookup(&id);
629+
if (connect_args != NULL) {
630+
connect_args->connect_sock = *sk;
631+
}
632+
633+
tcp_connect_args_map.delete(&id);
634+
return 0;
635+
}
636+
588637
/***********************************************************
589638
* BPF syscall processing functions
590639
***********************************************************/
@@ -629,7 +678,8 @@ static __inline void process_syscall_connect(struct pt_regs* ctx, uint64_t id,
629678
return;
630679
}
631680

632-
submit_new_conn(ctx, tgid, args->fd, args->addr, /*socket*/ NULL, kRoleClient, kSyscallConnect);
681+
submit_new_conn(ctx, tgid, args->fd, args->addr, args->connect_sock, kRoleClient,
682+
kSyscallConnect);
633683
}
634684

635685
static __inline void process_syscall_accept(struct pt_regs* ctx, uint64_t id,
@@ -645,8 +695,11 @@ static __inline void process_syscall_accept(struct pt_regs* ctx, uint64_t id,
645695
return;
646696
}
647697

648-
submit_new_conn(ctx, tgid, ret_fd, args->addr, args->sock_alloc_socket, kRoleServer,
649-
kSyscallAccept);
698+
const struct sock* sk = NULL;
699+
if (args->sock_alloc_socket != NULL) {
700+
BPF_PROBE_READ_KERNEL_VAR(sk, &args->sock_alloc_socket->sk);
701+
}
702+
submit_new_conn(ctx, tgid, ret_fd, args->addr, sk, kRoleServer, kSyscallAccept);
650703
}
651704

652705
// TODO(oazizi): This is badly broken (but better than before).
@@ -690,7 +743,7 @@ static __inline void process_implicit_conn(struct pt_regs* ctx, uint64_t id,
690743
return;
691744
}
692745

693-
submit_new_conn(ctx, tgid, args->fd, args->addr, /*socket*/ NULL, kRoleUnknown, source_fn);
746+
submit_new_conn(ctx, tgid, args->fd, args->addr, args->connect_sock, kRoleUnknown, source_fn);
694747
}
695748

696749
static __inline bool should_send_data(uint32_t tgid, uint64_t conn_disabled_tsid,

src/stirling/source_connectors/socket_tracer/bcc_bpf_intf/socket_trace.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -263,6 +263,7 @@ struct socket_control_event_t {
263263

264264
struct connect_args_t {
265265
const struct sockaddr* addr;
266+
const struct sock* connect_sock;
266267
int32_t fd;
267268
};
268269

src/stirling/source_connectors/socket_tracer/socket_trace_bpf_test.cc

Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,13 +39,16 @@
3939
#include "src/stirling/source_connectors/socket_tracer/bcc_bpf_intf/socket_trace.hpp"
4040
#include "src/stirling/source_connectors/socket_tracer/socket_trace_connector.h"
4141
#include "src/stirling/source_connectors/socket_tracer/testing/client_server_system.h"
42+
#include "src/stirling/source_connectors/socket_tracer/testing/protocol_checkers.h"
4243
#include "src/stirling/source_connectors/socket_tracer/testing/socket_trace_bpf_test_fixture.h"
4344
#include "src/stirling/testing/common.h"
4445

4546
namespace px {
4647
namespace stirling {
4748

4849
using ::px::stirling::testing::FindRecordsMatchingPID;
50+
using ::px::stirling::testing::GetLocalAddrs;
51+
using ::px::stirling::testing::GetLocalPorts;
4952
using ::px::stirling::testing::RecordBatchSizeIs;
5053
using ::px::system::TCPSocket;
5154
using ::px::system::UDPSocket;
@@ -747,6 +750,155 @@ TEST_F(NullRemoteAddrTest, IPv6Accept4WithNullRemoteAddr) {
747750
EXPECT_EQ(records[kHTTPRemotePortIdx]->Get<types::Int64Value>(0), port);
748751
}
749752

753+
using LocalAddrTest = testing::SocketTraceBPFTestFixture</* TClientSideTracing */ true>;
754+
755+
TEST_F(LocalAddrTest, IPv4ConnectPopulatesLocalAddr) {
756+
StartTransferDataThread();
757+
758+
TCPSocket client;
759+
TCPSocket server;
760+
761+
std::atomic<bool> server_ready = true;
762+
763+
std::thread server_thread([&server, &server_ready]() {
764+
server.BindAndListen();
765+
server_ready = true;
766+
auto conn = server.Accept(/* populate_remote_addr */ true);
767+
768+
std::string data;
769+
770+
conn->Read(&data);
771+
conn->Write(kHTTPRespMsg1);
772+
});
773+
774+
// Wait for server thread to start listening.
775+
while (!server_ready) {
776+
}
777+
// After server_ready, server.Accept() needs to enter the accepting state, before the client
778+
// connection can succeed below. We don't have a simple and robust way to signal that from inside
779+
// the server thread, so we just use sleep to avoid the race condition.
780+
std::this_thread::sleep_for(std::chrono::seconds(1));
781+
782+
std::thread client_thread([&client, &server]() {
783+
client.Connect(server);
784+
785+
std::string data;
786+
787+
client.Write(kHTTPReqMsg1);
788+
client.Read(&data);
789+
});
790+
791+
server_thread.join();
792+
client_thread.join();
793+
794+
// Get the remote port seen by server from client's local port.
795+
struct sockaddr_in client_sockaddr = {};
796+
socklen_t client_sockaddr_len = sizeof(client_sockaddr);
797+
struct sockaddr* client_sockaddr_ptr = reinterpret_cast<struct sockaddr*>(&client_sockaddr);
798+
ASSERT_EQ(getsockname(client.sockfd(), client_sockaddr_ptr, &client_sockaddr_len), 0);
799+
800+
// Close after getting the sockaddr from fd, otherwise getsockname() wont work.
801+
client.Close();
802+
server.Close();
803+
804+
StopTransferDataThread();
805+
806+
std::vector<TaggedRecordBatch> tablets = ConsumeRecords(kHTTPTableNum);
807+
ASSERT_NOT_EMPTY_AND_GET_RECORDS(const types::ColumnWrapperRecordBatch& record_batch, tablets);
808+
809+
std::vector<size_t> indices =
810+
testing::FindRecordIdxMatchesPID(record_batch, kHTTPUPIDIdx, getpid());
811+
ColumnWrapperRecordBatch records = testing::SelectRecordBatchRows(record_batch, indices);
812+
813+
ASSERT_THAT(records, RecordBatchSizeIs(2));
814+
815+
// Make sure that the socket info resolution works.
816+
ASSERT_OK_AND_ASSIGN(std::string remote_addr, IPv4AddrToString(client_sockaddr.sin_addr));
817+
EXPECT_THAT(GetLocalAddrs(records, kHTTPLocalAddrIdx, indices), Contains("127.0.0.1").Times(2));
818+
EXPECT_EQ(remote_addr, "127.0.0.1");
819+
820+
bool found_port = false;
821+
uint16_t port = ntohs(client_sockaddr.sin_port);
822+
for (auto lport : GetLocalPorts(records, kHTTPLocalPortIdx, indices)) {
823+
if (lport == port) {
824+
found_port = true;
825+
break;
826+
}
827+
}
828+
EXPECT_TRUE(found_port);
829+
}
830+
831+
TEST_F(LocalAddrTest, IPv6ConnectPopulatesLocalAddr) {
832+
StartTransferDataThread();
833+
834+
TCPSocket client(AF_INET6);
835+
TCPSocket server(AF_INET6);
836+
837+
std::atomic<bool> server_ready = false;
838+
839+
std::thread server_thread([&server, &server_ready]() {
840+
server.BindAndListen();
841+
server_ready = true;
842+
auto conn = server.Accept(/* populate_remote_addr */ false);
843+
844+
std::string data;
845+
846+
conn->Read(&data);
847+
conn->Write(kHTTPRespMsg1);
848+
});
849+
850+
while (!server_ready) {
851+
}
852+
853+
std::thread client_thread([&client, &server]() {
854+
client.Connect(server);
855+
856+
std::string data;
857+
858+
client.Write(kHTTPReqMsg1);
859+
client.Read(&data);
860+
});
861+
862+
server_thread.join();
863+
client_thread.join();
864+
865+
// Get the remote port seen by server from client's local port.
866+
struct sockaddr_in6 client_sockaddr = {};
867+
socklen_t client_sockaddr_len = sizeof(client_sockaddr);
868+
struct sockaddr* client_sockaddr_ptr = reinterpret_cast<struct sockaddr*>(&client_sockaddr);
869+
ASSERT_EQ(getsockname(client.sockfd(), client_sockaddr_ptr, &client_sockaddr_len), 0);
870+
871+
// Close after getting the sockaddr from fd, otherwise getsockname() wont work.
872+
client.Close();
873+
server.Close();
874+
875+
StopTransferDataThread();
876+
877+
std::vector<TaggedRecordBatch> tablets = ConsumeRecords(kHTTPTableNum);
878+
ASSERT_NOT_EMPTY_AND_GET_RECORDS(const types::ColumnWrapperRecordBatch& record_batch, tablets);
879+
880+
std::vector<size_t> indices =
881+
testing::FindRecordIdxMatchesPID(record_batch, kHTTPUPIDIdx, getpid());
882+
ColumnWrapperRecordBatch records = testing::SelectRecordBatchRows(record_batch, indices);
883+
884+
ASSERT_THAT(records, RecordBatchSizeIs(2));
885+
886+
// Make sure that the socket info resolution works.
887+
ASSERT_OK_AND_ASSIGN(std::string remote_addr, IPv6AddrToString(client_sockaddr.sin6_addr));
888+
EXPECT_THAT(GetLocalAddrs(records, kHTTPLocalAddrIdx, indices), Contains("::1").Times(2));
889+
EXPECT_EQ(remote_addr, "::1");
890+
891+
bool found_port = false;
892+
uint16_t port = ntohs(client_sockaddr.sin6_port);
893+
for (auto lport : GetLocalPorts(records, kHTTPLocalPortIdx, indices)) {
894+
if (lport == port) {
895+
found_port = true;
896+
break;
897+
}
898+
}
899+
EXPECT_TRUE(found_port);
900+
}
901+
750902
// Run a UDP-based client-server system.
751903
class UDPSocketTraceBPFTest : public SocketTraceBPFTest {
752904
protected:

src/stirling/source_connectors/socket_tracer/socket_trace_connector.cc

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ DEFINE_bool(
176176
stirling_debug_tls_sources, gflags::BoolFromEnv("PX_DEBUG_TLS_SOURCES", false),
177177
"If true, stirling will add additional prometheus metrics regarding the traced tls sources");
178178

179-
DEFINE_uint32(stirling_bpf_loop_limit, 42,
179+
DEFINE_uint32(stirling_bpf_loop_limit, 41,
180180
"The maximum number of iovecs to capture for syscalls. "
181181
"Set conservatively for older kernels by default to keep the instruction count below "
182182
"BPF's limit for version 4 kernels (4096 per probe).");
@@ -342,6 +342,18 @@ const auto kProbeSpecs = MakeArray<bpf_tools::KProbeSpec>({
342342
{"close", ProbeType::kReturn, "syscall__probe_ret_close"},
343343
{"mmap", ProbeType::kEntry, "syscall__probe_entry_mmap"},
344344
{"sock_alloc", ProbeType::kReturn, "probe_ret_sock_alloc", /*is_syscall*/ false},
345+
{"tcp_v4_connect", ProbeType::kEntry, "probe_entry_populate_active_connect_sock",
346+
/*is_syscall*/ false},
347+
{"tcp_v4_connect", ProbeType::kReturn, "probe_ret_populate_active_connect_sock",
348+
/*is_syscall*/ false},
349+
{"tcp_v6_connect", ProbeType::kEntry, "probe_entry_populate_active_connect_sock",
350+
/*is_syscall*/ false},
351+
{"tcp_v6_connect", ProbeType::kReturn, "probe_ret_populate_active_connect_sock",
352+
/*is_syscall*/ false},
353+
{"tcp_sendmsg", ProbeType::kEntry, "probe_entry_populate_active_connect_sock",
354+
/*is_syscall*/ false},
355+
{"tcp_sendmsg", ProbeType::kReturn, "probe_ret_populate_active_connect_sock",
356+
/*is_syscall*/ false},
345357
{"security_socket_sendmsg", ProbeType::kEntry, "probe_entry_socket_sendmsg",
346358
/*is_syscall*/ false, /* is_optional */ false,
347359
std::make_shared<bpf_tools::KProbeSpec>(bpf_tools::KProbeSpec{

src/stirling/source_connectors/socket_tracer/testing/protocol_checkers.h

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,26 @@ inline std::vector<bool> GetEncrypted(const types::ColumnWrapperRecordBatch& rb,
135135
return encrypted;
136136
}
137137

138+
inline std::vector<std::string> GetLocalAddrs(const types::ColumnWrapperRecordBatch& rb,
139+
const int local_addr_idx,
140+
const std::vector<size_t>& indices) {
141+
std::vector<std::string> laddrs;
142+
for (size_t idx : indices) {
143+
laddrs.push_back(rb[local_addr_idx]->Get<types::StringValue>(idx));
144+
}
145+
return laddrs;
146+
}
147+
148+
inline std::vector<int64_t> GetLocalPorts(const types::ColumnWrapperRecordBatch& rb,
149+
const int local_port_idx,
150+
const std::vector<size_t>& indices) {
151+
std::vector<int64_t> ports;
152+
for (size_t idx : indices) {
153+
ports.push_back(rb[local_port_idx]->Get<types::Int64Value>(idx).val);
154+
}
155+
return ports;
156+
}
157+
138158
inline std::vector<int64_t> GetRemotePorts(const types::ColumnWrapperRecordBatch& rb,
139159
const std::vector<size_t>& indices) {
140160
std::vector<int64_t> addrs;

0 commit comments

Comments
 (0)