Skip to content

[cmd] Check services are up when the platform starts #38

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 91 additions & 0 deletions src/grimoirelab/core/runner/commands/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,26 @@
from __future__ import annotations

import os
import sys
import time
import typing

import click
import django.core
import django.core.wsgi
import django_rq
import redis.exceptions

from django.conf import settings
from django.db import connections, OperationalError

if typing.TYPE_CHECKING:
from click import Context


# Upper bound, in seconds, for the exponential backoff between retries.
DEFAULT_BACKOFF_MAX = 120
# Number of connection attempts made before giving up on a service.
DEFAULT_MAX_RETRIES = 10


@click.group()
@click.pass_context
def run(ctx: Context):
Expand Down Expand Up @@ -60,6 +68,8 @@ def server(ctx: Context, devel: bool, clear_tasks: bool):
should be run with a reverse proxy. If you activate the '--dev' flag,
a HTTP server will be run instead.
"""
wait_for_services(wait_archivist_storage=False)

env = os.environ

env["UWSGI_ENV"] = f"DJANGO_SETTINGS_MODULE={ctx.obj['cfg']}"
Expand Down Expand Up @@ -114,6 +124,8 @@ def eventizers(workers: int):
Workers get jobs from the GRIMOIRELAB_Q_EVENTIZER_JOBS queue defined
in the configuration file.
"""
wait_for_services(wait_archivist_storage=False)

django.core.management.call_command(
'rqworker-pool', settings.GRIMOIRELAB_Q_EVENTIZER_JOBS,
num_workers=workers
Expand All @@ -137,6 +149,8 @@ def archivists(workers: int):
Workers get jobs from the GRIMOIRELAB_Q_ARCHIVIST_JOBS queue defined
in the configuration file.
"""
wait_for_services()

django.core.management.call_command(
'rqworker-pool', settings.GRIMOIRELAB_Q_ARCHIVIST_JOBS,
num_workers=workers
Expand Down Expand Up @@ -191,3 +205,80 @@ def create_background_tasks(clear_tasks: bool):
elif workers < current:
tasks = StorageTask.objects.all()[workers:]
tasks.update(burst=True)


def wait_for_services(
    wait_database: bool = True,
    wait_redis: bool = True,
    wait_archivist_storage: bool = True
):
    """Block until the requested services are reachable.

    Each flag enables the readiness check for one service; checks run
    in order: database, Redis, archivist storage.

    :param wait_database: wait for the database connection
    :param wait_redis: wait for the Redis server
    :param wait_archivist_storage: wait for the archivist storage backend
    """
    checks = (
        (wait_database, wait_database_ready),
        (wait_redis, wait_redis_ready),
        (wait_archivist_storage, wait_archivist_storage_ready),
    )
    for enabled, check in checks:
        if enabled:
            check()


def _sleep_backoff(attempt: int) -> None:
    """Pause before the next retry, doubling the delay each attempt.

    The wait is ``2 ** attempt`` seconds, capped at DEFAULT_BACKOFF_MAX.

    :param attempt: zero-based retry counter
    """
    time.sleep(min(2 ** attempt, DEFAULT_BACKOFF_MAX))


def wait_database_ready():
    """Wait for the database to be available before starting.

    Retries the default Django connection with exponential backoff.
    If the database never becomes reachable after DEFAULT_MAX_RETRIES
    attempts, the process exits with status 1.
    """

    for attempt in range(DEFAULT_MAX_RETRIES):
        try:
            db_conn = connections['default']
            # Opening a cursor forces a real connection attempt;
            # `connections['default']` itself is always truthy, so no
            # extra check is needed.
            db_conn.cursor()
            db_conn.close()
            break
        except OperationalError as e:
            click.echo(f"[{attempt}/{DEFAULT_MAX_RETRIES}] Database connection not ready {e.__cause__}")
            _sleep_backoff(attempt)
    else:
        click.echo("Failed to connect to the database")
        # sys.exit instead of the site-provided `exit` builtin, which is
        # not guaranteed to exist in every execution mode.
        sys.exit(1)


def wait_redis_ready():
    """Wait for the Redis server to be available before starting.

    Pings the connection backing the eventizer jobs queue with
    exponential backoff. If Redis never becomes reachable after
    DEFAULT_MAX_RETRIES attempts, the process exits with status 1.
    """

    for attempt in range(DEFAULT_MAX_RETRIES):
        try:
            redis_conn = django_rq.get_connection(settings.GRIMOIRELAB_Q_EVENTIZER_JOBS)
            redis_conn.ping()
            break
        except redis.exceptions.ConnectionError as e:
            click.echo(f"[{attempt}/{DEFAULT_MAX_RETRIES}] Redis connection not ready {e.__cause__}")
            _sleep_backoff(attempt)
    else:
        click.echo("Failed to connect to Redis server")
        # sys.exit instead of the site-provided `exit` builtin, which is
        # not guaranteed to exist in every execution mode.
        sys.exit(1)


def wait_archivist_storage_ready():
    """Wait for the storage to be available before starting.

    Resolves the archivist storage backend once, then pings it with
    exponential backoff. If the storage never becomes reachable after
    DEFAULT_MAX_RETRIES attempts, the process exits with status 1.
    """
    # Imported lazily — presumably to avoid a circular import at module
    # load time — but resolved once, not on every retry iteration.
    from grimoirelab.core.scheduler.tasks.archivist import get_storage_backend

    Storage = get_storage_backend(settings.GRIMOIRELAB_ARCHIVIST['STORAGE_TYPE'])
    storage = Storage(url=settings.GRIMOIRELAB_ARCHIVIST['STORAGE_URL'],
                      db_name=settings.GRIMOIRELAB_ARCHIVIST['STORAGE_INDEX'],
                      verify_certs=settings.GRIMOIRELAB_ARCHIVIST['STORAGE_VERIFY_CERT'])

    for attempt in range(DEFAULT_MAX_RETRIES):
        if storage.ping():
            break
        click.echo(f"[{attempt}/{DEFAULT_MAX_RETRIES}] Storage connection not ready")
        _sleep_backoff(attempt)
    else:
        click.echo("Failed to connect to archivist storage")
        # sys.exit instead of the site-provided `exit` builtin, which is
        # not guaranteed to exist in every execution mode.
        sys.exit(1)
30 changes: 29 additions & 1 deletion src/grimoirelab/core/scheduler/tasks/archivist.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import json
import logging
import typing

import requests
import urllib3

import redis
Expand Down Expand Up @@ -79,6 +81,7 @@ def archivist_job(
storage = Storage(url=storage_url,
db_name=storage_db_name,
verify_certs=storage_verify_certs)
storage.initialize()
events = events_consumer(rq_job.connection,
consumer_name,
events_queue,
Expand Down Expand Up @@ -277,6 +280,11 @@ def __init__(self, url: str, db_name: str, verify_certs: bool = False) -> None:
self.db_name = db_name
self.verify_certs = verify_certs

def initialize(self) -> None:
    """Set up the storage backend before first use.

    The base implementation does nothing; subclasses override it when
    they need one-time preparation work.
    """

def store(self, data: dict[str, Any]) -> int:
"""Store data in the storage backend.

Expand All @@ -291,6 +299,11 @@ def close(self) -> None:

pass

def ping(self) -> bool:
    """Check if the storage backend is up and running.

    Concrete backends must override this method; the base class does
    not know how to reach any particular storage service.
    """

    raise NotImplementedError


def get_storage_backend(storage_type: str) -> typing.Type[StorageBackend]:
"""Get the storage backend based on the type.
Expand Down Expand Up @@ -368,9 +381,13 @@ def __init__(self, url: str, db_name: str, verify_certs: bool = False) -> None:
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

self.client = OpenSearch([url], verify_certs=self.verify_certs)
self._create_index(db_name)
self.max_items_bulk = 100

def initialize(self) -> None:
    """Prepare the OpenSearch backend by creating its index."""

    index_name = self.db_name
    self._create_index(index_name)

def _create_index(self, index_name: str) -> None:
"""Create an index in the OpenSearch instance.

Expand Down Expand Up @@ -431,3 +448,14 @@ def store(self, events: iter) -> None:
new_items += self._bulk(body=bulk_json, index=self.db_name)

return new_items

def ping(self) -> bool:
    """Check if the OpenSearch instance is up and running.

    :return: True when the server answers the request with a successful
        status code; False on connection errors, timeouts, or HTTP
        error responses.
    """

    try:
        # A bounded timeout keeps a hung server from blocking the
        # caller indefinitely during startup checks.
        r = requests.get(self.url,
                         verify=self.verify_certs,
                         timeout=10)
        r.raise_for_status()
        return True
    except requests.exceptions.RequestException:
        # RequestException covers ConnectionError, HTTPError and
        # Timeout, so a slow server reports "not ready" instead of
        # crashing the probe.
        return False