Skip to content

Commit

Permalink
feat: use shared DB transaction for updates in ledger state
Browse files Browse the repository at this point in the history
This adds support for managing a transaction across both DB backends. It
uses this new support for creating a shared transaction for "atomic" write
operations and using it for all operations that update the DB.

Fixes #105
  • Loading branch information
agaffney committed Aug 27, 2024
1 parent 280338a commit f68ac7c
Show file tree
Hide file tree
Showing 6 changed files with 375 additions and 119 deletions.
98 changes: 98 additions & 0 deletions database/commit_timestamp.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
// Copyright 2024 Blink Labs Software
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package database

import (
"fmt"
"math/big"

badger "github.com/dgraph-io/badger/v4"
"gorm.io/gorm"
"gorm.io/gorm/clause"
)

const (
commitTimestampBlobKey = "metadata_commit_timestamp"
commitTimestampRowId = 1
)

// CommitTimestamp represents the sqlite table used to track the current commit timestamp
type CommitTimestamp struct {
ID uint `gorm:"primarykey"`
Timestamp int64
}

func (CommitTimestamp) TableName() string {
return "commit_timestamp"
}

func (b *BaseDatabase) checkCommitTimestamp() error {
// Create table if it doesn't exist
if err := b.Metadata().AutoMigrate(&CommitTimestamp{}); err != nil {
return err
}
// Get value from sqlite
var tmpCommitTimestamp CommitTimestamp
result := b.Metadata().First(&tmpCommitTimestamp)
if result.Error != nil {
// No metadata yet, so nothing to check
if result.Error == gorm.ErrRecordNotFound {
return nil
}
return result.Error
}
// Get value from badger
err := b.Blob().View(func(txn *badger.Txn) error {
item, err := txn.Get([]byte(commitTimestampBlobKey))
if err != nil {
return err
}
val, err := item.ValueCopy(nil)
if err != nil {
return err
}
tmpTimestamp := new(big.Int).SetBytes(val).Int64()
// Compare values
if tmpTimestamp != tmpCommitTimestamp.Timestamp {
return fmt.Errorf("commit timestamp mismatch: %d (metadata) != %d (blob)", tmpCommitTimestamp.Timestamp, tmpTimestamp)
}
return nil
})
if err != nil {
return err
}
return nil
}

func (b *BaseDatabase) updateCommitTimestamp(txn *Transaction, timestamp int64) error {
// Update sqlite
tmpCommitTimestamp := CommitTimestamp{
ID: commitTimestampRowId,
Timestamp: timestamp,
}
result := txn.Metadata().Clauses(clause.OnConflict{
Columns: []clause.Column{{Name: "id"}},
DoUpdates: clause.AssignmentColumns([]string{"timestamp"}),
}).Create(&tmpCommitTimestamp)
if result.Error != nil {
return result.Error
}
// Update badger
tmpTimestamp := new(big.Int).SetInt64(timestamp)
if err := txn.Blob().Set([]byte(commitTimestampBlobKey), tmpTimestamp.Bytes()); err != nil {
return err
}
return nil
}
11 changes: 11 additions & 0 deletions database/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ import (
type Database interface {
Metadata() *gorm.DB
Blob() *badger.DB
Transaction(bool) *Transaction
updateCommitTimestamp(*Transaction, int64) error
}

type BaseDatabase struct {
Expand All @@ -53,6 +55,11 @@ func (b *BaseDatabase) Blob() *badger.DB {
return b.blob
}

// Transaction starts a new database transaction and returns a handle to it
func (b *BaseDatabase) Transaction(readWrite bool) *Transaction {
return NewTransaction(b, readWrite)
}

func (b *BaseDatabase) init() error {
if b.logger == nil {
// Create logger to throw away logs
Expand All @@ -65,6 +72,10 @@ func (b *BaseDatabase) init() error {
}
// Configure metrics for Badger DB
b.registerBadgerMetrics()
// Check commit timestamp
if err := b.checkCommitTimestamp(); err != nil {
return err
}
// Run GC periodically for Badger DB
b.blobGcTimer = time.NewTicker(5 * time.Minute)
go b.blobGc()
Expand Down
109 changes: 109 additions & 0 deletions database/txn.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
// Copyright 2024 Blink Labs Software
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package database

import (
"fmt"
"sync"
"time"

"github.com/dgraph-io/badger/v4"
"gorm.io/gorm"
)

// Transaction is a wrapper around the transaction objects for the underlying DB engines
type Transaction struct {
lock sync.Mutex
finished bool
db Database
readWrite bool
blobTxn *badger.Txn
metadataTxn *gorm.DB
}

func NewTransaction(db Database, readWrite bool) *Transaction {
return &Transaction{
db: db,
readWrite: readWrite,
blobTxn: db.Blob().NewTransaction(readWrite),
metadataTxn: db.Metadata().Begin(),
}
}

func (t *Transaction) Metadata() *gorm.DB {
return t.metadataTxn
}

func (t *Transaction) Blob() *badger.Txn {
return t.blobTxn
}

// Do executes the specified function in the context of the transaction. Any errors returned will result
// in the transaction being rolled back
func (t *Transaction) Do(fn func(*Transaction) error) error {
if err := fn(t); err != nil {
if err2 := t.Rollback(); err2 != nil {
return fmt.Errorf("rollback failed: %w: original error: %w", err2, err)
}
return err
}
if err := t.Commit(); err != nil {
return fmt.Errorf("commit failed: %w", err)
}
return nil
}

func (t *Transaction) Commit() error {
t.lock.Lock()
defer t.lock.Unlock()
if t.finished {
return nil
}
// No need to commit for read-only, but we do want to free up resources
if !t.readWrite {
return t.Rollback()
}
// Update the commit timestamp for both DBs
commitTimestamp := time.Now().UnixMilli()
if err := t.db.updateCommitTimestamp(t, commitTimestamp); err != nil {
return err
}
// Commit sqlite transaction
if result := t.metadataTxn.Commit(); result.Error != nil {
// Failed to commit metadata DB, so discard blob txn
t.blobTxn.Discard()
return result.Error
}
// Commit badger transaction
if err := t.blobTxn.Commit(); err != nil {
return err
}
t.finished = true
return nil
}

func (t *Transaction) Rollback() error {
t.lock.Lock()
defer t.lock.Unlock()
if t.finished {
return nil
}
t.blobTxn.Discard()
if result := t.metadataTxn.Rollback(); result.Error != nil {
return result.Error
}
t.finished = true
return nil
}
14 changes: 12 additions & 2 deletions state/models/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ type Block struct {
Cbor []byte `gorm:"-"` // This is here for convenience but not represented in the metadata DB
}

func (Block) TableName() string {
return "block"
}

func (b Block) Decode() (ledger.Block, error) {
return ledger.NewBlockFromCbor(b.Type, b.Cbor)
}
Expand Down Expand Up @@ -63,19 +67,25 @@ func (b *Block) loadCbor(badgerDb *badger.DB) error {
func BlockByPoint(db database.Database, point ocommon.Point) (Block, error) {
var tmpBlock Block
result := db.Metadata().First(&tmpBlock, "slot = ? AND hash = ?", point.Slot, point.Hash)
if result.Error != nil {
return tmpBlock, result.Error
}
if err := tmpBlock.loadCbor(db.Blob()); err != nil {
return tmpBlock, err
}
return tmpBlock, result.Error
return tmpBlock, nil
}

func BlockByNumber(db database.Database, blockNumber uint64) (Block, error) {
var tmpBlock Block
result := db.Metadata().First(&tmpBlock, "number = ?", blockNumber)
if result.Error != nil {
return tmpBlock, result.Error
}
if err := tmpBlock.loadCbor(db.Blob()); err != nil {
return tmpBlock, err
}
return tmpBlock, result.Error
return tmpBlock, nil
}

func BlockBlobKey(slot uint64, hash []byte) []byte {
Expand Down
54 changes: 27 additions & 27 deletions state/models/utxo.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,23 +35,21 @@ type Utxo struct {
Cbor []byte `gorm:"-"` // This is here for convenience but not represented in the metadata DB
}

func (Utxo) TableName() string {
return "utxo"
}

func (u Utxo) Decode() (ledger.TransactionOutput, error) {
return ledger.NewTransactionOutputFromCbor(u.Cbor)
}

func (u *Utxo) loadCbor(badgerDb *badger.DB) error {
func (u *Utxo) loadCbor(txn *database.Transaction) error {
key := UtxoBlobKey(u.TxId, u.OutputIdx)
err := badgerDb.View(func(txn *badger.Txn) error {
item, err := txn.Get(key)
if err != nil {
return err
}
u.Cbor, err = item.ValueCopy(nil)
if err != nil {
return err
}
return nil
})
item, err := txn.Blob().Get(key)
if err != nil {
return err
}
u.Cbor, err = item.ValueCopy(nil)
if err != nil {
if errors.Is(err, badger.ErrKeyNotFound) {
return nil
Expand All @@ -61,67 +59,69 @@ func (u *Utxo) loadCbor(badgerDb *badger.DB) error {
return nil
}

func UtxoByRef(db database.Database, txId []byte, outputIdx uint32) (Utxo, error) {
func UtxoByRef(txn *database.Transaction, txId []byte, outputIdx uint32) (Utxo, error) {
var tmpUtxo Utxo
result := db.Metadata().First(&tmpUtxo, "tx_id = ? AND output_idx = ?", txId, outputIdx)
if err := tmpUtxo.loadCbor(db.Blob()); err != nil {
result := txn.Metadata().First(&tmpUtxo, "tx_id = ? AND output_idx = ?", txId, outputIdx)
if result.Error != nil {
return tmpUtxo, result.Error
}
if err := tmpUtxo.loadCbor(txn); err != nil {
return tmpUtxo, err
}
return tmpUtxo, result.Error
return tmpUtxo, nil
}

func UtxosByAddress(db database.Database, addr ledger.Address) ([]Utxo, error) {
func UtxosByAddress(txn *database.Transaction, addr ledger.Address) ([]Utxo, error) {
var ret []Utxo
// Build sub-query for address
var addrQuery *gorm.DB
if addr.PaymentKeyHash() != ledger.NewBlake2b224(nil) {
addrQuery = db.Metadata().Where("payment_key = ?", addr.PaymentKeyHash().Bytes())
addrQuery = txn.Metadata().Where("payment_key = ?", addr.PaymentKeyHash().Bytes())
}
if addr.StakeKeyHash() != ledger.NewBlake2b224(nil) {
if addrQuery != nil {
addrQuery = addrQuery.Or("staking_key = ?", addr.StakeKeyHash().Bytes())
} else {
addrQuery = db.Metadata().Where("staking_key = ?", addr.StakeKeyHash().Bytes())
addrQuery = txn.Metadata().Where("staking_key = ?", addr.StakeKeyHash().Bytes())
}
}
result := db.Metadata().Where("deleted_slot = 0").Where(addrQuery).Find(&ret)
result := txn.Metadata().Where("deleted_slot = 0").Where(addrQuery).Find(&ret)
if result.Error != nil {
return nil, result.Error
}
// Load CBOR from blob DB for each UTxO
for idx, tmpUtxo := range ret {
if err := tmpUtxo.loadCbor(db.Blob()); err != nil {
if err := tmpUtxo.loadCbor(txn); err != nil {
return nil, err
}
ret[idx] = tmpUtxo
}
return ret, nil
}

func UtxoDelete(db database.Database, utxo Utxo) error {
func UtxoDelete(txn *database.Transaction, utxo Utxo) error {
// Remove from metadata DB
if result := db.Metadata().Delete(&utxo); result.Error != nil {
if result := txn.Metadata().Delete(&utxo); result.Error != nil {
return result.Error
}
// Remove from blob DB
key := UtxoBlobKey(utxo.TxId, utxo.OutputIdx)
err := db.Blob().Update(func(txn *badger.Txn) error {
err := txn.Delete(key)
return err
})
err := txn.Blob().Delete(key)
if err != nil {
return err
}
return nil
}

/*
func UtxoDeleteByRef(db database.Database, txId []byte, outputIdx uint32) error {
utxo, err := UtxoByRef(db, txId, outputIdx)
if err != nil {
return err
}
return UtxoDelete(db, utxo)
}
*/

func UtxoBlobKey(txId []byte, outputIdx uint32) []byte {
key := []byte("u")
Expand Down
Loading

0 comments on commit f68ac7c

Please sign in to comment.