Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
140 changes: 140 additions & 0 deletions airflow/dags/scripts/ELT_artist_info_globalTop50_table.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf, split, regexp_replace, explode
from pyspark.sql.types import ArrayType, StringType, StructType, StructField, IntegerType
from airflow.models import Variable

import requests
import snowflake.connector
from datetime import datetime

# --- Connection configuration, resolved from Airflow Variables at import time ---

SNOWFLAKE_USER = Variable.get("SNOWFLAKE_USER")
# NOTE(review): constant name is misspelled ("SNOWFLKAE"); renaming would touch
# every use site in this module, so it is only flagged here.
SNOWFLKAE_USER_PWD = Variable.get("SNOWFLAKE_PASSWORD")
SNOWFLAKE_ACCOUNT = Variable.get("SNOWFLAKE_ACCOUNT")
SNOWFLAKE_URL = Variable.get("SNOWFLAKE_URL")
SNOWFLAKE_DB = 'S4TIFY'
SNOWFLAKE_SCHEMA = 'RAW_DATA'

# AWS credentials for reading the raw CSV extracts from S3 via s3a://.
AWS_ACCESS_KEY_ID = Variable.get("AWS_ACCESS_KEY_ID")
AWS_SECRET_ACCESS_KEY = Variable.get("AWS_SECRET_ACCESS_KEY")
LAST_FM_API_KEY = Variable.get("LAST_FM_API_KEY")

# S3 location of the raw extracts: s3a://{BUCKET_NAME}/{OBJECT_NAME}/...
BUCKET_NAME = 'de5-s4tify'
OBJECT_NAME = 'raw_data'

# Date suffix used to pick today's CSV files.
# NOTE(review): evaluated once at import time, not per DAG run — a task that
# runs past midnight will look for the wrong file; verify against the DAG.
TODAY = datetime.now().strftime("%Y-%m-%d")

# Options passed to the Spark Snowflake connector (`.format("snowflake")`).
snowflake_options = {
    "sfURL": SNOWFLAKE_URL,
    "sfDatabase": SNOWFLAKE_DB,
    "sfSchema": SNOWFLAKE_SCHEMA,
    "sfWarehouse": "COMPUTE_WH",
    "sfRole": "ANALYTICS_USERS",
    "sfUser": SNOWFLAKE_USER,
    "sfPassword": SNOWFLKAE_USER_PWD
}

def create_spark_session():
    """Build (or reuse) a local SparkSession configured for S3A access.

    The Snowflake Spark connector jars are added via ``spark.jars``.
    ``getOrCreate`` returns the existing session on repeated calls, so
    invoking this from several helpers is cheap.
    """
    builder = (
        SparkSession.builder
        .appName("s3 read test")
        .master("local[*]")
        .config("spark.hadoop.fs.s3a.access.key", AWS_ACCESS_KEY_ID)
        .config("spark.hadoop.fs.s3a.secret.key", AWS_SECRET_ACCESS_KEY)
        .config("spark.hadoop.fs.s3a.endpoint", "s3.amazonaws.com")
        # NOTE(review): "/path/to/..." looks like a placeholder — confirm the
        # real jar locations on the Airflow workers.
        .config("spark.jars", "/path/to/spark-snowflake_2.12-2.12.0-spark_3.4.jar,/path/to/snowflake-jdbc-3.13.33.jar")
    )
    return builder.getOrCreate()


def load():
    """Ensure the ``artist_info_globalTop50`` table exists in Snowflake,
    then append today's transformed chart data via the Spark connector.

    The snowflake-connector connection is used only for the DDL; the bulk
    write goes through Spark with ``snowflake_options``.
    """
    # Connect to Snowflake for the CREATE TABLE IF NOT EXISTS step.
    conn = snowflake.connector.connect(
        user=SNOWFLAKE_USER,
        password=SNOWFLKAE_USER_PWD,
        account=SNOWFLAKE_ACCOUNT,
        warehouse="COMPUTE_WH",
        database=SNOWFLAKE_DB,
        schema=SNOWFLAKE_SCHEMA
    )

    cur = conn.cursor()
    try:
        cur.execute("BEGIN")

        sql = """
        CREATE TABLE IF NOT EXISTS artist_info_globalTop50(
            artist_id VARCHAR(100),
            title VARCHAR(100),
            rank INT,
            artist VARCHAR(100),
            artist_name VARCHAR(100),
            artist_genre ARRAY
        )
        """
        cur.execute(sql)
        # Explicit COMMIT ends the transaction; the redundant
        # conn.commit() that followed it has been removed.
        cur.execute("COMMIT")
    except Exception as e:
        print(f"error:{e}")
        cur.execute("ROLLBACK")
    finally:
        # fix: cursor and connection were previously leaked on every run.
        cur.close()
        conn.close()

    transform_df = transformation()
    transform_df.show()

    # Append the joined rows into the (now guaranteed to exist) table.
    transform_df.write \
        .format("snowflake") \
        .options(**snowflake_options) \
        .option("dbtable", "artist_info_globalTop50") \
        .mode("append") \
        .save()

def transformation():
    """Join per-artist metadata onto the exploded Global Top-50 chart rows.

    Returns a DataFrame with one row per (track, artist) pair, combining
    chart position/title with the artist's name and genres.
    """
    # Schemas for the two raw CSV extracts.
    artist_info_schema = StructType([
        StructField("artist", StringType(), True),
        StructField("artist_id", StringType(), True),
        StructField("artist_genre", StringType(), True)
    ])

    global_top50_schema = StructType([
        StructField("rank", IntegerType(), True),
        StructField("title", StringType(), True),
        StructField("artist", StringType(), True),
        StructField("artist_id", StringType(), True)
    ])

    # Read both extracts; duplicate artist rows are collapsed.
    artist_info_df = extract("spotify_artist_info", artist_info_schema).dropDuplicates(['artist_id'])
    global_top50_df = extract("spotify_crawling_data", global_top50_schema)

    # Chart rows carry an array of artist ids — explode to one id per row.
    exploded_top50_df = global_top50_df.withColumn("artist_id", explode("artist_id"))

    # Outer join keeps chart rows lacking metadata and metadata-only artists.
    joined_df = exploded_top50_df.join(artist_info_df, on='artist_id', how='outer')
    joined_df.show()

    return joined_df


def extract(file_name, schema):
    """Read one of today's raw CSVs from S3 into a DataFrame.

    Applies per-source cleanup: stringified Python lists (e.g. "['a', 'b']")
    are stripped of brackets/quotes and split into real array columns.
    """
    spark = create_spark_session()

    source_path = f"s3a://{BUCKET_NAME}/{OBJECT_NAME}/{file_name}_{TODAY}.csv"
    df = spark.read.csv(source_path, header=True, schema=schema)

    if file_name == 'spotify_crawling_data':
        # Both artist columns arrive as stringified lists; convert to arrays.
        for column in ("artist", "artist_id"):
            df = df.withColumn(column, split(regexp_replace(df[column], r"[\[\]']", ""), ", "))

    if file_name == 'spotify_artist_info':
        # Strip list punctuation, then split the genres on ", ".
        df = df.withColumn("artist_genre", regexp_replace(df["artist_genre"], r"[\[\]']", ""))
        df = df.withColumn("artist_genre", split(df["artist_genre"], ", "))
        df = df.withColumnRenamed("artist", "artist_name")

    df.show()

    return df
150 changes: 150 additions & 0 deletions airflow/dags/scripts/ELT_artist_info_top10_table.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf, split, regexp_replace
from pyspark.sql.types import ArrayType, StringType, StructType, StructField, ArrayType
from airflow.models import Variable

import requests
import snowflake.connector
from datetime import datetime

# --- Connection configuration, resolved from Airflow Variables at import time ---

SNOWFLAKE_USER = Variable.get("SNOWFLAKE_USER")
# NOTE(review): constant name is misspelled ("SNOWFLKAE"); renaming would touch
# every use site in this module, so it is only flagged here.
SNOWFLKAE_USER_PWD = Variable.get("SNOWFLAKE_PASSWORD")
SNOWFLAKE_ACCOUNT = Variable.get("SNOWFLAKE_ACCOUNT")
SNOWFLAKE_URL = Variable.get("SNOWFLAKE_URL")
SNOWFLAKE_DB = 'S4TIFY'
SNOWFLAKE_SCHEMA = 'RAW_DATA'

# AWS credentials for reading raw extracts from S3; Last.fm key feeds the
# song-genre lookup UDF below.
AWS_ACCESS_KEY_ID = Variable.get("AWS_ACCESS_KEY_ID")
AWS_SECRET_ACCESS_KEY = Variable.get("AWS_SECRET_ACCESS_KEY")
LAST_FM_API_KEY = Variable.get("LAST_FM_API_KEY")

# S3 location of the raw extracts: s3a://{BUCKET_NAME}/{OBJECT_NAME}/...
BUCKET_NAME = 'de5-s4tify'
OBJECT_NAME = 'raw_data'

# Date suffix used to pick today's CSV files.
# NOTE(review): evaluated once at import time, not per DAG run — verify this
# matches the DAG's logical date.
TODAY = datetime.now().strftime("%Y-%m-%d")

# Options passed to the Spark Snowflake connector (`.format("snowflake")`).
snowflake_options = {
    "sfURL": SNOWFLAKE_URL,
    "sfDatabase": SNOWFLAKE_DB,
    "sfSchema": SNOWFLAKE_SCHEMA,
    "sfWarehouse": "COMPUTE_WH",
    "sfRole": "ANALYTICS_USERS",
    "sfUser": SNOWFLAKE_USER,
    "sfPassword": SNOWFLKAE_USER_PWD
}

def create_spark_session():
    """Return a local SparkSession configured for S3A and Snowflake.

    ``getOrCreate`` makes repeated calls reuse the same session.
    """
    session = (
        SparkSession.builder
        .appName("s3 read test")
        .master("local[*]")
        .config("spark.hadoop.fs.s3a.access.key", AWS_ACCESS_KEY_ID)
        .config("spark.hadoop.fs.s3a.secret.key", AWS_SECRET_ACCESS_KEY)
        .config("spark.hadoop.fs.s3a.endpoint", "s3.amazonaws.com")
        # NOTE(review): "/path/to/..." looks like a placeholder — confirm the
        # real jar locations on the Airflow workers.
        .config("spark.jars", "/path/to/spark-snowflake_2.12-2.12.0-spark_3.4.jar,/path/to/snowflake-jdbc-3.13.33.jar")
        .getOrCreate()
    )
    return session

def load():
    """Ensure the ``artist_info_top10`` table exists in Snowflake, then
    append today's transformed artist/top-10 data via the Spark connector.

    The snowflake-connector connection is used only for the DDL; the bulk
    write goes through Spark with ``snowflake_options``.
    """
    conn = snowflake.connector.connect(
        user=SNOWFLAKE_USER,
        password=SNOWFLKAE_USER_PWD,
        account=SNOWFLAKE_ACCOUNT,
        warehouse="COMPUTE_WH",
        database=SNOWFLAKE_DB,
        schema=SNOWFLAKE_SCHEMA
    )

    cur = conn.cursor()
    try:
        cur.execute("BEGIN")

        sql = """
        CREATE TABLE IF NOT EXISTS artist_info_top10(
            artist_id VARCHAR(100),
            artist VARCHAR(100),
            artist_genre ARRAY,
            album VARCHAR(100),
            song_id VARCHAR(100),
            title VARCHAR(100),
            song_genre ARRAY
        )
        """
        cur.execute(sql)
        # Explicit COMMIT ends the transaction; the redundant
        # conn.commit() that followed it has been removed.
        cur.execute("COMMIT")
    except Exception as e:
        print(f"error:{e}")
        cur.execute("ROLLBACK")
    finally:
        # fix: cursor and connection were previously leaked on every run.
        cur.close()
        conn.close()

    transform_df = transformation()
    transform_df.show()

    # Append the joined/enriched rows into the target table.
    transform_df.write \
        .format("snowflake") \
        .options(**snowflake_options) \
        .option("dbtable", "artist_info_top10") \
        .mode("append") \
        .save()

def transformation():
    """Combine artist metadata with each artist's top-10 tracks and enrich
    every track with Last.fm genre tags via ``add_song_genre_udf``."""
    # Schema definitions for the two raw CSV extracts.
    artist_info_schema = StructType([
        StructField("artist", StringType(), True),
        StructField("artist_id", StringType(), True),
        StructField("artist_genre", StringType(), True)
    ])

    artist_top10_schema = StructType([
        StructField("album", StringType(), True),
        StructField("artist_id", StringType(), True),
        StructField("song_id", StringType(), True),
        StructField("title", StringType(), True),
    ])

    # Extract both sources and collapse duplicates on their natural keys.
    top10_df = extract("spotify_artist_top10", artist_top10_schema).dropDuplicates(['song_id'])
    info_df = extract("spotify_artist_info", artist_info_schema).dropDuplicates(['artist_id'])

    # Outer join keeps artists without tracks and tracks without metadata.
    combined_df = info_df.join(top10_df, on='artist_id', how='outer')

    # Add per-track genre tags from the Last.fm API.
    combined_df = combined_df.withColumn("song_genre", add_song_genre_udf(col("artist"), col("title")))

    return combined_df


def add_song_genre(artist, track):
    """Look up a track's top Last.fm tags to use as its genre list.

    Args:
        artist: Artist name as it appears in the chart data.
        track: Track title.

    Returns:
        List of tag names; ``["API Error"]`` on a network failure,
        ``["Unknown"]`` when the response is not in the expected shape.
    """
    url = "https://ws.audioscrobbler.com/2.0/"
    # fix: pass query arguments via params= so artist/track names containing
    # '&', '#', spaces, etc. are URL-encoded instead of corrupting the query.
    params = {
        "method": "track.getInfo",
        "api_key": LAST_FM_API_KEY,
        "artist": artist,
        "track": track,
        "format": "json",
    }

    try:
        # fix: timeout prevents one hung request from stalling the Spark UDF.
        response = requests.get(url, params=params, timeout=10).json()
        return [genre['name'] for genre in response.get('track', {}).get('toptags', {}).get('tag', [])]
    except requests.exceptions.RequestException as e:
        print(f"API request failed: {e}")
        return ["API Error"]
    except (KeyError, TypeError, ValueError):
        # Unexpected payload shape (e.g. 'tag' not a list) or non-JSON body;
        # the original bare KeyError was unreachable since all lookups use .get.
        return ["Unknown"]

# Spark UDF wrapper: runs the Last.fm lookup per row, returning array<string>.
add_song_genre_udf = udf(add_song_genre, ArrayType(StringType()))

def extract(file_name, schema):
    """Read one of today's raw CSVs from S3 into a DataFrame.

    For the artist-info extract, the stringified genre list (e.g.
    "['pop', 'rock']") is cleaned up and split into a real array column.
    """
    spark = create_spark_session()
    source_path = f"s3a://{BUCKET_NAME}/{OBJECT_NAME}/{file_name}_{TODAY}.csv"
    df = spark.read.csv(source_path, header=True, schema=schema)

    if file_name == 'spotify_artist_info':
        # Strip list punctuation, then split the genres on ", ".
        cleaned = regexp_replace(df["artist_genre"], r"[\[\]']", "")
        df = df.withColumn("artist_genre", cleaned)
        df = df.withColumn("artist_genre", split(df["artist_genre"], ", "))

    return df


# Allow running this ELT step directly (outside Airflow) for debugging.
if __name__ == "__main__":
    load()

21 changes: 13 additions & 8 deletions airflow/dags/scripts/crawling_spotify_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from scripts.load_spotify_data import *

TODAY = datetime.now().strftime("%Y-%m-%d")

Expand All @@ -23,11 +24,14 @@ def make_dataframe():


# �禺﹞諤� �域�圉未 csv諢� �€��
def save_as_csv_file(df):
def save_as_csv_file(df, logical_date):

file_path = f"data/spotify_crawling_data_{logical_date}.csv"

file_path = f"data/spotify_crawling_data_{TODAY}.csv"
df.index.name = 'rank'
df.to_csv(file_path, encoding='utf-8', mode='w', header=True, index=True)
load_s3_bucket(f'spotify_crawling_data_{logical_date}.csv')

df.to_csv(file_path, encoding="utf-8", mode="w", header=True, index=False)


def data_crawling():
Expand All @@ -51,15 +55,16 @@ def data_crawling():
print("�禺﹞諤� ��")

driver.get(url)
driver.implicitly_wait(20)
driver.implicitly_wait(200)

try:
scroll_element = driver.find_element(By.XPATH, '//*[@id="main"]/div/div[2]/div[5]/div/div[2]/div[2]/div/main/section/div[2]/div[3]/div/div[1]/div/div[2]/div[2]')
driver.execute_script("""
arguments[0].scrollIntoView({behavior: 'smooth', block: 'end'});
""", scroll_element)
# top50 謔科�� 穈€�賄篣�
driver.implicitly_wait(100)
driver.execute_script(
"window.scrollTo(0, document.body.scrollHeight);")
# �鴔€ 諢 �€篣�
time.sleep(2)

song_lists = driver.find_elements(
By.XPATH, '//*[@id="main"]//div[@role="row"]'
)
Expand Down
14 changes: 12 additions & 2 deletions airflow/dags/scripts/request_spotify_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,18 @@ def get_arti_top10(logical_date, **kwargs):
except Exception as e:
print(f"error:{top_10_info}")

# task_instance.xcom_push(key='artist_top10', value=arti_top10_list)
artist_top10_df = pd.DataFrame(arti_top10_list)
artist_top10_df.to_csv(
f"data/{object_name}",
encoding="utf-8-sig",
index=False)
load_s3_bucket(object_name)

try:
load_s3_bucket(object_name)
os.remove(f'data/{object_name}')
except Exception as e:
print(f'error: {e}')


# ��欠 �陷 穈€�賄篣�
Expand Down Expand Up @@ -98,13 +103,18 @@ def get_artist_info(logical_date, **kwargs):
time.sleep(20)
print(f"error:{artist_info}")

# task_instance.xcom_push(key="artist_info", value=artist_info_list)
artist_info_df = pd.DataFrame(artist_info_list)
artist_info_df.to_csv(
f"data/{object_name}",
encoding="utf-8-sig",
index=False)
load_s3_bucket(object_name)

try:
load_s3_bucket(object_name)
os.remove(f'data/{object_name}')
except Exception as e:
print(f'error: {e}')


# �禺﹞諤� �域�� �趣�月� �到�
Expand Down
Loading
Loading