Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .env.sample
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,10 @@ LDAP_GROUP_PROFILE_MEMBER_ATTR=uniqueMember
# CELERY__HARVESTER_MAX_MEMORY_PER_CHILD="500000"
# CELERY__HARVESTER_LOG_FILE="/var/log/celery_harvester.log"

# Monitoring configuration
HARVESTING_MONITOR_ENABLED=True
HARVESTING_MONITOR_DELAY=60


# PostgreSQL
POSTGRESQL_MAX_CONNECTIONS=200
Expand Down
4 changes: 4 additions & 0 deletions .env_dev
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ BROKER_URL=redis://localhost:6379/0
CELERY_BEAT_SCHEDULER=celery.beat:PersistentScheduler
ASYNC_SIGNALS=False

# Harvesting Monitoring configuration
HARVESTING_MONITOR_ENABLED=False
HARVESTING_MONITOR_DELAY=60

SITEURL=http://localhost:8000/

ALLOWED_HOSTS="['django', '*']"
Expand Down
4 changes: 4 additions & 0 deletions .env_local
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ BROKER_URL=redis://localhost:6379/0
CELERY_BEAT_SCHEDULER=celery.beat:PersistentScheduler
ASYNC_SIGNALS=False

# Monitoring configuration
HARVESTING_MONITOR_ENABLED=False
HARVESTING_MONITOR_DELAY=60

SITEURL=http://localhost:8000/

ALLOWED_HOSTS="['django', '*']"
Expand Down
4 changes: 4 additions & 0 deletions .env_test
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ BROKER_URL=redis://localhost:6379/0
CELERY_BEAT_SCHEDULER=celery.beat:PersistentScheduler
ASYNC_SIGNALS=False

# Monitoring configuration
HARVESTING_MONITOR_ENABLED=False
HARVESTING_MONITOR_DELAY=60

SITEURL=http://localhost:8000/

ALLOWED_HOSTS="['django', 'localhost', '127.0.0.1']"
Expand Down
77 changes: 74 additions & 3 deletions geonode/harvesting/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import math
import logging
import typing
from datetime import timedelta

from celery import chord
from django.core.exceptions import ValidationError
Expand Down Expand Up @@ -128,6 +129,57 @@ def harvesting_dispatcher(self, harvesting_session_id: int):
)


@app.task(
    bind=True,
    # We define the geonode queue so that the main worker runs this task
    queue="geonode",
    time_limit=600,
    acks_late=False,
    ignore_result=False,
)
def harvesting_session_monitor(
    self, harvesting_session_id: int, workflow_time: int, delay: int = 30, execution_id: str = None
):
    """Watch an asynchronous harvesting session and force-finalize it when it overruns.

    While the session is still active, the elapsed time is compared against the
    expected workflow duration: a session past its deadline is finalized with a
    forced failure; otherwise the monitor reschedules itself after ``delay``
    seconds. Inactive or missing sessions make the monitor exit quietly.

    :param harvesting_session_id: ID of AsynchronousHarvestingSession
    :param workflow_time: Expected workflow duration in seconds
    :param delay: Delay in seconds before re-checking if session is still ongoing
    :param execution_id: Optional execution request id forwarded to the finalizer
    """
    try:
        session = models.AsynchronousHarvestingSession.objects.get(pk=harvesting_session_id)

        active_statuses = (session.STATUS_ON_GOING, session.STATUS_ABORTING)
        if session.status in active_statuses:
            deadline = session.started + timedelta(seconds=workflow_time)
            if timezone.now() > deadline:
                logger.warning(f"Session {harvesting_session_id} appears stuck. Running finalizer.")
                # Overdue: dispatch the finalizer with a forced failure so the
                # session does not stay "ongoing" forever.
                _finish_harvesting.s(harvesting_session_id, execution_id, force_failure=True).on_error(
                    _handle_harvesting_error.s(harvesting_session_id=harvesting_session_id)
                ).apply_async()
            else:
                logger.debug(
                    f"Session {harvesting_session_id} still ongoing. Rescheduling Harvesting session monitor in {delay}s."
                )
                # Not overdue yet: schedule the next check after ``delay`` seconds.
                harvesting_session_monitor.apply_async(
                    args=(harvesting_session_id, workflow_time, delay),
                    kwargs={"execution_id": execution_id},
                    countdown=delay,
                )
        else:
            logger.info(f"Session {harvesting_session_id} is not ongoing. Harvesting session monitor task exiting.")
            return

    except models.AsynchronousHarvestingSession.DoesNotExist:
        logger.warning(f"Session {harvesting_session_id} does not exist. Harvesting session monitor exiting.")
    except Exception as exc:
        # Never let the monitor itself crash the worker; log and give up.
        logger.exception(f"Harvesting session monitor failed for session {harvesting_session_id}: {exc}")


@app.task(
bind=True,
queue="harvesting",
Expand All @@ -143,6 +195,7 @@ def harvest_resources(
):
"""Harvest a list of remote resources that all belong to the same harvester."""
session = models.AsynchronousHarvestingSession.objects.get(pk=harvesting_session_id)

if session.status == session.STATUS_ABORTED:
logger.debug("Session has been aborted, skipping...")
return
Expand Down Expand Up @@ -192,6 +245,21 @@ def harvest_resources(
# Request ID, which will be passed to the _harvest_resource sub-tasks
execution_id = str(exec_request.exec_id)

# Call the harvesting session monitor
if settings.HARVESTING_MONITOR_ENABLED:
workflow_time = calculate_dynamic_expiration(len(harvestable_resource_ids), buffer_time=1200)
monitor_delay = getattr(settings, "HARVESTING_MONITOR_DELAY", 60)

logger.debug(
f"Starting harvesting session monitor for session {harvesting_session_id} "
f"with workflow_time={workflow_time}s, delay={monitor_delay}s"
)
harvesting_session_monitor.apply_async(
args=(harvesting_session_id, workflow_time, monitor_delay),
kwargs={"execution_id": execution_id},
countdown=0,
)

if len(harvestable_resource_ids) <= harvestable_resources_limit:
# No chunking, just one chord for all resources
resource_tasks = [
Expand Down Expand Up @@ -375,7 +443,7 @@ def _harvest_resource(self, harvestable_resource_id: int, harvesting_session_id:
acks_late=False,
ignore_result=False,
)
def _finish_harvesting(self, harvesting_session_id: int, execution_id: str):
def _finish_harvesting(self, harvesting_session_id: int, execution_id: str, force_failure=False):
"""
Finalize the harvesting session by marking it as completed and updating
the ExecutionRequest status and log.
Expand All @@ -399,7 +467,10 @@ def _finish_harvesting(self, harvesting_session_id: int, execution_id: str):
# we exclude the failures with status: skipped
failed_tasks_count = len(failures)

if session.status == session.STATUS_ABORTING:
if force_failure:
final_status = session.STATUS_FINISHED_SOME_FAILED
message = "Finalized with forced failure."
elif session.status == session.STATUS_ABORTING:
message = "Harvesting session aborted by user"
final_status = session.STATUS_ABORTED
elif failed_tasks_count > 0:
Expand Down Expand Up @@ -822,7 +893,7 @@ def update_asynchronous_session(
def calculate_dynamic_expiration(
num_resources: int,
estimated_duration_per_resource: int = 20,
buffer_time: int = 300,
buffer_time: int = 600,
) -> int:
"""
Calculate a dynamic expiration time (in seconds)
Expand Down
155 changes: 131 additions & 24 deletions geonode/harvesting/tests/test_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#########################################################################
from unittest import mock
import uuid
from datetime import datetime, timedelta

from django.contrib.auth import get_user_model
from django.test import override_settings
Expand Down Expand Up @@ -265,68 +266,99 @@ def test_harvesting_scheduler(self):
mock_harvester.initiate_perform_harvesting.assert_called()

@mock.patch("geonode.harvesting.tasks.transaction.on_commit")
@mock.patch("geonode.harvesting.tasks.models")
def test_harvest_resources_with_chunks(
self,
mock_models,
mock_on_commit,
):
mock_session = mock.MagicMock()
@mock.patch("geonode.harvesting.tasks.models.AsynchronousHarvestingSession.objects.get")
def test_harvest_resources_with_chunks(self, mock_get, mock_on_commit):
mock_session = mock.MagicMock(
spec=[
"status",
"STATUS_ON_GOING",
"STATUS_ABORTING",
"STATUS_ABORTED",
"harvester",
"started_at",
"save",
"id",
]
)
mock_session.STATUS_ON_GOING = "ON_GOING"
mock_session.STATUS_ABORTING = "ABORTING"
mock_session.STATUS_ABORTED = "ABORTED"
mock_session.status = mock_session.STATUS_ON_GOING
mock_session.harvester = mock.MagicMock()
mock_session.harvester.update_availability.return_value = True
mock_models.AsynchronousHarvestingSession.objects.get.return_value = mock_session
mock_session.started_at = datetime.now()
mock_session.id = 123

mock_get.side_effect = lambda *_, **__: mock_session

harvestable_resource_ids = list(range(5))

with mock.patch("geonode.harvesting.tasks.queue_next_chunk_batch.apply_async") as mock_apply_async:
with override_settings(CHUNK_SIZE=2, MAX_PARALLEL_QUEUE_CHUNKS=2):
tasks.harvest_resources(harvestable_resource_ids, self.harvesting_session.id)
tasks.harvest_resources(harvestable_resource_ids, mock_session.id)

# Simulate transaction.on_commit callback being run
assert mock_on_commit.called
callback = mock_on_commit.call_args[0][0]
callback()

# Now apply_async should have been called
mock_apply_async.assert_called_once()
_, kwargs = mock_apply_async.call_args

expected_expires = 5 * 20 + 300 # 5 resources * 20 + 300 buffer = 400
expected_time_limit = 2 * 2 * 20 + 300 # CHUNK_SIZE * MAX_PARALLEL_QUEUE_CHUNKS * 20 + 300 = 380
expected_expires = 5 * 20 + 600 # adjust to match production logic
expected_time_limit = 2 * 2 * 20 + 300

self.assertEqual(kwargs["expires"], expected_expires)
self.assertEqual(kwargs["time_limit"], expected_time_limit)

@mock.patch("geonode.harvesting.tasks.transaction.on_commit")
@mock.patch("geonode.harvesting.tasks.models")
def test_harvest_resources_without_chunks(self, mock_models, mock_on_commit):
mock_session = mock.MagicMock()
@mock.patch("geonode.harvesting.tasks.models.AsynchronousHarvestingSession.objects.get")
def test_harvest_resources_without_chunks(self, mock_get, mock_on_commit):
# Mock session with explicit spec and STATUS constants
mock_session = mock.MagicMock(
spec=[
"status",
"STATUS_ON_GOING",
"STATUS_ABORTING",
"STATUS_ABORTED",
"harvester",
"started_at",
"save",
"id",
]
)
mock_session.STATUS_ON_GOING = "ON_GOING"
mock_session.STATUS_ABORTING = "ABORTING"
mock_session.STATUS_ABORTED = "ABORTED"
mock_session.status = mock_session.STATUS_ON_GOING
mock_session.harvester = mock.MagicMock()
mock_session.harvester.update_availability.return_value = True
mock_models.AsynchronousHarvestingSession.objects.get.return_value = mock_session
mock_session.started_at = datetime.now()
mock_session.id = 456
mock_session.save.return_value = None

mock_get.side_effect = lambda *_, **__: mock_session

harvestable_resource_ids = list(range(5))

with mock.patch("geonode.harvesting.tasks.chord") as mock_chord:
mock_workflow = mock.MagicMock()
mock_chord.return_value = mock_workflow

# Large CHUNK_SIZE disables chunking logic
with override_settings(CHUNK_SIZE=100, MAX_PARALLEL_QUEUE_CHUNKS=2):
tasks.harvest_resources(harvestable_resource_ids, self.harvesting_session.id)
tasks.harvest_resources(harvestable_resource_ids, mock_session.id)

# Check that transaction.on_commit was called
# Simulate post-commit callback
self.assertTrue(mock_on_commit.called)
callback = mock_on_commit.call_args[0][0]

# Simulate the transaction commit
callback()

# Check that chord was built with correct number of subtasks
# Validate that chord was built correctly
self.assertTrue(mock_chord.called)
subtasks = mock_chord.call_args[0][0] # This is the list of resource tasks
subtasks = mock_chord.call_args[0][0]
self.assertEqual(len(subtasks), len(harvestable_resource_ids))

# Check that apply_async was called on the workflow
# Verify final workflow trigger
mock_workflow.apply_async.assert_called_once()

@mock.patch("geonode.harvesting.tasks.logger")
Expand Down Expand Up @@ -592,3 +624,78 @@ def test_finish_harvesting_some_tasks_failed(self, mock_get_session, mock_get_ex
updated_log = mock_exec_req.log
assert "Harvesting completed with errors" in updated_log
assert mock_exec_req.status == ExecutionRequest.STATUS_FINISHED

@mock.patch("geonode.harvesting.tasks.models.AsynchronousHarvestingSession.objects.get")
def test_monitor_exits_when_not_ongoing(self, mock_get):
    """A session that is no longer active must make the monitor exit early."""
    session_stub = mock.MagicMock()
    session_stub.STATUS_ON_GOING = "ON_GOING"
    session_stub.STATUS_ABORTING = "ABORTING"
    # Any status outside the two "active" ones triggers the early exit path.
    session_stub.status = "FINISHED"
    mock_get.return_value = session_stub

    tasks.harvesting_session_monitor(1, 60)

    # The session was looked up exactly once; nothing else was dispatched.
    mock_get.assert_called_once()

@mock.patch("geonode.harvesting.tasks.harvesting_session_monitor.apply_async")
@mock.patch("geonode.harvesting.tasks.models.AsynchronousHarvestingSession.objects.get")
def test_monitor_reschedules_itself(self, mock_get, mock_apply_async):
    """An ongoing session still within its deadline reschedules the monitor."""
    session_stub = mock.MagicMock()
    session_stub.STATUS_ON_GOING = "ON_GOING"
    session_stub.STATUS_ABORTING = "ABORTING"
    session_stub.status = "ON_GOING"
    # Just started: started + workflow_time (3600s) is far in the future.
    session_stub.started = now()
    mock_get.return_value = session_stub

    tasks.harvesting_session_monitor(1, 3600, delay=5)

    mock_apply_async.assert_called_once()

@mock.patch("geonode.harvesting.tasks._finish_harvesting.apply_async")
@mock.patch("geonode.harvesting.tasks.models.AsynchronousHarvestingSession.objects.get")
def test_monitor_triggers_finalizer_if_stuck(self, mock_get, mock_apply_async):
    """A session past its deadline must make the monitor dispatch the finalizer.

    Fix: the original test unpacked ``mock_apply_async.call_args`` into unused
    variables and asserted nothing about them, despite a comment claiming the
    args were checked. The dead unpack is removed and replaced with a concrete
    assertion that the session was looked up by primary key.
    """
    mock_session = mock.MagicMock()
    mock_session.status = "ON_GOING"
    mock_session.STATUS_ON_GOING = "ON_GOING"
    mock_session.STATUS_ABORTING = "ABORTING"
    # Started an hour ago with workflow_time=60s -> clearly overdue.
    mock_session.started = now() - timedelta(hours=1)
    mock_get.return_value = mock_session

    tasks.harvesting_session_monitor(harvesting_session_id=1, workflow_time=60, delay=0)

    # The session was fetched by pk and the finalizer was dispatched once.
    mock_get.assert_called_once_with(pk=1)
    mock_apply_async.assert_called_once()

@mock.patch("geonode.harvesting.tasks.models.AsynchronousHarvestingSession.objects.get")
def test_monitor_session_does_not_exist(self, mock_get):
    """A missing session is logged as a warning and the monitor exits cleanly."""
    mock_get.side_effect = models.AsynchronousHarvestingSession.DoesNotExist

    # The monitor logs its warnings on the tasks module's logger.
    with self.assertLogs("geonode.harvesting.tasks", level="WARNING") as log_cm:
        tasks.harvesting_session_monitor(999, 60)

    expected_fragment = "Session 999 does not exist. Harvesting session monitor exiting."
    self.assertTrue(any(expected_fragment in entry for entry in log_cm.output))

    # The lookup happened exactly once, keyed by primary key.
    mock_get.assert_called_once_with(pk=999)

@mock.patch("geonode.harvesting.tasks.models.AsynchronousHarvestingSession.objects.get")
def test_monitor_handles_exception(self, mock_get):
    """Unexpected errors inside the monitor are caught and logged, not raised."""
    mock_get.side_effect = RuntimeError("boom")

    # The broad except in the task should turn the error into an ERROR log entry.
    with self.assertLogs("geonode.harvesting.tasks", level="ERROR") as log_cm:
        tasks.harvesting_session_monitor(1, 60)

    expected_fragment = "Harvesting session monitor failed for session 1: boom"
    self.assertTrue(any(expected_fragment in entry for entry in log_cm.output))
2 changes: 2 additions & 0 deletions geonode/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -1763,6 +1763,8 @@ def get_geonode_catalogue_service():
# in the queue in case of harvesting hundreds of resources
CHUNK_SIZE = os.environ.get("CHUNK_SIZE", 100)
MAX_PARALLEL_QUEUE_CHUNKS = os.environ.get("MAX_PARALLEL_QUEUE_CHUNKS", 2)
HARVESTING_MONITOR_ENABLED = ast.literal_eval(os.environ.get("HARVESTING_MONITOR_ENABLED", "True"))
HARVESTING_MONITOR_DELAY = int(os.environ.get("HARVESTING_MONITOR_DELAY", 60))

# Set Tasks Queues
# CELERY_TASK_DEFAULT_QUEUE = "default"
Expand Down
Loading