Skip to content

Commit

Permalink
Change ParallelExecution output (#547)
Browse files Browse the repository at this point in the history
* Change `ParallelExecution` output

Add `verbose_output` toggle.

* Remove verbose output toggle
  • Loading branch information
gtopper authored Dec 17, 2024
1 parent ee998ab commit a747bd0
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 18 deletions.
7 changes: 4 additions & 3 deletions storey/flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -1613,7 +1613,8 @@ async def _do(self, event):
)
futures.append(future)
results: list[_ParallelExecutionRunnableResult] = await asyncio.gather(*futures)
event.body = {"input": event.body, "results": {}}
for result in results:
event.body["results"][result.runnable_name] = {"runtime": result.runtime, "output": result.data}
if len(self.runnables) == 1:
event.body = results[0].data if results else None
else:
event.body = {result.runnable_name: result.data for result in results}
return await self._do_downstream(event)
25 changes: 10 additions & 15 deletions tests/test_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -4771,7 +4771,6 @@ def init(self):

async def run_async(self, data, path):
await asyncio.sleep(1)
print(f"{self.name} returning {self._result}")
return self._result


Expand Down Expand Up @@ -4856,13 +4855,15 @@ def select_runnables(self, event):

assert end - start < 6
termination_result = termination_result[0]
assert termination_result.keys() == {"input", "results"}
assert termination_result["input"] == 0
results = termination_result["results"]
assert results.keys() == {"busy1", "busy2", "sleep1", "sleep2", "asleep1", "asleep2", "naive"}
for result in results.values():
assert result["output"] == 1
assert 1 < result["runtime"] < 2
assert termination_result == {
"asleep1": 1,
"asleep2": 1,
"busy1": 1,
"busy2": 1,
"naive": 1,
"sleep1": 1,
"sleep2": 1,
}


def test_invalid_runnable():
Expand Down Expand Up @@ -4896,10 +4897,4 @@ def test_event_input_preservation():
controller.terminate()
termination_result = controller.await_termination()
termination_result = termination_result[0]
assert termination_result.keys() == {"input", "results"}
assert termination_result["input"] == {"n": 1}
results = termination_result["results"]
assert results.keys() == {"x"}
result = results["x"]
assert result.keys() == {"runtime", "output"}
assert result["output"] == {"n": 2}
assert termination_result == {"n": 2}

0 comments on commit a747bd0

Please sign in to comment.