diff --git a/gix-odb/src/cache.rs b/gix-odb/src/cache.rs
index 77022a0966..ebe1bb9785 100644
--- a/gix-odb/src/cache.rs
+++ b/gix-odb/src/cache.rs
@@ -91,6 +91,22 @@ impl<S> Cache<S> {
     }
 }
 
+impl<S> Cache<crate::store::Handle<S>>
+where
+    S: Deref<Target = crate::store::Store> + Clone,
+{
+    /// Find an object and return its decoded bytes as a stream.
+    pub fn try_find_stream(
+        &self,
+        id: &gix_hash::oid,
+    ) -> Result<Option<(crate::find::Stream, Option<gix_pack::data::entry::Location>)>, gix_object::find::Error> {
+        match self.pack_cache.as_ref().map(RefCell::borrow_mut) {
+            Some(mut pack_cache) => self.inner.try_find_stream(id, pack_cache.deref_mut()),
+            None => self.inner.try_find_stream(id, &mut gix_pack::cache::Never),
+        }
+    }
+}
+
 impl<S> From<S> for Cache<S>
 where
     S: gix_pack::Find,
diff --git a/gix-odb/src/find.rs b/gix-odb/src/find.rs
index 1ee1fa8edd..14bae35d31 100644
--- a/gix-odb/src/find.rs
+++ b/gix-odb/src/find.rs
@@ -1,3 +1,73 @@
+use std::io::{self, Cursor, Read};
+
+/// A streaming view over an object's decoded bytes.
+pub struct Stream {
+    kind: gix_object::Kind,
+    size: u64,
+    inner: StreamInner,
+}
+
+enum StreamInner {
+    InMemory(Cursor<Vec<u8>>),
+    File(std::fs::File),
+    Loose(crate::store_impls::loose::find::StreamReader),
+}
+
+impl Stream {
+    /// Return the kind of the object yielded by this stream.
+    pub fn kind(&self) -> gix_object::Kind {
+        self.kind
+    }
+
+    /// Return the decoded object size in bytes.
+    pub fn size(&self) -> u64 {
+        self.size
+    }
+
+    /// Return an empty blob stream.
+    pub fn empty_blob() -> Self {
+        Self::from_bytes(gix_object::Kind::Blob, Vec::new())
+    }
+
+    pub(crate) fn from_bytes(kind: gix_object::Kind, data: Vec<u8>) -> Self {
+        Self {
+            kind,
+            size: data.len() as u64,
+            inner: StreamInner::InMemory(Cursor::new(data)),
+        }
+    }
+
+    pub(crate) fn from_file(kind: gix_object::Kind, size: u64, file: std::fs::File) -> Self {
+        Self {
+            kind,
+            size,
+            inner: StreamInner::File(file),
+        }
+    }
+
+    pub(crate) fn from_loose(
+        kind: gix_object::Kind,
+        size: u64,
+        reader: crate::store_impls::loose::find::StreamReader,
+    ) -> Self {
+        Self {
+            kind,
+            size,
+            inner: StreamInner::Loose(reader),
+        }
+    }
+}
+
+impl Read for Stream {
+    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
+        match &mut self.inner {
+            StreamInner::InMemory(cursor) => cursor.read(buf),
+            StreamInner::File(file) => file.read(buf),
+            StreamInner::Loose(reader) => reader.read(buf),
+        }
+    }
+}
+
 /// An object header informing about object properties, without it being fully decoded in the process.
 #[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
 pub enum Header {
diff --git a/gix-odb/src/memory.rs b/gix-odb/src/memory.rs
index 6862437f1f..31a6e33c24 100644
--- a/gix-odb/src/memory.rs
+++ b/gix-odb/src/memory.rs
@@ -123,6 +123,40 @@ impl<T> Proxy<T> {
     }
 }
 
+impl<S> Proxy<crate::Cache<crate::store::Handle<S>>>
+where
+    S: Deref<Target = crate::store::Store> + Clone,
+{
+    /// Find an object and return its decoded bytes as a stream.
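+    ///
+    /// A minimal usage sketch (`ignore`d as it is not compiled; `proxy` and `id` are assumed
+    /// to be a value of this type and the id of an existing object, respectively):
+    ///
+    /// ```ignore
+    /// use std::io::Read;
+    ///
+    /// if let Some((mut stream, _location)) = proxy.try_find_stream(&id)? {
+    ///     // `Stream` implements `std::io::Read`, so decoded bytes arrive incrementally.
+    ///     let mut buf = Vec::new();
+    ///     stream.read_to_end(&mut buf)?;
+    ///     assert_eq!(buf.len() as u64, stream.size());
+    /// }
+    /// ```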
+    pub fn try_find_stream(
+        &self,
+        id: &gix_hash::oid,
+    ) -> Result<Option<(crate::find::Stream, Option<gix_pack::data::entry::Location>)>, gix_object::find::Error>
+    {
+        if let Some(map) = self.memory.as_ref() {
+            let map = map.borrow();
+            if let Some((kind, data)) = map.get(id) {
+                return Ok(Some((crate::find::Stream::from_bytes(*kind, data.clone()), None)));
+            }
+        }
+        self.inner.try_find_stream(id)
+    }
+}
+
 impl<T> Clone for Proxy<T>
 where
     T: Clone,
diff --git a/gix-odb/src/store_impls/dynamic/find.rs b/gix-odb/src/store_impls/dynamic/find.rs
index 13608b7c87..78de22491e 100644
--- a/gix-odb/src/store_impls/dynamic/find.rs
+++ b/gix-odb/src/store_impls/dynamic/find.rs
@@ -85,7 +85,7 @@ impl<S> super::Handle<S>
 where
     S: Deref<Target = super::Store> + Clone,
 {
-    fn try_find_cached_inner<'a, 'b>(
+    pub(crate) fn try_find_cached_inner<'a, 'b>(
         &'b self,
         mut id: &'b gix_hash::oid,
         buffer: &'a mut Vec<u8>,
diff --git a/gix-odb/src/store_impls/dynamic/mod.rs b/gix-odb/src/store_impls/dynamic/mod.rs
index 7ea462b24a..a7e6fc5140 100644
--- a/gix-odb/src/store_impls/dynamic/mod.rs
+++ b/gix-odb/src/store_impls/dynamic/mod.rs
@@ -57,6 +57,7 @@ pub mod find;
 pub mod prefix;
 
 mod header;
+mod stream;
 
 ///
 pub mod iter;
diff --git a/gix-odb/src/store_impls/dynamic/stream.rs b/gix-odb/src/store_impls/dynamic/stream.rs
new file mode 100644
index 0000000000..43e1376134
--- /dev/null
+++ b/gix-odb/src/store_impls/dynamic/stream.rs
@@ -0,0 +1,240 @@
+use std::{io::Seek, ops::Deref};
+
+use gix_features::zlib;
+use gix_hash::oid;
+use gix_pack::cache::DecodeEntry;
+
+use super::find::Error;
+use crate::store::{find::error::DeltaBaseRecursion, handle, load_index};
+
+impl<S> super::Handle<S>
+where
+    S: Deref<Target = super::Store> + Clone,
+{
+    pub(crate) fn try_find_stream_inner<'b>(
+        &'b self,
+        mut id: &'b gix_hash::oid,
+        inflate: &mut zlib::Inflate,
+        pack_cache: &mut dyn DecodeEntry,
+        snapshot: &mut load_index::Snapshot,
+        recursion: Option<DeltaBaseRecursion<'_>>,
+    ) -> Result<Option<(crate::find::Stream, Option<gix_pack::data::entry::Location>)>, Error> {
+        if let Some(r) = recursion {
+            if r.depth >= self.max_recursion_depth {
+                return Err(Error::DeltaBaseRecursionLimit {
+                    max_depth: self.max_recursion_depth,
+                    id: r.original_id.to_owned(),
+                });
+            }
+        } else if !self.ignore_replacements {
+            if let Ok(pos) = self
+                .store
+                .replacements
+                .binary_search_by(|(map_this, _)| map_this.as_ref().cmp(id))
+            {
+                id = self.store.replacements[pos].1.as_ref();
+            }
+        }
+
+        'outer: loop {
+            {
+                let marker = snapshot.marker;
+                for (idx, index) in snapshot.indices.iter_mut().enumerate() {
+                    if let Some(handle::index_lookup::Outcome {
+                        object_index: handle::IndexForObjectInPack { pack_id, pack_offset },
+                        index_file,
+                        pack: possibly_pack,
+                    }) = index.lookup(id)
+                    {
+                        let pack = match possibly_pack {
+                            Some(pack) => pack,
+                            None => match self.store.load_pack(pack_id, marker)? {
+                                Some(pack) => {
+                                    *possibly_pack = Some(pack);
+                                    possibly_pack.as_deref().expect("just put it in")
+                                }
+                                None => match self.store.load_one_index(self.refresh, snapshot.marker)? {
+                                    Some(new_snapshot) => {
+                                        *snapshot = new_snapshot;
+                                        self.clear_cache();
+                                        continue 'outer;
+                                    }
+                                    None => return Ok(None),
+                                },
+                            },
+                        };
+                        let resolved_pack_id = pack.id;
+                        let entry = pack.entry(pack_offset)?;
+                        let header_size = entry.header_size();
+                        let result = {
+                            let mut scratch = Vec::new();
+                            let mut temp = tempfile::tempfile()?;
+                            let result = match pack.decode_entry_to_write(
+                                entry,
+                                &mut scratch,
+                                inflate,
+                                &mut temp,
+                                &|id, _out| {
+                                    index_file.pack_offset_by_id(id).and_then(|pack_offset| {
+                                        pack.entry(pack_offset)
+                                            .ok()
+                                            .map(gix_pack::data::decode::entry::ResolvedBase::InPack)
+                                    })
+                                },
+                                pack_cache,
+                            ) {
+                                Ok(outcome) => Ok((outcome, temp)),
+                                Err(gix_pack::data::decode::Error::DeltaBaseUnresolved(base_id)) => {
+                                    let mut buf = Vec::new();
+                                    let obj_kind = self
+                                        .try_find_cached_inner(
+                                            &base_id,
+                                            &mut buf,
+                                            inflate,
+                                            pack_cache,
+                                            snapshot,
+                                            recursion
+                                                .map(DeltaBaseRecursion::inc_depth)
+                                                .or_else(|| DeltaBaseRecursion::new(id).into()),
+                                        )
+                                        .map_err(|err| Error::DeltaBaseLookup {
+                                            err: Box::new(err),
+                                            base_id,
+                                            id: id.to_owned(),
+                                        })?
+                                        .ok_or_else(|| Error::DeltaBaseMissing {
+                                            base_id,
+                                            id: id.to_owned(),
+                                        })?
+                                        .0
+                                        .kind;
+                                    let handle::index_lookup::Outcome {
+                                        object_index:
+                                            handle::IndexForObjectInPack {
+                                                pack_id: _,
+                                                pack_offset,
+                                            },
+                                        index_file,
+                                        pack: possibly_pack,
+                                    } = match snapshot.indices[idx].lookup(id) {
+                                        Some(res) => res,
+                                        None => {
+                                            let mut out = None;
+                                            for index in &mut snapshot.indices {
+                                                out = index.lookup(id);
+                                                if out.is_some() {
+                                                    break;
+                                                }
+                                            }
+
+                                            out.unwrap_or_else(|| {
+                                                panic!(
+                                                    "could not find object {id} in any index after looking up one of its base objects {base_id}"
+                                                )
+                                            })
+                                        }
+                                    };
+                                    let pack = possibly_pack
+                                        .as_ref()
+                                        .expect("pack to still be available like just now");
+                                    let entry = pack.entry(pack_offset)?;
+                                    let mut scratch = Vec::new();
+                                    let mut temp = tempfile::tempfile()?;
+                                    pack.decode_entry_to_write(
+                                        entry,
+                                        &mut scratch,
+                                        inflate,
+                                        &mut temp,
+                                        &|id, out| {
+                                            index_file
+                                                .pack_offset_by_id(id)
+                                                .and_then(|pack_offset| {
+                                                    pack.entry(pack_offset)
+                                                        .ok()
+                                                        .map(gix_pack::data::decode::entry::ResolvedBase::InPack)
+                                                })
+                                                .or_else(|| {
+                                                    (id == base_id).then(|| {
+                                                        out.resize(buf.len(), 0);
+                                                        out.copy_from_slice(buf.as_slice());
+                                                        gix_pack::data::decode::entry::ResolvedBase::OutOfPack {
+                                                            kind: obj_kind,
+                                                            end: out.len(),
+                                                        }
+                                                    })
+                                                })
+                                        },
+                                        pack_cache,
+                                    )
+                                    .map(|outcome| (outcome, temp))
+                                }
+                                Err(err) => Err(err),
+                            }?;
+                            result
+                        };
+                        let (outcome, mut temp) = result;
+                        temp.rewind()?;
+                        let res = (
+                            crate::find::Stream::from_file(outcome.kind, outcome.object_size, temp),
+                            Some(gix_pack::data::entry::Location {
+                                pack_id: resolved_pack_id,
+                                pack_offset,
+                                entry_size: outcome.compressed_size + header_size,
+                            }),
+                        );
+
+                        if idx != 0 {
+                            snapshot.indices.swap(0, idx);
+                        }
+                        return Ok(Some(res));
+                    }
+                }
+            }
+
+            for lodb in snapshot.loose_dbs.iter() {
+                if lodb.contains(id) {
+                    return lodb
+                        .try_find_stream(id)
+                        .map(|obj| obj.map(|obj| (obj, None)))
+                        .map_err(Into::into);
+                }
+            }
+
+            match self.store.load_one_index(self.refresh, snapshot.marker)? {
+                Some(new_snapshot) => {
+                    *snapshot = new_snapshot;
+                    self.clear_cache();
+                }
+                None => return Ok(None),
+            }
+        }
+    }
+}
+
+impl<S> super::Handle<S>
+where
+    S: Deref<Target = super::Store> + Clone,
+{
+    /// Try to find the object identified by `id` in any backing store and return it as a readable stream,
+    /// along with its pack location if it came from a pack.
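+    ///
+    /// A sketch of the intended call shape (`ignore`d as it is not compiled; `handle` and `id`
+    /// are assumed to be a store handle and the id of an existing object):
+    ///
+    /// ```ignore
+    /// if let Some((mut stream, location)) = handle.try_find_stream(&id, &mut gix_pack::cache::Never)? {
+    ///     // `location` is `Some(_)` only if the object was found in a pack.
+    ///     std::io::copy(&mut stream, &mut std::io::sink())?;
+    /// }
+    /// ```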
+    pub fn try_find_stream(
+        &self,
+        id: &oid,
+        pack_cache: &mut dyn DecodeEntry,
+    ) -> Result<Option<(crate::find::Stream, Option<gix_pack::data::entry::Location>)>, gix_object::find::Error> {
+        let mut snapshot = self.snapshot.borrow_mut();
+        let mut inflate = self.inflate.borrow_mut();
+        self.try_find_stream_inner(id, &mut inflate, pack_cache, &mut snapshot, None)
+            .map_err(|err| Box::new(err) as _)
+    }
+}
diff --git a/gix-odb/src/store_impls/loose/find.rs b/gix-odb/src/store_impls/loose/find.rs
index 6ac6d26de6..0dc0fd2e10 100644
--- a/gix-odb/src/store_impls/loose/find.rs
+++ b/gix-odb/src/store_impls/loose/find.rs
@@ -1,4 +1,10 @@
-use std::{cmp::Ordering, collections::HashSet, fs, io::Read, path::PathBuf};
+use std::{
+    cmp::Ordering,
+    collections::HashSet,
+    fs,
+    io::{self, Read},
+    path::PathBuf,
+};
 
 use gix_features::zlib;
 
@@ -27,6 +33,275 @@ pub enum Error {
     },
 }
 
+pub(crate) struct StreamReader {
+    file: fs::File,
+    inflate: zlib::Decompress,
+    compressed: [u8; 8192],
+    compressed_pos: usize,
+    compressed_len: usize,
+    pending: Vec<u8>,
+    pending_pos: usize,
+    remaining: u64,
+    done: bool,
+}
+
+impl StreamReader {
+    fn new(file: fs::File) -> Self {
+        Self {
+            file,
+            inflate: zlib::Decompress::default(),
+            compressed: [0; 8192],
+            compressed_pos: 0,
+            compressed_len: 0,
+            pending: Vec::new(),
+            pending_pos: 0,
+            remaining: 0,
+            done: false,
+        }
+    }
+
+    fn finish_stream(&mut self) -> io::Result<()> {
+        if self.done {
+            return Ok(());
+        }
+
+        let mut scratch = [0u8; 256];
+        loop {
+            if self.compressed_pos == self.compressed_len {
+                self.compressed_len = self.file.read(&mut self.compressed)?;
+                self.compressed_pos = 0;
+            }
+
+            let input = &self.compressed[self.compressed_pos..self.compressed_len];
+            let eof = input.is_empty();
+            let before_in = self.inflate.total_in();
+            let before_out = self.inflate.total_out();
+            let flush = if eof {
+                zlib::FlushDecompress::Finish
+            } else {
+                zlib::FlushDecompress::None
+            };
+            let status = self
+                .inflate
+                .decompress(input, &mut scratch, flush)
+                .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "corrupt deflate stream"))?;
+            let consumed = (self.inflate.total_in() - before_in) as usize;
+            let produced = (self.inflate.total_out() - before_out) as usize;
+            self.compressed_pos += consumed;
+
+            if produced != 0 {
+                return Err(io::Error::new(
+                    io::ErrorKind::InvalidData,
+                    "loose object stream exceeded the declared size",
+                ));
+            }
+
+            match status {
+                zlib::Status::StreamEnd => {
+                    self.done = true;
+                    return Ok(());
+                }
+                zlib::Status::Ok | zlib::Status::BufError if eof => {
+                    return Err(io::Error::new(
+                        io::ErrorKind::UnexpectedEof,
+                        "truncated loose object stream",
+                    ));
+                }
+                zlib::Status::Ok | zlib::Status::BufError if consumed != 0 => {}
+                zlib::Status::Ok | zlib::Status::BufError => {
+                    return Err(io::Error::new(
+                        io::ErrorKind::InvalidData,
+                        "deflate stream made no progress",
+                    ));
+                }
+            }
+        }
+    }
+
+    fn read_header(mut self, path: &std::path::Path) -> Result<(gix_object::Kind, u64, Self), Error> {
+        let mut decompressed = [0u8; HEADER_MAX_SIZE + 8192];
+        let mut produced_total = 0usize;
+        loop {
+            if self.compressed_pos == self.compressed_len {
+                self.compressed_len = self.file.read(&mut self.compressed).map_err(|source| Error::Io {
+                    source,
+                    action: "read",
+                    path: path.to_owned(),
+                })?;
+                self.compressed_pos = 0;
+            }
+
+            let input = &self.compressed[self.compressed_pos..self.compressed_len];
+            let eof = input.is_empty();
+            let before_in = self.inflate.total_in();
+            let before_out = self.inflate.total_out();
+            let flush = if eof {
+                zlib::FlushDecompress::Finish
+            } else {
+                zlib::FlushDecompress::None
+            };
+            let status = self
+                .inflate
+                .decompress(input, &mut decompressed[produced_total..], flush)
+                .map_err(|source| Error::DecompressFile {
+                    source: source.into(),
+                    path: path.to_owned(),
+                })?;
+            let consumed = (self.inflate.total_in() - before_in) as usize;
+            let produced = (self.inflate.total_out() - before_out) as usize;
+            self.compressed_pos += consumed;
+            produced_total += produced;
+
+            if let Ok((kind, size, header_size)) = gix_object::decode::loose_header(&decompressed[..produced_total]) {
+                let pending_len = produced_total.saturating_sub(header_size);
+                if pending_len as u64 > size {
+                    return Err(Error::SizeMismatch {
+                        expected: size,
+                        actual: pending_len as u64,
+                        path: path.to_owned(),
+                    });
+                }
+                self.pending = decompressed[header_size..produced_total].to_vec();
+                self.remaining = size - pending_len as u64;
+                self.done = self.remaining == 0 && matches!(status, zlib::Status::StreamEnd);
+                return Ok((kind, size, self));
+            }
+
+            if produced_total == decompressed.len() {
+                return Err(Error::Decode(
+                    gix_object::decode::LooseHeaderDecodeError::InvalidHeader {
+                        message: "loose object header exceeded the supported maximum size",
+                    },
+                ));
+            }
+
+            match status {
+                zlib::Status::StreamEnd => {
+                    return Err(Error::Decode(
+                        gix_object::decode::LooseHeaderDecodeError::InvalidHeader {
+                            message: "loose object header terminated before it could be parsed",
+                        },
+                    ))
+                }
+                zlib::Status::Ok | zlib::Status::BufError if eof => {
+                    return Err(Error::Decode(
+                        gix_object::decode::LooseHeaderDecodeError::InvalidHeader {
+                            message: "loose object header terminated before it could be parsed",
+                        },
+                    ));
+                }
+                zlib::Status::Ok | zlib::Status::BufError if consumed != 0 || produced != 0 => {}
+                zlib::Status::Ok | zlib::Status::BufError => {
+                    return Err(Error::DecompressFile {
+                        source: zlib::inflate::Error::Status(zlib::Status::BufError),
+                        path: path.to_owned(),
+                    });
+                }
+            }
+        }
+    }
+}
+
+impl io::Read for StreamReader {
+    fn read(&mut self, mut out: &mut [u8]) -> io::Result<usize> {
+        let mut total_written = 0usize;
+
+        if out.is_empty() {
+            return Ok(0);
+        }
+
+        if self.pending_pos < self.pending.len() {
+            let pending = &self.pending[self.pending_pos..];
+            let count = pending.len().min(out.len());
+            out[..count].copy_from_slice(&pending[..count]);
+            self.pending_pos += count;
+            total_written += count;
+            out = &mut out[count..];
+            if self.pending_pos == self.pending.len() {
+                self.pending.clear();
+                self.pending_pos = 0;
+            }
+            if self.remaining == 0 {
+                self.finish_stream()?;
+                return Ok(total_written);
+            }
+            if out.is_empty() {
+                return Ok(total_written);
+            }
+        }
+
+        if self.remaining == 0 {
+            self.finish_stream()?;
+            return Ok(total_written);
+        }
+
+        loop {
+            if self.done {
+                return Ok(total_written);
+            }
+            if self.compressed_pos == self.compressed_len {
+                self.compressed_len = self.file.read(&mut self.compressed)?;
+                self.compressed_pos = 0;
+            }
+
+            let input = &self.compressed[self.compressed_pos..self.compressed_len];
+            let eof = input.is_empty();
+            let before_in = self.inflate.total_in();
+            let before_out = self.inflate.total_out();
+            let flush = if eof {
+                zlib::FlushDecompress::Finish
+            } else {
+                zlib::FlushDecompress::None
+            };
+            let out_len = out.len().min(usize::try_from(self.remaining).unwrap_or(usize::MAX));
+            let status = self
+                .inflate
+                .decompress(input, &mut out[..out_len], flush)
+                .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "corrupt deflate stream"))?;
+            let consumed = (self.inflate.total_in() - before_in) as usize;
+            let produced = (self.inflate.total_out() - before_out) as usize;
+            self.compressed_pos += consumed;
+            total_written += produced;
+            self.remaining = self
+                .remaining
+                .checked_sub(produced as u64)
+                .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "loose object exceeded the declared size"))?;
+
+            match status {
+                zlib::Status::StreamEnd => {
+                    if self.remaining != 0 {
+                        return Err(io::Error::new(
+                            io::ErrorKind::UnexpectedEof,
+                            "truncated loose object stream",
+                        ));
+                    }
+                    self.done = true;
+                    return Ok(total_written);
+                }
+                zlib::Status::Ok | zlib::Status::BufError if produced != 0 => {
+                    if self.remaining == 0 {
+                        self.finish_stream()?;
+                    }
+                    return Ok(total_written);
+                }
+                zlib::Status::Ok | zlib::Status::BufError if eof => {
+                    return Err(io::Error::new(
+                        io::ErrorKind::UnexpectedEof,
+                        "truncated loose object stream",
+                    ));
+                }
+                zlib::Status::Ok | zlib::Status::BufError if consumed != 0 => {}
+                zlib::Status::Ok | zlib::Status::BufError => {
+                    return Err(io::Error::new(
+                        io::ErrorKind::InvalidData,
+                        "deflate stream made no progress",
+                    ));
+                }
+            }
+        }
+    }
+}
+
 /// Object lookup
 impl Store {
     const OPEN_ACTION: &'static str = "open";
@@ -134,6 +409,37 @@ impl Store {
         }
     }
 
+    /// Return the object identified by `id` as a decoded byte stream.
+    ///
+    /// Returns `Ok(None)` if there is no such object.
+    pub fn try_find_stream(&self, id: &gix_hash::oid) -> Result<Option<crate::find::Stream>, Error> {
+        debug_assert_eq!(self.object_hash, id.kind());
+        let path = hash_path(id, self.path.clone());
+        let file = match fs::File::open(&path) {
+            Ok(file) => file,
+            Err(err) if err.kind() == io::ErrorKind::NotFound => return Ok(None),
+            Err(source) => {
+                return Err(Error::Io {
+                    source,
+                    action: Self::OPEN_ACTION,
+                    path,
+                })
+            }
+        };
+
+        let (kind, size, reader) = StreamReader::new(file).read_header(&path)?;
+        Ok(Some(crate::find::Stream::from_loose(kind, size, reader)))
+    }
+
     /// Return only the decompressed size of the object and its kind without fully reading it into memory as tuple of `(size, kind)`.
     /// Returns `None` if `id` does not exist in the database.
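+    ///
+    /// For example (an uncompiled sketch; `store` and `id` are assumed to exist):
+    ///
+    /// ```ignore
+    /// if let Some((size, kind)) = store.try_header(&id)? {
+    ///     // The object was identified without inflating its entire contents.
+    ///     eprintln!("{kind} with {size} decompressed bytes");
+    /// }
+    /// ```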
     pub fn try_header(&self, id: &gix_hash::oid) -> Result<Option<(usize, gix_object::Kind)>, Error> {
diff --git a/gix-odb/tests/odb/store/loose.rs b/gix-odb/tests/odb/store/loose.rs
index 456a3b30ad..1607da07df 100644
--- a/gix-odb/tests/odb/store/loose.rs
+++ b/gix-odb/tests/odb/store/loose.rs
@@ -33,6 +33,49 @@ pub fn locate_oid(id: gix_hash::ObjectId, buf: &mut Vec<u8>) -> gix_object::Data<'_> {
     ldb().try_find(&id, buf).expect("read success").expect("id present")
 }
 
+mod stream {
+    use std::io::ErrorKind;
+
+    use gix_object::Write;
+    use gix_odb::loose::Store;
+
+    #[test]
+    fn truncated_streams_fail_instead_of_returning_early_eof() -> crate::Result {
+        let dir = gix_testtools::tempfile::tempdir()?;
+        let store = Store::at(dir.path(), gix_hash::Kind::Sha1);
+        let id = store.write_buf(gix_object::Kind::Blob, &vec![b'x'; 64 * 1024])?;
+        let path = store.object_path(&id);
+
+        #[cfg(unix)]
+        {
+            use std::os::unix::fs::PermissionsExt;
+
+            let mut permissions = std::fs::metadata(&path)?.permissions();
+            permissions.set_mode(permissions.mode() | 0o200);
+            std::fs::set_permissions(&path, permissions)?;
+        }
+        #[cfg(not(unix))]
+        {
+            let mut permissions = std::fs::metadata(&path)?.permissions();
+            permissions.set_readonly(false);
+            std::fs::set_permissions(&path, permissions)?;
+        }
+
+        let mut compressed = std::fs::read(&path)?;
+        compressed.truncate(compressed.len() - 1);
+        std::fs::write(&path, compressed)?;
+
+        let mut stream = store.try_find_stream(id.as_ref())?.expect("object still addressable");
+        let err = std::io::copy(&mut stream, &mut std::io::sink()).expect_err("truncated streams must fail");
+        assert!(
+            matches!(err.kind(), ErrorKind::UnexpectedEof | ErrorKind::InvalidData),
+            "expected stream corruption to surface as EOF or invalid data, got {:?}",
+            err.kind()
+        );
+        Ok(())
+    }
+}
+
 #[test]
 fn verify_integrity() {
     let db = ldb();
diff --git a/gix-pack/src/cache/delta/traverse/resolve.rs b/gix-pack/src/cache/delta/traverse/resolve.rs
index fbb038b5d0..36ae3b3bff 100644
--- a/gix-pack/src/cache/delta/traverse/resolve.rs
+++ b/gix-pack/src/cache/delta/traverse/resolve.rs
@@ -188,7 +188,13 @@ where
         header_ofs += consumed;
 
         fully_resolved_delta_bytes.resize(result_size as usize, 0);
-        data::delta::apply(&base_bytes, fully_resolved_delta_bytes, &delta_bytes[header_ofs..])?;
+        let mut target = &mut fully_resolved_delta_bytes[..];
+        data::delta::apply(
+            &base_bytes,
+            &mut target,
+            &delta_bytes[header_ofs..],
+            result_size as usize,
+        )?;
 
         // FIXME: this actually invalidates the "pack_offset()" computation, which is not obvious to consumers
         // at all
@@ -363,10 +369,12 @@ where
         header_ofs += consumed;
 
         fully_resolved_delta_bytes.resize(result_size as usize, 0);
+        let mut target = &mut fully_resolved_delta_bytes[..];
         data::delta::apply(
             &base_bytes,
-            &mut fully_resolved_delta_bytes,
+            &mut target,
             &delta_bytes[header_ofs..],
+            result_size as usize,
         )?;
 
         // FIXME: this actually invalidates the "pack_offset()" computation, which is not obvious to consumers
diff --git a/gix-pack/src/data/delta.rs b/gix-pack/src/data/delta.rs
index f5b7dfb05e..278ec949ef 100644
--- a/gix-pack/src/data/delta.rs
+++ b/gix-pack/src/data/delta.rs
@@ -10,6 +10,8 @@ pub mod apply {
         DeltaCopyBaseSliceMismatch,
         #[error("Delta copy data: byte slices must match")]
         DeltaCopyDataSliceMismatch,
+        #[error("Delta output size mismatch: expected {expected} bytes, got {actual}")]
+        OutputSizeMismatch { expected: usize, actual: usize },
     }
 }
 
@@ -30,8 +32,14 @@ pub(crate) fn decode_header_size(d: &[u8]) -> (u64, usize) {
     (size, consumed)
 }
 
-pub(crate) fn apply(base: &[u8], mut target: &mut [u8], data: &[u8]) -> Result<(), apply::Error> {
+pub(crate) fn apply<W: std::io::Write + ?Sized>(
+    base: &[u8],
+    target: &mut W,
+    data: &[u8],
+    expected_size: usize,
+) -> Result<(), apply::Error> {
     let mut i = 0;
+    let mut produced = 0usize;
     while let Some(cmd) = data.get(i) {
         i += 1;
         match cmd {
@@ -69,19 +77,46 @@ pub(crate) fn apply(base: &[u8], mut target: &mut [u8], data: &[u8]) -> Result<(
                     size = 0x10000; // 65536
                 }
                 let ofs = ofs as usize;
-                std::io::Write::write(&mut target, &base[ofs..ofs + size as usize])
+                let size = size as usize;
+                target
+                    .write_all(&base[ofs..ofs + size])
                     .map_err(|_e| apply::Error::DeltaCopyBaseSliceMismatch)?;
+                produced += size;
             }
             0 => return Err(apply::Error::UnsupportedCommandCode),
             size => {
-                std::io::Write::write(&mut target, &data[i..i + *size as usize])
+                let size = *size as usize;
+                target
+                    .write_all(&data[i..i + size])
                    .map_err(|_e| apply::Error::DeltaCopyDataSliceMismatch)?;
-                i += *size as usize;
+                produced += size;
+                i += size;
             }
         }
     }
-    assert_eq!(i, data.len());
-    assert_eq!(target.len(), 0);
+    debug_assert_eq!(i, data.len());
+
+    if produced != expected_size {
+        return Err(apply::Error::OutputSizeMismatch {
+            expected: expected_size,
+            actual: produced,
+        });
+    }
 
     Ok(())
 }
+
+#[cfg(test)]
+mod tests {
+    use super::apply;
+
+    #[test]
+    fn rejects_outputs_shorter_than_declared() {
+        let mut out = Vec::new();
+        let err = super::apply(b"hello", &mut out, &[0x90, 0x05], 6).expect_err("malformed deltas must fail");
+        assert!(matches!(
+            err,
+            apply::Error::OutputSizeMismatch { expected: 6, actual: 5 }
+        ));
+    }
+}
diff --git a/gix-pack/src/data/file/decode/entry.rs b/gix-pack/src/data/file/decode/entry.rs
index 9711961e8f..e79f4402e8 100644
--- a/gix-pack/src/data/file/decode/entry.rs
+++ b/gix-pack/src/data/file/decode/entry.rs
@@ -1,4 +1,4 @@
-use std::ops::Range;
+use std::{io, ops::Range};
 
 use gix_features::zlib;
 use smallvec::SmallVec;
@@ -147,6 +147,40 @@ impl File {
             .map(|(_status, consumed_in, consumed_out)| (consumed_in, consumed_out))
     }
 
+    fn decompress_entry_to_write(
+        &self,
+        entry: &data::Entry,
+        inflate: &mut zlib::Inflate,
+        out: &mut dyn io::Write,
+    ) -> Result<usize, Error> {
+        let offset: usize = entry.data_offset.try_into().expect("offset representable by machine");
+        assert!(offset < self.data.len(), "entry offset out of bounds");
+
+        inflate.reset();
+        let mut input = io::Cursor::new(&self.data[offset..]);
+        let mut buf = [0u8; 8192];
+        let mut total_written = 0usize;
+        loop {
+            let written = gix_features::zlib::stream::inflate::read(&mut input, &mut inflate.state, &mut buf)?;
+            if written == 0 {
+                break;
+            }
+            out.write_all(&buf[..written])?;
+            total_written += written;
+            if total_written as u64 == entry.decompressed_size {
+                break;
+            }
+        }
+
+        if total_written as u64 != entry.decompressed_size {
+            return Err(Error::ZlibInflate(gix_features::zlib::inflate::Error::Status(
+                gix_features::zlib::Status::BufError,
+            )));
+        }
+
+        Ok(inflate.state.total_in() as usize)
+    }
+
     /// Decode an entry, resolving delta's as needed, while growing the `out` vector if there is not enough
     /// space to hold the result object.
     ///
@@ -187,6 +221,66 @@ impl File {
         }
     }
 
+    /// Decode an entry while streaming the fully resolved object bytes into `out`.
+    ///
+    /// Unlike [`decode_entry()`][Self::decode_entry()], this never returns
+    /// the decoded object bytes to the caller. Instead, it uses `scratch`
+    /// for the temporary buffers needed during delta resolution and writes
+    /// the final object bytes into `out`.
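+    ///
+    /// A call sketch (`ignore`d as it is not compiled; `pack`, `entry` and `inflate` are
+    /// assumed to be set up just like for a [`decode_entry()`][Self::decode_entry()] call):
+    ///
+    /// ```ignore
+    /// let mut scratch = Vec::new();
+    /// let mut file = std::fs::File::create("decoded.bin")?;
+    /// let outcome = pack.decode_entry_to_write(
+    ///     entry,
+    ///     &mut scratch,
+    ///     &mut inflate,
+    ///     &mut file,
+    ///     &|_id, _buf| None, // only consulted to resolve ref-delta bases
+    ///     &mut gix_pack::cache::Never,
+    /// )?;
+    /// assert_eq!(outcome.object_size, file.metadata()?.len());
+    /// ```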
+    pub fn decode_entry_to_write(
+        &self,
+        entry: data::Entry,
+        scratch: &mut Vec<u8>,
+        inflate: &mut zlib::Inflate,
+        out: &mut dyn io::Write,
+        resolve: &dyn Fn(&gix_hash::oid, &mut Vec<u8>) -> Option<ResolvedBase>,
+        delta_cache: &mut dyn cache::DecodeEntry,
+    ) -> Result<Outcome, Error> {
+        use crate::data::entry::Header::*;
+        if let Some((kind, packed_size)) = delta_cache.get(self.id, entry.data_offset, scratch) {
+            out.write_all(scratch)?;
+            return Ok(Outcome {
+                kind,
+                num_deltas: 0,
+                decompressed_size: entry.decompressed_size,
+                compressed_size: packed_size,
+                object_size: scratch.len() as u64,
+            });
+        }
+
+        match entry.header {
+            Tree | Blob | Commit | Tag => self
+                .decompress_entry_to_write(&entry, inflate, out)
+                .map(|consumed_input| {
+                    Outcome::from_object_entry(
+                        entry.header.as_kind().expect("a non-delta entry"),
+                        &entry,
+                        consumed_input,
+                    )
+                }),
+            OfsDelta { .. } | RefDelta { .. } => {
+                self.resolve_deltas_to_write(entry, resolve, inflate, scratch, out, delta_cache)
+            }
+        }
+    }
+
     /// resolve: technically, this shouldn't ever be required as stored local packs don't refer to objects by id
     /// that are outside of the pack. Unless, of course, the ref refers to an object within this pack, which means
     /// it's very, very large as 20bytes are smaller than the corresponding MSB encoded number
@@ -375,7 +452,8 @@ impl File {
             if delta_idx + 1 == chain_len {
                 last_result_size = Some(result_size);
             }
-            delta::apply(&source_buf[..base_size], &mut target_buf[..result_size], data)?;
+            let mut target = &mut target_buf[..result_size];
+            delta::apply(&source_buf[..base_size], &mut target, data, result_size)?;
             // use the target as source for the next delta
             std::mem::swap(&mut source_buf, &mut target_buf);
         }
@@ -416,6 +494,183 @@ impl File {
             object_size: last_result_size as u64,
         })
     }
+
+    fn resolve_deltas_to_write(
+        &self,
+        last: data::Entry,
+        resolve: &dyn Fn(&gix_hash::oid, &mut Vec<u8>) -> Option<ResolvedBase>,
+        inflate: &mut zlib::Inflate,
+        out: &mut Vec<u8>,
+        writer: &mut dyn io::Write,
+        cache: &mut dyn cache::DecodeEntry,
+    ) -> Result<Outcome, Error> {
+        let mut chain = SmallVec::<[Delta; 10]>::default();
+        let first_entry = last.clone();
+        let mut cursor = last;
+        let mut base_buffer_size: Option<usize> = None;
+        let mut object_kind: Option<gix_object::Kind> = None;
+        let mut consumed_input: Option<usize> = None;
+
+        let mut total_delta_data_size: u64 = 0;
+        while cursor.header.is_delta() {
+            if let Some((kind, packed_size)) = cache.get(self.id, cursor.data_offset, out) {
+                base_buffer_size = Some(out.len());
+                object_kind = Some(kind);
+                if total_delta_data_size == 0 {
+                    consumed_input = Some(packed_size);
+                }
+                break;
+            }
+            total_delta_data_size += cursor.decompressed_size;
+            let decompressed_size = cursor
+                .decompressed_size
+                .try_into()
+                .expect("a single delta size small enough to fit a usize");
+            chain.push(Delta {
+                data: Range {
+                    start: 0,
+                    end: decompressed_size,
+                },
+                base_size: 0,
+                result_size: 0,
+                decompressed_size,
+                data_offset: cursor.data_offset,
+            });
+            use crate::data::entry::Header;
+            cursor = match cursor.header {
+                Header::OfsDelta { base_distance } => self.entry(cursor.base_pack_offset(base_distance))?,
+                Header::RefDelta { base_id } => match resolve(base_id.as_ref(), out) {
+                    Some(ResolvedBase::InPack(entry)) => entry,
+                    Some(ResolvedBase::OutOfPack { end, kind }) => {
+                        base_buffer_size = Some(end);
+                        object_kind = Some(kind);
+                        break;
+                    }
+                    None => return Err(Error::DeltaBaseUnresolved(base_id)),
+                },
+                _ => unreachable!("cursor.is_delta() only allows deltas here"),
+            };
+        }
+
+        if chain.is_empty() {
+            writer.write_all(out)?;
+            return Ok(Outcome::from_object_entry(
+                object_kind.expect("object kind as set by cache"),
+                &first_entry,
+                consumed_input.expect("consumed bytes as set by cache"),
+            ));
+        }
+
+        let total_delta_data_size: usize = total_delta_data_size.try_into().expect("delta data to fit in memory");
+
+        let chain_len = chain.len();
+        let (first_buffer_end, second_buffer_end) = {
+            let delta_start = base_buffer_size.unwrap_or(0);
+
+            let delta_range = Range {
+                start: delta_start,
+                end: delta_start + total_delta_data_size,
+            };
+            out.try_reserve(delta_range.end.saturating_sub(out.len()))?;
+            out.resize(delta_range.end, 0);
+
+            let mut instructions = &mut out[delta_range.clone()];
+            let mut relative_delta_start = 0;
+            let mut biggest_result_size = 0;
+            for (delta_idx, delta) in chain.iter_mut().rev().enumerate() {
+                let consumed_from_data_offset = self.decompress_entry_from_data_offset(
+                    delta.data_offset,
+                    inflate,
+                    &mut instructions[..delta.decompressed_size],
+                )?;
+                let is_last_delta_to_be_applied = delta_idx + 1 == chain_len;
+                if is_last_delta_to_be_applied {
+                    consumed_input = Some(consumed_from_data_offset);
+                }
+
+                let (base_size, offset) = delta::decode_header_size(instructions);
+                let mut bytes_consumed_by_header = offset;
+                biggest_result_size = biggest_result_size.max(base_size);
+                delta.base_size = base_size.try_into().expect("base size fits into usize");
+
+                let (result_size, offset) = delta::decode_header_size(&instructions[offset..]);
+                bytes_consumed_by_header += offset;
+                biggest_result_size = biggest_result_size.max(result_size);
+                delta.result_size = result_size.try_into().expect("result size fits into usize");
+
+                delta.data.start = relative_delta_start + bytes_consumed_by_header;
+                relative_delta_start += delta.decompressed_size;
+                delta.data.end = relative_delta_start;
+
+                instructions = &mut instructions[delta.decompressed_size..];
+            }
+
+            let biggest_result_size: usize = biggest_result_size.try_into().map_err(|_| Error::OutOfMemory)?;
+            let first_buffer_size = biggest_result_size;
+            let second_buffer_size = if chain_len > 1 { first_buffer_size } else { 0 };
+            let out_size = first_buffer_size + second_buffer_size + total_delta_data_size;
+            out.try_reserve(out_size.saturating_sub(out.len()))?;
+            out.resize(out_size, 0);
+
+            let second_buffer_end = {
+                let end = first_buffer_size + second_buffer_size;
+                if delta_range.start < end {
+                    out.copy_within(delta_range, end);
+                } else {
+                    let (buffers, instructions) = out.split_at_mut(end);
+                    instructions.copy_from_slice(&buffers[delta_range]);
+                }
+                end
+            };
+
+            if base_buffer_size.is_none() {
+                let base_entry = cursor;
+                debug_assert!(!base_entry.header.is_delta());
+                object_kind = base_entry.header.as_kind();
+                let out_base = &mut out[..out_size - total_delta_data_size];
+                self.decompress_entry_from_data_offset(base_entry.data_offset, inflate, out_base)?;
+            }
+
+            (first_buffer_size, second_buffer_end)
+        };
+
+        let (buffers, instructions) = out.split_at_mut(second_buffer_end);
+        let (mut source_buf, mut target_buf) = buffers.split_at_mut(first_buffer_end);
+
+        let mut last_result_size = None;
+        for (
+            delta_idx,
+            Delta {
+                data,
+                base_size,
+                result_size,
+                ..
+            },
+        ) in chain.into_iter().rev().enumerate()
+        {
+            let data = &mut instructions[data];
+            let is_last_delta = delta_idx + 1 == chain_len;
+            if is_last_delta {
+                last_result_size = Some(result_size);
+                delta::apply(&source_buf[..base_size], writer, data, result_size)?;
+            } else {
+                let mut target = &mut target_buf[..result_size];
+                delta::apply(&source_buf[..base_size], &mut target, data, result_size)?;
+                std::mem::swap(&mut source_buf, &mut target_buf);
+            }
+        }
+
+        let object_kind = object_kind.expect("a base object as root of any delta chain that we are here to resolve");
+        let consumed_input = consumed_input.expect("at least one decompressed delta object");
+        let last_result_size = last_result_size.expect("at least one delta chain item");
+        Ok(Outcome {
+            kind: object_kind,
+            num_deltas: chain_len as u32,
+            decompressed_size: first_entry.decompressed_size,
+            compressed_size: consumed_input,
+            object_size: last_result_size as u64,
+        })
+    }
 }
 
 #[cfg(test)]
diff --git a/gix-pack/src/data/file/decode/mod.rs b/gix-pack/src/data/file/decode/mod.rs
index 71bbf1595c..ed086e4eeb 100644
--- a/gix-pack/src/data/file/decode/mod.rs
+++ b/gix-pack/src/data/file/decode/mod.rs
@@ -7,12 +7,15 @@ pub mod header;
 
 /// Returned by [`File::decode_header()`][crate::data::File::decode_header()],
-/// [`File::decode_entry()`][crate::data::File::decode_entry()] and .
+/// [`File::decode_entry()`][crate::data::File::decode_entry()],
+/// [`File::decode_entry_to_write()`][crate::data::File::decode_entry_to_write()] and
 /// [`File::decompress_entry()`][crate::data::File::decompress_entry()]
 #[derive(thiserror::Error, Debug)]
 #[allow(missing_docs)]
 pub enum Error {
     #[error("Failed to decompress pack entry")]
     ZlibInflate(#[from] gix_features::zlib::inflate::Error),
+    #[error("Failed to write decoded object bytes")]
+    Io(#[from] std::io::Error),
     #[error("A delta chain could not be followed as the ref base with id {0} could not be found")]
     DeltaBaseUnresolved(gix_hash::ObjectId),
     #[error(transparent)]
diff --git a/gix/src/repository/mod.rs b/gix/src/repository/mod.rs
index 068d3bcbba..06ff5ab785 100644
--- a/gix/src/repository/mod.rs
+++ b/gix/src/repository/mod.rs
@@ -539,3 +539,48 @@ pub mod worktree_archive {
     /// The error returned by [`Repository::worktree_archive()`](crate::Repository::worktree_archive()).
     pub type Error = gix_error::Error;
 }
+
+///
+pub mod blob_stream {
+    /// The error returned by [`Repository::try_find_blob_stream()`](crate::Repository::try_find_blob_stream()).
+    #[derive(Debug, thiserror::Error)]
+    #[allow(missing_docs)]
+    pub enum Error {
+        #[error(transparent)]
+        Find(#[from] crate::object::find::Error),
+        #[error("Needed {id} to be a blob to stream it, got {actual}")]
+        NotABlob {
+            id: gix_hash::ObjectId,
+            actual: gix_object::Kind,
+        },
+    }
+
+    /// Errors returned by [`Repository::find_blob_stream()`](crate::Repository::find_blob_stream()).
+    pub mod existing {
+        /// The error returned by [`Repository::find_blob_stream()`](crate::Repository::find_blob_stream()).
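+        ///
+        /// Callers can distinguish a missing object from a type mismatch (an uncompiled
+        /// sketch; `repo` and `id` are assumed to exist):
+        ///
+        /// ```ignore
+        /// use gix::repository::blob_stream::existing::Error;
+        ///
+        /// match repo.find_blob_stream(id) {
+        ///     Ok(stream) => drop(stream),
+        ///     Err(Error::NotABlob { actual, .. }) => eprintln!("found a {actual} instead"),
+        ///     Err(err) => return Err(err.into()),
+        /// }
+        /// ```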
+        #[derive(Debug, thiserror::Error)]
+        #[allow(missing_docs)]
+        pub enum Error {
+            #[error(transparent)]
+            Find(#[from] crate::object::find::existing::Error),
+            #[error("Needed {id} to be a blob to stream it, got {actual}")]
+            NotABlob {
+                id: gix_hash::ObjectId,
+                actual: gix_object::Kind,
+            },
+        }
+    }
+}
diff --git a/gix/src/repository/object.rs b/gix/src/repository/object.rs
index ff4df5b042..7b56037180 100644
--- a/gix/src/repository/object.rs
+++ b/gix/src/repository/object.rs
@@ -240,6 +240,62 @@ impl crate::Repository {
             None => Ok(None),
         }
     }
+
+    /// Try to find a blob with `id` and return its decoded bytes as a stream.
+    pub fn try_find_blob_stream(
+        &self,
+        id: impl Into<ObjectId>,
+    ) -> Result<Option<gix_odb::find::Stream>, crate::repository::blob_stream::Error> {
+        let id = id.into();
+        if id == ObjectId::empty_blob(self.object_hash()) {
+            return Ok(Some(gix_odb::find::Stream::empty_blob()));
+        }
+
+        match self.try_find_header(id)? {
+            Some(header) => {
+                if header.kind() != gix_object::Kind::Blob {
+                    return Err(crate::repository::blob_stream::Error::NotABlob {
+                        id,
+                        actual: header.kind(),
+                    });
+                }
+            }
+            None => return Ok(None),
+        }
+
+        match self
+            .objects
+            .try_find_stream(&id)
+            .map_err(crate::object::find::Error::from)?
+        {
+            Some((stream, _location)) => Ok(Some(stream)),
+            None => Ok(None),
+        }
+    }
+
+    /// Find a blob with `id` and return its decoded bytes as a stream.
+    pub fn find_blob_stream(
+        &self,
+        id: impl Into<ObjectId>,
+    ) -> Result<gix_odb::find::Stream, crate::repository::blob_stream::existing::Error> {
+        let id = id.into();
+        self.try_find_blob_stream(id)
+            .map_err(|err| match err {
+                crate::repository::blob_stream::Error::Find(err) => {
+                    crate::repository::blob_stream::existing::Error::Find(gix_object::find::existing::Error::Find(
+                        err.0,
+                    ))
+                }
+                crate::repository::blob_stream::Error::NotABlob { id, actual } => {
+                    crate::repository::blob_stream::existing::Error::NotABlob { id, actual }
+                }
+            })?
+            .ok_or_else(|| {
+                crate::repository::blob_stream::existing::Error::Find(gix_object::find::existing::Error::NotFound {
+                    oid: id,
+                })
+            })
+    }
 }
 
 /// Write objects of any type.
diff --git a/gix/tests/blob_stream_memory.rs b/gix/tests/blob_stream_memory.rs
new file mode 100644
index 0000000000..154548e287
--- /dev/null
+++ b/gix/tests/blob_stream_memory.rs
@@ -0,0 +1,221 @@
+#![allow(clippy::result_large_err)]
+
+use std::{
+    alloc::{GlobalAlloc, Layout, System},
+    io,
+    path::Path,
+    process::Command,
+    sync::atomic::{AtomicUsize, Ordering},
+};
+
+use gix::odb::find::Header;
+use gix_object::Kind;
+use gix_testtools::{tempfile, Result};
+
+#[global_allocator]
+static ALLOCATOR: MeasuringAllocator = MeasuringAllocator::new();
+
+struct MeasuringAllocator {
+    current: AtomicUsize,
+    peak: AtomicUsize,
+}
+
+impl MeasuringAllocator {
+    const fn new() -> Self {
+        Self {
+            current: AtomicUsize::new(0),
+            peak: AtomicUsize::new(0),
+        }
+    }
+
+    fn prefixed_layout(layout: Layout) -> (Layout, usize) {
+        let (layout, offset) = Layout::new::<usize>()
+            .extend(layout)
+            .expect("prefix layout can be extended");
+        (layout.pad_to_align(), offset)
+    }
+
+    fn note_increase(&self, size: usize) {
+        if size == 0 {
+            return;
+        }
+        let current = self.current.fetch_add(size, Ordering::SeqCst) + size;
+        let mut observed = self.peak.load(Ordering::SeqCst);
+        while current > observed
+            && self
+                .peak
+                .compare_exchange(observed, current, Ordering::SeqCst, Ordering::SeqCst)
+                .is_err()
+        {
+            observed = self.peak.load(Ordering::SeqCst);
+        }
+    }
+
+    fn note_decrease(&self, size: usize) {
+        if size != 0 {
+            self.current.fetch_sub(size, Ordering::SeqCst);
+        }
+    }
+
+    fn measure<T>(&self, f: impl FnOnce() -> T) -> (T, usize) {
+        let baseline = self.current.load(Ordering::SeqCst);
+        self.peak.store(baseline, Ordering::SeqCst);
+        let value = f();
+        (value, self.peak.load(Ordering::SeqCst).saturating_sub(baseline))
+    }
+}
+
+unsafe impl GlobalAlloc for MeasuringAllocator {
+    unsafe fn alloc(&self, layout: Layout) -> *mut u8 {
+        let requested = layout.size();
+        let (layout, offset) = Self::prefixed_layout(layout);
+        let raw = unsafe { System.alloc(layout) };
+        if !raw.is_null() {
+            self.note_increase(requested);
+            unsafe { raw.add(offset) }
+        } else {
+            raw
+        }
+    }
+
+    unsafe fn alloc_zeroed(&self, layout: Layout) -> *mut u8 {
+        let requested = layout.size();
+        let (layout, offset) = Self::prefixed_layout(layout);
+        let raw = unsafe { System.alloc_zeroed(layout) };
+        if !raw.is_null() {
+            self.note_increase(requested);
+            unsafe { raw.add(offset) }
+        } else {
+            raw
+        }
+    }
+
+    unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) {
+        let requested = layout.size();
+        let (layout, offset) = Self::prefixed_layout(layout);
+        self.note_decrease(requested);
+        unsafe { System.dealloc(ptr.sub(offset), layout) };
+    }
+
+    unsafe fn realloc(&self, ptr: *mut u8, layout: Layout, new_size: usize) -> *mut u8 {
+        let (old_layout, old_offset) = Self::prefixed_layout(layout);
+        let (new_layout, new_offset) =
+            Self::prefixed_layout(Layout::from_size_align(new_size, layout.align()).expect("valid layout"));
+        let raw = unsafe { System.realloc(ptr.sub(old_offset), old_layout, new_layout.size()) };
+        if !raw.is_null() {
+            match new_size.cmp(&layout.size()) {
+                std::cmp::Ordering::Greater => self.note_increase(new_size - layout.size()),
+                std::cmp::Ordering::Less => self.note_decrease(layout.size() - new_size),
+                std::cmp::Ordering::Equal => {}
+            }
+            unsafe { raw.add(new_offset) }
+        } else {
+            raw
+        }
+    }
+}
+
+fn restricted() -> gix::open::Options {
+    gix::open::Options::isolated().config_overrides(["user.name=gitoxide", "user.email=gitoxide@localhost"])
+}
+
+fn open_repo(path: &Path) -> Result<gix::Repository> {
+    Ok(gix::ThreadSafeRepository::open_opts(path, restricted())?.to_thread_local())
+}
+
+fn git(dir: &Path, args: &[&str]) -> Result<()> {
+    let output = Command::new("git").current_dir(dir).args(args).output()?;
+    if output.status.success() {
+        return Ok(());
+    }
+    Err(format!(
+        "git {:?} failed with status {:?}\nstdout:\n{}\nstderr:\n{}",
+        args,
+        output.status.code(),
+        String::from_utf8_lossy(&output.stdout),
+        String::from_utf8_lossy(&output.stderr)
+    )
+    .into())
+}
+
+fn create_packed_delta_repo() -> Result<tempfile::TempDir> {
+    let dir = tempfile::tempdir()?;
+    git(dir.path(), &["init"])?;
+    git(dir.path(), &["config", "user.name", "gitoxide"])?;
+    git(dir.path(), &["config", "user.email", "gitoxide@localhost"])?;
+
+    let blob_path = dir.path().join("blob.bin");
+    let mut base = vec![b'a'; 16 * 1024 * 1024];
+    for chunk in base.chunks_mut(4096) {
+        chunk[0] = b'A';
+        chunk[1] = b'0';
+    }
+    std::fs::write(&blob_path, &base)?;
+    git(dir.path(), &["add", "blob.bin"])?;
+    git(dir.path(), &["commit", "-m", "base"])?;
+
+    let mut changed = base;
+    for idx in (0..changed.len()).step_by(4096) {
+        changed[idx + 1] = b'1';
+    }
+    std::fs::write(&blob_path, &changed)?;
+    git(dir.path(), &["add", "blob.bin"])?;
+    git(dir.path(), &["commit", "-m", "delta"])?;
+    git(
+        dir.path(),
+        &["repack", "-adf", "--window=250", "--depth=50", "--window-memory=1g"],
+    )?;
+    git(dir.path(), &["prune-packed"])?;
+    Ok(dir)
+}
+
+fn packed_delta_blob_id(repo: &gix::Repository) -> Result<(gix::ObjectId, u64)> {
+    for id in repo.objects.iter()? {
+        let id = id?;
+        match repo.try_find_header(id)? {
+            Some(Header::Packed(header)) if header.kind == Kind::Blob && header.num_deltas > 0 => {
+                return Ok((id, header.object_size));
+            }
+            _ => {}
+        }
+    }
+    Err("expected at least one packed delta blob".into())
+}
+
+#[test]
+fn streaming_packed_delta_blobs_uses_less_peak_memory_than_eager_lookup() -> Result {
+    let repo_dir = create_packed_delta_repo()?;
+    let repo = open_repo(repo_dir.path())?;
+    let (id, object_size) = packed_delta_blob_id(&repo)?;
+    assert!(object_size >= 16 * 1024 * 1024);
+    drop(repo);
+
+    let eager_peak = {
+        let repo = open_repo(repo_dir.path())?;
+        let (_, peak) = ALLOCATOR.measure(|| {
+            let blob = repo.find_blob(id).expect("packed delta blob can be decoded eagerly");
+            assert_eq!(blob.data.len() as u64, object_size);
+        });
+        peak
+    };
+
+    let streaming_peak = {
+        let repo = open_repo(repo_dir.path())?;
+        let (_, peak) = ALLOCATOR.measure(|| {
+            let mut stream = repo.find_blob_stream(id).expect("packed delta blob can be streamed");
+            assert_eq!(stream.size(), object_size);
+            io::copy(&mut stream, &mut io::sink()).expect("streaming copy to sink succeeds");
+        });
+        peak
+    };
+
+    assert!(
+        streaming_peak < eager_peak,
+        "streaming should lower peak allocations, got eager={eager_peak} and stream={streaming_peak}"
+    );
+    assert!(
+        eager_peak.saturating_sub(streaming_peak) >= 8 * 1024 * 1024,
+        "expected a meaningful peak-memory reduction, got eager={eager_peak} and stream={streaming_peak}"
+    );
+    Ok(())
+}
diff --git a/gix/tests/gix/repository/object.rs b/gix/tests/gix/repository/object.rs
index 007bb1cee8..336f7ef7d5 100644
--- a/gix/tests/gix/repository/object.rs
+++ b/gix/tests/gix/repository/object.rs
@@ -581,6 +581,93 @@ mod find {
     }
 }
 
+mod blob_stream {
+    use std::io::Read;
+
+    use gix_object::Kind;
+
+    fn blob_id_with_storage(repo: &gix::Repository, packed: bool) -> crate::Result<gix::ObjectId> {
+        for id in repo.objects.iter()? {
+            let id = id?;
+            let Some(header) = repo.try_find_header(id)? else {
+                continue;
+            };
+            let is_packed = matches!(header, gix_odb::find::Header::Packed(_));
+            if header.kind() == Kind::Blob && is_packed == packed {
+                return Ok(id);
+            }
+        }
+        panic!(
+            "expected at least one {} blob in fixture",
+            if packed { "packed" } else { "loose" }
+        );
+    }
+
+    #[test]
+    fn streams_loose_blobs() -> crate::Result {
+        let repo = crate::named_repo("make_packed_and_loose.sh")?;
+        let id = blob_id_with_storage(&repo, false)?;
+        let expected = repo.find_blob(id)?.data.clone();
+
+        let mut stream = repo.find_blob_stream(id)?;
+        assert_eq!(stream.kind(), Kind::Blob);
+        assert_eq!(stream.size(), expected.len() as u64);
+
+        let mut actual = Vec::new();
+        stream.read_to_end(&mut actual)?;
+        assert_eq!(actual, expected);
+        Ok(())
+    }
+
+    #[test]
+    fn streams_packed_blobs_via_repository_and_odb() -> crate::Result {
+        let repo = crate::named_repo("make_packed_and_loose.sh")?;
+        let id = blob_id_with_storage(&repo, true)?;
+        let expected = repo.find_blob(id)?.data.clone();
+
+        let mut stream = repo.find_blob_stream(id)?;
+        assert_eq!(stream.kind(), Kind::Blob);
+        assert_eq!(stream.size(), expected.len() as u64);
+        let mut actual = Vec::new();
+        stream.read_to_end(&mut actual)?;
+        assert_eq!(actual, expected);
+
+        let Some((mut odb_stream, location)) = repo.objects.try_find_stream(id.as_ref())? else {
+            panic!("blob must be present");
+        };
+        assert!(location.is_some(), "packed blobs report their pack location");
+        assert_eq!(odb_stream.kind(), Kind::Blob);
+        assert_eq!(odb_stream.size(), expected.len() as u64);
+        actual.clear();
+        odb_stream.read_to_end(&mut actual)?;
+        assert_eq!(actual, expected);
+        Ok(())
+    }
+
+    #[test]
+    fn streams_empty_blob_without_storage_lookup() -> crate::Result {
+        let repo = crate::basic_repo()?;
+        let mut stream = repo.find_blob_stream(gix::ObjectId::empty_blob(repo.object_hash()))?;
+        let mut actual = Vec::new();
+        stream.read_to_end(&mut actual)?;
+        assert!(actual.is_empty());
+        Ok(())
+    }
+
+    #[test]
+    fn rejects_non_blob_stream_requests() -> crate::Result {
+        let repo = crate::basic_repo()?;
+        match repo.find_blob_stream(repo.head_id()?) {
+            Err(gix::repository::blob_stream::existing::Error::NotABlob { actual, .. }) => {
+                assert_eq!(actual, Kind::Commit);
+            }
+            Ok(_) => panic!("expected the HEAD commit lookup to be rejected"),
+            Err(other) => panic!("unexpected error: {other}"),
+        }
+        Ok(())
+    }
+}
+
 #[test]
 fn empty_objects_are_always_present_but_not_in_plumbing() -> crate::Result {
     let repo = empty_bare_in_memory_repo()?;