From 2a6a913fae08f179f18b4796b6750fd31740a98c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafa=C5=82=20Hibner?= Date: Mon, 14 Apr 2025 18:09:06 +0000 Subject: [PATCH 1/2] Fix DatasetWriter writting batch greater than MaxRowsQueued --- cpp/src/arrow/dataset/dataset_writer.cc | 4 ++-- cpp/src/arrow/dataset/dataset_writer_test.cc | 13 ++++++++++++- 2 files changed, 14 insertions(+), 3 deletions(-) diff --git a/cpp/src/arrow/dataset/dataset_writer.cc b/cpp/src/arrow/dataset/dataset_writer.cc index c60042dd6fef8..0ecde4a0dca80 100644 --- a/cpp/src/arrow/dataset/dataset_writer.cc +++ b/cpp/src/arrow/dataset/dataset_writer.cc @@ -58,7 +58,7 @@ class Throttle { return Future<>::MakeFinished(); } std::lock_guard lg(mutex_); - if (values + current_value_ > max_value_) { + if (current_value_ > max_value_) { in_waiting_ = values; backpressure_ = Future<>::Make(); } else { @@ -75,7 +75,7 @@ class Throttle { { std::lock_guard lg(mutex_); current_value_ -= values; - if (in_waiting_ > 0 && in_waiting_ + current_value_ <= max_value_) { + if (in_waiting_ > 0 && current_value_ <= max_value_) { in_waiting_ = 0; to_complete = backpressure_; } diff --git a/cpp/src/arrow/dataset/dataset_writer_test.cc b/cpp/src/arrow/dataset/dataset_writer_test.cc index 32ae8d7ee12fb..0c9ce0961f698 100644 --- a/cpp/src/arrow/dataset/dataset_writer_test.cc +++ b/cpp/src/arrow/dataset/dataset_writer_test.cc @@ -105,7 +105,8 @@ class DatasetWriterTestFixture : public testing::Test { uint64_t max_rows = kDefaultDatasetWriterMaxRowsQueued) { EXPECT_OK_AND_ASSIGN(auto dataset_writer, DatasetWriter::Make( - write_options_, scheduler_, [] {}, [] {}, [] {}, max_rows)); + write_options_, scheduler_, [this] { paused_ = true; }, + [this] { paused_ = false; }, [] {}, max_rows)); return dataset_writer; } @@ -231,6 +232,7 @@ class DatasetWriterTestFixture : public testing::Test { util::AsyncTaskScheduler* scheduler_; Future<> scheduler_finished_; FileSystemDatasetWriteOptions write_options_; + bool paused_{false}; uint64_t counter_ = 0; }; @@ -265,6 +267,14 @@ TEST_F(DatasetWriterTestFixture, DirectoryCreateFails) { ASSERT_FINISHES_AND_RAISES(Invalid, scheduler_finished_); } +TEST_F(DatasetWriterTestFixture, BatchGreaterThanMaxRowsQueued) { + auto dataset_writer = MakeDatasetWriter(10); + dataset_writer->WriteRecordBatch(MakeBatch(35), ""); + EndWriterChecked(dataset_writer.get()); + AssertCreatedData({{"testdir/chunk-0.arrow", 0, 35}}); + ASSERT_EQ(paused_, false); +} + TEST_F(DatasetWriterTestFixture, MaxRowsOneWrite) { write_options_.max_rows_per_file = 10; write_options_.max_rows_per_group = 10; @@ -275,6 +285,7 @@ TEST_F(DatasetWriterTestFixture, MaxRowsOneWrite) { {"testdir/chunk-1.arrow", 10, 10}, {"testdir/chunk-2.arrow", 20, 10}, {"testdir/chunk-3.arrow", 30, 5}}); + ASSERT_EQ(paused_, false); } TEST_F(DatasetWriterTestFixture, MaxRowsOneWriteBackpresure) { From 287a1e033896d19f4900d2f4d9e811c79f1ce831 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafa=C5=82=20Hibner?= Date: Mon, 14 Apr 2025 20:03:53 +0000 Subject: [PATCH 2/2] Fix max_open_files throttle threshold --- cpp/src/arrow/dataset/dataset_writer.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cpp/src/arrow/dataset/dataset_writer.cc b/cpp/src/arrow/dataset/dataset_writer.cc index 0ecde4a0dca80..e2098b7a0b8cb 100644 --- a/cpp/src/arrow/dataset/dataset_writer.cc +++ b/cpp/src/arrow/dataset/dataset_writer.cc @@ -58,7 +58,7 @@ class Throttle { return Future<>::MakeFinished(); } std::lock_guard lg(mutex_); - if (current_value_ > max_value_) { + if (current_value_ >= max_value_) { in_waiting_ = values; backpressure_ = Future<>::Make(); } else { @@ -75,7 +75,7 @@ class Throttle { { std::lock_guard lg(mutex_); current_value_ -= values; - if (in_waiting_ > 0 && current_value_ <= max_value_) { + if (in_waiting_ > 0 && current_value_ < max_value_) { in_waiting_ = 0; to_complete = backpressure_; }