Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add gatewayz metrics #232

Merged
merged 4 commits into from
Jan 30, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ Usage:

Flags:
--accounts Export per account metrics
--gatewayz Export gateway metrics
-a, --addr string Network host to listen on. (default "0.0.0.0")
--config string config file (default is ./nats-surveyor.yaml)
-c, --count int Expected number of servers (-1 for undefined). (default 1)
Expand Down
6 changes: 6 additions & 0 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ func rootCmdArgs(args []string) []string {
"-prefix": "--prefix",
"-observe": "--observe",
"-jetstream": "--jetstream",
"-gatewayz": "--gatewayz",
}
newArgs := make([]string, 0)

Expand Down Expand Up @@ -236,6 +237,10 @@ func init() {
rootCmd.Flags().Bool("accounts", false, "Export per account metrics")
_ = viper.BindPFlag("accounts", rootCmd.Flags().Lookup("accounts"))

// gatewayz
rootCmd.Flags().Bool("gatewayz", false, "Export gateway metrics")
_ = viper.BindPFlag("gatewayz", rootCmd.Flags().Lookup("gatewayz"))

// log-level
rootCmd.Flags().String("log-level", "info", "Log level, one of: trace|debug|info|warn|error|fatal|panic")
_ = viper.BindPFlag("log-level", rootCmd.Flags().Lookup("log-level"))
Expand Down Expand Up @@ -269,6 +274,7 @@ func getSurveyorOpts() *surveyor.Options {
opts.ObservationConfigDir = viper.GetString("observe")
opts.JetStreamConfigDir = viper.GetString("jetstream")
opts.Accounts = viper.GetBool("accounts")
opts.Gatewayz = viper.GetBool("gatewayz")
opts.ServerResponseWait = viper.GetDuration("server-discovery-timeout")

logLevel, err := logrus.ParseLevel(viper.GetString("log-level"))
Expand Down
166 changes: 165 additions & 1 deletion surveyor/collector_statz.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ type statzDescs struct {
GatewayRecvMsgs *prometheus.Desc
GatewayRecvBytes *prometheus.Desc
GatewayNumInbound *prometheus.Desc
OutboundGateways *gatewayzDescs
InboundGateways *gatewayzDescs

// Jetstream Info
JetstreamInfo *prometheus.Desc
Expand Down Expand Up @@ -130,6 +132,22 @@ type statzDescs struct {
accJetstreamReplicaCount *prometheus.Desc
}

// gatewayzDescs holds the gateway metric descriptions
type gatewayzDescs struct {
configured *prometheus.Desc
connStart *prometheus.Desc
connLastActivity *prometheus.Desc
connUptime *prometheus.Desc
connIdle *prometheus.Desc
connRtt *prometheus.Desc
connPendingBytes *prometheus.Desc
connInMsgs *prometheus.Desc
connOutMsgs *prometheus.Desc
connInBytes *prometheus.Desc
connOutBytes *prometheus.Desc
connSubscriptions *prometheus.Desc
}

// StatzCollector collects statz from a server deployment
type StatzCollector struct {
sync.Mutex
Expand All @@ -142,6 +160,7 @@ type StatzCollector struct {
statsChan chan *server.ServerStatsMsg
jsStats []*jsStat
accStats []*accountStats
gatewayStatz []*gatewayStatz
rtts map[string]time.Duration
pollTimeout time.Duration
reply string
Expand All @@ -153,13 +172,15 @@ type StatzCollector struct {
doneCh chan struct{}
descs statzDescs
collectAccounts bool
collectGatewayz bool
accStatZeroConn map[string]int
natsUp *prometheus.Desc

serverLabels []string
serverInfoLabels []string
routeLabels []string
gatewayLabels []string
gatewayzLabels []string
jsServerLabels []string
jsServerInfoLabels []string
constLabels prometheus.Labels
Expand Down Expand Up @@ -282,6 +303,12 @@ func (sc *StatzCollector) buildDescs() {
sc.descs.GatewayRecvBytes = newPromDesc("gateway_recv_bytes", "Number of messages sent over the gateway counter", sc.gatewayLabels)
sc.descs.GatewayNumInbound = newPromDesc("gateway_inbound_msg_count", "Number inbound messages through the gateway gauge", sc.gatewayLabels)

// Gatewayz
if sc.collectGatewayz {
sc.descs.OutboundGateways = sc.newGatewayzDescs("gatewayz_outbound_gateway", newPromDesc)
sc.descs.InboundGateways = sc.newGatewayzDescs("gatewayz_inbound_gateway", newPromDesc)
}

// Jetstream Info
sc.descs.JetstreamInfo = newPromDesc("jetstream_info", " Always 1. Contains metadata for cross-reference from other time-series", sc.jsServerInfoLabels)

Expand Down Expand Up @@ -396,7 +423,7 @@ func (sc *StatzCollector) buildDescs() {
}

// NewStatzCollector creates a NATS Statz Collector
func NewStatzCollector(nc *nats.Conn, logger *logrus.Logger, numServers int, serverDiscoveryWait, pollTimeout time.Duration, accounts bool, constLabels prometheus.Labels) *StatzCollector {
func NewStatzCollector(nc *nats.Conn, logger *logrus.Logger, numServers int, serverDiscoveryWait, pollTimeout time.Duration, accounts bool, gatewayz bool, constLabels prometheus.Labels) *StatzCollector {
sc := &StatzCollector{
nc: nc,
logger: logger,
Expand All @@ -407,13 +434,15 @@ func NewStatzCollector(nc *nats.Conn, logger *logrus.Logger, numServers int, ser
servers: make(map[string]bool),
doneCh: make(chan struct{}, 1),
collectAccounts: accounts,
collectGatewayz: gatewayz,
accStatZeroConn: make(map[string]int),

// TODO - normalize these if possible. Jetstream varies from the other server labels
serverLabels: []string{"server_cluster", "server_name", "server_id"},
serverInfoLabels: []string{"server_cluster", "server_name", "server_id", "server_version"},
routeLabels: []string{"server_cluster", "server_name", "server_id", "server_route_name", "server_route_name_id"},
gatewayLabels: []string{"server_cluster", "server_name", "server_id", "server_gateway_name", "server_gateway_name_id"},
gatewayzLabels: []string{"server_id", "server_name", "gateway_name", "remote_gateway_name", "gateway_id", "cid"},
jsServerLabels: []string{"server_id", "server_name", "cluster_name"},
jsServerInfoLabels: []string{"server_name", "server_host", "server_id", "server_cluster", "server_domain", "server_version", "server_jetstream"},
constLabels: constLabels,
Expand All @@ -427,6 +456,23 @@ func NewStatzCollector(nc *nats.Conn, logger *logrus.Logger, numServers int, ser
return sc
}

func (sc *StatzCollector) newGatewayzDescs(gwType string, newPromDesc func(name, help string, labels []string) *prometheus.Desc) *gatewayzDescs {
return &gatewayzDescs{
configured: newPromDesc(gwType+"_configured", "configured", sc.gatewayzLabels),
connStart: newPromDesc(gwType+"_conn_start_time_seconds", "conn_start_time_seconds", sc.gatewayzLabels),
connLastActivity: newPromDesc(gwType+"_conn_last_activity_seconds", "conn_last_activity_seconds", sc.gatewayzLabels),
connUptime: newPromDesc(gwType+"_conn_uptime_seconds", "conn_uptime_seconds", sc.gatewayzLabels),
connIdle: newPromDesc(gwType+"_conn_idle_seconds", "conn_idle_seconds", sc.gatewayzLabels),
connRtt: newPromDesc(gwType+"_conn_rtt", "rtt", sc.gatewayzLabels),
connPendingBytes: newPromDesc(gwType+"_conn_pending_bytes", "pending_bytes", sc.gatewayzLabels),
connInMsgs: newPromDesc(gwType+"_conn_in_msgs", "in_msgs", sc.gatewayzLabels),
connOutMsgs: newPromDesc(gwType+"_conn_out_msgs", "out_msgs", sc.gatewayzLabels),
connInBytes: newPromDesc(gwType+"_conn_in_bytes", "in_bytes", sc.gatewayzLabels),
connOutBytes: newPromDesc(gwType+"_conn_out_bytes", "out_bytes", sc.gatewayzLabels),
connSubscriptions: newPromDesc(gwType+"_conn_subscriptions", "subscriptions", sc.gatewayzLabels),
}
}

func (sc *StatzCollector) handleResponse(msg *nats.Msg) {
m := &server.ServerStatsMsg{}

Expand Down Expand Up @@ -573,6 +619,13 @@ func (sc *StatzCollector) poll() error {
}
}

if sc.collectGatewayz {
err := sc.pollGatewayInfo()
if err != nil {
return err
}
}

if sc.collectAccounts {
return sc.pollAccountInfo()
}
Expand Down Expand Up @@ -662,6 +715,19 @@ func (sc *StatzCollector) pollAccountInfo() error {
return nil
}

func (sc *StatzCollector) pollGatewayInfo() error {
gatewayz, err := sc.getGatewayz(sc.nc)
if err != nil {
return err
}

sc.Lock()
sc.gatewayStatz = gatewayz
sc.Unlock()

return nil
}

func (sc *StatzCollector) getJSInfos(nc *nats.Conn) (map[string]*server.AccountDetail, []*jsStat) {
opts := server.JSzOptions{
Accounts: true,
Expand Down Expand Up @@ -729,6 +795,11 @@ type accStatz struct {
Data server.AccountStatz `json:"data,omitempty"`
}

type gatewayStatz struct {
server.ServerAPIResponse
Data *server.Gatewayz `json:"data,omitempty"`
}

type jsStat struct {
Server *server.ServerInfo
Data *server.JSInfo
Expand Down Expand Up @@ -802,6 +873,46 @@ func (sc *StatzCollector) getAccStatz(nc *nats.Conn) (map[string][]*accStat, err
return accStats, nil
}

func (sc *StatzCollector) getGatewayz(nc *nats.Conn) ([]*gatewayStatz, error) {
req := &server.GatewayzEventOptions{
GatewayzOptions: server.GatewayzOptions{
Accounts: false,
},
}
reqJSON, err := json.Marshal(req)
if err != nil {
return nil, err
}
res := make([]*gatewayStatz, 0)
const subj = "$SYS.REQ.SERVER.PING.GATEWAYZ"

msgs, err := requestMany(nc, sc, subj, reqJSON, true)
if err != nil {
sc.logger.Warnf("Error requesting gatewayz stats: %s", err.Error())
}

for _, msg := range msgs {
var g gatewayStatz

if err = unmarshalMsg(msg, &g); err != nil {
sc.logger.Warnf("Error deserializing gatewayz stats: %s", err.Error())
continue
}

if g.Error != nil {
sc.logger.Warnf("Error in Gatewayz stats response: %s", g.Error.Error())
continue
}

res = append(res, &g)
if sc.numServers != -1 && len(res) == sc.numServers {
break
}
}

return res, nil
}

func mergeStreamDetails(from, to *server.AccountDetail) {
Outer:
for _, stream := range from.Streams {
Expand Down Expand Up @@ -1140,6 +1251,25 @@ func (sc *StatzCollector) Collect(ch chan<- prometheus.Metric) {
}
}

// Gatewayz metrics
if sc.collectGatewayz {
for _, gwStat := range sc.gatewayStatz {
if gwStat == nil || gwStat.Data == nil {
continue
}

for rgwName, gw := range gwStat.Data.OutboundGateways {
collectGatewayzMetrics(metrics, sc.descs.OutboundGateways, rgwName, gwStat, gw)
}

for rgwName, gws := range gwStat.Data.InboundGateways {
for _, gw := range gws {
collectGatewayzMetrics(metrics, sc.descs.InboundGateways, rgwName, gwStat, gw)
}
}
}
}

collectCh := make(chan prometheus.Metric)

// We want to collect these before we exit the flight group
Expand Down Expand Up @@ -1188,6 +1318,40 @@ func (sc *StatzCollector) Collect(ch chan<- prometheus.Metric) {

}

func collectGatewayzMetrics(metrics *metricSlice, gwDescs *gatewayzDescs, rgwName string, gwStat *gatewayStatz, gw *server.RemoteGatewayz) {
if gw == nil || gw.Connection == nil || gwStat == nil || gwStat.Server == nil || gwStat.Data == nil {
return
}

var isConfigured float64
if gw.IsConfigured {
isConfigured = 1
}
serverID := gwStat.Server.ID
serverName := gwStat.Server.Name
gwName := gwStat.Data.Name
gwID := gw.Connection.Name
cid := strconv.FormatUint(gw.Connection.Cid, 10)

// server_id, server_name, gateway_name, remote_gateway_name, gateway_id, cid
labels := []string{serverID, serverName, gwName, rgwName, gwID, cid}
uptime, _ := time.ParseDuration(gw.Connection.Uptime)
idle, _ := time.ParseDuration(gw.Connection.Idle)
rtt, _ := time.ParseDuration(gw.Connection.RTT)

metrics.newGaugeMetric(gwDescs.configured, isConfigured, labels)
metrics.newGaugeMetric(gwDescs.connLastActivity, float64(gw.Connection.LastActivity.Unix()), labels)
metrics.newGaugeMetric(gwDescs.connUptime, uptime.Seconds(), labels)
metrics.newGaugeMetric(gwDescs.connIdle, idle.Seconds(), labels)
metrics.newGaugeMetric(gwDescs.connRtt, rtt.Seconds(), labels)
metrics.newGaugeMetric(gwDescs.connPendingBytes, float64(gw.Connection.Pending), labels)
metrics.newGaugeMetric(gwDescs.connInMsgs, float64(gw.Connection.InMsgs), labels)
metrics.newGaugeMetric(gwDescs.connOutMsgs, float64(gw.Connection.OutMsgs), labels)
metrics.newGaugeMetric(gwDescs.connInBytes, float64(gw.Connection.InBytes), labels)
metrics.newGaugeMetric(gwDescs.connOutBytes, float64(gw.Connection.OutBytes), labels)
metrics.newGaugeMetric(gwDescs.connSubscriptions, float64(gw.Connection.NumSubs), labels)
}

func requestMany(nc *nats.Conn, sc *StatzCollector, subject string, data []byte, compression bool) ([]*nats.Msg, error) {
if subject == "" {
return nil, fmt.Errorf("subject cannot be empty")
Expand Down
3 changes: 2 additions & 1 deletion surveyor/surveyor.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ type Options struct {
ObservationConfigDir string
JetStreamConfigDir string
Accounts bool
Gatewayz bool
Logger *logrus.Logger // not exposed by CLI
NATSOpts []nats.Option // not exposed by CLI
ConstLabels prometheus.Labels // not exposed by CLI
Expand Down Expand Up @@ -200,7 +201,7 @@ func (s *Surveyor) createStatszCollector() error {
s.logger.Debugln("Skipping per-account exports")
}

s.statzC = NewStatzCollector(s.sysAcctPC.nc, s.logger, s.opts.ExpectedServers, s.opts.ServerResponseWait, s.opts.PollTimeout, s.opts.Accounts, s.opts.ConstLabels)
s.statzC = NewStatzCollector(s.sysAcctPC.nc, s.logger, s.opts.ExpectedServers, s.opts.ServerResponseWait, s.opts.PollTimeout, s.opts.Accounts, s.opts.Gatewayz, s.opts.ConstLabels)
return s.promRegistry.Register(s.statzC)
}

Expand Down
54 changes: 54 additions & 0 deletions surveyor/surveyor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,60 @@ func TestSurveyor_Account(t *testing.T) {
}
}

func TestSurveyor_Gatewayz(t *testing.T) {
sc := st.NewSuperCluster(t)
defer sc.Shutdown()

opt := getTestOptions()
opt.Gatewayz = true
opt.ExpectedServers = 3
s, err := NewSurveyor(opt)
if err != nil {
t.Fatalf("couldn't create surveyor: %v", err)
}
if err = s.Start(); err != nil {
t.Fatalf("start error: %v", err)
}

defer s.Stop()

output, err := PollSurveyorEndpoint(t, "http://127.0.0.1:7777/metrics", false, http.StatusOK)
if err != nil {
t.Fatal(err)
}

want := []string{
"nats_core_gatewayz_inbound_gateway_configured",
"nats_core_gatewayz_inbound_gateway_conn_idle_seconds",
"nats_core_gatewayz_inbound_gateway_conn_in_bytes",
"nats_core_gatewayz_inbound_gateway_conn_in_msgs",
"nats_core_gatewayz_inbound_gateway_conn_last_activity_seconds",
"nats_core_gatewayz_inbound_gateway_conn_out_bytes",
"nats_core_gatewayz_inbound_gateway_conn_out_msgs",
"nats_core_gatewayz_inbound_gateway_conn_pending_bytes",
"nats_core_gatewayz_inbound_gateway_conn_rtt",
"nats_core_gatewayz_inbound_gateway_conn_subscriptions",
"nats_core_gatewayz_inbound_gateway_conn_uptime_seconds",
"nats_core_gatewayz_outbound_gateway_configured",
"nats_core_gatewayz_outbound_gateway_conn_idle_seconds",
"nats_core_gatewayz_outbound_gateway_conn_in_bytes",
"nats_core_gatewayz_outbound_gateway_conn_in_msgs",
"nats_core_gatewayz_outbound_gateway_conn_last_activity_seconds",
"nats_core_gatewayz_outbound_gateway_conn_out_bytes",
"nats_core_gatewayz_outbound_gateway_conn_out_msgs",
"nats_core_gatewayz_outbound_gateway_conn_pending_bytes",
"nats_core_gatewayz_outbound_gateway_conn_rtt",
"nats_core_gatewayz_outbound_gateway_conn_subscriptions",
"nats_core_gatewayz_outbound_gateway_conn_uptime_seconds",
}
for _, m := range want {
if !strings.Contains(output, m) {
t.Logf("output: %s", output)
t.Fatalf("missing: %s", m)
}
}
}

func TestSurveyor_AccountJetStreamAssets(t *testing.T) {
sc := st.NewJetStreamCluster(t)
defer sc.Shutdown()
Expand Down
Loading