From ed882d96eef5c91e054700421baf2376a6f261e8 Mon Sep 17 00:00:00 2001 From: BarryTong65 Date: Tue, 14 May 2024 15:46:00 +0800 Subject: [PATCH] fix: migrate nil bucket --- modular/manager/bucket_migrate_scheduler.go | 40 ++++++++++++++++++++- 1 file changed, 39 insertions(+), 1 deletion(-) diff --git a/modular/manager/bucket_migrate_scheduler.go b/modular/manager/bucket_migrate_scheduler.go index f7338f0e3..b16158822 100644 --- a/modular/manager/bucket_migrate_scheduler.go +++ b/modular/manager/bucket_migrate_scheduler.go @@ -204,6 +204,13 @@ func (plan *BucketMigrateExecutePlan) sendCompleteMigrateBucketTx(migrateExecute if err != nil { return err } + if bucket == nil { + log.Debugw("send complete migrate bucket has been deleted", "bucket_id", plan.bucketID) + if err = UpdateBucketMigrationProgress(plan.manager.baseApp, plan.bucketID, storetypes.BucketMigrationState_BUCKET_MIGRATION_STATE_MIGRATION_FINISHED); err != nil { + return err + } + return nil + } var gvgMappings []*storagetypes.GVGMapping for _, migrateGVGUnit := range plan.gvgUnitMap { aggBlsSig, getBlsError := plan.getBlsAggregateSigForBucketMigration(context.Background(), migrateGVGUnit) @@ -239,6 +246,13 @@ func (plan *BucketMigrateExecutePlan) rejectBucketMigration() error { if err != nil { return err } + if bucket == nil { + log.Debugw("reject bucket migration has been deleted", "bucket_id", plan.bucketID) + if err = UpdateBucketMigrationProgress(plan.manager.baseApp, plan.bucketID, storetypes.BucketMigrationState_BUCKET_MIGRATION_STATE_MIGRATION_FINISHED); err != nil { + return err + } + return nil + } rejectMigrateBucket := &storagetypes.MsgRejectMigrateBucket{Operator: plan.manager.baseApp.OperatorAddress(), BucketName: bucket.BucketInfo.GetBucketName()} txHash, txErr := plan.manager.baseApp.GfSpClient().RejectMigrateBucket(ctx, rejectMigrateBucket) @@ -591,6 +605,23 @@ func (s *BucketMigrateScheduler) doneMigrateBucket(bucketID uint64) error { return err } +func (s *BucketMigrateScheduler) deleteMigrateBucket(bucketID uint64) error { + executePlan, err := s.getExecutePlanByBucketID(bucketID) + // 1) Received the CompleteEvents event for the first time. + // 2) Subsequently received the CompleteEvents event. + if err != nil { + log.Errorw("bucket migrate schedule received EventCompleteMigrationBucket, the event may already finished", "bucket_id", bucketID) + return err + } + + s.deleteExecutePlanByBucketID(bucketID) + executePlan.stopSPSchedule() + err = s.manager.baseApp.GfSpDB().DeleteMigrateGVGUnitsByBucketID(bucketID) + log.Infow("succeed to delete migrate bucket", "bucket_id", bucketID, "error", err) + + return err +} + func (s *BucketMigrateScheduler) cancelMigrateBucket(bucketID uint64, reject bool) error { var ( executePlan *BucketMigrateExecutePlan @@ -697,7 +728,14 @@ func (s *BucketMigrateScheduler) confirmCompleteTxEvents(ctx context.Context, ev return } if bucket == nil { - log.Errorw("can not find bucket info", "bucket_id", bucketID) + if err = s.deleteMigrateBucket(bucketID); err != nil { + log.Errorw("failed to done migrate bucket", "EventMigrationBucket", event, "error", err) + return + } + if err = UpdateBucketMigrationProgress(s.manager.baseApp, bucketID, storetypes.BucketMigrationState_BUCKET_MIGRATION_STATE_MIGRATION_FINISHED); err != nil { + return + } + log.CtxInfow(ctx, "succeed to remove deleted bucket migrate event", "EventMigrationBucket", event) return } if bucket.BucketInfo.GetBucketStatus() == storagetypes.BUCKET_STATUS_CREATED {