diff --git a/backend/metadata_writer/requirements.txt b/backend/metadata_writer/requirements.txt index c77a354d4ef..6897654ae21 100644 --- a/backend/metadata_writer/requirements.txt +++ b/backend/metadata_writer/requirements.txt @@ -22,7 +22,7 @@ grpcio==1.66.1 # via ml-metadata idna==3.10 # via requests -kubernetes==10.0.1 +kubernetes==31.0.0 # via -r - lru-dict==1.3.0 # via -r - diff --git a/backend/metadata_writer/src/metadata_writer.py b/backend/metadata_writer/src/metadata_writer.py index 8f21ff6256c..e305bb1e15b 100644 --- a/backend/metadata_writer/src/metadata_writer.py +++ b/backend/metadata_writer/src/metadata_writer.py @@ -19,6 +19,7 @@ import collections import kubernetes import yaml +import urllib3 from time import sleep import lru @@ -30,7 +31,10 @@ workflow_name_to_context_id_size = os.environ.get('WORKFLOW_NAME_TO_CONTEXT_ID_SIZE', 5000) pods_with_written_metadata_size = os.environ.get('PODS_WITH_WRITTEN_METADATA_SIZE', 5000) debug_files_size = os.environ.get('DEBUG_FILES_SIZE', 5000) - +# See the documentation on setting k8s_watch timeouts: +# https://github.com/kubernetes-client/python/blob/master/examples/watch/timeout-settings.md +k8s_watch_server_side_timeout = os.environ.get('K8S_WATCH_SERVER_SIDE_TIMEOUT', 1800) +k8s_watch_client_side_timeout = os.environ.get('K8S_WATCH_CLIENT_SIDE_TIMEOUT', 60) kubernetes.config.load_incluster_config() k8s_api = kubernetes.client.CoreV1Api() @@ -150,18 +154,18 @@ def is_kfp_v2_pod(pod) -> bool: k8s_api.list_namespaced_pod, namespace=namespace_to_watch, label_selector=ARGO_WORKFLOW_LABEL_KEY, - timeout_seconds=1800, # Sometimes watch gets stuck - _request_timeout=2000, # Sometimes HTTP GET gets stuck + timeout_seconds=k8s_watch_server_side_timeout, + _request_timeout=k8s_watch_client_side_timeout, ) else: pod_stream = k8s_watch.stream( k8s_api.list_pod_for_all_namespaces, label_selector=ARGO_WORKFLOW_LABEL_KEY, - timeout_seconds=1800, # Sometimes watch gets stuck - _request_timeout=2000, # Sometimes HTTP GET 
gets stuck + timeout_seconds=k8s_watch_server_side_timeout, + _request_timeout=k8s_watch_client_side_timeout, ) - for event in pod_stream: - try: + try: + for event in pod_stream: obj = event['object'] print('Kubernetes Pod event: ', event['type'], obj.metadata.name, obj.metadata.resource_version) if event['type'] == 'ERROR': @@ -393,6 +397,10 @@ def is_kfp_v2_pod(pod) -> bool: pods_with_written_metadata[obj.metadata.name] = None - except Exception as e: - import traceback - print(traceback.format_exc()) + # If the for loop ended, a server-side timeout occurred. Continue watching. + pass + + except urllib3.exceptions.ReadTimeoutError as e: + # Client side timeout, continue watching. + continue + \ No newline at end of file