Skip to content

Commit 03b3d7b

Browse files
committed
fix(x/net/http): Implement multi-thread eventLoop
Signed-off-by: hackerchai <i@hackerchai.com>
1 parent 8420957 commit 03b3d7b

File tree

6 files changed

+494
-277
lines changed

6 files changed

+494
-277
lines changed

x/net/http/_demo/http.go

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package main
22

33
import (
44
"fmt"
5-
"io"
65

76
"github.com/goplus/llgo/x/net/http"
87
)
@@ -18,16 +17,18 @@ func echoHandler(w http.ResponseWriter, r *http.Request) {
1817
fmt.Printf(">> URL: %s\n", r.URL.String())
1918
fmt.Printf(">> RemoteAddr: %s\n", r.RemoteAddr)
2019

21-
body, err := io.ReadAll(r.Body)
22-
if err != nil {
23-
http.Error(w, "Error reading request body", http.StatusInternalServerError)
24-
return
25-
}
26-
defer r.Body.Close()
27-
fmt.Printf(">> Body: %s\n", string(body))
20+
// body, err := io.ReadAll(r.Body)
21+
// if err != nil {
22+
// http.Error(w, "Error reading request body", http.StatusInternalServerError)
23+
// return
24+
// }
25+
// defer r.Body.Close()
26+
// fmt.Printf(">> Body: %s\n", string(body))
2827

28+
// w.Header().Set("Content-Type", "text/plain")
29+
// w.Write(body)
2930
w.Header().Set("Content-Type", "text/plain")
30-
w.Write(body)
31+
w.Write([]byte("Hello, World!"))
3132
}
3233

3334
func main() {

x/net/http/request.go

Lines changed: 8 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212

1313
"github.com/goplus/llgo/c"
1414
"github.com/goplus/llgo/rust/hyper"
15+
"github.com/goplus/llgo/c/libuv"
1516
)
1617

1718
type Request struct {
@@ -35,14 +36,14 @@ type Request struct {
3536
timeout time.Duration
3637
}
3738

38-
func (conn *conn) readRequest(srv *Server, hyperReq *hyper.Request) (*Request, error) {
39+
func readRequest(executor *hyper.Executor, hyperReq *hyper.Request, requestNotifyHandle *libuv.Async, remoteAddr string) (*Request, error) {
3940
println("[debug] readRequest called")
4041
req := Request{
4142
Header: make(Header),
4243
timeout: 0,
4344
Body: nil,
4445
}
45-
req.RemoteAddr = conn.remoteAddr
46+
req.RemoteAddr = remoteAddr
4647

4748
headers := hyperReq.Headers()
4849
if headers != nil {
@@ -135,39 +136,28 @@ func (conn *conn) readRequest(srv *Server, hyperReq *hyper.Request) (*Request, e
135136

136137
body := hyperReq.Body()
137138
if body != nil {
138-
task := body.Data()
139+
//task := body.Data()
139140
taskFlag := getBodyTask
140141

141-
requestBody := newRequestBody(conn.asyncHandle)
142+
requestBody := newRequestBody(requestNotifyHandle)
142143
req.Body = requestBody
143144

144145
taskData := taskData{
145146
hyperBody: body,
146147
responseBody: nil,
147148
requestBody: requestBody,
148149
taskFlag: taskFlag,
149-
server: srv,
150+
executor: executor,
150151
}
151-
task.SetUserdata(c.Pointer(&taskData), nil)
152152

153-
conn.asyncHandle.SetData(c.Pointer(&taskData))
153+
requestNotifyHandle.SetData(c.Pointer(&taskData))
154154
fmt.Println("[debug] async task set")
155-
if task != nil {
156-
r := srv.executor.Push(task)
157-
if r != hyper.OK {
158-
fmt.Printf("failed to push body foreach task: %d\n", r)
159-
task.Free()
160-
return nil, fmt.Errorf("failed to push body foreach task: %v", r)
161-
}
162-
} else {
163-
return nil, fmt.Errorf("failed to create body foreach task")
164-
}
165155

166156
} else {
167157
return nil, fmt.Errorf("failed to get request body")
168158
}
169159

170-
hyperReq.Free()
160+
//hyperReq.Free()
171161

172162
return &req, nil
173163
}

x/net/http/request_body.go

Lines changed: 28 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package http
33
import (
44
"errors"
55
"fmt"
6-
"io"
76

87
"github.com/goplus/llgo/c/libuv"
98
)
@@ -31,33 +30,34 @@ func newRequestBody(asyncHandle *libuv.Async) *requestBody {
3130
}
3231

3332
func (rb *requestBody) Read(p []byte) (n int, err error) {
34-
fmt.Println("[debug] requestBody.Read called")
35-
// If there are still unread chunks, read them first
36-
if len(rb.chunk) > 0 {
37-
n = copy(p, rb.chunk)
38-
rb.chunk = rb.chunk[n:]
39-
return n, nil
33+
fmt.Println("[debug] RequestBody Read called")
34+
select {
35+
case <-rb.done:
36+
err = rb.readCloseError()
37+
return
38+
default:
4039
}
4140

42-
// Attempt to read a new chunk from a channel
43-
select {
44-
case chunk, ok := <-rb.readCh:
45-
if !ok {
46-
// The channel has been closed, indicating that all data has been read
47-
return 0, rb.readCloseError()
48-
}
49-
n = copy(p, chunk)
50-
if n < len(chunk) {
51-
// If the capacity of p is insufficient to hold the whole chunk, save the rest of the chunk
52-
rb.chunk = chunk[n:]
41+
for n < len(p) {
42+
if len(rb.chunk) == 0 {
43+
rb.asyncHandle.Send()
44+
fmt.Println("[debug] RequestBody Read asyncHandle.Send called")
45+
select {
46+
case chunk := <-rb.readCh:
47+
rb.chunk = chunk
48+
fmt.Println("[debug] RequestBody Read chunk received")
49+
case <-rb.done:
50+
err = rb.readCloseError()
51+
return
52+
}
5353
}
54-
fmt.Println("[debug] requestBody.Read async send")
55-
rb.asyncHandle.Send()
56-
return n, nil
57-
case <-rb.done:
58-
// If the done channel is closed, the read needs to be terminated
59-
return 0, rb.readCloseError()
54+
55+
copied := copy(p[n:], rb.chunk)
56+
n += copied
57+
rb.chunk = rb.chunk[copied:]
6058
}
59+
60+
return
6161
}
6262

6363
func (rb *requestBody) readCloseError() error {
@@ -69,8 +69,11 @@ func (rb *requestBody) readCloseError() error {
6969

7070
func (rb *requestBody) closeRead(err error) error {
7171
fmt.Println("[debug] RequestBody closeRead called")
72+
if rb.rerr != nil {
73+
return nil
74+
}
7275
if err == nil {
73-
err = io.EOF
76+
err = ErrClosedRequestBody
7477
}
7578
rb.rerr = err
7679
close(rb.done)

x/net/http/response.go

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ type response struct {
1414
statusCode int
1515
written bool
1616
body []byte
17-
server *Server
1817
hyperChannel *hyper.ResponseChannel
1918
hyperResp *hyper.Response
2019
}
@@ -29,7 +28,7 @@ type taskData struct {
2928
hyperBody *hyper.Body
3029
responseBody *responseBodyRaw
3130
requestBody *requestBody
32-
server *Server
31+
executor *hyper.Executor
3332
taskFlag taskFlag
3433
}
3534

@@ -42,16 +41,14 @@ const (
4241

4342
var DefaultChunkSize uintptr = 8192
4443

45-
func newResponse(server *Server, hyperChannel *hyper.ResponseChannel) *response {
44+
func newResponse(hyperChannel *hyper.ResponseChannel) *response {
4645
fmt.Printf("[debug] newResponse called\n")
4746

4847
return &response{
4948
header: make(Header),
50-
hyperChannel: hyperChannel,
51-
server: server,
52-
statusCode: 200,
5349
written: false,
54-
body: nil,
50+
statusCode: 200,
51+
hyperChannel: hyperChannel,
5552
hyperResp: hyper.NewResponse(),
5653
}
5754
}
@@ -84,7 +81,7 @@ func (r *response) WriteHeader(statusCode int) {
8481
fmt.Printf("[debug] < HTTP/1.1 %d\n", statusCode)
8582
for key, values := range r.header {
8683
for _, value := range values {
87-
fmt.Printf("< %s: %s\n", key, value)
84+
fmt.Printf("[debug] < %s: %s\n", key, value)
8885
}
8986
}
9087

@@ -116,8 +113,6 @@ func (r *response) finalize() error {
116113
r.WriteHeader(200)
117114
}
118115

119-
r.hyperResp = hyper.NewResponse()
120-
121116
if r.hyperResp == nil {
122117
return fmt.Errorf("failed to create response")
123118
}
@@ -134,9 +129,10 @@ func (r *response) finalize() error {
134129
return fmt.Errorf("failed to create body")
135130
}
136131
taskData := &taskData{
137-
hyperBody: nil,
132+
hyperBody: body,
138133
responseBody: &bodyData,
139-
server: r.server,
134+
requestBody: nil,
135+
executor: nil,
140136
taskFlag: setBodyTask,
141137
}
142138
body.SetDataFunc(setBodyDataFunc)
@@ -163,6 +159,7 @@ func setBodyDataFunc(userdata c.Pointer, ctx *hyper.Context, chunk **hyper.Buf)
163159
fmt.Println("[debug] taskData is nil")
164160
return hyper.PollError
165161
}
162+
fmt.Println("[debug] taskData is not nil")
166163
body := taskData.responseBody
167164

168165
if body.len > 0 {

0 commit comments

Comments
 (0)