diff --git a/x/net/http/server.go b/x/net/http/server.go index e629cfe..ade6c5f 100644 --- a/x/net/http/server.go +++ b/x/net/http/server.go @@ -18,13 +18,14 @@ import ( "github.com/goplus/llgo/x/net" ) -// var requestNotifyHandle *libuv.Async const _SC_NPROCESSORS_ONLN c.Int = 58 - var cpuCount int + var asyncHandleMapMu sync.Mutex -var asyncHandleMap = make(map[int]*libuv.Async) -var connID int32 +var asyncHandleMap = make(map[int64]*libuv.Async) + +// connID is used to generate unique IDs for each connection +var connID int64 type Handler interface { ServeHTTP(ResponseWriter, *Request) @@ -39,21 +40,9 @@ type ResponseWriter interface { type Server struct { Addr string Handler Handler - - // uvLoop *libuv.Loop - // uvServer libuv.Tcp - isShutdown atomic.Bool - // idleHandle libuv.Idle - - // executor *hyper.Executor - // http1Opts *hyper.Http1ServerconnOptions - // http2Opts *hyper.Http2ServerconnOptions eventLoop []*eventLoop - - // mu sync.Mutex - // activeConnections map[*conn]struct{} } type eventLoop struct { @@ -71,7 +60,7 @@ type eventLoop struct { } type conn struct { - asyncID int + asyncID int64 stream libuv.Tcp pollHandle libuv.Poll eventMask c.Uint @@ -83,18 +72,12 @@ type conn struct { } type serviceUserdata struct { - asyncHandleID int + asyncHandleID c.Long host [128]c.Char port [8]c.Char executor *hyper.Executor } -type threadArg struct { - host string - port int - eventLoop *eventLoop -} - func NewServer(addr string) *Server { return &Server{ Addr: addr, @@ -154,19 +137,6 @@ func (el *eventLoop) run(host string, port int) error { return fmt.Errorf("failed to create IP address: %v", libuv.Strerror(libuv.Errno(r))) } - // Set SO_REUSEADDR - // yes := c.Int(1) - // fmt.Println("[debug] el.uvServer.GetIoWatcherFd(): ", el.uvServer.GetIoWatcherFd()) - // result := cnet.SetSockOpt(el.uvServer.GetIoWatcherFd(), syscall.SOL_SOCKET, syscall.SO_REUSEADDR, unsafe.Pointer(&yes), c.Uint(unsafe.Sizeof(yes))) - // if result != 0 { - // return fmt.Errorf("failed to set SO_REUSEADDR: %v", result) - // } - - // result = cnet.SetSockOpt(el.uvServer.GetIoWatcherFd(), syscall.SOL_SOCKET, syscall.SO_REUSEPORT, unsafe.Pointer(&yes), c.Uint(unsafe.Sizeof(yes))) - // if result != 0 { - // return fmt.Errorf("failed to set SO_REUSEADDR: %v", result) - // } - if err := setReuseAddr(&el.uvServer); err != nil { return fmt.Errorf("failed to set SO_REUSEADDR: %v", err) } @@ -175,7 +145,6 @@ func (el *eventLoop) run(host string, port int) error { return fmt.Errorf("failed to bind: %v", libuv.Strerror(libuv.Errno(r))) } - //el.uvServer.Data = unsafe.Pointer(el) if err := (*libuv.Stream)(&el.uvServer).Listen(128, onNewConnection); err != 0 { return fmt.Errorf("failed to listen: %v", err) } @@ -190,8 +159,6 @@ func (el *eventLoop) run(host string, port int) error { return fmt.Errorf("failed to start idle handler: %d", r) } - //os.Setenv("UV_THREADPOOL_SIZE", "1") - if r := el.uvLoop.Run(libuv.RUN_DEFAULT); r != 0 { return fmt.Errorf("error in event loop: %d", r) } @@ -228,6 +195,7 @@ func ListenAndServe(addr string, handler Handler) error { } func (srv *Server) ListenAndServe() error { + connID = 0 cpuCount = int(c.Sysconf(_SC_NPROCESSORS_ONLN)) if cpuCount <= 0 { cpuCount = 4 @@ -243,15 +211,6 @@ func (srv *Server) ListenAndServe() error { srv.eventLoop = append(srv.eventLoop, el) } - // el, err := newEventLoop() - // if err != nil { - // return fmt.Errorf("failed to create event loop: %v", err) - // } - // el2, err := newEventLoop() - // if err != nil { - // return fmt.Errorf("failed to create event loop: %v", err) - // } - host, port, err := net.SplitHostPort(srv.Addr) if err != nil { return fmt.Errorf("invalid address %q: %v", srv.Addr, err) @@ -262,13 +221,6 @@ func (srv *Server) ListenAndServe() error { return fmt.Errorf("invalid port number: %v", err) } - // go func() { - // err = el2.run(host, portNum) - // if err != nil { - // println("[debug] failed to run event loop: %v", err) - // } - // }() - //TODO(hackerchai): new logic for poll // go func() { // for { @@ -280,59 +232,27 @@ func (srv *Server) ListenAndServe() error { // } // }() - // err = el.run(host, portNum) - // if err != nil { - // return fmt.Errorf("failed to run event loop: %v", err) - // } + errChan := make(chan error, len(srv.eventLoop)) + var wg sync.WaitGroup - // Create a libuv thread pool with the same number of threads as event loops - threadPool := make([]*libuv.Thread, len(srv.eventLoop)) - for i := range threadPool { - threadPool[i] = &libuv.Thread{} + for _, el := range srv.eventLoop { + wg.Add(1) + go func(el *eventLoop) { + err := el.run(host, portNum) + if err != nil { + errChan <- fmt.Errorf("failed to run event loop: %v", err) + } + wg.Done() + }(el) } - // Start each event loop in its own thread - for i, el := range srv.eventLoop { - threadArg := &threadArg{ - host: host, - port: portNum, - eventLoop: el, - } - - fmt.Printf("[debug] Creating thread %d\n", i) - - if result := threadPool[i].Create(runEventLoopInThread, unsafe.Pointer(threadArg)); result != 0 { - return fmt.Errorf("failed to create thread: %v", err) - } - } - - // Wait for all threads to complete - for _, thread := range threadPool { - if result := thread.Join(); result != 0 { - fmt.Printf("[debug] Failed to join thread: %v\n", err) - } - } + wg.Wait() fmt.Printf("Listening on %s\n", srv.Addr) - // if r := srv.uvServer.Bind((*cnet.SockAddr)(unsafe.Pointer(&sockaddr)), 0); r != 0 { - // return fmt.Errorf("failed to bind: %v", libuv.Strerror(libuv.Errno(r))) - // } - return nil } -func runEventLoopInThread(arg c.Pointer) { - tArg := (*threadArg)(arg) - host := tArg.host - port := tArg.port - el := tArg.eventLoop - err := el.run(host, port) - if err != nil { - fmt.Printf("[debug] failed to run event loop: %v", err) - } -} - func HandleFunc(pattern string, handler func(ResponseWriter, *Request)) { DefaultServeMux.HandleFunc(pattern, handler) } @@ -344,16 +264,6 @@ func onNewConnection(serverStream *libuv.Stream, status c.Int) { return } - // srv := (*Server)((*libuv.Handle)(unsafe.Pointer(serverStream)).GetData()) - // if srv == nil { - // fmt.Fprintf(os.Stderr, "Server is nil\n") - // return - // } - // el := (*eventLoop)((*libuv.Handle)(unsafe.Pointer(serverStream)).GetData()) - // if el == nil { - // fmt.Fprintf(os.Stderr, "Event loop is nil\n") - // return - // } el := (*eventLoop)((*libuv.Handle)(unsafe.Pointer(serverStream)).GetLoop().GetData()) if el == nil { fmt.Fprintf(os.Stderr, "Event loop is nil\n") @@ -394,17 +304,9 @@ func onNewConnection(serverStream *libuv.Stream, status c.Int) { (*libuv.Handle)(unsafe.Pointer(&conn.stream)).Close(nil) return } - //userData.setExecutor(srv.executor) - userData.executor = el.executor - userData.asyncHandleID = conn.asyncID - // if srv.Handler == nil { - // fmt.Fprintf(os.Stderr, "Failed to get handler\n") - // (*libuv.Handle)(unsafe.Pointer(&conn.stream)).Close(nil) - // return - // } - //userData.handler = srv.Handler - //userData.requestNotifyHandle = requestNotifyHandle + userData.executor = el.executor + userData.asyncHandleID = c.Long(conn.asyncID) var addr cnet.SockaddrStorage addrlen := c.Int(unsafe.Sizeof(addr)) @@ -420,7 +322,6 @@ func onNewConnection(serverStream *libuv.Stream, status c.Int) { c.Snprintf((*c.Char)(&userData.port[0]), unsafe.Sizeof(userData.port), c.Str("%d"), cnet.Ntohs(s.Port)) } - //TODO(hackerchai): use userData.host and userData.port conn.remoteAddr = c.GoString((*c.Char)(&userData.host[0])) + ":" + c.GoString((*c.Char)(&userData.port[0])) r := libuv.PollInit(el.uvLoop, &conn.pollHandle, libuv.OsFd(conn.stream.GetIoWatcherFd())) @@ -475,7 +376,6 @@ func onAsync(asyncHandle *libuv.Async) { } func onIdle(handle *libuv.Idle) { - // el := (*eventLoop)((*libuv.Handle)(unsafe.Pointer(handle)).GetData()) el := (*eventLoop)((*libuv.Handle)(unsafe.Pointer(handle)).GetLoop().GetData()) if el.executor != nil { task := el.executor.Poll() @@ -495,21 +395,9 @@ func doNothing(handle *libuv.Idle) { return } -// func (s *serviceUserdata) setExecutor(exec *hyper.Executor) { -// s.executor.Store(exec) -// } - -// func (s *serviceUserdata) getExecutor() *hyper.Executor { -// return s.executor.Load() -// } - func serverCallback(userData unsafe.Pointer, hyperReq *hyper.Request, channel *hyper.ResponseChannel) { + payload := (*serviceUserdata)(userData) - // srv := userData.server - // if srv == nil { - // fmt.Fprintf(os.Stderr, "Error: Received null server\n") - // return - // } if payload == nil { fmt.Fprintf(os.Stderr, "Error: Received null userData\n") return @@ -518,15 +406,27 @@ func serverCallback(userData unsafe.Pointer, hyperReq *hyper.Request, channel *h executor := payload.executor if executor == nil { fmt.Fprintf(os.Stderr, "Error: Received null executor\n") + fmt.Printf("[debug] host: %s\n", c.GoString(&payload.host[0])) + fmt.Printf("[debug] port: %s\n", c.GoString(&payload.port[0])) return } + host := payload.host + port := payload.port + + if payload.asyncHandleID == 0 { + fmt.Fprintf(os.Stderr, "Error: Received null asyncHandleID\n") + return + } + connID := int64(payload.asyncHandleID) + + + if hyperReq == nil { fmt.Fprintf(os.Stderr, "Error: Received null request\n") return } - connID := payload.asyncHandleID asyncHandleMapMu.Lock() requestNotifyHandle, ok := asyncHandleMap[connID] asyncHandleMapMu.Unlock() @@ -535,8 +435,6 @@ func serverCallback(userData unsafe.Pointer, hyperReq *hyper.Request, channel *h return } - host := payload.host - port := payload.port remoteAddr := c.GoString(&host[0]) + ":" + c.GoString(&port[0]) fmt.Printf("[debug] Remote address: %s\n", remoteAddr) @@ -811,7 +709,7 @@ func createConnData() (*conn, error) { } conn.isClosing.Store(false) conn.closedHandles = 0 - conn.asyncID = int(connID) + 1 + conn.asyncID = conn.asyncID + 1 return conn, nil } @@ -830,52 +728,51 @@ func closeWalkCb(handle *libuv.Handle, arg c.Pointer) { func (srv *Server) Close() error { srv.isShutdown.Store(true) - // for c := range el.activeConnections { - // c.Close() + for _, el := range srv.eventLoop { + el.Close() + } - // delete(srv.activeConnections, c) - // } + return nil +} - // if srv.executor != nil { - // srv.executor.Free() - // srv.executor = nil - // } +func (s *Server) shuttingDown() bool { + return s.isShutdown.Load() +} - // if exec := srv.executor; exec != nil { - // srv.executor = nil - // exec.Free() - // } +func (el *eventLoop) Close() error { + el.isShutdown.Store(true) - // if srv.http1Opts != nil { - // srv.http1Opts.Free() - // srv.http1Opts = nil - // } + for c := range el.activeConnections { + c.Close() + el.trackConn(c, false) + } - // if srv.http2Opts != nil { - // srv.http2Opts.Free() - // srv.http2Opts = nil - // } + if el.executor != nil { + el.executor.Free() + el.executor = nil + } + if el.http1Opts != nil { + el.http1Opts.Free() + el.http1Opts = nil + } + if el.http2Opts != nil { + el.http2Opts.Free() + el.http2Opts = nil + } - // srv.uvLoop.Walk(closeWalkCb, nil) - // srv.uvLoop.Run(libuv.RUN_ONCE) - // (*libuv.Handle)(unsafe.Pointer(&srv.uvServer)).Close(nil) + el.uvLoop.Walk(closeWalkCb, nil) + el.uvLoop.Run(libuv.RUN_ONCE) + (*libuv.Handle)(unsafe.Pointer(&el.uvServer)).Close(nil) + (*libuv.Handle)(unsafe.Pointer(&el.idleHandle)).Close(nil) + el.uvLoop.Close() - // srv.uvLoop.Close() return nil } -func (s *Server) shuttingDown() bool { - return s.isShutdown.Load() -} - func (el *eventLoop) shuttingDown() bool { return el.isShutdown.Load() } -func (c *conn) shuttingDown() bool { - return c.isClosing.Load() -} - func (c *conn) Close() { if c != nil && !c.isClosing.Swap(true) { fmt.Printf("[debug] Closing connection...\n") @@ -895,9 +792,19 @@ func (c *conn) Close() { if (*libuv.Handle)(unsafe.Pointer(&c.stream)).IsClosing() == 0 { (*libuv.Handle)(unsafe.Pointer(&c.stream)).Close(nil) } + + if asyncHandleMap[c.asyncID] != nil { + asyncHandleMapMu.Lock() + delete(asyncHandleMap, c.asyncID) + asyncHandleMapMu.Unlock() + } } } +func (c *conn) shuttingDown() bool { + return c.isClosing.Load() +} + type HandlerFunc func(ResponseWriter, *Request) func (f HandlerFunc) ServeHTTP(w ResponseWriter, r *Request) {