Skip to content

concurrent compression #3206

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion bench-vortex/src/compress/vortex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,13 @@ use tokio::runtime::Handle;
use vortex::Array;
use vortex::arrow::IntoArrowArray;
use vortex::error::VortexResult;
use vortex::file::{VortexOpenOptions, VortexWriteOptions};
use vortex::file::{VortexLayoutStrategy, VortexOpenOptions, VortexWriteOptions};
use vortex::stream::ArrayStreamArrayExt;

#[inline(never)]
pub async fn vortex_compress_write(array: &dyn Array, buf: &mut Vec<u8>) -> VortexResult<u64> {
Ok(VortexWriteOptions::default()
.with_strategy(VortexLayoutStrategy::default().with_tokio_executor(Handle::current()))
.write(Cursor::new(buf), array.to_array_stream())
.await?
.position())
Expand Down
2 changes: 1 addition & 1 deletion encodings/dict/src/builders/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ pub const UNCONSTRAINED: DictConstraints = DictConstraints {
max_len: usize::MAX,
};

pub trait DictEncoder: Send {
pub trait DictEncoder: Send + Sync {
fn encode(&mut self, array: &dyn Array) -> VortexResult<ArrayRef>;

fn values(&mut self) -> VortexResult<ArrayRef>;
Expand Down
1 change: 1 addition & 0 deletions vortex-file/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ getrandom_v03 = { workspace = true }
itertools = { workspace = true }
linked_hash_set = { workspace = true }
log = { workspace = true }
parking_lot = { workspace = true }
moka = { workspace = true, features = ["future"] }
object_store = { workspace = true, optional = true }
pin-project-lite = { workspace = true }
Expand Down
2 changes: 1 addition & 1 deletion vortex-file/src/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ impl SegmentSource for InMemorySegmentReader {
async move {
let segment: &SegmentSpec = segment_map
.get(*id as usize)
.ok_or_else(|| vortex_err!("segment not found"))?;
.ok_or_else(|| vortex_err!("segment not found {id}"))?;

let start =
usize::try_from(segment.offset).vortex_expect("segment offset larger than usize");
Expand Down
1 change: 1 addition & 0 deletions vortex-file/src/segments/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
mod cache;
mod ordered;
pub(crate) mod writer;

pub use cache::*;
115 changes: 115 additions & 0 deletions vortex-file/src/segments/ordered.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
use std::collections::{BTreeMap, BTreeSet};
use std::task::Waker;

use vortex_array::aliases::hash_map::HashMap;
use vortex_buffer::ByteBuffer;
use vortex_error::{VortexExpect, VortexResult, vortex_bail, vortex_err};
use vortex_layout::segments::SegmentId;

/// Hierarchical position of a writer's output in the file's overall write order.
///
/// The inner `Vec<usize>` is a path of ordinals; the derived `Ord` compares
/// paths lexicographically, so a subsection sorts after its parent's position
/// and before the parent's next sibling. This ordering is what `OrderedBuffers`
/// uses to flush concurrently produced segments in a deterministic order.
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Hash, Debug)]
pub(super) struct Section(Vec<usize>);

impl Default for Section {
    /// The root section: the single ordinal `0`.
    fn default() -> Self {
        Section(vec![0])
    }
}

impl Section {
    /// Return the child section formed by appending `idx` to this section's path.
    pub fn subsection(&self, idx: usize) -> Self {
        let mut path = Vec::with_capacity(self.0.len() + 1);
        path.extend_from_slice(&self.0);
        path.push(idx);
        Section(path)
    }

    /// Advance this section's final ordinal by one, moving it past any
    /// subsections carved out at the previous ordinal.
    pub fn increment(&mut self) {
        let last = self.0.last_mut().vortex_expect("must have section id");
        *last += 1;
    }

    /// Yield `splits` consecutive subsections, with ordinals beginning at
    /// `starting_from`.
    pub fn split(&self, splits: usize, starting_from: usize) -> impl Iterator<Item = Self> {
        (starting_from..starting_from + splits).map(|ordinal| self.subsection(ordinal))
    }
}

/// Shared state that sequences concurrently produced segment buffers back into
/// file write order.
pub(super) struct OrderedBuffers {
    /// Completed segment buffers keyed by their position in the write order.
    data: BTreeMap<Section, Vec<ByteBuffer>>,
    /// Sections that still have a live writer attached (not yet finished).
    active_sections: BTreeSet<Section>,
    /// Wakers of writers waiting for their section to become the first active one.
    wakers: HashMap<Section, Waker>,
    /// The next segment id to hand out, assigned in write order.
    next_segment_id: SegmentId,
}

impl Default for OrderedBuffers {
fn default() -> Self {
Self {
data: Default::default(),
active_sections: [Section::default()].into(),
wakers: Default::default(),
next_segment_id: Default::default(),
}
}
}

impl OrderedBuffers {
    /// Mark `section` as finished and return every buffer that is now safe to flush.
    ///
    /// If this was the last active section, all remaining buffers are drained and
    /// returned. Otherwise, the waker (if any) registered for the new first active
    /// section is woken so that writer can claim its segment id, and all buffers
    /// ordered strictly before that section are split off and returned.
    // NOTE(review): this function currently always returns `Some`; consider
    // returning the map directly if no caller ever needs a `None` case.
    pub fn finish_section(
        &mut self,
        section: &Section,
    ) -> Option<BTreeMap<Section, Vec<ByteBuffer>>> {
        self.active_sections.remove(section);
        let Ok(first) = self.first_section() else {
            // Last section finished; everything buffered is now complete.
            assert!(self.wakers.is_empty(), "all wakers must have been removed");
            return Some(std::mem::take(&mut self.data));
        };

        if let Some(waker) = self.wakers.remove(&first) {
            // The waker is owned here (just removed from the map), so consume it
            // rather than waking by reference and then dropping it.
            waker.wake();
        }
        // Everything ordered before `first` is complete and may be flushed;
        // everything from `first` onward stays buffered.
        let incomplete = self.data.split_off(&first);
        let complete = std::mem::replace(&mut self.data, incomplete);
        Some(complete)
    }

    /// Retire `section` and activate `splits` subsections whose ordinals start at
    /// `starting_from`, returning the new sections in order.
    ///
    /// Activation is eager: all subsections are inserted into the active set
    /// before this call returns, so the result is correct even if the returned
    /// iterator is only partially consumed (the previous lazy version activated
    /// a subsection only when the iterator was advanced past it).
    ///
    /// # Errors
    /// Fails if `section` is not currently active.
    pub fn split_section(
        &mut self,
        section: &Section,
        splits: usize,
        starting_from: usize,
    ) -> VortexResult<impl Iterator<Item = Section>> {
        // Pass `section` directly: `&section` would be a `&&Section`, which does
        // not satisfy the `Borrow` bound on `BTreeSet::remove`.
        if !self.active_sections.remove(section) {
            vortex_bail!("section not active {:?}", section);
        }
        let sections: Vec<Section> = section.split(splits, starting_from).collect();
        for subsection in &sections {
            self.active_sections.insert(subsection.clone());
        }
        Ok(sections.into_iter())
    }

    /// Add `section` to the active set.
    pub fn add_section(&mut self, section: &Section) {
        self.active_sections.insert(section.clone());
    }

    /// Buffer a completed segment's bytes at position `idx` in the write order.
    pub fn insert_buffer(&mut self, idx: Section, buffer: Vec<ByteBuffer>) {
        self.data.insert(idx, buffer);
    }

    /// Register `waker` to be woken when `section` becomes the first active
    /// section. Any previously registered waker for the same section is replaced.
    // TODO(os): should this store a Vec<Waker> instead of replacing?
    pub fn register_waker(&mut self, section: Section, waker: Waker) {
        self.wakers.insert(section, waker);
    }

    /// The smallest (next-to-flush) active section.
    ///
    /// # Errors
    /// Fails when no sections are active.
    pub fn first_section(&self) -> VortexResult<Section> {
        self.active_sections
            .first()
            .cloned()
            .ok_or_else(|| vortex_err!("no active sections"))
    }

    /// Allocate and return the next [`SegmentId`] (post-increment).
    pub fn next_segment_id(&mut self) -> SegmentId {
        let id = self.next_segment_id;
        self.next_segment_id = SegmentId::from(*id + 1);
        id
    }
}
141 changes: 119 additions & 22 deletions vortex-file/src/segments/writer.rs
Original file line number Diff line number Diff line change
@@ -1,35 +1,132 @@
#![allow(dead_code)]
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use async_trait::async_trait;
use futures::StreamExt;
use futures::channel::mpsc;
use futures::io::Cursor;
use parking_lot::Mutex;
use vortex_buffer::{Alignment, ByteBuffer};
use vortex_error::{VortexResult, vortex_err};
use vortex_io::VortexWrite;
use vortex_layout::segments::{SegmentId, SegmentWriter};
use vortex_layout::segments::{ConcurrentSegmentWriter, SegmentId, SegmentWriter};

use super::ordered::{OrderedBuffers, Section};
use crate::footer::SegmentSpec;

/// A segment writer that holds buffers in memory until they are flushed by a writer.
#[derive(Default)]
pub(crate) struct BufferedSegmentWriter {
/// A Vec byte buffers for segments
segments_buffers: Vec<Vec<ByteBuffer>>,
next_id: SegmentId,
/// A segment writer that buffers data in shared [`OrderedBuffers`] state so
/// that concurrently written segments are assigned ids and flushed in a
/// deterministic file order.
pub struct InOrderSegmentWriter {
    /// Ordering state shared with sibling writers split off from this one.
    buffers: Arc<Mutex<OrderedBuffers>>,
    /// This writer's position in the global write order.
    section: Section,
    /// Ordinal of the next subsection (one per `put`) within `section`.
    subsection_idx: usize,
    /// Channel carrying completed buffers to the flusher side.
    buffers_tx: mpsc::UnboundedSender<Vec<ByteBuffer>>,
}

#[async_trait]
impl SegmentWriter for InOrderSegmentWriter {
    /// Buffer `data` at this writer's next subsection, then wait until this
    /// writer's section is first in line before returning its segment id.
    async fn put(&mut self, data: Vec<ByteBuffer>) -> VortexResult<SegmentId> {
        let slot = self.section.subsection(self.subsection_idx);
        self.subsection_idx += 1;
        {
            // Keep the lock scope tight; it must not be held across the await.
            let mut buffers = self.buffers.lock();
            buffers.insert_buffer(slot, data);
        }
        self.next_segment_id_once_active().await
    }
}

impl ConcurrentSegmentWriter for InOrderSegmentWriter {
fn split_off(&mut self, splits: usize) -> VortexResult<Vec<Box<dyn ConcurrentSegmentWriter>>> {
let mut guard = self.buffers.lock();
let splits = guard
.split_section(&self.section, splits, self.subsection_idx)?
.map(|section| {
Box::new(Self {
buffers: self.buffers.clone(),
buffers_tx: self.buffers_tx.clone(),
section,
subsection_idx: 0,
}) as Box<dyn ConcurrentSegmentWriter>
})
.collect();
self.section.increment();
guard.add_section(&self.section);
Ok(splits)
}
}

impl InOrderSegmentWriter {
pub fn create() -> (Self, SegmentFlusher) {
// TODO(os): make this bounded, slow I/O means we will buffer
// in memory unbounded. Currently tx is used in an impl Drop so
// we can't do a bounded async send.
let (buffers_tx, rx) = mpsc::unbounded();
(
InOrderSegmentWriter {
buffers: Default::default(),
section: Section::default(),
subsection_idx: 0,
buffers_tx,
},
SegmentFlusher {
rx,
segment_specs: Vec::new(),
},
)
}
async fn next_segment_id_once_active(&self) -> VortexResult<SegmentId> {
WaitRegionFuture {
buffers: self.buffers.clone(),
section: self.section.clone(),
}
.await
}
}

impl SegmentWriter for BufferedSegmentWriter {
fn put(&mut self, data: &[ByteBuffer]) -> SegmentId {
self.segments_buffers.push(data.to_vec());
let id = self.next_id;
self.next_id = SegmentId::from(*self.next_id + 1);
id
impl Drop for InOrderSegmentWriter {
    /// Finish this writer's section and forward any buffers that became
    /// flushable as a result to the flusher.
    fn drop(&mut self) {
        let Some(completed) = self.buffers.lock().finish_section(&self.section) else {
            return;
        };
        for buffer in completed.into_values() {
            // If the receiver is gone, the flusher has been dropped and there is
            // nowhere to write the data. Ignore the error instead of panicking:
            // a panic inside `drop` during unwinding would abort the process.
            let _ = self.buffers_tx.unbounded_send(buffer);
        }
    }
}

impl BufferedSegmentWriter {
/// Flush the segments to the provided async writer.
pub async fn flush_async<W: VortexWrite>(
&mut self,
writer: &mut futures::io::Cursor<W>,
segment_specs: &mut Vec<SegmentSpec>,
) -> VortexResult<()> {
for buffers in self.segments_buffers.drain(..) {
/// Future that resolves with the next [`SegmentId`] once `section` becomes the
/// first active section in the shared [`OrderedBuffers`].
struct WaitRegionFuture {
    /// Ordering state shared with the writers.
    buffers: Arc<Mutex<OrderedBuffers>>,
    /// The section this future is waiting on.
    section: Section,
}

impl Future for WaitRegionFuture {
    type Output = VortexResult<SegmentId>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut guard = self.buffers.lock();
        match guard.first_section() {
            // No active sections at all: surface the error to the caller.
            Err(e) => Poll::Ready(Err(e)),
            // Our section is at the front of the order: claim the next id.
            Ok(first) if first == self.section => Poll::Ready(Ok(guard.next_segment_id())),
            // Not our turn yet; park until the front section changes. The waker
            // is registered under the same lock used by `finish_section`, so the
            // wake-up cannot be lost.
            Ok(_) => {
                guard.register_waker(self.section.clone(), cx.waker().clone());
                Poll::Pending
            }
        }
    }
}

/// Receives completed, in-order segment buffers from [`InOrderSegmentWriter`]s
/// and records a [`SegmentSpec`] for each segment as it is written out.
pub struct SegmentFlusher {
    /// Completed buffers, sent by writers as their sections finish.
    rx: mpsc::UnboundedReceiver<Vec<ByteBuffer>>,
    /// Specs (offset, length, alignment) of the segments written so far.
    segment_specs: Vec<SegmentSpec>,
}

impl SegmentFlusher {
pub async fn flush<W: VortexWrite>(
mut self,
mut writer: Cursor<W>,
) -> VortexResult<(Cursor<W>, Vec<SegmentSpec>)> {
while let Some(buffers) = self.rx.next().await {
// The API requires us to write these buffers contiguously. Therefore, we can only
// respect the alignment of the first one.
// Don't worry, in most cases the caller knows what they're doing and will align the
Expand All @@ -53,13 +150,13 @@ impl BufferedSegmentWriter {
writer.write_all(buffer).await?;
}

segment_specs.push(SegmentSpec {
self.segment_specs.push(SegmentSpec {
offset,
length: u32::try_from(writer.position() - offset)
.map_err(|_| vortex_err!("segment length exceeds maximum u32"))?,
alignment,
});
}
Ok(())
Ok((writer, self.segment_specs))
}
}
Loading
Loading