Skip to content

Commit

Permalink
Convert examples/dynamoathenamigration to use slog (#50802)
Browse files Browse the repository at this point in the history
  • Loading branch information
rosstimothy authored Jan 7, 2025
1 parent 8a8958b commit 77eef60
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 38 deletions.
34 changes: 20 additions & 14 deletions examples/dynamoathenamigration/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,21 @@ package main
import (
"context"
"flag"
"log/slog"
"net/url"
"os"
"os/signal"
"strings"
"time"

log "github.com/sirupsen/logrus"

"github.com/gravitational/teleport/examples/dynamoathenamigration"
logutils "github.com/gravitational/teleport/lib/utils/log"
)

func main() {
ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
defer cancel()

timeStr := flag.String("exportTime", "", "exportTime is time (RFC3339 format) in the past from which to export table data, empty for the current time")
exportARN := flag.String("exportARN", "", "exportARN allows to reuse already finished export without triggering new")
dynamoARN := flag.String("dynamoARN", "", "ARN of DynamoDB table to export")
Expand All @@ -43,12 +46,11 @@ func main() {
debug := flag.Bool("d", false, "debug logs")
flag.Parse()

level := log.InfoLevel
level := slog.LevelInfo
if *debug {
level = log.DebugLevel
level = slog.LevelDebug
}
logger := log.New()
logger.SetLevel(level)
logger := slog.New(logutils.NewSlogTextHandler(os.Stdout, logutils.SlogTextHandlerConfig{Level: level}))

cfg := dynamoathenamigration.Config{
ExportARN: *exportARN,
Expand All @@ -64,18 +66,21 @@ func main() {
if *timeStr != "" {
cfg.ExportTime, err = time.Parse(time.RFC3339, *timeStr)
if err != nil {
logger.Fatal(err)
logger.ErrorContext(ctx, "Failed to parse export time", "error", err)
os.Exit(1)
}
}

if *s3exportPath != "" {
u, err := url.Parse(*s3exportPath)
if err != nil {
logger.Fatal(err)
logger.ErrorContext(ctx, "Failed to parse s3 export path", "error", err)
os.Exit(1)
}

if u.Scheme != "s3" {
logger.Fatal("invalid scheme for s3 export path")
logger.ErrorContext(ctx, "invalid scheme for s3 export path")
os.Exit(1)
}
cfg.Bucket = u.Host
cfg.Prefix = strings.TrimSuffix(strings.TrimPrefix(u.Path, "/"), "/")
Expand All @@ -84,11 +89,13 @@ func main() {
if *s3largePayloadsPath != "" {
u, err := url.Parse(*s3largePayloadsPath)
if err != nil {
logger.Fatal(err)
logger.ErrorContext(ctx, "Failed to parse s3 large payloads path", "error", err)
os.Exit(1)
}

if u.Scheme != "s3" {
logger.Fatal("invalid scheme for s3 large payloads path")
logger.ErrorContext(ctx, "invalid scheme for s3 large payloads path")
os.Exit(1)
}
cfg.LargePayloadBucket = u.Host
cfg.LargePayloadPrefix = strings.TrimSuffix(strings.TrimPrefix(u.Path, "/"), "/")
Expand All @@ -98,9 +105,8 @@ func main() {
cfg.CheckpointPath = *checkpointPath
}

ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
defer cancel()
if err := dynamoathenamigration.Migrate(ctx, cfg); err != nil {
logger.Fatal(err)
logger.ErrorContext(ctx, "Migration failed", "error", err)
os.Exit(1)
}
}
50 changes: 33 additions & 17 deletions examples/dynamoathenamigration/migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"errors"
"fmt"
"io"
"log/slog"
"os"
"path"
"slices"
Expand All @@ -43,7 +44,6 @@ import (
"github.com/aws/aws-sdk-go-v2/service/sns"
"github.com/google/uuid"
"github.com/gravitational/trace"
log "github.com/sirupsen/logrus"
"golang.org/x/exp/maps"
"golang.org/x/sync/errgroup"

Expand Down Expand Up @@ -97,7 +97,7 @@ type Config struct {
// LargePayloadPrefix is s3 prefix configured for large payloads in athena logger.
LargePayloadPrefix string

Logger log.FieldLogger
Logger *slog.Logger
}

const (
Expand Down Expand Up @@ -138,7 +138,7 @@ func (cfg *Config) CheckAndSetDefaults() error {
}

if cfg.Logger == nil {
cfg.Logger = log.New()
cfg.Logger = slog.Default()
}
return nil
}
Expand Down Expand Up @@ -214,7 +214,7 @@ func MigrateWithAWS(ctx context.Context, cfg Config, awsCfg aws.Config) error {
return trace.Wrap(err)
}

t.Logger.Info("Migration finished")
t.Logger.InfoContext(ctx, "Migration finished")
return nil
}

Expand All @@ -235,7 +235,7 @@ func (t *task) GetOrStartExportAndWaitForResults(ctx context.Context) (*exportIn
return nil, trace.Wrap(err)
}

t.Logger.Debugf("Using export manifest %s", manifest)
t.Logger.DebugContext(ctx, "Using export manifest", "manifest", manifest)
dataObjectsInfo, err := t.getDataObjectsInfo(ctx, manifest)
if err != nil {
return nil, trace.Wrap(err)
Expand Down Expand Up @@ -291,7 +291,7 @@ func (t *task) waitForCompletedExport(ctx context.Context, exportARN string) (ex
case <-ctx.Done():
return "", trace.Wrap(ctx.Err())
case <-time.After(30 * time.Second):
t.Logger.Debug("Export job still in progress...")
t.Logger.DebugContext(ctx, "Export job still in progress")
}
default:
return "", trace.Errorf("dynamo DescribeExport returned unexpected status: %v", exportStatus)
Expand All @@ -316,7 +316,7 @@ func (t *task) startExportJob(ctx context.Context) (arn string, err error) {
}

exportArn := aws.ToString(exportOutput.ExportDescription.ExportArn)
t.Logger.Infof("Started export %s", exportArn)
t.Logger.InfoContext(ctx, "Started export", "export_arn", exportArn)
return exportArn, nil
}

Expand Down Expand Up @@ -373,14 +373,14 @@ func (t *task) getEventsFromDataFiles(ctx context.Context, exportInfo *exportInf
return trace.Wrap(err)
}
if reuse {
t.Logger.Info("Resuming emitting from checkpoint")
t.Logger.InfoContext(ctx, "Resuming emitting from checkpoint")
} else {
// selected not reuse
checkpoint = nil
}
} else {
// migration completed without any error, no sense of reusing checkpoint.
t.Logger.Info("Skipping checkpoint because previous migration finished without error")
t.Logger.InfoContext(ctx, "Skipping checkpoint because previous migration finished without error")
checkpoint = nil
}
}
Expand All @@ -389,7 +389,7 @@ func (t *task) getEventsFromDataFiles(ctx context.Context, exportInfo *exportInf
// if checkpoint was reached.
var afterCheckpoint bool
for _, dataObj := range exportInfo.DataObjectsInfo {
t.Logger.Debugf("Downloading %s", dataObj.DataFileS3Key)
t.Logger.DebugContext(ctx, "Downloading object", "object", dataObj.DataFileS3Key)
afterCheckpoint, err = t.fromS3ToChan(ctx, dataObj, eventsC, checkpoint, afterCheckpoint)
if err != nil {
return trace.Wrap(err)
Expand All @@ -408,7 +408,7 @@ func (t *task) fromS3ToChan(ctx context.Context, dataObj dataObjectInfo, eventsC
checkpointValues := checkpoint.checkpointValues()
afterCheckpoint := afterCheckpointIn

t.Logger.Debugf("Scanning %d events", dataObj.ItemCount)
t.Logger.DebugContext(ctx, "Scanning events", "event_count", dataObj.ItemCount)
count := 0
decoder := json.NewDecoder(sortedExportFile)
for decoder.More() {
Expand Down Expand Up @@ -437,7 +437,7 @@ func (t *task) fromS3ToChan(ctx context.Context, dataObj dataObjectInfo, eventsC
// skipping because was processed in previous run.
continue
} else {
t.Logger.Debugf("Event %s is last checkpoint, will start emitting from next event on the list", ev.GetID())
t.Logger.DebugContext(ctx, "Event is last checkpoint, will start emitting from next event on the list", "event_id", ev.GetID())
// id is on list of valid checkpoints
afterCheckpoint = true
// This was last completed, skip it and from next iteration emit everything.
Expand All @@ -452,7 +452,11 @@ func (t *task) fromS3ToChan(ctx context.Context, dataObj dataObjectInfo, eventsC
}

if count%1000 == 0 && !t.DryRun {
t.Logger.Debugf("Sent on buffer %d/%d events from %s", count, dataObj.ItemCount, dataObj.DataFileS3Key)
t.Logger.DebugContext(ctx, "Sent on buffer events for object",
"sent_count", count,
"total_count", dataObj.ItemCount,
"object", dataObj.DataFileS3Key,
)
}
}

Expand Down Expand Up @@ -816,11 +820,20 @@ func (t *task) emitEvents(ctx context.Context, eventsC <-chan apievents.AuditEve
}
if len(invalidEvents) > 0 {
for _, eventWithErr := range invalidEvents {
t.Logger.Debugf("Event %q %q %v is invalid: %v", eventWithErr.event.GetType(), eventWithErr.event.GetID(), eventWithErr.event.GetTime().Format(time.RFC3339), eventWithErr.err)
t.Logger.DebugContext(ctx, "Event is invalid",
"event_type", eventWithErr.event.GetType(),
"event_id", eventWithErr.event.GetID(),
"event_time", eventWithErr.event.GetTime().Format(time.RFC3339),
"error", eventWithErr.err,
)
}
return trace.Errorf("there are %d invalid items", len(invalidEvents))
}
t.Logger.Infof("Dry run: there are %d events from %v to %v", count, oldest.GetTime(), newest.GetTime())
t.Logger.InfoContext(ctx, "Dry run: found valid events",
"event_count", count,
"oldest_time", oldest.GetTime(),
"newest_time", newest.GetTime(),
)
return nil
}
// mu protects checkpointsPerWorker.
Expand Down Expand Up @@ -859,7 +872,10 @@ func (t *task) emitEvents(ctx context.Context, eventsC <-chan apievents.AuditEve
// does not have any valid checkpoint to store. Without any valid checkpoint
// we won't be able to calculate min checkpoint, so does not store checkpoint at all.
if len(checkpointsPerWorker) < t.NoOfEmitWorkers {
t.Logger.Warnf("Not enough checkpoints from workers, got %d, expected %d", len(checkpointsPerWorker), t.NoOfEmitWorkers)
t.Logger.WarnContext(ctx, "Not enough checkpoints from workers",
"checkpoints_received", len(checkpointsPerWorker),
"checkpoints_expected", t.NoOfEmitWorkers,
)
return trace.Wrap(workersErr)
}

Expand All @@ -869,7 +885,7 @@ func (t *task) emitEvents(ctx context.Context, eventsC <-chan apievents.AuditEve
Checkpoints: checkpointsPerWorker,
}
if err := t.storeEmitterCheckpoint(checkpoint); err != nil {
t.Logger.Errorf("Failed to store checkpoint: %v", err)
t.Logger.ErrorContext(ctx, "Failed to store checkpoint", "error", err)
}
return trace.Wrap(workersErr)
}
Expand Down
17 changes: 10 additions & 7 deletions examples/dynamoathenamigration/migration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"errors"
"fmt"
"io"
"log/slog"
"math/rand/v2"
"os"
"path/filepath"
Expand All @@ -41,6 +42,7 @@ import (
"github.com/gravitational/teleport/api/utils/prompt"
"github.com/gravitational/teleport/api/utils/retryutils"
"github.com/gravitational/teleport/lib/utils"
logutils "github.com/gravitational/teleport/lib/utils/log"
)

func TestMigrateProcessDataObjects(t *testing.T) {
Expand All @@ -60,7 +62,7 @@ func TestMigrateProcessDataObjects(t *testing.T) {
},
eventsEmitter: emitter,
Config: Config{
Logger: utils.NewLoggerForTests(),
Logger: utils.NewSlogLoggerForTests(),
NoOfEmitWorkers: 5,
bufferSize: 10,
CheckpointPath: filepath.Join(t.TempDir(), "migration-tests.json"),
Expand Down Expand Up @@ -131,7 +133,7 @@ func TestLargeEventsParse(t *testing.T) {
},
eventsEmitter: emitter,
Config: Config{
Logger: utils.NewLoggerForTests(),
Logger: utils.NewSlogLoggerForTests(),
NoOfEmitWorkers: 5,
bufferSize: 10,
CheckpointPath: filepath.Join(t.TempDir(), "migration-tests.json"),
Expand Down Expand Up @@ -221,7 +223,7 @@ func TestMigrationCheckpoint(t *testing.T) {

noOfWorkers := 3
defaultConfig := Config{
Logger: utils.NewLoggerForTests(),
Logger: utils.NewSlogLoggerForTests(),
NoOfEmitWorkers: noOfWorkers,
bufferSize: noOfWorkers * 5,
CheckpointPath: filepath.Join(t.TempDir(), "migration-tests.json"),
Expand Down Expand Up @@ -549,7 +551,7 @@ func TestMigrationDryRunValidation(t *testing.T) {
validEvent(), eventWithoutTime,
}
},
wantLog: "is invalid: empty event time",
wantLog: "empty event time",
wantErr: "1 invalid",
},
{
Expand All @@ -561,16 +563,17 @@ func TestMigrationDryRunValidation(t *testing.T) {
validEvent(), eventWithInvalidUUID,
}
},
wantLog: "is invalid: invalid uid format: invalid UUID length",
wantLog: "invalid uid format: invalid UUID length",
wantErr: "1 invalid",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Migration cli logs output from validation to logger.
var logBuffer bytes.Buffer
log := utils.NewLoggerForTests()
log.SetOutput(&logBuffer)
log := slog.New(logutils.NewSlogJSONHandler(&logBuffer, logutils.SlogJSONHandlerConfig{
Level: slog.LevelDebug,
}))

tr := &task{
Config: Config{
Expand Down

0 comments on commit 77eef60

Please sign in to comment.