Skip to content

Commit 95f8edd

Browse files
committed
[cmd] Check services are up when the platform starts
The server, producer and consumers wait until the required services are up before starting. If those services are still not available after 10 retries, the command fails to start.

Signed-off-by: Jose Javier Merchante <[email protected]>
1 parent 5323462 commit 95f8edd

File tree

2 files changed

+120
-1
lines changed

2 files changed

+120
-1
lines changed

src/grimoirelab/core/runner/commands/run.py

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,18 +19,26 @@
1919
from __future__ import annotations
2020

2121
import os
22+
import time
2223
import typing
2324

2425
import click
2526
import django.core
2627
import django.core.wsgi
28+
import django_rq
29+
import redis.exceptions
2730

2831
from django.conf import settings
32+
from django.db import connections, OperationalError
2933

3034
if typing.TYPE_CHECKING:
3135
from click import Context
3236

3337

38+
# Upper bound, in seconds, for the exponential backoff sleep between retries
DEFAULT_BACKOFF_MAX = 120
39+
# Number of connection attempts made for each service before giving up
DEFAULT_MAX_RETRIES = 10
40+
41+
3442
@click.group()
3543
@click.pass_context
3644
def run(ctx: Context):
@@ -60,6 +68,8 @@ def server(ctx: Context, devel: bool, clear_tasks: bool):
6068
should be run with a reverse proxy. If you activate the '--dev' flag,
6169
a HTTP server will be run instead.
6270
"""
71+
wait_for_services(wait_archivist_storage=False)
72+
6373
env = os.environ
6474

6575
env["UWSGI_ENV"] = f"DJANGO_SETTINGS_MODULE={ctx.obj['cfg']}"
@@ -114,6 +124,8 @@ def eventizers(workers: int):
114124
Workers get jobs from the GRIMOIRELAB_Q_EVENTIZER_JOBS queue defined
115125
in the configuration file.
116126
"""
127+
wait_for_services(wait_archivist_storage=False)
128+
117129
django.core.management.call_command(
118130
'rqworker-pool', settings.GRIMOIRELAB_Q_EVENTIZER_JOBS,
119131
num_workers=workers
@@ -137,6 +149,8 @@ def archivists(workers: int):
137149
Workers get jobs from the GRIMOIRELAB_Q_ARCHIVIST_JOBS queue defined
138150
in the configuration file.
139151
"""
152+
wait_for_services()
153+
140154
django.core.management.call_command(
141155
'rqworker-pool', settings.GRIMOIRELAB_Q_ARCHIVIST_JOBS,
142156
num_workers=workers
@@ -191,3 +205,80 @@ def create_background_tasks(clear_tasks: bool):
191205
elif workers < current:
192206
tasks = StorageTask.objects.all()[workers:]
193207
tasks.update(burst=True)
208+
209+
210+
def wait_for_services(
    wait_database: bool = True,
    wait_redis: bool = True,
    wait_archivist_storage: bool = True
):
    """Block until the selected services answer.

    The checks run in a fixed order: database, Redis and archivist
    storage. A check is skipped when its flag is false.

    :param wait_database: check the database connection
    :param wait_redis: check the Redis connection
    :param wait_archivist_storage: check the archivist storage
    """
    checks = (
        (wait_database, wait_database_ready),
        (wait_redis, wait_redis_ready),
        (wait_archivist_storage, wait_archivist_storage_ready),
    )

    for enabled, probe in checks:
        if enabled:
            probe()
225+
226+
227+
def _sleep_backoff(attempt: int) -> None:
    """Pause for `2 ** attempt` seconds, capped at DEFAULT_BACKOFF_MAX.

    :param attempt: exponent used to compute the delay
    """
    delay = 2 ** attempt
    if delay > DEFAULT_BACKOFF_MAX:
        delay = DEFAULT_BACKOFF_MAX

    time.sleep(delay)
232+
233+
234+
def wait_database_ready():
    """Wait for the database to be available before starting.

    A cursor is opened on the 'default' Django connection up to
    DEFAULT_MAX_RETRIES times, sleeping with exponential backoff
    between failed attempts. When every attempt fails, an error
    message is printed and the process exits with status 1.

    :raises SystemExit: when the database cannot be reached after
        all the attempts
    """
    # Attempts are numbered from 1 so the progress message reads
    # "[1/N]" ... "[N/N]" instead of "[0/N]" ... "[N-1/N]".
    for attempt in range(1, DEFAULT_MAX_RETRIES + 1):
        db_conn = connections['default']
        try:
            # Opening a cursor forces a real connection; the context
            # manager closes the cursor right away.
            with db_conn.cursor():
                pass
        except OperationalError as e:
            # The wrapped driver error may be missing; fall back to
            # the Django exception so the message never shows "None".
            reason = e.__cause__ or e
            click.echo(f"[{attempt}/{DEFAULT_MAX_RETRIES}] Database connection not ready {reason}")
            # No point in sleeping when there are no attempts left.
            if attempt < DEFAULT_MAX_RETRIES:
                _sleep_backoff(attempt - 1)
        else:
            db_conn.close()
            return

    click.echo("Failed to connect to the database")
    # 'exit()' is a helper injected by the 'site' module and it is
    # not guaranteed to exist; raising SystemExit is equivalent.
    raise SystemExit(1)
250+
251+
252+
def wait_redis_ready():
    """Wait for the Redis server to be available before starting.

    The connection of the GRIMOIRELAB_Q_EVENTIZER_JOBS queue is
    pinged up to DEFAULT_MAX_RETRIES times, sleeping with exponential
    backoff between failed attempts. When every attempt fails, an
    error message is printed and the process exits with status 1.

    :raises SystemExit: when Redis cannot be reached after all
        the attempts
    """
    # Attempts are numbered from 1 so the progress message reads
    # "[1/N]" ... "[N/N]" instead of "[0/N]" ... "[N-1/N]".
    for attempt in range(1, DEFAULT_MAX_RETRIES + 1):
        try:
            redis_conn = django_rq.get_connection(settings.GRIMOIRELAB_Q_EVENTIZER_JOBS)
            redis_conn.ping()
        except redis.exceptions.ConnectionError as e:
            # The chained cause may be missing; fall back to the
            # exception itself so the message never shows "None".
            reason = e.__cause__ or e
            click.echo(f"[{attempt}/{DEFAULT_MAX_RETRIES}] Redis connection not ready {reason}")
            # No point in sleeping when there are no attempts left.
            if attempt < DEFAULT_MAX_RETRIES:
                _sleep_backoff(attempt - 1)
        else:
            return

    click.echo("Failed to connect to Redis server")
    # 'exit()' is a helper injected by the 'site' module and it is
    # not guaranteed to exist; raising SystemExit is equivalent.
    raise SystemExit(1)
265+
266+
267+
def wait_archivist_storage_ready():
    """Wait for the archivist storage to be available before starting.

    The storage backend configured in GRIMOIRELAB_ARCHIVIST is pinged
    up to DEFAULT_MAX_RETRIES times, sleeping with exponential backoff
    between failed attempts. When every attempt fails, an error
    message is printed and the process exits with status 1.

    :raises SystemExit: when the storage cannot be reached after
        all the attempts
    """
    # NOTE(review): kept as a function-level import, as it was in the
    # original code; presumably this avoids loading the scheduler tasks
    # at module import time - confirm before moving it to the top.
    from grimoirelab.core.scheduler.tasks.archivist import get_storage_backend

    # The backend object does not depend on the attempt number, so it
    # is built once and only the ping is retried.
    archivist_cfg = settings.GRIMOIRELAB_ARCHIVIST
    Storage = get_storage_backend(archivist_cfg['STORAGE_TYPE'])
    storage = Storage(url=archivist_cfg['STORAGE_URL'],
                      db_name=archivist_cfg['STORAGE_INDEX'],
                      verify_certs=archivist_cfg['STORAGE_VERIFY_CERT'])

    # Attempts are numbered from 1 so the progress message reads
    # "[1/N]" ... "[N/N]" instead of "[0/N]" ... "[N-1/N]".
    for attempt in range(1, DEFAULT_MAX_RETRIES + 1):
        if storage.ping():
            return

        click.echo(f"[{attempt}/{DEFAULT_MAX_RETRIES}] Storage connection not ready")
        # No point in sleeping when there are no attempts left.
        if attempt < DEFAULT_MAX_RETRIES:
            _sleep_backoff(attempt - 1)

    click.echo("Failed to connect to archivist storage")
    # 'exit()' is a helper injected by the 'site' module and it is
    # not guaranteed to exist; raising SystemExit is equivalent.
    raise SystemExit(1)

src/grimoirelab/core/scheduler/tasks/archivist.py

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
import json
2323
import logging
2424
import typing
25+
26+
import requests
2527
import urllib3
2628

2729
import redis
@@ -79,6 +81,7 @@ def archivist_job(
7981
storage = Storage(url=storage_url,
8082
db_name=storage_db_name,
8183
verify_certs=storage_verify_certs)
84+
storage.initialize()
8285
events = events_consumer(rq_job.connection,
8386
consumer_name,
8487
events_queue,
@@ -277,6 +280,11 @@ def __init__(self, url: str, db_name: str, verify_certs: bool = False) -> None:
277280
self.db_name = db_name
278281
self.verify_certs = verify_certs
279282

283+
def initialize(self) -> None:
    """Prepare the storage backend for use.

    This implementation performs no work.
    """
    return None
287+
280288
def store(self, data: dict[str, Any]) -> int:
281289
"""Store data in the storage backend.
282290
@@ -291,6 +299,11 @@ def close(self) -> None:
291299

292300
pass
293301

302+
def ping(self) -> bool:
    """Tell whether the storage backend is up and running.

    :raises NotImplementedError: always; this implementation
        provides no check
    """
    raise NotImplementedError()
306+
294307

295308
def get_storage_backend(storage_type: str) -> typing.Type[StorageBackend]:
296309
"""Get the storage backend based on the type.
@@ -368,9 +381,13 @@ def __init__(self, url: str, db_name: str, verify_certs: bool = False) -> None:
368381
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
369382

370383
self.client = OpenSearch([url], verify_certs=self.verify_certs)
371-
self._create_index(db_name)
372384
self.max_items_bulk = 100
373385

386+
def initialize(self) -> None:
    """Set up the OpenSearch instance.

    Asks `_create_index` to create the index named after `db_name`.
    """
    index_name = self.db_name
    self._create_index(index_name)
390+
374391
def _create_index(self, index_name: str) -> None:
375392
"""Create an index in the OpenSearch instance.
376393
@@ -431,3 +448,14 @@ def store(self, events: iter) -> None:
431448
new_items += self._bulk(body=bulk_json, index=self.db_name)
432449

433450
return new_items
451+
452+
def ping(self, timeout: float = 5.0) -> bool:
    """Check if the OpenSearch instance is up and running.

    Sends a GET request to the storage URL and reports whether
    it was answered with a successful HTTP status.

    :param timeout: seconds to wait for the server before
        considering it unavailable

    :returns: `True` when the request succeeds; `False` on a
        connection error, a timeout or an HTTP error status
    """
    try:
        # Without a timeout, a server that accepts the connection
        # but never answers would block the caller forever.
        r = requests.get(self.url,
                         verify=self.verify_certs,
                         timeout=timeout)
        r.raise_for_status()
    except (requests.exceptions.ConnectionError,
            requests.exceptions.Timeout,
            requests.exceptions.HTTPError):
        return False

    return True

0 commit comments

Comments
 (0)