Skip to content

Commit e473b7e

Browse files
Fixing race condition in LeaserCheckpointer where it can fail with a ContainerAlreadyExists error #253
There's a slight race condition between checking the store exists and the container being created. We can handle it easily if we just allow ContainerAlreadyExists to be considered successful. Also, since the storage tests were failing (unrelated to my change) in race detection I also fixed that as well. Fixes #252 Fixes #225
1 parent 73b7c0f commit e473b7e

File tree

7 files changed

+78
-25
lines changed

7 files changed

+78
-25
lines changed

changelog.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
# Change Log
22

3+
## `v3.3.17`
4+
5+
- Fixing issue where the LeaserCheckpointer could fail with a "ContainerAlreadyExists" error. (#253)
6+
37
## `v3.3.16`
48

59
- Exporting a subset of AMQP message properties for the Dapr project.

eng/azure-pipelines.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ jobs:
4141
go get github.com/AlekSi/gocov-xml
4242
go get -u github.com/matm/gocov-html
4343
go get -u golang.org/x/lint/golint
44-
go get github.com/fzipp/gocyclo/cmd/gocyclo
44+
go get github.com/fzipp/gocyclo/cmd/gocyclo@v0.3.1
4545
workingDirectory: '$(sdkPath)'
4646
displayName: 'Install Dependencies'
4747
- script: |

eng/integration-tests.yml

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ steps:
2929
go get github.com/axw/gocov/gocov
3030
go get github.com/AlekSi/gocov-xml
3131
go get -u github.com/matm/gocov-html
32-
go get github.com/fzipp/gocyclo/cmd/gocyclo
32+
go get github.com/fzipp/gocyclo/cmd/gocyclo@v0.3.1
3333
go get golang.org/x/lint/golint
3434
displayName: 'Install Dependencies'
3535
- script: |
@@ -47,10 +47,10 @@ steps:
4747
gocov-html < coverage.json > coverage.html
4848
displayName: 'Run Integration Tests'
4949
env:
50-
ARM_SUBSCRIPTION_ID: $(go-live-azure-subscription-id)
51-
ARM_CLIENT_ID: $(go-live-eh-azure-client-id)
52-
ARM_CLIENT_SECRET: $(go-live-eh-azure-client-secret)
53-
ARM_TENANT_ID: $(go-live-tenant-id)
50+
ARM_SUBSCRIPTION_ID: $(azure-subscription-id)
51+
ARM_CLIENT_ID: $(aad-azure-sdk-test-client-id)
52+
ARM_CLIENT_SECRET: $(aad-azure-sdk-test-client-secret)
53+
ARM_TENANT_ID: $(aad-azure-sdk-test-tenant-id)
5454
5555
- task: PublishTestResults@2
5656
inputs:

eph/leasedReceiver.go

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ import (
3131

3232
"github.com/devigned/tab"
3333

34-
"github.com/Azure/azure-event-hubs-go/v3"
34+
eventhub "github.com/Azure/azure-event-hubs-go/v3"
3535
)
3636

3737
type (
@@ -58,11 +58,10 @@ func (lr *leasedReceiver) Run(ctx context.Context) error {
5858
epoch := lr.lease.GetEpoch()
5959
lr.dlog(ctx, "running...")
6060

61-
go func() {
62-
ctx, done := context.WithCancel(context.Background())
63-
lr.done = done
64-
lr.periodicallyRenewLease(ctx)
65-
}()
61+
renewLeaseCtx, cancelRenewLease := context.WithCancel(context.Background())
62+
lr.done = cancelRenewLease
63+
64+
go lr.periodicallyRenewLease(renewLeaseCtx)
6665

6766
opts := []eventhub.ReceiveOption{eventhub.ReceiveWithEpoch(epoch)}
6867
if lr.processor.consumerGroup != "" {

storage/storage.go

Lines changed: 27 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -30,14 +30,15 @@ import (
3030
"encoding/json"
3131
"errors"
3232
"io/ioutil"
33+
"net/http"
3334
"net/url"
3435
"sync"
3536
"time"
3637

3738
"github.com/Azure/azure-amqp-common-go/v3/uuid"
3839
"github.com/devigned/tab"
3940

40-
"github.com/Azure/azure-event-hubs-go/v3"
41+
eventhub "github.com/Azure/azure-event-hubs-go/v3"
4142
"github.com/Azure/azure-event-hubs-go/v3/eph"
4243
"github.com/Azure/azure-event-hubs-go/v3/persist"
4344

@@ -144,20 +145,22 @@ func (sl *LeaserCheckpointer) StoreExists(ctx context.Context) (bool, error) {
144145
span, ctx := startConsumerSpanFromContext(ctx, "storage.LeaserCheckpointer.StoreExists")
145146
defer span.End()
146147

147-
opts := azblob.ListContainersSegmentOptions{
148-
Prefix: sl.containerName,
149-
}
150-
res, err := sl.serviceURL.ListContainersSegment(ctx, azblob.Marker{}, opts)
151-
if err != nil {
152-
return false, err
148+
containerURL := sl.serviceURL.NewContainerURL(sl.containerName)
149+
_, err := containerURL.GetProperties(ctx, azblob.LeaseAccessConditions{})
150+
151+
if err == nil {
152+
return true, nil
153153
}
154154

155-
for _, container := range res.ContainerItems {
156-
if container.Name == sl.containerName {
157-
return true, nil
155+
var respErr azblob.ResponseError
156+
157+
if errors.As(err, &respErr) {
158+
if respErr.Response().StatusCode == http.StatusNotFound {
159+
return false, nil
158160
}
159161
}
160-
return false, nil
162+
163+
return false, err
161164
}
162165

163166
// EnsureStore creates the container if it does not exist
@@ -175,9 +178,21 @@ func (sl *LeaserCheckpointer) EnsureStore(ctx context.Context) error {
175178
if !ok {
176179
containerURL := sl.serviceURL.NewContainerURL(sl.containerName)
177180
_, err := containerURL.Create(ctx, azblob.Metadata{}, azblob.PublicAccessNone)
181+
178182
if err != nil {
179-
return err
183+
var storageErr azblob.StorageError
184+
185+
if errors.As(err, &storageErr) {
186+
// we're okay if the container has been created - we're basically racing against
187+
// other LeaserCheckpointers.
188+
if storageErr.ServiceCode() != azblob.ServiceCodeContainerAlreadyExists {
189+
return err
190+
}
191+
} else {
192+
return err
193+
}
180194
}
195+
181196
sl.containerURL = &containerURL
182197
}
183198
return nil

storage/storage_test.go

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,15 @@ package storage
2525
import (
2626
"context"
2727
"strings"
28+
"sync"
2829
"time"
2930

3031
"github.com/Azure/azure-amqp-common-go/v3/aad"
3132
"github.com/Azure/azure-storage-blob-go/azblob"
3233
"github.com/stretchr/testify/assert"
3334

3435
"github.com/Azure/azure-event-hubs-go/v3/eph"
36+
"github.com/Azure/azure-event-hubs-go/v3/internal/test"
3537
)
3638

3739
const (
@@ -64,6 +66,35 @@ func (ts *testSuite) TestLeaserStoreCreation() {
6466
ts.True(exists)
6567
}
6668

69+
func (ts *testSuite) TestLeaserStoreCreationConcurrent() {
70+
wg := sync.WaitGroup{}
71+
72+
containerName := test.RandomString("concurrent-container", 4)
73+
74+
// do a simple test that ensures we don't die just because we raced with
75+
// other leasers to create the storage container.
76+
for i := 0; i < 100; i++ {
77+
wg.Add(1)
78+
79+
go func(i int) {
80+
defer wg.Done()
81+
82+
leaser, _ := ts.newLeaserWithContainerName(containerName)
83+
84+
err := leaser.EnsureStore(context.Background())
85+
ts.Require().NoError(err)
86+
}(i)
87+
}
88+
89+
wg.Wait()
90+
91+
leaser, del := ts.newLeaserWithContainerName(containerName)
92+
defer del()
93+
exists, err := leaser.StoreExists(context.Background())
94+
ts.NoError(err)
95+
ts.True(exists)
96+
}
97+
6798
func (ts *testSuite) TestLeaserLeaseEnsure() {
6899
leaser, del := ts.leaserWithEPH()
69100
defer del()
@@ -189,6 +220,10 @@ func (ts *testSuite) leaserWithEPH() (*LeaserCheckpointer, func()) {
189220

190221
func (ts *testSuite) newLeaser() (*LeaserCheckpointer, func()) {
191222
containerName := strings.ToLower(ts.RandomName("stortest", 4))
223+
return ts.newLeaserWithContainerName(containerName)
224+
}
225+
226+
func (ts *testSuite) newLeaserWithContainerName(containerName string) (*LeaserCheckpointer, func()) {
192227
cred, err := NewAADSASCredential(ts.SubscriptionID, ts.ResourceGroupName, ts.AccountName, containerName, AADSASCredentialWithEnvironmentVars())
193228
ts.Require().NoError(err)
194229
leaser, err := NewStorageLeaserCheckpointer(cred, ts.AccountName, containerName, ts.Env)

version.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,5 +2,5 @@ package eventhub
22

33
const (
44
// Version is the semantic version number
5-
Version = "3.3.13"
5+
Version = "3.3.17"
66
)

0 commit comments

Comments
 (0)