Skip to content

Commit

Permalink
Switch logging to hclog
Browse files Browse the repository at this point in the history
Currently, serf takes a standard lib log.Logger. This makes using serf
as a library a bit weird because you have to have a logger that may produce
output in a different format to your main binary. There are ways to hack around
this, e.g. by using a log.Logger that writes to some intermediate buffer that parses
the log output, but this loses a lot of context that we would otherwise have.

In this commit, we swap serf over to taking an hclog.Logger instead. As an
interface, this means that library consumers can use whatever logger they
want, and we can use hclog's built-in support for log levels and structured
logging.

Signed-off-by: sinkingpoint <colin@quirl.co.nz>
  • Loading branch information
sinkingpoint committed Apr 25, 2023
1 parent a377671 commit c590a3f
Show file tree
Hide file tree
Showing 30 changed files with 336 additions and 335 deletions.
4 changes: 2 additions & 2 deletions client/rpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,8 +202,8 @@ func (c *RPCClient) ForceLeave(node string) error {
return c.genericRPC(&header, &req, nil)
}

//ForceLeavePrune uses ForceLeave but is used to reap the
//node entirely
// ForceLeavePrune uses ForceLeave but is used to reap the
// node entirely
func (c *RPCClient) ForceLeavePrune(node string) error {
header := requestHeader{
Command: forceLeaveCommand,
Expand Down
64 changes: 33 additions & 31 deletions cmd/serf/command/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ import (
"fmt"
"io"
"io/ioutil"
"log"
"os"
"strings"
"sync"

"github.com/hashicorp/go-hclog"
"github.com/hashicorp/memberlist"
"github.com/hashicorp/serf/serf"
)
Expand All @@ -37,7 +37,7 @@ type Agent struct {
eventHandlersLock sync.Mutex

// logger instance wraps the logOutput
logger *log.Logger
logger hclog.Logger

// This is the underlying Serf we are wrapping
serf *serf.Serf
Expand Down Expand Up @@ -70,8 +70,10 @@ func Create(agentConf *Config, conf *serf.Config, logOutput io.Writer) (*Agent,
agentConf: agentConf,
eventCh: eventCh,
eventHandlers: make(map[EventHandler]struct{}),
logger: log.New(logOutput, "", log.LstdFlags),
shutdownCh: make(chan struct{}),
logger: hclog.New(&hclog.LoggerOptions{
Output: logOutput,
}),
shutdownCh: make(chan struct{}),
}

// Restore agent tags from a tags file
Expand All @@ -95,7 +97,7 @@ func Create(agentConf *Config, conf *serf.Config, logOutput io.Writer) (*Agent,
// create so that there isn't a race condition between creating the
// agent and registering handlers
func (a *Agent) Start() error {
a.logger.Printf("[INFO] agent: Serf agent starting")
a.logger.Info("agent: Serf agent starting")

// Create serf first
serf, err := serf.Create(a.conf)
Expand All @@ -115,7 +117,7 @@ func (a *Agent) Leave() error {
return nil
}

a.logger.Println("[INFO] agent: requesting graceful leave from Serf")
a.logger.Info("agent: requesting graceful leave from Serf")
return a.serf.Leave()
}

Expand All @@ -133,13 +135,13 @@ func (a *Agent) Shutdown() error {
goto EXIT
}

a.logger.Println("[INFO] agent: requesting serf shutdown")
a.logger.Info("agent: requesting serf shutdown")
if err := a.serf.Shutdown(); err != nil {
return err
}

EXIT:
a.logger.Println("[INFO] agent: shutdown complete")
a.logger.Info("agent: shutdown complete")
a.shutdown = true
close(a.shutdownCh)
return nil
Expand All @@ -163,46 +165,46 @@ func (a *Agent) SerfConfig() *serf.Config {

// Join asks the Serf instance to join. See the Serf.Join function.
func (a *Agent) Join(addrs []string, replay bool) (n int, err error) {
a.logger.Printf("[INFO] agent: joining: %v replay: %v", addrs, replay)
a.logger.Info(fmt.Sprintf("agent: joining: %v replay: %v", addrs, replay))
ignoreOld := !replay
n, err = a.serf.Join(addrs, ignoreOld)
if n > 0 {
a.logger.Printf("[INFO] agent: joined: %d nodes", n)
a.logger.Info(fmt.Sprintf("agent: joined: %d nodes", n))
}
if err != nil {
a.logger.Printf("[WARN] agent: error joining: %v", err)
a.logger.Warn(fmt.Sprintf("[WARN] agent: error joining: %v", err))
}
return
}

// ForceLeave is used to eject a failed node from the cluster
func (a *Agent) ForceLeave(node string) error {
a.logger.Printf("[INFO] agent: Force leaving node: %s", node)
a.logger.Info(fmt.Sprintf("agent: Force leaving node: %s", node))
err := a.serf.RemoveFailedNode(node)
if err != nil {
a.logger.Printf("[WARN] agent: failed to remove node: %v", err)
a.logger.Warn(fmt.Sprintf("agent: failed to remove node: %v", err))
}
return err
}

// ForceLeavePrune completely removes a failed node from the
// member list entirely
func (a *Agent) ForceLeavePrune(node string) error {
a.logger.Printf("[INFO] agent: Force leaving node (prune): %s", node)
a.logger.Info(fmt.Sprintf("agent: Force leaving node (prune): %s", node))
err := a.serf.RemoveFailedNodePrune(node)
if err != nil {
a.logger.Printf("[WARN] agent: failed to remove node (prune): %v", err)
a.logger.Warn(fmt.Sprintf("agent: failed to remove node (prune): %v", err))
}
return err
}

// UserEvent sends a UserEvent on Serf, see Serf.UserEvent.
func (a *Agent) UserEvent(name string, payload []byte, coalesce bool) error {
a.logger.Printf("[DEBUG] agent: Requesting user event send: %s. Coalesced: %#v. Payload: %#v",
name, coalesce, string(payload))
a.logger.Debug(fmt.Sprintf("agent: Requesting user event send: %s. Coalesced: %#v. Payload: %#v",
name, coalesce, string(payload)))
err := a.serf.UserEvent(name, payload, coalesce)
if err != nil {
a.logger.Printf("[WARN] agent: failed to send user event: %v", err)
a.logger.Warn("agent: failed to send user event: %v", err)
}
return err
}
Expand All @@ -216,11 +218,11 @@ func (a *Agent) Query(name string, payload []byte, params *serf.QueryParam) (*se
return nil, fmt.Errorf("Queries cannot contain the '%s' prefix", serf.InternalQueryPrefix)
}
}
a.logger.Printf("[DEBUG] agent: Requesting query send: %s. Payload: %#v",
name, string(payload))
a.logger.Debug(fmt.Sprintf("agent: Requesting query send: %s. Payload: %#v",
name, string(payload)))
resp, err := a.serf.Query(name, payload, params)
if err != nil {
a.logger.Printf("[WARN] agent: failed to start user query: %v", err)
a.logger.Warn(fmt.Sprintf("agent: failed to start user query: %v", err))
}
return resp, err
}
Expand Down Expand Up @@ -255,7 +257,7 @@ func (a *Agent) eventLoop() {
for {
select {
case e := <-a.eventCh:
a.logger.Printf("[INFO] agent: Received event: %s", e.String())
a.logger.Info(fmt.Sprintf("agent: Received event: %s", e.String()))
a.eventHandlersLock.Lock()
handlers := a.eventHandlerList
a.eventHandlersLock.Unlock()
Expand All @@ -264,7 +266,7 @@ func (a *Agent) eventLoop() {
}

case <-serfShutdownCh:
a.logger.Printf("[WARN] agent: Serf shutdown detected, quitting")
a.logger.Warn("agent: Serf shutdown detected, quitting")
a.Shutdown()
return

Expand All @@ -276,28 +278,28 @@ func (a *Agent) eventLoop() {

// InstallKey initiates a query to install a new key on all members
func (a *Agent) InstallKey(key string) (*serf.KeyResponse, error) {
a.logger.Print("[INFO] agent: Initiating key installation")
a.logger.Info("agent: Initiating key installation")
manager := a.serf.KeyManager()
return manager.InstallKey(key)
}

// UseKey sends a query instructing all members to switch primary keys
func (a *Agent) UseKey(key string) (*serf.KeyResponse, error) {
a.logger.Print("[INFO] agent: Initiating primary key change")
a.logger.Info("agent: Initiating primary key change")
manager := a.serf.KeyManager()
return manager.UseKey(key)
}

// RemoveKey sends a query to all members to remove a key from the keyring
func (a *Agent) RemoveKey(key string) (*serf.KeyResponse, error) {
a.logger.Print("[INFO] agent: Initiating key removal")
a.logger.Info("agent: Initiating key removal")
manager := a.serf.KeyManager()
return manager.RemoveKey(key)
}

// ListKeys sends a query to all members to return a list of their keys
func (a *Agent) ListKeys() (*serf.KeyResponse, error) {
a.logger.Print("[INFO] agent: Initiating key listing")
a.logger.Info("agent: Initiating key listing")
manager := a.serf.KeyManager()
return manager.ListKeys()
}
Expand All @@ -308,7 +310,7 @@ func (a *Agent) SetTags(tags map[string]string) error {
// Update the tags file if we have one
if a.agentConf.TagsFile != "" {
if err := a.writeTagsFile(tags); err != nil {
a.logger.Printf("[ERR] agent: %s", err)
a.logger.Error("agent: %s", err)
return err
}
}
Expand All @@ -333,7 +335,7 @@ func (a *Agent) loadTagsFile(tagsFile string) error {
if err := json.Unmarshal(tagData, &a.conf.Tags); err != nil {
return fmt.Errorf("Failed to decode tags file: %s", err)
}
a.logger.Printf("[INFO] agent: Restored %d tag(s) from %s",
a.logger.Info("agent: Restored %d tag(s) from %s",
len(a.conf.Tags), tagsFile)
}

Expand Down Expand Up @@ -425,8 +427,8 @@ func (a *Agent) loadKeyringFile(keyringFile string) error {
return fmt.Errorf("Failed to restore keyring: %s", err)
}
a.conf.MemberlistConfig.Keyring = keyring
a.logger.Printf("[INFO] agent: Restored keyring with %d keys from %s",
len(keys), keyringFile)
a.logger.Info(fmt.Sprintf("agent: Restored keyring with %d keys from %s",
len(keys), keyringFile))

// Success!
return nil
Expand Down
54 changes: 18 additions & 36 deletions cmd/serf/command/agent/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"flag"
"fmt"
"io"
"log"
"net"
"os"
"os/signal"
Expand All @@ -17,8 +16,8 @@ import (
"time"

"github.com/armon/go-metrics"
"github.com/hashicorp/go-hclog"
gsyslog "github.com/hashicorp/go-syslog"
"github.com/hashicorp/logutils"
"github.com/hashicorp/memberlist"
"github.com/hashicorp/serf/serf"
"github.com/mitchellh/cli"
Expand All @@ -44,8 +43,7 @@ type Command struct {
ShutdownCh <-chan struct{}
args []string
scriptHandler *ScriptEventHandler
logFilter *logutils.LevelFilter
logger *log.Logger
logger hclog.Logger
}

var _ cli.Command = &Command{}
Expand Down Expand Up @@ -370,16 +368,6 @@ func (c *Command) setupLoggers(config *Config) (*GatedWriter, *logWriter, io.Wri
Writer: &cli.UiWriter{Ui: c.Ui},
}

c.logFilter = LevelFilter()
c.logFilter.MinLevel = logutils.LogLevel(strings.ToUpper(config.LogLevel))
c.logFilter.Writer = logGate
if !ValidateLevelFilter(c.logFilter.MinLevel, c.logFilter) {
c.Ui.Error(fmt.Sprintf(
"Invalid log level: %s. Valid log levels are: %v",
c.logFilter.MinLevel, c.logFilter.Levels))
return nil, nil, nil
}

// Check if syslog is enabled
var syslog io.Writer
if config.EnableSyslog {
Expand All @@ -388,20 +376,23 @@ func (c *Command) setupLoggers(config *Config) (*GatedWriter, *logWriter, io.Wri
c.Ui.Error(fmt.Sprintf("Syslog setup failed: %v", err))
return nil, nil, nil
}
syslog = &SyslogWrapper{l, c.logFilter}
syslog = &SyslogWrapper{l}
}

// Create a log writer, and wrap a logOutput around it
logWriter := NewLogWriter(512)
var logOutput io.Writer
if syslog != nil {
logOutput = io.MultiWriter(c.logFilter, logWriter, syslog)
logOutput = io.MultiWriter(logWriter, syslog)
} else {
logOutput = io.MultiWriter(c.logFilter, logWriter)
logOutput = logWriter
}

// Create a logger
c.logger = log.New(logOutput, "", log.LstdFlags)
c.logger = hclog.New(&hclog.LoggerOptions{
Output: logOutput,
Level: hclog.LevelFromString(config.LogLevel),
})
return logGate, logWriter, logOutput
}

Expand All @@ -412,7 +403,9 @@ func (c *Command) startAgent(config *Config, agent *Agent,
c.scriptHandler = &ScriptEventHandler{
SelfFunc: func() serf.Member { return agent.Serf().LocalMember() },
Scripts: config.EventScripts(),
Logger: log.New(logOutput, "", log.LstdFlags),
Logger: hclog.New(&hclog.LoggerOptions{
Output: logOutput,
}),
}
agent.RegisterEventHandler(c.scriptHandler)

Expand Down Expand Up @@ -504,23 +497,23 @@ func (c *Command) retryJoin(config *Config, agent *Agent, errCh chan struct{}) {
attempt := 0
for {
// Try to perform the join
c.logger.Printf("[INFO] agent: Joining cluster...(replay: %v)", config.ReplayOnJoin)
c.logger.Info(fmt.Sprintf("agent: Joining cluster...(replay: %v)", config.ReplayOnJoin))
n, err := agent.Join(config.RetryJoin, config.ReplayOnJoin)
if err == nil {
c.logger.Printf("[INFO] agent: Join completed. Synced with %d initial agents", n)
c.logger.Info(fmt.Sprintf("agent: Join completed. Synced with %d initial agents", n))
return
}

// Check if the maximum attempts has been exceeded
attempt++
if config.RetryMaxAttempts > 0 && attempt > config.RetryMaxAttempts {
c.logger.Printf("[ERR] agent: maximum retry join attempts made, exiting")
c.logger.Error("agent: maximum retry join attempts made, exiting")
close(errCh)
return
}

// Log the failure and sleep
c.logger.Printf("[WARN] agent: Join failed: %v, retrying in %v", err, config.RetryInterval)
c.logger.Warn(fmt.Sprintf("agent: Join failed: %v, retrying in %v", err, config.RetryInterval))
time.Sleep(config.RetryInterval)
}
}
Expand Down Expand Up @@ -686,22 +679,11 @@ func (c *Command) handleReload(config *Config, agent *Agent) *Config {
c.Ui.Output("Reloading configuration...")
newConf := c.readConfig()
if newConf == nil {
c.Ui.Error(fmt.Sprintf("Failed to reload configs"))
c.Ui.Error("Failed to reload configs")
return config
}

// Change the log level
minLevel := logutils.LogLevel(strings.ToUpper(newConf.LogLevel))
if ValidateLevelFilter(minLevel, c.logFilter) {
c.logFilter.SetMinLevel(minLevel)
} else {
c.Ui.Error(fmt.Sprintf(
"Invalid log level: %s. Valid log levels are: %v",
minLevel, c.logFilter.Levels))

// Keep the current log level
newConf.LogLevel = config.LogLevel
}
c.logger.SetLevel(hclog.LevelFromString(newConf.LogLevel))

// Change the event handlers
c.scriptHandler.UpdateScripts(newConf.EventScripts())
Expand Down
Loading

0 comments on commit c590a3f

Please sign in to comment.