Skip to content

Commit e1c3717

Browse files
committed
refactor(x/net/http): Rewrite loop <-> hyper.executor logic
Signed-off-by: hackerchai <i@hackerchai.com>
1 parent 22ab2d5 commit e1c3717

File tree

3 files changed

+42
-38
lines changed

3 files changed

+42
-38
lines changed

x/net/http/request.go

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ type Request struct {
3535
timeout time.Duration
3636
}
3737

38-
func (conn *conn) readRequest(hyperReq *hyper.Request) (*Request, error) {
38+
func (conn *conn) readRequest(srv *Server, hyperReq *hyper.Request) (*Request, error) {
3939
println("[debug] readRequest called")
4040
req := Request{
4141
Header: make(Header),
@@ -137,21 +137,23 @@ func (conn *conn) readRequest(hyperReq *hyper.Request) (*Request, error) {
137137
if body != nil {
138138
task := body.Data()
139139
taskFlag := getBodyTask
140+
141+
requestBody := newRequestBody(conn.asyncHandle)
142+
req.Body = requestBody
143+
140144
taskData := taskData{
141145
hyperBody: body,
142146
responseBody: nil,
143-
conn: conn,
147+
requestBody: requestBody,
144148
taskFlag: taskFlag,
149+
server: srv,
145150
}
146151
task.SetUserdata(c.Pointer(&taskData), nil)
147-
requestBody := newRequestBody(conn.asyncHandle)
148-
conn.requestBody = requestBody
149-
req.Body = requestBody
150152

151153
conn.asyncHandle.SetData(c.Pointer(&taskData))
152154
fmt.Println("[debug] async task set")
153155
if task != nil {
154-
r := conn.executor.Push(task)
156+
r := srv.executor.Push(task)
155157
if r != hyper.OK {
156158
fmt.Printf("failed to push body foreach task: %d\n", r)
157159
task.Free()

x/net/http/response.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ type response struct {
1414
statusCode int
1515
written bool
1616
body []byte
17+
server *Server
1718
hyperChannel *hyper.ResponseChannel
1819
hyperResp *hyper.Response
1920
}
@@ -27,7 +28,8 @@ type responseBodyRaw struct {
2728
type taskData struct {
2829
hyperBody *hyper.Body
2930
responseBody *responseBodyRaw
30-
conn *conn
31+
requestBody *requestBody
32+
server *Server
3133
taskFlag taskFlag
3234
}
3335

@@ -40,12 +42,13 @@ const (
4042

4143
var DefaultChunkSize uintptr = 8192
4244

43-
func newResponse(hyperChannel *hyper.ResponseChannel) *response {
45+
func newResponse(server *Server, hyperChannel *hyper.ResponseChannel) *response {
4446
fmt.Printf("[debug] newResponse called\n")
4547

4648
return &response{
4749
header: make(Header),
4850
hyperChannel: hyperChannel,
51+
server: server,
4952
statusCode: 200,
5053
written: false,
5154
body: nil,
@@ -133,7 +136,7 @@ func (r *response) finalize() error {
133136
taskData := &taskData{
134137
hyperBody: nil,
135138
responseBody: &bodyData,
136-
conn: nil,
139+
server: r.server,
137140
taskFlag: setBodyTask,
138141
}
139142
body.SetDataFunc(setBodyDataFunc)

x/net/http/server.go

Lines changed: 28 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ type Server struct {
3737
inShutdown atomic.Bool
3838
idleHandle libuv.Idle
3939

40+
executor *hyper.Executor
41+
4042
mu sync.Mutex
4143
activeConnections map[*conn]struct{}
4244
}
@@ -51,9 +53,7 @@ type conn struct {
5153
http2Opts *hyper.Http2ServerconnOptions
5254
isClosing atomic.Bool
5355
closedHandles int32
54-
executor *hyper.Executor
5556
remoteAddr string
56-
requestBody *requestBody
5757
asyncHandle *libuv.Async
5858
}
5959

@@ -225,7 +225,7 @@ func onNewConnection(serverStream *libuv.Stream, status c.Int) {
225225
(*libuv.Handle)(unsafe.Pointer(&conn.stream)).Close(nil)
226226
return
227227
}
228-
conn.executor = executor
228+
srv.executor = executor
229229

230230
fmt.Println("[debug] Conn created")
231231
srv.trackConn(conn, true)
@@ -236,7 +236,7 @@ func onNewConnection(serverStream *libuv.Stream, status c.Int) {
236236
io := createIo(conn)
237237
service := hyper.ServiceNew(serverCallback)
238238
service.SetUserdata(unsafe.Pointer(userdata), nil)
239-
http1Opts := hyper.Http1ServerconnOptionsNew(conn.executor)
239+
http1Opts := hyper.Http1ServerconnOptionsNew(srv.executor)
240240
if http1Opts == nil {
241241
fmt.Fprintf(os.Stderr, "Failed to create http1_opts\n")
242242
os.Exit(1)
@@ -248,7 +248,7 @@ func onNewConnection(serverStream *libuv.Stream, status c.Int) {
248248
}
249249
conn.http1Opts = http1Opts
250250

251-
http2Opts := hyper.Http2ServerconnOptionsNew(conn.executor)
251+
http2Opts := hyper.Http2ServerconnOptionsNew(srv.executor)
252252
if http2Opts == nil {
253253
fmt.Fprintf(os.Stderr, "Failed to create http2_opts\n")
254254
os.Exit(1)
@@ -266,7 +266,7 @@ func onNewConnection(serverStream *libuv.Stream, status c.Int) {
266266
conn.http2Opts = http2Opts
267267

268268
serverconn := hyper.ServeHttpXConnection(http1Opts, http2Opts, io, service)
269-
conn.executor.Push(serverconn)
269+
srv.executor.Push(serverconn)
270270
} else {
271271
fmt.Println("[debug] Client not accepted")
272272
(*libuv.Handle)(unsafe.Pointer(&conn.pollHandle)).Close(nil)
@@ -280,7 +280,7 @@ func onAsync(asyncHandle *libuv.Async) {
280280
dataTask := taskData.hyperBody.Data()
281281
dataTask.SetUserdata(c.Pointer(taskData), nil)
282282
if dataTask != nil {
283-
r := taskData.conn.executor.Push(dataTask)
283+
r := taskData.server.executor.Push(dataTask)
284284
fmt.Printf("[debug] onAsync push data task: %d\n", r)
285285
if r != hyper.OK {
286286
fmt.Printf("failed to push data task: %d\n", r)
@@ -291,13 +291,11 @@ func onAsync(asyncHandle *libuv.Async) {
291291

292292
func onIdle(handle *libuv.Idle) {
293293
srv := (*Server)((*libuv.Handle)(unsafe.Pointer(handle)).GetData())
294-
for conn := range srv.activeConnections {
295-
if conn.executor != nil {
296-
task := conn.executor.Poll()
297-
for task != nil {
298-
srv.handleTask(task)
299-
task = conn.executor.Poll()
300-
}
294+
if srv.executor != nil {
295+
task := srv.executor.Poll()
296+
for task != nil {
297+
srv.handleTask(task)
298+
task = srv.executor.Poll()
301299
}
302300
}
303301

@@ -309,19 +307,24 @@ func onIdle(handle *libuv.Idle) {
309307

310308
func serverCallback(userdata unsafe.Pointer, hyperReq *hyper.Request, channel *hyper.ResponseChannel) {
311309
userData := (*serviceUserdata)(userdata)
310+
srv := userData.server
311+
if srv == nil {
312+
fmt.Fprintf(os.Stderr, "Error: Received null server\n")
313+
return
314+
}
312315

313316
if hyperReq == nil {
314317
fmt.Fprintf(os.Stderr, "Error: Received null request\n")
315318
return
316319
}
317320

318-
req, err := userData.conn.readRequest(hyperReq)
321+
req, err := userData.conn.readRequest(srv, hyperReq)
319322
if err != nil {
320323
fmt.Printf("Error creating request: %v\n", err)
321324
return
322325
}
323326

324-
res := newResponse(channel)
327+
res := newResponse(srv, channel)
325328
fmt.Println("[debug] Response created")
326329

327330
//TODO(hackerchai): replace with no goroutine
@@ -348,7 +351,7 @@ func (srv *Server) handleTask(task *hyper.Task) {
348351
if payload != nil {
349352
switch payload.taskFlag {
350353
case getBodyTask:
351-
handleGetBodyTask(hyperTaskType, task, payload)
354+
handleGetBodyTask(srv, hyperTaskType, task, payload)
352355
return
353356
case setBodyTask:
354357
handleSetBodyTask(hyperTaskType, task)
@@ -374,15 +377,15 @@ func (srv *Server) handleTask(task *hyper.Task) {
374377
}
375378
}
376379

377-
func handleGetBodyTask(hyperTaskType hyper.TaskReturnType, task *hyper.Task, payload *taskData) {
380+
func handleGetBodyTask(srv *Server, hyperTaskType hyper.TaskReturnType, task *hyper.Task, payload *taskData) {
378381
switch hyperTaskType {
379382
case hyper.TaskError:
380383
handleTaskError(task)
381384
case hyper.TaskBuf:
382385
handleTaskBuffer(task, payload)
383386
case hyper.TaskEmpty:
384387
fmt.Println("[debug] Get body task closing request body")
385-
payload.conn.requestBody.Close()
388+
payload.requestBody.Close()
386389
task.Free()
387390
}
388391
}
@@ -411,7 +414,7 @@ func handleTaskError(task *hyper.Task) {
411414
func handleTaskBuffer(task *hyper.Task, payload *taskData) {
412415
buf := (*hyper.Buf)(task.Value())
413416
bytes := unsafe.Slice(buf.Bytes(), buf.Len())
414-
payload.conn.requestBody.readCh <- bytes
417+
payload.requestBody.readCh <- bytes
415418
fmt.Printf("[debug] Task get body writing to bodyWriter: %s\n", string(bytes))
416419
buf.Free()
417420
task.Free()
@@ -591,11 +594,6 @@ func freeConnData(userdata c.Pointer) {
591594
conn.writeWaker = nil
592595
}
593596

594-
if conn.executor != nil {
595-
conn.executor.Free()
596-
conn.executor = nil
597-
}
598-
599597
if conn.http1Opts != nil {
600598
conn.http1Opts.Free()
601599
conn.http1Opts = nil
@@ -632,6 +630,11 @@ func (srv *Server) Close() error {
632630
delete(srv.activeConnections, c)
633631
}
634632

633+
if srv.executor != nil {
634+
srv.executor.Free()
635+
srv.executor = nil
636+
}
637+
635638
srv.uvLoop.Walk(closeWalkCb, nil)
636639
srv.uvLoop.Run(libuv.RUN_ONCE)
637640
(*libuv.Handle)(unsafe.Pointer(&srv.uvServer)).Close(nil)
@@ -663,10 +666,6 @@ func (c *conn) Close() {
663666
c.writeWaker = nil
664667
}
665668

666-
if c.executor != nil {
667-
c.executor.Free()
668-
c.executor = nil
669-
}
670669
if c.http1Opts != nil {
671670
c.http1Opts.Free()
672671
c.http1Opts = nil

0 commit comments

Comments
 (0)