 from timeit import default_timer
 from typing import Any, AsyncGenerator, Optional
 
+from neo4j_graphrag.utils.logging import prettify
+
 try:
     import pygraphviz as pgv
 except ImportError:
@@ -90,21 +92,21 @@ async def execute(self, **kwargs: Any) -> RunResult | None:
         if the task run successfully, None if the status update
         was unsuccessful.
         """
-        logger.debug(f"Running component {self.name} with {kwargs}")
-        start_time = default_timer()
         component_result = await self.component.run(**kwargs)
         run_result = RunResult(
             result=component_result,
         )
-        end_time = default_timer()
-        logger.debug(f"Component {self.name} finished in {end_time - start_time}s")
         return run_result
 
     async def run(self, inputs: dict[str, Any]) -> RunResult | None:
         """Main method to execute the task."""
-        logger.debug(f"TASK START {self.name=} {inputs=}")
+        logger.debug(f"TASK START {self.name=} input={prettify(inputs)}")
+        start_time = default_timer()
         res = await self.execute(**inputs)
-        logger.debug(f"TASK RESULT {self.name=} {res=}")
+        end_time = default_timer()
+        logger.debug(
+            f"TASK FINISHED {self.name} in {end_time - start_time} res={prettify(res)}"
+        )
         return res
 
 
@@ -141,7 +143,9 @@ async def run_task(self, task: TaskPipelineNode, data: dict[str, Any]) -> None:
         try:
             await self.set_task_status(task.name, RunStatus.RUNNING)
         except PipelineStatusUpdateError:
-            logger.info(f"Component {task.name} already running or done")
+            logger.debug(
+                f"ORCHESTRATOR: TASK ABORTED: {task.name} is already running or done, aborting"
+            )
             return None
         res = await task.run(inputs)
         await self.set_task_status(task.name, RunStatus.DONE)
@@ -198,7 +202,8 @@ async def check_dependencies_complete(self, task: TaskPipelineNode) -> None:
             d_status = await self.get_status_for_component(d.start)
             if d_status != RunStatus.DONE:
                 logger.debug(
-                    f"Missing dependency {d.start} for {task.name} (status: {d_status}). "
+                    f"ORCHESTRATOR {self.run_id}: TASK DELAYED: Missing dependency {d.start} for {task.name} "
+                    f"(status: {d_status}). "
                     "Will try again when dependency is complete."
                 )
                 raise PipelineMissingDependencyError()
@@ -227,6 +232,9 @@ async def next(
                 await self.check_dependencies_complete(next_node)
             except PipelineMissingDependencyError:
                 continue
+            logger.debug(
+                f"ORCHESTRATOR {self.run_id}: enqueuing next task: {next_node.name}"
+            )
             yield next_node
         return
 
@@ -315,7 +323,6 @@ async def run(self, data: dict[str, Any]) -> None:
         (node without any parent). Then the callback on_task_complete
         will handle the task dependencies.
         """
-        logger.debug(f"PIPELINE START {data=}")
         tasks = [self.run_task(root, data) for root in self.pipeline.roots()]
         await asyncio.gather(*tasks)
 
@@ -624,15 +631,16 @@ def validate_parameter_mapping_for_task(self, task: TaskPipelineNode) -> bool:
         return True
 
     async def run(self, data: dict[str, Any]) -> PipelineResult:
-        logger.debug("Starting pipeline")
+        logger.debug("PIPELINE START")
         start_time = default_timer()
        self.invalidate()
         self.validate_input_data(data)
         orchestrator = Orchestrator(self)
+        logger.debug(f"PIPELINE ORCHESTRATOR: {orchestrator.run_id}")
         await orchestrator.run(data)
         end_time = default_timer()
         logger.debug(
-            f"Pipeline {orchestrator.run_id} finished in {end_time - start_time}s"
+            f"PIPELINE FINISHED {orchestrator.run_id} in {end_time - start_time}s"
         )
         return PipelineResult(
             run_id=orchestrator.run_id,