From afcd306baebcfc507f159c73466894cdf752c930 Mon Sep 17 00:00:00 2001 From: gland78 Date: Wed, 12 Mar 2025 10:11:51 +0900 Subject: [PATCH 1/5] add_genre_data --- airflow/dags/Bugs_DAG.py | 183 ++++++++------- airflow/dags/Flo_DAG.py | 199 ++++++++-------- airflow/dags/Genie_DAG.py | 192 ++++++++-------- airflow/dags/S3_Spark_SnowFlake_DAG.py | 69 ++++++ airflow/dags/Vibe_DAG.py | 214 +++++++++--------- .../dags/scripts/S3_Spark_SnowFlake_ELT.py | 201 ++++++++++++++++ 6 files changed, 673 insertions(+), 385 deletions(-) create mode 100644 airflow/dags/S3_Spark_SnowFlake_DAG.py create mode 100644 airflow/dags/scripts/S3_Spark_SnowFlake_ELT.py diff --git a/airflow/dags/Bugs_DAG.py b/airflow/dags/Bugs_DAG.py index be12c25..589afaa 100644 --- a/airflow/dags/Bugs_DAG.py +++ b/airflow/dags/Bugs_DAG.py @@ -1,95 +1,120 @@ import csv import json +import requests from datetime import datetime, timedelta -import pandas as pd -from bugs import BugsChartPeriod, BugsChartType, ChartData +from plugins.bugs import BugsChartPeriod, BugsChartType, ChartData +from scripts.get_access_token import get_token from airflow import DAG from airflow.operators.python import PythonOperator from airflow.providers.amazon.aws.hooks.s3 import S3Hook -from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook - -""" -your-s3-bucket-name을 실제 S3 버킷명으로 바꾸고, -✅ Snowflake 연결 정보 및 테이블명을 맞게 설정 -""" +from airflow.models import Variable # 날짜 설정 TODAY = datetime.now().strftime("%Y%m%d") -# 파일 경로 -JSON_PATH = f"/opt/airflow/data/bugs_chart_{TODAY}.json" -CSV_PATH = f"/opt/airflow/data/bugs_chart_{TODAY}.csv" - -# S3 설정 +# Spotify API 설정 +SPOTIFY_API_URL = "https://api.spotify.com/v1" +SPOTIFY_TOKEN = Variable.get("SPOTIFY_ACCESS_TOKEN", default_var=None) S3_BUCKET = "de5-s4tify" -S3_JSON_KEY = f"raw_data/bugs_chart/bugs_chart_{TODAY}.json" -S3_CSV_KEY = f"raw_data/bugs_chart/bugs_chart_{TODAY}.csv" - -""" -# Snowflake 설정 -SNOWFLAKE_CONN_ID = "S4tify_SnowFlake" -SNOWFLAKE_TABLE = "raw_data" -""" - - -# 1. Bugs 차트 데이터 가져오기 및 JSON 저장 +S3_CSV_KEY = f"raw_data/bugs_chart_with_genre_{TODAY}.csv" +LOCAL_FILE_PATH = f"/opt/airflow/data/bugs_chart_with_genre_{TODAY}.csv" + +# Spotify API에서 아티스트 ID 검색 +def search_artist_id(artist_name): + SPOTIFY_TOKEN = Variable.get("SPOTIFY_ACCESS_TOKEN", default_var=None) + + url = f"{SPOTIFY_API_URL}/search" + headers = {"Authorization": f"Bearer {SPOTIFY_TOKEN}"} + params = {"q": artist_name, "type": "artist", "limit": 1} + response = requests.get(url, headers=headers, params=params) + + print("headers : ", headers) + + + if response.status_code == 200: + artists = response.json().get("artists", {}).get("items", []) + artist_id = artists[0]["id"] if artists else None + print(f"🔍 검색된 아티스트: {artist_name} -> ID: {artist_id}") + return artist_id + else: + print(f"❌ 아티스트 검색 실패: {artist_name}, 상태 코드: {response.status_code}, 응답: {response.json()}") + return None + +# Spotify API에서 아티스트 장르 가져오기 +def get_artist_genre(artist_id): + if not artist_id: + return "Unknown" + + SPOTIFY_TOKEN = Variable.get("SPOTIFY_ACCESS_TOKEN", default_var=None) + + url = f"{SPOTIFY_API_URL}/artists/{artist_id}" + headers = {"Authorization": f"Bearer {SPOTIFY_TOKEN}"} + response = requests.get(url, headers=headers) + + if response.status_code == 200: + genres = response.json().get("genres", []) + genre_str = ", ".join(genres) if genres else "Unknown" + print(f"🎵 장르 검색: ID {artist_id} -> {genre_str}") + return genre_str + else: + print(f"❌ 장르 검색 실패: ID {artist_id}, 상태 코드: {response.status_code}, 응답: {response.json()}") + return "Unknown" + +# 1. Bugs 차트 데이터 가져오기 및 JSON 변환 def fetch_bugs_chart(): chart = ChartData( chartType=BugsChartType.All, chartPeriod=BugsChartPeriod.Realtime, - fetch=True) + fetch=True + ) chart_data = { "date": chart.date.strftime("%Y-%m-%d %H:%M:%S"), - "entries": [ - { - "rank": entry.rank, - "title": entry.title, - "artist": entry.artist, - "lastPos": entry.lastPos, - "peakPos": entry.peakPos, - "image": entry.image, - } - for entry in chart.entries - ], + "entries": [] } - with open(JSON_PATH, "w", encoding="utf-8") as f: - json.dump(chart_data, f, ensure_ascii=False, indent=4) - print(f"✅ JSON 저장 완료: {JSON_PATH}") + for entry in chart.entries: + print(f"📊 차트 데이터 처리: {entry.rank}. {entry.title} - {entry.artist}") + artist_id = search_artist_id(entry.artist) + genre = get_artist_genre(artist_id) + chart_data["entries"].append({ + "rank": entry.rank, + "title": entry.title, + "artist": entry.artist, + "lastPos": entry.lastPos, + "peakPos": entry.peakPos, + "image": entry.image, + "genre": genre + }) + return chart_data # 2. JSON → CSV 변환 -def convert_json_to_csv(): - with open(JSON_PATH, "r", encoding="utf-8") as f: - data = json.load(f) - fields = ["rank", "title", "artist", "lastPos", "peakPos", "image"] - with open(CSV_PATH, "w", newline="", encoding="utf-8") as csvfile: - writer = csv.DictWriter(csvfile, fieldnames=fields) - writer.writeheader() - for entry in data["entries"]: - writer.writerow(entry) - print(f"✅ CSV 변환 완료: {CSV_PATH}") - - -# 3. AWS S3 업로드 -def upload_to_s3(): +def convert_json_to_csv(**kwargs): + ti = kwargs["ti"] + data = ti.xcom_pull(task_ids="fetch_bugs_chart") + csv_data = [["rank", "title", "artist", "lastPos", "peakPos", "image", "genre"]] + for entry in data["entries"]: + csv_data.append([ + entry["rank"], entry["title"], entry["artist"], entry["lastPos"], entry["peakPos"], entry["image"], entry["genre"] + ]) + csv_string = "\n".join(",".join(map(str, row)) for row in csv_data) + return csv_string + +# 3. 로컬에 CSV 저장 (테스트용, 삭제 용이하도록 별도 함수) +def save_csv_locally(csv_string): + with open(LOCAL_FILE_PATH, "w", encoding="utf-8") as f: + f.write(csv_string) + +# 4. AWS S3 업로드 +def upload_to_s3(**kwargs): + ti = kwargs["ti"] + csv_string = ti.xcom_pull(task_ids="convert_json_to_csv") + #save_csv_locally(csv_string) # 테스트용 로컬 저장 s3_hook = S3Hook(aws_conn_id="S4tify_S3") - # s3_hook.load_file(filename=JSON_PATH, key=S3_JSON_KEY, bucket_name=S3_BUCKET, replace=True) - s3_hook.load_file( - filename=CSV_PATH, key=S3_CSV_KEY, bucket_name=S3_BUCKET, replace=True - ) + s3_hook.load_string(csv_string, key=S3_CSV_KEY, bucket_name=S3_BUCKET, replace=True) print(f"✅ S3 업로드 완료: {S3_CSV_KEY}") - -"""# 4. Snowflake 업로드 -def upload_to_snowflake(): - snowflake_hook = SnowflakeHook(snowflake_conn_id=SNOWFLAKE_CONN_ID) - df = pd.read_csv(CSV_PATH) - snowflake_hook.run(f"DELETE FROM {SNOWFLAKE_TABLE} WHERE DATE = '{TODAY}';") - snowflake_hook.insert_rows(table=SNOWFLAKE_TABLE, rows=df.values.tolist(), target_fields=df.columns.tolist()) - print(f"✅ Snowflake 업로드 완료: {SNOWFLAKE_TABLE}") -""" # DAG 설정 default_args = { "owner": "airflow", @@ -105,29 +130,29 @@ def upload_to_snowflake(): schedule_interval="10 0 * * *", # 매일 00:10 실행 catchup=False, ) as dag: + + get_spotify_token_task = PythonOperator( + task_id="get_spotify_token", + python_callable=get_token, # ✅ 먼저 실행해서 Variable 갱신 + provide_context=True, + ) fetch_bugs_chart_task = PythonOperator( task_id="fetch_bugs_chart", python_callable=fetch_bugs_chart, + provide_context=True, ) - + convert_json_to_csv_task = PythonOperator( task_id="convert_json_to_csv", python_callable=convert_json_to_csv, + provide_context=True, ) - + upload_s3_task = PythonOperator( task_id="upload_to_s3", python_callable=upload_to_s3, + provide_context=True, ) - """ - upload_snowflake_task = PythonOperator( - task_id="upload_to_snowflake", - python_callable=upload_to_snowflake, - ) - """ - - # DAG 실행 순서 - ( - fetch_bugs_chart_task >> convert_json_to_csv_task >> upload_s3_task - ) # upload_snowflake_task + + get_spotify_token_task >> fetch_bugs_chart_task >> convert_json_to_csv_task >> upload_s3_task diff --git a/airflow/dags/Flo_DAG.py b/airflow/dags/Flo_DAG.py index 065765c..ba25a3d 100644 --- a/airflow/dags/Flo_DAG.py +++ b/airflow/dags/Flo_DAG.py @@ -1,98 +1,104 @@ import csv import json +import requests from datetime import datetime, timedelta -from flo import ChartData # flo.py 모듈 import - +from plugins.flo import ChartData # flo.py 모듈 import +from scripts.get_access_token import get_token from airflow import DAG -from airflow.hooks.base_hook import BaseHook from airflow.operators.python import PythonOperator from airflow.providers.amazon.aws.hooks.s3 import S3Hook +from airflow.models import Variable -# 파일 경로 및 S3 버킷 정보 +# 날짜 설정 TODAY = datetime.now().strftime("%Y%m%d") -JSON_PATH = f"/opt/airflow/data/flo_chart_{TODAY}.json" -CSV_PATH = f"/opt/airflow/data/flo_chart_{TODAY}.csv" -S3_BUCKET = "de5-s4tify" -S3_JSON_KEY = f"raw_data/flo_chart/flo_chart_{TODAY}.json" -S3_CSV_KEY = f"raw_data/flo_chart/flo_chart_{TODAY}.csv" - - -# AWS S3 업로드 함수 -def upload_to_s3(): - s3_hook = S3Hook(aws_conn_id="S4tify_S3") # Airflow Connection ID 사용 - file_name = CSV_PATH.split("/")[-1] - s3_hook.load_file( - filename=CSV_PATH, bucket_name=S3_BUCKET, key=file_name, replace=True - ) - print(f"✅ S3 업로드 완료: s3://{S3_BUCKET}/{file_name}") - - -"""# Snowflake 저장 함수 -def save_to_snowflake(): - # Airflow 연결에서 Snowflake 연결 정보 가져오기 - connection = BaseHook.get_connection('s4tify_SnowFlake') # Airflow에 설정된 Snowflake 연결 정보 사용 - - # Snowflake 연결 생성 - conn = snowflake.connector.connect( - user=connection.login, # Airflow 연결의 username 사용 - password=connection.password, # Airflow 연결의 password 사용 - account=connection.host, # Airflow 연결의 account 정보 사용 - warehouse=connection.extra_dejson.get('warehouse'), # Extra 정보에서 warehouse 읽기 - database=connection.extra_dejson.get('database'), # Extra 정보에서 database 읽기 - schema=connection.extra_dejson.get('schema'), # Extra 정보에서 schema 읽기 - ) - - cursor = conn.cursor() - cursor.execute(f"DELETE FROM flo_chart WHERE date = '{TODAY}'") - with open(CSV_PATH, "r", encoding="utf-8") as f: - next(f) # 헤더 스킵 - for line in f: - values = line.strip().split(",") - cursor.execute( - "INSERT INTO flo_chart (rank, title, artist, lastPos, isNew, image, date) VALUES (%s, %s, %s, %s, %s, %s, %s)", - (*values, TODAY) - ) - conn.commit() - cursor.close() - conn.close() - print("✅ Snowflake 저장 완료") -""" - -# 1. FLO 차트 데이터 가져오기 및 JSON 저장 +# S3 설정 +S3_BUCKET = "de5-s4tify" +S3_CSV_KEY = f"raw_data/flo_chart_{TODAY}.csv" +LOCAL_FILE_PATH = f"/opt/airflow/data/flo_chart_with_genre_{TODAY}.csv" + +# Spotify API 설정 +SPOTIFY_API_URL = "https://api.spotify.com/v1" +SPOTIFY_TOKEN = Variable.get("SPOTIFY_ACCESS_TOKEN", default_var=None) + +# Spotify API에서 아티스트 ID 검색 +def search_artist_id(artist_name): + SPOTIFY_TOKEN = Variable.get("SPOTIFY_ACCESS_TOKEN", default_var=None) + url = f"{SPOTIFY_API_URL}/search" + headers = {"Authorization": f"Bearer {SPOTIFY_TOKEN}"} + params = {"q": artist_name, "type": "artist", "limit": 1} + response = requests.get(url, headers=headers, params=params) + + if response.status_code == 200: + artists = response.json().get("artists", {}).get("items", []) + artist_id = artists[0]["id"] if artists else None + return artist_id + return None + +# Spotify API에서 아티스트 장르 가져오기 +def get_artist_genre(artist_id): + if not artist_id: + return "Unknown" + + SPOTIFY_TOKEN = Variable.get("SPOTIFY_ACCESS_TOKEN", default_var=None) + url = f"{SPOTIFY_API_URL}/artists/{artist_id}" + headers = {"Authorization": f"Bearer {SPOTIFY_TOKEN}"} + response = requests.get(url, headers=headers) + + if response.status_code == 200: + genres = response.json().get("genres", []) + return ", ".join(genres) if genres else "Unknown" + return "Unknown" + +# 1. FLO 차트 데이터 가져오기 및 JSON 변환 def fetch_flo_chart(): chart = ChartData(fetch=True) chart_data = { "date": chart.date.strftime("%Y-%m-%d %H:%M:%S"), - "entries": [ - { - "rank": entry.rank, - "title": entry.title, - "artist": entry.artist, - "lastPos": entry.lastPos, - "isNew": entry.isNew, - "image": entry.image, - } - for entry in chart.entries - ], + "entries": [] } - with open(JSON_PATH, "w", encoding="utf-8") as f: - json.dump(chart_data, f, ensure_ascii=False, indent=4) - print(f"✅ JSON 저장 완료: {JSON_PATH}") + for entry in chart.entries: + print(f"📊 차트 데이터 처리: {entry.rank}. {entry.title} - {entry.artist}") + artist_id = search_artist_id(entry.artist) + genre = get_artist_genre(artist_id) + chart_data["entries"].append({ + "rank": entry.rank, + "title": entry.title, + "artist": entry.artist, + "lastPos": entry.lastPos, + "isNew": entry.isNew, + "image": entry.image, + "genre": genre + }) + return chart_data # 2. JSON → CSV 변환 -def convert_json_to_csv(): - with open(JSON_PATH, "r", encoding="utf-8") as f: - data = json.load(f) - fields = ["rank", "title", "artist", "lastPos", "isNew", "image"] - with open(CSV_PATH, "w", newline="", encoding="utf-8") as csvfile: - writer = csv.DictWriter(csvfile, fieldnames=fields) - writer.writeheader() - for entry in data["entries"]: - writer.writerow(entry) - print(f"✅ CSV 변환 완료: {CSV_PATH}") +def convert_json_to_csv(**kwargs): + ti = kwargs["ti"] + data = ti.xcom_pull(task_ids="fetch_flo_chart") + csv_data = [["rank", "title", "artist", "lastPos", "isNew", "image", "genre"]] + for entry in data["entries"]: + csv_data.append([ + entry["rank"], entry["title"], entry["artist"], entry["lastPos"], entry["isNew"], entry["image"], entry["genre"] + ]) + csv_string = "\n".join(",".join(map(str, row)) for row in csv_data) + return csv_string + +# 3. 로컬에 CSV 저장 (테스트용) +def save_csv_locally(csv_string): + with open(LOCAL_FILE_PATH, "w", encoding="utf-8") as f: + f.write(csv_string) + +# 4. AWS S3 업로드 +def upload_to_s3(**kwargs): + ti = kwargs["ti"] + csv_string = ti.xcom_pull(task_ids="convert_json_to_csv") + #save_csv_locally(csv_string) # 테스트용 로컬 저장 + s3_hook = S3Hook(aws_conn_id="S4tify_S3") + s3_hook.load_string(csv_string, key=S3_CSV_KEY, bucket_name=S3_BUCKET, replace=True) + print(f"✅ S3 업로드 완료: {S3_CSV_KEY}") # DAG 설정 @@ -110,38 +116,29 @@ def convert_json_to_csv(): schedule_interval="20 0 * * *", # 매일 00:20 실행 catchup=False, ) as dag: + + get_spotify_token_task = PythonOperator( + task_id="get_spotify_token", + python_callable=get_token, # ✅ 먼저 실행해서 Variable 갱신 + provide_context=True, + ) fetch_flo_chart_task = PythonOperator( task_id="fetch_flo_chart", python_callable=fetch_flo_chart, + provide_context=True, ) - + convert_json_to_csv_task = PythonOperator( task_id="convert_json_to_csv", python_callable=convert_json_to_csv, + provide_context=True, ) - - """upload_json_to_s3_task = PythonOperator( - task_id="upload_json_to_s3", + + upload_s3_task = PythonOperator( + task_id="upload_to_s3", python_callable=upload_to_s3, - op_kwargs={"file_path": JSON_PATH, "bucket_name": S3_BUCKET, "object_name": S3_JSON_KEY}, - )""" - - upload_csv_to_s3_task = PythonOperator( - task_id="upload_csv_to_s3", - python_callable=upload_to_s3, - op_kwargs={ - "file_path": CSV_PATH, - "bucket_name": S3_BUCKET, - "object_name": S3_CSV_KEY, - }, + provide_context=True, ) - - """save_to_snowflake_task = PythonOperator( - task_id="save_to_snowflake", - python_callable=save_to_snowflake, - )""" - - # 작업 순서 정의 - (fetch_flo_chart_task >> convert_json_to_csv_task >> - upload_csv_to_s3_task) # save_to_snowflake_task + + get_spotify_token_task >> fetch_flo_chart_task >> convert_json_to_csv_task >> upload_s3_task diff --git a/airflow/dags/Genie_DAG.py b/airflow/dags/Genie_DAG.py index 5f2c7ea..e997baa 100644 --- a/airflow/dags/Genie_DAG.py +++ b/airflow/dags/Genie_DAG.py @@ -1,97 +1,103 @@ import csv import json +import requests from datetime import datetime, timedelta -import pandas as pd -from genie import ChartData, GenieChartPeriod # genie.py 모듈 import - +from plugins.genie import ChartData, GenieChartPeriod # genie.py 모듈 import +from scripts.get_access_token import get_token from airflow import DAG from airflow.operators.python import PythonOperator from airflow.providers.amazon.aws.hooks.s3 import S3Hook -from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook +from airflow.models import Variable -# 환경 변수 설정 +# 날짜 설정 TODAY = datetime.now().strftime("%Y%m%d") -JSON_PATH = f"/opt/airflow/data/genie_chart_{TODAY}.json" -CSV_PATH = f"/opt/airflow/data/genie_chart_{TODAY}.csv" -S3_BUCKET = "de5-s4tify" -S3_CSV_KEY = f"raw_data/genie_chart/genie_chart_{TODAY}.csv" -# SNOWFLAKE_CONN_ID = "S4tify_SnowFlake" # Airflow에서 설정한 Snowflake 연결 ID - -# Genie 차트 데이터를 가져와 JSON으로 저장 +# S3 설정 +S3_BUCKET = "de5-s4tify" +S3_CSV_KEY = f"raw_data/genie_chart_{TODAY}.csv" +LOCAL_FILE_PATH = f"/opt/airflow/data/genie_chart_with_genre_{TODAY}.csv" + +# Spotify API 설정 +SPOTIFY_API_URL = "https://api.spotify.com/v1" +SPOTIFY_TOKEN = Variable.get("SPOTIFY_ACCESS_TOKEN", default_var=None) + +# Spotify API에서 아티스트 ID 검색 +def search_artist_id(artist_name): + SPOTIFY_TOKEN = Variable.get("SPOTIFY_ACCESS_TOKEN", default_var=None) + url = f"{SPOTIFY_API_URL}/search" + headers = {"Authorization": f"Bearer {SPOTIFY_TOKEN}"} + params = {"q": artist_name, "type": "artist", "limit": 1} + response = requests.get(url, headers=headers, params=params) + + if response.status_code == 200: + artists = response.json().get("artists", {}).get("items", []) + artist_id = artists[0]["id"] if artists else None + return artist_id + return None + +# Spotify API에서 아티스트 장르 가져오기 +def get_artist_genre(artist_id): + if not artist_id: + return "Unknown" + + SPOTIFY_TOKEN = Variable.get("SPOTIFY_ACCESS_TOKEN", default_var=None) + url = f"{SPOTIFY_API_URL}/artists/{artist_id}" + headers = {"Authorization": f"Bearer {SPOTIFY_TOKEN}"} + response = requests.get(url, headers=headers) + + if response.status_code == 200: + genres = response.json().get("genres", []) + return ", ".join(genres) if genres else "Unknown" + return "Unknown" + +# 1. Genie 차트 데이터 가져오기 및 JSON 변환 def fetch_genie_chart(): chart = ChartData(chartPeriod=GenieChartPeriod.Realtime, fetch=True) - chart_data = { "date": chart.date.strftime("%Y-%m-%d %H:%M:%S"), - "entries": [ - { - "rank": entry.rank, - "title": entry.title, - "artist": entry.artist, - "peakPos": entry.peakPos, - "lastPos": entry.lastPos, - "image": entry.image, - } - for entry in chart.entries - ], + "entries": [] } - - with open(JSON_PATH, "w", encoding="utf-8") as f: - json.dump(chart_data, f, ensure_ascii=False, indent=4) - - print(f"✅ JSON 저장 완료: {JSON_PATH}") - return JSON_PATH - - -# JSON을 CSV로 변환 후 저장 -def convert_json_to_csv(): - with open(JSON_PATH, "r", encoding="utf-8") as f: - chart_data = json.load(f) - - fields = ["rank", "title", "artist", "peakPos", "lastPos", "image"] - with open(CSV_PATH, "w", newline="", encoding="utf-8") as csvfile: - writer = csv.DictWriter(csvfile, fieldnames=fields) - writer.writeheader() - for entry in chart_data["entries"]: - writer.writerow(entry) - - print(f"✅ CSV 변환 완료: {CSV_PATH}") - return CSV_PATH - - -# S3 업로드 -def upload_to_s3(): - s3_hook = S3Hook(aws_conn_id="S4tify_S3") # Airflow Connection ID 사용 - file_name = CSV_PATH.split("/")[-1] - s3_hook.load_file( - filename=CSV_PATH, bucket_name=S3_BUCKET, key=S3_CSV_KEY, replace=True - ) - print(f"✅ S3 업로드 완료: s3://{S3_BUCKET}/{S3_CSV_KEY}") - - -""" -# Snowflake 업로드 (Airflow SnowflakeHook 사용) -def upload_to_snowflake(): - snowflake_hook = SnowflakeHook(snowflake_conn_id=SNOWFLAKE_CONN_ID) - - # Snowflake에 CSV 파일 업로드 - df = pd.read_csv(CSV_PATH) - - # Snowflake 테이블에 데이터를 삽입 - # 'genie_chart' 테이블에 맞는 컬럼 이름과 데이터 형식을 확인하고 매핑 - snowflake_hook.run(f"DELETE FROM genie_chart WHERE date = '{TODAY}';") - - # Snowflake 테이블에 데이터 삽입 - snowflake_hook.insert_rows( - table="genie_chart", - rows=df.values.tolist(), - target_fields=df.columns.tolist() - ) - - print(f"✅ Snowflake 업로드 완료: genie_chart") -""" + for entry in chart.entries: + print(f"📊 차트 데이터 처리: {entry.rank}. {entry.title} - {entry.artist}") + artist_id = search_artist_id(entry.artist) + genre = get_artist_genre(artist_id) + chart_data["entries"].append({ + "rank": entry.rank, + "title": entry.title, + "artist": entry.artist, + "peakPos": entry.peakPos, + "lastPos": entry.lastPos, + "image": entry.image, + "genre": genre + }) + return chart_data + +# 2. JSON → CSV 변환 +def convert_json_to_csv(**kwargs): + ti = kwargs["ti"] + data = ti.xcom_pull(task_ids="fetch_genie_chart") + csv_data = [["rank", "title", "artist", "peakPos", "lastPos", "image", "genre"]] + for entry in data["entries"]: + csv_data.append([ + entry["rank"], entry["title"], entry["artist"], entry["peakPos"], entry["lastPos"], entry["image"], entry["genre"] + ]) + csv_string = "\n".join(",".join(map(str, row)) for row in csv_data) + return csv_string + +# 3. 로컬에 CSV 저장 (테스트용) +def save_csv_locally(csv_string): + with open(LOCAL_FILE_PATH, "w", encoding="utf-8") as f: + f.write(csv_string) + +# 4. AWS S3 업로드 +def upload_to_s3(**kwargs): + ti = kwargs["ti"] + csv_string = ti.xcom_pull(task_ids="convert_json_to_csv") + #save_csv_locally(csv_string) # 테스트용 로컬 저장 + s3_hook = S3Hook(aws_conn_id="S4tify_S3") + s3_hook.load_string(csv_string, key=S3_CSV_KEY, bucket_name=S3_BUCKET, replace=True) + print(f"✅ S3 업로드 완료: {S3_CSV_KEY}") # DAG 설정 default_args = { @@ -108,27 +114,29 @@ def upload_to_snowflake(): schedule_interval="30 0 * * *", # 매일 00:30 실행 catchup=False, ) as dag: - + + get_spotify_token_task = PythonOperator( + task_id="get_spotify_token", + python_callable=get_token, # ✅ 먼저 실행해서 Variable 갱신 + provide_context=True, + ) + fetch_genie_chart_task = PythonOperator( task_id="fetch_genie_chart", python_callable=fetch_genie_chart, + provide_context=True, ) - + convert_json_to_csv_task = PythonOperator( task_id="convert_json_to_csv", python_callable=convert_json_to_csv, + provide_context=True, ) - - upload_to_s3_task = PythonOperator( + + upload_s3_task = PythonOperator( task_id="upload_to_s3", python_callable=upload_to_s3, + provide_context=True, ) - """ - upload_to_snowflake_task = PythonOperator( - task_id="upload_to_snowflake", - python_callable=upload_to_snowflake, - )""" - - ( - fetch_genie_chart_task >> convert_json_to_csv_task >> upload_to_s3_task - ) # upload_to_snowflake_task + + get_spotify_token_task >> fetch_genie_chart_task >> convert_json_to_csv_task >> upload_s3_task diff --git a/airflow/dags/S3_Spark_SnowFlake_DAG.py b/airflow/dags/S3_Spark_SnowFlake_DAG.py new file mode 100644 index 0000000..5006fe6 --- /dev/null +++ b/airflow/dags/S3_Spark_SnowFlake_DAG.py @@ -0,0 +1,69 @@ +from datetime import datetime, timedelta +import os +from airflow import DAG +from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator +from airflow.models import Variable + +# Airflow Variables에서 AWS 자격 증명 불러오기 (없으면 예외 발생 방지) +AWS_ACCESS_KEY = Variable.get("AWS_ACCESS_KEY", default_var=None) +AWS_SECRET_KEY = Variable.get("AWS_SECRET_KEY", default_var=None) + +if not AWS_ACCESS_KEY or not AWS_SECRET_KEY: + raise ValueError("AWS_ACCESS_KEY 또는 AWS_SECRET_KEY가 설정되지 않았습니다.") + +# S3 및 Snowflake 설정 +S3_BUCKET = "s3a://de5-s4tify" + +# Spark JARs 설정 +SPARK_HOME = os.environ.get("SPARK_JAR_DIR", "/opt/spark/jars") +SPARK_JARS = ",".join( + [ + os.path.join(SPARK_HOME, "snowflake-jdbc-3.9.2.jar"), + os.path.join(SPARK_HOME, "hadoop-aws-3.3.4.jar"), + os.path.join(SPARK_HOME, "aws-java-sdk-bundle-1.12.262.jar"), + ] +) + +# DAG 기본 설정 +default_args = { + 'owner': 'airflow', + 'start_date': datetime(2025, 3, 4), + 'retries': 1, + 'retry_delay': timedelta(minutes=5), +} + +dag = DAG( + 's3_to_snowflake_pipeline', + default_args=default_args, + description='Read from S3, process data with Spark, and store in Snowflake', + schedule_interval='0 2 * * *', + catchup=False, +) + +# Spark 작업 스크립트 경로 설정 +AIRFLOW_HOME = os.getenv('AIRFLOW_HOME', '/opt/airflow') +spark_script_path = os.path.abspath(os.path.join(AIRFLOW_HOME, "dags", "scripts", "S3_Spark_SnowFlake_ELT.py")) + +# SparkSubmitOperator 설정 +spark_submit_task = SparkSubmitOperator( + task_id="spark_submit_task", + application=spark_script_path, + conn_id="spark_conn", + executor_memory='4g', + executor_cores=4, + driver_memory='4g', + name='s3_to_snowflake_pipeline', + execution_timeout=timedelta(minutes=45), + verbose=True, + jars=SPARK_JARS, + conf={ + "spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem", + "spark.hadoop.fs.s3a.access.key": AWS_ACCESS_KEY, + "spark.hadoop.fs.s3a.secret.key": AWS_SECRET_KEY, + "spark.hadoop.fs.s3a.endpoint": "s3.ap-northeast-2.amazonaws.com", + "spark.hadoop.fs.s3a.aws.credentials.provider": "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider", + }, + dag=dag, +) + +spark_submit_task diff --git a/airflow/dags/Vibe_DAG.py b/airflow/dags/Vibe_DAG.py index e9038df..5e9f700 100644 --- a/airflow/dags/Vibe_DAG.py +++ b/airflow/dags/Vibe_DAG.py @@ -1,106 +1,103 @@ import csv import json +import requests from datetime import datetime, timedelta -import snowflake.connector -from vibe import ChartData # vibe.py 모듈 import - +from plugins.vibe import ChartData # vibe.py 모듈 import +from scripts.get_access_token import get_token from airflow import DAG -from airflow.hooks.base_hook import BaseHook from airflow.operators.python import PythonOperator from airflow.providers.amazon.aws.hooks.s3 import S3Hook +from airflow.models import Variable -# 파일 경로 및 S3 버킷 정보 +# 날짜 설정 TODAY = datetime.now().strftime("%Y%m%d") -JSON_PATH = f"/opt/airflow/data/vibe_chart_{TODAY}.json" -CSV_PATH = f"/opt/airflow/data/vibe_chart_{TODAY}.csv" -S3_BUCKET = "de5-s4tify" -S3_JSON_KEY = f"raw_data/vibe_chart/vibe_chart_{TODAY}.json" -S3_CSV_KEY = f"raw_data/vibe_chart/vibe_chart_{TODAY}.csv" - - -# AWS S3 업로드 함수 -def upload_to_s3(): - s3_hook = S3Hook(aws_conn_id="S4tify_S3") # Airflow Connection ID 사용 - file_name = CSV_PATH.split("/")[-1] - s3_hook.load_file( - filename=CSV_PATH, bucket_name=S3_BUCKET, key=file_name, replace=True - ) - print(f"✅ S3 업로드 완료: s3://{S3_BUCKET}/{file_name}") - - -""" -# Snowflake 저장 함수 -def save_to_snowflake(): - connection = BaseHook.get_connection('S4tify_SnowFlake') # Snowflake 연결 가져오기 - # Snowflake 연결 생성 - conn = snowflake.connector.connect( - user=connection.login, - password=connection.password, - account=connection.host, - warehouse=connection.extra_dejson.get('warehouse'), - database=connection.extra_dejson.get('database'), - schema=connection.extra_dejson.get('schema'), - ) - - cursor = conn.cursor() - cursor.execute(f"DELETE FROM vibe_chart WHERE date = '{TODAY}'") - with open(CSV_PATH, "r", encoding="utf-8") as f: - next(f) # 헤더 스킵 - for line in f: - values = line.strip().split(",") - cursor.execute( - "INSERT INTO vibe_chart (rank, title, artist, lastPos, isNew, image, date) VALUES (%s, %s, %s, %s, %s, %s, %s)", - (*values, TODAY) - ) - conn.commit() - cursor.close() - conn.close() - print("✅ Snowflake 저장 완료") -""" - - -# Vibe 차트 데이터를 가져와 JSON으로 저장하는 함수 +# S3 설정 +S3_BUCKET = "de5-s4tify" +S3_CSV_KEY = f"raw_data/vibe_chart_{TODAY}.csv" +LOCAL_FILE_PATH = f"/opt/airflow/data/vibe_chart_with_genre_{TODAY}.csv" + +# Spotify API 설정 +SPOTIFY_API_URL = "https://api.spotify.com/v1" +SPOTIFY_TOKEN = Variable.get("SPOTIFY_ACCESS_TOKEN", default_var=None) + +# Spotify API에서 아티스트 ID 검색 +def search_artist_id(artist_name): + SPOTIFY_TOKEN = Variable.get("SPOTIFY_ACCESS_TOKEN", default_var=None) + url = f"{SPOTIFY_API_URL}/search" + headers = {"Authorization": f"Bearer {SPOTIFY_TOKEN}"} + params = {"q": artist_name, "type": "artist", "limit": 1} + response = requests.get(url, headers=headers, params=params) + + if response.status_code == 200: + artists = response.json().get("artists", {}).get("items", []) + artist_id = artists[0]["id"] if artists else None + return artist_id + return None + +# Spotify API에서 아티스트 장르 가져오기 +def get_artist_genre(artist_id): + if not artist_id: + return "Unknown" + + SPOTIFY_TOKEN = Variable.get("SPOTIFY_ACCESS_TOKEN", default_var=None) + url = f"{SPOTIFY_API_URL}/artists/{artist_id}" + headers = {"Authorization": f"Bearer {SPOTIFY_TOKEN}"} + response = requests.get(url, headers=headers) + + if response.status_code == 200: + genres = response.json().get("genres", []) + return ", ".join(genres) if genres else "Unknown" + return "Unknown" + +# 1. VIBE 차트 데이터 가져오기 및 JSON 변환 def fetch_vibe_chart(): chart = ChartData(fetch=True) - chart_data = { "date": chart.date.strftime("%Y-%m-%d %H:%M:%S"), - "entries": [ - { - "rank": entry.rank, - "title": entry.title, - "artist": entry.artist, - "lastPos": entry.lastPos, - "isNew": entry.isNew, - "image": entry.image, - } - for entry in chart.entries - ], + "entries": [] } - - with open(JSON_PATH, "w", encoding="utf-8") as f: - json.dump(chart_data, f, ensure_ascii=False, indent=4) - - print(f"✅ JSON 저장 완료: {JSON_PATH}") - return JSON_PATH - - -# JSON을 CSV로 변환하는 함수 -def convert_json_to_csv(): - with open(JSON_PATH, "r", encoding="utf-8") as f: - chart_data = json.load(f) - - fields = ["rank", "title", "artist", "lastPos", "isNew", "image"] - with open(CSV_PATH, "w", newline="", encoding="utf-8") as csvfile: - writer = csv.DictWriter(csvfile, fieldnames=fields) - writer.writeheader() - for entry in chart_data["entries"]: - writer.writerow(entry) - - print(f"✅ CSV 변환 완료: {CSV_PATH}") - return CSV_PATH + for entry in chart.entries: + print(f"📊 차트 데이터 처리: {entry.rank}. {entry.title} - {entry.artist}") + artist_id = search_artist_id(entry.artist) + genre = get_artist_genre(artist_id) + chart_data["entries"].append({ + "rank": entry.rank, + "title": entry.title, + "artist": entry.artist, + "lastPos": entry.lastPos, + "isNew": entry.isNew, + "image": entry.image, + "genre": genre + }) + return chart_data + +# 2. JSON → CSV 변환 +def convert_json_to_csv(**kwargs): + ti = kwargs["ti"] + data = ti.xcom_pull(task_ids="fetch_vibe_chart") + csv_data = [["rank", "title", "artist", "lastPos", "isNew", "image", "genre"]] + for entry in data["entries"]: + csv_data.append([ + entry["rank"], entry["title"], entry["artist"], entry["lastPos"], entry["isNew"], entry["image"], entry["genre"] + ]) + csv_string = "\n".join(",".join(map(str, row)) for row in csv_data) + return csv_string + +# 3. 로컬에 CSV 저장 (테스트용) +def save_csv_locally(csv_string): + with open(LOCAL_FILE_PATH, "w", encoding="utf-8") as f: + f.write(csv_string) + +# 4. AWS S3 업로드 +def upload_to_s3(**kwargs): + ti = kwargs["ti"] + csv_string = ti.xcom_pull(task_ids="convert_json_to_csv") + #save_csv_locally(csv_string) # 테스트용 로컬 저장 + s3_hook = S3Hook(aws_conn_id="S4tify_S3") + s3_hook.load_string(csv_string, key=S3_CSV_KEY, bucket_name=S3_BUCKET, replace=True) + print(f"✅ S3 업로드 완료: {S3_CSV_KEY}") # DAG 설정 @@ -118,38 +115,29 @@ def convert_json_to_csv(): schedule_interval="45 0 * * *", # 매일 00:45 실행 catchup=False, ) as dag: - + + get_spotify_token_task = PythonOperator( + task_id="get_spotify_token", + python_callable=get_token, # ✅ 먼저 실행해서 Variable 갱신 + provide_context=True, + ) + fetch_vibe_chart_task = PythonOperator( task_id="fetch_vibe_chart", python_callable=fetch_vibe_chart, + provide_context=True, ) - + convert_json_to_csv_task = PythonOperator( task_id="convert_json_to_csv", python_callable=convert_json_to_csv, + provide_context=True, ) - - """upload_json_to_s3_task = PythonOperator( - task_id="upload_json_to_s3", - python_callable=upload_to_s3, - op_kwargs={"file_path": JSON_PATH, "bucket_name": S3_BUCKET, "object_name": S3_JSON_KEY}, - )""" - - upload_csv_to_s3_task = PythonOperator( - task_id="upload_csv_to_s3", + + upload_s3_task = PythonOperator( + task_id="upload_to_s3", python_callable=upload_to_s3, - op_kwargs={ - "file_path": CSV_PATH, - "bucket_name": S3_BUCKET, - "object_name": S3_CSV_KEY, - }, + provide_context=True, ) - - """save_to_snowflake_task = PythonOperator( - task_id="save_to_snowflake", - python_callable=save_to_snowflake, - )""" - - # 작업 순서 정의 - (fetch_vibe_chart_task >> convert_json_to_csv_task >> - upload_csv_to_s3_task) # save_to_snowflake_task + + get_spotify_token_task >> fetch_vibe_chart_task >> convert_json_to_csv_task >> upload_s3_task diff --git a/airflow/dags/scripts/S3_Spark_SnowFlake_ELT.py b/airflow/dags/scripts/S3_Spark_SnowFlake_ELT.py new file mode 100644 index 0000000..83a1685 --- /dev/null +++ b/airflow/dags/scripts/S3_Spark_SnowFlake_ELT.py @@ -0,0 +1,201 @@ +import os +from datetime import datetime +import snowflake.connector +from pyspark.sql import SparkSession +from pyspark.sql.functions import col, lit, when, count +from airflow.models import Variable + +# Spark JARs 설정 +SPARK_HOME = "/opt/spark/" +SPARK_JARS = ",".join( + [ + os.path.join(SPARK_HOME, "jars", "snowflake-jdbc-3.9.2.jar"), + os.path.join(SPARK_HOME, "jars", "hadoop-aws-3.3.4.jar"), + os.path.join(SPARK_HOME, "jars", "aws-java-sdk-bundle-1.12.262.jar"), + ] +) + +# Snowflake 연결 정보 설정 +SNOWFLAKE_OPTIONS = { + "user": Variable.get("SNOWFLAKE_USER"), + "password": Variable.get("SNOWFLAKE_PASSWORD"), + "account": Variable.get("SNOWFLAKE_ACCOUNT"), + "db": Variable.get("SNOWFLAKE_DB", "S4TIFY"), + "warehouse": Variable.get("SNOWFLAKE_WH", "COMPUTE_WH"), + "schema": Variable.get("SNOWFLAKE_SCHEMA") if Variable.get("SNOWFLAKE_SCHEMA") else "raw_data", + "role": "ACCOUNTADMIN", + "driver": "net.snowflake.client.jdbc.SnowflakeDriver", + "url": f'jdbc:snowflake://{Variable.get("SNOWFLAKE_ACCOUNT")}.snowflakecomputing.com', + #"url" : "jdbc:snowflake://kjqeovi-gr23658.snowflakecomputing.com", +} + +# Spark Session 생성 함수 +def spark_session_builder(app_name: str) -> SparkSession: + return ( + SparkSession.builder.appName(app_name) + .config("spark.jars", SPARK_JARS) + .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") + .config("spark.hadoop.fs.s3a.access.key", Variable.get("AWS_ACCESS_KEY")) + .config("spark.hadoop.fs.s3a.secret.key", Variable.get("AWS_SECRET_KEY")) + .config("spark.hadoop.fs.s3a.endpoint", "s3.amazonaws.com") + .config("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider") + .getOrCreate() + ) + +# Snowflake에서 SQL 실행 함수 +def check_and_create_table(): + try: + conn = snowflake.connector.connect( + user=SNOWFLAKE_OPTIONS["user"], + password=SNOWFLAKE_OPTIONS["password"], + account=SNOWFLAKE_OPTIONS["account"], + database=SNOWFLAKE_OPTIONS["db"], + schema=SNOWFLAKE_OPTIONS["schema"], + warehouse=SNOWFLAKE_OPTIONS["warehouse"], + role=SNOWFLAKE_OPTIONS["role"], + ) + cur = conn.cursor() + + # 테이블 존재 여부 확인 + cur.execute(f"SHOW TABLES LIKE 'music_charts'") + result = cur.fetchone() + + if result is None: + # 테이블이 존재하지 않으면 생성 + create_table_query = """ + CREATE OR REPLACE TABLE ADHOC.music_charts ( + rank INT, + title STRING, + artist STRING, + lastPos INT, + image STRING, + peakPos INT, + isNew BOOLEAN, + source STRING + ); + """ + cur.execute(create_table_query) + print("✅ music_charts 테이블 생성 완료.") + else: + print("ℹ️ music_charts 테이블이 이미 존재합니다.") + + conn.commit() + cur.close() + conn.close() + + except Exception as e: + print(f"⚠️ 테이블 확인 및 생성 중 오류 발생: {e}") + + +# 문자열에서 작은따옴표 처리 및 NULL 값 처리 +def escape_quotes(value): + if value is None: + return "NULL" # None인 경우에는 'NULL'로 처리 + return f"'{value.replace('\'', '\'\'')}'" # 작은따옴표는 두 개로 이스케이프 처리 + +# Snowflake에서 SQL 실행 함수 +def insert_data_into_snowflake(df, table_name): + try: + conn = snowflake.connector.connect( + user=SNOWFLAKE_OPTIONS["user"], + password=SNOWFLAKE_OPTIONS["password"], + account=SNOWFLAKE_OPTIONS["account"], + database=SNOWFLAKE_OPTIONS["db"], + schema=SNOWFLAKE_OPTIONS["schema"], + warehouse=SNOWFLAKE_OPTIONS["warehouse"], + role=SNOWFLAKE_OPTIONS["role"], + ) + cur = conn.cursor() + + # DataFrame을 순회하며 INSERT 쿼리 실행 + for row in df.collect(): + # None 값을 NULL로 처리하고, 문자열 값은 작은따옴표로 감쌈 + rank = f"NULL" if row['rank'] is None else row['rank'] + title = escape_quotes(f"NULL" if row['title'] is None else f"'{row['title']}'") + artist = escape_quotes(f"NULL" if row['artist'] is None else f"'{row['artist']}'") + lastPos = f"NULL" if row['lastPos'] is None else row['lastPos'] + image = escape_quotes(f"NULL" if row['image'] is None else f"'{row['image']}'") + peakPos = f"NULL" if row['peakPos'] is None else row['peakPos'] + # isNew 값은 TRUE/FALSE로 처리하고 NULL은 그대로 처리 + isNew = f"NULL" if row['isNew'] is None else ("True" if row['isNew'] else "FALSE") + source = escape_quotes(f"NULL" if row['source'] is None else f"'{row['source']}'") + + # 삽입할 쿼리 (컬럼 이름은 큰따옴표 없이) + query = f""" + INSERT INTO {table_name} (rank, title, artist, lastPos, image, peakPos, isNew, source) + VALUES ({rank}, {title}, {artist}, {lastPos}, {image}, {peakPos}, {isNew}, {source}) + """ + cur.execute(query) + + conn.commit() + cur.close() + conn.close() + print("✅ Data inserted into Snowflake successfully.") + except Exception as e: + print(query) + print(f"⚠️ Error inserting data into Snowflake: {e}") + + + + +# Spark 세션 생성 +spark = spark_session_builder("S3_to_Snowflake") + +# 오늘 날짜 기반 S3 데이터 경로 생성 +TODAY = datetime.now().strftime("%Y%m%d") +S3_BUCKET = "s3a://de5-s4tify" +chart_sources = { + "bugs": f"{S3_BUCKET}/raw_data/bugs_chart/bugs_chart_{TODAY}.csv", + "flo": f"{S3_BUCKET}/raw_data/flo_chart/flo_chart_{TODAY}.csv", + "genie": f"{S3_BUCKET}/raw_data/genie_chart/genie_chart_{TODAY}.csv", + "melon": f"{S3_BUCKET}/raw_data/melon_chart/melon_chart_{TODAY}.csv", + "vibe": f"{S3_BUCKET}/raw_data/vibe_chart/vibe_chart_{TODAY}.csv" +} + +def read_chart_data(source, path): + try: + df = spark.read.format("csv").option("header", True).option("inferSchema", True).load(path) + return df.withColumn("source", lit(source)) + except Exception as e: + print(f"⚠️ {source} 데이터 로드 실패: {e}") + return None + +# 차트 데이터 읽기 및 병합 +dfs = [read_chart_data(source, path) for source, path in chart_sources.items()] +dfs = [df for df in dfs if df is not None] + +for df in dfs: + df.show(40) + +if dfs: + merged_df = dfs[0] + for df in dfs[1:]: + merged_df = merged_df.unionByName(df, allowMissingColumns=True) + + final_df = merged_df.select( + when(col("rank").rlike("^[0-9]+$"), col("rank").cast("int")).alias("rank"), + col("title"), + col("artist"), + when(col("lastPos").rlike("^[0-9]+$"), col("lastPos").cast("int")).alias("lastPos"), + col("image"), + when(col("peakPos").rlike("^[0-9]+$"), col("peakPos").cast("int")).alias("peakPos"), + when(col("isNew").rlike("^(true|false)$"), col("isNew").cast("boolean")).alias("isNew"), + col("source") + ) + + final_df.show(40) + + # 데이터 확인 + final_df.groupBy("source").agg(count("*").alias("count")).show() + + # Snowflake에서 테이블 존재 여부 확인 및 생성 + check_and_create_table() + + # Snowflake에 데이터 적재 + insert_data_into_snowflake(final_df, "music_charts") + +else: + print("❌ 저장할 차트 데이터가 없습니다.") + +# Spark 세션 종료 +spark.stop() From 099cec0dc692ada30f68dcb0da7f46b4c35d9524 Mon Sep 17 00:00:00 2001 From: gland78 Date: Wed, 12 Mar 2025 10:22:28 +0900 Subject: [PATCH 2/5] python_format_fix --- airflow/dags/scripts/S3_Spark_SnowFlake_ELT.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/dags/scripts/S3_Spark_SnowFlake_ELT.py b/airflow/dags/scripts/S3_Spark_SnowFlake_ELT.py index 83a1685..a0bfab7 100644 --- a/airflow/dags/scripts/S3_Spark_SnowFlake_ELT.py +++ b/airflow/dags/scripts/S3_Spark_SnowFlake_ELT.py @@ -91,7 +91,7 @@ def check_and_create_table(): def escape_quotes(value): if value is None: return "NULL" # None인 경우에는 'NULL'로 처리 - return f"'{value.replace('\'', '\'\'')}'" # 작은따옴표는 두 개로 이스케이프 처리 + return "'{}'".format(value.replace("'", "''")) # 작은따옴표는 두 개로 이스케이프 처리 # Snowflake에서 SQL 실행 함수 def insert_data_into_snowflake(df, table_name): From 34759bd354701e1cab6598081c7307fc1ab21259 Mon Sep 17 00:00:00 2001 From: GitHub Action Date: Wed, 12 Mar 2025 01:23:16 +0000 Subject: [PATCH 3/5] Automated format fixes --- airflow/dags/Bugs_DAG.py | 100 ++++++++++------ airflow/dags/Flo_DAG.py | 88 +++++++++----- airflow/dags/Genie_DAG.py | 86 +++++++++----- airflow/dags/S3_Spark_SnowFlake_DAG.py | 32 ++--- airflow/dags/Vibe_DAG.py | 85 ++++++++----- .../dags/scripts/S3_Spark_SnowFlake_ELT.py | 103 +++++++++++----- airflow/dags/scripts/crawling_spotify_data.py | 1 - airflow/dags/scripts/load_spotify_data.py | 15 +-- airflow/dags/scripts/request_spotify_api.py | 112 ++++++++++-------- airflow/dags/spotify_data_dag.py | 34 +++--- 10 files changed, 407 insertions(+), 249 deletions(-) diff --git a/airflow/dags/Bugs_DAG.py b/airflow/dags/Bugs_DAG.py index 589afaa..94b04d1 100644 --- a/airflow/dags/Bugs_DAG.py +++ b/airflow/dags/Bugs_DAG.py @@ -1,15 +1,15 @@ import csv import json -import requests from datetime import datetime, timedelta +import requests from plugins.bugs import BugsChartPeriod, BugsChartType, ChartData from scripts.get_access_token import get_token from airflow import DAG +from airflow.models import Variable from airflow.operators.python import PythonOperator from airflow.providers.amazon.aws.hooks.s3 import S3Hook -from airflow.models import Variable # 날짜 설정 TODAY = datetime.now().strftime("%Y%m%d") @@ -21,6 +21,7 @@ S3_CSV_KEY = f"raw_data/bugs_chart_with_genre_{TODAY}.csv" LOCAL_FILE_PATH = f"/opt/airflow/data/bugs_chart_with_genre_{TODAY}.csv" + # Spotify API에서 아티스트 ID 검색 def search_artist_id(artist_name): SPOTIFY_TOKEN = Variable.get("SPOTIFY_ACCESS_TOKEN", default_var=None) @@ -29,9 +30,8 @@ def search_artist_id(artist_name): headers = {"Authorization": f"Bearer {SPOTIFY_TOKEN}"} params = {"q": artist_name, "type": "artist", "limit": 1} response = requests.get(url, headers=headers, params=params) - - print("headers : ", headers) + print("headers : ", headers) if response.status_code == 200: artists = response.json().get("artists", {}).get("items", []) @@ -39,53 +39,58 @@ def search_artist_id(artist_name): print(f"🔍 검색된 아티스트: {artist_name} -> ID: {artist_id}") return artist_id else: - print(f"❌ 아티스트 검색 실패: {artist_name}, 상태 코드: {response.status_code}, 응답: {response.json()}") + print( + f"❌ 아티스트 검색 실패: {artist_name}, 상태 코드: {response.status_code}, 응답: {response.json()}" + ) return None + # Spotify API에서 아티스트 장르 가져오기 def get_artist_genre(artist_id): if not artist_id: return "Unknown" - + SPOTIFY_TOKEN = Variable.get("SPOTIFY_ACCESS_TOKEN", default_var=None) url = f"{SPOTIFY_API_URL}/artists/{artist_id}" headers = {"Authorization": f"Bearer {SPOTIFY_TOKEN}"} response = requests.get(url, headers=headers) - + if response.status_code == 200: genres = response.json().get("genres", []) genre_str = ", ".join(genres) if genres else "Unknown" print(f"🎵 장르 검색: ID {artist_id} -> {genre_str}") return genre_str else: - print(f"❌ 장르 검색 실패: ID {artist_id}, 상태 코드: {response.status_code}, 응답: {response.json()}") + print( + f"❌ 장르 검색 실패: ID {artist_id}, 상태 코드: {response.status_code}, 응답: {response.json()}" + ) return "Unknown" + # 1. Bugs 차트 데이터 가져오기 및 JSON 변환 def fetch_bugs_chart(): chart = ChartData( chartType=BugsChartType.All, chartPeriod=BugsChartPeriod.Realtime, - fetch=True - ) - chart_data = { - "date": chart.date.strftime("%Y-%m-%d %H:%M:%S"), - "entries": [] - } + fetch=True) + chart_data = {"date": chart.date.strftime( + "%Y-%m-%d %H:%M:%S"), "entries": []} for entry in chart.entries: print(f"📊 차트 데이터 처리: {entry.rank}. {entry.title} - {entry.artist}") artist_id = search_artist_id(entry.artist) genre = get_artist_genre(artist_id) - chart_data["entries"].append({ - "rank": entry.rank, - "title": entry.title, - "artist": entry.artist, - "lastPos": entry.lastPos, - "peakPos": entry.peakPos, - "image": entry.image, - "genre": genre - }) + chart_data["entries"].append( + { + "rank": entry.rank, + "title": entry.title, + "artist": entry.artist, + "lastPos": entry.lastPos, + "peakPos": entry.peakPos, + "image": entry.image, + "genre": genre, + } + ) return chart_data @@ -93,28 +98,44 @@ def fetch_bugs_chart(): def convert_json_to_csv(**kwargs): ti = kwargs["ti"] data = ti.xcom_pull(task_ids="fetch_bugs_chart") - csv_data = [["rank", "title", "artist", "lastPos", "peakPos", "image", "genre"]] + csv_data = [["rank", "title", "artist", + "lastPos", "peakPos", "image", "genre"]] for entry in data["entries"]: - csv_data.append([ - entry["rank"], entry["title"], entry["artist"], entry["lastPos"], entry["peakPos"], entry["image"], entry["genre"] - ]) + csv_data.append( + [ + entry["rank"], + entry["title"], + entry["artist"], + entry["lastPos"], + entry["peakPos"], + entry["image"], + entry["genre"], + ] + ) csv_string = "\n".join(",".join(map(str, row)) for row in csv_data) return csv_string + # 3. 로컬에 CSV 저장 (테스트용, 삭제 용이하도록 별도 함수) def save_csv_locally(csv_string): with open(LOCAL_FILE_PATH, "w", encoding="utf-8") as f: f.write(csv_string) + # 4. AWS S3 업로드 def upload_to_s3(**kwargs): ti = kwargs["ti"] csv_string = ti.xcom_pull(task_ids="convert_json_to_csv") - #save_csv_locally(csv_string) # 테스트용 로컬 저장 + # save_csv_locally(csv_string) # 테스트용 로컬 저장 s3_hook = S3Hook(aws_conn_id="S4tify_S3") - s3_hook.load_string(csv_string, key=S3_CSV_KEY, bucket_name=S3_BUCKET, replace=True) + s3_hook.load_string( + csv_string, + key=S3_CSV_KEY, + bucket_name=S3_BUCKET, + replace=True) print(f"✅ S3 업로드 완료: {S3_CSV_KEY}") + # DAG 설정 default_args = { "owner": "airflow", @@ -130,11 +151,11 @@ def upload_to_s3(**kwargs): schedule_interval="10 0 * * *", # 매일 00:10 실행 catchup=False, ) as dag: - + get_spotify_token_task = PythonOperator( - task_id="get_spotify_token", - python_callable=get_token, # ✅ 먼저 실행해서 Variable 갱신 - provide_context=True, + task_id="get_spotify_token", + python_callable=get_token, # ✅ 먼저 실행해서 Variable 갱신 + provide_context=True, ) fetch_bugs_chart_task = PythonOperator( @@ -142,17 +163,22 @@ def upload_to_s3(**kwargs): python_callable=fetch_bugs_chart, provide_context=True, ) - + convert_json_to_csv_task = PythonOperator( task_id="convert_json_to_csv", python_callable=convert_json_to_csv, provide_context=True, ) - + upload_s3_task = PythonOperator( task_id="upload_to_s3", python_callable=upload_to_s3, provide_context=True, ) - - get_spotify_token_task >> fetch_bugs_chart_task >> convert_json_to_csv_task >> upload_s3_task + + ( + get_spotify_token_task + >> fetch_bugs_chart_task + >> convert_json_to_csv_task + >> upload_s3_task + ) diff --git a/airflow/dags/Flo_DAG.py b/airflow/dags/Flo_DAG.py index ba25a3d..ccbb8b8 100644 --- a/airflow/dags/Flo_DAG.py +++ b/airflow/dags/Flo_DAG.py @@ -1,14 +1,15 @@ import csv import json -import requests from datetime import datetime, timedelta +import requests from plugins.flo import ChartData # flo.py 모듈 import from scripts.get_access_token import get_token + from airflow import DAG +from airflow.models import Variable from airflow.operators.python import PythonOperator from airflow.providers.amazon.aws.hooks.s3 import S3Hook -from airflow.models import Variable # 날짜 설정 TODAY = datetime.now().strftime("%Y%m%d") @@ -22,6 +23,7 @@ SPOTIFY_API_URL = "https://api.spotify.com/v1" SPOTIFY_TOKEN = Variable.get("SPOTIFY_ACCESS_TOKEN", default_var=None) + # Spotify API에서 아티스트 ID 검색 def search_artist_id(artist_name): SPOTIFY_TOKEN = Variable.get("SPOTIFY_ACCESS_TOKEN", default_var=None) @@ -29,48 +31,50 @@ def search_artist_id(artist_name): headers = {"Authorization": f"Bearer {SPOTIFY_TOKEN}"} params = {"q": artist_name, "type": "artist", "limit": 1} response = requests.get(url, headers=headers, params=params) - + if response.status_code == 200: artists = response.json().get("artists", {}).get("items", []) artist_id = artists[0]["id"] if artists else None return artist_id return None + # Spotify API에서 아티스트 장르 가져오기 def get_artist_genre(artist_id): if not artist_id: return "Unknown" - + SPOTIFY_TOKEN = Variable.get("SPOTIFY_ACCESS_TOKEN", default_var=None) url = f"{SPOTIFY_API_URL}/artists/{artist_id}" headers = {"Authorization": f"Bearer {SPOTIFY_TOKEN}"} response = requests.get(url, headers=headers) - + if response.status_code == 200: genres = response.json().get("genres", []) return ", ".join(genres) if genres else "Unknown" return "Unknown" + # 1. FLO 차트 데이터 가져오기 및 JSON 변환 def fetch_flo_chart(): chart = ChartData(fetch=True) - chart_data = { - "date": chart.date.strftime("%Y-%m-%d %H:%M:%S"), - "entries": [] - } + chart_data = {"date": chart.date.strftime( + "%Y-%m-%d %H:%M:%S"), "entries": []} for entry in chart.entries: print(f"📊 차트 데이터 처리: {entry.rank}. {entry.title} - {entry.artist}") artist_id = search_artist_id(entry.artist) genre = get_artist_genre(artist_id) - chart_data["entries"].append({ - "rank": entry.rank, - "title": entry.title, - "artist": entry.artist, - "lastPos": entry.lastPos, - "isNew": entry.isNew, - "image": entry.image, - "genre": genre - }) + chart_data["entries"].append( + { + "rank": entry.rank, + "title": entry.title, + "artist": entry.artist, + "lastPos": entry.lastPos, + "isNew": entry.isNew, + "image": entry.image, + "genre": genre, + } + ) return chart_data @@ -78,26 +82,41 @@ def fetch_flo_chart(): def convert_json_to_csv(**kwargs): ti = kwargs["ti"] data = ti.xcom_pull(task_ids="fetch_flo_chart") - csv_data = [["rank", "title", "artist", "lastPos", "isNew", "image", "genre"]] + csv_data = [["rank", "title", "artist", + "lastPos", "isNew", "image", "genre"]] for entry in data["entries"]: - csv_data.append([ - entry["rank"], entry["title"], entry["artist"], entry["lastPos"], entry["isNew"], entry["image"], entry["genre"] - ]) + csv_data.append( + [ + entry["rank"], + entry["title"], + entry["artist"], + entry["lastPos"], + entry["isNew"], + entry["image"], + entry["genre"], + ] + ) csv_string = "\n".join(",".join(map(str, row)) for row in csv_data) return csv_string + # 3. 로컬에 CSV 저장 (테스트용) def save_csv_locally(csv_string): with open(LOCAL_FILE_PATH, "w", encoding="utf-8") as f: f.write(csv_string) + # 4. AWS S3 업로드 def upload_to_s3(**kwargs): ti = kwargs["ti"] csv_string = ti.xcom_pull(task_ids="convert_json_to_csv") - #save_csv_locally(csv_string) # 테스트용 로컬 저장 + # save_csv_locally(csv_string) # 테스트용 로컬 저장 s3_hook = S3Hook(aws_conn_id="S4tify_S3") - s3_hook.load_string(csv_string, key=S3_CSV_KEY, bucket_name=S3_BUCKET, replace=True) + s3_hook.load_string( + csv_string, + key=S3_CSV_KEY, + bucket_name=S3_BUCKET, + replace=True) print(f"✅ S3 업로드 완료: {S3_CSV_KEY}") @@ -116,11 +135,11 @@ def upload_to_s3(**kwargs): schedule_interval="20 0 * * *", # 매일 00:20 실행 catchup=False, ) as dag: - + get_spotify_token_task = PythonOperator( - task_id="get_spotify_token", - python_callable=get_token, # ✅ 먼저 실행해서 Variable 갱신 - provide_context=True, + task_id="get_spotify_token", + python_callable=get_token, # ✅ 먼저 실행해서 Variable 갱신 + provide_context=True, ) fetch_flo_chart_task = PythonOperator( @@ -128,17 +147,22 @@ def upload_to_s3(**kwargs): python_callable=fetch_flo_chart, provide_context=True, ) - + convert_json_to_csv_task = PythonOperator( task_id="convert_json_to_csv", python_callable=convert_json_to_csv, provide_context=True, ) - + upload_s3_task = PythonOperator( task_id="upload_to_s3", python_callable=upload_to_s3, provide_context=True, ) - - get_spotify_token_task >> fetch_flo_chart_task >> convert_json_to_csv_task >> upload_s3_task + + ( + get_spotify_token_task + >> fetch_flo_chart_task + >> convert_json_to_csv_task + >> upload_s3_task + ) diff --git a/airflow/dags/Genie_DAG.py b/airflow/dags/Genie_DAG.py index e997baa..baf31b3 100644 --- a/airflow/dags/Genie_DAG.py +++ b/airflow/dags/Genie_DAG.py @@ -1,14 +1,15 @@ import csv import json -import requests from datetime import datetime, timedelta +import requests from plugins.genie import ChartData, GenieChartPeriod # genie.py 모듈 import from scripts.get_access_token import get_token + from airflow import DAG +from airflow.models import Variable from airflow.operators.python import PythonOperator from airflow.providers.amazon.aws.hooks.s3 import S3Hook -from airflow.models import Variable # 날짜 설정 TODAY = datetime.now().strftime("%Y%m%d") @@ -22,6 +23,7 @@ SPOTIFY_API_URL = "https://api.spotify.com/v1" SPOTIFY_TOKEN = Variable.get("SPOTIFY_ACCESS_TOKEN", default_var=None) + # Spotify API에서 아티스트 ID 검색 def search_artist_id(artist_name): SPOTIFY_TOKEN = Variable.get("SPOTIFY_ACCESS_TOKEN", default_var=None) @@ -29,76 +31,95 @@ def search_artist_id(artist_name): headers = {"Authorization": f"Bearer {SPOTIFY_TOKEN}"} params = {"q": artist_name, "type": "artist", "limit": 1} response = requests.get(url, headers=headers, params=params) - + if response.status_code == 200: artists = response.json().get("artists", {}).get("items", []) artist_id = artists[0]["id"] if artists else None return artist_id return None + # Spotify API에서 아티스트 장르 가져오기 def get_artist_genre(artist_id): if not artist_id: return "Unknown" - + SPOTIFY_TOKEN = Variable.get("SPOTIFY_ACCESS_TOKEN", default_var=None) url = f"{SPOTIFY_API_URL}/artists/{artist_id}" headers = {"Authorization": f"Bearer {SPOTIFY_TOKEN}"} response = requests.get(url, headers=headers) - + if response.status_code == 200: genres = response.json().get("genres", []) return ", ".join(genres) if genres else "Unknown" return "Unknown" + # 1. Genie 차트 데이터 가져오기 및 JSON 변환 def fetch_genie_chart(): chart = ChartData(chartPeriod=GenieChartPeriod.Realtime, fetch=True) - chart_data = { - "date": chart.date.strftime("%Y-%m-%d %H:%M:%S"), - "entries": [] - } + chart_data = {"date": chart.date.strftime( + "%Y-%m-%d %H:%M:%S"), "entries": []} for entry in chart.entries: print(f"📊 차트 데이터 처리: {entry.rank}. {entry.title} - {entry.artist}") artist_id = search_artist_id(entry.artist) genre = get_artist_genre(artist_id) - chart_data["entries"].append({ - "rank": entry.rank, - "title": entry.title, - "artist": entry.artist, - "peakPos": entry.peakPos, - "lastPos": entry.lastPos, - "image": entry.image, - "genre": genre - }) + chart_data["entries"].append( + { + "rank": entry.rank, + "title": entry.title, + "artist": entry.artist, + "peakPos": entry.peakPos, + "lastPos": entry.lastPos, + "image": entry.image, + "genre": genre, + } + ) return chart_data + # 2. JSON → CSV 변환 def convert_json_to_csv(**kwargs): ti = kwargs["ti"] data = ti.xcom_pull(task_ids="fetch_genie_chart") - csv_data = [["rank", "title", "artist", "peakPos", "lastPos", "image", "genre"]] + csv_data = [["rank", "title", "artist", + "peakPos", "lastPos", "image", "genre"]] for entry in data["entries"]: - csv_data.append([ - entry["rank"], entry["title"], entry["artist"], entry["peakPos"], entry["lastPos"], entry["image"], entry["genre"] - ]) + csv_data.append( + [ + entry["rank"], + entry["title"], + entry["artist"], + entry["peakPos"], + entry["lastPos"], + entry["image"], + entry["genre"], + ] + ) csv_string = "\n".join(",".join(map(str, row)) for row in csv_data) return csv_string + # 3. 로컬에 CSV 저장 (테스트용) def save_csv_locally(csv_string): with open(LOCAL_FILE_PATH, "w", encoding="utf-8") as f: f.write(csv_string) + # 4. AWS S3 업로드 def upload_to_s3(**kwargs): ti = kwargs["ti"] csv_string = ti.xcom_pull(task_ids="convert_json_to_csv") - #save_csv_locally(csv_string) # 테스트용 로컬 저장 + # save_csv_locally(csv_string) # 테스트용 로컬 저장 s3_hook = S3Hook(aws_conn_id="S4tify_S3") - s3_hook.load_string(csv_string, key=S3_CSV_KEY, bucket_name=S3_BUCKET, replace=True) + s3_hook.load_string( + csv_string, + key=S3_CSV_KEY, + bucket_name=S3_BUCKET, + replace=True) print(f"✅ S3 업로드 완료: {S3_CSV_KEY}") + # DAG 설정 default_args = { "owner": "airflow", @@ -114,29 +135,34 @@ def upload_to_s3(**kwargs): schedule_interval="30 0 * * *", # 매일 00:30 실행 catchup=False, ) as dag: - + get_spotify_token_task = PythonOperator( task_id="get_spotify_token", python_callable=get_token, # ✅ 먼저 실행해서 Variable 갱신 provide_context=True, ) - + fetch_genie_chart_task = PythonOperator( task_id="fetch_genie_chart", python_callable=fetch_genie_chart, provide_context=True, ) - + convert_json_to_csv_task = PythonOperator( task_id="convert_json_to_csv", python_callable=convert_json_to_csv, provide_context=True, ) - + upload_s3_task = PythonOperator( task_id="upload_to_s3", python_callable=upload_to_s3, provide_context=True, ) - - get_spotify_token_task >> fetch_genie_chart_task >> convert_json_to_csv_task >> upload_s3_task + + ( + get_spotify_token_task + >> fetch_genie_chart_task + >> convert_json_to_csv_task + >> upload_s3_task + ) diff --git a/airflow/dags/S3_Spark_SnowFlake_DAG.py b/airflow/dags/S3_Spark_SnowFlake_DAG.py index 5006fe6..11fc5fa 100644 --- a/airflow/dags/S3_Spark_SnowFlake_DAG.py +++ b/airflow/dags/S3_Spark_SnowFlake_DAG.py @@ -1,8 +1,10 @@ -from datetime import datetime, timedelta import os +from datetime import datetime, timedelta + from airflow import DAG -from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator from airflow.models import Variable +from airflow.providers.apache.spark.operators.spark_submit import \ + SparkSubmitOperator # Airflow Variables에서 AWS 자격 증명 불러오기 (없으면 예외 발생 방지) AWS_ACCESS_KEY = Variable.get("AWS_ACCESS_KEY", default_var=None) @@ -26,33 +28,35 @@ # DAG 기본 설정 default_args = { - 'owner': 'airflow', - 'start_date': datetime(2025, 3, 4), - 'retries': 1, - 'retry_delay': timedelta(minutes=5), + "owner": "airflow", + "start_date": datetime(2025, 3, 4), + "retries": 1, + "retry_delay": timedelta(minutes=5), } dag = DAG( - 's3_to_snowflake_pipeline', + "s3_to_snowflake_pipeline", default_args=default_args, - description='Read from S3, process data with Spark, and store in Snowflake', - schedule_interval='0 2 * * *', + description="Read from S3, process data with Spark, and store in Snowflake", + schedule_interval="0 2 * * *", catchup=False, ) # Spark 작업 스크립트 경로 설정 -AIRFLOW_HOME = os.getenv('AIRFLOW_HOME', '/opt/airflow') -spark_script_path = os.path.abspath(os.path.join(AIRFLOW_HOME, "dags", "scripts", "S3_Spark_SnowFlake_ELT.py")) +AIRFLOW_HOME = os.getenv("AIRFLOW_HOME", "/opt/airflow") +spark_script_path = os.path.abspath( + os.path.join(AIRFLOW_HOME, "dags", "scripts", "S3_Spark_SnowFlake_ELT.py") +) # SparkSubmitOperator 설정 spark_submit_task = SparkSubmitOperator( task_id="spark_submit_task", application=spark_script_path, conn_id="spark_conn", - executor_memory='4g', + executor_memory="4g", executor_cores=4, - driver_memory='4g', - name='s3_to_snowflake_pipeline', + driver_memory="4g", + name="s3_to_snowflake_pipeline", execution_timeout=timedelta(minutes=45), verbose=True, jars=SPARK_JARS, diff --git a/airflow/dags/Vibe_DAG.py b/airflow/dags/Vibe_DAG.py index 5e9f700..ac5cb53 100644 --- a/airflow/dags/Vibe_DAG.py +++ b/airflow/dags/Vibe_DAG.py @@ -1,14 +1,15 @@ import csv import json -import requests from datetime import datetime, timedelta +import requests from plugins.vibe import ChartData # vibe.py 모듈 import from scripts.get_access_token import get_token + from airflow import DAG +from airflow.models import Variable from airflow.operators.python import PythonOperator from airflow.providers.amazon.aws.hooks.s3 import S3Hook -from airflow.models import Variable # 날짜 설정 TODAY = datetime.now().strftime("%Y%m%d") @@ -22,6 +23,7 @@ SPOTIFY_API_URL = "https://api.spotify.com/v1" SPOTIFY_TOKEN = Variable.get("SPOTIFY_ACCESS_TOKEN", default_var=None) + # Spotify API에서 아티스트 ID 검색 def search_artist_id(artist_name): SPOTIFY_TOKEN = Variable.get("SPOTIFY_ACCESS_TOKEN", default_var=None) @@ -29,74 +31,92 @@ def search_artist_id(artist_name): headers = {"Authorization": f"Bearer {SPOTIFY_TOKEN}"} params = {"q": artist_name, "type": "artist", "limit": 1} response = requests.get(url, headers=headers, params=params) - + if response.status_code == 200: artists = response.json().get("artists", {}).get("items", []) artist_id = artists[0]["id"] if artists else None return artist_id return None + # Spotify API에서 아티스트 장르 가져오기 def get_artist_genre(artist_id): if not artist_id: return "Unknown" - + SPOTIFY_TOKEN = Variable.get("SPOTIFY_ACCESS_TOKEN", default_var=None) url = f"{SPOTIFY_API_URL}/artists/{artist_id}" headers = {"Authorization": f"Bearer {SPOTIFY_TOKEN}"} response = requests.get(url, headers=headers) - + if response.status_code == 200: genres = response.json().get("genres", []) return ", ".join(genres) if genres else "Unknown" return "Unknown" + # 1. VIBE 차트 데이터 가져오기 및 JSON 변환 def fetch_vibe_chart(): chart = ChartData(fetch=True) - chart_data = { - "date": chart.date.strftime("%Y-%m-%d %H:%M:%S"), - "entries": [] - } + chart_data = {"date": chart.date.strftime( + "%Y-%m-%d %H:%M:%S"), "entries": []} for entry in chart.entries: print(f"📊 차트 데이터 처리: {entry.rank}. {entry.title} - {entry.artist}") artist_id = search_artist_id(entry.artist) genre = get_artist_genre(artist_id) - chart_data["entries"].append({ - "rank": entry.rank, - "title": entry.title, - "artist": entry.artist, - "lastPos": entry.lastPos, - "isNew": entry.isNew, - "image": entry.image, - "genre": genre - }) + chart_data["entries"].append( + { + "rank": entry.rank, + "title": entry.title, + "artist": entry.artist, + "lastPos": entry.lastPos, + "isNew": entry.isNew, + "image": entry.image, + "genre": genre, + } + ) return chart_data + # 2. JSON → CSV 변환 def convert_json_to_csv(**kwargs): ti = kwargs["ti"] data = ti.xcom_pull(task_ids="fetch_vibe_chart") - csv_data = [["rank", "title", "artist", "lastPos", "isNew", "image", "genre"]] + csv_data = [["rank", "title", "artist", + "lastPos", "isNew", "image", "genre"]] for entry in data["entries"]: - csv_data.append([ - entry["rank"], entry["title"], entry["artist"], entry["lastPos"], entry["isNew"], entry["image"], entry["genre"] - ]) + csv_data.append( + [ + entry["rank"], + entry["title"], + entry["artist"], + entry["lastPos"], + entry["isNew"], + entry["image"], + entry["genre"], + ] + ) csv_string = "\n".join(",".join(map(str, row)) for row in csv_data) return csv_string + # 3. 로컬에 CSV 저장 (테스트용) def save_csv_locally(csv_string): with open(LOCAL_FILE_PATH, "w", encoding="utf-8") as f: f.write(csv_string) + # 4. AWS S3 업로드 def upload_to_s3(**kwargs): ti = kwargs["ti"] csv_string = ti.xcom_pull(task_ids="convert_json_to_csv") - #save_csv_locally(csv_string) # 테스트용 로컬 저장 + # save_csv_locally(csv_string) # 테스트용 로컬 저장 s3_hook = S3Hook(aws_conn_id="S4tify_S3") - s3_hook.load_string(csv_string, key=S3_CSV_KEY, bucket_name=S3_BUCKET, replace=True) + s3_hook.load_string( + csv_string, + key=S3_CSV_KEY, + bucket_name=S3_BUCKET, + replace=True) print(f"✅ S3 업로드 완료: {S3_CSV_KEY}") @@ -115,29 +135,34 @@ def upload_to_s3(**kwargs): schedule_interval="45 0 * * *", # 매일 00:45 실행 catchup=False, ) as dag: - + get_spotify_token_task = PythonOperator( task_id="get_spotify_token", python_callable=get_token, # ✅ 먼저 실행해서 Variable 갱신 provide_context=True, ) - + fetch_vibe_chart_task = PythonOperator( task_id="fetch_vibe_chart", python_callable=fetch_vibe_chart, provide_context=True, ) - + convert_json_to_csv_task = PythonOperator( task_id="convert_json_to_csv", python_callable=convert_json_to_csv, provide_context=True, ) - + upload_s3_task = PythonOperator( task_id="upload_to_s3", python_callable=upload_to_s3, provide_context=True, ) - - get_spotify_token_task >> fetch_vibe_chart_task >> convert_json_to_csv_task >> upload_s3_task + + ( + get_spotify_token_task + >> fetch_vibe_chart_task + >> convert_json_to_csv_task + >> upload_s3_task + ) diff --git a/airflow/dags/scripts/S3_Spark_SnowFlake_ELT.py b/airflow/dags/scripts/S3_Spark_SnowFlake_ELT.py index a0bfab7..9c656ff 100644 --- a/airflow/dags/scripts/S3_Spark_SnowFlake_ELT.py +++ b/airflow/dags/scripts/S3_Spark_SnowFlake_ELT.py @@ -1,8 +1,10 @@ import os from datetime import datetime + import snowflake.connector from pyspark.sql import SparkSession -from pyspark.sql.functions import col, lit, when, count +from pyspark.sql.functions import col, count, lit, when + from airflow.models import Variable # Spark JARs 설정 @@ -22,25 +24,36 @@ "account": Variable.get("SNOWFLAKE_ACCOUNT"), "db": Variable.get("SNOWFLAKE_DB", "S4TIFY"), "warehouse": Variable.get("SNOWFLAKE_WH", "COMPUTE_WH"), - "schema": Variable.get("SNOWFLAKE_SCHEMA") if Variable.get("SNOWFLAKE_SCHEMA") else "raw_data", + "schema": ( + Variable.get("SNOWFLAKE_SCHEMA") + if Variable.get("SNOWFLAKE_SCHEMA") + else "raw_data" + ), "role": "ACCOUNTADMIN", "driver": "net.snowflake.client.jdbc.SnowflakeDriver", "url": f'jdbc:snowflake://{Variable.get("SNOWFLAKE_ACCOUNT")}.snowflakecomputing.com', - #"url" : "jdbc:snowflake://kjqeovi-gr23658.snowflakecomputing.com", + # "url" : "jdbc:snowflake://kjqeovi-gr23658.snowflakecomputing.com", } + # Spark Session 생성 함수 def spark_session_builder(app_name: str) -> SparkSession: return ( - SparkSession.builder.appName(app_name) - .config("spark.jars", SPARK_JARS) - .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") - .config("spark.hadoop.fs.s3a.access.key", Variable.get("AWS_ACCESS_KEY")) - .config("spark.hadoop.fs.s3a.secret.key", Variable.get("AWS_SECRET_KEY")) - .config("spark.hadoop.fs.s3a.endpoint", "s3.amazonaws.com") - .config("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider") - .getOrCreate() - ) + SparkSession.builder.appName(app_name) .config( + "spark.jars", + SPARK_JARS) .config( + "spark.hadoop.fs.s3a.impl", + "org.apache.hadoop.fs.s3a.S3AFileSystem") .config( + "spark.hadoop.fs.s3a.access.key", + Variable.get("AWS_ACCESS_KEY")) .config( + "spark.hadoop.fs.s3a.secret.key", + Variable.get("AWS_SECRET_KEY")) .config( + "spark.hadoop.fs.s3a.endpoint", + "s3.amazonaws.com") .config( + "spark.hadoop.fs.s3a.aws.credentials.provider", + "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider", + ) .getOrCreate()) + # Snowflake에서 SQL 실행 함수 def check_and_create_table(): @@ -59,7 +72,7 @@ def check_and_create_table(): # 테이블 존재 여부 확인 cur.execute(f"SHOW TABLES LIKE 'music_charts'") result = cur.fetchone() - + if result is None: # 테이블이 존재하지 않으면 생성 create_table_query = """ @@ -91,7 +104,10 @@ def check_and_create_table(): def escape_quotes(value): if value is None: return "NULL" # None인 경우에는 'NULL'로 처리 - return "'{}'".format(value.replace("'", "''")) # 작은따옴표는 두 개로 이스케이프 처리 + return "'{}'".format( + value.replace("'", "''") + ) # 작은따옴표는 두 개로 이스케이프 처리 + # Snowflake에서 SQL 실행 함수 def insert_data_into_snowflake(df, table_name): @@ -110,15 +126,27 @@ def insert_data_into_snowflake(df, table_name): # DataFrame을 순회하며 INSERT 쿼리 실행 for row in df.collect(): # None 값을 NULL로 처리하고, 문자열 값은 작은따옴표로 감쌈 - rank = f"NULL" if row['rank'] is None else row['rank'] - title = escape_quotes(f"NULL" if row['title'] is None else f"'{row['title']}'") - artist = escape_quotes(f"NULL" if row['artist'] is None else f"'{row['artist']}'") - lastPos = f"NULL" if row['lastPos'] is None else row['lastPos'] - image = escape_quotes(f"NULL" if row['image'] is None else f"'{row['image']}'") - peakPos = f"NULL" if row['peakPos'] is None else row['peakPos'] + rank = f"NULL" if row["rank"] is None else row["rank"] + title = escape_quotes( + f"NULL" if row["title"] is None else f"'{row['title']}'" + ) + artist = escape_quotes( + f"NULL" if row["artist"] is None else f"'{row['artist']}'" + ) + lastPos = f"NULL" if row["lastPos"] is None else row["lastPos"] + image = escape_quotes( + f"NULL" if row["image"] is None else f"'{row['image']}'" + ) + peakPos = f"NULL" if row["peakPos"] is None else row["peakPos"] # isNew 값은 TRUE/FALSE로 처리하고 NULL은 그대로 처리 - isNew = f"NULL" if row['isNew'] is None else ("True" if row['isNew'] else "FALSE") - source = escape_quotes(f"NULL" if row['source'] is None else f"'{row['source']}'") + isNew = ( + f"NULL" + if row["isNew"] is None + else ("True" if row["isNew"] else "FALSE") + ) + source = escape_quotes( + f"NULL" if row["source"] is None else f"'{row['source']}'" + ) # 삽입할 쿼리 (컬럼 이름은 큰따옴표 없이) query = f""" @@ -136,8 +164,6 @@ def insert_data_into_snowflake(df, table_name): print(f"⚠️ Error inserting data into Snowflake: {e}") - - # Spark 세션 생성 spark = spark_session_builder("S3_to_Snowflake") @@ -149,17 +175,24 @@ def insert_data_into_snowflake(df, table_name): "flo": f"{S3_BUCKET}/raw_data/flo_chart/flo_chart_{TODAY}.csv", "genie": f"{S3_BUCKET}/raw_data/genie_chart/genie_chart_{TODAY}.csv", "melon": f"{S3_BUCKET}/raw_data/melon_chart/melon_chart_{TODAY}.csv", - "vibe": f"{S3_BUCKET}/raw_data/vibe_chart/vibe_chart_{TODAY}.csv" + "vibe": f"{S3_BUCKET}/raw_data/vibe_chart/vibe_chart_{TODAY}.csv", } + def read_chart_data(source, path): try: - df = spark.read.format("csv").option("header", True).option("inferSchema", True).load(path) + df = ( + spark.read.format("csv") + .option("header", True) + .option("inferSchema", True) + .load(path) + ) return df.withColumn("source", lit(source)) except Exception as e: print(f"⚠️ {source} 데이터 로드 실패: {e}") return None + # 차트 데이터 읽기 및 병합 dfs = [read_chart_data(source, path) for source, path in chart_sources.items()] dfs = [df for df in dfs if df is not None] @@ -173,14 +206,22 @@ def read_chart_data(source, path): merged_df = merged_df.unionByName(df, allowMissingColumns=True) final_df = merged_df.select( - when(col("rank").rlike("^[0-9]+$"), col("rank").cast("int")).alias("rank"), + when( + col("rank").rlike("^[0-9]+$"), + col("rank").cast("int")).alias("rank"), col("title"), col("artist"), - when(col("lastPos").rlike("^[0-9]+$"), col("lastPos").cast("int")).alias("lastPos"), + when( + col("lastPos").rlike("^[0-9]+$"), + col("lastPos").cast("int")).alias("lastPos"), col("image"), - when(col("peakPos").rlike("^[0-9]+$"), col("peakPos").cast("int")).alias("peakPos"), - when(col("isNew").rlike("^(true|false)$"), col("isNew").cast("boolean")).alias("isNew"), - col("source") + when( + col("peakPos").rlike("^[0-9]+$"), + col("peakPos").cast("int")).alias("peakPos"), + when( + col("isNew").rlike("^(true|false)$"), + col("isNew").cast("boolean")).alias("isNew"), + col("source"), ) final_df.show(40) diff --git a/airflow/dags/scripts/crawling_spotify_data.py b/airflow/dags/scripts/crawling_spotify_data.py index 1a38c45..7abc5b3 100644 --- a/airflow/dags/scripts/crawling_spotify_data.py +++ b/airflow/dags/scripts/crawling_spotify_data.py @@ -5,7 +5,6 @@ import pandas as pd from selenium import webdriver - from selenium.webdriver.chrome.options import Options from selenium.webdriver.chrome.service import Service from selenium.webdriver.common.by import By diff --git a/airflow/dags/scripts/load_spotify_data.py b/airflow/dags/scripts/load_spotify_data.py index 67561a8..53b3e7a 100644 --- a/airflow/dags/scripts/load_spotify_data.py +++ b/airflow/dags/scripts/load_spotify_data.py @@ -1,23 +1,24 @@ -import pandas as pd import os +import pandas as pd + from airflow.providers.amazon.aws.hooks.s3 import S3Hook + def load_s3_bucket(file_name): s3_hook = S3Hook(aws_conn_id="AWS_S3") s3_bucket = "de5-s4tify" s3_key = f"raw_data/{file_name}" - - local_file_path = f'data/{file_name}' - + + local_file_path = f"data/{file_name}" + try: s3_hook.load_file( filename=local_file_path, key=s3_key, bucket_name=s3_bucket, - replace=True - ) - + replace=True) + except Exception as e: print(f"error:{e}") diff --git a/airflow/dags/scripts/request_spotify_api.py b/airflow/dags/scripts/request_spotify_api.py index 23df461..76a8e57 100644 --- a/airflow/dags/scripts/request_spotify_api.py +++ b/airflow/dags/scripts/request_spotify_api.py @@ -1,8 +1,8 @@ import ast +import os import time from datetime import datetime from typing import Any, Dict, Optional -import os import pandas as pd import requests @@ -14,94 +14,106 @@ TODAY = datetime.now().strftime("%Y-%m-%d") END_POINT = "https://api.spotify.com/v1" -SPOTIFY_ACCESS_TOKEN = Variable.get("SPOTIFY_ACCESS_TOKEN",default_var=None) +SPOTIFY_ACCESS_TOKEN = Variable.get("SPOTIFY_ACCESS_TOKEN", default_var=None) if not SPOTIFY_ACCESS_TOKEN: print("⚠️ Access Token 없음. 새로운 토큰을 가져옵니다.") get_token() - SPOTIFY_ACCESS_TOKEN = Variable.get("SPOTIFY_ACCESS_TOKEN",default_var=None) - + SPOTIFY_ACCESS_TOKEN = Variable.get( + "SPOTIFY_ACCESS_TOKEN", default_var=None) + LAST_FM_API_KEY = os.getenv("LAST_FM_API_KEY") -#아티스트 top 10 트랙 가져오기 +# 아티스트 top 10 트랙 가져오기 def get_arti_top10(logical_date, **kwargs): - - task_instance = kwargs['ti'] + + task_instance = kwargs["ti"] arti_top10_list = [] object_name = f"spotify_artist_top10_{logical_date}.csv" - - #csv 파일 읽어오기 + + # csv 파일 읽어오기 song_info = read_crawling_csv(logical_date) for _, row in song_info.iterrows(): - - artist_id = ast.literal_eval(row['artist_id']) - - #피처링 등의 이유로 아티스트가 2명 이상인 경우가 존재 + + artist_id = ast.literal_eval(row["artist_id"]) + + # 피처링 등의 이유로 아티스트가 2명 이상인 경우가 존재 for i in range(len(artist_id)): id = artist_id[i] - end_point = END_POINT+f'/artists/{id}/top-tracks/' + end_point = END_POINT + f"/artists/{id}/top-tracks/" top_10_info = extract(end_point) - + try: - for track in top_10_info['tracks']: - arti_top10_list.append({ - "album": track['album']['name'], - "artist_id": id, - "song_id" : track['id'], - "title" : track['name'] - }) + for track in top_10_info["tracks"]: + arti_top10_list.append( + { + "album": track["album"]["name"], + "artist_id": id, + "song_id": track["id"], + "title": track["name"], + } + ) except Exception as e: print(f"error:{top_10_info}") - - #task_instance.xcom_push(key='artist_top10', value=arti_top10_list) + + # task_instance.xcom_push(key='artist_top10', value=arti_top10_list) artist_top10_df = pd.DataFrame(arti_top10_list) - artist_top10_df.to_csv(f"data/{object_name}",encoding="utf-8-sig", index=False) + artist_top10_df.to_csv( + f"data/{object_name}", + encoding="utf-8-sig", + index=False) load_s3_bucket(object_name) - -#아티스트 정보 가져오기 + +# 아티스트 정보 가져오기 def get_artist_info(logical_date, **kwargs): - - task_instance = kwargs['ti'] + + task_instance = kwargs["ti"] artist_info_list = [] - object_name =f"spotify_artist_info_{logical_date}.csv" - - #csv 파일 읽어오기 + object_name = f"spotify_artist_info_{logical_date}.csv" + + # csv 파일 읽어오기 song_info = read_crawling_csv(logical_date) - + for _, row in song_info.iterrows(): - artist_id = ast.literal_eval(row['artist_id']) - + artist_id = ast.literal_eval(row["artist_id"]) + for i in range(len(artist_id)): id = artist_id[i] - + try: - end_point = END_POINT+f'/artists/{id}' + end_point = END_POINT + f"/artists/{id}" artist_info = extract(end_point) - #print(artist_info['name']) - artist_info_list.append({ - "artist" : artist_info['name'], - "artist_id": id, - "artist_genre": artist_info['genres'] - }) + # print(artist_info['name']) + artist_info_list.append( + { + "artist": artist_info["name"], + "artist_id": id, + "artist_genre": artist_info["genres"], + } + ) except Exception as e: time.sleep(20) print(f"error:{artist_info}") - - #task_instance.xcom_push(key="artist_info", value=artist_info_list) + + # task_instance.xcom_push(key="artist_info", value=artist_info_list) artist_info_df = pd.DataFrame(artist_info_list) - artist_info_df.to_csv(f"data/{object_name}", encoding="utf-8-sig", index=False) + artist_info_df.to_csv( + f"data/{object_name}", + encoding="utf-8-sig", + index=False) load_s3_bucket(object_name) - # 크롤링 데이터 읽어오는 함수 def read_crawling_csv(execution_date) -> pd.DataFrame: - - daily_chart_crawling = pd.read_csv(f"data/spotify_crawling_data_{execution_date}.csv") - + + daily_chart_crawling = pd.read_csv( + f"data/spotify_crawling_data_{execution_date}.csv" + ) + return daily_chart_crawling diff --git a/airflow/dags/spotify_data_dag.py b/airflow/dags/spotify_data_dag.py index 342dbab..e941d3f 100644 --- a/airflow/dags/spotify_data_dag.py +++ b/airflow/dags/spotify_data_dag.py @@ -10,36 +10,36 @@ "depends_on_past": False, "start_date": datetime(2025, 2, 28), "retries": 1, - "retry_delay": timedelta(seconds=60) + "retry_delay": timedelta(seconds=60), } with DAG( - dag_id = 'GetSpotifyArtistData', + dag_id="GetSpotifyArtistData", default_args=default_args, catchup=False, - tags=['final_project'], - schedule_interval='0 11 * * *' -)as dag: - + tags=["final_project"], + schedule_interval="0 11 * * *", +) as dag: + extract_globalTop50_data = PythonOperator( - task_id = 'extract_global_top50', + task_id="extract_global_top50", python_callable=data_crawling, - op_kwargs={'logical_date': '{{ ds }}'}, + op_kwargs={"logical_date": "{{ ds }}"}, ) - + extract_artistInfo_data = PythonOperator( - task_id = 'extract_artist_info', + task_id="extract_artist_info", python_callable=get_artist_info, - op_kwargs={'logical_date': '{{ ds }}'}, + op_kwargs={"logical_date": "{{ ds }}"}, ) - + extract_artistTop10_data = PythonOperator( - task_id = 'extract_artist_top10', + task_id="extract_artist_top10", python_callable=get_arti_top10, - op_kwargs={'logical_date': '{{ ds }}'}, + op_kwargs={"logical_date": "{{ ds }}"}, ) - - - extract_globalTop50_data >> [extract_artistInfo_data, extract_artistTop10_data] + extract_globalTop50_data >> [ + extract_artistInfo_data, + extract_artistTop10_data] From 24d23f2447ff8b2d14d65ef5444325cf46fb02b3 Mon Sep 17 00:00:00 2001 From: gland78 Date: Wed, 12 Mar 2025 15:06:56 +0900 Subject: [PATCH 4/5] =?UTF-8?q?=EC=BD=94=EB=93=9C=20=EC=A0=95=EB=A6=AC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- airflow/dags/Bugs_DAG.py | 76 ++------ airflow/dags/Flo_DAG.py | 46 +---- airflow/dags/Genie_DAG.py | 53 +----- airflow/dags/Melon_DAG.py | 217 +++++++++-------------- airflow/dags/Vibe_DAG.py | 46 +---- airflow/dags/plugins/get_artist_data.py | 54 ++++++ airflow/dags/scripts/get_access_token.py | 1 + 7 files changed, 172 insertions(+), 321 deletions(-) create mode 100644 airflow/dags/plugins/get_artist_data.py diff --git a/airflow/dags/Bugs_DAG.py b/airflow/dags/Bugs_DAG.py index 94b04d1..00ac6f1 100644 --- a/airflow/dags/Bugs_DAG.py +++ b/airflow/dags/Bugs_DAG.py @@ -4,8 +4,10 @@ import requests from plugins.bugs import BugsChartPeriod, BugsChartType, ChartData +from plugins.get_artist_data import get_artist_genre, search_artist_id from scripts.get_access_token import get_token + from airflow import DAG from airflow.models import Variable from airflow.operators.python import PythonOperator @@ -14,72 +16,28 @@ # 날짜 설정 TODAY = datetime.now().strftime("%Y%m%d") -# Spotify API 설정 -SPOTIFY_API_URL = "https://api.spotify.com/v1" -SPOTIFY_TOKEN = Variable.get("SPOTIFY_ACCESS_TOKEN", default_var=None) + S3_BUCKET = "de5-s4tify" S3_CSV_KEY = f"raw_data/bugs_chart_with_genre_{TODAY}.csv" LOCAL_FILE_PATH = f"/opt/airflow/data/bugs_chart_with_genre_{TODAY}.csv" -# Spotify API에서 아티스트 ID 검색 -def search_artist_id(artist_name): - SPOTIFY_TOKEN = Variable.get("SPOTIFY_ACCESS_TOKEN", default_var=None) - - url = f"{SPOTIFY_API_URL}/search" - headers = {"Authorization": f"Bearer {SPOTIFY_TOKEN}"} - params = {"q": artist_name, "type": "artist", "limit": 1} - response = requests.get(url, headers=headers, params=params) - - print("headers : ", headers) - - if response.status_code == 200: - artists = response.json().get("artists", {}).get("items", []) - artist_id = artists[0]["id"] if artists else None - print(f"🔍 검색된 아티스트: {artist_name} -> ID: {artist_id}") - return artist_id - else: - print( - f"❌ 아티스트 검색 실패: {artist_name}, 상태 코드: {response.status_code}, 응답: {response.json()}" - ) - return None - - -# Spotify API에서 아티스트 장르 가져오기 -def get_artist_genre(artist_id): - if not artist_id: - return "Unknown" - - SPOTIFY_TOKEN = Variable.get("SPOTIFY_ACCESS_TOKEN", default_var=None) - - url = f"{SPOTIFY_API_URL}/artists/{artist_id}" - headers = {"Authorization": f"Bearer {SPOTIFY_TOKEN}"} - response = requests.get(url, headers=headers) - - if response.status_code == 200: - genres = response.json().get("genres", []) - genre_str = ", ".join(genres) if genres else "Unknown" - print(f"🎵 장르 검색: ID {artist_id} -> {genre_str}") - return genre_str - else: - print( - f"❌ 장르 검색 실패: ID {artist_id}, 상태 코드: {response.status_code}, 응답: {response.json()}" - ) - return "Unknown" - - # 1. Bugs 차트 데이터 가져오기 및 JSON 변환 def fetch_bugs_chart(): chart = ChartData( chartType=BugsChartType.All, chartPeriod=BugsChartPeriod.Realtime, fetch=True) - chart_data = {"date": chart.date.strftime( - "%Y-%m-%d %H:%M:%S"), "entries": []} + chart_data = { + "date": chart.date.strftime("%Y-%m-%d %H:%M:%S"), + "entries": [] + } for entry in chart.entries: print(f"📊 차트 데이터 처리: {entry.rank}. {entry.title} - {entry.artist}") + artist_id = search_artist_id(entry.artist) genre = get_artist_genre(artist_id) + chart_data["entries"].append( { "rank": entry.rank, @@ -88,7 +46,7 @@ def fetch_bugs_chart(): "lastPos": entry.lastPos, "peakPos": entry.peakPos, "image": entry.image, - "genre": genre, + "genres": genre.split(", ") if genre else [] # ✅ 리스트 변환, } ) return chart_data @@ -98,8 +56,7 @@ def fetch_bugs_chart(): def convert_json_to_csv(**kwargs): ti = kwargs["ti"] data = ti.xcom_pull(task_ids="fetch_bugs_chart") - csv_data = [["rank", "title", "artist", - "lastPos", "peakPos", "image", "genre"]] + csv_data = [["rank", "title", "artist", "lastPos", "peakPos", "image", "genre"]] for entry in data["entries"]: csv_data.append( [ @@ -109,7 +66,7 @@ def convert_json_to_csv(**kwargs): entry["lastPos"], entry["peakPos"], entry["image"], - entry["genre"], + json.dumps(entry["genres"], ensure_ascii=False) # ✅ 리스트를 문자열로 변환하여 저장 ] ) csv_string = "\n".join(",".join(map(str, row)) for row in csv_data) @@ -126,7 +83,7 @@ def save_csv_locally(csv_string): def upload_to_s3(**kwargs): ti = kwargs["ti"] csv_string = ti.xcom_pull(task_ids="convert_json_to_csv") - # save_csv_locally(csv_string) # 테스트용 로컬 저장 + #save_csv_locally(csv_string) # 테스트용 로컬 저장 s3_hook = S3Hook(aws_conn_id="S4tify_S3") s3_hook.load_string( csv_string, @@ -176,9 +133,4 @@ def upload_to_s3(**kwargs): provide_context=True, ) - ( - get_spotify_token_task - >> fetch_bugs_chart_task - >> convert_json_to_csv_task - >> upload_s3_task - ) + get_spotify_token_task >> fetch_bugs_chart_task >> convert_json_to_csv_task >> upload_s3_task \ No newline at end of file diff --git a/airflow/dags/Flo_DAG.py b/airflow/dags/Flo_DAG.py index ccbb8b8..5fcba5c 100644 --- a/airflow/dags/Flo_DAG.py +++ b/airflow/dags/Flo_DAG.py @@ -4,6 +4,7 @@ import requests from plugins.flo import ChartData # flo.py 모듈 import +from plugins.get_artist_data import get_artist_genre, search_artist_id from scripts.get_access_token import get_token from airflow import DAG @@ -19,42 +20,6 @@ S3_CSV_KEY = f"raw_data/flo_chart_{TODAY}.csv" LOCAL_FILE_PATH = f"/opt/airflow/data/flo_chart_with_genre_{TODAY}.csv" -# Spotify API 설정 -SPOTIFY_API_URL = "https://api.spotify.com/v1" -SPOTIFY_TOKEN = Variable.get("SPOTIFY_ACCESS_TOKEN", default_var=None) - - -# Spotify API에서 아티스트 ID 검색 -def search_artist_id(artist_name): - SPOTIFY_TOKEN = Variable.get("SPOTIFY_ACCESS_TOKEN", default_var=None) - url = f"{SPOTIFY_API_URL}/search" - headers = {"Authorization": f"Bearer {SPOTIFY_TOKEN}"} - params = {"q": artist_name, "type": "artist", "limit": 1} - response = requests.get(url, headers=headers, params=params) - - if response.status_code == 200: - artists = response.json().get("artists", {}).get("items", []) - artist_id = artists[0]["id"] if artists else None - return artist_id - return None - - -# Spotify API에서 아티스트 장르 가져오기 -def get_artist_genre(artist_id): - if not artist_id: - return "Unknown" - - SPOTIFY_TOKEN = Variable.get("SPOTIFY_ACCESS_TOKEN", default_var=None) - url = f"{SPOTIFY_API_URL}/artists/{artist_id}" - headers = {"Authorization": f"Bearer {SPOTIFY_TOKEN}"} - response = requests.get(url, headers=headers) - - if response.status_code == 200: - genres = response.json().get("genres", []) - return ", ".join(genres) if genres else "Unknown" - return "Unknown" - - # 1. FLO 차트 데이터 가져오기 및 JSON 변환 def fetch_flo_chart(): chart = ChartData(fetch=True) @@ -72,7 +37,7 @@ def fetch_flo_chart(): "lastPos": entry.lastPos, "isNew": entry.isNew, "image": entry.image, - "genre": genre, + "genres": genre.split(", ") if genre else [] } ) return chart_data @@ -82,8 +47,7 @@ def fetch_flo_chart(): def convert_json_to_csv(**kwargs): ti = kwargs["ti"] data = ti.xcom_pull(task_ids="fetch_flo_chart") - csv_data = [["rank", "title", "artist", - "lastPos", "isNew", "image", "genre"]] + csv_data = [["rank", "title", "artist", "lastPos", "isNew", "image", "genre"]] for entry in data["entries"]: csv_data.append( [ @@ -93,7 +57,7 @@ def convert_json_to_csv(**kwargs): entry["lastPos"], entry["isNew"], entry["image"], - entry["genre"], + json.dumps(entry["genres"], ensure_ascii=False) ] ) csv_string = "\n".join(",".join(map(str, row)) for row in csv_data) @@ -110,7 +74,7 @@ def save_csv_locally(csv_string): def upload_to_s3(**kwargs): ti = kwargs["ti"] csv_string = ti.xcom_pull(task_ids="convert_json_to_csv") - # save_csv_locally(csv_string) # 테스트용 로컬 저장 + #save_csv_locally(csv_string) # 테스트용 로컬 저장 s3_hook = S3Hook(aws_conn_id="S4tify_S3") s3_hook.load_string( csv_string, diff --git a/airflow/dags/Genie_DAG.py b/airflow/dags/Genie_DAG.py index baf31b3..e1f2f5a 100644 --- a/airflow/dags/Genie_DAG.py +++ b/airflow/dags/Genie_DAG.py @@ -4,6 +4,7 @@ import requests from plugins.genie import ChartData, GenieChartPeriod # genie.py 모듈 import +from plugins.get_artist_data import get_artist_genre, search_artist_id from scripts.get_access_token import get_token from airflow import DAG @@ -19,42 +20,6 @@ S3_CSV_KEY = f"raw_data/genie_chart_{TODAY}.csv" LOCAL_FILE_PATH = f"/opt/airflow/data/genie_chart_with_genre_{TODAY}.csv" -# Spotify API 설정 -SPOTIFY_API_URL = "https://api.spotify.com/v1" -SPOTIFY_TOKEN = Variable.get("SPOTIFY_ACCESS_TOKEN", default_var=None) - - -# Spotify API에서 아티스트 ID 검색 -def search_artist_id(artist_name): - SPOTIFY_TOKEN = Variable.get("SPOTIFY_ACCESS_TOKEN", default_var=None) - url = f"{SPOTIFY_API_URL}/search" - headers = {"Authorization": f"Bearer {SPOTIFY_TOKEN}"} - params = {"q": artist_name, "type": "artist", "limit": 1} - response = requests.get(url, headers=headers, params=params) - - if response.status_code == 200: - artists = response.json().get("artists", {}).get("items", []) - artist_id = artists[0]["id"] if artists else None - return artist_id - return None - - -# Spotify API에서 아티스트 장르 가져오기 -def get_artist_genre(artist_id): - if not artist_id: - return "Unknown" - - SPOTIFY_TOKEN = Variable.get("SPOTIFY_ACCESS_TOKEN", default_var=None) - url = f"{SPOTIFY_API_URL}/artists/{artist_id}" - headers = {"Authorization": f"Bearer {SPOTIFY_TOKEN}"} - response = requests.get(url, headers=headers) - - if response.status_code == 200: - genres = response.json().get("genres", []) - return ", ".join(genres) if genres else "Unknown" - return "Unknown" - - # 1. Genie 차트 데이터 가져오기 및 JSON 변환 def fetch_genie_chart(): chart = ChartData(chartPeriod=GenieChartPeriod.Realtime, fetch=True) @@ -72,7 +37,7 @@ def fetch_genie_chart(): "peakPos": entry.peakPos, "lastPos": entry.lastPos, "image": entry.image, - "genre": genre, + "genres": genre.split(", ") if genre else [] } ) return chart_data @@ -82,8 +47,7 @@ def fetch_genie_chart(): def convert_json_to_csv(**kwargs): ti = kwargs["ti"] data = ti.xcom_pull(task_ids="fetch_genie_chart") - csv_data = [["rank", "title", "artist", - "peakPos", "lastPos", "image", "genre"]] + csv_data = [["rank", "title", "artist", "peakPos", "lastPos", "image", "genre"]] for entry in data["entries"]: csv_data.append( [ @@ -93,7 +57,7 @@ def convert_json_to_csv(**kwargs): entry["peakPos"], entry["lastPos"], entry["image"], - entry["genre"], + json.dumps(entry["genres"], ensure_ascii=False) ] ) csv_string = "\n".join(",".join(map(str, row)) for row in csv_data) @@ -110,7 +74,7 @@ def save_csv_locally(csv_string): def upload_to_s3(**kwargs): ti = kwargs["ti"] csv_string = ti.xcom_pull(task_ids="convert_json_to_csv") - # save_csv_locally(csv_string) # 테스트용 로컬 저장 + #save_csv_locally(csv_string) # 테스트용 로컬 저장 s3_hook = S3Hook(aws_conn_id="S4tify_S3") s3_hook.load_string( csv_string, @@ -160,9 +124,4 @@ def upload_to_s3(**kwargs): provide_context=True, ) - ( - get_spotify_token_task - >> fetch_genie_chart_task - >> convert_json_to_csv_task - >> upload_s3_task - ) + get_spotify_token_task >> fetch_genie_chart_task >> convert_json_to_csv_task >> upload_s3_task diff --git a/airflow/dags/Melon_DAG.py b/airflow/dags/Melon_DAG.py index d754270..d365935 100644 --- a/airflow/dags/Melon_DAG.py +++ b/airflow/dags/Melon_DAG.py @@ -1,120 +1,78 @@ import csv import json +import requests from datetime import datetime, timedelta -import boto3 -import requests -import snowflake.connector +from plugins.melon import ChartData # melon.py 모듈 import +from plugins.get_artist_data import get_artist_genre, search_artist_id +from scripts.get_access_token import get_token from airflow import DAG -from airflow.hooks.base_hook import BaseHook from airflow.operators.python import PythonOperator +from airflow.providers.amazon.aws.hooks.s3 import S3Hook +from airflow.models import Variable -# 멜론 차트 API 정보 -_APP_VERSION = "6.5.8.1" -_CP_ID = "AS40" -_USER_AGENT = f"{_CP_ID}; Android 13; {_APP_VERSION}; sdk_gphone64_arm64" -_CHART_API_URL = f"https://m2.melon.com/m6/chart/ent/songChartList.json?cpId={_CP_ID}&cpKey=14LNC3&appVer={_APP_VERSION}" - -# 파일 저장 경로 +# 날짜 설정 TODAY = datetime.now().strftime("%Y%m%d") -JSON_PATH = f"/opt/airflow/data/melon_chart_{TODAY}.json" -CSV_PATH = f"/opt/airflow/data/melon_chart_{TODAY}.csv" -S3_JSON_KEY = f"raw_data/melon_chart/melon_chart_{TODAY}.json" -S3_CSV_KEY = f"raw_data/melon_chart/melon_chart_{TODAY}.csv" +# S3 설정 +S3_BUCKET = "de5-s4tify" +S3_CSV_KEY = f"raw_data/melon_chart_{TODAY}.csv" +LOCAL_FILE_PATH = f"/opt/airflow/data/melon_chart_with_genre_{TODAY}.csv" # 1. 멜론 차트 데이터 가져오기 def fetch_melon_chart(): - headers = {"User-Agent": _USER_AGENT} - response = requests.get(_CHART_API_URL, headers=headers) - if response.status_code != 200: - raise Exception(f"멜론 API 호출 실패: {response.status_code}") - data = response.json() - + chart = ChartData(fetch=True) chart_data = { - "date": f"{data['response']['RANKDAY']} {data['response']['RANKHOUR']}:00", - "entries": [ - { - "rank": int(song["CURRANK"]), - "title": song["SONGNAME"], - "artist": song["ARTISTLIST"][0]["ARTISTNAME"], - "lastPos": int(song["PASTRANK"]), - "peakPos": int(song.get("PEAKRANK", song["CURRANK"])), - "isNew": song["RANKTYPE"] == "NEW", - "image": song["ALBUMIMG"], - } - for song in data["response"]["SONGLIST"] - ], + "date": chart.date.strftime("%Y-%m-%d %H:%M:%S"), + "entries": [] } - - with open(JSON_PATH, "w", encoding="utf-8") as f: - json.dump(chart_data, f, ensure_ascii=False, indent=4) - print(f"✅ JSON 저장 완료: {JSON_PATH}") - return JSON_PATH - + for entry in chart.entries: + print(f"📊 차트 데이터 처리: {entry.rank}. {entry.title} - {entry.artist}") + artist_id = search_artist_id(entry.artist) + genre = get_artist_genre(artist_id) + chart_data["entries"].append({ + "rank": entry.rank, + "title": entry.title, + "artist": entry.artist, + "lastPos": entry.lastPos, + "isNew": entry.isNew, + "image": entry.image, + "genres": genre.split(", ") if genre else [] + }) + return chart_data # 2. JSON → CSV 변환 -def convert_json_to_csv(): - with open(JSON_PATH, "r", encoding="utf-8") as f: - data = json.load(f) - fields = [ - "rank", - "title", - "artist", - "lastPos", - "peakPos", - "isNew", - "image"] - with open(CSV_PATH, "w", newline="", encoding="utf-8") as csvfile: - writer = csv.DictWriter(csvfile, fieldnames=fields) - writer.writeheader() - for entry in data["entries"]: - writer.writerow(entry) - print(f"✅ CSV 변환 완료: {CSV_PATH}") - return CSV_PATH - - -# 3. AWS S3 업로드 -def upload_to_s3(): - # AWS 연결 정보 가져오기 - aws_connection = BaseHook.get_connection( - "S4tify_S3" - ) # Airflow에서 설정한 AWS 연결 사용 - s3 = boto3.client( - "s3", - aws_access_key_id=aws_connection.login, - aws_secret_access_key=aws_connection.password, - region_name=aws_connection.extra_dejson.get("region_name"), - ) - # s3.upload_file(JSON_PATH, aws_connection.schema, S3_JSON_KEY) - s3.upload_file(CSV_PATH, "de5-s4tify", S3_CSV_KEY) - print(f"✅ S3 업로드 완료: {S3_JSON_KEY}, {S3_CSV_KEY}") - - -""" -# 4. Snowflake 업로드 -def upload_to_snowflake(): - # Snowflake 연결 정보 가져오기 - snowflake_connection = BaseHook.get_connection("S4tify_SnowFlake") # Airflow에서 설정한 Snowflake 연결 사용 - conn = snowflake.connector.connect( - user=snowflake_connection.login, - password=snowflake_connection.password, - account=snowflake_connection.host, - warehouse=snowflake_connection.extra_dejson.get("warehouse"), - database=snowflake_connection.extra_dejson.get("database"), - schema=snowflake_connection.extra_dejson.get("schema") - ) - cur = conn.cursor() - #cur.execute(f""" -# COPY INTO {snowflake_connection.extra_dejson.get('database')}.{snowflake_connection.extra_dejson.get('schema')}.melon_chart -# FROM @"{snowflake_connection.extra_dejson.get('stage')}" -# FILES = ('{S3_CSV_KEY}') -# FILE_FORMAT = (TYPE = 'CSV', FIELD_OPTIONALLY_ENCLOSED_BY='"') -# """) -# conn.commit() -# print(f"✅ Snowflake 업로드 완료: {S3_CSV_KEY}") -# """ +def convert_json_to_csv(**kwargs): + ti = kwargs["ti"] + data = ti.xcom_pull(task_ids="fetch_melon_chart") + csv_data = [["rank", "title", "artist", "lastPos", "isNew", "image", "genre"]] + for entry in data["entries"]: + csv_data.append([ + entry["rank"], + entry["title"], + entry["artist"], + entry["lastPos"], + entry["isNew"], + entry["image"], + json.dumps(entry["genres"], ensure_ascii=False) + ]) + csv_string = "\n".join(",".join(map(str, row)) for row in csv_data) + return csv_string + +# 3. 로컬에 CSV 저장 (테스트용) +def save_csv_locally(csv_string): + with open(LOCAL_FILE_PATH, "w", encoding="utf-8") as f: + f.write(csv_string) + +# 4. AWS S3 업로드 +def upload_to_s3(**kwargs): + ti = kwargs["ti"] + csv_string = ti.xcom_pull(task_ids="convert_json_to_csv") + #save_csv_locally(csv_string) # 테스트용 로컬 저장 + s3_hook = S3Hook(aws_conn_id="S4tify_S3") + s3_hook.load_string(csv_string, key=S3_CSV_KEY, bucket_name=S3_BUCKET, replace=True) + print(f"✅ S3 업로드 완료: {S3_CSV_KEY}") # DAG 설정 default_args = { @@ -125,36 +83,35 @@ def upload_to_snowflake(): "retry_delay": timedelta(minutes=2), } -dag = DAG( - "melon_chart_pipeline", +with DAG( + "melon_chart_dag", default_args=default_args, - description="멜론 차트를 JSON으로 저장하고 CSV로 변환 후 S3 및 Snowflake에 업로드", - schedule_interval="0 1 * * *", # 매일 새벽 1시 실행 + schedule_interval="0 1 * * *", # 매일 01:00 실행 catchup=False, -) - -fetch_task = PythonOperator( - task_id="fetch_melon_chart", - python_callable=fetch_melon_chart, - dag=dag, -) - -convert_task = PythonOperator( - task_id="convert_json_to_csv", - python_callable=convert_json_to_csv, - dag=dag, -) - -s3_upload_task = PythonOperator( - task_id="upload_to_s3", - python_callable=upload_to_s3, - dag=dag, -) -""" -snowflake_upload_task = PythonOperator( - task_id="upload_to_snowflake", - python_callable=upload_to_snowflake, - dag=dag, -)""" +) as dag: + + get_spotify_token_task = PythonOperator( + task_id="get_spotify_token", + python_callable=get_token, # ✅ 먼저 실행해서 Variable 갱신 + provide_context=True, + ) -fetch_task >> convert_task >> s3_upload_task # snowflake_upload_task + fetch_melon_chart_task = PythonOperator( + task_id="fetch_melon_chart", + python_callable=fetch_melon_chart, + provide_context=True, + ) + + convert_json_to_csv_task = PythonOperator( + task_id="convert_json_to_csv", + python_callable=convert_json_to_csv, + provide_context=True, + ) + + upload_s3_task = PythonOperator( + task_id="upload_to_s3", + python_callable=upload_to_s3, + provide_context=True, + ) + + get_spotify_token_task >> fetch_melon_chart_task >> convert_json_to_csv_task >> upload_s3_task diff --git a/airflow/dags/Vibe_DAG.py b/airflow/dags/Vibe_DAG.py index ac5cb53..29c92da 100644 --- a/airflow/dags/Vibe_DAG.py +++ b/airflow/dags/Vibe_DAG.py @@ -4,6 +4,7 @@ import requests from plugins.vibe import ChartData # vibe.py 모듈 import +from plugins.get_artist_data import get_artist_genre, search_artist_id from scripts.get_access_token import get_token from airflow import DAG @@ -19,42 +20,6 @@ S3_CSV_KEY = f"raw_data/vibe_chart_{TODAY}.csv" LOCAL_FILE_PATH = f"/opt/airflow/data/vibe_chart_with_genre_{TODAY}.csv" -# Spotify API 설정 -SPOTIFY_API_URL = "https://api.spotify.com/v1" -SPOTIFY_TOKEN = Variable.get("SPOTIFY_ACCESS_TOKEN", default_var=None) - - -# Spotify API에서 아티스트 ID 검색 -def search_artist_id(artist_name): - SPOTIFY_TOKEN = Variable.get("SPOTIFY_ACCESS_TOKEN", default_var=None) - url = f"{SPOTIFY_API_URL}/search" - headers = {"Authorization": f"Bearer {SPOTIFY_TOKEN}"} - params = {"q": artist_name, "type": "artist", "limit": 1} - response = requests.get(url, headers=headers, params=params) - - if response.status_code == 200: - artists = response.json().get("artists", {}).get("items", []) - artist_id = artists[0]["id"] if artists else None - return artist_id - return None - - -# Spotify API에서 아티스트 장르 가져오기 -def get_artist_genre(artist_id): - if not artist_id: - return "Unknown" - - SPOTIFY_TOKEN = Variable.get("SPOTIFY_ACCESS_TOKEN", default_var=None) - url = f"{SPOTIFY_API_URL}/artists/{artist_id}" - headers = {"Authorization": f"Bearer {SPOTIFY_TOKEN}"} - response = requests.get(url, headers=headers) - - if response.status_code == 200: - genres = response.json().get("genres", []) - return ", ".join(genres) if genres else "Unknown" - return "Unknown" - - # 1. VIBE 차트 데이터 가져오기 및 JSON 변환 def fetch_vibe_chart(): chart = ChartData(fetch=True) @@ -72,7 +37,7 @@ def fetch_vibe_chart(): "lastPos": entry.lastPos, "isNew": entry.isNew, "image": entry.image, - "genre": genre, + "genres": genre.split(", ") if genre else [] } ) return chart_data @@ -82,8 +47,7 @@ def fetch_vibe_chart(): def convert_json_to_csv(**kwargs): ti = kwargs["ti"] data = ti.xcom_pull(task_ids="fetch_vibe_chart") - csv_data = [["rank", "title", "artist", - "lastPos", "isNew", "image", "genre"]] + csv_data = [["rank", "title", "artist", "lastPos", "isNew", "image", "genre"]] for entry in data["entries"]: csv_data.append( [ @@ -93,7 +57,7 @@ def convert_json_to_csv(**kwargs): entry["lastPos"], entry["isNew"], entry["image"], - entry["genre"], + json.dumps(entry["genres"], ensure_ascii=False) ] ) csv_string = "\n".join(",".join(map(str, row)) for row in csv_data) @@ -110,7 +74,7 @@ def save_csv_locally(csv_string): def upload_to_s3(**kwargs): ti = kwargs["ti"] csv_string = ti.xcom_pull(task_ids="convert_json_to_csv") - # save_csv_locally(csv_string) # 테스트용 로컬 저장 + #save_csv_locally(csv_string) # 테스트용 로컬 저장 s3_hook = S3Hook(aws_conn_id="S4tify_S3") s3_hook.load_string( csv_string, diff --git a/airflow/dags/plugins/get_artist_data.py b/airflow/dags/plugins/get_artist_data.py new file mode 100644 index 0000000..02c5fdf --- /dev/null +++ b/airflow/dags/plugins/get_artist_data.py @@ -0,0 +1,54 @@ +import requests + + +from scripts.get_access_token import get_token +from airflow.models import Variable + +# Spotify API 설정 +SPOTIFY_API_URL = "https://api.spotify.com/v1" +SPOTIFY_TOKEN = Variable.get("SPOTIFY_ACCESS_TOKEN", default_var=None) + +# Spotify API에서 아티스트 ID 검색 +def search_artist_id(artist_name): + SPOTIFY_TOKEN = Variable.get("SPOTIFY_ACCESS_TOKEN", default_var=None) + + url = f"{SPOTIFY_API_URL}/search" + headers = {"Authorization": f"Bearer {SPOTIFY_TOKEN}"} + params = {"q": artist_name, "type": "artist", "limit": 1} + response = requests.get(url, headers=headers, params=params) + + print("headers : ", headers) + + if response.status_code == 200: + artists = response.json().get("artists", {}).get("items", []) + artist_id = artists[0]["id"] if artists else None + print(f"🔍 검색된 아티스트: {artist_name} -> ID: {artist_id}") + return artist_id + else: + print( + f"❌ 아티스트 검색 실패: {artist_name}, 상태 코드: {response.status_code}, 응답: {response.json()}" + ) + return None + + +# Spotify API에서 아티스트 장르 가져오기 +def get_artist_genre(artist_id): + if not artist_id: + return "Unknown" + + SPOTIFY_TOKEN = Variable.get("SPOTIFY_ACCESS_TOKEN", default_var=None) + + url = f"{SPOTIFY_API_URL}/artists/{artist_id}" + headers = {"Authorization": f"Bearer {SPOTIFY_TOKEN}"} + response = requests.get(url, headers=headers) + + if response.status_code == 200: + genres = response.json().get("genres", []) + genre_str = ", ".join(genres) if genres else "Unknown" + print(f"🎵 장르 검색: ID {artist_id} -> {genre_str}") + return genre_str + else: + print( + f"❌ 장르 검색 실패: ID {artist_id}, 상태 코드: {response.status_code}, 응답: {response.json()}" + ) + return "Unknown" \ No newline at end of file diff --git a/airflow/dags/scripts/get_access_token.py b/airflow/dags/scripts/get_access_token.py index 3b52a22..01e3d8a 100644 --- a/airflow/dags/scripts/get_access_token.py +++ b/airflow/dags/scripts/get_access_token.py @@ -8,6 +8,7 @@ SPOTIFY_CLIENT_ID = os.getenv("SPOTIFY_CLIENT_ID") SPOTIFY_CLIENT_SECRET = os.getenv("SPOTIFY_CLIENT_SECRET") + END_POINT = "https://accounts.spotify.com/api/token" From 5f2213bc8584bf1e47c8da0704912263b95468b0 Mon Sep 17 00:00:00 2001 From: GitHub Action Date: Wed, 12 Mar 2025 06:07:19 +0000 Subject: [PATCH 5/5] Automated format fixes --- airflow/dags/Bugs_DAG.py | 29 +++++---- airflow/dags/Flo_DAG.py | 10 +-- airflow/dags/Genie_DAG.py | 17 +++-- airflow/dags/Melon_DAG.py | 83 +++++++++++++++---------- airflow/dags/Vibe_DAG.py | 12 ++-- airflow/dags/plugins/get_artist_data.py | 6 +- 6 files changed, 95 insertions(+), 62 deletions(-) diff --git a/airflow/dags/Bugs_DAG.py b/airflow/dags/Bugs_DAG.py index 00ac6f1..7270bf8 100644 --- a/airflow/dags/Bugs_DAG.py +++ b/airflow/dags/Bugs_DAG.py @@ -7,7 +7,6 @@ from plugins.get_artist_data import get_artist_genre, search_artist_id from scripts.get_access_token import get_token - from airflow import DAG from airflow.models import Variable from airflow.operators.python import PythonOperator @@ -28,16 +27,14 @@ def fetch_bugs_chart(): chartType=BugsChartType.All, chartPeriod=BugsChartPeriod.Realtime, fetch=True) - chart_data = { - "date": chart.date.strftime("%Y-%m-%d %H:%M:%S"), - "entries": [] - } + chart_data = {"date": chart.date.strftime( + "%Y-%m-%d %H:%M:%S"), "entries": []} for entry in chart.entries: print(f"📊 차트 데이터 처리: {entry.rank}. {entry.title} - {entry.artist}") - + artist_id = search_artist_id(entry.artist) genre = get_artist_genre(artist_id) - + chart_data["entries"].append( { "rank": entry.rank, @@ -46,7 +43,7 @@ def fetch_bugs_chart(): "lastPos": entry.lastPos, "peakPos": entry.peakPos, "image": entry.image, - "genres": genre.split(", ") if genre else [] # ✅ 리스트 변환, + "genres": genre.split(", ") if genre else [], # ✅ 리스트 변환, } ) return chart_data @@ -56,7 +53,8 @@ def fetch_bugs_chart(): def convert_json_to_csv(**kwargs): ti = kwargs["ti"] data = ti.xcom_pull(task_ids="fetch_bugs_chart") - csv_data = [["rank", "title", "artist", "lastPos", "peakPos", "image", "genre"]] + csv_data = [["rank", "title", "artist", + "lastPos", "peakPos", "image", "genre"]] for entry in data["entries"]: csv_data.append( [ @@ -66,7 +64,9 @@ def convert_json_to_csv(**kwargs): entry["lastPos"], entry["peakPos"], entry["image"], - json.dumps(entry["genres"], ensure_ascii=False) # ✅ 리스트를 문자열로 변환하여 저장 + json.dumps( + entry["genres"], ensure_ascii=False + ), # ✅ 리스트를 문자열로 변환하여 저장 ] ) csv_string = "\n".join(",".join(map(str, row)) for row in csv_data) @@ -83,7 +83,7 @@ def save_csv_locally(csv_string): def upload_to_s3(**kwargs): ti = kwargs["ti"] csv_string = ti.xcom_pull(task_ids="convert_json_to_csv") - #save_csv_locally(csv_string) # 테스트용 로컬 저장 + # save_csv_locally(csv_string) # 테스트용 로컬 저장 s3_hook = S3Hook(aws_conn_id="S4tify_S3") s3_hook.load_string( csv_string, @@ -133,4 +133,9 @@ def upload_to_s3(**kwargs): provide_context=True, ) - get_spotify_token_task >> fetch_bugs_chart_task >> convert_json_to_csv_task >> upload_s3_task \ No newline at end of file + ( + get_spotify_token_task + >> fetch_bugs_chart_task + >> convert_json_to_csv_task + >> upload_s3_task + ) diff --git a/airflow/dags/Flo_DAG.py b/airflow/dags/Flo_DAG.py index 5fcba5c..a373a58 100644 --- a/airflow/dags/Flo_DAG.py +++ b/airflow/dags/Flo_DAG.py @@ -20,6 +20,7 @@ S3_CSV_KEY = f"raw_data/flo_chart_{TODAY}.csv" LOCAL_FILE_PATH = f"/opt/airflow/data/flo_chart_with_genre_{TODAY}.csv" + # 1. FLO 차트 데이터 가져오기 및 JSON 변환 def fetch_flo_chart(): chart = ChartData(fetch=True) @@ -37,7 +38,7 @@ def fetch_flo_chart(): "lastPos": entry.lastPos, "isNew": entry.isNew, "image": entry.image, - "genres": genre.split(", ") if genre else [] + "genres": genre.split(", ") if genre else [], } ) return chart_data @@ -47,7 +48,8 @@ def fetch_flo_chart(): def convert_json_to_csv(**kwargs): ti = kwargs["ti"] data = ti.xcom_pull(task_ids="fetch_flo_chart") - csv_data = [["rank", "title", "artist", "lastPos", "isNew", "image", "genre"]] + csv_data = [["rank", "title", "artist", + "lastPos", "isNew", "image", "genre"]] for entry in data["entries"]: csv_data.append( [ @@ -57,7 +59,7 @@ def convert_json_to_csv(**kwargs): entry["lastPos"], entry["isNew"], entry["image"], - json.dumps(entry["genres"], ensure_ascii=False) + json.dumps(entry["genres"], ensure_ascii=False), ] ) csv_string = "\n".join(",".join(map(str, row)) for row in csv_data) @@ -74,7 +76,7 @@ def save_csv_locally(csv_string): def upload_to_s3(**kwargs): ti = kwargs["ti"] csv_string = ti.xcom_pull(task_ids="convert_json_to_csv") - #save_csv_locally(csv_string) # 테스트용 로컬 저장 + # save_csv_locally(csv_string) # 테스트용 로컬 저장 s3_hook = S3Hook(aws_conn_id="S4tify_S3") s3_hook.load_string( csv_string, diff --git a/airflow/dags/Genie_DAG.py b/airflow/dags/Genie_DAG.py index e1f2f5a..0a84c7f 100644 --- a/airflow/dags/Genie_DAG.py +++ b/airflow/dags/Genie_DAG.py @@ -20,6 +20,7 @@ S3_CSV_KEY = f"raw_data/genie_chart_{TODAY}.csv" LOCAL_FILE_PATH = f"/opt/airflow/data/genie_chart_with_genre_{TODAY}.csv" + # 1. Genie 차트 데이터 가져오기 및 JSON 변환 def fetch_genie_chart(): chart = ChartData(chartPeriod=GenieChartPeriod.Realtime, fetch=True) @@ -37,7 +38,7 @@ def fetch_genie_chart(): "peakPos": entry.peakPos, "lastPos": entry.lastPos, "image": entry.image, - "genres": genre.split(", ") if genre else [] + "genres": genre.split(", ") if genre else [], } ) return chart_data @@ -47,7 +48,8 @@ def fetch_genie_chart(): def convert_json_to_csv(**kwargs): ti = kwargs["ti"] data = ti.xcom_pull(task_ids="fetch_genie_chart") - csv_data = [["rank", "title", "artist", "peakPos", "lastPos", "image", "genre"]] + csv_data = [["rank", "title", "artist", + "peakPos", "lastPos", "image", "genre"]] for entry in data["entries"]: csv_data.append( [ @@ -57,7 +59,7 @@ def convert_json_to_csv(**kwargs): entry["peakPos"], entry["lastPos"], entry["image"], - json.dumps(entry["genres"], ensure_ascii=False) + json.dumps(entry["genres"], ensure_ascii=False), ] ) csv_string = "\n".join(",".join(map(str, row)) for row in csv_data) @@ -74,7 +76,7 @@ def save_csv_locally(csv_string): def upload_to_s3(**kwargs): ti = kwargs["ti"] csv_string = ti.xcom_pull(task_ids="convert_json_to_csv") - #save_csv_locally(csv_string) # 테스트용 로컬 저장 + # save_csv_locally(csv_string) # 테스트용 로컬 저장 s3_hook = S3Hook(aws_conn_id="S4tify_S3") s3_hook.load_string( csv_string, @@ -124,4 +126,9 @@ def upload_to_s3(**kwargs): provide_context=True, ) - get_spotify_token_task >> fetch_genie_chart_task >> convert_json_to_csv_task >> upload_s3_task + ( + get_spotify_token_task + >> fetch_genie_chart_task + >> convert_json_to_csv_task + >> upload_s3_task + ) diff --git a/airflow/dags/Melon_DAG.py b/airflow/dags/Melon_DAG.py index d365935..a7971f4 100644 --- a/airflow/dags/Melon_DAG.py +++ b/airflow/dags/Melon_DAG.py @@ -1,16 +1,16 @@ import csv import json -import requests from datetime import datetime, timedelta -from plugins.melon import ChartData # melon.py 모듈 import +import requests from plugins.get_artist_data import get_artist_genre, search_artist_id +from plugins.melon import ChartData # melon.py 모듈 import from scripts.get_access_token import get_token from airflow import DAG +from airflow.models import Variable from airflow.operators.python import PythonOperator from airflow.providers.amazon.aws.hooks.s3 import S3Hook -from airflow.models import Variable # 날짜 설정 TODAY = datetime.now().strftime("%Y%m%d") @@ -20,60 +20,72 @@ S3_CSV_KEY = f"raw_data/melon_chart_{TODAY}.csv" LOCAL_FILE_PATH = f"/opt/airflow/data/melon_chart_with_genre_{TODAY}.csv" + # 1. 멜론 차트 데이터 가져오기 def fetch_melon_chart(): chart = ChartData(fetch=True) - chart_data = { - "date": chart.date.strftime("%Y-%m-%d %H:%M:%S"), - "entries": [] - } + chart_data = {"date": chart.date.strftime( + "%Y-%m-%d %H:%M:%S"), "entries": []} for entry in chart.entries: print(f"📊 차트 데이터 처리: {entry.rank}. {entry.title} - {entry.artist}") artist_id = search_artist_id(entry.artist) genre = get_artist_genre(artist_id) - chart_data["entries"].append({ - "rank": entry.rank, - "title": entry.title, - "artist": entry.artist, - "lastPos": entry.lastPos, - "isNew": entry.isNew, - "image": entry.image, - "genres": genre.split(", ") if genre else [] - }) + chart_data["entries"].append( + { + "rank": entry.rank, + "title": entry.title, + "artist": entry.artist, + "lastPos": entry.lastPos, + "isNew": entry.isNew, + "image": entry.image, + "genres": genre.split(", ") if genre else [], + } + ) return chart_data + # 2. JSON → CSV 변환 def convert_json_to_csv(**kwargs): ti = kwargs["ti"] data = ti.xcom_pull(task_ids="fetch_melon_chart") - csv_data = [["rank", "title", "artist", "lastPos", "isNew", "image", "genre"]] + csv_data = [["rank", "title", "artist", + "lastPos", "isNew", "image", "genre"]] for entry in data["entries"]: - csv_data.append([ - entry["rank"], - entry["title"], - entry["artist"], - entry["lastPos"], - entry["isNew"], - entry["image"], - json.dumps(entry["genres"], ensure_ascii=False) - ]) + csv_data.append( + [ + entry["rank"], + entry["title"], + entry["artist"], + entry["lastPos"], + entry["isNew"], + entry["image"], + json.dumps(entry["genres"], ensure_ascii=False), + ] + ) csv_string = "\n".join(",".join(map(str, row)) for row in csv_data) return csv_string + # 3. 로컬에 CSV 저장 (테스트용) def save_csv_locally(csv_string): with open(LOCAL_FILE_PATH, "w", encoding="utf-8") as f: f.write(csv_string) + # 4. AWS S3 업로드 def upload_to_s3(**kwargs): ti = kwargs["ti"] csv_string = ti.xcom_pull(task_ids="convert_json_to_csv") - #save_csv_locally(csv_string) # 테스트용 로컬 저장 + # save_csv_locally(csv_string) # 테스트용 로컬 저장 s3_hook = S3Hook(aws_conn_id="S4tify_S3") - s3_hook.load_string(csv_string, key=S3_CSV_KEY, bucket_name=S3_BUCKET, replace=True) + s3_hook.load_string( + csv_string, + key=S3_CSV_KEY, + bucket_name=S3_BUCKET, + replace=True) print(f"✅ S3 업로드 완료: {S3_CSV_KEY}") + # DAG 설정 default_args = { "owner": "airflow", @@ -89,7 +101,7 @@ def upload_to_s3(**kwargs): schedule_interval="0 1 * * *", # 매일 01:00 실행 catchup=False, ) as dag: - + get_spotify_token_task = PythonOperator( task_id="get_spotify_token", python_callable=get_token, # ✅ 먼저 실행해서 Variable 갱신 @@ -101,17 +113,22 @@ def upload_to_s3(**kwargs): python_callable=fetch_melon_chart, provide_context=True, ) - + convert_json_to_csv_task = PythonOperator( task_id="convert_json_to_csv", python_callable=convert_json_to_csv, provide_context=True, ) - + upload_s3_task = PythonOperator( task_id="upload_to_s3", python_callable=upload_to_s3, provide_context=True, ) - - get_spotify_token_task >> fetch_melon_chart_task >> convert_json_to_csv_task >> upload_s3_task + + ( + get_spotify_token_task + >> fetch_melon_chart_task + >> convert_json_to_csv_task + >> upload_s3_task + ) diff --git a/airflow/dags/Vibe_DAG.py b/airflow/dags/Vibe_DAG.py index 29c92da..4ec1d40 100644 --- a/airflow/dags/Vibe_DAG.py +++ b/airflow/dags/Vibe_DAG.py @@ -3,8 +3,8 @@ from datetime import datetime, timedelta import requests -from plugins.vibe import ChartData # vibe.py 모듈 import from plugins.get_artist_data import get_artist_genre, search_artist_id +from plugins.vibe import ChartData # vibe.py 모듈 import from scripts.get_access_token import get_token from airflow import DAG @@ -20,6 +20,7 @@ S3_CSV_KEY = f"raw_data/vibe_chart_{TODAY}.csv" LOCAL_FILE_PATH = f"/opt/airflow/data/vibe_chart_with_genre_{TODAY}.csv" + # 1. VIBE 차트 데이터 가져오기 및 JSON 변환 def fetch_vibe_chart(): chart = ChartData(fetch=True) @@ -37,7 +38,7 @@ def fetch_vibe_chart(): "lastPos": entry.lastPos, "isNew": entry.isNew, "image": entry.image, - "genres": genre.split(", ") if genre else [] + "genres": genre.split(", ") if genre else [], } ) return chart_data @@ -47,7 +48,8 @@ def fetch_vibe_chart(): def convert_json_to_csv(**kwargs): ti = kwargs["ti"] data = ti.xcom_pull(task_ids="fetch_vibe_chart") - csv_data = [["rank", "title", "artist", "lastPos", "isNew", "image", "genre"]] + csv_data = [["rank", "title", "artist", + "lastPos", "isNew", "image", "genre"]] for entry in data["entries"]: csv_data.append( [ @@ -57,7 +59,7 @@ def convert_json_to_csv(**kwargs): entry["lastPos"], entry["isNew"], entry["image"], - json.dumps(entry["genres"], ensure_ascii=False) + json.dumps(entry["genres"], ensure_ascii=False), ] ) csv_string = "\n".join(",".join(map(str, row)) for row in csv_data) @@ -74,7 +76,7 @@ def save_csv_locally(csv_string): def upload_to_s3(**kwargs): ti = kwargs["ti"] csv_string = ti.xcom_pull(task_ids="convert_json_to_csv") - #save_csv_locally(csv_string) # 테스트용 로컬 저장 + # save_csv_locally(csv_string) # 테스트용 로컬 저장 s3_hook = S3Hook(aws_conn_id="S4tify_S3") s3_hook.load_string( csv_string, diff --git a/airflow/dags/plugins/get_artist_data.py b/airflow/dags/plugins/get_artist_data.py index 02c5fdf..639a064 100644 --- a/airflow/dags/plugins/get_artist_data.py +++ b/airflow/dags/plugins/get_artist_data.py @@ -1,13 +1,13 @@ import requests - - from scripts.get_access_token import get_token + from airflow.models import Variable # Spotify API 설정 SPOTIFY_API_URL = "https://api.spotify.com/v1" SPOTIFY_TOKEN = Variable.get("SPOTIFY_ACCESS_TOKEN", default_var=None) + # Spotify API에서 아티스트 ID 검색 def search_artist_id(artist_name): SPOTIFY_TOKEN = Variable.get("SPOTIFY_ACCESS_TOKEN", default_var=None) @@ -51,4 +51,4 @@ def get_artist_genre(artist_id): print( f"❌ 장르 검색 실패: ID {artist_id}, 상태 코드: {response.status_code}, 응답: {response.json()}" ) - return "Unknown" \ No newline at end of file + return "Unknown"