diff --git a/poetry.lock b/poetry.lock index a31b64734662..7c97426f9cb2 100644 --- a/poetry.lock +++ b/poetry.lock @@ -8259,6 +8259,20 @@ pytest = ">=4.6" [package.extras] testing = ["fields", "hunter", "process-tests", "pytest-xdist", "virtualenv"] +[[package]] +name = "pytest-flakefinder" +version = "1.1.0" +description = "Runs tests multiple times to expose flakiness." +optional = false +python-versions = ">=3.5" +files = [ + {file = "pytest-flakefinder-1.1.0.tar.gz", hash = "sha256:e2412a1920bdb8e7908783b20b3d57e9dad590cc39a93e8596ffdd493b403e0e"}, + {file = "pytest_flakefinder-1.1.0-py2.py3-none-any.whl", hash = "sha256:741e0e8eea427052f5b8c89c2b3c3019a50c39a59ce4df6a305a2c2d9ba2bd13"}, +] + +[package.dependencies] +pytest = ">=2.7.1" + [[package]] name = "pytest-instafail" version = "0.5.0" @@ -11971,4 +11985,4 @@ local = ["ctransformers", "llama-cpp-python", "sentence-transformers"] [metadata] lock-version = "2.0" python-versions = ">=3.10,<3.13" -content-hash = "b5bc8889f729816d67f91ac094e93356a99809d9dc9e2dd994469a37ba122447" +content-hash = "0a8488e090f11cb87d8eecc8672afa2d4eeb31f5dc69d131c7a1c1b69ebc74bd" diff --git a/pyproject.toml b/pyproject.toml index 759eef8ac978..5c464eeea783 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -136,6 +136,7 @@ pre-commit = "^3.7.0" vulture = "^2.11" dictdiffer = "^0.9.0" pytest-split = "^0.9.0" +pytest-flakefinder = "^1.1.0" [tool.poetry.extras] deploy = ["celery", "redis", "flower"] @@ -162,6 +163,8 @@ testpaths = ["tests", "integration"] console_output_style = "progress" filterwarnings = ["ignore::DeprecationWarning", "ignore::ResourceWarning"] log_cli = true +log_cli_format = "%(asctime)s [%(levelname)8s] %(message)s (%(filename)s:%(lineno)s)" +log_cli_date_format = "%Y-%m-%d %H:%M:%S" markers = ["async_test", "api_key_required"] diff --git a/src/backend/base/langflow/custom/custom_component/component.py b/src/backend/base/langflow/custom/custom_component/component.py index 3f38ff45ec74..daa8f9cccac1 100644 --- a/src/backend/base/langflow/custom/custom_component/component.py +++ b/src/backend/base/langflow/custom/custom_component/component.py @@ -211,6 +211,13 @@ def get_output(self, name: str) -> Any: return self._outputs[name] raise ValueError(f"Output {name} not found in {self.__class__.__name__}") + def set_on_output(self, name: str, **kwargs): + output = self.get_output(name) + for key, value in kwargs.items(): + if not hasattr(output, key): + raise ValueError(f"Output {name} does not have a method {key}") + setattr(output, key, value) + def set_output_value(self, name: str, value: Any): if name in self._outputs: self._outputs[name].value = value @@ -377,6 +384,8 @@ def __getattr__(self, name: str) -> Any: return self.__dict__["_attributes"][name] if "_inputs" in self.__dict__ and name in self.__dict__["_inputs"]: return self.__dict__["_inputs"][name].value + if "_outputs" in self.__dict__ and name in self.__dict__["_outputs"]: + return self.__dict__["_outputs"][name] if name in BACKWARDS_COMPATIBLE_ATTRIBUTES: return self.__dict__[f"_{name}"] if name.startswith("_") and name[1:] in BACKWARDS_COMPATIBLE_ATTRIBUTES: @@ -493,6 +502,7 @@ def _set_outputs(self, outputs: List[dict]): self.outputs = [Output(**output) for output in outputs] for output in self.outputs: setattr(self, output.name, output) + self._outputs[output.name] = output def get_trace_as_inputs(self): predefined_inputs = { @@ -532,8 +542,6 @@ async def _build_results(self): _results = {} _artifacts = {} if hasattr(self, "outputs"): - if self._vertex: - self._set_outputs(self._vertex.outputs) for output in self.outputs: # Build the output if it's connected to some other vertex # or if it's not connected to any vertex @@ -547,6 +555,7 @@ async def _build_results(self): method: Callable = getattr(self, output.method) if output.cache and output.value != UNDEFINED: _results[output.name] = output.value + result = output.value else: result = method() # If the method is asynchronous, we need to await it @@ -561,33 +570,33 @@ async def _build_results(self): result.set_flow_id(self._vertex.graph.flow_id) _results[output.name] = result output.value = result - custom_repr = self.custom_repr() - if custom_repr is None and isinstance(result, (dict, Data, str)): - custom_repr = result - if not isinstance(custom_repr, str): - custom_repr = str(custom_repr) - raw = result - if self.status is None: - artifact_value = raw - else: - artifact_value = self.status - raw = self.status - - if hasattr(raw, "data") and raw is not None: - raw = raw.data - if raw is None: - raw = custom_repr - - elif hasattr(raw, "model_dump") and raw is not None: - raw = raw.model_dump() - if raw is None and isinstance(result, (dict, Data, str)): - raw = result.data if isinstance(result, Data) else result - artifact_type = get_artifact_type(artifact_value, result) - raw, artifact_type = post_process_raw(raw, artifact_type) - artifact = {"repr": custom_repr, "raw": raw, "type": artifact_type} - _artifacts[output.name] = artifact - self._output_logs[output.name] = self._logs - self._logs = [] + custom_repr = self.custom_repr() + if custom_repr is None and isinstance(result, (dict, Data, str)): + custom_repr = result + if not isinstance(custom_repr, str): + custom_repr = str(custom_repr) + raw = result + if self.status is None: + artifact_value = raw + else: + artifact_value = self.status + raw = self.status + + if hasattr(raw, "data") and raw is not None: + raw = raw.data + if raw is None: + raw = custom_repr + + elif hasattr(raw, "model_dump") and raw is not None: + raw = raw.model_dump() + if raw is None and isinstance(result, (dict, Data, str)): + raw = result.data if isinstance(result, Data) else result + artifact_type = get_artifact_type(artifact_value, result) + raw, artifact_type = post_process_raw(raw, artifact_type) + artifact = {"repr": custom_repr, "raw": raw, "type": artifact_type} + _artifacts[output.name] = artifact + self._output_logs[output.name] = self._logs + self._logs = [] self._artifacts = _artifacts self._results = _results if self._tracing_service: diff --git a/src/backend/base/langflow/graph/edge/base.py b/src/backend/base/langflow/graph/edge/base.py index 80aa5519bb6a..219f2f285454 100644 --- a/src/backend/base/langflow/graph/edge/base.py +++ b/src/backend/base/langflow/graph/edge/base.py @@ -17,6 +17,7 @@ def __init__(self, source: "Vertex", target: "Vertex", edge: EdgeData): self.target_param: str | None = None self._target_handle: TargetHandleDict | str | None = None self._data = edge.copy() + self.is_cycle = False if data := edge.get("data", {}): self._source_handle = data.get("sourceHandle", {}) self._target_handle = cast(TargetHandleDict, data.get("targetHandle", {})) @@ -159,10 +160,11 @@ def _legacy_validate_edge(self, source, target) -> None: raise ValueError(f"Edge between {source.vertex_type} and {target.vertex_type} " f"has no matched type") def __repr__(self) -> str: - return ( - f"Edge(source={self.source_id}, target={self.target_id}, target_param={self.target_param}" - f", matched_type={self.matched_type})" - ) + if (hasattr(self, "source_handle") and self.source_handle) and ( + hasattr(self, "target_handle") and self.target_handle + ): + return f"{self.source_id} -[{self.source_handle.name}->{self.target_handle.field_name}]-> {self.target_id}" + return f"{self.source_id} -[{self.target_param}]-> {self.target_id}" def __hash__(self) -> int: return hash(self.__repr__()) @@ -176,12 +178,16 @@ def __eq__(self, __o: object) -> bool: and self.target_param == __o.target_param ) + def __str__(self) -> str: + return self.__repr__() + -class ContractEdge(Edge): +class CycleEdge(Edge): def __init__(self, source: "Vertex", target: "Vertex", raw_edge: EdgeData): super().__init__(source, target, raw_edge) self.is_fulfilled = False # Whether the contract has been fulfilled. self.result: Any = None + self.is_cycle = True async def honor(self, source: "Vertex", target: "Vertex") -> None: """ @@ -220,10 +226,3 @@ async def get_result_from_source(self, source: "Vertex", target: "Vertex"): if target.params.get("message") == "": return self.result return self.result - - def __repr__(self) -> str: - if (hasattr(self, "source_handle") and self.source_handle) and ( - hasattr(self, "target_handle") and self.target_handle - ): - return f"{self.source_id} -[{self.source_handle.name}->{self.target_handle.field_name}]-> {self.target_id}" - return f"{self.source_id} -[{self.target_param}]-> {self.target_id}" diff --git a/src/backend/base/langflow/graph/graph/base.py b/src/backend/base/langflow/graph/graph/base.py index 9a93b60534ee..2e9d212568b0 100644 --- a/src/backend/base/langflow/graph/graph/base.py +++ b/src/backend/base/langflow/graph/graph/base.py @@ -7,20 +7,27 @@ from datetime import datetime, timezone from functools import partial from itertools import chain -from typing import TYPE_CHECKING, Any, Dict, Generator, List, Optional, Tuple, Type, Union +from typing import TYPE_CHECKING, Any, Dict, Generator, Iterable, List, Optional, Set, Tuple, Type, Union, cast import nest_asyncio from loguru import logger from langflow.exceptions.component import ComponentBuildException -from langflow.graph.edge.base import ContractEdge +from langflow.graph.edge.base import CycleEdge, Edge from langflow.graph.edge.schema import EdgeData from langflow.graph.graph.constants import Finish, lazy_load_vertex_dict from langflow.graph.graph.runnable_vertices_manager import RunnableVerticesManager from langflow.graph.graph.schema import GraphData, GraphDump, VertexBuildResult from langflow.graph.graph.state_manager import GraphStateManager from langflow.graph.graph.state_model import create_state_model_from_graph -from langflow.graph.graph.utils import find_start_component_id, process_flow, sort_up_to_vertex +from langflow.graph.graph.utils import ( + find_all_cycle_edges, + find_start_component_id, + has_cycle, + process_flow, + should_continue, + sort_up_to_vertex, +) from langflow.graph.schema import InterfaceComponentTypes, RunOutputs from langflow.graph.vertex.base import Vertex, VertexStates from langflow.graph.vertex.schema import NodeData @@ -84,7 +91,7 @@ def __init__( self.vertices_to_run: set[str] = set() self.stop_vertex: Optional[str] = None self.inactive_vertices: set = set() - self.edges: List[ContractEdge] = [] + self.edges: List[CycleEdge] = [] self.vertices: List[Vertex] = [] self.run_manager = RunnableVerticesManager() self.state_manager = GraphStateManager() @@ -100,6 +107,8 @@ def __init__( self._first_layer: List[str] = [] self._lock = asyncio.Lock() self.raw_graph_data: GraphData = {"nodes": [], "edges": []} + self._is_cyclic: Optional[bool] = None + self._cycles: Optional[List[tuple[str, str]]] = None try: self.tracing_service: "TracingService" | None = get_tracing_service() except Exception as exc: @@ -107,7 +116,7 @@ def __init__( self.tracing_service = None if start is not None and end is not None: self._set_start_and_end(start, end) - self.prepare() + self.prepare(start_component_id=start._id) if (start is not None and end is None) or (start is None and end is not None): raise ValueError("You must provide both input and output components") @@ -247,7 +256,7 @@ def add_component_edge(self, source_id: str, output_input_tuple: Tuple[str, str] } self._add_edge(edge_data) - async def async_start(self, inputs: Optional[List[dict]] = None): + async def async_start(self, inputs: Optional[List[dict]] = None, max_iterations: Optional[int] = None): if not self._prepared: raise ValueError("Graph not prepared. Call prepare() first.") # The idea is for this to return a generator that yields the result of @@ -256,17 +265,34 @@ async def async_start(self, inputs: Optional[List[dict]] = None): for key, value in _input.items(): vertex = self.get_vertex(key) vertex.set_input_value(key, value) - while True: + # I want to keep a counter of how many tyimes result.vertex.id + # has been yielded + yielded_counts: dict[str, int] = defaultdict(int) + + while should_continue(yielded_counts, max_iterations): result = await self.astep() yield result + if hasattr(result, "vertex"): + yielded_counts[result.vertex.id] += 1 if isinstance(result, Finish): return - def start(self, inputs: Optional[List[dict]] = None) -> Generator: + raise ValueError("Max iterations reached") + + def _snapshot(self): + return { + "_run_queue": self._run_queue.copy(), + "_first_layer": self._first_layer.copy(), + "vertices_layers": copy.deepcopy(self.vertices_layers), + "vertices_to_run": copy.deepcopy(self.vertices_to_run), + "run_manager": copy.deepcopy(self.run_manager.to_dict()), + } + + def start(self, inputs: Optional[List[dict]] = None, max_iterations: Optional[int] = None) -> Generator: #! Change this ASAP nest_asyncio.apply() loop = asyncio.get_event_loop() - async_gen = self.async_start(inputs) + async_gen = self.async_start(inputs, max_iterations) async_gen_task = asyncio.ensure_future(async_gen.__anext__()) while True: @@ -428,6 +454,23 @@ def first_layer(self): raise ValueError("Graph not prepared. Call prepare() first.") return self._first_layer + @property + def is_cyclic(self): + """ + Check if the graph has any cycles. + + Returns: + bool: True if the graph has any cycles, False otherwise. + """ + if self._is_cyclic is None: + vertices = [vertex.id for vertex in self.vertices] + try: + edges = [(e["data"]["sourceHandle"]["id"], e["data"]["targetHandle"]["id"]) for e in self._edges] + except KeyError: + edges = [(e["source"], e["target"]) for e in self._edges] + self._is_cyclic = has_cycle(vertices, edges) + return self._is_cyclic + @property def run_id(self): """ @@ -721,7 +764,7 @@ def metadata(self): "flow_name": self.flow_name, } - def build_graph_maps(self, edges: Optional[List[ContractEdge]] = None, vertices: Optional[List["Vertex"]] = None): + def build_graph_maps(self, edges: Optional[List[CycleEdge]] = None, vertices: Optional[List["Vertex"]] = None): """ Builds the adjacency maps for the graph. """ @@ -757,16 +800,18 @@ def mark_vertex(self, vertex_id: str, state: str): if state == VertexStates.INACTIVE: self.run_manager.remove_from_predecessors(vertex_id) - def mark_branch(self, vertex_id: str, state: str, visited: Optional[set] = None, output_name: Optional[str] = None): + def _mark_branch( + self, vertex_id: str, state: str, visited: Optional[set] = None, output_name: Optional[str] = None + ): """Marks a branch of the graph.""" if visited is None: visited = set() + else: + self.mark_vertex(vertex_id, state) if vertex_id in visited: return visited.add(vertex_id) - self.mark_vertex(vertex_id, state) - for child_id in self.parent_child_map[vertex_id]: # Only child_id that have an edge with the vertex_id through the output_name # should be marked @@ -774,9 +819,17 @@ def mark_branch(self, vertex_id: str, state: str, visited: Optional[set] = None, edge = self.get_edge(vertex_id, child_id) if edge and edge.source_handle.name != output_name: continue - self.mark_branch(child_id, state) + self._mark_branch(child_id, state, visited) - def get_edge(self, source_id: str, target_id: str) -> Optional[ContractEdge]: + def mark_branch(self, vertex_id: str, state: str, output_name: Optional[str] = None): + self._mark_branch(vertex_id=vertex_id, state=state, output_name=output_name) + new_predecessor_map, _ = self.build_adjacency_maps(self.edges) + self.run_manager.update_run_state( + run_predecessors=new_predecessor_map, + vertices_to_run=self.vertices_to_run, + ) + + def get_edge(self, source_id: str, target_id: str) -> Optional[CycleEdge]: """Returns the edge between two vertices.""" for edge in self.edges: if edge.source_id == source_id and edge.target_id == target_id: @@ -1242,7 +1295,7 @@ def get_vertex_edges( vertex_id: str, is_target: Optional[bool] = None, is_source: Optional[bool] = None, - ) -> List[ContractEdge]: + ) -> List[CycleEdge]: """Returns a list of edges for a given vertex.""" # The idea here is to return the edges that have the vertex_id as source or target # or both @@ -1312,14 +1365,14 @@ async def process(self, fallback_to_env_vars: bool, start_component_id: Optional return self def find_next_runnable_vertices(self, vertex_id: str, vertex_successors_ids: List[str]) -> List[str]: - next_runnable_vertices = [] - for v_id in vertex_successors_ids: + next_runnable_vertices = set() + for v_id in sorted(vertex_successors_ids): if not self.is_vertex_runnable(v_id): - next_runnable_vertices.extend(self.find_runnable_predecessors_for_successor(v_id)) + next_runnable_vertices.update(self.find_runnable_predecessors_for_successor(v_id)) else: - next_runnable_vertices.append(v_id) + next_runnable_vertices.add(v_id) - return next_runnable_vertices + return list(next_runnable_vertices) async def get_next_runnable_vertices(self, lock: asyncio.Lock, vertex: "Vertex", cache: bool = True) -> List[str]: v_id = vertex.id @@ -1412,33 +1465,25 @@ def get_predecessors(self, vertex): """Returns the predecessors of a vertex.""" return [self.get_vertex(source_id) for source_id in self.predecessor_map.get(vertex.id, [])] - def get_all_successors(self, vertex: "Vertex", recursive=True, flat=True): - # Recursively get the successors of the current vertex - # successors = vertex.successors - # if not successors: - # return [] - # successors_result = [] - # for successor in successors: - # # Just return a list of successors - # if recursive: - # next_successors = self.get_all_successors(successor) - # successors_result.extend(next_successors) - # successors_result.append(successor) - # return successors_result - # The above is the version without the flat parameter - # The below is the version with the flat parameter - # the flat parameter will define if each layer of successors - # becomes one list or if the result is a list of lists - # if flat is True, the result will be a list of vertices - # if flat is False, the result will be a list of lists of vertices - # each list will represent a layer of successors + def get_all_successors(self, vertex: "Vertex", recursive=True, flat=True, visited=None): + if visited is None: + visited = set() + + # Prevent revisiting vertices to avoid infinite loops in cyclic graphs + if vertex in visited: + return [] + + visited.add(vertex) + successors = vertex.successors if not successors: return [] + successors_result = [] + for successor in successors: if recursive: - next_successors = self.get_all_successors(successor) + next_successors = self.get_all_successors(successor, recursive=recursive, flat=flat, visited=visited) if flat: successors_result.extend(next_successors) else: @@ -1447,6 +1492,10 @@ def get_all_successors(self, vertex: "Vertex", recursive=True, flat=True): successors_result.append(successor) else: successors_result.append([successor]) + + if not flat and successors_result: + return [successors] + successors_result + return successors_result def get_successors(self, vertex: "Vertex") -> List["Vertex"]: @@ -1473,21 +1522,31 @@ def get_vertex_neighbors(self, vertex: "Vertex") -> Dict["Vertex", int]: neighbors[neighbor] += 1 return neighbors - def _build_edges(self) -> List[ContractEdge]: + @property + def cycles(self): + if self._cycles is None: + if self._start is None: + self._cycles = [] + else: + entry_vertex = self._start._id + edges = [(e["data"]["sourceHandle"]["id"], e["data"]["targetHandle"]["id"]) for e in self._edges] + self._cycles = find_all_cycle_edges(entry_vertex, edges) + return self._cycles + + def _build_edges(self) -> List[CycleEdge]: """Builds the edges of the graph.""" # Edge takes two vertices as arguments, so we need to build the vertices first # and then build the edges # if we can't find a vertex, we raise an error - - edges: set[ContractEdge] = set() + edges: Set[CycleEdge | Edge] = set() for edge in self._edges: new_edge = self.build_edge(edge) edges.add(new_edge) if self.vertices and not edges: warnings.warn("Graph has vertices but no edges") - return list(edges) + return list(cast(Iterable[CycleEdge], edges)) - def build_edge(self, edge: EdgeData) -> ContractEdge: + def build_edge(self, edge: EdgeData) -> CycleEdge | Edge: source = self.get_vertex(edge["source"]) target = self.get_vertex(edge["target"]) @@ -1495,7 +1554,10 @@ def build_edge(self, edge: EdgeData) -> ContractEdge: raise ValueError(f"Source vertex {edge['source']} not found") if target is None: raise ValueError(f"Target vertex {edge['target']} not found") - new_edge = ContractEdge(source, target, edge) + if (source.id, target.id) in self.cycles: + new_edge: CycleEdge | Edge = CycleEdge(source, target, edge) + else: + new_edge = Edge(source, target, edge) return new_edge def _get_vertex_class(self, node_type: str, node_base_type: str, node_id: str) -> Type["Vertex"]: @@ -1545,7 +1607,6 @@ def prepare(self, stop_component_id: Optional[str] = None, start_component_id: O if stop_component_id and start_component_id: raise ValueError("You can only provide one of stop_component_id or start_component_id") self.validate_stream() - self.edges = self._build_edges() if stop_component_id or start_component_id: try: @@ -1558,8 +1619,8 @@ def prepare(self, stop_component_id: Optional[str] = None, start_component_id: O for vertex_id in first_layer: self.run_manager.add_to_vertices_being_run(vertex_id) - self._first_layer = first_layer - self._run_queue = deque(first_layer) + self._first_layer = sorted(first_layer) + self._run_queue = deque(self._first_layer) self._prepared = True return self @@ -1594,12 +1655,25 @@ def layered_topological_sort( """Performs a layered topological sort of the vertices in the graph.""" vertices_ids = {vertex.id for vertex in vertices} # Queue for vertices with no incoming edges - queue = deque( - vertex.id - for vertex in vertices - # if filter_graphs then only vertex.is_input will be considered - if self.in_degree_map[vertex.id] == 0 and (not filter_graphs or vertex.is_input) - ) + in_degree_map = self.in_degree_map.copy() + if self.is_cyclic and all(in_degree_map.values()): + # This means we have a cycle because all vertex have in_degree_map > 0 + # because of this we set the queue to start on the ._start if it exists + if self._start is not None: + queue = deque([self._start._id]) + else: + # Find the chat input component + chat_input = find_start_component_id(vertices_ids) + if chat_input is None: + raise ValueError("No input component found and no start component provided") + queue = deque([chat_input]) + else: + queue = deque( + vertex.id + for vertex in vertices + # if filter_graphs then only vertex.is_input will be considered + if in_degree_map[vertex.id] == 0 and (not filter_graphs or vertex.is_input) + ) layers: List[List[str]] = [] visited = set(queue) @@ -1620,13 +1694,13 @@ def layered_topological_sort( if neighbor not in vertices_ids: continue - self.in_degree_map[neighbor] -= 1 # 'remove' edge - if self.in_degree_map[neighbor] == 0 and neighbor not in visited: + in_degree_map[neighbor] -= 1 # 'remove' edge + if in_degree_map[neighbor] == 0 and neighbor not in visited: queue.append(neighbor) # if > 0 it might mean not all predecessors have added to the queue # so we should process the neighbors predecessors - elif self.in_degree_map[neighbor] > 0: + elif in_degree_map[neighbor] > 0: for predecessor in self.predecessor_map[neighbor]: if predecessor not in queue and predecessor not in visited: queue.append(predecessor) @@ -1873,13 +1947,16 @@ def get_top_level_vertices(self, vertices_ids): top_level_vertices.append(vertex_id) return top_level_vertices - def build_in_degree(self, edges: List[ContractEdge]) -> Dict[str, int]: + def build_in_degree(self, edges: List[CycleEdge]) -> Dict[str, int]: in_degree: Dict[str, int] = defaultdict(int) for edge in edges: in_degree[edge.target_id] += 1 + for vertex in self.vertices: + if vertex.id not in in_degree: + in_degree[vertex.id] = 0 return in_degree - def build_adjacency_maps(self, edges: List[ContractEdge]) -> Tuple[Dict[str, List[str]], Dict[str, List[str]]]: + def build_adjacency_maps(self, edges: List[CycleEdge]) -> Tuple[Dict[str, List[str]], Dict[str, List[str]]]: """Returns the adjacency maps for the graph.""" predecessor_map: dict[str, list[str]] = defaultdict(list) successor_map: dict[str, list[str]] = defaultdict(list) diff --git a/src/backend/base/langflow/graph/graph/utils.py b/src/backend/base/langflow/graph/graph/utils.py index 5500f210c887..21e00f696df4 100644 --- a/src/backend/base/langflow/graph/graph/utils.py +++ b/src/backend/base/langflow/graph/graph/utils.py @@ -1,5 +1,5 @@ import copy -from collections import deque +from collections import defaultdict, deque from typing import Dict, List PRIORITY_LIST_OF_INPUTS = ["webhook", "chat"] @@ -241,8 +241,12 @@ def get_updated_edges(base_flow, g_nodes, g_edges, group_node_id): def get_successors(graph: Dict[str, Dict[str, List[str]]], vertex_id: str) -> List[str]: successors_result = [] stack = [vertex_id] + visited = set() while stack: current_id = stack.pop() + if current_id in visited: + continue + visited.add(current_id) successors_result.append(current_id) stack.extend(graph[current_id]["successors"]) return successors_result @@ -282,3 +286,127 @@ def sort_up_to_vertex(graph: Dict[str, Dict[str, List[str]]], vertex_id: str, is excluded.add(succ_id) return list(visited) + + +def has_cycle(vertex_ids: list[str], edges: list[tuple[str, str]]) -> bool: + """ + Determines whether a directed graph represented by a list of vertices and edges contains a cycle. + + Args: + vertex_ids (list[str]): A list of vertex IDs. + edges (list[tuple[str, str]]): A list of tuples representing directed edges between vertices. + + Returns: + bool: True if the graph contains a cycle, False otherwise. + """ + # Build the graph as an adjacency list + graph = defaultdict(list) + for u, v in edges: + graph[u].append(v) + + # Utility function to perform DFS + def dfs(v, visited, rec_stack): + visited.add(v) + rec_stack.add(v) + + for neighbor in graph[v]: + if neighbor not in visited: + if dfs(neighbor, visited, rec_stack): + return True + elif neighbor in rec_stack: + return True + + rec_stack.remove(v) + return False + + visited: set[str] = set() + rec_stack: set[str] = set() + + for vertex in vertex_ids: + if vertex not in visited: + if dfs(vertex, visited, rec_stack): + return True + + return False + + +def find_cycle_edge(entry_point: str, edges: list[tuple[str, str]]) -> tuple[str, str]: + """ + Find the edge that causes a cycle in a directed graph starting from a given entry point. + + Args: + entry_point (str): The vertex ID from which to start the search. + edges (list[tuple[str, str]]): A list of tuples representing directed edges between vertices. + + Returns: + tuple[str, str]: A tuple representing the edge that causes a cycle, or None if no cycle is found. + """ + # Build the graph as an adjacency list + graph = defaultdict(list) + for u, v in edges: + graph[u].append(v) + + # Utility function to perform DFS + def dfs(v, visited, rec_stack): + visited.add(v) + rec_stack.add(v) + + for neighbor in graph[v]: + if neighbor not in visited: + result = dfs(neighbor, visited, rec_stack) + if result: + return result + elif neighbor in rec_stack: + return (v, neighbor) # This edge causes the cycle + + rec_stack.remove(v) + return None + + visited: set[str] = set() + rec_stack: set[str] = set() + + return dfs(entry_point, visited, rec_stack) + + +def find_all_cycle_edges(entry_point: str, edges: list[tuple[str, str]]) -> list[tuple[str, str]]: + """ + Find all edges that cause cycles in a directed graph starting from a given entry point. + + Args: + entry_point (str): The vertex ID from which to start the search. + edges (list[tuple[str, str]]): A list of tuples representing directed edges between vertices. + + Returns: + list[tuple[str, str]]: A list of tuples representing edges that cause cycles. + """ + # Build the graph as an adjacency list + graph = defaultdict(list) + for u, v in edges: + graph[u].append(v) + + # Utility function to perform DFS + def dfs(v, visited, rec_stack): + visited.add(v) + rec_stack.add(v) + + cycle_edges = [] + + for neighbor in graph[v]: + if neighbor not in visited: + cycle_edges += dfs(neighbor, visited, rec_stack) + elif neighbor in rec_stack: + cycle_edges.append((v, neighbor)) # This edge causes a cycle + + rec_stack.remove(v) + return cycle_edges + + visited: set[str] = set() + rec_stack: set[str] = set() + + return dfs(entry_point, visited, rec_stack) + + +def should_continue(yielded_counts: dict[str, int], max_iterations: int | None) -> bool: + if max_iterations is None: + return True + return max(yielded_counts.values(), default=0) <= max_iterations diff --git a/src/backend/base/langflow/graph/vertex/base.py b/src/backend/base/langflow/graph/vertex/base.py index 0b7941734b48..b0df08ee923c 100644 --- a/src/backend/base/langflow/graph/vertex/base.py +++ b/src/backend/base/langflow/graph/vertex/base.py @@ -28,7 +28,7 @@ if TYPE_CHECKING: from langflow.custom import Component - from langflow.graph.edge.base import ContractEdge + from langflow.graph.edge.base import CycleEdge, Edge from langflow.graph.graph.base import Graph @@ -162,15 +162,15 @@ def set_artifacts(self) -> None: pass @property - def edges(self) -> List["ContractEdge"]: + def edges(self) -> List["CycleEdge"]: return self.graph.get_vertex_edges(self.id) @property - def outgoing_edges(self) -> List["ContractEdge"]: + def outgoing_edges(self) -> List["CycleEdge"]: return [edge for edge in self.edges if edge.source_id == self.id] @property - def incoming_edges(self) -> List["ContractEdge"]: + def incoming_edges(self) -> List["CycleEdge"]: return [edge for edge in self.edges if edge.target_id == self.id] @property @@ -250,6 +250,12 @@ def _parse_data(self) -> None: self.base_type = base_type break + def get_value_from_template_dict(self, key: str): + template_dict = self.data.get("node", {}).get("template", {}) + if key not in template_dict: + raise ValueError(f"Key {key} not found in template dict") + return template_dict.get(key, {}).get("value") + def get_task(self): # using the task_id, get the task from celery # and return it @@ -257,6 +263,31 @@ def get_task(self): return AsyncResult(self.task_id) + def _set_params_from_normal_edge(self, params: dict, edge: "Edge", template_dict: dict): + param_key = edge.target_param + + # If the param_key is in the template_dict and the edge.target_id is the current node + # We check this to make sure params with the same name but different target_id + # don't get overwritten + if param_key in template_dict and edge.target_id == self.id: + if template_dict[param_key].get("list"): + if param_key not in params: + params[param_key] = [] + params[param_key].append(self.graph.get_vertex(edge.source_id)) + elif edge.target_id == self.id: + if isinstance(template_dict[param_key].get("value"), dict): + # we don't know the key of the dict but we need to set the value + # to the vertex that is the source of the edge + param_dict = template_dict[param_key]["value"] + if not param_dict or len(param_dict) != 1: + params[param_key] = self.graph.get_vertex(edge.source_id) + else: + params[param_key] = {key: self.graph.get_vertex(edge.source_id) for key in param_dict.keys()} + + else: + params[param_key] = self.graph.get_vertex(edge.source_id) + return params + def _build_params(self): # sourcery skip: merge-list-append, remove-redundant-if # Some params are required, some are optional @@ -287,30 +318,7 @@ def _build_params(self): for edge in self.edges: if not hasattr(edge, "target_param"): continue - param_key = edge.target_param - - # If the param_key is in the template_dict and the edge.target_id is the current node - # We check this to make sure params with the same name but different target_id - # don't get overwritten - if param_key in template_dict and edge.target_id == self.id: - if template_dict[param_key].get("list"): - if param_key not in params: - params[param_key] = [] - params[param_key].append(self.graph.get_vertex(edge.source_id)) - elif edge.target_id == self.id: - if isinstance(template_dict[param_key].get("value"), dict): - # we don't know the key of the dict but we need to set the value - # to the vertex that is the source of the edge - param_dict = template_dict[param_key]["value"] - if not param_dict or len(param_dict) != 1: - params[param_key] = self.graph.get_vertex(edge.source_id) - else: - params[param_key] = { - key: self.graph.get_vertex(edge.source_id) for key in param_dict.keys() - } - - else: - params[param_key] = self.graph.get_vertex(edge.source_id) + params = self._set_params_from_normal_edge(params, edge, template_dict) load_from_db_fields = [] for field_name, field in template_dict.items(): @@ -598,11 +606,13 @@ async def _get_result(self, requester: "Vertex") -> Any: """ flow_id = self.graph.flow_id if not self._built: - asyncio.create_task(log_transaction(str(flow_id), source=self, target=requester, status="error")) + if flow_id: + asyncio.create_task(log_transaction(str(flow_id), source=self, target=requester, status="error")) raise ValueError(f"Component {self.display_name} has not been built yet") result = self._built_result if self.use_result else self._built_object - asyncio.create_task(log_transaction(str(flow_id), source=self, target=requester, status="success")) + if flow_id: + asyncio.create_task(log_transaction(str(flow_id), source=self, target=requester, status="success")) return result async def _build_vertex_and_update_params(self, key, vertex: "Vertex"): @@ -798,7 +808,7 @@ async def get_requester_result(self, requester: Optional["Vertex"]): else await requester_edge.get_result_from_source(source=self, target=requester) ) - def add_edge(self, edge: "ContractEdge") -> None: + def add_edge(self, edge: "CycleEdge") -> None: if edge not in self.edges: self.edges.append(edge) diff --git a/src/backend/base/langflow/graph/vertex/types.py b/src/backend/base/langflow/graph/vertex/types.py index 10882b496204..87927c3e3814 100644 --- a/src/backend/base/langflow/graph/vertex/types.py +++ b/src/backend/base/langflow/graph/vertex/types.py @@ -1,13 +1,13 @@ import asyncio import json -from typing import TYPE_CHECKING, Any, AsyncIterator, Dict, Generator, Iterator, List +from typing import TYPE_CHECKING, Any, AsyncIterator, Dict, Generator, Iterator, List, cast import yaml from langchain_core.messages import AIMessage, AIMessageChunk from loguru import logger from langflow.graph.schema import CHAT_COMPONENTS, RECORDS_COMPONENTS, InterfaceComponentTypes, ResultData -from langflow.graph.utils import UnbuiltObject, log_transaction, serialize_field +from langflow.graph.utils import UnbuiltObject, log_transaction, log_vertex_build, serialize_field from langflow.graph.vertex.base import Vertex from langflow.graph.vertex.schema import NodeData from langflow.inputs.inputs import InputTypes @@ -15,13 +15,12 @@ from langflow.schema.artifact import ArtifactType from langflow.schema.message import Message from langflow.schema.schema import INPUT_FIELD_NAME -from langflow.graph.utils import log_vertex_build from langflow.template.field.base import UNDEFINED, Output from langflow.utils.schemas import ChatOutputResponse, DataOutputResponse from langflow.utils.util import unescape_string if TYPE_CHECKING: - from langflow.graph.edge.base import ContractEdge + from langflow.graph.edge.base import CycleEdge class CustomComponentVertex(Vertex): @@ -70,7 +69,7 @@ def _update_built_object_and_artifacts(self, result): for key, value in self._built_object.items(): self.add_result(key, value) - def get_edge_with_target(self, target_id: str) -> Generator["ContractEdge", None, None]: + def get_edge_with_target(self, target_id: str) -> Generator["CycleEdge", None, None]: """ Get the edge with the target id. @@ -93,10 +92,19 @@ async def _get_result(self, requester: "Vertex") -> Any: Returns: The built result if use_result is True, else the built object. """ + flow_id = self.graph.flow_id if not self._built: - asyncio.create_task( - log_transaction(source=self, target=requester, flow_id=str(self.graph.flow_id), status="error") - ) + if flow_id: + asyncio.create_task( + log_transaction(source=self, target=requester, flow_id=str(flow_id), status="error") + ) + for edge in self.get_edge_with_target(requester.id): + # We need to check if the edge is a normal edge + # or a contract edge + + if edge.is_cycle and edge.target_param: + return requester.get_value_from_template_dict(edge.target_param) + raise ValueError(f"Component {self.display_name} has not been built yet") if requester is None: @@ -104,10 +112,15 @@ async def _get_result(self, requester: "Vertex") -> Any: edges = self.get_edge_with_target(requester.id) result = UNDEFINED - edge = None for edge in edges: if edge is not None and edge.source_handle.name in self.results: - result = self.results[edge.source_handle.name] + # Get the result from the output instead of the results dict + output = self.get_output(edge.source_handle.name) + if output.value is UNDEFINED: + result = self.results[edge.source_handle.name] + else: + result = cast(Any, output.value) + # result = self.results[edge.source_handle.name] break if result is UNDEFINED: if edge is None: @@ -115,10 +128,9 @@ async def _get_result(self, requester: "Vertex") -> Any: elif edge.source_handle.name not in self.results: raise ValueError(f"Result not found for {edge.source_handle.name}. Results: {self.results}") else: - raise ValueError(f"Result not found for {edge.source_handle.name}") - asyncio.create_task( - log_transaction(source=self, target=requester, flow_id=str(self.graph.flow_id), status="success") - ) + raise ValueError(f"Result not found for {edge.source_handle.name} in {edge}") + if flow_id: + asyncio.create_task(log_transaction(source=self, target=requester, flow_id=str(flow_id), status="success")) return result def extract_messages_from_artifacts(self, artifacts: Dict[str, Any]) -> List[dict]: diff --git a/src/backend/base/langflow/initial_setup/starter_projects/Basic Prompting (Hello, World).json b/src/backend/base/langflow/initial_setup/starter_projects/Basic Prompting (Hello, World).json index 6794b116d2b9..c3dd09702212 100644 --- a/src/backend/base/langflow/initial_setup/starter_projects/Basic Prompting (Hello, World).json +++ b/src/backend/base/langflow/initial_setup/starter_projects/Basic Prompting (Hello, World).json @@ -111,7 +111,7 @@ "output_types": [], "outputs": [ { - "cache": true, + "cache": false, "display_name": "Message", "method": "message_response", "name": "message", @@ -326,7 +326,7 @@ "output_types": [], "outputs": [ { - "cache": true, + "cache": false, "display_name": "Prompt Message", "method": "build_prompt", "name": "prompt", @@ -446,7 +446,7 @@ "output_types": [], "outputs": [ { - "cache": true, + "cache": false, "display_name": "Message", "method": "message_response", "name": "message", @@ -647,7 +647,7 @@ "output_types": [], "outputs": [ { - "cache": true, + "cache": false, "display_name": "Text", "method": "text_response", "name": "text_output", @@ -658,7 +658,7 @@ "value": "__UNDEFINED__" }, { - "cache": true, + "cache": false, "display_name": "Language Model", "method": "build_model", "name": "model_output", diff --git a/src/backend/base/langflow/initial_setup/starter_projects/Blog Writer.json b/src/backend/base/langflow/initial_setup/starter_projects/Blog Writer.json index def5b5eb9344..e8b9a1dcf56b 100644 --- a/src/backend/base/langflow/initial_setup/starter_projects/Blog Writer.json +++ b/src/backend/base/langflow/initial_setup/starter_projects/Blog Writer.json @@ -159,7 +159,7 @@ "output_types": [], "outputs": [ { - "cache": true, + "cache": false, "display_name": "Data", "method": "fetch_content", "name": "data", @@ -259,7 +259,7 @@ "output_types": [], "outputs": [ { - "cache": true, + "cache": false, "display_name": "Text", "method": "parse_data", "name": "text", @@ -395,7 +395,7 @@ "output_types": [], "outputs": [ { - "cache": true, + "cache": false, "display_name": "Prompt Message", "method": "build_prompt", "name": "prompt", @@ -634,7 +634,7 @@ "output_types": [], "outputs": [ { - "cache": true, + "cache": false, "display_name": "Message", "method": "message_response", "name": "message", @@ -835,7 +835,7 @@ "output_types": [], "outputs": [ { - "cache": true, + "cache": false, "display_name": "Text", "method": "text_response", "name": "text_output", @@ -846,7 +846,7 @@ "value": "__UNDEFINED__" }, { - "cache": true, + "cache": false, "display_name": "Language Model", "method": "build_model", "name": "model_output", diff --git a/src/backend/base/langflow/initial_setup/starter_projects/Complex Agent.json b/src/backend/base/langflow/initial_setup/starter_projects/Complex Agent.json index dea218fc079c..847ef6bef18b 100644 --- a/src/backend/base/langflow/initial_setup/starter_projects/Complex Agent.json +++ b/src/backend/base/langflow/initial_setup/starter_projects/Complex Agent.json @@ -632,7 +632,7 @@ "output_types": [], "outputs": [ { - "cache": true, + "cache": false, "display_name": "Output", "method": "build_output", "name": "output", @@ -877,7 +877,7 @@ "output_types": [], "outputs": [ { - "cache": true, + "cache": false, "display_name": "Text", "method": "text_response", "name": "text_output", @@ -888,7 +888,7 @@ "value": "__UNDEFINED__" }, { - "cache": true, + "cache": false, "display_name": "Language Model", "method": "build_model", "name": "model_output", @@ -1167,7 +1167,7 @@ "output_types": [], "outputs": [ { - "cache": true, + "cache": false, "display_name": "Message", "method": "message_response", "name": "message", @@ -1352,7 +1352,7 @@ "output_types": [], "outputs": [ { - "cache": true, + "cache": false, "display_name": "Task", "method": "build_task", "name": "task_output", @@ -1491,7 +1491,7 @@ "output_types": [], "outputs": [ { - "cache": true, + "cache": false, "display_name": "Agent", "method": "build_output", "name": "output", @@ -1745,7 +1745,7 @@ "output_types": [], "outputs": [ { - "cache": true, + "cache": false, "display_name": "Agent", "method": "build_output", "name": "output", @@ -2004,7 +2004,7 @@ "output_types": [], "outputs": [ { - "cache": true, + "cache": false, "display_name": "Text", "method": "text_response", "name": "text_output", @@ -2015,7 +2015,7 @@ "value": "__UNDEFINED__" }, { - "cache": true, + "cache": false, "display_name": "Language Model", "method": "build_model", "name": "model_output", @@ -2299,7 +2299,7 @@ "output_types": [], "outputs": [ { - "cache": true, + "cache": false, "display_name": "Prompt Message", "method": "build_prompt", "name": "prompt", @@ -2412,7 +2412,7 @@ "output_types": [], "outputs": [ { - "cache": true, + "cache": false, "display_name": "Message", "method": "message_response", "name": "message", @@ -2619,7 +2619,7 @@ "output_types": [], "outputs": [ { - "cache": true, + "cache": false, "display_name": "Tool", "method": "build_tool", "name": "tool", @@ -2704,7 +2704,7 @@ "output_types": [], "outputs": [ { - "cache": true, + "cache": false, "display_name": "Text", "method": "text_response", "name": "text_output", @@ -2715,7 +2715,7 @@ "value": "__UNDEFINED__" }, { - "cache": true, + "cache": false, "display_name": "Language Model", "method": "build_model", "name": "model_output", @@ -3124,7 +3124,7 @@ "output_types": [], "outputs": [ { - "cache": true, + "cache": false, "display_name": "Text", "method": "text_response", "name": "text_output", @@ -3135,7 +3135,7 @@ "value": "__UNDEFINED__" }, { - "cache": true, + "cache": false, "display_name": "Language Model", "method": "build_model", "name": "model_output", @@ -3569,7 +3569,7 @@ "output_types": [], "outputs": [ { - "cache": true, + "cache": false, "display_name": "Text", "method": "text_response", "name": "text_output", @@ -3580,7 +3580,7 @@ "value": "__UNDEFINED__" }, { - "cache": true, + "cache": false, "display_name": "Language Model", "method": "build_model", "name": "model_output", @@ -3866,7 +3866,7 @@ "output_types": [], "outputs": [ { - "cache": true, + "cache": false, "display_name": "Prompt Message", "method": "build_prompt", "name": "prompt", @@ -4031,7 +4031,7 @@ "output_types": [], "outputs": [ { - "cache": true, + "cache": false, "display_name": "Data", "method": "run_model", "name": "api_run_model", @@ -4043,7 +4043,7 @@ "value": "__UNDEFINED__" }, { - "cache": true, + "cache": false, "display_name": "Tool", "method": "build_tool", "name": "api_build_tool", diff --git a/src/backend/base/langflow/initial_setup/starter_projects/Document QA.json b/src/backend/base/langflow/initial_setup/starter_projects/Document QA.json index 5b491ca631f5..894704e672a3 100644 --- a/src/backend/base/langflow/initial_setup/starter_projects/Document QA.json +++ b/src/backend/base/langflow/initial_setup/starter_projects/Document QA.json @@ -164,7 +164,7 @@ "output_types": [], "outputs": [ { - "cache": true, + "cache": false, "display_name": "Prompt Message", "method": "build_prompt", "name": "prompt", @@ -308,7 +308,7 @@ "output_types": [], "outputs": [ { - "cache": true, + "cache": false, "display_name": "Message", "method": "message_response", "name": "message", @@ -524,7 +524,7 @@ "output_types": [], "outputs": [ { - "cache": true, + "cache": false, "display_name": "Message", "method": "message_response", "name": "message", @@ -725,7 +725,7 @@ "output_types": [], "outputs": [ { - "cache": true, + "cache": false, "display_name": "Text", "method": "text_response", "name": "text_output", @@ -736,7 +736,7 @@ "value": "__UNDEFINED__" }, { - "cache": true, + "cache": false, "display_name": "Language Model", "method": "build_model", "name": "model_output", @@ -1012,7 +1012,7 @@ "output_types": [], "outputs": [ { - "cache": true, + "cache": false, "display_name": "Text", "method": "parse_data", "name": "text", @@ -1144,7 +1144,7 @@ "output_types": [], "outputs": [ { - "cache": true, + "cache": false, "display_name": "Data", "method": "load_file", "name": "data", diff --git a/src/backend/base/langflow/initial_setup/starter_projects/Hierarchical Agent.json b/src/backend/base/langflow/initial_setup/starter_projects/Hierarchical Agent.json index c8433504f339..9cc4d761ea40 100644 --- a/src/backend/base/langflow/initial_setup/starter_projects/Hierarchical Agent.json +++ b/src/backend/base/langflow/initial_setup/starter_projects/Hierarchical Agent.json @@ -329,7 +329,7 @@ "output_types": [], "outputs": [ { - "cache": true, + "cache": false, "display_name": "Output", "method": "build_output", "name": "output", @@ -577,7 +577,7 @@ "output_types": [], "outputs": [ { - "cache": true, + "cache": false, "display_name": "Text", "method": "text_response", "name": "text_output", @@ -588,7 +588,7 @@ "value": "__UNDEFINED__" }, { - "cache": true, + "cache": false, "display_name": "Language Model", "method": "build_model", "name": "model_output", @@ -871,7 +871,7 @@ "output_types": [], "outputs": [ { - "cache": true, + "cache": false, "display_name": "Message", "method": "message_response", "name": "message", @@ -1061,7 +1061,7 @@ "output_types": [], "outputs": [ { - "cache": true, + "cache": false, "display_name": "Task", "method": "build_task", "name": "task_output", @@ -1205,7 +1205,7 @@ "output_types": [], "outputs": [ { - "cache": true, + "cache": false, "display_name": "Agent", "method": "build_output", "name": "output", @@ -1463,7 +1463,7 @@ "output_types": [], "outputs": [ { - "cache": true, + "cache": false, "display_name": "Agent", "method": "build_output", "name": "output", @@ -1724,7 +1724,7 @@ "output_types": [], "outputs": [ { - "cache": true, + "cache": false, "display_name": "Text", "method": "text_response", "name": "text_output", @@ -1735,7 +1735,7 @@ "value": "__UNDEFINED__" }, { - "cache": true, + "cache": false, "display_name": "Language Model", "method": "build_model", "name": "model_output", @@ -2023,7 +2023,7 @@ "output_types": [], "outputs": [ { - "cache": true, + "cache": false, "display_name": "Prompt Message", "method": "build_prompt", "name": "prompt", @@ -2140,7 +2140,7 @@ "output_types": [], "outputs": [ { - "cache": true, + "cache": false, "display_name": "Message", "method": "message_response", "name": "message", @@ -2359,7 +2359,7 @@ "output_types": [], "outputs": [ { - "cache": true, + "cache": false, "display_name": "Agent", "method": "build_output", "name": "output", @@ -2609,7 +2609,7 @@ "output_types": [], "outputs": [ { - "cache": true, + "cache": false, "display_name": "Data", "method": "run_model", "name": "api_run_model", @@ -2621,7 +2621,7 @@ "value": "__UNDEFINED__" }, { - "cache": true, + "cache": false, "display_name": "Tool", "method": "build_tool", "name": "api_build_tool", diff --git a/src/backend/base/langflow/initial_setup/starter_projects/Memory Chatbot.json b/src/backend/base/langflow/initial_setup/starter_projects/Memory Chatbot.json index 05b1bf5768ea..114cb2ecbd43 100644 --- a/src/backend/base/langflow/initial_setup/starter_projects/Memory Chatbot.json +++ b/src/backend/base/langflow/initial_setup/starter_projects/Memory Chatbot.json @@ -138,7 +138,7 @@ "output_types": [], "outputs": [ { - "cache": true, + "cache": false, "display_name": "Prompt Message", "method": "build_prompt", "name": "prompt", @@ -282,7 +282,7 @@ "output_types": [], "outputs": [ { - "cache": true, + "cache": false, "display_name": "Message", "method": "message_response", "name": "message", @@ -505,7 +505,7 @@ "output_types": [], "outputs": [ { - "cache": true, + "cache": false, "display_name": "Text", "method": "text_response", "name": "text_output", @@ -516,7 +516,7 @@ "value": "__UNDEFINED__" }, { - "cache": true, + "cache": false, "display_name": "Language Model", "method": "build_model", "name": "model_output", @@ -795,7 +795,7 @@ "output_types": [], "outputs": [ { - "cache": true, + "cache": false, "display_name": "Message", "method": "message_response", "name": "message", @@ -987,7 +987,7 @@ "output_types": [], "outputs": [ { - "cache": true, + "cache": false, "display_name": "Messages (Data)", "method": "retrieve_messages", "name": "messages", @@ -998,7 +998,7 @@ "value": "__UNDEFINED__" }, { - "cache": true, + "cache": false, "display_name": "Messages (Text)", "method": "retrieve_messages_as_text", "name": "messages_text", @@ -1009,7 +1009,7 @@ "value": "__UNDEFINED__" }, { - "cache": true, + "cache": false, "display_name": "Memory", "method": "build_lc_memory", "name": "lc_memory", diff --git a/src/backend/base/langflow/initial_setup/starter_projects/Sequential Agent.json b/src/backend/base/langflow/initial_setup/starter_projects/Sequential Agent.json index e73aad988037..ee92c94e5c42 100644 --- a/src/backend/base/langflow/initial_setup/starter_projects/Sequential Agent.json +++ b/src/backend/base/langflow/initial_setup/starter_projects/Sequential Agent.json @@ -477,7 +477,7 @@ "output_types": [], "outputs": [ { - "cache": true, + "cache": false, "display_name": "Output", "method": "build_output", "name": "output", @@ -672,7 +672,7 @@ "output_types": [], "outputs": [ { - "cache": true, + "cache": false, "display_name": "Agent", "method": "build_output", "name": "output", @@ -927,7 +927,7 @@ "output_types": [], "outputs": [ { - "cache": true, + "cache": false, "display_name": "Task", "method": "build_task", "name": "task_output", @@ -1118,7 +1118,7 @@ "output_types": [], "outputs": [ { - "cache": true, + "cache": false, "display_name": "Task", "method": "build_task", "name": "task_output", @@ -1309,7 +1309,7 @@ "output_types": [], "outputs": [ { - "cache": true, + "cache": false, "display_name": "Task", "method": "build_task", "name": "task_output", @@ -1504,7 +1504,7 @@ "output_types": [], "outputs": [ { - "cache": true, + "cache": false, "display_name": "Agent", "method": "build_output", "name": "output", @@ -1763,7 +1763,7 @@ "output_types": [], "outputs": [ { - "cache": true, + "cache": false, "display_name": "Agent", "method": "build_output", "name": "output", @@ -2023,7 +2023,7 @@ "output_types": [], "outputs": [ { - "cache": true, + "cache": false, "display_name": "Text", "method": "text_response", "name": "text_output", @@ -2034,7 +2034,7 @@ "value": "__UNDEFINED__" }, { - "cache": true, + "cache": false, "display_name": "Language Model", "method": "build_model", "name": "model_output", @@ -2313,7 +2313,7 @@ "output_types": [], "outputs": [ { - "cache": true, + "cache": false, "display_name": "Message", "method": "message_response", "name": "message", @@ -2593,7 +2593,7 @@ "output_types": [], "outputs": [ { - "cache": true, + "cache": false, "display_name": "Prompt Message", "method": "build_prompt", "name": "prompt", @@ -2712,7 +2712,7 @@ "output_types": [], "outputs": [ { - "cache": true, + "cache": false, "display_name": "Prompt Message", "method": "build_prompt", "name": "prompt", @@ -2831,7 +2831,7 @@ "output_types": [], "outputs": [ { - "cache": true, + "cache": false, "display_name": "Prompt Message", "method": "build_prompt", "name": "prompt", @@ -2947,7 +2947,7 @@ "output_types": [], "outputs": [ { - "cache": true, + "cache": false, "display_name": "Data", "method": "run_model", "name": "api_run_model", @@ -2959,7 +2959,7 @@ "value": "__UNDEFINED__" }, { - "cache": true, + "cache": false, "display_name": "Tool", "method": "build_tool", "name": "api_build_tool", diff --git a/src/backend/base/langflow/initial_setup/starter_projects/Vector Store RAG.json b/src/backend/base/langflow/initial_setup/starter_projects/Vector Store RAG.json index 6378998e19c7..6328858c74a7 100644 --- a/src/backend/base/langflow/initial_setup/starter_projects/Vector Store RAG.json +++ b/src/backend/base/langflow/initial_setup/starter_projects/Vector Store RAG.json @@ -296,7 +296,7 @@ "output_types": [], "outputs": [ { - "cache": true, + "cache": false, "display_name": "Message", "method": "message_response", "name": "message", @@ -528,7 +528,7 @@ "output_types": [], "outputs": [ { - "cache": true, + "cache": false, "display_name": "Retriever", "method": "build_base_retriever", "name": "base_retriever", @@ -539,7 +539,7 @@ "value": "__UNDEFINED__" }, { - "cache": true, + "cache": false, "display_name": "Search Results", "method": "search_documents", "name": "search_results", @@ -550,7 +550,7 @@ "value": "__UNDEFINED__" }, { - "cache": true, + "cache": false, "display_name": "Vector Store", "method": "cast_vector_store", "name": "vector_store", @@ -979,7 +979,7 @@ "output_types": [], "outputs": [ { - "cache": true, + "cache": false, "display_name": "Text", "method": "parse_data", "name": "text", @@ -1115,7 +1115,7 @@ "output_types": [], "outputs": [ { - "cache": true, + "cache": false, "display_name": "Prompt Message", "method": "build_prompt", "name": "prompt", @@ -1259,7 +1259,7 @@ "output_types": [], "outputs": [ { - "cache": true, + "cache": false, "display_name": "Message", "method": "message_response", "name": "message", @@ -1450,7 +1450,7 @@ "output_types": [], "outputs": [ { - "cache": true, + "cache": false, "display_name": "Chunks", "method": "split_text", "name": "chunks", @@ -1594,7 +1594,7 @@ "output_types": [], "outputs": [ { - "cache": true, + "cache": false, "display_name": "Data", "method": "load_file", "name": "data", @@ -1742,7 +1742,7 @@ "output_types": [], "outputs": [ { - "cache": true, + "cache": false, "display_name": "Retriever", "method": "build_base_retriever", "name": "base_retriever", @@ -1753,7 +1753,7 @@ "value": "__UNDEFINED__" }, { - "cache": true, + "cache": false, "display_name": "Search Results", "method": "search_documents", "name": "search_results", @@ -1764,7 +1764,7 @@ "value": "__UNDEFINED__" }, { - "cache": true, + "cache": false, "display_name": "Vector Store", "method": "cast_vector_store", "name": "vector_store", @@ -2211,7 +2211,7 @@ "output_types": [], "outputs": [ { - "cache": true, + "cache": false, "display_name": "Embeddings", "method": "build_embeddings", "name": "embeddings", @@ -2664,7 +2664,7 @@ "output_types": [], "outputs": [ { - "cache": true, + "cache": false, "display_name": "Embeddings", "method": "build_embeddings", "name": "embeddings", @@ -3109,7 +3109,7 @@ "output_types": [], "outputs": [ { - "cache": true, + "cache": false, "display_name": "Text", "method": "text_response", "name": "text_output", @@ -3120,7 +3120,7 @@ "value": "__UNDEFINED__" }, { - "cache": true, + "cache": false, "display_name": "Language Model", "method": "build_model", "name": "model_output", diff --git a/src/backend/base/langflow/template/field/base.py b/src/backend/base/langflow/template/field/base.py index 4a09c656befa..493a9e857b38 100644 --- a/src/backend/base/langflow/template/field/base.py +++ b/src/backend/base/langflow/template/field/base.py @@ -178,7 +178,7 @@ class Output(BaseModel): value: Optional[Any] = Field(default=UNDEFINED) - cache: bool = Field(default=True) + cache: bool = Field(default=False) def to_dict(self): return self.model_dump(by_alias=True, exclude_none=True) diff --git a/src/backend/base/poetry.lock b/src/backend/base/poetry.lock index 7b04d0b43198..8ec8168d98f8 100644 --- a/src/backend/base/poetry.lock +++ b/src/backend/base/poetry.lock @@ -5406,6 +5406,20 @@ pytest = ">=4.6" [package.extras] testing = ["fields", "hunter", "process-tests", "pytest-xdist", "virtualenv"] +[[package]] +name = "pytest-flakefinder" +version = "1.1.0" +description = "Runs tests multiple times to expose flakiness." +optional = false +python-versions = ">=3.5" +files = [ + {file = "pytest-flakefinder-1.1.0.tar.gz", hash = "sha256:e2412a1920bdb8e7908783b20b3d57e9dad590cc39a93e8596ffdd493b403e0e"}, + {file = "pytest_flakefinder-1.1.0-py2.py3-none-any.whl", hash = "sha256:741e0e8eea427052f5b8c89c2b3c3019a50c39a59ce4df6a305a2c2d9ba2bd13"}, +] + +[package.dependencies] +pytest = ">=2.7.1" + [[package]] name = "pytest-instafail" version = "0.5.0" @@ -7544,4 +7558,4 @@ local = [] [metadata] lock-version = "2.0" python-versions = ">=3.10,<3.13" -content-hash = "fe6710d7325bc2cceeaa298d94d6f1157cfe1533c2acbabe3ecdca5594d9e007" +content-hash = "12d207ab2531a201c153cfc0ec800ae9b43c47dd34506f19f7ed97e27c032149" diff --git a/src/backend/base/pyproject.toml b/src/backend/base/pyproject.toml index a8de78d9e01f..b5278c375b26 100644 --- a/src/backend/base/pyproject.toml +++ b/src/backend/base/pyproject.toml @@ -119,6 +119,7 @@ vulture = "^2.11" dictdiffer = "^0.9.0" pytest-split = "^0.9.0" devtools = "^0.12.2" +pytest-flakefinder = "^1.1.0" [tool.pytest.ini_options] diff --git a/src/backend/tests/test_schema.py b/src/backend/tests/test_schema.py index 1a2b69c347b0..71a9e5c5bfaa 100644 --- a/src/backend/tests/test_schema.py +++ b/src/backend/tests/test_schema.py @@ -69,7 +69,7 @@ def test_output_default(self): output_obj = Output(name="test_output") assert output_obj.name == "test_output" assert output_obj.value == UNDEFINED - assert output_obj.cache is True + assert output_obj.cache is False def test_output_add_types(self): output_obj = Output(name="test_output") @@ -87,7 +87,7 @@ def test_output_to_dict(self): "types": [], "name": "test_output", "display_name": "test_output", - "cache": True, + "cache": False, "value": "__UNDEFINED__", } diff --git a/src/backend/tests/unit/graph/graph/test_cycles.py b/src/backend/tests/unit/graph/graph/test_cycles.py new file mode 100644 index 000000000000..1d5d61616709 --- /dev/null +++ b/src/backend/tests/unit/graph/graph/test_cycles.py @@ -0,0 +1,111 @@ +import pytest + +from langflow.components.inputs.ChatInput import ChatInput +from langflow.components.outputs.ChatOutput import ChatOutput +from langflow.components.outputs.TextOutput import TextOutputComponent +from langflow.components.prototypes.ConditionalRouter import ConditionalRouterComponent +from langflow.custom.custom_component.component import Component +from langflow.graph.graph.base import Graph +from langflow.io import MessageTextInput, Output +from langflow.schema.message import Message + + +@pytest.fixture +def client(): + pass + + +class Concatenate(Component): + display_name = "Concatenate" + description = "Concatenates two strings" + + inputs = [ + MessageTextInput(name="text", display_name="Text", required=True), + ] + outputs = [ + Output(display_name="Text", name="some_text", method="concatenate"), + ] + + def concatenate(self) -> Message: + return Message(text=f"{self.text}{self.text}" or "test") + + +def test_cycle_in_graph(): + chat_input = ChatInput(_id="chat_input") + router = ConditionalRouterComponent(_id="router") + chat_input.set(input_value=router.false_response) + concat_component = Concatenate(_id="concatenate") + concat_component.set(text=chat_input.message_response) + router.set( + input_text=chat_input.message_response, + match_text="testtesttesttest", + operator="equals", + message=concat_component.concatenate, + ) + text_output = TextOutputComponent(_id="text_output") + text_output.set(input_value=router.true_response) + chat_output = ChatOutput(_id="chat_output") + chat_output.set(input_value=text_output.text_response) + + graph = Graph(chat_input, chat_output) + assert graph.is_cyclic is True + + # Run queue should contain chat_input and not router + assert "chat_input" in graph._run_queue + assert "router" not in graph._run_queue + results = [] + max_iterations = 20 + snapshots = [graph._snapshot()] + for result in graph.start(max_iterations=max_iterations): + snapshots.append(graph._snapshot()) + results.append(result) + results_ids = [result.vertex.id for result in results if hasattr(result, "vertex")] + assert results_ids[-2:] == ["text_output", "chat_output"] + assert len(results_ids) > len(graph.vertices), snapshots + # Check that chat_output and text_output are the last vertices in the results + assert results_ids == [ + "chat_input", + "concatenate", + "router", + "chat_input", + "concatenate", + "router", + "chat_input", + "concatenate", + "router", + "chat_input", + "concatenate", + "router", + "text_output", + "chat_output", + ], f"Results: {results_ids}" + + +def test_cycle_in_graph_max_iterations(): + chat_input = ChatInput(_id="chat_input") + router = ConditionalRouterComponent(_id="router") + chat_input.set(input_value=router.false_response) + concat_component = Concatenate(_id="concatenate") + concat_component.set(text=chat_input.message_response) + router.set( + input_text=chat_input.message_response, + match_text="testtesttesttest", + operator="equals", + message=concat_component.concatenate, + ) + text_output = TextOutputComponent(_id="text_output") + text_output.set(input_value=router.true_response) + chat_output = ChatOutput(_id="chat_output") + chat_output.set(input_value=text_output.text_response) + + graph = Graph(chat_input, chat_output) + assert graph.is_cyclic is True + + # Run queue should contain chat_input and not router + assert "chat_input" in graph._run_queue + assert "router" not in graph._run_queue + results = [] + + with pytest.raises(ValueError, match="Max iterations reached"): + for result in graph.start(max_iterations=1): + results.append(result) diff --git a/src/backend/tests/unit/graph/graph/test_utils.py b/src/backend/tests/unit/graph/graph/test_utils.py index 53ccc0e40efb..5f211604c7e2 100644 --- a/src/backend/tests/unit/graph/graph/test_utils.py +++ b/src/backend/tests/unit/graph/graph/test_utils.py @@ -3,6 +3,11 @@ from langflow.graph.graph import utils +@pytest.fixture +def client(): + pass + + @pytest.fixture def graph(): return { @@ -120,3 +125,181 @@ def test_sort_up_to_vertex_invalid_vertex(graph): with pytest.raises(ValueError): utils.sort_up_to_vertex(graph, vertex_id) + + +def test_has_cycle(): + edges = [("A", "B"), ("B", "C"), ("C", "D"), ("D", "E"), ("E", "B")] + vertices = ["A", "B", "C", "D", "E"] + assert utils.has_cycle(vertices, edges) is True + + +class TestFindCycleEdge: + # Detects a cycle in a simple directed graph + def test_detects_cycle_in_simple_graph(self): + entry_point = "A" + edges = [("A", "B"), ("B", "C"), ("C", "A")] + result = utils.find_cycle_edge(entry_point, edges) + assert result == ("C", "A") + + # Returns None when no cycle is present + def test_returns_none_when_no_cycle(self): + entry_point = "A" + edges = [("A", "B"), ("B", "C")] + result = utils.find_cycle_edge(entry_point, edges) + assert result is None + + # Correctly identifies the first cycle encountered + def test_identifies_first_cycle(self): + entry_point = "A" + edges = [("A", "B"), ("B", "C"), ("C", "A"), ("A", "D"), ("D", "E"), ("E", "A")] + result = utils.find_cycle_edge(entry_point, edges) + assert result == ("C", "A") + + # Handles graphs with multiple edges between the same nodes + def test_multiple_edges_between_same_nodes(self): + entry_point = "A" + edges = [("A", "B"), ("A", "B"), ("B", "C"), ("C", "A")] + result = utils.find_cycle_edge(entry_point, edges) + assert result == ("C", "A") + + # Processes graphs with multiple disconnected components + def test_disconnected_components(self): + entry_point = "A" + edges = [("A", "B"), ("B", "C"), ("D", "E"), ("E", "F"), ("F", "D")] + result = utils.find_cycle_edge(entry_point, edges) + assert result is None + + # Handles an empty list of edges + def test_empty_edges_list(self): + entry_point = "A" + edges = [] + result = utils.find_cycle_edge(entry_point, edges) + assert result is None + + # Manages a graph with a single node and no edges + def test_single_node_no_edges(self): + entry_point = "A" + edges = [] + result = utils.find_cycle_edge(entry_point, edges) + assert result is None + + # Detects cycles in graphs with self-loops + def test_self_loop_cycle(self): + entry_point = "A" + edges = [("A", "A")] + result = utils.find_cycle_edge(entry_point, edges) + assert result == ("A", "A") + + # Handles graphs with multiple cycles + def test_multiple_cycles(self): + entry_point = "A" + edges = [("A", "B"), ("B", "C"), ("C", "A"), ("B", "D"), ("D", "B")] + result = utils.find_cycle_edge(entry_point, edges) + assert result == ("C", "A") + + # Processes graphs with nodes having no outgoing edges + def test_nodes_with_no_outgoing_edges(self): + entry_point = "A" + edges = [("A", "B"), ("B", "C")] + result = utils.find_cycle_edge(entry_point, edges) + assert result is None + + # Handles large graphs efficiently + def test_large_graph_efficiency(self): + entry_point = "0" + edges = [(str(i), str(i + 1)) for i in range(1000)] + [("999", "0")] + result = utils.find_cycle_edge(entry_point, edges) + assert result == ("999", "0") + + # Manages graphs with duplicate edges + def test_duplicate_edges(self): + entry_point = "A" + edges = [("A", "B"), ("B", "C"), ("C", "A"), ("C", "A")] + result = utils.find_cycle_edge(entry_point, edges) + assert result == ("C", "A") + + +class TestFindAllCycleEdges: + # Detects cycles in a simple directed graph + def test_detects_cycles_in_simple_graph(self): + entry_point = "A" + edges = [("A", "B"), ("B", "C"), ("C", "A")] + result = utils.find_all_cycle_edges(entry_point, edges) + assert result == [("C", "A")] + + # Identifies multiple cycles in a complex graph + def test_identifies_multiple_cycles(self): + entry_point = "A" + edges = [("A", "B"), ("B", "C"), ("C", "A"), ("B", "D"), ("D", "B")] + result = utils.find_all_cycle_edges(entry_point, edges) + assert set(result) == {("C", "A"), ("D", "B")} + + # Returns an empty list when no cycles are present + def test_no_cycles_present(self): + entry_point = "A" + edges = [("A", "B"), ("B", "C")] + result = utils.find_all_cycle_edges(entry_point, edges) + assert result == [] + + # Handles graphs with a single node and no edges + def test_single_node_no_edges(self): + entry_point = "A" + edges = [] + result = utils.find_all_cycle_edges(entry_point, edges) + assert result == [] + + # Processes graphs with disconnected components + def test_disconnected_components(self): + entry_point = "A" + edges = [("A", "B"), ("C", "D")] + result = utils.find_all_cycle_edges(entry_point, edges) + assert result == [] + + # Handles graphs with self-loops + def test_self_loops(self): + entry_point = "A" + edges = [("A", "A")] + result = utils.find_all_cycle_edges(entry_point, edges) + assert result == [("A", "A")] + + # Manages graphs with multiple edges between the same nodes + def test_multiple_edges_between_same_nodes(self): + entry_point = "A" + edges = [("A", "B"), ("A", "B"), ("B", "C"), ("C", "A")] + result = utils.find_all_cycle_edges(entry_point, edges) + assert result == [("C", "A")] + + # Processes graphs with nodes having no outgoing edges + def test_nodes_with_no_outgoing_edges(self): + entry_point = "A" + edges = [("A", "B"), ("B", "C")] + result = utils.find_all_cycle_edges(entry_point, edges) + assert result == [] + + # Handles large graphs efficiently + def test_large_graphs_efficiency(self): + entry_point = "A" + edges = [(chr(65 + i), chr(65 + (i + 1) % 26)) for i in range(1000)] + result = utils.find_all_cycle_edges(entry_point, edges) + assert isinstance(result, list) + + # Manages graphs with nodes having no incoming edges + def test_nodes_with_no_incoming_edges(self): + entry_point = "A" + edges = [("B", "C"), ("C", "D")] + result = utils.find_all_cycle_edges(entry_point, edges) + assert result == [] + + # Handles graphs with mixed data types in edges + def test_mixed_data_types_in_edges(self): + entry_point = 1 + edges = [(1, 2), (2, 3), (3, 1)] + result = utils.find_all_cycle_edges(entry_point, edges) + assert result == [(3, 1)] + + # Processes graphs with duplicate edges + def test_duplicate_edges(self): + entry_point = "A" + edges = [("A", "B"), ("A", "B"), ("B", "C"), ("C", "A"), ("C", "A")] + result = utils.find_all_cycle_edges(entry_point, edges) + assert set(result) == {("C", "A")} diff --git a/src/backend/tests/unit/initial_setup/starter_projects/test_memory_chatbot.py b/src/backend/tests/unit/initial_setup/starter_projects/test_memory_chatbot.py index a5fe1b700314..5ccfa421bb50 100644 --- a/src/backend/tests/unit/initial_setup/starter_projects/test_memory_chatbot.py +++ b/src/backend/tests/unit/initial_setup/starter_projects/test_memory_chatbot.py @@ -30,23 +30,35 @@ def memory_chatbot_graph(): openai_component.set( input_value=prompt_component.build_prompt, max_tokens=100, temperature=0.1, api_key="test_api_key" ) - openai_component.get_output("text_output").value = "Mock response" + openai_component.set_on_output(name="text_output", value="Mock response", cache=True) chat_output = ChatOutput(_id="chat_output") chat_output.set(input_value=openai_component.text_response) graph = Graph(chat_input, chat_output) + assert graph.in_degree_map == {"chat_output": 1, "prompt": 2, "openai": 1, "chat_input": 0, "chat_memory": 0} return graph def test_memory_chatbot(memory_chatbot_graph): # Now we run step by step expected_order = deque(["chat_input", "chat_memory", "prompt", "openai", "chat_output"]) + assert memory_chatbot_graph.in_degree_map == { + "chat_output": 1, + "prompt": 2, + "openai": 1, + "chat_input": 0, + "chat_memory": 0, + } + assert memory_chatbot_graph.vertices_layers == [["prompt"], ["openai"], ["chat_output"]] + assert memory_chatbot_graph.first_layer == ["chat_input", "chat_memory"] + for step in expected_order: result = memory_chatbot_graph.step() if isinstance(result, Finish): break - assert step == result.vertex.id + + assert step == result.vertex.id, (memory_chatbot_graph.in_degree_map, memory_chatbot_graph.vertices_layers) def test_memory_chatbot_dump_structure(memory_chatbot_graph: Graph): diff --git a/src/backend/tests/unit/initial_setup/starter_projects/test_vector_store_rag.py b/src/backend/tests/unit/initial_setup/starter_projects/test_vector_store_rag.py index e216c0c567cc..2066fb07d22f 100644 --- a/src/backend/tests/unit/initial_setup/starter_projects/test_vector_store_rag.py +++ b/src/backend/tests/unit/initial_setup/starter_projects/test_vector_store_rag.py @@ -27,7 +27,7 @@ def ingestion_graph(): # Ingestion Graph file_component = FileComponent(_id="file-123") file_component.set(path="test.txt") - file_component.set_output_value("data", Data(text="This is a test file.")) + file_component.set_on_output(name="data", value=Data(text="This is a test file."), cache=True) text_splitter = SplitTextComponent(_id="text-splitter-123") text_splitter.set(data_inputs=file_component.load_file) openai_embeddings = OpenAIEmbeddingsComponent(_id="openai-embeddings-123") @@ -41,9 +41,9 @@ def ingestion_graph(): api_endpoint="https://astra.example.com", token="token", ) - vector_store.set_output_value("vector_store", "mock_vector_store") - vector_store.set_output_value("base_retriever", "mock_retriever") - vector_store.set_output_value("search_results", [Data(text="This is a test file.")]) + vector_store.set_on_output(name="vector_store", value="mock_vector_store", cache=True) + vector_store.set_on_output(name="base_retriever", value="mock_retriever", cache=True) + vector_store.set_on_output(name="search_results", value=[Data(text="This is a test file.")], cache=True) ingestion_graph = Graph(file_component, vector_store) return ingestion_graph @@ -63,10 +63,16 @@ def rag_graph(): embedding=openai_embeddings.build_embeddings, ) # Mock search_documents - rag_vector_store.get_output("search_results").value = [ - Data(data={"text": "Hello, world!"}), - Data(data={"text": "Goodbye, world!"}), - ] + rag_vector_store.set_on_output( + name="search_results", + value=[ + Data(data={"text": "Hello, world!"}), + Data(data={"text": "Goodbye, world!"}), + ], + cache=True, + ) + rag_vector_store.set_on_output(name="vector_store", value="mock_vector_store", cache=True) + rag_vector_store.set_on_output(name="base_retriever", value="mock_retriever", cache=True) parse_data = ParseDataComponent(_id="parse-data-123") parse_data.set(data=rag_vector_store.search_documents) prompt_component = PromptComponent(_id="prompt-123") @@ -82,7 +88,7 @@ def rag_graph(): openai_component = OpenAIModelComponent(_id="openai-123") openai_component.set(api_key="sk-123", openai_api_base="https://api.openai.com/v1") - openai_component.set_output_value("text_output", "Hello, world!") + openai_component.set_on_output(name="text_output", value="Hello, world!", cache=True) openai_component.set(input_value=prompt_component.build_prompt) chat_output = ChatOutput(_id="chatoutput-123")