diff --git a/airflow/.env b/airflow/.env
index 65e4cda..42ddb2d 100644
--- a/airflow/.env
+++ b/airflow/.env
@@ -16,4 +16,9 @@ SPOTIFY_CLIENT_SECRET=04b1cb29a0ce4fe98877758dd33151b3
 # AWS S3 connection info
 AWS_ACCESS_KEY_ID=AKIA4RRVVY55VT3PCIZU
 AWS_SECRET_ACCESS_KEY=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
-AWS_DEFAULT_REGION=ap-northeast-2 # region to use
\ No newline at end of file
+AWS_DEFAULT_REGION=ap-northeast-2 # region to use
+
+SNOWFLAKE_ACCOUNT=KJQEOVI-GR23658
+SNOWFLAKE_DB=S4TIFY
+SNOWFLAKE_PASSWORD=BSH1234!
+SNOWFLAKE_USER=BSH
diff --git a/airflow/dags/eventsim_ETL_DAG.py b/airflow/dags/eventsim_ETL_DAG.py
index e527c42..baf5058 100644
--- a/airflow/dags/eventsim_ETL_DAG.py
+++ b/airflow/dags/eventsim_ETL_DAG.py
@@ -19,15 +19,14 @@
 )
 
 default_args = {
-    "owner": "airflow",
+    "owner": "sanghyeok_boo",
     "start_date": datetime(2025, 3, 1),
     "end_date": datetime(2025, 3, 7),
-    "retries": 1,
     "template_searchpath": ["/opt/airflow/dags/spark_jobs/"],
 }
 
 dag = DAG(
-    dag_id="spark_s3_to_snowflake_upsert",
+    dag_id="eventsim_ETL",
     default_args=default_args,
     schedule_interval="@daily",
     catchup=True,
@@ -42,19 +41,19 @@
 # SparkSubmitOperator: process S3 data with Spark and MERGE it into Snowflake
 spark_job = SparkSubmitOperator(
     task_id="spark_process_s3_upsert",
-    application="/opt/airflow/dags/scripts/eventsim_ETL.py",
+    application="/opt/airflow/dags/scripts/eventsim_ETL_script.py",
     conn_id="spark_conn",
     application_args=[
         S3_BUCKET,
         "{{ (data_interval_start - macros.timedelta(days=1)).strftime('%Y-%m-%d') }}",
     ],
-    executor_memory="4g",
-    driver_memory="2g",
+    executor_memory="2g",
+    driver_memory="1g",
     jars=SPARK_JARS,
     conf={
         "spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
-        "spark.hadoop.fs.s3a.access.key": "{{ conn.aws_conn.login }}",
-        "spark.hadoop.fs.s3a.secret.key": "{{ conn.aws_conn.password }}",
+        "spark.hadoop.fs.s3a.access.key": "{{ conn.aws_s3.login }}",
+        "spark.hadoop.fs.s3a.secret.key": "{{ conn.aws_s3.password }}",
         "spark.hadoop.fs.s3a.endpoint": "s3.ap-northeast-2.amazonaws.com",
         "spark.hadoop.fs.s3a.aws.credentials.provider": "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider",
     },
diff --git a/airflow/dags/eventsim_song_count_ELT.py b/airflow/dags/eventsim_song_count_ELT.py
new file mode 100644
index 0000000..68f7dc9
--- /dev/null
+++ b/airflow/dags/eventsim_song_count_ELT.py
@@ -0,0 +1,49 @@
+from datetime import timedelta
+
+from airflow import DAG
+from airflow.operators.trigger_dagrun import TriggerDagRunOperator
+from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator
+from airflow.utils.dates import days_ago
+
+SOURCE_TABLE = "RAW_DATA.EVENTSIM_LOG"
+TARGET_TABLE = "ANALYTICS.EVENTSIM_SONG_COUNTS"
+
+SQL_QUERY = f"""
+    CREATE OR REPLACE TABLE {TARGET_TABLE} AS
+    SELECT SONG, ARTIST, COUNT(*) AS song_count
+    FROM {SOURCE_TABLE}
+    GROUP BY SONG, ARTIST
+    ORDER BY song_count DESC;
+"""
+
+default_args = {
+    "owner": "sanghyeok_boo",
+    "retries": 1,
+    "retry_delay": timedelta(minutes=5),
+    "start_date": days_ago(1),
+}
+
+dag = DAG(
+    dag_id="eventsim_song_count_ELT",
+    default_args=default_args,
+    schedule_interval="@daily",
+    catchup=False,
+    tags=["ELT", "Eventsim", "Song Count"],
+)
+
+trigger_dag_task = TriggerDagRunOperator(
+    task_id="trigger_task",
+    trigger_dag_id="eventsim_ETL",
+    wait_for_completion=True,
+    poke_interval=10,
+    dag=dag,
+)
+
+run_snowflake_query = SnowflakeOperator(
+    task_id="aggregate_song_counts",
+    sql=SQL_QUERY,
+    snowflake_conn_id="snowflake_conn",
+    dag=dag,
+)
+
+trigger_dag_task >> run_snowflake_query
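Note: the new DAG assumes an Airflow connection with conn_id "snowflake_conn", which this patch does not create. As a purely illustrative sanity check (not part of the repo), the aggregation can be previewed directly with snowflake-connector-python — already listed in _PIP_ADDITIONAL_REQUIREMENTS — using the SNOWFLAKE_* values added to .env; the COMPUTE_WH warehouse default below is an assumption borrowed from eventsim_ETL_script.py:

import os

import snowflake.connector

# Connect with the credentials added to airflow/.env; SNOWFLAKE_WH/COMPUTE_WH is assumed,
# mirroring the defaults used in eventsim_ETL_script.py.
conn = snowflake.connector.connect(
    user=os.environ["SNOWFLAKE_USER"],
    password=os.environ["SNOWFLAKE_PASSWORD"],
    account=os.environ["SNOWFLAKE_ACCOUNT"],
    database=os.environ.get("SNOWFLAKE_DB", "S4TIFY"),
    warehouse=os.environ.get("SNOWFLAKE_WH", "COMPUTE_WH"),
)
try:
    cur = conn.cursor()
    # Same aggregation as SQL_QUERY above, but read-only: preview rows instead of replacing the table.
    cur.execute(
        "SELECT SONG, ARTIST, COUNT(*) AS song_count "
        "FROM RAW_DATA.EVENTSIM_LOG "
        "GROUP BY SONG, ARTIST "
        "ORDER BY song_count DESC "
        "LIMIT 10"
    )
    for row in cur.fetchall():
        print(row)
finally:
    conn.close()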
diff --git a/airflow/dags/scripts/crawling_spotify_data.py b/airflow/dags/scripts/crawling_spotify_data.py
index 24a477c..2ff2a8b 100644
--- a/airflow/dags/scripts/crawling_spotify_data.py
+++ b/airflow/dags/scripts/crawling_spotify_data.py
@@ -55,10 +55,16 @@ def data_crawling():
 
     try:
         # fetch the Top 50 list
-        scroll_element = driver.find_element(By.XPATH, '//*[@id="main"]/div/div[2]/div[5]/div/div[2]/div[2]/div/main/section/div[2]/div[3]/div/div[1]/div/div[2]/div[2]')
-        driver.execute_script("""
-            arguments[0].scrollIntoView({behavior: 'smooth', block: 'end'});
-        """, scroll_element)
+        scroll_element = driver.find_element(
+            By.XPATH,
+            '//*[@id="main"]/div/div[2]/div[5]/div/div[2]/div[2]/div/main/section/div[2]/div[3]/div/div[1]/div/div[2]/div[2]',
+        )
+        driver.execute_script(
+            """
+            arguments[0].scrollIntoView({behavior: 'smooth', block: 'end'});
+            """,
+            scroll_element,
+        )
         # wait for the page to finish loading
         driver.implicitly_wait(30)
         song_lists = driver.find_elements(
diff --git a/airflow/dags/scripts/eventsim_ETL.py b/airflow/dags/scripts/eventsim_ETL_script.py
similarity index 70%
rename from airflow/dags/scripts/eventsim_ETL.py
rename to airflow/dags/scripts/eventsim_ETL_script.py
index a1784f5..35cb843 100644
--- a/airflow/dags/scripts/eventsim_ETL.py
+++ b/airflow/dags/scripts/eventsim_ETL_script.py
@@ -2,26 +2,29 @@
 import sys
 from datetime import datetime
 
-from plugins.spark_utils import execute_snowflake_query, spark_session_builder
+from dotenv import load_dotenv
 from pyspark.sql.types import (IntegerType, LongType, StringType, StructField,
                                StructType)
+from spark_utils import execute_snowflake_query, spark_session_builder
 
 from airflow.models import Variable
 
+load_dotenv()
+
 # Snowflake settings
 SNOWFLAKE_TABLE = "EVENTSIM_LOG"
 SNOWFLAKE_TEMP_TABLE = "EVENTS_TABLE_TEMP"
 SNOWFLAKE_SCHEMA = "RAW_DATA"
 SNOWFLAKE_PROPERTIES = {
-    "user": Variable.get("SNOWFLAKE_USER"),
-    "password": Variable.get("SNOWFLAKE_PASSWORD"),
-    "account": Variable.get("SNOWFLAKE_ACCOUNT"),
-    "db": Variable.get("SNOWFLAKE_DB", "S4TIFY"),
-    "warehouse": Variable.get("SNOWFLAKE_WH", "COMPUTE_WH"),
+    "user": os.environ.get("SNOWFLAKE_USER"),
+    "password": os.environ.get("SNOWFLAKE_PASSWORD"),
+    "account": os.environ.get("SNOWFLAKE_ACCOUNT"),
+    "db": os.environ.get("SNOWFLAKE_DB", "S4TIFY"),
+    "warehouse": os.environ.get("SNOWFLAKE_WH", "COMPUTE_WH"),
     "schema": SNOWFLAKE_SCHEMA if SNOWFLAKE_SCHEMA else "RAW_DATA",
-    "role": Variable.get("SNOWFLAKE_ROLE", "ANALYTICS_USERS"),
+    "role": os.environ.get("SNOWFLAKE_ROLE", "ANALYTICS_USERS"),
     "driver": "net.snowflake.client.jdbc.SnowflakeDriver",
-    "url": f'jdbc:snowflake://{Variable.get("SNOWFLAKE_ACCOUNT")}.snowflakecomputing.com',
+    "url": f'jdbc:snowflake://{os.environ.get("SNOWFLAKE_ACCOUNT")}.snowflakecomputing.com',
 }
 
 S3_BUCKET = sys.argv[1]
 DATA_INTERVAL_START = sys.argv[2]
@@ -68,12 +71,21 @@
 """
 execute_snowflake_query(create_table_sql, SNOWFLAKE_PROPERTIES)
 print("Create Table")
+
 # -----------------------UPSERT----------------------
 # Stage data into the Snowflake TEMP table
-df_clean.write.format("jdbc").options(**SNOWFLAKE_PROPERTIES).option(
-    "dbtable", f"{SNOWFLAKE_SCHEMA}.{SNOWFLAKE_TEMP_TABLE}"
-).mode("overwrite").save()
-print("TEMP table load complete")
+create_temp_table_sql = f"""
+CREATE TABLE IF NOT EXISTS {SNOWFLAKE_SCHEMA}.{SNOWFLAKE_TEMP_TABLE} (
+    song STRING,
+    artist STRING,
+    location STRING,
+    sessionId INT,
+    userId INT,
+    ts BIGINT
+);
+"""
+execute_snowflake_query(create_temp_table_sql, SNOWFLAKE_PROPERTIES)
+print("TEMP table check complete")
 
 # Perform the MERGE in Snowflake
 merge_sql = f"""
@@ -93,4 +105,12 @@
 execute_snowflake_query(merge_sql, SNOWFLAKE_PROPERTIES)
 print("Merge complete")
 
+# -------------------DROP TABLE--------------------
+# Drop the temporary table
+drop_table_sql = f"""
+DROP TABLE {SNOWFLAKE_SCHEMA}.{SNOWFLAKE_TEMP_TABLE};
+"""
+execute_snowflake_query(drop_table_sql, SNOWFLAKE_PROPERTIES)
+print("Drop Table")
+
 spark.stop()
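Note: the hunks above replace the JDBC write that staged df_clean into the TEMP table with a CREATE TABLE IF NOT EXISTS, but nothing shown here writes the DataFrame before the MERGE runs. If that write does not happen elsewhere in the file, a sketch rebuilt from the removed lines (assuming the MERGE still reads from RAW_DATA.EVENTS_TABLE_TEMP) might look like this:

# Sketch only: stage df_clean into the TEMP table before the MERGE, rebuilt from the removed JDBC write.
# "append" is used because the table is now pre-created with CREATE TABLE IF NOT EXISTS;
# "overwrite" (the previous mode) would instead let Spark recreate the table on each run.
df_clean.write.format("jdbc").options(**SNOWFLAKE_PROPERTIES).option(
    "dbtable", f"{SNOWFLAKE_SCHEMA}.{SNOWFLAKE_TEMP_TABLE}"
).mode("append").save()
print("Data staged into TEMP table")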
diff --git a/airflow/dags/scripts/request_spotify_api.py b/airflow/dags/scripts/request_spotify_api.py
index b9088df..8df83dd 100644
--- a/airflow/dags/scripts/request_spotify_api.py
+++ b/airflow/dags/scripts/request_spotify_api.py
@@ -66,10 +66,9 @@ def get_arti_top10(logical_date, **kwargs):
         index=False)
     try:
         load_s3_bucket(object_name)
-        os.remove(f'data/{object_name}')
+        os.remove(f"data/{object_name}")
     except Exception as e:
-        print(f'error: {e}')
-
+        print(f"error: {e}")
 
 
 # Fetch artist info
@@ -111,10 +110,9 @@ def get_artist_info(logical_date, **kwargs):
         index=False)
     try:
         load_s3_bucket(object_name)
-        os.remove(f'data/{object_name}')
+        os.remove(f"data/{object_name}")
     except Exception as e:
-        print(f'error: {e}')
-
+        print(f"error: {e}")
 
 
 # Function that reads the crawled data
diff --git a/airflow/dags/spotify_data_dag.py b/airflow/dags/spotify_data_dag.py
index 6ae5738..561bbee 100644
--- a/airflow/dags/spotify_data_dag.py
+++ b/airflow/dags/spotify_data_dag.py
@@ -4,8 +4,8 @@
 from scripts.request_spotify_api import *
 
 from airflow import DAG
-from airflow.operators.python import PythonOperator
 from airflow.operators.bash import BashOperator
+from airflow.operators.python import PythonOperator
 
 default_args = {
     "depends_on_past": False,
@@ -40,13 +40,15 @@
         python_callable=get_arti_top10,
         op_kwargs={"logical_date": "{{ ds }}"},
     )
-
+
     remove_crawling_data = BashOperator(
-        task_id = 'remove_crawling_data',
-        bash_command='rm -f /opt/airflow/data/spotify_crawling_data_{{ ds }}.csv',
-        dag=dag
+        task_id="remove_crawling_data",
+        bash_command="rm -f /opt/airflow/data/spotify_crawling_data_{{ ds }}.csv",
+        dag=dag,
     )
 
-    extract_globalTop50_data >> [
-        extract_artistInfo_data,
-        extract_artistTop10_data] >> remove_crawling_data
+    (
+        extract_globalTop50_data
+        >> [extract_artistInfo_data, extract_artistTop10_data]
+        >> remove_crawling_data
+    )
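Note: the eventsim_ETL_DAG.py hunk earlier switches the S3A credentials to {{ conn.aws_s3.login }} / {{ conn.aws_s3.password }}, which assumes an Airflow connection with conn_id "aws_s3"; the docker-compose.yaml change below supplies it through the AIRFLOW_CONN_AWS_S3 URI. Because that URI embeds $AWS_SECRET_ACCESS_KEY verbatim, a secret containing URI-reserved characters (this one contains a "+") may need URL-encoding. An illustrative check, runnable inside an Airflow container:

import os

from airflow.hooks.base import BaseHook

# Resolve the connection that the {{ conn.aws_s3.* }} templates read.
conn = BaseHook.get_connection("aws_s3")
print(conn.login)  # expected: the AWS access key id
# If the secret key contains reserved characters, a mismatch here usually means the value
# embedded in AIRFLOW_CONN_AWS_S3 needs to be URL-encoded.
print(conn.password == os.environ.get("AWS_SECRET_ACCESS_KEY"))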
diff --git a/airflow/docker-compose.yaml b/airflow/docker-compose.yaml
index 1ca5b11..3fbb066 100644
--- a/airflow/docker-compose.yaml
+++ b/airflow/docker-compose.yaml
@@ -74,6 +74,7 @@ x-airflow-common:
     AIRFLOW__CORE__TEST_CONNECTION: 'Enabled'
     AIRFLOW__API__AUTH_BACKENDS: 'airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session'
     AIRFLOW_VAR_DATA_DIR: /opt/airflow/data
+    PYTHONPATH: /opt/airflow/dags:/opt/airflow/dags/plugins:/opt/airflow/dags/scripts:$PYTHONPATH  # add Python module search paths
     # add the JAR file path
     SPARK_JAR_DIR: /opt/spark/jars
     # yamllint disable rule:line-length
@@ -84,9 +85,9 @@ x-airflow-common:
     # WARNING: Use _PIP_ADDITIONAL_REQUIREMENTS option ONLY for a quick checks
     # for other purpose (development, test and especially production usage) build/extend Airflow image.
     AIRFLOW_CONN_AWS_S3: aws://$AWS_ACCESS_KEY_ID:$AWS_SECRET_ACCESS_KEY@$AWS_DEFAULT_REGION
-    _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:- selenium pandas numpy webdriver-manager oauth2client gspread snowflake-connector-python pyspark python-dotenv apache-airflow-providers-apache-spark}
+    _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:- selenium pandas numpy webdriver-manager oauth2client gspread snowflake-connector-python pyspark python-dotenv apache-airflow-providers-apache-spark apache-airflow-providers-snowflake}
     # The following line can be used to set a custom config file, stored in the local config folder
-    # If you want to use it, outcomment it and replace airflow.cfg with the name of your config file
+    # If you want to use it, outcomment it and replace airflow.cfg with the name of your config file
     # AIRFLOW_CONFIG: '/opt/airflow/config/airflow.cfg'
   volumes:
     - ${AIRFLOW_PROJ_DIR:-.}/dags:/opt/airflow/dags
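Note: the SnowflakeOperator used by eventsim_song_count_ELT.py depends on the apache-airflow-providers-snowflake package appended above, and the bare `from spark_utils import ...` in eventsim_ETL_script.py relies on the new PYTHONPATH entries. A minimal, illustrative smoke test after rebuilding the stack (the scheduler service name is assumed from the standard compose file):

# Run inside an Airflow container, e.g. `docker compose exec airflow-scheduler python`.
# Checks that the Snowflake provider added to _PIP_ADDITIONAL_REQUIREMENTS installed and that the
# PYTHONPATH entries make the dags/scripts and dags/plugins modules importable.
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator

import spark_utils  # should resolve from /opt/airflow/dags/plugins or /opt/airflow/dags/scripts, wherever it lives

print(SnowflakeOperator.__name__, "and spark_utils are importable")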