Skip to content
This repository has been archived by the owner on Feb 15, 2025. It is now read-only.

Feature: Runner metrics support #17

Merged
merged 9 commits into from
Aug 26, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .dockerignore
Original file line number Diff line number Diff line change
@@ -3,8 +3,8 @@
.idea
.dockerignore
.gitignore
docker-compose.dev.yml
docker-compose.yml
deployments/docker/docker-compose.dev.yml
deployments/docker/docker-compose.yml
docker-compose.local-dev.yml.yml
docs
*.md
4 changes: 3 additions & 1 deletion cmd/runner/main.go
Original file line number Diff line number Diff line change
@@ -16,6 +16,7 @@ import (
"github.com/xBlaz3kx/distributed-scheduler/internal/executor"
"github.com/xBlaz3kx/distributed-scheduler/internal/pkg/database"
"github.com/xBlaz3kx/distributed-scheduler/internal/pkg/logger"
"github.com/xBlaz3kx/distributed-scheduler/internal/pkg/metrics"
"github.com/xBlaz3kx/distributed-scheduler/internal/pkg/security"
"github.com/xBlaz3kx/distributed-scheduler/internal/runner"
"github.com/xBlaz3kx/distributed-scheduler/internal/service/job"
@@ -129,14 +130,15 @@ func runCmd(cmd *cobra.Command, args []string) {

runner := runner.New(runner.Config{
JobService: jobService,
Metrics: metrics.NewRunnerMetrics(cfg.Observability.Metrics),
Log: log,
ExecutorFactory: executorFactory,
InstanceId: cfg.ID,
JobExecution: cfg.JobExecutionSettings,
})
runner.Start()

httpServer := devxHttp.NewServer(cfg.Http, observability.NewNoopObservability())
httpServer := devxHttp.NewServer(cfg.Http, obs)
go func() {
log.Info("Started HTTP server", zap.String("address", cfg.Http.Address))
databaseCheck := database.NewHealthChecker(db)
File renamed without changes.
Original file line number Diff line number Diff line change
@@ -6,8 +6,8 @@ services:
depends_on:
- postgres
build:
context: .
dockerfile: build/manager/Dockerfile
context: ../..
dockerfile: ./build/manager/Dockerfile
command:
- "/app/tooling"
- "migrate"
@@ -18,8 +18,8 @@ services:

manager:
build:
context: .
dockerfile: build/manager/Dockerfile
context: ../..
dockerfile: ./build/manager/Dockerfile
container_name: manager
restart: always
ports:
@@ -34,19 +34,19 @@ services:
- MANAGER_HTTP_ADDRESS=0.0.0.0:8000
- MANAGER_STORAGE_ENCRYPTION_KEY=ishouldbechanged
volumes:
- ./config/manager.yaml:/app/config.yaml
- ../../config/manager.yaml:/app/config.yaml
depends_on:
- postgres
- migration

runner:
build:
context: .
dockerfile: build/runner/Dockerfile
context: ../..
dockerfile: ./build/runner/Dockerfile
container_name: runner
restart: always
volumes:
- ./config/runner.yaml:/app/config.yaml
- ../../config/runner.yaml:/app/config.yaml
environment:
- RUNNER_OBSERVABILITY_LOG_LEVEL=info
- RUNNER_DB_HOST=postgres:5432
46 changes: 46 additions & 0 deletions deployments/docker/docker-compose.observability.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
# Local observability stack, started only with the "observability" profile:
#   docker compose --profile observability up
services:

  # Grafana's all-in-one OTel LGTM image (Grafana UI + Loki + Tempo +
  # Prometheus + an OTLP collector). The health check below probes the
  # Grafana HTTP API on port 3000.
  grafana-lgtm-stack:
    image: grafana/otel-lgtm
    container_name: lgtm-stack
    hostname: lgtm-stack
    profiles:
      - observability
    ports:
      - "3000:3000" # Grafana UI (health-checked below)
      - "4317:4317" # OTLP gRPC ingest
      - "4318:4318" # OTLP HTTP ingest
    healthcheck:
      test: [ "CMD", "curl", "-f", "http://localhost:3000/api/health" ]
      start_period: 30s
      interval: 30s
      timeout: 10s
      retries: 5
    # Named volumes persist metrics, logs and dashboards across restarts.
    volumes:
      - prometheus:/prometheus
      - loki:/data/loki
      - grafana:/var/lib/grafana

  # Promtail ships Docker container logs to the Loki inside the LGTM stack.
  promtail:
    image: grafana/promtail:latest
    container_name: promtail
    command:
      - "-config.file=/etc/promtail/promtail.yaml"
    profiles:
      - observability
    hostname: promtail
    restart: always
    # Wait until Grafana reports healthy before shipping logs.
    depends_on:
      grafana-lgtm-stack:
        condition: service_healthy
    volumes:
      - ./observability/promtail/config.yaml:/etc/promtail/promtail.yaml
      # Read-only Docker socket so Promtail can discover running containers.
      - /var/run/docker.sock:/var/run/docker.sock:ro

volumes:
  prometheus:
  grafana:
  loki:
  # NOTE(review): minio_loki and tempo_data are declared but not mounted by
  # any service above — confirm whether they are still needed.
  minio_loki:
  tempo_data:
File renamed without changes.
15 changes: 15 additions & 0 deletions deployments/docker/observability/promtail/config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# Promtail configuration: discovers local Docker containers via the Docker
# socket and pushes their logs to the Loki instance in the LGTM stack.
server:
  http_listen_address: 0.0.0.0
  http_listen_port: 9080

# Where Promtail persists read positions so it can resume after a restart.
positions:
  filename: /tmp/positions.yaml

# NOTE(review): singular `client` is the legacy form; recent Promtail
# releases document a `clients:` list — confirm against the pinned image.
client:
  url: http://lgtm-stack:3100/loki/api/v1/push

scrape_configs:
  - job_name: docker
    # Discover containers through the Docker socket mounted by compose.
    docker_sd_configs:
      - host: unix:///var/run/docker.sock
        refresh_interval: 5s
31 changes: 31 additions & 0 deletions documentation/observability.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# Observability

Scheduler currently supports logging and metrics. Both are exported via the OpenTelemetry protocol (GRPC) and can be
collected by any OpenTelemetry-compatible collector.

## Logging

Logging can be configured via the `LOG_LEVEL` environment variable. The following levels are supported:

- `debug`
- `info`
- `warn`
- `error`

## Metrics

Metrics can be enabled by setting the `METRICS_ENABLED` environment variable to `true`. Metrics are exported via the
OpenTelemetry protocol (GRPC).

The following manager metrics are currently exported:

- `http_requests_total`: The total number of HTTP requests received by the server.
- `http_request_duration_seconds`: The duration of HTTP requests in seconds.
- `http_errors_total`: The total number of failed HTTP requests.

The following runner metrics are currently exported:

- `scheduler_runner_jobs_total`: The total number of jobs processed by the runner.
- `scheduler_runner_jobs_executed`: The number of jobs that finished execution.
- `scheduler_runner_jobs_failed`: The number of jobs that have failed.
- `scheduler_runner_job_retries`: The number of job retry attempts.
- `scheduler_runner_job_duration`: The duration of job execution.
- `scheduler_runner_jobs_in_execution`: The number of jobs currently in execution.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -17,6 +17,7 @@ require (
github.com/swaggo/gin-swagger v1.6.0
github.com/swaggo/swag v1.16.3
github.com/xBlaz3kx/DevX v0.0.0-20240731212815-b4dca2816bed
go.opentelemetry.io/otel/metric v1.28.0
)

require (
@@ -114,7 +115,6 @@ require (
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.28.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.23.1 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.23.1 // indirect
go.opentelemetry.io/otel/metric v1.28.0 // indirect
go.opentelemetry.io/otel/sdk v1.28.0 // indirect
go.opentelemetry.io/otel/sdk/metric v1.28.0 // indirect
go.opentelemetry.io/otel/trace v1.28.0 // indirect
52 changes: 4 additions & 48 deletions internal/pkg/logger/logger.go
Original file line number Diff line number Diff line change
@@ -3,58 +3,14 @@
package logger

import (
"os"

"github.com/GLCharge/otelzap"
"github.com/samber/lo"
"github.com/spf13/viper"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"github.com/xBlaz3kx/DevX/observability"
)

func SetupLogging() {
logLevel := viper.GetString("log.level")
logger, err := New(logLevel)
if err == nil {
otelzap.ReplaceGlobals(logger)
}
}

// New constructs a Sugared Logger that writes to stdout and
// provides human-readable timestamps.
func New(logLevel string) (*otelzap.Logger, error) {
level := zapcore.InfoLevel

switch logLevel {
case "debug":
level = zapcore.DebugLevel
case "warn":
level = zapcore.WarnLevel
case "error":
level = zapcore.ErrorLevel
}

stdout := zapcore.Lock(os.Stdout)
stderr := zapcore.Lock(os.Stderr)

stdoutLevelEnabler := zap.LevelEnablerFunc(func(l zapcore.Level) bool {
return l >= level && l < zapcore.ErrorLevel
})
stderrLevelEnabler := zap.LevelEnablerFunc(func(l zapcore.Level) bool {
return l >= level && l >= zapcore.ErrorLevel
})

encoder := zapcore.NewJSONEncoder(zap.NewProductionEncoderConfig())

core := zapcore.NewTee(
zapcore.NewCore(encoder, stdout, stdoutLevelEnabler),
zapcore.NewCore(encoder, stderr, stderrLevelEnabler),
)

logger := otelzap.New(
zap.New(core),
otelzap.WithTraceIDField(true),
otelzap.WithMinLevel(level),
)

return logger, nil
logger := observability.NewLogging(observability.LogConfig{Level: lo.ToPtr(observability.LogLevel(logLevel))})
otelzap.ReplaceGlobals(logger.Logger())
}
119 changes: 119 additions & 0 deletions internal/pkg/metrics/runner.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package metrics

import (
	"context"

	"github.com/xBlaz3kx/DevX/observability"
	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/metric"
)

// Instrument names exported by the runner.
const (
	jobsTotal       = "scheduler_runner_jobs_total"
	jobsExecuted    = "scheduler_runner_jobs_executed"
	jobsFailed      = "scheduler_runner_jobs_failed"
	jobRetries      = "scheduler_runner_job_retries"
	jobDuration     = "scheduler_runner_job_duration"
	jobsInExecution = "scheduler_runner_jobs_in_execution"
)

// RunnerMetrics bundles the OpenTelemetry instruments emitted by the runner.
// When metrics are disabled in the configuration, every method is a no-op.
// Callers attach attributes (job type/executor, instance ID, status, number
// of tries) via the variadic attribute.KeyValue parameters on each method.
type RunnerMetrics struct {
	enabled bool

	jobsTotal    metric.Int64Counter // total jobs processed
	jobsExecuted metric.Int64Counter // jobs that finished execution
	jobsFailed   metric.Int64Counter // jobs that ended in failure
	jobRetries   metric.Int64Counter // retry attempts
	// jobDuration records per-job execution duration; the caller supplies
	// the value, so the unit is whatever the caller measures in.
	jobDuration metric.Float64Histogram
	// jobsInExecution is an up/down counter (not a gauge): Increase/Decrease
	// report deltas, and gauge Record semantics (last value wins) would make
	// concurrent updates overwrite each other instead of accumulating.
	jobsInExecution metric.Int64UpDownCounter
}

// NewRunnerMetrics creates the runner instruments on the global meter
// provider. If metrics are disabled in config, it returns a disabled,
// no-op RunnerMetrics. It panics when an instrument cannot be created,
// which only happens on programmer error (e.g. an invalid instrument name).
func NewRunnerMetrics(config observability.MetricsConfig) *RunnerMetrics {
	if !config.Enabled {
		return &RunnerMetrics{enabled: false}
	}

	meter := otel.GetMeterProvider().Meter("runner")

	// Locals deliberately do not reuse the constant names, so the
	// instrument-name constants are not shadowed inside this function.
	total, err := meter.Int64Counter(jobsTotal,
		metric.WithDescription("Total number of jobs processed by the runner"))
	must(err)

	executed, err := meter.Int64Counter(jobsExecuted,
		metric.WithDescription("Number of jobs that finished execution"))
	must(err)

	failed, err := meter.Int64Counter(jobsFailed,
		metric.WithDescription("Number of jobs that failed"))
	must(err)

	retries, err := meter.Int64Counter(jobRetries,
		metric.WithDescription("Number of job retry attempts"))
	must(err)

	duration, err := meter.Float64Histogram(jobDuration,
		metric.WithDescription("Duration of job execution"))
	must(err)

	inExecution, err := meter.Int64UpDownCounter(jobsInExecution,
		metric.WithDescription("Number of jobs currently in execution"))
	must(err)

	return &RunnerMetrics{
		enabled:         true,
		jobsTotal:       total,
		jobsExecuted:    executed,
		jobsFailed:      failed,
		jobRetries:      retries,
		jobDuration:     duration,
		jobsInExecution: inExecution,
	}
}

// IncreaseJobsInExecution adds numJobs to the count of currently running jobs.
func (r *RunnerMetrics) IncreaseJobsInExecution(ctx context.Context, numJobs int, attributes ...attribute.KeyValue) {
	if !r.enabled {
		return
	}
	r.jobsInExecution.Add(ctx, int64(numJobs), metric.WithAttributes(attributes...))
}

// DecreaseJobsInExecution subtracts numJobs from the count of currently
// running jobs and counts them as processed and executed.
func (r *RunnerMetrics) DecreaseJobsInExecution(ctx context.Context, numJobs int, attributes ...attribute.KeyValue) {
	if !r.enabled {
		return
	}
	jobs := int64(numJobs)
	attrs := metric.WithAttributes(attributes...)
	r.jobsInExecution.Add(ctx, -jobs, attrs)
	r.jobsTotal.Add(ctx, jobs, attrs)
	r.jobsExecuted.Add(ctx, jobs, attrs)
}

// RecordJobDuration records one job's execution duration.
func (r *RunnerMetrics) RecordJobDuration(ctx context.Context, duration float64, attributes ...attribute.KeyValue) {
	if !r.enabled {
		return
	}
	r.jobDuration.Record(ctx, duration, metric.WithAttributes(attributes...))
}

// IncrementJobRetries counts a single job retry attempt.
func (r *RunnerMetrics) IncrementJobRetries(ctx context.Context, attributes ...attribute.KeyValue) {
	if !r.enabled {
		return
	}
	r.jobRetries.Add(ctx, 1, metric.WithAttributes(attributes...))
}

// IncreaseFailedJobCount counts a single failed job.
func (r *RunnerMetrics) IncreaseFailedJobCount(ctx context.Context, attributes ...attribute.KeyValue) {
	if !r.enabled {
		return
	}
	r.jobsFailed.Add(ctx, 1, metric.WithAttributes(attributes...))
}

// must panics on instrument-creation errors; used only during construction.
func must(err error) {
	if err != nil {
		panic(err)
	}
}
Loading
Oops, something went wrong.
Loading
Oops, something went wrong.