Skip to content

Commit cf4ae1d

Browse files
committed
fix flush
1 parent 2cf24d5 commit cf4ae1d

File tree

2 files changed

+23
-13
lines changed

2 files changed

+23
-13
lines changed

vortex-file/src/segments/ordered.rs

+10-8
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ impl OrderedBuffers {
5757
pub fn finish_region(&mut self, region: &Region) {
5858
self.active_regions.remove(&region);
5959
if let Ok(first) = self.first_region() {
60-
if let Some(waker) = self.wakers.get(&first) {
60+
if let Some(waker) = self.wakers.remove(&first) {
6161
waker.wake_by_ref();
6262
}
6363
}
@@ -93,13 +93,15 @@ impl OrderedBuffers {
9393
.ok_or_else(|| vortex_err!("no active regions"))
9494
}
9595

96-
pub fn completed_buffers(&mut self) -> VortexResult<BTreeMap<usize, Vec<ByteBuffer>>> {
97-
let completed_until = self.first_region()?;
98-
let mut completed = std::mem::take(&mut self.data);
99-
// truncate completed to include only completed regions
100-
let in_progress = completed.split_off(&completed_until.start);
101-
self.data = in_progress;
102-
Ok(completed)
96+
pub fn take_buffers(&mut self) -> VortexResult<BTreeMap<usize, Vec<ByteBuffer>>> {
97+
if self.active_regions.len() > 1 {
98+
vortex_bail!("there are more than one active writers");
99+
}
100+
if !self.wakers.is_empty() {
101+
vortex_bail!("there is an inflight write");
102+
}
103+
self.active_regions = [Region::default()].into();
104+
Ok(std::mem::take(&mut self.data))
103105
}
104106

105107
pub fn next_segment_id(&mut self) -> SegmentId {

vortex-file/src/segments/writer.rs

+13-5
Original file line numberDiff line numberDiff line change
@@ -35,11 +35,17 @@ impl SegmentWriter for InOrderSegmentWriter {
3535
}
3636

3737
impl InOrderSegmentWriter {
38+
// TODO: split to get &self, should modify self.region to come after all the splits
3839
pub fn split(self, splits: usize) -> VortexResult<Vec<Self>> {
40+
let unwritten_region = Region {
41+
start: self.region.start + self.region_offset,
42+
end: self.region.end,
43+
};
44+
3945
Ok(self
4046
.buffers
4147
.lock()
42-
.split_region(&self.region, splits)?
48+
.split_region(&unwritten_region, splits)?
4349
.map(|region| Self {
4450
buffers: self.buffers.clone(),
4551
region,
@@ -61,10 +67,12 @@ impl InOrderSegmentWriter {
6167
writer: &mut futures::io::Cursor<W>,
6268
segment_specs: &mut Vec<SegmentSpec>,
6369
) -> VortexResult<()> {
64-
let completed = {
65-
let mut guard = self.buffers.lock();
66-
guard.completed_buffers()?
67-
};
70+
let completed = self.buffers.lock().take_buffers()?;
71+
// we are the only writer if here, reclaim the entire region
72+
self.region = Region::default();
73+
self.region_offset = 0;
74+
75+
// TODO(os): spawn everything below
6876
for buffers in completed.into_values() {
6977
// The API requires us to write these buffers contiguously. Therefore, we can only
7078
// respect the alignment of the first one.

0 commit comments

Comments
 (0)