Skip to content

Commit

Permalink
kqueue implementation fix for socket selector
Browse files Browse the repository at this point in the history
  • Loading branch information
ktakashi committed Jan 24, 2025
1 parent dc3b193 commit e36a7ab
Show file tree
Hide file tree
Showing 5 changed files with 22 additions and 34 deletions.
1 change: 1 addition & 0 deletions ext/socket/selector-kqueue.c
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ static SgObject wait_selector(unix_context_t *ctx, int nsock,
SgObject slot = SG_CAR(cp);
SgSocket *s = SG_SOCKET(SG_CAR(slot));
EV_SET(&evm[i++], s->socket, EVFILT_READ, EV_ADD | EV_CLEAR, 0, 0, slot);
if (i == nsock) break;
}
EV_SET(&evm[i++], ctx->stop_fd, EVFILT_READ, EV_ADD | EV_CLEAR, 0, 0, NULL);
c = kevent(ctx->fd, evm, n, evm, n, sp);
Expand Down
5 changes: 2 additions & 3 deletions ext/socket/selector-win.c
Original file line number Diff line number Diff line change
Expand Up @@ -230,11 +230,10 @@ static SgObject win_selector_wait(win_context_t *ctx, int n,
return ret;
}

static SgObject selector_wait(SgSocketSelector *selector, void *context, int n,
struct timespec *sp)
static SgObject selector_wait(SgSocketSelector *selector, SgObject sockets,
void *context, int n, struct timespec *sp)
{
win_context_t *ctx = (win_context_t *)context;
SgObject sockets = Sg_Reverse(selector->sockets);
return win_selector_wait(ctx, n, selector, sockets, sp);
}

Expand Down
28 changes: 8 additions & 20 deletions ext/socket/socket-selector.incl
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,8 @@ SgObject Sg_MakeSocketSelector()
return SG_OBJ(selector);
}

static SgObject selector_wait(SgSocketSelector *selector, void *context,
int n, struct timespec *sp);
static SgObject selector_wait(SgSocketSelector *selector, SgObject sockets,
void *context, int n, struct timespec *sp);

static SgObject earliest_timeout(SgObject sockets, SgTime *start,
SgObject *timedout)
Expand Down Expand Up @@ -224,7 +224,7 @@ static struct timespec * update_timeout(SgObject sockets,

SgObject Sg_SocketSelectorWait(SgSocketSelector *selector, SgObject timeout)
{
SgObject r, timedout = SG_NIL;
SgObject r, timedout = SG_NIL, sockets;
SgTime start;
int n;
unsigned long sec, usec;
Expand All @@ -247,6 +247,8 @@ SgObject Sg_SocketSelectorWait(SgSocketSelector *selector, SgObject timeout)
start.nsec = usec * 1000;

Sg_LockMutex(&selector->lock);

retry:
/* compute socket timeout */
sto = update_timeout(selector->sockets, &start, &sock_to, &timedout);
/* replace if exists */
Expand All @@ -261,16 +263,16 @@ SgObject Sg_SocketSelectorWait(SgSocketSelector *selector, SgObject timeout)
return Sg_Values2(SG_NIL, timedout); /* return timedout ones */
}

selector->waiting = TRUE;
context = selector->context;
if (context == NULL) {
Sg_UnlockMutex(&selector->lock);
Sg_Error(UC("Socket selector is closed: %A"), selector);
}
sockets = selector->sockets;
selector->waiting = TRUE;

retry:
Sg_UnlockMutex(&selector->lock);
r = selector_wait(selector, context, n, sp);
r = selector_wait(selector, sockets, context, n, sp);

Sg_LockMutex(&selector->lock);
selector->waiting = FALSE;
Expand All @@ -294,20 +296,6 @@ SgObject Sg_SocketSelectorWait(SgSocketSelector *selector, SgObject timeout)
start.sec = sec;
start.nsec = nsec;
}
sto = update_timeout(selector->sockets, &start, &sock_to, &timedout);
if (sto) {
sp = sto;
}
strip_sockets(selector, timedout);
n = selector_sockets(selector);

if (n == 0 && !SG_NULLP(timedout)) {
Sg_UnlockMutex(&selector->lock);
return Sg_Values2(SG_NIL, timedout); /* return timedout ones */
}

selector->waiting = TRUE;

goto retry;
} else {
/* reset it, in case of r is not null :) */
Expand Down
7 changes: 3 additions & 4 deletions ext/socket/unix-socket-selector.incl
Original file line number Diff line number Diff line change
Expand Up @@ -155,14 +155,13 @@ SgObject Sg_SocketSelectorInterrupt(SgSocketSelector *selector)
return selector;
}

static SgObject selector_wait(SgSocketSelector *selector,
void *context,
int n, struct timespec *sp)
static SgObject selector_wait(SgSocketSelector *selector, SgObject sockets,
void *context, int n, struct timespec *sp)
{
unix_context_t *ctx = (unix_context_t *)context;
int err = 0;
/* the socket is reverse order, so correct it */
SgObject r = wait_selector(ctx, n, Sg_Reverse(selector->sockets), sp, &err);
SgObject r = wait_selector(ctx, n, sockets, sp, &err);
if (SG_FALSEP(r)) {
if (err == EBADF) {
Sg_Error(UC("Socket selector is closed during waiting: %A"), selector);
Expand Down
15 changes: 8 additions & 7 deletions test/tests/net/socket.scm
Original file line number Diff line number Diff line change
Expand Up @@ -240,14 +240,15 @@
(make-thread
(lambda ()
(let ((s (make-client-socket "localhost" server-port
(socket-options (read-timeout 500))))
(socket-options (read-timeout 1000))))
(msg (string->utf8
(string-append "Hello world " (number->string i)))))
(guard (e (else #;(print e) s))
(when (even? i) (thread-sleep! 0.05));; 50ms
(socket-send s msg)
(let ((v (socket-recv s 255)))
(cond ((bytevector=? v mark) (shared-queue-put! result #f))
(cond ((zero? (bytevector-u8-ref v 0)) ;; mark
(shared-queue-put! result #f))
(else (shared-queue-put! result (utf8->string v)))))
(socket-send s mark))
(socket-shutdown s SHUT_RDWR)
Expand Down Expand Up @@ -339,13 +340,13 @@
(values (atomic-fixnum-load counter) r))))

(let-values (((c r) (run-socket-selector 1000 #f)))
(test-equal "counter (1)" 100 c)
(test-equal "counter (1)" count c)
(test-equal "hard 1000ms soft #f" count (length (filter string? r))))
(let-values (((c r) (run-socket-selector 100 #f)))
(test-equal "counter (2)" 100 c)
(test-equal "hard 100ms soft #f" 0 (length (filter string? r))))
(let-values (((c r) (run-socket-selector 10 #f)))
(test-equal "counter (2)" count c)
(test-equal "hard 10ms soft #f" 0 (length (filter string? r))))
(let-values (((c r) (run-socket-selector 10 1000)))
(test-equal "counter (3)" 100 c)
(test-equal "counter (3)" count c)
(test-equal "hard 10ms soft 1000ms" count (length (filter string? r))))
(socket-shutdown server-sock SHUT_RDWR)
(socket-close server-sock)
Expand Down

0 comments on commit e36a7ab

Please sign in to comment.