From 6a63cbdee32dfb14771e7669f5dbef96840104ae Mon Sep 17 00:00:00 2001 From: "qingxu.fu" Date: Fri, 31 Oct 2025 18:28:22 +0800 Subject: [PATCH 01/14] support swanlab --- tests/utils/monitor_swanlab_test.py | 78 +++++++++++++++++ trinity/utils/monitor.py | 124 ++++++++++++++++++++++++++++ 2 files changed, 202 insertions(+) create mode 100644 tests/utils/monitor_swanlab_test.py diff --git a/tests/utils/monitor_swanlab_test.py b/tests/utils/monitor_swanlab_test.py new file mode 100644 index 0000000000..3473c74c99 --- /dev/null +++ b/tests/utils/monitor_swanlab_test.py @@ -0,0 +1,78 @@ +""" +Simple smoke test for SwanlabMonitor. + +Run: + python cradle.py + +What it does: +- Ensures SWANLAB_API_KEY is read from environment (sets a dummy if missing). +- Initializes SwanlabMonitor with minimal args. +- Logs a small metric and closes the run. + +Notes: +- If `swanlab` is not installed, this script will print a helpful message and exit. +- The dummy API key is used only to exercise the login path; real authentication isn't required for this smoke test. +""" + +import os +import sys + + +def main() -> int: + # Defer imports to keep error handling simple + try: + from trinity.utils.monitor import SwanlabMonitor + except Exception as e: + print("[cradle] Failed to import SwanlabMonitor:", e) + return 1 + + # Ensure an env-based key path is exercised (uses dummy if not provided) + env_keys = ["SWANLAB_API_KEY", "SWANLAB_APIKEY", "SWANLAB_KEY", "SWANLAB_TOKEN"] + if not any(os.getenv(k) for k in env_keys): + os.environ["SWANLAB_API_KEY"] = "dummy_key_for_smoke_test" + print("[cradle] Set SWANLAB_API_KEY to a dummy value to test env-based login path.") + + # Try creating the monitor; if swanlab isn't installed, __init__ will assert + try: + mon = SwanlabMonitor( + project="trinity-smoke", + group="cradle", + name="swanlab-env", + role="tester", + config=None, + ) + except AssertionError as e: + print("[cradle] SwanLab not available or not installed:", e) + print("[cradle] Install swanlab to run this smoke test: pip install swanlab") + return 0 + except Exception as e: + print("[cradle] Unexpected error constructing SwanlabMonitor:", e) + return 1 + + # Log a minimal metric to verify basic flow + try: + mon.log({"smoke/metric": 1.0}, step=1) + print("[cradle] Logged a test metric via SwanlabMonitor.") + except Exception as e: + print("[cradle] Error during logging:", e) + try: + mon.close() + except Exception: + pass + return 1 + + # Close cleanly + try: + mon.close() + print("[cradle] SwanlabMonitor closed successfully.") + except Exception as e: + print("[cradle] Error closing monitor:", e) + return 1 + + print("[cradle] Smoke test completed.") + return 0 + + +if __name__ == "__main__": + sys.exit(main()) + diff --git a/trinity/utils/monitor.py b/trinity/utils/monitor.py index f0e847f4d2..4a70838723 100644 --- a/trinity/utils/monitor.py +++ b/trinity/utils/monitor.py @@ -16,6 +16,12 @@ import mlflow except ImportError: mlflow = None + +try: + import swanlab +except ImportError: + swanlab = None + from torch.utils.tensorboard import SummaryWriter from trinity.common.config import Config @@ -224,3 +230,121 @@ def default_args(cls) -> Dict: "username": None, "password": None, } + + +@MONITOR.register_module("swanlab") +class SwanlabMonitor(Monitor): + """Monitor with SwanLab. + This monitor integrates with SwanLab (https://swanlab.cn/) to track experiments. + """ + + def __init__( + self, project: str, group: str, name: str, role: str, config: Config = None + ) -> None: + assert ( + swanlab is not None + ), "swanlab is not installed. Please install it to use SwanlabMonitor." + + monitor_args = (config.monitor.monitor_args or {}) if config and getattr(config, "monitor", None) else {} + + # read api key from environment variable or monitor_args + api_key = ( + monitor_args.get("api_key") or os.environ.get("SWANLAB_API_KEY") + ) + if api_key: + try: + swanlab.login(api_key=api_key, save=True) + except Exception: + # Best-effort login; continue to init which may still work if already logged in + pass + + # Compose tags (ensure list and include role/group markers) + tags = monitor_args.get("tags") or [] + if isinstance(tags, tuple): + tags = list(tags) + if role and role not in tags: + tags.append(role) + if group and group not in tags: + tags.append(group) + + # Determine experiment name + exp_name = monitor_args.get("experiment_name") or f"{name}_{role}" + + # Prepare init kwargs, passing only non-None values to respect library defaults + init_kwargs = { + "project": project, + "experiment_name": exp_name, + "description": monitor_args.get("description"), + "tags": tags or None, + "logdir": monitor_args.get("logdir"), + "mode": monitor_args.get("mode") or "cloud", + "settings": monitor_args.get("settings"), + "id": monitor_args.get("id"), + "resume": monitor_args.get("resume"), + "reinit": monitor_args.get("reinit"), + } + # Strip None values to avoid overriding swanlab defaults + init_kwargs = {k: v for k, v in init_kwargs.items() if v is not None} + + # Convert config to a plain dict for SwanLab config logging + cfg_dict = None + if config is not None: + if hasattr(config, "flatten"): + try: + cfg_dict = config.flatten() + except Exception: + # Fallback: try to cast to dict if possible + try: + cfg_dict = dict(config) + except Exception: + cfg_dict = None + else: + try: + cfg_dict = dict(config) + except Exception: + cfg_dict = None + if cfg_dict is not None: + init_kwargs["config"] = cfg_dict + + self.logger = swanlab.init(**init_kwargs) + self.console_logger = get_logger(__name__, in_ray_actor=True) + + def log_table(self, table_name: str, experiences_table: pd.DataFrame, step: int): + # Convert pandas DataFrame to SwanLab ECharts Table + headers: List[str] = list(experiences_table.columns) + # Ensure rows are native Python types + rows: List[List[object]] = experiences_table.astype(object).values.tolist() + try: + tbl = swanlab.echarts.Table() + tbl.add(headers, rows) + swanlab.log({table_name: tbl}, step=step) + except Exception: + # Fallback: log as CSV string if echarts table is unavailable + csv_str = experiences_table.to_csv(index=False) + swanlab.log({table_name: csv_str}, step=step) + + def log(self, data: dict, step: int, commit: bool = False) -> None: + """Log metrics.""" + # SwanLab doesn't use commit flag; keep signature for compatibility + swanlab.log(data, step=step) + self.console_logger.info(f"Step {step}: {data}") + + def close(self) -> None: + try: + # Prefer run.finish() if available + if hasattr(self, "logger") and hasattr(self.logger, "finish"): + self.logger.finish() + else: + # Fallback to global finish + swanlab.finish() + except Exception: + pass + + @classmethod + def default_args(cls) -> Dict: + """Return default arguments for the monitor.""" + return { + "api_key": None, + "mode": "cloud", + "logdir": None, + } From e2bce75e1e3f92cdf62bac82066a9feae610283c Mon Sep 17 00:00:00 2001 From: "qingxu.fu" Date: Fri, 7 Nov 2025 10:43:07 +0800 Subject: [PATCH 02/14] stage for astune --- .gitignore | 1 + README_zh.md | 3 +- deps_incremental.txt | 2 + examples/agentscope_react/gsm8k.yaml | 4 +- .../agentscope_react/gsm8k_agentopia.yaml | 69 ++++ examples/grpo_gsm8k/gsm8k.yaml | 7 +- launcher_trinity.py | 341 ++++++++++++++++++ note.md | 12 + trinity/buffer/buffer.py | 4 + trinity/buffer/reader/file_reader.py | 47 +++ trinity/cli/launcher.py | 3 + trinity/common/config.py | 8 +- trinity/common/constants.py | 1 + .../agentscope/react/react_workflow.py | 118 +++++- trinity/common/workflows/workflow.py | 3 +- trinity/explorer/scheduler.py | 2 +- trinity/explorer/workflow_runner.py | 5 +- vsdb.py | 142 ++++++++ 18 files changed, 754 insertions(+), 18 deletions(-) create mode 100644 deps_incremental.txt create mode 100644 examples/agentscope_react/gsm8k_agentopia.yaml create mode 100644 launcher_trinity.py create mode 100644 note.md create mode 100644 vsdb.py diff --git a/.gitignore b/.gitignore index 7ab517ff20..5398cb7bd1 100644 --- a/.gitignore +++ b/.gitignore @@ -98,3 +98,4 @@ wandb/ # checkpoints checkpoints/ +launcher_record \ No newline at end of file diff --git a/README_zh.md b/README_zh.md index 8dadb307ef..d4dc456c32 100644 --- a/README_zh.md +++ b/README_zh.md @@ -146,7 +146,7 @@ pip install -e ".[flash_attn]" [`uv`](https://github.com/astral-sh/uv) 是现代的 Python 包管理工具。 ```bash -uv sync --extra dev --extra flash_attn +uv sync --extra dev --extra flash_attn -i https://mirrors.aliyun.com/pypi/simple/ --no-build-isolation ``` ## 通过 PyPI 安装 @@ -163,6 +163,7 @@ pip install flash-attn==2.8.1 ```bash uv pip install trinity-rft==0.3.1 uv pip install flash-attn==2.8.1 +uv pip install --verbose flash-attn -i https://mirrors.aliyun.com/pypi/simple/ --no-deps --no-build-isolation ``` ## 使用 Docker diff --git a/deps_incremental.txt b/deps_incremental.txt new file mode 100644 index 0000000000..373d554376 --- /dev/null +++ b/deps_incremental.txt @@ -0,0 +1,2 @@ +logoru +beast_logger \ No newline at end of file diff --git a/examples/agentscope_react/gsm8k.yaml b/examples/agentscope_react/gsm8k.yaml index a70da2c5e1..201cf9b351 100644 --- a/examples/agentscope_react/gsm8k.yaml +++ b/examples/agentscope_react/gsm8k.yaml @@ -7,7 +7,7 @@ algorithm: optimizer: lr: 1e-6 model: - model_path: ${oc.env:TRINITY_MODEL_PATH,Qwen/Qwen3-8B} + model_path: '/mnt/data/model_cache/modelscope/hub/Qwen/Qwen/Qwen2___5-1___5B-Instruct' max_response_tokens: 16384 max_model_len: 24576 cluster: @@ -21,7 +21,7 @@ buffer: taskset: name: gsm8k storage_type: file - path: 'openai/gsm8k' + path: '/root/.cache/huggingface/hub/datasets--gsm8k/snapshots/e53f048856ff4f594e959d75785d2c2d37b678ee' subset_name: 'main' split: 'train' format: diff --git a/examples/agentscope_react/gsm8k_agentopia.yaml b/examples/agentscope_react/gsm8k_agentopia.yaml new file mode 100644 index 0000000000..5b30c6c710 --- /dev/null +++ b/examples/agentscope_react/gsm8k_agentopia.yaml @@ -0,0 +1,69 @@ +project: AgentScope-ReAct +name: GSM8K-Qwen3-8B +checkpoint_root_dir: ${oc.env:TRINITY_CHECKPOINT_ROOT_DIR,./checkpoints} +algorithm: + algorithm_type: multi_step_grpo + repeat_times: 8 + optimizer: + lr: 1e-6 +model: + model_path: '/mnt/data/model_cache/modelscope/hub/Qwen/Qwen/Qwen2___5-1___5B-Instruct' + max_response_tokens: 16384 + max_model_len: 24576 +cluster: + node_num: 1 + gpu_per_node: 8 +buffer: + total_epochs: 1 + batch_size: 32 + train_batch_size: 256 + explorer_input: + taskset: + name: gsm8k + storage_type: env_service + path: 'http://localhost:8080' + subset_name: 'appworld' + split: 'train' + format: + prompt_key: 'question' + response_key: 'answer' + rollout_args: + temperature: 1.0 + default_workflow_type: 'agentopia_workflow' + eval_tasksets: [] + trainer_input: + experience_buffer: + name: agentscope_gsm8k_buffer + storage_type: queue +explorer: + eval_interval: 50 + runner_per_model: 16 + max_timeout: 1800 + rollout_model: + engine_num: 4 + tensor_parallel_size: 1 + enable_prefix_caching: false + enforce_eager: true + enable_openai_api: true + enable_history: true + enable_auto_tool_choice: true + tool_call_parser: hermes + # reasoning_parser: deepseek_r1 + enable_thinking: false + dtype: bfloat16 + seed: 42 +synchronizer: + sync_style: dynamic_by_explorer + sync_method: 'nccl' + sync_interval: 2 + sync_timeout: 1200 +trainer: + save_interval: 100 + grad_clip: 1.0 + use_dynamic_bsz: true + max_token_len_per_gpu: 24576 + ulysses_sequence_parallel_size: 2 +monitor: + monitor_type: tensorboard +agentopia_configuration: + config_path: '/mnt/data/qingxu.fu/ba-verl-advance/launcher/appworld_linear_base/git-appworld-qwen2-agentscope-bz32-tp4-linear.yaml' \ No newline at end of file diff --git a/examples/grpo_gsm8k/gsm8k.yaml b/examples/grpo_gsm8k/gsm8k.yaml index bc3e533d5f..233c8c17be 100644 --- a/examples/grpo_gsm8k/gsm8k.yaml +++ b/examples/grpo_gsm8k/gsm8k.yaml @@ -7,7 +7,7 @@ algorithm: optimizer: lr: 1e-5 model: - model_path: ${oc.env:TRINITY_MODEL_PATH,Qwen/Qwen2.5-1.5B-Instruct} + model_path: '/mnt/data/model_cache/modelscope/hub/Qwen/Qwen/Qwen2___5-1___5B-Instruct' max_response_tokens: 1024 max_model_len: 1280 cluster: @@ -20,7 +20,7 @@ buffer: taskset: name: gsm8k storage_type: file - path: 'openai/gsm8k' + path: '/root/.cache/huggingface/hub/datasets--gsm8k/snapshots/e53f048856ff4f594e959d75785d2c2d37b678ee' subset_name: 'main' split: 'train' format: @@ -31,7 +31,7 @@ buffer: eval_tasksets: - name: gsm8k-eval storage_type: file - path: 'openai/gsm8k' + path: '/root/.cache/huggingface/hub/datasets--gsm8k/snapshots/e53f048856ff4f594e959d75785d2c2d37b678ee' subset_name: 'main' split: 'test' format: @@ -50,6 +50,7 @@ explorer: engine_num: 2 tensor_parallel_size: 1 enable_prefix_caching: false + gpu_memory_utilization: 0.7 enforce_eager: true dtype: bfloat16 seed: 42 diff --git a/launcher_trinity.py b/launcher_trinity.py new file mode 100644 index 0000000000..f42f8ce38d --- /dev/null +++ b/launcher_trinity.py @@ -0,0 +1,341 @@ +# from beast_logger import print_dict +import subprocess +import argparse +import shutil +import time +import sys +import os +from dotenv import load_dotenv + +load_dotenv() + +BACK_TARGETS = os.environ.get( + 'BACK_TARGETS', + 'trinity/explorer' +).split(',') + + +def parse_args(): + parser = argparse.ArgumentParser(description='BA Launcher') + parser.add_argument( + '--target', + type=str, + default='trinity', + required=False, + help='Target script to run (default: trinity)' + ) + parser.add_argument('--conf', + type=str, + default="", + required=False, + help='Path to configuration file' + ) + parser.add_argument('--db', + type=str, + default="", + required=False, + help='Path to configuration file' + ) + parser.add_argument('--with-ray', + action='store_true', # Changed from store_true to action='store_true' + default=False, + help='Launch ray' + ) + parser.add_argument('--with-appworld', + action='store_true', # Changed from store_true to action='store_true' + default=False, + help='Launch appworld' + ) + parser.add_argument('--with-appworld2', + action='store_true', # Changed from store_true to action='store_true' + default=False, + help='Launch appworld2' + ) + parser.add_argument('--with-webshop', + action='store_true', # Changed from store_true to action='store_true' + default=False, + help='Launch webshop' + ) + parser.add_argument('--with-logview', + action='store_true', # Changed from store_true to action='store_true' + default=False, + help='Launch logview' + ) + parser.add_argument('--with-crafters', + action='store_true', # Changed from store_true to action='store_true' + default=False, + help='Launch Crafters Env Simulation' + ) + parser.add_argument('--reboot', + action='store_true', # Changed from store_true to action='store_true' + default=False, + help='reboot flag' + ) + + return parser.parse_args() + +def check_debugpy_version(): + """ + 检查 debugpy 模块版本是否 >= 1.8.0 + 如果未安装或版本过低,抛出 RuntimeError + """ + try: + import debugpy + except ImportError: + raise RuntimeError( + "Module 'debugpy>=1.8.0' cannot be loaded. " + "Ray Debugpy Debugger will not work without 'debugpy>=1.8.0' installed. " + "Install this module using 'pip install debugpy>=1.8.0'" + ) + + # 检查版本 + version = getattr(debugpy, '__version__', '0.0.0') + from packaging import version as packaging_version + + if packaging_version.parse(version) < packaging_version.parse('1.8.0'): + raise RuntimeError( + f"debugpy version {version} is too old. " + "Ray Debugpy Debugger requires 'debugpy>=1.8.0'. " + "Upgrade using 'pip install debugpy>=1.8.0'" + ) + + print(f"✓ debugpy version {version} meets requirement (>=1.8.0)") + +check_debugpy_version() +def main(): + args = parse_args() + + if args.conf: + yaml_path = args.conf + assert yaml_path.endswith('.yaml'), "Configuration file must be a YAML file" + exp_base = os.path.dirname(args.conf) + + if os.path.exists(exp_base): + + ## 0. read yaml (get trainer.experiment_name) + import yaml + with open(yaml_path, 'r') as file: + config = yaml.safe_load(file) + exp_name = config.get('trainer').get('experiment_name') + if exp_name is None or exp_name == 'read_yaml_name': + if exp_name is not None: exp_name = exp_name.replace('|', '-') + exp_name = os.path.basename(yaml_path).replace('.yaml', '') + else: + exp_name = exp_name.replace('|', '-') + + print('----------------------------------------') + backup_dir = os.path.join('launcher_record', exp_name, 'backup') + yaml_backup_dst = os.path.join('launcher_record', exp_name, 'yaml_backup.yaml') + exe_yaml_path = yaml_backup_dst + exe_exp_base = os.path.dirname(yaml_backup_dst) + print('Experiment Name:', exp_name) + print('Experiment Backup Dir:', backup_dir) + print('Experiment Yaml Dir:', yaml_backup_dst) + print('----------------------------------------') + time.sleep(2) + + ## 1. check exp_base/backup exist + if not os.path.exists(backup_dir): + os.makedirs(backup_dir) + else: + total_seconds = 10 + for i in range(total_seconds): + print(f"\rWarning: backup directory already exists, we will automatically ignore this after {total_seconds - i} seconds...", end="", flush=True) + time.sleep(1) + + ## 2. copy files to backup + for backup_target in BACK_TARGETS: + print(f"Copying {backup_target} to {os.path.join(backup_dir, os.path.basename(backup_target))}") + shutil.copytree(backup_target, os.path.join(backup_dir, os.path.basename(backup_target)), dirs_exist_ok=True) + + ## 3. copy yaml to backup + yaml_backup_src = yaml_path + shutil.copyfile(yaml_backup_src, yaml_backup_dst) + + ## 4. edit new yaml + yaml_path = yaml_backup_dst + # now, replace the trainer.experiment_name + with open(yaml_path, 'r') as file: + config = yaml.safe_load(file) + # config['trainer']['experiment_name'] = exp_name + with open(yaml_path, 'w') as file: + yaml.dump(config, file) + + else: + raise FileNotFoundError(f"Configuration file not found: {exp_base}") + + env = os.environ.copy() + if args.db: + env["RAY_DEBUG_POST_MORTEM"] = "1" + env["DEBUG_TAGS"] = args.db + env["RAY_record_task_actor_creation_sites"] = "true" + print("Debug mode is ON") + else: + print("Debug mode is OFF") + else: + assert args.with_appworld or args.with_webshop or args.with_logview or args.with_crafters, "You must at least do something." + if args.with_ray: + from agentopia.utils.smart_daemon import LaunchCommandWhenAbsent + ray_env = {} + if args.db: + ray_env["RAY_DEBUG_POST_MORTEM"] = "1" + ray_env["DEBUG_TAGS"] = args.db + ray_env["RAY_record_task_actor_creation_sites"] = "true" + companion = LaunchCommandWhenAbsent( + full_argument_list=[ + f"source ./.venv/bin/activate && ray start --head && sleep infinity" + ], + dir='./', + tag="ray_service", + use_pty=True + ) + companion.launch( + launch_wait_time=1800, + success_std_string="Ray runtime started", + env_dict=ray_env, + ) + if args.with_appworld: + from agentopia.utils.smart_daemon import LaunchCommandWhenAbsent + companion = LaunchCommandWhenAbsent( + full_argument_list=[ + f"source /mnt/data/taoshuchang.tsc/anaconda3/etc/profile.d/conda.sh && conda activate appworld && bash -i EnvService/env_sandbox/appworld.sh" + ], + dir='/mnt/data/taoshuchang.tsc/beyondagent', + tag="appworld_env_service", + use_pty=True + ) + companion.launch( + launch_wait_time=1800, + success_std_string="Starting server on", + ) + if args.with_crafters: + from agentopia.utils.smart_daemon import LaunchCommandWhenAbsent + crafters_path = os.environ.get('CRAFTERS_PATH') + crafters_script = os.environ.get('CRAFTERS_SCRIPT') + crafters_conda = os.environ.get('CRAFTERS_CONDA') + if crafters_path and os.path.exists(crafters_path): + companion = LaunchCommandWhenAbsent( + full_argument_list=[ + f"source {crafters_conda} && conda activate balrog && bash -i {crafters_script}" + ], + dir=crafters_path, + tag="crafters_env_service", + use_pty=True + ) + companion.launch( + launch_wait_time=1800, + success_std_string="Starting server on", + ) + else: + raise RuntimeError("EnvService not found") + if args.with_webshop: + from agentopia.utils.smart_daemon import LaunchCommandWhenAbsent + webshop_path = os.environ.get('WEBSHOP_PATH') + webshop_python = os.environ.get('WEBSHOP_PYTHON') + webshop_port = os.environ.get('WEBSHOP_PORT', '1907') + webshop_env_port = os.environ.get('WEBSHOP_ENV_PORT', '8080') + java_home = os.environ.get('JAVA_HOME') + java_ld_library_path = os.environ.get('JAVA_LD_LIBRARY_PATH') + search_engine_path = os.environ.get('SEARCH_ENGINE_PATH') + webshop_root = os.environ.get('WEBSHOP_ROOT') + items_attr_path = os.environ.get('ITEMS_ATTR_PATH') + items_file_path = os.environ.get('ITEMS_FILE_PATH') + pythonpath = os.environ.get('PYTHONPATH') + if webshop_path and os.path.exists(webshop_path): + companion = LaunchCommandWhenAbsent( + full_argument_list=[ + webshop_python, + '-m', + 'env_sandbox.environments.webshop.SimServer_launch', + "--portal", + "127.0.0.1", + "--port", + webshop_port, + ], + dir=webshop_path, + tag="webshop_sim_server" + ) + + companion.launch(launch_wait_time=1800, success_std_string="Uvicorn running on", env_dict={ + "JAVA_HOME": java_home, + "JAVA_LD_LIBRARY_PATH": java_ld_library_path, + "search_engine_path": search_engine_path, + "webshop_root": webshop_root, + "ITEMS_ATTR_PATH": items_attr_path, + "ITEMS_FILE_PATH": items_file_path, + "PYTHONPATH": pythonpath + }, force_restart=args.reboot) + + companion = LaunchCommandWhenAbsent( + full_argument_list=[ + webshop_python, + '-m', + 'env_sandbox.env_service', + "--env", + "webshop", + "--portal", + "127.0.0.1", + "--port", + webshop_env_port, + ], + dir=webshop_path, + tag="webshop_env_service" + ) + companion.launch(launch_wait_time=1800,success_std_string="Uvicorn running on", env_dict={ + "JAVA_HOME": java_home, + "JAVA_LD_LIBRARY_PATH": java_ld_library_path + }, force_restart=args.reboot) + else: + raise RuntimeError("EnvService not found") + + + + + if args.with_logview: + from agentopia.utils.smart_daemon import LaunchCommandWhenAbsent + logview_nvm_dir = os.environ.get('LOGVIEW_NVM_DIR') + logview_nvm_bin = os.environ.get('LOGVIEW_NVM_BIN') + logview_path = os.environ.get('LOGVIEW_PATH') + companion = LaunchCommandWhenAbsent( + full_argument_list=[ + sys.executable, + '-m', + 'web_display.go', + ], + dir='./', + tag="logview" + ) + companion.launch(launch_wait_time=1800,success_std_string="Server running on", env_dict={ + 'NVM_DIR': logview_nvm_dir, + 'NVM_BIN': logview_nvm_bin, + 'PATH': logview_path + os.environ.get('PATH', '') + }) + + if args.conf: + # let's begin the training process + cmd = [ + sys.executable, + '-m', + 'trinity.cli.launcher', + # python -m trinity.cli.launcher run --config xxx.yaml + 'run', + '--config', yaml_backup_dst + ] + + if args.with_logview: + env.update({ + 'BEST_LOGGER_WEB_SERVICE_URL': os.environ.get('BEST_LOGGER_WEB_SERVICE_URL', 'http://127.0.0.1:8181/') + }) + + try: + print(f"Running command: {' '.join(cmd)}") + subprocess.run(cmd, check=True, cwd=os.path.abspath('./'), env=env) + except subprocess.CalledProcessError as e: + print(f"Error running subprocess: {e}") + sys.exit(1) + except Exception as e: + print(f"Unexpected error: {e}") + sys.exit(1) + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/note.md b/note.md new file mode 100644 index 0000000000..f6158ccadc --- /dev/null +++ b/note.md @@ -0,0 +1,12 @@ + + +/mnt/data_cpfs/qingxu.fu/trinity/trinity/explorer/workflow_runner.py +run_task() + +--> +/mnt/data_cpfs/qingxu.fu/trinity/examples/agentscope_react/gsm8k.yaml +buffer.explorer_input.default_workflow_type +--> + +/mnt/data_cpfs/qingxu.fu/trinity/trinity/common/workflows/agentscope/react/react_workflow.py +run_async() diff --git a/trinity/buffer/buffer.py b/trinity/buffer/buffer.py index a7d52e60a7..d2b0bfce1e 100644 --- a/trinity/buffer/buffer.py +++ b/trinity/buffer/buffer.py @@ -16,6 +16,10 @@ def get_buffer_reader(storage_config: StorageConfig, buffer_config: BufferConfig from trinity.buffer.reader.queue_reader import QueueReader return QueueReader(storage_config, buffer_config) + elif storage_config.storage_type == StorageType.ENV_SERVICE: + from trinity.buffer.reader.file_reader import EnvServiceTaskReader + + return EnvServiceTaskReader(storage_config, buffer_config) elif storage_config.storage_type == StorageType.FILE: from trinity.buffer.reader.file_reader import ( ExperienceFileReader, diff --git a/trinity/buffer/reader/file_reader.py b/trinity/buffer/reader/file_reader.py index b79e87285d..1b7217ce0f 100644 --- a/trinity/buffer/reader/file_reader.py +++ b/trinity/buffer/reader/file_reader.py @@ -155,3 +155,50 @@ def read(self, batch_size: Optional[int] = None) -> List: task = self.formatter.format(sample) tasks.append(task) return tasks + + +class EnvServiceTaskReader(BaseFileReader): + def __init__(self, meta: StorageConfig, config: BufferConfig): + from agentopia.client.env_client_ng import EnvClient + self.meta = meta + self.env_url = self.meta.path + self.env_type = self.meta.subset_name + self.split = self.meta.split + + self.env = EnvClient(base_url=meta.path) + self.env_params = {} + dataframes = [] + + env_service_client = EnvClient(base_url=self.env_url) + task_id_array = env_service_client.get_env_profile(self.env_type, split=self.split) + if len(task_id_array) == 0: + raise ValueError(f"No task_id found for self.env_type: {self.env_type}, split: {self.split}, Please check connection to {self.env_url}") + data = { + 'task_selector': [task_id for task_id in task_id_array], + # 'reward_model': [{} for task_id in task_id_array], + # 'extras': [{'task_id': task_id} for task_id in task_id_array], + } + dataframe = Dataset.from_dict(data) + dataframes.append(dataframe) + + self.read_batch_size = config.batch_size + self.dataset = _HFBatchReader( + datasets.concatenate_datasets(dataframes), + name=meta.name, + default_batch_size=self.read_batch_size, + total_epochs=self.meta.total_epochs if not self.meta.is_eval else 1, + offset=self.meta.index, + drop_last=not self.meta.is_eval, + total_steps=meta.total_steps, + enable_progress_bar=meta.enable_progress_bar, + ) + self.formatter = FORMATTER.get("task")(meta) + + def read(self, batch_size: Optional[int] = None) -> List: + batch_size = batch_size or self.read_batch_size + tasks = [] + samples = self.dataset.read_batch(batch_size) + for sample in samples: + task = self.formatter.format(sample) + tasks.append(task) + return tasks diff --git a/trinity/cli/launcher.py b/trinity/cli/launcher.py index 468ab2df53..e18ae8d000 100644 --- a/trinity/cli/launcher.py +++ b/trinity/cli/launcher.py @@ -163,6 +163,9 @@ def run_stage(config: Config) -> None: def run(config_path: str, dlc: bool = False, plugin_dir: str = None): + if os.path.exists(".env"): + from dotenv import load_dotenv + load_dotenv(".env") if plugin_dir: os.environ[PLUGIN_DIRS_ENV_VAR] = plugin_dir load_plugins() diff --git a/trinity/common/config.py b/trinity/common/config.py index ba66634539..9f618abd9a 100644 --- a/trinity/common/config.py +++ b/trinity/common/config.py @@ -146,7 +146,7 @@ class StorageConfig: default_workflow_type: Optional[str] = None default_reward_fn_type: Optional[str] = None rollout_args: GenerationConfig = field(default_factory=GenerationConfig) - workflow_args: dict = field(default_factory=dict) + workflow_args: dict = field(default_factory=dict) # qingxu: TODO reward_fn_args: dict = field(default_factory=dict) # enable progress bar (tqdm) for _HFBatchReader @@ -568,6 +568,11 @@ class StageConfig: explorer: Optional[ExplorerConfig] = None trainer: Optional[TrainerConfig] = None +@dataclass +class AgentopiaConfiguration: + """Configs for a stage.""" + config_path: str = "" + @dataclass class Config: @@ -597,6 +602,7 @@ class Config: synchronizer: SynchronizerConfig = field(default_factory=SynchronizerConfig) service: ServiceConfig = field(default_factory=ServiceConfig) log: LogConfig = field(default_factory=LogConfig) + agentopia_configuration: AgentopiaConfiguration = field(default_factory=AgentopiaConfiguration) # configurations for different training stages stages: List[StageConfig] = field(default_factory=list) diff --git a/trinity/common/constants.py b/trinity/common/constants.py index ad092603d2..92270df22c 100644 --- a/trinity/common/constants.py +++ b/trinity/common/constants.py @@ -60,6 +60,7 @@ class StorageType(CaseInsensitiveEnum): SQL = "sql" QUEUE = "queue" FILE = "file" + ENV_SERVICE = "env_service" class SyncMethodEnumMeta(CaseInsensitiveEnumMeta): diff --git a/trinity/common/workflows/agentscope/react/react_workflow.py b/trinity/common/workflows/agentscope/react/react_workflow.py index a6dbca28e3..22997ed0ec 100644 --- a/trinity/common/workflows/agentscope/react/react_workflow.py +++ b/trinity/common/workflows/agentscope/react/react_workflow.py @@ -2,25 +2,22 @@ This workflow is a demonstration of how to integrate the AgentScope framework within the Trinity-RFT workflow system with minimal modifications. """ - -from typing import Dict, List, Optional, Union - +import uuid import openai - +from typing import Dict, List, Optional, Union from trinity.common.experience import Experience from trinity.common.models.model import ModelWrapper from trinity.common.workflows.workflow import WORKFLOWS, Task, Workflow - +from transformers import AutoTokenizer from .templates import TEMPLATE_MAP - @WORKFLOWS.register_module("as_react_workflow") class AgentScopeReActWorkflow(Workflow): is_async: bool = True def __init__( self, - *, + config, task: Task, model: ModelWrapper, auxiliary_models: Optional[List[openai.OpenAI]] = None, @@ -97,3 +94,110 @@ def construct_experiences(self, reward: Union[float, Dict[str, float]]) -> List[ if isinstance(reward, dict): exp.metrics.update(reward) return exps + + +@WORKFLOWS.register_module("agentopia_workflow") +class AgentopiatWorkflowWrap(Workflow): + is_async: bool = True + def __init__( + self, + config, + model: ModelWrapper, + task: Task, + auxiliary_models: Optional[List[openai.OpenAI]] = None, + ): + super().__init__( + task=task, + model=model, + auxiliary_models=auxiliary_models, + ) + self.config = config + self.task = task + + # 模拟openai的异步客户端 + self.model_client = model.get_openai_async_client() + # task_type 用于获取奖励函数 + # extract the query and the answer from the task + self.query = task.raw_task.get(task.format_args.prompt_key) # type: ignore [index] + self.answer = task.raw_task.get(task.format_args.response_key) # type: ignore [index] + self.task.workflow_args = { + "env_type": "appworld", + "task_id": self.task.task_id, + "instance_id": uuid.uuid4().hex, + } + + async def run_async(self): + cmt_tokenized = {} + from agentopia.trinity_compat_env import TrinityCompatWorkflow + from agentopia.schema.trajectory import Sample + from omegaconf import OmegaConf + + # config_path: '/mnt/data/qingxu.fu/ba-verl-advance/launcher/appworld_linear_base/git-appworld-qwen2-agentscope-bz32-tp4-linear.yaml' + config = OmegaConf.load(self.config.agentopia_configuration.config_path) + config.trainer.experiment_name = "dummy" + cmt = TrinityCompatWorkflow( + task=self.task, + llm_handle=self.model_client, + tokenizer=AutoTokenizer.from_pretrained(self.model_client.model_path), + config=config, + ).run_in_new_thread() + + from vsdb import bp + bp("DEV3") + + sample_final = [] + try: + sample_arr = cmt.group_tokenize() + except Exception as e: + cmt.generate_log(global_step=-1) + raise e + cmt.generate_log(global_step=-1) + sample_final += sample_arr + + + exps = [] + for index, sample in enumerate(sample_final): + sample: Sample + input_ids = sample.input_ids + prompt_ids = sample.prompt_ids + response_ids = sample.response_ids + attention_mask = sample.attention_mask + prompt_attention_mask = sample.prompt_attention_mask + response_attention_mask = sample.response_attention_mask + loss_mask = sample.loss_mask + prompt_loss_mask = sample.prompt_loss_mask + response_loss_mask = sample.response_loss_mask + position_ids = sample.position_ids + prompt_position_ids = sample.prompt_position_ids + response_position_ids = sample.response_position_ids + # cmt_tokenized["step_reward"] = self.reward_structure.step_reward[index] + + logprobs = sample.response_logprobs + reward = cmt.reward_structure.raw_reward + + exp = Experience( + # eid=uuid.uuid4().hex, + tokens = input_ids, # [seq_length] prompt + response + prompt_length = len(prompt_ids), # Length of the prompt in tokens, used for generating attention masks + logprobs = logprobs, # [resp_length] + reward = reward, # + # advantages=None, + # returns=None, + info = {}, + metrics = {}, # for wandb logging (must be string:float) + response_text = "", # optional + prompt_text = "", # optional + #### for multi-turn experiences + action_mask = response_loss_mask, # 1 是训练 + messages=sample.messages, # + # tools, + #### for dpo experiences + # chosen, + # rejected, + # chosen_messages, + # rejected_messages, + #### for multi-modal data + # multi_modal_inputs + ) + exps += [exp] + return exps diff --git a/trinity/common/workflows/workflow.py b/trinity/common/workflows/workflow.py index 26a555d7cc..99aa490e71 100644 --- a/trinity/common/workflows/workflow.py +++ b/trinity/common/workflows/workflow.py @@ -38,7 +38,7 @@ class Task(dict): task_id: Union[int, str] = "" def to_workflow( - self, model: Any, auxiliary_models: Optional[List[openai.OpenAI]] = None + self, config, model: Any, auxiliary_models: Optional[List[openai.OpenAI]] = None ) -> Workflow: """Convert the task to a workflow. @@ -53,6 +53,7 @@ def to_workflow( Workflow: The generated workflow object. """ return self.workflow( + config=config, model=model, task=self, auxiliary_models=auxiliary_models, diff --git a/trinity/explorer/scheduler.py b/trinity/explorer/scheduler.py index fecde5e61b..c217331b7a 100644 --- a/trinity/explorer/scheduler.py +++ b/trinity/explorer/scheduler.py @@ -81,7 +81,7 @@ async def run_with_retry(self, task: TaskWrapper) -> Tuple[Status, List, int]: for attempt in range(self.retry_times + 1): try: status, exps = await asyncio.wait_for( - self.runner.run_task.remote(task.task, task.repeat_times, task.run_id_base), + self.runner.run_task.remote(task.task, task.repeat_times, task.run_id_base), # Here we call the runner's run_task() self.timeout, ) if status.ok: diff --git a/trinity/explorer/workflow_runner.py b/trinity/explorer/workflow_runner.py index 187d0d5adf..6c93b9da5a 100644 --- a/trinity/explorer/workflow_runner.py +++ b/trinity/explorer/workflow_runner.py @@ -79,8 +79,9 @@ def _create_workflow_instance(self, task: Task) -> None: or not self.workflow_instance.resettable ): self.workflow_instance = task.to_workflow( - self.model_wrapper, - ( + config=self.config, + model=self.model_wrapper, + auxiliary_models=( self.auxiliary_model_async_clients if task.workflow.is_async else self.auxiliary_model_clients diff --git a/vsdb.py b/vsdb.py new file mode 100644 index 0000000000..662b77e461 --- /dev/null +++ b/vsdb.py @@ -0,0 +1,142 @@ +import os + + + +def vscode_conditional_breakpoint(tag=None, rank=-1, once=True): + """ + Set a conditional breakpoint in VSCode based on given tag and rank conditions. + + This function is used to trigger breakpoints during debugging when specific conditions are met. + The breakpoint will be triggered if: + 1. The `rank` parameter is 0, or the rank environment variable is 0. + 2. The environment variable `RAY_DEBUG_POST_MORTEM` is set. + 3. If a `tag` parameter is provided, it exists in the environment variable `DEBUG_TAGS`. + + Parameters: + - tag (str, optional): Tag to match against the environment variable `DEBUG_TAGS`. + If None, the breakpoint triggers unconditionally. + - rank (int, optional): GPU index, world rank. + - once (bool, optional): If True, the breakpoint will only trigger once. + + Environment Variables: + - RAY_DEBUG_POST_MORTEM: If not set, the function returns immediately without triggering breakpoint. + - DEBUG_TAGS: Contains multiple tags separated by `|`. If `tag` parameter exists in this variable, + the breakpoint triggers. + """ + + env_tag = f'HIT_BREAKPOINT_REC_{tag}' + # if rank < 0: rank = os.getenv("RANK", 0) + # if rank != 0: return + if not os.getenv('RAY_DEBUG_POST_MORTEM'): return + if tag is None: + if once: + if os.getenv(env_tag, "") != "1": + os.environ[env_tag] = "1" + breakpoint() + return + else: + breakpoint() + return + else: + debug_tags = os.getenv('DEBUG_TAGS', '').split('|') + if tag in debug_tags: + if once: + if os.getenv(env_tag, "") != "1": + os.environ[env_tag] = "1" + breakpoint() + return + else: + breakpoint() + return + +import pickle + +def objdump(obj, file="objdump.tmp"): + with open(file, "wb+") as f: + pickle.dump(obj, f) + return + +def objload(file="objdump.tmp"): + import os + if not os.path.exists(file): + return + with open(file, "rb") as f: + return pickle.load(f) + +bp = vscode_conditional_breakpoint + + + + +""" +Document: + +Ray Distributed Debugger VSCode Extension + +1. Starting with Ray 2.39, Anyscale has introduced the `Ray Distributed Debugger `_ VSCode extension. Follow the extension’s installation instructions, then add your cluster using the dashboard URL you obtained earlier. + + .. image:: https://github.com/eric-haibin-lin/verl-community/blob/main/docs/ray/debugger.png?raw=true + :alt: Ray Distributed Debugger VSCode extension screenshot + +2. Prerequisites. + + Ensure the following are installed (see the extension README for more detail): + + - Visual Studio Code + - `ray[default]` >= 2.9.1 + - `debugpy` >= 1.8.0 + + .. image:: https://github.com/aoshen524/verl/blob/main/docs/start/c7098b755ff689859837773a916c857.png?raw=true + :alt: VSCode with Ray prerequisites + +3. Environment Variables. + + To enable post‑mortem debugging, set: + + .. code-block:: bash + + export RAY_DEBUG_POST_MORTEM=1 + + .. admonition:: Note + :class: important + + Be sure to remove any legacy flags before starting Ray: + + - `RAY_DEBUG=legacy` + - `--ray-debugger-external` + +4. Configuring BreakpointsSet up breakpoint() in your code, and submit job to cluster. Then the extension will show the breakpoint information. + + + 1. Insert `breakpoint()` calls into your remote functions. + 2. Submit your job to the cluster. + + The extension will detect active breakpoints and display them in VSCode. + + .. image:: https://github.com/aoshen524/verl/blob/main/docs/start/4ddad74395c79a1402331c0ce73316f.png?raw=true + :alt: Detected breakpoint in VSCode + + **Note:** Breakpoints are only supported inside functions decorated with `@ray.remote`. + +5. Launching the Debugger. + + Run your job directly from the command line (do not use a `launch.json`): + + .. code-block:: bash + + python job.py + +6. Attaching to a Breakpoint. + + Once the process hits the first `breakpoint()`, click the Ray Distributed Debugger icon in the VSCode sidebar to attach the debugger. + + .. image:: https://github.com/aoshen524/verl/blob/main/docs/start/4ddad74395c79a1402331c0ce73316f.png?raw=true + :alt: Attaching VSCode debugger to Ray process + +7. Debugging With Multiple breakpoint(). + + For each subsequent task, first disconnect the current debugger session, then click the extension icon again to attach to the next breakpoint. + + .. image:: https://github.com/aoshen524/verl/blob/main/docs/start/6e83c910a62c82fecb89c6619e001cd.png?raw=true + :alt: Disconnecting and reconnecting the debugger +""" \ No newline at end of file From fc419b7c4b90b89f38e34277bf1c1a4165259584 Mon Sep 17 00:00:00 2001 From: "qingxu.fu" Date: Fri, 7 Nov 2025 10:43:17 +0800 Subject: [PATCH 03/14] compat swanlab --- trinity/utils/monitor.py | 149 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 149 insertions(+) diff --git a/trinity/utils/monitor.py b/trinity/utils/monitor.py index 880d035f83..0b422582d3 100644 --- a/trinity/utils/monitor.py +++ b/trinity/utils/monitor.py @@ -16,6 +16,10 @@ import mlflow except ImportError: mlflow = None +try: + import swanlab +except ImportError: + swanlab = None from torch.utils.tensorboard import SummaryWriter from trinity.common.config import Config @@ -219,3 +223,148 @@ def default_args(cls) -> Dict: "username": None, "password": None, } + + +@MONITOR.register_module("swanlab") +class SwanlabMonitor(Monitor): + """Monitor with SwanLab. + + This monitor integrates with SwanLab (https://swanlab.cn/) to track experiments. + + Supported monitor_args in config.monitor.monitor_args: + - api_key (Optional[str]): API key for swanlab.login(). If omitted, will read from env + (SWANLAB_API_KEY, SWANLAB_APIKEY, SWANLAB_KEY, SWANLAB_TOKEN) or assume prior CLI login. + - workspace (Optional[str]): Organization/username workspace. + - mode (Optional[str]): "cloud" | "local" | "offline" | "disabled". + - logdir (Optional[str]): Local log directory when in local/offline modes. + - experiment_name (Optional[str]): Explicit experiment name. Defaults to "{name}_{role}". + - description (Optional[str]): Experiment description. + - tags (Optional[List[str]]): Tags to attach. Role and group are appended automatically. + - id (Optional[str]): Resume target run id (21 chars) when using resume modes. + - resume (Optional[Literal['must','allow','never']|bool]): Resume policy. + - reinit (Optional[bool]): Whether to re-init on repeated init() calls. + """ + + def __init__( + self, project: str, group: str, name: str, role: str, config: Config = None + ) -> None: + assert ( + swanlab is not None + ), "swanlab is not installed. Please install it to use SwanlabMonitor." + + monitor_args = (config.monitor.monitor_args or {}) if config and getattr(config, "monitor", None) else {} + + # Optional API login via code if provided; otherwise try environment, then rely on prior `swanlab login`. + api_key = ( + monitor_args.get("api_key") + or os.environ.get("SWANLAB_API_KEY") + ) + if api_key: + try: + swanlab.login(api_key=api_key, save=True) + except Exception: + # Best-effort login; continue to init which may still work if already logged in + pass + + # Compose tags (ensure list and include role/group markers) + tags = monitor_args.get("tags") or [] + if isinstance(tags, tuple): + tags = list(tags) + if role and role not in tags: + tags.append(role) + if group and group not in tags: + tags.append(group) + + # Determine experiment name + exp_name = monitor_args.get("experiment_name") or f"{name}_{role}" + self.exp_name = exp_name + + # Prepare init kwargs, passing only non-None values to respect library defaults + init_kwargs = { + "project": project, + "workspace": monitor_args.get("workspace"), + "experiment_name": exp_name, + "description": monitor_args.get("description"), + "tags": tags or None, + "logdir": monitor_args.get("logdir"), + "mode": monitor_args.get("mode") or "cloud", + "settings": monitor_args.get("settings"), + "id": monitor_args.get("id"), + "resume": monitor_args.get("resume"), + "reinit": monitor_args.get("reinit"), + } + # Strip None values to avoid overriding swanlab defaults + init_kwargs = {k: v for k, v in init_kwargs.items() if v is not None} + + # Convert config to a plain dict for SwanLab config logging + cfg_dict = None + if config is not None: + if hasattr(config, "flatten"): + try: + cfg_dict = config.flatten() + except Exception: + # Fallback: try to cast to dict if possible + try: + cfg_dict = dict(config) + except Exception: + cfg_dict = None + else: + try: + cfg_dict = dict(config) + except Exception: + cfg_dict = None + if cfg_dict is not None: + init_kwargs["config"] = cfg_dict + + self.logger = swanlab.init(**init_kwargs) + self.console_logger = get_logger(__name__, in_ray_actor=True) + + def log_table(self, table_name: str, experiences_table: pd.DataFrame, step: int): + # Convert pandas DataFrame to SwanLab ECharts Table + headers: List[str] = list(experiences_table.columns) + # Ensure rows are native Python types + rows: List[List[object]] = experiences_table.astype(object).values.tolist() + try: + tbl = swanlab.echarts.Table() + tbl.add(headers, rows) + swanlab.log({table_name: tbl}, step=step) + except Exception: + # Fallback: log as CSV string if echarts table is unavailable + csv_str = experiences_table.to_csv(index=False) + swanlab.log({table_name: csv_str}, step=step) + + def log(self, data: dict, step: int, commit: bool = False) -> None: + """Log metrics.""" + # SwanLab doesn't use commit flag; keep signature for compatibility + swanlab.log(data, step=step) + self.console_logger.info(f"Step {step}: {data}") + with open(f"{self.exp_name}.log", "a") as f: + f.write(f"Step {step}: {data}\n") + + + def close(self) -> None: + try: + # Prefer run.finish() if available + if hasattr(self, "logger") and hasattr(self.logger, "finish"): + self.logger.finish() + else: + # Fallback to global finish + swanlab.finish() + except Exception: + pass + + @classmethod + def default_args(cls) -> Dict: + """Return default arguments for the monitor.""" + return { + "api_key": None, + "workspace": None, + "mode": "cloud", + "logdir": None, + "experiment_name": None, + "description": None, + "tags": None, + "id": None, + "resume": None, + "reinit": None, + } From 42a85673bfe13f076d350e1e6b8d80ea732469d0 Mon Sep 17 00:00:00 2001 From: "qingxu.fu" Date: Fri, 7 Nov 2025 10:43:29 +0800 Subject: [PATCH 04/14] stage for astune --- .../agentscope/react/react_workflow.py | 65 +++++++++++-------- 1 file changed, 38 insertions(+), 27 deletions(-) diff --git a/trinity/common/workflows/agentscope/react/react_workflow.py b/trinity/common/workflows/agentscope/react/react_workflow.py index 22997ed0ec..066d4b8f56 100644 --- a/trinity/common/workflows/agentscope/react/react_workflow.py +++ b/trinity/common/workflows/agentscope/react/react_workflow.py @@ -173,31 +173,42 @@ async def run_async(self): # cmt_tokenized["step_reward"] = self.reward_structure.step_reward[index] logprobs = sample.response_logprobs - reward = cmt.reward_structure.raw_reward - - exp = Experience( - # eid=uuid.uuid4().hex, - tokens = input_ids, # [seq_length] prompt + response - prompt_length = len(prompt_ids), # Length of the prompt in tokens, used for generating attention masks - logprobs = logprobs, # [resp_length] - reward = reward, # - # advantages=None, - # returns=None, - info = {}, - metrics = {}, # for wandb logging (must be string:float) - response_text = "", # optional - prompt_text = "", # optional - #### for multi-turn experiences - action_mask = response_loss_mask, # 1 是训练 - messages=sample.messages, # - # tools, - #### for dpo experiences - # chosen, - # rejected, - # chosen_messages, - # rejected_messages, - #### for multi-modal data - # multi_modal_inputs - ) - exps += [exp] + try: + reward = cmt.reward_structure.step_reward + if isinstance(reward, list): + reward = reward[0] + except Exception as e: + reward = cmt.reward_structure.raw_reward + if not isinstance(reward, (float, int)): # if reward is still not a float or int, set it to 0.0 + reward = cmt.reward_structure.raw_reward + + if len(response_ids) + len(prompt_ids) == len(input_ids) and len(logprobs) == len(response_ids) and len(logprobs) > 0: + exp = Experience( + # eid=uuid.uuid4().hex, + tokens = input_ids, # [seq_length] prompt + response + prompt_length = len(prompt_ids), # Length of the prompt in tokens, used for generating attention masks + logprobs = logprobs, # [resp_length] + reward = reward, # + # advantages=None, + # returns=None, + info = {}, + metrics = {}, # for wandb logging (must be string:float) + response_text = "", # optional + prompt_text = "", # optional + #### for multi-turn experiences + action_mask = response_loss_mask, # 1 是训练 + messages=sample.messages, # + # tools, + #### for dpo experiences + # chosen, + # rejected, + # chosen_messages, + # rejected_messages, + #### for multi-modal data + # multi_modal_inputs + ) + exps += [exp] + else: + from vsdb import bp + bp("BUGX") return exps From 4564a663266867b72792148878736e087a430930 Mon Sep 17 00:00:00 2001 From: "qingxu.fu" Date: Fri, 7 Nov 2025 10:57:43 +0800 Subject: [PATCH 05/14] improve according to suggestions --- tests/utils/monitor_swanlab_test.py | 99 ++++++++++++++--------------- trinity/utils/monitor.py | 23 ++++--- 2 files changed, 63 insertions(+), 59 deletions(-) diff --git a/tests/utils/monitor_swanlab_test.py b/tests/utils/monitor_swanlab_test.py index 3473c74c99..6c6259b600 100644 --- a/tests/utils/monitor_swanlab_test.py +++ b/tests/utils/monitor_swanlab_test.py @@ -2,7 +2,7 @@ Simple smoke test for SwanlabMonitor. Run: - python cradle.py + python cradle.py What it does: - Ensures SWANLAB_API_KEY is read from environment (sets a dummy if missing). @@ -19,60 +19,59 @@ def main() -> int: - # Defer imports to keep error handling simple - try: - from trinity.utils.monitor import SwanlabMonitor - except Exception as e: - print("[cradle] Failed to import SwanlabMonitor:", e) - return 1 + # Defer imports to keep error handling simple + try: + from trinity.utils.monitor import SwanlabMonitor + except Exception as e: + print("Failed to import SwanlabMonitor:", e) + return 1 - # Ensure an env-based key path is exercised (uses dummy if not provided) - env_keys = ["SWANLAB_API_KEY", "SWANLAB_APIKEY", "SWANLAB_KEY", "SWANLAB_TOKEN"] - if not any(os.getenv(k) for k in env_keys): - os.environ["SWANLAB_API_KEY"] = "dummy_key_for_smoke_test" - print("[cradle] Set SWANLAB_API_KEY to a dummy value to test env-based login path.") + # Ensure an env-based key path is exercised (uses dummy if not provided) + env_keys = ["SWANLAB_API_KEY", "SWANLAB_APIKEY", "SWANLAB_KEY", "SWANLAB_TOKEN"] + if not any(os.getenv(k) for k in env_keys): + os.environ["SWANLAB_API_KEY"] = "dummy_key_for_smoke_test" + print("Set SWANLAB_API_KEY to a dummy value to test env-based login path.") - # Try creating the monitor; if swanlab isn't installed, __init__ will assert - try: - mon = SwanlabMonitor( - project="trinity-smoke", - group="cradle", - name="swanlab-env", - role="tester", - config=None, - ) - except AssertionError as e: - print("[cradle] SwanLab not available or not installed:", e) - print("[cradle] Install swanlab to run this smoke test: pip install swanlab") - return 0 - except Exception as e: - print("[cradle] Unexpected error constructing SwanlabMonitor:", e) - return 1 + # Try creating the monitor; if swanlab isn't installed, __init__ will assert + try: + mon = SwanlabMonitor( + project="trinity-smoke", + group="cradle", + name="swanlab-env", + role="tester", + config=None, + ) + except AssertionError as e: + print("SwanLab not available or not installed:", e) + print("Install swanlab to run this smoke test: pip install swanlab") + return 0 + except Exception as e: + print("Unexpected error constructing SwanlabMonitor:", e) + return 1 - # Log a minimal metric to verify basic flow - try: - mon.log({"smoke/metric": 1.0}, step=1) - print("[cradle] Logged a test metric via SwanlabMonitor.") - except Exception as e: - print("[cradle] Error during logging:", e) - try: - mon.close() - except Exception: - pass - return 1 + # Log a minimal metric to verify basic flow + try: + mon.log({"smoke/metric": 1.0}, step=1) + print("Logged a test metric via SwanlabMonitor.") + except Exception as e: + print("Error during logging:", e) + try: + mon.close() + except Exception: + pass + return 1 - # Close cleanly - try: - mon.close() - print("[cradle] SwanlabMonitor closed successfully.") - except Exception as e: - print("[cradle] Error closing monitor:", e) - return 1 + # Close cleanly + try: + mon.close() + print("SwanlabMonitor closed successfully.") + except Exception as e: + print("Error closing monitor:", e) + return 1 - print("[cradle] Smoke test completed.") - return 0 + print("Smoke test completed.") + return 0 if __name__ == "__main__": - sys.exit(main()) - + sys.exit(main()) diff --git a/trinity/utils/monitor.py b/trinity/utils/monitor.py index 4a70838723..0d81e98385 100644 --- a/trinity/utils/monitor.py +++ b/trinity/utils/monitor.py @@ -245,18 +245,22 @@ def __init__( swanlab is not None ), "swanlab is not installed. Please install it to use SwanlabMonitor." - monitor_args = (config.monitor.monitor_args or {}) if config and getattr(config, "monitor", None) else {} + monitor_args = ( + (config.monitor.monitor_args or {}) + if config and getattr(config, "monitor", None) + else {} + ) # read api key from environment variable or monitor_args - api_key = ( - monitor_args.get("api_key") or os.environ.get("SWANLAB_API_KEY") - ) + api_key = monitor_args.get("api_key") or os.environ.get("SWANLAB_API_KEY") if api_key: try: swanlab.login(api_key=api_key, save=True) - except Exception: + except Exception as e: # Best-effort login; continue to init which may still work if already logged in - pass + get_logger(__name__).warning( + f"Swanlab login failed, but continuing initialization: {e}" + ) # Compose tags (ensure list and include role/group markers) tags = monitor_args.get("tags") or [] @@ -334,11 +338,12 @@ def close(self) -> None: # Prefer run.finish() if available if hasattr(self, "logger") and hasattr(self.logger, "finish"): self.logger.finish() - else: + elif swanlab: # Fallback to global finish swanlab.finish() - except Exception: - pass + except Exception as e: + logger = getattr(self, "console_logger", get_logger(__name__)) + logger.warning(f"Error closing Swanlab monitor: {e}") @classmethod def default_args(cls) -> Dict: From 36df4c8566f69824ce7f30f90712358797788e6b Mon Sep 17 00:00:00 2001 From: "qingxu.fu" Date: Sun, 9 Nov 2025 14:54:34 +0800 Subject: [PATCH 06/14] repo merge stage 1 --- .../agentscope/react/react_workflow.py | 117 ------------------ trinity/explorer/explorer.py | 2 +- trinity/utils/monitor.py | 43 ++----- 3 files changed, 8 insertions(+), 154 deletions(-) diff --git a/trinity/common/workflows/agentscope/react/react_workflow.py b/trinity/common/workflows/agentscope/react/react_workflow.py index 066d4b8f56..6d2f1fbfc0 100644 --- a/trinity/common/workflows/agentscope/react/react_workflow.py +++ b/trinity/common/workflows/agentscope/react/react_workflow.py @@ -95,120 +95,3 @@ def construct_experiences(self, reward: Union[float, Dict[str, float]]) -> List[ exp.metrics.update(reward) return exps - -@WORKFLOWS.register_module("agentopia_workflow") -class AgentopiatWorkflowWrap(Workflow): - is_async: bool = True - def __init__( - self, - config, - model: ModelWrapper, - task: Task, - auxiliary_models: Optional[List[openai.OpenAI]] = None, - ): - super().__init__( - task=task, - model=model, - auxiliary_models=auxiliary_models, - ) - self.config = config - self.task = task - - # 模拟openai的异步客户端 - self.model_client = model.get_openai_async_client() - # task_type 用于获取奖励函数 - # extract the query and the answer from the task - self.query = task.raw_task.get(task.format_args.prompt_key) # type: ignore [index] - self.answer = task.raw_task.get(task.format_args.response_key) # type: ignore [index] - self.task.workflow_args = { - "env_type": "appworld", - "task_id": self.task.task_id, - "instance_id": uuid.uuid4().hex, - } - - async def run_async(self): - cmt_tokenized = {} - from agentopia.trinity_compat_env import TrinityCompatWorkflow - from agentopia.schema.trajectory import Sample - from omegaconf import OmegaConf - - # config_path: '/mnt/data/qingxu.fu/ba-verl-advance/launcher/appworld_linear_base/git-appworld-qwen2-agentscope-bz32-tp4-linear.yaml' - config = OmegaConf.load(self.config.agentopia_configuration.config_path) - config.trainer.experiment_name = "dummy" - cmt = TrinityCompatWorkflow( - task=self.task, - llm_handle=self.model_client, - tokenizer=AutoTokenizer.from_pretrained(self.model_client.model_path), - config=config, - ).run_in_new_thread() - - from vsdb import bp - bp("DEV3") - - sample_final = [] - try: - sample_arr = cmt.group_tokenize() - except Exception as e: - cmt.generate_log(global_step=-1) - raise e - cmt.generate_log(global_step=-1) - sample_final += sample_arr - - - exps = [] - for index, sample in enumerate(sample_final): - sample: Sample - input_ids = sample.input_ids - prompt_ids = sample.prompt_ids - response_ids = sample.response_ids - attention_mask = sample.attention_mask - prompt_attention_mask = sample.prompt_attention_mask - response_attention_mask = sample.response_attention_mask - loss_mask = sample.loss_mask - prompt_loss_mask = sample.prompt_loss_mask - response_loss_mask = sample.response_loss_mask - position_ids = sample.position_ids - prompt_position_ids = sample.prompt_position_ids - response_position_ids = sample.response_position_ids - # cmt_tokenized["step_reward"] = self.reward_structure.step_reward[index] - - logprobs = sample.response_logprobs - try: - reward = cmt.reward_structure.step_reward - if isinstance(reward, list): - reward = reward[0] - except Exception as e: - reward = cmt.reward_structure.raw_reward - if not isinstance(reward, (float, int)): # if reward is still not a float or int, set it to 0.0 - reward = cmt.reward_structure.raw_reward - - if len(response_ids) + len(prompt_ids) == len(input_ids) and len(logprobs) == len(response_ids) and len(logprobs) > 0: - exp = Experience( - # eid=uuid.uuid4().hex, - tokens = input_ids, # [seq_length] prompt + response - prompt_length = len(prompt_ids), # Length of the prompt in tokens, used for generating attention masks - logprobs = logprobs, # [resp_length] - reward = reward, # - # advantages=None, - # returns=None, - info = {}, - metrics = {}, # for wandb logging (must be string:float) - response_text = "", # optional - prompt_text = "", # optional - #### for multi-turn experiences - action_mask = response_loss_mask, # 1 是训练 - messages=sample.messages, # - # tools, - #### for dpo experiences - # chosen, - # rejected, - # chosen_messages, - # rejected_messages, - #### for multi-modal data - # multi_modal_inputs - ) - exps += [exp] - else: - from vsdb import bp - bp("BUGX") - return exps diff --git a/trinity/explorer/explorer.py b/trinity/explorer/explorer.py index a10b523af2..3c944ee94e 100644 --- a/trinity/explorer/explorer.py +++ b/trinity/explorer/explorer.py @@ -31,7 +31,7 @@ from trinity.utils.log import get_logger from trinity.utils.monitor import MONITOR, gather_metrics from trinity.utils.plugin_loader import load_plugins - +from agentopia.backbone_trinity import * class Explorer: """Responsible for exploring the taskset.""" diff --git a/trinity/utils/monitor.py b/trinity/utils/monitor.py index 0b422582d3..3484ab9213 100644 --- a/trinity/utils/monitor.py +++ b/trinity/utils/monitor.py @@ -255,16 +255,15 @@ def __init__( monitor_args = (config.monitor.monitor_args or {}) if config and getattr(config, "monitor", None) else {} # Optional API login via code if provided; otherwise try environment, then rely on prior `swanlab login`. - api_key = ( - monitor_args.get("api_key") - or os.environ.get("SWANLAB_API_KEY") - ) + api_key = os.environ.get("SWANLAB_API_KEY") if api_key: try: swanlab.login(api_key=api_key, save=True) except Exception: # Best-effort login; continue to init which may still work if already logged in pass + else: + raise RuntimeError("Swanlab API key not found in environment variable SWANLAB_API_KEY.") # Compose tags (ensure list and include role/group markers) tags = monitor_args.get("tags") or [] @@ -296,30 +295,12 @@ def __init__( # Strip None values to avoid overriding swanlab defaults init_kwargs = {k: v for k, v in init_kwargs.items() if v is not None} - # Convert config to a plain dict for SwanLab config logging - cfg_dict = None - if config is not None: - if hasattr(config, "flatten"): - try: - cfg_dict = config.flatten() - except Exception: - # Fallback: try to cast to dict if possible - try: - cfg_dict = dict(config) - except Exception: - cfg_dict = None - else: - try: - cfg_dict = dict(config) - except Exception: - cfg_dict = None - if cfg_dict is not None: - init_kwargs["config"] = cfg_dict - self.logger = swanlab.init(**init_kwargs) self.console_logger = get_logger(__name__, in_ray_actor=True) def log_table(self, table_name: str, experiences_table: pd.DataFrame, step: int): + assert swanlab is not None, "swanlab is not installed. Please install it to use SwanlabMonitor." + # Convert pandas DataFrame to SwanLab ECharts Table headers: List[str] = list(experiences_table.columns) # Ensure rows are native Python types @@ -336,6 +317,7 @@ def log_table(self, table_name: str, experiences_table: pd.DataFrame, step: int) def log(self, data: dict, step: int, commit: bool = False) -> None: """Log metrics.""" # SwanLab doesn't use commit flag; keep signature for compatibility + assert swanlab is not None, "swanlab is not installed. Please install it to use SwanlabMonitor." swanlab.log(data, step=step) self.console_logger.info(f"Step {step}: {data}") with open(f"{self.exp_name}.log", "a") as f: @@ -356,15 +338,4 @@ def close(self) -> None: @classmethod def default_args(cls) -> Dict: """Return default arguments for the monitor.""" - return { - "api_key": None, - "workspace": None, - "mode": "cloud", - "logdir": None, - "experiment_name": None, - "description": None, - "tags": None, - "id": None, - "resume": None, - "reinit": None, - } + return {} From de22b8b7ef4825476232decf7f2bb3bcff557e83 Mon Sep 17 00:00:00 2001 From: "qingxu.fu" Date: Mon, 10 Nov 2025 00:10:34 +0800 Subject: [PATCH 07/14] rename --- .../agentscope_react/gsm8k_agentopia.yaml | 4 +- launcher_trinity.py | 341 ------------------ trinity/buffer/buffer.py | 6 +- trinity/buffer/reader/file_reader.py | 53 +-- trinity/common/config.py | 5 - trinity/common/constants.py | 2 +- trinity/explorer/explorer.py | 2 +- 7 files changed, 36 insertions(+), 377 deletions(-) diff --git a/examples/agentscope_react/gsm8k_agentopia.yaml b/examples/agentscope_react/gsm8k_agentopia.yaml index 5b30c6c710..4bbf1c6d4d 100644 --- a/examples/agentscope_react/gsm8k_agentopia.yaml +++ b/examples/agentscope_react/gsm8k_agentopia.yaml @@ -64,6 +64,4 @@ trainer: max_token_len_per_gpu: 24576 ulysses_sequence_parallel_size: 2 monitor: - monitor_type: tensorboard -agentopia_configuration: - config_path: '/mnt/data/qingxu.fu/ba-verl-advance/launcher/appworld_linear_base/git-appworld-qwen2-agentscope-bz32-tp4-linear.yaml' \ No newline at end of file + monitor_type: tensorboard \ No newline at end of file diff --git a/launcher_trinity.py b/launcher_trinity.py index f42f8ce38d..e69de29bb2 100644 --- a/launcher_trinity.py +++ b/launcher_trinity.py @@ -1,341 +0,0 @@ -# from beast_logger import print_dict -import subprocess -import argparse -import shutil -import time -import sys -import os -from dotenv import load_dotenv - -load_dotenv() - -BACK_TARGETS = os.environ.get( - 'BACK_TARGETS', - 'trinity/explorer' -).split(',') - - -def parse_args(): - parser = argparse.ArgumentParser(description='BA Launcher') - parser.add_argument( - '--target', - type=str, - default='trinity', - required=False, - help='Target script to run (default: trinity)' - ) - parser.add_argument('--conf', - type=str, - default="", - required=False, - help='Path to configuration file' - ) - parser.add_argument('--db', - type=str, - default="", - required=False, - help='Path to configuration file' - ) - parser.add_argument('--with-ray', - action='store_true', # Changed from store_true to action='store_true' - default=False, - help='Launch ray' - ) - parser.add_argument('--with-appworld', - action='store_true', # Changed from store_true to action='store_true' - default=False, - help='Launch appworld' - ) - parser.add_argument('--with-appworld2', - action='store_true', # Changed from store_true to action='store_true' - default=False, - help='Launch appworld2' - ) - parser.add_argument('--with-webshop', - action='store_true', # Changed from store_true to action='store_true' - default=False, - help='Launch webshop' - ) - parser.add_argument('--with-logview', - action='store_true', # Changed from store_true to action='store_true' - default=False, - help='Launch logview' - ) - parser.add_argument('--with-crafters', - action='store_true', # Changed from store_true to action='store_true' - default=False, - help='Launch Crafters Env Simulation' - ) - parser.add_argument('--reboot', - action='store_true', # Changed from store_true to action='store_true' - default=False, - help='reboot flag' - ) - - return parser.parse_args() - -def check_debugpy_version(): - """ - 检查 debugpy 模块版本是否 >= 1.8.0 - 如果未安装或版本过低,抛出 RuntimeError - """ - try: - import debugpy - except ImportError: - raise RuntimeError( - "Module 'debugpy>=1.8.0' cannot be loaded. " - "Ray Debugpy Debugger will not work without 'debugpy>=1.8.0' installed. " - "Install this module using 'pip install debugpy>=1.8.0'" - ) - - # 检查版本 - version = getattr(debugpy, '__version__', '0.0.0') - from packaging import version as packaging_version - - if packaging_version.parse(version) < packaging_version.parse('1.8.0'): - raise RuntimeError( - f"debugpy version {version} is too old. " - "Ray Debugpy Debugger requires 'debugpy>=1.8.0'. " - "Upgrade using 'pip install debugpy>=1.8.0'" - ) - - print(f"✓ debugpy version {version} meets requirement (>=1.8.0)") - -check_debugpy_version() -def main(): - args = parse_args() - - if args.conf: - yaml_path = args.conf - assert yaml_path.endswith('.yaml'), "Configuration file must be a YAML file" - exp_base = os.path.dirname(args.conf) - - if os.path.exists(exp_base): - - ## 0. read yaml (get trainer.experiment_name) - import yaml - with open(yaml_path, 'r') as file: - config = yaml.safe_load(file) - exp_name = config.get('trainer').get('experiment_name') - if exp_name is None or exp_name == 'read_yaml_name': - if exp_name is not None: exp_name = exp_name.replace('|', '-') - exp_name = os.path.basename(yaml_path).replace('.yaml', '') - else: - exp_name = exp_name.replace('|', '-') - - print('----------------------------------------') - backup_dir = os.path.join('launcher_record', exp_name, 'backup') - yaml_backup_dst = os.path.join('launcher_record', exp_name, 'yaml_backup.yaml') - exe_yaml_path = yaml_backup_dst - exe_exp_base = os.path.dirname(yaml_backup_dst) - print('Experiment Name:', exp_name) - print('Experiment Backup Dir:', backup_dir) - print('Experiment Yaml Dir:', yaml_backup_dst) - print('----------------------------------------') - time.sleep(2) - - ## 1. check exp_base/backup exist - if not os.path.exists(backup_dir): - os.makedirs(backup_dir) - else: - total_seconds = 10 - for i in range(total_seconds): - print(f"\rWarning: backup directory already exists, we will automatically ignore this after {total_seconds - i} seconds...", end="", flush=True) - time.sleep(1) - - ## 2. copy files to backup - for backup_target in BACK_TARGETS: - print(f"Copying {backup_target} to {os.path.join(backup_dir, os.path.basename(backup_target))}") - shutil.copytree(backup_target, os.path.join(backup_dir, os.path.basename(backup_target)), dirs_exist_ok=True) - - ## 3. copy yaml to backup - yaml_backup_src = yaml_path - shutil.copyfile(yaml_backup_src, yaml_backup_dst) - - ## 4. edit new yaml - yaml_path = yaml_backup_dst - # now, replace the trainer.experiment_name - with open(yaml_path, 'r') as file: - config = yaml.safe_load(file) - # config['trainer']['experiment_name'] = exp_name - with open(yaml_path, 'w') as file: - yaml.dump(config, file) - - else: - raise FileNotFoundError(f"Configuration file not found: {exp_base}") - - env = os.environ.copy() - if args.db: - env["RAY_DEBUG_POST_MORTEM"] = "1" - env["DEBUG_TAGS"] = args.db - env["RAY_record_task_actor_creation_sites"] = "true" - print("Debug mode is ON") - else: - print("Debug mode is OFF") - else: - assert args.with_appworld or args.with_webshop or args.with_logview or args.with_crafters, "You must at least do something." - if args.with_ray: - from agentopia.utils.smart_daemon import LaunchCommandWhenAbsent - ray_env = {} - if args.db: - ray_env["RAY_DEBUG_POST_MORTEM"] = "1" - ray_env["DEBUG_TAGS"] = args.db - ray_env["RAY_record_task_actor_creation_sites"] = "true" - companion = LaunchCommandWhenAbsent( - full_argument_list=[ - f"source ./.venv/bin/activate && ray start --head && sleep infinity" - ], - dir='./', - tag="ray_service", - use_pty=True - ) - companion.launch( - launch_wait_time=1800, - success_std_string="Ray runtime started", - env_dict=ray_env, - ) - if args.with_appworld: - from agentopia.utils.smart_daemon import LaunchCommandWhenAbsent - companion = LaunchCommandWhenAbsent( - full_argument_list=[ - f"source /mnt/data/taoshuchang.tsc/anaconda3/etc/profile.d/conda.sh && conda activate appworld && bash -i EnvService/env_sandbox/appworld.sh" - ], - dir='/mnt/data/taoshuchang.tsc/beyondagent', - tag="appworld_env_service", - use_pty=True - ) - companion.launch( - launch_wait_time=1800, - success_std_string="Starting server on", - ) - if args.with_crafters: - from agentopia.utils.smart_daemon import LaunchCommandWhenAbsent - crafters_path = os.environ.get('CRAFTERS_PATH') - crafters_script = os.environ.get('CRAFTERS_SCRIPT') - crafters_conda = os.environ.get('CRAFTERS_CONDA') - if crafters_path and os.path.exists(crafters_path): - companion = LaunchCommandWhenAbsent( - full_argument_list=[ - f"source {crafters_conda} && conda activate balrog && bash -i {crafters_script}" - ], - dir=crafters_path, - tag="crafters_env_service", - use_pty=True - ) - companion.launch( - launch_wait_time=1800, - success_std_string="Starting server on", - ) - else: - raise RuntimeError("EnvService not found") - if args.with_webshop: - from agentopia.utils.smart_daemon import LaunchCommandWhenAbsent - webshop_path = os.environ.get('WEBSHOP_PATH') - webshop_python = os.environ.get('WEBSHOP_PYTHON') - webshop_port = os.environ.get('WEBSHOP_PORT', '1907') - webshop_env_port = os.environ.get('WEBSHOP_ENV_PORT', '8080') - java_home = os.environ.get('JAVA_HOME') - java_ld_library_path = os.environ.get('JAVA_LD_LIBRARY_PATH') - search_engine_path = os.environ.get('SEARCH_ENGINE_PATH') - webshop_root = os.environ.get('WEBSHOP_ROOT') - items_attr_path = os.environ.get('ITEMS_ATTR_PATH') - items_file_path = os.environ.get('ITEMS_FILE_PATH') - pythonpath = os.environ.get('PYTHONPATH') - if webshop_path and os.path.exists(webshop_path): - companion = LaunchCommandWhenAbsent( - full_argument_list=[ - webshop_python, - '-m', - 'env_sandbox.environments.webshop.SimServer_launch', - "--portal", - "127.0.0.1", - "--port", - webshop_port, - ], - dir=webshop_path, - tag="webshop_sim_server" - ) - - companion.launch(launch_wait_time=1800, success_std_string="Uvicorn running on", env_dict={ - "JAVA_HOME": java_home, - "JAVA_LD_LIBRARY_PATH": java_ld_library_path, - "search_engine_path": search_engine_path, - "webshop_root": webshop_root, - "ITEMS_ATTR_PATH": items_attr_path, - "ITEMS_FILE_PATH": items_file_path, - "PYTHONPATH": pythonpath - }, force_restart=args.reboot) - - companion = LaunchCommandWhenAbsent( - full_argument_list=[ - webshop_python, - '-m', - 'env_sandbox.env_service', - "--env", - "webshop", - "--portal", - "127.0.0.1", - "--port", - webshop_env_port, - ], - dir=webshop_path, - tag="webshop_env_service" - ) - companion.launch(launch_wait_time=1800,success_std_string="Uvicorn running on", env_dict={ - "JAVA_HOME": java_home, - "JAVA_LD_LIBRARY_PATH": java_ld_library_path - }, force_restart=args.reboot) - else: - raise RuntimeError("EnvService not found") - - - - - if args.with_logview: - from agentopia.utils.smart_daemon import LaunchCommandWhenAbsent - logview_nvm_dir = os.environ.get('LOGVIEW_NVM_DIR') - logview_nvm_bin = os.environ.get('LOGVIEW_NVM_BIN') - logview_path = os.environ.get('LOGVIEW_PATH') - companion = LaunchCommandWhenAbsent( - full_argument_list=[ - sys.executable, - '-m', - 'web_display.go', - ], - dir='./', - tag="logview" - ) - companion.launch(launch_wait_time=1800,success_std_string="Server running on", env_dict={ - 'NVM_DIR': logview_nvm_dir, - 'NVM_BIN': logview_nvm_bin, - 'PATH': logview_path + os.environ.get('PATH', '') - }) - - if args.conf: - # let's begin the training process - cmd = [ - sys.executable, - '-m', - 'trinity.cli.launcher', - # python -m trinity.cli.launcher run --config xxx.yaml - 'run', - '--config', yaml_backup_dst - ] - - if args.with_logview: - env.update({ - 'BEST_LOGGER_WEB_SERVICE_URL': os.environ.get('BEST_LOGGER_WEB_SERVICE_URL', 'http://127.0.0.1:8181/') - }) - - try: - print(f"Running command: {' '.join(cmd)}") - subprocess.run(cmd, check=True, cwd=os.path.abspath('./'), env=env) - except subprocess.CalledProcessError as e: - print(f"Error running subprocess: {e}") - sys.exit(1) - except Exception as e: - print(f"Unexpected error: {e}") - sys.exit(1) - -if __name__ == "__main__": - main() \ No newline at end of file diff --git a/trinity/buffer/buffer.py b/trinity/buffer/buffer.py index d2b0bfce1e..96012250fa 100644 --- a/trinity/buffer/buffer.py +++ b/trinity/buffer/buffer.py @@ -16,10 +16,10 @@ def get_buffer_reader(storage_config: StorageConfig, buffer_config: BufferConfig from trinity.buffer.reader.queue_reader import QueueReader return QueueReader(storage_config, buffer_config) - elif storage_config.storage_type == StorageType.ENV_SERVICE: - from trinity.buffer.reader.file_reader import EnvServiceTaskReader + elif storage_config.storage_type == StorageType.ASTUNE: + from trinity.buffer.reader.file_reader import AstuneTaskReader - return EnvServiceTaskReader(storage_config, buffer_config) + return AstuneTaskReader(storage_config, buffer_config) elif storage_config.storage_type == StorageType.FILE: from trinity.buffer.reader.file_reader import ( ExperienceFileReader, diff --git a/trinity/buffer/reader/file_reader.py b/trinity/buffer/reader/file_reader.py index 1b7217ce0f..af8bf3d952 100644 --- a/trinity/buffer/reader/file_reader.py +++ b/trinity/buffer/reader/file_reader.py @@ -157,33 +157,40 @@ def read(self, batch_size: Optional[int] = None) -> List: return tasks -class EnvServiceTaskReader(BaseFileReader): +import os +def read_astune_config(yaml_fp): + from hydra import initialize, compose + from omegaconf import DictConfig + + def load_hydra_config(config_path: str, config_name: str) -> DictConfig: + with initialize(config_path=config_path, version_base=None): + cfg = compose(config_name=config_name, overrides=[]) + return cfg + + dir_path = os.path.dirname(yaml_fp) + file_name = os.path.basename(yaml_fp) + return load_hydra_config(config_path=dir_path, config_name=file_name) + +class AstuneTaskReader(BaseFileReader): def __init__(self, meta: StorageConfig, config: BufferConfig): - from agentopia.client.env_client_ng import EnvClient - self.meta = meta - self.env_url = self.meta.path - self.env_type = self.meta.subset_name - self.split = self.meta.split - - self.env = EnvClient(base_url=meta.path) - self.env_params = {} - dataframes = [] - - env_service_client = EnvClient(base_url=self.env_url) - task_id_array = env_service_client.get_env_profile(self.env_type, split=self.split) - if len(task_id_array) == 0: - raise ValueError(f"No task_id found for self.env_type: {self.env_type}, split: {self.split}, Please check connection to {self.env_url}") - data = { - 'task_selector': [task_id for task_id in task_id_array], - # 'reward_model': [{} for task_id in task_id_array], - # 'extras': [{'task_id': task_id} for task_id in task_id_array], - } - dataframe = Dataset.from_dict(data) - dataframes.append(dataframe) + yaml_path = os.environ.get('ASTUNE_CONFIG_REDIRECT', None) + if yaml_path is None: + raise ValueError("ASTUNE_CONFIG_REDIRECT is not set in environment variables") + config = read_astune_config(os.path.relpath(yaml_path, os.path.dirname(__file__))) + + from vsdb import bp + bp("XXX") + + from astune.task_reader.task_reader_base import TaskReaderRouter, task_to_standard_dataset + task_reader = TaskReaderRouter(config) + if 'train' in self.split: + train_dataset = task_to_standard_dataset(task_reader.get_training_tasks()) + if 'val' in self.split: + train_dataset = task_to_standard_dataset(task_reader.get_validation_tasks()) self.read_batch_size = config.batch_size self.dataset = _HFBatchReader( - datasets.concatenate_datasets(dataframes), + datasets.concatenate_datasets(train_dataset), name=meta.name, default_batch_size=self.read_batch_size, total_epochs=self.meta.total_epochs if not self.meta.is_eval else 1, diff --git a/trinity/common/config.py b/trinity/common/config.py index 9f618abd9a..0b6ed69ad0 100644 --- a/trinity/common/config.py +++ b/trinity/common/config.py @@ -568,10 +568,6 @@ class StageConfig: explorer: Optional[ExplorerConfig] = None trainer: Optional[TrainerConfig] = None -@dataclass -class AgentopiaConfiguration: - """Configs for a stage.""" - config_path: str = "" @dataclass @@ -602,7 +598,6 @@ class Config: synchronizer: SynchronizerConfig = field(default_factory=SynchronizerConfig) service: ServiceConfig = field(default_factory=ServiceConfig) log: LogConfig = field(default_factory=LogConfig) - agentopia_configuration: AgentopiaConfiguration = field(default_factory=AgentopiaConfiguration) # configurations for different training stages stages: List[StageConfig] = field(default_factory=list) diff --git a/trinity/common/constants.py b/trinity/common/constants.py index 92270df22c..9d5ac89ef9 100644 --- a/trinity/common/constants.py +++ b/trinity/common/constants.py @@ -60,7 +60,7 @@ class StorageType(CaseInsensitiveEnum): SQL = "sql" QUEUE = "queue" FILE = "file" - ENV_SERVICE = "env_service" + ASTUNE = "astune" class SyncMethodEnumMeta(CaseInsensitiveEnumMeta): diff --git a/trinity/explorer/explorer.py b/trinity/explorer/explorer.py index 3c944ee94e..1625d2d766 100644 --- a/trinity/explorer/explorer.py +++ b/trinity/explorer/explorer.py @@ -31,7 +31,7 @@ from trinity.utils.log import get_logger from trinity.utils.monitor import MONITOR, gather_metrics from trinity.utils.plugin_loader import load_plugins -from agentopia.backbone_trinity import * +from astune.backbone_trinity import * class Explorer: """Responsible for exploring the taskset.""" From dc2e63a51ddf5d15a57d48d17d15499ac8966e10 Mon Sep 17 00:00:00 2001 From: "qingxu.fu" Date: Mon, 10 Nov 2025 02:31:11 +0800 Subject: [PATCH 08/14] astune compat --- trinity/buffer/reader/file_reader.py | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/trinity/buffer/reader/file_reader.py b/trinity/buffer/reader/file_reader.py index af8bf3d952..a69c18ed4f 100644 --- a/trinity/buffer/reader/file_reader.py +++ b/trinity/buffer/reader/file_reader.py @@ -173,24 +173,27 @@ def load_hydra_config(config_path: str, config_name: str) -> DictConfig: class AstuneTaskReader(BaseFileReader): def __init__(self, meta: StorageConfig, config: BufferConfig): + self.meta = meta + self.read_batch_size = config.batch_size + self.split = meta.split + yaml_path = os.environ.get('ASTUNE_CONFIG_REDIRECT', None) if yaml_path is None: raise ValueError("ASTUNE_CONFIG_REDIRECT is not set in environment variables") - config = read_astune_config(os.path.relpath(yaml_path, os.path.dirname(__file__))) + astune_config = read_astune_config(os.path.relpath(yaml_path, os.path.dirname(__file__))) - from vsdb import bp - bp("XXX") + # from vsdb import bp + # bp("XXX") from astune.task_reader.task_reader_base import TaskReaderRouter, task_to_standard_dataset - task_reader = TaskReaderRouter(config) + task_reader = TaskReaderRouter(astune_config) if 'train' in self.split: train_dataset = task_to_standard_dataset(task_reader.get_training_tasks()) if 'val' in self.split: train_dataset = task_to_standard_dataset(task_reader.get_validation_tasks()) - self.read_batch_size = config.batch_size self.dataset = _HFBatchReader( - datasets.concatenate_datasets(train_dataset), + datasets.concatenate_datasets([train_dataset]), name=meta.name, default_batch_size=self.read_batch_size, total_epochs=self.meta.total_epochs if not self.meta.is_eval else 1, From f9b6b5db9026b408a6ce30d211f5339e94f1fdd3 Mon Sep 17 00:00:00 2001 From: "qingxu.fu" Date: Thu, 13 Nov 2025 03:06:27 +0800 Subject: [PATCH 09/14] patch --- trinity/buffer/buffer.py | 34 +++++++++++++++++++++++----------- 1 file changed, 23 insertions(+), 11 deletions(-) diff --git a/trinity/buffer/buffer.py b/trinity/buffer/buffer.py index 96012250fa..59d744117a 100644 --- a/trinity/buffer/buffer.py +++ b/trinity/buffer/buffer.py @@ -1,25 +1,33 @@ # -*- coding: utf-8 -*- """The buffer module""" +from typing import Union + from trinity.buffer.buffer_reader import BufferReader from trinity.buffer.buffer_writer import BufferWriter -from trinity.common.config import BufferConfig, StorageConfig +from trinity.common.config import ExperienceBufferConfig, StorageConfig, TasksetConfig from trinity.common.constants import StorageType +BufferStorageConfig = Union[TasksetConfig, ExperienceBufferConfig, StorageConfig] + -def get_buffer_reader(storage_config: StorageConfig, buffer_config: BufferConfig) -> BufferReader: +def get_buffer_reader(config: BufferStorageConfig) -> BufferReader: """Get a buffer reader for the given dataset name.""" + if not isinstance(config, StorageConfig): + storage_config: StorageConfig = config.to_storage_config() + else: + storage_config = config if storage_config.storage_type == StorageType.SQL: from trinity.buffer.reader.sql_reader import SQLReader - return SQLReader(storage_config, buffer_config) + return SQLReader(storage_config) elif storage_config.storage_type == StorageType.QUEUE: from trinity.buffer.reader.queue_reader import QueueReader - return QueueReader(storage_config, buffer_config) + return QueueReader(storage_config) elif storage_config.storage_type == StorageType.ASTUNE: from trinity.buffer.reader.file_reader import AstuneTaskReader - return AstuneTaskReader(storage_config, buffer_config) + return AstuneTaskReader(storage_config) elif storage_config.storage_type == StorageType.FILE: from trinity.buffer.reader.file_reader import ( ExperienceFileReader, @@ -29,26 +37,30 @@ def get_buffer_reader(storage_config: StorageConfig, buffer_config: BufferConfig schema_type = storage_config.schema_type if schema_type: # only trainer input has schema type - return ExperienceFileReader(storage_config, buffer_config) + return ExperienceFileReader(storage_config) else: - return TaskFileReader(storage_config, buffer_config) + return TaskFileReader(storage_config) else: raise ValueError(f"{storage_config.storage_type} not supported.") -def get_buffer_writer(storage_config: StorageConfig, buffer_config: BufferConfig) -> BufferWriter: +def get_buffer_writer(config: BufferStorageConfig) -> BufferWriter: """Get a buffer writer for the given dataset name.""" + if not isinstance(config, StorageConfig): + storage_config: StorageConfig = config.to_storage_config() + else: + storage_config = config if storage_config.storage_type == StorageType.SQL: from trinity.buffer.writer.sql_writer import SQLWriter - return SQLWriter(storage_config, buffer_config) + return SQLWriter(storage_config) elif storage_config.storage_type == StorageType.QUEUE: from trinity.buffer.writer.queue_writer import QueueWriter - return QueueWriter(storage_config, buffer_config) + return QueueWriter(storage_config) elif storage_config.storage_type == StorageType.FILE: from trinity.buffer.writer.file_writer import JSONWriter - return JSONWriter(storage_config, buffer_config) + return JSONWriter(storage_config) else: raise ValueError(f"{storage_config.storage_type} not supported.") From 54bf2fce5b3f5ff86cf087b9d5a1c1274908cea6 Mon Sep 17 00:00:00 2001 From: "qingxu.fu" Date: Thu, 13 Nov 2025 03:17:29 +0800 Subject: [PATCH 10/14] patch import --- trinity/buffer/task_scheduler.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/trinity/buffer/task_scheduler.py b/trinity/buffer/task_scheduler.py index 7619a3702f..16900ccd14 100644 --- a/trinity/buffer/task_scheduler.py +++ b/trinity/buffer/task_scheduler.py @@ -12,6 +12,10 @@ from trinity.common.constants import SELECTOR_METRIC from trinity.utils.annotations import Experimental +from typing import TYPE_CHECKING +if TYPE_CHECKING: + from trinity.buffer.reader.file_reader import AstuneTaskReader + @Experimental class TasksetScheduler: @@ -62,7 +66,7 @@ def __init__(self, explorer_state: Dict, config: Config): for taskset_config, taskset_state in zip(taskset_configs, taskset_states): assert not taskset_config.is_eval # assume drop last taskset = get_buffer_reader(taskset_config) - if not isinstance(taskset, TaskFileReader): + if not isinstance(taskset, TaskFileReader) or isinstance(taskset, AstuneTaskReader): raise TypeError( f"Taskset '{taskset_config.name}' has an unsupported type '{type(taskset).__name__}'." f"Currently, only 'TaskFileReader' is supported by TasksetScheduler." From 7b1af160116bffed110782d777b52e28012a905c Mon Sep 17 00:00:00 2001 From: "qingxu.fu" Date: Thu, 13 Nov 2025 03:18:22 +0800 Subject: [PATCH 11/14] patch type chheck --- trinity/buffer/task_scheduler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/trinity/buffer/task_scheduler.py b/trinity/buffer/task_scheduler.py index 16900ccd14..7a0ce64b8f 100644 --- a/trinity/buffer/task_scheduler.py +++ b/trinity/buffer/task_scheduler.py @@ -66,7 +66,7 @@ def __init__(self, explorer_state: Dict, config: Config): for taskset_config, taskset_state in zip(taskset_configs, taskset_states): assert not taskset_config.is_eval # assume drop last taskset = get_buffer_reader(taskset_config) - if not isinstance(taskset, TaskFileReader) or isinstance(taskset, AstuneTaskReader): + if not isinstance(taskset, TaskFileReader) and not isinstance(taskset, AstuneTaskReader): raise TypeError( f"Taskset '{taskset_config.name}' has an unsupported type '{type(taskset).__name__}'." f"Currently, only 'TaskFileReader' is supported by TasksetScheduler." From c18e3cdfac52c6142a702e6b58573ca9f4f94461 Mon Sep 17 00:00:00 2001 From: "qingxu.fu" Date: Thu, 13 Nov 2025 03:19:54 +0800 Subject: [PATCH 12/14] fix import --- trinity/buffer/task_scheduler.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/trinity/buffer/task_scheduler.py b/trinity/buffer/task_scheduler.py index 7a0ce64b8f..8ce697dbb2 100644 --- a/trinity/buffer/task_scheduler.py +++ b/trinity/buffer/task_scheduler.py @@ -12,9 +12,7 @@ from trinity.common.constants import SELECTOR_METRIC from trinity.utils.annotations import Experimental -from typing import TYPE_CHECKING -if TYPE_CHECKING: - from trinity.buffer.reader.file_reader import AstuneTaskReader +from trinity.buffer.reader.file_reader import AstuneTaskReader @Experimental From 69760978d1ca80c0a74207227fcfe1bb5c838722 Mon Sep 17 00:00:00 2001 From: "qingxu.fu" Date: Mon, 17 Nov 2025 14:58:30 +0800 Subject: [PATCH 13/14] refactor: change AstuneTaskReader to inherit from TaskFileReader --- trinity/buffer/reader/file_reader.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/trinity/buffer/reader/file_reader.py b/trinity/buffer/reader/file_reader.py index ce48629120..ea73b072d3 100644 --- a/trinity/buffer/reader/file_reader.py +++ b/trinity/buffer/reader/file_reader.py @@ -179,7 +179,7 @@ def load_hydra_config(config_path: str, config_name: str) -> DictConfig: file_name = os.path.basename(yaml_fp) return load_hydra_config(config_path=dir_path, config_name=file_name) -class AstuneTaskReader(BaseFileReader): +class AstuneTaskReader(TaskFileReader): def __init__(self, config): self.config = config self.read_batch_size = config.batch_size @@ -193,7 +193,7 @@ def __init__(self, config): # from vsdb import bp # bp("XXX") - from astune.task_reader.task_reader_base import TaskReaderRouter, task_to_standard_dataset + from astune.task_reader import TaskReaderRouter, task_to_standard_dataset task_reader = TaskReaderRouter(astune_config) if 'train' in self.split: train_dataset = task_to_standard_dataset(task_reader.get_training_tasks()) From 82ba92eaf27583bf3b54b89d5dcd565de27f5fe0 Mon Sep 17 00:00:00 2001 From: "qingxu.fu" Date: Fri, 21 Nov 2025 08:34:24 +0800 Subject: [PATCH 14/14] fix --- trinity/explorer/explorer.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/trinity/explorer/explorer.py b/trinity/explorer/explorer.py index 1d3f8087fa..4b5e22c5e1 100644 --- a/trinity/explorer/explorer.py +++ b/trinity/explorer/explorer.py @@ -34,7 +34,10 @@ from trinity.utils.plugin_loader import load_plugins from trinity.utils.timer import Timer -from astune.backbone_trinity import * +try: + from astune.backbone_trinity import * +except ImportError: + from astune.backbone.trinity_compat_workflow import * class Explorer: """Responsible for exploring the taskset."""