From f68ac7cf161908c145aa91701c0c76568b227375 Mon Sep 17 00:00:00 2001 From: Aurora Gaffney Date: Mon, 26 Aug 2024 16:33:34 -0500 Subject: [PATCH] feat: use shared DB transaction for updates in ledger state This adds support for managing a transaction across both DB backends. It uses this new support for creating a shared transaction for "atomic" write operations and using it for all operations that update the DB. Fixes #105 --- database/commit_timestamp.go | 98 +++++++++++++++++ database/database.go | 11 ++ database/txn.go | 109 ++++++++++++++++++ state/models/block.go | 14 ++- state/models/utxo.go | 54 ++++----- state/state.go | 208 ++++++++++++++++++++--------------- 6 files changed, 375 insertions(+), 119 deletions(-) create mode 100644 database/commit_timestamp.go create mode 100644 database/txn.go diff --git a/database/commit_timestamp.go b/database/commit_timestamp.go new file mode 100644 index 0000000..84ae78a --- /dev/null +++ b/database/commit_timestamp.go @@ -0,0 +1,98 @@ +// Copyright 2024 Blink Labs Software +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package database + +import ( + "fmt" + "math/big" + + badger "github.com/dgraph-io/badger/v4" + "gorm.io/gorm" + "gorm.io/gorm/clause" +) + +const ( + commitTimestampBlobKey = "metadata_commit_timestamp" + commitTimestampRowId = 1 +) + +// CommitTimestamp represents the sqlite table used to track the current commit timestamp +type CommitTimestamp struct { + ID uint `gorm:"primarykey"` + Timestamp int64 +} + +func (CommitTimestamp) TableName() string { + return "commit_timestamp" +} + +func (b *BaseDatabase) checkCommitTimestamp() error { + // Create table if it doesn't exist + if err := b.Metadata().AutoMigrate(&CommitTimestamp{}); err != nil { + return err + } + // Get value from sqlite + var tmpCommitTimestamp CommitTimestamp + result := b.Metadata().First(&tmpCommitTimestamp) + if result.Error != nil { + // No metadata yet, so nothing to check + if result.Error == gorm.ErrRecordNotFound { + return nil + } + return result.Error + } + // Get value from badger + err := b.Blob().View(func(txn *badger.Txn) error { + item, err := txn.Get([]byte(commitTimestampBlobKey)) + if err != nil { + return err + } + val, err := item.ValueCopy(nil) + if err != nil { + return err + } + tmpTimestamp := new(big.Int).SetBytes(val).Int64() + // Compare values + if tmpTimestamp != tmpCommitTimestamp.Timestamp { + return fmt.Errorf("commit timestamp mismatch: %d (metadata) != %d (blob)", tmpCommitTimestamp.Timestamp, tmpTimestamp) + } + return nil + }) + if err != nil { + return err + } + return nil +} + +func (b *BaseDatabase) updateCommitTimestamp(txn *Transaction, timestamp int64) error { + // Update sqlite + tmpCommitTimestamp := CommitTimestamp{ + ID: commitTimestampRowId, + Timestamp: timestamp, + } + result := txn.Metadata().Clauses(clause.OnConflict{ + Columns: []clause.Column{{Name: "id"}}, + DoUpdates: clause.AssignmentColumns([]string{"timestamp"}), + }).Create(&tmpCommitTimestamp) + if result.Error != nil { + return result.Error + } + // Update badger + tmpTimestamp := new(big.Int).SetInt64(timestamp) + if err := txn.Blob().Set([]byte(commitTimestampBlobKey), tmpTimestamp.Bytes()); err != nil { + return err + } + return nil +} diff --git a/database/database.go b/database/database.go index 9dca196..7660099 100644 --- a/database/database.go +++ b/database/database.go @@ -34,6 +34,8 @@ import ( type Database interface { Metadata() *gorm.DB Blob() *badger.DB + Transaction(bool) *Transaction + updateCommitTimestamp(*Transaction, int64) error } type BaseDatabase struct { @@ -53,6 +55,11 @@ func (b *BaseDatabase) Blob() *badger.DB { return b.blob } +// Transaction starts a new database transaction and returns a handle to it +func (b *BaseDatabase) Transaction(readWrite bool) *Transaction { + return NewTransaction(b, readWrite) +} + func (b *BaseDatabase) init() error { if b.logger == nil { // Create logger to throw away logs @@ -65,6 +72,10 @@ func (b *BaseDatabase) init() error { } // Configure metrics for Badger DB b.registerBadgerMetrics() + // Check commit timestamp + if err := b.checkCommitTimestamp(); err != nil { + return err + } // Run GC periodically for Badger DB b.blobGcTimer = time.NewTicker(5 * time.Minute) go b.blobGc() diff --git a/database/txn.go b/database/txn.go new file mode 100644 index 0000000..e73bb3a --- /dev/null +++ b/database/txn.go @@ -0,0 +1,109 @@ +// Copyright 2024 Blink Labs Software +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package database + +import ( + "fmt" + "sync" + "time" + + "github.com/dgraph-io/badger/v4" + "gorm.io/gorm" +) + +// Transaction is a wrapper around the transaction objects for the underlying DB engines +type Transaction struct { + lock sync.Mutex + finished bool + db Database + readWrite bool + blobTxn *badger.Txn + metadataTxn *gorm.DB +} + +func NewTransaction(db Database, readWrite bool) *Transaction { + return &Transaction{ + db: db, + readWrite: readWrite, + blobTxn: db.Blob().NewTransaction(readWrite), + metadataTxn: db.Metadata().Begin(), + } +} + +func (t *Transaction) Metadata() *gorm.DB { + return t.metadataTxn +} + +func (t *Transaction) Blob() *badger.Txn { + return t.blobTxn +} + +// Do executes the specified function in the context of the transaction. Any errors returned will result +// in the transaction being rolled back +func (t *Transaction) Do(fn func(*Transaction) error) error { + if err := fn(t); err != nil { + if err2 := t.Rollback(); err2 != nil { + return fmt.Errorf("rollback failed: %w: original error: %w", err2, err) + } + return err + } + if err := t.Commit(); err != nil { + return fmt.Errorf("commit failed: %w", err) + } + return nil +} + +func (t *Transaction) Commit() error { + t.lock.Lock() + defer t.lock.Unlock() + if t.finished { + return nil + } + // No need to commit for read-only, but we do want to free up resources + if !t.readWrite { + return t.Rollback() + } + // Update the commit timestamp for both DBs + commitTimestamp := time.Now().UnixMilli() + if err := t.db.updateCommitTimestamp(t, commitTimestamp); err != nil { + return err + } + // Commit sqlite transaction + if result := t.metadataTxn.Commit(); result.Error != nil { + // Failed to commit metadata DB, so discard blob txn + t.blobTxn.Discard() + return result.Error + } + // Commit badger transaction + if err := t.blobTxn.Commit(); err != nil { + return err + } + t.finished = true + return nil +} + +func (t *Transaction) Rollback() error { + t.lock.Lock() + defer t.lock.Unlock() + if t.finished { + return nil + } + t.blobTxn.Discard() + if result := t.metadataTxn.Rollback(); result.Error != nil { + return result.Error + } + t.finished = true + return nil +} diff --git a/state/models/block.go b/state/models/block.go index 18a1fab..d3f6d89 100644 --- a/state/models/block.go +++ b/state/models/block.go @@ -34,6 +34,10 @@ type Block struct { Cbor []byte `gorm:"-"` // This is here for convenience but not represented in the metadata DB } +func (Block) TableName() string { + return "block" +} + func (b Block) Decode() (ledger.Block, error) { return ledger.NewBlockFromCbor(b.Type, b.Cbor) } @@ -63,19 +67,25 @@ func (b *Block) loadCbor(badgerDb *badger.DB) error { func BlockByPoint(db database.Database, point ocommon.Point) (Block, error) { var tmpBlock Block result := db.Metadata().First(&tmpBlock, "slot = ? AND hash = ?", point.Slot, point.Hash) + if result.Error != nil { + return tmpBlock, result.Error + } if err := tmpBlock.loadCbor(db.Blob()); err != nil { return tmpBlock, err } - return tmpBlock, result.Error + return tmpBlock, nil } func BlockByNumber(db database.Database, blockNumber uint64) (Block, error) { var tmpBlock Block result := db.Metadata().First(&tmpBlock, "number = ?", blockNumber) + if result.Error != nil { + return tmpBlock, result.Error + } if err := tmpBlock.loadCbor(db.Blob()); err != nil { return tmpBlock, err } - return tmpBlock, result.Error + return tmpBlock, nil } func BlockBlobKey(slot uint64, hash []byte) []byte { diff --git a/state/models/utxo.go b/state/models/utxo.go index 3bec856..af26813 100644 --- a/state/models/utxo.go +++ b/state/models/utxo.go @@ -35,23 +35,21 @@ type Utxo struct { Cbor []byte `gorm:"-"` // This is here for convenience but not represented in the metadata DB } +func (Utxo) TableName() string { + return "utxo" +} + func (u Utxo) Decode() (ledger.TransactionOutput, error) { return ledger.NewTransactionOutputFromCbor(u.Cbor) } -func (u *Utxo) loadCbor(badgerDb *badger.DB) error { +func (u *Utxo) loadCbor(txn *database.Transaction) error { key := UtxoBlobKey(u.TxId, u.OutputIdx) - err := badgerDb.View(func(txn *badger.Txn) error { - item, err := txn.Get(key) - if err != nil { - return err - } - u.Cbor, err = item.ValueCopy(nil) - if err != nil { - return err - } - return nil - }) + item, err := txn.Blob().Get(key) + if err != nil { + return err + } + u.Cbor, err = item.ValueCopy(nil) if err != nil { if errors.Is(err, badger.ErrKeyNotFound) { return nil @@ -61,36 +59,39 @@ func (u *Utxo) loadCbor(badgerDb *badger.DB) error { return nil } -func UtxoByRef(db database.Database, txId []byte, outputIdx uint32) (Utxo, error) { +func UtxoByRef(txn *database.Transaction, txId []byte, outputIdx uint32) (Utxo, error) { var tmpUtxo Utxo - result := db.Metadata().First(&tmpUtxo, "tx_id = ? AND output_idx = ?", txId, outputIdx) - if err := tmpUtxo.loadCbor(db.Blob()); err != nil { + result := txn.Metadata().First(&tmpUtxo, "tx_id = ? AND output_idx = ?", txId, outputIdx) + if result.Error != nil { + return tmpUtxo, result.Error + } + if err := tmpUtxo.loadCbor(txn); err != nil { return tmpUtxo, err } - return tmpUtxo, result.Error + return tmpUtxo, nil } -func UtxosByAddress(db database.Database, addr ledger.Address) ([]Utxo, error) { +func UtxosByAddress(txn *database.Transaction, addr ledger.Address) ([]Utxo, error) { var ret []Utxo // Build sub-query for address var addrQuery *gorm.DB if addr.PaymentKeyHash() != ledger.NewBlake2b224(nil) { - addrQuery = db.Metadata().Where("payment_key = ?", addr.PaymentKeyHash().Bytes()) + addrQuery = txn.Metadata().Where("payment_key = ?", addr.PaymentKeyHash().Bytes()) } if addr.StakeKeyHash() != ledger.NewBlake2b224(nil) { if addrQuery != nil { addrQuery = addrQuery.Or("staking_key = ?", addr.StakeKeyHash().Bytes()) } else { - addrQuery = db.Metadata().Where("staking_key = ?", addr.StakeKeyHash().Bytes()) + addrQuery = txn.Metadata().Where("staking_key = ?", addr.StakeKeyHash().Bytes()) } } - result := db.Metadata().Where("deleted_slot = 0").Where(addrQuery).Find(&ret) + result := txn.Metadata().Where("deleted_slot = 0").Where(addrQuery).Find(&ret) if result.Error != nil { return nil, result.Error } // Load CBOR from blob DB for each UTxO for idx, tmpUtxo := range ret { - if err := tmpUtxo.loadCbor(db.Blob()); err != nil { + if err := tmpUtxo.loadCbor(txn); err != nil { return nil, err } ret[idx] = tmpUtxo @@ -98,23 +99,21 @@ func UtxosByAddress(db database.Database, addr ledger.Address) ([]Utxo, error) { return ret, nil } -func UtxoDelete(db database.Database, utxo Utxo) error { +func UtxoDelete(txn *database.Transaction, utxo Utxo) error { // Remove from metadata DB - if result := db.Metadata().Delete(&utxo); result.Error != nil { + if result := txn.Metadata().Delete(&utxo); result.Error != nil { return result.Error } // Remove from blob DB key := UtxoBlobKey(utxo.TxId, utxo.OutputIdx) - err := db.Blob().Update(func(txn *badger.Txn) error { - err := txn.Delete(key) - return err - }) + err := txn.Blob().Delete(key) if err != nil { return err } return nil } +/* func UtxoDeleteByRef(db database.Database, txId []byte, outputIdx uint32) error { utxo, err := UtxoByRef(db, txId, outputIdx) if err != nil { @@ -122,6 +121,7 @@ func UtxoDeleteByRef(db database.Database, txId []byte, outputIdx uint32) error } return UtxoDelete(db, utxo) } +*/ func UtxoBlobKey(txId []byte, outputIdx uint32) []byte { key := []byte("u") diff --git a/state/state.go b/state/state.go index e424acb..a793006 100644 --- a/state/state.go +++ b/state/state.go @@ -30,7 +30,6 @@ import ( "github.com/blinklabs-io/gouroboros/ledger" ochainsync "github.com/blinklabs-io/gouroboros/protocol/chainsync" ocommon "github.com/blinklabs-io/gouroboros/protocol/common" - badger "github.com/dgraph-io/badger/v4" ) const ( @@ -94,8 +93,13 @@ func (ls *LedgerState) scheduleCleanupConsumedUtxos() { ls.timerCleanupConsumedUtxos = time.AfterFunc( cleanupConsumedUtxosInterval, func() { - // Schedule the next run when we finish - defer ls.scheduleCleanupConsumedUtxos() + ls.Lock() + defer func() { + // Schedule the next run when we finish + ls.scheduleCleanupConsumedUtxos() + // Unlock ledger state + ls.Unlock() + }() // Get the current tip, since we're querying by slot tip, err := ls.Tip() if err != nil { @@ -104,23 +108,26 @@ func (ls *LedgerState) scheduleCleanupConsumedUtxos() { ) return } - // Get UTxOs that are marked as deleted and older than our slot window - var tmpUtxos []models.Utxo - result := ls.db.Metadata().Where("deleted_slot <= ?", tip.Point.Slot-cleanupConsumedUtxosSlotWindow).Order("id DESC").Find(&tmpUtxos) - if result.Error != nil { - ls.logger.Error( - fmt.Sprintf("failed to query consumed UTxOs: %s", result.Error), - ) - return - } - // Delete the UTxOs - for _, utxo := range tmpUtxos { - if err := models.UtxoDelete(ls.db, utxo); err != nil { - ls.logger.Error( - fmt.Sprintf("failed to remove consumed UTxO: %s", err), - ) - return + // Perform updates in a transaction + txn := ls.db.Transaction(true) + err = txn.Do(func(txn *database.Transaction) error { + // Get UTxOs that are marked as deleted and older than our slot window + var tmpUtxos []models.Utxo + result := ls.db.Metadata().Where("deleted_slot <= ?", tip.Point.Slot-cleanupConsumedUtxosSlotWindow).Order("id DESC").Find(&tmpUtxos) + if result.Error != nil { + return fmt.Errorf("failed to query consumed UTxOs: %w", result.Error) + } + // Delete the UTxOs + for _, utxo := range tmpUtxos { + if err := models.UtxoDelete(txn, utxo); err != nil { + return fmt.Errorf("failed to remove consumed UTxO: %w", err) + } } + return nil + }) + if err != nil { + ls.logger.Error(err.Error()) + return } }, ) @@ -150,32 +157,40 @@ func (ls *LedgerState) handleEventChainSync(evt event.Event) { } func (ls *LedgerState) handleEventChainSyncRollback(e ChainsyncEvent) error { - // Remove rolled-back blocks in reverse order - var tmpBlocks []models.Block - result := ls.db.Metadata().Where("slot > ?", e.Point.Slot).Order("slot DESC").Find(&tmpBlocks) - if result.Error != nil { - return fmt.Errorf("query blocks: %w", result.Error) - } - for _, tmpBlock := range tmpBlocks { - if err := ls.removeBlock(tmpBlock); err != nil { - return fmt.Errorf("remove block: %w", err) + // Start a transaction + txn := ls.db.Transaction(true) + err := txn.Do(func(txn *database.Transaction) error { + // Remove rolled-back blocks in reverse order + var tmpBlocks []models.Block + result := txn.Metadata().Where("slot > ?", e.Point.Slot).Order("slot DESC").Find(&tmpBlocks) + if result.Error != nil { + return fmt.Errorf("query blocks: %w", result.Error) } - } - // Delete rolled-back UTxOs - var tmpUtxos []models.Utxo - result = ls.db.Metadata().Where("added_slot > ?", e.Point.Slot).Order("id DESC").Find(&tmpUtxos) - if result.Error != nil { - return fmt.Errorf("remove rolled-backup UTxOs: %w", result.Error) - } - for _, utxo := range tmpUtxos { - if err := models.UtxoDelete(ls.db, utxo); err != nil { - return fmt.Errorf("remove rolled-back UTxO: %w", err) + for _, tmpBlock := range tmpBlocks { + if err := ls.removeBlock(txn, tmpBlock); err != nil { + return fmt.Errorf("remove block: %w", err) + } } - } - // Restore spent UTxOs - result = ls.db.Metadata().Model(models.Utxo{}).Where("deleted_slot > ?", e.Point.Slot).Update("deleted_slot", 0) - if result.Error != nil { - return fmt.Errorf("restore spent UTxOs after rollback: %w", result.Error) + // Delete rolled-back UTxOs + var tmpUtxos []models.Utxo + result = txn.Metadata().Where("added_slot > ?", e.Point.Slot).Order("id DESC").Find(&tmpUtxos) + if result.Error != nil { + return fmt.Errorf("remove rolled-backup UTxOs: %w", result.Error) + } + for _, utxo := range tmpUtxos { + if err := models.UtxoDelete(txn, utxo); err != nil { + return fmt.Errorf("remove rolled-back UTxO: %w", err) + } + } + // Restore spent UTxOs + result = txn.Metadata().Model(models.Utxo{}).Where("deleted_slot > ?", e.Point.Slot).Update("deleted_slot", 0) + if result.Error != nil { + return fmt.Errorf("restore spent UTxOs after rollback: %w", result.Error) + } + return nil + }) + if err != nil { + return err } // Generate event ls.eventBus.Publish( @@ -196,7 +211,6 @@ func (ls *LedgerState) handleEventChainSyncRollback(e ChainsyncEvent) error { } func (ls *LedgerState) handleEventChainSyncBlock(e ChainsyncEvent) error { - // Add block to database tmpBlock := models.Block{ Slot: e.Point.Slot, Hash: e.Point.Hash, @@ -206,33 +220,42 @@ func (ls *LedgerState) handleEventChainSyncBlock(e ChainsyncEvent) error { Type: e.Type, Cbor: e.Block.Cbor(), } - if err := ls.AddBlock(tmpBlock); err != nil { - return fmt.Errorf("add block: %w", err) - } - // Process transactions - for _, tx := range e.Block.Transactions() { - // Process consumed UTxOs - for _, consumed := range tx.Consumed() { - if err := ls.consumeUtxo(consumed, e.Point.Slot); err != nil { - return fmt.Errorf("remove consumed UTxO: %w", err) - } + // Start a transaction + txn := ls.db.Transaction(true) + err := txn.Do(func(txn *database.Transaction) error { + // Add block to database + if err := ls.addBlock(txn, tmpBlock); err != nil { + return fmt.Errorf("add block: %w", err) } - // Process produced UTxOs - for _, produced := range tx.Produced() { - outAddr := produced.Output.Address() - tmpUtxo := models.Utxo{ - TxId: produced.Id.Id().Bytes(), - OutputIdx: produced.Id.Index(), - AddedSlot: e.Point.Slot, - PaymentKey: outAddr.PaymentKeyHash().Bytes(), - StakingKey: outAddr.StakeKeyHash().Bytes(), - Cbor: produced.Output.Cbor(), + // Process transactions + for _, tx := range e.Block.Transactions() { + // Process consumed UTxOs + for _, consumed := range tx.Consumed() { + if err := ls.consumeUtxo(txn, consumed, e.Point.Slot); err != nil { + return fmt.Errorf("remove consumed UTxO: %w", err) + } } - if err := ls.addUtxo(tmpUtxo); err != nil { - return fmt.Errorf("add produced UTxO: %w", err) + // Process produced UTxOs + for _, produced := range tx.Produced() { + outAddr := produced.Output.Address() + tmpUtxo := models.Utxo{ + TxId: produced.Id.Id().Bytes(), + OutputIdx: produced.Id.Index(), + AddedSlot: e.Point.Slot, + PaymentKey: outAddr.PaymentKeyHash().Bytes(), + StakingKey: outAddr.StakeKeyHash().Bytes(), + Cbor: produced.Output.Cbor(), + } + if err := ls.addUtxo(txn, tmpUtxo); err != nil { + return fmt.Errorf("add produced UTxO: %w", err) + } } + // XXX: generate event for each TX/UTxO? } - // XXX: generate event for each TX/UTxO? + return nil + }) + if err != nil { + return err } // Generate event ls.eventBus.Publish( @@ -253,18 +276,15 @@ func (ls *LedgerState) handleEventChainSyncBlock(e ChainsyncEvent) error { return nil } -func (ls *LedgerState) addUtxo(utxo models.Utxo) error { +func (ls *LedgerState) addUtxo(txn *database.Transaction, utxo models.Utxo) error { // Add UTxO to blob DB key := models.UtxoBlobKey(utxo.TxId, utxo.OutputIdx) - err := ls.db.Blob().Update(func(txn *badger.Txn) error { - err := txn.Set(key, utxo.Cbor) - return err - }) + err := txn.Blob().Set(key, utxo.Cbor) if err != nil { return err } // Add to metadata DB - if result := ls.db.Metadata().Create(&utxo); result.Error != nil { + if result := txn.Metadata().Create(&utxo); result.Error != nil { return result.Error } return nil @@ -272,9 +292,9 @@ func (ls *LedgerState) addUtxo(utxo models.Utxo) error { // consumeUtxo marks a UTxO as "deleted" without actually deleting it. This allows for a UTxO // to be easily on rollback -func (ls *LedgerState) consumeUtxo(utxoId ledger.TransactionInput, slot uint64) error { +func (ls *LedgerState) consumeUtxo(txn *database.Transaction, utxoId ledger.TransactionInput, slot uint64) error { // Find UTxO - utxo, err := models.UtxoByRef(ls.db, utxoId.Id().Bytes(), utxoId.Index()) + utxo, err := models.UtxoByRef(txn, utxoId.Id().Bytes(), utxoId.Index()) if err != nil { // TODO: make this configurable? if err == gorm.ErrRecordNotFound { @@ -284,40 +304,34 @@ func (ls *LedgerState) consumeUtxo(utxoId ledger.TransactionInput, slot uint64) } // Mark as deleted in specified slot utxo.DeletedSlot = slot - if result := ls.db.Metadata().Save(&utxo); result.Error != nil { + if result := txn.Metadata().Save(&utxo); result.Error != nil { return result.Error } return nil } -func (ls *LedgerState) AddBlock(block models.Block) error { +func (ls *LedgerState) addBlock(txn *database.Transaction, block models.Block) error { // Add block to blob DB key := models.BlockBlobKey(block.Slot, block.Hash) - err := ls.db.Blob().Update(func(txn *badger.Txn) error { - err := txn.Set(key, block.Cbor) - return err - }) + err := txn.Blob().Set(key, block.Cbor) if err != nil { return err } // Add to metadata DB - if result := ls.db.Metadata().Create(&block); result.Error != nil { + if result := txn.Metadata().Create(&block); result.Error != nil { return result.Error } return nil } -func (ls *LedgerState) removeBlock(block models.Block) error { +func (ls *LedgerState) removeBlock(txn *database.Transaction, block models.Block) error { // Remove from metadata DB - if result := ls.db.Metadata().Delete(&block); result.Error != nil { + if result := txn.Metadata().Delete(&block); result.Error != nil { return result.Error } // Remove from blob DB key := models.BlockBlobKey(block.Slot, block.Hash) - err := ls.db.Blob().Update(func(txn *badger.Txn) error { - err := txn.Delete(key) - return err - }) + err := txn.Blob().Delete(key) if err != nil { return err } @@ -402,12 +416,26 @@ func (ls *LedgerState) Tip() (ochainsync.Tip, error) { func (ls *LedgerState) UtxoByRef(txId []byte, outputIdx uint32) (models.Utxo, error) { ls.RLock() defer ls.RUnlock() - return models.UtxoByRef(ls.db, txId, outputIdx) + txn := ls.db.Transaction(false) + var ret models.Utxo + err := txn.Do(func(txn *database.Transaction) error { + var err error + ret, err = models.UtxoByRef(txn, txId, outputIdx) + return err + }) + return ret, err } // UtxosByAddress returns all UTxOs that belong to the specified address func (ls *LedgerState) UtxosByAddress(addr ledger.Address) ([]models.Utxo, error) { ls.RLock() defer ls.RUnlock() - return models.UtxosByAddress(ls.db, addr) + txn := ls.db.Transaction(false) + var ret []models.Utxo + err := txn.Do(func(txn *database.Transaction) error { + var err error + ret, err = models.UtxosByAddress(txn, addr) + return err + }) + return ret, err }