diff --git a/blacs/__main__.py b/blacs/__main__.py index 56aedb9..130109d 100644 --- a/blacs/__main__.py +++ b/blacs/__main__.py @@ -693,20 +693,29 @@ def on_open_preferences(self,*args,**kwargs): self.settings.create_dialog() class ExperimentServer(ZMQServer): - def handler(self, h5_filepath): - print(h5_filepath) - message = self.process(h5_filepath) + def handler(self, data): + """ + This is an override to accept remote messages. + """ + message = self.process(data) logger.info('Request handler: %s ' % message.strip()) return message @inmain_decorator(wait_for_return=True) - def process(self,h5_filepath): + def process(self, data): + + agnostic_path = data["agnostic_path"] + lyse_host = data["lyse_host"] + # Convert path to local slashes and shared drive prefix: - logger.info('received filepath: %s'%h5_filepath) - h5_filepath = labscript_utils.shared_drive.path_to_local(h5_filepath) + logger.info('received filepath: %s'%agnostic_path) + h5_filepath = labscript_utils.shared_drive.path_to_local(agnostic_path) logger.info('local filepath: %s'%h5_filepath) - return app.queue.process_request(h5_filepath) + # NASTY CODE STYLE: this is in reference to the global variable `app = BLACS(qapplication)` defined below + # Took me 30 minutes to track down this logic. + # TODO: banish global variables of this type. + return app.queue.process_request(h5_filepath, lyse_host=lyse_host) if __name__ == '__main__': if 'tracelog' in sys.argv: diff --git a/blacs/analysis_submission.py b/blacs/analysis_submission.py index 40aa332..8a621e2 100644 --- a/blacs/analysis_submission.py +++ b/blacs/analysis_submission.py @@ -32,6 +32,17 @@ class AnalysisSubmission(object): + + icon_names = {'checking': ':/qtutils/fugue/hourglass', + 'online': ':/qtutils/fugue/tick', + 'offline': ':/qtutils/fugue/exclamation', + '': ':/qtutils/fugue/status-offline'} + + tooltips = {'checking': 'Checking...', + 'online': 'Server is responding', + 'offline': 'Server not responding', + '': 'Disabled'} + def __init__(self, BLACS, blacs_ui): self.inqueue = queue.Queue() self.BLACS = BLACS @@ -41,18 +52,20 @@ def __init__(self, BLACS, blacs_ui): blacs_ui.analysis.addWidget(self._ui) self._ui.frame.setMinimumWidth(blacs_ui.queue_controls_frame.sizeHint().width()) elide_label(self._ui.resend_shots_label, self._ui.failed_to_send_frame.layout(), Qt.ElideRight) + + self._waiting_for_submission = {} + self.failure_reason = {} + self.send_to_server = False + self.server = '' + self.time_of_last_connectivity_check = 0 + self.server_online = {} + # connect signals self._ui.send_to_server.toggled.connect(lambda state: self._set_send_to_server(state)) self._ui.server.editingFinished.connect(lambda: self._set_server(self._ui.server.text())) self._ui.clear_unsent_shots_button.clicked.connect(lambda _: self.clear_waiting_files()) self._ui.retry_button.clicked.connect(lambda _: self.check_retry()) - self._waiting_for_submission = [] - self.failure_reason = None - self.server_online = 'offline' - self.send_to_server = False - self.server = '' - self.time_of_last_connectivity_check = 0 self.mainloop_thread = threading.Thread(target=self.mainloop) self.mainloop_thread.daemon = True @@ -64,16 +77,23 @@ def restore_save_data(self,data): if "send_to_server" in data: self.send_to_server = data["send_to_server"] if "waiting_for_submission" in data: - self._waiting_for_submission = list(data["waiting_for_submission"]) - self.inqueue.put(['save data restored', None]) + self._waiting_for_submission = dict(data["waiting_for_submission"]) + self.inqueue.put(['save data restored', None, None]) self.check_retry() def get_save_data(self): - return {"waiting_for_submission":list(self._waiting_for_submission), - "server":self.server, - "send_to_server":self.send_to_server + return {"waiting_for_submission": dict(self._waiting_for_submission), + "server": self.server, + "send_to_server": self.send_to_server } + def _waiting_for_submission_len(self): + length = 0 + for k, v in enumerate(self._waiting_for_submission): + length += len(v) + + return length + def _set_send_to_server(self,value): self.send_to_server = value @@ -118,24 +138,36 @@ def server_online(self): @server_online.setter @inmain_decorator(True) - def server_online(self,value): - self._server_online = str(value) - - icon_names = {'checking': ':/qtutils/fugue/hourglass', - 'online': ':/qtutils/fugue/tick', - 'offline': ':/qtutils/fugue/exclamation', - '': ':/qtutils/fugue/status-offline'} + def server_online(self, value): - tooltips = {'checking': 'Checking...', - 'online': 'Server is responding', - 'offline': 'Server not responding', - '': 'Disabled'} + self._server_online = value - icon = QIcon(icon_names.get(self._server_online, ':/qtutils/fugue/exclamation-red')) + status = 'online' + tooltip = '' + for server in self._waiting_for_submission: + + if server not in value: + value[server] = '' + + v = value[server] + + if v == 'offline': + status = 'offline' + if tooltip != '': + tooltip += '\n' + + tip = self.tooltips.get(status, 'Invalid message {}'.format(status)) + tooltip += 'Server {} status: {}'.format(server, tip) + + if server not in self.failure_reason: + self.failure_reason[server] = None + tooltip += 'Server not checked yet' + + if self.failure_reason[server] is not None: + tooltip += '[[{}]]'.format(self.failure_reason[server]) + + icon = QIcon(self.icon_names.get(status, ':/qtutils/fugue/exclamation-red')) pixmap = icon.pixmap(QSize(16, 16)) - tooltip = tooltips.get(self._server_online, "Invalid server status: %s" % self._server_online) - if self.failure_reason is not None: - tooltip += '\n' + self.failure_reason # Update GUI: self._ui.server_online.setPixmap(pixmap) @@ -145,33 +177,46 @@ def server_online(self,value): @inmain_decorator(True) def update_waiting_files_message(self): - # if there is only one shot and we haven't encountered failure yet, do - # not show the error frame: - if (self.server_online == 'checking') and (len(self._waiting_for_submission) == 1) and not self._ui.failed_to_send_frame.isVisible(): - return - if self._waiting_for_submission: + + message = '' + failed = False + for server, shots in self._waiting_for_submission.items(): + length = len(shots) + + # The server may never have been checked + if server not in self.server_online: + self._server_online[server] = '' + + # if there is only one shot and we haven't encountered failure yet, do + # not show the error frame: + if (self.server_online[server] == 'checking') and (length == 1) and not self._ui.failed_to_send_frame.isVisible(): + pass + elif length: + if self.server_online[server] == 'checking': + message += 'Server {}: Sending {} shot(s)...'.format(server, length) + else: + message += 'Server {}: {} shot(s) to send...'.format(server, length) + + if failed and self._waiting_for_submission_len(): self._ui.failed_to_send_frame.show() - if self.server_online == 'checking': - self._ui.retry_button.hide() - text = 'Sending %s shot(s)...' % len(self._waiting_for_submission) - else: - self._ui.retry_button.show() - text = '%s shot(s) to send' % len(self._waiting_for_submission) - self._ui.resend_shots_label.setText(text) else: self._ui.failed_to_send_frame.hide() + self._ui.resend_shots_label.setText(message) + + self._ui.retry_button.show() + def get_queue(self): return self.inqueue @inmain_decorator(True) def clear_waiting_files(self): - self._waiting_for_submission = [] + self._waiting_for_submission = {} self.update_waiting_files_message() @inmain_decorator(True) def check_retry(self): - self.inqueue.put(['check/retry', None]) + self.inqueue.put(['check/retry', None, None]) def mainloop(self): self._mainloop_logger = logging.getLogger('BLACS.AnalysisSubmission.mainloop') @@ -182,39 +227,27 @@ def mainloop(self): while True: try: try: - signal, data = self.inqueue.get(timeout=timeout) + signal, data, lyse_host = self.inqueue.get(timeout=timeout) except queue.Empty: - timeout = 10 - # Periodic checking of connectivity and resending of files. - # Don't trigger a re-check if we already failed a connectivity - # check within the last second: - if (time.time() - self.time_of_last_connectivity_check) > 1: - signal = 'check/retry' - else: - continue - if signal == 'check/retry': - self.check_connectivity() - if self.server_online == 'online': - self.submit_waiting_files() - elif signal == 'file': + continue + + if signal == 'file': if self.send_to_server: - self._waiting_for_submission.append(data) - if self.server_online != 'online': - # Don't stack connectivity checks if many files are - # arriving. If we failed a connectivity check less - # than a second ago then don't check again. - if (time.time() - self.time_of_last_connectivity_check) > 1: - self.check_connectivity() - else: - # But do queue up a check for when we have - # been idle for one second: - timeout = 1 - if self.server_online == 'online': - self.submit_waiting_files() + + lyse_host = lyse_host if lyse_host != '' else self.server + + if lyse_host not in self._waiting_for_submission: + self._waiting_for_submission[lyse_host] = [] + + self._waiting_for_submission[lyse_host].append(data) + + self.submit_waiting_files() elif signal == 'close': break elif signal == 'save data restored': continue + elif signal == 'check/retry': + self.submit_waiting_files() else: raise ValueError('Invalid signal: %s'%str(signal)) @@ -225,53 +258,71 @@ def mainloop(self): self._mainloop_logger.exception("Exception in mainloop, continuing") def check_connectivity(self): - host = self.server - send_to_server = self.send_to_server - if host and send_to_server: - self.server_online = 'checking' - try: - response = zmq_get(self.port, host, 'hello', timeout=1) - self.failure_reason = None - except (TimeoutError, gaierror, AuthenticationFailure) as e: - success = False - self.failure_reason = str(e) - else: - success = (response == 'hello') - if not success: - self.failure_reason = "unexpected reponse: %s" % str(response) + + server_online = {} + + for server in self._waiting_for_submission: + send_to_server = self.send_to_server + if send_to_server: + server_online[server] = 'checking' + self.server_online = server_online # update GUI - # update GUI - self.server_online = 'online' if success else 'offline' - else: - self.server_online = '' + try: + response = zmq_get(self.port, server, 'hello', timeout=1) + self.failure_reason[k] = None + except (TimeoutError, gaierror, AuthenticationFailure) as e: + success = False + self.failure_reason[k] = str(e) + else: + success = (response == 'hello') + if not success: + self.failure_reason[k] = "unexpected reponse: %s" % str(response) + + server_online[server] = 'online' if success else 'offline' + else: + server_online[server] = '' + # update GUI + self.server_online = server_online + self.time_of_last_connectivity_check = time.time() def submit_waiting_files(self): - success = True - while self._waiting_for_submission and success: - path = self._waiting_for_submission[0] - self._mainloop_logger.info('Submitting run file %s.\n'%os.path.basename(path)) - data = {'filepath': labscript_utils.shared_drive.path_to_agnostic(path)} - self.server_online = 'checking' - try: - response = zmq_get(self.port, self.server, data, timeout=1) - self.failure_reason = None - except (TimeoutError, gaierror, AuthenticationFailure) as e: - success = False - self.failure_reason = str(e) - else: - success = (response == 'added successfully') - if not success: - self.failure_reason = "unexpected reponse: %s" % str(response) + + server_online = {} + for server, shots in self._waiting_for_submission.items(): + success = True + + while shots and success: + path = shots[0] + self.server = server + + self._mainloop_logger.info('Submitting run file %s.\n'%os.path.basename(path)) + data = {'filepath': labscript_utils.shared_drive.path_to_agnostic(path)} + + server_online[server] = 'checking' + self.server_online = server_online # update GUI + try: - self._waiting_for_submission.pop(0) - except IndexError: - # Queue has been cleared - pass - if not success: - break + response = zmq_get(self.port, server, data, timeout=1) + self.failure_reason[server] = None + except (TimeoutError, gaierror, AuthenticationFailure) as e: + success = False + self.failure_reason[server] = str(e) + else: + success = (response == 'added successfully') + if not success: + self.failure_reason[server] = "unexpected reponse: %s" % str(response) + try: + shots.pop(0) + except IndexError: + # Queue has been cleared + pass + + server_online[server] = 'online' if success else 'offline' + # update GUI - self.server_online = 'online' if success else 'offline' + self.server_online = server_online + self.time_of_last_connectivity_check = time.time() diff --git a/blacs/experiment_queue.py b/blacs/experiment_queue.py index dce7efd..70d49e3 100644 --- a/blacs/experiment_queue.py +++ b/blacs/experiment_queue.py @@ -363,10 +363,15 @@ def _move_bottom(self): row += 1 @inmain_decorator(True) - def append(self, h5files): - for file in h5files: + def append(self, data): + for datum in data: + file = datum["filepath"] + + # TODO: this is where lyse_host stops being encoded. + lyse_host = datum["lyse_host"] + item = QStandardItem(file) - item.setToolTip(file) + item.setToolTip(f"Filename: {file}, lyse: {lyse_host}") self._model.appendRow(item) @inmain_decorator(True) @@ -374,7 +379,7 @@ def prepend(self,h5file): if not self.is_in_queue(h5file): self._model.insertRow(0,QStandardItem(h5file)) - def process_request(self,h5_filepath): + def process_request(self, h5_filepath, lyse_host=''): # check connection table try: new_conn = ConnectionTable(h5_filepath, logging_prefix='BLACS') @@ -388,6 +393,7 @@ def process_request(self,h5_filepath): rerun = True else: rerun = False + if rerun or self.is_in_queue(h5_filepath): self._logger.debug('Run file has already been run! Creating a fresh copy to rerun') new_h5_filepath, repeat_number = self.new_rep_name(h5_filepath) @@ -397,11 +403,16 @@ def process_request(self,h5_filepath): success = self.clean_h5_file(h5_filepath, new_h5_filepath, repeat_number=repeat_number) if not success: return 'Cannot create a re run of this experiment. Is it a valid run file?' - self.append([new_h5_filepath]) message = "Experiment added successfully: experiment to be re-run\n" else: - self.append([h5_filepath]) + new_h5_filepath = h5_filepath message = "Experiment added successfully\n" + + data = {"filepath": new_h5_filepath, + "lyse_host": lyse_host} + + self.append([data]) + if self.manager_paused: message += "Warning: Queue is currently paused\n" if not self.manager_running: @@ -852,7 +863,7 @@ def restart_function(device_name): data_group = hdf5_file['/'].create_group('data') # stamp with the run time of the experiment hdf5_file.attrs['run time'] = run_time.strftime('%Y%m%dT%H%M%S.%f') - + error_condition = False response_list = {} # Keep transitioning tabs to manual mode and waiting on them until they @@ -955,8 +966,9 @@ def restart_function(device_name): logger.exception("Plugin callback raised an exception") # Submit to the analysis server + # TODO: will fail currently as lyse_host is not defined if send_to_analysis: - self.BLACS.analysis_submission.get_queue().put(['file', path]) + self.BLACS.analysis_submission.get_queue().put(['file', path, lyse_host]) ########################################################################################################################################## # Plugin callbacks #