From e88ce350afe184e1f15442651941159950bd7f9d Mon Sep 17 00:00:00 2001 From: YEERRin Date: Fri, 7 Mar 2025 20:16:53 +0900 Subject: [PATCH] =?UTF-8?q?[fix]=20=EB=85=B8=EB=9E=98=20=EC=9E=A5=EB=A5=B4?= =?UTF-8?q?=20=EB=8D=B0=EC=9D=B4=ED=84=B0=20=EC=B6=94=EA=B0=80=20=EB=B0=8F?= =?UTF-8?q?=20=EC=BD=94=EB=93=9C=20=EB=A6=AC=ED=8C=A9=ED=86=A0=EB=A7=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- airflow/dags/scripts/join_spotify_data.py | 74 ------ airflow/dags/scripts/load_spotify_data.py | 41 ++++ airflow/dags/scripts/request_spotify_api.py | 247 ++++++++++---------- airflow/dags/spotify_data_dag.py | 55 +++-- 4 files changed, 196 insertions(+), 221 deletions(-) delete mode 100644 airflow/dags/scripts/join_spotify_data.py create mode 100644 airflow/dags/scripts/load_spotify_data.py diff --git a/airflow/dags/scripts/join_spotify_data.py b/airflow/dags/scripts/join_spotify_data.py deleted file mode 100644 index b80b9ca..0000000 --- a/airflow/dags/scripts/join_spotify_data.py +++ /dev/null @@ -1,74 +0,0 @@ -from datetime import datetime - -import boto3 -import pandas as pd -from airflow.models import Variable -from scripts.request_spotify_api import * - -TODAY = datetime.now().strftime("%Y-%m-%d") - -AWS_ACCESS_KEY_ID = Variable.get("AWS_ACCESS_KEY_ID") -AWS_SECRET_ACCESS_KEY = Variable.get("AWS_SECRET_ACCESS_KEY") - -BUCKET_NAME = "de5-s4tify" -OBJECT_NAME = "raw_data" - - -def conn_to_s3(): - s3_client = boto3.client( - "s3", - aws_access_key_id=AWS_ACCESS_KEY_ID, - aws_secret_access_key=AWS_SECRET_ACCESS_KEY, - ) - - return s3_client - - -def load_s3_bucket(): - - s3_client = conn_to_s3() - - file_path = f"data/spotify_join_data_{TODAY}.csv" - bucket_name = BUCKET_NAME - object_name = f"{OBJECT_NAME}/spotify_join_data_{TODAY}.csv" - - try: - s3_client.upload_file(file_path, bucket_name, object_name) - - except Exception as e: - print(f"error:{e}") - - -def processing_csv_file(df): - if isinstance(df["artist"][0], str): - df["artist"] = df["artist"].apply(eval) - df["artist_id"] = df["artist_id"].apply(eval) - - # 리스트로 된 'artist' 컬럼을 펼치기 - exploded_df = df.explode(["artist", "artist_id"]) - - return exploded_df - - -def read_and_merge(): - df = pd.read_csv(f"data/spotify_crawling_data_{TODAY}.csv") - global_top50_df = processing_csv_file(df) - - artist_info_df = get_artist_info() - # 중복데이터 제거 - artist_info_df = artist_info_df.drop_duplicates(subset=["artist_id"]) - artist_top10_df = get_arti_top_10() - - # merged_df1 = pd.merge(global_top50_df, artist_info_df, on = 'artist_id', how='outer') - global_top50_artist_df = pd.merge( - artist_info_df, artist_top10_df, on="artist_id") - - column = ["artist_id", "artist", "genre", "album", "song_id", "title"] - real_global_top50_artist_df = global_top50_artist_df[column] - - real_global_top50_artist_df.to_csv( - f"data/spotify_join_data_{TODAY}.csv", encoding="utf-8-sig" - ) - - # 버킷 업로드 - load_s3_bucket() diff --git a/airflow/dags/scripts/load_spotify_data.py b/airflow/dags/scripts/load_spotify_data.py new file mode 100644 index 0000000..7d5826d --- /dev/null +++ b/airflow/dags/scripts/load_spotify_data.py @@ -0,0 +1,41 @@ +from datetime import datetime + +import boto3 +import pandas as pd +from airflow.models import Variable +from scripts.request_spotify_api import * + +TODAY = datetime.now().strftime("%Y-%m-%d") + +AWS_ACCESS_KEY_ID = Variable.get("AWS_ACCESS_KEY_ID") +AWS_SECRET_ACCESS_KEY = Variable.get("AWS_SECRET_ACCESS_KEY") + +BUCKET_NAME = "de5-s4tify" +OBJECT_NAME = "raw_data" + + +def conn_to_s3(): + s3_client = boto3.client( + "s3", + aws_access_key_id=AWS_ACCESS_KEY_ID, + aws_secret_access_key=AWS_SECRET_ACCESS_KEY, + ) + + return s3_client + + +def load_s3_bucket(): + + s3_client = conn_to_s3() + + file_path = f"data/spotify_top50_artistData_{TODAY}.csv" + bucket_name = BUCKET_NAME + object_name = f"{OBJECT_NAME}/spotify_top50_artistData_{TODAY}.csv" + + try: + s3_client.upload_file(file_path, bucket_name, object_name) + + except Exception as e: + print(f"error:{e}") + + diff --git a/airflow/dags/scripts/request_spotify_api.py b/airflow/dags/scripts/request_spotify_api.py index de0645b..3a13183 100644 --- a/airflow/dags/scripts/request_spotify_api.py +++ b/airflow/dags/scripts/request_spotify_api.py @@ -1,161 +1,154 @@ -import ast -import json -import os -from datetime import datetime - -import pandas as pd -import requests from airflow.models import Variable from scripts.get_access_token import get_token +from typing import Dict, Any, Optional -""" -1.발급 받은 액세스 토큰으로 top50 차트 요청 -> top 50차트는 매일 갱신 -+) 가끔 액세스 토큰이 만료 -> 다시 받아올 수 있게함 (env 파일 업뎃) - -2. top 50 차트에 있는 아티스트 id 추출 + 아티스트 정보 가져오기 - -3. top 50에 있는 track_id로 track 세부 정보 추출 - -4. track_id를 기반으로 audio 세부 정보 가져오기 +import requests +import pandas as pd +from datetime import datetime +import ast +import time -""" TODAY = datetime.now().strftime("%Y-%m-%d") -END_POINT = "https://api.spotify.com/v1" -SPOTIFY_ACCESS_TOKEN = Variable.get("SPOTIFY_ACCESS_TOKEN") - - -def get_arti_top_10(): - - arti_top_10_list = [] - top_10_tracks_df = pd.DataFrame( - columns=[ - "album", - "artist_id", - "song_id", - "title"]) +END_POINT = 'https://api.spotify.com/v1' - # csv 파일 읽어오기 - song_info = pd.read_csv(f"data/spotify_crawling_data_{TODAY}.csv") +SPOTIFY_ACCESS_TOKEN = Variable.get("SPOTIFY_ACCESS_TOKEN") +LAST_FM_API_KEY = Variable.get("LAST_FM_API_KEY") + +def transformation(**kwargs): + + task_instance = kwargs['ti'] + + #중복데이터 제거 + artist_info_df = pd.DataFrame(task_instance.xcom_pull(task_ids='extract_artist_info', key='artist_info')).drop_duplicates(subset=['artist_id']) + artist_top10_df = pd.DataFrame(task_instance.xcom_pull(task_ids='extract_artist_top10', key='artist_top10')).drop_duplicates(subset=['song_id']) + + global_top50_artist_df = pd.merge(artist_info_df, artist_top10_df, on='artist_id', how='outer') + + column = ['artist_id', 'artist', 'genre', 'album', 'song_id', 'title'] + real_global_top50_artist_df = global_top50_artist_df[column] + + for index, row in real_global_top50_artist_df.iterrows(): + artist = row['artist'] + track = row['title'] + genre_list = [] + + url = f'https://ws.audioscrobbler.com/2.0/?method=track.getInfo&api_key={LAST_FM_API_KEY}&artist={artist}&track={track}&format=json' + + response = requests.get(url).json() + + try: + for genre in response['track']['toptags']['tag']: + genre_list.append(genre['name']) + except: + pass + + real_global_top50_artist_df.at[index, 'genre'] = genre_list + + real_global_top50_artist_df.to_csv(f"data/spotify_top50_artistData_{TODAY}.csv",encoding="utf-8-sig") + + +#아티스트 top 10 트랙 가져오기 +def get_arti_top10(**kwargs): + + task_instance = kwargs['ti'] + arti_top10_list = [] + + #csv 파일 읽어오기 + song_info = read_crawling_csv() for _, row in song_info.iterrows(): - - artist_id = ast.literal_eval(row["artist_id"]) - - # 피처링 등의 이유로 아티스트가 2명 이상인 경우가 존재 + + artist_id = ast.literal_eval(row['artist_id']) + + #피처링 등의 이유로 아티스트가 2명 이상인 경우가 존재 for i in range(len(artist_id)): id = artist_id[i] - end_point = END_POINT + f"/artists/{id}/top-tracks/" + end_point = END_POINT+f'/artists/{id}/top-tracks/' top_10_info = extract(end_point) - - for track in top_10_info["tracks"]: - arti_top_10_list.append( - { - "album": track["album"]["name"], - "artist_id": id, - "song_id": track["id"], - "title": track["name"], - } - ) - - print(arti_top_10_list) - top_10_tracks_df = pd.DataFrame(arti_top_10_list) - # top_10_tracks_df.to_csv(f"arti_top10_track_{TODAY}.csv", encoding="utf-8-sig") - - return top_10_tracks_df + + try: + for track in top_10_info['tracks']: + arti_top10_list.append({ + "album": track['album']['name'], + "artist_id": id, + "song_id" : track['id'], + "title" : track['name'] + }) + except: + time.sleep(5) + + top_10_info = extract(end_point) + + for track in top_10_info['tracks']: + arti_top10_list.append({ + "album": track['album']['name'], + "artist_id": id, + "song_id" : track['id'], + "title" : track['name'] + }) + + task_instance.xcom_push(key='artist_top10', value=arti_top10_list) -def get_artist_info(): + +#아티스트 정보 가져오기 +def get_artist_info(**kwargs): + + task_instance = kwargs['ti'] + artist_info_list = [] - - # csv 파일 읽어오기 - song_info = pd.read_csv(f"data/spotify_crawling_data_{TODAY}.csv") - - artist_info_df = pd.DataFrame(columns=["artist", "artist_id", "genre"]) - + + #csv 파일 읽어오기 + song_info = read_crawling_csv() + for _, row in song_info.iterrows(): - artist_id = ast.literal_eval(row["artist_id"]) - + artist_id = ast.literal_eval(row['artist_id']) + for i in range(len(artist_id)): id = artist_id[i] - - end_point = END_POINT + f"/artists/{id}" + + end_point = END_POINT+f'/artists/{id}' artist_info = extract(end_point) - # print(artist_info['name']) - artist_info_list.append( - { - "artist": artist_info["name"], - "artist_id": id, - "genre": artist_info["genres"], - } - ) - - artist_info_df = pd.DataFrame(artist_info_list) - # artist_info_df.to_csv(f"arti_info_{TODAY}.csv", encoding="utf-8-sig") - - return artist_info_df - - -# 노래 제목, 아티스트 id, 아티스트, 노래 id, 노래 인기도 담은 df 반환 -def get_global_top50(): - - song_info_list = [] - song_info_df = pd.DataFrame( - columns=["title", "artist", "artist_id", "song_id", "popularity"] - ) - end_point = END_POINT + "/playlists/4RunlK9lvAC8ZtRbbbPWzD" - track_data = extract(end_point) - - # 각 트랙 정보 추가 - for track in track_data["tracks"]["items"]: - for artist in track["track"]["artists"]: # 여러 아티스트 처리 - song_info_list.append( - { - "title": track["track"]["name"], - "artist": artist["name"], - "artist_id": artist["id"], - "song_id": track["track"]["id"], - "popularity": track["track"]["popularity"], - } - ) - - song_info_df = pd.DataFrame(song_info_list) - song_info_df.to_csv( - f"global_top50_track_{TODAY}.csv", - encoding="utf-8-sig") - - return song_info_df - - -# 정보 요청 -def extract(url): - + artist_info_list.append({ + "artist" : artist_info['name'], + "artist_id": id, + "genre": artist_info['genres'] + }) + + task_instance.xcom_push(key="artist_info", value=artist_info_list) + + +# 크롤링 데이터 읽어오는 함수 +def read_crawling_csv() -> pd.DataFrame: + + daily_chart_crawling = pd.read_csv(f"data/spotify_crawling_data_{TODAY}.csv") + + return daily_chart_crawling + +#API 요청 함수 +def extract(url: str) -> Optional[Dict[str, Any]]: + access_token = SPOTIFY_ACCESS_TOKEN headers = { "Authorization": f"Bearer {access_token}", } - - payload = {"grant_type": "client_credentials"} - + response = requests.get(url, headers=headers) - - if response.status_code == 200: + + if response.status_code == 200: result = response.json() - - elif response.status_code == 400 or response.status_code == 401: # 토큰 만료시 - get_token() # .env 파일에 새로 업로드 ㄱ + + elif response.status_code == 400 or response.status_code == 401 : #토큰 만료시 재요청 + time.sleep(3) + get_token() #Variable에 저장된 token 변경 response = requests.get(url, headers=headers) - result = response.json() - + else: print(response.text) return result - - -if __name__ == "__main__": - get_arti_top_10() diff --git a/airflow/dags/spotify_data_dag.py b/airflow/dags/spotify_data_dag.py index 9789e84..909e83c 100644 --- a/airflow/dags/spotify_data_dag.py +++ b/airflow/dags/spotify_data_dag.py @@ -1,41 +1,56 @@ +from airflow import DAG +from airflow.operators.python import PythonOperator from datetime import datetime, timedelta -from airflow.operators.python import PythonOperator from scripts.crawling_spotify_data import * -from scripts.join_spotify_data import * from scripts.request_spotify_api import * +from scripts.load_spotify_data import * -from airflow import DAG default_args = { "depends_on_past": False, "start_date": datetime(2025, 2, 28), "retries": 2, - "retry_delay": timedelta(minutes=5), + "retry_delay": timedelta(minutes=5) } with DAG( - dag_id="GetSpotifyArtistData", + dag_id = 'GetSpotifyArtistData', default_args=default_args, catchup=False, - tags=["final_project"], - schedule_interval="0 11 * * *", -) as dag: - + tags=['final_project'], + schedule_interval='0 11 * * *' +)as dag: + extract_globalTop50_data = PythonOperator( - task_id="crawling_global_top50", python_callable=data_crawling + task_id = 'extract_global_top50', + python_callable=data_crawling ) - - transformation_data = PythonOperator( - task_id="join_data", - python_callable=read_and_merge, + + extract_artistInfo_data = PythonOperator( + task_id = 'extract_artist_info', + python_callable=get_artist_info, retries=2, - retry_delay=timedelta(seconds=30), + retry_delay=timedelta(seconds=30) ) - + + extract_artistTop10_data = PythonOperator( + task_id = 'extract_artist_top10', + python_callable=get_arti_top10, + retries=2, + retry_delay = timedelta(seconds=30) + ) + + transformation_data = PythonOperator( + task_id='transformation_data', + python_callable=transformation + ) + load_data = PythonOperator( - task_id="load_data", - python_callable=load_s3_bucket) - - extract_globalTop50_data >> transformation_data >> load_data + task_id='load_data', + python_callable=load_s3_bucket + ) + + extract_globalTop50_data >> [extract_artistInfo_data, extract_artistTop10_data] >> transformation_data >> load_data + \ No newline at end of file