From ae8eca6c38efda4085ab7816581482c9d77c0ed4 Mon Sep 17 00:00:00 2001 From: Artem Poltorzhitskiy Date: Wed, 8 Nov 2023 01:19:07 +0100 Subject: [PATCH] Fix: reinitialization periodic networks after reset (#1002) --- cmd/indexer/indexer/initializer.go | 46 ++++++++++++++++++++++++++---- internal/helpers/string.go | 3 +- internal/noderpc/rpc.go | 12 ++++---- internal/noderpc/rpc_test.go | 2 +- 4 files changed, 49 insertions(+), 14 deletions(-) diff --git a/cmd/indexer/indexer/initializer.go b/cmd/indexer/indexer/initializer.go index c46a8da5e..6919bbc4d 100644 --- a/cmd/indexer/indexer/initializer.go +++ b/cmd/indexer/indexer/initializer.go @@ -2,6 +2,7 @@ package indexer import ( "context" + "time" "github.com/baking-bad/bcdhub/internal/models" "github.com/baking-bad/bcdhub/internal/models/block" @@ -39,22 +40,55 @@ func (initializer Initializer) Init(ctx context.Context) error { // check first block in node and in database, compare its hash. // if hash is differed new periodic chain was started. log.Info().Str("network", initializer.network.String()).Msg("checking for new periodic chain...") - blockHash, err := initializer.rpc.BlockHash(ctx, 1) + + var ( + notRunning = true + ) + + for notRunning { + header, err := initializer.rpc.GetHead(ctx) + if err != nil { + return err + } + notRunning = header.Level == 0 + log.Info().Bool("running", !notRunning).Str("network", initializer.network.String()).Msg("chain status") + if notRunning { + time.Sleep(time.Second * 10) + } + } + + header, err := initializer.rpc.GetHeader(ctx, 1) if err != nil { return err } + firstBlock, err := initializer.block.Get(ctx, 1) - log.Info().Str("node_hash", blockHash).Str("indexer_hash", firstBlock.Hash).Msg("checking first block hash...") - if err == nil && firstBlock.Hash != blockHash { + if err != nil { + return nil + } + + log.Info(). + Str("network", initializer.network.String()). + Str("node_hash", header.Hash). + Str("indexer_hash", firstBlock.Hash). + Msg("checking first block hash...") + if firstBlock.Hash != header.Hash { log.Info().Str("network", initializer.network.String()).Msg("found new periodic chain") - log.Warn().Str("network", initializer.network.String()).Msg("drop database...") - if err := initializer.repo.Drop(ctx); err != nil { + if err := initializer.drop(ctx); err != nil { return err } - log.Warn().Str("network", initializer.network.String()).Msg("database was dropped") } } } return initializer.repo.InitDatabase(ctx) } + +func (initializer Initializer) drop(ctx context.Context) error { + log.Warn().Str("network", initializer.network.String()).Msg("drop database...") + if err := initializer.repo.Drop(ctx); err != nil { + return err + } + log.Warn().Str("network", initializer.network.String()).Msg("database was dropped") + return nil +} diff --git a/internal/helpers/string.go b/internal/helpers/string.go index 13f39e93c..0dc2bdbc5 100644 --- a/internal/helpers/string.go +++ b/internal/helpers/string.go @@ -3,7 +3,6 @@ package helpers import ( "net/url" "os" - "path" "path/filepath" "strings" "unicode" @@ -33,7 +32,7 @@ func URLJoin(baseURL, queryPath string) (string, error) { if err != nil { return "", err } - u.Path = path.Join(u.Path, queryPath) + u = u.JoinPath(queryPath) return u.String(), nil } diff --git a/internal/noderpc/rpc.go b/internal/noderpc/rpc.go index 286ab5525..d278dd5bc 100644 --- a/internal/noderpc/rpc.go +++ b/internal/noderpc/rpc.go @@ -86,10 +86,12 @@ func NewWaitNodeRPC(baseURL string, opts ...NodeOption) *NodeRPC { return node } -func (rpc *NodeRPC) checkStatusCode(r io.Reader, statusCode int, checkStatusCode bool) error { +func (rpc *NodeRPC) checkStatusCode(r io.Reader, statusCode int, checkStatusCode bool, uri string) error { switch { case statusCode == http.StatusOK: return nil + case statusCode == http.StatusNotFound: + return errors.Errorf("%s: not found", uri) case statusCode > http.StatusInternalServerError: return NewNodeUnavailiableError(rpc.baseURL, statusCode) case checkStatusCode: @@ -107,7 +109,7 @@ func (rpc *NodeRPC) checkStatusCode(r io.Reader, statusCode int, checkStatusCode } func (rpc *NodeRPC) parseResponse(r io.Reader, statusCode int, checkStatusCode bool, uri string, response interface{}) error { - if err := rpc.checkStatusCode(r, statusCode, checkStatusCode); err != nil { + if err := rpc.checkStatusCode(r, statusCode, checkStatusCode, uri); err != nil { return fmt.Errorf("%w (%s): %w", ErrNodeRPCError, uri, err) } @@ -173,7 +175,7 @@ func (rpc *NodeRPC) get(ctx context.Context, uri string, response interface{}) e return err } - return rpc.parseResponse(buffer, resp.StatusCode, true, uri, response) + return rpc.parseResponse(buffer, resp.StatusCode, true, resp.Request.URL.String(), response) } func (rpc *NodeRPC) getRaw(ctx context.Context, uri string) ([]byte, error) { @@ -196,7 +198,7 @@ func (rpc *NodeRPC) getRaw(ctx context.Context, uri string) ([]byte, error) { } defer resp.Body.Close() - if err := rpc.checkStatusCode(resp.Body, resp.StatusCode, true); err != nil { + if err := rpc.checkStatusCode(resp.Body, resp.StatusCode, true, uri); err != nil { return nil, fmt.Errorf("%w (%s): %w", ErrNodeRPCError, uri, err) } return io.ReadAll(resp.Body) @@ -227,7 +229,7 @@ func (rpc *NodeRPC) post(ctx context.Context, uri string, data interface{}, chec return err } - return rpc.parseResponse(buffer, resp.StatusCode, checkStatusCode, uri, response) + return rpc.parseResponse(buffer, resp.StatusCode, checkStatusCode, resp.Request.URL.String(), response) } // Block - returns block diff --git a/internal/noderpc/rpc_test.go b/internal/noderpc/rpc_test.go index f8898f3b0..55b2992b8 100644 --- a/internal/noderpc/rpc_test.go +++ b/internal/noderpc/rpc_test.go @@ -56,7 +56,7 @@ func TestNodeRPC_checkStatusCode(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { rpc := new(NodeRPC) - err := rpc.checkStatusCode(tt.r, tt.statusCode, tt.checkStatusCode) + err := rpc.checkStatusCode(tt.r, tt.statusCode, tt.checkStatusCode, "") require.Equal(t, tt.wantErr, err != nil) if err != nil { require.ErrorContains(t, err, tt.errString)