Skip to content

Commit

Permalink
Removing socket after selected (on kqueue and epoll)
Browse files Browse the repository at this point in the history
  • Loading branch information
ktakashi committed Dec 5, 2023
1 parent 892b0de commit 3170cef
Show file tree
Hide file tree
Showing 6 changed files with 101 additions and 4 deletions.
2 changes: 1 addition & 1 deletion ext/socket/selector-epoll.c
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ static SgObject wait_selector(unix_context_t *ctx, int nsock,
struct timespec spec, *sp;
struct epoll_event *evm, ev;

sp = Sg_GetTimeSpec(timeout, &spec);
sp = selector_timespec(timeout, &spec);
if (sp) {
millis = sp->tv_sec * 1000;
millis += sp->tv_nsec / 1000000;
Expand Down
2 changes: 1 addition & 1 deletion ext/socket/selector-iocp.c
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ SgObject Sg_SocketSelectorWait(SgSocketSelector *selector, SgObject timeout)
}
ctx->thread = GetCurrentThread();

sp = Sg_GetTimeSpec(timeout, &spec);
sp = selector_timespec(timeout, &spec);
if (sp) {
millis = sp->tv_sec * 1000;
millis += sp->tv_nsec / 1000000;
Expand Down
2 changes: 1 addition & 1 deletion ext/socket/selector-kqueue.c
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ static SgObject wait_selector(unix_context_t *ctx, int nsock,
struct timespec spec, *sp;
struct kevent *evm;

sp = Sg_GetTimeSpec(timeout, &spec);
sp = selector_timespec(timeout, &spec);

evm = SG_NEW_ATOMIC2(struct kevent *, n * sizeof(struct kevent));
i = 0;
Expand Down
48 changes: 48 additions & 0 deletions ext/socket/socket-selector.incl
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,51 @@ static void selector_finalizer(SgObject self, void *data)
Sg_CloseSocketSelector(SG_SOCKET_SELECTOR(self));
}

static struct timespec *selector_timespec(SgObject timeout, struct timespec *tm)
{
if (SG_FALSEP(timeout)) return NULL;
/* number = usec (the same as `select`) */
if (SG_INTP(timeout)) {
long val = SG_INT_VALUE(timeout);
if (val < 0) goto badtv;
tm->tv_sec = val / 1000000;
tm->tv_nsec = (val % 1000000) * 1000;
return tm;
} else if (SG_BIGNUMP(timeout)) {
long usec;
SgObject sec;
if (Sg_Sign(timeout) < 0) goto badtv;
sec = Sg_BignumDivSI(SG_BIGNUM(timeout), 1000000, &usec);
tm->tv_sec = Sg_GetInteger(sec);
tm->tv_nsec = usec * 1000;
return tm;
} else if (SG_FLONUMP(timeout)) {
long val = Sg_GetInteger(timeout);
if (val < 0) goto badtv;
tm->tv_sec = val / 1000000;
tm->tv_nsec = (val % 1000000) * 1000;
return tm;
} else if (SG_PAIRP(timeout) && SG_PAIRP(SG_CDR(timeout))) {
SgObject sec = SG_CAR(timeout);
SgObject usec = SG_CADR(timeout);
long isec, iusec;
if (!Sg_IntegerP(sec) || !Sg_IntegerP(usec)) goto badtv;
isec = Sg_GetInteger(sec);
iusec = Sg_GetInteger(usec);
if (isec < 0 || iusec < 0) goto badtv;
tm->tv_sec = isec;
tm->tv_nsec = iusec * 1000;
return tm;
} else if (SG_TIMEP(timeout)) {
tm->tv_sec = SG_TIME(timeout)->sec;
tm->tv_nsec = SG_TIME(timeout)->nsec;
return tm;
}
badtv:
Sg_Error(UC("timespec needs to be a real number (in microseconds), a list "
"of two integers (seconds and microseconds), or a time object "
"but got %S"),
timeout);
return NULL; /* dummy */
}

38 changes: 38 additions & 0 deletions ext/socket/test.scm
Original file line number Diff line number Diff line change
Expand Up @@ -369,4 +369,42 @@
(else (test-assert "unexpected condition" #f)))
(make-client-socket "localhost" "123456789"))

(let ()
(define server (make-server-socket "0"))
(define t (thread-start!
(make-thread
(lambda ()
(let ((s (socket-accept server)))
(thread-sleep! 1) ;; 1s
(socket-send s #*"ok")
(thread-sleep! 1) ;; 1s
(socket-send s #*"ok2")
(socket-shutdown s SHUT_RDWR)
(socket-close s))))))
(define selector (make-socket-selector))
(define (make-selector-thread)
(make-thread (lambda () (socket-selector-wait! selector))))
(define t2 (make-selector-thread))
(define s (make-client-socket "localhost" (server-service server)))

(test-assert (socket-selector? (socket-selector-add! selector s)))
(thread-start! t2)

(test-error (socket-selector-wait! selector))
(let ((s* (thread-join! t2)))
(test-equal 1 (length s*))
(test-equal '(#*"ok") (map (lambda (s) (socket-recv s 2)) s*)))
;; nothing to wait, so it'd return '() immediately
(test-equal '() (socket-selector-wait! selector))

(socket-selector-add! selector s)
;; 1000 => 1ms (timeout = usec)
(test-equal "selector (timeout)" '() (socket-selector-wait! selector 1000))
(let ((s* (socket-selector-wait! selector)))
(test-equal 1 (length s*))
(test-equal '(#*"ok2") (map (lambda (s) (socket-recv s 3)) s*)))
(close-socket-selector! selector)
(socket-close server))


(test-end)
13 changes: 12 additions & 1 deletion ext/socket/unix-socket-selector.incl
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@ static struct sockaddr_un * bind_unix_socket(int socket,
char tmp[] = "/tmp/socket-selector.XXXXXX";
int fd = mkstemp(tmp);
if (fd == -1) return NULL;
close(fd);
close(fd); /* we don't need file descriptor of temp file */
addr->sun_family = AF_UNIX;
snprintf(addr->sun_path, sizeof(tmp), "%s", tmp);
unlink(addr->sun_path); /* mkstemp creates a file but we only need the name */
if (bind(socket, (struct sockaddr *)addr, sizeof(struct sockaddr_un)) != 0) {
return NULL;
}
Expand Down Expand Up @@ -174,6 +175,16 @@ SgObject Sg_SocketSelectorWait(SgSocketSelector *selector, SgObject timeout)
}
ctx->waiting = TRUE;
r = wait_selector(ctx, n, selector->sockets, timeout);
if (!SG_NULLP(r)) {
/* remove the returned sockets from the targets */
SgObject h = SG_NIL, t = SG_NIL, cp;
SG_FOR_EACH(cp, selector->sockets) {
if (SG_FALSEP(Sg_Memq(SG_CAR(cp), r))) {
SG_APPEND1(h, t, SG_CAR(cp));
}
}
selector->sockets = h;
}
ctx->waiting = FALSE;
return r;
}

0 comments on commit 3170cef

Please sign in to comment.