Skip to content

Commit

Permalink
Start replacing logrus with slog
Browse files Browse the repository at this point in the history
  • Loading branch information
norkans7 committed Oct 13, 2023
1 parent d950cf7 commit 2ce7cc1
Show file tree
Hide file tree
Showing 7 changed files with 157 additions and 54 deletions.
35 changes: 22 additions & 13 deletions cmd/rp-indexer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package main

import (
"database/sql"

"log/slog"
"os"
"os/signal"
"syscall"
Expand All @@ -12,7 +14,8 @@ import (
"github.com/nyaruka/ezconf"
indexer "github.com/nyaruka/rp-indexer/v8"
"github.com/nyaruka/rp-indexer/v8/indexers"
log "github.com/sirupsen/logrus"
"github.com/nyaruka/rp-indexer/v8/utils"
"github.com/sirupsen/logrus"
)

var (
Expand All @@ -26,32 +29,38 @@ func main() {
loader := ezconf.NewLoader(cfg, "indexer", "Indexes RapidPro contacts to ElasticSearch", []string{"indexer.toml"})
loader.MustLoad()

level, err := log.ParseLevel(cfg.LogLevel)
level, err := logrus.ParseLevel(cfg.LogLevel)
if err != nil {
log.Fatalf("Invalid log level '%s'", level)
logrus.Fatalf("Invalid log level '%s'", level)
}

log.SetLevel(level)
log.SetOutput(os.Stdout)
log.SetFormatter(&log.TextFormatter{})
log.WithField("version", version).WithField("released", date).Info("starting indexer")
logrus.SetLevel(level)
logrus.SetOutput(os.Stdout)
logrus.SetFormatter(&logrus.TextFormatter{})
logrus.WithField("version", version).WithField("released", date).Info("starting indexer")

// configure golang std structured logging to route to logrus
slog.SetDefault(slog.New(utils.NewLogrusHandler(logrus.StandardLogger())))

logger := slog.With("comp", "main")
logger.Info("starting indexer", "version", version, "released", date)

// if we have a DSN entry, try to initialize it
if cfg.SentryDSN != "" {
hook, err := logrus_sentry.NewSentryHook(cfg.SentryDSN, []log.Level{log.PanicLevel, log.FatalLevel, log.ErrorLevel})
hook, err := logrus_sentry.NewSentryHook(cfg.SentryDSN, []logrus.Level{logrus.PanicLevel, logrus.FatalLevel, logrus.ErrorLevel})
hook.Timeout = 0
hook.StacktraceConfiguration.Enable = true
hook.StacktraceConfiguration.Skip = 4
hook.StacktraceConfiguration.Context = 5
if err != nil {
log.Fatalf("invalid sentry DSN: '%s': %s", cfg.SentryDSN, err)
logger.Error("invalid sentry DSN: '%s': %s", cfg.SentryDSN, err)
}
log.StandardLogger().Hooks.Add(hook)
logrus.StandardLogger().Hooks.Add(hook)
}

db, err := sql.Open("postgres", cfg.DB)
if err != nil {
log.Fatalf("unable to connect to database")
logger.Error("unable to connect to database")
}

idxrs := []indexers.Indexer{
Expand All @@ -63,7 +72,7 @@ func main() {
// the rebuild argument can be become the name of the index to rebuild, e.g. --rebuild=contacts
idxr := idxrs[0]
if _, err := idxr.Index(db, true, cfg.Cleanup); err != nil {
log.WithField("indexer", idxr.Name()).WithError(err).Fatal("error during rebuilding")
logger.Error("error during rebuilding", "error", err, "indexer", idxr.Name())
}
} else {
d := indexer.NewDaemon(cfg, db, idxrs, time.Duration(cfg.Poll)*time.Second)
Expand All @@ -82,7 +91,7 @@ func handleSignals(d *indexer.Daemon) {
sig := <-sigs
switch sig {
case syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT:
log.WithField("signal", sig).Info("received exit signal, exiting")
slog.Info("received exit signal, exiting", "signal", sig)
d.Stop()
return
}
Expand Down
14 changes: 7 additions & 7 deletions daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@ package indexer

import (
"database/sql"
"log/slog"
"sync"
"time"

"github.com/nyaruka/gocommon/analytics"
"github.com/nyaruka/rp-indexer/v8/indexers"
"github.com/sirupsen/logrus"
)

type Daemon struct {
Expand Down Expand Up @@ -53,7 +53,7 @@ func (d *Daemon) Start() {
func (d *Daemon) startIndexer(indexer indexers.Indexer) {
d.wg.Add(1) // add ourselves to the wait group

log := logrus.WithField("indexer", indexer.Name())
log := slog.With("indexer", indexer.Name())

go func() {
defer func() {
Expand All @@ -68,7 +68,7 @@ func (d *Daemon) startIndexer(indexer indexers.Indexer) {
case <-time.After(d.poll):
_, err := indexer.Index(d.db, d.cfg.Rebuild, d.cfg.Cleanup)
if err != nil {
log.WithError(err).Error("error during indexing")
log.Error("error during indexing", "error", err)
}
}
}
Expand All @@ -80,7 +80,7 @@ func (d *Daemon) startStatsReporter(interval time.Duration) {

go func() {
defer func() {
logrus.Info("analytics exiting")
slog.Info("analytics exiting")
d.wg.Done()
}()

Expand Down Expand Up @@ -117,19 +117,19 @@ func (d *Daemon) reportStats() {
d.prevStats[ix] = stats
}

log := logrus.NewEntry(logrus.StandardLogger())
log := slog.New(slog.Default().Handler())

for k, v := range metrics {
analytics.Gauge("indexer."+k, v)
log = log.WithField(k, v)
log = log.With(k, v)
}

log.Info("stats reported")
}

// Stop stops this daemon
func (d *Daemon) Stop() {
logrus.Info("daemon stopping")
slog.Info("daemon stopping")
analytics.Stop()

close(d.quit)
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/nyaruka/rp-indexer/v8

go 1.20
go 1.21

require (
github.com/evalphobia/logrus_sentry v0.8.2
Expand Down Expand Up @@ -29,7 +29,7 @@ require (
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/shopspring/decimal v1.3.1 // indirect
golang.org/x/net v0.14.0 // indirect
golang.org/x/sys v0.11.0 // indirect
golang.org/x/sys v0.13.0 // indirect
golang.org/x/text v0.12.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
7 changes: 5 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,17 @@ github.com/fatih/structs v1.0.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga
github.com/fatih/structs v1.1.0 h1:Q7juDM0QtcnhCpeyLGQKyg4TOIghuNXrkL32pHAUMxo=
github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M=
github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw=
github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g=
github.com/gabriel-vasile/mimetype v1.4.2 h1:w5qFW6JKBz9Y393Y4q372O9A7cUSequkh1Q7OhCmWKU=
github.com/gabriel-vasile/mimetype v1.4.2/go.mod h1:zApsH/mKG4w07erKIaJPFiX0Tsq9BFQgN3qGY5GnNgA=
github.com/getsentry/raven-go v0.2.0 h1:no+xWJRb5ZI7eE8TWgIq1jLulQiIoLG0IfYxv5JYMGs=
github.com/getsentry/raven-go v0.2.0/go.mod h1:KungGk8q33+aIAZUIVWZDr2OfAEBsO49PX4NzFV5kcQ=
github.com/go-chi/chi v4.1.2+incompatible h1:fGFk2Gmi/YKXk0OmGfBh0WgmN3XB8lVnEyNz34tQRec=
github.com/go-chi/chi v4.1.2+incompatible/go.mod h1:eB3wogJHnLi3x/kFX2A+IbTBlXxmMeXJVKy9tTv1XzQ=
github.com/google/go-cmp v0.5.7 h1:81/ik6ipDQS2aGcBfIN5dHDB36BwrStyeAQquSYCV4o=
github.com/google/go-cmp v0.5.7/go.mod h1:n+brtR0CgQNWTVd5ZUFpTBC8YFBDLK/h/bpaJ8/DtOE=
github.com/jmoiron/sqlx v1.3.5 h1:vFFPA71p1o5gAeqtEAwLU4dnX2napprKtHr7PYIcN3g=
github.com/jmoiron/sqlx v1.3.5/go.mod h1:nRVWtLre0KfCLJvgxzCsLVMogSvQ1zNJtpYr2Ccp0mQ=
github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY=
github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
Expand Down Expand Up @@ -58,8 +61,8 @@ golang.org/x/net v0.14.0 h1:BONx9s002vGdD9umnlX1Po8vOZmrgH34qlHcD1MfK14=
golang.org/x/net v0.14.0/go.mod h1:PpSgVXXLK0OxS0F31C1/tv6XNguvCrnXIDrFMspZIUI=
golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.11.0 h1:eG7RXZHdqOJ1i+0lgLgCpSXAp6M3LYlAo6osgSi0xOM=
golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE=
golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/text v0.12.0 h1:k+n5B8goJNdU7hSvEtMUz3d1Q6D/XW4COJSJR6fN0mc=
golang.org/x/text v0.12.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
Expand Down
29 changes: 14 additions & 15 deletions indexers/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@ import (
"database/sql"
"encoding/json"
"fmt"
"log/slog"
"net/http"
"sort"
"strings"
"time"

"github.com/nyaruka/gocommon/jsonx"
"github.com/nyaruka/rp-indexer/v8/utils"
"github.com/sirupsen/logrus"
)

// indexes a document
Expand Down Expand Up @@ -76,8 +76,8 @@ func (i *baseIndexer) Stats() Stats {
return i.stats
}

func (i *baseIndexer) log() *logrus.Entry {
return logrus.WithField("indexer", i.name)
func (i *baseIndexer) log() *slog.Logger {
return slog.With("indexer", i.name)
}

// records a complete index and updates statistics
Expand All @@ -86,7 +86,7 @@ func (i *baseIndexer) recordComplete(indexed, deleted int, elapsed time.Duration
i.stats.Deleted += int64(deleted)
i.stats.Elapsed += elapsed

i.log().WithField("indexed", indexed).WithField("deleted", deleted).WithField("elapsed", elapsed).Info("completed indexing")
i.log().Info("completed indexing", "indexed", indexed, "deleted", deleted, "elapsed", elapsed)
}

// our response for figuring out the physical index for an alias
Expand All @@ -111,7 +111,7 @@ func (i *baseIndexer) FindIndexes() []string {
// reverse sort order should put our newest index first
sort.Sort(sort.Reverse(sort.StringSlice(indexes)))

i.log().WithField("indexes", indexes).Debug("found physical indexes")
i.log().Debug("found physical indexes", "indexes", indexes)

return indexes
}
Expand Down Expand Up @@ -153,7 +153,7 @@ func (i *baseIndexer) createNewIndex(def *IndexDefinition) (string, error) {
}

// all went well, return our physical index name
i.log().WithField("index", index).Info("created new index")
i.log().Info("created new index", "index", index)

return index, nil
}
Expand Down Expand Up @@ -191,7 +191,7 @@ func (i *baseIndexer) updateAlias(newIndex string) error {
remove.Remove.Index = idx
commands = append(commands, remove)

logrus.WithField("indexer", i.name).WithField("index", idx).Debug("removing old alias")
slog.Debug("removing old alias", "indexer", i.name, "index", idx)
}

// add our new index
Expand All @@ -204,7 +204,7 @@ func (i *baseIndexer) updateAlias(newIndex string) error {

_, err := utils.MakeJSONRequest(http.MethodPost, fmt.Sprintf("%s/_aliases", i.elasticURL), aliasJSON, nil)

i.log().WithField("index", newIndex).Info("updated alias")
i.log().Info("updated alias", "index", newIndex)

return err
}
Expand Down Expand Up @@ -236,7 +236,7 @@ func (i *baseIndexer) cleanupIndexes() error {
// for each active index, if it starts with our alias but is before our current index, remove it
for key := range healthResponse.Indices {
if strings.HasPrefix(key, i.name) && strings.Compare(key, currents[0]) < 0 {
logrus.WithField("index", key).Info("removing old index")
slog.Info("removing old index", "index", key)
_, err = utils.MakeJSONRequest(http.MethodDelete, fmt.Sprintf("%s/%s", i.elasticURL, key), nil, nil)
if err != nil {
return err
Expand Down Expand Up @@ -275,27 +275,26 @@ func (i *baseIndexer) indexBatch(index string, batch []byte) (int, int, error) {
createdCount, deletedCount, conflictedCount := 0, 0, 0
for _, item := range response.Items {
if item.Index.ID != "" {
logrus.WithField("id", item.Index.ID).WithField("status", item.Index.Status).Trace("index response")
slog.Debug("index response", "id", item.Index.ID, "status", item.Index.Status)
if item.Index.Status == 200 || item.Index.Status == 201 {
createdCount++
} else if item.Index.Status == 409 {
conflictedCount++
} else {
logrus.WithField("id", item.Index.ID).WithField("status", item.Index.Status).WithField("result", item.Index.Result).Error("error indexing document")
slog.Error("error indexing document", "id", item.Index.ID, "status", item.Index.Status, "result", item.Index.Result)
}
} else if item.Delete.ID != "" {
logrus.WithField("id", item.Index.ID).WithField("status", item.Index.Status).Trace("delete response")
slog.Debug("delete response", "id", item.Index.ID, "status", item.Index.Status)
if item.Delete.Status == 200 {
deletedCount++
} else if item.Delete.Status == 409 {
conflictedCount++
}
} else {
logrus.Error("unparsed item in response")
slog.Error("unparsed item in response")
}
}
logrus.WithField("created", createdCount).WithField("deleted", deletedCount).WithField("conflicted", conflictedCount).Debug("indexed batch")

slog.Debug("indexed batch", "created", createdCount, "deleted", deletedCount, "conflicted", conflictedCount)
return createdCount, deletedCount, nil
}

Expand Down
30 changes: 15 additions & 15 deletions indexers/contacts.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ import (
"database/sql"
_ "embed"
"fmt"
"log/slog"
"time"

"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)

//go:embed contacts.index.json
Expand Down Expand Up @@ -54,7 +54,7 @@ func (i *ContactIndexer) Index(db *sql.DB, rebuild, cleanup bool) (string, error
if err != nil {
return "", errors.Wrap(err, "error creating new index")
}
i.log().WithField("index", physicalIndex).Info("created new physical index")
i.log().Info("created new physical index", "index", physicalIndex)
remapAlias = true
}

Expand All @@ -63,7 +63,7 @@ func (i *ContactIndexer) Index(db *sql.DB, rebuild, cleanup bool) (string, error
return "", errors.Wrap(err, "error finding last modified")
}

i.log().WithField("index", physicalIndex).WithField("last_modified", lastModified).Debug("indexing newer than last modified")
i.log().Debug("indexing newer than last modified", "index", physicalIndex, "last_modified", lastModified)

// now index our docs
start := time.Now()
Expand Down Expand Up @@ -211,14 +211,14 @@ func (i *ContactIndexer) indexModified(ctx context.Context, db *sql.DB, index st
lastModified = modifiedOn

if isActive {
logrus.WithField("id", id).WithField("modifiedOn", modifiedOn).WithField("contact", contactJSON).Trace("modified contact")
slog.Debug("modified contact", "id", id, "modifiedOn", modifiedOn, "contact", contactJSON)

subBatch.WriteString(fmt.Sprintf(indexCommand, id, modifiedOn.UnixNano(), orgID))
subBatch.WriteString("\n")
subBatch.WriteString(contactJSON)
subBatch.WriteString("\n")
} else {
logrus.WithField("id", id).WithField("modifiedOn", modifiedOn).Trace("deleted contact")
slog.Debug("deleted contact", "id", id, "modifiedOn", modifiedOn)

subBatch.WriteString(fmt.Sprintf(deleteCommand, id, modifiedOn.UnixNano(), orgID))
subBatch.WriteString("\n")
Expand Down Expand Up @@ -248,16 +248,16 @@ func (i *ContactIndexer) indexModified(ctx context.Context, db *sql.DB, index st
batchTime := time.Since(batchStart)
batchRate := int(float32(batchFetched) / (float32(batchTime) / float32(time.Second)))

log := i.log().WithField("index", index).WithFields(logrus.Fields{
"rate": batchRate,
"batch_fetched": batchFetched,
"batch_created": batchCreated,
"batch_elapsed": batchTime,
"batch_elapsed_es": batchESTime,
"total_fetched": totalFetched,
"total_created": totalCreated,
"total_elapsed": totalTime,
})
log := i.log().With("index", index,
"rate", batchRate,
"batch_fetched", batchFetched,
"batch_created", batchCreated,
"batch_elapsed", batchTime,
"batch_elapsed_es", batchESTime,
"total_fetched", totalFetched,
"total_created", totalCreated,
"total_elapsed", totalTime,
)

// if we're rebuilding, always log batch progress
if rebuild {
Expand Down
Loading

0 comments on commit 2ce7cc1

Please sign in to comment.