Add async logrus hook for alert #36

Merged
merged 14 commits on May 22, 2024
63 changes: 54 additions & 9 deletions config/config.go
@@ -1,28 +1,73 @@
package config

import (
"context"
"sync"

"github.com/Conflux-Chain/go-conflux-util/alert"
"github.com/Conflux-Chain/go-conflux-util/log"
"github.com/Conflux-Chain/go-conflux-util/metrics"
"github.com/Conflux-Chain/go-conflux-util/viper"
)

// MustInit inits settings especially by loading configs from file or env var
// to viper etc., to prepare using any utility.
// MustInit performs the necessary initializations for the application, particularly
// by loading configuration settings from files or environment variables into `Viper`,
// setting up metrics, alerts, and logging systems. This function is designed to be
// used at the application's startup phase.
//
// Important: The order in which initializations are performed is critical due to
// dependencies between components.
//
// Note that it will panic and exit if any error happens.
// Parameters:
// - viperEnvPrefix: The prefix used for environment variables that `Viper` should consider
// while initializing configurations.
//
// Panics:
// - If any part of the initialization fails, this function will panic, causing the application
// to terminate abruptly.
func MustInit(viperEnvPrefix string) {
// NOTE, INITIALIZATION ORDER IS IMPORTANT!
// Delegates to the shared initialization logic with no context for graceful shutdown.
mustInit(viperEnvPrefix, nil, nil)
}

// init viper from config file or env var
// MustInitWithCtx carries out the same initializations as `MustInit`, with additional support
// for graceful shutdown via a provided context and wait group.
//
// Parameters:
// - ctx: The context for graceful shutdown handling.
// - wg: The wait group to track goroutines for shutdown synchronization.
// - viperEnvPrefix: The prefix used for environment variables that `Viper` should consider
// while initializing configurations.
func MustInitWithCtx(ctx context.Context, wg *sync.WaitGroup, viperEnvPrefix string) {
mustInit(viperEnvPrefix, ctx, wg)
}

// mustInit is the internal function responsible for the core initialization steps.
// It consolidates the setup of `Viper`, metrics, alerts, and logging, adapting the logging
// setup based on whether graceful shutdown context and wait group are provided.
//
// Important: The order in which initializations are performed is critical due to
// dependencies between components.
func mustInit(viperEnvPrefix string, ctx context.Context, wg *sync.WaitGroup) {
// Initialize `Viper` to read configurations from a file or environment variables.
// The provided prefix is used to match and bind environment variables to config keys.
viper.MustInit(viperEnvPrefix)

// init metrics from viper
// Initialize metrics collection based on the configurations loaded into `Viper`.
// Metrics are typically used for monitoring application performance.
metrics.MustInitFromViper()

// init alert from viper
// Initialize alerting systems using configurations from `Viper`.
// Alerts are crucial for notifying about application errors or important events.
alert.MustInitFromViper()

// init logging from viper which depends on alert initialization
log.MustInitFromViper()
// Initialize the logging system with configurations fetched via Viper.
// Logging setup depends on alert initialization since it might use alerting channels.
// Additionally, logging setup accepts a context and wait group to ensure logs are handled
// properly during shutdown.
if ctx != nil && wg != nil {
log.MustInitWithCtxFromViper(ctx, wg)
} else {
log.MustInitFromViper()
}
}
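
For context, a minimal sketch of how an application might wire `MustInitWithCtx` into a graceful-shutdown flow; the signal handling and the "MYAPP" env prefix are illustrative assumptions, not part of this change:

package main

import (
	"context"
	"os/signal"
	"sync"
	"syscall"

	"github.com/Conflux-Chain/go-conflux-util/config"
)

func main() {
	// Cancel the context when an interrupt or termination signal arrives.
	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer stop()

	var wg sync.WaitGroup

	// Load configs into viper and set up metrics, alerts and logging.
	// With a context and wait group, the async alert hook (if enabled) can
	// drain queued log entries before the process exits.
	config.MustInitWithCtx(ctx, &wg, "MYAPP") // "MYAPP" is a hypothetical env prefix

	// ... run the application ...

	<-ctx.Done() // wait for a shutdown signal
	wg.Wait()    // wait for background goroutines (e.g. async log workers) to finish
}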
12 changes: 10 additions & 2 deletions config/config.yaml
@@ -4,10 +4,18 @@
# forceColor: false
# disableColor: false
# alertHook: # Alert hooking settings
# # Hooked logrus levels for alert notification
# levels: [warn,error,fatal]
# # Hooked logrus level for alert notification
# level: warn
# # Default notification channels
# channels: []
# # Async worker options for sending alert
# async:
# # The number of worker goroutines (set `0` to turn off async mode).
# numWorkers: 1
# # The maximum number of queued jobs.
# queueSize: 60
# # Maximum timeout allowed to gracefully stop.
# stopTimeout: 5s

# Alert Configurations
# alert:
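
For illustration, the commented-out `alertHook` defaults above would correspond roughly to the following `hook.Config` value introduced by this PR; this is a sketch only, since the actual mapping is performed by viper when logging is initialized:

package example

import (
	"time"

	"github.com/Conflux-Chain/go-conflux-util/log/hook"
)

// Rough Go equivalent of the `alertHook` example defaults above.
var alertHookConf = hook.Config{
	Level:    "warn", // minimum logrus level that triggers alerts
	Channels: nil,    // no default notification channels configured
	Async: hook.AsyncOption{
		NumWorkers:  1,               // one async worker (0 disables async mode)
		QueueSize:   60,              // at most 60 queued log entries
		StopTimeout: 5 * time.Second, // drain deadline on shutdown
	},
}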
205 changes: 205 additions & 0 deletions log/hook/async.go
@@ -0,0 +1,205 @@
package hook

import (
"context"
"errors"
"fmt"
"os"
"sync"
"sync/atomic"
"time"

"github.com/Conflux-Chain/go-conflux-util/health"
"github.com/sirupsen/logrus"
)

// ErrAsyncQueueFull is the error returned when the async queue is full.
var ErrAsyncQueueFull = errors.New("async hook queue is full")

// AsyncOption defines configuration options for the AsyncHook.
type AsyncOption struct {
NumWorkers int `default:"1"` // Number of worker goroutines.
QueueSize int `default:"60"` // Maximum number of queued jobs.
StopTimeout time.Duration `default:"5s"` // Timeout before forced exit of async processing.
}

// AsyncHook is a logrus hook that processes log entries asynchronously.
type AsyncHook struct {
logrus.Hook // Embedded logrus hook.
AsyncOption // Embedded options.
mu sync.Mutex // Synchronizes access to `healthStatus`.
started atomic.Bool // Atomic flag indicating hook startup state.
healthStatus health.Counter // Tracks queue health.
HealthConfig health.CounterConfig // Configuration for health tracking.

jobQueue chan *logrus.Entry // Buffered channel for enqueuing log entries.
}

// NewAsyncHookWithCtx initializes and starts a new AsyncHook instance that integrates with
// graceful shutdown handling.
// It coordinates with the application's shutdown process to ensure no queued log entries
// are lost during shutdown.
func NewAsyncHookWithCtx(
ctx context.Context, wg *sync.WaitGroup, hook logrus.Hook, opts AsyncOption) *AsyncHook {
h := newAsyncHook(hook, opts)
h.startWithCtx(ctx, wg)

return h
}

// NewAsyncHook initializes and starts a standard AsyncHook without graceful shutdown handling.
// Use this when you don't require integration with a graceful shutdown mechanism.
func NewAsyncHook(hook logrus.Hook, opts AsyncOption) *AsyncHook {
h := newAsyncHook(hook, opts)
h.start()

return h
}
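
As a usage sketch (independent of this diff), an existing logrus hook could be wrapped for asynchronous processing along these lines; `inner` is a placeholder for any concrete logrus.Hook implementation:

package example

import (
	"time"

	"github.com/Conflux-Chain/go-conflux-util/log/hook"
	"github.com/sirupsen/logrus"
)

func addAsyncHook(inner logrus.Hook) {
	// Wrap the hook so its Fire calls run on background worker goroutines.
	asyncHook := hook.NewAsyncHook(inner, hook.AsyncOption{
		NumWorkers:  2,               // two worker goroutines
		QueueSize:   128,             // buffered queue capacity
		StopTimeout: 5 * time.Second, // only used by the ctx-aware variant when draining
	})
	logrus.AddHook(asyncHook)
}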

// newAsyncHook is a private constructor that sets up the necessary components for an AsyncHook.
// It should not be used directly; instead, use `NewAsyncHook` or `NewAsyncHookWithCtx`.
func newAsyncHook(hook logrus.Hook, opts AsyncOption) *AsyncHook {
return &AsyncHook{
AsyncOption: opts,
Hook: hook,
jobQueue: make(chan *logrus.Entry, opts.QueueSize),
HealthConfig: health.CounterConfig{Remind: uint64(opts.QueueSize)},
}
}

// Fire implements the logrus.Hook interface. It enqueues log entries for async processing or
// handles them synchronously if necessary.
func (h *AsyncHook) Fire(entry *logrus.Entry) error {
// Synchronously fire the hook for panic/fatal levels or if the hook is not started.
if entry.Level <= logrus.FatalLevel || !h.started.Load() {
return h.Hook.Fire(entry)
}

select {
case h.jobQueue <- entry: // Attempt to enqueue the log entry.
h.onFiredSuccess()
return nil
default: // If the queue is full, return an error.
h.onFiredFailure(ErrAsyncQueueFull, entry)
return ErrAsyncQueueFull
}
}

// startWithCtx initiates the hook's workers and sets up a mechanism to gracefully
// drain the job queue upon receiving a shutdown signal through the provided context.
func (h *AsyncHook) startWithCtx(ctx context.Context, wg *sync.WaitGroup) {
defer h.started.Store(true)

wg.Add(1)
go func() {
defer wg.Done()

var awg sync.WaitGroup
for i := 0; i < h.NumWorkers; i++ {
awg.Add(1)
go func() {
defer awg.Done()
h.worker(ctx)
}()
}

// Wait for all workers to finish before draining the job queue.
awg.Wait()

// Drain remaining jobs in the queue to ensure no logs are lost during shutdown.
h.drainJobQueue()
}()
}

// start launches the hook's worker goroutines without lifecycle management.
func (h *AsyncHook) start() {
defer h.started.Store(true) // Mark the hook as started.

for i := 0; i < h.NumWorkers; i++ {
go h.worker(context.Background())
}
}

// worker is the main loop for each worker goroutine, processing log entries from the job queue.
func (h *AsyncHook) worker(ctx context.Context) {
for {
select {
case entry := <-h.jobQueue:
h.fire(entry)
case <-ctx.Done():
h.started.Store(false) // Mark the hook as stopped.
return
}
}
}

// fire triggers the underlying logrus hook and handles any potential errors by outputting to stderr.
func (h *AsyncHook) fire(entry *logrus.Entry) {
if err := h.Hook.Fire(entry); err != nil {
h.outputStderr(err, entry)
}
}

// onFiredSuccess signals successful enqueue and monitors queue health recovery.
func (h *AsyncHook) onFiredSuccess() {
h.mu.Lock()
defer h.mu.Unlock()

recovered, failures := h.healthStatus.OnSuccess(h.HealthConfig)
if recovered {
h.notify("Async hook queue congestion recovered", logrus.Fields{"failures": failures})
}
}

// onFiredFailure handles failed enqueues and monitors queue congestion.
func (h *AsyncHook) onFiredFailure(err error, entry *logrus.Entry) {
h.mu.Lock()
defer h.mu.Unlock()

unhealthy, uncovered, failures := h.healthStatus.OnFailure(h.HealthConfig)
if unhealthy {
h.notify("Async hook queue is congested", logrus.Fields{"failures": failures})
} else if uncovered {
h.notify("Async hook queue congestion not recovered", logrus.Fields{"failures": failures})
}

h.outputStderr(err, entry)
}

// drainJobQueue processes any remaining log entries in the job queue upon shutdown.
func (h *AsyncHook) drainJobQueue() {
ctx, cancel := context.WithTimeout(context.Background(), h.StopTimeout)
defer cancel()

for {
select {
case entry := <-h.jobQueue:
h.fire(entry)
case <-ctx.Done():
if len(h.jobQueue) > 0 {
h.notify("Async hook exiting with jobs remaining", logrus.Fields{"numJobs": len(h.jobQueue)})
}
return
default:
return
}
}
}

// notify sends a message through the hook as a warning.
func (h *AsyncHook) notify(msg string, fields logrus.Fields) {
h.fire(&logrus.Entry{
Time: time.Now(),
Level: logrus.WarnLevel,
Message: msg,
Data: fields,
})
}

// outputStderr writes the error and the log entry to the standard error output.
func (h *AsyncHook) outputStderr(err error, entry *logrus.Entry) {
formatter := logrus.TextFormatter{}
entryStr, _ := formatter.Format(entry)

fmt.Fprintf(os.Stderr, "Failed to fire async hook with error: %v for logrus entry: %v\n", err, entryStr)
}
62 changes: 58 additions & 4 deletions log/hook/hook.go
@@ -1,7 +1,9 @@
package hook

import (
"context"
stderr "errors"
"sync"

"github.com/Conflux-Chain/go-conflux-util/alert"
"github.com/pkg/errors"
@@ -16,21 +18,73 @@ const (
alertMsgTitle = "logrus alert notification"
)

// AddAlertHook adds logrus hook for alert notification with specified log levels.
func AddAlertHook(hookLevels []logrus.Level, chns []string) error {
type Config struct {
// Level is the minimum logrus level at which alerts will be triggered.
Level string `default:"warn"`

// Channels lists the default alert notification channels to use.
Channels []string

// Async configures the behavior of the asynchronous worker for handling log alerts.
Async AsyncOption
}

// AddAlertHook attaches a custom logrus Hook that generates alert notifications
// based on the configured level and channels.
// It supports both synchronous and asynchronous operation modes, with optional
// graceful shutdown integration.
func AddAlertHook(ctx context.Context, wg *sync.WaitGroup, conf Config) error {
if len(conf.Channels) == 0 {
// No channels configured, so no hook needs to be added.
return nil
}

// Retrieve and validate configured alert channels.
var chs []alert.Channel
for _, chn := range chns {
for _, chn := range conf.Channels {
ch, ok := alert.DefaultManager().Channel(chn)
if !ok {
return alert.ErrChannelNotFound(chn)
}
chs = append(chs, ch)
}

logrus.AddHook(NewAlertHook(hookLevels, chs))
// Parse the configured log level for alert triggering.
lvl, err := logrus.ParseLevel(conf.Level)
if err != nil {
return errors.WithMessage(err, "failed to parse log level")
}

var hookLvls []logrus.Level
for l := logrus.PanicLevel; l <= lvl; l++ {
hookLvls = append(hookLvls, l)
}

// Instantiate the base AlertHook.
var alertHook logrus.Hook = NewAlertHook(hookLvls, chs)

// Wrap with asynchronous processing if configured.
if conf.Async.NumWorkers > 0 {
alertHook = wrapAsyncHook(ctx, wg, alertHook, conf.Async)
}

// Finally, add the hook to Logrus.
logrus.AddHook(alertHook)

return nil
}

// wrapAsyncHook wraps the given hook with asynchronous processing, optionally integrating
// graceful shutdown support if a context and wait group are provided.
func wrapAsyncHook(
ctx context.Context, wg *sync.WaitGroup, hook logrus.Hook, opt AsyncOption) *AsyncHook {
if ctx != nil && wg != nil {
return NewAsyncHookWithCtx(ctx, wg, hook, opt)
}

return NewAsyncHook(hook, opt)
}
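
For reference, a hedged sketch of calling `AddAlertHook` directly (outside of the viper-driven log initialization); the "dingtalk" channel name is a placeholder that must already be registered with the default alert manager:

package example

import (
	"context"
	"sync"
	"time"

	"github.com/Conflux-Chain/go-conflux-util/log/hook"
)

func registerAlertHook(ctx context.Context, wg *sync.WaitGroup) error {
	conf := hook.Config{
		Level:    "error",              // alert on error, fatal and panic entries
		Channels: []string{"dingtalk"}, // placeholder channel name
		Async: hook.AsyncOption{
			NumWorkers:  1,
			QueueSize:   60,
			StopTimeout: 5 * time.Second,
		},
	}
	// With a non-nil ctx and wg the hook is wrapped via NewAsyncHookWithCtx,
	// so queued entries are drained on shutdown; otherwise NewAsyncHook is used.
	return hook.AddAlertHook(ctx, wg, conf)
}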

// AlertHook logrus hooks to send specified level logs as text message for alerting.
type AlertHook struct {
levels []logrus.Level