Description
Bug summary
I observed an issue where an OOMKilled subflow pod that was submitted via the `kubernetes` decorator stays in the Running state in the Prefect UI. This in turn makes the main flow keep waiting for the subflow to complete, which results in both the main flow and the main flow pod running indefinitely.
Below is the subflow with Running state:
Below is the Kubernetes Job pod running the subflow with OOMKilled status:
which results in the main flow also waiting for the subflow's Prefect job to finish:
TL;DR: If the main flow has been running for a long time, the manual cancellation logic will not be able to kill the pod. When this happens, the UI shows the job as Cancelled while the pod is still running. The current workaround I found is to set the job variable `PREFECT_RUNNER_CRASH_ON_CANCELLATION_FAILURE` to `true` on the deployment.
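For reference, a minimal sketch of that workaround as a job-variables fragment (the surrounding `flow.deploy(..., job_variables=...)` call is omitted since it depends on your deployment setup):

```python
# Job variables to pass to the deployment so the Kubernetes worker injects
# the env var into the flow-run pod. With this set, the runner crashes the
# flow run when it cannot confirm cancellation, instead of leaving it
# Running forever. (Sketch; wire this into your own deploy call.)
job_variables = {
    "env": {
        "PREFECT_RUNNER_CRASH_ON_CANCELLATION_FAILURE": "true",
    }
}
```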
Root Cause Investigation
I highly suspect the issue is in `observer.py`, where the observer fails to see the pod's OOMKilled status.
This is because Prefect's internal observer (`_mark_flow_run_as_crashed`) watches Kubernetes Jobs, not pods:
```python
# prefect_kubernetes/observer.py
@kopf.on.event(
    "jobs",
    labels={
        "prefect.io/flow-run-id": kopf.PRESENT,
        **settings.observer.additional_label_filters,
    },
)  # type: ignore
...
current_job_failed = status.get("failed", 0) > backoff_limit
if not current_job_failed:
    return  # skips if the job hasn't exceeded its backoff limit
```

It only acts when `job.status.failed > backoffLimit`. When a pod is OOMKilled, Kubernetes may not immediately reflect this as a job failure above the backoff limit, especially if the job has retries or if there is a timing race between the pod dying and the job status updating. In that case the condition never triggers, leaving the flow run in the Running state indefinitely without any pod actually running.
Current Workaround
I've deployed my own observer that monitors at the pod level. When a pod reports OOMKilled, I flag the flow run as Failed via the Prefect UI (because my pipeline's business logic treats OOMKilled as Failed). If the fix were implemented in the library, though, the flow run should ideally end in a Crashed state.
I already have code for this, so I am happy to contribute :)
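A sketch of the pod-level detection: the helper below is pure and only inspects a pod status dict; the kopf wiring is shown as comments since it needs a cluster, and marking the run Crashed via the Prefect client is left out.

```python
def pod_was_oom_killed(pod_status: dict) -> bool:
    """Return True if any container in the pod terminated with OOMKilled.

    Checks both the current and last-known container states, since after a
    container restart the OOMKill only appears under `lastState`.
    """
    for cs in pod_status.get("containerStatuses", []):
        for state_key in ("state", "lastState"):
            terminated = cs.get(state_key, {}).get("terminated") or {}
            if terminated.get("reason") == "OOMKilled":
                return True
    return False

# In a kopf-based observer this would be wired up roughly as:
#
#   @kopf.on.event("pods", labels={"prefect.io/flow-run-id": kopf.PRESENT})
#   async def on_pod_event(status, labels, **kwargs):
#       if pod_was_oom_killed(status):
#           ...  # mark the flow run Crashed via the Prefect client

# Example with a made-up pod status:
status = {
    "containerStatuses": [
        {"state": {"terminated": {"reason": "OOMKilled", "exitCode": 137}}}
    ]
}
print(pod_was_oom_killed(status))  # True
```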
Reproduction
- Set up a `prefect_kubernetes` environment where you are able to submit subflows to dynamic pods. See the setup guide: Prefect-Submit-Flows-Directly-To-Dynamic-Infrastructure
- Deploy a minimal flow and subflow via the `kubernetes` decorator. Example:
```python
from prefect import flow
from prefect_kubernetes.experimental.decorators import kubernetes

# Submit `my_remote_flow` to run in a Kubernetes job
@kubernetes(work_pool="olympic")
@flow
def my_remote_flow(name: str):
    print(f"Hello {name}!")
    # Deliberately cause OOMKilled
    data = []
    while True:
        data.append(bytearray(1024 * 1024 * 100))  # 100 MB chunks

@flow
def main_flow():
    future = my_remote_flow.submit("Marvin")
    result = future.result()

# Run the flow
main_flow()
```

- Deploy, run the job, and monitor the state of both the subflow and main flow pods via:

```
kubectl get pods -n <namespace job runs in>
```
Version info
Prefect server image version: 3.0.40-python3.12
pip library versions:

```
prefect_kubernetes==0.7.3
prefect_aws==0.5.13
kopf==1.43.0
```
Additional context
No response