From 60b70e6d837f7b9665effbc7b78ab88ae495ad8e Mon Sep 17 00:00:00 2001 From: sanghyeok_boo Date: Tue, 11 Mar 2025 20:49:42 +0900 Subject: [PATCH 1/2] =?UTF-8?q?kafka=20docker-compose.yml=20=EC=88=98?= =?UTF-8?q?=EC=A0=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Kafka/conduktor.yml | 2 +- Kafka/{test.py => register_connector.py} | 5 ----- 2 files changed, 1 insertion(+), 6 deletions(-) rename Kafka/{test.py => register_connector.py} (69%) diff --git a/Kafka/conduktor.yml b/Kafka/conduktor.yml index dbd2a63..c2a03be 100644 --- a/Kafka/conduktor.yml +++ b/Kafka/conduktor.yml @@ -16,7 +16,7 @@ services: conduktor-console: image: conduktor/conduktor-console:1.26.0 ports: - - "8085:8085" + - "8080:8080" volumes: - conduktor_data:/var/conduktor environment: diff --git a/Kafka/test.py b/Kafka/register_connector.py similarity index 69% rename from Kafka/test.py rename to Kafka/register_connector.py index ec3327f..d9f0983 100644 --- a/Kafka/test.py +++ b/Kafka/register_connector.py @@ -1,5 +1,4 @@ import os -import subprocess import sys from Kafka.utils.connect_utils import create_s3_sink_json @@ -11,7 +10,3 @@ create_s3_sink_json() register_sink_connector(f"{BASE_DIR}/connectors/s3_sink_config.json") - - -producer_script = os.path.join(BASE_DIR, "Kafka/" "eventsim_producer.py") -subprocess.run(["python", producer_script], check=True) From beb073704377fddb9a4e363da41d43a44956c80e Mon Sep 17 00:00:00 2001 From: GitHub Action Date: Tue, 11 Mar 2025 11:52:27 +0000 Subject: [PATCH 2/2] Automated format fixes --- airflow/dags/scripts/load_spotify_data.py | 18 ++-- airflow/dags/scripts/request_spotify_api.py | 107 +++++++++++--------- airflow/dags/spotify_data_dag.py | 35 +++---- 3 files changed, 86 insertions(+), 74 deletions(-) diff --git a/airflow/dags/scripts/load_spotify_data.py b/airflow/dags/scripts/load_spotify_data.py index 967061e..193389c 100644 --- a/airflow/dags/scripts/load_spotify_data.py +++ b/airflow/dags/scripts/load_spotify_data.py @@ -14,24 +14,26 @@ BUCKET_NAME = "de5-s4tify" OBJECT_NAME = "raw_data" + def conn_to_s3(): s3_client = boto3.client( - 's3', + "s3", aws_access_key_id=AWS_ACCESS_KEY_ID, - aws_secret_access_key = AWS_SECRET_ACCESS_KEY + aws_secret_access_key=AWS_SECRET_ACCESS_KEY, ) - + return s3_client - + + def load_s3_bucket(file_name): - + s3_client = conn_to_s3() - file_path = f'data/{file_name}' + file_path = f"data/{file_name}" bucket_name = BUCKET_NAME object_name = f"{OBJECT_NAME}/{file_name}" - + try: s3_client.upload_file(file_path, bucket_name, object_name) - + except Exception as e: print(f"error:{e}") diff --git a/airflow/dags/scripts/request_spotify_api.py b/airflow/dags/scripts/request_spotify_api.py index c12d31e..55391dd 100644 --- a/airflow/dags/scripts/request_spotify_api.py +++ b/airflow/dags/scripts/request_spotify_api.py @@ -2,11 +2,11 @@ import time from datetime import datetime from typing import Any, Dict, Optional -from scripts.load_spotify_data import * import pandas as pd import requests from scripts.get_access_token import get_token +from scripts.load_spotify_data import * from airflow.models import Variable @@ -17,87 +17,96 @@ LAST_FM_API_KEY = Variable.get("LAST_FM_API_KEY") - - -#아티스트 top 10 트랙 가져오기 +# 아티스트 top 10 트랙 가져오기 def get_arti_top10(logical_date, **kwargs): - - task_instance = kwargs['ti'] + + task_instance = kwargs["ti"] arti_top10_list = [] object_name = f"spotify_artist_top10_{logical_date}.csv" - - #csv 파일 읽어오기 + + # csv 파일 읽어오기 song_info = read_crawling_csv(logical_date) for _, row in song_info.iterrows(): - - artist_id = ast.literal_eval(row['artist_id']) - - #피처링 등의 이유로 아티스트가 2명 이상인 경우가 존재 + + artist_id = ast.literal_eval(row["artist_id"]) + + # 피처링 등의 이유로 아티스트가 2명 이상인 경우가 존재 for i in range(len(artist_id)): id = artist_id[i] - end_point = END_POINT+f'/artists/{id}/top-tracks/' + end_point = END_POINT + f"/artists/{id}/top-tracks/" top_10_info = extract(end_point) - + try: - for track in top_10_info['tracks']: - arti_top10_list.append({ - "album": track['album']['name'], - "artist_id": id, - "song_id" : track['id'], - "title" : track['name'] - }) + for track in top_10_info["tracks"]: + arti_top10_list.append( + { + "album": track["album"]["name"], + "artist_id": id, + "song_id": track["id"], + "title": track["name"], + } + ) except Exception as e: print(f"error:{top_10_info}") - - #task_instance.xcom_push(key='artist_top10', value=arti_top10_list) + + # task_instance.xcom_push(key='artist_top10', value=arti_top10_list) artist_top10_df = pd.DataFrame(arti_top10_list) - artist_top10_df.to_csv(f"data/{object_name}",encoding="utf-8-sig", index=False) + artist_top10_df.to_csv( + f"data/{object_name}", + encoding="utf-8-sig", + index=False) load_s3_bucket(object_name) - -#아티스트 정보 가져오기 + +# 아티스트 정보 가져오기 def get_artist_info(logical_date, **kwargs): - - task_instance = kwargs['ti'] + + task_instance = kwargs["ti"] artist_info_list = [] - object_name =f"spotify_artist_info_{logical_date}.csv" - - #csv 파일 읽어오기 + object_name = f"spotify_artist_info_{logical_date}.csv" + + # csv 파일 읽어오기 song_info = read_crawling_csv(logical_date) - + for _, row in song_info.iterrows(): - artist_id = ast.literal_eval(row['artist_id']) - + artist_id = ast.literal_eval(row["artist_id"]) + for i in range(len(artist_id)): id = artist_id[i] - + try: - end_point = END_POINT+f'/artists/{id}' + end_point = END_POINT + f"/artists/{id}" artist_info = extract(end_point) - #print(artist_info['name']) - artist_info_list.append({ - "artist" : artist_info['name'], - "artist_id": id, - "artist_genre": artist_info['genres'] - }) + # print(artist_info['name']) + artist_info_list.append( + { + "artist": artist_info["name"], + "artist_id": id, + "artist_genre": artist_info["genres"], + } + ) except Exception as e: time.sleep(20) print(f"error:{artist_info}") - - #task_instance.xcom_push(key="artist_info", value=artist_info_list) + + # task_instance.xcom_push(key="artist_info", value=artist_info_list) artist_info_df = pd.DataFrame(artist_info_list) - artist_info_df.to_csv(f"data/{object_name}", encoding="utf-8-sig", index=False) + artist_info_df.to_csv( + f"data/{object_name}", + encoding="utf-8-sig", + index=False) load_s3_bucket(object_name) - # 크롤링 데이터 읽어오는 함수 def read_crawling_csv(execution_date) -> pd.DataFrame: - - daily_chart_crawling = pd.read_csv(f"data/spotify_crawling_data_{execution_date}.csv") - + + daily_chart_crawling = pd.read_csv( + f"data/spotify_crawling_data_{execution_date}.csv" + ) + return daily_chart_crawling diff --git a/airflow/dags/spotify_data_dag.py b/airflow/dags/spotify_data_dag.py index 7675084..cd8ab8c 100644 --- a/airflow/dags/spotify_data_dag.py +++ b/airflow/dags/spotify_data_dag.py @@ -11,35 +11,36 @@ "depends_on_past": False, "start_date": datetime(2025, 2, 28), "retries": 1, - "retry_delay": timedelta(seconds=60) + "retry_delay": timedelta(seconds=60), } with DAG( - dag_id = 'GetSpotifyArtistData', + dag_id="GetSpotifyArtistData", default_args=default_args, catchup=False, - tags=['final_project'], - schedule_interval='0 11 * * *' -)as dag: - + tags=["final_project"], + schedule_interval="0 11 * * *", +) as dag: + extract_globalTop50_data = PythonOperator( - task_id = 'extract_global_top50', + task_id="extract_global_top50", python_callable=data_crawling, - op_kwargs={'logical_date': '{{ ds }}'}, + op_kwargs={"logical_date": "{{ ds }}"}, ) - + extract_artistInfo_data = PythonOperator( - task_id = 'extract_artist_info', + task_id="extract_artist_info", python_callable=get_artist_info, - op_kwargs={'logical_date': '{{ ds }}'}, + op_kwargs={"logical_date": "{{ ds }}"}, ) - + extract_artistTop10_data = PythonOperator( - task_id = 'extract_artist_top10', + task_id="extract_artist_top10", python_callable=get_arti_top10, - op_kwargs={'logical_date': '{{ ds }}'}, + op_kwargs={"logical_date": "{{ ds }}"}, ) - - - extract_globalTop50_data >> [extract_artistInfo_data, extract_artistTop10_data] \ No newline at end of file + + extract_globalTop50_data >> [ + extract_artistInfo_data, + extract_artistTop10_data]