From ed786ed4a200d13b68e41461395bff146986bf3e Mon Sep 17 00:00:00 2001 From: gland78 Date: Fri, 14 Mar 2025 15:27:43 +0900 Subject: [PATCH 1/4] =?UTF-8?q?=EC=A4=91=EA=B0=84=20commit?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- airflow/dags/Bugs_DAG.py | 5 +- airflow/dags/Flo_DAG.py | 5 +- airflow/dags/Genie_DAG.py | 5 +- airflow/dags/Melon_DAG.py | 5 +- airflow/dags/Vibe_DAG.py | 5 +- .../dags/scripts/S3_Spark_SnowFlake_ELT.py | 89 +++++++++---------- 6 files changed, 58 insertions(+), 56 deletions(-) diff --git a/airflow/dags/Bugs_DAG.py b/airflow/dags/Bugs_DAG.py index 34fc3bd..6ee5b2f 100644 --- a/airflow/dags/Bugs_DAG.py +++ b/airflow/dags/Bugs_DAG.py @@ -14,7 +14,7 @@ from airflow.providers.amazon.aws.hooks.s3 import S3Hook # 날짜 설정 -TODAY = datetime.now().strftime("%Y%m%d") +TODAY = datetime.now().strftime("%Y-%m-%d") S3_BUCKET = "de5-s4tify" @@ -62,7 +62,7 @@ def convert_json_to_csv(**kwargs): # 헤더 추가 writer.writerow(["rank", "title", "artist", "lastPos", - "peakPos", "image", "genre"]) + "peakPos", "image", "genre", "date"]) # 데이터 추가 for entry in data["entries"]: @@ -80,6 +80,7 @@ def convert_json_to_csv(**kwargs): entry["peakPos"], entry["image"], genres, + TODAY, ] ) diff --git a/airflow/dags/Flo_DAG.py b/airflow/dags/Flo_DAG.py index 5c596e4..27fcec4 100644 --- a/airflow/dags/Flo_DAG.py +++ b/airflow/dags/Flo_DAG.py @@ -14,7 +14,7 @@ from airflow.providers.amazon.aws.hooks.s3 import S3Hook # 날짜 설정 -TODAY = datetime.now().strftime("%Y%m%d") +TODAY = datetime.now().strftime("%Y-%m-%d") # S3 설정 S3_BUCKET = "de5-s4tify" @@ -60,7 +60,7 @@ def convert_json_to_csv(**kwargs): # 헤더 추가 writer.writerow(["rank", "title", "artist", - "lastPos", "isNew", "image", "genre"]) + "lastPos", "isNew", "image", "genre", "date"]) # 데이터 추가 for entry in data["entries"]: @@ -78,6 +78,7 @@ def convert_json_to_csv(**kwargs): entry["isNew"], entry["image"], genres, + TODAY, ] ) diff --git a/airflow/dags/Genie_DAG.py 
b/airflow/dags/Genie_DAG.py index e0dd127..ec7e7d4 100644 --- a/airflow/dags/Genie_DAG.py +++ b/airflow/dags/Genie_DAG.py @@ -14,7 +14,7 @@ from airflow.providers.amazon.aws.hooks.s3 import S3Hook # 날짜 설정 -TODAY = datetime.now().strftime("%Y%m%d") +TODAY = datetime.now().strftime("%Y-%m-%d") # S3 설정 S3_BUCKET = "de5-s4tify" @@ -60,7 +60,7 @@ def convert_json_to_csv(**kwargs): # 헤더 추가 writer.writerow(["rank", "title", "artist", "peakPos", - "lastPos", "image", "genre"]) + "lastPos", "image", "genre", "date"]) # 데이터 추가 for entry in data["entries"]: @@ -78,6 +78,7 @@ def convert_json_to_csv(**kwargs): entry["lastPos"], entry["image"], genres, + TODAY, ] ) diff --git a/airflow/dags/Melon_DAG.py b/airflow/dags/Melon_DAG.py index 8d89066..db409b4 100644 --- a/airflow/dags/Melon_DAG.py +++ b/airflow/dags/Melon_DAG.py @@ -14,7 +14,7 @@ from airflow.providers.amazon.aws.hooks.s3 import S3Hook # 날짜 설정 -TODAY = datetime.now().strftime("%Y%m%d") +TODAY = datetime.now().strftime("%Y-%m-%d") # S3 설정 S3_BUCKET = "de5-s4tify" @@ -59,7 +59,7 @@ def convert_json_to_csv(**kwargs): # 헤더 추가 writer.writerow(["rank", "title", "artist", - "lastPos", "isNew", "image", "genre"]) + "lastPos", "isNew", "image", "genre", "date"]) # 데이터 추가 for entry in data["entries"]: @@ -78,6 +78,7 @@ def convert_json_to_csv(**kwargs): entry["isNew"], entry["image"], genres, + TODAY, ] ) diff --git a/airflow/dags/Vibe_DAG.py b/airflow/dags/Vibe_DAG.py index 612a477..3b6adc8 100644 --- a/airflow/dags/Vibe_DAG.py +++ b/airflow/dags/Vibe_DAG.py @@ -14,7 +14,7 @@ from airflow.providers.amazon.aws.hooks.s3 import S3Hook # 날짜 설정 -TODAY = datetime.now().strftime("%Y%m%d") +TODAY = datetime.now().strftime("%Y-%m-%d") # S3 설정 S3_BUCKET = "de5-s4tify" @@ -57,7 +57,7 @@ def convert_json_to_csv(**kwargs): # 헤더 추가 writer.writerow(["rank", "title", "artist", - "lastPos", "isNew", "image", "genre"]) + "lastPos", "isNew", "image", "genre", "date"]) # 데이터 추가 for entry in data["entries"]: @@ -75,6 +75,7 @@ def 
convert_json_to_csv(**kwargs): entry["isNew"], entry["image"], genres, + TODAY, ] ) diff --git a/airflow/dags/scripts/S3_Spark_SnowFlake_ELT.py b/airflow/dags/scripts/S3_Spark_SnowFlake_ELT.py index f7abdd0..36c3e82 100644 --- a/airflow/dags/scripts/S3_Spark_SnowFlake_ELT.py +++ b/airflow/dags/scripts/S3_Spark_SnowFlake_ELT.py @@ -4,6 +4,7 @@ import snowflake.connector from pyspark.sql import SparkSession from pyspark.sql.functions import col, count, lit, when +from pyspark.sql import functions as F from airflow.models import Variable @@ -24,33 +25,24 @@ "account": Variable.get("SNOWFLAKE_ACCOUNT"), "db": Variable.get("SNOWFLAKE_DB", "S4TIFY"), "warehouse": Variable.get("SNOWFLAKE_WH", "COMPUTE_WH"), - # "schema": (Variable.get("SNOWFLAKE_SCHEMA")if Variable.get("SNOWFLAKE_SCHEMA")else "raw_data"), "schema": "RAW_DATA", "role": "ACCOUNTADMIN", "driver": "net.snowflake.client.jdbc.SnowflakeDriver", "url": f'jdbc:snowflake://{Variable.get("SNOWFLAKE_ACCOUNT")}.snowflakecomputing.com', - # "url" : "jdbc:snowflake://kjqeovi-gr23658.snowflakecomputing.com", } - # Spark Session 생성 함수 def spark_session_builder(app_name: str) -> SparkSession: return ( - SparkSession.builder.appName(app_name) .config( - "spark.jars", - SPARK_JARS) .config( - "spark.hadoop.fs.s3a.impl", - "org.apache.hadoop.fs.s3a.S3AFileSystem") .config( - "spark.hadoop.fs.s3a.access.key", - Variable.get("AWS_ACCESS_KEY")) .config( - "spark.hadoop.fs.s3a.secret.key", - Variable.get("AWS_SECRET_KEY")) .config( - "spark.hadoop.fs.s3a.endpoint", - "s3.amazonaws.com") .config( - "spark.hadoop.fs.s3a.aws.credentials.provider", - "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider", - ) .getOrCreate()) - + SparkSession.builder.appName(app_name) + .config("spark.jars", SPARK_JARS) + .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") + .config("spark.hadoop.fs.s3a.access.key", Variable.get("AWS_ACCESS_KEY")) + .config("spark.hadoop.fs.s3a.secret.key", Variable.get("AWS_SECRET_KEY")) + 
.config("spark.hadoop.fs.s3a.endpoint", "s3.amazonaws.com") + .config("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider") + .getOrCreate() + ) # Snowflake에서 SQL 실행 함수 def check_and_create_table(): @@ -88,7 +80,8 @@ def check_and_create_table(): image STRING, peakPos INT, isNew BOOLEAN, - source STRING + source STRING, + date DATE -- 날짜 컬럼 추가 ) """ cur.execute(create_table_query) @@ -110,6 +103,16 @@ def escape_quotes(value): return "NULL" return "'{}'".format(value.replace("'", "''")) +# `genre` 컬럼에서 추가 따옴표를 처리하는 함수 +def clean_genre(value): + if value: + # 장르 목록을 '[]' 또는 다른 구분자로 구분하고, 구분자 처리 + cleaned_value = value.replace('""', '"') # 따옴표 문제 해결 + cleaned_value = cleaned_value.replace('"[', '[').replace(']"', ']') + # 구분자가 있는 경우 이를 처리 + cleaned_value = cleaned_value.replace('|', ', ') # 구분자 변경(예: '|' -> ', ') + return cleaned_value + return value # Snowflake에서 SQL 실행 함수 def insert_data_into_snowflake(df, table_name): @@ -127,30 +130,19 @@ def insert_data_into_snowflake(df, table_name): for row in df.collect(): rank = "NULL" if row["rank"] is None else row["rank"] - title = escape_quotes( - row["title"]) if row["title"] is not None else "NULL" - artist = ( - escape_quotes( - row["artist"]) if row["artist"] is not None else "NULL") - genre = ( - escape_quotes( - row["genre"]) if row["genre"] is not None else "NULL") # 🎵 genre 추가 + title = escape_quotes(row["title"]) if row["title"] is not None else "NULL" + artist = escape_quotes(row["artist"]) if row["artist"] is not None else "NULL" + genre = escape_quotes(clean_genre(row["genre"])) if row["genre"] is not None else "NULL" # 🎵 genre 처리 lastPos = "NULL" if row["lastPos"] is None else row["lastPos"] - image = escape_quotes( - row["image"]) if row["image"] is not None else "NULL" + image = escape_quotes(row["image"]) if row["image"] is not None else "NULL" peakPos = "NULL" if row["peakPos"] is None else row["peakPos"] - isNew = ( - "NULL" - if row["isNew"] is None - 
else ("TRUE" if row["isNew"] else "FALSE") - ) - source = ( - escape_quotes( - row["source"]) if row["source"] is not None else "NULL") + isNew = "NULL" if row["isNew"] is None else ("TRUE" if row["isNew"] else "FALSE") + source = escape_quotes(row["source"]) if row["source"] is not None else "NULL" + date = f"'{row['date']}'" # date 컬럼 처리 query = f""" - INSERT INTO {table_name} (rank, title, artist, genre, lastPos, image, peakPos, isNew, source) - VALUES ({rank}, {title}, {artist}, {genre}, {lastPos}, {image}, {peakPos}, {isNew}, {source}) + INSERT INTO {table_name} (rank, title, artist, genre, lastPos, image, peakPos, isNew, source, date) + VALUES ({rank}, {title}, {artist}, {genre}, {lastPos}, {image}, {peakPos}, {isNew}, {source}, {date}) """ cur.execute(query) @@ -160,7 +152,6 @@ def insert_data_into_snowflake(df, table_name): print("✅ Data inserted into Snowflake successfully.") except Exception as e: - print(query) print(f"⚠️ Error inserting data into Snowflake: {e}") @@ -178,7 +169,6 @@ def insert_data_into_snowflake(df, table_name): "vibe": f"{S3_BUCKET}/raw_data/vibe_chart_data/vibe_chart_{TODAY}.csv", } - def read_chart_data(source, path): try: df = ( @@ -187,7 +177,7 @@ def read_chart_data(source, path): .option("inferSchema", True) .load(path) ) - df.printSchema() # ✅ 데이터 스키마 출력해서 `genre` 확인 + df.printSchema() # 데이터 스키마 출력해서 `genre`와 `date` 확인 return df.withColumn("source", lit(source)) except Exception as e: print(f"⚠️ {source} 데이터 로드 실패: {e}") @@ -211,7 +201,7 @@ def read_chart_data(source, path): col("rank").cast("int")).alias("rank"), col("title"), col("artist"), - col("genre"), # ✅ genre 컬럼 추가 + col("genre"), # genre 컬럼 추가 when(col("lastPos").rlike("^[0-9]+$"), col("lastPos").cast("int")).alias( "lastPos" ), @@ -223,18 +213,25 @@ def read_chart_data(source, path): "isNew" ), col("source"), + col("date"), # date 컬럼 추가 + ) + + # 최종 데이터 프레임에서 genre 컬럼 정리 + final_df_cleaned = final_df.withColumn( + "genre", F.when(F.col("genre").isNotNull(), 
clean_genre(F.col("genre"))).otherwise(F.lit(None)) ) + final_df = final_df.withColumnRenamed("date_time", "date") - final_df.show(40) + final_df_cleaned.show(40) # 데이터 확인 - final_df.groupBy("source").agg(count("*").alias("count")).show() + final_df_cleaned.groupBy("source").agg(count("*").alias("count")).show() # Snowflake에서 테이블 존재 여부 확인 및 생성 check_and_create_table() # Snowflake에 데이터 적재 - insert_data_into_snowflake(final_df, "music_charts") + insert_data_into_snowflake(final_df_cleaned, "music_charts") else: print("❌ 저장할 차트 데이터가 없습니다.") From d97c631c95f0414f667d73f106a1c76e35132a83 Mon Sep 17 00:00:00 2001 From: gland78 Date: Fri, 14 Mar 2025 16:29:14 +0900 Subject: [PATCH 2/4] add_genre data, fix_data_format(spark_script) --- airflow/dags/Bugs_DAG.py | 15 +++++----- airflow/dags/Flo_DAG.py | 15 +++++----- airflow/dags/Genie_DAG.py | 18 +++++------ airflow/dags/Melon_DAG.py | 13 ++++---- airflow/dags/Vibe_DAG.py | 15 +++++----- .../dags/scripts/S3_Spark_SnowFlake_ELT.py | 30 +++++-------------- 6 files changed, 41 insertions(+), 65 deletions(-) diff --git a/airflow/dags/Bugs_DAG.py b/airflow/dags/Bugs_DAG.py index 6ee5b2f..14a51a5 100644 --- a/airflow/dags/Bugs_DAG.py +++ b/airflow/dags/Bugs_DAG.py @@ -42,7 +42,7 @@ def fetch_bugs_chart(): "title": entry.title, "artist": entry.artist, "lastPos": entry.lastPos, - "peakPos": entry.peakPos, + "peakPos": entry.peakPos, "image": entry.image, "genres": genre.split(", ") if genre else [], # ✅ 리스트 변환, } @@ -50,7 +50,7 @@ def fetch_bugs_chart(): return chart_data -# 2. JSON → CSV 변환 +# 2. 
JSON → CSV 변환 (genre를 리스트로 저장) def convert_json_to_csv(**kwargs): ti = kwargs["ti"] data = ti.xcom_pull(task_ids="fetch_bugs_chart") @@ -66,11 +66,10 @@ def convert_json_to_csv(**kwargs): # 데이터 추가 for entry in data["entries"]: - genres = json.dumps( - entry["genres"], ensure_ascii=False - ) # 리스트를 문자열로 변환 - # 이중 따옴표가 포함되면 한번만 보이도록 처리 - genres = genres.replace('""', '"') # 이중 따옴표를 하나로 바꿈 + # 리스트 그대로 저장 + genres = entry["genres"] + + # CSV에 추가 writer.writerow( [ entry["rank"], @@ -79,7 +78,7 @@ def convert_json_to_csv(**kwargs): entry["lastPos"], entry["peakPos"], entry["image"], - genres, + genres, # 수정된 부분: 리스트 그대로 저장 TODAY, ] ) diff --git a/airflow/dags/Flo_DAG.py b/airflow/dags/Flo_DAG.py index 27fcec4..db0c6eb 100644 --- a/airflow/dags/Flo_DAG.py +++ b/airflow/dags/Flo_DAG.py @@ -59,16 +59,14 @@ def convert_json_to_csv(**kwargs): ) # ✅ 모든 필드를 자동으로 따옴표 처리 # 헤더 추가 - writer.writerow(["rank", "title", "artist", - "lastPos", "isNew", "image", "genre", "date"]) + writer.writerow(["rank", "title", "artist", "lastPos", "isNew", "image", "genre", "date"]) # 데이터 추가 for entry in data["entries"]: - genres = json.dumps( - entry["genres"], ensure_ascii=False - ) # 리스트를 문자열로 변환 - # 이중 따옴표가 포함되면 한번만 보이도록 처리 - genres = genres.replace('""', '"') # 이중 따옴표를 하나로 바꿈 + # 리스트 그대로 저장 (genres는 리스트로 저장) + genres = entry["genres"] # 수정된 부분: 리스트 그대로 저장 + + # CSV에 추가 writer.writerow( [ entry["rank"], @@ -77,7 +75,7 @@ def convert_json_to_csv(**kwargs): entry["lastPos"], entry["isNew"], entry["image"], - genres, + genres, # 수정된 부분: 리스트 그대로 저장 TODAY, ] ) @@ -85,6 +83,7 @@ def convert_json_to_csv(**kwargs): return output.getvalue() + # 3. 
로컬에 CSV 저장 (테스트용, 삭제 용이하도록 별도 함수) def save_csv_locally(csv_string): with open(LOCAL_FILE_PATH, "w", encoding="utf-8") as f: diff --git a/airflow/dags/Genie_DAG.py b/airflow/dags/Genie_DAG.py index ec7e7d4..20cb413 100644 --- a/airflow/dags/Genie_DAG.py +++ b/airflow/dags/Genie_DAG.py @@ -59,32 +59,30 @@ def convert_json_to_csv(**kwargs): ) # ✅ 모든 필드를 자동으로 따옴표 처리 # 헤더 추가 - writer.writerow(["rank", "title", "artist", "peakPos", - "lastPos", "image", "genre", "date"]) + writer.writerow(["rank", "title", "artist", "lastPos", + "peakPos", "image", "genre", "date"]) # 데이터 추가 for entry in data["entries"]: - genres = json.dumps( - entry["genres"], ensure_ascii=False - ) # 리스트를 문자열로 변환 - # 이중 따옴표가 포함되면 한번만 보이도록 처리 - genres = genres.replace('""', '"') # 이중 따옴표를 하나로 바꿈 + # 리스트 그대로 저장 + genres = entry["genres"] + + # CSV에 추가 writer.writerow( [ entry["rank"], entry["title"], entry["artist"], - entry["peakPos"], entry["lastPos"], + entry["peakPos"], entry["image"], - genres, + genres, # 수정된 부분: 리스트 그대로 저장 TODAY, ] ) return output.getvalue() - # 3. 
로컬에 CSV 저장 (테스트용, 삭제 용이하도록 별도 함수) def save_csv_locally(csv_string): with open(LOCAL_FILE_PATH, "w", encoding="utf-8") as f: diff --git a/airflow/dags/Melon_DAG.py b/airflow/dags/Melon_DAG.py index db409b4..1561e32 100644 --- a/airflow/dags/Melon_DAG.py +++ b/airflow/dags/Melon_DAG.py @@ -58,17 +58,14 @@ def convert_json_to_csv(**kwargs): ) # ✅ 모든 필드를 자동으로 따옴표 처리 # 헤더 추가 - writer.writerow(["rank", "title", "artist", - "lastPos", "isNew", "image", "genre", "date"]) + writer.writerow(["rank", "title", "artist", "lastPos", "isNew", "image", "genre", "date"]) # 데이터 추가 for entry in data["entries"]: - genres = json.dumps( - entry["genres"], ensure_ascii=False - ) # 리스트를 문자열로 변환 - # 이중 따옴표가 포함되면 한번만 보이도록 처리 - genres = genres.replace('""', '"') # 이중 따옴표를 하나로 바꿈 + # 리스트 그대로 저장 (genres는 리스트로 저장) + genres = entry["genres"] # 수정된 부분: 리스트 그대로 저장 + # CSV에 추가 writer.writerow( [ entry["rank"], @@ -77,7 +74,7 @@ def convert_json_to_csv(**kwargs): entry["lastPos"], entry["isNew"], entry["image"], - genres, + genres, # 수정된 부분: 리스트 그대로 저장 TODAY, ] ) diff --git a/airflow/dags/Vibe_DAG.py b/airflow/dags/Vibe_DAG.py index 3b6adc8..fba589c 100644 --- a/airflow/dags/Vibe_DAG.py +++ b/airflow/dags/Vibe_DAG.py @@ -56,16 +56,14 @@ def convert_json_to_csv(**kwargs): ) # ✅ 모든 필드를 자동으로 따옴표 처리 # 헤더 추가 - writer.writerow(["rank", "title", "artist", - "lastPos", "isNew", "image", "genre", "date"]) + writer.writerow(["rank", "title", "artist", "lastPos", "isNew", "image", "genre", "date"]) # 데이터 추가 for entry in data["entries"]: - genres = json.dumps( - entry["genres"], ensure_ascii=False - ) # 리스트를 문자열로 변환 - # 이중 따옴표가 포함되면 한번만 보이도록 처리 - genres = genres.replace('""', '"') # 이중 따옴표를 하나로 바꿈 + # 리스트 그대로 저장 (genres는 리스트로 저장) + genres = entry["genres"] # 수정된 부분: 리스트 그대로 저장 + + # CSV에 추가 writer.writerow( [ entry["rank"], @@ -74,7 +72,7 @@ def convert_json_to_csv(**kwargs): entry["lastPos"], entry["isNew"], entry["image"], - genres, + genres, # 수정된 부분: 리스트 그대로 저장 TODAY, ] ) @@ -82,6 +80,7 @@ def 
convert_json_to_csv(**kwargs): return output.getvalue() + # 3. 로컬에 CSV 저장 (테스트용) def save_csv_locally(csv_string): with open(LOCAL_FILE_PATH, "w", encoding="utf-8") as f: diff --git a/airflow/dags/scripts/S3_Spark_SnowFlake_ELT.py b/airflow/dags/scripts/S3_Spark_SnowFlake_ELT.py index 36c3e82..a3e019d 100644 --- a/airflow/dags/scripts/S3_Spark_SnowFlake_ELT.py +++ b/airflow/dags/scripts/S3_Spark_SnowFlake_ELT.py @@ -4,7 +4,6 @@ import snowflake.connector from pyspark.sql import SparkSession from pyspark.sql.functions import col, count, lit, when -from pyspark.sql import functions as F from airflow.models import Variable @@ -103,16 +102,6 @@ def escape_quotes(value): return "NULL" return "'{}'".format(value.replace("'", "''")) -# `genre` 컬럼에서 추가 따옴표를 처리하는 함수 -def clean_genre(value): - if value: - # 장르 목록을 '[]' 또는 다른 구분자로 구분하고, 구분자 처리 - cleaned_value = value.replace('""', '"') # 따옴표 문제 해결 - cleaned_value = cleaned_value.replace('"[', '[').replace(']"', ']') - # 구분자가 있는 경우 이를 처리 - cleaned_value = cleaned_value.replace('|', ', ') # 구분자 변경(예: '|' -> ', ') - return cleaned_value - return value # Snowflake에서 SQL 실행 함수 def insert_data_into_snowflake(df, table_name): @@ -132,13 +121,13 @@ def insert_data_into_snowflake(df, table_name): rank = "NULL" if row["rank"] is None else row["rank"] title = escape_quotes(row["title"]) if row["title"] is not None else "NULL" artist = escape_quotes(row["artist"]) if row["artist"] is not None else "NULL" - genre = escape_quotes(clean_genre(row["genre"])) if row["genre"] is not None else "NULL" # 🎵 genre 처리 + genre = escape_quotes(row["genre"]) if row["genre"] is not None else "NULL" # 🎵 genre 추가 lastPos = "NULL" if row["lastPos"] is None else row["lastPos"] image = escape_quotes(row["image"]) if row["image"] is not None else "NULL" peakPos = "NULL" if row["peakPos"] is None else row["peakPos"] isNew = "NULL" if row["isNew"] is None else ("TRUE" if row["isNew"] else "FALSE") source = escape_quotes(row["source"]) if row["source"] is not 
None else "NULL" - date = f"'{row['date']}'" # date 컬럼 처리 + date = f"'{row['date']}'" # date 컬럼 추가 query = f""" INSERT INTO {table_name} (rank, title, artist, genre, lastPos, image, peakPos, isNew, source, date) @@ -152,6 +141,7 @@ def insert_data_into_snowflake(df, table_name): print("✅ Data inserted into Snowflake successfully.") except Exception as e: + print(query) print(f"⚠️ Error inserting data into Snowflake: {e}") @@ -159,7 +149,7 @@ def insert_data_into_snowflake(df, table_name): spark = spark_session_builder("S3_to_Snowflake") # 오늘 날짜 기반 S3 데이터 경로 생성 -TODAY = datetime.now().strftime("%Y%m%d") +TODAY = datetime.now().strftime("%Y-%m-%d") S3_BUCKET = "s3a://de5-s4tify" chart_sources = { "bugs": f"{S3_BUCKET}/raw_data/bugs_chart_data/bugs_chart_{TODAY}.csv", @@ -216,22 +206,16 @@ def read_chart_data(source, path): col("date"), # date 컬럼 추가 ) - # 최종 데이터 프레임에서 genre 컬럼 정리 - final_df_cleaned = final_df.withColumn( - "genre", F.when(F.col("genre").isNotNull(), clean_genre(F.col("genre"))).otherwise(F.lit(None)) - ) - final_df = final_df.withColumnRenamed("date_time", "date") - - final_df_cleaned.show(40) + final_df.show(40) # 데이터 확인 - final_df_cleaned.groupBy("source").agg(count("*").alias("count")).show() + final_df.groupBy("source").agg(count("*").alias("count")).show() # Snowflake에서 테이블 존재 여부 확인 및 생성 check_and_create_table() # Snowflake에 데이터 적재 - insert_data_into_snowflake(final_df_cleaned, "music_charts") + insert_data_into_snowflake(final_df, "music_charts") else: print("❌ 저장할 차트 데이터가 없습니다.") From 5876cd605c24278ed0a8c5e0643a0cc215f199a4 Mon Sep 17 00:00:00 2001 From: gland78 Date: Fri, 14 Mar 2025 16:30:00 +0900 Subject: [PATCH 3/4] fix env --- airflow/dags/S3_Spark_SnowFlake_DAG.py | 6 +++--- airflow/dags/scripts/S3_Spark_SnowFlake_ELT.py | 16 ++++++++-------- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/airflow/dags/S3_Spark_SnowFlake_DAG.py b/airflow/dags/S3_Spark_SnowFlake_DAG.py index 11fc5fa..27436f0 100644 --- 
a/airflow/dags/S3_Spark_SnowFlake_DAG.py +++ b/airflow/dags/S3_Spark_SnowFlake_DAG.py @@ -6,9 +6,9 @@ from airflow.providers.apache.spark.operators.spark_submit import \ SparkSubmitOperator -# Airflow Variables에서 AWS 자격 증명 불러오기 (없으면 예외 발생 방지) -AWS_ACCESS_KEY = Variable.get("AWS_ACCESS_KEY", default_var=None) -AWS_SECRET_KEY = Variable.get("AWS_SECRET_KEY", default_var=None) +# Load AWS credentials from environment variables; os.getenv returns None when unset (checked below). NOTE(review): confirm `os` is imported in this file. +AWS_ACCESS_KEY = os.getenv("AWS_ACCESS_KEY") +AWS_SECRET_KEY = os.getenv("AWS_SECRET_KEY") if not AWS_ACCESS_KEY or not AWS_SECRET_KEY: raise ValueError("AWS_ACCESS_KEY 또는 AWS_SECRET_KEY가 설정되지 않았습니다.") diff --git a/airflow/dags/scripts/S3_Spark_SnowFlake_ELT.py b/airflow/dags/scripts/S3_Spark_SnowFlake_ELT.py index a3e019d..4812391 100644 --- a/airflow/dags/scripts/S3_Spark_SnowFlake_ELT.py +++ b/airflow/dags/scripts/S3_Spark_SnowFlake_ELT.py @@ -19,15 +19,15 @@ # Snowflake 연결 정보 설정 SNOWFLAKE_OPTIONS = { - "user": Variable.get("SNOWFLAKE_USER"), - "password": Variable.get("SNOWFLAKE_PASSWORD"), - "account": Variable.get("SNOWFLAKE_ACCOUNT"), - "db": Variable.get("SNOWFLAKE_DB", "S4TIFY"), - "warehouse": Variable.get("SNOWFLAKE_WH", "COMPUTE_WH"), + "user": os.getenv("SNOWFLAKE_USER"), + "password": os.getenv("SNOWFLAKE_PASSWORD"), + "account": os.getenv("SNOWFLAKE_ACCOUNT"), + "db": os.getenv("SNOWFLAKE_DB", "S4TIFY"), + "warehouse": os.getenv("SNOWFLAKE_WH", "COMPUTE_WH"), "schema": "RAW_DATA", "role": "ACCOUNTADMIN", "driver": "net.snowflake.client.jdbc.SnowflakeDriver", - "url": f'jdbc:snowflake://{Variable.get("SNOWFLAKE_ACCOUNT")}.snowflakecomputing.com', + "url": f'jdbc:snowflake://{os.getenv("SNOWFLAKE_ACCOUNT")}.snowflakecomputing.com', } # Spark Session 생성 함수 @@ -36,8 +36,8 @@ def spark_session_builder(app_name: str) -> SparkSession: SparkSession.builder.appName(app_name) .config("spark.jars", SPARK_JARS) .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") -
.config("spark.hadoop.fs.s3a.access.key", Variable.get("AWS_ACCESS_KEY")) - .config("spark.hadoop.fs.s3a.secret.key", Variable.get("AWS_SECRET_KEY")) + .config("spark.hadoop.fs.s3a.access.key", os.getenv("AWS_ACCESS_KEY")) + .config("spark.hadoop.fs.s3a.secret.key", os.getenv("AWS_SECRET_KEY")) .config("spark.hadoop.fs.s3a.endpoint", "s3.amazonaws.com") .config("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider") .getOrCreate() From b9ec7176d0062270565026f8b4cf650a93fae104 Mon Sep 17 00:00:00 2001 From: GitHub Action Date: Fri, 14 Mar 2025 07:31:09 +0000 Subject: [PATCH 4/4] Automated format fixes --- airflow/dags/Bugs_DAG.py | 4 +- airflow/dags/Flo_DAG.py | 4 +- airflow/dags/Genie_DAG.py | 3 +- airflow/dags/Melon_DAG.py | 3 +- airflow/dags/Vibe_DAG.py | 4 +- .../dags/scripts/S3_Spark_SnowFlake_ELT.py | 50 +++++++++++++------ 6 files changed, 45 insertions(+), 23 deletions(-) diff --git a/airflow/dags/Bugs_DAG.py b/airflow/dags/Bugs_DAG.py index 14a51a5..3e9dc9e 100644 --- a/airflow/dags/Bugs_DAG.py +++ b/airflow/dags/Bugs_DAG.py @@ -42,7 +42,7 @@ def fetch_bugs_chart(): "title": entry.title, "artist": entry.artist, "lastPos": entry.lastPos, - "peakPos": entry.peakPos, + "peakPos": entry.peakPos, "image": entry.image, "genres": genre.split(", ") if genre else [], # ✅ 리스트 변환, } @@ -62,7 +62,7 @@ def convert_json_to_csv(**kwargs): # 헤더 추가 writer.writerow(["rank", "title", "artist", "lastPos", - "peakPos", "image", "genre", "date"]) + "peakPos", "image", "genre", "date"]) # 데이터 추가 for entry in data["entries"]: diff --git a/airflow/dags/Flo_DAG.py b/airflow/dags/Flo_DAG.py index db0c6eb..769513a 100644 --- a/airflow/dags/Flo_DAG.py +++ b/airflow/dags/Flo_DAG.py @@ -59,7 +59,8 @@ def convert_json_to_csv(**kwargs): ) # ✅ 모든 필드를 자동으로 따옴표 처리 # 헤더 추가 - writer.writerow(["rank", "title", "artist", "lastPos", "isNew", "image", "genre", "date"]) + writer.writerow(["rank", "title", "artist", "lastPos", + "isNew", "image", 
"genre", "date"]) # 데이터 추가 for entry in data["entries"]: @@ -83,7 +84,6 @@ def convert_json_to_csv(**kwargs): return output.getvalue() - # 3. 로컬에 CSV 저장 (테스트용, 삭제 용이하도록 별도 함수) def save_csv_locally(csv_string): with open(LOCAL_FILE_PATH, "w", encoding="utf-8") as f: diff --git a/airflow/dags/Genie_DAG.py b/airflow/dags/Genie_DAG.py index 20cb413..ae92f55 100644 --- a/airflow/dags/Genie_DAG.py +++ b/airflow/dags/Genie_DAG.py @@ -60,7 +60,7 @@ def convert_json_to_csv(**kwargs): # 헤더 추가 writer.writerow(["rank", "title", "artist", "lastPos", - "peakPos", "image", "genre", "date"]) + "peakPos", "image", "genre", "date"]) # 데이터 추가 for entry in data["entries"]: @@ -83,6 +83,7 @@ def convert_json_to_csv(**kwargs): return output.getvalue() + # 3. 로컬에 CSV 저장 (테스트용, 삭제 용이하도록 별도 함수) def save_csv_locally(csv_string): with open(LOCAL_FILE_PATH, "w", encoding="utf-8") as f: diff --git a/airflow/dags/Melon_DAG.py b/airflow/dags/Melon_DAG.py index 1561e32..c7e4fce 100644 --- a/airflow/dags/Melon_DAG.py +++ b/airflow/dags/Melon_DAG.py @@ -58,7 +58,8 @@ def convert_json_to_csv(**kwargs): ) # ✅ 모든 필드를 자동으로 따옴표 처리 # 헤더 추가 - writer.writerow(["rank", "title", "artist", "lastPos", "isNew", "image", "genre", "date"]) + writer.writerow(["rank", "title", "artist", "lastPos", + "isNew", "image", "genre", "date"]) # 데이터 추가 for entry in data["entries"]: diff --git a/airflow/dags/Vibe_DAG.py b/airflow/dags/Vibe_DAG.py index fba589c..de30610 100644 --- a/airflow/dags/Vibe_DAG.py +++ b/airflow/dags/Vibe_DAG.py @@ -56,7 +56,8 @@ def convert_json_to_csv(**kwargs): ) # ✅ 모든 필드를 자동으로 따옴표 처리 # 헤더 추가 - writer.writerow(["rank", "title", "artist", "lastPos", "isNew", "image", "genre", "date"]) + writer.writerow(["rank", "title", "artist", "lastPos", + "isNew", "image", "genre", "date"]) # 데이터 추가 for entry in data["entries"]: @@ -80,7 +81,6 @@ def convert_json_to_csv(**kwargs): return output.getvalue() - # 3. 
로컬에 CSV 저장 (테스트용) def save_csv_locally(csv_string): with open(LOCAL_FILE_PATH, "w", encoding="utf-8") as f: diff --git a/airflow/dags/scripts/S3_Spark_SnowFlake_ELT.py b/airflow/dags/scripts/S3_Spark_SnowFlake_ELT.py index 4812391..b2ca4a0 100644 --- a/airflow/dags/scripts/S3_Spark_SnowFlake_ELT.py +++ b/airflow/dags/scripts/S3_Spark_SnowFlake_ELT.py @@ -30,18 +30,25 @@ "url": f'jdbc:snowflake://{os.getenv("SNOWFLAKE_ACCOUNT")}.snowflakecomputing.com', } + # Spark Session 생성 함수 def spark_session_builder(app_name: str) -> SparkSession: return ( - SparkSession.builder.appName(app_name) - .config("spark.jars", SPARK_JARS) - .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") - .config("spark.hadoop.fs.s3a.access.key", os.getenv("AWS_ACCESS_KEY")) - .config("spark.hadoop.fs.s3a.secret.key", os.getenv("AWS_SECRET_KEY")) - .config("spark.hadoop.fs.s3a.endpoint", "s3.amazonaws.com") - .config("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider") - .getOrCreate() - ) + SparkSession.builder.appName(app_name) .config( + "spark.jars", + SPARK_JARS) .config( + "spark.hadoop.fs.s3a.impl", + "org.apache.hadoop.fs.s3a.S3AFileSystem") .config( + "spark.hadoop.fs.s3a.access.key", + os.getenv("AWS_ACCESS_KEY")) .config( + "spark.hadoop.fs.s3a.secret.key", + os.getenv("AWS_SECRET_KEY")) .config( + "spark.hadoop.fs.s3a.endpoint", + "s3.amazonaws.com") .config( + "spark.hadoop.fs.s3a.aws.credentials.provider", + "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider", + ) .getOrCreate()) + # Snowflake에서 SQL 실행 함수 def check_and_create_table(): @@ -119,14 +126,26 @@ def insert_data_into_snowflake(df, table_name): for row in df.collect(): rank = "NULL" if row["rank"] is None else row["rank"] - title = escape_quotes(row["title"]) if row["title"] is not None else "NULL" - artist = escape_quotes(row["artist"]) if row["artist"] is not None else "NULL" - genre = escape_quotes(row["genre"]) if row["genre"] is not None 
else "NULL" # 🎵 genre 추가 + title = escape_quotes( + row["title"]) if row["title"] is not None else "NULL" + artist = ( + escape_quotes( + row["artist"]) if row["artist"] is not None else "NULL") + genre = ( + escape_quotes( + row["genre"]) if row["genre"] is not None else "NULL") # 🎵 genre 추가 lastPos = "NULL" if row["lastPos"] is None else row["lastPos"] - image = escape_quotes(row["image"]) if row["image"] is not None else "NULL" + image = escape_quotes( + row["image"]) if row["image"] is not None else "NULL" peakPos = "NULL" if row["peakPos"] is None else row["peakPos"] - isNew = "NULL" if row["isNew"] is None else ("TRUE" if row["isNew"] else "FALSE") - source = escape_quotes(row["source"]) if row["source"] is not None else "NULL" + isNew = ( + "NULL" + if row["isNew"] is None + else ("TRUE" if row["isNew"] else "FALSE") + ) + source = ( + escape_quotes( + row["source"]) if row["source"] is not None else "NULL") date = f"'{row['date']}'" # date 컬럼 추가 query = f""" @@ -159,6 +178,7 @@ def insert_data_into_snowflake(df, table_name): "vibe": f"{S3_BUCKET}/raw_data/vibe_chart_data/vibe_chart_{TODAY}.csv", } + def read_chart_data(source, path): try: df = (