Skip to content

Commit

Permalink
remove thread.channel
Browse files Browse the repository at this point in the history
  • Loading branch information
actboy168 committed May 7, 2024
1 parent 196db37 commit efdc469
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 343 deletions.
181 changes: 17 additions & 164 deletions binding/lua_thread.cpp
Original file line number Diff line number Diff line change
@@ -1,16 +1,11 @@
#include <bee/error.h>
#include <bee/lua/binding.h>
#include <bee/lua/module.h>
#include <bee/nonstd/format.h>
#include <bee/nonstd/print.h>
#include <bee/thread/atomic_semaphore.h>
#include <bee/thread/setname.h>
#include <bee/thread/simplethread.h>
#include <bee/thread/spinlock.h>

#include <atomic>
#include <functional>
#include <map>
#include <memory>
#include <mutex>
#include <queue>
Expand All @@ -23,159 +18,35 @@ extern "C" {
namespace bee::lua_thread {
class channel {
public:
using value_type = void*;

void push(value_type data) {
do {
std::unique_lock<spinlock> lk(mutex);
queue.push(data);
} while (0);
sem.release();
}
bool pop(value_type& data) {
void push(lua_State* L, int from) noexcept {
void* data = seri_pack(L, from, NULL);
std::unique_lock<spinlock> lk(mutex);
if (queue.empty()) {
return false;
}
data = queue.front();
queue.pop();
return true;
queue.push(data);
}
void blocked_pop(value_type& data) {
for (;;) {
if (pop(data)) {
return;
}
sem.acquire();
}
}
template <class Rep, class Period>
bool timed_pop(value_type& data, const std::chrono::duration<Rep, Period>& timeout) {
auto now = std::chrono::steady_clock::now();
if (pop(data)) {
return true;
}
if (!sem.try_acquire_for(timeout)) {
return false;
}
auto time = now + std::chrono::duration_cast<std::chrono::steady_clock::duration>(timeout);
while (!pop(data)) {
if (!sem.try_acquire_until(time)) {
return false;
int pop(lua_State* L) noexcept {
void* data;
{
std::unique_lock<spinlock> lk(mutex);
if (queue.empty()) {
lua_pushboolean(L, 0);
return 1;
}
data = queue.front();
queue.pop();
}
return true;
}

private:
std::queue<value_type> queue;
spinlock mutex;
atomic_semaphore sem = atomic_semaphore(0);
};

using boxchannel = std::shared_ptr<channel>;

class channelmgr {
public:
bool create(zstring_view name) {
std::unique_lock<spinlock> lk(mutex);
std::string namestr { name.data(), name.size() };
auto it = channels.find(namestr);
if (it != channels.end()) {
return false;
}
channels.emplace(std::make_pair(namestr, new channel));
return true;
}
void clear() {
std::unique_lock<spinlock> lk(mutex);
channels.clear();
}
boxchannel query(zstring_view name) {
std::unique_lock<spinlock> lk(mutex);
std::string namestr { name.data(), name.size() };
auto it = channels.find(namestr);
if (it != channels.end()) {
return it->second;
}
return nullptr;
lua_pushboolean(L, 1);
return 1 + seri_unpackptr(L, data);
}

private:
std::map<std::string, boxchannel> channels;
std::queue<void*> queue;
spinlock mutex;
};

static channelmgr g_channel;
static channel g_errlog;
static std::atomic<int> g_thread_id = -1;
static int THREADID;

static int lchannel_push(lua_State* L) {
auto& bc = lua::checkudata<boxchannel>(L, 1);
void* buffer = seri_pack(L, 1, NULL);
bc->push(buffer);
return 0;
}

static int lchannel_bpop(lua_State* L) {
auto& bc = lua::checkudata<boxchannel>(L, 1);
void* data;
bc->blocked_pop(data);
return seri_unpackptr(L, data);
}

static int lchannel_pop(lua_State* L) {
auto& bc = lua::checkudata<boxchannel>(L, 1);
if (lua_isnoneornil(L, 2)) {
void* data;
if (!bc->pop(data)) {
lua_pushboolean(L, 0);
return 1;
}
lua_pushboolean(L, 1);
return 1 + seri_unpackptr(L, data);
}
void* data;
int msec = lua::checkinteger<int>(L, 2);
if (!bc->timed_pop(data, std::chrono::milliseconds(msec))) {
lua_pushboolean(L, 0);
return 1;
}
lua_pushboolean(L, 1);
return 1 + seri_unpackptr(L, data);
}

static int lnewchannel(lua_State* L) {
auto name = lua::checkstrview(L, 1);
if (!g_channel.create(name)) {
return luaL_error(L, "Duplicate channel '%s'", name.data());
}
return 0;
}

static void channel_metatable(lua_State* L) {
luaL_Reg lib[] = {
{ "push", lchannel_push },
{ "pop", lchannel_pop },
{ "bpop", lchannel_bpop },
{ NULL, NULL },
};
luaL_newlibtable(L, lib);
luaL_setfuncs(L, lib, 0);
lua_setfield(L, -2, "__index");
}

static int lchannel(lua_State* L) {
auto name = lua::checkstrview(L, 1);
boxchannel c = g_channel.query(name);
if (!c) {
return luaL_error(L, "Can't query channel '%s'", name.data());
}
lua::newudata<boxchannel>(L, c);
return 1;
}

static int lsleep(lua_State* L) {
int msec = lua::checkinteger<int>(L, 1);
thread_sleep(msec);
Expand Down Expand Up @@ -237,8 +108,7 @@ namespace bee::lua_thread {
lua_pushcfunction(L, thread_luamain);
lua_pushlightuserdata(L, ud);
if (lua_pcall(L, 1, 0, 1) != LUA_OK) {
void* errmsg = seri_pack(L, lua_gettop(L) - 1, NULL);
g_errlog.push(errmsg);
g_errlog.push(L, lua_gettop(L) - 1);
}
lua_close(L);
}
Expand All @@ -260,13 +130,7 @@ namespace bee::lua_thread {
}

static int lerrlog(lua_State* L) {
void* data;
if (!g_errlog.pop(data)) {
lua_pushboolean(L, 0);
return 1;
}
lua_pushboolean(L, 1);
return 1 + seri_unpackptr(L, data);
return g_errlog.pop(L);
}

static int lreset(lua_State* L) {
Expand All @@ -276,7 +140,6 @@ namespace bee::lua_thread {
if (threadid != 0) {
return luaL_error(L, "reset must call from main thread");
}
g_channel.clear();
g_thread_id = 0;
return 0;
}
Expand Down Expand Up @@ -308,8 +171,6 @@ namespace bee::lua_thread {
{ "sleep", lsleep },
{ "thread", lthread },
{ "errlog", lerrlog },
{ "newchannel", lnewchannel },
{ "channel", lchannel },
{ "reset", lreset },
{ "wait", lwait },
{ "setname", lsetname },
Expand All @@ -326,11 +187,3 @@ namespace bee::lua_thread {
}

DEFINE_LUAOPEN(thread)

namespace bee::lua {
template <>
struct udata<lua_thread::boxchannel> {
static inline auto name = "bee::legacy_channel";
static inline auto metatable = bee::lua_thread::channel_metatable;
};
}
Loading

0 comments on commit efdc469

Please sign in to comment.