Add docs for GCP Dataproc deployment #4393

Open · wants to merge 12 commits into base: main
fix submit_batches.py script
Signed-off-by: Abhishek Bhatia <bhatiaabhishek8893@gmail.com>
abhi8893 committed Dec 30, 2024

commit cdde986cb019fbf265953752734fbe6b43af1fdf
254 changes: 144 additions & 110 deletions docs/source/deployment/gcp_dataproc.md
@@ -153,116 +153,6 @@ if KEDRO_RUN_ARGS:
subprocess.run(kedro_cmd, check=True, shell=True)
```

#### Define run submit artifact preparation script

`deployment/dataproc/prepare_run_submit_artifacts.py`

```python
import argparse
import re
import subprocess
import sys

# NOTE: Suggested pattern => Define your own project_utils module to store common functions
from project_utils import load_project_config


def run_subprocess_commands(commands):
    for command in commands:
        print(f"Running command: {command}")
        try:
            subprocess.run(command, check=True)
        except subprocess.CalledProcessError as e:
            print(f"An error occurred: {e}")
            sys.exit(1)


parser = argparse.ArgumentParser(description='Submit Kedro job to Dataproc')
parser.add_argument('--env', choices=['dev', 'prod'], required=True)
parser.add_argument('--release', type=str, help='Release version', required=False)
parser.add_argument('--run-id', type=str, help='run id', required=False)
parser.add_argument('--kedro-run-args', type=str, help='Arguments for kedro run', required=False)

args = parser.parse_args()

if args.env == 'dev' and args.release:
    parser.error("--release should not be provided when env is 'dev'")
if args.env == 'dev' and not args.run_id:
    parser.error("--run-id must be provided when env is 'dev'")
if args.env == 'prod' and not args.release:
    parser.error("--release must be provided when env is 'prod'")
if args.env == 'prod' and args.run_id:
    parser.error("--run-id should not be provided when env is 'prod'")

ENV = args.env
RELEASE = args.release
RUN_ID = args.run_id
KEDRO_RUN_ARGS = args.kedro_run_args

proc = subprocess.Popen(["git", "rev-parse", "--abbrev-ref", "HEAD"], stdout=subprocess.PIPE)
BRANCH_NAME, err = proc.communicate()
BRANCH_NAME = BRANCH_NAME.decode().strip()
BRANCH_NAME_CLEAN = re.sub(r'[^a-zA-Z0-9]', '-', BRANCH_NAME)

PROJECT_CONFIG = load_project_config()[ENV]

PROJECT_ID = PROJECT_CONFIG["PROJECT_ID"]
USECASE = PROJECT_CONFIG["USECASE"]
REGION = PROJECT_CONFIG["REGION"]
RUN_SUBMIT_BUCKET = PROJECT_CONFIG["DATAPROC"]["RUN_SUBMIT_BUCKET"]

if ENV == 'dev':
    GCS_RUN_SUBMIT_DIR = f'gs://{RUN_SUBMIT_BUCKET}/{BRANCH_NAME_CLEAN}/{RUN_ID}'
elif ENV == 'prod':
    GCS_RUN_SUBMIT_DIR = f'gs://{RUN_SUBMIT_BUCKET}/{RELEASE}'


MAIN_ENTRYPOINT_PY = f"{GCS_RUN_SUBMIT_DIR}/entrypoint.py"
GCS_CODE_ARCHIVE_URI = f'{GCS_RUN_SUBMIT_DIR}/code.zip'

LOCAL_DATAPROC_RUN_SUBMIT_STAGING_DIR = "dataproc_runs"

if ENV == 'dev':
    CONTAINER_IMAGE_URI = f"{REGION}-docker.pkg.dev/{PROJECT_ID}/{USECASE}/dataproc-serverless:{BRANCH_NAME_CLEAN}"
elif ENV == 'prod':
    CONTAINER_IMAGE_URI = f"{REGION}-docker.pkg.dev/{PROJECT_ID}/{USECASE}/dataproc-serverless:{RELEASE}"


commands = [
    # create staging directory
    ['mkdir', '-p', f'{LOCAL_DATAPROC_RUN_SUBMIT_STAGING_DIR}']
]


# For dev workflows, we create a new branch for each run with all changes auto-committed
if ENV == 'dev':
    commands += [
        # Stash uncommitted changes, check out a new branch, apply the stash, commit changes
        ["git", "stash"],
        ["git", "checkout", '-B', f"pipeline-run/{RUN_ID}"],
        ["git", "stash", "apply"],
        ["git", "add", "."],
        ["git", "commit", '-m', f"pipeline run: {RUN_ID}"]
    ]

commands += [
    # Archive code
    ['git', 'archive', '-o', f'{LOCAL_DATAPROC_RUN_SUBMIT_STAGING_DIR}/code.zip', 'HEAD'],

    # Copy run submit artifacts to GCS
    ['gsutil', 'cp', '-r', 'scripts/dataproc/main.py', MAIN_ENTRYPOINT_PY],
    ['gsutil', 'cp', '-r', f'{LOCAL_DATAPROC_RUN_SUBMIT_STAGING_DIR}/code.zip', GCS_CODE_ARCHIVE_URI],

    # Return to the original branch and restore uncommitted changes
    ["git", "checkout", "-"],
    ["git", "stash", "pop"]
]

run_subprocess_commands(commands)
```

## Dataproc Serverless

Dataproc Serverless lets you run Spark jobs without managing the underlying infrastructure; it automatically scales resources up and down based on job requirements.
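
Before stepping through the full submission script below, here is a minimal sketch of the core command it ultimately assembles: a `gcloud dataproc batches submit pyspark` call wrapped in `subprocess`, in the same style as the other scripts in this guide. The entrypoint, region, image, and service account values are placeholders, not values defined by this guide.

```python
import subprocess

# Minimal Dataproc Serverless batch submission; every resource name below is an
# illustrative placeholder -- substitute the values from your own project config.
submit_command = [
    "gcloud", "dataproc", "batches", "submit", "pyspark",
    "gs://my-run-submit-bucket/entrypoint.py",  # main PySpark entrypoint
    "--region", "europe-west1",
    "--container-image", "europe-west1-docker.pkg.dev/my-project/my-usecase/dataproc-serverless:latest",
    "--service-account", "dataproc-runner@my-project.iam.gserviceaccount.com",
]
subprocess.run(submit_command, check=True)
```

The scripts in this section add environment handling, artifact preparation and extra Spark properties on top of this core call.
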
@@ -418,6 +308,150 @@ deployment/dataproc/serverless/build_push_docker.sh --env prod --release v0.1.2

#### Submit pyspark batches

`deployment/dataproc/serverless/submit_batches.py`

```python
import argparse
import re
import subprocess
import sys
from datetime import datetime

# NOTE: Suggested pattern => Define your own project_utils module to store common functions
from project_utils import load_project_config


def run_subprocess_commands(commands):
    for command in commands:
        print(f"Running command: {command}")
        try:
            subprocess.run(command, check=True)
        except subprocess.CalledProcessError as e:
            print(f"An error occurred: {e}")
            sys.exit(1)


parser = argparse.ArgumentParser(description='Submit Kedro job to Dataproc')
parser.add_argument('--env', choices=['dev', 'prod'], required=True)
parser.add_argument('--release', type=str, help='Release version', required=False)
parser.add_argument('--run-id', type=str, help='run id', required=False)
parser.add_argument('--no-prepare-artifacts', action="store_true", help='whether to prepare run submit artifacts. Default: True', required=False)
parser.add_argument('--no-run-trigger', action="store_true", help='whether to trigger run. Default: True', required=False)
parser.add_argument('--kedro-run-args', type=str, help='kedro pipeline run args', required=False)

args = parser.parse_args()

if args.env == 'dev' and args.release:
    parser.error("--release should not be provided when env is 'dev'")
if args.env == 'dev' and not args.run_id:
    parser.error("--run-id must be provided when env is 'dev'")
if args.env == 'prod' and not args.release:
    parser.error("--release must be provided when env is 'prod'")
if args.env == 'prod' and args.run_id:
    parser.error("--run-id should not be provided when env is 'prod'")

ENV = args.env
RELEASE = args.release
RUN_ID = args.run_id
PREPARE_ARTIFACTS = not args.no_prepare_artifacts
TRIGGER_RUN = not args.no_run_trigger
KEDRO_RUN_ARGS = args.kedro_run_args

proc = subprocess.Popen(["git", "rev-parse", "--abbrev-ref", "HEAD"], stdout=subprocess.PIPE)
BRANCH_NAME, err = proc.communicate()
BRANCH_NAME = BRANCH_NAME.decode().strip()
BRANCH_NAME_CLEAN = re.sub(r'[^a-zA-Z0-9]', '-', BRANCH_NAME)

PROJECT_CONFIG = load_project_config()[ENV]

PROJECT_ID = PROJECT_CONFIG["PROJECT_ID"]
USECASE = PROJECT_CONFIG["USECASE"]
REGION = PROJECT_CONFIG["REGION"]
RUN_SUBMIT_BUCKET = PROJECT_CONFIG["DATAPROC"]["RUN_SUBMIT_BUCKET"]
RUN_SERVICE_ACCOUNT = PROJECT_CONFIG["DATAPROC"]["RUN_SERVICE_ACCOUNT"]
DATAPROC_RUNTIME_VERSION = PROJECT_CONFIG["DATAPROC"]["SERVERLESS"]["RUNTIME_VERSION"]

if ENV == 'dev':
    GCS_RUN_SUBMIT_DIR = f'gs://{RUN_SUBMIT_BUCKET}/{BRANCH_NAME_CLEAN}/{RUN_ID}'
elif ENV == 'prod':
    GCS_RUN_SUBMIT_DIR = f'gs://{RUN_SUBMIT_BUCKET}/{RELEASE}'


if ENV == 'dev':
    MAIN_ENTRYPOINT_PY = f"{GCS_RUN_SUBMIT_DIR}/entrypoint.py"
elif ENV == 'prod':
    MAIN_ENTRYPOINT_PY = "file:///home/kedro/code/deployment/dataproc/entrypoint.py"

GCS_CODE_ARCHIVE_URI = f'{GCS_RUN_SUBMIT_DIR}/code.zip'

LOCAL_DATAPROC_RUN_SUBMIT_STAGING_DIR = "dataproc_runs"

if ENV == 'dev':
    CONTAINER_IMAGE_URI = f"{REGION}-docker.pkg.dev/{PROJECT_ID}/{USECASE}/dataproc-serverless:{BRANCH_NAME_CLEAN}"
elif ENV == 'prod':
    CONTAINER_IMAGE_URI = f"{REGION}-docker.pkg.dev/{PROJECT_ID}/{USECASE}/dataproc-serverless:{RELEASE}"



commands = []

if PREPARE_ARTIFACTS:
    commands += [
        # create staging directory
        ['mkdir', '-p', f'{LOCAL_DATAPROC_RUN_SUBMIT_STAGING_DIR}']
    ]

    # For dev workflows, we create a new branch for each run with all changes auto-committed
    if ENV == 'dev':
        commands += [
            # Stash uncommitted changes, check out a new branch, apply the stash, commit changes
            ["git", "stash"],
            ["git", "checkout", '-B', f"pipeline-run/{RUN_ID}"],
            ["git", "stash", "apply"],
            ["git", "add", "."],
            ["git", "commit", '-m', f"pipeline run: {RUN_ID}"]
        ]

    commands += [
        # Archive code
        ['git', 'archive', '-o', f'{LOCAL_DATAPROC_RUN_SUBMIT_STAGING_DIR}/code.zip', 'HEAD'],

        # Copy run submit artifacts to GCS
        ['gsutil', 'cp', '-r', 'scripts/dataproc/main.py', MAIN_ENTRYPOINT_PY],
        ['gsutil', 'cp', '-r', f'{LOCAL_DATAPROC_RUN_SUBMIT_STAGING_DIR}/code.zip', GCS_CODE_ARCHIVE_URI],

        # Return to the original branch and restore uncommitted changes
        ["git", "checkout", "-"],
        ["git", "stash", "pop"]
    ]

if TRIGGER_RUN:
    trigger_command = [
        'gcloud', 'dataproc', 'batches', 'submit', 'pyspark', MAIN_ENTRYPOINT_PY,
        '--region', REGION,
        '--container-image', CONTAINER_IMAGE_URI,
        '--service-account', RUN_SERVICE_ACCOUNT,
        '--properties', 'spark.executor.instances=10',  # Add more Spark conf here
        '--version', DATAPROC_RUNTIME_VERSION,
    ]

    if ENV == 'dev':
        trigger_command += [
            # The '#code' suffix extracts the archive into a directory named 'code' on the workers
            '--archives', f'{GCS_CODE_ARCHIVE_URI}#code'
        ]

    if KEDRO_RUN_ARGS:
        trigger_command += [
            '--', f'--kedro-run-args={KEDRO_RUN_ARGS}'
        ]

    commands += [trigger_command]

run_subprocess_commands(commands)
```
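
The scripts in this guide import `load_project_config` from a project-specific `project_utils` module that is left for you to define. A minimal sketch of such a helper, assuming the per-environment settings live in a YAML file (the module location, file path, and example values below are illustrative):

```python
# project_utils.py -- illustrative module; place it wherever the scripts can import it
import yaml  # requires PyYAML


def load_project_config(path: str = "deployment/dataproc/project_config.yml") -> dict:
    """Return the deployment settings as a dict keyed by environment, e.g.:

    dev:
      PROJECT_ID: my-gcp-project
      USECASE: my-usecase
      REGION: europe-west1
      DATAPROC:
        RUN_SUBMIT_BUCKET: my-run-submit-bucket
        RUN_SERVICE_ACCOUNT: dataproc-runner@my-gcp-project.iam.gserviceaccount.com
        SERVERLESS:
          RUNTIME_VERSION: "2.2"
    """
    with open(path) as f:
        return yaml.safe_load(f)
```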

For the dev workflow:

```bash