Skip to content

Commit f934455

Browse files
committed
Merge branch 'tickets/DM-51389'
2 parents e2e61bb + 52a8711 commit f934455

File tree

3 files changed

+26
-9
lines changed

3 files changed

+26
-9
lines changed

python/activator/activator.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,8 @@
8989
bucket_notification_kafka_offset_reset = os.environ.get("BUCKET_NOTIFICATION_KAFKA_OFFSET_RESET", "latest")
9090
# Max requests to handle before restarting. 0 means no limit.
9191
max_requests = int(os.environ.get("WORKER_RESTART_FREQ", 0))
92+
# The number of seconds to delay retrying connections to the Redis stream.
93+
redis_retry = float(os.environ.get("REDIS_RETRY_DELAY", 30))
9294

9395
# Conditionally load keda environment variables
9496
if platform == "keda":
@@ -234,7 +236,12 @@ def _make_redis_streams_client(self):
234236
redis_client : `redis.Redis`
235237
Initialized Redis client.
236238
"""
237-
return redis.Redis(host=self.host)
239+
policy = redis.retry.Retry(redis.backoff.ConstantBackoff(redis_retry),
240+
1,
241+
# Bare ConnectionError covers things like DNS problems
242+
(redis.exceptions.ConnectionError, ),
243+
)
244+
return redis.Redis(host=self.host, retry=policy)
238245

239246
@staticmethod
240247
def _close_on_error(func):

python/activator/middleware_interface.py

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,13 @@
8484
repo_retry = float(os.environ.get("REPO_RETRY_DELAY", 30))
8585

8686

87-
@connect.retry(2, sqlalchemy.exc.OperationalError, wait=repo_retry)
87+
# TODO: revisit which cases should be retried after DM-50934
88+
# TODO: catch ButlerConnectionError once it's available
89+
SQL_EXCEPTIONS = (sqlalchemy.exc.OperationalError, sqlalchemy.exc.InterfaceError)
90+
DATASTORE_EXCEPTIONS = SQL_EXCEPTIONS + (botocore.exceptions.ClientError, )
91+
92+
93+
@connect.retry(2, SQL_EXCEPTIONS, wait=repo_retry)
8894
def get_central_butler(central_repo: str, instrument_class: str):
8995
"""Provide a Butler that can access the given repository and read and write
9096
data for the given instrument.
@@ -360,7 +366,7 @@ def _init_visit_definer(self):
360366
define_visits_config.groupExposures = "one-to-one"
361367
self.define_visits = lsst.obs.base.DefineVisitsTask(config=define_visits_config, butler=self.butler)
362368

363-
@connect.retry(2, (sqlalchemy.exc.OperationalError, botocore.exceptions.ClientError), wait=repo_retry)
369+
@connect.retry(2, DATASTORE_EXCEPTIONS, wait=repo_retry)
364370
def _init_governor_datasets(self, timestamp, skymap):
365371
"""Load and store the camera and skymap for later use.
366372
@@ -537,7 +543,7 @@ def prep_butler(self) -> None:
537543
detector=self.visit.detector,
538544
group=self.visit.groupId)
539545

540-
@connect.retry(2, sqlalchemy.exc.OperationalError, wait=repo_retry)
546+
@connect.retry(2, SQL_EXCEPTIONS, wait=repo_retry)
541547
def _find_data_to_preload(self, region):
542548
"""Identify the datasets to export from the central repo.
543549
@@ -912,7 +918,7 @@ def _find_init_outputs(self):
912918
_log.debug("Found %d new init-output datasets from %s.", n_datasets, run)
913919
return datasets
914920

915-
@connect.retry(2, (sqlalchemy.exc.OperationalError, botocore.exceptions.ClientError), wait=repo_retry)
921+
@connect.retry(2, DATASTORE_EXCEPTIONS, wait=repo_retry)
916922
def _transfer_data(self, datasets, calibs):
917923
"""Transfer datasets and all associated collections from the central
918924
repo to the local repo.
@@ -1602,7 +1608,7 @@ def _get_safe_dataset_types(butler):
16021608
return [dstype.name for dstype in butler.registry.queryDatasetTypes(...)
16031609
if "detector" in dstype.dimensions]
16041610

1605-
@connect.retry(2, (sqlalchemy.exc.OperationalError, botocore.exceptions.ClientError), wait=repo_retry)
1611+
@connect.retry(2, DATASTORE_EXCEPTIONS, wait=repo_retry)
16061612
def _export_subset(self, exposure_ids: set[int],
16071613
dataset_types: typing.Any, in_collections: typing.Any) -> None:
16081614
"""Copy datasets associated with a processing run back to the

python/initializer/write_init_outputs.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,10 @@
4949
repo_retry = float(os.environ.get("REPO_RETRY_DELAY", 30))
5050

5151

52+
SQL_EXCEPTIONS = (sqlalchemy.exc.OperationalError, sqlalchemy.exc.InterfaceError)
53+
DATASTORE_EXCEPTIONS = SQL_EXCEPTIONS + (botocore.exceptions.ClientError, )
54+
55+
5256
def _config_from_yaml(yaml_string):
5357
"""Initialize a PipelinesConfig from a YAML-formatted string.
5458
@@ -79,7 +83,7 @@ def make_parser():
7983
return parser
8084

8185

82-
@connect_utils.retry(2, sqlalchemy.exc.OperationalError, wait=repo_retry)
86+
@connect_utils.retry(2, SQL_EXCEPTIONS, wait=repo_retry)
8387
def _connect_butler(repo):
8488
"""Connect to a particular repo.
8589
@@ -146,7 +150,7 @@ def _get_current_day_obs() -> str:
146150
return run_utils.get_day_obs(astropy.time.Time.now())
147151

148152

149-
@connect_utils.retry(2, (sqlalchemy.exc.OperationalError, botocore.exceptions.ClientError), wait=repo_retry)
153+
@connect_utils.retry(2, DATASTORE_EXCEPTIONS, wait=repo_retry)
150154
def _make_init_outputs(base_butler: lsst.daf.butler.Butler,
151155
instrument: lsst.obs.base.Instrument,
152156
apdb: str,
@@ -200,7 +204,7 @@ def _make_init_outputs(base_butler: lsst.daf.butler.Butler,
200204
return run
201205

202206

203-
@connect_utils.retry(2, sqlalchemy.exc.OperationalError, wait=repo_retry)
207+
@connect_utils.retry(2, SQL_EXCEPTIONS, wait=repo_retry)
204208
def _make_output_chain(butler: lsst.daf.butler.Butler,
205209
instrument: lsst.obs.base.Instrument,
206210
runs: collections.abc.Sequence[str],

0 commit comments

Comments
 (0)