diff --git a/x/net/http/_demo/parallelRequest/parallelRequest.go b/x/net/http/_demo/parallelRequest/parallelRequest.go index 0bcb336..ae1083d 100644 --- a/x/net/http/_demo/parallelRequest/parallelRequest.go +++ b/x/net/http/_demo/parallelRequest/parallelRequest.go @@ -26,7 +26,7 @@ func worker(id int, wg *sync.WaitGroup) { func main() { var wait sync.WaitGroup - for i := 0; i < 500; i++ { + for i := 0; i < 100; i++ { wait.Add(1) go worker(i, &wait) } diff --git a/x/net/http/body_stream.go b/x/net/http/body_stream.go index 64ebfa4..451ef7f 100644 --- a/x/net/http/body_stream.go +++ b/x/net/http/body_stream.go @@ -2,7 +2,6 @@ package http import ( "errors" - "fmt" "github.com/goplus/llgo/c/libuv" ) @@ -18,7 +17,7 @@ type bodyStream struct { } var ( - ErrClosedRequestBody = errors.New("request body: read/write on closed body") + ErrClosedBodyStream = errors.New("body stream: read/write on closed body") ) func newBodyStream(asyncHandle *libuv.Async) *bodyStream { @@ -30,7 +29,6 @@ func newBodyStream(asyncHandle *libuv.Async) *bodyStream { } func (rb *bodyStream) Read(p []byte) (n int, err error) { - fmt.Println("[debug] RequestBody Read called") select { case <-rb.done: err = rb.readCloseError() @@ -41,11 +39,9 @@ func (rb *bodyStream) Read(p []byte) (n int, err error) { for n < len(p) { if len(rb.chunk) == 0 { rb.asyncHandle.Send() - fmt.Println("[debug] RequestBody Read asyncHandle.Send called") select { case chunk := <-rb.readCh: rb.chunk = chunk - fmt.Println("[debug] RequestBody Read chunk received") case <-rb.done: err = rb.readCloseError() return @@ -64,16 +60,15 @@ func (rb *bodyStream) readCloseError() error { if rerr := rb.rerr; rerr != nil { return rerr } - return ErrClosedRequestBody + return ErrClosedBodyStream } func (rb *bodyStream) closeWithError(err error) error { - fmt.Println("[debug] RequestBody closeRead called") if rb.rerr != nil { return nil } if err == nil { - err = ErrClosedRequestBody + err = ErrClosedBodyStream } rb.rerr = err close(rb.done) diff --git a/x/net/http/response.go b/x/net/http/response.go index d8eca16..a64c4b0 100644 --- a/x/net/http/response.go +++ b/x/net/http/response.go @@ -239,18 +239,12 @@ func (r *Response) checkRespBody(taskData *clientTaskData) (needContinue bool) { select { case taskData.resc <- responseAndError{res: r}: case <-taskData.callerGone: - if debugSwitch { - println("############### checkRespBody callerGone") - } closeAndRemoveIdleConn(pc, true) return true } // Now that they've read from the unbuffered channel, they're safely // out of the select that also waits on this goroutine to die, so // we're allowed to exit now if needed (if alive is false) - if debugSwitch { - println("############### checkRespBody return") - } closeAndRemoveIdleConn(pc, false) return true } diff --git a/x/net/http/transport.go b/x/net/http/transport.go index 1fc027f..b7596c1 100644 --- a/x/net/http/transport.go +++ b/x/net/http/transport.go @@ -37,12 +37,6 @@ var DefaultTransport RoundTripper = &Transport{ // MaxIdleConnsPerHost. const DefaultMaxIdleConnsPerHost = 2 -// Debug switch provided for developers -const ( - debugSwitch = true - debugReadWriteLoop = true -) - type Transport struct { idleMu sync.Mutex closeIdle bool // user has requested to close all idle conns @@ -96,8 +90,8 @@ type responseAndError struct { } type timeoutData struct { - timeoutch chan struct{} - clientTaskData *clientTaskData + timeoutch chan struct{} + clientTaskData *clientTaskData } type readTrackingBody struct { @@ -182,9 +176,6 @@ func (tr *transportRequest) setError(err error) { func (t *Transport) putOrCloseIdleConn(pconn *persistConn) { if err := t.tryPutIdleConn(pconn); err != nil { - if debugSwitch { - println("############### putOrCloseIdleConn: close") - } pconn.close(err) } } @@ -276,9 +267,6 @@ func (t *Transport) tryPutIdleConn(pconn *persistConn) error { t.idleLRU.add(pconn) if t.MaxIdleConns != 0 && t.idleLRU.len() > t.MaxIdleConns { oldest := t.idleLRU.removeOldest() - if debugSwitch { - println("############### tryPutIdleConn: removeOldest") - } oldest.close(errTooManyIdle) t.removeIdleConnLocked(oldest) } @@ -647,10 +635,6 @@ func (t *Transport) getLoopKey(req *Request) string { } func (t *Transport) RoundTrip(req *Request) (*Response, error) { - if debugSwitch { - println("############### RoundTrip start") - defer println("############### RoundTrip end") - } eventLoop := t.getClientEventLoop(req) @@ -662,15 +646,12 @@ func (t *Transport) RoundTrip(req *Request) (*Response, error) { req.timer = &libuv.Timer{} libuv.InitTimer(eventLoop.loop, req.timer) ch := &timeoutData{ - timeoutch: req.timeoutch, - clientTaskData: nil, + timeoutch: req.timeoutch, + clientTaskData: nil, } (*libuv.Handle)(c.Pointer(req.timer)).SetData(c.Pointer(ch)) req.timer.Start(onTimeout, getMilliseconds(req.deadline), 0) - if debugSwitch { - println("############### timer start") - } didTimeout = func() bool { return req.timer.GetDueIn() == 0 } stopTimer = func() { close(req.timeoutch) @@ -678,9 +659,6 @@ func (t *Transport) RoundTrip(req *Request) (*Response, error) { if (*libuv.Handle)(c.Pointer(req.timer)).IsClosing() == 0 { (*libuv.Handle)(c.Pointer(req.timer)).Close(nil) } - if debugSwitch { - println("############### timer close") - } } } else { didTimeout = alwaysFalse @@ -704,10 +682,6 @@ func (t *Transport) RoundTrip(req *Request) (*Response, error) { } func (t *Transport) doRoundTrip(req *Request, loop *clientEventLoop) (*Response, error) { - if debugSwitch { - println("############### doRoundTrip start") - defer println("############### doRoundTrip end") - } //t.nextProtoOnce.Do(t.onceSetNextProtoDefaults) //ctx := req.Context() //trace := httptrace.ContextClientTrace(ctx) @@ -843,10 +817,6 @@ func (t *Transport) doRoundTrip(req *Request, loop *clientEventLoop) (*Response, } func (t *Transport) getConn(treq *transportRequest, cm connectMethod) (pc *persistConn, err error) { - if debugSwitch { - println("############### getConn start") - defer println("############### getConn end") - } req := treq.Request //trace := treq.trace //ctx := req.Context() @@ -902,9 +872,6 @@ func (t *Transport) getConn(treq *transportRequest, cm connectMethod) (pc *persi // cancellation error (see golang.org/issue/16049). select { case <-req.timeoutch: - if debugSwitch { - println("############### getConn: timeoutch") - } return nil, errors.New("timeout: req.Context().Err()") case err := <-cancelc: if err == errRequestCanceled { @@ -920,10 +887,6 @@ func (t *Transport) getConn(treq *transportRequest, cm connectMethod) (pc *persi // queueForDial queues w to wait for permission to begin dialing. // Once w receives permission to dial, it will do so in a separate goroutine. func (t *Transport) queueForDial(w *wantConn) { - if debugSwitch { - println("############### queueForDial start") - defer println("############### queueForDial end") - } w.beforeDial() if t.MaxConnsPerHost <= 0 { @@ -956,10 +919,6 @@ func (t *Transport) queueForDial(w *wantConn) { // dialConnFor has received permission to dial w.cm and is counted in t.connCount[w.cm.key()]. // If the dial is canceled or unsuccessful, dialConnFor decrements t.connCount[w.cm.key()]. func (t *Transport) dialConnFor(w *wantConn) { - if debugSwitch { - println("############### dialConnFor start") - defer println("############### dialConnFor end") - } defer w.afterDial() pc, err := t.dialConn(w.timeoutch, w.cm) @@ -1030,10 +989,6 @@ func (t *Transport) decConnsPerHost(key connectMethodKey) { } func (t *Transport) dialConn(timeoutch chan struct{}, cm connectMethod) (pconn *persistConn, err error) { - if debugSwitch { - println("############### dialConn start") - defer println("############### dialConn end") - } select { case <-timeoutch: err = errors.New("[t.dialConn] request timeout") @@ -1084,9 +1039,6 @@ func (t *Transport) dialConn(timeoutch chan struct{}, cm connectMethod) (pconn * select { case <-timeoutch: err = errors.New("[t.dialConn] request timeout") - if debugSwitch { - println("############### dialConn: timeoutch") - } pconn.close(err) return nil, err default: @@ -1095,10 +1047,6 @@ func (t *Transport) dialConn(timeoutch chan struct{}, cm connectMethod) (pconn * } func (t *Transport) dial(cm connectMethod) (*connData, error) { - if debugSwitch { - println("############### dial start") - defer println("############### dial end") - } addr := cm.addr() host, port, err := net.SplitHostPort(addr) if err != nil { @@ -1132,10 +1080,6 @@ func (t *Transport) dial(cm connectMethod) (*connData, error) { } func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err error) { - if debugSwitch { - println("############### roundTrip start") - defer println("############### roundTrip end") - } testHookEnterRoundTrip() if !pc.t.replaceReqCanceler(req.cancelKey, pc.cancelRequest) { pc.t.putOrCloseIdleConn(pc) @@ -1210,18 +1154,9 @@ func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err err for { testHookWaitResLoop() - if debugSwitch { - println("############### roundTrip for") - } select { case err := <-writeErrCh: - if debugSwitch { - println("############### roundTrip: writeErrch") - } if err != nil { - if debugSwitch { - println("############### roundTrip: writeErrch err != nil") - } pc.close(fmt.Errorf("write error: %w", err)) if pc.conn.nwrite == startBytesWritten { err = nothingWrittenError{err} @@ -1229,18 +1164,12 @@ func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err err return nil, pc.mapRoundTripError(req, startBytesWritten, err) } case <-pcClosed: - if debugSwitch { - println("############### roundTrip: pcClosed") - } pcClosed = nil if canceled || pc.t.replaceReqCanceler(req.cancelKey, nil) { return nil, pc.mapRoundTripError(req, startBytesWritten, pc.closed) } //case <-respHeaderTimer: case re := <-resc: - if debugSwitch { - println("############### roundTrip: resc") - } if (re.res == nil) == (re.err == nil) { return nil, fmt.Errorf("internal error: exactly one of res or err should be set; nil=%v", re.res == nil) } @@ -1249,9 +1178,6 @@ func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err err } return re.res, nil case <-timeoutch: - if debugSwitch { - println("############### roundTrip: timeoutch") - } canceled = pc.t.cancelRequest(req.cancelKey, errors.New("timeout: req.Context().Err()")) timeoutch = nil return nil, errors.New("request timeout") @@ -1267,9 +1193,6 @@ func readWriteLoop(checker *libuv.Idle) { // The polling state machine! Poll all ready tasks and act on them... task := eventLoop.exec.Poll() for task != nil { - if debugSwitch { - println("############### polling") - } eventLoop.handleTask(task) task = eventLoop.exec.Poll() } @@ -1287,17 +1210,11 @@ func (eventLoop *clientEventLoop) handleTask(task *hyper.Task) { // If original taskId is set, we need to check it err = checkTaskType(task, clientTaskData) if err != nil { - if debugSwitch { - println("############### handleTask: checkTaskType err != nil") - } closeAndRemoveIdleConn(pc, true) return } switch clientTaskData.taskId { case handshake: - if debugReadWriteLoop { - println("############### write") - } // Check if the connection is closed select { @@ -1317,21 +1234,11 @@ func (eventLoop *clientEventLoop) handleTask(task *hyper.Task) { if err != nil { //pc.writeErrCh <- err // to the body reader, which might recycle us clientTaskData.writeErrCh <- err // to the roundTrip function - if debugSwitch { - println("############### handleTask: write err != nil") - } pc.close(err) return } - if debugReadWriteLoop { - println("############### write end") - } case read: - if debugReadWriteLoop { - println("############### read") - } - pc.tryPutIdleConn = func() bool { if err := pc.t.tryPutIdleConn(pc); err != nil { pc.closeErr = err @@ -1354,9 +1261,6 @@ func (eventLoop *clientEventLoop) handleTask(task *hyper.Task) { if pc.numExpectedResponses == 0 { pc.readLoopPeekFailLocked(hyperResp, err) pc.mu.Unlock() - if debugSwitch { - println("############### handleTask: numExpectedResponses == 0") - } closeAndRemoveIdleConn(pc, true) return } @@ -1383,15 +1287,9 @@ func (eventLoop *clientEventLoop) handleTask(task *hyper.Task) { select { case clientTaskData.resc <- responseAndError{err: err}: case <-clientTaskData.callerGone: - if debugSwitch { - println("############### handleTask read: callerGone") - } closeAndRemoveIdleConn(pc, true) return } - if debugSwitch { - println("############### handleTask: read err != nil") - } closeAndRemoveIdleConn(pc, true) return } @@ -1417,23 +1315,13 @@ func (eventLoop *clientEventLoop) handleTask(task *hyper.Task) { case clientTaskData.resc <- responseAndError{res: resp}: case <-clientTaskData.callerGone: // defer - if debugSwitch { - println("############### handleTask read: callerGone 2") - } pc.bodyStream.Close() clientTaskData.closeHyperBody() closeAndRemoveIdleConn(pc, true) return } - if debugReadWriteLoop { - println("############### read end") - } case readBodyChunk: - if debugReadWriteLoop { - println("############### readBodyChunk") - } - taskType := task.Type() if taskType == hyper.TaskBuf { chunk := (*hyper.Buf)(task.Value()) @@ -1444,9 +1332,6 @@ func (eventLoop *clientEventLoop) handleTask(task *hyper.Task) { task.Free() // Write to the channel pc.bodyStream.readCh <- bytes - if debugReadWriteLoop { - println("############### readBodyChunk end [buf]") - } return } @@ -1458,14 +1343,7 @@ func (eventLoop *clientEventLoop) handleTask(task *hyper.Task) { pc.alive = pc.alive && replaced && pc.tryPutIdleConn() - if debugSwitch { - println("############### handleTask readBodyChunk: alive: ", pc.alive) - } closeAndRemoveIdleConn(pc, false) - - if debugReadWriteLoop { - println("############### readBodyChunk end [empty]") - } } } @@ -1481,9 +1359,6 @@ func closeAndRemoveIdleConn(pc *persistConn, force bool) { if pc.alive == true && !force { return } - if debugSwitch { - println("############### closeAndRemoveIdleConn, force:", force) - } pc.close(pc.closeErr) pc.t.removeIdleConn(pc) } @@ -1551,10 +1426,6 @@ func (d *clientTaskData) closeHyperBody() { // onConnect is the libuv callback for a successful connection func onConnect(req *libuv.Connect, status c.Int) { - if debugSwitch { - println("############### connect start") - defer println("############### connect end") - } conn := (*connData)((*libuv.Req)(c.Pointer(req)).GetData()) if status < 0 { c.Fprintf(c.Stderr, c.Str("connect error: %s\n"), c.GoString(libuv.Strerror(libuv.Errno(status)))) @@ -1617,7 +1488,6 @@ func readCallBack(userdata c.Pointer, ctx *hyper.Context, buf *uint8, bufLen uin conn.readWaker.Free() } conn.readWaker = ctx.Waker() - println("############### readCallBack: IoPending") return hyper.IoPending } @@ -1649,16 +1519,11 @@ func writeCallBack(userdata c.Pointer, ctx *hyper.Context, buf *uint8, bufLen ui conn.writeWaker.Free() } conn.writeWaker = ctx.Waker() - println("############### writeCallBack: IoPending") return hyper.IoPending } // onTimeout is the libuv callback for a timeout func onTimeout(timer *libuv.Timer) { - if debugSwitch { - println("############### onTimeout start") - defer println("############### onTimeout end") - } data := (*timeoutData)((*libuv.Handle)(c.Pointer(timer)).GetData()) close(data.timeoutch) timer.Stop() @@ -1708,9 +1573,6 @@ func checkTaskType(task *hyper.Task, clientTaskData *clientTaskData) (err error) task.Free() if curTaskId == handshake || curTaskId == read { clientTaskData.writeErrCh <- err - if debugSwitch { - println("############### checkTaskType: writeErrCh") - } clientTaskData.pc.close(err) } if clientTaskData.pc.bodyStream != nil { @@ -1916,7 +1778,7 @@ type persistConn struct { closeErr error // Replace the closeErr in readLoop tryPutIdleConn func() bool // Replace the tryPutIdleConn in readLoop client *hyper.ClientConn // http long connection client handle - bodyStream *bodyStream // Implement non-blocking consumption of each responseBody chunk + bodyStream *bodyStream // Implement non-blocking consumption of each responseBody chunk chunkAsync *libuv.Async // Notifying that the received chunk has been read } @@ -1925,9 +1787,6 @@ type persistConn struct { // a "keep-alive" state. It does not interrupt any connections currently // in use. func (t *Transport) CloseIdleConnections() { - if debugSwitch { - println("############### CloseIdleConnections") - } //t.nextProtoOnce.Do(t.onceSetNextProtoDefaults) t.idleMu.Lock() m := t.idleConn @@ -1947,9 +1806,6 @@ func (t *Transport) CloseIdleConnections() { } func (pc *persistConn) cancelRequest(err error) { - if debugSwitch { - println("############### cancelRequest") - } pc.mu.Lock() defer pc.mu.Unlock() pc.canceledErr = err @@ -1976,9 +1832,6 @@ func (pc *persistConn) markReused() { } func (pc *persistConn) closeLocked(err error) { - if debugSwitch { - println("############### pc closed") - } if err == nil { panic("nil error") } @@ -2158,16 +2011,10 @@ func (pc *persistConn) closeConnIfStillIdleLocked() { return } t.removeIdleConnLocked(pc) - if debugSwitch { - println("############### closeConnIfStillIdleLocked") - } pc.close(errIdleConnTimeout) } func (pc *persistConn) readLoopPeekFailLocked(resp *hyper.Response, err error) { - if debugSwitch { - println("############### readLoopPeekFailLocked") - } if pc.closed != nil { return }