# 1. Fetch the Bugs realtime chart and enrich each entry with Spotify genres.
def fetch_bugs_chart():
    """Fetch the realtime Bugs chart and return it as a JSON-serializable dict.

    Returns:
        dict: {"date": "<YYYY-MM-DD HH:MM:SS>", "entries": [<entry dict>, ...]}
        where each entry carries the chart fields plus a "genres" list looked
        up through the Spotify API (one search + one artist call per entry).
        The dict travels to the next task via XCom.
    """
    chart = ChartData(
        chartType=BugsChartType.All,
        chartPeriod=BugsChartPeriod.Realtime,
        fetch=True,
    )
    chart_data = {
        "date": chart.date.strftime("%Y-%m-%d %H:%M:%S"),
        "entries": [],
    }
    for entry in chart.entries:
        print(f"📊 차트 데이터 처리: {entry.rank}. {entry.title} - {entry.artist}")
        artist_id = search_artist_id(entry.artist)
        genre = get_artist_genre(artist_id)
        chart_data["entries"].append(
            {
                "rank": entry.rank,
                "title": entry.title,
                "artist": entry.artist,
                "lastPos": entry.lastPos,
                "peakPos": entry.peakPos,
                "image": entry.image,
                # get_artist_genre() returns a ", "-joined string; keep a list.
                "genres": genre.split(", ") if genre else [],
            }
        )
    return chart_data


# 2. JSON → CSV conversion (result returned as a string via XCom).
def convert_json_to_csv(**kwargs):
    """Convert the fetched chart dict into a properly quoted CSV string.

    BUGFIX: the previous implementation joined fields with a bare ",".
    Titles/artists containing commas — and the JSON-encoded genre list,
    which always contains ", " — corrupted the column layout.  csv.writer
    applies RFC-4180 quoting so every field round-trips intact.
    """
    import csv
    import io

    data = kwargs["ti"].xcom_pull(task_ids="fetch_bugs_chart")
    buf = io.StringIO()
    writer = csv.writer(buf, lineterminator="\n")
    writer.writerow(["rank", "title", "artist",
                     "lastPos", "peakPos", "image", "genre"])
    for entry in data["entries"]:
        writer.writerow([
            entry["rank"],
            entry["title"],
            entry["artist"],
            entry["lastPos"],
            entry["peakPos"],
            entry["image"],
            # Genres stay JSON-encoded so downstream consumers can parse them.
            json.dumps(entry["genres"], ensure_ascii=False),
        ])
    # Match the original contract: no trailing newline.
    return buf.getvalue().rstrip("\n")


# 3. Local CSV dump (debug helper; not wired into the DAG).
def save_csv_locally(csv_string):
    """Write the CSV string to LOCAL_FILE_PATH for manual inspection."""
    with open(LOCAL_FILE_PATH, "w", encoding="utf-8") as f:
        f.write(csv_string)
# 4. Upload the CSV string (pulled from XCom) to S3.
def upload_to_s3(**kwargs):
    """Push the CSV produced by convert_json_to_csv to s3://<bucket>/<key>."""
    csv_string = kwargs["ti"].xcom_pull(task_ids="convert_json_to_csv")
    # save_csv_locally(csv_string)  # uncomment for a local debug copy
    hook = S3Hook(aws_conn_id="S4tify_S3")
    hook.load_string(
        csv_string,
        key=S3_CSV_KEY,
        bucket_name=S3_BUCKET,
        replace=True,
    )
    print(f"✅ S3 업로드 완료: {S3_CSV_KEY}")
# 1. Fetch the FLO realtime chart and attach Spotify genre data per entry.
def fetch_flo_chart():
    """Return the FLO chart as {"date": str, "entries": [dict, ...]} for XCom.

    Each entry carries the chart fields plus a "genres" list resolved via
    the Spotify API (search by artist name, then fetch that artist's genres).
    """
    chart = ChartData(fetch=True)
    entries = []
    for entry in chart.entries:
        print(f"📊 차트 데이터 처리: {entry.rank}. {entry.title} - {entry.artist}")
        genre = get_artist_genre(search_artist_id(entry.artist))
        entries.append({
            "rank": entry.rank,
            "title": entry.title,
            "artist": entry.artist,
            "lastPos": entry.lastPos,
            "isNew": entry.isNew,
            "image": entry.image,
            "genres": genre.split(", ") if genre else [],
        })
    return {
        "date": chart.date.strftime("%Y-%m-%d %H:%M:%S"),
        "entries": entries,
    }
# 2. JSON → CSV conversion (result returned as a string via XCom).
def convert_json_to_csv(**kwargs):
    """Convert the fetched FLO chart dict into a properly quoted CSV string.

    BUGFIX: fields were previously joined with a bare ",", so commas inside
    titles/artists and inside the JSON-encoded genre list (always contains
    ", ") corrupted the column layout.  csv.writer quotes such fields.
    """
    import csv
    import io

    data = kwargs["ti"].xcom_pull(task_ids="fetch_flo_chart")
    buf = io.StringIO()
    writer = csv.writer(buf, lineterminator="\n")
    writer.writerow(["rank", "title", "artist",
                     "lastPos", "isNew", "image", "genre"])
    for entry in data["entries"]:
        writer.writerow([
            entry["rank"],
            entry["title"],
            entry["artist"],
            entry["lastPos"],
            entry["isNew"],
            entry["image"],
            json.dumps(entry["genres"], ensure_ascii=False),
        ])
    # Match the original contract: no trailing newline.
    return buf.getvalue().rstrip("\n")


# 3. Local CSV dump (debug helper; not wired into the DAG).
def save_csv_locally(csv_string):
    """Write the CSV string to LOCAL_FILE_PATH for manual inspection."""
    with open(LOCAL_FILE_PATH, "w", encoding="utf-8") as f:
        f.write(csv_string)
# 4. Upload the CSV string (pulled from XCom) to S3.
def upload_to_s3(**kwargs):
    """Push the CSV produced by convert_json_to_csv to s3://<bucket>/<key>."""
    csv_string = kwargs["ti"].xcom_pull(task_ids="convert_json_to_csv")
    # save_csv_locally(csv_string)  # uncomment for a local debug copy
    hook = S3Hook(aws_conn_id="S4tify_S3")
    hook.load_string(
        csv_string,
        key=S3_CSV_KEY,
        bucket_name=S3_BUCKET,
        replace=True,
    )
    print(f"✅ S3 업로드 완료: {S3_CSV_KEY}")
# 1. Fetch the Genie realtime chart and attach Spotify genre data per entry.
def fetch_genie_chart():
    """Return the Genie chart as {"date": str, "entries": [dict, ...]} for XCom.

    Each entry carries the chart fields plus a "genres" list resolved via
    the Spotify API (search by artist name, then fetch that artist's genres).
    """
    chart = ChartData(chartPeriod=GenieChartPeriod.Realtime, fetch=True)
    entries = []
    for entry in chart.entries:
        print(f"📊 차트 데이터 처리: {entry.rank}. {entry.title} - {entry.artist}")
        genre = get_artist_genre(search_artist_id(entry.artist))
        entries.append({
            "rank": entry.rank,
            "title": entry.title,
            "artist": entry.artist,
            "peakPos": entry.peakPos,
            "lastPos": entry.lastPos,
            "image": entry.image,
            "genres": genre.split(", ") if genre else [],
        })
    return {
        "date": chart.date.strftime("%Y-%m-%d %H:%M:%S"),
        "entries": entries,
    }
# 2. JSON → CSV conversion (result returned as a string via XCom).
def convert_json_to_csv(**kwargs):
    """Convert the fetched Genie chart dict into a properly quoted CSV string.

    BUGFIX: fields were previously joined with a bare ",", so commas inside
    titles/artists and inside the JSON-encoded genre list (always contains
    ", ") corrupted the column layout.  csv.writer quotes such fields.
    """
    import csv
    import io

    data = kwargs["ti"].xcom_pull(task_ids="fetch_genie_chart")
    buf = io.StringIO()
    writer = csv.writer(buf, lineterminator="\n")
    writer.writerow(["rank", "title", "artist",
                     "peakPos", "lastPos", "image", "genre"])
    for entry in data["entries"]:
        writer.writerow([
            entry["rank"],
            entry["title"],
            entry["artist"],
            entry["peakPos"],
            entry["lastPos"],
            entry["image"],
            json.dumps(entry["genres"], ensure_ascii=False),
        ])
    # Match the original contract: no trailing newline.
    return buf.getvalue().rstrip("\n")


# 3. Local CSV dump (debug helper; not wired into the DAG).
def save_csv_locally(csv_string):
    """Write the CSV string to LOCAL_FILE_PATH for manual inspection."""
    with open(LOCAL_FILE_PATH, "w", encoding="utf-8") as f:
        f.write(csv_string)


# 4. Upload the CSV string (pulled from XCom) to S3.
def upload_to_s3(**kwargs):
    """Push the CSV produced by convert_json_to_csv to s3://<bucket>/<key>."""
    csv_string = kwargs["ti"].xcom_pull(task_ids="convert_json_to_csv")
    # save_csv_locally(csv_string)  # uncomment for a local debug copy
    s3_hook = S3Hook(aws_conn_id="S4tify_S3")
    s3_hook.load_string(
        csv_string,
        key=S3_CSV_KEY,
        bucket_name=S3_BUCKET,
        replace=True,
    )
    print(f"✅ S3 업로드 완료: {S3_CSV_KEY}")
python_callable=upload_to_snowflake, - )""" ( - fetch_genie_chart_task >> convert_json_to_csv_task >> upload_to_s3_task - ) # upload_to_snowflake_task + get_spotify_token_task + >> fetch_genie_chart_task + >> convert_json_to_csv_task + >> upload_s3_task + ) diff --git a/airflow/dags/Melon_DAG.py b/airflow/dags/Melon_DAG.py index d754270..a7971f4 100644 --- a/airflow/dags/Melon_DAG.py +++ b/airflow/dags/Melon_DAG.py @@ -2,119 +2,89 @@ import json from datetime import datetime, timedelta -import boto3 import requests -import snowflake.connector +from plugins.get_artist_data import get_artist_genre, search_artist_id +from plugins.melon import ChartData # melon.py 모듈 import +from scripts.get_access_token import get_token from airflow import DAG -from airflow.hooks.base_hook import BaseHook +from airflow.models import Variable from airflow.operators.python import PythonOperator +from airflow.providers.amazon.aws.hooks.s3 import S3Hook -# 멜론 차트 API 정보 -_APP_VERSION = "6.5.8.1" -_CP_ID = "AS40" -_USER_AGENT = f"{_CP_ID}; Android 13; {_APP_VERSION}; sdk_gphone64_arm64" -_CHART_API_URL = f"https://m2.melon.com/m6/chart/ent/songChartList.json?cpId={_CP_ID}&cpKey=14LNC3&appVer={_APP_VERSION}" - -# 파일 저장 경로 +# 날짜 설정 TODAY = datetime.now().strftime("%Y%m%d") -JSON_PATH = f"/opt/airflow/data/melon_chart_{TODAY}.json" -CSV_PATH = f"/opt/airflow/data/melon_chart_{TODAY}.csv" -S3_JSON_KEY = f"raw_data/melon_chart/melon_chart_{TODAY}.json" -S3_CSV_KEY = f"raw_data/melon_chart/melon_chart_{TODAY}.csv" + +# S3 설정 +S3_BUCKET = "de5-s4tify" +S3_CSV_KEY = f"raw_data/melon_chart_{TODAY}.csv" +LOCAL_FILE_PATH = f"/opt/airflow/data/melon_chart_with_genre_{TODAY}.csv" # 1. 
# 1. Fetch the Melon realtime chart and attach Spotify genre data per entry.
def fetch_melon_chart():
    """Return the Melon chart as {"date": str, "entries": [dict, ...]} for XCom.

    Each entry carries the chart fields plus a "genres" list resolved via
    the Spotify API (search by artist name, then fetch that artist's genres).
    """
    chart = ChartData(fetch=True)
    entries = []
    for entry in chart.entries:
        print(f"📊 차트 데이터 처리: {entry.rank}. {entry.title} - {entry.artist}")
        genre = get_artist_genre(search_artist_id(entry.artist))
        entries.append({
            "rank": entry.rank,
            "title": entry.title,
            "artist": entry.artist,
            "lastPos": entry.lastPos,
            "isNew": entry.isNew,
            "image": entry.image,
            "genres": genre.split(", ") if genre else [],
        })
    return {
        "date": chart.date.strftime("%Y-%m-%d %H:%M:%S"),
        "entries": entries,
    }
# 2. JSON → CSV conversion (result returned as a string via XCom).
def convert_json_to_csv(**kwargs):
    """Convert the fetched Melon chart dict into a properly quoted CSV string.

    BUGFIX: fields were previously joined with a bare ",", so commas inside
    titles/artists and inside the JSON-encoded genre list (always contains
    ", ") corrupted the column layout.  csv.writer quotes such fields.
    """
    import csv
    import io

    data = kwargs["ti"].xcom_pull(task_ids="fetch_melon_chart")
    buf = io.StringIO()
    writer = csv.writer(buf, lineterminator="\n")
    writer.writerow(["rank", "title", "artist",
                     "lastPos", "isNew", "image", "genre"])
    for entry in data["entries"]:
        writer.writerow([
            entry["rank"],
            entry["title"],
            entry["artist"],
            entry["lastPos"],
            entry["isNew"],
            entry["image"],
            json.dumps(entry["genres"], ensure_ascii=False),
        ])
    # Match the original contract: no trailing newline.
    return buf.getvalue().rstrip("\n")
# 3. Local CSV dump (debug helper; not wired into the DAG).
def save_csv_locally(csv_string):
    """Write the CSV string to LOCAL_FILE_PATH for manual inspection."""
    with open(LOCAL_FILE_PATH, "w", encoding="utf-8") as f:
        f.write(csv_string)


# 4. Upload the CSV string (pulled from XCom) to S3.
def upload_to_s3(**kwargs):
    """Push the CSV produced by convert_json_to_csv to s3://<bucket>/<key>."""
    csv_string = kwargs["ti"].xcom_pull(task_ids="convert_json_to_csv")
    # save_csv_locally(csv_string)  # uncomment for a local debug copy
    hook = S3Hook(aws_conn_id="S4tify_S3")
    hook.load_string(
        csv_string,
        key=S3_CSV_KEY,
        bucket_name=S3_BUCKET,
        replace=True,
    )
    print(f"✅ S3 업로드 완료: {S3_CSV_KEY}")
import os
from datetime import datetime, timedelta

from airflow import DAG
from airflow.models import Variable
from airflow.providers.apache.spark.operators.spark_submit import \
    SparkSubmitOperator

# AWS credentials come from Airflow Variables; fail fast while parsing the
# DAG file so a misconfigured deployment is visible immediately.
AWS_ACCESS_KEY = Variable.get("AWS_ACCESS_KEY", default_var=None)
AWS_SECRET_KEY = Variable.get("AWS_SECRET_KEY", default_var=None)

if not AWS_ACCESS_KEY or not AWS_SECRET_KEY:
    raise ValueError("AWS_ACCESS_KEY 또는 AWS_SECRET_KEY가 설정되지 않았습니다.")

# Target bucket read by the Spark job.
S3_BUCKET = "s3a://de5-s4tify"

# JARs required for Snowflake JDBC and S3A access; the directory can be
# overridden through the SPARK_JAR_DIR environment variable.
SPARK_HOME = os.environ.get("SPARK_JAR_DIR", "/opt/spark/jars")
SPARK_JARS = ",".join(
    [
        os.path.join(SPARK_HOME, "snowflake-jdbc-3.9.2.jar"),
        os.path.join(SPARK_HOME, "hadoop-aws-3.3.4.jar"),
        os.path.join(SPARK_HOME, "aws-java-sdk-bundle-1.12.262.jar"),
    ]
)

default_args = {
    "owner": "airflow",
    "start_date": datetime(2025, 3, 4),
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}

dag = DAG(
    "s3_to_snowflake_pipeline",
    default_args=default_args,
    description="Read from S3, process data with Spark, and store in Snowflake",
    schedule_interval="0 2 * * *",
    catchup=False,
)

# Absolute path of the Spark job script shipped alongside the DAGs.
AIRFLOW_HOME = os.getenv("AIRFLOW_HOME", "/opt/airflow")
spark_script_path = os.path.abspath(
    os.path.join(AIRFLOW_HOME, "dags", "scripts", "S3_Spark_SnowFlake_ELT.py")
)

spark_submit_task = SparkSubmitOperator(
    task_id="spark_submit_task",
    application=spark_script_path,
    conn_id="spark_conn",
    executor_memory="4g",
    executor_cores=4,
    driver_memory="4g",
    name="s3_to_snowflake_pipeline",
    execution_timeout=timedelta(minutes=45),
    verbose=True,
    jars=SPARK_JARS,
    conf={
        "spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
        "spark.hadoop.fs.s3a.access.key": AWS_ACCESS_KEY,
        "spark.hadoop.fs.s3a.secret.key": AWS_SECRET_KEY,
        "spark.hadoop.fs.s3a.endpoint": "s3.ap-northeast-2.amazonaws.com",
        "spark.hadoop.fs.s3a.aws.credentials.provider": "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider",
    },
    dag=dag,
)
# BUGFIX: removed the stray trailing bare `spark_submit_task` expression —
# a single-task DAG needs no dependency wiring; the bare name was a no-op.
# 1. Fetch the VIBE realtime chart and enrich each entry with Spotify genres.
def fetch_vibe_chart():
    """Return the VIBE chart as {"date": str, "entries": [dict, ...]} for XCom.

    Each entry carries the chart fields plus a "genres" list resolved via
    the Spotify API (search by artist name, then fetch that artist's genres).
    """
    chart = ChartData(fetch=True)
    chart_data = {
        "date": chart.date.strftime("%Y-%m-%d %H:%M:%S"),
        "entries": [],
    }
    for entry in chart.entries:
        print(f"📊 차트 데이터 처리: {entry.rank}. {entry.title} - {entry.artist}")
        artist_id = search_artist_id(entry.artist)
        genre = get_artist_genre(artist_id)
        chart_data["entries"].append(
            {
                "rank": entry.rank,
                "title": entry.title,
                "artist": entry.artist,
                "lastPos": entry.lastPos,
                "isNew": entry.isNew,
                "image": entry.image,
                "genres": genre.split(", ") if genre else [],
            }
        )
    return chart_data


# 2. JSON → CSV conversion (result returned as a string via XCom).
def convert_json_to_csv(**kwargs):
    """Convert the fetched VIBE chart dict into a properly quoted CSV string.

    BUGFIX: fields were previously joined with a bare ",", so commas inside
    titles/artists and inside the JSON-encoded genre list (always contains
    ", ") corrupted the column layout.  csv.writer quotes such fields.
    """
    import csv
    import io

    data = kwargs["ti"].xcom_pull(task_ids="fetch_vibe_chart")
    buf = io.StringIO()
    writer = csv.writer(buf, lineterminator="\n")
    writer.writerow(["rank", "title", "artist",
                     "lastPos", "isNew", "image", "genre"])
    for entry in data["entries"]:
        writer.writerow([
            entry["rank"],
            entry["title"],
            entry["artist"],
            entry["lastPos"],
            entry["isNew"],
            entry["image"],
            json.dumps(entry["genres"], ensure_ascii=False),
        ])
    # Match the original contract: no trailing newline.
    return buf.getvalue().rstrip("\n")


# 3. Local CSV dump (debug helper; not wired into the DAG).
def save_csv_locally(csv_string):
    """Write the CSV string to LOCAL_FILE_PATH for manual inspection."""
    with open(LOCAL_FILE_PATH, "w", encoding="utf-8") as f:
        f.write(csv_string)
# 4. Upload the CSV string (pulled from XCom) to S3.
def upload_to_s3(**kwargs):
    """Push the CSV produced by convert_json_to_csv to s3://<bucket>/<key>."""
    csv_string = kwargs["ti"].xcom_pull(task_ids="convert_json_to_csv")
    # save_csv_locally(csv_string)  # uncomment for a local debug copy
    hook = S3Hook(aws_conn_id="S4tify_S3")
    hook.load_string(
        csv_string,
        key=S3_CSV_KEY,
        bucket_name=S3_BUCKET,
        replace=True,
    )
    print(f"✅ S3 업로드 완료: {S3_CSV_KEY}")
airflow.models import Variable + +# Spotify API 설정 +SPOTIFY_API_URL = "https://api.spotify.com/v1" +SPOTIFY_TOKEN = Variable.get("SPOTIFY_ACCESS_TOKEN", default_var=None) + + +# Spotify API에서 아티스트 ID 검색 +def search_artist_id(artist_name): + SPOTIFY_TOKEN = Variable.get("SPOTIFY_ACCESS_TOKEN", default_var=None) + + url = f"{SPOTIFY_API_URL}/search" + headers = {"Authorization": f"Bearer {SPOTIFY_TOKEN}"} + params = {"q": artist_name, "type": "artist", "limit": 1} + response = requests.get(url, headers=headers, params=params) + + print("headers : ", headers) + + if response.status_code == 200: + artists = response.json().get("artists", {}).get("items", []) + artist_id = artists[0]["id"] if artists else None + print(f"🔍 검색된 아티스트: {artist_name} -> ID: {artist_id}") + return artist_id + else: + print( + f"❌ 아티스트 검색 실패: {artist_name}, 상태 코드: {response.status_code}, 응답: {response.json()}" + ) + return None + + +# Spotify API에서 아티스트 장르 가져오기 +def get_artist_genre(artist_id): + if not artist_id: + return "Unknown" + + SPOTIFY_TOKEN = Variable.get("SPOTIFY_ACCESS_TOKEN", default_var=None) + + url = f"{SPOTIFY_API_URL}/artists/{artist_id}" + headers = {"Authorization": f"Bearer {SPOTIFY_TOKEN}"} + response = requests.get(url, headers=headers) + + if response.status_code == 200: + genres = response.json().get("genres", []) + genre_str = ", ".join(genres) if genres else "Unknown" + print(f"🎵 장르 검색: ID {artist_id} -> {genre_str}") + return genre_str + else: + print( + f"❌ 장르 검색 실패: ID {artist_id}, 상태 코드: {response.status_code}, 응답: {response.json()}" + ) + return "Unknown" diff --git a/airflow/dags/scripts/S3_Spark_SnowFlake_ELT.py b/airflow/dags/scripts/S3_Spark_SnowFlake_ELT.py new file mode 100644 index 0000000..9c656ff --- /dev/null +++ b/airflow/dags/scripts/S3_Spark_SnowFlake_ELT.py @@ -0,0 +1,242 @@ +import os +from datetime import datetime + +import snowflake.connector +from pyspark.sql import SparkSession +from pyspark.sql.functions import col, count, lit, when + +from 
airflow.models import Variable + +# Spark JARs 설정 +SPARK_HOME = "/opt/spark/" +SPARK_JARS = ",".join( + [ + os.path.join(SPARK_HOME, "jars", "snowflake-jdbc-3.9.2.jar"), + os.path.join(SPARK_HOME, "jars", "hadoop-aws-3.3.4.jar"), + os.path.join(SPARK_HOME, "jars", "aws-java-sdk-bundle-1.12.262.jar"), + ] +) + +# Snowflake 연결 정보 설정 +SNOWFLAKE_OPTIONS = { + "user": Variable.get("SNOWFLAKE_USER"), + "password": Variable.get("SNOWFLAKE_PASSWORD"), + "account": Variable.get("SNOWFLAKE_ACCOUNT"), + "db": Variable.get("SNOWFLAKE_DB", "S4TIFY"), + "warehouse": Variable.get("SNOWFLAKE_WH", "COMPUTE_WH"), + "schema": ( + Variable.get("SNOWFLAKE_SCHEMA") + if Variable.get("SNOWFLAKE_SCHEMA") + else "raw_data" + ), + "role": "ACCOUNTADMIN", + "driver": "net.snowflake.client.jdbc.SnowflakeDriver", + "url": f'jdbc:snowflake://{Variable.get("SNOWFLAKE_ACCOUNT")}.snowflakecomputing.com', + # "url" : "jdbc:snowflake://kjqeovi-gr23658.snowflakecomputing.com", +} + + +# Spark Session 생성 함수 +def spark_session_builder(app_name: str) -> SparkSession: + return ( + SparkSession.builder.appName(app_name) .config( + "spark.jars", + SPARK_JARS) .config( + "spark.hadoop.fs.s3a.impl", + "org.apache.hadoop.fs.s3a.S3AFileSystem") .config( + "spark.hadoop.fs.s3a.access.key", + Variable.get("AWS_ACCESS_KEY")) .config( + "spark.hadoop.fs.s3a.secret.key", + Variable.get("AWS_SECRET_KEY")) .config( + "spark.hadoop.fs.s3a.endpoint", + "s3.amazonaws.com") .config( + "spark.hadoop.fs.s3a.aws.credentials.provider", + "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider", + ) .getOrCreate()) + + +# Snowflake에서 SQL 실행 함수 +def check_and_create_table(): + try: + conn = snowflake.connector.connect( + user=SNOWFLAKE_OPTIONS["user"], + password=SNOWFLAKE_OPTIONS["password"], + account=SNOWFLAKE_OPTIONS["account"], + database=SNOWFLAKE_OPTIONS["db"], + schema=SNOWFLAKE_OPTIONS["schema"], + warehouse=SNOWFLAKE_OPTIONS["warehouse"], + role=SNOWFLAKE_OPTIONS["role"], + ) + cur = conn.cursor() + + # 테이블 존재 여부 확인 + 
cur.execute(f"SHOW TABLES LIKE 'music_charts'") + result = cur.fetchone() + + if result is None: + # 테이블이 존재하지 않으면 생성 + create_table_query = """ + CREATE OR REPLACE TABLE ADHOC.music_charts ( + rank INT, + title STRING, + artist STRING, + lastPos INT, + image STRING, + peakPos INT, + isNew BOOLEAN, + source STRING + ); + """ + cur.execute(create_table_query) + print("✅ music_charts 테이블 생성 완료.") + else: + print("ℹ️ music_charts 테이블이 이미 존재합니다.") + + conn.commit() + cur.close() + conn.close() + + except Exception as e: + print(f"⚠️ 테이블 확인 및 생성 중 오류 발생: {e}") + + +# 문자열에서 작은따옴표 처리 및 NULL 값 처리 +def escape_quotes(value): + if value is None: + return "NULL" # None인 경우에는 'NULL'로 처리 + return "'{}'".format( + value.replace("'", "''") + ) # 작은따옴표는 두 개로 이스케이프 처리 + + +# Snowflake에서 SQL 실행 함수 +def insert_data_into_snowflake(df, table_name): + try: + conn = snowflake.connector.connect( + user=SNOWFLAKE_OPTIONS["user"], + password=SNOWFLAKE_OPTIONS["password"], + account=SNOWFLAKE_OPTIONS["account"], + database=SNOWFLAKE_OPTIONS["db"], + schema=SNOWFLAKE_OPTIONS["schema"], + warehouse=SNOWFLAKE_OPTIONS["warehouse"], + role=SNOWFLAKE_OPTIONS["role"], + ) + cur = conn.cursor() + + # DataFrame을 순회하며 INSERT 쿼리 실행 + for row in df.collect(): + # None 값을 NULL로 처리하고, 문자열 값은 작은따옴표로 감쌈 + rank = f"NULL" if row["rank"] is None else row["rank"] + title = escape_quotes( + f"NULL" if row["title"] is None else f"'{row['title']}'" + ) + artist = escape_quotes( + f"NULL" if row["artist"] is None else f"'{row['artist']}'" + ) + lastPos = f"NULL" if row["lastPos"] is None else row["lastPos"] + image = escape_quotes( + f"NULL" if row["image"] is None else f"'{row['image']}'" + ) + peakPos = f"NULL" if row["peakPos"] is None else row["peakPos"] + # isNew 값은 TRUE/FALSE로 처리하고 NULL은 그대로 처리 + isNew = ( + f"NULL" + if row["isNew"] is None + else ("True" if row["isNew"] else "FALSE") + ) + source = escape_quotes( + f"NULL" if row["source"] is None else f"'{row['source']}'" + ) + + # 삽입할 쿼리 (컬럼 이름은 큰따옴표 없이) + 
query = f""" + INSERT INTO {table_name} (rank, title, artist, lastPos, image, peakPos, isNew, source) + VALUES ({rank}, {title}, {artist}, {lastPos}, {image}, {peakPos}, {isNew}, {source}) + """ + cur.execute(query) + + conn.commit() + cur.close() + conn.close() + print("✅ Data inserted into Snowflake successfully.") + except Exception as e: + print(query) + print(f"⚠️ Error inserting data into Snowflake: {e}") + + +# Spark 세션 생성 +spark = spark_session_builder("S3_to_Snowflake") + +# 오늘 날짜 기반 S3 데이터 경로 생성 +TODAY = datetime.now().strftime("%Y%m%d") +S3_BUCKET = "s3a://de5-s4tify" +chart_sources = { + "bugs": f"{S3_BUCKET}/raw_data/bugs_chart/bugs_chart_{TODAY}.csv", + "flo": f"{S3_BUCKET}/raw_data/flo_chart/flo_chart_{TODAY}.csv", + "genie": f"{S3_BUCKET}/raw_data/genie_chart/genie_chart_{TODAY}.csv", + "melon": f"{S3_BUCKET}/raw_data/melon_chart/melon_chart_{TODAY}.csv", + "vibe": f"{S3_BUCKET}/raw_data/vibe_chart/vibe_chart_{TODAY}.csv", +} + + +def read_chart_data(source, path): + try: + df = ( + spark.read.format("csv") + .option("header", True) + .option("inferSchema", True) + .load(path) + ) + return df.withColumn("source", lit(source)) + except Exception as e: + print(f"⚠️ {source} 데이터 로드 실패: {e}") + return None + + +# 차트 데이터 읽기 및 병합 +dfs = [read_chart_data(source, path) for source, path in chart_sources.items()] +dfs = [df for df in dfs if df is not None] + +for df in dfs: + df.show(40) + +if dfs: + merged_df = dfs[0] + for df in dfs[1:]: + merged_df = merged_df.unionByName(df, allowMissingColumns=True) + + final_df = merged_df.select( + when( + col("rank").rlike("^[0-9]+$"), + col("rank").cast("int")).alias("rank"), + col("title"), + col("artist"), + when( + col("lastPos").rlike("^[0-9]+$"), + col("lastPos").cast("int")).alias("lastPos"), + col("image"), + when( + col("peakPos").rlike("^[0-9]+$"), + col("peakPos").cast("int")).alias("peakPos"), + when( + col("isNew").rlike("^(true|false)$"), + col("isNew").cast("boolean")).alias("isNew"), + col("source"), + 
) + + final_df.show(40) + + # 데이터 확인 + final_df.groupBy("source").agg(count("*").alias("count")).show() + + # Snowflake에서 테이블 존재 여부 확인 및 생성 + check_and_create_table() + + # Snowflake에 데이터 적재 + insert_data_into_snowflake(final_df, "music_charts") + +else: + print("❌ 저장할 차트 데이터가 없습니다.") + +# Spark 세션 종료 +spark.stop() diff --git a/airflow/dags/scripts/crawling_spotify_data.py b/airflow/dags/scripts/crawling_spotify_data.py index 1a38c45..7abc5b3 100644 --- a/airflow/dags/scripts/crawling_spotify_data.py +++ b/airflow/dags/scripts/crawling_spotify_data.py @@ -5,7 +5,6 @@ import pandas as pd from selenium import webdriver - from selenium.webdriver.chrome.options import Options from selenium.webdriver.chrome.service import Service from selenium.webdriver.common.by import By diff --git a/airflow/dags/scripts/get_access_token.py b/airflow/dags/scripts/get_access_token.py index 3b52a22..01e3d8a 100644 --- a/airflow/dags/scripts/get_access_token.py +++ b/airflow/dags/scripts/get_access_token.py @@ -8,6 +8,7 @@ SPOTIFY_CLIENT_ID = os.getenv("SPOTIFY_CLIENT_ID") SPOTIFY_CLIENT_SECRET = os.getenv("SPOTIFY_CLIENT_SECRET") + END_POINT = "https://accounts.spotify.com/api/token" diff --git a/airflow/dags/scripts/load_spotify_data.py b/airflow/dags/scripts/load_spotify_data.py index 67561a8..53b3e7a 100644 --- a/airflow/dags/scripts/load_spotify_data.py +++ b/airflow/dags/scripts/load_spotify_data.py @@ -1,23 +1,24 @@ -import pandas as pd import os +import pandas as pd + from airflow.providers.amazon.aws.hooks.s3 import S3Hook + def load_s3_bucket(file_name): s3_hook = S3Hook(aws_conn_id="AWS_S3") s3_bucket = "de5-s4tify" s3_key = f"raw_data/{file_name}" - - local_file_path = f'data/{file_name}' - + + local_file_path = f"data/{file_name}" + try: s3_hook.load_file( filename=local_file_path, key=s3_key, bucket_name=s3_bucket, - replace=True - ) - + replace=True) + except Exception as e: print(f"error:{e}") diff --git a/airflow/dags/scripts/request_spotify_api.py 
b/airflow/dags/scripts/request_spotify_api.py index 23df461..76a8e57 100644 --- a/airflow/dags/scripts/request_spotify_api.py +++ b/airflow/dags/scripts/request_spotify_api.py @@ -1,8 +1,8 @@ import ast +import os import time from datetime import datetime from typing import Any, Dict, Optional -import os import pandas as pd import requests @@ -14,94 +14,106 @@ TODAY = datetime.now().strftime("%Y-%m-%d") END_POINT = "https://api.spotify.com/v1" -SPOTIFY_ACCESS_TOKEN = Variable.get("SPOTIFY_ACCESS_TOKEN",default_var=None) +SPOTIFY_ACCESS_TOKEN = Variable.get("SPOTIFY_ACCESS_TOKEN", default_var=None) if not SPOTIFY_ACCESS_TOKEN: print("⚠️ Access Token 없음. 새로운 토큰을 가져옵니다.") get_token() - SPOTIFY_ACCESS_TOKEN = Variable.get("SPOTIFY_ACCESS_TOKEN",default_var=None) - + SPOTIFY_ACCESS_TOKEN = Variable.get( + "SPOTIFY_ACCESS_TOKEN", default_var=None) + LAST_FM_API_KEY = os.getenv("LAST_FM_API_KEY") -#아티스트 top 10 트랙 가져오기 +# 아티스트 top 10 트랙 가져오기 def get_arti_top10(logical_date, **kwargs): - - task_instance = kwargs['ti'] + + task_instance = kwargs["ti"] arti_top10_list = [] object_name = f"spotify_artist_top10_{logical_date}.csv" - - #csv 파일 읽어오기 + + # csv 파일 읽어오기 song_info = read_crawling_csv(logical_date) for _, row in song_info.iterrows(): - - artist_id = ast.literal_eval(row['artist_id']) - - #피처링 등의 이유로 아티스트가 2명 이상인 경우가 존재 + + artist_id = ast.literal_eval(row["artist_id"]) + + # 피처링 등의 이유로 아티스트가 2명 이상인 경우가 존재 for i in range(len(artist_id)): id = artist_id[i] - end_point = END_POINT+f'/artists/{id}/top-tracks/' + end_point = END_POINT + f"/artists/{id}/top-tracks/" top_10_info = extract(end_point) - + try: - for track in top_10_info['tracks']: - arti_top10_list.append({ - "album": track['album']['name'], - "artist_id": id, - "song_id" : track['id'], - "title" : track['name'] - }) + for track in top_10_info["tracks"]: + arti_top10_list.append( + { + "album": track["album"]["name"], + "artist_id": id, + "song_id": track["id"], + "title": track["name"], + } + ) except 
Exception as e: print(f"error:{top_10_info}") - - #task_instance.xcom_push(key='artist_top10', value=arti_top10_list) + + # task_instance.xcom_push(key='artist_top10', value=arti_top10_list) artist_top10_df = pd.DataFrame(arti_top10_list) - artist_top10_df.to_csv(f"data/{object_name}",encoding="utf-8-sig", index=False) + artist_top10_df.to_csv( + f"data/{object_name}", + encoding="utf-8-sig", + index=False) load_s3_bucket(object_name) - -#아티스트 정보 가져오기 + +# 아티스트 정보 가져오기 def get_artist_info(logical_date, **kwargs): - - task_instance = kwargs['ti'] + + task_instance = kwargs["ti"] artist_info_list = [] - object_name =f"spotify_artist_info_{logical_date}.csv" - - #csv 파일 읽어오기 + object_name = f"spotify_artist_info_{logical_date}.csv" + + # csv 파일 읽어오기 song_info = read_crawling_csv(logical_date) - + for _, row in song_info.iterrows(): - artist_id = ast.literal_eval(row['artist_id']) - + artist_id = ast.literal_eval(row["artist_id"]) + for i in range(len(artist_id)): id = artist_id[i] - + try: - end_point = END_POINT+f'/artists/{id}' + end_point = END_POINT + f"/artists/{id}" artist_info = extract(end_point) - #print(artist_info['name']) - artist_info_list.append({ - "artist" : artist_info['name'], - "artist_id": id, - "artist_genre": artist_info['genres'] - }) + # print(artist_info['name']) + artist_info_list.append( + { + "artist": artist_info["name"], + "artist_id": id, + "artist_genre": artist_info["genres"], + } + ) except Exception as e: time.sleep(20) print(f"error:{artist_info}") - - #task_instance.xcom_push(key="artist_info", value=artist_info_list) + + # task_instance.xcom_push(key="artist_info", value=artist_info_list) artist_info_df = pd.DataFrame(artist_info_list) - artist_info_df.to_csv(f"data/{object_name}", encoding="utf-8-sig", index=False) + artist_info_df.to_csv( + f"data/{object_name}", + encoding="utf-8-sig", + index=False) load_s3_bucket(object_name) - # 크롤링 데이터 읽어오는 함수 def read_crawling_csv(execution_date) -> pd.DataFrame: - - daily_chart_crawling = 
pd.read_csv(f"data/spotify_crawling_data_{execution_date}.csv") - + + daily_chart_crawling = pd.read_csv( + f"data/spotify_crawling_data_{execution_date}.csv" + ) + return daily_chart_crawling diff --git a/airflow/dags/spotify_data_dag.py b/airflow/dags/spotify_data_dag.py index 342dbab..e941d3f 100644 --- a/airflow/dags/spotify_data_dag.py +++ b/airflow/dags/spotify_data_dag.py @@ -10,36 +10,36 @@ "depends_on_past": False, "start_date": datetime(2025, 2, 28), "retries": 1, - "retry_delay": timedelta(seconds=60) + "retry_delay": timedelta(seconds=60), } with DAG( - dag_id = 'GetSpotifyArtistData', + dag_id="GetSpotifyArtistData", default_args=default_args, catchup=False, - tags=['final_project'], - schedule_interval='0 11 * * *' -)as dag: - + tags=["final_project"], + schedule_interval="0 11 * * *", +) as dag: + extract_globalTop50_data = PythonOperator( - task_id = 'extract_global_top50', + task_id="extract_global_top50", python_callable=data_crawling, - op_kwargs={'logical_date': '{{ ds }}'}, + op_kwargs={"logical_date": "{{ ds }}"}, ) - + extract_artistInfo_data = PythonOperator( - task_id = 'extract_artist_info', + task_id="extract_artist_info", python_callable=get_artist_info, - op_kwargs={'logical_date': '{{ ds }}'}, + op_kwargs={"logical_date": "{{ ds }}"}, ) - + extract_artistTop10_data = PythonOperator( - task_id = 'extract_artist_top10', + task_id="extract_artist_top10", python_callable=get_arti_top10, - op_kwargs={'logical_date': '{{ ds }}'}, + op_kwargs={"logical_date": "{{ ds }}"}, ) - - - extract_globalTop50_data >> [extract_artistInfo_data, extract_artistTop10_data] + extract_globalTop50_data >> [ + extract_artistInfo_data, + extract_artistTop10_data]