From b38f98f7cbf5336e5d1a91c6bf307880e7211113 Mon Sep 17 00:00:00 2001 From: Barry <122767193+BarryTong65@users.noreply.github.com> Date: Fri, 17 May 2024 11:02:19 +0800 Subject: [PATCH] fix: v1.7 (#1401) * fix: bucket can be nil * fix: migrate bucket charged quota and list events --------- Co-authored-by: Alexgao001 --- .../modules/bucket/bucket_handle.go | 79 +++++++++++++++++++ modular/manager/bucket_migrate_scheduler.go | 43 +++++++++- modular/metadata/metadata_bucket_service.go | 12 ++- .../metadata/metadata_bucket_service_test.go | 18 ++++- modular/metadata/metadata_sp_exit_service.go | 4 +- 5 files changed, 151 insertions(+), 5 deletions(-) diff --git a/modular/blocksyncer/modules/bucket/bucket_handle.go b/modular/blocksyncer/modules/bucket/bucket_handle.go index 7f9a88b20..1ffc72ddb 100644 --- a/modular/blocksyncer/modules/bucket/bucket_handle.go +++ b/modular/blocksyncer/modules/bucket/bucket_handle.go @@ -22,6 +22,9 @@ var ( EventDeleteBucket = proto.MessageName(&storagetypes.EventDeleteBucket{}) EventUpdateBucketInfo = proto.MessageName(&storagetypes.EventUpdateBucketInfo{}) EventDiscontinueBucket = proto.MessageName(&storagetypes.EventDiscontinueBucket{}) + EventMigrationBucket = proto.MessageName(&storagetypes.EventMigrationBucket{}) + EventCancelMigrationBucket = proto.MessageName(&storagetypes.EventCancelMigrationBucket{}) + EventRejectMigrateBucket = proto.MessageName(&storagetypes.EventRejectMigrateBucket{}) EventCompleteMigrationBucket = proto.MessageName(&storagetypes.EventCompleteMigrationBucket{}) EventToggleSPAsDelegatedAgent = proto.MessageName(&storagetypes.EventToggleSPAsDelegatedAgent{}) ) @@ -31,6 +34,9 @@ var BucketEvents = map[string]bool{ EventDeleteBucket: true, EventUpdateBucketInfo: true, EventDiscontinueBucket: true, + EventMigrationBucket: true, + EventCancelMigrationBucket: true, + EventRejectMigrateBucket: true, EventCompleteMigrationBucket: true, EventToggleSPAsDelegatedAgent: true, } @@ -100,6 +106,27 @@ func (m *Module) ExtractEventStatements(ctx context.Context, block *tmctypes.Res return nil, errors.New("discontinue bucket event assert error") } return m.handleDiscontinueBucket(ctx, block, txHash, discontinueBucket), nil + case EventMigrationBucket: + migrationBucket, ok := typedEvent.(*storagetypes.EventMigrationBucket) + if !ok { + log.Errorw("type assert error", "type", "EventMigrationBucket", "event", typedEvent) + return nil, errors.New("migration bucket event assert error") + } + return m.handleEventMigrationBucket(ctx, block, txHash, migrationBucket), nil + case EventCancelMigrationBucket: + cancelMigrationBucket, ok := typedEvent.(*storagetypes.EventCancelMigrationBucket) + if !ok { + log.Errorw("type assert error", "type", "EventCancelMigrationBucket", "event", typedEvent) + return nil, errors.New("cancel migration bucket event assert error") + } + return m.handleEventCancelMigrationBucket(ctx, block, txHash, cancelMigrationBucket), nil + case EventRejectMigrateBucket: + rejectMigrateBucket, ok := typedEvent.(*storagetypes.EventRejectMigrateBucket) + if !ok { + log.Errorw("type assert error", "type", "EventRejectMigrateBucket", "event", typedEvent) + return nil, errors.New("reject migration bucket event assert error") + } + return m.handleEventRejectMigrateBucket(ctx, block, txHash, rejectMigrateBucket), nil case EventCompleteMigrationBucket: completeMigrationBucket, ok := typedEvent.(*storagetypes.EventCompleteMigrationBucket) if !ok { @@ -208,11 +235,63 @@ func (m *Module) handleUpdateBucketInfo(ctx context.Context, block *tmctypes.Res } } +func (m *Module) handleEventMigrationBucket(ctx context.Context, block *tmctypes.ResultBlock, txHash common.Hash, migrationBucket *storagetypes.EventMigrationBucket) map[string][]interface{} { + bucket := &models.Bucket{ + BucketID: common.BigToHash(migrationBucket.BucketId.BigInt()), + BucketName: migrationBucket.BucketName, + Status: storagetypes.BUCKET_STATUS_MIGRATING.String(), + + UpdateAt: block.Block.Height, + UpdateTxHash: txHash, + UpdateTime: block.Block.Time.UTC().Unix(), + } + + k, v := m.db.UpdateBucketToSQL(ctx, bucket) + return map[string][]interface{}{ + k: v, + } +} + +func (m *Module) handleEventCancelMigrationBucket(ctx context.Context, block *tmctypes.ResultBlock, txHash common.Hash, cancelMigrationBucket *storagetypes.EventCancelMigrationBucket) map[string][]interface{} { + bucket := &models.Bucket{ + BucketID: common.BigToHash(cancelMigrationBucket.BucketId.BigInt()), + BucketName: cancelMigrationBucket.BucketName, + Status: storagetypes.BUCKET_STATUS_CREATED.String(), + + UpdateAt: block.Block.Height, + UpdateTxHash: txHash, + UpdateTime: block.Block.Time.UTC().Unix(), + } + + k, v := m.db.UpdateBucketToSQL(ctx, bucket) + return map[string][]interface{}{ + k: v, + } +} + +func (m *Module) handleEventRejectMigrateBucket(ctx context.Context, block *tmctypes.ResultBlock, txHash common.Hash, rejectMigrateBucket *storagetypes.EventRejectMigrateBucket) map[string][]interface{} { + bucket := &models.Bucket{ + BucketID: common.BigToHash(rejectMigrateBucket.BucketId.BigInt()), + BucketName: rejectMigrateBucket.BucketName, + Status: storagetypes.BUCKET_STATUS_CREATED.String(), + + UpdateAt: block.Block.Height, + UpdateTxHash: txHash, + UpdateTime: block.Block.Time.UTC().Unix(), + } + + k, v := m.db.UpdateBucketToSQL(ctx, bucket) + return map[string][]interface{}{ + k: v, + } +} + func (m *Module) handleCompleteMigrationBucket(ctx context.Context, block *tmctypes.ResultBlock, txHash common.Hash, completeMigrationBucket *storagetypes.EventCompleteMigrationBucket) map[string][]interface{} { bucket := &models.Bucket{ BucketID: common.BigToHash(completeMigrationBucket.BucketId.BigInt()), BucketName: completeMigrationBucket.BucketName, GlobalVirtualGroupFamilyId: completeMigrationBucket.GlobalVirtualGroupFamilyId, + Status: storagetypes.BUCKET_STATUS_CREATED.String(), UpdateAt: block.Block.Height, UpdateTxHash: txHash, diff --git a/modular/manager/bucket_migrate_scheduler.go b/modular/manager/bucket_migrate_scheduler.go index 1158fe54d..b16158822 100644 --- a/modular/manager/bucket_migrate_scheduler.go +++ b/modular/manager/bucket_migrate_scheduler.go @@ -204,6 +204,13 @@ func (plan *BucketMigrateExecutePlan) sendCompleteMigrateBucketTx(migrateExecute if err != nil { return err } + if bucket == nil { + log.Debugw("send complete migrate bucket has been deleted", "bucket_id", plan.bucketID) + if err = UpdateBucketMigrationProgress(plan.manager.baseApp, plan.bucketID, storetypes.BucketMigrationState_BUCKET_MIGRATION_STATE_MIGRATION_FINISHED); err != nil { + return err + } + return nil + } var gvgMappings []*storagetypes.GVGMapping for _, migrateGVGUnit := range plan.gvgUnitMap { aggBlsSig, getBlsError := plan.getBlsAggregateSigForBucketMigration(context.Background(), migrateGVGUnit) @@ -239,6 +246,13 @@ func (plan *BucketMigrateExecutePlan) rejectBucketMigration() error { if err != nil { return err } + if bucket == nil { + log.Debugw("reject bucket migration has been deleted", "bucket_id", plan.bucketID) + if err = UpdateBucketMigrationProgress(plan.manager.baseApp, plan.bucketID, storetypes.BucketMigrationState_BUCKET_MIGRATION_STATE_MIGRATION_FINISHED); err != nil { + return err + } + return nil + } rejectMigrateBucket := &storagetypes.MsgRejectMigrateBucket{Operator: plan.manager.baseApp.OperatorAddress(), BucketName: bucket.BucketInfo.GetBucketName()} txHash, txErr := plan.manager.baseApp.GfSpClient().RejectMigrateBucket(ctx, rejectMigrateBucket) @@ -591,6 +605,23 @@ func (s *BucketMigrateScheduler) doneMigrateBucket(bucketID uint64) error { return err } +func (s *BucketMigrateScheduler) deleteMigrateBucket(bucketID uint64) error { + executePlan, err := s.getExecutePlanByBucketID(bucketID) + // 1) Received the CompleteEvents event for the first time. + // 2) Subsequently received the CompleteEvents event. + if err != nil { + log.Errorw("bucket migrate schedule received EventCompleteMigrationBucket, the event may already finished", "bucket_id", bucketID) + return err + } + + s.deleteExecutePlanByBucketID(bucketID) + executePlan.stopSPSchedule() + err = s.manager.baseApp.GfSpDB().DeleteMigrateGVGUnitsByBucketID(bucketID) + log.Infow("succeed to delete migrate bucket", "bucket_id", bucketID, "error", err) + + return err +} + func (s *BucketMigrateScheduler) cancelMigrateBucket(bucketID uint64, reject bool) error { var ( executePlan *BucketMigrateExecutePlan @@ -696,7 +727,17 @@ func (s *BucketMigrateScheduler) confirmCompleteTxEvents(ctx context.Context, ev log.Errorw("failed to get bucket by bucket id", "bucket_id", bucketID, "error", err) return } - + if bucket == nil { + if err = s.deleteMigrateBucket(bucketID); err != nil { + log.Errorw("failed to done migrate bucket", "EventMigrationBucket", event, "error", err) + return + } + if err = UpdateBucketMigrationProgress(s.manager.baseApp, bucketID, storetypes.BucketMigrationState_BUCKET_MIGRATION_STATE_MIGRATION_FINISHED); err != nil { + return + } + log.CtxInfow(ctx, "succeed to remove deleted bucket migrate event", "EventMigrationBucket", event) + return + } if bucket.BucketInfo.GetBucketStatus() == storagetypes.BUCKET_STATUS_CREATED { if err = UpdateBucketMigrationProgress(s.manager.baseApp, bucketID, storetypes.BucketMigrationState_BUCKET_MIGRATION_STATE_WAIT_COMPLETE_TX_EVENT_DONE); err != nil { return diff --git a/modular/metadata/metadata_bucket_service.go b/modular/metadata/metadata_bucket_service.go index 2759aaf8c..257e848d0 100644 --- a/modular/metadata/metadata_bucket_service.go +++ b/modular/metadata/metadata_bucket_service.go @@ -520,7 +520,17 @@ func (r *MetadataModular) GfSpGetLatestBucketReadQuota( ", bucket_id: " + util.Uint64ToString(req.GetBucketId()) + ", error: " + err.Error())}, nil } } + // if the traffic table has been created, return the db info from meta service + var chargedQuotaSize uint64 + bucketInfo, err := r.baseApp.Consensus().QueryBucketInfoById(ctx, req.BucketId) + if err != nil { + log.Errorw("failed to get bucketInfo on chain", + "bucket_id", req.GetBucketId(), "error", err) + chargedQuotaSize = bucketTraffic.ChargedQuotaSize + } else { + chargedQuotaSize = bucketInfo.ChargedReadQuota + } quota := &gfsptask.GfSpBucketQuotaInfo{ BucketName: bucketTraffic.BucketName, BucketId: bucketTraffic.BucketID, @@ -528,7 +538,7 @@ func (r *MetadataModular) GfSpGetLatestBucketReadQuota( ReadConsumedSize: bucketTraffic.ReadConsumedSize, FreeQuotaConsumedSize: bucketTraffic.FreeQuotaConsumedSize, FreeQuotaSize: bucketTraffic.FreeQuotaSize, - ChargedQuotaSize: bucketTraffic.ChargedQuotaSize, + ChargedQuotaSize: chargedQuotaSize, MonthlyFreeQuotaSize: bucketTraffic.MonthlyFreeQuotaSize, MonthlyFreeQuotaConsumedSize: bucketTraffic.MonthlyFreeQuotaConsumedSize, } diff --git a/modular/metadata/metadata_bucket_service_test.go b/modular/metadata/metadata_bucket_service_test.go index 9bdc3f23d..257c1463d 100644 --- a/modular/metadata/metadata_bucket_service_test.go +++ b/modular/metadata/metadata_bucket_service_test.go @@ -6,6 +6,7 @@ import ( "net/http" "testing" + "cosmossdk.io/math" "github.com/forbole/juno/v4/common" "github.com/shopspring/decimal" "github.com/stretchr/testify/assert" @@ -656,6 +657,7 @@ func TestMetadataModular_GfSpGetLatestBucketReadQuota_Success(t *testing.T) { a.baseApp.SetGfBsDB(m) spDBMocker := spdb.NewMockSPDB(ctrl) + consensusChargedQuotaSize := uint64(100) bucketTraffic := &spdb.BucketTraffic{ BucketID: 1, YearMonth: "2024-01", @@ -670,6 +672,20 @@ func TestMetadataModular_GfSpGetLatestBucketReadQuota_Success(t *testing.T) { consensusMock := consensus.NewMockConsensus(ctrl) consensusMock.EXPECT().QuerySPFreeQuota(gomock.Any(), gomock.Any()).Return(uint64(10000), nil).Times(0) + consensusMock.EXPECT().QueryBucketInfoById(gomock.Any(), gomock.Any()).Return(&storagetypes.BucketInfo{ + Owner: "0x11E0A11A7A01E2E757447B52FBD7152004AC699D", + BucketName: "", + Visibility: 0, + Id: math.NewUint(1), + SourceType: 0, + CreateAt: 0, + PaymentAddress: "", + GlobalVirtualGroupFamilyId: 0, + ChargedReadQuota: consensusChargedQuotaSize, + BucketStatus: 0, + Tags: nil, + SpAsDelegatedAgentDisabled: false, + }, nil).AnyTimes() a.baseApp.SetConsensus(consensusMock) resp, err := a.GfSpGetLatestBucketReadQuota(context.Background(), &types.GfSpGetLatestBucketReadQuotaRequest{ @@ -684,7 +700,7 @@ func TestMetadataModular_GfSpGetLatestBucketReadQuota_Success(t *testing.T) { ReadConsumedSize: bucketTraffic.ReadConsumedSize, FreeQuotaConsumedSize: bucketTraffic.FreeQuotaConsumedSize, FreeQuotaSize: bucketTraffic.FreeQuotaSize, - ChargedQuotaSize: bucketTraffic.ChargedQuotaSize, + ChargedQuotaSize: consensusChargedQuotaSize, }, }, resp) } diff --git a/modular/metadata/metadata_sp_exit_service.go b/modular/metadata/metadata_sp_exit_service.go index b9d422c58..9b7cb2281 100644 --- a/modular/metadata/metadata_sp_exit_service.go +++ b/modular/metadata/metadata_sp_exit_service.go @@ -215,14 +215,14 @@ func (r *MetadataModular) GfSpListMigrateBucketEvents(ctx context.Context, req * GlobalVirtualGroupFamilyId: complete.GlobalVirtualGroupFamilyId, } } - if cancel != nil && cancel.CreateAt >= event.CreateAt && complete == nil { + if cancel != nil && cancel.CreateAt >= event.CreateAt && (complete == nil || cancel.CreateAt > complete.CreateAt) { spCancelEvent = &storage_types.EventCancelMigrationBucket{ Operator: cancel.Operator.String(), BucketName: cancel.BucketName, BucketId: math.NewUintFromBigInt(cancel.BucketID.Big()), } } - if reject != nil && reject.CreateAt >= event.CreateAt && complete == nil { + if reject != nil && reject.CreateAt >= event.CreateAt && (complete == nil || reject.CreateAt > complete.CreateAt) { spRejectEvent = &storage_types.EventRejectMigrateBucket{ Operator: reject.Operator.String(), BucketName: reject.BucketName,