Skip to content

Commit 4c8283d

Browse files
authored
feat: Hard delete database and table (#26553)
* feat: Hard delete database and table * feat: Enable hard-deletion in OSS
1 parent c205fef commit 4c8283d

36 files changed

+4443
-621
lines changed

influxdb3/src/commands/delete.rs

Lines changed: 56 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use super::common::InfluxDb3Config;
22
use influxdb3_client::Client;
3+
use influxdb3_types::http::HardDeletionTime;
34
use secrecy::ExposeSecret;
45
use secrecy::Secret;
56
use std::error::Error;
@@ -115,6 +116,11 @@ pub struct DatabaseConfig {
115116
#[clap(env = "INFLUXDB3_DATABASE_NAME", required = true)]
116117
pub database_name: String,
117118

119+
/// When to perform hard deletion (never/now/default/timestamp).
120+
/// Examples: 'never', 'now', 'default', '2024-06-18T10:30:00Z'
121+
#[clap(long = "hard-delete", value_name = "WHEN")]
122+
pub hard_delete: Option<String>,
123+
118124
/// An optional arg to use a custom CA, useful for testing with self-signed certs
119125
#[clap(long = "tls-ca", env = "INFLUXDB3_TLS_CA")]
120126
ca_cert: Option<PathBuf>,
@@ -164,6 +170,11 @@ pub struct TableConfig {
164170
/// The name of the table to be deleted
165171
table_name: String,
166172

173+
/// When to perform hard deletion (never/now/default/timestamp).
174+
/// Examples: 'never', 'now', 'default', '2024-06-18T10:30:00Z'
175+
#[clap(long = "hard-delete", value_name = "WHEN")]
176+
hard_delete: Option<String>,
177+
167178
/// An optional arg to use a custom CA, useful for testing with self-signed certs
168179
#[clap(long = "tls-ca", env = "INFLUXDB3_TLS_CA")]
169180
ca_cert: Option<PathBuf>,
@@ -211,10 +222,26 @@ pub struct TokenConfig {
211222
ca_cert: Option<PathBuf>,
212223
}
213224

225+
fn parse_hard_delete_time(value: Option<String>) -> Option<HardDeletionTime> {
226+
match value {
227+
None => None,
228+
Some(s) => match s.to_lowercase().as_str() {
229+
"never" => Some(HardDeletionTime::Never),
230+
"now" => Some(HardDeletionTime::Now),
231+
"default" => Some(HardDeletionTime::Default),
232+
_ => Some(HardDeletionTime::Timestamp(s)),
233+
},
234+
}
235+
}
236+
214237
pub async fn command(config: Config) -> Result<(), Box<dyn Error>> {
215238
let client = config.get_client()?;
216239
match config.cmd {
217-
SubCommand::Database(DatabaseConfig { database_name, .. }) => {
240+
SubCommand::Database(DatabaseConfig {
241+
database_name,
242+
hard_delete,
243+
..
244+
}) => {
218245
println!(
219246
"Are you sure you want to delete {:?}? Enter 'yes' to confirm",
220247
database_name
@@ -224,7 +251,18 @@ pub async fn command(config: Config) -> Result<(), Box<dyn Error>> {
224251
if confirmation.trim() != "yes" {
225252
println!("Cannot delete database without confirmation");
226253
} else {
227-
client.api_v3_configure_db_delete(&database_name).await?;
254+
let hard_delete_time = parse_hard_delete_time(hard_delete);
255+
256+
if hard_delete_time.is_some() {
257+
client
258+
.api_v3_configure_db_delete_with_hard_delete(
259+
&database_name,
260+
hard_delete_time,
261+
)
262+
.await?;
263+
} else {
264+
client.api_v3_configure_db_delete(&database_name).await?;
265+
}
228266

229267
println!("Database {:?} deleted successfully", &database_name);
230268
}
@@ -256,6 +294,7 @@ pub async fn command(config: Config) -> Result<(), Box<dyn Error>> {
256294
SubCommand::Table(TableConfig {
257295
influxdb3_config: InfluxDb3Config { database_name, .. },
258296
table_name,
297+
hard_delete,
259298
..
260299
}) => {
261300
println!(
@@ -267,9 +306,21 @@ pub async fn command(config: Config) -> Result<(), Box<dyn Error>> {
267306
if confirmation.trim() != "yes" {
268307
println!("Cannot delete table without confirmation");
269308
} else {
270-
client
271-
.api_v3_configure_table_delete(&database_name, &table_name)
272-
.await?;
309+
let hard_delete_time = parse_hard_delete_time(hard_delete);
310+
311+
if hard_delete_time.is_some() {
312+
client
313+
.api_v3_configure_table_delete_with_hard_delete(
314+
&database_name,
315+
&table_name,
316+
hard_delete_time,
317+
)
318+
.await?;
319+
} else {
320+
client
321+
.api_v3_configure_table_delete(&database_name, &table_name)
322+
.await?;
323+
}
273324

274325
println!(
275326
"Table {:?}.{:?} deleted successfully",

influxdb3/src/commands/serve.rs

Lines changed: 34 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -41,15 +41,15 @@ use influxdb3_telemetry::{
4141
};
4242
use influxdb3_wal::{Gen1Duration, WalConfig};
4343
use influxdb3_write::{
44-
WriteBuffer,
44+
WriteBuffer, deleter,
4545
persister::Persister,
4646
write_buffer::{
4747
WriteBufferImpl, WriteBufferImplArgs, check_mem_and_force_snapshot_loop,
4848
persisted_files::PersistedFiles,
4949
},
5050
};
5151
use iox_query::exec::{DedicatedExecutor, Executor, ExecutorConfig, PerQueryMemoryPoolConfig};
52-
use iox_time::SystemProvider;
52+
use iox_time::{SystemProvider, TimeProvider};
5353
use metric::U64Gauge;
5454
use object_store::ObjectStore;
5555
use object_store_metrics::ObjectStoreMetrics;
@@ -455,6 +455,16 @@ pub struct Config {
455455
default_value_t = Catalog::DEFAULT_HARD_DELETE_DURATION.into(),
456456
)]
457457
pub hard_delete_default_duration: humantime::Duration,
458+
459+
/// Grace period for hard deleted databases and tables before they are removed permanently from
460+
/// the catalog.
461+
#[clap(
462+
long = "delete-grace-period",
463+
env = "INFLUXDB3_DELETE_GRACE_PERIOD",
464+
default_value = "24h",
465+
action
466+
)]
467+
pub delete_grace_period: humantime::Duration,
458468
}
459469

460470
/// The minimum version of TLS to use for InfluxDB
@@ -506,6 +516,9 @@ impl FromStr for MemorySizeMb {
506516
let num_bytes = if s.contains("%") {
507517
let mem_size = MemorySize::from_str(s)?;
508518
mem_size.bytes()
519+
} else if let Some(suffix) = s.strip_suffix('b') {
520+
usize::from_str(suffix)
521+
.map_err(|_| "failed to parse value as unsigned integer".to_string())?
509522
} else {
510523
let num_mb = usize::from_str(s)
511524
.map_err(|_| "failed to parse value as unsigned integer".to_string())?;
@@ -606,7 +619,7 @@ pub async fn command(config: Config) -> Result<()> {
606619
let frontend_shutdown = CancellationToken::new();
607620
let shutdown_manager = ShutdownManager::new(frontend_shutdown.clone());
608621

609-
let time_provider = Arc::new(SystemProvider::new());
622+
let time_provider: Arc<dyn TimeProvider> = Arc::new(SystemProvider::new());
610623
let sys_events_store = Arc::new(SysEventStore::new(Arc::clone(&time_provider) as _));
611624
// setup base object store:
612625
let object_store: Arc<dyn ObjectStore> = config
@@ -724,7 +737,7 @@ pub async fn command(config: Config) -> Result<()> {
724737
let catalog = Catalog::new_with_shutdown(
725738
config.node_identifier_prefix.as_str(),
726739
Arc::clone(&object_store),
727-
Arc::<SystemProvider>::clone(&time_provider),
740+
Arc::clone(&time_provider),
728741
Arc::clone(&metrics),
729742
shutdown_manager.register(),
730743
Arc::clone(&process_uuid_getter),
@@ -798,7 +811,7 @@ pub async fn command(config: Config) -> Result<()> {
798811
catalog: Arc::clone(&catalog),
799812
last_cache,
800813
distinct_cache,
801-
time_provider: Arc::<SystemProvider>::clone(&time_provider),
814+
time_provider: Arc::clone(&time_provider),
802815
executor: Arc::clone(&write_path_executor),
803816
wal_config,
804817
parquet_cache,
@@ -812,6 +825,20 @@ pub async fn command(config: Config) -> Result<()> {
812825
.await
813826
.map_err(|e| Error::WriteBufferInit(e.into()))?;
814827

828+
let persisted_files = write_buffer_impl.persisted_files();
829+
830+
let object_deleter = Some(Arc::clone(&persisted_files) as _);
831+
832+
deleter::run(
833+
DeleteManagerArgs {
834+
catalog: Arc::clone(&catalog),
835+
time_provider: Arc::clone(&time_provider),
836+
object_deleter,
837+
delete_grace_period: *config.delete_grace_period,
838+
},
839+
shutdown_manager.register(),
840+
);
841+
815842
info!("setting up background mem check for query buffer");
816843
background_buffer_checker(
817844
config.force_snapshot_mem_threshold.as_num_bytes(),
@@ -824,7 +851,7 @@ pub async fn command(config: Config) -> Result<()> {
824851
object_store_config: &config.object_store_config,
825852
instance_id: node_def.instance_id(),
826853
num_cpus,
827-
persisted_files: Some(Arc::clone(&write_buffer_impl.persisted_files())),
854+
persisted_files: Some(persisted_files),
828855
telemetry_endpoint: &config.telemetry_endpoint,
829856
disable_upload: config.disable_telemetry_upload,
830857
catalog_uuid: catalog.catalog_uuid().to_string(),
@@ -1121,6 +1148,7 @@ async fn background_buffer_checker(
11211148
#[global_allocator]
11221149
static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;
11231150

1151+
use influxdb3_write::deleter::DeleteManagerArgs;
11241152
#[cfg(tokio_unstable)]
11251153
use tokio_metrics_bridge::setup_tokio_metrics;
11261154

influxdb3/src/help/serve_all.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,9 @@ Examples:
4646
The amount of time that the server looks back on startup
4747
when populating the in-memory index of gen1 files.
4848
[env: INFLUXDB3_GEN1_LOOKBACK_DURATION=]
49+
--delete-grace-period <DURATION> Grace period for hard deleted databases and tables before they are
50+
removed permanently from the catalog [default: 24h]
51+
[env: INFLUXDB3_DELETE_GRACE_PERIOD=]
4952

5053
{}
5154
--aws-access-key-id <KEY> S3 access key ID [env: AWS_ACCESS_KEY_ID=] [default: ]

influxdb3/tests/server/configure.rs

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1311,6 +1311,120 @@ async fn api_v3_configure_table_delete_missing_query_param() {
13111311
assert_eq!(StatusCode::BAD_REQUEST, resp.status());
13121312
}
13131313

1314+
#[tokio::test]
1315+
async fn api_v3_configure_table_delete_with_hard_delete_at_never() {
1316+
let db_name = "foo";
1317+
let tbl_name = "tbl";
1318+
let server = TestServer::spawn().await;
1319+
let client = server.http_client();
1320+
let url = format!(
1321+
"{base}/api/v3/configure/table?db={db_name}&table={tbl_name}&hard_delete_at=never",
1322+
base = server.client_addr()
1323+
);
1324+
1325+
server
1326+
.write_lp_to_db(
1327+
db_name,
1328+
format!("{tbl_name},t1=a,t2=b,t3=c f1=true,f2=\"hello\",f3=4i,f4=4u,f5=5 1000"),
1329+
influxdb3_client::Precision::Second,
1330+
)
1331+
.await
1332+
.expect("write to db");
1333+
1334+
let resp = client
1335+
.delete(&url)
1336+
.send()
1337+
.await
1338+
.expect("delete table call succeed");
1339+
assert_eq!(200, resp.status());
1340+
}
1341+
1342+
#[tokio::test]
1343+
async fn api_v3_configure_table_delete_with_hard_delete_at_timestamp() {
1344+
let db_name = "foo";
1345+
let tbl_name = "tbl";
1346+
let server = TestServer::spawn().await;
1347+
let client = server.http_client();
1348+
let url = format!(
1349+
"{base}/api/v3/configure/table?db={db_name}&table={tbl_name}&hard_delete_at=2025-12-31T23:59:59Z",
1350+
base = server.client_addr()
1351+
);
1352+
1353+
server
1354+
.write_lp_to_db(
1355+
db_name,
1356+
format!("{tbl_name},t1=a,t2=b,t3=c f1=true,f2=\"hello\",f3=4i,f4=4u,f5=5 1000"),
1357+
influxdb3_client::Precision::Second,
1358+
)
1359+
.await
1360+
.expect("write to db");
1361+
1362+
let resp = client
1363+
.delete(&url)
1364+
.send()
1365+
.await
1366+
.expect("delete table call succeed");
1367+
1368+
assert_eq!(200, resp.status());
1369+
}
1370+
1371+
#[tokio::test]
1372+
async fn api_v3_configure_table_delete_with_invalid_timestamp() {
1373+
let db_name = "foo";
1374+
let tbl_name = "tbl";
1375+
let server = TestServer::spawn().await;
1376+
let client = server.http_client();
1377+
let url = format!(
1378+
"{base}/api/v3/configure/table?db={db_name}&table={tbl_name}&hard_delete_at=invalid-timestamp",
1379+
base = server.client_addr()
1380+
);
1381+
1382+
server
1383+
.write_lp_to_db(
1384+
db_name,
1385+
format!("{tbl_name},t1=a,t2=b,t3=c f1=true,f2=\"hello\",f3=4i,f4=4u,f5=5 1000"),
1386+
influxdb3_client::Precision::Second,
1387+
)
1388+
.await
1389+
.expect("write to db");
1390+
1391+
let resp = client
1392+
.delete(&url)
1393+
.send()
1394+
.await
1395+
.expect("delete table call succeed");
1396+
assert_eq!(StatusCode::BAD_REQUEST, resp.status());
1397+
}
1398+
1399+
#[tokio::test]
1400+
async fn api_v3_configure_table_delete_with_past_timestamp_becomes_now() {
1401+
let db_name = "foo";
1402+
let tbl_name = "tbl";
1403+
let server = TestServer::spawn().await;
1404+
let client = server.http_client();
1405+
let url = format!(
1406+
"{base}/api/v3/configure/table?db={db_name}&table={tbl_name}&hard_delete_at=2020-01-01T00:00:00Z",
1407+
base = server.client_addr()
1408+
);
1409+
1410+
server
1411+
.write_lp_to_db(
1412+
db_name,
1413+
format!("{tbl_name},t1=a,t2=b,t3=c f1=true,f2=\"hello\",f3=4i,f4=4u,f5=5 1000"),
1414+
influxdb3_client::Precision::Second,
1415+
)
1416+
.await
1417+
.expect("write to db");
1418+
1419+
let resp = client
1420+
.delete(&url)
1421+
.send()
1422+
.await
1423+
.expect("delete table call succeed");
1424+
1425+
assert_eq!(200, resp.status());
1426+
}
1427+
13141428
#[tokio::test]
13151429
async fn try_deleting_table_after_db_is_deleted() {
13161430
let db_name = "db";

influxdb3_cache/src/distinct_cache/provider.rs

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,11 @@
11
use std::{collections::HashMap, sync::Arc, time::Duration};
22

3+
use super::{
4+
CacheError,
5+
cache::{CreateDistinctCacheArgs, DistinctCache},
6+
};
37
use arrow::datatypes::SchemaRef;
8+
use influxdb3_catalog::catalog::IfNotDeleted;
49
use influxdb3_catalog::{
510
catalog::Catalog,
611
channel::CatalogUpdateReceiver,
@@ -14,11 +19,6 @@ use influxdb3_wal::{WalContents, WalOp};
1419
use iox_time::TimeProvider;
1520
use parking_lot::RwLock;
1621

17-
use super::{
18-
CacheError,
19-
cache::{CreateDistinctCacheArgs, DistinctCache},
20-
};
21-
2222
#[derive(Debug, thiserror::Error)]
2323
pub enum ProviderError {
2424
#[error("cache error: {0}")]
@@ -54,8 +54,12 @@ impl DistinctCacheProvider {
5454
catalog: Arc::clone(&catalog),
5555
cache_map: Default::default(),
5656
});
57-
for db_schema in catalog.list_db_schema() {
58-
for table_def in db_schema.tables() {
57+
for db_schema in catalog
58+
.list_db_schema()
59+
.into_iter()
60+
.filter_map(IfNotDeleted::if_not_deleted)
61+
{
62+
for table_def in db_schema.tables().filter_map(IfNotDeleted::if_not_deleted) {
5963
for (cache_id, cache_def) in table_def.distinct_caches.iter() {
6064
provider.create_cache(
6165
db_schema.id,

0 commit comments

Comments
 (0)