diff --git a/config/config.go b/config/config.go index 1c9e874..7e0b0e9 100644 --- a/config/config.go +++ b/config/config.go @@ -1,28 +1,73 @@ package config import ( + "context" + "sync" + "github.com/Conflux-Chain/go-conflux-util/alert" "github.com/Conflux-Chain/go-conflux-util/log" "github.com/Conflux-Chain/go-conflux-util/metrics" "github.com/Conflux-Chain/go-conflux-util/viper" ) -// MustInit inits settings especially by loading configs from file or env var -// to viper etc., to prepare using any utility. +// MustInit performs the necessary initializations for the application, particularly +// by loading configuration settings from files or environment variables into `Viper`, +// setting up metrics, alerts, and logging systems. This function is designed to be +// used at the application's startup phase. +// +// Important: The order in which initializations are performed is critical due to +// dependencies between components. // -// Note that it will panic and exit if any error happens. +// Parameters: +// - viperEnvPrefix : The prefix used for environment variables that `Viper` should consider +// while initializing configurations. +// +// Panics: +// - If any part of the initialization fails, this function will panic, causing the application +// to terminate abruptly. func MustInit(viperEnvPrefix string) { - // NOTE, INITIALIZATION ORDER IS IMPORTANT! + // Delegates to the shared initialization logic with no context for graceful shutdown. + mustInit(viperEnvPrefix, nil, nil) +} - // init viper from config file or env var +// MustInitWithCtx carries out the same initializations as `MustInit` except for support for +// graceful shutdown by accepting a context and a wait group. +// +// Parameters: +// - ctx: The context for graceful shutdown handling. +// - wg: The wait group to track goroutines for shutdown synchronization. +// - viperEnvPrefix : The prefix used for environment variables that `Viper` should consider +// while initializing configurations. +func MustInitWithCtx(ctx context.Context, wg *sync.WaitGroup, viperEnvPrefix string) { + mustInit(viperEnvPrefix, ctx, wg) +} + +// mustInit is the internal function responsible for the core initialization steps. +// It consolidates the setup of `Viper`, metrics, alerts, and logging, adapting the logging +// setup based on whether graceful shutdown context and wait group are provided. +// +// Important: The order in which initializations are performed is critical due to +// dependencies between components. +func mustInit(viperEnvPrefix string, ctx context.Context, wg *sync.WaitGroup) { + // Initialize `Viper` to read configurations from a file or environment variables. + // The provided prefix is used to match and bind environment variables to config keys. viper.MustInit(viperEnvPrefix) - // init metrics from viper + // Initialize metrics collection based on the configurations loaded into `Viper`. + // Metrics are typically used for monitoring application performance. metrics.MustInitFromViper() - // init alert from viper + // Initialize alerting systems using configurations from `Viper`. + // Alerts are crucial for notifying about application errors or important events. alert.MustInitFromViper() - // init logging from viper which depends on alert initialization - log.MustInitFromViper() + // Initialize the logging system with configurations fetched via Viper. + // Logging setup depends on alert initialization since it might use alerting channels. + // Additionally, logging setup accepts a context and wait group to ensure logs are handled + // properly during shutdown. + if ctx != nil && wg != nil { + log.MustInitWithCtxFromViper(ctx, wg) + } else { + log.MustInitFromViper() + } } diff --git a/config/config.yaml b/config/config.yaml index 46da75f..d935376 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -4,10 +4,18 @@ # forceColor: false # disableColor: false # alertHook: # Alert hooking settings -# # Hooked logrus levels for alert notification -# levels: [warn,error,fatal] +# # Hooked logrus level for alert notification +# level: warn # # Default notification channels # channels: [] +# # Async worker options for sending alert +# async: +# # The number of worker goroutines (set `0` to turn off async mode). +# numWorkers: 1 +# # The maximum number of queued jobs. +# queueSize: 60 +# # Maximum Timeout allowed to gracefully stop.. +# StopTimeout: 5s # Alert Configurations # alert: diff --git a/log/hook/async.go b/log/hook/async.go new file mode 100644 index 0000000..727c1d4 --- /dev/null +++ b/log/hook/async.go @@ -0,0 +1,205 @@ +package hook + +import ( + "context" + "errors" + "fmt" + "os" + "sync" + "sync/atomic" + "time" + + "github.com/Conflux-Chain/go-conflux-util/health" + "github.com/sirupsen/logrus" +) + +// ErrAsyncQueueFull is the error returned when the async queue is full. +var ErrAsyncQueueFull = errors.New("async hook queue is full") + +// AsyncOption defines configuration options for the AsyncHook. +type AsyncOption struct { + NumWorkers int `default:"1"` // Number of worker goroutines. + QueueSize int `default:"60"` // Maximum number of queued jobs. + StopTimeout time.Duration `default:"5s"` // Timeout before forced exit of async processing. +} + +// AsyncHook is a logrus hook that processes log entries asynchronously. +type AsyncHook struct { + logrus.Hook // Embedded logrus hook. + AsyncOption // Embedded options. + mu sync.Mutex // Synchronizes access to `healthStatus`. + started atomic.Bool // Atomic flag indicating hook startup state. + healthStatus health.Counter // Tracks queue health. + HealthConfig health.CounterConfig // Configuration for health tracking. + + jobQueue chan *logrus.Entry // Buffered channel for enqueuing log entries. +} + +// NewAsyncHookWithCtx initializes and starts a new AsyncHook instance that integrates with +// graceful shutdown handling. +// It's designed to work harmoniously with the application's shutdown process to ensure +// no logs are lost during shutdown. +func NewAsyncHookWithCtx( + ctx context.Context, wg *sync.WaitGroup, hook logrus.Hook, opts AsyncOption) *AsyncHook { + h := newAsyncHook(hook, opts) + h.startWithCtx(ctx, wg) + + return h +} + +// NewAsyncHook initializes and starts a standard AsyncHook without graceful shutdown handling. +// Use this when you don't require integration with a graceful shutdown mechanism. +func NewAsyncHook(hook logrus.Hook, opts AsyncOption) *AsyncHook { + h := newAsyncHook(hook, opts) + h.start() + + return h +} + +// newAsyncHook is a private constructor that sets up the necessary components for an AsyncHook. +// It should not be used directly; instead, use `NewAsyncHook` or `NewAsyncHookWithCtx`. +func newAsyncHook(hook logrus.Hook, opts AsyncOption) *AsyncHook { + return &AsyncHook{ + AsyncOption: opts, + Hook: hook, + jobQueue: make(chan *logrus.Entry, opts.QueueSize), + HealthConfig: health.CounterConfig{Remind: uint64(opts.QueueSize)}, + } +} + +// Fire implements the logrus.Hook interface, which enqueues log entries for async processing or +// handles them synchronously if necessary. +func (h *AsyncHook) Fire(entry *logrus.Entry) error { + // Synchronously fire the hook for fatal levels or if the hook is not started. + if entry.Level <= logrus.FatalLevel || !h.started.Load() { + return h.Hook.Fire(entry) + } + + select { + case h.jobQueue <- entry: // Attempt to enqueue the log entry. + h.onFiredSuccess() + return nil + default: // if the queue is full, return an error. + h.onFiredFailure(ErrAsyncQueueFull, entry) + return ErrAsyncQueueFull + } +} + +// startWithCtx initiates the hook's workers and sets up a mechanism to gracefully +// drain the job queue upon receiving a shutdown signal through the provided context. +func (h *AsyncHook) startWithCtx(ctx context.Context, wg *sync.WaitGroup) { + defer h.started.Store(true) + + wg.Add(1) + go func() { + defer wg.Done() + + var awg sync.WaitGroup + for i := 0; i < h.NumWorkers; i++ { + awg.Add(1) + go func() { + defer awg.Done() + h.worker(ctx) + }() + } + + // Waits for all workers before attempting to drain the job queue. + awg.Wait() + + // Drain remaining jobs in the queue to ensure no logs are lost during shutdown. + h.drainJobQueue() + }() +} + +// start launches the hook's worker goroutines without lifecycle managment. +func (h *AsyncHook) start() { + defer h.started.Store(true) // Mark the hook as started. + + for i := 0; i < h.NumWorkers; i++ { + go h.worker(context.Background()) + } +} + +// worker is the main loop for each worker goroutine, processing log entries from the job queue. +func (h *AsyncHook) worker(ctx context.Context) { + for { + select { + case entry := <-h.jobQueue: + h.fire(entry) + case <-ctx.Done(): + h.started.Store(false) // Mark the hook as stopped. + return + } + } +} + +// fire triggers the underlying logrus hook and handles any potential errors by outputting to stderr. +func (h *AsyncHook) fire(entry *logrus.Entry) { + if err := h.Hook.Fire(entry); err != nil { + h.outputStderr(err, entry) + } +} + +// onFiredSuccess signals successful enqueue and monitors queue health recovery. +func (h *AsyncHook) onFiredSuccess() { + h.mu.Lock() + defer h.mu.Unlock() + + recovered, failures := h.healthStatus.OnSuccess(h.HealthConfig) + if recovered { + h.notify("Async hook queue congestion recovered", logrus.Fields{"failures": failures}) + } +} + +// onFiredFailure handles failed enqueues and monitors queue congestion. +func (h *AsyncHook) onFiredFailure(err error, entry *logrus.Entry) { + h.mu.Lock() + defer h.mu.Unlock() + + unhealthy, uncovered, failures := h.healthStatus.OnFailure(h.HealthConfig) + if unhealthy { + h.notify("Async hook queue is congested", logrus.Fields{"failures": failures}) + } else if uncovered { + h.notify("Async hook queue congestion not recovered", logrus.Fields{"failures": failures}) + } + + h.outputStderr(err, entry) +} + +// drainJobQueue processes any remaining log entries in the job queue upon shutdown. +func (h *AsyncHook) drainJobQueue() { + ctx, cancel := context.WithTimeout(context.Background(), h.StopTimeout) + defer cancel() + + for { + select { + case entry := <-h.jobQueue: + h.fire(entry) + case <-ctx.Done(): + if len(h.jobQueue) > 0 { + h.notify("Async hook exiting with jobs remaining", logrus.Fields{"numJobs": len(h.jobQueue)}) + } + return + default: + return + } + } +} + +// notify sends a message through the hook as a warning. +func (h *AsyncHook) notify(msg string, fields logrus.Fields) { + h.fire(&logrus.Entry{ + Time: time.Now(), + Level: logrus.WarnLevel, + Message: msg, + Data: fields, + }) +} + +// outputStderr writes the error and the log entry to the standard error output. +func (h *AsyncHook) outputStderr(err error, entry *logrus.Entry) { + formatter := logrus.TextFormatter{} + entryStr, _ := formatter.Format(entry) + + fmt.Fprintf(os.Stderr, "Failed to fire async hook with error: %v for logrus entry: %v\n", err, entryStr) +} diff --git a/log/hook/hook.go b/log/hook/hook.go index 37bbe75..079829d 100644 --- a/log/hook/hook.go +++ b/log/hook/hook.go @@ -1,7 +1,9 @@ package hook import ( + "context" stderr "errors" + "sync" "github.com/Conflux-Chain/go-conflux-util/alert" "github.com/pkg/errors" @@ -16,10 +18,30 @@ const ( alertMsgTitle = "logrus alert notification" ) -// AddAlertHook adds logrus hook for alert notification with specified log levels. -func AddAlertHook(hookLevels []logrus.Level, chns []string) error { +type Config struct { + // Level is the minimum logrus level at which alerts will be triggered. + Level string `default:"warn"` + + // Channels lists the default alert notification channels to use. + Channels []string + + // Async configures the behavior of the asynchronous worker for handling log alerts. + Async AsyncOption +} + +// AddAlertHook attaches a custom logrus Hook for generating alert notifications +// based on configured levels and channels. +// It supports both synchronous and asynchronous operation modes, with optional +// graceful shutdown integration. +func AddAlertHook(ctx context.Context, wg *sync.WaitGroup, conf Config) error { + if len(conf.Channels) == 0 { + // No channels configured, so no hook needs to be added. + return nil + } + + // Retrieve and validate configured alert channels. var chs []alert.Channel - for _, chn := range chns { + for _, chn := range conf.Channels { ch, ok := alert.DefaultManager().Channel(chn) if !ok { return alert.ErrChannelNotFound(chn) @@ -27,10 +49,42 @@ func AddAlertHook(hookLevels []logrus.Level, chns []string) error { chs = append(chs, ch) } - logrus.AddHook(NewAlertHook(hookLevels, chs)) + // Parse the configured log level for alert triggering. + lvl, err := logrus.ParseLevel(conf.Level) + if err != nil { + return errors.WithMessage(err, "failed to parse log level") + } + + var hookLvls []logrus.Level + for l := logrus.PanicLevel; l <= lvl; l++ { + hookLvls = append(hookLvls, l) + } + + // Instantiate the base AlertHook. + var alertHook logrus.Hook = NewAlertHook(hookLvls, chs) + + // Wrap with asynchronous processing if configured. + if conf.Async.NumWorkers > 0 { + alertHook = wrapAsyncHook(ctx, wg, alertHook, conf.Async) + } + + // Finally, add the hook to Logrus. + logrus.AddHook(alertHook) + return nil } +// wrapAsyncHook wraps the given hook with asynchronous processing, optionally integrating +// graceful shutdown support if a context and wait group are provided. +func wrapAsyncHook( + ctx context.Context, wg *sync.WaitGroup, hook logrus.Hook, opt AsyncOption) *AsyncHook { + if ctx != nil && wg != nil { + return NewAsyncHookWithCtx(ctx, wg, hook, opt) + } + + return NewAsyncHook(hook, opt) +} + // AlertHook logrus hooks to send specified level logs as text message for alerting. type AlertHook struct { levels []logrus.Level diff --git a/log/log.go b/log/log.go index 0f20ef4..ab40786 100644 --- a/log/log.go +++ b/log/log.go @@ -1,37 +1,35 @@ package log import ( + "context" + "errors" "fmt" "strings" + "sync" "github.com/Conflux-Chain/go-conflux-util/log/hook" viperUtil "github.com/Conflux-Chain/go-conflux-util/viper" "github.com/ethereum/go-ethereum/log" - "github.com/pkg/errors" "github.com/sirupsen/logrus" "github.com/spf13/cobra" "github.com/spf13/viper" ) -type AlertHookConfig struct { - // logrus level hooked for alert notification - Levels []string `default:"[warn,error,fatal]"` - // default alert channels - Channels []string -} - // LoggingConfig logging configuration such as log level etc., type LoggingConfig struct { - Level string `default:"info"` // logging level - ForceColor bool // helpful on windows - DisableColor bool // helpful to output logs in file - AlertHook AlertHookConfig // alert hooking configurations + Level string `default:"info"` // logging level + ForceColor bool // helpful on windows + DisableColor bool // helpful to output logs in file + AlertHook hook.Config // alert hooking configurations } -// MustInitFromViper inits logging from viper settings and adapts Geth logger. +// MustInitFromViper initializes the logging system using configurations from viper. // -// Note that viper must be initilized before this, and it will panic -// and exit if any error happens. +// Precondition: +// - Viper must be initialized with appropriate configurations before calling this function. +// +// Panics: +// - This function will panic if it encounters any errors during initialization. func MustInitFromViper() { var conf LoggingConfig viperUtil.MustUnmarshalKey("log", &conf) @@ -39,45 +37,68 @@ func MustInitFromViper() { MustInit(conf) } -// Init inits logging with specified log level +// MustInitWithCtxFromViper performs the similar initializations as `MustInitFromViper` with +// support for graceful shutdown by accepting a context and a wait group. +// +// Parameters: +// - ctx: The context for graceful shutdown handling. +// - wg: The wait group to track goroutines for shutdown synchronization. +func MustInitWithCtxFromViper(ctx context.Context, wg *sync.WaitGroup) { + var conf LoggingConfig + viperUtil.MustUnmarshalKey("log", &conf) + + MustInitWithCtx(ctx, wg, conf) +} + +// MustInit sets up the logging system according to the provided LoggingConfig and log level. +// It configures the log level, adds an alert hook, sets a text formatter, and adapts the logger +// for Geth compatibility. +// In case of any error during initialization, this function will panic. func MustInit(conf LoggingConfig) { - // parse logging level + mustInit(conf, nil, nil) +} + +// MustInitWithCtx performs the similiar initializations as `MustInit` with support for +// graceful shutdown by accepting a context and a wait group. +func MustInitWithCtx(ctx context.Context, wg *sync.WaitGroup, conf LoggingConfig) { + mustInit(conf, ctx, wg) +} + +// mustInit initializes the logging system with the provided configuration and sets up an alert hook. +// It supports graceful shutdown by optionally using a context and wait group for the alert hook registration. +func mustInit(conf LoggingConfig, ctx context.Context, wg *sync.WaitGroup) { + // Parse the log level string from the configuration into a logrus.Level. + // If parsing fails, log the error along with the attempted level and terminate the application. level, err := logrus.ParseLevel(conf.Level) if err != nil { logrus.WithError(err).WithField("level", conf.Level).Fatal("Failed to parse log level") } - logrus.SetLevel(level) - - // hook alert logging levels - var hookLvls []logrus.Level - for _, lvlStr := range conf.AlertHook.Levels { - lvl, err := logrus.ParseLevel(lvlStr) - if err != nil { - logrus.WithError(err).WithField("level", lvlStr).Fatal("Failed to parse log level for alert hooking") - } - hookLvls = append(hookLvls, lvl) - } + logrus.SetLevel(level) // Set the parsed log level. - if err := hook.AddAlertHook(hookLvls, conf.AlertHook.Channels); err != nil { + // Attempt to add an alert hook as configured. + if err := hook.AddAlertHook(ctx, wg, conf.AlertHook); err != nil { logrus.WithError(err).Fatal("Failed to add alert hook") } - // set text formtter + // Configure the log formatter to use a text format with a full timestamp. formatter := &logrus.TextFormatter{ FullTimestamp: true, } + // Adjust the color settings of the formatter based on the configuration. if conf.DisableColor { formatter.DisableColors = true } else if conf.ForceColor { formatter.ForceColors = true } + // Apply the configured formatter to the logger. logrus.SetFormatter(formatter) - // adapt geth logger + // Adapt the logger for use with Geth that uses a custom logging mechanism. adaptGethLogger() + // Log a debug message indicating successful initialization along with the effective configuration. logrus.WithField("config", fmt.Sprintf("%+v", conf)).Debug("Log initialized") }