85 changes: 85 additions & 0 deletions airflow/dags/plugins/spark_snowflake_conn.py
@@ -0,0 +1,85 @@
from datetime import datetime

import snowflake.connector
from pyspark.sql import SparkSession

from airflow.models import Variable

SNOWFLAKE_USER = Variable.get("SNOWFLAKE_USER")
SNOWFLAKE_USER_PWD = Variable.get("SNOWFLAKE_PASSWORD")
SNOWFLAKE_ACCOUNT = Variable.get("SNOWFLAKE_ACCOUNT")
SNOWFLAKE_URL = Variable.get("SNOWFLAKE_URL")
SNOWFLAKE_DB = "S4TIFY"
SNOWFLAKE_SCHEMA = "RAW_DATA"

AWS_ACCESS_KEY_ID = Variable.get("AWS_ACCESS_KEY_ID")
AWS_SECRET_ACCESS_KEY = Variable.get("AWS_SECRET_ACCESS_KEY")


TODAY = datetime.now().strftime("%Y-%m-%d")

snowflake_options = {
"sfURL": SNOWFLAKE_URL,
"sfDatabase": SNOWFLAKE_DB,
"sfSchema": SNOWFLAKE_SCHEMA,
"sfWarehouse": "COMPUTE_WH",
"sfRole": "ANALYTICS_USERS",
"sfUser": SNOWFLAKE_USER,
"sfPassword": SNOWFLKAE_USER_PWD,
}


def create_spark_session(app_name: str):
    # If the Spark connection is defined as a cluster, change master to the Spark master URL.
    spark = (
        SparkSession.builder.appName(app_name)
        .master("local[*]")
.config("spark.hadoop.fs.s3a.access.key", AWS_ACCESS_KEY_ID)
.config("spark.hadoop.fs.s3a.secret.key", AWS_SECRET_ACCESS_KEY)
.config("spark.hadoop.fs.s3a.endpoint", "s3.amazonaws.com")
.config(
"spark.jars",
"/path/to/spark-snowflake_2.12-2.12.0-spark_3.4.jar,/path/to/snowflake-jdbc-3.13.33.jar",
)
.getOrCreate()
)

return spark


def create_snowflake_conn():
conn = snowflake.connector.connect(
user=SNOWFLAKE_USER,
        password=SNOWFLAKE_USER_PWD,
account=SNOWFLAKE_ACCOUNT,
warehouse="COMPUTE_WH",
database=SNOWFLAKE_DB,
schema=SNOWFLAKE_SCHEMA,
)

return conn


def create_snowflake_table(sql):
    conn = create_snowflake_conn()
    cur = conn.cursor()

    try:
        cur.execute("BEGIN")
        cur.execute(sql)
        cur.execute("COMMIT")
    except Exception as e:
        print(f"error: {e}")
        cur.execute("ROLLBACK")
    finally:
        cur.close()
        conn.close()


def write_snowflake_spark_dataframe(table_name, df):
    df.show()

    # Append the DataFrame to the target Snowflake table via the Spark connector.
    (
        df.write.format("snowflake")
        .options(**snowflake_options)
        .option("dbtable", table_name)
        .mode("append")
        .save()
    )
124 changes: 124 additions & 0 deletions airflow/dags/scripts/ELT_artist_info_globalTop50.py
@@ -0,0 +1,124 @@
from datetime import datetime

from plugins.spark_snowflake_conn import *
from pyspark.sql.functions import (col, current_date, explode, lit,
                                   regexp_replace, split, when)
from pyspark.sql.types import IntegerType, StringType, StructField, StructType

BUCKET_NAME = "de5-s4tify"
OBJECT_NAME = "raw_data"

TODAY = datetime.now().strftime("%Y-%m-%d")


def load():

    # SQL that creates the table if it does not already exist
sql = """
CREATE TABLE IF NOT EXISTS artist_info_globalTop50(
artist_id VARCHAR(100),
rank INT,
title VARCHAR(100),
artist VARCHAR(100),
artist_name VARCHAR(100),
artist_genre ARRAY,
date_time DATE
)
"""

create_snowflake_table(sql)

transform_df = transformation()
transform_df.show()

    # Print rows containing null values (debugging)
    # transform_df.filter(col("title") == "Sweet Dreams (feat. Miguel)").show(truncate=False)

write_snowflake_spark_dataframe("artist_info_globalTop50", transform_df)


def transformation():

artist_info_schema = StructType(
[
StructField("artist", StringType(), True),
StructField("artist_id", StringType(), True),
StructField("artist_genre", StringType(), True),
]
)

global_top50_schema = StructType(
[
StructField("rank", IntegerType(), True),
StructField("title", StringType(), True),
StructField("artist", StringType(), True),
StructField("artist_id", StringType(), True),
]
)

    # Read the data and drop duplicate artists
    artist_info_df = extract(
        "spotify_artist_info", artist_info_schema
    ).dropDuplicates(["artist_id"])
    global_top50_df = extract("spotify_crawling_data", global_top50_schema)

global_top50_df = global_top50_df.withColumn(
"artist_id", explode("artist_id"))

artist_info_top50_df = global_top50_df.join(
artist_info_df, on="artist_id", how="outer"
)

artist_info_top50_df = artist_info_top50_df.withColumn(
"date_time", current_date())

return artist_info_top50_df


def extract(file_name, schema):

spark = create_spark_session("artist_global_table")

df = spark.read.csv(
f"s3a://{BUCKET_NAME}/{OBJECT_NAME}/{file_name}_{TODAY}.csv",
header=True,
schema=schema,
)

if file_name == "spotify_crawling_data":
df = (
df.withColumn(
"artist", split(
regexp_replace(
col("artist"), r"[\[\]']", ""), ", ")) .withColumn(
"artist_id", split(
regexp_replace(
col("artist_id"), r"[\[\]']", ""), ", "), ) .fillna(
{
"title": ""}) .withColumn(
"artist", when(
col("artist").isNull(), lit(
[""])).otherwise(
col("artist")), ))
if file_name == "spotify_artist_info":
df = df.withColumn(
"artist_genre", regexp_replace(df["artist_genre"], "[\\[\\]']", "")
) # 불필요한 문자 제거
df = df.withColumn(
"artist_genre", split(df["artist_genre"], ", ")
) # 쉼표 기준으로 배열 변환
df = df.withColumnRenamed("artist", "artist_name")

return df


if __name__ == "__main__":
load()
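
The chart CSV stores artist and artist_id as stringified lists, so extract() converts them to arrays and transformation() explodes artist_id into one row per artist before joining the artist metadata. A toy illustration of that reshaping, using a hypothetical collaboration row and a local session:

    spark = create_spark_session("explode_demo")
    chart = spark.createDataFrame(
        [(1, "Die With A Smile", ["Lady Gaga", "Bruno Mars"], ["id_a", "id_b"])],
        ["rank", "title", "artist", "artist_id"],
    )
    chart.withColumn("artist_id", explode("artist_id")).show(truncate=False)
    # Two rows come back, one per artist id, with rank/title/artist duplicated.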
133 changes: 133 additions & 0 deletions airflow/dags/scripts/ELT_artist_info_top10.py
@@ -0,0 +1,133 @@
from datetime import datetime

import requests
from plugins.spark_snowflake_conn import *
from pyspark.sql.functions import col, current_date, regexp_replace, split, udf
from pyspark.sql.types import ArrayType, StringType, StructField, StructType

from airflow.models import Variable

LAST_FM_API_KEY = Variable.get("LAST_FM_API_KEY")

BUCKET_NAME = "de5-s4tify"
OBJECT_NAME = "raw_data"

TODAY = datetime.now().strftime("%Y-%m-%d")


def load():

    # SQL that creates the table if it does not already exist
sql = """
CREATE TABLE IF NOT EXISTS artist_info_top10(
artist_id VARCHAR(100),
artist VARCHAR(100),
artist_genre ARRAY,
album VARCHAR(100),
song_id VARCHAR(100),
title VARCHAR(100),
date_time DATE,
song_genre ARRAY
)
"""

    # Create the table if it does not exist
create_snowflake_table(sql)

transform_df = transformation()

write_snowflake_spark_dataframe("artist_info_top10", transform_df)


def transformation():

    # Define the input schemas
artist_info_schema = StructType(
[
StructField("artist", StringType(), True),
StructField("artist_id", StringType(), True),
StructField("artist_genre", StringType(), True),
]
)

artist_top10_schema = StructType(
[
StructField("album", StringType(), True),
StructField("artist_id", StringType(), True),
StructField("song_id", StringType(), True),
StructField("title", StringType(), True),
]
)

    # Read the data and drop duplicates
    artist_top10_df = extract(
        "spotify_artist_top10", artist_top10_schema
    ).dropDuplicates(["song_id"])
    artist_info_df = extract(
        "spotify_artist_info", artist_info_schema
    ).dropDuplicates(["artist_id"])

artist_info_top10_df = artist_info_df.join(
artist_top10_df, on="artist_id", how="outer"
)

    # Add the load date
artist_info_top10_df = artist_info_top10_df.withColumn(
"date_time", current_date())

    # Add song genres via the Last.fm UDF
artist_info_top10_df = artist_info_top10_df.withColumn(
"song_genre", add_song_genre_udf(col("artist"), col("title"))
)

return artist_info_top10_df


def add_song_genre(artist, track):
    # Query the Last.fm track.getInfo endpoint; passing params lets requests
    # URL-encode artist and track names that contain spaces or special characters.
    url = "https://ws.audioscrobbler.com/2.0/"
    params = {
        "method": "track.getInfo",
        "api_key": LAST_FM_API_KEY,
        "artist": artist,
        "track": track,
        "format": "json",
    }

    try:
        response = requests.get(url, params=params, timeout=10).json()
        return [
            genre["name"]
            for genre in response.get("track", {}).get("toptags", {}).get("tag", [])
        ]
    except requests.exceptions.RequestException as e:
        print(f"API request error: {e}")
        return ["API Error"]
    except (KeyError, ValueError):
        return ["Unknown"]


add_song_genre_udf = udf(add_song_genre, ArrayType(StringType()))


def extract(file_name, schema):

spark = create_spark_session("artist_top10_table")
df = spark.read.csv(
f"s3a://{BUCKET_NAME}/{OBJECT_NAME}/{file_name}_{TODAY}.csv",
header=True,
schema=schema,
)

if file_name == "spotify_artist_info":
df = df.withColumn(
"artist_genre", regexp_replace(df["artist_genre"], "[\\[\\]']", "")
) # 불필요한 문자 제거
df = df.withColumn(
"artist_genre", split(df["artist_genre"], ", ")
) # 쉼표 기준으로 배열 변환

return df


if __name__ == "__main__":
load()
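
Because add_song_genre runs as a row-level UDF, every track in the DataFrame triggers one HTTP call to Last.fm, so runtime scales with row count. A minimal smoke test of the UDF on hypothetical rows (a valid LAST_FM_API_KEY and network access are assumed; the artist/track pairs are illustrative):

    spark = create_spark_session("udf_smoke_test")
    sample = spark.createDataFrame(
        [("Radiohead", "Creep"), ("Daft Punk", "One More Time")],
        ["artist", "title"],
    )
    sample.withColumn(
        "song_genre", add_song_genre_udf(col("artist"), col("title"))
    ).show(truncate=False)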
41 changes: 41 additions & 0 deletions airflow/dags/spotify_ELT_DAG.py
@@ -0,0 +1,41 @@
from datetime import datetime, timedelta

from airflow import DAG
from airflow.providers.apache.spark.operators.spark_submit import \
    SparkSubmitOperator

# Default DAG arguments
default_args = {
"owner": "yerin",
"depends_on_past": False,
"start_date": datetime(2025, 3, 2),
"retries": 1,
"retry_delay": timedelta(minutes=5),
}
with DAG(
dag_id="SpotifyDataProcessing",
default_args=default_args,
catchup=False,
tags=["final_project"],
schedule="0 12 * * *",
) as dag:

    artist_info_Top10_table = SparkSubmitOperator(
        task_id="artist_info_top10_table",
        application="dags/scripts/ELT_artist_info_top10.py",
        conn_id="spark_default",
    )

    artist_info_globalTop50_table = SparkSubmitOperator(
        task_id="artist_info_globalTop50_table",
        application="dags/scripts/ELT_artist_info_globalTop50.py",
        conn_id="spark_default",
    )

    # Both tasks are independent, so Airflow schedules them in parallel.
    [artist_info_Top10_table, artist_info_globalTop50_table]
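
The trailing list only references the two tasks; no dependency is declared between them. If the global Top 50 load ever needed to wait for the Top 10 load, a single bitshift dependency would serialize them (illustrative only, not part of this PR):

    artist_info_Top10_table >> artist_info_globalTop50_table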