From 46f8c16cf1201a3c5d759fc63a3b5b97c57f2230 Mon Sep 17 00:00:00 2001
From: sanghyeok_boo
Date: Sat, 8 Mar 2025 22:04:34 +0900
Subject: [PATCH 1/2] aws_access cleanup
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 Kafka/conduktor.yml                         |  2 +-
 Kafka/connectors/s3-sink-config.json        | 33 -------------
 Kafka/eventsim_producer.py                  | 11 ++---
 Kafka/test.py                               | 25 ++++------
 Kafka/utils/connect_utils.py                | 47 +++++++++++++++++++
 Kafka/utils/schema_utils.py                 |  6 +--
 airflow/__init__.py                         |  0
 airflow/dags/plugins/__init__.py            |  0
 .../dags/{utils => plugins}/spark_utils.py  |  0
 .../{spark_jobs => scripts}/eventsim_ETL.py |  5 +-
 airflow/docker-compose.yaml                 |  1 -
 11 files changed, 66 insertions(+), 64 deletions(-)
 delete mode 100644 Kafka/connectors/s3-sink-config.json
 create mode 100644 Kafka/utils/connect_utils.py
 create mode 100644 airflow/__init__.py
 create mode 100644 airflow/dags/plugins/__init__.py
 rename airflow/dags/{utils => plugins}/spark_utils.py (100%)
 rename airflow/dags/{spark_jobs => scripts}/eventsim_ETL.py (96%)

diff --git a/Kafka/conduktor.yml b/Kafka/conduktor.yml
index c2a03be..dbd2a63 100644
--- a/Kafka/conduktor.yml
+++ b/Kafka/conduktor.yml
@@ -16,7 +16,7 @@ services:
   conduktor-console:
     image: conduktor/conduktor-console:1.26.0
     ports:
-      - "8080:8080"
+      - "8085:8085"
     volumes:
       - conduktor_data:/var/conduktor
    environment:
diff --git a/Kafka/connectors/s3-sink-config.json b/Kafka/connectors/s3-sink-config.json
deleted file mode 100644
index cacfecf..0000000
--- a/Kafka/connectors/s3-sink-config.json
+++ /dev/null
@@ -1,33 +0,0 @@
-{
-    "name": "eventsim-s3-sink-connector",
-    "config": {
-        "connector.class": "io.confluent.connect.s3.S3SinkConnector",
-        "tasks.max": "3",
-        "topics": "eventsim_music_streaming",
-        "topic.creation.enable": "true",
-        "topic.creation.default.partitions": "4",
-        "topic.creation.default.replication.factor" : "1",
-        "topic.creation.default.cleanup.policy": "delete",
-
-        "s3.region": "ap-northeast-2",
-        "s3.bucket.name": "eventsim-log",
-        "s3.part.size": "5242880",
-        "flush.size": "1000",
-        "storage.class": "io.confluent.connect.s3.storage.S3Storage",
-
-        "locale": "ko_KR",
-        "timezone": "Asia/Seoul",
-
-        "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
-        "schema.compatibility": "NONE",
-        "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
-        "path.format": "'year'=YYYY/'month'=MM/'day'=dd",
-        "timestamp.extractor": "RecordField",
-        "timestamp.field": "ts",
-        "partition.duration.ms": "86400000",
-
-        "aws.access.key.id": "***",
-        "aws.secret.access.key": "***"
-    }
-}
-
diff --git a/Kafka/eventsim_producer.py b/Kafka/eventsim_producer.py
index c60d89d..01a7671 100644
--- a/Kafka/eventsim_producer.py
+++ b/Kafka/eventsim_producer.py
@@ -5,20 +5,19 @@
 import time
 from typing import Final

-import avro.schema
 from kafka import KafkaAdminClient
 from kafka.admin import NewTopic
 from kafka.errors import TopicAlreadyExistsError
 from kafka.producer import KafkaProducer

-from Kafka.model.music_streaming import EventLog
-from Kafka.utils.docker_utils import get_container_id, is_container_running
-from Kafka.utils.schema_utils import register_schema, serialize_avro
-
 # Add the path containing the Kafka package
 BASE_DIR = os.path.dirname(os.path.abspath(__file__))
 sys.path.append(os.path.join(BASE_DIR, ".."))

+from Kafka.model.music_streaming import EventLog
+from Kafka.utils.docker_utils import get_container_id, is_container_running
+from Kafka.utils.schema_utils import register_schema
+
 SCHEMA_REGISTRY_URL = "http://localhost:8081"

@@ -113,8 +112,6 @@ def main():
     create_topic(bootstrap_servers, topic_name, 4)
     register_schema(SCHEMA_REGISTRY_URL, f"{topic_name}-value", schema_dict)

-    schema = avro.schema.parse(open(SCHEMA_PATH).read())
-
     producer = KafkaProducer(
         bootstrap_servers=bootstrap_servers,
         client_id="eventsim_music_streaming_producer",
diff --git a/Kafka/test.py b/Kafka/test.py
index 4c1dc09..393671d 100644
--- a/Kafka/test.py
+++ b/Kafka/test.py
@@ -1,23 +1,16 @@
-# import base64
-
-
-# encoded_value = 'T2JqAQQUYXZyby5jb2RlYwhudWxsFmF2cm8uc2NoZW1h8Ap7InR5cGUiOiAicmVjb3JkIiwgIm5hbWUiOiAiTXVzaWNTdHJlYW1pbmdFdmVudCIsICJuYW1lc3BhY2UiOiAiY29tLmV4YW1wbGUiLCAiZmllbGRzIjogW3sibmFtZSI6ICJzb25nIiwgInR5cGUiOiBbIm51bGwiLCAic3RyaW5nIl0sICJkZWZhdWx0IjogbnVsbH0sIHsibmFtZSI6ICJtZXRob2QiLCAidHlwZSI6ICJzdHJpbmcifSwgeyJuYW1lIjogImF1dGgiLCAidHlwZSI6ICJzdHJpbmcifSwgeyJuYW1lIjogImxldmVsIiwgInR5cGUiOiB7InR5cGUiOiAiZW51bSIsICJuYW1lIjogIlVzZXJMZXZlbCIsICJzeW1ib2xzIjogWyJmcmVlIiwgInBhaWQiXX19LCB7Im5hbWUiOiAiYXJ0aXN0IiwgInR5cGUiOiBbIm51bGwiLCAic3RyaW5nIl0sICJkZWZhdWx0IjogbnVsbH0sIHsibmFtZSI6ICJsZW5ndGgiLCAidHlwZSI6IFsibnVsbCIsICJmbG9hdCJdLCAiZGVmYXVsdCI6IG51bGx9LCB7Im5hbWUiOiAibG9jYXRpb24iLCAidHlwZSI6IFsibnVsbCIsICJzdHJpbmciXSwgImRlZmF1bHQiOiBudWxsfSwgeyJuYW1lIjogInNlc3Npb25JZCIsICJ0eXBlIjogImludCJ9LCB7Im5hbWUiOiAicGFnZSIsICJ0eXBlIjogInN0cmluZyJ9LCB7Im5hbWUiOiAidXNlcklkIiwgInR5cGUiOiAic3RyaW5nIn0sIHsibmFtZSI6ICJ0cyIsICJ0eXBlIjogImxvbmcifSwgeyJuYW1lIjogInN0YXR1cyIsICJ0eXBlIjogImludCJ9XX0A+mJ1+f9mVszSkk7Jz218fQLqAQIgQnJpbmcgTWUgVG8gTGlmZQZQVVQSTG9nZ2VkIEluAAIWRXZhbmVzY2VuY2UC7xxtQwJOTWlubmVhcG9saXMtU3QuIFBhdWwtQmxvb21pbmd0b24sIE1OLVdJ/MEGEE5leHRTb25nCDg0NzS0uNijlmWQA/pidfn/ZlbM0pJOyc9tfH0='
-
-# decoded_bytes = base64.b64decode(encoded_value)
-
-# print(f"Decoded first byte (Magic Byte): {decoded_bytes[0]}")
-
-# '''
-# kafka-console-consumer --bootstrap-server localhost:9092 --topic eventsim_music_streaming --from-beginning --property print.key=true --property print.value=true
-# '''
-
 import os
 import sys
-
-from Kafka.utils.docker_utils import register_sink_connector
+import subprocess

 BASE_DIR = os.path.dirname(os.path.abspath(__file__))
 sys.path.append(os.path.join(BASE_DIR, ".."))

+from Kafka.utils.docker_utils import register_sink_connector
+from Kafka.utils.connect_utils import create_s3_sink_json
+
+create_s3_sink_json()
+register_sink_connector(f"{BASE_DIR}/connectors/s3_sink_config.json")
+

-register_sink_connector(f"{BASE_DIR}/connectors/s3-sink-config.json")
+producer_script = os.path.join(BASE_DIR, "eventsim_producer.py")
+subprocess.run(["python", producer_script], check=True)
\ No newline at end of file
diff --git a/Kafka/utils/connect_utils.py b/Kafka/utils/connect_utils.py
new file mode 100644
index 0000000..e4983a3
--- /dev/null
+++ b/Kafka/utils/connect_utils.py
@@ -0,0 +1,47 @@
+import os
+import json
+from Kafka.variables.aws_variables import aws_variables
+
+def create_s3_sink_json():
+    # Location of this file (= connect_utils.py)
+    base_dir = os.path.dirname(os.path.abspath(__file__))
+
+    # Path to the connectors directory
+    connectors_dir = os.path.join(base_dir, "..", "connectors")
+    os.makedirs(connectors_dir, exist_ok=True)  # create the folder if it does not exist
+
+    # Final file path
+    file_path = os.path.join(connectors_dir, "s3_sink_config.json")
+
+    config = {
+        "name": "eventsim-s3-sink-connector",
+        "config": {
+            "connector.class": "io.confluent.connect.s3.S3SinkConnector",
+            "tasks.max": "3",
+            "topics": "eventsim_music_streaming",
+            "topic.creation.enable": "true",
"topic.creation.default.partitions": "4", + "topic.creation.default.replication.factor": "1", + "topic.creation.default.cleanup.policy": "delete", + "s3.region": "ap-northeast-2", + "s3.bucket.name": "de5-s4tify", + "s3.part.size": "5242880", + "flush.size": "1000", + "storage.class": "io.confluent.connect.s3.storage.S3Storage", + "locale": "ko_KR", + "timezone": "Asia/Seoul", + "format.class": "io.confluent.connect.s3.format.json.JsonFormat", + "schema.compatibility": "NONE", + "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner", + "path.format": "'year'=YYYY/'month'=MM/'day'=dd", + "timestamp.extractor": "RecordField", + "timestamp.field": "ts", + "partition.duration.ms": "86400000", + "aws.access.key.id": aws_variables.get("aws_access_key_id"), + "aws.secret.access.key": aws_variables.get("aws_secret_access_key") + } + } + + with open(file_path, "w") as f: + json.dump(config, f, indent=4) + diff --git a/Kafka/utils/schema_utils.py b/Kafka/utils/schema_utils.py index 6e7189f..cceeb9e 100644 --- a/Kafka/utils/schema_utils.py +++ b/Kafka/utils/schema_utils.py @@ -2,9 +2,9 @@ import json import requests -from avro.io import BinaryEncoder, DatumWriter - +# from avro.io import BinaryEncoder, DatumWriter +''' # Avro 직렬화 함수 def serialize_avro(data, schema): """Avro 데이터를 직렬화하여 바이너리 포맷으로 변환""" @@ -13,7 +13,7 @@ def serialize_avro(data, schema): encoder = BinaryEncoder(bytes_writer) writer.write(data, encoder) return bytes_writer.getvalue() - +''' # Avro 스키마 등록 (Schema Registry에 POST 요청) def register_schema(SCHEMA_REGISTRY_URL: str, subject: str, schema_dict: dict): diff --git a/airflow/__init__.py b/airflow/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/airflow/dags/plugins/__init__.py b/airflow/dags/plugins/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/airflow/dags/utils/spark_utils.py b/airflow/dags/plugins/spark_utils.py similarity index 100% rename from airflow/dags/utils/spark_utils.py rename to airflow/dags/plugins/spark_utils.py diff --git a/airflow/dags/spark_jobs/eventsim_ETL.py b/airflow/dags/scripts/eventsim_ETL.py similarity index 96% rename from airflow/dags/spark_jobs/eventsim_ETL.py rename to airflow/dags/scripts/eventsim_ETL.py index 5769aeb..9ab83d4 100644 --- a/airflow/dags/spark_jobs/eventsim_ETL.py +++ b/airflow/dags/scripts/eventsim_ETL.py @@ -3,9 +3,8 @@ from datetime import datetime from airflow.models import Variable -from pyspark.sql.types import (IntegerType, LongType, StringType, StructField, - StructType) -from utils.spark_utils import execute_snowflake_query, spark_session_builder +from pyspark.sql.types import (IntegerType, LongType, StringType, StructField, StructType) +from ..dags.plugins.spark_utils import execute_snowflake_query, spark_session_builder BASE_DIR = os.path.abspath( os.path.join(os.path.dirname(__file__), "..") diff --git a/airflow/docker-compose.yaml b/airflow/docker-compose.yaml index f5c6587..cbfe55b 100644 --- a/airflow/docker-compose.yaml +++ b/airflow/docker-compose.yaml @@ -91,7 +91,6 @@ x-airflow-common: - ${AIRFLOW_PROJ_DIR:-.}/config:/opt/airflow/config - ${AIRFLOW_PROJ_DIR:-.}/plugins:/opt/airflow/plugins - ${AIRFLOW_PROJ_DIR:-.}/data:/opt/airflow/data - - ./spark_jars:/opt/spark/jars # 모든 Airflow 컨테이너가 동일한 JAR 파일을 공유하도록 설정 user: "${AIRFLOW_UID:-50000}:0" networks: - code-with-yu From 8f73aacf4372f52c64783f621a88a6a5e62a6b7c Mon Sep 17 00:00:00 2001 From: GitHub Action Date: Sat, 8 Mar 2025 13:05:17 +0000 Subject: [PATCH 2/2] Automated format fixes --- 
 CHART/bugs-chart.py-main/Bugs_DAG.py        |  2 +-
 CHART/flo-chart.py-main/Flo_DAG.py          |  2 +-
 CHART/genie-chart.py-main/Genie_DAG.py      |  2 +-
 CHART/melon-chart.py-main/Melon_DAG.py      |  2 +-
 CHART/vibe-chart.py-main/Vibe_DAG.py        |  2 +-
 Kafka/eventsim_producer.py                  |  8 ++++----
 Kafka/test.py                               |  9 +++++----
 Kafka/utils/connect_utils.py                | 13 +++++++------
 Kafka/utils/schema_utils.py                 |  2 ++
 airflow/dags/Bugs_DAG.py                    |  6 +++---
 airflow/dags/Flo_DAG.py                     |  6 +++---
 airflow/dags/Genie_DAG.py                   |  6 +++---
 airflow/dags/Melon_DAG.py                   |  4 ++--
 airflow/dags/Vibe_DAG.py                    |  6 +++---
 airflow/dags/eventsim_ETL_DAG.py            |  3 +--
 airflow/dags/get_weekly_top200_songs.py     |  6 +++---
 airflow/dags/plugins/spark_utils.py         |  3 ++-
 airflow/dags/scripts/eventsim_ETL.py        |  8 ++++++--
 airflow/dags/scripts/get_access_token.py    |  1 +
 airflow/dags/scripts/load_spotify_data.py   |  3 ++-
 airflow/dags/scripts/request_spotify_api.py |  3 ++-
 airflow/dags/spotify_data_dag.py            |  2 +-
 22 files changed, 55 insertions(+), 44 deletions(-)

diff --git a/CHART/bugs-chart.py-main/Bugs_DAG.py b/CHART/bugs-chart.py-main/Bugs_DAG.py
index 511a81c..4e81847 100644
--- a/CHART/bugs-chart.py-main/Bugs_DAG.py
+++ b/CHART/bugs-chart.py-main/Bugs_DAG.py
@@ -2,10 +2,10 @@
 import json
 from datetime import datetime, timedelta

-from airflow.operators.python import PythonOperator
 from bugs import BugsChartPeriod, BugsChartType, ChartData

 from airflow import DAG
+from airflow.operators.python import PythonOperator

 # File path
 JSON_PATH = "/opt/airflow/dags/files/bugs_chart.json"
diff --git a/CHART/flo-chart.py-main/Flo_DAG.py b/CHART/flo-chart.py-main/Flo_DAG.py
index e1162c1..2d2506e 100644
--- a/CHART/flo-chart.py-main/Flo_DAG.py
+++ b/CHART/flo-chart.py-main/Flo_DAG.py
@@ -2,10 +2,10 @@
 import json
 from datetime import datetime, timedelta

-from airflow.operators.python import PythonOperator
 from flo import ChartData  # import the flo.py module

 from airflow import DAG
+from airflow.operators.python import PythonOperator

 # File path
 JSON_PATH = "/opt/airflow/dags/files/flo_chart.json"
diff --git a/CHART/genie-chart.py-main/Genie_DAG.py b/CHART/genie-chart.py-main/Genie_DAG.py
index 04f00b2..bb763a0 100644
--- a/CHART/genie-chart.py-main/Genie_DAG.py
+++ b/CHART/genie-chart.py-main/Genie_DAG.py
@@ -2,10 +2,10 @@
 import json
 from datetime import datetime, timedelta

-from airflow.operators.python import PythonOperator
 from genie import ChartData, GenieChartPeriod  # import the genie.py module

 from airflow import DAG
+from airflow.operators.python import PythonOperator


 # Function that fetches Genie chart data and saves it as JSON
diff --git a/CHART/melon-chart.py-main/Melon_DAG.py b/CHART/melon-chart.py-main/Melon_DAG.py
index 3fc4522..e604c33 100644
--- a/CHART/melon-chart.py-main/Melon_DAG.py
+++ b/CHART/melon-chart.py-main/Melon_DAG.py
@@ -3,9 +3,9 @@
 from datetime import datetime, timedelta

 import requests
-from airflow.operators.python import PythonOperator

 from airflow import DAG
+from airflow.operators.python import PythonOperator

 # Melon chart API details
 _APP_VERSION = "6.5.8.1"
diff --git a/CHART/vibe-chart.py-main/Vibe_DAG.py b/CHART/vibe-chart.py-main/Vibe_DAG.py
index 0cca09b..c9478d0 100644
--- a/CHART/vibe-chart.py-main/Vibe_DAG.py
+++ b/CHART/vibe-chart.py-main/Vibe_DAG.py
@@ -2,10 +2,10 @@
 import json
 from datetime import datetime, timedelta

-from airflow.operators.python import PythonOperator
 from vibe import ChartData  # import the vibe.py module

 from airflow import DAG
+from airflow.operators.python import PythonOperator


 # Function that fetches Vibe chart data and saves it as JSON
diff --git a/Kafka/eventsim_producer.py b/Kafka/eventsim_producer.py
index 01a7671..ec65d68 100644
--- a/Kafka/eventsim_producer.py
+++ b/Kafka/eventsim_producer.py
@@ -10,14 +10,14 @@
 from kafka.errors import TopicAlreadyExistsError
 from kafka.producer import KafkaProducer

-# Add the path containing the Kafka package
-BASE_DIR = os.path.dirname(os.path.abspath(__file__))
-sys.path.append(os.path.join(BASE_DIR, ".."))
-
 from Kafka.model.music_streaming import EventLog
 from Kafka.utils.docker_utils import get_container_id, is_container_running
 from Kafka.utils.schema_utils import register_schema

+# Add the path containing the Kafka package
+BASE_DIR = os.path.dirname(os.path.abspath(__file__))
+sys.path.append(os.path.join(BASE_DIR, ".."))
+
 SCHEMA_REGISTRY_URL = "http://localhost:8081"

diff --git a/Kafka/test.py b/Kafka/test.py
index 393671d..ec3327f 100644
--- a/Kafka/test.py
+++ b/Kafka/test.py
@@ -1,16 +1,17 @@
 import os
-import sys
 import subprocess
+import sys
+
+from Kafka.utils.connect_utils import create_s3_sink_json
+from Kafka.utils.docker_utils import register_sink_connector

 BASE_DIR = os.path.dirname(os.path.abspath(__file__))
 sys.path.append(os.path.join(BASE_DIR, ".."))

-from Kafka.utils.docker_utils import register_sink_connector
-from Kafka.utils.connect_utils import create_s3_sink_json

 create_s3_sink_json()
 register_sink_connector(f"{BASE_DIR}/connectors/s3_sink_config.json")


 producer_script = os.path.join(BASE_DIR, "eventsim_producer.py")
-subprocess.run(["python", producer_script], check=True)
\ No newline at end of file
+subprocess.run(["python", producer_script], check=True)
diff --git a/Kafka/utils/connect_utils.py b/Kafka/utils/connect_utils.py
index e4983a3..696725f 100644
--- a/Kafka/utils/connect_utils.py
+++ b/Kafka/utils/connect_utils.py
@@ -1,11 +1,13 @@
-import os
 import json
+import os
+
 from Kafka.variables.aws_variables import aws_variables

+
 def create_s3_sink_json():
     # Location of this file (= connect_utils.py)
     base_dir = os.path.dirname(os.path.abspath(__file__))
-
+
     # Path to the connectors directory
     connectors_dir = os.path.join(base_dir, "..", "connectors")
     os.makedirs(connectors_dir, exist_ok=True)  # create the folder if it does not exist
@@ -38,10 +40,9 @@ def create_s3_sink_json():
         "timestamp.field": "ts",
         "partition.duration.ms": "86400000",
         "aws.access.key.id": aws_variables.get("aws_access_key_id"),
-            "aws.secret.access.key": aws_variables.get("aws_secret_access_key")
-        }
+            "aws.secret.access.key": aws_variables.get("aws_secret_access_key"),
+        },
     }
-
+
     with open(file_path, "w") as f:
         json.dump(config, f, indent=4)
-
diff --git a/Kafka/utils/schema_utils.py b/Kafka/utils/schema_utils.py
index cceeb9e..f0aa3dd 100644
--- a/Kafka/utils/schema_utils.py
+++ b/Kafka/utils/schema_utils.py
@@ -2,6 +2,7 @@
 import json

 import requests
+
 # from avro.io import BinaryEncoder, DatumWriter

 '''
@@ -15,6 +16,7 @@ def serialize_avro(data, schema):
     return bytes_writer.getvalue()
 '''

+
 # Register an Avro schema (POST request to the Schema Registry)
 def register_schema(SCHEMA_REGISTRY_URL: str, subject: str, schema_dict: dict):
     url = f"{SCHEMA_REGISTRY_URL}/subjects/{subject}/versions"
diff --git a/airflow/dags/Bugs_DAG.py b/airflow/dags/Bugs_DAG.py
index 4ae1f33..be12c25 100644
--- a/airflow/dags/Bugs_DAG.py
+++ b/airflow/dags/Bugs_DAG.py
@@ -3,12 +3,12 @@
 from datetime import datetime, timedelta

 import pandas as pd
-from airflow.operators.python import PythonOperator
-from airflow.providers.amazon.aws.hooks.s3 import S3Hook
-from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook
 from bugs import BugsChartPeriod, BugsChartType, ChartData

 from airflow import DAG
+from airflow.operators.python import PythonOperator
+from airflow.providers.amazon.aws.hooks.s3 import S3Hook
+from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook

 """
 Replace your-s3-bucket-name with the actual S3 bucket name, and
diff --git a/airflow/dags/Flo_DAG.py b/airflow/dags/Flo_DAG.py
index fc7a94e..065765c 100644
--- a/airflow/dags/Flo_DAG.py
+++ b/airflow/dags/Flo_DAG.py
@@ -2,12 +2,12 @@
 import json
 from datetime import datetime, timedelta

-from airflow.hooks.base_hook import BaseHook
-from airflow.operators.python import PythonOperator
-from airflow.providers.amazon.aws.hooks.s3 import S3Hook
 from flo import ChartData  # import the flo.py module

 from airflow import DAG
+from airflow.hooks.base_hook import BaseHook
+from airflow.operators.python import PythonOperator
+from airflow.providers.amazon.aws.hooks.s3 import S3Hook

 # File path and S3 bucket details
 TODAY = datetime.now().strftime("%Y%m%d")
diff --git a/airflow/dags/Genie_DAG.py b/airflow/dags/Genie_DAG.py
index 26ce3b7..5f2c7ea 100644
--- a/airflow/dags/Genie_DAG.py
+++ b/airflow/dags/Genie_DAG.py
@@ -3,12 +3,12 @@
 from datetime import datetime, timedelta

 import pandas as pd
-from airflow.operators.python import PythonOperator
-from airflow.providers.amazon.aws.hooks.s3 import S3Hook
-from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook
 from genie import ChartData, GenieChartPeriod  # import the genie.py module

 from airflow import DAG
+from airflow.operators.python import PythonOperator
+from airflow.providers.amazon.aws.hooks.s3 import S3Hook
+from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook

 # Environment variable setup
 TODAY = datetime.now().strftime("%Y%m%d")
diff --git a/airflow/dags/Melon_DAG.py b/airflow/dags/Melon_DAG.py
index bf1dcd1..d754270 100644
--- a/airflow/dags/Melon_DAG.py
+++ b/airflow/dags/Melon_DAG.py
@@ -5,10 +5,10 @@
 import boto3
 import requests
 import snowflake.connector
-from airflow.hooks.base_hook import BaseHook
-from airflow.operators.python import PythonOperator

 from airflow import DAG
+from airflow.hooks.base_hook import BaseHook
+from airflow.operators.python import PythonOperator

 # Melon chart API details
 _APP_VERSION = "6.5.8.1"
diff --git a/airflow/dags/Vibe_DAG.py b/airflow/dags/Vibe_DAG.py
index c7f4c84..e9038df 100644
--- a/airflow/dags/Vibe_DAG.py
+++ b/airflow/dags/Vibe_DAG.py
@@ -3,12 +3,12 @@
 from datetime import datetime, timedelta

 import snowflake.connector
-from airflow.hooks.base_hook import BaseHook
-from airflow.operators.python import PythonOperator
-from airflow.providers.amazon.aws.hooks.s3 import S3Hook
 from vibe import ChartData  # import the vibe.py module

 from airflow import DAG
+from airflow.hooks.base_hook import BaseHook
+from airflow.operators.python import PythonOperator
+from airflow.providers.amazon.aws.hooks.s3 import S3Hook

 # File path and S3 bucket details
 TODAY = datetime.now().strftime("%Y%m%d")
diff --git a/airflow/dags/eventsim_ETL_DAG.py b/airflow/dags/eventsim_ETL_DAG.py
index 268bf0b..8ffb3ac 100644
--- a/airflow/dags/eventsim_ETL_DAG.py
+++ b/airflow/dags/eventsim_ETL_DAG.py
@@ -1,12 +1,11 @@
 import os
 from datetime import datetime, timedelta

+from airflow import DAG
 from airflow.operators.dummy_operator import DummyOperator
 from airflow.providers.apache.spark.operators.spark_submit import \
     SparkSubmitOperator

-from airflow import DAG
-
 # S3 and Snowflake settings
 S3_BUCKET = "s3a://eventsim-log"
 # Spark JAR configuration
diff --git a/airflow/dags/get_weekly_top200_songs.py b/airflow/dags/get_weekly_top200_songs.py
index 57df993..7258999 100644
--- a/airflow/dags/get_weekly_top200_songs.py
+++ b/airflow/dags/get_weekly_top200_songs.py
@@ -2,9 +2,6 @@
 import time
 from datetime import datetime, timedelta

-from airflow.decorators import task
-from airflow.models import Variable
-from airflow.providers.amazon.aws.hooks.s3 import S3Hook
 from selenium import webdriver
 from selenium.webdriver.chrome.service import Service
 from selenium.webdriver.common.by import By
@@ -12,6 +9,9 @@
 from selenium.webdriver.support.ui import WebDriverWait

 from airflow import DAG
+from airflow.decorators import task
+from airflow.models import Variable
+from airflow.providers.amazon.aws.hooks.s3 import S3Hook


 @task
diff --git a/airflow/dags/plugins/spark_utils.py b/airflow/dags/plugins/spark_utils.py
index d78459c..6c1a461 100644
--- a/airflow/dags/plugins/spark_utils.py
+++ b/airflow/dags/plugins/spark_utils.py
@@ -1,9 +1,10 @@
 import os

 import snowflake.connector
-from airflow.models import connection
 from pyspark.sql import SparkSession

+from airflow.models import connection
+
 # Spark JAR configuration
 SPARK_HOME = "/opt/spark/"
 SPARK_JARS = ",".join(
diff --git a/airflow/dags/scripts/eventsim_ETL.py b/airflow/dags/scripts/eventsim_ETL.py
index 9ab83d4..255181f 100644
--- a/airflow/dags/scripts/eventsim_ETL.py
+++ b/airflow/dags/scripts/eventsim_ETL.py
@@ -2,9 +2,13 @@
 import sys
 from datetime import datetime

+from pyspark.sql.types import (IntegerType, LongType, StringType, StructField,
+                               StructType)
+
 from airflow.models import Variable
-from pyspark.sql.types import (IntegerType, LongType, StringType, StructField, StructType)
-from plugins.spark_utils import execute_snowflake_query, spark_session_builder
+
+from plugins.spark_utils import (execute_snowflake_query,
+                                 spark_session_builder)

 BASE_DIR = os.path.abspath(
     os.path.join(os.path.dirname(__file__), "..")
diff --git a/airflow/dags/scripts/get_access_token.py b/airflow/dags/scripts/get_access_token.py
index d15b834..a23958e 100644
--- a/airflow/dags/scripts/get_access_token.py
+++ b/airflow/dags/scripts/get_access_token.py
@@ -3,6 +3,7 @@
 import os

 import requests
+
 from airflow.models import Variable

 SPOTIFY_CLIENT_ID = Variable.get("SPOTIFY_CLIENT_ID")
diff --git a/airflow/dags/scripts/load_spotify_data.py b/airflow/dags/scripts/load_spotify_data.py
index 202f520..56a4a8e 100644
--- a/airflow/dags/scripts/load_spotify_data.py
+++ b/airflow/dags/scripts/load_spotify_data.py
@@ -2,9 +2,10 @@

 import boto3
 import pandas as pd
-from airflow.models import Variable
 from scripts.request_spotify_api import *

+from airflow.models import Variable
+
 TODAY = datetime.now().strftime("%Y-%m-%d")

 AWS_ACCESS_KEY_ID = Variable.get("AWS_ACCESS_KEY_ID")
diff --git a/airflow/dags/scripts/request_spotify_api.py b/airflow/dags/scripts/request_spotify_api.py
index d2e2c03..06f3886 100644
--- a/airflow/dags/scripts/request_spotify_api.py
+++ b/airflow/dags/scripts/request_spotify_api.py
@@ -5,9 +5,10 @@

 import pandas as pd
 import requests
-from airflow.models import Variable
 from scripts.get_access_token import get_token

+from airflow.models import Variable
+
 TODAY = datetime.now().strftime("%Y-%m-%d")

 END_POINT = "https://api.spotify.com/v1"
diff --git a/airflow/dags/spotify_data_dag.py b/airflow/dags/spotify_data_dag.py
index b2be154..3189adb 100644
--- a/airflow/dags/spotify_data_dag.py
+++ b/airflow/dags/spotify_data_dag.py
@@ -1,11 +1,11 @@
 from datetime import datetime, timedelta

-from airflow.operators.python import PythonOperator
 from scripts.crawling_spotify_data import *
 from scripts.load_spotify_data import *
 from scripts.request_spotify_api import *

 from airflow import DAG
+from airflow.operators.python import PythonOperator

 default_args = {
     "depends_on_past": False,
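
Note: the substance of patch 1 is that the committed s3-sink-config.json, with its hard-coded aws.access.key.id / aws.secret.access.key values, is deleted, and connect_utils.py now generates the connector config at runtime from Kafka.variables.aws_variables.aws_variables. That module is deliberately not part of the series (it should stay out of version control), so the following is a hedged sketch of the shape connect_utils.py assumes, here sourcing the keys from environment variables; the module contents and variable names beyond the two keys actually used are assumptions:

    # Kafka/variables/aws_variables.py -- hypothetical sketch; the real module
    # is absent from this patch series. connect_utils.py only relies on
    # aws_variables.get("aws_access_key_id") / .get("aws_secret_access_key").
    import os

    aws_variables = {
        "aws_access_key_id": os.environ.get("AWS_ACCESS_KEY_ID"),
        "aws_secret_access_key": os.environ.get("AWS_SECRET_ACCESS_KEY"),
    }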