Skip to content

Commit

Permalink
add functioning non-list-async solution for receiving log entries, ad…
Browse files Browse the repository at this point in the history
…ded termination of monpoly threads
  • Loading branch information
claasga committed Dec 11, 2024
1 parent 10fccf4 commit efd3c9d
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 50 deletions.
10 changes: 8 additions & 2 deletions backend/dps_training_k/game/models/exercise.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ def createExercise(cls, trainer):
Lab.objects.create(exercise=new_Exercise)
return new_Exercise

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)

def start_exercise(self):
from .patient_instance import PatientInstance

Expand All @@ -60,9 +63,9 @@ def start_exercise(self):
# ToDo: Add logrulerunner for testing purposes
from ..channel_notifications import LogEntryDispatcher

test_log_runner = LogRuleRunner(self, LogEntryDispatcher)
self.test_log_runner = LogRuleRunner(self, LogEntryDispatcher)
# print("LogRuleRunner created")
test_log_runner.start_log_rule()
self.test_log_runner.start_log_rule()
# print("LogRuleRunner started")

def save(self, *args, **kwargs):
Expand All @@ -77,6 +80,9 @@ def update_state(self, state):
LogEntry.set_empty_timestamps(self)
elif self.state == self.StateTypes.FINISHED:
ScheduledEvent.remove_events_of_exercise(self)
# for every instance of the LogRuleRunner, stop the log rule
for instance in LogRuleRunner.instances:
instance.stop_log_rule()

def time_factor(self):
# config currently is not being used, but could be implemented as follows:
Expand Down
49 changes: 18 additions & 31 deletions backend/dps_training_k/game/templmon/log_rule.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,33 +42,29 @@ def _generate_temp_file_name(cls, name):
class LogRuleRunner:
# I need to start monpoly before subscribing to the log or store the log entries
loop = asyncio.new_event_loop()
instances = []

def __init__(self, exercise, log):
self.log = log
self.exercise = exercise
self.monpoly_started_event = asyncio.Event()
self.finished_reading_monpoly = asyncio.Event()
LogRuleRunner.instances.append(
self
) # ToDo: remove when log rules get created inside trainer consumer

def __del__(self):
pass

def receive_log_entry(self, log_entry):
personnel_list = list(log_entry.personnel.all())
# print("Personnel List in main thread: ")
# print(personnel_list)
material_list = list(log_entry.materials.all())
transformed_log_entry = transform(log_entry)
asyncio.run_coroutine_threadsafe(
self._receive_log_entry(log_entry, personnel_list, material_list), self.loop
self._write_log_entry(transformed_log_entry), self.loop
)

async def _receive_log_entry(self, log_entry, personnel_list, material_list):
print(f"Received log entry: {log_entry}")
async def _write_log_entry(self, monpolified_log_entry: str):
print(f"Received log entry: {monpolified_log_entry}")
await self.monpoly_started_event.wait() # Wait until monpoly is ready
print("Monpoly is ready")
try:
monpolified_log_entry = transform(log_entry, personnel_list, material_list)
except Exception as e:
raise e
print(f"Monpolified log entry: {monpolified_log_entry}")
if self.monpoly.stdin:
try:
encoded = monpolified_log_entry.encode()
Expand All @@ -78,20 +74,17 @@ async def _receive_log_entry(self, log_entry, personnel_list, material_list):
raise e
else:
raise Exception("Monpoly is not running")
print("Log entry sent")

async def read_output(self, process):
self.monpoly_started_event.set()
while True:
line = await process.stdout.readline()
if not line:
print("process terminated")
self.finished_reading_monpoly.set()
break
print(f"Received: {line.decode('utf-8')[:-1]}")

async def _launch_monpoly(
self, loop: asyncio.AbstractEventLoop, mfotl_path, sig_path, rewrite=True
):
async def _launch_monpoly(self, mfotl_path, sig_path, rewrite=True):
"""Has to be launched in a separate thread"""
self.monpoly = await asyncio.create_subprocess_exec(
"monpoly",
Expand All @@ -109,6 +102,11 @@ async def _launch_monpoly(
asyncio.create_task(self.read_output(self.monpoly))
self.monpoly_started_event.set() # Signal that monpoly is ready

async def terminate_monpoly(self):
self.monpoly.stdin.close()
await self.finished_reading_monpoly.wait()
self.monpoly.terminate()

def start_log_rule(self):
def launch_listener_loop(loop: asyncio.AbstractEventLoop):
asyncio.set_event_loop(loop)
Expand All @@ -122,23 +120,12 @@ def launch_listener_loop(loop: asyncio.AbstractEventLoop):
sig_path = os.path.join(base_dir, "kdps.sig")
asyncio.run_coroutine_threadsafe(
self._launch_monpoly(
self.loop,
mfotl_path,
sig_path,
),
self.loop,
)
self.log.subscribe_to_exercise(self.exercise, self, send_past_logs=True)

# startup extra asyncio_thread as loop
# asyncio.run(lauch_monpoly())
# event.wait()
# log.subscribe_to_exercise(exercise, self, send_past_logs=True)
#
# launch_monpoly()
# monpoly = await asyncio.create_subprocess_exec()
# listener_thread = loop.call_soon_threadsafe(asyncio.create_task, listener) oder eventuell
#
#
#
# atexitstuff"""
def stop_log_rule(self):
asyncio.run_coroutine_threadsafe(self.terminate_monpoly(), self.loop)
29 changes: 12 additions & 17 deletions backend/dps_training_k/game/templmon/log_transformer.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
import game.models.log_entry as le
import datetime

max_posix_timestamp = pow(2, 31) - 1


class MonpolyLogEntry:
Expand All @@ -24,41 +27,33 @@ def determine_log_type(log_entry: le.LogEntry):
return MonpolyLogEntry.UNKNOW_LOG_TYPE


def trigger_event(posix_timestamp):
return f"@{posix_timestamp} trigger_event()"


def generate_timestamp(log_entry: le.LogEntry):
return str(log_entry.timestamp.timestamp())


def transform(log_entry: le.LogEntry, personnel_list, material_list):
# print("Personnel List: ")
# print(personnel_list)
# print("Material List: ")
# print(material_list)
# print("deteriming log type")
def transform(log_entry: le.LogEntry):
log_type = determine_log_type(log_entry)
# print("Generating timestamp")
timestamp = generate_timestamp(log_entry)
log_str = f"@{timestamp} "

if log_type == MonpolyLogEntry.ASSIGNED_PERSONNEL:
# print(f"field: {log_entry.personnel}")
# print(f"Queryset: {log_entry.personnel.all()}")
# print(f"Entry: {log_entry.personnel.all().first()}")
# print(f"key: {log_entry.personnel.all().first().pk}")
personnel_id = personnel_list[0].pk
personnel_id = log_entry.personnel.all().first().pk
patient_id = log_entry.patient_instance.pk
log_str += f"assigned_personnel({personnel_id}, {patient_id})"
# log_str += f"assigned_personnel(144, {patient_id})"

elif log_type == MonpolyLogEntry.UNASSIGNED_PERSONNEL:
personnel_id = personnel_list[0].pk
personnel_id = log_entry.personnel.all().first().pk
log_str += f"unassigned_personnel({personnel_id})"
# log_str += f"unassigned_personnel(144)"

elif log_type == MonpolyLogEntry.PATIENT_ARRIVED:
patient_id = log_entry.patient_instance.pk
area_id = log_entry.area.pk
triage_display = log_entry.patient_instance.get_triage_display
injuries = log_entry.content["injuries"]
triage_display = log_entry.patient_instance.get_triage_display()
injuries = f'"{log_entry.content["injuries"]}"'
log_str += (
f"patient_arrived({patient_id}, {area_id}, {triage_display}, {injuries})"
)
Expand Down

0 comments on commit efd3c9d

Please sign in to comment.