Skip to content

Commit ab34cdb

Browse files
authored
Merge pull request #19166 from joshuazh-x/fix-embed-close-deadlock-3.4
[3.4] Avoid deadlock in etcd.Close when stopping during bootstrapping
2 parents e34100c + a3bf49b commit ab34cdb

File tree

4 files changed

+80
-4
lines changed

4 files changed

+80
-4
lines changed

embed/etcd.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
109109
if !serving {
110110
// errored before starting gRPC server for serveCtx.serversC
111111
for _, sctx := range e.sctxs {
112-
close(sctx.serversC)
112+
sctx.close()
113113
}
114114
}
115115
e.Close()

embed/serve.go

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,14 @@ package embed
1616

1717
import (
1818
"context"
19+
"errors"
1920
"fmt"
2021
"io/ioutil"
2122
defaultLog "log"
2223
"net"
2324
"net/http"
2425
"strings"
26+
"sync"
2527

2628
"go.etcd.io/etcd/etcdserver"
2729
"go.etcd.io/etcd/etcdserver/api/v3client"
@@ -63,6 +65,7 @@ type serveCtx struct {
6365
userHandlers map[string]http.Handler
6466
serviceRegister func(*grpc.Server)
6567
serversC chan *servers
68+
closeOnce sync.Once
6669
}
6770

6871
type servers struct {
@@ -94,7 +97,15 @@ func (sctx *serveCtx) serve(
9497
splitHttp bool,
9598
gopts ...grpc.ServerOption) (err error) {
9699
logger := defaultLog.New(ioutil.Discard, "etcdhttp", 0)
97-
<-s.ReadyNotify()
100+
101+
// Make sure serversC is closed even if we prematurely exit the function.
102+
defer sctx.close()
103+
104+
select {
105+
case <-s.StoppingNotify():
106+
return errors.New("server is stopping")
107+
case <-s.ReadyNotify():
108+
}
98109

99110
if sctx.lg != nil {
100111
sctx.lg.Info("ready to serve client requests")
@@ -113,8 +124,6 @@ func (sctx *serveCtx) serve(
113124
servElection := v3election.NewElectionServer(v3c)
114125
servLock := v3lock.NewLockServer(v3c)
115126

116-
// Make sure serversC is closed even if we prematurely exit the function.
117-
defer close(sctx.serversC)
118127
var gwmux *gw.ServeMux
119128
if s.Cfg.EnableGRPCGateway {
120129
// GRPC gateway connects to grpc server via connection provided by grpc dial.
@@ -549,3 +558,9 @@ func (sctx *serveCtx) registerTrace() {
549558
evf := func(w http.ResponseWriter, r *http.Request) { trace.RenderEvents(w, r, true) }
550559
sctx.registerUserHandler("/debug/events", http.HandlerFunc(evf))
551560
}
561+
562+
func (sctx *serveCtx) close() {
563+
sctx.closeOnce.Do(func() {
564+
close(sctx.serversC)
565+
})
566+
}

etcdserver/server.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1670,6 +1670,10 @@ func (s *EtcdServer) stopWithDelay(d time.Duration, err error) {
16701670
// when the server is stopped.
16711671
func (s *EtcdServer) StopNotify() <-chan struct{} { return s.done }
16721672

1673+
// StoppingNotify returns a channel that receives an empty struct
1674+
// when the server is being stopped.
1675+
func (s *EtcdServer) StoppingNotify() <-chan struct{} { return s.stopping }
1676+
16731677
func (s *EtcdServer) SelfStats() []byte { return s.stats.JSON() }
16741678

16751679
func (s *EtcdServer) LeaderStats() []byte {
@@ -2163,6 +2167,7 @@ func (s *EtcdServer) publish(timeout time.Duration) {
21632167
Val: string(b),
21642168
}
21652169

2170+
// gofail: var beforePublishing struct{}
21662171
for {
21672172
ctx, cancel := context.WithTimeout(s.ctx, timeout)
21682173
_, err := s.Do(ctx, req)

integration/embed_test.go

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,11 @@ import (
2929
"testing"
3030
"time"
3131

32+
"github.com/stretchr/testify/require"
33+
3234
"go.etcd.io/etcd/clientv3"
3335
"go.etcd.io/etcd/embed"
36+
gofail "go.etcd.io/gofail/runtime"
3437
)
3538

3639
func TestEmbedEtcd(t *testing.T) {
@@ -196,3 +199,56 @@ func setupEmbedCfg(cfg *embed.Config, curls []url.URL, purls []url.URL) {
196199
}
197200
cfg.InitialCluster = cfg.InitialCluster[1:]
198201
}
202+
203+
func TestEmbedEtcdStopDuringBootstrapping(t *testing.T) {
204+
if len(gofail.List()) == 0 {
205+
t.Skip("please run 'make gofail-enable' before running the test")
206+
}
207+
208+
fpName := "beforePublishing"
209+
require.NoError(t, gofail.Enable(fpName, `sleep("2s")`))
210+
t.Cleanup(func() {
211+
terr := gofail.Disable(fpName)
212+
if terr != nil && terr != gofail.ErrDisabled {
213+
t.Fatalf("failed to disable %s: %v", fpName, terr)
214+
}
215+
})
216+
217+
done := make(chan struct{})
218+
go func() {
219+
defer close(done)
220+
221+
cfg := embed.NewConfig()
222+
urls := newEmbedURLs(false, 2)
223+
setupEmbedCfg(cfg, []url.URL{urls[0]}, []url.URL{urls[1]})
224+
cfg.Dir = filepath.Join(t.TempDir(), "embed-etcd")
225+
226+
e, err := embed.StartEtcd(cfg)
227+
if err != nil {
228+
t.Errorf("Failed to start etcd, got error %v", err)
229+
}
230+
defer e.Close()
231+
232+
go func() {
233+
time.Sleep(time.Second)
234+
e.Server.Stop()
235+
t.Log("Stopped server during bootstrapping")
236+
}()
237+
238+
select {
239+
case <-e.Server.ReadyNotify():
240+
t.Log("Server is ready!")
241+
case <-e.Server.StopNotify():
242+
t.Log("Server is stopped")
243+
case <-time.After(20 * time.Second):
244+
e.Server.Stop() // trigger a shutdown
245+
t.Error("Server took too long to start!")
246+
}
247+
}()
248+
249+
select {
250+
case <-done:
251+
case <-time.After(10 * time.Second):
252+
t.Error("timeout in bootstrapping etcd")
253+
}
254+
}

0 commit comments

Comments
 (0)