Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion airflow/.env
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,4 @@ SNOWFLAKE_ACCOUNT=KJQEOVI-GR23658
SNOWFLAKE_WH= COMPUTE_WH
SNOWFLAKE_DB=S4TIFY
SNOWFLAKE_SCHEMA = RAW_DATA
SNOWFLAKE_ROLE=ANALYTICS_USERS
SNOWFLAKE_ROLE=ANALYTICS_USERS
5 changes: 2 additions & 3 deletions airflow/dags/S3_Spark_SnowFlake_DAG.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,12 @@
from datetime import datetime, timedelta

from airflow import DAG
from airflow.models import Variable
from airflow.providers.apache.spark.operators.spark_submit import \
SparkSubmitOperator

# .env에서 AWS 자격 증명 불러오기 (없으면 아래에서 ValueError 발생)
AWS_ACCESS_KEY = os.getenv("AWS_ACCESS_KEY", default_var=None)
AWS_SECRET_KEY = os.getenv("AWS_SECRET_KEY", default_var=None)
AWS_ACCESS_KEY = os.getenv("AWS_ACCESS_KEY_ID")
AWS_SECRET_KEY = os.getenv("AWS_SECRET_ACCESS_KEY")

if not AWS_ACCESS_KEY or not AWS_SECRET_KEY:
raise ValueError("AWS_ACCESS_KEY 또는 AWS_SECRET_KEY가 설정되지 않았습니다.")
Expand Down
4 changes: 1 addition & 3 deletions airflow/dags/scripts/ELT_artist_info_globalTop50.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,7 @@
regexp_replace, split, when)
from pyspark.sql.types import IntegerType, StringType, StructField, StructType

from airflow.models import Variable

LAST_FM_API_KEY = Variable.get("LAST_FM_API_KEY")
LAST_FM_API_KEY = os.getenv("LAST_FM_API_KEY")

BUCKET_NAME = "de5-s4tify"
OBJECT_NAME = "raw_data"
Expand Down
4 changes: 1 addition & 3 deletions airflow/dags/scripts/ELT_artist_info_top10.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,7 @@
from pyspark.sql.functions import col, current_date, regexp_replace, split, udf
from pyspark.sql.types import ArrayType, StringType, StructField, StructType

from airflow.models import Variable

LAST_FM_API_KEY = Variable.get("LAST_FM_API_KEY")
LAST_FM_API_KEY = os.getenv("LAST_FM_API_KEY")

BUCKET_NAME = "de5-s4tify"
OBJECT_NAME = "raw_data"
Expand Down
36 changes: 13 additions & 23 deletions airflow/dags/scripts/S3_Spark_SnowFlake_ELT.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, lit, when

from airflow.models import Variable
from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook

# Spark JARs 설정
SPARK_HOME = "/opt/spark/"
Expand Down Expand Up @@ -40,29 +40,28 @@ def spark_session_builder(app_name: str) -> SparkSession:
"spark.hadoop.fs.s3a.impl",
"org.apache.hadoop.fs.s3a.S3AFileSystem") .config(
"spark.hadoop.fs.s3a.access.key",
os.getenv("AWS_ACCESS_KEY")) .config(
os.getenv("AWS_ACCESS_KEY_ID")) .config(
"spark.hadoop.fs.s3a.secret.key",
os.getenv("AWS_SECRET_KEY")) .config(
os.getenv("AWS_SECRET_ACCESS_KEY")) .config(
"spark.hadoop.fs.s3a.endpoint",
"s3.amazonaws.com") .config(
"spark.hadoop.fs.s3a.aws.credentials.provider",
"org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider",
) .getOrCreate())


# snowflake connector
def create_snowflake_conn():
hook = SnowflakeHook(snowflake_conn_id="SNOWFLAKE_CONN", schema="RAW_DATA")
conn = hook.get_conn()
cur = conn.cursor()
return conn, cur


# Snowflake에서 SQL 실행 함수
def check_and_create_table():
try:
conn = snowflake.connector.connect(
user=SNOWFLAKE_OPTIONS["user"],
password=SNOWFLAKE_OPTIONS["password"],
account=SNOWFLAKE_OPTIONS["account"],
database=SNOWFLAKE_OPTIONS["db"],
schema=SNOWFLAKE_OPTIONS["schema"],
warehouse=SNOWFLAKE_OPTIONS["warehouse"],
role=SNOWFLAKE_OPTIONS["role"],
)
cur = conn.cursor()
conn, cur = create_snowflake_conn()

# 테이블 존재 여부 확인
cur.execute(
Expand Down Expand Up @@ -113,16 +112,7 @@ def escape_quotes(value):
# Snowflake에서 SQL 실행 함수
def insert_data_into_snowflake(df, table_name):
try:
conn = snowflake.connector.connect(
user=SNOWFLAKE_OPTIONS["user"],
password=SNOWFLAKE_OPTIONS["password"],
account=SNOWFLAKE_OPTIONS["account"],
database=SNOWFLAKE_OPTIONS["db"],
schema=SNOWFLAKE_OPTIONS["schema"],
warehouse=SNOWFLAKE_OPTIONS["warehouse"],
role=SNOWFLAKE_OPTIONS["role"],
)
cur = conn.cursor()
conn, cur = create_snowflake_conn()

for row in df.collect():
rank = "NULL" if row["rank"] is None else row["rank"]
Expand Down
18 changes: 16 additions & 2 deletions airflow/dags/spotify_ELT_DAG.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,18 @@
from airflow.providers.apache.spark.operators.spark_submit import \
SparkSubmitOperator

# Spark JARs 설정
SPARK_HOME = os.environ.get("SPARK_JAR_DIR")
SPARK_JARS = ",".join(
[
os.path.join(SPARK_HOME, "snowflake-jdbc-3.13.33.jar"),
os.path.join(SPARK_HOME, "spark-snowflake_2.12-2.12.0-spark_3.4.jar"),
os.path.join(SPARK_HOME, "hadoop-aws-3.3.4.jar"),
os.path.join(SPARK_HOME, "aws-java-sdk-bundle-1.12.262.jar"),
]
)


# DAG 기본 설정
default_args = {
"owner": "yerin",
Expand All @@ -27,14 +39,16 @@
artist_info_Top10_table = SparkSubmitOperator(
task_id="artist_info_top10_table",
application="dags/scripts/ELT_artist_info_top10.py",
conn_id="spark_default",
conn_id="spark_conn",
jars=SPARK_JARS,
dag=dag,
)

artist_info_globalTop50_table = SparkSubmitOperator(
task_id="artist_info_globalTop50_table",
application="dags/scripts/ELT_artist_info_globalTop50.py",
conn_id="spark_default",
conn_id="spark_conn",
jars=SPARK_JARS,
dag=dag,
)

Expand Down