Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 70 additions & 28 deletions server/src/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,13 @@ use std::any::Any;
use std::path::PathBuf;
use std::ptr::slice_from_raw_parts;
use std::usize;
use std::marker::PhantomData;

use mork::{OwnedExpr, ExprTrait};
use mork::SpaceReaderZipper;
use mork::{Space, space::serialize_sexpr_into};
use pathmap::zipper::{ZipperIteration, ZipperMoving, ZipperWriting, ZipperReadOnlyConditionalIteration, ZipperReadOnlyConditionalValues, ZipperAbsolutePath};
use pathmap::path_serialization::PathIterator;
use tokio::fs::File;
use tokio::io::{BufWriter, AsyncWriteExt};

Expand Down Expand Up @@ -483,6 +486,67 @@ fn do_export(ctx: &MorkService, (reader, pattern): (ReadPermission, OwnedExpr),
Ok(WorkResult::Immediate(hyper::body::Bytes::from(buffer)))
}

struct RZExprIterator<'w, 's, RZ>
where
RZ: SpaceReaderZipper<'s>,
<RZ as ZipperReadOnlyConditionalValues<'s, ()>>::WitnessT: 'w,
{
zipper: RZ,
witness: <RZ as ZipperReadOnlyConditionalValues<'s, ()>>::WitnessT,
pattern: OwnedExpr,
template: OwnedExpr,
buf: [u8; 4096],
_marker: PhantomData<&'w()>,
}

impl<'w, 's, RZ> RZExprIterator<'w, 's, RZ>
where
RZ: SpaceReaderZipper<'s>,
<RZ as ZipperReadOnlyConditionalValues<'s, ()>>::WitnessT: 'w,
{
pub fn new(
zipper: RZ,
pattern: OwnedExpr,
template: OwnedExpr,
) -> Self {
let witness = zipper.witness();
Self {
zipper, witness,
pattern, template,
buf: [0; 4096],
_marker: PhantomData,
}
}
}

impl<'w, 's, RZ> PathIterator for RZExprIterator<'w, 's, RZ>
where
RZ: SpaceReaderZipper<'s>,
<RZ as ZipperReadOnlyConditionalValues<'s, ()>>::WitnessT: 'w,
{
fn next_path(&mut self) -> Result<Option<&[u8]>, std::io::Error> {
let Self { zipper, witness, buf, pattern, template, .. } = self;
while let Some(()) = zipper.to_next_get_val_with_witness(&witness) {
let path = zipper.origin_path();
let mut oz = ExprZipper::new(Expr{ ptr: buf.as_mut_ptr() });
// println!("dump transforming {:?} with {:?} => {:?}", Expr{ ptr: p.as_ptr() as *mut u8 }, pattern.borrow(), template.borrow());
match (Expr{ ptr: path.as_ptr() as *mut u8 }.transformData(pattern.borrow(), template.borrow(), &mut oz)) {
Ok(()) => {
let length = oz.loc;
drop(oz); // make oz no longer point at the buffer
// println!("success {:?}", Expr{ ptr: (*buf.get()).as_mut_ptr() });
return Ok(Some(&buf[..length]))
}
Err(_e) => {
// println!("failure");
continue
}
}
}
Ok(None)
}
}

fn dump_as_format<W: Write>(ctx: &MorkService, writer: &mut std::io::BufWriter<W>, (mut reader, pattern): (ReadPermission, OwnedExpr), template: OwnedExpr, format: DataFormat) -> Result<(), CommandError> {
match format {
DataFormat::Metta => {
Expand All @@ -501,32 +565,10 @@ fn dump_as_format<W: Write>(ctx: &MorkService, writer: &mut std::io::BufWriter<W
// #[cfg(not(feature="interning"))]
DataFormat::Paths => {
// println!("serializing...");
thread_local!{
static buf: std::cell::UnsafeCell<[u8; 4096]> = std::cell::UnsafeCell::new([0; 4096]);
}
buf.with(|b| {
let mut rz = ctx.0.space.read_zipper(&mut reader);
let wn = rz.witness();
pathmap::path_serialization::for_each_path_serialize(writer, || {
while let Some(()) = rz.to_next_get_val_with_witness(&wn) {
let p = rz.origin_path();
let mut oz = ExprZipper::new(Expr{ ptr: unsafe { (*b.get()).as_mut_ptr() } });
// println!("dump transforming {:?} with {:?} => {:?}", Expr{ ptr: p.as_ptr() as *mut u8 }, pattern.borrow(), template.borrow());
match (Expr{ ptr: p.as_ptr() as *mut u8 }.transformData(pattern.borrow(), template.borrow(), &mut oz)) {
Ok(()) => unsafe {
// println!("success {:?}", Expr{ ptr: (*b.get()).as_mut_ptr() });
return Ok(Some(slice_from_raw_parts((*b.get()).as_ptr(), oz.loc).as_ref().unwrap()))
}
Err(_e) => {
// println!("failure");
continue
}
}
}
Ok(None)
}
)
}).map_err(|e| CommandError::internal(format!("Error occurred writing raw paths: {e:?}")))?;
let rz = ctx.0.space.read_zipper(&mut reader);
pathmap::path_serialization::for_each_path_serialize(
writer, RZExprIterator::new(rz, pattern, template)
).map_err(|e| CommandError::internal(format!("Error occurred writing raw paths: {e:?}")))?;
}
};
Ok(())
Expand Down Expand Up @@ -746,9 +788,9 @@ fn do_parse<SrcStream: Read + BufRead>(space: &ServerSpace, src: SrcStream, patt
let bl = writer.path().len();
let mut wz = space.write_zipper(writer);
thread_local!{
static buf: std::cell::UnsafeCell<[u8; 4096]> = std::cell::UnsafeCell::new([0; 4096]);
static BUF: std::cell::UnsafeCell<[u8; 4096]> = std::cell::UnsafeCell::new([0; 4096]);
}
let pathmap::path_serialization::DeserializationStats { path_count, .. } = buf.with(|b| {
let pathmap::path_serialization::DeserializationStats { path_count, .. } = BUF.with(|b| {
// println!("for each deserialized...");
pathmap::path_serialization::for_each_deserialized_path(src, |k, p| {
let mut oz = ExprZipper::new(Expr{ ptr: unsafe { (*b.get()).as_mut_ptr() } });
Expand Down