diff --git a/airflow/dags/scripts/ELT_artist_info_globalTop50.py b/airflow/dags/scripts/ELT_artist_info_globalTop50.py index 1a5705a..0e492f8 100644 --- a/airflow/dags/scripts/ELT_artist_info_globalTop50.py +++ b/airflow/dags/scripts/ELT_artist_info_globalTop50.py @@ -62,11 +62,10 @@ def transformation(): ) # 데이터 읽고 중복 제거 - artist_info_df = extract( - "spotify_artist_info", - artist_info_schema).dropDuplicates( - ["artist_id"]) - global_top50_df = extract("spotify_crawling_data", global_top50_schema) + artist_info_df = extract("artist_info", artist_info_schema).dropDuplicates( + ["artist_id"] + ) + global_top50_df = extract("crawling_data", global_top50_schema) global_top50_df = global_top50_df.withColumn( "artist_id", explode("artist_id")) @@ -86,12 +85,12 @@ def extract(file_name, schema): spark = create_spark_session("artist_global_table") df = spark.read.csv( - f"s3a://{BUCKET_NAME}/{OBJECT_NAME}/{file_name}_{TODAY}.csv", + f"s3a://{BUCKET_NAME}/{OBJECT_NAME}/{file_name}/spotify_{file_name}_{TODAY}.csv", header=True, schema=schema, ) - if file_name == "spotify_crawling_data": + if file_name == "crawling_data": df = ( df.withColumn( "artist", split( @@ -106,7 +105,7 @@ def extract(file_name, schema): col("artist").isNull(), lit( [""])).otherwise( col("artist")), )) - if file_name == "spotify_artist_info": + if file_name == "artist_info": df = df.withColumn( "artist_genre", regexp_replace(df["artist_genre"], "[\\[\\]']", "") ) # 불필요한 문자 제거 diff --git a/airflow/dags/scripts/ELT_artist_info_top10.py b/airflow/dags/scripts/ELT_artist_info_top10.py index 9dd22fa..a525f37 100644 --- a/airflow/dags/scripts/ELT_artist_info_top10.py +++ b/airflow/dags/scripts/ELT_artist_info_top10.py @@ -60,12 +60,12 @@ def transformation(): # 데이터 읽어오고 중복 제거 artist_top10_df = extract( - "spotify_artist_top10", artist_top10_schema - ).dropDuplicates(["song_id"]) - artist_info_df = extract( - "spotify_artist_info", - artist_info_schema).dropDuplicates( - ["artist_id"]) + "artist_top10", + artist_top10_schema).dropDuplicates( + ["song_id"]) + artist_info_df = extract("artist_info", artist_info_schema).dropDuplicates( + ["artist_id"] + ) artist_info_top10_df = artist_info_df.join( artist_top10_df, on="artist_id", how="outer" @@ -111,12 +111,12 @@ def extract(file_name, schema): spark = create_spark_session("artist_top10_table") df = spark.read.csv( - f"s3a://{BUCKET_NAME}/{OBJECT_NAME}/{file_name}_{TODAY}.csv", + f"s3a://{BUCKET_NAME}/{OBJECT_NAME}/{file_name}/spotify_{file_name}_{TODAY}.csv", header=True, schema=schema, ) - if file_name == "spotify_artist_info": + if file_name == "artist_info": df = df.withColumn( "artist_genre", regexp_replace(df["artist_genre"], "[\\[\\]']", "") ) # 불필요한 문자 제거 diff --git a/airflow/dags/scripts/crawling_spotify_data.py b/airflow/dags/scripts/crawling_spotify_data.py index 5439626..26fc9f8 100644 --- a/airflow/dags/scripts/crawling_spotify_data.py +++ b/airflow/dags/scripts/crawling_spotify_data.py @@ -26,10 +26,11 @@ def make_dataframe(): # 크롤링 데이터를 csv로 저장 def save_as_csv_file(df, logical_date): + dir_path = "crawling_data" file_path = f"data/spotify_crawling_data_{TODAY}.csv" df.to_csv(file_path, encoding="utf-8", mode="w", header=True, index=False) - load_s3_bucket(f"spotify_crawling_data_{logical_date}.csv") + load_s3_bucket(dir_path, f"spotify_crawling_data_{logical_date}.csv") def data_crawling(logical_date): diff --git a/airflow/dags/scripts/load_spotify_data.py b/airflow/dags/scripts/load_spotify_data.py index 53b3e7a..f496e33 100644 --- a/airflow/dags/scripts/load_spotify_data.py +++ b/airflow/dags/scripts/load_spotify_data.py @@ -5,11 +5,11 @@ from airflow.providers.amazon.aws.hooks.s3 import S3Hook -def load_s3_bucket(file_name): +def load_s3_bucket(dir_name, file_name): s3_hook = S3Hook(aws_conn_id="AWS_S3") s3_bucket = "de5-s4tify" - s3_key = f"raw_data/{file_name}" + s3_key = f"raw_data/{dir_name}/{file_name}" local_file_path = f"data/{file_name}" diff --git a/airflow/dags/scripts/request_spotify_api.py b/airflow/dags/scripts/request_spotify_api.py index 8df83dd..e6863b9 100644 --- a/airflow/dags/scripts/request_spotify_api.py +++ b/airflow/dags/scripts/request_spotify_api.py @@ -29,6 +29,7 @@ def get_arti_top10(logical_date, **kwargs): task_instance = kwargs["ti"] arti_top10_list = [] + dir_name = "artist_top10" object_name = f"spotify_artist_top10_{logical_date}.csv" # csv 파일 읽어오기 @@ -65,7 +66,7 @@ def get_arti_top10(logical_date, **kwargs): encoding="utf-8-sig", index=False) try: - load_s3_bucket(object_name) + load_s3_bucket(dir_name, object_name) os.remove(f"data/{object_name}") except Exception as e: print(f"error: {e}") @@ -76,6 +77,7 @@ def get_artist_info(logical_date, **kwargs): task_instance = kwargs["ti"] artist_info_list = [] + dir_name = "artist_info" object_name = f"spotify_artist_info_{logical_date}.csv" # csv 파일 읽어오기 @@ -109,7 +111,7 @@ def get_artist_info(logical_date, **kwargs): encoding="utf-8-sig", index=False) try: - load_s3_bucket(object_name) + load_s3_bucket(dir_name, object_name) os.remove(f"data/{object_name}") except Exception as e: print(f"error: {e}")