Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 0 additions & 74 deletions airflow/dags/scripts/join_spotify_data.py

This file was deleted.

41 changes: 41 additions & 0 deletions airflow/dags/scripts/load_spotify_data.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
from datetime import datetime

import boto3
import pandas as pd
from airflow.models import Variable
from scripts.request_spotify_api import *

TODAY = datetime.now().strftime("%Y-%m-%d")

AWS_ACCESS_KEY_ID = Variable.get("AWS_ACCESS_KEY_ID")
AWS_SECRET_ACCESS_KEY = Variable.get("AWS_SECRET_ACCESS_KEY")

BUCKET_NAME = "de5-s4tify"
OBJECT_NAME = "raw_data"


def conn_to_s3():
s3_client = boto3.client(
"s3",
aws_access_key_id=AWS_ACCESS_KEY_ID,
aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
)

return s3_client


def load_s3_bucket():

s3_client = conn_to_s3()

file_path = f"data/spotify_top50_artistData_{TODAY}.csv"
bucket_name = BUCKET_NAME
object_name = f"{OBJECT_NAME}/spotify_top50_artistData_{TODAY}.csv"

try:
s3_client.upload_file(file_path, bucket_name, object_name)

except Exception as e:
print(f"error:{e}")


247 changes: 120 additions & 127 deletions airflow/dags/scripts/request_spotify_api.py
Original file line number Diff line number Diff line change
@@ -1,161 +1,154 @@
import ast
import json
import os
from datetime import datetime

import pandas as pd
import requests
from airflow.models import Variable
from scripts.get_access_token import get_token
from typing import Dict, Any, Optional

"""
1.발급 받은 액세스 토큰으로 top50 차트 요청 -> top 50차트는 매일 갱신
+) 가끔 액세스 토큰이 만료 -> 다시 받아올 수 있게함 (env 파일 업뎃)

2. top 50 차트에 있는 아티스트 id 추출 + 아티스트 정보 가져오기

3. top 50에 있는 track_id로 track 세부 정보 추출

4. track_id를 기반으로 audio 세부 정보 가져오기
import requests
import pandas as pd
from datetime import datetime
import ast
import time

"""

TODAY = datetime.now().strftime("%Y-%m-%d")
END_POINT = "https://api.spotify.com/v1"
SPOTIFY_ACCESS_TOKEN = Variable.get("SPOTIFY_ACCESS_TOKEN")


def get_arti_top_10():

arti_top_10_list = []
top_10_tracks_df = pd.DataFrame(
columns=[
"album",
"artist_id",
"song_id",
"title"])
END_POINT = 'https://api.spotify.com/v1'

# csv 파일 읽어오기
song_info = pd.read_csv(f"data/spotify_crawling_data_{TODAY}.csv")
SPOTIFY_ACCESS_TOKEN = Variable.get("SPOTIFY_ACCESS_TOKEN")
LAST_FM_API_KEY = Variable.get("LAST_FM_API_KEY")

def transformation(**kwargs):

task_instance = kwargs['ti']

#중복데이터 제거
artist_info_df = pd.DataFrame(task_instance.xcom_pull(task_ids='extract_artist_info', key='artist_info')).drop_duplicates(subset=['artist_id'])
artist_top10_df = pd.DataFrame(task_instance.xcom_pull(task_ids='extract_artist_top10', key='artist_top10')).drop_duplicates(subset=['song_id'])

global_top50_artist_df = pd.merge(artist_info_df, artist_top10_df, on='artist_id', how='outer')

column = ['artist_id', 'artist', 'genre', 'album', 'song_id', 'title']
real_global_top50_artist_df = global_top50_artist_df[column]

for index, row in real_global_top50_artist_df.iterrows():
artist = row['artist']
track = row['title']
genre_list = []

url = f'https://ws.audioscrobbler.com/2.0/?method=track.getInfo&api_key={LAST_FM_API_KEY}&artist={artist}&track={track}&format=json'

response = requests.get(url).json()

try:
for genre in response['track']['toptags']['tag']:
genre_list.append(genre['name'])
except:
pass

real_global_top50_artist_df.at[index, 'genre'] = genre_list

real_global_top50_artist_df.to_csv(f"data/spotify_top50_artistData_{TODAY}.csv",encoding="utf-8-sig")


#아티스트 top 10 트랙 가져오기
def get_arti_top10(**kwargs):

task_instance = kwargs['ti']
arti_top10_list = []

#csv 파일 읽어오기
song_info = read_crawling_csv()

for _, row in song_info.iterrows():

artist_id = ast.literal_eval(row["artist_id"])

# 피처링 등의 이유로 아티스트가 2명 이상인 경우가 존재
artist_id = ast.literal_eval(row['artist_id'])
#피처링 등의 이유로 아티스트가 2명 이상인 경우가 존재
for i in range(len(artist_id)):
id = artist_id[i]
end_point = END_POINT + f"/artists/{id}/top-tracks/"
end_point = END_POINT+f'/artists/{id}/top-tracks/'

top_10_info = extract(end_point)

for track in top_10_info["tracks"]:
arti_top_10_list.append(
{
"album": track["album"]["name"],
"artist_id": id,
"song_id": track["id"],
"title": track["name"],
}
)

print(arti_top_10_list)
top_10_tracks_df = pd.DataFrame(arti_top_10_list)
# top_10_tracks_df.to_csv(f"arti_top10_track_{TODAY}.csv", encoding="utf-8-sig")

return top_10_tracks_df

try:
for track in top_10_info['tracks']:
arti_top10_list.append({
"album": track['album']['name'],
"artist_id": id,
"song_id" : track['id'],
"title" : track['name']
})
except:
time.sleep(5)

top_10_info = extract(end_point)

for track in top_10_info['tracks']:
arti_top10_list.append({
"album": track['album']['name'],
"artist_id": id,
"song_id" : track['id'],
"title" : track['name']
})

task_instance.xcom_push(key='artist_top10', value=arti_top10_list)


def get_artist_info():

#아티스트 정보 가져오기
def get_artist_info(**kwargs):

task_instance = kwargs['ti']

artist_info_list = []

# csv 파일 읽어오기
song_info = pd.read_csv(f"data/spotify_crawling_data_{TODAY}.csv")

artist_info_df = pd.DataFrame(columns=["artist", "artist_id", "genre"])


#csv 파일 읽어오기
song_info = read_crawling_csv()

for _, row in song_info.iterrows():
artist_id = ast.literal_eval(row["artist_id"])

artist_id = ast.literal_eval(row['artist_id'])
for i in range(len(artist_id)):
id = artist_id[i]

end_point = END_POINT + f"/artists/{id}"
end_point = END_POINT+f'/artists/{id}'
artist_info = extract(end_point)
# print(artist_info['name'])
artist_info_list.append(
{
"artist": artist_info["name"],
"artist_id": id,
"genre": artist_info["genres"],
}
)

artist_info_df = pd.DataFrame(artist_info_list)
# artist_info_df.to_csv(f"arti_info_{TODAY}.csv", encoding="utf-8-sig")

return artist_info_df


# 노래 제목, 아티스트 id, 아티스트, 노래 id, 노래 인기도 담은 df 반환
def get_global_top50():

song_info_list = []
song_info_df = pd.DataFrame(
columns=["title", "artist", "artist_id", "song_id", "popularity"]
)
end_point = END_POINT + "/playlists/4RunlK9lvAC8ZtRbbbPWzD"
track_data = extract(end_point)

# 각 트랙 정보 추가
for track in track_data["tracks"]["items"]:
for artist in track["track"]["artists"]: # 여러 아티스트 처리
song_info_list.append(
{
"title": track["track"]["name"],
"artist": artist["name"],
"artist_id": artist["id"],
"song_id": track["track"]["id"],
"popularity": track["track"]["popularity"],
}
)

song_info_df = pd.DataFrame(song_info_list)
song_info_df.to_csv(
f"global_top50_track_{TODAY}.csv",
encoding="utf-8-sig")

return song_info_df


# 정보 요청
def extract(url):

artist_info_list.append({
"artist" : artist_info['name'],
"artist_id": id,
"genre": artist_info['genres']
})

task_instance.xcom_push(key="artist_info", value=artist_info_list)


# 크롤링 데이터 읽어오는 함수
def read_crawling_csv() -> pd.DataFrame:

daily_chart_crawling = pd.read_csv(f"data/spotify_crawling_data_{TODAY}.csv")

return daily_chart_crawling

#API 요청 함수
def extract(url: str) -> Optional[Dict[str, Any]]:

access_token = SPOTIFY_ACCESS_TOKEN

headers = {
"Authorization": f"Bearer {access_token}",
}

payload = {"grant_type": "client_credentials"}


response = requests.get(url, headers=headers)

if response.status_code == 200:
if response.status_code == 200:
result = response.json()

elif response.status_code == 400 or response.status_code == 401: # 토큰 만료시
get_token() # .env 파일에 새로 업로드 ㄱ

elif response.status_code == 400 or response.status_code == 401 : #토큰 만료시 재요청
time.sleep(3)
get_token() #Variable에 저장된 token 변경
response = requests.get(url, headers=headers)

result = response.json()

else:
print(response.text)

return result


if __name__ == "__main__":
get_arti_top_10()
Loading
Loading