Skip to content

Refactoring the error handlers concurrent control implementation #19257

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jan 23, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 24 additions & 16 deletions server/embed/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,8 @@
// reading from `stopc`.
errc chan error

// wg is used to track the lifecycle of all sub goroutines created by `StartEtcd`.
// wg is used to track the lifecycle of all sub goroutines which
// need to send error back to the `errc`.
wg sync.WaitGroup
}

Expand Down Expand Up @@ -636,16 +637,15 @@

// start peer servers in a goroutine
for _, pl := range e.Peers {
e.wg.Add(1)
go func(l *peerListener) {
defer e.wg.Done()
l := pl
e.startHandler(func() error {
u := l.Addr().String()
e.cfg.logger.Info(
"serving peer traffic",
zap.String("address", u),
)
e.errHandler(l.serve())
}(pl)
return l.serve()
})
}
}

Expand Down Expand Up @@ -805,11 +805,10 @@

// start client servers in each goroutine
for _, sctx := range e.sctxs {
e.wg.Add(1)
go func(s *serveCtx) {
defer e.wg.Done()
e.errHandler(s.serve(e.Server, &e.cfg.ClientTLSInfo, mux, e.errHandler, e.grpcGatewayDial(splitHTTP), splitHTTP, gopts...))
}(sctx)
s := sctx
e.startHandler(func() error {
return s.serve(e.Server, &e.cfg.ClientTLSInfo, mux, e.errHandler, e.grpcGatewayDial(splitHTTP), splitHTTP, gopts...)
})
}
}

Expand Down Expand Up @@ -887,25 +886,34 @@
etcdhttp.HandleHealth(e.cfg.logger, metricsMux, e.Server)

for _, murl := range e.cfg.ListenMetricsUrls {
u := murl

Check warning on line 889 in server/embed/etcd.go

View check run for this annotation

Codecov / codecov/patch

server/embed/etcd.go#L889

Added line #L889 was not covered by tests
ml, err := e.createMetricsListener(murl)
if err != nil {
return err
}
e.metricsListeners = append(e.metricsListeners, ml)
e.wg.Add(1)
go func(u url.URL, ln net.Listener) {
defer e.wg.Done()

e.startHandler(func() error {

Check warning on line 896 in server/embed/etcd.go

View check run for this annotation

Codecov / codecov/patch

server/embed/etcd.go#L896

Added line #L896 was not covered by tests
e.cfg.logger.Info(
"serving metrics",
zap.String("address", u.String()),
)
e.errHandler(http.Serve(ln, metricsMux))
}(murl, ml)
return http.Serve(ml, metricsMux)
})

Check warning on line 902 in server/embed/etcd.go

View check run for this annotation

Codecov / codecov/patch

server/embed/etcd.go#L901-L902

Added lines #L901 - L902 were not covered by tests
}
}
return nil
}

func (e *Etcd) startHandler(handler func() error) {
// start each handler in a separate goroutine
e.wg.Add(1)
go func() {
defer e.wg.Done()
e.errHandler(handler())
}()
}

func (e *Etcd) errHandler(err error) {
if err != nil {
e.GetLogger().Error("setting up serving from embedded etcd failed.", zap.Error(err))
Expand Down
44 changes: 24 additions & 20 deletions server/embed/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,18 @@ type serveCtx struct {
wg sync.WaitGroup
}

func (sctx *serveCtx) startHandler(errHandler func(error), handler func() error) {
// start each handler in a separate goroutine
sctx.wg.Add(1)
go func() {
defer sctx.wg.Done()
err := handler()
if errHandler != nil {
errHandler(err)
}
}()
}

type servers struct {
secure bool
grpc *grpc.Server
Expand Down Expand Up @@ -192,19 +204,15 @@ func (sctx *serveCtx) serve(
server = m.Serve

httpl := m.Match(cmux.HTTP1())
sctx.wg.Add(1)
go func(srvhttp *http.Server, tlsLis net.Listener) {
defer sctx.wg.Done()
errHandler(srvhttp.Serve(tlsLis))
}(srv, httpl)
sctx.startHandler(errHandler, func() error {
return srv.Serve(httpl)
})

if grpcEnabled {
grpcl := m.Match(cmux.HTTP2())
sctx.wg.Add(1)
go func(gs *grpc.Server, l net.Listener) {
defer sctx.wg.Done()
errHandler(gs.Serve(l))
}(gs, grpcl)
sctx.startHandler(errHandler, func() error {
return gs.Serve(grpcl)
})
}
}

Expand Down Expand Up @@ -266,11 +274,9 @@ func (sctx *serveCtx) serve(
if tlsErr != nil {
return tlsErr
}
sctx.wg.Add(1)
go func(srvhttp *http.Server, tlsl net.Listener) {
defer sctx.wg.Done()
errHandler(srvhttp.Serve(tlsl))
}(srv, tlsl)
sctx.startHandler(errHandler, func() error {
return srv.Serve(tlsl)
})
}

sctx.serversC <- &servers{secure: true, grpc: gs, http: srv}
Expand All @@ -283,7 +289,6 @@ func (sctx *serveCtx) serve(

err = server()
sctx.close()
// ensure all goroutines, which are created by this method, to complete before this method returns.
sctx.wg.Wait()
return err
}
Expand Down Expand Up @@ -354,9 +359,7 @@ func (sctx *serveCtx) registerGateway(dial func(ctx context.Context) (*grpc.Clie
return nil, err
}
}
sctx.wg.Add(1)
go func() {
defer sctx.wg.Done()
sctx.startHandler(nil, func() error {
<-ctx.Done()
if cerr := conn.Close(); cerr != nil {
sctx.lg.Warn(
Expand All @@ -365,7 +368,8 @@ func (sctx *serveCtx) registerGateway(dial func(ctx context.Context) (*grpc.Clie
zap.Error(cerr),
)
}
}()
return nil
})

return gwmux, nil
}
Expand Down
Loading