From 536ed7d185bd4d47c300a7076fdfd9cf6d28d4f6 Mon Sep 17 00:00:00 2001 From: keeagnsmith21 Date: Mon, 5 Aug 2024 13:50:56 +0800 Subject: [PATCH] Added more gke functions --- observatory_platform/google/gke.py | 78 ++++++++++++++++++++++++++++++ 1 file changed, 78 insertions(+) diff --git a/observatory_platform/google/gke.py b/observatory_platform/google/gke.py index ab6e5df81..f8312ee75 100644 --- a/observatory_platform/google/gke.py +++ b/observatory_platform/google/gke.py @@ -12,13 +12,91 @@ # See the License for the specific language governing permissions and # limitations under the License. +from dataclasses import dataclass import logging +from typing import Optional import kubernetes +from kubernetes.client import models as k8s +from kubernetes.client.models import V1ResourceRequirements from airflow.providers.cncf.kubernetes.hooks.kubernetes import KubernetesHook from kubernetes import client +@dataclass +class GkeParams: + """Parameters describing the use of Google Kubernetes Engine + + :param gke_namespace: The cluster namespace to use. + :param gke_volume_name: The name of the persistent volume to create + :param gke_volume_size: The amount of storage to request for the persistent volume in GiB + :param gke_volume_path: Where to mount the persistent volume + :param gke_image: The image location to pull from. + :param gke_zone: The zone containing the gke cluster + :param gke_startup_timeout_seconds: How long to wait for the container to start in seconds. + :param gke_conn_id: The name of the airlfow connection storing the gke cluster information. + :param docker_astro_uuid: The uuid of the astro user + :param gke_resource_overrides: Task resource overrides + """ + + gke_namespace: str + gke_volume_name: str + gke_volume_size: int + gke_volume_path: str = "/data" + gke_image: str = "us-docker.pkg.dev/academic-observatory/academic-observatory/academic-observatory:latest" + gke_zone: str = "us-central1" + gke_startup_timeout_seconds: int = 300 + gke_conn_id: str = "gke_cluster" + docker_astro_uid: int = 50000 + gke_resource_overrides: Optional[dict] = None + + +def gke_make_kubernetes_task_params(gke_params: GkeParams): + """Creates the kubernetes task parameters that are handed to each task k8s task + + :param gke_params: The gke_params object + """ + + volume_mounts = [k8s.V1VolumeMount(mount_path=gke_params.gke_volume_path, name=gke_params.gke_volume_name)] + volumes = [ + k8s.V1Volume( + name=gke_params.gke_volume_name, + persistent_volume_claim=k8s.V1PersistentVolumeClaimVolumeSource(claim_name=gke_params.gke_volume_name), + ) + ] + return dict( + image=gke_params.gke_image, + security_context=k8s.V1PodSecurityContext( + fs_group=gke_params.docker_astro_uid, + fs_group_change_policy="OnRootMismatch", + run_as_group=gke_params.docker_astro_uid, + run_as_user=gke_params.docker_astro_uid, + ), + do_xcom_push=True, + get_logs=True, + in_cluster=False, + kubernetes_conn_id=gke_params.gke_conn_id, + log_events_on_failure=True, + namespace=gke_params.gke_namespace, + startup_timeout_seconds=gke_params.gke_startup_timeout_seconds, + env_vars={"DATA_PATH": gke_params.gke_volume_path}, + volumes=volumes, + volume_mounts=volume_mounts, + ) + + +def gke_make_container_resources(default: dict, override: Optional[dict]) -> V1ResourceRequirements: + """Creates the container resources object. Takes an optional override. + + :param default: The default dictionary for resources. e.g. {"memory": "2G", "cpu": "2"} + :param override: If supplied, ignore the default and use this resource allocation instead + """ + resource = default + if override is not None: + resource = override + return V1ResourceRequirements(requests=resource, limits=resource) + + def gke_create_volume(*, kubernetes_conn_id: str, volume_name: str, size_gi: int) -> None: """Creates a GKE volume