Skip to content

Commit

Permalink
feat: enable read
Browse files Browse the repository at this point in the history
  • Loading branch information
zjregee committed Jul 22, 2024
1 parent 2af316a commit 16ef11d
Show file tree
Hide file tree
Showing 4 changed files with 164 additions and 0 deletions.
34 changes: 34 additions & 0 deletions src/buffer.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::cell::RefCell;
use std::cmp::min;
use std::ptr;

use vm_memory::bitmap::BitmapSlice;
Expand All @@ -7,10 +8,15 @@ use vm_memory::VolatileSlice;
use crate::error::*;

pub trait ReadWriteAtVolatile<B: BitmapSlice> {
fn read_vectored_at_volatile(&self, bufs: &[&VolatileSlice<B>]) -> Result<usize>;
fn write_vectored_at_volatile(&self, bufs: &[&VolatileSlice<B>]) -> Result<usize>;
}

impl<'a, B: BitmapSlice, T: ReadWriteAtVolatile<B> + ?Sized> ReadWriteAtVolatile<B> for &'a T {
fn read_vectored_at_volatile(&self, bufs: &[&VolatileSlice<B>]) -> Result<usize> {
(**self).read_vectored_at_volatile(bufs)
}

fn write_vectored_at_volatile(&self, bufs: &[&VolatileSlice<B>]) -> Result<usize> {
(**self).write_vectored_at_volatile(bufs)
}
Expand All @@ -33,6 +39,34 @@ impl BufferWrapper {
}

impl<B: BitmapSlice> ReadWriteAtVolatile<B> for BufferWrapper {
fn read_vectored_at_volatile(&self, bufs: &[&VolatileSlice<B>]) -> Result<usize> {
let slice_guards: Vec<_> = bufs.iter().map(|s| s.ptr_guard_mut()).collect();
let iovecs: Vec<_> = slice_guards
.iter()
.map(|s| libc::iovec {
iov_base: s.as_ptr() as *mut libc::c_void,
iov_len: s.len() as libc::size_t,
})
.collect();
if iovecs.is_empty() {
return Ok(0);
}
let data = self.buffer.borrow().to_vec();
let mut result = 0;
for (index, iovec) in iovecs.iter().enumerate() {
let num = min(data.len() - result, iovec.iov_len);
if num == 0 {
break;
}
unsafe {
ptr::copy_nonoverlapping(data[result..].as_ptr(), iovec.iov_base as *mut u8, num)
}
bufs[index].bitmap().mark_dirty(0, num);
result += num;
}
Ok(result)
}

fn write_vectored_at_volatile(&self, bufs: &[&VolatileSlice<B>]) -> Result<usize> {
let slice_guards: Vec<_> = bufs.iter().map(|s| s.ptr_guard()).collect();
let iovecs: Vec<_> = slice_guards
Expand Down
59 changes: 59 additions & 0 deletions src/filesystem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ impl Filesystem {
Opcode::Setattr => self.setattr(in_header, r, w),
Opcode::Create => self.create(in_header, r, w),
Opcode::Open => self.open(in_header, r, w),
Opcode::Read => self.read(in_header, r, w),
Opcode::Write => self.write(in_header, r, w),
Opcode::Destroy => self.destory(),
Opcode::Access => self.access(in_header, r, w),
Expand Down Expand Up @@ -215,6 +216,7 @@ impl Filesystem {
if let Some(file_key) = self.get_opened(&path) {
let mut file = self.get_opened_inode(file_key).unwrap();
file.stat.st_ino = file_key as u64;
file.stat.st_size = 8;
let out = EntryOut {
nodeid: file_key as u64,
generation: 0,
Expand All @@ -234,6 +236,7 @@ impl Filesystem {
let file_key = self.insert_opened_inode(file.clone());
self.insert_opened(&path, file_key);
file.stat.st_ino = file_key as u64;
file.stat.st_size = 8;

let out = EntryOut {
nodeid: file_key as u64,
Expand All @@ -256,6 +259,7 @@ impl Filesystem {
Err(_) => return Filesystem::reply_error(in_header.unique, w),
};
stat.st_ino = in_header.nodeid;
stat.st_size = 8;

let out = AttrOut {
attr_valid: Duration::from_secs(5).as_secs(),
Expand Down Expand Up @@ -368,6 +372,55 @@ impl Filesystem {
Filesystem::reply_ok(Some(out), None, in_header.unique, w)
}

fn read(&self, in_header: InHeader, mut r: Reader, mut w: Writer) -> Result<usize> {
debug!("[Filesystem] read: key={}", in_header.nodeid);

let file = self.get_opened_inode(in_header.nodeid as usize);
let path = match file {
Ok(file) => file.path,
Err(_) => return Filesystem::reply_error(in_header.unique, w),
};

let ReadIn { offset, size, .. } = r.read_obj().map_err(|e| {
new_vhost_user_fs_error("failed to decode protocol messages", Some(e.into()))
})?;

debug!(
"[Filesystem] read: key={} offset={} size={}",
in_header.nodeid, offset, size
);

let data = match self
.rt
.block_on(self.do_read(&path))
{
Ok(data) => data,
Err(_) => return Filesystem::reply_error(in_header.unique, w),
};
let len = data.len();
let buffer = BufferWrapper::new(data);

debug!(
"[Filesystem] read: key={} offset={} size={} len={} buffer={:?}",
in_header.nodeid, offset, size, len, buffer.get_buffer()
);

let mut data_writer = w.split_at(size_of::<OutHeader>()).unwrap();
data_writer.write_from_at(&buffer, len).map_err(|e| {
new_vhost_user_fs_error("failed to encode protocol messages", Some(e.into()))
})?;

let out = OutHeader {
len: (size_of::<OutHeader>() + len) as u32,
error: 0,
unique: in_header.unique,
};
w.write_all(out.as_slice()).map_err(|e| {
new_vhost_user_fs_error("failed to encode protocol messages", Some(e.into()))
})?;
Ok(out.len as usize)
}

fn write(&self, in_header: InHeader, mut r: Reader, w: Writer) -> Result<usize> {
debug!("[Filesystem] write: key={}", in_header.nodeid);

Expand Down Expand Up @@ -511,6 +564,12 @@ impl Filesystem {
Ok(())
}

async fn do_read(&self, path: &str) -> Result<Buffer> {
let data = self.core.read(path).await.map_err(opendal_error2error)?;

Ok(data)
}

async fn do_write(&self, path: &str, data: Buffer) -> Result<()> {
self.core
.write(path, data)
Expand Down
15 changes: 15 additions & 0 deletions src/filesystem_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ pub enum Opcode {
Getattr = 3,
Setattr = 4,
Open = 14,
Read = 15,
Write = 16,
Getxattr = 22,
Release = 18,
Expand All @@ -30,6 +31,7 @@ impl TryFrom<u32> for Opcode {
3 => Ok(Opcode::Getattr),
4 => Ok(Opcode::Setattr),
14 => Ok(Opcode::Open),
15 => Ok(Opcode::Read),
16 => Ok(Opcode::Write),
18 => Ok(Opcode::Release),
22 => Ok(Opcode::Getxattr),
Expand Down Expand Up @@ -192,6 +194,18 @@ pub struct WriteOut {
pub padding: u32,
}

#[repr(C)]
#[derive(Debug, Default, Clone, Copy)]
pub struct ReadIn {
pub fh: u64,
pub offset: u64,
pub size: u32,
pub read_flags: u32,
pub lock_owner: u64,
pub flags: u32,
pub padding: u32,
}

unsafe impl ByteValued for InHeader {}
unsafe impl ByteValued for OutHeader {}
unsafe impl ByteValued for InitIn {}
Expand All @@ -202,3 +216,4 @@ unsafe impl ByteValued for CreateIn {}
unsafe impl ByteValued for OpenOut {}
unsafe impl ByteValued for WriteIn {}
unsafe impl ByteValued for WriteOut {}
unsafe impl ByteValued for ReadIn {}
56 changes: 56 additions & 0 deletions src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,48 @@ impl<'a, B: BitmapSlice> DescriptorChainConsumer<'a, B> {
self.bytes_consumed = total_bytes_consumed;
Ok(bytes_consumed)
}

fn split_at(&mut self, offset: usize) -> Result<DescriptorChainConsumer<'a, B>> {
let mut remain = offset;
let pos = self.buffers.iter().position(|vs| {
if remain < vs.len() {
true
} else {
remain -= vs.len();
false
}
});
if let Some(at) = pos {
let mut other = self.buffers.split_off(at);
if remain > 0 {
let front = other.pop_front().expect("empty VecDeque after split");
self.buffers.push_back(
front
.subslice(0, remain)
.map_err(|_| new_vhost_user_fs_error("volatile memory error", None))?,
);
other.push_front(
front
.offset(remain)
.map_err(|_| new_vhost_user_fs_error("volatile memory error", None))?,
);
}
Ok(DescriptorChainConsumer {
buffers: other,
bytes_consumed: 0,
})
} else if remain == 0 {
Ok(DescriptorChainConsumer {
buffers: VecDeque::new(),
bytes_consumed: 0,
})
} else {
Err(new_vhost_user_fs_error(
"DescriptorChain split is out of bounds",
None,
))
}
}
}

pub struct Reader<'a, B = ()> {
Expand Down Expand Up @@ -210,6 +252,20 @@ impl<'a, B: Bitmap + BitmapSlice + 'static> Writer<'a, B> {
pub fn bytes_written(&self) -> usize {
self.buffer.bytes_consumed()
}

pub fn split_at(&mut self, offset: usize) -> Result<Writer<'a, B>> {
self.buffer.split_at(offset).map(|buffer| Writer { buffer })
}

pub fn write_from_at<F: ReadWriteAtVolatile<B>>(
&mut self,
src: F,
count: usize,
) -> io::Result<usize> {
self.buffer
.consume(count, |bufs| src.read_vectored_at_volatile(bufs))
.map_err(|err| err.into())
}
}

impl<'a, B: BitmapSlice> Write for Writer<'a, B> {
Expand Down

0 comments on commit 16ef11d

Please sign in to comment.