From 84f46db5f1e32cae27286d4588ea2d8ef4c2cc6a Mon Sep 17 00:00:00 2001 From: LexLuthr <88259624+LexLuthr@users.noreply.github.com> Date: Thu, 22 Aug 2024 19:31:21 +0400 Subject: [PATCH] fix: remove sectorBlocks package from Boost (#1955) * remove sectorBlocks package from Boost * fix expect * fix mocks --- node/builder.go | 5 +---- node/modules/directdeals.go | 5 ++--- node/modules/storageminer.go | 5 ++--- storagemarket/provider.go | 9 ++++----- storagemarket/smtestutil/mocks.go | 24 ++++++++++++------------ storagemarket/types/mock_types/mocks.go | 20 ++++++++++---------- storagemarket/types/types.go | 3 ++- 7 files changed, 33 insertions(+), 38 deletions(-) diff --git a/node/builder.go b/node/builder.go index a7665ba86..1883e87b6 100644 --- a/node/builder.go +++ b/node/builder.go @@ -63,7 +63,6 @@ import ( "github.com/filecoin-project/lotus/storage/ctladdr" "github.com/filecoin-project/lotus/storage/paths" "github.com/filecoin-project/lotus/storage/sealer" - "github.com/filecoin-project/lotus/storage/sectorblocks" "github.com/filecoin-project/lotus/system" logging "github.com/ipfs/go-log/v2" "github.com/ipfs/go-metrics-interface" @@ -461,9 +460,7 @@ func ConfigBoost(cfg *config.Boost) Option { })), // Sector API - Override(new(sectorblocks.SectorBuilder), From(new(lotus_modules.MinerStorageService))), - - Override(new(*sectorblocks.SectorBlocks), sectorblocks.NewSectorBlocks), + Override(new(smtypes.PieceAdder), From(new(lotus_modules.MinerStorageService))), // Sealing Pipeline State API Override(new(sealingpipeline.API), From(new(lotus_modules.MinerStorageService))), diff --git a/node/modules/directdeals.go b/node/modules/directdeals.go index 63032f24f..2094a555d 100644 --- a/node/modules/directdeals.go +++ b/node/modules/directdeals.go @@ -18,14 +18,13 @@ import ( "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/lotus/api/v1api" - "github.com/filecoin-project/lotus/storage/sectorblocks" "github.com/libp2p/go-libp2p/core/host" "go.uber.org/fx" ) -func NewDirectDealsProvider(provAddr address.Address, cfg *config.Boost) func(lc fx.Lifecycle, h host.Host, fullnodeApi v1api.FullNode, sqldb *sql.DB, directDealsDB *db.DirectDealsDB, fundMgr *fundmanager.FundManager, storageMgr *storagemanager.StorageManager, dp *storageadapter.DealPublisher, secb *sectorblocks.SectorBlocks, commpc types.CommpCalculator, commpt storagemarket.CommpThrottle, sps sealingpipeline.API, df dtypes.StorageDealFilter, logsSqlDB *LogSqlDB, logsDB *db.LogsDB, piecedirectory *piecedirectory.PieceDirectory, ip *indexprovider.Wrapper, cdm *storagemarket.ChainDealManager) (*storagemarket.DirectDealsProvider, error) { +func NewDirectDealsProvider(provAddr address.Address, cfg *config.Boost) func(lc fx.Lifecycle, h host.Host, fullnodeApi v1api.FullNode, sqldb *sql.DB, directDealsDB *db.DirectDealsDB, fundMgr *fundmanager.FundManager, storageMgr *storagemanager.StorageManager, dp *storageadapter.DealPublisher, secb types.PieceAdder, commpc types.CommpCalculator, commpt storagemarket.CommpThrottle, sps sealingpipeline.API, df dtypes.StorageDealFilter, logsSqlDB *LogSqlDB, logsDB *db.LogsDB, piecedirectory *piecedirectory.PieceDirectory, ip *indexprovider.Wrapper, cdm *storagemarket.ChainDealManager) (*storagemarket.DirectDealsProvider, error) { return func(lc fx.Lifecycle, h host.Host, fullnodeApi v1api.FullNode, sqldb *sql.DB, directDealsDB *db.DirectDealsDB, - fundMgr *fundmanager.FundManager, storageMgr *storagemanager.StorageManager, dp *storageadapter.DealPublisher, secb *sectorblocks.SectorBlocks, + fundMgr *fundmanager.FundManager, storageMgr *storagemanager.StorageManager, dp *storageadapter.DealPublisher, secb types.PieceAdder, commpc types.CommpCalculator, commpt storagemarket.CommpThrottle, sps sealingpipeline.API, df dtypes.StorageDealFilter, logsSqlDB *LogSqlDB, logsDB *db.LogsDB, piecedirectory *piecedirectory.PieceDirectory, ip *indexprovider.Wrapper, cdm *storagemarket.ChainDealManager) (*storagemarket.DirectDealsProvider, error) { diff --git a/node/modules/storageminer.go b/node/modules/storageminer.go index 445411230..80a1f606e 100644 --- a/node/modules/storageminer.go +++ b/node/modules/storageminer.go @@ -52,7 +52,6 @@ import ( "github.com/filecoin-project/lotus/node/modules/helpers" "github.com/filecoin-project/lotus/node/repo" lotus_repo "github.com/filecoin-project/lotus/node/repo" - "github.com/filecoin-project/lotus/storage/sectorblocks" "github.com/ipfs/go-cid" "github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore/namespace" @@ -469,9 +468,9 @@ func NewLegacyDealsManager(lc fx.Lifecycle, legacyFSM fsm.Group) legacy.LegacyDe return mgr } -func NewStorageMarketProvider(provAddr address.Address, cfg *config.Boost) func(lc fx.Lifecycle, h host.Host, a v1api.FullNode, sqldb *sql.DB, dealsDB *db.DealsDB, fundMgr *fundmanager.FundManager, storageMgr *storagemanager.StorageManager, sask storedask.StoredAsk, dp *storageadapter.DealPublisher, secb *sectorblocks.SectorBlocks, commpc types.CommpCalculator, commpt storagemarket.CommpThrottle, sps sealingpipeline.API, df dtypes.StorageDealFilter, logsSqlDB *LogSqlDB, logsDB *db.LogsDB, piecedirectory *piecedirectory.PieceDirectory, ip *indexprovider.Wrapper, cdm *storagemarket.ChainDealManager) (*storagemarket.Provider, error) { +func NewStorageMarketProvider(provAddr address.Address, cfg *config.Boost) func(lc fx.Lifecycle, h host.Host, a v1api.FullNode, sqldb *sql.DB, dealsDB *db.DealsDB, fundMgr *fundmanager.FundManager, storageMgr *storagemanager.StorageManager, sask storedask.StoredAsk, dp *storageadapter.DealPublisher, secb types.PieceAdder, commpc types.CommpCalculator, commpt storagemarket.CommpThrottle, sps sealingpipeline.API, df dtypes.StorageDealFilter, logsSqlDB *LogSqlDB, logsDB *db.LogsDB, piecedirectory *piecedirectory.PieceDirectory, ip *indexprovider.Wrapper, cdm *storagemarket.ChainDealManager) (*storagemarket.Provider, error) { return func(lc fx.Lifecycle, h host.Host, a v1api.FullNode, sqldb *sql.DB, dealsDB *db.DealsDB, - fundMgr *fundmanager.FundManager, storageMgr *storagemanager.StorageManager, sask storedask.StoredAsk, dp *storageadapter.DealPublisher, secb *sectorblocks.SectorBlocks, + fundMgr *fundmanager.FundManager, storageMgr *storagemanager.StorageManager, sask storedask.StoredAsk, dp *storageadapter.DealPublisher, secb types.PieceAdder, commpc types.CommpCalculator, commpt storagemarket.CommpThrottle, sps sealingpipeline.API, df dtypes.StorageDealFilter, logsSqlDB *LogSqlDB, logsDB *db.LogsDB, piecedirectory *piecedirectory.PieceDirectory, ip *indexprovider.Wrapper, cdm *storagemarket.ChainDealManager) (*storagemarket.Provider, error) { diff --git a/storagemarket/provider.go b/storagemarket/provider.go index 6898c4cf1..5e09fee1c 100644 --- a/storagemarket/provider.go +++ b/storagemarket/provider.go @@ -30,7 +30,6 @@ import ( "github.com/filecoin-project/dagstore" "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-state-types/abi" - lapi "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/api/v1api" sealing "github.com/filecoin-project/lotus/storage/pipeline" "github.com/filecoin-project/lotus/storage/pipeline/piece" @@ -684,8 +683,8 @@ func (p *Provider) AddPieceToSector(ctx context.Context, deal smtypes.ProviderDe }, nil } -func addPieceWithRetry(ctx context.Context, pieceAdder smtypes.PieceAdder, pieceSize abi.UnpaddedPieceSize, pieceData io.Reader, sdInfo lapi.PieceDealInfo) (abi.SectorNumber, abi.PaddedPieceSize, error) { - sectorNum, offset, err := pieceAdder.AddPiece(ctx, pieceSize, pieceData, sdInfo) +func addPieceWithRetry(ctx context.Context, pieceAdder smtypes.PieceAdder, pieceSize abi.UnpaddedPieceSize, pieceData io.Reader, sdInfo piece.PieceDealInfo) (abi.SectorNumber, abi.PaddedPieceSize, error) { + info, err := pieceAdder.SectorAddPieceToAny(ctx, pieceSize, pieceData, sdInfo) curTime := build.Clock.Now() for err != nil && build.Clock.Since(curTime) < addPieceRetryTimeout { // Check if the error was because there are too many sectors sealing @@ -697,10 +696,10 @@ func addPieceWithRetry(ctx context.Context, pieceAdder smtypes.PieceAdder, piece // There are too many sectors sealing, back off for a while then try again select { case <-build.Clock.After(addPieceRetryWait): - sectorNum, offset, err = pieceAdder.AddPiece(ctx, pieceSize, pieceData, sdInfo) + info, err = pieceAdder.SectorAddPieceToAny(ctx, pieceSize, pieceData, sdInfo) case <-ctx.Done(): return 0, 0, fmt.Errorf("shutdown while adding piece") } } - return sectorNum, offset, err + return info.Sector, info.Offset, err } diff --git a/storagemarket/smtestutil/mocks.go b/storagemarket/smtestutil/mocks.go index f48dc437c..92c84562f 100644 --- a/storagemarket/smtestutil/mocks.go +++ b/storagemarket/smtestutil/mocks.go @@ -9,6 +9,7 @@ import ( "sync" "github.com/filecoin-project/go-address" + "github.com/filecoin-project/lotus/storage/pipeline/piece" pdtypes "github.com/filecoin-project/boost/piecedirectory/types" mock_piecedirectory "github.com/filecoin-project/boost/piecedirectory/types/mocks" @@ -18,7 +19,6 @@ import ( "github.com/filecoin-project/boost/testutil" "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/builtin/v9/market" - "github.com/filecoin-project/lotus/api" lapi "github.com/filecoin-project/lotus/api" sealing "github.com/filecoin-project/lotus/storage/pipeline" "github.com/golang/mock/gomock" @@ -138,7 +138,7 @@ func (mb *MinerStubBuilder) SetupNoOp() *MinerStubBuilder { }, nil }).AnyTimes() - mb.stub.MockPieceAdder.EXPECT().AddPiece(gomock.Any(), gomock.Eq(mb.dp.ClientDealProposal.Proposal.PieceSize.Unpadded()), gomock.Any(), gomock.Any()).DoAndReturn(func(_ context.Context, _ abi.UnpaddedPieceSize, r io.Reader, _ api.PieceDealInfo) (abi.SectorNumber, abi.PaddedPieceSize, error) { + mb.stub.MockPieceAdder.EXPECT().SectorAddPieceToAny(gomock.Any(), gomock.Eq(mb.dp.ClientDealProposal.Proposal.PieceSize.Unpadded()), gomock.Any(), gomock.Any()).DoAndReturn(func(_ context.Context, _ abi.UnpaddedPieceSize, r io.Reader, _ piece.PieceDealInfo) (abi.SectorNumber, abi.PaddedPieceSize, error) { return mb.sectorId, mb.offset, nil }).AnyTimes() @@ -281,11 +281,11 @@ func (mb *MinerStubBuilder) SetupAddPiece(blocking bool) *MinerStubBuilder { } mb.stub.lk.Unlock() - sdInfo := lapi.PieceDealInfo{ + sdInfo := piece.PieceDealInfo{ DealID: mb.dealId, DealProposal: &mb.dp.ClientDealProposal.Proposal, PublishCid: &mb.finalPublishCid, - DealSchedule: lapi.DealSchedule{ + DealSchedule: piece.DealSchedule{ StartEpoch: mb.dp.ClientDealProposal.Proposal.StartEpoch, EndEpoch: mb.dp.ClientDealProposal.Proposal.EndEpoch, }, @@ -293,25 +293,25 @@ func (mb *MinerStubBuilder) SetupAddPiece(blocking bool) *MinerStubBuilder { } var readBytes []byte - mb.stub.MockPieceAdder.EXPECT().AddPiece(gomock.Any(), gomock.Eq(mb.dp.ClientDealProposal.Proposal.PieceSize.Unpadded()), gomock.Any(), gomock.Eq(sdInfo)).DoAndReturn(func(ctx context.Context, _ abi.UnpaddedPieceSize, r io.Reader, _ api.PieceDealInfo) (abi.SectorNumber, abi.PaddedPieceSize, error) { + mb.stub.MockPieceAdder.EXPECT().SectorAddPieceToAny(gomock.Any(), gomock.Eq(mb.dp.ClientDealProposal.Proposal.PieceSize.Unpadded()), gomock.Any(), gomock.Eq(sdInfo)).DoAndReturn(func(ctx context.Context, _ abi.UnpaddedPieceSize, r io.Reader, _ piece.PieceDealInfo) (lapi.SectorOffset, error) { mb.stub.lk.Lock() ch := mb.stub.unblockAddPiece[mb.dp.DealUUID] mb.stub.lk.Unlock() if ch != nil { select { case <-ctx.Done(): - return abi.SectorNumber(0), abi.PaddedPieceSize(0), ctx.Err() + return lapi.SectorOffset{Sector: abi.SectorNumber(0), Offset: abi.PaddedPieceSize(0)}, ctx.Err() case <-ch: } } if ctx.Err() != nil { - return abi.SectorNumber(0), abi.PaddedPieceSize(0), ctx.Err() + return lapi.SectorOffset{Sector: abi.SectorNumber(0), Offset: abi.PaddedPieceSize(0)}, ctx.Err() } var err error readBytes, err = io.ReadAll(r) - return mb.sectorId, mb.offset, err + return lapi.SectorOffset{Sector: mb.sectorId, Offset: mb.offset}, err }) mb.rb = &readBytes @@ -319,19 +319,19 @@ func (mb *MinerStubBuilder) SetupAddPiece(blocking bool) *MinerStubBuilder { } func (mb *MinerStubBuilder) SetupAddPieceFailure(err error) { - sdInfo := lapi.PieceDealInfo{ + sdInfo := piece.PieceDealInfo{ DealID: mb.dealId, DealProposal: &mb.dp.ClientDealProposal.Proposal, PublishCid: &mb.finalPublishCid, - DealSchedule: lapi.DealSchedule{ + DealSchedule: piece.DealSchedule{ StartEpoch: mb.dp.ClientDealProposal.Proposal.StartEpoch, EndEpoch: mb.dp.ClientDealProposal.Proposal.EndEpoch, }, KeepUnsealed: !mb.dp.RemoveUnsealedCopy, } - mb.stub.MockPieceAdder.EXPECT().AddPiece(gomock.Any(), gomock.Eq(mb.dp.ClientDealProposal.Proposal.PieceSize.Unpadded()), gomock.Any(), gomock.Eq(sdInfo)).DoAndReturn(func(_ context.Context, _ abi.UnpaddedPieceSize, r io.Reader, _ api.PieceDealInfo) (abi.SectorNumber, abi.PaddedPieceSize, error) { - return abi.SectorNumber(0), abi.PaddedPieceSize(0), err + mb.stub.MockPieceAdder.EXPECT().SectorAddPieceToAny(gomock.Any(), gomock.Eq(mb.dp.ClientDealProposal.Proposal.PieceSize.Unpadded()), gomock.Any(), gomock.Eq(sdInfo)).DoAndReturn(func(_ context.Context, _ abi.UnpaddedPieceSize, r io.Reader, _ piece.PieceDealInfo) (lapi.SectorOffset, error) { + return lapi.SectorOffset{Sector: abi.SectorNumber(0), Offset: abi.PaddedPieceSize(0)}, err }) } diff --git a/storagemarket/types/mock_types/mocks.go b/storagemarket/types/mock_types/mocks.go index 362d58954..9bb73d5ce 100644 --- a/storagemarket/types/mock_types/mocks.go +++ b/storagemarket/types/mock_types/mocks.go @@ -12,6 +12,7 @@ import ( types "github.com/filecoin-project/boost/storagemarket/types" abi "github.com/filecoin-project/go-state-types/abi" market "github.com/filecoin-project/go-state-types/builtin/v9/market" + api "github.com/filecoin-project/lotus/api" piece "github.com/filecoin-project/lotus/storage/pipeline/piece" gomock "github.com/golang/mock/gomock" cid "github.com/ipfs/go-cid" @@ -40,20 +41,19 @@ func (m *MockPieceAdder) EXPECT() *MockPieceAdderMockRecorder { return m.recorder } -// AddPiece mocks base method. -func (m *MockPieceAdder) AddPiece(arg0 context.Context, arg1 abi.UnpaddedPieceSize, arg2 io.Reader, arg3 piece.PieceDealInfo) (abi.SectorNumber, abi.PaddedPieceSize, error) { +// SectorAddPieceToAny mocks base method. +func (m *MockPieceAdder) SectorAddPieceToAny(arg0 context.Context, arg1 abi.UnpaddedPieceSize, arg2 io.Reader, arg3 piece.PieceDealInfo) (api.SectorOffset, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "AddPiece", arg0, arg1, arg2, arg3) - ret0, _ := ret[0].(abi.SectorNumber) - ret1, _ := ret[1].(abi.PaddedPieceSize) - ret2, _ := ret[2].(error) - return ret0, ret1, ret2 + ret := m.ctrl.Call(m, "SectorAddPieceToAny", arg0, arg1, arg2, arg3) + ret0, _ := ret[0].(api.SectorOffset) + ret1, _ := ret[1].(error) + return ret0, ret1 } -// AddPiece indicates an expected call of AddPiece. -func (mr *MockPieceAdderMockRecorder) AddPiece(arg0, arg1, arg2, arg3 interface{}) *gomock.Call { +// SectorAddPieceToAny indicates an expected call of SectorAddPieceToAny. +func (mr *MockPieceAdderMockRecorder) SectorAddPieceToAny(arg0, arg1, arg2, arg3 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddPiece", reflect.TypeOf((*MockPieceAdder)(nil).AddPiece), arg0, arg1, arg2, arg3) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SectorAddPieceToAny", reflect.TypeOf((*MockPieceAdder)(nil).SectorAddPieceToAny), arg0, arg1, arg2, arg3) } // MockCommpCalculator is a mock of CommpCalculator interface. diff --git a/storagemarket/types/types.go b/storagemarket/types/types.go index 98bc0da47..6ecf85aa0 100644 --- a/storagemarket/types/types.go +++ b/storagemarket/types/types.go @@ -16,6 +16,7 @@ import ( "github.com/filecoin-project/go-state-types/builtin/v9/verifreg" "github.com/filecoin-project/go-state-types/crypto" "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/storage/pipeline/piece" "github.com/filecoin-project/lotus/storage/sealer/storiface" "github.com/google/uuid" "github.com/ipfs/go-cid" @@ -156,7 +157,7 @@ type DealResponse struct { } type PieceAdder interface { - AddPiece(ctx context.Context, size abi.UnpaddedPieceSize, r io.Reader, d api.PieceDealInfo) (abi.SectorNumber, abi.PaddedPieceSize, error) + SectorAddPieceToAny(ctx context.Context, size abi.UnpaddedPieceSize, r io.Reader, d piece.PieceDealInfo) (api.SectorOffset, error) } type CommpCalculator interface {