Skip to content

Commit

Permalink
fix tests
Browse files Browse the repository at this point in the history
  • Loading branch information
jonathan-eq committed Nov 13, 2024
1 parent 165fa9c commit 2672a09
Show file tree
Hide file tree
Showing 6 changed files with 189 additions and 150 deletions.
15 changes: 7 additions & 8 deletions src/_ert/forward_model_runner/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import os
import signal
import sys
import time
import typing
from datetime import datetime

Expand Down Expand Up @@ -72,21 +71,21 @@ def _setup_logging(directory: str = "logs"):
JOBS_JSON_RETRY_TIME = 30


def _wait_for_retry():
time.sleep(JOBS_JSON_RETRY_TIME)
async def _wait_for_retry():
await asyncio.sleep(JOBS_JSON_RETRY_TIME)


def _read_jobs_file(retry=True):
async def _read_jobs_file(retry=True):
try:
with open(JOBS_FILE, "r", encoding="utf-8") as json_file:
with open(JOBS_FILE, "r", encoding="utf-8") as json_file: # noqa: ASYNC230
return json.load(json_file)
except json.JSONDecodeError as e:
raise IOError("Job Runner cli failed to load JSON-file.") from e
except FileNotFoundError as e:
if retry:
logger.error(f"Could not find file {JOBS_FILE}, retrying")
_wait_for_retry()
return _read_jobs_file(retry=False)
await _wait_for_retry()
return await _read_jobs_file(retry=False)
else:
raise e

Expand Down Expand Up @@ -119,7 +118,7 @@ async def main(args):
# Make sure that logging is setup _after_ we have moved to the runpath directory
_setup_logging()

jobs_data = _read_jobs_file()
jobs_data = await _read_jobs_file()

experiment_id = jobs_data.get("experiment_id")
ens_id = jobs_data.get("ens_id")
Expand Down
16 changes: 12 additions & 4 deletions src/_ert/forward_model_runner/reporting/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,13 +76,15 @@ def __init__(self, evaluator_url, token=None, cert_path=None):
self._ens_id = None
self._real_id = None
self._event_queue: asyncio.Queue[events.Event | EventSentinel] = asyncio.Queue()
# self._event_publisher_thread = ErtThread(target=self._event_publisher)
self._timeout_timestamp = None
self._timestamp_lock = threading.Lock()
# seconds to timeout the reporter the thread after Finish() was received
self._reporter_timeout = 60
self._running = True
self._event_publishing_task = asyncio.create_task(self.async_event_publisher())
self._event_publisher_ready = asyncio.Event()

async def join(self) -> None:
await self._event_publishing_task

async def async_event_publisher(self):
logger.debug("Publishing event.")
Expand All @@ -91,8 +93,9 @@ async def async_event_publisher(self):
token=self._token,
cert=self._cert,
) as client:
self._event_publisher_ready.set()
event = None
while self._running:
while True:
with self._timestamp_lock:
if (
self._timeout_timestamp is not None
Expand All @@ -103,14 +106,17 @@ async def async_event_publisher(self):
if event is None:
# if we successfully sent the event we can proceed
# to next one
print("GETTING MORE EVENTS!")
event = await self._event_queue.get()
if event is self._sentinel:
self._event_queue.task_done()
print("NEW EVENT WAS SENTINEL :))")
return
break
try:
await client.send(event_to_json(event))
self._event_queue.task_done()
event = None
print("Sent event :)")
except ClientConnectionError as exception:
# Possible intermittent failure, we retry sending the event
logger.error(str(exception))
Expand All @@ -122,9 +128,11 @@ async def async_event_publisher(self):
break

async def report(self, msg):
await self._event_publisher_ready.wait()
await self._statemachine.transition(msg)

async def _dump_event(self, event: events.Event):
print(f"DUMPED EVENT {type(event)=}")
logger.debug(f'Schedule "{type(event)}" for delivery')
await self._event_queue.put(event)

Expand Down
2 changes: 1 addition & 1 deletion src/_ert/forward_model_runner/reporting/statemachine.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,6 @@ async def transition(self, message: Message):
f"Illegal transition {self._state} -> {new_state} for {message}, "
f"expected to transition into {self._transitions[self._state]}"
)

print(f"TRANSITIONING STATE W/{message=}")
await self._handler[new_state](message)
self._state = new_state
Loading

0 comments on commit 2672a09

Please sign in to comment.