Skip to content

Commit

Permalink
refactored collector + fixed bugs
Browse files Browse the repository at this point in the history
  • Loading branch information
ahuret committed Nov 6, 2020
1 parent 927371d commit 5488095
Showing 1 changed file with 107 additions and 81 deletions.
188 changes: 107 additions & 81 deletions collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,30 +19,47 @@ type CollectorConfig struct {
}

type Collector struct {
Logger log.Logger
ServerConfig config.ServerConfig
opcuaClient *opcua.Client
metricsCache []metric
Logger log.Logger
ServerConfig config.ServerConfig
opcuaClient *opcua.Client
opcuaMetricsCache []*opcuaMetric
statsMetricsCache []*metric
errorDesc *prometheus.Desc
}

type metric struct {
name string
labels map[string]string
labelsKeys []string
labelsValues []string
type opcuaMetric struct {
*metric
nodeID string
nodeReadValueID *ua.ReadValueID
promDesc *prometheus.Desc
typ prometheus.ValueType
}

// metric couples a metric name with the cached properties needed to emit it.
type metric struct {
// name is the metric name handed to newMetric; Collect also switches on it
// to pick the value of each scrape-statistics metric.
name string
// properties holds the descriptor, value type and label data built once by
// newMetric and reused on every collection.
properties *metricProperties
}

// metricProperties caches everything required to turn a raw value into a
// Prometheus sample for one metric.
type metricProperties struct {
// desc is the descriptor created by newMetric from the metric name, help
// text and label keys.
desc *prometheus.Desc
// typ is the value type passed to prometheus.NewConstMetric.
typ prometheus.ValueType
// labels is the label map as supplied to newMetric; it is included in
// error messages produced by getErrorMetric.
labels map[string]string
// labelsKeys are the label names given to prometheus.NewDesc as variable
// labels.
labelsKeys []string
// labelsValues are the label values passed to prometheus.NewConstMetric,
// stored in the same order as labelsKeys.
labelsValues []string
}

func NewCollector(cfg *CollectorConfig) (*Collector, error) {
c := &Collector{Logger: cfg.Logger, ServerConfig: *cfg.Config.ServerConfig}
c.opcuaClient = client.NewClientFromServerConfig(*cfg.Config.ServerConfig, cfg.Logger)
if err := c.opcuaClient.Connect(context.Background()); err != nil {
var err error
c := &Collector{Logger: cfg.Logger, ServerConfig: *cfg.Config.ServerConfig, opcuaClient: client.NewClientFromServerConfig(*cfg.Config.ServerConfig, cfg.Logger)}
if err = c.opcuaClient.Connect(context.Background()); err != nil {
c.Logger.Fatal("cannot connect opcua client %v", err)
}
c.ReloadMetrics(cfg.Config.MetricsConfig)
c.statsMetricsCache = append(c.statsMetricsCache,
newMetric("opcua_scrape_walk_duration_seconds", "Time OPCUA walk/bulkwalk took.", prometheus.GaugeValue, nil),
newMetric("opcua_scrape_resp_returned", "RESPs returned from walk.", prometheus.GaugeValue, nil),
newMetric("opcua_scrape_duration_seconds", "Total OPCUA time scrape took (walk and processing).", prometheus.GaugeValue, nil),
newMetric("opcua_client_read_duration_seconds", "Time OPCUA to reconnect took.", prometheus.GaugeValue, nil),
)
c.errorDesc = prometheus.NewDesc("opcua_error", "error scraping target", nil, nil)
return c, nil
}

Expand All @@ -51,71 +68,99 @@ func (c *Collector) ReloadMetrics(cfg *config.MetricsConfig) {
}

func (c Collector) Describe(ch chan<- *prometheus.Desc) {
for _, metric := range c.metricsCache {
ch <- metric.promDesc
for _, metric := range c.opcuaMetricsCache {
ch <- metric.properties.desc
}
for _, metric := range c.statsMetricsCache {
ch <- metric.properties.desc
}
}

func (c Collector) Collect(ch chan<- prometheus.Metric) {
start := time.Now()
opcuaResponse, promMetrics, err := c.scrapeTarget()

opcuaResponse, readDuration, err := c.scrapeTarget()
if err != nil {
c.Logger.Info("error scraping target : %s", err)
ch <- prometheus.NewInvalidMetric(prometheus.NewDesc("opcua_error", "Error scraping target", nil, nil), err)
ch <- prometheus.NewInvalidMetric(c.errorDesc, err)
return
}
for _, metrics := range promMetrics {
ch <- metrics
}
ch <- prometheus.MustNewConstMetric(
prometheus.NewDesc("opcua_scrape_walk_duration_seconds", "Time OPCUA walk/bulkwalk took.", nil, nil),
prometheus.GaugeValue,
time.Since(start).Seconds())
ch <- prometheus.MustNewConstMetric(
prometheus.NewDesc("opcua_scrape_resp_returned", "RESPs returned from walk.", nil, nil),
prometheus.GaugeValue,
float64(len(opcuaResponse.Results)))
walkDuration := time.Since(start).Seconds()

for _, opcuaMetric := range c.opcuaToPrometheusMetrics(opcuaResponse) {
ch <- opcuaMetric
for idx, opcuaMetric := range c.opcuaMetricsCache {
value, err := c.getOpcuaValueFromIndex(opcuaResponse, idx)
if err != nil {
ch <- c.getErrorMetric(opcuaMetric.metric, err)
} else {
ch <- c.getMetricWithValue(opcuaMetric.metric, value)
}
}
for _, metric := range c.statsMetricsCache {
var value float64
switch metric.name {
case "opcua_client_read_duration_seconds":
value = readDuration
case "opcua_scrape_walk_duration_seconds":
value = walkDuration
case "opcua_scrape_resp_returned":
value = float64(len(opcuaResponse.Results))
case "opcua_scrape_duration_seconds":
value = time.Since(start).Seconds()
}
ch <- c.getMetricWithValue(metric, value)
}
}

ch <- prometheus.MustNewConstMetric(
prometheus.NewDesc("opcua_scrape_duration_seconds", "Total OPCUA time scrape took (walk and processing).", nil, nil),
prometheus.GaugeValue,
time.Since(start).Seconds())
// getErrorMetric returns an invalid Prometheus metric, attached to the
// collector's shared error descriptor, that reports err for metric m. The
// produced error names the metric and its labels and wraps err.
func (c Collector) getErrorMetric(m *metric, err error) prometheus.Metric {
	wrapped := fmt.Errorf(
		"error for metric %s with labels %v (%w)",
		m.name,
		m.properties.labels,
		err,
	)
	return prometheus.NewInvalidMetric(c.errorDesc, wrapped)
}

// getMetricWithValue builds a constant Prometheus sample for m carrying value,
// using the descriptor, value type and label values cached in m.properties.
// If the sample cannot be constructed, the error metric produced by
// getErrorMetric is returned instead.
func (c Collector) getMetricWithValue(m *metric, value float64) prometheus.Metric {
	props := m.properties
	sample, err := prometheus.NewConstMetric(props.desc, props.typ, value, props.labelsValues...)
	if err == nil {
		return sample
	}
	return c.getErrorMetric(m, err)
}

func (c *Collector) loadMetricsCache(cfg *config.MetricsConfig) error {
var result []metric
var mm []*opcuaMetric
for _, m := range cfg.Metrics {
uaNodeID, err := ua.ParseNodeID(m.NodeID)
if err != nil {
return fmt.Errorf("invalid node id: %v", err)
}
var keys, values []string
for k, v := range m.Labels {
keys = append(keys, k)
values = append(values, v)
}
result = append(result, metric{
name: m.Name,
labels: m.Labels,
labelsKeys: keys,
labelsValues: values,
mm = append(mm, &opcuaMetric{
nodeID: m.NodeID,
nodeReadValueID: &ua.ReadValueID{NodeID: uaNodeID},
promDesc: prometheus.NewDesc(m.Name, m.Help, keys, nil),
typ: getMetricValueType(m.Type)})
metric: newMetric(m.Name, m.Help, getMetricValueType(m.Type), m.Labels),
})
}
c.metricsCache = result
c.opcuaMetricsCache = mm
return nil
}

func (c *Collector) scrapeTarget() (*ua.ReadResponse, []prometheus.Metric, error) {
// newMetric builds a metric together with its cached Prometheus descriptor.
//
// name and help are passed to prometheus.NewDesc, typ is stored for later use
// when samples are created, and labels supplies the label names (as variable
// labels of the descriptor) and their values.
//
// Label names are sorted before use. Go randomizes map iteration order, so
// without sorting, two metrics configured with the same name and the same
// label names could hand their label names to prometheus.NewDesc in different
// orders and end up with inconsistent descriptors. labelsValues is derived
// from the sorted keys so both slices always line up index by index.
func newMetric(name string, help string, typ prometheus.ValueType, labels map[string]string) *metric {
	// Keep nil slices when there are no labels, as a plain append loop would.
	var keys, values []string
	if n := len(labels); n > 0 {
		keys = make([]string, 0, n)
		values = make([]string, 0, n)
	}
	for k := range labels {
		keys = append(keys, k)
	}
	// Insertion sort: label sets are tiny, and sorting in place keeps this
	// function free of any import the file may not already have.
	for i := 1; i < len(keys); i++ {
		for j := i; j > 0 && keys[j] < keys[j-1]; j-- {
			keys[j], keys[j-1] = keys[j-1], keys[j]
		}
	}
	for _, k := range keys {
		values = append(values, labels[k])
	}
	return &metric{
		name: name,
		properties: &metricProperties{
			desc:         prometheus.NewDesc(name, help, keys, nil),
			typ:          typ,
			labels:       labels,
			labelsKeys:   keys,
			labelsValues: values,
		},
	}
}

func (c *Collector) scrapeTarget() (*ua.ReadResponse, float64, error) {
var opcuaNodeIDs []*ua.ReadValueID
var promMetrics []prometheus.Metric
for _, metric := range c.metricsCache {
for _, metric := range c.opcuaMetricsCache {
opcuaNodeIDs = append(opcuaNodeIDs, metric.nodeReadValueID)
}

Expand All @@ -128,36 +173,17 @@ func (c *Collector) scrapeTarget() (*ua.ReadResponse, []prometheus.Metric, error
resp, err := c.opcuaClient.Read(req)
if err != nil {
c.Logger.Err("read failed: %s", err)
return nil, nil, err
}
promMetrics = append(promMetrics, prometheus.MustNewConstMetric(
prometheus.NewDesc("opcua_client_read_duration_seconds", "Time OPCUA to reconnect took.", nil, nil),
prometheus.GaugeValue,
time.Since(start).Seconds()))

return resp, promMetrics, nil
}

func (c *Collector) opcuaToPrometheusMetrics(opcuaResponse *ua.ReadResponse) []prometheus.Metric {
var opcuaMetrics []prometheus.Metric
for idx, m := range c.metricsCache {
var sample prometheus.Metric
var err error
opcuaResult := opcuaResponse.Results[idx]

switch status := opcuaResult.Status; {
case status != ua.StatusOK:
sample = prometheus.NewInvalidMetric(prometheus.NewDesc("opcua_error", "Error calling NewConstMetric", nil, nil),
fmt.Errorf("error for metric %s with labels %v: %v", m.name, m.labels, opcuaResult.Status))
case status == ua.StatusOK:
if sample, err = prometheus.NewConstMetric(m.promDesc, m.typ, opcuaResult.Value.Float(), m.labelsValues...); err != nil {
sample = prometheus.NewInvalidMetric(prometheus.NewDesc("opcua_error", "Error calling NewConstMetric", nil, nil),
fmt.Errorf("error for metric %s with labels %v: %v", m.name, m.labels, err))
}
}
opcuaMetrics = append(opcuaMetrics, sample)
return nil, -1, err
}
return resp, time.Since(start).Seconds(), nil
}

func (c *Collector) getOpcuaValueFromIndex(opcuaResponse *ua.ReadResponse, idx int) (float64, error) {
r := opcuaResponse.Results[idx]
if r.Status != ua.StatusOK {
return -1, fmt.Errorf("invalid status %v", r.Status)
}
return opcuaMetrics
return r.Value.Float(), nil
}

func getMetricValueType(metricType string) prometheus.ValueType {
Expand Down

0 comments on commit 5488095

Please sign in to comment.