Skip to content

Commit

Permalink
IPv6 support in LDMS stack and Stream
Browse files Browse the repository at this point in the history
- Add IPv6 support to LDMS library.
- Update LDMS Python library.
- Update ldms_ls, ldmsd, ldmsd_prdcr to not assume IPv4.
- Fix 'zap_sock' to not assume IPv4.
  • Loading branch information
narategithub committed Aug 30, 2023
1 parent 064fe78 commit 750f3e9
Show file tree
Hide file tree
Showing 14 changed files with 474 additions and 180 deletions.
31 changes: 18 additions & 13 deletions ldms/python/ldms.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,25 @@ cdef extern from * nogil:
uint64_t be64toh(uint64_t x)
uint64_t htobe64(uint64_t x)

enum:
AF_INET,
AF_INET6,

struct sockaddr:
uint16_t sa_family
pass
struct sockaddr_storage:
pass
struct in_addr:
uint32_t s_addr
struct sockaddr_in:
uint16_t sin_port
in_addr sin_addr
struct in6_addr:
pass
struct sockaddr_in6:
uint16_t sin6_port
in6_addr sin6_addr
ctypedef uint32_t socklen_t

cdef extern from "errno.h" nogil:
Expand Down Expand Up @@ -698,15 +710,13 @@ cdef extern from "ldms.h" nogil:
LDMS_STREAM_EVENT_RECV
LDMS_STREAM_EVENT_SUBSCRIBE_STATUS
LDMS_STREAM_EVENT_UNSUBSCRIBE_STATUS
struct ldms_stream_src_u:
uint64_t u64;
uint32_t addr4;
struct ldms_addr:
uint8_t addr[4];
uint16_t port;
uint16_t reserved;
uint16_t sin_port;
uint16_t sa_family;
struct ldms_stream_recv_data_s:
ldms_stream_client_t client
ldms_stream_src_u src
ldms_addr src
uint64_t msg_gn
ldms_stream_type_e type
uint32_t name_len
Expand Down Expand Up @@ -744,18 +754,13 @@ cdef extern from "ldms.h" nogil:
ldms_stream_event_cb_t cb_fn, void *cb_arg)

# -- ldms_stream_stats -- #
union ldms_stream_addr_u:
uint64_t u64
uint8_t addr[4]
uint32_t addr4
uint16_t port
struct ldms_stream_counters_s:
timespec first_ts
timespec last_ts
uint64_t count
size_t bytes
struct ldms_stream_src_stats_s:
uint64_t src
ldms_addr src
ldms_stream_counters_s rx
struct ldms_stream_client_pair_stats_s:
const char *stream_name
Expand All @@ -777,7 +782,7 @@ cdef extern from "ldms.h" nogil:
ldms_stream_counters_s tx
ldms_stream_counters_s drops
ldms_stream_client_pair_stats_tq_s pair_tq
ldms_stream_addr_u dest
ldms_addr dest
int is_regex
const char *match
const char *desc
Expand Down
123 changes: 89 additions & 34 deletions ldms/python/ldms.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ import os
import sys
import copy
import json
import socket
import threading
from queue import Queue, Empty
cimport cython
Expand Down Expand Up @@ -3660,23 +3661,16 @@ cdef class Xprt(object):
if rc:
raise StreamSubscribeError(f"ldms_stream_remote_unsubscribe() error, rc: {rc}")

def get_sockaddr(self):
def get_addr(self):
"""Get the local socket Internet address in ((LOCAL_ADDR, LOCAL_PORT), (REMOTE_ADDR, REMOTE_PORT))"""
cdef sockaddr_in lcl, rmt
cdef sockaddr_storage lcl, rmt
cdef socklen_t slen = sizeof(lcl)
cdef int rc
rc = ldms_xprt_sockaddr(self.xprt, <sockaddr*>&lcl, <sockaddr*>&rmt, &slen)
if rc:
raise RuntimeError(f"ldms_xprt_sockaddr() error, rc: {rc}")
return ( (lcl.sin_addr.s_addr, lcl.sin_port) ,
(rmt.sin_addr.s_addr, rmt.sin_port) )

def get_stream_addr(self):
"""Get (LOCAL, REMOTE) addresses in StreamAddr format"""
lcl, rmt = self.get_sockaddr()
lcl = StreamAddr.from_addr_port(*lcl)
rmt = StreamAddr.from_addr_port(*rmt)
return ( lcl, rmt )
return ( LdmsAddr.from_sockaddr(PTR(&lcl)),
LdmsAddr.from_sockaddr(PTR(&rmt)) )


cdef class _StreamSubCtxt(object):
Expand Down Expand Up @@ -3724,11 +3718,90 @@ class StreamSubscribeError(Exception):

StreamDataAttrs = [ "raw_data", "data", "src", "name", "is_json", "uid", "gid", "perm", "tid" ]

cdef class LdmsAddr(object):
cdef public int family
cdef public int port
cdef public bytes addr

def __init__(self, family = 0, port = 0, addr = b'\x00'*16):
self.family = family
self.addr = addr
self.port = port

def __repr__(self):
return f"LdmsAddr( {self.family}, {self.port}, {self.addr} )"

def __str__(self):
cdef char buff[128]
if self.family in [ AF_INET, AF_INET6 ]:
addr = socket.inet_ntop(self.family, self.addr)
else:
addr = "UNSUPPORTED"
return f"[{addr}]:{self.port}"

@classmethod
def from_ldms_addr(cls, Ptr addr_ptr):
cdef ldms_addr *addr = <ldms_addr*>addr_ptr.c_ptr
if addr.sa_family == AF_INET:
addr_bytes = addr.addr[:4]
elif addr.sa_family == AF_INET6:
addr_bytes = addr.addr[:16]
elif addr.sa_family == 0:
addr_bytes = b'\x00'*16
else:
raise RuntimeError(f"Unsupported address family: {addr.sa_family}")
return LdmsAddr(addr.sa_family, be16toh(addr.sin_port), addr_bytes)

@classmethod
def from_sockaddr(cls, Ptr addr_ptr):
cdef sockaddr *sa = <sockaddr*>addr_ptr.c_ptr
cdef sockaddr_in *sin = <sockaddr_in*>addr_ptr.c_ptr
cdef sockaddr_in6 *sin6 = <sockaddr_in6*>addr_ptr.c_ptr
cdef char *addr
if sa.sa_family == AF_INET:
addr = <char*>&sin.sin_addr
return LdmsAddr(AF_INET, be16toh(sin.sin_port), addr[:4])
elif sa.sa_family == AF_INET6:
addr = <char*>&sin6.sin6_addr
return LdmsAddr(AF_INET6, be16toh(sin6.sin6_port), addr[:16])
elif sa.sa_family == 0:
return LdmsAddr(0, 0, b'\x00'*16)
else:
raise RuntimeError(f"Unsupported address family: {sa.sa_family}")

def as_tuple(self):
return tuple( self.family, self.port, self.addr )

def __iter__(self):
yield self.family
yield self.port
yield self.addr

def __eq__(self, other):
for a, b in zip(self, other):
if a != b:
return False
return True

def __lt__(self, other):
for a, b in zip(self, other):
if a is None:
if b is None:
continue
return True
if b is None:
return False
if a < b:
return True
if a > b:
return False
return False

cdef class StreamData(object):
"""Stream Data"""
cdef public bytes raw_data # bytes raw data
cdef public object data # `str` (for STRING) or `dict` (for JSON)
cdef public uint64_t src # stream originator
cdef public LdmsAddr src # stream originator
cdef public str name # stream name
cdef public int is_json # data is JSON
cdef public int uid # uid of the original publisher
Expand All @@ -3751,7 +3824,7 @@ cdef class StreamData(object):
return str(self.data)

def __repr__(self):
return f"StreamData('{self.name}', {hex(self.src)}, " \
return f"StreamData('{self.name}', {repr(self.src)}, " \
f"{self.tid}, {self.uid}, {self.gid}, {oct(self.perm)}, " \
f"{self.is_json}, {repr(self.data)})"

Expand All @@ -3777,7 +3850,7 @@ cdef class StreamData(object):
is_json = False
data = raw_data.decode()
name = ev.recv.name.decode()
src = ev.recv.src.u64
src = LdmsAddr.from_ldms_addr(PTR(&ev.recv.src))
uid = ev.recv.cred.uid
gid = ev.recv.cred.gid
perm = ev.recv.perm
Expand All @@ -3794,24 +3867,6 @@ cdef int __stream_client_cb(ldms_stream_event_t ev, void *arg) with gil:
c.data_q.put(sdata)
return 0

StreamAddr = namedtuple('StreamAddr', ['addr', 'port'])
def _from_ptr(cls, Ptr ptr):
cdef ldms_stream_addr_u *addr = <ldms_stream_addr_u*>ptr.c_ptr
return cls( tuple([addr.addr[0], addr.addr[1], addr.addr[2], addr.addr[3]]),
be16toh(addr.port) )
StreamAddr.from_ptr = classmethod(_from_ptr)
del _from_ptr
def _from_addr_port(cls, addr, port):
"""From addr, port in Network byte format"""
_b = struct.pack('=LH', addr, port)
_u = struct.unpack('>BBBBH', _b)
_addr = _u[0:4]
_port = _u[4]
return cls(_addr, _port)
StreamAddr.from_addr_port = classmethod(_from_addr_port)
del _from_addr_port
StreamAddr.__str__ = lambda s: ".".join(s.addr)+f":{s.port}"

TimeSpec = namedtuple('TimeSpec', [ 'tv_sec', 'tv_nsec' ])
def _from_ptr(cls, Ptr ptr):
cdef timespec *ts = <timespec*>ptr.c_ptr
Expand All @@ -3833,7 +3888,7 @@ del _from_ptr
StreamSrcStats = namedtuple('StreamSrcStats', ['src', 'rx'])
def _from_ptr(cls, Ptr ptr):
cdef ldms_stream_src_stats_s *ss = <ldms_stream_src_stats_s *>ptr.c_ptr
src = StreamAddr.from_ptr(PTR(&ss.src))
src = LdmsAddr.from_ldms_addr(PTR(&ss.src))
rx = StreamCounters.from_ptr(PTR(&ss.rx))
return cls(src, rx)
StreamSrcStats.from_ptr = classmethod(_from_ptr)
Expand Down Expand Up @@ -3884,7 +3939,7 @@ def _from_ptr(cls, Ptr ptr):
cdef ldms_stream_client_pair_stats_s *ps
tx = StreamCounters.from_ptr(PTR(&cs.tx))
drops = StreamCounters.from_ptr(PTR(&cs.drops))
dest = StreamAddr.from_ptr(PTR(&cs.dest))
dest = LdmsAddr.from_ldms_addr(PTR(&cs.dest))
ps = __STREAM_CLIENT_PAIR_STATS_TQ_FIRST(&cs.pair_tq)
streams = list()
while ps:
Expand Down
71 changes: 71 additions & 0 deletions ldms/src/core/ldms.c
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,15 @@
#include <sys/user.h>
#include <sys/stat.h>
#include <sys/time.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <limits.h>
#include <assert.h>
#include <mmalloc/mmalloc.h>
#include <pthread.h>
#include <asm/byteorder.h>
#include <ctype.h>
#include <netdb.h>
#include "ovis_util/os_util.h"
#include "ldms.h"
#include "ldms_xprt.h"
Expand Down Expand Up @@ -5045,3 +5047,72 @@ ldms_schema_t ldms_schema_from_template(const char *name,
ldms_schema_delete(sch);
return NULL;
}

static int __getsockaddr(const char *host, const char *port,
const struct addrinfo *hints,
struct sockaddr *sa, socklen_t *sa_len)
{
struct addrinfo *ai;
int rc;
rc = getaddrinfo(host, port, hints, &ai);
switch (rc) {
case 0:
break;
case EAI_ADDRFAMILY:
return -EADDRNOTAVAIL;
case EAI_AGAIN:
return -EAGAIN;
case EAI_BADFLAGS:
return -EINVAL;
case EAI_FAIL:
return -EFAULT;
case EAI_FAMILY:
return -EAFNOSUPPORT;
case EAI_MEMORY:
return -ENOMEM;
case EAI_NODATA:
return -ENODATA;
case EAI_NONAME:
return -ENOENT;
case EAI_SERVICE:
return -EPROTONOSUPPORT;
case EAI_SOCKTYPE:
return -EPROTOTYPE;
case EAI_SYSTEM:
return -errno;
default:
return -EINVAL;
}
if (!ai)
return -ENOENT;
if (ai->ai_addrlen > *sa_len) {
rc = -ENOBUFS;
} else {
*sa_len = ai->ai_addrlen;
memcpy(sa, ai->ai_addr, ai->ai_addrlen);
rc = 0;
}
freeaddrinfo(ai);
return rc;
}

int ldms_getsockaddr4(const char *host, const char *port,
struct sockaddr *sa, socklen_t *sa_len)
{
struct addrinfo hints = { .ai_family = AF_INET, .ai_socktype = SOCK_STREAM };
return __getsockaddr(host, port, &hints, sa, sa_len);
}

int ldms_getsockaddr6(const char *host, const char *port,
struct sockaddr *sa, socklen_t *sa_len)
{
struct addrinfo hints = { .ai_family = AF_INET6, .ai_socktype = SOCK_STREAM };
return __getsockaddr(host, port, &hints, sa, sa_len);
}

int ldms_getsockaddr(const char *host, const char *port,
struct sockaddr *sa, socklen_t *sa_len)
{
struct addrinfo hints = { .ai_socktype = SOCK_STREAM };
return __getsockaddr(host, port, &hints, sa, sa_len);
}
Loading

0 comments on commit 750f3e9

Please sign in to comment.