Skip to content

Commit

Permalink
Merge pull request #25075 from WillemKauf/datalake_compaction_timeout…
Browse files Browse the repository at this point in the history
…_fix

[CORE-8848] `rptest`: adjust compaction settings in `datalake/compaction_test`
  • Loading branch information
WillemKauf authored Feb 12, 2025
2 parents 01a9c7b + 1849372 commit c8306cf
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 6 deletions.
11 changes: 8 additions & 3 deletions tests/rptest/remote_scripts/compute_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ def __iter__(self) -> Iterator[Header]:

def safe_isdir(p: Path) -> bool:
"""
It's valid for files to be deleted at any time,
It's valid for files to be deleted at any time,
in that case that the file is missing, just return
that it's not a directory
"""
Expand All @@ -81,10 +81,13 @@ def safe_isdir(p: Path) -> bool:

def safe_listdir(p: Path) -> list[Path]:
"""
It's valid for directories to be deleted at any time,
It's valid for directories to be deleted at any time,
in that case that the directory is missing, just return
that there are no files.
"""
if not safe_isdir(p):
return []

try:
return [f for f in p.iterdir()]
except FileNotFoundError:
Expand Down Expand Up @@ -143,7 +146,9 @@ def compute_size(data_dir: Path, sizes: bool, calculate_md5: bool,
for ns in safe_listdir(data_dir):
if not safe_isdir(ns):
continue
if ns.name in ["cloud_storage_cache", "crash_reports"]:
if ns.name in [
"cloud_storage_cache", "crash_reports", "datalake_staging"
]:
continue
ns_output = {}
for topic in safe_listdir(ns):
Expand Down
6 changes: 3 additions & 3 deletions tests/rptest/tests/datalake/compaction_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ def __init__(self, test_ctx, *args, **kwargs):
"iceberg_enabled": "true",
"iceberg_catalog_commit_interval_ms": 5000,
"datalake_coordinator_snapshot_max_delay_secs": 10,
"log_compaction_interval_ms": 2000
"log_compaction_interval_ms": 5000
},
*args,
**kwargs)
Expand All @@ -61,12 +61,12 @@ def partition_segments(self) -> int:
def wait_until_segment_count(self, count):
wait_until(
lambda: self.partition_segments() == count,
timeout_sec=30,
timeout_sec=120,
backoff_sec=3,
err_msg=f"Timed out waiting for segment count to reach {count}")

def produce_until_segment_count(self, count):
timeout_sec = 30
timeout_sec = 60
deadline = time() + timeout_sec
while True:
current_segment_count = self.partition_segments()
Expand Down

0 comments on commit c8306cf

Please sign in to comment.