Skip to content

Commit

Permalink
feat(component): Created Snowflake data unload component (#11349)
Browse files Browse the repository at this point in the history
* Created Snowflake data unload component

Signed-off-by: Jun Sheng <jscheng@google.com>

* chore: rename subfolder

Signed-off-by: Jun Sheng <jscheng@google.com>

---------

Signed-off-by: Jun Sheng <jscheng@google.com>
  • Loading branch information
ChaosEternal authored Dec 2, 2024
1 parent 13f83cf commit 22e7780
Show file tree
Hide file tree
Showing 2 changed files with 212 additions and 0 deletions.
156 changes: 156 additions & 0 deletions components/snowflake/component.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
# PIPELINE DEFINITION
# Name: snowflake-unload-op
# Description: Run COPY in snowflake to unload data to GCS bucket.
# output_gcs_path: the location to land the data
# sf_storage_integration: the Snowflake Storage Integration name
# query: the query to execute in the COPY command
# sf_user: the snowflake username
# sf_password: the snowflake password
# sf_warehouse: the snowflake warehouse name
# sf_database: the database to use
# Inputs:
# output_gcs_path: str
# query: str
# sf_account: str
# sf_database: str
# sf_password: str
# sf_storage_integration: str
# sf_user: str
# sf_warehouse: str
# Outputs:
# Output: str
components:
comp-snowflake-unload-op:
executorLabel: exec-snowflake-unload-op
inputDefinitions:
parameters:
output_gcs_path:
parameterType: STRING
query:
parameterType: STRING
sf_account:
parameterType: STRING
sf_database:
parameterType: STRING
sf_password:
parameterType: STRING
sf_storage_integration:
parameterType: STRING
sf_user:
parameterType: STRING
sf_warehouse:
parameterType: STRING
outputDefinitions:
parameters:
Output:
parameterType: STRING
deploymentSpec:
executors:
exec-snowflake-unload-op:
container:
args:
- --executor_input
- '{{$}}'
- --function_to_execute
- snowflake_unload_op
command:
- sh
- -c
- "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\
\ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
\ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.7.0'\
\ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' &&\
\ python3 -m pip install --quiet --no-warn-script-location 'snowflake-connector-python:3.12.3'\
\ && \"$0\" \"$@\"\n"
- sh
- -ec
- 'program_path=$(mktemp -d)
printf "%s" "$0" > "$program_path/ephemeral_component.py"
_KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
'
- "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
\ *\n\ndef snowflake_unload_op(\n output_gcs_path: str,\n sf_storage_integration:\
\ str,\n query: str,\n sf_user: str,\n sf_password: str,\n sf_account:\
\ str,\n sf_warehouse: str,\n sf_database: str\n ) -> str:\n \
\ \"\"\"\n Run COPY in snowflake to unload data to GCS bucket.\n\n \
\ output_gcs_path: the location to land the data\n sf_storage_integration:\
\ the Snowflake Storage Integration name\n query: the query to execute\
\ in the COPY command\n sf_user: the snowflake username\n sf_password:\
\ the snowflake password\n sf_warehouse: the snowflake warehouse name\n\
\ sf_database: the database to use\n \"\"\"\n import snowflake.connector\n\
\ conn = snowflake.connector.connect(user=sf_user,\n \
\ password=sf_password,\n \
\ account=sf_account,\n \
\ role=\"ACCOUNTADMIN\")\n\n conn.cursor().execute(f\"USE WAREHOUSE\
\ {sf_warehouse};\")\n conn.cursor().execute(f\"USE DATABASE {sf_database};\"\
)\n result = conn.cursor().execute(f\"\"\"\n COPY INTO 'gcs://{output_gcs_path}'\n\
\ FROM ({query})\n FILE_FORMAT = (TYPE = CSV COMPRESSION=NONE)\n \
\ STORAGE_INTEGRATION = {sf_storage_integration}\n HEADER = TRUE\n\
\ \"\"\")\n _ = result.fetchall()\n if output_gcs_path.endswith(\"\
/\"):\n return output_gcs_path + \"data_0_0_0.csv\"\n else:\n\
\ return output_gcs_path\n\n"
image: python:3.11
pipelineInfo:
name: snowflake-unload-op
root:
dag:
outputs:
parameters:
Output:
valueFromParameter:
outputParameterKey: Output
producerSubtask: snowflake-unload-op
tasks:
snowflake-unload-op:
cachingOptions:
enableCache: true
componentRef:
name: comp-snowflake-unload-op
inputs:
parameters:
output_gcs_path:
componentInputParameter: output_gcs_path
query:
componentInputParameter: query
sf_account:
componentInputParameter: sf_account
sf_database:
componentInputParameter: sf_database
sf_password:
componentInputParameter: sf_password
sf_storage_integration:
componentInputParameter: sf_storage_integration
sf_user:
componentInputParameter: sf_user
sf_warehouse:
componentInputParameter: sf_warehouse
taskInfo:
name: snowflake-unload-op
inputDefinitions:
parameters:
output_gcs_path:
parameterType: STRING
query:
parameterType: STRING
sf_account:
parameterType: STRING
sf_database:
parameterType: STRING
sf_password:
parameterType: STRING
sf_storage_integration:
parameterType: STRING
sf_user:
parameterType: STRING
sf_warehouse:
parameterType: STRING
outputDefinitions:
parameters:
Output:
parameterType: STRING
schemaVersion: 2.1.0
sdkVersion: kfp-2.7.0
56 changes: 56 additions & 0 deletions components/snowflake/snowflake_unload_data.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
"""
This is a KFP component doing "unload data to GCS bucket" operation
from the Snowflake database.
"""
from kfp import compiler
from kfp.dsl import component

@component(
base_image="python:3.11",
packages_to_install=["snowflake-connector-python:3.12.3"]
)
def snowflake_unload_op(
output_gcs_path: str,
sf_storage_integration: str,
query: str,
sf_user: str,
sf_password: str,
sf_account: str,
sf_warehouse: str,
sf_database: str
) -> str:
"""
Run COPY in snowflake to unload data to GCS bucket.
output_gcs_path: the location to land the data
sf_storage_integration: the Snowflake Storage Integration name
query: the query to execute in the COPY command
sf_user: the snowflake username
sf_password: the snowflake password
sf_warehouse: the snowflake warehouse name
sf_database: the database to use
"""
import snowflake.connector
conn = snowflake.connector.connect(user=sf_user,
password=sf_password,
account=sf_account,
role="ACCOUNTADMIN")

conn.cursor().execute(f"USE WAREHOUSE {sf_warehouse};")
conn.cursor().execute(f"USE DATABASE {sf_database};")
result = conn.cursor().execute(f"""
COPY INTO 'gcs://{output_gcs_path}'
FROM ({query})
FILE_FORMAT = (TYPE = CSV COMPRESSION=NONE)
STORAGE_INTEGRATION = {sf_storage_integration}
HEADER = TRUE
""")
_ = result.fetchall()
if output_gcs_path.endswith("/"):
return output_gcs_path + "data_0_0_0.csv"
else:
return output_gcs_path

if __name__ == "__main__":
compiler.Compiler().compile(pipeline_func=snowflake_unload_op,
package_path="component.yaml")

0 comments on commit 22e7780

Please sign in to comment.