Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 27 additions & 3 deletions airflow/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
#Dockerfile
FROM apache/airflow:2.9.1

USER root

ENV SPARK_JAR_DIR=/opt/spark/jars

USER root
RUN echo "deb http://deb.debian.org/debian bullseye main" >> /etc/apt/sources.list && \
apt-get update && apt-get install -y \
wget openjdk-11-jdk && \
Expand All @@ -14,9 +15,32 @@ RUN echo "deb http://deb.debian.org/debian bullseye main" >> /etc/apt/sources.li
apt-get clean && \
rm -rf /var/lib/apt/lists/*

# Set JAVA_HOME environment variable
ENV JAVA_HOME /usr
RUN apt-get update && apt-get install -y \
curl \
unzip \
wget \
libnss3 \
libgconf-2-4 \
libx11-xcb1 \
libappindicator3-1 \
fonts-liberation \
xdg-utils \
&& rm -rf /var/lib/apt/lists/*

# Google Chrome download
RUN apt-get update && apt-get install -y wget && \
wget -q -O google-chrome.deb http://dl.google.com/linux/chrome/deb/pool/main/g/google-chrome-stable/google-chrome-stable_132.0.6834.159-1_amd64.deb \
&& apt install -y ./google-chrome.deb \
&& rm google-chrome.deb

# ChromeDriver download
RUN apt-get update && \
apt-get install -y wget unzip && \
wget -q -O chromedriver.zip https://storage.googleapis.com/chrome-for-testing-public/132.0.6834.159/linux64/chromedriver-linux64.zip && \
unzip chromedriver.zip && \
mv chromedriver-linux64/chromedriver /usr/local/bin/chromedriver && \
chmod +x /usr/local/bin/chromedriver && \
rm chromedriver.zip

USER airflow

Expand Down
21 changes: 11 additions & 10 deletions airflow/dags/get_weekly_top200_songs.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,26 +10,27 @@

from airflow import DAG
from airflow.decorators import task
from airflow.models import Variable
from airflow.providers.amazon.aws.hooks.s3 import S3Hook


@task
def extract(countries, **kwargs):

user_id = Variable.get("SPOTIFY_ID")
user_pass = Variable.get("SPOTIFY_PASS")
user_id = os.getenv("AIRFLOW_VAR_SPOTIFY_ID")
user_pass = os.getenv("AIRFLOW_VAR_SPOTIFY_PASS")

execution_date = kwargs["ds"]
exec_date = datetime.strptime(execution_date, "%Y-%m-%d")
current_weekday = exec_date.weekday()
days_since_last_thursday = (current_weekday - 3) % 7
if current_weekday >= 5:
last_thursday = exec_date - timedelta(days=days_since_last_thursday)
else:
last_thursday = exec_date - \
timedelta(days=days_since_last_thursday + 7)
date = last_thursday.strftime("%Y-%m-%d")

if current_weekday >= 5: # 주말(토, 일)인 경우 → 이번 주 목요일 가져오기
days_until_thursday = (3 - current_weekday) % 7
target_date = exec_date + timedelta(days=days_until_thursday)
else: # 평일(월~금)인 경우 → 지난주 목요일 가져오기
days_since_thursday = (current_weekday - 3) % 7
target_date = exec_date - timedelta(days=days_since_thursday)

date = target_date.strftime("%Y-%m-%d")

options = webdriver.ChromeOptions()
options.add_argument("--headless")
Expand Down
9 changes: 2 additions & 7 deletions airflow/dags/scripts/crawling_spotify_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,10 @@

import pandas as pd
from selenium import webdriver
from selenium.webdriver import ActionChains

from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
from webdriver_manager.chrome import ChromeDriverManager

TODAY = datetime.now().strftime("%Y-%m-%d")

Expand Down Expand Up @@ -48,11 +45,9 @@ def data_crawling():
chrome_options.add_argument("--no-sandbox")
chrome_options.add_argument("--disable-dev-shm-usage")

service = Service("/opt/airflow/chromedriver-linux64/chromedriver")

url = "https://open.spotify.com/playlist/37i9dQZEVXbMDoHDwVN2tF"

with webdriver.Chrome(service=service, options=chrome_options) as driver:
with webdriver.Chrome(service=Service(), options=chrome_options) as driver:

print("크롤링 시작")

Expand Down
4 changes: 2 additions & 2 deletions airflow/dags/scripts/get_access_token.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@

from airflow.models import Variable

# Spotify OAuth client credentials, read from the environment so the module
# does not depend on Airflow Variables being seeded at import time.
# NOTE(review): os.getenv returns None when unset — confirm downstream token
# requests handle missing credentials.
SPOTIFY_CLIENT_ID = os.getenv("SPOTIFY_CLIENT_ID")
SPOTIFY_CLIENT_SECRET = os.getenv("SPOTIFY_CLIENT_SECRET")
END_POINT = "https://accounts.spotify.com/api/token"


Expand Down
46 changes: 15 additions & 31 deletions airflow/dags/scripts/load_spotify_data.py
Original file line number Diff line number Diff line change
@@ -1,39 +1,23 @@
from datetime import datetime

import boto3
import pandas as pd
from scripts.request_spotify_api import *

from airflow.models import Variable

TODAY = datetime.now().strftime("%Y-%m-%d")

AWS_ACCESS_KEY_ID = Variable.get("AWS_ACCESS_KEY_ID")
AWS_SECRET_ACCESS_KEY = Variable.get("AWS_SECRET_ACCESS_KEY")

BUCKET_NAME = "de5-s4tify"
OBJECT_NAME = "raw_data"


def conn_to_s3():
s3_client = boto3.client(
"s3",
aws_access_key_id=AWS_ACCESS_KEY_ID,
aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
)

return s3_client
import os

from airflow.providers.amazon.aws.hooks.s3 import S3Hook

def load_s3_bucket(file_name):
    """Upload a local CSV from ./data to the project S3 bucket.

    Uses the Airflow S3Hook (connection id ``AWS_S3``) so AWS credentials
    live in the Airflow connection store instead of raw access keys.

    Args:
        file_name: bare file name, expected to exist at ``data/<file_name>``;
            it is uploaded to ``s3://de5-s4tify/raw_data/<file_name>``.
    """
    s3_hook = S3Hook(aws_conn_id="AWS_S3")
    s3_bucket = "de5-s4tify"
    s3_key = f"raw_data/{file_name}"
    local_file_path = f"data/{file_name}"

    try:
        # replace=True keeps re-runs for the same logical date idempotent.
        s3_hook.load_file(
            filename=local_file_path,
            key=s3_key,
            bucket_name=s3_bucket,
            replace=True,
        )
    except Exception as e:
        # Best-effort upload: log and continue.
        # NOTE(review): swallowing the error hides failed uploads from the
        # scheduler — consider re-raising so the task is marked failed.
        print(f"error:{e}")
113 changes: 54 additions & 59 deletions airflow/dags/scripts/request_spotify_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import time
from datetime import datetime
from typing import Any, Dict, Optional
import os

import pandas as pd
import requests
Expand All @@ -13,100 +14,94 @@
TODAY = datetime.now().strftime("%Y-%m-%d")
END_POINT = "https://api.spotify.com/v1"

# Access token bootstrap: if the Airflow Variable is missing or empty,
# request a fresh token (get_token() is expected to store it back into the
# Variable) and re-read it. default_var=None avoids a KeyError at import.
SPOTIFY_ACCESS_TOKEN = Variable.get("SPOTIFY_ACCESS_TOKEN", default_var=None)
if not SPOTIFY_ACCESS_TOKEN:
    print("⚠️ Access Token 없음. 새로운 토큰을 가져옵니다.")
    get_token()
    SPOTIFY_ACCESS_TOKEN = Variable.get("SPOTIFY_ACCESS_TOKEN", default_var=None)

# Last.fm key comes straight from the environment.
LAST_FM_API_KEY = os.getenv("LAST_FM_API_KEY")


# Fetch each charted artist's top-10 tracks and upload them to S3 as CSV.
def get_arti_top10(logical_date, **kwargs):
    """Build ``spotify_artist_top10_<date>.csv`` from the crawled chart and upload it.

    Args:
        logical_date: Airflow logical date string (YYYY-MM-DD) used in file names.
        **kwargs: Airflow task context (kept for task-decorator compatibility).
    """
    arti_top10_list = []
    object_name = f"spotify_artist_top10_{logical_date}.csv"

    # Chart rows crawled earlier in the DAG; 'artist_id' is a stringified list.
    song_info = read_crawling_csv(logical_date)

    for _, row in song_info.iterrows():
        # A track may credit several artists (features), hence the inner loop.
        for artist_id in ast.literal_eval(row["artist_id"]):
            end_point = END_POINT + f"/artists/{artist_id}/top-tracks/"
            top_10_info = extract(end_point)

            try:
                for track in top_10_info["tracks"]:
                    arti_top10_list.append(
                        {
                            "album": track["album"]["name"],
                            "artist_id": artist_id,
                            "song_id": track["id"],
                            "title": track["name"],
                        }
                    )
            except Exception:
                # API error payloads have no 'tracks' key — log and move on.
                print(f"error:{top_10_info}")

    artist_top10_df = pd.DataFrame(arti_top10_list)
    artist_top10_df.to_csv(f"data/{object_name}", encoding="utf-8-sig", index=False)
    load_s3_bucket(object_name)



# Fetch artist metadata (name, genres) for every charted artist and upload to S3.
def get_artist_info(logical_date, **kwargs):
    """Build ``spotify_artist_info_<date>.csv`` from the crawled chart and upload it.

    Args:
        logical_date: Airflow logical date string (YYYY-MM-DD) used in file names.
        **kwargs: Airflow task context (kept for task-decorator compatibility).
    """
    artist_info_list = []
    object_name = f"spotify_artist_info_{logical_date}.csv"

    # Chart rows crawled earlier in the DAG; 'artist_id' is a stringified list.
    song_info = read_crawling_csv(logical_date)

    for _, row in song_info.iterrows():
        # A track may credit several artists (features), hence the inner loop.
        for artist_id in ast.literal_eval(row["artist_id"]):
            try:
                end_point = END_POINT + f"/artists/{artist_id}"
                artist_info = extract(end_point)
                artist_info_list.append(
                    {
                        "artist": artist_info["name"],
                        "artist_id": artist_id,
                        "artist_genre": artist_info["genres"],
                    }
                )
            except Exception as e:
                # Likely rate-limited: back off before the next request.
                # Log the exception itself — the previous code printed
                # 'artist_info', which is unbound when extract() raises.
                time.sleep(20)
                print(f"error:{e}")

    artist_info_df = pd.DataFrame(artist_info_list)
    artist_info_df.to_csv(f"data/{object_name}", encoding="utf-8-sig", index=False)
    load_s3_bucket(object_name)



# Load the crawled daily-chart CSV produced earlier in the pipeline.
def read_crawling_csv(execution_date) -> pd.DataFrame:
    """Return the crawled Spotify chart for *execution_date* (YYYY-MM-DD).

    Reads ``data/spotify_crawling_data_<execution_date>.csv``; pandas raises
    FileNotFoundError if the crawl step has not produced the file yet.
    """
    return pd.read_csv(f"data/spotify_crawling_data_{execution_date}.csv")


Expand Down
Loading
Loading