From 2768bfba84da83d9a0c4bd26e25fc789f88e52c6 Mon Sep 17 00:00:00 2001 From: Matt Braymer-Hayes Date: Thu, 11 Nov 2021 16:32:25 -0500 Subject: [PATCH] Fix data race-related test failures (PR #171) --- .github/workflows/test.yml | 9 +++++---- .gitignore | 5 ++++- client.go | 7 ++++--- context_test.go | 16 +++++++++++++--- registry/registry.go | 30 +++++++++++++++++++----------- registry/registry_test.go | 9 +++------ 6 files changed, 48 insertions(+), 28 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index f62b147..7f2926c 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -7,12 +7,13 @@ jobs: go-version: [1.x.x] platform: [ubuntu-latest] runs-on: ${{ matrix.platform }} + name: Go ${{ matrix.go-version }} (${{ matrix.platform }}) steps: + - name: Checkout code + uses: actions/checkout@v2 - name: Install Go - uses: actions/setup-go@v1 + uses: actions/setup-go@v2 with: go-version: ${{ matrix.go-version }} - - name: Checkout code - uses: actions/checkout@v1 - name: Test - run: go test --timeout 360s -v ./... + run: go test -race -timeout 360s -v ./... diff --git a/.gitignore b/.gitignore index 87be583..5a5a3e5 100644 --- a/.gitignore +++ b/.gitignore @@ -24,8 +24,11 @@ _testmain.go *.test *.prof -# ignore vendor - they are only needed for tests. +# ignore vendor - they are only needed for tests. vendor/ # ignore bazel things that are local only bazel-* + +# Visual Studio Code +.vscode/* diff --git a/client.go b/client.go index 3ceef88..3ed5d1e 100644 --- a/client.go +++ b/client.go @@ -8,6 +8,7 @@ import ( "strconv" "strings" "sync" + "sync/atomic" "time" "github.com/lytics/grid/v3/codec" @@ -426,7 +427,7 @@ func (c *Client) broadcast(ctx context.Context, cancel context.CancelFunc, g *Gr receivers := g.Members() var broadcastErr error - successes := 0 + var successes int32 mu := new(sync.Mutex) wg := new(sync.WaitGroup) for _, rec := range receivers { @@ -442,7 +443,7 @@ func (c *Client) broadcast(ctx context.Context, cancel context.CancelFunc, g *Gr // if this request was successful and the group is configured to Fastest, // then cancel the context so other requests are terminated cancel() - successes++ + atomic.AddInt32(&successes, 1) } mu.Lock() @@ -457,7 +458,7 @@ func (c *Client) broadcast(ctx context.Context, cancel context.CancelFunc, g *Gr // if the group is configured to Fastest, and we had at least one successful // request, then don't return an error - if g.fastest && broadcastErr != nil && successes > 0 { + if g.fastest && broadcastErr != nil && atomic.LoadInt32(&successes) > 0 { broadcastErr = nil } return res, broadcastErr diff --git a/context_test.go b/context_test.go index 7a1c2b2..42d5179 100644 --- a/context_test.go +++ b/context_test.go @@ -3,6 +3,7 @@ package grid import ( "context" "net" + "sync" "testing" "time" @@ -10,15 +11,24 @@ import ( ) type contextActor struct { + mu sync.RWMutex started chan bool ctx context.Context } func (a *contextActor) Act(c context.Context) { + a.mu.Lock() a.ctx = c + a.mu.Unlock() a.started <- true } +func (a *contextActor) Context() context.Context { + a.mu.RLock() + defer a.mu.RUnlock() + return a.ctx +} + func TestContextError(t *testing.T) { // Create a context that is not valid to use // with the grid context methods. The context @@ -88,7 +98,7 @@ func TestValidContext(t *testing.T) { case <-a.started: server.Stop() - id, err := ContextActorID(a.ctx) + id, err := ContextActorID(a.Context()) if err != nil { t.Fatal(err) } @@ -96,7 +106,7 @@ func TestValidContext(t *testing.T) { t.Fatal("expected non-zero value") } - name, err := ContextActorName(a.ctx) + name, err := ContextActorName(a.Context()) if err != nil { t.Fatal(err) } @@ -104,7 +114,7 @@ func TestValidContext(t *testing.T) { t.Fatal("expected non-zero value") } - namespace, err := ContextActorNamespace(a.ctx) + namespace, err := ContextActorNamespace(a.Context()) if err != nil { t.Fatal(err) } diff --git a/registry/registry.go b/registry/registry.go index f9c26a2..e248544 100644 --- a/registry/registry.go +++ b/registry/registry.go @@ -92,7 +92,7 @@ func (we *WatchEvent) String() string { // Registry for discovery. type Registry struct { - mu sync.Mutex + mu sync.RWMutex done chan bool exited chan bool kv etcdv3.KV @@ -202,6 +202,7 @@ func (rr *Registry) Start(addr net.Addr) error { // Ensure that we're the owner of the address by taking an etcd lock tctx, cancel := context.WithTimeout(context.TODO(), rr.LeaseDuration*2) // retry until Lease is up... + defer cancel() err = rr.waitForAddress(tctx, address) if err != nil { return err @@ -263,17 +264,24 @@ func (rr *Registry) Start(addr net.Addr) error { // Address of this registry in the format of : func (rr *Registry) Address() string { + rr.mu.RLock() + defer rr.mu.RUnlock() return rr.address } // Registry name, which is a human readable all ASCII // transformation of the network address. func (rr *Registry) Registry() string { + rr.mu.RLock() + defer rr.mu.RUnlock() return rr.name } // Stop Registry. func (rr *Registry) Stop() error { + rr.mu.Lock() + defer rr.mu.Unlock() + if rr.leaseID < 0 { return nil } @@ -296,8 +304,8 @@ func (rr *Registry) Stop() error { // Watch a prefix in the registry. func (rr *Registry) Watch(c context.Context, prefix string) ([]*Registration, <-chan *WatchEvent, error) { - rr.mu.Lock() - defer rr.mu.Unlock() + rr.mu.RLock() + defer rr.mu.RUnlock() getRes, err := rr.kv.Get(c, prefix, etcdv3.WithPrefix()) if err != nil { @@ -390,8 +398,8 @@ func (rr *Registry) Watch(c context.Context, prefix string) ([]*Registration, <- // FindRegistrations associated with the prefix. func (rr *Registry) FindRegistrations(c context.Context, prefix string) ([]*Registration, error) { - rr.mu.Lock() - defer rr.mu.Unlock() + rr.mu.RLock() + defer rr.mu.RUnlock() getRes, err := rr.kv.Get(c, prefix, etcdv3.WithPrefix()) if err != nil { @@ -411,8 +419,8 @@ func (rr *Registry) FindRegistrations(c context.Context, prefix string) ([]*Regi // FindRegistration associated with the given key. func (rr *Registry) FindRegistration(c context.Context, key string) (*Registration, error) { - rr.mu.Lock() - defer rr.mu.Unlock() + rr.mu.RLock() + defer rr.mu.RUnlock() getRes, err := rr.kv.Get(c, key, etcdv3.WithLimit(1)) if err != nil { @@ -434,8 +442,8 @@ func (rr *Registry) FindRegistration(c context.Context, key string) (*Registrati // Hence, registration can be used for mutual-exclusion. func (rr *Registry) Register(c context.Context, key string, annotations ...string) error { sort.Strings(annotations) - rr.mu.Lock() - defer rr.mu.Unlock() + rr.mu.RLock() + defer rr.mu.RUnlock() if rr.leaseID < 0 { return ErrNotStarted @@ -475,8 +483,8 @@ func (rr *Registry) Register(c context.Context, key string, annotations ...strin // Deregister under the given key. func (rr *Registry) Deregister(c context.Context, key string) error { - rr.mu.Lock() - defer rr.mu.Unlock() + rr.mu.RLock() + defer rr.mu.RUnlock() if rr.leaseID < 0 { return ErrNotStarted diff --git a/registry/registry_test.go b/registry/registry_test.go index ed8a110..ff8f726 100644 --- a/registry/registry_test.go +++ b/registry/registry_test.go @@ -156,21 +156,18 @@ func TestWaitForLeaseThatDoesExpires(t *testing.T) { } r1.LeaseDuration = 10 * time.Second - _, err = kv.Put(context.Background(), registryLockKey(address), "") - if err != nil { + if _, err := kv.Put(context.Background(), registryLockKey(address), ""); err != nil { t.Fatal(err) } time.AfterFunc(5*time.Second, func() { // cleanup lock so that the registry can startup. - _, err = kv.Delete(context.Background(), registryLockKey(address)) - if err != nil { + if _, err := kv.Delete(context.Background(), registryLockKey(address)); err != nil { t.Fatal(err) } }) st := time.Now() - err = r1.Start(addr) - if err != nil { + if err := r1.Start(addr); err != nil { t.Fatalf("unexpected error: err: %v", err) } // ensure that we waited 10 seconds...