diff --git a/erigon-lib/state/aggregator_test.go b/erigon-lib/state/aggregator_test.go index 5b50a93392a..d53de8a77e0 100644 --- a/erigon-lib/state/aggregator_test.go +++ b/erigon-lib/state/aggregator_test.go @@ -53,6 +53,215 @@ import ( "github.com/stretchr/testify/require" ) +func TestAggregatorV3_Merge(t *testing.T) { + t.Parallel() + db, agg := testDbAndAggregatorv3(t, 10) + rwTx, err := db.BeginRwNosync(context.Background()) + require.NoError(t, err) + defer func() { + if rwTx != nil { + rwTx.Rollback() + } + }() + + ac := agg.BeginFilesRo() + defer ac.Close() + domains, err := NewSharedDomains(WrapTxWithCtx(rwTx, ac), log.New()) + require.NoError(t, err) + defer domains.Close() + + txs := uint64(1000) + rnd := rand.New(rand.NewSource(time.Now().UnixNano())) + + var ( + commKey1 = []byte("someCommKey") + commKey2 = []byte("otherCommKey") + ) + + // keys are encodings of numbers 1..31 + // each key changes value on every txNum which is multiple of the key + var maxWrite, otherMaxWrite uint64 + for txNum := uint64(1); txNum <= txs; txNum++ { + domains.SetTxNum(txNum) + + addr, loc := make([]byte, length.Addr), make([]byte, length.Hash) + + n, err := rnd.Read(addr) + require.NoError(t, err) + require.EqualValues(t, length.Addr, n) + + n, err = rnd.Read(loc) + require.NoError(t, err) + require.EqualValues(t, length.Hash, n) + + buf := types.EncodeAccountBytesV3(1, uint256.NewInt(0), nil, 0) + err = domains.DomainPut(kv.AccountsDomain, addr, nil, buf, nil, 0) + require.NoError(t, err) + + err = domains.DomainPut(kv.StorageDomain, addr, loc, []byte{addr[0], loc[0]}, nil, 0) + require.NoError(t, err) + + var v [8]byte + binary.BigEndian.PutUint64(v[:], txNum) + if txNum%135 == 0 { + // pv, step, _, err := ac.GetLatest(kv.CommitmentDomain, commKey2, nil, rwTx) + pv, step, err := domains.DomainGet(kv.CommitmentDomain, commKey2, nil) + require.NoError(t, err) + + err = domains.DomainPut(kv.CommitmentDomain, commKey2, nil, v[:], pv, step) + require.NoError(t, err) + otherMaxWrite = txNum + } else { + // pv, step, _, err := ac.GetLatest(kv.CommitmentDomain, commKey1, nil, rwTx) + pv, step, err := domains.DomainGet(kv.CommitmentDomain, commKey1, nil) + require.NoError(t, err) + + err = domains.DomainPut(kv.CommitmentDomain, commKey1, nil, v[:], pv, step) + require.NoError(t, err) + maxWrite = txNum + } + require.NoError(t, err) + + } + + err = domains.Flush(context.Background(), rwTx) + require.NoError(t, err) + + require.NoError(t, err) + err = rwTx.Commit() + require.NoError(t, err) + rwTx = nil + + err = agg.BuildFiles(txs) + require.NoError(t, err) + + rwTx, err = db.BeginRw(context.Background()) + require.NoError(t, err) + defer rwTx.Rollback() + + logEvery := time.NewTicker(30 * time.Second) + defer logEvery.Stop() + stat, err := ac.Prune(context.Background(), rwTx, 0, logEvery) + require.NoError(t, err) + t.Logf("Prune: %s", stat) + + err = rwTx.Commit() + require.NoError(t, err) + + err = agg.MergeLoop(context.Background()) + require.NoError(t, err) + + // Check the history + roTx, err := db.BeginRo(context.Background()) + require.NoError(t, err) + defer roTx.Rollback() + + dc := agg.BeginFilesRo() + + v, _, ex, err := dc.GetLatest(kv.CommitmentDomain, commKey1, nil, roTx) + require.NoError(t, err) + require.Truef(t, ex, "key %x not found", commKey1) + + require.EqualValues(t, maxWrite, binary.BigEndian.Uint64(v[:])) + + v, _, ex, err = dc.GetLatest(kv.CommitmentDomain, commKey2, nil, roTx) + require.NoError(t, err) + require.Truef(t, ex, "key %x not found", commKey2) + dc.Close() + + require.EqualValues(t, otherMaxWrite, binary.BigEndian.Uint64(v[:])) +} + +func TestAggregatorV3_MergeValTransform(t *testing.T) { + t.Parallel() + db, agg := testDbAndAggregatorv3(t, 10) + rwTx, err := db.BeginRwNosync(context.Background()) + require.NoError(t, err) + defer func() { + if rwTx != nil { + rwTx.Rollback() + } + }() + ac := agg.BeginFilesRo() + defer ac.Close() + domains, err := NewSharedDomains(WrapTxWithCtx(rwTx, ac), log.New()) + require.NoError(t, err) + defer domains.Close() + + txs := uint64(1000) + rnd := rand.New(rand.NewSource(time.Now().UnixNano())) + + agg.commitmentValuesTransform = true + + state := make(map[string][]byte) + + // keys are encodings of numbers 1..31 + // each key changes value on every txNum which is multiple of the key + //var maxWrite, otherMaxWrite uint64 + for txNum := uint64(1); txNum <= txs; txNum++ { + domains.SetTxNum(txNum) + + addr, loc := make([]byte, length.Addr), make([]byte, length.Hash) + + n, err := rnd.Read(addr) + require.NoError(t, err) + require.EqualValues(t, length.Addr, n) + + n, err = rnd.Read(loc) + require.NoError(t, err) + require.EqualValues(t, length.Hash, n) + + buf := types.EncodeAccountBytesV3(1, uint256.NewInt(txNum*1e6), nil, 0) + err = domains.DomainPut(kv.AccountsDomain, addr, nil, buf, nil, 0) + require.NoError(t, err) + + err = domains.DomainPut(kv.StorageDomain, addr, loc, []byte{addr[0], loc[0]}, nil, 0) + require.NoError(t, err) + + if (txNum+1)%agg.StepSize() == 0 { + _, err := domains.ComputeCommitment(context.Background(), true, txNum/10, "") + require.NoError(t, err) + } + + state[string(addr)] = buf + state[string(addr)+string(loc)] = []byte{addr[0], loc[0]} + } + + err = domains.Flush(context.Background(), rwTx) + require.NoError(t, err) + + err = rwTx.Commit() + require.NoError(t, err) + rwTx = nil + + err = agg.BuildFiles(txs) + require.NoError(t, err) + + ac.Close() + ac = agg.BeginFilesRo() + defer ac.Close() + + rwTx, err = db.BeginRwNosync(context.Background()) + require.NoError(t, err) + defer func() { + if rwTx != nil { + rwTx.Rollback() + } + }() + + logEvery := time.NewTicker(30 * time.Second) + defer logEvery.Stop() + stat, err := ac.Prune(context.Background(), rwTx, 0, logEvery) + require.NoError(t, err) + t.Logf("Prune: %s", stat) + + err = rwTx.Commit() + require.NoError(t, err) + + err = agg.MergeLoop(context.Background()) + require.NoError(t, err) +} + func TestAggregatorV3_RestartOnDatadir(t *testing.T) { t.Parallel() //t.Skip() diff --git a/erigon-lib/state/domain.go b/erigon-lib/state/domain.go index a2dd099e924..f2e641bffc9 100644 --- a/erigon-lib/state/domain.go +++ b/erigon-lib/state/domain.go @@ -32,6 +32,7 @@ import ( "sync" "sync/atomic" "time" + "unsafe" "github.com/erigontech/erigon-lib/metrics" btree2 "github.com/tidwall/btree" @@ -760,7 +761,8 @@ type DomainRoTx struct { keyBuf [60]byte // 52b key and 8b for inverted step comBuf []byte - valsC kv.Cursor + valsC kv.Cursor + vcParentPtr atomic.Uintptr getFromFileCache *DomainGetFromFileCache } @@ -1792,6 +1794,11 @@ func (dt *DomainRoTx) Close() { if dt.files == nil { // invariant: it's safe to call Close multiple times return } + if dt.valsC != nil { + dt.valsC.Close() + dt.vcParentPtr.Store(0) + dt.valsC = nil + } files := dt.files dt.files = nil for i := range files { @@ -1859,10 +1866,20 @@ func (dt *DomainRoTx) statelessBtree(i int) *BtIndex { return r } +var sdTxImmutabilityInvariant = errors.New("tx passed into ShredDomains is immutable") + func (dt *DomainRoTx) valsCursor(tx kv.Tx) (c kv.Cursor, err error) { + eface := *(*[2]uintptr)(unsafe.Pointer(&tx)) if dt.valsC != nil { + if !dt.vcParentPtr.CompareAndSwap(eface[1], eface[1]) { // cant swap when parent ptr is different + panic(fmt.Errorf("%w: cursor parent tx %x; current tx %x", sdTxImmutabilityInvariant, dt.vcParentPtr.Load(), eface[1])) // cursor opened by different tx, invariant broken + } return dt.valsC, nil } + // initialise parent pointer tracking + if !dt.vcParentPtr.CompareAndSwap(0, eface[1]) { + panic(fmt.Errorf("%w: cursor parent tx %x; current tx %x", sdTxImmutabilityInvariant, dt.vcParentPtr.Load(), eface[1])) // cursor opened by different tx, invariant broken + } if dt.d.largeVals { dt.valsC, err = tx.Cursor(dt.d.valsTable) diff --git a/erigon-lib/state/domain_shared_test.go b/erigon-lib/state/domain_shared_test.go index 17606175321..2ca7088f7ee 100644 --- a/erigon-lib/state/domain_shared_test.go +++ b/erigon-lib/state/domain_shared_test.go @@ -237,7 +237,8 @@ func TestSharedDomain_IteratePrefix(t *testing.T) { ac = agg.BeginFilesRo() defer ac.Close() - domains, err := NewSharedDomains(WrapTxWithCtx(rwTx, ac), log.New()) + wrwTx := WrapTxWithCtx(rwTx, ac) + domains, err := NewSharedDomains(wrwTx, log.New()) require.NoError(err) defer domains.Close() @@ -267,7 +268,7 @@ func TestSharedDomain_IteratePrefix(t *testing.T) { require.NoError(err) domains.Close() - domains, err = NewSharedDomains(WrapTxWithCtx(rwTx, ac), log.New()) + domains, err = NewSharedDomains(wrwTx, log.New()) require.NoError(err) defer domains.Close() require.Equal(int(stepSize), iterCount(domains)) @@ -275,7 +276,7 @@ func TestSharedDomain_IteratePrefix(t *testing.T) { { // delete marker is in RAM require.NoError(domains.Flush(ctx, rwTx)) domains.Close() - domains, err = NewSharedDomains(WrapTxWithCtx(rwTx, ac), log.New()) + domains, err = NewSharedDomains(wrwTx, log.New()) require.NoError(err) defer domains.Close() require.Equal(int(stepSize), iterCount(domains)) @@ -305,7 +306,7 @@ func TestSharedDomain_IteratePrefix(t *testing.T) { require.NoError(err) domains.Close() - domains, err = NewSharedDomains(WrapTxWithCtx(rwTx, ac), log.New()) + domains, err = NewSharedDomains(wrwTx, log.New()) require.NoError(err) defer domains.Close() require.Equal(int(stepSize*2+2-2), iterCount(domains)) @@ -326,7 +327,9 @@ func TestSharedDomain_IteratePrefix(t *testing.T) { _, err := ac.Prune(ctx, rwTx, 0, nil) require.NoError(err) - domains, err = NewSharedDomains(WrapTxWithCtx(rwTx, ac), log.New()) + + wrwTx = WrapTxWithCtx(rwTx, ac) + domains, err = NewSharedDomains(wrwTx, log.New()) require.NoError(err) defer domains.Close() require.Equal(int(stepSize*2+2-2), iterCount(domains)) @@ -335,7 +338,7 @@ func TestSharedDomain_IteratePrefix(t *testing.T) { { // delete/update more keys in RAM require.NoError(domains.Flush(ctx, rwTx)) domains.Close() - domains, err = NewSharedDomains(WrapTxWithCtx(rwTx, ac), log.New()) + domains, err = NewSharedDomains(wrwTx, log.New()) require.NoError(err) defer domains.Close() @@ -355,7 +358,7 @@ func TestSharedDomain_IteratePrefix(t *testing.T) { require.NoError(err) domains.Close() - domains, err = NewSharedDomains(WrapTxWithCtx(rwTx, ac), log.New()) + domains, err = NewSharedDomains(wrwTx, log.New()) require.NoError(err) defer domains.Close() require.Equal(int(stepSize*2+2-3), iterCount(domains)) @@ -365,7 +368,7 @@ func TestSharedDomain_IteratePrefix(t *testing.T) { require.NoError(err) domains.Close() - domains, err = NewSharedDomains(WrapTxWithCtx(rwTx, ac), log.New()) + domains, err = NewSharedDomains(wrwTx, log.New()) require.NoError(err) defer domains.Close() domains.SetTxNum(domains.TxNum() + 1) @@ -391,14 +394,15 @@ func TestSharedDomain_StorageIter(t *testing.T) { ac := agg.BeginFilesRo() defer ac.Close() - domains, err := NewSharedDomains(WrapTxWithCtx(rwTx, ac), log.New()) + wtxRw := WrapTxWithCtx(rwTx, ac) + domains, err := NewSharedDomains(wtxRw, log.New()) require.NoError(t, err) defer domains.Close() maxTx := 3*stepSize + 10 hashes := make([][]byte, maxTx) - domains, err = NewSharedDomains(WrapTxWithCtx(rwTx, ac), log.New()) + domains, err = NewSharedDomains(wtxRw, log.New()) require.NoError(t, err) defer domains.Close() @@ -512,7 +516,6 @@ func TestSharedDomain_StorageIter(t *testing.T) { require.Zero(t, missed) require.Zero(t, notRemoved) } - fmt.Printf("deleted\n") err = domains.Flush(ctx, rwTx) require.NoError(t, err) diff --git a/erigon-lib/state/domain_test.go b/erigon-lib/state/domain_test.go index 5f29a48b56e..8ef5eebaf0c 100644 --- a/erigon-lib/state/domain_test.go +++ b/erigon-lib/state/domain_test.go @@ -614,6 +614,53 @@ func TestDomain_ScanFiles(t *testing.T) { checkHistory(t, db, d, txs) } +func TestDomainRoTx_CursorParentCheck(t *testing.T) { + t.Parallel() + + logger := log.New() + db, d := testDbAndDomain(t, logger) + ctx, require := context.Background(), require.New(t) + tx, err := db.BeginRw(ctx) + require.NoError(err) + defer tx.Rollback() + + dc := d.BeginFilesRo() + defer dc.Close() + writer := dc.NewWriter() + defer writer.close() + + val := []byte("value1") + writer.SetTxNum(1) + writer.addValue([]byte("key1"), nil, val) + + err = writer.Flush(ctx, tx) + require.NoError(err) + err = tx.Commit() + require.NoError(err) + + tx, err = db.BeginRw(ctx) + require.NoError(err) + defer tx.Rollback() + + cursor, err := dc.valsCursor(tx) + require.NoError(err) + require.NotNil(cursor) + tx.Rollback() + + otherTx, err := db.BeginRw(ctx) + require.NoError(err) + defer otherTx.Rollback() + + defer func() { + r := recover() + require.NotNil(r) + re := r.(error) + require.ErrorIs(re, sdTxImmutabilityInvariant) + }() + + dc.GetLatest([]byte("key1"), nil, otherTx) +} + func TestDomain_Delete(t *testing.T) { t.Parallel()