From a6c256a602163346e482c06064f124773178f16b Mon Sep 17 00:00:00 2001 From: JiwenJ <3522936020@qq.com> Date: Tue, 27 May 2025 18:32:23 +0800 Subject: [PATCH 1/6] test --- verl/utils/dataset/rl_dataset.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/verl/utils/dataset/rl_dataset.py b/verl/utils/dataset/rl_dataset.py index 53b3abf..0eced06 100755 --- a/verl/utils/dataset/rl_dataset.py +++ b/verl/utils/dataset/rl_dataset.py @@ -190,7 +190,7 @@ def __getitem__(self, item): # There's a trap here, multi_modal_inputs has to be a dict, not BatchFeature row_dict["multi_modal_data"] = multi_modal_data - row_dict["multi_modal_inputs"] = dict(model_inputs) + row_dict["multiv_modal_inputs"] = dict(model_inputs) # second_per_grid_ts isn't used for training, just for mrope row_dict["multi_modal_inputs"].pop("second_per_grid_ts", None) From b4ad6759447104de74d0d995af8a7f1a07288275 Mon Sep 17 00:00:00 2001 From: JiwenJ <3522936020@qq.com> Date: Wed, 28 May 2025 15:28:42 +0800 Subject: [PATCH 2/6] add user --- envs/base.py | 9 ++++- envs/simulated_user/simulated_user.yaml | 0 envs/tool_manager/qwen3_manager.py | 50 +++++++++++++++++++++++++ envs/utils/tool_utils.py | 2 +- 4 files changed, 59 insertions(+), 2 deletions(-) create mode 100644 envs/simulated_user/simulated_user.yaml diff --git a/envs/base.py b/envs/base.py index f5fe9a1..c15b0ec 100755 --- a/envs/base.py +++ b/envs/base.py @@ -15,6 +15,8 @@ def __init__(self, config): self.tool_manager = TOOL_MANAGER_REGISTRY[tool_manager_name](verl_config=config) self.max_prompt_length = config.get('max_prompt_length', 2048) self.use_verify_tool = False + self.use_simulated_user_feedback = config.get('use_simulated_user_feedback', False) + def verify_tool(self, data_source, solution_str, ground_truth, extra_info): # If you need a tool to evaluate the generated response, you need to modify the following code @@ -56,7 +58,10 @@ def step(self, responses, tokenizer): for action, tool_result in zip(cur_actions, tool_results): if action == 'answer': - temp_next_obs, temp_done, temp_valid_action, temp_is_tool = '', True, 1, 0 + if self.use_simulated_user_feedback: + temp_next_obs = self.tool_manager.simulated_user_feedback(responses,tokenizer) + else: + temp_next_obs, temp_done, temp_valid_action, temp_is_tool = '', True, 1, 0 elif action == 'error': temp_next_obs = self.tool_manager.get_prompt( input_data=tool_result, @@ -83,6 +88,8 @@ def step(self, responses, tokenizer): return next_obs, dones, valid_action, is_tool + + def compute_score(self, reward_rollout_wg, reward_tokenizer, tokenizer, data: DataProto, if_val=False): if reward_rollout_wg is not None: scores = self._compute_score_with_reward_rollout_wg(reward_rollout_wg, reward_tokenizer, data) diff --git a/envs/simulated_user/simulated_user.yaml b/envs/simulated_user/simulated_user.yaml new file mode 100644 index 0000000..e69de29 diff --git a/envs/tool_manager/qwen3_manager.py b/envs/tool_manager/qwen3_manager.py index 89c88ae..8534ed2 100644 --- a/envs/tool_manager/qwen3_manager.py +++ b/envs/tool_manager/qwen3_manager.py @@ -13,6 +13,10 @@ from envs.utils.mcp_manager import MCPManager as SSEMCPManager from qwen_agent.tools import TOOL_REGISTRY, MCPManager, BaseTool from qwen_agent.llm.schema import ASSISTANT, SYSTEM, USER, FUNCTION, ContentItem +import yaml +import random +import os +import openai # 你需要安装 openai 包 def parse_mcp_tools_config(file_path): @@ -40,6 +44,11 @@ def __init__(self, verl_config): 'lang': 'en', 'max_input_tokens': 10000 } + # 加载 simulated_user/user.yaml + 
parent_dir = os.path.dirname(os.path.dirname(__file__)) + user_yaml_path = os.path.join(parent_dir, 'simulated_user', 'user.yaml') + with open(user_yaml_path, 'r', encoding='utf-8') as f: + self._sim_user_feedback_cfg = yaml.safe_load(f) def get_tool(self, name_or_short_name: str): """通过名称或简写获取工具 @@ -357,3 +366,44 @@ def get_prompt(self, input_data, tokenizer, mode='initial', add_generation_promp raise ValueError('Invalid mode: {}'.format(mode)) return prompt_with_chat_template + + async def _single_feedback(self, response: str) -> dict: + cfg = self._sim_user_feedback_cfg + prob = cfg.get('probability', 0.5) + feedbacks = cfg.get('feedbacks', []) + persona = cfg.get('persona', '') + + if random.random() < prob and response: + if persona: + try: + import openai + prompt = ( + f"你的人物设定如下:\n{persona}\n\n" + f"请你以该人物身份,对下面AI的回答进行评价和反馈,要求简明扼要、具体、有指导性:\n" + f"AI的回答:{response}\n" + f"你的反馈:" + ) + completion = await asyncio.to_thread( + openai.ChatCompletion.create, + model="gpt-4", + messages=[ + {"role": "system", "content": "你是一个用户反馈生成器。"}, + {"role": "user", "content": prompt} + ], + max_tokens=128, + temperature=0.7, + ) + feedback = completion['choices'][0]['message']['content'].strip() + except Exception as e: + feedback = random.choice(feedbacks) if feedbacks else "请完善你的回答。" + else: + feedback = random.choice(feedbacks) if feedbacks else "请完善你的回答。" + else: + feedback = "" + + return {'role': USER, 'content': feedback} + + async def simulated_user_feedback(self, responses: List[str]) -> list: + tasks = [self._single_feedback(response) for response in responses] + results = await asyncio.gather(*tasks) + return results diff --git a/envs/utils/tool_utils.py b/envs/utils/tool_utils.py index 11ba6fa..40a60db 100644 --- a/envs/utils/tool_utils.py +++ b/envs/utils/tool_utils.py @@ -61,7 +61,7 @@ def postprocess_output(self, output: DataProto, step: int): responses=responses_str, tokenizer=self.tokenizer ) - # encode infos for next prompt + # encode infos for next prompt TODO: can tokenize be faster? 
info_tokens = self.tokenizer(infos_str).input_ids next_prompt_token = [] next_prompt_length = [] From e30c0307db3b9dc9b5e25864c93cd7a12db6ab9d Mon Sep 17 00:00:00 2001 From: JiwenJ <3522936020@qq.com> Date: Thu, 29 May 2025 10:01:45 +0800 Subject: [PATCH 3/6] add user --- envs/base.py | 12 +++++++++--- envs/tool_manager/qwen3_manager.py | 30 +++++++++++++++++++++++++----- 2 files changed, 34 insertions(+), 8 deletions(-) diff --git a/envs/base.py b/envs/base.py index c15b0ec..b9bfba7 100755 --- a/envs/base.py +++ b/envs/base.py @@ -16,6 +16,7 @@ def __init__(self, config): self.max_prompt_length = config.get('max_prompt_length', 2048) self.use_verify_tool = False self.use_simulated_user_feedback = config.get('use_simulated_user_feedback', False) + self.user_feedback_prob = 0.5 def verify_tool(self, data_source, solution_str, ground_truth, extra_info): @@ -55,11 +56,15 @@ def _process_data(self, data_item, tokenizer): def step(self, responses, tokenizer): cur_actions, tool_results = self.tool_manager.execute_actions(responses=responses) next_obs, dones, valid_action, is_tool = [], [], [], [] + user_feedback_flag = [] for action, tool_result in zip(cur_actions, tool_results): if action == 'answer': if self.use_simulated_user_feedback: - temp_next_obs = self.tool_manager.simulated_user_feedback(responses,tokenizer) + if random.random() < self.user_feedback_prob: + user_feedback_flag.append(1) + else: + user_feedback_flag.append(0) else: temp_next_obs, temp_done, temp_valid_action, temp_is_tool = '', True, 1, 0 elif action == 'error': @@ -80,12 +85,13 @@ def step(self, responses, tokenizer): temp_done, temp_valid_action, temp_is_tool = False, 1, 1 else: raise ValueError('Unexpected action: {}'.format(action)) - next_obs.append(temp_next_obs) dones.append(temp_done) valid_action.append(temp_valid_action) is_tool.append(temp_is_tool) - + + if self.use_simulated_user_feedback: + next_obs, dones, valid_action, is_tool = self.tool_manager.simulated_user_feedback(responses, tokenizer, user_feedback_flag, next_obs, dones, valid_action, is_tool) return next_obs, dones, valid_action, is_tool diff --git a/envs/tool_manager/qwen3_manager.py b/envs/tool_manager/qwen3_manager.py index e7d78ca..7eb6259 100644 --- a/envs/tool_manager/qwen3_manager.py +++ b/envs/tool_manager/qwen3_manager.py @@ -366,7 +366,9 @@ def get_prompt(self, input_data, tokenizer, mode='initial', add_generation_promp temp_prompt_with_chat_template = tokenizer.apply_chat_template( conversation=base_chat + chat, tools=[func.function for func in self.tool_map.values()], - tokenize=False, add_generation_prompt=add_generation_prompt, enable_thinking=self.verl_config.enable_thinking + tokenize=False, add_generation_prompt=ad + ]lmo + o_generation_prompt, enable_thinking=self.verl_config.enable_thinking ) prompt_with_chat_template = temp_prompt_with_chat_template.replace(base_prompt, '') else: @@ -410,7 +412,25 @@ async def _single_feedback(self, response: str) -> dict: return {'role': USER, 'content': feedback} - async def simulated_user_feedback(self, responses: List[str]) -> list: - tasks = [self._single_feedback(response) for response in responses] - results = await asyncio.gather(*tasks) - return results + async def simulated_user_feedback(self, responses: List[str], tokenizer, user_feedback_flag, next_obs, dones, valid_action, is_tool) -> list: + tmp_responses = [] + simulaited_idx = [] + for idx, use_feedback in enumerate(user_feedback_flag): + if use_feedback == 1: + tmp_responses.append(responses[idx]) + simulaited_idx.append(idx) + 
+ if len(tmp_responses) > 0: + tasks = [self._single_feedback(response) for response in tmp_responses] + results = await asyncio.gather(*tasks) + for idx, result in zip(simulaited_idx, results): + assert isinstance(next_obs[idx], str) + next_obs[idx] += tokenizer.apply_chat_template( + conversation=[{'role': USER, 'content': result}], + tokenize=False, + add_generation_prompt=True + ) + dones[idx] = False + valid_action[idx] = 1 + is_tool[idx] = 0 + return next_obs, dones, valid_action, is_tool From 2db45c27e9bbcd824f8f30ea603a4be2dae2b01a Mon Sep 17 00:00:00 2001 From: JiwenJ <3522936020@qq.com> Date: Thu, 29 May 2025 03:40:42 +0000 Subject: [PATCH 4/6] add claude --- .gitignore | 5 +- env.sh | 2 + envs/base.py | 3 +- envs/simulated_user/generate_dataset.py | 0 envs/simulated_user/simulated_user.yaml | 3 + envs/simulated_user/user.jsonl | 5 + envs/tool_manager/qwen3_manager.py | 114 +++++++------ main_grpo.sh | 19 ++- memory_ocupy.py | 209 ++++++++++++++++++++++++ 9 files changed, 300 insertions(+), 60 deletions(-) create mode 100644 env.sh create mode 100644 envs/simulated_user/generate_dataset.py create mode 100644 envs/simulated_user/user.jsonl create mode 100644 memory_ocupy.py diff --git a/.gitignore b/.gitignore index e88e92f..1412450 100644 --- a/.gitignore +++ b/.gitignore @@ -129,4 +129,7 @@ outputs tensorboard_log ckpt -.hopeignore \ No newline at end of file +.hopeignore + +## RL-Factory +tmp/ \ No newline at end of file diff --git a/env.sh b/env.sh new file mode 100644 index 0000000..8e0a22b --- /dev/null +++ b/env.sh @@ -0,0 +1,2 @@ +source /home/ma-user/modelarts/work/jjw/miniconda3-A100/bin/activate +conda activate rl-factory \ No newline at end of file diff --git a/envs/base.py b/envs/base.py index b9bfba7..7975b2d 100755 --- a/envs/base.py +++ b/envs/base.py @@ -16,7 +16,6 @@ def __init__(self, config): self.max_prompt_length = config.get('max_prompt_length', 2048) self.use_verify_tool = False self.use_simulated_user_feedback = config.get('use_simulated_user_feedback', False) - self.user_feedback_prob = 0.5 def verify_tool(self, data_source, solution_str, ground_truth, extra_info): @@ -61,7 +60,7 @@ def step(self, responses, tokenizer): for action, tool_result in zip(cur_actions, tool_results): if action == 'answer': if self.use_simulated_user_feedback: - if random.random() < self.user_feedback_prob: + if random.random() < self.tool_manager.user_feedback_prob: user_feedback_flag.append(1) else: user_feedback_flag.append(0) diff --git a/envs/simulated_user/generate_dataset.py b/envs/simulated_user/generate_dataset.py new file mode 100644 index 0000000..e69de29 diff --git a/envs/simulated_user/simulated_user.yaml b/envs/simulated_user/simulated_user.yaml index e69de29..276b411 100644 --- a/envs/simulated_user/simulated_user.yaml +++ b/envs/simulated_user/simulated_user.yaml @@ -0,0 +1,3 @@ +train: + data_file: /home/ma-user/modelarts/work/jjw/RL-Factory/RL-Factory/envs/simulated_user/user.jsonl + user_feedback_prob: 0.5 diff --git a/envs/simulated_user/user.jsonl b/envs/simulated_user/user.jsonl new file mode 100644 index 0000000..c80c0b1 --- /dev/null +++ b/envs/simulated_user/user.jsonl @@ -0,0 +1,5 @@ +{"user_id": "u001", "gender": "male", "age": 28, "interests": ["technology", "sports"], "location": "Beijing", "persona": "28岁的北京男生,热爱科技和体育,性格开朗,喜欢探索新事物,工作于互联网公司,业余时间常去健身和参加科技沙龙。"} +{"user_id": "u002", "gender": "female", "age": 24, "interests": ["fashion", "travel"], "location": "Shanghai", "persona": "24岁的上海女生,时尚达人,喜欢旅行和拍照,性格外向,乐于结交朋友,喜欢在社交媒体分享生活。"} +{"user_id": "u003", 
"gender": "male", "age": 35, "interests": ["finance", "reading"], "location": "Shenzhen", "persona": "35岁的深圳男士,金融行业从业者,喜欢阅读财经类书籍,性格稳重,注重自我提升,业余时间常参加读书会。"} +{"user_id": "u004", "gender": "female", "age": 30, "interests": ["cooking", "music"], "location": "Guangzhou", "persona": "30岁的广州女性,热爱烹饪和音乐,性格温和,喜欢在家做美食,也常去音乐会放松自己。"} +{"user_id": "u005", "gender": "male", "age": 22, "interests": ["gaming", "anime"], "location": "Chengdu", "persona": "22岁的成都男生,大学生,喜欢打游戏和看动漫,性格幽默,喜欢和朋友一起讨论二次元话题。"} diff --git a/envs/tool_manager/qwen3_manager.py b/envs/tool_manager/qwen3_manager.py index 7eb6259..9b6ac3d 100644 --- a/envs/tool_manager/qwen3_manager.py +++ b/envs/tool_manager/qwen3_manager.py @@ -16,7 +16,7 @@ import yaml import random import os -import openai # 你需要安装 openai 包 + def parse_mcp_tools_config(file_path): @@ -44,11 +44,22 @@ def __init__(self, verl_config): 'lang': 'en', 'max_input_tokens': 10000 } - # 加载 simulated_user/user.yaml - parent_dir = os.path.dirname(os.path.dirname(__file__)) - user_yaml_path = os.path.join(parent_dir, 'simulated_user', 'user.yaml') - with open(user_yaml_path, 'r', encoding='utf-8') as f: - self._sim_user_feedback_cfg = yaml.safe_load(f) + if self.use_simulated_user_feedback: + import datasets + from omegaconf import OmegaConf + parent_dir = os.path.dirname(os.path.dirname(__file__)) + user_yaml_path = os.path.join(parent_dir, 'simulated_user', 'simulated_user.yaml') + yaml_config = OmegaConf.load(user_yaml_path) + self.user_feedback_prob = yaml_config.train.user_feedback_prob + data_file = yaml_config.train.data_file + if not os.path.isabs(data_file): + data_file = os.path.join(parent_dir, 'simulated_user', data_file) + if data_file.endswith('.jsonl'): + self.persona_dataset = datasets.load_dataset('jsonl', data_files=data_file)['train'] + elif data_file.endswith('.json'): + self.persona_dataset = datasets.load_dataset('json', data_files=data_file)['train'] + else: + raise ValueError(f"Unsupported file type: {data_file}") def get_tool(self, name_or_short_name: str): """通过名称或简写获取工具 @@ -351,11 +362,10 @@ def get_prompt(self, input_data, tokenizer, mode='initial', add_generation_promp if mode == 'initial': chat = input_data prompt_with_chat_template = tokenizer.apply_chat_template( - conversation=chat, tokenize=False, tools=[func.function for func in self.tool_map.values()], + conversation=chat, tokenize=False, tools=[func.function for func in self.tool_map.values()], add_generation_prompt=add_generation_prompt, enable_thinking=self.verl_config.enable_thinking ) elif mode in ['tool_call', 'assistant_response']: - # NOTE: the assistant response might not be used role = 'tool' if mode == 'tool_call' else ASSISTANT if type(input_data) == str: chat = {'role': role, 'content': input_data} @@ -363,56 +373,59 @@ def get_prompt(self, input_data, tokenizer, mode='initial', add_generation_promp chat = input_data else: raise ValueError('Unexpected type of input_data {} ({})'.format(type(input_data), input_data)) - temp_prompt_with_chat_template = tokenizer.apply_chat_template( - conversation=base_chat + chat, tools=[func.function for func in self.tool_map.values()], - tokenize=False, add_generation_prompt=ad - ]lmo - o_generation_prompt, enable_thinking=self.verl_config.enable_thinking + conversation=base_chat + chat, tools=[func.function for func in self.tool_map.values()], + tokenize=False, add_generation_prompt=add_generation_prompt, enable_thinking=self.verl_config.enable_thinking ) prompt_with_chat_template = temp_prompt_with_chat_template.replace(base_prompt, '') else: 
raise ValueError('Invalid mode: {}'.format(mode)) - return prompt_with_chat_template async def _single_feedback(self, response: str) -> dict: cfg = self._sim_user_feedback_cfg prob = cfg.get('probability', 0.5) feedbacks = cfg.get('feedbacks', []) - persona = cfg.get('persona', '') - - if random.random() < prob and response: - if persona: - try: - import openai - prompt = ( - f"你的人物设定如下:\n{persona}\n\n" - f"请你以该人物身份,对下面AI的回答进行评价和反馈,要求简明扼要、具体、有指导性:\n" - f"AI的回答:{response}\n" - f"你的反馈:" - ) - completion = await asyncio.to_thread( - openai.ChatCompletion.create, - model="gpt-4", - messages=[ - {"role": "system", "content": "你是一个用户反馈生成器。"}, - {"role": "user", "content": prompt} - ], - max_tokens=128, - temperature=0.7, - ) - feedback = completion['choices'][0]['message']['content'].strip() - except Exception as e: - feedback = random.choice(feedbacks) if feedbacks else "请完善你的回答。" + persona = None + persona_str = None + try: + persona = random.choice(self.persona_dataset) + if isinstance(persona, dict): + persona_str = persona.get('persona') or str(persona) else: - feedback = random.choice(feedbacks) if feedbacks else "请完善你的回答。" + persona_str = str(persona) + except Exception: + persona_str = None + from openai import OpenAI + api_key = os.getenv("OPENAI_API_KEY",) + base_url = os.getenv("OPENAI_BASE_URL") + client = OpenAI(api_key=api_key, base_url=base_url) + if persona_str: + try: + prompt = ( + f"Your persona is as follows:\n{persona_str}\n\n" + f"Please, in the role of this persona, evaluate and provide feedback on the following AI answer. The feedback should be concise, specific, and constructive:\n" + f"AI's answer: {response}\n" + f"Your feedback:" + ) + completion = await asyncio.to_thread( + client.chat.completions.create, + model="anthropic.claude-3.5-sonnet-v2", + messages=[ + {"role": "system", "content": "You are a user feedback generator."}, + {"role": "user", "content": prompt} + ], + max_tokens=128, + temperature=0.7, + ) + feedback = completion.choices[0].message.content.strip() + except Exception as e: + feedback = random.choice(feedbacks) if feedbacks else "Please improve your answer." else: - feedback = "" - - return {'role': USER, 'content': feedback} + feedback = random.choice(feedbacks) if feedbacks else "Please improve your answer." 
+ return feedback - async def simulated_user_feedback(self, responses: List[str], tokenizer, user_feedback_flag, next_obs, dones, valid_action, is_tool) -> list: + def simulated_user_feedback(self, responses, tokenizer, user_feedback_flag, next_obs, dones, valid_action, is_tool): tmp_responses = [] simulaited_idx = [] for idx, use_feedback in enumerate(user_feedback_flag): @@ -421,16 +434,21 @@ async def simulated_user_feedback(self, responses: List[str], tokenizer, user_fe simulaited_idx.append(idx) if len(tmp_responses) > 0: + import asyncio tasks = [self._single_feedback(response) for response in tmp_responses] - results = await asyncio.gather(*tasks) + try: + loop = asyncio.get_running_loop() + results = loop.run_until_complete(asyncio.gather(*tasks)) + except RuntimeError: + results = asyncio.run(asyncio.gather(*tasks)) for idx, result in zip(simulaited_idx, results): - assert isinstance(next_obs[idx], str) - next_obs[idx] += tokenizer.apply_chat_template( + assert isinstance(next_obs[idx], List) + next_obs[idx].append(tokenizer.apply_chat_template( conversation=[{'role': USER, 'content': result}], tokenize=False, add_generation_prompt=True ) dones[idx] = False valid_action[idx] = 1 - is_tool[idx] = 0 + is_tool[idx] = 1 return next_obs, dones, valid_action, is_tool diff --git a/main_grpo.sh b/main_grpo.sh index 12c318b..74985d1 100644 --- a/main_grpo.sh +++ b/main_grpo.sh @@ -1,13 +1,14 @@ -set -e -x - -export MODEL_PATH=/your/path/to/Qwen/Qwen3-8B -export REWARD_MODEL_PATH=/your/path/to/Qwen/QwQ-32B +#!/bin/bash +source /home/ma-user/modelarts/work/jjw/RL-Factory/env.sh +export MODEL_PATH=/home/ma-user/modelarts/work/model/Qwen3-8B +export REWARD_MODEL_PATH=/home/ma-user/modelarts/work/model/Qwen3-8B # export VLLM_ATTENTION_BACKEND=XFORMERS - +DATA=/home/ma-user/modelarts/work/jjw/Search-R1/data/nq_search +export CUDA_VISIBLE_DEVICES="1,2" python3 -m verl.trainer.main_ppo\ algorithm.adv_estimator=grpo\ - data.train_files=data/nq_search/train.parquet\ - data.val_files=data/nq_search/test.parquet\ + data.train_files=$DATA/train.parquet\ + data.val_files=$DATA/test.parquet\ data.train_batch_size=128\ data.max_prompt_length=4096\ data.max_response_length=512\ @@ -39,7 +40,7 @@ python3 -m verl.trainer.main_ppo\ actor_rollout_ref.env.enable_thinking=False\ actor_rollout_ref.env.config_path=/your/path/to/envs/configs/mcp_tools.pydata\ reward_rollout.if_use_reward_rollout=False\ - reward_rollout.rollout.tensor_model_parallel_size=4\ + reward_rollout.rollout.tensor_model_parallel_size=1\ reward_rollout.rollout.gpu_memory_utilization=0.75\ reward_rollout.rollout.model_name=$REWARD_MODEL_PATH\ reward_rollout.rollout.free_cache_engine=False\ @@ -50,7 +51,7 @@ python3 -m verl.trainer.main_ppo\ trainer.logger=['tensorboard']\ trainer.project_name='GRPO_search'\ trainer.experiment_name='search_with_thinking'\ - trainer.n_gpus_per_node=8\ + trainer.n_gpus_per_node=1\ trainer.nnodes=1\ trainer.val_before_train=False\ trainer.default_local_dir=ckpt\ diff --git a/memory_ocupy.py b/memory_ocupy.py new file mode 100644 index 0000000..8d63b95 --- /dev/null +++ b/memory_ocupy.py @@ -0,0 +1,209 @@ +import torch +import time +import psutil +import os +import argparse +import numpy as np +from pynvml import nvmlInit, nvmlDeviceGetHandleByIndex, nvmlDeviceGetMemoryInfo, nvmlDeviceGetUtilizationRates, nvmlDeviceGetCount + +def get_gpu_memory_info(device_id=0): + """获取GPU显存信息""" + nvmlInit() + handle = nvmlDeviceGetHandleByIndex(device_id) + info = nvmlDeviceGetMemoryInfo(handle) + total_memory = info.total / 
1024**2 # MB + used_memory = info.used / 1024**2 # MB + free_memory = info.free / 1024**2 # MB + return total_memory, used_memory, free_memory + +def get_gpu_utilization(device_id=0): + """获取GPU利用率""" + nvmlInit() + handle = nvmlDeviceGetHandleByIndex(device_id) + util = nvmlDeviceGetUtilizationRates(handle) + return util.gpu # GPU利用率百分比 + +def get_gpu_count(): + """获取可用的GPU数量""" + nvmlInit() + return nvmlDeviceGetCount() + +def occupy_gpu_memory(target_percentage=20, target_util=20, gpu_ids=None): + """ + 占用指定百分比的GPU显存并保持指定的利用率 + + 参数: + target_percentage: 目标显存占用百分比 + target_util: 目标GPU利用率百分比 + gpu_ids: 要使用的GPU ID列表,如果为None则使用所有可用GPU + """ + print(f"正在尝试占用 {target_percentage}% 的GPU显存并保持 {target_util}% 的利用率...") + + # 检查是否有可用的GPU + if not torch.cuda.is_available(): + print("没有可用的GPU!") + return + + # 获取可用的GPU数量 + num_gpus = torch.cuda.device_count() + print(f"系统中共有 {num_gpus} 个可用的GPU") + + # 如果没有指定GPU ID,则使用所有可用的GPU + if gpu_ids is None: + gpu_ids = list(range(num_gpus)) + else: + # 确保所有指定的GPU ID都是有效的 + gpu_ids = [gpu_id for gpu_id in gpu_ids if gpu_id < num_gpus] + if not gpu_ids: + print("指定的GPU ID无效!") + return + + print(f"将使用以下GPU: {gpu_ids}") + + # 为每个GPU创建一个字典来存储分配的张量和计算张量 + gpu_data = {} + + # 初始化每个GPU的数据 + for gpu_id in gpu_ids: + # 获取GPU总显存 + total_memory, used_memory, free_memory = get_gpu_memory_info(gpu_id) + print(f"GPU {gpu_id} 总显存: {total_memory:.2f} MB") + print(f"GPU {gpu_id} 当前已使用: {used_memory:.2f} MB ({used_memory/total_memory*100:.2f}%)") + print(f"GPU {gpu_id} 当前可用: {free_memory:.2f} MB") + + # 计算需要分配的显存大小 + target_memory = (total_memory * target_percentage / 100) - used_memory + if target_memory <= 0: + print(f"GPU {gpu_id} 已经使用了超过 {target_percentage}% 的显存,无需额外分配") + target_memory = 0 + else: + print(f"GPU {gpu_id} 将分配约 {target_memory:.2f} MB 显存...") + + gpu_data[gpu_id] = { + 'total_memory': total_memory, + 'target_memory': target_memory, + 'allocated_memory': 0, + 'allocated_tensors': [], + 'compute_tensors': None + } + + # 每次分配的块大小 (MB) + block_size = 100 # MB + + # 为每个GPU分配显存 + try: + for gpu_id in gpu_ids: + # 设置当前设备 + torch.cuda.set_device(gpu_id) + + target_memory = gpu_data[gpu_id]['target_memory'] + if target_memory <= 0: + continue + + allocated_memory = 0 + allocated_tensors = [] + + while allocated_memory < target_memory: + # 计算当前块的大小 + current_block = min(block_size, target_memory - allocated_memory) + if current_block <= 0: + break + + # 分配显存 (每个float32值占4字节) + tensor_size = int(current_block * 1024 * 1024 / 4) + tensor = torch.rand(tensor_size, device=f'cuda:{gpu_id}') + allocated_tensors.append(tensor) + allocated_memory += current_block + + # 打印进度 + progress = allocated_memory / target_memory * 100 + print(f"GPU {gpu_id} 已分配: {allocated_memory:.2f} MB / {target_memory:.2f} MB ({progress:.2f}%)", end='\r') + + print(f"\nGPU {gpu_id} 成功分配了约 {allocated_memory:.2f} MB 显存") + + # 更新GPU数据 + gpu_data[gpu_id]['allocated_memory'] = allocated_memory + gpu_data[gpu_id]['allocated_tensors'] = allocated_tensors + + # 获取当前显存使用情况 + _, current_used, _ = get_gpu_memory_info(gpu_id) + print(f"GPU {gpu_id} 当前显存使用: {current_used:.2f} MB ({current_used/gpu_data[gpu_id]['total_memory']*100:.2f}%)") + + # 创建用于计算的张量 + compute_size = 2000 # 调整大小以影响利用率 + a = torch.rand((compute_size, compute_size), device=f'cuda:{gpu_id}') + b = torch.rand((compute_size, compute_size), device=f'cuda:{gpu_id}') + gpu_data[gpu_id]['compute_tensors'] = (a, b) + + except RuntimeError as e: + print(f"\n无法分配更多显存: {e}") + + # 保持GPU利用率 + print(f"正在执行计算以保持所有GPU利用率在 {target_util}% 左右...") + + try: + 
while True: + for gpu_id in gpu_ids: + # 获取当前GPU利用率 + current_util = get_gpu_utilization(gpu_id) + + # 设置当前设备 + torch.cuda.set_device(gpu_id) + + # 获取计算张量 + if gpu_data[gpu_id]['compute_tensors'] is None: + continue + + a, b = gpu_data[gpu_id]['compute_tensors'] + + # 根据当前利用率调整计算强度 + if current_util < target_util - 5: + # 增加计算强度 + for _ in range(10): + c = torch.matmul(a, b) + d = torch.nn.functional.relu(c) + e = torch.matmul(d, b) + elif current_util > target_util + 5: + # 降低计算强度,休息一下 + time.sleep(0.1) + else: + # 保持适中的计算强度 + c = torch.matmul(a, b) + d = torch.nn.functional.relu(c) + + # 获取当前显存使用情况 + _, current_used, _ = get_gpu_memory_info(gpu_id) + memory_percentage = current_used / gpu_data[gpu_id]['total_memory'] * 100 + print(f"GPU {gpu_id} - 显存: {current_used:.2f} MB ({memory_percentage:.2f}%), 利用率: {current_util}%", end=' ') + + print("", end='\r') + # 短暂休息,避免打印过快 + time.sleep(0.5) + + except KeyboardInterrupt: + print("\n程序被用户中断") + finally: + # 清理分配的显存 + for gpu_id in gpu_ids: + gpu_data[gpu_id]['allocated_tensors'].clear() + gpu_data[gpu_id]['compute_tensors'] = None + torch.cuda.empty_cache() + print("\n已释放分配的显存") + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description='占用指定百分比的GPU显存并保持指定的利用率') + parser.add_argument('--memory', type=float, default=40.0, help='目标显存占用百分比 (默认: 80%%)') + parser.add_argument('--util', type=float, default=40.0, help='目标GPU利用率百分比 (默认: 80%%)') + parser.add_argument('--gpus', type=str, default="1,2", help='要使用的GPU ID,用逗号分隔,例如"0,1,2,3" (默认: 使用所有可用GPU)') + args = parser.parse_args() + + # 解析GPU ID + gpu_ids = None + if args.gpus is not None: + try: + gpu_ids = [int(gpu_id.strip()) for gpu_id in args.gpus.split(',') if gpu_id.strip()] + except ValueError: + print("无效的GPU ID格式,应为逗号分隔的整数") + exit(1) + + occupy_gpu_memory(args.memory, args.util, gpu_ids) From 8a9e5d293ae82ef51ab3f057423b36c33759cc5e Mon Sep 17 00:00:00 2001 From: JiwenJ <3522936020@qq.com> Date: Thu, 29 May 2025 03:42:11 +0000 Subject: [PATCH 5/6] clear unrelevant --- memory_ocupy.py | 209 ------------------------------------------------ 1 file changed, 209 deletions(-) delete mode 100644 memory_ocupy.py diff --git a/memory_ocupy.py b/memory_ocupy.py deleted file mode 100644 index 8d63b95..0000000 --- a/memory_ocupy.py +++ /dev/null @@ -1,209 +0,0 @@ -import torch -import time -import psutil -import os -import argparse -import numpy as np -from pynvml import nvmlInit, nvmlDeviceGetHandleByIndex, nvmlDeviceGetMemoryInfo, nvmlDeviceGetUtilizationRates, nvmlDeviceGetCount - -def get_gpu_memory_info(device_id=0): - """获取GPU显存信息""" - nvmlInit() - handle = nvmlDeviceGetHandleByIndex(device_id) - info = nvmlDeviceGetMemoryInfo(handle) - total_memory = info.total / 1024**2 # MB - used_memory = info.used / 1024**2 # MB - free_memory = info.free / 1024**2 # MB - return total_memory, used_memory, free_memory - -def get_gpu_utilization(device_id=0): - """获取GPU利用率""" - nvmlInit() - handle = nvmlDeviceGetHandleByIndex(device_id) - util = nvmlDeviceGetUtilizationRates(handle) - return util.gpu # GPU利用率百分比 - -def get_gpu_count(): - """获取可用的GPU数量""" - nvmlInit() - return nvmlDeviceGetCount() - -def occupy_gpu_memory(target_percentage=20, target_util=20, gpu_ids=None): - """ - 占用指定百分比的GPU显存并保持指定的利用率 - - 参数: - target_percentage: 目标显存占用百分比 - target_util: 目标GPU利用率百分比 - gpu_ids: 要使用的GPU ID列表,如果为None则使用所有可用GPU - """ - print(f"正在尝试占用 {target_percentage}% 的GPU显存并保持 {target_util}% 的利用率...") - - # 检查是否有可用的GPU - if not torch.cuda.is_available(): - print("没有可用的GPU!") - return - - # 获取可用的GPU数量 - 
num_gpus = torch.cuda.device_count() - print(f"系统中共有 {num_gpus} 个可用的GPU") - - # 如果没有指定GPU ID,则使用所有可用的GPU - if gpu_ids is None: - gpu_ids = list(range(num_gpus)) - else: - # 确保所有指定的GPU ID都是有效的 - gpu_ids = [gpu_id for gpu_id in gpu_ids if gpu_id < num_gpus] - if not gpu_ids: - print("指定的GPU ID无效!") - return - - print(f"将使用以下GPU: {gpu_ids}") - - # 为每个GPU创建一个字典来存储分配的张量和计算张量 - gpu_data = {} - - # 初始化每个GPU的数据 - for gpu_id in gpu_ids: - # 获取GPU总显存 - total_memory, used_memory, free_memory = get_gpu_memory_info(gpu_id) - print(f"GPU {gpu_id} 总显存: {total_memory:.2f} MB") - print(f"GPU {gpu_id} 当前已使用: {used_memory:.2f} MB ({used_memory/total_memory*100:.2f}%)") - print(f"GPU {gpu_id} 当前可用: {free_memory:.2f} MB") - - # 计算需要分配的显存大小 - target_memory = (total_memory * target_percentage / 100) - used_memory - if target_memory <= 0: - print(f"GPU {gpu_id} 已经使用了超过 {target_percentage}% 的显存,无需额外分配") - target_memory = 0 - else: - print(f"GPU {gpu_id} 将分配约 {target_memory:.2f} MB 显存...") - - gpu_data[gpu_id] = { - 'total_memory': total_memory, - 'target_memory': target_memory, - 'allocated_memory': 0, - 'allocated_tensors': [], - 'compute_tensors': None - } - - # 每次分配的块大小 (MB) - block_size = 100 # MB - - # 为每个GPU分配显存 - try: - for gpu_id in gpu_ids: - # 设置当前设备 - torch.cuda.set_device(gpu_id) - - target_memory = gpu_data[gpu_id]['target_memory'] - if target_memory <= 0: - continue - - allocated_memory = 0 - allocated_tensors = [] - - while allocated_memory < target_memory: - # 计算当前块的大小 - current_block = min(block_size, target_memory - allocated_memory) - if current_block <= 0: - break - - # 分配显存 (每个float32值占4字节) - tensor_size = int(current_block * 1024 * 1024 / 4) - tensor = torch.rand(tensor_size, device=f'cuda:{gpu_id}') - allocated_tensors.append(tensor) - allocated_memory += current_block - - # 打印进度 - progress = allocated_memory / target_memory * 100 - print(f"GPU {gpu_id} 已分配: {allocated_memory:.2f} MB / {target_memory:.2f} MB ({progress:.2f}%)", end='\r') - - print(f"\nGPU {gpu_id} 成功分配了约 {allocated_memory:.2f} MB 显存") - - # 更新GPU数据 - gpu_data[gpu_id]['allocated_memory'] = allocated_memory - gpu_data[gpu_id]['allocated_tensors'] = allocated_tensors - - # 获取当前显存使用情况 - _, current_used, _ = get_gpu_memory_info(gpu_id) - print(f"GPU {gpu_id} 当前显存使用: {current_used:.2f} MB ({current_used/gpu_data[gpu_id]['total_memory']*100:.2f}%)") - - # 创建用于计算的张量 - compute_size = 2000 # 调整大小以影响利用率 - a = torch.rand((compute_size, compute_size), device=f'cuda:{gpu_id}') - b = torch.rand((compute_size, compute_size), device=f'cuda:{gpu_id}') - gpu_data[gpu_id]['compute_tensors'] = (a, b) - - except RuntimeError as e: - print(f"\n无法分配更多显存: {e}") - - # 保持GPU利用率 - print(f"正在执行计算以保持所有GPU利用率在 {target_util}% 左右...") - - try: - while True: - for gpu_id in gpu_ids: - # 获取当前GPU利用率 - current_util = get_gpu_utilization(gpu_id) - - # 设置当前设备 - torch.cuda.set_device(gpu_id) - - # 获取计算张量 - if gpu_data[gpu_id]['compute_tensors'] is None: - continue - - a, b = gpu_data[gpu_id]['compute_tensors'] - - # 根据当前利用率调整计算强度 - if current_util < target_util - 5: - # 增加计算强度 - for _ in range(10): - c = torch.matmul(a, b) - d = torch.nn.functional.relu(c) - e = torch.matmul(d, b) - elif current_util > target_util + 5: - # 降低计算强度,休息一下 - time.sleep(0.1) - else: - # 保持适中的计算强度 - c = torch.matmul(a, b) - d = torch.nn.functional.relu(c) - - # 获取当前显存使用情况 - _, current_used, _ = get_gpu_memory_info(gpu_id) - memory_percentage = current_used / gpu_data[gpu_id]['total_memory'] * 100 - print(f"GPU {gpu_id} - 显存: {current_used:.2f} MB ({memory_percentage:.2f}%), 利用率: 
{current_util}%", end=' ') - - print("", end='\r') - # 短暂休息,避免打印过快 - time.sleep(0.5) - - except KeyboardInterrupt: - print("\n程序被用户中断") - finally: - # 清理分配的显存 - for gpu_id in gpu_ids: - gpu_data[gpu_id]['allocated_tensors'].clear() - gpu_data[gpu_id]['compute_tensors'] = None - torch.cuda.empty_cache() - print("\n已释放分配的显存") - -if __name__ == "__main__": - parser = argparse.ArgumentParser(description='占用指定百分比的GPU显存并保持指定的利用率') - parser.add_argument('--memory', type=float, default=40.0, help='目标显存占用百分比 (默认: 80%%)') - parser.add_argument('--util', type=float, default=40.0, help='目标GPU利用率百分比 (默认: 80%%)') - parser.add_argument('--gpus', type=str, default="1,2", help='要使用的GPU ID,用逗号分隔,例如"0,1,2,3" (默认: 使用所有可用GPU)') - args = parser.parse_args() - - # 解析GPU ID - gpu_ids = None - if args.gpus is not None: - try: - gpu_ids = [int(gpu_id.strip()) for gpu_id in args.gpus.split(',') if gpu_id.strip()] - except ValueError: - print("无效的GPU ID格式,应为逗号分隔的整数") - exit(1) - - occupy_gpu_memory(args.memory, args.util, gpu_ids) From de2c37646c7e4aa89944b0c671dbfca5bf44133a Mon Sep 17 00:00:00 2001 From: JiwenJ <3522936020@qq.com> Date: Tue, 3 Jun 2025 08:04:33 +0800 Subject: [PATCH 6/6] fix bugs of async loop --- envs/base.py | 5 ++- envs/simulated_user/simulated_user.yaml | 4 +- envs/simulated_user/user.jsonl | 10 ++--- envs/tool_manager/qwen3_manager.py | 50 +++++++++++++------------ install.sh | 2 +- main_grpo.sh | 28 +++++++++----- 6 files changed, 57 insertions(+), 42 deletions(-) diff --git a/envs/base.py b/envs/base.py index 7975b2d..cb2491e 100755 --- a/envs/base.py +++ b/envs/base.py @@ -15,7 +15,7 @@ def __init__(self, config): self.tool_manager = TOOL_MANAGER_REGISTRY[tool_manager_name](verl_config=config) self.max_prompt_length = config.get('max_prompt_length', 2048) self.use_verify_tool = False - self.use_simulated_user_feedback = config.get('use_simulated_user_feedback', False) + self.use_simulated_user_feedback = config.get('use_simulated_user_feedback', True) def verify_tool(self, data_source, solution_str, ground_truth, extra_info): @@ -62,8 +62,11 @@ def step(self, responses, tokenizer): if self.use_simulated_user_feedback: if random.random() < self.tool_manager.user_feedback_prob: user_feedback_flag.append(1) + temp_next_obs, temp_done, temp_valid_action, temp_is_tool = '', False, 1, 0 else: user_feedback_flag.append(0) + temp_next_obs, temp_done, temp_valid_action, temp_is_tool = '', True, 1, 0 + temp_next_obs, temp_done, temp_valid_action, temp_is_tool = '', True, 1, 0 else: temp_next_obs, temp_done, temp_valid_action, temp_is_tool = '', True, 1, 0 elif action == 'error': diff --git a/envs/simulated_user/simulated_user.yaml b/envs/simulated_user/simulated_user.yaml index 276b411..062bfed 100644 --- a/envs/simulated_user/simulated_user.yaml +++ b/envs/simulated_user/simulated_user.yaml @@ -1,3 +1,3 @@ train: - data_file: /home/ma-user/modelarts/work/jjw/RL-Factory/RL-Factory/envs/simulated_user/user.jsonl - user_feedback_prob: 0.5 + data_file: user.jsonl + user_feedback_prob: 0.1 diff --git a/envs/simulated_user/user.jsonl b/envs/simulated_user/user.jsonl index c80c0b1..98dcd3c 100644 --- a/envs/simulated_user/user.jsonl +++ b/envs/simulated_user/user.jsonl @@ -1,5 +1,5 @@ -{"user_id": "u001", "gender": "male", "age": 28, "interests": ["technology", "sports"], "location": "Beijing", "persona": "28岁的北京男生,热爱科技和体育,性格开朗,喜欢探索新事物,工作于互联网公司,业余时间常去健身和参加科技沙龙。"} -{"user_id": "u002", "gender": "female", "age": 24, "interests": ["fashion", "travel"], "location": "Shanghai", "persona": 
"24岁的上海女生,时尚达人,喜欢旅行和拍照,性格外向,乐于结交朋友,喜欢在社交媒体分享生活。"} -{"user_id": "u003", "gender": "male", "age": 35, "interests": ["finance", "reading"], "location": "Shenzhen", "persona": "35岁的深圳男士,金融行业从业者,喜欢阅读财经类书籍,性格稳重,注重自我提升,业余时间常参加读书会。"} -{"user_id": "u004", "gender": "female", "age": 30, "interests": ["cooking", "music"], "location": "Guangzhou", "persona": "30岁的广州女性,热爱烹饪和音乐,性格温和,喜欢在家做美食,也常去音乐会放松自己。"} -{"user_id": "u005", "gender": "male", "age": 22, "interests": ["gaming", "anime"], "location": "Chengdu", "persona": "22岁的成都男生,大学生,喜欢打游戏和看动漫,性格幽默,喜欢和朋友一起讨论二次元话题。"} +{"user_id": "u001", "gender": "male", "age": 28, "interests": ["technology", "sports"], "location": "Beijing", "persona": "A 28-year-old man from Beijing who loves technology and sports. He is outgoing, enjoys exploring new things, works at an internet company, and often goes to the gym and attends tech salons in his spare time."} +{"user_id": "u002", "gender": "female", "age": 24, "interests": ["fashion", "travel"], "location": "Shanghai", "persona": "A 24-year-old woman from Shanghai who is a fashion enthusiast. She loves traveling and taking photos, is outgoing, enjoys making friends, and likes sharing her life on social media."} +{"user_id": "u003", "gender": "male", "age": 35, "interests": ["finance", "reading"], "location": "Shenzhen", "persona": "A 35-year-old man from Shenzhen working in the finance industry. He enjoys reading finance books, is steady and focused on self-improvement, and often participates in book clubs in his free time."} +{"user_id": "u004", "gender": "female", "age": 30, "interests": ["cooking", "music"], "location": "Guangzhou", "persona": "A 30-year-old woman from Guangzhou who loves cooking and music. She is gentle, enjoys making delicious food at home, and often goes to concerts to relax."} +{"user_id": "u005", "gender": "male", "age": 22, "interests": ["gaming", "anime"], "location": "Chengdu", "persona": "A 22-year-old male college student from Chengdu who likes gaming and watching anime. 
He has a good sense of humor and enjoys discussing anime topics with friends."} diff --git a/envs/tool_manager/qwen3_manager.py b/envs/tool_manager/qwen3_manager.py index 9b6ac3d..81f0378 100644 --- a/envs/tool_manager/qwen3_manager.py +++ b/envs/tool_manager/qwen3_manager.py @@ -44,19 +44,18 @@ def __init__(self, verl_config): 'lang': 'en', 'max_input_tokens': 10000 } + self.use_simulated_user_feedback = verl_config.get("use_simulated_user_feedback", True) if self.use_simulated_user_feedback: import datasets from omegaconf import OmegaConf parent_dir = os.path.dirname(os.path.dirname(__file__)) user_yaml_path = os.path.join(parent_dir, 'simulated_user', 'simulated_user.yaml') - yaml_config = OmegaConf.load(user_yaml_path) - self.user_feedback_prob = yaml_config.train.user_feedback_prob - data_file = yaml_config.train.data_file + self.yaml_config = OmegaConf.load(user_yaml_path) + self.user_feedback_prob = self.yaml_config.train.user_feedback_prob + data_file = self.yaml_config.train.data_file if not os.path.isabs(data_file): data_file = os.path.join(parent_dir, 'simulated_user', data_file) if data_file.endswith('.jsonl'): - self.persona_dataset = datasets.load_dataset('jsonl', data_files=data_file)['train'] - elif data_file.endswith('.json'): self.persona_dataset = datasets.load_dataset('json', data_files=data_file)['train'] else: raise ValueError(f"Unsupported file type: {data_file}") @@ -348,7 +347,7 @@ def parse_tools(self, response: str): return parsed_tools def get_prompt(self, input_data, tokenizer, mode='initial', add_generation_prompt=True): - assert mode in ['initial', 'tool_call', 'assistant_response'], 'Invalid mode: {}'.format(mode) + assert mode in ['initial', 'tool_call', 'assistant_response', 'user_feedback'], 'Invalid mode: {}'.format(mode) base_chat = [ {'role': SYSTEM, 'content': 'base'}, {'role': USER, 'content': 'base'}, @@ -365,10 +364,15 @@ def get_prompt(self, input_data, tokenizer, mode='initial', add_generation_promp conversation=chat, tokenize=False, tools=[func.function for func in self.tool_map.values()], add_generation_prompt=add_generation_prompt, enable_thinking=self.verl_config.enable_thinking ) - elif mode in ['tool_call', 'assistant_response']: - role = 'tool' if mode == 'tool_call' else ASSISTANT + elif mode in ['tool_call', 'assistant_response', 'user_feedback']: + if mode == 'tool_call': + role = 'tool' + elif mode == 'user_feedback': + role = USER + else: + role = ASSISTANT if type(input_data) == str: - chat = {'role': role, 'content': input_data} + chat = [{'role': role, 'content': input_data}] elif type(input_data) == list: chat = input_data else: @@ -383,8 +387,7 @@ def get_prompt(self, input_data, tokenizer, mode='initial', add_generation_promp return prompt_with_chat_template async def _single_feedback(self, response: str) -> dict: - cfg = self._sim_user_feedback_cfg - prob = cfg.get('probability', 0.5) + cfg = self.yaml_config feedbacks = cfg.get('feedbacks', []) persona = None persona_str = None @@ -397,20 +400,21 @@ async def _single_feedback(self, response: str) -> dict: except Exception: persona_str = None from openai import OpenAI - api_key = os.getenv("OPENAI_API_KEY",) - base_url = os.getenv("OPENAI_BASE_URL") + api_key = os.getenv("OPENAI_API_KEY","sk-045a991549b245c9838dfe33552fcf86") + assert api_key is not None, "OPENAI_API_KEY should be set" + base_url = os.getenv("OPENAI_BASE_URL","https://api.deepseek.com") client = OpenAI(api_key=api_key, base_url=base_url) if persona_str: try: prompt = ( f"Your persona is as 
follows:\n{persona_str}\n\n" - f"Please, in the role of this persona, evaluate and provide feedback on the following AI answer. The feedback should be concise, specific, and constructive:\n" + f"Please, in the role of this persona, evaluate and provide feedback on the following AI answer. The feedback should be concise, specific, and constructive, your feedback should be in English:\n" f"AI's answer: {response}\n" f"Your feedback:" ) completion = await asyncio.to_thread( client.chat.completions.create, - model="anthropic.claude-3.5-sonnet-v2", + model="deepseek-chat", messages=[ {"role": "system", "content": "You are a user feedback generator."}, {"role": "user", "content": prompt} @@ -432,7 +436,6 @@ def simulated_user_feedback(self, responses, tokenizer, user_feedback_flag, next if use_feedback == 1: tmp_responses.append(responses[idx]) simulaited_idx.append(idx) - if len(tmp_responses) > 0: import asyncio tasks = [self._single_feedback(response) for response in tmp_responses] @@ -440,14 +443,15 @@ def simulated_user_feedback(self, responses, tokenizer, user_feedback_flag, next loop = asyncio.get_running_loop() results = loop.run_until_complete(asyncio.gather(*tasks)) except RuntimeError: - results = asyncio.run(asyncio.gather(*tasks)) + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + results = loop.run_until_complete(asyncio.gather(*tasks)) + loop.close() for idx, result in zip(simulaited_idx, results): - assert isinstance(next_obs[idx], List) - next_obs[idx].append(tokenizer.apply_chat_template( - conversation=[{'role': USER, 'content': result}], - tokenize=False, - add_generation_prompt=True - ) + breakpoint() + assert isinstance(next_obs[idx], str) + assert isinstance(result, str) + next_obs[idx]+= self.get_prompt(result, tokenizer, mode='user_feedback') dones[idx] = False valid_action[idx] = 1 is_tool[idx] = 1 diff --git a/install.sh b/install.sh index 8971329..326d21d 100755 --- a/install.sh +++ b/install.sh @@ -1,4 +1,4 @@ -pip3 install accelerate bitsandbytes datasets deepspeed==0.16.4 einops flash-attn==2.7.0.post2 isort jsonlines loralib optimum packaging peft pynvml>=12.0.0 ray[default]==2.42.0 tensorboard torch torchmetrics tqdm transformers==4.48.3 transformers_stream_generator wandb wheel +pip3 install accelerate bitsandbytes datasets deepspeed einops isort jsonlines loralib optimum packaging peft pynvml>=12.0.0 ray[default]==2.42.0 tensorboard torchmetrics tqdm transformers==4.48.3 transformers_stream_generator wandb wheel pip3 install vllm==0.8.5 pip3 install "qwen-agent[code_interpreter]" pip3 install llama_index bs4 pymilvus infinity_client codetiming tensordict==0.6 omegaconf torchdata==0.10.0 hydra-core easydict dill python-multipart diff --git a/main_grpo.sh b/main_grpo.sh index 74985d1..77a62fd 100644 --- a/main_grpo.sh +++ b/main_grpo.sh @@ -1,16 +1,24 @@ #!/bin/bash -source /home/ma-user/modelarts/work/jjw/RL-Factory/env.sh -export MODEL_PATH=/home/ma-user/modelarts/work/model/Qwen3-8B -export REWARD_MODEL_PATH=/home/ma-user/modelarts/work/model/Qwen3-8B +source /root/autodl-tmp/RL-Factory/tmp/env_autodl.sh +ray stop --force +sleep 5 +export MODEL_PATH=/root/autodl-tmp/models/Qwen/Qwen3-0.6B +export REWARD_MODEL_PATH=/root/autodl-tmp/models/Qwen/Qwen3-0.6B +export WANDB_API_KEY=76ecf2334073036f76da7b9e4eb5bbe934767728 +export HYDRA_FULL_ERROR=1 +export RAY_DEBUG=1 +# export RAY_DEBUG="legacy" # export VLLM_ATTENTION_BACKEND=XFORMERS -DATA=/home/ma-user/modelarts/work/jjw/Search-R1/data/nq_search -export CUDA_VISIBLE_DEVICES="1,2" 
+DATA=/root/autodl-tmp/data/nq_hotpotqa_train + + + python3 -m verl.trainer.main_ppo\ algorithm.adv_estimator=grpo\ data.train_files=$DATA/train.parquet\ data.val_files=$DATA/test.parquet\ - data.train_batch_size=128\ - data.max_prompt_length=4096\ + data.train_batch_size=32\ + data.max_prompt_length=1024\ data.max_response_length=512\ actor_rollout_ref.model.path=$MODEL_PATH\ actor_rollout_ref.model.use_remove_padding=True\ @@ -27,7 +35,7 @@ python3 -m verl.trainer.main_ppo\ actor_rollout_ref.rollout.log_prob_micro_batch_size_per_gpu=16\ actor_rollout_ref.rollout.tensor_model_parallel_size=1\ actor_rollout_ref.rollout.name=vllm\ - actor_rollout_ref.rollout.gpu_memory_utilization=0.75\ + actor_rollout_ref.rollout.gpu_memory_utilization=0.4\ actor_rollout_ref.rollout.n=4\ actor_rollout_ref.rollout.max_turns=2\ actor_rollout_ref.ref.log_prob_micro_batch_size_per_gpu=16\ @@ -41,10 +49,10 @@ python3 -m verl.trainer.main_ppo\ actor_rollout_ref.env.config_path=/your/path/to/envs/configs/mcp_tools.pydata\ reward_rollout.if_use_reward_rollout=False\ reward_rollout.rollout.tensor_model_parallel_size=1\ - reward_rollout.rollout.gpu_memory_utilization=0.75\ + reward_rollout.rollout.gpu_memory_utilization=0.4\ reward_rollout.rollout.model_name=$REWARD_MODEL_PATH\ reward_rollout.rollout.free_cache_engine=False\ - reward_rollout.rollout.response_length=2048\ + reward_rollout.rollout.response_length=512\ reward_model.reward_manager=parallel\ algorithm.kl_ctrl.kl_coef=0.001\ trainer.critic_warmup=0\
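
Summary sketch of the simulated-user loop introduced by patches 2-6: when the policy emits an 'answer' action, the environment flips a coin against user_feedback_prob from envs/simulated_user/simulated_user.yaml; on success it samples a persona from envs/simulated_user/user.jsonl, asks an OpenAI-compatible endpoint (OPENAI_API_KEY / OPENAI_BASE_URL, model "deepseek-chat" in the final patch) for persona-conditioned feedback, appends that feedback as a user turn, and keeps the episode open (done=False). The self-contained Python sketch below mirrors that flow outside the RL-Factory classes; the class name SimulatedUser, the helper names, and the control flow around the event loop are illustrative assumptions rather than the repository's actual API, while the persona file format, environment variables, model name, and fallback string follow the patches.

import asyncio
import json
import os
import random
from openai import OpenAI


class SimulatedUser:
    """Minimal stand-in for the feedback path added in envs/tool_manager/qwen3_manager.py."""

    def __init__(self, persona_file: str, feedback_prob: float = 0.1):
        # One persona per line, same schema as envs/simulated_user/user.jsonl.
        with open(persona_file, "r", encoding="utf-8") as f:
            self.personas = [json.loads(line) for line in f if line.strip()]
        self.feedback_prob = feedback_prob
        # Same environment variables the patched code reads.
        self.client = OpenAI(
            api_key=os.environ["OPENAI_API_KEY"],
            base_url=os.getenv("OPENAI_BASE_URL"),
        )

    async def _single_feedback(self, answer: str) -> str:
        persona = random.choice(self.personas)["persona"]
        prompt = (
            f"Your persona is as follows:\n{persona}\n\n"
            "Please, in the role of this persona, give concise, specific, constructive "
            f"feedback on the following AI answer:\nAI's answer: {answer}\nYour feedback:"
        )
        try:
            # The blocking SDK call is pushed to a worker thread, as in the patch.
            completion = await asyncio.to_thread(
                self.client.chat.completions.create,
                model="deepseek-chat",
                messages=[
                    {"role": "system", "content": "You are a user feedback generator."},
                    {"role": "user", "content": prompt},
                ],
                max_tokens=128,
                temperature=0.7,
            )
            return completion.choices[0].message.content.strip()
        except Exception:
            # Fallback used by the patched code when the API call fails.
            return "Please improve your answer."

    def step_feedback(self, answers):
        """For each final answer, decide whether to keep the episode open with user feedback.

        Returns (next_obs, dones): feedback text with done=False where feedback fires,
        an empty observation with done=True otherwise, mirroring Env.step in envs/base.py.
        """
        flags = [random.random() < self.feedback_prob for _ in answers]
        picked = [a for a, f in zip(answers, flags) if f]
        results = []
        if picked:
            # The tool manager is synchronous, so a fresh event loop drives the async calls,
            # as in the final patch's fix for "async loop" bugs.
            loop = asyncio.new_event_loop()
            try:
                results = loop.run_until_complete(
                    asyncio.gather(*[self._single_feedback(a) for a in picked])
                )
            finally:
                loop.close()
        results = iter(results)
        next_obs, dones = [], []
        for flag in flags:
            if flag:
                next_obs.append(next(results))
                dones.append(False)  # episode continues with the simulated user's reply
            else:
                next_obs.append("")
                dones.append(True)
        return next_obs, dones


if __name__ == "__main__":
    user = SimulatedUser("envs/simulated_user/user.jsonl", feedback_prob=0.1)
    obs, dones = user.step_feedback(["Paris is the capital of France."])
    print(obs, dones)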