From afcd306baebcfc507f159c73466894cdf752c930 Mon Sep 17 00:00:00 2001 From: gland78 Date: Wed, 12 Mar 2025 10:11:51 +0900 Subject: [PATCH] add_genre_data --- airflow/dags/Bugs_DAG.py | 183 ++++++++------- airflow/dags/Flo_DAG.py | 199 ++++++++-------- airflow/dags/Genie_DAG.py | 192 ++++++++-------- airflow/dags/S3_Spark_SnowFlake_DAG.py | 69 ++++++ airflow/dags/Vibe_DAG.py | 214 +++++++++--------- .../dags/scripts/S3_Spark_SnowFlake_ELT.py | 201 ++++++++++++++++ 6 files changed, 673 insertions(+), 385 deletions(-) create mode 100644 airflow/dags/S3_Spark_SnowFlake_DAG.py create mode 100644 airflow/dags/scripts/S3_Spark_SnowFlake_ELT.py diff --git a/airflow/dags/Bugs_DAG.py b/airflow/dags/Bugs_DAG.py index be12c25..589afaa 100644 --- a/airflow/dags/Bugs_DAG.py +++ b/airflow/dags/Bugs_DAG.py @@ -1,95 +1,120 @@ import csv import json +import requests from datetime import datetime, timedelta -import pandas as pd -from bugs import BugsChartPeriod, BugsChartType, ChartData +from plugins.bugs import BugsChartPeriod, BugsChartType, ChartData +from scripts.get_access_token import get_token from airflow import DAG from airflow.operators.python import PythonOperator from airflow.providers.amazon.aws.hooks.s3 import S3Hook -from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook - -""" -your-s3-bucket-name을 실제 S3 버킷명으로 바꾸고, -✅ Snowflake 연결 정보 및 테이블명을 맞게 설정 -""" +from airflow.models import Variable # 날짜 설정 TODAY = datetime.now().strftime("%Y%m%d") -# 파일 경로 -JSON_PATH = f"/opt/airflow/data/bugs_chart_{TODAY}.json" -CSV_PATH = f"/opt/airflow/data/bugs_chart_{TODAY}.csv" - -# S3 설정 +# Spotify API 설정 +SPOTIFY_API_URL = "https://api.spotify.com/v1" +SPOTIFY_TOKEN = Variable.get("SPOTIFY_ACCESS_TOKEN", default_var=None) S3_BUCKET = "de5-s4tify" -S3_JSON_KEY = f"raw_data/bugs_chart/bugs_chart_{TODAY}.json" -S3_CSV_KEY = f"raw_data/bugs_chart/bugs_chart_{TODAY}.csv" - -""" -# Snowflake 설정 -SNOWFLAKE_CONN_ID = "S4tify_SnowFlake" -SNOWFLAKE_TABLE = "raw_data" -""" - - -# 
1. Bugs 차트 데이터 가져오기 및 JSON 저장 +S3_CSV_KEY = f"raw_data/bugs_chart_with_genre_{TODAY}.csv" +LOCAL_FILE_PATH = f"/opt/airflow/data/bugs_chart_with_genre_{TODAY}.csv" + +# Spotify API에서 아티스트 ID 검색 +def search_artist_id(artist_name): + SPOTIFY_TOKEN = Variable.get("SPOTIFY_ACCESS_TOKEN", default_var=None) + + url = f"{SPOTIFY_API_URL}/search" + headers = {"Authorization": f"Bearer {SPOTIFY_TOKEN}"} + params = {"q": artist_name, "type": "artist", "limit": 1} + response = requests.get(url, headers=headers, params=params) + + print("headers : ", headers) + + + if response.status_code == 200: + artists = response.json().get("artists", {}).get("items", []) + artist_id = artists[0]["id"] if artists else None + print(f"🔍 검색된 아티스트: {artist_name} -> ID: {artist_id}") + return artist_id + else: + print(f"❌ 아티스트 검색 실패: {artist_name}, 상태 코드: {response.status_code}, 응답: {response.json()}") + return None + +# Spotify API에서 아티스트 장르 가져오기 +def get_artist_genre(artist_id): + if not artist_id: + return "Unknown" + + SPOTIFY_TOKEN = Variable.get("SPOTIFY_ACCESS_TOKEN", default_var=None) + + url = f"{SPOTIFY_API_URL}/artists/{artist_id}" + headers = {"Authorization": f"Bearer {SPOTIFY_TOKEN}"} + response = requests.get(url, headers=headers) + + if response.status_code == 200: + genres = response.json().get("genres", []) + genre_str = ", ".join(genres) if genres else "Unknown" + print(f"🎵 장르 검색: ID {artist_id} -> {genre_str}") + return genre_str + else: + print(f"❌ 장르 검색 실패: ID {artist_id}, 상태 코드: {response.status_code}, 응답: {response.json()}") + return "Unknown" + +# 1. 
Bugs 차트 데이터 가져오기 및 JSON 변환 def fetch_bugs_chart(): chart = ChartData( chartType=BugsChartType.All, chartPeriod=BugsChartPeriod.Realtime, - fetch=True) + fetch=True + ) chart_data = { "date": chart.date.strftime("%Y-%m-%d %H:%M:%S"), - "entries": [ - { - "rank": entry.rank, - "title": entry.title, - "artist": entry.artist, - "lastPos": entry.lastPos, - "peakPos": entry.peakPos, - "image": entry.image, - } - for entry in chart.entries - ], + "entries": [] } - with open(JSON_PATH, "w", encoding="utf-8") as f: - json.dump(chart_data, f, ensure_ascii=False, indent=4) - print(f"✅ JSON 저장 완료: {JSON_PATH}") + for entry in chart.entries: + print(f"📊 차트 데이터 처리: {entry.rank}. {entry.title} - {entry.artist}") + artist_id = search_artist_id(entry.artist) + genre = get_artist_genre(artist_id) + chart_data["entries"].append({ + "rank": entry.rank, + "title": entry.title, + "artist": entry.artist, + "lastPos": entry.lastPos, + "peakPos": entry.peakPos, + "image": entry.image, + "genre": genre + }) + return chart_data # 2. JSON → CSV 변환 -def convert_json_to_csv(): - with open(JSON_PATH, "r", encoding="utf-8") as f: - data = json.load(f) - fields = ["rank", "title", "artist", "lastPos", "peakPos", "image"] - with open(CSV_PATH, "w", newline="", encoding="utf-8") as csvfile: - writer = csv.DictWriter(csvfile, fieldnames=fields) - writer.writeheader() - for entry in data["entries"]: - writer.writerow(entry) - print(f"✅ CSV 변환 완료: {CSV_PATH}") - - -# 3. AWS S3 업로드 -def upload_to_s3(): +def convert_json_to_csv(**kwargs): + ti = kwargs["ti"] + data = ti.xcom_pull(task_ids="fetch_bugs_chart") + csv_data = [["rank", "title", "artist", "lastPos", "peakPos", "image", "genre"]] + for entry in data["entries"]: + csv_data.append([ + entry["rank"], entry["title"], entry["artist"], entry["lastPos"], entry["peakPos"], entry["image"], entry["genre"] + ]) + csv_string = "\n".join(",".join(map(str, row)) for row in csv_data) + return csv_string + +# 3. 
로컬에 CSV 저장 (테스트용, 삭제 용이하도록 별도 함수) +def save_csv_locally(csv_string): + with open(LOCAL_FILE_PATH, "w", encoding="utf-8") as f: + f.write(csv_string) + +# 4. AWS S3 업로드 +def upload_to_s3(**kwargs): + ti = kwargs["ti"] + csv_string = ti.xcom_pull(task_ids="convert_json_to_csv") + #save_csv_locally(csv_string) # 테스트용 로컬 저장 s3_hook = S3Hook(aws_conn_id="S4tify_S3") - # s3_hook.load_file(filename=JSON_PATH, key=S3_JSON_KEY, bucket_name=S3_BUCKET, replace=True) - s3_hook.load_file( - filename=CSV_PATH, key=S3_CSV_KEY, bucket_name=S3_BUCKET, replace=True - ) + s3_hook.load_string(csv_string, key=S3_CSV_KEY, bucket_name=S3_BUCKET, replace=True) print(f"✅ S3 업로드 완료: {S3_CSV_KEY}") - -"""# 4. Snowflake 업로드 -def upload_to_snowflake(): - snowflake_hook = SnowflakeHook(snowflake_conn_id=SNOWFLAKE_CONN_ID) - df = pd.read_csv(CSV_PATH) - snowflake_hook.run(f"DELETE FROM {SNOWFLAKE_TABLE} WHERE DATE = '{TODAY}';") - snowflake_hook.insert_rows(table=SNOWFLAKE_TABLE, rows=df.values.tolist(), target_fields=df.columns.tolist()) - print(f"✅ Snowflake 업로드 완료: {SNOWFLAKE_TABLE}") -""" # DAG 설정 default_args = { "owner": "airflow", @@ -105,29 +130,29 @@ def upload_to_snowflake(): schedule_interval="10 0 * * *", # 매일 00:10 실행 catchup=False, ) as dag: + + get_spotify_token_task = PythonOperator( + task_id="get_spotify_token", + python_callable=get_token, # ✅ 먼저 실행해서 Variable 갱신 + provide_context=True, + ) fetch_bugs_chart_task = PythonOperator( task_id="fetch_bugs_chart", python_callable=fetch_bugs_chart, + provide_context=True, ) - + convert_json_to_csv_task = PythonOperator( task_id="convert_json_to_csv", python_callable=convert_json_to_csv, + provide_context=True, ) - + upload_s3_task = PythonOperator( task_id="upload_to_s3", python_callable=upload_to_s3, + provide_context=True, ) - """ - upload_snowflake_task = PythonOperator( - task_id="upload_to_snowflake", - python_callable=upload_to_snowflake, - ) - """ - - # DAG 실행 순서 - ( - fetch_bugs_chart_task >> convert_json_to_csv_task >> 
upload_s3_task - ) # upload_snowflake_task + + get_spotify_token_task >> fetch_bugs_chart_task >> convert_json_to_csv_task >> upload_s3_task diff --git a/airflow/dags/Flo_DAG.py b/airflow/dags/Flo_DAG.py index 065765c..ba25a3d 100644 --- a/airflow/dags/Flo_DAG.py +++ b/airflow/dags/Flo_DAG.py @@ -1,98 +1,104 @@ import csv import json +import requests from datetime import datetime, timedelta -from flo import ChartData # flo.py 모듈 import - +from plugins.flo import ChartData # flo.py 모듈 import +from scripts.get_access_token import get_token from airflow import DAG -from airflow.hooks.base_hook import BaseHook from airflow.operators.python import PythonOperator from airflow.providers.amazon.aws.hooks.s3 import S3Hook +from airflow.models import Variable -# 파일 경로 및 S3 버킷 정보 +# 날짜 설정 TODAY = datetime.now().strftime("%Y%m%d") -JSON_PATH = f"/opt/airflow/data/flo_chart_{TODAY}.json" -CSV_PATH = f"/opt/airflow/data/flo_chart_{TODAY}.csv" -S3_BUCKET = "de5-s4tify" -S3_JSON_KEY = f"raw_data/flo_chart/flo_chart_{TODAY}.json" -S3_CSV_KEY = f"raw_data/flo_chart/flo_chart_{TODAY}.csv" - - -# AWS S3 업로드 함수 -def upload_to_s3(): - s3_hook = S3Hook(aws_conn_id="S4tify_S3") # Airflow Connection ID 사용 - file_name = CSV_PATH.split("/")[-1] - s3_hook.load_file( - filename=CSV_PATH, bucket_name=S3_BUCKET, key=file_name, replace=True - ) - print(f"✅ S3 업로드 완료: s3://{S3_BUCKET}/{file_name}") - - -"""# Snowflake 저장 함수 -def save_to_snowflake(): - # Airflow 연결에서 Snowflake 연결 정보 가져오기 - connection = BaseHook.get_connection('s4tify_SnowFlake') # Airflow에 설정된 Snowflake 연결 정보 사용 - - # Snowflake 연결 생성 - conn = snowflake.connector.connect( - user=connection.login, # Airflow 연결의 username 사용 - password=connection.password, # Airflow 연결의 password 사용 - account=connection.host, # Airflow 연결의 account 정보 사용 - warehouse=connection.extra_dejson.get('warehouse'), # Extra 정보에서 warehouse 읽기 - database=connection.extra_dejson.get('database'), # Extra 정보에서 database 읽기 - 
schema=connection.extra_dejson.get('schema'), # Extra 정보에서 schema 읽기 - ) - - cursor = conn.cursor() - cursor.execute(f"DELETE FROM flo_chart WHERE date = '{TODAY}'") - with open(CSV_PATH, "r", encoding="utf-8") as f: - next(f) # 헤더 스킵 - for line in f: - values = line.strip().split(",") - cursor.execute( - "INSERT INTO flo_chart (rank, title, artist, lastPos, isNew, image, date) VALUES (%s, %s, %s, %s, %s, %s, %s)", - (*values, TODAY) - ) - conn.commit() - cursor.close() - conn.close() - print("✅ Snowflake 저장 완료") -""" - -# 1. FLO 차트 데이터 가져오기 및 JSON 저장 +# S3 설정 +S3_BUCKET = "de5-s4tify" +S3_CSV_KEY = f"raw_data/flo_chart_{TODAY}.csv" +LOCAL_FILE_PATH = f"/opt/airflow/data/flo_chart_with_genre_{TODAY}.csv" + +# Spotify API 설정 +SPOTIFY_API_URL = "https://api.spotify.com/v1" +SPOTIFY_TOKEN = Variable.get("SPOTIFY_ACCESS_TOKEN", default_var=None) + +# Spotify API에서 아티스트 ID 검색 +def search_artist_id(artist_name): + SPOTIFY_TOKEN = Variable.get("SPOTIFY_ACCESS_TOKEN", default_var=None) + url = f"{SPOTIFY_API_URL}/search" + headers = {"Authorization": f"Bearer {SPOTIFY_TOKEN}"} + params = {"q": artist_name, "type": "artist", "limit": 1} + response = requests.get(url, headers=headers, params=params) + + if response.status_code == 200: + artists = response.json().get("artists", {}).get("items", []) + artist_id = artists[0]["id"] if artists else None + return artist_id + return None + +# Spotify API에서 아티스트 장르 가져오기 +def get_artist_genre(artist_id): + if not artist_id: + return "Unknown" + + SPOTIFY_TOKEN = Variable.get("SPOTIFY_ACCESS_TOKEN", default_var=None) + url = f"{SPOTIFY_API_URL}/artists/{artist_id}" + headers = {"Authorization": f"Bearer {SPOTIFY_TOKEN}"} + response = requests.get(url, headers=headers) + + if response.status_code == 200: + genres = response.json().get("genres", []) + return ", ".join(genres) if genres else "Unknown" + return "Unknown" + +# 1. 
FLO 차트 데이터 가져오기 및 JSON 변환 def fetch_flo_chart(): chart = ChartData(fetch=True) chart_data = { "date": chart.date.strftime("%Y-%m-%d %H:%M:%S"), - "entries": [ - { - "rank": entry.rank, - "title": entry.title, - "artist": entry.artist, - "lastPos": entry.lastPos, - "isNew": entry.isNew, - "image": entry.image, - } - for entry in chart.entries - ], + "entries": [] } - with open(JSON_PATH, "w", encoding="utf-8") as f: - json.dump(chart_data, f, ensure_ascii=False, indent=4) - print(f"✅ JSON 저장 완료: {JSON_PATH}") + for entry in chart.entries: + print(f"📊 차트 데이터 처리: {entry.rank}. {entry.title} - {entry.artist}") + artist_id = search_artist_id(entry.artist) + genre = get_artist_genre(artist_id) + chart_data["entries"].append({ + "rank": entry.rank, + "title": entry.title, + "artist": entry.artist, + "lastPos": entry.lastPos, + "isNew": entry.isNew, + "image": entry.image, + "genre": genre + }) + return chart_data # 2. JSON → CSV 변환 -def convert_json_to_csv(): - with open(JSON_PATH, "r", encoding="utf-8") as f: - data = json.load(f) - fields = ["rank", "title", "artist", "lastPos", "isNew", "image"] - with open(CSV_PATH, "w", newline="", encoding="utf-8") as csvfile: - writer = csv.DictWriter(csvfile, fieldnames=fields) - writer.writeheader() - for entry in data["entries"]: - writer.writerow(entry) - print(f"✅ CSV 변환 완료: {CSV_PATH}") +def convert_json_to_csv(**kwargs): + ti = kwargs["ti"] + data = ti.xcom_pull(task_ids="fetch_flo_chart") + csv_data = [["rank", "title", "artist", "lastPos", "isNew", "image", "genre"]] + for entry in data["entries"]: + csv_data.append([ + entry["rank"], entry["title"], entry["artist"], entry["lastPos"], entry["isNew"], entry["image"], entry["genre"] + ]) + csv_string = "\n".join(",".join(map(str, row)) for row in csv_data) + return csv_string + +# 3. 로컬에 CSV 저장 (테스트용) +def save_csv_locally(csv_string): + with open(LOCAL_FILE_PATH, "w", encoding="utf-8") as f: + f.write(csv_string) + +# 4. 
AWS S3 업로드 +def upload_to_s3(**kwargs): + ti = kwargs["ti"] + csv_string = ti.xcom_pull(task_ids="convert_json_to_csv") + #save_csv_locally(csv_string) # 테스트용 로컬 저장 + s3_hook = S3Hook(aws_conn_id="S4tify_S3") + s3_hook.load_string(csv_string, key=S3_CSV_KEY, bucket_name=S3_BUCKET, replace=True) + print(f"✅ S3 업로드 완료: {S3_CSV_KEY}") # DAG 설정 @@ -110,38 +116,29 @@ def convert_json_to_csv(): schedule_interval="20 0 * * *", # 매일 00:20 실행 catchup=False, ) as dag: + + get_spotify_token_task = PythonOperator( + task_id="get_spotify_token", + python_callable=get_token, # ✅ 먼저 실행해서 Variable 갱신 + provide_context=True, + ) fetch_flo_chart_task = PythonOperator( task_id="fetch_flo_chart", python_callable=fetch_flo_chart, + provide_context=True, ) - + convert_json_to_csv_task = PythonOperator( task_id="convert_json_to_csv", python_callable=convert_json_to_csv, + provide_context=True, ) - - """upload_json_to_s3_task = PythonOperator( - task_id="upload_json_to_s3", + + upload_s3_task = PythonOperator( + task_id="upload_to_s3", python_callable=upload_to_s3, - op_kwargs={"file_path": JSON_PATH, "bucket_name": S3_BUCKET, "object_name": S3_JSON_KEY}, - )""" - - upload_csv_to_s3_task = PythonOperator( - task_id="upload_csv_to_s3", - python_callable=upload_to_s3, - op_kwargs={ - "file_path": CSV_PATH, - "bucket_name": S3_BUCKET, - "object_name": S3_CSV_KEY, - }, + provide_context=True, ) - - """save_to_snowflake_task = PythonOperator( - task_id="save_to_snowflake", - python_callable=save_to_snowflake, - )""" - - # 작업 순서 정의 - (fetch_flo_chart_task >> convert_json_to_csv_task >> - upload_csv_to_s3_task) # save_to_snowflake_task + + get_spotify_token_task >> fetch_flo_chart_task >> convert_json_to_csv_task >> upload_s3_task diff --git a/airflow/dags/Genie_DAG.py b/airflow/dags/Genie_DAG.py index 5f2c7ea..e997baa 100644 --- a/airflow/dags/Genie_DAG.py +++ b/airflow/dags/Genie_DAG.py @@ -1,97 +1,103 @@ import csv import json +import requests from datetime import datetime, timedelta -import 
pandas as pd -from genie import ChartData, GenieChartPeriod # genie.py 모듈 import - +from plugins.genie import ChartData, GenieChartPeriod # genie.py 모듈 import +from scripts.get_access_token import get_token from airflow import DAG from airflow.operators.python import PythonOperator from airflow.providers.amazon.aws.hooks.s3 import S3Hook -from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook +from airflow.models import Variable -# 환경 변수 설정 +# 날짜 설정 TODAY = datetime.now().strftime("%Y%m%d") -JSON_PATH = f"/opt/airflow/data/genie_chart_{TODAY}.json" -CSV_PATH = f"/opt/airflow/data/genie_chart_{TODAY}.csv" -S3_BUCKET = "de5-s4tify" -S3_CSV_KEY = f"raw_data/genie_chart/genie_chart_{TODAY}.csv" -# SNOWFLAKE_CONN_ID = "S4tify_SnowFlake" # Airflow에서 설정한 Snowflake 연결 ID - -# Genie 차트 데이터를 가져와 JSON으로 저장 +# S3 설정 +S3_BUCKET = "de5-s4tify" +S3_CSV_KEY = f"raw_data/genie_chart_{TODAY}.csv" +LOCAL_FILE_PATH = f"/opt/airflow/data/genie_chart_with_genre_{TODAY}.csv" + +# Spotify API 설정 +SPOTIFY_API_URL = "https://api.spotify.com/v1" +SPOTIFY_TOKEN = Variable.get("SPOTIFY_ACCESS_TOKEN", default_var=None) + +# Spotify API에서 아티스트 ID 검색 +def search_artist_id(artist_name): + SPOTIFY_TOKEN = Variable.get("SPOTIFY_ACCESS_TOKEN", default_var=None) + url = f"{SPOTIFY_API_URL}/search" + headers = {"Authorization": f"Bearer {SPOTIFY_TOKEN}"} + params = {"q": artist_name, "type": "artist", "limit": 1} + response = requests.get(url, headers=headers, params=params) + + if response.status_code == 200: + artists = response.json().get("artists", {}).get("items", []) + artist_id = artists[0]["id"] if artists else None + return artist_id + return None + +# Spotify API에서 아티스트 장르 가져오기 +def get_artist_genre(artist_id): + if not artist_id: + return "Unknown" + + SPOTIFY_TOKEN = Variable.get("SPOTIFY_ACCESS_TOKEN", default_var=None) + url = f"{SPOTIFY_API_URL}/artists/{artist_id}" + headers = {"Authorization": f"Bearer {SPOTIFY_TOKEN}"} + response = requests.get(url, headers=headers) + + 
if response.status_code == 200: + genres = response.json().get("genres", []) + return ", ".join(genres) if genres else "Unknown" + return "Unknown" + +# 1. Genie 차트 데이터 가져오기 및 JSON 변환 def fetch_genie_chart(): chart = ChartData(chartPeriod=GenieChartPeriod.Realtime, fetch=True) - chart_data = { "date": chart.date.strftime("%Y-%m-%d %H:%M:%S"), - "entries": [ - { - "rank": entry.rank, - "title": entry.title, - "artist": entry.artist, - "peakPos": entry.peakPos, - "lastPos": entry.lastPos, - "image": entry.image, - } - for entry in chart.entries - ], + "entries": [] } - - with open(JSON_PATH, "w", encoding="utf-8") as f: - json.dump(chart_data, f, ensure_ascii=False, indent=4) - - print(f"✅ JSON 저장 완료: {JSON_PATH}") - return JSON_PATH - - -# JSON을 CSV로 변환 후 저장 -def convert_json_to_csv(): - with open(JSON_PATH, "r", encoding="utf-8") as f: - chart_data = json.load(f) - - fields = ["rank", "title", "artist", "peakPos", "lastPos", "image"] - with open(CSV_PATH, "w", newline="", encoding="utf-8") as csvfile: - writer = csv.DictWriter(csvfile, fieldnames=fields) - writer.writeheader() - for entry in chart_data["entries"]: - writer.writerow(entry) - - print(f"✅ CSV 변환 완료: {CSV_PATH}") - return CSV_PATH - - -# S3 업로드 -def upload_to_s3(): - s3_hook = S3Hook(aws_conn_id="S4tify_S3") # Airflow Connection ID 사용 - file_name = CSV_PATH.split("/")[-1] - s3_hook.load_file( - filename=CSV_PATH, bucket_name=S3_BUCKET, key=S3_CSV_KEY, replace=True - ) - print(f"✅ S3 업로드 완료: s3://{S3_BUCKET}/{S3_CSV_KEY}") - - -""" -# Snowflake 업로드 (Airflow SnowflakeHook 사용) -def upload_to_snowflake(): - snowflake_hook = SnowflakeHook(snowflake_conn_id=SNOWFLAKE_CONN_ID) - - # Snowflake에 CSV 파일 업로드 - df = pd.read_csv(CSV_PATH) - - # Snowflake 테이블에 데이터를 삽입 - # 'genie_chart' 테이블에 맞는 컬럼 이름과 데이터 형식을 확인하고 매핑 - snowflake_hook.run(f"DELETE FROM genie_chart WHERE date = '{TODAY}';") - - # Snowflake 테이블에 데이터 삽입 - snowflake_hook.insert_rows( - table="genie_chart", - rows=df.values.tolist(), - 
target_fields=df.columns.tolist() - ) - - print(f"✅ Snowflake 업로드 완료: genie_chart") -""" + for entry in chart.entries: + print(f"📊 차트 데이터 처리: {entry.rank}. {entry.title} - {entry.artist}") + artist_id = search_artist_id(entry.artist) + genre = get_artist_genre(artist_id) + chart_data["entries"].append({ + "rank": entry.rank, + "title": entry.title, + "artist": entry.artist, + "peakPos": entry.peakPos, + "lastPos": entry.lastPos, + "image": entry.image, + "genre": genre + }) + return chart_data + +# 2. JSON → CSV 변환 +def convert_json_to_csv(**kwargs): + ti = kwargs["ti"] + data = ti.xcom_pull(task_ids="fetch_genie_chart") + csv_data = [["rank", "title", "artist", "peakPos", "lastPos", "image", "genre"]] + for entry in data["entries"]: + csv_data.append([ + entry["rank"], entry["title"], entry["artist"], entry["peakPos"], entry["lastPos"], entry["image"], entry["genre"] + ]) + csv_string = "\n".join(",".join(map(str, row)) for row in csv_data) + return csv_string + +# 3. 로컬에 CSV 저장 (테스트용) +def save_csv_locally(csv_string): + with open(LOCAL_FILE_PATH, "w", encoding="utf-8") as f: + f.write(csv_string) + +# 4. 
AWS S3 업로드 +def upload_to_s3(**kwargs): + ti = kwargs["ti"] + csv_string = ti.xcom_pull(task_ids="convert_json_to_csv") + #save_csv_locally(csv_string) # 테스트용 로컬 저장 + s3_hook = S3Hook(aws_conn_id="S4tify_S3") + s3_hook.load_string(csv_string, key=S3_CSV_KEY, bucket_name=S3_BUCKET, replace=True) + print(f"✅ S3 업로드 완료: {S3_CSV_KEY}") # DAG 설정 default_args = { @@ -108,27 +114,29 @@ def upload_to_snowflake(): schedule_interval="30 0 * * *", # 매일 00:30 실행 catchup=False, ) as dag: - + + get_spotify_token_task = PythonOperator( + task_id="get_spotify_token", + python_callable=get_token, # ✅ 먼저 실행해서 Variable 갱신 + provide_context=True, + ) + fetch_genie_chart_task = PythonOperator( task_id="fetch_genie_chart", python_callable=fetch_genie_chart, + provide_context=True, ) - + convert_json_to_csv_task = PythonOperator( task_id="convert_json_to_csv", python_callable=convert_json_to_csv, + provide_context=True, ) - - upload_to_s3_task = PythonOperator( + + upload_s3_task = PythonOperator( task_id="upload_to_s3", python_callable=upload_to_s3, + provide_context=True, ) - """ - upload_to_snowflake_task = PythonOperator( - task_id="upload_to_snowflake", - python_callable=upload_to_snowflake, - )""" - - ( - fetch_genie_chart_task >> convert_json_to_csv_task >> upload_to_s3_task - ) # upload_to_snowflake_task + + get_spotify_token_task >> fetch_genie_chart_task >> convert_json_to_csv_task >> upload_s3_task diff --git a/airflow/dags/S3_Spark_SnowFlake_DAG.py b/airflow/dags/S3_Spark_SnowFlake_DAG.py new file mode 100644 index 0000000..5006fe6 --- /dev/null +++ b/airflow/dags/S3_Spark_SnowFlake_DAG.py @@ -0,0 +1,69 @@ +from datetime import datetime, timedelta +import os +from airflow import DAG +from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator +from airflow.models import Variable + +# Airflow Variables에서 AWS 자격 증명 불러오기 (없으면 예외 발생 방지) +AWS_ACCESS_KEY = Variable.get("AWS_ACCESS_KEY", default_var=None) +AWS_SECRET_KEY = Variable.get("AWS_SECRET_KEY", 
default_var=None) + +if not AWS_ACCESS_KEY or not AWS_SECRET_KEY: + raise ValueError("AWS_ACCESS_KEY 또는 AWS_SECRET_KEY가 설정되지 않았습니다.") + +# S3 및 Snowflake 설정 +S3_BUCKET = "s3a://de5-s4tify" + +# Spark JARs 설정 +SPARK_HOME = os.environ.get("SPARK_JAR_DIR", "/opt/spark/jars") +SPARK_JARS = ",".join( + [ + os.path.join(SPARK_HOME, "snowflake-jdbc-3.9.2.jar"), + os.path.join(SPARK_HOME, "hadoop-aws-3.3.4.jar"), + os.path.join(SPARK_HOME, "aws-java-sdk-bundle-1.12.262.jar"), + ] +) + +# DAG 기본 설정 +default_args = { + 'owner': 'airflow', + 'start_date': datetime(2025, 3, 4), + 'retries': 1, + 'retry_delay': timedelta(minutes=5), +} + +dag = DAG( + 's3_to_snowflake_pipeline', + default_args=default_args, + description='Read from S3, process data with Spark, and store in Snowflake', + schedule_interval='0 2 * * *', + catchup=False, +) + +# Spark 작업 스크립트 경로 설정 +AIRFLOW_HOME = os.getenv('AIRFLOW_HOME', '/opt/airflow') +spark_script_path = os.path.abspath(os.path.join(AIRFLOW_HOME, "dags", "scripts", "S3_Spark_SnowFlake_ELT.py")) + +# SparkSubmitOperator 설정 +spark_submit_task = SparkSubmitOperator( + task_id="spark_submit_task", + application=spark_script_path, + conn_id="spark_conn", + executor_memory='4g', + executor_cores=4, + driver_memory='4g', + name='s3_to_snowflake_pipeline', + execution_timeout=timedelta(minutes=45), + verbose=True, + jars=SPARK_JARS, + conf={ + "spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem", + "spark.hadoop.fs.s3a.access.key": AWS_ACCESS_KEY, + "spark.hadoop.fs.s3a.secret.key": AWS_SECRET_KEY, + "spark.hadoop.fs.s3a.endpoint": "s3.ap-northeast-2.amazonaws.com", + "spark.hadoop.fs.s3a.aws.credentials.provider": "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider", + }, + dag=dag, +) + +spark_submit_task diff --git a/airflow/dags/Vibe_DAG.py b/airflow/dags/Vibe_DAG.py index e9038df..5e9f700 100644 --- a/airflow/dags/Vibe_DAG.py +++ b/airflow/dags/Vibe_DAG.py @@ -1,106 +1,103 @@ import csv import json +import requests from 
datetime import datetime, timedelta -import snowflake.connector -from vibe import ChartData # vibe.py 모듈 import - +from plugins.vibe import ChartData # vibe.py 모듈 import +from scripts.get_access_token import get_token from airflow import DAG -from airflow.hooks.base_hook import BaseHook from airflow.operators.python import PythonOperator from airflow.providers.amazon.aws.hooks.s3 import S3Hook +from airflow.models import Variable -# 파일 경로 및 S3 버킷 정보 +# 날짜 설정 TODAY = datetime.now().strftime("%Y%m%d") -JSON_PATH = f"/opt/airflow/data/vibe_chart_{TODAY}.json" -CSV_PATH = f"/opt/airflow/data/vibe_chart_{TODAY}.csv" -S3_BUCKET = "de5-s4tify" -S3_JSON_KEY = f"raw_data/vibe_chart/vibe_chart_{TODAY}.json" -S3_CSV_KEY = f"raw_data/vibe_chart/vibe_chart_{TODAY}.csv" - - -# AWS S3 업로드 함수 -def upload_to_s3(): - s3_hook = S3Hook(aws_conn_id="S4tify_S3") # Airflow Connection ID 사용 - file_name = CSV_PATH.split("/")[-1] - s3_hook.load_file( - filename=CSV_PATH, bucket_name=S3_BUCKET, key=file_name, replace=True - ) - print(f"✅ S3 업로드 완료: s3://{S3_BUCKET}/{file_name}") - - -""" -# Snowflake 저장 함수 -def save_to_snowflake(): - connection = BaseHook.get_connection('S4tify_SnowFlake') # Snowflake 연결 가져오기 - # Snowflake 연결 생성 - conn = snowflake.connector.connect( - user=connection.login, - password=connection.password, - account=connection.host, - warehouse=connection.extra_dejson.get('warehouse'), - database=connection.extra_dejson.get('database'), - schema=connection.extra_dejson.get('schema'), - ) - - cursor = conn.cursor() - cursor.execute(f"DELETE FROM vibe_chart WHERE date = '{TODAY}'") - with open(CSV_PATH, "r", encoding="utf-8") as f: - next(f) # 헤더 스킵 - for line in f: - values = line.strip().split(",") - cursor.execute( - "INSERT INTO vibe_chart (rank, title, artist, lastPos, isNew, image, date) VALUES (%s, %s, %s, %s, %s, %s, %s)", - (*values, TODAY) - ) - conn.commit() - cursor.close() - conn.close() - print("✅ Snowflake 저장 완료") -""" - - -# Vibe 차트 데이터를 가져와 JSON으로 저장하는 함수 +# S3 
설정 +S3_BUCKET = "de5-s4tify" +S3_CSV_KEY = f"raw_data/vibe_chart_{TODAY}.csv" +LOCAL_FILE_PATH = f"/opt/airflow/data/vibe_chart_with_genre_{TODAY}.csv" + +# Spotify API 설정 +SPOTIFY_API_URL = "https://api.spotify.com/v1" +SPOTIFY_TOKEN = Variable.get("SPOTIFY_ACCESS_TOKEN", default_var=None) + +# Spotify API에서 아티스트 ID 검색 +def search_artist_id(artist_name): + SPOTIFY_TOKEN = Variable.get("SPOTIFY_ACCESS_TOKEN", default_var=None) + url = f"{SPOTIFY_API_URL}/search" + headers = {"Authorization": f"Bearer {SPOTIFY_TOKEN}"} + params = {"q": artist_name, "type": "artist", "limit": 1} + response = requests.get(url, headers=headers, params=params) + + if response.status_code == 200: + artists = response.json().get("artists", {}).get("items", []) + artist_id = artists[0]["id"] if artists else None + return artist_id + return None + +# Spotify API에서 아티스트 장르 가져오기 +def get_artist_genre(artist_id): + if not artist_id: + return "Unknown" + + SPOTIFY_TOKEN = Variable.get("SPOTIFY_ACCESS_TOKEN", default_var=None) + url = f"{SPOTIFY_API_URL}/artists/{artist_id}" + headers = {"Authorization": f"Bearer {SPOTIFY_TOKEN}"} + response = requests.get(url, headers=headers) + + if response.status_code == 200: + genres = response.json().get("genres", []) + return ", ".join(genres) if genres else "Unknown" + return "Unknown" + +# 1. 
VIBE 차트 데이터 가져오기 및 JSON 변환 def fetch_vibe_chart(): chart = ChartData(fetch=True) - chart_data = { "date": chart.date.strftime("%Y-%m-%d %H:%M:%S"), - "entries": [ - { - "rank": entry.rank, - "title": entry.title, - "artist": entry.artist, - "lastPos": entry.lastPos, - "isNew": entry.isNew, - "image": entry.image, - } - for entry in chart.entries - ], + "entries": [] } - - with open(JSON_PATH, "w", encoding="utf-8") as f: - json.dump(chart_data, f, ensure_ascii=False, indent=4) - - print(f"✅ JSON 저장 완료: {JSON_PATH}") - return JSON_PATH - - -# JSON을 CSV로 변환하는 함수 -def convert_json_to_csv(): - with open(JSON_PATH, "r", encoding="utf-8") as f: - chart_data = json.load(f) - - fields = ["rank", "title", "artist", "lastPos", "isNew", "image"] - with open(CSV_PATH, "w", newline="", encoding="utf-8") as csvfile: - writer = csv.DictWriter(csvfile, fieldnames=fields) - writer.writeheader() - for entry in chart_data["entries"]: - writer.writerow(entry) - - print(f"✅ CSV 변환 완료: {CSV_PATH}") - return CSV_PATH + for entry in chart.entries: + print(f"📊 차트 데이터 처리: {entry.rank}. {entry.title} - {entry.artist}") + artist_id = search_artist_id(entry.artist) + genre = get_artist_genre(artist_id) + chart_data["entries"].append({ + "rank": entry.rank, + "title": entry.title, + "artist": entry.artist, + "lastPos": entry.lastPos, + "isNew": entry.isNew, + "image": entry.image, + "genre": genre + }) + return chart_data + +# 2. JSON → CSV 변환 +def convert_json_to_csv(**kwargs): + ti = kwargs["ti"] + data = ti.xcom_pull(task_ids="fetch_vibe_chart") + csv_data = [["rank", "title", "artist", "lastPos", "isNew", "image", "genre"]] + for entry in data["entries"]: + csv_data.append([ + entry["rank"], entry["title"], entry["artist"], entry["lastPos"], entry["isNew"], entry["image"], entry["genre"] + ]) + csv_string = "\n".join(",".join(map(str, row)) for row in csv_data) + return csv_string + +# 3. 
로컬에 CSV 저장 (테스트용) +def save_csv_locally(csv_string): + with open(LOCAL_FILE_PATH, "w", encoding="utf-8") as f: + f.write(csv_string) + +# 4. AWS S3 업로드 +def upload_to_s3(**kwargs): + ti = kwargs["ti"] + csv_string = ti.xcom_pull(task_ids="convert_json_to_csv") + #save_csv_locally(csv_string) # 테스트용 로컬 저장 + s3_hook = S3Hook(aws_conn_id="S4tify_S3") + s3_hook.load_string(csv_string, key=S3_CSV_KEY, bucket_name=S3_BUCKET, replace=True) + print(f"✅ S3 업로드 완료: {S3_CSV_KEY}") # DAG 설정 @@ -118,38 +115,29 @@ def convert_json_to_csv(): schedule_interval="45 0 * * *", # 매일 00:45 실행 catchup=False, ) as dag: - + + get_spotify_token_task = PythonOperator( + task_id="get_spotify_token", + python_callable=get_token, # ✅ 먼저 실행해서 Variable 갱신 + provide_context=True, + ) + fetch_vibe_chart_task = PythonOperator( task_id="fetch_vibe_chart", python_callable=fetch_vibe_chart, + provide_context=True, ) - + convert_json_to_csv_task = PythonOperator( task_id="convert_json_to_csv", python_callable=convert_json_to_csv, + provide_context=True, ) - - """upload_json_to_s3_task = PythonOperator( - task_id="upload_json_to_s3", - python_callable=upload_to_s3, - op_kwargs={"file_path": JSON_PATH, "bucket_name": S3_BUCKET, "object_name": S3_JSON_KEY}, - )""" - - upload_csv_to_s3_task = PythonOperator( - task_id="upload_csv_to_s3", + + upload_s3_task = PythonOperator( + task_id="upload_to_s3", python_callable=upload_to_s3, - op_kwargs={ - "file_path": CSV_PATH, - "bucket_name": S3_BUCKET, - "object_name": S3_CSV_KEY, - }, + provide_context=True, ) - - """save_to_snowflake_task = PythonOperator( - task_id="save_to_snowflake", - python_callable=save_to_snowflake, - )""" - - # 작업 순서 정의 - (fetch_vibe_chart_task >> convert_json_to_csv_task >> - upload_csv_to_s3_task) # save_to_snowflake_task + + get_spotify_token_task >> fetch_vibe_chart_task >> convert_json_to_csv_task >> upload_s3_task diff --git a/airflow/dags/scripts/S3_Spark_SnowFlake_ELT.py b/airflow/dags/scripts/S3_Spark_SnowFlake_ELT.py new file 
# ELT job: read the daily per-service chart CSVs from S3 with Spark, merge
# them into one DataFrame, normalize the column types, and load the result
# into the Snowflake table `music_charts`.  Runs as a standalone Spark script
# submitted by the S3_Spark_SnowFlake DAG.
import os
from datetime import datetime

import snowflake.connector
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, lit, when

from airflow.models import Variable

# Spark JARs: Snowflake JDBC driver plus the S3A connector and AWS SDK bundle.
SPARK_HOME = "/opt/spark/"
SPARK_JARS = ",".join(
    [
        os.path.join(SPARK_HOME, "jars", "snowflake-jdbc-3.9.2.jar"),
        os.path.join(SPARK_HOME, "jars", "hadoop-aws-3.3.4.jar"),
        os.path.join(SPARK_HOME, "jars", "aws-java-sdk-bundle-1.12.262.jar"),
    ]
)

# Snowflake connection settings.
# NOTE: Variable.get raises KeyError when the key is missing, so optional keys
# must use default_var.  The original
# `Variable.get("SNOWFLAKE_SCHEMA") if Variable.get("SNOWFLAKE_SCHEMA") else "raw_data"`
# could never fall back — it raised before the `else` was evaluated.
SNOWFLAKE_OPTIONS = {
    "user": Variable.get("SNOWFLAKE_USER"),
    "password": Variable.get("SNOWFLAKE_PASSWORD"),
    "account": Variable.get("SNOWFLAKE_ACCOUNT"),
    "db": Variable.get("SNOWFLAKE_DB", default_var="S4TIFY"),
    "warehouse": Variable.get("SNOWFLAKE_WH", default_var="COMPUTE_WH"),
    # `or "raw_data"` also covers an explicitly empty Variable value.
    "schema": Variable.get("SNOWFLAKE_SCHEMA", default_var="raw_data") or "raw_data",
    "role": "ACCOUNTADMIN",
    "driver": "net.snowflake.client.jdbc.SnowflakeDriver",
    "url": f'jdbc:snowflake://{Variable.get("SNOWFLAKE_ACCOUNT")}.snowflakecomputing.com',
}


def spark_session_builder(app_name: str) -> SparkSession:
    """Create (or reuse) a SparkSession configured for S3A access with static AWS keys."""
    return (
        SparkSession.builder.appName(app_name)
        .config("spark.jars", SPARK_JARS)
        .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
        .config("spark.hadoop.fs.s3a.access.key", Variable.get("AWS_ACCESS_KEY"))
        .config("spark.hadoop.fs.s3a.secret.key", Variable.get("AWS_SECRET_KEY"))
        .config("spark.hadoop.fs.s3a.endpoint", "s3.amazonaws.com")
        .config(
            "spark.hadoop.fs.s3a.aws.credentials.provider",
            "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider",
        )
        .getOrCreate()
    )


def _snowflake_connection():
    """Open a Snowflake connection from SNOWFLAKE_OPTIONS; caller must close it."""
    return snowflake.connector.connect(
        user=SNOWFLAKE_OPTIONS["user"],
        password=SNOWFLAKE_OPTIONS["password"],
        account=SNOWFLAKE_OPTIONS["account"],
        database=SNOWFLAKE_OPTIONS["db"],
        schema=SNOWFLAKE_OPTIONS["schema"],
        warehouse=SNOWFLAKE_OPTIONS["warehouse"],
        role=SNOWFLAKE_OPTIONS["role"],
    )


def check_and_create_table():
    """Create the music_charts table in the connection's schema if it is absent.

    Fix: the original created `ADHOC.music_charts` while both the existence
    check (SHOW TABLES) and the subsequent INSERTs used the connection's
    current schema, so the table it created was never the one written to.
    The DDL now targets the current schema so all three operations agree, and
    `CREATE TABLE` replaces `CREATE OR REPLACE` since we only run it when the
    table does not exist.
    """
    try:
        conn = _snowflake_connection()
        try:
            cur = conn.cursor()
            cur.execute("SHOW TABLES LIKE 'music_charts'")
            if cur.fetchone() is None:
                cur.execute(
                    """
                    CREATE TABLE music_charts (
                        rank INT,
                        title STRING,
                        artist STRING,
                        lastPos INT,
                        image STRING,
                        peakPos INT,
                        isNew BOOLEAN,
                        source STRING
                    );
                    """
                )
                print("✅ music_charts 테이블 생성 완료.")
            else:
                print("ℹ️ music_charts 테이블이 이미 존재합니다.")
            conn.commit()
            cur.close()
        finally:
            conn.close()  # always release the connection (original leaked it on error)
    except Exception as e:
        print(f"⚠️ 테이블 확인 및 생성 중 오류 발생: {e}")


def escape_quotes(value):
    """Return *value* as a SQL string literal, or the keyword NULL for None.

    Kept for backward compatibility; inserts now use parameter binding.
    Fixes: callers must pass the RAW string (the original received an
    already-quoted literal and wrapped it in quotes again), and the escaping
    no longer uses a backslash inside an f-string expression, which is a
    SyntaxError on Python < 3.12.
    """
    if value is None:
        return "NULL"
    escaped = value.replace("'", "''")  # double single quotes per SQL rules
    return f"'{escaped}'"


def insert_data_into_snowflake(df, table_name):
    """Insert every row of *df* into *table_name* via parameter binding.

    Fix: the original built one INSERT string per row, double-wrapped values
    in quotes, inserted the literal string 'NULL' instead of SQL NULL, and
    referenced `query` in the except branch where it could be unbound.
    Binding (%s placeholders) handles quoting/NULL/boolean conversion and
    `executemany` sends all rows in one call instead of one query per row.
    """
    query = f"""
        INSERT INTO {table_name} (rank, title, artist, lastPos, image, peakPos, isNew, source)
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
    """
    rows = [
        (
            row["rank"],
            row["title"],
            row["artist"],
            row["lastPos"],
            row["image"],
            row["peakPos"],
            row["isNew"],
            row["source"],
        )
        for row in df.collect()
    ]
    try:
        conn = _snowflake_connection()
        try:
            cur = conn.cursor()
            if rows:
                # None binds as SQL NULL, Python bool as Snowflake BOOLEAN.
                cur.executemany(query, rows)
            conn.commit()
            cur.close()
        finally:
            conn.close()
        print("✅ Data inserted into Snowflake successfully.")
    except Exception as e:
        print(query)
        print(f"⚠️ Error inserting data into Snowflake: {e}")


# ---- script body: read per-service chart CSVs, merge, normalize, load ----
spark = spark_session_builder("S3_to_Snowflake")

# Today's date selects the day's CSV object in each service's S3 prefix.
TODAY = datetime.now().strftime("%Y%m%d")
S3_BUCKET = "s3a://de5-s4tify"
chart_sources = {
    "bugs": f"{S3_BUCKET}/raw_data/bugs_chart/bugs_chart_{TODAY}.csv",
    "flo": f"{S3_BUCKET}/raw_data/flo_chart/flo_chart_{TODAY}.csv",
    "genie": f"{S3_BUCKET}/raw_data/genie_chart/genie_chart_{TODAY}.csv",
    "melon": f"{S3_BUCKET}/raw_data/melon_chart/melon_chart_{TODAY}.csv",
    "vibe": f"{S3_BUCKET}/raw_data/vibe_chart/vibe_chart_{TODAY}.csv",
}


def read_chart_data(source, path):
    """Load one chart CSV and tag each row with its service name; None on failure."""
    try:
        df = (
            spark.read.format("csv")
            .option("header", True)
            .option("inferSchema", True)
            .load(path)
        )
        return df.withColumn("source", lit(source))
    except Exception as e:
        print(f"⚠️ {source} 데이터 로드 실패: {e}")
        return None


# Read every source; drop the ones that failed to load so one missing file
# does not abort the whole job.
dfs = [read_chart_data(source, path) for source, path in chart_sources.items()]
dfs = [df for df in dfs if df is not None]

for df in dfs:
    df.show(40)

if dfs:
    merged_df = dfs[0]
    for df in dfs[1:]:
        # allowMissingColumns: sources with fewer columns get NULLs.
        merged_df = merged_df.unionByName(df, allowMissingColumns=True)

    # Normalize types: numeric-looking strings are cast to int (otherwise the
    # `when` without `otherwise` yields NULL); isNew keeps only true/false.
    final_df = merged_df.select(
        when(col("rank").rlike("^[0-9]+$"), col("rank").cast("int")).alias("rank"),
        col("title"),
        col("artist"),
        when(col("lastPos").rlike("^[0-9]+$"), col("lastPos").cast("int")).alias("lastPos"),
        col("image"),
        when(col("peakPos").rlike("^[0-9]+$"), col("peakPos").cast("int")).alias("peakPos"),
        when(col("isNew").rlike("^(true|false)$"), col("isNew").cast("boolean")).alias("isNew"),
        col("source"),
    )

    final_df.show(40)

    # Sanity check: row count per source before loading.
    final_df.groupBy("source").agg(count("*").alias("count")).show()

    # Ensure the target table exists, then load the merged chart data.
    check_and_create_table()
    insert_data_into_snowflake(final_df, "music_charts")
else:
    print("❌ 저장할 차트 데이터가 없습니다.")

spark.stop()