diff --git a/docs/images/react.png b/docs/images/react.png new file mode 100644 index 00000000..a0afc386 --- /dev/null +++ b/docs/images/react.png @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:006d3d9163772eff897c0bb742eefe67fb5d71517d98a5548abd6fbf76acc567 +size 53806 diff --git a/examples/react/README.md b/examples/react/README.md new file mode 100644 index 00000000..d7bf579b --- /dev/null +++ b/examples/react/README.md @@ -0,0 +1,145 @@ +# ReAct Example + +ReAct (Reasoning and Acting) is a paradigm that combines reasoning and acting in an interleaved manner. The agent uses reasoning to determine what actions to take and interprets the results of those actions to inform further reasoning. + +This example demonstrates how to use the framework for ReAct tasks. The example code can be found in the `examples/react` directory. + +```bash +cd examples/react +``` + +## Overview + +This example implements a ReAct workflow that consists of the following components: + +1. **Input Interface** + - Handles user input containing questions + - Supports both CLI and programmatic interfaces + +2. **ReAct Workflow** + - Think: Reason about the current state and decide next action + - Act: Execute the chosen action (e.g., web search) + - Observe: Process the results of the action + - Repeat until the task is complete or max steps reached + +### The workflow follows this pattern: + +ReAct Workflow + +1. User provides input (question) +2. Agent thinks about the question and decides to take an action (e.g., search) +3. Agent observes the result +4. 
Process repeats until the answer is found + +## Prerequisites + +- Python 3.10+ +- Required packages installed (see requirements.txt) +- Access to OpenAI API or compatible endpoint (see configs/llms/*.yml) +- Redis server running locally or remotely +- Conductor server running locally or remotely + +## Configuration + +The container.yaml file manages dependencies and settings for different components of the system. To set up your configuration: + +1. Configure your container.yaml: + - Set Redis connection settings for both `redis_stream_client` and `redis_stm_client` + - Update the Conductor server URL under conductor_config section + - Adjust any other component settings as needed + +2. Configure your LLM settings in configs: + ```bash + export custom_openai_key="your_openai_api_key" + export custom_openai_endpoint="your_openai_endpoint" + ``` + +## Running the Example + +You can run the example in three ways: + +1. Using the CLI interface: + ```bash + python run_cli.py + ``` + +2. Using the programmatic interface: + ```bash + python run_programmatic.py + ``` + +3. 
Running batch testing with dataset: + ```bash + python run_batch_test.py [options] + ``` + This script will: + - Read questions from the input dataset + - Process each question using the ReAct workflow + - Save the results in a standardized format + - Output results to `data/{dataset_name}_{alg}_{model_id}_results.json` + + Available options: + ```bash + --input_file Input dataset file path (relative to project root) + Default: data/hotpot_dev_select_500_data_test_0107.jsonl + + --dataset_name Name of the dataset + Default: hotpot + + --model_id Model identifier + Default: gpt-3.5-turbo + + --alg Algorithm name + Default: ReAct + + --output_dir Output directory for results (relative to project root) + Default: data + ``` + + Example usage: + ```bash + python run_batch_test.py \ + --input_file data/custom_test.jsonl \ + --dataset_name custom \ + --model_id gpt-4 \ + --alg ReAct-v2 \ + --output_dir results + ``` + +When running the CLI or programmatic interface, you'll be prompted to: +1. Input an example (optional, press Enter to skip) +2. Set maximum turns (optional, default is 10) +3. Input your question + +## Troubleshooting + +If you encounter issues: +- Verify Redis is running and accessible +- Check your OpenAI API key is valid +- Ensure all dependencies are installed correctly +- Review logs for any error messages +- Check the proxy settings if needed + +Common issues: +- Connection errors: Check your network settings +- LLM errors: Verify your API key and endpoint +- Redis errors: Ensure Redis server is running + +## Example Usage + +Here's a simple example of using the ReAct agent: + +```bash +$ python run_cli.py + +Please input your question: +The time when the first computer was invented? + +[Agent starts reasoning and searching...] +``` + +The agent will then: +1. Think about how to answer the question +2. Search for relevant information +3. Process the search results +4. 
@registry.register_worker()
class InputInterface(BaseWorker):
    """Worker that asks the user for a question and emits it with a generated id."""

    def _process_input(self, input_data):
        """Extract the text of the most recent message from an input payload.

        Args:
            input_data: Mapping with a ``messages`` list. Each message is a
                mapping whose ``content`` is a list of items carrying ``type``
                and ``data`` keys.

        Returns:
            The stripped ``data`` of the first ``text`` item in the last
            message, or ``None`` when the payload, its messages, or a text
            item is missing.
        """
        if not input_data or 'messages' not in input_data:
            return None

        messages = input_data['messages']
        # An empty (or None) message list previously raised IndexError on [-1].
        if not messages:
            return None

        # `or []` / `or ''` also cover keys that are present but set to None.
        for content in messages[-1].get('content') or []:
            if content.get('type') == 'text':
                return (content.get('data') or '').strip()
        return None

    def _run(self, *args, **kwargs):
        """Prompt for a question and return it together with a fresh UUID.

        Returns:
            A dict with ``query`` (the extracted question text, or ``None`` if
            no text was found) and ``id`` (a random UUID4 string).
        """
        user_input = self.input.read_input(
            workflow_instance_id=self.workflow_instance_id,
            input_prompt='Please input your question:'
        )
        query = self._process_input(user_input)

        return {
            'query': query,
            'id': str(uuid.uuid4()),
        }
root_path = Path(__file__).parents[0] +registry.import_module() + +# Register required components +container.register_callback(callback="AppCallback") +container.register_input(input="AppInput") +container.register_stm("RedisSTM") +# Compile container config +container.compile_config(CURRENT_PATH) \ No newline at end of file diff --git a/examples/react/configs/llms/text_res.yml b/examples/react/configs/llms/text_res.yml new file mode 100644 index 00000000..b8b35d05 --- /dev/null +++ b/examples/react/configs/llms/text_res.yml @@ -0,0 +1,8 @@ +name: OpenaiGPTLLM +model_id: gpt-3.5-turbo +api_key: ${env| custom_openai_key, openai_api_key} +endpoint: ${env| custom_openai_endpoint, https://api.openai.com/v1} +temperature: 0 +stream: false +response_format: text +use_default_sys_prompt: false \ No newline at end of file diff --git a/examples/react/configs/workers/input_interface.yml b/examples/react/configs/workers/input_interface.yml new file mode 100644 index 00000000..5b9feb59 --- /dev/null +++ b/examples/react/configs/workers/input_interface.yml @@ -0,0 +1 @@ +name: InputInterface \ No newline at end of file diff --git a/examples/react/configs/workers/react_workflow.yml b/examples/react/configs/workers/react_workflow.yml new file mode 100644 index 00000000..537588a7 --- /dev/null +++ b/examples/react/configs/workers/react_workflow.yml @@ -0,0 +1,20 @@ +- name: ThinkAction + llm: ${sub|text_res} + output_parser: + name: StrParser + max_steps: 8 + example: | + Question: Musician and satirist Allie Goertz wrote a song about the "The Simpsons" character Milhouse, who Matt Groening named after who? + Thought 1: The question simplifies to "The Simpsons" character Milhouse is named after who. I only need to search Milhouse and find who it is named after. + Action 1: Search[Milhouse] + Observation 1: Milhouse Mussolini Van Houten is a recurring character in the Fox animated television series The Simpsons voiced by Pamela Hayden and created by Matt Groening. 
def read_input_texts(file_path):
    """Read ``(question, id)`` pairs from a JSON Lines dataset file.

    Args:
        file_path: Path of a UTF-8 ``.jsonl`` file. Every non-blank line must
            be a JSON object with ``question`` and ``id`` keys.

    Returns:
        A list of ``(question, id)`` tuples in file order; the id is converted
        to ``str``.

    Raises:
        json.JSONDecodeError: If a non-blank line is not valid JSON.
        KeyError: If a record lacks ``question`` or ``id``.
    """
    input_texts = []
    with open(file_path, 'r', encoding='utf-8') as f:
        for line in f:
            if not line.strip():
                continue  # tolerate blank separator lines
            data = json.loads(line)
            input_texts.append((data['question'], str(data['id'])))
    return input_texts


def process_results(results, dataset_name="aqua", model_id="gpt-3.5-turbo", alg="ReAct"):
    """Convert raw workflow results into the standard evaluation format.

    Args:
        results: Iterable of per-item result mappings, each expected to hold
            the workflow output under the ``output`` key. ``None`` entries and
            entries whose ``output`` or ``token_usage`` is missing or ``None``
            are reported with default values instead of raising.
        dataset_name: Dataset label stored under ``dataset``.
        model_id: Model label stored under ``model_id``.
        alg: Algorithm label stored under ``alg``.

    Returns:
        A dict with ``dataset``, ``model_id``, ``alg`` and ``model_result``
        (one entry per input result, in order).
    """
    formatted_output = {
        "dataset": dataset_name,
        "model_id": model_id,
        "alg": alg,
        "model_result": []
    }

    for result in results or []:
        # `or {}` covers both an absent key and an explicit None value.
        output_data = (result or {}).get('output') or {}
        token_usage = output_data.get('token_usage') or {}

        formatted_output["model_result"].append({
            "id": output_data.get('id'),
            "question": output_data.get('query'),
            "body": output_data.get('body', {}),
            "last_output": output_data.get('output', ''),
            "step_number": output_data.get('step_number', 0),
            "prompt_tokens": token_usage.get('prompt_tokens', 0),
            "completion_tokens": token_usage.get('completion_tokens', 0)
        })

    return formatted_output


def run_workflow(args):
    """Run the ReAct workflow over a dataset and save the formatted results.

    Args:
        args: Namespace providing ``input_file``, ``dataset_name``,
            ``model_id``, ``alg`` and ``output_dir``. ``input_file`` and
            ``output_dir`` are resolved relative to the project root.
    """
    logging.init_logger("omagent", "omagent", level="INFO")

    # Directory of this script, and the project root two levels above it.
    CURRENT_PATH = Path(__file__).parents[0]
    ROOT_PATH = CURRENT_PATH.parents[1]

    # Import registered modules
    registry.import_module(CURRENT_PATH.joinpath('agent'))

    # Load container configuration from YAML file
    container.register_stm("RedisSTM")
    container.from_config(CURRENT_PATH.joinpath('container.yaml'))

    # Build and register the workflow: the ReAct sub-workflow is fed directly
    # from the workflow inputs.
    workflow = ConductorWorkflow(name='react_basic_workflow_example')
    react_workflow = ReactWorkflow()
    react_workflow.set_input(
        query=workflow.input('query'),
        id=workflow.input('id')
    )
    workflow >> react_workflow
    workflow.register(overwrite=True)

    # Read input data
    input_file = ROOT_PATH / args.input_file
    input_data = read_input_texts(input_file)

    # Initialize programmatic client
    config_path = CURRENT_PATH.joinpath('configs')
    programmatic_client = ProgrammaticClient(
        processor=workflow,
        config_path=config_path,
        workers=[]  # The React workflow needs no additional workers
    )

    workflow_input_list = [
        {"query": question, "id": str(item_id)} for question, item_id in input_data
    ]

    print(f"Processing {len(workflow_input_list)} queries in this split...")

    # Stop the processor even if processing or writing the results fails.
    try:
        res = programmatic_client.start_batch_processor(
            workflow_input_list=workflow_input_list
        )

        formatted_results = process_results(
            res,
            dataset_name=args.dataset_name,
            model_id=args.model_id,
            alg=args.alg
        )

        # Create output directory if it doesn't exist
        output_dir = ROOT_PATH / args.output_dir
        output_dir.mkdir(parents=True, exist_ok=True)

        output_file = output_dir / f"{args.dataset_name}_{args.alg}_{args.model_id}_results.json"
        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump(formatted_results, f, ensure_ascii=False, indent=2)
    finally:
        programmatic_client.stop_processor()

    print(f"Results saved to {output_file}")
    print("All splits processed successfully!")


if __name__ == "__main__":
    # Parse command line arguments
    parser = argparse.ArgumentParser(description='Run ReAct workflow on dataset')
    parser.add_argument('--input_file', type=str, default="data/hotpot_dev_select_500_data_test_0107.jsonl",
                        help='Input dataset file path (relative to project root)')
    parser.add_argument('--dataset_name', type=str, default="hotpot",
                        help='Name of the dataset')
    parser.add_argument('--model_id', type=str, default="gpt-3.5-turbo",
                        help='Model identifier')
    parser.add_argument('--alg', type=str, default="ReAct",
                        help='Algorithm name')
    parser.add_argument('--output_dir', type=str, default="data",
                        help='Output directory for results (relative to project root)')

    args = parser.parse_args()
    run_workflow(args)
logging.init_logger("omagent", "omagent", level="INFO")

# Directory containing this script; the agent package, container.yaml and
# configs/ are all resolved relative to it.
CURRENT_PATH = Path(__file__).parents[0]

# Import registered modules
registry.import_module(CURRENT_PATH.joinpath('agent'))

# Load container configuration from YAML file
container.register_stm("RedisSTM")
container.from_config(CURRENT_PATH.joinpath('container.yaml'))

# Initialize workflow
workflow = ConductorWorkflow(name='react_basic_workflow_example')

# Configure the ReAct sub-workflow to read its inputs from the workflow inputs
react_workflow = ReactWorkflow()
react_workflow.set_input(
    query=workflow.input('query'),
    id=workflow.input('id')
)

# Configure workflow execution flow
workflow >> react_workflow

# Register workflow
workflow.register(overwrite=True)

# Initialize programmatic client
config_path = CURRENT_PATH.joinpath('configs')
programmatic_client = ProgrammaticClient(
    processor=workflow,
    config_path=config_path,
    workers=[]  # No additional workers needed for React workflow
)

# Prepare input data
workflow_input_list = [
    {
        "query": "When was Albert Einstein born?",
        "id": "21"
    }
]

# Stop the processor even when the batch run raises.
try:
    res = programmatic_client.start_batch_processor(
        workflow_input_list=workflow_input_list
    )
finally:
    programmatic_client.stop_processor()
@registry.register_worker()
class ThinkAction(BaseLLMBackend, BaseWorker):
    """Worker that produces one ReAct "Thought + Action" pair per invocation.

    The running transcript, step counter and accumulated token usage are kept
    in the short-term memory (STM) entry of the current workflow instance.
    """

    # Few-shot demonstration placed before the question on the first step.
    example: str = Field(default="")
    # Step budget; only reported in the result of `_run`, not enforced here.
    max_steps: int = Field(default=8)

    prompts: List[PromptTemplate] = Field(
        default=[
            PromptTemplate.from_file(
                CURRENT_PATH.joinpath("user_prompt.prompt"),
                role="user"
            ),
        ]
    )

    @staticmethod
    def _add_usage(token_usage, response):
        """Add the ``usage`` counters of an LLM response, if present, to ``token_usage`` in place."""
        if 'usage' not in response:
            return
        usage = response['usage']
        for key in ('prompt_tokens', 'completion_tokens', 'total_tokens'):
            token_usage[key] += usage[key]

    def _run(self, query: str, id: str = "", next_step: str = "Thought", *args, **kwargs):
        """Generate the next thought and action for ``query``.

        Args:
            query: The question being answered.
            id: Caller-supplied identifier, stored in STM and echoed in the result.
            next_step: Label used for the generated step in the transcript.

        Returns:
            A dict with ``output`` (thought plus action text), ``action``,
            ``step_number``, ``is_final`` (True when the action contains
            ``Finish[``), ``query``, ``id``, ``token_usage``, ``body`` and
            ``max_steps``.

        Raises:
            Exception: Any error raised while querying the LLM or reading its
                response is reported through ``self.callback.error`` and re-raised.
        """
        state = self.stm(self.workflow_instance_id)
        context = state.get('context', '')

        token_usage = state.get('token_usage', {
            'prompt_tokens': 0,
            'completion_tokens': 0,
            'total_tokens': 0
        })

        # An empty context marks a new conversation, so restart the counter.
        if not context:
            state['step_number'] = 1
        current_step = state.get('step_number', 1)

        # Keep the request body of the model call in STM for later reporting.
        message = self.prep_prompt([{"question": query}])
        body = self.llm._msg2req(message[0])
        self.stm(self.workflow_instance_id)["body"] = body

        self.callback.info(
            agent_id=self.workflow_instance_id,
            progress='ThinkAction',
            message=f'Step {current_step}'
        )

        # The transcript so far: stored context, or example + question on step one.
        history = context if context else f"{self.example}\nQuestion: {query}"
        step_label = f"{next_step} {current_step}:"
        full_prompt = f"{history}\n{step_label}"

        try:
            # First try to get thought and action in one call
            response = self.simple_infer(
                query=query,
                context=full_prompt,
                stop=[f"\nObservation {current_step}:"]
            )
            output = response['choices'][0]['message']['content']
            if "\nObservation" in output:
                output = output.split("\nObservation")[0]

            try:
                thought, action = output.strip().split(f"\nAction {current_step}: ")
            except ValueError:
                # The separator is missing or repeated: keep the first line as
                # the thought and ask the model for the action alone.
                thought = output.strip().split('\n')[0]
                action_response = self.simple_infer(
                    query=query,
                    context=f"{full_prompt}\n{thought}\nAction {current_step}:",
                    stop=["\n"]
                )
                # The fallback call costs tokens too; count them.
                self._add_usage(token_usage, action_response)
                action = action_response['choices'][0]['message']['content'].split("\n")[0].strip()

            self._add_usage(token_usage, response)

            output = f"{thought}\nAction {current_step}: {action}"

        except Exception as e:
            self.callback.error(
                agent_id=self.workflow_instance_id,
                progress='ThinkAction Error',
                message=f'Error: {str(e)}'
            )
            raise

        is_final = 'Finish[' in action

        self.callback.info(
            agent_id=self.workflow_instance_id,
            progress='ThinkAction',
            message=f'Step {current_step}: {output}'
        )

        # Append this step to the transcript and persist it.
        new_context = f"{history}\n{step_label} {output}"
        state.update({
            'context': new_context,
            'query': query,
            'id': id,
            'token_usage': token_usage
        })

        return {
            'output': output,
            'action': action,
            'step_number': current_step,
            'is_final': is_final,
            'query': query,
            'id': id,
            'token_usage': token_usage,
            'body': state.get('body', {}),
            'max_steps': self.max_steps
        }
@registry.register_worker()
class WikiSearch(BaseWorker):
    """Worker that executes the Search/Lookup action chosen by the ReAct agent.

    The observation is appended to the transcript kept in the short-term
    memory (STM) entry of the current workflow instance.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.docstore = DocstoreExplorer(Wikipedia())

    def _run(self, action_output: str, *args, **kwargs):
        """Execute the action in ``action_output`` and record the observation.

        Args:
            action_output: Action text such as ``Search[term]`` or
                ``Lookup[term]``. Any other text is used as the observation
                unchanged.

        Returns:
            A dict whose ``output`` is the observation text.

        Raises:
            Exception: Any error is logged and re-raised.
        """
        try:
            state = self.stm(self.workflow_instance_id)
            context = state.get('context', '')
            step_number = self._get_step_number(context)

            if 'Search[' in action_output:
                result = self._handle_search(action_output)
            elif 'Lookup[' in action_output:
                result = self._handle_lookup(action_output)
            else:
                result = action_output

            self.callback.info(
                agent_id=self.workflow_instance_id,
                progress='Observation',
                message=f'Step {step_number}: {result}'
            )

            # The context is only stored in STM; it is not part of the result.
            new_context = f"{context}\nObservation {step_number}: {result}"

            # The observation closes the round, so advance the step counter.
            state.update({
                'context': new_context,
                'step_number': step_number + 1
            })

            return {
                'output': result
            }

        except Exception as e:
            logging.error(f"Error in WikiSearch: {str(e)}")
            raise

    def _handle_search(self, action_output: str) -> str:
        """Run a ``Search[term]`` action and return the observation text.

        Returns ``action_output`` unchanged when no term can be extracted, and
        an error message string (rather than raising) when the search fails.
        """
        search_term = self._extract_term('Search', action_output)
        if not search_term:
            return action_output

        try:
            result = self.docstore.search(search_term)
            if not result:
                return f"No content found for '{search_term}'"

            result_text = result.strip('\n').strip().replace('\n', '')

            # Remember the returned text and reset the lookup cursor.
            # NOTE(review): Lookup later scans exactly this returned text;
            # confirm that this is the intended document to look keywords up in.
            self.stm(self.workflow_instance_id).update({
                'current_document': {'content': result},
                'lookup_str': '',
                'lookup_index': 0
            })

            return result_text
        except Exception as e:
            return f"Error occurred during search: {str(e)}"

    def _handle_lookup(self, action_output: str) -> str:
        """Run a ``Lookup[term]`` action against the last stored search result.

        Repeating the same term (case-insensitively) advances to the next
        matching paragraph; a new term starts again from the first match.
        Returns ``action_output`` unchanged when no term can be extracted.
        """
        lookup_term = self._extract_term('Lookup', action_output)
        if not lookup_term:
            return action_output

        try:
            state = self.stm(self.workflow_instance_id)
            current_document = state.get('current_document', {})
            lookup_str = state.get('lookup_str', '')
            lookup_index = state.get('lookup_index', 0)

            if not current_document:
                return "No previous search results available. Please perform a search first."

            paragraphs = current_document['content'].split('\n\n')
            needle = lookup_term.lower()

            # Same term as last time -> next match; new term -> first match.
            if needle != lookup_str:
                new_lookup_str = needle
                new_lookup_index = 0
            else:
                new_lookup_str = lookup_str
                new_lookup_index = lookup_index + 1

            lookups = [p for p in paragraphs if needle in p.lower()]

            if not lookups:
                result = "No Results"
            elif new_lookup_index >= len(lookups):
                result = "No More Results"
            else:
                result_prefix = f"(Result {new_lookup_index + 1}/{len(lookups)})"
                result = f"{result_prefix} {lookups[new_lookup_index]}"
                result = result.strip('\n').strip().replace('\n', '')

            self.stm(self.workflow_instance_id).update({
                'lookup_str': new_lookup_str,
                'lookup_index': new_lookup_index
            })

            return result

        except ValueError as ve:
            return str(ve)
        except Exception as e:
            return f"Error occurred during lookup: {str(e)}"

    def _extract_term(self, action_type: str, action_output: str) -> str:
        """Return the text between ``<action_type>[`` and the next ``]``.

        When the closing bracket is missing, everything after the opening
        bracket is returned. Returns ``""`` if ``<action_type>[`` does not occur.
        """
        marker = f'{action_type}['
        start = action_output.find(marker)
        if start == -1:
            return ""
        start += len(marker)
        end = action_output.find(']', start)
        if end == -1:
            # find() returns -1 when there is no ']'; using it as the slice end
            # would silently drop the last character of the term.
            end = len(action_output)
        return action_output[start:end].strip()

    def _get_step_number(self, context: str) -> int:
        """Return the step number stored in STM (1 if unset); ``context`` is unused."""
        return self.stm(self.workflow_instance_id).get('step_number', 1)
class ReactWorkflow(ConductorWorkflow):
    """ReAct workflow: a Think/Act + Observe loop followed by an output task."""

    def __init__(self):
        super().__init__(name='react_workflow')

    def set_input(self, query: str, id: str = ""):
        """Bind the workflow inputs, then build the tasks and wire them together.

        Args:
            query: The question (or a workflow/task output reference to it).
            id: Identifier forwarded to the ThinkAction task.
        """
        self.query = query
        self.id = id
        self._configure_tasks()
        self._configure_workflow()

    def _configure_tasks(self):
        """Create the think/action, search, loop and output tasks."""
        # Combined Think and Action task
        self.think_action_task = simple_task(
            task_def_name=ThinkAction,
            task_reference_name='think_action',
            inputs={
                'query': self.query,
                'id': self.id
            }
        )

        # Wiki Search task, fed with the action chosen by ThinkAction
        self.wiki_search_task = simple_task(
            task_def_name=WikiSearch,
            task_reference_name='wiki_search',
            inputs={
                'action_output': self.think_action_task.output('action'),
            }
        )

        # Do-While loop. `step_number` is the step ThinkAction has just run, so
        # `>=` stops once `max_steps` steps are done; the previous `>` allowed
        # one extra step (assuming the condition is checked after each iteration).
        self.loop_task = DoWhileTask(
            task_ref_name='react_loop',
            tasks=[self.think_action_task, self.wiki_search_task],
            termination_condition='''
            if (($.think_action.is_final == true) || ($.think_action.step_number >= $.think_action.max_steps)) {
                false; // Stop loop if it's a Finish action or max steps reached
            } else {
                true; // Continue loop otherwise
            }
            '''
        )

        # Output task
        # NOTE(review): this passes '${think_action.output}' rather than a single
        # output key; confirm ReactOutput is meant to receive that value.
        self.react_output_task = simple_task(
            task_def_name=ReactOutput,
            task_reference_name='react_output',
            inputs={
                'action_output': '${think_action.output}',
                'workflow_id': '${workflow.workflowId}'
            }
        )

    def _configure_workflow(self):
        """Chain the loop and the output task into this workflow."""
        self >> self.loop_task >> self.react_output_task