From 7ff516d22cf37ef9242a7a88623601c0957280c3 Mon Sep 17 00:00:00 2001 From: Yeojun <52143231+Yun024@users.noreply.github.com> Date: Sun, 16 Mar 2025 19:11:43 +0900 Subject: [PATCH 1/2] dir_path_change --- airflow/dags/scripts/ELT_artist_info_globalTop50.py | 10 +++++----- airflow/dags/scripts/ELT_artist_info_top10.py | 8 ++++---- airflow/dags/scripts/crawling_spotify_data.py | 5 +++-- airflow/dags/scripts/load_spotify_data.py | 4 ++-- airflow/dags/scripts/request_spotify_api.py | 6 ++++-- 5 files changed, 18 insertions(+), 15 deletions(-) diff --git a/airflow/dags/scripts/ELT_artist_info_globalTop50.py b/airflow/dags/scripts/ELT_artist_info_globalTop50.py index 1a5705a..e31f4e5 100644 --- a/airflow/dags/scripts/ELT_artist_info_globalTop50.py +++ b/airflow/dags/scripts/ELT_artist_info_globalTop50.py @@ -63,10 +63,10 @@ def transformation(): # 데이터 읽고 중복 제거 artist_info_df = extract( - "spotify_artist_info", + "artist_info", artist_info_schema).dropDuplicates( ["artist_id"]) - global_top50_df = extract("spotify_crawling_data", global_top50_schema) + global_top50_df = extract("crawling_data", global_top50_schema) global_top50_df = global_top50_df.withColumn( "artist_id", explode("artist_id")) @@ -86,12 +86,12 @@ def extract(file_name, schema): spark = create_spark_session("artist_global_table") df = spark.read.csv( - f"s3a://{BUCKET_NAME}/{OBJECT_NAME}/{file_name}_{TODAY}.csv", + f"s3a://{BUCKET_NAME}/{OBJECT_NAME}/{file_name}/spotify_{file_name}_{TODAY}.csv", header=True, schema=schema, ) - if file_name == "spotify_crawling_data": + if file_name == "crawling_data": df = ( df.withColumn( "artist", split( @@ -106,7 +106,7 @@ def extract(file_name, schema): col("artist").isNull(), lit( [""])).otherwise( col("artist")), )) - if file_name == "spotify_artist_info": + if file_name == "artist_info": df = df.withColumn( "artist_genre", regexp_replace(df["artist_genre"], "[\\[\\]']", "") ) # 불필요한 문자 제거 diff --git a/airflow/dags/scripts/ELT_artist_info_top10.py b/airflow/dags/scripts/ELT_artist_info_top10.py index 9dd22fa..e22e34d 100644 --- a/airflow/dags/scripts/ELT_artist_info_top10.py +++ b/airflow/dags/scripts/ELT_artist_info_top10.py @@ -60,10 +60,10 @@ def transformation(): # 데이터 읽어오고 중복 제거 artist_top10_df = extract( - "spotify_artist_top10", artist_top10_schema + "artist_top10", artist_top10_schema ).dropDuplicates(["song_id"]) artist_info_df = extract( - "spotify_artist_info", + "artist_info", artist_info_schema).dropDuplicates( ["artist_id"]) @@ -111,12 +111,12 @@ def extract(file_name, schema): spark = create_spark_session("artist_top10_table") df = spark.read.csv( - f"s3a://{BUCKET_NAME}/{OBJECT_NAME}/{file_name}_{TODAY}.csv", + f"s3a://{BUCKET_NAME}/{OBJECT_NAME}/{file_name}/spotify_{file_name}_{TODAY}.csv", header=True, schema=schema, ) - if file_name == "spotify_artist_info": + if file_name == "artist_info": df = df.withColumn( "artist_genre", regexp_replace(df["artist_genre"], "[\\[\\]']", "") ) # 불필요한 문자 제거 diff --git a/airflow/dags/scripts/crawling_spotify_data.py b/airflow/dags/scripts/crawling_spotify_data.py index 5439626..8d05a9e 100644 --- a/airflow/dags/scripts/crawling_spotify_data.py +++ b/airflow/dags/scripts/crawling_spotify_data.py @@ -25,11 +25,12 @@ def make_dataframe(): # 크롤링 데이터를 csv로 저장 def save_as_csv_file(df, logical_date): - + + dir_path = "crawling_data" file_path = f"data/spotify_crawling_data_{TODAY}.csv" df.to_csv(file_path, encoding="utf-8", mode="w", header=True, index=False) - load_s3_bucket(f"spotify_crawling_data_{logical_date}.csv") + load_s3_bucket(dir_path,f"spotify_crawling_data_{logical_date}.csv") def data_crawling(logical_date): diff --git a/airflow/dags/scripts/load_spotify_data.py b/airflow/dags/scripts/load_spotify_data.py index 53b3e7a..4c21465 100644 --- a/airflow/dags/scripts/load_spotify_data.py +++ b/airflow/dags/scripts/load_spotify_data.py @@ -5,11 +5,11 @@ from airflow.providers.amazon.aws.hooks.s3 import S3Hook -def load_s3_bucket(file_name): +def load_s3_bucket(dir_name,file_name): s3_hook = S3Hook(aws_conn_id="AWS_S3") s3_bucket = "de5-s4tify" - s3_key = f"raw_data/{file_name}" + s3_key = f"raw_data/{dir_name}/{file_name}" local_file_path = f"data/{file_name}" diff --git a/airflow/dags/scripts/request_spotify_api.py b/airflow/dags/scripts/request_spotify_api.py index 8df83dd..649a87f 100644 --- a/airflow/dags/scripts/request_spotify_api.py +++ b/airflow/dags/scripts/request_spotify_api.py @@ -29,6 +29,7 @@ def get_arti_top10(logical_date, **kwargs): task_instance = kwargs["ti"] arti_top10_list = [] + dir_name = "artist_top10" object_name = f"spotify_artist_top10_{logical_date}.csv" # csv 파일 읽어오기 @@ -65,7 +66,7 @@ def get_arti_top10(logical_date, **kwargs): encoding="utf-8-sig", index=False) try: - load_s3_bucket(object_name) + load_s3_bucket(dir_name,object_name) os.remove(f"data/{object_name}") except Exception as e: print(f"error: {e}") @@ -76,6 +77,7 @@ def get_artist_info(logical_date, **kwargs): task_instance = kwargs["ti"] artist_info_list = [] + dir_name = "artist_info" object_name = f"spotify_artist_info_{logical_date}.csv" # csv 파일 읽어오기 @@ -109,7 +111,7 @@ def get_artist_info(logical_date, **kwargs): encoding="utf-8-sig", index=False) try: - load_s3_bucket(object_name) + load_s3_bucket(dir_name,object_name) os.remove(f"data/{object_name}") except Exception as e: print(f"error: {e}") From 2dec2987d1e875d1142b3bb3b0cd68cf0cc9e2a7 Mon Sep 17 00:00:00 2001 From: GitHub Action Date: Sun, 16 Mar 2025 10:12:47 +0000 Subject: [PATCH 2/2] Automated format fixes --- airflow/dags/scripts/ELT_artist_info_globalTop50.py | 7 +++---- airflow/dags/scripts/ELT_artist_info_top10.py | 12 ++++++------ airflow/dags/scripts/crawling_spotify_data.py | 4 ++-- airflow/dags/scripts/load_spotify_data.py | 2 +- airflow/dags/scripts/request_spotify_api.py | 4 ++-- 5 files changed, 14 insertions(+), 15 deletions(-) diff --git a/airflow/dags/scripts/ELT_artist_info_globalTop50.py b/airflow/dags/scripts/ELT_artist_info_globalTop50.py index e31f4e5..0e492f8 100644 --- a/airflow/dags/scripts/ELT_artist_info_globalTop50.py +++ b/airflow/dags/scripts/ELT_artist_info_globalTop50.py @@ -62,10 +62,9 @@ def transformation(): ) # 데이터 읽고 중복 제거 - artist_info_df = extract( - "artist_info", - artist_info_schema).dropDuplicates( - ["artist_id"]) + artist_info_df = extract("artist_info", artist_info_schema).dropDuplicates( + ["artist_id"] + ) global_top50_df = extract("crawling_data", global_top50_schema) global_top50_df = global_top50_df.withColumn( diff --git a/airflow/dags/scripts/ELT_artist_info_top10.py b/airflow/dags/scripts/ELT_artist_info_top10.py index e22e34d..a525f37 100644 --- a/airflow/dags/scripts/ELT_artist_info_top10.py +++ b/airflow/dags/scripts/ELT_artist_info_top10.py @@ -60,12 +60,12 @@ def transformation(): # 데이터 읽어오고 중복 제거 artist_top10_df = extract( - "artist_top10", artist_top10_schema - ).dropDuplicates(["song_id"]) - artist_info_df = extract( - "artist_info", - artist_info_schema).dropDuplicates( - ["artist_id"]) + "artist_top10", + artist_top10_schema).dropDuplicates( + ["song_id"]) + artist_info_df = extract("artist_info", artist_info_schema).dropDuplicates( + ["artist_id"] + ) artist_info_top10_df = artist_info_df.join( artist_top10_df, on="artist_id", how="outer" diff --git a/airflow/dags/scripts/crawling_spotify_data.py b/airflow/dags/scripts/crawling_spotify_data.py index 8d05a9e..26fc9f8 100644 --- a/airflow/dags/scripts/crawling_spotify_data.py +++ b/airflow/dags/scripts/crawling_spotify_data.py @@ -25,12 +25,12 @@ def make_dataframe(): # 크롤링 데이터를 csv로 저장 def save_as_csv_file(df, logical_date): - + dir_path = "crawling_data" file_path = f"data/spotify_crawling_data_{TODAY}.csv" df.to_csv(file_path, encoding="utf-8", mode="w", header=True, index=False) - load_s3_bucket(dir_path,f"spotify_crawling_data_{logical_date}.csv") + load_s3_bucket(dir_path, f"spotify_crawling_data_{logical_date}.csv") def data_crawling(logical_date): diff --git a/airflow/dags/scripts/load_spotify_data.py b/airflow/dags/scripts/load_spotify_data.py index 4c21465..f496e33 100644 --- a/airflow/dags/scripts/load_spotify_data.py +++ b/airflow/dags/scripts/load_spotify_data.py @@ -5,7 +5,7 @@ from airflow.providers.amazon.aws.hooks.s3 import S3Hook -def load_s3_bucket(dir_name,file_name): +def load_s3_bucket(dir_name, file_name): s3_hook = S3Hook(aws_conn_id="AWS_S3") s3_bucket = "de5-s4tify" diff --git a/airflow/dags/scripts/request_spotify_api.py b/airflow/dags/scripts/request_spotify_api.py index 649a87f..e6863b9 100644 --- a/airflow/dags/scripts/request_spotify_api.py +++ b/airflow/dags/scripts/request_spotify_api.py @@ -66,7 +66,7 @@ def get_arti_top10(logical_date, **kwargs): encoding="utf-8-sig", index=False) try: - load_s3_bucket(dir_name,object_name) + load_s3_bucket(dir_name, object_name) os.remove(f"data/{object_name}") except Exception as e: print(f"error: {e}") @@ -111,7 +111,7 @@ def get_artist_info(logical_date, **kwargs): encoding="utf-8-sig", index=False) try: - load_s3_bucket(dir_name,object_name) + load_s3_bucket(dir_name, object_name) os.remove(f"data/{object_name}") except Exception as e: print(f"error: {e}")