Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 59 additions & 27 deletions cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,48 +14,80 @@ var (
debug bool
)

func newCmd(args []string) *cobra.Command {
root := &cobra.Command{
Use: filepath.Base(args[0]),
Short: "Manage the opensvc collector infrastructure components.",
PersistentPreRunE: func(cmd *cobra.Command, args []string) error {
if debug {
slog.SetLogLoggerLevel(slog.LevelDebug)
}

logConfigDir()
if err := initConfig(); err != nil {
func cmdWorker() *cobra.Command {
var maxRunners int
cmd := &cobra.Command{
Use: "worker",
Short: "run jobs from a list of queues",
RunE: func(cmd *cobra.Command, args []string) error {
if err := setup(); err != nil {
return err
}
logConfigFileUsed()
slog.Info(fmt.Sprintf("oc3 vesion: %s", version.Version()))
return nil
return work(maxRunners, args)
},
}
cmd.Flags().IntVar(&maxRunners, "runners", 1, "maximun number of worker job runners")
return cmd
}

root.PersistentFlags().BoolVar(&debug, "debug", false, "set log level to debug")

apiCmd := cobra.Command{
// cmdAPI builds the "api" subcommand, which initializes the program with
// setup() and then hands over to listen().
func cmdAPI() *cobra.Command {
	// serve is the command entry point: any setup() error aborts the command
	// before listen() is called.
	serve := func(_ *cobra.Command, _ []string) error {
		err := setup()
		if err != nil {
			return err
		}
		return listen()
	}

	apiCommand := &cobra.Command{
		Use:   "api",
		Short: "serve the collector api",
		RunE:  serve,
	}
	return apiCommand
}

var maxRunners int
workerCmd := cobra.Command{
Use: "worker",
Short: "run jobs from a list of queues",
func cmdScheduler() *cobra.Command {
return &cobra.Command{
Use: "scheduler",
Short: "start running db maintenance tasks",
RunE: func(cmd *cobra.Command, args []string) error {
return work(maxRunners, args)
return schedule()
},
}
}

func cmdVersion() *cobra.Command {
cmd := &cobra.Command{
Use: "version",
Short: "display the oc3 version",
Run: func(cmd *cobra.Command, args []string) {
fmt.Println(version.Version())
},
}
workerCmd.Flags().IntVar(&maxRunners, "runners", 1, "maximun number of worker job runners")
return cmd
}

root.AddCommand(
&apiCmd,
&workerCmd,
func cmdRoot(args []string) *cobra.Command {
cmd := &cobra.Command{
Use: filepath.Base(args[0]),
Short: "Manage the opensvc collector infrastructure components.",
}
cmd.PersistentFlags().BoolVar(&debug, "debug", false, "set log level to debug")
cmd.AddCommand(
cmdAPI(),
cmdScheduler(),
cmdVersion(),
cmdWorker(),
)
return root
return cmd
}

// setup performs the initialization shared by the subcommands: it raises the
// log level to debug when the --debug flag is set, logs the configuration
// directories, loads the configuration, logs which configuration file was
// used and finally logs the oc3 version.
//
// It returns the error reported by initConfig, if any.
func setup() error {
	if debug {
		slog.SetLogLoggerLevel(slog.LevelDebug)
	}

	logConfigDir()
	err := initConfig()
	if err != nil {
		return err
	}
	logConfigFileUsed()

	versionMsg := fmt.Sprintf("oc3 version: %s", version.Version())
	slog.Info(versionMsg)
	return nil
}
8 changes: 4 additions & 4 deletions conf.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
)

var (
configCandidateDirs = []string{"/etc/oc3/", "$HOME/.oc3/", "./"}
configCandidateDirs = []string{"/etc/oc3/", "$HOME/.config/oc3", "./"}
)

func logConfigDir() {
Expand Down Expand Up @@ -52,9 +52,9 @@ func initConfig() error {
// config file
viper.SetConfigName("config")
viper.SetConfigType("yaml")
viper.AddConfigPath("/etc/oc3")
viper.AddConfigPath("$HOME/.oc3")
viper.AddConfigPath(".")
for _, d := range configCandidateDirs {
viper.AddConfigPath(d)
}
if err := viper.ReadInConfig(); err != nil {
if _, ok := err.(viper.ConfigFileNotFoundError); !ok {
return err
Expand Down
4 changes: 2 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ import (

// cmd parses the command line and runs the selected component.
func cmd(args []string) error {
root := newCmd(args)
return root.Execute()
cmd := cmdRoot(args)
return cmd.Execute()
}

// main is the program entrypoint. It's the only function using os.Exit, so
Expand Down
51 changes: 51 additions & 0 deletions pprof.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package main

import (
"fmt"
"log/slog"
"net"
"net/http"
"os"
"time"
)

// pprofUx starts an HTTP server listening on the unix domain socket at path p.
// The server runs in a background goroutine and has no explicit handler, so
// net/http routes requests through http.DefaultServeMux.
//
// NOTE(review): pprof endpoints are only reachable if net/http/pprof is
// imported somewhere in the program; that import is not visible from here.
//
// A stale socket left at p by a previous run is removed before listening.
// Anything else found at p (regular file, directory, ...) is left untouched
// and reported as an error, so a misconfigured path can never delete data.
//
// The returned error only covers the setup phase (stale socket cleanup and
// listen). Errors returned later by the server are logged.
func pprofUx(p string) error {
	slog.Info(fmt.Sprintf("pprof ux listener on %s", p))

	fi, err := os.Lstat(p)
	switch {
	case os.IsNotExist(err):
		// Nothing to clean up.
	case err != nil:
		return err
	case fi.Mode()&os.ModeSocket == 0:
		// Refuse to delete something that is not a socket.
		return fmt.Errorf("pprof ux listener on %s: path exists and is not a socket", p)
	default:
		// Stale socket: remove it so that the listen below can bind the path.
		if err := os.Remove(p); err != nil && !os.IsNotExist(err) {
			return err
		}
	}

	listener, err := net.Listen("unix", p)
	if err != nil {
		return err
	}
	go func() {
		server := http.Server{}
		if err := server.Serve(listener); err != nil {
			slog.Error(fmt.Sprintf("pprof ux listener on %s: %s", p, err))
		}
	}()
	return nil
}

// pprofInet starts an HTTP server listening on the TCP address addr. The
// server runs in a background goroutine and has no explicit handler, so
// net/http routes requests through http.DefaultServeMux.
//
// The address is bound synchronously: a bind failure (address in use, invalid
// address, ...) is always returned to the caller, instead of being detected
// through a best-effort timeout. Errors returned later by the server are
// logged.
func pprofInet(addr string) error {
	slog.Info(fmt.Sprintf("pprof inet listener on %s", addr))

	listenAddr := addr
	if listenAddr == "" {
		// Same default as http.ListenAndServe.
		listenAddr = ":http"
	}
	listener, err := net.Listen("tcp", listenAddr)
	if err != nil {
		return err
	}
	go func() {
		// Only the header read is bounded: profile endpoints can legitimately
		// take a long time to write their response.
		server := http.Server{ReadHeaderTimeout: 10 * time.Second}
		if err := server.Serve(listener); err != nil {
			slog.Error(fmt.Sprintf("pprof inet listener on %s: %s", addr, err))
		}
	}()
	return nil
}
45 changes: 45 additions & 0 deletions scheduler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package main

import (
"fmt"
"log/slog"
"net/http"

"github.com/opensvc/oc3/scheduler"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/spf13/viper"
)

// schedule is the entry point of the scheduler component.
//
// It runs setup(), opens the database with newDatabase(), starts the optional
// listeners selected by the configuration, then runs the scheduler:
//
//   - scheduler.pprof.enable: when true, start a pprof listener on
//     scheduler.pprof.uxsocket and/or scheduler.pprof.addr (each only if set).
//   - scheduler.metrics.enable: when true, register the prometheus handler on
//     /metrics of the default mux and serve it on scheduler.metrics.addr.
//
// It returns the first setup, database or pprof listener error, or whatever
// the scheduler Run method returns.
func schedule() error {
	if err := setup(); err != nil {
		return err
	}
	db, err := newDatabase()
	if err != nil {
		return err
	}
	if viper.GetBool("scheduler.pprof.enable") {
		if p := viper.GetString("scheduler.pprof.uxsocket"); p != "" {
			if err := pprofUx(p); err != nil {
				return err
			}
		}
		if addr := viper.GetString("scheduler.pprof.addr"); addr != "" {
			if err := pprofInet(addr); err != nil {
				return err
			}
		}
	}
	if viper.GetBool("scheduler.metrics.enable") {
		addr := viper.GetString("scheduler.metrics.addr")
		slog.Info(fmt.Sprintf("metrics listener on http://%s/metrics", addr))
		http.Handle("/metrics", promhttp.Handler())
		go func() {
			// ListenAndServe only returns on failure. Log the error so that a
			// dead metrics endpoint (e.g. address already in use) is visible,
			// without stopping the scheduler.
			if err := http.ListenAndServe(addr, nil); err != nil {
				slog.Error(fmt.Sprintf("metrics listener on %s: %s", addr, err))
			}
		}()
	}
	sched := &scheduler.Scheduler{
		DB: db,
	}
	return sched.Run()
}
41 changes: 41 additions & 0 deletions scheduler/db_actions.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package scheduler

import (
"context"
"time"
)

// TaskRefreshBActionErrors rebuilds the b_action_errors table: it empties the
// table, then fills it with the count, per (svc_id, node_id), of the
// svcactions rows ended during the last day with status 'err' and no ack.
//
// Both statements are executed through the same transaction, under a context
// limited to one minute. The first error met is returned and the deferred
// rollback applies; otherwise the result of the commit is returned.
//
// NOTE(review): on MySQL/MariaDB, TRUNCATE causes an implicit commit, so the
// table may stay empty if the INSERT fails — confirm whether a DELETE is
// preferable here.
func (t *Scheduler) TaskRefreshBActionErrors(ctx context.Context) error {
	const refreshQuery = `INSERT INTO b_action_errors (
SELECT NULL, a.svc_id, a.node_id, count(a.id)
FROM svcactions a
WHERE
a.end>date_sub(now(), interval 1 day) AND
a.status='err' AND
isnull(a.ack) AND
a.end IS NOT NULL
GROUP BY a.svc_id, a.node_id
)`

	ctx, cancel := context.WithTimeout(ctx, time.Minute)
	defer cancel()

	tx, err := t.DB.BeginTx(ctx, nil)
	if err != nil {
		return err
	}
	// Rollback is a no-op once the transaction is committed.
	defer tx.Rollback()

	// Statements are executed in order, stopping at the first failure.
	statements := []string{"TRUNCATE b_action_errors", refreshQuery}
	for _, statement := range statements {
		if _, err := tx.ExecContext(ctx, statement); err != nil {
			return err
		}
	}

	return tx.Commit()
}
103 changes: 103 additions & 0 deletions scheduler/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package scheduler

import (
"context"
"database/sql"
"fmt"
"log/slog"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)

// Scheduler runs the periodic tasks. Its methods use DB as their database
// handle.
type Scheduler struct {
	DB *sql.DB
}

// Task is a named function run on behalf of a Scheduler. The name is used as
// log prefix and as the "desc" label of the task metrics.
type Task struct {
	scheduler *Scheduler
	name      string
	fn        func(context.Context) error
}

// taskExecStatusOk is the "status" metric label value of a task run that
// returned no error.
const taskExecStatusOk = "ok"

// taskExecStatusFailed is the "status" metric label value of a task run that
// returned an error.
const taskExecStatusFailed = "failed"

// taskExecCounter counts the task executions, by task name ("desc") and
// outcome ("status").
var taskExecCounter = promauto.NewCounterVec(
	prometheus.CounterOpts{
		Namespace: "oc3",
		Subsystem: "scheduler",
		Name:      "task_exec_count",
		Help:      "Task execution counter",
	},
	[]string{"desc", "status"},
)

// taskExecDuration observes the task execution durations in seconds, by task
// name ("desc") and outcome ("status"), using the prometheus default buckets.
var taskExecDuration = promauto.NewHistogramVec(
	prometheus.HistogramOpts{
		Namespace: "oc3",
		Subsystem: "scheduler",
		Name:      "task_exec_duration_seconds",
		Help:      "Task execution duration in seconds.",
		Buckets:   prometheus.DefBuckets,
	},
	[]string{"desc", "status"},
)

// Run executes the task function once with ctx, logs its outcome and elapsed
// time, and records both in the task metrics (execution counter and duration
// histogram), labelled with the task name and the status.
//
// An error returned by the task function is logged, not returned.
func (t *Task) Run(ctx context.Context) {
	slog.Info(fmt.Sprintf("%s: exec", t.name))

	startedAt := time.Now()
	err := t.fn(ctx)
	elapsed := time.Since(startedAt)

	outcome := taskExecStatusOk
	if err != nil {
		outcome = taskExecStatusFailed
		slog.Error(fmt.Sprintf("%s: %s [%s]", t.name, err, elapsed))
	} else {
		slog.Info(fmt.Sprintf("%s: %s [%s]", t.name, outcome, elapsed))
	}

	// The same label set selects the counter and the histogram series.
	labels := prometheus.Labels{"desc": t.name, "status": outcome}
	taskExecCounter.With(labels).Inc()
	taskExecDuration.With(labels).Observe(elapsed.Seconds())
}

// NewTask returns a new Task bound to this scheduler, identified by name and
// executing fn when run.
func (t *Scheduler) NewTask(name string, fn func(context.Context) error) *Task {
	task := new(Task)
	task.scheduler = t
	task.name = name
	task.fn = fn
	return task
}

// Run is the scheduler main loop. It runs the startup tasks once, then runs
// the periodic tasks on their hourly, daily and weekly tickers.
//
// The loop has no exit condition: Run blocks forever and never returns. The
// error result is part of the signature only.
func (t *Scheduler) Run() error {
	ctx := context.Background()

	// The tickers are created before the startup tasks run, so the time spent
	// in the startup tasks does not shift the periodic schedule.
	tmrHourly := time.NewTicker(time.Hour)
	defer tmrHourly.Stop()
	tmrDaily := time.NewTicker(24 * time.Hour)
	defer tmrDaily.Stop()
	tmrWeekly := time.NewTicker(7 * 24 * time.Hour)
	defer tmrWeekly.Stop()

	refreshBActionErrors := func() {
		t.NewTask("TaskRefreshBActionErrors", t.TaskRefreshBActionErrors).Run(ctx)
	}

	// Put here what you want to debug upon scheduler startup
	refreshBActionErrors()

	for {
		select {
		case <-tmrHourly.C:
			refreshBActionErrors()
		case <-tmrDaily.C:
			// No daily task yet.
		case <-tmrWeekly.C:
			// No weekly task yet.
		}
	}
}
Loading
Loading