From 750f3e948acb3d687364a2e9c79f7adfa84cabb3 Mon Sep 17 00:00:00 2001 From: Narate Taerat Date: Wed, 23 Aug 2023 15:17:14 -0500 Subject: [PATCH] IPv6 support in LDMS stack and Stream - Add IPv6 support to LDMS library. - Update LDMS Python library. - Update ldms_ls, ldmsd, ldmsd_prdcr to not assume IPv4. - Fix 'zap_sock' to not assume IPv4. --- ldms/python/ldms.pxd | 31 ++++---- ldms/python/ldms.pyx | 123 +++++++++++++++++++++--------- ldms/src/core/ldms.c | 71 ++++++++++++++++++ ldms/src/core/ldms.h | 68 +++++++++++++---- ldms/src/core/ldms_rail.c | 87 ++++++++++++++++++++-- ldms/src/core/ldms_rail.h | 22 ++++++ ldms/src/core/ldms_stream.c | 136 +++++++++++++++++++++++----------- ldms/src/core/ldms_stream.h | 4 +- ldms/src/core/ldms_xprt.c | 10 +-- ldms/src/core/ldms_xprt.h | 8 +- ldms/src/ldmsd/ldms_ls.c | 51 ++++++++----- ldms/src/ldmsd/ldmsd_config.c | 23 +----- ldms/src/ldmsd/ldmsd_prdcr.c | 18 +---- lib/src/zap/sock/zap_sock.c | 2 +- 14 files changed, 474 insertions(+), 180 deletions(-) diff --git a/ldms/python/ldms.pxd b/ldms/python/ldms.pxd index 5379419bf..99d404f89 100644 --- a/ldms/python/ldms.pxd +++ b/ldms/python/ldms.pxd @@ -57,13 +57,25 @@ cdef extern from * nogil: uint64_t be64toh(uint64_t x) uint64_t htobe64(uint64_t x) + enum: + AF_INET, + AF_INET6, + struct sockaddr: + uint16_t sa_family + pass + struct sockaddr_storage: pass struct in_addr: uint32_t s_addr struct sockaddr_in: uint16_t sin_port in_addr sin_addr + struct in6_addr: + pass + struct sockaddr_in6: + uint16_t sin6_port + in6_addr sin6_addr ctypedef uint32_t socklen_t cdef extern from "errno.h" nogil: @@ -698,15 +710,13 @@ cdef extern from "ldms.h" nogil: LDMS_STREAM_EVENT_RECV LDMS_STREAM_EVENT_SUBSCRIBE_STATUS LDMS_STREAM_EVENT_UNSUBSCRIBE_STATUS - struct ldms_stream_src_u: - uint64_t u64; - uint32_t addr4; + struct ldms_addr: uint8_t addr[4]; - uint16_t port; - uint16_t reserved; + uint16_t sin_port; + uint16_t sa_family; struct ldms_stream_recv_data_s: ldms_stream_client_t client - ldms_stream_src_u src + ldms_addr src uint64_t msg_gn ldms_stream_type_e type uint32_t name_len @@ -744,18 +754,13 @@ cdef extern from "ldms.h" nogil: ldms_stream_event_cb_t cb_fn, void *cb_arg) # -- ldms_stream_stats -- # - union ldms_stream_addr_u: - uint64_t u64 - uint8_t addr[4] - uint32_t addr4 - uint16_t port struct ldms_stream_counters_s: timespec first_ts timespec last_ts uint64_t count size_t bytes struct ldms_stream_src_stats_s: - uint64_t src + ldms_addr src ldms_stream_counters_s rx struct ldms_stream_client_pair_stats_s: const char *stream_name @@ -777,7 +782,7 @@ cdef extern from "ldms.h" nogil: ldms_stream_counters_s tx ldms_stream_counters_s drops ldms_stream_client_pair_stats_tq_s pair_tq - ldms_stream_addr_u dest + ldms_addr dest int is_regex const char *match const char *desc diff --git a/ldms/python/ldms.pyx b/ldms/python/ldms.pyx index 792a9d2bb..4fc7aeede 100644 --- a/ldms/python/ldms.pyx +++ b/ldms/python/ldms.pyx @@ -60,6 +60,7 @@ import os import sys import copy import json +import socket import threading from queue import Queue, Empty cimport cython @@ -3660,23 +3661,16 @@ cdef class Xprt(object): if rc: raise StreamSubscribeError(f"ldms_stream_remote_unsubscribe() error, rc: {rc}") - def get_sockaddr(self): + def get_addr(self): """Get the local socket Internet address in ((LOCAL_ADDR, LOCAL_PORT), (REMOTE_ADDR, REMOTE_PORT))""" - cdef sockaddr_in lcl, rmt + cdef sockaddr_storage lcl, rmt cdef socklen_t slen = sizeof(lcl) cdef int rc rc = ldms_xprt_sockaddr(self.xprt, &lcl, &rmt, &slen) if rc: raise RuntimeError(f"ldms_xprt_sockaddr() error, rc: {rc}") - return ( (lcl.sin_addr.s_addr, lcl.sin_port) , - (rmt.sin_addr.s_addr, rmt.sin_port) ) - - def get_stream_addr(self): - """Get (LOCAL, REMOTE) addresses in StreamAddr format""" - lcl, rmt = self.get_sockaddr() - lcl = StreamAddr.from_addr_port(*lcl) - rmt = StreamAddr.from_addr_port(*rmt) - return ( lcl, rmt ) + return ( LdmsAddr.from_sockaddr(PTR(&lcl)), + LdmsAddr.from_sockaddr(PTR(&rmt)) ) cdef class _StreamSubCtxt(object): @@ -3724,11 +3718,90 @@ class StreamSubscribeError(Exception): StreamDataAttrs = [ "raw_data", "data", "src", "name", "is_json", "uid", "gid", "perm", "tid" ] +cdef class LdmsAddr(object): + cdef public int family + cdef public int port + cdef public bytes addr + + def __init__(self, family = 0, port = 0, addr = b'\x00'*16): + self.family = family + self.addr = addr + self.port = port + + def __repr__(self): + return f"LdmsAddr( {self.family}, {self.port}, {self.addr} )" + + def __str__(self): + cdef char buff[128] + if self.family in [ AF_INET, AF_INET6 ]: + addr = socket.inet_ntop(self.family, self.addr) + else: + addr = "UNSUPPORTED" + return f"[{addr}]:{self.port}" + + @classmethod + def from_ldms_addr(cls, Ptr addr_ptr): + cdef ldms_addr *addr = addr_ptr.c_ptr + if addr.sa_family == AF_INET: + addr_bytes = addr.addr[:4] + elif addr.sa_family == AF_INET6: + addr_bytes = addr.addr[:16] + elif addr.sa_family == 0: + addr_bytes = b'\x00'*16 + else: + raise RuntimeError(f"Unsupported address family: {addr.sa_family}") + return LdmsAddr(addr.sa_family, be16toh(addr.sin_port), addr_bytes) + + @classmethod + def from_sockaddr(cls, Ptr addr_ptr): + cdef sockaddr *sa = addr_ptr.c_ptr + cdef sockaddr_in *sin = addr_ptr.c_ptr + cdef sockaddr_in6 *sin6 = addr_ptr.c_ptr + cdef char *addr + if sa.sa_family == AF_INET: + addr = &sin.sin_addr + return LdmsAddr(AF_INET, be16toh(sin.sin_port), addr[:4]) + elif sa.sa_family == AF_INET6: + addr = &sin6.sin6_addr + return LdmsAddr(AF_INET6, be16toh(sin6.sin6_port), addr[:16]) + elif sa.sa_family == 0: + return LdmsAddr(0, 0, b'\x00'*16) + else: + raise RuntimeError(f"Unsupported address family: {sa.sa_family}") + + def as_tuple(self): + return tuple( self.family, self.port, self.addr ) + + def __iter__(self): + yield self.family + yield self.port + yield self.addr + + def __eq__(self, other): + for a, b in zip(self, other): + if a != b: + return False + return True + + def __lt__(self, other): + for a, b in zip(self, other): + if a is None: + if b is None: + continue + return True + if b is None: + return False + if a < b: + return True + if a > b: + return False + return False + cdef class StreamData(object): """Stream Data""" cdef public bytes raw_data # bytes raw data cdef public object data # `str` (for STRING) or `dict` (for JSON) - cdef public uint64_t src # stream originator + cdef public LdmsAddr src # stream originator cdef public str name # stream name cdef public int is_json # data is JSON cdef public int uid # uid of the original publisher @@ -3751,7 +3824,7 @@ cdef class StreamData(object): return str(self.data) def __repr__(self): - return f"StreamData('{self.name}', {hex(self.src)}, " \ + return f"StreamData('{self.name}', {repr(self.src)}, " \ f"{self.tid}, {self.uid}, {self.gid}, {oct(self.perm)}, " \ f"{self.is_json}, {repr(self.data)})" @@ -3777,7 +3850,7 @@ cdef class StreamData(object): is_json = False data = raw_data.decode() name = ev.recv.name.decode() - src = ev.recv.src.u64 + src = LdmsAddr.from_ldms_addr(PTR(&ev.recv.src)) uid = ev.recv.cred.uid gid = ev.recv.cred.gid perm = ev.recv.perm @@ -3794,24 +3867,6 @@ cdef int __stream_client_cb(ldms_stream_event_t ev, void *arg) with gil: c.data_q.put(sdata) return 0 -StreamAddr = namedtuple('StreamAddr', ['addr', 'port']) -def _from_ptr(cls, Ptr ptr): - cdef ldms_stream_addr_u *addr = ptr.c_ptr - return cls( tuple([addr.addr[0], addr.addr[1], addr.addr[2], addr.addr[3]]), - be16toh(addr.port) ) -StreamAddr.from_ptr = classmethod(_from_ptr) -del _from_ptr -def _from_addr_port(cls, addr, port): - """From addr, port in Network byte format""" - _b = struct.pack('=LH', addr, port) - _u = struct.unpack('>BBBBH', _b) - _addr = _u[0:4] - _port = _u[4] - return cls(_addr, _port) -StreamAddr.from_addr_port = classmethod(_from_addr_port) -del _from_addr_port -StreamAddr.__str__ = lambda s: ".".join(s.addr)+f":{s.port}" - TimeSpec = namedtuple('TimeSpec', [ 'tv_sec', 'tv_nsec' ]) def _from_ptr(cls, Ptr ptr): cdef timespec *ts = ptr.c_ptr @@ -3833,7 +3888,7 @@ del _from_ptr StreamSrcStats = namedtuple('StreamSrcStats', ['src', 'rx']) def _from_ptr(cls, Ptr ptr): cdef ldms_stream_src_stats_s *ss = ptr.c_ptr - src = StreamAddr.from_ptr(PTR(&ss.src)) + src = LdmsAddr.from_ldms_addr(PTR(&ss.src)) rx = StreamCounters.from_ptr(PTR(&ss.rx)) return cls(src, rx) StreamSrcStats.from_ptr = classmethod(_from_ptr) @@ -3884,7 +3939,7 @@ def _from_ptr(cls, Ptr ptr): cdef ldms_stream_client_pair_stats_s *ps tx = StreamCounters.from_ptr(PTR(&cs.tx)) drops = StreamCounters.from_ptr(PTR(&cs.drops)) - dest = StreamAddr.from_ptr(PTR(&cs.dest)) + dest = LdmsAddr.from_ldms_addr(PTR(&cs.dest)) ps = __STREAM_CLIENT_PAIR_STATS_TQ_FIRST(&cs.pair_tq) streams = list() while ps: diff --git a/ldms/src/core/ldms.c b/ldms/src/core/ldms.c index 2a1948ecc..67e8b2088 100644 --- a/ldms/src/core/ldms.c +++ b/ldms/src/core/ldms.c @@ -60,6 +60,7 @@ #include #include #include +#include #include #include #include @@ -67,6 +68,7 @@ #include #include #include +#include #include "ovis_util/os_util.h" #include "ldms.h" #include "ldms_xprt.h" @@ -5045,3 +5047,72 @@ ldms_schema_t ldms_schema_from_template(const char *name, ldms_schema_delete(sch); return NULL; } + +static int __getsockaddr(const char *host, const char *port, + const struct addrinfo *hints, + struct sockaddr *sa, socklen_t *sa_len) +{ + struct addrinfo *ai; + int rc; + rc = getaddrinfo(host, port, hints, &ai); + switch (rc) { + case 0: + break; + case EAI_ADDRFAMILY: + return -EADDRNOTAVAIL; + case EAI_AGAIN: + return -EAGAIN; + case EAI_BADFLAGS: + return -EINVAL; + case EAI_FAIL: + return -EFAULT; + case EAI_FAMILY: + return -EAFNOSUPPORT; + case EAI_MEMORY: + return -ENOMEM; + case EAI_NODATA: + return -ENODATA; + case EAI_NONAME: + return -ENOENT; + case EAI_SERVICE: + return -EPROTONOSUPPORT; + case EAI_SOCKTYPE: + return -EPROTOTYPE; + case EAI_SYSTEM: + return -errno; + default: + return -EINVAL; + } + if (!ai) + return -ENOENT; + if (ai->ai_addrlen > *sa_len) { + rc = -ENOBUFS; + } else { + *sa_len = ai->ai_addrlen; + memcpy(sa, ai->ai_addr, ai->ai_addrlen); + rc = 0; + } + freeaddrinfo(ai); + return rc; +} + +int ldms_getsockaddr4(const char *host, const char *port, + struct sockaddr *sa, socklen_t *sa_len) +{ + struct addrinfo hints = { .ai_family = AF_INET, .ai_socktype = SOCK_STREAM }; + return __getsockaddr(host, port, &hints, sa, sa_len); +} + +int ldms_getsockaddr6(const char *host, const char *port, + struct sockaddr *sa, socklen_t *sa_len) +{ + struct addrinfo hints = { .ai_family = AF_INET6, .ai_socktype = SOCK_STREAM }; + return __getsockaddr(host, port, &hints, sa, sa_len); +} + +int ldms_getsockaddr(const char *host, const char *port, + struct sockaddr *sa, socklen_t *sa_len) +{ + struct addrinfo hints = { .ai_socktype = SOCK_STREAM }; + return __getsockaddr(host, port, &hints, sa, sa_len); +} diff --git a/ldms/src/core/ldms.h b/ldms/src/core/ldms.h index 431e448d4..41e600055 100644 --- a/ldms/src/core/ldms.h +++ b/ldms/src/core/ldms.h @@ -975,6 +975,48 @@ int ldms_xprt_rail_eps(ldms_t x); */ int ldms_xprt_rail_send_credit_get(ldms_t x, uint64_t *credits, int n); +/* A convenient sockaddr union for IPv4 and IPv6 (for now) */ +union ldms_sockaddr { + struct sockaddr sa; + struct sockaddr_in sin; + struct sockaddr_in6 sin6; + struct sockaddr_storage storage; +}; + +/** + * A utility to convert \c host, \c port/service to \c sockaddr. + * + * \c host could be: + * - "IP4_ADDR", e.g. "192.168.0.5" + * - "IP6_ADDR", e.g. "::1" + * - "HOSTNAME", e.g. "node05" + * + * \note It is recommended to supply \c sockaddr_storage structure for \c sa. + * + * \param host The host string. + * \param port The port or service string (see \c getaddrinfo(3)). + * \param[out] sa The sockaddr output buffer. It is recommended to + * supply \c sockaddr_storage structure for \c sa. + * \param[in,out] sa_len The length of the \c sa buffer. On return, \c sa_len + * is set to the length of the returned \c sa. + * + * \retval 0 If there is no error. + */ +int ldms_getsockaddr(const char *host, const char *port, + struct sockaddr *sa, socklen_t *sa_len); + +/** + * Same as \c ldms_getsockaddr(), but only returns AF_INET4 family. + */ +int ldms_getsockaddr4(const char *host, const char *port, + struct sockaddr *sa, socklen_t *sa_len); + +/** + * Same as \c ldms_getsockaddr(), but only returns AF_INET6 family. + */ +int ldms_getsockaddr6(const char *host, const char *port, + struct sockaddr *sa, socklen_t *sa_len); + /** \} */ /** @@ -1038,19 +1080,13 @@ int ldms_stream_publish_file(ldms_t x, const char *stream_name, typedef struct ldms_stream_client_s *ldms_stream_client_t; typedef struct json_entity_s *json_entity_t; -#pragma pack(push, 1) -typedef union ldms_stream_addr_u { - uint64_t u64; /* access as u64 */ - struct { - union { - uint32_t addr4; /* big endian */ - uint8_t addr[4]; /* convenient addr byte access */ - }; - uint16_t port; /* big endian */ - uint16_t reserved; - }; -} *ldms_stream_addr_t; -#pragma pack(pop) +/* currently only support IPv4 and IPv6 */ +struct ldms_addr { + sa_family_t sa_family; /* host-endian */ + in_port_t sin_port; /* network-endian */ + uint8_t addr[16]; /* addr[0-3] for IPv4, + addr[0-15] for IPv6 */ +}; enum ldms_stream_event_type { LDMS_STREAM_EVENT_RECV, /* stream data received */ @@ -1061,7 +1097,7 @@ enum ldms_stream_event_type { /* For stream data delivery to the application */ struct ldms_stream_recv_data_s { ldms_stream_client_t client; - union ldms_stream_addr_u src; + struct ldms_addr src; uint64_t msg_gn; ldms_stream_type_t type; uint32_t name_len; @@ -1176,7 +1212,7 @@ struct ldms_stream_counters_s { /* stream statistics by src */ struct ldms_stream_src_stats_s { struct rbn rbn; /* key ==> src */ - uint64_t src; + struct ldms_addr src; struct ldms_stream_counters_s rx; /* total rx from src */ }; @@ -1213,7 +1249,7 @@ struct ldms_stream_client_stats_s { struct ldms_stream_counters_s tx; struct ldms_stream_counters_s drops; struct ldms_stream_client_pair_stats_tq_s pair_tq; /* stats by stream */ - union ldms_stream_addr_u dest; + struct ldms_addr dest; int is_regex; const char *match; /* the matching string; allocated with the structure */ const char *desc; /* the short description; allocated with the structure */ diff --git a/ldms/src/core/ldms_rail.c b/ldms/src/core/ldms_rail.c index f7a223454..12c6fbc0f 100644 --- a/ldms/src/core/ldms_rail.c +++ b/ldms/src/core/ldms_rail.c @@ -62,6 +62,7 @@ #include #include +#include #include "ldms.h" #include "ldms_xprt.h" @@ -536,13 +537,12 @@ void __rail_zap_handle_conn_req(zap_ep_t zep, zap_event_t ev) static char rej_msg[64] = "Insufficient resources"; struct ldms_rail_conn_msg_s msg; int rc; - char name[128]; + char name[128]; /* for debug / error */ zap_err_t zerr; struct ldms_xprt *lx = zap_get_ucontext(zep); struct ldms_auth *auth; - struct sockaddr_in self_addr, peer_addr; - socklen_t addr_len; - unsigned char *ip4; /* array of 4 bytes */ + union ldms_sockaddr self_addr, peer_addr; + socklen_t addr_len = sizeof(self_addr); struct ldms_rail_id_s rail_id; struct ldms_rail_s *r, *lr = NULL; struct ldms_rail_ep_s *rep; @@ -557,12 +557,22 @@ void __rail_zap_handle_conn_req(zap_ep_t zep, zap_event_t ev) snprintf(rej_msg, sizeof(rej_msg), "zap_get_name() error: %d", zerr); goto err_0; } - ip4 = (void*)&peer_addr.sin_addr.s_addr; - snprintf(name, sizeof(name), "%hhd.%hhd.%hhd.%hhd", ip4[0], ip4[1], ip4[2], ip4[3]); + switch (peer_addr.sa.sa_family) { /* family is in host-endian */ + case AF_INET: + inet_ntop(peer_addr.sa.sa_family, &peer_addr.sin.sin_addr, + name, sizeof(name)); + break; + case AF_INET6: + inet_ntop(peer_addr.sa.sa_family, &peer_addr.sin6.sin6_addr, + name, sizeof(name)); + break; + default: + break; + } __rail_conn_msg_ntoh(m); - rail_id.ip4_addr = peer_addr.sin_addr.s_addr; + rail_id.ip4_addr = peer_addr.sin.sin_addr.s_addr; rail_id.pid = m->pid; rail_id.rail_gn = m->rail_gn; pthread_mutex_lock(&__rail_mutex); @@ -1316,3 +1326,66 @@ zap_ep_t __rail_get_zap_ep(ldms_t x) return NULL; return r->eps[0].ep->zap_ep; } + +int sockaddr2ldms_addr(struct sockaddr *sa, struct ldms_addr *la) +{ + union ldms_sockaddr *lsa = (void*)sa; + switch (sa->sa_family) { + case AF_INET: + la->sa_family = sa->sa_family; + la->sin_port = lsa->sin.sin_port; + memcpy(&la->addr, &lsa->sin.sin_addr, sizeof(lsa->sin.sin_addr)); + break; + case AF_INET6: + la->sa_family = sa->sa_family; + la->sin_port = lsa->sin6.sin6_port; + memcpy(&la->addr, &lsa->sin6.sin6_addr, sizeof(lsa->sin6.sin6_addr)); + break; + default: + return ENOTSUP; + } + return 0; +} + +const char *sockaddr_ntop(struct sockaddr *sa, char *buff, size_t sz) +{ + union ldms_sockaddr *lsa = (void*)sa; + char tmp[128]; + switch (sa->sa_family) { + case 0: + snprintf(buff, sz, "0.0.0.0:0"); + break; + case AF_INET: + inet_ntop(AF_INET, &lsa->sin.sin_addr, tmp, sizeof(tmp)); + snprintf(buff, sz, "%s:%d", tmp, ntohs(lsa->sin.sin_port)); + break; + case AF_INET6: + inet_ntop(AF_INET6, &lsa->sin6.sin6_addr, tmp, sizeof(tmp)); + snprintf(buff, sz, "[%s]:%d", tmp, ntohs(lsa->sin6.sin6_port)); + break; + default: + snprintf(buff, sz, "__UNSUPPORTED__"); + } + return buff; +} + +const char *ldms_addr_ntop(struct ldms_addr *addr, char *buff, size_t sz) +{ + char tmp[128]; + switch (addr->sa_family) { + case 0: + snprintf(buff, sz, "0.0.0.0:0"); + break; + case AF_INET: + inet_ntop(AF_INET, &addr->addr, tmp, sizeof(tmp)); + snprintf(buff, sz, "%s:%d", tmp, ntohs(addr->sin_port)); + break; + case AF_INET6: + inet_ntop(AF_INET6, &addr->addr, tmp, sizeof(tmp)); + snprintf(buff, sz, "[%s]:%d", tmp, ntohs(addr->sin_port)); + break; + default: + snprintf(buff, sz, "__UNSUPPORTED__"); + } + return buff; +} diff --git a/ldms/src/core/ldms_rail.h b/ldms/src/core/ldms_rail.h index b194f49b5..29ed88e1c 100644 --- a/ldms/src/core/ldms_rail.h +++ b/ldms/src/core/ldms_rail.h @@ -58,6 +58,8 @@ #ifndef __LDMS_RAIL_H__ #define __LDMS_RAIL_H__ #include +#include + #include "ovis_ref/ref.h" #include "coll/rbt.h" #include "ldms.h" @@ -198,4 +200,24 @@ struct rail_proc_id_s { int get_rail_proc_id(struct rail_proc_id_s *out); +/** + * Copy sockaddr \c sa address into \c la. + * + * The sa.sa_family is in host-endian. + * The address and port is in little endian. + * + * \note Only support sockaddr_in and sockaddr_in6. + */ +int sockaddr2ldms_addr(struct sockaddr *sa, struct ldms_addr *la); + +/** + * Wrapper of 'inet_ntop()' for sockaddr. + */ +const char *sockaddr_ntop(struct sockaddr *sa, char *buff, size_t sz); + +/** + * Wrapper of 'inet_ntop()' for ldms_addr. + */ +const char *ldms_addr_ntop(struct ldms_addr *addr, char *buff, size_t sz); + #endif /* __LDMS_RAIL_H__ */ diff --git a/ldms/src/core/ldms_stream.c b/ldms/src/core/ldms_stream.c index 1a3d7995b..0e373821b 100644 --- a/ldms/src/core/ldms_stream.c +++ b/ldms/src/core/ldms_stream.c @@ -66,6 +66,7 @@ #include #include #include +#include #include "ovis_json/ovis_json.h" #include "coll/rbt.h" @@ -95,6 +96,7 @@ void __rate_credit_release(struct ldms_rail_rate_credit_s *c, uint64_t n); int __str_rbn_cmp(void *tree_key, const void *key); int __u64_rbn_cmp(void *tree_key, const void *key); +int __ldms_addr_rbn_cmp(void *tree_key, const void *key); pthread_rwlock_t __stream_rwlock = PTHREAD_RWLOCK_INITIALIZER; #define __STREAM_RDLOCK() pthread_rwlock_rdlock(&__stream_rwlock) @@ -115,7 +117,7 @@ int __rail_rep_send_raw(struct ldms_rail_ep_s *rep, void *data, int len); * lenX are `int`. */ static int __part_send(struct ldms_rail_ep_s *rep, - uint64_t src, uint64_t msg_gn, + struct ldms_addr *src, uint64_t msg_gn, ...) { size_t zmax = rep->rail->max_msg; @@ -133,7 +135,7 @@ static int __part_send(struct ldms_rail_ep_s *rep, req->hdr.cmd = htobe32(LDMS_CMD_STREAM_MSG); req->hdr.xid = 0; - req->stream_part.src = src; /* src is IP4 addr; already big endian */ + req->stream_part.src = *src; /* src is IP4/6 port+addr; already big endian */ req->stream_part.msg_gn = htobe64(msg_gn); req->stream_part.more = 1; req->stream_part.first = 1; @@ -191,7 +193,7 @@ static int __part_send(struct ldms_rail_ep_s *rep, static int __rep_publish(struct ldms_rail_ep_s *rep, const char *stream_name, ldms_stream_type_t stream_type, - uint64_t src, uint64_t msg_gn, + struct ldms_addr *src, uint64_t msg_gn, ldms_cred_t cred, int perm, const char *data, size_t data_len) { @@ -210,14 +212,20 @@ static int __rep_publish(struct ldms_rail_ep_s *rep, const char *stream_name, } /* credit acquired */ - msg.src = src; + if (src) { + msg.src = *src; + msg.src.sa_family = htons(msg.src.sa_family); + /* the rest of msg.src are in network endian */ + } else { + bzero(&msg.src, sizeof(msg.src)); + } msg.msg_gn = htobe64(msg_gn); msg.msg_len = htobe32(name_len + data_len); msg.stream_type = htobe32(stream_type); msg.cred.uid = htobe32(cred->uid); msg.cred.gid = htobe32(cred->gid); msg.perm = htobe32(perm); - rc = __part_send(rep, src, msg_gn, + rc = __part_send(rep, &msg.src, msg_gn, &msg, sizeof(msg), /* msg hdr */ stream_name, name_len, /* name */ data, data_len, /* data */ @@ -238,7 +246,20 @@ __remote_client_cb(ldms_stream_event_t ev, void *cb_arg) if (!XTYPE_IS_RAIL(ev->recv.client->x->xtype)) return ENOTSUP; r = (ldms_rail_t)ev->recv.client->x; - ep_idx = ( be32toh(ev->recv.src.addr4) % primer ) % r->n_eps; + switch (ev->recv.src.sa_family) { + case 0: + ep_idx = 0; + break; + case AF_INET: + ep_idx = ( be32toh(*(int*)&ev->recv.src.addr[0]) % primer ) % r->n_eps; + break; + case AF_INET6: + ep_idx = ( be32toh(*(int*)&ev->recv.src.addr[12]) % primer ) % r->n_eps; + break; + default: + assert(0 == "Unexpected network family"); + ep_idx = 0; + } rc = ldms_access_check(r->eps[ep_idx].ep, LDMS_ACCESS_READ, ev->recv.cred.uid, ev->recv.cred.gid, ev->recv.perm); @@ -250,7 +271,7 @@ __remote_client_cb(ldms_stream_event_t ev, void *cb_arg) goto out; rc = __rep_publish(&r->eps[ep_idx], ev->recv.name, ev->recv.type, - ev->recv.src.u64, ev->recv.msg_gn, + &ev->recv.src, ev->recv.msg_gn, &ev->recv.cred, ev->recv.perm, ev->recv.data, ev->recv.data_len); @@ -293,7 +314,7 @@ __stream_get(const char *stream_name, int *is_new) if (is_new) *is_new = 1; - rbt_init(&s->src_stats_rbt, __u64_rbn_cmp); + rbt_init(&s->src_stats_rbt, __ldms_addr_rbn_cmp); s->rx.first_ts = __TIMESPEC_MAX; s->rx.last_ts = __TIMESPEC_MIN; @@ -389,7 +410,7 @@ void __counters_update(struct ldms_stream_counters_s *ctr, /* deliver stream data to all clients */ /* must NOT hold __stream_mutex */ static int -__stream_deliver(uint64_t src, uint64_t msg_gn, +__stream_deliver(struct ldms_addr *src, uint64_t msg_gn, const char *stream_name, int name_len, ldms_stream_type_t stream_type, ldms_cred_t cred, uint32_t perm, @@ -410,7 +431,7 @@ __stream_deliver(uint64_t src, uint64_t msg_gn, struct ldms_stream_event_s _ev = { .type = LDMS_STREAM_EVENT_RECV, .recv = { - .src = {src}, + .src = {0}, .msg_gn = msg_gn, .type = stream_type, .name_len = name_len, @@ -424,6 +445,9 @@ __stream_deliver(uint64_t src, uint64_t msg_gn, }; json_entity_t json = NULL; + if (src) + _ev.recv.src = *src; + /* update stats */ if (__stream_stats_level <= 0) goto skip_stats; @@ -432,7 +456,7 @@ __stream_deliver(uint64_t src, uint64_t msg_gn, __counters_update(&s->rx, &now, data_len); if (__stream_stats_level > 1) { /* stats by src */ - struct rbn *rbn = rbt_find(&s->src_stats_rbt, &src); + struct rbn *rbn = rbt_find(&s->src_stats_rbt, &_ev.recv.src); struct ldms_stream_src_stats_s *ss; if (rbn) { ss = container_of(rbn, struct ldms_stream_src_stats_s, rbn); @@ -444,7 +468,7 @@ __stream_deliver(uint64_t src, uint64_t msg_gn, pthread_rwlock_unlock(&s->rwlock); goto skip_stats; } - ss->src = src; + ss->src = _ev.recv.src; rbn_init(&ss->rbn, &ss->src); ss->rx = LDMS_STREAM_COUNTERS_INITIALIZER; rbt_ins(&s->src_stats_rbt, &ss->rbn); @@ -965,7 +989,7 @@ int ldms_stream_remote_unsubscribe(ldms_t x, const char *match, int is_regex, } struct __sbuf_key_s { - uint64_t src; + struct ldms_addr src; uint64_t msg_gn; }; @@ -990,6 +1014,11 @@ int __u64_rbn_cmp(void *tree_key, const void *key) return 0; } +int __ldms_addr_rbn_cmp(void *tree_key, const void *key) +{ + return memcmp(tree_key, key, sizeof(struct ldms_addr)); +} + struct __stream_buf_s { struct rbn rbn; struct __sbuf_key_s key; @@ -1013,8 +1042,8 @@ __process_stream_msg(ldms_t x, struct ldms_request *req) struct __stream_buf_s *sbuf; struct ldms_stream_full_msg_s *fmsg; int plen, flen; - struct sockaddr_in lsa, rsa; - socklen_t slen; + union ldms_sockaddr lsa, rsa; + socklen_t slen = sizeof(lsa); int rc; const char *name; int name_len; @@ -1024,18 +1053,38 @@ __process_stream_msg(ldms_t x, struct ldms_request *req) /* src is always big endian */ req->stream_part.msg_gn = be64toh(req->stream_part.msg_gn); plen = be32toh(req->hdr.len) - sizeof(req->hdr) - sizeof(req->stream_part); - if (req->stream_part.src == 0) { + if (req->stream_part.src.sa_family == 0) { /* resolve source */ - rc = ldms_xprt_sockaddr(x, (void*)&lsa, (void*)&rsa, &slen); + rc = ldms_xprt_sockaddr(x, &lsa.sa, &rsa.sa, &slen); if (rc) return; - /* Exclude 127.0.0.0/8 loopbacks. - * In the case of the loopback, the 'src' stays 0 and the - * next level will resolve the src address. - */ - if (*((char*)&rsa.sin_addr.s_addr) != 127) { - req->stream_part.src_addr = rsa.sin_addr.s_addr; - req->stream_part.src_port = rsa.sin_port; + switch (rsa.sa.sa_family) { + case AF_INET: + /* Exclude 127.0.0.0/8 loopbacks. + * In the case of the loopback, the 'src' stays 0 and the + * next level will resolve the src address. + */ + if (*((char*)&rsa.sin.sin_addr.s_addr) != 127) { + req->stream_part.src.sa_family = htons(AF_INET); + memcpy(req->stream_part.src.addr, + &rsa.sin.sin_addr, + sizeof(rsa.sin.sin_addr)); + req->stream_part.src.sin_port = rsa.sin.sin_port; + } + break; + case AF_INET6: + /* Exclude loopbacks */ + if (0 != memcmp(&rsa.sin6.sin6_addr, &in6addr_loopback, + sizeof(struct in6_addr))) { + req->stream_part.src.sa_family = htons(AF_INET6); + memcpy(req->stream_part.src.addr, + &rsa.sin6.sin6_addr, + sizeof(struct in6_addr)); + req->stream_part.src.sin_port = rsa.sin6.sin6_port; + } + break; + default: + break; } } @@ -1080,6 +1129,7 @@ __process_stream_msg(ldms_t x, struct ldms_request *req) goto cleanup; } sbuf->msg->src = req->stream_part.src; + sbuf->msg->src.sa_family = ntohs(sbuf->msg->src.sa_family); sbuf->msg->msg_gn = be64toh(sbuf->msg->msg_gn); sbuf->msg->msg_len = be32toh(sbuf->msg->msg_len); sbuf->msg->stream_type = be32toh(sbuf->msg->stream_type); @@ -1092,7 +1142,7 @@ __process_stream_msg(ldms_t x, struct ldms_request *req) data = sbuf->msg->msg + name_len; data_len = sbuf->msg->msg_len - name_len; - __stream_deliver(sbuf->msg->src, sbuf->msg->msg_gn, + __stream_deliver(&sbuf->msg->src, sbuf->msg->msg_gn, name, name_len, sbuf->msg->stream_type, &sbuf->msg->cred, sbuf->msg->perm, data, data_len); @@ -1113,7 +1163,7 @@ __process_stream_sub(ldms_t x, struct ldms_request *req) int rc; const char *err_msg; int msg_len, reply_len; - struct sockaddr_in lsin, rsin; + union ldms_sockaddr lsin, rsin; socklen_t sin_len = sizeof(lsin); struct { struct ldms_reply r; @@ -1142,13 +1192,14 @@ __process_stream_sub(ldms_t x, struct ldms_request *req) c->rate_credit.rate = be64toh(req->stream_sub.rate); c->rate_credit.credit = be64toh(req->stream_sub.rate); - rc = ldms_xprt_sockaddr(x, (void*)&lsin, (void*)&rsin, &sin_len); + rc = ldms_xprt_sockaddr(x, &lsin.sa, &rsin.sa, &sin_len); if (!rc) { - c->dest.addr4 = rsin.sin_addr.s_addr; - c->dest.port = rsin.sin_port; - c->dest.reserved = 0; + rc = sockaddr2ldms_addr(&rsin.sa, &c->dest); + if (rc) { + bzero(&c->dest, sizeof(c->dest)); + } } else { - c->dest.u64 = -1; + c->dest.sa_family = 0; } c->x = (ldms_t)rep->rail; @@ -1379,7 +1430,7 @@ struct ldms_stream_stats_s * __stream_get_stats(struct ldms_stream_s *s) ss->name = (char*)&ss[1]; memcpy((char*)ss->name, s->name, s->name_len); TAILQ_INIT(&ss->pair_tq); - rbt_init(&ss->src_stats_rbt, __u64_rbn_cmp); + rbt_init(&ss->src_stats_rbt, __ldms_addr_rbn_cmp); LDMS_STREAM_COUNTERS_INIT(&ss->rx); ss->rx = s->rx; @@ -1529,21 +1580,22 @@ int __stream_stats_sources_buff_append(struct ldms_stream_stats_s *stats, int rc; struct rbn *rbn; struct ldms_stream_src_stats_s *src; + struct ldms_addr addr; + char addr_buff[128] = ""; const char *sep = ""; - union ldms_stream_addr_u addr; rc = ovis_buff_appendf(buff, "{"); if (rc) goto out; for (rbn = rbt_min(&stats->src_stats_rbt); rbn; rbn = rbn_succ(rbn)) { src = container_of(rbn, struct ldms_stream_src_stats_s, rbn); - addr.u64 = src->src; - rc = ovis_buff_appendf(buff, "%s\"%hhu.%hhu.%hhu.%hhu:%hu\":", - sep, - addr.addr[0], addr.addr[1], addr.addr[2], addr.addr[3], - be16toh(addr.port)); + addr = src->src; + ldms_addr_ntop(&addr, addr_buff, sizeof(addr_buff)); + rc = ovis_buff_appendf(buff, "%s\"%s\":",sep, addr_buff); if (rc) goto out; rc = __src_stats_buff_append(src, buff); + if (rc) + goto out; sep = ","; } rc = ovis_buff_appendf(buff, "}"); @@ -1791,15 +1843,15 @@ int __client_stats_buff_append(struct ldms_stream_client_stats_s *cs, ovis_buff_t buff) { int rc; + char addr_buff[128]; + ldms_addr_ntop(&cs->dest, addr_buff, sizeof(addr_buff)); rc = ovis_buff_appendf(buff, "{" "\"match\":\"%s\"" ",\"desc\":\"%s\"" - ",\"dest\":\"%hhu.%hhu.%hhu.%hhu:%hu\"", + ",\"dest\":\"%s\"", cs->match, cs->desc, - cs->dest.addr[0], cs->dest.addr[1], - cs->dest.addr[2], cs->dest.addr[3], - be16toh(cs->dest.port)) || + addr_buff) || ovis_buff_appendf(buff, ",\"tx\":") || __counters_buff_append(&cs->tx, buff) || ovis_buff_appendf(buff, ",\"drops\":") || diff --git a/ldms/src/core/ldms_stream.h b/ldms/src/core/ldms_stream.h index bd323103e..d0683bb65 100644 --- a/ldms/src/core/ldms_stream.h +++ b/ldms/src/core/ldms_stream.h @@ -109,7 +109,7 @@ struct ldms_stream_client_s { /* streams that this client subscribed for */ TAILQ_HEAD(, ldms_stream_client_entry_s) stream_tq; - union ldms_stream_addr_u dest; + struct ldms_addr dest; ldms_t x; ldms_stream_event_cb_t cb_fn; @@ -128,7 +128,7 @@ struct ldms_stream_client_s { /* the full stream message */ struct ldms_stream_full_msg_s { - uint64_t src; + struct ldms_addr src; uint64_t msg_gn; uint32_t msg_len; uint32_t stream_type; diff --git a/ldms/src/core/ldms_xprt.c b/ldms/src/core/ldms_xprt.c index a7c8c0e98..fa5b112ba 100644 --- a/ldms/src/core/ldms_xprt.c +++ b/ldms/src/core/ldms_xprt.c @@ -2190,7 +2190,7 @@ int ldms_xprt_names(ldms_t x, char *lcl_name, size_t lcl_name_sz, char *rem_port, size_t rem_port_sz, int flags) { - struct sockaddr lcl, rmt; + struct sockaddr_storage lcl, rmt; socklen_t xlen = sizeof(lcl); int rc; @@ -2205,17 +2205,17 @@ int ldms_xprt_names(ldms_t x, char *lcl_name, size_t lcl_name_sz, memset(&rmt, 0, sizeof(rmt)); memset(&lcl, 0, sizeof(rmt)); - rc = ldms_xprt_sockaddr(x, &lcl, &rmt, &xlen); + rc = ldms_xprt_sockaddr(x, (void*)&lcl, (void*)&rmt, &xlen); if (rc) return rc; if (lcl_name || lcl_port) { - (void) getnameinfo(&lcl, xlen, lcl_name, lcl_name_sz, + (void) getnameinfo((void*)&lcl, xlen, lcl_name, lcl_name_sz, lcl_port, lcl_port_sz, flags); } if (rem_name || rem_port) { - (void)getnameinfo(&lcl, xlen, rem_name, rem_name_sz, + (void)getnameinfo((void*)&lcl, xlen, rem_name, rem_name_sz, rem_port, rem_port_sz, flags); } return 0; @@ -4050,7 +4050,6 @@ int ldms_xprt_connect_by_name(ldms_t x, const char *host, const char *port, { struct addrinfo *ai; struct addrinfo hints = { - .ai_family = AF_INET, .ai_socktype = SOCK_STREAM }; int rc = getaddrinfo(host, port, &hints, &ai); @@ -4093,7 +4092,6 @@ int ldms_xprt_listen_by_name(ldms_t x, const char *host, const char *port_no, struct sockaddr_in sin; struct addrinfo *ai; struct addrinfo hints = { - .ai_family = AF_INET, .ai_socktype = SOCK_STREAM, .ai_flags = AI_PASSIVE, }; diff --git a/ldms/src/core/ldms_xprt.h b/ldms/src/core/ldms_xprt.h index 95db0b07e..253a6389e 100644 --- a/ldms/src/core/ldms_xprt.h +++ b/ldms/src/core/ldms_xprt.h @@ -203,13 +203,7 @@ struct ldms_cancel_push_cmd_param { /* partial stream message; see ldms_stream.h for a full stream message */ struct ldms_stream_part_msg_param { - union { - uint64_t src; - struct { - uint32_t src_addr; - uint32_t src_port; - }; - }; + struct ldms_addr src; uint64_t msg_gn; uint32_t more:1; /* more partial message */ uint32_t first:1; /* first partial message */ diff --git a/ldms/src/ldmsd/ldms_ls.c b/ldms/src/ldmsd/ldms_ls.c index 6404c9682..cb8ab36f1 100644 --- a/ldms/src/ldmsd/ldms_ls.c +++ b/ldms/src/ldmsd/ldms_ls.c @@ -1093,10 +1093,10 @@ int is_in_set_list(const char *name) int main(int argc, char *argv[]) { struct ldms_version version; - struct sockaddr_in sin; + union ldms_sockaddr lsa; + socklen_t sa_len = sizeof(lsa); ldms_t ldms; int ret; - struct hostent *h; char *hostname = strdup("localhost"); unsigned short port_no = LDMS_DEFAULT_PORT; int op; @@ -1222,18 +1222,22 @@ int main(int argc, char *argv[]) } } - h = gethostbyname(hostname); - if (!h) { - herror(argv[0]); + char port_str[16]; + snprintf(port_str, sizeof(port_str), "%d", port_no); + + rc = ldms_getsockaddr(hostname, port_str, &lsa.sa, &sa_len); + switch (rc) { + case 0: + break;/* OK */ + case -ENOENT: printf("%s: %s does not resolve.\n", argv[0], hostname); exit(1); - } - - if (h->h_addrtype != AF_INET) { - printf("%s: -h %s does not provide an AF_INET address.\n", - argv[0], hostname); - printf("%s: please give a proper hostname.\n", argv[0]); + break; + default: + printf("%s: Error '%d' in resolving host '%s', port '%d'.\n", + argv[0], rc, hostname, port_no); exit(1); + break; } /* Initialize LDMS */ @@ -1261,13 +1265,25 @@ int main(int argc, char *argv[]) exit(1); } - memset(&sin, 0, sizeof sin); - sin.sin_addr.s_addr = *(unsigned int *)(h->h_addr_list[0]); - sin.sin_family = h->h_addrtype; - sin.sin_port = htons(port_no); if (verbose > 1) { printf("Hostname : %s\n", hostname); - printf("IP Address : %s\n", inet_ntoa(sin.sin_addr)); + char addr_buff[128]; + switch (lsa.sa.sa_family) { + case AF_INET: + inet_ntop(AF_INET, &lsa.sin.sin_addr, + addr_buff, sizeof(addr_buff)); + printf("IP Address : %s\n", addr_buff); + break; + case AF_INET6: + inet_ntop(AF_INET6, &lsa.sin6.sin6_addr, + addr_buff, sizeof(addr_buff)); + printf("IPv6 Address : %s\n", addr_buff); + break; + default: + printf("Address : __UNSUPPORTED__ (address family: %d)\n", + lsa.sa.sa_family); + break; + } printf("Port : %hu\n", port_no); printf("Transport : %s\n", xprt); } @@ -1275,8 +1291,7 @@ int main(int argc, char *argv[]) free(hostname); xprt = NULL; hostname = NULL; - ret = ldms_xprt_connect(ldms, (struct sockaddr *)&sin, sizeof(sin), - ldms_connect_cb, NULL); + ret = ldms_xprt_connect(ldms, &lsa.sa, sa_len, ldms_connect_cb, NULL); if (ret) { perror("ldms_xprt_connect"); exit(2); diff --git a/ldms/src/ldmsd/ldmsd_config.c b/ldms/src/ldmsd/ldmsd_config.c index ad9c30599..3ec25e201 100644 --- a/ldms/src/ldmsd/ldmsd_config.c +++ b/ldms/src/ldmsd/ldmsd_config.c @@ -1066,30 +1066,13 @@ static void __listen_connect_cb(ldms_t x, ldms_xprt_event_t e, void *cb_arg) int listen_on_ldms_xprt(ldmsd_listen_t listen) { int rc = 0; - struct sockaddr_in sin; - struct addrinfo *ai = NULL; - struct addrinfo ai_hint = { .ai_family = AF_INET, - .ai_flags = AI_PASSIVE }; char port_buff[8]; assert(listen->x); - sin.sin_family = AF_INET; - if (listen->host) { - snprintf(port_buff, sizeof(port_buff), "%hu", listen->port_no); - rc = getaddrinfo(listen->host, port_buff, &ai_hint, &ai); - if (rc) { - ovis_log(NULL, OVIS_LERROR, "xprt listen error, getaddrinfo(%s, %s) error: %d\n", listen->host, port_buff, rc); - return rc; - } - memcpy(&sin, ai->ai_addr, ai->ai_addrlen); - freeaddrinfo(ai); - } else { - sin.sin_addr.s_addr = 0; - sin.sin_port = htons(listen->port_no); - } - rc = ldms_xprt_listen(listen->x, (struct sockaddr *)&sin, sizeof(sin), - __listen_connect_cb, NULL); + snprintf(port_buff, sizeof(port_buff), "%hu", listen->port_no); + rc = ldms_xprt_listen_by_name(listen->x, listen->host, port_buff, + __listen_connect_cb, NULL); if (rc) { ovis_log(NULL, OVIS_LERROR, "Error %d Listening on %s:%d using `%s` transport and " "`%s` authentication\n", rc, listen->xprt, diff --git a/ldms/src/ldmsd/ldmsd_prdcr.c b/ldms/src/ldmsd/ldmsd_prdcr.c index a308932f5..ecad20b90 100644 --- a/ldms/src/ldmsd/ldmsd_prdcr.c +++ b/ldms/src/ldmsd/ldmsd_prdcr.c @@ -74,22 +74,11 @@ static void prdcr_task_cb(ldmsd_task_t task, void *arg); int prdcr_resolve(const char *hostname, unsigned short port_no, struct sockaddr_storage *ss, socklen_t *ss_len) { - struct hostent *h; + char port_str[16]; - h = gethostbyname(hostname); - if (!h) - return -1; + snprintf(port_str, sizeof(port_str), "%d", port_no); - if (h->h_addrtype != AF_INET) - return -1; - - memset(ss, 0, sizeof *ss); - struct sockaddr_in *sin = (struct sockaddr_in *)ss; - sin->sin_addr.s_addr = *(unsigned int *)(h->h_addr_list[0]); - sin->sin_family = h->h_addrtype; - sin->sin_port = htons(port_no); - *ss_len = sizeof(*sin); - return 0; + return ldms_getsockaddr(hostname, port_str, (struct sockaddr*)ss, ss_len); } void ldmsd_prdcr___del(ldmsd_cfgobj_t obj) @@ -823,6 +812,7 @@ ldmsd_prdcr_new_with_auth(const char *name, const char *xprt_name, if (!prdcr->port_no) goto out; + prdcr->ss_len = sizeof(prdcr->ss); if (prdcr_resolve(host_name, port_no, &prdcr->ss, &prdcr->ss_len)) { errno = EAFNOSUPPORT; ovis_log(prdcr_log, OVIS_LERROR, "ldmsd_prdcr_new: %s:%u not resolved.\n", diff --git a/lib/src/zap/sock/zap_sock.c b/lib/src/zap/sock/zap_sock.c index d966185c3..cc66b0a35 100644 --- a/lib/src/zap/sock/zap_sock.c +++ b/lib/src/zap/sock/zap_sock.c @@ -1711,7 +1711,7 @@ static zap_err_t z_sock_listen(zap_ep_t ep, struct sockaddr *sa, goto err_0; /* create a socket */ - sep->sock = socket(AF_INET, SOCK_STREAM, 0); + sep->sock = socket(sa->sa_family, SOCK_STREAM, 0); if (sep->sock == -1) { zerr = ZAP_ERR_RESOURCE; goto err_0;