Skip to content

Commit

Permalink
feat: use shared DB transaction for updates in ledger state
Browse files Browse the repository at this point in the history
This adds support for managing a transaction across both DB backends. It
uses this new support for creating a shared transaction for "atomic" write
operations and using it for all operations that update the DB.

Fixes #105
  • Loading branch information
agaffney committed Aug 27, 2024
1 parent 280338a commit 0c85aa4
Show file tree
Hide file tree
Showing 5 changed files with 320 additions and 93 deletions.
94 changes: 94 additions & 0 deletions database/commit_timestamp.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
// Copyright 2024 Blink Labs Software
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package database

import (
"fmt"
"math/big"

badger "github.com/dgraph-io/badger/v4"
"gorm.io/gorm"
"gorm.io/gorm/clause"
)

const (
commitTimestampBlobKey = "metadata_commit_timestamp"
commitTimestampRowId = 1
)

// CommitTimestamp represents the sqlite table used to track the current commit timestamp
type CommitTimestamp struct {
ID uint `gorm:"primarykey"`
Timestamp int64
}

func (b *BaseDatabase) checkCommitTimestamp() error {
// Create table if it doesn't exist
if err := b.Metadata().AutoMigrate(&CommitTimestamp{}); err != nil {
return err
}
// Get value from sqlite
var tmpCommitTimestamp CommitTimestamp
result := b.Metadata().First(&tmpCommitTimestamp)
if result.Error != nil {
// No metadata yet, so nothing to check
if result.Error == gorm.ErrRecordNotFound {
return nil
}
return result.Error
}
// Get value from badger
err := b.Blob().View(func(txn *badger.Txn) error {
item, err := txn.Get([]byte(commitTimestampBlobKey))
if err != nil {
return err
}
val, err := item.ValueCopy(nil)
if err != nil {
return err
}
tmpTimestamp := new(big.Int).SetBytes(val).Int64()
// Compare values
if tmpTimestamp != tmpCommitTimestamp.Timestamp {
return fmt.Errorf("commit timestamp mismatch: %d (metadata) != %d (blob)", tmpCommitTimestamp.Timestamp, tmpTimestamp)
}
return nil
})
if err != nil {
return err
}
return nil
}

func (b *BaseDatabase) updateCommitTimestamp(txn *Transaction, timestamp int64) error {
// Update sqlite
tmpCommitTimestamp := CommitTimestamp{
ID: commitTimestampRowId,
Timestamp: timestamp,
}
result := txn.Metadata().Clauses(clause.OnConflict{
Columns: []clause.Column{{Name: "id"}},
DoUpdates: clause.AssignmentColumns([]string{"timestamp"}),
}).Create(&tmpCommitTimestamp)
if result.Error != nil {
return result.Error
}
// Update badger
tmpTimestamp := new(big.Int).SetInt64(timestamp)
if err := txn.Blob().Set([]byte(commitTimestampBlobKey), tmpTimestamp.Bytes()); err != nil {
return err
}
return nil
}
11 changes: 11 additions & 0 deletions database/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ import (
type Database interface {
Metadata() *gorm.DB
Blob() *badger.DB
Transaction(bool) *Transaction
updateCommitTimestamp(*Transaction, int64) error
}

type BaseDatabase struct {
Expand All @@ -53,6 +55,11 @@ func (b *BaseDatabase) Blob() *badger.DB {
return b.blob
}

// Transaction starts a new database transaction and returns a handle to it
func (b *BaseDatabase) Transaction(readWrite bool) *Transaction {
return NewTransaction(b, readWrite)
}

func (b *BaseDatabase) init() error {
if b.logger == nil {
// Create logger to throw away logs
Expand All @@ -65,6 +72,10 @@ func (b *BaseDatabase) init() error {
}
// Configure metrics for Badger DB
b.registerBadgerMetrics()
// Check commit timestamp
if err := b.checkCommitTimestamp(); err != nil {
return err
}
// Run GC periodically for Badger DB
b.blobGcTimer = time.NewTicker(5 * time.Minute)
go b.blobGc()
Expand Down
109 changes: 109 additions & 0 deletions database/txn.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
// Copyright 2024 Blink Labs Software
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package database

import (
"fmt"
"sync"
"time"

"github.com/dgraph-io/badger/v4"
"gorm.io/gorm"
)

// Transaction is a wrapper around the transaction objects for the underlying DB engines
type Transaction struct {
lock sync.Mutex
finished bool
db Database
readWrite bool
blobTxn *badger.Txn
metadataTxn *gorm.DB
}

func NewTransaction(db Database, readWrite bool) *Transaction {
return &Transaction{
db: db,
readWrite: readWrite,
blobTxn: db.Blob().NewTransaction(readWrite),
metadataTxn: db.Metadata().Begin(),
}
}

func (t *Transaction) Metadata() *gorm.DB {
return t.metadataTxn
}

func (t *Transaction) Blob() *badger.Txn {
return t.blobTxn
}

// Do executes the specified function in the context of the transaction. Any errors returned will result
// in the transaction being rolled back
func (t *Transaction) Do(fn func(*Transaction) error) error {
if err := fn(t); err != nil {
if err2 := t.Rollback(); err2 != nil {
return fmt.Errorf("rollback failed: %w: original error: %w", err2, err)
}
return err
}
if err := t.Commit(); err != nil {
return fmt.Errorf("commit failed: %w", err)
}
return nil
}

func (t *Transaction) Commit() error {
t.lock.Lock()
defer t.lock.Unlock()
if t.finished {
return nil
}
// No need to commit for read-only, but we do want to free up resources
if !t.readWrite {
return t.Rollback()
}
// Update the commit timestamp for both DBs
commitTimestamp := time.Now().UnixMilli()
if err := t.db.updateCommitTimestamp(t, commitTimestamp); err != nil {
return err
}
// Commit sqlite transaction
if result := t.metadataTxn.Commit(); result.Error != nil {
// Failed to commit metadata DB, so discard blob txn
t.blobTxn.Discard()
return result.Error
}
// Commit badger transaction
if err := t.blobTxn.Commit(); err != nil {
return err
}
t.finished = true
return nil
}

func (t *Transaction) Rollback() error {
t.lock.Lock()
defer t.lock.Unlock()
if t.finished {
return nil
}
t.blobTxn.Discard()
if result := t.metadataTxn.Rollback(); result.Error != nil {
return result.Error
}
t.finished = true
return nil
}
11 changes: 5 additions & 6 deletions state/models/utxo.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,30 +98,29 @@ func UtxosByAddress(db database.Database, addr ledger.Address) ([]Utxo, error) {
return ret, nil
}

func UtxoDelete(db database.Database, utxo Utxo) error {
func UtxoDelete(txn *database.Transaction, utxo Utxo) error {
// Remove from metadata DB
if result := db.Metadata().Delete(&utxo); result.Error != nil {
if result := txn.Metadata().Delete(&utxo); result.Error != nil {
return result.Error
}
// Remove from blob DB
key := UtxoBlobKey(utxo.TxId, utxo.OutputIdx)
err := db.Blob().Update(func(txn *badger.Txn) error {
err := txn.Delete(key)
return err
})
err := txn.Blob().Delete(key)
if err != nil {
return err
}
return nil
}

/*
func UtxoDeleteByRef(db database.Database, txId []byte, outputIdx uint32) error {
utxo, err := UtxoByRef(db, txId, outputIdx)
if err != nil {
return err
}
return UtxoDelete(db, utxo)
}
*/

func UtxoBlobKey(txId []byte, outputIdx uint32) []byte {
key := []byte("u")
Expand Down
Loading

0 comments on commit 0c85aa4

Please sign in to comment.