diff --git a/airflow/dags/eventsim_ETL_DAG.py b/airflow/dags/eventsim_ETL_DAG.py
index 8ffb3ac..e527c42 100644
--- a/airflow/dags/eventsim_ETL_DAG.py
+++ b/airflow/dags/eventsim_ETL_DAG.py
@@ -7,7 +7,7 @@
 SparkSubmitOperator

 # S3 and Snowflake settings
-S3_BUCKET = "s3a://eventsim-log"
+S3_BUCKET = "s3a://de5-s4tify"

 # Spark JARs settings
 SPARK_HOME = os.environ.get("SPARK_JAR_DIR")
 SPARK_JARS = ",".join(
@@ -20,8 +20,8 @@
 default_args = {
     "owner": "airflow",
-    "start_date": datetime(2025, 2, 25),
-    "end_date": datetime(2025, 3, 1),
+    "start_date": datetime(2025, 3, 1),
+    "end_date": datetime(2025, 3, 7),
     "retries": 1,
     "template_searchpath": ["/opt/airflow/dags/spark_jobs/"],
 }
@@ -42,7 +42,7 @@
 # SparkSubmitOperator: process S3 data with Spark and MERGE it into Snowflake
 spark_job = SparkSubmitOperator(
     task_id="spark_process_s3_upsert",
-    application="/opt/airflow/dags/spark_jobs/eventsim_ETL.py",
+    application="/opt/airflow/dags/scripts/eventsim_ETL.py",
     conn_id="spark_conn",
     application_args=[
         S3_BUCKET,
diff --git a/airflow/dags/scripts/eventsim_ETL.py b/airflow/dags/scripts/eventsim_ETL.py
index 255181f..a1784f5 100644
--- a/airflow/dags/scripts/eventsim_ETL.py
+++ b/airflow/dags/scripts/eventsim_ETL.py
@@ -2,31 +2,24 @@
 import sys
 from datetime import datetime

+from plugins.spark_utils import execute_snowflake_query, spark_session_builder
 from pyspark.sql.types import (IntegerType, LongType, StringType, StructField,
                                StructType)

 from airflow.models import Variable
-from ..dags.plugins.spark_utils import (execute_snowflake_query,
-                                        spark_session_builder)
-
-BASE_DIR = os.path.abspath(
-    os.path.join(os.path.dirname(__file__), "..")
-)  # S4tify root directory
-sys.path.append(BASE_DIR)  # add S4tify to sys.path
-
-
 # Snowflake settings
 SNOWFLAKE_TABLE = "EVENTSIM_LOG"
 SNOWFLAKE_TEMP_TABLE = "EVENTS_TABLE_TEMP"
-SNOWFLAKE_SCHEMA = "raw_data"
+SNOWFLAKE_SCHEMA = "RAW_DATA"
 SNOWFLAKE_PROPERTIES = {
     "user": Variable.get("SNOWFLAKE_USER"),
     "password": Variable.get("SNOWFLAKE_PASSWORD"),
     "account": Variable.get("SNOWFLAKE_ACCOUNT"),
-    "db": Variable.get("SNOWFLAKE_DB", "DATA_WAREHOUSE"),
+    "db": Variable.get("SNOWFLAKE_DB", "S4TIFY"),
     "warehouse": Variable.get("SNOWFLAKE_WH", "COMPUTE_WH"),
-    "schema": SNOWFLAKE_SCHEMA if SNOWFLAKE_SCHEMA else "raw_data",
+    "schema": SNOWFLAKE_SCHEMA if SNOWFLAKE_SCHEMA else "RAW_DATA",
+    "role": Variable.get("SNOWFLAKE_ROLE", "ANALYTICS_USERS"),
     "driver": "net.snowflake.client.jdbc.SnowflakeDriver",
     "url": f'jdbc:snowflake://{Variable.get("SNOWFLAKE_ACCOUNT")}.snowflakecomputing.com',
 }
@@ -64,7 +57,7 @@
 # -------------------CREATE TABLE--------------------
 # Create the table
 create_table_sql = f"""
-CREATE TABLE IF NOT EXISTS raw_data.EVENTSIM_LOG (
+CREATE TABLE IF NOT EXISTS {SNOWFLAKE_SCHEMA}.{SNOWFLAKE_TABLE} (
     song STRING,
     artist STRING,
     location STRING,
@@ -74,6 +67,7 @@
 );
 """
 execute_snowflake_query(create_table_sql, SNOWFLAKE_PROPERTIES)
+print("Create Table")

 # -----------------------UPSERT----------------------
 # Load data into the Snowflake TEMP table
 df_clean.write.format("jdbc").options(**SNOWFLAKE_PROPERTIES).option(
diff --git a/eventsim-streaming b/eventsim-streaming
deleted file mode 160000
index 2cfd5de..0000000
--- a/eventsim-streaming
+++ /dev/null
@@ -1 +0,0 @@
-Subproject commit 2cfd5deaaae7df84b031dd7a466de62f0e294830
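Note: the final hunk of eventsim_ETL.py is truncated mid-statement, right at the TEMP-table write. For context, here is a minimal sketch of how that upsert flow could continue: load df_clean into the TEMP table over JDBC, then run the MERGE the DAG comment refers to via execute_snowflake_query. The merge-key columns (userId, ts), the non-key column list, and the overwrite mode are assumptions for illustration, not the PR's actual code.

# Sketch only -- merge key (userId, ts) is a hypothetical choice, not taken
# from the PR. Stage the cleaned DataFrame into the TEMP table over JDBC.
df_clean.write.format("jdbc").options(**SNOWFLAKE_PROPERTIES).option(
    "dbtable", f"{SNOWFLAKE_SCHEMA}.{SNOWFLAKE_TEMP_TABLE}"
).mode("overwrite").save()

# MERGE the staged rows into the target table: update matches, insert the rest.
merge_sql = f"""
MERGE INTO {SNOWFLAKE_SCHEMA}.{SNOWFLAKE_TABLE} AS t
USING {SNOWFLAKE_SCHEMA}.{SNOWFLAKE_TEMP_TABLE} AS s
    ON t.userId = s.userId AND t.ts = s.ts
WHEN MATCHED THEN UPDATE SET
    t.song = s.song, t.artist = s.artist, t.location = s.location
WHEN NOT MATCHED THEN
    INSERT (song, artist, location, userId, ts)
    VALUES (s.song, s.artist, s.location, s.userId, s.ts)
"""
execute_snowflake_query(merge_sql, SNOWFLAKE_PROPERTIES)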