Skip to content
This repository has been archived by the owner on May 4, 2022. It is now read-only.

Commit

Permalink
Ceilometer metrics support (#86)
Browse files Browse the repository at this point in the history
* Ceilometer metrics support

This patch add ability to listen for metric data messages from Ceilometer
and prepare them for PRometheus scraping.

* Fix lint issues

* Complete metrics main loop

- added missing support for AMQP1Connections to main loop for metrics
- unify shared logic with main loop for events

* Fix lint issues

* Simplify FlushPrometheusMetric

* Fix data propagation to Prometheus

* Ensure DataSource is set for each metric

* Handle more types of timestamps

* DIsable IUS for now

* Add missing end of block

* Fix lint issue

* Switch CI to CentOS8

* Install git

* Final fixes

Co-authored-by: Martin Magr <para@osp-cloudops-03.ext.cloudops.lab.eng.rdu2.redhat.com>
  • Loading branch information
paramite and Martin Magr authored Jul 10, 2020
1 parent 53c1f1d commit c27d3e0
Show file tree
Hide file tree
Showing 15 changed files with 535 additions and 221 deletions.
4 changes: 2 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@ before_install:
- docker run -p 9200:9200 --name elastic -p 9300:9300 -e "discovery.type=single-node" -d docker.elastic.co/elasticsearch/elasticsearch:$ELASTIC_VERSION
- docker pull quay.io/interconnectedcloud/qdrouterd:$QDROUTERD_VERSION
- docker run -p 5672:5672 -d quay.io/interconnectedcloud/qdrouterd:$QDROUTERD_VERSION
- docker pull centos:7
- docker pull centos:8

# execute unit testing and code coverage
install:
- docker run -eCOVERALLS_TOKEN -uroot --network host -it --volume $PWD:/go/src/github.com/infrawatch/smart-gateway:z --workdir /go/src/github.com/infrawatch/smart-gateway centos:7 /bin/sh -c 'sh ./build/test-framework/run_tests.sh'
- docker run -eCOVERALLS_TOKEN -uroot --network host -it --volume $PWD:/go/src/github.com/infrawatch/smart-gateway:z --workdir /go/src/github.com/infrawatch/smart-gateway centos:8 /bin/sh -c 'sh ./build/test-framework/run_tests.sh'

# functional testing (validate it works)
script:
Expand Down
9 changes: 5 additions & 4 deletions build/test-framework/run_tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,11 @@ export PATH=$PATH:$GOPATH/bin
# get dependencies
sed -i '/^tsflags=.*/a ip_resolve=4' /etc/yum.conf
yum install -y epel-release
yum install -y https://centos7.iuscommunity.org/ius-release.rpm
yum remove -y git*
yum install -y git216-all
yum install -y golang qpid-proton-c-devel iproute
# below is not available currently
#yum install -y https://centos7.iuscommunity.org/ius-release.rpm
#yum remove -y git*
#yum install -y git216-all
yum install -y git golang qpid-proton-c-devel iproute
go get -u golang.org/x/tools/cmd/cover
go get -u github.com/mattn/goveralls
go get -u golang.org/x/lint/golint
Expand Down
95 changes: 95 additions & 0 deletions internal/pkg/amqp10/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,13 @@ import (
"fmt"
"log"
"os"
"os/signal"
"reflect"
"strings"
"sync"

"github.com/infrawatch/smart-gateway/internal/pkg/cacheutil"
"github.com/infrawatch/smart-gateway/internal/pkg/saconfig"
"github.com/prometheus/client_golang/prometheus"
"qpid.apache.org/amqp"
"qpid.apache.org/electron"
Expand All @@ -49,6 +54,12 @@ type AMQPServer struct {
collectinterval float64
}

//AMQPServerItem hold information about data source which is AMQPServer listening to.
type AMQPServerItem struct {
Server *AMQPServer
DataSource saconfig.DataSource
}

//AMQPHandler ...
type AMQPHandler struct {
totalCount int
Expand Down Expand Up @@ -298,3 +309,87 @@ func fatalIf(err error) {
log.Fatal(err)
}
}

//SpawnSignalHandler spawns goroutine which will wait for interruption signal(s)
// and end smart gateway in case any of the signal is received
func SpawnSignalHandler(finish chan bool, watchedSignals ...os.Signal) {
interruptChannel := make(chan os.Signal, 1)
signal.Notify(interruptChannel, watchedSignals...)
go func() {
signalLoop:
for sig := range interruptChannel {
log.Printf("Stopping execution on caught signal: %+v\n", sig)
close(finish)
break signalLoop
}
}()
}

//SpawnQpidStatusReporter builds dynamic select for reporting status of AMQP connections
func SpawnQpidStatusReporter(wg *sync.WaitGroup, applicationHealth *cacheutil.ApplicationHealthCache, qpidStatusCases []reflect.SelectCase) {
wg.Add(1)
go func() {
defer wg.Done()
finishCase := len(qpidStatusCases) - 1
statusLoop:
for {
switch index, status, _ := reflect.Select(qpidStatusCases); index {
case finishCase:
break statusLoop
default:
// Note: status here is always very low integer, so we don't need to be afraid of int64>int conversion
applicationHealth.QpidRouterState = int(status.Int())
}
}
log.Println("Closing QPID status reporter")
}()
}

//CreateMessageLoopComponents creates signal select cases for configured AMQP1.0 connections and connects to all of thos
func CreateMessageLoopComponents(config interface{}, finish chan bool, amqpHandler *AMQPHandler, uniqueName string) ([]reflect.SelectCase, []reflect.SelectCase, []AMQPServerItem) {
var (
debug bool
prefetch int
connections []saconfig.AMQPConnection
)
switch conf := config.(type) {
case *saconfig.EventConfiguration:
debug = conf.Debug
prefetch = conf.Prefetch
connections = conf.AMQP1Connections
case *saconfig.MetricConfiguration:
debug = conf.Debug
prefetch = conf.Prefetch
connections = conf.AMQP1Connections
default:
panic("Invalid type of configuration file struct.")
}

processingCases := make([]reflect.SelectCase, 0, len(connections))
qpidStatusCases := make([]reflect.SelectCase, 0, len(connections))
amqpServers := make([]AMQPServerItem, 0, len(connections))
for _, conn := range connections {
amqpServer := NewAMQPServer(conn.URL, debug, -1, prefetch, amqpHandler, uniqueName)
//create select case for this listener
processingCases = append(processingCases, reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(amqpServer.GetNotifier()),
})
qpidStatusCases = append(qpidStatusCases, reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(amqpServer.GetStatus()),
})
amqpServers = append(amqpServers, AMQPServerItem{amqpServer, conn.DataSourceID})
}
log.Println("Listening for AMQP1.0 messages")
// include also case for finishing the loops
processingCases = append(processingCases, reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(finish),
})
qpidStatusCases = append(qpidStatusCases, reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(finish),
})
return processingCases, qpidStatusCases, amqpServers
}
15 changes: 4 additions & 11 deletions internal/pkg/cacheutil/cacheserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"time"

"github.com/infrawatch/smart-gateway/internal/pkg/metrics/incoming"
"github.com/infrawatch/smart-gateway/internal/pkg/saconfig"
)

// MAXTTL to remove plugin is stale for 5
Expand Down Expand Up @@ -122,7 +121,6 @@ func (i IncomingDataCache) GetShard(key string) *ShardedIncomingDataCache {
if i.hosts[key] == nil {
i.Put(key)
}

return i.hosts[key]
}

Expand All @@ -148,20 +146,16 @@ func (shard *ShardedIncomingDataCache) Size() int {
}

//SetData ...
//TODO : add generic
//TODO(mmagr): either don't export or maybe make sure data.Host has the same
//value as is saved under in DataCache
func (shard *ShardedIncomingDataCache) SetData(data incoming.MetricDataFormat) error {
shard.lock.Lock()
defer shard.lock.Unlock()
if shard.plugin[data.GetItemKey()] == nil {
//TODO: change this to more generic later
shard.plugin[data.GetItemKey()] = incoming.NewFromDataSource(saconfig.DataSourceCollectd)
shard.plugin[data.GetItemKey()] = incoming.NewFromDataSourceName(data.GetDataSourceName())
}
shard.lastAccess = time.Now().Unix()
collectd := shard.plugin[data.GetItemKey()]
collectd.SetData(data)

metric := shard.plugin[data.GetItemKey()]
metric.SetData(data)
return nil
}

Expand Down Expand Up @@ -197,11 +191,10 @@ func (cs *CacheServer) Put(incomingData incoming.MetricDataFormat) {
case buffer = <-freeList:
//go one from buffer
default:
buffer = new(IncomingBuffer)
buffer = &IncomingBuffer{}
}
buffer.data = incomingData
cs.ch <- buffer

}

func (cs CacheServer) loop() {
Expand Down
53 changes: 24 additions & 29 deletions internal/pkg/cacheutil/processcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package cacheutil
import (
"log"

"github.com/infrawatch/smart-gateway/internal/pkg/metrics/incoming"
"github.com/infrawatch/smart-gateway/internal/pkg/tsdb"
"github.com/prometheus/client_golang/prometheus"
)
Expand Down Expand Up @@ -32,27 +31,25 @@ func (shard *ShardedIncomingDataCache) FlushPrometheusMetric(usetimestamp bool,
defer shard.lock.Unlock()
minMetricCreated := 0 //..minimum of one metrics created

for _, IncomingDataInterface := range shard.plugin {
if collectd, ok := IncomingDataInterface.(*incoming.CollectdMetric); ok {
if collectd.ISNew() {
collectd.SetNew(false)
for index := range collectd.Values {
m, err := tsdb.NewCollectdMetric(usetimestamp, *collectd, index)
if err != nil {
log.Printf("newMetric: %v", err)
continue
}
ch <- m
minMetricCreated++
}
} else {
//clean up if data is not access for max TTL specified
if shard.Expired() {
delete(shard.plugin, collectd.GetItemKey())
//log.Printf("Cleaned up plugin for %s", collectd.GetKey())
for _, dataInterface := range shard.plugin {
if dataInterface.ISNew() {
dataInterface.SetNew(false)
for index := range dataInterface.GetValues() {
m, err := tsdb.NewPrometheusMetric(usetimestamp, dataInterface.GetDataSourceName(), dataInterface, index)
if err != nil {
log.Printf("newMetric: %v", err)
continue
}
ch <- m
minMetricCreated++
}
} else {
//clean up if data is not access for max TTL specified
if shard.Expired() {
delete(shard.plugin, dataInterface.GetItemKey())
}
}

}
return minMetricCreated
}
Expand All @@ -62,16 +59,14 @@ func (shard *ShardedIncomingDataCache) FlushAllMetrics() {
shard.lock.Lock()
defer shard.lock.Unlock()
for _, dataInterface := range shard.plugin {
if collectd, ok := dataInterface.(*incoming.CollectdMetric); ok {
if collectd.ISNew() {
collectd.SetNew(false)
log.Printf("New Metrics %#v\n", collectd)
} else {
//clean up if data is not access for max TTL specified
if shard.Expired() {
delete(shard.plugin, collectd.GetItemKey())
log.Printf("Cleaned up plugin for %s", collectd.GetItemKey())
}
if dataInterface.ISNew() {
dataInterface.SetNew(false)
log.Printf("New Metrics %#v\n", dataInterface)
} else {
//clean up if data is not access for max TTL specified
if shard.Expired() {
delete(shard.plugin, dataInterface.GetItemKey())
log.Printf("Cleaned up plugin for %s", dataInterface.GetItemKey())
}
}
}
Expand Down
74 changes: 4 additions & 70 deletions internal/pkg/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"math/rand"
"net/http"
"os"
"os/signal"
"reflect"
"strconv"
"sync"
Expand Down Expand Up @@ -46,12 +45,6 @@ const (
`
)

//AMQPServerItem hold information about data source which is AMQPServer listening to.
type AMQPServerItem struct {
Server *amqp10.AMQPServer
DataSource saconfig.DataSource
}

/*************** main routine ***********************/
// eventusage and command-line flags
func eventusage() {
Expand All @@ -68,21 +61,6 @@ func eventusage() {

var debuge = func(format string, data ...interface{}) {} // Default no debugging output

//spawnSignalHandler spawns goroutine which will wait for interruption signal(s)
// and end smart gateway in case any of the signal is received
func spawnSignalHandler(finish chan bool, watchedSignals ...os.Signal) {
interruptChannel := make(chan os.Signal, 1)
signal.Notify(interruptChannel, watchedSignals...)
go func() {
signalLoop:
for sig := range interruptChannel {
log.Printf("Stopping execution on caught signal: %+v\n", sig)
close(finish)
break signalLoop
}
}()
}

//spawnAPIServer spawns goroutine which provides http API for alerts and metrics statistics for Prometheus
func spawnAPIServer(wg *sync.WaitGroup, finish chan bool, serverConfig saconfig.EventConfiguration, metricHandler *api.EventMetricHandler, amqpHandler *amqp10.AMQPHandler) {
prometheus.MustRegister(metricHandler, amqpHandler)
Expand Down Expand Up @@ -122,26 +100,6 @@ func spawnAPIServer(wg *sync.WaitGroup, finish chan bool, serverConfig saconfig.
}()
}

//spawnQpidStatusReporter builds dynamic select for reporting status of AMQP connections
func spawnQpidStatusReporter(wg *sync.WaitGroup, applicationHealth *cacheutil.ApplicationHealthCache, qpidStatusCases []reflect.SelectCase) {
wg.Add(1)
go func() {
defer wg.Done()
finishCase := len(qpidStatusCases) - 1
statusLoop:
for {
switch index, status, _ := reflect.Select(qpidStatusCases); index {
case finishCase:
break statusLoop
default:
// Note: status here is always very low integer, so we don't need to be afraid of int64>int conversion
applicationHealth.QpidRouterState = int(status.Int())
}
}
log.Println("Closing QPID status reporter")
}()
}

//notifyAlertManager generates alert from event for Prometheus Alert Manager
func notifyAlertManager(wg *sync.WaitGroup, serverConfig saconfig.EventConfiguration, event *incoming.EventDataFormat, record string) {
wg.Add(1)
Expand Down Expand Up @@ -177,7 +135,7 @@ func StartEvents() {
var wg sync.WaitGroup
finish := make(chan bool)

spawnSignalHandler(finish, os.Interrupt)
amqp10.SpawnSignalHandler(finish, os.Interrupt)
log.SetFlags(log.LstdFlags | log.Lshortfile)

// set flags for parsing options
Expand Down Expand Up @@ -272,33 +230,9 @@ func StartEvents() {
}

// AMQP connection(s)
processingCases := make([]reflect.SelectCase, 0, len(serverConfig.AMQP1Connections))
qpidStatusCases := make([]reflect.SelectCase, 0, len(serverConfig.AMQP1Connections))
amqpServers := make([]AMQPServerItem, 0, len(serverConfig.AMQP1Connections))
for _, conn := range serverConfig.AMQP1Connections {
amqpServer := amqp10.NewAMQPServer(conn.URL, serverConfig.Debug, -1, serverConfig.Prefetch, amqpHandler, *fUniqueName)
//create select case for this listener
processingCases = append(processingCases, reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(amqpServer.GetNotifier()),
})
qpidStatusCases = append(qpidStatusCases, reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(amqpServer.GetStatus()),
})
amqpServers = append(amqpServers, AMQPServerItem{amqpServer, conn.DataSourceID})
}
log.Println("Listening for AMQP1.0 messages")
// include also case for finishing the loops
processingCases = append(processingCases, reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(finish),
})
qpidStatusCases = append(qpidStatusCases, reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(finish),
})
spawnQpidStatusReporter(&wg, applicationHealth, qpidStatusCases)
processingCases, qpidStatusCases, amqpServers := amqp10.CreateMessageLoopComponents(serverConfig, finish, amqpHandler, *fUniqueName)
amqp10.SpawnQpidStatusReporter(&wg, applicationHealth, qpidStatusCases)

// spawn event processor
wg.Add(1)
go func() {
Expand Down
1 change: 0 additions & 1 deletion internal/pkg/events/incoming/ceilometer.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,6 @@ func (evt *CeilometerEvent) ParseEvent(data string) error {
newPayload[key] = value
}
}
fmt.Printf("newPayload: %v\n", newPayload)
}
(*evt).parsed["payload"] = newPayload
}
Expand Down
Loading

0 comments on commit c27d3e0

Please sign in to comment.