Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add rx_rate to prdcr_add and prdcr_subscribe + IPv6 support #1267

Merged
merged 7 commits into from
Sep 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 19 additions & 4 deletions ldms/man/ldmsd_controller.man
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,12 @@ The send credits our ldmsd (the one we are controlling) advertises to the prdcr
(default: value from ldmsd --credits option). This limits how much outstanding
data our ldmsd holds for the prdcr. The prdcr drops messages when it does not
have enough send credits.
.TP
.BI [rx_rate " BYTES_PER_SEC"]
.br
The recv rate (bytes/sec) limit for this connection. The default is -1
(unlimited).

.RE

.SS Delete a producer from the aggregator
Expand Down Expand Up @@ -771,14 +777,23 @@ The stream name
The data to publish
.RE

.SS Subscribe to a stream
.BR subscribe
.SS Subscribe to a stream on matching producers
.BR prdcr_subscribe
attr=<value>
.RS
.TP
.BI name " name"
.BI regex " PRDCR_REGEX"
.br
The stream name
A regular expression matching PRODUCER names
.TP
.BI stream " STREAM_NAME_OR_REGEX"
.br
The stream name or regular expression
.TP
.BI [rx_rate " BYTES_PER_SECOND"]
.br
The recv rate (bytes/sec) limit for the matching streams. The default is -1
(unlimited).
.RE

.SH LDMS DAEMON COMMAND SYNTAX
Expand Down
34 changes: 20 additions & 14 deletions ldms/python/ldms.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,25 @@ cdef extern from * nogil:
uint64_t be64toh(uint64_t x)
uint64_t htobe64(uint64_t x)

enum:
AF_INET,
AF_INET6,

struct sockaddr:
uint16_t sa_family
pass
struct sockaddr_storage:
pass
struct in_addr:
uint32_t s_addr
struct sockaddr_in:
uint16_t sin_port
in_addr sin_addr
struct in6_addr:
pass
struct sockaddr_in6:
uint16_t sin6_port
in6_addr sin6_addr
ctypedef uint32_t socklen_t

cdef extern from "errno.h" nogil:
Expand Down Expand Up @@ -698,15 +710,13 @@ cdef extern from "ldms.h" nogil:
LDMS_STREAM_EVENT_RECV
LDMS_STREAM_EVENT_SUBSCRIBE_STATUS
LDMS_STREAM_EVENT_UNSUBSCRIBE_STATUS
struct ldms_stream_src_u:
uint64_t u64;
uint32_t addr4;
struct ldms_addr:
uint8_t addr[4];
uint16_t port;
uint16_t reserved;
uint16_t sin_port;
uint16_t sa_family;
struct ldms_stream_recv_data_s:
ldms_stream_client_t client
ldms_stream_src_u src
ldms_addr src
uint64_t msg_gn
ldms_stream_type_e type
uint32_t name_len
Expand Down Expand Up @@ -738,23 +748,19 @@ cdef extern from "ldms.h" nogil:
const char *desc)
void ldms_stream_close(ldms_stream_client_t c)
int ldms_stream_remote_subscribe(ldms_t x, const char *stream, int is_regex,
ldms_stream_event_cb_t cb_fn, void *cb_arg)
ldms_stream_event_cb_t cb_fn, void *cb_arg,
int64_t rx_rate)
int ldms_stream_remote_unsubscribe(ldms_t x, const char *stream, int is_regex,
ldms_stream_event_cb_t cb_fn, void *cb_arg)

# -- ldms_stream_stats -- #
union ldms_stream_addr_u:
uint64_t u64
uint8_t addr[4]
uint32_t addr4
uint16_t port
struct ldms_stream_counters_s:
timespec first_ts
timespec last_ts
uint64_t count
size_t bytes
struct ldms_stream_src_stats_s:
uint64_t src
ldms_addr src
ldms_stream_counters_s rx
struct ldms_stream_client_pair_stats_s:
const char *stream_name
Expand All @@ -776,7 +782,7 @@ cdef extern from "ldms.h" nogil:
ldms_stream_counters_s tx
ldms_stream_counters_s drops
ldms_stream_client_pair_stats_tq_s pair_tq
ldms_stream_addr_u dest
ldms_addr dest
int is_regex
const char *match
const char *desc
Expand Down
134 changes: 95 additions & 39 deletions ldms/python/ldms.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ import os
import sys
import copy
import json
import socket
import threading
from queue import Queue, Empty
cimport cython
Expand Down Expand Up @@ -3573,7 +3574,7 @@ cdef class Xprt(object):
if rc:
raise RuntimeError(f"ldms_stream_publish() failed, rc: {rc}")

def stream_subscribe(self, match, is_regex, cb=None, cb_arg=None):
def stream_subscribe(self, match, is_regex, cb=None, cb_arg=None, rx_rate=-1):
"""r.stream_subscribe(match, is_regex, cb=None, cb_arg=None)

`cb()` signature: `cb(StreamStatusEvent ev, object cb_arg)`
Expand Down Expand Up @@ -3608,6 +3609,7 @@ cdef class Xprt(object):
cdef _StreamSubCtxt ctxt = _StreamSubCtxt()
cdef const char *c_match
cdef int c_is_regex
cdef int64_t c_rx_rate = rx_rate
cdef bytes tmp
ctxt.cb = cb
ctxt.cb_arg = cb_arg
Expand All @@ -3619,7 +3621,7 @@ cdef class Xprt(object):
sem_init(&ctxt.sem, 0, 0)
with nogil:
rc = ldms_stream_remote_subscribe(self.xprt, c_match, c_is_regex,
__stream_block_cb, <void*>ctxt)
__stream_block_cb, <void*>ctxt, c_rx_rate)
if rc:
with gil:
raise StreamSubscribeError(f"ldms_stream_remote_subscribe() error, rc: {rc}")
Expand All @@ -3630,7 +3632,7 @@ cdef class Xprt(object):
Py_INCREF(ctxt)
with nogil:
rc = ldms_stream_remote_subscribe(self.xprt, c_match, c_is_regex,
__stream_wrap_cb, <void*>ctxt)
__stream_wrap_cb, <void*>ctxt, c_rx_rate)
if rc:
raise StreamSubscribeError(f"ldms_stream_remote_subscribe() error, rc: {rc}")

Expand All @@ -3648,7 +3650,7 @@ cdef class Xprt(object):
rc = ldms_stream_remote_unsubscribe(self.xprt, BYTES(match), is_regex,
__stream_block_cb, <void*>ctxt)
if rc:
raise StreamSubscribeError(f"ldms_stream_remote_subscribe() error, rc: {rc}")
raise StreamSubscribeError(f"ldms_stream_remote_unsubscribe() error, rc: {rc}")
sem_wait(&ctxt.sem)
if ctxt.rc:
raise StreamSubscribeError(f"stream remote subscription error: {ctxt.rc}")
Expand All @@ -3657,25 +3659,18 @@ cdef class Xprt(object):
rc = ldms_stream_remote_unsubscribe(self.xprt, BYTES(match), is_regex,
__stream_wrap_cb, <void*>ctxt)
if rc:
raise StreamSubscribeError(f"ldms_stream_remote_subscribe() error, rc: {rc}")
raise StreamSubscribeError(f"ldms_stream_remote_unsubscribe() error, rc: {rc}")

def get_sockaddr(self):
def get_addr(self):
"""Get the local socket Internet address in ((LOCAL_ADDR, LOCAL_PORT), (REMOTE_ADDR, REMOTE_PORT))"""
cdef sockaddr_in lcl, rmt
cdef sockaddr_storage lcl, rmt
cdef socklen_t slen = sizeof(lcl)
cdef int rc
rc = ldms_xprt_sockaddr(self.xprt, <sockaddr*>&lcl, <sockaddr*>&rmt, &slen)
if rc:
raise RuntimeError(f"ldms_xprt_sockaddr() error, rc: {rc}")
return ( (lcl.sin_addr.s_addr, lcl.sin_port) ,
(rmt.sin_addr.s_addr, rmt.sin_port) )

def get_stream_addr(self):
"""Get (LOCAL, REMOTE) addresses in StreamAddr format"""
lcl, rmt = self.get_sockaddr()
lcl = StreamAddr.from_addr_port(*lcl)
rmt = StreamAddr.from_addr_port(*rmt)
return ( lcl, rmt )
return ( LdmsAddr.from_sockaddr(PTR(&lcl)),
LdmsAddr.from_sockaddr(PTR(&rmt)) )


cdef class _StreamSubCtxt(object):
Expand Down Expand Up @@ -3723,11 +3718,90 @@ class StreamSubscribeError(Exception):

StreamDataAttrs = [ "raw_data", "data", "src", "name", "is_json", "uid", "gid", "perm", "tid" ]

cdef class LdmsAddr(object):
cdef public int family
cdef public int port
cdef public bytes addr

def __init__(self, family = 0, port = 0, addr = b'\x00'*16):
self.family = family
self.addr = addr
self.port = port

def __repr__(self):
return f"LdmsAddr( {self.family}, {self.port}, {self.addr} )"

def __str__(self):
cdef char buff[128]
if self.family in [ AF_INET, AF_INET6 ]:
addr = socket.inet_ntop(self.family, self.addr)
else:
addr = "UNSUPPORTED"
return f"[{addr}]:{self.port}"

@classmethod
def from_ldms_addr(cls, Ptr addr_ptr):
cdef ldms_addr *addr = <ldms_addr*>addr_ptr.c_ptr
if addr.sa_family == AF_INET:
addr_bytes = addr.addr[:4]
elif addr.sa_family == AF_INET6:
addr_bytes = addr.addr[:16]
elif addr.sa_family == 0:
addr_bytes = b'\x00'*16
else:
raise RuntimeError(f"Unsupported address family: {addr.sa_family}")
return LdmsAddr(addr.sa_family, be16toh(addr.sin_port), addr_bytes)

@classmethod
def from_sockaddr(cls, Ptr addr_ptr):
cdef sockaddr *sa = <sockaddr*>addr_ptr.c_ptr
cdef sockaddr_in *sin = <sockaddr_in*>addr_ptr.c_ptr
cdef sockaddr_in6 *sin6 = <sockaddr_in6*>addr_ptr.c_ptr
cdef char *addr
if sa.sa_family == AF_INET:
addr = <char*>&sin.sin_addr
return LdmsAddr(AF_INET, be16toh(sin.sin_port), addr[:4])
elif sa.sa_family == AF_INET6:
addr = <char*>&sin6.sin6_addr
return LdmsAddr(AF_INET6, be16toh(sin6.sin6_port), addr[:16])
elif sa.sa_family == 0:
return LdmsAddr(0, 0, b'\x00'*16)
else:
raise RuntimeError(f"Unsupported address family: {sa.sa_family}")

def as_tuple(self):
return tuple( self.family, self.port, self.addr )

def __iter__(self):
yield self.family
yield self.port
yield self.addr

def __eq__(self, other):
for a, b in zip(self, other):
if a != b:
return False
return True

def __lt__(self, other):
for a, b in zip(self, other):
if a is None:
if b is None:
continue
return True
if b is None:
return False
if a < b:
return True
if a > b:
return False
return False

cdef class StreamData(object):
"""Stream Data"""
cdef public bytes raw_data # bytes raw data
cdef public object data # `str` (for STRING) or `dict` (for JSON)
cdef public uint64_t src # stream originator
cdef public LdmsAddr src # stream originator
cdef public str name # stream name
cdef public int is_json # data is JSON
cdef public int uid # uid of the original publisher
Expand All @@ -3750,7 +3824,7 @@ cdef class StreamData(object):
return str(self.data)

def __repr__(self):
return f"StreamData('{self.name}', {hex(self.src)}, " \
return f"StreamData('{self.name}', {repr(self.src)}, " \
f"{self.tid}, {self.uid}, {self.gid}, {oct(self.perm)}, " \
f"{self.is_json}, {repr(self.data)})"

Expand All @@ -3776,7 +3850,7 @@ cdef class StreamData(object):
is_json = False
data = raw_data.decode()
name = ev.recv.name.decode()
src = ev.recv.src.u64
src = LdmsAddr.from_ldms_addr(PTR(&ev.recv.src))
uid = ev.recv.cred.uid
gid = ev.recv.cred.gid
perm = ev.recv.perm
Expand All @@ -3793,24 +3867,6 @@ cdef int __stream_client_cb(ldms_stream_event_t ev, void *arg) with gil:
c.data_q.put(sdata)
return 0

StreamAddr = namedtuple('StreamAddr', ['addr', 'port'])
def _from_ptr(cls, Ptr ptr):
cdef ldms_stream_addr_u *addr = <ldms_stream_addr_u*>ptr.c_ptr
return cls( tuple([addr.addr[0], addr.addr[1], addr.addr[2], addr.addr[3]]),
be16toh(addr.port) )
StreamAddr.from_ptr = classmethod(_from_ptr)
del _from_ptr
def _from_addr_port(cls, addr, port):
"""From addr, port in Network byte format"""
_b = struct.pack('=LH', addr, port)
_u = struct.unpack('>BBBBH', _b)
_addr = _u[0:4]
_port = _u[4]
return cls(_addr, _port)
StreamAddr.from_addr_port = classmethod(_from_addr_port)
del _from_addr_port
StreamAddr.__str__ = lambda s: ".".join(s.addr)+f":{s.port}"

TimeSpec = namedtuple('TimeSpec', [ 'tv_sec', 'tv_nsec' ])
def _from_ptr(cls, Ptr ptr):
cdef timespec *ts = <timespec*>ptr.c_ptr
Expand All @@ -3832,7 +3888,7 @@ del _from_ptr
StreamSrcStats = namedtuple('StreamSrcStats', ['src', 'rx'])
def _from_ptr(cls, Ptr ptr):
cdef ldms_stream_src_stats_s *ss = <ldms_stream_src_stats_s *>ptr.c_ptr
src = StreamAddr.from_ptr(PTR(&ss.src))
src = LdmsAddr.from_ldms_addr(PTR(&ss.src))
rx = StreamCounters.from_ptr(PTR(&ss.rx))
return cls(src, rx)
StreamSrcStats.from_ptr = classmethod(_from_ptr)
Expand Down Expand Up @@ -3883,7 +3939,7 @@ def _from_ptr(cls, Ptr ptr):
cdef ldms_stream_client_pair_stats_s *ps
tx = StreamCounters.from_ptr(PTR(&cs.tx))
drops = StreamCounters.from_ptr(PTR(&cs.drops))
dest = StreamAddr.from_ptr(PTR(&cs.dest))
dest = LdmsAddr.from_ldms_addr(PTR(&cs.dest))
ps = __STREAM_CLIENT_PAIR_STATS_TQ_FIRST(&cs.pair_tq)
streams = list()
while ps:
Expand Down
Loading