15 changes: 5 additions & 10 deletions airflow/.env
@@ -18,16 +18,11 @@ AWS_ACCESS_KEY_ID=AKIA4RRVVY55VT3PCIZU
AWS_SECRET_ACCESS_KEY=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
AWS_DEFAULT_REGION=ap-northeast-2 # region to use

# BSH SNOWFLAKE CONN INFO
SNOWFLAKE_USER_BSH= BSH
SNOWFLAKE_PASSWORD_BSH= BSH1234!
SNOWFLAKE_SILVER_SCHEMA = RAW_DATA
SNOWFLAKE_GOLD_SCHEMA = ANALYTICS

#S3_Spark_SnowFlake_ELT
SNOWFLAKE_USER= BY
SNOWFLAKE_PASSWORD= Zmfflsprtm1234
SNOWFLAKE_USER= BSH
SNOWFLAKE_PASSWORD= BSH1234!
SNOWFLAKE_ACCOUNT=KJQEOVI-GR23658
SNOWFLAKE_WH= COMPUTE_WH
SNOWFLAKE_DB=S4TIFY
SNOWFLAKE_SCHEMA = RAW_DATA
SNOWFLAKE_ACCOUNT=KJQEOVI-GR23658
SNOWFLAKE_DB=S4TIFY
SNOWFLAKE_ROLE=ANALYTICS_USERS
16 changes: 13 additions & 3 deletions airflow/Dockerfile
@@ -9,9 +9,19 @@ RUN echo "deb http://deb.debian.org/debian bullseye main" >> /etc/apt/sources.li
apt-get update && apt-get install -y \
wget openjdk-11-jdk && \
mkdir -p $SPARK_JAR_DIR && \
wget -O $SPARK_JAR_DIR/snowflake-jdbc-3.9.2.jar https://repo1.maven.org/maven2/net/snowflake/snowflake-jdbc/3.9.2/snowflake-jdbc-3.9.2.jar && \
wget -O $SPARK_JAR_DIR/hadoop-aws-3.3.4.jar https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.3.4/hadoop-aws-3.3.4.jar && \
wget -O $SPARK_JAR_DIR/aws-java-sdk-bundle-1.12.262.jar https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.12.262/aws-java-sdk-bundle-1.12.262.jar && \
wget -O $SPARK_JAR_DIR/snowflake-jdbc-3.9.2.jar \
https://repo1.maven.org/maven2/net/snowflake/snowflake-jdbc/3.9.2/snowflake-jdbc-3.9.2.jar && \
wget -O $SPARK_JAR_DIR/hadoop-aws-3.3.4.jar \
https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.3.4/hadoop-aws-3.3.4.jar && \
wget -O $SPARK_JAR_DIR/aws-java-sdk-bundle-1.12.262.jar \
https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.12.262/aws-java-sdk-bundle-1.12.262.jar && \
# Snowflake JDBC Driver
wget -O $SPARK_JAR_DIR/snowflake-jdbc-3.13.33.jar \
https://repo1.maven.org/maven2/net/snowflake/snowflake-jdbc/3.13.33/snowflake-jdbc-3.13.33.jar && \
# Snowflake Spark Connector
wget -O $SPARK_JAR_DIR/spark-snowflake_2.12-2.12.0-spark_3.4.jar \
https://repo1.maven.org/maven2/net/snowflake/spark-snowflake_2.12/2.12.0-spark_3.4/spark-snowflake_2.12-2.12.0-spark_3.4.jar && \
# Clean up to reduce image size
apt-get clean && \
rm -rf /var/lib/apt/lists/*

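Note: the DAG changes below import SPARK_JARS from dags/plugins/variables.py, which is not included in this diff. A minimal sketch of what that module could contain, assuming it simply joins the paths of the jars installed by the Dockerfile above (the default directory and the exact jar list are assumptions):

import os

# dags/plugins/variables.py (contents assumed, not shown in this PR):
# collect the jars baked into the image into a comma-separated string
# that SparkSubmitOperator passes via --jars.
SPARK_JAR_DIR = os.environ.get("SPARK_JAR_DIR", "/opt/spark/jars")

SPARK_JARS = ",".join(
    os.path.join(SPARK_JAR_DIR, jar)
    for jar in [
        "hadoop-aws-3.3.4.jar",
        "aws-java-sdk-bundle-1.12.262.jar",
        "snowflake-jdbc-3.13.33.jar",
        "spark-snowflake_2.12-2.12.0-spark_3.4.jar",
    ]
)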
149 changes: 11 additions & 138 deletions airflow/dags/ELT_eventsim_song_count_DAG.py
@@ -1,129 +1,14 @@
import os
from datetime import timedelta

import pandas as pd
import snowflake.connector
from dotenv import load_dotenv
from spark_utils import execute_snowflake_query
from dags.plugins.variables import SPARK_JARS

from airflow import DAG
from airflow.exceptions import AirflowFailException
from airflow.operators.python import PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.providers.apache.spark.operators.spark_submit import \
SparkSubmitOperator
from airflow.utils.dates import days_ago

load_dotenv()

# SNOWFLAKE settings
SNOWFLAKE_SOURCE_TABLE = "EVENTSIM_LOG"
SNOWFLAKE_SOURCE_SCHEMA = os.environ.get("SNOWFLAKE_SILVER_SCHMEA", "RAW_DATA")
SNOWFLAKE_TARGET_SONG_TABLE = "EVENTSIM_SONG_COUNTS"
SNOWFLAKE_TARGET_ARTIST_TABLE = "EVENTSIM_ARTIST_COUNTS"
SNOWFLAKE_TARGET_SCHEMA = os.environ.get("SNOWFLAKE_GOLD_SCHEMA", "ANALYTICS")

SNOWFLAKE_PROPERTIES = {
"user": os.environ.get("SNOWFLAKE_USER_BSH"),
"password": os.environ.get("SNOWFLAKE_PASSWORD_BSH"),
"account": os.environ.get("SNOWFLAKE_ACCOUNT"),
"db": os.environ.get("SNOWFLAKE_DB", "S4TIFY"),
"warehouse": os.environ.get("SNOWFLAKE_WH", "COMPUTE_WH"),
"role": os.environ.get("SNOWFLAKE_ROLE", "ANALYTICS_USERS"),
}


def extract_data_from_snowflake():
"""
Function that reads data from Snowflake
"""
SNOWFLAKE_PROPERTIES["schema"] = SNOWFLAKE_SOURCE_SCHEMA

query = f"""
SELECT SONG, ARTIST, LOCATION, SESSIONID, USERID, TS
FROM {SNOWFLAKE_SOURCE_TABLE}
"""
try:
conn = snowflake.connector.connect(
user=SNOWFLAKE_PROPERTIES["user"],
password=SNOWFLAKE_PROPERTIES["password"],
account=SNOWFLAKE_PROPERTIES["account"],
database=SNOWFLAKE_PROPERTIES["db"],
schema=SNOWFLAKE_PROPERTIES["schema"],
warehouse=SNOWFLAKE_PROPERTIES["warehouse"],
role=SNOWFLAKE_PROPERTIES["role"],
)
cur = conn.cursor()
rows = cur.execute(query).fetchall()
columns = [desc[0] for desc in cur.description]

df = pd.DataFrame(rows, columns=columns)
print(df.head(5))
print("Data successfully extracted from Snowflake!")
return df

except Exception as e:
print(f"Error extracting data from Snowflake: {e}")
return None

finally:
cur.close()
conn.close()


def process_song_counts(**kwargs):
SNOWFLAKE_PROPERTIES["schema"] = SNOWFLAKE_TARGET_SCHEMA
ti = kwargs["ti"]
df = ti.xcom_pull(task_ids="extract_data")

if df is None or df.empty:
raise AirflowFailException(
"No data available for processing song counts.")

# Compute song counts
song_counts = df.groupby(
["SONG", "ARTIST"]).size().reset_index(name="song_count")

# Truncate the table
execute_snowflake_query(
f"TRUNCATE TABLE {SNOWFLAKE_TARGET_SONG_TABLE};", SNOWFLAKE_PROPERTIES
)

# SQL query (uses executemany)
insert_query = f"""
INSERT INTO {SNOWFLAKE_TARGET_SONG_TABLE} (SONG, ARTIST, song_count)
VALUES (%s, %s, %s)
"""

# Convert the DataFrame to a list, then run executemany
execute_snowflake_query(
insert_query, SNOWFLAKE_PROPERTIES, data=song_counts.values.tolist()
)

print(f"Data written to {SNOWFLAKE_TARGET_SONG_TABLE} in Snowflake.")


def process_artist_counts(**kwargs):
SNOWFLAKE_PROPERTIES["schema"] = SNOWFLAKE_TARGET_SCHEMA
ti = kwargs["ti"]
df = ti.xcom_pull(task_ids="extract_data")
if df is None or df.empty:
raise AirflowFailException(
"No data available for processing artist counts.")

artist_counts = df.groupby(
"ARTIST").size().reset_index(name="artist_count")
execute_snowflake_query(
f"TRUNCATE TABLE {SNOWFLAKE_TARGET_ARTIST_TABLE};",
SNOWFLAKE_PROPERTIES)
insert_query = f"""
INSERT INTO {SNOWFLAKE_TARGET_ARTIST_TABLE} (ARTIST, artist_count)
VALUES (%s, %s)
"""
execute_snowflake_query(
insert_query, SNOWFLAKE_PROPERTIES, artist_counts.values.tolist()
)
print(f"Data written to {SNOWFLAKE_TARGET_ARTIST_TABLE} in Snowflake.")


# DAG settings
default_args = {
"owner": "sanghyoek_boo",
@@ -148,26 +33,14 @@ def process_artist_counts(**kwargs):
dag=dag,
)

extract_data_task = PythonOperator(
task_id="extract_data",
python_callable=extract_data_from_snowflake,
provide_context=True,
dag=dag,
)

process_song_task = PythonOperator(
task_id="process_song_counts",
python_callable=process_song_counts,
provide_context=True,
dag=dag,
)

process_artist_task = PythonOperator(
task_id="process_artist_counts",
python_callable=process_artist_counts,
provide_context=True,
spark_submit_task = SparkSubmitOperator(
task_id="process_songs_and_artists_spark",
application="dags/scripts/ELT_eventsim_script.py",
conn_id="spark_conn",
executor_memory="2g",
driver_memory="1g",
jars=SPARK_JARS,
dag=dag,
)

trigger_dag_task >> extract_data_task >> [
process_song_task, process_artist_task]
trigger_dag_task >> spark_submit_task
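Note: the rewritten DAG delegates the aggregation to dags/scripts/ELT_eventsim_script.py, which is not shown in this diff. A rough sketch of what that Spark job might look like, assuming it reproduces the removed pandas logic with the spark-snowflake connector (the connector usage, schema qualifiers, and option defaults are assumptions; the table names mirror the constants deleted above):

import os

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

SNOWFLAKE_SOURCE = "net.snowflake.spark.snowflake"

sf_options = {
    "sfURL": f"{os.environ['SNOWFLAKE_ACCOUNT']}.snowflakecomputing.com",
    "sfUser": os.environ["SNOWFLAKE_USER"],
    "sfPassword": os.environ["SNOWFLAKE_PASSWORD"],
    "sfDatabase": os.environ.get("SNOWFLAKE_DB", "S4TIFY"),
    "sfSchema": os.environ.get("SNOWFLAKE_SCHEMA", "RAW_DATA"),
    "sfWarehouse": os.environ.get("SNOWFLAKE_WH", "COMPUTE_WH"),
    "sfRole": os.environ.get("SNOWFLAKE_ROLE", "ANALYTICS_USERS"),
}

spark = SparkSession.builder.appName("eventsim_song_artist_counts").getOrCreate()

# Read the raw event log, count plays per song and per artist, then overwrite
# the two target tables -- the same result the removed PythonOperators produced.
logs = (
    spark.read.format(SNOWFLAKE_SOURCE)
    .options(**sf_options)
    .option("dbtable", "EVENTSIM_LOG")
    .load()
)

song_counts = logs.groupBy("SONG", "ARTIST").agg(F.count("*").alias("SONG_COUNT"))
artist_counts = logs.groupBy("ARTIST").agg(F.count("*").alias("ARTIST_COUNT"))

for df, table in [(song_counts, "EVENTSIM_SONG_COUNTS"),
                  (artist_counts, "EVENTSIM_ARTIST_COUNTS")]:
    (
        df.write.format(SNOWFLAKE_SOURCE)
        .options(**sf_options)
        .option("dbtable", f"ANALYTICS.{table}")  # target schema assumed
        .mode("overwrite")
        .save()
    )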
23 changes: 4 additions & 19 deletions airflow/dags/ETL_eventsim_DAG.py
@@ -1,22 +1,14 @@
import os
from datetime import datetime, timedelta

from dags.plugins.variables import SPARK_JARS

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.providers.apache.spark.operators.spark_submit import \
SparkSubmitOperator

# S3 and Snowflake settings
# S3 settings
S3_BUCKET = "s3a://de5-s4tify"
# Spark JARs settings
SPARK_HOME = os.environ.get("SPARK_JAR_DIR")
SPARK_JARS = ",".join(
[
os.path.join(SPARK_HOME, "snowflake-jdbc-3.9.2.jar"),
os.path.join(SPARK_HOME, "hadoop-aws-3.3.4.jar"),
os.path.join(SPARK_HOME, "aws-java-sdk-bundle-1.12.262.jar"),
]
)

default_args = {
"owner": "sanghyeok_boo",
@@ -41,7 +33,7 @@
# SparkSubmitOperator: process S3 data with Spark and MERGE into Snowflake
spark_job = SparkSubmitOperator(
task_id="spark_process_s3_upsert",
application="/opt/airflow/dags/scripts/ETL_eventsim_script.py",
application="dags/scripts/ETL_eventsim_script.py",
conn_id="spark_conn",
application_args=[
S3_BUCKET,
@@ -50,13 +42,6 @@
executor_memory="2g",
driver_memory="1g",
jars=SPARK_JARS,
conf={
"spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
"spark.hadoop.fs.s3a.access.key": "{{ conn.aws_s3.login }}",
"spark.hadoop.fs.s3a.secret.key": "{{ conn.aws_s3.password }}",
"spark.hadoop.fs.s3a.endpoint": "s3.ap-northeast-2.amazonaws.com",
"spark.hadoop.fs.s3a.aws.credentials.provider": "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider",
},
dag=dag,
)

@@ -6,41 +6,6 @@

from airflow.exceptions import AirflowFailException

# Spark JARs settings
# SPARK_HOME setting
SPARK_HOME = "/opt/spark"
os.environ["SPARK_HOME"] = SPARK_HOME

# Set JAR paths
SPARK_JARS_DIR = os.path.join(SPARK_HOME, "jars")
SPARK_JARS_LIST = [
"snowflake-jdbc-3.9.2.jar",
"hadoop-aws-3.3.4.jar",
"aws-java-sdk-bundle-1.12.262.jar",
]
SPARK_JARS = ",".join([os.path.join(SPARK_JARS_DIR, jar)
for jar in SPARK_JARS_LIST])


# Spark Session builder
def spark_session_builder(app_name: str) -> SparkSession:
"""_summary_
spark session builder for AWS S3 and Snowflake
Args:
app_name (str): spark session name

Returns:
SparkSession
"""
return (
SparkSession.builder.appName(f"{app_name}") .config(
"spark.jars",
SPARK_JARS) .config(
"spark.driver.extraClassPath",
"/opt/spark/jars/snowflake-jdbc-3.9.2.jar") .config(
"spark.executor.extraClassPath",
SPARK_JARS) .getOrCreate())


def execute_snowflake_query(
query: str, snowflake_options: dict, data=None, fetch=False
@@ -84,7 +49,6 @@ def execute_snowflake_query(

if fetch:
result = cur.fetchall() # fetch the data
print("result: " + result)
if cur.description: # build a DataFrame only when column metadata exists
df = pd.DataFrame(
result, columns=[
52 changes: 15 additions & 37 deletions airflow/dags/plugins/spark_snowflake_conn.py
@@ -1,31 +1,24 @@
import os
from datetime import datetime

import snowflake.connector
from dags.plugins.variables import SPARK_JARS
from pyspark.sql import SparkSession

from airflow.models import Variable

SNOWFLAKE_USER = Variable.get("SNOWFLAKE_USER")
SNOWFLKAE_USER_PWD = Variable.get("SNOWFLAKE_PASSWORD")
SNOWFLAKE_ACCOUNT = Variable.get("SNOWFLAKE_ACCOUNT")
SNOWFLAKE_URL = Variable.get("SNOWFLAKE_URL")
SNOWFLAKE_DB = "S4TIFY"
SNOWFLAKE_SCHEMA = "RAW_DATA"

AWS_ACCESS_KEY_ID = Variable.get("AWS_ACCESS_KEY_ID")
AWS_SECRET_ACCESS_KEY = Variable.get("AWS_SECRET_ACCESS_KEY")
from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook

AWS_ACCESS_KEY_ID = os.getenv("AWS_ACCESS_KEY_ID")
AWS_SECRET_ACCESS_KEY = os.getenv("AWS_SECRET_ACCESS_KEY")

TODAY = datetime.now().strftime("%Y-%m-%d")

snowflake_options = {
"sfURL": SNOWFLAKE_URL,
"sfDatabase": SNOWFLAKE_DB,
"sfSchema": SNOWFLAKE_SCHEMA,
"sfWarehouse": "COMPUTE_WH",
"sfRole": "ANALYTICS_USERS",
"sfUser": SNOWFLAKE_USER,
"sfPassword": SNOWFLKAE_USER_PWD,
"sfURL": f"{os.getenv('SNOWFLAKE_ACCOUNT')}.snowflakecomputing.com",
"sfDatabase": os.getenv("SNOWFLAKE_DB"),
"sfSchema": os.getenv("SNOWFLAKE_SCHEMA"),
"sfWarehouse": os.getenv("SNOWFLAKE_WH"),
"sfRole": os.getenv("SNOWFLAKE_ROLE"),
"sfUser": os.getenv("SNOWFLAKE_USER"),
"sfPassword": os.getenv("SNOWFLAKE_PASSWORD"),
}


@@ -37,32 +30,17 @@ def create_spark_session(app_name: str):
.config("spark.hadoop.fs.s3a.access.key", AWS_ACCESS_KEY_ID)
.config("spark.hadoop.fs.s3a.secret.key", AWS_SECRET_ACCESS_KEY)
.config("spark.hadoop.fs.s3a.endpoint", "s3.amazonaws.com")
.config(
"spark.jars",
"/path/to/spark-snowflake_2.12-2.12.0-spark_3.4.jar,/path/to/snowflake-jdbc-3.13.33.jar",
)
.config("spark.jars", SPARK_JARS)
.getOrCreate()
)

return spark


def create_snowflake_conn():
conn = snowflake.connector.connect(
user=SNOWFLAKE_USER,
password=SNOWFLKAE_USER_PWD,
account=SNOWFLAKE_ACCOUNT,
warehouse="COMPUTE_WH",
database=SNOWFLAKE_DB,
schema=SNOWFLAKE_SCHEMA,
)

return conn


def create_snowflake_table(sql):

conn = create_snowflake_conn()
hook = SnowflakeHook(snowflake_conn_id="SNOWFLAKE_CONN", schema="RAW_DATA")
conn = hook.get_conn()
cur = conn.cursor()

try:
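Note: the snowflake_options keys above use the spark-snowflake connector's option names, so a DataFrame created through create_spark_session can be written back with them. A hedged usage sketch (the table name and sample rows are illustrative, not part of this PR):

# Illustrative only: write a tiny DataFrame to Snowflake with the
# snowflake_options defined above (DEMO_ARTIST_COUNTS is a hypothetical table).
spark = create_spark_session("spark_snowflake_demo")
demo_df = spark.createDataFrame([("Some Artist", 3)], ["ARTIST", "ARTIST_COUNT"])

(
    demo_df.write.format("net.snowflake.spark.snowflake")
    .options(**snowflake_options)
    .option("dbtable", "DEMO_ARTIST_COUNTS")
    .mode("append")
    .save()
)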