Skip to content

Commit e2a717a

Browse files
committed
ordered segments buffer
1 parent fc3196c commit e2a717a

File tree

6 files changed

+239
-2
lines changed

6 files changed

+239
-2
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

vortex-file/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ getrandom_v03 = { workspace = true }
2323
itertools = { workspace = true }
2424
linked_hash_set = { workspace = true }
2525
log = { workspace = true }
26+
parking_lot = { workspace = true }
2627
moka = { workspace = true, features = ["future"] }
2728
object_store = { workspace = true, optional = true }
2829
pin-project-lite = { workspace = true }

vortex-file/src/segments/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
mod cache;
2+
mod ordered;
23
pub(crate) mod writer;
34

45
pub use cache::*;

vortex-file/src/segments/ordered.rs

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
use std::collections::{BTreeMap, BTreeSet};
2+
use std::task::Waker;
3+
4+
use vortex_array::aliases::hash_map::HashMap;
5+
use vortex_buffer::ByteBuffer;
6+
use vortex_error::{VortexResult, vortex_bail, vortex_err};
7+
use vortex_layout::segments::SegmentId;
8+
9+
// [start, end)
10+
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Hash, Debug)]
11+
pub(super) struct Region {
12+
pub start: usize,
13+
pub end: usize,
14+
}
15+
16+
impl Default for Region {
17+
fn default() -> Self {
18+
Self {
19+
start: 0,
20+
end: usize::MAX,
21+
}
22+
}
23+
}
24+
25+
impl Region {
26+
pub fn split(self, splits: usize) -> VortexResult<impl Iterator<Item = Self>> {
27+
let step = (self.end - self.start) / splits;
28+
if step == 0 {
29+
vortex_bail!("region space exhausted!");
30+
}
31+
Ok((self.start..self.end).step_by(step).map(move |start| Self {
32+
start,
33+
end: start + step,
34+
}))
35+
}
36+
}
37+
38+
pub(super) struct OrderedBuffers {
39+
data: BTreeMap<usize, Vec<ByteBuffer>>,
40+
active_regions: BTreeSet<Region>,
41+
wakers: HashMap<Region, Waker>,
42+
next_segment_id: SegmentId,
43+
}
44+
45+
impl Default for OrderedBuffers {
46+
fn default() -> Self {
47+
Self {
48+
data: Default::default(),
49+
active_regions: [Region::default()].into(),
50+
wakers: Default::default(),
51+
next_segment_id: Default::default(),
52+
}
53+
}
54+
}
55+
56+
impl OrderedBuffers {
57+
pub fn finish_region(&mut self, region: &Region) {
58+
self.active_regions.remove(&region);
59+
if let Ok(first) = self.first_region() {
60+
if let Some(waker) = self.wakers.get(&first) {
61+
waker.wake_by_ref();
62+
}
63+
}
64+
}
65+
66+
pub fn split_region(
67+
&mut self,
68+
region: &Region,
69+
splits: usize,
70+
) -> VortexResult<impl Iterator<Item = Region>> {
71+
if !self.active_regions.remove(&region) {
72+
vortex_bail!("region not active {:?}", region);
73+
}
74+
Ok(region.split(splits)?.map(|region| {
75+
self.active_regions.insert(region);
76+
region
77+
}))
78+
}
79+
80+
pub fn insert_buffer(&mut self, idx: usize, buffer: Vec<ByteBuffer>) {
81+
self.data.insert(idx, buffer);
82+
}
83+
84+
pub fn register_waker(&mut self, region: Region, waker: Waker) {
85+
// TODO(os): should this store a Vec<Waker> instead of replacing?
86+
self.wakers.insert(region, waker);
87+
}
88+
89+
pub fn first_region(&self) -> VortexResult<Region> {
90+
self.active_regions
91+
.first()
92+
.copied()
93+
.ok_or_else(|| vortex_err!("no active regions"))
94+
}
95+
96+
pub fn completed_buffers(&mut self) -> VortexResult<BTreeMap<usize, Vec<ByteBuffer>>> {
97+
let completed_until = self.first_region()?;
98+
let mut completed = std::mem::take(&mut self.data);
99+
// truncate completed to include only completed regions
100+
let in_progress = completed.split_off(&completed_until.start);
101+
self.data = in_progress;
102+
Ok(completed)
103+
}
104+
105+
pub fn next_segment_id(&mut self) -> SegmentId {
106+
let res = self.next_segment_id;
107+
self.next_segment_id = SegmentId::from(*res + 1);
108+
res
109+
}
110+
}

vortex-file/src/segments/writer.rs

Lines changed: 118 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,14 @@
1+
use std::pin::Pin;
2+
use std::sync::Arc;
3+
use std::task::{Context, Poll};
4+
5+
use parking_lot::Mutex;
16
use vortex_buffer::{Alignment, ByteBuffer};
2-
use vortex_error::{VortexResult, vortex_err};
7+
use vortex_error::{VortexResult, vortex_bail, vortex_err};
38
use vortex_io::VortexWrite;
49
use vortex_layout::segments::{SegmentId, SegmentWriter};
510

11+
use super::ordered::{OrderedBuffers, Region};
612
use crate::footer::SegmentSpec;
713

814
/// A segment writer that holds buffers in memory until they are flushed by a writer.
@@ -63,3 +69,114 @@ impl BufferedSegmentWriter {
6369
Ok(())
6470
}
6571
}
72+
73+
#[derive(Default)]
74+
pub struct InOrderSegmentWriter {
75+
buffers: Arc<Mutex<OrderedBuffers>>,
76+
region: Region,
77+
region_offset: usize,
78+
}
79+
80+
impl InOrderSegmentWriter {
81+
pub async fn put(&mut self, data: Vec<ByteBuffer>) -> VortexResult<SegmentId> {
82+
let buffer_idx = self.region.start + self.region_offset;
83+
if buffer_idx >= self.region.end {
84+
vortex_bail!("region space exhausted!");
85+
}
86+
self.buffers.lock().insert_buffer(buffer_idx, data);
87+
self.region_offset += 1;
88+
self.next_segment_id_once_active().await
89+
}
90+
91+
pub fn split(self, splits: usize) -> VortexResult<Vec<Self>> {
92+
Ok(self
93+
.buffers
94+
.lock()
95+
.split_region(&self.region, splits)?
96+
.map(|region| Self {
97+
buffers: self.buffers.clone(),
98+
region,
99+
region_offset: 0,
100+
})
101+
.collect())
102+
}
103+
104+
async fn next_segment_id_once_active(&self) -> VortexResult<SegmentId> {
105+
WaitRegionFuture {
106+
buffers: self.buffers.clone(),
107+
region: self.region,
108+
}
109+
.await
110+
}
111+
112+
pub async fn flush<W: VortexWrite>(
113+
&mut self,
114+
writer: &mut futures::io::Cursor<W>,
115+
segment_specs: &mut Vec<SegmentSpec>,
116+
) -> VortexResult<()> {
117+
let completed = {
118+
let mut guard = self.buffers.lock();
119+
guard.completed_buffers()?
120+
};
121+
for buffers in completed.into_values() {
122+
// The API requires us to write these buffers contiguously. Therefore, we can only
123+
// respect the alignment of the first one.
124+
// Don't worry, in most cases the caller knows what they're doing and will align the
125+
// buffers themselves, inserting padding buffers where necessary.
126+
let alignment = buffers
127+
.first()
128+
.map(|buffer| buffer.alignment())
129+
.unwrap_or_else(Alignment::none);
130+
131+
// Add any padding required to align the segment.
132+
let offset = writer.position();
133+
let padding = offset.next_multiple_of(*alignment as u64) - offset;
134+
if padding > 0 {
135+
writer
136+
.write_all(ByteBuffer::zeroed(padding as usize))
137+
.await?;
138+
}
139+
let offset = writer.position();
140+
141+
for buffer in buffers {
142+
writer.write_all(buffer).await?;
143+
}
144+
145+
segment_specs.push(SegmentSpec {
146+
offset,
147+
length: u32::try_from(writer.position() - offset)
148+
.map_err(|_| vortex_err!("segment length exceeds maximum u32"))?,
149+
alignment,
150+
});
151+
}
152+
Ok(())
153+
}
154+
}
155+
156+
impl Drop for InOrderSegmentWriter {
157+
fn drop(&mut self) {
158+
self.buffers.lock().finish_region(&self.region);
159+
}
160+
}
161+
162+
struct WaitRegionFuture {
163+
buffers: Arc<Mutex<OrderedBuffers>>,
164+
region: Region,
165+
}
166+
167+
impl Future for WaitRegionFuture {
168+
type Output = VortexResult<SegmentId>;
169+
170+
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
171+
let mut guard = self.buffers.lock();
172+
let current_first = match guard.first_region() {
173+
Ok(first) => first,
174+
Err(e) => return Poll::Ready(Err(e)),
175+
};
176+
if self.region == current_first {
177+
return Poll::Ready(Ok(guard.next_segment_id()));
178+
}
179+
guard.register_waker(self.region, cx.waker().clone());
180+
Poll::Pending
181+
}
182+
}

vortex-file/src/writer.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,14 @@
1+
use std::sync::Arc;
2+
13
use futures::StreamExt;
24
use vortex_array::ArrayContext;
35
use vortex_array::stats::{PRUNING_STATS, Stat};
46
use vortex_array::stream::ArrayStream;
57
use vortex_error::{VortexExpect, VortexResult, vortex_bail, vortex_err};
68
use vortex_flatbuffers::{FlatBuffer, FlatBufferRoot, WriteFlatBuffer, WriteFlatBufferExt};
7-
use vortex_io::VortexWrite;
9+
use vortex_io::{IoDispatcher, VortexWrite};
810
use vortex_layout::layouts::file_stats::FileStatsLayoutWriter;
11+
use vortex_layout::scan::TaskExecutor;
912
use vortex_layout::{LayoutContext, LayoutStrategy, LayoutWriter};
1013

1114
use crate::footer::{FileStatistics, FooterFlatBufferWriter, Postscript, PostscriptSegment};
@@ -21,6 +24,8 @@ pub struct VortexWriteOptions {
2124
strategy: Box<dyn LayoutStrategy>,
2225
exclude_dtype: bool,
2326
file_statistics: Vec<Stat>,
27+
io_dispatcher: IoDispatcher,
28+
executor: Option<Arc<dyn TaskExecutor>>,
2429
}
2530

2631
impl Default for VortexWriteOptions {
@@ -29,6 +34,8 @@ impl Default for VortexWriteOptions {
2934
strategy: Box::new(VortexLayoutStrategy),
3035
exclude_dtype: false,
3136
file_statistics: PRUNING_STATS.to_vec(),
37+
io_dispatcher: IoDispatcher::shared(),
38+
executor: None,
3239
}
3340
}
3441
}

0 commit comments

Comments
 (0)