Skip to content

Commit

Permalink
Feature: Runner metrics support (#17)
Browse files Browse the repository at this point in the history
* Added observability stack

* Moved docker composes to deployments/docker

* Added runner metrics

* Simplified logger setup

* Added short description of observability for the scheduler.

* Moved metrics definitions to constants instead of hardcoding in the string
  • Loading branch information
xBlaz3kx authored Aug 26, 2024
1 parent e181507 commit 2233053
Show file tree
Hide file tree
Showing 13 changed files with 263 additions and 62 deletions.
4 changes: 2 additions & 2 deletions .dockerignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
.idea
.dockerignore
.gitignore
docker-compose.dev.yml
docker-compose.yml
deployments/docker/docker-compose.dev.yml
deployments/docker/docker-compose.yml
docker-compose.local-dev.yml.yml
docs
*.md
4 changes: 3 additions & 1 deletion cmd/runner/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/xBlaz3kx/distributed-scheduler/internal/executor"
"github.com/xBlaz3kx/distributed-scheduler/internal/pkg/database"
"github.com/xBlaz3kx/distributed-scheduler/internal/pkg/logger"
"github.com/xBlaz3kx/distributed-scheduler/internal/pkg/metrics"
"github.com/xBlaz3kx/distributed-scheduler/internal/pkg/security"
"github.com/xBlaz3kx/distributed-scheduler/internal/runner"
"github.com/xBlaz3kx/distributed-scheduler/internal/service/job"
Expand Down Expand Up @@ -129,14 +130,15 @@ func runCmd(cmd *cobra.Command, args []string) {

runner := runner.New(runner.Config{
JobService: jobService,
Metrics: metrics.NewRunnerMetrics(cfg.Observability.Metrics),
Log: log,
ExecutorFactory: executorFactory,
InstanceId: cfg.ID,
JobExecution: cfg.JobExecutionSettings,
})
runner.Start()

httpServer := devxHttp.NewServer(cfg.Http, observability.NewNoopObservability())
httpServer := devxHttp.NewServer(cfg.Http, obs)
go func() {
log.Info("Started HTTP server", zap.String("address", cfg.Http.Address))
databaseCheck := database.NewHealthChecker(db)
Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ services:
depends_on:
- postgres
build:
context: .
dockerfile: build/manager/Dockerfile
context: ../..
dockerfile: ./build/manager/Dockerfile
command:
- "/app/tooling"
- "migrate"
Expand All @@ -18,8 +18,8 @@ services:

manager:
build:
context: .
dockerfile: build/manager/Dockerfile
context: ../..
dockerfile: ./build/manager/Dockerfile
container_name: manager
restart: always
ports:
Expand All @@ -34,19 +34,19 @@ services:
- MANAGER_HTTP_ADDRESS=0.0.0.0:8000
- MANAGER_STORAGE_ENCRYPTION_KEY=ishouldbechanged
volumes:
- ./config/manager.yaml:/app/config.yaml
- ../../config/manager.yaml:/app/config.yaml
depends_on:
- postgres
- migration

runner:
build:
context: .
dockerfile: build/runner/Dockerfile
context: ../..
dockerfile: ./build/runner/Dockerfile
container_name: runner
restart: always
volumes:
- ./config/runner.yaml:/app/config.yaml
- ../../config/runner.yaml:/app/config.yaml
environment:
- RUNNER_OBSERVABILITY_LOG_LEVEL=info
- RUNNER_DB_HOST=postgres:5432
Expand Down
46 changes: 46 additions & 0 deletions deployments/docker/docker-compose.observability.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
services:

  # Configure the Grafana observability stack
  # grafana/otel-lgtm is an all-in-one image bundling Grafana (UI, port 3000),
  # an OTLP collector (gRPC 4317, HTTP 4318), Prometheus, Loki and Tempo.
  grafana-lgtm-stack:
    image: grafana/otel-lgtm
    container_name: lgtm-stack
    hostname: lgtm-stack
    # Only started when the "observability" compose profile is enabled.
    profiles:
      - observability
    ports:
      - "3000:3000"   # Grafana UI
      - "4317:4317"   # OTLP gRPC ingest
      - "4318:4318"   # OTLP HTTP ingest
    # Health is probed via Grafana's API so dependents can wait on service_healthy.
    healthcheck:
      test: [ "CMD", "curl", "-f", "http://localhost:3000/api/health" ]
      start_period: 30s
      interval: 30s
      timeout: 10s
      retries: 5
    # Persist metric, log and dashboard state across container restarts.
    volumes:
      - prometheus:/prometheus
      - loki:/data/loki
      - grafana:/var/lib/grafana

  # Promtail ships container logs to Loki inside the LGTM stack.
  promtail:
    image: grafana/promtail:latest
    container_name: promtail
    command:
      - "-config.file=/etc/promtail/promtail.yaml"
    profiles:
      - observability
    hostname: promtail
    restart: always
    # Wait for the LGTM stack to pass its healthcheck before shipping logs.
    depends_on:
      grafana-lgtm-stack:
        condition: service_healthy
    volumes:
      - ./observability/promtail/config.yaml:/etc/promtail/promtail.yaml
      # Read-only Docker socket access for container service discovery.
      - /var/run/docker.sock:/var/run/docker.sock:ro

volumes:
  prometheus:
  grafana:
  loki:
  # NOTE(review): minio_loki and tempo_data are declared but not mounted by any
  # service above — confirm whether they are leftovers that can be removed.
  minio_loki:
  tempo_data:
File renamed without changes.
15 changes: 15 additions & 0 deletions deployments/docker/observability/promtail/config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# Promtail configuration: discovers Docker containers via the Docker socket
# and pushes their logs to the Loki instance inside the LGTM stack.
server:
  http_listen_address: 0.0.0.0
  http_listen_port: 9080

# Where Promtail persists read offsets so it can resume after a restart.
positions:
  filename: /tmp/positions.yaml

# NOTE(review): recent Promtail versions document this section as `clients:`
# (a list); the singular `client:` form is legacy — confirm the image version
# in use still accepts it.
client:
  url: http://lgtm-stack:3100/loki/api/v1/push

scrape_configs:
  - job_name: docker
    # Discover log targets directly from the Docker daemon.
    docker_sd_configs:
      - host: unix:///var/run/docker.sock
        refresh_interval: 5s
31 changes: 31 additions & 0 deletions documentation/observability.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# Observability

Scheduler currently supports logging and metrics. Both are exported via the OpenTelemetry protocol (GRPC) and can be
collected by any OpenTelemetry-compatible collector.

## Logging

Logging can be configured via the `LOG_LEVEL` environment variable. The following levels are supported:

- `debug`
- `info`
- `warn`
- `error`

## Metrics

Metrics can be enabled by setting the `METRICS_ENABLED` environment variable to `true`. Metrics are exported via the
OpenTelemetry protocol (GRPC).

The following manager metrics are currently exported:

- `http_requests_total`: The total number of HTTP requests received by the server.
- `http_request_duration_seconds`: The duration of HTTP requests in seconds.
- `http_errors_total`: The total number of failed HTTP requests.

The following runner metrics are currently exported:

- `scheduler_runner_jobs_total`: The total number of jobs that have been scheduled.
- `scheduler_runner_jobs_executed`: The total number of jobs that have been executed.
- `scheduler_runner_jobs_failed`: The total number of jobs that have failed.
- `scheduler_runner_job_retries`: The total number of job retry attempts.
- `scheduler_runner_job_duration`: The duration of job executions.
- `scheduler_runner_jobs_in_execution`: The number of jobs currently in execution.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ require (
github.com/swaggo/gin-swagger v1.6.0
github.com/swaggo/swag v1.16.3
github.com/xBlaz3kx/DevX v0.0.0-20240731212815-b4dca2816bed
go.opentelemetry.io/otel/metric v1.28.0
)

require (
Expand Down Expand Up @@ -114,7 +115,6 @@ require (
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.28.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.23.1 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.23.1 // indirect
go.opentelemetry.io/otel/metric v1.28.0 // indirect
go.opentelemetry.io/otel/sdk v1.28.0 // indirect
go.opentelemetry.io/otel/sdk/metric v1.28.0 // indirect
go.opentelemetry.io/otel/trace v1.28.0 // indirect
Expand Down
52 changes: 4 additions & 48 deletions internal/pkg/logger/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,58 +3,14 @@
package logger

import (
"os"

"github.com/GLCharge/otelzap"
"github.com/samber/lo"
"github.com/spf13/viper"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"github.com/xBlaz3kx/DevX/observability"
)

func SetupLogging() {
logLevel := viper.GetString("log.level")
logger, err := New(logLevel)
if err == nil {
otelzap.ReplaceGlobals(logger)
}
}

// New constructs a Sugared Logger that writes to stdout and
// provides human-readable timestamps.
func New(logLevel string) (*otelzap.Logger, error) {
level := zapcore.InfoLevel

switch logLevel {
case "debug":
level = zapcore.DebugLevel
case "warn":
level = zapcore.WarnLevel
case "error":
level = zapcore.ErrorLevel
}

stdout := zapcore.Lock(os.Stdout)
stderr := zapcore.Lock(os.Stderr)

stdoutLevelEnabler := zap.LevelEnablerFunc(func(l zapcore.Level) bool {
return l >= level && l < zapcore.ErrorLevel
})
stderrLevelEnabler := zap.LevelEnablerFunc(func(l zapcore.Level) bool {
return l >= level && l >= zapcore.ErrorLevel
})

encoder := zapcore.NewJSONEncoder(zap.NewProductionEncoderConfig())

core := zapcore.NewTee(
zapcore.NewCore(encoder, stdout, stdoutLevelEnabler),
zapcore.NewCore(encoder, stderr, stderrLevelEnabler),
)

logger := otelzap.New(
zap.New(core),
otelzap.WithTraceIDField(true),
otelzap.WithMinLevel(level),
)

return logger, nil
logger := observability.NewLogging(observability.LogConfig{Level: lo.ToPtr(observability.LogLevel(logLevel))})
otelzap.ReplaceGlobals(logger.Logger())
}
119 changes: 119 additions & 0 deletions internal/pkg/metrics/runner.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package metrics

import (
"context"

"github.com/xBlaz3kx/DevX/observability"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
)

// Metric names are defined as constants so instrumentation and documentation
// cannot drift apart silently.
const (
	jobsTotal       = "scheduler_runner_jobs_total"
	jobsExecuted    = "scheduler_runner_jobs_executed"
	jobsFailed      = "scheduler_runner_jobs_failed"
	jobRetries      = "scheduler_runner_job_retries"
	jobDuration     = "scheduler_runner_job_duration"
	jobsInExecution = "scheduler_runner_jobs_in_execution"
)

// Suggested attributes for callers: Job Type/Executor, Instance ID, status, numberOfTries.

// RunnerMetrics bundles the OpenTelemetry instruments used by the runner.
// When metrics are disabled in the configuration, every recording method is a
// no-op (each checks the enabled flag before touching an instrument), so a
// disabled RunnerMetrics is always safe to call.
type RunnerMetrics struct {
	enabled bool

	// jobsTotal counts every job whose execution completed (success or failure).
	jobsTotal metric.Int64Counter

	// jobsExecuted counts executed jobs; currently incremented in lockstep
	// with jobsTotal in DecreaseJobsInExecution.
	jobsExecuted metric.Int64Counter

	// jobsFailed counts jobs whose execution failed.
	jobsFailed metric.Int64Counter

	// jobRetries counts retry attempts across all jobs.
	jobRetries metric.Int64Counter

	// jobDuration records per-job execution duration.
	jobDuration metric.Float64Histogram

	// jobsInExecution tracks how many jobs are currently running. An
	// UpDownCounter is used because callers report deltas (+n on start,
	// -n on completion); a Gauge's Record would overwrite the value with
	// the delta instead of accumulating it.
	jobsInExecution metric.Int64UpDownCounter
}

// NewRunnerMetrics creates the runner's instrument set from the global meter
// provider. If metrics are disabled in the configuration, a no-op instance is
// returned. Instrument-creation errors indicate programmer error (e.g. an
// invalid metric name) and panic via must.
func NewRunnerMetrics(config observability.MetricsConfig) *RunnerMetrics {
	if !config.Enabled {
		return &RunnerMetrics{enabled: false}
	}

	meter := otel.GetMeterProvider().Meter("runner")

	// Locals are named distinctly from the name constants above to avoid
	// shadowing them.
	totalCounter, err := meter.Int64Counter(jobsTotal)
	must(err)

	executedCounter, err := meter.Int64Counter(jobsExecuted)
	must(err)

	failedCounter, err := meter.Int64Counter(jobsFailed)
	must(err)

	retryCounter, err := meter.Int64Counter(jobRetries)
	must(err)

	durationHistogram, err := meter.Float64Histogram(jobDuration)
	must(err)

	inExecutionCounter, err := meter.Int64UpDownCounter(jobsInExecution)
	must(err)

	return &RunnerMetrics{
		enabled:         true,
		jobsTotal:       totalCounter,
		jobsExecuted:    executedCounter,
		jobsFailed:      failedCounter,
		jobRetries:      retryCounter,
		jobDuration:     durationHistogram,
		jobsInExecution: inExecutionCounter,
	}
}

// IncreaseJobsInExecution adds numJobs to the in-execution counter. Call it
// when jobs start running.
func (r *RunnerMetrics) IncreaseJobsInExecution(ctx context.Context, numJobs int, attributes ...attribute.KeyValue) {
	if !r.enabled {
		return
	}

	attrs := metric.WithAttributes(attributes...)
	r.jobsInExecution.Add(ctx, int64(numJobs), attrs)
}

// DecreaseJobsInExecution subtracts numJobs from the in-execution counter and
// counts the jobs as completed (both total and executed). Call it when jobs
// finish running, regardless of outcome.
func (r *RunnerMetrics) DecreaseJobsInExecution(ctx context.Context, numJobs int, attributes ...attribute.KeyValue) {
	if !r.enabled {
		return
	}

	jobs := int64(numJobs)
	attrs := metric.WithAttributes(attributes...)
	r.jobsInExecution.Add(ctx, -jobs, attrs)
	r.jobsTotal.Add(ctx, jobs, attrs)
	r.jobsExecuted.Add(ctx, jobs, attrs)
}

// RecordJobDuration records a single job's execution duration.
func (r *RunnerMetrics) RecordJobDuration(ctx context.Context, duration float64, attributes ...attribute.KeyValue) {
	if !r.enabled {
		return
	}

	attrs := metric.WithAttributes(attributes...)
	r.jobDuration.Record(ctx, duration, attrs)
}

// IncrementJobRetries counts a single retry attempt.
func (r *RunnerMetrics) IncrementJobRetries(ctx context.Context, attributes ...attribute.KeyValue) {
	if !r.enabled {
		return
	}

	attrs := metric.WithAttributes(attributes...)
	r.jobRetries.Add(ctx, 1, attrs)
}

// IncreaseFailedJobCount counts a single failed job execution.
func (r *RunnerMetrics) IncreaseFailedJobCount(ctx context.Context, attributes ...attribute.KeyValue) {
	if !r.enabled {
		return
	}

	attrs := metric.WithAttributes(attributes...)
	r.jobsFailed.Add(ctx, 1, attrs)
}

// must panics on a non-nil error. Used only during construction, where an
// instrument-creation failure is a programmer error (Must-style convention).
func must(err error) {
	if err != nil {
		panic(err)
	}
}
Loading

0 comments on commit 2233053

Please sign in to comment.