Commit

Create functions for cluster and container metrics
jrauh01 committed Aug 20, 2024
1 parent 4c557ac commit 255bfc5
Showing 1 changed file with 64 additions and 103 deletions.
pkg/metrics/metrics.go: 167 changes (64 additions, 103 deletions)
@@ -543,134 +543,95 @@ func (pms *PromMetricSync) Pods(ctx context.Context, informer kcache.SharedIndexInformer) error {

The hunk opens on the closing lines of the Pods method (return g.Wait() and its closing brace) and then replaces the old Run method with two new methods.

Removed: the Run method, which ran the cluster and container Prometheus queries in-line and streamed the results into two separate channels.

// Run starts syncing the prometheus metrics to the database.
// Therefore, it gets a list of the metric queries.
func (pms *PromMetricSync) Run(ctx context.Context) error {
    g, ctx := errgroup.WithContext(ctx)

    upsertClusterMetrics := make(chan database.Entity)
    upsertContainerMetrics := make(chan database.Entity)

    for _, promQuery := range promQueriesCluster {
        promQuery := promQuery

        g.Go(func() error {
            for {
                result, warnings, err := pms.promApiClient.Query(
                    ctx,
                    promQuery.query,
                    time.Time{},
                )
                if err != nil {
                    return errors.Wrap(err, "error querying Prometheus")
                }
                if len(warnings) > 0 {
                    fmt.Printf("Warnings: %v\n", warnings)
                }
                if result == nil {
                    fmt.Println("No results found")
                    continue
                }

                for _, res := range result.(model.Vector) {
                    if res.Value.String() == "NaN" {
                        continue
                    }

                    //clusterId := sha1.Sum([]byte(""))

                    name := ""

                    if promQuery.nameLabel != "" {
                        name = string(res.Metric[promQuery.nameLabel])
                    }

                    newClusterMetric := &schemav1.PrometheusClusterMetric{
                        Timestamp: (res.Timestamp.UnixNano() - res.Timestamp.UnixNano()%(60*1000000000)) / 1000000,
                        Category:  promQuery.metricCategory,
                        Name:      name,
                        Value:     float64(res.Value),
                    }

                    select {
                    case upsertClusterMetrics <- newClusterMetric:
                    case <-ctx.Done():
                        return ctx.Err()
                    }
                }

                select {
                case <-ctx.Done():
                    return ctx.Err()
                case <-time.After(time.Second * 55):
                }
            }
        })
    }

    for _, promQuery := range promQueriesContainer {
        promQuery := promQuery

        g.Go(func() error {
            for {
                result, warnings, err := pms.promApiClient.Query(
                    ctx,
                    promQuery.query,
                    time.Time{},
                )
                if err != nil {
                    return errors.Wrap(err, "error querying Prometheus")
                }
                if len(warnings) > 0 {
                    fmt.Printf("Warnings: %v\n", warnings)
                }
                if result == nil {
                    fmt.Println("No results found")
                    continue
                }

                for _, res := range result.(model.Vector) {
                    if res.Value.String() == "NaN" {
                        continue
                    }

                    //containerId := sha1.Sum([]byte(res.Metric["namespace"] + "/" + res.Metric["pod"] + "/" + res.Metric["container"]))

                    name := ""

                    if promQuery.nameLabel != "" {
                        name = string(res.Metric[promQuery.nameLabel])
                    }

                    newContainerMetric := &schemav1.PrometheusContainerMetric{
                        Timestamp: (res.Timestamp.UnixNano() - res.Timestamp.UnixNano()%(60*1000000000)) / 1000000,
                        Category:  promQuery.metricCategory,
                        Name:      name,
                        Value:     float64(res.Value),
                    }

                    select {
                    case upsertContainerMetrics <- newContainerMetric:
                    case <-ctx.Done():
                        return ctx.Err()
                    }
                }

                select {
                case <-ctx.Done():
                    return ctx.Err()
                case <-time.After(time.Second * 55):
                }
            }
        })
    }

    g.Go(func() error {
        return database.NewUpsert(pms.db, database.WithStatement(pms.promMetricClusterUpsertStmt(), 5)).Stream(ctx, upsertClusterMetrics)
    })

    g.Go(func() error {
        return database.NewUpsert(pms.db, database.WithStatement(pms.promMetricContainerUpsertStmt(), 5)).Stream(ctx, upsertContainerMetrics)
    })

    return g.Wait()
}

Added: the Containers and Clusters methods, which wait for the informer cache to sync and delegate the polling loop to pms.run, passing a callback that maps each Prometheus sample to a metric entity.

func (pms *PromMetricSync) Containers(ctx context.Context, informer kcache.SharedIndexInformer) error {
    if !kcache.WaitForCacheSync(ctx.Done(), informer.HasSynced) {
        pms.logger.Fatal("timed out waiting for caches to sync")
    }

    upsertMetrics := make(chan database.Entity)

    g, ctx := errgroup.WithContext(ctx)

    g.Go(func() error {
        return pms.run(
            ctx,
            promQueriesContainer,
            upsertMetrics,
            func(query PromQuery, res *model.Sample) database.Entity {
                if res.Value.String() == "NaN" {
                    return nil
                }

                //containerId := sha1.Sum([]byte(res.Metric["namespace"] + "/" + res.Metric["pod"] + "/" + res.Metric["container"]))

                name := ""

                if query.nameLabel != "" {
                    name = string(res.Metric[query.nameLabel])
                }

                newContainerMetric := &schemav1.PrometheusContainerMetric{
                    // TODO uuid
                    Timestamp: (res.Timestamp.UnixNano() - res.Timestamp.UnixNano()%(60*1000000000)) / 1000000,
                    Category:  query.metricCategory,
                    Name:      name,
                    Value:     float64(res.Value),
                }

                return newContainerMetric
            },
        )
    })

    g.Go(func() error {
        return database.NewUpsert(pms.db, database.WithStatement(pms.promMetricContainerUpsertStmt(), 5)).Stream(ctx, upsertMetrics)
    })

    return g.Wait()
}

func (pms *PromMetricSync) Clusters(ctx context.Context, informer kcache.SharedIndexInformer) error {
    if !kcache.WaitForCacheSync(ctx.Done(), informer.HasSynced) {
        pms.logger.Fatal("timed out waiting for caches to sync")
    }

    upsertMetrics := make(chan database.Entity)

    g, ctx := errgroup.WithContext(ctx)

    g.Go(func() error {
        return pms.run(
            ctx,
            promQueriesCluster,
            upsertMetrics,
            func(query PromQuery, res *model.Sample) database.Entity {
                if res.Value.String() == "NaN" {
                    return nil
                }

                //clusterId := sha1.Sum([]byte(""))

                name := ""

                if query.nameLabel != "" {
                    name = string(res.Metric[query.nameLabel])
                }

                newClusterMetric := &schemav1.PrometheusClusterMetric{
                    // TODO uuid
                    Timestamp: (res.Timestamp.UnixNano() - res.Timestamp.UnixNano()%(60*1000000000)) / 1000000,
                    Category:  query.metricCategory,
                    Name:      name,
                    Value:     float64(res.Value),
                }

                return newClusterMetric
            },
        )
    })

    g.Go(func() error {
        return database.NewUpsert(pms.db, database.WithStatement(pms.promMetricClusterUpsertStmt(), 5)).Stream(ctx, upsertMetrics)
    })

    return g.Wait()
}
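
The pms.run helper that both new methods delegate to is not part of this hunk. Below is a minimal sketch of such a helper, inferred from the loop body removed from Run; the actual signature, parameter order, and error handling in the repository may differ.

// Assumed sketch only: the repository's actual run method may look different.
// It mirrors the per-query polling loop that the removed Run method contained.
func (pms *PromMetricSync) run(
    ctx context.Context,
    promQueries []PromQuery,
    upsertMetrics chan<- database.Entity,
    newEntity func(query PromQuery, res *model.Sample) database.Entity,
) error {
    g, ctx := errgroup.WithContext(ctx)

    for _, promQuery := range promQueries {
        promQuery := promQuery

        g.Go(func() error {
            for {
                // Query Prometheus once per iteration, as the old Run method did.
                result, warnings, err := pms.promApiClient.Query(ctx, promQuery.query, time.Time{})
                if err != nil {
                    return errors.Wrap(err, "error querying Prometheus")
                }
                if len(warnings) > 0 {
                    fmt.Printf("Warnings: %v\n", warnings)
                }
                if result == nil {
                    fmt.Println("No results found")
                    continue
                }

                for _, res := range result.(model.Vector) {
                    // The callback returns nil for samples that should be skipped (e.g. NaN values).
                    entity := newEntity(promQuery, res)
                    if entity == nil {
                        continue
                    }

                    select {
                    case upsertMetrics <- entity:
                    case <-ctx.Done():
                        return ctx.Err()
                    }
                }

                // Wait before polling again, matching the old 55-second interval.
                select {
                case <-ctx.Done():
                    return ctx.Err()
                case <-time.After(time.Second * 55):
                }
            }
        })
    }

    return g.Wait()
}

Centralizing the polling and the channel hand-off in one place is what lets Containers and Clusters shrink to a per-sample callback plus an upsert goroutine.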
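
For completeness, a hedged sketch of how a caller might start the two new syncs. The function name, informer factory, and wiring are assumptions for illustration, not code from this commit; it presumes k8s.io/client-go/informers, k8s.io/client-go/kubernetes, and golang.org/x/sync/errgroup are imported.

// startPromMetricSync is a hypothetical caller; the real wiring in the project may differ.
func startPromMetricSync(ctx context.Context, clientset kubernetes.Interface, pms *PromMetricSync) error {
    factory := informers.NewSharedInformerFactory(clientset, 0)
    podInformer := factory.Core().V1().Pods().Informer()

    // Start the informers so that the WaitForCacheSync guard inside Containers and Clusters can succeed.
    factory.Start(ctx.Done())

    g, ctx := errgroup.WithContext(ctx)

    g.Go(func() error {
        return pms.Clusters(ctx, podInformer)
    })

    g.Go(func() error {
        return pms.Containers(ctx, podInformer)
    })

    return g.Wait()
}

Passing the informer in, rather than constructing it inside the sync, keeps the cache-sync guard decoupled from how the informer is built.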
