Skip to content

Commit

Permalink
Improve recording upload backoff (#42718)
Browse files Browse the repository at this point in the history
Failed uploads could get stuck in an infinite 10s backoff instead of
linearly backing off as intended. Because `(Uploader) Scan` launches
uploads asynchronously, it could return without an error, which reset
the backoff to its initial value even when previously failed uploads
had incremented it. To avoid this, the backoff delay is now reset only
when an upload completes successfully.

Additionally, error messaging was improved: any errors caused by the
stream being terminated are now returned to the caller instead of a
vague generic message.
  • Loading branch information
rosstimothy authored Jun 11, 2024
1 parent 11de09e commit e33de20
Show file tree
Hide file tree
Showing 5 changed files with 59 additions and 52 deletions.
4 changes: 2 additions & 2 deletions integration/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -524,7 +524,7 @@ func testAuditOn(t *testing.T, suite *integrationTestSuite) {
}

// wait for the upload of the right session to complete
timeoutC := time.After(10 * time.Second)
timeoutC := time.After(20 * time.Second)
loop:
for {
select {
Expand Down Expand Up @@ -1268,7 +1268,7 @@ func testLeafProxySessionRecording(t *testing.T, suite *integrationTestSuite) {

// Wait for the session recording to be uploaded and available
var uploaded bool
timeoutC := time.After(10 * time.Second)
timeoutC := time.After(20 * time.Second)
for !uploaded {
select {
case event := <-uploadChan:
Expand Down
6 changes: 3 additions & 3 deletions integration/kube_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ func testExec(t *testing.T, suite *KubeSuite, pinnedIP string, clientError strin

// verify traffic capture and upload, wait for the upload to hit
var sessionID string
timeoutC := time.After(10 * time.Second)
timeoutC := time.After(20 * time.Second)
loop:
for {
select {
Expand Down Expand Up @@ -764,7 +764,7 @@ func testKubeTrustedClustersClientCert(t *testing.T, suite *KubeSuite) {

// verify traffic capture and upload, wait for the upload to hit
var sessionID string
timeoutC := time.After(10 * time.Second)
timeoutC := time.After(20 * time.Second)
loop:
for {
select {
Expand Down Expand Up @@ -1038,7 +1038,7 @@ func testKubeTrustedClustersSNI(t *testing.T, suite *KubeSuite) {

// verify traffic capture and upload, wait for the upload to hit
var sessionID string
timeoutC := time.After(10 * time.Second)
timeoutC := time.After(20 * time.Second)
loop:
for {
select {
Expand Down
30 changes: 16 additions & 14 deletions lib/events/filesessions/fileasync.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ type UploaderConfig struct {
CorruptedDir string
// Clock is the clock replacement
Clock clockwork.Clock
// InitialScanDelay is how long to wait before performing the initial scan.
InitialScanDelay time.Duration
// ScanPeriod is a uploader dir scan period
ScanPeriod time.Duration
// ConcurrentUploads sets up how many parallel uploads to schedule
Expand Down Expand Up @@ -193,9 +195,11 @@ func (u *Uploader) Serve(ctx context.Context) error {

u.log.Infof("uploader will scan %v every %v", u.cfg.ScanDir, u.cfg.ScanPeriod.String())
backoff, err := retryutils.NewLinear(retryutils.LinearConfig{
Step: u.cfg.ScanPeriod,
Max: u.cfg.ScanPeriod * 100,
Clock: u.cfg.Clock,
First: u.cfg.InitialScanDelay,
Step: u.cfg.ScanPeriod,
Max: u.cfg.ScanPeriod * 100,
Clock: u.cfg.Clock,
Jitter: retryutils.NewSeventhJitter(),
})
if err != nil {
return trace.Wrap(err)
Expand All @@ -221,8 +225,7 @@ func (u *Uploader) Serve(ctx context.Context) error {
}
default:
backoff.Inc()
u.log.WithError(event.Error).Warningf(
"Backing off, will retry after %v.", backoff.Duration())
u.log.Warnf("Increasing session upload backoff due to error, will retry after %v.", backoff.Duration())
}
// forward the event to channel that used in tests
if u.cfg.EventsC != nil {
Expand All @@ -234,19 +237,12 @@ func (u *Uploader) Serve(ctx context.Context) error {
}
// Tick at scan period but slow down (and speeds up) on errors.
case <-backoff.After():
var failed bool
if _, err := u.Scan(ctx); err != nil {
if !errors.Is(trace.Unwrap(err), errContext) {
failed = true
u.log.WithError(err).Warningf("Uploader scan failed.")
backoff.Inc()
u.log.WithError(err).Warningf("Uploader scan failed, will retry after %v.", backoff.Duration())
}
}
if failed {
backoff.Inc()
u.log.Debugf("Scan failed, backing off, will retry after %v.", backoff.Duration())
} else {
backoff.ResetToDelay()
}
}
}
}
Expand Down Expand Up @@ -532,6 +528,12 @@ func (u *Uploader) upload(ctx context.Context, up *upload) error {
select {
case <-u.closeC:
return trace.Errorf("operation has been canceled, uploader is closed")
case <-stream.Done():
if errStream, ok := stream.(interface{ Error() error }); ok {
return trace.ConnectionProblem(errStream.Error(), errStream.Error().Error())
}

return trace.ConnectionProblem(nil, "upload stream terminated unexpectedly")
case <-stream.Status():
case <-time.After(events.NetworkRetryDuration):
return trace.ConnectionProblem(nil, "timeout waiting for stream status update")
Expand Down
62 changes: 33 additions & 29 deletions lib/events/filesessions/fileasync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -483,16 +483,17 @@ func TestUploadBadSession(t *testing.T) {
// uploaderPack reduces boilerplate required
// to create a test
type uploaderPack struct {
scanPeriod time.Duration
clock clockwork.FakeClock
eventsC chan events.UploadEvent
memEventsC chan events.UploadEvent
memUploader *eventstest.MemoryUploader
streamer events.Streamer
scanDir string
uploader *Uploader
ctx context.Context
cancel context.CancelFunc
scanPeriod time.Duration
initialScanDelay time.Duration
clock clockwork.FakeClock
eventsC chan events.UploadEvent
memEventsC chan events.UploadEvent
memUploader *eventstest.MemoryUploader
streamer events.Streamer
scanDir string
uploader *Uploader
ctx context.Context
cancel context.CancelFunc
}

func (u *uploaderPack) Close(t *testing.T) {
Expand All @@ -505,13 +506,14 @@ func newUploaderPack(t *testing.T, wrapStreamer wrapStreamerFn) uploaderPack {

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
pack := uploaderPack{
clock: clockwork.NewFakeClock(),
eventsC: make(chan events.UploadEvent, 100),
memEventsC: make(chan events.UploadEvent, 100),
ctx: ctx,
cancel: cancel,
scanDir: scanDir,
scanPeriod: 10 * time.Second,
clock: clockwork.NewFakeClock(),
eventsC: make(chan events.UploadEvent, 100),
memEventsC: make(chan events.UploadEvent, 100),
ctx: ctx,
cancel: cancel,
scanDir: scanDir,
scanPeriod: 10 * time.Second,
initialScanDelay: 10 * time.Millisecond,
}
pack.memUploader = eventstest.NewMemoryUploader(pack.memEventsC)

Expand All @@ -527,12 +529,13 @@ func newUploaderPack(t *testing.T, wrapStreamer wrapStreamerFn) uploaderPack {
}

uploader, err := NewUploader(UploaderConfig{
ScanDir: pack.scanDir,
CorruptedDir: corruptedDir,
ScanPeriod: pack.scanPeriod,
Streamer: pack.streamer,
Clock: pack.clock,
EventsC: pack.eventsC,
ScanDir: pack.scanDir,
CorruptedDir: corruptedDir,
InitialScanDelay: pack.initialScanDelay,
ScanPeriod: pack.scanPeriod,
Streamer: pack.streamer,
Clock: pack.clock,
EventsC: pack.eventsC,
})
require.NoError(t, err)
pack.uploader = uploader
Expand Down Expand Up @@ -564,12 +567,13 @@ func runResume(t *testing.T, testCase resumeTestCase) {

scanPeriod := 10 * time.Second
uploader, err := NewUploader(UploaderConfig{
EventsC: eventsC,
ScanDir: scanDir,
CorruptedDir: corruptedDir,
ScanPeriod: scanPeriod,
Streamer: test.streamer,
Clock: clock,
EventsC: eventsC,
ScanDir: scanDir,
CorruptedDir: corruptedDir,
InitialScanDelay: 10 * time.Millisecond,
ScanPeriod: scanPeriod,
Streamer: test.streamer,
Clock: clock,
})
require.NoError(t, err)
go uploader.Serve(ctx)
Expand Down
9 changes: 5 additions & 4 deletions lib/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -3025,10 +3025,11 @@ func (process *TeleportProcess) initUploaderService() error {
corruptedDir := filepath.Join(paths[1]...)

fileUploader, err := filesessions.NewUploader(filesessions.UploaderConfig{
Streamer: uploaderClient,
ScanDir: uploadsDir,
CorruptedDir: corruptedDir,
EventsC: process.Config.Testing.UploadEventsC,
Streamer: uploaderClient,
ScanDir: uploadsDir,
CorruptedDir: corruptedDir,
EventsC: process.Config.Testing.UploadEventsC,
InitialScanDelay: 15 * time.Second,
})
if err != nil {
return trace.Wrap(err)
Expand Down

0 comments on commit e33de20

Please sign in to comment.