Skip to content

Commit

Permalink
Added more gke functions
Browse files Browse the repository at this point in the history
  • Loading branch information
keegansmith21 committed Aug 5, 2024
1 parent 6ddfcad commit 536ed7d
Showing 1 changed file with 78 additions and 0 deletions.
78 changes: 78 additions & 0 deletions observatory_platform/google/gke.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,91 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from dataclasses import dataclass
import logging
from typing import Optional

import kubernetes
from kubernetes.client import models as k8s
from kubernetes.client.models import V1ResourceRequirements
from airflow.providers.cncf.kubernetes.hooks.kubernetes import KubernetesHook
from kubernetes import client


@dataclass
class GkeParams:
    """Parameters describing the use of Google Kubernetes Engine.

    :param gke_namespace: The cluster namespace to use.
    :param gke_volume_name: The name of the persistent volume to create.
    :param gke_volume_size: The amount of storage to request for the persistent volume in GiB.
    :param gke_volume_path: Where to mount the persistent volume.
    :param gke_image: The image location to pull from.
    :param gke_zone: The zone containing the gke cluster.
    :param gke_startup_timeout_seconds: How long to wait for the container to start in seconds.
    :param gke_conn_id: The name of the Airflow connection storing the gke cluster information.
    :param docker_astro_uid: The uid of the astro user.
    :param gke_resource_overrides: Task resource overrides. None means no overrides are supplied.
    """

    # Required settings (no defaults) must stay first so positional construction keeps working.
    gke_namespace: str
    gke_volume_name: str
    gke_volume_size: int
    # Optional settings with their defaults.
    gke_volume_path: str = "/data"
    gke_image: str = "us-docker.pkg.dev/academic-observatory/academic-observatory/academic-observatory:latest"
    gke_zone: str = "us-central1"
    gke_startup_timeout_seconds: int = 300
    gke_conn_id: str = "gke_cluster"
    docker_astro_uid: int = 50000
    gke_resource_overrides: Optional[dict] = None


def gke_make_kubernetes_task_params(gke_params: GkeParams):
    """Build the kubernetes task parameters that are handed to each k8s task.

    The persistent volume claim named ``gke_params.gke_volume_name`` is exposed as a volume of the
    same name and mounted at ``gke_params.gke_volume_path``; that path is also exported through the
    ``DATA_PATH`` environment variable. The pod security context uses ``gke_params.docker_astro_uid``
    for the user, the group and the filesystem group.

    :param gke_params: The gke_params object to derive the task parameters from.
    :return: A dictionary of task parameters.
    """
    mount_path = gke_params.gke_volume_path
    volume_name = gke_params.gke_volume_name
    astro_uid = gke_params.docker_astro_uid

    # The claim, the volume and the mount all share the same name.
    claim_source = k8s.V1PersistentVolumeClaimVolumeSource(claim_name=volume_name)
    pod_volume = k8s.V1Volume(name=volume_name, persistent_volume_claim=claim_source)
    pod_mount = k8s.V1VolumeMount(mount_path=mount_path, name=volume_name)

    pod_security = k8s.V1PodSecurityContext(
        fs_group=astro_uid,
        fs_group_change_policy="OnRootMismatch",
        run_as_group=astro_uid,
        run_as_user=astro_uid,
    )

    return {
        "image": gke_params.gke_image,
        "security_context": pod_security,
        "do_xcom_push": True,
        "get_logs": True,
        "in_cluster": False,
        "kubernetes_conn_id": gke_params.gke_conn_id,
        "log_events_on_failure": True,
        "namespace": gke_params.gke_namespace,
        "startup_timeout_seconds": gke_params.gke_startup_timeout_seconds,
        "env_vars": {"DATA_PATH": mount_path},
        "volumes": [pod_volume],
        "volume_mounts": [pod_mount],
    }


def gke_make_container_resources(default: dict, override: Optional[dict]) -> V1ResourceRequirements:
    """Build the container resources object, honouring an optional override.

    The selected allocation is used for both the resource requests and the resource limits.

    :param default: The default dictionary for resources. e.g. {"memory": "2G", "cpu": "2"}
    :param override: If supplied (i.e. not None), ignore the default and use this resource allocation instead
    :return: The resource requirements built from the selected allocation.
    """
    # Only None falls back to the default; an empty dict override is still used as given.
    chosen = default if override is None else override
    return V1ResourceRequirements(requests=chosen, limits=chosen)


def gke_create_volume(*, kubernetes_conn_id: str, volume_name: str, size_gi: int) -> None:
"""Creates a GKE volume
Expand Down

0 comments on commit 536ed7d

Please sign in to comment.