diff --git a/airflow/dags/Bugs_DAG.py b/airflow/dags/Bugs_DAG.py index 34fc3bd..3e9dc9e 100644 --- a/airflow/dags/Bugs_DAG.py +++ b/airflow/dags/Bugs_DAG.py @@ -14,7 +14,7 @@ from airflow.providers.amazon.aws.hooks.s3 import S3Hook # 날짜 설정 -TODAY = datetime.now().strftime("%Y%m%d") +TODAY = datetime.now().strftime("%Y-%m-%d") S3_BUCKET = "de5-s4tify" @@ -50,7 +50,7 @@ def fetch_bugs_chart(): return chart_data -# 2. JSON → CSV 변환 +# 2. JSON → CSV 변환 (genre를 리스트로 저장) def convert_json_to_csv(**kwargs): ti = kwargs["ti"] data = ti.xcom_pull(task_ids="fetch_bugs_chart") @@ -62,15 +62,14 @@ def convert_json_to_csv(**kwargs): # 헤더 추가 writer.writerow(["rank", "title", "artist", "lastPos", - "peakPos", "image", "genre"]) + "peakPos", "image", "genre", "date"]) # 데이터 추가 for entry in data["entries"]: - genres = json.dumps( - entry["genres"], ensure_ascii=False - ) # 리스트를 문자열로 변환 - # 이중 따옴표가 포함되면 한번만 보이도록 처리 - genres = genres.replace('""', '"') # 이중 따옴표를 하나로 바꿈 + # 리스트 그대로 저장 + genres = entry["genres"] + + # CSV에 추가 writer.writerow( [ entry["rank"], @@ -79,7 +78,8 @@ def convert_json_to_csv(**kwargs): entry["lastPos"], entry["peakPos"], entry["image"], - genres, + genres, # 수정된 부분: 리스트 그대로 저장 + TODAY, ] ) diff --git a/airflow/dags/Flo_DAG.py b/airflow/dags/Flo_DAG.py index 5c596e4..769513a 100644 --- a/airflow/dags/Flo_DAG.py +++ b/airflow/dags/Flo_DAG.py @@ -14,7 +14,7 @@ from airflow.providers.amazon.aws.hooks.s3 import S3Hook # 날짜 설정 -TODAY = datetime.now().strftime("%Y%m%d") +TODAY = datetime.now().strftime("%Y-%m-%d") # S3 설정 S3_BUCKET = "de5-s4tify" @@ -59,16 +59,15 @@ def convert_json_to_csv(**kwargs): ) # ✅ 모든 필드를 자동으로 따옴표 처리 # 헤더 추가 - writer.writerow(["rank", "title", "artist", - "lastPos", "isNew", "image", "genre"]) + writer.writerow(["rank", "title", "artist", "lastPos", + "isNew", "image", "genre", "date"]) # 데이터 추가 for entry in data["entries"]: - genres = json.dumps( - entry["genres"], ensure_ascii=False - ) # 리스트를 문자열로 변환 - # 이중 따옴표가 포함되면 한번만 보이도록 처리 - genres = genres.replace('""', '"') # 이중 따옴표를 하나로 바꿈 + # 리스트 그대로 저장 (genres는 리스트로 저장) + genres = entry["genres"] # 수정된 부분: 리스트 그대로 저장 + + # CSV에 추가 writer.writerow( [ entry["rank"], @@ -77,7 +76,8 @@ def convert_json_to_csv(**kwargs): entry["lastPos"], entry["isNew"], entry["image"], - genres, + genres, # 수정된 부분: 리스트 그대로 저장 + TODAY, ] ) diff --git a/airflow/dags/Genie_DAG.py b/airflow/dags/Genie_DAG.py index e0dd127..ae92f55 100644 --- a/airflow/dags/Genie_DAG.py +++ b/airflow/dags/Genie_DAG.py @@ -14,7 +14,7 @@ from airflow.providers.amazon.aws.hooks.s3 import S3Hook # 날짜 설정 -TODAY = datetime.now().strftime("%Y%m%d") +TODAY = datetime.now().strftime("%Y-%m-%d") # S3 설정 S3_BUCKET = "de5-s4tify" @@ -59,25 +59,25 @@ def convert_json_to_csv(**kwargs): ) # ✅ 모든 필드를 자동으로 따옴표 처리 # 헤더 추가 - writer.writerow(["rank", "title", "artist", "peakPos", - "lastPos", "image", "genre"]) + writer.writerow(["rank", "title", "artist", "lastPos", + "peakPos", "image", "genre", "date"]) # 데이터 추가 for entry in data["entries"]: - genres = json.dumps( - entry["genres"], ensure_ascii=False - ) # 리스트를 문자열로 변환 - # 이중 따옴표가 포함되면 한번만 보이도록 처리 - genres = genres.replace('""', '"') # 이중 따옴표를 하나로 바꿈 + # 리스트 그대로 저장 + genres = entry["genres"] + + # CSV에 추가 writer.writerow( [ entry["rank"], entry["title"], entry["artist"], - entry["peakPos"], entry["lastPos"], + entry["peakPos"], entry["image"], - genres, + genres, # 수정된 부분: 리스트 그대로 저장 + TODAY, ] ) diff --git a/airflow/dags/Melon_DAG.py b/airflow/dags/Melon_DAG.py index 8d89066..c7e4fce 100644 --- a/airflow/dags/Melon_DAG.py +++ b/airflow/dags/Melon_DAG.py @@ -14,7 +14,7 @@ from airflow.providers.amazon.aws.hooks.s3 import S3Hook # 날짜 설정 -TODAY = datetime.now().strftime("%Y%m%d") +TODAY = datetime.now().strftime("%Y-%m-%d") # S3 설정 S3_BUCKET = "de5-s4tify" @@ -58,17 +58,15 @@ def convert_json_to_csv(**kwargs): ) # ✅ 모든 필드를 자동으로 따옴표 처리 # 헤더 추가 - writer.writerow(["rank", "title", "artist", - "lastPos", "isNew", "image", "genre"]) + writer.writerow(["rank", "title", "artist", "lastPos", + "isNew", "image", "genre", "date"]) # 데이터 추가 for entry in data["entries"]: - genres = json.dumps( - entry["genres"], ensure_ascii=False - ) # 리스트를 문자열로 변환 - # 이중 따옴표가 포함되면 한번만 보이도록 처리 - genres = genres.replace('""', '"') # 이중 따옴표를 하나로 바꿈 + # 리스트 그대로 저장 (genres는 리스트로 저장) + genres = entry["genres"] # 수정된 부분: 리스트 그대로 저장 + # CSV에 추가 writer.writerow( [ entry["rank"], @@ -77,7 +75,8 @@ def convert_json_to_csv(**kwargs): entry["lastPos"], entry["isNew"], entry["image"], - genres, + genres, # 수정된 부분: 리스트 그대로 저장 + TODAY, ] ) diff --git a/airflow/dags/S3_Spark_SnowFlake_DAG.py b/airflow/dags/S3_Spark_SnowFlake_DAG.py index 11fc5fa..27436f0 100644 --- a/airflow/dags/S3_Spark_SnowFlake_DAG.py +++ b/airflow/dags/S3_Spark_SnowFlake_DAG.py @@ -6,9 +6,9 @@ from airflow.providers.apache.spark.operators.spark_submit import \ SparkSubmitOperator -# Airflow Variables에서 AWS 자격 증명 불러오기 (없으면 예외 발생 방지) -AWS_ACCESS_KEY = Variable.get("AWS_ACCESS_KEY", default_var=None) -AWS_SECRET_KEY = Variable.get("AWS_SECRET_KEY", default_var=None) +# .env에서 AWS 자격 증명 불러오기 (없으면 예외 발생 방지) +AWS_ACCESS_KEY = os.getenv("AWS_ACCESS_KEY", default_var=None) +AWS_SECRET_KEY = os.getenv("AWS_SECRET_KEY", default_var=None) if not AWS_ACCESS_KEY or not AWS_SECRET_KEY: raise ValueError("AWS_ACCESS_KEY 또는 AWS_SECRET_KEY가 설정되지 않았습니다.") diff --git a/airflow/dags/Vibe_DAG.py b/airflow/dags/Vibe_DAG.py index 612a477..de30610 100644 --- a/airflow/dags/Vibe_DAG.py +++ b/airflow/dags/Vibe_DAG.py @@ -14,7 +14,7 @@ from airflow.providers.amazon.aws.hooks.s3 import S3Hook # 날짜 설정 -TODAY = datetime.now().strftime("%Y%m%d") +TODAY = datetime.now().strftime("%Y-%m-%d") # S3 설정 S3_BUCKET = "de5-s4tify" @@ -56,16 +56,15 @@ def convert_json_to_csv(**kwargs): ) # ✅ 모든 필드를 자동으로 따옴표 처리 # 헤더 추가 - writer.writerow(["rank", "title", "artist", - "lastPos", "isNew", "image", "genre"]) + writer.writerow(["rank", "title", "artist", "lastPos", + "isNew", "image", "genre", "date"]) # 데이터 추가 for entry in data["entries"]: - genres = json.dumps( - entry["genres"], ensure_ascii=False - ) # 리스트를 문자열로 변환 - # 이중 따옴표가 포함되면 한번만 보이도록 처리 - genres = genres.replace('""', '"') # 이중 따옴표를 하나로 바꿈 + # 리스트 그대로 저장 (genres는 리스트로 저장) + genres = entry["genres"] # 수정된 부분: 리스트 그대로 저장 + + # CSV에 추가 writer.writerow( [ entry["rank"], @@ -74,7 +73,8 @@ def convert_json_to_csv(**kwargs): entry["lastPos"], entry["isNew"], entry["image"], - genres, + genres, # 수정된 부분: 리스트 그대로 저장 + TODAY, ] ) diff --git a/airflow/dags/scripts/S3_Spark_SnowFlake_ELT.py b/airflow/dags/scripts/S3_Spark_SnowFlake_ELT.py index f7abdd0..b2ca4a0 100644 --- a/airflow/dags/scripts/S3_Spark_SnowFlake_ELT.py +++ b/airflow/dags/scripts/S3_Spark_SnowFlake_ELT.py @@ -19,17 +19,15 @@ # Snowflake 연결 정보 설정 SNOWFLAKE_OPTIONS = { - "user": Variable.get("SNOWFLAKE_USER"), - "password": Variable.get("SNOWFLAKE_PASSWORD"), - "account": Variable.get("SNOWFLAKE_ACCOUNT"), - "db": Variable.get("SNOWFLAKE_DB", "S4TIFY"), - "warehouse": Variable.get("SNOWFLAKE_WH", "COMPUTE_WH"), - # "schema": (Variable.get("SNOWFLAKE_SCHEMA")if Variable.get("SNOWFLAKE_SCHEMA")else "raw_data"), + "user": os.getenv("SNOWFLAKE_USER"), + "password": os.getenv("SNOWFLAKE_PASSWORD"), + "account": os.getenv("SNOWFLAKE_ACCOUNT"), + "db": os.getenv("SNOWFLAKE_DB", "S4TIFY"), + "warehouse": os.getenv("SNOWFLAKE_WH", "COMPUTE_WH"), "schema": "RAW_DATA", "role": "ACCOUNTADMIN", "driver": "net.snowflake.client.jdbc.SnowflakeDriver", - "url": f'jdbc:snowflake://{Variable.get("SNOWFLAKE_ACCOUNT")}.snowflakecomputing.com', - # "url" : "jdbc:snowflake://kjqeovi-gr23658.snowflakecomputing.com", + "url": f'jdbc:snowflake://{os.getenv("SNOWFLAKE_ACCOUNT")}.snowflakecomputing.com', } @@ -42,9 +40,9 @@ def spark_session_builder(app_name: str) -> SparkSession: "spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") .config( "spark.hadoop.fs.s3a.access.key", - Variable.get("AWS_ACCESS_KEY")) .config( + os.getenv("AWS_ACCESS_KEY")) .config( "spark.hadoop.fs.s3a.secret.key", - Variable.get("AWS_SECRET_KEY")) .config( + os.getenv("AWS_SECRET_KEY")) .config( "spark.hadoop.fs.s3a.endpoint", "s3.amazonaws.com") .config( "spark.hadoop.fs.s3a.aws.credentials.provider", @@ -88,7 +86,8 @@ def check_and_create_table(): image STRING, peakPos INT, isNew BOOLEAN, - source STRING + source STRING, + date DATE -- 날짜 컬럼 추가 ) """ cur.execute(create_table_query) @@ -147,10 +146,11 @@ def insert_data_into_snowflake(df, table_name): source = ( escape_quotes( row["source"]) if row["source"] is not None else "NULL") + date = f"'{row['date']}'" # date 컬럼 추가 query = f""" - INSERT INTO {table_name} (rank, title, artist, genre, lastPos, image, peakPos, isNew, source) - VALUES ({rank}, {title}, {artist}, {genre}, {lastPos}, {image}, {peakPos}, {isNew}, {source}) + INSERT INTO {table_name} (rank, title, artist, genre, lastPos, image, peakPos, isNew, source, date) + VALUES ({rank}, {title}, {artist}, {genre}, {lastPos}, {image}, {peakPos}, {isNew}, {source}, {date}) """ cur.execute(query) @@ -168,7 +168,7 @@ def insert_data_into_snowflake(df, table_name): spark = spark_session_builder("S3_to_Snowflake") # 오늘 날짜 기반 S3 데이터 경로 생성 -TODAY = datetime.now().strftime("%Y%m%d") +TODAY = datetime.now().strftime("%Y-%m-%d") S3_BUCKET = "s3a://de5-s4tify" chart_sources = { "bugs": f"{S3_BUCKET}/raw_data/bugs_chart_data/bugs_chart_{TODAY}.csv", @@ -187,7 +187,7 @@ def read_chart_data(source, path): .option("inferSchema", True) .load(path) ) - df.printSchema() # ✅ 데이터 스키마 출력해서 `genre` 확인 + df.printSchema() # 데이터 스키마 출력해서 `genre`와 `date` 확인 return df.withColumn("source", lit(source)) except Exception as e: print(f"⚠️ {source} 데이터 로드 실패: {e}") @@ -211,7 +211,7 @@ def read_chart_data(source, path): col("rank").cast("int")).alias("rank"), col("title"), col("artist"), - col("genre"), # ✅ genre 컬럼 추가 + col("genre"), # genre 컬럼 추가 when(col("lastPos").rlike("^[0-9]+$"), col("lastPos").cast("int")).alias( "lastPos" ), @@ -223,6 +223,7 @@ def read_chart_data(source, path): "isNew" ), col("source"), + col("date"), # date 컬럼 추가 ) final_df.show(40)