Skip to content

Commit

Permalink
fix: the reference root issue (cosmos#909)
Browse files Browse the repository at this point in the history
  • Loading branch information
cool-develope authored Mar 26, 2024
1 parent 3a59fa2 commit 2894221
Show file tree
Hide file tree
Showing 9 changed files with 487 additions and 139 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

### Improvements

- [#909](https://github.com/cosmos/iavl/pull/909) Async pruning of legacy nodes.
- [#874](https://github.com/cosmos/iavl/pull/874) Decouple `cosmos-db` and implement own `db` package.
- [#695](https://github.com/cosmos/iavl/pull/695) Add API `SaveChangeSet` to save the changeset as a new version.
- [#703](https://github.com/cosmos/iavl/pull/703) New APIs `NewCompressExporter`/`NewCompressImporter` to support more compact snapshot format.
Expand All @@ -14,6 +15,7 @@

### Bug Fixes

- [#909](https://github.com/cosmos/iavl/pull/909) Fix the reference node formats and delete legacy nodes.
- [#773](https://github.com/cosmos/iavl/pull/773) Fix memory leak in `Import`.
- [#795](https://github.com/cosmos/iavl/pull/795) Fix plugin used for buf generate.
- [#801](https://github.com/cosmos/iavl/pull/801) Fix rootKey empty check by len equals 0.
Expand Down
52 changes: 40 additions & 12 deletions batch.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
package iavl

import (
"sync"

dbm "github.com/cosmos/iavl/db"
)

// BatchWithFlusher is a wrapper
// around batch that flushes batch's data to disk
// as soon as the configurable limit is reached.
type BatchWithFlusher struct {
mtx sync.Mutex
db dbm.DB // This is only used to create new batch
batch dbm.Batch // Batched writing buffer.

Expand Down Expand Up @@ -46,18 +49,19 @@ func (b *BatchWithFlusher) estimateSizeAfterSetting(key []byte, value []byte) (i
// the batch is flushed to disk, cleared, and a new one is created with buffer pre-allocated to threshold.
// The addition entry is then added to the batch.
func (b *BatchWithFlusher) Set(key, value []byte) error {
b.mtx.Lock()
defer b.mtx.Unlock()

batchSizeAfter, err := b.estimateSizeAfterSetting(key, value)
if err != nil {
return err
}
if batchSizeAfter > b.flushThreshold {
if err := b.batch.Write(); err != nil {
b.mtx.Unlock()
if err := b.Write(); err != nil {
return err
}
if err := b.batch.Close(); err != nil {
return err
}
b.batch = b.db.NewBatchWithSize(b.flushThreshold)
b.mtx.Lock()
}
return b.batch.Set(key, value)
}
Expand All @@ -67,31 +71,55 @@ func (b *BatchWithFlusher) Set(key, value []byte) error {
// the batch is flushed to disk, cleared, and a new one is created with buffer pre-allocated to threshold.
// The deletion entry is then added to the batch.
func (b *BatchWithFlusher) Delete(key []byte) error {
b.mtx.Lock()
defer b.mtx.Unlock()

batchSizeAfter, err := b.estimateSizeAfterSetting(key, []byte{})
if err != nil {
return err
}
if batchSizeAfter > b.flushThreshold {
if err := b.batch.Write(); err != nil {
b.mtx.Unlock()
if err := b.Write(); err != nil {
return err
}
if err := b.batch.Close(); err != nil {
return err
}
b.batch = b.db.NewBatchWithSize(b.flushThreshold)
b.mtx.Lock()
}
return b.batch.Delete(key)
}

func (b *BatchWithFlusher) Write() error {
return b.batch.Write()
b.mtx.Lock()
defer b.mtx.Unlock()

if err := b.batch.Write(); err != nil {
return err
}
if err := b.batch.Close(); err != nil {
return err
}
b.batch = b.db.NewBatchWithSize(b.flushThreshold)
return nil
}

func (b *BatchWithFlusher) WriteSync() error {
return b.batch.WriteSync()
b.mtx.Lock()
defer b.mtx.Unlock()

if err := b.batch.WriteSync(); err != nil {
return err
}
if err := b.batch.Close(); err != nil {
return err
}
b.batch = b.db.NewBatchWithSize(b.flushThreshold)
return nil
}

func (b *BatchWithFlusher) Close() error {
b.mtx.Lock()
defer b.mtx.Unlock()

return b.batch.Close()
}

Expand Down
2 changes: 1 addition & 1 deletion import.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ func (i *Importer) Commit() error {
return err
}
if i.stack[0].nodeKey.version < i.version { // it means there is no update in the given version
if err := i.batch.Set(i.tree.ndb.nodeKey(GetRootKey(i.version)), i.tree.ndb.nodeKeyPrefix(i.stack[0].nodeKey.version)); err != nil {
if err := i.batch.Set(i.tree.ndb.nodeKey(GetRootKey(i.version)), i.tree.ndb.nodeKey(i.stack[0].nodeKey.GetKey())); err != nil {
return err
}
}
Expand Down
156 changes: 147 additions & 9 deletions migrate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,20 @@ import (
"os/exec"
"path"
"testing"
"time"

"cosmossdk.io/log"
"github.com/stretchr/testify/require"

dbm "github.com/cosmos/iavl/db"
iavlrand "github.com/cosmos/iavl/internal/rand"
)

const (
dbType = "goleveldb"
)

func createLegacyTree(t *testing.T, dbType, dbDir string, version int) (string, error) {
func createLegacyTree(t *testing.T, dbDir string, version int) (string, error) {
relateDir := path.Join(t.TempDir(), dbDir)
if _, err := os.Stat(relateDir); err == nil {
err := os.RemoveAll(relateDir)
Expand Down Expand Up @@ -48,10 +50,10 @@ func createLegacyTree(t *testing.T, dbType, dbDir string, version int) (string,
func TestLazySet(t *testing.T) {
legacyVersion := 1000
dbDir := fmt.Sprintf("legacy-%s-%d", dbType, legacyVersion)
relateDir, err := createLegacyTree(t, dbType, dbDir, legacyVersion)
relateDir, err := createLegacyTree(t, dbDir, legacyVersion)
require.NoError(t, err)

db, err := dbm.NewDB("test", "goleveldb", relateDir)
db, err := dbm.NewDB("test", dbType, relateDir)
require.NoError(t, err)

defer func() {
Expand Down Expand Up @@ -91,10 +93,10 @@ func TestLazySet(t *testing.T) {
func TestLegacyReferenceNode(t *testing.T) {
legacyVersion := 20
dbDir := fmt.Sprintf("./legacy-%s-%d", dbType, legacyVersion)
relateDir, err := createLegacyTree(t, dbType, dbDir, legacyVersion)
relateDir, err := createLegacyTree(t, dbDir, legacyVersion)
require.NoError(t, err)

db, err := dbm.NewDB("test", "goleveldb", relateDir)
db, err := dbm.NewDB("test", dbType, relateDir)
require.NoError(t, err)

defer func() {
Expand All @@ -111,6 +113,7 @@ func TestLegacyReferenceNode(t *testing.T) {
// Load the latest legacy version
_, err = tree.LoadVersion(int64(legacyVersion))
require.NoError(t, err)
legacyLatestVersion := tree.root.nodeKey.version

// Commit new versions without updates
_, _, err = tree.SaveVersion()
Expand All @@ -123,17 +126,17 @@ func TestLegacyReferenceNode(t *testing.T) {
_, err = newTree.LoadVersion(version - 1)
require.NoError(t, err)
// Check if the reference node is refactored
require.Equal(t, newTree.root.nodeKey.nonce, uint32(1))
require.Equal(t, newTree.root.nodeKey.version, int64(legacyVersion))
require.Equal(t, newTree.root.nodeKey.nonce, uint32(0))
require.Equal(t, newTree.root.nodeKey.version, legacyLatestVersion)
}

func TestDeleteVersions(t *testing.T) {
legacyVersion := 100
dbDir := fmt.Sprintf("./legacy-%s-%d", dbType, legacyVersion)
relateDir, err := createLegacyTree(t, dbType, dbDir, legacyVersion)
relateDir, err := createLegacyTree(t, dbDir, legacyVersion)
require.NoError(t, err)

db, err := dbm.NewDB("test", "goleveldb", relateDir)
db, err := dbm.NewDB("test", dbType, relateDir)
require.NoError(t, err)

defer func() {
Expand Down Expand Up @@ -205,3 +208,138 @@ func TestDeleteVersions(t *testing.T) {
pVersions = tree.AvailableVersions()
require.Equal(t, postVersions/2, len(pVersions))
}

func TestPruning(t *testing.T) {
legacyVersion := 100
dbDir := fmt.Sprintf("./legacy-%s-%d", dbType, legacyVersion)
relateDir, err := createLegacyTree(t, dbDir, legacyVersion)
require.NoError(t, err)

db, err := dbm.NewDB("test", dbType, relateDir)
require.NoError(t, err)

defer func() {
if err := db.Close(); err != nil {
t.Errorf("DB close error: %v\n", err)
}
if err := os.RemoveAll(relateDir); err != nil {
t.Errorf("%+v\n", err)
}
}()

// Load the latest version
tree := NewMutableTree(db, 1000, false, log.NewTestLogger(t))
_, err = tree.Load()
require.NoError(t, err)

// Save 10 versions without updates
for i := 0; i < 10; i++ {
_, _, err = tree.SaveVersion()
require.NoError(t, err)
}

// Save 990 versions
leavesCount := 10
toVersion := int64(990)
pruningInterval := int64(20)
for i := int64(0); i < toVersion; i++ {
for j := 0; j < leavesCount; j++ {
_, err := tree.Set([]byte(fmt.Sprintf("key%d", j)), []byte(fmt.Sprintf("value%d", j)))
require.NoError(t, err)
}
_, v, err := tree.SaveVersion()
require.NoError(t, err)
if v%pruningInterval == 0 {
err = tree.DeleteVersionsTo(v - pruningInterval/2)
require.NoError(t, err)
}
}

// Wait for pruning to finish
for i := 0; i < 100; i++ {
_, _, err := tree.SaveVersion()
require.NoError(t, err)
isLeacy, err := tree.ndb.hasLegacyVersion(int64(legacyVersion))
require.NoError(t, err)
if !isLeacy {
break
}
// Simulate the consensus state update
time.Sleep(500 * time.Millisecond)
}
// Reload the tree
tree = NewMutableTree(db, 0, false, log.NewNopLogger())
versions := tree.AvailableVersions()
require.Equal(t, versions[0], int(toVersion)+legacyVersion+1)
for _, v := range versions {
_, err := tree.LoadVersion(int64(v))
require.NoError(t, err)
}
// Check if the legacy nodes are pruned
_, err = tree.Load()
require.NoError(t, err)
itr, err := NewNodeIterator(tree.root.GetKey(), tree.ndb)
require.NoError(t, err)
legacyNodes := make(map[string]*Node)
for ; itr.Valid(); itr.Next(false) {
node := itr.GetNode()
if node.nodeKey.nonce == 0 {
legacyNodes[string(node.hash)] = node
}
}

lNodes, err := tree.ndb.legacyNodes()
require.NoError(t, err)
require.Len(t, lNodes, len(legacyNodes))
for _, node := range lNodes {
_, ok := legacyNodes[string(node.hash)]
require.True(t, ok)
}
}

func TestRandomSet(t *testing.T) {
legacyVersion := 50
dbDir := fmt.Sprintf("./legacy-%s-%d", dbType, legacyVersion)
relateDir, err := createLegacyTree(t, dbDir, legacyVersion)
require.NoError(t, err)

db, err := dbm.NewDB("test", dbType, relateDir)
require.NoError(t, err)

defer func() {
if err := db.Close(); err != nil {
t.Errorf("DB close error: %v\n", err)
}
if err := os.RemoveAll(relateDir); err != nil {
t.Errorf("%+v\n", err)
}
}()

tree := NewMutableTree(db, 10000, false, log.NewNopLogger())

// Load the latest legacy version
_, err = tree.LoadVersion(int64(legacyVersion))
require.NoError(t, err)

// Commit new versions
postVersions := 1000
emptyVersions := 10
for i := 0; i < emptyVersions; i++ {
_, _, err := tree.SaveVersion()
require.NoError(t, err)
}
for i := 0; i < postVersions-emptyVersions; i++ {
leafCount := rand.Intn(50)
for j := 0; j < leafCount; j++ {
key := iavlrand.RandBytes(10)
value := iavlrand.RandBytes(10)
_, err = tree.Set(key, value)
require.NoError(t, err)
}
_, _, err = tree.SaveVersion()
require.NoError(t, err)
}

err = tree.DeleteVersionsTo(int64(legacyVersion + postVersions - 1))
require.NoError(t, err)
}
Loading

0 comments on commit 2894221

Please sign in to comment.