83 changes: 83 additions & 0 deletions airflow/dags/plugins/spark_snowflake_conn.py
@@ -0,0 +1,83 @@
from pyspark.sql import SparkSession
from airflow.models import Variable

import snowflake.connector
from datetime import datetime

SNOWFLAKE_USER = Variable.get("SNOWFLAKE_USER")
SNOWFLAKE_USER_PWD = Variable.get("SNOWFLAKE_PASSWORD")
SNOWFLAKE_ACCOUNT = Variable.get("SNOWFLAKE_ACCOUNT")
SNOWFLAKE_URL = Variable.get("SNOWFLAKE_URL")
SNOWFLAKE_DB = 'test'
SNOWFLAKE_SCHEMA = 'test_schema'

AWS_ACCESS_KEY_ID = Variable.get("AWS_ACCESS_KEY_ID")
AWS_SECRET_ACCESS_KEY = Variable.get("AWS_SECRET_ACCESS_KEY")


TODAY = datetime.now().strftime("%Y-%m-%d")

# Options consumed by the Spark-Snowflake connector (format "snowflake")
snowflake_options = {
"sfURL": SNOWFLAKE_URL,
"sfDatabase": SNOWFLAKE_DB,
"sfSchema": SNOWFLAKE_SCHEMA,
"sfWarehouse": "COMPUTE_WH",
"sfRole": "ACCOUNTADMIN",
"sfUser": SNOWFLAKE_USER,
"sfPassword": SNOWFLKAE_USER_PWD
}
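
# Note: sfURL is expected in "<account_identifier>.snowflakecomputing.com"
# form; here it comes from the SNOWFLAKE_URL Airflow Variable.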

def create_spark_session(app_name: str):
    # If the connection targets a cluster, change master to the Spark master URL instead of local[*]
spark = SparkSession.builder \
.appName(f"{app_name}") \
.master("local[*]") \
.config("spark.hadoop.fs.s3a.access.key", AWS_ACCESS_KEY_ID) \
.config("spark.hadoop.fs.s3a.secret.key", AWS_SECRET_ACCESS_KEY) \
.config("spark.hadoop.fs.s3a.endpoint", "s3.amazonaws.com") \
.config("spark.jars", "/path/to/spark-snowflake_2.12-2.12.0-spark_3.4.jar,/path/to/snowflake-jdbc-3.13.33.jar") \
.getOrCreate()

return spark


def create_snowflake_conn():
    conn = snowflake.connector.connect(
        user=SNOWFLAKE_USER,
        password=SNOWFLAKE_USER_PWD,
        account=SNOWFLAKE_ACCOUNT,
        warehouse="COMPUTE_WH",
        database=SNOWFLAKE_DB,
        schema=SNOWFLAKE_SCHEMA
    )

return conn


def create_snowflake_table(sql):

    conn = create_snowflake_conn()
    cur = conn.cursor()

    try:
        cur.execute("BEGIN")
        cur.execute(sql)
        cur.execute("COMMIT")
    except Exception as e:
        print(f"error: {e}")
        cur.execute("ROLLBACK")
    finally:
        cur.close()
        conn.close()


def write_snowflake_spark_dataframe(table_name, df):

    # Preview the rows being written (debug output)
    df.show()

    df.write \
        .format("snowflake") \
        .options(**snowflake_options) \
        .option("dbtable", table_name) \
        .mode("append") \
        .save()
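
# Usage sketch (hypothetical, not part of this module): how an ELT script is
# expected to compose these helpers; the table and DataFrame are illustrative.
#
#   spark = create_spark_session("example_app")
#   df = spark.createDataFrame([("id1", 1)], ["artist_id", "rank"])
#   create_snowflake_table("CREATE TABLE IF NOT EXISTS example_tbl(artist_id VARCHAR, rank INT)")
#   write_snowflake_spark_dataframe("example_tbl", df)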
95 changes: 95 additions & 0 deletions airflow/dags/scripts/ELT_artist_info_globalTop50.py
@@ -0,0 +1,95 @@
from pyspark.sql.functions import col, split, regexp_replace, explode, lit, when, current_date
from pyspark.sql.types import StringType, StructType, StructField, IntegerType

from datetime import datetime

from plugins.spark_snowflake_conn import *


BUCKET_NAME = 'de5-s4tify'
OBJECT_NAME = 'raw_data'

TODAY = datetime.now().strftime("%Y-%m-%d")


def load():

    # SQL to create the target table if it does not exist
sql = """
CREATE TABLE IF NOT EXISTS artist_info_globalTop50(
artist_id VARCHAR(100),
rank INT,
title VARCHAR(100),
artist VARCHAR(100),
artist_name VARCHAR(100),
artist_genre ARRAY,
date_time DATE
)
"""

create_snowflake_table(sql)

transform_df = transformation()
transform_df.show()

    # Debug: print rows containing null values
    #transform_df.filter(col("title") == "Sweet Dreams (feat. Miguel)").show(truncate=False)

write_snowflake_spark_dataframe('artist_info_globalTop50', transform_df)


def transformation():

artist_info_schema = StructType([
StructField("artist", StringType(), True),
StructField("artist_id", StringType(), True),
StructField("artist_genre", StringType(), True)
])

global_top50_schema = StructType([
StructField("rank", IntegerType(), True),
StructField("title", StringType(), True),
StructField("artist", StringType(), True ),
StructField("artist_id", StringType(), True)
])

    # Read the data and drop duplicate artists
artist_info_df = extract("spotify_artist_info", artist_info_schema).dropDuplicates(['artist_id'])
global_top50_df = extract("spotify_crawling_data", global_top50_schema)

global_top50_df = global_top50_df.withColumn("artist_id", explode("artist_id"))

    # outer join also keeps rows that are missing from either side
    artist_info_top50_df = global_top50_df.join(artist_info_df, on='artist_id', how='outer')

artist_info_top50_df = artist_info_top50_df.withColumn("date_time", current_date())

return artist_info_top50_df


def extract(file_name, schema):

spark = create_spark_session('artist_global_table')

df = spark.read.csv(f"s3a://{BUCKET_NAME}/{OBJECT_NAME}/{file_name}_{TODAY}.csv", header=True, schema=schema)

if file_name == 'spotify_crawling_data':
        df = (
            df.withColumn("artist", split(regexp_replace(col("artist"), r"[\[\]']", ""), ", "))
            .withColumn("artist_id", split(regexp_replace(col("artist_id"), r"[\[\]']", ""), ", "))
            .fillna({"title": ""})
            .withColumn("artist", when(col("artist").isNull(), lit([""])).otherwise(col("artist")))
        )
    if file_name == 'spotify_artist_info':
        df = df.withColumn("artist_genre", regexp_replace(df["artist_genre"], "[\\[\\]']", ""))  # strip brackets and quotes
        df = df.withColumn("artist_genre", split(df["artist_genre"], ", "))  # split on ", " into an array
        df = df.withColumnRenamed("artist", "artist_name")

return df

if __name__ == "__main__":
load()
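
# Worked example (illustrative) of the list-string cleanup in extract():
#   "['Lady Gaga', 'Bruno Mars']"
#   --regexp_replace(r"[\[\]']", "")-->  "Lady Gaga, Bruno Mars"
#   --split(", ")-->                     ["Lady Gaga", "Bruno Mars"]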

100 changes: 100 additions & 0 deletions airflow/dags/scripts/ELT_artist_info_top10.py
@@ -0,0 +1,100 @@
from pyspark.sql.functions import col, udf, split, regexp_replace, current_date
from pyspark.sql.types import ArrayType, StringType, StructType, StructField
from airflow.models import Variable

import requests
from datetime import datetime

from plugins.spark_snowflake_conn import *

LAST_FM_API_KEY = Variable.get("LAST_FM_API_KEY")

BUCKET_NAME = 'de5-s4tify'
OBJECT_NAME = 'raw_data'

TODAY = datetime.now().strftime("%Y-%m-%d")

def load():

    # SQL to create the target table if it does not exist
sql = """
CREATE TABLE IF NOT EXISTS artist_info_top10(
artist_id VARCHAR(100),
artist VARCHAR(100),
artist_genre ARRAY,
album VARCHAR(100),
song_id VARCHAR(100),
title VARCHAR(100),
date_time DATE,
song_genre ARRAY
)
"""

    # Create the table if it does not exist
create_snowflake_table(sql)

transform_df = transformation()

write_snowflake_spark_dataframe('artist_info_top10', transform_df)

def transformation():

    # Define the input schemas
artist_info_schema = StructType([
StructField("artist", StringType(), True),
StructField("artist_id", StringType(), True),
StructField("artist_genre", StringType(), True)
])

artist_top10_schema = StructType([
StructField("album", StringType(), True),
StructField("artist_id", StringType(), True),
StructField("song_id", StringType(), True),
StructField("title", StringType(), True),
])

    # Read the data and drop duplicates
artist_top10_df = extract("spotify_artist_top10", artist_top10_schema).dropDuplicates(['song_id'])
artist_info_df = extract("spotify_artist_info", artist_info_schema).dropDuplicates(['artist_id'])

    # outer join keeps rows that are missing from either side
    artist_info_top10_df = artist_info_df.join(artist_top10_df, on='artist_id', how='outer')

    # Add the load date
artist_info_top10_df = artist_info_top10_df.withColumn("date_time", current_date())

    # Add song genres fetched from the Last.fm API
artist_info_top10_df = artist_info_top10_df.withColumn("song_genre", add_song_genre_udf(col("artist"), col("title")))

return artist_info_top10_df


def add_song_genre(artist, track):

url = f"https://ws.audioscrobbler.com/2.0/?method=track.getInfo&api_key={LAST_FM_API_KEY}&artist={artist}&track={track}&format=json"

    try:
        response = requests.get(url, timeout=10).json()
        return [genre['name'] for genre in response.get('track', {}).get('toptags', {}).get('tag', [])]
    except requests.exceptions.RequestException as e:
        print(f"API request error: {e}")
        return ["API Error"]
    except KeyError:
        return ["Unknown"]

add_song_genre_udf = udf(add_song_genre, ArrayType(StringType()))
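
# Illustrative Last.fm track.getInfo payload this parser expects (assumed
# shape, not captured from a live response):
#   {"track": {"toptags": {"tag": [{"name": "pop"}, {"name": "dance pop"}]}}}
# add_song_genre would return ["pop", "dance pop"] for it.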

def extract(file_name, schema):

spark = create_spark_session('artist_top10_table')
df = spark.read.csv(f"s3a://{BUCKET_NAME}/{OBJECT_NAME}/{file_name}_{TODAY}.csv", header=True, schema=schema)

    if file_name == 'spotify_artist_info':
        df = df.withColumn("artist_genre", regexp_replace(df["artist_genre"], "[\\[\\]']", ""))  # strip brackets and quotes
        df = df.withColumn("artist_genre", split(df["artist_genre"], ", "))  # split on ", " into an array

return df


if __name__ == "__main__":
load()
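
# Local test sketch (hypothetical): add_song_genre is a plain function before
# udf() wraps it, so the Last.fm lookup can be smoke-tested without Spark:
#
#   print(add_song_genre("Coldplay", "Yellow"))  # e.g. ["rock", "britpop"]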

39 changes: 39 additions & 0 deletions airflow/dags/spotify_ELT_DAG.py
@@ -0,0 +1,39 @@
from airflow import DAG
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from datetime import datetime, timedelta


# Default DAG settings
default_args = {
'owner': 'yerin',
'depends_on_past': False,
'start_date': datetime(2025, 3, 2),
'retries': 1,
'retry_delay': timedelta(minutes=5)
}
with DAG(
dag_id='SpotifyDataProcessing',
default_args=default_args,
catchup=False,
tags=['final_project'],
    schedule='0 12 * * *'  # daily at 12:00 (UTC unless the DAG sets a timezone)
) as dag:

    artist_info_Top10_table = SparkSubmitOperator(
        task_id='artist_info_top10_table',
        application='dags/scripts/ELT_artist_info_top10.py',
        conn_id='spark_default'
    )

    artist_info_globalTop50_table = SparkSubmitOperator(
        task_id='artist_info_globalTop50_table',
        application='dags/scripts/ELT_artist_info_globalTop50.py',
        conn_id='spark_default'
    )

    # No dependency declared; both tasks run in parallel.
    [artist_info_Top10_table, artist_info_globalTop50_table]
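
    # Hypothetical ordering sketch: if the top10 load ever had to finish before
    # the globalTop50 load, replace the no-op list above with
    #   artist_info_Top10_table >> artist_info_globalTop50_table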