Skip to content

Commit

Permalink
Added better logging for cluster merging (#40)
Browse files Browse the repository at this point in the history
Signed-off-by: David Pollak <[email protected]>
  • Loading branch information
dpp authored Feb 14, 2025
1 parent 3c30fde commit 9b7e404
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 52 deletions.
12 changes: 11 additions & 1 deletion src/live_merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,16 @@ pub async fn perform_merge(clusters: Vec<GoatRodeoCluster>) -> Result<GoatRodeoC
return Ok(clusters[0].clone());
}

let mut whole_name = "Live Merged: ".to_string();
let mut first_name = true;
for c in &clusters {
if !first_name {
whole_name.push_str(", ");
}
first_name = false;
whole_name.push_str(&c.name());
}

let mut index_holder = vec![];

let mut max_size = 0usize;
Expand Down Expand Up @@ -113,7 +123,7 @@ pub async fn perform_merge(clusters: Vec<GoatRodeoCluster>) -> Result<GoatRodeoC

let b0 = &clusters[0];
info!("Merge took {:?}", Instant::now().duration_since(start));
b0.create_synthetic_with(new_index, data_files, index_files, submap)
b0.create_synthetic_with(new_index, data_files, index_files, submap, whole_name)
}

#[cfg(feature = "longtest")]
Expand Down
59 changes: 8 additions & 51 deletions src/merger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,66 +48,23 @@ pub async fn merge_fresh<PB: Into<PathBuf>>(

let mut index_holder = vec![];

for cluster in &clusters {
for (idx, cluster) in clusters.iter().enumerate() {
let index = cluster.get_index().await?;
let index_len = index.len();

// let (tx, rx) = flume::bounded(32);

/* if use_threads {
let index_shadow = index.clone();
let cluster_shadow = cluster.clone();
std::thread::spawn(async move || {
let mut loop_cnt = 0usize;
let start = Instant::now();
let index_len = index_shadow.len();
for io in index_shadow.iter() {
let item = cluster_shadow
.data_for_entry_offset(&io.loc)
.await
.expect("Expected to load data");
match tx.send((item, loop_cnt)) {
Ok(_) => {}
Err(e) => {
error!(
"On {:016x} failed to send {} of {} error {}",
cluster_shadow.cluster_file_hash,
loop_cnt.separate_with_commas(),
index_len.separate_with_commas(),
e
);
panic!(
"On {:016x} failed to send {} of {} error {}",
cluster_shadow.cluster_file_hash,
loop_cnt.separate_with_commas(),
index_len.separate_with_commas(),
e
);
}
};
loop_cnt += 1;
if false && loop_cnt % 2_500_000 == 0 {
info!(
"Cluster fetcher {:016x} loop {} of {} at {:?}",
cluster_shadow.cluster_file_hash,
loop_cnt.separate_with_commas(),
index_len.separate_with_commas(),
start.elapsed()
);
}
}
});
}*/

index_holder.push(ClusterPos {
cluster: index.clone(),
pos: 0,
len: index.len(),
// thing: if use_threads { Some(rx) } else { None },
});

info!(
"Loaded cluster {} of {}, from {}",
idx + 1,
clusters.len(),
cluster.name()
);

max_merge_len += index_len;
}

Expand Down
10 changes: 10 additions & 0 deletions src/rodeo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ pub struct GoatRodeoCluster {
synthetic: bool,
index: Arc<ArcSwap<Option<Arc<Vec<ItemOffset>>>>>,
building_index: Arc<Mutex<bool>>,
name: String,
}

#[allow(non_upper_case_globals)]
Expand Down Expand Up @@ -129,12 +130,18 @@ impl GoatRodeoCluster {
&self.index_files
}

/// Returns the cluster's name as a freshly allocated `String`.
///
/// The value is an independent copy of the `name` field, so the caller
/// owns it and may keep or modify it without affecting the cluster.
pub fn name(&self) -> String {
self.name.as_str().to_owned()
}

pub fn create_synthetic_with(
&self,
index: Vec<ItemOffset>,
data_files: HashMap<u64, Arc<DataFile>>,
index_files: HashMap<u64, IndexFile>,
sub_clusters: HashMap<u64, GoatRodeoCluster>,
name: String,
) -> Result<GoatRodeoCluster> {
if sub_clusters.len() == 0 {
bail!("A synthetic cluster must have at least one underlying real cluster")
Expand Down Expand Up @@ -166,6 +173,7 @@ impl GoatRodeoCluster {
cluster_file_hash: my_hash,
sub_clusters,
synthetic: true,
name: name,
})
}

Expand All @@ -189,6 +197,7 @@ impl GoatRodeoCluster {
cluster_file_hash: 0,
sub_clusters: HashMap::new(),
synthetic: false,
name: format!("{:?}", root_dir),
}
}

Expand Down Expand Up @@ -312,6 +321,7 @@ impl GoatRodeoCluster {
cluster_file_hash: sha_u64,
sub_clusters: HashMap::new(),
synthetic: false,
name: format!("{:?}", cluster_path),
})
}

Expand Down

0 comments on commit 9b7e404

Please sign in to comment.