Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion airflow/.env
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,9 @@ SPOTIFY_CLIENT_SECRET =04b1cb29a0ce4fe98877758dd33151b3
# AWS S3 Connection 정보 — SECURITY(review): real AWS keys and the Snowflake password below are committed to version control; rotate these credentials and load them from a secrets backend instead
AWS_ACCESS_KEY_ID=AKIA4RRVVY55VT3PCIZU
AWS_SECRET_ACCESS_KEY=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
AWS_DEFAULT_REGION=ap-northeast-2 # 사용 지역 설정
AWS_DEFAULT_REGION=ap-northeast-2 # 사용 지역 설정

SNOWFLAKE_ACCOUNT=KJQEOVI-GR23658
SNOWFLAKE_DB=S4TIFY
SNOWFLAKE_PASSWORD=BSH1234!
SNOWFLAKE_USER=BSH
15 changes: 7 additions & 8 deletions airflow/dags/eventsim_ETL_DAG.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,14 @@
)

default_args = {
"owner": "airflow",
"owner": "sanghyeok_boo",
"start_date": datetime(2025, 3, 1),
"end_date": datetime(2025, 3, 7),
"retries": 1,
"template_searchpath": ["/opt/airflow/dags/spark_jobs/"],
}

dag = DAG(
dag_id="spark_s3_to_snowflake_upsert",
dag_id="eventsim_ETL",
default_args=default_args,
schedule_interval="@daily",
catchup=True,
Expand All @@ -42,19 +41,19 @@
# SparkSubmitOperator: Spark에서 S3 데이터를 처리하고 Snowflake에 MERGE
spark_job = SparkSubmitOperator(
task_id="spark_process_s3_upsert",
application="/opt/airflow/dags/scripts/eventsim_ETL.py",
application="/opt/airflow/dags/scripts/eventsim_ETL_script.py",
conn_id="spark_conn",
application_args=[
S3_BUCKET,
"{{ (data_interval_start - macros.timedelta(days=1)).strftime('%Y-%m-%d') }}",
],
executor_memory="4g",
driver_memory="2g",
executor_memory="2g",
driver_memory="1g",
jars=SPARK_JARS,
conf={
"spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
"spark.hadoop.fs.s3a.access.key": "{{ conn.aws_conn.login }}",
"spark.hadoop.fs.s3a.secret.key": "{{ conn.aws_conn.password }}",
"spark.hadoop.fs.s3a.access.key": "{{ conn.aws_s3.login }}",
"spark.hadoop.fs.s3a.secret.key": "{{ conn.aws_s3.password }}",
"spark.hadoop.fs.s3a.endpoint": "s3.ap-northeast-2.amazonaws.com",
"spark.hadoop.fs.s3a.aws.credentials.provider": "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider",
},
Expand Down
50 changes: 50 additions & 0 deletions airflow/dags/eventsim_song_count_ELT.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
"""ELT DAG: trigger the eventsim ETL, then aggregate song play counts in Snowflake.

Flow: trigger_task (waits for the `eventsim_ETL` DAG to finish)
      -> aggregate_song_counts (CREATE OR REPLACE of the counts table).
"""
from datetime import timedelta

from airflow import DAG
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator
from airflow.utils.dates import days_ago

SOURCE_TABLE = "RAW_DATA.EVENTSIM_LOG"
TARGET_TABLE = "ANALYTICS.EVENTSIM_SONG_COUNTS"

# Full rebuild of the aggregate table on every run (idempotent CREATE OR REPLACE).
SQL_QUERY = f"""
CREATE OR REPLACE TABLE {TARGET_TABLE} AS
SELECT SONG, ARTIST, COUNT(*) AS song_count
FROM {SOURCE_TABLE}
GROUP BY SONG, ARTIST
ORDER BY song_count DESC;
"""

default_args = {
    # FIX: "sanghyoek_boo" -> "sanghyeok_boo", consistent with eventsim_ETL_DAG.py.
    "owner": "sanghyeok_boo",
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
    "start_date": days_ago(1),
}

dag = DAG(
    # FIX: dag_id typo "evetnsim_song_count_ELT" -> "eventsim_song_count_ELT".
    dag_id="eventsim_song_count_ELT",
    default_args=default_args,
    schedule_interval="@daily",
    catchup=False,
    tags=["ELT", "Eventsim", "Song Count"],
)

# Kick off the upstream ETL DAG and block until it completes so the
# aggregation below only reads fully-loaded data.
trigger_dag_task = TriggerDagRunOperator(
    task_id="trigger_task",
    trigger_dag_id="eventsim_ETL",
    wait_for_completion=True,
    poke_interval=10,
    dag=dag,
)

run_snowflake_query = SnowflakeOperator(
    task_id="aggregate_song_counts",
    sql=SQL_QUERY,
    snowflake_conn_id="snowflake_conn",
    dag=dag,
)

# BUG FIX: the dependency was previously declared under
# `if __name__ == "__main__":`, which the Airflow scheduler never executes
# when it imports this module — so the two tasks had no ordering and
# aggregation could run before the ETL finished. Dependencies must be set
# at module import time.
trigger_dag_task >> run_snowflake_query
14 changes: 10 additions & 4 deletions airflow/dags/scripts/crawling_spotify_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,16 @@ def data_crawling():

try:
# top50 리스트 가져오기
scroll_element = driver.find_element(By.XPATH, '//*[@id="main"]/div/div[2]/div[5]/div/div[2]/div[2]/div/main/section/div[2]/div[3]/div/div[1]/div/div[2]/div[2]')
driver.execute_script("""
arguments[0].scrollIntoView({behavior: 'smooth', block: 'end'});
""", scroll_element)
scroll_element = driver.find_element(
By.XPATH,
'//*[@id="main"]/div/div[2]/div[5]/div/div[2]/div[2]/div/main/section/div[2]/div[3]/div/div[1]/div/div[2]/div[2]',
)
driver.execute_script(
"""
arguments[0].scrollIntoView({behavior: 'smooth', block: 'end'});
""",
scroll_element,
)
# 페이지 로딩 대기
driver.implicitly_wait(30)
song_lists = driver.find_elements(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,29 @@
import sys
from datetime import datetime

from plugins.spark_utils import execute_snowflake_query, spark_session_builder
from dotenv import load_dotenv
from pyspark.sql.types import (IntegerType, LongType, StringType, StructField,
StructType)
from spark_utils import execute_snowflake_query, spark_session_builder

from airflow.models import Variable

load_dotenv()

# SNOW_FLAKE 설정
SNOWFLAKE_TABLE = "EVENTSIM_LOG"
SNOWFLAKE_TEMP_TABLE = "EVENTS_TABLE_TEMP"
SNOWFLAKE_SCHEMA = "RAW_DATA"
SNOWFLAKE_PROPERTIES = {
"user": Variable.get("SNOWFLAKE_USER"),
"password": Variable.get("SNOWFLAKE_PASSWORD"),
"account": Variable.get("SNOWFLAKE_ACCOUNT"),
"db": Variable.get("SNOWFLAKE_DB", "S4TIFY"),
"warehouse": Variable.get("SNOWFLAKE_WH", "COMPUTE_WH"),
"user": os.environ.get("SNOWFLAKE_USER"),
"password": os.environ.get("SNOWFLAKE_PASSWORD"),
"account": os.environ.get("SNOWFLAKE_ACCOUNT"),
"db": os.environ.get("SNOWFLAKE_DB", "S4TIFY"),
"warehouse": os.environ.get("SNOWFLAKE_WH", "COMPUTE_WH"),
"schema": SNOWFLAKE_SCHEMA if SNOWFLAKE_SCHEMA else "RAW_DATA",
"role": Variable.get("SNOWFLAKE_ROLE", "ANALYTICS_USERS"),
"role": os.environ.get("SNOWFLAKE_ROLE", "ANALYTICS_USERS"),
"driver": "net.snowflake.client.jdbc.SnowflakeDriver",
"url": f'jdbc:snowflake://{Variable.get("SNOWFLAKE_ACCOUNT")}.snowflakecomputing.com',
"url": f'jdbc:snowflake://{os.environ.get("SNOWFLAKE_ACCOUNT")}.snowflakecomputing.com',
}
S3_BUCKET = sys.argv[1]
DATA_INTERVAL_START = sys.argv[2]
Expand Down Expand Up @@ -68,12 +71,21 @@
"""
execute_snowflake_query(create_table_sql, SNOWFLAKE_PROPERTIES)
print("Create Table")

# -----------------------UPSERT----------------------
# Snowflake TEMP 테이블에 데이터 적재
df_clean.write.format("jdbc").options(**SNOWFLAKE_PROPERTIES).option(
"dbtable", f"{SNOWFLAKE_SCHEMA}.{SNOWFLAKE_TEMP_TABLE}"
).mode("overwrite").save()
print("TEMP Table에 데이터 적재 완료")
create_temp_table_sql = f"""
CREATE TABLE IF NOT EXISTS {SNOWFLAKE_SCHEMA}.{SNOWFLAKE_TEMP_TABLE} (
song STRING,
artist STRING,
location STRING,
sessionId INT,
userId INT,
ts BIGINT
);
"""
execute_snowflake_query(create_temp_table_sql, SNOWFLAKE_PROPERTIES)
print("TEMP 테이블 확인 완료")

# Snowflake에서 MERGE 수행
merge_sql = f"""
Expand All @@ -93,4 +105,12 @@
execute_snowflake_query(merge_sql, SNOWFLAKE_PROPERTIES)
print("Merge 완료")

# -------------------DROP TABLE--------------------
# 임시 테이블 삭제
drop_table_sql = f"""
DROP TABLE {SNOWFLAKE_SCHEMA}.{SNOWFLAKE_TEMP_TABLE};
"""
execute_snowflake_query(drop_table_sql, SNOWFLAKE_PROPERTIES)
print("Drop Table")

spark.stop()
10 changes: 4 additions & 6 deletions airflow/dags/scripts/request_spotify_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,9 @@ def get_arti_top10(logical_date, **kwargs):
index=False)
try:
load_s3_bucket(object_name)
os.remove(f'data/{object_name}')
os.remove(f"data/{object_name}")
except Exception as e:
print(f'error: {e}')

print(f"error: {e}")


# 아티스트 정보 가져오기
Expand Down Expand Up @@ -111,10 +110,9 @@ def get_artist_info(logical_date, **kwargs):
index=False)
try:
load_s3_bucket(object_name)
os.remove(f'data/{object_name}')
os.remove(f"data/{object_name}")
except Exception as e:
print(f'error: {e}')

print(f"error: {e}")


# 크롤링 데이터 읽어오는 함수
Expand Down
18 changes: 10 additions & 8 deletions airflow/dags/spotify_data_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
from scripts.request_spotify_api import *

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

default_args = {
"depends_on_past": False,
Expand Down Expand Up @@ -40,13 +40,15 @@
python_callable=get_arti_top10,
op_kwargs={"logical_date": "{{ ds }}"},
)

remove_crawling_data = BashOperator(
task_id = 'remove_crawling_data',
bash_command='rm -f /opt/airflow/data/spotify_crawling_data_{{ ds }}.csv',
dag=dag
task_id="remove_crawling_data",
bash_command="rm -f /opt/airflow/data/spotify_crawling_data_{{ ds }}.csv",
dag=dag,
)

extract_globalTop50_data >> [
extract_artistInfo_data,
extract_artistTop10_data] >> remove_crawling_data
(
extract_globalTop50_data
>> [extract_artistInfo_data, extract_artistTop10_data]
>> remove_crawling_data
)
5 changes: 3 additions & 2 deletions airflow/docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ x-airflow-common:
AIRFLOW__CORE__TEST_CONNECTION: 'Enabled'
AIRFLOW__API__AUTH_BACKENDS: 'airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session'
AIRFLOW_VAR_DATA_DIR: /opt/airflow/data
PYTHONPATH: /opt/airflow/dags:/opt/airflow/dags/plugins:/opt/airflow/dags/scripts:$PYTHONPATH # Python 모듈 경로 추가
# JAR 파일 경로 추가
SPARK_JAR_DIR: /opt/spark/jars
# yamllint disable rule:line-length
Expand All @@ -84,9 +85,9 @@ x-airflow-common:
# WARNING: Use _PIP_ADDITIONAL_REQUIREMENTS option ONLY for a quick checks
# for other purpose (development, test and especially production usage) build/extend Airflow image.
AIRFLOW_CONN_AWS_S3: aws://$AWS_ACCESS_KEY_ID:$AWS_SECRET_ACCESS_KEY@$AWS_DEFAULT_REGION
_PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:- selenium pandas numpy webdriver-manager oauth2client gspread snowflake-connector-python pyspark python-dotenv apache-airflow-providers-apache-spark}
_PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:- selenium pandas numpy webdriver-manager oauth2client gspread snowflake-connector-python pyspark python-dotenv apache-airflow-providers-apache-spark apache-airflow-providers-snowflake}
# The following line can be used to set a custom config file, stored in the local config folder
# If you want to use it, outcomment it and replace airflow.cfg with the name of your config file
# If you want to use it, outcomment it and replace airflow.cfg with the name of your config file
# AIRFLOW_CONFIG: '/opt/airflow/config/airflow.cfg'
volumes:
- ${AIRFLOW_PROJ_DIR:-.}/dags:/opt/airflow/dags
Expand Down