diff --git a/ldms/src/core/ldms_stream.c b/ldms/src/core/ldms_stream.c index 7392ae6d5..ded9128a7 100644 --- a/ldms/src/core/ldms_stream.c +++ b/ldms/src/core/ldms_stream.c @@ -357,10 +357,11 @@ __client_stream_unbind(struct ldms_stream_client_entry_s *sce) pthread_rwlock_wrlock(&s->rwlock); sce->client = NULL; - /* NOTE: sce will be removed froms->client_tq later */ + TAILQ_REMOVE(&s->client_tq, sce, stream_client_entry); pthread_rwlock_unlock(&s->rwlock); ref_put(&c->ref, "client_entry"); + ref_put(&sce->ref, "stream_client_entry"); } void __counters_update(struct ldms_stream_counters_s *ctr, @@ -447,12 +448,7 @@ __stream_deliver(uint64_t src, uint64_t msg_gn, while (sce) { next_sce = TAILQ_NEXT(sce, stream_client_entry); c = sce->client; - if (!c) { - /* client deregistered, remove the entry */ - TAILQ_REMOVE(&s->client_tq, sce, stream_client_entry); - ref_put(&sce->ref, "stream_client_entry"); - goto next; - } + assert(c); if (!json && stream_type == LDMS_STREAM_JSON && !c->x) { /* json object is only required to parse once for * the local client */ @@ -485,7 +481,6 @@ __stream_deliver(uint64_t src, uint64_t msg_gn, } ref_put(&c->ref, "callback"); pthread_rwlock_rdlock(&s->rwlock); - next: sce = next_sce; } @@ -1342,6 +1337,7 @@ void __stream_stats_free(struct ldms_stream_stats_s *ss) free(ss); } +/* readlock already taken */ struct ldms_stream_stats_s * __stream_get_stats(struct ldms_stream_s *s) { /* s->name_len already includes '\0' */ @@ -1721,15 +1717,19 @@ struct ldms_stream_client_stats_tq_s *ldms_stream_client_stats_tq_get() /* then go through stream-specific clients */ for (rbn = rbt_min(&__stream_rbt); rbn; rbn = rbn_succ(rbn)) { s = container_of(rbn, struct ldms_stream_s, rbn); + pthread_rwlock_rdlock(&s->rwlock); TAILQ_FOREACH(sce, &s->client_tq, stream_client_entry) { cli = sce->client; if (cli->is_regex) continue; /* already handled above */ cs = ldms_stream_client_get_stats(cli); - if (!cs) + if (!cs) { + pthread_rwlock_unlock(&s->rwlock); goto err_0; + } TAILQ_INSERT_TAIL(tq, cs, entry); } + pthread_rwlock_unlock(&s->rwlock); } if (TAILQ_EMPTY(tq)) {