Skip to content

Commit 02792d3

Browse files
committed
async
1 parent 2e465b7 commit 02792d3

File tree

14 files changed

+158
-181
lines changed

14 files changed

+158
-181
lines changed

vortex-file/src/segments/writer.rs

+7-60
Original file line number | Diff line number | Diff line change
@@ -1,7 +1,9 @@
1+
#![allow(dead_code)]
12
use std::pin::Pin;
23
use std::sync::Arc;
34
use std::task::{Context, Poll};
45

6+
use async_trait::async_trait;
57
use parking_lot::Mutex;
68
use vortex_buffer::{Alignment, ByteBuffer};
79
use vortex_error::{VortexResult, vortex_bail, vortex_err};
@@ -12,73 +14,16 @@ use super::ordered::{OrderedBuffers, Region};
1214
use crate::footer::SegmentSpec;
1315

1416
/// A segment writer that holds buffers in memory until they are flushed by a writer.
15-
#[derive(Default)]
16-
pub(crate) struct BufferedSegmentWriter {
17-
/// A Vec byte buffers for segments
18-
segments_buffers: Vec<Vec<ByteBuffer>>,
19-
next_id: SegmentId,
20-
}
21-
22-
impl SegmentWriter for BufferedSegmentWriter {
23-
fn put(&mut self, data: &[ByteBuffer]) -> SegmentId {
24-
self.segments_buffers.push(data.to_vec());
25-
let id = self.next_id;
26-
self.next_id = SegmentId::from(*self.next_id + 1);
27-
id
28-
}
29-
}
30-
31-
impl BufferedSegmentWriter {
32-
/// Flush the segments to the provided async writer.
33-
pub async fn flush_async<W: VortexWrite>(
34-
&mut self,
35-
writer: &mut futures::io::Cursor<W>,
36-
segment_specs: &mut Vec<SegmentSpec>,
37-
) -> VortexResult<()> {
38-
for buffers in self.segments_buffers.drain(..) {
39-
// The API requires us to write these buffers contiguously. Therefore, we can only
40-
// respect the alignment of the first one.
41-
// Don't worry, in most cases the caller knows what they're doing and will align the
42-
// buffers themselves, inserting padding buffers where necessary.
43-
let alignment = buffers
44-
.first()
45-
.map(|buffer| buffer.alignment())
46-
.unwrap_or_else(Alignment::none);
47-
48-
// Add any padding required to align the segment.
49-
let offset = writer.position();
50-
let padding = offset.next_multiple_of(*alignment as u64) - offset;
51-
if padding > 0 {
52-
writer
53-
.write_all(ByteBuffer::zeroed(padding as usize))
54-
.await?;
55-
}
56-
let offset = writer.position();
57-
58-
for buffer in buffers {
59-
writer.write_all(buffer).await?;
60-
}
61-
62-
segment_specs.push(SegmentSpec {
63-
offset,
64-
length: u32::try_from(writer.position() - offset)
65-
.map_err(|_| vortex_err!("segment length exceeds maximum u32"))?,
66-
alignment,
67-
});
68-
}
69-
Ok(())
70-
}
71-
}
72-
7317
#[derive(Default)]
7418
pub struct InOrderSegmentWriter {
7519
buffers: Arc<Mutex<OrderedBuffers>>,
7620
region: Region,
7721
region_offset: usize,
7822
}
7923

80-
impl InOrderSegmentWriter {
81-
pub async fn put(&mut self, data: Vec<ByteBuffer>) -> VortexResult<SegmentId> {
24+
#[async_trait]
25+
impl SegmentWriter for InOrderSegmentWriter {
26+
async fn put(&mut self, data: Vec<ByteBuffer>) -> VortexResult<SegmentId> {
8227
let buffer_idx = self.region.start + self.region_offset;
8328
if buffer_idx >= self.region.end {
8429
vortex_bail!("region space exhausted!");
@@ -87,7 +32,9 @@ impl InOrderSegmentWriter {
8732
self.region_offset += 1;
8833
self.next_segment_id_once_active().await
8934
}
35+
}
9036

37+
impl InOrderSegmentWriter {
9138
pub fn split(self, splits: usize) -> VortexResult<Vec<Self>> {
9239
Ok(self
9340
.buffers

vortex-file/src/strategy.rs

+18-13
Original file line number | Diff line number | Diff line change
@@ -3,6 +3,7 @@
33
use std::collections::VecDeque;
44
use std::sync::Arc;
55

6+
use async_trait::async_trait;
67
use itertools::Itertools;
78
use vortex_array::arcref::ArcRef;
89
use vortex_array::arrays::ConstantArray;
@@ -130,8 +131,9 @@ struct BtrBlocksCompressedWriter {
130131
previous_chunk: Option<PreviousCompression>,
131132
}
132133

134+
#[async_trait]
133135
impl LayoutWriter for BtrBlocksCompressedWriter {
134-
fn push_chunk(
136+
async fn push_chunk(
135137
&mut self,
136138
segment_writer: &mut dyn SegmentWriter,
137139
chunk: ArrayRef,
@@ -197,15 +199,17 @@ impl LayoutWriter for BtrBlocksCompressedWriter {
197199

198200
compressed_chunk.statistics().inherit(chunk.statistics());
199201

200-
self.child.push_chunk(segment_writer, compressed_chunk)
202+
self.child
203+
.push_chunk(segment_writer, compressed_chunk)
204+
.await
201205
}
202206

203-
fn flush(&mut self, segment_writer: &mut dyn SegmentWriter) -> VortexResult<()> {
204-
self.child.flush(segment_writer)
207+
async fn flush(&mut self, segment_writer: &mut dyn SegmentWriter) -> VortexResult<()> {
208+
self.child.flush(segment_writer).await
205209
}
206210

207-
fn finish(&mut self, segment_writer: &mut dyn SegmentWriter) -> VortexResult<Layout> {
208-
self.child.finish(segment_writer)
211+
async fn finish(&mut self, segment_writer: &mut dyn SegmentWriter) -> VortexResult<Layout> {
212+
self.child.finish(segment_writer).await
209213
}
210214
}
211215

@@ -234,8 +238,9 @@ struct BufferedWriter {
234238
child: Box<dyn LayoutWriter>,
235239
}
236240

241+
#[async_trait]
237242
impl LayoutWriter for BufferedWriter {
238-
fn push_chunk(
243+
async fn push_chunk(
239244
&mut self,
240245
segment_writer: &mut dyn SegmentWriter,
241246
chunk: ArrayRef,
@@ -248,7 +253,7 @@ impl LayoutWriter for BufferedWriter {
248253
while self.nbytes > self.buffer_size {
249254
if let Some(chunk) = self.chunks.pop_front() {
250255
self.nbytes -= chunk.nbytes() as u64;
251-
self.child.push_chunk(segment_writer, chunk)?;
256+
self.child.push_chunk(segment_writer, chunk).await?;
252257
} else {
253258
break;
254259
}
@@ -257,15 +262,15 @@ impl LayoutWriter for BufferedWriter {
257262
Ok(())
258263
}
259264

260-
fn flush(&mut self, segment_writer: &mut dyn SegmentWriter) -> VortexResult<()> {
265+
async fn flush(&mut self, segment_writer: &mut dyn SegmentWriter) -> VortexResult<()> {
261266
for chunk in self.chunks.drain(..) {
262-
self.child.push_chunk(segment_writer, chunk)?;
267+
self.child.push_chunk(segment_writer, chunk).await?;
263268
}
264-
self.child.flush(segment_writer)
269+
self.child.flush(segment_writer).await
265270
}
266271

267-
fn finish(&mut self, segment_writer: &mut dyn SegmentWriter) -> VortexResult<Layout> {
268-
self.child.finish(segment_writer)
272+
async fn finish(&mut self, segment_writer: &mut dyn SegmentWriter) -> VortexResult<Layout> {
273+
self.child.finish(segment_writer).await
269274
}
270275
}
271276

vortex-file/src/writer.rs

+9-14
Original file line number | Diff line number | Diff line change
@@ -1,3 +1,4 @@
1+
#![allow(dead_code)]
12
use std::sync::Arc;
23

34
use futures::StreamExt;
@@ -12,7 +13,7 @@ use vortex_layout::scan::TaskExecutor;
1213
use vortex_layout::{LayoutContext, LayoutStrategy, LayoutWriter};
1314

1415
use crate::footer::{FileStatistics, FooterFlatBufferWriter, Postscript, PostscriptSegment};
15-
use crate::segments::writer::BufferedSegmentWriter;
16+
use crate::segments::writer::InOrderSegmentWriter;
1617
use crate::strategy::VortexLayoutStrategy;
1718
use crate::{EOF_SIZE, MAGIC_BYTES, MAX_FOOTER_SIZE, VERSION};
1819

@@ -85,30 +86,24 @@ impl VortexWriteOptions {
8586

8687
// Our buffered message writer accumulates messages for each batch so we can flush them
8788
// into the file.
88-
let mut segment_writer = BufferedSegmentWriter::default();
89+
let mut segment_writer = InOrderSegmentWriter::default();
8990
let mut segment_specs = vec![];
9091

9192
// Then write the stream via the root layout
9293
while let Some(chunk) = stream.next().await {
9394
let chunk = chunk?;
94-
layout_writer.push_chunk(&mut segment_writer, chunk)?;
95+
layout_writer.push_chunk(&mut segment_writer, chunk).await?;
9596
// NOTE(ngates): we could spawn this task and continue to compress the next chunk.
96-
segment_writer
97-
.flush_async(&mut write, &mut segment_specs)
98-
.await?;
97+
segment_writer.flush(&mut write, &mut segment_specs).await?;
9998
}
10099

101100
// Flush the final layout messages into the file
102-
layout_writer.flush(&mut segment_writer)?;
103-
segment_writer
104-
.flush_async(&mut write, &mut segment_specs)
105-
.await?;
101+
layout_writer.flush(&mut segment_writer).await?;
102+
segment_writer.flush(&mut write, &mut segment_specs).await?;
106103

107104
// Finish the layouts and flush the finishing messages into the file
108-
let layout = layout_writer.finish(&mut segment_writer)?;
109-
segment_writer
110-
.flush_async(&mut write, &mut segment_specs)
111-
.await?;
105+
let layout = layout_writer.finish(&mut segment_writer).await?;
106+
segment_writer.flush(&mut write, &mut segment_specs).await?;
112107

113108
// We write our footer components in order of least likely to be needed to most likely.
114109
// DType is the least likely to be needed, as many readers may provide this from an

vortex-layout/src/layouts/chunked/writer.rs

+8-6
Original file line number | Diff line number | Diff line change
@@ -1,5 +1,6 @@
11
use std::sync::Arc;
22

3+
use async_trait::async_trait;
34
use vortex_array::arcref::ArcRef;
45
use vortex_array::{ArrayContext, ArrayRef};
56
use vortex_dtype::DType;
@@ -56,8 +57,9 @@ impl ChunkedLayoutWriter {
5657
}
5758
}
5859

60+
#[async_trait]
5961
impl LayoutWriter for ChunkedLayoutWriter {
60-
fn push_chunk(
62+
async fn push_chunk(
6163
&mut self,
6264
segment_writer: &mut dyn SegmentWriter,
6365
chunk: ArrayRef,
@@ -70,24 +72,24 @@ impl LayoutWriter for ChunkedLayoutWriter {
7072
.options
7173
.chunk_strategy
7274
.new_writer(&self.ctx, chunk.dtype())?;
73-
chunk_writer.push_chunk(segment_writer, chunk)?;
74-
chunk_writer.flush(segment_writer)?;
75+
chunk_writer.push_chunk(segment_writer, chunk).await?;
76+
chunk_writer.flush(segment_writer).await?;
7577
self.chunks.push(chunk_writer);
7678

7779
Ok(())
7880
}
7981

80-
fn flush(&mut self, _segment_writer: &mut dyn SegmentWriter) -> VortexResult<()> {
82+
async fn flush(&mut self, _segment_writer: &mut dyn SegmentWriter) -> VortexResult<()> {
8183
// We flush each chunk as we write it, so there's nothing to do here.
8284
Ok(())
8385
}
8486

85-
fn finish(&mut self, segment_writer: &mut dyn SegmentWriter) -> VortexResult<Layout> {
87+
async fn finish(&mut self, segment_writer: &mut dyn SegmentWriter) -> VortexResult<Layout> {
8688
// Call finish on each chunk's writer
8789
let mut children = vec![];
8890
for writer in self.chunks.iter_mut() {
8991
// FIXME(ngates): we should try calling finish after each chunk.
90-
children.push(writer.finish(segment_writer)?);
92+
children.push(writer.finish(segment_writer).await?);
9193
}
9294

9395
// If there's only one child, there's no point even writing a stats table since

vortex-layout/src/layouts/dict/writer/mod.rs

+9-7
Original file line number | Diff line number | Diff line change
@@ -1,3 +1,4 @@
1+
use async_trait::async_trait;
12
use bytes::Bytes;
23
use vortex_array::arcref::ArcRef;
34
use vortex_array::vtable::EncodingVTable as _;
@@ -71,14 +72,15 @@ struct DelegatingDictLayoutWriter {
7172
writer: Option<Box<dyn LayoutWriter>>,
7273
}
7374

75+
#[async_trait]
7476
impl LayoutWriter for DelegatingDictLayoutWriter {
75-
fn push_chunk(
77+
async fn push_chunk(
7678
&mut self,
7779
segment_writer: &mut dyn crate::segments::SegmentWriter,
7880
chunk: ArrayRef,
7981
) -> VortexResult<()> {
8082
match self.writer.as_mut() {
81-
Some(writer) => writer.push_chunk(segment_writer, chunk),
83+
Some(writer) => writer.push_chunk(segment_writer, chunk).await,
8284
None => {
8385
let compressed = BtrBlocksCompressor.compress(&chunk)?;
8486
let mut writer = if !compressed.is_encoding(DictEncoding.id()) {
@@ -91,30 +93,30 @@ impl LayoutWriter for DelegatingDictLayoutWriter {
9193
)
9294
.boxed()
9395
};
94-
writer.push_chunk(segment_writer, chunk)?;
96+
writer.push_chunk(segment_writer, chunk).await?;
9597
self.writer = Some(writer);
9698
Ok(())
9799
}
98100
}
99101
}
100102

101-
fn flush(
103+
async fn flush(
102104
&mut self,
103105
segment_writer: &mut dyn crate::segments::SegmentWriter,
104106
) -> VortexResult<()> {
105107
match self.writer.as_mut() {
106108
None => vortex_bail!("flush called before push_chunk"),
107-
Some(writer) => writer.flush(segment_writer),
109+
Some(writer) => writer.flush(segment_writer).await,
108110
}
109111
}
110112

111-
fn finish(
113+
async fn finish(
112114
&mut self,
113115
segment_writer: &mut dyn crate::segments::SegmentWriter,
114116
) -> VortexResult<Layout> {
115117
match self.writer.as_mut() {
116118
None => vortex_bail!("finish called before push_chunk"),
117-
Some(writer) => writer.finish(segment_writer),
119+
Some(writer) => writer.finish(segment_writer).await,
118120
}
119121
}
120122
}

0 commit comments

Comments
 (0)