Skip to content

Commit

Permalink
Check timed-out sockets when returning
Browse files Browse the repository at this point in the history
Fixing Windows socket selector to raise C exception
  • Loading branch information
ktakashi committed Jan 23, 2025
1 parent 2c1bea8 commit 9787288
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 48 deletions.
85 changes: 51 additions & 34 deletions ext/socket/selector-win.c
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,8 @@
typedef struct win_context_rec
{
WSAEVENT event;
SgObject results; /* result for multiple waiting */
HANDLE *threads; /* threads for multiple waiting */
HANDLE lock; /* lock */
WSAEVENT events[WSA_MAXIMUM_WAIT_EVENTS]; /* for sliding batch */
HANDLE lock; /* lock for closing... */
} win_context_t;


Expand All @@ -77,11 +76,19 @@ static void * make_selector_context()

ctx->event = WSACreateEvent();
if (ctx->event == NULL) goto err;
for (int i = 0; i < WSA_MAXIMUM_WAIT_EVENTS; i++) {
ctx->events[i] = WSACreateEvent();
if (ctx->events[i] == NULL) goto err;
}
ctx->lock = CreateMutex(NULL, FALSE, NULL);

return ctx;

err:
if (ctx->event) WSACloseEvent(ctx->event);
for (int i = 0; i < WSA_MAXIMUM_WAIT_EVENTS; i++) {
if (ctx->events[i]) WSACloseEvent(ctx->events[i]);
}
system_error(Sg_GetLastError());
return NULL; /* dummy */
}
Expand All @@ -91,8 +98,15 @@ void Sg_CloseSocketSelector(SgSocketSelector *selector)
if (!Sg_SocketSelectorClosedP(selector)) {
win_context_t *ctx = (win_context_t *)selector->context;
selector->context = NULL;

WaitForSingleObject(ctx->lock, INFINITE);
WSACloseEvent(ctx->event);
CloseHandle(ctx->lock);
ctx->event = INVALID_HANDLE_VALUE;
for (int i = 0; i < WSA_MAXIMUM_WAIT_EVENTS; i++) {
WSACloseEvent(ctx->events[i]);
ctx->events[i] = INVALID_HANDLE_VALUE;
}
ReleaseMutex(ctx->lock);
Sg_UnregisterFinalizer(selector);
}
}
Expand All @@ -107,7 +121,7 @@ static SgObject win_selector_wait(win_context_t *ctx, int n,
DWORD millis = INFINITE;
SgObject ret = SG_NIL;

#define SET_EVENT(sockets, event, flags) \
#define SET_EVENT(sockets, event, flags) \
do { \
SgObject cp; \
int i = 0; \
Expand Down Expand Up @@ -135,45 +149,48 @@ static SgObject win_selector_wait(win_context_t *ctx, int n,
err = TRUE;
goto cleanup;
}
/* unassociate event */
/* unassociate ctx->event */
SET_EVENT(sockets, ctx->event, 0);
WSAResetEvent(ctx->event); /* reset event */


if (r == WSA_WAIT_EVENT_0) {
int batch = (n/WSA_MAXIMUM_WAIT_EVENTS) + (n%WSA_MAXIMUM_WAIT_EVENTS)>0 ? 1 : 0;
int size = max(n, WSA_MAXIMUM_WAIT_EVENTS);
WSAEVENT events[WSA_MAXIMUM_WAIT_EVENTS] = { NULL, };

for (int i = 0; i < size; i++) events[i] = WSACreateEvent();

int size = min(n, WSA_MAXIMUM_WAIT_EVENTS);
SgObject cp = sockets;
WSANETWORKEVENTS networkEvents;

while (SG_PAIRP(cp)) {
SgObject h = SG_NIL, t = SG_NIL;
for (int i = 0; i < size && SG_PAIRP(cp); i++, cp = SG_CDR(cp)) {
SgObject slot = SG_CAR(cp);
SgSocket *so = SG_SOCKET(SG_CAR(slot));
WSAEventSelect(so->socket, events[i], waiting_flags);
SG_APPEND1(h, t, slot);
}
r = WSAWaitForMultipleEvents(size, events, FALSE, 0, FALSE);
if (r != WSA_WAIT_TIMEOUT && r != WSA_WAIT_FAILED) {
int i = 0;
SgObject cp2;
SG_FOR_EACH(cp2, h) {
SgObject slot = SG_CAR(cp2);
SgSocket *s = SG_SOCKET(SG_CAR(slot));
if (WSAEnumNetworkEvents(s->socket, events[i], &networkEvents) == 0) {
if ((networkEvents.lNetworkEvents & waiting_flags) != 0) {
ret = Sg_Cons(slot, ret);
r = WaitForSingleObject(ctx->lock, INFINITE);
if (r == WAIT_OBJECT_0 && ctx->event != INVALID_HANDLE_VALUE) {
WSAResetEvent(ctx->event); /* reset event */
while (SG_PAIRP(cp)) {
SgObject h = SG_NIL, t = SG_NIL;
int count = 0;
for (int i = 0; i < size && SG_PAIRP(cp); i++, cp = SG_CDR(cp)) {
SgObject slot = SG_CAR(cp);
SgSocket *so = SG_SOCKET(SG_CAR(slot));
WSAEventSelect(so->socket, ctx->events[i], waiting_flags);
count++;
SG_APPEND1(h, t, slot);
}
r = WSAWaitForMultipleEvents(count, ctx->events, FALSE, 0, FALSE);
if (r != WSA_WAIT_TIMEOUT && r != WSA_WAIT_FAILED) {
int i = 0;
SgObject cp2;
SG_FOR_EACH(cp2, h) {
SgObject slot = SG_CAR(cp2);
SgSocket *s = SG_SOCKET(SG_CAR(slot));
if (WSAEnumNetworkEvents(s->socket, ctx->events[i], &networkEvents) == 0) {
if ((networkEvents.lNetworkEvents & waiting_flags) != 0) {
ret = Sg_Cons(slot, ret);
}
}
}
}
SG_FOR_EACH(cp, h) {
SgSocket *s = SG_SOCKET(SG_CAAR(cp));
WSAEventSelect(s->socket, NULL, 0);
}
}
ReleaseMutex(ctx->lock);
}
for (int i = 0; i < size; i++) WSACloseEvent(events[i]);
}

#undef SET_EVENT
Expand Down Expand Up @@ -203,7 +220,7 @@ static SgObject selector_wait(SgSocketSelector *selector, int n,
{
win_context_t *ctx = (win_context_t *)selector->context;
SgObject sockets = Sg_Reverse(selector->sockets);
return win_selector_wait(ctx, n, selector, sockets, sp);
return win_selector_wait(ctx, n, selector, sockets, sp);
}


Expand Down
17 changes: 3 additions & 14 deletions ext/socket/socket-selector.incl
Original file line number Diff line number Diff line change
Expand Up @@ -305,22 +305,11 @@ SgObject Sg_SocketSelectorWait(SgSocketSelector *selector, SgObject timeout)

goto retry;
} else {
SgObject strip;
/* reset it, in case of r is not null :) */
selector->retry = FALSE;
if (!SG_NULLP(timedout)) {
SgObject h = SG_NIL, t = SG_NIL, cp;
SG_FOR_EACH(cp, r) {
SG_APPEND1(h, t, SG_CAR(cp));
}
SG_FOR_EACH(cp, timedout) {
SG_APPEND1(h, t, SG_CAR(cp));
}
strip = h;
} else {
strip = r;
}
strip_sockets(selector, strip);
if (!SG_NULLP(r)) strip_sockets(selector, r);
earliest_timeout(selector->sockets, &start, &timedout);
if (!SG_NULLP(timedout)) strip_sockets(selector, timedout);
}

Sg_UnlockMutex(&selector->lock);
Expand Down

0 comments on commit 9787288

Please sign in to comment.