From af76d03e66a3c666c5050a80907b229662ae28cd Mon Sep 17 00:00:00 2001
From: YEERRin
Date: Fri, 14 Mar 2025 14:16:39 +0900
Subject: [PATCH 1/2] [feat] create spotify ELT DAG
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 airflow/dags/plugins/spark_snowflake_conn.py  |  83 +++++++++++++++
 .../scripts/ELT_artist_info_globalTop50.py    |  95 +++++++++++++++++
 airflow/dags/scripts/ELT_artist_info_top10.py | 100 ++++++++++++++++++
 airflow/dags/spotify_ELT_DAG.py               |  39 +++++++
 4 files changed, 317 insertions(+)
 create mode 100644 airflow/dags/plugins/spark_snowflake_conn.py
 create mode 100644 airflow/dags/scripts/ELT_artist_info_globalTop50.py
 create mode 100644 airflow/dags/scripts/ELT_artist_info_top10.py
 create mode 100644 airflow/dags/spotify_ELT_DAG.py

diff --git a/airflow/dags/plugins/spark_snowflake_conn.py b/airflow/dags/plugins/spark_snowflake_conn.py
new file mode 100644
index 0000000..313dedc
--- /dev/null
+++ b/airflow/dags/plugins/spark_snowflake_conn.py
@@ -0,0 +1,83 @@
+from pyspark.sql import SparkSession
+from airflow.models import Variable
+
+import snowflake.connector
+from datetime import datetime
+
+SNOWFLAKE_USER = Variable.get("SNOWFLAKE_USER")
+SNOWFLAKE_USER_PWD = Variable.get("SNOWFLAKE_PASSWORD")
+SNOWFLAKE_ACCOUNT = Variable.get("SNOWFLAKE_ACCOUNT")
+SNOWFLAKE_URL = Variable.get("SNOWFLAKE_URL")
+SNOWFLAKE_DB = 'test'
+SNOWFLAKE_SCHEMA = 'test_schema'
+
+AWS_ACCESS_KEY_ID = Variable.get("AWS_ACCESS_KEY_ID")
+AWS_SECRET_ACCESS_KEY = Variable.get("AWS_SECRET_ACCESS_KEY")
+
+TODAY = datetime.now().strftime("%Y-%m-%d")
+
+snowflake_options = {
+    "sfURL": SNOWFLAKE_URL,
+    "sfDatabase": SNOWFLAKE_DB,
+    "sfSchema": SNOWFLAKE_SCHEMA,
+    "sfWarehouse": "COMPUTE_WH",
+    "sfRole": "ACCOUNTADMIN",
+    "sfUser": SNOWFLAKE_USER,
+    "sfPassword": SNOWFLAKE_USER_PWD
+}
+
+def create_spark_session(app_name: str):
+    # If the defined connection points at a cluster, change master to the Spark master address
+    spark = SparkSession.builder \
+        .appName(f"{app_name}") \
+        .master("local[*]") \
+        .config("spark.hadoop.fs.s3a.access.key", AWS_ACCESS_KEY_ID) \
+        .config("spark.hadoop.fs.s3a.secret.key", AWS_SECRET_ACCESS_KEY) \
+        .config("spark.hadoop.fs.s3a.endpoint", "s3.amazonaws.com") \
+        .config("spark.jars", "/path/to/spark-snowflake_2.12-2.12.0-spark_3.4.jar,/path/to/snowflake-jdbc-3.13.33.jar") \
+        .getOrCreate()
+
+    return spark
+
+
+def create_snowflake_conn():
+    conn = snowflake.connector.connect(
+        user=SNOWFLAKE_USER,
+        password=SNOWFLAKE_USER_PWD,
+        account=SNOWFLAKE_ACCOUNT,
+        warehouse="COMPUTE_WH",
+        database=SNOWFLAKE_DB,
+        schema=SNOWFLAKE_SCHEMA
+    )
+
+    return conn
+
+
+def create_snowflake_table(sql):
+
+    conn = create_snowflake_conn()
+    cur = conn.cursor()
+
+    try:
+        cur.execute("BEGIN")
+        cur.execute(sql)
+        cur.execute("COMMIT")
+        conn.commit()
+
+    except Exception as e:
+        print(f"error: {e}")
+        cur.execute("ROLLBACK")
+
+
+def write_snowflake_spark_dataframe(table_name, df):
+
+    df.show()
+
+    df.write \
+        .format("snowflake") \
+        .options(**snowflake_options) \
+        .option("dbtable", f"{table_name}") \
+        .mode("append") \
+        .save()
\ No newline at end of file
diff --git a/airflow/dags/scripts/ELT_artist_info_globalTop50.py b/airflow/dags/scripts/ELT_artist_info_globalTop50.py
new file mode 100644
index 0000000..5bfbcab
--- /dev/null
+++ b/airflow/dags/scripts/ELT_artist_info_globalTop50.py
@@ -0,0 +1,95 @@
+from pyspark.sql.functions import col, split, regexp_replace, explode, lit, when, current_date
+from pyspark.sql.types import StringType, StructType, StructField, IntegerType
+from airflow.models import Variable
+
+import requests
+import snowflake.connector
+from datetime import datetime
+
+from plugins.spark_snowflake_conn import *
+
+import os
+
+LAST_FM_API_KEY = Variable.get("LAST_FM_API_KEY")
+
+BUCKET_NAME = 'de5-s4tify'
+OBJECT_NAME = 'raw_data'
+
+TODAY = datetime.now().strftime("%Y-%m-%d")
+
+def load():
+
+    # SQL that creates the table if it does not already exist
+    sql = """
+        CREATE TABLE IF NOT EXISTS artist_info_globalTop50(
+            artist_id VARCHAR(100),
+            rank INT,
+            title VARCHAR(100),
+            artist VARCHAR(100),
+            artist_name VARCHAR(100),
+            artist_genre ARRAY,
+            date_time DATE
+        )
+    """
+
+    create_snowflake_table(sql)
+
+    transform_df = transformation()
+    transform_df.show()
+
+    # Print rows that contain null values
+    #transform_df.filter(col("title") == "Sweet Dreams (feat. Miguel)").show(truncate=False)
+
+    write_snowflake_spark_dataframe('artist_info_globalTop50', transform_df)
+
+
+def transformation():
+
+    artist_info_schema = StructType([
+        StructField("artist", StringType(), True),
+        StructField("artist_id", StringType(), True),
+        StructField("artist_genre", StringType(), True)
+    ])
+
+    global_top50_schema = StructType([
+        StructField("rank", IntegerType(), True),
+        StructField("title", StringType(), True),
+        StructField("artist", StringType(), True),
+        StructField("artist_id", StringType(), True)
+    ])
+
+    # Read the data and drop duplicates
+    artist_info_df = extract("spotify_artist_info", artist_info_schema).dropDuplicates(['artist_id'])
+    global_top50_df = extract("spotify_crawling_data", global_top50_schema)
+
+    global_top50_df = global_top50_df.withColumn("artist_id", explode("artist_id"))
+
+    artist_info_top50_df = global_top50_df.join(artist_info_df, on='artist_id', how='outer')
+
+    artist_info_top50_df = artist_info_top50_df.withColumn("date_time", current_date())
+
+    return artist_info_top50_df
+
+
+def extract(file_name, schema):
+
+    spark = create_spark_session('artist_global_table')
+
+    df = spark.read.csv(f"s3a://{BUCKET_NAME}/{OBJECT_NAME}/{file_name}_{TODAY}.csv", header=True, schema=schema)
+
+    if file_name == 'spotify_crawling_data':
+        df = (
+            df.withColumn("artist", split(regexp_replace(col("artist"), r"[\[\]']", ""), ", "))
+            .withColumn("artist_id", split(regexp_replace(col("artist_id"), r"[\[\]']", ""), ", "))
+            .fillna({"title": ""})
+            .withColumn("artist", when(col("artist").isNull(), lit([""])).otherwise(col("artist")))
+        )
+    if file_name == 'spotify_artist_info':
+        df = df.withColumn("artist_genre", regexp_replace(df["artist_genre"], "[\\[\\]']", ""))  # Remove unnecessary characters
+        df = df.withColumn("artist_genre", split(df["artist_genre"], ", "))  # Split on commas into an array
+        df = df.withColumnRenamed("artist", "artist_name")
+
+    return df
+
+
+if __name__ == "__main__":
+    load()
\ No newline at end of file
diff --git a/airflow/dags/scripts/ELT_artist_info_top10.py b/airflow/dags/scripts/ELT_artist_info_top10.py
new file mode 100644
index 0000000..7392867
--- /dev/null
+++ b/airflow/dags/scripts/ELT_artist_info_top10.py
@@ -0,0 +1,100 @@
+from pyspark.sql import SparkSession
+from pyspark.sql.functions import col, udf, split, regexp_replace, current_date
+from pyspark.sql.types import ArrayType, StringType, StructType, StructField
+from airflow.models import Variable
+
+import requests
+from datetime import datetime
+from dags.plugins.spark_snowflake_conn import *
+
+LAST_FM_API_KEY = Variable.get("LAST_FM_API_KEY")
+
+BUCKET_NAME = 'de5-s4tify'
+OBJECT_NAME = 'raw_data'
+
+TODAY = datetime.now().strftime("%Y-%m-%d")
+
+def load():
+
+    # SQL that creates the table if it does not already exist
+    sql = """
+        CREATE TABLE IF NOT EXISTS artist_info_top10(
+            artist_id VARCHAR(100),
+            artist VARCHAR(100),
+            artist_genre ARRAY,
+            album VARCHAR(100),
+            song_id VARCHAR(100),
+            title VARCHAR(100),
+            date_time DATE,
+            song_genre ARRAY
+        )
+    """
+
+    # Create the table if it does not exist
+    create_snowflake_table(sql)
+
+    transform_df = transformation()
+
+    write_snowflake_spark_dataframe('artist_info_top10', transform_df)
+
+def transformation():
+
+    # Define the schemas
+    artist_info_schema = StructType([
+        StructField("artist", StringType(), True),
+        StructField("artist_id", StringType(), True),
+        StructField("artist_genre", StringType(), True)
+    ])
+
+    artist_top10_schema = StructType([
+        StructField("album", StringType(), True),
+        StructField("artist_id", StringType(), True),
+        StructField("song_id", StringType(), True),
+        StructField("title", StringType(), True),
+    ])
+
+    # Read the data and drop duplicates
+    artist_top10_df = extract("spotify_artist_top10", artist_top10_schema).dropDuplicates(['song_id'])
+    artist_info_df = extract("spotify_artist_info", artist_info_schema).dropDuplicates(['artist_id'])
+
+    artist_info_top10_df = artist_info_df.join(artist_top10_df, on='artist_id', how='outer')
+
+    # Add the date column
+    artist_info_top10_df = artist_info_top10_df.withColumn("date_time", current_date())
+
+    # Add the song genre column
+    artist_info_top10_df = artist_info_top10_df.withColumn("song_genre", add_song_genre_udf(col("artist"), col("title")))
+
+    return artist_info_top10_df
+
+
+def add_song_genre(artist, track):
+
+    url = f"https://ws.audioscrobbler.com/2.0/?method=track.getInfo&api_key={LAST_FM_API_KEY}&artist={artist}&track={track}&format=json"
+
+    try:
+        response = requests.get(url).json()
+        return [genre['name'] for genre in response.get('track', {}).get('toptags', {}).get('tag', [])]
+    except requests.exceptions.RequestException as e:
+        print(f"API request error: {e}")
+        return ["API Error"]
+    except KeyError:
+        return ["Unknown"]
+
+add_song_genre_udf = udf(add_song_genre, ArrayType(StringType()))
+
+def extract(file_name, schema):
+
+    spark = create_spark_session('artist_top10_table')
+    df = spark.read.csv(f"s3a://{BUCKET_NAME}/{OBJECT_NAME}/{file_name}_{TODAY}.csv", header=True, schema=schema)
+
+    if file_name == 'spotify_artist_info':
+        df = df.withColumn("artist_genre", regexp_replace(df["artist_genre"], "[\\[\\]']", ""))  # Remove unnecessary characters
+        df = df.withColumn("artist_genre", split(df["artist_genre"], ", "))  # Split on commas into an array
+
+    return df
+
+
+if __name__ == "__main__":
+    load()
+
diff --git a/airflow/dags/spotify_ELT_DAG.py b/airflow/dags/spotify_ELT_DAG.py
new file mode 100644
index 0000000..497d753
--- /dev/null
+++ b/airflow/dags/spotify_ELT_DAG.py
@@ -0,0 +1,39 @@
+from airflow import DAG
+from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
+from datetime import datetime, timedelta
+
+from scripts.crawling_spotify_data import *
+from scripts.request_spotify_api import *
+from scripts.load_spotify_data import *
+
+#DAG ±âº» ¼³Á¤
+default_args = {
+    'owner': 'yerin',
+    'depends_on_past': False,
+    'start_date': datetime(2025, 3, 2),
+    'retries': 1,
+    'retry_delay': timedelta(minutes=5)
+}
+with DAG(
+    dag_id='SpotifyDataProcessing',
+    default_args=default_args,
+    catchup=False,
+    tags=['final_project'],
+    schedule='0 12 * * *'
+) as dag:
+
+    artist_info_Top10_table = SparkSubmitOperator(
+        task_id='artist_info_top10_table',
+        application='dags/scripts/ELT_artist_info_top10.py',
+        conn_id='spark_default',
+        dag=dag
+    )
+
+    artist_info_globalTop50_table = SparkSubmitOperator(
+        task_id='artist_info_globalTop50_table',
+        application='dags/scripts/ELT_artist_info_globalTop50.py',
+        conn_id='spark_default',
+        dag=dag
+    )
+
+    [artist_info_Top10_table, artist_info_globalTop50_table]
\ No newline at end of file

From 2663b3b1212a3d04e64d4ca765ad304881a8faa2 Mon Sep 17 00:00:00 2001
From: YEERRin
Date: Fri, 14 Mar 2025 14:22:25 +0900
Subject: [PATCH 2/2] [fix] repair broken comment encoding
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 airflow/dags/spotify_ELT_DAG.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/airflow/dags/spotify_ELT_DAG.py b/airflow/dags/spotify_ELT_DAG.py
index 497d753..b7a45af 100644
--- a/airflow/dags/spotify_ELT_DAG.py
+++ b/airflow/dags/spotify_ELT_DAG.py
@@ -6,7 +6,7 @@ from scripts.request_spotify_api import *
 from scripts.load_spotify_data import *
 
-#DAG ±âº» ¼³Á¤
+#DAG default settings
 default_args = {
     'owner': 'yerin',
     'depends_on_past': False,