From b49394133afaf433e85457e74c99f9d5e490d7a4 Mon Sep 17 00:00:00 2001 From: Qiaolin-Yu Date: Thu, 23 Oct 2025 14:40:16 -0700 Subject: [PATCH 01/14] upd --- doc/source/ray-core/direct-transport.rst | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/doc/source/ray-core/direct-transport.rst b/doc/source/ray-core/direct-transport.rst index c9d4838d7b44..b1c4545a4ca5 100644 --- a/doc/source/ray-core/direct-transport.rst +++ b/doc/source/ray-core/direct-transport.rst @@ -298,6 +298,10 @@ For collective-based tensor transports (Gloo and NCCL): * Any unexpected system bugs +For NIXL: + +* Due to an issue with our implementation of memory deregistration, we currently do not support repeated transfers of tensors that share the same memory space but belong to different objects. We will fix this problem soon. + Advanced: RDT Internals ======================= From 3baee79e31a5b76e7b2cb4d25a0636b9719bb94e Mon Sep 17 00:00:00 2001 From: Qiaolin-Yu Date: Thu, 23 Oct 2025 17:40:11 -0700 Subject: [PATCH 02/14] upd --- doc/source/ray-core/direct-transport.rst | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/doc/source/ray-core/direct-transport.rst b/doc/source/ray-core/direct-transport.rst index b1c4545a4ca5..bc4c8d14f389 100644 --- a/doc/source/ray-core/direct-transport.rst +++ b/doc/source/ray-core/direct-transport.rst @@ -300,7 +300,10 @@ For collective-based tensor transports (Gloo and NCCL): For NIXL: -* Due to an issue with our implementation of memory deregistration, we currently do not support repeated transfers of tensors that share the same memory space but belong to different objects. We will fix this problem soon. +* Due to an issue with our implementation of memory deregistration, we currently do not support repeated transfers of tensors that share the same memory space but belong to different objects before the object in the first transfer is freed from Ray's gpu object store. Here are some examples of what will not work: + + * Sending two lists of tensors that overlap, e.g. first ``{"tensor-round1-1": tensor1, "tensor-round1-2": tensor2}`` and then ``{"tensor-round2-1": tensor1, "tensor-round2-2": tensor3}``. + * Sending the same tensor twice across rounds, e.g. first ``{"tensor-round1-1": tensor1, "tensor-round1-2": tensor2}`` and then ``{"tensor-round2-1": tensor1, "tensor-round2-2": tensor2}``. Advanced: RDT Internals ======================= From 914291f35230788f477117cf07da947232154d9c Mon Sep 17 00:00:00 2001 From: Dhyey Shah Date: Thu, 23 Oct 2025 17:45:15 -0700 Subject: [PATCH 03/14] Apply suggestion from @dayshah Signed-off-by: Dhyey Shah --- doc/source/ray-core/direct-transport.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/doc/source/ray-core/direct-transport.rst b/doc/source/ray-core/direct-transport.rst index bc4c8d14f389..bf5201844942 100644 --- a/doc/source/ray-core/direct-transport.rst +++ b/doc/source/ray-core/direct-transport.rst @@ -300,7 +300,7 @@ For collective-based tensor transports (Gloo and NCCL): For NIXL: -* Due to an issue with our implementation of memory deregistration, we currently do not support repeated transfers of tensors that share the same memory space but belong to different objects before the object in the first transfer is freed from Ray's gpu object store. Here are some examples of what will not work: +* Due to a known issue, we currently do not support repeated transfers of tensors that share the same memory space but belong to different objects before the object in the first transfer is freed from Ray's gpu object store. Here are some examples of what will not work: * Sending two lists of tensors that overlap, e.g. first ``{"tensor-round1-1": tensor1, "tensor-round1-2": tensor2}`` and then ``{"tensor-round2-1": tensor1, "tensor-round2-2": tensor3}``. * Sending the same tensor twice across rounds, e.g. first ``{"tensor-round1-1": tensor1, "tensor-round1-2": tensor2}`` and then ``{"tensor-round2-1": tensor1, "tensor-round2-2": tensor2}``. From 7613301197f6beb93a4259027fa246f2082c331b Mon Sep 17 00:00:00 2001 From: Qiaolin-Yu Date: Fri, 24 Oct 2025 19:56:33 +0000 Subject: [PATCH 04/14] temp --- .../doc_code/direct_transport_nixl.py | 82 +++++++++++++------ .../gpu_object_manager/gpu_object_store.py | 4 + 2 files changed, 61 insertions(+), 25 deletions(-) diff --git a/doc/source/ray-core/doc_code/direct_transport_nixl.py b/doc/source/ray-core/doc_code/direct_transport_nixl.py index 2acf084cf195..5f76b2d531cd 100644 --- a/doc/source/ray-core/doc_code/direct_transport_nixl.py +++ b/doc/source/ray-core/doc_code/direct_transport_nixl.py @@ -7,6 +7,13 @@ @ray.remote(num_gpus=1) class MyActor: + + def __init__(self): + self.tensor1 = torch.tensor([1, 2, 3]) + self.tensor2 = torch.tensor([4, 5, 6]) + self.tensor3 = torch.tensor([7, 8, 9]) + + @ray.method(tensor_transport="nixl") def random_tensor(self): return torch.randn(1000, 1000).cuda() @@ -30,29 +37,54 @@ def consume_with_nixl(self, refs): sum += t.sum().item() return sum + @ray.method(tensor_transport="nixl") + def send_dict1(self): + return {"round1-1": self.tensor1, "round1-2": self.tensor2} + + @ray.method(tensor_transport="nixl") + def send_dict2(self): + return {"round2-1": self.tensor1, "round2-3": self.tensor3} + + def sum_dict(self, dict): + return sum(v.sum().item() for v in dict.values()) + + +# # No collective group is needed. The two actors just need to have NIXL +# # installed. +# sender, receiver = MyActor.remote(), MyActor.remote() + +# # The tensor will be stored by the `sender` actor instead of in Ray's object +# # store. +# tensor = sender.random_tensor.remote() +# result = receiver.sum.remote(tensor) +# ray.get(result) +# # __nixl_full_example_end__ + +# # __nixl_get_start__ +# # ray.get will also use NIXL to retrieve the +# # result. +# print(ray.get(tensor)) +# # torch.Tensor(...) +# # __nixl_get_end__ + +# # __nixl_put__and_get_start__ +# tensor1 = torch.randn(1000, 1000).cuda() +# tensor2 = torch.randn(1000, 1000).cuda() +# refs = sender.produce.remote([tensor1, tensor2]) +# ref1 = receiver.consume_with_nixl.remote(refs) +# print(ray.get(ref1)) +# # __nixl_put__and_get_end__ + + +# __nixl_limitations_start__ +import pytest -# No collective group is needed. The two actors just need to have NIXL -# installed. -sender, receiver = MyActor.remote(), MyActor.remote() - -# The tensor will be stored by the `sender` actor instead of in Ray's object -# store. -tensor = sender.random_tensor.remote() -result = receiver.sum.remote(tensor) -ray.get(result) -# __nixl_full_example_end__ - -# __nixl_get_start__ -# ray.get will also use NIXL to retrieve the -# result. -print(ray.get(tensor)) -# torch.Tensor(...) -# __nixl_get_end__ - -# __nixl_put__and_get_start__ -tensor1 = torch.randn(1000, 1000).cuda() -tensor2 = torch.randn(1000, 1000).cuda() -refs = sender.produce.remote([tensor1, tensor2]) -ref1 = receiver.consume_with_nixl.remote(refs) -print(ray.get(ref1)) -# __nixl_put__and_get_end__ +with pytest.raises(ValueError): + sender, receiver = MyActor.remote(), MyActor.remote() + ref1 = sender.send_dict1.remote() + result1 = receiver.sum_dict.remote(ref1) + print(ray.get(result1)) + ref2 = sender.send_dict2.remote() + result2 = receiver.sum_dict.remote(ref2) + print(ray.get(result2)) +# __nixl_limitations_end__ \ No newline at end of file diff --git a/python/ray/experimental/gpu_object_manager/gpu_object_store.py b/python/ray/experimental/gpu_object_manager/gpu_object_store.py index d52bec00c157..231ef556e3a5 100644 --- a/python/ray/experimental/gpu_object_manager/gpu_object_store.py +++ b/python/ray/experimental/gpu_object_manager/gpu_object_store.py @@ -203,6 +203,10 @@ def add_object( """ with self._object_present_cv: for tensor in gpu_object: + if tensor in self._tensor_to_object_ids: + raise ValueError( + f"Tensor {tensor} already exists in the GPU object store. Must wait for the tensor to be freed before adding a new object with the same tensor." + ) self._tensor_to_object_ids[tensor].add(obj_id) # Append to the queue instead of overwriting self._gpu_object_store[obj_id].append( From d0a77944f357358d3b3f4cd541ba55818c49a297 Mon Sep 17 00:00:00 2001 From: Qiaolin-Yu Date: Fri, 24 Oct 2025 12:57:30 -0700 Subject: [PATCH 05/14] fix --- .../doc_code/direct_transport_nixl.py | 50 +++++++++---------- 1 file changed, 25 insertions(+), 25 deletions(-) diff --git a/doc/source/ray-core/doc_code/direct_transport_nixl.py b/doc/source/ray-core/doc_code/direct_transport_nixl.py index 5f76b2d531cd..80f0d16267d0 100644 --- a/doc/source/ray-core/doc_code/direct_transport_nixl.py +++ b/doc/source/ray-core/doc_code/direct_transport_nixl.py @@ -49,31 +49,31 @@ def sum_dict(self, dict): return sum(v.sum().item() for v in dict.values()) -# # No collective group is needed. The two actors just need to have NIXL -# # installed. -# sender, receiver = MyActor.remote(), MyActor.remote() - -# # The tensor will be stored by the `sender` actor instead of in Ray's object -# # store. -# tensor = sender.random_tensor.remote() -# result = receiver.sum.remote(tensor) -# ray.get(result) -# # __nixl_full_example_end__ - -# # __nixl_get_start__ -# # ray.get will also use NIXL to retrieve the -# # result. -# print(ray.get(tensor)) -# # torch.Tensor(...) -# # __nixl_get_end__ - -# # __nixl_put__and_get_start__ -# tensor1 = torch.randn(1000, 1000).cuda() -# tensor2 = torch.randn(1000, 1000).cuda() -# refs = sender.produce.remote([tensor1, tensor2]) -# ref1 = receiver.consume_with_nixl.remote(refs) -# print(ray.get(ref1)) -# # __nixl_put__and_get_end__ +# No collective group is needed. The two actors just need to have NIXL +# installed. +sender, receiver = MyActor.remote(), MyActor.remote() + +# The tensor will be stored by the `sender` actor instead of in Ray's object +# store. +tensor = sender.random_tensor.remote() +result = receiver.sum.remote(tensor) +ray.get(result) +# __nixl_full_example_end__ + +# __nixl_get_start__ +# ray.get will also use NIXL to retrieve the +# result. +print(ray.get(tensor)) +# torch.Tensor(...) +# __nixl_get_end__ + +# __nixl_put__and_get_start__ +tensor1 = torch.randn(1000, 1000).cuda() +tensor2 = torch.randn(1000, 1000).cuda() +refs = sender.produce.remote([tensor1, tensor2]) +ref1 = receiver.consume_with_nixl.remote(refs) +print(ray.get(ref1)) +# __nixl_put__and_get_end__ # __nixl_limitations_start__ From cee4ec7bdd66606cba3d22b3921785ef8d3c098f Mon Sep 17 00:00:00 2001 From: Qiaolin-Yu Date: Fri, 24 Oct 2025 13:01:52 -0700 Subject: [PATCH 06/14] fix --- doc/source/ray-core/direct-transport.rst | 10 ++--- .../doc_code/direct_transport_nixl.py | 41 ++++++++++--------- 2 files changed, 26 insertions(+), 25 deletions(-) diff --git a/doc/source/ray-core/direct-transport.rst b/doc/source/ray-core/direct-transport.rst index bf5201844942..4daaef19aed0 100644 --- a/doc/source/ray-core/direct-transport.rst +++ b/doc/source/ray-core/direct-transport.rst @@ -298,12 +298,12 @@ For collective-based tensor transports (Gloo and NCCL): * Any unexpected system bugs -For NIXL: +Due to a known issue, we currently do not support repeated transfers of tensors that share the same memory space but belong to different objects before the object in the first transfer is freed from Ray's gpu object store. -* Due to a known issue, we currently do not support repeated transfers of tensors that share the same memory space but belong to different objects before the object in the first transfer is freed from Ray's gpu object store. Here are some examples of what will not work: - - * Sending two lists of tensors that overlap, e.g. first ``{"tensor-round1-1": tensor1, "tensor-round1-2": tensor2}`` and then ``{"tensor-round2-1": tensor1, "tensor-round2-2": tensor3}``. - * Sending the same tensor twice across rounds, e.g. first ``{"tensor-round1-1": tensor1, "tensor-round1-2": tensor2}`` and then ``{"tensor-round2-1": tensor1, "tensor-round2-2": tensor2}``. +.. literalinclude:: doc_code/direct_transport_nixl.py + :language: python + :start-after: __nixl_limitations_start__ + :end-before: __nixl_limitations_end__ Advanced: RDT Internals ======================= diff --git a/doc/source/ray-core/doc_code/direct_transport_nixl.py b/doc/source/ray-core/doc_code/direct_transport_nixl.py index 80f0d16267d0..19f5bc7b4225 100644 --- a/doc/source/ray-core/doc_code/direct_transport_nixl.py +++ b/doc/source/ray-core/doc_code/direct_transport_nixl.py @@ -8,12 +8,6 @@ @ray.remote(num_gpus=1) class MyActor: - def __init__(self): - self.tensor1 = torch.tensor([1, 2, 3]) - self.tensor2 = torch.tensor([4, 5, 6]) - self.tensor3 = torch.tensor([7, 8, 9]) - - @ray.method(tensor_transport="nixl") def random_tensor(self): return torch.randn(1000, 1000).cuda() @@ -37,18 +31,6 @@ def consume_with_nixl(self, refs): sum += t.sum().item() return sum - @ray.method(tensor_transport="nixl") - def send_dict1(self): - return {"round1-1": self.tensor1, "round1-2": self.tensor2} - - @ray.method(tensor_transport="nixl") - def send_dict2(self): - return {"round2-1": self.tensor1, "round2-3": self.tensor3} - - def sum_dict(self, dict): - return sum(v.sum().item() for v in dict.values()) - - # No collective group is needed. The two actors just need to have NIXL # installed. sender, receiver = MyActor.remote(), MyActor.remote() @@ -79,12 +61,31 @@ def sum_dict(self, dict): # __nixl_limitations_start__ import pytest +@ray.remote(num_gpus=1) +class Actor: + def __init__(self): + self.tensor1 = torch.tensor([1, 2, 3]) + self.tensor2 = torch.tensor([4, 5, 6]) + self.tensor3 = torch.tensor([7, 8, 9]) + + @ray.method(tensor_transport="nixl") + def send_dict1(self): + return {"round1-1": self.tensor1, "round1-2": self.tensor2} + + @ray.method(tensor_transport="nixl") + def send_dict2(self): + return {"round2-1": self.tensor1, "round2-3": self.tensor3} + + def sum_dict(self, dict): + return sum(v.sum().item() for v in dict.values()) + + with pytest.raises(ValueError): - sender, receiver = MyActor.remote(), MyActor.remote() + sender, receiver = Actor.remote(), Actor.remote() ref1 = sender.send_dict1.remote() result1 = receiver.sum_dict.remote(ref1) print(ray.get(result1)) ref2 = sender.send_dict2.remote() result2 = receiver.sum_dict.remote(ref2) print(ray.get(result2)) -# __nixl_limitations_end__ \ No newline at end of file +# __nixl_limitations_end__ From 6debf69ffd8d1e68f982a5d0a3b2b62797392fa7 Mon Sep 17 00:00:00 2001 From: Qiaolin-Yu Date: Fri, 24 Oct 2025 13:04:25 -0700 Subject: [PATCH 07/14] fix Signed-off-by: Qiaolin-Yu --- doc/source/ray-core/doc_code/direct_transport_nixl.py | 1 - 1 file changed, 1 deletion(-) diff --git a/doc/source/ray-core/doc_code/direct_transport_nixl.py b/doc/source/ray-core/doc_code/direct_transport_nixl.py index 19f5bc7b4225..f56d7694469e 100644 --- a/doc/source/ray-core/doc_code/direct_transport_nixl.py +++ b/doc/source/ray-core/doc_code/direct_transport_nixl.py @@ -7,7 +7,6 @@ @ray.remote(num_gpus=1) class MyActor: - @ray.method(tensor_transport="nixl") def random_tensor(self): return torch.randn(1000, 1000).cuda() From 3b9aa76dc34ea8588df73859f82b94085dc8b685 Mon Sep 17 00:00:00 2001 From: Qiaolin-Yu Date: Fri, 24 Oct 2025 13:15:24 -0700 Subject: [PATCH 08/14] fix Signed-off-by: Qiaolin-Yu --- .../gpu_objects/test_gpu_objects_gloo.py | 63 ------------------- 1 file changed, 63 deletions(-) diff --git a/python/ray/tests/gpu_objects/test_gpu_objects_gloo.py b/python/ray/tests/gpu_objects/test_gpu_objects_gloo.py index b98cd9077124..9d26180333fb 100644 --- a/python/ray/tests/gpu_objects/test_gpu_objects_gloo.py +++ b/python/ray/tests/gpu_objects/test_gpu_objects_gloo.py @@ -809,69 +809,6 @@ def gc(): assert not gpu_object_store.has_object(obj_id) -def test_wait_tensor_freed_double_tensor(ray_start_regular): - """Unit test for ray.experimental.wait_tensor_freed when multiple objects - contain the same tensor.""" - gpu_object_store = ray.worker.global_worker.gpu_object_manager.gpu_object_store - obj_id1 = "random_id1" - obj_id2 = "random_id2" - tensor = torch.randn((1,)) - gpu_object_store.add_object(obj_id1, [tensor], is_primary=True) - gpu_object_store.add_object(obj_id2, [tensor], is_primary=True) - - assert gpu_object_store.has_object(obj_id1) - assert gpu_object_store.has_object(obj_id2) - with pytest.raises(TimeoutError): - ray.experimental.wait_tensor_freed(tensor, timeout=1) - assert gpu_object_store.has_object(obj_id1) - assert gpu_object_store.has_object(obj_id2) - - # Simulate garbage collection in a background thread. - def gc(obj_id): - time.sleep(0.1) - gpu_object_store.pop_object(obj_id) - - # Free one object. Tensor should still be stored. - gc_thread = threading.Thread(target=gc, args=(obj_id1,)) - gc_thread.start() - with pytest.raises(TimeoutError): - ray.experimental.wait_tensor_freed(tensor, timeout=1) - gc_thread.join() - assert not gpu_object_store.has_object(obj_id1) - - # Free the other object. Now the wait_tensor_freed call should be able to - # return. - gc_thread = threading.Thread(target=gc, args=(obj_id2,)) - gc_thread.start() - ray.experimental.wait_tensor_freed(tensor) - gc_thread.join() - assert not gpu_object_store.has_object(obj_id2) - - -def test_send_back_and_dst_warning(ray_start_regular): - # Test warning when object is sent back to the src actor and to dst actors - world_size = 2 - actors = [GPUTestActor.remote() for _ in range(world_size)] - create_collective_group(actors, backend="gloo") - - src_actor, dst_actor = actors[0], actors[1] - - tensor = torch.tensor([1, 2, 3]) - - warning_message = r"GPU ObjectRef\(.+\)" - - with pytest.warns(UserWarning, match=warning_message): - t = src_actor.echo.remote(tensor) - t1 = src_actor.echo.remote(t) # Sent back to the source actor - t2 = dst_actor.echo.remote(t) # Also sent to another actor - ray.get([t1, t2], _tensor_transport="object_store") - - # Second transmission of ObjectRef `t` to `dst_actor` should not trigger a warning - # Verify no `pytest.warns` context is used here because no warning should be raised - t3 = dst_actor.echo.remote(t) - ray.get(t3, _tensor_transport="object_store") - - def test_duplicate_objectref_transfer(ray_start_regular): world_size = 2 actors = [GPUTestActor.remote() for _ in range(world_size)] From 35a488f109b58433f69efc1370384185ecc1d755 Mon Sep 17 00:00:00 2001 From: Qiaolin-Yu Date: Fri, 24 Oct 2025 13:37:20 -0700 Subject: [PATCH 09/14] fix Signed-off-by: Qiaolin-Yu --- doc/source/ray-core/doc_code/direct_transport_nixl.py | 2 ++ python/ray/experimental/gpu_object_manager/gpu_object_store.py | 1 + 2 files changed, 3 insertions(+) diff --git a/doc/source/ray-core/doc_code/direct_transport_nixl.py b/doc/source/ray-core/doc_code/direct_transport_nixl.py index f56d7694469e..29a41947deb3 100644 --- a/doc/source/ray-core/doc_code/direct_transport_nixl.py +++ b/doc/source/ray-core/doc_code/direct_transport_nixl.py @@ -30,6 +30,7 @@ def consume_with_nixl(self, refs): sum += t.sum().item() return sum + # No collective group is needed. The two actors just need to have NIXL # installed. sender, receiver = MyActor.remote(), MyActor.remote() @@ -60,6 +61,7 @@ def consume_with_nixl(self, refs): # __nixl_limitations_start__ import pytest + @ray.remote(num_gpus=1) class Actor: def __init__(self): diff --git a/python/ray/experimental/gpu_object_manager/gpu_object_store.py b/python/ray/experimental/gpu_object_manager/gpu_object_store.py index 231ef556e3a5..9c5159563c04 100644 --- a/python/ray/experimental/gpu_object_manager/gpu_object_store.py +++ b/python/ray/experimental/gpu_object_manager/gpu_object_store.py @@ -207,6 +207,7 @@ def add_object( raise ValueError( f"Tensor {tensor} already exists in the GPU object store. Must wait for the tensor to be freed before adding a new object with the same tensor." ) + for tensor in gpu_object: self._tensor_to_object_ids[tensor].add(obj_id) # Append to the queue instead of overwriting self._gpu_object_store[obj_id].append( From 3947b87dd1d12712abaa9578f7d5b298701728db Mon Sep 17 00:00:00 2001 From: Qiaolin-Yu Date: Fri, 24 Oct 2025 21:16:25 +0000 Subject: [PATCH 10/14] fix --- .../gpu_objects/test_gpu_objects_gloo.py | 69 +++++++++++++++++++ 1 file changed, 69 insertions(+) diff --git a/python/ray/tests/gpu_objects/test_gpu_objects_gloo.py b/python/ray/tests/gpu_objects/test_gpu_objects_gloo.py index 9d26180333fb..ddecf58adfef 100644 --- a/python/ray/tests/gpu_objects/test_gpu_objects_gloo.py +++ b/python/ray/tests/gpu_objects/test_gpu_objects_gloo.py @@ -809,6 +809,75 @@ def gc(): assert not gpu_object_store.has_object(obj_id) +@pytest.mark.skip( + reason="RDT currently doesn't support multiple objects containing the same tensor" +) +def test_wait_tensor_freed_double_tensor(ray_start_regular): + """Unit test for ray.experimental.wait_tensor_freed when multiple objects + contain the same tensor.""" + gpu_object_store = ray.worker.global_worker.gpu_object_manager.gpu_object_store + obj_id1 = "random_id1" + obj_id2 = "random_id2" + tensor = torch.randn((1,)) + gpu_object_store.add_object(obj_id1, [tensor], is_primary=True) + gpu_object_store.add_object(obj_id2, [tensor], is_primary=True) + + assert gpu_object_store.has_object(obj_id1) + assert gpu_object_store.has_object(obj_id2) + with pytest.raises(TimeoutError): + ray.experimental.wait_tensor_freed(tensor, timeout=1) + assert gpu_object_store.has_object(obj_id1) + assert gpu_object_store.has_object(obj_id2) + + # Simulate garbage collection in a background thread. + def gc(obj_id): + time.sleep(0.1) + gpu_object_store.pop_object(obj_id) + + # Free one object. Tensor should still be stored. + gc_thread = threading.Thread(target=gc, args=(obj_id1,)) + gc_thread.start() + with pytest.raises(TimeoutError): + ray.experimental.wait_tensor_freed(tensor, timeout=1) + gc_thread.join() + assert not gpu_object_store.has_object(obj_id1) + + # Free the other object. Now the wait_tensor_freed call should be able to + # return. + gc_thread = threading.Thread(target=gc, args=(obj_id2,)) + gc_thread.start() + ray.experimental.wait_tensor_freed(tensor) + gc_thread.join() + assert not gpu_object_store.has_object(obj_id2) + + +@pytest.mark.skip( + reason="RDT currently doesn't support multiple objects containing the same tensor" +) +def test_send_back_and_dst_warning(ray_start_regular): + # Test warning when object is sent back to the src actor and to dst actors + world_size = 2 + actors = [GPUTestActor.remote() for _ in range(world_size)] + create_collective_group(actors, backend="gloo") + + src_actor, dst_actor = actors[0], actors[1] + + tensor = torch.tensor([1, 2, 3]) + + warning_message = r"GPU ObjectRef\(.+\)" + + with pytest.warns(UserWarning, match=warning_message): + t = src_actor.echo.remote(tensor) + t1 = src_actor.echo.remote(t) # Sent back to the source actor + t2 = dst_actor.echo.remote(t) # Also sent to another actor + ray.get([t1, t2], _tensor_transport="object_store") + + # Second transmission of ObjectRef `t` to `dst_actor` should not trigger a warning + # Verify no `pytest.warns` context is used here because no warning should be raised + t3 = dst_actor.echo.remote(t) + ray.get(t3, _tensor_transport="object_store") + + def test_duplicate_objectref_transfer(ray_start_regular): world_size = 2 actors = [GPUTestActor.remote() for _ in range(world_size)] From 5d5bd934a2567a32525c1844af1f796858c25dcb Mon Sep 17 00:00:00 2001 From: Qiaolin-Yu Date: Fri, 24 Oct 2025 21:19:00 +0000 Subject: [PATCH 11/14] fix Signed-off-by: Qiaolin-Yu --- python/ray/experimental/gpu_object_manager/gpu_object_store.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/ray/experimental/gpu_object_manager/gpu_object_store.py b/python/ray/experimental/gpu_object_manager/gpu_object_store.py index 9c5159563c04..1a5ba22c3eca 100644 --- a/python/ray/experimental/gpu_object_manager/gpu_object_store.py +++ b/python/ray/experimental/gpu_object_manager/gpu_object_store.py @@ -205,7 +205,7 @@ def add_object( for tensor in gpu_object: if tensor in self._tensor_to_object_ids: raise ValueError( - f"Tensor {tensor} already exists in the GPU object store. Must wait for the tensor to be freed before adding a new object with the same tensor." + "Adding a new object with a tensor that already exists in the GPU object store. Must wait for the tensor to be freed before adding a new object with the same tensor." ) for tensor in gpu_object: self._tensor_to_object_ids[tensor].add(obj_id) From b2440dcc325932d3af8f989354acb4e9ea742eb7 Mon Sep 17 00:00:00 2001 From: Qiaolin Yu Date: Fri, 24 Oct 2025 15:13:11 -0700 Subject: [PATCH 12/14] Update doc/source/ray-core/direct-transport.rst Co-authored-by: Stephanie Wang Signed-off-by: Qiaolin Yu --- doc/source/ray-core/direct-transport.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/doc/source/ray-core/direct-transport.rst b/doc/source/ray-core/direct-transport.rst index 4daaef19aed0..8f79f72fbade 100644 --- a/doc/source/ray-core/direct-transport.rst +++ b/doc/source/ray-core/direct-transport.rst @@ -298,7 +298,7 @@ For collective-based tensor transports (Gloo and NCCL): * Any unexpected system bugs -Due to a known issue, we currently do not support repeated transfers of tensors that share the same memory space but belong to different objects before the object in the first transfer is freed from Ray's gpu object store. +Due to a known issue, we currently do not support repeated transfers of tensors that share the same memory space but simultaneously belong to different objects. To support this pattern, ensure that the first object is freed before storing the same tensor again in a second object. .. literalinclude:: doc_code/direct_transport_nixl.py :language: python From 2bf832bd2c0ca94e7ee9aa5773bf57445f586bba Mon Sep 17 00:00:00 2001 From: Qiaolin-Yu Date: Fri, 24 Oct 2025 22:19:17 +0000 Subject: [PATCH 13/14] refine Signed-off-by: Qiaolin-Yu --- .../doc_code/direct_transport_nixl.py | 20 +++++++++---------- .../gpu_object_manager/gpu_object_store.py | 2 +- 2 files changed, 10 insertions(+), 12 deletions(-) diff --git a/doc/source/ray-core/doc_code/direct_transport_nixl.py b/doc/source/ray-core/doc_code/direct_transport_nixl.py index 29a41947deb3..b67366a12f0d 100644 --- a/doc/source/ray-core/doc_code/direct_transport_nixl.py +++ b/doc/source/ray-core/doc_code/direct_transport_nixl.py @@ -59,9 +59,6 @@ def consume_with_nixl(self, refs): # __nixl_limitations_start__ -import pytest - - @ray.remote(num_gpus=1) class Actor: def __init__(self): @@ -80,13 +77,14 @@ def send_dict2(self): def sum_dict(self, dict): return sum(v.sum().item() for v in dict.values()) - -with pytest.raises(ValueError): - sender, receiver = Actor.remote(), Actor.remote() - ref1 = sender.send_dict1.remote() - result1 = receiver.sum_dict.remote(ref1) - print(ray.get(result1)) - ref2 = sender.send_dict2.remote() - result2 = receiver.sum_dict.remote(ref2) +sender, receiver = Actor.remote(), Actor.remote() +ref1 = sender.send_dict1.remote() +result1 = receiver.sum_dict.remote(ref1) +print(ray.get(result1)) +ref2 = sender.send_dict2.remote() +result2 = receiver.sum_dict.remote(ref2) +try: print(ray.get(result2)) +except ValueError as e: + print("Error caught:", e) # __nixl_limitations_end__ diff --git a/python/ray/experimental/gpu_object_manager/gpu_object_store.py b/python/ray/experimental/gpu_object_manager/gpu_object_store.py index 1a5ba22c3eca..78c2c2f38606 100644 --- a/python/ray/experimental/gpu_object_manager/gpu_object_store.py +++ b/python/ray/experimental/gpu_object_manager/gpu_object_store.py @@ -205,7 +205,7 @@ def add_object( for tensor in gpu_object: if tensor in self._tensor_to_object_ids: raise ValueError( - "Adding a new object with a tensor that already exists in the GPU object store. Must wait for the tensor to be freed before adding a new object with the same tensor." + f"Tensor already exists in the RDT object store. Free all references to ObjectRef({obj_id}) before storing the tensor again." ) for tensor in gpu_object: self._tensor_to_object_ids[tensor].add(obj_id) From d69f7aebd257fca03fbdcae3d893cec4c8d3b873 Mon Sep 17 00:00:00 2001 From: Dhyey Shah Date: Fri, 24 Oct 2025 15:47:26 -0700 Subject: [PATCH 14/14] Update doc/source/ray-core/doc_code/direct_transport_nixl.py Signed-off-by: Dhyey Shah --- doc/source/ray-core/doc_code/direct_transport_nixl.py | 1 + 1 file changed, 1 insertion(+) diff --git a/doc/source/ray-core/doc_code/direct_transport_nixl.py b/doc/source/ray-core/doc_code/direct_transport_nixl.py index b67366a12f0d..6267b8c24fc2 100644 --- a/doc/source/ray-core/doc_code/direct_transport_nixl.py +++ b/doc/source/ray-core/doc_code/direct_transport_nixl.py @@ -77,6 +77,7 @@ def send_dict2(self): def sum_dict(self, dict): return sum(v.sum().item() for v in dict.values()) + sender, receiver = Actor.remote(), Actor.remote() ref1 = sender.send_dict1.remote() result1 = receiver.sum_dict.remote(ref1)