diff --git a/setup.py b/setup.py index 5a925c2..c661e3c 100644 --- a/setup.py +++ b/setup.py @@ -6,7 +6,7 @@ setup( name="tc-analyzer-lib", - version="1.4.3", + version="1.4.4", author="Mohammad Amin Dadgar, TogetherCrew", maintainer="Mohammad Amin Dadgar", maintainer_email="dadgaramin96@gmail.com", diff --git a/tc_analyzer_lib/metrics/heatmaps/heatmaps.py b/tc_analyzer_lib/metrics/heatmaps/heatmaps.py index 2dd349c..060397e 100644 --- a/tc_analyzer_lib/metrics/heatmaps/heatmaps.py +++ b/tc_analyzer_lib/metrics/heatmaps/heatmaps.py @@ -39,7 +39,11 @@ def __init__( self.analyzer_config = analyzer_config self.utils = HeatmapsUtils(platform_id) - async def start(self, from_start: bool = False) -> list[dict]: + async def start( + self, + from_start: bool = False, + batch_return: int = 5, + ): """ Based on the rawdata creates and stores the heatmap data @@ -75,13 +79,14 @@ async def start(self, from_start: bool = False) -> list[dict]: async for bot in cursor: bot_ids.append(bot["id"]) + index = 0 while analytics_date.date() < datetime.now().date(): start_day = analytics_date.replace( hour=0, minute=0, second=0, microsecond=0 ) end_day = start_day + timedelta(days=1) logging.info( - f"{log_prefix} ANALYZING HEATMAPS {start_day.date()} - {end_day.date()}!" + f"{log_prefix} ANALYZING HEATMAPS {start_day.date()} - {end_day.date()}! | index: {index}" ) # getting the active resource_ids (activities being done there by users) @@ -150,10 +155,18 @@ async def start(self, from_start: bool = False) -> list[dict]: heatmaps_results.extend(day_results) + if index % batch_return == 0: + yield heatmaps_results + # emptying it + heatmaps_results = [] + + index += 1 + # analyze next day analytics_date += timedelta(days=1) - return heatmaps_results + # returning any other values + yield heatmaps_results async def _prepare_heatmaps_document( self, date: datetime, resource_id: str, author_id: str diff --git a/tc_analyzer_lib/tc_analyzer.py b/tc_analyzer_lib/tc_analyzer.py index 6ae4ee3..ebb0732 100644 --- a/tc_analyzer_lib/tc_analyzer.py +++ b/tc_analyzer_lib/tc_analyzer.py @@ -85,20 +85,19 @@ async def run_once(self): resources=self.resources, analyzer_config=self.analyzer_config, ) - heatmaps_data = await heatmaps_analysis.start(from_start=False) - - # storing heatmaps since memberactivities use them - analytics_data = {} - analytics_data["heatmaps"] = heatmaps_data - analytics_data["memberactivities"] = (None, None) - - self.DB_connections.store_analytics_data( - analytics_data=analytics_data, - platform_id=self.platform_id, - graph_schema=self.graph_schema, - remove_memberactivities=False, - remove_heatmaps=False, - ) + async for heatmaps_data in heatmaps_analysis.start(from_start=False): + # storing heatmaps since memberactivities use them + analytics_data = {} + analytics_data["heatmaps"] = heatmaps_data + analytics_data["memberactivities"] = (None, None) + + self.DB_connections.store_analytics_data( + analytics_data=analytics_data, + platform_id=self.platform_id, + graph_schema=self.graph_schema, + remove_memberactivities=False, + remove_heatmaps=False, + ) memberactivity_analysis = MemberActivities( platform_id=self.platform_id, @@ -149,13 +148,12 @@ async def recompute(self): resources=self.resources, analyzer_config=self.analyzer_config, ) - heatmaps_data = await heatmaps_analysis.start(from_start=True) - # storing heatmaps since memberactivities use them + # This is to remove heatmaps data + # TODO: in future remove heatmaps better instead of using the lines below analytics_data = {} - analytics_data["heatmaps"] = heatmaps_data + analytics_data["heatmaps"] = [] analytics_data["memberactivities"] = (None, None) - self.DB_connections.store_analytics_data( analytics_data=analytics_data, platform_id=self.platform_id, @@ -164,6 +162,20 @@ async def recompute(self): remove_heatmaps=True, ) + async for heatmaps_data in heatmaps_analysis.start(from_start=True): + # storing heatmaps since memberactivities use them + analytics_data = {} + analytics_data["heatmaps"] = heatmaps_data + analytics_data["memberactivities"] = (None, None) + + self.DB_connections.store_analytics_data( + analytics_data=analytics_data, + platform_id=self.platform_id, + graph_schema=self.graph_schema, + remove_memberactivities=False, + remove_heatmaps=False, + ) + # run the member_activity analyze logging.info( f"Analyzing the MemberActivities data for platform: {self.platform_id}!" diff --git a/tests/integration/test_analyzer_period_week_run_once_empty_analytics.py b/tests/integration/test_analyzer_period_week_run_once_empty_analytics.py index e8b0b61..5925d59 100644 --- a/tests/integration/test_analyzer_period_week_run_once_empty_analytics.py +++ b/tests/integration/test_analyzer_period_week_run_once_empty_analytics.py @@ -33,7 +33,8 @@ async def test_analyzer_week_period_run_once_empty_analytics(self): rawinfo_samples = [] # generating random rawinfo data - for i in range(160): + # for past 7 days (168 / 24 = 7) + for i in range(168): author = np.random.choice(acc_id) replied_user = np.random.choice(acc_id) # not producing any self-interactions diff --git a/tests/integration/test_assess_engagement_mention.py b/tests/integration/test_assess_engagement_mention.py index baca862..ae4cb47 100644 --- a/tests/integration/test_assess_engagement_mention.py +++ b/tests/integration/test_assess_engagement_mention.py @@ -39,7 +39,9 @@ async def heatmaps_analytics(self): """ heatmaps are the input for assess_engagement's interaction matrix """ - heatmaps_data = await self.heatmaps.start(from_start=True) + heatmaps_data = [] + async for results in self.heatmaps.start(from_start=True): + heatmaps_data.extend(results) analytics_data = {} analytics_data["heatmaps"] = heatmaps_data diff --git a/tests/integration/test_assess_engagement_reactions.py b/tests/integration/test_assess_engagement_reactions.py index 02cd35c..b58b57e 100644 --- a/tests/integration/test_assess_engagement_reactions.py +++ b/tests/integration/test_assess_engagement_reactions.py @@ -39,7 +39,9 @@ async def heatmaps_analytics(self): """ heatmaps are the input for assess_engagement's interaction matrix """ - heatmaps_data = await self.heatmaps.start(from_start=True) + heatmaps_data = [] + async for results in self.heatmaps.start(from_start=True): + heatmaps_data.extend(results) analytics_data = {} analytics_data["heatmaps"] = heatmaps_data diff --git a/tests/integration/test_assess_engagement_replies.py b/tests/integration/test_assess_engagement_replies.py index 410f199..36c48ae 100644 --- a/tests/integration/test_assess_engagement_replies.py +++ b/tests/integration/test_assess_engagement_replies.py @@ -39,7 +39,9 @@ async def heatmaps_analytics(self): """ heatmaps are the input for assess_engagement's interaction matrix """ - heatmaps_data = await self.heatmaps.start(from_start=True) + heatmaps_data = [] + async for results in self.heatmaps.start(from_start=True): + heatmaps_data.extend(results) analytics_data = {} analytics_data["heatmaps"] = heatmaps_data diff --git a/tests/integration/test_heatmaps_analytics.py b/tests/integration/test_heatmaps_analytics.py index 86efa05..5dc0447 100644 --- a/tests/integration/test_heatmaps_analytics.py +++ b/tests/integration/test_heatmaps_analytics.py @@ -156,7 +156,9 @@ async def test_heatmaps_single_day_from_start(self): sample_raw_data ) - analytics = await self.heatmaps.start(from_start=True) + analytics = [] + async for heatmaps_data in self.heatmaps.start(from_start=True): + analytics.extend(heatmaps_data) self.assertIsInstance(analytics, list) diff --git a/tests/integration/test_heatmaps_analytics_different_source.py b/tests/integration/test_heatmaps_analytics_different_source.py index 081a109..656fd8c 100644 --- a/tests/integration/test_heatmaps_analytics_different_source.py +++ b/tests/integration/test_heatmaps_analytics_different_source.py @@ -150,7 +150,9 @@ async def test_heatmaps_single_day_from_start(self): sample_raw_data ) - analytics = await self.heatmaps.start(from_start=True) + analytics = [] + async for heatmaps_data in self.heatmaps.start(from_start=True): + analytics.extend(heatmaps_data) self.assertIsInstance(analytics, list) @@ -181,7 +183,10 @@ async def test_heatmaps_analytics_pre_filled(self): } ) - analytics = await self.heatmaps.start(from_start=False) + analytics = [] + async for heatmaps_data in self.heatmaps.start(from_start=False): + analytics.extend(heatmaps_data) + # the day was pre-filled before # and the period was exactly yesterday self.assertEqual(analytics, []) diff --git a/tests/integration/test_heatmaps_hourly_lone_message.py b/tests/integration/test_heatmaps_hourly_lone_message.py index 5709b05..e9ac90e 100644 --- a/tests/integration/test_heatmaps_hourly_lone_message.py +++ b/tests/integration/test_heatmaps_hourly_lone_message.py @@ -75,7 +75,9 @@ async def test_lone_messages(self): resources=list(channelIds), analyzer_config=DiscordAnalyzerConfig(), ) - results = await analyzer_heatmaps.start(from_start=True) + results = [] + async for heatmaps_data in analyzer_heatmaps.start(from_start=True): + results.extend(heatmaps_data) assert len(results) == len(acc_names) * DAY_COUNT * len(channelIds) for document in results: diff --git a/tests/integration/test_heatmaps_hourly_mentions.py b/tests/integration/test_heatmaps_hourly_mentions.py index 4ac455a..8e37e6f 100644 --- a/tests/integration/test_heatmaps_hourly_mentions.py +++ b/tests/integration/test_heatmaps_hourly_mentions.py @@ -104,7 +104,9 @@ async def test_mentioned_messages(self): resources=list(channelIds), analyzer_config=DiscordAnalyzerConfig(), ) - results = await analyzer_heatmaps.start(from_start=True) + results = [] + async for heatmaps_data in analyzer_heatmaps.start(from_start=True): + results.extend(heatmaps_data) assert len(results) == len(acc_names) * DAY_COUNT * len(channelIds) for document in results: diff --git a/tests/integration/test_heatmaps_reactions.py b/tests/integration/test_heatmaps_reactions.py index 732ab40..c320c09 100644 --- a/tests/integration/test_heatmaps_reactions.py +++ b/tests/integration/test_heatmaps_reactions.py @@ -142,7 +142,9 @@ async def test_reacted_messages(self): resources=list(channelIds), analyzer_config=DiscordAnalyzerConfig(), ) - results = await analyzer_heatmaps.start(from_start=True) + results = [] + async for heatmaps_data in analyzer_heatmaps.start(from_start=True): + results.extend(heatmaps_data) assert len(results) == len(acc_names) * DAY_COUNT * len(channelIds) for document in results: diff --git a/tests/integration/test_heatmaps_replier.py b/tests/integration/test_heatmaps_replier.py index 89a96d3..22fb0d0 100644 --- a/tests/integration/test_heatmaps_replier.py +++ b/tests/integration/test_heatmaps_replier.py @@ -101,7 +101,9 @@ async def test_reply_messages(self): resources=list(channelIds), analyzer_config=DiscordAnalyzerConfig(), ) - results = await analyzer_heatmaps.start(from_start=True) + results = [] + async for heatmaps_data in analyzer_heatmaps.start(from_start=True): + results.extend(heatmaps_data) assert len(results) == len(acc_names) * DAY_COUNT * len(channelIds) for document in results: diff --git a/tests/integration/test_heatmaps_thread_msg.py b/tests/integration/test_heatmaps_thread_msg.py index 0e96371..f8235d0 100644 --- a/tests/integration/test_heatmaps_thread_msg.py +++ b/tests/integration/test_heatmaps_thread_msg.py @@ -1,90 +1,95 @@ -import asyncio from datetime import datetime, timedelta +from unittest import IsolatedAsyncioTestCase from tc_analyzer_lib.metrics.heatmaps import Heatmaps from tc_analyzer_lib.schemas.platform_configs import DiscordAnalyzerConfig from tc_analyzer_lib.utils.mongo import MongoSingleton -def test_thread_messages(): - platform_id = "1122334455" - mongo_client = MongoSingleton.get_instance(skip_singleton=True).get_client() - database = mongo_client[platform_id] +class TestHeatmapsThrMsgs(IsolatedAsyncioTestCase): + async def test_thread_messages(self): + platform_id = "1122334455" + mongo_client = MongoSingleton.get_instance(skip_singleton=True).get_client() + database = mongo_client[platform_id] - database.drop_collection("rawmemberactivities") - database.drop_collection("rawmembers") - # data preparation - DAY_COUNT = 2 - day = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0) - timedelta( - days=DAY_COUNT - ) - # hours to include interactions - hours_to_include = [2, 4, 5, 13, 16, 18, 19, 20, 21] + database.drop_collection("rawmemberactivities") + database.drop_collection("rawmembers") + # data preparation + DAY_COUNT = 2 + day = datetime.now().replace( + hour=0, minute=0, second=0, microsecond=0 + ) - timedelta(days=DAY_COUNT) + # hours to include interactions + hours_to_include = [2, 4, 5, 13, 16, 18, 19, 20, 21] - acc_names = [] - prepared_rawmembers = [] - for i in range(3): - acc = f"user_{i}" - acc_names.append(acc) + acc_names = [] + prepared_rawmembers = [] + for i in range(3): + acc = f"user_{i}" + acc_names.append(acc) - prepared_member = { - "id": acc, - "is_bot": False, - "left_at": None, - "joined_at": datetime(2023, i + 1, 1), - "options": {}, - } - prepared_rawmembers.append(prepared_member) + prepared_member = { + "id": acc, + "is_bot": False, + "left_at": None, + "joined_at": datetime(2023, i + 1, 1), + "options": {}, + } + prepared_rawmembers.append(prepared_member) - prepared_rawmemberactivities = [] - channelIds = set() - dates = set() + prepared_rawmemberactivities = [] + channelIds = set() + dates = set() - for i in range(DAY_COUNT): - for hour in hours_to_include: - for acc in acc_names: - chId = "channel_0" - data_date = (day + timedelta(days=i)).replace(hour=hour) - prepared_rawdata = { - "author_id": acc, - "date": data_date, - "source_id": f"9999{i}{hour}{acc}", # message id it was - "actions": [{"name": "message", "type": "emitter"}], - "interactions": [], - "metadata": { - "channel_id": chId, - "thread_id": chId + "THREAD", - "bot_activity": False, - }, - } - prepared_rawmemberactivities.append(prepared_rawdata) + for i in range(DAY_COUNT): + for hour in hours_to_include: + for acc in acc_names: + chId = "channel_0" + data_date = (day + timedelta(days=i)).replace(hour=hour) + prepared_rawdata = { + "author_id": acc, + "date": data_date, + "source_id": f"9999{i}{hour}{acc}", # message id it was + "actions": [{"name": "message", "type": "emitter"}], + "interactions": [], + "metadata": { + "channel_id": chId, + "thread_id": chId + "THREAD", + "bot_activity": False, + }, + } + prepared_rawmemberactivities.append(prepared_rawdata) - channelIds.add(chId) - dates.add(data_date.replace(hour=0, minute=0, second=0, microsecond=0)) + channelIds.add(chId) + dates.add( + data_date.replace(hour=0, minute=0, second=0, microsecond=0) + ) - database["rawmemberactivities"].insert_many(prepared_rawmemberactivities) - database["rawmembers"].insert_many(prepared_rawmembers) + database["rawmemberactivities"].insert_many(prepared_rawmemberactivities) + database["rawmembers"].insert_many(prepared_rawmembers) - analyzer_heatmaps = Heatmaps( - platform_id=platform_id, - period=day, - resources=list(channelIds), - analyzer_config=DiscordAnalyzerConfig(), - ) - results = asyncio.run(analyzer_heatmaps.start(from_start=True)) + analyzer_heatmaps = Heatmaps( + platform_id=platform_id, + period=day, + resources=list(channelIds), + analyzer_config=DiscordAnalyzerConfig(), + ) + results = [] + async for heatmaps_data in analyzer_heatmaps.start(from_start=True): + results.extend(heatmaps_data) - assert len(results) == len(acc_names) * DAY_COUNT - for document in results: - assert document["user"] in acc_names - assert document["date"] in dates - assert document["channel_id"] in channelIds - assert document["reacted_per_acc"] == [] - assert document["mentioner_per_acc"] == [] - assert document["replied_per_acc"] == [] - assert sum(document["thr_messages"]) == len(hours_to_include) - assert sum(document["mentioner"]) == 0 - assert sum(document["replied"]) == 0 - assert sum(document["replier"]) == 0 - assert sum(document["mentioned"]) == 0 - assert sum(document["reacter"]) == 0 - assert sum(document["lone_messages"]) == 0 + assert len(results) == len(acc_names) * DAY_COUNT + for document in results: + assert document["user"] in acc_names + assert document["date"] in dates + assert document["channel_id"] in channelIds + assert document["reacted_per_acc"] == [] + assert document["mentioner_per_acc"] == [] + assert document["replied_per_acc"] == [] + assert sum(document["thr_messages"]) == len(hours_to_include) + assert sum(document["mentioner"]) == 0 + assert sum(document["replied"]) == 0 + assert sum(document["replier"]) == 0 + assert sum(document["mentioned"]) == 0 + assert sum(document["reacter"]) == 0 + assert sum(document["lone_messages"]) == 0