From be44711f560f9799c1949689add6f7cef944da73 Mon Sep 17 00:00:00 2001 From: sanghyeok_boo Date: Sun, 16 Mar 2025 16:21:44 +0900 Subject: [PATCH 1/4] bugfix --- airflow/.env | 15 +- airflow/Dockerfile | 16 +- airflow/dags/ELT_eventsim_song_count_DAG.py | 148 ++---------------- airflow/dags/ETL_eventsim_DAG.py | 18 +-- .../{spark_utils.py => snowflake_utils.py} | 36 ----- airflow/dags/plugins/spark_snowflake_conn.py | 55 ++----- airflow/dags/plugins/variables.py | 42 +++++ airflow/dags/scripts/ELT_eventsim_script.py | 54 +++++++ airflow/dags/scripts/ETL_eventsim_script.py | 44 ++---- airflow/docker-compose.yaml | 2 + 10 files changed, 157 insertions(+), 273 deletions(-) rename airflow/dags/plugins/{spark_utils.py => snowflake_utils.py} (70%) create mode 100644 airflow/dags/plugins/variables.py create mode 100644 airflow/dags/scripts/ELT_eventsim_script.py diff --git a/airflow/.env b/airflow/.env index cbc0216..ca5fac9 100644 --- a/airflow/.env +++ b/airflow/.env @@ -18,16 +18,11 @@ AWS_ACCESS_KEY_ID=AKIA4RRVVY55VT3PCIZU AWS_SECRET_ACCESS_KEY=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 AWS_DEFAULT_REGION=ap-northeast-2 # 사용 지역 설정 -# BSH SNOWFLAKE CONN INFO -SNOWFLAKE_USER_BSH= BSH -SNOWFLAKE_PASSWORD_BSH= BSH1234! -SNOWFLAKE_SILVER_SCHEMA = RAW_DATA -SNOWFLAKE_GOLD_SCHEMA = ANALYTICS - #S3_Spark_SnowFlake_ELT -SNOWFLAKE_USER= BY -SNOWFLAKE_PASSWORD= Zmfflsprtm1234 +SNOWFLAKE_USER= BSH +SNOWFLAKE_PASSWORD= BSH1234! +SNOWFLAKE_ACCOUNT=KJQEOVI-GR23658 SNOWFLAKE_WH= COMPUTE_WH +SNOWFLAKE_DB=S4TIFY SNOWFLAKE_SCHEMA = RAW_DATA -SNOWFLAKE_ACCOUNT=KJQEOVI-GR23658 -SNOWFLAKE_DB=S4TIFY \ No newline at end of file +SNOWFLAKE_ROLE=ANALYTICS_USERS \ No newline at end of file diff --git a/airflow/Dockerfile b/airflow/Dockerfile index 490e5ab..717504f 100644 --- a/airflow/Dockerfile +++ b/airflow/Dockerfile @@ -9,9 +9,19 @@ RUN echo "deb http://deb.debian.org/debian bullseye main" >> /etc/apt/sources.li apt-get update && apt-get install -y \ wget openjdk-11-jdk && \ mkdir -p $SPARK_JAR_DIR && \ - wget -O $SPARK_JAR_DIR/snowflake-jdbc-3.9.2.jar https://repo1.maven.org/maven2/net/snowflake/snowflake-jdbc/3.9.2/snowflake-jdbc-3.9.2.jar && \ - wget -O $SPARK_JAR_DIR/hadoop-aws-3.3.4.jar https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.3.4/hadoop-aws-3.3.4.jar && \ - wget -O $SPARK_JAR_DIR/aws-java-sdk-bundle-1.12.262.jar https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.12.262/aws-java-sdk-bundle-1.12.262.jar && \ + wget -O $SPARK_JAR_DIR/snowflake-jdbc-3.9.2.jar \ + https://repo1.maven.org/maven2/net/snowflake/snowflake-jdbc/3.9.2/snowflake-jdbc-3.9.2.jar && \ + wget -O $SPARK_JAR_DIR/hadoop-aws-3.3.4.jar \ + https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.3.4/hadoop-aws-3.3.4.jar && \ + wget -O $SPARK_JAR_DIR/aws-java-sdk-bundle-1.12.262.jar \ + https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.12.262/aws-java-sdk-bundle-1.12.262.jar && \ + # Snowflake JDBC Driver + wget -O $SPARK_JAR_DIR/snowflake-jdbc-3.13.33.jar \ + https://repo1.maven.org/maven2/net/snowflake/snowflake-jdbc/3.13.33/snowflake-jdbc-3.13.33.jar && \ + # Snowflake Spark Connector + wget -O $SPARK_JAR_DIR/spark-snowflake_2.12-2.12.0-spark_3.4.jar \ + https://repo1.maven.org/maven2/net/snowflake/spark-snowflake_2.12/2.12.0-spark_3.4/spark-snowflake_2.12-2.12.0-spark_3.4.jar && \ + # Clean up to reduce image size apt-get clean && \ rm -rf /var/lib/apt/lists/* diff --git a/airflow/dags/ELT_eventsim_song_count_DAG.py b/airflow/dags/ELT_eventsim_song_count_DAG.py index 03f0a91..e84fdfe 100644 --- a/airflow/dags/ELT_eventsim_song_count_DAG.py +++ b/airflow/dags/ELT_eventsim_song_count_DAG.py @@ -1,128 +1,12 @@ import os from datetime import timedelta -import pandas as pd -import snowflake.connector -from dotenv import load_dotenv -from spark_utils import execute_snowflake_query - from airflow import DAG -from airflow.exceptions import AirflowFailException -from airflow.operators.python import PythonOperator from airflow.operators.trigger_dagrun import TriggerDagRunOperator +from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator from airflow.utils.dates import days_ago -load_dotenv() - -# SNOWFLAKE 설정 -SNOWFLAKE_SOURCE_TABLE = "EVENTSIM_LOG" -SNOWFLAKE_SOURCE_SCHEMA = os.environ.get("SNOWFLAKE_SILVER_SCHMEA", "RAW_DATA") -SNOWFLAKE_TARGET_SONG_TABLE = "EVENTSIM_SONG_COUNTS" -SNOWFLAKE_TARGET_ARTIST_TABLE = "EVENTSIM_ARTIST_COUNTS" -SNOWFLAKE_TARGET_SCHEMA = os.environ.get("SNOWFLAKE_GOLD_SCHEMA", "ANALYTICS") - -SNOWFLAKE_PROPERTIES = { - "user": os.environ.get("SNOWFLAKE_USER_BSH"), - "password": os.environ.get("SNOWFLAKE_PASSWORD_BSH"), - "account": os.environ.get("SNOWFLAKE_ACCOUNT"), - "db": os.environ.get("SNOWFLAKE_DB", "S4TIFY"), - "warehouse": os.environ.get("SNOWFLAKE_WH", "COMPUTE_WH"), - "role": os.environ.get("SNOWFLAKE_ROLE", "ANALYTICS_USERS"), -} - - -def extract_data_from_snowflake(): - """ - Snowflake에서 데이터를 읽어오는 함수 - """ - SNOWFLAKE_PROPERTIES["schema"] = SNOWFLAKE_SOURCE_SCHEMA - - query = f""" - SELECT SONG, ARTIST, LOCATION, SESSIONID, USERID, TS - FROM {SNOWFLAKE_SOURCE_TABLE} - """ - try: - conn = snowflake.connector.connect( - user=SNOWFLAKE_PROPERTIES["user"], - password=SNOWFLAKE_PROPERTIES["password"], - account=SNOWFLAKE_PROPERTIES["account"], - database=SNOWFLAKE_PROPERTIES["db"], - schema=SNOWFLAKE_PROPERTIES["schema"], - warehouse=SNOWFLAKE_PROPERTIES["warehouse"], - role=SNOWFLAKE_PROPERTIES["role"], - ) - cur = conn.cursor() - rows = cur.execute(query).fetchall() - columns = [desc[0] for desc in cur.description] - - df = pd.DataFrame(rows, columns=columns) - print(df.head(5)) - print("Data successfully extracted from Snowflake!") - return df - - except Exception as e: - print(f"Error extracting data from Snowflake: {e}") - return None - - finally: - cur.close() - conn.close() - - -def process_song_counts(**kwargs): - SNOWFLAKE_PROPERTIES["schema"] = SNOWFLAKE_TARGET_SCHEMA - ti = kwargs["ti"] - df = ti.xcom_pull(task_ids="extract_data") - - if df is None or df.empty: - raise AirflowFailException( - "No data available for processing song counts.") - - # 노래 카운트 계산 - song_counts = df.groupby( - ["SONG", "ARTIST"]).size().reset_index(name="song_count") - - # 테이블 비우기 - execute_snowflake_query( - f"TRUNCATE TABLE {SNOWFLAKE_TARGET_SONG_TABLE};", SNOWFLAKE_PROPERTIES - ) - - # SQL 쿼리 (executemany 사용) - insert_query = f""" - INSERT INTO {SNOWFLAKE_TARGET_SONG_TABLE} (SONG, ARTIST, song_count) - VALUES (%s, %s, %s) - """ - - # DataFrame을 리스트로 변환 후 executemany 실행 - execute_snowflake_query( - insert_query, SNOWFLAKE_PROPERTIES, data=song_counts.values.tolist() - ) - - print(f"Data written to {SNOWFLAKE_TARGET_SONG_TABLE} in Snowflake.") - - -def process_artist_counts(**kwargs): - SNOWFLAKE_PROPERTIES["schema"] = SNOWFLAKE_TARGET_SCHEMA - ti = kwargs["ti"] - df = ti.xcom_pull(task_ids="extract_data") - if df is None or df.empty: - raise AirflowFailException( - "No data available for processing artist counts.") - - artist_counts = df.groupby( - "ARTIST").size().reset_index(name="artist_count") - execute_snowflake_query( - f"TRUNCATE TABLE {SNOWFLAKE_TARGET_ARTIST_TABLE};", - SNOWFLAKE_PROPERTIES) - insert_query = f""" - INSERT INTO {SNOWFLAKE_TARGET_ARTIST_TABLE} (ARTIST, artist_count) - VALUES (%s, %s) - """ - execute_snowflake_query( - insert_query, SNOWFLAKE_PROPERTIES, artist_counts.values.tolist() - ) - print(f"Data written to {SNOWFLAKE_TARGET_ARTIST_TABLE} in Snowflake.") - +from dags.plugins.variables import SPARK_JARS # DAG 설정 default_args = { @@ -148,26 +32,14 @@ def process_artist_counts(**kwargs): dag=dag, ) -extract_data_task = PythonOperator( - task_id="extract_data", - python_callable=extract_data_from_snowflake, - provide_context=True, - dag=dag, -) - -process_song_task = PythonOperator( - task_id="process_song_counts", - python_callable=process_song_counts, - provide_context=True, - dag=dag, -) - -process_artist_task = PythonOperator( - task_id="process_artist_counts", - python_callable=process_artist_counts, - provide_context=True, +spark_submit_task = SparkSubmitOperator( + task_id="process_songs_and_artists_spark", + application="dags/scripts/ELT_eventsim_script.py", + conn_id="spark_conn", + executor_memory="2g", + driver_memory="1g", + jars=SPARK_JARS, dag=dag, ) -trigger_dag_task >> extract_data_task >> [ - process_song_task, process_artist_task] +trigger_dag_task >> spark_submit_task diff --git a/airflow/dags/ETL_eventsim_DAG.py b/airflow/dags/ETL_eventsim_DAG.py index 668dbc8..ffa31b4 100644 --- a/airflow/dags/ETL_eventsim_DAG.py +++ b/airflow/dags/ETL_eventsim_DAG.py @@ -9,12 +9,13 @@ # S3 및 Snowflake 설정 S3_BUCKET = "s3a://de5-s4tify" # Spark JARs 설정 -SPARK_HOME = os.environ.get("SPARK_JAR_DIR") +SPARK_JAR_DIR = os.environ.get("SPARK_JAR_DIR") SPARK_JARS = ",".join( [ - os.path.join(SPARK_HOME, "snowflake-jdbc-3.9.2.jar"), - os.path.join(SPARK_HOME, "hadoop-aws-3.3.4.jar"), - os.path.join(SPARK_HOME, "aws-java-sdk-bundle-1.12.262.jar"), + os.path.join(SPARK_JAR_DIR, "snowflake-jdbc-3.13.33.jar"), + os.path.join(SPARK_JAR_DIR, "spark-snowflake_2.12-2.12.0-spark_3.4.jar"), + os.path.join(SPARK_JAR_DIR, "hadoop-aws-3.3.4.jar"), + os.path.join(SPARK_JAR_DIR, "aws-java-sdk-bundle-1.12.262.jar"), ] ) @@ -41,7 +42,7 @@ # SparkSubmitOperator: Spark에서 S3 데이터를 처리하고 Snowflake에 MERGE spark_job = SparkSubmitOperator( task_id="spark_process_s3_upsert", - application="/opt/airflow/dags/scripts/ETL_eventsim_script.py", + application="dags/scripts/ETL_eventsim_script.py", conn_id="spark_conn", application_args=[ S3_BUCKET, @@ -50,13 +51,6 @@ executor_memory="2g", driver_memory="1g", jars=SPARK_JARS, - conf={ - "spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem", - "spark.hadoop.fs.s3a.access.key": "{{ conn.aws_s3.login }}", - "spark.hadoop.fs.s3a.secret.key": "{{ conn.aws_s3.password }}", - "spark.hadoop.fs.s3a.endpoint": "s3.ap-northeast-2.amazonaws.com", - "spark.hadoop.fs.s3a.aws.credentials.provider": "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider", - }, dag=dag, ) diff --git a/airflow/dags/plugins/spark_utils.py b/airflow/dags/plugins/snowflake_utils.py similarity index 70% rename from airflow/dags/plugins/spark_utils.py rename to airflow/dags/plugins/snowflake_utils.py index 4261ee7..4d1fd88 100644 --- a/airflow/dags/plugins/spark_utils.py +++ b/airflow/dags/plugins/snowflake_utils.py @@ -6,42 +6,6 @@ from airflow.exceptions import AirflowFailException -# Spark JARs 설정 -# SPARK_HOME 설정 -SPARK_HOME = "/opt/spark" -os.environ["SPARK_HOME"] = SPARK_HOME - -# JAR 경로 설정 -SPARK_JARS_DIR = os.path.join(SPARK_HOME, "jars") -SPARK_JARS_LIST = [ - "snowflake-jdbc-3.9.2.jar", - "hadoop-aws-3.3.4.jar", - "aws-java-sdk-bundle-1.12.262.jar", -] -SPARK_JARS = ",".join([os.path.join(SPARK_JARS_DIR, jar) - for jar in SPARK_JARS_LIST]) - - -# Spark Session builder -def spark_session_builder(app_name: str) -> SparkSession: - """_summary_ - spark session builder for AWS S3 and Snowflake - Args: - app_name (str): spark session anme - - Returns: - SparkSession - """ - return ( - SparkSession.builder.appName(f"{app_name}") .config( - "spark.jars", - SPARK_JARS) .config( - "spark.driver.extraClassPath", - "/opt/spark/jars/snowflake-jdbc-3.9.2.jar") .config( - "spark.executor.extraClassPath", - SPARK_JARS) .getOrCreate()) - - def execute_snowflake_query( query: str, snowflake_options: dict, data=None, fetch=False ): diff --git a/airflow/dags/plugins/spark_snowflake_conn.py b/airflow/dags/plugins/spark_snowflake_conn.py index ab33988..8c90438 100644 --- a/airflow/dags/plugins/spark_snowflake_conn.py +++ b/airflow/dags/plugins/spark_snowflake_conn.py @@ -1,34 +1,26 @@ +import os from datetime import datetime -import snowflake.connector +from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook from pyspark.sql import SparkSession -from airflow.models import Variable - -SNOWFLAKE_USER = Variable.get("SNOWFLAKE_USER") -SNOWFLKAE_USER_PWD = Variable.get("SNOWFLAKE_PASSWORD") -SNOWFLAKE_ACCOUNT = Variable.get("SNOWFLAKE_ACCOUNT") -SNOWFLAKE_URL = Variable.get("SNOWFLAKE_URL") -SNOWFLAKE_DB = "S4TIFY" -SNOWFLAKE_SCHEMA = "RAW_DATA" - -AWS_ACCESS_KEY_ID = Variable.get("AWS_ACCESS_KEY_ID") -AWS_SECRET_ACCESS_KEY = Variable.get("AWS_SECRET_ACCESS_KEY") +from dags.plugins.variables import SPARK_JARS +AWS_ACCESS_KEY_ID = os.getenv("AWS_ACCESS_KEY_ID") +AWS_SECRET_ACCESS_KEY = os.getenv("AWS_SECRET_ACCESS_KEY") TODAY = datetime.now().strftime("%Y-%m-%d") snowflake_options = { - "sfURL": SNOWFLAKE_URL, - "sfDatabase": SNOWFLAKE_DB, - "sfSchema": SNOWFLAKE_SCHEMA, - "sfWarehouse": "COMPUTE_WH", - "sfRole": "ANALYTICS_USERS", - "sfUser": SNOWFLAKE_USER, - "sfPassword": SNOWFLKAE_USER_PWD, + "sfURL": f"{os.getenv("SNOWFLAKE_ACCOUNT")}.snowflakecomputing.com", + "sfDatabase": os.getenv("SNOWFLAKE_DB"), + "sfSchema": os.getenv("SNOWFLAKE_SCHEMA"), + "sfWarehouse": os.getenv("SNOWFLAKE_WH"), + "sfRole": os.getenv("SNOWFLAKE_ROLE"), + "sfUser": os.getenv("SNOWFLAKE_USER"), + "sfPassword": os.getenv("SNOWFLAKE_PASSWORD"), } - def create_spark_session(app_name: str): # 만약 정의된 connection이 cluster라면 master를 spark master 주소로 변경 spark = ( @@ -37,32 +29,17 @@ def create_spark_session(app_name: str): .config("spark.hadoop.fs.s3a.access.key", AWS_ACCESS_KEY_ID) .config("spark.hadoop.fs.s3a.secret.key", AWS_SECRET_ACCESS_KEY) .config("spark.hadoop.fs.s3a.endpoint", "s3.amazonaws.com") - .config( - "spark.jars", - "/path/to/spark-snowflake_2.12-2.12.0-spark_3.4.jar,/path/to/snowflake-jdbc-3.13.33.jar", - ) + .config("spark.jars", SPARK_JARS) .getOrCreate() ) return spark -def create_snowflake_conn(): - conn = snowflake.connector.connect( - user=SNOWFLAKE_USER, - password=SNOWFLKAE_USER_PWD, - account=SNOWFLAKE_ACCOUNT, - warehouse="COMPUTE_WH", - database=SNOWFLAKE_DB, - schema=SNOWFLAKE_SCHEMA, - ) - - return conn - - def create_snowflake_table(sql): - conn = create_snowflake_conn() + hook = SnowflakeHook(snowflake_conn_id="SNOWFLAKE_CONN", schema="RAW_DATA") + conn = hook.get_conn() cur = conn.cursor() try: @@ -82,4 +59,4 @@ def write_snowflake_spark_dataframe(table_name, df): df.write.format("snowflake").options(**snowflake_options).option( "dbtable", f"{table_name}" - ).mode("append").save() + ).mode("append").save() \ No newline at end of file diff --git a/airflow/dags/plugins/variables.py b/airflow/dags/plugins/variables.py new file mode 100644 index 0000000..4249657 --- /dev/null +++ b/airflow/dags/plugins/variables.py @@ -0,0 +1,42 @@ +import os + +SNOWFLAKE_PROPERTIES = { + "user": os.environ.get("SNOWFLAKE_USER"), + "password": os.environ.get("SNOWFLAKE_PASSWORD"), + "account": os.environ.get("SNOWFLAKE_ACCOUNT"), + "db": os.environ.get("SNOWFLAKE_DB", "S4TIFY"), + "warehouse": os.environ.get("SNOWFLAKE_WH", "COMPUTE_WH"), + "schema": os.getenv("SNOWFLAKE_SCHEMA"), + "role": os.environ.get("SNOWFLAKE_ROLE", "ANALYTICS_USERS"), + "driver": "net.snowflake.client.jdbc.SnowflakeDriver", + "url": f'jdbc:snowflake://{os.environ.get("SNOWFLAKE_ACCOUNT")}.snowflakecomputing.com', +} +snowflake_options = { + "sfURL": f"{os.getenv("SNOWFLAKE_ACCOUNT")}.snowflakecomputing.com", + "sfDatabase": os.getenv("SNOWFLAKE_DB"), + "sfSchema": os.getenv("SNOWFLAKE_SCHEMA"), + "sfWarehouse": os.getenv("SNOWFLAKE_WH"), + "sfRole": os.getenv("SNOWFLAKE_ROLE"), + "sfUser": os.getenv("SNOWFLAKE_USER"), + "sfPassword": os.getenv("SNOWFLAKE_PASSWORD"), +} + +snowflake_gold_options = { + "sfURL": f"{os.getenv("SNOWFLAKE_ACCOUNT")}.snowflakecomputing.com", + "sfDatabase": os.getenv("SNOWFLAKE_DB"), + "sfSchema": "ANALYTICS", + "sfWarehouse": os.getenv("SNOWFLAKE_WH"), + "sfRole": os.getenv("SNOWFLAKE_ROLE"), + "sfUser": os.getenv("SNOWFLAKE_USER"), + "sfPassword": os.getenv("SNOWFLAKE_PASSWORD"), +} +# Spark JARs 설정 +SPARK_JAR_DIR = os.environ.get("SPARK_JAR_DIR") +SPARK_JARS = ",".join( + [ + os.path.join(SPARK_JAR_DIR, "snowflake-jdbc-3.13.33.jar"), + os.path.join(SPARK_JAR_DIR, "spark-snowflake_2.12-2.12.0-spark_3.4.jar"), + os.path.join(SPARK_JAR_DIR, "hadoop-aws-3.3.4.jar"), + os.path.join(SPARK_JAR_DIR, "aws-java-sdk-bundle-1.12.262.jar"), + ] +) \ No newline at end of file diff --git a/airflow/dags/scripts/ELT_eventsim_script.py b/airflow/dags/scripts/ELT_eventsim_script.py new file mode 100644 index 0000000..5c2cddc --- /dev/null +++ b/airflow/dags/scripts/ELT_eventsim_script.py @@ -0,0 +1,54 @@ +import os +from pyspark.sql.functions import desc + +from dags.plugins.spark_snowflake_conn import create_spark_session +from dags.plugins.variables import snowflake_gold_options, snowflake_options + +# SNOWFLAKE 설정 +SNOWFLAKE_SOURCE_TABLE = "EVENTSIM_LOG" +SNOWFLAKE_SOURCE_SCHEMA = os.environ.get("SNOWFLAKE_SILVER_SCHMEA", "RAW_DATA") +SNOWFLAKE_TARGET_SONG_TABLE = "EVENTSIM_SONG_COUNTS" +SNOWFLAKE_TARGET_ARTIST_TABLE = "EVENTSIM_ARTIST_COUNTS" +SNOWFLAKE_TARGET_SCHEMA = os.environ.get("SNOWFLAKE_GOLD_SCHEMA", "ANALYTICS") + +def main(): + spark = create_spark_session("Spark_Snowflake_Processing") + + # Snowflake에서 데이터 읽기 + df = spark.read.format("snowflake") \ + .options(**snowflake_options) \ + .option("dbtable", SNOWFLAKE_SOURCE_TABLE) \ + .load() + + df.show() # 데이터 확인 + + # SONG 집계 처리 + song_counts = df.groupBy("SONG", "ARTIST")\ + .count()\ + .withColumnRenamed("count", "song_count")\ + .orderBy(desc("song_count")) + + # ARTIST 집계 처리 + artist_counts = df.groupBy("ARTIST")\ + .count()\ + .withColumnRenamed("count", "artist_count")\ + .orderBy(desc("artist_count")) + + # Snowflake에 저장 + song_counts.write.format("snowflake") \ + .options(**snowflake_gold_options) \ + .option("dbtable", SNOWFLAKE_TARGET_SONG_TABLE) \ + .mode("overwrite") \ + .save() + + artist_counts.write.format("snowflake") \ + .options(**snowflake_gold_options) \ + .option("dbtable", SNOWFLAKE_TARGET_ARTIST_TABLE) \ + .mode("overwrite") \ + .save() + + print("(INFO): 데이터 적재 완료") + spark.stop() + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/airflow/dags/scripts/ETL_eventsim_script.py b/airflow/dags/scripts/ETL_eventsim_script.py index 3005552..09317e9 100644 --- a/airflow/dags/scripts/ETL_eventsim_script.py +++ b/airflow/dags/scripts/ETL_eventsim_script.py @@ -1,30 +1,15 @@ -import os import sys from datetime import datetime -from dotenv import load_dotenv -from pyspark.sql.types import (IntegerType, LongType, StringType, StructField, - StructType) -from spark_utils import (escape_quotes, execute_snowflake_query, - spark_session_builder) - -load_dotenv() +from dags.plugins.spark_snowflake_conn import create_spark_session +from dags.plugins.snowflake_utils import execute_snowflake_query +from dags.plugins.variables import SNOWFLAKE_PROPERTIES, snowflake_options # SNOW_FLAKE 설정 SNOWFLAKE_TABLE = "EVENTSIM_LOG" SNOWFLAKE_TEMP_TABLE = "EVENTS_TABLE_TEMP" SNOWFLAKE_SCHEMA = "RAW_DATA" -SNOWFLAKE_PROPERTIES = { - "user": os.environ.get("SNOWFLAKE_USER_BSH"), - "password": os.environ.get("SNOWFLAKE_PASSWORD_BSH"), - "account": os.environ.get("SNOWFLAKE_ACCOUNT"), - "db": os.environ.get("SNOWFLAKE_DB", "S4TIFY"), - "warehouse": os.environ.get("SNOWFLAKE_WH", "COMPUTE_WH"), - "schema": SNOWFLAKE_SCHEMA if SNOWFLAKE_SCHEMA else "RAW_DATA", - "role": os.environ.get("SNOWFLAKE_ROLE", "ANALYTICS_USERS"), - "driver": "net.snowflake.client.jdbc.SnowflakeDriver", - "url": f'jdbc:snowflake://{os.environ.get("SNOWFLAKE_ACCOUNT")}.snowflakecomputing.com', -} + S3_BUCKET = sys.argv[1] DATA_INTERVAL_START = sys.argv[2] # 날짜 변환 (data_interval_start -> year/month/day 형식) @@ -34,7 +19,7 @@ day = date_obj.strftime("%d") # ------------------------------------------------- -spark = spark_session_builder("app") +spark = create_spark_session("etl_streaming_session") # S3에서 데이터 읽어오기 df = spark.read.json( @@ -70,10 +55,7 @@ ); """ execute_snowflake_query(create_table_sql, SNOWFLAKE_PROPERTIES) -print("Create Table") -# -----------------------UPSERT---------------------- -# Snowflake TEMP 테이블에 데이터 적재 create_temp_table_sql = f""" CREATE TABLE IF NOT EXISTS {SNOWFLAKE_SCHEMA}.{SNOWFLAKE_TEMP_TABLE} ( song STRING, @@ -85,18 +67,10 @@ ); """ execute_snowflake_query(create_temp_table_sql, SNOWFLAKE_PROPERTIES) -print("TEMP 테이블 확인 완료") - -# TEMP 테이블에 데이터 INSERT -# data_to_insert = [tuple(row) for row in df_clean.collect()] -# for row in data_to_insert: -# insert_temp_table_sql = f""" -# INSERT INTO {SNOWFLAKE_SCHEMA}.{SNOWFLAKE_TEMP_TABLE} (song, artist, location, sessionId, userId, ts) -# VALUES (%s, %s, %s, %s, %s, %s) -# """ - -# execute_snowflake_query(insert_temp_table_sql, SNOWFLAKE_PROPERTIES, data=row) -df_clean.write.format("jdbc").options(**SNOWFLAKE_PROPERTIES).option( +print("테이블 생성 완료") +# -----------------------UPSERT---------------------- +# Snowflake TEMP 테이블에 데이터 적재 +df_clean.write.format("snowflake").options(**snowflake_options).option( "dbtable", f"{SNOWFLAKE_SCHEMA}.{SNOWFLAKE_TEMP_TABLE}" ).mode("overwrite").save() print("TEMP 테이블 적재 완료") diff --git a/airflow/docker-compose.yaml b/airflow/docker-compose.yaml index 3fbb066..35199b7 100644 --- a/airflow/docker-compose.yaml +++ b/airflow/docker-compose.yaml @@ -85,6 +85,8 @@ x-airflow-common: # WARNING: Use _PIP_ADDITIONAL_REQUIREMENTS option ONLY for a quick checks # for other purpose (development, test and especially production usage) build/extend Airflow image. AIRFLOW_CONN_AWS_S3: aws://$AWS_ACCESS_KEY_ID:$AWS_SECRET_ACCESS_KEY@$AWS_DEFAULT_REGION + AIRFLOW_CONN_SNOWFLAKE_CONN: snowflake://$SNOWFLAKE_USER:$SNOWFLAKE_PASSWORD@$SNOWFLAKE_SCHEMA/?account=$SNOWFLAKE_ACCOUNT&warehouse=$SNOWFLAKE_WH&database=$SNOWFLAKE_DB&schema=$SNOWFLAKE_SCHEMA&role=$SNOWFLAKE_ROLE + AIRFLOW_CONN_SPARK_CONN: Spark://spark://spark-master:7077/?deploy-mode=client&spark-binary=spark-submit _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:- selenium pandas numpy webdriver-manager oauth2client gspread snowflake-connector-python pyspark python-dotenv apache-airflow-providers-apache-spark apache-airflow-providers-snowflake} # The following line can be used to set a custom config file, stored in the local config folder # If you want to use it, outcommen t it and replace airflow.cfg with the name of your config file From 9b0e0ed0ce872a7f325ef1c33e09e1522f5a7dee Mon Sep 17 00:00:00 2001 From: sanghyeok_boo Date: Sun, 16 Mar 2025 16:32:16 +0900 Subject: [PATCH 2/4] bugfix --- airflow/dags/ETL_eventsim_DAG.py | 15 +++------------ airflow/dags/plugins/snowflake_utils.py | 1 - airflow/dags/plugins/spark_snowflake_conn.py | 2 +- 3 files changed, 4 insertions(+), 14 deletions(-) diff --git a/airflow/dags/ETL_eventsim_DAG.py b/airflow/dags/ETL_eventsim_DAG.py index ffa31b4..78717c5 100644 --- a/airflow/dags/ETL_eventsim_DAG.py +++ b/airflow/dags/ETL_eventsim_DAG.py @@ -1,4 +1,3 @@ -import os from datetime import datetime, timedelta from airflow import DAG @@ -6,18 +5,10 @@ from airflow.providers.apache.spark.operators.spark_submit import \ SparkSubmitOperator -# S3 및 Snowflake 설정 +from dags.plugins.variables import SPARK_JARS + +# S3 설정 S3_BUCKET = "s3a://de5-s4tify" -# Spark JARs 설정 -SPARK_JAR_DIR = os.environ.get("SPARK_JAR_DIR") -SPARK_JARS = ",".join( - [ - os.path.join(SPARK_JAR_DIR, "snowflake-jdbc-3.13.33.jar"), - os.path.join(SPARK_JAR_DIR, "spark-snowflake_2.12-2.12.0-spark_3.4.jar"), - os.path.join(SPARK_JAR_DIR, "hadoop-aws-3.3.4.jar"), - os.path.join(SPARK_JAR_DIR, "aws-java-sdk-bundle-1.12.262.jar"), - ] -) default_args = { "owner": "sanghyeok_boo", diff --git a/airflow/dags/plugins/snowflake_utils.py b/airflow/dags/plugins/snowflake_utils.py index 4d1fd88..2502de5 100644 --- a/airflow/dags/plugins/snowflake_utils.py +++ b/airflow/dags/plugins/snowflake_utils.py @@ -48,7 +48,6 @@ def execute_snowflake_query( if fetch: result = cur.fetchall() # 데이터 가져오기 - print("result: " + result) if cur.description: # 컬럼 정보가 존재할 경우에만 DataFrame 생성 df = pd.DataFrame( result, columns=[ diff --git a/airflow/dags/plugins/spark_snowflake_conn.py b/airflow/dags/plugins/spark_snowflake_conn.py index 8c90438..037c365 100644 --- a/airflow/dags/plugins/spark_snowflake_conn.py +++ b/airflow/dags/plugins/spark_snowflake_conn.py @@ -12,7 +12,7 @@ TODAY = datetime.now().strftime("%Y-%m-%d") snowflake_options = { - "sfURL": f"{os.getenv("SNOWFLAKE_ACCOUNT")}.snowflakecomputing.com", + "sfURL": f"{os.getenv('SNOWFLAKE_ACCOUNT')}.snowflakecomputing.com", "sfDatabase": os.getenv("SNOWFLAKE_DB"), "sfSchema": os.getenv("SNOWFLAKE_SCHEMA"), "sfWarehouse": os.getenv("SNOWFLAKE_WH"), From f2130e5a60d70b3c6800bc97ba05a6c828466401 Mon Sep 17 00:00:00 2001 From: sanghyeok_boo Date: Sun, 16 Mar 2025 16:33:20 +0900 Subject: [PATCH 3/4] bugfix --- airflow/dags/plugins/variables.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airflow/dags/plugins/variables.py b/airflow/dags/plugins/variables.py index 4249657..9cddfd3 100644 --- a/airflow/dags/plugins/variables.py +++ b/airflow/dags/plugins/variables.py @@ -12,7 +12,7 @@ "url": f'jdbc:snowflake://{os.environ.get("SNOWFLAKE_ACCOUNT")}.snowflakecomputing.com', } snowflake_options = { - "sfURL": f"{os.getenv("SNOWFLAKE_ACCOUNT")}.snowflakecomputing.com", + "sfURL": f"{os.getenv('SNOWFLAKE_ACCOUNT')}.snowflakecomputing.com", "sfDatabase": os.getenv("SNOWFLAKE_DB"), "sfSchema": os.getenv("SNOWFLAKE_SCHEMA"), "sfWarehouse": os.getenv("SNOWFLAKE_WH"), @@ -22,7 +22,7 @@ } snowflake_gold_options = { - "sfURL": f"{os.getenv("SNOWFLAKE_ACCOUNT")}.snowflakecomputing.com", + "sfURL": f"{os.getenv('SNOWFLAKE_ACCOUNT')}.snowflakecomputing.com", "sfDatabase": os.getenv("SNOWFLAKE_DB"), "sfSchema": "ANALYTICS", "sfWarehouse": os.getenv("SNOWFLAKE_WH"), From d2522aaccf8fef8ec74a6ba6907985130c2f5b38 Mon Sep 17 00:00:00 2001 From: GitHub Action Date: Sun, 16 Mar 2025 07:33:46 +0000 Subject: [PATCH 4/4] Automated format fixes --- airflow/dags/ELT_eventsim_song_count_DAG.py | 9 ++-- airflow/dags/ETL_eventsim_DAG.py | 4 +- airflow/dags/plugins/snowflake_utils.py | 1 + airflow/dags/plugins/spark_snowflake_conn.py | 7 +-- airflow/dags/plugins/variables.py | 19 ++++--- airflow/dags/scripts/ELT_eventsim_script.py | 54 ++++++++++--------- airflow/dags/scripts/ETL_eventsim_script.py | 2 +- airflow/dags/scripts/crawling_spotify_data.py | 5 +- 8 files changed, 57 insertions(+), 44 deletions(-) diff --git a/airflow/dags/ELT_eventsim_song_count_DAG.py b/airflow/dags/ELT_eventsim_song_count_DAG.py index e84fdfe..667578e 100644 --- a/airflow/dags/ELT_eventsim_song_count_DAG.py +++ b/airflow/dags/ELT_eventsim_song_count_DAG.py @@ -1,13 +1,14 @@ import os from datetime import timedelta +from dags.plugins.variables import SPARK_JARS + from airflow import DAG from airflow.operators.trigger_dagrun import TriggerDagRunOperator -from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator +from airflow.providers.apache.spark.operators.spark_submit import \ + SparkSubmitOperator from airflow.utils.dates import days_ago -from dags.plugins.variables import SPARK_JARS - # DAG 설정 default_args = { "owner": "sanghyoek_boo", @@ -35,7 +36,7 @@ spark_submit_task = SparkSubmitOperator( task_id="process_songs_and_artists_spark", application="dags/scripts/ELT_eventsim_script.py", - conn_id="spark_conn", + conn_id="spark_conn", executor_memory="2g", driver_memory="1g", jars=SPARK_JARS, diff --git a/airflow/dags/ETL_eventsim_DAG.py b/airflow/dags/ETL_eventsim_DAG.py index 78717c5..ba539fd 100644 --- a/airflow/dags/ETL_eventsim_DAG.py +++ b/airflow/dags/ETL_eventsim_DAG.py @@ -1,12 +1,12 @@ from datetime import datetime, timedelta +from dags.plugins.variables import SPARK_JARS + from airflow import DAG from airflow.operators.dummy_operator import DummyOperator from airflow.providers.apache.spark.operators.spark_submit import \ SparkSubmitOperator -from dags.plugins.variables import SPARK_JARS - # S3 설정 S3_BUCKET = "s3a://de5-s4tify" diff --git a/airflow/dags/plugins/snowflake_utils.py b/airflow/dags/plugins/snowflake_utils.py index 2502de5..183c540 100644 --- a/airflow/dags/plugins/snowflake_utils.py +++ b/airflow/dags/plugins/snowflake_utils.py @@ -6,6 +6,7 @@ from airflow.exceptions import AirflowFailException + def execute_snowflake_query( query: str, snowflake_options: dict, data=None, fetch=False ): diff --git a/airflow/dags/plugins/spark_snowflake_conn.py b/airflow/dags/plugins/spark_snowflake_conn.py index 037c365..1bf43b6 100644 --- a/airflow/dags/plugins/spark_snowflake_conn.py +++ b/airflow/dags/plugins/spark_snowflake_conn.py @@ -1,10 +1,10 @@ import os from datetime import datetime -from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook +from dags.plugins.variables import SPARK_JARS from pyspark.sql import SparkSession -from dags.plugins.variables import SPARK_JARS +from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook AWS_ACCESS_KEY_ID = os.getenv("AWS_ACCESS_KEY_ID") AWS_SECRET_ACCESS_KEY = os.getenv("AWS_SECRET_ACCESS_KEY") @@ -21,6 +21,7 @@ "sfPassword": os.getenv("SNOWFLAKE_PASSWORD"), } + def create_spark_session(app_name: str): # 만약 정의된 connection이 cluster라면 master를 spark master 주소로 변경 spark = ( @@ -59,4 +60,4 @@ def write_snowflake_spark_dataframe(table_name, df): df.write.format("snowflake").options(**snowflake_options).option( "dbtable", f"{table_name}" - ).mode("append").save() \ No newline at end of file + ).mode("append").save() diff --git a/airflow/dags/plugins/variables.py b/airflow/dags/plugins/variables.py index 9cddfd3..a2c8a6e 100644 --- a/airflow/dags/plugins/variables.py +++ b/airflow/dags/plugins/variables.py @@ -34,9 +34,16 @@ SPARK_JAR_DIR = os.environ.get("SPARK_JAR_DIR") SPARK_JARS = ",".join( [ - os.path.join(SPARK_JAR_DIR, "snowflake-jdbc-3.13.33.jar"), - os.path.join(SPARK_JAR_DIR, "spark-snowflake_2.12-2.12.0-spark_3.4.jar"), - os.path.join(SPARK_JAR_DIR, "hadoop-aws-3.3.4.jar"), - os.path.join(SPARK_JAR_DIR, "aws-java-sdk-bundle-1.12.262.jar"), - ] -) \ No newline at end of file + os.path.join( + SPARK_JAR_DIR, + "snowflake-jdbc-3.13.33.jar"), + os.path.join( + SPARK_JAR_DIR, + "spark-snowflake_2.12-2.12.0-spark_3.4.jar"), + os.path.join( + SPARK_JAR_DIR, + "hadoop-aws-3.3.4.jar"), + os.path.join( + SPARK_JAR_DIR, + "aws-java-sdk-bundle-1.12.262.jar"), + ]) diff --git a/airflow/dags/scripts/ELT_eventsim_script.py b/airflow/dags/scripts/ELT_eventsim_script.py index 5c2cddc..c2d8717 100644 --- a/airflow/dags/scripts/ELT_eventsim_script.py +++ b/airflow/dags/scripts/ELT_eventsim_script.py @@ -1,8 +1,8 @@ -import os -from pyspark.sql.functions import desc +import os from dags.plugins.spark_snowflake_conn import create_spark_session from dags.plugins.variables import snowflake_gold_options, snowflake_options +from pyspark.sql.functions import desc # SNOWFLAKE 설정 SNOWFLAKE_SOURCE_TABLE = "EVENTSIM_LOG" @@ -11,44 +11,48 @@ SNOWFLAKE_TARGET_ARTIST_TABLE = "EVENTSIM_ARTIST_COUNTS" SNOWFLAKE_TARGET_SCHEMA = os.environ.get("SNOWFLAKE_GOLD_SCHEMA", "ANALYTICS") + def main(): spark = create_spark_session("Spark_Snowflake_Processing") # Snowflake에서 데이터 읽기 - df = spark.read.format("snowflake") \ - .options(**snowflake_options) \ - .option("dbtable", SNOWFLAKE_SOURCE_TABLE) \ + df = ( + spark.read.format("snowflake") + .options(**snowflake_options) + .option("dbtable", SNOWFLAKE_SOURCE_TABLE) .load() + ) df.show() # 데이터 확인 # SONG 집계 처리 - song_counts = df.groupBy("SONG", "ARTIST")\ - .count()\ - .withColumnRenamed("count", "song_count")\ - .orderBy(desc("song_count")) + song_counts = ( + df.groupBy("SONG", "ARTIST") + .count() + .withColumnRenamed("count", "song_count") + .orderBy(desc("song_count")) + ) # ARTIST 집계 처리 - artist_counts = df.groupBy("ARTIST")\ - .count()\ - .withColumnRenamed("count", "artist_count")\ - .orderBy(desc("artist_count")) + artist_counts = ( + df.groupBy("ARTIST") + .count() + .withColumnRenamed("count", "artist_count") + .orderBy(desc("artist_count")) + ) # Snowflake에 저장 - song_counts.write.format("snowflake") \ - .options(**snowflake_gold_options) \ - .option("dbtable", SNOWFLAKE_TARGET_SONG_TABLE) \ - .mode("overwrite") \ - .save() - - artist_counts.write.format("snowflake") \ - .options(**snowflake_gold_options) \ - .option("dbtable", SNOWFLAKE_TARGET_ARTIST_TABLE) \ - .mode("overwrite") \ - .save() + song_counts.write.format("snowflake").options( + **snowflake_gold_options).option( + "dbtable", SNOWFLAKE_TARGET_SONG_TABLE).mode("overwrite").save() + + artist_counts.write.format("snowflake").options( + **snowflake_gold_options).option( + "dbtable", SNOWFLAKE_TARGET_ARTIST_TABLE).mode("overwrite").save() print("(INFO): 데이터 적재 완료") spark.stop() + if __name__ == "__main__": - main() \ No newline at end of file + main() diff --git a/airflow/dags/scripts/ETL_eventsim_script.py b/airflow/dags/scripts/ETL_eventsim_script.py index 09317e9..f22d45f 100644 --- a/airflow/dags/scripts/ETL_eventsim_script.py +++ b/airflow/dags/scripts/ETL_eventsim_script.py @@ -1,8 +1,8 @@ import sys from datetime import datetime -from dags.plugins.spark_snowflake_conn import create_spark_session from dags.plugins.snowflake_utils import execute_snowflake_query +from dags.plugins.spark_snowflake_conn import create_spark_session from dags.plugins.variables import SNOWFLAKE_PROPERTIES, snowflake_options # SNOW_FLAKE 설정 diff --git a/airflow/dags/scripts/crawling_spotify_data.py b/airflow/dags/scripts/crawling_spotify_data.py index 420769a..5439626 100644 --- a/airflow/dags/scripts/crawling_spotify_data.py +++ b/airflow/dags/scripts/crawling_spotify_data.py @@ -4,13 +4,12 @@ from datetime import datetime, timedelta import pandas as pd +from scripts.load_spotify_data import * from selenium import webdriver from selenium.webdriver.chrome.options import Options from selenium.webdriver.chrome.service import Service from selenium.webdriver.common.by import By -from scripts.load_spotify_data import * - TODAY = datetime.now().strftime("%Y-%m-%d") FILE_PATH = f"data/spotify_crawling_data{TODAY}.csv" @@ -30,7 +29,7 @@ def save_as_csv_file(df, logical_date): file_path = f"data/spotify_crawling_data_{TODAY}.csv" df.to_csv(file_path, encoding="utf-8", mode="w", header=True, index=False) - load_s3_bucket(f'spotify_crawling_data_{logical_date}.csv') + load_s3_bucket(f"spotify_crawling_data_{logical_date}.csv") def data_crawling(logical_date):