
TransferQueue: An asynchronous streaming data management module for efficient post-training

Paper | Zhihu



🎉 Overview

TransferQueue is a high-performance data storage and transfer module with panoramic data visibility and streaming scheduling capabilities, optimized for efficient dataflow in post-training workflows.

TransferQueue offers fine-grained, sample-level data management and (upcoming) load-balancing capabilities, serving as a data gateway that decouples explicit data dependencies across computational tasks. This enables a divide-and-conquer approach that significantly simplifies the design of the algorithm controller.

🔄 Updates

  • Oct 21, 2025: Official integration into verl is ready (verl/pulls/3649). Follow-up PRs will optimize the single-controller architecture by fully decoupling data and control flows.
  • July 22, 2025: We published a series of Chinese blog posts on Zhihu (1, 2).
  • July 21, 2025: We started an RFC in the verl community (verl/RFC#2662).
  • July 2, 2025: We published the AsyncFlow paper.

🧩 Components

Control Plane: Panoramic Data Management

In the control plane, TransferQueueController tracks the production and consumption status of each training sample as metadata. Once all the required data fields are ready (i.e., written to the TransferQueueStorage), the sample becomes available for consumption by downstream tasks.

For the consumption status, we keep a separate record per computational task (e.g., generate_sequences, compute_log_prob). Therefore, even when different tasks require the same data fields, they can consume the data independently without interfering with each other.
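
A minimal sketch of this bookkeeping is shown below. It is conceptual only: the class and field names are illustrative and do not mirror the actual TransferQueueController implementation.

```python
# Conceptual sketch of per-sample metadata tracking; names are illustrative
# and not the actual TransferQueueController implementation.
from dataclasses import dataclass, field


@dataclass
class SampleStatus:
    # Data fields already written to storage for this sample (production status).
    produced_fields: set[str] = field(default_factory=set)
    # Names of computational tasks that have already consumed this sample.
    consumed_by: set[str] = field(default_factory=set)

    def ready_for(self, task_name: str, required_fields: list[str]) -> bool:
        # A sample is dispatchable for a task once every required field has been
        # produced and the task has not consumed this sample yet.
        return task_name not in self.consumed_by and all(
            f in self.produced_fields for f in required_fields
        )


status = SampleStatus(produced_fields={"prompts", "responses"})
# Each task tracks its consumption independently, so generate_sequences and
# compute_log_prob can read the same fields without interfering with each other.
assert status.ready_for("compute_log_prob", ["prompts", "responses"])
assert status.ready_for("generate_sequences", ["prompts"])
```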

In the future, we plan to support load-balancing and dynamic batching capabilities in the control plane. Additionally, we will support data management for disaggregated frameworks, where each rank manages its own data retrieval rather than being coordinated by a single controller.

Data Plane: Distributed Data Storage

In the data plane, SimpleStorageUnit serves as a naive storage unit based on CPU memory, responsible for the actual storage and retrieval of data. Each storage unit can be deployed on a separate node, allowing for distributed data management.

The storage system is built on top of the TransferQueueStorageManager abstraction layer, which enables pluggable storage backends. The current implementation uses AsyncSimpleStorageManager to coordinate distributed SimpleStorageUnit instances.

SimpleStorageUnit employs a 2D data structure as follows:

  • Each row corresponds to a training sample, assigned a unique index within the corresponding global batch.
  • Each column represents the input/output data fields for computational tasks.

This data structure design is motivated by the computational characteristics of post-training, where each training sample is produced in a relayed manner across the task pipeline. It provides precise addressing, enabling fine-grained, concurrent read/write operations in a streaming manner.
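
For intuition, here is a small sketch of the row/column layout expressed as a TensorDict. The field names and shapes are made up for illustration, and this is not how SimpleStorageUnit is implemented internally.

```python
# Illustrative 2D layout: rows are the samples of a global batch, columns are
# data fields. Field names and tensor shapes are examples only.
import torch
from tensordict import TensorDict

global_batch_size = 4
storage = TensorDict(
    {
        "prompts":   torch.zeros(global_batch_size, 128, dtype=torch.long),
        "responses": torch.zeros(global_batch_size, 512, dtype=torch.long),
        "log_probs": torch.zeros(global_batch_size, 512),
    },
    batch_size=[global_batch_size],
)

# Precise addressing: touch a single (row, field) cell, e.g. the responses of
# sample 2, without blocking reads/writes of other samples or fields.
storage["responses"][2] = torch.ones(512, dtype=torch.long)
```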

Through the TransferQueueStorageManager abstraction (PR#66, issue#72), we plan to integrate high-performance storage solutions such as MoonCakeStore to support device-to-device data transfer over RDMA, further improving data transfer efficiency at large scale.

User Interface: Asynchronous & Synchronous Client

The interaction workflow of the TransferQueue system is as follows:

  1. A process sends a read request to the TransferQueueController.
  2. TransferQueueController scans the production and consumption metadata for each sample (row) and dynamically assembles micro-batch metadata according to the load-balancing policy. This mechanism enables sample-level data scheduling.
  3. The process retrieves the actual data from distributed storage units using the metadata provided by the controller.

To simplify the usage of TransferQueue, we have encapsulated this process into AsyncTransferQueueClient and TransferQueueClient. These clients provide asynchronous and synchronous interfaces, respectively, allowing users to easily integrate TransferQueue into their frameworks.
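
A minimal async sketch of this read path is given below, assuming the `(async_)` method naming from the core interfaces listed in the Showcases section; the client construction and field names are illustrative.

```python
# Hedged sketch of the read path with AsyncTransferQueueClient; the exact
# constructor arguments and field names are assumptions for illustration.
async def consume_micro_batch(client, global_step: int):
    # Steps 1-2: the controller scans per-sample metadata and assembles a
    # micro-batch of ready samples, returning metadata (BatchMeta) only.
    metadata = await client.async_get_meta(
        data_fields=["prompts", "responses"],
        batch_size=8,
        global_step=global_step,
        get_n_samples=False,
        task_name="compute_log_prob",
    )
    # Step 3: fetch the actual tensors from the distributed storage units.
    batch = await client.async_get_data(metadata)  # returns a TensorDict
    return batch
```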

In the future, we will provide a StreamingDataLoader interface for disaggregated frameworks, as discussed in issue#85 and verl/RFC#2662. Leveraging this abstraction, each rank can automatically obtain its own data, much like a DataLoader in PyTorch. The TransferQueue system will handle the underlying data scheduling and transfer logic required by different parallelism strategies, significantly simplifying the design of disaggregated frameworks.

🔥 Showcases

General Usage

The primary interaction points are AsyncTransferQueueClient and TransferQueueClient, serving as the communication interface with the TransferQueue system.

Core interfaces:

  • (async_)get_meta(data_fields: list[str], batch_size: int, global_step: int, get_n_samples: bool, task_name: str) -> BatchMeta
  • (async_)get_data(metadata: BatchMeta) -> TensorDict
  • (async_)put(data: TensorDict, metadata: BatchMeta, global_step: int)
  • (async_)clear(global_step: int)

We will soon release a detailed tutorial and API documentation.
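
In the meantime, here is a minimal sketch of the write path using the synchronous client and the signatures above; the TensorDict contents and step handling are illustrative.

```python
# Hedged sketch of writing results back and releasing a finished step with
# TransferQueueClient; tensor shapes and field names are examples only.
import torch
from tensordict import TensorDict


def publish_results(client, metadata, global_step: int, batch_size: int = 8):
    # batch_size should match the number of rows referenced by `metadata`.
    results = TensorDict(
        {"log_probs": torch.zeros(batch_size, 512)},
        batch_size=[batch_size],
    )
    # Write the newly produced field; the controller marks these rows as produced.
    client.put(data=results, metadata=metadata, global_step=global_step)


def finish_step(client, global_step: int):
    # Release storage and metadata once a global step is fully consumed.
    client.clear(global_step=global_step)
```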

Collocated Example

verl

The primary motivation for integrating TransferQueue into verl is to alleviate the data transfer bottleneck of the single-controller RayPPOTrainer. Currently, all DataProto objects must be routed through RayPPOTrainer, creating a single-point bottleneck for the whole post-training system.

[Figure: verl dataflow with DataProto]

Leveraging TransferQueue, we separate experience data transfer from metadata dispatch by

  • Replacing DataProto with BatchMeta (metadata) and TensorDict (actual data) structures
  • Preserving verl's original Dispatch/Collect logic via BatchMeta (maintaining single-controller debuggability)
  • Accelerating data transfer through TransferQueue's distributed storage units

[Figure: verl dataflow with TransferQueue]
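
A hedged sketch of this pattern follows. The worker function and the model call are placeholders rather than verl APIs; only the client calls follow the core interfaces listed above.

```python
# Illustrative worker-side pattern (not verl's actual API): the single
# controller dispatches only lightweight BatchMeta, while each worker pulls
# and pushes the heavy TensorDict through TransferQueue.
def model_forward(batch):
    # Placeholder for the actual computation (e.g., computing log-probs).
    return batch


def run_log_prob_task(client, metadata, global_step: int):
    batch = client.get_data(metadata)   # fetch tensors from distributed storage units
    outputs = model_forward(batch)      # heavy data never passes through the controller
    client.put(data=outputs, metadata=metadata, global_step=global_step)
    return metadata                     # only metadata flows back to the single controller
```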

You may refer to the recipe, where we mimic verl usage in both async and sync scenarios. Official integration into verl is now available at verl/pulls/3649 (with subsequent PRs to further optimize the integration).

Disaggregated Example

Work in progress :)

🚀 Quick Start

Use the Python package

We will soon release the Python package on PyPI.

Build wheel package from source code

Follow these steps to build and install:

  1. Clone the source code from the GitHub repository

    git clone https://github.com/TransferQueue/TransferQueue/
    cd TransferQueue
  2. Install dependencies

    pip install -r requirements.txt
  3. Build and install

    python -m build --wheel
    pip install dist/*.whl

📊 Performance

Note: The benchmark results (see the blog linked below) are based on our naive SimpleStorageUnit backend. By introducing high-performance storage backends and optimizing serialization/deserialization, we expect to achieve even better performance. Contributions from the community are warmly welcomed!

For detailed performance benchmarks, please refer to this blog.

πŸ›£οΈ RoadMap

  • Support data rewrite for partial rollout & agentic post-training
  • Provide a general storage abstraction layer, TransferQueueStorageManager, to manage distributed storage units, simplifying the client design and making it possible to introduce different storage backends (PR#66, issue#72)
  • Implement AsyncSimpleStorageManager as the default storage backend based on the TransferQueueStorageManager abstraction
  • Provide a KVStorageManager to cover KV-based storage backends
  • Support topic-based data partitioning to maintain train/val/test data simultaneously
  • Release the first stable version through PyPI
  • Support disaggregated framework (each rank retrieves its own data without going through a centralized node)
  • Provide a StreamingDataLoader interface for disaggregated framework
  • Support load-balancing and dynamic batching
  • Support high-performance storage backends for RDMA transmission (e.g., MoonCakeStore, Ray Direct Transport...)
  • High-performance serialization and deserialization
  • More documentation, examples and tutorials

📑 Citation

Please kindly cite our paper if you find this repo useful:
@article{han2025asyncflow,
  title={AsyncFlow: An Asynchronous Streaming RL Framework for Efficient LLM Post-Training},
  author={Han, Zhenyu and You, Ansheng and Wang, Haibo and Luo, Kui and Yang, Guang and Shi, Wenqi and Chen, Menglong and Zhang, Sicheng and Lan, Zeshun and Deng, Chunshi and others},
  journal={arXiv preprint arXiv:2507.01663},
  year={2025}
}
