From 3fa0eed4049b37c429f560a43193fd71957188de Mon Sep 17 00:00:00 2001 From: gland78 Date: Thu, 13 Mar 2025 15:49:23 +0900 Subject: [PATCH 1/2] fix_data_format --- airflow/dags/Bugs_DAG.py | 44 ++++++----- airflow/dags/Flo_DAG.py | 79 ++++++++++--------- airflow/dags/Genie_DAG.py | 72 +++++++++-------- airflow/dags/Melon_DAG.py | 51 +++++++----- airflow/dags/Vibe_DAG.py | 45 ++++++----- .../dags/scripts/S3_Spark_SnowFlake_ELT.py | 79 +++++++------------ 6 files changed, 196 insertions(+), 174 deletions(-) diff --git a/airflow/dags/Bugs_DAG.py b/airflow/dags/Bugs_DAG.py index a597e33..cb6a8dc 100644 --- a/airflow/dags/Bugs_DAG.py +++ b/airflow/dags/Bugs_DAG.py @@ -1,6 +1,7 @@ import csv import json from datetime import datetime, timedelta +import io import requests from plugins.bugs import BugsChartPeriod, BugsChartType, ChartData @@ -53,24 +54,29 @@ def fetch_bugs_chart(): def convert_json_to_csv(**kwargs): ti = kwargs["ti"] data = ti.xcom_pull(task_ids="fetch_bugs_chart") - csv_data = [["rank", "title", "artist", - "lastPos", "peakPos", "image", "genre"]] + + output = io.StringIO() + writer = csv.writer(output, quoting=csv.QUOTE_ALL) # ✅ 모든 필드를 자동으로 따옴표 처리 + + # 헤더 추가 + writer.writerow(["rank", "title", "artist", "lastPos", "peakPos", "image", "genre"]) + + # 데이터 추가 for entry in data["entries"]: - csv_data.append( - [ - entry["rank"], - entry["title"], - entry["artist"], - entry["lastPos"], - entry["peakPos"], - entry["image"], - json.dumps( - entry["genres"], ensure_ascii=False - ), # ✅ 리스트를 문자열로 변환하여 저장 - ] - ) - csv_string = "\n".join(",".join(map(str, row)) for row in csv_data) - return csv_string + genres = json.dumps(entry["genres"], ensure_ascii=False) # 리스트를 문자열로 변환 + # 이중 따옴표가 포함되면 한번만 보이도록 처리 + genres = genres.replace('""', '"') # 이중 따옴표를 하나로 바꿈 + writer.writerow([ + entry["rank"], + entry["title"], + entry["artist"], + entry["lastPos"], + entry["peakPos"], + entry["image"], + genres, + ]) + + return output.getvalue() # 3. 로컬에 CSV 저장 (테스트용, 삭제 용이하도록 별도 함수) @@ -83,7 +89,7 @@ def save_csv_locally(csv_string): def upload_to_s3(**kwargs): ti = kwargs["ti"] csv_string = ti.xcom_pull(task_ids="convert_json_to_csv") - # save_csv_locally(csv_string) # 테스트용 로컬 저장 + save_csv_locally(csv_string) # 테스트용 로컬 저장 s3_hook = S3Hook(aws_conn_id="S4tify_S3") s3_hook.load_string( csv_string, @@ -106,7 +112,7 @@ def upload_to_s3(**kwargs): "bugs_chart_dag", default_args=default_args, schedule_interval="10 0 * * *", # 매일 00:10 실행 - catchup=False, + catchup=True, ) as dag: get_spotify_token_task = PythonOperator( diff --git a/airflow/dags/Flo_DAG.py b/airflow/dags/Flo_DAG.py index d72a328..19a9422 100644 --- a/airflow/dags/Flo_DAG.py +++ b/airflow/dags/Flo_DAG.py @@ -1,5 +1,6 @@ import csv import json +import io from datetime import datetime, timedelta import requests @@ -24,59 +25,65 @@ # 1. FLO 차트 데이터 가져오기 및 JSON 변환 def fetch_flo_chart(): chart = ChartData(fetch=True) - chart_data = {"date": chart.date.strftime( - "%Y-%m-%d %H:%M:%S"), "entries": []} + chart_data = {"date": chart.date.strftime("%Y-%m-%d %H:%M:%S"), "entries": []} + for entry in chart.entries: print(f"📊 차트 데이터 처리: {entry.rank}. {entry.title} - {entry.artist}") artist_id = search_artist_id(entry.artist) genre = get_artist_genre(artist_id) - chart_data["entries"].append( - { - "rank": entry.rank, - "title": entry.title, - "artist": entry.artist, - "lastPos": entry.lastPos, - "isNew": entry.isNew, - "image": entry.image, - "genres": genre.split(", ") if genre else [], - } - ) + + chart_data["entries"].append({ + "rank": entry.rank, + "title": entry.title, + "artist": entry.artist, + "lastPos": entry.lastPos, + "isNew": entry.isNew, + "image": entry.image, + "genres": genre.split(", ") if genre else [], + }) + return chart_data -# 2. JSON → CSV 변환 +# 2. JSON → CSV 변환 (쉼표 포함된 데이터도 깨지지 않도록 수정) def convert_json_to_csv(**kwargs): ti = kwargs["ti"] data = ti.xcom_pull(task_ids="fetch_flo_chart") - csv_data = [["rank", "title", "artist", - "lastPos", "isNew", "image", "genre"]] + + output = io.StringIO() + writer = csv.writer(output, quoting=csv.QUOTE_ALL) # ✅ 모든 필드를 자동으로 따옴표 처리 + + # 헤더 추가 + writer.writerow(["rank", "title", "artist", "lastPos", "isNew", "image", "genre"]) + + # 데이터 추가 for entry in data["entries"]: - csv_data.append( - [ - entry["rank"], - entry["title"], - entry["artist"], - entry["lastPos"], - entry["isNew"], - entry["image"], - json.dumps(entry["genres"], ensure_ascii=False), - ] - ) - csv_string = "\n".join(",".join(map(str, row)) for row in csv_data) - return csv_string - - -# 3. 로컬에 CSV 저장 (테스트용) + genres = json.dumps(entry["genres"], ensure_ascii=False) # 리스트를 문자열로 변환 + # 이중 따옴표가 포함되면 한번만 보이도록 처리 + genres = genres.replace('""', '"') # 이중 따옴표를 하나로 바꿈 + writer.writerow([ + entry["rank"], + entry["title"], + entry["artist"], + entry["lastPos"], + entry["isNew"], + entry["image"], + genres, + ]) + + return output.getvalue() + +# 3. 로컬에 CSV 저장 (테스트용, 삭제 용이하도록 별도 함수) def save_csv_locally(csv_string): with open(LOCAL_FILE_PATH, "w", encoding="utf-8") as f: f.write(csv_string) - -# 4. AWS S3 업로드 +# 3. AWS S3 업로드 def upload_to_s3(**kwargs): ti = kwargs["ti"] csv_string = ti.xcom_pull(task_ids="convert_json_to_csv") - # save_csv_locally(csv_string) # 테스트용 로컬 저장 + save_csv_locally(csv_string) # 테스트용 로컬 저장 + s3_hook = S3Hook(aws_conn_id="S4tify_S3") s3_hook.load_string( csv_string, @@ -99,7 +106,7 @@ def upload_to_s3(**kwargs): "flo_chart_dag", default_args=default_args, schedule_interval="20 0 * * *", # 매일 00:20 실행 - catchup=False, + catchup=True, ) as dag: get_spotify_token_task = PythonOperator( diff --git a/airflow/dags/Genie_DAG.py b/airflow/dags/Genie_DAG.py index 0ad279c..2844fdd 100644 --- a/airflow/dags/Genie_DAG.py +++ b/airflow/dags/Genie_DAG.py @@ -1,5 +1,6 @@ import csv import json +import io from datetime import datetime, timedelta import requests @@ -24,59 +25,68 @@ # 1. Genie 차트 데이터 가져오기 및 JSON 변환 def fetch_genie_chart(): chart = ChartData(chartPeriod=GenieChartPeriod.Realtime, fetch=True) - chart_data = {"date": chart.date.strftime( - "%Y-%m-%d %H:%M:%S"), "entries": []} + chart_data = {"date": chart.date.strftime("%Y-%m-%d %H:%M:%S"), "entries": []} + for entry in chart.entries: print(f"📊 차트 데이터 처리: {entry.rank}. {entry.title} - {entry.artist}") artist_id = search_artist_id(entry.artist) genre = get_artist_genre(artist_id) + chart_data["entries"].append( { - "rank": entry.rank, - "title": entry.title, - "artist": entry.artist, - "peakPos": entry.peakPos, - "lastPos": entry.lastPos, - "image": entry.image, - "genres": genre.split(", ") if genre else [], - } + "rank": entry.rank, + "title": entry.title, + "artist": entry.artist, + "peakPos": entry.peakPos, + "lastPos": entry.lastPos, + "image": entry.image, + "genres": genre.split(", ") if genre else [], + } ) + return chart_data -# 2. JSON → CSV 변환 +# 2. JSON → CSV 변환 (쉼표 포함된 데이터도 깨지지 않도록 수정) def convert_json_to_csv(**kwargs): ti = kwargs["ti"] data = ti.xcom_pull(task_ids="fetch_genie_chart") - csv_data = [["rank", "title", "artist", - "peakPos", "lastPos", "image", "genre"]] - for entry in data["entries"]: - csv_data.append( - [ - entry["rank"], - entry["title"], - entry["artist"], - entry["peakPos"], - entry["lastPos"], - entry["image"], - json.dumps(entry["genres"], ensure_ascii=False), - ] - ) - csv_string = "\n".join(",".join(map(str, row)) for row in csv_data) - return csv_string + output = io.StringIO() + writer = csv.writer(output, quoting=csv.QUOTE_ALL) # ✅ 모든 필드를 자동으로 따옴표 처리 + + # 헤더 추가 + writer.writerow(["rank", "title", "artist", "peakPos", "lastPos", "image", "genre"]) -# 3. 로컬에 CSV 저장 (테스트용) + # 데이터 추가 + for entry in data["entries"]: + genres = json.dumps(entry["genres"], ensure_ascii=False) # 리스트를 문자열로 변환 + # 이중 따옴표가 포함되면 한번만 보이도록 처리 + genres = genres.replace('""', '"') # 이중 따옴표를 하나로 바꿈 + writer.writerow([ + entry["rank"], + entry["title"], + entry["artist"], + entry["peakPos"], + entry["lastPos"], + entry["image"], + genres, + ]) + + return output.getvalue() + +# 3. 로컬에 CSV 저장 (테스트용, 삭제 용이하도록 별도 함수) def save_csv_locally(csv_string): with open(LOCAL_FILE_PATH, "w", encoding="utf-8") as f: f.write(csv_string) -# 4. AWS S3 업로드 +# 3. AWS S3 업로드 def upload_to_s3(**kwargs): ti = kwargs["ti"] csv_string = ti.xcom_pull(task_ids="convert_json_to_csv") - # save_csv_locally(csv_string) # 테스트용 로컬 저장 + save_csv_locally(csv_string) # 테스트용 로컬 저장 + s3_hook = S3Hook(aws_conn_id="S4tify_S3") s3_hook.load_string( csv_string, @@ -99,7 +109,7 @@ def upload_to_s3(**kwargs): "genie_chart_dag", default_args=default_args, schedule_interval="30 0 * * *", # 매일 00:30 실행 - catchup=False, + catchup=True, ) as dag: get_spotify_token_task = PythonOperator( diff --git a/airflow/dags/Melon_DAG.py b/airflow/dags/Melon_DAG.py index 6622984..2d924d8 100644 --- a/airflow/dags/Melon_DAG.py +++ b/airflow/dags/Melon_DAG.py @@ -1,5 +1,6 @@ import csv import json +import io from datetime import datetime, timedelta import requests @@ -24,12 +25,13 @@ # 1. 멜론 차트 데이터 가져오기 def fetch_melon_chart(): chart = ChartData(fetch=True) - chart_data = {"date": chart.date.strftime( - "%Y-%m-%d %H:%M:%S"), "entries": []} + chart_data = {"date": chart.date.strftime("%Y-%m-%d %H:%M:%S"), "entries": []} + for entry in chart.entries: print(f"📊 차트 데이터 처리: {entry.rank}. {entry.title} - {entry.artist}") artist_id = search_artist_id(entry.artist) genre = get_artist_genre(artist_id) + chart_data["entries"].append( { "rank": entry.rank, @@ -48,25 +50,33 @@ def fetch_melon_chart(): def convert_json_to_csv(**kwargs): ti = kwargs["ti"] data = ti.xcom_pull(task_ids="fetch_melon_chart") - csv_data = [["rank", "title", "artist", - "lastPos", "isNew", "image", "genre"]] + + output = io.StringIO() + writer = csv.writer(output, quoting=csv.QUOTE_ALL) # ✅ 모든 필드를 자동으로 따옴표 처리 + + # 헤더 추가 + writer.writerow(["rank", "title", "artist", "lastPos", "isNew", "image", "genre"]) + + # 데이터 추가 for entry in data["entries"]: - csv_data.append( - [ - entry["rank"], - entry["title"], - entry["artist"], - entry["lastPos"], - entry["isNew"], - entry["image"], - json.dumps(entry["genres"], ensure_ascii=False), - ] - ) - csv_string = "\n".join(",".join(map(str, row)) for row in csv_data) - return csv_string + genres = json.dumps(entry["genres"], ensure_ascii=False) # 리스트를 문자열로 변환 + # 이중 따옴표가 포함되면 한번만 보이도록 처리 + genres = genres.replace('""', '"') # 이중 따옴표를 하나로 바꿈 + + writer.writerow([ + entry["rank"], + entry["title"], + entry["artist"], + entry["lastPos"], + entry["isNew"], + entry["image"], + genres, + ]) + return output.getvalue() -# 3. 로컬에 CSV 저장 (테스트용) + +# 3. 로컬에 CSV 저장 (테스트용, 삭제 용이하도록 별도 함수) def save_csv_locally(csv_string): with open(LOCAL_FILE_PATH, "w", encoding="utf-8") as f: f.write(csv_string) @@ -76,7 +86,8 @@ def save_csv_locally(csv_string): def upload_to_s3(**kwargs): ti = kwargs["ti"] csv_string = ti.xcom_pull(task_ids="convert_json_to_csv") - # save_csv_locally(csv_string) # 테스트용 로컬 저장 + save_csv_locally(csv_string) # 테스트용 로컬 저장 + s3_hook = S3Hook(aws_conn_id="S4tify_S3") s3_hook.load_string( csv_string, @@ -99,7 +110,7 @@ def upload_to_s3(**kwargs): "melon_chart_dag", default_args=default_args, schedule_interval="0 1 * * *", # 매일 01:00 실행 - catchup=False, + catchup=True, ) as dag: get_spotify_token_task = PythonOperator( diff --git a/airflow/dags/Vibe_DAG.py b/airflow/dags/Vibe_DAG.py index 9b5958d..1be52ab 100644 --- a/airflow/dags/Vibe_DAG.py +++ b/airflow/dags/Vibe_DAG.py @@ -1,5 +1,6 @@ import csv import json +import io from datetime import datetime, timedelta import requests @@ -24,8 +25,7 @@ # 1. VIBE 차트 데이터 가져오기 및 JSON 변환 def fetch_vibe_chart(): chart = ChartData(fetch=True) - chart_data = {"date": chart.date.strftime( - "%Y-%m-%d %H:%M:%S"), "entries": []} + chart_data = {"date": chart.date.strftime("%Y-%m-%d %H:%M:%S"), "entries": []} for entry in chart.entries: print(f"📊 차트 데이터 처리: {entry.rank}. {entry.title} - {entry.artist}") artist_id = search_artist_id(entry.artist) @@ -48,22 +48,29 @@ def fetch_vibe_chart(): def convert_json_to_csv(**kwargs): ti = kwargs["ti"] data = ti.xcom_pull(task_ids="fetch_vibe_chart") - csv_data = [["rank", "title", "artist", - "lastPos", "isNew", "image", "genre"]] + + output = io.StringIO() + writer = csv.writer(output, quoting=csv.QUOTE_ALL) # ✅ 모든 필드를 자동으로 따옴표 처리 + + # 헤더 추가 + writer.writerow(["rank", "title", "artist", "lastPos", "isNew", "image", "genre"]) + + # 데이터 추가 for entry in data["entries"]: - csv_data.append( - [ - entry["rank"], - entry["title"], - entry["artist"], - entry["lastPos"], - entry["isNew"], - entry["image"], - json.dumps(entry["genres"], ensure_ascii=False), - ] - ) - csv_string = "\n".join(",".join(map(str, row)) for row in csv_data) - return csv_string + genres = json.dumps(entry["genres"], ensure_ascii=False) # 리스트를 문자열로 변환 + # 이중 따옴표가 포함되면 한번만 보이도록 처리 + genres = genres.replace('""', '"') # 이중 따옴표를 하나로 바꿈 + writer.writerow([ + entry["rank"], + entry["title"], + entry["artist"], + entry["lastPos"], + entry["isNew"], + entry["image"], + genres + ]) + + return output.getvalue() # 3. 로컬에 CSV 저장 (테스트용) @@ -76,7 +83,7 @@ def save_csv_locally(csv_string): def upload_to_s3(**kwargs): ti = kwargs["ti"] csv_string = ti.xcom_pull(task_ids="convert_json_to_csv") - # save_csv_locally(csv_string) # 테스트용 로컬 저장 + save_csv_locally(csv_string) # 테스트용 로컬 저장 s3_hook = S3Hook(aws_conn_id="S4tify_S3") s3_hook.load_string( csv_string, @@ -99,7 +106,7 @@ def upload_to_s3(**kwargs): "vibe_chart_dag", default_args=default_args, schedule_interval="45 0 * * *", # 매일 00:45 실행 - catchup=False, + catchup=True, ) as dag: get_spotify_token_task = PythonOperator( diff --git a/airflow/dags/scripts/S3_Spark_SnowFlake_ELT.py b/airflow/dags/scripts/S3_Spark_SnowFlake_ELT.py index e5a54ba..734a31d 100644 --- a/airflow/dags/scripts/S3_Spark_SnowFlake_ELT.py +++ b/airflow/dags/scripts/S3_Spark_SnowFlake_ELT.py @@ -24,11 +24,8 @@ "account": Variable.get("SNOWFLAKE_ACCOUNT"), "db": Variable.get("SNOWFLAKE_DB", "S4TIFY"), "warehouse": Variable.get("SNOWFLAKE_WH", "COMPUTE_WH"), - "schema": ( - Variable.get("SNOWFLAKE_SCHEMA") - if Variable.get("SNOWFLAKE_SCHEMA") - else "raw_data" - ), + #"schema": (Variable.get("SNOWFLAKE_SCHEMA")if Variable.get("SNOWFLAKE_SCHEMA")else "raw_data"), + "schema" : "RAW_DATA", "role": "ACCOUNTADMIN", "driver": "net.snowflake.client.jdbc.SnowflakeDriver", "url": f'jdbc:snowflake://{Variable.get("SNOWFLAKE_ACCOUNT")}.snowflakecomputing.com', @@ -70,22 +67,27 @@ def check_and_create_table(): cur = conn.cursor() # 테이블 존재 여부 확인 - cur.execute(f"SHOW TABLES LIKE 'music_charts'") + cur.execute(f""" + SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES + WHERE TABLE_SCHEMA = '{SNOWFLAKE_OPTIONS["schema"]}' + AND UPPER(TABLE_NAME) = 'MUSIC_CHARTS' + """) result = cur.fetchone() if result is None: - # 테이블이 존재하지 않으면 생성 - create_table_query = """ - CREATE OR REPLACE TABLE ADHOC.music_charts ( + # 테이블이 없으면 생성 + create_table_query = f""" + CREATE TABLE IF NOT EXISTS {SNOWFLAKE_OPTIONS['schema']}.music_charts ( rank INT, title STRING, artist STRING, + genre STRING, -- 🎵 genre 컬럼 추가 lastPos INT, image STRING, peakPos INT, isNew BOOLEAN, source STRING - ); + ) """ cur.execute(create_table_query) print("✅ music_charts 테이블 생성 완료.") @@ -123,35 +125,20 @@ def insert_data_into_snowflake(df, table_name): ) cur = conn.cursor() - # DataFrame을 순회하며 INSERT 쿼리 실행 for row in df.collect(): - # None 값을 NULL로 처리하고, 문자열 값은 작은따옴표로 감쌈 - rank = f"NULL" if row["rank"] is None else row["rank"] - title = escape_quotes( - f"NULL" if row["title"] is None else f"'{row['title']}'" - ) - artist = escape_quotes( - f"NULL" if row["artist"] is None else f"'{row['artist']}'" - ) - lastPos = f"NULL" if row["lastPos"] is None else row["lastPos"] - image = escape_quotes( - f"NULL" if row["image"] is None else f"'{row['image']}'" - ) - peakPos = f"NULL" if row["peakPos"] is None else row["peakPos"] - # isNew 값은 TRUE/FALSE로 처리하고 NULL은 그대로 처리 - isNew = ( - f"NULL" - if row["isNew"] is None - else ("True" if row["isNew"] else "FALSE") - ) - source = escape_quotes( - f"NULL" if row["source"] is None else f"'{row['source']}'" - ) + rank = "NULL" if row["rank"] is None else row["rank"] + title = escape_quotes(row["title"]) if row["title"] is not None else "NULL" + artist = escape_quotes(row["artist"]) if row["artist"] is not None else "NULL" + genre = escape_quotes(row["genre"]) if row["genre"] is not None else "NULL" # 🎵 genre 추가 + lastPos = "NULL" if row["lastPos"] is None else row["lastPos"] + image = escape_quotes(row["image"]) if row["image"] is not None else "NULL" + peakPos = "NULL" if row["peakPos"] is None else row["peakPos"] + isNew = "NULL" if row["isNew"] is None else ("TRUE" if row["isNew"] else "FALSE") + source = escape_quotes(row["source"]) if row["source"] is not None else "NULL" - # 삽입할 쿼리 (컬럼 이름은 큰따옴표 없이) query = f""" - INSERT INTO {table_name} (rank, title, artist, lastPos, image, peakPos, isNew, source) - VALUES ({rank}, {title}, {artist}, {lastPos}, {image}, {peakPos}, {isNew}, {source}) + INSERT INTO {table_name} (rank, title, artist, genre, lastPos, image, peakPos, isNew, source) + VALUES ({rank}, {title}, {artist}, {genre}, {lastPos}, {image}, {peakPos}, {isNew}, {source}) """ cur.execute(query) @@ -159,6 +146,7 @@ def insert_data_into_snowflake(df, table_name): cur.close() conn.close() print("✅ Data inserted into Snowflake successfully.") + except Exception as e: print(query) print(f"⚠️ Error inserting data into Snowflake: {e}") @@ -187,12 +175,12 @@ def read_chart_data(source, path): .option("inferSchema", True) .load(path) ) + df.printSchema() # ✅ 데이터 스키마 출력해서 `genre` 확인 return df.withColumn("source", lit(source)) except Exception as e: print(f"⚠️ {source} 데이터 로드 실패: {e}") return None - # 차트 데이터 읽기 및 병합 dfs = [read_chart_data(source, path) for source, path in chart_sources.items()] dfs = [df for df in dfs if df is not None] @@ -206,21 +194,14 @@ def read_chart_data(source, path): merged_df = merged_df.unionByName(df, allowMissingColumns=True) final_df = merged_df.select( - when( - col("rank").rlike("^[0-9]+$"), - col("rank").cast("int")).alias("rank"), + when(col("rank").rlike("^[0-9]+$"), col("rank").cast("int")).alias("rank"), col("title"), col("artist"), - when( - col("lastPos").rlike("^[0-9]+$"), - col("lastPos").cast("int")).alias("lastPos"), + col("genre"), # ✅ genre 컬럼 추가 + when(col("lastPos").rlike("^[0-9]+$"), col("lastPos").cast("int")).alias("lastPos"), col("image"), - when( - col("peakPos").rlike("^[0-9]+$"), - col("peakPos").cast("int")).alias("peakPos"), - when( - col("isNew").rlike("^(true|false)$"), - col("isNew").cast("boolean")).alias("isNew"), + when(col("peakPos").rlike("^[0-9]+$"), col("peakPos").cast("int")).alias("peakPos"), + when(col("isNew").rlike("^(true|false)$"), col("isNew").cast("boolean")).alias("isNew"), col("source"), ) From 0f91dd387b14f010ff6c1c3dd96d154ba4cf0e00 Mon Sep 17 00:00:00 2001 From: GitHub Action Date: Thu, 13 Mar 2025 06:50:03 +0000 Subject: [PATCH 2/2] Automated format fixes --- airflow/dags/Bugs_DAG.py | 33 ++++++----- airflow/dags/Flo_DAG.py | 58 +++++++++++-------- airflow/dags/Genie_DAG.py | 53 ++++++++++------- airflow/dags/Melon_DAG.py | 42 ++++++++------ airflow/dags/Vibe_DAG.py | 38 +++++++----- .../dags/scripts/S3_Spark_SnowFlake_ELT.py | 54 ++++++++++++----- airflow/dags/scripts/crawling_spotify_data.py | 14 +++-- airflow/dags/scripts/request_spotify_api.py | 10 ++-- airflow/dags/spotify_data_dag.py | 18 +++--- 9 files changed, 196 insertions(+), 124 deletions(-) diff --git a/airflow/dags/Bugs_DAG.py b/airflow/dags/Bugs_DAG.py index cb6a8dc..34fc3bd 100644 --- a/airflow/dags/Bugs_DAG.py +++ b/airflow/dags/Bugs_DAG.py @@ -1,7 +1,7 @@ import csv +import io import json from datetime import datetime, timedelta -import io import requests from plugins.bugs import BugsChartPeriod, BugsChartType, ChartData @@ -56,25 +56,32 @@ def convert_json_to_csv(**kwargs): data = ti.xcom_pull(task_ids="fetch_bugs_chart") output = io.StringIO() - writer = csv.writer(output, quoting=csv.QUOTE_ALL) # ✅ 모든 필드를 자동으로 따옴표 처리 + writer = csv.writer( + output, quoting=csv.QUOTE_ALL + ) # ✅ 모든 필드를 자동으로 따옴표 처리 # 헤더 추가 - writer.writerow(["rank", "title", "artist", "lastPos", "peakPos", "image", "genre"]) + writer.writerow(["rank", "title", "artist", "lastPos", + "peakPos", "image", "genre"]) # 데이터 추가 for entry in data["entries"]: - genres = json.dumps(entry["genres"], ensure_ascii=False) # 리스트를 문자열로 변환 + genres = json.dumps( + entry["genres"], ensure_ascii=False + ) # 리스트를 문자열로 변환 # 이중 따옴표가 포함되면 한번만 보이도록 처리 genres = genres.replace('""', '"') # 이중 따옴표를 하나로 바꿈 - writer.writerow([ - entry["rank"], - entry["title"], - entry["artist"], - entry["lastPos"], - entry["peakPos"], - entry["image"], - genres, - ]) + writer.writerow( + [ + entry["rank"], + entry["title"], + entry["artist"], + entry["lastPos"], + entry["peakPos"], + entry["image"], + genres, + ] + ) return output.getvalue() diff --git a/airflow/dags/Flo_DAG.py b/airflow/dags/Flo_DAG.py index 19a9422..5c596e4 100644 --- a/airflow/dags/Flo_DAG.py +++ b/airflow/dags/Flo_DAG.py @@ -1,6 +1,6 @@ import csv -import json import io +import json from datetime import datetime, timedelta import requests @@ -25,22 +25,25 @@ # 1. FLO 차트 데이터 가져오기 및 JSON 변환 def fetch_flo_chart(): chart = ChartData(fetch=True) - chart_data = {"date": chart.date.strftime("%Y-%m-%d %H:%M:%S"), "entries": []} + chart_data = {"date": chart.date.strftime( + "%Y-%m-%d %H:%M:%S"), "entries": []} for entry in chart.entries: print(f"📊 차트 데이터 처리: {entry.rank}. {entry.title} - {entry.artist}") artist_id = search_artist_id(entry.artist) genre = get_artist_genre(artist_id) - chart_data["entries"].append({ - "rank": entry.rank, - "title": entry.title, - "artist": entry.artist, - "lastPos": entry.lastPos, - "isNew": entry.isNew, - "image": entry.image, - "genres": genre.split(", ") if genre else [], - }) + chart_data["entries"].append( + { + "rank": entry.rank, + "title": entry.title, + "artist": entry.artist, + "lastPos": entry.lastPos, + "isNew": entry.isNew, + "image": entry.image, + "genres": genre.split(", ") if genre else [], + } + ) return chart_data @@ -51,33 +54,42 @@ def convert_json_to_csv(**kwargs): data = ti.xcom_pull(task_ids="fetch_flo_chart") output = io.StringIO() - writer = csv.writer(output, quoting=csv.QUOTE_ALL) # ✅ 모든 필드를 자동으로 따옴표 처리 + writer = csv.writer( + output, quoting=csv.QUOTE_ALL + ) # ✅ 모든 필드를 자동으로 따옴표 처리 # 헤더 추가 - writer.writerow(["rank", "title", "artist", "lastPos", "isNew", "image", "genre"]) + writer.writerow(["rank", "title", "artist", + "lastPos", "isNew", "image", "genre"]) # 데이터 추가 for entry in data["entries"]: - genres = json.dumps(entry["genres"], ensure_ascii=False) # 리스트를 문자열로 변환 + genres = json.dumps( + entry["genres"], ensure_ascii=False + ) # 리스트를 문자열로 변환 # 이중 따옴표가 포함되면 한번만 보이도록 처리 genres = genres.replace('""', '"') # 이중 따옴표를 하나로 바꿈 - writer.writerow([ - entry["rank"], - entry["title"], - entry["artist"], - entry["lastPos"], - entry["isNew"], - entry["image"], - genres, - ]) + writer.writerow( + [ + entry["rank"], + entry["title"], + entry["artist"], + entry["lastPos"], + entry["isNew"], + entry["image"], + genres, + ] + ) return output.getvalue() + # 3. 로컬에 CSV 저장 (테스트용, 삭제 용이하도록 별도 함수) def save_csv_locally(csv_string): with open(LOCAL_FILE_PATH, "w", encoding="utf-8") as f: f.write(csv_string) + # 3. AWS S3 업로드 def upload_to_s3(**kwargs): ti = kwargs["ti"] diff --git a/airflow/dags/Genie_DAG.py b/airflow/dags/Genie_DAG.py index 2844fdd..e0dd127 100644 --- a/airflow/dags/Genie_DAG.py +++ b/airflow/dags/Genie_DAG.py @@ -1,6 +1,6 @@ import csv -import json import io +import json from datetime import datetime, timedelta import requests @@ -25,7 +25,8 @@ # 1. Genie 차트 데이터 가져오기 및 JSON 변환 def fetch_genie_chart(): chart = ChartData(chartPeriod=GenieChartPeriod.Realtime, fetch=True) - chart_data = {"date": chart.date.strftime("%Y-%m-%d %H:%M:%S"), "entries": []} + chart_data = {"date": chart.date.strftime( + "%Y-%m-%d %H:%M:%S"), "entries": []} for entry in chart.entries: print(f"📊 차트 데이터 처리: {entry.rank}. {entry.title} - {entry.artist}") @@ -34,14 +35,14 @@ def fetch_genie_chart(): chart_data["entries"].append( { - "rank": entry.rank, - "title": entry.title, - "artist": entry.artist, - "peakPos": entry.peakPos, - "lastPos": entry.lastPos, - "image": entry.image, - "genres": genre.split(", ") if genre else [], - } + "rank": entry.rank, + "title": entry.title, + "artist": entry.artist, + "peakPos": entry.peakPos, + "lastPos": entry.lastPos, + "image": entry.image, + "genres": genre.split(", ") if genre else [], + } ) return chart_data @@ -53,28 +54,36 @@ def convert_json_to_csv(**kwargs): data = ti.xcom_pull(task_ids="fetch_genie_chart") output = io.StringIO() - writer = csv.writer(output, quoting=csv.QUOTE_ALL) # ✅ 모든 필드를 자동으로 따옴표 처리 + writer = csv.writer( + output, quoting=csv.QUOTE_ALL + ) # ✅ 모든 필드를 자동으로 따옴표 처리 # 헤더 추가 - writer.writerow(["rank", "title", "artist", "peakPos", "lastPos", "image", "genre"]) + writer.writerow(["rank", "title", "artist", "peakPos", + "lastPos", "image", "genre"]) # 데이터 추가 for entry in data["entries"]: - genres = json.dumps(entry["genres"], ensure_ascii=False) # 리스트를 문자열로 변환 + genres = json.dumps( + entry["genres"], ensure_ascii=False + ) # 리스트를 문자열로 변환 # 이중 따옴표가 포함되면 한번만 보이도록 처리 genres = genres.replace('""', '"') # 이중 따옴표를 하나로 바꿈 - writer.writerow([ - entry["rank"], - entry["title"], - entry["artist"], - entry["peakPos"], - entry["lastPos"], - entry["image"], - genres, - ]) + writer.writerow( + [ + entry["rank"], + entry["title"], + entry["artist"], + entry["peakPos"], + entry["lastPos"], + entry["image"], + genres, + ] + ) return output.getvalue() + # 3. 로컬에 CSV 저장 (테스트용, 삭제 용이하도록 별도 함수) def save_csv_locally(csv_string): with open(LOCAL_FILE_PATH, "w", encoding="utf-8") as f: diff --git a/airflow/dags/Melon_DAG.py b/airflow/dags/Melon_DAG.py index 2d924d8..8d89066 100644 --- a/airflow/dags/Melon_DAG.py +++ b/airflow/dags/Melon_DAG.py @@ -1,6 +1,6 @@ import csv -import json import io +import json from datetime import datetime, timedelta import requests @@ -25,8 +25,9 @@ # 1. 멜론 차트 데이터 가져오기 def fetch_melon_chart(): chart = ChartData(fetch=True) - chart_data = {"date": chart.date.strftime("%Y-%m-%d %H:%M:%S"), "entries": []} - + chart_data = {"date": chart.date.strftime( + "%Y-%m-%d %H:%M:%S"), "entries": []} + for entry in chart.entries: print(f"📊 차트 데이터 처리: {entry.rank}. {entry.title} - {entry.artist}") artist_id = search_artist_id(entry.artist) @@ -50,28 +51,35 @@ def fetch_melon_chart(): def convert_json_to_csv(**kwargs): ti = kwargs["ti"] data = ti.xcom_pull(task_ids="fetch_melon_chart") - + output = io.StringIO() - writer = csv.writer(output, quoting=csv.QUOTE_ALL) # ✅ 모든 필드를 자동으로 따옴표 처리 - + writer = csv.writer( + output, quoting=csv.QUOTE_ALL + ) # ✅ 모든 필드를 자동으로 따옴표 처리 + # 헤더 추가 - writer.writerow(["rank", "title", "artist", "lastPos", "isNew", "image", "genre"]) + writer.writerow(["rank", "title", "artist", + "lastPos", "isNew", "image", "genre"]) # 데이터 추가 for entry in data["entries"]: - genres = json.dumps(entry["genres"], ensure_ascii=False) # 리스트를 문자열로 변환 + genres = json.dumps( + entry["genres"], ensure_ascii=False + ) # 리스트를 문자열로 변환 # 이중 따옴표가 포함되면 한번만 보이도록 처리 genres = genres.replace('""', '"') # 이중 따옴표를 하나로 바꿈 - writer.writerow([ - entry["rank"], - entry["title"], - entry["artist"], - entry["lastPos"], - entry["isNew"], - entry["image"], - genres, - ]) + writer.writerow( + [ + entry["rank"], + entry["title"], + entry["artist"], + entry["lastPos"], + entry["isNew"], + entry["image"], + genres, + ] + ) return output.getvalue() diff --git a/airflow/dags/Vibe_DAG.py b/airflow/dags/Vibe_DAG.py index 1be52ab..612a477 100644 --- a/airflow/dags/Vibe_DAG.py +++ b/airflow/dags/Vibe_DAG.py @@ -1,6 +1,6 @@ import csv -import json import io +import json from datetime import datetime, timedelta import requests @@ -25,7 +25,8 @@ # 1. VIBE 차트 데이터 가져오기 및 JSON 변환 def fetch_vibe_chart(): chart = ChartData(fetch=True) - chart_data = {"date": chart.date.strftime("%Y-%m-%d %H:%M:%S"), "entries": []} + chart_data = {"date": chart.date.strftime( + "%Y-%m-%d %H:%M:%S"), "entries": []} for entry in chart.entries: print(f"📊 차트 데이터 처리: {entry.rank}. {entry.title} - {entry.artist}") artist_id = search_artist_id(entry.artist) @@ -50,25 +51,32 @@ def convert_json_to_csv(**kwargs): data = ti.xcom_pull(task_ids="fetch_vibe_chart") output = io.StringIO() - writer = csv.writer(output, quoting=csv.QUOTE_ALL) # ✅ 모든 필드를 자동으로 따옴표 처리 - + writer = csv.writer( + output, quoting=csv.QUOTE_ALL + ) # ✅ 모든 필드를 자동으로 따옴표 처리 + # 헤더 추가 - writer.writerow(["rank", "title", "artist", "lastPos", "isNew", "image", "genre"]) + writer.writerow(["rank", "title", "artist", + "lastPos", "isNew", "image", "genre"]) # 데이터 추가 for entry in data["entries"]: - genres = json.dumps(entry["genres"], ensure_ascii=False) # 리스트를 문자열로 변환 + genres = json.dumps( + entry["genres"], ensure_ascii=False + ) # 리스트를 문자열로 변환 # 이중 따옴표가 포함되면 한번만 보이도록 처리 genres = genres.replace('""', '"') # 이중 따옴표를 하나로 바꿈 - writer.writerow([ - entry["rank"], - entry["title"], - entry["artist"], - entry["lastPos"], - entry["isNew"], - entry["image"], - genres - ]) + writer.writerow( + [ + entry["rank"], + entry["title"], + entry["artist"], + entry["lastPos"], + entry["isNew"], + entry["image"], + genres, + ] + ) return output.getvalue() diff --git a/airflow/dags/scripts/S3_Spark_SnowFlake_ELT.py b/airflow/dags/scripts/S3_Spark_SnowFlake_ELT.py index 734a31d..eca84fb 100644 --- a/airflow/dags/scripts/S3_Spark_SnowFlake_ELT.py +++ b/airflow/dags/scripts/S3_Spark_SnowFlake_ELT.py @@ -24,8 +24,8 @@ "account": Variable.get("SNOWFLAKE_ACCOUNT"), "db": Variable.get("SNOWFLAKE_DB", "S4TIFY"), "warehouse": Variable.get("SNOWFLAKE_WH", "COMPUTE_WH"), - #"schema": (Variable.get("SNOWFLAKE_SCHEMA")if Variable.get("SNOWFLAKE_SCHEMA")else "raw_data"), - "schema" : "RAW_DATA", + # "schema": (Variable.get("SNOWFLAKE_SCHEMA")if Variable.get("SNOWFLAKE_SCHEMA")else "raw_data"), + "schema": "RAW_DATA", "role": "ACCOUNTADMIN", "driver": "net.snowflake.client.jdbc.SnowflakeDriver", "url": f'jdbc:snowflake://{Variable.get("SNOWFLAKE_ACCOUNT")}.snowflakecomputing.com', @@ -67,11 +67,13 @@ def check_and_create_table(): cur = conn.cursor() # 테이블 존재 여부 확인 - cur.execute(f""" - SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES - WHERE TABLE_SCHEMA = '{SNOWFLAKE_OPTIONS["schema"]}' + cur.execute( + f""" + SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES + WHERE TABLE_SCHEMA = '{SNOWFLAKE_OPTIONS["schema"]}' AND UPPER(TABLE_NAME) = 'MUSIC_CHARTS' - """) + """ + ) result = cur.fetchone() if result is None: @@ -127,14 +129,26 @@ def insert_data_into_snowflake(df, table_name): for row in df.collect(): rank = "NULL" if row["rank"] is None else row["rank"] - title = escape_quotes(row["title"]) if row["title"] is not None else "NULL" - artist = escape_quotes(row["artist"]) if row["artist"] is not None else "NULL" - genre = escape_quotes(row["genre"]) if row["genre"] is not None else "NULL" # 🎵 genre 추가 + title = escape_quotes( + row["title"]) if row["title"] is not None else "NULL" + artist = ( + escape_quotes( + row["artist"]) if row["artist"] is not None else "NULL") + genre = ( + escape_quotes( + row["genre"]) if row["genre"] is not None else "NULL") # 🎵 genre 추가 lastPos = "NULL" if row["lastPos"] is None else row["lastPos"] - image = escape_quotes(row["image"]) if row["image"] is not None else "NULL" + image = escape_quotes( + row["image"]) if row["image"] is not None else "NULL" peakPos = "NULL" if row["peakPos"] is None else row["peakPos"] - isNew = "NULL" if row["isNew"] is None else ("TRUE" if row["isNew"] else "FALSE") - source = escape_quotes(row["source"]) if row["source"] is not None else "NULL" + isNew = ( + "NULL" + if row["isNew"] is None + else ("TRUE" if row["isNew"] else "FALSE") + ) + source = ( + escape_quotes( + row["source"]) if row["source"] is not None else "NULL") query = f""" INSERT INTO {table_name} (rank, title, artist, genre, lastPos, image, peakPos, isNew, source) @@ -181,6 +195,7 @@ def read_chart_data(source, path): print(f"⚠️ {source} 데이터 로드 실패: {e}") return None + # 차트 데이터 읽기 및 병합 dfs = [read_chart_data(source, path) for source, path in chart_sources.items()] dfs = [df for df in dfs if df is not None] @@ -194,14 +209,21 @@ def read_chart_data(source, path): merged_df = merged_df.unionByName(df, allowMissingColumns=True) final_df = merged_df.select( - when(col("rank").rlike("^[0-9]+$"), col("rank").cast("int")).alias("rank"), + when(col("rank").rlike("^[0-9]+$"), + col("rank").cast("int")).alias("rank"), col("title"), col("artist"), col("genre"), # ✅ genre 컬럼 추가 - when(col("lastPos").rlike("^[0-9]+$"), col("lastPos").cast("int")).alias("lastPos"), + when(col("lastPos").rlike("^[0-9]+$"), col("lastPos").cast("int")).alias( + "lastPos" + ), col("image"), - when(col("peakPos").rlike("^[0-9]+$"), col("peakPos").cast("int")).alias("peakPos"), - when(col("isNew").rlike("^(true|false)$"), col("isNew").cast("boolean")).alias("isNew"), + when(col("peakPos").rlike("^[0-9]+$"), col("peakPos").cast("int")).alias( + "peakPos" + ), + when(col("isNew").rlike("^(true|false)$"), col("isNew").cast("boolean")).alias( + "isNew" + ), col("source"), ) diff --git a/airflow/dags/scripts/crawling_spotify_data.py b/airflow/dags/scripts/crawling_spotify_data.py index 24a477c..2ff2a8b 100644 --- a/airflow/dags/scripts/crawling_spotify_data.py +++ b/airflow/dags/scripts/crawling_spotify_data.py @@ -55,10 +55,16 @@ def data_crawling(): try: # top50 리스트 가져오기 - scroll_element = driver.find_element(By.XPATH, '//*[@id="main"]/div/div[2]/div[5]/div/div[2]/div[2]/div/main/section/div[2]/div[3]/div/div[1]/div/div[2]/div[2]') - driver.execute_script(""" - arguments[0].scrollIntoView({behavior: 'smooth', block: 'end'}); - """, scroll_element) + scroll_element = driver.find_element( + By.XPATH, + '//*[@id="main"]/div/div[2]/div[5]/div/div[2]/div[2]/div/main/section/div[2]/div[3]/div/div[1]/div/div[2]/div[2]', + ) + driver.execute_script( + """ + arguments[0].scrollIntoView({behavior: 'smooth', block: 'end'}); + """, + scroll_element, + ) # 페이지 로딩 대기 driver.implicitly_wait(30) song_lists = driver.find_elements( diff --git a/airflow/dags/scripts/request_spotify_api.py b/airflow/dags/scripts/request_spotify_api.py index b9088df..8df83dd 100644 --- a/airflow/dags/scripts/request_spotify_api.py +++ b/airflow/dags/scripts/request_spotify_api.py @@ -66,10 +66,9 @@ def get_arti_top10(logical_date, **kwargs): index=False) try: load_s3_bucket(object_name) - os.remove(f'data/{object_name}') + os.remove(f"data/{object_name}") except Exception as e: - print(f'error: {e}') - + print(f"error: {e}") # 아티스트 정보 가져오기 @@ -111,10 +110,9 @@ def get_artist_info(logical_date, **kwargs): index=False) try: load_s3_bucket(object_name) - os.remove(f'data/{object_name}') + os.remove(f"data/{object_name}") except Exception as e: - print(f'error: {e}') - + print(f"error: {e}") # 크롤링 데이터 읽어오는 함수 diff --git a/airflow/dags/spotify_data_dag.py b/airflow/dags/spotify_data_dag.py index 6ae5738..561bbee 100644 --- a/airflow/dags/spotify_data_dag.py +++ b/airflow/dags/spotify_data_dag.py @@ -4,8 +4,8 @@ from scripts.request_spotify_api import * from airflow import DAG -from airflow.operators.python import PythonOperator from airflow.operators.bash import BashOperator +from airflow.operators.python import PythonOperator default_args = { "depends_on_past": False, @@ -40,13 +40,15 @@ python_callable=get_arti_top10, op_kwargs={"logical_date": "{{ ds }}"}, ) - + remove_crawling_data = BashOperator( - task_id = 'remove_crawling_data', - bash_command='rm -f /opt/airflow/data/spotify_crawling_data_{{ ds }}.csv', - dag=dag + task_id="remove_crawling_data", + bash_command="rm -f /opt/airflow/data/spotify_crawling_data_{{ ds }}.csv", + dag=dag, ) - extract_globalTop50_data >> [ - extract_artistInfo_data, - extract_artistTop10_data] >> remove_crawling_data + ( + extract_globalTop50_data + >> [extract_artistInfo_data, extract_artistTop10_data] + >> remove_crawling_data + )