fix: v1.7 (#1401)
* fix: bucket can be nil

* fix: migrate bucket charged quota and list events

---------

Co-authored-by: Alexgao001 <alex.g@nodereal.io>
BarryTong65 and alexgao001 authored May 17, 2024
1 parent 0c9a3ba commit b38f98f
Showing 5 changed files with 151 additions and 5 deletions.
79 changes: 79 additions & 0 deletions modular/blocksyncer/modules/bucket/bucket_handle.go
@@ -22,6 +22,9 @@ var (
EventDeleteBucket = proto.MessageName(&storagetypes.EventDeleteBucket{})
EventUpdateBucketInfo = proto.MessageName(&storagetypes.EventUpdateBucketInfo{})
EventDiscontinueBucket = proto.MessageName(&storagetypes.EventDiscontinueBucket{})
EventMigrationBucket = proto.MessageName(&storagetypes.EventMigrationBucket{})
EventCancelMigrationBucket = proto.MessageName(&storagetypes.EventCancelMigrationBucket{})
EventRejectMigrateBucket = proto.MessageName(&storagetypes.EventRejectMigrateBucket{})
EventCompleteMigrationBucket = proto.MessageName(&storagetypes.EventCompleteMigrationBucket{})
EventToggleSPAsDelegatedAgent = proto.MessageName(&storagetypes.EventToggleSPAsDelegatedAgent{})
)
@@ -31,6 +34,9 @@ var BucketEvents = map[string]bool{
EventDeleteBucket: true,
EventUpdateBucketInfo: true,
EventDiscontinueBucket: true,
EventMigrationBucket: true,
EventCancelMigrationBucket: true,
EventRejectMigrateBucket: true,
EventCompleteMigrationBucket: true,
EventToggleSPAsDelegatedAgent: true,
}
@@ -100,6 +106,27 @@ func (m *Module) ExtractEventStatements(ctx context.Context, block *tmctypes.Res
return nil, errors.New("discontinue bucket event assert error")
}
return m.handleDiscontinueBucket(ctx, block, txHash, discontinueBucket), nil
case EventMigrationBucket:
migrationBucket, ok := typedEvent.(*storagetypes.EventMigrationBucket)
if !ok {
log.Errorw("type assert error", "type", "EventMigrationBucket", "event", typedEvent)
return nil, errors.New("migration bucket event assert error")
}
return m.handleEventMigrationBucket(ctx, block, txHash, migrationBucket), nil
case EventCancelMigrationBucket:
cancelMigrationBucket, ok := typedEvent.(*storagetypes.EventCancelMigrationBucket)
if !ok {
log.Errorw("type assert error", "type", "EventCancelMigrationBucket", "event", typedEvent)
return nil, errors.New("cancel migration bucket event assert error")
}
return m.handleEventCancelMigrationBucket(ctx, block, txHash, cancelMigrationBucket), nil
case EventRejectMigrateBucket:
rejectMigrateBucket, ok := typedEvent.(*storagetypes.EventRejectMigrateBucket)
if !ok {
log.Errorw("type assert error", "type", "EventRejectMigrateBucket", "event", typedEvent)
return nil, errors.New("reject migration bucket event assert error")
}
return m.handleEventRejectMigrateBucket(ctx, block, txHash, rejectMigrateBucket), nil
case EventCompleteMigrationBucket:
completeMigrationBucket, ok := typedEvent.(*storagetypes.EventCompleteMigrationBucket)
if !ok {
@@ -208,11 +235,63 @@ func (m *Module) handleUpdateBucketInfo(ctx context.Context, block *tmctypes.Res
}
}

func (m *Module) handleEventMigrationBucket(ctx context.Context, block *tmctypes.ResultBlock, txHash common.Hash, migrationBucket *storagetypes.EventMigrationBucket) map[string][]interface{} {
bucket := &models.Bucket{
BucketID: common.BigToHash(migrationBucket.BucketId.BigInt()),
BucketName: migrationBucket.BucketName,
Status: storagetypes.BUCKET_STATUS_MIGRATING.String(),

UpdateAt: block.Block.Height,
UpdateTxHash: txHash,
UpdateTime: block.Block.Time.UTC().Unix(),
}

k, v := m.db.UpdateBucketToSQL(ctx, bucket)
return map[string][]interface{}{
k: v,
}
}

func (m *Module) handleEventCancelMigrationBucket(ctx context.Context, block *tmctypes.ResultBlock, txHash common.Hash, cancelMigrationBucket *storagetypes.EventCancelMigrationBucket) map[string][]interface{} {
bucket := &models.Bucket{
BucketID: common.BigToHash(cancelMigrationBucket.BucketId.BigInt()),
BucketName: cancelMigrationBucket.BucketName,
Status: storagetypes.BUCKET_STATUS_CREATED.String(),

UpdateAt: block.Block.Height,
UpdateTxHash: txHash,
UpdateTime: block.Block.Time.UTC().Unix(),
}

k, v := m.db.UpdateBucketToSQL(ctx, bucket)
return map[string][]interface{}{
k: v,
}
}

func (m *Module) handleEventRejectMigrateBucket(ctx context.Context, block *tmctypes.ResultBlock, txHash common.Hash, rejectMigrateBucket *storagetypes.EventRejectMigrateBucket) map[string][]interface{} {
bucket := &models.Bucket{
BucketID: common.BigToHash(rejectMigrateBucket.BucketId.BigInt()),
BucketName: rejectMigrateBucket.BucketName,
Status: storagetypes.BUCKET_STATUS_CREATED.String(),

UpdateAt: block.Block.Height,
UpdateTxHash: txHash,
UpdateTime: block.Block.Time.UTC().Unix(),
}

k, v := m.db.UpdateBucketToSQL(ctx, bucket)
return map[string][]interface{}{
k: v,
}
}

func (m *Module) handleCompleteMigrationBucket(ctx context.Context, block *tmctypes.ResultBlock, txHash common.Hash, completeMigrationBucket *storagetypes.EventCompleteMigrationBucket) map[string][]interface{} {
bucket := &models.Bucket{
BucketID: common.BigToHash(completeMigrationBucket.BucketId.BigInt()),
BucketName: completeMigrationBucket.BucketName,
GlobalVirtualGroupFamilyId: completeMigrationBucket.GlobalVirtualGroupFamilyId,
Status: storagetypes.BUCKET_STATUS_CREATED.String(),

UpdateAt: block.Block.Height,
UpdateTxHash: txHash,
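Taken together, the four new handlers map each migration event to a bucket status update written through UpdateBucketToSQL. A minimal standalone sketch of those transitions (illustration only, not part of the commit; plain strings stand in for the storagetypes constants):

package main

import "fmt"

func main() {
	// Status each event leaves the bucket in, per the handlers above.
	transitions := map[string]string{
		"EventMigrationBucket":         "BUCKET_STATUS_MIGRATING",
		"EventCancelMigrationBucket":   "BUCKET_STATUS_CREATED",
		"EventRejectMigrateBucket":     "BUCKET_STATUS_CREATED",
		"EventCompleteMigrationBucket": "BUCKET_STATUS_CREATED", // also records the new GlobalVirtualGroupFamilyId
	}
	for event, status := range transitions {
		fmt.Printf("%s -> %s\n", event, status)
	}
}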
43 changes: 42 additions & 1 deletion modular/manager/bucket_migrate_scheduler.go
@@ -204,6 +204,13 @@ func (plan *BucketMigrateExecutePlan) sendCompleteMigrateBucketTx(migrateExecute
if err != nil {
return err
}
if bucket == nil {
log.Debugw("send complete migrate bucket has been deleted", "bucket_id", plan.bucketID)
if err = UpdateBucketMigrationProgress(plan.manager.baseApp, plan.bucketID, storetypes.BucketMigrationState_BUCKET_MIGRATION_STATE_MIGRATION_FINISHED); err != nil {
return err
}
return nil
}
var gvgMappings []*storagetypes.GVGMapping
for _, migrateGVGUnit := range plan.gvgUnitMap {
aggBlsSig, getBlsError := plan.getBlsAggregateSigForBucketMigration(context.Background(), migrateGVGUnit)
@@ -239,6 +246,13 @@ func (plan *BucketMigrateExecutePlan) rejectBucketMigration() error {
if err != nil {
return err
}
if bucket == nil {
log.Debugw("reject bucket migration has been deleted", "bucket_id", plan.bucketID)
if err = UpdateBucketMigrationProgress(plan.manager.baseApp, plan.bucketID, storetypes.BucketMigrationState_BUCKET_MIGRATION_STATE_MIGRATION_FINISHED); err != nil {
return err
}
return nil
}
rejectMigrateBucket := &storagetypes.MsgRejectMigrateBucket{Operator: plan.manager.baseApp.OperatorAddress(),
BucketName: bucket.BucketInfo.GetBucketName()}
txHash, txErr := plan.manager.baseApp.GfSpClient().RejectMigrateBucket(ctx, rejectMigrateBucket)
@@ -591,6 +605,23 @@ func (s *BucketMigrateScheduler) doneMigrateBucket(bucketID uint64) error {
return err
}

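// deleteMigrateBucket removes the in-memory execute plan for the bucket, stops
// its SP scheduling, and deletes the persisted migrate GVG units.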
func (s *BucketMigrateScheduler) deleteMigrateBucket(bucketID uint64) error {
executePlan, err := s.getExecutePlanByBucketID(bucketID)
// The execute plan exists only until the complete event has been handled once;
// a lookup failure here means the event has already been processed.
if err != nil {
log.Errorw("bucket migrate scheduler received EventCompleteMigrationBucket, the event may have already finished", "bucket_id", bucketID)
return err
}

s.deleteExecutePlanByBucketID(bucketID)
executePlan.stopSPSchedule()
err = s.manager.baseApp.GfSpDB().DeleteMigrateGVGUnitsByBucketID(bucketID)
log.Infow("succeed to delete migrate bucket", "bucket_id", bucketID, "error", err)

return err
}

func (s *BucketMigrateScheduler) cancelMigrateBucket(bucketID uint64, reject bool) error {
var (
executePlan *BucketMigrateExecutePlan
@@ -696,7 +727,17 @@ func (s *BucketMigrateScheduler) confirmCompleteTxEvents(ctx context.Context, ev
log.Errorw("failed to get bucket by bucket id", "bucket_id", bucketID, "error", err)
return
}

if bucket == nil {
if err = s.deleteMigrateBucket(bucketID); err != nil {
log.Errorw("failed to done migrate bucket", "EventMigrationBucket", event, "error", err)
return
}
if err = UpdateBucketMigrationProgress(s.manager.baseApp, bucketID, storetypes.BucketMigrationState_BUCKET_MIGRATION_STATE_MIGRATION_FINISHED); err != nil {
return
}
log.CtxInfow(ctx, "succeed to remove deleted bucket migrate event", "EventMigrationBucket", event)
return
}
if bucket.BucketInfo.GetBucketStatus() == storagetypes.BUCKET_STATUS_CREATED {
if err = UpdateBucketMigrationProgress(s.manager.baseApp, bucketID, storetypes.BucketMigrationState_BUCKET_MIGRATION_STATE_WAIT_COMPLETE_TX_EVENT_DONE); err != nil {
return
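The same nil-bucket guard now appears in three places in this file (the complete-tx path, the reject path, and the confirm path): when the chain query reports the bucket as gone, the scheduler records the migration as finished instead of sending further transactions. A sketch of that shared pattern as a standalone helper (the helper name, Bucket type, and wiring are hypothetical, for illustration only):

// finishIfBucketDeleted reports whether the bucket no longer exists on chain
// (the query returned nil) and, if so, marks the migration as finished.
func finishIfBucketDeleted(bucket *Bucket, markFinished func() error) (bool, error) {
	if bucket != nil {
		return false, nil // bucket still exists; the caller proceeds as usual
	}
	return true, markFinished()
}

type Bucket struct{} // placeholder for the real bucket metadata type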
12 changes: 11 additions & 1 deletion modular/metadata/metadata_bucket_service.go
@@ -520,15 +520,25 @@ func (r *MetadataModular) GfSpGetLatestBucketReadQuota(
", bucket_id: " + util.Uint64ToString(req.GetBucketId()) + ", error: " + err.Error())}, nil
}
}

// if the traffic table has been created, return the db info from meta service
var chargedQuotaSize uint64
bucketInfo, err := r.baseApp.Consensus().QueryBucketInfoById(ctx, req.BucketId)
if err != nil {
log.Errorw("failed to get bucketInfo on chain",
"bucket_id", req.GetBucketId(), "error", err)
chargedQuotaSize = bucketTraffic.ChargedQuotaSize
} else {
chargedQuotaSize = bucketInfo.ChargedReadQuota
}
quota := &gfsptask.GfSpBucketQuotaInfo{
BucketName: bucketTraffic.BucketName,
BucketId: bucketTraffic.BucketID,
Month: bucketTraffic.YearMonth,
ReadConsumedSize: bucketTraffic.ReadConsumedSize,
FreeQuotaConsumedSize: bucketTraffic.FreeQuotaConsumedSize,
FreeQuotaSize: bucketTraffic.FreeQuotaSize,
ChargedQuotaSize: bucketTraffic.ChargedQuotaSize,
ChargedQuotaSize: chargedQuotaSize,
MonthlyFreeQuotaSize: bucketTraffic.MonthlyFreeQuotaSize,
MonthlyFreeQuotaConsumedSize: bucketTraffic.MonthlyFreeQuotaConsumedSize,
}
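The quota response now prefers the charged read quota reported by consensus and falls back to the value cached in the traffic table only when the chain query fails. A minimal sketch of that selection (function name hypothetical, not part of the commit):

// pickChargedQuota returns the on-chain charged read quota when the consensus
// query succeeded, otherwise the value recorded in the bucket traffic table.
func pickChargedQuota(onChain uint64, chainErr error, stored uint64) uint64 {
	if chainErr != nil {
		return stored
	}
	return onChain
}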
18 changes: 17 additions & 1 deletion modular/metadata/metadata_bucket_service_test.go
@@ -6,6 +6,7 @@ import (
"net/http"
"testing"

"cosmossdk.io/math"
"github.com/forbole/juno/v4/common"
"github.com/shopspring/decimal"
"github.com/stretchr/testify/assert"
@@ -656,6 +657,7 @@ func TestMetadataModular_GfSpGetLatestBucketReadQuota_Success(t *testing.T) {
a.baseApp.SetGfBsDB(m)
spDBMocker := spdb.NewMockSPDB(ctrl)

consensusChargedQuotaSize := uint64(100)
bucketTraffic := &spdb.BucketTraffic{
BucketID: 1,
YearMonth: "2024-01",
@@ -670,6 +672,20 @@ func TestMetadataModular_GfSpGetLatestBucketReadQuota_Success(t *testing.T) {

consensusMock := consensus.NewMockConsensus(ctrl)
consensusMock.EXPECT().QuerySPFreeQuota(gomock.Any(), gomock.Any()).Return(uint64(10000), nil).Times(0)
consensusMock.EXPECT().QueryBucketInfoById(gomock.Any(), gomock.Any()).Return(&storagetypes.BucketInfo{
Owner: "0x11E0A11A7A01E2E757447B52FBD7152004AC699D",
BucketName: "",
Visibility: 0,
Id: math.NewUint(1),
SourceType: 0,
CreateAt: 0,
PaymentAddress: "",
GlobalVirtualGroupFamilyId: 0,
ChargedReadQuota: consensusChargedQuotaSize,
BucketStatus: 0,
Tags: nil,
SpAsDelegatedAgentDisabled: false,
}, nil).AnyTimes()
a.baseApp.SetConsensus(consensusMock)

resp, err := a.GfSpGetLatestBucketReadQuota(context.Background(), &types.GfSpGetLatestBucketReadQuotaRequest{
@@ -684,7 +700,7 @@
ReadConsumedSize: bucketTraffic.ReadConsumedSize,
FreeQuotaConsumedSize: bucketTraffic.FreeQuotaConsumedSize,
FreeQuotaSize: bucketTraffic.FreeQuotaSize,
ChargedQuotaSize: bucketTraffic.ChargedQuotaSize,
ChargedQuotaSize: consensusChargedQuotaSize,
},
}, resp)
}
4 changes: 2 additions & 2 deletions modular/metadata/metadata_sp_exit_service.go
@@ -215,14 +215,14 @@ func (r *MetadataModular) GfSpListMigrateBucketEvents(ctx context.Context, req *
GlobalVirtualGroupFamilyId: complete.GlobalVirtualGroupFamilyId,
}
}
if cancel != nil && cancel.CreateAt >= event.CreateAt && complete == nil {
if cancel != nil && cancel.CreateAt >= event.CreateAt && (complete == nil || cancel.CreateAt > complete.CreateAt) {
spCancelEvent = &storage_types.EventCancelMigrationBucket{
Operator: cancel.Operator.String(),
BucketName: cancel.BucketName,
BucketId: math.NewUintFromBigInt(cancel.BucketID.Big()),
}
}
if reject != nil && reject.CreateAt >= event.CreateAt && complete == nil {
if reject != nil && reject.CreateAt >= event.CreateAt && (complete == nil || reject.CreateAt > complete.CreateAt) {
spRejectEvent = &storage_types.EventRejectMigrateBucket{
Operator: reject.Operator.String(),
BucketName: reject.BucketName,
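The tightened conditions report a cancel or reject event only when it is newer than the latest complete event, rather than only when no complete event exists at all. A hypothetical timeline (heights and helper invented for illustration, not part of the commit) shows the case the old check got wrong:

package main

import "fmt"

// stillRelevant mirrors the updated condition: the cancel/reject event must not
// precede the migration event, and must be newer than any complete event
// (completeAt == 0 stands in for "no complete event").
func stillRelevant(migrateAt, cancelAt, completeAt int64) bool {
	return cancelAt >= migrateAt && (completeAt == 0 || cancelAt > completeAt)
}

func main() {
	// A first migration completed at height 150; a second one started at 200 and
	// was cancelled at 250. The old "complete == nil" guard hid this cancel event;
	// the new comparison keeps it.
	fmt.Println(stillRelevant(200, 250, 150)) // true
	// A cancel that happened before the completion stays hidden.
	fmt.Println(stillRelevant(100, 120, 150)) // false
}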
