Skip to content

Commit

Permalink
[client]:fix EnrollContext
Browse files Browse the repository at this point in the history
  • Loading branch information
hujiaming.0927 committed Jun 25, 2024
1 parent 4d6fde2 commit d0e1ffc
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 16 deletions.
35 changes: 19 additions & 16 deletions client_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,14 +80,14 @@ func NewClient(eh EventHandler, opts ...Option) (cli *Client, err error) {
eng.ticker.ctx, eng.ticker.cancel = context.WithCancel(context.Background())
}

var p *netpoll.Poller
if p, err = netpoll.OpenPoller(); err != nil {
return
}
// var p *netpoll.Poller
// if p, err = netpoll.OpenPoller(); err != nil {
// return
// }
el := eventloop{
listeners: eng.listeners,
engine: &eng,
poller: p,
// listeners: eng.listeners,
engine: &eng,
// poller: p,
}

rbc := options.ReadBufferCap
Expand All @@ -109,7 +109,7 @@ func NewClient(eh EventHandler, opts ...Option) (cli *Client, err error) {
options.WriteBufferCap = math.CeilToPowerOfTwo(wbc)
}

numEventLoop := 10
numEventLoop := 2
if options.Multicore {
numEventLoop = runtime.NumCPU()
}
Expand All @@ -119,6 +119,7 @@ func NewClient(eh EventHandler, opts ...Option) (cli *Client, err error) {
if numEventLoop > gfd.EventLoopIndexMax {
numEventLoop = gfd.EventLoopIndexMax
}
options.NumEventLoop = numEventLoop

for i := 0; i < numEventLoop; i++ {
p, err := netpoll.OpenPoller()
Expand All @@ -145,15 +146,15 @@ func NewClient(eh EventHandler, opts ...Option) (cli *Client, err error) {

// Start starts the client event-loop, handing IO events.
func (cli *Client) Start() error {
logging.Infof("Starting gnet client with 1 event-loop")
logging.Infof("Starting gnet client with %d event-loop", cli.opts.NumEventLoop)
cli.el.eventHandler.OnBoot(Engine{cli.el.engine})

cli.el.engine.eventLoops.iterate(func(_ int, e *eventloop) bool {
cli.el.engine.workerPool.Go(e.orbit)
cli.el.engine.workerPool.Go(e.run)
return true
})

cli.el.engine.workerPool.Go(cli.el.run)
// cli.el.engine.workerPool.Go(cli.el.run)
// Start the ticker.
if cli.opts.Ticker {
go cli.el.ticker(cli.el.engine.ticker.ctx)
Expand All @@ -169,7 +170,7 @@ func (cli *Client) Stop() (err error) {
e.poller.Trigger(queue.HighPriority, func(_ interface{}) error { return errorx.ErrEngineShutdown }, nil)
return true
})
logging.Error(cli.el.poller.Trigger(queue.HighPriority, func(_ interface{}) error { return errorx.ErrEngineShutdown }, nil))
// logging.Error(cli.el.poller.Trigger(queue.HighPriority, func(_ interface{}) error { return errorx.ErrEngineShutdown }, nil))

// Stop the ticker.
if cli.opts.Ticker {
Expand All @@ -183,7 +184,7 @@ func (cli *Client) Stop() (err error) {
logging.Error(e.poller.Close())
return true
})
logging.Error(cli.el.poller.Close())
// logging.Error(cli.el.poller.Close())

cli.el.eventHandler.OnShutdown(Engine{cli.el.engine})
logging.Cleanup()
Expand Down Expand Up @@ -247,6 +248,7 @@ func (cli *Client) EnrollContext(c net.Conn, ctx interface{}) (Conn, error) {
var (
sockAddr unix.Sockaddr
gc *conn
el *eventloop
)
switch c.(type) {
case *net.UnixConn:
Expand All @@ -270,7 +272,8 @@ func (cli *Client) EnrollContext(c net.Conn, ctx interface{}) (Conn, error) {
if sockAddr, _, _, _, err = socket.GetTCPSockAddr(c.RemoteAddr().Network(), c.RemoteAddr().String()); err != nil {
return nil, err
}
gc = newTCPConn(dupFD, cli.el, sockAddr, c.LocalAddr(), c.RemoteAddr())
el = cli.el.engine.eventLoops.next(socket.SockaddrToTCPOrUnixAddr(sockAddr))
gc = newTCPConn(dupFD, el, sockAddr, c.LocalAddr(), c.RemoteAddr())
case *net.UDPConn:
if sockAddr, _, _, _, err = socket.GetUDPSockAddr(c.RemoteAddr().Network(), c.RemoteAddr().String()); err != nil {
return nil, err
Expand All @@ -287,8 +290,7 @@ func (cli *Client) EnrollContext(c net.Conn, ctx interface{}) (Conn, error) {
}}

// choose an eventLoop by sockAddr and register this conn to it.
remoteAddr := socket.SockaddrToTCPOrUnixAddr(sockAddr)
el := cli.el.engine.eventLoops.next(remoteAddr)
// remoteAddr := socket.SockaddrToTCPOrUnixAddr(sockAddr)
// err = cli.el.poller.Trigger(queue.HighPriority, cli.el.register, ccb)
err = el.poller.Trigger(queue.HighPriority, el.register, ccb)
if err != nil {
Expand All @@ -297,5 +299,6 @@ func (cli *Client) EnrollContext(c net.Conn, ctx interface{}) (Conn, error) {
}

<-connOpened
logging.Infof("[gnet_EnrollContext] %+v enroled to eventloop[%d]", sockAddr, el.idx)
return gc, nil
}
5 changes: 5 additions & 0 deletions reactor_default.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

"github.com/panjf2000/gnet/v2/internal/netpoll"
errorx "github.com/panjf2000/gnet/v2/pkg/errors"
"github.com/panjf2000/gnet/v2/pkg/logging"
)

func (el *eventloop) rotate() error {
Expand Down Expand Up @@ -51,6 +52,8 @@ func (el *eventloop) orbit() error {
defer runtime.UnlockOSThread()
}

logging.Infof("[gnet_eventloop_orbit] eventloop[%d] start polling...", el.idx)

err := el.poller.Polling(func(fd int, ev netpoll.IOEvent, flags netpoll.IOFlags) error {
c := el.connections.getConn(fd)
if c == nil {
Expand Down Expand Up @@ -84,6 +87,8 @@ func (el *eventloop) run() error {
defer runtime.UnlockOSThread()
}

logging.Infof("[gnet_eventloop_run] eventloop[%d] start polling...", el.idx)

err := el.poller.Polling(func(fd int, ev netpoll.IOEvent, flags netpoll.IOFlags) error {
c := el.connections.getConn(fd)
if c == nil {
Expand Down

0 comments on commit d0e1ffc

Please sign in to comment.