From 258091869b63066a9de9048aba9acde81382fe71 Mon Sep 17 00:00:00 2001 From: sanghyeok_boo Date: Thu, 13 Mar 2025 08:02:50 +0900 Subject: [PATCH 1/3] =?UTF-8?q?[chore]=20airflow=20docker-compose.yml,=20.?= =?UTF-8?q?env=20=EC=88=98=EC=A0=95=20=EB=B0=8F=20ELT=20DAG=20=EC=B6=94?= =?UTF-8?q?=EA=B0=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- airflow/.env | 7 ++- airflow/dags/eventsim_ETL_DAG.py | 15 +++--- airflow/dags/eventsim_song_count_ELT.py | 50 +++++++++++++++++++ airflow/dags/plugins/spark_utils.py | 1 + ...eventsim_ETL.py => eventsim_ETL_script.py} | 44 +++++++++++----- airflow/docker-compose.yaml | 5 +- 6 files changed, 99 insertions(+), 23 deletions(-) create mode 100644 airflow/dags/eventsim_song_count_ELT.py rename airflow/dags/scripts/{eventsim_ETL.py => eventsim_ETL_script.py} (70%) diff --git a/airflow/.env b/airflow/.env index 65e4cda..42ddb2d 100644 --- a/airflow/.env +++ b/airflow/.env @@ -16,4 +16,9 @@ SPOTIFY_CLIENT_SECRET =04b1cb29a0ce4fe98877758dd33151b3 # AWS S3 Connection 정보 AWS_ACCESS_KEY_ID=AKIA4RRVVY55VT3PCIZU AWS_SECRET_ACCESS_KEY=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 -AWS_DEFAULT_REGION=ap-northeast-2 # 사용 지역 설정 \ No newline at end of file +AWS_DEFAULT_REGION=ap-northeast-2 # 사용 지역 설정 + +SNOWFLAKE_ACCOUNT=KJQEOVI-GR23658 +SNOWFLAKE_DB=S4TIFY +SNOWFLAKE_PASSWORD=BSH1234! 
+SNOWFLAKE_USER=BSH diff --git a/airflow/dags/eventsim_ETL_DAG.py b/airflow/dags/eventsim_ETL_DAG.py index e527c42..baf5058 100644 --- a/airflow/dags/eventsim_ETL_DAG.py +++ b/airflow/dags/eventsim_ETL_DAG.py @@ -19,15 +19,14 @@ ) default_args = { - "owner": "airflow", + "owner": "sanghyeok_boo", "start_date": datetime(2025, 3, 1), "end_date": datetime(2025, 3, 7), - "retries": 1, "template_searchpath": ["/opt/airflow/dags/spark_jobs/"], } dag = DAG( - dag_id="spark_s3_to_snowflake_upsert", + dag_id="eventsim_ETL", default_args=default_args, schedule_interval="@daily", catchup=True, @@ -42,19 +41,19 @@ # SparkSubmitOperator: Spark에서 S3 데이터를 처리하고 Snowflake에 MERGE spark_job = SparkSubmitOperator( task_id="spark_process_s3_upsert", - application="/opt/airflow/dags/scripts/eventsim_ETL.py", + application="/opt/airflow/dags/scripts/eventsim_ETL_script.py", conn_id="spark_conn", application_args=[ S3_BUCKET, "{{ (data_interval_start - macros.timedelta(days=1)).strftime('%Y-%m-%d') }}", ], - executor_memory="4g", - driver_memory="2g", + executor_memory="2g", + driver_memory="1g", jars=SPARK_JARS, conf={ "spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem", - "spark.hadoop.fs.s3a.access.key": "{{ conn.aws_conn.login }}", - "spark.hadoop.fs.s3a.secret.key": "{{ conn.aws_conn.password }}", + "spark.hadoop.fs.s3a.access.key": "{{ conn.aws_s3.login }}", + "spark.hadoop.fs.s3a.secret.key": "{{ conn.aws_s3.password }}", "spark.hadoop.fs.s3a.endpoint": "s3.ap-northeast-2.amazonaws.com", "spark.hadoop.fs.s3a.aws.credentials.provider": "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider", }, diff --git a/airflow/dags/eventsim_song_count_ELT.py b/airflow/dags/eventsim_song_count_ELT.py new file mode 100644 index 0000000..ef7359d --- /dev/null +++ b/airflow/dags/eventsim_song_count_ELT.py @@ -0,0 +1,50 @@ +from datetime import timedelta + +from airflow import DAG +from airflow.utils.dates import days_ago +from airflow.operators.trigger_dagrun import 
TriggerDagRunOperator +from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator + +SOURCE_TABLE = 'RAW_DATA.EVENTSIM_LOG' +TARGET_TABLE = 'ANALYTICS.EVENTSIM_SONG_COUNTS' + +SQL_QUERY = f""" + CREATE OR REPLACE TABLE {TARGET_TABLE} AS + SELECT SONG, ARTIST, COUNT(*) AS song_count + FROM {SOURCE_TABLE} + GROUP BY SONG, ARTIST + ORDER BY song_count DESC; +""" + +default_args = { + 'owner': 'sanghyoek_boo', + 'retries': 1, + 'retry_delay': timedelta(minutes=5), + 'start_date': days_ago(1), +} + +dag = DAG( + dag_id = "evetnsim_song_count_ELT", + default_args = default_args, + schedule_interval = "@daily", + catchup = False, + tags=["ELT", "Eventsim", "Song Count"] +) + +trigger_dag_task = TriggerDagRunOperator( + task_id = 'trigger_task', + trigger_dag_id = 'eventsim_ETL', + wait_for_completion=True, + poke_interval=10, + dag=dag, +) + +run_snowflake_query = SnowflakeOperator ( + task_id = "aggregate_song_counts", + sql = SQL_QUERY, + snowflake_conn_id = "snowflake_conn", + dag=dag, +) + +if __name__ == "__main__": + trigger_dag_task \ No newline at end of file diff --git a/airflow/dags/plugins/spark_utils.py b/airflow/dags/plugins/spark_utils.py index 6c1a461..a414601 100644 --- a/airflow/dags/plugins/spark_utils.py +++ b/airflow/dags/plugins/spark_utils.py @@ -28,6 +28,7 @@ def spark_session_builder(app_name: str) -> SparkSession: """ return ( SparkSession.builder.appName(f"{app_name}") + .master("spark://spark-master:7077") .config("spark.jars", SPARK_JARS) .getOrCreate() ) diff --git a/airflow/dags/scripts/eventsim_ETL.py b/airflow/dags/scripts/eventsim_ETL_script.py similarity index 70% rename from airflow/dags/scripts/eventsim_ETL.py rename to airflow/dags/scripts/eventsim_ETL_script.py index a1784f5..70fb69c 100644 --- a/airflow/dags/scripts/eventsim_ETL.py +++ b/airflow/dags/scripts/eventsim_ETL_script.py @@ -1,27 +1,30 @@ import os import sys from datetime import datetime +from dotenv import load_dotenv -from plugins.spark_utils import 
execute_snowflake_query, spark_session_builder +from spark_utils import execute_snowflake_query, spark_session_builder from pyspark.sql.types import (IntegerType, LongType, StringType, StructField, StructType) from airflow.models import Variable +load_dotenv() + # SNOW_FLAKE 설정 SNOWFLAKE_TABLE = "EVENTSIM_LOG" SNOWFLAKE_TEMP_TABLE = "EVENTS_TABLE_TEMP" SNOWFLAKE_SCHEMA = "RAW_DATA" SNOWFLAKE_PROPERTIES = { - "user": Variable.get("SNOWFLAKE_USER"), - "password": Variable.get("SNOWFLAKE_PASSWORD"), - "account": Variable.get("SNOWFLAKE_ACCOUNT"), - "db": Variable.get("SNOWFLAKE_DB", "S4TIFY"), - "warehouse": Variable.get("SNOWFLAKE_WH", "COMPUTE_WH"), + "user": os.environ.get("SNOWFLAKE_USER"), + "password": os.environ.get("SNOWFLAKE_PASSWORD"), + "account": os.environ.get("SNOWFLAKE_ACCOUNT"), + "db": os.environ.get("SNOWFLAKE_DB", "S4TIFY"), + "warehouse": os.environ.get("SNOWFLAKE_WH", "COMPUTE_WH"), "schema": SNOWFLAKE_SCHEMA if SNOWFLAKE_SCHEMA else "RAW_DATA", - "role": Variable.get("SNOWFLAKE_ROLE", "ANALYTICS_USERS"), + "role": os.environ.get("SNOWFLAKE_ROLE", "ANALYTICS_USERS"), "driver": "net.snowflake.client.jdbc.SnowflakeDriver", - "url": f'jdbc:snowflake://{Variable.get("SNOWFLAKE_ACCOUNT")}.snowflakecomputing.com', + "url": f'jdbc:snowflake://{os.environ.get("SNOWFLAKE_ACCOUNT")}.snowflakecomputing.com', } S3_BUCKET = sys.argv[1] DATA_INTERVAL_START = sys.argv[2] @@ -68,12 +71,21 @@ """ execute_snowflake_query(create_table_sql, SNOWFLAKE_PROPERTIES) print("Create Table") + # -----------------------UPSERT---------------------- # Snowflake TEMP 테이블에 데이터 적재 -df_clean.write.format("jdbc").options(**SNOWFLAKE_PROPERTIES).option( - "dbtable", f"{SNOWFLAKE_SCHEMA}.{SNOWFLAKE_TEMP_TABLE}" -).mode("overwrite").save() -print("TEMP Table에 데이터 적재 완료") +create_temp_table_sql = f""" +CREATE TABLE IF NOT EXISTS {SNOWFLAKE_SCHEMA}.{SNOWFLAKE_TEMP_TABLE} ( + song STRING, + artist STRING, + location STRING, + sessionId INT, + userId INT, + ts BIGINT +); +""" 
+execute_snowflake_query(create_temp_table_sql, SNOWFLAKE_PROPERTIES) +print("TEMP 테이블 확인 완료") # Snowflake에서 MERGE 수행 merge_sql = f""" @@ -93,4 +105,12 @@ execute_snowflake_query(merge_sql, SNOWFLAKE_PROPERTIES) print("Merge 완료") +# -------------------DROP TABLE-------------------- +# 임시 테이블 삭제 +drop_table_sql = f""" +DROP TABLE {SNOWFLAKE_SCHEMA}.{SNOWFLAKE_TEMP_TABLE}; +""" +execute_snowflake_query(drop_table_sql, SNOWFLAKE_PROPERTIES) +print("Drop Table") + spark.stop() diff --git a/airflow/docker-compose.yaml b/airflow/docker-compose.yaml index 1ca5b11..3fbb066 100644 --- a/airflow/docker-compose.yaml +++ b/airflow/docker-compose.yaml @@ -74,6 +74,7 @@ x-airflow-common: AIRFLOW__CORE__TEST_CONNECTION: 'Enabled' AIRFLOW__API__AUTH_BACKENDS: 'airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session' AIRFLOW_VAR_DATA_DIR: /opt/airflow/data + PYTHONPATH: /opt/airflow/dags:/opt/airflow/dags/plugins:/opt/airflow/dags/scripts:$PYTHONPATH # Python 모듈 경로 추가 # JAR 파일 경로 추가 SPARK_JAR_DIR: /opt/spark/jars # yamllint disable rule:line-length @@ -84,9 +85,9 @@ x-airflow-common: # WARNING: Use _PIP_ADDITIONAL_REQUIREMENTS option ONLY for a quick checks # for other purpose (development, test and especially production usage) build/extend Airflow image. 
AIRFLOW_CONN_AWS_S3: aws://$AWS_ACCESS_KEY_ID:$AWS_SECRET_ACCESS_KEY@$AWS_DEFAULT_REGION - _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:- selenium pandas numpy webdriver-manager oauth2client gspread snowflake-connector-python pyspark python-dotenv apache-airflow-providers-apache-spark} + _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:- selenium pandas numpy webdriver-manager oauth2client gspread snowflake-connector-python pyspark python-dotenv apache-airflow-providers-apache-spark apache-airflow-providers-snowflake} # The following line can be used to set a custom config file, stored in the local config folder - # If you want to use it, outcomment it and replace airflow.cfg with the name of your config file + # If you want to use it, uncomment it and replace airflow.cfg with the name of your config file # AIRFLOW_CONFIG: '/opt/airflow/config/airflow.cfg' volumes: - ${AIRFLOW_PROJ_DIR:-.}/dags:/opt/airflow/dags From 0814a64f37f8e5c41f82d4d8c53cf5412c14eb7d Mon Sep 17 00:00:00 2001 From: sanghyeok_boo Date: Thu, 13 Mar 2025 14:02:00 +0900 Subject: [PATCH 2/3] =?UTF-8?q?task=20=EC=88=9C=EC=84=9C=20=EC=88=98?= =?UTF-8?q?=EC=A0=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- airflow/dags/eventsim_song_count_ELT.py | 2 +- airflow/dags/plugins/spark_utils.py | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/airflow/dags/eventsim_song_count_ELT.py b/airflow/dags/eventsim_song_count_ELT.py index ef7359d..6c7ed49 100644 --- a/airflow/dags/eventsim_song_count_ELT.py +++ b/airflow/dags/eventsim_song_count_ELT.py @@ -47,4 +47,4 @@ ) if __name__ == "__main__": - trigger_dag_task \ No newline at end of file + trigger_dag_task >> run_snowflake_query \ No newline at end of file diff --git a/airflow/dags/plugins/spark_utils.py b/airflow/dags/plugins/spark_utils.py index a414601..6c1a461 100644 --- a/airflow/dags/plugins/spark_utils.py +++ b/airflow/dags/plugins/spark_utils.py @@ -28,7 
+28,6 @@ def spark_session_builder(app_name: str) -> SparkSession: """ return ( SparkSession.builder.appName(f"{app_name}") - .master("spark://spark-master:7077") .config("spark.jars", SPARK_JARS) .getOrCreate() ) From 13df5f278e90939524366d9884d28738ed25b934 Mon Sep 17 00:00:00 2001 From: GitHub Action Date: Thu, 13 Mar 2025 05:04:33 +0000 Subject: [PATCH 3/3] Automated format fixes --- airflow/dags/eventsim_song_count_ELT.py | 38 +++++++++---------- airflow/dags/scripts/crawling_spotify_data.py | 14 +++++-- airflow/dags/scripts/eventsim_ETL_script.py | 4 +- airflow/dags/scripts/request_spotify_api.py | 10 ++--- airflow/dags/spotify_data_dag.py | 18 +++++---- 5 files changed, 45 insertions(+), 39 deletions(-) diff --git a/airflow/dags/eventsim_song_count_ELT.py b/airflow/dags/eventsim_song_count_ELT.py index 6c7ed49..68f7dc9 100644 --- a/airflow/dags/eventsim_song_count_ELT.py +++ b/airflow/dags/eventsim_song_count_ELT.py @@ -1,12 +1,12 @@ from datetime import timedelta from airflow import DAG -from airflow.utils.dates import days_ago from airflow.operators.trigger_dagrun import TriggerDagRunOperator from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator +from airflow.utils.dates import days_ago -SOURCE_TABLE = 'RAW_DATA.EVENTSIM_LOG' -TARGET_TABLE = 'ANALYTICS.EVENTSIM_SONG_COUNTS' +SOURCE_TABLE = "RAW_DATA.EVENTSIM_LOG" +TARGET_TABLE = "ANALYTICS.EVENTSIM_SONG_COUNTS" SQL_QUERY = f""" CREATE OR REPLACE TABLE {TARGET_TABLE} AS @@ -17,34 +17,34 @@ """ default_args = { - 'owner': 'sanghyoek_boo', - 'retries': 1, - 'retry_delay': timedelta(minutes=5), - 'start_date': days_ago(1), + "owner": "sanghyoek_boo", + "retries": 1, + "retry_delay": timedelta(minutes=5), + "start_date": days_ago(1), } dag = DAG( - dag_id = "evetnsim_song_count_ELT", - default_args = default_args, - schedule_interval = "@daily", - catchup = False, - tags=["ELT", "Eventsim", "Song Count"] + dag_id="evetnsim_song_count_ELT", + default_args=default_args, + 
schedule_interval="@daily", + catchup=False, + tags=["ELT", "Eventsim", "Song Count"], ) trigger_dag_task = TriggerDagRunOperator( - task_id = 'trigger_task', - trigger_dag_id = 'eventsim_ETL', + task_id="trigger_task", + trigger_dag_id="eventsim_ETL", wait_for_completion=True, poke_interval=10, dag=dag, ) -run_snowflake_query = SnowflakeOperator ( - task_id = "aggregate_song_counts", - sql = SQL_QUERY, - snowflake_conn_id = "snowflake_conn", +run_snowflake_query = SnowflakeOperator( + task_id="aggregate_song_counts", + sql=SQL_QUERY, + snowflake_conn_id="snowflake_conn", dag=dag, ) if __name__ == "__main__": - trigger_dag_task >> run_snowflake_query \ No newline at end of file + trigger_dag_task >> run_snowflake_query diff --git a/airflow/dags/scripts/crawling_spotify_data.py b/airflow/dags/scripts/crawling_spotify_data.py index 24a477c..2ff2a8b 100644 --- a/airflow/dags/scripts/crawling_spotify_data.py +++ b/airflow/dags/scripts/crawling_spotify_data.py @@ -55,10 +55,16 @@ def data_crawling(): try: # top50 리스트 가져오기 - scroll_element = driver.find_element(By.XPATH, '//*[@id="main"]/div/div[2]/div[5]/div/div[2]/div[2]/div/main/section/div[2]/div[3]/div/div[1]/div/div[2]/div[2]') - driver.execute_script(""" - arguments[0].scrollIntoView({behavior: 'smooth', block: 'end'}); - """, scroll_element) + scroll_element = driver.find_element( + By.XPATH, + '//*[@id="main"]/div/div[2]/div[5]/div/div[2]/div[2]/div/main/section/div[2]/div[3]/div/div[1]/div/div[2]/div[2]', + ) + driver.execute_script( + """ + arguments[0].scrollIntoView({behavior: 'smooth', block: 'end'}); + """, + scroll_element, + ) # 페이지 로딩 대기 driver.implicitly_wait(30) song_lists = driver.find_elements( diff --git a/airflow/dags/scripts/eventsim_ETL_script.py b/airflow/dags/scripts/eventsim_ETL_script.py index 70fb69c..35cb843 100644 --- a/airflow/dags/scripts/eventsim_ETL_script.py +++ b/airflow/dags/scripts/eventsim_ETL_script.py @@ -1,11 +1,11 @@ import os import sys from datetime import datetime -from 
dotenv import load_dotenv -from spark_utils import execute_snowflake_query, spark_session_builder +from dotenv import load_dotenv from pyspark.sql.types import (IntegerType, LongType, StringType, StructField, StructType) +from spark_utils import execute_snowflake_query, spark_session_builder from airflow.models import Variable diff --git a/airflow/dags/scripts/request_spotify_api.py b/airflow/dags/scripts/request_spotify_api.py index b9088df..8df83dd 100644 --- a/airflow/dags/scripts/request_spotify_api.py +++ b/airflow/dags/scripts/request_spotify_api.py @@ -66,10 +66,9 @@ def get_arti_top10(logical_date, **kwargs): index=False) try: load_s3_bucket(object_name) - os.remove(f'data/{object_name}') + os.remove(f"data/{object_name}") except Exception as e: - print(f'error: {e}') - + print(f"error: {e}") # 아티스트 정보 가져오기 @@ -111,10 +110,9 @@ def get_artist_info(logical_date, **kwargs): index=False) try: load_s3_bucket(object_name) - os.remove(f'data/{object_name}') + os.remove(f"data/{object_name}") except Exception as e: - print(f'error: {e}') - + print(f"error: {e}") # 크롤링 데이터 읽어오는 함수 diff --git a/airflow/dags/spotify_data_dag.py b/airflow/dags/spotify_data_dag.py index 6ae5738..561bbee 100644 --- a/airflow/dags/spotify_data_dag.py +++ b/airflow/dags/spotify_data_dag.py @@ -4,8 +4,8 @@ from scripts.request_spotify_api import * from airflow import DAG -from airflow.operators.python import PythonOperator from airflow.operators.bash import BashOperator +from airflow.operators.python import PythonOperator default_args = { "depends_on_past": False, @@ -40,13 +40,15 @@ python_callable=get_arti_top10, op_kwargs={"logical_date": "{{ ds }}"}, ) - + remove_crawling_data = BashOperator( - task_id = 'remove_crawling_data', - bash_command='rm -f /opt/airflow/data/spotify_crawling_data_{{ ds }}.csv', - dag=dag + task_id="remove_crawling_data", + bash_command="rm -f /opt/airflow/data/spotify_crawling_data_{{ ds }}.csv", + dag=dag, ) - extract_globalTop50_data >> [ - 
extract_artistInfo_data, - extract_artistTop10_data] >> remove_crawling_data + ( + extract_globalTop50_data + >> [extract_artistInfo_data, extract_artistTop10_data] + >> remove_crawling_data + )