diff --git a/go.mod b/go.mod index e893515..95f17e6 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module github.com/goplus/llgoexamples go 1.20 require ( - github.com/goplus/llgo v0.9.7-0.20240830010153-2434fd778f0b + github.com/goplus/llgo v0.9.8-0.20240919105235-c6436ea6d196 golang.org/x/net v0.28.0 ) diff --git a/go.sum b/go.sum index 5d7faad..08150d6 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,5 @@ -github.com/goplus/llgo v0.9.7-0.20240830010153-2434fd778f0b h1:iC0vVA8F2DNJ9wVyHI9fP9U0nM+si3LSQJ1TtGftXyo= -github.com/goplus/llgo v0.9.7-0.20240830010153-2434fd778f0b/go.mod h1:5Fs+08NslqofJ7xtOiIXugkurYOoQvY02ZkFNWA1uEI= +github.com/goplus/llgo v0.9.8-0.20240919105235-c6436ea6d196 h1:LckJktvgChf3x0eex+GT//JkYRj1uiT4uMLzyrg3ChU= +github.com/goplus/llgo v0.9.8-0.20240919105235-c6436ea6d196/go.mod h1:5Fs+08NslqofJ7xtOiIXugkurYOoQvY02ZkFNWA1uEI= golang.org/x/net v0.28.0 h1:a9JDOJc5GMUJ0+UDqmLT86WiEy7iWyIhz8gz8E4e5hE= golang.org/x/net v0.28.0/go.mod h1:yqtgsTWOOnlGLG9GFRrK3++bGOUEkNBoHZc8MEDWPNg= golang.org/x/text v0.17.0 h1:XtiM5bkSOt+ewxlOE/aE/AKEHibwj/6gvWMl9Rsh0Qc= diff --git a/x/net/http/_demo/parallelRequest/parallelRequest.go b/x/net/http/_demo/parallelRequest/parallelRequest.go new file mode 100644 index 0000000..0bcb336 --- /dev/null +++ b/x/net/http/_demo/parallelRequest/parallelRequest.go @@ -0,0 +1,43 @@ +package main + +import ( + "fmt" + "sync" + + "github.com/goplus/llgoexamples/x/net/http" +) + +func worker(id int, wg *sync.WaitGroup) { + defer wg.Done() + resp, err := http.Get("http://www.baidu.com") + if err != nil { + fmt.Println(err) + return + } + fmt.Println(id, ":", resp.Status) + //body, err := io.ReadAll(resp.Body) + //if err != nil { + // fmt.Println(err) + // return + //} + //fmt.Println(string(body)) + resp.Body.Close() +} + +func main() { + var wait sync.WaitGroup + for i := 0; i < 500; i++ { + wait.Add(1) + go worker(i, &wait) + } + wait.Wait() + fmt.Println("All done") + + resp, err := http.Get("http://www.baidu.com") + if err != nil { + fmt.Println(err) + return + } + fmt.Println(resp.Status) + resp.Body.Close() +} diff --git a/x/net/http/bodyChunk.go b/x/net/http/bodyChunk.go index c1d1072..01d9e74 100644 --- a/x/net/http/bodyChunk.go +++ b/x/net/http/bodyChunk.go @@ -2,73 +2,49 @@ package http import ( "errors" - "io" - "sync" "github.com/goplus/llgo/c/libuv" ) -type onceError struct { - sync.Mutex - err error -} - -func (a *onceError) Store(err error) { - a.Lock() - defer a.Unlock() - if a.err != nil { - return - } - a.err = err -} - -func (a *onceError) Load() error { - a.Lock() - defer a.Unlock() - return a.err -} - -func newBodyChunk(asyncHandle *libuv.Async) *bodyChunk { - return &bodyChunk{ - readCh: make(chan []byte, 1), - done: make(chan struct{}), - asyncHandle: asyncHandle, - } -} - type bodyChunk struct { chunk []byte readCh chan []byte asyncHandle *libuv.Async - once sync.Once done chan struct{} - rerr onceError + rerr error } var ( errClosedBodyChunk = errors.New("bodyChunk: read/write on closed body") ) +func newBodyChunk(asyncHandle *libuv.Async) *bodyChunk { + return &bodyChunk{ + readCh: make(chan []byte, 1), + done: make(chan struct{}), + asyncHandle: asyncHandle, + } +} + func (bc *bodyChunk) Read(p []byte) (n int, err error) { + select { + case <-bc.done: + err = bc.readCloseError() + return + default: + } + for n < len(p) { if len(bc.chunk) == 0 { + bc.asyncHandle.Send() select { - case chunk, ok := <-bc.readCh: - if !ok { - if n > 0 { - return n, nil - } - return 0, bc.readCloseError() - } + case chunk := <-bc.readCh: bc.chunk = chunk - bc.asyncHandle.Send() case <-bc.done: - if n > 0 { - return n, nil - } - return 0, io.EOF + err = bc.readCloseError() + return } } @@ -77,28 +53,28 @@ func (bc *bodyChunk) Read(p []byte) (n int, err error) { bc.chunk = bc.chunk[copied:] } - return n, nil + return } func (bc *bodyChunk) Close() error { - return bc.closeRead(nil) + return bc.closeWithError(nil) } func (bc *bodyChunk) readCloseError() error { - if rerr := bc.rerr.Load(); rerr != nil { + if rerr := bc.rerr; rerr != nil { return rerr } return errClosedBodyChunk } -func (bc *bodyChunk) closeRead(err error) error { +func (bc *bodyChunk) closeWithError(err error) error { + if bc.rerr != nil { + return nil + } if err == nil { - err = io.EOF + err = errClosedBodyChunk } - bc.rerr.Store(err) - bc.once.Do(func() { - close(bc.done) - }) - //close(bc.done) + bc.rerr = err + close(bc.done) return nil } diff --git a/x/net/http/client.go b/x/net/http/client.go index 7e26395..fa62732 100644 --- a/x/net/http/client.go +++ b/x/net/http/client.go @@ -11,8 +11,6 @@ import ( "reflect" "sort" "strings" - "sync" - "sync/atomic" "time" ) @@ -157,8 +155,7 @@ func (c *Client) do(req *Request) (retres *Response, reterr error) { URL: u, Header: make(Header), Host: host, - Cancel: ireq.Cancel, - ctx: ireq.ctx, + //Cancel: ireq.Cancel, timer: ireq.timer, timeoutch: ireq.timeoutch, @@ -307,16 +304,15 @@ func send(ireq *Request, rt RoundTripper, deadline time.Time) (resp *Response, d forkReq() } - // TODO(spongehah) tmp timeout(send) + // TODO(hah) tmp timeout(send): LLGo has not yet implemented startTimer. //stopTimer, didTimeout := setRequestCancel(req, rt, deadline) req.timeoutch = make(chan struct{}, 1) req.deadline = deadline - req.ctx.Done() if deadline.IsZero() { didTimeout = alwaysFalse defer close(req.timeoutch) } else { - didTimeout = func() bool { return req.timer.GetDueIn() == 0 } + didTimeout = func() bool { return time.Now().After(deadline) } } resp, err = rt.RoundTrip(req) @@ -478,110 +474,83 @@ func (b *cancelTimerBody) Close() error { return err } -// knownRoundTripperImpl reports whether rt is a RoundTripper that's -// maintained by the Go team and known to implement the latest -// optional semantics (notably contexts). The Request is used -// to check whether this particular request is using an alternate protocol, -// in which case we need to check the RoundTripper for that protocol. -func knownRoundTripperImpl(rt RoundTripper, req *Request) bool { - switch t := rt.(type) { - case *Transport: - if altRT := t.alternateRoundTripper(req); altRT != nil { - return knownRoundTripperImpl(altRT, req) - } - return true - //case *http2Transport, http2noDialH2RoundTripper: - // return true - } - // There's a very minor chance of a false positive with this. - // Instead of detecting our golang.org/x/net/http2.Transport, - // it might detect a Transport type in a different http2 - // package. But I know of none, and the only problem would be - // some temporarily leaked goroutines if the transport didn't - // support contexts. So this is a good enough heuristic: - if reflect.TypeOf(rt).String() == "*http2.Transport" { - return true - } - return false -} - -// setRequestCancel sets req.Cancel and adds a deadline context to req -// if deadline is non-zero. The RoundTripper's type is used to -// determine whether the legacy CancelRequest behavior should be used. +//// setRequestCancel sets req.Cancel and adds a deadline context to req +//// if deadline is non-zero. The RoundTripper's type is used to +//// determine whether the legacy CancelRequest behavior should be used. +//// +//// As background, there are three ways to cancel a request: +//// First was Transport.CancelRequest. (deprecated) +//// Second was Request.Cancel. +//// Third was Request.Context. +//// This function populates the second and third, and uses the first if it really needs to. +//func setRequestCancel(req *Request, rt RoundTripper, deadline time.Time) (stopTimer func(), didTimeout func() bool) { +// if deadline.IsZero() { +// return nop, alwaysFalse +// } +// knownTransport := knownRoundTripperImpl(rt, req) +// oldCtx := req.Context() // -// As background, there are three ways to cancel a request: -// First was Transport.CancelRequest. (deprecated) -// Second was Request.Cancel. -// Third was Request.Context. -// This function populates the second and third, and uses the first if it really needs to. -func setRequestCancel(req *Request, rt RoundTripper, deadline time.Time) (stopTimer func(), didTimeout func() bool) { - if deadline.IsZero() { - return nop, alwaysFalse - } - knownTransport := knownRoundTripperImpl(rt, req) - oldCtx := req.Context() - - if req.Cancel == nil && knownTransport { - // If they already had a Request.Context that's - // expiring sooner, do nothing: - if !timeBeforeContextDeadline(deadline, oldCtx) { - return nop, alwaysFalse - } - - var cancelCtx func() - req.ctx, cancelCtx = context.WithDeadline(oldCtx, deadline) - return cancelCtx, func() bool { return time.Now().After(deadline) } - } - initialReqCancel := req.Cancel // the user's original Request.Cancel, if any - - var cancelCtx func() - if timeBeforeContextDeadline(deadline, oldCtx) { - req.ctx, cancelCtx = context.WithDeadline(oldCtx, deadline) - } - - cancel := make(chan struct{}) - req.Cancel = cancel - - doCancel := func() { - // The second way in the func comment above: - close(cancel) - // The first way, used only for RoundTripper - // implementations written before Go 1.5 or Go 1.6. - type canceler interface{ CancelRequest(*Request) } - if v, ok := rt.(canceler); ok { - v.CancelRequest(req) - } - } - - stopTimerCh := make(chan struct{}) - var once sync.Once - stopTimer = func() { - once.Do(func() { - close(stopTimerCh) - if cancelCtx != nil { - cancelCtx() - } - }) - } - - timer := time.NewTimer(time.Until(deadline)) - var timedOut atomic.Bool - - go func() { - select { - case <-initialReqCancel: - doCancel() - timer.Stop() - case <-timer.C: - timedOut.Store(true) - doCancel() - case <-stopTimerCh: - timer.Stop() - } - }() - - return stopTimer, timedOut.Load -} +// if req.Cancel == nil && knownTransport { +// // If they already had a Request.Context that's +// // expiring sooner, do nothing: +// if !timeBeforeContextDeadline(deadline, oldCtx) { +// return nop, alwaysFalse +// } +// +// var cancelCtx func() +// req.ctx, cancelCtx = context.WithDeadline(oldCtx, deadline) +// return cancelCtx, func() bool { return time.Now().After(deadline) } +// } +// initialReqCancel := req.Cancel // the user's original Request.Cancel, if any +// +// var cancelCtx func() +// if timeBeforeContextDeadline(deadline, oldCtx) { +// req.ctx, cancelCtx = context.WithDeadline(oldCtx, deadline) +// } +// +// cancel := make(chan struct{}) +// req.Cancel = cancel +// +// doCancel := func() { +// // The second way in the func comment above: +// close(cancel) +// // The first way, used only for RoundTripper +// // implementations written before Go 1.5 or Go 1.6. +// type canceler interface{ CancelRequest(*Request) } +// if v, ok := rt.(canceler); ok { +// v.CancelRequest(req) +// } +// } +// +// stopTimerCh := make(chan struct{}) +// var once sync.Once +// stopTimer = func() { +// once.Do(func() { +// close(stopTimerCh) +// if cancelCtx != nil { +// cancelCtx() +// } +// }) +// } +// +// timer := time.NewTimer(time.Until(deadline)) +// var timedOut atomic.Bool +// +// go func() { +// select { +// case <-initialReqCancel: +// doCancel() +// timer.Stop() +// case <-timer.C: +// timedOut.Store(true) +// doCancel() +// case <-stopTimerCh: +// timer.Stop() +// } +// }() +// +// return stopTimer, timedOut.Load +//} // timeBeforeContextDeadline reports whether the non-zero Time t is // before ctx's deadline, if any. If ctx does not have a deadline, it @@ -594,7 +563,7 @@ func timeBeforeContextDeadline(t time.Time, ctx context.Context) bool { return t.Before(d) } -/*// knownRoundTripperImpl reports whether rt is a RoundTripper that's +// knownRoundTripperImpl reports whether rt is a RoundTripper that's // maintained by the Go team and known to implement the latest // optional semantics (notably contexts). The Request is used // to check whether this particular request is using an alternate protocol, @@ -619,7 +588,7 @@ func knownRoundTripperImpl(rt RoundTripper, req *Request) bool { return true } return false -}*/ +} // makeHeadersCopier makes a function that copies headers from the // initial Request, ireq. For every redirect, this function must be called diff --git a/x/net/http/request.go b/x/net/http/request.go index e9279fc..37d6408 100644 --- a/x/net/http/request.go +++ b/x/net/http/request.go @@ -2,7 +2,6 @@ package http import ( "bytes" - "context" "errors" "fmt" "io" @@ -31,20 +30,16 @@ type Request struct { TransferEncoding []string Close bool Host string - //Form url.Values - //PostForm url.Values - //MultipartForm *multipart.Form - Trailer Header + // Form url.Values + // PostForm url.Values + // MultipartForm *multipart.Form RemoteAddr string RequestURI string - //TLS *tls.ConnectionState - Cancel <-chan struct{} Response *Response - ctx context.Context deadline time.Time - timeoutch chan struct{} //tmp timeout + timeoutch chan struct{} timer *libuv.Timer } @@ -75,34 +70,8 @@ var reqWriteExcludeHeader = map[string]bool{ type requestBodyReadError struct{ error } // NewRequest wraps NewRequestWithContext using context.Background. -func NewRequest(method, url string, body io.Reader) (*Request, error) { - return NewRequestWithContext(context.Background(), method, url, body) -} - -// NewRequestWithContext returns a new Request given a method, URL, and -// optional body. -// -// If the provided body is also an io.Closer, the returned -// Request.Body is set to body and will be closed by the Client -// methods Do, Post, and PostForm, and Transport.RoundTrip. -// -// NewRequestWithContext returns a Request suitable for use with -// Client.Do or Transport.RoundTrip. To create a request for use with -// testing a Server Handler, either use the NewRequest function in the -// net/http/httptest package, use ReadRequest, or manually update the -// Request fields. For an outgoing client request, the context -// controls the entire lifetime of a request and its response: -// obtaining a connection, sending the request, and reading the -// response headers and body. See the Request type's documentation for -// the difference between inbound and outbound request fields. -// -// If body is of type *bytes.Buffer, *bytes.Reader, or -// *strings.Reader, the returned request's ContentLength is set to its -// exact value (instead of -1), GetBody is populated (so 307 and 308 -// redirects can replay the body), and Body is set to NoBody if the -// ContentLength is 0. -func NewRequestWithContext(ctx context.Context, method, urlStr string, body io.Reader) (*Request, error) { - // TODO(spongehah) Hyper only supports http +func NewRequest(method, urlStr string, body io.Reader) (*Request, error) { + // TODO(hah) Hyper only supports http isHttpPrefix := strings.HasPrefix(urlStr, "http://") isHttpsPrefix := strings.HasPrefix(urlStr, "https://") if !isHttpPrefix && !isHttpsPrefix { @@ -121,9 +90,6 @@ func NewRequestWithContext(ctx context.Context, method, urlStr string, body io.R if !validMethod(method) { return nil, fmt.Errorf("net/http: invalid method %q", method) } - if ctx == nil { - return nil, errors.New("net/http: nil Context") - } u, err := url.Parse(urlStr) if err != nil { return nil, err @@ -135,7 +101,6 @@ func NewRequestWithContext(ctx context.Context, method, urlStr string, body io.R // The host's colon:port should be normalized. See Issue 14836. u.Host = removeEmptyPort(u.Host) req := &Request{ - ctx: ctx, Method: method, URL: u, Proto: "HTTP/1.1", @@ -228,24 +193,6 @@ func (r *Request) isReplayable() bool { return false } -// Context returns the request's context. To change the context, use -// Clone or WithContext. -// -// The returned context is always non-nil; it defaults to the -// background context. -// -// For outgoing client requests, the context controls cancellation. -// -// For incoming server requests, the context is canceled when the -// client's connection closes, the request is canceled (with HTTP/2), -// or when the ServeHTTP method returns. -func (r *Request) Context() context.Context { - if r.ctx != nil { - return r.ctx - } - return context.Background() -} - // AddCookie adds a cookie to the request. Per RFC 6265 section 5.4, // AddCookie does not attach more than one Cookie header field. That // means all cookies, if any, are written into the same line, @@ -300,7 +247,11 @@ func (r *Request) write(client *hyper.ClientConn, taskData *taskData, exec *hype } // Send it! sendTask := client.Send(hyperReq) - sendTask.SetUserdata(c.Pointer(taskData)) + if sendTask == nil { + println("############### write: sendTask is nil") + return errors.New("failed to send the request") + } + sendTask.SetUserdata(c.Pointer(taskData), nil) sendRes := exec.Push(sendTask) if sendRes != hyper.OK { err = errors.New("failed to send the request") @@ -424,7 +375,7 @@ func (r *Request) newHyperRequest(usingProxy bool, extraHeader Header, treq *tra // Wait for 100-continue if expected. if r.ProtoAtLeast(1, 1) && r.Body != nil && r.expectsContinue() { - hyperReq.OnInformational(printInformational, nil) + hyperReq.OnInformational(printInformational, nil, nil) } // Write body and trailer diff --git a/x/net/http/response.go b/x/net/http/response.go index a3a96fc..da7c3e4 100644 --- a/x/net/http/response.go +++ b/x/net/http/response.go @@ -81,13 +81,19 @@ func (r *Response) checkRespBody(taskData *taskData) (needContinue bool) { select { case taskData.resc <- responseAndError{res: r}: case <-taskData.callerGone: - readLoopDefer(pc, true) + if debugSwitch { + println("############### checkRespBody callerGone") + } + closeAndRemoveIdleConn(pc, true) return true } // Now that they've read from the unbuffered channel, they're safely // out of the select that also waits on this goroutine to die, so // we're allowed to exit now if needed (if alive is false) - readLoopDefer(pc, false) + if debugSwitch { + println("############### checkRespBody return") + } + closeAndRemoveIdleConn(pc, false) return true } return false @@ -97,6 +103,17 @@ func (r *Response) wrapRespBody(taskData *taskData) { body := &bodyEOFSignal{ body: r.Body, earlyCloseFn: func() error { + // If the response body is closed prematurely, + // the hyperBody needs to be recycled and the persistConn needs to be handled. + taskData.closeHyperBody() + select { + case <-taskData.pc.closech: + taskData.pc.t.removeIdleConn(taskData.pc) + default: + } + replaced := taskData.pc.t.replaceReqCanceler(taskData.req.cancelKey, nil) // before pc might return to idle pool + taskData.pc.alive = taskData.pc.alive && + replaced && taskData.pc.tryPutIdleConn() return nil }, fn: func(err error) error { @@ -110,7 +127,7 @@ func (r *Response) wrapRespBody(taskData *taskData) { }, } r.Body = body - // TODO(spongehah) gzip(wrapRespBody) + // TODO(hah) gzip(wrapRespBody): The compress/gzip library still has a bug. An exception occurs when calling gzip.NewReader(). //if taskData.addedGzip && EqualFold(r.Header.Get("Content-Encoding"), "gzip") { // println("gzip reader") // r.Body = &gzipReader{body: body} diff --git a/x/net/http/server.go b/x/net/http/server.go index 5c4c58d..f38cbd0 100644 --- a/x/net/http/server.go +++ b/x/net/http/server.go @@ -10,10 +10,3 @@ package http // size is anyway. (if we have the bytes on the machine, we might as // well read them) const maxPostHandlerReadBytes = 256 << 10 - -type readResult struct { - _ incomparable - n int - err error - b byte // byte read, if n == 1 -} diff --git a/x/net/http/transfer.go b/x/net/http/transfer.go index 818fb3c..12f3d70 100644 --- a/x/net/http/transfer.go +++ b/x/net/http/transfer.go @@ -28,7 +28,6 @@ type transferReader struct { ContentLength int64 Chunked bool Close bool - Trailer Header } // parseTransferEncoding sets t.Chunked based on the Transfer-Encoding header. @@ -151,10 +150,6 @@ func readTransfer(msg any, r io.ReadCloser) (err error) { t.ContentLength = realLength } - // TODO(spongehah) Trailer(readTransfer) - // Trailer - //t.Trailer, err = fixTrailer(t.Header, t.Chunked) - // If there is no Content-Length or chunked Transfer-Encoding on a *Response // and the status is not 1xx, 204 or 304, then the body is unbounded. // See RFC 7230, section 3.3. @@ -301,48 +296,6 @@ func parseContentLength(cl string) (int64, error) { } -// Parse the trailer header. -func fixTrailer(header Header, chunked bool) (Header, error) { - vv, ok := header["Trailer"] - if !ok { - return nil, nil - } - if !chunked { - // Trailer and no chunking: - // this is an invalid use case for trailer header. - // Nevertheless, no error will be returned and we - // let users decide if this is a valid HTTP message. - // The Trailer header will be kept in Response.Header - // but not populate Response.Trailer. - // See issue #27197. - return nil, nil - } - header.Del("Trailer") - - trailer := make(Header) - var err error - for _, v := range vv { - foreachHeaderElement(v, func(key string) { - key = CanonicalHeaderKey(key) - switch key { - case "Transfer-Encoding", "Trailer", "Content-Length": - if err == nil { - err = badStringError("bad trailer key", key) - return - } - } - trailer[key] = nil - }) - } - if err != nil { - return nil, err - } - if len(trailer) == 0 { - return nil, nil - } - return trailer, nil -} - // body turns a Reader into a ReadCloser. // Close ensures that the body has been fully read // and then reads the trailer if necessary. @@ -387,16 +340,6 @@ func (b *body) readLocked(p []byte) (n int, err error) { b.sawEOF = true // Chunked case. Read the trailer. if b.hdr != nil { - // TODO(spongehah) Trailer(b.readLocked) - //if e := b.readTrailer(); e != nil { - // err = e - // // Something went wrong in the trailer, we must not allow any - // // further reads of any kind to succeed from body, nor any - // // subsequent requests on the server connection. See - // // golang.org/issue/12027 - // b.sawEOF = false - // b.closed = true - //} b.hdr = nil } else { // If the server declared the Content-Length, our body is a LimitedReader @@ -634,7 +577,6 @@ func (r *Request) writeHeader(reqHeaders *hyper.Headers) error { // 'Content-Length' and 'Transfer-Encoding:chunked' are already handled by hyper // Write Trailer header - // TODO(spongehah) Trailer(writeHeader) return nil } @@ -682,7 +624,7 @@ func (r *Request) writeBody(hyperReq *hyper.Request, treq *transportRequest) err buf: buf, treq: treq, } - hyperReqBody.SetUserdata(c.Pointer(reqData)) + hyperReqBody.SetUserdata(c.Pointer(reqData), nil) hyperReqBody.SetDataFunc(setPostData) hyperReq.SetBody(hyperReqBody) } diff --git a/x/net/http/transport.go b/x/net/http/transport.go index 8075133..e47bd2a 100644 --- a/x/net/http/transport.go +++ b/x/net/http/transport.go @@ -5,6 +5,7 @@ import ( "context" "errors" "fmt" + "hash/fnv" "io" "log" "net/url" @@ -27,7 +28,6 @@ import ( // as directed by the environment variables HTTP_PROXY, HTTPS_PROXY // and NO_PROXY (or the lowercase versions thereof). var DefaultTransport RoundTripper = &Transport{ - //Proxy: ProxyFromEnvironment, Proxy: nil, MaxIdleConns: 100, IdleConnTimeout: 90 * time.Second, @@ -36,6 +36,7 @@ var DefaultTransport RoundTripper = &Transport{ // DefaultMaxIdleConnsPerHost is the default value of Transport's // MaxIdleConnsPerHost. const DefaultMaxIdleConnsPerHost = 2 +const _SC_NPROCESSORS_ONLN c.Int = 58 // Debug switch provided for developers const ( @@ -69,11 +70,10 @@ type Transport struct { MaxConnsPerHost int IdleConnTimeout time.Duration - // libuv and hyper related - loopInitOnce sync.Once - loop *libuv.Loop - async *libuv.Async - exec *hyper.Executor + loopsMu sync.Mutex + loops []*clientEventLoop + isClosing atomic.Bool + //curLoop atomic.Uint32 } // A cancelKey is the key of the reqCanceler map. @@ -183,6 +183,9 @@ func (tr *transportRequest) setError(err error) { func (t *Transport) putOrCloseIdleConn(pconn *persistConn) { if err := t.tryPutIdleConn(pconn); err != nil { + if debugSwitch { + println("############### putOrCloseIdleConn: close") + } pconn.close(err) } } @@ -274,6 +277,9 @@ func (t *Transport) tryPutIdleConn(pconn *persistConn) error { t.idleLRU.add(pconn) if t.MaxIdleConns != 0 && t.idleLRU.len() > t.MaxIdleConns { oldest := t.idleLRU.removeOldest() + if debugSwitch { + println("############### tryPutIdleConn: removeOldest") + } oldest.close(errTooManyIdle) t.removeIdleConnLocked(oldest) } @@ -287,7 +293,7 @@ func (t *Transport) tryPutIdleConn(pconn *persistConn) error { pconn.idleTimer.Start(onIdleConnTimeout, idleConnTimeout, 0) } else { pconn.idleTimer = &libuv.Timer{} - libuv.InitTimer(t.loop, pconn.idleTimer) + libuv.InitTimer(pconn.eventLoop.loop, pconn.idleTimer) (*libuv.Handle)(c.Pointer(pconn.idleTimer)).SetData(c.Pointer(pconn)) pconn.idleTimer.Start(onIdleConnTimeout, idleConnTimeout, 0) } @@ -343,7 +349,9 @@ func (t *Transport) queueForIdleConn(w *wantConn) (delivered bool) { // See whether this connection has been idle too long, considering // only the wall time (the Round(0)), in case this is a laptop or VM // coming out of suspend with previously cached idle connections. - tooOld := !oldTime.IsZero() && pconn.idleAt.Round(0).Before(oldTime) + // FIXME: Round() is not supported in llgo + //tooOld := !oldTime.IsZero() && pconn.idleAt.Round(0).Before(oldTime) + tooOld := !oldTime.IsZero() && pconn.idleAt.Before(oldTime) if tooOld { // Async cleanup. Launch in its own goroutine (as if a // time.AfterFunc called it); it acquires idleMu, which we're @@ -403,9 +411,10 @@ func (t *Transport) removeIdleConn(pconn *persistConn) bool { // t.idleMu must be held. func (t *Transport) removeIdleConnLocked(pconn *persistConn) bool { - if pconn.idleTimer != nil { + if pconn.idleTimer != nil && (*libuv.Handle)(c.Pointer(pconn.idleTimer)).IsClosing() == 0 { pconn.idleTimer.Stop() (*libuv.Handle)(c.Pointer(pconn.idleTimer)).Close(nil) + pconn.idleTimer = nil } t.idleLRU.remove(pconn) key := pconn.cacheKey @@ -467,13 +476,14 @@ func (t *Transport) replaceReqCanceler(key cancelKey, fn func(error)) bool { return true } -func (t *Transport) connectMethodForRequest(treq *transportRequest) (cm connectMethod, err error) { +func (t *Transport) connectMethodForRequest(treq *transportRequest, loop *clientEventLoop) (cm connectMethod, err error) { cm.targetScheme = treq.URL.Scheme cm.targetAddr = canonicalAddr(treq.URL) if t.Proxy != nil { cm.proxyURL, err = t.Proxy(treq.Request) } cm.onlyH1 = treq.requiresHTTP1() + cm.eventLoop = loop return cm, err } @@ -524,25 +534,56 @@ func (t *Transport) cancelRequest(key cancelKey, err error) bool { return cancel != nil } -func (t *Transport) close(err error) { - t.reqMu.Lock() - defer t.reqMu.Unlock() - t.closeLocked(err) +func (t *Transport) Close() { + if t != nil && !t.isClosing.Swap(true) { + t.CloseIdleConnections() + for _, el := range t.loops { + el.Close() + } + } } -func (t *Transport) closeLocked(err error) { - if err != nil { - fmt.Println(err) - } - if t.loop != nil { - t.loop.Close() - } - if t.async != nil { - t.async.Close(nil) +type clientEventLoop struct { + // libuv and hyper related + loop *libuv.Loop + async *libuv.Async + exec *hyper.Executor + isRunning atomic.Bool + isClosing atomic.Bool +} + +func (el *clientEventLoop) Close() { + if el != nil && !el.isClosing.Swap(true) { + if el.loop != nil && (*libuv.Handle)(c.Pointer(el.loop)).IsClosing() == 0 { + el.loop.Close() + el.loop = nil + } + if el.async != nil && (*libuv.Handle)(c.Pointer(el.async)).IsClosing() == 0 { + el.async.Close(nil) + el.async = nil + } + if el.exec != nil { + el.exec.Free() + el.exec = nil + } } - if t.exec != nil { - t.exec.Free() +} + +func (el *clientEventLoop) run() { + if el.isRunning.Load() { + return } + + el.loop.Async(el.async, nil) + + checker := &libuv.Idle{} + libuv.InitIdle(el.loop, checker) + (*libuv.Handle)(c.Pointer(checker)).SetData(c.Pointer(el)) + checker.Start(readWriteLoop) + + go el.loop.Run(libuv.RUN_DEFAULT) + + el.isRunning.Store(true) } // ---------------------------------------------------------- @@ -556,26 +597,65 @@ func getMilliseconds(deadline time.Time) uint64 { return uint64(milliseconds) } +var cpuCount int + +func init() { + cpuCount = int(c.Sysconf(_SC_NPROCESSORS_ONLN)) + if cpuCount <= 0 { + cpuCount = 4 + } +} + +func (t *Transport) getOrInitClientEventLoop(i uint32) *clientEventLoop { + if el := t.loops[i]; el != nil { + return el + } + + eventLoop := &clientEventLoop{ + loop: libuv.LoopNew(), + async: &libuv.Async{}, + exec: hyper.NewExecutor(), + } + + eventLoop.run() + + t.loops[i] = eventLoop + return eventLoop +} + +func (t *Transport) getClientEventLoop(req *Request) *clientEventLoop { + t.loopsMu.Lock() + defer t.loopsMu.Unlock() + if t.loops == nil { + t.loops = make([]*clientEventLoop, cpuCount) + } + + key := t.getLoopKey(req) + h := fnv.New32a() + h.Write([]byte(key)) + hashcode := h.Sum32() + + return t.getOrInitClientEventLoop(hashcode % uint32(cpuCount)) + //i := (t.curLoop.Add(1) - 1) % uint32(cpuCount) + //return t.getOrInitClientEventLoop(i) +} + +func (t *Transport) getLoopKey(req *Request) string { + proxyStr := "" + if t.Proxy != nil { + proxyURL, _ := t.Proxy(req) + proxyStr = proxyURL.String() + } + return req.URL.String() + proxyStr +} + func (t *Transport) RoundTrip(req *Request) (*Response, error) { if debugSwitch { println("############### RoundTrip start") defer println("############### RoundTrip end") } - t.loopInitOnce.Do(func() { - println("############### init loop") - t.loop = libuv.LoopNew() - t.async = &libuv.Async{} - t.exec = hyper.NewExecutor() - - t.loop.Async(t.async, nil) - checker := &libuv.Check{} - libuv.InitCheck(t.loop, checker) - (*libuv.Handle)(c.Pointer(checker)).SetData(c.Pointer(t)) - checker.Start(readWriteLoop) - - go t.loop.Run(libuv.RUN_DEFAULT) - }) + eventLoop := t.getClientEventLoop(req) // If timeout is set, start the timer var didTimeout func() bool @@ -583,7 +663,7 @@ func (t *Transport) RoundTrip(req *Request) (*Response, error) { // Only the first request will initialize the timer if req.timer == nil && !req.deadline.IsZero() { req.timer = &libuv.Timer{} - libuv.InitTimer(t.loop, req.timer) + libuv.InitTimer(eventLoop.loop, req.timer) ch := &timeoutData{ timeoutch: req.timeoutch, taskData: nil, @@ -598,7 +678,9 @@ func (t *Transport) RoundTrip(req *Request) (*Response, error) { stopTimer = func() { close(req.timeoutch) req.timer.Stop() - (*libuv.Handle)(c.Pointer(req.timer)).Close(nil) + if (*libuv.Handle)(c.Pointer(req.timer)).IsClosing() == 0 { + (*libuv.Handle)(c.Pointer(req.timer)).Close(nil) + } if debugSwitch { println("############### timer close") } @@ -608,7 +690,7 @@ func (t *Transport) RoundTrip(req *Request) (*Response, error) { stopTimer = nop } - resp, err := t.doRoundTrip(req) + resp, err := t.doRoundTrip(req, eventLoop) if err != nil { stopTimer() return nil, err @@ -624,7 +706,7 @@ func (t *Transport) RoundTrip(req *Request) (*Response, error) { return resp, nil } -func (t *Transport) doRoundTrip(req *Request) (*Response, error) { +func (t *Transport) doRoundTrip(req *Request, loop *clientEventLoop) (*Response, error) { if debugSwitch { println("############### doRoundTrip start") defer println("############### doRoundTrip end") @@ -687,12 +769,6 @@ func (t *Transport) doRoundTrip(req *Request) (*Response, error) { } for { - //select { - //case <-ctx.Done(): - // req.closeBody() - // return nil, ctx.Err() - //default: - //} select { case <-req.timeoutch: req.closeBody() @@ -703,7 +779,7 @@ func (t *Transport) doRoundTrip(req *Request) (*Response, error) { // treq gets modified by roundTrip, so we need to recreate for each retry. //treq := &transportRequest{Request: req, trace: trace, cancelKey: cancelKey} treq := &transportRequest{Request: req, cancelKey: cancelKey} - cm, err := t.connectMethodForRequest(treq) + cm, err := t.connectMethodForRequest(treq, loop) if err != nil { req.closeBody() return nil, err @@ -716,6 +792,7 @@ func (t *Transport) doRoundTrip(req *Request) (*Response, error) { pconn, err := t.getConn(treq, cm) if err != nil { + println("################# getConn err != nil") t.setReqCanceler(cancelKey, nil) req.closeBody() return nil, err @@ -827,10 +904,6 @@ func (t *Transport) getConn(treq *transportRequest, cm connectMethod) (pc *persi // what caused w.err; if so, prefer to return the // cancellation error (see golang.org/issue/16049). select { - //case <-req.Cancel: - // return nil, errRequestCanceledConn - //case <-req.Context().Done(): - // return nil, req.Context().Err() case <-req.timeoutch: if debugSwitch { println("############### getConn: timeoutch") @@ -977,68 +1050,23 @@ func (t *Transport) dialConn(timeoutch chan struct{}, cm connectMethod) (pconn * writeLoopDone: make(chan struct{}, 1), alive: true, chunkAsync: &libuv.Async{}, + eventLoop: cm.eventLoop, } - t.loop.Async(pconn.chunkAsync, readyToRead) + cm.eventLoop.loop.Async(pconn.chunkAsync, readyToRead) - //trace := httptrace.ContextClientTrace(ctx) - //wrapErr := func(err error) error { - // if cm.proxyURL != nil { - // // Return a typed error, per Issue 16997 - // return &net.OpError{Op: "proxyconnect", Net: "tcp", Err: err} - // } - // return err - //} - // - //if cm.scheme() == "https" && t.hasCustomTLSDialer() { - // var err error - // pconn.conn, err = t.customDialTLS(ctx, "tcp", cm.addr()) - // if err != nil { - // return nil, wrapErr(err) - // } - // if tc, ok := pconn.conn.(*tls.Conn); ok { - // // Handshake here, in case DialTLS didn't. TLSNextProto below - // // depends on it for knowing the connection state. - // if trace != nil && trace.TLSHandshakeStart != nil { - // trace.TLSHandshakeStart() - // } - // if err := tc.HandshakeContext(ctx); err != nil { - // go pconn.conn.Close() - // if trace != nil && trace.TLSHandshakeDone != nil { - // trace.TLSHandshakeDone(tls.ConnectionState{}, err) - // } - // return nil, err - // } - // cs := tc.ConnectionState() - // if trace != nil && trace.TLSHandshakeDone != nil { - // trace.TLSHandshakeDone(cs, nil) - // } - // pconn.tlsState = &cs - // } - //} else { - //conn, err := t.dial(ctx, "tcp", cm.addr()) - conn, err := t.dial(cm.addr()) + conn, err := t.dial(cm) if err != nil { return nil, err } pconn.conn = conn - //if cm.scheme() == "https" { - // var firstTLSHost string - // if firstTLSHost, _, err = net.SplitHostPort(cm.addr()); err != nil { - // return nil, wrapErr(err) - // } - // if err = pconn.addTLS(ctx, firstTLSHost, trace); err != nil { - // return nil, wrapErr(err) - // } - //} - //} select { case <-timeoutch: conn.Close() return default: } - // TODO(spongehah) Proxy(https/sock5)(t.dialConn) + // TODO(hah) Proxy(https/sock5)(t.dialConn) // Proxy setup. switch { case cm.proxyURL == nil: @@ -1054,41 +1082,14 @@ func (t *Transport) dialConn(timeoutch chan struct{}, cm connectMethod) (pconn * // case cm.targetScheme == "https": } - //if cm.proxyURL != nil && cm.targetScheme == "https" { - // if err := pconn.addTLS(ctx, cm.tlsHost(), trace); err != nil { - // return nil, err - // } - //} - // - //if s := pconn.tlsState; s != nil && s.NegotiatedProtocolIsMutual && s.NegotiatedProtocol != "" { - // if next, ok := t.TLSNextProto[s.NegotiatedProtocol]; ok { - // alt := next(cm.targetAddr, pconn.conn.(*tls.Conn)) - // if e, ok := alt.(erringRoundTripper); ok { - // // pconn.conn was closed by next (http2configureTransports.upgradeFn). - // return nil, e.RoundTripErr() - // } - // return &persistConn{t: t, cacheKey: pconn.cacheKey, alt: alt}, nil - // } - //} - pconn.closeErr = errReadLoopExiting - pconn.tryPutIdleConn = func() bool { - if err := pconn.t.tryPutIdleConn(pconn); err != nil { - pconn.closeErr = err - //if trace != nil && trace.PutIdleConn != nil && err != errKeepAlivesDisabled { - // trace.PutIdleConn(err) - //} - return false - } - //if trace != nil && trace.PutIdleConn != nil { - // trace.PutIdleConn(nil) - //} - return true - } select { case <-timeoutch: err = errors.New("[t.dialConn] request timeout") + if debugSwitch { + println("############### dialConn: timeoutch") + } pconn.close(err) return nil, err default: @@ -1096,11 +1097,12 @@ func (t *Transport) dialConn(timeoutch chan struct{}, cm connectMethod) (pconn * return pconn, nil } -func (t *Transport) dial(addr string) (*connData, error) { +func (t *Transport) dial(cm connectMethod) (*connData, error) { if debugSwitch { println("############### dial start") defer println("############### dial end") } + addr := cm.addr() host, port, err := net.SplitHostPort(addr) if err != nil { return nil, err @@ -1108,8 +1110,8 @@ func (t *Transport) dial(addr string) (*connData, error) { conn := new(connData) - libuv.InitTcp(t.loop, &conn.TcpHandle) - (*libuv.Handle)(c.Pointer(&conn.TcpHandle)).SetData(c.Pointer(conn)) + libuv.InitTcp(cm.eventLoop.loop, &conn.tcpHandle) + (*libuv.Handle)(c.Pointer(&conn.tcpHandle)).SetData(c.Pointer(conn)) var hints cnet.AddrInfo c.Memset(c.Pointer(&hints), 0, unsafe.Sizeof(hints)) @@ -1122,8 +1124,8 @@ func (t *Transport) dial(addr string) (*connData, error) { return nil, fmt.Errorf("getaddrinfo error\n") } - (*libuv.Req)(c.Pointer(&conn.ConnectReq)).SetData(c.Pointer(conn)) - status = libuv.TcpConnect(&conn.ConnectReq, &conn.TcpHandle, res.Addr, onConnect) + (*libuv.Req)(c.Pointer(&conn.connectReq)).SetData(c.Pointer(conn)) + status = libuv.TcpConnect(&conn.connectReq, &conn.tcpHandle, res.Addr, onConnect) if status != 0 { return nil, fmt.Errorf("connect error: %s\n", c.GoString(libuv.Strerror(libuv.Errno(status)))) } @@ -1179,33 +1181,32 @@ func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err err resc: resc, } - if pc.client == nil && !pc.isReused() { - // Hookup the IO - hyperIo := newHyperIo(pc.conn) - // We need an executor generally to poll futures - // Prepare client options - opts := hyper.NewClientConnOptions() - opts.Exec(pc.t.exec) - // send the handshake - handshakeTask := hyper.Handshake(hyperIo, opts) - taskData.taskId = handshake - handshakeTask.SetUserdata(c.Pointer(taskData)) - // Send the request to readWriteLoop(). - pc.t.exec.Push(handshakeTask) - } else { - taskData.taskId = read - err = req.write(pc.client, taskData, pc.t.exec) - if err != nil { - writeErrCh <- err - } - } + //if pc.client == nil && !pc.isReused() { + // Hookup the IO + hyperIo := newHyperIo(pc.conn) + // We need an executor generally to poll futures + // Prepare client options + opts := hyper.NewClientConnOptions() + opts.Exec(pc.eventLoop.exec) + // send the handshake + handshakeTask := hyper.Handshake(hyperIo, opts) + taskData.taskId = handshake + handshakeTask.SetUserdata(c.Pointer(taskData), nil) + // Send the request to readWriteLoop(). + pc.eventLoop.exec.Push(handshakeTask) + //} else { + // println("############### roundTrip: pc.client != nil") + // taskData.taskId = read + // err = req.write(pc.client, taskData, pc.eventLoop.exec) + // if err != nil { + // writeErrCh <- err + // pc.close(err) + // } + //} // Wake up libuv. Loop - pc.t.async.Send() + pc.eventLoop.async.Send() - //var respHeaderTimer <-chan time.Time - //cancelChan := req.Request.Cancel - //ctxDoneChan := req.Context().Done() timeoutch := req.timeoutch pcClosed := pc.closech canceled := false @@ -1221,6 +1222,9 @@ func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err err println("############### roundTrip: writeErrch") } if err != nil { + if debugSwitch { + println("############### roundTrip: writeErrch err != nil") + } pc.close(fmt.Errorf("write error: %w", err)) if pc.conn.nwrite == startBytesWritten { err = nothingWrittenError{err} @@ -1247,13 +1251,6 @@ func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err err return nil, pc.mapRoundTripError(req, startBytesWritten, re.err) } return re.res, nil - //case <-cancelChan: - // canceled = pc.t.cancelRequest(req.cancelKey, errRequestCanceled) - // cancelChan = nil - //case <-ctxDoneChan: - // canceled = pc.t.cancelRequest(req.cancelKey, req.Context().Err()) - // cancelChan = nil - // ctxDoneChan = nil case <-timeoutch: if debugSwitch { println("############### roundTrip: timeoutch") @@ -1267,21 +1264,21 @@ func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err err // readWriteLoop handles the main I/O loop for a persistent connection. // It processes incoming requests, sends them to the server, and handles responses. -func readWriteLoop(checker *libuv.Check) { - t := (*Transport)((*libuv.Handle)(c.Pointer(checker)).GetData()) +func readWriteLoop(checker *libuv.Idle) { + eventLoop := (*clientEventLoop)((*libuv.Handle)(c.Pointer(checker)).GetData()) // The polling state machine! Poll all ready tasks and act on them... - task := t.exec.Poll() + task := eventLoop.exec.Poll() for task != nil { if debugSwitch { println("############### polling") } - t.handleTask(task) - task = t.exec.Poll() + eventLoop.handleTask(task) + task = eventLoop.exec.Poll() } } -func (t *Transport) handleTask(task *hyper.Task) { +func (eventLoop *clientEventLoop) handleTask(task *hyper.Task) { taskData := (*taskData)(task.Userdata()) if taskData == nil { // A background task for hyper_client completed... @@ -1293,7 +1290,10 @@ func (t *Transport) handleTask(task *hyper.Task) { // If original taskId is set, we need to check it err = checkTaskType(task, taskData) if err != nil { - readLoopDefer(pc, true) + if debugSwitch { + println("############### handleTask: checkTaskType err != nil") + } + closeAndRemoveIdleConn(pc, true) return } switch taskData.taskId { @@ -1313,13 +1313,16 @@ func (t *Transport) handleTask(task *hyper.Task) { pc.client = (*hyper.ClientConn)(task.Value()) task.Free() - // TODO(spongehah) Proxy(writeLoop) + // TODO(hah) Proxy(writeLoop) taskData.taskId = read - err = taskData.req.Request.write(pc.client, taskData, t.exec) + err = taskData.req.Request.write(pc.client, taskData, eventLoop.exec) if err != nil { //pc.writeErrCh <- err // to the body reader, which might recycle us taskData.writeErrCh <- err // to the roundTrip function + if debugSwitch { + println("############### handleTask: write err != nil") + } pc.close(err) return } @@ -1332,6 +1335,20 @@ func (t *Transport) handleTask(task *hyper.Task) { println("############### read") } + pc.tryPutIdleConn = func() bool { + if err := pc.t.tryPutIdleConn(pc); err != nil { + pc.closeErr = err + //if trace != nil && trace.PutIdleConn != nil && err != errKeepAlivesDisabled { + // trace.PutIdleConn(err) + //} + return false + } + //if trace != nil && trace.PutIdleConn != nil { + // trace.PutIdleConn(nil) + //} + return true + } + // Take the results hyperResp := (*hyper.Response)(task.Value()) task.Free() @@ -1340,7 +1357,10 @@ func (t *Transport) handleTask(task *hyper.Task) { if pc.numExpectedResponses == 0 { pc.readLoopPeekFailLocked(hyperResp, err) pc.mu.Unlock() - readLoopDefer(pc, true) + if debugSwitch { + println("############### handleTask: numExpectedResponses == 0") + } + closeAndRemoveIdleConn(pc, true) return } //pc.mu.Unlock() @@ -1361,20 +1381,25 @@ func (t *Transport) handleTask(task *hyper.Task) { hyperResp.Free() if err != nil { + pc.bodyChunk.closeWithError(err) + taskData.closeHyperBody() select { case taskData.resc <- responseAndError{err: err}: case <-taskData.callerGone: - readLoopDefer(pc, true) + if debugSwitch { + println("############### handleTask read: callerGone") + } + closeAndRemoveIdleConn(pc, true) return } - readLoopDefer(pc, true) + if debugSwitch { + println("############### handleTask: read err != nil") + } + closeAndRemoveIdleConn(pc, true) return } - dataTask := taskData.hyperBody.Data() taskData.taskId = readBodyChunk - dataTask.SetUserdata(c.Pointer(taskData)) - t.exec.Push(dataTask) if !taskData.req.deadline.IsZero() { (*timeoutData)((*libuv.Handle)(c.Pointer(taskData.req.timer)).GetData()).taskData = taskData @@ -1391,21 +1416,18 @@ func (t *Transport) handleTask(task *hyper.Task) { resp.wrapRespBody(taskData) - // FIXME: Waiting for the channel bug to be fixed - //select { - //case taskData.resc <- responseAndError{res: resp}: - //case <-taskData.callerGone: - // // defer - // readLoopDefer(pc, true) - // return - //} select { + case taskData.resc <- responseAndError{res: resp}: case <-taskData.callerGone: - readLoopDefer(pc, true) + // defer + if debugSwitch { + println("############### handleTask read: callerGone 2") + } + pc.bodyChunk.Close() + taskData.closeHyperBody() + closeAndRemoveIdleConn(pc, true) return - default: } - taskData.resc <- responseAndError{res: resp} if debugReadWriteLoop { println("############### read end") @@ -1433,14 +1455,16 @@ func (t *Transport) handleTask(task *hyper.Task) { // taskType == taskEmpty (check in checkTaskType) task.Free() - taskData.hyperBody.Free() - taskData.hyperBody = nil - pc.bodyChunk.Close() - replaced := t.replaceReqCanceler(taskData.req.cancelKey, nil) // before pc might return to idle pool + pc.bodyChunk.closeWithError(io.EOF) + taskData.closeHyperBody() + replaced := pc.t.replaceReqCanceler(taskData.req.cancelKey, nil) // before pc might return to idle pool pc.alive = pc.alive && replaced && pc.tryPutIdleConn() - readLoopDefer(pc, false) + if debugSwitch { + println("############### handleTask readBodyChunk: alive: ", pc.alive) + } + closeAndRemoveIdleConn(pc, false) if debugReadWriteLoop { println("############### readBodyChunk end [empty]") @@ -1449,18 +1473,20 @@ func (t *Transport) handleTask(task *hyper.Task) { } func readyToRead(aysnc *libuv.Async) { - println("############### AsyncCb: readyToRead") taskData := (*taskData)(aysnc.GetData()) dataTask := taskData.hyperBody.Data() - dataTask.SetUserdata(c.Pointer(taskData)) - taskData.pc.t.exec.Push(dataTask) + dataTask.SetUserdata(c.Pointer(taskData), nil) + taskData.pc.eventLoop.exec.Push(dataTask) } -// readLoopDefer Replace the defer function of readLoop in stdlib -func readLoopDefer(pc *persistConn, force bool) { +// closeAndRemoveIdleConn Replace the defer function of readLoop in stdlib +func closeAndRemoveIdleConn(pc *persistConn, force bool) { if pc.alive == true && !force { return } + if debugSwitch { + println("############### closeAndRemoveIdleConn, force:", force) + } pc.close(pc.closeErr) pc.t.removeIdleConn(pc) } @@ -1468,13 +1494,14 @@ func readLoopDefer(pc *persistConn, force bool) { // ---------------------------------------------------------- type connData struct { - TcpHandle libuv.Tcp - ConnectReq libuv.Connect - ReadBuf libuv.Buf - ReadBufFilled uintptr + tcpHandle libuv.Tcp + connectReq libuv.Connect + readBuf libuv.Buf + readBufFilled uintptr nwrite int64 // bytes written(Replaced from persistConn's nwrite) - ReadWaker *hyper.Waker - WriteWaker *hyper.Waker + readWaker *hyper.Waker + writeWaker *hyper.Waker + isClosing atomic.Bool } type taskData struct { @@ -1497,24 +1524,32 @@ const ( readBodyChunk ) -func (conn *connData) Close() error { - if conn == nil { - return nil - } - if conn.ReadWaker != nil { - conn.ReadWaker.Free() - conn.ReadWaker = nil - } - if conn.WriteWaker != nil { - conn.WriteWaker.Free() - conn.WriteWaker = nil +func (conn *connData) Close() { + if conn != nil && !conn.isClosing.Swap(true) { + if conn.readWaker != nil { + conn.readWaker.Free() + conn.readWaker = nil + } + if conn.writeWaker != nil { + conn.writeWaker.Free() + conn.writeWaker = nil + } + //if conn.readBuf.Base != nil { + // c.Free(c.Pointer(conn.readBuf.Base)) + // conn.readBuf.Base = nil + //} + if (*libuv.Handle)(c.Pointer(&conn.tcpHandle)).IsClosing() == 0 { + (*libuv.Handle)(c.Pointer(&conn.tcpHandle)).Close(nil) + } + conn = nil } - if conn.ReadBuf.Base != nil { - c.Free(c.Pointer(conn.ReadBuf.Base)) - conn.ReadBuf.Base = nil +} + +func (d *taskData) closeHyperBody() { + if d.hyperBody != nil { + d.hyperBody.Free() + d.hyperBody = nil } - (*libuv.Handle)(c.Pointer(&conn.TcpHandle)).Close(nil) - return nil } // onConnect is the libuv callback for a successful connection @@ -1524,24 +1559,28 @@ func onConnect(req *libuv.Connect, status c.Int) { defer println("############### connect end") } conn := (*connData)((*libuv.Req)(c.Pointer(req)).GetData()) - if status < 0 { - c.Fprintf(c.Stderr, c.Str("connect error: %d\n"), libuv.Strerror(libuv.Errno(status))) + c.Fprintf(c.Stderr, c.Str("connect error: %s\n"), c.GoString(libuv.Strerror(libuv.Errno(status)))) + conn.Close() return } - (*libuv.Stream)(c.Pointer(&conn.TcpHandle)).StartRead(allocBuffer, onRead) + + // Keep-Alive + conn.tcpHandle.KeepAlive(1, 60) + + (*libuv.Stream)(c.Pointer(&conn.tcpHandle)).StartRead(allocBuffer, onRead) } // allocBuffer allocates a buffer for reading from a socket func allocBuffer(handle *libuv.Handle, suggestedSize uintptr, buf *libuv.Buf) { conn := (*connData)(handle.GetData()) - if conn.ReadBuf.Base == nil { - conn.ReadBuf = libuv.InitBuf((*c.Char)(c.Malloc(suggestedSize)), c.Uint(suggestedSize)) - //base := make([]byte, suggestedSize) - //conn.ReadBuf = libuv.InitBuf((*c.Char)(c.Pointer(&base[0])), c.Uint(suggestedSize)) - conn.ReadBufFilled = 0 + if conn.readBuf.Base == nil { + //conn.readBuf = libuv.InitBuf((*c.Char)(c.Malloc(suggestedSize)), c.Uint(suggestedSize)) + base := make([]byte, suggestedSize) + conn.readBuf = libuv.InitBuf((*c.Char)(c.Pointer(&base[0])), c.Uint(suggestedSize)) + conn.readBufFilled = 0 } - *buf = libuv.InitBuf((*c.Char)(c.Pointer(uintptr(c.Pointer(conn.ReadBuf.Base))+conn.ReadBufFilled)), c.Uint(suggestedSize-conn.ReadBufFilled)) + *buf = libuv.InitBuf((*c.Char)(c.Pointer(uintptr(c.Pointer(conn.readBuf.Base))+conn.readBufFilled)), c.Uint(suggestedSize-conn.readBufFilled)) } // onRead is the libuv callback for reading from a socket @@ -1549,38 +1588,39 @@ func allocBuffer(handle *libuv.Handle, suggestedSize uintptr, buf *libuv.Buf) { func onRead(stream *libuv.Stream, nread c.Long, buf *libuv.Buf) { conn := (*connData)((*libuv.Handle)(c.Pointer(stream)).GetData()) if nread > 0 { - conn.ReadBufFilled += uintptr(nread) + conn.readBufFilled += uintptr(nread) } - if conn.ReadWaker != nil { + if conn.readWaker != nil { // Wake up the pending read operation of Hyper - conn.ReadWaker.Wake() - conn.ReadWaker = nil + conn.readWaker.Wake() + conn.readWaker = nil } } // readCallBack read callback function for Hyper library func readCallBack(userdata c.Pointer, ctx *hyper.Context, buf *uint8, bufLen uintptr) uintptr { conn := (*connData)(userdata) - if conn.ReadBufFilled > 0 { + if conn.readBufFilled > 0 { var toCopy uintptr - if bufLen < conn.ReadBufFilled { + if bufLen < conn.readBufFilled { toCopy = bufLen } else { - toCopy = conn.ReadBufFilled + toCopy = conn.readBufFilled } // Copy data from read buffer to Hyper's buffer - c.Memcpy(c.Pointer(buf), c.Pointer(conn.ReadBuf.Base), toCopy) + c.Memcpy(c.Pointer(buf), c.Pointer(conn.readBuf.Base), toCopy) // Move remaining data to the beginning of the buffer - c.Memmove(c.Pointer(conn.ReadBuf.Base), c.Pointer(uintptr(c.Pointer(conn.ReadBuf.Base))+toCopy), conn.ReadBufFilled-toCopy) + c.Memmove(c.Pointer(conn.readBuf.Base), c.Pointer(uintptr(c.Pointer(conn.readBuf.Base))+toCopy), conn.readBufFilled-toCopy) // Update the amount of filled buffer - conn.ReadBufFilled -= toCopy + conn.readBufFilled -= toCopy return toCopy } - if conn.ReadWaker != nil { - conn.ReadWaker.Free() + if conn.readWaker != nil { + conn.readWaker.Free() } - conn.ReadWaker = ctx.Waker() + conn.readWaker = ctx.Waker() + println("############### readCallBack: IoPending") return hyper.IoPending } @@ -1588,10 +1628,10 @@ func readCallBack(userdata c.Pointer, ctx *hyper.Context, buf *uint8, bufLen uin // Callback function called after a write operation completes func onWrite(req *libuv.Write, status c.Int) { conn := (*connData)((*libuv.Req)(c.Pointer(req)).GetData()) - if conn.WriteWaker != nil { + if conn.writeWaker != nil { // Wake up the pending write operation - conn.WriteWaker.Wake() - conn.WriteWaker = nil + conn.writeWaker.Wake() + conn.writeWaker = nil } } @@ -1602,16 +1642,17 @@ func writeCallBack(userdata c.Pointer, ctx *hyper.Context, buf *uint8, bufLen ui req := &libuv.Write{} (*libuv.Req)(c.Pointer(req)).SetData(c.Pointer(conn)) - ret := req.Write((*libuv.Stream)(c.Pointer(&conn.TcpHandle)), &initBuf, 1, onWrite) + ret := req.Write((*libuv.Stream)(c.Pointer(&conn.tcpHandle)), &initBuf, 1, onWrite) if ret >= 0 { conn.nwrite += int64(bufLen) return bufLen } - if conn.WriteWaker != nil { - conn.WriteWaker.Free() + if conn.writeWaker != nil { + conn.writeWaker.Free() } - conn.WriteWaker = ctx.Waker() + conn.writeWaker = ctx.Waker() + println("############### writeCallBack: IoPending") return hyper.IoPending } @@ -1630,14 +1671,14 @@ func onTimeout(timer *libuv.Timer) { pc := taskData.pc pc.alive = false pc.t.cancelRequest(taskData.req.cancelKey, errors.New("timeout: req.Context().Err()")) - readLoopDefer(pc, true) + closeAndRemoveIdleConn(pc, true) } } // newHyperIo creates a new IO with read and write callbacks func newHyperIo(connData *connData) *hyper.Io { hyperIo := hyper.NewIo() - hyperIo.SetUserdata(c.Pointer(connData)) + hyperIo.SetUserdata(c.Pointer(connData), nil) hyperIo.SetRead(readCallBack) hyperIo.SetWrite(writeCallBack) return hyperIo @@ -1670,8 +1711,16 @@ func checkTaskType(task *hyper.Task, taskData *taskData) (err error) { task.Free() if curTaskId == handshake || curTaskId == read { taskData.writeErrCh <- err + if debugSwitch { + println("############### checkTaskType: writeErrCh") + } taskData.pc.close(err) } + if taskData.pc.bodyChunk != nil { + taskData.pc.bodyChunk.Close() + taskData.pc.bodyChunk = nil + } + taskData.closeHyperBody() taskData.pc.alive = false } return @@ -1685,6 +1734,7 @@ func fail(err *hyper.Error, taskId taskId) error { errLen := err.Print((*uint8)(c.Pointer(&errBuf[:][0])), uintptr(len(errBuf))) errDetails := unsafe.SliceData(errBuf[:errLen]) details := c.GoString(errDetails) + fmt.Println(details) // clean up the error err.Free() @@ -1837,7 +1887,9 @@ type persistConn struct { // If it's non-nil, the rest of the fields are unused. alt RoundTripper - t *Transport + t *Transport + eventLoop *clientEventLoop + cacheKey connectMethodKey conn *connData //tlsState *tls.ConnectionState @@ -1876,6 +1928,9 @@ type persistConn struct { // a "keep-alive" state. It does not interrupt any connections currently // in use. func (t *Transport) CloseIdleConnections() { + if debugSwitch { + println("############### CloseIdleConnections") + } //t.nextProtoOnce.Do(t.onceSetNextProtoDefaults) t.idleMu.Lock() m := t.idleConn @@ -1888,12 +1943,16 @@ func (t *Transport) CloseIdleConnections() { pconn.close(errCloseIdleConns) } } + //if t2 := t.h2transport; t2 != nil { // t2.CloseIdleConnections() //} } func (pc *persistConn) cancelRequest(err error) { + if debugSwitch { + println("############### cancelRequest") + } pc.mu.Lock() defer pc.mu.Unlock() pc.canceledErr = err @@ -1938,8 +1997,14 @@ func (pc *persistConn) closeLocked(err error) { } close(pc.closech) close(pc.writeLoopDone) - pc.client.Free() - pc.chunkAsync.Close(nil) + if pc.client != nil { + pc.client.Free() + pc.client = nil + } + if pc.chunkAsync != nil && pc.chunkAsync.IsClosing() == 0 { + pc.chunkAsync.Close(nil) + pc.chunkAsync = nil + } } } pc.mutateHeaderFunc = nil @@ -2096,10 +2161,16 @@ func (pc *persistConn) closeConnIfStillIdleLocked() { return } t.removeIdleConnLocked(pc) + if debugSwitch { + println("############### closeConnIfStillIdleLocked") + } pc.close(errIdleConnTimeout) } func (pc *persistConn) readLoopPeekFailLocked(resp *hyper.Response, err error) { + if debugSwitch { + println("############### readLoopPeekFailLocked") + } if pc.closed != nil { return } @@ -2117,7 +2188,7 @@ func (pc *persistConn) setExtraHeaders(req *transportRequest) bool { // uncompress the gzip stream if we were the layer that // requested it. requestedGzip := false - // TODO(spongehah) gzip(pc.roundTrip) + // TODO(hah) gzip(pc.roundTrip): The compress/gzip library still has a bug. An exception occurs when calling gzip.NewReader(). //if !pc.t.DisableCompression && // req.Header.Get("Accept-Encoding") == "" && // req.Header.Get("Range") == "" && @@ -2190,6 +2261,8 @@ type connectMethod struct { // be reused for different targetAddr values. targetAddr string onlyH1 bool // whether to disable HTTP/2 and force HTTP/1 + + eventLoop *clientEventLoop } // connectMethodKey is the map key version of connectMethod, with a