From 22330530e4191dfad7394dfea02e65e4828f1727 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bla=C5=BE=20Dular?= <22869613+xBlaz3kx@users.noreply.github.com> Date: Mon, 26 Aug 2024 23:31:49 +0200 Subject: [PATCH] Feature: Runner metrics support (#17) * Added observability stack * Moved docker composes to deployments/docker * Added runner metrics * Simplified logger setup * Added short description of observability for the scheduler. * Moved metrics definitions to constants instead of hardcoding in the string --- .dockerignore | 4 +- cmd/runner/main.go | 4 +- .../docker/docker-compose.dev.yml | 0 .../docker/docker-compose.local-dev.yml | 16 +-- .../docker/docker-compose.observability.yaml | 46 +++++++ .../docker/docker-compose.yml | 0 .../docker/observability/promtail/config.yaml | 15 +++ documentation/observability.md | 31 +++++ go.mod | 2 +- internal/pkg/logger/logger.go | 52 +------- internal/pkg/metrics/runner.go | 119 ++++++++++++++++++ internal/runner/runner.go | 32 +++++ makefile | 4 +- 13 files changed, 263 insertions(+), 62 deletions(-) rename docker-compose.dev.yml => deployments/docker/docker-compose.dev.yml (100%) rename docker-compose.local-dev.yml => deployments/docker/docker-compose.local-dev.yml (85%) create mode 100644 deployments/docker/docker-compose.observability.yaml rename docker-compose.yml => deployments/docker/docker-compose.yml (100%) create mode 100644 deployments/docker/observability/promtail/config.yaml create mode 100644 documentation/observability.md create mode 100644 internal/pkg/metrics/runner.go diff --git a/.dockerignore b/.dockerignore index b341597..56e9733 100644 --- a/.dockerignore +++ b/.dockerignore @@ -3,8 +3,8 @@ .idea .dockerignore .gitignore -docker-compose.dev.yml -docker-compose.yml +deployments/docker/docker-compose.dev.yml +deployments/docker/docker-compose.yml docker-compose.local-dev.yml.yml docs *.md diff --git a/cmd/runner/main.go b/cmd/runner/main.go index 9b39bf8..7bf7a46 100644 --- a/cmd/runner/main.go +++ 
b/cmd/runner/main.go @@ -16,6 +16,7 @@ import ( "github.com/xBlaz3kx/distributed-scheduler/internal/executor" "github.com/xBlaz3kx/distributed-scheduler/internal/pkg/database" "github.com/xBlaz3kx/distributed-scheduler/internal/pkg/logger" + "github.com/xBlaz3kx/distributed-scheduler/internal/pkg/metrics" "github.com/xBlaz3kx/distributed-scheduler/internal/pkg/security" "github.com/xBlaz3kx/distributed-scheduler/internal/runner" "github.com/xBlaz3kx/distributed-scheduler/internal/service/job" @@ -129,6 +130,7 @@ func runCmd(cmd *cobra.Command, args []string) { runner := runner.New(runner.Config{ JobService: jobService, + Metrics: metrics.NewRunnerMetrics(cfg.Observability.Metrics), Log: log, ExecutorFactory: executorFactory, InstanceId: cfg.ID, @@ -136,7 +138,7 @@ func runCmd(cmd *cobra.Command, args []string) { }) runner.Start() - httpServer := devxHttp.NewServer(cfg.Http, observability.NewNoopObservability()) + httpServer := devxHttp.NewServer(cfg.Http, obs) go func() { log.Info("Started HTTP server", zap.String("address", cfg.Http.Address)) databaseCheck := database.NewHealthChecker(db) diff --git a/docker-compose.dev.yml b/deployments/docker/docker-compose.dev.yml similarity index 100% rename from docker-compose.dev.yml rename to deployments/docker/docker-compose.dev.yml diff --git a/docker-compose.local-dev.yml b/deployments/docker/docker-compose.local-dev.yml similarity index 85% rename from docker-compose.local-dev.yml rename to deployments/docker/docker-compose.local-dev.yml index 3e3f503..0a6642f 100644 --- a/docker-compose.local-dev.yml +++ b/deployments/docker/docker-compose.local-dev.yml @@ -6,8 +6,8 @@ services: depends_on: - postgres build: - context: . - dockerfile: build/manager/Dockerfile + context: ../.. + dockerfile: ./build/manager/Dockerfile command: - "/app/tooling" - "migrate" @@ -18,8 +18,8 @@ services: manager: build: - context: . - dockerfile: build/manager/Dockerfile + context: ../.. 
+ dockerfile: ./build/manager/Dockerfile container_name: manager restart: always ports: @@ -34,19 +34,19 @@ services: - MANAGER_HTTP_ADDRESS=0.0.0.0:8000 - MANAGER_STORAGE_ENCRYPTION_KEY=ishouldbechanged volumes: - - ./config/manager.yaml:/app/config.yaml + - ../../config/manager.yaml:/app/config.yaml depends_on: - postgres - migration runner: build: - context: . - dockerfile: build/runner/Dockerfile + context: ../.. + dockerfile: ./build/runner/Dockerfile container_name: runner restart: always volumes: - - ./config/runner.yaml:/app/config.yaml + - ../../config/runner.yaml:/app/config.yaml environment: - RUNNER_OBSERVABILITY_LOG_LEVEL=info - RUNNER_DB_HOST=postgres:5432 diff --git a/deployments/docker/docker-compose.observability.yaml b/deployments/docker/docker-compose.observability.yaml new file mode 100644 index 0000000..0675f8f --- /dev/null +++ b/deployments/docker/docker-compose.observability.yaml @@ -0,0 +1,46 @@ +services: + + # Configure the Grafana observability stack + grafana-lgtm-stack: + image: grafana/otel-lgtm + container_name: lgtm-stack + hostname: lgtm-stack + profiles: + - observability + ports: + - "3000:3000" + - "4317:4317" + - "4318:4318" + healthcheck: + test: [ "CMD", "curl", "-f", "http://localhost:3000/api/health" ] + start_period: 30s + interval: 30s + timeout: 10s + retries: 5 + volumes: + - prometheus:/prometheus + - loki:/data/loki + - grafana:/var/lib/grafana + + promtail: + image: grafana/promtail:latest + container_name: promtail + command: + - "-config.file=/etc/promtail/promtail.yaml" + profiles: + - observability + hostname: promtail + restart: always + depends_on: + grafana-lgtm-stack: + condition: service_healthy + volumes: + - ./observability/promtail/config.yaml:/etc/promtail/promtail.yaml + - /var/run/docker.sock:/var/run/docker.sock:ro + +volumes: + prometheus: + grafana: + loki: + minio_loki: + tempo_data: diff --git a/docker-compose.yml b/deployments/docker/docker-compose.yml similarity index 100% rename from 
docker-compose.yml rename to deployments/docker/docker-compose.yml diff --git a/deployments/docker/observability/promtail/config.yaml b/deployments/docker/observability/promtail/config.yaml new file mode 100644 index 0000000..aa1cb8e --- /dev/null +++ b/deployments/docker/observability/promtail/config.yaml @@ -0,0 +1,15 @@ +server: + http_listen_address: 0.0.0.0 + http_listen_port: 9080 + +positions: + filename: /tmp/positions.yaml + +client: + url: http://lgtm-stack:3100/loki/api/v1/push + +scrape_configs: + - job_name: docker + docker_sd_configs: + - host: unix:///var/run/docker.sock + refresh_interval: 5s \ No newline at end of file diff --git a/documentation/observability.md b/documentation/observability.md new file mode 100644 index 0000000..c2008c4 --- /dev/null +++ b/documentation/observability.md @@ -0,0 +1,31 @@ +# Observability + +The scheduler currently supports logging and metrics. Both are exported via the OpenTelemetry protocol (gRPC) and can be +collected by any OpenTelemetry-compatible collector. + +## Logging + +Logging can be configured via the `LOG_LEVEL` environment variable. The following levels are supported: + +- `debug` +- `info` +- `warn` +- `error` + +## Metrics + +Metrics can be enabled by setting the `METRICS_ENABLED` environment variable to `true`. Metrics are exported via the +OpenTelemetry protocol (gRPC). + +The following manager metrics are currently exported: + +- `http_requests_total`: The total number of HTTP requests received by the server. +- `http_request_duration_seconds`: The duration of HTTP requests in seconds. +- `http_errors_total`: The total number of failed HTTP requests. + +The following runner metrics are currently exported: + +- `scheduler_runner_jobs_total`: The total number of jobs processed by the runner. +- `scheduler_runner_jobs_failed`: The total number of jobs that have failed. +- `scheduler_runner_job_duration`: The duration of jobs in seconds. 
+- `scheduler_runner_jobs_in_execution`: The number of jobs currently in execution. diff --git a/go.mod b/go.mod index 63a5406..290b463 100644 --- a/go.mod +++ b/go.mod @@ -17,6 +17,7 @@ require ( github.com/swaggo/gin-swagger v1.6.0 github.com/swaggo/swag v1.16.3 github.com/xBlaz3kx/DevX v0.0.0-20240731212815-b4dca2816bed + go.opentelemetry.io/otel/metric v1.28.0 ) require ( @@ -114,7 +115,6 @@ require ( go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.28.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.23.1 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.23.1 // indirect - go.opentelemetry.io/otel/metric v1.28.0 // indirect go.opentelemetry.io/otel/sdk v1.28.0 // indirect go.opentelemetry.io/otel/sdk/metric v1.28.0 // indirect go.opentelemetry.io/otel/trace v1.28.0 // indirect diff --git a/internal/pkg/logger/logger.go b/internal/pkg/logger/logger.go index 873cf86..953cd61 100644 --- a/internal/pkg/logger/logger.go +++ b/internal/pkg/logger/logger.go @@ -3,58 +3,14 @@ package logger import ( - "os" - "github.com/GLCharge/otelzap" + "github.com/samber/lo" "github.com/spf13/viper" - "go.uber.org/zap" - "go.uber.org/zap/zapcore" + "github.com/xBlaz3kx/DevX/observability" ) func SetupLogging() { logLevel := viper.GetString("log.level") - logger, err := New(logLevel) - if err == nil { - otelzap.ReplaceGlobals(logger) - } -} - -// New constructs a Sugared Logger that writes to stdout and -// provides human-readable timestamps. 
-func New(logLevel string) (*otelzap.Logger, error) { - level := zapcore.InfoLevel - - switch logLevel { - case "debug": - level = zapcore.DebugLevel - case "warn": - level = zapcore.WarnLevel - case "error": - level = zapcore.ErrorLevel - } - - stdout := zapcore.Lock(os.Stdout) - stderr := zapcore.Lock(os.Stderr) - - stdoutLevelEnabler := zap.LevelEnablerFunc(func(l zapcore.Level) bool { - return l >= level && l < zapcore.ErrorLevel - }) - stderrLevelEnabler := zap.LevelEnablerFunc(func(l zapcore.Level) bool { - return l >= level && l >= zapcore.ErrorLevel - }) - - encoder := zapcore.NewJSONEncoder(zap.NewProductionEncoderConfig()) - - core := zapcore.NewTee( - zapcore.NewCore(encoder, stdout, stdoutLevelEnabler), - zapcore.NewCore(encoder, stderr, stderrLevelEnabler), - ) - - logger := otelzap.New( - zap.New(core), - otelzap.WithTraceIDField(true), - otelzap.WithMinLevel(level), - ) - - return logger, nil + logger := observability.NewLogging(observability.LogConfig{Level: lo.ToPtr(observability.LogLevel(logLevel))}) + otelzap.ReplaceGlobals(logger.Logger()) } diff --git a/internal/pkg/metrics/runner.go b/internal/pkg/metrics/runner.go new file mode 100644 index 0000000..32ef399 --- /dev/null +++ b/internal/pkg/metrics/runner.go @@ -0,0 +1,119 @@ +package metrics + +import ( + "context" + + "github.com/xBlaz3kx/DevX/observability" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" +) + +const ( + jobsTotal = "scheduler_runner_jobs_total" + jobsExecuted = "scheduler_runner_jobs_executed" + jobsFailed = "scheduler_runner_jobs_failed" + jobRetries = "scheduler_runner_job_retries" + jobDuration = "scheduler_runner_job_duration" + jobsInExecution = "scheduler_runner_jobs_in_execution" +) + +// Add attributes: Job Type/Executor, Instance ID, status, numberOfTries + +type RunnerMetrics struct { + enabled bool + + jobsTotal metric.Int64Counter + + jobsExecuted metric.Int64Counter + + jobsFailed metric.Int64Counter + + 
jobRetries metric.Int64Counter + + jobDuration metric.Float64Histogram + + jobsInExecution metric.Int64UpDownCounter +} + +func NewRunnerMetrics(config observability.MetricsConfig) *RunnerMetrics { + if !config.Enabled { + return &RunnerMetrics{enabled: false} + } + + meter := otel.GetMeterProvider().Meter("runner") + + jobsTotal, err := meter.Int64Counter(jobsTotal) + must(err) + + jobsExecuted, err := meter.Int64Counter(jobsExecuted) + must(err) + + jobsFailed, err := meter.Int64Counter(jobsFailed) + must(err) + + jobRetries, err := meter.Int64Counter(jobRetries) + must(err) + + jobDuration, err := meter.Float64Histogram(jobDuration) + must(err) + + jobsInExecution, err := meter.Int64UpDownCounter(jobsInExecution) + must(err) + + return &RunnerMetrics{ + enabled: true, + jobsTotal: jobsTotal, + jobsExecuted: jobsExecuted, + jobsFailed: jobsFailed, + jobRetries: jobRetries, + jobDuration: jobDuration, + jobsInExecution: jobsInExecution, + } +} + +func (r *RunnerMetrics) IncreaseJobsInExecution(ctx context.Context, numJobs int, attributes ...attribute.KeyValue) { + if r.enabled { + // Add the batch size to the number of running jobs + attrs := metric.WithAttributes(attributes...) + r.jobsInExecution.Add(ctx, int64(numJobs), attrs) + } +} + +func (r *RunnerMetrics) DecreaseJobsInExecution(ctx context.Context, numJobs int, attributes ...attribute.KeyValue) { + if r.enabled { + jobs := int64(numJobs) + // Subtract the finished jobs from the number of running jobs + attrs := metric.WithAttributes(attributes...) + r.jobsInExecution.Add(ctx, -jobs, attrs) + r.jobsTotal.Add(ctx, jobs, attrs) + r.jobsExecuted.Add(ctx, jobs, attrs) + } +} + +func (r *RunnerMetrics) RecordJobDuration(ctx context.Context, duration float64, attributes ...attribute.KeyValue) { + if r.enabled { + attrs := metric.WithAttributes(attributes...) 
+ r.jobDuration.Record(ctx, duration, attrs) + } +} + +func (r *RunnerMetrics) IncrementJobRetries(ctx context.Context, attributes ...attribute.KeyValue) { + if r.enabled { + attrs := metric.WithAttributes(attributes...) + r.jobRetries.Add(ctx, 1, attrs) + } +} + +func (r *RunnerMetrics) IncreaseFailedJobCount(ctx context.Context, attributes ...attribute.KeyValue) { + if r.enabled { + attrs := metric.WithAttributes(attributes...) + r.jobsFailed.Add(ctx, 1, attrs) + } +} + +func must(err error) { + if err != nil { + panic(err) + } +} diff --git a/internal/runner/runner.go b/internal/runner/runner.go index 3836563..86db1a6 100644 --- a/internal/runner/runner.go +++ b/internal/runner/runner.go @@ -8,12 +8,17 @@ import ( "github.com/GLCharge/otelzap" "github.com/xBlaz3kx/distributed-scheduler/internal/executor" "github.com/xBlaz3kx/distributed-scheduler/internal/model" + "github.com/xBlaz3kx/distributed-scheduler/internal/pkg/metrics" + "go.opentelemetry.io/otel/attribute" "go.uber.org/zap" ) type Runner struct { jobService JobService + // Runner metrics + metrics *metrics.RunnerMetrics + executorFactory executor.Factory ticker *time.Ticker log *otelzap.Logger @@ -51,6 +56,7 @@ type JobService interface { type Config struct { JobService JobService + Metrics *metrics.RunnerMetrics ExecutorFactory executor.Factory Log *otelzap.Logger InstanceId string @@ -69,6 +75,7 @@ func New(cfg Config) *Runner { s := &Runner{ jobService: cfg.JobService, + metrics: cfg.Metrics, instanceId: cfg.InstanceId, log: cfg.Log, ticker: time.NewTicker(cfg.JobExecution.Interval), @@ -163,12 +170,21 @@ func (s *Runner) runJobs() { return } + numJobs := len(jobs) + attr := attribute.String("instance", s.instanceId) + + // Increase gauge metric for number of running jobs + s.metrics.IncreaseJobsInExecution(ctx, numJobs, attr) + s.log.Debug("Running jobs", zap.Int("count", len(jobs))) // Run each job for _, j := range jobs { s.executeJob(j) } + + // Decrease gauge metric for number of running jobs + 
s.metrics.DecreaseJobsInExecution(ctx, numJobs, attr) } func (s *Runner) executeJob(job *model.Job) { @@ -196,6 +212,22 @@ func (s *Runner) executeJob(job *model.Job) { stopTime := time.Now() + attrs := []attribute.KeyValue{ + attribute.String("job_type", string(job.Type)), + attribute.String("instance", s.instanceId), + } + // Record the job duration + s.metrics.RecordJobDuration( + s.ctx, + stopTime.Sub(startTime).Seconds(), + attrs..., + ) + + // Increment the failed job count metric if the job failed + if err != nil { + s.metrics.IncreaseFailedJobCount(s.ctx, attrs...) + } + // Report the job as finished err = s.jobService.FinishJobExecution(s.ctx, job, startTime, stopTime, err) if err != nil { diff --git a/makefile b/makefile index 1a3b9ae..74431cf 100644 --- a/makefile +++ b/makefile @@ -26,12 +26,12 @@ run/runner: dev/up .PHONY: dev/up dev/up: @echo "Starting dev environment..." - @docker compose -f docker-compose.dev.yml up -d + @docker compose -f ./deployments/docker/docker-compose.dev.yml up -d .PHONY: dev/down dev/down: @echo "Stopping dev environment..." - @docker compose -f docker-compose.dev.yml down + @docker compose -f ./deployments/docker/docker-compose.dev.yml down .PHONY: test test: