diff --git a/cmd/explorer/main.go b/cmd/explorer/main.go index 52b1dda4..78d8d5fe 100644 --- a/cmd/explorer/main.go +++ b/cmd/explorer/main.go @@ -19,7 +19,6 @@ import ( func main() { configPath := flag.String("config", "", "Path to the config file, if empty string defaults will be used") - flag.Parse() cfg := &types.Config{} @@ -30,9 +29,10 @@ func main() { utils.Config = cfg logWriter := utils.InitLogger() defer logWriter.Dispose() + logger.WithFields(logger.Fields{ - "config": *configPath, - //"version": version.Version, + "config": *configPath, + "version": utils.BuildVersion, "chainName": utils.Config.Chain.Config.ConfigName}).Printf("starting") if utils.Config.Chain.Config.SlotsPerEpoch == 0 || utils.Config.Chain.Config.SecondsPerSlot == 0 { diff --git a/indexer/indexer.go b/indexer/indexer.go index 02902158..86e86d42 100644 --- a/indexer/indexer.go +++ b/indexer/indexer.go @@ -251,6 +251,36 @@ func (indexer *Indexer) runIndexer() { indexer.runMutex.Lock() defer indexer.runMutex.Unlock() + for { + genesis, err := indexer.rpcClient.GetGenesis() + if err != nil { + logger.Errorf("Indexer Error while fetching genesis: %v", err) + } else if genesis != nil { + genesisTime := uint64(genesis.Data.GenesisTime) + logger.Infof("RPC Genesis: Time: %v, ForkVersion: %v, GVR: %v", genesisTime, genesis.Data.GenesisForkVersion, genesis.Data.GenesisValidatorsRoot) + if genesisTime != utils.Config.Chain.GenesisTimestamp { + logger.Warnf("Genesis time from RPC does not match the genesis time from explorer configuration.") + } + if genesis.Data.GenesisForkVersion.String() != utils.Config.Chain.Config.GenesisForkVersion { + logger.Warnf("Genesis fork version from RPC does not match the genesis fork version explorer configuration.") + } + + err := indexer.runIndexerLoop() + if err == nil { + break + } + } + + logger.Warnf("Indexer couldn't do stuff it is supposed to do. Retrying in 10 sec...") + select { + case <-time.After(10 * time.Second): + } + } + + logger.Debugf("Indexer process shutdown") +} + +func (indexer *Indexer) runIndexerLoop() error { chainConfig := utils.Config.Chain.Config genesisTime := time.Unix(int64(utils.Config.Chain.GenesisTimestamp), 0) @@ -268,6 +298,7 @@ func (indexer *Indexer) runIndexer() { err := indexer.pollHeadBlock() if err != nil { logger.Errorf("Indexer Error while polling latest head: %v", err) + return err } // start block stream @@ -314,11 +345,13 @@ func (indexer *Indexer) runIndexer() { } } - //now := time.Now() + now := time.Now() indexer.processIndexing() indexer.processCacheCleanup() - //logger.Infof("indexer loop processing time: %v ms", time.Now().Sub(now).Milliseconds()) + logger.Debugf("indexer loop processing time: %v ms", time.Now().Sub(now).Milliseconds()) } + + return nil } func (indexer *Indexer) startSynchronization(startEpoch uint64) error { diff --git a/rpc/beaconapi.go b/rpc/beaconapi.go index 97a98c51..ca740d6a 100644 --- a/rpc/beaconapi.go +++ b/rpc/beaconapi.go @@ -11,12 +11,14 @@ import ( "time" "github.com/ethereum/go-ethereum/common/lru" - logger "github.com/sirupsen/logrus" + "github.com/sirupsen/logrus" "github.com/pk910/light-beaconchain-explorer/rpctypes" "github.com/pk910/light-beaconchain-explorer/utils" ) +var logger = logrus.StandardLogger().WithField("module", "rpc") + type BeaconClient struct { endpoint string assignmentsCache *lru.Cache[uint64, *rpctypes.EpochAssignments] @@ -39,8 +41,8 @@ func NewBeaconClient(endpoint string, assignmentsCacheSize int) (*BeaconClient, var errNotFound = errors.New("not found 404") func (bc *BeaconClient) get(url string) ([]byte, error) { - //t0 := time.Now() - //defer func() { fmt.Println("RPC GET: ", url, time.Since(t0)) }() + t0 := time.Now() + defer func() { logger.Debugf("RPC call (byte): %v [%v ms]", url, time.Since(t0).Milliseconds()) }() client := &http.Client{Timeout: time.Second * 120} resp, err := client.Get(url) if err != nil { @@ -62,8 +64,8 @@ func (bc *BeaconClient) get(url string) ([]byte, error) { } func (bc *BeaconClient) getJson(url string, returnValue interface{}) error { - //t0 := time.Now() - //defer func() { fmt.Println("RPC GET (json): ", url, time.Since(t0)) }() + t0 := time.Now() + defer func() { logger.Debugf("RPC call (json): %v [%v ms]", url, time.Since(t0).Milliseconds()) }() client := &http.Client{Timeout: time.Second * 120} resp, err := client.Get(url) if err != nil { diff --git a/rpc/beaconstream.go b/rpc/beaconstream.go index 1565aa01..b13aad42 100644 --- a/rpc/beaconstream.go +++ b/rpc/beaconstream.go @@ -7,7 +7,6 @@ import ( "time" "github.com/donovanhide/eventsource" - logger "github.com/sirupsen/logrus" "github.com/pk910/light-beaconchain-explorer/rpctypes" ) @@ -58,16 +57,13 @@ func (bs *BeaconStream) startStream(endpoint string) { bs.runMutex.Lock() defer bs.runMutex.Unlock() - stream, err := eventsource.Subscribe(fmt.Sprintf("%s/eth/v1/events?topics=block,head", endpoint), "") - if err != nil { - logger.Errorf("Error while subscribing beacon block stream: %v", err) - } else { - defer stream.Close() - + stream := bs.subscribeStream(endpoint) + if stream != nil { running := true for running { select { case evt := <-stream.Events: + logger.Debugf("Event received from rpc event stream: %v", evt.Event()) if evt.Event() == "block" { bs.processBlockEvent(evt) } else if evt.Event() == "head" { @@ -75,16 +71,40 @@ func (bs *BeaconStream) startStream(endpoint string) { } case <-bs.killChan: running = false - case <-time.After(120 * time.Second): - // timeout - no block since 2 mins + case <-time.After(300 * time.Second): + // timeout - no block since 5 mins logger.Errorf("beacon block stream error, no new head retrieved since %v (%v ago)", bs.lastHeadSeen, time.Since(bs.lastHeadSeen)) + stream.Close() + stream = bs.subscribeStream(endpoint) + if stream == nil { + running = false + } } } } + if stream != nil { + stream.Close() + } bs.running = false bs.CloseChan <- true } +func (bs *BeaconStream) subscribeStream(endpoint string) *eventsource.Stream { + for { + stream, err := eventsource.Subscribe(fmt.Sprintf("%s/eth/v1/events?topics=block,head", endpoint), "") + if err != nil { + logger.Errorf("Error while subscribing beacon event stream: %v", err) + select { + case <-bs.killChan: + return nil + case <-time.After(10 * time.Second): + } + } else { + return stream + } + } +} + func (bs *BeaconStream) processBlockEvent(evt eventsource.Event) { var parsed rpctypes.StandardV1StreamedBlockEvent err := json.Unmarshal([]byte(evt.Data()), &parsed) @@ -92,6 +112,7 @@ func (bs *BeaconStream) processBlockEvent(evt eventsource.Event) { logger.Warnf("beacon block stream failed to decode block event: %v", err) return } + logger.Debugf("RPC block event! slot: %v, block: %v", parsed.Slot, parsed.Block) bs.BlockChan <- &parsed } @@ -102,6 +123,7 @@ func (bs *BeaconStream) processHeadEvent(evt eventsource.Event) { logger.Warnf("beacon block stream failed to decode block event: %v", err) return } + logger.Debugf("RPC head event! slot: %v, block: %v, state: %v", parsed.Slot, parsed.Block, parsed.State) bs.lastHeadSeen = time.Now() bs.HeadChan <- &parsed } diff --git a/utils/config.go b/utils/config.go index 87bcbae4..fd13d7b9 100644 --- a/utils/config.go +++ b/utils/config.go @@ -99,6 +99,7 @@ func ReadConfig(cfg *types.Config, path string) error { "depositContractAddress": cfg.Chain.Config.DepositContractAddress, }).Infof("did init config") + cfg.Logging.OutputLevel = "debug" return nil }