From af76d03e66a3c666c5050a80907b229662ae28cd Mon Sep 17 00:00:00 2001 From: YEERRin Date: Fri, 14 Mar 2025 14:16:39 +0900 Subject: [PATCH 1/5] =?UTF-8?q?[feat]=20spotify=20ELT=20DAG=20=EC=83=9D?= =?UTF-8?q?=EC=84=B1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- airflow/dags/plugins/spark_snowflake_conn.py | 83 +++++++++++++++ .../scripts/ELT_artist_info_globalTop50.py | 95 +++++++++++++++++ airflow/dags/scripts/ELT_artist_info_top10.py | 100 ++++++++++++++++++ airflow/dags/spotify_ELT_DAG.py | 39 +++++++ 4 files changed, 317 insertions(+) create mode 100644 airflow/dags/plugins/spark_snowflake_conn.py create mode 100644 airflow/dags/scripts/ELT_artist_info_globalTop50.py create mode 100644 airflow/dags/scripts/ELT_artist_info_top10.py create mode 100644 airflow/dags/spotify_ELT_DAG.py diff --git a/airflow/dags/plugins/spark_snowflake_conn.py b/airflow/dags/plugins/spark_snowflake_conn.py new file mode 100644 index 0000000..313dedc --- /dev/null +++ b/airflow/dags/plugins/spark_snowflake_conn.py @@ -0,0 +1,83 @@ +from pyspark.sql import SparkSession +from airflow.models import Variable + +import snowflake.connector +from datetime import datetime + +SNOWFLAKE_USER = Variable.get("SNOWFLAKE_USER") +SNOWFLKAE_USER_PWD = Variable.get("SNOWFLAKE_PASSWORD") +SNOWFLAKE_ACCOUNT = Variable.get("SNOWFLAKE_ACCOUNT") +SNOWFLAKE_URL = Variable.get("SNOWFLAKE_URL") +SNOWFLAKE_DB = 'test' +SNOWFLAKE_SCHEMA = 'test_schema' + +AWS_ACCESS_KEY_ID = Variable.get("AWS_ACCESS_KEY_ID") +AWS_SECRET_ACCESS_KEY = Variable.get("AWS_SECRET_ACCESS_KEY") + + +TODAY = datetime.now().strftime("%Y-%m-%d") + +snowflake_options = { + "sfURL": SNOWFLAKE_URL, + "sfDatabase": SNOWFLAKE_DB, + "sfSchema": SNOWFLAKE_SCHEMA, + "sfWarehouse": "COMPUTE_WH", + "sfRole": "ACCOUNTADMIN", + "sfUser": SNOWFLAKE_USER, + "sfPassword": SNOWFLKAE_USER_PWD +} + +def create_spark_session(app_name: str): + # ǵ connection cluster master spark master ּҷ + spark = SparkSession.builder \ + .appName(f"{app_name}") \ + .master("local[*]") \ + .config("spark.hadoop.fs.s3a.access.key", AWS_ACCESS_KEY_ID) \ + .config("spark.hadoop.fs.s3a.secret.key", AWS_SECRET_ACCESS_KEY) \ + .config("spark.hadoop.fs.s3a.endpoint", "s3.amazonaws.com") \ + .config("spark.jars", "/path/to/spark-snowflake_2.12-2.12.0-spark_3.4.jar,/path/to/snowflake-jdbc-3.13.33.jar") \ + .getOrCreate() + + return spark + + +def create_snowflake_conn(): + conn = snowflake.connector.connect( + user = SNOWFLAKE_USER, + password = SNOWFLKAE_USER_PWD, + account = SNOWFLAKE_ACCOUNT, + warehouse = "COMPUTE_WH", + database = SNOWFLAKE_DB , + schema = SNOWFLAKE_SCHEMA + ) + + return conn + + +def create_snowflake_table(sql): + + conn = create_snowflake_conn() + cur = conn.cursor() + + try: + cur.execute("BEGIN"); + cur.execute(sql) + cur.execute("COMMIT"); + conn.commit() + + except Exception as e: + print(f"error:{e}") + cur.execute("ROLLBACK"); + + +def write_snowflake_spark_dataframe(table_name, df): + + + df.show() + + df.write \ + .format("snowflake") \ + .options(**snowflake_options) \ + .option("dbtable", f"{table_name}") \ + .mode("append") \ + .save() \ No newline at end of file diff --git a/airflow/dags/scripts/ELT_artist_info_globalTop50.py b/airflow/dags/scripts/ELT_artist_info_globalTop50.py new file mode 100644 index 0000000..5bfbcab --- /dev/null +++ b/airflow/dags/scripts/ELT_artist_info_globalTop50.py @@ -0,0 +1,95 @@ +from pyspark.sql.functions import col, split, regexp_replace, explode, lit, when, current_date +from pyspark.sql.types import StringType, StructType, StructField, IntegerType +from airflow.models import Variable + +import requests +import snowflake.connector +from datetime import datetime + +from plugins.spark_snowflake_conn import * + +import os + +LAST_FM_API_KEY = Variable.get("LAST_FM_API_KEY") + +BUCKET_NAME = 'de5-s4tify' +OBJECT_NAME = 'raw_data' + +TODAY = datetime.now().strftime("%Y-%m-%d") +def load(): + + #̺ ִ Ȯϴ sql + sql = """ + CREATE TABLE IF NOT EXISTS artist_info_globalTop50( + artist_id VARCHAR(100), + rank INT, + title VARCHAR(100), + artist VARCHAR(100), + artist_name VARCHAR(100), + artist_genre ARRAY, + date_time DATE + ) + """ + + create_snowflake_table(sql) + + transform_df = transformation() + transform_df.show() + + # Null ִ + #transform_df.filter(col("title") == "Sweet Dreams (feat. Miguel)").show(truncate=False) + + write_snowflake_spark_dataframe('artist_info_globalTop50', transform_df) + + +def transformation(): + + artist_info_schema = StructType([ + StructField("artist", StringType(), True), + StructField("artist_id", StringType(), True), + StructField("artist_genre", StringType(), True) + ]) + + global_top50_schema = StructType([ + StructField("rank", IntegerType(), True), + StructField("title", StringType(), True), + StructField("artist", StringType(), True ), + StructField("artist_id", StringType(), True) + ]) + + # а ߺ + artist_info_df = extract("spotify_artist_info", artist_info_schema).dropDuplicates(['artist_id']) + global_top50_df = extract("spotify_crawling_data", global_top50_schema) + + global_top50_df = global_top50_df.withColumn("artist_id", explode("artist_id")) + + artist_info_top50_df = global_top50_df.join(artist_info_df, on='artist_id', how='outer') + + artist_info_top50_df = artist_info_top50_df.withColumn("date_time", current_date()) + + return artist_info_top50_df + + +def extract(file_name, schema): + + spark = create_spark_session('artist_global_table') + + df = spark.read.csv(f"s3a://{BUCKET_NAME}/{OBJECT_NAME}/{file_name}_{TODAY}.csv", header=True, schema=schema) + + if file_name == 'spotify_crawling_data': + df = ( + df.withColumn("artist", split(regexp_replace(col("artist"), r"[\[\]']", ""), ", " )) + .withColumn("artist_id", split(regexp_replace(col("artist_id"), r"[\[\]']", ""), ", ")) + .fillna({"title": ""}) + .withColumn("artist", when(col("artist").isNull(), lit([""])).otherwise(col("artist"))) + ) + if file_name == 'spotify_artist_info': + df = df.withColumn("artist_genre", regexp_replace(df["artist_genre"], "[\\[\\]']", "")) # ʿ + df = df.withColumn("artist_genre", split(df["artist_genre"], ", ")) # ǥ 迭 ȯ + df = df.withColumnRenamed("artist", "artist_name") + + return df + +if __name__ == "__main__": + load() + \ No newline at end of file diff --git a/airflow/dags/scripts/ELT_artist_info_top10.py b/airflow/dags/scripts/ELT_artist_info_top10.py new file mode 100644 index 0000000..7392867 --- /dev/null +++ b/airflow/dags/scripts/ELT_artist_info_top10.py @@ -0,0 +1,100 @@ +from pyspark.sql import SparkSession +from pyspark.sql.functions import col, udf, split, regexp_replace, current_date +from pyspark.sql.types import ArrayType, StringType, StructType, StructField, ArrayType +from airflow.models import Variable + +import requests +from datetime import datetime +from dags.plugins.spark_snowflake_conn import * + +LAST_FM_API_KEY = Variable.get("LAST_FM_API_KEY") + +BUCKET_NAME = 'de5-s4tify' +OBJECT_NAME = 'raw_data' + +TODAY = datetime.now().strftime("%Y-%m-%d") + +def load(): + + #̺ ִ Ȯϴ sql + sql = """ + CREATE TABLE IF NOT EXISTS artist_info_top10( + artist_id VARCHAR(100), + artist VARCHAR(100), + artist_genre ARRAY, + album VARCHAR(100), + song_id VARCHAR(100), + title VARCHAR(100), + date_time DATE, + song_genre ARRAY + ) + """ + + #̺ + create_snowflake_table(sql) + + transform_df = transformation() + + write_snowflake_spark_dataframe('artist_info_top10', transform_df) + +def transformation(): + + #Ű + artist_info_schema = StructType([ + StructField("artist", StringType(), True), + StructField("artist_id", StringType(), True), + StructField("artist_genre", StringType(), True) + ]) + + artist_top10_schema = StructType([ + StructField("album", StringType(), True), + StructField("artist_id", StringType(), True), + StructField("song_id", StringType(), True), + StructField("title", StringType(), True), + ]) + + # о ߺ + artist_top10_df = extract("spotify_artist_top10", artist_top10_schema).dropDuplicates(['song_id']) + artist_info_df = extract("spotify_artist_info", artist_info_schema).dropDuplicates(['artist_id']) + + artist_info_top10_df = artist_info_df.join(artist_top10_df, on='artist_id', how='outer') + + #¥ ߰ + artist_info_top10_df = artist_info_top10_df.withColumn("date_time", current_date()) + + #뷡 帣 ߰ + artist_info_top10_df = artist_info_top10_df.withColumn("song_genre", add_song_genre_udf(col("artist"), col("title"))) + + return artist_info_top10_df + + +def add_song_genre(artist, track): + + url = f"https://ws.audioscrobbler.com/2.0/?method=track.getInfo&api_key={LAST_FM_API_KEY}&artist={artist}&track={track}&format=json" + + try: + response = requests.get(url).json() + return [genre['name'] for genre in response.get('track', {}).get('toptags', {}).get('tag', [])] + except requests.exceptions.RequestException as e: + print(f"API û : {e}") + return ["API Error"] + except KeyError: + return ["Unknown"] + +add_song_genre_udf = udf(add_song_genre, ArrayType(StringType())) + +def extract(file_name, schema): + + spark = create_spark_session('artist_top10_table') + df = spark.read.csv(f"s3a://{BUCKET_NAME}/{OBJECT_NAME}/{file_name}_{TODAY}.csv", header=True, schema=schema) + + if file_name == 'spotify_artist_info': + df = df.withColumn("artist_genre", regexp_replace(df["artist_genre"], "[\\[\\]']", "")) # ʿ + df = df.withColumn("artist_genre", split(df["artist_genre"], ", ")) # ǥ 迭 ȯ + + return df + + +if __name__ == "__main__": + load() + diff --git a/airflow/dags/spotify_ELT_DAG.py b/airflow/dags/spotify_ELT_DAG.py new file mode 100644 index 0000000..497d753 --- /dev/null +++ b/airflow/dags/spotify_ELT_DAG.py @@ -0,0 +1,39 @@ +from airflow import DAG +from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator +from datetime import datetime, timedelta + +from scripts.crawling_spotify_data import * +from scripts.request_spotify_api import * +from scripts.load_spotify_data import * + +#DAG ⺻ +default_args = { + 'owner': 'yerin', + 'depends_on_past': False, + 'start_date': datetime(2025, 3, 2), + 'retries': 1, + 'retry_delay': timedelta(minutes=5) +} +with DAG( + dag_id='SpotifyDataProcessing', + default_args=default_args, + catchup=False, + tags=['final_project'], + schedule='0 12 * * *' +) as dag: + + artist_info_Top10_table = SparkSubmitOperator( + task_id = 'artist_info_globalTop50_table', + application='dags/scripts/ELT_artist_info_top10.py', + conn_id='spark_default', + dag=dag + ) + + artist_info_globalTop50_table = SparkSubmitOperator( + task_id = 'artist_info_globalTop50_table', + application='dags/scripts/ELT_artist_info_globalTop50.py', + conn_id='spark_default', + dag=dag + ) + + [artist_info_Top10_table , artist_info_globalTop50_table] \ No newline at end of file From 2663b3b1212a3d04e64d4ca765ad304881a8faa2 Mon Sep 17 00:00:00 2001 From: YEERRin Date: Fri, 14 Mar 2025 14:22:25 +0900 Subject: [PATCH 2/5] =?UTF-8?q?[fix]=20=EC=A3=BC=EC=84=9D=20=EA=B9=A8?= =?UTF-8?q?=EC=A7=90=20=ED=95=B4=EA=B2=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- airflow/dags/spotify_ELT_DAG.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/dags/spotify_ELT_DAG.py b/airflow/dags/spotify_ELT_DAG.py index 497d753..b7a45af 100644 --- a/airflow/dags/spotify_ELT_DAG.py +++ b/airflow/dags/spotify_ELT_DAG.py @@ -6,7 +6,7 @@ from scripts.request_spotify_api import * from scripts.load_spotify_data import * -#DAG ⺻ +#DAG 기본 설정 default_args = { 'owner': 'yerin', 'depends_on_past': False, From b1485410a11d531a9996d039dc77ed95ca26cc9a Mon Sep 17 00:00:00 2001 From: YEERRin Date: Fri, 14 Mar 2025 14:43:07 +0900 Subject: [PATCH 3/5] =?UTF-8?q?[feat]=20spotify=20ELT=20DAG=20=EC=83=9D?= =?UTF-8?q?=EC=84=B1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- airflow/dags/plugins/spark_snowflake_conn.py | 6 +++--- airflow/dags/spotify_ELT_DAG.py | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/airflow/dags/plugins/spark_snowflake_conn.py b/airflow/dags/plugins/spark_snowflake_conn.py index 313dedc..84a442d 100644 --- a/airflow/dags/plugins/spark_snowflake_conn.py +++ b/airflow/dags/plugins/spark_snowflake_conn.py @@ -8,8 +8,8 @@ SNOWFLKAE_USER_PWD = Variable.get("SNOWFLAKE_PASSWORD") SNOWFLAKE_ACCOUNT = Variable.get("SNOWFLAKE_ACCOUNT") SNOWFLAKE_URL = Variable.get("SNOWFLAKE_URL") -SNOWFLAKE_DB = 'test' -SNOWFLAKE_SCHEMA = 'test_schema' +SNOWFLAKE_DB = 'S4TIFY' +SNOWFLAKE_SCHEMA = 'RAW_DATA' AWS_ACCESS_KEY_ID = Variable.get("AWS_ACCESS_KEY_ID") AWS_SECRET_ACCESS_KEY = Variable.get("AWS_SECRET_ACCESS_KEY") @@ -22,7 +22,7 @@ "sfDatabase": SNOWFLAKE_DB, "sfSchema": SNOWFLAKE_SCHEMA, "sfWarehouse": "COMPUTE_WH", - "sfRole": "ACCOUNTADMIN", + "sfRole": "ANALYTICS_USERS", "sfUser": SNOWFLAKE_USER, "sfPassword": SNOWFLKAE_USER_PWD } diff --git a/airflow/dags/spotify_ELT_DAG.py b/airflow/dags/spotify_ELT_DAG.py index b7a45af..4bfe9b6 100644 --- a/airflow/dags/spotify_ELT_DAG.py +++ b/airflow/dags/spotify_ELT_DAG.py @@ -23,7 +23,7 @@ ) as dag: artist_info_Top10_table = SparkSubmitOperator( - task_id = 'artist_info_globalTop50_table', + task_id = 'artist_info_top10_table', application='dags/scripts/ELT_artist_info_top10.py', conn_id='spark_default', dag=dag From 2d60990273c640d65467499a9ad061c992e83ece Mon Sep 17 00:00:00 2001 From: YEERRin Date: Fri, 14 Mar 2025 14:52:10 +0900 Subject: [PATCH 4/5] =?UTF-8?q?[fix]=20utf-8=20=ED=8F=AC=EB=A7=B7=ED=8C=85?= =?UTF-8?q?=20=EB=B3=80=EA=B2=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- airflow/dags/plugins/spark_snowflake_conn.py | 2 +- .../scripts/ELT_artist_info_globalTop50.py | 10 +++++----- airflow/dags/scripts/ELT_artist_info_top10.py | 18 +++++++++--------- 3 files changed, 15 insertions(+), 15 deletions(-) diff --git a/airflow/dags/plugins/spark_snowflake_conn.py b/airflow/dags/plugins/spark_snowflake_conn.py index 84a442d..56bf816 100644 --- a/airflow/dags/plugins/spark_snowflake_conn.py +++ b/airflow/dags/plugins/spark_snowflake_conn.py @@ -28,7 +28,7 @@ } def create_spark_session(app_name: str): - # ǵ connection cluster master spark master ּҷ + #만약 정의된 connection이 cluster라면 master를 spark master 주소로 변경 spark = SparkSession.builder \ .appName(f"{app_name}") \ .master("local[*]") \ diff --git a/airflow/dags/scripts/ELT_artist_info_globalTop50.py b/airflow/dags/scripts/ELT_artist_info_globalTop50.py index 5bfbcab..9f735e8 100644 --- a/airflow/dags/scripts/ELT_artist_info_globalTop50.py +++ b/airflow/dags/scripts/ELT_artist_info_globalTop50.py @@ -18,7 +18,7 @@ TODAY = datetime.now().strftime("%Y-%m-%d") def load(): - #̺ ִ Ȯϴ sql + #테이블 있는지 확인하는 sql sql = """ CREATE TABLE IF NOT EXISTS artist_info_globalTop50( artist_id VARCHAR(100), @@ -36,7 +36,7 @@ def load(): transform_df = transformation() transform_df.show() - # Null ִ + # Null 값이 있는 행 출력 #transform_df.filter(col("title") == "Sweet Dreams (feat. Miguel)").show(truncate=False) write_snowflake_spark_dataframe('artist_info_globalTop50', transform_df) @@ -57,7 +57,7 @@ def transformation(): StructField("artist_id", StringType(), True) ]) - # а ߺ + #데이터 읽고 중복 제거 artist_info_df = extract("spotify_artist_info", artist_info_schema).dropDuplicates(['artist_id']) global_top50_df = extract("spotify_crawling_data", global_top50_schema) @@ -84,8 +84,8 @@ def extract(file_name, schema): .withColumn("artist", when(col("artist").isNull(), lit([""])).otherwise(col("artist"))) ) if file_name == 'spotify_artist_info': - df = df.withColumn("artist_genre", regexp_replace(df["artist_genre"], "[\\[\\]']", "")) # ʿ - df = df.withColumn("artist_genre", split(df["artist_genre"], ", ")) # ǥ 迭 ȯ + df = df.withColumn("artist_genre", regexp_replace(df["artist_genre"], "[\\[\\]']", "")) # 불필요한 문자 제거 + df = df.withColumn("artist_genre", split(df["artist_genre"], ", ")) # 쉼표 기준으로 배열 변환 df = df.withColumnRenamed("artist", "artist_name") return df diff --git a/airflow/dags/scripts/ELT_artist_info_top10.py b/airflow/dags/scripts/ELT_artist_info_top10.py index 7392867..6fbcb85 100644 --- a/airflow/dags/scripts/ELT_artist_info_top10.py +++ b/airflow/dags/scripts/ELT_artist_info_top10.py @@ -16,7 +16,7 @@ def load(): - #̺ ִ Ȯϴ sql + #테이블 있는지 확인하는 sql sql = """ CREATE TABLE IF NOT EXISTS artist_info_top10( artist_id VARCHAR(100), @@ -30,7 +30,7 @@ def load(): ) """ - #̺ + #테이블 없으면 생성 create_snowflake_table(sql) transform_df = transformation() @@ -39,7 +39,7 @@ def load(): def transformation(): - #Ű + #스키마 정의 artist_info_schema = StructType([ StructField("artist", StringType(), True), StructField("artist_id", StringType(), True), @@ -53,16 +53,16 @@ def transformation(): StructField("title", StringType(), True), ]) - # о ߺ + #데이터 읽어오고 중복 제거 artist_top10_df = extract("spotify_artist_top10", artist_top10_schema).dropDuplicates(['song_id']) artist_info_df = extract("spotify_artist_info", artist_info_schema).dropDuplicates(['artist_id']) artist_info_top10_df = artist_info_df.join(artist_top10_df, on='artist_id', how='outer') - #¥ ߰ + #날짜 데이터 추가 artist_info_top10_df = artist_info_top10_df.withColumn("date_time", current_date()) - #뷡 帣 ߰ + #노래 장르 데이터 추가 artist_info_top10_df = artist_info_top10_df.withColumn("song_genre", add_song_genre_udf(col("artist"), col("title"))) return artist_info_top10_df @@ -76,7 +76,7 @@ def add_song_genre(artist, track): response = requests.get(url).json() return [genre['name'] for genre in response.get('track', {}).get('toptags', {}).get('tag', [])] except requests.exceptions.RequestException as e: - print(f"API û : {e}") + print(f"API 요청 오류: {e}") return ["API Error"] except KeyError: return ["Unknown"] @@ -89,8 +89,8 @@ def extract(file_name, schema): df = spark.read.csv(f"s3a://{BUCKET_NAME}/{OBJECT_NAME}/{file_name}_{TODAY}.csv", header=True, schema=schema) if file_name == 'spotify_artist_info': - df = df.withColumn("artist_genre", regexp_replace(df["artist_genre"], "[\\[\\]']", "")) # ʿ - df = df.withColumn("artist_genre", split(df["artist_genre"], ", ")) # ǥ 迭 ȯ + df = df.withColumn("artist_genre", regexp_replace(df["artist_genre"], "[\\[\\]']", "")) # 불필요한 문자 제거 + df = df.withColumn("artist_genre", split(df["artist_genre"], ", ")) # 쉼표 기준으로 배열 변환 return df From e3951b69cf7e5e4359ea04f24bad13a81152e4c3 Mon Sep 17 00:00:00 2001 From: GitHub Action Date: Fri, 14 Mar 2025 05:52:48 +0000 Subject: [PATCH 5/5] Automated format fixes --- airflow/dags/plugins/spark_snowflake_conn.py | 84 +++++----- .../scripts/ELT_artist_info_globalTop50.py | 145 ++++++++++------- airflow/dags/scripts/ELT_artist_info_top10.py | 147 +++++++++++------- airflow/dags/spotify_ELT_DAG.py | 50 +++--- 4 files changed, 246 insertions(+), 180 deletions(-) diff --git a/airflow/dags/plugins/spark_snowflake_conn.py b/airflow/dags/plugins/spark_snowflake_conn.py index 56bf816..ab33988 100644 --- a/airflow/dags/plugins/spark_snowflake_conn.py +++ b/airflow/dags/plugins/spark_snowflake_conn.py @@ -1,15 +1,16 @@ -from pyspark.sql import SparkSession -from airflow.models import Variable +from datetime import datetime import snowflake.connector -from datetime import datetime +from pyspark.sql import SparkSession -SNOWFLAKE_USER = Variable.get("SNOWFLAKE_USER") -SNOWFLKAE_USER_PWD = Variable.get("SNOWFLAKE_PASSWORD") +from airflow.models import Variable + +SNOWFLAKE_USER = Variable.get("SNOWFLAKE_USER") +SNOWFLKAE_USER_PWD = Variable.get("SNOWFLAKE_PASSWORD") SNOWFLAKE_ACCOUNT = Variable.get("SNOWFLAKE_ACCOUNT") -SNOWFLAKE_URL = Variable.get("SNOWFLAKE_URL") -SNOWFLAKE_DB = 'S4TIFY' -SNOWFLAKE_SCHEMA = 'RAW_DATA' +SNOWFLAKE_URL = Variable.get("SNOWFLAKE_URL") +SNOWFLAKE_DB = "S4TIFY" +SNOWFLAKE_SCHEMA = "RAW_DATA" AWS_ACCESS_KEY_ID = Variable.get("AWS_ACCESS_KEY_ID") AWS_SECRET_ACCESS_KEY = Variable.get("AWS_SECRET_ACCESS_KEY") @@ -24,60 +25,61 @@ "sfWarehouse": "COMPUTE_WH", "sfRole": "ANALYTICS_USERS", "sfUser": SNOWFLAKE_USER, - "sfPassword": SNOWFLKAE_USER_PWD + "sfPassword": SNOWFLKAE_USER_PWD, } + def create_spark_session(app_name: str): - #만약 정의된 connection이 cluster라면 master를 spark master 주소로 변경 - spark = SparkSession.builder \ - .appName(f"{app_name}") \ - .master("local[*]") \ - .config("spark.hadoop.fs.s3a.access.key", AWS_ACCESS_KEY_ID) \ - .config("spark.hadoop.fs.s3a.secret.key", AWS_SECRET_ACCESS_KEY) \ - .config("spark.hadoop.fs.s3a.endpoint", "s3.amazonaws.com") \ - .config("spark.jars", "/path/to/spark-snowflake_2.12-2.12.0-spark_3.4.jar,/path/to/snowflake-jdbc-3.13.33.jar") \ + # 만약 정의된 connection이 cluster라면 master를 spark master 주소로 변경 + spark = ( + SparkSession.builder.appName(f"{app_name}") + .master("local[*]") + .config("spark.hadoop.fs.s3a.access.key", AWS_ACCESS_KEY_ID) + .config("spark.hadoop.fs.s3a.secret.key", AWS_SECRET_ACCESS_KEY) + .config("spark.hadoop.fs.s3a.endpoint", "s3.amazonaws.com") + .config( + "spark.jars", + "/path/to/spark-snowflake_2.12-2.12.0-spark_3.4.jar,/path/to/snowflake-jdbc-3.13.33.jar", + ) .getOrCreate() - + ) + return spark def create_snowflake_conn(): conn = snowflake.connector.connect( - user = SNOWFLAKE_USER, - password = SNOWFLKAE_USER_PWD, - account = SNOWFLAKE_ACCOUNT, - warehouse = "COMPUTE_WH", - database = SNOWFLAKE_DB , - schema = SNOWFLAKE_SCHEMA + user=SNOWFLAKE_USER, + password=SNOWFLKAE_USER_PWD, + account=SNOWFLAKE_ACCOUNT, + warehouse="COMPUTE_WH", + database=SNOWFLAKE_DB, + schema=SNOWFLAKE_SCHEMA, ) return conn def create_snowflake_table(sql): - + conn = create_snowflake_conn() cur = conn.cursor() - + try: - cur.execute("BEGIN"); + cur.execute("BEGIN") cur.execute(sql) - cur.execute("COMMIT"); + cur.execute("COMMIT") conn.commit() - + except Exception as e: print(f"error:{e}") - cur.execute("ROLLBACK"); - - + cur.execute("ROLLBACK") + + def write_snowflake_spark_dataframe(table_name, df): - - + df.show() - - df.write \ - .format("snowflake") \ - .options(**snowflake_options) \ - .option("dbtable", f"{table_name}") \ - .mode("append") \ - .save() \ No newline at end of file + + df.write.format("snowflake").options(**snowflake_options).option( + "dbtable", f"{table_name}" + ).mode("append").save() diff --git a/airflow/dags/scripts/ELT_artist_info_globalTop50.py b/airflow/dags/scripts/ELT_artist_info_globalTop50.py index 9f735e8..efff14a 100644 --- a/airflow/dags/scripts/ELT_artist_info_globalTop50.py +++ b/airflow/dags/scripts/ELT_artist_info_globalTop50.py @@ -1,28 +1,30 @@ -from pyspark.sql.functions import col, split, regexp_replace, explode, lit, when, current_date -from pyspark.sql.types import StringType, StructType, StructField, IntegerType -from airflow.models import Variable +import os +from datetime import datetime import requests import snowflake.connector -from datetime import datetime - -from plugins.spark_snowflake_conn import * +from plugins.spark_snowflake_conn import * +from pyspark.sql.functions import (col, current_date, explode, lit, + regexp_replace, split, when) +from pyspark.sql.types import IntegerType, StringType, StructField, StructType -import os +from airflow.models import Variable LAST_FM_API_KEY = Variable.get("LAST_FM_API_KEY") -BUCKET_NAME = 'de5-s4tify' -OBJECT_NAME = 'raw_data' +BUCKET_NAME = "de5-s4tify" +OBJECT_NAME = "raw_data" TODAY = datetime.now().strftime("%Y-%m-%d") -def load(): - - #테이블 있는지 확인하는 sql + + +def load(): + + # 테이블 있는지 확인하는 sql sql = """ CREATE TABLE IF NOT EXISTS artist_info_globalTop50( artist_id VARCHAR(100), - rank INT, + rank INT, title VARCHAR(100), artist VARCHAR(100), artist_name VARCHAR(100), @@ -30,66 +32,93 @@ def load(): date_time DATE ) """ - + create_snowflake_table(sql) transform_df = transformation() transform_df.show() - + # Null 값이 있는 행 출력 - #transform_df.filter(col("title") == "Sweet Dreams (feat. Miguel)").show(truncate=False) + # transform_df.filter(col("title") == "Sweet Dreams (feat. Miguel)").show(truncate=False) - write_snowflake_spark_dataframe('artist_info_globalTop50', transform_df) + write_snowflake_spark_dataframe("artist_info_globalTop50", transform_df) def transformation(): - - artist_info_schema = StructType([ - StructField("artist", StringType(), True), - StructField("artist_id", StringType(), True), - StructField("artist_genre", StringType(), True) - ]) - - global_top50_schema = StructType([ - StructField("rank", IntegerType(), True), - StructField("title", StringType(), True), - StructField("artist", StringType(), True ), - StructField("artist_id", StringType(), True) - ]) - - #데이터 읽고 중복 제거 - artist_info_df = extract("spotify_artist_info", artist_info_schema).dropDuplicates(['artist_id']) + + artist_info_schema = StructType( + [ + StructField("artist", StringType(), True), + StructField("artist_id", StringType(), True), + StructField("artist_genre", StringType(), True), + ] + ) + + global_top50_schema = StructType( + [ + StructField("rank", IntegerType(), True), + StructField("title", StringType(), True), + StructField("artist", StringType(), True), + StructField("artist_id", StringType(), True), + ] + ) + + # 데이터 읽고 중복 제거 + artist_info_df = extract( + "spotify_artist_info", + artist_info_schema).dropDuplicates( + ["artist_id"]) global_top50_df = extract("spotify_crawling_data", global_top50_schema) - - global_top50_df = global_top50_df.withColumn("artist_id", explode("artist_id")) - - artist_info_top50_df = global_top50_df.join(artist_info_df, on='artist_id', how='outer') - - artist_info_top50_df = artist_info_top50_df.withColumn("date_time", current_date()) - + + global_top50_df = global_top50_df.withColumn( + "artist_id", explode("artist_id")) + + artist_info_top50_df = global_top50_df.join( + artist_info_df, on="artist_id", how="outer" + ) + + artist_info_top50_df = artist_info_top50_df.withColumn( + "date_time", current_date()) + return artist_info_top50_df - + def extract(file_name, schema): - - spark = create_spark_session('artist_global_table') - - df = spark.read.csv(f"s3a://{BUCKET_NAME}/{OBJECT_NAME}/{file_name}_{TODAY}.csv", header=True, schema=schema) - - if file_name == 'spotify_crawling_data': + + spark = create_spark_session("artist_global_table") + + df = spark.read.csv( + f"s3a://{BUCKET_NAME}/{OBJECT_NAME}/{file_name}_{TODAY}.csv", + header=True, + schema=schema, + ) + + if file_name == "spotify_crawling_data": df = ( - df.withColumn("artist", split(regexp_replace(col("artist"), r"[\[\]']", ""), ", " )) - .withColumn("artist_id", split(regexp_replace(col("artist_id"), r"[\[\]']", ""), ", ")) - .fillna({"title": ""}) - .withColumn("artist", when(col("artist").isNull(), lit([""])).otherwise(col("artist"))) - ) - if file_name == 'spotify_artist_info': - df = df.withColumn("artist_genre", regexp_replace(df["artist_genre"], "[\\[\\]']", "")) # 불필요한 문자 제거 - df = df.withColumn("artist_genre", split(df["artist_genre"], ", ")) # 쉼표 기준으로 배열 변환 + df.withColumn( + "artist", split( + regexp_replace( + col("artist"), r"[\[\]']", ""), ", ")) .withColumn( + "artist_id", split( + regexp_replace( + col("artist_id"), r"[\[\]']", ""), ", "), ) .fillna( + { + "title": ""}) .withColumn( + "artist", when( + col("artist").isNull(), lit( + [""])).otherwise( + col("artist")), )) + if file_name == "spotify_artist_info": + df = df.withColumn( + "artist_genre", regexp_replace(df["artist_genre"], "[\\[\\]']", "") + ) # 불필요한 문자 제거 + df = df.withColumn( + "artist_genre", split(df["artist_genre"], ", ") + ) # 쉼표 기준으로 배열 변환 df = df.withColumnRenamed("artist", "artist_name") - + return df - + + if __name__ == "__main__": load() - \ No newline at end of file diff --git a/airflow/dags/scripts/ELT_artist_info_top10.py b/airflow/dags/scripts/ELT_artist_info_top10.py index 6fbcb85..e55f5d5 100644 --- a/airflow/dags/scripts/ELT_artist_info_top10.py +++ b/airflow/dags/scripts/ELT_artist_info_top10.py @@ -1,22 +1,24 @@ -from pyspark.sql import SparkSession -from pyspark.sql.functions import col, udf, split, regexp_replace, current_date -from pyspark.sql.types import ArrayType, StringType, StructType, StructField, ArrayType -from airflow.models import Variable +from datetime import datetime import requests -from datetime import datetime -from dags.plugins.spark_snowflake_conn import * +from dags.plugins.spark_snowflake_conn import * +from pyspark.sql import SparkSession +from pyspark.sql.functions import col, current_date, regexp_replace, split, udf +from pyspark.sql.types import ArrayType, StringType, StructField, StructType + +from airflow.models import Variable LAST_FM_API_KEY = Variable.get("LAST_FM_API_KEY") -BUCKET_NAME = 'de5-s4tify' -OBJECT_NAME = 'raw_data' +BUCKET_NAME = "de5-s4tify" +OBJECT_NAME = "raw_data" TODAY = datetime.now().strftime("%Y-%m-%d") -def load(): - - #테이블 있는지 확인하는 sql + +def load(): + + # 테이블 있는지 확인하는 sql sql = """ CREATE TABLE IF NOT EXISTS artist_info_top10( artist_id VARCHAR(100), @@ -29,72 +31,103 @@ def load(): song_genre ARRAY ) """ - - #테이블 없으면 생성 + + # 테이블 없으면 생성 create_snowflake_table(sql) transform_df = transformation() - - write_snowflake_spark_dataframe('artist_info_top10', transform_df) - + + write_snowflake_spark_dataframe("artist_info_top10", transform_df) + + def transformation(): - - #스키마 정의 - artist_info_schema = StructType([ - StructField("artist", StringType(), True), - StructField("artist_id", StringType(), True), - StructField("artist_genre", StringType(), True) - ]) - - artist_top10_schema = StructType([ - StructField("album", StringType(), True), - StructField("artist_id", StringType(), True), - StructField("song_id", StringType(), True), - StructField("title", StringType(), True), - ]) - - #데이터 읽어오고 중복 제거 - artist_top10_df = extract("spotify_artist_top10", artist_top10_schema).dropDuplicates(['song_id']) - artist_info_df = extract("spotify_artist_info", artist_info_schema).dropDuplicates(['artist_id']) - - artist_info_top10_df = artist_info_df.join(artist_top10_df, on='artist_id', how='outer') - - #날짜 데이터 추가 - artist_info_top10_df = artist_info_top10_df.withColumn("date_time", current_date()) - - #노래 장르 데이터 추가 - artist_info_top10_df = artist_info_top10_df.withColumn("song_genre", add_song_genre_udf(col("artist"), col("title"))) - + + # 스키마 정의 + artist_info_schema = StructType( + [ + StructField("artist", StringType(), True), + StructField("artist_id", StringType(), True), + StructField("artist_genre", StringType(), True), + ] + ) + + artist_top10_schema = StructType( + [ + StructField("album", StringType(), True), + StructField("artist_id", StringType(), True), + StructField("song_id", StringType(), True), + StructField("title", StringType(), True), + ] + ) + + # 데이터 읽어오고 중복 제거 + artist_top10_df = extract( + "spotify_artist_top10", artist_top10_schema + ).dropDuplicates(["song_id"]) + artist_info_df = extract( + "spotify_artist_info", + artist_info_schema).dropDuplicates( + ["artist_id"]) + + artist_info_top10_df = artist_info_df.join( + artist_top10_df, on="artist_id", how="outer" + ) + + # 날짜 데이터 추가 + artist_info_top10_df = artist_info_top10_df.withColumn( + "date_time", current_date()) + + # 노래 장르 데이터 추가 + artist_info_top10_df = artist_info_top10_df.withColumn( + "song_genre", add_song_genre_udf(col("artist"), col("title")) + ) + return artist_info_top10_df def add_song_genre(artist, track): - + url = f"https://ws.audioscrobbler.com/2.0/?method=track.getInfo&api_key={LAST_FM_API_KEY}&artist={artist}&track={track}&format=json" - + try: response = requests.get(url).json() - return [genre['name'] for genre in response.get('track', {}).get('toptags', {}).get('tag', [])] + return [ + genre["name"] for genre in response.get( + "track", + {}).get( + "toptags", + {}).get( + "tag", + [])] except requests.exceptions.RequestException as e: print(f"API 요청 오류: {e}") return ["API Error"] except KeyError: return ["Unknown"] - + + add_song_genre_udf = udf(add_song_genre, ArrayType(StringType())) + def extract(file_name, schema): - - spark = create_spark_session('artist_top10_table') - df = spark.read.csv(f"s3a://{BUCKET_NAME}/{OBJECT_NAME}/{file_name}_{TODAY}.csv", header=True, schema=schema) - - if file_name == 'spotify_artist_info': - df = df.withColumn("artist_genre", regexp_replace(df["artist_genre"], "[\\[\\]']", "")) # 불필요한 문자 제거 - df = df.withColumn("artist_genre", split(df["artist_genre"], ", ")) # 쉼표 기준으로 배열 변환 - - return df + + spark = create_spark_session("artist_top10_table") + df = spark.read.csv( + f"s3a://{BUCKET_NAME}/{OBJECT_NAME}/{file_name}_{TODAY}.csv", + header=True, + schema=schema, + ) + + if file_name == "spotify_artist_info": + df = df.withColumn( + "artist_genre", regexp_replace(df["artist_genre"], "[\\[\\]']", "") + ) # 불필요한 문자 제거 + df = df.withColumn( + "artist_genre", split(df["artist_genre"], ", ") + ) # 쉼표 기준으로 배열 변환 + + return df if __name__ == "__main__": load() - diff --git a/airflow/dags/spotify_ELT_DAG.py b/airflow/dags/spotify_ELT_DAG.py index 4bfe9b6..2f4ea9d 100644 --- a/airflow/dags/spotify_ELT_DAG.py +++ b/airflow/dags/spotify_ELT_DAG.py @@ -1,39 +1,41 @@ -from airflow import DAG -from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator from datetime import datetime, timedelta from scripts.crawling_spotify_data import * -from scripts.request_spotify_api import * from scripts.load_spotify_data import * +from scripts.request_spotify_api import * + +from airflow import DAG +from airflow.providers.apache.spark.operators.spark_submit import \ + SparkSubmitOperator -#DAG 기본 설정 +# DAG 기본 설정 default_args = { - 'owner': 'yerin', - 'depends_on_past': False, - 'start_date': datetime(2025, 3, 2), - 'retries': 1, - 'retry_delay': timedelta(minutes=5) + "owner": "yerin", + "depends_on_past": False, + "start_date": datetime(2025, 3, 2), + "retries": 1, + "retry_delay": timedelta(minutes=5), } with DAG( - dag_id='SpotifyDataProcessing', + dag_id="SpotifyDataProcessing", default_args=default_args, catchup=False, - tags=['final_project'], - schedule='0 12 * * *' + tags=["final_project"], + schedule="0 12 * * *", ) as dag: - + artist_info_Top10_table = SparkSubmitOperator( - task_id = 'artist_info_top10_table', - application='dags/scripts/ELT_artist_info_top10.py', - conn_id='spark_default', - dag=dag + task_id="artist_info_top10_table", + application="dags/scripts/ELT_artist_info_top10.py", + conn_id="spark_default", + dag=dag, ) - + artist_info_globalTop50_table = SparkSubmitOperator( - task_id = 'artist_info_globalTop50_table', - application='dags/scripts/ELT_artist_info_globalTop50.py', - conn_id='spark_default', - dag=dag + task_id="artist_info_globalTop50_table", + application="dags/scripts/ELT_artist_info_globalTop50.py", + conn_id="spark_default", + dag=dag, ) - - [artist_info_Top10_table , artist_info_globalTop50_table] \ No newline at end of file + + [artist_info_Top10_table, artist_info_globalTop50_table]