diff --git a/.gitignore b/.gitignore index 013a5df4..deb549d2 100644 --- a/.gitignore +++ b/.gitignore @@ -5,6 +5,7 @@ # temporary files *.swp *~ +*# # executable redax diff --git a/dispatcher/DAQController.py b/dispatcher/DAQController.py index a43ee2b4..ac20a170 100644 --- a/dispatcher/DAQController.py +++ b/dispatcher/DAQController.py @@ -14,6 +14,7 @@ class DAQController(): D. Coderre, 12. Mar. 2019 D. Masson, 06 Apr 2020 S. di Pede, 17 Mar 2021 + V. D'Andrea, Oct 2022 Brief: This code handles the logic of what the dispatcher does when. It takes in aggregated status updates and commands from the mongo connector and decides if @@ -28,18 +29,17 @@ def __init__(self, config, daq_config, mongo_connector, logger, hypervisor): self.goal_state = {} self.latest_status = {} - # Timeouts. There are a few things that we want to wait for that might take time. # The keys for these dicts will be detector identifiers. - detectors = list(daq_config.keys()) + detectors = list(daq_config.keys()) # physical detectors self.last_command = {} for k in ['arm', 'start', 'stop']: self.last_command[k] = {} for d in detectors: self.last_command[k][d] = now() - self.error_stop_count = {d : 0 for d in detectors} + self.error_stop_count = {d: 0 for d in detectors} self.max_arm_cycles = int(config['MaxArmCycles']) - self.missed_arm_cycles={k:0 for k in detectors} + self.missed_arm_cycles = {k: 0 for k in detectors} # Timeout properties come from config self.timeouts = { @@ -52,7 +52,7 @@ def __init__(self, config, daq_config, mongo_connector, logger, hypervisor): self.logger = logger self.time_between_commands = int(config['TimeBetweenCommands']) - self.can_force_stop={k:True for k in detectors} + self.can_force_stop = {k:True for k in detectors} self.one_detector_arming = False self.start_cmd_delay = float(config['StartCmdDelay']) @@ -87,125 +87,128 @@ def solve_problem(self, latest_status, goal_state): self.latest_status = latest_status self.one_detector_arming = False - for det in latest_status.keys(): - if latest_status[det]['status'] == DAQ_STATUS.IDLE: + for logical in latest_status.keys(): + det = list(latest_status[logical]['detectors'].keys())[0] + if latest_status[logical]['status'] == DAQ_STATUS.IDLE: self.can_force_stop[det] = True self.error_stop_count[det] = 0 - if latest_status[det]['status'] in [DAQ_STATUS.ARMING, DAQ_STATUS.ARMED]: + if latest_status[logical]['status'] in [DAQ_STATUS.ARMING, DAQ_STATUS.ARMED]: self.one_detector_arming = True active_states = [DAQ_STATUS.RUNNING, DAQ_STATUS.ARMED, DAQ_STATUS.ARMING, DAQ_STATUS.UNKNOWN] - - for det in latest_status.keys(): + + for logical in latest_status.keys(): + # Take the first physical detector, only one is needed to retrieve the goal status + det = list(latest_status[logical]['detectors'].keys())[0] # The detector should be INACTIVE if goal_state[det]['active'] == 'false': # The detector is not in IDLE, ERROR or TIMEOUT: it needs to be stopped - if latest_status[det]['status'] in active_states: + if latest_status[logical]['status'] in active_states: # Check before if the status is UNKNOWN and it is maybe timing out - if latest_status[det]['status'] == DAQ_STATUS.UNKNOWN: - self.logger.info(f"The status of {det} is unknown, check timeouts") - self.check_timeouts(detector=det) + if latest_status[logical]['status'] == DAQ_STATUS.UNKNOWN: + self.logger.info(f"The status of {logical} is unknown, check timeouts") + self.check_timeouts(logical) # Otherwise stop the detector else: - self.logger.info(f"Sending stop command to {det}") - self.stop_detector_gently(detector=det) + self.logger.info(f"Sending stop command to {logical}") + self.stop_detector_gently(logical) # Deal separately with the TIMEOUT and ERROR statuses, by stopping the detector if needed - elif latest_status[det]['status'] == DAQ_STATUS.TIMEOUT: - self.logger.info(f"The {det} is in timeout, check timeouts") - # TODO update - self.handle_timeout(detector=det) - - elif latest_status[det]['status'] == DAQ_STATUS.ERROR: - self.logger.info(f"The {det} has error, sending stop command") - self.control_detector(command='stop', detector=det, force=self.can_force_stop[det]) - self.can_force_stop[det]=False + elif latest_status[logical]['status'] == DAQ_STATUS.TIMEOUT: + self.logger.info(f"The {logical} is in timeout, check timeouts") + self.handle_timeout(logical) + + elif latest_status[logical]['status'] == DAQ_STATUS.ERROR: + self.logger.info(f"The {logical} has error, sending stop command") + self.control_detector(logical, 'stop', force=self.can_force_stop[det]) + self.can_force_stop[det] = False else: # the only remaining option is 'idle', which is fine pass # The detector should be ACTIVE (RUNNING) else: #goal_state[det]['active'] == 'true': - if latest_status[det]['status'] == DAQ_STATUS.RUNNING: - self.logger.info(f"The {det} is running") - self.check_run_turnover(detector=det) + if latest_status[logical]['status'] == DAQ_STATUS.RUNNING: + self.logger.info(f"The {logical} is running") + self.check_run_turnover(logical) # TODO does this work properly? - if latest_status[det]['mode'] != goal_state[det]['mode']: - self.control_detector(command='stop', detector=det) + if latest_status[logical]['mode'] != goal_state[det]['mode']: + self.control_detector(logical, 'stop') # ARMED, start the run - elif latest_status[det]['status'] == DAQ_STATUS.ARMED: - self.logger.info(f"The {det} is armed, sending start command") - self.control_detector(command='start', detector=det) + elif latest_status[logical]['status'] == DAQ_STATUS.ARMED: + self.logger.info(f"The {logical} is armed, sending start command") + self.control_detector(logical,'start') # ARMING, check if it is timing out - elif latest_status[det]['status'] == DAQ_STATUS.ARMING: - self.logger.info(f"The {det} is arming, check timeouts") - self.logger.debug(f"Checking the {det} timeouts") - self.check_timeouts(detector=det, command='arm') + elif latest_status[logical]['status'] == DAQ_STATUS.ARMING: + self.logger.info(f"The {logical} is arming, check timeouts") + self.logger.debug(f"Checking the {logical} timeouts") + self.check_timeouts(logical) # UNKNOWN, check if it is timing out - elif latest_status[det]['status'] == DAQ_STATUS.UNKNOWN: - self.logger.info(f"The status of {det} is unknown, check timeouts") - self.logger.debug(f"Checking the {det} timeouts") - self.check_timeouts(detector=det) - + elif latest_status[logical]['status'] == DAQ_STATUS.UNKNOWN: + self.logger.info(f"The status of {logical} is unknown, check timeouts") + self.logger.debug(f"Checking the {logical} timeouts") + self.check_timeouts(logical) # Maybe the detector is IDLE, we should arm a run - elif latest_status[det]['status'] == DAQ_STATUS.IDLE: - self.logger.info(f"The {det} is idle, sending arm command") - self.control_detector(command='arm', detector=det) - + elif latest_status[logical]['status'] == DAQ_STATUS.IDLE: + self.logger.info(f"The {logical} is idle, sending arm command") + self.control_detector(logical, 'arm') # Deal separately with the TIMEOUT and ERROR statuses, by stopping the detector if needed - elif latest_status[det]['status'] == DAQ_STATUS.TIMEOUT: - self.logger.info(f"The {det} is in timeout, check timeouts") - self.logger.debug("Checking %s timeouts", det) - self.handle_timeout(detector=det) - - elif latest_status[det]['status'] == DAQ_STATUS.ERROR: - self.logger.info(f"The {det} has error, sending stop command") - self.control_detector(command='stop', detector=det, force=self.can_force_stop[det]) - self.can_force_stop[det]=False + elif latest_status[logical]['status'] == DAQ_STATUS.TIMEOUT: + self.logger.info(f"The {logical} is in timeout, check timeouts") + self.logger.debug("Checking %s timeouts", logical) + self.handle_timeout(logical) + + elif latest_status[logical]['status'] == DAQ_STATUS.ERROR: + self.logger.info(f"The {logical} has error, sending stop command") + self.control_detector(logical,'stop', force=self.can_force_stop[det]) + self.can_force_stop[det] = False else: # shouldn't be able to get here pass return - def handle_timeout(self, detector): + def handle_timeout(self, logical): """ Detector already in the TIMEOUT status are directly stopped. """ - self.control_detector(command='stop', detector=detector, force=self.can_force_stop[detector]) - self.can_force_stop[detector]=False - self.check_timeouts(detector) + det = list(latest_status[logical]['detectors'].keys())[0] + self.control_detector(logical, 'stop', force = self.can_force_stop[det]) + self.can_force_stop[det] = False + self.check_timeouts(logical) return - def stop_detector_gently(self, detector): + def stop_detector_gently(self, logical): """ Stops the detector, unless we're told to wait for the current run to end """ + det = list(latest_status[logical]['detectors'].keys())[0] if ( # Running normally (not arming, error, timeout, etc) - self.latest_status[detector]['status'] == DAQ_STATUS.RUNNING and + self.latest_status[logical]['status'] == DAQ_STATUS.RUNNING and # We were asked to wait for the current run to stop - self.goal_state[detector].get('softstop', 'false') == 'true'): - self.check_run_turnover(detector) + self.goal_state[det].get('softstop', 'false') == 'true'): + self.check_run_turnover(logical) else: - self.control_detector(detector=detector, command='stop') + self.control_detector(logical,'stop') - def control_detector(self, command, detector, force=False): + def control_detector(self, logical, command, force = False): """ Issues the command to the detector if allowed by the timeout. Returns 0 if a command was issued and 1 otherwise """ time_now = now() + det = list(self.latest_status[logical]['detectors'].keys())[0] try: - dt = (time_now - self.last_command[command][detector]).total_seconds() + dt = (time_now - self.last_command[command][det]).total_seconds() except (KeyError, TypeError): dt = 2*self.timeouts[command] # make sure we don't rush things if command == 'start': - dt_last = (time_now - self.last_command['arm'][detector]).total_seconds() + dt_last = (time_now - self.last_command['arm'][det]).total_seconds() elif command == 'arm': - dt_last = (time_now - self.last_command['stop'][detector]).total_seconds() + dt_last = (time_now - self.last_command['stop'][det]).total_seconds() else: dt_last = self.time_between_commands*2 @@ -214,47 +217,47 @@ def control_detector(self, command, detector, force=False): gs = self.goal_state if command == 'arm': if self.one_detector_arming: - self.logger.info('Another detector already arming, can\'t arm %s' % detector) + self.logger.info('Another detector already arming, can\'t arm %s' % logical) # this leads to run number overlaps return 1 - readers, cc = self.mongo.get_hosts_for_mode(gs[detector]['mode']) + readers, cc = self.mongo.get_hosts_for_mode(gs[det]['mode']) hosts = (cc, readers) delay = 0 self.one_detector_arming = True elif command == 'start': - readers, cc = self.mongo.get_hosts_for_mode(ls[detector]['mode']) + readers, cc = self.mongo.get_hosts_for_mode(ls[logical]['mode']) hosts = (readers, cc) delay = self.start_cmd_delay - #Reset arming timeout counter - self.missed_arm_cycles[detector]=0 + #Reset arming timeout counter + self.missed_arm_cycles[det] = 0 else: # stop - readers, cc = self.mongo.get_hosts_for_mode(ls[detector]['mode'], detector) + readers, cc = self.mongo.get_hosts_for_mode(ls[logical]['mode'], detector) hosts = (cc, readers) - if force or ls[detector]['status'] not in [DAQ_STATUS.RUNNING]: + if force or ls[logical]['status'] not in [DAQ_STATUS.RUNNING]: delay = 0 else: delay = self.stop_cmd_delay - self.logger.debug(f'Sending {command.upper()} to {detector}') - if self.mongo.send_command(command, hosts, gs[detector]['user'], - detector, gs[detector]['mode'], delay, force): + self.logger.debug(f'Sending {command.upper()} to {logical}') + if self.mongo.send_command(command, hosts, gs[det]['user'], + logical, gs[det]['mode'], delay, force): # failed return 1 - self.last_command[command][detector] = time_now - if command == 'start' and self.mongo.insert_run_doc(detector): + self.last_command[command][det] = time_now + if command == 'start' and self.mongo.insert_run_doc(logical): # db having a moment return 0 - if (command == 'stop' and ls[detector]['number'] != -1 and - self.mongo.set_stop_time(ls[detector]['number'], detector, force)): + if (command == 'stop' and ls[logical]['number'] != -1 and + self.mongo.set_stop_time(ls[logical]['number'], logical, force)): # db having a moment return 0 else: self.logger.debug('Can\'t send %s to %s, timeout at %i/%i' % ( - command, detector, dt, self.timeouts[command])) + command, logical, dt, self.timeouts[command])) return 1 return 0 - def check_timeouts(self, detector, command=None): + def check_timeouts(self, logical, command=None): """ This one is invoked if we think we need to change states. Either a stop command needs to be sent, or we've detected an anomaly and want to decide what to do. @@ -265,13 +268,12 @@ def check_timeouts(self, detector, command=None): """ time_now = now() - - #First check how often we have been timing out, if it happened to often - # something bad happened and we start from scratch again - if self.missed_arm_cycles[detector]>self.max_arm_cycles and detector=='tpc': + det = list(self.latest_status[logical]['detectors'].keys())[0] + #How often we have been timing out? + if self.missed_arm_cycles[det] > self.max_arm_cycles and det == 'tpc': if (dt := (now()-self.last_nuke).total_seconds()) > self.hv_nuclear_timeout: self.logger.critical('There\'s only one way to be sure') - self.control_detector(detector='tpc', command='stop', force=True) + self.control_detector(logical,'stop', force=True) if self.hypervisor.tactical_nuclear_option(self.mongo.is_linked_mode()): self.last_nuke = now() else: @@ -279,24 +281,21 @@ def check_timeouts(self, detector, command=None): self.logger.debug(f'Nuclear timeout at {int(dt)}/{self.hv_nuclear_timeout}') if command is None: # not specified, we figure out it here - command_times = [(cmd,doc[detector]) for cmd,doc in self.last_command.items()] + command_times = [(cmd,doc[det]) for cmd,doc in self.last_command.items()] command = sorted(command_times, key=lambda x : x[1])[-1][0] - self.logger.debug(f'Most recent command for {detector} is {command}') + self.logger.debug(f'Most recent command for {logical} is {command}') else: - self.logger.debug(f'Checking {command} timeout for {detector}') + self.logger.debug(f'Checking {command} timeout for {logical}') - dt = (time_now - self.last_command[command][detector]).total_seconds() + dt = (time_now - self.last_command[command][det]).total_seconds() local_timeouts = dict(self.timeouts.items()) - local_timeouts['stop'] = self.timeouts['stop']*(self.error_stop_count[detector]+1) + local_timeouts['stop'] = self.timeouts['stop']*(self.error_stop_count[det]+1) - if dt < local_timeouts[command]: - self.logger.debug('%i is within the %i second timeout for a %s command' % - (dt, local_timeouts[command], command)) - else: + if dt > local_timeouts[command]: # timing out, maybe send stop? if command == 'stop': - if self.error_stop_count[detector] >= self.stop_retries: + if self.error_stop_count[det] >= self.stop_retries: # failed too many times, issue error self.mongo.log_error( ("Dispatcher control loop detects a timeout that STOP " + @@ -304,34 +303,37 @@ def check_timeouts(self, detector, command=None): 'ERROR', "STOP_TIMEOUT") # also invoke the nuclear option - if detector == 'tpc': + if det == 'tpc': if (dt := (now()-self.last_nuke).total_seconds()) > self.hv_nuclear_timeout: - self.control_detector(detector='tpc', command='stop', force=True) + self.control_detector(logical,'stop', force=True) self.logger.critical('There\'s only one way to be sure') if self.hypervisor.tactical_nuclear_option(self.mongo.is_linked_mode()): self.last_nuke = now() else: - self.control_detector(detector=detector, command='stop') + self.control_detector(logical, 'stop') self.logger.debug(f'Nuclear timeout at {int(dt)}/{self.hv_nuclear_timeout}') - self.error_stop_count[detector] = 0 + self.error_stop_count[det] = 0 else: - self.control_detector(detector=detector, command='stop') - self.logger.debug(f'Working on a stop counter for {detector}') - self.error_stop_count[detector] += 1 + self.control_detector(logical, 'stop') + self.logger.debug(f'Working on a stop counter for {logical}') + self.error_stop_count[det] += 1 else: self.mongo.log_error( ('%s took more than %i seconds to %s, indicating a possible timeout or error' % - (detector, self.timeouts[command], command)), + (logical, self.timeouts[command], command)), 'ERROR', '%s_TIMEOUT' % command.upper()) #Keep track of how often the arming sequence times out - if self.control_detector(detector=detector, command='stop') == 0: + if self.control_detector(logical, 'stop') == 0: # only increment the counter if we actually issued a STOP - self.missed_arm_cycles[detector] += 1 - self.logger.info(f'{detector} missed {self.missed_arm_cycles[detector]} arm cycles') + self.missed_arm_cycles[det] += 1 + self.logger.info(f'{logical} missed {self.missed_arm_cycles[det]} arm cycles') else: - self.logger.debug(f'{detector} didn\'t actually get a command, no arm cycler increment') - + self.logger.debug(f'{logical} didn\'t actually get a command, no arm cycler increment') + else: + self.logger.debug('%i is within the %i second timeout for a %s command' % + (dt, local_timeouts[command], command)) + return def throw_error(self): @@ -343,23 +345,23 @@ def throw_error(self): 'ERROR', "GENERAL_ERROR") - def check_run_turnover(self, detector): + def check_run_turnover(self, logical): """ During normal operation we want to run for a certain number of minutes, then automatically stop and restart the run. No biggie. We check the time here to see if it's something we have to do. """ - - number = self.latest_status[detector]['number'] + det = list(latest_status[logical]['detectors'].keys())[0] + number = self.latest_status[logical]['number'] start_time = self.mongo.get_run_start(number) if start_time is None: self.logger.debug(f'No start time for {number}?') return time_now = now() - run_length = int(self.goal_state[detector]['stop_after'])*60 + run_length = int(self.goal_state[det]['stop_after'])*60 run_duration = (time_now - start_time).total_seconds() - self.logger.debug('Checking run turnover for %s: %i/%i' % (detector, run_duration, run_length)) + self.logger.debug('Checking run turnover for %s: %i/%i' % (logical, run_duration, run_length)) if run_duration > run_length: - self.logger.info('Stopping run for %s' % detector) - self.control_detector(detector=detector, command='stop') + self.logger.info('Stopping run for %s' % logical) + self.control_detector(logical, 'stop') diff --git a/dispatcher/MongoConnect.py b/dispatcher/MongoConnect.py index 6b5589b1..cae2e3b3 100644 --- a/dispatcher/MongoConnect.py +++ b/dispatcher/MongoConnect.py @@ -46,6 +46,7 @@ class MongoConnect(object): D. Coderre, 12. Mar. 2019 D. Masson, 2019-2021 S. di Pede, 2020-2021 + V. D'Andrea, Oct 2022 Brief: This code handles the mongo connectivity for both the DAQ databases (the ones used for system-wide communication) and the @@ -130,13 +131,13 @@ def __init__(self, config, daq_config, logger, control_mc, runs_mc, hypervisor, self.dc = daq_config self.hv_timeout_fix = {} for detector in self.dc: - self.latest_status[detector] = {'readers': {}, 'controller': {}} + #self.latest_status[detector] = {'readers': {}, 'controller': {}} for reader in self.dc[detector]['readers']: - self.latest_status[detector]['readers'][reader] = {} + #self.latest_status[detector]['readers'][reader] = {} self.host_config[reader] = detector self.hv_timeout_fix[reader] = now() for controller in self.dc[detector]['controller']: - self.latest_status[detector]['controller'][controller] = {} + #self.latest_status[detector]['controller'][controller] = {} self.host_config[controller] = detector self.hv_timeout_fix[controller] = now() @@ -213,8 +214,8 @@ def get_update(self, dc): self.latest_status = dc # Now compute aggregate status - return self.latest_status if self.aggregate_status() is None else None - + return self.latest_status if self.aggregate_status() else None + def clear_error_timeouts(self): self.error_sent = {} @@ -236,17 +237,16 @@ def aggregate_status(self): apply to both """ now_time = time.time() - ret = None aggstat = { - k:{ 'status': -1, - 'detector': k, - 'rate': 0, - 'time': now(), - 'buff': 0, - 'mode': None, - 'pll_unlocks': 0, - 'number': -1} - for k in self.dc} + k:{ 'status': -1, + 'detector': k, + 'rate': 0, + 'time': now(), + 'buff': 0, + 'mode': None, + 'pll_unlocks': 0, + 'number': -1} + for k in self.dc} phys_stat = {k: [] for k in self.dc} for detector in self.latest_status.keys(): # detector = logical @@ -304,7 +304,7 @@ def aggregate_status(self): else: status_list = list(statuses.values()) - # Now we aggregate the statuses + # Now we aggregate the statuses for the logical detectors status = self.combine_statuses(status_list) self.latest_status[detector]['status'] = status @@ -316,10 +316,16 @@ def aggregate_status(self): except Exception as e: self.logger.error(f'DB snafu? Couldn\'t update aggregate status. ' f'{type(e)}, {e}') - + return None self.physical_status = phys_stat - return ret - + + # Aggregate status for the physical detectors + for logical in self.latest_status.keys(): + for det in self.latest_status[logical]['detectors'].keys(): + status = self.combine_statuses(phys_stat[det]) + self.latest_status[logical]['detectors'][det]['status'] = status + return True + def combine_statuses(self, status_list): # First, the "or" statuses for stat in ['ARMING','ERROR','TIMEOUT','UNKNOWN']: @@ -425,9 +431,9 @@ def is_linked(self, a, b): self.logger.debug(f'{a} and {b} aren\'t link?? How this happen?? {mode_a} {detectors}') return False - def get_super_detector(self): + def get_logical_detector(self): """ - Get the Super Detector configuration + Get the Logical Detector configuration if the detectors are in a compatible linked mode. - case A: tpc, mv and nv all linked - case B: tpc, mv and nv all un-linked @@ -436,43 +442,56 @@ def get_super_detector(self): - case E: tpc unlinked, mv and nv linked We will check the compatibility of the linked mode for a pair of detectors per time. """ - ret = {'tpc': {'controller': self.dc['tpc']['controller'][:], - 'readers': self.dc['tpc']['readers'][:], - 'detectors': ['tpc']}} + + tpc = self.dc['tpc'] mv = self.dc['muon_veto'] nv = self.dc['neutron_veto'] - - tpc_mv = self.is_linked('tpc', 'muon_veto') - tpc_nv = self.is_linked('tpc', 'neutron_veto') - mv_nv = self.is_linked('muon_veto', 'neutron_veto') - - # tpc and muon_veto linked mode - if tpc_mv: - # case A or C - ret['tpc']['controller'] += mv['controller'] - ret['tpc']['readers'] += mv['readers'] - ret['tpc']['detectors'] += ['muon_veto'] - else: - # case B or E - ret['muon_veto'] = {'controller': mv['controller'][:], - 'readers': mv['readers'][:], - 'detectors': ['muon_veto']} - if tpc_nv: - # case A or D - ret['tpc']['controller'] += nv['controller'][:] - ret['tpc']['readers'] += nv['readers'][:] - ret['tpc']['detectors'] += ['neutron_veto'] - elif mv_nv and not tpc_mv: + + is_tpc_mv = self.is_linked('tpc', 'muon_veto') + is_tpc_nv = self.is_linked('tpc', 'neutron_veto') + is_mv_nv = self.is_linked('muon_veto', 'neutron_veto') + + if is_tpc_mv and is_tpc_nv and is_mv_nv: + # case A + ret = {'all_linked': {'controller': tpc['controller'][:] + mv['controller'][:] + nv['controller'][:], + 'readers': tpc['readers'][:] + mv['readers'][:] + nv['readers'][:], + 'detectors': ['tpc','muon_veto','neutron_veto']}} + elif is_tpc_mv and not is_tpc_nv and not is_mv_nv: + # case C + ret = {'tpc_mv': {'controller': tpc['controller'][:] + mv['controller'][:], + 'readers': tpc['readers'][:] + mv['readers'][:], + 'detectors': ['tpc','muon_veto']}, + 'nv': {'controller': nv['controller'][:], + 'readers': nv['readers'][:], + 'detectors': ['neutron_veto']}} + elif is_tpc_nv and not is_tpc_mv and not is_mv_nv: + # case D + ret = {'tpc_nv': {'controller': tpc['controller'][:] + nv['controller'][:], + 'readers': tpc['readers'][:] + nv['readers'][:], + 'detectors': ['tpc','neutron_veto']}, + 'mv': {'controller': mv['controller'][:], + 'readers': mv['readers'][:], + 'detectors': ['muon_veto']}} + elif is_mv_nv and not is_tpc_mv and not is_tpc_nv: # case E - ret['muon_veto']['controller'] += nv['controller'][:] - ret['muon_veto']['readers'] += nv['readers'][:] - ret['muon_veto']['detectors'] += ['neutron_veto'] + ret = {'tpc': {'controller': tpc['controller'][:], + 'readers': tpc['readers'][:], + 'detectors': ['tpc']}, + 'mv_nv': {'controller': mv['controller'][:] + nv['controller'][:], + 'readers': mv['readers'][:] + nv['readers'][:], + 'detectors': ['muon_veto','neutron_veto']}} else: - # case B or C - ret['neutron_veto'] = {'controller': nv['controller'][:], - 'readers': nv['readers'][:], - 'detectors': ['neutron_veto']} - + # case B + ret = {'tpc': {'controller': tpc['controller'][:], + 'readers': tpc['readers'][:], + 'detectors': ['tpc']}, + 'mv': {'controller': mv['controller'][:], + 'readers': mv['readers'][:], + 'detectors': ['muon_veto']}, + 'nv': {'controller': nv['controller'][:], + 'readers': nv['readers'][:], + 'detectors': ['neutron_veto']}} + # convert the host lists to dics for later for det in list(ret.keys()): ret[det]['controller'] = {c:{} for c in ret[det]['controller']} @@ -543,19 +562,20 @@ def get_next_run_number(self): return 0 return list(cursor)[0]['number']+1 - def set_stop_time(self, number, detectors, force): + def set_stop_time(self, number, logical, force): """ Sets the 'end' field of the run doc to the time when the STOP command was ack'd """ - self.logger.info(f"Updating run {number} with end time ({detectors})") + self.logger.info(f"Updating run {number} with end time ({logical})") if number == -1: return try: time.sleep(0.5) # this number depends on the CC command polling time - if (endtime := self.get_ack_time(detectors, 'stop') ) is None: + if (endtime := self.get_ack_time(logical, 'stop') ) is None: self.logger.debug(f'No end time found for run {number}') - endtime = now() -datetime.timedelta(seconds=1) - query = {"number": int(number), "end": None, 'detectors': detectors} + endtime = now() - datetime.timedelta(seconds=1) + det = list(latest_status[logical]['detectors'].keys())[0] + query = {"number": int(number), "end": None, 'detectors': det} updates = {"$set": {"end": endtime}} if force: updates["$push"] = {"tags": {"name": "_messy", "user": "daq", @@ -571,10 +591,11 @@ def set_stop_time(self, number, detectors, force): ]): rate[doc['_id']] = {'avg': doc['avg'], 'max': doc['max']} channels = set() - if 'tpc' in detectors: + + if 'tpc' in latest_status[logical]['detectors'].keys(): # figure out which channels weren't running - readers = list(self.latest_status[detectors]['readers'].keys()) - for doc in self.collections['node_status'].find({'host': {'$in': readers}, 'number': int(number)}): + readers = list(self.latest_status[logical]['readers'].keys()) + for doc in self.collections['node_status'].find({'host': {'$in': readers},'number': int(number)}): channels |= set(map(int, doc['channels'].keys())) updates = {'rate': rate} if len(channels): @@ -589,12 +610,12 @@ def set_stop_time(self, number, detectors, force): self.logger.error(f"Database having a moment, hope this doesn't crash. {type(e)}, {e}") return - def get_ack_time(self, detector, command, recurse=True): + def get_ack_time(self, logical, command, recurse=True): ''' Finds the time when specified detector's crate controller ack'd the specified command ''' # the first cc is the "master", so its ack time is what counts - cc = list(self.latest_status[detector]['controller'].keys())[0] + cc = list(self.latest_status[logical]['controller'].keys())[0] query = {'host': cc, f'acknowledged.{cc}': {'$ne': 0}, 'command': command} sort = [('_id', -1)] doc = self.collections['outgoing_commands'].find_one(query, sort=sort) @@ -602,12 +623,12 @@ def get_ack_time(self, detector, command, recurse=True): if dt > 30: # TODO make this a config value if recurse: # No way we found the correct command here, maybe we're too soon - self.logger.debug(f'Most recent ack for {detector}-{command} is {dt:.1f}?') + self.logger.debug(f'Most recent ack for {logical}-{command} is {dt:.1f}?') time.sleep(2) # if in doubt - return self.get_ack_time(detector, command, False) + return self.get_ack_time(logical, command, False) else: # Welp - self.logger.debug(f'No recent ack time for {detector}-{command}') + self.logger.debug(f'No recent ack time for {logical}-{command}') return None return doc['acknowledged'][cc] @@ -763,35 +784,35 @@ def get_run_start(self, number): return self.run_start_cache[str(number)] return None - def insert_run_doc(self, detector): + def insert_run_doc(self, logical): if (number := self.get_next_run_number()) == NO_NEW_RUN: self.logger.error("DB having a moment") return -1 # the rundoc gets the physical detectors, not the logical detectors = self.latest_status[detector]['detectors'] - + det = list(detectors.keys())[0]: run_doc = { "number": number, 'detectors': detectors, - 'user': self.goal_state[detector]['user'], - 'mode': self.goal_state[detector]['mode'], + 'user': self.goal_state[det]['user'], + 'mode': self.goal_state[det]['mode'], 'bootstrax': {'state': None}, 'end': None } # If there's a source add the source. Also add the complete ini file. - cfg = self.get_run_mode(self.goal_state[detector]['mode']) + cfg = self.get_run_mode(self.goal_state[det]['mode']) if cfg is not None and 'source' in cfg.keys(): run_doc['source'] = str(cfg['source']) run_doc['daq_config'] = cfg # If the user started the run with a comment add that too - if "comment" in self.goal_state[detector] and self.goal_state[detector]['comment'] != "": + if "comment" in self.goal_state[det] and self.goal_state[det]['comment'] != "": run_doc['comments'] = [{ - "user": self.goal_state[detector]['user'], + "user": self.goal_state[det]['user'], "date": now(), - "comment": self.goal_state[detector]['comment'] + "comment": self.goal_state[det]['comment'] }] # Make a data entry so bootstrax can find the thing @@ -805,7 +826,7 @@ def insert_run_doc(self, detector): # The cc needs some time to get started time.sleep(self.cc_start_wait) try: - start_time = self.get_ack_time(detector, 'start') + start_time = self.get_ack_time(logical, 'start') except Exception as e: self.logger.error('Couldn\'t find start time ack') start_time = None diff --git a/dispatcher/config.ini b/dispatcher/config.ini index 6471db91..85e6d356 100644 --- a/dispatcher/config.ini +++ b/dispatcher/config.ini @@ -68,8 +68,8 @@ MasterDAQConfig = { "neutron_veto": { "controller": ["reader6_controller_0"], "readers": ["reader6_reader_0", "reader6_reader_1"] - } - } + } + } # Addresses for the VME crates VMEConfig = { diff --git a/dispatcher/dispatcher.py b/dispatcher/dispatcher.py index b106a1c2..266968f6 100755 --- a/dispatcher/dispatcher.py +++ b/dispatcher/dispatcher.py @@ -90,25 +90,25 @@ def main(config, control_mc, logger, daq_config, vme_config, SlackBot, runs_mc, # Get most recent goal state from database. Users will update this from the website. if (goal_state := mc.get_wanted_state()) is None: continue - # Get the Super-Detector configuration - current_config = mc.get_super_detector() + # Get the Logical Detector configuration + current_config = mc.get_logical_detector() # Get most recent check-in from all connected hosts if (latest_status := mc.get_update(current_config)) is None: continue - # Print an update - for detector in latest_status.keys(): - state = 'ACTIVE' if goal_state[detector]['active'] == 'true' else 'INACTIVE' - msg = (f'The {detector} should be {state} and is ' - f'{latest_status[detector]["status"].name}') - if latest_status[detector]['number'] != -1: - msg += f' ({latest_status[detector]["number"]})' - logger.debug(msg) + for logical in latest_status.keys(): + for det in latest_status[logical]['detectors'].keys(): + state = 'ACTIVE' if goal_state[det]['active'] == 'true' else 'INACTIVE' + msg = (f'{logical} {det} should be {state}: ' + f'logical detector is {latest_status[logical]["status"]}, ' + f'physical detector is {latest_status[logical]["detectors"][det]["status"]}') + if latest_status[logical]['number'] != -1: + msg += f' ({latest_status[logical]["number"]})' + logger.debug(msg) msg = (f"Linking: tpc-mv: {mc.is_linked('tpc', 'muon_veto')}, " f"tpc-nv: {mc.is_linked('tpc', 'neutron_veto')}, " f"mv-nv: {mc.is_linked('muon_veto', 'neutron_veto')}") logger.debug(msg) - # Decision time. Are we actually in our goal state? If not what should we do? dc.solve_problem(latest_status, goal_state) diff --git a/dispatcher/hypervisor.py b/dispatcher/hypervisor.py index a3df0a53..625fd62f 100644 --- a/dispatcher/hypervisor.py +++ b/dispatcher/hypervisor.py @@ -14,7 +14,7 @@ def date_now(): class Hypervisor(object): - __version__ = '4.0.3' + __version__ = '4.0.4' def __init__(self, db, logger, @@ -405,19 +405,51 @@ def linked_nuclear_option(self): if not hasattr(self, 'mongo_connect'): self.logger.error('Darryl hasn\'t made this work in testing yet') raise ValueError('This only works in prod') - ok, not_ok = [], [] + + timeout_list, running_list = [], [] physical_status = self.mongo_connect.physical_status - for phys_det, statuses in physical_status.items(): - if self.mongo_connect.combine_statuses(statuses) in [daqnt.DAQ_STATUS.TIMEOUT]: - not_ok.append(phys_det) - else: - ok.append(phys_det) - self.logger.debug(f'These detectors are ok: {ok}, these aren\'t: {not_ok}') - if self.detector in not_ok: - # welp, looks like we're part of the problem + latest_status = self.mongo_connect.latest_status + host_config = self.mongo_connect.host_config + phys_stat = {k: [] for k in config} + now_time = time.time() + for logical in latest_status.keys(): + log_status = latest_status[logical]["status"] + for n, doc in latest_status[logical]['readers'].items(): + if doc is None: + self.logger.debug(f'{n} seems to have been offline for a few days') + continue + det = host_config[doc['host']] + status = self.mongo_connect.extract_status(doc, now_time) + doc['status'] = status + phys_stat[det].append(status) + self.logger.debug(f'{det}: {doc["host"]} is in {status}') + for n, doc in self.latest_status[logical]['controller'].items(): + if doc is None: + self.logger.debug(f'{n} seems to have been offline for a few days') + continue + det = host_config[doc['host']] + status = self.mongo_connect.extract_status(doc, now_time) + doc['status'] = status + phys_stat[det].append(status) + self.logger.debug(f'{det}: {doc["host"]} is in {status}') + for det in latest_status[logical]['detectors'].keys(): + phy_status = latest_status[logical]['detectors']['status'] + agg_status = self.mongo_connect.combine_statuses(phys_stat[det]) + self.logger.debug(f'{logical} is {log_status}, {det} is {phy_status} -> {agg_stat}') + #if phy_status in [daqnt.DAQ_STATUS.TIMEOUT]: timeout_list.append(phys_det) + #else: running_list.append(phys_det) + if (phy_status != agg_status) or (phy_status in ['ERROR','TIMEOUT','UNKNOWN']): + timeout_list.append(det) + else: + running_list.append(det) + + self.logger.debug(f'These detectors are not in timeout: {running_list}, ' + f'these are in timeout: {timeout_list}') + # check if the TPC is part of the problem + if 'tpc' in timeout_list: return False - if len(ok) == len(physical_status): + if len(timeout_list) == len(physical_status): self.logger.error('Uh, how did you get here???') self.slackbot.send_message('This happened again, you should really' ' get someone to fix this', tags='ALL') @@ -428,12 +460,12 @@ def linked_nuclear_option(self): add_tags='ALL') # ok, we aren't the problem, let's see about unlinking - if len(ok) == 1: + if len(running_list) == 1: # everyone else has died, it's just us left logical_detectors = physical_status.keys() - elif self.mongo_connect.is_linked(ok[0], ok[1]): + elif self.mongo_connect.is_linked(running_list[0], running_list[1]): # the detector we are linked with is fine, the other one crashed - logical_detectors = [ok, not_ok] + logical_detectors = [running_list, timeout_list] else: # the detector we linked with crashed, and the other one is fine logical_detectors = physical_status.keys()