Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions airflow/dags/Bugs_DAG.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from airflow.providers.amazon.aws.hooks.s3 import S3Hook

# 날짜 설정
TODAY = datetime.now().strftime("%Y%m%d")
TODAY = datetime.now().strftime("%Y-%m-%d")


S3_BUCKET = "de5-s4tify"
Expand Down Expand Up @@ -50,7 +50,7 @@ def fetch_bugs_chart():
return chart_data


# 2. JSON → CSV 변환
# 2. JSON → CSV 변환 (genre를 리스트로 저장)
def convert_json_to_csv(**kwargs):
ti = kwargs["ti"]
data = ti.xcom_pull(task_ids="fetch_bugs_chart")
Expand All @@ -62,15 +62,14 @@ def convert_json_to_csv(**kwargs):

# 헤더 추가
writer.writerow(["rank", "title", "artist", "lastPos",
"peakPos", "image", "genre"])
"peakPos", "image", "genre", "date"])

# 데이터 추가
for entry in data["entries"]:
genres = json.dumps(
entry["genres"], ensure_ascii=False
) # 리스트를 문자열로 변환
# 이중 따옴표가 포함되면 한번만 보이도록 처리
genres = genres.replace('""', '"') # 이중 따옴표를 하나로 바꿈
# 리스트 그대로 저장
genres = entry["genres"]

# CSV에 추가
writer.writerow(
[
entry["rank"],
Expand All @@ -79,7 +78,8 @@ def convert_json_to_csv(**kwargs):
entry["lastPos"],
entry["peakPos"],
entry["image"],
genres,
genres, # 수정된 부분: 리스트 그대로 저장
TODAY,
]
)

Expand Down
18 changes: 9 additions & 9 deletions airflow/dags/Flo_DAG.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from airflow.providers.amazon.aws.hooks.s3 import S3Hook

# 날짜 설정
TODAY = datetime.now().strftime("%Y%m%d")
TODAY = datetime.now().strftime("%Y-%m-%d")

# S3 설정
S3_BUCKET = "de5-s4tify"
Expand Down Expand Up @@ -59,16 +59,15 @@ def convert_json_to_csv(**kwargs):
) # ✅ 모든 필드를 자동으로 따옴표 처리

# 헤더 추가
writer.writerow(["rank", "title", "artist",
"lastPos", "isNew", "image", "genre"])
writer.writerow(["rank", "title", "artist", "lastPos",
"isNew", "image", "genre", "date"])

# 데이터 추가
for entry in data["entries"]:
genres = json.dumps(
entry["genres"], ensure_ascii=False
) # 리스트를 문자열로 변환
# 이중 따옴표가 포함되면 한번만 보이도록 처리
genres = genres.replace('""', '"') # 이중 따옴표를 하나로 바꿈
# 리스트 그대로 저장 (genres는 리스트로 저장)
genres = entry["genres"] # 수정된 부분: 리스트 그대로 저장

# CSV에 추가
writer.writerow(
[
entry["rank"],
Expand All @@ -77,7 +76,8 @@ def convert_json_to_csv(**kwargs):
entry["lastPos"],
entry["isNew"],
entry["image"],
genres,
genres, # 수정된 부분: 리스트 그대로 저장
TODAY,
]
)

Expand Down
20 changes: 10 additions & 10 deletions airflow/dags/Genie_DAG.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from airflow.providers.amazon.aws.hooks.s3 import S3Hook

# 날짜 설정
TODAY = datetime.now().strftime("%Y%m%d")
TODAY = datetime.now().strftime("%Y-%m-%d")

# S3 설정
S3_BUCKET = "de5-s4tify"
Expand Down Expand Up @@ -59,25 +59,25 @@ def convert_json_to_csv(**kwargs):
) # ✅ 모든 필드를 자동으로 따옴표 처리

# 헤더 추가
writer.writerow(["rank", "title", "artist", "peakPos",
"lastPos", "image", "genre"])
writer.writerow(["rank", "title", "artist", "lastPos",
"peakPos", "image", "genre", "date"])

# 데이터 추가
for entry in data["entries"]:
genres = json.dumps(
entry["genres"], ensure_ascii=False
) # 리스트를 문자열로 변환
# 이중 따옴표가 포함되면 한번만 보이도록 처리
genres = genres.replace('""', '"') # 이중 따옴표를 하나로 바꿈
# 리스트 그대로 저장
genres = entry["genres"]

# CSV에 추가
writer.writerow(
[
entry["rank"],
entry["title"],
entry["artist"],
entry["peakPos"],
entry["lastPos"],
entry["peakPos"],
entry["image"],
genres,
genres, # 수정된 부분: 리스트 그대로 저장
TODAY,
]
)

Expand Down
17 changes: 8 additions & 9 deletions airflow/dags/Melon_DAG.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from airflow.providers.amazon.aws.hooks.s3 import S3Hook

# 날짜 설정
TODAY = datetime.now().strftime("%Y%m%d")
TODAY = datetime.now().strftime("%Y-%m-%d")

# S3 설정
S3_BUCKET = "de5-s4tify"
Expand Down Expand Up @@ -58,17 +58,15 @@ def convert_json_to_csv(**kwargs):
) # ✅ 모든 필드를 자동으로 따옴표 처리

# 헤더 추가
writer.writerow(["rank", "title", "artist",
"lastPos", "isNew", "image", "genre"])
writer.writerow(["rank", "title", "artist", "lastPos",
"isNew", "image", "genre", "date"])

# 데이터 추가
for entry in data["entries"]:
genres = json.dumps(
entry["genres"], ensure_ascii=False
) # 리스트를 문자열로 변환
# 이중 따옴표가 포함되면 한번만 보이도록 처리
genres = genres.replace('""', '"') # 이중 따옴표를 하나로 바꿈
# 리스트 그대로 저장 (genres는 리스트로 저장)
genres = entry["genres"] # 수정된 부분: 리스트 그대로 저장

# CSV에 추가
writer.writerow(
[
entry["rank"],
Expand All @@ -77,7 +75,8 @@ def convert_json_to_csv(**kwargs):
entry["lastPos"],
entry["isNew"],
entry["image"],
genres,
genres, # 수정된 부분: 리스트 그대로 저장
TODAY,
]
)

Expand Down
6 changes: 3 additions & 3 deletions airflow/dags/S3_Spark_SnowFlake_DAG.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
from airflow.providers.apache.spark.operators.spark_submit import \
SparkSubmitOperator

# Airflow Variables에서 AWS 자격 증명 불러오기 (없으면 예외 발생 방지)
AWS_ACCESS_KEY = Variable.get("AWS_ACCESS_KEY", default_var=None)
AWS_SECRET_KEY = Variable.get("AWS_SECRET_KEY", default_var=None)
# .env에서 AWS 자격 증명 불러오기 (없으면 예외 발생 방지)
AWS_ACCESS_KEY = os.getenv("AWS_ACCESS_KEY", default_var=None)
AWS_SECRET_KEY = os.getenv("AWS_SECRET_KEY", default_var=None)

if not AWS_ACCESS_KEY or not AWS_SECRET_KEY:
raise ValueError("AWS_ACCESS_KEY 또는 AWS_SECRET_KEY가 설정되지 않았습니다.")
Expand Down
18 changes: 9 additions & 9 deletions airflow/dags/Vibe_DAG.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from airflow.providers.amazon.aws.hooks.s3 import S3Hook

# 날짜 설정
TODAY = datetime.now().strftime("%Y%m%d")
TODAY = datetime.now().strftime("%Y-%m-%d")

# S3 설정
S3_BUCKET = "de5-s4tify"
Expand Down Expand Up @@ -56,16 +56,15 @@ def convert_json_to_csv(**kwargs):
) # ✅ 모든 필드를 자동으로 따옴표 처리

# 헤더 추가
writer.writerow(["rank", "title", "artist",
"lastPos", "isNew", "image", "genre"])
writer.writerow(["rank", "title", "artist", "lastPos",
"isNew", "image", "genre", "date"])

# 데이터 추가
for entry in data["entries"]:
genres = json.dumps(
entry["genres"], ensure_ascii=False
) # 리스트를 문자열로 변환
# 이중 따옴표가 포함되면 한번만 보이도록 처리
genres = genres.replace('""', '"') # 이중 따옴표를 하나로 바꿈
# 리스트 그대로 저장 (genres는 리스트로 저장)
genres = entry["genres"] # 수정된 부분: 리스트 그대로 저장

# CSV에 추가
writer.writerow(
[
entry["rank"],
Expand All @@ -74,7 +73,8 @@ def convert_json_to_csv(**kwargs):
entry["lastPos"],
entry["isNew"],
entry["image"],
genres,
genres, # 수정된 부분: 리스트 그대로 저장
TODAY,
]
)

Expand Down
33 changes: 17 additions & 16 deletions airflow/dags/scripts/S3_Spark_SnowFlake_ELT.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,15 @@

# Snowflake 연결 정보 설정
SNOWFLAKE_OPTIONS = {
"user": Variable.get("SNOWFLAKE_USER"),
"password": Variable.get("SNOWFLAKE_PASSWORD"),
"account": Variable.get("SNOWFLAKE_ACCOUNT"),
"db": Variable.get("SNOWFLAKE_DB", "S4TIFY"),
"warehouse": Variable.get("SNOWFLAKE_WH", "COMPUTE_WH"),
# "schema": (Variable.get("SNOWFLAKE_SCHEMA")if Variable.get("SNOWFLAKE_SCHEMA")else "raw_data"),
"user": os.getenv("SNOWFLAKE_USER"),
"password": os.getenv("SNOWFLAKE_PASSWORD"),
"account": os.getenv("SNOWFLAKE_ACCOUNT"),
"db": os.getenv("SNOWFLAKE_DB", "S4TIFY"),
"warehouse": os.getenv("SNOWFLAKE_WH", "COMPUTE_WH"),
"schema": "RAW_DATA",
"role": "ACCOUNTADMIN",
"driver": "net.snowflake.client.jdbc.SnowflakeDriver",
"url": f'jdbc:snowflake://{Variable.get("SNOWFLAKE_ACCOUNT")}.snowflakecomputing.com',
# "url" : "jdbc:snowflake://kjqeovi-gr23658.snowflakecomputing.com",
"url": f'jdbc:snowflake://{os.getenv("SNOWFLAKE_ACCOUNT")}.snowflakecomputing.com',
}


Expand All @@ -42,9 +40,9 @@ def spark_session_builder(app_name: str) -> SparkSession:
"spark.hadoop.fs.s3a.impl",
"org.apache.hadoop.fs.s3a.S3AFileSystem") .config(
"spark.hadoop.fs.s3a.access.key",
Variable.get("AWS_ACCESS_KEY")) .config(
os.getenv("AWS_ACCESS_KEY")) .config(
"spark.hadoop.fs.s3a.secret.key",
Variable.get("AWS_SECRET_KEY")) .config(
os.getenv("AWS_SECRET_KEY")) .config(
"spark.hadoop.fs.s3a.endpoint",
"s3.amazonaws.com") .config(
"spark.hadoop.fs.s3a.aws.credentials.provider",
Expand Down Expand Up @@ -88,7 +86,8 @@ def check_and_create_table():
image STRING,
peakPos INT,
isNew BOOLEAN,
source STRING
source STRING,
date DATE -- 날짜 컬럼 추가
)
"""
cur.execute(create_table_query)
Expand Down Expand Up @@ -147,10 +146,11 @@ def insert_data_into_snowflake(df, table_name):
source = (
escape_quotes(
row["source"]) if row["source"] is not None else "NULL")
date = f"'{row['date']}'" # date 컬럼 추가

query = f"""
INSERT INTO {table_name} (rank, title, artist, genre, lastPos, image, peakPos, isNew, source)
VALUES ({rank}, {title}, {artist}, {genre}, {lastPos}, {image}, {peakPos}, {isNew}, {source})
INSERT INTO {table_name} (rank, title, artist, genre, lastPos, image, peakPos, isNew, source, date)
VALUES ({rank}, {title}, {artist}, {genre}, {lastPos}, {image}, {peakPos}, {isNew}, {source}, {date})
"""
cur.execute(query)

Expand All @@ -168,7 +168,7 @@ def insert_data_into_snowflake(df, table_name):
spark = spark_session_builder("S3_to_Snowflake")

# 오늘 날짜 기반 S3 데이터 경로 생성
TODAY = datetime.now().strftime("%Y%m%d")
TODAY = datetime.now().strftime("%Y-%m-%d")
S3_BUCKET = "s3a://de5-s4tify"
chart_sources = {
"bugs": f"{S3_BUCKET}/raw_data/bugs_chart_data/bugs_chart_{TODAY}.csv",
Expand All @@ -187,7 +187,7 @@ def read_chart_data(source, path):
.option("inferSchema", True)
.load(path)
)
df.printSchema() # 데이터 스키마 출력해서 `genre` 확인
df.printSchema() # 데이터 스키마 출력해서 `genre`와 `date` 확인
return df.withColumn("source", lit(source))
except Exception as e:
print(f"⚠️ {source} 데이터 로드 실패: {e}")
Expand All @@ -211,7 +211,7 @@ def read_chart_data(source, path):
col("rank").cast("int")).alias("rank"),
col("title"),
col("artist"),
col("genre"), # genre 컬럼 추가
col("genre"), # genre 컬럼 추가
when(col("lastPos").rlike("^[0-9]+$"), col("lastPos").cast("int")).alias(
"lastPos"
),
Expand All @@ -223,6 +223,7 @@ def read_chart_data(source, path):
"isNew"
),
col("source"),
col("date"), # date 컬럼 추가
)

final_df.show(40)
Expand Down