[data] support generator udf for map_groups #58039
Signed-off-by: my-vegetable-has-exploded <[email protected]>
Code Review
This pull request introduces a valuable enhancement by adding support for generator UDFs in map_groups. This change can significantly reduce memory consumption for UDFs that produce large outputs. The implementation in _apply_udf_to_groups is clean and correctly handles both single DataBatch and Iterator[DataBatch] return types. The new test case test_map_groups_generator_udf is comprehensive and effectively validates the new functionality. I have one minor suggestion to improve code clarity by updating a type hint to align with this new capability.
Bug: UDF Type Checking Fails on Older Python Versions
Bug: Runtime Error with Typing Generics
Description
This PR adds support for returning a generator object from a map_groups UDF. If the UDF produces a large output, it can return an iterator so that batches are emitted one at a time, which reduces memory cost.
Related issues
Close #57935
Additional information
This change centers on the `_apply_udf_to_groups` helper function within the file ray/data/grouped_data.py. `map_groups` internally calls map_batches, providing a wrapper function (`wrapped_fn`) that in turn calls `_apply_udf_to_groups` to apply the user's UDF to each group.

The key modification is that instead of directly yielding the UDF's return value, the logic now inspects the result first. If the result is an Iterator, it is consumed with `yield from` to produce each data batch individually. If it is not an iterator, the single data batch is yielded directly, preserving the original behavior.
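The dispatch described above can be illustrated with a minimal, Ray-free sketch. This is not the code from ray/data/grouped_data.py: the function name `apply_udf_to_groups_sketch`, the plain-dict batches, and the two example UDFs are all hypothetical stand-ins, and the check against `collections.abc.Iterator` is an assumption about how such a test could be written.

```python
from collections.abc import Iterator


def apply_udf_to_groups_sketch(udf, groups):
    """Apply `udf` to each group and yield the resulting batches.

    Hypothetical stand-in for the helper discussed in this PR; the real
    function works on Ray Data blocks rather than plain dicts.
    """
    for group in groups:
        result = udf(group)
        if isinstance(result, Iterator):
            # Generator UDF: forward each batch as soon as it is produced.
            yield from result
        else:
            # Regular UDF: a single batch, yielded as before.
            yield result


def whole_group_udf(group):
    # Returns one batch per group (the original behavior).
    return {"x": [sum(group["x"])]}


def chunked_udf(group):
    # Generator UDF: emits the group in chunks of at most two rows.
    for start in range(0, len(group["x"]), 2):
        yield {"x": group["x"][start:start + 2]}


groups = [{"x": [1, 2, 3]}, {"x": [4]}]
single_batches = list(apply_udf_to_groups_sketch(whole_group_udf, groups))
chunked_batches = list(apply_udf_to_groups_sketch(chunked_udf, groups))
```

A dict batch is iterable but is not an `Iterator`, so it takes the single-batch branch; only generator objects and other true iterators are unrolled.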