# NOTE(review): the original SOURCE is a git diff (three new Python modules plus
# the head of a DAG file) whose newlines were collapsed onto a few physical
# lines.  This block restores the added content as properly formatted Python;
# the banner comments mark the original file boundaries so the modules can be
# split back into their separate files.

# ===========================================================================
# airflow/dags/plugins/spark_snowflake_conn.py
# ===========================================================================
from datetime import datetime

import snowflake.connector
from pyspark.sql import SparkSession

from airflow.models import Variable

# Connection settings come from Airflow Variables so no secrets live in the repo.
SNOWFLAKE_USER = Variable.get("SNOWFLAKE_USER")
# fix: constant was misspelled SNOWFLKAE_USER_PWD (only referenced inside this module)
SNOWFLAKE_USER_PWD = Variable.get("SNOWFLAKE_PASSWORD")
SNOWFLAKE_ACCOUNT = Variable.get("SNOWFLAKE_ACCOUNT")
SNOWFLAKE_URL = Variable.get("SNOWFLAKE_URL")
SNOWFLAKE_DB = "S4TIFY"
SNOWFLAKE_SCHEMA = "RAW_DATA"

AWS_ACCESS_KEY_ID = Variable.get("AWS_ACCESS_KEY_ID")
AWS_SECRET_ACCESS_KEY = Variable.get("AWS_SECRET_ACCESS_KEY")

# NOTE(review): evaluated once at import time, so a long-lived worker keeps a
# stale date; the Airflow logical date would be safer — TODO confirm.
TODAY = datetime.now().strftime("%Y-%m-%d")

# Options consumed by the spark-snowflake connector's .options(**...) call.
snowflake_options = {
    "sfURL": SNOWFLAKE_URL,
    "sfDatabase": SNOWFLAKE_DB,
    "sfSchema": SNOWFLAKE_SCHEMA,
    "sfWarehouse": "COMPUTE_WH",
    "sfRole": "ANALYTICS_USERS",
    "sfUser": SNOWFLAKE_USER,
    "sfPassword": SNOWFLAKE_USER_PWD,
}


def create_spark_session(app_name: str) -> SparkSession:
    """Build a local SparkSession wired for S3A access and the Snowflake jars.

    If the configured connection is a cluster, change .master() to the Spark
    master address (comment carried over from the original).
    """
    spark = (
        SparkSession.builder.appName(f"{app_name}")
        .master("local[*]")
        .config("spark.hadoop.fs.s3a.access.key", AWS_ACCESS_KEY_ID)
        .config("spark.hadoop.fs.s3a.secret.key", AWS_SECRET_ACCESS_KEY)
        .config("spark.hadoop.fs.s3a.endpoint", "s3.amazonaws.com")
        # TODO(review): "/path/to/..." placeholders must point at real jar files
        # before this runs anywhere.
        .config(
            "spark.jars",
            "/path/to/spark-snowflake_2.12-2.12.0-spark_3.4.jar,/path/to/snowflake-jdbc-3.13.33.jar",
        )
        .getOrCreate()
    )

    return spark


def create_snowflake_conn():
    """Open and return a snowflake-connector connection to S4TIFY.RAW_DATA.

    The caller owns the returned connection and is responsible for closing it.
    """
    conn = snowflake.connector.connect(
        user=SNOWFLAKE_USER,
        password=SNOWFLAKE_USER_PWD,
        account=SNOWFLAKE_ACCOUNT,
        warehouse="COMPUTE_WH",
        database=SNOWFLAKE_DB,
        schema=SNOWFLAKE_SCHEMA,
    )

    return conn


def create_snowflake_table(sql: str) -> None:
    """Execute a DDL statement (CREATE TABLE IF NOT EXISTS ...) in a transaction.

    Fixes over the original:
    - the connection and cursor are now always closed (they leaked before);
    - errors are re-raised after ROLLBACK instead of only printed, so a failed
      DDL fails the Airflow task instead of being silently ignored;
    - dropped the redundant conn.commit() after the explicit COMMIT.
    """
    conn = create_snowflake_conn()
    try:
        cur = conn.cursor()
        try:
            cur.execute("BEGIN")
            cur.execute(sql)
            cur.execute("COMMIT")
        except Exception as e:
            print(f"error:{e}")
            cur.execute("ROLLBACK")
            raise
        finally:
            cur.close()
    finally:
        conn.close()


def write_snowflake_spark_dataframe(table_name, df):
    """Append *df* to Snowflake table *table_name* via the spark-snowflake connector."""
    df.show()  # debug output in the task log

    df.write.format("snowflake").options(**snowflake_options).option(
        "dbtable", f"{table_name}"
    ).mode("append").save()


# ===========================================================================
# airflow/dags/scripts/ELT_artist_info_globalTop50.py
# ===========================================================================
import os
from datetime import datetime

import requests
import snowflake.connector
from plugins.spark_snowflake_conn import *
from pyspark.sql.functions import (col, current_date, explode, lit,
                                   regexp_replace, split, when)
from pyspark.sql.types import IntegerType, StringType, StructField, StructType

from airflow.models import Variable

LAST_FM_API_KEY = Variable.get("LAST_FM_API_KEY")

BUCKET_NAME = "de5-s4tify"
OBJECT_NAME = "raw_data"

TODAY = datetime.now().strftime("%Y-%m-%d")


def load():
    """Create the target table if needed, then append today's joined chart data."""
    # DDL is guarded by IF NOT EXISTS, so it is safe to run on every DAG run.
    sql = """
    CREATE TABLE IF NOT EXISTS artist_info_globalTop50(
        artist_id VARCHAR(100),
        rank INT,
        title VARCHAR(100),
        artist VARCHAR(100),
        artist_name VARCHAR(100),
        artist_genre ARRAY,
        date_time DATE
    )
    """

    create_snowflake_table(sql)

    transform_df = transformation()
    transform_df.show()

    # Debug helper kept from the original: inspect rows with a known-null title.
    # transform_df.filter(col("title") == "Sweet Dreams (feat. Miguel)").show(truncate=False)

    write_snowflake_spark_dataframe("artist_info_globalTop50", transform_df)


def transformation():
    """Join artist metadata onto the global Top-50 chart and stamp today's date.

    The chart's artist_id arrays are exploded first, so a track credited to N
    artists becomes N rows.  The outer join keeps chart rows whose artist is
    missing from the metadata file (and vice versa), so columns from the other
    side may be NULL.
    """
    artist_info_schema = StructType(
        [
            StructField("artist", StringType(), True),
            StructField("artist_id", StringType(), True),
            StructField("artist_genre", StringType(), True),
        ]
    )

    global_top50_schema = StructType(
        [
            StructField("rank", IntegerType(), True),
            StructField("title", StringType(), True),
            StructField("artist", StringType(), True),
            StructField("artist_id", StringType(), True),
        ]
    )

    # Read both CSVs; artist metadata is de-duplicated on artist_id.
    artist_info_df = extract("spotify_artist_info", artist_info_schema).dropDuplicates(
        ["artist_id"]
    )
    global_top50_df = extract("spotify_crawling_data", global_top50_schema)

    global_top50_df = global_top50_df.withColumn("artist_id", explode("artist_id"))

    artist_info_top50_df = global_top50_df.join(
        artist_info_df, on="artist_id", how="outer"
    )

    return artist_info_top50_df.withColumn("date_time", current_date())


def extract(file_name, schema):
    """Read s3a://{bucket}/raw_data/{file_name}_{TODAY}.csv and normalize it.

    - spotify_crawling_data: "['a', 'b']"-style strings become real arrays,
      NULL titles become "" and NULL artist arrays become [""].
    - spotify_artist_info: artist_genre becomes an array and artist is renamed
      artist_name so it survives the later join.
    """
    spark = create_spark_session("artist_global_table")

    df = spark.read.csv(
        f"s3a://{BUCKET_NAME}/{OBJECT_NAME}/{file_name}_{TODAY}.csv",
        header=True,
        schema=schema,
    )

    if file_name == "spotify_crawling_data":
        df = (
            df.withColumn(
                "artist", split(regexp_replace(col("artist"), r"[\[\]']", ""), ", ")
            )
            .withColumn(
                "artist_id",
                split(regexp_replace(col("artist_id"), r"[\[\]']", ""), ", "),
            )
            .fillna({"title": ""})
            # NOTE(review): lit([""]) requires Spark >= 3.4 (list literals); the
            # pinned spark-snowflake jar targets 3.4, so this should hold — verify.
            .withColumn(
                "artist",
                when(col("artist").isNull(), lit([""])).otherwise(col("artist")),
            )
        )
    if file_name == "spotify_artist_info":
        # Strip the [ ] ' characters, then split on ", " into an array.
        df = df.withColumn(
            "artist_genre", regexp_replace(df["artist_genre"], "[\\[\\]']", "")
        )
        df = df.withColumn("artist_genre", split(df["artist_genre"], ", "))
        df = df.withColumnRenamed("artist", "artist_name")

    return df


if __name__ == "__main__":
    load()


# ===========================================================================
# airflow/dags/scripts/ELT_artist_info_top10.py
# ===========================================================================
from datetime import datetime

import requests
# NOTE(review): the sibling script imports "plugins.spark_snowflake_conn" while
# this one imports "dags.plugins...."; only one prefix can be right for a given
# PYTHONPATH — kept as-is, verify at deploy time.
from dags.plugins.spark_snowflake_conn import *
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, current_date, regexp_replace, split, udf
from pyspark.sql.types import ArrayType, StringType, StructField, StructType

from airflow.models import Variable

LAST_FM_API_KEY = Variable.get("LAST_FM_API_KEY")

BUCKET_NAME = "de5-s4tify"
OBJECT_NAME = "raw_data"

TODAY = datetime.now().strftime("%Y-%m-%d")


def load():
    """Create artist_info_top10 if needed, then append today's enriched rows."""
    sql = """
    CREATE TABLE IF NOT EXISTS artist_info_top10(
        artist_id VARCHAR(100),
        artist VARCHAR(100),
        artist_genre ARRAY,
        album VARCHAR(100),
        song_id VARCHAR(100),
        title VARCHAR(100),
        date_time DATE,
        song_genre ARRAY
    )
    """

    # Create the table only when it does not exist yet.
    create_snowflake_table(sql)

    transform_df = transformation()

    write_snowflake_spark_dataframe("artist_info_top10", transform_df)


def transformation():
    """Join each artist's top-10 tracks with artist metadata and tag song genres.

    song_genre is filled per row by a Last.fm lookup UDF, so evaluating the
    returned DataFrame triggers one HTTP request per track.
    """
    artist_info_schema = StructType(
        [
            StructField("artist", StringType(), True),
            StructField("artist_id", StringType(), True),
            StructField("artist_genre", StringType(), True),
        ]
    )

    artist_top10_schema = StructType(
        [
            StructField("album", StringType(), True),
            StructField("artist_id", StringType(), True),
            StructField("song_id", StringType(), True),
            StructField("title", StringType(), True),
        ]
    )

    # Read both CSVs and drop duplicates on their natural keys.
    artist_top10_df = extract(
        "spotify_artist_top10", artist_top10_schema
    ).dropDuplicates(["song_id"])
    artist_info_df = extract("spotify_artist_info", artist_info_schema).dropDuplicates(
        ["artist_id"]
    )

    # Outer join keeps artists without a top-10 file and tracks without metadata.
    artist_info_top10_df = artist_info_df.join(
        artist_top10_df, on="artist_id", how="outer"
    )

    # Stamp the load date.
    artist_info_top10_df = artist_info_top10_df.withColumn(
        "date_time", current_date()
    )

    # Attach song genres from Last.fm.
    return artist_info_top10_df.withColumn(
        "song_genre", add_song_genre_udf(col("artist"), col("title"))
    )


def add_song_genre(artist, track):
    """Return Last.fm top-tag names for (artist, track); sentinel list on failure."""
    # NOTE(review): artist/track are interpolated without URL-encoding; names
    # containing '&' or '#' will corrupt the query string — consider
    # urllib.parse.quote before shipping.
    url = f"https://ws.audioscrobbler.com/2.0/?method=track.getInfo&api_key={LAST_FM_API_KEY}&artist={artist}&track={track}&format=json"

    try:
        response = requests.get(url).json()
        tags = response.get("track", {}).get("toptags", {}).get("tag", [])
        return [genre["name"] for genre in tags]
    except requests.exceptions.RequestException as e:
        print(f"API 요청 오류: {e}")
        return ["API Error"]
    except KeyError:
        return ["Unknown"]


# Spark UDF wrapper; returns an array of genre strings.
add_song_genre_udf = udf(add_song_genre, ArrayType(StringType()))


def extract(file_name, schema):
    """Read the day's CSV from S3; normalize artist_genre for the info file."""
    spark = create_spark_session("artist_top10_table")
    df = spark.read.csv(
        f"s3a://{BUCKET_NAME}/{OBJECT_NAME}/{file_name}_{TODAY}.csv",
        header=True,
        schema=schema,
    )

    if file_name == "spotify_artist_info":
        # Strip the [ ] ' characters, then split on ", " into an array.
        df = df.withColumn(
            "artist_genre", regexp_replace(df["artist_genre"], "[\\[\\]']", "")
        )
        df = df.withColumn("artist_genre", split(df["artist_genre"], ", "))

    return df


if __name__ == "__main__":
    load()


# ===========================================================================
# airflow/dags/spotify_ELT_DAG.py (module head; the with-DAG body follows)
# ===========================================================================
from datetime import datetime, timedelta

from scripts.crawling_spotify_data import *
from scripts.load_spotify_data import *
from scripts.request_spotify_api import *

from airflow import DAG
from airflow.providers.apache.spark.operators.spark_submit import \
    SparkSubmitOperator

# Default task arguments shared by every task in the DAG.
default_args = {
    "owner": "yerin",
    "depends_on_past": False,
    "start_date": datetime(2025, 3, 2),
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}
+with DAG( + dag_id="SpotifyDataProcessing", + default_args=default_args, + catchup=False, + tags=["final_project"], + schedule="0 12 * * *", +) as dag: + + artist_info_Top10_table = SparkSubmitOperator( + task_id="artist_info_top10_table", + application="dags/scripts/ELT_artist_info_top10.py", + conn_id="spark_default", + dag=dag, + ) + + artist_info_globalTop50_table = SparkSubmitOperator( + task_id="artist_info_globalTop50_table", + application="dags/scripts/ELT_artist_info_globalTop50.py", + conn_id="spark_default", + dag=dag, + ) + + [artist_info_Top10_table, artist_info_globalTop50_table]