diff --git a/monitor/xmonitor/metrics.go b/monitor/xmonitor/metrics.go index e4e76eadf..050deba0d 100644 --- a/monitor/xmonitor/metrics.go +++ b/monitor/xmonitor/metrics.go @@ -38,15 +38,15 @@ var ( Namespace: "monitor", Subsystem: "xchain", Name: "halo_attested_height", - Help: "The latest halo attested height of a specific chain", - }, []string{"chain"}) + Help: "The latest halo attested height of a specific chain version", + }, []string{"chain_version"}) attestedOffset = promauto.NewGaugeVec(prometheus.GaugeOpts{ Namespace: "monitor", Subsystem: "xchain", Name: "halo_attest_offset", - Help: "The latest halo attest offset of a specific chain", - }, []string{"chain"}) + Help: "The latest halo attest offset of a specific chain version", + }, []string{"chain_version"}) attestedMsgOffset = promauto.NewGaugeVec(prometheus.GaugeOpts{ Namespace: "monitor", diff --git a/monitor/xmonitor/monitor.go b/monitor/xmonitor/monitor.go index c415046d2..8d60b4a52 100644 --- a/monitor/xmonitor/monitor.go +++ b/monitor/xmonitor/monitor.go @@ -32,7 +32,9 @@ func Start( } go monitorHeadsForever(ctx, srcChain, headsFunc) - go monitorAttestedForever(ctx, srcChain, cprovider, network, xprovider) + for _, chainVer := range srcChain.ChainVersions() { + go monitorAttestedForever(ctx, chainVer, cprovider, network, xprovider) + } } // Monitors below only apply to EVM chains. @@ -130,19 +132,20 @@ func monitorHeadsForever( } } -// monitorAttestedForever blocks and periodically monitors the halo attested height and offsets of the given chain. +// monitorAttestedForever blocks and periodically monitors the halo attested height and offsets of the given chain version. func monitorAttestedForever( ctx context.Context, - srcChain netconf.Chain, + chainVer xchain.ChainVersion, cprovider cchain.Provider, network netconf.Network, xprovider xchain.Provider, ) { - chainVer := srcChain.ChainVersions()[0] - + chainVerName := network.ChainName(chainVer.ID) ticker := time.NewTicker(time.Minute) defer ticker.Stop() + var lastAttestOffset uint64 + for { select { case <-ctx.Done(): @@ -150,27 +153,35 @@ func monitorAttestedForever( case <-ticker.C: att, ok, err := cprovider.LatestAttestation(ctx, chainVer) if err != nil { - log.Warn(ctx, "Monitoring attested failed (will retry)", err, "chain", srcChain.Name) + log.Warn(ctx, "Attest offset monitor failed getting latest attestation (will retry)", err, "chain_version", chainVerName) continue } else if !ok { continue + } else if att.AttestOffset == lastAttestOffset { + continue } - attestedHeight.WithLabelValues(srcChain.Name).Set(float64(att.BlockHeight)) - attestedOffset.WithLabelValues(srcChain.Name).Set(float64(att.AttestOffset)) + attestedHeight.WithLabelValues(chainVerName).Set(float64(att.BlockHeight)) + attestedOffset.WithLabelValues(chainVerName).Set(float64(att.AttestOffset)) - // Query emit cursor cache for offsets of the original xblock from the chain itself. - for _, stream := range network.StreamsFrom(srcChain.ID) { - name := network.StreamName(stream) + // Query stream offsets of the original xblock from the chain itself. + for _, stream := range network.StreamsFrom(chainVer.ID) { + if stream.ConfLevel() != chainVer.ConfLevel { + continue + } + + streamName := network.StreamName(stream) cursor, _, err := xprovider.GetEmittedCursor(ctx, xchain.EmitRef{Height: &att.BlockHeight}, stream) if err != nil { - log.Warn(ctx, "Failed getting emit cursor (will retry)", err, "chain", srcChain.Name) + log.Warn(ctx, "Attest offset monitor failed getting emit cursor", err, "stream", streamName) continue } - attestedMsgOffset.WithLabelValues(name).Set(float64(cursor.MsgOffset)) + attestedMsgOffset.WithLabelValues(streamName).Set(float64(cursor.MsgOffset)) } + + lastAttestOffset = att.AttestOffset } } }