diff --git a/.travis.yml b/.travis.yml index 6091028..066504b 100644 --- a/.travis.yml +++ b/.travis.yml @@ -21,11 +21,11 @@ before_install: - docker run -p 9200:9200 --name elastic -p 9300:9300 -e "discovery.type=single-node" -d docker.elastic.co/elasticsearch/elasticsearch:$ELASTIC_VERSION - docker pull quay.io/interconnectedcloud/qdrouterd:$QDROUTERD_VERSION - docker run -p 5672:5672 -d quay.io/interconnectedcloud/qdrouterd:$QDROUTERD_VERSION - - docker pull centos:7 + - docker pull centos:8 # execute unit testing and code coverage install: - - docker run -eCOVERALLS_TOKEN -uroot --network host -it --volume $PWD:/go/src/github.com/infrawatch/smart-gateway:z --workdir /go/src/github.com/infrawatch/smart-gateway centos:7 /bin/sh -c 'sh ./build/test-framework/run_tests.sh' + - docker run -eCOVERALLS_TOKEN -uroot --network host -it --volume $PWD:/go/src/github.com/infrawatch/smart-gateway:z --workdir /go/src/github.com/infrawatch/smart-gateway centos:8 /bin/sh -c 'sh ./build/test-framework/run_tests.sh' # functional testing (validate it works) script: diff --git a/build/test-framework/run_tests.sh b/build/test-framework/run_tests.sh index 60fbc54..1297053 100755 --- a/build/test-framework/run_tests.sh +++ b/build/test-framework/run_tests.sh @@ -10,10 +10,11 @@ export PATH=$PATH:$GOPATH/bin # get dependencies sed -i '/^tsflags=.*/a ip_resolve=4' /etc/yum.conf yum install -y epel-release -yum install -y https://centos7.iuscommunity.org/ius-release.rpm -yum remove -y git* -yum install -y git216-all -yum install -y golang qpid-proton-c-devel iproute +# below is not available currently +#yum install -y https://centos7.iuscommunity.org/ius-release.rpm +#yum remove -y git* +#yum install -y git216-all +yum install -y git golang qpid-proton-c-devel iproute go get -u golang.org/x/tools/cmd/cover go get -u github.com/mattn/goveralls go get -u golang.org/x/lint/golint diff --git a/internal/pkg/amqp10/receiver.go b/internal/pkg/amqp10/receiver.go index 9661b63..529a70a 100644 --- a/internal/pkg/amqp10/receiver.go +++ b/internal/pkg/amqp10/receiver.go @@ -23,8 +23,13 @@ import ( "fmt" "log" "os" + "os/signal" + "reflect" "strings" + "sync" + "github.com/infrawatch/smart-gateway/internal/pkg/cacheutil" + "github.com/infrawatch/smart-gateway/internal/pkg/saconfig" "github.com/prometheus/client_golang/prometheus" "qpid.apache.org/amqp" "qpid.apache.org/electron" @@ -49,6 +54,12 @@ type AMQPServer struct { collectinterval float64 } +//AMQPServerItem hold information about data source which is AMQPServer listening to. +type AMQPServerItem struct { + Server *AMQPServer + DataSource saconfig.DataSource +} + //AMQPHandler ... type AMQPHandler struct { totalCount int @@ -298,3 +309,87 @@ func fatalIf(err error) { log.Fatal(err) } } + +//SpawnSignalHandler spawns goroutine which will wait for interruption signal(s) +// and end smart gateway in case any of the signal is received +func SpawnSignalHandler(finish chan bool, watchedSignals ...os.Signal) { + interruptChannel := make(chan os.Signal, 1) + signal.Notify(interruptChannel, watchedSignals...) 
+ go func() { + signalLoop: + for sig := range interruptChannel { + log.Printf("Stopping execution on caught signal: %+v\n", sig) + close(finish) + break signalLoop + } + }() +} + +//SpawnQpidStatusReporter builds dynamic select for reporting status of AMQP connections +func SpawnQpidStatusReporter(wg *sync.WaitGroup, applicationHealth *cacheutil.ApplicationHealthCache, qpidStatusCases []reflect.SelectCase) { + wg.Add(1) + go func() { + defer wg.Done() + finishCase := len(qpidStatusCases) - 1 + statusLoop: + for { + switch index, status, _ := reflect.Select(qpidStatusCases); index { + case finishCase: + break statusLoop + default: + // Note: status here is always very low integer, so we don't need to be afraid of int64>int conversion + applicationHealth.QpidRouterState = int(status.Int()) + } + } + log.Println("Closing QPID status reporter") + }() +} + +//CreateMessageLoopComponents creates signal select cases for configured AMQP1.0 connections and connects to all of thos +func CreateMessageLoopComponents(config interface{}, finish chan bool, amqpHandler *AMQPHandler, uniqueName string) ([]reflect.SelectCase, []reflect.SelectCase, []AMQPServerItem) { + var ( + debug bool + prefetch int + connections []saconfig.AMQPConnection + ) + switch conf := config.(type) { + case *saconfig.EventConfiguration: + debug = conf.Debug + prefetch = conf.Prefetch + connections = conf.AMQP1Connections + case *saconfig.MetricConfiguration: + debug = conf.Debug + prefetch = conf.Prefetch + connections = conf.AMQP1Connections + default: + panic("Invalid type of configuration file struct.") + } + + processingCases := make([]reflect.SelectCase, 0, len(connections)) + qpidStatusCases := make([]reflect.SelectCase, 0, len(connections)) + amqpServers := make([]AMQPServerItem, 0, len(connections)) + for _, conn := range connections { + amqpServer := NewAMQPServer(conn.URL, debug, -1, prefetch, amqpHandler, uniqueName) + //create select case for this listener + processingCases = append(processingCases, reflect.SelectCase{ + Dir: reflect.SelectRecv, + Chan: reflect.ValueOf(amqpServer.GetNotifier()), + }) + qpidStatusCases = append(qpidStatusCases, reflect.SelectCase{ + Dir: reflect.SelectRecv, + Chan: reflect.ValueOf(amqpServer.GetStatus()), + }) + amqpServers = append(amqpServers, AMQPServerItem{amqpServer, conn.DataSourceID}) + } + log.Println("Listening for AMQP1.0 messages") + // include also case for finishing the loops + processingCases = append(processingCases, reflect.SelectCase{ + Dir: reflect.SelectRecv, + Chan: reflect.ValueOf(finish), + }) + qpidStatusCases = append(qpidStatusCases, reflect.SelectCase{ + Dir: reflect.SelectRecv, + Chan: reflect.ValueOf(finish), + }) + return processingCases, qpidStatusCases, amqpServers +} diff --git a/internal/pkg/cacheutil/cacheserver.go b/internal/pkg/cacheutil/cacheserver.go index 81ad69c..07e6088 100644 --- a/internal/pkg/cacheutil/cacheserver.go +++ b/internal/pkg/cacheutil/cacheserver.go @@ -6,7 +6,6 @@ import ( "time" "github.com/infrawatch/smart-gateway/internal/pkg/metrics/incoming" - "github.com/infrawatch/smart-gateway/internal/pkg/saconfig" ) // MAXTTL to remove plugin is stale for 5 @@ -122,7 +121,6 @@ func (i IncomingDataCache) GetShard(key string) *ShardedIncomingDataCache { if i.hosts[key] == nil { i.Put(key) } - return i.hosts[key] } @@ -148,20 +146,16 @@ func (shard *ShardedIncomingDataCache) Size() int { } //SetData ... 
-//TODO : add generic -//TODO(mmagr): either don't export or maybe make sure data.Host has the same //value as is saved under in DataCache func (shard *ShardedIncomingDataCache) SetData(data incoming.MetricDataFormat) error { shard.lock.Lock() defer shard.lock.Unlock() if shard.plugin[data.GetItemKey()] == nil { - //TODO: change this to more generic later - shard.plugin[data.GetItemKey()] = incoming.NewFromDataSource(saconfig.DataSourceCollectd) + shard.plugin[data.GetItemKey()] = incoming.NewFromDataSourceName(data.GetDataSourceName()) } shard.lastAccess = time.Now().Unix() - collectd := shard.plugin[data.GetItemKey()] - collectd.SetData(data) - + metric := shard.plugin[data.GetItemKey()] + metric.SetData(data) return nil } @@ -197,11 +191,10 @@ func (cs *CacheServer) Put(incomingData incoming.MetricDataFormat) { case buffer = <-freeList: //go one from buffer default: - buffer = new(IncomingBuffer) + buffer = &IncomingBuffer{} } buffer.data = incomingData cs.ch <- buffer - } func (cs CacheServer) loop() { diff --git a/internal/pkg/cacheutil/processcache.go b/internal/pkg/cacheutil/processcache.go index 9633f8b..13cbeb6 100644 --- a/internal/pkg/cacheutil/processcache.go +++ b/internal/pkg/cacheutil/processcache.go @@ -3,7 +3,6 @@ package cacheutil import ( "log" - "github.com/infrawatch/smart-gateway/internal/pkg/metrics/incoming" "github.com/infrawatch/smart-gateway/internal/pkg/tsdb" "github.com/prometheus/client_golang/prometheus" ) @@ -32,27 +31,25 @@ func (shard *ShardedIncomingDataCache) FlushPrometheusMetric(usetimestamp bool, defer shard.lock.Unlock() minMetricCreated := 0 //..minimum of one metrics created - for _, IncomingDataInterface := range shard.plugin { - if collectd, ok := IncomingDataInterface.(*incoming.CollectdMetric); ok { - if collectd.ISNew() { - collectd.SetNew(false) - for index := range collectd.Values { - m, err := tsdb.NewCollectdMetric(usetimestamp, *collectd, index) - if err != nil { - log.Printf("newMetric: %v", err) - continue - } - ch <- m - minMetricCreated++ - } - } else { - //clean up if data is not access for max TTL specified - if shard.Expired() { - delete(shard.plugin, collectd.GetItemKey()) - //log.Printf("Cleaned up plugin for %s", collectd.GetKey()) + for _, dataInterface := range shard.plugin { + if dataInterface.ISNew() { + dataInterface.SetNew(false) + for index := range dataInterface.GetValues() { + m, err := tsdb.NewPrometheusMetric(usetimestamp, dataInterface.GetDataSourceName(), dataInterface, index) + if err != nil { + log.Printf("newMetric: %v", err) + continue } + ch <- m + minMetricCreated++ + } + } else { + //clean up if data is not access for max TTL specified + if shard.Expired() { + delete(shard.plugin, dataInterface.GetItemKey()) } } + } return minMetricCreated } @@ -62,16 +59,14 @@ func (shard *ShardedIncomingDataCache) FlushAllMetrics() { shard.lock.Lock() defer shard.lock.Unlock() for _, dataInterface := range shard.plugin { - if collectd, ok := dataInterface.(*incoming.CollectdMetric); ok { - if collectd.ISNew() { - collectd.SetNew(false) - log.Printf("New Metrics %#v\n", collectd) - } else { - //clean up if data is not access for max TTL specified - if shard.Expired() { - delete(shard.plugin, collectd.GetItemKey()) - log.Printf("Cleaned up plugin for %s", collectd.GetItemKey()) - } + if dataInterface.ISNew() { + dataInterface.SetNew(false) + log.Printf("New Metrics %#v\n", dataInterface) + } else { + //clean up if data is not access for max TTL specified + if shard.Expired() { + delete(shard.plugin, 
dataInterface.GetItemKey()) + log.Printf("Cleaned up plugin for %s", dataInterface.GetItemKey()) } } } diff --git a/internal/pkg/events/events.go b/internal/pkg/events/events.go index 2b0f501..44a7d3a 100644 --- a/internal/pkg/events/events.go +++ b/internal/pkg/events/events.go @@ -10,7 +10,6 @@ import ( "math/rand" "net/http" "os" - "os/signal" "reflect" "strconv" "sync" @@ -46,12 +45,6 @@ const ( ` ) -//AMQPServerItem hold information about data source which is AMQPServer listening to. -type AMQPServerItem struct { - Server *amqp10.AMQPServer - DataSource saconfig.DataSource -} - /*************** main routine ***********************/ // eventusage and command-line flags func eventusage() { @@ -68,21 +61,6 @@ func eventusage() { var debuge = func(format string, data ...interface{}) {} // Default no debugging output -//spawnSignalHandler spawns goroutine which will wait for interruption signal(s) -// and end smart gateway in case any of the signal is received -func spawnSignalHandler(finish chan bool, watchedSignals ...os.Signal) { - interruptChannel := make(chan os.Signal, 1) - signal.Notify(interruptChannel, watchedSignals...) - go func() { - signalLoop: - for sig := range interruptChannel { - log.Printf("Stopping execution on caught signal: %+v\n", sig) - close(finish) - break signalLoop - } - }() -} - //spawnAPIServer spawns goroutine which provides http API for alerts and metrics statistics for Prometheus func spawnAPIServer(wg *sync.WaitGroup, finish chan bool, serverConfig saconfig.EventConfiguration, metricHandler *api.EventMetricHandler, amqpHandler *amqp10.AMQPHandler) { prometheus.MustRegister(metricHandler, amqpHandler) @@ -122,26 +100,6 @@ func spawnAPIServer(wg *sync.WaitGroup, finish chan bool, serverConfig saconfig. }() } -//spawnQpidStatusReporter builds dynamic select for reporting status of AMQP connections -func spawnQpidStatusReporter(wg *sync.WaitGroup, applicationHealth *cacheutil.ApplicationHealthCache, qpidStatusCases []reflect.SelectCase) { - wg.Add(1) - go func() { - defer wg.Done() - finishCase := len(qpidStatusCases) - 1 - statusLoop: - for { - switch index, status, _ := reflect.Select(qpidStatusCases); index { - case finishCase: - break statusLoop - default: - // Note: status here is always very low integer, so we don't need to be afraid of int64>int conversion - applicationHealth.QpidRouterState = int(status.Int()) - } - } - log.Println("Closing QPID status reporter") - }() -} - //notifyAlertManager generates alert from event for Prometheus Alert Manager func notifyAlertManager(wg *sync.WaitGroup, serverConfig saconfig.EventConfiguration, event *incoming.EventDataFormat, record string) { wg.Add(1) @@ -177,7 +135,7 @@ func StartEvents() { var wg sync.WaitGroup finish := make(chan bool) - spawnSignalHandler(finish, os.Interrupt) + amqp10.SpawnSignalHandler(finish, os.Interrupt) log.SetFlags(log.LstdFlags | log.Lshortfile) // set flags for parsing options @@ -272,33 +230,9 @@ func StartEvents() { } // AMQP connection(s) - processingCases := make([]reflect.SelectCase, 0, len(serverConfig.AMQP1Connections)) - qpidStatusCases := make([]reflect.SelectCase, 0, len(serverConfig.AMQP1Connections)) - amqpServers := make([]AMQPServerItem, 0, len(serverConfig.AMQP1Connections)) - for _, conn := range serverConfig.AMQP1Connections { - amqpServer := amqp10.NewAMQPServer(conn.URL, serverConfig.Debug, -1, serverConfig.Prefetch, amqpHandler, *fUniqueName) - //create select case for this listener - processingCases = append(processingCases, reflect.SelectCase{ - Dir: 
reflect.SelectRecv, - Chan: reflect.ValueOf(amqpServer.GetNotifier()), - }) - qpidStatusCases = append(qpidStatusCases, reflect.SelectCase{ - Dir: reflect.SelectRecv, - Chan: reflect.ValueOf(amqpServer.GetStatus()), - }) - amqpServers = append(amqpServers, AMQPServerItem{amqpServer, conn.DataSourceID}) - } - log.Println("Listening for AMQP1.0 messages") - // include also case for finishing the loops - processingCases = append(processingCases, reflect.SelectCase{ - Dir: reflect.SelectRecv, - Chan: reflect.ValueOf(finish), - }) - qpidStatusCases = append(qpidStatusCases, reflect.SelectCase{ - Dir: reflect.SelectRecv, - Chan: reflect.ValueOf(finish), - }) - spawnQpidStatusReporter(&wg, applicationHealth, qpidStatusCases) + processingCases, qpidStatusCases, amqpServers := amqp10.CreateMessageLoopComponents(serverConfig, finish, amqpHandler, *fUniqueName) + amqp10.SpawnQpidStatusReporter(&wg, applicationHealth, qpidStatusCases) + // spawn event processor wg.Add(1) go func() { diff --git a/internal/pkg/events/incoming/ceilometer.go b/internal/pkg/events/incoming/ceilometer.go index 0b0ac76..e7d3a29 100644 --- a/internal/pkg/events/incoming/ceilometer.go +++ b/internal/pkg/events/incoming/ceilometer.go @@ -152,7 +152,6 @@ func (evt *CeilometerEvent) ParseEvent(data string) error { newPayload[key] = value } } - fmt.Printf("newPayload: %v\n", newPayload) } (*evt).parsed["payload"] = newPayload } diff --git a/internal/pkg/metrics/incoming/ceilometer.go b/internal/pkg/metrics/incoming/ceilometer.go index 628305c..5318888 100644 --- a/internal/pkg/metrics/incoming/ceilometer.go +++ b/internal/pkg/metrics/incoming/ceilometer.go @@ -1,73 +1,228 @@ package incoming import ( - "time" + "fmt" + "log" + "regexp" + "strings" + + jsoniter "github.com/json-iterator/go" +) + +var ( + rexForPayload = regexp.MustCompile(`\"payload\"\s*:\s*\[(.*)\]`) + rexForOsloMessage = regexp.MustCompile(`"oslo.message"\s*:\s*"({.*})"`) + rexForNestedQuote = regexp.MustCompile(`\\\"`) ) +const defaultCeilometerInterval = 5.0 + // CeilometerMetric struct represents metric data formated and sent by Ceilometer type CeilometerMetric struct { - Publisher string - Timestamp time.Time - EventType string - //TODO(mmagr): include payload unmarshalled data here + WithDataSource + Publisher string `json:"publisher_id"` + Payload map[string]interface{} `json:"payload"` + // analogy to collectd metric + Plugin string + PluginInstance string + Type string + TypeInstance string + Values []float64 + new bool + wholeID string } /*************************** MetricDataFormat interface ****************************/ -//GetName ... +func (c *CeilometerMetric) getwholeID() string { + if c.wholeID == "" { + if cnt, ok := c.Payload["counter_name"]; ok { + c.wholeID = cnt.(string) + } else { + log.Printf("Did not find counter_name in metric payload: %v\n", c.Payload) + c.wholeID = "unknown" + } + } + return c.wholeID +} + +//GetName returns name of Ceilometer "plugin" (analogically to CollectdMetric implementation) func (c *CeilometerMetric) GetName() string { - return "" + return c.Plugin } -//SetData ... -func (c *CeilometerMetric) SetData(data MetricDataFormat) {} +//GetValues returns Values. 
The purpose of this method is to be able to get metric Values +//from the interface object itself +func (c *CeilometerMetric) GetValues() []float64 { + return c.Values +} + +//SetData generates naming and value data analogicaly to CollectdMetric from counter data and resource_id +func (c *CeilometerMetric) SetData(data MetricDataFormat) { + // example: counter_name=compute.instance.booting.time, resource_id=456 + // get Plugin -> compute + if ceilo, ok := data.(*CeilometerMetric); ok { + c.Payload = ceilo.Payload + c.Publisher = ceilo.Publisher + + plugParts := strings.Split(ceilo.getwholeID(), ".") + c.Plugin = plugParts[0] + // get PluginInstance -> 456 + if resource, ok := ceilo.Payload["resource_id"]; ok { + c.PluginInstance = resource.(string) + } + // get Type -> instance + if len(plugParts) > 1 { + c.Type = plugParts[1] + } else { + c.Type = plugParts[0] + } + // get TypeInstance -> booting + if len(plugParts) > 2 { + c.TypeInstance = plugParts[2] + } + + values := make([]float64, 0, 1) + if val, ok := ceilo.Payload["counter_volume"]; ok { + values = append(values, val.(float64)) + } else { + log.Printf("Did not find counter_volume in metric payload: %v\n", ceilo.Payload) + } + c.Values = values + c.SetNew(true) + } +} + +//sanitize search and removes all known issues in received data. +//TODO: Move this function to apputils +func (c *CeilometerMetric) sanitize(data string) string { + sanitized := data + // parse only relevant data + sub := rexForOsloMessage.FindStringSubmatch(sanitized) + if len(sub) == 2 { + sanitized = rexForNestedQuote.ReplaceAllString(sub[1], `"`) + } else { + log.Printf("Failed to find oslo.message in given message: %s\n", data) + } + // avoid getting payload data wrapped in array + item := rexForPayload.FindStringSubmatch(sanitized) + if len(item) == 2 { + sanitized = rexForPayload.ReplaceAllString(sanitized, fmt.Sprintf(`"payload":%s`, item[1])) + } + return sanitized +} //ParseInputJSON ... -func (c *CeilometerMetric) ParseInputJSON(json string) ([]MetricDataFormat, error) { - return make([]MetricDataFormat, 0), nil +func (c *CeilometerMetric) ParseInputJSON(data string) ([]MetricDataFormat, error) { + output := make([]MetricDataFormat, 0) + sanitized := c.sanitize(data) + // parse only relevant data + var json = jsoniter.ConfigCompatibleWithStandardLibrary + err := json.Unmarshal([]byte(sanitized), &c) + if err != nil { + return output, fmt.Errorf("error parsing json: %s", err) + } + c.DataSource.SetFromString("ceilometer") + c.SetNew(true) + c.SetData(c) + output = append(output, c) + return output, nil } //GetKey ... -func (c *CeilometerMetric) GetKey() string { - return "" +func (c CeilometerMetric) GetKey() string { + return c.Publisher } -//GetItemKey ... +//GetItemKey returns name cache key analogically to CollectdMetric implementation func (c *CeilometerMetric) GetItemKey() string { - return "" + parts := []string{c.Plugin} + if c.Plugin != c.Type { + parts = append(parts, c.Type) + } + if c.PluginInstance != "" { + parts = append(parts, c.PluginInstance) + } + if c.TypeInstance != "" { + parts = append(parts, c.TypeInstance) + } + return strings.Join(parts, "_") } -//ParseInputByte ... +//ParseInputByte is not really used. It is here just to implement MetricDataFormat +//TODO: Remove this method from here and also CollectdMetric func (c *CeilometerMetric) ParseInputByte(data []byte) error { - return nil + _, err := c.ParseInputJSON(string(data)) + return err } -//GetInterval ... 
+//GetInterval returns hardcoded defaultCeilometerInterval, because Ceilometer metricDesc +//does not contain interval information (are not periodically sent at all) and any reasonable +//interval might be needed for expiry setting for Prometheus +//TODO: Make this configurable func (c *CeilometerMetric) GetInterval() float64 { - return 0.0 + return defaultCeilometerInterval } //SetNew ... -func (c *CeilometerMetric) SetNew(new bool) {} +func (c *CeilometerMetric) SetNew(new bool) { + c.new = new +} //ISNew ... func (c *CeilometerMetric) ISNew() bool { - return true + return c.new } /*************************** tsdb.TSDB interface *****************************/ //GetLabels ... func (c *CeilometerMetric) GetLabels() map[string]string { - return make(map[string]string) + labels := make(map[string]string) + if c.TypeInstance != "" { + labels[c.Plugin] = c.TypeInstance + } else { + labels[c.Plugin] = c.PluginInstance + } + labels["publisher"] = c.Publisher + if ctype, ok := c.Payload["counter_type"]; ok { + labels["type"] = ctype.(string) + } else { + labels["type"] = "base" + } + if cproj, ok := c.Payload["project_id"]; ok { + labels["project"] = cproj.(string) + } + if cres, ok := c.Payload["resource_id"]; ok { + labels["resource"] = cres.(string) + } + if cunit, ok := c.Payload["counter_unit"]; ok { + labels["unit"] = cunit.(string) + } + if cname, ok := c.Payload["counter_name"]; ok { + labels["counter"] = cname.(string) + } + return labels } //GetMetricName ... func (c *CeilometerMetric) GetMetricName(index int) string { - return "" + nameParts := []string{"ceilometer", c.Plugin} + if c.Plugin != c.Type { + nameParts = append(nameParts, c.Type) + } + if c.TypeInstance != "" { + nameParts = append(nameParts, c.TypeInstance) + } + return strings.Join(nameParts, "_") } //GetMetricDesc ... func (c *CeilometerMetric) GetMetricDesc(index int) string { - return "" + dstype := "counter" + if ctype, ok := c.Payload["counter_type"]; ok { + dstype = ctype.(string) + } + return fmt.Sprintf("Service Telemetry exporter: '%s' Type: '%s' Dstype: '%s' Dsname: '%s'", + c.Plugin, c.Type, dstype, c.getwholeID()) } diff --git a/internal/pkg/metrics/incoming/collectd.go b/internal/pkg/metrics/incoming/collectd.go index 9fbea78..b933ebc 100644 --- a/internal/pkg/metrics/incoming/collectd.go +++ b/internal/pkg/metrics/incoming/collectd.go @@ -11,6 +11,7 @@ import ( // CollectdMetric struct represents metric data formated and sent by collectd type CollectdMetric struct { + WithDataSource Values []float64 `json:"values"` Dstypes []string `json:"dstypes"` Dsnames []string `json:"dsnames"` @@ -26,6 +27,12 @@ type CollectdMetric struct { /*************************** MetricDataFormat interface ****************************/ +//GetValues returns Values. The purpose of this method is to be able to get metric Values +//from the interface object itself +func (c CollectdMetric) GetValues() []float64 { + return c.Values +} + // GetName implement interface func (c CollectdMetric) GetName() string { return c.Plugin @@ -118,6 +125,7 @@ func (c *CollectdMetric) ParseInputJSON(jsonString string) ([]MetricDataFormat, } retDtype := make([]MetricDataFormat, len(collect)) for index, rt := range collect { + rt.DataSource.SetFromString("collectd") retDtype[index] = &rt } return retDtype, nil @@ -155,7 +163,7 @@ func (c CollectdMetric) GetLabels() map[string]string { //GetMetricDesc newDesc converts one data source of a value list to a Prometheus description. 
func (c CollectdMetric) GetMetricDesc(index int) string { - help := fmt.Sprintf("Service Assurance exporter: '%s' Type: '%s' Dstype: '%s' Dsname: '%s'", + help := fmt.Sprintf("Service Telemetry exporter: '%s' Type: '%s' Dstype: '%s' Dsname: '%s'", c.Plugin, c.Type, c.Dstypes[index], c.DSName(index)) return help } diff --git a/internal/pkg/metrics/incoming/formats.go b/internal/pkg/metrics/incoming/formats.go index e8524cd..2dad839 100644 --- a/internal/pkg/metrics/incoming/formats.go +++ b/internal/pkg/metrics/incoming/formats.go @@ -15,6 +15,18 @@ type MetricDataFormat interface { GetInterval() float64 SetNew(new bool) ISNew() bool + GetValues() []float64 + GetDataSourceName() string +} + +//WithDataSource is composition struct for adding DataSource parameter +type WithDataSource struct { + DataSource saconfig.DataSource +} + +//GetDataSourceName returns string representation of DataSource +func (ds WithDataSource) GetDataSourceName() string { + return ds.DataSource.String() } //NewFromDataSource creates empty DataType according to given DataSource @@ -28,13 +40,28 @@ func NewFromDataSource(source saconfig.DataSource) MetricDataFormat { return nil } +//NewFromDataSourceName creates empty DataType according to given name of DataSource +func NewFromDataSourceName(source string) MetricDataFormat { + switch source { + case saconfig.DataSourceCollectd.String(): + return newCollectdMetric( /*...*/ ) + case saconfig.DataSourceCeilometer.String(): + return newCeilometerMetric() + } + return nil +} + //newCollectd -- avoid calling this . Use factory method in incoming package func newCollectdMetric() *CollectdMetric { - return new(CollectdMetric) + metric := new(CollectdMetric) + metric.DataSource = saconfig.DataSourceCollectd + return metric } func newCeilometerMetric() *CeilometerMetric { - return new(CeilometerMetric) + metric := new(CeilometerMetric) + metric.DataSource = saconfig.DataSourceCeilometer + return metric } //ParseByte parse incoming data diff --git a/internal/pkg/metrics/metrics.go b/internal/pkg/metrics/metrics.go index f21c7b0..3c6f8c3 100644 --- a/internal/pkg/metrics/metrics.go +++ b/internal/pkg/metrics/metrics.go @@ -8,8 +8,9 @@ import ( "net/http" "net/http/pprof" "os" - "os/signal" + "reflect" "strconv" + "sync" "time" "github.com/MakeNowJust/heredoc" @@ -22,6 +23,17 @@ import ( "github.com/prometheus/client_golang/prometheus/promhttp" ) +//MetricHandlerHTML contains HTML for default endpoint +const MetricHandlerHTML = ` + + Collectd Exporter + +

+		<h1>Collectd Exporter</h1>
+		<p><a href='/metrics'>Metrics</a></p>
+ + +` + var ( debugm = func(format string, data ...interface{}) {} // Default no debugging output debugs = func(count int) {} // Default no debugging output @@ -50,7 +62,6 @@ func (c *cacheHandler) Collect(ch chan<- prometheus.Metric) { defer lock.Unlock() debugm("Debug:Prometheus is requesting to scrape metrics...") for key, plugin := range allHosts { - //fmt.Fprintln(w, hostname) debugm("Debug:Getting metrics for host %s with total plugin size %d\n", key, plugin.Size()) metricCount = plugin.FlushPrometheusMetric(c.useTimestamp, ch) if metricCount > 0 { @@ -87,6 +98,11 @@ func metricusage() { //StartMetrics ... entry point to metrics func StartMetrics() { + var wg sync.WaitGroup + finish := make(chan bool) + + amqp10.SpawnSignalHandler(finish, os.Interrupt) + // set flags for parsing options flag.Usage = metricusage fServiceType := flag.String("servicetype", "metrics", "Metric type") @@ -110,33 +126,33 @@ func StartMetrics() { debugm = func(format string, data ...interface{}) { log.Printf(format, data...) } } - if len(serverConfig.AMQP1MetricURL) == 0 { - log.Println("AMQP1 Metrics URL is required") + if len(serverConfig.AMQP1MetricURL) == 0 && len(serverConfig.AMQP1Connections) == 0 { + log.Println("Configuration option 'AMQP1MetricURL' or 'AMQP1Connections' is required") metricusage() os.Exit(1) } - //Block -starts - //Set up signal Go routine - signalCh := make(chan os.Signal, 1) - signal.Notify(signalCh, os.Interrupt) - go func() { - for sig := range signalCh { - // sig is a ^C, handle it - log.Printf("caught sig: %+v", sig) - log.Println("Wait for 2 second to finish processing") - time.Sleep(2 * time.Second) - os.Exit(0) + if len(serverConfig.AMQP1MetricURL) > 0 { + serverConfig.AMQP1Connections = []saconfig.AMQPConnection{ + saconfig.AMQPConnection{ + URL: serverConfig.AMQP1MetricURL, + DataSourceID: saconfig.DataSourceCollectd, + DataSource: "collectd", + }, } - }() + } + + for _, conn := range serverConfig.AMQP1Connections { + log.Printf("AMQP1.0 %s listen address configured at %s\n", conn.DataSource, conn.URL) + } - //Cache sever to process and serve the exporter - cacheServer := cacheutil.NewCacheServer(cacheutil.MAXTTL, serverConfig.Debug) applicationHealth := cacheutil.NewApplicationHealthCache() - appStateHandler := api.NewAppStateMetricHandler(applicationHealth) - myHandler := &cacheHandler{useTimestamp: serverConfig.UseTimeStamp, cache: cacheServer.GetCache(), appstate: appStateHandler} + metricHandler := api.NewAppStateMetricHandler(applicationHealth) amqpHandler := amqp10.NewAMQPHandler("Metric Consumer") - prometheus.MustRegister(myHandler, amqpHandler) + //Cache sever to process and serve the exporter + cacheServer := cacheutil.NewCacheServer(cacheutil.MAXTTL, serverConfig.Debug) + cacheHandler := &cacheHandler{useTimestamp: serverConfig.UseTimeStamp, cache: cacheServer.GetCache(), appstate: metricHandler} + prometheus.MustRegister(cacheHandler, amqpHandler) if !serverConfig.CPUStats { // Including these stats kills performance when Prometheus polls with multiple targets @@ -147,13 +163,7 @@ func StartMetrics() { handler := http.NewServeMux() handler.Handle("/metrics", promhttp.Handler()) handler.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { - w.Write([]byte(` - Collectd Exporter - cacheutil -

-			<h1>Collectd Exporter</h1>
-			<p><a href='/metrics'>Metrics</a></p>
- - `)) + w.Write([]byte(MetricHandlerHTML)) }) // Register pprof handlers handler.HandleFunc("/debug/pprof/", pprof.Index) @@ -163,7 +173,7 @@ func StartMetrics() { handler.HandleFunc("/debug/pprof/trace", pprof.Trace) debugm("Debug: Config %#v\n", serverConfig) - //run exporter fro prometheus to scrape + //run exporter for prometheus to scrape go func() { metricsURL := fmt.Sprintf("%s:%d", serverConfig.Exporterhost, serverConfig.Exporterport) log.Printf("Metric server at : %s\n", metricsURL) @@ -172,31 +182,36 @@ func StartMetrics() { time.Sleep(2 * time.Second) log.Println("HTTP server is ready....") - ///Metric Listener - amqpMetricsurl := fmt.Sprintf("amqp://%s", serverConfig.AMQP1MetricURL) - log.Printf("Connecting to AMQP1 : %s\n", amqpMetricsurl) - amqpMetricServer := amqp10.NewAMQPServer(amqpMetricsurl, serverConfig.Debug, serverConfig.DataCount, serverConfig.Prefetch, amqpHandler, *fUniqueName) - log.Printf("Listening.....\n") - -msgloop: - for { - select { - case data := <-amqpMetricServer.GetNotifier(): - amqpMetricServer.GetHandler().IncTotalMsgProcessed() - debugm("Debug: Getting incoming data from notifier channel : %#v\n", data) - incomingType := incoming.NewFromDataSource(saconfig.DataSourceCollectd) - metrics, _ := incomingType.ParseInputJSON(data) - for _, m := range metrics { - amqpMetricServer.UpdateMinCollectInterval(m.GetInterval()) - cacheServer.Put(m) + // AMQP connection(s) + processingCases, qpidStatusCases, amqpServers := amqp10.CreateMessageLoopComponents(serverConfig, finish, amqpHandler, *fUniqueName) + amqp10.SpawnQpidStatusReporter(&wg, applicationHealth, qpidStatusCases) + + // spawn metric processor + wg.Add(1) + go func() { + defer wg.Done() + finishCase := len(processingCases) - 1 + processingLoop: + for { + switch index, msg, _ := reflect.Select(processingCases); index { + case finishCase: + break processingLoop + default: + debugm("Debug: Getting incoming data from notifier channel : %#v\n", msg) + metric := incoming.NewFromDataSource(amqpServers[index].DataSource) + amqpServers[index].Server.GetHandler().IncTotalMsgProcessed() + metrics, _ := metric.ParseInputJSON(msg.String()) + for _, m := range metrics { + amqpServers[index].Server.UpdateMinCollectInterval(m.GetInterval()) + cacheServer.Put(m) + } + debugs(len(metrics)) } - debugs(len(metrics)) - continue // priority channel - case status := <-amqpMetricServer.GetStatus(): - applicationHealth.QpidRouterState = status - case <-amqpMetricServer.GetDoneChan(): - break msgloop } - } - //TODO: to close cache server on keyboard interrupt + log.Println("Closing event processor.") + }() + + // do not end until all loop goroutines ends + wg.Wait() + log.Println("Exiting") } diff --git a/internal/pkg/saconfig/config.go b/internal/pkg/saconfig/config.go index 3f025ae..45378b8 100644 --- a/internal/pkg/saconfig/config.go +++ b/internal/pkg/saconfig/config.go @@ -22,8 +22,8 @@ const ( ) //String returns human readable data type identification. -func (src *DataSource) String() string { - return []string{"universal", "collectd", "ceilometer"}[*src] +func (src DataSource) String() string { + return []string{"universal", "collectd", "ceilometer"}[src] } //SetFromString resets value according to given human readable identification. Returns false if invalid identification was given. 
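// --- Reviewer note (not part of the patch): the hunks above replace the per-server
// select loops with a dynamic select built from a []reflect.SelectCase. The sketch
// below is a minimal, self-contained illustration of that pattern under assumed,
// illustrative names (none of them come from this repository); it mirrors how
// CreateMessageLoopComponents appends the shared finish channel as the last case
// and how the processing loops break once that case fires.

package main

import (
	"fmt"
	"reflect"
)

func main() {
	// Two data channels stand in for the per-connection notifier channels.
	a := make(chan string)
	b := make(chan string)
	finish := make(chan bool)

	// One receive case per channel; the finish channel is appended last,
	// so its index is always len(cases)-1.
	cases := []reflect.SelectCase{
		{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(a)},
		{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(b)},
		{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(finish)},
	}
	finishCase := len(cases) - 1

	go func() {
		a <- "metric from connection a"
		b <- "metric from connection b"
		close(finish) // a closed channel is always ready to receive, which ends the loop
	}()

processingLoop:
	for {
		switch index, msg, _ := reflect.Select(cases); index {
		case finishCase:
			break processingLoop
		default:
			fmt.Printf("received %q on case %d\n", msg.String(), index)
		}
	}
	fmt.Println("done")
}
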
diff --git a/internal/pkg/tsdb/prometheus.go b/internal/pkg/tsdb/prometheus.go index 084a242..1ec86cf 100644 --- a/internal/pkg/tsdb/prometheus.go +++ b/internal/pkg/tsdb/prometheus.go @@ -3,11 +3,19 @@ package tsdb import ( "fmt" "regexp" + "time" "github.com/infrawatch/smart-gateway/internal/pkg/metrics/incoming" + "github.com/infrawatch/smart-gateway/internal/pkg/saconfig" "github.com/prometheus/client_golang/prometheus" ) +// Additional timestamp formats possibly used by sources of metric data +const ( + isoTimeLayout = "2006-01-02 15:04:05.000000" + RFC3339Python = "2006-01-02T15:04:05.000000" +) + //TSDB interface type TSDB interface { //prometheus specific reflect @@ -44,34 +52,74 @@ func AddMetricsByHost(instance string, value float64) (prometheus.Metric, error) return prometheus.NewConstMetric(desc, valueType, value) } -//NewCollectdMetric converts one data source of a value list to a Prometheus metric. -func NewCollectdMetric(usetimestamp bool, collectd incoming.CollectdMetric, index int) (prometheus.Metric, error) { - var value float64 - var valueType prometheus.ValueType +//NewPrometheusMetric converts one data source of a value list to a Prometheus metric. +func NewPrometheusMetric(usetimestamp bool, format string, metric incoming.MetricDataFormat, index int) (prometheus.Metric, error) { + var ( + timestamp time.Time + valueType prometheus.ValueType + metricName, help string + labels map[string]string + value float64 + ) - switch collectd.Dstypes[index] { - case "gauge": - value = float64(collectd.Values[index]) - valueType = prometheus.GaugeValue - case "derive", "counter": - value = float64(collectd.Values[index]) - valueType = prometheus.CounterValue - default: - return nil, fmt.Errorf("unknown name of value type: %s", collectd.Dstypes[index]) + if format == saconfig.DataSourceCollectd.String() { + collectd := metric.(*incoming.CollectdMetric) + switch collectd.Dstypes[index] { + case "gauge": + valueType = prometheus.GaugeValue + case "derive", "counter": + valueType = prometheus.CounterValue + default: + return nil, fmt.Errorf("unknown name of value type: %s", collectd.Dstypes[index]) + } + timestamp = collectd.Time.Time() + help = collectd.GetMetricDesc(index) + metricName = metricNameRe.ReplaceAllString(collectd.GetMetricName(index), "_") + labels = collectd.GetLabels() + value = collectd.Values[index] + } else if format == saconfig.DataSourceCeilometer.String() { + ceilometer := metric.(*incoming.CeilometerMetric) + if ctype, ok := ceilometer.Payload["counter_type"]; ok { + if counterType, ok := ctype.(string); ok { + switch counterType { + case "gauge": + valueType = prometheus.GaugeValue + default: + valueType = prometheus.CounterValue + } + } else { + return nil, fmt.Errorf("invalid counter_type in metric payload: %s", ceilometer.Payload) + } + } else { + return nil, fmt.Errorf("did not find counter_type in metric payload: %s", ceilometer.Payload) + } + if ts, ok := ceilometer.Payload["timestamp"]; ok { + for _, layout := range []string{time.RFC3339, time.RFC3339Nano, time.ANSIC, RFC3339Python, isoTimeLayout} { + if stamp, err := time.Parse(layout, ts.(string)); err == nil { + timestamp = stamp + break + } + } + if timestamp.IsZero() { + return nil, fmt.Errorf("invalid timestamp in metric payload: %s", ceilometer.Payload) + } + } else { + return nil, fmt.Errorf("did not find timestamp in metric payload: %s", ceilometer.Payload) + } + help = ceilometer.GetMetricDesc(index) + metricName = metricNameRe.ReplaceAllString(ceilometer.GetMetricName(index), "_") + labels = 
ceilometer.GetLabels() + value = ceilometer.Values[index] } - labels := collectd.GetLabels() + plabels := prometheus.Labels{} for key, value := range labels { plabels[key] = value } - - help := fmt.Sprintf("Service Assurance exporter: '%s' Type: '%s' Dstype: '%s' Dsname: '%s'", - collectd.Plugin, collectd.Type, collectd.Dstypes[index], collectd.DSName(index)) - metricName := metricNameRe.ReplaceAllString(collectd.GetMetricName(index), "_") desc := prometheus.NewDesc(metricName, help, []string{}, plabels) if usetimestamp { return prometheus.NewMetricWithTimestamp( - collectd.Time.Time(), + timestamp, prometheus.MustNewConstMetric(desc, valueType, value), ), nil } diff --git a/tests/internal_pkg/incoming_test.go b/tests/internal_pkg/metrics_incoming_test.go similarity index 58% rename from tests/internal_pkg/incoming_test.go rename to tests/internal_pkg/metrics_incoming_test.go index c392a56..123fba8 100644 --- a/tests/internal_pkg/incoming_test.go +++ b/tests/internal_pkg/metrics_incoming_test.go @@ -19,6 +19,10 @@ type IncommingCollecdDataMatrix struct { Expected string } +const ( + ceilometerSampleMetricData = `{"request": {"oslo.version": "2.0", "oslo.message": "{\"message_id\": \"499e0dda-9298-4b03-a49c-d7affcedb6b9\", \"publisher_id\": \"telemetry.publisher.controller-0.redhat.local\", \"event_type\": \"metering\", \"priority\": \"SAMPLE\", \"payload\": [{\"source\": \"openstack\", \"counter_name\": \"disk.device.read.bytes\", \"counter_type\": \"cumulative\", \"counter_unit\": \"B\", \"counter_volume\": 18872832, \"user_id\": \"5df14d3577ff4c61b0837c268a8f4c70\", \"project_id\": \"5dfb98560ce74cf780c21fb18a5ad1de\", \"resource_id\": \"285778e1-c81b-427a-826a-ebb72467b665-vda\", \"timestamp\": \"2020-04-15T13:24:02.108816\", \"resource_metadata\": {\"display_name\": \"cirros\", \"name\": \"instance-00000001\", \"instance_id\": \"285778e1-c81b-427a-826a-ebb72467b665\", \"instance_type\": \"tiny\", \"host\": \"072f98fd91b8eec8d518aa8632f013438b587cee415dc944b39c5363\", \"instance_host\": \"compute-0.redhat.local\", \"flavor\": {\"id\": \"53e3164c-3dc0-4bd3-bb22-36a55aadd3fb\", \"name\": \"tiny\", \"vcpus\": 1, \"ram\": 256, \"disk\": 0, \"ephemeral\": 0, \"swap\": 0}, \"status\": \"active\", \"state\": \"running\", \"task_state\": \"\", \"image\": {\"id\": \"64f5d56e-e61d-43c1-af03-45b1faa89e99\"}, \"image_ref\": \"64f5d56e-e61d-43c1-af03-45b1faa89e99\", \"image_ref_url\": null, \"architecture\": \"x86_64\", \"os_type\": \"hvm\", \"vcpus\": 1, \"memory_mb\": 256, \"disk_gb\": 0, \"ephemeral_gb\": 0, \"root_gb\": 0, \"disk_name\": \"vda\"}, \"message_id\": \"5f312a0e-7f1c-11ea-a7a1-525400023f45\", \"monotonic_time\": null, \"message_signature\": \"8a47fa24471558f0af6963064e7ca1409237032c6c72a505f0acd51752f8f828\"}], \"timestamp\": \"2020-04-15 13:24:02.114844\"}"}}` +) + /*----------------------------- helper functions -----------------------------*/ //GenerateSampleCollectdData ... 
func GenerateSampleCollectdData(hostname string, pluginname string) *incoming.CollectdMetric { @@ -119,7 +123,7 @@ func TestCollectdIncoming(t *testing.T) { assert.Contains(t, labels, sample.Plugin) assert.Contains(t, labels, "instance") // test GetMetricDesc behaviour - metricDesc := "Service Assurance exporter: 'pluginname' Type: 'collectd' Dstype: 'gauge' Dsname: 'value1'" + metricDesc := "Service Telemetry exporter: 'pluginname' Type: 'collectd' Dstype: 'gauge' Dsname: 'value1'" assert.Equal(t, metricDesc, sample.GetMetricDesc(0)) // test GetMetricName behaviour metricName := "collectd_pluginname_collectd_value1" @@ -134,3 +138,43 @@ func TestCollectdIncoming(t *testing.T) { assert.Equal(t, metricName2, sample.GetMetricName(1)) }) } + +func TestCeilometerIncoming(t *testing.T) { + cm := incoming.NewFromDataSource(saconfig.DataSourceCeilometer) + metric := cm.(*incoming.CeilometerMetric) + + t.Run("Test parsing of Ceilometer message", func(t *testing.T) { + _, err := metric.ParseInputJSON(ceilometerSampleMetricData) + if err != nil { + t.Errorf("Ceilometer message parsing failed: %s\n", err) + } + // parameters + assert.Equal(t, "telemetry.publisher.controller-0.redhat.local", metric.Publisher) + assert.Equal(t, "disk", metric.Plugin) + assert.Equal(t, "285778e1-c81b-427a-826a-ebb72467b665-vda", metric.PluginInstance) + assert.Equal(t, "device", metric.Type) + assert.Equal(t, "read", metric.TypeInstance) + assert.Equal(t, []float64{float64(18872832)}, metric.Values) + assert.Equal(t, saconfig.DataSourceCeilometer, metric.DataSource) + // methods + assert.Equal(t, 5.0, metric.GetInterval()) + assert.Equal(t, "disk_device_285778e1-c81b-427a-826a-ebb72467b665-vda_read", metric.GetItemKey()) + assert.Equal(t, "telemetry.publisher.controller-0.redhat.local", metric.GetKey()) + expectedLabels := map[string]string{ + "disk": "read", + "publisher": "telemetry.publisher.controller-0.redhat.local", + "type": "cumulative", + "project": "5dfb98560ce74cf780c21fb18a5ad1de", + "resource": "285778e1-c81b-427a-826a-ebb72467b665-vda", + "unit": "B", + "counter": "disk.device.read.bytes", + } + assert.Equal(t, expectedLabels, metric.GetLabels()) + assert.Equal(t, "Service Telemetry exporter: 'disk' Type: 'device' Dstype: 'cumulative' Dsname: 'disk.device.read.bytes'", metric.GetMetricDesc(0)) + assert.Equal(t, "ceilometer_disk_device_read", metric.GetMetricName(0)) + assert.Equal(t, "disk", metric.GetName()) + assert.Equal(t, []float64{float64(18872832)}, metric.GetValues()) + assert.Equal(t, true, metric.ISNew()) + }) + +} diff --git a/tests/internal_pkg/tsdb_test.go b/tests/internal_pkg/tsdb_test.go index 484b2bc..f32bfed 100644 --- a/tests/internal_pkg/tsdb_test.go +++ b/tests/internal_pkg/tsdb_test.go @@ -16,7 +16,7 @@ import ( //GenerateSampleCacheData .... func GenerateCollectdMetric(hostname string, pluginname string, useTimestamp bool, index int) (*incoming.CollectdMetric, prometheus.Metric, dto.Metric) { sample := GenerateSampleCollectdData(hostname, pluginname) - collectdMetric, _ := tsdb.NewCollectdMetric(useTimestamp, *sample, index) + collectdMetric, _ := tsdb.NewPrometheusMetric(useTimestamp, "collectd", sample, index) metric := dto.Metric{} collectdMetric.Write(&metric) return sample, collectdMetric, metric
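
// --- Reviewer note (not part of the patch): a hypothetical end-to-end usage sketch of
// the generic metric path introduced by this change. The payload placeholder below must
// be replaced with a full Ceilometer notification (for example the
// ceilometerSampleMetricData constant from tests/internal_pkg/metrics_incoming_test.go);
// with the placeholder string the parse step simply reports an error and exits.

package main

import (
	"fmt"
	"log"

	"github.com/infrawatch/smart-gateway/internal/pkg/metrics/incoming"
	"github.com/infrawatch/smart-gateway/internal/pkg/saconfig"
	"github.com/infrawatch/smart-gateway/internal/pkg/tsdb"
)

func main() {
	raw := `{"request": {"oslo.message": "..."}}` // substitute a real Ceilometer notification here

	// The factory picks the concrete type (CeilometerMetric) from the data source.
	handler := incoming.NewFromDataSource(saconfig.DataSourceCeilometer)
	metrics, err := handler.ParseInputJSON(raw)
	if err != nil {
		log.Fatalf("parse failed: %s", err)
	}

	// Each parsed metric goes through the data-source-agnostic Prometheus constructor.
	for _, m := range metrics {
		for index := range m.GetValues() {
			promMetric, err := tsdb.NewPrometheusMetric(false, m.GetDataSourceName(), m, index)
			if err != nil {
				log.Printf("newMetric: %v", err)
				continue
			}
			fmt.Println(promMetric.Desc())
		}
	}
}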