From efdc4695bf6943fc1bb92d26a47dd0a37df2d2df Mon Sep 17 00:00:00 2001 From: actboy168 Date: Tue, 7 May 2024 12:41:14 +0800 Subject: [PATCH] remove thread.channel --- binding/lua_thread.cpp | 181 ++++------------------------------------- test/test_thread.lua | 180 +--------------------------------------- 2 files changed, 18 insertions(+), 343 deletions(-) diff --git a/binding/lua_thread.cpp b/binding/lua_thread.cpp index 036c9a61..6bc9e243 100644 --- a/binding/lua_thread.cpp +++ b/binding/lua_thread.cpp @@ -1,16 +1,11 @@ #include #include #include -#include -#include -#include #include #include #include #include -#include -#include #include #include #include @@ -23,159 +18,35 @@ extern "C" { namespace bee::lua_thread { class channel { public: - using value_type = void*; - - void push(value_type data) { - do { - std::unique_lock lk(mutex); - queue.push(data); - } while (0); - sem.release(); - } - bool pop(value_type& data) { + void push(lua_State* L, int from) noexcept { + void* data = seri_pack(L, from, NULL); std::unique_lock lk(mutex); - if (queue.empty()) { - return false; - } - data = queue.front(); - queue.pop(); - return true; + queue.push(data); } - void blocked_pop(value_type& data) { - for (;;) { - if (pop(data)) { - return; - } - sem.acquire(); - } - } - template - bool timed_pop(value_type& data, const std::chrono::duration& timeout) { - auto now = std::chrono::steady_clock::now(); - if (pop(data)) { - return true; - } - if (!sem.try_acquire_for(timeout)) { - return false; - } - auto time = now + std::chrono::duration_cast(timeout); - while (!pop(data)) { - if (!sem.try_acquire_until(time)) { - return false; + int pop(lua_State* L) noexcept { + void* data; + { + std::unique_lock lk(mutex); + if (queue.empty()) { + lua_pushboolean(L, 0); + return 1; } + data = queue.front(); + queue.pop(); } - return true; - } - - private: - std::queue queue; - spinlock mutex; - atomic_semaphore sem = atomic_semaphore(0); - }; - - using boxchannel = std::shared_ptr; - - class channelmgr { - public: - bool create(zstring_view name) { - std::unique_lock lk(mutex); - std::string namestr { name.data(), name.size() }; - auto it = channels.find(namestr); - if (it != channels.end()) { - return false; - } - channels.emplace(std::make_pair(namestr, new channel)); - return true; - } - void clear() { - std::unique_lock lk(mutex); - channels.clear(); - } - boxchannel query(zstring_view name) { - std::unique_lock lk(mutex); - std::string namestr { name.data(), name.size() }; - auto it = channels.find(namestr); - if (it != channels.end()) { - return it->second; - } - return nullptr; + lua_pushboolean(L, 1); + return 1 + seri_unpackptr(L, data); } private: - std::map channels; + std::queue queue; spinlock mutex; }; - static channelmgr g_channel; static channel g_errlog; static std::atomic g_thread_id = -1; static int THREADID; - static int lchannel_push(lua_State* L) { - auto& bc = lua::checkudata(L, 1); - void* buffer = seri_pack(L, 1, NULL); - bc->push(buffer); - return 0; - } - - static int lchannel_bpop(lua_State* L) { - auto& bc = lua::checkudata(L, 1); - void* data; - bc->blocked_pop(data); - return seri_unpackptr(L, data); - } - - static int lchannel_pop(lua_State* L) { - auto& bc = lua::checkudata(L, 1); - if (lua_isnoneornil(L, 2)) { - void* data; - if (!bc->pop(data)) { - lua_pushboolean(L, 0); - return 1; - } - lua_pushboolean(L, 1); - return 1 + seri_unpackptr(L, data); - } - void* data; - int msec = lua::checkinteger(L, 2); - if (!bc->timed_pop(data, std::chrono::milliseconds(msec))) { - lua_pushboolean(L, 0); - return 1; - } - lua_pushboolean(L, 1); - return 1 + seri_unpackptr(L, data); - } - - static int lnewchannel(lua_State* L) { - auto name = lua::checkstrview(L, 1); - if (!g_channel.create(name)) { - return luaL_error(L, "Duplicate channel '%s'", name.data()); - } - return 0; - } - - static void channel_metatable(lua_State* L) { - luaL_Reg lib[] = { - { "push", lchannel_push }, - { "pop", lchannel_pop }, - { "bpop", lchannel_bpop }, - { NULL, NULL }, - }; - luaL_newlibtable(L, lib); - luaL_setfuncs(L, lib, 0); - lua_setfield(L, -2, "__index"); - } - - static int lchannel(lua_State* L) { - auto name = lua::checkstrview(L, 1); - boxchannel c = g_channel.query(name); - if (!c) { - return luaL_error(L, "Can't query channel '%s'", name.data()); - } - lua::newudata(L, c); - return 1; - } - static int lsleep(lua_State* L) { int msec = lua::checkinteger(L, 1); thread_sleep(msec); @@ -237,8 +108,7 @@ namespace bee::lua_thread { lua_pushcfunction(L, thread_luamain); lua_pushlightuserdata(L, ud); if (lua_pcall(L, 1, 0, 1) != LUA_OK) { - void* errmsg = seri_pack(L, lua_gettop(L) - 1, NULL); - g_errlog.push(errmsg); + g_errlog.push(L, lua_gettop(L) - 1); } lua_close(L); } @@ -260,13 +130,7 @@ namespace bee::lua_thread { } static int lerrlog(lua_State* L) { - void* data; - if (!g_errlog.pop(data)) { - lua_pushboolean(L, 0); - return 1; - } - lua_pushboolean(L, 1); - return 1 + seri_unpackptr(L, data); + return g_errlog.pop(L); } static int lreset(lua_State* L) { @@ -276,7 +140,6 @@ namespace bee::lua_thread { if (threadid != 0) { return luaL_error(L, "reset must call from main thread"); } - g_channel.clear(); g_thread_id = 0; return 0; } @@ -308,8 +171,6 @@ namespace bee::lua_thread { { "sleep", lsleep }, { "thread", lthread }, { "errlog", lerrlog }, - { "newchannel", lnewchannel }, - { "channel", lchannel }, { "reset", lreset }, { "wait", lwait }, { "setname", lsetname }, @@ -326,11 +187,3 @@ namespace bee::lua_thread { } DEFINE_LUAOPEN(thread) - -namespace bee::lua { - template <> - struct udata { - static inline auto name = "bee::legacy_channel"; - static inline auto metatable = bee::lua_thread::channel_metatable; - }; -} diff --git a/test/test_thread.lua b/test/test_thread.lua index ba197737..310f68a3 100644 --- a/test/test_thread.lua +++ b/test/test_thread.lua @@ -79,22 +79,6 @@ function test_thread:test_thread_initargs() assertNotThreadError() end -function test_thread:test_channel_1() - thread.reset() - lt.assertErrorMsgEquals("Can't query channel 'test'", thread.channel, "test") - thread.newchannel "test" - lt.assertIsUserdata(thread.channel "test") - lt.assertIsUserdata(thread.channel "test") - thread.reset() -end - -function test_thread:test_channel_2() - thread.reset() - thread.newchannel "test" - lt.assertErrorMsgEquals("Duplicate channel 'test'", thread.newchannel, "test") - thread.reset() -end - function test_thread:test_id_1() assertNotThreadError() lt.assertEquals(thread.id, 0) @@ -119,19 +103,7 @@ function test_thread:test_id_2() assertNotThreadError() end -function test_thread:test_reset_1() - thread.reset() - lt.assertErrorMsgEquals("Can't query channel 'test'", thread.channel, "test") - thread.newchannel "test" - lt.assertIsUserdata(thread.channel "test") - thread.reset() - lt.assertErrorMsgEquals("Can't query channel 'test'", thread.channel, "test") - thread.newchannel "test" - lt.assertIsUserdata(thread.channel "test") - thread.reset() -end - -function test_thread:test_reset_2() +function test_thread:test_reset() assertNotThreadError() local thd = createThread [[ local thread = require "bee.thread" @@ -142,156 +114,6 @@ function test_thread:test_reset_2() assertNotThreadError() end -local function TestSuit(f) - f(1) - f(0.0001) - f("TEST") - f(true) - f(false) - f({}) - f({ 1, 2 }) - f(1, { 1, 2 }) - f(1, 2, { A = { B = { C = "D" } } }) - f(1, nil, 2) -end - -function test_thread:test_pop_1() - thread.reset() - thread.newchannel "test" - local channel = thread.channel "test" - local function pack_pop(ok, ...) - lt.assertEquals(ok, true) - return table.pack(...) - end - local function test_ok(...) - channel:push(...) - lt.assertEquals(pack_pop(channel:pop()), table.pack(...)) - channel:push(...) - lt.assertEquals(table.pack(channel:bpop()), table.pack(...)) - end - TestSuit(test_ok) - -- 基本和serialization的测试重复,所以failed就不测了 -end - -function test_thread:test_pop_2() - thread.reset() - thread.newchannel "test" - local channel = thread.channel "test" - - local function assertIs(expected) - local ok, v = channel:pop() - lt.assertEquals(ok, true) - lt.assertEquals(v, expected) - end - local function assertEmpty() - local ok, v = channel:pop() - lt.assertEquals(ok, false) - lt.assertEquals(v, nil) - end - - assertEmpty() - - channel:push(1024) - assertIs(1024) - assertEmpty() - - channel:push(1024) - channel:push(1025) - channel:push(1026) - assertIs(1024) - assertIs(1025) - assertIs(1026) - assertEmpty() - - channel:push(1024) - channel:push(1025) - assertIs(1024) - channel:push(1026) - assertIs(1025) - assertIs(1026) - assertEmpty() - - thread.reset() -end - -function test_thread:test_thread_bpop() - assertNotThreadError() - thread.reset() - thread.newchannel "testReq" - thread.newchannel "testRes" - local thd = createThread [[ - local thread = require "bee.thread" - local req = thread.channel "testReq" - local res = thread.channel "testRes" - local function dispatch(what, ...) - if what == "exit" then - return true - end - res:push(what, ...) - end - while not dispatch(req:bpop()) do - end - ]] - local req = thread.channel "testReq" - local res = thread.channel "testRes" - local function test_ok(...) - req:push(...) - lt.assertEquals(table.pack(res:bpop()), table.pack(...)) - end - TestSuit(test_ok) - req:push "exit" - thread.wait(thd) - assertNotThreadError() -end - -function test_thread:test_thread_pop() - assertNotThreadError() - thread.reset() - thread.newchannel "testReq" - thread.newchannel "testRes" - local thd = createThread [[ - local thread = require "bee.thread" - local req = thread.channel "testReq" - local res = thread.channel "testRes" - local function dispatch(ok, what, ...) - if not ok then - thread.sleep(0) - return - end - if what == "exit" then - return true - end - res:push(what, ...) - end - while not dispatch(req:pop()) do - end - ]] - local req = thread.channel "testReq" - local res = thread.channel "testRes" - local function pack_pop(ok, ...) - if not ok then - return - end - return table.pack(...) - end - local function test_ok(...) - req:push(...) - local t - while true do - t = pack_pop(res:pop()) - if t then - break - end - thread.sleep(0) - end - lt.assertEquals(t, table.pack(...)) - end - TestSuit(test_ok) - req:push "exit" - thread.wait(thd) - assertNotThreadError() -end - function test_thread:test_sleep() local t1 = time.monotonic() thread.sleep(1)