From f2c3c9a22c2395f9a14335f122e98d5899f83d4a Mon Sep 17 00:00:00 2001 From: Jeremy White <44277022+jwhite242@users.noreply.github.com> Date: Mon, 18 May 2026 17:47:50 -0700 Subject: [PATCH 1/2] Add logging of conductor processes --- maestrowf/conductor.py | 331 ++++++++++++++++++++++++++----- maestrowf/utils.py | 55 +++++ tests/test_conductor_tracking.py | 139 +++++++++++++ 3 files changed, 475 insertions(+), 50 deletions(-) create mode 100644 tests/test_conductor_tracking.py diff --git a/maestrowf/conductor.py b/maestrowf/conductor.py index 64711ffe..8c9a32ff 100644 --- a/maestrowf/conductor.py +++ b/maestrowf/conductor.py @@ -30,19 +30,25 @@ from argparse import ArgumentParser, RawTextHelpFormatter from filelock import FileLock, Timeout from datetime import datetime +import getpass import glob import inspect +import json import logging import os +import shlex import shutil +import socket import sys from time import sleep +import uuid import dill import yaml from maestrowf.abstracts.enums import StudyStatus from maestrowf.datastructures.core import Study -from maestrowf.utils import create_parentdir, csvtable_to_dict, make_safe_path +from maestrowf.utils import \ + atomic_write_file, create_parentdir, csvtable_to_dict, make_safe_path # Logger instantiation ROOTLOGGER = logging.getLogger(inspect.getmodule(__name__)) @@ -143,6 +149,7 @@ class Conductor: _cancel_lock = ".cancel.lock" _study_update = ".study.update.lock" _batch_info = "batch.info" + _conductor_registry = ".conductors" def __init__(self, study): """ @@ -152,6 +159,7 @@ def __init__(self, study): """ self._study = study self._setup = False + self._conductor_id = None @property def output_path(self): @@ -219,6 +227,210 @@ def store_batch(cls, out_path, batch): with open(path, "wb") as batch_info: batch_info.write(yaml.dump(batch).encode("utf-8")) + @staticmethod + def _timestamp(): + """Return a UTC timestamp suitable for durable process records.""" + return datetime.utcnow().isoformat() + "Z" + + @staticmethod + def _get_host_addresses(): + """Return best-effort IP addresses for the current host.""" + addresses = set() + + for host in (socket.gethostname(), socket.getfqdn()): + try: + for entry in socket.getaddrinfo(host, None): + address = entry[4][0] + if address: + addresses.add(address) + except socket.error: + LOGGER.debug( + "Unable to resolve addresses for host '%s'.", host) + + return sorted(addresses) + + @classmethod + def _conductor_registry_path(cls, output_path): + return make_safe_path(output_path, "logs", cls._conductor_registry) + + @classmethod + def _conductor_record_path(cls, output_path, conductor_id): + registry_path = cls._conductor_registry_path(output_path) + return make_safe_path(registry_path, "{}.json".format(conductor_id)) + + @classmethod + def _load_conductor_record(cls, output_path, conductor_id): + path = cls._conductor_record_path(output_path, conductor_id) + if not os.path.exists(path): + return {} + + return cls._load_conductor_record_file(path) + + @staticmethod + def _load_conductor_record_file(path, attempts=3, delay=0.1): + """ + Load a conductor JSON record, retrying transient filesystem errors. + + Some networked filesystems can briefly report stale handles around + replace-heavy files. A short retry avoids treating that as a missing + conductor when readers race a heartbeat update. + """ + last_error = None + for attempt in range(attempts): + try: + with open(path, "r") as data: + return json.load(data) or {} + except (OSError, ValueError) as exc: + last_error = exc + if attempt + 1 < attempts: + sleep(delay * (attempt + 1)) + + raise last_error + + @classmethod + def _store_conductor_record(cls, output_path, conductor_id, record): + path = cls._conductor_record_path(output_path, conductor_id) + contents = json.dumps(record, indent=2, sort_keys=True) + atomic_write_file(path, contents + "\n") + + @classmethod + def _update_conductor_record(cls, output_path, conductor_id, updates): + record = cls._load_conductor_record(output_path, conductor_id) + record.update(updates) + cls._store_conductor_record(output_path, conductor_id, record) + + return record + + @classmethod + def get_conductors(cls, output_path): + """ + Retrieve recorded conductor processes for the study rooted at out_path. + + :param output_path: A string containing the path to a study root. + :returns: A dictionary of conductor records keyed by conductor id. + """ + registry_path = cls._conductor_registry_path(output_path) + if not os.path.isdir(registry_path): + return {} + + conductors = {} + for path in glob.glob(os.path.join(registry_path, "*.json")): + try: + record = cls._load_conductor_record_file(path) + except (OSError, ValueError): + LOGGER.warning("Unable to read conductor record '%s'.", path) + continue + + conductor_id = record.get("conductor_id") + if conductor_id: + conductors[conductor_id] = record + + return conductors + + def _new_conductor_id(self): + hostname = socket.gethostname() + return "{}.{}.{}".format(hostname, os.getpid(), uuid.uuid4().hex[:12]) + + @staticmethod + def _get_conductor_executable(): + """Return the resolved conductor executable path when available.""" + if not sys.argv: + return None + + executable = sys.argv[0] + if os.path.isabs(executable) or os.path.dirname(executable): + return os.path.abspath(executable) + + return shutil.which(executable) or executable + + def register_conductor(self): + """Register this conductor process in the study registry.""" + if not self._conductor_id: + self._conductor_id = self._new_conductor_id() + + now = self._timestamp() + record = { + "conductor_id": self._conductor_id, + "study_name": self.study_name, + "output_path": self.output_path, + "pid": os.getpid(), + "ppid": os.getppid(), + "hostname": socket.gethostname(), + "fqdn": socket.getfqdn(), + "addresses": self._get_host_addresses(), + "user": getpass.getuser(), + "cwd": os.getcwd(), + "argv": list(sys.argv), + "conductor_argv": list(sys.argv), + "conductor_executable": self._get_conductor_executable(), + "conductor_command": shlex.join(sys.argv), + "python_executable": sys.executable, + "status": "running", + "started_at": now, + "last_heartbeat_at": now, + "ended_at": None, + "last_status_message": "started", + "final_study_status": None, + } + + self._update_conductor_record( + self.output_path, self._conductor_id, record) + LOGGER.info( + "Conductor '%s' started for study '%s' at '%s' " + "(pid=%s, host=%s, fqdn=%s, addresses=%s).", + self._conductor_id, self.study_name, self.output_path, + record["pid"], record["hostname"], record["fqdn"], + ",".join(record["addresses"])) + LOGGER.info( + "Conductor '%s' command: %s", + self._conductor_id, record["conductor_command"]) + + return self._conductor_id + + def heartbeat_conductor(self, message="running"): + """Update this conductor process' heartbeat in the study registry.""" + if not self._conductor_id: + self.register_conductor() + + now = self._timestamp() + self._update_conductor_record( + self.output_path, + self._conductor_id, + { + "status": "running", + "last_heartbeat_at": now, + "last_status_message": message, + }) + LOGGER.info( + "Conductor '%s' heartbeat at %s (pid=%s, host=%s): %s", + self._conductor_id, now, os.getpid(), socket.gethostname(), + message) + + def finish_conductor(self, status, final_study_status=None, message=None): + """Mark this conductor process as completed or failed.""" + if not self._conductor_id: + return + + now = self._timestamp() + if isinstance(final_study_status, StudyStatus): + final_study_status = final_study_status.name + + self._update_conductor_record( + self.output_path, + self._conductor_id, + { + "status": status, + "ended_at": now, + "last_heartbeat_at": now, + "last_status_message": message or status, + "final_study_status": final_study_status, + }) + LOGGER.info( + "Conductor '%s' marked %s at %s for study '%s' " + "(final study status=%s).", + self._conductor_id, status, now, self.study_name, + final_study_status) + @classmethod def load_study(cls, out_path): """ @@ -331,8 +543,9 @@ def load_updated_study_exec(cls, output_path): updated_study_config = yaml.load(data) if updated_study_config: - LOGGER.debug("Successfully read updated study config; removing record at %s", - study_update_path) + LOGGER.debug( + "Read updated study config; removing record at %s", + study_update_path) os.remove(study_update_path) return updated_study_config @@ -361,6 +574,7 @@ def initialize(self, batch_info, sleeptime=60): self._exec_dag.set_adapter(batch_info) self._study.store_metadata() self._setup = True + self.register_conductor() def monitor_study(self): """Monitor a running study.""" @@ -374,7 +588,8 @@ def monitor_study(self): # Set some fixed variables that monitor will use. cancel_lock_path = make_safe_path(self.output_path, self._cancel_lock) - study_update_path = make_safe_path(self.output_path, self._study_update) + study_update_path = make_safe_path( + self.output_path, self._study_update) dag = self._exec_dag pkl_path = \ os.path.join(self._pkl_path, "{}.pkl".format(self._study.name)) @@ -389,52 +604,68 @@ def monitor_study(self): pkl_path, cancel_lock_path, sleep_time) completion_status = StudyStatus.RUNNING - while completion_status == StudyStatus.RUNNING: - if os.path.exists(cancel_lock_path): - # cancel the study if a cancel lock file is found - cancel_lock = FileLock(cancel_lock_path) - try: - with cancel_lock.acquire(timeout=10): - # we have the lock - dag.cancel_study() - os.remove(cancel_lock_path) - LOGGER.info("Study '%s' has been cancelled.", dag.name) - except Timeout: - LOGGER.error("Failed to acquire cancellation lock.") - pass - - if os.path.exists(study_update_path): - updated_study_config = self.load_updated_study_exec(self.output_path) - - if "throttle" in updated_study_config and updated_study_config["throttle"]: - LOGGER.info("Updating throttle from %d to %d", - dag._submission_throttle, # NOTE: make a property? - updated_study_config["throttle"]) - dag.update_throttle(updated_study_config["throttle"]) - - if "rlimit" in updated_study_config and updated_study_config["rlimit"]: - LOGGER.info("Updating restart limit to %d", - updated_study_config["rlimit"]) - dag.update_rlimit(updated_study_config["rlimit"]) - - if "sleep" in updated_study_config and updated_study_config["sleep"]: - LOGGER.info("Updating conductor sleep time from %s to %s", - str(self.sleep_time), - str(updated_study_config["sleep"])) - self.sleep_time = updated_study_config["sleep"] - - LOGGER.info("Checking DAG status at %s", str(datetime.now())) - # Execute steps that are ready - # Receives StudyStatus enum - completion_status = dag.execute_ready_steps() - # Re-pickle the ExecutionGraph. - dag.pickle(pkl_path) - # Write out the state - dag.write_status(os.path.split(pkl_path)[0]) - # Sleep for SLEEPTIME in args if study not complete. - if completion_status == StudyStatus.RUNNING: - sleep(sleep_time) - + try: + while completion_status == StudyStatus.RUNNING: + if os.path.exists(cancel_lock_path): + # cancel the study if a cancel lock file is found + cancel_lock = FileLock(cancel_lock_path) + try: + with cancel_lock.acquire(timeout=10): + # we have the lock + dag.cancel_study() + os.remove(cancel_lock_path) + LOGGER.info("Study '%s' has been cancelled.", dag.name) + except Timeout: + LOGGER.error("Failed to acquire cancellation lock.") + pass + + if os.path.exists(study_update_path): + updated_study_config = \ + self.load_updated_study_exec(self.output_path) + + if "throttle" in updated_study_config and \ + updated_study_config["throttle"]: + LOGGER.info("Updating throttle from %d to %d", + dag._submission_throttle, + updated_study_config["throttle"]) + dag.update_throttle(updated_study_config["throttle"]) + + if "rlimit" in updated_study_config and \ + updated_study_config["rlimit"]: + LOGGER.info("Updating restart limit to %d", + updated_study_config["rlimit"]) + dag.update_rlimit(updated_study_config["rlimit"]) + + if "sleep" in updated_study_config and \ + updated_study_config["sleep"]: + LOGGER.info( + "Updating conductor sleep time from %s to %s", + str(self.sleep_time), + str(updated_study_config["sleep"])) + self.sleep_time = updated_study_config["sleep"] + + msg = "Checking DAG status at {}".format(str(datetime.now())) + self.heartbeat_conductor(msg) + LOGGER.info(msg) + # Execute steps that are ready + # Receives StudyStatus enum + completion_status = dag.execute_ready_steps() + # Re-pickle the ExecutionGraph. + dag.pickle(pkl_path) + # Write out the state + dag.write_status(os.path.split(pkl_path)[0]) + # Sleep for SLEEPTIME in args if study not complete. + if completion_status == StudyStatus.RUNNING: + sleep(self.sleep_time) + except Exception: + self.finish_conductor( + "failed", completion_status, + "monitoring failed with an exception") + raise + + self.finish_conductor( + "completed", completion_status, + "study completed with state '{}'".format(completion_status)) return completion_status def cleanup(self): diff --git a/maestrowf/utils.py b/maestrowf/utils.py index 89b3cd13..ae1f8b25 100644 --- a/maestrowf/utils.py +++ b/maestrowf/utils.py @@ -36,6 +36,7 @@ import logging import os import string +import tempfile from subprocess import PIPE, Popen from six.moves.urllib.request import urlopen from six.moves.urllib.error import HTTPError, URLError @@ -134,6 +135,59 @@ def create_parentdir(path): os.makedirs(path) +def atomic_write_file(path, contents, mode="w", encoding="utf-8"): + """ + Write contents to path using a same-directory temp file and os.replace. + + The target path is never opened for writing directly. If the process dies + before replacement, the previous target file remains in place. This helper + is intended for shared filesystems where readers should see either the old + complete file or the new complete file. + + :param path: Path to the file to write. + :param contents: String or bytes to write. + :param mode: File mode for writing contents. + :param encoding: Text encoding to use when mode is text. + """ + directory = os.path.abspath(os.path.dirname(path) or ".") + create_parentdir(directory) + + fd, tmp_path = tempfile.mkstemp( + prefix="{}.tmp.".format(os.path.basename(path)), + dir=directory) + try: + open_kwargs = {} + if "b" not in mode: + open_kwargs["encoding"] = encoding + + with os.fdopen(fd, mode, **open_kwargs) as tmp_file: + fd = None + tmp_file.write(contents) + tmp_file.flush() + os.fsync(tmp_file.fileno()) + + os.replace(tmp_path, path) + try: + dir_fd = os.open(directory, getattr(os, "O_DIRECTORY", 0)) + try: + os.fsync(dir_fd) + finally: + os.close(dir_fd) + except OSError: + LOGGER.debug("Unable to fsync directory '%s'.", directory) + except Exception: + if fd is not None: + try: + os.close(fd) + except OSError: + pass + try: + os.remove(tmp_path) + except OSError: + pass + raise + + def apply_function(item, func): """ Apply a function to items depending on type. @@ -340,6 +394,7 @@ def add_file_handler(self, log_path, log_format, log_lvl=2): fh.setLevel(self.map_level(log_lvl)) fh.setFormatter(formatter) self._logger.addHandler(fh) + return fh @staticmethod def map_level(log_lvl): diff --git a/tests/test_conductor_tracking.py b/tests/test_conductor_tracking.py new file mode 100644 index 00000000..1d73d40a --- /dev/null +++ b/tests/test_conductor_tracking.py @@ -0,0 +1,139 @@ +import pytest + +from maestrowf.abstracts.enums import StudyStatus +from maestrowf.conductor import Conductor +from maestrowf.utils import atomic_write_file + + +class DummyStudy: + name = "dummy_study" + + def __init__(self, output_path): + self.output_path = str(output_path) + + +class FinishingDag: + name = "dummy_study" + + def execute_ready_steps(self): + return StudyStatus.FINISHED + + def pickle(self, path): + pass + + def write_status(self, path): + pass + + +class FailingDag: + name = "dummy_study" + + def execute_ready_steps(self): + raise RuntimeError("boom") + + +def test_conductor_registration_creates_process_record(tmp_path): + conductor = Conductor(DummyStudy(tmp_path)) + + conductor_id = conductor.register_conductor() + conductors = Conductor.get_conductors(tmp_path) + + assert conductor_id in conductors + record = conductors[conductor_id] + assert record["conductor_id"] == conductor_id + assert record["study_name"] == "dummy_study" + assert record["output_path"] == str(tmp_path) + assert record["pid"] + assert record["hostname"] + assert record["argv"] + assert record["conductor_argv"] + assert record["conductor_executable"] + assert record["conductor_command"] + assert record["status"] == "running" + assert record["started_at"] + assert record["last_heartbeat_at"] == record["started_at"] + assert record["ended_at"] is None + record_path = tmp_path / "logs" / ".conductors" / \ + "{}.json".format(conductor_id) + assert record_path.exists() + + +def test_conductor_heartbeat_updates_record(tmp_path): + conductor = Conductor(DummyStudy(tmp_path)) + conductor_id = conductor.register_conductor() + original = Conductor.get_conductors(tmp_path)[conductor_id] + + conductor.heartbeat_conductor("checking work") + updated = Conductor.get_conductors(tmp_path)[conductor_id] + + assert updated["started_at"] == original["started_at"] + assert updated["last_status_message"] == "checking work" + assert updated["last_heartbeat_at"] >= original["last_heartbeat_at"] + assert updated["status"] == "running" + + +def test_conductor_finish_marks_record_completed(tmp_path): + conductor = Conductor(DummyStudy(tmp_path)) + conductor_id = conductor.register_conductor() + + conductor.finish_conductor( + "completed", StudyStatus.FINISHED, "study finished") + record = Conductor.get_conductors(tmp_path)[conductor_id] + + assert record["status"] == "completed" + assert record["final_study_status"] == StudyStatus.FINISHED.name + assert record["last_status_message"] == "study finished" + assert record["ended_at"] + + +def test_multiple_conductor_records_can_coexist(tmp_path): + conductor_a = Conductor(DummyStudy(tmp_path)) + conductor_b = Conductor(DummyStudy(tmp_path)) + + id_a = conductor_a.register_conductor() + id_b = conductor_b.register_conductor() + conductors = Conductor.get_conductors(tmp_path) + + assert id_a in conductors + assert id_b in conductors + assert id_a != id_b + + +def test_atomic_write_file_replaces_complete_file(tmp_path): + path = tmp_path / "record.json" + atomic_write_file(path, '{\n "old": true\n}\n') + + atomic_write_file(path, '{\n "new": true\n}\n') + + assert path.read_text() == '{\n "new": true\n}\n' + + +def test_monitor_marks_record_completed(tmp_path): + conductor = Conductor(DummyStudy(tmp_path)) + conductor._setup = True + conductor._pkl_path = str(tmp_path) + conductor._exec_dag = FinishingDag() + conductor.sleep_time = 1 + conductor_id = conductor.register_conductor() + + assert conductor.monitor_study() == StudyStatus.FINISHED + record = Conductor.get_conductors(tmp_path)[conductor_id] + assert record["status"] == "completed" + assert record["final_study_status"] == StudyStatus.FINISHED.name + + +def test_monitor_marks_record_failed_on_exception(tmp_path): + conductor = Conductor(DummyStudy(tmp_path)) + conductor._setup = True + conductor._pkl_path = str(tmp_path) + conductor._exec_dag = FailingDag() + conductor.sleep_time = 1 + conductor_id = conductor.register_conductor() + + with pytest.raises(RuntimeError): + conductor.monitor_study() + + record = Conductor.get_conductors(tmp_path)[conductor_id] + assert record["status"] == "failed" + assert record["last_status_message"] == \ + "monitoring failed with an exception" From 8ebc17d9e34a178e3e3d153515151cedfe9305ed Mon Sep 17 00:00:00 2001 From: Jeremy White <44277022+jwhite242@users.noreply.github.com> Date: Mon, 18 May 2026 17:48:10 -0700 Subject: [PATCH 2/2] Add staged logging setup to maestro run to capture initial output that was previously stderr/stdout only --- maestrowf/maestro.py | 426 ++++++++++++++++++++++++++----------------- 1 file changed, 261 insertions(+), 165 deletions(-) diff --git a/maestrowf/maestro.py b/maestrowf/maestro.py index d6258c7d..8082a016 100644 --- a/maestrowf/maestro.py +++ b/maestrowf/maestro.py @@ -33,6 +33,7 @@ import jsonschema import logging import os +import shlex import shutil import six import sys @@ -59,15 +60,83 @@ # Program Globals LOGGER = logging.getLogger(__name__) +ROOTLOGGER = logging.getLogger() LOG_UTIL = LoggerUtility(LOGGER) +ROOT_LOG_UTIL = LoggerUtility(ROOTLOGGER) # Configuration globals DEBUG_FORMAT = "[%(asctime)s: %(levelname)s] " \ "[%(module)s: %(lineno)d] %(message)s" LFORMAT = "[%(asctime)s: %(levelname)s] %(message)s" +FILE_LFORMAT = "%(asctime)s - %(name)s:%(funcName)s:%(lineno)s - " \ + "%(levelname)s - %(message)s" ACCEPTED_INPUT = set(["yes", "y"]) +def create_staged_log_handler(log_lvl): + """ + Create a hidden setup log in cwd for early `maestro run` messages. + + The file is promoted into the study workspace once the output path and + final log path are known. + """ + log_name = ".maestro-setup-{}-{}.log".format( + time.strftime("%Y%m%d-%H%M%S"), os.getpid()) + log_path = os.path.abspath(log_name) + ROOTLOGGER.setLevel(LoggerUtility.map_level(log_lvl)) + handler = ROOT_LOG_UTIL.add_file_handler( + log_path, FILE_LFORMAT, log_lvl) + return log_path, handler + + +def close_log_handler(handler): + """Detach and close a logging handler.""" + if not handler: + return + + handler.flush() + ROOTLOGGER.removeHandler(handler) + handler.close() + + +def cleanup_staged_log(log_path, handler=None): + """Close the staged log handler and remove its file.""" + close_log_handler(handler) + if log_path and os.path.exists(log_path): + os.remove(log_path) + + +def promote_staged_log(staged_path, staged_handler, log_path, log_lvl): + """ + Move staged setup logging into the final study log and append from there. + """ + close_log_handler(staged_handler) + create_parentdir(os.path.dirname(log_path)) + + if staged_path and os.path.exists(staged_path): + with open(staged_path, "r") as source: + with open(log_path, "a") as target: + shutil.copyfileobj(source, target) + os.remove(staged_path) + + ROOTLOGGER.setLevel(LoggerUtility.map_level(log_lvl)) + return ROOT_LOG_UTIL.add_file_handler(log_path, FILE_LFORMAT, log_lvl) + + +def log_to_handler(handler, level, msg, *args): + """ + Emit a log record directly to a handler without using stdout handlers. + + This is used for command provenance that belongs in the study log file but + should not add noise to the interactive `maestro run` output. + """ + caller = sys._getframe(1) + record = LOGGER.makeRecord( + LOGGER.name, level, caller.f_code.co_filename, caller.f_lineno, + msg, args, None, caller.f_code.co_name) + handler.handle(record) + + def status_study(args): """Check and print the status of an executing study.""" # Force logging to Warning and above @@ -359,180 +428,207 @@ def load_parameter_generator(path, env, kwargs): def run_study(args): """Run a Maestro study.""" - # Report log lvl - LOGGER.info("INFO Logging Level -- Enabled") - LOGGER.warning("WARNING Logging Level -- Enabled") - LOGGER.critical("CRITICAL Logging Level -- Enabled") - LOGGER.debug("DEBUG Logging Level -- Enabled") - # Load the Specification + staged_log_path = None + staged_handler = None + file_handler = None + try: - spec = YAMLSpecification.load_specification(args.specification) - except jsonschema.ValidationError as e: - LOGGER.error(e.message) - sys.exit(1) - environment = spec.get_study_environment() - steps = spec.get_study_steps() - - # Set up the output directory. - out_dir = environment.remove("OUTPUT_PATH") - if args.out: - # If out is specified in the args, ignore OUTPUT_PATH. - output_path = os.path.abspath(args.out) + staged_log_path, staged_handler = \ + create_staged_log_handler(args.debug_lvl) + log_to_handler( + staged_handler, logging.INFO, "Maestro command: %s", + shlex.join(sys.argv)) + + # Report log lvl + LOGGER.info("INFO Logging Level -- Enabled") + LOGGER.warning("WARNING Logging Level -- Enabled") + LOGGER.critical("CRITICAL Logging Level -- Enabled") + LOGGER.debug("DEBUG Logging Level -- Enabled") + # Load the Specification + try: + spec = YAMLSpecification.load_specification(args.specification) + except jsonschema.ValidationError as e: + LOGGER.error(e.message) + sys.exit(1) + environment = spec.get_study_environment() + steps = spec.get_study_steps() + + # Set up the output directory. + out_dir = environment.remove("OUTPUT_PATH") + if args.out: + # If out is specified in the args, ignore OUTPUT_PATH. + output_path = os.path.abspath(args.out) - # If we are automatically launching, just set the input as yes. - if os.path.exists(output_path): - if args.autoyes: - uinput = "y" - elif args.autono: - uinput = "n" - else: - uinput = six.moves.input( - "Output path already exists. Would you like to overwrite " - "it? [yn] ") + # If we are automatically launching, just set the input as yes. + if os.path.exists(output_path): + if args.autoyes: + uinput = "y" + elif args.autono: + uinput = "n" + else: + uinput = six.moves.input( + "Output path already exists. Would you like to " + "overwrite it? [yn] ") + + if uinput.lower() in ACCEPTED_INPUT: + print("Cleaning up existing out path...") + shutil.rmtree(output_path) + else: + print("Opting to quit -- not cleaning up old out path.") + return 0 - if uinput.lower() in ACCEPTED_INPUT: - print("Cleaning up existing out path...") - shutil.rmtree(output_path) + else: + if out_dir is None: + # If we don't find OUTPUT_PATH in the environment, assume pwd. + out_dir = os.path.abspath("./") else: - print("Opting to quit -- not cleaning up old out path.") - sys.exit(0) + # We just take the value from the environment. + out_dir = os.path.abspath(out_dir.value) - else: - if out_dir is None: - # If we don't find OUTPUT_PATH in the environment, assume pwd. - out_dir = os.path.abspath("./") + out_name = "{}_{}".format( + spec.name.replace(" ", "_"), + time.strftime("%Y%m%d-%H%M%S") + ) + output_path = make_safe_path(out_dir, *[out_name]) + environment.add(Variable("OUTPUT_PATH", output_path)) + + # Set up file logging + log_path = os.path.join( + output_path, "logs", "{}.log".format(spec.name)) + file_handler = promote_staged_log( + staged_log_path, staged_handler, log_path, args.debug_lvl) + staged_log_path = None + staged_handler = None + + # Check for pargs without the matching pgen + if args.pargs and not args.pgen: + msg = \ + "Cannot use the 'pargs' parameter without specifying a 'pgen'!" + LOGGER.exception(msg) + raise ArgumentError(msg) + + # Addition of the $(SPECROOT) to the environment. + spec_root = os.path.split(args.specification)[0] + spec_root = Variable("SPECROOT", os.path.abspath(spec_root)) + environment.add(spec_root) + + # Handle loading a custom ParameterGenerator if specified. + if args.pgen: + # 'pgen_args' has a default of an empty list, which should + # translate to an empty dictionary. + kwargs = create_dictionary(args.pargs) + # Copy the Python file used to generate parameters. + shutil.copy(args.pgen, output_path) + + # Add keywords and environment from the spec to pgen args. + kwargs["OUTPUT_PATH"] = output_path + kwargs["SPECROOT"] = spec_root + + # Load the parameter generator. + parameters = load_parameter_generator( + args.pgen, environment, kwargs) else: - # We just take the value from the environment. - out_dir = os.path.abspath(out_dir.value) + parameters = spec.get_parameters() + + # Setup the study. + study = Study(spec.name, spec.description, studyenv=environment, + parameters=parameters, steps=steps, + out_path=output_path) + + # Check if the submission attempts is greater than 0: + if args.attempts < 1: + _msg = "Submission attempts must be greater than 0. " \ + "'{}' provided.".format(args.attempts) + LOGGER.error(_msg) + raise ArgumentError(_msg) + + # Check if the throttle is zero or greater: + if args.throttle < 0: + _msg = "Submission throttle must be a value of zero or greater. " \ + "'{}' provided.".format(args.throttle) + LOGGER.error(_msg) + raise ArgumentError(_msg) + + # Check if the restart limit is zero or greater: + if args.rlimit < 0: + _msg = "Restart limit must be a value of zero or greater. " \ + "'{}' provided.".format(args.rlimit) + LOGGER.error(_msg) + raise ArgumentError(_msg) + + # Set up the study workspace and configure it for execution. + study.setup_workspace() + study.configure_study( + throttle=args.throttle, submission_attempts=args.attempts, + restart_limit=args.rlimit, use_tmp=args.usetmp, + hash_ws=args.hashws, dry_run=args.dry) + study.setup_environment() + + if args.dry: + # Drive sleep time down during dry runs to generate scripts. + sleeptime = 1 + else: + # else, use args to decide sleeptime + sleeptime = args.sleeptime - out_name = "{}_{}".format( - spec.name.replace(" ", "_"), - time.strftime("%Y%m%d-%H%M%S") - ) - output_path = make_safe_path(out_dir, *[out_name]) - environment.add(Variable("OUTPUT_PATH", output_path)) - - # Set up file logging - create_parentdir(os.path.join(output_path, "logs")) - log_path = os.path.join(output_path, "logs", "{}.log".format(spec.name)) - LOG_UTIL.add_file_handler(log_path, LFORMAT, args.debug_lvl) - - # Check for pargs without the matching pgen - if args.pargs and not args.pgen: - msg = "Cannot use the 'pargs' parameter without specifying a 'pgen'!" - LOGGER.exception(msg) - raise ArgumentError(msg) - - # Addition of the $(SPECROOT) to the environment. - spec_root = os.path.split(args.specification)[0] - spec_root = Variable("SPECROOT", os.path.abspath(spec_root)) - environment.add(spec_root) - - # Handle loading a custom ParameterGenerator if specified. - if args.pgen: - # 'pgen_args' has a default of an empty list, which should translate - # to an empty dictionary. - kwargs = create_dictionary(args.pargs) - # Copy the Python file used to generate parameters. - shutil.copy(args.pgen, output_path) - - # Add keywords and environment from the spec to pgen args. - kwargs["OUTPUT_PATH"] = output_path - kwargs["SPECROOT"] = spec_root - - # Load the parameter generator. - parameters = load_parameter_generator(args.pgen, environment, kwargs) - else: - parameters = spec.get_parameters() - - # Setup the study. - study = Study(spec.name, spec.description, studyenv=environment, - parameters=parameters, steps=steps, out_path=output_path) - - # Check if the submission attempts is greater than 0: - if args.attempts < 1: - _msg = "Submission attempts must be greater than 0. " \ - "'{}' provided.".format(args.attempts) - LOGGER.error(_msg) - raise ArgumentError(_msg) - - # Check if the throttle is zero or greater: - if args.throttle < 0: - _msg = "Submission throttle must be a value of zero or greater. " \ - "'{}' provided.".format(args.throttle) - LOGGER.error(_msg) - raise ArgumentError(_msg) - - # Check if the restart limit is zero or greater: - if args.rlimit < 0: - _msg = "Restart limit must be a value of zero or greater. " \ - "'{}' provided.".format(args.rlimit) - LOGGER.error(_msg) - raise ArgumentError(_msg) - - # Set up the study workspace and configure it for execution. - study.setup_workspace() - study.configure_study( - throttle=args.throttle, submission_attempts=args.attempts, - restart_limit=args.rlimit, use_tmp=args.usetmp, hash_ws=args.hashws, - dry_run=args.dry) - study.setup_environment() - - if args.dry: - # If performing a dry run, drive sleep time down to generate scripts. - sleeptime = 1 - else: - # else, use args to decide sleeptime - sleeptime = args.sleeptime - - batch = {"type": "local"} - if spec.batch: - batch = spec.batch - if "type" not in batch: - batch["type"] = "local" - # Copy the spec to the output directory - shutil.copy(args.specification, study.output_path) - - # Use the Conductor's classmethod to store the study. - Conductor.store_study(study) - Conductor.store_batch(study.output_path, batch) - - # If we are automatically launching, just set the input as yes. - if args.autoyes or args.dry: - uinput = "y" - elif args.autono: - uinput = "n" - else: - uinput = six.moves.input("Would you like to launch the study? [yn] ") - - if uinput.lower() in ACCEPTED_INPUT: - if args.fg: - # Launch in the foreground. - LOGGER.info("Running Maestro Conductor in the foreground.") - conductor = Conductor(study) - conductor.initialize(batch, sleeptime) - completion_status = conductor.monitor_study() - conductor.cleanup() - return completion_status.value + batch = {"type": "local"} + if spec.batch: + batch = spec.batch + if "type" not in batch: + batch["type"] = "local" + # Copy the spec to the output directory + shutil.copy(args.specification, study.output_path) + + # Use the Conductor's classmethod to store the study. + Conductor.store_study(study) + Conductor.store_batch(study.output_path, batch) + + # If we are automatically launching, just set the input as yes. + if args.autoyes or args.dry: + uinput = "y" + elif args.autono: + uinput = "n" else: - # Launch manager with nohup - log_path = make_safe_path( - study.output_path, - *["{}.txt".format(study.name)]) - - cmd = ["nohup", "conductor", - "-t", str(sleeptime), - "-d", str(args.debug_lvl), - study.output_path, - ">", log_path, "2>&1"] - LOGGER.debug(" ".join(cmd)) - start_process(" ".join(cmd)) - - print("Study launched successfully.") - else: - print("Study launch aborted.") + uinput = six.moves.input( + "Would you like to launch the study? [yn] ") + + if uinput.lower() in ACCEPTED_INPUT: + if args.fg: + # Launch in the foreground. + LOGGER.info("Running Maestro Conductor in the foreground.") + conductor = Conductor(study) + conductor.initialize(batch, sleeptime) + completion_status = conductor.monitor_study() + conductor.cleanup() + return completion_status.value + else: + # Launch manager with nohup + log_path = make_safe_path( + study.output_path, + *["{}.txt".format(study.name)]) + + cmd = ["nohup", "conductor", + "-t", str(sleeptime), + "-d", str(args.debug_lvl), + study.output_path, + ">", log_path, "2>&1"] + conductor_cmd = " ".join(cmd) + log_to_handler( + file_handler, logging.INFO, + "Conductor launch command: %s", conductor_cmd) + start_process(conductor_cmd) + + log_to_handler( + file_handler, logging.INFO, + "Study launched successfully.") + print("Study launched successfully.") + else: + log_to_handler(file_handler, logging.INFO, "Study launch aborted.") + print("Study launch aborted.") - return 0 + return 0 + finally: + cleanup_staged_log(staged_log_path, staged_handler) def setup_argparser():