Skip to content

Commit

Permalink
Merge pull request #19 from coinbase/patrick/persist-inactive
Browse files Browse the repository at this point in the history
Bootstrap Reconciler with Previously Seen Accounts
  • Loading branch information
patrick-ogrady authored May 13, 2020
2 parents 5a24976 + 57bb7d4 commit cc2e0c4
Show file tree
Hide file tree
Showing 12 changed files with 186 additions and 36 deletions.
17 changes: 16 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ Available Commands:
check Check the correctness of a Rosetta Node API Server
create:configuration Generate a static configuration file for the Asserter
help Help about any command
version Print rosetta-cli version
view:account View an account balance
view:block View a block
Expand All @@ -43,6 +44,20 @@ Flags:
Use "rosetta-cli [command] --help" for more information about a command.
```

### version
```
Print rosetta-cli version
Usage:
rosetta-cli version [flags]
Flags:
-h, --help help for version
Global Flags:
--server-url string base URL for a Rosetta server (default "http://localhost:8080")
```

### check
```
Check all server responses are properly constructed, that
Expand Down Expand Up @@ -79,7 +94,6 @@ Usage:
rosetta-cli check [flags]
Flags:
--account-concurrency uint concurrency to use while fetching accounts during reconciliation (default 8)
--block-concurrency uint concurrency to use while fetching blocks (default 8)
--bootstrap-balances string Absolute path to a file used to bootstrap balances before starting syncing.
Populating this value after beginning syncing will return an error.
Expand All @@ -101,6 +115,7 @@ Flags:
--lookup-balance-by-block When set to true, balances are looked up at the block where a balance
change occurred instead of at the current block. Blockchains that do not support
historical balance lookup should set this to false. (default true)
--reconciler-concurrency uint concurrency to use while fetching accounts during reconciliation (default 8)
--start int block index to start syncing (default -1)
--transaction-concurrency uint concurrency to use while fetching transactions (if required) (default 16)
Expand Down
23 changes: 15 additions & 8 deletions cmd/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,9 +111,9 @@ of what one of these files looks like.`,
// while fetching transactions (if required).
TransactionConcurrency uint64

// AccountConcurrency is the concurrency to use
// ReconcilerConcurrency is the concurrency to use
// while fetching accounts during reconciliation.
AccountConcurrency uint64
ReconcilerConcurrency uint64

// LogBlocks determines if blocks are
// logged.
Expand Down Expand Up @@ -204,8 +204,8 @@ func init() {
"concurrency to use while fetching transactions (if required)",
)
checkCmd.Flags().Uint64Var(
&AccountConcurrency,
"account-concurrency",
&ReconcilerConcurrency,
"reconciler-concurrency",
8,
"concurrency to use while fetching accounts during reconciliation",
)
Expand Down Expand Up @@ -355,6 +355,12 @@ func runCheckCmd(cmd *cobra.Command, args []string) {
}
}

// Get all previously seen accounts
seenAccounts, err := blockStorage.GetAllAccountCurrency(ctx)
if err != nil {
log.Fatal(fmt.Errorf("%w: unable to get previously seen accounts", err))
}

reconcilerHelper := processor.NewReconcilerHelper(
blockStorage,
)
Expand All @@ -365,14 +371,15 @@ func runCheckCmd(cmd *cobra.Command, args []string) {
HaltOnReconciliationError,
)

r := reconciler.NewReconciler(
r := reconciler.New(
primaryNetwork,
reconcilerHelper,
reconcilerHandler,
fetcher,
AccountConcurrency,
LookupBalanceByBlock,
interestingAccounts,
reconciler.WithReconcilerConcurrency(int(ReconcilerConcurrency)),
reconciler.WithLookupBalanceByBlock(LookupBalanceByBlock),
reconciler.WithInterestingAccounts(interestingAccounts),
reconciler.WithSeenAccounts(seenAccounts),
)

syncerHandler := processor.NewSyncerHandler(
Expand Down
11 changes: 11 additions & 0 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package cmd

import (
"fmt"

"github.com/spf13/cobra"
)

Expand Down Expand Up @@ -47,4 +49,13 @@ func init() {
rootCmd.AddCommand(viewBlockCmd)
rootCmd.AddCommand(viewAccountCmd)
rootCmd.AddCommand(createConfigurationCmd)
rootCmd.AddCommand(versionCmd)
}

var versionCmd = &cobra.Command{
Use: "version",
Short: "Print rosetta-cli version",
Run: func(cmd *cobra.Command, args []string) {
fmt.Println("v0.2.1")
},
}
13 changes: 11 additions & 2 deletions cmd/view_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,20 @@ func runViewBlockCmd(cmd *cobra.Command, args []string) {

// Print out all balance changes in a given block. This does NOT exempt
// any operations/accounts from parsing.
parser := parser.New(newFetcher.Asserter, func(*types.Operation) bool { return false })
changes, err := parser.BalanceChanges(ctx, block, false)
p := parser.New(newFetcher.Asserter, func(*types.Operation) bool { return false })
changes, err := p.BalanceChanges(ctx, block, false)
if err != nil {
log.Fatal(fmt.Errorf("%w: unable to calculate balance changes", err))
}

log.Printf("Balance Changes: %s\n", types.PrettyPrintStruct(changes))

// Print out all OperationGroups for each transaction in a block.
for _, tx := range block.Transactions {
log.Printf(
"Transaction %s Operation Groups: %s\n",
tx.TransactionIdentifier.Hash,
types.PrettyPrintStruct(parser.GroupOperations(tx)),
)
}
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/coinbase/rosetta-cli
go 1.13

require (
github.com/coinbase/rosetta-sdk-go v0.1.11-0.20200512194317-06d0663c9207
github.com/coinbase/rosetta-sdk-go v0.2.1-0.20200513162126-2278020ff1c2
github.com/dgraph-io/badger v1.6.0
github.com/mattn/goveralls v0.0.5 // indirect
github.com/spf13/cobra v0.0.5
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ github.com/coinbase/rosetta-sdk-go v0.1.10 h1:AzmvZXhsTNjXAcLfkgdbO0OWTj++ZeyW4Z
github.com/coinbase/rosetta-sdk-go v0.1.10/go.mod h1:Z3yIflVjfPH1tYN/ucYcnJuXnxIr1xzO26YLla6jYLw=
github.com/coinbase/rosetta-sdk-go v0.1.11-0.20200512194317-06d0663c9207 h1:Xg6sVVLsMjrlg3CFylzW8uNy258DJ81BMSggrXSQZjg=
github.com/coinbase/rosetta-sdk-go v0.1.11-0.20200512194317-06d0663c9207/go.mod h1:XKM7urGHLqGQJi9kM97N+GpMLJuCAGYXy2wOm3KzxEE=
github.com/coinbase/rosetta-sdk-go v0.2.1-0.20200513022350-4a871d015a27 h1:HfYzThWKaGwbh2jGrVkbpn5EEopjLamohm1rkkpTttg=
github.com/coinbase/rosetta-sdk-go v0.2.1-0.20200513022350-4a871d015a27/go.mod h1:XKM7urGHLqGQJi9kM97N+GpMLJuCAGYXy2wOm3KzxEE=
github.com/coinbase/rosetta-sdk-go v0.2.1-0.20200513162126-2278020ff1c2 h1:ibaPMZAs6dlh+lmexnEnL3DcaZ+r9NUknPQ/9FgZ7MQ=
github.com/coinbase/rosetta-sdk-go v0.2.1-0.20200513162126-2278020ff1c2/go.mod h1:XKM7urGHLqGQJi9kM97N+GpMLJuCAGYXy2wOm3KzxEE=
github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE=
github.com/coreos/go-etcd v2.0.0+incompatible/go.mod h1:Jez6KQU2B/sWsbdaef3ED8NzMklzPG4d5KIOhIy30Tk=
github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
Expand Down
13 changes: 13 additions & 0 deletions internal/processor/reconciler_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,3 +98,16 @@ func (h *ReconcilerHandler) ReconciliationSucceeded(
block,
)
}

// NewAccountSeen is called each time the reconciler adds a new
// AccountCurrency to the inactiveQueue. These AccountCurrency
// should be persisted to pass to the reconciler on restart.
func (h *ReconcilerHandler) NewAccountSeen(
ctx context.Context,
account *types.AccountIdentifier,
currency *types.Currency,
) error {
// We don't persist new accounts here because we are already storing
// them when we set their balance in the internal/storage package.
return nil
}
29 changes: 29 additions & 0 deletions internal/storage/badger_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,3 +156,32 @@ func (b *BadgerStorage) Get(

return true, value, nil
}

// Scan fetches all items at a given prefix. This is typically
// used to get all items in a namespace.
func (b *BadgerStorage) Scan(
ctx context.Context,
prefix []byte,
) ([][]byte, error) {
values := [][]byte{}
err := b.db.View(func(txn *badger.Txn) error {
it := txn.NewIterator(badger.DefaultIteratorOptions)
defer it.Close()
for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() {
item := it.Item()
err := item.Value(func(v []byte) error {
values = append(values, v)
return nil
})
if err != nil {
return err
}
}
return nil
})
if err != nil {
return nil, err
}

return values, nil
}
16 changes: 16 additions & 0 deletions internal/storage/badger_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package storage

import (
"context"
"fmt"
"testing"

"github.com/coinbase/rosetta-cli/internal/utils"
Expand Down Expand Up @@ -52,6 +53,21 @@ func TestDatabase(t *testing.T) {
assert.Equal(t, []byte("hola"), value)
assert.NoError(t, err)
})

t.Run("Scan", func(t *testing.T) {
storedValues := make([][]byte, 100)
for i := 0; i < 100; i++ {
v := []byte(fmt.Sprintf("%d", i))
err := database.Set(ctx, []byte(fmt.Sprintf("test/%d", i)), v)
assert.NoError(t, err)

storedValues[i] = v
}

values, err := database.Scan(ctx, []byte("test/"))
assert.NoError(t, err)
assert.ElementsMatch(t, storedValues, values)
})
}

func TestDatabaseTransaction(t *testing.T) {
Expand Down
66 changes: 42 additions & 24 deletions internal/storage/block_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package storage
import (
"bytes"
"context"
"crypto/sha256"
"encoding/gob"
"encoding/json"
"errors"
Expand All @@ -29,6 +28,7 @@ import (

"github.com/coinbase/rosetta-sdk-go/asserter"
"github.com/coinbase/rosetta-sdk-go/parser"
"github.com/coinbase/rosetta-sdk-go/reconciler"
"github.com/coinbase/rosetta-sdk-go/syncer"
"github.com/coinbase/rosetta-sdk-go/types"
)
Expand All @@ -51,6 +51,9 @@ const (

// balanceNamespace is prepended to any stored balance.
balanceNamespace = "balance"

// blockNamespace is prepended to any stored block.
blockNamespace = "block"
)

var (
Expand Down Expand Up @@ -87,40 +90,27 @@ var (
Key Construction
*/

// hashBytes is used to construct a SHA1
// hash to protect against arbitrarily
// large key sizes.
func hashBytes(data string) []byte {
h := sha256.New()
_, err := h.Write([]byte(data))
if err != nil {
log.Fatal(err)
}

return h.Sum(nil)
}

func getHeadBlockKey() []byte {
return hashBytes(headBlockKey)
return []byte(headBlockKey)
}

func getBlockKey(blockIdentifier *types.BlockIdentifier) []byte {
return hashBytes(
fmt.Sprintf("%s:%d", blockIdentifier.Hash, blockIdentifier.Index),
return []byte(
fmt.Sprintf("%s/%s/%d", blockNamespace, blockIdentifier.Hash, blockIdentifier.Index),
)
}

func getHashKey(hash string, isBlock bool) []byte {
if isBlock {
return hashBytes(fmt.Sprintf("%s:%s", blockHashNamespace, hash))
return []byte(fmt.Sprintf("%s/%s", blockHashNamespace, hash))
}

return hashBytes(fmt.Sprintf("%s:%s", transactionHashNamespace, hash))
return []byte(fmt.Sprintf("%s/%s", transactionHashNamespace, hash))
}

// GetBalanceKey returns a deterministic hash of an types.Account + types.Currency.
func GetBalanceKey(account *types.AccountIdentifier, currency *types.Currency) []byte {
return hashBytes(
return []byte(
fmt.Sprintf("%s/%s/%s", balanceNamespace, types.Hash(account), types.Hash(currency)),
)
}
Expand Down Expand Up @@ -386,8 +376,9 @@ func (b *BlockStorage) RemoveBlock(
}

type balanceEntry struct {
Amount *types.Amount
Block *types.BlockIdentifier
Account *types.AccountIdentifier
Amount *types.Amount
Block *types.BlockIdentifier
}

func serializeBalanceEntry(bal balanceEntry) ([]byte, error) {
Expand Down Expand Up @@ -463,8 +454,9 @@ func (b *BlockStorage) SetBalance(
key := GetBalanceKey(account, amount.Currency)

serialBal, err := serializeBalanceEntry(balanceEntry{
Amount: amount,
Block: block,
Account: account,
Amount: amount,
Block: block,
})
if err != nil {
return err
Expand Down Expand Up @@ -536,6 +528,7 @@ func (b *BlockStorage) UpdateBalance(
}

serialBal, err := serializeBalanceEntry(balanceEntry{
Account: change.Account,
Amount: &types.Amount{
Value: newVal,
Currency: change.Currency,
Expand Down Expand Up @@ -709,3 +702,28 @@ func (b *BlockStorage) CreateBlockCache(ctx context.Context) []*types.BlockIdent

return cache
}

// GetAllAccountCurrency scans the db for all balances and returns a slice
// of reconciler.AccountCurrency. This is useful for bootstrapping the reconciler
// after restart.
func (b *BlockStorage) GetAllAccountCurrency(ctx context.Context) ([]*reconciler.AccountCurrency, error) {
rawBalances, err := b.db.Scan(ctx, []byte(balanceNamespace))
if err != nil {
return nil, err
}

accounts := make([]*reconciler.AccountCurrency, len(rawBalances))
for i, rawBalance := range rawBalances {
deserialBal, err := parseBalanceEntry(rawBalance)
if err != nil {
return nil, err
}

accounts[i] = &reconciler.AccountCurrency{
Account: deserialBal.Account,
Currency: deserialBal.Amount.Currency,
}
}

return accounts, nil
}
Loading

0 comments on commit cc2e0c4

Please sign in to comment.