Skip to content

Commit

Permalink
tcp: remove the transport layer reference counting mechanism
Browse files Browse the repository at this point in the history
The upper SP layer takes care not to destroy and endpoint
while it has outstanding pipes.

It's possible that the dueling reference counts between the upper
and lower layers have been responsible for some hangs on shutdown
sometimes seen with other transports.
  • Loading branch information
gdamore committed Nov 25, 2024
1 parent c3620e6 commit a9f542d
Showing 1 changed file with 23 additions and 64 deletions.
87 changes: 23 additions & 64 deletions src/sp/transport/tcp/tcp.c
Original file line number Diff line number Diff line change
Expand Up @@ -23,47 +23,42 @@ typedef struct tcptran_ep tcptran_ep;

// tcp_pipe is one end of a TCP connection.
struct tcptran_pipe {
nng_stream *conn;
nni_pipe *npipe;
uint16_t peer;
uint16_t proto;
size_t rcvmax;
bool closed;
nni_list_node node;
tcptran_ep *ep;
nni_atomic_flag reaped;
nni_reap_node reap;
uint8_t txlen[sizeof(uint64_t)];
uint8_t rxlen[sizeof(uint64_t)];
size_t gottxhead;
size_t gotrxhead;
size_t wanttxhead;
size_t wantrxhead;
nni_list recvq;
nni_list sendq;
nni_aio *txaio;
nni_aio *rxaio;
nni_aio *negoaio;
nni_msg *rxmsg;
nni_mtx mtx;
nng_stream *conn;
nni_pipe *npipe;
uint16_t peer;
uint16_t proto;
size_t rcvmax;
bool closed;
nni_list_node node;
tcptran_ep *ep;
uint8_t txlen[sizeof(uint64_t)];
uint8_t rxlen[sizeof(uint64_t)];
size_t gottxhead;
size_t gotrxhead;
size_t wanttxhead;
size_t wantrxhead;
nni_list recvq;
nni_list sendq;
nni_aio *txaio;
nni_aio *rxaio;
nni_aio *negoaio;
nni_msg *rxmsg;
nni_mtx mtx;
};

struct tcptran_ep {
nni_mtx mtx;
uint16_t proto;
size_t rcvmax;
bool fini;
bool started;
bool closed;
const char *host; // for dialers
int refcnt; // active pipes
const char *host; // for dialers
nni_aio *useraio;
nni_aio *connaio;
nni_aio *timeaio;
nni_list busypipes; // busy pipes -- ones passed to socket
nni_list waitpipes; // pipes waiting to match to socket
nni_list negopipes; // pipes busy negotiating
nni_reap_node reap;
nng_stream_dialer *dialer;
nng_stream_listener *listener;

Expand All @@ -80,16 +75,6 @@ static void tcptran_pipe_nego_cb(void *);
static void tcptran_ep_fini(void *);
static void tcptran_pipe_fini(void *);

static nni_reap_list tcptran_ep_reap_list = {
.rl_offset = offsetof(tcptran_ep, reap),
.rl_func = tcptran_ep_fini,
};

static nni_reap_list tcptran_pipe_reap_list = {
.rl_offset = offsetof(tcptran_pipe, reap),
.rl_func = tcptran_pipe_fini,
};

static void
tcptran_init(void)
{
Expand Down Expand Up @@ -145,10 +130,6 @@ tcptran_pipe_fini(void *arg)
if ((ep = p->ep) != NULL) {
nni_mtx_lock(&ep->mtx);
nni_list_node_remove(&p->node);
ep->refcnt--;
if (ep->fini && (ep->refcnt == 0)) {
nni_reap(&tcptran_ep_reap_list, ep);
}
nni_mtx_unlock(&ep->mtx);
}

Expand All @@ -161,17 +142,6 @@ tcptran_pipe_fini(void *arg)
NNI_FREE_STRUCT(p);
}

static void
tcptran_pipe_reap(tcptran_pipe *p)
{
if (!nni_atomic_flag_test_and_set(&p->reaped)) {
if (p->conn != NULL) {
nng_stream_close(p->conn);
}
nni_reap(&tcptran_pipe_reap_list, p);
}
}

static int
tcptran_pipe_alloc(tcptran_pipe **pipep)
{
Expand All @@ -191,7 +161,6 @@ tcptran_pipe_alloc(tcptran_pipe **pipep)
}
nni_aio_list_init(&p->recvq);
nni_aio_list_init(&p->sendq);
nni_atomic_flag_reset(&p->reaped);

*pipep = p;

Expand Down Expand Up @@ -294,8 +263,7 @@ tcptran_pipe_nego_cb(void *arg)
}
nni_list_remove(&ep->negopipes, p);
nni_mtx_unlock(&ep->mtx);

tcptran_pipe_reap(p);
tcptran_pipe_fini(p);

Check warning on line 266 in src/sp/transport/tcp/tcp.c

View check run for this annotation

Codecov / codecov/patch

src/sp/transport/tcp/tcp.c#L266

Added line #L266 was not covered by tests
}

static void
Expand Down Expand Up @@ -641,8 +609,6 @@ tcptran_pipe_start(tcptran_pipe *p, nng_stream *conn, tcptran_ep *ep)
{
nni_iov iov;

ep->refcnt++;

p->conn = conn;
p->ep = ep;
p->proto = ep->proto;
Expand Down Expand Up @@ -672,13 +638,6 @@ tcptran_ep_fini(void *arg)
{
tcptran_ep *ep = arg;

nni_mtx_lock(&ep->mtx);
ep->fini = true;
if (ep->refcnt != 0) {
nni_mtx_unlock(&ep->mtx);
return;
}
nni_mtx_unlock(&ep->mtx);
nni_aio_stop(ep->timeaio);
nni_aio_stop(ep->connaio);
nng_stream_dialer_free(ep->dialer);
Expand Down

0 comments on commit a9f542d

Please sign in to comment.