Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
331 changes: 281 additions & 50 deletions maestrowf/conductor.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,19 +30,25 @@
from argparse import ArgumentParser, RawTextHelpFormatter
from filelock import FileLock, Timeout
from datetime import datetime
import getpass
import glob
import inspect
import json
import logging
import os
import shlex
import shutil
import socket
import sys
from time import sleep
import uuid
import dill
import yaml

from maestrowf.abstracts.enums import StudyStatus
from maestrowf.datastructures.core import Study
from maestrowf.utils import create_parentdir, csvtable_to_dict, make_safe_path
from maestrowf.utils import \
atomic_write_file, create_parentdir, csvtable_to_dict, make_safe_path

# Logger instantiation
ROOTLOGGER = logging.getLogger(inspect.getmodule(__name__))
Expand Down Expand Up @@ -143,6 +149,7 @@ class Conductor:
_cancel_lock = ".cancel.lock"
_study_update = ".study.update.lock"
_batch_info = "batch.info"
_conductor_registry = ".conductors"

def __init__(self, study):
"""
Expand All @@ -152,6 +159,7 @@ def __init__(self, study):
"""
self._study = study
self._setup = False
self._conductor_id = None

@property
def output_path(self):
Expand Down Expand Up @@ -219,6 +227,210 @@ def store_batch(cls, out_path, batch):
with open(path, "wb") as batch_info:
batch_info.write(yaml.dump(batch).encode("utf-8"))

@staticmethod
def _timestamp():
"""Return a UTC timestamp suitable for durable process records."""
return datetime.utcnow().isoformat() + "Z"

@staticmethod
def _get_host_addresses():
"""Return best-effort IP addresses for the current host."""
addresses = set()

for host in (socket.gethostname(), socket.getfqdn()):
try:
for entry in socket.getaddrinfo(host, None):
address = entry[4][0]
if address:
addresses.add(address)
except socket.error:
LOGGER.debug(
"Unable to resolve addresses for host '%s'.", host)

return sorted(addresses)

@classmethod
def _conductor_registry_path(cls, output_path):
return make_safe_path(output_path, "logs", cls._conductor_registry)

@classmethod
def _conductor_record_path(cls, output_path, conductor_id):
registry_path = cls._conductor_registry_path(output_path)
return make_safe_path(registry_path, "{}.json".format(conductor_id))

@classmethod
def _load_conductor_record(cls, output_path, conductor_id):
path = cls._conductor_record_path(output_path, conductor_id)
if not os.path.exists(path):
return {}

return cls._load_conductor_record_file(path)

@staticmethod
def _load_conductor_record_file(path, attempts=3, delay=0.1):
"""
Load a conductor JSON record, retrying transient filesystem errors.

Some networked filesystems can briefly report stale handles around
replace-heavy files. A short retry avoids treating that as a missing
conductor when readers race a heartbeat update.
"""
last_error = None
for attempt in range(attempts):
try:
with open(path, "r") as data:
return json.load(data) or {}
except (OSError, ValueError) as exc:
last_error = exc
if attempt + 1 < attempts:
sleep(delay * (attempt + 1))

raise last_error

@classmethod
def _store_conductor_record(cls, output_path, conductor_id, record):
path = cls._conductor_record_path(output_path, conductor_id)
contents = json.dumps(record, indent=2, sort_keys=True)
atomic_write_file(path, contents + "\n")

@classmethod
def _update_conductor_record(cls, output_path, conductor_id, updates):
record = cls._load_conductor_record(output_path, conductor_id)
record.update(updates)
cls._store_conductor_record(output_path, conductor_id, record)

return record

@classmethod
def get_conductors(cls, output_path):
"""
Retrieve recorded conductor processes for the study rooted at out_path.

:param output_path: A string containing the path to a study root.
:returns: A dictionary of conductor records keyed by conductor id.
"""
registry_path = cls._conductor_registry_path(output_path)
if not os.path.isdir(registry_path):
return {}

conductors = {}
for path in glob.glob(os.path.join(registry_path, "*.json")):
try:
record = cls._load_conductor_record_file(path)
except (OSError, ValueError):
LOGGER.warning("Unable to read conductor record '%s'.", path)
continue

conductor_id = record.get("conductor_id")
if conductor_id:
conductors[conductor_id] = record

return conductors

def _new_conductor_id(self):
hostname = socket.gethostname()
return "{}.{}.{}".format(hostname, os.getpid(), uuid.uuid4().hex[:12])

@staticmethod
def _get_conductor_executable():
"""Return the resolved conductor executable path when available."""
if not sys.argv:
return None

executable = sys.argv[0]
if os.path.isabs(executable) or os.path.dirname(executable):
return os.path.abspath(executable)

return shutil.which(executable) or executable

def register_conductor(self):
"""Register this conductor process in the study registry."""
if not self._conductor_id:
self._conductor_id = self._new_conductor_id()

now = self._timestamp()
record = {
"conductor_id": self._conductor_id,
"study_name": self.study_name,
"output_path": self.output_path,
"pid": os.getpid(),
"ppid": os.getppid(),
"hostname": socket.gethostname(),
"fqdn": socket.getfqdn(),
"addresses": self._get_host_addresses(),
"user": getpass.getuser(),
"cwd": os.getcwd(),
"argv": list(sys.argv),
"conductor_argv": list(sys.argv),
"conductor_executable": self._get_conductor_executable(),
"conductor_command": shlex.join(sys.argv),
"python_executable": sys.executable,
"status": "running",
"started_at": now,
"last_heartbeat_at": now,
"ended_at": None,
"last_status_message": "started",
"final_study_status": None,
}

self._update_conductor_record(
self.output_path, self._conductor_id, record)
LOGGER.info(
"Conductor '%s' started for study '%s' at '%s' "
"(pid=%s, host=%s, fqdn=%s, addresses=%s).",
self._conductor_id, self.study_name, self.output_path,
record["pid"], record["hostname"], record["fqdn"],
",".join(record["addresses"]))
LOGGER.info(
"Conductor '%s' command: %s",
self._conductor_id, record["conductor_command"])

return self._conductor_id

def heartbeat_conductor(self, message="running"):
"""Update this conductor process' heartbeat in the study registry."""
if not self._conductor_id:
self.register_conductor()

now = self._timestamp()
self._update_conductor_record(
self.output_path,
self._conductor_id,
{
"status": "running",
"last_heartbeat_at": now,
"last_status_message": message,
})
LOGGER.info(
"Conductor '%s' heartbeat at %s (pid=%s, host=%s): %s",
self._conductor_id, now, os.getpid(), socket.gethostname(),
message)

def finish_conductor(self, status, final_study_status=None, message=None):
"""Mark this conductor process as completed or failed."""
if not self._conductor_id:
return

now = self._timestamp()
if isinstance(final_study_status, StudyStatus):
final_study_status = final_study_status.name

self._update_conductor_record(
self.output_path,
self._conductor_id,
{
"status": status,
"ended_at": now,
"last_heartbeat_at": now,
"last_status_message": message or status,
"final_study_status": final_study_status,
})
LOGGER.info(
"Conductor '%s' marked %s at %s for study '%s' "
"(final study status=%s).",
self._conductor_id, status, now, self.study_name,
final_study_status)

@classmethod
def load_study(cls, out_path):
"""
Expand Down Expand Up @@ -331,8 +543,9 @@ def load_updated_study_exec(cls, output_path):
updated_study_config = yaml.load(data)

if updated_study_config:
LOGGER.debug("Successfully read updated study config; removing record at %s",
study_update_path)
LOGGER.debug(
"Read updated study config; removing record at %s",
study_update_path)
os.remove(study_update_path)

return updated_study_config
Expand Down Expand Up @@ -361,6 +574,7 @@ def initialize(self, batch_info, sleeptime=60):
self._exec_dag.set_adapter(batch_info)
self._study.store_metadata()
self._setup = True
self.register_conductor()

def monitor_study(self):
"""Monitor a running study."""
Expand All @@ -374,7 +588,8 @@ def monitor_study(self):

# Set some fixed variables that monitor will use.
cancel_lock_path = make_safe_path(self.output_path, self._cancel_lock)
study_update_path = make_safe_path(self.output_path, self._study_update)
study_update_path = make_safe_path(
self.output_path, self._study_update)
dag = self._exec_dag
pkl_path = \
os.path.join(self._pkl_path, "{}.pkl".format(self._study.name))
Expand All @@ -389,52 +604,68 @@ def monitor_study(self):
pkl_path, cancel_lock_path, sleep_time)

completion_status = StudyStatus.RUNNING
while completion_status == StudyStatus.RUNNING:
if os.path.exists(cancel_lock_path):
# cancel the study if a cancel lock file is found
cancel_lock = FileLock(cancel_lock_path)
try:
with cancel_lock.acquire(timeout=10):
# we have the lock
dag.cancel_study()
os.remove(cancel_lock_path)
LOGGER.info("Study '%s' has been cancelled.", dag.name)
except Timeout:
LOGGER.error("Failed to acquire cancellation lock.")
pass

if os.path.exists(study_update_path):
updated_study_config = self.load_updated_study_exec(self.output_path)

if "throttle" in updated_study_config and updated_study_config["throttle"]:
LOGGER.info("Updating throttle from %d to %d",
dag._submission_throttle, # NOTE: make a property?
updated_study_config["throttle"])
dag.update_throttle(updated_study_config["throttle"])

if "rlimit" in updated_study_config and updated_study_config["rlimit"]:
LOGGER.info("Updating restart limit to %d",
updated_study_config["rlimit"])
dag.update_rlimit(updated_study_config["rlimit"])

if "sleep" in updated_study_config and updated_study_config["sleep"]:
LOGGER.info("Updating conductor sleep time from %s to %s",
str(self.sleep_time),
str(updated_study_config["sleep"]))
self.sleep_time = updated_study_config["sleep"]

LOGGER.info("Checking DAG status at %s", str(datetime.now()))
# Execute steps that are ready
# Receives StudyStatus enum
completion_status = dag.execute_ready_steps()
# Re-pickle the ExecutionGraph.
dag.pickle(pkl_path)
# Write out the state
dag.write_status(os.path.split(pkl_path)[0])
# Sleep for SLEEPTIME in args if study not complete.
if completion_status == StudyStatus.RUNNING:
sleep(sleep_time)

try:
while completion_status == StudyStatus.RUNNING:
if os.path.exists(cancel_lock_path):
# cancel the study if a cancel lock file is found
cancel_lock = FileLock(cancel_lock_path)
try:
with cancel_lock.acquire(timeout=10):
# we have the lock
dag.cancel_study()
os.remove(cancel_lock_path)
LOGGER.info("Study '%s' has been cancelled.", dag.name)
except Timeout:
LOGGER.error("Failed to acquire cancellation lock.")
pass

if os.path.exists(study_update_path):
updated_study_config = \
self.load_updated_study_exec(self.output_path)

if "throttle" in updated_study_config and \
updated_study_config["throttle"]:
LOGGER.info("Updating throttle from %d to %d",
dag._submission_throttle,
updated_study_config["throttle"])
dag.update_throttle(updated_study_config["throttle"])

if "rlimit" in updated_study_config and \
updated_study_config["rlimit"]:
LOGGER.info("Updating restart limit to %d",
updated_study_config["rlimit"])
dag.update_rlimit(updated_study_config["rlimit"])

if "sleep" in updated_study_config and \
updated_study_config["sleep"]:
LOGGER.info(
"Updating conductor sleep time from %s to %s",
str(self.sleep_time),
str(updated_study_config["sleep"]))
self.sleep_time = updated_study_config["sleep"]

msg = "Checking DAG status at {}".format(str(datetime.now()))
self.heartbeat_conductor(msg)
LOGGER.info(msg)
# Execute steps that are ready
# Receives StudyStatus enum
completion_status = dag.execute_ready_steps()
# Re-pickle the ExecutionGraph.
dag.pickle(pkl_path)
# Write out the state
dag.write_status(os.path.split(pkl_path)[0])
# Sleep for SLEEPTIME in args if study not complete.
if completion_status == StudyStatus.RUNNING:
sleep(self.sleep_time)
except Exception:
self.finish_conductor(
"failed", completion_status,
"monitoring failed with an exception")
raise

self.finish_conductor(
"completed", completion_status,
"study completed with state '{}'".format(completion_status))
return completion_status

def cleanup(self):
Expand Down
Loading
Loading