diff --git a/airflow/.env b/airflow/.env index cbc0216..ca5fac9 100644 --- a/airflow/.env +++ b/airflow/.env @@ -18,16 +18,11 @@ AWS_ACCESS_KEY_ID=AKIA4RRVVY55VT3PCIZU AWS_SECRET_ACCESS_KEY=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 AWS_DEFAULT_REGION=ap-northeast-2 # 사용 지역 설정 -# BSH SNOWFLAKE CONN INFO -SNOWFLAKE_USER_BSH= BSH -SNOWFLAKE_PASSWORD_BSH= BSH1234! -SNOWFLAKE_SILVER_SCHEMA = RAW_DATA -SNOWFLAKE_GOLD_SCHEMA = ANALYTICS - #S3_Spark_SnowFlake_ELT -SNOWFLAKE_USER= BY -SNOWFLAKE_PASSWORD= Zmfflsprtm1234 +SNOWFLAKE_USER= BSH +SNOWFLAKE_PASSWORD= BSH1234! +SNOWFLAKE_ACCOUNT=KJQEOVI-GR23658 SNOWFLAKE_WH= COMPUTE_WH +SNOWFLAKE_DB=S4TIFY SNOWFLAKE_SCHEMA = RAW_DATA -SNOWFLAKE_ACCOUNT=KJQEOVI-GR23658 -SNOWFLAKE_DB=S4TIFY \ No newline at end of file +SNOWFLAKE_ROLE=ANALYTICS_USERS \ No newline at end of file diff --git a/airflow/Dockerfile b/airflow/Dockerfile index 490e5ab..717504f 100644 --- a/airflow/Dockerfile +++ b/airflow/Dockerfile @@ -9,9 +9,19 @@ RUN echo "deb http://deb.debian.org/debian bullseye main" >> /etc/apt/sources.li apt-get update && apt-get install -y \ wget openjdk-11-jdk && \ mkdir -p $SPARK_JAR_DIR && \ - wget -O $SPARK_JAR_DIR/snowflake-jdbc-3.9.2.jar https://repo1.maven.org/maven2/net/snowflake/snowflake-jdbc/3.9.2/snowflake-jdbc-3.9.2.jar && \ - wget -O $SPARK_JAR_DIR/hadoop-aws-3.3.4.jar https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.3.4/hadoop-aws-3.3.4.jar && \ - wget -O $SPARK_JAR_DIR/aws-java-sdk-bundle-1.12.262.jar https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.12.262/aws-java-sdk-bundle-1.12.262.jar && \ + wget -O $SPARK_JAR_DIR/snowflake-jdbc-3.9.2.jar \ + https://repo1.maven.org/maven2/net/snowflake/snowflake-jdbc/3.9.2/snowflake-jdbc-3.9.2.jar && \ + wget -O $SPARK_JAR_DIR/hadoop-aws-3.3.4.jar \ + https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.3.4/hadoop-aws-3.3.4.jar && \ + wget -O $SPARK_JAR_DIR/aws-java-sdk-bundle-1.12.262.jar \ + 
https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.12.262/aws-java-sdk-bundle-1.12.262.jar && \ + # Snowflake JDBC Driver + wget -O $SPARK_JAR_DIR/snowflake-jdbc-3.13.33.jar \ + https://repo1.maven.org/maven2/net/snowflake/snowflake-jdbc/3.13.33/snowflake-jdbc-3.13.33.jar && \ + # Snowflake Spark Connector + wget -O $SPARK_JAR_DIR/spark-snowflake_2.12-2.12.0-spark_3.4.jar \ + https://repo1.maven.org/maven2/net/snowflake/spark-snowflake_2.12/2.12.0-spark_3.4/spark-snowflake_2.12-2.12.0-spark_3.4.jar && \ + # Clean up to reduce image size apt-get clean && \ rm -rf /var/lib/apt/lists/* diff --git a/airflow/dags/ELT_eventsim_song_count_DAG.py b/airflow/dags/ELT_eventsim_song_count_DAG.py index 03f0a91..667578e 100644 --- a/airflow/dags/ELT_eventsim_song_count_DAG.py +++ b/airflow/dags/ELT_eventsim_song_count_DAG.py @@ -1,129 +1,14 @@ import os from datetime import timedelta -import pandas as pd -import snowflake.connector -from dotenv import load_dotenv -from spark_utils import execute_snowflake_query +from dags.plugins.variables import SPARK_JARS from airflow import DAG -from airflow.exceptions import AirflowFailException -from airflow.operators.python import PythonOperator from airflow.operators.trigger_dagrun import TriggerDagRunOperator +from airflow.providers.apache.spark.operators.spark_submit import \ + SparkSubmitOperator from airflow.utils.dates import days_ago -load_dotenv() - -# SNOWFLAKE 설정 -SNOWFLAKE_SOURCE_TABLE = "EVENTSIM_LOG" -SNOWFLAKE_SOURCE_SCHEMA = os.environ.get("SNOWFLAKE_SILVER_SCHMEA", "RAW_DATA") -SNOWFLAKE_TARGET_SONG_TABLE = "EVENTSIM_SONG_COUNTS" -SNOWFLAKE_TARGET_ARTIST_TABLE = "EVENTSIM_ARTIST_COUNTS" -SNOWFLAKE_TARGET_SCHEMA = os.environ.get("SNOWFLAKE_GOLD_SCHEMA", "ANALYTICS") - -SNOWFLAKE_PROPERTIES = { - "user": os.environ.get("SNOWFLAKE_USER_BSH"), - "password": os.environ.get("SNOWFLAKE_PASSWORD_BSH"), - "account": os.environ.get("SNOWFLAKE_ACCOUNT"), - "db": os.environ.get("SNOWFLAKE_DB", "S4TIFY"), - "warehouse": 
os.environ.get("SNOWFLAKE_WH", "COMPUTE_WH"), - "role": os.environ.get("SNOWFLAKE_ROLE", "ANALYTICS_USERS"), -} - - -def extract_data_from_snowflake(): - """ - Snowflake에서 데이터를 읽어오는 함수 - """ - SNOWFLAKE_PROPERTIES["schema"] = SNOWFLAKE_SOURCE_SCHEMA - - query = f""" - SELECT SONG, ARTIST, LOCATION, SESSIONID, USERID, TS - FROM {SNOWFLAKE_SOURCE_TABLE} - """ - try: - conn = snowflake.connector.connect( - user=SNOWFLAKE_PROPERTIES["user"], - password=SNOWFLAKE_PROPERTIES["password"], - account=SNOWFLAKE_PROPERTIES["account"], - database=SNOWFLAKE_PROPERTIES["db"], - schema=SNOWFLAKE_PROPERTIES["schema"], - warehouse=SNOWFLAKE_PROPERTIES["warehouse"], - role=SNOWFLAKE_PROPERTIES["role"], - ) - cur = conn.cursor() - rows = cur.execute(query).fetchall() - columns = [desc[0] for desc in cur.description] - - df = pd.DataFrame(rows, columns=columns) - print(df.head(5)) - print("Data successfully extracted from Snowflake!") - return df - - except Exception as e: - print(f"Error extracting data from Snowflake: {e}") - return None - - finally: - cur.close() - conn.close() - - -def process_song_counts(**kwargs): - SNOWFLAKE_PROPERTIES["schema"] = SNOWFLAKE_TARGET_SCHEMA - ti = kwargs["ti"] - df = ti.xcom_pull(task_ids="extract_data") - - if df is None or df.empty: - raise AirflowFailException( - "No data available for processing song counts.") - - # 노래 카운트 계산 - song_counts = df.groupby( - ["SONG", "ARTIST"]).size().reset_index(name="song_count") - - # 테이블 비우기 - execute_snowflake_query( - f"TRUNCATE TABLE {SNOWFLAKE_TARGET_SONG_TABLE};", SNOWFLAKE_PROPERTIES - ) - - # SQL 쿼리 (executemany 사용) - insert_query = f""" - INSERT INTO {SNOWFLAKE_TARGET_SONG_TABLE} (SONG, ARTIST, song_count) - VALUES (%s, %s, %s) - """ - - # DataFrame을 리스트로 변환 후 executemany 실행 - execute_snowflake_query( - insert_query, SNOWFLAKE_PROPERTIES, data=song_counts.values.tolist() - ) - - print(f"Data written to {SNOWFLAKE_TARGET_SONG_TABLE} in Snowflake.") - - -def process_artist_counts(**kwargs): - 
SNOWFLAKE_PROPERTIES["schema"] = SNOWFLAKE_TARGET_SCHEMA - ti = kwargs["ti"] - df = ti.xcom_pull(task_ids="extract_data") - if df is None or df.empty: - raise AirflowFailException( - "No data available for processing artist counts.") - - artist_counts = df.groupby( - "ARTIST").size().reset_index(name="artist_count") - execute_snowflake_query( - f"TRUNCATE TABLE {SNOWFLAKE_TARGET_ARTIST_TABLE};", - SNOWFLAKE_PROPERTIES) - insert_query = f""" - INSERT INTO {SNOWFLAKE_TARGET_ARTIST_TABLE} (ARTIST, artist_count) - VALUES (%s, %s) - """ - execute_snowflake_query( - insert_query, SNOWFLAKE_PROPERTIES, artist_counts.values.tolist() - ) - print(f"Data written to {SNOWFLAKE_TARGET_ARTIST_TABLE} in Snowflake.") - - # DAG 설정 default_args = { "owner": "sanghyoek_boo", @@ -148,26 +33,14 @@ def process_artist_counts(**kwargs): dag=dag, ) -extract_data_task = PythonOperator( - task_id="extract_data", - python_callable=extract_data_from_snowflake, - provide_context=True, - dag=dag, -) - -process_song_task = PythonOperator( - task_id="process_song_counts", - python_callable=process_song_counts, - provide_context=True, - dag=dag, -) - -process_artist_task = PythonOperator( - task_id="process_artist_counts", - python_callable=process_artist_counts, - provide_context=True, +spark_submit_task = SparkSubmitOperator( + task_id="process_songs_and_artists_spark", + application="dags/scripts/ELT_eventsim_script.py", + conn_id="spark_conn", + executor_memory="2g", + driver_memory="1g", + jars=SPARK_JARS, dag=dag, ) -trigger_dag_task >> extract_data_task >> [ - process_song_task, process_artist_task] +trigger_dag_task >> spark_submit_task diff --git a/airflow/dags/ETL_eventsim_DAG.py b/airflow/dags/ETL_eventsim_DAG.py index 668dbc8..ba539fd 100644 --- a/airflow/dags/ETL_eventsim_DAG.py +++ b/airflow/dags/ETL_eventsim_DAG.py @@ -1,22 +1,14 @@ -import os from datetime import datetime, timedelta +from dags.plugins.variables import SPARK_JARS + from airflow import DAG from 
airflow.operators.dummy_operator import DummyOperator from airflow.providers.apache.spark.operators.spark_submit import \ SparkSubmitOperator -# S3 및 Snowflake 설정 +# S3 설정 S3_BUCKET = "s3a://de5-s4tify" -# Spark JARs 설정 -SPARK_HOME = os.environ.get("SPARK_JAR_DIR") -SPARK_JARS = ",".join( - [ - os.path.join(SPARK_HOME, "snowflake-jdbc-3.9.2.jar"), - os.path.join(SPARK_HOME, "hadoop-aws-3.3.4.jar"), - os.path.join(SPARK_HOME, "aws-java-sdk-bundle-1.12.262.jar"), - ] -) default_args = { "owner": "sanghyeok_boo", @@ -41,7 +33,7 @@ # SparkSubmitOperator: Spark에서 S3 데이터를 처리하고 Snowflake에 MERGE spark_job = SparkSubmitOperator( task_id="spark_process_s3_upsert", - application="/opt/airflow/dags/scripts/ETL_eventsim_script.py", + application="dags/scripts/ETL_eventsim_script.py", conn_id="spark_conn", application_args=[ S3_BUCKET, @@ -50,13 +42,6 @@ executor_memory="2g", driver_memory="1g", jars=SPARK_JARS, - conf={ - "spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem", - "spark.hadoop.fs.s3a.access.key": "{{ conn.aws_s3.login }}", - "spark.hadoop.fs.s3a.secret.key": "{{ conn.aws_s3.password }}", - "spark.hadoop.fs.s3a.endpoint": "s3.ap-northeast-2.amazonaws.com", - "spark.hadoop.fs.s3a.aws.credentials.provider": "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider", - }, dag=dag, ) diff --git a/airflow/dags/plugins/spark_utils.py b/airflow/dags/plugins/snowflake_utils.py similarity index 69% rename from airflow/dags/plugins/spark_utils.py rename to airflow/dags/plugins/snowflake_utils.py index 4261ee7..183c540 100644 --- a/airflow/dags/plugins/spark_utils.py +++ b/airflow/dags/plugins/snowflake_utils.py @@ -6,41 +6,6 @@ from airflow.exceptions import AirflowFailException -# Spark JARs 설정 -# SPARK_HOME 설정 -SPARK_HOME = "/opt/spark" -os.environ["SPARK_HOME"] = SPARK_HOME - -# JAR 경로 설정 -SPARK_JARS_DIR = os.path.join(SPARK_HOME, "jars") -SPARK_JARS_LIST = [ - "snowflake-jdbc-3.9.2.jar", - "hadoop-aws-3.3.4.jar", - "aws-java-sdk-bundle-1.12.262.jar", -] 
-SPARK_JARS = ",".join([os.path.join(SPARK_JARS_DIR, jar) - for jar in SPARK_JARS_LIST]) - - -# Spark Session builder -def spark_session_builder(app_name: str) -> SparkSession: - """_summary_ - spark session builder for AWS S3 and Snowflake - Args: - app_name (str): spark session anme - - Returns: - SparkSession - """ - return ( - SparkSession.builder.appName(f"{app_name}") .config( - "spark.jars", - SPARK_JARS) .config( - "spark.driver.extraClassPath", - "/opt/spark/jars/snowflake-jdbc-3.9.2.jar") .config( - "spark.executor.extraClassPath", - SPARK_JARS) .getOrCreate()) - def execute_snowflake_query( query: str, snowflake_options: dict, data=None, fetch=False @@ -84,7 +49,6 @@ def execute_snowflake_query( if fetch: result = cur.fetchall() # 데이터 가져오기 - print("result: " + result) if cur.description: # 컬럼 정보가 존재할 경우에만 DataFrame 생성 df = pd.DataFrame( result, columns=[ diff --git a/airflow/dags/plugins/spark_snowflake_conn.py b/airflow/dags/plugins/spark_snowflake_conn.py index ab33988..1bf43b6 100644 --- a/airflow/dags/plugins/spark_snowflake_conn.py +++ b/airflow/dags/plugins/spark_snowflake_conn.py @@ -1,31 +1,24 @@ +import os from datetime import datetime -import snowflake.connector +from dags.plugins.variables import SPARK_JARS from pyspark.sql import SparkSession -from airflow.models import Variable - -SNOWFLAKE_USER = Variable.get("SNOWFLAKE_USER") -SNOWFLKAE_USER_PWD = Variable.get("SNOWFLAKE_PASSWORD") -SNOWFLAKE_ACCOUNT = Variable.get("SNOWFLAKE_ACCOUNT") -SNOWFLAKE_URL = Variable.get("SNOWFLAKE_URL") -SNOWFLAKE_DB = "S4TIFY" -SNOWFLAKE_SCHEMA = "RAW_DATA" - -AWS_ACCESS_KEY_ID = Variable.get("AWS_ACCESS_KEY_ID") -AWS_SECRET_ACCESS_KEY = Variable.get("AWS_SECRET_ACCESS_KEY") +from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook +AWS_ACCESS_KEY_ID = os.getenv("AWS_ACCESS_KEY_ID") +AWS_SECRET_ACCESS_KEY = os.getenv("AWS_SECRET_ACCESS_KEY") TODAY = datetime.now().strftime("%Y-%m-%d") snowflake_options = { - "sfURL": SNOWFLAKE_URL, - "sfDatabase": 
SNOWFLAKE_DB, - "sfSchema": SNOWFLAKE_SCHEMA, - "sfWarehouse": "COMPUTE_WH", - "sfRole": "ANALYTICS_USERS", - "sfUser": SNOWFLAKE_USER, - "sfPassword": SNOWFLKAE_USER_PWD, + "sfURL": f"{os.getenv('SNOWFLAKE_ACCOUNT')}.snowflakecomputing.com", + "sfDatabase": os.getenv("SNOWFLAKE_DB"), + "sfSchema": os.getenv("SNOWFLAKE_SCHEMA"), + "sfWarehouse": os.getenv("SNOWFLAKE_WH"), + "sfRole": os.getenv("SNOWFLAKE_ROLE"), + "sfUser": os.getenv("SNOWFLAKE_USER"), + "sfPassword": os.getenv("SNOWFLAKE_PASSWORD"), } @@ -37,32 +30,17 @@ def create_spark_session(app_name: str): .config("spark.hadoop.fs.s3a.access.key", AWS_ACCESS_KEY_ID) .config("spark.hadoop.fs.s3a.secret.key", AWS_SECRET_ACCESS_KEY) .config("spark.hadoop.fs.s3a.endpoint", "s3.amazonaws.com") - .config( - "spark.jars", - "/path/to/spark-snowflake_2.12-2.12.0-spark_3.4.jar,/path/to/snowflake-jdbc-3.13.33.jar", - ) + .config("spark.jars", SPARK_JARS) .getOrCreate() ) return spark -def create_snowflake_conn(): - conn = snowflake.connector.connect( - user=SNOWFLAKE_USER, - password=SNOWFLKAE_USER_PWD, - account=SNOWFLAKE_ACCOUNT, - warehouse="COMPUTE_WH", - database=SNOWFLAKE_DB, - schema=SNOWFLAKE_SCHEMA, - ) - - return conn - - def create_snowflake_table(sql): - conn = create_snowflake_conn() + hook = SnowflakeHook(snowflake_conn_id="SNOWFLAKE_CONN", schema="RAW_DATA") + conn = hook.get_conn() cur = conn.cursor() try: diff --git a/airflow/dags/plugins/variables.py b/airflow/dags/plugins/variables.py new file mode 100644 index 0000000..a2c8a6e --- /dev/null +++ b/airflow/dags/plugins/variables.py @@ -0,0 +1,49 @@ +import os + +SNOWFLAKE_PROPERTIES = { + "user": os.environ.get("SNOWFLAKE_USER"), + "password": os.environ.get("SNOWFLAKE_PASSWORD"), + "account": os.environ.get("SNOWFLAKE_ACCOUNT"), + "db": os.environ.get("SNOWFLAKE_DB", "S4TIFY"), + "warehouse": os.environ.get("SNOWFLAKE_WH", "COMPUTE_WH"), + "schema": os.getenv("SNOWFLAKE_SCHEMA"), + "role": os.environ.get("SNOWFLAKE_ROLE", "ANALYTICS_USERS"), + 
"driver": "net.snowflake.client.jdbc.SnowflakeDriver", +    "url": f'jdbc:snowflake://{os.environ.get("SNOWFLAKE_ACCOUNT")}.snowflakecomputing.com', +} +snowflake_options = { +    "sfURL": f"{os.getenv('SNOWFLAKE_ACCOUNT')}.snowflakecomputing.com", +    "sfDatabase": os.getenv("SNOWFLAKE_DB"), +    "sfSchema": os.getenv("SNOWFLAKE_SCHEMA"), +    "sfWarehouse": os.getenv("SNOWFLAKE_WH"), +    "sfRole": os.getenv("SNOWFLAKE_ROLE"), +    "sfUser": os.getenv("SNOWFLAKE_USER"), +    "sfPassword": os.getenv("SNOWFLAKE_PASSWORD"), +} + +snowflake_gold_options = { +    "sfURL": f"{os.getenv('SNOWFLAKE_ACCOUNT')}.snowflakecomputing.com", +    "sfDatabase": os.getenv("SNOWFLAKE_DB"), +    "sfSchema": "ANALYTICS", +    "sfWarehouse": os.getenv("SNOWFLAKE_WH"), +    "sfRole": os.getenv("SNOWFLAKE_ROLE"), +    "sfUser": os.getenv("SNOWFLAKE_USER"), +    "sfPassword": os.getenv("SNOWFLAKE_PASSWORD"), +} +# Spark JARs 설정 +SPARK_JAR_DIR = os.environ.get("SPARK_JAR_DIR") +SPARK_JARS = ",".join( +    [ +        os.path.join( +            SPARK_JAR_DIR, +            "snowflake-jdbc-3.13.33.jar"), +        os.path.join( +            SPARK_JAR_DIR, +            "spark-snowflake_2.12-2.12.0-spark_3.4.jar"), +        os.path.join( +            SPARK_JAR_DIR, +            "hadoop-aws-3.3.4.jar"), +        os.path.join( +            SPARK_JAR_DIR, +            "aws-java-sdk-bundle-1.12.262.jar"), +    ]) diff --git a/airflow/dags/scripts/ELT_eventsim_script.py b/airflow/dags/scripts/ELT_eventsim_script.py new file mode 100644 index 0000000..c2d8717 --- /dev/null +++ b/airflow/dags/scripts/ELT_eventsim_script.py @@ -0,0 +1,58 @@ +import os + +from dags.plugins.spark_snowflake_conn import create_spark_session +from dags.plugins.variables import snowflake_gold_options, snowflake_options +from pyspark.sql.functions import desc + +# SNOWFLAKE 설정 +SNOWFLAKE_SOURCE_TABLE = "EVENTSIM_LOG" +SNOWFLAKE_SOURCE_SCHEMA = os.environ.get("SNOWFLAKE_SILVER_SCHEMA", "RAW_DATA") +SNOWFLAKE_TARGET_SONG_TABLE = "EVENTSIM_SONG_COUNTS" +SNOWFLAKE_TARGET_ARTIST_TABLE = "EVENTSIM_ARTIST_COUNTS" +SNOWFLAKE_TARGET_SCHEMA = os.environ.get("SNOWFLAKE_GOLD_SCHEMA", "ANALYTICS") + + 
+def main(): + spark = create_spark_session("Spark_Snowflake_Processing") + + # Snowflake에서 데이터 읽기 + df = ( + spark.read.format("snowflake") + .options(**snowflake_options) + .option("dbtable", SNOWFLAKE_SOURCE_TABLE) + .load() + ) + + df.show() # 데이터 확인 + + # SONG 집계 처리 + song_counts = ( + df.groupBy("SONG", "ARTIST") + .count() + .withColumnRenamed("count", "song_count") + .orderBy(desc("song_count")) + ) + + # ARTIST 집계 처리 + artist_counts = ( + df.groupBy("ARTIST") + .count() + .withColumnRenamed("count", "artist_count") + .orderBy(desc("artist_count")) + ) + + # Snowflake에 저장 + song_counts.write.format("snowflake").options( + **snowflake_gold_options).option( + "dbtable", SNOWFLAKE_TARGET_SONG_TABLE).mode("overwrite").save() + + artist_counts.write.format("snowflake").options( + **snowflake_gold_options).option( + "dbtable", SNOWFLAKE_TARGET_ARTIST_TABLE).mode("overwrite").save() + + print("(INFO): 데이터 적재 완료") + spark.stop() + + +if __name__ == "__main__": + main() diff --git a/airflow/dags/scripts/ETL_eventsim_script.py b/airflow/dags/scripts/ETL_eventsim_script.py index 3005552..f22d45f 100644 --- a/airflow/dags/scripts/ETL_eventsim_script.py +++ b/airflow/dags/scripts/ETL_eventsim_script.py @@ -1,30 +1,15 @@ -import os import sys from datetime import datetime -from dotenv import load_dotenv -from pyspark.sql.types import (IntegerType, LongType, StringType, StructField, - StructType) -from spark_utils import (escape_quotes, execute_snowflake_query, - spark_session_builder) - -load_dotenv() +from dags.plugins.snowflake_utils import execute_snowflake_query +from dags.plugins.spark_snowflake_conn import create_spark_session +from dags.plugins.variables import SNOWFLAKE_PROPERTIES, snowflake_options # SNOW_FLAKE 설정 SNOWFLAKE_TABLE = "EVENTSIM_LOG" SNOWFLAKE_TEMP_TABLE = "EVENTS_TABLE_TEMP" SNOWFLAKE_SCHEMA = "RAW_DATA" -SNOWFLAKE_PROPERTIES = { - "user": os.environ.get("SNOWFLAKE_USER_BSH"), - "password": os.environ.get("SNOWFLAKE_PASSWORD_BSH"), - "account": 
os.environ.get("SNOWFLAKE_ACCOUNT"), - "db": os.environ.get("SNOWFLAKE_DB", "S4TIFY"), - "warehouse": os.environ.get("SNOWFLAKE_WH", "COMPUTE_WH"), - "schema": SNOWFLAKE_SCHEMA if SNOWFLAKE_SCHEMA else "RAW_DATA", - "role": os.environ.get("SNOWFLAKE_ROLE", "ANALYTICS_USERS"), - "driver": "net.snowflake.client.jdbc.SnowflakeDriver", - "url": f'jdbc:snowflake://{os.environ.get("SNOWFLAKE_ACCOUNT")}.snowflakecomputing.com', -} + S3_BUCKET = sys.argv[1] DATA_INTERVAL_START = sys.argv[2] # 날짜 변환 (data_interval_start -> year/month/day 형식) @@ -34,7 +19,7 @@ day = date_obj.strftime("%d") # ------------------------------------------------- -spark = spark_session_builder("app") +spark = create_spark_session("etl_streaming_session") # S3에서 데이터 읽어오기 df = spark.read.json( @@ -70,10 +55,7 @@ ); """ execute_snowflake_query(create_table_sql, SNOWFLAKE_PROPERTIES) -print("Create Table") -# -----------------------UPSERT---------------------- -# Snowflake TEMP 테이블에 데이터 적재 create_temp_table_sql = f""" CREATE TABLE IF NOT EXISTS {SNOWFLAKE_SCHEMA}.{SNOWFLAKE_TEMP_TABLE} ( song STRING, @@ -85,18 +67,10 @@ ); """ execute_snowflake_query(create_temp_table_sql, SNOWFLAKE_PROPERTIES) -print("TEMP 테이블 확인 완료") - -# TEMP 테이블에 데이터 INSERT -# data_to_insert = [tuple(row) for row in df_clean.collect()] -# for row in data_to_insert: -# insert_temp_table_sql = f""" -# INSERT INTO {SNOWFLAKE_SCHEMA}.{SNOWFLAKE_TEMP_TABLE} (song, artist, location, sessionId, userId, ts) -# VALUES (%s, %s, %s, %s, %s, %s) -# """ - -# execute_snowflake_query(insert_temp_table_sql, SNOWFLAKE_PROPERTIES, data=row) -df_clean.write.format("jdbc").options(**SNOWFLAKE_PROPERTIES).option( +print("테이블 생성 완료") +# -----------------------UPSERT---------------------- +# Snowflake TEMP 테이블에 데이터 적재 +df_clean.write.format("snowflake").options(**snowflake_options).option( "dbtable", f"{SNOWFLAKE_SCHEMA}.{SNOWFLAKE_TEMP_TABLE}" ).mode("overwrite").save() print("TEMP 테이블 적재 완료") diff --git a/airflow/dags/scripts/crawling_spotify_data.py 
b/airflow/dags/scripts/crawling_spotify_data.py index 420769a..5439626 100644 --- a/airflow/dags/scripts/crawling_spotify_data.py +++ b/airflow/dags/scripts/crawling_spotify_data.py @@ -4,13 +4,12 @@ from datetime import datetime, timedelta import pandas as pd +from scripts.load_spotify_data import * from selenium import webdriver from selenium.webdriver.chrome.options import Options from selenium.webdriver.chrome.service import Service from selenium.webdriver.common.by import By -from scripts.load_spotify_data import * - TODAY = datetime.now().strftime("%Y-%m-%d") FILE_PATH = f"data/spotify_crawling_data{TODAY}.csv" @@ -30,7 +29,7 @@ def save_as_csv_file(df, logical_date): file_path = f"data/spotify_crawling_data_{TODAY}.csv" df.to_csv(file_path, encoding="utf-8", mode="w", header=True, index=False) - load_s3_bucket(f'spotify_crawling_data_{logical_date}.csv') + load_s3_bucket(f"spotify_crawling_data_{logical_date}.csv") def data_crawling(logical_date): diff --git a/airflow/docker-compose.yaml b/airflow/docker-compose.yaml index 3fbb066..35199b7 100644 --- a/airflow/docker-compose.yaml +++ b/airflow/docker-compose.yaml @@ -85,6 +85,8 @@ x-airflow-common: # WARNING: Use _PIP_ADDITIONAL_REQUIREMENTS option ONLY for a quick checks # for other purpose (development, test and especially production usage) build/extend Airflow image. 
AIRFLOW_CONN_AWS_S3: aws://$AWS_ACCESS_KEY_ID:$AWS_SECRET_ACCESS_KEY@$AWS_DEFAULT_REGION + AIRFLOW_CONN_SNOWFLAKE_CONN: snowflake://$SNOWFLAKE_USER:$SNOWFLAKE_PASSWORD@$SNOWFLAKE_SCHEMA/?account=$SNOWFLAKE_ACCOUNT&warehouse=$SNOWFLAKE_WH&database=$SNOWFLAKE_DB&schema=$SNOWFLAKE_SCHEMA&role=$SNOWFLAKE_ROLE + AIRFLOW_CONN_SPARK_CONN: Spark://spark://spark-master:7077/?deploy-mode=client&spark-binary=spark-submit _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:- selenium pandas numpy webdriver-manager oauth2client gspread snowflake-connector-python pyspark python-dotenv apache-airflow-providers-apache-spark apache-airflow-providers-snowflake} # The following line can be used to set a custom config file, stored in the local config folder # If you want to use it, outcommen t it and replace airflow.cfg with the name of your config file