Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ep refcnt #1976

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
131 changes: 40 additions & 91 deletions src/core/dialer.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
static void dialer_connect_start(nni_dialer *);
static void dialer_connect_cb(void *);
static void dialer_timer_cb(void *);
static void dialer_destroy(void *);

static nni_id_map dialers = NNI_ID_MAP_INITIALIZER(1, 0x7fffffff, 0);
static nni_mtx dialers_lk = NNI_MTX_INITIALIZER;
Expand All @@ -30,11 +31,10 @@ nni_dialer_id(nni_dialer *d)
return (d->d_id);
}

void
nni_dialer_destroy(nni_dialer *d)
static void
dialer_destroy(void *arg)
{
nni_aio_stop(&d->d_con_aio);
nni_aio_stop(&d->d_tmo_aio);
nni_dialer *d = arg;

nni_aio_fini(&d->d_con_aio);
nni_aio_fini(&d->d_tmo_aio);
Expand Down Expand Up @@ -209,11 +209,9 @@ nni_dialer_init(nni_dialer *d, nni_sock *s, nni_sp_tran *tran)
{
int rv;

d->d_closed = false;
d->d_data = NULL;
d->d_ref = 1;
d->d_sock = s;
d->d_tran = tran;
d->d_data = NULL;
d->d_sock = s;
d->d_tran = tran;
nni_atomic_flag_reset(&d->d_started);

// Make a copy of the endpoint operations. This allows us to
Expand All @@ -229,23 +227,23 @@ nni_dialer_init(nni_dialer *d, nni_sock *s, nni_sp_tran *tran)
nni_aio_init(&d->d_con_aio, dialer_connect_cb, d);
nni_aio_init(&d->d_tmo_aio, dialer_timer_cb, d);

nni_refcnt_init(&d->d_refcnt, 2, d, dialer_destroy);

nni_mtx_lock(&dialers_lk);
rv = nni_id_alloc32(&dialers, &d->d_id, d);
nni_mtx_unlock(&dialers_lk);

nni_sock_hold(s);

#ifdef NNG_ENABLE_STATS
dialer_stats_init(d);
#endif

if ((rv != 0) ||
((rv = d->d_ops.d_init(&d->d_data, &d->d_url, d)) != 0) ||
((rv = nni_sock_add_dialer(s, d)) != 0)) {
nni_mtx_lock(&dialers_lk);
nni_id_remove(&dialers, d->d_id);
nni_mtx_unlock(&dialers_lk);
#ifdef NNG_ENABLE_STATS
nni_stat_unregister(&d->st_root);
#endif
nni_dialer_close(d);
nni_dialer_rele(d);
return (rv);
}

Expand All @@ -271,7 +269,6 @@ nni_dialer_create_url(nni_dialer **dp, nni_sock *s, const nng_url *url)
return (rv);
}
if ((rv = nni_dialer_init(d, s, tran)) != 0) {
nni_dialer_destroy(d);
return (rv);
}
*dp = d;
Expand Down Expand Up @@ -301,44 +298,7 @@ nni_dialer_create(nni_dialer **dp, nni_sock *s, const char *url_str)
NNI_FREE_STRUCT(d);
return (rv);
}
d->d_closed = false;
d->d_data = NULL;
d->d_ref = 1;
d->d_sock = s;
d->d_tran = tran;
nni_atomic_flag_reset(&d->d_started);

// Make a copy of the endpoint operations. This allows us to
// modify them (to override NULLs for example), and avoids an extra
// dereference on hot paths.
d->d_ops = *tran->tran_dialer;

NNI_LIST_NODE_INIT(&d->d_node);
NNI_LIST_INIT(&d->d_pipes, nni_pipe, p_ep_node);

nni_mtx_init(&d->d_mtx);

nni_aio_init(&d->d_con_aio, dialer_connect_cb, d);
nni_aio_init(&d->d_tmo_aio, dialer_timer_cb, d);

nni_mtx_lock(&dialers_lk);
rv = nni_id_alloc32(&dialers, &d->d_id, d);
nni_mtx_unlock(&dialers_lk);

#ifdef NNG_ENABLE_STATS
dialer_stats_init(d);
#endif

if ((rv != 0) ||
((rv = d->d_ops.d_init(&d->d_data, &d->d_url, d)) != 0) ||
((rv = nni_sock_add_dialer(s, d)) != 0)) {
nni_mtx_lock(&dialers_lk);
nni_id_remove(&dialers, d->d_id);
nni_mtx_unlock(&dialers_lk);
#ifdef NNG_ENABLE_STATS
nni_stat_unregister(&d->st_root);
#endif
nni_dialer_destroy(d);
if ((rv = nni_dialer_init(d, s, tran)) != 0) {
return (rv);
}

Expand All @@ -353,61 +313,58 @@ nni_dialer_find(nni_dialer **dp, uint32_t id)

nni_mtx_lock(&dialers_lk);
if ((d = nni_id_get(&dialers, id)) != NULL) {
d->d_ref++;
nni_dialer_hold(d);
*dp = d;
}
nni_mtx_unlock(&dialers_lk);
return (d == NULL ? NNG_ENOENT : 0);
}

int
void
nni_dialer_hold(nni_dialer *d)
{
int rv;
nni_mtx_lock(&dialers_lk);
if (d->d_closed) {
rv = NNG_ECLOSED;
} else {
d->d_ref++;
rv = 0;
}
nni_mtx_unlock(&dialers_lk);
return (rv);
nni_refcnt_hold(&d->d_refcnt);
}

void
nni_dialer_rele(nni_dialer *d)
{
bool reap;
nni_refcnt_rele(&d->d_refcnt);
}

nni_mtx_lock(&dialers_lk);
NNI_ASSERT(d->d_ref > 0);
d->d_ref--;
reap = ((d->d_ref == 0) && (d->d_closed));
nni_mtx_unlock(&dialers_lk);
static void
dialer_reap(void *arg)
{
nni_dialer *d = arg;

if (reap) {
nni_dialer_reap(d);
nni_aio_stop(&d->d_tmo_aio);
nni_aio_stop(&d->d_con_aio);
if (d->d_data != NULL) {
d->d_ops.d_close(d->d_data);
}

nni_dialer_shutdown(d);
nni_dialer_rele(d);
}

static nni_reap_list dialer_reap_list = {
.rl_offset = offsetof(nni_dialer, d_reap),
.rl_func = dialer_reap,
};

void
nni_dialer_close(nni_dialer *d)
{
nni_mtx_lock(&dialers_lk);
if (d->d_closed) {
nni_mtx_unlock(&dialers_lk);
nni_dialer_rele(d);
if (nni_atomic_flag_test_and_set(&d->d_closing)) {
return;
}
d->d_closed = true;
nni_mtx_lock(&dialers_lk);
nni_id_remove(&dialers, d->d_id);
nni_mtx_unlock(&dialers_lk);
nni_aio_close(&d->d_tmo_aio);
nni_aio_close(&d->d_con_aio);

nni_dialer_shutdown(d);

nni_sock_remove_dialer(d);
nni_dialer_rele(d);
nni_reap(&dialer_reap_list, d);
}

static void
Expand Down Expand Up @@ -507,14 +464,6 @@ nni_dialer_start(nni_dialer *d, unsigned flags)
return (rv);
}

void
nni_dialer_stop(nni_dialer *d)
{
nni_aio_stop(&d->d_tmo_aio);
nni_aio_stop(&d->d_con_aio);
d->d_ops.d_close(d->d_data);
}

nni_sock *
nni_dialer_sock(nni_dialer *d)
{
Expand Down
2 changes: 1 addition & 1 deletion src/core/dialer.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
#define CORE_DIALER_H

extern int nni_dialer_find(nni_dialer **, uint32_t);
extern int nni_dialer_hold(nni_dialer *);
extern void nni_dialer_hold(nni_dialer *);
extern void nni_dialer_rele(nni_dialer *);
extern uint32_t nni_dialer_id(nni_dialer *);
extern int nni_dialer_create(nni_dialer **, nni_sock *, const char *);
Expand Down
Loading
Loading