Skip to content

Commit

Permalink
core: fix removing old transfer data with RemoveUntraceableHeaders
Browse files Browse the repository at this point in the history
Transfer data is timestamp-based, previously it always had and used headers,
no we can go via a small cache (we don't want it to grow or be stored forever).
Otherwise it's unable to do the job:

    2024-12-13T12:55:15.056+0300    ERROR   failed to find block header for transfer GC     {"time": "19.066µs", "error": "key not found"}

Signed-off-by: Roman Khimov <roman@nspcc.ru>
  • Loading branch information
roman-khimov committed Dec 13, 2024
1 parent 9b542e5 commit 642416d
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 18 deletions.
41 changes: 32 additions & 9 deletions pkg/core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"sync/atomic"
"time"

lru "github.com/hashicorp/golang-lru/v2"
json "github.com/nspcc-dev/go-ordered-json"
"github.com/nspcc-dev/neo-go/pkg/config"
"github.com/nspcc-dev/neo-go/pkg/config/limits"
Expand Down Expand Up @@ -61,6 +62,14 @@ const (
// HeaderVerificationGasLimit is the maximum amount of GAS for block header verification.
HeaderVerificationGasLimit = 3_00000000 // 3 GAS
defaultStateSyncInterval = 40000

// defaultBlockTimesCache should be sufficient for tryRunGC() to get in
// sync with storeBlock(). Most of the time they differ by some thousands of
// blocks and GC interval is more like 10K, so this is sufficient for 80K
// deviation and should be sufficient. If it's not, it's not a big issue
// either, the next cycle will still do the job (only transfers need this,
// MPT won't notice at all).
defaultBlockTimesCache = 8
)

// stateChangeStage denotes the stage of state modification process.
Expand Down Expand Up @@ -156,6 +165,11 @@ type Blockchain struct {
// Current persisted block count.
persistedHeight uint32

// Index->Timestamp cache for garbage collector. Headers can be gone
// by the time it runs, so we use a tiny little cache to sync block
// removal (performed in storeBlock()) with transfer/MPT GC (tryRunGC())
gcBlockTimes *lru.Cache[uint32, uint64]

// Stop synchronization mechanisms.
stopCh chan struct{}
runToExitCh chan struct{}
Expand Down Expand Up @@ -324,6 +338,7 @@ func NewBlockchain(s storage.Store, cfg config.Blockchain, log *zap.Logger) (*Bl
contracts: *native.NewContracts(cfg.ProtocolConfiguration),
}

bc.gcBlockTimes, _ = lru.New[uint32, uint64](defaultBlockTimesCache) // Never errors for positive size
bc.stateRoot = stateroot.NewModule(cfg, bc.VerifyWitness, bc.log, bc.dao.Store)
bc.contracts.Designate.StateRootService = bc.stateRoot

Expand Down Expand Up @@ -606,7 +621,7 @@ func (bc *Blockchain) jumpToStateInternal(p uint32, stage stateChangeStage) erro
// After current state is updated, we need to remove outdated state-related data if so.
// The only outdated data we might have is genesis-related data, so check it.
if p-bc.config.MaxTraceableBlocks > 0 {
err := cache.DeleteBlock(bc.GetHeaderHash(0), false)
_, err := cache.DeleteBlock(bc.GetHeaderHash(0), false)
if err != nil {
return fmt.Errorf("failed to remove outdated state data for the genesis block: %w", err)
}
Expand Down Expand Up @@ -800,7 +815,7 @@ func (bc *Blockchain) resetStateInternal(height uint32, stage stateChangeStage)
keysCnt = new(int)
)
for i := height + 1; i <= currHeight; i++ {
err := upperCache.DeleteBlock(bc.GetHeaderHash(i), false)
_, err := upperCache.DeleteBlock(bc.GetHeaderHash(i), false)
if err != nil {
return fmt.Errorf("error while removing block %d: %w", i, err)
}
Expand Down Expand Up @@ -1289,15 +1304,20 @@ func appendTokenTransferInfo(transferData *state.TokenTransferInfo,

func (bc *Blockchain) removeOldTransfers(index uint32) time.Duration {
bc.log.Info("starting transfer data garbage collection", zap.Uint32("index", index))
start := time.Now()
h, err := bc.GetHeader(bc.GetHeaderHash(index))
if err != nil {
var (
err error
kept int64
removed int64
start = time.Now()
ts, ok = bc.gcBlockTimes.Get(index)
)

if !ok {
dur := time.Since(start)
bc.log.Error("failed to find block header for transfer GC", zap.Duration("time", dur), zap.Error(err))
bc.log.Error("failed to get block timestamp transfer GC", zap.Duration("time", dur), zap.Uint32("index", index))
return dur
}
var removed, kept int64
var ts = h.Timestamp

prefixes := []byte{byte(storage.STNEP11Transfers), byte(storage.STNEP17Transfers)}

for i := range prefixes {
Expand Down Expand Up @@ -1623,7 +1643,10 @@ func (bc *Blockchain) storeBlock(block *block.Block, txpool *mempool.Pool) error
stop = start + 1
}
for index := start; index < stop; index++ {
err := kvcache.DeleteBlock(bc.GetHeaderHash(index), bc.config.Ledger.RemoveUntraceableHeaders)
ts, err := kvcache.DeleteBlock(bc.GetHeaderHash(index), bc.config.Ledger.RemoveUntraceableHeaders)
if bc.config.Ledger.RemoveUntraceableHeaders && index%bc.config.Ledger.GarbageCollectionPeriod == 0 {
_ = bc.gcBlockTimes.Add(index, ts)
}

Check warning on line 1649 in pkg/core/blockchain.go

View check run for this annotation

Codecov / codecov/patch

pkg/core/blockchain.go#L1648-L1649

Added lines #L1648 - L1649 were not covered by tests
if err != nil {
bc.log.Warn("error while removing old block",
zap.Uint32("index", index),
Expand Down
1 change: 1 addition & 0 deletions pkg/core/blockchain_core_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ func TestRemoveOldTransfers(t *testing.T) {

_, err = bc.dao.Persist()
require.NoError(t, err)
_ = bc.gcBlockTimes.Add(0, h.Timestamp)
_ = bc.removeOldTransfers(0)

for i := range uint32(2) {
Expand Down
15 changes: 8 additions & 7 deletions pkg/core/dao/dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -765,18 +765,19 @@ func (dao *Simple) StoreAsBlock(block *block.Block, aer1 *state.AppExecResult, a
}

// DeleteBlock removes the block from dao. It's not atomic, so make sure you're
// using private MemCached instance here.
func (dao *Simple) DeleteBlock(h util.Uint256, dropHeader bool) error {
// using private MemCached instance here. It returns block timestamp for GC
// convenience.
func (dao *Simple) DeleteBlock(h util.Uint256, dropHeader bool) (uint64, error) {
key := dao.makeExecutableKey(h)

b, err := dao.getBlock(key)
if err != nil {
return err
return 0, err

Check warning on line 775 in pkg/core/dao/dao.go

View check run for this annotation

Codecov / codecov/patch

pkg/core/dao/dao.go#L775

Added line #L775 was not covered by tests
}
if !dropHeader {
err = dao.storeHeader(key, &b.Header)
if err != nil {
return err
return 0, err
}

Check warning on line 781 in pkg/core/dao/dao.go

View check run for this annotation

Codecov / codecov/patch

pkg/core/dao/dao.go#L780-L781

Added lines #L780 - L781 were not covered by tests
} else {
dao.Store.Delete(key)
Expand All @@ -791,7 +792,7 @@ func (dao *Simple) DeleteBlock(h util.Uint256, dropHeader bool) error {

v, err := dao.Store.Get(key)
if err != nil {
return fmt.Errorf("failed to retrieve conflict record stub for %s (height %d, conflict %s): %w", tx.Hash().StringLE(), b.Index, hash.StringLE(), err)
return 0, fmt.Errorf("failed to retrieve conflict record stub for %s (height %d, conflict %s): %w", tx.Hash().StringLE(), b.Index, hash.StringLE(), err)

Check warning on line 795 in pkg/core/dao/dao.go

View check run for this annotation

Codecov / codecov/patch

pkg/core/dao/dao.go#L795

Added line #L795 was not covered by tests
}
// It might be a block since we allow transactions to have block hash in the Conflicts attribute.
if v[0] != storage.ExecTransaction {
Expand All @@ -809,7 +810,7 @@ func (dao *Simple) DeleteBlock(h util.Uint256, dropHeader bool) error {
sKey := append(key, s.Account.BytesBE()...)
v, err := dao.Store.Get(sKey)
if err != nil {
return fmt.Errorf("failed to retrieve conflict record for %s (height %d, conflict %s, signer %s): %w", tx.Hash().StringLE(), b.Index, hash.StringLE(), address.Uint160ToString(s.Account), err)
return 0, fmt.Errorf("failed to retrieve conflict record for %s (height %d, conflict %s, signer %s): %w", tx.Hash().StringLE(), b.Index, hash.StringLE(), address.Uint160ToString(s.Account), err)

Check warning on line 813 in pkg/core/dao/dao.go

View check run for this annotation

Codecov / codecov/patch

pkg/core/dao/dao.go#L813

Added line #L813 was not covered by tests
}
index = binary.LittleEndian.Uint32(v[1:])
if index == b.Index {
Expand All @@ -819,7 +820,7 @@ func (dao *Simple) DeleteBlock(h util.Uint256, dropHeader bool) error {
}
}

return nil
return b.Timestamp, nil
}

// PurgeHeader completely removes specified header from dao. It differs from
Expand Down
9 changes: 7 additions & 2 deletions pkg/core/dao/dao_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ func TestPutGetBlock(t *testing.T) {
dao := NewSimple(storage.NewMemoryStore(), false)
b := &block.Block{
Header: block.Header{
Timestamp: 42,
Script: transaction.Witness{
VerificationScript: []byte{byte(opcode.PUSH1)},
InvocationScript: []byte{byte(opcode.NOP)},
Expand Down Expand Up @@ -108,12 +109,16 @@ func TestPutGetBlock(t *testing.T) {
require.Equal(t, *appExecResult1, gotAppExecResult[0])
require.Equal(t, *appExecResult2, gotAppExecResult[1])

require.NoError(t, dao.DeleteBlock(hash, false))
ts, err := dao.DeleteBlock(hash, false)
require.NoError(t, err)
require.Equal(t, uint64(42), ts)
gotBlock, err = dao.GetBlock(hash) // It's just a header, but it's still there.
require.NoError(t, err)
require.NotNil(t, gotBlock)

require.NoError(t, dao.DeleteBlock(hash, true))
ts, err = dao.DeleteBlock(hash, true)
require.NoError(t, err)
require.Equal(t, uint64(42), ts)
_, err = dao.GetBlock(hash)
require.Error(t, err)
}
Expand Down

0 comments on commit 642416d

Please sign in to comment.