Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix async arguments #738

Open
wants to merge 13 commits into
base: master
Choose a base branch
from
Next Next commit
Use a mutex to protect async arguments
  • Loading branch information
javalikescript committed Nov 17, 2024
commit c4cb0e5e6939b84bfe135bb37db7087d6e0be7e8
67 changes: 58 additions & 9 deletions src/async.c
Original file line number Diff line number Diff line change
@@ -16,6 +16,13 @@
*/
#include "private.h"

typedef struct {
luv_thread_arg_t targ;
uv_mutex_t mutex;
} luv_async_arg_t;

#define luv_get_async_arg_from_handle(H) ((luv_async_arg_t *) ((luv_handle_t*) (H)->data)->extra)

static uv_async_t* luv_check_async(lua_State* L, int index) {
uv_async_t* handle = (uv_async_t*)luv_checkudata(L, index, "uv_async");
luaL_argcheck(L, handle->type == UV_ASYNC && handle->data, index, "Expected uv_async_t");
@@ -25,9 +32,19 @@ static uv_async_t* luv_check_async(lua_State* L, int index) {
static void luv_async_cb(uv_async_t* handle) {
luv_handle_t* data = (luv_handle_t*)handle->data;
lua_State* L = data->ctx->L;
int n = luv_thread_arg_push(L, (luv_thread_arg_t*)data->extra, LUVF_THREAD_SIDE_MAIN);
luv_call_callback(L, data, LUV_ASYNC, n);
luv_thread_arg_clear(L, (luv_thread_arg_t*)data->extra, LUVF_THREAD_SIDE_MAIN);
luv_async_arg_t* asarg = luv_get_async_arg_from_handle(handle);
uv_mutex_t *argmutex = &asarg->mutex;
luv_thread_arg_t targcpy;
int n;
uv_mutex_lock(argmutex);
targcpy = asarg->targ; // work on a copy of the arguments
asarg->targ.argc = 0; // empty the original, nothing to clear
uv_mutex_unlock(argmutex);
n = luv_thread_arg_push(L, &targcpy, LUVF_THREAD_SIDE_MAIN);
if (n >= 0) {
luv_call_callback(L, data, LUV_ASYNC, n);
}
luv_thread_arg_clear(L, &targcpy, LUVF_THREAD_SIDE_MAIN); // clear the copy
}

static int luv_new_async(lua_State* L) {
@@ -43,21 +60,53 @@ static int luv_new_async(lua_State* L) {
return luv_error(L, ret);
}
data = luv_setup_handle(L, ctx);
data->extra = (luv_thread_arg_t*)malloc(sizeof(luv_thread_arg_t));
luv_async_arg_t* asarg = (luv_async_arg_t*)malloc(sizeof(luv_async_arg_t));
memset(asarg, 0, sizeof(luv_async_arg_t));
ret = uv_mutex_init(&asarg->mutex);
if (ret < 0) { // unlikely
abort();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we shouldn't be aborting the program, this should be reported as a fail

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in commit 9fa42ef

}
data->extra = asarg;
data->extra_gc = free;
memset(data->extra, 0, sizeof(luv_thread_arg_t));
handle->data = data;
luv_check_callback(L, (luv_handle_t*)handle->data, LUV_ASYNC, 1);
return 1;
}


static int luv_handle_gc(lua_State* L);

static int luv_async_gc(lua_State* L) {
uv_async_t* handle = *(uv_async_t**)lua_touserdata(L, 1);
luv_async_arg_t* asarg = luv_get_async_arg_from_handle(handle);
uv_mutex_t *argmutex = &asarg->mutex;
uv_mutex_lock(argmutex);
luv_thread_arg_clear(L, &asarg->targ, LUVF_THREAD_SIDE_CHILD); // in case of a pending send, set side to avoid unref
uv_mutex_unlock(argmutex);
uv_mutex_destroy(argmutex);
return luv_handle_gc(L);
}

static int luv_async_send(lua_State* L) {
int ret;
uv_async_t* handle = luv_check_async(L, 1);
luv_thread_arg_t* arg = (luv_thread_arg_t *)((luv_handle_t*) handle->data)->extra;

luv_thread_arg_set(L, arg, 2, lua_gettop(L), LUVF_THREAD_MODE_ASYNC|LUVF_THREAD_SIDE_CHILD);
luv_async_arg_t* asarg = luv_get_async_arg_from_handle(handle);
uv_mutex_t *argmutex = &asarg->mutex;
int n;
uv_mutex_lock(argmutex);
luv_thread_arg_clear(L, &asarg->targ, LUVF_THREAD_SIDE_CHILD); // in case of a pending send
n = luv_thread_arg_set(L, &asarg->targ, 2, lua_gettop(L), LUVF_THREAD_MODE_ASYNC|LUVF_THREAD_SIDE_CHILD);
uv_mutex_unlock(argmutex);
if (n < 0) {
return luv_thread_arg_error(L);
}
ret = uv_async_send(handle);
luv_thread_arg_clear(L, arg, LUVF_THREAD_SIDE_CHILD);
return luv_result(L, ret);
}

static void luv_async_init(lua_State* L) {
luaL_getmetatable(L, "uv_async");
lua_pushcfunction(L, luv_async_gc);
lua_setfield(L, -2, "__gc");
lua_pop(L, 1);
}
1 change: 1 addition & 0 deletions src/luv.c
Original file line number Diff line number Diff line change
@@ -908,6 +908,7 @@ LUALIB_API int luaopen_luv (lua_State* L) {

luv_req_init(L);
luv_handle_init(L);
luv_async_init(L);
#if LUV_UV_VERSION_GEQ(1, 28, 0)
luv_dir_init(L);
#endif
112 changes: 68 additions & 44 deletions src/thread.c
Original file line number Diff line number Diff line change
@@ -66,7 +66,11 @@ static int luv_thread_arg_set(lua_State* L, luv_thread_arg_t* args, int idx, int
int i;
int side = LUVF_THREAD_SIDE(flags);
int async = LUVF_THREAD_ASYNC(flags);

/*
* thread works by reference.
* async works by copy, a use case consists in sending just before the thread ends,
* it results that the callback will be called after the thread/async life.
*/
idx = idx > 0 ? idx : 1;
i = idx;
args->flags = flags;
@@ -91,25 +95,40 @@ static int luv_thread_arg_set(lua_State* L, luv_thread_arg_t* args, int idx, int
}
break;
case LUA_TSTRING:
if (async)
{
const char* p = lua_tolstring(L, i, &arg->val.str.len);
arg->val.str.base = malloc(arg->val.str.len);
memcpy((void*)arg->val.str.base, p, arg->val.str.len);
if (async) {
size_t l = 0;
const char* p = lua_tolstring(L, i, &l);
void* b = malloc(l + 1);
arg->val.str.base = memcpy((void*)b, p, l + 1);
arg->val.str.len = l;
} else {
arg->val.str.base = lua_tolstring(L, i, &arg->val.str.len);
lua_pushvalue(L, i);
arg->ref[side] = luaL_ref(L, LUA_REGISTRYINDEX);
}
break;
case LUA_TUSERDATA:
arg->val.udata.data = lua_topointer(L, i);
arg->val.udata.size = lua_rawlen(L, i);
arg->val.udata.metaname = luv_getmtname(L, i);

if (arg->val.udata.size) {
lua_pushvalue(L, i);
arg->ref[side] = luaL_ref(L, LUA_REGISTRYINDEX);
{
const void* p = lua_topointer(L, i);
size_t l = lua_rawlen(L, i);
const char* mtname = luv_getmtname(L, i);
if (async) {
if (l > 0) {
void* b = malloc(l);
p = (const void*)memcpy(b, p, l);
}
if (mtname != NULL) {
size_t ml = strlen(mtname) + 1;
char* b = malloc(ml);
mtname = (const void*)memcpy(b, mtname, ml);
}
} else {
lua_pushvalue(L, i);
arg->ref[side] = luaL_ref(L, LUA_REGISTRYINDEX);
}
arg->val.udata.data = p;
arg->val.udata.size = l;
arg->val.udata.metaname = mtname;
}
break;
default:
@@ -127,51 +146,53 @@ static int luv_thread_arg_set(lua_State* L, luv_thread_arg_t* args, int idx, int
static void luv_thread_arg_clear(lua_State* L, luv_thread_arg_t* args, int flags) {
int i;
int side = LUVF_THREAD_SIDE(flags);
int set = LUVF_THREAD_SIDE(args->flags);
int setside = LUVF_THREAD_SIDE(args->flags);
int async = LUVF_THREAD_ASYNC(args->flags);

if (args->argc == 0)
return;

/*
* clear is safe to be called multiple times, values are set to LUA_NOREF or NULL, argc is preserved.
* thread unrefs per side.
* async frees values on the first calling side.
*/
for (i = 0; i < args->argc; i++) {
luv_val_t* arg = args->argv + i;
switch (arg->type) {
case LUA_TSTRING:
if (arg->ref[side] != LUA_NOREF)
{
if (arg->ref[side] != LUA_NOREF) {
luaL_unref(L, LUA_REGISTRYINDEX, arg->ref[side]);
arg->ref[side] = LUA_NOREF;
} else {
if(async && set!=side)
{
free((void*)arg->val.str.base);
arg->val.str.base = NULL;
arg->val.str.len = 0;
}
}
if (async) {
free((void*)arg->val.str.base);
arg->val.str.base = NULL;
}
break;
case LUA_TUSERDATA:
if (arg->ref[side]!=LUA_NOREF)
{
if (side != set)
{
if (arg->ref[side] != LUA_NOREF) {
if (side != setside) {
// avoid custom gc
lua_rawgeti(L, LUA_REGISTRYINDEX, arg->ref[side]);
lua_pushnil(L);
lua_setmetatable(L, -2);
lua_pop(L, -1);
lua_pop(L, 1);
}
luaL_unref(L, LUA_REGISTRYINDEX, arg->ref[side]);
arg->ref[side] = LUA_NOREF;
}
if (async) {
if (arg->val.udata.size > 0) {
free((void*)arg->val.udata.data);
arg->val.udata.data = NULL;
}
free((void*)arg->val.udata.metaname);
arg->val.udata.metaname = NULL;
}
break;
default:
break;
}
}
}

// called only in thread
static int luv_thread_arg_push(lua_State* L, luv_thread_arg_t* args, int flags) {
int i = 0;
int side = LUVF_THREAD_SIDE(flags);
@@ -196,18 +217,16 @@ static int luv_thread_arg_push(lua_State* L, luv_thread_arg_t* args, int flags)
lua_pushlstring(L, arg->val.str.base, arg->val.str.len);
break;
case LUA_TUSERDATA:
if (arg->val.udata.size)
{
if (arg->val.udata.size > 0) {
char *p = lua_newuserdata(L, arg->val.udata.size);
memcpy(p, arg->val.udata.data, arg->val.udata.size);
if (arg->val.udata.metaname)
{
if (arg->val.udata.metaname != NULL) {
luaL_getmetatable(L, arg->val.udata.metaname);
lua_setmetatable(L, -2);
}
lua_pushvalue(L, -1);
arg->ref[side] = luaL_ref(L, LUA_REGISTRYINDEX);
}else{
} else {
lua_pushlightuserdata(L, (void*)arg->val.udata.data);
}
break;
@@ -306,9 +325,11 @@ static void luv_thread_cb(void* varg) {

static void luv_thread_notify_close_cb(uv_handle_t *handle) {
luv_thread_t *thread = handle->data;
if (thread->handle != 0)
uv_thread_join(&thread->handle);

uv_thread_t uvt = thread->handle;
if (uvt != 0) {
thread->handle = 0;
uv_thread_join(&uvt);
}
luaL_unref(thread->L, LUA_REGISTRYINDEX, thread->ref);
thread->ref = LUA_NOREF;
thread->L = NULL;
@@ -494,9 +515,12 @@ static int luv_thread_setpriority(lua_State* L) {

static int luv_thread_join(lua_State* L) {
luv_thread_t* tid = luv_check_thread(L, 1);
int ret = uv_thread_join(&tid->handle);
if (ret < 0) return luv_error(L, ret);
tid->handle = 0;
uv_thread_t uvt = tid->handle;
if (uvt != 0) {
tid->handle = 0;
int ret = uv_thread_join(&uvt);
if (ret < 0) return luv_error(L, ret);
}
lua_pushboolean(L, 1);
return 1;
}
Loading