diff --git a/cmd/check_construction.go b/cmd/check_construction.go index 6f4ded15..05bbbddf 100644 --- a/cmd/check_construction.go +++ b/cmd/check_construction.go @@ -20,6 +20,7 @@ import ( "log" "time" + "github.com/coinbase/rosetta-cli/pkg/results" "github.com/coinbase/rosetta-cli/pkg/tester" "github.com/coinbase/rosetta-sdk-go/fetcher" @@ -70,7 +71,7 @@ func runCheckConstructionCmd(cmd *cobra.Command, args []string) { _, _, fetchErr := fetcher.InitializeAsserter(ctx, Config.Network) if fetchErr != nil { - tester.ExitConstruction( + results.ExitConstruction( Config, nil, nil, @@ -81,7 +82,7 @@ func runCheckConstructionCmd(cmd *cobra.Command, args []string) { _, err := utils.CheckNetworkSupported(ctx, Config.Network, fetcher) if err != nil { - tester.ExitConstruction( + results.ExitConstruction( Config, nil, nil, @@ -99,7 +100,7 @@ func runCheckConstructionCmd(cmd *cobra.Command, args []string) { &SignalReceived, ) if err != nil { - tester.ExitConstruction( + results.ExitConstruction( Config, nil, nil, @@ -135,6 +136,15 @@ func runCheckConstructionCmd(cmd *cobra.Command, args []string) { return tester.LogMemoryLoop(ctx) }) + g.Go(func() error { + return tester.StartServer( + ctx, + "check:construction status", + constructionTester, + Config.Construction.StatusPort, + ) + }) + sigListeners := []context.CancelFunc{cancel} go handleSignals(&sigListeners) diff --git a/cmd/check_data.go b/cmd/check_data.go index 9f7b3744..1418697f 100644 --- a/cmd/check_data.go +++ b/cmd/check_data.go @@ -19,6 +19,7 @@ import ( "fmt" "time" + "github.com/coinbase/rosetta-cli/pkg/results" "github.com/coinbase/rosetta-cli/pkg/tester" "github.com/coinbase/rosetta-sdk-go/fetcher" @@ -83,7 +84,7 @@ func runCheckDataCmd(cmd *cobra.Command, args []string) { _, _, fetchErr := fetcher.InitializeAsserter(ctx, Config.Network) if fetchErr != nil { - tester.ExitData( + results.ExitData( Config, nil, nil, @@ -96,7 +97,7 @@ func runCheckDataCmd(cmd *cobra.Command, args []string) { networkStatus, err := utils.CheckNetworkSupported(ctx, Config.Network, fetcher) if err != nil { - tester.ExitData( + results.ExitData( Config, nil, nil, @@ -146,7 +147,12 @@ func runCheckDataCmd(cmd *cobra.Command, args []string) { }) g.Go(func() error { - return dataTester.StartProgressLogger(ctx) + return tester.StartServer( + ctx, + "check:data status", + dataTester, + Config.Data.StatusPort, + ) }) sigListeners := []context.CancelFunc{cancel} diff --git a/cmd/root.go b/cmd/root.go index 33c1fd18..158e0a16 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -132,6 +132,6 @@ var versionCmd = &cobra.Command{ Use: "version", Short: "Print rosetta-cli version", Run: func(cmd *cobra.Command, args []string) { - fmt.Println("v0.5.5") + fmt.Println("v0.5.6") }, } diff --git a/configuration/configuration.go b/configuration/configuration.go index 37ffba9e..4fc5981b 100644 --- a/configuration/configuration.go +++ b/configuration/configuration.go @@ -66,6 +66,7 @@ const ( DefaultBroadcastLimit = 3 DefaultTipDelay = 300 DefaultBlockBroadcastLimit = 5 + DefaultStatusPort = 9090 // ETH Defaults EthereumIDBlockchain = "Ethereum" @@ -135,6 +136,11 @@ type ConstructionConfiguration struct { // workflows should be performed before stopping. EndConditions map[string]int `json:"end_conditions,omitempty"` + // StatusPort allows the caller to query a running check:construction + // test to get stats about progress. This can be used instead + // of parsing logs to populate some sort of status dashboard. + StatusPort uint `json:"status_port,omitempty"` + // ResultsOutputFile is the absolute filepath of where to save // the results of a check:construction run. ResultsOutputFile string `json:"results_output_file,omitempty"` @@ -147,6 +153,7 @@ func DefaultDataConfiguration() *DataConfiguration { ActiveReconciliationConcurrency: DefaultActiveReconciliationConcurrency, InactiveReconciliationConcurrency: DefaultInactiveReconciliationConcurrency, InactiveReconciliationFrequency: DefaultInactiveReconciliationFrequency, + StatusPort: DefaultStatusPort, } } @@ -271,6 +278,11 @@ type DataConfiguration struct { // EndCondition contains the conditions for the syncer to stop EndConditions *DataEndConditions `json:"end_conditions,omitempty"` + // StatusPort allows the caller to query a running check:data + // test to get stats about progress. This can be used instead + // of parsing logs to populate some sort of status dashboard. + StatusPort uint `json:"status_port,omitempty"` + // ResultsOutputFile is the absolute filepath of where to save // the results of a check:data run. ResultsOutputFile string `json:"results_output_file"` @@ -354,6 +366,10 @@ func populateConstructionMissingFields( constructionConfig.BlockBroadcastLimit = DefaultBlockBroadcastLimit } + if constructionConfig.StatusPort == 0 { + constructionConfig.StatusPort = DefaultStatusPort + } + return constructionConfig } @@ -374,6 +390,10 @@ func populateDataMissingFields(dataConfig *DataConfiguration) *DataConfiguration dataConfig.InactiveReconciliationFrequency = DefaultInactiveReconciliationFrequency } + if dataConfig.StatusPort == 0 { + dataConfig.StatusPort = DefaultStatusPort + } + return dataConfig } diff --git a/configuration/configuration_test.go b/configuration/configuration_test.go index 30ba5818..b3d58352 100644 --- a/configuration/configuration_test.go +++ b/configuration/configuration_test.go @@ -60,6 +60,7 @@ var ( StaleDepth: 12, BroadcastLimit: 200, BlockBroadcastLimit: 992, + StatusPort: 21, Workflows: append( fakeWorkflows, &job.Workflow{ @@ -75,6 +76,7 @@ var ( ReconciliationDisabled: false, HistoricalBalanceEnabled: &historicalEnabled, StartIndex: &startIndex, + StatusPort: 123, EndConditions: &DataEndConditions{ ReconciliationCoverage: &goodCoverage, }, @@ -153,6 +155,7 @@ func TestLoadConfiguration(t *testing.T) { StaleDepth: DefaultStaleDepth, BroadcastLimit: DefaultBroadcastLimit, BlockBroadcastLimit: DefaultBlockBroadcastLimit, + StatusPort: DefaultStatusPort, Workflows: fakeWorkflows, } diff --git a/examples/configuration/default.json b/examples/configuration/default.json index 48b5f73b..46869fe8 100644 --- a/examples/configuration/default.json +++ b/examples/configuration/default.json @@ -29,6 +29,7 @@ "inactive_discrepency_search_disabled": false, "balance_tracking_disabled": false, "coin_tracking_disabled": false, + "status_port": 9090, "results_output_file": "", "pruning_disabled": false } diff --git a/pkg/logger/logger.go b/pkg/logger/logger.go index 343b7b60..cde715e5 100644 --- a/pkg/logger/logger.go +++ b/pkg/logger/logger.go @@ -18,14 +18,14 @@ import ( "context" "fmt" "log" - "math/big" "os" "path" + "github.com/coinbase/rosetta-cli/pkg/results" + "github.com/coinbase/rosetta-sdk-go/parser" "github.com/coinbase/rosetta-sdk-go/reconciler" "github.com/coinbase/rosetta-sdk-go/statefulsyncer" - "github.com/coinbase/rosetta-sdk-go/storage" "github.com/coinbase/rosetta-sdk-go/types" "github.com/coinbase/rosetta-sdk-go/utils" "github.com/fatih/color" @@ -34,9 +34,6 @@ import ( var _ statefulsyncer.Logger = (*Logger)(nil) const ( - // TimeElapsedCounter tracks the total time elapsed in seconds. - TimeElapsedCounter = "time_elapsed" - // blockStreamFile contains the stream of processed // blocks and whether they were added or removed. blockStreamFile = "blocks.txt" @@ -72,16 +69,12 @@ type Logger struct { logBalanceChanges bool logReconciliation bool - lastStatsMessage string - - CounterStorage *storage.CounterStorage - BalanceStorage *storage.BalanceStorage + lastStatsMessage string + lastProgressMessage string } // NewLogger constructs a new Logger. func NewLogger( - counterStorage *storage.CounterStorage, - balanceStorage *storage.BalanceStorage, logDir string, logBlocks bool, logTransactions bool, @@ -89,8 +82,6 @@ func NewLogger( logReconciliation bool, ) *Logger { return &Logger{ - CounterStorage: counterStorage, - BalanceStorage: balanceStorage, logDir: logDir, logBlocks: logBlocks, logTransactions: logTransactions, @@ -99,168 +90,66 @@ func NewLogger( } } -// LogDataStats logs all data values in CounterStorage. -func (l *Logger) LogDataStats(ctx context.Context) error { - blocks, err := l.CounterStorage.Get(ctx, storage.BlockCounter) - if err != nil { - return fmt.Errorf("%w cannot get block counter", err) - } - - if blocks.Sign() == 0 { // wait for at least 1 block to be processed - return nil - } - - orphans, err := l.CounterStorage.Get(ctx, storage.OrphanCounter) - if err != nil { - return fmt.Errorf("%w cannot get orphan counter", err) - } - - txs, err := l.CounterStorage.Get(ctx, storage.TransactionCounter) - if err != nil { - return fmt.Errorf("%w cannot get transaction counter", err) - } - - ops, err := l.CounterStorage.Get(ctx, storage.OperationCounter) - if err != nil { - return fmt.Errorf("%w cannot get operations counter", err) - } - - activeReconciliations, err := l.CounterStorage.Get(ctx, storage.ActiveReconciliationCounter) - if err != nil { - return fmt.Errorf("%w cannot get active reconciliations counter", err) - } - - inactiveReconciliations, err := l.CounterStorage.Get(ctx, storage.InactiveReconciliationCounter) - if err != nil { - return fmt.Errorf("%w cannot get inactive reconciliations counter", err) +// LogDataStatus logs results.CheckDataStatus. +func (l *Logger) LogDataStatus(ctx context.Context, status *results.CheckDataStatus) { + if status.Stats.Blocks == 0 { // wait for at least 1 block to be processed + return } statsMessage := fmt.Sprintf( - "[STATS] Blocks: %s (Orphaned: %s) Transactions: %s Operations: %s", - blocks.String(), - orphans.String(), - txs.String(), - ops.String(), + "[STATS] Blocks: %d (Orphaned: %d) Transactions: %d Operations: %d Reconciliations: %d (Inactive: %d, Coverage: %f%%)", // nolint:lll + status.Stats.Blocks, + status.Stats.Orphans, + status.Stats.Transactions, + status.Stats.Operations, + status.Stats.ActiveReconciliations+status.Stats.InactiveReconciliations, + status.Stats.InactiveReconciliations, + status.Stats.ReconciliationCoverage, ) - if l.BalanceStorage != nil { - coverage, err := l.BalanceStorage.ReconciliationCoverage(ctx, 0) - if err != nil { - return fmt.Errorf("%w: cannot get reconcile coverage", err) - } - - statsMessage = fmt.Sprintf( - "%s Reconciliations: %s (Inactive: %s, Coverage: %f%%)", - statsMessage, - new(big.Int).Add(activeReconciliations, inactiveReconciliations).String(), - inactiveReconciliations.String(), - coverage*utils.OneHundred, - ) - } - // Don't print out the same stats message twice. if statsMessage == l.lastStatsMessage { - return nil + return } l.lastStatsMessage = statsMessage color.Cyan(statsMessage) - return nil -} - -// LogTipEstimate logs information about the remaining blocks to sync. -func (l *Logger) LogTipEstimate(ctx context.Context, tipIndex int64) error { - blocks, err := l.CounterStorage.Get(ctx, storage.BlockCounter) - if err != nil { - return fmt.Errorf("%w cannot get block counter", err) - } - - if blocks.Sign() == 0 { // wait for at least 1 block to be processed - return nil - } - - orphans, err := l.CounterStorage.Get(ctx, storage.OrphanCounter) - if err != nil { - return fmt.Errorf("%w cannot get orphan counter", err) - } - - adjustedBlocks := blocks.Int64() - orphans.Int64() - if tipIndex-adjustedBlocks <= 0 { // return if no blocks to sync - return nil - } - - elapsedTime, err := l.CounterStorage.Get(ctx, TimeElapsedCounter) - if err != nil { - return fmt.Errorf("%w cannot get elapsed time", err) - } - - if elapsedTime.Sign() == 0 { // wait for at least some elapsed time - return nil - } - - blocksPerSecond := new(big.Float).Quo(new(big.Float).SetInt64(adjustedBlocks), new(big.Float).SetInt(elapsedTime)) - blocksPerSecondFloat, _ := blocksPerSecond.Float64() - blocksSynced := new(big.Float).Quo(new(big.Float).SetInt64(adjustedBlocks), new(big.Float).SetInt64(tipIndex)) - blocksSyncedFloat, _ := blocksSynced.Float64() - - statsMessage := fmt.Sprintf( + progressMessage := fmt.Sprintf( "[PROGRESS] Blocks Synced: %d/%d (Completed: %f%%, Rate: %f/second) Time Remaining: %s", - adjustedBlocks, - tipIndex, - blocksSyncedFloat*utils.OneHundred, - blocksPerSecondFloat, - utils.TimeToTip(blocksPerSecondFloat, adjustedBlocks, tipIndex), + status.Progress.Blocks, + status.Progress.Tip, + status.Progress.Completed, + status.Progress.Rate, + status.Progress.TimeRemaining, ) - color.Cyan(statsMessage) - return nil -} - -// LogConstructionStats logs all construction values in CounterStorage. -func (l *Logger) LogConstructionStats(ctx context.Context, inflightTransactions int) error { - transactionsCreated, err := l.CounterStorage.Get(ctx, storage.TransactionsCreatedCounter) - if err != nil { - return fmt.Errorf("%w cannot get transactions created counter", err) - } - - transactionsConfirmed, err := l.CounterStorage.Get(ctx, storage.TransactionsConfirmedCounter) - if err != nil { - return fmt.Errorf("%w cannot get transactions confirmed counter", err) - } - - staleBroadcasts, err := l.CounterStorage.Get(ctx, storage.StaleBroadcastsCounter) - if err != nil { - return fmt.Errorf("%w cannot get stale broadcasts counter", err) - } - - failedBroadcasts, err := l.CounterStorage.Get(ctx, storage.FailedBroadcastsCounter) - if err != nil { - return fmt.Errorf("%w cannot get failed broadcasts counter", err) + // Don't print out the same progress message twice. + if progressMessage == l.lastProgressMessage { + return } - addressesCreated, err := l.CounterStorage.Get(ctx, storage.AddressesCreatedCounter) - if err != nil { - return fmt.Errorf("%w cannot get addresses created counter", err) - } + l.lastProgressMessage = progressMessage + color.Cyan(progressMessage) +} +// LogConstructionStatus logs results.CheckConstructionStatus. +func (l *Logger) LogConstructionStatus(ctx context.Context, status *results.CheckConstructionStatus) { statsMessage := fmt.Sprintf( "[STATS] Transactions Confirmed: %d (Created: %d, In Progress: %d, Stale: %d, Failed: %d) Addresses Created: %d", - transactionsConfirmed, - transactionsCreated, - inflightTransactions, - staleBroadcasts, - failedBroadcasts, - addressesCreated, + status.Stats.TransactionsConfirmed, + status.Stats.TransactionsCreated, + status.Progress.Broadcasting, + status.Stats.StaleBroadcasts, + status.Stats.FailedBroadcasts, + status.Stats.AddressesCreated, ) if statsMessage == l.lastStatsMessage { - return nil + return } l.lastStatsMessage = statsMessage color.Cyan(statsMessage) - - return nil } // LogMemoryStats logs memory usage information. diff --git a/pkg/processor/reconciler_handler.go b/pkg/processor/reconciler_handler.go index 95f58ab3..686311b8 100644 --- a/pkg/processor/reconciler_handler.go +++ b/pkg/processor/reconciler_handler.go @@ -16,25 +16,21 @@ package processor import ( "context" - "errors" "fmt" "math/big" "github.com/coinbase/rosetta-cli/pkg/logger" + "github.com/coinbase/rosetta-cli/pkg/results" "github.com/coinbase/rosetta-sdk-go/reconciler" "github.com/coinbase/rosetta-sdk-go/storage" "github.com/coinbase/rosetta-sdk-go/types" ) -var ( - // ErrReconciliationFailure is returned if reconciliation fails. - ErrReconciliationFailure = errors.New("reconciliation failure") -) - // ReconcilerHandler implements the Reconciler.Handler interface. type ReconcilerHandler struct { logger *logger.Logger + counterStorage *storage.CounterStorage balanceStorage *storage.BalanceStorage haltOnReconciliationError bool @@ -47,11 +43,13 @@ type ReconcilerHandler struct { // NewReconcilerHandler creates a new ReconcilerHandler. func NewReconcilerHandler( logger *logger.Logger, + counterStorage *storage.CounterStorage, balanceStorage *storage.BalanceStorage, haltOnReconciliationError bool, ) *ReconcilerHandler { return &ReconcilerHandler{ logger: logger, + counterStorage: counterStorage, balanceStorage: balanceStorage, haltOnReconciliationError: haltOnReconciliationError, } @@ -93,7 +91,7 @@ func (h *ReconcilerHandler) ReconciliationFailed( h.InactiveFailureBlock = block return fmt.Errorf( "%w: inactive reconciliation error for %s at %d (computed: %s%s, live: %s%s)", - ErrReconciliationFailure, + results.ErrReconciliationFailure, account.Address, block.Index, computedBalance, @@ -107,7 +105,7 @@ func (h *ReconcilerHandler) ReconciliationFailed( h.ActiveFailureBlock = block return fmt.Errorf( "%w: active reconciliation error for %s at %d (computed: %s%s, live: %s%s)", - ErrReconciliationFailure, + results.ErrReconciliationFailure, account.Address, block.Index, computedBalance, @@ -131,13 +129,13 @@ func (h *ReconcilerHandler) ReconciliationSucceeded( ) error { // Update counters if reconciliationType == reconciler.InactiveReconciliation { - _, _ = h.logger.CounterStorage.Update( + _, _ = h.counterStorage.Update( ctx, storage.InactiveReconciliationCounter, big.NewInt(1), ) } else { - _, _ = h.logger.CounterStorage.Update(ctx, storage.ActiveReconciliationCounter, big.NewInt(1)) + _, _ = h.counterStorage.Update(ctx, storage.ActiveReconciliationCounter, big.NewInt(1)) } if err := h.balanceStorage.Reconciled(ctx, account, currency, block); err != nil { diff --git a/pkg/tester/construction_results.go b/pkg/results/construction_results.go similarity index 78% rename from pkg/tester/construction_results.go rename to pkg/results/construction_results.go index 00b26c77..50139b68 100644 --- a/pkg/tester/construction_results.go +++ b/pkg/results/construction_results.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package tester +package results import ( "context" @@ -225,6 +225,70 @@ func ComputeCheckConstructionStats( } } +// CheckConstructionProgress contains the number of +// currently broadcasting transactions and processing +// jobs. +type CheckConstructionProgress struct { + Broadcasting int `json:"broadcasting"` + Processing int `json:"processing"` +} + +// ComputeCheckConstructionProgress computes +// *CheckConstructionProgress. +func ComputeCheckConstructionProgress( + ctx context.Context, + broadcasts *storage.BroadcastStorage, + jobs *storage.JobStorage, +) *CheckConstructionProgress { + inflight, err := broadcasts.GetAllBroadcasts(ctx) + if err != nil { + log.Printf("%s cannot get all broadcasts\n", err.Error()) + return nil + } + + processing, err := jobs.AllProcessing(ctx) + if err != nil { + log.Printf("%s cannot get all jobs\n", err.Error()) + return nil + } + + return &CheckConstructionProgress{ + Broadcasting: len(inflight), + Processing: len(processing), + } +} + +// CheckConstructionStatus contains CheckConstructionStats. +type CheckConstructionStatus struct { + Stats *CheckConstructionStats `json:"stats"` + Progress *CheckConstructionProgress `json:"progress"` +} + +// ComputeCheckConstructionStatus returns a populated +// *CheckConstructionStatus. +func ComputeCheckConstructionStatus( + ctx context.Context, + config *configuration.Configuration, + counters *storage.CounterStorage, + broadcasts *storage.BroadcastStorage, + jobs *storage.JobStorage, +) *CheckConstructionStatus { + return &CheckConstructionStatus{ + Stats: ComputeCheckConstructionStats(ctx, config, counters, jobs), + Progress: ComputeCheckConstructionProgress(ctx, broadcasts, jobs), + } +} + +// FetchCheckConstructionStatus fetches *CheckConstructionStatus. +func FetchCheckConstructionStatus(url string) (*CheckConstructionStatus, error) { + var status CheckConstructionStatus + if err := JSONFetch(url, &status); err != nil { + return nil, fmt.Errorf("%w: unable to fetch construction status", err) + } + + return &status, nil +} + // ExitConstruction exits check:data, logs the test results to the console, // and to a provided output path. func ExitConstruction( diff --git a/pkg/tester/data_results.go b/pkg/results/data_results.go similarity index 79% rename from pkg/tester/data_results.go rename to pkg/results/data_results.go index 09b8d4ce..2422946b 100644 --- a/pkg/tester/data_results.go +++ b/pkg/results/data_results.go @@ -12,23 +12,24 @@ // See the License for the specific language governing permissions and // limitations under the License. -package tester +package results import ( "context" "errors" "fmt" "log" + "math/big" "os" "strconv" "github.com/coinbase/rosetta-cli/configuration" - "github.com/coinbase/rosetta-cli/pkg/processor" "github.com/coinbase/rosetta-sdk-go/asserter" "github.com/coinbase/rosetta-sdk-go/fetcher" "github.com/coinbase/rosetta-sdk-go/storage" "github.com/coinbase/rosetta-sdk-go/syncer" + "github.com/coinbase/rosetta-sdk-go/types" "github.com/coinbase/rosetta-sdk-go/utils" "github.com/fatih/color" "github.com/olekukonko/tablewriter" @@ -209,6 +210,117 @@ func ComputeCheckDataStats( return stats } +// CheckDataProgress contains information +// about check:data's syncing progress. +type CheckDataProgress struct { + Blocks int64 `json:"blocks"` + Tip int64 `json:"tip"` + Completed float64 `json:"completed"` + Rate float64 `json:"rate"` + TimeRemaining string `json:"time_remaining"` +} + +// ComputeCheckDataProgress returns +// a populated *CheckDataProgress. +func ComputeCheckDataProgress( + ctx context.Context, + fetcher *fetcher.Fetcher, + network *types.NetworkIdentifier, + counters *storage.CounterStorage, +) *CheckDataProgress { + networkStatus, fetchErr := fetcher.NetworkStatusRetry(ctx, network, nil) + if fetchErr != nil { + fmt.Printf("%s: cannot get network status", fetchErr.Err.Error()) + return nil + } + tipIndex := networkStatus.CurrentBlockIdentifier.Index + + blocks, err := counters.Get(ctx, storage.BlockCounter) + if err != nil { + fmt.Printf("%s: cannot get block counter", err.Error()) + return nil + } + + if blocks.Sign() == 0 { // wait for at least 1 block to be processed + return nil + } + + orphans, err := counters.Get(ctx, storage.OrphanCounter) + if err != nil { + fmt.Printf("%s: cannot get orphan counter", err.Error()) + return nil + } + + adjustedBlocks := blocks.Int64() - orphans.Int64() + if tipIndex-adjustedBlocks <= 0 { // return if no blocks to sync + return nil + } + + elapsedTime, err := counters.Get(ctx, TimeElapsedCounter) + if err != nil { + fmt.Printf("%s: cannot get elapsed time", err.Error()) + return nil + } + + if elapsedTime.Sign() == 0 { // wait for at least some elapsed time + return nil + } + + blocksPerSecond := new(big.Float).Quo(new(big.Float).SetInt64(adjustedBlocks), new(big.Float).SetInt(elapsedTime)) + blocksPerSecondFloat, _ := blocksPerSecond.Float64() + blocksSynced := new(big.Float).Quo(new(big.Float).SetInt64(adjustedBlocks), new(big.Float).SetInt64(tipIndex)) + blocksSyncedFloat, _ := blocksSynced.Float64() + + return &CheckDataProgress{ + Blocks: adjustedBlocks, + Tip: tipIndex, + Completed: blocksSyncedFloat * utils.OneHundred, + Rate: blocksPerSecondFloat, + TimeRemaining: utils.TimeToTip(blocksPerSecondFloat, adjustedBlocks, tipIndex).String(), + } +} + +// CheckDataStatus contains both CheckDataStats +// and CheckDataProgress. +type CheckDataStatus struct { + Stats *CheckDataStats `json:"stats"` + Progress *CheckDataProgress `json:"progress"` +} + +// ComputeCheckDataStatus returns a populated +// *CheckDataStatus. +func ComputeCheckDataStatus( + ctx context.Context, + counters *storage.CounterStorage, + balances *storage.BalanceStorage, + fetcher *fetcher.Fetcher, + network *types.NetworkIdentifier, +) *CheckDataStatus { + return &CheckDataStatus{ + Stats: ComputeCheckDataStats( + ctx, + counters, + balances, + ), + Progress: ComputeCheckDataProgress( + ctx, + fetcher, + network, + counters, + ), + } +} + +// FetchCheckDataStatus fetches *CheckDataStatus. +func FetchCheckDataStatus(url string) (*CheckDataStatus, error) { + var status CheckDataStatus + if err := JSONFetch(url, &status); err != nil { + return nil, fmt.Errorf("%w: unable to fetch construction status", err) + } + + return &status, nil +} + // CheckDataTests indicates which tests passed. // If a test is nil, it did not apply to the run. // @@ -343,7 +455,7 @@ func ReconciliationTest( reconciliationsPerformed bool, ) *bool { relatedErrors := []error{ - processor.ErrReconciliationFailure, + ErrReconciliationFailure, } reconciliationPass := true for _, relatedError := range relatedErrors { diff --git a/pkg/tester/data_results_test.go b/pkg/results/data_results_test.go similarity index 99% rename from pkg/tester/data_results_test.go rename to pkg/results/data_results_test.go index 11d8f653..68ff2f6d 100644 --- a/pkg/tester/data_results_test.go +++ b/pkg/results/data_results_test.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package tester +package results import ( "context" @@ -23,7 +23,6 @@ import ( "testing" "github.com/coinbase/rosetta-cli/configuration" - "github.com/coinbase/rosetta-cli/pkg/processor" "github.com/coinbase/rosetta-sdk-go/asserter" "github.com/coinbase/rosetta-sdk-go/fetcher" @@ -295,7 +294,7 @@ func TestComputeCheckDataResults(t *testing.T) { }, "default configuration, no storage, reconciliation errors": { cfg: configuration.DefaultConfiguration(), - err: []error{processor.ErrReconciliationFailure}, + err: []error{ErrReconciliationFailure}, result: &CheckDataResults{ Tests: &CheckDataTests{ RequestResponse: true, diff --git a/pkg/results/types.go b/pkg/results/types.go new file mode 100644 index 00000000..6057ef67 --- /dev/null +++ b/pkg/results/types.go @@ -0,0 +1,31 @@ +// Copyright 2020 Coinbase, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package results + +import ( + "errors" +) + +const ( + // TimeElapsedCounter tracks the total time elapsed in seconds. + TimeElapsedCounter = "time_elapsed" +) + +var ( + // ErrReconciliationFailure is returned if reconciliation fails. + // TODO: Move to reconciler package (had to remove from processor + // to prevent circular dependency) + ErrReconciliationFailure = errors.New("reconciliation failure") +) diff --git a/pkg/results/utils.go b/pkg/results/utils.go new file mode 100644 index 00000000..d9eeef41 --- /dev/null +++ b/pkg/results/utils.go @@ -0,0 +1,47 @@ +// Copyright 2020 Coinbase, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package results + +import ( + "encoding/json" + "fmt" + "io/ioutil" + "net/http" +) + +// JSONFetch makes a GET request to the URL and marshals +// the response into output. +func JSONFetch(url string, output interface{}) error { + resp, err := http.Get(url) // #nosec + if err != nil { + return fmt.Errorf("%w: unable to fetch GET %s", err, url) + } + defer resp.Body.Close() + + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + return fmt.Errorf("%w: unable to read body", err) + } + + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("received %d status with body %s", resp.StatusCode, body) + } + + if err := json.Unmarshal(body, output); err != nil { + return fmt.Errorf("%w: unable to unmarshal JSON", err) + } + + return nil +} diff --git a/pkg/results/utils_test.go b/pkg/results/utils_test.go new file mode 100644 index 00000000..ed63a9cd --- /dev/null +++ b/pkg/results/utils_test.go @@ -0,0 +1,75 @@ +// Copyright 2020 Coinbase, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package results + +import ( + "fmt" + "net/http" + "net/http/httptest" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestJSONFetch(t *testing.T) { + var tests = map[string]struct { + status int + body string + + expectedResult map[string]interface{} + expectedError string + }{ + "simple 200": { + status: http.StatusOK, + body: `{"test":"123"}`, + expectedResult: map[string]interface{}{ + "test": "123", + }, + }, + "not 200": { + status: http.StatusUnsupportedMediaType, + body: `hello`, + expectedError: "received 415 status with body hello\n", + }, + "not JSON": { + status: http.StatusOK, + body: `hello`, + expectedError: "invalid character 'h' looking for beginning of value: unable to unmarshal JSON", + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + assert.Equal(t, "GET", r.Method) + + w.Header().Set("Content-Type", "application/json; charset=UTF-8") + w.WriteHeader(test.status) + fmt.Fprintln(w, test.body) + })) + defer ts.Close() + + var obj map[string]interface{} + err := JSONFetch(ts.URL, &obj) + if len(test.expectedError) > 0 { + assert.EqualError(t, err, test.expectedError) + assert.Len(t, obj, 0) + } else { + assert.NoError(t, err) + assert.Equal(t, test.expectedResult, obj) + } + }) + } +} diff --git a/pkg/tester/construction.go b/pkg/tester/construction.go index 75ac867a..ed3aebb3 100644 --- a/pkg/tester/construction.go +++ b/pkg/tester/construction.go @@ -16,15 +16,18 @@ package tester import ( "context" + "encoding/json" "errors" "fmt" "log" + "net/http" "os" "time" "github.com/coinbase/rosetta-cli/configuration" "github.com/coinbase/rosetta-cli/pkg/logger" "github.com/coinbase/rosetta-cli/pkg/processor" + "github.com/coinbase/rosetta-cli/pkg/results" "github.com/coinbase/rosetta-sdk-go/constructor/coordinator" "github.com/coinbase/rosetta-sdk-go/fetcher" @@ -47,6 +50,8 @@ const ( tipWaitInterval = 10 * time.Second ) +var _ http.Handler = (*ConstructionTester)(nil) + // ConstructionTester coordinates the `check:construction` test. type ConstructionTester struct { network *types.NetworkIdentifier @@ -87,8 +92,6 @@ func InitializeConstruction( counterStorage := storage.NewCounterStorage(localStore) logger := logger.NewLogger( - counterStorage, - nil, dataPath, false, false, @@ -272,14 +275,10 @@ func (t *ConstructionTester) StartPeriodicLogger( for { select { case <-ctx.Done(): - // Print stats one last time before exiting - inflight, _ := t.broadcastStorage.GetAllBroadcasts(ctx) - _ = t.logger.LogConstructionStats(ctx, len(inflight)) - return ctx.Err() case <-tc.C: - inflight, _ := t.broadcastStorage.GetAllBroadcasts(ctx) - _ = t.logger.LogConstructionStats(ctx, len(inflight)) + status := results.ComputeCheckConstructionStatus(ctx, t.config, t.counterStorage, t.broadcastStorage, t.jobStorage) + t.logger.LogConstructionStatus(ctx, status) } } } @@ -366,6 +365,24 @@ func (t *ConstructionTester) StartConstructor( return t.coordinator.Process(ctx) } +// ServeHTTP serves a CheckDataStatus response on all paths. +func (t *ConstructionTester) ServeHTTP(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json; charset=UTF-8") + w.WriteHeader(http.StatusOK) + + status := results.ComputeCheckConstructionStatus( + r.Context(), + t.config, + t.counterStorage, + t.broadcastStorage, + t.jobStorage, + ) + + if err := json.NewEncoder(w).Encode(status); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + } +} + // PerformBroadcasts attempts to rebroadcast all pending transactions // if the RebroadcastAll configuration is set to true. func (t *ConstructionTester) PerformBroadcasts(ctx context.Context) error { @@ -476,7 +493,7 @@ func (t *ConstructionTester) HandleErr( } if !t.reachedEndConditions { - ExitConstruction(t.config, t.counterStorage, t.jobStorage, err, 1) + results.ExitConstruction(t.config, t.counterStorage, t.jobStorage, err, 1) } // We optimistically run the ReturnFunds function on the coordinator @@ -487,5 +504,5 @@ func (t *ConstructionTester) HandleErr( sigListeners, ) - ExitConstruction(t.config, t.counterStorage, t.jobStorage, nil, 0) + results.ExitConstruction(t.config, t.counterStorage, t.jobStorage, nil, 0) } diff --git a/pkg/tester/data.go b/pkg/tester/data.go index 1eab0aa9..03569cec 100644 --- a/pkg/tester/data.go +++ b/pkg/tester/data.go @@ -16,16 +16,19 @@ package tester import ( "context" + "encoding/json" "errors" "fmt" "log" "math/big" + "net/http" "os" "time" "github.com/coinbase/rosetta-cli/configuration" "github.com/coinbase/rosetta-cli/pkg/logger" "github.com/coinbase/rosetta-cli/pkg/processor" + "github.com/coinbase/rosetta-cli/pkg/results" "github.com/coinbase/rosetta-sdk-go/fetcher" "github.com/coinbase/rosetta-sdk-go/reconciler" @@ -62,6 +65,8 @@ const ( EndAtTipCheckInterval = 10 * time.Second ) +var _ http.Handler = (*ConstructionTester)(nil) + // DataTester coordinates the `check:data` test. type DataTester struct { network *types.NetworkIdentifier @@ -160,14 +165,7 @@ func InitializeData( blockStorage := storage.NewBlockStorage(localStore) balanceStorage := storage.NewBalanceStorage(localStore) - loggerBalanceStorage := balanceStorage - if !shouldReconcile(config) { - loggerBalanceStorage = nil - } - logger := logger.NewLogger( - counterStorage, - loggerBalanceStorage, dataPath, config.Data.LogBlocks, config.Data.LogTransactions, @@ -184,6 +182,7 @@ func InitializeData( reconcilerHandler := processor.NewReconcilerHandler( logger, + counterStorage, balanceStorage, !config.Data.IgnoreReconciliationError, ) @@ -349,27 +348,6 @@ func (t *DataTester) StartPeriodicLogger( tc := time.NewTicker(PeriodicLoggingFrequency) defer tc.Stop() - for { - select { - case <-ctx.Done(): - // Print stats one last time before exiting - _ = t.logger.LogDataStats(ctx) - - return ctx.Err() - case <-tc.C: - _ = t.logger.LogDataStats(ctx) - } - } -} - -// StartProgressLogger priunts out periodic -// estimates of sync duration if we are behind tip. -func (t *DataTester) StartProgressLogger( - ctx context.Context, -) error { - tc := time.NewTicker(PeriodicLoggingFrequency) - defer tc.Stop() - for { select { case <-ctx.Done(): @@ -379,21 +357,34 @@ func (t *DataTester) StartProgressLogger( // we can log metrics about the current check:data run. _, _ = t.counterStorage.Update( ctx, - logger.TimeElapsedCounter, + results.TimeElapsedCounter, big.NewInt(periodicLoggingSeconds), ) - status, fetchErr := t.fetcher.NetworkStatusRetry(ctx, t.network, nil) - if fetchErr != nil { - log.Printf("%v: unable to get network status\n", fetchErr.Err) - continue - } - - _ = t.logger.LogTipEstimate(ctx, status.CurrentBlockIdentifier.Index) + status := results.ComputeCheckDataStatus(ctx, t.counterStorage, t.balanceStorage, t.fetcher, t.config.Network) + t.logger.LogDataStatus(ctx, status) } } } +// ServeHTTP serves a CheckDataStatus response on all paths. +func (t *DataTester) ServeHTTP(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json; charset=UTF-8") + w.WriteHeader(http.StatusOK) + + status := results.ComputeCheckDataStatus( + r.Context(), + t.counterStorage, + t.balanceStorage, + t.fetcher, + t.network, + ) + + if err := json.NewEncoder(w).Encode(status); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + } +} + // EndAtTipLoop runs a loop that evaluates end condition EndAtTip func (t *DataTester) EndAtTipLoop( ctx context.Context, @@ -540,7 +531,7 @@ func (t *DataTester) HandleErr(ctx context.Context, err error, sigListeners *[]c } if len(t.endCondition) != 0 { - ExitData( + results.ExitData( t.config, t.counterStorage, t.balanceStorage, @@ -553,19 +544,19 @@ func (t *DataTester) HandleErr(ctx context.Context, err error, sigListeners *[]c fmt.Printf("\n") if t.reconcilerHandler.InactiveFailure == nil { - ExitData(t.config, t.counterStorage, t.balanceStorage, err, 1, "", "") + results.ExitData(t.config, t.counterStorage, t.balanceStorage, err, 1, "", "") } if !t.historicalBalanceEnabled { color.Yellow( "Can't find the block missing operations automatically, please enable historical balance lookup", ) - ExitData(t.config, t.counterStorage, t.balanceStorage, err, 1, "", "") + results.ExitData(t.config, t.counterStorage, t.balanceStorage, err, 1, "", "") } if t.config.Data.InactiveDiscrepencySearchDisabled { color.Yellow("Search for inactive reconciliation discrepency is disabled") - ExitData(t.config, t.counterStorage, t.balanceStorage, err, 1, "", "") + results.ExitData(t.config, t.counterStorage, t.balanceStorage, err, 1, "", "") } t.FindMissingOps(ctx, err, sigListeners) @@ -589,7 +580,7 @@ func (t *DataTester) FindMissingOps( ) if err != nil { color.Yellow("%s: could not find block with missing ops", err.Error()) - ExitData(t.config, t.counterStorage, t.balanceStorage, originalErr, 1, "", "") + results.ExitData(t.config, t.counterStorage, t.balanceStorage, originalErr, 1, "", "") } color.Yellow( @@ -599,7 +590,7 @@ func (t *DataTester) FindMissingOps( badBlock.Hash, ) - ExitData(t.config, t.counterStorage, t.balanceStorage, originalErr, 1, "", "") + results.ExitData(t.config, t.counterStorage, t.balanceStorage, originalErr, 1, "", "") } func (t *DataTester) recursiveOpSearch( @@ -630,8 +621,6 @@ func (t *DataTester) recursiveOpSearch( balanceStorage := storage.NewBalanceStorage(localStore) logger := logger.NewLogger( - counterStorage, - nil, tmpDir, false, false, @@ -648,6 +637,7 @@ func (t *DataTester) recursiveOpSearch( reconcilerHandler := processor.NewReconcilerHandler( logger, + counterStorage, balanceStorage, true, // halt on reconciliation error ) diff --git a/pkg/tester/general.go b/pkg/tester/general.go index ae636ca1..776070ce 100644 --- a/pkg/tester/general.go +++ b/pkg/tester/general.go @@ -16,6 +16,9 @@ package tester import ( "context" + "fmt" + "log" + "net/http" "time" "github.com/coinbase/rosetta-cli/pkg/logger" @@ -44,3 +47,34 @@ func LogMemoryLoop( } } } + +// StartServer stats a server at a port with a particular handler. +// This is often used to support a status endpoint for a particular test. +func StartServer( + ctx context.Context, + name string, + handler http.Handler, + port uint, +) error { + server := &http.Server{ + Addr: fmt.Sprintf(":%d", port), + Handler: handler, + } + + go func() { + log.Printf("%s server running on port %d\n", name, port) + _ = server.ListenAndServe() + }() + + go func() { + // If we don't shutdown server, it will + // never stop because server.ListenAndServe doesn't + // take any context. + <-ctx.Done() + log.Printf("%s server shutting down", name) + + _ = server.Shutdown(ctx) + }() + + return ctx.Err() +}