From 66a288174d3431dee5c8fd02ca1ed5da7d0f6807 Mon Sep 17 00:00:00 2001 From: Yeojun <52143231+Yun024@users.noreply.github.com> Date: Sat, 15 Mar 2025 20:09:20 +0900 Subject: [PATCH 01/10] yr_elt --- airflow/Dockerfile | 16 +++++-- airflow/dags/S3_Spark_SnowFlake_DAG.py | 5 +- airflow/dags/eventsim_ETL_DAG.py | 2 +- airflow/dags/plugins/spark_snowflake_conn.py | 47 +++++-------------- .../scripts/ELT_artist_info_globalTop50.py | 4 +- airflow/dags/scripts/ELT_artist_info_top10.py | 4 +- airflow/dags/scripts/eventsim_ETL_script.py | 2 - airflow/dags/spotify_ELT_DAG.py | 20 +++++++- airflow/docker-compose.yaml | 2 + 9 files changed, 51 insertions(+), 51 deletions(-) diff --git a/airflow/Dockerfile b/airflow/Dockerfile index 490e5ab..717504f 100644 --- a/airflow/Dockerfile +++ b/airflow/Dockerfile @@ -9,9 +9,19 @@ RUN echo "deb http://deb.debian.org/debian bullseye main" >> /etc/apt/sources.li apt-get update && apt-get install -y \ wget openjdk-11-jdk && \ mkdir -p $SPARK_JAR_DIR && \ - wget -O $SPARK_JAR_DIR/snowflake-jdbc-3.9.2.jar https://repo1.maven.org/maven2/net/snowflake/snowflake-jdbc/3.9.2/snowflake-jdbc-3.9.2.jar && \ - wget -O $SPARK_JAR_DIR/hadoop-aws-3.3.4.jar https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.3.4/hadoop-aws-3.3.4.jar && \ - wget -O $SPARK_JAR_DIR/aws-java-sdk-bundle-1.12.262.jar https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.12.262/aws-java-sdk-bundle-1.12.262.jar && \ + wget -O $SPARK_JAR_DIR/snowflake-jdbc-3.9.2.jar \ + https://repo1.maven.org/maven2/net/snowflake/snowflake-jdbc/3.9.2/snowflake-jdbc-3.9.2.jar && \ + wget -O $SPARK_JAR_DIR/hadoop-aws-3.3.4.jar \ + https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.3.4/hadoop-aws-3.3.4.jar && \ + wget -O $SPARK_JAR_DIR/aws-java-sdk-bundle-1.12.262.jar \ + https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.12.262/aws-java-sdk-bundle-1.12.262.jar && \ + # Snowflake JDBC Driver + wget -O $SPARK_JAR_DIR/snowflake-jdbc-3.13.33.jar \ + https://repo1.maven.org/maven2/net/snowflake/snowflake-jdbc/3.13.33/snowflake-jdbc-3.13.33.jar && \ + # Snowflake Spark Connector + wget -O $SPARK_JAR_DIR/spark-snowflake_2.12-2.12.0-spark_3.4.jar \ + https://repo1.maven.org/maven2/net/snowflake/spark-snowflake_2.12/2.12.0-spark_3.4/spark-snowflake_2.12-2.12.0-spark_3.4.jar && \ + # Clean up to reduce image size apt-get clean && \ rm -rf /var/lib/apt/lists/* diff --git a/airflow/dags/S3_Spark_SnowFlake_DAG.py b/airflow/dags/S3_Spark_SnowFlake_DAG.py index 27436f0..3120d31 100644 --- a/airflow/dags/S3_Spark_SnowFlake_DAG.py +++ b/airflow/dags/S3_Spark_SnowFlake_DAG.py @@ -2,13 +2,12 @@ from datetime import datetime, timedelta from airflow import DAG -from airflow.models import Variable from airflow.providers.apache.spark.operators.spark_submit import \ SparkSubmitOperator # .env에서 AWS 자격 증명 불러오기 (없으면 예외 발생 방지) -AWS_ACCESS_KEY = os.getenv("AWS_ACCESS_KEY", default_var=None) -AWS_SECRET_KEY = os.getenv("AWS_SECRET_KEY", default_var=None) +AWS_ACCESS_KEY = os.getenv("AWS_ACCESS_KEY_ID", None) +AWS_SECRET_KEY = os.getenv("AWS_SECRET_ACCESS_KEY", None) if not AWS_ACCESS_KEY or not AWS_SECRET_KEY: raise ValueError("AWS_ACCESS_KEY 또는 AWS_SECRET_KEY가 설정되지 않았습니다.") diff --git a/airflow/dags/eventsim_ETL_DAG.py b/airflow/dags/eventsim_ETL_DAG.py index baf5058..ada1216 100644 --- a/airflow/dags/eventsim_ETL_DAG.py +++ b/airflow/dags/eventsim_ETL_DAG.py @@ -21,7 +21,7 @@ default_args = { "owner": "sanghyeok_boo", "start_date": datetime(2025, 3, 1), - "end_date": datetime(2025, 3, 7), + "end_date": datetime(2025, 3, 2), "template_searchpath": ["/opt/airflow/dags/spark_jobs/"], } diff --git a/airflow/dags/plugins/spark_snowflake_conn.py b/airflow/dags/plugins/spark_snowflake_conn.py index ab33988..a1fb0fe 100644 --- a/airflow/dags/plugins/spark_snowflake_conn.py +++ b/airflow/dags/plugins/spark_snowflake_conn.py @@ -1,31 +1,22 @@ +import os from datetime import datetime -import snowflake.connector +from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook from pyspark.sql import SparkSession -from airflow.models import Variable - -SNOWFLAKE_USER = Variable.get("SNOWFLAKE_USER") -SNOWFLKAE_USER_PWD = Variable.get("SNOWFLAKE_PASSWORD") -SNOWFLAKE_ACCOUNT = Variable.get("SNOWFLAKE_ACCOUNT") -SNOWFLAKE_URL = Variable.get("SNOWFLAKE_URL") -SNOWFLAKE_DB = "S4TIFY" -SNOWFLAKE_SCHEMA = "RAW_DATA" - -AWS_ACCESS_KEY_ID = Variable.get("AWS_ACCESS_KEY_ID") -AWS_SECRET_ACCESS_KEY = Variable.get("AWS_SECRET_ACCESS_KEY") - +AWS_ACCESS_KEY_ID = os.getenv("AWS_ACCESS_KEY_ID") +AWS_SECRET_ACCESS_KEY = os.getenv("AWS_SECRET_ACCESS_KEY") TODAY = datetime.now().strftime("%Y-%m-%d") snowflake_options = { - "sfURL": SNOWFLAKE_URL, - "sfDatabase": SNOWFLAKE_DB, - "sfSchema": SNOWFLAKE_SCHEMA, - "sfWarehouse": "COMPUTE_WH", - "sfRole": "ANALYTICS_USERS", - "sfUser": SNOWFLAKE_USER, - "sfPassword": SNOWFLKAE_USER_PWD, + "sfURL": f"{os.getenv("SNOWFLAKE_ACCOUNT")}.snowflakecomputing.com", + "sfDatabase": os.getenv("SNOWFLAKE_DB"), + "sfSchema": os.getenv("SNOWFLAKE_SCHEMA"), + "sfWarehouse": os.getenv("SNOWFLAKE_WH"), + "sfRole": os.getenv("SNOWFLAKE_ROLE"), + "sfUser": os.getenv("SNOWFLAKE_USER"), + "sfPassword": os.getenv("SNOWFLAKE_PASSWORD"), } @@ -47,22 +38,10 @@ def create_spark_session(app_name: str): return spark -def create_snowflake_conn(): - conn = snowflake.connector.connect( - user=SNOWFLAKE_USER, - password=SNOWFLKAE_USER_PWD, - account=SNOWFLAKE_ACCOUNT, - warehouse="COMPUTE_WH", - database=SNOWFLAKE_DB, - schema=SNOWFLAKE_SCHEMA, - ) - - return conn - - def create_snowflake_table(sql): - conn = create_snowflake_conn() + hook = SnowflakeHook(snowflake_conn_id="SNOWFLAKE_CONN", schema="RAW_DATA") + conn = hook.get_conn() cur = conn.cursor() try: diff --git a/airflow/dags/scripts/ELT_artist_info_globalTop50.py b/airflow/dags/scripts/ELT_artist_info_globalTop50.py index efff14a..1a5705a 100644 --- a/airflow/dags/scripts/ELT_artist_info_globalTop50.py +++ b/airflow/dags/scripts/ELT_artist_info_globalTop50.py @@ -8,9 +8,7 @@ regexp_replace, split, when) from pyspark.sql.types import IntegerType, StringType, StructField, StructType -from airflow.models import Variable - -LAST_FM_API_KEY = Variable.get("LAST_FM_API_KEY") +LAST_FM_API_KEY = os.getenv("LAST_FM_API_KEY") BUCKET_NAME = "de5-s4tify" OBJECT_NAME = "raw_data" diff --git a/airflow/dags/scripts/ELT_artist_info_top10.py b/airflow/dags/scripts/ELT_artist_info_top10.py index e55f5d5..9dd22fa 100644 --- a/airflow/dags/scripts/ELT_artist_info_top10.py +++ b/airflow/dags/scripts/ELT_artist_info_top10.py @@ -6,9 +6,7 @@ from pyspark.sql.functions import col, current_date, regexp_replace, split, udf from pyspark.sql.types import ArrayType, StringType, StructField, StructType -from airflow.models import Variable - -LAST_FM_API_KEY = Variable.get("LAST_FM_API_KEY") +LAST_FM_API_KEY = os.getenv("LAST_FM_API_KEY") BUCKET_NAME = "de5-s4tify" OBJECT_NAME = "raw_data" diff --git a/airflow/dags/scripts/eventsim_ETL_script.py b/airflow/dags/scripts/eventsim_ETL_script.py index 35cb843..4e66de2 100644 --- a/airflow/dags/scripts/eventsim_ETL_script.py +++ b/airflow/dags/scripts/eventsim_ETL_script.py @@ -7,8 +7,6 @@ StructType) from spark_utils import execute_snowflake_query, spark_session_builder -from airflow.models import Variable - load_dotenv() # SNOW_FLAKE 설정 diff --git a/airflow/dags/spotify_ELT_DAG.py b/airflow/dags/spotify_ELT_DAG.py index 2f4ea9d..1bda5a7 100644 --- a/airflow/dags/spotify_ELT_DAG.py +++ b/airflow/dags/spotify_ELT_DAG.py @@ -8,6 +8,20 @@ from airflow.providers.apache.spark.operators.spark_submit import \ SparkSubmitOperator + +# Spakr JARs 설정 +SPARK_HOME = os.environ.get("SPARK_JAR_DIR") +SPARK_JARS = ",".join( + [ + os.path.join(SPARK_HOME, "snowflake-jdbc-3.13.33.jar"), + os.path.join(SPARK_HOME, "spark-snowflake_2.12-2.12.0-spark_3.4.jar"), + os.path.join(SPARK_HOME, "hadoop-aws-3.3.4.jar"), + os.path.join(SPARK_HOME, "aws-java-sdk-bundle-1.12.262.jar"), + ] +) + + + # DAG 기본 설정 default_args = { "owner": "yerin", @@ -27,14 +41,16 @@ artist_info_Top10_table = SparkSubmitOperator( task_id="artist_info_top10_table", application="dags/scripts/ELT_artist_info_top10.py", - conn_id="spark_default", + conn_id="spark_conn", + jars=SPARK_JARS, dag=dag, ) artist_info_globalTop50_table = SparkSubmitOperator( task_id="artist_info_globalTop50_table", application="dags/scripts/ELT_artist_info_globalTop50.py", - conn_id="spark_default", + conn_id="spark_conn", + jars=SPARK_JARS, dag=dag, ) diff --git a/airflow/docker-compose.yaml b/airflow/docker-compose.yaml index 3fbb066..35199b7 100644 --- a/airflow/docker-compose.yaml +++ b/airflow/docker-compose.yaml @@ -85,6 +85,8 @@ x-airflow-common: # WARNING: Use _PIP_ADDITIONAL_REQUIREMENTS option ONLY for a quick checks # for other purpose (development, test and especially production usage) build/extend Airflow image. AIRFLOW_CONN_AWS_S3: aws://$AWS_ACCESS_KEY_ID:$AWS_SECRET_ACCESS_KEY@$AWS_DEFAULT_REGION + AIRFLOW_CONN_SNOWFLAKE_CONN: snowflake://$SNOWFLAKE_USER:$SNOWFLAKE_PASSWORD@$SNOWFLAKE_SCHEMA/?account=$SNOWFLAKE_ACCOUNT&warehouse=$SNOWFLAKE_WH&database=$SNOWFLAKE_DB&schema=$SNOWFLAKE_SCHEMA&role=$SNOWFLAKE_ROLE + AIRFLOW_CONN_SPARK_CONN: Spark://spark://spark-master:7077/?deploy-mode=client&spark-binary=spark-submit _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:- selenium pandas numpy webdriver-manager oauth2client gspread snowflake-connector-python pyspark python-dotenv apache-airflow-providers-apache-spark apache-airflow-providers-snowflake} # The following line can be used to set a custom config file, stored in the local config folder # If you want to use it, outcommen t it and replace airflow.cfg with the name of your config file From 8a9bdc371e7b88cfae4bfd2f9ca95e5fda719fdd Mon Sep 17 00:00:00 2001 From: Yeojun <52143231+Yun024@users.noreply.github.com> Date: Sun, 16 Mar 2025 13:51:37 +0900 Subject: [PATCH 02/10] [Update]BH_ELT --- airflow/.env | 12 +++--- airflow/dags/S3_Spark_SnowFlake_DAG.py | 4 +- .../dags/scripts/S3_Spark_SnowFlake_ELT.py | 38 +++++++------------ 3 files changed, 21 insertions(+), 33 deletions(-) diff --git a/airflow/.env b/airflow/.env index e983c51..f426747 100644 --- a/airflow/.env +++ b/airflow/.env @@ -18,12 +18,12 @@ AWS_ACCESS_KEY_ID=AKIA4RRVVY55VT3PCIZU AWS_SECRET_ACCESS_KEY=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 AWS_DEFAULT_REGION=ap-northeast-2 # 사용 지역 설정 - - #S3_Spark_SnowFlake_ELT -SNOWFLAKE_USER= BY -SNOWFLAKE_PASSWORD= Zmfflsprtm1234 +SNOWFLAKE_USER= BSH +SNOWFLAKE_PASSWORD= BSH1234! +SNOWFLAKE_ACCOUNT=KJQEOVI-GR23658 SNOWFLAKE_WH= COMPUTE_WH +SNOWFLAKE_DB=S4TIFY SNOWFLAKE_SCHEMA = RAW_DATA -SNOWFLAKE_ACCOUNT=KJQEOVI-GR23658 -SNOWFLAKE_DB=S4TIFY \ No newline at end of file +SNOWFLAKE_ROLE=ANALYTICS_USERS + diff --git a/airflow/dags/S3_Spark_SnowFlake_DAG.py b/airflow/dags/S3_Spark_SnowFlake_DAG.py index 3120d31..fec3677 100644 --- a/airflow/dags/S3_Spark_SnowFlake_DAG.py +++ b/airflow/dags/S3_Spark_SnowFlake_DAG.py @@ -6,8 +6,8 @@ SparkSubmitOperator # .env에서 AWS 자격 증명 불러오기 (없으면 예외 발생 방지) -AWS_ACCESS_KEY = os.getenv("AWS_ACCESS_KEY_ID", None) -AWS_SECRET_KEY = os.getenv("AWS_SECRET_ACCESS_KEY", None) +AWS_ACCESS_KEY = os.getenv("AWS_ACCESS_KEY_ID") +AWS_SECRET_KEY = os.getenv("AWS_SECRET_ACCESS_KEY") if not AWS_ACCESS_KEY or not AWS_SECRET_KEY: raise ValueError("AWS_ACCESS_KEY 또는 AWS_SECRET_KEY가 설정되지 않았습니다.") diff --git a/airflow/dags/scripts/S3_Spark_SnowFlake_ELT.py b/airflow/dags/scripts/S3_Spark_SnowFlake_ELT.py index b2ca4a0..6027200 100644 --- a/airflow/dags/scripts/S3_Spark_SnowFlake_ELT.py +++ b/airflow/dags/scripts/S3_Spark_SnowFlake_ELT.py @@ -5,7 +5,7 @@ from pyspark.sql import SparkSession from pyspark.sql.functions import col, count, lit, when -from airflow.models import Variable +from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook # Spark JARs 설정 SPARK_HOME = "/opt/spark/" @@ -30,7 +30,6 @@ "url": f'jdbc:snowflake://{os.getenv("SNOWFLAKE_ACCOUNT")}.snowflakecomputing.com', } - # Spark Session 생성 함수 def spark_session_builder(app_name: str) -> SparkSession: return ( @@ -40,30 +39,28 @@ def spark_session_builder(app_name: str) -> SparkSession: "spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") .config( "spark.hadoop.fs.s3a.access.key", - os.getenv("AWS_ACCESS_KEY")) .config( + os.getenv("AWS_ACCESS_KEY_ID")) .config( "spark.hadoop.fs.s3a.secret.key", - os.getenv("AWS_SECRET_KEY")) .config( + os.getenv("AWS_SECRET_ACCESS_KEY")) .config( "spark.hadoop.fs.s3a.endpoint", "s3.amazonaws.com") .config( "spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider", ) .getOrCreate()) +# snowflake connector +def create_snowflake_conn(): + hook = SnowflakeHook(snowflake_conn_id="SNOWFLAKE_CONN", schema="RAW_DATA") + conn = hook.get_conn() + cur = conn.cursor() + return conn,cur + # Snowflake에서 SQL 실행 함수 def check_and_create_table(): try: - conn = snowflake.connector.connect( - user=SNOWFLAKE_OPTIONS["user"], - password=SNOWFLAKE_OPTIONS["password"], - account=SNOWFLAKE_OPTIONS["account"], - database=SNOWFLAKE_OPTIONS["db"], - schema=SNOWFLAKE_OPTIONS["schema"], - warehouse=SNOWFLAKE_OPTIONS["warehouse"], - role=SNOWFLAKE_OPTIONS["role"], - ) - cur = conn.cursor() - + conn,cur = create_snowflake_conn() + # 테이블 존재 여부 확인 cur.execute( f""" @@ -113,16 +110,7 @@ def escape_quotes(value): # Snowflake에서 SQL 실행 함수 def insert_data_into_snowflake(df, table_name): try: - conn = snowflake.connector.connect( - user=SNOWFLAKE_OPTIONS["user"], - password=SNOWFLAKE_OPTIONS["password"], - account=SNOWFLAKE_OPTIONS["account"], - database=SNOWFLAKE_OPTIONS["db"], - schema=SNOWFLAKE_OPTIONS["schema"], - warehouse=SNOWFLAKE_OPTIONS["warehouse"], - role=SNOWFLAKE_OPTIONS["role"], - ) - cur = conn.cursor() + conn,cur = create_snowflake_conn() for row in df.collect(): rank = "NULL" if row["rank"] is None else row["rank"] From 60cb2670bfa0030017366410d93e3e1bef61208c Mon Sep 17 00:00:00 2001 From: Yeojun <52143231+Yun024@users.noreply.github.com> Date: Sat, 15 Mar 2025 20:09:20 +0900 Subject: [PATCH 03/10] yr_elt rebase --- airflow/dags/ETL_eventsim_DAG.py | 2 +- airflow/dags/S3_Spark_SnowFlake_DAG.py | 5 ++--- .../scripts/ELT_artist_info_globalTop50.py | 4 +--- airflow/dags/scripts/ELT_artist_info_top10.py | 4 +--- airflow/dags/spotify_ELT_DAG.py | 20 +++++++++++++++++-- 5 files changed, 23 insertions(+), 12 deletions(-) diff --git a/airflow/dags/ETL_eventsim_DAG.py b/airflow/dags/ETL_eventsim_DAG.py index ba539fd..0411101 100644 --- a/airflow/dags/ETL_eventsim_DAG.py +++ b/airflow/dags/ETL_eventsim_DAG.py @@ -13,7 +13,7 @@ default_args = { "owner": "sanghyeok_boo", "start_date": datetime(2025, 3, 1), - "end_date": datetime(2025, 3, 7), + "end_date": datetime(2025, 3, 2), "template_searchpath": ["/opt/airflow/dags/spark_jobs/"], } diff --git a/airflow/dags/S3_Spark_SnowFlake_DAG.py b/airflow/dags/S3_Spark_SnowFlake_DAG.py index 27436f0..3120d31 100644 --- a/airflow/dags/S3_Spark_SnowFlake_DAG.py +++ b/airflow/dags/S3_Spark_SnowFlake_DAG.py @@ -2,13 +2,12 @@ from datetime import datetime, timedelta from airflow import DAG -from airflow.models import Variable from airflow.providers.apache.spark.operators.spark_submit import \ SparkSubmitOperator # .env에서 AWS 자격 증명 불러오기 (없으면 예외 발생 방지) -AWS_ACCESS_KEY = os.getenv("AWS_ACCESS_KEY", default_var=None) -AWS_SECRET_KEY = os.getenv("AWS_SECRET_KEY", default_var=None) +AWS_ACCESS_KEY = os.getenv("AWS_ACCESS_KEY_ID", None) +AWS_SECRET_KEY = os.getenv("AWS_SECRET_ACCESS_KEY", None) if not AWS_ACCESS_KEY or not AWS_SECRET_KEY: raise ValueError("AWS_ACCESS_KEY 또는 AWS_SECRET_KEY가 설정되지 않았습니다.") diff --git a/airflow/dags/scripts/ELT_artist_info_globalTop50.py b/airflow/dags/scripts/ELT_artist_info_globalTop50.py index efff14a..1a5705a 100644 --- a/airflow/dags/scripts/ELT_artist_info_globalTop50.py +++ b/airflow/dags/scripts/ELT_artist_info_globalTop50.py @@ -8,9 +8,7 @@ regexp_replace, split, when) from pyspark.sql.types import IntegerType, StringType, StructField, StructType -from airflow.models import Variable - -LAST_FM_API_KEY = Variable.get("LAST_FM_API_KEY") +LAST_FM_API_KEY = os.getenv("LAST_FM_API_KEY") BUCKET_NAME = "de5-s4tify" OBJECT_NAME = "raw_data" diff --git a/airflow/dags/scripts/ELT_artist_info_top10.py b/airflow/dags/scripts/ELT_artist_info_top10.py index e55f5d5..9dd22fa 100644 --- a/airflow/dags/scripts/ELT_artist_info_top10.py +++ b/airflow/dags/scripts/ELT_artist_info_top10.py @@ -6,9 +6,7 @@ from pyspark.sql.functions import col, current_date, regexp_replace, split, udf from pyspark.sql.types import ArrayType, StringType, StructField, StructType -from airflow.models import Variable - -LAST_FM_API_KEY = Variable.get("LAST_FM_API_KEY") +LAST_FM_API_KEY = os.getenv("LAST_FM_API_KEY") BUCKET_NAME = "de5-s4tify" OBJECT_NAME = "raw_data" diff --git a/airflow/dags/spotify_ELT_DAG.py b/airflow/dags/spotify_ELT_DAG.py index 2f4ea9d..1bda5a7 100644 --- a/airflow/dags/spotify_ELT_DAG.py +++ b/airflow/dags/spotify_ELT_DAG.py @@ -8,6 +8,20 @@ from airflow.providers.apache.spark.operators.spark_submit import \ SparkSubmitOperator + +# Spakr JARs 설정 +SPARK_HOME = os.environ.get("SPARK_JAR_DIR") +SPARK_JARS = ",".join( + [ + os.path.join(SPARK_HOME, "snowflake-jdbc-3.13.33.jar"), + os.path.join(SPARK_HOME, "spark-snowflake_2.12-2.12.0-spark_3.4.jar"), + os.path.join(SPARK_HOME, "hadoop-aws-3.3.4.jar"), + os.path.join(SPARK_HOME, "aws-java-sdk-bundle-1.12.262.jar"), + ] +) + + + # DAG 기본 설정 default_args = { "owner": "yerin", @@ -27,14 +41,16 @@ artist_info_Top10_table = SparkSubmitOperator( task_id="artist_info_top10_table", application="dags/scripts/ELT_artist_info_top10.py", - conn_id="spark_default", + conn_id="spark_conn", + jars=SPARK_JARS, dag=dag, ) artist_info_globalTop50_table = SparkSubmitOperator( task_id="artist_info_globalTop50_table", application="dags/scripts/ELT_artist_info_globalTop50.py", - conn_id="spark_default", + conn_id="spark_conn", + jars=SPARK_JARS, dag=dag, ) From 8afa1fd2bb979a9756b5d9f8644681a3410e8bee Mon Sep 17 00:00:00 2001 From: Yeojun <52143231+Yun024@users.noreply.github.com> Date: Sun, 16 Mar 2025 13:51:37 +0900 Subject: [PATCH 04/10] yr_elt rebase --- airflow/.env | 2 +- airflow/dags/S3_Spark_SnowFlake_DAG.py | 4 +- .../dags/scripts/S3_Spark_SnowFlake_ELT.py | 38 +++++++------------ 3 files changed, 16 insertions(+), 28 deletions(-) diff --git a/airflow/.env b/airflow/.env index ca5fac9..c5860d5 100644 --- a/airflow/.env +++ b/airflow/.env @@ -25,4 +25,4 @@ SNOWFLAKE_ACCOUNT=KJQEOVI-GR23658 SNOWFLAKE_WH= COMPUTE_WH SNOWFLAKE_DB=S4TIFY SNOWFLAKE_SCHEMA = RAW_DATA -SNOWFLAKE_ROLE=ANALYTICS_USERS \ No newline at end of file +SNOWFLAKE_ROLE=ANALYTICS_USERS diff --git a/airflow/dags/S3_Spark_SnowFlake_DAG.py b/airflow/dags/S3_Spark_SnowFlake_DAG.py index 3120d31..fec3677 100644 --- a/airflow/dags/S3_Spark_SnowFlake_DAG.py +++ b/airflow/dags/S3_Spark_SnowFlake_DAG.py @@ -6,8 +6,8 @@ SparkSubmitOperator # .env에서 AWS 자격 증명 불러오기 (없으면 예외 발생 방지) -AWS_ACCESS_KEY = os.getenv("AWS_ACCESS_KEY_ID", None) -AWS_SECRET_KEY = os.getenv("AWS_SECRET_ACCESS_KEY", None) +AWS_ACCESS_KEY = os.getenv("AWS_ACCESS_KEY_ID") +AWS_SECRET_KEY = os.getenv("AWS_SECRET_ACCESS_KEY") if not AWS_ACCESS_KEY or not AWS_SECRET_KEY: raise ValueError("AWS_ACCESS_KEY 또는 AWS_SECRET_KEY가 설정되지 않았습니다.") diff --git a/airflow/dags/scripts/S3_Spark_SnowFlake_ELT.py b/airflow/dags/scripts/S3_Spark_SnowFlake_ELT.py index b2ca4a0..6027200 100644 --- a/airflow/dags/scripts/S3_Spark_SnowFlake_ELT.py +++ b/airflow/dags/scripts/S3_Spark_SnowFlake_ELT.py @@ -5,7 +5,7 @@ from pyspark.sql import SparkSession from pyspark.sql.functions import col, count, lit, when -from airflow.models import Variable +from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook # Spark JARs 설정 SPARK_HOME = "/opt/spark/" @@ -30,7 +30,6 @@ "url": f'jdbc:snowflake://{os.getenv("SNOWFLAKE_ACCOUNT")}.snowflakecomputing.com', } - # Spark Session 생성 함수 def spark_session_builder(app_name: str) -> SparkSession: return ( @@ -40,30 +39,28 @@ def spark_session_builder(app_name: str) -> SparkSession: "spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") .config( "spark.hadoop.fs.s3a.access.key", - os.getenv("AWS_ACCESS_KEY")) .config( + os.getenv("AWS_ACCESS_KEY_ID")) .config( "spark.hadoop.fs.s3a.secret.key", - os.getenv("AWS_SECRET_KEY")) .config( + os.getenv("AWS_SECRET_ACCESS_KEY")) .config( "spark.hadoop.fs.s3a.endpoint", "s3.amazonaws.com") .config( "spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider", ) .getOrCreate()) +# snowflake connector +def create_snowflake_conn(): + hook = SnowflakeHook(snowflake_conn_id="SNOWFLAKE_CONN", schema="RAW_DATA") + conn = hook.get_conn() + cur = conn.cursor() + return conn,cur + # Snowflake에서 SQL 실행 함수 def check_and_create_table(): try: - conn = snowflake.connector.connect( - user=SNOWFLAKE_OPTIONS["user"], - password=SNOWFLAKE_OPTIONS["password"], - account=SNOWFLAKE_OPTIONS["account"], - database=SNOWFLAKE_OPTIONS["db"], - schema=SNOWFLAKE_OPTIONS["schema"], - warehouse=SNOWFLAKE_OPTIONS["warehouse"], - role=SNOWFLAKE_OPTIONS["role"], - ) - cur = conn.cursor() - + conn,cur = create_snowflake_conn() + # 테이블 존재 여부 확인 cur.execute( f""" @@ -113,16 +110,7 @@ def escape_quotes(value): # Snowflake에서 SQL 실행 함수 def insert_data_into_snowflake(df, table_name): try: - conn = snowflake.connector.connect( - user=SNOWFLAKE_OPTIONS["user"], - password=SNOWFLAKE_OPTIONS["password"], - account=SNOWFLAKE_OPTIONS["account"], - database=SNOWFLAKE_OPTIONS["db"], - schema=SNOWFLAKE_OPTIONS["schema"], - warehouse=SNOWFLAKE_OPTIONS["warehouse"], - role=SNOWFLAKE_OPTIONS["role"], - ) - cur = conn.cursor() + conn,cur = create_snowflake_conn() for row in df.collect(): rank = "NULL" if row["rank"] is None else row["rank"] From 9018db5009557e6c07d87327e0ba562800efac79 Mon Sep 17 00:00:00 2001 From: GitHub Action Date: Sun, 16 Mar 2025 07:40:18 +0000 Subject: [PATCH 05/10] Automated format fixes --- airflow/dags/scripts/S3_Spark_SnowFlake_ELT.py | 12 +++++++----- airflow/dags/spotify_ELT_DAG.py | 2 -- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/airflow/dags/scripts/S3_Spark_SnowFlake_ELT.py b/airflow/dags/scripts/S3_Spark_SnowFlake_ELT.py index 6027200..0fec472 100644 --- a/airflow/dags/scripts/S3_Spark_SnowFlake_ELT.py +++ b/airflow/dags/scripts/S3_Spark_SnowFlake_ELT.py @@ -30,6 +30,7 @@ "url": f'jdbc:snowflake://{os.getenv("SNOWFLAKE_ACCOUNT")}.snowflakecomputing.com', } + # Spark Session 생성 함수 def spark_session_builder(app_name: str) -> SparkSession: return ( @@ -48,19 +49,20 @@ def spark_session_builder(app_name: str) -> SparkSession: "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider", ) .getOrCreate()) -# snowflake connector + +# snowflake connector def create_snowflake_conn(): hook = SnowflakeHook(snowflake_conn_id="SNOWFLAKE_CONN", schema="RAW_DATA") conn = hook.get_conn() cur = conn.cursor() - return conn,cur + return conn, cur # Snowflake에서 SQL 실행 함수 def check_and_create_table(): try: - conn,cur = create_snowflake_conn() - + conn, cur = create_snowflake_conn() + # 테이블 존재 여부 확인 cur.execute( f""" @@ -110,7 +112,7 @@ def escape_quotes(value): # Snowflake에서 SQL 실행 함수 def insert_data_into_snowflake(df, table_name): try: - conn,cur = create_snowflake_conn() + conn, cur = create_snowflake_conn() for row in df.collect(): rank = "NULL" if row["rank"] is None else row["rank"] diff --git a/airflow/dags/spotify_ELT_DAG.py b/airflow/dags/spotify_ELT_DAG.py index 1bda5a7..8c3fb96 100644 --- a/airflow/dags/spotify_ELT_DAG.py +++ b/airflow/dags/spotify_ELT_DAG.py @@ -8,7 +8,6 @@ from airflow.providers.apache.spark.operators.spark_submit import \ SparkSubmitOperator - # Spakr JARs 설정 SPARK_HOME = os.environ.get("SPARK_JAR_DIR") SPARK_JARS = ",".join( @@ -21,7 +20,6 @@ ) - # DAG 기본 설정 default_args = { "owner": "yerin", From a2783210ba8765b50b1076aa6eed9b947fae2346 Mon Sep 17 00:00:00 2001 From: Yeojun <52143231+Yun024@users.noreply.github.com> Date: Sun, 16 Mar 2025 16:59:06 +0900 Subject: [PATCH 06/10] date_change --- airflow/dags/ETL_eventsim_DAG.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/dags/ETL_eventsim_DAG.py b/airflow/dags/ETL_eventsim_DAG.py index 0411101..ba539fd 100644 --- a/airflow/dags/ETL_eventsim_DAG.py +++ b/airflow/dags/ETL_eventsim_DAG.py @@ -13,7 +13,7 @@ default_args = { "owner": "sanghyeok_boo", "start_date": datetime(2025, 3, 1), - "end_date": datetime(2025, 3, 2), + "end_date": datetime(2025, 3, 7), "template_searchpath": ["/opt/airflow/dags/spark_jobs/"], } From 37248a7d13123c3581c6947d991b885d3d5a3149 Mon Sep 17 00:00:00 2001 From: Yeojun <52143231+Yun024@users.noreply.github.com> Date: Sun, 16 Mar 2025 17:00:59 +0900 Subject: [PATCH 07/10] update --- airflow/dags/scripts/S3_Spark_SnowFlake_ELT.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/airflow/dags/scripts/S3_Spark_SnowFlake_ELT.py b/airflow/dags/scripts/S3_Spark_SnowFlake_ELT.py index 672a71d..fd4239f 100644 --- a/airflow/dags/scripts/S3_Spark_SnowFlake_ELT.py +++ b/airflow/dags/scripts/S3_Spark_SnowFlake_ELT.py @@ -118,11 +118,7 @@ def escape_quotes(value): # Snowflake에서 SQL 실행 함수 def insert_data_into_snowflake(df, table_name): try: -<<<<<<< HEAD - conn,cur = create_snowflake_conn() -======= conn, cur = create_snowflake_conn() ->>>>>>> 9018db5009557e6c07d87327e0ba562800efac79 for row in df.collect(): rank = "NULL" if row["rank"] is None else row["rank"] From ede163800b89f9c74450c93fa59bf0d248972ee8 Mon Sep 17 00:00:00 2001 From: GitHub Action Date: Sun, 16 Mar 2025 08:01:30 +0000 Subject: [PATCH 08/10] Automated format fixes --- airflow/dags/scripts/S3_Spark_SnowFlake_ELT.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/airflow/dags/scripts/S3_Spark_SnowFlake_ELT.py b/airflow/dags/scripts/S3_Spark_SnowFlake_ELT.py index fd4239f..a701ebd 100644 --- a/airflow/dags/scripts/S3_Spark_SnowFlake_ELT.py +++ b/airflow/dags/scripts/S3_Spark_SnowFlake_ELT.py @@ -30,6 +30,7 @@ "url": f'jdbc:snowflake://{os.getenv("SNOWFLAKE_ACCOUNT")}.snowflakecomputing.com', } + # Spark Session 생성 함수 def spark_session_builder(app_name: str) -> SparkSession: return ( @@ -48,12 +49,13 @@ def spark_session_builder(app_name: str) -> SparkSession: "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider", ) .getOrCreate()) -# snowflake connector + +# snowflake connector def create_snowflake_conn(): hook = SnowflakeHook(snowflake_conn_id="SNOWFLAKE_CONN", schema="RAW_DATA") conn = hook.get_conn() cur = conn.cursor() - return conn,cur + return conn, cur # snowflake connector From 70a7b10a751fdf445d5960010c39065ab2274aa1 Mon Sep 17 00:00:00 2001 From: Yeojun <52143231+Yun024@users.noreply.github.com> Date: Sun, 16 Mar 2025 17:04:20 +0900 Subject: [PATCH 09/10] delete_code_duplicate --- airflow/dags/scripts/S3_Spark_SnowFlake_ELT.py | 9 --------- 1 file changed, 9 deletions(-) diff --git a/airflow/dags/scripts/S3_Spark_SnowFlake_ELT.py b/airflow/dags/scripts/S3_Spark_SnowFlake_ELT.py index fd4239f..bde7975 100644 --- a/airflow/dags/scripts/S3_Spark_SnowFlake_ELT.py +++ b/airflow/dags/scripts/S3_Spark_SnowFlake_ELT.py @@ -55,15 +55,6 @@ def create_snowflake_conn(): cur = conn.cursor() return conn,cur - -# snowflake connector -def create_snowflake_conn(): - hook = SnowflakeHook(snowflake_conn_id="SNOWFLAKE_CONN", schema="RAW_DATA") - conn = hook.get_conn() - cur = conn.cursor() - return conn, cur - - # Snowflake에서 SQL 실행 함수 def check_and_create_table(): try: From d5ab4e119fcf9b182d681cab3d7ec0e64125537c Mon Sep 17 00:00:00 2001 From: GitHub Action Date: Sun, 16 Mar 2025 08:07:38 +0000 Subject: [PATCH 10/10] Automated format fixes --- airflow/dags/scripts/S3_Spark_SnowFlake_ELT.py | 1 + 1 file changed, 1 insertion(+) diff --git a/airflow/dags/scripts/S3_Spark_SnowFlake_ELT.py b/airflow/dags/scripts/S3_Spark_SnowFlake_ELT.py index cf7581f..0fec472 100644 --- a/airflow/dags/scripts/S3_Spark_SnowFlake_ELT.py +++ b/airflow/dags/scripts/S3_Spark_SnowFlake_ELT.py @@ -57,6 +57,7 @@ def create_snowflake_conn(): cur = conn.cursor() return conn, cur + # Snowflake에서 SQL 실행 함수 def check_and_create_table(): try: