Skip to content

Commit a664cee

Browse files
authored
[Fixes #13627] Create a fallback task in order to monitor the session's health (#13628)
* call the monitoring task after defining the session as on-going * adding force_failure flag to the finalizer in case it is called by the harvesting monitor
1 parent c9e1bcf commit a664cee

File tree

7 files changed

+223
-27
lines changed

7 files changed

+223
-27
lines changed

.env.sample

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -238,6 +238,10 @@ LDAP_GROUP_PROFILE_MEMBER_ATTR=uniqueMember
238238
# CELERY__HARVESTER_MAX_MEMORY_PER_CHILD="500000"
239239
# CELERY__HARVESTER_LOG_FILE="/var/log/celery_harvester.log"
240240

241+
# Harvesting monitoring configuration
242+
HARVESTING_MONITOR_ENABLED=True
243+
HARVESTING_MONITOR_DELAY=60
244+
241245

242246
# PostgreSQL
243247
POSTGRESQL_MAX_CONNECTIONS=200

.env_dev

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,10 @@ BROKER_URL=redis://localhost:6379/0
4141
CELERY_BEAT_SCHEDULER=celery.beat:PersistentScheduler
4242
ASYNC_SIGNALS=False
4343

44+
# Harvesting Monitoring configuration
45+
HARVESTING_MONITOR_ENABLED=False
46+
HARVESTING_MONITOR_DELAY=60
47+
4448
SITEURL=http://localhost:8000/
4549

4650
ALLOWED_HOSTS="['django', '*']"

.env_local

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,10 @@ BROKER_URL=redis://localhost:6379/0
4141
CELERY_BEAT_SCHEDULER=celery.beat:PersistentScheduler
4242
ASYNC_SIGNALS=False
4343

44+
# Harvesting monitoring configuration
45+
HARVESTING_MONITOR_ENABLED=False
46+
HARVESTING_MONITOR_DELAY=60
47+
4448
SITEURL=http://localhost:8000/
4549

4650
ALLOWED_HOSTS="['django', '*']"

.env_test

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,10 @@ BROKER_URL=redis://localhost:6379/0
4141
CELERY_BEAT_SCHEDULER=celery.beat:PersistentScheduler
4242
ASYNC_SIGNALS=False
4343

44+
# Harvesting monitoring configuration
45+
HARVESTING_MONITOR_ENABLED=False
46+
HARVESTING_MONITOR_DELAY=60
47+
4448
SITEURL=http://localhost:8000/
4549

4650
ALLOWED_HOSTS="['django', 'localhost', '127.0.0.1']"

geonode/harvesting/tasks.py

Lines changed: 74 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import math
2121
import logging
2222
import typing
23+
from datetime import timedelta
2324

2425
from celery import chord
2526
from django.core.exceptions import ValidationError
@@ -128,6 +129,57 @@ def harvesting_dispatcher(self, harvesting_session_id: int):
128129
)
129130

130131

132+
@app.task(
133+
bind=True,
134+
# We define the geonode queue so that the main worker runs this task
135+
queue="geonode",
136+
time_limit=600,
137+
acks_late=False,
138+
ignore_result=False,
139+
)
140+
def harvesting_session_monitor(
141+
self, harvesting_session_id: int, workflow_time: int, delay: int = 30, execution_id: str = None
142+
):
143+
"""
144+
Task to monitor a harvesting session and call finalizer if it gets stuck.
145+
146+
:param harvesting_session_id: ID of AsynchronousHarvestingSession
147+
:param workflow_time: Expected workflow duration in seconds
148+
:param delay: Delay in seconds before re-checking if session is still ongoing
149+
"""
150+
try:
151+
session = models.AsynchronousHarvestingSession.objects.get(pk=harvesting_session_id)
152+
153+
if session.status not in [session.STATUS_ON_GOING, session.STATUS_ABORTING]:
154+
logger.info(f"Session {harvesting_session_id} is not ongoing. Harvesting session monitor task exiting.")
155+
return
156+
157+
now_ = timezone.now()
158+
expected_finish = session.started + timedelta(seconds=workflow_time)
159+
160+
if now_ > expected_finish:
161+
logger.warning(f"Session {harvesting_session_id} appears stuck. Running finalizer.")
162+
# Dispatch the finalizer asynchronously, forcing the session to be marked as failed
163+
_finish_harvesting.s(harvesting_session_id, execution_id, force_failure=True).on_error(
164+
_handle_harvesting_error.s(harvesting_session_id=harvesting_session_id)
165+
).apply_async()
166+
else:
167+
logger.debug(
168+
f"Session {harvesting_session_id} still ongoing. Rescheduling Harvesting session monitor in {delay}s."
169+
)
170+
# Reschedule itself
171+
harvesting_session_monitor.apply_async(
172+
args=(harvesting_session_id, workflow_time, delay),
173+
kwargs={"execution_id": execution_id},
174+
countdown=delay,
175+
)
176+
177+
except models.AsynchronousHarvestingSession.DoesNotExist:
178+
logger.warning(f"Session {harvesting_session_id} does not exist. Harvesting session monitor exiting.")
179+
except Exception as exc:
180+
logger.exception(f"Harvesting session monitor failed for session {harvesting_session_id}: {exc}")
181+
182+
131183
@app.task(
132184
bind=True,
133185
queue="harvesting",
@@ -143,6 +195,7 @@ def harvest_resources(
143195
):
144196
"""Harvest a list of remote resources that all belong to the same harvester."""
145197
session = models.AsynchronousHarvestingSession.objects.get(pk=harvesting_session_id)
198+
146199
if session.status == session.STATUS_ABORTED:
147200
logger.debug("Session has been aborted, skipping...")
148201
return
@@ -192,6 +245,21 @@ def harvest_resources(
192245
# Request ID, which will be passed to the _harvest_resource sub-tasks
193246
execution_id = str(exec_request.exec_id)
194247

248+
# Call the harvesting session monitor
249+
if settings.HARVESTING_MONITOR_ENABLED:
250+
workflow_time = calculate_dynamic_expiration(len(harvestable_resource_ids), buffer_time=1200)
251+
monitor_delay = getattr(settings, "HARVESTING_MONITOR_DELAY", 60)
252+
253+
logger.debug(
254+
f"Starting harvesting session monitor for session {harvesting_session_id} "
255+
f"with workflow_time={workflow_time}s, delay={monitor_delay}s"
256+
)
257+
harvesting_session_monitor.apply_async(
258+
args=(harvesting_session_id, workflow_time, monitor_delay),
259+
kwargs={"execution_id": execution_id},
260+
countdown=0,
261+
)
262+
195263
if len(harvestable_resource_ids) <= harvestable_resources_limit:
196264
# No chunking, just one chord for all resources
197265
resource_tasks = [
@@ -375,7 +443,7 @@ def _harvest_resource(self, harvestable_resource_id: int, harvesting_session_id:
375443
acks_late=False,
376444
ignore_result=False,
377445
)
378-
def _finish_harvesting(self, harvesting_session_id: int, execution_id: str):
446+
def _finish_harvesting(self, harvesting_session_id: int, execution_id: str, force_failure=False):
379447
"""
380448
Finalize the harvesting session by marking it as completed and updating
381449
the ExecutionRequest status and log.
@@ -399,7 +467,10 @@ def _finish_harvesting(self, harvesting_session_id: int, execution_id: str):
399467
# we exclude the failures with status: skipped
400468
failed_tasks_count = len(failures)
401469

402-
if session.status == session.STATUS_ABORTING:
470+
if force_failure:
471+
final_status = session.STATUS_FINISHED_SOME_FAILED
472+
message = "Finalized with forced failure."
473+
elif session.status == session.STATUS_ABORTING:
403474
message = "Harvesting session aborted by user"
404475
final_status = session.STATUS_ABORTED
405476
elif failed_tasks_count > 0:
@@ -822,7 +893,7 @@ def update_asynchronous_session(
822893
def calculate_dynamic_expiration(
823894
num_resources: int,
824895
estimated_duration_per_resource: int = 20,
825-
buffer_time: int = 300,
896+
buffer_time: int = 600,
826897
) -> int:
827898
"""
828899
Calculate a dynamic expiration time (in seconds)

geonode/harvesting/tests/test_tasks.py

Lines changed: 131 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
#########################################################################
1919
from unittest import mock
2020
import uuid
21+
from datetime import datetime, timedelta
2122

2223
from django.contrib.auth import get_user_model
2324
from django.test import override_settings
@@ -265,68 +266,99 @@ def test_harvesting_scheduler(self):
265266
mock_harvester.initiate_perform_harvesting.assert_called()
266267

267268
@mock.patch("geonode.harvesting.tasks.transaction.on_commit")
268-
@mock.patch("geonode.harvesting.tasks.models")
269-
def test_harvest_resources_with_chunks(
270-
self,
271-
mock_models,
272-
mock_on_commit,
273-
):
274-
mock_session = mock.MagicMock()
269+
@mock.patch("geonode.harvesting.tasks.models.AsynchronousHarvestingSession.objects.get")
270+
def test_harvest_resources_with_chunks(self, mock_get, mock_on_commit):
271+
mock_session = mock.MagicMock(
272+
spec=[
273+
"status",
274+
"STATUS_ON_GOING",
275+
"STATUS_ABORTING",
276+
"STATUS_ABORTED",
277+
"harvester",
278+
"started_at",
279+
"save",
280+
"id",
281+
]
282+
)
283+
mock_session.STATUS_ON_GOING = "ON_GOING"
284+
mock_session.STATUS_ABORTING = "ABORTING"
285+
mock_session.STATUS_ABORTED = "ABORTED"
275286
mock_session.status = mock_session.STATUS_ON_GOING
287+
mock_session.harvester = mock.MagicMock()
276288
mock_session.harvester.update_availability.return_value = True
277-
mock_models.AsynchronousHarvestingSession.objects.get.return_value = mock_session
289+
mock_session.started_at = datetime.now()
290+
mock_session.id = 123
291+
292+
mock_get.side_effect = lambda *_, **__: mock_session
278293

279294
harvestable_resource_ids = list(range(5))
280295

281296
with mock.patch("geonode.harvesting.tasks.queue_next_chunk_batch.apply_async") as mock_apply_async:
282297
with override_settings(CHUNK_SIZE=2, MAX_PARALLEL_QUEUE_CHUNKS=2):
283-
tasks.harvest_resources(harvestable_resource_ids, self.harvesting_session.id)
298+
tasks.harvest_resources(harvestable_resource_ids, mock_session.id)
284299

285-
# Simulate transaction.on_commit callback being run
286300
assert mock_on_commit.called
287301
callback = mock_on_commit.call_args[0][0]
288302
callback()
289303

290-
# Now apply_async should have been called
291304
mock_apply_async.assert_called_once()
292305
_, kwargs = mock_apply_async.call_args
293306

294-
expected_expires = 5 * 20 + 300 # 5 resources * 20 + 300 buffer = 400
295-
expected_time_limit = 2 * 2 * 20 + 300 # CHUNK_SIZE * MAX_PARALLEL_QUEUE_CHUNKS * 20 + 300 = 380
307+
expected_expires = 5 * 20 + 600  # 5 resources * 20 + 600 buffer = 700
308+
expected_time_limit = 2 * 2 * 20 + 300
296309

297310
self.assertEqual(kwargs["expires"], expected_expires)
298311
self.assertEqual(kwargs["time_limit"], expected_time_limit)
299312

300313
@mock.patch("geonode.harvesting.tasks.transaction.on_commit")
301-
@mock.patch("geonode.harvesting.tasks.models")
302-
def test_harvest_resources_without_chunks(self, mock_models, mock_on_commit):
303-
mock_session = mock.MagicMock()
314+
@mock.patch("geonode.harvesting.tasks.models.AsynchronousHarvestingSession.objects.get")
315+
def test_harvest_resources_without_chunks(self, mock_get, mock_on_commit):
316+
# Mock session with explicit spec and STATUS constants
317+
mock_session = mock.MagicMock(
318+
spec=[
319+
"status",
320+
"STATUS_ON_GOING",
321+
"STATUS_ABORTING",
322+
"STATUS_ABORTED",
323+
"harvester",
324+
"started_at",
325+
"save",
326+
"id",
327+
]
328+
)
329+
mock_session.STATUS_ON_GOING = "ON_GOING"
330+
mock_session.STATUS_ABORTING = "ABORTING"
331+
mock_session.STATUS_ABORTED = "ABORTED"
304332
mock_session.status = mock_session.STATUS_ON_GOING
333+
mock_session.harvester = mock.MagicMock()
305334
mock_session.harvester.update_availability.return_value = True
306-
mock_models.AsynchronousHarvestingSession.objects.get.return_value = mock_session
335+
mock_session.started_at = datetime.now()
336+
mock_session.id = 456
337+
mock_session.save.return_value = None
338+
339+
mock_get.side_effect = lambda *_, **__: mock_session
307340

308341
harvestable_resource_ids = list(range(5))
309342

310343
with mock.patch("geonode.harvesting.tasks.chord") as mock_chord:
311344
mock_workflow = mock.MagicMock()
312345
mock_chord.return_value = mock_workflow
313346

347+
# Large CHUNK_SIZE disables chunking logic
314348
with override_settings(CHUNK_SIZE=100, MAX_PARALLEL_QUEUE_CHUNKS=2):
315-
tasks.harvest_resources(harvestable_resource_ids, self.harvesting_session.id)
349+
tasks.harvest_resources(harvestable_resource_ids, mock_session.id)
316350

317-
# Check that transaction.on_commit was called
351+
# Simulate post-commit callback
318352
self.assertTrue(mock_on_commit.called)
319353
callback = mock_on_commit.call_args[0][0]
320-
321-
# Simulate the transaction commit
322354
callback()
323355

324-
# Check that chord was built with correct number of subtasks
356+
# Validate that chord was built correctly
325357
self.assertTrue(mock_chord.called)
326-
subtasks = mock_chord.call_args[0][0] # This is the list of resource tasks
358+
subtasks = mock_chord.call_args[0][0]
327359
self.assertEqual(len(subtasks), len(harvestable_resource_ids))
328360

329-
# Check that apply_async was called on the workflow
361+
# Verify final workflow trigger
330362
mock_workflow.apply_async.assert_called_once()
331363

332364
@mock.patch("geonode.harvesting.tasks.logger")
@@ -592,3 +624,78 @@ def test_finish_harvesting_some_tasks_failed(self, mock_get_session, mock_get_ex
592624
updated_log = mock_exec_req.log
593625
assert "Harvesting completed with errors" in updated_log
594626
assert mock_exec_req.status == ExecutionRequest.STATUS_FINISHED
627+
628+
@mock.patch("geonode.harvesting.tasks.models.AsynchronousHarvestingSession.objects.get")
629+
def test_monitor_exits_when_not_ongoing(self, mock_get):
630+
mock_session = mock.MagicMock()
631+
mock_session.status = "FINISHED"
632+
mock_session.STATUS_ON_GOING = "ON_GOING"
633+
mock_session.STATUS_ABORTING = "ABORTING"
634+
mock_get.return_value = mock_session
635+
636+
tasks.harvesting_session_monitor(1, 60)
637+
mock_get.assert_called_once()
638+
639+
@mock.patch("geonode.harvesting.tasks.harvesting_session_monitor.apply_async")
640+
@mock.patch("geonode.harvesting.tasks.models.AsynchronousHarvestingSession.objects.get")
641+
def test_monitor_reschedules_itself(self, mock_get, mock_apply_async):
642+
current_time = now()
643+
mock_session = mock.MagicMock()
644+
mock_session.status = "ON_GOING"
645+
mock_session.STATUS_ON_GOING = "ON_GOING"
646+
mock_session.STATUS_ABORTING = "ABORTING"
647+
mock_session.started = current_time
648+
mock_get.return_value = mock_session
649+
650+
tasks.harvesting_session_monitor(1, 3600, delay=5)
651+
mock_apply_async.assert_called_once()
652+
653+
@mock.patch("geonode.harvesting.tasks._finish_harvesting.apply_async")
654+
@mock.patch("geonode.harvesting.tasks.models.AsynchronousHarvestingSession.objects.get")
655+
def test_monitor_triggers_finalizer_if_stuck(self, mock_get, mock_apply_async):
656+
mock_session = mock.MagicMock()
657+
mock_session.status = "ON_GOING"
658+
mock_session.STATUS_ON_GOING = "ON_GOING"
659+
mock_session.STATUS_ABORTING = "ABORTING"
660+
mock_session.started = now() - timedelta(hours=1)
661+
mock_get.return_value = mock_session
662+
663+
# Call the monitor
664+
tasks.harvesting_session_monitor(harvesting_session_id=1, workflow_time=60, delay=0)
665+
666+
# Check that the finalizer was dispatched exactly once
667+
mock_apply_async.assert_called_once()
668+
args, kwargs = mock_apply_async.call_args
669+
670+
@mock.patch("geonode.harvesting.tasks.models.AsynchronousHarvestingSession.objects.get")
671+
def test_monitor_session_does_not_exist(self, mock_get):
672+
mock_get.side_effect = models.AsynchronousHarvestingSession.DoesNotExist
673+
674+
# Capture logs from the correct logger
675+
with self.assertLogs("geonode.harvesting.tasks", level="WARNING") as log_cm:
676+
tasks.harvesting_session_monitor(999, 60)
677+
678+
# Check that the warning about non-existing session was logged
679+
self.assertTrue(
680+
any(
681+
"Session 999 does not exist. Harvesting session monitor exiting." in message
682+
for message in log_cm.output
683+
)
684+
)
685+
686+
# Ensure the get() was called
687+
mock_get.assert_called_once_with(pk=999)
688+
689+
@mock.patch("geonode.harvesting.tasks.models.AsynchronousHarvestingSession.objects.get")
690+
def test_monitor_handles_exception(self, mock_get):
691+
# Force an exception
692+
mock_get.side_effect = RuntimeError("boom")
693+
694+
# Capture logs from the correct logger
695+
with self.assertLogs("geonode.harvesting.tasks", level="ERROR") as log_cm:
696+
tasks.harvesting_session_monitor(1, 60)
697+
698+
# Check that the exception was logged
699+
self.assertTrue(
700+
any("Harvesting session monitor failed for session 1: boom" in message for message in log_cm.output)
701+
)

geonode/settings.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1763,6 +1763,8 @@ def get_geonode_catalogue_service():
17631763
# in the queue in case of harvesting hundreds of resources
17641764
CHUNK_SIZE = os.environ.get("CHUNK_SIZE", 100)
17651765
MAX_PARALLEL_QUEUE_CHUNKS = os.environ.get("MAX_PARALLEL_QUEUE_CHUNKS", 2)
1766+
HARVESTING_MONITOR_ENABLED = ast.literal_eval(os.environ.get("HARVESTING_MONITOR_ENABLED", "True"))
1767+
HARVESTING_MONITOR_DELAY = int(os.environ.get("HARVESTING_MONITOR_DELAY", 60))
17661768

17671769
# Set Tasks Queues
17681770
# CELERY_TASK_DEFAULT_QUEUE = "default"

0 commit comments

Comments
 (0)