Skip to content

Commit

Permalink
Utilizing unix socket selector implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
ktakashi committed Dec 5, 2023
1 parent 5807687 commit 9e808d2
Show file tree
Hide file tree
Showing 8 changed files with 259 additions and 172 deletions.
2 changes: 2 additions & 0 deletions ext/socket/sagittarius-socket.c
Original file line number Diff line number Diff line change
Expand Up @@ -1626,6 +1626,7 @@ SgObject Sg_MakeConditionSocketPort(SgObject socket, SgObject port)
}

extern void Sg__Init_socket_stub(SgLibrary *lib);
extern void Sg__Init_selector(SgLibrary *lib);

#if defined(_WIN32)
static void finish_winsock(void *data)
Expand All @@ -1646,6 +1647,7 @@ SG_EXTENSION_ENTRY void CDECL Sg_Init_sagittarius__socket()
lib = SG_LIBRARY(Sg_FindLibrary(SG_INTERN("(sagittarius socket)"),
FALSE));
Sg__Init_socket_stub(lib);
Sg__Init_selector(lib);

/* for multithreading issue, we do not add this cond-feature.
we always support this anyway */
Expand Down
1 change: 1 addition & 0 deletions ext/socket/sagittarius/socket.scm
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
;;; SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
;;;

#!nounbound
(library (sagittarius socket)
(export make-client-socket
make-server-socket
Expand Down
105 changes: 24 additions & 81 deletions ext/socket/selector-epoll.c
Original file line number Diff line number Diff line change
Expand Up @@ -33,109 +33,52 @@
#include <string.h>
#include <sys/epoll.h>

#include "unix-socket.incl"
#include "unix-socket-selector.incl"

typedef struct epoll_context_rec
static int make_selector()
{
int epfd;
int stop_fd;
struct sockaddr_un *addr;
SgObject events;
} epoll_context_t;

static void selector_finalizer(SgObject self, void *data)
{
Sg_CloseSocketSelector(SG_SOCKET_SELECTOR(self));
}

SgObject Sg_MakeSocketSelector()
{
SgSocketSelector *selector = SG_NEW(SgSocketSelector);
epoll_context_t *ctx = SG_NEW(epoll_context_t);
struct kevent *ke = SG_NEW_ATOMIC(struct kevent);

SG_SET_CLASS(selector, SG_CLASS_SOCKET_SELECTOR);
if ((ctx->epfd = epoll_create1(0)) < 0) goto err;
ctx->events = SG_NIL;

ctx->stop_fd = unix_socket();
if (ctx->stop_fd == -1) goto err;

ctx->addr = bind_unix_socket(ctx->stop_fd);
if (ctx->addr == NULL) {
close(ctx->stop_fd);
goto err;
}
selector->context = ctx;
ctx->events = Sg_Cons(selector, ctx->events);

Sg_RegisterFinalizer(selector, selector_finalizer, NULL);
return SG_OBJ(selector);

err: {
int e = errno;
char *msg = strerror(e);
close(ctx->epfd);
Sg_SystemError(e, UC("Setting up epoll failed: %A"),
Sg_Utf8sToUtf32s(msg, strlen(msg)));
}
return epoll_create1(0);
}

void Sg_CloseSocketSelector(SgSocketSelector *selector)
{
epoll_context_t *ctx = (epoll_context_t *)selector->context;
close(ctx->epfd);
close(ctx->stop_fd);
Sg_UnregisterFinalizer(selector);
}

SgObject Sg_SocketSelectorAdd(SgSocketSelector *selector, SgSocket *socket)
static void add_socket(unix_context_t *ctx, SgSocket *socket)
{
epoll_context_t *ctx = (epoll_context_t *)selector->context;
struct epoll_event ev;
ev.events = EPOLLIN;
ev.data.ptr = socket;
epoll_ctl(ctx->epfd, EPOLL_CTL_ADD, socket.socket, &event);
ctx->events = Sg_Cons(socket, ctx->events);
return SG_OBJ(selector);
epoll_ctl(ctx->fd, EPOLL_CTL_ADD, socket->socket, &ev);
}

SgObject Sg_SocketSelectorWait(SgSocketSelector *selector, SgObject timeout)
static SgObject wait_selector(unix_context_t *ctx, int nsock,
SgObject sockets, SgObject timeout)
{
epoll_context_t *ctx = (epoll_context_t *)selector->context;
int n = Sg_Length(ctx->events), i, c;
int n = nsock + 1, i, c;
long millis = -1;
SgObject cp, r = SG_NIL;
SgObject r = SG_NIL;
struct timespec spec, *sp;
struct epoll_event *evm;
struct epoll_event *evm, ev;

sp = Sg_GetTimeSpec(timeout, &spec);
if (sp) {
millis = sp->tv_sec * 1000;
millis += sp->tv_usec / 1000;
millis += sp->tv_nsec / 1000000;
}

evm = SG_NEW_ATOMIC2(struct kevent *, n * sizeof(struct kevent));
c = epoll_wait(ctx->epfd, evm, n, millis);
if (c < 0) {
int e = errno;
char *msg = strerror(e);
Sg_SystemError(e, UC("kevent failed: %A"),
Sg_Utf8sToUtf32s(msg, strlen(msg)));
return SG_UNDEF; /* dummy */
}
ev.events = EPOLLIN;
ev.data.ptr = SG_FALSE;
epoll_ctl(ctx->fd, EPOLL_CTL_ADD, ctx->stop_fd, &ev);

evm = SG_NEW_ATOMIC2(struct epoll_event *, n * sizeof(struct epoll_event));
c = epoll_wait(ctx->fd, evm, n, millis);

if (c < 0) return system_error(errno, -1);

for (i = 0; i < c; i++) {
if (SG_SOCKETP(evm[i].data.ptr) && evm[i].events == EPOLLIN) {
if (SG_FALSEP(evm[i].data.ptr)) {
interrupted_unix_stop(ctx);
} else if (SG_SOCKETP(evm[i].data.ptr) && evm[i].events == EPOLLIN) {
r = Sg_Cons(evm[i].data.ptr, r);
}
}
return r;
}

SgObject Sg_SocketSelectorInterrupt(SgSocketSelector *selector)
{
epoll_context_t *ctx = (epoll_context_t *)selector->context;
const char *stop = "stop it";
sendto_unix_socket(ctx->addr, (const uint8_t *)stop, 7);
return selector;
}
20 changes: 12 additions & 8 deletions ext/socket/selector-iocp.c
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ typedef struct iocp_context_rec
{
HANDLE iocp;
HANDLE thread; /* waiting thread */

} iocp_context_t;


Expand All @@ -47,7 +46,8 @@ static void selector_finalizer(SgObject self, void *data)

static void system_error(int code)
{
Sg_SystemError(e, UC("Setting up IOCP failed: %A"),
Sg_SystemError(GetLastError(),
UC("Setting up IOCP failed: %A"),
Sg_GetLastErrorMessageWithErrorCode(code));
}

Expand All @@ -56,10 +56,11 @@ SgObject Sg_MakeSocketSelector()
SgSocketSelector *selector = SG_NEW(SgSocketSelector);
iocp_context_t *ctx = SG_NEW(iocp_context_t);

SG_SET_CLASS(selector, SG_CLASS_SOCKET_SELECTOR);
ctx->iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
if (ctx->iocp == NULL) goto err;
ctx->thread = NULL;
SG_SET_CLASS(selector, SG_CLASS_SOCKET_SELECTOR);
selector->sockets = SG_NIL;

Sg_RegisterFinalizer(selector, selector_finalizer, NULL);
return SG_OBJ(selector);
Expand All @@ -79,32 +80,35 @@ SgObject Sg_SocketSelectorAdd(SgSocketSelector *selector, SgSocket *socket)
{
iocp_context_t *ctx = (iocp_context_t *)selector->context;
HANDLE r;
r = CreateIoCompletionPort(ctx->iocp, (HANDLE)socket->socket, socket, 0);
r = CreateIoCompletionPort(ctx->iocp, (HANDLE)socket->socket,
(ULONG_PTR)socket, 0);
if (r == NULL) {
system_error(Sg_GetLastError());
}
selector->sockets = Sg_Cons(socket, ctx->sockets);
return SG_OBJ(selector);
}

SgObject Sg_SocketSelectorWait(SgSocketSelector *selector, SgObject timeout)
{
iocp_context_t *ctx = (iocp_context_t *)selector->context;
int n = Sg_Length(ctx->events), i, removed, millis = INFINITE;
LPOVERLAPPED_ENTRY *entries;
int n = Sg_Length(selector->sockets), i, millis = INFINITE;
ULONG removed;
LPOVERLAPPED_ENTRY entries;
BOOL r;
HANDLE thread;
struct timespec spec, *sp;
SgObject ret = SG_NIL;

if (ctx->thread != NULL) {
Sg_Error(UC("There's a thread already waiting for %A", selector));
Sg_Error(UC("There's a thread already waiting for %A"), selector);
}
ctx->thread = GetCurrentThread();

sp = Sg_GetTimeSpec(timeout, &spec);
if (sp) {
millis = sp->tv_sec * 1000;
millis += sp->tv_usec / 1000;
millis += sp->tv_nsec / 1000000;
}

entries = SG_NEW_ATOMIC2(OVERLAPPED_ENTRY *, n * sizeof(OVERLAPPED_ENTRY));
Expand Down
101 changes: 20 additions & 81 deletions ext/socket/selector-kqueue.c
Original file line number Diff line number Diff line change
Expand Up @@ -34,107 +34,46 @@
#include <sys/types.h>
#include <sys/event.h>

#include "unix-socket.incl"
#include "unix-socket-selector.incl"

typedef struct kqueue_context_rec
static int make_selector()
{
int kq;
int stop_fd;
struct sockaddr_un *addr;
SgObject events;
} kqueue_context_t;

static void selector_finalizer(SgObject self, void *data)
{
Sg_CloseSocketSelector(SG_SOCKET_SELECTOR(self));
}

#define make_data(fd, data) Sg_Cons(SG_MAKE_INT(fd), (data))

SgObject Sg_MakeSocketSelector()
{
SgSocketSelector *selector = SG_NEW(SgSocketSelector);
kqueue_context_t *ctx = SG_NEW(kqueue_context_t);
struct kevent *ke = SG_NEW_ATOMIC(struct kevent);

SG_SET_CLASS(selector, SG_CLASS_SOCKET_SELECTOR);
if ((ctx->kq = kqueue()) < 0) goto err;
ctx->events = SG_NIL;

ctx->stop_fd = unix_socket();
if (ctx->stop_fd == -1) goto err;

ctx->addr = bind_unix_socket(ctx->stop_fd);
if (ctx->addr == NULL) {
close(ctx->stop_fd);
goto err;
}
selector->context = ctx;
ctx->events = Sg_Cons(make_data(ctx->stop_fd, selector), ctx->events);

Sg_RegisterFinalizer(selector, selector_finalizer, NULL);
return SG_OBJ(selector);

err: {
int e = errno;
char *msg = strerror(e);
close(ctx->kq);
Sg_SystemError(e, UC("Setting up kqueue failed: %A"),
Sg_Utf8sToUtf32s(msg, strlen(msg)));
}
}

void Sg_CloseSocketSelector(SgSocketSelector *selector)
{
kqueue_context_t *ctx = (kqueue_context_t *)selector->context;
close(ctx->kq);
close(ctx->stop_fd);
Sg_UnregisterFinalizer(selector);
return kqueue();
}

SgObject Sg_SocketSelectorAdd(SgSocketSelector *selector, SgSocket *socket)
static void add_socket(unix_context_t *ctx, SgSocket *socket)
{
kqueue_context_t *ctx = (kqueue_context_t *)selector->context;
ctx->events = Sg_Cons(make_data(socket->socket, socket), ctx->events);
return SG_OBJ(selector);
/* do nothing :) */
}

SgObject Sg_SocketSelectorWait(SgSocketSelector *selector, SgObject timeout)
static SgObject wait_selector(unix_context_t *ctx, int nsock,
SgObject sockets, SgObject timeout)
{
kqueue_context_t *ctx = (kqueue_context_t *)selector->context;
int n = Sg_Length(ctx->events), i, c;
SgObject cp, r = SG_NIL;
int i, c, n = nsock + 1;
struct timespec spec, *sp;
struct kevent *evm;

sp = Sg_GetTimeSpec(timeout, &spec);

evm = SG_NEW_ATOMIC2(struct kevent *, n * sizeof(struct kevent));
i = 0;
SG_FOR_EACH(cp, ctx->events) {
EV_SET(&evm[i++], SG_INT_VALUE(SG_CAAR(cp)),
EVFILT_READ, EV_ADD | EV_CLEAR, 0, 0, SG_CDAR(cp));
}
c = kevent(ctx->kq, evm, n, evm, n, sp);
if (c < 0) {
int e = errno;
char *msg = strerror(e);
Sg_SystemError(e, UC("kevent failed: %A"),
Sg_Utf8sToUtf32s(msg, strlen(msg)));
return SG_UNDEF; /* dummy */
SG_FOR_EACH(cp, sockets) {
SgSocket *s = SG_SOCKET(SG_CAR(cp));
EV_SET(&evm[i++], s->socket, EVFILT_READ, EV_ADD | EV_CLEAR, 0, 0, s);
}
EV_SET(&evm[i++], ctx->stop_fd,
EVFILT_READ, EV_ADD | EV_CLEAR, 0, 0, NULL);
c = kevent(ctx->fd, evm, n, evm, n, sp);
if (c < 0) return system_error(errno, -1);;

for (i = 0; i < c; i++) {
if (evm[i].ident != ctx->stop_fd && evm[i].filter == EVFILT_READ) {
if (evm[i].ident == ctx->stop_fd) {
interrupted_unix_stop(ctx);
} else if (evm[i].filter == EVFILT_READ) {
r = Sg_Cons(evm[i].udata, r);
}
}
return r;
}

SgObject Sg_SocketSelectorInterrupt(SgSocketSelector *selector)
{
kqueue_context_t *ctx = (kqueue_context_t *)selector->context;
const char *stop = "stop it";
sendto_unix_socket(ctx->addr, (const uint8_t *)stop, 7);
return selector;
}
2 changes: 1 addition & 1 deletion ext/socket/selector.stub
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
(define-c-proc socket-selector-add! (s::<socket-selector> sock::<socket>)
Sg_SocketSelectorAdd)

(define-c-proc socket-selector-wait! (s::<socket-selector> timeout)
(define-c-proc socket-selector-wait! (s::<socket-selector> :optional (timeout #f))
Sg_SocketSelectorWait)

(define-c-proc socket-selector-interrupt! (s::<socket-selector>)
Expand Down
3 changes: 2 additions & 1 deletion ext/socket/socket-selector.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@
typedef struct SgSocketSelectorRec
{
SG_HEADER;
void *context; /* underlying implementation context */
SgObject sockets;
void *context; /* underlying implementation context */
} SgSocketSelector;

SG_CLASS_DECL(Sg_SocketSelectorClass);
Expand Down
Loading

0 comments on commit 9e808d2

Please sign in to comment.