Skip to content

Commit d921f01

Browse files
authored
[data] Include total memory in aggregator memory calculation (#58002)
## Description Currently, the aggregators don't consider total available memory usage in scheduling, and will default to 1GiB if the `estimated_dataset_bytes` is None. This can be troublesome for smaller machines. See #57979 for details. This PR uses total available memory to calculate memory per aggregator. ## Related issues Fixes #57979 ## Additional information > Optional: Add implementation details, API changes, usage examples, screenshots, etc. --------- Signed-off-by: iamjustinhsu <[email protected]>
1 parent 4ec817b commit d921f01

File tree

3 files changed

+18
-10
lines changed

3 files changed

+18
-10
lines changed

python/ray/data/_internal/execution/operators/hash_shuffle.py

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1090,11 +1090,19 @@ def _get_default_aggregator_ray_remote_args(
10901090
)
10911091
else:
10921092
# NOTE: In cases when we're unable to estimate dataset size,
1093-
# we simply fallback to request
1094-
# ``DEFAULT_HASH_SHUFFLE_AGGREGATOR_MEMORY_ALLOCATION`` worth of
1095-
# memory for every Aggregator
1096-
estimated_aggregator_memory_required = (
1097-
DEFAULT_HASH_SHUFFLE_AGGREGATOR_MEMORY_ALLOCATION
1093+
# we simply fallback to request the minimum of:
1094+
# - conservative 50% of total available memory for a join operation.
1095+
# - ``DEFAULT_HASH_SHUFFLE_AGGREGATOR_MEMORY_ALLOCATION`` worth of
1096+
# memory for every Aggregator.
1097+
1098+
max_memory_per_aggregator = (
1099+
total_available_cluster_resources.memory / num_aggregators
1100+
)
1101+
modest_memory_per_aggregator = max_memory_per_aggregator / 2
1102+
1103+
estimated_aggregator_memory_required = min(
1104+
modest_memory_per_aggregator,
1105+
DEFAULT_HASH_SHUFFLE_AGGREGATOR_MEMORY_ALLOCATION,
10981106
)
10991107

11001108
remote_args = {

python/ray/data/_internal/execution/util.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ def make_ref_bundles(simple_data: List[List[Any]]) -> List["RefBundle"]:
3636
return output
3737

3838

39-
memory_units = ["B", "KB", "MB", "GB", "TB", "PB"]
39+
memory_units = ["B", "KiB", "MiB", "GiB", "TiB", "PiB"]
4040

4141

4242
def memory_string(num_bytes: float) -> str:

python/ray/data/tests/test_executor_resource_management.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -53,11 +53,11 @@ def test_execution_resources(ray_start_10_cpus_shared):
5353
)
5454
assert (
5555
repr(r4)
56-
== "ExecutionResources(cpu=1, gpu=1, object_store_memory=100.0MB, memory=0.0B)"
56+
== "ExecutionResources(cpu=1, gpu=1, object_store_memory=100.0MiB, memory=0.0B)"
5757
)
5858
assert (
5959
repr(r5)
60-
== "ExecutionResources(cpu=1, gpu=1, object_store_memory=1.0GB, memory=64.0MB)"
60+
== "ExecutionResources(cpu=1, gpu=1, object_store_memory=1.0GiB, memory=64.0MiB)"
6161
)
6262
assert (
6363
repr(unlimited)
@@ -66,8 +66,8 @@ def test_execution_resources(ray_start_10_cpus_shared):
6666

6767
# Test object_store_memory_str.
6868
assert r3.object_store_memory_str() == "0.0B"
69-
assert r4.object_store_memory_str() == "100.0MB"
70-
assert r5.object_store_memory_str() == "1.0GB"
69+
assert r4.object_store_memory_str() == "100.0MiB"
70+
assert r5.object_store_memory_str() == "1.0GiB"
7171
assert unlimited.object_store_memory_str() == "inf"
7272

7373
# Test add.

0 commit comments

Comments
 (0)