Skip to content

Commit 173ead0

Browse files
committed
fix flush
1 parent 02792d3 commit 173ead0

File tree

17 files changed: +232 -122 lines changed

17 files changed: +232 -122 lines changed

bench-vortex/src/compress/vortex.rs

+2-1
Original file line number | Diff line number | Diff line change
@@ -7,12 +7,13 @@ use tokio::runtime::Handle;
77
use vortex::Array;
88
use vortex::arrow::IntoArrowArray;
99
use vortex::error::VortexResult;
10-
use vortex::file::{VortexOpenOptions, VortexWriteOptions};
10+
use vortex::file::{VortexLayoutStrategy, VortexOpenOptions, VortexWriteOptions};
1111
use vortex::stream::ArrayStreamArrayExt;
1212

1313
#[inline(never)]
1414
pub async fn vortex_compress_write(array: &dyn Array, buf: &mut Vec<u8>) -> VortexResult<u64> {
1515
Ok(VortexWriteOptions::default()
16+
.with_strategy(VortexLayoutStrategy::default().with_tokio_executor(Handle::current()))
1617
.write(Cursor::new(buf), array.to_array_stream())
1718
.await?
1819
.position())

encodings/dict/src/builders/mod.rs

+1-1
Original file line number | Diff line number | Diff line change
@@ -23,7 +23,7 @@ pub const UNCONSTRAINED: DictConstraints = DictConstraints {
2323
max_len: usize::MAX,
2424
};
2525

26-
pub trait DictEncoder: Send {
26+
pub trait DictEncoder: Send + Sync {
2727
fn encode(&mut self, array: &dyn Array) -> VortexResult<ArrayRef>;
2828

2929
fn values(&mut self) -> VortexResult<ArrayRef>;

vortex-file/src/segments/ordered.rs

+17-12
Original file line number | Diff line number | Diff line change
@@ -28,10 +28,13 @@ impl Region {
2828
if step == 0 {
2929
vortex_bail!("region space exhausted!");
3030
}
31-
Ok((self.start..self.end).step_by(step).map(move |start| Self {
32-
start,
33-
end: start + step,
34-
}))
31+
Ok((self.start..self.end)
32+
.step_by(step)
33+
.skip(1)
34+
.map(move |start| Self {
35+
start,
36+
end: start + step,
37+
}))
3538
}
3639
}
3740

@@ -57,7 +60,7 @@ impl OrderedBuffers {
5760
pub fn finish_region(&mut self, region: &Region) {
5861
self.active_regions.remove(&region);
5962
if let Ok(first) = self.first_region() {
60-
if let Some(waker) = self.wakers.get(&first) {
63+
if let Some(waker) = self.wakers.remove(&first) {
6164
waker.wake_by_ref();
6265
}
6366
}
@@ -93,13 +96,15 @@ impl OrderedBuffers {
9396
.ok_or_else(|| vortex_err!("no active regions"))
9497
}
9598

96-
pub fn completed_buffers(&mut self) -> VortexResult<BTreeMap<usize, Vec<ByteBuffer>>> {
97-
let completed_until = self.first_region()?;
98-
let mut completed = std::mem::take(&mut self.data);
99-
// truncate completed to include only completed regions
100-
let in_progress = completed.split_off(&completed_until.start);
101-
self.data = in_progress;
102-
Ok(completed)
99+
pub fn take_buffers(&mut self) -> VortexResult<BTreeMap<usize, Vec<ByteBuffer>>> {
100+
if self.active_regions.len() > 1 {
101+
vortex_bail!("there are more than one active writers");
102+
}
103+
if !self.wakers.is_empty() {
104+
vortex_bail!("there is an inflight write");
105+
}
106+
self.active_regions = [Region::default()].into();
107+
Ok(std::mem::take(&mut self.data))
103108
}
104109

105110
pub fn next_segment_id(&mut self) -> SegmentId {

vortex-file/src/segments/writer.rs

+34-14
Original file line number | Diff line number | Diff line change
@@ -6,9 +6,9 @@ use std::task::{Context, Poll};
66
use async_trait::async_trait;
77
use parking_lot::Mutex;
88
use vortex_buffer::{Alignment, ByteBuffer};
9-
use vortex_error::{VortexResult, vortex_bail, vortex_err};
9+
use vortex_error::{VortexExpect as _, VortexResult, vortex_bail, vortex_err};
1010
use vortex_io::VortexWrite;
11-
use vortex_layout::segments::{SegmentId, SegmentWriter};
11+
use vortex_layout::segments::{ConcurrentSegmentWriter, SegmentId, SegmentWriter};
1212

1313
use super::ordered::{OrderedBuffers, Region};
1414
use crate::footer::SegmentSpec;
@@ -34,20 +34,38 @@ impl SegmentWriter for InOrderSegmentWriter {
3434
}
3535
}
3636

37-
impl InOrderSegmentWriter {
38-
pub fn split(self, splits: usize) -> VortexResult<Vec<Self>> {
39-
Ok(self
37+
impl ConcurrentSegmentWriter for InOrderSegmentWriter {
38+
fn split_off(&mut self, splits: usize) -> VortexResult<Vec<Box<dyn ConcurrentSegmentWriter>>> {
39+
let unwritten_region = Region {
40+
start: self.region.start + self.region_offset,
41+
end: self.region.end,
42+
};
43+
let mut regions: Vec<_> = self
4044
.buffers
4145
.lock()
42-
.split_region(&self.region, splits)?
43-
.map(|region| Self {
44-
buffers: self.buffers.clone(),
45-
region,
46-
region_offset: 0,
46+
.split_region(&unwritten_region, splits + 1)?
47+
.collect();
48+
// assign last splits region to self
49+
let last = regions
50+
.pop()
51+
.vortex_expect("there must be at least 1 split");
52+
self.region = last;
53+
self.region_offset = 0;
54+
55+
Ok(regions
56+
.into_iter()
57+
.map(|region| {
58+
Box::new(Self {
59+
buffers: self.buffers.clone(),
60+
region,
61+
region_offset: 0,
62+
}) as Box<dyn ConcurrentSegmentWriter>
4763
})
4864
.collect())
4965
}
66+
}
5067

68+
impl InOrderSegmentWriter {
5169
async fn next_segment_id_once_active(&self) -> VortexResult<SegmentId> {
5270
WaitRegionFuture {
5371
buffers: self.buffers.clone(),
@@ -61,10 +79,12 @@ impl InOrderSegmentWriter {
6179
writer: &mut futures::io::Cursor<W>,
6280
segment_specs: &mut Vec<SegmentSpec>,
6381
) -> VortexResult<()> {
64-
let completed = {
65-
let mut guard = self.buffers.lock();
66-
guard.completed_buffers()?
67-
};
82+
let completed = self.buffers.lock().take_buffers()?;
83+
// we are the only writer if here, reclaim the entire region
84+
self.region = Region::default();
85+
self.region_offset = 0;
86+
87+
// TODO(os): spawn everything below
6888
for buffers in completed.into_values() {
6989
// The API requires us to write these buffers contiguously. Therefore, we can only
7090
// respect the alignment of the first one.

vortex-file/src/strategy.rs

+39-12
Original file line number | Diff line number | Diff line change
@@ -21,22 +21,37 @@ use vortex_layout::layouts::repartition::{
2121
};
2222
use vortex_layout::layouts::stats::writer::{StatsLayoutOptions, StatsLayoutWriter};
2323
use vortex_layout::layouts::struct_::writer::StructLayoutWriter;
24-
use vortex_layout::segments::SegmentWriter;
24+
use vortex_layout::scan::TaskExecutor;
25+
use vortex_layout::segments::ConcurrentSegmentWriter;
2526
use vortex_layout::{Layout, LayoutStrategy, LayoutWriter, LayoutWriterExt};
2627

2728
const ROW_BLOCK_SIZE: usize = 8192;
2829

2930
/// The default Vortex file layout strategy.
30-
#[derive(Clone, Debug, Default)]
31-
pub struct VortexLayoutStrategy;
31+
#[derive(Clone, Default)]
32+
pub struct VortexLayoutStrategy {
33+
executor: Option<Arc<dyn TaskExecutor>>,
34+
}
35+
36+
impl VortexLayoutStrategy {
37+
#[cfg(feature = "tokio")]
38+
pub fn with_tokio_executor(mut self, handle: tokio::runtime::Handle) -> Self {
39+
self.executor = Some(Arc::new(handle));
40+
self
41+
}
42+
}
3243

3344
impl LayoutStrategy for VortexLayoutStrategy {
3445
fn new_writer(&self, ctx: &ArrayContext, dtype: &DType) -> VortexResult<Box<dyn LayoutWriter>> {
3546
// First, we unwrap struct arrays into their components.
3647
if dtype.is_struct() {
37-
return Ok(
38-
StructLayoutWriter::try_new_with_strategy(ctx, dtype, self.clone())?.boxed(),
39-
);
48+
return Ok(StructLayoutWriter::try_new_with_strategy(
49+
ctx,
50+
dtype,
51+
self.executor.clone(),
52+
self.clone(),
53+
)?
54+
.boxed());
4055
}
4156

4257
// We buffer arrays per column, before flushing them into a chunked layout.
@@ -135,7 +150,7 @@ struct BtrBlocksCompressedWriter {
135150
impl LayoutWriter for BtrBlocksCompressedWriter {
136151
async fn push_chunk(
137152
&mut self,
138-
segment_writer: &mut dyn SegmentWriter,
153+
segment_writer: &mut dyn ConcurrentSegmentWriter,
139154
chunk: ArrayRef,
140155
) -> VortexResult<()> {
141156
// Compute the stats for the chunk prior to compression
@@ -204,11 +219,17 @@ impl LayoutWriter for BtrBlocksCompressedWriter {
204219
.await
205220
}
206221

207-
async fn flush(&mut self, segment_writer: &mut dyn SegmentWriter) -> VortexResult<()> {
222+
async fn flush(
223+
&mut self,
224+
segment_writer: &mut dyn ConcurrentSegmentWriter,
225+
) -> VortexResult<()> {
208226
self.child.flush(segment_writer).await
209227
}
210228

211-
async fn finish(&mut self, segment_writer: &mut dyn SegmentWriter) -> VortexResult<Layout> {
229+
async fn finish(
230+
&mut self,
231+
segment_writer: &mut dyn ConcurrentSegmentWriter,
232+
) -> VortexResult<Layout> {
212233
self.child.finish(segment_writer).await
213234
}
214235
}
@@ -242,7 +263,7 @@ struct BufferedWriter {
242263
impl LayoutWriter for BufferedWriter {
243264
async fn push_chunk(
244265
&mut self,
245-
segment_writer: &mut dyn SegmentWriter,
266+
segment_writer: &mut dyn ConcurrentSegmentWriter,
246267
chunk: ArrayRef,
247268
) -> VortexResult<()> {
248269
self.nbytes += chunk.nbytes() as u64;
@@ -262,14 +283,20 @@ impl LayoutWriter for BufferedWriter {
262283
Ok(())
263284
}
264285

265-
async fn flush(&mut self, segment_writer: &mut dyn SegmentWriter) -> VortexResult<()> {
286+
async fn flush(
287+
&mut self,
288+
segment_writer: &mut dyn ConcurrentSegmentWriter,
289+
) -> VortexResult<()> {
266290
for chunk in self.chunks.drain(..) {
267291
self.child.push_chunk(segment_writer, chunk).await?;
268292
}
269293
self.child.flush(segment_writer).await
270294
}
271295

272-
async fn finish(&mut self, segment_writer: &mut dyn SegmentWriter) -> VortexResult<Layout> {
296+
async fn finish(
297+
&mut self,
298+
segment_writer: &mut dyn ConcurrentSegmentWriter,
299+
) -> VortexResult<Layout> {
273300
self.child.finish(segment_writer).await
274301
}
275302
}

vortex-file/src/writer.rs

+1-5
Original file line number | Diff line number | Diff line change
@@ -1,5 +1,4 @@
11
#![allow(dead_code)]
2-
use std::sync::Arc;
32

43
use futures::StreamExt;
54
use vortex_array::ArrayContext;
@@ -9,7 +8,6 @@ use vortex_error::{VortexExpect, VortexResult, vortex_bail, vortex_err};
98
use vortex_flatbuffers::{FlatBuffer, FlatBufferRoot, WriteFlatBuffer, WriteFlatBufferExt};
109
use vortex_io::{IoDispatcher, VortexWrite};
1110
use vortex_layout::layouts::file_stats::FileStatsLayoutWriter;
12-
use vortex_layout::scan::TaskExecutor;
1311
use vortex_layout::{LayoutContext, LayoutStrategy, LayoutWriter};
1412

1513
use crate::footer::{FileStatistics, FooterFlatBufferWriter, Postscript, PostscriptSegment};
@@ -26,17 +24,15 @@ pub struct VortexWriteOptions {
2624
exclude_dtype: bool,
2725
file_statistics: Vec<Stat>,
2826
io_dispatcher: IoDispatcher,
29-
executor: Option<Arc<dyn TaskExecutor>>,
3027
}
3128

3229
impl Default for VortexWriteOptions {
3330
fn default() -> Self {
3431
Self {
35-
strategy: Box::new(VortexLayoutStrategy),
32+
strategy: Box::new(VortexLayoutStrategy::default()),
3633
exclude_dtype: false,
3734
file_statistics: PRUNING_STATS.to_vec(),
3835
io_dispatcher: IoDispatcher::shared(),
39-
executor: None,
4036
}
4137
}
4238
}

vortex-layout/src/layouts/chunked/writer.rs

+10-4
Original file line number | Diff line number | Diff line change
@@ -9,7 +9,7 @@ use vortex_error::{VortexExpect, VortexResult};
99
use crate::data::Layout;
1010
use crate::layouts::chunked::ChunkedLayout;
1111
use crate::layouts::flat::writer::FlatLayoutStrategy;
12-
use crate::segments::SegmentWriter;
12+
use crate::segments::ConcurrentSegmentWriter;
1313
use crate::strategy::LayoutStrategy;
1414
use crate::writer::LayoutWriter;
1515
use crate::{LayoutVTableRef, LayoutWriterExt};
@@ -61,7 +61,7 @@ impl ChunkedLayoutWriter {
6161
impl LayoutWriter for ChunkedLayoutWriter {
6262
async fn push_chunk(
6363
&mut self,
64-
segment_writer: &mut dyn SegmentWriter,
64+
segment_writer: &mut dyn ConcurrentSegmentWriter,
6565
chunk: ArrayRef,
6666
) -> VortexResult<()> {
6767
self.row_count += chunk.len() as u64;
@@ -79,12 +79,18 @@ impl LayoutWriter for ChunkedLayoutWriter {
7979
Ok(())
8080
}
8181

82-
async fn flush(&mut self, _segment_writer: &mut dyn SegmentWriter) -> VortexResult<()> {
82+
async fn flush(
83+
&mut self,
84+
_segment_writer: &mut dyn ConcurrentSegmentWriter,
85+
) -> VortexResult<()> {
8386
// We flush each chunk as we write it, so there's nothing to do here.
8487
Ok(())
8588
}
8689

87-
async fn finish(&mut self, segment_writer: &mut dyn SegmentWriter) -> VortexResult<Layout> {
90+
async fn finish(
91+
&mut self,
92+
segment_writer: &mut dyn ConcurrentSegmentWriter,
93+
) -> VortexResult<Layout> {
8894
// Call finish on each chunk's writer
8995
let mut children = vec![];
9096
for writer in self.chunks.iter_mut() {

vortex-layout/src/layouts/dict/writer/mod.rs

+3-3
Original file line number | Diff line number | Diff line change
@@ -76,7 +76,7 @@ struct DelegatingDictLayoutWriter {
7676
impl LayoutWriter for DelegatingDictLayoutWriter {
7777
async fn push_chunk(
7878
&mut self,
79-
segment_writer: &mut dyn crate::segments::SegmentWriter,
79+
segment_writer: &mut dyn crate::segments::ConcurrentSegmentWriter,
8080
chunk: ArrayRef,
8181
) -> VortexResult<()> {
8282
match self.writer.as_mut() {
@@ -102,7 +102,7 @@ impl LayoutWriter for DelegatingDictLayoutWriter {
102102

103103
async fn flush(
104104
&mut self,
105-
segment_writer: &mut dyn crate::segments::SegmentWriter,
105+
segment_writer: &mut dyn crate::segments::ConcurrentSegmentWriter,
106106
) -> VortexResult<()> {
107107
match self.writer.as_mut() {
108108
None => vortex_bail!("flush called before push_chunk"),
@@ -112,7 +112,7 @@ impl LayoutWriter for DelegatingDictLayoutWriter {
112112

113113
async fn finish(
114114
&mut self,
115-
segment_writer: &mut dyn crate::segments::SegmentWriter,
115+
segment_writer: &mut dyn crate::segments::ConcurrentSegmentWriter,
116116
) -> VortexResult<Layout> {
117117
match self.writer.as_mut() {
118118
None => vortex_bail!("finish called before push_chunk"),

0 commit comments

Comments
 (0)