From d9a88501ef1b62f64c5aeb24b3979f12039bd460 Mon Sep 17 00:00:00 2001
From: Richard Park <51494936+richardpark-msft@users.noreply.github.com>
Date: Tue, 8 Nov 2022 15:47:40 -0800
Subject: [PATCH] Fixing issue with a lease existing on start (#277)

Storage failures don't return a Response, and require us to do an
errors.As() and check the returned error instead. This checks for the
two codes that can come back if the blob already exists (409) or if the
blob exists _and_ it has an active storage lease (412).

Also, fixed a race condition in the LeaseReceiver that was causing
Storage/TestMultiple() to fail.

Fixes #276
---
 eph/leasedReceiver.go   | 44 ++++++++++++++++++++++++++++++-----------
 eph/scheduler.go        |  3 ++-
 storage/storage.go      |  6 ++++++
 storage/storage_test.go |  4 ++++
 4 files changed, 44 insertions(+), 13 deletions(-)

diff --git a/eph/leasedReceiver.go b/eph/leasedReceiver.go
index bafa286a..d9d91c9f 100644
--- a/eph/leasedReceiver.go
+++ b/eph/leasedReceiver.go
@@ -27,6 +27,7 @@ import (
 	"errors"
 	"fmt"
 	"math/rand"
+	"sync/atomic"
 	"time"
 
 	"github.com/devigned/tab"
@@ -38,15 +39,18 @@ type (
 	leasedReceiver struct {
 		handle    *eventhub.ListenerHandle
 		processor *EventProcessorHost
-		lease     LeaseMarker
+		lease     *atomic.Value // LeaseMarker
 		done      func()
 	}
 )
 
 func newLeasedReceiver(processor *EventProcessorHost, lease LeaseMarker) *leasedReceiver {
+	leaseValue := atomic.Value{}
+	leaseValue.Store(lease)
+
 	return &leasedReceiver{
 		processor: processor,
-		lease:     lease,
+		lease:     &leaseValue,
 	}
 }
 
@@ -54,8 +58,10 @@ func (lr *leasedReceiver) Run(ctx context.Context) error {
 	span, ctx := lr.startConsumerSpanFromContext(ctx, "eph.leasedReceiver.Run")
 	defer span.End()
 
-	partitionID := lr.lease.GetPartitionID()
-	epoch := lr.lease.GetEpoch()
+	lease := lr.getLease()
+
+	partitionID := lease.GetPartitionID()
+	epoch := lease.GetEpoch()
 	lr.dlog(ctx, "running...")
 
 	renewLeaseCtx, cancelRenewLease := context.WithCancel(context.Background())
@@ -99,7 +105,9 @@ func (lr *leasedReceiver) listenForClose() {
 		defer cancel()
 		span, ctx := lr.startConsumerSpanFromContext(ctx, "eph.leasedReceiver.listenForClose")
 		defer span.End()
-		err := lr.processor.scheduler.stopReceiver(ctx, lr.lease)
+
+		lease := lr.getLease()
+		err := lr.processor.scheduler.stopReceiver(ctx, lease)
 		if err != nil {
 			tab.For(ctx).Error(err)
 		}
@@ -120,7 +128,8 @@ func (lr *leasedReceiver) periodicallyRenewLease(ctx context.Context) {
 			err := lr.tryRenew(ctx)
 			if err != nil {
 				tab.For(ctx).Error(err)
-				_ = lr.processor.scheduler.stopReceiver(ctx, lr.lease)
+				lease := lr.getLease()
+				_ = lr.processor.scheduler.stopReceiver(ctx, lease)
 			}
 		}
 	}
@@ -130,7 +139,8 @@ func (lr *leasedReceiver) tryRenew(ctx context.Context) error {
 	span, ctx := lr.startConsumerSpanFromContext(ctx, "eph.leasedReceiver.tryRenew")
 	defer span.End()
 
-	lease, ok, err := lr.processor.leaser.RenewLease(ctx, lr.lease.GetPartitionID())
+	oldLease := lr.getLease()
+	lease, ok, err := lr.processor.leaser.RenewLease(ctx, oldLease.GetPartitionID())
 	if err != nil {
 		tab.For(ctx).Error(err)
 		return err
@@ -141,23 +151,33 @@ func (lr *leasedReceiver) tryRenew(ctx context.Context) error {
 		return err
 	}
 	lr.dlog(ctx, "lease renewed")
-	lr.lease = lease
+
+	lr.lease.Store(lease)
 	return nil
 }
 
 func (lr *leasedReceiver) dlog(ctx context.Context, msg string) {
 	name := lr.processor.name
-	partitionID := lr.lease.GetPartitionID()
-	epoch := lr.lease.GetEpoch()
+	lease := lr.getLease()
+
+	partitionID := lease.GetPartitionID()
+	epoch := lease.GetEpoch()
 	tab.For(ctx).Debug(fmt.Sprintf("eph %q, partition %q, epoch %d: "+msg, name, partitionID, epoch))
 }
 
 func (lr *leasedReceiver) startConsumerSpanFromContext(ctx context.Context, operationName string) (tab.Spanner, context.Context) {
 	span, ctx := startConsumerSpanFromContext(ctx, operationName)
+
+	lease := lr.getLease()
+
 	span.AddAttributes(
 		tab.StringAttribute("eph.id", lr.processor.name),
-		tab.StringAttribute(partitionIDTag, lr.lease.GetPartitionID()),
-		tab.Int64Attribute(epochTag, lr.lease.GetEpoch()),
+		tab.StringAttribute(partitionIDTag, lease.GetPartitionID()),
+		tab.Int64Attribute(epochTag, lease.GetEpoch()),
 	)
 	return span, ctx
 }
+
+func (lr *leasedReceiver) getLease() LeaseMarker {
+	return lr.lease.Load().(LeaseMarker)
+}
diff --git a/eph/scheduler.go b/eph/scheduler.go
index 695b9341..1a17fac4 100644
--- a/eph/scheduler.go
+++ b/eph/scheduler.go
@@ -190,7 +190,8 @@ func (s *scheduler) Stop(ctx context.Context) error {
 		if err := lr.Close(ctx); err != nil {
 			lastErr = err
 		}
-		_, _ = s.processor.leaser.ReleaseLease(ctx, lr.lease.GetPartitionID())
+
+		_, _ = s.processor.leaser.ReleaseLease(ctx, lr.getLease().GetPartitionID())
 	}
 
 	return lastErr
diff --git a/storage/storage.go b/storage/storage.go
index 93ccad20..91ca946e 100644
--- a/storage/storage.go
+++ b/storage/storage.go
@@ -630,6 +630,12 @@ func (sl *LeaserCheckpointer) createOrGetLease(ctx context.Context, partitionID
 	})
 
 	if err != nil {
+		if storageErr := azblobvendor.StorageError(nil); errors.As(err, &storageErr) &&
+			(storageErr.Response().StatusCode == http.StatusConflict || // blob exists
+				storageErr.Response().StatusCode == http.StatusPreconditionFailed) { // blob exists AND an Azure storage lease is active
+			return sl.getLease(ctx, partitionID)
+		}
+
 		return nil, err
 	}
 
diff --git a/storage/storage_test.go b/storage/storage_test.go
index e6023f09..9e70e4e6 100644
--- a/storage/storage_test.go
+++ b/storage/storage_test.go
@@ -105,6 +105,10 @@ func (ts *testSuite) TestLeaserLeaseEnsure() {
 		lease, err := leaser.EnsureLease(ctx, partitionID)
 		ts.NoError(err)
 		ts.Equal(partitionID, lease.GetPartitionID())
+
+		lease, err = leaser.EnsureLease(ctx, partitionID)
+		ts.NoError(err)
+		ts.Equal(partitionID, lease.GetPartitionID())
 	}
 }
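
Note on the 409/412 handling in createOrGetLease: because the storage failure carries no Response, the check has to unwrap the error with errors.As and inspect the status code on the storage error itself. The following standalone sketch illustrates that pattern; the storageError interface and fakeStorageError type are illustrative stand-ins for the vendored azblob StorageError, not the real SDK types.

package main

import (
	"errors"
	"fmt"
	"net/http"
)

// storageError mirrors the subset of the vendored azblob StorageError
// interface that the 409/412 check relies on (stand-in for this sketch).
type storageError interface {
	error
	Response() *http.Response
}

// fakeStorageError exists only so this sketch runs without Azure.
type fakeStorageError struct{ resp *http.Response }

func (e *fakeStorageError) Error() string            { return "storage error" }
func (e *fakeStorageError) Response() *http.Response { return e.resp }

// blobAlreadyExists reports whether err means the lease blob already exists:
// 409 Conflict (blob exists) or 412 Precondition Failed (blob exists and an
// Azure storage lease is active on it).
func blobAlreadyExists(err error) bool {
	var serr storageError
	if errors.As(err, &serr) {
		code := serr.Response().StatusCode
		return code == http.StatusConflict || code == http.StatusPreconditionFailed
	}
	return false
}

func main() {
	err := fmt.Errorf("creating lease blob: %w",
		&fakeStorageError{resp: &http.Response{StatusCode: http.StatusPreconditionFailed}})
	fmt.Println(blobAlreadyExists(err)) // true
}

Note on the race-condition fix: the lease field becomes an *atomic.Value so the renew loop can swap in a new lease while other goroutines (logging, span attributes, stopReceiver) read it through getLease(). A trimmed sketch of that shape, with LeaseMarker and memoryLease as stand-ins for the real eph types:

package main

import (
	"fmt"
	"sync/atomic"
)

// LeaseMarker and memoryLease are trimmed stand-ins for the eph types.
type LeaseMarker interface {
	GetPartitionID() string
}

type memoryLease struct{ partitionID string }

func (l *memoryLease) GetPartitionID() string { return l.partitionID }

// leasedReceiver keeps the current lease in an atomic.Value so writers and
// readers never touch a plain field concurrently.
type leasedReceiver struct {
	lease *atomic.Value // always holds a LeaseMarker
}

func newLeasedReceiver(lease LeaseMarker) *leasedReceiver {
	v := &atomic.Value{}
	v.Store(lease)
	return &leasedReceiver{lease: v}
}

func (lr *leasedReceiver) getLease() LeaseMarker { return lr.lease.Load().(LeaseMarker) }

func (lr *leasedReceiver) storeLease(lease LeaseMarker) { lr.lease.Store(lease) }

func main() {
	lr := newLeasedReceiver(&memoryLease{partitionID: "0"})
	lr.storeLease(&memoryLease{partitionID: "0"}) // e.g. after a successful renewal
	fmt.Println(lr.getLease().GetPartitionID())
}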