diff --git a/internal/cmd/commands/server/server.go b/internal/cmd/commands/server/server.go index b41684e9c1..2d6cc2cd38 100644 --- a/internal/cmd/commands/server/server.go +++ b/internal/cmd/commands/server/server.go @@ -859,6 +859,10 @@ func (c *Command) Reload(newConf *config.Config) error { reloadErrors = stderrors.Join(reloadErrors, fmt.Errorf("failed to reload controller api rate limits: %w", err)) } + if err := c.reloadControllerTimings(newConf); err != nil { + reloadErrors = stderrors.Join(reloadErrors, fmt.Errorf("failed to reload controller timings: %w", err)) + } + if newConf != nil && c.worker != nil { workerReloadErr := func() error { if newConf.Controller != nil { @@ -977,6 +981,14 @@ func (c *Command) reloadControllerRateLimits(newConfig *config.Config) error { return c.controller.ReloadRateLimiter(newConfig) } +func (c *Command) reloadControllerTimings(newConfig *config.Config) error { + if c.controller == nil || newConfig == nil || newConfig.Controller == nil { + return nil + } + + return c.controller.ReloadTimings(newConfig) +} + // acquireSchemaManager returns a schema manager and generally acquires a shared lock on // the database. This is done as a mechanism to disallow running migration commands // while the database is in use. diff --git a/internal/daemon/controller/controller.go b/internal/daemon/controller/controller.go index 8f4d617d25..361d5e5cc3 100644 --- a/internal/daemon/controller/controller.go +++ b/internal/daemon/controller/controller.go @@ -682,3 +682,39 @@ func (c *Controller) Shutdown() error { func (c *Controller) WorkerStatusUpdateTimes() *sync.Map { return c.workerStatusUpdateTimes } + +// ReloadTimings reloads timing related parameters +func (c *Controller) ReloadTimings(newConfig *config.Config) error { + const op = "controller.(Controller).ReloadTimings" + + switch { + case newConfig == nil: + return errors.New(c.baseContext, errors.InvalidParameter, op, "nil config") + case newConfig.Controller == nil: + return errors.New(c.baseContext, errors.InvalidParameter, op, "nil config.Controller") + } + + switch newConfig.Controller.WorkerStatusGracePeriodDuration { + case 0: + c.workerStatusGracePeriod.Store(int64(server.DefaultLiveness)) + default: + c.workerStatusGracePeriod.Store(int64(newConfig.Controller.WorkerStatusGracePeriodDuration)) + } + switch newConfig.Controller.LivenessTimeToStaleDuration { + case 0: + c.livenessTimeToStale.Store(int64(server.DefaultLiveness)) + default: + c.livenessTimeToStale.Store(int64(newConfig.Controller.LivenessTimeToStaleDuration)) + } + + switch newConfig.Controller.GetDownstreamWorkersTimeoutDuration { + case 0: + to := server.DefaultLiveness + c.getDownstreamWorkersTimeout.Store(&to) + default: + to := newConfig.Controller.GetDownstreamWorkersTimeoutDuration + c.getDownstreamWorkersTimeout.Store(&to) + } + + return nil +} diff --git a/internal/daemon/worker/worker.go b/internal/daemon/worker/worker.go index 002e52c9de..902029dad9 100644 --- a/internal/daemon/worker/worker.go +++ b/internal/daemon/worker/worker.go @@ -414,6 +414,29 @@ func (w *Worker) Reload(ctx context.Context, newConf *config.Config) { ar.InitialAddresses(w.conf.RawConfig.Worker.InitialUpstreams) } } + + switch newConf.Worker.SuccessfulStatusGracePeriodDuration { + case 0: + w.successfulStatusGracePeriod.Store(int64(server.DefaultLiveness)) + default: + w.successfulStatusGracePeriod.Store(int64(newConf.Worker.SuccessfulStatusGracePeriodDuration)) + } + switch newConf.Worker.StatusCallTimeoutDuration { + case 0: + w.statusCallTimeoutDuration.Store(int64(common.DefaultStatusTimeout)) + default: + w.statusCallTimeoutDuration.Store(int64(newConf.Worker.StatusCallTimeoutDuration)) + } + switch newConf.Worker.GetDownstreamWorkersTimeoutDuration { + case 0: + to := server.DefaultLiveness + w.getDownstreamWorkersTimeoutDuration.Store(&to) + default: + to := newConf.Worker.GetDownstreamWorkersTimeoutDuration + w.getDownstreamWorkersTimeoutDuration.Store(&to) + } + // See comment about this in worker.go + session.CloseCallTimeout.Store(w.successfulStatusGracePeriod.Load()) } func (w *Worker) Start() error { diff --git a/internal/session/job_session_cleanup.go b/internal/session/job_session_cleanup.go index dd68c57215..d98933c464 100644 --- a/internal/session/job_session_cleanup.go +++ b/internal/session/job_session_cleanup.go @@ -37,7 +37,7 @@ type sessionConnectionCleanupJob struct { // The amount of time to give disconnected workers before marking their // connections as closed. This should be larger than the liveness setting // for the worker. - gracePeriod *atomic.Int64 + workerStatusGracePeriod *atomic.Int64 // The total number of connections closed in the last run. totalClosed int @@ -47,21 +47,21 @@ type sessionConnectionCleanupJob struct { func newSessionConnectionCleanupJob( ctx context.Context, writer db.Writer, - gracePeriod *atomic.Int64, + workerStatusGracePeriod *atomic.Int64, ) (*sessionConnectionCleanupJob, error) { const op = "session.newNewSessionConnectionCleanupJob" switch { case writer == nil: return nil, errors.New(ctx, errors.InvalidParameter, op, "missing db writer") - case gracePeriod == nil: + case workerStatusGracePeriod == nil: return nil, errors.New(ctx, errors.InvalidParameter, op, "missing grace period") - case gracePeriod.Load() == 0: + case workerStatusGracePeriod.Load() == 0: return nil, errors.New(ctx, errors.InvalidParameter, op, "grace period is zero") } return &sessionConnectionCleanupJob{ - writer: writer, - gracePeriod: gracePeriod, + writer: writer, + workerStatusGracePeriod: workerStatusGracePeriod, }, nil } @@ -98,7 +98,7 @@ func (j *sessionConnectionCleanupJob) Run(ctx context.Context) error { j.totalClosed = 0 // Run the atomic dead worker cleanup job. - gracePeriod := time.Duration(j.gracePeriod.Load()) + gracePeriod := time.Duration(j.workerStatusGracePeriod.Load()) results, err := j.closeConnectionsForDeadWorkers(ctx, gracePeriod) if err != nil { return errors.Wrap(ctx, err, op) diff --git a/internal/session/jobs.go b/internal/session/jobs.go index b18f356d50..5b9f9423e7 100644 --- a/internal/session/jobs.go +++ b/internal/session/jobs.go @@ -18,14 +18,14 @@ import ( const deleteTerminatedThreshold = time.Hour // RegisterJobs registers session related jobs with the provided scheduler. -func RegisterJobs(ctx context.Context, scheduler *scheduler.Scheduler, w db.Writer, r db.Reader, k *kms.Kms, gracePeriod *atomic.Int64) error { +func RegisterJobs(ctx context.Context, scheduler *scheduler.Scheduler, w db.Writer, r db.Reader, k *kms.Kms, workerStatusGracePeriod *atomic.Int64) error { const op = "session.RegisterJobs" - if gracePeriod == nil { + if workerStatusGracePeriod == nil { return errors.New(ctx, errors.InvalidParameter, op, "nil grace period") } - sessionConnectionCleanupJob, err := newSessionConnectionCleanupJob(ctx, w, gracePeriod) + sessionConnectionCleanupJob, err := newSessionConnectionCleanupJob(ctx, w, workerStatusGracePeriod) if err != nil { return fmt.Errorf("error creating session cleanup job: %w", err) }