diff --git a/cmd.go b/cmd.go index fdc08f3..369fe44 100644 --- a/cmd.go +++ b/cmd.go @@ -14,48 +14,80 @@ var ( debug bool ) -func newCmd(args []string) *cobra.Command { - root := &cobra.Command{ - Use: filepath.Base(args[0]), - Short: "Manage the opensvc collector infrastructure components.", - PersistentPreRunE: func(cmd *cobra.Command, args []string) error { - if debug { - slog.SetLogLoggerLevel(slog.LevelDebug) - } - - logConfigDir() - if err := initConfig(); err != nil { +func cmdWorker() *cobra.Command { + var maxRunners int + cmd := &cobra.Command{ + Use: "worker", + Short: "run jobs from a list of queues", + RunE: func(cmd *cobra.Command, args []string) error { + if err := setup(); err != nil { return err } - logConfigFileUsed() - slog.Info(fmt.Sprintf("oc3 vesion: %s", version.Version())) - return nil + return work(maxRunners, args) }, } + cmd.Flags().IntVar(&maxRunners, "runners", 1, "maximun number of worker job runners") + return cmd +} - root.PersistentFlags().BoolVar(&debug, "debug", false, "set log level to debug") - - apiCmd := cobra.Command{ +func cmdAPI() *cobra.Command { + return &cobra.Command{ Use: "api", Short: "serve the collector api", RunE: func(cmd *cobra.Command, args []string) error { + if err := setup(); err != nil { + return err + } return listen() }, } +} - var maxRunners int - workerCmd := cobra.Command{ - Use: "worker", - Short: "run jobs from a list of queues", +func cmdScheduler() *cobra.Command { + return &cobra.Command{ + Use: "scheduler", + Short: "start running db maintenance tasks", RunE: func(cmd *cobra.Command, args []string) error { - return work(maxRunners, args) + return schedule() + }, + } +} + +func cmdVersion() *cobra.Command { + cmd := &cobra.Command{ + Use: "version", + Short: "display the oc3 version", + Run: func(cmd *cobra.Command, args []string) { + fmt.Println(version.Version()) }, } - workerCmd.Flags().IntVar(&maxRunners, "runners", 1, "maximun number of worker job runners") + return cmd +} - root.AddCommand( - &apiCmd, - &workerCmd, +func cmdRoot(args []string) *cobra.Command { + cmd := &cobra.Command{ + Use: filepath.Base(args[0]), + Short: "Manage the opensvc collector infrastructure components.", + } + cmd.PersistentFlags().BoolVar(&debug, "debug", false, "set log level to debug") + cmd.AddCommand( + cmdAPI(), + cmdScheduler(), + cmdVersion(), + cmdWorker(), ) - return root + return cmd +} + +func setup() error { + if debug { + slog.SetLogLoggerLevel(slog.LevelDebug) + } + logConfigDir() + if err := initConfig(); err != nil { + return err + } + logConfigFileUsed() + slog.Info(fmt.Sprintf("oc3 version: %s", version.Version())) + return nil } diff --git a/conf.go b/conf.go index e29cc1b..2329090 100644 --- a/conf.go +++ b/conf.go @@ -9,7 +9,7 @@ import ( ) var ( - configCandidateDirs = []string{"/etc/oc3/", "$HOME/.oc3/", "./"} + configCandidateDirs = []string{"/etc/oc3/", "$HOME/.config/oc3", "./"} ) func logConfigDir() { @@ -52,9 +52,9 @@ func initConfig() error { // config file viper.SetConfigName("config") viper.SetConfigType("yaml") - viper.AddConfigPath("/etc/oc3") - viper.AddConfigPath("$HOME/.oc3") - viper.AddConfigPath(".") + for _, d := range configCandidateDirs { + viper.AddConfigPath(d) + } if err := viper.ReadInConfig(); err != nil { if _, ok := err.(viper.ConfigFileNotFoundError); !ok { return err diff --git a/main.go b/main.go index 3c08b26..da79882 100644 --- a/main.go +++ b/main.go @@ -6,8 +6,8 @@ import ( // cmd parses the command line and run the selected component. func cmd(args []string) error { - root := newCmd(args) - return root.Execute() + cmd := cmdRoot(args) + return cmd.Execute() } // main is the program entrypoint. It's the only function using os.Exit, so diff --git a/pprof.go b/pprof.go new file mode 100644 index 0000000..c2a2e29 --- /dev/null +++ b/pprof.go @@ -0,0 +1,51 @@ +package main + +import ( + "fmt" + "log/slog" + "net" + "net/http" + "os" + "time" +) + +func pprofUx(p string) error { + slog.Info(fmt.Sprintf("pprof ux listener on %s", p)) + if err := os.RemoveAll(p); err != nil { + return err + } + listener, err := net.Listen("unix", p) + if err != nil { + return err + } + go func() { + server := http.Server{} + if err := server.Serve(listener); err != nil { + slog.Error(fmt.Sprintf("pprof ux listener on %s: %s", p, err)) + } + }() + return nil +} + +func pprofInet(addr string) error { + slog.Info(fmt.Sprintf("pprof inet listener on %s", addr)) + c := make(chan any) + go func() { + err := http.ListenAndServe(addr, nil) + select { + case c <- err: + default: + } + }() + select { + case i := <-c: + err, ok := i.(error) + if ok { + return err + } + case <-time.After(100 * time.Millisecond): + // don't wait for future errors + return nil + } + return nil +} diff --git a/scheduler.go b/scheduler.go new file mode 100644 index 0000000..188137f --- /dev/null +++ b/scheduler.go @@ -0,0 +1,45 @@ +package main + +import ( + "fmt" + "log/slog" + "net/http" + + "github.com/opensvc/oc3/scheduler" + "github.com/prometheus/client_golang/prometheus/promhttp" + "github.com/spf13/viper" +) + +func schedule() error { + if err := setup(); err != nil { + return err + } + db, err := newDatabase() + if err != nil { + return err + } + if viper.GetBool("scheduler.pprof.enable") { + if p := viper.GetString("scheduler.pprof.uxsocket"); p != "" { + if err := pprofUx(p); err != nil { + return err + } + } + if addr := viper.GetString("scheduler.pprof.addr"); addr != "" { + if err := pprofInet(addr); err != nil { + return err + } + } + } + if viper.GetBool("scheduler.metrics.enable") { + addr := viper.GetString("scheduler.metrics.addr") + slog.Info(fmt.Sprintf("metrics listener on http://%s/metrics", addr)) + http.Handle("/metrics", promhttp.Handler()) + go func() { + _ = http.ListenAndServe(addr, nil) + }() + } + sched := &scheduler.Scheduler{ + DB: db, + } + return sched.Run() +} diff --git a/scheduler/db_actions.go b/scheduler/db_actions.go new file mode 100644 index 0000000..1f0e82e --- /dev/null +++ b/scheduler/db_actions.go @@ -0,0 +1,41 @@ +package scheduler + +import ( + "context" + "time" +) + +func (t *Scheduler) TaskRefreshBActionErrors(ctx context.Context) error { + + ctx, cancel := context.WithTimeout(ctx, time.Minute) + defer cancel() + + tx, err := t.DB.BeginTx(ctx, nil) + if err != nil { + return err + } + defer tx.Rollback() + + _, err = tx.ExecContext(ctx, "TRUNCATE b_action_errors") + if err != nil { + return err + } + + sql := `INSERT INTO b_action_errors ( + SELECT NULL, a.svc_id, a.node_id, count(a.id) + FROM svcactions a + WHERE + a.end>date_sub(now(), interval 1 day) AND + a.status='err' AND + isnull(a.ack) AND + a.end IS NOT NULL + GROUP BY a.svc_id, a.node_id + )` + + _, err = tx.ExecContext(ctx, sql) + if err != nil { + return err + } + + return tx.Commit() +} diff --git a/scheduler/main.go b/scheduler/main.go new file mode 100644 index 0000000..0284391 --- /dev/null +++ b/scheduler/main.go @@ -0,0 +1,103 @@ +package scheduler + +import ( + "context" + "database/sql" + "fmt" + "log/slog" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +type ( + Scheduler struct { + DB *sql.DB + } + + Task struct { + scheduler *Scheduler + name string + fn func(context.Context) error + } +) + +const ( + taskExecStatusOk = "ok" + taskExecStatusFailed = "failed" +) + +var ( + taskExecCounter = promauto.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "oc3", + Subsystem: "scheduler", + Name: "task_exec_count", + Help: "Task execution counter", + }, + []string{"desc", "status"}, + ) + + taskExecDuration = promauto.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: "oc3", + Subsystem: "scheduler", + Name: "task_exec_duration_seconds", + Help: "Task execution duration in seconds.", + Buckets: prometheus.DefBuckets, + }, + []string{"desc", "status"}, + ) +) + +func (t *Task) Run(ctx context.Context) { + slog.Info(fmt.Sprintf("%s: exec", t.name)) + status := taskExecStatusOk + begin := time.Now() + err := t.fn(ctx) + duration := time.Now().Sub(begin) + if err != nil { + status = taskExecStatusFailed + slog.Error(fmt.Sprintf("%s: %s [%s]", t.name, err, duration)) + } else { + slog.Info(fmt.Sprintf("%s: %s [%s]", t.name, status, duration)) + } + taskExecCounter.With(prometheus.Labels{"desc": t.name, "status": status}).Inc() + taskExecDuration.With(prometheus.Labels{"desc": t.name, "status": status}).Observe(duration.Seconds()) +} + +func (t *Scheduler) NewTask(name string, fn func(context.Context) error) *Task { + return &Task{ + scheduler: t, + name: name, + fn: fn, + } +} + +func (t *Scheduler) Run() error { + ctx := context.Background() + + tmrHourly := time.NewTicker(time.Hour) + defer tmrHourly.Stop() + tmrDaily := time.NewTicker(24 * time.Hour) + defer tmrDaily.Stop() + tmrWeekly := time.NewTicker(7 * 24 * time.Hour) + defer tmrWeekly.Stop() + + tmrNow := time.NewTimer(0) + + for { + select { + case <-tmrNow.C: + // Put here what you want to debug upon scheduler startup + t.NewTask("TaskRefreshBActionErrors", t.TaskRefreshBActionErrors).Run(ctx) + tmrNow.Stop() + case <-tmrHourly.C: + t.NewTask("TaskRefreshBActionErrors", t.TaskRefreshBActionErrors).Run(ctx) + case <-tmrDaily.C: + case <-tmrWeekly.C: + } + } + return nil +} diff --git a/work.go b/work.go index 4daaed0..da95886 100644 --- a/work.go +++ b/work.go @@ -3,11 +3,8 @@ package main import ( "fmt" "log/slog" - "net" "net/http" _ "net/http/pprof" - "os" - "time" "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/spf13/viper" @@ -37,19 +34,19 @@ func work(runners int, queues []string) error { } if viper.GetBool("worker.pprof.enable") { if p := viper.GetString("worker.pprof.uxsocket"); p != "" { - if err := workerUxPprof(p); err != nil { + if err := pprofUx(p); err != nil { return err } } if addr := viper.GetString("worker.pprof.addr"); addr != "" { - if err := workerHttpPprof(addr); err != nil { + if err := pprofInet(addr); err != nil { return err } } } if viper.GetBool("worker.metrics.enable") { addr := viper.GetString("worker.metrics.addr") - slog.Info(fmt.Sprintf("metrics listener on %s /metrics", addr)) + slog.Info(fmt.Sprintf("metrics listener on http://%s/metrics", addr)) http.Handle("/metrics", promhttp.Handler()) go func() { _ = http.ListenAndServe(addr, nil) @@ -57,45 +54,3 @@ func work(runners int, queues []string) error { } return w.Run() } - -func workerUxPprof(p string) error { - slog.Info(fmt.Sprintf("pprof listener on %s", p)) - if err := os.RemoveAll(p); err != nil { - return err - } - listener, err := net.Listen("unix", p) - if err != nil { - return err - } - go func() { - server := http.Server{} - if err := server.Serve(listener); err != nil { - slog.Error(fmt.Sprintf("worker ux listener: %s", err)) - } - }() - return nil -} - -func workerHttpPprof(addr string) error { - slog.Info(fmt.Sprintf("pprof listener on %s", addr)) - c := make(chan any) - go func() { - err := http.ListenAndServe(addr, nil) - slog.Info(fmt.Sprintf("pprof listener on %s", addr)) - select { - case c <- err: - default: - } - }() - select { - case i := <-c: - err, ok := i.(error) - if ok { - return err - } - case <-time.After(100 * time.Millisecond): - // don't wait for future errors - return nil - } - return nil -}