45
45
# Unless/until we can provide reasonable ways to decide to change their values,
46
46
# they will live as constants instead of "proper" settings.
47
47
48
- # Number of heartbeats for a task to finish on graceful worker shutdown (approx)
49
- TASK_GRACE_INTERVAL = 3
50
- # Number of heartbeats between attempts to kill the subprocess (approx)
48
+ # Seconds for a task to finish on semi graceful worker shutdown (approx)
49
+ TASK_GRACE_INTERVAL = settings . TASK_GRACE_INTERVAL
50
+ # Seconds between attempts to kill the subprocess (approx)
51
51
TASK_KILL_INTERVAL = 1
52
52
# Number of heartbeats between cleaning up worker processes (approx)
53
53
WORKER_CLEANUP_INTERVAL = 100
@@ -69,7 +69,10 @@ def __init__(self):
69
69
self .versions = {app .label : app .version for app in pulp_plugin_configs ()}
70
70
self .cursor = connection .cursor ()
71
71
self .worker = self .handle_worker_heartbeat ()
72
- self .task_grace_timeout = 0
72
+ # This defaults to immediate task cancelation.
73
+ # It will be set into the future on moderately graceful worker shutdown,
74
+ # and set to None for fully graceful shutdown.
75
+ self .task_grace_timeout = timezone .now ()
73
76
self .worker_cleanup_countdown = random .randint (
74
77
int (WORKER_CLEANUP_INTERVAL / 10 ), WORKER_CLEANUP_INTERVAL
75
78
)
@@ -104,7 +107,8 @@ def _init_instrumentation(self):
104
107
def _signal_handler (self , thesignal , frame ):
105
108
if thesignal in (signal .SIGHUP , signal .SIGTERM ):
106
109
_logger .info (_ ("Worker %s was requested to shut down gracefully." ), self .name )
107
- self .task_grace_timeout = - 1
110
+ # Wait forever...
111
+ self .task_grace_timeout = None
108
112
else :
109
113
# Reset signal handlers to default
110
114
# If you kill the process a second time it's not graceful anymore.
@@ -113,7 +117,9 @@ def _signal_handler(self, thesignal, frame):
113
117
signal .signal (signal .SIGHUP , signal .SIG_DFL )
114
118
115
119
_logger .info (_ ("Worker %s was requested to shut down." ), self .name )
116
- self .task_grace_timeout = TASK_GRACE_INTERVAL
120
+ self .task_grace_timeout = timezone .now () + timezone .timedelta (
121
+ seconds = TASK_GRACE_INTERVAL
122
+ )
117
123
self .shutdown_requested = True
118
124
119
125
def _pg_notify_handler (self , notification ):
@@ -173,8 +179,6 @@ def worker_cleanup(self):
173
179
def beat (self ):
174
180
if self .worker .last_heartbeat < timezone .now () - self .heartbeat_period :
175
181
self .worker = self .handle_worker_heartbeat ()
176
- if self .task_grace_timeout > 0 :
177
- self .task_grace_timeout -= 1
178
182
self .worker_cleanup_countdown -= 1
179
183
if self .worker_cleanup_countdown <= 0 :
180
184
self .worker_cleanup_countdown = WORKER_CLEANUP_INTERVAL
@@ -390,10 +394,12 @@ def supervise_task(self, task):
390
394
task_process .start ()
391
395
while True :
392
396
if cancel_state :
393
- if self .task_grace_timeout != 0 :
397
+ if self .task_grace_timeout is None or self . task_grace_timeout > timezone . now () :
394
398
_logger .info ("Wait for canceled task to abort." )
395
399
else :
396
- self .task_grace_timeout = TASK_KILL_INTERVAL
400
+ self .task_grace_timeout = timezone .now () + timezone .TimeDelta (
401
+ seconds = TASK_KILL_INTERVAL
402
+ )
397
403
_logger .info (
398
404
"Aborting current task %s in domain: %s due to cancelation." ,
399
405
task .pk ,
@@ -430,7 +436,7 @@ def supervise_task(self, task):
430
436
if self .sentinel in r :
431
437
os .read (self .sentinel , 256 )
432
438
if self .shutdown_requested :
433
- if self .task_grace_timeout != 0 :
439
+ if self .task_grace_timeout is None or self . task_grace_timeout > timezone . now () :
434
440
msg = (
435
441
"Worker shutdown requested, waiting for task {pk} in domain: {name} "
436
442
"to finish." .format (pk = task .pk , name = domain .name )
0 commit comments