Skip to content

Commit

Permalink
add parser for MonPoly output
Browse files Browse the repository at this point in the history
  • Loading branch information
claasga committed Jan 20, 2025
1 parent efd3c9d commit c0a322e
Show file tree
Hide file tree
Showing 3 changed files with 195 additions and 2 deletions.
11 changes: 10 additions & 1 deletion backend/dps_training_k/game/models/exercise.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from .lab import Lab
from .scheduled_event import ScheduledEvent
from .log_entry import LogEntry
from ..templmon.log_rule import LogRule
from ..templmon.log_rule import LogRuleRunner


Expand Down Expand Up @@ -63,7 +64,15 @@ def start_exercise(self):
# ToDo: Add logrulerunner for testing purposes
from ..channel_notifications import LogEntryDispatcher

self.test_log_runner = LogRuleRunner(self, LogEntryDispatcher)
test_rule_str = """ (personnel_count <- CNT personnel_id;patient_id
(NOT unassigned_personnel(personnel_id))
SINCE[0,*]
assigned_personnel(personnel_id, patient_id))
AND
(personnel_count >= 4)"""
test_rule = LogRule.create(test_rule_str, "test_rule")
self.test_log_runner = LogRuleRunner(self, LogEntryDispatcher, test_rule)
# self.test_log_runner = LogRuleRunner(self, LogEntryDispatcher)
# print("LogRuleRunner created")
self.test_log_runner.start_log_rule()
# print("LogRuleRunner started")
Expand Down
2 changes: 1 addition & 1 deletion backend/dps_training_k/game/templmon/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
from .log_rule import LogRule
from .log_rule import LogRule, LogRuleRunner
from .log_transformer import *
184 changes: 184 additions & 0 deletions backend/dps_training_k/game/templmon/test_parser.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
import re
import asyncio

free_variables = ["personnel_id", "patient_id"]


class TestSender:
@classmethod
def dispatch_error_started(cls, error_assignment, start_stamp, start_point):
print(f"Durational Error started at {start_stamp}: {error_assignment}")

@classmethod
def dispatch_error_finished(
cls, error_assignment, start_stamp, start_point, past_end_stamp, past_end_point
):
print(
f"Durational Error: {error_assignment} from {start_stamp} till {past_end_stamp}"
)

@classmethod
def dispatch_singular_error(cls, error_assignment, time_stamp, time_point):
print(f"Singular Error at {time_stamp}: {error_assignment}")


class OutputReceiver:
def __init__(self, keys, sender):
self.keys = keys
self.sender = sender

def update_errors(self, timestamp, timepoint, errors: list[dict]):
pass

def get_format(self):
return self.keys


class SingularErrorReceiver(OutputReceiver):
def update_errors(self, timestamp, timepoint, errors: list[dict]):
for error in errors:
self.sender.dispatch_singular_error(error, timestamp, timepoint)


class DurationalErrorReceiver(OutputReceiver):
def __init__(self, keys, sender):
super().__init__(keys, sender)
self.current_unfinished_errors = {}

def _add_errors(self, timestamp, timepoint, errors):
for error in errors:
self.current_unfinished_errors[error] = (timestamp, timepoint)
self.sender.dispatch_error_started(
dict(zip(self.keys, error)), timestamp, timepoint
)

def _remove_errors(self, timestamp, timepoint, errors):
for error in errors:
start_stamp, start_point = self.current_unfinished_errors.pop(error)
self.sender.dispatch_error_finished(
dict(zip(self.keys, error)),
start_stamp,
start_point,
timestamp,
timepoint,
)

def update_errors(self, timestamp, timepoint, errors: list[dict]):
"""Assumes that errors are unique"""
errors_transformed = [tuple(error.values()) for error in errors]
errors_transformed = sorted(errors_transformed)
errors_to_add = []
erros_to_remove = []
sorted_unfinished_keys = sorted(self.current_unfinished_errors.keys())
while errors_transformed or sorted_unfinished_keys:
new_error = errors_transformed.pop(0) if errors_transformed else None
unfinished_error = (
sorted_unfinished_keys.pop(0) if sorted_unfinished_keys else None
)
if new_error == unfinished_error:
continue
elif new_error and (not unfinished_error or new_error < unfinished_error):
errors_to_add.append(new_error)
sorted_unfinished_keys.insert(0, unfinished_error)
elif not new_error or new_error > unfinished_error:
erros_to_remove.append(unfinished_error)
errors_transformed.insert(0, new_error)
self._remove_errors(timestamp, timepoint, erros_to_remove)
self._add_errors(timestamp, timepoint, errors_to_add)


def get_output(
test_outputs,
output_receiver: OutputReceiver,
matches_seperator=" (",
value_seperator=", ",
):
def get_fullfilling_assignments(assignments_str: str):
assignments = []
i = 0
print("Start mapping assignments")
while i < len(assignments_str):
if assignments_str[i] == "(":
j = i + 1
inside_string = False # is Inside quotes wird nicht genutzt
while assignments_str[j] != ")" or inside_string:
if assignments_str[j] == '"':
inside_string = not inside_string
j += 1
assignments.append(assignments_str[i + 1 : j])
i = j
i += 1
print("Finished mapping assignments with " + str(assignments))
assignment_dicts = []
for assignment in assignments:
print(assignment)

i = 0
assignment_dict = {}
for free_variable in output_receiver.get_format():
if i >= len(assignment):
raise Exception("Assignment does not contain all free variables")
inside_string = False
beginning_i = i
while i < len(assignment) and (
assignment[i : min(i + len(value_seperator), len(assignment))]
!= value_seperator
or inside_string
):
if assignment[i] == '"':
inside_string = not inside_string
i += 1
assignment_dict[free_variable] = assignment[beginning_i:i]
print(
f"Free variable: {free_variable}, Value: {assignment_dict[free_variable]}"
)
i += len(value_seperator)
if i < len(assignment):
raise Exception("Assignment contains more than the free variables")
print(assignment_dict)
assignment_dicts.append(assignment_dict)
return assignment_dicts

while True:
decoded_line = test_outputs.readline()
# decoded_line = '@1734087146.26 (time point 0): ((21, ",ag,12.\nabb"),(42, 69))\n'
if not decoded_line:
print("process terminated")
break

if decoded_line[0] == "@":
print("Received output:")
print(decoded_line)
decoded_line = decoded_line[1:-2]
parts = decoded_line.split(matches_seperator)
if len(parts) != 3:
raise Exception("Invalid output format")

timestamp = float(parts[0])
timepoint = int(re.search(r"\d+", parts[1]).group())
fullfilling_assignments = get_fullfilling_assignments(parts[2])
for assignment in fullfilling_assignments:
print(f'Processed output : {timestamp}, {timepoint}, "{assignment}"')
output_receiver.update_errors(timestamp, timepoint, fullfilling_assignments)
# asyncio.sleep(1)


async def main():
await get_output(
open(
"/home/claas/Desktop/BP/dps.training_k/backend/dps_training_k/game/templmon/test.txt",
"r",
),
OutputReceiver(),
)
print("Finished")


# asyncio.run(main())
get_output(
open(
"/home/claas/Desktop/BP/dps.training_k/backend/dps_training_k/game/templmon/test.txt",
"r",
),
SingularErrorReceiver(free_variables, TestSender),
)

0 comments on commit c0a322e

Please sign in to comment.