Skip to content

Commit

Permalink
Merge pull request #98 from coinbase/patrick/support-block-gaps
Browse files Browse the repository at this point in the history
Support Omitted Blocks
  • Loading branch information
patrick-ogrady authored Aug 10, 2020
2 parents 0edccc4 + e455c22 commit 6bcf424
Show file tree
Hide file tree
Showing 5 changed files with 204 additions and 70 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/coinbase/rosetta-cli
go 1.13

require (
github.com/coinbase/rosetta-sdk-go v0.3.4-0.20200807162047-31075a509b1f
github.com/coinbase/rosetta-sdk-go v0.3.4
github.com/dgraph-io/badger/v2 v2.0.3
github.com/dgraph-io/ristretto v0.0.2 // indirect
github.com/ethereum/go-ethereum v1.9.18
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ github.com/cespare/xxhash/v2 v2.1.1 h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/cloudflare/cloudflare-go v0.10.2-0.20190916151808-a80f83b9add9/go.mod h1:1MxXX1Ux4x6mqPmjkUgTP1CdXIBXKX7T+Jk9Gxrmx+U=
github.com/coinbase/rosetta-sdk-go v0.3.4-0.20200807162047-31075a509b1f h1:U69ZwbTR10diY1MDi9LP/RxetVJzjd4bvIDUEs8XYvk=
github.com/coinbase/rosetta-sdk-go v0.3.4-0.20200807162047-31075a509b1f/go.mod h1:Q6dAY0kdG2X3jNaIYnkxnZOb8XEZQar9Q1RcnBgm/wQ=
github.com/coinbase/rosetta-sdk-go v0.3.4 h1:jWKgajozco/T0FNnZb2TqBsmsUoF6ZuCLnUJkEE+vNg=
github.com/coinbase/rosetta-sdk-go v0.3.4/go.mod h1:Q6dAY0kdG2X3jNaIYnkxnZOb8XEZQar9Q1RcnBgm/wQ=
github.com/coreos/bbolt v1.3.2/go.mod h1:iRUV2dpdMOn7Bo10OQBFzIJO9kkE559Wcmn+qkEiiKk=
github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE=
github.com/coreos/go-etcd v2.0.0+incompatible/go.mod h1:Jez6KQU2B/sWsbdaef3ED8NzMklzPG4d5KIOhIy30Tk=
Expand Down
2 changes: 1 addition & 1 deletion pkg/processor/reconciler_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func (h *ReconcilerHelper) BlockExists(
ctx context.Context,
block *types.BlockIdentifier,
) (bool, error) {
_, err := h.blockStorage.GetBlock(ctx, block)
_, err := h.blockStorage.GetBlock(ctx, types.ConstructPartialBlockIdentifier(block))
if err == nil {
return true, nil
}
Expand Down
161 changes: 101 additions & 60 deletions pkg/storage/block_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,8 @@ const (
// blockNamespace is prepended to any stored block.
blockNamespace = "block"

// blockHashNamespace is prepended to any stored block hash.
// We cannot just use the stored block key to lookup whether
// a hash has been used before because it is concatenated
// with the index of the stored block.
blockHashNamespace = "block-hash"
// blockIndexNamespace is prepended to any stored block index.
blockIndexNamespace = "block-index"

// transactionHashNamespace is prepended to any stored
// transaction hash.
Expand All @@ -55,9 +52,9 @@ var (
// found in BlockStorage.
ErrBlockNotFound = errors.New("block not found")

// ErrDuplicateBlockHash is returned when a block hash
// ErrDuplicateKey is returned when a key
// cannot be stored because it is a duplicate.
ErrDuplicateBlockHash = errors.New("duplicate block hash")
ErrDuplicateKey = errors.New("duplicate key")

// ErrDuplicateTransactionHash is returned when a transaction
// hash cannot be stored because it is a duplicate.
Expand All @@ -68,14 +65,12 @@ func getHeadBlockKey() []byte {
return []byte(headBlockKey)
}

func getBlockKey(blockIdentifier *types.BlockIdentifier) []byte {
return []byte(
fmt.Sprintf("%s/%s/%d", blockNamespace, blockIdentifier.Hash, blockIdentifier.Index),
)
func getBlockHashKey(hash string) []byte {
return []byte(fmt.Sprintf("%s/%s", blockNamespace, hash))
}

func getBlockHashKey(blockIdentifier *types.BlockIdentifier) []byte {
return []byte(fmt.Sprintf("%s/%s", blockHashNamespace, blockIdentifier.Hash))
func getBlockIndexKey(index int64) []byte {
return []byte(fmt.Sprintf("%s/%d", blockIndexNamespace, index))
}

func getTransactionHashKey(transactionIdentifier *types.TransactionIdentifier) []byte {
Expand Down Expand Up @@ -174,18 +169,42 @@ func (b *BlockStorage) StoreHeadBlockIdentifier(
// GetBlock returns a block, if it exists.
func (b *BlockStorage) GetBlock(
ctx context.Context,
blockIdentifier *types.BlockIdentifier,
blockIdentifier *types.PartialBlockIdentifier,
) (*types.Block, error) {
transaction := b.db.NewDatabaseTransaction(ctx, false)
defer transaction.Discard(ctx)

exists, block, err := transaction.Get(ctx, getBlockKey(blockIdentifier))
var exists bool
var block []byte
var err error
switch {
case blockIdentifier == nil || (blockIdentifier.Hash == nil && blockIdentifier.Index == nil):
// Get current block when no blockIdentifier is provided
var head *types.BlockIdentifier
head, err = b.GetHeadBlockIdentifierTransactional(ctx, transaction)
if err != nil {
return nil, fmt.Errorf("%w: cannot get head block identifier", err)
}

exists, block, err = transaction.Get(ctx, getBlockHashKey(head.Hash))
case blockIdentifier.Hash != nil:
// Get block by hash if provided
exists, block, err = transaction.Get(ctx, getBlockHashKey(*blockIdentifier.Hash))
default:
// Get block by index if hash not provided
var blockKey []byte
exists, blockKey, err = transaction.Get(ctx, getBlockIndexKey(*blockIdentifier.Index))
if exists {
exists, block, err = transaction.Get(ctx, blockKey)
}
}

if err != nil {
return nil, err
return nil, fmt.Errorf("%w: unable to get block", err)
}

if !exists {
return nil, fmt.Errorf("%w %+v", ErrBlockNotFound, blockIdentifier)
return nil, fmt.Errorf("%w: %+v", ErrBlockNotFound, blockIdentifier)
}

var rosettaBlock types.Block
Expand All @@ -197,33 +216,48 @@ func (b *BlockStorage) GetBlock(
return &rosettaBlock, nil
}

// AddBlock stores a block or returns an error.
func (b *BlockStorage) AddBlock(
func (b *BlockStorage) storeBlock(
ctx context.Context,
transaction DatabaseTransaction,
block *types.Block,
) error {
transaction := b.db.NewDatabaseTransaction(ctx, true)
defer transaction.Discard(ctx)

buf, err := encode(block)
if err != nil {
return err
return fmt.Errorf("%w: unable to encode block", err)
}

// Store block
err = transaction.Set(ctx, getBlockKey(block.BlockIdentifier), buf)
if err != nil {
return err
if err := b.storeUniqueKey(ctx, transaction, getBlockHashKey(block.BlockIdentifier.Hash), buf); err != nil {
return fmt.Errorf("%w: unable to store block", err)
}

if err = b.StoreHeadBlockIdentifier(ctx, transaction, block.BlockIdentifier); err != nil {
return err
if err := b.storeUniqueKey(
ctx,
transaction,
getBlockIndexKey(block.BlockIdentifier.Index),
getBlockHashKey(block.BlockIdentifier.Hash),
); err != nil {
return fmt.Errorf("%w: unable to store block index", err)
}

if err := b.StoreHeadBlockIdentifier(ctx, transaction, block.BlockIdentifier); err != nil {
return fmt.Errorf("%w: unable to update head block identifier", err)
}

// Store block hash
err = b.storeBlockHash(ctx, transaction, block.BlockIdentifier)
return nil
}

// AddBlock stores a block or returns an error.
func (b *BlockStorage) AddBlock(
ctx context.Context,
block *types.Block,
) error {
transaction := b.db.NewDatabaseTransaction(ctx, true)
defer transaction.Discard(ctx)

// Store block
err := b.storeBlock(ctx, transaction, block)
if err != nil {
return fmt.Errorf("%w: unable to store block hash", err)
return fmt.Errorf("%w: unable to store block", err)
}

// Store all transaction hashes
Expand All @@ -242,6 +276,27 @@ func (b *BlockStorage) AddBlock(
return b.callWorkersAndCommit(ctx, block, transaction, true)
}

func (b *BlockStorage) deleteBlock(
ctx context.Context,
transaction DatabaseTransaction,
block *types.Block,
) error {
blockIdentifier := block.BlockIdentifier
if err := transaction.Delete(ctx, getBlockHashKey(blockIdentifier.Hash)); err != nil {
return fmt.Errorf("%w: unable to delete block", err)
}

if err := transaction.Delete(ctx, getBlockIndexKey(blockIdentifier.Index)); err != nil {
return fmt.Errorf("%w: unable to delete block index", err)
}

if err := b.StoreHeadBlockIdentifier(ctx, transaction, block.ParentBlockIdentifier); err != nil {
return fmt.Errorf("%w: unable to update head block identifier", err)
}

return nil
}

// RemoveBlock removes a block or returns an error.
// RemoveBlock also removes the block hash and all
// its transaction hashes to not break duplicate
Expand All @@ -250,7 +305,7 @@ func (b *BlockStorage) RemoveBlock(
ctx context.Context,
blockIdentifier *types.BlockIdentifier,
) error {
block, err := b.GetBlock(ctx, blockIdentifier)
block, err := b.GetBlock(ctx, types.ConstructPartialBlockIdentifier(blockIdentifier))
if err != nil {
return err
}
Expand All @@ -266,19 +321,9 @@ func (b *BlockStorage) RemoveBlock(
}
}

// Remove block hash
err = transaction.Delete(ctx, getBlockHashKey(blockIdentifier))
if err != nil {
return err
}

// Remove block
if err := transaction.Delete(ctx, getBlockKey(blockIdentifier)); err != nil {
return err
}

if err = b.StoreHeadBlockIdentifier(ctx, transaction, block.ParentBlockIdentifier); err != nil {
return err
// Delete block
if err := b.deleteBlock(ctx, transaction, block); err != nil {
return fmt.Errorf("%w: unable to delete block", err)
}

return b.callWorkersAndCommit(ctx, block, transaction, false)
Expand Down Expand Up @@ -348,7 +393,7 @@ func (b *BlockStorage) SetNewStartIndex(
currBlock := head
for currBlock.Index >= startIndex {
log.Printf("Removing block %+v\n", currBlock)
block, err := b.GetBlock(ctx, currBlock)
block, err := b.GetBlock(ctx, types.ConstructPartialBlockIdentifier(currBlock))
if err != nil {
return err
}
Expand All @@ -373,7 +418,7 @@ func (b *BlockStorage) CreateBlockCache(ctx context.Context) []*types.BlockIdent
}

for len(cache) < syncer.PastBlockSize {
block, err := b.GetBlock(ctx, head)
block, err := b.GetBlock(ctx, types.ConstructPartialBlockIdentifier(head))
if err != nil {
return cache
}
Expand All @@ -392,22 +437,22 @@ func (b *BlockStorage) CreateBlockCache(ctx context.Context) []*types.BlockIdent
return cache
}

func (b *BlockStorage) storeBlockHash(
func (b *BlockStorage) storeUniqueKey(
ctx context.Context,
transaction DatabaseTransaction,
block *types.BlockIdentifier,
key []byte,
value []byte,
) error {
hashKey := getBlockHashKey(block)
exists, _, err := transaction.Get(ctx, hashKey)
exists, _, err := transaction.Get(ctx, key)
if err != nil {
return err
}

if exists {
return fmt.Errorf("%w: duplicate block hash %s found", ErrDuplicateBlockHash, block.Hash)
return fmt.Errorf("%w: duplicate key %s found", ErrDuplicateKey, string(key))
}

return transaction.Set(ctx, hashKey, []byte(""))
return transaction.Set(ctx, key, value)
}

func (b *BlockStorage) storeTransactionHash(
Expand Down Expand Up @@ -518,7 +563,7 @@ func (b *BlockStorage) FindTransaction(
}
}

blockExists, block, err := txn.Get(ctx, getBlockKey(newestBlock))
blockExists, block, err := txn.Get(ctx, getBlockHashKey(newestBlock.Hash))
if err != nil {
return nil, nil, fmt.Errorf("%w: unable to query database for block", err)
}
Expand Down Expand Up @@ -554,15 +599,11 @@ func (b *BlockStorage) AtTip(
ctx context.Context,
tipDelay int64,
) (bool, error) {
head, err := b.GetHeadBlockIdentifier(ctx)
block, err := b.GetBlock(ctx, nil)
if errors.Is(err, ErrHeadBlockNotFound) {
return false, nil
}
if err != nil {
return false, fmt.Errorf("%w: unable to get head block identifir", err)
}

block, err := b.GetBlock(ctx, head)
if err != nil {
return false, fmt.Errorf("%w: unable to get head block", err)
}
Expand Down
Loading

0 comments on commit 6bcf424

Please sign in to comment.