From a9f542dcbc58167ec6e452b7de6fa875fbce7b9a Mon Sep 17 00:00:00 2001 From: Garrett D'Amore Date: Sun, 24 Nov 2024 18:42:00 -0800 Subject: [PATCH 1/2] tcp: remove the transport layer reference counting mechanism The upper SP layer takes care not to destroy and endpoint while it has outstanding pipes. It's possible that the dueling reference counts between the upper and lower layers have been responsible for some hangs on shutdown sometimes seen with other transports. --- src/sp/transport/tcp/tcp.c | 87 ++++++++++---------------------------- 1 file changed, 23 insertions(+), 64 deletions(-) diff --git a/src/sp/transport/tcp/tcp.c b/src/sp/transport/tcp/tcp.c index a32ee7a83..324ffd42e 100644 --- a/src/sp/transport/tcp/tcp.c +++ b/src/sp/transport/tcp/tcp.c @@ -23,47 +23,42 @@ typedef struct tcptran_ep tcptran_ep; // tcp_pipe is one end of a TCP connection. struct tcptran_pipe { - nng_stream *conn; - nni_pipe *npipe; - uint16_t peer; - uint16_t proto; - size_t rcvmax; - bool closed; - nni_list_node node; - tcptran_ep *ep; - nni_atomic_flag reaped; - nni_reap_node reap; - uint8_t txlen[sizeof(uint64_t)]; - uint8_t rxlen[sizeof(uint64_t)]; - size_t gottxhead; - size_t gotrxhead; - size_t wanttxhead; - size_t wantrxhead; - nni_list recvq; - nni_list sendq; - nni_aio *txaio; - nni_aio *rxaio; - nni_aio *negoaio; - nni_msg *rxmsg; - nni_mtx mtx; + nng_stream *conn; + nni_pipe *npipe; + uint16_t peer; + uint16_t proto; + size_t rcvmax; + bool closed; + nni_list_node node; + tcptran_ep *ep; + uint8_t txlen[sizeof(uint64_t)]; + uint8_t rxlen[sizeof(uint64_t)]; + size_t gottxhead; + size_t gotrxhead; + size_t wanttxhead; + size_t wantrxhead; + nni_list recvq; + nni_list sendq; + nni_aio *txaio; + nni_aio *rxaio; + nni_aio *negoaio; + nni_msg *rxmsg; + nni_mtx mtx; }; struct tcptran_ep { nni_mtx mtx; uint16_t proto; size_t rcvmax; - bool fini; bool started; bool closed; - const char *host; // for dialers - int refcnt; // active pipes + const char *host; // for dialers nni_aio *useraio; nni_aio *connaio; nni_aio *timeaio; nni_list busypipes; // busy pipes -- ones passed to socket nni_list waitpipes; // pipes waiting to match to socket nni_list negopipes; // pipes busy negotiating - nni_reap_node reap; nng_stream_dialer *dialer; nng_stream_listener *listener; @@ -80,16 +75,6 @@ static void tcptran_pipe_nego_cb(void *); static void tcptran_ep_fini(void *); static void tcptran_pipe_fini(void *); -static nni_reap_list tcptran_ep_reap_list = { - .rl_offset = offsetof(tcptran_ep, reap), - .rl_func = tcptran_ep_fini, -}; - -static nni_reap_list tcptran_pipe_reap_list = { - .rl_offset = offsetof(tcptran_pipe, reap), - .rl_func = tcptran_pipe_fini, -}; - static void tcptran_init(void) { @@ -145,10 +130,6 @@ tcptran_pipe_fini(void *arg) if ((ep = p->ep) != NULL) { nni_mtx_lock(&ep->mtx); nni_list_node_remove(&p->node); - ep->refcnt--; - if (ep->fini && (ep->refcnt == 0)) { - nni_reap(&tcptran_ep_reap_list, ep); - } nni_mtx_unlock(&ep->mtx); } @@ -161,17 +142,6 @@ tcptran_pipe_fini(void *arg) NNI_FREE_STRUCT(p); } -static void -tcptran_pipe_reap(tcptran_pipe *p) -{ - if (!nni_atomic_flag_test_and_set(&p->reaped)) { - if (p->conn != NULL) { - nng_stream_close(p->conn); - } - nni_reap(&tcptran_pipe_reap_list, p); - } -} - static int tcptran_pipe_alloc(tcptran_pipe **pipep) { @@ -191,7 +161,6 @@ tcptran_pipe_alloc(tcptran_pipe **pipep) } nni_aio_list_init(&p->recvq); nni_aio_list_init(&p->sendq); - nni_atomic_flag_reset(&p->reaped); *pipep = p; @@ -294,8 +263,7 @@ tcptran_pipe_nego_cb(void *arg) } nni_list_remove(&ep->negopipes, p); nni_mtx_unlock(&ep->mtx); - - tcptran_pipe_reap(p); + tcptran_pipe_fini(p); } static void @@ -641,8 +609,6 @@ tcptran_pipe_start(tcptran_pipe *p, nng_stream *conn, tcptran_ep *ep) { nni_iov iov; - ep->refcnt++; - p->conn = conn; p->ep = ep; p->proto = ep->proto; @@ -672,13 +638,6 @@ tcptran_ep_fini(void *arg) { tcptran_ep *ep = arg; - nni_mtx_lock(&ep->mtx); - ep->fini = true; - if (ep->refcnt != 0) { - nni_mtx_unlock(&ep->mtx); - return; - } - nni_mtx_unlock(&ep->mtx); nni_aio_stop(ep->timeaio); nni_aio_stop(ep->connaio); nng_stream_dialer_free(ep->dialer); From f6ddadac1cab5d23251685fd142d5b2d0fa4e936 Mon Sep 17 00:00:00 2001 From: Garrett D'Amore Date: Sun, 24 Nov 2024 19:34:42 -0800 Subject: [PATCH 2/2] tcp transport: inline aio objects for pipe, and organize members We try to put the least likely stuff far from the hotter stuff in the struct to get better cacheline efficiency. --- src/sp/transport/tcp/tcp.c | 65 ++++++++++++++++++-------------------- 1 file changed, 30 insertions(+), 35 deletions(-) diff --git a/src/sp/transport/tcp/tcp.c b/src/sp/transport/tcp/tcp.c index 324ffd42e..65999162b 100644 --- a/src/sp/transport/tcp/tcp.c +++ b/src/sp/transport/tcp/tcp.c @@ -29,21 +29,21 @@ struct tcptran_pipe { uint16_t proto; size_t rcvmax; bool closed; - nni_list_node node; tcptran_ep *ep; + nni_list recvq; + nni_list sendq; + nni_aio txaio; + nni_aio rxaio; + nni_aio negoaio; + nni_mtx mtx; + nni_msg *rxmsg; uint8_t txlen[sizeof(uint64_t)]; uint8_t rxlen[sizeof(uint64_t)]; size_t gottxhead; size_t gotrxhead; size_t wanttxhead; size_t wantrxhead; - nni_list recvq; - nni_list sendq; - nni_aio *txaio; - nni_aio *rxaio; - nni_aio *negoaio; - nni_msg *rxmsg; - nni_mtx mtx; + nni_list_node node; }; struct tcptran_ep { @@ -94,9 +94,9 @@ tcptran_pipe_close(void *arg) p->closed = true; nni_mtx_unlock(&p->mtx); - nni_aio_close(p->rxaio); - nni_aio_close(p->txaio); - nni_aio_close(p->negoaio); + nni_aio_close(&p->rxaio); + nni_aio_close(&p->txaio); + nni_aio_close(&p->negoaio); nng_stream_close(p->conn); } @@ -106,9 +106,9 @@ tcptran_pipe_stop(void *arg) { tcptran_pipe *p = arg; - nni_aio_stop(p->rxaio); - nni_aio_stop(p->txaio); - nni_aio_stop(p->negoaio); + nni_aio_stop(&p->rxaio); + nni_aio_stop(&p->txaio); + nni_aio_stop(&p->negoaio); } static int @@ -134,10 +134,10 @@ tcptran_pipe_fini(void *arg) } nng_stream_free(p->conn); - nni_aio_free(p->rxaio); - nni_aio_free(p->txaio); - nni_aio_free(p->negoaio); nni_msg_free(p->rxmsg); + nni_aio_fini(&p->rxaio); + nni_aio_fini(&p->txaio); + nni_aio_fini(&p->negoaio); nni_mtx_fini(&p->mtx); NNI_FREE_STRUCT(p); } @@ -146,19 +146,14 @@ static int tcptran_pipe_alloc(tcptran_pipe **pipep) { tcptran_pipe *p; - int rv; if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { return (NNG_ENOMEM); } nni_mtx_init(&p->mtx); - if (((rv = nni_aio_alloc(&p->txaio, tcptran_pipe_send_cb, p)) != 0) || - ((rv = nni_aio_alloc(&p->rxaio, tcptran_pipe_recv_cb, p)) != 0) || - ((rv = nni_aio_alloc(&p->negoaio, tcptran_pipe_nego_cb, p)) != - 0)) { - tcptran_pipe_fini(p); - return (rv); - } + nni_aio_init(&p->txaio, tcptran_pipe_send_cb, p); + nni_aio_init(&p->rxaio, tcptran_pipe_recv_cb, p); + nni_aio_init(&p->negoaio, tcptran_pipe_nego_cb, p); nni_aio_list_init(&p->recvq); nni_aio_list_init(&p->sendq); @@ -190,7 +185,7 @@ tcptran_pipe_nego_cb(void *arg) { tcptran_pipe *p = arg; tcptran_ep *ep = p->ep; - nni_aio *aio = p->negoaio; + nni_aio *aio = &p->negoaio; nni_aio *uaio; int rv; @@ -274,7 +269,7 @@ tcptran_pipe_send_cb(void *arg) nni_aio *aio; size_t n; nni_msg *msg; - nni_aio *txaio = p->txaio; + nni_aio *txaio = &p->txaio; nni_mtx_lock(&p->mtx); aio = nni_list_first(&p->sendq); @@ -321,7 +316,7 @@ tcptran_pipe_recv_cb(void *arg) int rv; size_t n; nni_msg *msg; - nni_aio *rxaio = p->rxaio; + nni_aio *rxaio = &p->rxaio; nni_mtx_lock(&p->mtx); aio = nni_list_first(&p->recvq); @@ -431,7 +426,7 @@ tcptran_pipe_send_cancel(nni_aio *aio, void *arg, int rv) // The callback on the txaio will cause the user aio to // be canceled too. if (nni_list_first(&p->sendq) == aio) { - nni_aio_abort(p->txaio, rv); + nni_aio_abort(&p->txaio, rv); nni_mtx_unlock(&p->mtx); return; } @@ -469,7 +464,7 @@ tcptran_pipe_send_start(tcptran_pipe *p) NNI_PUT64(p->txlen, len); - txaio = p->txaio; + txaio = &p->txaio; niov = 0; iov[0].iov_buf = p->txlen; iov[0].iov_len = sizeof(p->txlen); @@ -528,7 +523,7 @@ tcptran_pipe_recv_cancel(nni_aio *aio, void *arg, int rv) // The callback on the rxaio will cause the user aio to // be canceled too. if (nni_list_first(&p->recvq) == aio) { - nni_aio_abort(p->rxaio, rv); + nni_aio_abort(&p->rxaio, rv); nni_mtx_unlock(&p->mtx); return; } @@ -557,7 +552,7 @@ tcptran_pipe_recv_start(tcptran_pipe *p) } // Schedule a read of the header. - rxaio = p->rxaio; + rxaio = &p->rxaio; iov.iov_buf = p->rxlen; iov.iov_len = sizeof(p->rxlen); nni_aio_set_iov(rxaio, 1, &iov); @@ -626,11 +621,11 @@ tcptran_pipe_start(tcptran_pipe *p, nng_stream *conn, tcptran_ep *ep) p->wanttxhead = 8; iov.iov_len = 8; iov.iov_buf = &p->txlen[0]; - nni_aio_set_iov(p->negoaio, 1, &iov); + nni_aio_set_iov(&p->negoaio, 1, &iov); nni_list_append(&ep->negopipes, p); - nni_aio_set_timeout(p->negoaio, 10000); // 10 sec timeout to negotiate - nng_stream_send(p->conn, p->negoaio); + nni_aio_set_timeout(&p->negoaio, 10000); // 10 sec timeout to negotiate + nng_stream_send(p->conn, &p->negoaio); } static void