Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v14] Improve recording upload backoff #42775

Merged
merged 1 commit into from
Jun 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions integration/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -525,7 +525,7 @@ func testAuditOn(t *testing.T, suite *integrationTestSuite) {
}

// wait for the upload of the right session to complete
timeoutC := time.After(10 * time.Second)
timeoutC := time.After(20 * time.Second)
loop:
for {
select {
Expand Down Expand Up @@ -1269,7 +1269,7 @@ func testLeafProxySessionRecording(t *testing.T, suite *integrationTestSuite) {

// Wait for the session recording to be uploaded and available
var uploaded bool
timeoutC := time.After(10 * time.Second)
timeoutC := time.After(20 * time.Second)
for !uploaded {
select {
case event := <-uploadChan:
Expand Down
6 changes: 3 additions & 3 deletions integration/kube_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ func testExec(t *testing.T, suite *KubeSuite, pinnedIP string, clientError strin

// verify traffic capture and upload, wait for the upload to hit
var sessionID string
timeoutC := time.After(10 * time.Second)
timeoutC := time.After(20 * time.Second)
loop:
for {
select {
Expand Down Expand Up @@ -754,7 +754,7 @@ func testKubeTrustedClustersClientCert(t *testing.T, suite *KubeSuite) {

// verify traffic capture and upload, wait for the upload to hit
var sessionID string
timeoutC := time.After(10 * time.Second)
timeoutC := time.After(20 * time.Second)
loop:
for {
select {
Expand Down Expand Up @@ -1028,7 +1028,7 @@ func testKubeTrustedClustersSNI(t *testing.T, suite *KubeSuite) {

// verify traffic capture and upload, wait for the upload to hit
var sessionID string
timeoutC := time.After(10 * time.Second)
timeoutC := time.After(20 * time.Second)
loop:
for {
select {
Expand Down
32 changes: 17 additions & 15 deletions lib/events/filesessions/fileasync.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ type UploaderConfig struct {
CorruptedDir string
// Clock is the clock replacement
Clock clockwork.Clock
// InitialScanDelay is how long to wait before performing the initial scan.
InitialScanDelay time.Duration
// ScanPeriod is a uploader dir scan period
ScanPeriod time.Duration
// ConcurrentUploads sets up how many parallel uploads to schedule
Expand Down Expand Up @@ -191,9 +193,11 @@ func (u *Uploader) Serve(ctx context.Context) error {

u.log.Infof("uploader will scan %v every %v", u.cfg.ScanDir, u.cfg.ScanPeriod.String())
backoff, err := retryutils.NewLinear(retryutils.LinearConfig{
Step: u.cfg.ScanPeriod,
Max: u.cfg.ScanPeriod * 100,
Clock: u.cfg.Clock,
First: u.cfg.InitialScanDelay,
Step: u.cfg.ScanPeriod,
Max: u.cfg.ScanPeriod * 100,
Clock: u.cfg.Clock,
Jitter: retryutils.NewSeventhJitter(),
})
if err != nil {
return trace.Wrap(err)
Expand All @@ -219,8 +223,7 @@ func (u *Uploader) Serve(ctx context.Context) error {
}
default:
backoff.Inc()
u.log.WithError(event.Error).Warningf(
"Backing off, will retry after %v.", backoff.Duration())
u.log.Warnf("Increasing session upload backoff due to error, will retry after %v.", backoff.Duration())
}
// forward the event to channel that used in tests
if u.cfg.EventsC != nil {
Expand All @@ -232,19 +235,12 @@ func (u *Uploader) Serve(ctx context.Context) error {
}
// Tick at scan period but slow down (and speeds up) on errors.
case <-backoff.After():
var failed bool
if _, err := u.Scan(ctx); err != nil {
if trace.Unwrap(err) != errContext {
failed = true
u.log.WithError(err).Warningf("Uploader scan failed.")
if !errors.Is(trace.Unwrap(err), errContext) {
backoff.Inc()
u.log.WithError(err).Warningf("Uploader scan failed, will retry after %v.", backoff.Duration())
}
}
if failed {
backoff.Inc()
u.log.Debugf("Scan failed, backing off, will retry after %v.", backoff.Duration())
} else {
backoff.ResetToDelay()
}
}
}
}
Expand Down Expand Up @@ -530,6 +526,12 @@ func (u *Uploader) upload(ctx context.Context, up *upload) error {
select {
case <-u.closeC:
return trace.Errorf("operation has been canceled, uploader is closed")
case <-stream.Done():
if errStream, ok := stream.(interface{ Error() error }); ok {
return trace.ConnectionProblem(errStream.Error(), errStream.Error().Error())
}

return trace.ConnectionProblem(nil, "upload stream terminated unexpectedly")
case <-stream.Status():
case <-time.After(events.NetworkRetryDuration):
return trace.ConnectionProblem(nil, "timeout waiting for stream status update")
Expand Down
62 changes: 33 additions & 29 deletions lib/events/filesessions/fileasync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -483,16 +483,17 @@ func TestUploadBadSession(t *testing.T) {
// uploaderPack reduces boilerplate required
// to create a test
type uploaderPack struct {
scanPeriod time.Duration
clock clockwork.FakeClock
eventsC chan events.UploadEvent
memEventsC chan events.UploadEvent
memUploader *eventstest.MemoryUploader
streamer events.Streamer
scanDir string
uploader *Uploader
ctx context.Context
cancel context.CancelFunc
scanPeriod time.Duration
initialScanDelay time.Duration
clock clockwork.FakeClock
eventsC chan events.UploadEvent
memEventsC chan events.UploadEvent
memUploader *eventstest.MemoryUploader
streamer events.Streamer
scanDir string
uploader *Uploader
ctx context.Context
cancel context.CancelFunc
}

func (u *uploaderPack) Close(t *testing.T) {
Expand All @@ -505,13 +506,14 @@ func newUploaderPack(t *testing.T, wrapStreamer wrapStreamerFn) uploaderPack {

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
pack := uploaderPack{
clock: clockwork.NewFakeClock(),
eventsC: make(chan events.UploadEvent, 100),
memEventsC: make(chan events.UploadEvent, 100),
ctx: ctx,
cancel: cancel,
scanDir: scanDir,
scanPeriod: 10 * time.Second,
clock: clockwork.NewFakeClock(),
eventsC: make(chan events.UploadEvent, 100),
memEventsC: make(chan events.UploadEvent, 100),
ctx: ctx,
cancel: cancel,
scanDir: scanDir,
scanPeriod: 10 * time.Second,
initialScanDelay: 10 * time.Millisecond,
}
pack.memUploader = eventstest.NewMemoryUploader(pack.memEventsC)

Expand All @@ -527,12 +529,13 @@ func newUploaderPack(t *testing.T, wrapStreamer wrapStreamerFn) uploaderPack {
}

uploader, err := NewUploader(UploaderConfig{
ScanDir: pack.scanDir,
CorruptedDir: corruptedDir,
ScanPeriod: pack.scanPeriod,
Streamer: pack.streamer,
Clock: pack.clock,
EventsC: pack.eventsC,
ScanDir: pack.scanDir,
CorruptedDir: corruptedDir,
InitialScanDelay: pack.initialScanDelay,
ScanPeriod: pack.scanPeriod,
Streamer: pack.streamer,
Clock: pack.clock,
EventsC: pack.eventsC,
})
require.NoError(t, err)
pack.uploader = uploader
Expand Down Expand Up @@ -564,12 +567,13 @@ func runResume(t *testing.T, testCase resumeTestCase) {

scanPeriod := 10 * time.Second
uploader, err := NewUploader(UploaderConfig{
EventsC: eventsC,
ScanDir: scanDir,
CorruptedDir: corruptedDir,
ScanPeriod: scanPeriod,
Streamer: test.streamer,
Clock: clock,
EventsC: eventsC,
ScanDir: scanDir,
CorruptedDir: corruptedDir,
InitialScanDelay: 10 * time.Millisecond,
ScanPeriod: scanPeriod,
Streamer: test.streamer,
Clock: clock,
})
require.Nil(t, err)
go uploader.Serve(ctx)
Expand Down
9 changes: 5 additions & 4 deletions lib/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -3008,10 +3008,11 @@ func (process *TeleportProcess) initUploaderService() error {
corruptedDir := filepath.Join(paths[1]...)

fileUploader, err := filesessions.NewUploader(filesessions.UploaderConfig{
Streamer: uploaderClient,
ScanDir: uploadsDir,
CorruptedDir: corruptedDir,
EventsC: process.Config.Testing.UploadEventsC,
Streamer: uploaderClient,
ScanDir: uploadsDir,
CorruptedDir: corruptedDir,
EventsC: process.Config.Testing.UploadEventsC,
InitialScanDelay: 15 * time.Second,
})
if err != nil {
return trace.Wrap(err)
Expand Down
Loading