1 change: 1 addition & 0 deletions .gitignore
@@ -98,3 +98,4 @@ wandb/

# checkpoints
checkpoints/
launcher_record
3 changes: 2 additions & 1 deletion README_zh.md
@@ -176,7 +176,7 @@ pip install -e ".[flash_attn]"
[`uv`](https://github.com/astral-sh/uv) is a modern Python package manager.

```bash
uv sync --extra dev --extra flash_attn
uv sync --extra dev --extra flash_attn -i https://mirrors.aliyun.com/pypi/simple/ --no-build-isolation
```

## Install via PyPI
@@ -193,6 +193,7 @@ pip install flash-attn==2.8.1
```bash
uv pip install trinity-rft
uv pip install flash-attn==2.8.1
uv pip install --verbose flash-attn -i https://mirrors.aliyun.com/pypi/simple/ --no-deps --no-build-isolation
```

## Using Docker
2 changes: 2 additions & 0 deletions deps_incremental.txt
@@ -0,0 +1,2 @@
loguru
beast_logger
2 changes: 1 addition & 1 deletion examples/agentscope_react/gsm8k.yaml
@@ -7,7 +7,7 @@ algorithm:
optimizer:
lr: 1e-6
model:
model_path: ${oc.env:TRINITY_MODEL_PATH,Qwen/Qwen3-8B}
model_path: '/mnt/data/model_cache/modelscope/hub/Qwen/Qwen/Qwen2___5-1___5B-Instruct'
max_response_tokens: 16384
max_model_len: 24576
cluster:
67 changes: 67 additions & 0 deletions examples/agentscope_react/gsm8k_agentopia.yaml
@@ -0,0 +1,67 @@
project: AgentScope-ReAct
name: GSM8K-Qwen3-8B
checkpoint_root_dir: ${oc.env:TRINITY_CHECKPOINT_ROOT_DIR,./checkpoints}
algorithm:
algorithm_type: multi_step_grpo
repeat_times: 8
optimizer:
lr: 1e-6
model:
model_path: '/mnt/data/model_cache/modelscope/hub/Qwen/Qwen/Qwen2___5-1___5B-Instruct'
max_response_tokens: 16384
max_model_len: 24576
cluster:
node_num: 1
gpu_per_node: 8
buffer:
total_epochs: 1
batch_size: 32
train_batch_size: 256
explorer_input:
taskset:
name: gsm8k
storage_type: env_service
path: 'http://localhost:8080'
subset_name: 'appworld'
split: 'train'
format:
prompt_key: 'question'
response_key: 'answer'
rollout_args:
temperature: 1.0
default_workflow_type: 'agentopia_workflow'
eval_tasksets: []
trainer_input:
experience_buffer:
name: agentscope_gsm8k_buffer
storage_type: queue
explorer:
eval_interval: 50
runner_per_model: 16
max_timeout: 1800
rollout_model:
engine_num: 4
tensor_parallel_size: 1
enable_prefix_caching: false
enforce_eager: true
enable_openai_api: true
enable_history: true
enable_auto_tool_choice: true
tool_call_parser: hermes
# reasoning_parser: deepseek_r1
enable_thinking: false
dtype: bfloat16
seed: 42
synchronizer:
sync_style: dynamic_by_explorer
sync_method: 'nccl'
sync_interval: 2
sync_timeout: 1200
trainer:
save_interval: 100
grad_clip: 1.0
use_dynamic_bsz: true
max_token_len_per_gpu: 24576
ulysses_sequence_parallel_size: 2
monitor:
monitor_type: tensorboard
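
For reference, this config can also be launched programmatically through the `run()` entrypoint in `trinity/cli/launcher.py` (its signature appears later in this diff). A minimal, hedged sketch, assuming the repository root is the working directory:

```python
# Hedged sketch: start the agentopia GSM8K run via the launcher's run()
# entrypoint; the config path is the file added above, adjust to your checkout.
from trinity.cli.launcher import run

if __name__ == "__main__":
    run("examples/agentscope_react/gsm8k_agentopia.yaml")
```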
3 changes: 2 additions & 1 deletion examples/grpo_gsm8k/gsm8k.yaml
@@ -7,7 +7,7 @@ algorithm:
optimizer:
lr: 1e-5
model:
model_path: ${oc.env:TRINITY_MODEL_PATH,Qwen/Qwen2.5-1.5B-Instruct}
model_path: '/mnt/data/model_cache/modelscope/hub/Qwen/Qwen/Qwen2___5-1___5B-Instruct'
max_response_tokens: 1024
max_model_len: 2048
cluster:
@@ -50,6 +50,7 @@ explorer:
engine_num: 2
tensor_parallel_size: 1
enable_prefix_caching: false
gpu_memory_utilization: 0.7
enforce_eager: true
dtype: bfloat16
seed: 42
Empty file added launcher_trinity.py
Empty file.
12 changes: 12 additions & 0 deletions note.md
@@ -0,0 +1,12 @@


Trace: how a rollout task reaches the AgentScope ReAct workflow.

/mnt/data_cpfs/qingxu.fu/trinity/trinity/explorer/workflow_runner.py
    run_task()

--> dispatched via buffer.explorer_input.default_workflow_type in
    /mnt/data_cpfs/qingxu.fu/trinity/examples/agentscope_react/gsm8k.yaml

--> /mnt/data_cpfs/qingxu.fu/trinity/trinity/common/workflows/agentscope/react/react_workflow.py
    run_async()
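
To make the chain above concrete, here is a self-contained toy mirror of the dispatch it describes: the `default_workflow_type` string from the YAML selects a registered workflow class, and the runner awaits its `run_async()`. This is only a schematic stand-in, not the real `WORKFLOWS` registry from `trinity.common.workflows.workflow`.

```python
# Toy mirror of the dispatch chain noted above; not the actual Trinity-RFT code.
import asyncio

TOY_WORKFLOWS = {}


def register(name):
    """Register a workflow class under a string key, like WORKFLOWS.register_module."""
    def wrap(cls):
        TOY_WORKFLOWS[name] = cls
        return cls
    return wrap


@register("agentopia_workflow")  # the name used in gsm8k_agentopia.yaml
class ToyReActWorkflow:
    def __init__(self, task):
        self.task = task

    async def run_async(self):
        # The real workflow rolls out the agent and returns Experience objects.
        return [f"experience for task {self.task!r}"]


async def run_task(task, workflow_type):
    workflow = TOY_WORKFLOWS[workflow_type](task)  # same lookup the workflow runner performs
    return await workflow.run_async()


print(asyncio.run(run_task({"question": "2 + 2 = ?"}, "agentopia_workflow")))
```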
77 changes: 77 additions & 0 deletions tests/utils/monitor_swanlab_test.py
@@ -0,0 +1,77 @@
"""
Simple smoke test for SwanlabMonitor.

Run:
    python tests/utils/monitor_swanlab_test.py

What it does:
- Ensures SWANLAB_API_KEY is read from environment (sets a dummy if missing).
- Initializes SwanlabMonitor with minimal args.
- Logs a small metric and closes the run.

Notes:
- If `swanlab` is not installed, this script will print a helpful message and exit.
- The dummy API key is used only to exercise the login path; real authentication isn't required for this smoke test.
"""

import os
import sys


def main() -> int:
# Defer imports to keep error handling simple
try:
from trinity.utils.monitor import SwanlabMonitor
except Exception as e:
print("Failed to import SwanlabMonitor:", e)
return 1

# Ensure an env-based key path is exercised (uses dummy if not provided)
env_keys = ["SWANLAB_API_KEY", "SWANLAB_APIKEY", "SWANLAB_KEY", "SWANLAB_TOKEN"]
if not any(os.getenv(k) for k in env_keys):
os.environ["SWANLAB_API_KEY"] = "dummy_key_for_smoke_test"
print("Set SWANLAB_API_KEY to a dummy value to test env-based login path.")

# Try creating the monitor; if swanlab isn't installed, __init__ will assert
try:
mon = SwanlabMonitor(
project="trinity-smoke",
group="cradle",
name="swanlab-env",
role="tester",
config=None,
)
except AssertionError as e:
print("SwanLab not available or not installed:", e)
print("Install swanlab to run this smoke test: pip install swanlab")
return 0
except Exception as e:
print("Unexpected error constructing SwanlabMonitor:", e)
return 1

# Log a minimal metric to verify basic flow
try:
mon.log({"smoke/metric": 1.0}, step=1)
print("Logged a test metric via SwanlabMonitor.")
except Exception as e:
print("Error during logging:", e)
try:
mon.close()
except Exception:
pass
return 1

# Close cleanly
try:
mon.close()
print("SwanlabMonitor closed successfully.")
except Exception as e:
print("Error closing monitor:", e)
return 1

print("Smoke test completed.")
return 0


if __name__ == "__main__":
sys.exit(main())
4 changes: 4 additions & 0 deletions trinity/buffer/buffer.py
@@ -24,6 +24,10 @@ def get_buffer_reader(config: BufferStorageConfig) -> BufferReader:
from trinity.buffer.reader.queue_reader import QueueReader

return QueueReader(storage_config)
elif storage_config.storage_type == StorageType.ASTUNE:
from trinity.buffer.reader.file_reader import AstuneTaskReader

return AstuneTaskReader(storage_config)
elif storage_config.storage_type == StorageType.FILE:
from trinity.buffer.reader.file_reader import (
ExperienceFileReader,
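
The new branch keys off the `StorageType.ASTUNE` member added in `trinity/common/constants.py` further down: a taskset whose `storage_type` resolves to that member is handed to `AstuneTaskReader`. A small sanity check, assuming the enum value stays the lowercase string shown in that file:

```python
# The literal value "astune" comes from trinity/common/constants.py; StorageType
# derives from CaseInsensitiveEnum, so other casings are expected to resolve too.
from trinity.common.constants import StorageType

assert StorageType("astune") is StorageType.ASTUNE
print(StorageType.ASTUNE)
```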
58 changes: 58 additions & 0 deletions trinity/buffer/reader/file_reader.py
@@ -164,3 +164,61 @@ def read_with_indices(self, indices: List[int]) -> List:
async def read_with_indices_async(self, indices: List[int]) -> List:
"""Read tasks with indices asynchronously."""
return self.read_with_indices(indices)


import os


def read_astune_config(yaml_fp):
    """Load an AsTune YAML config with Hydra's compose API."""
    from hydra import compose, initialize
    from omegaconf import DictConfig

    def load_hydra_config(config_path: str, config_name: str) -> DictConfig:
        # Hydra resolves a relative config_path against the calling module's directory.
        with initialize(config_path=config_path, version_base=None):
            return compose(config_name=config_name, overrides=[])

    dir_path = os.path.dirname(yaml_fp)
    file_name = os.path.basename(yaml_fp)
    return load_hydra_config(config_path=dir_path, config_name=file_name)


class AstuneTaskReader(TaskFileReader):
    """Task reader backed by an AsTune task source selected via ASTUNE_CONFIG_REDIRECT."""

    def __init__(self, config):
        self.config = config
        self.read_batch_size = config.batch_size
        self.split = config.split

        yaml_path = os.environ.get("ASTUNE_CONFIG_REDIRECT")
        if yaml_path is None:
            raise ValueError("ASTUNE_CONFIG_REDIRECT is not set in environment variables")
        # Hydra expects a path relative to this module, so convert before loading.
        astune_config = read_astune_config(os.path.relpath(yaml_path, os.path.dirname(__file__)))

        from astune.task_reader import TaskReaderRouter, task_to_standard_dataset

        task_reader = TaskReaderRouter(astune_config)
        if "train" in self.split:
            dataset = task_to_standard_dataset(task_reader.get_training_tasks())
        elif "val" in self.split:
            dataset = task_to_standard_dataset(task_reader.get_validation_tasks())
        else:
            raise ValueError(f"Unsupported split '{self.split}'; expected 'train' or 'val'.")

        self.dataset = _HFBatchReader(
            datasets.concatenate_datasets([dataset]),  # type: ignore
            name=self.config.name,
            default_batch_size=self.read_batch_size,
            total_epochs=self.config.total_epochs if not self.config.is_eval else 1,
            offset=self.config.index,
            drop_last=not self.config.is_eval,
            total_steps=self.config.total_steps,
            enable_progress_bar=self.config.enable_progress_bar,
        )
        self.formatter = FORMATTER.get("task")(self.config)

def read(self, batch_size: Optional[int] = None) -> List:
batch_size = batch_size or self.read_batch_size
tasks = []
samples = self.dataset.read_batch(batch_size)
for sample in samples:
task = self.formatter.format(sample)
tasks.append(task)
return tasks
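
A hedged usage sketch of the pieces above: `AstuneTaskReader` takes the AsTune YAML path from `ASTUNE_CONFIG_REDIRECT` and converts it to a path relative to this module because Hydra's `initialize()` resolves `config_path` against the calling file. The YAML location below is an assumption for illustration, and the `astune` and `hydra-core` packages must be installed.

```python
# Illustrative only: point the reader at an AsTune YAML (path assumed) and load
# the same Hydra config AstuneTaskReader would load internally.
import os

import trinity.buffer.reader.file_reader as file_reader

os.environ["ASTUNE_CONFIG_REDIRECT"] = "/workspace/astune/configs/train.yaml"  # hypothetical path

yaml_path = os.environ["ASTUNE_CONFIG_REDIRECT"]
# Mirror the relpath conversion done in AstuneTaskReader.__init__, since Hydra
# resolves the relative path against file_reader.py, where initialize() is called.
relative = os.path.relpath(yaml_path, os.path.dirname(file_reader.__file__))
cfg = file_reader.read_astune_config(relative)
print(cfg)
```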

4 changes: 3 additions & 1 deletion trinity/buffer/task_scheduler.py
@@ -12,6 +12,8 @@
from trinity.common.constants import SELECTOR_METRIC
from trinity.utils.annotations import Experimental

from trinity.buffer.reader.file_reader import AstuneTaskReader


@Experimental
class TasksetScheduler:
@@ -62,7 +64,7 @@ def __init__(self, explorer_state: Dict, config: Config):
for taskset_config, taskset_state in zip(taskset_configs, taskset_states):
assert not taskset_config.is_eval # assume drop last
taskset = get_buffer_reader(taskset_config)
if not isinstance(taskset, TaskFileReader):
            if not isinstance(taskset, (TaskFileReader, AstuneTaskReader)):
                raise TypeError(
                    f"Taskset '{taskset_config.name}' has an unsupported type '{type(taskset).__name__}'. "
                    f"Currently, only 'TaskFileReader' is supported by TasksetScheduler."
3 changes: 3 additions & 0 deletions trinity/cli/launcher.py
@@ -163,6 +163,9 @@ def run_stage(config: Config) -> None:


def run(config_path: str, dlc: bool = False, plugin_dir: str = None):
    if os.path.exists(".env"):
        # Load local environment overrides (API keys, cache paths) before plugins and configs read os.environ.
        from dotenv import load_dotenv

        load_dotenv(".env")
if plugin_dir:
os.environ[PLUGIN_DIRS_ENV_VAR] = plugin_dir
load_plugins()
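
With this bootstrap, secrets and local paths (for example the `SWANLAB_API_KEY` read by the new smoke test, or `ASTUNE_CONFIG_REDIRECT` read by `AstuneTaskReader`) can live in a repo-local `.env` file. A minimal sketch of the behavior, assuming `python-dotenv` is installed; the keys are examples only, and `load_dotenv` does not override variables already present in the environment by default.

```python
# Minimal sketch of the .env bootstrap added to launcher.run(); keys are examples.
import os
from pathlib import Path

from dotenv import load_dotenv

Path(".env").write_text(
    "SWANLAB_API_KEY=dummy_key\n"
    "ASTUNE_CONFIG_REDIRECT=/workspace/astune/configs/train.yaml\n"
)
load_dotenv(".env")  # values already set in the shell win over the file by default
print(os.environ["SWANLAB_API_KEY"])
```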
3 changes: 2 additions & 1 deletion trinity/common/config.py
@@ -170,7 +170,7 @@ class StorageConfig:
default_workflow_type: Optional[str] = None
default_reward_fn_type: Optional[str] = None
rollout_args: GenerationConfig = field(default_factory=GenerationConfig)
workflow_args: dict = field(default_factory=dict)
workflow_args: dict = field(default_factory=dict) # qingxu: TODO
reward_fn_args: dict = field(default_factory=dict)
task_selector: TaskSelectorConfig = field(default_factory=TaskSelectorConfig)

@@ -738,6 +738,7 @@ class StageConfig:
trainer: Optional[TrainerConfig] = None



@dataclass
class Config:
"""Global Configuration"""
1 change: 1 addition & 0 deletions trinity/common/constants.py
@@ -62,6 +62,7 @@ class StorageType(CaseInsensitiveEnum):
SQL = "sql"
QUEUE = "queue"
FILE = "file"
ASTUNE = "astune"


class SyncMethodEnumMeta(CaseInsensitiveEnumMeta):
12 changes: 5 additions & 7 deletions trinity/common/workflows/agentscope/react/react_workflow.py
@@ -2,25 +2,22 @@

This workflow is a demonstration of how to integrate the AgentScope framework within the Trinity-RFT workflow system with minimal modifications.
"""

from typing import Dict, List, Optional, Union

import uuid
import openai

from typing import Dict, List, Optional, Union
from trinity.common.experience import Experience
from trinity.common.models.model import ModelWrapper
from trinity.common.workflows.workflow import WORKFLOWS, Task, Workflow

from transformers import AutoTokenizer
from .templates import TEMPLATE_MAP


@WORKFLOWS.register_module("as_react_workflow")
class AgentScopeReActWorkflow(Workflow):
is_async: bool = True

def __init__(
self,
*,
config,
task: Task,
model: ModelWrapper,
auxiliary_models: Optional[List[openai.OpenAI]] = None,
@@ -97,3 +94,4 @@ def construct_experiences(self, reward: Union[float, Dict[str, float]]) -> List[
if isinstance(reward, dict):
exp.metrics.update(reward)
return exps

3 changes: 2 additions & 1 deletion trinity/common/workflows/workflow.py
@@ -40,7 +40,7 @@ class Task(dict):
index: dict = field(default_factory=dict)

def to_workflow(
self, model: Any, auxiliary_models: Optional[List[openai.OpenAI]] = None
self, config, model: Any, auxiliary_models: Optional[List[openai.OpenAI]] = None
) -> Workflow:
"""Convert the task to a workflow.

@@ -55,6 +55,7 @@ def to_workflow(
Workflow: The generated workflow object.
"""
return self.workflow(
config=config,
model=model,
task=self,
auxiliary_models=auxiliary_models,
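
With `config` now threaded through `Task.to_workflow`, a workflow constructor can read global settings directly. The class below is a hedged sketch of that shape only: the keyword names match the call inside `to_workflow`, but the base class, registration, and rollout logic are stand-ins rather than the real implementation.

```python
# Sketch of a workflow that accepts the new `config` keyword; illustrative only.
from typing import Any, List, Optional


class ConfigAwareWorkflow:  # real code would subclass Workflow and register it in WORKFLOWS
    is_async: bool = True

    def __init__(
        self,
        *,
        config: Any,  # global Config object now passed in by Task.to_workflow
        task: Any,
        model: Any,
        auxiliary_models: Optional[List[Any]] = None,
    ) -> None:
        self.config = config
        self.task = task
        self.model = model
        self.auxiliary_models = auxiliary_models or []

    async def run_async(self) -> List[Any]:
        # Roll out self.task with self.model and return a list of Experience objects.
        return []
```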