Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions gix-odb/src/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,22 @@ impl<S> Cache<S> {
}
}

impl<S> Cache<crate::store::Handle<S>>
where
S: Deref<Target = crate::Store> + Clone,
{
/// Find an object and return its decoded bytes as a stream.
pub fn try_find_stream(
&self,
id: &gix_hash::oid,
) -> Result<Option<(crate::find::Stream, Option<gix_pack::data::entry::Location>)>, gix_object::find::Error> {
match self.pack_cache.as_ref().map(RefCell::borrow_mut) {
Some(mut pack_cache) => self.inner.try_find_stream(id, pack_cache.deref_mut()),
None => self.inner.try_find_stream(id, &mut gix_pack::cache::Never),
}
}
}

impl<S> From<S> for Cache<S>
where
S: gix_pack::Find,
Expand Down
70 changes: 70 additions & 0 deletions gix-odb/src/find.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,73 @@
use std::io::{self, Cursor, Read};

/// A streaming view over an object's decoded bytes.
pub struct Stream {
kind: gix_object::Kind,
size: u64,
inner: StreamInner,
}

enum StreamInner {
InMemory(Cursor<Vec<u8>>),
File(std::fs::File),
Loose(crate::store_impls::loose::find::StreamReader),
}

impl Stream {
/// Return the kind of the object yielded by this stream.
pub fn kind(&self) -> gix_object::Kind {
self.kind
}

/// Return the decoded object size in bytes.
pub fn size(&self) -> u64 {
self.size
}

/// Return an empty blob stream.
pub fn empty_blob() -> Self {
Self::from_bytes(gix_object::Kind::Blob, Vec::new())
}

pub(crate) fn from_bytes(kind: gix_object::Kind, data: Vec<u8>) -> Self {
Self {
kind,
size: data.len() as u64,
inner: StreamInner::InMemory(Cursor::new(data)),
}
}

pub(crate) fn from_file(kind: gix_object::Kind, size: u64, file: std::fs::File) -> Self {
Self {
kind,
size,
inner: StreamInner::File(file),
}
}

pub(crate) fn from_loose(
kind: gix_object::Kind,
size: u64,
reader: crate::store_impls::loose::find::StreamReader,
) -> Self {
Self {
kind,
size,
inner: StreamInner::Loose(reader),
}
}
}

impl Read for Stream {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
match &mut self.inner {
StreamInner::InMemory(cursor) => cursor.read(buf),
StreamInner::File(file) => file.read(buf),
StreamInner::Loose(reader) => reader.read(buf),
}
}
}

/// An object header informing about object properties, without it being fully decoded in the process.
#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub enum Header {
Expand Down
20 changes: 20 additions & 0 deletions gix-odb/src/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,26 @@ impl<T> Proxy<T> {
}
}

impl<S> Proxy<crate::Cache<crate::store::Handle<S>>>
where
S: Deref<Target = crate::Store> + Clone,
{
/// Find an object and return its decoded bytes as a stream.
pub fn try_find_stream(
&self,
id: &gix_hash::oid,
) -> Result<Option<(crate::find::Stream, Option<crate::pack::data::entry::Location>)>, gix_object::find::Error>
{
if let Some(map) = self.memory.as_ref() {
let map = map.borrow();
if let Some((kind, data)) = map.get(id) {
return Ok(Some((crate::find::Stream::from_bytes(*kind, data.clone()), None)));
}
}
self.inner.try_find_stream(id)
}
}

impl<T> Clone for Proxy<T>
where
T: Clone,
Expand Down
2 changes: 1 addition & 1 deletion gix-odb/src/store_impls/dynamic/find.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ impl<S> super::Handle<S>
where
S: Deref<Target = super::Store> + Clone,
{
fn try_find_cached_inner<'a, 'b>(
pub(crate) fn try_find_cached_inner<'a, 'b>(
&'b self,
mut id: &'b gix_hash::oid,
buffer: &'a mut Vec<u8>,
Expand Down
1 change: 1 addition & 0 deletions gix-odb/src/store_impls/dynamic/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ pub mod find;
pub mod prefix;

mod header;
mod stream;

///
pub mod iter;
Expand Down
230 changes: 230 additions & 0 deletions gix-odb/src/store_impls/dynamic/stream.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,230 @@
use std::{io::Seek, ops::Deref};

use gix_features::zlib;
use gix_hash::oid;
use gix_pack::cache::DecodeEntry;

use super::find::Error;
use crate::store::{find::error::DeltaBaseRecursion, handle, load_index};

impl<S> super::Handle<S>
where
S: Deref<Target = super::Store> + Clone,
{
pub(crate) fn try_find_stream_inner<'b>(
&'b self,
mut id: &'b gix_hash::oid,
inflate: &mut zlib::Inflate,
pack_cache: &mut dyn DecodeEntry,
snapshot: &mut load_index::Snapshot,
recursion: Option<DeltaBaseRecursion<'_>>,
) -> Result<Option<(crate::find::Stream, Option<gix_pack::data::entry::Location>)>, Error> {
if let Some(r) = recursion {
if r.depth >= self.max_recursion_depth {
return Err(Error::DeltaBaseRecursionLimit {
max_depth: self.max_recursion_depth,
id: r.original_id.to_owned(),
});
}
} else if !self.ignore_replacements {
if let Ok(pos) = self
.store
.replacements
.binary_search_by(|(map_this, _)| map_this.as_ref().cmp(id))
{
id = self.store.replacements[pos].1.as_ref();
}
}

'outer: loop {
{
let marker = snapshot.marker;
for (idx, index) in snapshot.indices.iter_mut().enumerate() {
if let Some(handle::index_lookup::Outcome {
object_index: handle::IndexForObjectInPack { pack_id, pack_offset },
index_file,
pack: possibly_pack,
}) = index.lookup(id)
{
let pack = match possibly_pack {
Some(pack) => pack,
None => match self.store.load_pack(pack_id, marker)? {
Some(pack) => {
*possibly_pack = Some(pack);
possibly_pack.as_deref().expect("just put it in")
}
None => match self.store.load_one_index(self.refresh, snapshot.marker)? {
Some(new_snapshot) => {
*snapshot = new_snapshot;
self.clear_cache();
continue 'outer;
}
None => return Ok(None),
},
},
};
let resolved_pack_id = pack.id;
let entry = pack.entry(pack_offset)?;
let header_size = entry.header_size();
let result = {
let mut scratch = Vec::new();
let mut temp = tempfile::tempfile()?;
let result = match pack.decode_entry_to_write(
entry,
&mut scratch,
inflate,
&mut temp,
&|id, _out| {
index_file.pack_offset_by_id(id).and_then(|pack_offset| {
pack.entry(pack_offset)
.ok()
.map(gix_pack::data::decode::entry::ResolvedBase::InPack)
})
},
pack_cache,
) {
Ok(outcome) => Ok((outcome, temp)),
Err(gix_pack::data::decode::Error::DeltaBaseUnresolved(base_id)) => {
let mut buf = Vec::new();
let obj_kind = self
.try_find_cached_inner(
&base_id,
&mut buf,
inflate,
pack_cache,
snapshot,
recursion
.map(DeltaBaseRecursion::inc_depth)
.or_else(|| DeltaBaseRecursion::new(id).into()),
)
.map_err(|err| Error::DeltaBaseLookup {
err: Box::new(err),
base_id,
id: id.to_owned(),
})?
.ok_or_else(|| Error::DeltaBaseMissing {
base_id,
id: id.to_owned(),
})?
.0
.kind;
let handle::index_lookup::Outcome {
object_index:
handle::IndexForObjectInPack {
pack_id: _,
pack_offset,
},
index_file,
pack: possibly_pack,
} = match snapshot.indices[idx].lookup(id) {
Some(res) => res,
None => {
let mut out = None;
for index in &mut snapshot.indices {
out = index.lookup(id);
if out.is_some() {
break;
}
}

out.unwrap_or_else(|| {
panic!(
"could not find object {id} in any index after looking up one of its base objects {base_id}"
)
})
}
};
let pack = possibly_pack
.as_ref()
.expect("pack to still be available like just now");
let entry = pack.entry(pack_offset)?;
let mut scratch = Vec::new();
let mut temp = tempfile::tempfile()?;
pack.decode_entry_to_write(
entry,
&mut scratch,
inflate,
&mut temp,
&|id, out| {
index_file
.pack_offset_by_id(id)
.and_then(|pack_offset| {
pack.entry(pack_offset)
.ok()
.map(gix_pack::data::decode::entry::ResolvedBase::InPack)
})
.or_else(|| {
(id == base_id).then(|| {
out.resize(buf.len(), 0);
out.copy_from_slice(buf.as_slice());
gix_pack::data::decode::entry::ResolvedBase::OutOfPack {
kind: obj_kind,
end: out.len(),
}
})
})
},
pack_cache,
)
.map(|outcome| (outcome, temp))
}
Err(err) => Err(err),
}?;
result
};
let (outcome, mut temp) = result;
temp.rewind()?;
let res = (
crate::find::Stream::from_file(outcome.kind, outcome.object_size, temp),
Some(gix_pack::data::entry::Location {
pack_id: resolved_pack_id,
pack_offset,
entry_size: outcome.compressed_size + header_size,
}),
);

if idx != 0 {
snapshot.indices.swap(0, idx);
}
return Ok(Some(res));
}
}
}

for lodb in snapshot.loose_dbs.iter() {
if lodb.contains(id) {
return lodb
.try_find_stream(id)
.map(|obj| obj.map(|obj| (obj, None)))
.map_err(Into::into);
}
}

match self.store.load_one_index(self.refresh, snapshot.marker)? {
Some(new_snapshot) => {
*snapshot = new_snapshot;
self.clear_cache();
}
None => return Ok(None),
}
}
}
}

impl<S> super::Handle<S>
where
S: Deref<Target = super::Store> + Clone,
{
/// Try to find the object identified by `id` in any backing store and return it as a readable stream,
/// along with its pack location if it came from a pack.
pub fn try_find_stream(
&self,
id: &oid,
pack_cache: &mut dyn DecodeEntry,
) -> Result<Option<(crate::find::Stream, Option<gix_pack::data::entry::Location>)>, gix_object::find::Error> {
let mut snapshot = self.snapshot.borrow_mut();
let mut inflate = self.inflate.borrow_mut();
self.try_find_stream_inner(id, &mut inflate, pack_cache, &mut snapshot, None)
.map_err(|err| Box::new(err) as _)
}
}
Loading
Loading