From 84487891b620409df36c555fe65db9290275d099 Mon Sep 17 00:00:00 2001 From: Christophe Varoqui Date: Mon, 1 Dec 2025 10:05:32 +0100 Subject: [PATCH 1/6] Reorganize the cobra commands init to scale better --- cmd.go | 57 ++++++++++++++++++++++++++++++--------------------------- main.go | 4 ++-- 2 files changed, 32 insertions(+), 29 deletions(-) diff --git a/cmd.go b/cmd.go index fdc08f3..beb92fa 100644 --- a/cmd.go +++ b/cmd.go @@ -14,8 +14,31 @@ var ( debug bool ) -func newCmd(args []string) *cobra.Command { - root := &cobra.Command{ +func cmdWorker() *cobra.Command { + var maxRunners int + cmd := &cobra.Command{ + Use: "worker", + Short: "run jobs from a list of queues", + RunE: func(cmd *cobra.Command, args []string) error { + return work(maxRunners, args) + }, + } + cmd.Flags().IntVar(&maxRunners, "runners", 1, "maximun number of worker job runners") + return cmd +} + +func cmdAPI() *cobra.Command { + return &cobra.Command{ + Use: "api", + Short: "serve the collector api", + RunE: func(cmd *cobra.Command, args []string) error { + return listen() + }, + } +} + +func cmdRoot(args []string) *cobra.Command { + cmd := &cobra.Command{ Use: filepath.Base(args[0]), Short: "Manage the opensvc collector infrastructure components.", PersistentPreRunE: func(cmd *cobra.Command, args []string) error { @@ -32,30 +55,10 @@ func newCmd(args []string) *cobra.Command { return nil }, } - - root.PersistentFlags().BoolVar(&debug, "debug", false, "set log level to debug") - - apiCmd := cobra.Command{ - Use: "api", - Short: "serve the collector api", - RunE: func(cmd *cobra.Command, args []string) error { - return listen() - }, - } - - var maxRunners int - workerCmd := cobra.Command{ - Use: "worker", - Short: "run jobs from a list of queues", - RunE: func(cmd *cobra.Command, args []string) error { - return work(maxRunners, args) - }, - } - workerCmd.Flags().IntVar(&maxRunners, "runners", 1, "maximun number of worker job runners") - - root.AddCommand( - &apiCmd, - &workerCmd, + cmd.PersistentFlags().BoolVar(&debug, "debug", false, "set log level to debug") + cmd.AddCommand( + cmdAPI(), + cmdWorker(), ) - return root + return cmd } diff --git a/main.go b/main.go index 3c08b26..da79882 100644 --- a/main.go +++ b/main.go @@ -6,8 +6,8 @@ import ( // cmd parses the command line and run the selected component. func cmd(args []string) error { - root := newCmd(args) - return root.Execute() + cmd := cmdRoot(args) + return cmd.Execute() } // main is the program entrypoint. It's the only function using os.Exit, so From 52490d5abc167f191347987d26895720d42809aa Mon Sep 17 00:00:00 2001 From: Christophe Varoqui Date: Mon, 1 Dec 2025 10:34:06 +0100 Subject: [PATCH 2/6] Add the "oc3 version" command --- cmd.go | 59 +++++++++++++++++++++++++++++++++++++++++++++------------- 1 file changed, 46 insertions(+), 13 deletions(-) diff --git a/cmd.go b/cmd.go index beb92fa..af20c2f 100644 --- a/cmd.go +++ b/cmd.go @@ -7,6 +7,7 @@ import ( "github.com/spf13/cobra" + "github.com/opensvc/oc3/scheduler" "github.com/opensvc/oc3/util/version" ) @@ -20,6 +21,9 @@ func cmdWorker() *cobra.Command { Use: "worker", Short: "run jobs from a list of queues", RunE: func(cmd *cobra.Command, args []string) error { + if err := setup(); err != nil { + return err + } return work(maxRunners, args) }, } @@ -32,32 +36,61 @@ func cmdAPI() *cobra.Command { Use: "api", Short: "serve the collector api", RunE: func(cmd *cobra.Command, args []string) error { + if err := setup(); err != nil { + return err + } return listen() }, } } +func cmdScheduler() *cobra.Command { + return &cobra.Command{ + Use: "scheduler", + Short: "start running db maintenance tasks", + RunE: func(cmd *cobra.Command, args []string) error { + if err := setup(); err != nil { + return err + } + return scheduler.Run() + }, + } +} + +func cmdVersion() *cobra.Command { + cmd := &cobra.Command{ + Use: "version", + Short: "display the oc3 version", + Run: func(cmd *cobra.Command, args []string) { + fmt.Println(version.Version()) + }, + } + return cmd +} + +func setup() error { + if debug { + slog.SetLogLoggerLevel(slog.LevelDebug) + } + logConfigDir() + if err := initConfig(); err != nil { + return err + } + logConfigFileUsed() + slog.Info(fmt.Sprintf("oc3 version: %s", version.Version())) + return nil +} + func cmdRoot(args []string) *cobra.Command { cmd := &cobra.Command{ Use: filepath.Base(args[0]), Short: "Manage the opensvc collector infrastructure components.", - PersistentPreRunE: func(cmd *cobra.Command, args []string) error { - if debug { - slog.SetLogLoggerLevel(slog.LevelDebug) - } - - logConfigDir() - if err := initConfig(); err != nil { - return err - } - logConfigFileUsed() - slog.Info(fmt.Sprintf("oc3 vesion: %s", version.Version())) - return nil - }, } cmd.PersistentFlags().BoolVar(&debug, "debug", false, "set log level to debug") cmd.AddCommand( cmdAPI(), + cmdScheduler(), + cmdVersion(), cmdWorker(), ) return cmd From 49efaf31f05bdbafcefdb5a560d7471f1fd827db Mon Sep 17 00:00:00 2001 From: Christophe Varoqui Date: Mon, 1 Dec 2025 10:35:36 +0100 Subject: [PATCH 3/6] Bootstrap the scheduler pkg --- scheduler/main.go | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) create mode 100644 scheduler/main.go diff --git a/scheduler/main.go b/scheduler/main.go new file mode 100644 index 0000000..ca3e9e4 --- /dev/null +++ b/scheduler/main.go @@ -0,0 +1,25 @@ +package scheduler + +import ( + "fmt" + "time" +) + +func Run() error { + tmrHourly := time.NewTicker(time.Hour) + defer tmrHourly.Stop() + tmrDaily := time.NewTicker(24 * time.Hour) + defer tmrDaily.Stop() + tmrWeekly := time.NewTicker(7 * 24 * time.Hour) + defer tmrWeekly.Stop() + + for { + select { + case <-tmrHourly.C: + fmt.Println("xx", time.Now()) + case <-tmrDaily.C: + case <-tmrWeekly.C: + } + } + return nil +} From c5ebfe8555054a0cd9ec80131cbf0187bd7f04d8 Mon Sep 17 00:00:00 2001 From: Christophe Varoqui Date: Mon, 1 Dec 2025 12:10:23 +0100 Subject: [PATCH 4/6] Port a first task --- cmd.go | 35 +++++++++++------- scheduler/db_actions.go | 41 +++++++++++++++++++++ scheduler/main.go | 82 ++++++++++++++++++++++++++++++++++++++++- 3 files changed, 142 insertions(+), 16 deletions(-) create mode 100644 scheduler/db_actions.go diff --git a/cmd.go b/cmd.go index af20c2f..738ae58 100644 --- a/cmd.go +++ b/cmd.go @@ -52,7 +52,14 @@ func cmdScheduler() *cobra.Command { if err := setup(); err != nil { return err } - return scheduler.Run() + db, err := newDatabase() + if err != nil { + return err + } + sched := &scheduler.Scheduler{ + DB: db, + } + return sched.Run() }, } } @@ -68,19 +75,6 @@ func cmdVersion() *cobra.Command { return cmd } -func setup() error { - if debug { - slog.SetLogLoggerLevel(slog.LevelDebug) - } - logConfigDir() - if err := initConfig(); err != nil { - return err - } - logConfigFileUsed() - slog.Info(fmt.Sprintf("oc3 version: %s", version.Version())) - return nil -} - func cmdRoot(args []string) *cobra.Command { cmd := &cobra.Command{ Use: filepath.Base(args[0]), @@ -95,3 +89,16 @@ func cmdRoot(args []string) *cobra.Command { ) return cmd } + +func setup() error { + if debug { + slog.SetLogLoggerLevel(slog.LevelDebug) + } + logConfigDir() + if err := initConfig(); err != nil { + return err + } + logConfigFileUsed() + slog.Info(fmt.Sprintf("oc3 version: %s", version.Version())) + return nil +} diff --git a/scheduler/db_actions.go b/scheduler/db_actions.go new file mode 100644 index 0000000..1f0e82e --- /dev/null +++ b/scheduler/db_actions.go @@ -0,0 +1,41 @@ +package scheduler + +import ( + "context" + "time" +) + +func (t *Scheduler) TaskRefreshBActionErrors(ctx context.Context) error { + + ctx, cancel := context.WithTimeout(ctx, time.Minute) + defer cancel() + + tx, err := t.DB.BeginTx(ctx, nil) + if err != nil { + return err + } + defer tx.Rollback() + + _, err = tx.ExecContext(ctx, "TRUNCATE b_action_errors") + if err != nil { + return err + } + + sql := `INSERT INTO b_action_errors ( + SELECT NULL, a.svc_id, a.node_id, count(a.id) + FROM svcactions a + WHERE + a.end>date_sub(now(), interval 1 day) AND + a.status='err' AND + isnull(a.ack) AND + a.end IS NOT NULL + GROUP BY a.svc_id, a.node_id + )` + + _, err = tx.ExecContext(ctx, sql) + if err != nil { + return err + } + + return tx.Commit() +} diff --git a/scheduler/main.go b/scheduler/main.go index ca3e9e4..0284391 100644 --- a/scheduler/main.go +++ b/scheduler/main.go @@ -1,11 +1,83 @@ package scheduler import ( + "context" + "database/sql" "fmt" + "log/slog" "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +type ( + Scheduler struct { + DB *sql.DB + } + + Task struct { + scheduler *Scheduler + name string + fn func(context.Context) error + } +) + +const ( + taskExecStatusOk = "ok" + taskExecStatusFailed = "failed" ) -func Run() error { +var ( + taskExecCounter = promauto.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "oc3", + Subsystem: "scheduler", + Name: "task_exec_count", + Help: "Task execution counter", + }, + []string{"desc", "status"}, + ) + + taskExecDuration = promauto.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: "oc3", + Subsystem: "scheduler", + Name: "task_exec_duration_seconds", + Help: "Task execution duration in seconds.", + Buckets: prometheus.DefBuckets, + }, + []string{"desc", "status"}, + ) +) + +func (t *Task) Run(ctx context.Context) { + slog.Info(fmt.Sprintf("%s: exec", t.name)) + status := taskExecStatusOk + begin := time.Now() + err := t.fn(ctx) + duration := time.Now().Sub(begin) + if err != nil { + status = taskExecStatusFailed + slog.Error(fmt.Sprintf("%s: %s [%s]", t.name, err, duration)) + } else { + slog.Info(fmt.Sprintf("%s: %s [%s]", t.name, status, duration)) + } + taskExecCounter.With(prometheus.Labels{"desc": t.name, "status": status}).Inc() + taskExecDuration.With(prometheus.Labels{"desc": t.name, "status": status}).Observe(duration.Seconds()) +} + +func (t *Scheduler) NewTask(name string, fn func(context.Context) error) *Task { + return &Task{ + scheduler: t, + name: name, + fn: fn, + } +} + +func (t *Scheduler) Run() error { + ctx := context.Background() + tmrHourly := time.NewTicker(time.Hour) defer tmrHourly.Stop() tmrDaily := time.NewTicker(24 * time.Hour) @@ -13,10 +85,16 @@ func Run() error { tmrWeekly := time.NewTicker(7 * 24 * time.Hour) defer tmrWeekly.Stop() + tmrNow := time.NewTimer(0) + for { select { + case <-tmrNow.C: + // Put here what you want to debug upon scheduler startup + t.NewTask("TaskRefreshBActionErrors", t.TaskRefreshBActionErrors).Run(ctx) + tmrNow.Stop() case <-tmrHourly.C: - fmt.Println("xx", time.Now()) + t.NewTask("TaskRefreshBActionErrors", t.TaskRefreshBActionErrors).Run(ctx) case <-tmrDaily.C: case <-tmrWeekly.C: } From f2d5f5c30bbab8d9e31215628c7aab24978dae91 Mon Sep 17 00:00:00 2001 From: Christophe Varoqui Date: Mon, 1 Dec 2025 12:35:50 +0100 Subject: [PATCH 5/6] Add prometheus and pprof to the scheduler --- cmd.go | 13 +------------ pprof.go | 51 +++++++++++++++++++++++++++++++++++++++++++++++++++ scheduler.go | 45 +++++++++++++++++++++++++++++++++++++++++++++ work.go | 51 +++------------------------------------------------ 4 files changed, 100 insertions(+), 60 deletions(-) create mode 100644 pprof.go create mode 100644 scheduler.go diff --git a/cmd.go b/cmd.go index 738ae58..369fe44 100644 --- a/cmd.go +++ b/cmd.go @@ -7,7 +7,6 @@ import ( "github.com/spf13/cobra" - "github.com/opensvc/oc3/scheduler" "github.com/opensvc/oc3/util/version" ) @@ -49,17 +48,7 @@ func cmdScheduler() *cobra.Command { Use: "scheduler", Short: "start running db maintenance tasks", RunE: func(cmd *cobra.Command, args []string) error { - if err := setup(); err != nil { - return err - } - db, err := newDatabase() - if err != nil { - return err - } - sched := &scheduler.Scheduler{ - DB: db, - } - return sched.Run() + return schedule() }, } } diff --git a/pprof.go b/pprof.go new file mode 100644 index 0000000..c2a2e29 --- /dev/null +++ b/pprof.go @@ -0,0 +1,51 @@ +package main + +import ( + "fmt" + "log/slog" + "net" + "net/http" + "os" + "time" +) + +func pprofUx(p string) error { + slog.Info(fmt.Sprintf("pprof ux listener on %s", p)) + if err := os.RemoveAll(p); err != nil { + return err + } + listener, err := net.Listen("unix", p) + if err != nil { + return err + } + go func() { + server := http.Server{} + if err := server.Serve(listener); err != nil { + slog.Error(fmt.Sprintf("pprof ux listener on %s: %s", p, err)) + } + }() + return nil +} + +func pprofInet(addr string) error { + slog.Info(fmt.Sprintf("pprof inet listener on %s", addr)) + c := make(chan any) + go func() { + err := http.ListenAndServe(addr, nil) + select { + case c <- err: + default: + } + }() + select { + case i := <-c: + err, ok := i.(error) + if ok { + return err + } + case <-time.After(100 * time.Millisecond): + // don't wait for future errors + return nil + } + return nil +} diff --git a/scheduler.go b/scheduler.go new file mode 100644 index 0000000..188137f --- /dev/null +++ b/scheduler.go @@ -0,0 +1,45 @@ +package main + +import ( + "fmt" + "log/slog" + "net/http" + + "github.com/opensvc/oc3/scheduler" + "github.com/prometheus/client_golang/prometheus/promhttp" + "github.com/spf13/viper" +) + +func schedule() error { + if err := setup(); err != nil { + return err + } + db, err := newDatabase() + if err != nil { + return err + } + if viper.GetBool("scheduler.pprof.enable") { + if p := viper.GetString("scheduler.pprof.uxsocket"); p != "" { + if err := pprofUx(p); err != nil { + return err + } + } + if addr := viper.GetString("scheduler.pprof.addr"); addr != "" { + if err := pprofInet(addr); err != nil { + return err + } + } + } + if viper.GetBool("scheduler.metrics.enable") { + addr := viper.GetString("scheduler.metrics.addr") + slog.Info(fmt.Sprintf("metrics listener on http://%s/metrics", addr)) + http.Handle("/metrics", promhttp.Handler()) + go func() { + _ = http.ListenAndServe(addr, nil) + }() + } + sched := &scheduler.Scheduler{ + DB: db, + } + return sched.Run() +} diff --git a/work.go b/work.go index 4daaed0..da95886 100644 --- a/work.go +++ b/work.go @@ -3,11 +3,8 @@ package main import ( "fmt" "log/slog" - "net" "net/http" _ "net/http/pprof" - "os" - "time" "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/spf13/viper" @@ -37,19 +34,19 @@ func work(runners int, queues []string) error { } if viper.GetBool("worker.pprof.enable") { if p := viper.GetString("worker.pprof.uxsocket"); p != "" { - if err := workerUxPprof(p); err != nil { + if err := pprofUx(p); err != nil { return err } } if addr := viper.GetString("worker.pprof.addr"); addr != "" { - if err := workerHttpPprof(addr); err != nil { + if err := pprofInet(addr); err != nil { return err } } } if viper.GetBool("worker.metrics.enable") { addr := viper.GetString("worker.metrics.addr") - slog.Info(fmt.Sprintf("metrics listener on %s /metrics", addr)) + slog.Info(fmt.Sprintf("metrics listener on http://%s/metrics", addr)) http.Handle("/metrics", promhttp.Handler()) go func() { _ = http.ListenAndServe(addr, nil) @@ -57,45 +54,3 @@ func work(runners int, queues []string) error { } return w.Run() } - -func workerUxPprof(p string) error { - slog.Info(fmt.Sprintf("pprof listener on %s", p)) - if err := os.RemoveAll(p); err != nil { - return err - } - listener, err := net.Listen("unix", p) - if err != nil { - return err - } - go func() { - server := http.Server{} - if err := server.Serve(listener); err != nil { - slog.Error(fmt.Sprintf("worker ux listener: %s", err)) - } - }() - return nil -} - -func workerHttpPprof(addr string) error { - slog.Info(fmt.Sprintf("pprof listener on %s", addr)) - c := make(chan any) - go func() { - err := http.ListenAndServe(addr, nil) - slog.Info(fmt.Sprintf("pprof listener on %s", addr)) - select { - case c <- err: - default: - } - }() - select { - case i := <-c: - err, ok := i.(error) - if ok { - return err - } - case <-time.After(100 * time.Millisecond): - // don't wait for future errors - return nil - } - return nil -} From 5ba19e456ea217b7ddd0b6c473c1859ee4a400be Mon Sep 17 00:00:00 2001 From: Christophe Varoqui Date: Mon, 1 Dec 2025 12:40:08 +0100 Subject: [PATCH 6/6] Move the user config file From ~/.oc3/ to the standard ~/.config/oc3/ --- conf.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/conf.go b/conf.go index e29cc1b..2329090 100644 --- a/conf.go +++ b/conf.go @@ -9,7 +9,7 @@ import ( ) var ( - configCandidateDirs = []string{"/etc/oc3/", "$HOME/.oc3/", "./"} + configCandidateDirs = []string{"/etc/oc3/", "$HOME/.config/oc3", "./"} ) func logConfigDir() { @@ -52,9 +52,9 @@ func initConfig() error { // config file viper.SetConfigName("config") viper.SetConfigType("yaml") - viper.AddConfigPath("/etc/oc3") - viper.AddConfigPath("$HOME/.oc3") - viper.AddConfigPath(".") + for _, d := range configCandidateDirs { + viper.AddConfigPath(d) + } if err := viper.ReadInConfig(); err != nil { if _, ok := err.(viper.ConfigFileNotFoundError); !ok { return err