Merged
airflow/dags/scripts/crawling_spotify_data.py (5 changes: 0 additions & 5 deletions)

@@ -70,8 +70,6 @@ def data_crawling():
         By.XPATH, '//*[@id="main"]//div[@role="row"]'
     )
 
-    print(len(song_lists))
-
     for i in range(1, len(song_lists)):
 
         artist = []
@@ -92,7 +90,6 @@ def data_crawling():
             By.XPATH,
             f'//*[@id="main"]/div/div[2]/div[5]/div/div[2]/div[2]/div/main/section/div[2]/div[3]/div/div[1]/div/div[2]/div[2]/div[{i}]/div/div[2]/div/div',
         )
-        print(song_title.text)
 
         global_top50_df.loc[i, "title"] = song_title.text
 
@@ -106,9 +103,7 @@ def data_crawling():
         arti_list = arti_info.find_elements(By.TAG_NAME, "a")
 
         for arti in arti_list:
-            print(arti.text)
             artist.append(arti.text)
-            print(arti.get_attribute("href")[-22:])
             artist_id.append(arti.get_attribute("href")[-22:])
 
         global_top50_df.loc[i, "artist"] = artist
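Review note on the deletions above: the removed print() calls were debug output. If that visibility is still wanted, a minimal sketch (names illustrative, assuming the surrounding crawler code) is to route it through the standard logging module, which stays quiet unless DEBUG is enabled:

```python
import logging

logger = logging.getLogger("crawling_spotify_data")  # hypothetical logger name

def log_chart_rows(song_lists) -> None:
    # hypothetical helper; song_lists is the list returned by find_elements()
    logger.debug("found %d chart rows", len(song_lists))  # silent unless DEBUG is on
```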
airflow/dags/scripts/load_spotify_data.py (23 changes: 10 additions & 13 deletions)

@@ -14,27 +14,24 @@
 BUCKET_NAME = "de5-s4tify"
 OBJECT_NAME = "raw_data"
 
 
 def conn_to_s3():
     s3_client = boto3.client(
-        "s3",
+        's3',
         aws_access_key_id=AWS_ACCESS_KEY_ID,
-        aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
+        aws_secret_access_key = AWS_SECRET_ACCESS_KEY
     )
 
     return s3_client
 
 
-def load_s3_bucket():
+def load_s3_bucket(file_name):
 
     s3_client = conn_to_s3()
 
-    file_path = f"data/spotify_top50_artistData_{TODAY}.csv"
+    file_path = f'data/{file_name}'
     bucket_name = BUCKET_NAME
-    object_name = f"{OBJECT_NAME}/spotify_top50_artistData_{TODAY}.csv"
-
+    object_name = f"{OBJECT_NAME}/{file_name}"
     try:
         s3_client.upload_file(file_path, bucket_name, object_name)
 
     except Exception as e:
         print(f"error:{e}")
airflow/dags/scripts/request_spotify_api.py (176 changes: 65 additions & 111 deletions)

@@ -2,6 +2,7 @@
 import time
 from datetime import datetime
 from typing import Any, Dict, Optional
+from scripts.load_spotify_data import *
 
 import pandas as pd
 import requests
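Small note on the added import: the wildcard pulls every public name from load_spotify_data into this module. An equivalent explicit form (a sketch, assuming only load_s3_bucket is needed here) keeps the namespace predictable:

```python
from scripts.load_spotify_data import load_s3_bucket
```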
Expand All @@ -16,134 +17,87 @@
LAST_FM_API_KEY = Variable.get("LAST_FM_API_KEY")


def transformation(**kwargs):

task_instance = kwargs["ti"]

# 중복데이터 제거
artist_info_df = pd.DataFrame(
task_instance.xcom_pull(
task_ids="extract_artist_info",
key="artist_info")).drop_duplicates(
subset=["artist_id"])
artist_top10_df = pd.DataFrame(
task_instance.xcom_pull(
task_ids="extract_artist_top10",
key="artist_top10")).drop_duplicates(
subset=["song_id"])

global_top50_artist_df = pd.merge(
artist_info_df, artist_top10_df, on="artist_id", how="outer"
)

column = ["artist_id", "artist", "genre", "album", "song_id", "title"]
real_global_top50_artist_df = global_top50_artist_df[column]

for index, row in real_global_top50_artist_df.iterrows():
artist = row["artist"]
track = row["title"]
genre_list = []

url = f"https://ws.audioscrobbler.com/2.0/?method=track.getInfo&api_key={LAST_FM_API_KEY}&artist={artist}&track={track}&format=json"

response = requests.get(url).json()

try:
for genre in response["track"]["toptags"]["tag"]:
genre_list.append(genre["name"])
except BaseException:
pass

real_global_top50_artist_df.at[index, "genre"] = genre_list

real_global_top50_artist_df.to_csv(
f"data/spotify_top50_artistData_{TODAY}.csv", encoding="utf-8-sig"
)


# 아티스트 top 10 트랙 가져오기
def get_arti_top10(**kwargs):

task_instance = kwargs["ti"]
#아티스트 top 10 트랙 가져오기
def get_arti_top10(logical_date, **kwargs):

task_instance = kwargs['ti']
arti_top10_list = []

# csv 파일 읽어오기
song_info = read_crawling_csv()
object_name = f"spotify_artist_top10_{logical_date}.csv"

#csv 파일 읽어오기
song_info = read_crawling_csv(logical_date)

for _, row in song_info.iterrows():

artist_id = ast.literal_eval(row["artist_id"])

# 피처링 등의 이유로 아티스트가 2명 이상인 경우가 존재
artist_id = ast.literal_eval(row['artist_id'])
#피처링 등의 이유로 아티스트가 2명 이상인 경우가 존재
for i in range(len(artist_id)):
id = artist_id[i]
end_point = END_POINT + f"/artists/{id}/top-tracks/"
end_point = END_POINT+f'/artists/{id}/top-tracks/'

top_10_info = extract(end_point)

try:
for track in top_10_info["tracks"]:
arti_top10_list.append(
{
"album": track["album"]["name"],
"artist_id": id,
"song_id": track["id"],
"title": track["name"],
}
)
except BaseException:
time.sleep(5)

top_10_info = extract(end_point)

for track in top_10_info["tracks"]:
arti_top10_list.append(
{
"album": track["album"]["name"],
"artist_id": id,
"song_id": track["id"],
"title": track["name"],
}
)

task_instance.xcom_push(key="artist_top10", value=arti_top10_list)


# 아티스트 정보 가져오기
def get_artist_info(**kwargs):

task_instance = kwargs["ti"]

for track in top_10_info['tracks']:
arti_top10_list.append({
"album": track['album']['name'],
"artist_id": id,
"song_id" : track['id'],
"title" : track['name']
})
except Exception as e:
print(f"error:{top_10_info}")

#task_instance.xcom_push(key='artist_top10', value=arti_top10_list)
artist_top10_df = pd.DataFrame(arti_top10_list)
artist_top10_df.to_csv(f"data/{object_name}",encoding="utf-8-sig", index=False)
load_s3_bucket(object_name)


#아티스트 정보 가져오기
def get_artist_info(logical_date, **kwargs):

task_instance = kwargs['ti']
artist_info_list = []

# csv 파일 읽어오기
song_info = read_crawling_csv()

object_name =f"spotify_artist_info_{logical_date}.csv"

#csv 파일 읽어오기
song_info = read_crawling_csv(logical_date)

for _, row in song_info.iterrows():
artist_id = ast.literal_eval(row["artist_id"])

artist_id = ast.literal_eval(row['artist_id'])
for i in range(len(artist_id)):
id = artist_id[i]

end_point = END_POINT + f"/artists/{id}"
artist_info = extract(end_point)
artist_info_list.append(
{
"artist": artist_info["name"],

try:
end_point = END_POINT+f'/artists/{id}'
artist_info = extract(end_point)
#print(artist_info['name'])
artist_info_list.append({
"artist" : artist_info['name'],
"artist_id": id,
"genre": artist_info["genres"],
}
)

task_instance.xcom_push(key="artist_info", value=artist_info_list)

"artist_genre": artist_info['genres']
})
except Exception as e:
time.sleep(20)
print(f"error:{artist_info}")

#task_instance.xcom_push(key="artist_info", value=artist_info_list)
artist_info_df = pd.DataFrame(artist_info_list)
artist_info_df.to_csv(f"data/{object_name}", encoding="utf-8-sig", index=False)
load_s3_bucket(object_name)



# 크롤링 데이터 읽어오는 함수
def read_crawling_csv() -> pd.DataFrame:

daily_chart_crawling = pd.read_csv(
f"data/spotify_crawling_data_{TODAY}.csv")

def read_crawling_csv(execution_date) -> pd.DataFrame:

daily_chart_crawling = pd.read_csv(f"data/spotify_crawling_data_{execution_date}.csv")

return daily_chart_crawling


Expand Down
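Review note on the new except blocks above: if extract() itself raises, top_10_info / artist_info are unbound when the print runs, which turns an API hiccup into a NameError. A hedged sketch of a bounded retry (extract() is this module's existing request helper; the wrapper name, retry count, and delays are illustrative):

```python
import time
from typing import Any, Dict, Optional

def extract_with_retry(end_point: str, retries: int = 3, delay: float = 5.0) -> Optional[Dict[str, Any]]:
    # hypothetical wrapper around this module's extract(); returns None when
    # every attempt fails instead of leaving variables unbound in an except block
    last_error: Optional[Exception] = None
    for attempt in range(1, retries + 1):
        try:
            return extract(end_point)  # assumed to raise on 429/5xx responses
        except Exception as e:
            last_error = e
            time.sleep(delay * attempt)  # linear backoff between attempts
    print(f"error: giving up on {end_point}: {last_error}")
    return None
```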
airflow/dags/spotify_data_dag.py (50 changes: 19 additions & 31 deletions)

@@ -10,48 +10,36 @@
 default_args = {
     "depends_on_past": False,
     "start_date": datetime(2025, 2, 28),
-    "retries": 2,
-    "retry_delay": timedelta(minutes=5),
+    "retries": 1,
+    "retry_delay": timedelta(seconds=60)
 }
 
 
 with DAG(
-    dag_id="GetSpotifyArtistData",
+    dag_id = 'GetSpotifyArtistData',
     default_args=default_args,
     catchup=False,
-    tags=["final_project"],
-    schedule_interval="0 11 * * *",
-) as dag:
-
+    tags=['final_project'],
+    schedule_interval='0 11 * * *'
+)as dag:
     extract_globalTop50_data = PythonOperator(
-        task_id="extract_global_top50", python_callable=data_crawling
+        task_id = 'extract_global_top50',
+        python_callable=data_crawling,
+        op_kwargs={'logical_date': '{{ ds }}'},
     )
 
     extract_artistInfo_data = PythonOperator(
-        task_id="extract_artist_info",
+        task_id = 'extract_artist_info',
         python_callable=get_artist_info,
-        retries=2,
-        retry_delay=timedelta(seconds=30),
+        op_kwargs={'logical_date': '{{ ds }}'},
     )
 
     extract_artistTop10_data = PythonOperator(
-        task_id="extract_artist_top10",
+        task_id = 'extract_artist_top10',
         python_callable=get_arti_top10,
-        retries=2,
-        retry_delay=timedelta(seconds=30),
-    )
-
-    transformation_data = PythonOperator(
-        task_id="transformation_data", python_callable=transformation
-    )
-
-    load_data = PythonOperator(
-        task_id="load_data",
-        python_callable=load_s3_bucket)
-
-    (
-        extract_globalTop50_data
-        >> [extract_artistInfo_data, extract_artistTop10_data]
-        >> transformation_data
-        >> load_data
+        op_kwargs={'logical_date': '{{ ds }}'},
     )
 
 
+    extract_globalTop50_data >> [extract_artistInfo_data, extract_artistTop10_data]
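The op_kwargs change above is what threads the run date through the pipeline: Airflow renders the Jinja template '{{ ds }}' to the run's logical date as a YYYY-MM-DD string before the callable is invoked, so every task in a run reads and writes the same date-stamped files. A toy sketch of the pattern (dag_id and task_id are illustrative; Airflow 2.x import path):

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator  # Airflow 2.x import path

def show_date(logical_date, **kwargs):
    # by the time this runs, logical_date is a plain string like "2025-02-28"
    print(f"this run reads/writes files stamped {logical_date}")

with DAG(
    dag_id="templating_demo",
    start_date=datetime(2025, 2, 28),
    schedule_interval=None,
) as dag:
    PythonOperator(
        task_id="show_date",
        python_callable=show_date,
        op_kwargs={"logical_date": "{{ ds }}"},
    )
```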