diff --git a/examples/dynamoathenamigration/cmd/main.go b/examples/dynamoathenamigration/cmd/main.go index 26efb475fe82e..2766bce379103 100644 --- a/examples/dynamoathenamigration/cmd/main.go +++ b/examples/dynamoathenamigration/cmd/main.go @@ -17,18 +17,21 @@ package main import ( "context" "flag" + "log/slog" "net/url" "os" "os/signal" "strings" "time" - log "github.com/sirupsen/logrus" - "github.com/gravitational/teleport/examples/dynamoathenamigration" + logutils "github.com/gravitational/teleport/lib/utils/log" ) func main() { + ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt) + defer cancel() + timeStr := flag.String("exportTime", "", "exportTime is time (RFC3339 format) in the past from which to export table data, empty for the current time") exportARN := flag.String("exportARN", "", "exportARN allows to reuse already finished export without triggering new") dynamoARN := flag.String("dynamoARN", "", "ARN of DynamoDB table to export") @@ -43,12 +46,11 @@ func main() { debug := flag.Bool("d", false, "debug logs") flag.Parse() - level := log.InfoLevel + level := slog.LevelInfo if *debug { - level = log.DebugLevel + level = slog.LevelDebug } - logger := log.New() - logger.SetLevel(level) + logger := slog.New(logutils.NewSlogTextHandler(os.Stdout, logutils.SlogTextHandlerConfig{Level: level})) cfg := dynamoathenamigration.Config{ ExportARN: *exportARN, @@ -64,18 +66,21 @@ func main() { if *timeStr != "" { cfg.ExportTime, err = time.Parse(time.RFC3339, *timeStr) if err != nil { - logger.Fatal(err) + logger.ErrorContext(ctx, "Failed to parse export time", "error", err) + os.Exit(1) } } if *s3exportPath != "" { u, err := url.Parse(*s3exportPath) if err != nil { - logger.Fatal(err) + logger.ErrorContext(ctx, "Failed to parse s3 export path", "error", err) + os.Exit(1) } if u.Scheme != "s3" { - logger.Fatal("invalid scheme for s3 export path") + logger.ErrorContext(ctx, "invalid scheme for s3 export path", "error", err) + os.Exit(1) } cfg.Bucket = u.Host cfg.Prefix = strings.TrimSuffix(strings.TrimPrefix(u.Path, "/"), "/") @@ -84,11 +89,13 @@ func main() { if *s3largePayloadsPath != "" { u, err := url.Parse(*s3largePayloadsPath) if err != nil { - logger.Fatal(err) + logger.ErrorContext(ctx, "Failed to parse s3 large payloads path", "error", err) + os.Exit(1) } if u.Scheme != "s3" { - logger.Fatal("invalid scheme for s3 large payloads path") + logger.ErrorContext(ctx, "invalid scheme for s3 large payloads path", "error", err) + os.Exit(1) } cfg.LargePayloadBucket = u.Host cfg.LargePayloadPrefix = strings.TrimSuffix(strings.TrimPrefix(u.Path, "/"), "/") @@ -98,9 +105,8 @@ func main() { cfg.CheckpointPath = *checkpointPath } - ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt) - defer cancel() if err := dynamoathenamigration.Migrate(ctx, cfg); err != nil { - logger.Fatal(err) + logger.ErrorContext(ctx, "migration failed", "error", err) + os.Exit(1) } } diff --git a/examples/dynamoathenamigration/migration.go b/examples/dynamoathenamigration/migration.go index 5c3dff3943988..730e2fe435891 100644 --- a/examples/dynamoathenamigration/migration.go +++ b/examples/dynamoathenamigration/migration.go @@ -24,6 +24,7 @@ import ( "errors" "fmt" "io" + "log/slog" "os" "path" "slices" @@ -43,7 +44,6 @@ import ( "github.com/aws/aws-sdk-go-v2/service/sns" "github.com/google/uuid" "github.com/gravitational/trace" - log "github.com/sirupsen/logrus" "golang.org/x/exp/maps" "golang.org/x/sync/errgroup" @@ -97,7 +97,7 @@ type Config struct { // LargePayloadPrefix is s3 prefix configured for large payloads in athena logger. LargePayloadPrefix string - Logger log.FieldLogger + Logger *slog.Logger } const ( @@ -138,7 +138,7 @@ func (cfg *Config) CheckAndSetDefaults() error { } if cfg.Logger == nil { - cfg.Logger = log.New() + cfg.Logger = slog.Default() } return nil } @@ -214,7 +214,7 @@ func MigrateWithAWS(ctx context.Context, cfg Config, awsCfg aws.Config) error { return trace.Wrap(err) } - t.Logger.Info("Migration finished") + t.Logger.InfoContext(ctx, "Migration finished") return nil } @@ -235,7 +235,7 @@ func (t *task) GetOrStartExportAndWaitForResults(ctx context.Context) (*exportIn return nil, trace.Wrap(err) } - t.Logger.Debugf("Using export manifest %s", manifest) + t.Logger.DebugContext(ctx, "Using export manifest", "manifext", manifest) dataObjectsInfo, err := t.getDataObjectsInfo(ctx, manifest) if err != nil { return nil, trace.Wrap(err) @@ -291,7 +291,7 @@ func (t *task) waitForCompletedExport(ctx context.Context, exportARN string) (ex case <-ctx.Done(): return "", trace.Wrap(ctx.Err()) case <-time.After(30 * time.Second): - t.Logger.Debug("Export job still in progress...") + t.Logger.DebugContext(ctx, "Export job still in progress") } default: return "", trace.Errorf("dynamo DescribeExport returned unexpected status: %v", exportStatus) @@ -316,7 +316,7 @@ func (t *task) startExportJob(ctx context.Context) (arn string, err error) { } exportArn := aws.ToString(exportOutput.ExportDescription.ExportArn) - t.Logger.Infof("Started export %s", exportArn) + t.Logger.InfoContext(ctx, "Started export", "export_arn", exportArn) return exportArn, nil } @@ -373,14 +373,14 @@ func (t *task) getEventsFromDataFiles(ctx context.Context, exportInfo *exportInf return trace.Wrap(err) } if reuse { - t.Logger.Info("Resuming emitting from checkpoint") + t.Logger.InfoContext(ctx, "Resuming emitting from checkpoint") } else { // selected not reuse checkpoint = nil } } else { // migration completed without any error, no sense of reusing checkpoint. - t.Logger.Info("Skipping checkpoint because previous migration finished without error") + t.Logger.InfoContext(ctx, "Skipping checkpoint because previous migration finished without error") checkpoint = nil } } @@ -389,7 +389,7 @@ func (t *task) getEventsFromDataFiles(ctx context.Context, exportInfo *exportInf // if checkpoint was reached. var afterCheckpoint bool for _, dataObj := range exportInfo.DataObjectsInfo { - t.Logger.Debugf("Downloading %s", dataObj.DataFileS3Key) + t.Logger.DebugContext(ctx, "Downloading object", "object", dataObj.DataFileS3Key) afterCheckpoint, err = t.fromS3ToChan(ctx, dataObj, eventsC, checkpoint, afterCheckpoint) if err != nil { return trace.Wrap(err) @@ -408,7 +408,7 @@ func (t *task) fromS3ToChan(ctx context.Context, dataObj dataObjectInfo, eventsC checkpointValues := checkpoint.checkpointValues() afterCheckpoint := afterCheckpointIn - t.Logger.Debugf("Scanning %d events", dataObj.ItemCount) + t.Logger.DebugContext(ctx, "Scanning events", "event_count", dataObj.ItemCount) count := 0 decoder := json.NewDecoder(sortedExportFile) for decoder.More() { @@ -437,7 +437,7 @@ func (t *task) fromS3ToChan(ctx context.Context, dataObj dataObjectInfo, eventsC // skipping because was processed in previous run. continue } else { - t.Logger.Debugf("Event %s is last checkpoint, will start emitting from next event on the list", ev.GetID()) + t.Logger.DebugContext(ctx, "Event is last checkpoint, will start emitting from next event on the list", "event_id", ev.GetID()) // id is on list of valid checkpoints afterCheckpoint = true // This was last completed, skip it and from next iteration emit everything. @@ -452,7 +452,11 @@ func (t *task) fromS3ToChan(ctx context.Context, dataObj dataObjectInfo, eventsC } if count%1000 == 0 && !t.DryRun { - t.Logger.Debugf("Sent on buffer %d/%d events from %s", count, dataObj.ItemCount, dataObj.DataFileS3Key) + t.Logger.DebugContext(ctx, "Sent on buffer events for object", + "sent_count", count, + "total_count", dataObj.ItemCount, + "object", dataObj.DataFileS3Key, + ) } } @@ -816,11 +820,20 @@ func (t *task) emitEvents(ctx context.Context, eventsC <-chan apievents.AuditEve } if len(invalidEvents) > 0 { for _, eventWithErr := range invalidEvents { - t.Logger.Debugf("Event %q %q %v is invalid: %v", eventWithErr.event.GetType(), eventWithErr.event.GetID(), eventWithErr.event.GetTime().Format(time.RFC3339), eventWithErr.err) + t.Logger.DebugContext(ctx, "Event is invalid", + "event_type", eventWithErr.event.GetType(), + "event_id", eventWithErr.event.GetID(), + "event_time", eventWithErr.event.GetTime().Format(time.RFC3339), + "error", eventWithErr.err, + ) } return trace.Errorf("there are %d invalid items", len(invalidEvents)) } - t.Logger.Infof("Dry run: there are %d events from %v to %v", count, oldest.GetTime(), newest.GetTime()) + t.Logger.InfoContext(ctx, "Dry run: found valid events", + "event_count", count, + "oldest_time", oldest.GetTime(), + "newest_time", newest.GetTime(), + ) return nil } // mu protects checkpointsPerWorker. @@ -859,7 +872,10 @@ func (t *task) emitEvents(ctx context.Context, eventsC <-chan apievents.AuditEve // does not have any valid checkpoint to store. Without any valid checkpoint // we won't be able to calculate min checkpoint, so does not store checkpoint at all. if len(checkpointsPerWorker) < t.NoOfEmitWorkers { - t.Logger.Warnf("Not enough checkpoints from workers, got %d, expected %d", len(checkpointsPerWorker), t.NoOfEmitWorkers) + t.Logger.WarnContext(ctx, "Not enough checkpoints from workers", + "checkpoints_received", len(checkpointsPerWorker), + "checkpoints_expected", t.NoOfEmitWorkers, + ) return trace.Wrap(workersErr) } @@ -869,7 +885,7 @@ func (t *task) emitEvents(ctx context.Context, eventsC <-chan apievents.AuditEve Checkpoints: checkpointsPerWorker, } if err := t.storeEmitterCheckpoint(checkpoint); err != nil { - t.Logger.Errorf("Failed to store checkpoint: %v", err) + t.Logger.ErrorContext(ctx, "Failed to store checkpoint", "error", err) } return trace.Wrap(workersErr) } diff --git a/examples/dynamoathenamigration/migration_test.go b/examples/dynamoathenamigration/migration_test.go index 40aca246a9b43..b086a8c4fda1a 100644 --- a/examples/dynamoathenamigration/migration_test.go +++ b/examples/dynamoathenamigration/migration_test.go @@ -22,6 +22,7 @@ import ( "errors" "fmt" "io" + "log/slog" "math/rand/v2" "os" "path/filepath" @@ -41,6 +42,7 @@ import ( "github.com/gravitational/teleport/api/utils/prompt" "github.com/gravitational/teleport/api/utils/retryutils" "github.com/gravitational/teleport/lib/utils" + logutils "github.com/gravitational/teleport/lib/utils/log" ) func TestMigrateProcessDataObjects(t *testing.T) { @@ -60,7 +62,7 @@ func TestMigrateProcessDataObjects(t *testing.T) { }, eventsEmitter: emitter, Config: Config{ - Logger: utils.NewLoggerForTests(), + Logger: utils.NewSlogLoggerForTests(), NoOfEmitWorkers: 5, bufferSize: 10, CheckpointPath: filepath.Join(t.TempDir(), "migration-tests.json"), @@ -131,7 +133,7 @@ func TestLargeEventsParse(t *testing.T) { }, eventsEmitter: emitter, Config: Config{ - Logger: utils.NewLoggerForTests(), + Logger: utils.NewSlogLoggerForTests(), NoOfEmitWorkers: 5, bufferSize: 10, CheckpointPath: filepath.Join(t.TempDir(), "migration-tests.json"), @@ -221,7 +223,7 @@ func TestMigrationCheckpoint(t *testing.T) { noOfWorkers := 3 defaultConfig := Config{ - Logger: utils.NewLoggerForTests(), + Logger: utils.NewSlogLoggerForTests(), NoOfEmitWorkers: noOfWorkers, bufferSize: noOfWorkers * 5, CheckpointPath: filepath.Join(t.TempDir(), "migration-tests.json"), @@ -549,7 +551,7 @@ func TestMigrationDryRunValidation(t *testing.T) { validEvent(), eventWithoutTime, } }, - wantLog: "is invalid: empty event time", + wantLog: "empty event time", wantErr: "1 invalid", }, { @@ -561,7 +563,7 @@ func TestMigrationDryRunValidation(t *testing.T) { validEvent(), eventWithInvalidUUID, } }, - wantLog: "is invalid: invalid uid format: invalid UUID length", + wantLog: "invalid uid format: invalid UUID length", wantErr: "1 invalid", }, } @@ -569,8 +571,9 @@ func TestMigrationDryRunValidation(t *testing.T) { t.Run(tt.name, func(t *testing.T) { // Migration cli logs output from validation to logger. var logBuffer bytes.Buffer - log := utils.NewLoggerForTests() - log.SetOutput(&logBuffer) + log := slog.New(logutils.NewSlogJSONHandler(&logBuffer, logutils.SlogJSONHandlerConfig{ + Level: slog.LevelDebug, + })) tr := &task{ Config: Config{