Skip to content

Commit 84d6b8b

Browse files
author
Rafał Hibner
committed
Fix DatasetWriter writing batch greater than MaxRowsQueued
1 parent 601fde9 commit 84d6b8b

File tree

2 files changed

+13
-2
lines changed

2 files changed

+13
-2
lines changed

cpp/src/arrow/dataset/dataset_writer.cc

+1-1
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ class Throttle {
5858
return Future<>::MakeFinished();
5959
}
6060
std::lock_guard<std::mutex> lg(mutex_);
61-
if (values + current_value_ > max_value_) {
61+
if (current_value_ > max_value_) {
6262
in_waiting_ = values;
6363
backpressure_ = Future<>::Make();
6464
} else {

cpp/src/arrow/dataset/dataset_writer_test.cc

+12-1
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,8 @@ class DatasetWriterTestFixture : public testing::Test {
105105
uint64_t max_rows = kDefaultDatasetWriterMaxRowsQueued) {
106106
EXPECT_OK_AND_ASSIGN(auto dataset_writer,
107107
DatasetWriter::Make(
108-
write_options_, scheduler_, [] {}, [] {}, [] {}, max_rows));
108+
write_options_, scheduler_, [this] { paused_ = true; },
109+
[this] { paused_ = false; }, [] {}, max_rows));
109110
return dataset_writer;
110111
}
111112

@@ -231,6 +232,7 @@ class DatasetWriterTestFixture : public testing::Test {
231232
util::AsyncTaskScheduler* scheduler_;
232233
Future<> scheduler_finished_;
233234
FileSystemDatasetWriteOptions write_options_;
235+
bool paused_{false};
234236
uint64_t counter_ = 0;
235237
};
236238

@@ -265,6 +267,14 @@ TEST_F(DatasetWriterTestFixture, DirectoryCreateFails) {
265267
ASSERT_FINISHES_AND_RAISES(Invalid, scheduler_finished_);
266268
}
267269

270+
TEST_F(DatasetWriterTestFixture, BatchGreaterThanMaxRowsQueued) {
271+
auto dataset_writer = MakeDatasetWriter(10);
272+
dataset_writer->WriteRecordBatch(MakeBatch(35), "");
273+
EndWriterChecked(dataset_writer.get());
274+
AssertCreatedData({{"testdir/chunk-0.arrow", 0, 35}});
275+
ASSERT_EQ(paused_, false);
276+
}
277+
268278
TEST_F(DatasetWriterTestFixture, MaxRowsOneWrite) {
269279
write_options_.max_rows_per_file = 10;
270280
write_options_.max_rows_per_group = 10;
@@ -275,6 +285,7 @@ TEST_F(DatasetWriterTestFixture, MaxRowsOneWrite) {
275285
{"testdir/chunk-1.arrow", 10, 10},
276286
{"testdir/chunk-2.arrow", 20, 10},
277287
{"testdir/chunk-3.arrow", 30, 5}});
288+
ASSERT_EQ(paused_, false);
278289
}
279290

280291
TEST_F(DatasetWriterTestFixture, MaxRowsOneWriteBackpresure) {

0 commit comments

Comments
 (0)