Fix iceberg data migration test timeout #25069

Draft · wants to merge 4 commits into base: dev
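
In short: the patch bounds DatalakeVerifier.go_offline() with a timeout and makes the consumer thread keep reading until every partition is drained to its high watermark before stopping, serializes consumer access behind a new _consumer_lock so position queries cannot race the consumer being closed, and adds an interval_ms knob to counter_stream_config() so the tests can pace the record generator.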
87 changes: 53 additions & 34 deletions tests/rptest/tests/datalake/datalake_verifier.py
@@ -92,6 +92,7 @@ def __init__(self,
self._offline_mode_established = False
self._consumer_positions = None # set iff in offline mode
self._partition_hwms = None # set iff in offline mode
self._consumer_lock = threading.Lock()

self._compacted = compacted
# When consuming from a compacted topic, there may be records in the
@@ -116,25 +117,27 @@ def create_consumer(self):
return c

def update_and_get_fetch_positions(self):
if self._offline_mode_established:
assert self._consumer_positions is not None
return self._consumer_positions

with self._lock:
partitions = [
TopicPartition(topic=self.topic, partition=p)
for p in self._consumed_messages.keys()
]
positions = self._consumer.position(partitions)
for p in positions:
if p.error is not None:
self.logger.warning(
f"Error querying position for partition {p.partition}")
else:
self.logger.debug(
f"next position for {p.partition} is {p.offset}")
self._next_positions[p.partition] = p.offset
return self._next_positions.copy()
with self._consumer_lock:
if self._offline_mode_established:
assert self._consumer_positions is not None
return self._consumer_positions

with self._lock:
partitions = [
TopicPartition(topic=self.topic, partition=p)
for p in self._consumed_messages.keys()
]
positions = self._consumer.position(partitions)
for p in positions:
if p.error is not None:
self.logger.warning(
f"Error querying position for partition {p.partition}"
)
else:
self.logger.debug(
f"next position for {p.partition} is {p.offset}")
self._next_positions[p.partition] = p.offset
return self._next_positions.copy()
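
Why the offline check moved inside a lock: go_offline() (below) closes the consumer under the same _consumer_lock, so re-checking _offline_mode_established under that lock guarantees no caller can query a consumer that has just been closed. A minimal sketch of the pattern, not the verifier itself; the stub consumer is an assumption standing in for the real Kafka client:

import threading

class _StubConsumer:
    # Stand-in for the real Kafka consumer used by the verifier.
    def position(self):
        return {0: 0}

    def close(self):
        pass

class LockedConsumerAccess:
    # Readers and the closer share one lock, so a reader either sees the
    # live consumer or the cached snapshot, never a just-closed handle.
    def __init__(self):
        self._consumer_lock = threading.Lock()
        self._offline_mode_established = False
        self._cached_positions = None
        self._consumer = _StubConsumer()

    def positions(self):
        with self._consumer_lock:
            if self._offline_mode_established:
                return self._cached_positions
            return self._consumer.position()

    def go_offline(self):
        snapshot = self._consumer.position()
        with self._consumer_lock:
            self._cached_positions = snapshot
            self._consumer.close()
            self._consumer = None
            self._offline_mode_established = True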

def partition_hwms(self) -> List[RpkPartition]:
if self._offline_mode_established:
@@ -143,24 +146,42 @@ def partition_hwms(self) -> List[RpkPartition]:
return list(self._rpk.describe_topic(self.topic))

# to be called no more than once
def go_offline(self):
def go_offline(self, timeout=60):
assert not self._offline_mode_requested.is_set()
self.logger.debug(f"offline mode requested")
self._offline_mode_requested.set()
self._consumer_stopped.wait()
# todo: send consumer thread stop, remember positions when it stopped
assert self._consumer_stopped.wait(timeout)
self.logger.debug(f"consistent state reached")
self._consumer_positions = self.update_and_get_fetch_positions()
self.logger.debug(f"remembered {self._consumer_positions=}")
self._partition_hwms = self.partition_hwms()
self.logger.debug(f"remembered {self._partition_hwms=}")
self._consumer.close()
self._consumer = None
self._offline_mode_established = True
for p in self._partition_hwms:
self.logger.debug(
f"remembered partition {p.id=} hwm={p.high_watermark}, ")
with self._consumer_lock:
self._consumer.close()
self._consumer = None
self.logger.debug(f"offline mode established")
self._offline_mode_established = True
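
The behavioral change that fixes the timeout: the wait is now bounded and asserted, so a consumer thread that never catches up to the high watermark fails the test after timeout seconds instead of hanging the run. A minimal sketch of the handshake, with hypothetical consume_one() and consumed_till_hwm() standing in for the real poll and watermark check:

import threading

offline_requested = threading.Event()
consumer_stopped = threading.Event()

def consumed_till_hwm():
    return True  # stand-in for the real high-watermark comparison

def consume_one():
    pass  # stand-in for polling one message

def consumer_thread():
    # Mirror of the new loop condition: keep consuming until offline mode
    # is requested AND everything up to the high watermark has been read.
    while not (offline_requested.is_set() and consumed_till_hwm()):
        consume_one()
    consumer_stopped.set()

def go_offline(timeout=60):
    offline_requested.set()
    # wait() returns False on timeout, so a stuck consumer now raises
    # an AssertionError instead of blocking forever.
    assert consumer_stopped.wait(timeout)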

def _consumed_till_hwm(self, update: bool):
self.logger.debug("checking _consumed_till_hwm")
if update:
self.update_and_get_fetch_positions()
for p in self.partition_hwms():
if self._next_positions[p.id] < p.high_watermark:
self.logger.debug(
f"partition {p.id} high watermark: {p.high_watermark} max offset: {self._next_positions[p.id]} has not been consumed fully"
)
return False
return True
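
For intuition on the watermark check: a partition is fully consumed exactly when the next offset the consumer would fetch has reached the partition's high watermark. A toy example with assumed values:

next_positions = {0: 100, 1: 95}    # next offset the consumer would fetch
high_watermarks = {0: 100, 1: 100}  # end of the committed log per partition

# Partition 1 still lags by 5 records, so the check reports False.
fully_consumed = all(next_positions[p] >= high_watermarks[p]
                     for p in high_watermarks)
assert not fully_consumed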

def _consumer_thread(self):
try:
self.logger.info("Starting consumer thread")
while not self._stop.is_set() \
and not self._offline_mode_requested.is_set():
while not self._stop.is_set() and not (
self._offline_mode_requested.is_set()
and self._consumed_till_hwm(update=True)):
self._msg_semaphore.acquire()
if self._stop.is_set():
break
@@ -295,18 +316,14 @@ def start(self, wait_first_iceberg_msg=False):
def _all_offsets_translated(self):
partition_hwms = self.partition_hwms()
with self._lock:
if not self._consumed_till_hwm(update=False):
return False
for p in partition_hwms:
if p.id not in self._max_queried_offsets:
self.logger.debug(
f"partition {p.id} not found in max offsets: {self._max_queried_offsets}"
)
return False

if self._next_positions[p.id] < p.high_watermark:
self.logger.debug(
f"partition {p.id} high watermark: {p.high_watermark} max offset: {self._next_positions[p.id]} has not been consumed fully"
)
return False
# Ensure all the consumed messages are drained.
return all(
len(messages) == 0
@@ -343,10 +360,12 @@ def wait(self, progress_timeout_sec=30):
self.logger.debug(f"No errors around waiting")
except Exception as e:
self.logger.error(f"Error around waiting: {e}")
raise
finally:
self.stop()

def stop(self):
self.logger.debug("stopping")
try:
self._stop.set()
self._msg_semaphore.release()
15 changes: 9 additions & 6 deletions tests/rptest/tests/datalake/mount_unmount_test.py
@@ -73,14 +73,14 @@ def __init__(self, test_context):
redpanda=self.redpanda,
include_query_engines=[QueryEngineType.SPARK])

def avro_stream_config(self, topic, subject, cnt=3000):
def avro_stream_config(self, topic, subject, cnt=3000, interval_ms=None):
mapping = dict(
ordinal="this",
timestamp="timestamp_unix_milli()",
verifier_string="uuid_v4()",
)
return counter_stream_config(self.redpanda, topic, subject, mapping,
cnt)
cnt, interval_ms)

def setUp(self):
self.dl.setUp()
@@ -107,7 +107,8 @@ def test_simple_unmount(self, cloud_storage_type):

connect.start_stream(name="ducky_stream",
config=self.avro_stream_config(
self.TOPIC_NAME, "verifier_schema", 1000000))
self.TOPIC_NAME, "verifier_schema", 1000000,
1))
# todo make reasonable or just make sure something went through
self.redpanda.set_cluster_config({
"iceberg_catalog_commit_interval_ms":
@@ -127,8 +128,9 @@
# the topic goes read-only during this wait
self.wait_for_migration_states(out_migration_id, ['executed'])
connect.stop_stream("ducky_stream", should_finish=False)
time.sleep(1) # just in case: let verifier consume remaining messages
verifier.go_offline()
# Verifier consumer thread waits for query thread so that it doesn't
# have to buffer a lot of messages for comparison.
verifier.go_offline(1200)

self.admin.execute_data_migration_action(out_migration_id,
MigrationAction.finish)
@@ -155,7 +157,8 @@ def test_simple_remount(self, cloud_storage_type):
})
connect.start_stream(name="ducky_stream",
config=self.avro_stream_config(
self.TOPIC_NAME, "verifier_schema", 100000))
self.TOPIC_NAME, "verifier_schema", 100000,
1))
self.admin = Admin(self.redpanda)
ns_topic = NamespacedTopic(self.TOPIC_NAME)
self.logger.info(f"unmounting {self.TOPIC_NAME}")
5 changes: 3 additions & 2 deletions tests/rptest/utils/rpcn_utils.py
@@ -13,7 +13,8 @@ def counter_stream_config(redpanda: RedpandaService,
topic: str,
subject: str,
field_to_bloblang: dict[str, str] = {},
cnt: int = 3000) -> dict:
cnt: int = 3000,
interval_ms: int | None = None) -> dict:
"""
Creates a RPCN config where the input is a simple counter, and fields are
mapped via the input mapping of bloblang functions.
@@ -39,7 +40,7 @@
"input": {
"generate": {
"mapping": "root = counter()",
"interval": "",
"interval": "" if interval_ms is None else f"{interval_ms}ms",
"count": cnt,
"batch_size": 1
}
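
A usage sketch of the extended helper (the topic name and mapping values are illustrative, not from the patch): leaving interval_ms as None keeps the generator unthrottled, while interval_ms=1 paces it to one record per millisecond so the verifier's consumer can keep up with the stream:

# Unthrottled, as before ("interval": ""):
cfg_fast = counter_stream_config(redpanda, "ducky_topic", "verifier_schema",
                                 {"ordinal": "this"}, cnt=3000)

# Paced at one record per millisecond, as the migration tests now request:
cfg_slow = counter_stream_config(redpanda, "ducky_topic", "verifier_schema",
                                 {"ordinal": "this"}, cnt=100000,
                                 interval_ms=1)
# Key path assumed from the config snippet shown above.
assert cfg_slow["input"]["generate"]["interval"] == "1ms"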