Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Improve data handling and text formatting (loop fix) #6771

Merged
merged 14 commits into from
Feb 24, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/backend/base/langflow/components/logic/loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ def evaluate_stop_loop(self) -> bool:
"""Evaluate whether to stop item or done output."""
current_index = self.ctx.get(f"{self._id}_index", 0)
data_length = len(self.ctx.get(f"{self._id}_data", []))
return current_index > max(data_length - 1, 0)
return current_index > data_length

def item_output(self) -> Data:
"""Output the next item in the list or stop if done."""
Expand Down
87 changes: 64 additions & 23 deletions src/backend/base/langflow/helpers/data.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from collections import defaultdict

from langchain_core.documents import Document

from langflow.schema import Data
Expand All @@ -17,18 +19,42 @@ def docs_to_data(documents: list[Document]) -> list[Data]:


def data_to_text_list(template: str, data: Data | list[Data]) -> tuple[list[str], list[Data]]:
r"""Formats `text` within Data objects based on a given template.
"""Format text from Data objects using a template string.

This function processes Data objects and formats their content using a template string.
It handles various data structures and ensures consistent text formatting across different
input types.

Converts a Data object or a list of Data objects into a tuple containing a list of formatted strings
and a list of Data objects based on a given template.
Key Features:
- Supports single Data object or list of Data objects
- Handles nested dictionaries and extracts text from various locations
- Uses safe string formatting with fallback for missing keys
- Preserves original Data objects in output

Args:
template (str): The format string template to be used for formatting the data.
data (Data | list[Data]): A single Data object or a list of Data objects to be formatted.
template: Format string with placeholders (e.g., "Hello {text}")
Placeholders are replaced with values from Data objects
data: Either a single Data object or a list of Data objects to format
Each object can contain text, dictionaries, or nested data

Returns:
tuple[list[str], list[Data]]: A tuple containing a list of formatted strings based on the
provided template and data, and a list of Data objects.
A tuple containing:
- List[str]: Formatted strings based on the template
- List[Data]: Original Data objects in the same order

Raises:
ValueError: If template is None
TypeError: If template is not a string

Examples:
>>> result = data_to_text_list("Hello {text}", Data(text="world"))
>>> assert result == (["Hello world"], [Data(text="world")])

>>> result = data_to_text_list(
... "{name} is {age}",
... Data(data={"name": "Alice", "age": 25})
... )
>>> assert result == (["Alice is 25"], [Data(data={"name": "Alice", "age": 25})])
"""
if data is None:
return [], []
Expand All @@ -41,22 +67,37 @@ def data_to_text_list(template: str, data: Data | list[Data]) -> tuple[list[str]
msg = f"Template must be a string, but got {type(template)}"
raise TypeError(msg)

if isinstance(data, (Data)):
data = [data]
# Check if there are any format strings in the template
data_ = [
# If it is not a record, create one with the key "text"
Data(text=value) if not isinstance(value, Data) else value
for value in data
]
formatted_text = []
for value in data_:
# Prevent conflict with 'data' keyword in template formatting
kwargs = value.data.copy()
data = kwargs.pop("data", value.data)
formatted_text.append(template.format(data=data, **kwargs))

return formatted_text, data_
formatted_text: list[str] = []
processed_data: list[Data] = []

data_list = [data] if isinstance(data, Data) else data

data_objects = [item if isinstance(item, Data) else Data(text=str(item)) for item in data_list]

for data_obj in data_objects:
format_dict = {}

if isinstance(data_obj.data, dict):
format_dict.update(data_obj.data)

if isinstance(data_obj.data.get("data"), dict):
format_dict.update(data_obj.data["data"])

elif "error" in format_dict:
format_dict["text"] = format_dict["error"]

format_dict["data"] = data_obj.data

safe_dict = defaultdict(str, format_dict)

try:
formatted_text.append(template.format_map(safe_dict))
processed_data.append(data_obj)
except ValueError as e:
msg = f"Error formatting template: {e!s}"
raise ValueError(msg) from e

return formatted_text, processed_data


def data_to_text(template: str, data: Data | list[Data], sep: str = "\n") -> str:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -443,7 +443,7 @@
"show": true,
"title_case": false,
"type": "code",
"value": "from langflow.custom import Component\nfrom langflow.io import DataInput, Output\nfrom langflow.schema import Data\n\n\nclass LoopComponent(Component):\n display_name = \"Loop\"\n description = (\n \"Iterates over a list of Data objects, outputting one item at a time and aggregating results from loop inputs.\"\n )\n icon = \"infinity\"\n\n inputs = [\n DataInput(\n name=\"data\",\n display_name=\"Data\",\n info=\"The initial list of Data objects to iterate over.\",\n ),\n ]\n\n outputs = [\n Output(display_name=\"Item\", name=\"item\", method=\"item_output\", allows_loop=True),\n Output(display_name=\"Done\", name=\"done\", method=\"done_output\"),\n ]\n\n def initialize_data(self) -> None:\n \"\"\"Initialize the data list, context index, and aggregated list.\"\"\"\n if self.ctx.get(f\"{self._id}_initialized\", False):\n return\n\n # Ensure data is a list of Data objects\n data_list = self._validate_data(self.data)\n\n # Store the initial data and context variables\n self.update_ctx(\n {\n f\"{self._id}_data\": data_list,\n f\"{self._id}_index\": 0,\n f\"{self._id}_aggregated\": [],\n f\"{self._id}_initialized\": True,\n }\n )\n\n def _validate_data(self, data):\n \"\"\"Validate and return a list of Data objects.\"\"\"\n if isinstance(data, Data):\n return [data]\n if isinstance(data, list) and all(isinstance(item, Data) for item in data):\n return data\n msg = \"The 'data' input must be a list of Data objects or a single Data object.\"\n raise TypeError(msg)\n\n def evaluate_stop_loop(self) -> bool:\n \"\"\"Evaluate whether to stop item or done output.\"\"\"\n current_index = self.ctx.get(f\"{self._id}_index\", 0)\n data_length = len(self.ctx.get(f\"{self._id}_data\", []))\n return current_index > max(data_length - 1, 0)\n\n def item_output(self) -> Data:\n \"\"\"Output the next item in the list or stop if done.\"\"\"\n self.initialize_data()\n current_item = Data(text=\"\")\n\n if self.evaluate_stop_loop():\n self.stop(\"item\")\n return 
Data(text=\"\")\n\n # Get data list and current index\n data_list, current_index = self.loop_variables()\n if current_index < len(data_list):\n # Output current item and increment index\n try:\n current_item = data_list[current_index]\n except IndexError:\n current_item = Data(text=\"\")\n self.aggregated_output()\n self.update_ctx({f\"{self._id}_index\": current_index + 1})\n return current_item\n\n def done_output(self) -> Data:\n \"\"\"Trigger the done output when iteration is complete.\"\"\"\n self.initialize_data()\n\n if self.evaluate_stop_loop():\n self.stop(\"item\")\n self.start(\"done\")\n\n return self.ctx.get(f\"{self._id}_aggregated\", [])\n self.stop(\"done\")\n return Data(text=\"\")\n\n def loop_variables(self):\n \"\"\"Retrieve loop variables from context.\"\"\"\n return (\n self.ctx.get(f\"{self._id}_data\", []),\n self.ctx.get(f\"{self._id}_index\", 0),\n )\n\n def aggregated_output(self) -> Data:\n \"\"\"Return the aggregated list once all items are processed.\"\"\"\n self.initialize_data()\n\n # Get data list and aggregated list\n data_list = self.ctx.get(f\"{self._id}_data\", [])\n aggregated = self.ctx.get(f\"{self._id}_aggregated\", [])\n\n # Check if loop input is provided and append to aggregated list\n if self.item is not None and not isinstance(self.item, str) and len(aggregated) <= len(data_list):\n aggregated.append(self.item)\n self.update_ctx({f\"{self._id}_aggregated\": aggregated})\n return aggregated\n"
"value": "from langflow.custom import Component\nfrom langflow.io import DataInput, Output\nfrom langflow.schema import Data\n\n\nclass LoopComponent(Component):\n display_name = \"Loop\"\n description = (\n \"Iterates over a list of Data objects, outputting one item at a time and aggregating results from loop inputs.\"\n )\n icon = \"infinity\"\n\n inputs = [\n DataInput(\n name=\"data\",\n display_name=\"Data\",\n info=\"The initial list of Data objects to iterate over.\",\n ),\n ]\n\n outputs = [\n Output(display_name=\"Item\", name=\"item\", method=\"item_output\", allows_loop=True),\n Output(display_name=\"Done\", name=\"done\", method=\"done_output\"),\n ]\n\n def initialize_data(self) -> None:\n \"\"\"Initialize the data list, context index, and aggregated list.\"\"\"\n if self.ctx.get(f\"{self._id}_initialized\", False):\n return\n\n # Ensure data is a list of Data objects\n data_list = self._validate_data(self.data)\n\n # Store the initial data and context variables\n self.update_ctx(\n {\n f\"{self._id}_data\": data_list,\n f\"{self._id}_index\": 0,\n f\"{self._id}_aggregated\": [],\n f\"{self._id}_initialized\": True,\n }\n )\n\n def _validate_data(self, data):\n \"\"\"Validate and return a list of Data objects.\"\"\"\n if isinstance(data, Data):\n return [data]\n if isinstance(data, list) and all(isinstance(item, Data) for item in data):\n return data\n msg = \"The 'data' input must be a list of Data objects or a single Data object.\"\n raise TypeError(msg)\n\n def evaluate_stop_loop(self) -> bool:\n \"\"\"Evaluate whether to stop item or done output.\"\"\"\n current_index = self.ctx.get(f\"{self._id}_index\", 0)\n data_length = len(self.ctx.get(f\"{self._id}_data\", []))\n return current_index > data_length\n\n def item_output(self) -> Data:\n \"\"\"Output the next item in the list or stop if done.\"\"\"\n self.initialize_data()\n current_item = Data(text=\"\")\n\n if self.evaluate_stop_loop():\n self.stop(\"item\")\n return Data(text=\"\")\n\n # Get 
data list and current index\n data_list, current_index = self.loop_variables()\n if current_index < len(data_list):\n # Output current item and increment index\n try:\n current_item = data_list[current_index]\n except IndexError:\n current_item = Data(text=\"\")\n self.aggregated_output()\n self.update_ctx({f\"{self._id}_index\": current_index + 1})\n return current_item\n\n def done_output(self) -> Data:\n \"\"\"Trigger the done output when iteration is complete.\"\"\"\n self.initialize_data()\n\n if self.evaluate_stop_loop():\n self.stop(\"item\")\n self.start(\"done\")\n\n return self.ctx.get(f\"{self._id}_aggregated\", [])\n self.stop(\"done\")\n return Data(text=\"\")\n\n def loop_variables(self):\n \"\"\"Retrieve loop variables from context.\"\"\"\n return (\n self.ctx.get(f\"{self._id}_data\", []),\n self.ctx.get(f\"{self._id}_index\", 0),\n )\n\n def aggregated_output(self) -> Data:\n \"\"\"Return the aggregated list once all items are processed.\"\"\"\n self.initialize_data()\n\n # Get data list and aggregated list\n data_list = self.ctx.get(f\"{self._id}_data\", [])\n aggregated = self.ctx.get(f\"{self._id}_aggregated\", [])\n\n # Check if loop input is provided and append to aggregated list\n if self.item is not None and not isinstance(self.item, str) and len(aggregated) <= len(data_list):\n aggregated.append(self.item)\n self.update_ctx({f\"{self._id}_aggregated\": aggregated})\n return aggregated\n"
},
"data": {
"_input_type": "DataInput",
Expand Down
10 changes: 6 additions & 4 deletions src/backend/tests/unit/helpers/test_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,16 +88,18 @@ def test_data_to_text_list__template_wrong_placeholder():
template = "My favorite color is {color}"
data = Data(data={"fruit": "apple"})

with pytest.raises(KeyError):
data_to_text_list(template, data)
# Should not raise KeyError due to defaultdict behavior
result = data_to_text_list(template, data)
assert result == (["My favorite color is "], [data])


def test_data_to_text_list__data_with_data_attribute_empty():
template = "My favorite color is {color}"
data = Data(data={})

with pytest.raises(KeyError):
data_to_text_list(template, data)
# Should not raise KeyError due to defaultdict behavior
result = data_to_text_list(template, data)
assert result == (["My favorite color is "], [data])


def test_data_to_text_list__data_contains_nested_data_key():
Expand Down
134 changes: 134 additions & 0 deletions src/backend/tests/unit/helpers/test_data_to_text_list.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
import pytest
from langflow.helpers.data import data_to_text_list
from langflow.schema import Data


@pytest.mark.parametrize(
    ("template", "data", "expected_text"),
    [
        # Single Data object built from plain text
        (
            "Text: {text}",
            Data(text="Hello"),
            ["Text: Hello"],
        ),
        # Single Data object wrapping a dictionary
        (
            "{name} is {age} years old",
            Data(data={"name": "Alice", "age": 25}),
            ["Alice is 25 years old"],
        ),
        # List of Data objects: one formatted string per element, in order
        (
            "{name} is {age} years old",
            [
                Data(data={"name": "Alice", "age": 25}),
                Data(data={"name": "Bob", "age": 30}),
            ],
            ["Alice is 25 years old", "Bob is 30 years old"],
        ),
        # Keys of a nested "data" dictionary are reachable from the template
        (
            "User: {text}",
            Data(data={"data": {"text": "Hello World"}}),
            ["User: Hello World"],
        ),
        # An "error" entry is exposed through the {text} placeholder
        (
            "Error: {text}",
            Data(data={"error": "Something went wrong"}),
            ["Error: Something went wrong"],
        ),
        # Non-Data object conversion: the bare string must sit inside a list,
        # because a top-level str would be iterated character by character.
        (
            "Value: {text}",
            ["Simple string"],
            ["Value: Simple string"],
        ),
    ],
)
def test_data_to_text_list_parametrized(template, data, expected_text):
    """Check formatted output and returned types for a range of input shapes.

    Args:
        template: Format string handed to ``data_to_text_list``.
        data: A single ``Data`` object or a list of items to format.
        expected_text: The list of formatted strings expected back.
    """
    result = data_to_text_list(template, data)
    assert result[0] == expected_text
    # Whatever the input item type, the second element must hold Data objects.
    assert all(isinstance(d, Data) for d in result[1])


def test_data_to_text_list_none_data():
    """A ``None`` data argument yields a pair of empty lists."""
    assert data_to_text_list("template", None) == ([], [])


def test_data_to_text_list_none_template():
    """A ``None`` template is rejected with a ``ValueError``."""
    expected_message = "Template must be a string, but got None"
    with pytest.raises(ValueError, match=expected_message):
        data_to_text_list(None, Data(text="test"))


def test_data_to_text_list_invalid_template_type():
    """A non-string template (here an int) is rejected with a ``TypeError``."""
    expected_message = "Template must be a string, but got"
    with pytest.raises(TypeError, match=expected_message):
        data_to_text_list(123, Data(text="test"))


def test_data_to_text_list_missing_key():
    """A placeholder with no matching key renders as an empty string."""
    record = Data(data={"existing_key": "value"})
    # No KeyError is expected here: the missing key is filled with "".
    formatted = data_to_text_list("Hello {missing_key}", record)
    assert formatted == (["Hello "], [record])


def test_data_to_text_list_empty_data_dict():
    """An empty data dictionary leaves every placeholder blank."""
    record = Data(data={})
    formatted = data_to_text_list("Hello {text}", record)
    assert formatted == (["Hello "], [record])


def test_data_to_text_list_mixed_data_types():
    """A list mixing Data objects, a str and an int is formatted item by item."""
    mixed_items = [
        Data(text="First"),
        "Second",
        Data(data={"text": "Third"}),
        123,
    ]
    texts, records = data_to_text_list("Item: {text}", mixed_items)
    assert texts == [
        "Item: First",
        "Item: Second",
        "Item: Third",
        "Item: 123",
    ]
    # Every input item, Data or not, must come back as a Data object.
    assert len(records) == 4
    assert all(isinstance(record, Data) for record in records)


def test_data_to_text_list_complex_nested_data():
    """Top-level keys and keys of a nested ``data`` dict can share one template."""
    record = Data(data={"name": "Test", "data": {"text": "Nested text", "status": "active"}})
    formatted = data_to_text_list("Name: {name}, Info: {text}, Status: {status}", record)
    assert formatted == (["Name: Test, Info: Nested text, Status: active"], [record])


def test_data_to_text_list_empty_template():
    """An empty template produces one empty string per Data object."""
    record = Data(data={"key": "value"})
    assert data_to_text_list("", record) == ([""], [record])


def test_data_to_text_list_string_data():
    """A ``text`` entry stored in the data dict fills the ``{text}`` placeholder."""
    record = Data(data={"text": "Direct string"})
    formatted = data_to_text_list("Message: {text}", record)
    assert formatted == (["Message: Direct string"], [record])
14 changes: 11 additions & 3 deletions src/frontend/tests/core/integrations/Memory Chatbot.spec.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { test } from "@playwright/test";
import { expect, test } from "@playwright/test";
import * as dotenv from "dotenv";
import path from "path";
import { awaitBootstrapTest } from "../../utils/await-bootstrap-test";
Expand Down Expand Up @@ -63,8 +63,16 @@ withEventDeliveryModes(

await page.getByTestId("button-send").last().click();

await page.waitForSelector("text=roar", { timeout: 30000 });
await page.getByText("roar").last().isVisible();
await page.waitForSelector(".markdown", { timeout: 3000 });

const textContents = await page
.locator(".markdown")
.last()
.allTextContents();

const concatAllText = textContents.join(" ");
expect(concatAllText.length).toBeGreaterThan(20);

await page.getByText("Default Session").last().click();

await page.getByText("timestamp", { exact: true }).last().isVisible();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,6 @@ test(

// Count occurrences of modified_value in output
const matches = output?.match(/modified_value/g) || [];
expect(matches).toHaveLength(1);
expect(matches).toHaveLength(2);
},
);
Loading