Skip to content

Commit

Permalink
feat: limited bucket (#1371)
Browse files Browse the repository at this point in the history
* feat: check bucket extra is limited
  • Loading branch information
BarryTong65 authored Mar 26, 2024
1 parent ced4cba commit a4c774b
Show file tree
Hide file tree
Showing 23 changed files with 748 additions and 478 deletions.
30 changes: 30 additions & 0 deletions base/gnfd/gnfd_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -509,6 +509,36 @@ func (g *Gnfd) QueryBucketInfo(ctx context.Context, bucket string) (bucketInfo *
return resp.GetBucketInfo(), nil
}

// QueryBucketExtraInfo returns the bucket extra info by name.
func (g *Gnfd) QueryBucketExtraInfo(ctx context.Context, bucket string) (bucketInfo *storagetypes.BucketExtraInfo, err error) {
startTime := time.Now()
defer func() {
if err != nil {
metrics.GnfdChainCounter.WithLabelValues(ChainFailureQueryBucketInfo).Inc()
metrics.GnfdChainTime.WithLabelValues(ChainFailureQueryBucketInfo).Observe(
time.Since(startTime).Seconds())
metrics.GnfdChainCounter.WithLabelValues(ChainFailureTotal).Inc()
metrics.GnfdChainTime.WithLabelValues(ChainFailureTotal).Observe(
time.Since(startTime).Seconds())
return
}
metrics.GnfdChainCounter.WithLabelValues(ChainSuccessQueryBucketInfo).Inc()
metrics.GnfdChainTime.WithLabelValues(ChainSuccessQueryBucketInfo).Observe(
time.Since(startTime).Seconds())
metrics.GnfdChainCounter.WithLabelValues(ChainSuccessTotal).Inc()
metrics.GnfdChainTime.WithLabelValues(ChainSuccessTotal).Observe(
time.Since(startTime).Seconds())
}()

client := g.getCurrentClient().GnfdClient()
resp, err := client.HeadBucket(ctx, &storagetypes.QueryHeadBucketRequest{BucketName: bucket})
if err != nil {
log.CtxErrorw(ctx, "failed to query bucket", "bucket_name", bucket, "error", err)
return nil, err
}
return resp.GetExtraInfo(), nil
}

// QueryBucketInfoById returns the bucket info by name.
func (g *Gnfd) QueryBucketInfoById(ctx context.Context, bucketId uint64) (bucketInfo *storagetypes.BucketInfo, err error) {
startTime := time.Now()
Expand Down
5 changes: 5 additions & 0 deletions core/consensus/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ type Consensus interface {
QueryStorageParamsByTimestamp(ctx context.Context, timestamp int64) (params *storagetypes.Params, err error)
// QueryBucketInfo returns the bucket info by bucket name.
QueryBucketInfo(ctx context.Context, bucket string) (*storagetypes.BucketInfo, error)
// QueryBucketExtraInfo returns the bucket extra info by bucket name.
QueryBucketExtraInfo(ctx context.Context, bucket string) (bucketInfo *storagetypes.BucketExtraInfo, err error)
// QueryBucketInfoById returns the bucket info by bucket id.
QueryBucketInfoById(ctx context.Context, bucketId uint64) (bucketInfo *storagetypes.BucketInfo, err error)
// QueryObjectInfo returns the object info by bucket and object name.
Expand Down Expand Up @@ -136,6 +138,9 @@ func (*NullConsensus) QueryStorageParamsByTimestamp(context.Context, int64) (*st
func (*NullConsensus) QueryBucketInfo(context.Context, string) (*storagetypes.BucketInfo, error) {
return nil, nil
}
func (*NullConsensus) QueryBucketExtraInfo(context.Context, string) (*storagetypes.BucketExtraInfo, error) {
return nil, nil
}
func (*NullConsensus) QueryBucketInfoById(context.Context, uint64) (*storagetypes.BucketInfo, error) {
return nil, nil
}
Expand Down
15 changes: 15 additions & 0 deletions core/consensus/consensus_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions core/consensus/consensus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ func TestNullConsensus(t *testing.T) {
_, _ = nc.QueryStorageParams(context.TODO())
_, _ = nc.QueryStorageParamsByTimestamp(context.TODO(), 0)
_, _ = nc.QueryBucketInfo(context.TODO(), "")
_, _ = nc.QueryBucketExtraInfo(context.TODO(), "")
_, _ = nc.QueryBucketInfoById(context.TODO(), 0)
_, _ = nc.QueryObjectInfo(context.TODO(), "", "")
_, _ = nc.QueryObjectInfoByID(context.TODO(), "")
Expand Down
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ require (
github.com/aliyun/credentials-go v1.3.0
github.com/avast/retry-go/v4 v4.3.1
github.com/aws/aws-sdk-go v1.44.159
github.com/bnb-chain/greenfield v1.4.1-0.20240313092637-df143c62cbb5
github.com/bnb-chain/greenfield v1.5.1-0.20240314113148-e4cc584f6da2
github.com/bnb-chain/greenfield-common/go v0.0.0-20240228080631-2683b0ee669a
github.com/bytedance/gopkg v0.0.0-20221122125632-68358b8ecec6
github.com/cometbft/cometbft v0.37.2
Expand Down Expand Up @@ -124,7 +124,7 @@ require (
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang-jwt/jwt/v4 v4.3.0 // indirect
github.com/golang/glog v1.1.0 // indirect
github.com/golang/mock v1.6.0
github.com/golang/mock v1.6.0 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/btree v1.1.2 // indirect
Expand Down Expand Up @@ -306,7 +306,7 @@ replace (
github.com/confio/ics23/go => github.com/cosmos/cosmos-sdk/ics23/go v0.8.0
github.com/cosmos/cosmos-sdk => github.com/bnb-chain/greenfield-cosmos-sdk v1.4.1-0.20240305064839-3cb07f5a365c
github.com/cosmos/iavl => github.com/bnb-chain/greenfield-iavl v0.20.1
github.com/forbole/juno/v4 => github.com/bnb-chain/juno/v4 v4.0.0-20240221084449-8b5ee76301fa
github.com/forbole/juno/v4 => github.com/bnb-chain/juno/v4 v4.0.0-20240326082854-3c67c32eb2be
github.com/gogo/protobuf => github.com/regen-network/protobuf v1.3.3-alpha.regen.1
github.com/syndtr/goleveldb => github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7
)
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -176,8 +176,8 @@ github.com/bgentry/speakeasy v0.1.1-0.20220910012023-760eaf8b6816/go.mod h1:+zsy
github.com/bits-and-blooms/bitset v1.2.0 h1:Kn4yilvwNtMACtf1eYDlG8H77R07mZSPbMjLyS07ChA=
github.com/bits-and-blooms/bitset v1.2.0/go.mod h1:gIdJ4wp64HaoK2YrL1Q5/N7Y16edYb8uY+O0FJTyyDA=
github.com/bmizerany/pat v0.0.0-20170815010413-6226ea591a40/go.mod h1:8rLXio+WjiTceGBHIoTvn60HIbs7Hm7bcHjyrSqYB9c=
github.com/bnb-chain/greenfield v1.4.1-0.20240313092637-df143c62cbb5 h1:xR5qjFrUesFAw3YTndfhHPauiFLRy7nEoX9rMtCbrrs=
github.com/bnb-chain/greenfield v1.4.1-0.20240313092637-df143c62cbb5/go.mod h1:N45Dfmj0EwL4xiKtpVvX1AkeER1CtKR0vMF+Nr9gLbE=
github.com/bnb-chain/greenfield v1.5.1-0.20240314113148-e4cc584f6da2 h1:y2eM3EGsVjCaHKLDrfZ5n0gtFZuDza2BEwQvaQJlzFM=
github.com/bnb-chain/greenfield v1.5.1-0.20240314113148-e4cc584f6da2/go.mod h1:z970om1k0EPmDFCUvxZpufQz3a1bOP7QriaZbaywaVY=
github.com/bnb-chain/greenfield-cometbft v1.2.0 h1:LTStppZS9WkVj0TfEYKkk5OAQDGfYlUefWByr7Zr018=
github.com/bnb-chain/greenfield-cometbft v1.2.0/go.mod h1:WVOEZ59UYM2XePQH47/IQfcInspDn8wbRXhFSJrbU1c=
github.com/bnb-chain/greenfield-cometbft-db v0.8.1-alpha.1 h1:XcWulGacHVRiSCx90Q8Y//ajOrLNBQWR/KDB89dy3cU=
Expand All @@ -192,8 +192,8 @@ github.com/bnb-chain/greenfield-cosmos-sdk/math v0.0.0-20231206043955-0855e0965b
github.com/bnb-chain/greenfield-cosmos-sdk/math v0.0.0-20231206043955-0855e0965bc8/go.mod h1:An0MllWJY6PxibUpnwGk8jOm+a/qIxlKmL5Zyp9NnaM=
github.com/bnb-chain/greenfield-iavl v0.20.1 h1:y3L64GU99otNp27/xLVBTDbv4eroR6CzoYz0rbaVotM=
github.com/bnb-chain/greenfield-iavl v0.20.1/go.mod h1:oLksTs8dfh7DYIKBro7hbRQ+ewls7ghJ27pIXlbEXyI=
github.com/bnb-chain/juno/v4 v4.0.0-20240221084449-8b5ee76301fa h1:bkNYQmoiE0ZpigBawuqX+UOJziBboDqD8Z3REYMpIzI=
github.com/bnb-chain/juno/v4 v4.0.0-20240221084449-8b5ee76301fa/go.mod h1:p+KkPIHURpqUJOdXanhhYgZpTLQxGZWkrAwtUaOuPlM=
github.com/bnb-chain/juno/v4 v4.0.0-20240326082854-3c67c32eb2be h1:yympDqs1gg3oUgSTm31ybKhvWgsEySvqC7LO1npVvRc=
github.com/bnb-chain/juno/v4 v4.0.0-20240326082854-3c67c32eb2be/go.mod h1:p+KkPIHURpqUJOdXanhhYgZpTLQxGZWkrAwtUaOuPlM=
github.com/boltdb/bolt v1.3.1/go.mod h1:clJnj/oiGkjum5o1McbSZDSLxVThjynRyGBgiAx27Ps=
github.com/bradfitz/go-smtpd v0.0.0-20170404230938-deb6d6237625/go.mod h1:HYsPBTaaSFSlLx/70C2HPIMNZpVV8+vt/A+FMnYP11g=
github.com/bradfitz/gomemcache v0.0.0-20170208213004-1952afaa557d/go.mod h1:PmM6Mmwb0LSuEubjR8N7PtNe1KxZLtOUHtbeikc5h60=
Expand Down
12 changes: 12 additions & 0 deletions modular/authenticator/authenticator.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ var (
ErrInvalidAddressOrDomainOrPublicKey = gfsperrors.Register(module.AuthenticationModularName, http.StatusBadRequest, 20017, "userAddress, domain or publicKey can't be null")
ErrInvalidPublicKeyLength = gfsperrors.Register(module.AuthenticationModularName, http.StatusBadRequest, 20018, "The length of publicKeys must be less or equal to 100")
ErrPublicKeyNotExist = gfsperrors.Register(module.AuthenticationModularName, http.StatusBadRequest, 20019, "The publicKey was not registered")
ErrBucketIsRateLimited = gfsperrors.Register(module.AuthenticationModularName, http.StatusBadRequest, 20020, "bucket is rate limited")
)

func ErrUnexpectedObjectStatusWithDetail(objectName string, expectedStatus storagetypes.ObjectStatus, actualStatus storagetypes.ObjectStatus) *gfsperrors.GfSpError {
Expand Down Expand Up @@ -441,6 +442,17 @@ func (a *AuthenticationModular) VerifyAuthentication(
}
return false, ErrConsensusWithDetail("failed to get bucket and object info from consensus, error: " + err.Error())
}
bucketExtraInfo, err := a.baseApp.Consensus().QueryBucketExtraInfo(ctx, bucket)
if err != nil {
log.CtxErrorw(ctx, "failed to get bucket extra info from consensus", "error", err)
if strings.Contains(err.Error(), "No such bucket") {
return false, ErrNoSuchBucket
}
return false, ErrConsensusWithDetail("failed to get bucket extra info from consensus, error: " + err.Error())
}
if bucketExtraInfo.IsRateLimited {
return false, ErrBucketIsRateLimited
}
spID, err := a.getSPID()
if err != nil {
return false, ErrConsensusWithDetail("getSPID error: " + err.Error())
Expand Down
15 changes: 14 additions & 1 deletion modular/authenticator/authenticator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -847,7 +847,9 @@ func VerifyObjectAndBucketAndSPID(t *testing.T, authType coremodule.AuthOpType)
// object exists but get SP ID returns error
mockedConsensus = consensus.NewMockConsensus(ctrl)
mockedConsensus.EXPECT().QueryBucketInfoAndObjectInfo(gomock.Any(), gomock.Any(), gomock.Any()).Return(&storagetypes.BucketInfo{}, &storagetypes.ObjectInfo{}, nil).Times(1)

if authType == coremodule.AuthOpTypeGetObject {
mockedConsensus.EXPECT().QueryBucketExtraInfo(gomock.Any(), gomock.Any()).Return(&storagetypes.BucketExtraInfo{IsRateLimited: false}, nil).Times(1)
}
mockedConsensus.EXPECT().QuerySP(gomock.Any(), gomock.Any()).Return(nil, errors.New("error")).Times(1)
a.baseApp.SetConsensus(mockedConsensus)
_, err = a.VerifyAuthentication(context.Background(), authType, userAddress, "test_bucket", "test_object")
Expand All @@ -856,6 +858,9 @@ func VerifyObjectAndBucketAndSPID(t *testing.T, authType coremodule.AuthOpType)
// QueryVirtualGroupFamily returns error
mockedConsensus = consensus.NewMockConsensus(ctrl)
mockedConsensus.EXPECT().QueryBucketInfoAndObjectInfo(gomock.Any(), gomock.Any(), gomock.Any()).Return(&storagetypes.BucketInfo{}, &storagetypes.ObjectInfo{}, nil).Times(1)
if authType == coremodule.AuthOpTypeGetObject {
mockedConsensus.EXPECT().QueryBucketExtraInfo(gomock.Any(), gomock.Any()).Return(&storagetypes.BucketExtraInfo{IsRateLimited: false}, nil).Times(1)
}
mockedConsensus.EXPECT().QuerySP(gomock.Any(), gomock.Any()).Return(&sptypes.StorageProvider{
Id: 1,
}, nil).Times(1)
Expand All @@ -867,6 +872,9 @@ func VerifyObjectAndBucketAndSPID(t *testing.T, authType coremodule.AuthOpType)
// bucketSPID != spID
mockedConsensus = consensus.NewMockConsensus(ctrl)
mockedConsensus.EXPECT().QueryBucketInfoAndObjectInfo(gomock.Any(), gomock.Any(), gomock.Any()).Return(&storagetypes.BucketInfo{}, &storagetypes.ObjectInfo{}, nil).Times(1)
if authType == coremodule.AuthOpTypeGetObject {
mockedConsensus.EXPECT().QueryBucketExtraInfo(gomock.Any(), gomock.Any()).Return(&storagetypes.BucketExtraInfo{IsRateLimited: false}, nil).Times(1)
}
mockedConsensus.EXPECT().QuerySP(gomock.Any(), gomock.Any()).Return(&sptypes.StorageProvider{
Id: 1,
}, nil).Times(0) // the SPID query result is cached already in above tests
Expand Down Expand Up @@ -980,6 +988,7 @@ func Test_VerifyAuth_GetObject(t *testing.T) {
mockedConsensus.EXPECT().QueryBucketInfoAndObjectInfo(gomock.Any(), gomock.Any(), gomock.Any()).Return(&storagetypes.BucketInfo{}, &storagetypes.ObjectInfo{
ObjectStatus: storagetypes.OBJECT_STATUS_CREATED,
}, nil).Times(1)
mockedConsensus.EXPECT().QueryBucketExtraInfo(gomock.Any(), gomock.Any()).Return(&storagetypes.BucketExtraInfo{IsRateLimited: false}, nil).Times(1)
mockedConsensus.EXPECT().QuerySP(gomock.Any(), gomock.Any()).Return(&sptypes.StorageProvider{
Id: 1,
}, nil).Times(1)
Expand All @@ -996,6 +1005,7 @@ func Test_VerifyAuth_GetObject(t *testing.T) {
mockedConsensus.EXPECT().QueryBucketInfoAndObjectInfo(gomock.Any(), gomock.Any(), gomock.Any()).Return(&storagetypes.BucketInfo{}, &storagetypes.ObjectInfo{
ObjectStatus: storagetypes.OBJECT_STATUS_SEALED,
}, nil).Times(1)
mockedConsensus.EXPECT().QueryBucketExtraInfo(gomock.Any(), gomock.Any()).Return(&storagetypes.BucketExtraInfo{IsRateLimited: false}, nil).Times(1)
mockedConsensus.EXPECT().QuerySP(gomock.Any(), gomock.Any()).Return(&sptypes.StorageProvider{
Id: 1,
}, nil).Times(0) // the SPID query result is cached already in above tests
Expand All @@ -1012,6 +1022,7 @@ func Test_VerifyAuth_GetObject(t *testing.T) {
mockedConsensus.EXPECT().QueryBucketInfoAndObjectInfo(gomock.Any(), gomock.Any(), gomock.Any()).Return(&storagetypes.BucketInfo{}, &storagetypes.ObjectInfo{
ObjectStatus: storagetypes.OBJECT_STATUS_SEALED,
}, nil).Times(1)
mockedConsensus.EXPECT().QueryBucketExtraInfo(gomock.Any(), gomock.Any()).Return(&storagetypes.BucketExtraInfo{IsRateLimited: false}, nil).Times(1)
mockedConsensus.EXPECT().QuerySP(gomock.Any(), gomock.Any()).Return(&sptypes.StorageProvider{
Id: 1,
}, nil).Times(0) // the SPID query result is cached already in above tests
Expand All @@ -1030,6 +1041,7 @@ func Test_VerifyAuth_GetObject(t *testing.T) {
mockedConsensus.EXPECT().QueryBucketInfoAndObjectInfo(gomock.Any(), gomock.Any(), gomock.Any()).Return(&storagetypes.BucketInfo{}, &storagetypes.ObjectInfo{
ObjectStatus: storagetypes.OBJECT_STATUS_SEALED,
}, nil).Times(1)
mockedConsensus.EXPECT().QueryBucketExtraInfo(gomock.Any(), gomock.Any()).Return(&storagetypes.BucketExtraInfo{IsRateLimited: false}, nil).Times(1)
mockedConsensus.EXPECT().QuerySP(gomock.Any(), gomock.Any()).Return(&sptypes.StorageProvider{
Id: 1,
}, nil).Times(0) // the SPID query result is cached already in above tests
Expand All @@ -1053,6 +1065,7 @@ func Test_VerifyAuth_GetObject(t *testing.T) {
mockedConsensus.EXPECT().QueryBucketInfoAndObjectInfo(gomock.Any(), gomock.Any(), gomock.Any()).Return(&storagetypes.BucketInfo{}, &storagetypes.ObjectInfo{
ObjectStatus: storagetypes.OBJECT_STATUS_SEALED,
}, nil).Times(1)
mockedConsensus.EXPECT().QueryBucketExtraInfo(gomock.Any(), gomock.Any()).Return(&storagetypes.BucketExtraInfo{IsRateLimited: false}, nil).Times(1)
mockedConsensus.EXPECT().QuerySP(gomock.Any(), gomock.Any()).Return(&sptypes.StorageProvider{
Id: 1,
}, nil).Times(0) // the SPID query result is cached already in above tests
Expand Down
14 changes: 14 additions & 0 deletions modular/blocksyncer/database/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,3 +113,17 @@ func (db *DB) UpdateChargeSizeToSQL(ctx context.Context, objectID common.Hash, b
finalSql := fmt.Sprintf(sql, operation, tableName, tableName)
return finalSql, vars
}

// GetBucketByBucketName get bucket by bucket name
func (db *DB) GetBucketByBucketName(ctx context.Context, bucketName string) (*models.Bucket, error) {
var (
bucket *models.Bucket
err error
)
err = db.Db.WithContext(ctx).Table((&models.Bucket{}).TableName()).
Where("bucket_name = ? AND removed = ?", bucketName, false).Take(&bucket).Error
if err != nil {
return nil, err
}
return bucket, nil
}
21 changes: 21 additions & 0 deletions modular/blocksyncer/modules/bucket/bucket_handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,27 @@ var BucketEvents = map[string]bool{
EventCompleteMigrationBucket: true,
}

type OffChainStatus int

const (
OffChainStatusIsLimited OffChainStatus = 1 << iota // 1
)

// AddStatus updates the current status by adding the specified status to it.
func AddStatus(currentStatus int, statusToAdd int) int {
return currentStatus | statusToAdd
}

// RemoveStatus removes the specified status from the current status.
func RemoveStatus(currentStatus int, statusToRemove int) int {
return currentStatus &^ statusToRemove
}

// IsStatusSet checks if the specified status is set in the current status.
func IsStatusSet(currentStatus int, statusToCheck int) bool {
return currentStatus&statusToCheck == statusToCheck
}

func (m *Module) ExtractEventStatements(ctx context.Context, block *tmctypes.ResultBlock, txHash common.Hash, event sdk.Event) (map[string][]interface{}, error) {
if !BucketEvents[event.Type] {
return nil, nil
Expand Down
48 changes: 44 additions & 4 deletions modular/blocksyncer/modules/payment/payment_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,25 +4,30 @@ import (
"context"
"errors"

bucketmodule "github.com/bnb-chain/greenfield-storage-provider/modular/blocksyncer/modules/bucket"
paymenttypes "github.com/bnb-chain/greenfield/x/payment/types"
storagetypes "github.com/bnb-chain/greenfield/x/storage/types"
abci "github.com/cometbft/cometbft/abci/types"
tmctypes "github.com/cometbft/cometbft/rpc/core/types"
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/cosmos/gogoproto/proto"
"gorm.io/gorm"

"github.com/forbole/juno/v4/common"
"github.com/forbole/juno/v4/log"
"github.com/forbole/juno/v4/models"
)

var (
EventPaymentAccountUpdate = proto.MessageName(&paymenttypes.EventPaymentAccountUpdate{})
EventStreamRecordUpdate = proto.MessageName(&paymenttypes.EventStreamRecordUpdate{})
EventPaymentAccountUpdate = proto.MessageName(&paymenttypes.EventPaymentAccountUpdate{})
EventStreamRecordUpdate = proto.MessageName(&paymenttypes.EventStreamRecordUpdate{})
EventBucketFlowRateLimitStatus = proto.MessageName(&storagetypes.EventBucketFlowRateLimitStatus{})
)

var PaymentEvents = map[string]bool{
EventPaymentAccountUpdate: true,
EventStreamRecordUpdate: true,
EventPaymentAccountUpdate: true,
EventStreamRecordUpdate: true,
EventBucketFlowRateLimitStatus: true,
}

func (m *Module) ExtractEventStatements(ctx context.Context, block *tmctypes.ResultBlock, txHash common.Hash, event sdk.Event) (map[string][]interface{}, error) {
Expand Down Expand Up @@ -51,6 +56,13 @@ func (m *Module) ExtractEventStatements(ctx context.Context, block *tmctypes.Res
return nil, errors.New("update stream record event assert error")
}
return m.handleEventStreamRecordUpdate(ctx, streamRecordUpdate), nil
case EventBucketFlowRateLimitStatus:
bucketFlowRateLimitStatus, ok := typedEvent.(*storagetypes.EventBucketFlowRateLimitStatus)
if !ok {
log.Errorw("type assert error", "type", "EventBucketFlowRateLimitStatus", "event", typedEvent)
return nil, errors.New("bucket flow rate limit status event assert error")
}
return m.handleEventBucketFlowRateLimitStatus(ctx, block, txHash, bucketFlowRateLimitStatus), nil
}
return nil, nil
}
Expand Down Expand Up @@ -92,3 +104,31 @@ func (m *Module) handleEventStreamRecordUpdate(ctx context.Context, streamRecord
k: v,
}
}

func (m *Module) handleEventBucketFlowRateLimitStatus(ctx context.Context, block *tmctypes.ResultBlock, txHash common.Hash, bucketFlowRateLimitStatus *storagetypes.EventBucketFlowRateLimitStatus) map[string][]interface{} {
bucket, err := m.db.GetBucketByBucketName(ctx, bucketFlowRateLimitStatus.BucketName)
if err != nil {
if err == gorm.ErrRecordNotFound {
return nil
}
}
var offChainStatus int
if bucketFlowRateLimitStatus.IsLimited {
offChainStatus = bucketmodule.AddStatus(bucket.OffChainStatus, int(bucketmodule.OffChainStatusIsLimited))
} else {
offChainStatus = bucketmodule.RemoveStatus(bucket.OffChainStatus, int(bucketmodule.OffChainStatusIsLimited))
}
bucketStatus := &models.Bucket{
BucketName: bucketFlowRateLimitStatus.BucketName,
OffChainStatus: offChainStatus,

UpdateAt: block.Block.Height,
UpdateTxHash: txHash,
UpdateTime: block.Block.Time.UTC().Unix(),
}

k, v := m.db.UpdateBucketByNameToSQL(ctx, bucketStatus)
return map[string][]interface{}{
k: v,
}
}
Loading

0 comments on commit a4c774b

Please sign in to comment.