From 00d9eab8546d5679782cb210b98926b89fccf990 Mon Sep 17 00:00:00 2001 From: Yeojun <52143231+Yun024@users.noreply.github.com> Date: Tue, 11 Mar 2025 22:42:21 +0900 Subject: [PATCH] [Upload]airflow_docker_intergration --- airflow/Dockerfile | 30 ++++- airflow/dags/get_weekly_top200_songs.py | 21 ++-- airflow/dags/scripts/crawling_spotify_data.py | 9 +- airflow/dags/scripts/get_access_token.py | 4 +- airflow/dags/scripts/load_spotify_data.py | 46 +++---- airflow/dags/scripts/request_spotify_api.py | 113 +++++++++--------- airflow/dags/spotify_data_dag.py | 35 +++--- airflow/docker-compose.yaml | 5 +- 8 files changed, 132 insertions(+), 131 deletions(-) diff --git a/airflow/Dockerfile b/airflow/Dockerfile index c5cbb6f..490e5ab 100644 --- a/airflow/Dockerfile +++ b/airflow/Dockerfile @@ -1,9 +1,10 @@ #Dockerfile FROM apache/airflow:2.9.1 +USER root + ENV SPARK_JAR_DIR=/opt/spark/jars -USER root RUN echo "deb http://deb.debian.org/debian bullseye main" >> /etc/apt/sources.list && \ apt-get update && apt-get install -y \ wget openjdk-11-jdk && \ @@ -14,9 +15,32 @@ RUN echo "deb http://deb.debian.org/debian bullseye main" >> /etc/apt/sources.li apt-get clean && \ rm -rf /var/lib/apt/lists/* -# Set JAVA_HOME environment variable -ENV JAVA_HOME /usr +RUN apt-get update && apt-get install -y \ + curl \ + unzip \ + wget \ + libnss3 \ + libgconf-2-4 \ + libx11-xcb1 \ + libappindicator3-1 \ + fonts-liberation \ + xdg-utils \ + && rm -rf /var/lib/apt/lists/* + +# Google Chrome download +RUN apt-get update && apt-get install -y wget && \ + wget -q -O google-chrome.deb http://dl.google.com/linux/chrome/deb/pool/main/g/google-chrome-stable/google-chrome-stable_132.0.6834.159-1_amd64.deb \ + && apt install -y ./google-chrome.deb \ + && rm google-chrome.deb +# ChromeDriver download +RUN apt-get update && \ + apt-get install -y wget unzip && \ + wget -q -O chromedriver.zip https://storage.googleapis.com/chrome-for-testing-public/132.0.6834.159/linux64/chromedriver-linux64.zip && \ + unzip chromedriver.zip && \ + mv chromedriver-linux64/chromedriver /usr/local/bin/chromedriver && \ + chmod +x /usr/local/bin/chromedriver && \ + rm chromedriver.zip USER airflow diff --git a/airflow/dags/get_weekly_top200_songs.py b/airflow/dags/get_weekly_top200_songs.py index 7258999..be084a9 100644 --- a/airflow/dags/get_weekly_top200_songs.py +++ b/airflow/dags/get_weekly_top200_songs.py @@ -10,26 +10,27 @@ from airflow import DAG from airflow.decorators import task -from airflow.models import Variable from airflow.providers.amazon.aws.hooks.s3 import S3Hook @task def extract(countries, **kwargs): - user_id = Variable.get("SPOTIFY_ID") - user_pass = Variable.get("SPOTIFY_PASS") + user_id = os.getenv("AIRFLOW_VAR_SPOTIFY_ID") + user_pass = os.getenv("AIRFLOW_VAR_SPOTIFY_PASS") execution_date = kwargs["ds"] exec_date = datetime.strptime(execution_date, "%Y-%m-%d") current_weekday = exec_date.weekday() - days_since_last_thursday = (current_weekday - 3) % 7 - if current_weekday >= 5: - last_thursday = exec_date - timedelta(days=days_since_last_thursday) - else: - last_thursday = exec_date - \ - timedelta(days=days_since_last_thursday + 7) - date = last_thursday.strftime("%Y-%m-%d") + + if current_weekday >= 5: # 주말(토, 일)인 경우 → 이번 주 목요일 가져오기 + days_until_thursday = (3 - current_weekday) % 7 + target_date = exec_date + timedelta(days=days_until_thursday) + else: # 평일(월~금)인 경우 → 지난주 목요일 가져오기 + days_since_thursday = (current_weekday - 3) % 7 + target_date = exec_date - timedelta(days=days_since_thursday) + + date = target_date.strftime("%Y-%m-%d") options = webdriver.ChromeOptions() options.add_argument("--headless") diff --git a/airflow/dags/scripts/crawling_spotify_data.py b/airflow/dags/scripts/crawling_spotify_data.py index eb6f1e2..1a38c45 100644 --- a/airflow/dags/scripts/crawling_spotify_data.py +++ b/airflow/dags/scripts/crawling_spotify_data.py @@ -5,13 +5,10 @@ import pandas as pd from selenium import webdriver -from selenium.webdriver import ActionChains + from selenium.webdriver.chrome.options import Options from selenium.webdriver.chrome.service import Service from selenium.webdriver.common.by import By -from selenium.webdriver.support import expected_conditions as EC -from selenium.webdriver.support.ui import WebDriverWait -from webdriver_manager.chrome import ChromeDriverManager TODAY = datetime.now().strftime("%Y-%m-%d") @@ -48,11 +45,9 @@ def data_crawling(): chrome_options.add_argument("--no-sandbox") chrome_options.add_argument("--disable-dev-shm-usage") - service = Service("/opt/airflow/chromedriver-linux64/chromedriver") - url = "https://open.spotify.com/playlist/37i9dQZEVXbMDoHDwVN2tF" - with webdriver.Chrome(service=service, options=chrome_options) as driver: + with webdriver.Chrome(service=Service(), options=chrome_options) as driver: print("크롤링 시작") diff --git a/airflow/dags/scripts/get_access_token.py b/airflow/dags/scripts/get_access_token.py index a23958e..3b52a22 100644 --- a/airflow/dags/scripts/get_access_token.py +++ b/airflow/dags/scripts/get_access_token.py @@ -6,8 +6,8 @@ from airflow.models import Variable -SPOTIFY_CLIENT_ID = Variable.get("SPOTIFY_CLIENT_ID") -SPOTIFY_CLIENT_SECRET = Variable.get("SPOTIFY_CLIENT_SECRET") +SPOTIFY_CLIENT_ID = os.getenv("SPOTIFY_CLIENT_ID") +SPOTIFY_CLIENT_SECRET = os.getenv("SPOTIFY_CLIENT_SECRET") END_POINT = "https://accounts.spotify.com/api/token" diff --git a/airflow/dags/scripts/load_spotify_data.py b/airflow/dags/scripts/load_spotify_data.py index 193389c..67561a8 100644 --- a/airflow/dags/scripts/load_spotify_data.py +++ b/airflow/dags/scripts/load_spotify_data.py @@ -1,39 +1,23 @@ -from datetime import datetime - -import boto3 import pandas as pd -from scripts.request_spotify_api import * - -from airflow.models import Variable - -TODAY = datetime.now().strftime("%Y-%m-%d") - -AWS_ACCESS_KEY_ID = Variable.get("AWS_ACCESS_KEY_ID") -AWS_SECRET_ACCESS_KEY = Variable.get("AWS_SECRET_ACCESS_KEY") - -BUCKET_NAME = "de5-s4tify" -OBJECT_NAME = "raw_data" - - -def conn_to_s3(): - s3_client = boto3.client( - "s3", - aws_access_key_id=AWS_ACCESS_KEY_ID, - aws_secret_access_key=AWS_SECRET_ACCESS_KEY, - ) - - return s3_client +import os +from airflow.providers.amazon.aws.hooks.s3 import S3Hook def load_s3_bucket(file_name): - s3_client = conn_to_s3() - file_path = f"data/{file_name}" - bucket_name = BUCKET_NAME - object_name = f"{OBJECT_NAME}/{file_name}" - + s3_hook = S3Hook(aws_conn_id="AWS_S3") + s3_bucket = "de5-s4tify" + s3_key = f"raw_data/{file_name}" + + local_file_path = f'data/{file_name}' + try: - s3_client.upload_file(file_path, bucket_name, object_name) - + s3_hook.load_file( + filename=local_file_path, + key=s3_key, + bucket_name=s3_bucket, + replace=True + ) + except Exception as e: print(f"error:{e}") diff --git a/airflow/dags/scripts/request_spotify_api.py b/airflow/dags/scripts/request_spotify_api.py index 55391dd..23df461 100644 --- a/airflow/dags/scripts/request_spotify_api.py +++ b/airflow/dags/scripts/request_spotify_api.py @@ -2,6 +2,7 @@ import time from datetime import datetime from typing import Any, Dict, Optional +import os import pandas as pd import requests @@ -13,100 +14,94 @@ TODAY = datetime.now().strftime("%Y-%m-%d") END_POINT = "https://api.spotify.com/v1" -SPOTIFY_ACCESS_TOKEN = Variable.get("SPOTIFY_ACCESS_TOKEN") -LAST_FM_API_KEY = Variable.get("LAST_FM_API_KEY") +SPOTIFY_ACCESS_TOKEN = Variable.get("SPOTIFY_ACCESS_TOKEN",default_var=None) +if not SPOTIFY_ACCESS_TOKEN: + print("⚠️ Access Token 없음. 새로운 토큰을 가져옵니다.") + get_token() + SPOTIFY_ACCESS_TOKEN = Variable.get("SPOTIFY_ACCESS_TOKEN",default_var=None) + +LAST_FM_API_KEY = os.getenv("LAST_FM_API_KEY") -# 아티스트 top 10 트랙 가져오기 +#아티스트 top 10 트랙 가져오기 def get_arti_top10(logical_date, **kwargs): - - task_instance = kwargs["ti"] + + task_instance = kwargs['ti'] arti_top10_list = [] object_name = f"spotify_artist_top10_{logical_date}.csv" - - # csv 파일 읽어오기 + + #csv 파일 읽어오기 song_info = read_crawling_csv(logical_date) for _, row in song_info.iterrows(): - - artist_id = ast.literal_eval(row["artist_id"]) - - # 피처링 등의 이유로 아티스트가 2명 이상인 경우가 존재 + + artist_id = ast.literal_eval(row['artist_id']) + + #피처링 등의 이유로 아티스트가 2명 이상인 경우가 존재 for i in range(len(artist_id)): id = artist_id[i] - end_point = END_POINT + f"/artists/{id}/top-tracks/" + end_point = END_POINT+f'/artists/{id}/top-tracks/' top_10_info = extract(end_point) - + try: - for track in top_10_info["tracks"]: - arti_top10_list.append( - { - "album": track["album"]["name"], - "artist_id": id, - "song_id": track["id"], - "title": track["name"], - } - ) + for track in top_10_info['tracks']: + arti_top10_list.append({ + "album": track['album']['name'], + "artist_id": id, + "song_id" : track['id'], + "title" : track['name'] + }) except Exception as e: print(f"error:{top_10_info}") - - # task_instance.xcom_push(key='artist_top10', value=arti_top10_list) + + #task_instance.xcom_push(key='artist_top10', value=arti_top10_list) artist_top10_df = pd.DataFrame(arti_top10_list) - artist_top10_df.to_csv( - f"data/{object_name}", - encoding="utf-8-sig", - index=False) + artist_top10_df.to_csv(f"data/{object_name}",encoding="utf-8-sig", index=False) load_s3_bucket(object_name) + - -# 아티스트 정보 가져오기 +#아티스트 정보 가져오기 def get_artist_info(logical_date, **kwargs): - - task_instance = kwargs["ti"] + + task_instance = kwargs['ti'] artist_info_list = [] - object_name = f"spotify_artist_info_{logical_date}.csv" - - # csv 파일 읽어오기 + object_name =f"spotify_artist_info_{logical_date}.csv" + + #csv 파일 읽어오기 song_info = read_crawling_csv(logical_date) - + for _, row in song_info.iterrows(): - artist_id = ast.literal_eval(row["artist_id"]) - + artist_id = ast.literal_eval(row['artist_id']) + for i in range(len(artist_id)): id = artist_id[i] - + try: - end_point = END_POINT + f"/artists/{id}" + end_point = END_POINT+f'/artists/{id}' artist_info = extract(end_point) - # print(artist_info['name']) - artist_info_list.append( - { - "artist": artist_info["name"], - "artist_id": id, - "artist_genre": artist_info["genres"], - } - ) + #print(artist_info['name']) + artist_info_list.append({ + "artist" : artist_info['name'], + "artist_id": id, + "artist_genre": artist_info['genres'] + }) except Exception as e: time.sleep(20) print(f"error:{artist_info}") - - # task_instance.xcom_push(key="artist_info", value=artist_info_list) + + #task_instance.xcom_push(key="artist_info", value=artist_info_list) artist_info_df = pd.DataFrame(artist_info_list) - artist_info_df.to_csv( - f"data/{object_name}", - encoding="utf-8-sig", - index=False) + artist_info_df.to_csv(f"data/{object_name}", encoding="utf-8-sig", index=False) load_s3_bucket(object_name) + # 크롤링 데이터 읽어오는 함수 def read_crawling_csv(execution_date) -> pd.DataFrame: - - daily_chart_crawling = pd.read_csv( - f"data/spotify_crawling_data_{execution_date}.csv" - ) - + + daily_chart_crawling = pd.read_csv(f"data/spotify_crawling_data_{execution_date}.csv") + return daily_chart_crawling diff --git a/airflow/dags/spotify_data_dag.py b/airflow/dags/spotify_data_dag.py index cd8ab8c..342dbab 100644 --- a/airflow/dags/spotify_data_dag.py +++ b/airflow/dags/spotify_data_dag.py @@ -1,7 +1,6 @@ from datetime import datetime, timedelta from scripts.crawling_spotify_data import * -from scripts.load_spotify_data import * from scripts.request_spotify_api import * from airflow import DAG @@ -11,36 +10,36 @@ "depends_on_past": False, "start_date": datetime(2025, 2, 28), "retries": 1, - "retry_delay": timedelta(seconds=60), + "retry_delay": timedelta(seconds=60) } with DAG( - dag_id="GetSpotifyArtistData", + dag_id = 'GetSpotifyArtistData', default_args=default_args, catchup=False, - tags=["final_project"], - schedule_interval="0 11 * * *", -) as dag: - + tags=['final_project'], + schedule_interval='0 11 * * *' +)as dag: + extract_globalTop50_data = PythonOperator( - task_id="extract_global_top50", + task_id = 'extract_global_top50', python_callable=data_crawling, - op_kwargs={"logical_date": "{{ ds }}"}, + op_kwargs={'logical_date': '{{ ds }}'}, ) - + extract_artistInfo_data = PythonOperator( - task_id="extract_artist_info", + task_id = 'extract_artist_info', python_callable=get_artist_info, - op_kwargs={"logical_date": "{{ ds }}"}, + op_kwargs={'logical_date': '{{ ds }}'}, ) - + extract_artistTop10_data = PythonOperator( - task_id="extract_artist_top10", + task_id = 'extract_artist_top10', python_callable=get_arti_top10, - op_kwargs={"logical_date": "{{ ds }}"}, + op_kwargs={'logical_date': '{{ ds }}'}, ) + + + extract_globalTop50_data >> [extract_artistInfo_data, extract_artistTop10_data] - extract_globalTop50_data >> [ - extract_artistInfo_data, - extract_artistTop10_data] diff --git a/airflow/docker-compose.yaml b/airflow/docker-compose.yaml index cbfe55b..1ca5b11 100644 --- a/airflow/docker-compose.yaml +++ b/airflow/docker-compose.yaml @@ -60,6 +60,8 @@ x-airflow-common: build: context: . dockerfile: Dockerfile + env_file: + - .env environment: &airflow-common-env AIRFLOW__CORE__EXECUTOR: CeleryExecutor @@ -81,7 +83,8 @@ x-airflow-common: AIRFLOW__SCHEDULER__ENABLE_HEALTH_CHECK: 'true' # WARNING: Use _PIP_ADDITIONAL_REQUIREMENTS option ONLY for a quick checks # for other purpose (development, test and especially production usage) build/extend Airflow image. - _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:- pandas numpy oauth2client gspread snowflake-connector-python pyspark dotenv-python apache-airflow-providers-apache-spark} + AIRFLOW_CONN_AWS_S3: aws://$AWS_ACCESS_KEY_ID:$AWS_SECRET_ACCESS_KEY@$AWS_DEFAULT_REGION + _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:- selenium pandas numpy webdriver-manager oauth2client gspread snowflake-connector-python pyspark python-dotenv apache-airflow-providers-apache-spark} # The following line can be used to set a custom config file, stored in the local config folder # If you want to use it, outcomment it and replace airflow.cfg with the name of your config file # AIRFLOW_CONFIG: '/opt/airflow/config/airflow.cfg'