Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Security improvements! #84

Merged
merged 13 commits into from
May 22, 2024
16 changes: 3 additions & 13 deletions analyzer_init.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,7 @@
from typing import Any

from discord_analyzer import RnDaoAnalyzer
from utils.daolytics_uitls import (
get_mongo_credentials,
get_neo4j_credentials,
get_saga_db_location,
)
from utils.credentials import get_mongo_credentials, get_neo4j_credentials


class AnalyzerInit:
Expand All @@ -17,23 +13,17 @@ class AnalyzerInit:
def __init__(self, guild_id: str) -> None:
self.guild_id = guild_id

def get_analyzer(self) -> tuple[RnDaoAnalyzer, dict[str, Any]]:
def get_analyzer(self) -> RnDaoAnalyzer:
"""
Returns:
---------
analyzer : RnDaoAnalyzer
mongo_creds : dict[str, Any]
"""
analyzer = RnDaoAnalyzer(self.guild_id)

# credentials
mongo_creds = get_mongo_credentials()
neo4j_creds = get_neo4j_credentials()
saga_mongo_location = get_saga_db_location()

mongo_creds["db_name"] = saga_mongo_location["db_name"]
mongo_creds["collection_name"] = saga_mongo_location["collection_name"]
mongo_creds["connection_str"] = self._get_mongo_connection(mongo_creds)

analyzer.set_mongo_database_info(
mongo_db_host=mongo_creds["host"],
Expand All @@ -45,7 +35,7 @@ def get_analyzer(self) -> tuple[RnDaoAnalyzer, dict[str, Any]]:
analyzer.database_connect()
analyzer.setup_neo4j_metrics()

return analyzer, mongo_creds
return analyzer

def _get_mongo_connection(self, mongo_creds: dict[str, Any]):
user = mongo_creds["user"]
Expand Down
9 changes: 4 additions & 5 deletions automation/utils/automation_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,17 @@
from typing import Any
from uuid import uuid1

from utils.get_mongo_client import MongoSingleton
from utils.get_rabbitmq import prepare_rabbit_mq
from utils.mongo import MongoSingleton
from utils.rabbitmq import RabbitMQSingleton


class AutomationBase:
def __init__(self) -> None:
"""
utilities for automation workflow
"""
mongo_singleton = MongoSingleton.get_instance()
self.mongo_client = mongo_singleton.get_client()
self.rabbitmq = prepare_rabbit_mq()
self.mongo_client = MongoSingleton.get_instance().get_client()
self.rabbitmq = RabbitMQSingleton.get_instance().get_client()

def _get_users_from_guildmembers(
self, guild_id: str, user_ids: list[str], strategy: str = "ngu"
Expand Down
2 changes: 1 addition & 1 deletion automation/utils/model.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from utils.get_automation_env import get_automations_env
from utils.get_mongo_client import MongoSingleton
from utils.mongo import MongoSingleton

from .interfaces import Automation

Expand Down
2 changes: 1 addition & 1 deletion discord_analyzer/analyzer/utils/guild.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from utils.get_mongo_client import MongoSingleton
from utils.mongo import MongoSingleton


class Guild:
Expand Down
72 changes: 24 additions & 48 deletions discord_utils.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,16 @@
import logging
from typing import Any

from analyzer_init import AnalyzerInit
from automation.automation_workflow import AutomationWorkflow
from tc_messageBroker.rabbit_mq.saga.saga_base import get_saga
from utils.daolytics_uitls import get_mongo_credentials, get_saga_db_location
from utils.credentials import get_mongo_credentials
from utils.get_guild_utils import get_guild_community_ids
from utils.get_rabbitmq import prepare_rabbit_mq
from utils.rabbitmq import RabbitMQSingleton
from utils.transactions_ordering import sort_transactions


def analyzer_recompute(sagaId: str, rabbit_creds: dict[str, Any]):
mongo_creds = get_mongo_credentials()
saga_mongo_location = get_saga_db_location()

saga = get_saga_instance(
sagaId=sagaId,
connection=mongo_creds["connection_str"],
saga_db=saga_mongo_location["db_name"],
saga_collection=saga_mongo_location["collection_name"],
)
def analyzer_recompute(sagaId: str):
saga = get_saga_instance(sagaId=sagaId)
if saga is None:
logging.warn(
f"Warn: Saga not found!, stopping the recompute for sagaId: {sagaId}"
Expand All @@ -30,7 +21,7 @@ def analyzer_recompute(sagaId: str, rabbit_creds: dict[str, Any]):

logging.info("Initializing the analyzer")
analyzer_init = AnalyzerInit(guildId)
analyzer, mongo_creds = analyzer_init.get_analyzer()
analyzer = analyzer_init.get_analyzer()
logging.info("Analyzer initialized")

def recompute_wrapper(**kwargs):
Expand All @@ -44,30 +35,21 @@ def publish_wrapper(**kwargs):
saga.next(
publish_method=publish_wrapper,
call_function=recompute_wrapper,
mongo_creds=mongo_creds,
)

return rabbit_creds, sagaId, mongo_creds
return sagaId


def analyzer_run_once(sagaId: str, rabbit_creds: dict[str, Any]):
mongo_creds = get_mongo_credentials()
saga_mongo_location = get_saga_db_location()

saga = get_saga_instance(
sagaId=sagaId,
connection=mongo_creds["connection_str"],
saga_db=saga_mongo_location["db_name"],
saga_collection=saga_mongo_location["collection_name"],
)
def analyzer_run_once(sagaId: str):
saga = get_saga_instance(sagaId=sagaId)
if saga is None:
logging.warn(f"Saga not found!, stopping the run_once for sagaId: {sagaId}")
else:
platform_id = saga.data["platformId"]
guildId = get_guild_community_ids(platform_id)

analyzer_init = AnalyzerInit(guildId)
analyzer, mongo_creds = analyzer_init.get_analyzer()
analyzer = analyzer_init.get_analyzer()

def run_once_wrapper(**kwargs):
analyzer.run_once()
Expand All @@ -78,38 +60,32 @@ def publish_wrapper(**kwargs):
saga.next(
publish_method=publish_wrapper,
call_function=run_once_wrapper,
mongo_creds=mongo_creds,
)
return rabbit_creds, sagaId, mongo_creds
return sagaId


def get_saga_instance(sagaId: str):
mongo_creds = get_mongo_credentials()

def get_saga_instance(sagaId: str, connection: str, saga_db: str, saga_collection: str):
saga = get_saga(
sagaId=sagaId,
connection_url=connection,
db_name=saga_db,
collection=saga_collection,
connection_url=mongo_creds["connection_str"],
db_name="Saga",
collection="sagas",
)
if saga is None:
raise ValueError(f"Saga with sagaId: {sagaId} not found!")

return saga


def publish_on_success(connection, result, *args, **kwargs):
# we must get these three things
try:
# rabbitmq creds
# TODO: remove sending it in future
_ = args[0][0]
sagaId = args[0][1]
mongo_creds = args[0][2]
logging.info(f"SAGAID: {sagaId}: ON_SUCCESS callback! ")

saga = get_saga_instance(
sagaId=sagaId,
connection=mongo_creds["connection_str"],
saga_db=mongo_creds["db_name"],
saga_collection=mongo_creds["collection_name"],
)
rabbitmq = prepare_rabbit_mq()
sagaId = args[0]
logging.info(f"SAGAID: {sagaId}: ON_SUCCESS callback!")

saga = get_saga_instance(sagaId=sagaId)
rabbitmq = RabbitMQSingleton.get_instance().get_client()

transactions = saga.choreography.transactions

Expand Down
2 changes: 2 additions & 0 deletions docker-compose.test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ services:
condition: service_healthy
mongo:
image: "mongo:6.0.8"
attach: false
environment:
- MONGO_INITDB_ROOT_USERNAME=root
- MONGO_INITDB_ROOT_PASSWORD=pass
Expand All @@ -53,6 +54,7 @@ services:
start_period: 40s
neo4j:
image: "neo4j:5.9.0"
attach: false
environment:
- NEO4J_AUTH=neo4j/password
- NEO4J_PLUGINS=["apoc", "graph-data-science"]
Expand Down
8 changes: 4 additions & 4 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ pymongo==4.3.3
pytest==7.2.0
python-dateutil==2.8.2
pytz==2022.7.1
python-dotenv==0.21.1
python-dotenv>=1.0.0
six==1.16.0
tomli==2.0.1
networkx==3.1
Expand All @@ -18,11 +18,11 @@ pytest-cov==4.0.0
coverage==7.2.5
python-dateutil==2.8.2
tqdm
tc-messageBroker==1.4.0
tc-messageBroker==1.6.7
sentry-sdk
rq
redis
tc-core-analyzer-lib==1.3.0
tc-neo4j-lib==1.0.0
tc-core-analyzer-lib==1.3.1
tc-neo4j-lib==1.0.2
pybars3
backoff==2.2.1
49 changes: 10 additions & 39 deletions server.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,19 @@
"""
start the project using rabbitMQ
"""

import functools
import logging
from typing import Any

import backoff
from discord_utils import analyzer_recompute, analyzer_run_once, publish_on_success
from pika.exceptions import AMQPConnectionError, ConnectionClosedByBroker
from redis import Redis
from rq import Queue as RQ_Queue
from tc_messageBroker.message_broker import RabbitMQ
from tc_messageBroker.rabbit_mq.event import Event
from tc_messageBroker.rabbit_mq.queue import Queue
from utils.daolytics_uitls import (
get_rabbit_mq_credentials,
get_redis_credentials,
get_sentryio_service_creds,
)
from utils.rabbitmq import RabbitMQSingleton
from utils.redis import RedisSingleton
from utils.sentryio_service import set_up_sentryio


Expand All @@ -28,35 +24,16 @@
max_time=60 * 60 * 3,
)
def analyzer():
rabbit_mq_creds = get_rabbit_mq_credentials()
sentry_creds = get_sentryio_service_creds()

# sentryio service
set_up_sentryio(sentry_creds["dsn"], sentry_creds["env"])
redis_creds = get_redis_credentials()

rabbit_mq = RabbitMQ(
broker_url=rabbit_mq_creds["broker_url"],
port=rabbit_mq_creds["port"],
username=rabbit_mq_creds["username"],
password=rabbit_mq_creds["password"],
)

redis = Redis(
host=redis_creds["host"],
port=redis_creds["port"],
password=redis_creds["pass"],
)
set_up_sentryio()
rabbit_mq = RabbitMQSingleton.get_instance().get_client()
redis = RedisSingleton.get_instance().get_client()

# 24 hours equal to 86400 seconds
rq_queue = RQ_Queue(connection=redis, default_timeout=86400)

analyzer_recompute = functools.partial(
recompute_wrapper, redis_queue=rq_queue, rabbit_mq_creds=rabbit_mq_creds
)
analyzer_run_once = functools.partial(
run_once_wrapper, redis_queue=rq_queue, rabbit_mq_creds=rabbit_mq_creds
)
analyzer_recompute = functools.partial(recompute_wrapper, redis_queue=rq_queue)
analyzer_run_once = functools.partial(run_once_wrapper, redis_queue=rq_queue)

rabbit_mq.connect(Queue.DISCORD_ANALYZER, heartbeat=60)

Expand All @@ -70,29 +47,23 @@ def analyzer():
rabbit_mq.channel.start_consuming()


def recompute_wrapper(
body: dict[str, Any], redis_queue: RQ_Queue, rabbit_mq_creds: dict[str, Any]
):
def recompute_wrapper(body: dict[str, Any], redis_queue: RQ_Queue):
sagaId = body["content"]["uuid"]
logging.info(f"SAGAID:{sagaId} recompute job Adding to queue")

redis_queue.enqueue(
analyzer_recompute,
sagaId=sagaId,
rabbit_creds=rabbit_mq_creds,
on_success=publish_on_success,
)


def run_once_wrapper(
body: dict[str, Any], redis_queue: RQ_Queue, rabbit_mq_creds: dict[str, Any]
):
def run_once_wrapper(body: dict[str, Any], redis_queue: RQ_Queue):
sagaId = body["content"]["uuid"]
logging.info(f"SAGAID:{sagaId} run_once job Adding to queue")
redis_queue.enqueue(
analyzer_run_once,
sagaId=sagaId,
rabbit_creds=rabbit_mq_creds,
on_success=publish_on_success,
)

Expand Down
4 changes: 2 additions & 2 deletions tests/integration/test_analyzer_init.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from analyzer_init import AnalyzerInit
from bson.objectid import ObjectId
from utils.get_mongo_client import MongoSingleton
from utils.mongo import MongoSingleton


def test_analyzer_init():
Expand Down Expand Up @@ -98,7 +98,7 @@ def test_analyzer_init():

mongo_client[guildId]["rawinfos"].insert_many(rawinfo_samples)

tc_discord_analyzer, _ = analyzer.get_analyzer()
tc_discord_analyzer = analyzer.get_analyzer()

tc_discord_analyzer.recompute_analytics()

Expand Down
2 changes: 1 addition & 1 deletion tests/integration/test_assess_engagement_mention.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from discord_analyzer.analyzer.analyzer_heatmaps import Heatmaps
from discord_analyzer.analyzer.utils.analyzer_db_manager import AnalyzerDBManager
from tc_core_analyzer_lib.utils.activity import DiscordActivity
from utils.daolytics_uitls import get_mongo_credentials, get_neo4j_credentials
from utils.credentials import get_mongo_credentials, get_neo4j_credentials

from .utils.analyzer_setup import launch_db_access
from .utils.remove_and_setup_guild import setup_db_guild
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/test_assess_engagement_reactions.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from discord_analyzer.analyzer.analyzer_heatmaps import Heatmaps
from discord_analyzer.analyzer.utils.analyzer_db_manager import AnalyzerDBManager
from tc_core_analyzer_lib.utils.activity import DiscordActivity
from utils.daolytics_uitls import get_mongo_credentials, get_neo4j_credentials
from utils.credentials import get_mongo_credentials, get_neo4j_credentials

from .utils.analyzer_setup import launch_db_access
from .utils.remove_and_setup_guild import setup_db_guild
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/test_assess_engagement_replies.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from discord_analyzer.analyzer.analyzer_heatmaps import Heatmaps
from discord_analyzer.analyzer.utils.analyzer_db_manager import AnalyzerDBManager
from tc_core_analyzer_lib.utils.activity import DiscordActivity
from utils.daolytics_uitls import get_mongo_credentials, get_neo4j_credentials
from utils.credentials import get_mongo_credentials, get_neo4j_credentials

from .utils.analyzer_setup import launch_db_access
from .utils.remove_and_setup_guild import setup_db_guild
Expand Down
Loading
Loading