7 changes: 7 additions & 0 deletions doc/source/ray-core/direct-transport.rst
@@ -298,6 +298,13 @@ For collective-based tensor transports (Gloo and NCCL):
* Any unexpected system bugs


Due to a known issue, we currently do not support repeated transfers of tensors that share the same memory but simultaneously belong to different objects. To work around this limitation, ensure that the first object is freed before storing the same tensor again in a second object. The example below shows the error this pattern raises; a sketch of the workaround follows it.

.. literalinclude:: doc_code/direct_transport_nixl.py
:language: python
:start-after: __nixl_limitations_start__
:end-before: __nixl_limitations_end__
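
A minimal sketch of the workaround, reusing the ``Actor`` from the example above. Whether dropping the reference is sufficient before the next transfer is an assumption here, since the free is asynchronous; a real program may need to wait until the tensor is actually released (this PR's tests exercise ``ray.experimental.wait_tensor_freed`` for that purpose).

.. code-block:: python

    # Hypothetical workaround sketch: free the first object before storing
    # tensor1 again in a second object.
    sender, receiver = Actor.remote(), Actor.remote()

    ref1 = sender.send_dict1.remote()
    print(ray.get(receiver.sum_dict.remote(ref1)))

    # Drop the last reference so Ray can free the first GPU object. The free
    # happens asynchronously, so a real program may need to wait for it to
    # complete before reusing the tensor.
    del ref1

    # tensor1 is now stored again, this time with only one live object
    # containing it.
    ref2 = sender.send_dict2.remote()
    print(ray.get(receiver.sum_dict.remote(ref2)))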

Advanced: RDT Internals
=======================

33 changes: 33 additions & 0 deletions doc/source/ray-core/doc_code/direct_transport_nixl.py
@@ -56,3 +56,36 @@ def consume_with_nixl(self, refs):
ref1 = receiver.consume_with_nixl.remote(refs)
print(ray.get(ref1))
# __nixl_put__and_get_end__


# __nixl_limitations_start__
@ray.remote(num_gpus=1)
class Actor:
    def __init__(self):
        self.tensor1 = torch.tensor([1, 2, 3])
        self.tensor2 = torch.tensor([4, 5, 6])
        self.tensor3 = torch.tensor([7, 8, 9])

    @ray.method(tensor_transport="nixl")
    def send_dict1(self):
        return {"round1-1": self.tensor1, "round1-2": self.tensor2}

    @ray.method(tensor_transport="nixl")
    def send_dict2(self):
        # Stores tensor1 a second time, in a different object, while the
        # first object may still be alive.
        return {"round2-1": self.tensor1, "round2-3": self.tensor3}

    def sum_dict(self, tensor_dict):
        return sum(v.sum().item() for v in tensor_dict.values())


sender, receiver = Actor.remote(), Actor.remote()
ref1 = sender.send_dict1.remote()
result1 = receiver.sum_dict.remote(ref1)
print(ray.get(result1))
ref2 = sender.send_dict2.remote()
result2 = receiver.sum_dict.remote(ref2)
try:
    # Transferring tensor1 under a second object may raise a ValueError
    # if the first object is still live.
    print(ray.get(result2))
except ValueError as e:
    print("Error caught:", e)
# __nixl_limitations_end__

Bug: Tensor Overlap Timing Issue

The example code expects a ValueError to be raised when sending overlapping tensors in different objects, but the timing is incorrect. The code calls ray.get(result1) before creating ref2, which means ref1 may already be garbage collected and freed from the GPU object store by the time ref2 is created. This makes the error non-deterministic and the example unreliable. The example should create both refs before calling ray.get() on either result to ensure the tensors overlap in the object store.
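
A minimal sketch of the ordering the comment describes (hypothetical; it reuses `Actor`, `sender`, and `receiver` from the example):

```python
# Create both refs before fetching either result, so the shared tensor is
# guaranteed to be registered under two live objects at the same time.
ref1 = sender.send_dict1.remote()
ref2 = sender.send_dict2.remote()

print(ray.get(receiver.sum_dict.remote(ref1)))
try:
    # The second object overlaps tensor1 with the still-live first object,
    # so this should deterministically surface the duplicate-tensor error.
    print(ray.get(receiver.sum_dict.remote(ref2)))
except ValueError as e:
    print("Error caught:", e)
```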


@@ -202,6 +202,11 @@ def add_object(
            is_primary: Whether the GPU object is the primary copy.
        """
        with self._object_present_cv:
            for tensor in gpu_object:
                if tensor in self._tensor_to_object_ids:
                    raise ValueError(
                        f"Tensor already exists in the RDT object store. Free all references to ObjectRef({obj_id}) before storing the tensor again."
                    )

Bug: Incorrect Object ID Reference in GPU Object Addition

The ValueError message in add_gpu_object incorrectly references the obj_id of the new object being added when a tensor already exists. It should instead reference the existing ObjectRef ID(s) that already contain the tensor, which are available in self._tensor_to_object_ids[tensor].
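
A hypothetical sketch of the corrected check, assuming `self._tensor_to_object_ids[tensor]` holds the IDs of the objects that already contain the tensor:

```python
for tensor in gpu_object:
    if tensor in self._tensor_to_object_ids:
        # Report the objects that already hold this tensor, not the new one.
        existing_ids = self._tensor_to_object_ids[tensor]
        raise ValueError(
            "Tensor already exists in the RDT object store. Free all "
            f"references to the existing object(s) {list(existing_ids)} "
            "before storing the tensor again."
        )
```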


            for tensor in gpu_object:
                self._tensor_to_object_ids[tensor].add(obj_id)
            # Append to the queue instead of overwriting
6 changes: 6 additions & 0 deletions python/ray/tests/gpu_objects/test_gpu_objects_gloo.py
@@ -809,6 +809,9 @@ def gc():
    assert not gpu_object_store.has_object(obj_id)


@pytest.mark.skip(
    reason="RDT currently doesn't support multiple objects containing the same tensor"
)
def test_wait_tensor_freed_double_tensor(ray_start_regular):
    """Unit test for ray.experimental.wait_tensor_freed when multiple objects
    contain the same tensor."""
@@ -848,6 +851,9 @@ def gc(obj_id):
    assert not gpu_object_store.has_object(obj_id2)


@pytest.mark.skip(
    reason="RDT currently doesn't support multiple objects containing the same tensor"
)
def test_send_back_and_dst_warning(ray_start_regular):
    # Test warning when object is sent back to the src actor and to dst actors
    world_size = 2