Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tcp: remove the transport layer reference counting mechanism #1948

Closed
wants to merge 2 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
136 changes: 45 additions & 91 deletions src/sp/transport/tcp/tcp.c
Original file line number Diff line number Diff line change
Expand Up @@ -23,47 +23,42 @@

// tcp_pipe is one end of a TCP connection.
struct tcptran_pipe {
nng_stream *conn;
nni_pipe *npipe;
uint16_t peer;
uint16_t proto;
size_t rcvmax;
bool closed;
nni_list_node node;
tcptran_ep *ep;
nni_atomic_flag reaped;
nni_reap_node reap;
uint8_t txlen[sizeof(uint64_t)];
uint8_t rxlen[sizeof(uint64_t)];
size_t gottxhead;
size_t gotrxhead;
size_t wanttxhead;
size_t wantrxhead;
nni_list recvq;
nni_list sendq;
nni_aio *txaio;
nni_aio *rxaio;
nni_aio *negoaio;
nni_msg *rxmsg;
nni_mtx mtx;
nng_stream *conn;
nni_pipe *npipe;
uint16_t peer;
uint16_t proto;
size_t rcvmax;
bool closed;
tcptran_ep *ep;
nni_list recvq;
nni_list sendq;
nni_aio txaio;
nni_aio rxaio;
nni_aio negoaio;
nni_mtx mtx;
nni_msg *rxmsg;
uint8_t txlen[sizeof(uint64_t)];
uint8_t rxlen[sizeof(uint64_t)];
size_t gottxhead;
size_t gotrxhead;
size_t wanttxhead;
size_t wantrxhead;
nni_list_node node;
};

struct tcptran_ep {
nni_mtx mtx;
uint16_t proto;
size_t rcvmax;
bool fini;
bool started;
bool closed;
const char *host; // for dialers
int refcnt; // active pipes
const char *host; // for dialers
nni_aio *useraio;
nni_aio *connaio;
nni_aio *timeaio;
nni_list busypipes; // busy pipes -- ones passed to socket
nni_list waitpipes; // pipes waiting to match to socket
nni_list negopipes; // pipes busy negotiating
nni_reap_node reap;
nng_stream_dialer *dialer;
nng_stream_listener *listener;

Expand All @@ -80,16 +75,6 @@
static void tcptran_ep_fini(void *);
static void tcptran_pipe_fini(void *);

static nni_reap_list tcptran_ep_reap_list = {
.rl_offset = offsetof(tcptran_ep, reap),
.rl_func = tcptran_ep_fini,
};

static nni_reap_list tcptran_pipe_reap_list = {
.rl_offset = offsetof(tcptran_pipe, reap),
.rl_func = tcptran_pipe_fini,
};

static void
tcptran_init(void)
{
Expand All @@ -109,9 +94,9 @@
p->closed = true;
nni_mtx_unlock(&p->mtx);

nni_aio_close(p->rxaio);
nni_aio_close(p->txaio);
nni_aio_close(p->negoaio);
nni_aio_close(&p->rxaio);
nni_aio_close(&p->txaio);
nni_aio_close(&p->negoaio);

nng_stream_close(p->conn);
}
Expand All @@ -121,9 +106,9 @@
{
tcptran_pipe *p = arg;

nni_aio_stop(p->rxaio);
nni_aio_stop(p->txaio);
nni_aio_stop(p->negoaio);
nni_aio_stop(&p->rxaio);
nni_aio_stop(&p->txaio);
nni_aio_stop(&p->negoaio);
}

static int
Expand All @@ -145,53 +130,32 @@
if ((ep = p->ep) != NULL) {
nni_mtx_lock(&ep->mtx);
nni_list_node_remove(&p->node);
ep->refcnt--;
if (ep->fini && (ep->refcnt == 0)) {
nni_reap(&tcptran_ep_reap_list, ep);
}
nni_mtx_unlock(&ep->mtx);
}

nng_stream_free(p->conn);
nni_aio_free(p->rxaio);
nni_aio_free(p->txaio);
nni_aio_free(p->negoaio);
nni_msg_free(p->rxmsg);
nni_aio_fini(&p->rxaio);
nni_aio_fini(&p->txaio);
nni_aio_fini(&p->negoaio);
nni_mtx_fini(&p->mtx);
NNI_FREE_STRUCT(p);
}

static void
tcptran_pipe_reap(tcptran_pipe *p)
{
if (!nni_atomic_flag_test_and_set(&p->reaped)) {
if (p->conn != NULL) {
nng_stream_close(p->conn);
}
nni_reap(&tcptran_pipe_reap_list, p);
}
}

static int
tcptran_pipe_alloc(tcptran_pipe **pipep)
{
tcptran_pipe *p;
int rv;

if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
return (NNG_ENOMEM);
}
nni_mtx_init(&p->mtx);
if (((rv = nni_aio_alloc(&p->txaio, tcptran_pipe_send_cb, p)) != 0) ||
((rv = nni_aio_alloc(&p->rxaio, tcptran_pipe_recv_cb, p)) != 0) ||
((rv = nni_aio_alloc(&p->negoaio, tcptran_pipe_nego_cb, p)) !=
0)) {
tcptran_pipe_fini(p);
return (rv);
}
nni_aio_init(&p->txaio, tcptran_pipe_send_cb, p);
nni_aio_init(&p->rxaio, tcptran_pipe_recv_cb, p);
nni_aio_init(&p->negoaio, tcptran_pipe_nego_cb, p);
nni_aio_list_init(&p->recvq);
nni_aio_list_init(&p->sendq);
nni_atomic_flag_reset(&p->reaped);

*pipep = p;

Expand Down Expand Up @@ -221,7 +185,7 @@
{
tcptran_pipe *p = arg;
tcptran_ep *ep = p->ep;
nni_aio *aio = p->negoaio;
nni_aio *aio = &p->negoaio;
nni_aio *uaio;
int rv;

Expand Down Expand Up @@ -294,8 +258,7 @@
}
nni_list_remove(&ep->negopipes, p);
nni_mtx_unlock(&ep->mtx);

tcptran_pipe_reap(p);
tcptran_pipe_fini(p);

Check warning on line 261 in src/sp/transport/tcp/tcp.c

View check run for this annotation

Codecov / codecov/patch

src/sp/transport/tcp/tcp.c#L261

Added line #L261 was not covered by tests
}

static void
Expand All @@ -306,7 +269,7 @@
nni_aio *aio;
size_t n;
nni_msg *msg;
nni_aio *txaio = p->txaio;
nni_aio *txaio = &p->txaio;

nni_mtx_lock(&p->mtx);
aio = nni_list_first(&p->sendq);
Expand Down Expand Up @@ -353,7 +316,7 @@
int rv;
size_t n;
nni_msg *msg;
nni_aio *rxaio = p->rxaio;
nni_aio *rxaio = &p->rxaio;

nni_mtx_lock(&p->mtx);
aio = nni_list_first(&p->recvq);
Expand Down Expand Up @@ -463,7 +426,7 @@
// The callback on the txaio will cause the user aio to
// be canceled too.
if (nni_list_first(&p->sendq) == aio) {
nni_aio_abort(p->txaio, rv);
nni_aio_abort(&p->txaio, rv);

Check warning on line 429 in src/sp/transport/tcp/tcp.c

View check run for this annotation

Codecov / codecov/patch

src/sp/transport/tcp/tcp.c#L429

Added line #L429 was not covered by tests
nni_mtx_unlock(&p->mtx);
return;
}
Expand Down Expand Up @@ -501,7 +464,7 @@

NNI_PUT64(p->txlen, len);

txaio = p->txaio;
txaio = &p->txaio;
niov = 0;
iov[0].iov_buf = p->txlen;
iov[0].iov_len = sizeof(p->txlen);
Expand Down Expand Up @@ -560,7 +523,7 @@
// The callback on the rxaio will cause the user aio to
// be canceled too.
if (nni_list_first(&p->recvq) == aio) {
nni_aio_abort(p->rxaio, rv);
nni_aio_abort(&p->rxaio, rv);
nni_mtx_unlock(&p->mtx);
return;
}
Expand Down Expand Up @@ -589,7 +552,7 @@
}

// Schedule a read of the header.
rxaio = p->rxaio;
rxaio = &p->rxaio;
iov.iov_buf = p->rxlen;
iov.iov_len = sizeof(p->rxlen);
nni_aio_set_iov(rxaio, 1, &iov);
Expand Down Expand Up @@ -641,8 +604,6 @@
{
nni_iov iov;

ep->refcnt++;

p->conn = conn;
p->ep = ep;
p->proto = ep->proto;
Expand All @@ -660,25 +621,18 @@
p->wanttxhead = 8;
iov.iov_len = 8;
iov.iov_buf = &p->txlen[0];
nni_aio_set_iov(p->negoaio, 1, &iov);
nni_aio_set_iov(&p->negoaio, 1, &iov);
nni_list_append(&ep->negopipes, p);

nni_aio_set_timeout(p->negoaio, 10000); // 10 sec timeout to negotiate
nng_stream_send(p->conn, p->negoaio);
nni_aio_set_timeout(&p->negoaio, 10000); // 10 sec timeout to negotiate
nng_stream_send(p->conn, &p->negoaio);
}

static void
tcptran_ep_fini(void *arg)
{
tcptran_ep *ep = arg;

nni_mtx_lock(&ep->mtx);
ep->fini = true;
if (ep->refcnt != 0) {
nni_mtx_unlock(&ep->mtx);
return;
}
nni_mtx_unlock(&ep->mtx);
nni_aio_stop(ep->timeaio);
nni_aio_stop(ep->connaio);
nng_stream_dialer_free(ep->dialer);
Expand Down
Loading