diff --git a/airflow/.env b/airflow/.env index ca5fac9..c5860d5 100644 --- a/airflow/.env +++ b/airflow/.env @@ -25,4 +25,4 @@ SNOWFLAKE_ACCOUNT=KJQEOVI-GR23658 SNOWFLAKE_WH= COMPUTE_WH SNOWFLAKE_DB=S4TIFY SNOWFLAKE_SCHEMA = RAW_DATA -SNOWFLAKE_ROLE=ANALYTICS_USERS \ No newline at end of file +SNOWFLAKE_ROLE=ANALYTICS_USERS diff --git a/airflow/dags/S3_Spark_SnowFlake_DAG.py b/airflow/dags/S3_Spark_SnowFlake_DAG.py index 27436f0..fec3677 100644 --- a/airflow/dags/S3_Spark_SnowFlake_DAG.py +++ b/airflow/dags/S3_Spark_SnowFlake_DAG.py @@ -2,13 +2,12 @@ from datetime import datetime, timedelta from airflow import DAG -from airflow.models import Variable from airflow.providers.apache.spark.operators.spark_submit import \ SparkSubmitOperator # .env에서 AWS 자격 증명 불러오기 (없으면 예외 발생 방지) -AWS_ACCESS_KEY = os.getenv("AWS_ACCESS_KEY", default_var=None) -AWS_SECRET_KEY = os.getenv("AWS_SECRET_KEY", default_var=None) +AWS_ACCESS_KEY = os.getenv("AWS_ACCESS_KEY_ID") +AWS_SECRET_KEY = os.getenv("AWS_SECRET_ACCESS_KEY") if not AWS_ACCESS_KEY or not AWS_SECRET_KEY: raise ValueError("AWS_ACCESS_KEY 또는 AWS_SECRET_KEY가 설정되지 않았습니다.") diff --git a/airflow/dags/scripts/ELT_artist_info_globalTop50.py b/airflow/dags/scripts/ELT_artist_info_globalTop50.py index efff14a..1a5705a 100644 --- a/airflow/dags/scripts/ELT_artist_info_globalTop50.py +++ b/airflow/dags/scripts/ELT_artist_info_globalTop50.py @@ -8,9 +8,7 @@ regexp_replace, split, when) from pyspark.sql.types import IntegerType, StringType, StructField, StructType -from airflow.models import Variable - -LAST_FM_API_KEY = Variable.get("LAST_FM_API_KEY") +LAST_FM_API_KEY = os.getenv("LAST_FM_API_KEY") BUCKET_NAME = "de5-s4tify" OBJECT_NAME = "raw_data" diff --git a/airflow/dags/scripts/ELT_artist_info_top10.py b/airflow/dags/scripts/ELT_artist_info_top10.py index e55f5d5..9dd22fa 100644 --- a/airflow/dags/scripts/ELT_artist_info_top10.py +++ b/airflow/dags/scripts/ELT_artist_info_top10.py @@ -6,9 +6,7 @@ from pyspark.sql.functions import 
col, current_date, regexp_replace, split, udf from pyspark.sql.types import ArrayType, StringType, StructField, StructType -from airflow.models import Variable - -LAST_FM_API_KEY = Variable.get("LAST_FM_API_KEY") +LAST_FM_API_KEY = os.getenv("LAST_FM_API_KEY") BUCKET_NAME = "de5-s4tify" OBJECT_NAME = "raw_data" diff --git a/airflow/dags/scripts/S3_Spark_SnowFlake_ELT.py b/airflow/dags/scripts/S3_Spark_SnowFlake_ELT.py index b2ca4a0..0fec472 100644 --- a/airflow/dags/scripts/S3_Spark_SnowFlake_ELT.py +++ b/airflow/dags/scripts/S3_Spark_SnowFlake_ELT.py @@ -5,7 +5,7 @@ from pyspark.sql import SparkSession from pyspark.sql.functions import col, count, lit, when -from airflow.models import Variable +from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook # Spark JARs 설정 SPARK_HOME = "/opt/spark/" @@ -40,9 +40,9 @@ def spark_session_builder(app_name: str) -> SparkSession: "spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") .config( "spark.hadoop.fs.s3a.access.key", - os.getenv("AWS_ACCESS_KEY")) .config( + os.getenv("AWS_ACCESS_KEY_ID")) .config( "spark.hadoop.fs.s3a.secret.key", - os.getenv("AWS_SECRET_KEY")) .config( + os.getenv("AWS_SECRET_ACCESS_KEY")) .config( "spark.hadoop.fs.s3a.endpoint", "s3.amazonaws.com") .config( "spark.hadoop.fs.s3a.aws.credentials.provider", @@ -50,19 +50,18 @@ def spark_session_builder(app_name: str) -> SparkSession: ) .getOrCreate()) +# snowflake connector +def create_snowflake_conn(): + hook = SnowflakeHook(snowflake_conn_id="SNOWFLAKE_CONN", schema="RAW_DATA") + conn = hook.get_conn() + cur = conn.cursor() + return conn, cur + + # Snowflake에서 SQL 실행 함수 def check_and_create_table(): try: - conn = snowflake.connector.connect( - user=SNOWFLAKE_OPTIONS["user"], - password=SNOWFLAKE_OPTIONS["password"], - account=SNOWFLAKE_OPTIONS["account"], - database=SNOWFLAKE_OPTIONS["db"], - schema=SNOWFLAKE_OPTIONS["schema"], - warehouse=SNOWFLAKE_OPTIONS["warehouse"], - role=SNOWFLAKE_OPTIONS["role"], - ) - cur = 
conn.cursor() + conn, cur = create_snowflake_conn() # 테이블 존재 여부 확인 cur.execute( @@ -113,16 +112,7 @@ def escape_quotes(value): # Snowflake에서 SQL 실행 함수 def insert_data_into_snowflake(df, table_name): try: - conn = snowflake.connector.connect( - user=SNOWFLAKE_OPTIONS["user"], - password=SNOWFLAKE_OPTIONS["password"], - account=SNOWFLAKE_OPTIONS["account"], - database=SNOWFLAKE_OPTIONS["db"], - schema=SNOWFLAKE_OPTIONS["schema"], - warehouse=SNOWFLAKE_OPTIONS["warehouse"], - role=SNOWFLAKE_OPTIONS["role"], - ) - cur = conn.cursor() + conn, cur = create_snowflake_conn() for row in df.collect(): rank = "NULL" if row["rank"] is None else row["rank"] diff --git a/airflow/dags/spotify_ELT_DAG.py b/airflow/dags/spotify_ELT_DAG.py index 2f4ea9d..8c3fb96 100644 --- a/airflow/dags/spotify_ELT_DAG.py +++ b/airflow/dags/spotify_ELT_DAG.py @@ -8,6 +8,18 @@ from airflow.providers.apache.spark.operators.spark_submit import \ SparkSubmitOperator +# Spark JARs 설정 +SPARK_HOME = os.environ.get("SPARK_JAR_DIR") +SPARK_JARS = ",".join( + [ + os.path.join(SPARK_HOME, "snowflake-jdbc-3.13.33.jar"), + os.path.join(SPARK_HOME, "spark-snowflake_2.12-2.12.0-spark_3.4.jar"), + os.path.join(SPARK_HOME, "hadoop-aws-3.3.4.jar"), + os.path.join(SPARK_HOME, "aws-java-sdk-bundle-1.12.262.jar"), + ] +) + + # DAG 기본 설정 default_args = { "owner": "yerin", @@ -27,14 +39,16 @@ artist_info_Top10_table = SparkSubmitOperator( task_id="artist_info_top10_table", application="dags/scripts/ELT_artist_info_top10.py", - conn_id="spark_default", + conn_id="spark_conn", + jars=SPARK_JARS, dag=dag, ) artist_info_globalTop50_table = SparkSubmitOperator( task_id="artist_info_globalTop50_table", application="dags/scripts/ELT_artist_info_globalTop50.py", - conn_id="spark_default", + conn_id="spark_conn", + jars=SPARK_JARS, dag=dag, )