2 changes: 1 addition & 1 deletion Kafka/conduktor.yml
@@ -16,7 +16,7 @@ services:
conduktor-console:
image: conduktor/conduktor-console:1.26.0
ports:
- "8085:8085"
- "8080:8080"
volumes:
- conduktor_data:/var/conduktor
environment:
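This change republishes the Conduktor console on 8080, presumably the image's default listen port, instead of 8085. A minimal smoke test under that assumption (the URL and response handling are illustrative, not part of this PR):

import requests

# Hypothetical check that the console answers on the remapped host port
# after `docker compose up`.
try:
    resp = requests.get("http://localhost:8080", timeout=5)
    print(f"console reachable, HTTP {resp.status_code}")
except requests.ConnectionError:
    print("console not reachable on 8080; is the compose stack up?")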
5 changes: 0 additions & 5 deletions Kafka/test.py → Kafka/register_connector.py
@@ -1,5 +1,4 @@
import os
import subprocess
import sys

from Kafka.utils.connect_utils import create_s3_sink_json
@@ -11,7 +10,3 @@

create_s3_sink_json()
register_sink_connector(f"{BASE_DIR}/connectors/s3_sink_config.json")


producer_script = os.path.join(BASE_DIR, "Kafka/" "eventsim_producer.py")
subprocess.run(["python", producer_script], check=True)
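With the producer launch removed, the renamed script does one thing: build the S3 sink config and register it. The imported register_sink_connector helper lives outside this diff; a hedged sketch of what such a helper typically looks like against the standard Kafka Connect REST API (the Connect URL and the config file's shape are assumptions, not the author's actual code):

import json
import requests

def register_sink_connector(config_path, connect_url="http://localhost:8083"):
    # Hypothetical reconstruction of the helper imported from
    # Kafka/utils/connect_utils; assumed payload shape: {"name": ..., "config": {...}}.
    with open(config_path) as f:
        payload = json.load(f)
    # Standard Kafka Connect REST call: POST /connectors creates the connector.
    resp = requests.post(f"{connect_url}/connectors", json=payload)
    resp.raise_for_status()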
18 changes: 10 additions & 8 deletions airflow/dags/scripts/load_spotify_data.py
@@ -14,24 +14,26 @@
BUCKET_NAME = "de5-s4tify"
OBJECT_NAME = "raw_data"


def conn_to_s3():
s3_client = boto3.client(
's3',
"s3",
aws_access_key_id=AWS_ACCESS_KEY_ID,
aws_secret_access_key = AWS_SECRET_ACCESS_KEY
aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
)

return s3_client



def load_s3_bucket(file_name):

s3_client = conn_to_s3()
file_path = f'data/{file_name}'
file_path = f"data/{file_name}"
bucket_name = BUCKET_NAME
object_name = f"{OBJECT_NAME}/{file_name}"

try:
s3_client.upload_file(file_path, bucket_name, object_name)

except Exception as e:
print(f"error:{e}")
107 changes: 58 additions & 49 deletions airflow/dags/scripts/request_spotify_api.py
@@ -2,11 +2,11 @@
import time
from datetime import datetime
from typing import Any, Dict, Optional
from scripts.load_spotify_data import *

import pandas as pd
import requests
from scripts.get_access_token import get_token
from scripts.load_spotify_data import *

from airflow.models import Variable

@@ -17,87 +17,96 @@
LAST_FM_API_KEY = Variable.get("LAST_FM_API_KEY")




#Fetch each artist's top 10 tracks
# Fetch each artist's top 10 tracks
def get_arti_top10(logical_date, **kwargs):
task_instance = kwargs['ti']

task_instance = kwargs["ti"]
arti_top10_list = []
object_name = f"spotify_artist_top10_{logical_date}.csv"
    #Read in the csv file

    # Read in the csv file
song_info = read_crawling_csv(logical_date)

for _, row in song_info.iterrows():
artist_id = ast.literal_eval(row['artist_id'])
        #A song can have two or more artists, e.g. due to featuring

artist_id = ast.literal_eval(row["artist_id"])

        # A song can have two or more artists, e.g. due to featuring
for i in range(len(artist_id)):
id = artist_id[i]
end_point = END_POINT+f'/artists/{id}/top-tracks/'
end_point = END_POINT + f"/artists/{id}/top-tracks/"

top_10_info = extract(end_point)

try:
for track in top_10_info['tracks']:
arti_top10_list.append({
"album": track['album']['name'],
"artist_id": id,
"song_id" : track['id'],
"title" : track['name']
})
for track in top_10_info["tracks"]:
arti_top10_list.append(
{
"album": track["album"]["name"],
"artist_id": id,
"song_id": track["id"],
"title": track["name"],
}
)
except Exception as e:
print(f"error:{top_10_info}")
#task_instance.xcom_push(key='artist_top10', value=arti_top10_list)

# task_instance.xcom_push(key='artist_top10', value=arti_top10_list)
artist_top10_df = pd.DataFrame(arti_top10_list)
artist_top10_df.to_csv(f"data/{object_name}",encoding="utf-8-sig", index=False)
artist_top10_df.to_csv(
f"data/{object_name}",
encoding="utf-8-sig",
index=False)
load_s3_bucket(object_name)


#Fetch artist info

# Fetch artist info
def get_artist_info(logical_date, **kwargs):
task_instance = kwargs['ti']

task_instance = kwargs["ti"]
artist_info_list = []
object_name =f"spotify_artist_info_{logical_date}.csv"
    #Read in the csv file
object_name = f"spotify_artist_info_{logical_date}.csv"

    # Read in the csv file
song_info = read_crawling_csv(logical_date)

for _, row in song_info.iterrows():
artist_id = ast.literal_eval(row['artist_id'])
artist_id = ast.literal_eval(row["artist_id"])

for i in range(len(artist_id)):
id = artist_id[i]

try:
end_point = END_POINT+f'/artists/{id}'
end_point = END_POINT + f"/artists/{id}"
artist_info = extract(end_point)
#print(artist_info['name'])
artist_info_list.append({
"artist" : artist_info['name'],
"artist_id": id,
"artist_genre": artist_info['genres']
})
# print(artist_info['name'])
artist_info_list.append(
{
"artist": artist_info["name"],
"artist_id": id,
"artist_genre": artist_info["genres"],
}
)
except Exception as e:
time.sleep(20)
print(f"error:{artist_info}")
#task_instance.xcom_push(key="artist_info", value=artist_info_list)

# task_instance.xcom_push(key="artist_info", value=artist_info_list)
artist_info_df = pd.DataFrame(artist_info_list)
artist_info_df.to_csv(f"data/{object_name}", encoding="utf-8-sig", index=False)
artist_info_df.to_csv(
f"data/{object_name}",
encoding="utf-8-sig",
index=False)
load_s3_bucket(object_name)



# Function that reads in the crawled data
def read_crawling_csv(execution_date) -> pd.DataFrame:

daily_chart_crawling = pd.read_csv(f"data/spotify_crawling_data_{execution_date}.csv")


daily_chart_crawling = pd.read_csv(
f"data/spotify_crawling_data_{execution_date}.csv"
)

return daily_chart_crawling


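extract(end_point) is called throughout this file but defined below the truncated portion of the diff. A plausible minimal sketch, assuming it wraps a GET with the bearer token from get_token() (this is a reconstruction under that assumption, not the author's actual code):

def extract(end_point: str) -> dict:
    # Hypothetical sketch; relies on the module's `requests` import and
    # `get_token` from scripts.get_access_token, both visible above.
    token = get_token()
    headers = {"Authorization": f"Bearer {token}"}  # standard Spotify Web API auth
    response = requests.get(end_point, headers=headers)
    return response.json()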
35 changes: 18 additions & 17 deletions airflow/dags/spotify_data_dag.py
@@ -11,35 +11,36 @@
"depends_on_past": False,
"start_date": datetime(2025, 2, 28),
"retries": 1,
"retry_delay": timedelta(seconds=60)
"retry_delay": timedelta(seconds=60),
}


with DAG(
dag_id = 'GetSpotifyArtistData',
dag_id="GetSpotifyArtistData",
default_args=default_args,
catchup=False,
tags=['final_project'],
schedule_interval='0 11 * * *'
)as dag:
tags=["final_project"],
schedule_interval="0 11 * * *",
) as dag:

extract_globalTop50_data = PythonOperator(
task_id = 'extract_global_top50',
task_id="extract_global_top50",
python_callable=data_crawling,
op_kwargs={'logical_date': '{{ ds }}'},
op_kwargs={"logical_date": "{{ ds }}"},
)

extract_artistInfo_data = PythonOperator(
task_id = 'extract_artist_info',
task_id="extract_artist_info",
python_callable=get_artist_info,
op_kwargs={'logical_date': '{{ ds }}'},
op_kwargs={"logical_date": "{{ ds }}"},
)

extract_artistTop10_data = PythonOperator(
task_id = 'extract_artist_top10',
task_id="extract_artist_top10",
python_callable=get_arti_top10,
op_kwargs={'logical_date': '{{ ds }}'},
op_kwargs={"logical_date": "{{ ds }}"},
)


extract_globalTop50_data >> [extract_artistInfo_data, extract_artistTop10_data]

extract_globalTop50_data >> [
extract_artistInfo_data,
extract_artistTop10_data]
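The fan-out itself is unchanged, only reflowed: the Global Top 50 crawl still gates both extraction tasks. A hedged pytest-style sketch for asserting that wiring (dag.get_task and downstream_list are real Airflow APIs; placing this in a test module is hypothetical):

# Hypothetical DAG wiring test: the crawl task must fan out to both extracts.
downstream = {t.task_id for t in dag.get_task("extract_global_top50").downstream_list}
assert downstream == {"extract_artist_info", "extract_artist_top10"}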