Skip to content

task: add resource and its name to not found error #26531

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions influxdb3_catalog/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -914,7 +914,7 @@ impl<I: CatalogId, R: CatalogResource> Repository<I, R> {
pub(crate) fn update(&mut self, id: I, resource: impl Into<Arc<R>>) -> Result<()> {
let resource = resource.into();
if !self.id_exists(&id) {
return Err(CatalogError::NotFound);
return Err(CatalogError::not_found(R::KIND, resource.name()));
}
self.id_name_map.insert(id, resource.name());
self.repo.insert(id, resource);
Expand Down Expand Up @@ -2331,7 +2331,7 @@ impl TokenRepository {
let token_id = self
.repo
.name_to_id(&token_name)
.ok_or_else(|| CatalogError::NotFound)?;
.ok_or_else(|| CatalogError::not_found("token", token_name))?;
self.repo.remove(&token_id);
self.hash_lookup_map.remove_by_left(&token_id);
Ok(())
Expand Down
53 changes: 28 additions & 25 deletions influxdb3_catalog/src/catalog/update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,10 @@ impl Catalog {
self.catalog_update_with_retry(|| {
let time_ns = self.time_provider.now().timestamp_nanos();
let Some(node) = self.node(node_id) else {
return Err(crate::CatalogError::NotFound);
return Err(crate::CatalogError::not_found(
"node",
node_id,
));
};
if !node.is_running() {
return Err(crate::CatalogError::NodeAlreadyStopped {
Expand Down Expand Up @@ -283,7 +286,7 @@ impl Catalog {
info!(name, "soft delete database");
self.catalog_update_with_retry(|| {
let Some(db) = self.db_schema(name) else {
return Err(CatalogError::NotFound);
return Err(CatalogError::not_found("database", name));
};
if db.deleted {
return Err(CatalogError::AlreadyDeleted);
Expand Down Expand Up @@ -331,10 +334,10 @@ impl Catalog {
info!(db_name, table_name, "soft delete database");
self.catalog_update_with_retry(|| {
let Some(db) = self.db_schema(db_name) else {
return Err(CatalogError::NotFound);
return Err(CatalogError::not_found("database", db_name));
};
let Some(tbl_def) = db.table_definition(table_name) else {
return Err(CatalogError::NotFound);
return Err(CatalogError::not_found("table", table_name));
};
let deletion_time = self.time_provider.now().timestamp_nanos();
Ok(CatalogBatch::database(
Expand Down Expand Up @@ -367,10 +370,10 @@ impl Catalog {
info!(db_name, table_name, cache_name = ?cache_name, "create distinct cache");
self.catalog_update_with_retry(|| {
let Some(db) = self.db_schema(db_name) else {
return Err(CatalogError::NotFound);
return Err(CatalogError::not_found("database", db_name));
};
let Some(mut tbl) = db.table_definition(table_name) else {
return Err(CatalogError::NotFound);
return Err(CatalogError::not_found("table", table_name));
};
if columns.is_empty() {
return Err(CatalogError::invalid_configuration(
Expand Down Expand Up @@ -447,13 +450,13 @@ impl Catalog {
info!(db_name, table_name, cache_name, "delete distinct cache");
self.catalog_update_with_retry(|| {
let Some(db) = self.db_schema(db_name) else {
return Err(CatalogError::NotFound);
return Err(CatalogError::not_found("database", db_name));
};
let Some(tbl) = db.table_definition(table_name) else {
return Err(CatalogError::NotFound);
return Err(CatalogError::not_found("table", table_name));
};
let Some(cache) = tbl.distinct_caches.get_by_name(cache_name) else {
return Err(CatalogError::NotFound);
return Err(CatalogError::not_found("cache", cache_name));
};
Ok(CatalogBatch::database(
self.time_provider.now().timestamp_nanos(),
Expand Down Expand Up @@ -486,10 +489,10 @@ impl Catalog {
info!(db_name, table_name, cache_name = ?cache_name, "create last cache");
self.catalog_update_with_retry(|| {
let Some(db) = self.db_schema(db_name) else {
return Err(CatalogError::NotFound);
return Err(CatalogError::not_found("database", db_name));
};
let Some(mut tbl) = db.table_definition(table_name) else {
return Err(CatalogError::NotFound);
return Err(CatalogError::not_found("table", table_name));
};

fn is_valid_last_cache_key_col(def: &ColumnDefinition) -> bool {
Expand Down Expand Up @@ -600,13 +603,13 @@ impl Catalog {
info!(db_name, table_name, cache_name, "delete last cache");
self.catalog_update_with_retry(|| {
let Some(db) = self.db_schema(db_name) else {
return Err(CatalogError::NotFound);
return Err(CatalogError::not_found("database", db_name));
};
let Some(tbl) = db.table_definition(table_name) else {
return Err(CatalogError::NotFound);
return Err(CatalogError::not_found("table", table_name));
};
let Some(cache) = tbl.last_caches.get_by_name(cache_name) else {
return Err(CatalogError::NotFound);
return Err(CatalogError::not_found("cache", cache_name));
};
Ok(CatalogBatch::database(
self.time_provider.now().timestamp_nanos(),
Expand Down Expand Up @@ -639,7 +642,7 @@ impl Catalog {
info!(db_name, trigger_name, "create processing engine trigger");
self.catalog_update_with_retry(|| {
let Some(mut db) = self.db_schema(db_name) else {
return Err(CatalogError::NotFound);
return Err(CatalogError::not_found("database", db_name));
};
let trigger = TriggerSpecificationDefinition::from_string_rep(trigger_specification)?;
if db.processing_engine_triggers.contains_name(trigger_name) {
Expand Down Expand Up @@ -677,10 +680,10 @@ impl Catalog {
info!(db_name, trigger_name, "delete processing engine trigger");
self.catalog_update_with_retry(|| {
let Some(db) = self.db_schema(db_name) else {
return Err(CatalogError::NotFound);
return Err(CatalogError::not_found("database", db_name));
};
let Some(trigger) = db.processing_engine_triggers.get_by_name(trigger_name) else {
return Err(CatalogError::NotFound);
return Err(CatalogError::not_found("trigger", trigger_name));
};
if !trigger.disabled && !force {
return Err(CatalogError::ProcessingEngineTriggerRunning {
Expand Down Expand Up @@ -709,10 +712,10 @@ impl Catalog {
info!(db_name, trigger_name, "enable processing engine trigger");
self.catalog_update_with_retry(|| {
let Some(db) = self.db_schema(db_name) else {
return Err(CatalogError::NotFound);
return Err(CatalogError::not_found("database", db_name));
};
let Some(trigger) = db.processing_engine_triggers.get_by_name(trigger_name) else {
return Err(CatalogError::NotFound);
return Err(CatalogError::not_found("trigger", trigger_name));
};
if !trigger.disabled {
return Err(CatalogError::TriggerAlreadyEnabled);
Expand Down Expand Up @@ -740,10 +743,10 @@ impl Catalog {
info!(db_name, trigger_name, "disable processing engine trigger");
self.catalog_update_with_retry(|| {
let Some(db) = self.db_schema(db_name) else {
return Err(CatalogError::NotFound);
return Err(CatalogError::not_found("database", db_name));
};
let Some(trigger) = db.processing_engine_triggers.get_by_name(trigger_name) else {
return Err(CatalogError::NotFound);
return Err(CatalogError::not_found("trigger", trigger_name));
};
if trigger.disabled {
return Err(CatalogError::TriggerAlreadyDisabled);
Expand Down Expand Up @@ -773,7 +776,7 @@ impl Catalog {
self.catalog_update_with_retry(|| {
if !self.inner.read().tokens.repo().contains_name(token_name) {
// maybe deleted by another node or genuinely not present
return Err(CatalogError::NotFound);
return Err(CatalogError::not_found("token", token_name));
}

Ok(CatalogBatch::Token(TokenBatch {
Expand All @@ -797,7 +800,7 @@ impl Catalog {
"create new retention policy"
);
let Some(db) = self.db_schema(db_name) else {
return Err(CatalogError::NotFound);
return Err(CatalogError::not_found("database", db_name));
};
self.catalog_update_with_retry(|| {
Ok(CatalogBatch::database(
Expand All @@ -822,7 +825,7 @@ impl Catalog {
) -> Result<OrderedCatalogBatch> {
info!(db_name, "delete retention policy");
let Some(db) = self.db_schema(db_name) else {
return Err(CatalogError::NotFound);
return Err(CatalogError::not_found("database", db_name));
};
self.catalog_update_with_retry(|| {
Ok(CatalogBatch::database(
Expand Down Expand Up @@ -1080,7 +1083,7 @@ impl DatabaseCatalogTransaction {
column_type: FieldDataType,
) -> Result<ColumnId> {
let Some(table_def) = self.database_schema.table_definition(table_name) else {
return Err(CatalogError::NotFound);
return Err(CatalogError::not_found("table", table_name));
};
match table_def.column_definition(column_name) {
Some(def) if def.data_type == column_type.into() => Ok(def.id),
Expand Down
11 changes: 9 additions & 2 deletions influxdb3_catalog/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ pub enum CatalogError {
#[error("attempted to create a resource that already exists")]
AlreadyExists,

#[error("the requested resource was not found")]
NotFound,
#[error("the requested {resource} name {resource_name} was not found")]
NotFound {resource: &'static str, resource_name: String},

#[error("attempted to delete resource that was already deleted")]
AlreadyDeleted,
Expand Down Expand Up @@ -201,4 +201,11 @@ impl CatalogError {
pub fn unexpected(message: impl Into<String>) -> Self {
Self::Other(anyhow!(message.into()))
}

pub fn not_found(resource: &'static str, resource_name: impl Into<String>) -> Self {
Self::NotFound {
resource,
resource_name: resource_name.into(),
}
}
}
2 changes: 2 additions & 0 deletions influxdb3_catalog/src/resource.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ use crate::{
pub trait CatalogResource: Clone {
type Identifier;

const KIND: &'static str;

fn id(&self) -> Self::Identifier;
fn name(&self) -> Arc<str>;
}
Expand Down
2 changes: 1 addition & 1 deletion influxdb3_processing_engine/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -955,7 +955,7 @@ mod tests {
)
.await?;

let Err(CatalogError::NotFound) = pem
let Err(CatalogError::not_found("trigger", "nonexistent_trigger")) = pem
.catalog
.enable_processing_engine_trigger("foo", "nonexistent_trigger")
.await
Expand Down