Rust bindings for CDF reads #612
Changes from all commits

@@ -0,0 +1 @@
+Cargo.lock

@@ -1,37 +1,44 @@
 use std::sync::Arc;

 use arrow::compute::filter_record_batch;
-use arrow::datatypes::SchemaRef;
+use arrow::datatypes::SchemaRef as ArrowSchemaRef;
 use arrow::error::ArrowError;
 use arrow::pyarrow::PyArrowType;
-use delta_kernel::engine::arrow_data::ArrowEngineData;
+use arrow::record_batch::{RecordBatch, RecordBatchIterator, RecordBatchReader};

 use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor;
 use delta_kernel::engine::default::DefaultEngine;
+use delta_kernel::scan::ScanResult;
+use delta_kernel::table_changes::scan::{
+    TableChangesScan as KernelTableChangesScan,
+    TableChangesScanBuilder as KernelTableChangesScanBuilder,
+};
+use delta_kernel::Error as KernelError;
+use delta_kernel::{engine::arrow_data::ArrowEngineData, schema::StructType};
+use delta_kernel::{DeltaResult, Engine};

 use pyo3::exceptions::PyValueError;
 use pyo3::prelude::*;

 use url::Url;

-use arrow::record_batch::{RecordBatch, RecordBatchIterator, RecordBatchReader};
-use delta_kernel::Engine;
-
 use std::collections::HashMap;

-struct KernelError(delta_kernel::Error);
+struct PyKernelError(KernelError);

-impl From<KernelError> for PyErr {
-    fn from(error: KernelError) -> Self {
+impl From<PyKernelError> for PyErr {
+    fn from(error: PyKernelError) -> Self {
         PyValueError::new_err(format!("Kernel error: {}", error.0))
     }
 }

-impl From<delta_kernel::Error> for KernelError {
-    fn from(delta_kernel_error: delta_kernel::Error) -> Self {
+impl From<KernelError> for PyKernelError {
+    fn from(delta_kernel_error: KernelError) -> Self {
         Self(delta_kernel_error)
     }
 }

-type DeltaPyResult<T> = std::result::Result<T, KernelError>;
+type DeltaPyResult<T> = std::result::Result<T, PyKernelError>;

 #[pyclass]
 struct Table(delta_kernel::Table);
@@ -40,7 +47,7 @@ struct Table(delta_kernel::Table);
 impl Table {
     #[new]
     fn new(location: &str) -> DeltaPyResult<Self> {
-        let location = Url::parse(location).map_err(delta_kernel::Error::InvalidUrl)?;
+        let location = Url::parse(location).map_err(KernelError::InvalidUrl)?;
         let table = delta_kernel::Table::new(location);
         Ok(Table(table))
     }
@@ -73,11 +80,43 @@ impl ScanBuilder {
     }

     fn build(&mut self) -> DeltaPyResult<Scan> {
-        let scan = self.0.take().unwrap().build()?;
+        let scan = self
+            .0
+            .take()
+            .ok_or_else(|| KernelError::generic("Can only call build() once on ScanBuilder"))?
+            .build()?;
         Ok(Scan(scan))
     }
 }

+fn try_get_schema(schema: &Arc<StructType>) -> Result<ArrowSchemaRef, KernelError> {
+    Ok(Arc::new(schema.as_ref().try_into().map_err(|e| {
+        KernelError::Generic(format!("Could not get result schema: {e}"))
+    })?))
+}
+
+fn try_create_record_batch_iter(
+    results: impl Iterator<Item = DeltaResult<ScanResult>>,
+    result_schema: ArrowSchemaRef,
+) -> RecordBatchIterator<impl Iterator<Item = Result<RecordBatch, ArrowError>>> {
+    let record_batches = results.map(|res| {
+        let scan_res = res.and_then(|res| Ok((res.full_mask(), res.raw_data?)));
+        let (mask, data) = scan_res.map_err(|e| ArrowError::from_external_error(Box::new(e)))?;
+        let record_batch: RecordBatch = data
+            .into_any()
+            .downcast::<ArrowEngineData>()
+            .map_err(|_| ArrowError::CastError("Couldn't cast to ArrowEngineData".to_string()))?
+            .into();
+        if let Some(mask) = mask {
+            let filtered_batch = filter_record_batch(&record_batch, &mask.into())?;
+            Ok(filtered_batch)
+        } else {
+            Ok(record_batch)
+        }
+    });
+    RecordBatchIterator::new(record_batches, result_schema)
+}
+
 #[pyclass]
 struct Scan(delta_kernel::scan::Scan);
@@ -87,50 +126,80 @@ impl Scan {
         &self,
         engine_interface: &PythonInterface,
     ) -> DeltaPyResult<PyArrowType<Box<dyn RecordBatchReader + Send>>> {
-        let result_schema: SchemaRef =
-            Arc::new(self.0.schema().as_ref().try_into().map_err(|e| {
-                delta_kernel::Error::Generic(format!("Could not get result schema: {e}"))
-            })?);
-        let results = self.0.execute(engine_interface.0.as_ref())?;
-        let record_batches: Vec<_> = results
-            .map(|res| {
-                let scan_res = res.and_then(|res| Ok((res.full_mask(), res.raw_data?)));
-                let (mask, data) =
-                    scan_res.map_err(|e| ArrowError::from_external_error(Box::new(e)))?;
-                let record_batch: RecordBatch = data
-                    .into_any()
-                    .downcast::<ArrowEngineData>()
-                    .map_err(|_| {
-                        ArrowError::CastError("Couldn't cast to ArrowEngineData".to_string())
-                    })?
-                    .into();
-                if let Some(mask) = mask {
-                    let filtered_batch = filter_record_batch(&record_batch, &mask.into())?;
-                    Ok(filtered_batch)
-                } else {
-                    Ok(record_batch)
-                }
-            })
-            .collect();
-        let record_batch_iter = RecordBatchIterator::new(record_batches, result_schema);
+        let result_schema: ArrowSchemaRef = try_get_schema(self.0.schema())?;
+        let results = self.0.execute(engine_interface.0.clone())?;
+        let record_batch_iter = try_create_record_batch_iter(results, result_schema);
         Ok(PyArrowType(Box::new(record_batch_iter)))
     }
 }

+#[pyclass]
+struct TableChangesScanBuilder(Option<KernelTableChangesScanBuilder>);
+
+#[pymethods]
+impl TableChangesScanBuilder {
+    #[new]
+    #[pyo3(signature = (table, engine_interface, start_version, end_version=None))]
+    fn new(
+        table: &Table,
+        engine_interface: &PythonInterface,
+        start_version: u64,
+        end_version: Option<u64>,
+    ) -> DeltaPyResult<TableChangesScanBuilder> {
+        let table_changes = table
+            .0
+            .table_changes(engine_interface.0.as_ref(), start_version, end_version)?;
+        Ok(TableChangesScanBuilder(Some(
+            table_changes.into_scan_builder(),
+        )))
+    }
+
+    fn build(&mut self) -> DeltaPyResult<TableChangesScan> {
+        let scan = self
+            .0
+            .take()
+            .ok_or_else(|| {
+                KernelError::generic("Can only call build() once on TableChangesScanBuilder")
+            })?
+            .build()?;
+        let schema: ArrowSchemaRef = try_get_schema(scan.schema())?;
+        Ok(TableChangesScan { scan, schema })
+    }
+}
+
+#[pyclass]
+struct TableChangesScan {
+    scan: KernelTableChangesScan,
+    schema: ArrowSchemaRef,
+}
+
+#[pymethods]
+impl TableChangesScan {
+    fn execute(
+        &self,
+        engine_interface: &PythonInterface,
+    ) -> DeltaPyResult<PyArrowType<Box<dyn RecordBatchReader + Send>>> {
+        let result_schema = self.schema.clone();
+        let results = self.scan.execute(engine_interface.0.clone())?;
+        let record_batch_iter = try_create_record_batch_iter(results, result_schema);
+        Ok(PyArrowType(Box::new(record_batch_iter)))
+    }
+}
+
 #[pyclass]
-struct PythonInterface(Box<dyn Engine + Send>);
+struct PythonInterface(Arc<dyn Engine + Send>);

Review comment: What was the motivation to change from Box to Arc?

Reply: It is related to this kernel change. Now we have to pass an Arc<dyn Engine> into execute().
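
A minimal sketch of the ownership change that reply describes, using simplified stand-in names (the empty Engine trait, NoopEngine, and the two execute_* functions below are illustrative, not the kernel's actual API): when execute borrows the engine, a Box-backed field is enough, but once it takes shared ownership the wrapper has to hold an Arc it can cheaply clone on every call.

```rust
use std::sync::Arc;

// Stand-in for delta_kernel's Engine trait (the real one has methods).
trait Engine {}

struct NoopEngine;
impl Engine for NoopEngine {}

// Old shape: execute borrows the engine, so a Box<dyn Engine + Send> field
// could simply hand out `&dyn Engine` via `.as_ref()`.
fn execute_by_ref(_engine: &dyn Engine) {}

// New shape: execute takes shared ownership, so the wrapper stores an
// Arc<dyn Engine + Send> and clones it (a cheap refcount bump) per call.
fn execute_shared(_engine: Arc<dyn Engine + Send>) {}

struct PythonInterfaceSketch(Arc<dyn Engine + Send>);

impl PythonInterfaceSketch {
    fn run_scan(&self) {
        // A Box-backed field could not do this without giving the engine away.
        execute_shared(self.0.clone());
    }
}

fn main() {
    let iface = PythonInterfaceSketch(Arc::new(NoopEngine));
    iface.run_scan();
    execute_by_ref(&NoopEngine); // the old borrowed style, for contrast
}
```

In the diff above, this is why PythonInterface now holds Arc<dyn Engine + Send> and the execute call sites pass engine_interface.0.clone().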

 #[pymethods]
 impl PythonInterface {
     #[new]
     fn new(location: &str) -> DeltaPyResult<Self> {
-        let url = Url::parse(location).map_err(delta_kernel::Error::InvalidUrl)?;
+        let url = Url::parse(location).map_err(KernelError::InvalidUrl)?;
         let client = DefaultEngine::try_new(
             &url,
             HashMap::<String, String>::new(),
             Arc::new(TokioBackgroundExecutor::new()),
         )?;
-        Ok(PythonInterface(Box::new(client)))
+        Ok(PythonInterface(Arc::new(client)))
     }
 }
@@ -144,5 +213,7 @@ fn delta_kernel_rust_sharing_wrapper(m: &Bound<'_, PyModule>) -> PyResult<()> {
     m.add_class::<Snapshot>()?;
     m.add_class::<ScanBuilder>()?;
     m.add_class::<Scan>()?;
+    m.add_class::<TableChangesScanBuilder>()?;
+    m.add_class::<TableChangesScan>()?;
     Ok(())
 }

Review comment (on the Cargo.lock addition): What's this for?

Reply: When you run cargo commands or maturin develop, a Cargo.lock file may be generated. Adding it to the gitignore makes sure you don't accidentally add it to git.

Reply: For further context: generally for libraries (like this one) you don't want to commit Cargo.lock, whereas for binaries you normally do.
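
A hypothetical minimal .gitignore along those lines for a library crate; only the Cargo.lock entry comes from this PR, and the /target line is the conventional build-artifact ignore assumed here for completeness.

```gitignore
# Build output from cargo / maturin develop (assumed, not part of this PR)
/target
# Lockfile regenerated locally; library crates usually don't commit it
Cargo.lock
```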