diff --git a/src/grimoirelab/core/runner/commands/run.py b/src/grimoirelab/core/runner/commands/run.py
index 7854434..05ad628 100644
--- a/src/grimoirelab/core/runner/commands/run.py
+++ b/src/grimoirelab/core/runner/commands/run.py
@@ -19,18 +19,26 @@
 from __future__ import annotations
 
 import os
+import time
 import typing
 
 import click
 import django.core
 import django.core.wsgi
+import django_rq
+import redis.exceptions
 
 from django.conf import settings
+from django.db import connections, OperationalError
 
 if typing.TYPE_CHECKING:
     from click import Context
 
+DEFAULT_BACKOFF_MAX = 120
+DEFAULT_MAX_RETRIES = 10
+
+
 @click.group()
 @click.pass_context
 def run(ctx: Context):
@@ -60,6 +68,8 @@ def server(ctx: Context, devel: bool, clear_tasks: bool):
     should be run with a reverse proxy. If you activate the '--dev' flag,
     a HTTP server will be run instead.
     """
+    wait_for_services(wait_archivist_storage=False)
+
     env = os.environ
 
     env["UWSGI_ENV"] = f"DJANGO_SETTINGS_MODULE={ctx.obj['cfg']}"
@@ -114,6 +124,8 @@ def eventizers(workers: int):
     Workers get jobs from the GRIMOIRELAB_Q_EVENTIZER_JOBS queue
     defined in the configuration file.
     """
+    wait_for_services(wait_archivist_storage=False)
+
     django.core.management.call_command(
         'rqworker-pool', settings.GRIMOIRELAB_Q_EVENTIZER_JOBS,
         num_workers=workers
@@ -137,6 +149,8 @@ def archivists(workers: int):
     Workers get jobs from the GRIMOIRELAB_Q_ARCHIVIST_JOBS queue
     defined in the configuration file.
     """
+    wait_for_services()
+
     django.core.management.call_command(
         'rqworker-pool', settings.GRIMOIRELAB_Q_ARCHIVIST_JOBS,
         num_workers=workers
@@ -191,3 +205,86 @@ def create_background_tasks(clear_tasks: bool):
     elif workers < current:
         tasks = StorageTask.objects.all()[workers:]
         tasks.update(burst=True)
+
+
+def wait_for_services(
+    wait_database: bool = True,
+    wait_redis: bool = True,
+    wait_archivist_storage: bool = True
+):
+    """Wait for the backing services to be available before starting.
+
+    Each enabled check retries with exponential backoff and terminates
+    the process if the service never becomes reachable.
+    """
+    if wait_database:
+        wait_database_ready()
+
+    if wait_redis:
+        wait_redis_ready()
+
+    if wait_archivist_storage:
+        wait_archivist_storage_ready()
+
+
+def _sleep_backoff(attempt: int) -> None:
+    """Sleep 2**attempt seconds, capped at DEFAULT_BACKOFF_MAX."""
+
+    time.sleep(min(DEFAULT_BACKOFF_MAX, 2 ** attempt))
+
+
+def wait_database_ready():
+    """Wait for the database to be available before starting."""
+
+    for attempt in range(DEFAULT_MAX_RETRIES):
+        try:
+            # Opening a cursor forces a real connection; the handler
+            # entry itself is always truthy, so testing it is useless.
+            db_conn = connections['default']
+            db_conn.cursor()
+            db_conn.close()
+            break
+        except OperationalError as e:
+            click.echo(f"[{attempt + 1}/{DEFAULT_MAX_RETRIES}] Database connection not ready {e.__cause__}")
+            _sleep_backoff(attempt)
+    else:
+        click.echo("Failed to connect to the database")
+        exit(1)
+
+
+def wait_redis_ready():
+    """Wait for the Redis server to be available before starting."""
+
+    for attempt in range(DEFAULT_MAX_RETRIES):
+        try:
+            redis_conn = django_rq.get_connection(settings.GRIMOIRELAB_Q_EVENTIZER_JOBS)
+            redis_conn.ping()
+            break
+        except redis.exceptions.ConnectionError as e:
+            click.echo(f"[{attempt + 1}/{DEFAULT_MAX_RETRIES}] Redis connection not ready {e.__cause__}")
+            _sleep_backoff(attempt)
+    else:
+        click.echo("Failed to connect to Redis server")
+        exit(1)
+
+
+def wait_archivist_storage_ready():
+    """Wait for the archivist storage to be available before starting."""
+
+    # Imported here to avoid a circular import at module load time.
+    from grimoirelab.core.scheduler.tasks.archivist import get_storage_backend
+
+    # The backend class and client are loop-invariant; build them once
+    # instead of on every retry iteration.
+    Storage = get_storage_backend(settings.GRIMOIRELAB_ARCHIVIST['STORAGE_TYPE'])
+    storage = Storage(url=settings.GRIMOIRELAB_ARCHIVIST['STORAGE_URL'],
+                      db_name=settings.GRIMOIRELAB_ARCHIVIST['STORAGE_INDEX'],
+                      verify_certs=settings.GRIMOIRELAB_ARCHIVIST['STORAGE_VERIFY_CERT'])
+
+    for attempt in range(DEFAULT_MAX_RETRIES):
+        if storage.ping():
+            break
+        click.echo(f"[{attempt + 1}/{DEFAULT_MAX_RETRIES}] Storage connection not ready")
+        _sleep_backoff(attempt)
+    else:
+        click.echo("Failed to connect to archivist storage")
+        exit(1)
diff --git a/src/grimoirelab/core/scheduler/tasks/archivist.py b/src/grimoirelab/core/scheduler/tasks/archivist.py
index ca15d10..57fe787 100644
--- a/src/grimoirelab/core/scheduler/tasks/archivist.py
+++ b/src/grimoirelab/core/scheduler/tasks/archivist.py
@@ -22,6 +22,8 @@
 import json
 import logging
 import typing
+
+import requests
 import urllib3
 
 import redis
@@ -79,6 +81,7 @@ def archivist_job(
     storage = Storage(url=storage_url,
                       db_name=storage_db_name,
                       verify_certs=storage_verify_certs)
+    storage.initialize()
 
     events = events_consumer(rq_job.connection, consumer_name,
                              events_queue,
@@ -277,6 +280,11 @@ def __init__(self, url: str, db_name: str, verify_certs: bool = False) -> None:
         self.db_name = db_name
         self.verify_certs = verify_certs
 
+    def initialize(self) -> None:
+        """Initialize the storage backend."""
+
+        pass
+
     def store(self, data: dict[str, Any]) -> int:
         """Store data in the storage backend.
 
@@ -291,6 +299,11 @@ def close(self) -> None:
 
         pass
 
+    def ping(self) -> bool:
+        """Check if the storage backend is up and running."""
+
+        raise NotImplementedError
+
 
 def get_storage_backend(storage_type: str) -> typing.Type[StorageBackend]:
     """Get the storage backend based on the type.
@@ -368,9 +381,13 @@ def __init__(self, url: str, db_name: str, verify_certs: bool = False) -> None:
             urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
 
         self.client = OpenSearch([url], verify_certs=self.verify_certs)
-        self._create_index(db_name)
         self.max_items_bulk = 100
 
+    def initialize(self) -> None:
+        """Initialize the OpenSearch instance."""
+
+        self._create_index(self.db_name)
+
     def _create_index(self, index_name: str) -> None:
         """Create an index in the OpenSearch instance.
 
@@ -431,3 +448,18 @@ def store(self, events: iter) -> None:
             new_items += self._bulk(body=bulk_json, index=self.db_name)
 
         return new_items
+
+    def ping(self) -> bool:
+        """Check if the OpenSearch instance is up and running."""
+
+        try:
+            # A bounded timeout keeps the readiness probe from hanging
+            # forever when the host is unreachable but not refusing.
+            r = requests.get(self.url,
+                             verify=self.verify_certs,
+                             timeout=10)
+            r.raise_for_status()
+            return True
+        except requests.exceptions.RequestException:
+            # Covers ConnectionError, HTTPError and, crucially, Timeout.
+            return False