From 413e2c1389b2c34780ca8a0ab63387d508073b83 Mon Sep 17 00:00:00 2001 From: sanghyeok_boo Date: Mon, 10 Mar 2025 17:58:42 +0900 Subject: [PATCH 1/5] change snowflake properties --- airflow/dags/scripts/eventsim_ETL.py | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/airflow/dags/scripts/eventsim_ETL.py b/airflow/dags/scripts/eventsim_ETL.py index 255181f..354974e 100644 --- a/airflow/dags/scripts/eventsim_ETL.py +++ b/airflow/dags/scripts/eventsim_ETL.py @@ -7,14 +7,9 @@ from airflow.models import Variable -from ..dags.plugins.spark_utils import (execute_snowflake_query, +from plugins.spark_utils import (execute_snowflake_query, spark_session_builder) -BASE_DIR = os.path.abspath( - os.path.join(os.path.dirname(__file__), "..") -) # S4tify 루트 디렉토리 -sys.path.append(BASE_DIR) # sys.path에 S4tify 추가 - # SNOW_FLAKE 설정 SNOWFLAKE_TABLE = "EVENTSIM_LOG" @@ -24,7 +19,7 @@ "user": Variable.get("SNOWFLAKE_USER"), "password": Variable.get("SNOWFLAKE_PASSWORD"), "account": Variable.get("SNOWFLAKE_ACCOUNT"), - "db": Variable.get("SNOWFLAKE_DB", "DATA_WAREHOUSE"), + "db": Variable.get("SNOWFLAKE_DB", "S4TIFY"), "warehouse": Variable.get("SNOWFLAKE_WH", "COMPUTE_WH"), "schema": SNOWFLAKE_SCHEMA if SNOWFLAKE_SCHEMA else "raw_data", "driver": "net.snowflake.client.jdbc.SnowflakeDriver", From 558f5bd741dcd51647da6644a8a42467c2347bfa Mon Sep 17 00:00:00 2001 From: sanghyeok_boo Date: Mon, 10 Mar 2025 18:02:15 +0900 Subject: [PATCH 2/5] change snowflake properties --- airflow/dags/eventsim_ETL_DAG.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/dags/eventsim_ETL_DAG.py b/airflow/dags/eventsim_ETL_DAG.py index 8ffb3ac..a09e9e9 100644 --- a/airflow/dags/eventsim_ETL_DAG.py +++ b/airflow/dags/eventsim_ETL_DAG.py @@ -7,7 +7,7 @@ SparkSubmitOperator # S3 및 Snowflake 설정 -S3_BUCKET = "s3a://eventsim-log" +S3_BUCKET = "s3a://de5-s4tify" # Spark JARs 설정 SPARK_HOME = os.environ.get("SPARK_JAR_DIR") SPARK_JARS = ",".join( From 
c6b7a6c7805d09bca13499def17ae95d69aec222 Mon Sep 17 00:00:00 2001 From: sanghyeok_boo Date: Mon, 10 Mar 2025 18:03:17 +0900 Subject: [PATCH 3/5] change snowflake properties --- airflow/dags/eventsim_ETL_DAG.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airflow/dags/eventsim_ETL_DAG.py b/airflow/dags/eventsim_ETL_DAG.py index a09e9e9..2bae95f 100644 --- a/airflow/dags/eventsim_ETL_DAG.py +++ b/airflow/dags/eventsim_ETL_DAG.py @@ -20,8 +20,8 @@ default_args = { "owner": "airflow", - "start_date": datetime(2025, 2, 25), - "end_date": datetime(2025, 3, 1), + "start_date": datetime(2025, 2, 29), + "end_date": datetime(2025, 3, 7), "retries": 1, "template_searchpath": ["/opt/airflow/dags/spark_jobs/"], } From 55d90c4bca4566632878501c1c4c53f0e77ed5c3 Mon Sep 17 00:00:00 2001 From: sanghyeok_boo Date: Mon, 10 Mar 2025 21:30:08 +0900 Subject: [PATCH 4/5] =?UTF-8?q?ETL=20DAG=20endpoint=20=EB=B3=80=EA=B2=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- airflow/dags/eventsim_ETL_DAG.py | 6 +++--- airflow/dags/scripts/eventsim_ETL.py | 8 +++++--- eventsim-streaming | 1 - 3 files changed, 8 insertions(+), 7 deletions(-) delete mode 160000 eventsim-streaming diff --git a/airflow/dags/eventsim_ETL_DAG.py b/airflow/dags/eventsim_ETL_DAG.py index 2bae95f..203f7e4 100644 --- a/airflow/dags/eventsim_ETL_DAG.py +++ b/airflow/dags/eventsim_ETL_DAG.py @@ -20,7 +20,7 @@ default_args = { "owner": "airflow", - "start_date": datetime(2025, 2, 29), + "start_date": datetime(2025, 3, 1), "end_date": datetime(2025, 3, 7), "retries": 1, "template_searchpath": ["/opt/airflow/dags/spark_jobs/"], @@ -42,7 +42,7 @@ # SparkSubmitOperator: Spark에서 S3 데이터를 처리하고 Snowflake에 MERGE spark_job = SparkSubmitOperator( task_id="spark_process_s3_upsert", - application="/opt/airflow/dags/spark_jobs/eventsim_ETL.py", + application="/opt/airflow/dags/scripts/eventsim_ETL.py", conn_id="spark_conn", application_args=[ S3_BUCKET, @@ -56,7 
+56,7 @@ "spark.hadoop.fs.s3a.access.key": "{{ conn.aws_conn.login }}", "spark.hadoop.fs.s3a.secret.key": "{{ conn.aws_conn.password }}", "spark.hadoop.fs.s3a.endpoint": "s3.ap-northeast-2.amazonaws.com", - "spark.hadoop.fs.s3a.aws.credentials.provider": "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider", + "spark.hadoop.fs.s3a.aws.credentials.provider": "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider" }, dag=dag, ) diff --git a/airflow/dags/scripts/eventsim_ETL.py b/airflow/dags/scripts/eventsim_ETL.py index 354974e..38bc88e 100644 --- a/airflow/dags/scripts/eventsim_ETL.py +++ b/airflow/dags/scripts/eventsim_ETL.py @@ -14,14 +14,15 @@ # SNOW_FLAKE 설정 SNOWFLAKE_TABLE = "EVENTSIM_LOG" SNOWFLAKE_TEMP_TABLE = "EVENTS_TABLE_TEMP" -SNOWFLAKE_SCHEMA = "raw_data" +SNOWFLAKE_SCHEMA = "RAW_DATA" SNOWFLAKE_PROPERTIES = { "user": Variable.get("SNOWFLAKE_USER"), "password": Variable.get("SNOWFLAKE_PASSWORD"), "account": Variable.get("SNOWFLAKE_ACCOUNT"), "db": Variable.get("SNOWFLAKE_DB", "S4TIFY"), "warehouse": Variable.get("SNOWFLAKE_WH", "COMPUTE_WH"), - "schema": SNOWFLAKE_SCHEMA if SNOWFLAKE_SCHEMA else "raw_data", + "schema": SNOWFLAKE_SCHEMA if SNOWFLAKE_SCHEMA else "RAW_DATA", + "role": Variable.get("SNOWFLAKE_ROLE", "ANALYTICS_USERS"), "driver": "net.snowflake.client.jdbc.SnowflakeDriver", "url": f'jdbc:snowflake://{Variable.get("SNOWFLAKE_ACCOUNT")}.snowflakecomputing.com', } @@ -59,7 +60,7 @@ # -------------------CREATE TABLE-------------------- # 테이블 생성 create_table_sql = f""" -CREATE TABLE IF NOT EXISTS raw_data.EVENTSIM_LOG ( +CREATE TABLE IF NOT EXISTS {SNOWFLAKE_SCHEMA}.{SNOWFLAKE_TABLE} ( song STRING, artist STRING, location STRING, @@ -69,6 +70,7 @@ ); """ execute_snowflake_query(create_table_sql, SNOWFLAKE_PROPERTIES) +print("Create Table") # -----------------------UPSERT---------------------- # Snowflake TEMP 테이블에 데이터 적재 df_clean.write.format("jdbc").options(**SNOWFLAKE_PROPERTIES).option( diff --git a/eventsim-streaming b/eventsim-streaming 
deleted file mode 160000 index 2cfd5de..0000000 --- a/eventsim-streaming +++ /dev/null @@ -1 +0,0 @@ -Subproject commit 2cfd5deaaae7df84b031dd7a466de62f0e294830 From 24a229e073d2d67b97acfd864d17b89b7150e7e9 Mon Sep 17 00:00:00 2001 From: GitHub Action Date: Mon, 10 Mar 2025 12:31:12 +0000 Subject: [PATCH 5/5] Automated format fixes --- airflow/dags/eventsim_ETL_DAG.py | 2 +- airflow/dags/scripts/eventsim_ETL.py | 5 +---- 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/airflow/dags/eventsim_ETL_DAG.py b/airflow/dags/eventsim_ETL_DAG.py index 203f7e4..e527c42 100644 --- a/airflow/dags/eventsim_ETL_DAG.py +++ b/airflow/dags/eventsim_ETL_DAG.py @@ -56,7 +56,7 @@ "spark.hadoop.fs.s3a.access.key": "{{ conn.aws_conn.login }}", "spark.hadoop.fs.s3a.secret.key": "{{ conn.aws_conn.password }}", "spark.hadoop.fs.s3a.endpoint": "s3.ap-northeast-2.amazonaws.com", - "spark.hadoop.fs.s3a.aws.credentials.provider": "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider" + "spark.hadoop.fs.s3a.aws.credentials.provider": "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider", }, dag=dag, ) diff --git a/airflow/dags/scripts/eventsim_ETL.py b/airflow/dags/scripts/eventsim_ETL.py index 38bc88e..a1784f5 100644 --- a/airflow/dags/scripts/eventsim_ETL.py +++ b/airflow/dags/scripts/eventsim_ETL.py @@ -2,15 +2,12 @@ import sys from datetime import datetime +from plugins.spark_utils import execute_snowflake_query, spark_session_builder from pyspark.sql.types import (IntegerType, LongType, StringType, StructField, StructType) from airflow.models import Variable -from plugins.spark_utils import (execute_snowflake_query, - spark_session_builder) - - # SNOW_FLAKE 설정 SNOWFLAKE_TABLE = "EVENTSIM_LOG" SNOWFLAKE_TEMP_TABLE = "EVENTS_TABLE_TEMP"