diff --git a/integration/integration_test.go b/integration/integration_test.go
index 7742ad071e2fd..0703a5dd7e9f5 100644
--- a/integration/integration_test.go
+++ b/integration/integration_test.go
@@ -524,7 +524,7 @@ func testAuditOn(t *testing.T, suite *integrationTestSuite) {
 	}
 
 	// wait for the upload of the right session to complete
-	timeoutC := time.After(10 * time.Second)
+	timeoutC := time.After(20 * time.Second)
 loop:
 	for {
 		select {
@@ -1268,7 +1268,7 @@ func testLeafProxySessionRecording(t *testing.T, suite *integrationTestSuite) {
 
 	// Wait for the session recording to be uploaded and available
 	var uploaded bool
-	timeoutC := time.After(10 * time.Second)
+	timeoutC := time.After(20 * time.Second)
 	for !uploaded {
 		select {
 		case event := <-uploadChan:
diff --git a/integration/kube_integration_test.go b/integration/kube_integration_test.go
index af9ce8efd59ae..bcf21232a6062 100644
--- a/integration/kube_integration_test.go
+++ b/integration/kube_integration_test.go
@@ -329,7 +329,7 @@ func testExec(t *testing.T, suite *KubeSuite, pinnedIP string, clientError strin
 
 	// verify traffic capture and upload, wait for the upload to hit
 	var sessionID string
-	timeoutC := time.After(10 * time.Second)
+	timeoutC := time.After(20 * time.Second)
 loop:
 	for {
 		select {
@@ -764,7 +764,7 @@ func testKubeTrustedClustersClientCert(t *testing.T, suite *KubeSuite) {
 
 	// verify traffic capture and upload, wait for the upload to hit
 	var sessionID string
-	timeoutC := time.After(10 * time.Second)
+	timeoutC := time.After(20 * time.Second)
 loop:
 	for {
 		select {
@@ -1038,7 +1038,7 @@ func testKubeTrustedClustersSNI(t *testing.T, suite *KubeSuite) {
 
 	// verify traffic capture and upload, wait for the upload to hit
 	var sessionID string
-	timeoutC := time.After(10 * time.Second)
+	timeoutC := time.After(20 * time.Second)
 loop:
 	for {
 		select {
diff --git a/lib/events/filesessions/fileasync.go b/lib/events/filesessions/fileasync.go
index 33760d7b2cff6..8f6f799643e59 100644
--- a/lib/events/filesessions/fileasync.go
+++ b/lib/events/filesessions/fileasync.go
@@ -50,6 +50,8 @@ type UploaderConfig struct {
 	CorruptedDir string
 	// Clock is the clock replacement
 	Clock clockwork.Clock
+	// InitialScanDelay is how long to wait before performing the initial scan.
+	InitialScanDelay time.Duration
 	// ScanPeriod is a uploader dir scan period
 	ScanPeriod time.Duration
 	// ConcurrentUploads sets up how many parallel uploads to schedule
@@ -193,9 +195,11 @@ func (u *Uploader) Serve(ctx context.Context) error {
 	u.log.Infof("uploader will scan %v every %v", u.cfg.ScanDir, u.cfg.ScanPeriod.String())
 
 	backoff, err := retryutils.NewLinear(retryutils.LinearConfig{
-		Step:  u.cfg.ScanPeriod,
-		Max:   u.cfg.ScanPeriod * 100,
-		Clock: u.cfg.Clock,
+		First:  u.cfg.InitialScanDelay,
+		Step:   u.cfg.ScanPeriod,
+		Max:    u.cfg.ScanPeriod * 100,
+		Clock:  u.cfg.Clock,
+		Jitter: retryutils.NewSeventhJitter(),
 	})
 	if err != nil {
 		return trace.Wrap(err)
@@ -221,8 +225,7 @@ func (u *Uploader) Serve(ctx context.Context) error {
 				}
 			default:
 				backoff.Inc()
-				u.log.WithError(event.Error).Warningf(
-					"Backing off, will retry after %v.", backoff.Duration())
+				u.log.Warnf("Increasing session upload backoff due to error, will retry after %v.", backoff.Duration())
 			}
 			// forward the event to channel that used in tests
 			if u.cfg.EventsC != nil {
@@ -234,19 +237,12 @@ func (u *Uploader) Serve(ctx context.Context) error {
 			}
 		// Tick at scan period but slow down (and speeds up) on errors.
 		case <-backoff.After():
-			var failed bool
 			if _, err := u.Scan(ctx); err != nil {
 				if !errors.Is(trace.Unwrap(err), errContext) {
-					failed = true
-					u.log.WithError(err).Warningf("Uploader scan failed.")
+					backoff.Inc()
+					u.log.WithError(err).Warningf("Uploader scan failed, will retry after %v.", backoff.Duration())
 				}
 			}
-			if failed {
-				backoff.Inc()
-				u.log.Debugf("Scan failed, backing off, will retry after %v.", backoff.Duration())
-			} else {
-				backoff.ResetToDelay()
-			}
 		}
 	}
 }
@@ -532,6 +528,12 @@ func (u *Uploader) upload(ctx context.Context, up *upload) error {
 		select {
 		case <-u.closeC:
 			return trace.Errorf("operation has been canceled, uploader is closed")
+		case <-stream.Done():
+			if errStream, ok := stream.(interface{ Error() error }); ok {
+				return trace.ConnectionProblem(errStream.Error(), errStream.Error().Error())
+			}
+
+			return trace.ConnectionProblem(nil, "upload stream terminated unexpectedly")
 		case <-stream.Status():
 		case <-time.After(events.NetworkRetryDuration):
 			return trace.ConnectionProblem(nil, "timeout waiting for stream status update")
diff --git a/lib/events/filesessions/fileasync_test.go b/lib/events/filesessions/fileasync_test.go
index 947fcd64f6634..4e3b1e061ec46 100644
--- a/lib/events/filesessions/fileasync_test.go
+++ b/lib/events/filesessions/fileasync_test.go
@@ -483,16 +483,17 @@ func TestUploadBadSession(t *testing.T) {
 // uploaderPack reduces boilerplate required
 // to create a test
 type uploaderPack struct {
-	scanPeriod  time.Duration
-	clock       clockwork.FakeClock
-	eventsC     chan events.UploadEvent
-	memEventsC  chan events.UploadEvent
-	memUploader *eventstest.MemoryUploader
-	streamer    events.Streamer
-	scanDir     string
-	uploader    *Uploader
-	ctx         context.Context
-	cancel      context.CancelFunc
+	scanPeriod       time.Duration
+	initialScanDelay time.Duration
+	clock            clockwork.FakeClock
+	eventsC          chan events.UploadEvent
+	memEventsC       chan events.UploadEvent
+	memUploader      *eventstest.MemoryUploader
+	streamer         events.Streamer
+	scanDir          string
+	uploader         *Uploader
+	ctx              context.Context
+	cancel           context.CancelFunc
 }
 
 func (u *uploaderPack) Close(t *testing.T) {
@@ -505,13 +506,14 @@ func newUploaderPack(t *testing.T, wrapStreamer wrapStreamerFn) uploaderPack {
 	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
 
 	pack := uploaderPack{
-		clock:      clockwork.NewFakeClock(),
-		eventsC:    make(chan events.UploadEvent, 100),
-		memEventsC: make(chan events.UploadEvent, 100),
-		ctx:        ctx,
-		cancel:     cancel,
-		scanDir:    scanDir,
-		scanPeriod: 10 * time.Second,
+		clock:            clockwork.NewFakeClock(),
+		eventsC:          make(chan events.UploadEvent, 100),
+		memEventsC:       make(chan events.UploadEvent, 100),
+		ctx:              ctx,
+		cancel:           cancel,
+		scanDir:          scanDir,
+		scanPeriod:       10 * time.Second,
+		initialScanDelay: 10 * time.Millisecond,
 	}
 	pack.memUploader = eventstest.NewMemoryUploader(pack.memEventsC)
 
@@ -527,12 +529,13 @@ func newUploaderPack(t *testing.T, wrapStreamer wrapStreamerFn) uploaderPack {
 	}
 
 	uploader, err := NewUploader(UploaderConfig{
-		ScanDir:      pack.scanDir,
-		CorruptedDir: corruptedDir,
-		ScanPeriod:   pack.scanPeriod,
-		Streamer:     pack.streamer,
-		Clock:        pack.clock,
-		EventsC:      pack.eventsC,
+		ScanDir:          pack.scanDir,
+		CorruptedDir:     corruptedDir,
+		InitialScanDelay: pack.initialScanDelay,
+		ScanPeriod:       pack.scanPeriod,
+		Streamer:         pack.streamer,
+		Clock:            pack.clock,
+		EventsC:          pack.eventsC,
 	})
 	require.NoError(t, err)
 	pack.uploader = uploader
@@ -564,12 +567,13 @@ func runResume(t *testing.T, testCase resumeTestCase) {
 	scanPeriod := 10 * time.Second
 
 	uploader, err := NewUploader(UploaderConfig{
-		EventsC:      eventsC,
-		ScanDir:      scanDir,
-		CorruptedDir: corruptedDir,
-		ScanPeriod:   scanPeriod,
-		Streamer:     test.streamer,
-		Clock:        clock,
+		EventsC:          eventsC,
+		ScanDir:          scanDir,
+		CorruptedDir:     corruptedDir,
+		InitialScanDelay: 10 * time.Millisecond,
+		ScanPeriod:       scanPeriod,
+		Streamer:         test.streamer,
+		Clock:            clock,
 	})
 	require.NoError(t, err)
 	go uploader.Serve(ctx)
diff --git a/lib/service/service.go b/lib/service/service.go
index 2e94137f02828..18724d04c28da 100644
--- a/lib/service/service.go
+++ b/lib/service/service.go
@@ -3086,10 +3086,11 @@ func (process *TeleportProcess) initUploaderService() error {
 	corruptedDir := filepath.Join(paths[1]...)
 
 	fileUploader, err := filesessions.NewUploader(filesessions.UploaderConfig{
-		Streamer:     uploaderClient,
-		ScanDir:      uploadsDir,
-		CorruptedDir: corruptedDir,
-		EventsC:      process.Config.Testing.UploadEventsC,
+		Streamer:         uploaderClient,
+		ScanDir:          uploadsDir,
+		CorruptedDir:     corruptedDir,
+		EventsC:          process.Config.Testing.UploadEventsC,
+		InitialScanDelay: 15 * time.Second,
 	})
 	if err != nil {
 		return trace.Wrap(err)
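Side note on the new `stream.Done()` branch in `upload()`: it probes the stream with an anonymous single-method interface to see whether the concrete type can report why it terminated. The sketch below isolates that Go pattern in a self-contained form; `fakeStream` and `terminationError` are hypothetical names used only for illustration and are not part of the patch or the Teleport codebase.

```go
package main

import (
	"errors"
	"fmt"
)

// fakeStream is a stand-in for an upload stream whose concrete type happens
// to expose a terminal error. Only the Error method matters here.
type fakeStream struct {
	err error
}

// Error reports why the stream stopped, mirroring the optional method the
// patch probes for with interface{ Error() error }.
func (s *fakeStream) Error() error { return s.err }

// terminationError extracts a terminal error from any stream-like value that
// implements Error() error, and falls back to a generic message otherwise.
// This mirrors the shape of the new <-stream.Done() case.
func terminationError(stream any) error {
	if errStream, ok := stream.(interface{ Error() error }); ok && errStream.Error() != nil {
		return fmt.Errorf("upload stream terminated: %w", errStream.Error())
	}
	return errors.New("upload stream terminated unexpectedly")
}

func main() {
	fmt.Println(terminationError(&fakeStream{err: errors.New("audit backend unavailable")}))
	fmt.Println(terminationError(struct{}{})) // no Error() error method: generic message
}
```

Probing an optional interface this way keeps the caller decoupled from any particular stream implementation while still surfacing the underlying error when one is available.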