Skip to content

Commit 43431bd

Browse files
authored
Merge pull request #19221 from ahrtr/race-20250117
Fix race condition (also a regression of the PR 19139)
2 parents c9045d6 + 201568a commit 43431bd

File tree

2 files changed

+68
-14
lines changed

2 files changed

+68
-14
lines changed

server/embed/etcd.go

Lines changed: 39 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -79,12 +79,23 @@ type Etcd struct {
7979

8080
Server *etcdserver.EtcdServer
8181

82-
cfg Config
83-
stopc chan struct{}
84-
errc chan error
82+
cfg Config
8583

84+
// closeOnce is to ensure `stopc` is closed only once, no matter
85+
// how many times the Close() method is called.
8686
closeOnce sync.Once
87-
wg sync.WaitGroup
87+
// stopc is used to notify the sub goroutines not to send
88+
// any errors to `errc`.
89+
stopc chan struct{}
90+
// errc is used to receive error from sub goroutines (including
91+
// client handler, peer handler and metrics handler). It's closed
92+
// after all these sub goroutines exit (checked via `wg`). Writers
93+
// should avoid writing after `stopc` is closed by selecting on
94+
// reading from `stopc`.
95+
errc chan error
96+
97+
// wg is used to track the lifecycle of all sub goroutines created by `StartEtcd`.
98+
wg sync.WaitGroup
8899
}
89100

90101
type peerListener struct {
@@ -388,6 +399,24 @@ func (e *Etcd) Config() Config {
388399
// Close gracefully shuts down all servers/listeners.
389400
// Client requests will be terminated with request timeout.
390401
// After timeout, enforce remaning requests be closed immediately.
402+
//
403+
// The rough workflow to shut down etcd:
404+
// 1. close the `stopc` channel, so that all error handlers (child
405+
// goroutines) won't send back any errors anymore;
406+
// 2. stop the http and grpc servers gracefully, within request timeout;
407+
// 3. close all client and metrics listeners, so that etcd server
408+
// stops receiving any new connection;
409+
// 4. call the cancel function to close the gateway context, so that
410+
// all gateway connections are closed.
411+
// 5. stop etcd server gracefully, and ensure the main raft loop
412+
// goroutine is stopped;
413+
// 6. stop all peer listeners, so that it stops receiving peer connections
414+
// and messages (wait up to 1-second);
415+
// 7. wait for all child goroutines (i.e. client handlers, peer handlers
416+
// and metrics handlers) to exit;
417+
// 8. close the `errc` channel to release the resource. Note that it's only
418+
// safe to close the `errc` after step 7 above is done, otherwise the
419+
// child goroutines may send errors back to already closed `errc` channel.
391420
func (e *Etcd) Close() {
392421
fields := []zap.Field{
393422
zap.String("name", e.cfg.Name),
@@ -607,7 +636,9 @@ func (e *Etcd) servePeers() {
607636

608637
// start peer servers in a goroutine
609638
for _, pl := range e.Peers {
639+
e.wg.Add(1)
610640
go func(l *peerListener) {
641+
defer e.wg.Done()
611642
u := l.Addr().String()
612643
e.cfg.logger.Info(
613644
"serving peer traffic",
@@ -774,7 +805,9 @@ func (e *Etcd) serveClients() {
774805

775806
// start client servers in each goroutine
776807
for _, sctx := range e.sctxs {
808+
e.wg.Add(1)
777809
go func(s *serveCtx) {
810+
defer e.wg.Done()
778811
e.errHandler(s.serve(e.Server, &e.cfg.ClientTLSInfo, mux, e.errHandler, e.grpcGatewayDial(splitHTTP), splitHTTP, gopts...))
779812
}(sctx)
780813
}
@@ -859,7 +892,9 @@ func (e *Etcd) serveMetrics() (err error) {
859892
return err
860893
}
861894
e.metricsListeners = append(e.metricsListeners, ml)
895+
e.wg.Add(1)
862896
go func(u url.URL, ln net.Listener) {
897+
defer e.wg.Done()
863898
e.cfg.logger.Info(
864899
"serving metrics",
865900
zap.String("address", u.String()),
@@ -872,9 +907,6 @@ func (e *Etcd) serveMetrics() (err error) {
872907
}
873908

874909
func (e *Etcd) errHandler(err error) {
875-
e.wg.Add(1)
876-
defer e.wg.Done()
877-
878910
if err != nil {
879911
e.GetLogger().Error("setting up serving from embedded etcd failed.", zap.Error(err))
880912
}

server/embed/serve.go

Lines changed: 29 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -61,13 +61,23 @@ type serveCtx struct {
6161
insecure bool
6262
httpOnly bool
6363

64+
// ctx is used to control the grpc gateway. Terminate the grpc gateway
65+
// by calling `cancel` when shutting down the etcd.
6466
ctx context.Context
6567
cancel context.CancelFunc
6668

6769
userHandlers map[string]http.Handler
6870
serviceRegister func(*grpc.Server)
69-
serversC chan *servers
70-
closeOnce sync.Once
71+
72+
// serversC is used to receive the http and grpc server objects (created
73+
// in `serve`), both of which will be closed when shutting down the etcd.
74+
// Close it when `serve` returns or when etcd fails to bootstrap.
75+
serversC chan *servers
76+
// closeOnce is to ensure `serversC` is closed only once.
77+
closeOnce sync.Once
78+
79+
// wg is used to track the lifecycle of all sub goroutines created by `serve`.
80+
wg sync.WaitGroup
7181
}
7282

7383
type servers struct {
@@ -182,13 +192,17 @@ func (sctx *serveCtx) serve(
182192
server = m.Serve
183193

184194
httpl := m.Match(cmux.HTTP1())
195+
sctx.wg.Add(1)
185196
go func(srvhttp *http.Server, tlsLis net.Listener) {
197+
defer sctx.wg.Done()
186198
errHandler(srvhttp.Serve(tlsLis))
187199
}(srv, httpl)
188200

189201
if grpcEnabled {
190202
grpcl := m.Match(cmux.HTTP2())
203+
sctx.wg.Add(1)
191204
go func(gs *grpc.Server, l net.Listener) {
205+
defer sctx.wg.Done()
192206
errHandler(gs.Serve(l))
193207
}(gs, grpcl)
194208
}
@@ -237,7 +251,7 @@ func (sctx *serveCtx) serve(
237251
TLSConfig: tlscfg,
238252
ErrorLog: logger, // do not log user error
239253
}
240-
if err := configureHTTPServer(srv, s.Cfg); err != nil {
254+
if err = configureHTTPServer(srv, s.Cfg); err != nil {
241255
sctx.lg.Error("Configure https server failed", zap.Error(err))
242256
return err
243257
}
@@ -248,11 +262,13 @@ func (sctx *serveCtx) serve(
248262
} else {
249263
server = m.Serve
250264

251-
tlsl, err := transport.NewTLSListener(m.Match(cmux.Any()), tlsinfo)
252-
if err != nil {
253-
return err
265+
tlsl, tlsErr := transport.NewTLSListener(m.Match(cmux.Any()), tlsinfo)
266+
if tlsErr != nil {
267+
return tlsErr
254268
}
269+
sctx.wg.Add(1)
255270
go func(srvhttp *http.Server, tlsl net.Listener) {
271+
defer sctx.wg.Done()
256272
errHandler(srvhttp.Serve(tlsl))
257273
}(srv, tlsl)
258274
}
@@ -265,7 +281,11 @@ func (sctx *serveCtx) serve(
265281
)
266282
}
267283

268-
return server()
284+
err = server()
285+
sctx.close()
286+
// ensure all goroutines, which are created by this method, to complete before this method returns.
287+
sctx.wg.Wait()
288+
return err
269289
}
270290

271291
func configureHTTPServer(srv *http.Server, cfg config.ServerConfig) error {
@@ -334,7 +354,9 @@ func (sctx *serveCtx) registerGateway(dial func(ctx context.Context) (*grpc.Clie
334354
return nil, err
335355
}
336356
}
357+
sctx.wg.Add(1)
337358
go func() {
359+
defer sctx.wg.Done()
338360
<-ctx.Done()
339361
if cerr := conn.Close(); cerr != nil {
340362
sctx.lg.Warn(

0 commit comments

Comments
 (0)