From dd3718fe5df0d37bc7ec23e0451a6dfb6e816aac Mon Sep 17 00:00:00 2001 From: rjzamora Date: Fri, 20 Sep 2024 12:32:21 -0700 Subject: [PATCH 1/9] initial changes to support blocksize with arrow parquet reader --- dask_expr/io/io.py | 86 +++++++++++++++++++++++++++++++++++++++++ dask_expr/io/parquet.py | 57 ++++++++++++++++++++++++++- 2 files changed, 141 insertions(+), 2 deletions(-) diff --git a/dask_expr/io/io.py b/dask_expr/io/io.py index a28b8762e..e226af400 100644 --- a/dask_expr/io/io.py +++ b/dask_expr/io/io.py @@ -200,6 +200,92 @@ def _task(self, index: int): ) +class SplitParquetIO(PartitionsFiltered, BlockwiseIO): + _parameters = ["_expr", "_partitions"] + _defaults = {"_partitions": None} + + @functools.cached_property + def _name(self): + return ( + self.operand("_expr")._funcname + + "-split-" + + _tokenize_deterministic(*self.operands) + ) + + @functools.cached_property + def _meta(self): + return self.operand("_expr")._meta + + def dependencies(self): + return [] + + @property + def npartitions(self): + if self._filtered: + return len(self._partitions) + return len(self._split_mapping) + + def _divisions(self): + # TODO: Handle this? + return (None,) * (len(self._split_mapping) + 1) + + @staticmethod + def _load_partial_fragment( + local_split_index, + local_split_count, + frag, + filter, + columns, + schema, + *to_pandas_args, + ): + from dask_expr.io.parquet import ReadParquetPyarrowFS + + return ReadParquetPyarrowFS._table_to_pandas( + ReadParquetPyarrowFS._partial_fragment_to_table( + frag, + local_split_index, + local_split_count, + filter, + columns, + schema, + ), + *to_pandas_args, + ) + + def _filtered_task(self, index: int): + expr = self.operand("_expr") + original_index, local_split_index = self._split_mapping[index] + _, frag_to_table, *to_pandas_args = expr._task(original_index) + return ( + self._load_partial_fragment, + local_split_index, + self._local_split_count, + frag_to_table[1], # frag + frag_to_table[2], # filter + frag_to_table[3], # columns + frag_to_table[4], # schema + *to_pandas_args, + ) + + @functools.cached_property + def _local_split_count(self): + return self.operand("_expr")._split_division_factor + + @functools.cached_property + def _split_mapping(self): + count = 0 + mapping = {} + for op in self.operand("_expr")._partitions: + for s in range(self._local_split_count): + mapping[count] = (op, s) # original partition id, local split index + count += 1 + return mapping + + def _tune_up(self, parent): + return + + class FromMap(PartitionsFiltered, BlockwiseIO): _parameters = [ "func", diff --git a/dask_expr/io/parquet.py b/dask_expr/io/parquet.py index 60c7400c9..352f63335 100644 --- a/dask_expr/io/parquet.py +++ b/dask_expr/io/parquet.py @@ -2,6 +2,7 @@ import contextlib import itertools +import math import operator import os import pickle @@ -60,7 +61,7 @@ from dask_expr._reductions import Len from dask_expr._util import _convert_to_list, _tokenize_deterministic from dask_expr.io import BlockwiseIO, PartitionsFiltered -from dask_expr.io.io import FusedParquetIO +from dask_expr.io.io import FusedParquetIO, SplitParquetIO @normalize_token.register(pa.fs.FileInfo) @@ -831,6 +832,7 @@ class ReadParquetPyarrowFS(ReadParquet): "_partitions", "_series", "_dataset_info_cache", + "_blocksize", ] _defaults = { "columns": None, @@ -847,6 +849,7 @@ class ReadParquetPyarrowFS(ReadParquet): "_partitions": None, "_series": False, "_dataset_info_cache": None, + "_blocksize": "256MiB", } _absorb_projections = True _filter_passthrough = True @@ -1037,6 +1040,7 
@@ def _dataset_info(self): dataset_info["using_metadata_file"] = True dataset_info["fragments"] = _frags = list(dataset.get_fragments()) dataset_info["file_sizes"] = [None for fi in _frags] + dataset_info["all_files"] = all_files if checksum is None: checksum = tokenize(all_files) @@ -1094,7 +1098,11 @@ def _divisions(self): return self._division_from_stats[0] def _tune_up(self, parent): - if self._fusion_compression_factor >= 1: + if self._blocksize is not None and self._split_division_factor > 1: + if isinstance(parent, SplitParquetIO): + return + return parent.substitute(self, SplitParquetIO(self)) + if not self._aggregate_files or self._fusion_compression_factor >= 1: return if isinstance(parent, FusedParquetIO): return @@ -1150,6 +1158,22 @@ def _fusion_compression_factor(self): total_uncompressed = max(total_uncompressed, min_size) return max(after_projection / total_uncompressed, 0.001) + @property + def _split_division_factor(self) -> int: + approx_stats = self.approx_statistics() + after_projection = 0 + col_op = self.operand("columns") or self.columns + for col in approx_stats["columns"]: + if col["path_in_schema"] in col_op: + after_projection += col["total_uncompressed_size"] + + max_size = parse_bytes(self._blocksize) + max_splits = max(math.floor(approx_stats["num_row_groups"]), 1) + if after_projection <= max_size: + return 1 + else: + return min(math.ceil(after_projection / max_size), max_splits) + def _filtered_task(self, index: int): columns = self.columns.copy() index_name = self.index.name @@ -1175,6 +1199,35 @@ def _filtered_task(self, index: int): self.pyarrow_strings_enabled, ) + @classmethod + def _partial_fragment_to_table( + cls, + fragment_wrapper, + local_split_index, + local_split_count, + filters, + columns, + schema, + ): + if isinstance(fragment_wrapper, FragmentWrapper): + fragment = fragment_wrapper.fragment + else: + fragment = fragment_wrapper + + num_row_groups = fragment.num_row_groups + stride = max(math.floor(num_row_groups / local_split_count), 1) + offset = local_split_index * stride + row_groups = list(range(offset, min(offset + stride, num_row_groups))) + assert row_groups # TODO: Handle empty partition case + fragment = fragment.format.make_fragment( + fragment.path, + fragment.filesystem, + fragment.partition_expression, + row_groups=row_groups, + ) + + return cls._fragment_to_table(fragment, filters, columns, schema) + @staticmethod def _fragment_to_table(fragment_wrapper, filters, columns, schema): _maybe_adjust_cpu_count() From dc52fa98839a8e11874e37482ce80e41fed13261 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Fri, 20 Sep 2024 13:10:05 -0700 Subject: [PATCH 2/9] add aggregate_files support --- dask_expr/_collection.py | 11 +++++++---- dask_expr/io/parquet.py | 31 +++++++++++++++++++++++++++---- 2 files changed, 34 insertions(+), 8 deletions(-) diff --git a/dask_expr/_collection.py b/dask_expr/_collection.py index 26772730f..0fe68ef17 100644 --- a/dask_expr/_collection.py +++ b/dask_expr/_collection.py @@ -5428,13 +5428,14 @@ def read_parquet( raise NotImplementedError( "split_row_groups is not supported when using the pyarrow filesystem." ) - if blocksize is not None and blocksize != "default": + if blocksize is not None and blocksize != "default" and calculate_divisions: raise NotImplementedError( - "blocksize is not supported when using the pyarrow filesystem." + "blocksize is not supported when using the pyarrow filesystem " + "if calculate_divisions is set to True." 
) - if aggregate_files is not None: + if aggregate_files not in (None, True, False): raise NotImplementedError( - "aggregate_files is not supported when using the pyarrow filesystem." + "aggregate_files must be bool or None when using the pyarrow filesystem." ) if parquet_file_extension != (".parq", ".parquet", ".pq"): raise NotImplementedError( @@ -5459,6 +5460,8 @@ def read_parquet( arrow_to_pandas=arrow_to_pandas, pyarrow_strings_enabled=pyarrow_strings_enabled(), kwargs=kwargs, + blocksize=blocksize, + aggregate_files=aggregate_files, _series=isinstance(columns, str), ) ) diff --git a/dask_expr/io/parquet.py b/dask_expr/io/parquet.py index 352f63335..35d8ca0bf 100644 --- a/dask_expr/io/parquet.py +++ b/dask_expr/io/parquet.py @@ -829,10 +829,11 @@ class ReadParquetPyarrowFS(ReadParquet): "arrow_to_pandas", "pyarrow_strings_enabled", "kwargs", + "blocksize", + "aggregate_files", "_partitions", "_series", "_dataset_info_cache", - "_blocksize", ] _defaults = { "columns": None, @@ -846,14 +847,29 @@ class ReadParquetPyarrowFS(ReadParquet): "arrow_to_pandas": None, "pyarrow_strings_enabled": True, "kwargs": None, + "blocksize": None, + "aggregate_files": None, "_partitions": None, "_series": False, "_dataset_info_cache": None, - "_blocksize": "256MiB", } _absorb_projections = True _filter_passthrough = True + @cached_property + def _blocksize(self): + if self.blocksize == "default": + if self.calculate_divisions: + # Not supported yet + return None + return "256MiB" + return self.blocksize + + @property + def _aggregate_files(self): + # Allow file aggregation by default + return self.aggregate_files is not None + @cached_property def normalized_path(self): return _normalize_and_strip_protocol(self.path) @@ -1102,7 +1118,9 @@ def _tune_up(self, parent): if isinstance(parent, SplitParquetIO): return return parent.substitute(self, SplitParquetIO(self)) - if not self._aggregate_files or self._fusion_compression_factor >= 1: + if not self._aggregate_files: + return + if self._fusion_compression_factor >= 1: return if isinstance(parent, FusedParquetIO): return @@ -1156,7 +1174,12 @@ def _fusion_compression_factor(self): dask.config.get("dataframe.parquet.minimum-partition-size") ) total_uncompressed = max(total_uncompressed, min_size) - return max(after_projection / total_uncompressed, 0.001) + ratio = after_projection / total_uncompressed + + if self._blocksize: + ratio *= total_uncompressed / parse_bytes(self._blocksize) + + return max(ratio, 0.001) @property def _split_division_factor(self) -> int: From 603851c924659159481e11de36175ffb35da201f Mon Sep 17 00:00:00 2001 From: rjzamora Date: Fri, 20 Sep 2024 15:48:04 -0700 Subject: [PATCH 3/9] use blockwise=None as default for now --- dask_expr/io/parquet.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/dask_expr/io/parquet.py b/dask_expr/io/parquet.py index 35d8ca0bf..b852229c5 100644 --- a/dask_expr/io/parquet.py +++ b/dask_expr/io/parquet.py @@ -859,16 +859,13 @@ class ReadParquetPyarrowFS(ReadParquet): @cached_property def _blocksize(self): if self.blocksize == "default": - if self.calculate_divisions: - # Not supported yet - return None - return "256MiB" + return None return self.blocksize @property def _aggregate_files(self): # Allow file aggregation by default - return self.aggregate_files is not None + return self.aggregate_files is not False @cached_property def normalized_path(self): @@ -1173,11 +1170,14 @@ def _fusion_compression_factor(self): min_size = parse_bytes( 
dask.config.get("dataframe.parquet.minimum-partition-size") ) - total_uncompressed = max(total_uncompressed, min_size) - ratio = after_projection / total_uncompressed - if self._blocksize: - ratio *= total_uncompressed / parse_bytes(self._blocksize) + # Use blocksize to calculate the compression factor + blocksize = max(parse_bytes(self._blocksize), min_size) + ratio = after_projection / blocksize + else: + # Aggregate files to preserve un-projected partition size + total_uncompressed = max(total_uncompressed, min_size) + ratio = after_projection / total_uncompressed return max(ratio, 0.001) From 7d126784e1d7c8cabc049c47c1f67f8288d086e4 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Sat, 21 Sep 2024 06:42:17 -0700 Subject: [PATCH 4/9] add test coverage --- dask_expr/_collection.py | 2 +- dask_expr/io/tests/test_parquet.py | 34 +++++++++++++++++++++++++++++- 2 files changed, 34 insertions(+), 2 deletions(-) diff --git a/dask_expr/_collection.py b/dask_expr/_collection.py index 0fe68ef17..f9a90c6f9 100644 --- a/dask_expr/_collection.py +++ b/dask_expr/_collection.py @@ -5428,7 +5428,7 @@ def read_parquet( raise NotImplementedError( "split_row_groups is not supported when using the pyarrow filesystem." ) - if blocksize is not None and blocksize != "default" and calculate_divisions: + if blocksize not in (None, "default") and calculate_divisions: raise NotImplementedError( "blocksize is not supported when using the pyarrow filesystem " "if calculate_divisions is set to True." diff --git a/dask_expr/io/tests/test_parquet.py b/dask_expr/io/tests/test_parquet.py index 3d111d5da..6c9b98435 100644 --- a/dask_expr/io/tests/test_parquet.py +++ b/dask_expr/io/tests/test_parquet.py @@ -119,7 +119,39 @@ def test_pyarrow_filesystem(parquet_file): df_pa = read_parquet(parquet_file, filesystem=filesystem) df = read_parquet(parquet_file) - assert assert_eq(df, df_pa) + assert_eq(df, df_pa) + + +def test_pyarrow_filesystem_blocksize(tmpdir): + pdf = pd.DataFrame({c: range(10) for c in "abcde"}) + fn = _make_file(tmpdir, df=pdf, engine="pyarrow", row_group_size=1) + df = read_parquet(fn, filesystem="pyarrow", blocksize=1) + + # Trigger "_tune_up" optimization + df = df.map_partitions(lambda x: x) + assert df.optimize().npartitions == len(pdf) + assert_eq(df, pdf, check_index=False) + + +@pytest.mark.parametrize("aggregate_files", [True, False]) +def test_pyarrow_filesystem_aggregate_files(tmpdir, aggregate_files): + df0 = from_pandas( + pd.DataFrame({c: range(0, 20) for c in "abcde"}), + npartitions=2, + ) + path = tmpdir + "aggregate.parquet" + df0.to_parquet(path) + df = read_parquet( + path, + filesystem="pyarrow", + blocksize="1MiB", + aggregate_files=aggregate_files, + ) + + # Trigger "_tune_up" optimization + df = df.map_partitions(lambda x: x) + assert df.optimize().npartitions == 1 if aggregate_files else 2 + assert_eq(df, df0, check_index=False, check_divisions=False) @pytest.mark.parametrize("dtype_backend", ["pyarrow", "numpy_nullable", None]) From d420a09a66e9780fd2cefdcb0ee06962daedd17a Mon Sep 17 00:00:00 2001 From: rjzamora Date: Sat, 21 Sep 2024 07:00:17 -0700 Subject: [PATCH 5/9] add docstring notes --- dask_expr/_collection.py | 16 +++++++++++++++- dask_expr/io/parquet.py | 7 ++++++- 2 files changed, 21 insertions(+), 2 deletions(-) diff --git a/dask_expr/_collection.py b/dask_expr/_collection.py index f9a90c6f9..8ce66a086 100644 --- a/dask_expr/_collection.py +++ b/dask_expr/_collection.py @@ -5209,13 +5209,16 @@ def read_parquet( .. 
note:: Dask automatically resizes partitions to ensure that each partition is of adequate size. The optimizer uses the ratio of selected columns to total - columns to squash multiple files into one partition. + columns to squash multiple files into one partition. If the ``blocksize`` + argument is specified, Additionally, the Optimizer uses a minimum size per partition (default 75MB) to avoid too many small partitions. This configuration can be set with >>> dask.config.set({"dataframe.parquet.minimum-partition-size": "100MB"}) + To disable file aggregation, set ``aggregate_files=False`` explicitly. + .. note:: Specifying ``filesystem="arrow"`` leverages a complete reimplementation of the Parquet reader that is solely based on PyArrow. It is significantly faster @@ -5313,10 +5316,21 @@ def read_parquet( set the default value of ``split_row_groups`` (using row-group metadata from a single file), and will be ignored if ``split_row_groups`` is not set to 'infer' or 'adaptive'. Default is 256 MiB. + + .. note:: + If ``filesystem="arrow"`` is specified, the ``blocksize`` value will + be used to split files at optimization time, and the default will + be ``None``. + aggregate_files : bool or str, default None WARNING: Passing a string argument to ``aggregate_files`` will result in experimental behavior. This behavior may change in the future. + .. note:: + If ``filesystem="arrow"`` is specified, ``aggregate_files`` will be + ``True`` by default, and file aggregation will occur at optimization + time only. + Whether distinct file paths may be aggregated into the same output partition. This parameter is only used when `split_row_groups` is set to 'infer', 'adaptive' or to an integer >1. A setting of True means that any diff --git a/dask_expr/io/parquet.py b/dask_expr/io/parquet.py index b852229c5..8884d767e 100644 --- a/dask_expr/io/parquet.py +++ b/dask_expr/io/parquet.py @@ -814,6 +814,11 @@ def _fusion_compression_factor(self): len(_convert_to_list(self.operand("columns"))) / nr_original_columns, 0.001 ) + def _tune_up(self, parent): + if self.aggregate_files is False: + return + return super()._tune_up(parent) + class ReadParquetPyarrowFS(ReadParquet): _parameters = [ @@ -1115,7 +1120,7 @@ def _tune_up(self, parent): if isinstance(parent, SplitParquetIO): return return parent.substitute(self, SplitParquetIO(self)) - if not self._aggregate_files: + if self._aggregate_files is False: return if self._fusion_compression_factor >= 1: return From 73a10d8a38653e1c61833930b02e7c8283fcb737 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Sat, 21 Sep 2024 07:40:33 -0700 Subject: [PATCH 6/9] tweak _fusion_compression_factor --- dask_expr/io/parquet.py | 14 +++----------- 1 file changed, 3 insertions(+), 11 deletions(-) diff --git a/dask_expr/io/parquet.py b/dask_expr/io/parquet.py index 8884d767e..1cd5a7b2d 100644 --- a/dask_expr/io/parquet.py +++ b/dask_expr/io/parquet.py @@ -1172,19 +1172,11 @@ def _fusion_compression_factor(self): if col["path_in_schema"] in col_op: after_projection += col["total_uncompressed_size"] - min_size = parse_bytes( + target_size = self._blocksize or parse_bytes( dask.config.get("dataframe.parquet.minimum-partition-size") ) - if self._blocksize: - # Use blocksize to calculate the compression factor - blocksize = max(parse_bytes(self._blocksize), min_size) - ratio = after_projection / blocksize - else: - # Aggregate files to preserve un-projected partition size - total_uncompressed = max(total_uncompressed, min_size) - ratio = after_projection / total_uncompressed - - return 
max(ratio, 0.001) + total_uncompressed = max(total_uncompressed, target_size) + return max(after_projection / total_uncompressed, 0.001) @property def _split_division_factor(self) -> int: From 86ab5d5b2e0553bd92ee955687fb53035e2f22d5 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Sat, 21 Sep 2024 07:42:51 -0700 Subject: [PATCH 7/9] fix _blocksize --- dask_expr/io/parquet.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dask_expr/io/parquet.py b/dask_expr/io/parquet.py index 1cd5a7b2d..b09e3d473 100644 --- a/dask_expr/io/parquet.py +++ b/dask_expr/io/parquet.py @@ -865,7 +865,7 @@ class ReadParquetPyarrowFS(ReadParquet): def _blocksize(self): if self.blocksize == "default": return None - return self.blocksize + return parse_bytes(self.blocksize) @property def _aggregate_files(self): @@ -1187,7 +1187,7 @@ def _split_division_factor(self) -> int: if col["path_in_schema"] in col_op: after_projection += col["total_uncompressed_size"] - max_size = parse_bytes(self._blocksize) + max_size = self._blocksize max_splits = max(math.floor(approx_stats["num_row_groups"]), 1) if after_projection <= max_size: return 1 From 0144cd937f68de85a98b54401dfbcdf130af5e3b Mon Sep 17 00:00:00 2001 From: rjzamora Date: Mon, 23 Sep 2024 10:18:00 -0700 Subject: [PATCH 8/9] remove aggregate_files --- dask_expr/_collection.py | 15 +++------------ dask_expr/io/parquet.py | 14 -------------- dask_expr/io/tests/test_parquet.py | 21 --------------------- 3 files changed, 3 insertions(+), 47 deletions(-) diff --git a/dask_expr/_collection.py b/dask_expr/_collection.py index 8ce66a086..cfd5f4d56 100644 --- a/dask_expr/_collection.py +++ b/dask_expr/_collection.py @@ -5209,16 +5209,13 @@ def read_parquet( .. note:: Dask automatically resizes partitions to ensure that each partition is of adequate size. The optimizer uses the ratio of selected columns to total - columns to squash multiple files into one partition. If the ``blocksize`` - argument is specified, + columns to squash multiple files into one partition. Additionally, the Optimizer uses a minimum size per partition (default 75MB) to avoid too many small partitions. This configuration can be set with >>> dask.config.set({"dataframe.parquet.minimum-partition-size": "100MB"}) - To disable file aggregation, set ``aggregate_files=False`` explicitly. - .. note:: Specifying ``filesystem="arrow"`` leverages a complete reimplementation of the Parquet reader that is solely based on PyArrow. It is significantly faster @@ -5326,11 +5323,6 @@ def read_parquet( WARNING: Passing a string argument to ``aggregate_files`` will result in experimental behavior. This behavior may change in the future. - .. note:: - If ``filesystem="arrow"`` is specified, ``aggregate_files`` will be - ``True`` by default, and file aggregation will occur at optimization - time only. - Whether distinct file paths may be aggregated into the same output partition. This parameter is only used when `split_row_groups` is set to 'infer', 'adaptive' or to an integer >1. A setting of True means that any @@ -5447,9 +5439,9 @@ def read_parquet( "blocksize is not supported when using the pyarrow filesystem " "if calculate_divisions is set to True." ) - if aggregate_files not in (None, True, False): + if aggregate_files is not None: raise NotImplementedError( - "aggregate_files must be bool or None when using the pyarrow filesystem." + "aggregate_files is not supported when using the pyarrow filesystem." 
) if parquet_file_extension != (".parq", ".parquet", ".pq"): raise NotImplementedError( @@ -5475,7 +5467,6 @@ def read_parquet( pyarrow_strings_enabled=pyarrow_strings_enabled(), kwargs=kwargs, blocksize=blocksize, - aggregate_files=aggregate_files, _series=isinstance(columns, str), ) ) diff --git a/dask_expr/io/parquet.py b/dask_expr/io/parquet.py index b09e3d473..f57159c68 100644 --- a/dask_expr/io/parquet.py +++ b/dask_expr/io/parquet.py @@ -814,11 +814,6 @@ def _fusion_compression_factor(self): len(_convert_to_list(self.operand("columns"))) / nr_original_columns, 0.001 ) - def _tune_up(self, parent): - if self.aggregate_files is False: - return - return super()._tune_up(parent) - class ReadParquetPyarrowFS(ReadParquet): _parameters = [ @@ -835,7 +830,6 @@ class ReadParquetPyarrowFS(ReadParquet): "pyarrow_strings_enabled", "kwargs", "blocksize", - "aggregate_files", "_partitions", "_series", "_dataset_info_cache", @@ -853,7 +847,6 @@ class ReadParquetPyarrowFS(ReadParquet): "pyarrow_strings_enabled": True, "kwargs": None, "blocksize": None, - "aggregate_files": None, "_partitions": None, "_series": False, "_dataset_info_cache": None, @@ -867,11 +860,6 @@ def _blocksize(self): return None return parse_bytes(self.blocksize) - @property - def _aggregate_files(self): - # Allow file aggregation by default - return self.aggregate_files is not False - @cached_property def normalized_path(self): return _normalize_and_strip_protocol(self.path) @@ -1120,8 +1108,6 @@ def _tune_up(self, parent): if isinstance(parent, SplitParquetIO): return return parent.substitute(self, SplitParquetIO(self)) - if self._aggregate_files is False: - return if self._fusion_compression_factor >= 1: return if isinstance(parent, FusedParquetIO): diff --git a/dask_expr/io/tests/test_parquet.py b/dask_expr/io/tests/test_parquet.py index 6c9b98435..e07ef57ce 100644 --- a/dask_expr/io/tests/test_parquet.py +++ b/dask_expr/io/tests/test_parquet.py @@ -133,27 +133,6 @@ def test_pyarrow_filesystem_blocksize(tmpdir): assert_eq(df, pdf, check_index=False) -@pytest.mark.parametrize("aggregate_files", [True, False]) -def test_pyarrow_filesystem_aggregate_files(tmpdir, aggregate_files): - df0 = from_pandas( - pd.DataFrame({c: range(0, 20) for c in "abcde"}), - npartitions=2, - ) - path = tmpdir + "aggregate.parquet" - df0.to_parquet(path) - df = read_parquet( - path, - filesystem="pyarrow", - blocksize="1MiB", - aggregate_files=aggregate_files, - ) - - # Trigger "_tune_up" optimization - df = df.map_partitions(lambda x: x) - assert df.optimize().npartitions == 1 if aggregate_files else 2 - assert_eq(df, df0, check_index=False, check_divisions=False) - - @pytest.mark.parametrize("dtype_backend", ["pyarrow", "numpy_nullable", None]) def test_pyarrow_filesystem_dtype_backend(parquet_file, dtype_backend): filesystem = fs.LocalFileSystem() From 48f26234b149fe18d4fe5a3dcd8f293f852b4890 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Mon, 23 Sep 2024 10:59:43 -0700 Subject: [PATCH 9/9] remove blocksize in favor of config --- dask_expr/_collection.py | 22 +++++++++--------- dask_expr/io/parquet.py | 36 ++++++++++++------------------ dask_expr/io/tests/test_parquet.py | 19 ++++++++-------- 3 files changed, 35 insertions(+), 42 deletions(-) diff --git a/dask_expr/_collection.py b/dask_expr/_collection.py index cfd5f4d56..ac0ddf846 100644 --- a/dask_expr/_collection.py +++ b/dask_expr/_collection.py @@ -5216,6 +5216,12 @@ def read_parquet( >>> dask.config.set({"dataframe.parquet.minimum-partition-size": "100MB"}) + When ``filesystem="arrow"``, 
the Optimizer will also use a maximum size + per partition (default 256MB) to avoid over-sized partitions. This + configuration can be set with + + >>> dask.config.set({"dataframe.parquet.maximum-partition-size": "512MB"}) + .. note:: Specifying ``filesystem="arrow"`` leverages a complete reimplementation of the Parquet reader that is solely based on PyArrow. It is significantly faster @@ -5313,12 +5319,6 @@ def read_parquet( set the default value of ``split_row_groups`` (using row-group metadata from a single file), and will be ignored if ``split_row_groups`` is not set to 'infer' or 'adaptive'. Default is 256 MiB. - - .. note:: - If ``filesystem="arrow"`` is specified, the ``blocksize`` value will - be used to split files at optimization time, and the default will - be ``None``. - aggregate_files : bool or str, default None WARNING: Passing a string argument to ``aggregate_files`` will result in experimental behavior. This behavior may change in the future. @@ -5434,14 +5434,15 @@ def read_parquet( raise NotImplementedError( "split_row_groups is not supported when using the pyarrow filesystem." ) - if blocksize not in (None, "default") and calculate_divisions: + if blocksize is not None and blocksize != "default": raise NotImplementedError( - "blocksize is not supported when using the pyarrow filesystem " - "if calculate_divisions is set to True." + "blocksize is not supported when using the pyarrow filesystem. " + "Please use the 'dataframe.parquet.maximum-partition-size' config." ) if aggregate_files is not None: raise NotImplementedError( - "aggregate_files is not supported when using the pyarrow filesystem." + "aggregate_files is not supported when using the pyarrow filesystem. " + "Please use the 'dataframe.parquet.minimum-partition-size' config."
) if parquet_file_extension != (".parq", ".parquet", ".pq"): raise NotImplementedError( @@ -5466,7 +5467,6 @@ def read_parquet( arrow_to_pandas=arrow_to_pandas, pyarrow_strings_enabled=pyarrow_strings_enabled(), kwargs=kwargs, - blocksize=blocksize, _series=isinstance(columns, str), ) ) diff --git a/dask_expr/io/parquet.py b/dask_expr/io/parquet.py index f57159c68..69c6299af 100644 --- a/dask_expr/io/parquet.py +++ b/dask_expr/io/parquet.py @@ -829,7 +829,6 @@ class ReadParquetPyarrowFS(ReadParquet): "arrow_to_pandas", "pyarrow_strings_enabled", "kwargs", - "blocksize", "_partitions", "_series", "_dataset_info_cache", @@ -846,7 +845,6 @@ class ReadParquetPyarrowFS(ReadParquet): "arrow_to_pandas": None, "pyarrow_strings_enabled": True, "kwargs": None, - "blocksize": None, "_partitions": None, "_series": False, "_dataset_info_cache": None, @@ -854,12 +852,6 @@ class ReadParquetPyarrowFS(ReadParquet): _absorb_projections = True _filter_passthrough = True - @cached_property - def _blocksize(self): - if self.blocksize == "default": - return None - return parse_bytes(self.blocksize) - @cached_property def normalized_path(self): return _normalize_and_strip_protocol(self.path) @@ -1104,15 +1096,13 @@ def _divisions(self): return self._division_from_stats[0] def _tune_up(self, parent): - if self._blocksize is not None and self._split_division_factor > 1: - if isinstance(parent, SplitParquetIO): - return - return parent.substitute(self, SplitParquetIO(self)) - if self._fusion_compression_factor >= 1: - return - if isinstance(parent, FusedParquetIO): + if isinstance(parent, (FusedParquetIO, SplitParquetIO)): return - return parent.substitute(self, FusedParquetIO(self)) + if self._split_division_factor > 1: + return parent.substitute(self, SplitParquetIO(self)) + if self._fusion_compression_factor < 1: + return parent.substitute(self, FusedParquetIO(self)) + return @cached_property def fragments(self): @@ -1158,10 +1148,10 @@ def _fusion_compression_factor(self): if col["path_in_schema"] in col_op: after_projection += col["total_uncompressed_size"] - target_size = self._blocksize or parse_bytes( + min_size = parse_bytes( dask.config.get("dataframe.parquet.minimum-partition-size") ) - total_uncompressed = max(total_uncompressed, target_size) + total_uncompressed = max(total_uncompressed, min_size) return max(after_projection / total_uncompressed, 0.001) @property @@ -1173,12 +1163,14 @@ def _split_division_factor(self) -> int: if col["path_in_schema"] in col_op: after_projection += col["total_uncompressed_size"] - max_size = self._blocksize - max_splits = max(math.floor(approx_stats["num_row_groups"]), 1) + max_size = parse_bytes( + dask.config.get("dataframe.parquet.maximum-partition-size", "256 MB") + ) if after_projection <= max_size: return 1 - else: - return min(math.ceil(after_projection / max_size), max_splits) + + max_splits = max(math.floor(approx_stats["num_row_groups"]), 1) + return min(math.ceil(after_projection / max_size), max_splits) def _filtered_task(self, index: int): columns = self.columns.copy() diff --git a/dask_expr/io/tests/test_parquet.py b/dask_expr/io/tests/test_parquet.py index e07ef57ce..e2c7e85d9 100644 --- a/dask_expr/io/tests/test_parquet.py +++ b/dask_expr/io/tests/test_parquet.py @@ -122,15 +122,16 @@ def test_pyarrow_filesystem(parquet_file): assert_eq(df, df_pa) -def test_pyarrow_filesystem_blocksize(tmpdir): - pdf = pd.DataFrame({c: range(10) for c in "abcde"}) - fn = _make_file(tmpdir, df=pdf, engine="pyarrow", row_group_size=1) - df = read_parquet(fn, 
filesystem="pyarrow", blocksize=1) - - # Trigger "_tune_up" optimization - df = df.map_partitions(lambda x: x) - assert df.optimize().npartitions == len(pdf) - assert_eq(df, pdf, check_index=False) +def test_pyarrow_filesystem_max_partition_size(tmpdir): + with dask.config.set({"dataframe.parquet.maximum-partition-size": 1}): + pdf = pd.DataFrame({c: range(10) for c in "abcde"}) + fn = _make_file(tmpdir, df=pdf, engine="pyarrow", row_group_size=1) + df = read_parquet(fn, filesystem="pyarrow") + + # Trigger "_tune_up" optimization + df = df.map_partitions(lambda x: x) + assert df.optimize().npartitions == len(pdf) + assert_eq(df, pdf, check_index=False) @pytest.mark.parametrize("dtype_backend", ["pyarrow", "numpy_nullable", None])