
Add repartition by maximum number of rows per block #50179

Open · wants to merge 25 commits into base: master
Conversation

@srinathk10 srinathk10 commented Feb 2, 2025

Why are these changes needed?

Add repartition by maximum number of rows per block
Addresses #36724

Related issue number

Checks

  • I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

Signed-off-by: Srinath Krishnamachari <[email protected]>
Signed-off-by: Srinath Krishnamachari <[email protected]>
@srinathk10 srinathk10 requested a review from a team as a code owner February 2, 2025 07:23
@@ -1319,11 +1320,18 @@ def filter(
@PublicAPI(api_group=SSR_API_GROUP)
def repartition(
self,
num_blocks: int,
num_blocks: Optional[int] = None,
Contributor

Randomly came across this. I don't think we need num_blocks to default to None; it's uncommon for a default argument value to lead to the error path.

It will be helpful if the docstring includes all the invalid argument combinations (e.g., a Raises section).


# Determine the slice range for the next partition.
end_idx = start_idx + min(remaining_rows, max_num_rows_per_block - cur_rows)
cur_block_builder.add_block(accessor.slice(start_idx, end_idx, copy=True))
Contributor

Can we do copy=False? I don't think we need to copy the data here.
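For context on the copy=False suggestion: a slice can be a view over the same underlying buffer instead of a new allocation. A stdlib memoryview analogy (not Ray's actual block accessor, just an illustration of view-vs-copy semantics):

```python
data = bytearray(range(10))

# Zero-copy: slicing a memoryview shares data's underlying buffer.
view = memoryview(data)[2:5]

# Copying: taking bytes of a slice allocates new storage.
copied = bytes(data[2:5])

data[2] = 99  # mutate the source

print(view.tolist())  # [99, 3, 4] -- the view sees the change
print(list(copied))   # [2, 3, 4]  -- the copy does not
```

Avoiding the copy is safe here as long as the sliced block is not mutated afterwards.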

cur_block_builder.add_block(accessor.slice(start_idx, end_idx, copy=True))

# If the current block reaches the size limit, finalize and store it.
if cur_block_builder.num_rows() == max_num_rows_per_block:
Contributor

This condition isn't needed, because the current block will either have max_num_rows_per_block rows or be the last block.
Also, the builder isn't needed, because we are not combining slices across multiple input blocks. We can just return the sliced block.

if cur_block_builder.num_rows() > 0:
block_list.append(cur_block_builder.build())

return block_list
Contributor

Better to yield blocks as soon as they are sliced.
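Taken together, the suggestions above (slice without copying, drop the builder, yield eagerly) amount to a simple generator. A sketch using plain Python lists as stand-ins for block accessors (repartition_by_rows is a hypothetical name, not the PR's actual helper):

```python
from typing import Iterable, Iterator, List

Block = List[int]  # simplified stand-in for a real Ray block


def repartition_by_rows(
    blocks: Iterable[Block], max_num_rows_per_block: int
) -> Iterator[Block]:
    """Yield slices of at most max_num_rows_per_block rows as soon as they are cut."""
    for block in blocks:
        start_idx = 0
        while start_idx < len(block):
            end_idx = min(start_idx + max_num_rows_per_block, len(block))
            # In the real code this would be accessor.slice(start_idx, end_idx,
            # copy=False): no builder, no buffering, yield immediately.
            yield block[start_idx:end_idx]
            start_idx = end_idx
```

Yielding eagerly lets downstream operators start before the whole input block is processed.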

@@ -520,6 +584,17 @@ def transform_fn(blocks: Iterable[Block], _: TaskContext) -> Iterable[Block]:
return transform_fn


def _generate_transform_fn_for_repartition_block(
fn: UserDefinedFunction,
) -> MapTransformCallable[Block, Block]:
Contributor

maybe just inline this function, it's simple and won't be reused.

@raulchen
Contributor

raulchen commented Feb 4, 2025

I just realized that there is a simpler solution.
We should just extend OutputBuffer to support num-rows-based target block size.
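A minimal sketch of what a num-rows-aware output buffer could look like; the class and method names loosely mirror BlockOutputBuffer but are assumptions, and plain lists stand in for blocks:

```python
from typing import Any, List


class RowCountOutputBuffer:
    """Buffers rows and emits blocks of at most target_max_rows_per_block rows."""

    def __init__(self, target_max_rows_per_block: int):
        self._target = target_max_rows_per_block
        self._rows: List[Any] = []
        self._finalized = False

    def add_block(self, block: List[Any]) -> None:
        assert not self._finalized
        self._rows.extend(block)

    def finalize(self) -> None:
        self._finalized = True

    def has_next(self) -> bool:
        # A full block is ready, or we are finalized and draining leftovers.
        return len(self._rows) >= self._target or (
            self._finalized and len(self._rows) > 0
        )

    def next(self) -> List[Any]:
        out = self._rows[: self._target]
        self._rows = self._rows[self._target :]
        return out
```

Feeding it input blocks and draining with `while buffer.has_next(): yield buffer.next()` reproduces the repartition semantics without a separate slicing pass.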

srinathk10 and others added 6 commits February 4, 2025 05:49
Signed-off-by: Srinath Krishnamachari <[email protected]>
Signed-off-by: Srinath Krishnamachari <[email protected]>
Signed-off-by: srinathk10 <[email protected]>
Signed-off-by: Srinath Krishnamachari <[email protected]>
Signed-off-by: srinathk10 <[email protected]>
map_transformer = MapTransformer(transform_fns)
map_transformer.set_target_max_block_size(
target_max_block_size=op.max_num_rows_per_block
)
Contributor

target_max_block_size is based on the data size in bytes, not num rows.
We also need to make OutputBuffer to support num rows.
Btw, let's also update the target_max_block_size arg to a struct that accepts either size-bytes or num-rows-based arguments.

Contributor Author

I see. Missed this one.

@@ -31,8 +33,11 @@ class BlockOutputBuffer:
... yield output.next() # doctest: +SKIP
"""

def __init__(self, target_max_block_size: int):
def __init__(
self, target_max_block_size: int, target_max_rows_per_block: int = None
Contributor

Should check that only one of target_max_block_size and target_max_rows_per_block is non-None.
Maybe introduce such a struct to make code cleaner.

@dataclass
class OutputBlockSizeOption:

    target_max_block_size: Optional[int]
    target_max_rows_per_block: Optional[int]

    def __post_init__(self):
        # Exactly one of the two targets should be set.
        assert (self.target_max_block_size is None) != (
            self.target_max_rows_per_block is None
        )

Contributor

Please fix the type as well

@@ -57,14 +62,33 @@ def finalize(self) -> None:
assert not self._finalized
self._finalized = True

def _buffer_row_limit(self) -> bool:
Contributor

Suggested change
def _buffer_row_limit(self) -> bool:
def _exceeds_buffer_row_limit(self) -> bool:

target_num_rows_by_rows = (
self._target_max_rows_per_block or block.num_rows()
)
target_num_rows = min(target_num_rows_by_size, target_num_rows_by_rows)
Contributor

To simplify the logic, I think we can just consider one factor at a time, i.e., only one factor can be non-None.
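The "one factor at a time" simplification could look like this helper (a sketch; the function name and parameters are assumptions, not the PR's code):

```python
from typing import Optional


def target_num_rows(
    block_num_rows: int,
    target_max_rows_per_block: Optional[int],
    target_num_rows_by_size: int,
) -> int:
    """Pick the slice size from exactly one active limit."""
    if target_max_rows_per_block is not None:
        # The row-count limit is active; the size-based estimate is ignored.
        return min(target_max_rows_per_block, block_num_rows)
    return min(target_num_rows_by_size, block_num_rows)
```

With the invariant that only one limit is non-None, the min() over both factors is never needed.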

Contributor

+1

python/ray/data/_internal/output_buffer.py (resolved)

and self._buffer.num_rows() > self._target_max_rows_per_block
)

def _buffer_size_limit(self) -> bool:
Contributor

Same as above

Contributor

Ditto everywhere


python/ray/data/dataset.py (resolved)
@srinathk10 srinathk10 added the go add ONLY when ready to merge, run all tests label Feb 7, 2025
assert not (
self.target_max_block_size is not None
and self.target_max_rows_per_block is not None
), "Only one of target_max_block_size or target_max_rows_per_block should be set."
Contributor

They cannot both be None either.
A cleaner way to check that one is None and the other is non-None: assert (x is None) != (y is None)

@@ -63,6 +63,7 @@ def __init__(
self._output_type = output_type
self._category = category
self._target_max_block_size = None
self._target_max_rows_per_block = None
Contributor

nit, also use OutputBlockSizeOption here for simplicity.

Contributor

and same for the map physical operator.

Signed-off-by: Srinath Krishnamachari <[email protected]>
srinathk10 and others added 4 commits February 7, 2025 21:43
Signed-off-by: Srinath Krishnamachari <[email protected]>
Signed-off-by: Srinath Krishnamachari <[email protected]>
Signed-off-by: Srinath Krishnamachari <[email protected]>
Labels
go add ONLY when ready to merge, run all tests
4 participants