From d053bd1d41d27389c1abe0d37d1343b9f0ef975d Mon Sep 17 00:00:00 2001 From: YEERRin Date: Tue, 11 Mar 2025 14:52:08 +0900 Subject: [PATCH] =?UTF-8?q?[fix]=20=EC=8A=A4=ED=8F=AC=ED=8B=B0=ED=8C=8C?= =?UTF-8?q?=EC=9D=B4=20=EA=B8=80=EB=A1=9C=EB=B2=8C=20=ED=83=9150=20?= =?UTF-8?q?=EC=B0=A8=ED=8A=B8=20=EC=88=98=EC=A7=91=20DAG=20=EC=88=98?= =?UTF-8?q?=EC=A0=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- airflow/dags/scripts/crawling_spotify_data.py | 5 - airflow/dags/scripts/load_spotify_data.py | 23 +-- airflow/dags/scripts/request_spotify_api.py | 176 +++++++----------- airflow/dags/spotify_data_dag.py | 50 ++--- 4 files changed, 94 insertions(+), 160 deletions(-) diff --git a/airflow/dags/scripts/crawling_spotify_data.py b/airflow/dags/scripts/crawling_spotify_data.py index d945253..eb6f1e2 100644 --- a/airflow/dags/scripts/crawling_spotify_data.py +++ b/airflow/dags/scripts/crawling_spotify_data.py @@ -70,8 +70,6 @@ def data_crawling(): By.XPATH, '//*[@id="main"]//div[@role="row"]' ) - print(len(song_lists)) - for i in range(1, len(song_lists)): artist = [] @@ -92,7 +90,6 @@ def data_crawling(): By.XPATH, f'//*[@id="main"]/div/div[2]/div[5]/div/div[2]/div[2]/div/main/section/div[2]/div[3]/div/div[1]/div/div[2]/div[2]/div[{i}]/div/div[2]/div/div', ) - print(song_title.text) global_top50_df.loc[i, "title"] = song_title.text @@ -106,9 +103,7 @@ def data_crawling(): arti_list = arti_info.find_elements(By.TAG_NAME, "a") for arti in arti_list: - print(arti.text) artist.append(arti.text) - print(arti.get_attribute("href")[-22:]) artist_id.append(arti.get_attribute("href")[-22:]) global_top50_df.loc[i, "artist"] = artist diff --git a/airflow/dags/scripts/load_spotify_data.py b/airflow/dags/scripts/load_spotify_data.py index 56a4a8e..967061e 100644 --- a/airflow/dags/scripts/load_spotify_data.py +++ b/airflow/dags/scripts/load_spotify_data.py @@ -14,27 +14,24 @@ BUCKET_NAME = "de5-s4tify" OBJECT_NAME = "raw_data" - def conn_to_s3(): s3_client = boto3.client( - "s3", + 's3', aws_access_key_id=AWS_ACCESS_KEY_ID, - aws_secret_access_key=AWS_SECRET_ACCESS_KEY, + aws_secret_access_key = AWS_SECRET_ACCESS_KEY ) - + return s3_client - - -def load_s3_bucket(): - + +def load_s3_bucket(file_name): + s3_client = conn_to_s3() - - file_path = f"data/spotify_top50_artistData_{TODAY}.csv" + file_path = f'data/{file_name}' bucket_name = BUCKET_NAME - object_name = f"{OBJECT_NAME}/spotify_top50_artistData_{TODAY}.csv" - + object_name = f"{OBJECT_NAME}/{file_name}" + try: s3_client.upload_file(file_path, bucket_name, object_name) - + except Exception as e: print(f"error:{e}") diff --git a/airflow/dags/scripts/request_spotify_api.py b/airflow/dags/scripts/request_spotify_api.py index 06f3886..c12d31e 100644 --- a/airflow/dags/scripts/request_spotify_api.py +++ b/airflow/dags/scripts/request_spotify_api.py @@ -2,6 +2,7 @@ import time from datetime import datetime from typing import Any, Dict, Optional +from scripts.load_spotify_data import * import pandas as pd import requests @@ -16,134 +17,87 @@ LAST_FM_API_KEY = Variable.get("LAST_FM_API_KEY") -def transformation(**kwargs): - task_instance = kwargs["ti"] - # 중복데이터 제거 - artist_info_df = pd.DataFrame( - task_instance.xcom_pull( - task_ids="extract_artist_info", - key="artist_info")).drop_duplicates( - subset=["artist_id"]) - artist_top10_df = pd.DataFrame( - task_instance.xcom_pull( - task_ids="extract_artist_top10", - key="artist_top10")).drop_duplicates( - subset=["song_id"]) - - global_top50_artist_df = pd.merge( - artist_info_df, artist_top10_df, on="artist_id", how="outer" - ) - - column = ["artist_id", "artist", "genre", "album", "song_id", "title"] - real_global_top50_artist_df = global_top50_artist_df[column] - - for index, row in real_global_top50_artist_df.iterrows(): - artist = row["artist"] - track = row["title"] - genre_list = [] - - url = f"https://ws.audioscrobbler.com/2.0/?method=track.getInfo&api_key={LAST_FM_API_KEY}&artist={artist}&track={track}&format=json" - - response = requests.get(url).json() - - try: - for genre in response["track"]["toptags"]["tag"]: - genre_list.append(genre["name"]) - except BaseException: - pass - - real_global_top50_artist_df.at[index, "genre"] = genre_list - - real_global_top50_artist_df.to_csv( - f"data/spotify_top50_artistData_{TODAY}.csv", encoding="utf-8-sig" - ) - - -# 아티스트 top 10 트랙 가져오기 -def get_arti_top10(**kwargs): - - task_instance = kwargs["ti"] +#아티스트 top 10 트랙 가져오기 +def get_arti_top10(logical_date, **kwargs): + + task_instance = kwargs['ti'] arti_top10_list = [] - - # csv 파일 읽어오기 - song_info = read_crawling_csv() + object_name = f"spotify_artist_top10_{logical_date}.csv" + + #csv 파일 읽어오기 + song_info = read_crawling_csv(logical_date) for _, row in song_info.iterrows(): - - artist_id = ast.literal_eval(row["artist_id"]) - - # 피처링 등의 이유로 아티스트가 2명 이상인 경우가 존재 + + artist_id = ast.literal_eval(row['artist_id']) + + #피처링 등의 이유로 아티스트가 2명 이상인 경우가 존재 for i in range(len(artist_id)): id = artist_id[i] - end_point = END_POINT + f"/artists/{id}/top-tracks/" + end_point = END_POINT+f'/artists/{id}/top-tracks/' top_10_info = extract(end_point) - + try: - for track in top_10_info["tracks"]: - arti_top10_list.append( - { - "album": track["album"]["name"], - "artist_id": id, - "song_id": track["id"], - "title": track["name"], - } - ) - except BaseException: - time.sleep(5) - - top_10_info = extract(end_point) - - for track in top_10_info["tracks"]: - arti_top10_list.append( - { - "album": track["album"]["name"], - "artist_id": id, - "song_id": track["id"], - "title": track["name"], - } - ) - - task_instance.xcom_push(key="artist_top10", value=arti_top10_list) - - -# 아티스트 정보 가져오기 -def get_artist_info(**kwargs): - - task_instance = kwargs["ti"] - + for track in top_10_info['tracks']: + arti_top10_list.append({ + "album": track['album']['name'], + "artist_id": id, + "song_id" : track['id'], + "title" : track['name'] + }) + except Exception as e: + print(f"error:{top_10_info}") + + #task_instance.xcom_push(key='artist_top10', value=arti_top10_list) + artist_top10_df = pd.DataFrame(arti_top10_list) + artist_top10_df.to_csv(f"data/{object_name}",encoding="utf-8-sig", index=False) + load_s3_bucket(object_name) + + +#아티스트 정보 가져오기 +def get_artist_info(logical_date, **kwargs): + + task_instance = kwargs['ti'] artist_info_list = [] - - # csv 파일 읽어오기 - song_info = read_crawling_csv() - + object_name =f"spotify_artist_info_{logical_date}.csv" + + #csv 파일 읽어오기 + song_info = read_crawling_csv(logical_date) + for _, row in song_info.iterrows(): - artist_id = ast.literal_eval(row["artist_id"]) - + artist_id = ast.literal_eval(row['artist_id']) + for i in range(len(artist_id)): id = artist_id[i] - - end_point = END_POINT + f"/artists/{id}" - artist_info = extract(end_point) - artist_info_list.append( - { - "artist": artist_info["name"], + + try: + end_point = END_POINT+f'/artists/{id}' + artist_info = extract(end_point) + #print(artist_info['name']) + artist_info_list.append({ + "artist" : artist_info['name'], "artist_id": id, - "genre": artist_info["genres"], - } - ) - - task_instance.xcom_push(key="artist_info", value=artist_info_list) - + "artist_genre": artist_info['genres'] + }) + except Exception as e: + time.sleep(20) + print(f"error:{artist_info}") + + #task_instance.xcom_push(key="artist_info", value=artist_info_list) + artist_info_df = pd.DataFrame(artist_info_list) + artist_info_df.to_csv(f"data/{object_name}", encoding="utf-8-sig", index=False) + load_s3_bucket(object_name) + + # 크롤링 데이터 읽어오는 함수 -def read_crawling_csv() -> pd.DataFrame: - - daily_chart_crawling = pd.read_csv( - f"data/spotify_crawling_data_{TODAY}.csv") - +def read_crawling_csv(execution_date) -> pd.DataFrame: + + daily_chart_crawling = pd.read_csv(f"data/spotify_crawling_data_{execution_date}.csv") + return daily_chart_crawling diff --git a/airflow/dags/spotify_data_dag.py b/airflow/dags/spotify_data_dag.py index 3189adb..7675084 100644 --- a/airflow/dags/spotify_data_dag.py +++ b/airflow/dags/spotify_data_dag.py @@ -10,48 +10,36 @@ default_args = { "depends_on_past": False, "start_date": datetime(2025, 2, 28), - "retries": 2, - "retry_delay": timedelta(minutes=5), + "retries": 1, + "retry_delay": timedelta(seconds=60) } with DAG( - dag_id="GetSpotifyArtistData", + dag_id = 'GetSpotifyArtistData', default_args=default_args, catchup=False, - tags=["final_project"], - schedule_interval="0 11 * * *", -) as dag: - + tags=['final_project'], + schedule_interval='0 11 * * *' +)as dag: + extract_globalTop50_data = PythonOperator( - task_id="extract_global_top50", python_callable=data_crawling + task_id = 'extract_global_top50', + python_callable=data_crawling, + op_kwargs={'logical_date': '{{ ds }}'}, ) - + extract_artistInfo_data = PythonOperator( - task_id="extract_artist_info", + task_id = 'extract_artist_info', python_callable=get_artist_info, - retries=2, - retry_delay=timedelta(seconds=30), + op_kwargs={'logical_date': '{{ ds }}'}, ) - + extract_artistTop10_data = PythonOperator( - task_id="extract_artist_top10", + task_id = 'extract_artist_top10', python_callable=get_arti_top10, - retries=2, - retry_delay=timedelta(seconds=30), - ) - - transformation_data = PythonOperator( - task_id="transformation_data", python_callable=transformation - ) - - load_data = PythonOperator( - task_id="load_data", - python_callable=load_s3_bucket) - - ( - extract_globalTop50_data - >> [extract_artistInfo_data, extract_artistTop10_data] - >> transformation_data - >> load_data + op_kwargs={'logical_date': '{{ ds }}'}, ) + + + extract_globalTop50_data >> [extract_artistInfo_data, extract_artistTop10_data] \ No newline at end of file