Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHART/bugs-chart.py-main/Bugs_DAG.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@
import json
from datetime import datetime, timedelta

from airflow.operators.python import PythonOperator
from bugs import BugsChartPeriod, BugsChartType, ChartData

from airflow import DAG
from airflow.operators.python import PythonOperator

# 파일 경로
JSON_PATH = "/opt/airflow/dags/files/bugs_chart.json"
Expand Down
2 changes: 1 addition & 1 deletion CHART/flo-chart.py-main/Flo_DAG.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@
import json
from datetime import datetime, timedelta

from airflow.operators.python import PythonOperator
from flo import ChartData # flo.py 모듈 import

from airflow import DAG
from airflow.operators.python import PythonOperator

# 파일 경로
JSON_PATH = "/opt/airflow/dags/files/flo_chart.json"
Expand Down
2 changes: 1 addition & 1 deletion CHART/genie-chart.py-main/Genie_DAG.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@
import json
from datetime import datetime, timedelta

from airflow.operators.python import PythonOperator
from genie import ChartData, GenieChartPeriod # genie.py 모듈 import

from airflow import DAG
from airflow.operators.python import PythonOperator


# Genie 차트 데이터를 가져와 JSON으로 저장하는 함수
Expand Down
2 changes: 1 addition & 1 deletion CHART/melon-chart.py-main/Melon_DAG.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
from datetime import datetime, timedelta

import requests
from airflow.operators.python import PythonOperator

from airflow import DAG
from airflow.operators.python import PythonOperator

# 멜론 차트 API 정보
_APP_VERSION = "6.5.8.1"
Expand Down
2 changes: 1 addition & 1 deletion CHART/vibe-chart.py-main/Vibe_DAG.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@
import json
from datetime import datetime, timedelta

from airflow.operators.python import PythonOperator
from vibe import ChartData # vibe.py 모듈 import

from airflow import DAG
from airflow.operators.python import PythonOperator


# Vibe 차트 데이터를 가져와 JSON으로 저장하는 함수
Expand Down
2 changes: 1 addition & 1 deletion Kafka/conduktor.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ services:
conduktor-console:
image: conduktor/conduktor-console:1.26.0
ports:
- "8080:8080"
- "8085:8085"
volumes:
- conduktor_data:/var/conduktor
environment:
Expand Down
33 changes: 0 additions & 33 deletions Kafka/connectors/s3-sink-config.json

This file was deleted.

5 changes: 1 addition & 4 deletions Kafka/eventsim_producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,14 @@
import time
from typing import Final

import avro.schema
from kafka import KafkaAdminClient
from kafka.admin import NewTopic
from kafka.errors import TopicAlreadyExistsError
from kafka.producer import KafkaProducer

from Kafka.model.music_streaming import EventLog
from Kafka.utils.docker_utils import get_container_id, is_container_running
from Kafka.utils.schema_utils import register_schema, serialize_avro
from Kafka.utils.schema_utils import register_schema

# Kafka 패키지가 있는 경로 추가
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
Expand Down Expand Up @@ -113,8 +112,6 @@ def main():
create_topic(bootstrap_servers, topic_name, 4)
register_schema(SCHEMA_REGISTRY_URL, f"{topic_name}-value", schema_dict)

schema = avro.schema.parse(open(SCHEMA_PATH).read())

producer = KafkaProducer(
bootstrap_servers=bootstrap_servers,
client_id="eventsim_music_streaming_producer",
Expand Down
22 changes: 8 additions & 14 deletions Kafka/test.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,17 @@
# import base64


# encoded_value = 'T2JqAQQUYXZyby5jb2RlYwhudWxsFmF2cm8uc2NoZW1h8Ap7InR5cGUiOiAicmVjb3JkIiwgIm5hbWUiOiAiTXVzaWNTdHJlYW1pbmdFdmVudCIsICJuYW1lc3BhY2UiOiAiY29tLmV4YW1wbGUiLCAiZmllbGRzIjogW3sibmFtZSI6ICJzb25nIiwgInR5cGUiOiBbIm51bGwiLCAic3RyaW5nIl0sICJkZWZhdWx0IjogbnVsbH0sIHsibmFtZSI6ICJtZXRob2QiLCAidHlwZSI6ICJzdHJpbmcifSwgeyJuYW1lIjogImF1dGgiLCAidHlwZSI6ICJzdHJpbmcifSwgeyJuYW1lIjogImxldmVsIiwgInR5cGUiOiB7InR5cGUiOiAiZW51bSIsICJuYW1lIjogIlVzZXJMZXZlbCIsICJzeW1ib2xzIjogWyJmcmVlIiwgInBhaWQiXX19LCB7Im5hbWUiOiAiYXJ0aXN0IiwgInR5cGUiOiBbIm51bGwiLCAic3RyaW5nIl0sICJkZWZhdWx0IjogbnVsbH0sIHsibmFtZSI6ICJsZW5ndGgiLCAidHlwZSI6IFsibnVsbCIsICJmbG9hdCJdLCAiZGVmYXVsdCI6IG51bGx9LCB7Im5hbWUiOiAibG9jYXRpb24iLCAidHlwZSI6IFsibnVsbCIsICJzdHJpbmciXSwgImRlZmF1bHQiOiBudWxsfSwgeyJuYW1lIjogInNlc3Npb25JZCIsICJ0eXBlIjogImludCJ9LCB7Im5hbWUiOiAicGFnZSIsICJ0eXBlIjogInN0cmluZyJ9LCB7Im5hbWUiOiAidXNlcklkIiwgInR5cGUiOiAic3RyaW5nIn0sIHsibmFtZSI6ICJ0cyIsICJ0eXBlIjogImxvbmcifSwgeyJuYW1lIjogInN0YXR1cyIsICJ0eXBlIjogImludCJ9XX0A+mJ1+f9mVszSkk7Jz218fQLqAQIgQnJpbmcgTWUgVG8gTGlmZQZQVVQSTG9nZ2VkIEluAAIWRXZhbmVzY2VuY2UC7xxtQwJOTWlubmVhcG9saXMtU3QuIFBhdWwtQmxvb21pbmd0b24sIE1OLVdJ/MEGEE5leHRTb25nCDg0NzS0uNijlmWQA/pidfn/ZlbM0pJOyc9tfH0='

# decoded_bytes = base64.b64decode(encoded_value)

# print(f"Decoded first byte (Magic Byte): {decoded_bytes[0]}")

# '''
# kafka-console-consumer --bootstrap-server localhost:9092 --topic eventsim_music_streaming --from-beginning --property print.key=true --property print.value=true
# '''

import os
import subprocess
import sys

from Kafka.utils.connect_utils import create_s3_sink_json
from Kafka.utils.docker_utils import register_sink_connector

BASE_DIR = os.path.dirname(os.path.abspath(__file__))
sys.path.append(os.path.join(BASE_DIR, ".."))


register_sink_connector(f"{BASE_DIR}/connectors/s3-sink-config.json")
create_s3_sink_json()
register_sink_connector(f"{BASE_DIR}/connectors/s3_sink_config.json")


producer_script = os.path.join(BASE_DIR, "Kafka/" "eventsim_producer.py")
subprocess.run(["python", producer_script], check=True)
48 changes: 48 additions & 0 deletions Kafka/utils/connect_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import json
import os

from Kafka.variables.aws_variables import aws_variables


def create_s3_sink_json(credentials=None):
    """Write the Kafka Connect S3 sink connector config to connectors/s3_sink_config.json.

    Builds the connector configuration for the ``eventsim_music_streaming``
    topic and dumps it as JSON next to this package (``../connectors``),
    creating the directory if needed.

    Args:
        credentials: Optional mapping providing ``"aws_access_key_id"`` and
            ``"aws_secret_access_key"``. When omitted, falls back to the
            project-level ``aws_variables`` (backward compatible with the
            original no-argument call).

    Returns:
        str: Path of the JSON file that was written, so callers (e.g. the
        connector-registration script) do not have to rebuild it by hand.
    """
    if credentials is None:
        # Preserve original behavior: read from the shared project variables.
        credentials = aws_variables

    # Location of this file (= connect_utils.py)
    base_dir = os.path.dirname(os.path.abspath(__file__))

    # Path of the connectors folder; create it if it does not exist.
    connectors_dir = os.path.join(base_dir, "..", "connectors")
    os.makedirs(connectors_dir, exist_ok=True)

    # Final output file path
    file_path = os.path.join(connectors_dir, "s3_sink_config.json")

    config = {
        "name": "eventsim-s3-sink-connector",
        "config": {
            "connector.class": "io.confluent.connect.s3.S3SinkConnector",
            "tasks.max": "3",
            "topics": "eventsim_music_streaming",
            "topic.creation.enable": "true",
            "topic.creation.default.partitions": "4",
            "topic.creation.default.replication.factor": "1",
            "topic.creation.default.cleanup.policy": "delete",
            "s3.region": "ap-northeast-2",
            "s3.bucket.name": "de5-s4tify",
            "s3.part.size": "5242880",
            "flush.size": "1000",
            "storage.class": "io.confluent.connect.s3.storage.S3Storage",
            "locale": "ko_KR",
            "timezone": "Asia/Seoul",
            "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
            "schema.compatibility": "NONE",
            "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
            "path.format": "'year'=YYYY/'month'=MM/'day'=dd",
            # Partition S3 objects by the record's own "ts" field, one day per partition.
            "timestamp.extractor": "RecordField",
            "timestamp.field": "ts",
            "partition.duration.ms": "86400000",
            # NOTE(review): credentials are written in plain text to the config
            # file — consider a Connect secrets provider instead.
            "aws.access.key.id": credentials.get("aws_access_key_id"),
            "aws.secret.access.key": credentials.get("aws_secret_access_key"),
        },
    }

    with open(file_path, "w") as f:
        json.dump(config, f, indent=4)

    return file_path
4 changes: 3 additions & 1 deletion Kafka/utils/schema_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@
import json

import requests
from avro.io import BinaryEncoder, DatumWriter

# from avro.io import BinaryEncoder, DatumWriter

'''
# Avro 직렬화 함수
def serialize_avro(data, schema):
"""Avro 데이터를 직렬화하여 바이너리 포맷으로 변환"""
Expand All @@ -13,6 +14,7 @@ def serialize_avro(data, schema):
encoder = BinaryEncoder(bytes_writer)
writer.write(data, encoder)
return bytes_writer.getvalue()
'''


# Avro 스키마 등록 (Schema Registry에 POST 요청)
Expand Down
Empty file added airflow/__init__.py
Empty file.
6 changes: 3 additions & 3 deletions airflow/dags/Bugs_DAG.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@
from datetime import datetime, timedelta

import pandas as pd
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook
from bugs import BugsChartPeriod, BugsChartType, ChartData

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook

"""
your-s3-bucket-name을 실제 S3 버킷명으로 바꾸고,
Expand Down
6 changes: 3 additions & 3 deletions airflow/dags/Flo_DAG.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@
import json
from datetime import datetime, timedelta

from airflow.hooks.base_hook import BaseHook
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from flo import ChartData # flo.py 모듈 import

from airflow import DAG
from airflow.hooks.base_hook import BaseHook
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook

# 파일 경로 및 S3 버킷 정보
TODAY = datetime.now().strftime("%Y%m%d")
Expand Down
6 changes: 3 additions & 3 deletions airflow/dags/Genie_DAG.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@
from datetime import datetime, timedelta

import pandas as pd
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook
from genie import ChartData, GenieChartPeriod # genie.py 모듈 import

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook

# 환경 변수 설정
TODAY = datetime.now().strftime("%Y%m%d")
Expand Down
4 changes: 2 additions & 2 deletions airflow/dags/Melon_DAG.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@
import boto3
import requests
import snowflake.connector
from airflow.hooks.base_hook import BaseHook
from airflow.operators.python import PythonOperator

from airflow import DAG
from airflow.hooks.base_hook import BaseHook
from airflow.operators.python import PythonOperator

# 멜론 차트 API 정보
_APP_VERSION = "6.5.8.1"
Expand Down
6 changes: 3 additions & 3 deletions airflow/dags/Vibe_DAG.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@
from datetime import datetime, timedelta

import snowflake.connector
from airflow.hooks.base_hook import BaseHook
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from vibe import ChartData # vibe.py 모듈 import

from airflow import DAG
from airflow.hooks.base_hook import BaseHook
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook

# 파일 경로 및 S3 버킷 정보
TODAY = datetime.now().strftime("%Y%m%d")
Expand Down
3 changes: 1 addition & 2 deletions airflow/dags/eventsim_ETL_DAG.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
import os
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.providers.apache.spark.operators.spark_submit import \
SparkSubmitOperator

from airflow import DAG

# S3 및 Snowflake 설정
S3_BUCKET = "s3a://eventsim-log"
# Spark JARs 설정
Expand Down
6 changes: 3 additions & 3 deletions airflow/dags/get_weekly_top200_songs.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,16 @@
import time
from datetime import datetime, timedelta

from airflow.decorators import task
from airflow.models import Variable
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait

from airflow import DAG
from airflow.decorators import task
from airflow.models import Variable
from airflow.providers.amazon.aws.hooks.s3 import S3Hook


@task
Expand Down
Empty file.
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import os

import snowflake.connector
from airflow.models import connection
from pyspark.sql import SparkSession

from airflow.models import connection

# Spark JARs 설정
SPARK_HOME = "/opt/spark/"
SPARK_JARS = ",".join(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,13 @@
import sys
from datetime import datetime

from airflow.models import Variable
from pyspark.sql.types import (IntegerType, LongType, StringType, StructField,
StructType)
from utils.spark_utils import execute_snowflake_query, spark_session_builder

from airflow.models import Variable

from ..dags.plugins.spark_utils import (execute_snowflake_query,
spark_session_builder)

BASE_DIR = os.path.abspath(
os.path.join(os.path.dirname(__file__), "..")
Expand Down
1 change: 1 addition & 0 deletions airflow/dags/scripts/get_access_token.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import os

import requests

from airflow.models import Variable

SPOTIFY_CLIENT_ID = Variable.get("SPOTIFY_CLIENT_ID")
Expand Down
3 changes: 2 additions & 1 deletion airflow/dags/scripts/load_spotify_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@

import boto3
import pandas as pd
from airflow.models import Variable
from scripts.request_spotify_api import *

from airflow.models import Variable

TODAY = datetime.now().strftime("%Y-%m-%d")

AWS_ACCESS_KEY_ID = Variable.get("AWS_ACCESS_KEY_ID")
Expand Down
3 changes: 2 additions & 1 deletion airflow/dags/scripts/request_spotify_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@

import pandas as pd
import requests
from airflow.models import Variable
from scripts.get_access_token import get_token

from airflow.models import Variable

TODAY = datetime.now().strftime("%Y-%m-%d")
END_POINT = "https://api.spotify.com/v1"

Expand Down
Loading