1
- /* selector-iocp.c -*- mode:c; coding:utf-8; -*-
1
+ /* selector-iocp.c -*- mode:c; coding:utf-8; -*-
2
2
*
3
3
* Copyright (c) 2023 Takashi Kato <ktakashi@ymail.com>
4
4
*
34
34
35
35
#include "socket-selector.incl"
36
36
37
- typedef struct iocp_context_rec
37
+ typedef struct win_context_rec
38
38
{
39
- HANDLE iocp ;
39
+ HANDLE event ;
40
40
HANDLE thread ; /* waiting thread */
41
- } iocp_context_t ;
41
+ } win_context_t ;
42
42
43
43
44
44
static void system_error (int code )
45
45
{
46
- Sg_SystemError (GetLastError () ,
46
+ Sg_SystemError (code ,
47
47
UC ("Setting up IOCP failed: %A" ),
48
48
Sg_GetLastErrorMessageWithErrorCode (code ));
49
49
}
50
50
51
51
SgObject Sg_MakeSocketSelector ()
52
52
{
53
53
SgSocketSelector * selector = SG_NEW (SgSocketSelector );
54
- iocp_context_t * ctx = SG_NEW (iocp_context_t );
54
+ win_context_t * ctx = SG_NEW (win_context_t );
55
55
56
56
SG_SET_CLASS (selector , SG_CLASS_SOCKET_SELECTOR );
57
- ctx -> iocp = CreateIoCompletionPort (INVALID_HANDLE_VALUE , NULL , 0 , 0 );
58
- if (ctx -> iocp == NULL ) goto err ;
57
+
58
+ ctx -> event = CreateEvent (NULL , FALSE, FALSE, NULL );
59
+ if (ctx -> event == NULL ) goto err ;
60
+
59
61
ctx -> thread = NULL ;
60
62
selector -> sockets = SG_NIL ;
63
+ selector -> context = ctx ;
61
64
62
65
Sg_RegisterFinalizer (selector , selector_finalizer , NULL );
63
66
return SG_OBJ (selector );
@@ -69,32 +72,34 @@ SgObject Sg_MakeSocketSelector()
69
72
70
73
void Sg_CloseSocketSelector (SgSocketSelector * selector )
71
74
{
72
- iocp_context_t * ctx = (iocp_context_t * )selector -> context ;
73
- CloseHandle (ctx -> iocp );
75
+ win_context_t * ctx = (win_context_t * )selector -> context ;
76
+ CloseHandle (ctx -> event );
74
77
Sg_UnregisterFinalizer (selector );
75
78
}
76
79
77
80
SgObject Sg_SocketSelectorAdd (SgSocketSelector * selector , SgSocket * socket )
78
81
{
79
- iocp_context_t * ctx = (iocp_context_t * )selector -> context ;
80
- HANDLE r ;
81
- r = CreateIoCompletionPort (ctx -> iocp , (HANDLE )socket -> socket ,
82
- (ULONG_PTR )socket , 0 );
83
- if (r == NULL ) {
84
- system_error (Sg_GetLastError ());
85
- }
82
+ win_context_t * ctx = (win_context_t * )selector -> context ;
86
83
selector -> sockets = Sg_Cons (socket , selector -> sockets );
87
84
selector_sockets (selector );
88
85
return SG_OBJ (selector );
89
86
}
90
87
88
+ static SgObject select_socket (SOCKET fd , SgObject sockets )
89
+ {
90
+ SgObject cp ;
91
+ SG_FOR_EACH (cp , sockets ) {
92
+ SgSocket * sock = SG_SOCKET (SG_CAR (cp ));
93
+ if (sock -> socket == fd ) return sock ;
94
+ }
95
+ return SG_FALSE ;
96
+ }
97
+
91
98
SgObject Sg_SocketSelectorWait (SgSocketSelector * selector , SgObject timeout )
92
99
{
93
- iocp_context_t * ctx = (iocp_context_t * )selector -> context ;
94
- int n = selector_sockets (selector ), i , millis = INFINITE ;
95
- ULONG removed ;
96
- LPOVERLAPPED_ENTRY entries ;
97
- BOOL r ;
100
+ win_context_t * ctx = (win_context_t * )selector -> context ;
101
+ int n = selector_sockets (selector ), millis = INFINITE , r ;
102
+ HANDLE hEvents [2 ];
98
103
struct timespec spec , * sp ;
99
104
SgObject ret = SG_NIL ;
100
105
@@ -103,35 +108,71 @@ SgObject Sg_SocketSelectorWait(SgSocketSelector *selector, SgObject timeout)
103
108
}
104
109
ctx -> thread = GetCurrentThread ();
105
110
111
+ hEvents [0 ] = CreateEvent (NULL , FALSE, FALSE, NULL );
112
+ hEvents [1 ] = ctx -> event ;
113
+
114
+ #define SET_EVENT (sockets , event , flags ) \
115
+ do { \
116
+ SgObject cp; \
117
+ SG_FOR_EACH(cp, sockets) { \
118
+ SG_SET_SOCKET_EVENT(SG_CAR(cp), event, flags); \
119
+ } \
120
+ } while(0)
121
+
122
+ SET_EVENT (selector -> sockets , hEvents [0 ], FD_READ | FD_OOB );
123
+
106
124
sp = selector_timespec (timeout , & spec );
107
125
if (sp ) {
108
126
millis = sp -> tv_sec * 1000 ;
109
127
millis += sp -> tv_nsec / 1000000 ;
110
128
}
111
129
112
- entries = SG_NEW_ATOMIC2 (OVERLAPPED_ENTRY * , n * sizeof (OVERLAPPED_ENTRY ));
113
- r = GetQueuedCompletionStatusEx (ctx -> iocp , entries , n , & removed , millis , TRUE);
114
- if (r ) {
115
- for (i = 0 ; i < removed ; i ++ ) {
116
- ret = Sg_Cons ((SgObject ) entries [i ].lpCompletionKey , ret );
130
+ r = WaitForMultipleObjects (2 , hEvents , FALSE, millis );
131
+ if (r == WAIT_OBJECT_0 ) {
132
+ /* Using WSAPoll to detect which sockets are ready to read */
133
+ WSAPOLLFD * fds = SG_NEW_ATOMIC2 (WSAPOLLFD * , n * sizeof (WSAPOLLFD ));
134
+ int i = 0 ;
135
+ SgObject cp ;
136
+ SG_FOR_EACH (cp , selector -> sockets ) {
137
+ SgSocket * sock = SG_SOCKET (SG_CAR (cp ));
138
+ fds [i ].fd = sock -> socket ;
139
+ fds [i ].events = POLLRDNORM ;
140
+ i ++ ;
117
141
}
118
- } else {
119
- system_error (Sg_GetLastError ());
142
+ r = WSAPoll (fds , n , 0 ); /* Some sockets must be ready at this stage */
143
+ if (r == SOCKET_ERROR ) system_error (WSAGetLastError ());
144
+ for (i = 0 ; i < n ; i ++ ) {
145
+ if (fds [i ].revents & POLLRDNORM ) {
146
+ /* collect sockets, should we use hashtable? */
147
+ SgObject o = select_socket (fds [i ].fd , selector -> sockets );
148
+ if (!SG_FALSEP (o )) ret = Sg_Cons (o , ret );
149
+ }
150
+ }
151
+ }
152
+
153
+ SET_EVENT (selector -> sockets , hEvents [0 ], 0 );
154
+ #undef SET_EVENT
155
+
156
+ if (!SG_NULLP (ret )) {
157
+ /* remove the returned sockets from the targets */
158
+ SgObject h = SG_NIL , t = SG_NIL , cp ;
159
+ SG_FOR_EACH (cp , selector -> sockets ) {
160
+ if (SG_FALSEP (Sg_Memq (SG_CAR (cp ), ret ))) {
161
+ SG_APPEND1 (h , t , SG_CAR (cp ));
162
+ }
163
+ }
164
+ selector -> sockets = h ;
120
165
}
166
+ CloseHandle (hEvents [0 ]);
121
167
ctx -> thread = NULL ;
122
168
return ret ;
123
169
}
124
170
125
- static void CALLBACK dummy (ULONG_PTR param )
126
- {
127
- /* Do nothing */
128
- }
129
-
130
171
SgObject Sg_SocketSelectorInterrupt (SgSocketSelector * selector )
131
172
{
132
- iocp_context_t * ctx = (iocp_context_t * )selector -> context ;
173
+ win_context_t * ctx = (win_context_t * )selector -> context ;
133
174
if (ctx -> thread ) {
134
- QueueUserAPC ( dummy , ctx -> thread , 0 );
175
+ SetEvent ( ctx -> event );
135
176
}
136
177
return selector ;
137
178
}
0 commit comments