Skip to content

Commit

Permalink
add bee.channel
Browse files Browse the repository at this point in the history
  • Loading branch information
actboy168 committed May 6, 2024
1 parent 9eeb418 commit 923779e
Show file tree
Hide file tree
Showing 8 changed files with 410 additions and 11 deletions.
6 changes: 3 additions & 3 deletions bee/net/event.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ namespace bee::net {
bool event::open() noexcept {
if (pipe[0] != retired_fd)
return false;
return socket::pair(pipe, socket::fd_flags::none);
return socket::pair(pipe, socket::fd_flags::nonblock);
}

void event::set() noexcept {
Expand All @@ -30,7 +30,7 @@ namespace bee::net {
socket::send(pipe[1], rc, tmp, sizeof(tmp));
}

void event::wait() noexcept {
void event::clear() noexcept {
char tmp[128];
int rc = 0;
for (;;) {
Expand All @@ -48,7 +48,7 @@ namespace bee::net {
e.clear(std::memory_order_seq_cst);
}

fd_t event::fd() noexcept {
fd_t event::fd() const noexcept {
return pipe[0];
}
}
4 changes: 2 additions & 2 deletions bee/net/event.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ namespace bee::net {
~event() noexcept;
bool open() noexcept;
void set() noexcept;
void wait() noexcept;
fd_t fd() noexcept;
void clear() noexcept;
fd_t fd() const noexcept;
};
}
171 changes: 171 additions & 0 deletions binding/lua_channel.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
#include <bee/error.h>
#include <bee/lua/binding.h>
#include <bee/lua/module.h>
#include <bee/net/event.h>
#include <bee/thread/spinlock.h>

#include <map>
#include <memory>
#include <mutex>
#include <queue>
#include <string>

extern "C" {
#include <3rd/lua-seri/lua-seri.h>
}

namespace bee::lua_channel {
class channel {
public:
using box = std::shared_ptr<channel>;
using value_type = void*;

bool init() noexcept {
if (!ev.open()) {
return false;
}
return true;
}
net::fd_t fd() const noexcept {
return ev.fd();
}
void push(value_type data) noexcept {
std::unique_lock<spinlock> lk(mutex);
queue.push(data);
ev.set();
}
bool pop(value_type& data) noexcept {
std::unique_lock<spinlock> lk(mutex);
if (queue.empty()) {
return false;
}
data = queue.front();
queue.pop();
return true;
}

private:
std::queue<value_type> queue;
spinlock mutex;
net::event ev;
};

class channelmgr {
public:
channel::box create(zstring_view name) noexcept {
std::unique_lock<spinlock> lk(mutex);
channel* c = new channel;
if (!c->init()) {
return nullptr;
}
std::string namestr { name.data(), name.size() };
auto [r, ok] = channels.emplace(namestr, channel::box { c });
if (!ok) {
return nullptr;
}
return r->second;
}
void clear() noexcept {
std::unique_lock<spinlock> lk(mutex);
channels.clear();
}
channel::box query(zstring_view name) noexcept {
std::unique_lock<spinlock> lk(mutex);
std::string namestr { name.data(), name.size() };
auto it = channels.find(namestr);
if (it != channels.end()) {
return it->second;
}
return nullptr;
}

private:
std::map<std::string, channel::box> channels;
spinlock mutex;
};

static channelmgr g_channel;

static int lchannel_push(lua_State* L) {
auto& bc = lua::checkudata<channel::box>(L, 1);
void* buffer = seri_pack(L, 1, NULL);
bc->push(buffer);
return 0;
}

static int lchannel_pop(lua_State* L) {
auto& bc = lua::checkudata<channel::box>(L, 1);
void* data;
if (!bc->pop(data)) {
lua_pushboolean(L, 0);
return 1;
}
lua_pushboolean(L, 1);
return 1 + seri_unpackptr(L, data);
}

static int lchannel_fd(lua_State* L) {
auto& bc = lua::checkudata<channel::box>(L, 1);
lua_pushlightuserdata(L, (void*)(intptr_t)bc->fd());
return 1;
}

static void metatable(lua_State* L) {
luaL_Reg lib[] = {
{ "push", lchannel_push },
{ "pop", lchannel_pop },
{ "fd", lchannel_fd },
{ NULL, NULL },
};
luaL_newlibtable(L, lib);
luaL_setfuncs(L, lib, 0);
lua_setfield(L, -2, "__index");
}

static int lcreate(lua_State* L) {
auto name = lua::checkstrview(L, 1);
channel::box c = g_channel.create(name);
if (!c) {
return luaL_error(L, "Duplicate channel '%s'", name.data());
}
lua::newudata<channel::box>(L, c);
return 1;
}

static int lquery(lua_State* L) {
auto name = lua::checkstrview(L, 1);
channel::box c = g_channel.query(name);
if (!c) {
return luaL_error(L, "Can't query channel '%s'", name.data());
}
lua::newudata<channel::box>(L, c);
return 1;
}

static int lreset(lua_State* L) {
g_channel.clear();
return 0;
}

static int luaopen(lua_State* L) {
luaL_Reg lib[] = {
{ "create", lcreate },
{ "query", lquery },
{ "reset", lreset },
{ NULL, NULL },
};
luaL_newlibtable(L, lib);
luaL_setfuncs(L, lib, 0);
return 1;
}
}

DEFINE_LUAOPEN(channel)

namespace bee::lua {
template <>
struct udata<lua_channel::channel::box> {
static inline auto name = "bee::channel";
static inline auto metatable = bee::lua_channel::metatable;
};
}
11 changes: 9 additions & 2 deletions binding/lua_epoll.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,15 @@ namespace bee::lua_epoll {
};

static net::fd_t ep_tofd(lua_State *L, int idx) {
luaL_checktype(L, idx, LUA_TUSERDATA);
return lua::toudata<net::fd_t>(L, idx);
switch (lua_type(L, idx)) {
case LUA_TLIGHTUSERDATA:
return lua::tolightud<net::fd_t>(L, idx);
case LUA_TUSERDATA:
return lua::toudata<net::fd_t>(L, idx);
default:
luaL_checktype(L, idx, LUA_TUSERDATA);
std::unreachable();
}
}

static int ep_events(lua_State *L) {
Expand Down
17 changes: 14 additions & 3 deletions binding/lua_select.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -218,9 +218,20 @@ namespace bee::lua_select {
ctx.writeset.clear();
return 0;
}
static net::fd_t tofd(lua_State* L, int idx) {
switch (lua_type(L, 1)) {
case LUA_TLIGHTUSERDATA:
return lua::tolightud<net::fd_t>(L, idx);
case LUA_TUSERDATA:
return lua::toudata<net::fd_t>(L, idx);
default:
luaL_checktype(L, idx, LUA_TUSERDATA);
std::unreachable();
}
}
static int event_add(lua_State* L) {
auto& ctx = lua::checkudata<select_ctx>(L, 1);
auto fd = lua::checkudata<net::fd_t>(L, 2);
auto fd = tofd(L, 2);
auto events = luaL_checkinteger(L, 3);
storeref(L, fd);
if (events & SELECT_READ) {
Expand All @@ -238,7 +249,7 @@ namespace bee::lua_select {
}
static int event_mod(lua_State* L) {
auto& ctx = lua::checkudata<select_ctx>(L, 1);
auto fd = lua::checkudata<net::fd_t>(L, 2);
auto fd = tofd(L, 2);
auto events = luaL_checkinteger(L, 3);
if (events & SELECT_READ) {
ctx.readset.insert(fd);
Expand All @@ -255,7 +266,7 @@ namespace bee::lua_select {
}
static int event_del(lua_State* L) {
auto& ctx = lua::checkudata<select_ctx>(L, 1);
auto fd = lua::checkudata<net::fd_t>(L, 2);
auto fd = tofd(L, 2);
cleanref(L, fd);
ctx.readset.erase(fd);
ctx.writeset.erase(fd);
Expand Down
2 changes: 1 addition & 1 deletion binding/lua_thread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ DEFINE_LUAOPEN(thread)
namespace bee::lua {
template <>
struct udata<lua_thread::boxchannel> {
static inline auto name = "bee::channel";
static inline auto name = "bee::legacy_channel";
static inline auto metatable = bee::lua_thread::channel_metatable;
};
}
1 change: 1 addition & 0 deletions test/test.lua
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ require "test_socket"
require "test_epoll"
require "test_filewatch"
require "test_time"
require "test_channel"

do
local fs = require "bee.filesystem"
Expand Down
Loading

0 comments on commit 923779e

Please sign in to comment.