Skip to content

Commit

Permalink
fix: remove sectorBlocks package from Boost (#1955)
Browse files Browse the repository at this point in the history
* remove sectorBlocks package from Boost

* fix expect

* fix mocks
  • Loading branch information
LexLuthr authored Aug 22, 2024
1 parent a370034 commit 84f46db
Show file tree
Hide file tree
Showing 7 changed files with 33 additions and 38 deletions.
5 changes: 1 addition & 4 deletions node/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ import (
"github.com/filecoin-project/lotus/storage/ctladdr"
"github.com/filecoin-project/lotus/storage/paths"
"github.com/filecoin-project/lotus/storage/sealer"
"github.com/filecoin-project/lotus/storage/sectorblocks"
"github.com/filecoin-project/lotus/system"
logging "github.com/ipfs/go-log/v2"
"github.com/ipfs/go-metrics-interface"
Expand Down Expand Up @@ -461,9 +460,7 @@ func ConfigBoost(cfg *config.Boost) Option {
})),

// Sector API
Override(new(sectorblocks.SectorBuilder), From(new(lotus_modules.MinerStorageService))),

Override(new(*sectorblocks.SectorBlocks), sectorblocks.NewSectorBlocks),
Override(new(smtypes.PieceAdder), From(new(lotus_modules.MinerStorageService))),

// Sealing Pipeline State API
Override(new(sealingpipeline.API), From(new(lotus_modules.MinerStorageService))),
Expand Down
5 changes: 2 additions & 3 deletions node/modules/directdeals.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,13 @@ import (
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/api/v1api"
"github.com/filecoin-project/lotus/storage/sectorblocks"
"github.com/libp2p/go-libp2p/core/host"
"go.uber.org/fx"
)

func NewDirectDealsProvider(provAddr address.Address, cfg *config.Boost) func(lc fx.Lifecycle, h host.Host, fullnodeApi v1api.FullNode, sqldb *sql.DB, directDealsDB *db.DirectDealsDB, fundMgr *fundmanager.FundManager, storageMgr *storagemanager.StorageManager, dp *storageadapter.DealPublisher, secb *sectorblocks.SectorBlocks, commpc types.CommpCalculator, commpt storagemarket.CommpThrottle, sps sealingpipeline.API, df dtypes.StorageDealFilter, logsSqlDB *LogSqlDB, logsDB *db.LogsDB, piecedirectory *piecedirectory.PieceDirectory, ip *indexprovider.Wrapper, cdm *storagemarket.ChainDealManager) (*storagemarket.DirectDealsProvider, error) {
func NewDirectDealsProvider(provAddr address.Address, cfg *config.Boost) func(lc fx.Lifecycle, h host.Host, fullnodeApi v1api.FullNode, sqldb *sql.DB, directDealsDB *db.DirectDealsDB, fundMgr *fundmanager.FundManager, storageMgr *storagemanager.StorageManager, dp *storageadapter.DealPublisher, secb types.PieceAdder, commpc types.CommpCalculator, commpt storagemarket.CommpThrottle, sps sealingpipeline.API, df dtypes.StorageDealFilter, logsSqlDB *LogSqlDB, logsDB *db.LogsDB, piecedirectory *piecedirectory.PieceDirectory, ip *indexprovider.Wrapper, cdm *storagemarket.ChainDealManager) (*storagemarket.DirectDealsProvider, error) {
return func(lc fx.Lifecycle, h host.Host, fullnodeApi v1api.FullNode, sqldb *sql.DB, directDealsDB *db.DirectDealsDB,
fundMgr *fundmanager.FundManager, storageMgr *storagemanager.StorageManager, dp *storageadapter.DealPublisher, secb *sectorblocks.SectorBlocks,
fundMgr *fundmanager.FundManager, storageMgr *storagemanager.StorageManager, dp *storageadapter.DealPublisher, secb types.PieceAdder,
commpc types.CommpCalculator, commpt storagemarket.CommpThrottle, sps sealingpipeline.API,
df dtypes.StorageDealFilter, logsSqlDB *LogSqlDB, logsDB *db.LogsDB,
piecedirectory *piecedirectory.PieceDirectory, ip *indexprovider.Wrapper, cdm *storagemarket.ChainDealManager) (*storagemarket.DirectDealsProvider, error) {
Expand Down
5 changes: 2 additions & 3 deletions node/modules/storageminer.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ import (
"github.com/filecoin-project/lotus/node/modules/helpers"
"github.com/filecoin-project/lotus/node/repo"
lotus_repo "github.com/filecoin-project/lotus/node/repo"
"github.com/filecoin-project/lotus/storage/sectorblocks"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
Expand Down Expand Up @@ -469,9 +468,9 @@ func NewLegacyDealsManager(lc fx.Lifecycle, legacyFSM fsm.Group) legacy.LegacyDe
return mgr
}

func NewStorageMarketProvider(provAddr address.Address, cfg *config.Boost) func(lc fx.Lifecycle, h host.Host, a v1api.FullNode, sqldb *sql.DB, dealsDB *db.DealsDB, fundMgr *fundmanager.FundManager, storageMgr *storagemanager.StorageManager, sask storedask.StoredAsk, dp *storageadapter.DealPublisher, secb *sectorblocks.SectorBlocks, commpc types.CommpCalculator, commpt storagemarket.CommpThrottle, sps sealingpipeline.API, df dtypes.StorageDealFilter, logsSqlDB *LogSqlDB, logsDB *db.LogsDB, piecedirectory *piecedirectory.PieceDirectory, ip *indexprovider.Wrapper, cdm *storagemarket.ChainDealManager) (*storagemarket.Provider, error) {
func NewStorageMarketProvider(provAddr address.Address, cfg *config.Boost) func(lc fx.Lifecycle, h host.Host, a v1api.FullNode, sqldb *sql.DB, dealsDB *db.DealsDB, fundMgr *fundmanager.FundManager, storageMgr *storagemanager.StorageManager, sask storedask.StoredAsk, dp *storageadapter.DealPublisher, secb types.PieceAdder, commpc types.CommpCalculator, commpt storagemarket.CommpThrottle, sps sealingpipeline.API, df dtypes.StorageDealFilter, logsSqlDB *LogSqlDB, logsDB *db.LogsDB, piecedirectory *piecedirectory.PieceDirectory, ip *indexprovider.Wrapper, cdm *storagemarket.ChainDealManager) (*storagemarket.Provider, error) {
return func(lc fx.Lifecycle, h host.Host, a v1api.FullNode, sqldb *sql.DB, dealsDB *db.DealsDB,
fundMgr *fundmanager.FundManager, storageMgr *storagemanager.StorageManager, sask storedask.StoredAsk, dp *storageadapter.DealPublisher, secb *sectorblocks.SectorBlocks,
fundMgr *fundmanager.FundManager, storageMgr *storagemanager.StorageManager, sask storedask.StoredAsk, dp *storageadapter.DealPublisher, secb types.PieceAdder,
commpc types.CommpCalculator, commpt storagemarket.CommpThrottle, sps sealingpipeline.API,
df dtypes.StorageDealFilter, logsSqlDB *LogSqlDB, logsDB *db.LogsDB,
piecedirectory *piecedirectory.PieceDirectory, ip *indexprovider.Wrapper, cdm *storagemarket.ChainDealManager) (*storagemarket.Provider, error) {
Expand Down
9 changes: 4 additions & 5 deletions storagemarket/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
"github.com/filecoin-project/dagstore"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
lapi "github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/api/v1api"
sealing "github.com/filecoin-project/lotus/storage/pipeline"
"github.com/filecoin-project/lotus/storage/pipeline/piece"
Expand Down Expand Up @@ -684,8 +683,8 @@ func (p *Provider) AddPieceToSector(ctx context.Context, deal smtypes.ProviderDe
}, nil
}

func addPieceWithRetry(ctx context.Context, pieceAdder smtypes.PieceAdder, pieceSize abi.UnpaddedPieceSize, pieceData io.Reader, sdInfo lapi.PieceDealInfo) (abi.SectorNumber, abi.PaddedPieceSize, error) {
sectorNum, offset, err := pieceAdder.AddPiece(ctx, pieceSize, pieceData, sdInfo)
func addPieceWithRetry(ctx context.Context, pieceAdder smtypes.PieceAdder, pieceSize abi.UnpaddedPieceSize, pieceData io.Reader, sdInfo piece.PieceDealInfo) (abi.SectorNumber, abi.PaddedPieceSize, error) {
info, err := pieceAdder.SectorAddPieceToAny(ctx, pieceSize, pieceData, sdInfo)
curTime := build.Clock.Now()
for err != nil && build.Clock.Since(curTime) < addPieceRetryTimeout {
// Check if the error was because there are too many sectors sealing
Expand All @@ -697,10 +696,10 @@ func addPieceWithRetry(ctx context.Context, pieceAdder smtypes.PieceAdder, piece
// There are too many sectors sealing, back off for a while then try again
select {
case <-build.Clock.After(addPieceRetryWait):
sectorNum, offset, err = pieceAdder.AddPiece(ctx, pieceSize, pieceData, sdInfo)
info, err = pieceAdder.SectorAddPieceToAny(ctx, pieceSize, pieceData, sdInfo)
case <-ctx.Done():
return 0, 0, fmt.Errorf("shutdown while adding piece")
}
}
return sectorNum, offset, err
return info.Sector, info.Offset, err
}
24 changes: 12 additions & 12 deletions storagemarket/smtestutil/mocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"sync"

"github.com/filecoin-project/go-address"
"github.com/filecoin-project/lotus/storage/pipeline/piece"

pdtypes "github.com/filecoin-project/boost/piecedirectory/types"
mock_piecedirectory "github.com/filecoin-project/boost/piecedirectory/types/mocks"
Expand All @@ -18,7 +19,6 @@ import (
"github.com/filecoin-project/boost/testutil"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/builtin/v9/market"
"github.com/filecoin-project/lotus/api"
lapi "github.com/filecoin-project/lotus/api"
sealing "github.com/filecoin-project/lotus/storage/pipeline"
"github.com/golang/mock/gomock"
Expand Down Expand Up @@ -138,7 +138,7 @@ func (mb *MinerStubBuilder) SetupNoOp() *MinerStubBuilder {
}, nil
}).AnyTimes()

mb.stub.MockPieceAdder.EXPECT().AddPiece(gomock.Any(), gomock.Eq(mb.dp.ClientDealProposal.Proposal.PieceSize.Unpadded()), gomock.Any(), gomock.Any()).DoAndReturn(func(_ context.Context, _ abi.UnpaddedPieceSize, r io.Reader, _ api.PieceDealInfo) (abi.SectorNumber, abi.PaddedPieceSize, error) {
mb.stub.MockPieceAdder.EXPECT().SectorAddPieceToAny(gomock.Any(), gomock.Eq(mb.dp.ClientDealProposal.Proposal.PieceSize.Unpadded()), gomock.Any(), gomock.Any()).DoAndReturn(func(_ context.Context, _ abi.UnpaddedPieceSize, r io.Reader, _ piece.PieceDealInfo) (abi.SectorNumber, abi.PaddedPieceSize, error) {
return mb.sectorId, mb.offset, nil
}).AnyTimes()

Expand Down Expand Up @@ -281,57 +281,57 @@ func (mb *MinerStubBuilder) SetupAddPiece(blocking bool) *MinerStubBuilder {
}
mb.stub.lk.Unlock()

sdInfo := lapi.PieceDealInfo{
sdInfo := piece.PieceDealInfo{
DealID: mb.dealId,
DealProposal: &mb.dp.ClientDealProposal.Proposal,
PublishCid: &mb.finalPublishCid,
DealSchedule: lapi.DealSchedule{
DealSchedule: piece.DealSchedule{
StartEpoch: mb.dp.ClientDealProposal.Proposal.StartEpoch,
EndEpoch: mb.dp.ClientDealProposal.Proposal.EndEpoch,
},
KeepUnsealed: !mb.dp.RemoveUnsealedCopy,
}

var readBytes []byte
mb.stub.MockPieceAdder.EXPECT().AddPiece(gomock.Any(), gomock.Eq(mb.dp.ClientDealProposal.Proposal.PieceSize.Unpadded()), gomock.Any(), gomock.Eq(sdInfo)).DoAndReturn(func(ctx context.Context, _ abi.UnpaddedPieceSize, r io.Reader, _ api.PieceDealInfo) (abi.SectorNumber, abi.PaddedPieceSize, error) {
mb.stub.MockPieceAdder.EXPECT().SectorAddPieceToAny(gomock.Any(), gomock.Eq(mb.dp.ClientDealProposal.Proposal.PieceSize.Unpadded()), gomock.Any(), gomock.Eq(sdInfo)).DoAndReturn(func(ctx context.Context, _ abi.UnpaddedPieceSize, r io.Reader, _ piece.PieceDealInfo) (lapi.SectorOffset, error) {
mb.stub.lk.Lock()
ch := mb.stub.unblockAddPiece[mb.dp.DealUUID]
mb.stub.lk.Unlock()
if ch != nil {
select {
case <-ctx.Done():
return abi.SectorNumber(0), abi.PaddedPieceSize(0), ctx.Err()
return lapi.SectorOffset{Sector: abi.SectorNumber(0), Offset: abi.PaddedPieceSize(0)}, ctx.Err()
case <-ch:
}

}
if ctx.Err() != nil {
return abi.SectorNumber(0), abi.PaddedPieceSize(0), ctx.Err()
return lapi.SectorOffset{Sector: abi.SectorNumber(0), Offset: abi.PaddedPieceSize(0)}, ctx.Err()
}

var err error
readBytes, err = io.ReadAll(r)
return mb.sectorId, mb.offset, err
return lapi.SectorOffset{Sector: mb.sectorId, Offset: mb.offset}, err
})

mb.rb = &readBytes
return mb
}

func (mb *MinerStubBuilder) SetupAddPieceFailure(err error) {
sdInfo := lapi.PieceDealInfo{
sdInfo := piece.PieceDealInfo{
DealID: mb.dealId,
DealProposal: &mb.dp.ClientDealProposal.Proposal,
PublishCid: &mb.finalPublishCid,
DealSchedule: lapi.DealSchedule{
DealSchedule: piece.DealSchedule{
StartEpoch: mb.dp.ClientDealProposal.Proposal.StartEpoch,
EndEpoch: mb.dp.ClientDealProposal.Proposal.EndEpoch,
},
KeepUnsealed: !mb.dp.RemoveUnsealedCopy,
}

mb.stub.MockPieceAdder.EXPECT().AddPiece(gomock.Any(), gomock.Eq(mb.dp.ClientDealProposal.Proposal.PieceSize.Unpadded()), gomock.Any(), gomock.Eq(sdInfo)).DoAndReturn(func(_ context.Context, _ abi.UnpaddedPieceSize, r io.Reader, _ api.PieceDealInfo) (abi.SectorNumber, abi.PaddedPieceSize, error) {
return abi.SectorNumber(0), abi.PaddedPieceSize(0), err
mb.stub.MockPieceAdder.EXPECT().SectorAddPieceToAny(gomock.Any(), gomock.Eq(mb.dp.ClientDealProposal.Proposal.PieceSize.Unpadded()), gomock.Any(), gomock.Eq(sdInfo)).DoAndReturn(func(_ context.Context, _ abi.UnpaddedPieceSize, r io.Reader, _ piece.PieceDealInfo) (lapi.SectorOffset, error) {
return lapi.SectorOffset{Sector: abi.SectorNumber(0), Offset: abi.PaddedPieceSize(0)}, err
})
}

Expand Down
20 changes: 10 additions & 10 deletions storagemarket/types/mock_types/mocks.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion storagemarket/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/filecoin-project/go-state-types/builtin/v9/verifreg"
"github.com/filecoin-project/go-state-types/crypto"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/storage/pipeline/piece"
"github.com/filecoin-project/lotus/storage/sealer/storiface"
"github.com/google/uuid"
"github.com/ipfs/go-cid"
Expand Down Expand Up @@ -156,7 +157,7 @@ type DealResponse struct {
}

type PieceAdder interface {
AddPiece(ctx context.Context, size abi.UnpaddedPieceSize, r io.Reader, d api.PieceDealInfo) (abi.SectorNumber, abi.PaddedPieceSize, error)
SectorAddPieceToAny(ctx context.Context, size abi.UnpaddedPieceSize, r io.Reader, d piece.PieceDealInfo) (api.SectorOffset, error)
}

type CommpCalculator interface {
Expand Down

0 comments on commit 84f46db

Please sign in to comment.