diff --git a/OMPython/ModelicaSystem.py b/OMPython/ModelicaSystem.py index e9c247b3..2f02b710 100644 --- a/OMPython/ModelicaSystem.py +++ b/OMPython/ModelicaSystem.py @@ -113,22 +113,11 @@ def __getitem__(self, index: int): class ModelicaSystemCmd: - """ - Execute a simulation by running the compiled model. - """ - - def __init__(self, runpath: pathlib.Path, modelname: str, timeout: Optional[int] = None) -> None: - """ - Initialisation + """A compiled model executable.""" - Parameters - ---------- - runpath : pathlib.Path - modelname : str - timeout : Optional[int], None - """ + def __init__(self, runpath: pathlib.Path, modelname: str, timeout: Optional[float] = None) -> None: self._runpath = pathlib.Path(runpath).resolve().absolute() - self._modelname = modelname + self._model_name = modelname self._timeout = timeout self._args: dict[str, str | None] = {} self._arg_override: dict[str, str] = {} @@ -188,17 +177,11 @@ def args_set(self, args: dict[str, Optional[str | dict[str, str]]]) -> None: self.arg_set(key=arg, val=args[arg]) def get_exe(self) -> pathlib.Path: - """ - Get the path to the executable / complied model. - - Returns - ------- - pathlib.Path - """ + """Get the path to the compiled model executable.""" if platform.system() == "Windows": - path_exe = self._runpath / f"{self._modelname}.exe" + path_exe = self._runpath / f"{self._model_name}.exe" else: - path_exe = self._runpath / self._modelname + path_exe = self._runpath / self._model_name if not path_exe.exists(): raise ModelicaSystemError(f"Application file path not found: {path_exe}") @@ -206,12 +189,9 @@ def get_exe(self) -> pathlib.Path: return path_exe def get_cmd(self) -> list: - """ - Run the requested simulation + """Get a list with the path to the executable and all command line args. - Returns - ------- - list + This can later be used as an argument for subprocess.run(). """ path_exe = self.get_exe() @@ -226,12 +206,11 @@ def get_cmd(self) -> list: return cmdl def run(self) -> int: - """ - Run the requested simulation + """Run the requested simulation. Returns ------- - int + Subprocess return code (0 on success). """ cmdl: list = self.get_cmd() @@ -242,7 +221,7 @@ def run(self) -> int: path_dll = "" # set the process environment from the generated .bat file in windows which should have all the dependencies - path_bat = self._runpath / f"{self._modelname}.bat" + path_bat = self._runpath / f"{self._model_name}.bat" if not path_bat.exists(): raise ModelicaSystemError("Batch file (*.bat) does not exist " + str(path_bat)) @@ -278,17 +257,9 @@ def run(self) -> int: @staticmethod def parse_simflags(simflags: str) -> dict[str, Optional[str | dict[str, str]]]: """ - Parse a simflag definition; this is depreciated! + Parse a simflag definition; this is deprecated! The return data can be used as input for self.args_set(). - - Parameters - ---------- - simflags : str - - Returns - ------- - dict """ warnings.warn("The argument 'simflags' is depreciated and will be removed in future versions; " "please use 'simargs' instead", DeprecationWarning, stacklevel=2) @@ -378,30 +349,30 @@ def __init__( if modelName is None: raise ModelicaSystemError("A modelname must be provided (argument modelName)!") - self.quantitiesList: list[dict[str, Any]] = [] - self.paramlist: dict[str, str] = {} # even numerical values are stored as str - self.inputlist: dict[str, list | None] = {} - # outputlist values are str before simulate(), but they can be + self._quantities: list[dict[str, Any]] = [] + self._params: dict[str, str] = {} # even numerical values are stored as str + self._inputs: dict[str, list | None] = {} + # _outputs values are str before simulate(), but they can be # np.float64 after simulate(). - self.outputlist: dict[str, Any] = {} - # same for continuouslist - self.continuouslist: dict[str, Any] = {} - self.simulateOptions: dict[str, str] = {} - self.overridevariables: dict[str, str] = {} - self.simoptionsoverride: dict[str, str] = {} - self.linearOptions = {'startTime': 0.0, 'stopTime': 1.0, 'stepSize': 0.002, 'tolerance': 1e-8} - self.optimizeOptions = {'startTime': 0.0, 'stopTime': 1.0, 'numberOfIntervals': 500, 'stepSize': 0.002, - 'tolerance': 1e-8} - self.linearinputs: list[str] = [] # linearization input list - self.linearoutputs: list[str] = [] # linearization output list - self.linearstates: list[str] = [] # linearization states list + self._outputs: dict[str, Any] = {} + # same for _continuous + self._continuous: dict[str, Any] = {} + self._simulate_options: dict[str, str] = {} + self._override_variables: dict[str, str] = {} + self._simulate_options_override: dict[str, str] = {} + self._linearization_options = {'startTime': 0.0, 'stopTime': 1.0, 'stepSize': 0.002, 'tolerance': 1e-8} + self._optimization_options = {'startTime': 0.0, 'stopTime': 1.0, 'numberOfIntervals': 500, 'stepSize': 0.002, + 'tolerance': 1e-8} + self._linearized_inputs: list[str] = [] # linearization input list + self._linearized_outputs: list[str] = [] # linearization output list + self._linearized_states: list[str] = [] # linearization states list if session is not None: if not isinstance(session, OMCSessionZMQ): raise ModelicaSystemError("Invalid session data provided!") - self.getconn = session + self._getconn = session else: - self.getconn = OMCSessionZMQ(omhome=omhome) + self._getconn = OMCSessionZMQ(omhome=omhome) # set commandLineOptions if provided by users self.setCommandLineOptions(commandLineOptions=commandLineOptions) @@ -412,19 +383,18 @@ def __init__( if not isinstance(lmodel, list): raise ModelicaSystemError(f"Invalid input type for lmodel: {type(lmodel)} - list expected!") - self.xmlFile = None - self.lmodel = lmodel # may be needed if model is derived from other model - self.modelName = modelName # Model class name - self.fileName = pathlib.Path(fileName).resolve() if fileName is not None else None # Model file/package name - self.inputFlag = False # for model with input quantity - self.simulationFlag = False # if the model is simulated? - self.outputFlag = False - self.csvFile: Optional[pathlib.Path] = None # for storing inputs condition - self.resultfile: Optional[pathlib.Path] = None # for storing result file - self.variableFilter = variableFilter + self._xml_file = None + self._lmodel = lmodel # may be needed if model is derived from other model + self._model_name = modelName # Model class name + self._file_name = pathlib.Path(fileName).resolve() if fileName is not None else None # Model file/package name + self._has_inputs = False # for model with input quantity + self._simulated = False # True if the model has already been simulated + self._csvFile: Optional[pathlib.Path] = None # for storing inputs condition + self._result_file: Optional[pathlib.Path] = None # for storing result file + self._variable_filter = variableFilter - if self.fileName is not None and not self.fileName.is_file(): # if file does not exist - raise IOError(f"{self.fileName} does not exist!") + if self._file_name is not None and not self._file_name.is_file(): # if file does not exist + raise IOError(f"{self._file_name} does not exist!") # set default command Line Options for linearization as # linearize() will use the simulation executable and runtime @@ -432,15 +402,15 @@ def __init__( self.setCommandLineOptions("--linearizationDumpLanguage=python") self.setCommandLineOptions("--generateSymbolicLinearization") - self.tempdir = self.setTempDirectory(customBuildDirectory) + self._tempdir = self.setTempDirectory(customBuildDirectory) - if self.fileName is not None: - self.loadLibrary(lmodel=self.lmodel) - self.loadFile(fileName=self.fileName) + if self._file_name is not None: + self._loadLibrary(lmodel=self._lmodel) + self._loadFile(fileName=self._file_name) # allow directly loading models from MSL without fileName elif fileName is None and modelName is not None: - self.loadLibrary(lmodel=self.lmodel) + self._loadLibrary(lmodel=self._lmodel) if build: self.buildModel(variableFilter) @@ -452,12 +422,12 @@ def setCommandLineOptions(self, commandLineOptions: Optional[str] = None): exp = f'setCommandLineOptions("{commandLineOptions}")' self.sendExpression(exp) - def loadFile(self, fileName: pathlib.Path): + def _loadFile(self, fileName: pathlib.Path): # load file self.sendExpression(f'loadFile("{fileName.as_posix()}")') # for loading file/package, loading model and building model - def loadLibrary(self, lmodel: list): + def _loadLibrary(self, lmodel: list): # load Modelica standard libraries or Modelica files if needed for element in lmodel: if element is not None: @@ -466,7 +436,7 @@ def loadLibrary(self, lmodel: list): apiCall = "loadFile" else: apiCall = "loadModel" - self.requestApi(apiCall, element) + self._requestApi(apiCall, element) elif isinstance(element, tuple): if not element[1]: expr_load_lib = f"loadModel({element[0]})" @@ -498,26 +468,26 @@ def setTempDirectory(self, customBuildDirectory: Optional[str | os.PathLike | pa return tempdir def getWorkDirectory(self) -> pathlib.Path: - return self.tempdir + return self._tempdir def buildModel(self, variableFilter: Optional[str] = None): if variableFilter is not None: - self.variableFilter = variableFilter + self._variable_filter = variableFilter - if self.variableFilter is not None: - varFilter = f'variableFilter="{self.variableFilter}"' + if self._variable_filter is not None: + varFilter = f'variableFilter="{self._variable_filter}"' else: varFilter = 'variableFilter=".*"' - buildModelResult = self.requestApi("buildModel", self.modelName, properties=varFilter) + buildModelResult = self._requestApi("buildModel", self._model_name, properties=varFilter) logger.debug("OM model build result: %s", buildModelResult) - self.xmlFile = pathlib.Path(buildModelResult[0]).parent / buildModelResult[1] - self.xmlparse() + self._xml_file = pathlib.Path(buildModelResult[0]).parent / buildModelResult[1] + self._xmlparse() def sendExpression(self, expr: str, parsed: bool = True): try: - retval = self.getconn.sendExpression(expr, parsed) + retval = self._getconn.sendExpression(expr, parsed) except OMCSessionException as ex: raise ModelicaSystemError(f"Error executing {repr(expr)}") from ex @@ -526,7 +496,7 @@ def sendExpression(self, expr: str, parsed: bool = True): return retval # request to OMC - def requestApi(self, apiName, entity=None, properties=None): # 2 + def _requestApi(self, apiName, entity=None, properties=None): # 2 if entity is not None and properties is not None: exp = f'{apiName}({entity}, {properties})' elif entity is not None and properties is None: @@ -539,16 +509,16 @@ def requestApi(self, apiName, entity=None, properties=None): # 2 return self.sendExpression(exp) - def xmlparse(self): - if not self.xmlFile.is_file(): - raise ModelicaSystemError(f"XML file not generated: {self.xmlFile}") + def _xmlparse(self): + if not self._xml_file.is_file(): + raise ModelicaSystemError(f"XML file not generated: {self._xml_file}") - tree = ET.parse(self.xmlFile) + tree = ET.parse(self._xml_file) rootCQ = tree.getroot() for attr in rootCQ.iter('DefaultExperiment'): for key in ("startTime", "stopTime", "stepSize", "tolerance", "solver", "outputFormat"): - self.simulateOptions[key] = attr.get(key) + self._simulate_options[key] = attr.get(key) for sv in rootCQ.iter('ScalarVariable'): scalar = {} @@ -564,18 +534,18 @@ def xmlparse(self): scalar["unit"] = att.get('unit') if scalar["variability"] == "parameter": - if scalar["name"] in self.overridevariables: - self.paramlist[scalar["name"]] = self.overridevariables[scalar["name"]] + if scalar["name"] in self._override_variables: + self._params[scalar["name"]] = self._override_variables[scalar["name"]] else: - self.paramlist[scalar["name"]] = scalar["start"] + self._params[scalar["name"]] = scalar["start"] if scalar["variability"] == "continuous": - self.continuouslist[scalar["name"]] = scalar["start"] + self._continuous[scalar["name"]] = scalar["start"] if scalar["causality"] == "input": - self.inputlist[scalar["name"]] = scalar["start"] + self._inputs[scalar["name"]] = scalar["start"] if scalar["causality"] == "output": - self.outputlist[scalar["name"]] = scalar["start"] + self._outputs[scalar["name"]] = scalar["start"] - self.quantitiesList.append(scalar) + self._quantities.append(scalar) def getQuantities(self, names: Optional[str | list[str]] = None) -> list[dict]: """ @@ -621,60 +591,87 @@ def getQuantities(self, names: Optional[str | list[str]] = None) -> list[dict]: ] """ if names is None: - return self.quantitiesList + return self._quantities if isinstance(names, str): - r = [x for x in self.quantitiesList if x["name"] == names] + r = [x for x in self._quantities if x["name"] == names] if r == []: raise KeyError(names) return r if isinstance(names, list): - return [x for y in names for x in self.quantitiesList if x["name"] == y] + return [x for y in names for x in self._quantities if x["name"] == y] raise ModelicaSystemError("Unhandled input for getQuantities()") - def getContinuous(self, names=None): # 4 - """ - This method returns dict. The key is continuous names and value is corresponding continuous value. - usage: - >>> getContinuous() - >>> getContinuous("Name1") - >>> getContinuous(["Name1","Name2"]) + def getContinuous(self, names: Optional[str | list[str]] = None): + """Get values of continuous signals. + + If called before simulate(), the initial values are returned as + strings (or None). If called after simulate(), the final values (at + stopTime) are returned as numpy.float64. + + Args: + names: Either None (default), a string with the continuous signal + name, or a list of signal name strings. + Returns: + If `names` is None, a dict in the format + {signal_name: signal_value} is returned. + If `names` is a string, a single element list [signal_value] is + returned. + If `names` is a list, a list with one value for each signal name + in names is returned: [signal1_value, signal2_value, ...]. + + Examples: + Before simulate(): + >>> mod.getContinuous() + {'x': '1.0', 'der(x)': None, 'y': '-0.4'} + >>> mod.getContinuous("y") + ['-0.4'] + >>> mod.getContinuous(["y","x"]) + ['-0.4', '1.0'] + + After simulate(): + >>> mod.getContinuous() + {'x': np.float64(0.68), 'der(x)': np.float64(-0.24), 'y': np.float64(-0.24)} + >>> mod.getContinuous("x") + [np.float64(0.68)] + >>> mod.getOutputs(["y","x"]) + [np.float64(-0.24), np.float64(0.68)] """ - if not self.simulationFlag: + if not self._simulated: if names is None: - return self.continuouslist + return self._continuous if isinstance(names, str): - return [self.continuouslist[names]] + return [self._continuous[names]] if isinstance(names, list): - return [self.continuouslist[x] for x in names] + return [self._continuous[x] for x in names] else: if names is None: - for i in self.continuouslist: + for i in self._continuous: try: value = self.getSolutions(i) - self.continuouslist[i] = value[0][-1] + self._continuous[i] = value[0][-1] except (OMCSessionException, ModelicaSystemError) as ex: raise ModelicaSystemError(f"{i} could not be computed") from ex - return self.continuouslist + return self._continuous if isinstance(names, str): - if names in self.continuouslist: + if names in self._continuous: value = self.getSolutions(names) - self.continuouslist[names] = value[0][-1] - return [self.continuouslist[names]] + self._continuous[names] = value[0][-1] + return [self._continuous[names]] else: raise ModelicaSystemError(f"{names} is not continuous") if isinstance(names, list): valuelist = [] for i in names: - if i in self.continuouslist: + if i in self._continuous: value = self.getSolutions(i) - self.continuouslist[i] = value[0][-1] + self._continuous[i] = value[0][-1] valuelist.append(value[0][-1]) else: raise ModelicaSystemError(f"{i} is not continuous") @@ -705,16 +702,16 @@ def getParameters(self, names: Optional[str | list[str]] = None) -> dict[str, st ['1.23', '4.56'] """ if names is None: - return self.paramlist + return self._params elif isinstance(names, str): - return [self.paramlist[names]] + return [self._params[names]] elif isinstance(names, list): - return [self.paramlist[x] for x in names] + return [self._params[x] for x in names] raise ModelicaSystemError("Unhandled input for getParameters()") def getInputs(self, names: Optional[str | list[str]] = None) -> dict | list: # 6 - """Get input values. + """Get values of input signals. Args: names: Either None (default), a string with the input name, @@ -739,16 +736,16 @@ def getInputs(self, names: Optional[str | list[str]] = None) -> dict | list: # [[(0.0, 0.0), (1.0, 1.0)], None] """ if names is None: - return self.inputlist + return self._inputs elif isinstance(names, str): - return [self.inputlist[names]] + return [self._inputs[names]] elif isinstance(names, list): - return [self.inputlist[x] for x in names] + return [self._inputs[x] for x in names] raise ModelicaSystemError("Unhandled input for getInputs()") def getOutputs(self, names: Optional[str | list[str]] = None): # 7 - """Get output values. + """Get values of output signals. If called before simulate(), the initial values are returned as strings. If called after simulate(), the final values (at stopTime) @@ -782,32 +779,32 @@ def getOutputs(self, names: Optional[str | list[str]] = None): # 7 >>> mod.getOutputs(["out1","out2"]) [np.float64(-0.1234), np.float64(2.1)] """ - if not self.simulationFlag: + if not self._simulated: if names is None: - return self.outputlist + return self._outputs elif isinstance(names, str): - return [self.outputlist[names]] + return [self._outputs[names]] else: - return [self.outputlist[x] for x in names] + return [self._outputs[x] for x in names] else: if names is None: - for i in self.outputlist: + for i in self._outputs: value = self.getSolutions(i) - self.outputlist[i] = value[0][-1] - return self.outputlist + self._outputs[i] = value[0][-1] + return self._outputs elif isinstance(names, str): - if names in self.outputlist: + if names in self._outputs: value = self.getSolutions(names) - self.outputlist[names] = value[0][-1] - return [self.outputlist[names]] + self._outputs[names] = value[0][-1] + return [self._outputs[names]] else: raise KeyError(names) elif isinstance(names, list): valuelist = [] for i in names: - if i in self.outputlist: + if i in self._outputs: value = self.getSolutions(i) - self.outputlist[i] = value[0][-1] + self._outputs[i] = value[0][-1] valuelist.append(value[0][-1]) else: raise KeyError(i) @@ -815,81 +812,143 @@ def getOutputs(self, names: Optional[str | list[str]] = None): # 7 raise ModelicaSystemError("Unhandled input for getOutputs()") - def getSimulationOptions(self, names=None): # 8 - """ - This method returns dict. The key is simulation option names and value is corresponding simulation option value. - If name is None then the function will return dict which contain all simulation option names as key and value as corresponding values. eg., getSimulationOptions() - usage: - >>> getSimulationOptions() - >>> getSimulationOptions("Name1") - >>> getSimulationOptions(["Name1","Name2"]) + def getSimulationOptions(self, names: Optional[str | list[str]] = None) -> dict[str, str] | list[str]: + """Get simulation options such as stopTime and tolerance. + + Args: + names: Either None (default), a string with the simulation option + name, or a list of option name strings. + + Returns: + If `names` is None, a dict in the format + {option_name: option_value} is returned. + If `names` is a string, a single element list [option_value] is + returned. + If `names` is a list, a list with one value for each option name + in names is returned: [option1_value, option2_value, ...]. + Option values are always returned as strings. + + Examples: + >>> mod.getSimulationOptions() + {'startTime': '0', 'stopTime': '1.234', 'stepSize': '0.002', 'tolerance': '1.1e-08', 'solver': 'dassl', 'outputFormat': 'mat'} + >>> mod.getSimulationOptions("stopTime") + ['1.234'] + >>> mod.getSimulationOptions(["tolerance", "stopTime"]) + ['1.1e-08', '1.234'] """ if names is None: - return self.simulateOptions + return self._simulate_options elif isinstance(names, str): - return [self.simulateOptions[names]] + return [self._simulate_options[names]] elif isinstance(names, list): - return [self.simulateOptions[x] for x in names] + return [self._simulate_options[x] for x in names] raise ModelicaSystemError("Unhandled input for getSimulationOptions()") - def getLinearizationOptions(self, names=None): # 9 - """ - This method returns dict. The key is linearize option names and value is corresponding linearize option value. - If name is None then the function will return dict which contain all linearize option names as key and value as corresponding values. eg., getLinearizationOptions() - usage: - >>> getLinearizationOptions() - >>> getLinearizationOptions("Name1") - >>> getLinearizationOptions(["Name1","Name2"]) + def getLinearizationOptions(self, names: Optional[str | list[str]] = None) -> dict | list: + """Get simulation options used for linearization. + + Args: + names: Either None (default), a string with the linearization option + name, or a list of option name strings. + + Returns: + If `names` is None, a dict in the format + {option_name: option_value} is returned. + If `names` is a string, a single element list [option_value] is + returned. + If `names` is a list, a list with one value for each option name + in names is returned: [option1_value, option2_value, ...]. + Some option values are returned as float when first initialized, + but always as strings after setLinearizationOptions is used to + change them. + + Examples: + >>> mod.getLinearizationOptions() + {'startTime': 0.0, 'stopTime': 1.0, 'stepSize': 0.002, 'tolerance': 1e-08} + >>> mod.getLinearizationOptions("stopTime") + [1.0] + >>> mod.getLinearizationOptions(["tolerance", "stopTime"]) + [1e-08, 1.0] """ if names is None: - return self.linearOptions + return self._linearization_options elif isinstance(names, str): - return [self.linearOptions[names]] + return [self._linearization_options[names]] elif isinstance(names, list): - return [self.linearOptions[x] for x in names] + return [self._linearization_options[x] for x in names] raise ModelicaSystemError("Unhandled input for getLinearizationOptions()") - def getOptimizationOptions(self, names=None): # 10 - """ - usage: - >>> getOptimizationOptions() - >>> getOptimizationOptions("Name1") - >>> getOptimizationOptions(["Name1","Name2"]) + def getOptimizationOptions(self, names: Optional[str | list[str]] = None) -> dict | list: + """Get simulation options used for optimization. + + Args: + names: Either None (default), a string with the optimization option + name, or a list of option name strings. + + Returns: + If `names` is None, a dict in the format + {option_name: option_value} is returned. + If `names` is a string, a single element list [option_value] is + returned. + If `names` is a list, a list with one value for each option name + in names is returned: [option1_value, option2_value, ...]. + Some option values are returned as float when first initialized, + but always as strings after setOptimizationOptions is used to + change them. + + Examples: + >>> mod.getOptimizationOptions() + {'startTime': 0.0, 'stopTime': 1.0, 'numberOfIntervals': 500, 'stepSize': 0.002, 'tolerance': 1e-08} + >>> mod.getOptimizationOptions("stopTime") + [1.0] + >>> mod.getOptimizationOptions(["tolerance", "stopTime"]) + [1e-08, 1.0] """ if names is None: - return self.optimizeOptions + return self._optimization_options elif isinstance(names, str): - return [self.optimizeOptions[names]] + return [self._optimization_options[names]] elif isinstance(names, list): - return [self.optimizeOptions[x] for x in names] + return [self._optimization_options[x] for x in names] raise ModelicaSystemError("Unhandled input for getOptimizationOptions()") - def simulate(self, resultfile: Optional[str] = None, simflags: Optional[str] = None, + def simulate(self, + resultfile: Optional[str] = None, + simflags: Optional[str] = None, simargs: Optional[dict[str, Optional[str | dict[str, str]]]] = None, - timeout: Optional[int] = None): # 11 - """ - This method simulates model according to the simulation options. - usage - >>> simulate() - >>> simulate(resultfile="a.mat") - >>> simulate(simflags="-noEventEmit -noRestart -override=e=0.3,g=10") # set runtime simulation flags - >>> simulate(simargs={"noEventEmit": None, "noRestart": None, "override": "e=0.3,g=10"}) # using simargs + timeout: Optional[float] = None) -> None: + """Simulate the model according to simulation options. + + See setSimulationOptions(). + + Args: + resultfile: Path to a custom result file + simflags: String of extra command line flags for the model binary. + This argument is deprecated, use simargs instead. + simargs: Dict with simulation runtime flags. + timeout: Maximum execution time in seconds. + + Examples: + mod.simulate() + mod.simulate(resultfile="a.mat") + mod.simulate(simflags="-noEventEmit -noRestart -override=e=0.3,g=10") # set runtime simulation flags, deprecated + mod.simulate(simargs={"noEventEmit": None, "noRestart": None, "override": "override": {"e": 0.3, "g": 10}}) # using simargs """ - om_cmd = ModelicaSystemCmd(runpath=self.tempdir, modelname=self.modelName, timeout=timeout) + om_cmd = ModelicaSystemCmd(runpath=self._tempdir, modelname=self._model_name, timeout=timeout) if resultfile is None: # default result file generated by OM - self.resultfile = self.tempdir / f"{self.modelName}_res.mat" + self._result_file = self._tempdir / f"{self._model_name}_res.mat" elif os.path.exists(resultfile): - self.resultfile = pathlib.Path(resultfile) + self._result_file = pathlib.Path(resultfile) else: - self.resultfile = self.tempdir / resultfile + self._result_file = self._tempdir / resultfile # always define the resultfile to use - om_cmd.arg_set(key="r", val=self.resultfile.as_posix()) + om_cmd.arg_set(key="r", val=self._result_file.as_posix()) # allow runtime simulation flags from user input if simflags is not None: @@ -898,10 +957,10 @@ def simulate(self, resultfile: Optional[str] = None, simflags: Optional[str] = N if simargs: om_cmd.args_set(args=simargs) - overrideFile = self.tempdir / f"{self.modelName}_override.txt" - if self.overridevariables or self.simoptionsoverride: - tmpdict = self.overridevariables.copy() - tmpdict.update(self.simoptionsoverride) + overrideFile = self._tempdir / f"{self._model_name}_override.txt" + if self._override_variables or self._simulate_options_override: + tmpdict = self._override_variables.copy() + tmpdict.update(self._simulate_options_override) # write to override file with open(file=overrideFile, mode="w", encoding="utf-8") as fh: for key, value in tmpdict.items(): @@ -909,55 +968,76 @@ def simulate(self, resultfile: Optional[str] = None, simflags: Optional[str] = N om_cmd.arg_set(key="overrideFile", val=overrideFile.as_posix()) - if self.inputFlag: # if model has input quantities - for i in self.inputlist: - val = self.inputlist[i] + if self._has_inputs: # if model has input quantities + for i in self._inputs: + val = self._inputs[i] if val is None: - val = [(float(self.simulateOptions["startTime"]), 0.0), - (float(self.simulateOptions["stopTime"]), 0.0)] - self.inputlist[i] = [(float(self.simulateOptions["startTime"]), 0.0), - (float(self.simulateOptions["stopTime"]), 0.0)] - if float(self.simulateOptions["startTime"]) != val[0][0]: + val = [(float(self._simulate_options["startTime"]), 0.0), + (float(self._simulate_options["stopTime"]), 0.0)] + self._inputs[i] = [(float(self._simulate_options["startTime"]), 0.0), + (float(self._simulate_options["stopTime"]), 0.0)] + if float(self._simulate_options["startTime"]) != val[0][0]: raise ModelicaSystemError(f"startTime not matched for Input {i}!") - if float(self.simulateOptions["stopTime"]) != val[-1][0]: + if float(self._simulate_options["stopTime"]) != val[-1][0]: raise ModelicaSystemError(f"stopTime not matched for Input {i}!") - self.csvFile = self.createCSVData() # create csv file + self._csvFile = self._createCSVData() # create csv file - om_cmd.arg_set(key="csvInput", val=self.csvFile.as_posix()) + om_cmd.arg_set(key="csvInput", val=self._csvFile.as_posix()) # delete resultfile ... - if self.resultfile.is_file(): - self.resultfile.unlink() + if self._result_file.is_file(): + self._result_file.unlink() # ... run simulation ... returncode = om_cmd.run() # and check returncode *AND* resultfile - if returncode != 0 and self.resultfile.is_file(): + if returncode != 0 and self._result_file.is_file(): # check for an empty (=> 0B) result file which indicates a crash of the model executable # see: https://github.com/OpenModelica/OMPython/issues/261 # https://github.com/OpenModelica/OpenModelica/issues/13829 - if self.resultfile.stat().st_size == 0: - self.resultfile.unlink() + if self._result_file.stat().st_size == 0: + self._result_file.unlink() raise ModelicaSystemError("Empty result file - this indicates a crash of the model executable!") logger.warning(f"Return code = {returncode} but result file exists!") - self.simulationFlag = True + self._simulated = True - # to extract simulation results - def getSolutions(self, varList=None, resultfile=None): # 12 - """ - This method returns tuple of numpy arrays. It can be called: - •with a list of quantities name in string format as argument: it returns the simulation results of the corresponding names in the same order. Here it supports Python unpacking depending upon the number of variables assigned. - usage: - >>> getSolutions() - >>> getSolutions("Name1") - >>> getSolutions(["Name1","Name2"]) - >>> getSolutions(resultfile="c:/a.mat") - >>> getSolutions("Name1",resultfile=""c:/a.mat"") - >>> getSolutions(["Name1","Name2"],resultfile=""c:/a.mat"") + def getSolutions(self, varList: Optional[str | list[str]] = None, resultfile: Optional[str] = None) -> tuple[str] | np.ndarray: + """Extract simulation results from a result data file. + + Args: + varList: Names of variables to be extracted. Either unspecified to + get names of available variables, or a single variable name + as a string, or a list of variable names. + resultfile: Path to the result file. If unspecified, the result + file created by simulate() is used. + + Returns: + If varList is None, a tuple with names of all variables + is returned. + If varList is a string, a 1D numpy array is returned. + If varList is a list, a 2D numpy array is returned. + + Examples: + >>> mod.getSolutions() + ('a', 'der(x)', 'time', 'x') + >>> mod.getSolutions("x") + np.array([[1. , 0.90483742, 0.81873075]]) + >>> mod.getSolutions(["x", "der(x)"]) + np.array([[1. , 0.90483742 , 0.81873075], + [-1. , -0.90483742, -0.81873075]]) + >>> mod.getSolutions(resultfile="c:/a.mat") + ('a', 'der(x)', 'time', 'x') + >>> mod.getSolutions("x", resultfile="c:/a.mat") + np.array([[1. , 0.90483742, 0.81873075]]) + >>> mod.getSolutions(["x", "der(x)"], resultfile="c:/a.mat") + np.array([[1. , 0.90483742 , 0.81873075], + [-1. , -0.90483742, -0.81873075]]) """ if resultfile is None: - result_file = self.resultfile + if self._result_file is None: + raise ModelicaSystemError("No result file found. Run simulate() first.") + result_file = self._result_file else: result_file = pathlib.Path(resultfile) @@ -1001,9 +1081,9 @@ def _strip_space(name): raise ModelicaSystemError("Unhandled input for strip_space()") - def setMethodHelper(self, args1, args2, args3, args4=None): - """ - Helper function for setParameter(),setContinuous(),setSimulationOptions(),setLinearizationOption(),setOptimizationOption() + def _setMethodHelper(self, args1, args2, args3, args4=None): + """Helper function for setters. + args1 - string or list of string given by user args2 - dict() containing the values of different variables(eg:, parameter,continuous,simulation parameters) args3 - function name (eg; continuous, parameter, simulation, linearization,optimization) @@ -1025,7 +1105,7 @@ def apply_single(args1): return True else: - raise ModelicaSystemError("Unhandled case in setMethodHelper.apply_single() - " + raise ModelicaSystemError("Unhandled case in _setMethodHelper.apply_single() - " f"{repr(value[0])} is not a {repr(args3)} variable") result = [] @@ -1048,7 +1128,7 @@ def setContinuous(self, cvals): # 13 >>> setContinuous("Name=value") >>> setContinuous(["Name1=value1","Name2=value2"]) """ - return self.setMethodHelper(cvals, self.continuouslist, "continuous", self.overridevariables) + return self._setMethodHelper(cvals, self._continuous, "continuous", self._override_variables) def setParameters(self, pvals): # 14 """ @@ -1058,14 +1138,14 @@ def setParameters(self, pvals): # 14 >>> setParameters("Name=value") >>> setParameters(["Name1=value1","Name2=value2"]) """ - return self.setMethodHelper(pvals, self.paramlist, "parameter", self.overridevariables) + return self._setMethodHelper(pvals, self._params, "parameter", self._override_variables) def isParameterChangeable(self, name, value): q = self.getQuantities(name) if q[0]["changeable"] == "false": logger.debug(f"setParameters() failed : It is not possible to set the following signal {repr(name)}. " "It seems to be structural, final, protected or evaluated or has a non-constant binding, " - f"use sendExpression(\"setParameterValue({self.modelName}, {name}, {value})\") " + f"use sendExpression(\"setParameterValue({self._model_name}, {name}, {value})\") " "and rebuild the model using buildModel() API") return False return True @@ -1078,7 +1158,7 @@ def setSimulationOptions(self, simOptions): # 16 >>> setSimulationOptions("Name=value") >>> setSimulationOptions(["Name1=value1","Name2=value2"]) """ - return self.setMethodHelper(simOptions, self.simulateOptions, "simulation-option", self.simoptionsoverride) + return self._setMethodHelper(simOptions, self._simulate_options, "simulation-option", self._simulate_options_override) def setLinearizationOptions(self, linearizationOptions): # 18 """ @@ -1088,7 +1168,7 @@ def setLinearizationOptions(self, linearizationOptions): # 18 >>> setLinearizationOptions("Name=value") >>> setLinearizationOptions(["Name1=value1","Name2=value2"]) """ - return self.setMethodHelper(linearizationOptions, self.linearOptions, "Linearization-option", None) + return self._setMethodHelper(linearizationOptions, self._linearization_options, "Linearization-option", None) def setOptimizationOptions(self, optimizationOptions): # 17 """ @@ -1098,7 +1178,7 @@ def setOptimizationOptions(self, optimizationOptions): # 17 >>> setOptimizationOptions("Name=value") >>> setOptimizationOptions(["Name1=value1","Name2=value2"]) """ - return self.setMethodHelper(optimizationOptions, self.optimizeOptions, "optimization-option", None) + return self._setMethodHelper(optimizationOptions, self._optimization_options, "optimization-option", None) def setInputs(self, name): # 15 """ @@ -1111,53 +1191,53 @@ def setInputs(self, name): # 15 if isinstance(name, str): name = self._strip_space(name) value = name.split("=") - if value[0] in self.inputlist: + if value[0] in self._inputs: tmpvalue = eval(value[1]) if isinstance(tmpvalue, (int, float)): - self.inputlist[value[0]] = [(float(self.simulateOptions["startTime"]), float(value[1])), - (float(self.simulateOptions["stopTime"]), float(value[1]))] + self._inputs[value[0]] = [(float(self._simulate_options["startTime"]), float(value[1])), + (float(self._simulate_options["stopTime"]), float(value[1]))] elif isinstance(tmpvalue, list): - self.checkValidInputs(tmpvalue) - self.inputlist[value[0]] = tmpvalue - self.inputFlag = True + self._checkValidInputs(tmpvalue) + self._inputs[value[0]] = tmpvalue + self._has_inputs = True else: raise ModelicaSystemError(f"{value[0]} is not an input") elif isinstance(name, list): name = self._strip_space(name) for var in name: value = var.split("=") - if value[0] in self.inputlist: + if value[0] in self._inputs: tmpvalue = eval(value[1]) if isinstance(tmpvalue, (int, float)): - self.inputlist[value[0]] = [(float(self.simulateOptions["startTime"]), float(value[1])), - (float(self.simulateOptions["stopTime"]), float(value[1]))] + self._inputs[value[0]] = [(float(self._simulate_options["startTime"]), float(value[1])), + (float(self._simulate_options["stopTime"]), float(value[1]))] elif isinstance(tmpvalue, list): - self.checkValidInputs(tmpvalue) - self.inputlist[value[0]] = tmpvalue - self.inputFlag = True + self._checkValidInputs(tmpvalue) + self._inputs[value[0]] = tmpvalue + self._has_inputs = True else: raise ModelicaSystemError(f"{value[0]} is not an input!") - def checkValidInputs(self, name): + def _checkValidInputs(self, name): if name != sorted(name, key=lambda x: x[0]): raise ModelicaSystemError('Time value should be in increasing order') for l in name: if isinstance(l, tuple): # if l[0] < float(self.simValuesList[0]): - if l[0] < float(self.simulateOptions["startTime"]): + if l[0] < float(self._simulate_options["startTime"]): raise ModelicaSystemError('Input time value is less than simulation startTime') if len(l) != 2: raise ModelicaSystemError(f'Value for {l} is in incorrect format!') else: raise ModelicaSystemError('Error!!! Value must be in tuple format') - def createCSVData(self) -> pathlib.Path: - start_time: float = float(self.simulateOptions["startTime"]) - stop_time: float = float(self.simulateOptions["stopTime"]) + def _createCSVData(self) -> pathlib.Path: + start_time: float = float(self._simulate_options["startTime"]) + stop_time: float = float(self._simulate_options["stopTime"]) # Replace None inputs with a default constant zero signal inputs: dict[str, list[tuple[float, float]]] = {} - for input_name, input_signal in self.inputlist.items(): + for input_name, input_signal in self._inputs.items(): if input_signal is None: inputs[input_name] = [(start_time, 0.0), (stop_time, 0.0)] else: @@ -1192,7 +1272,7 @@ def createCSVData(self) -> pathlib.Path: ] csv_rows.append(row) - csvFile = self.tempdir / f'{self.modelName}.csv' + csvFile = self._tempdir / f'{self._model_name}.csv' with open(file=csvFile, mode="w", encoding="utf-8", newline="") as fh: writer = csv.writer(fh) @@ -1200,25 +1280,32 @@ def createCSVData(self) -> pathlib.Path: return csvFile - # to convert Modelica model to FMU - def convertMo2Fmu(self, version="2.0", fmuType="me_cs", fileNamePrefix="", includeResources=True): # 19 - """ - This method is used to generate FMU from the given Modelica model. It creates "modelName.fmu" in the current working directory. It can be called: - with no arguments - with arguments of https://build.openmodelica.org/Documentation/OpenModelica.Scripting.translateModelFMU.html - usage - >>> convertMo2Fmu() - >>> convertMo2Fmu(version="2.0", fmuType="me|cs|me_cs", fileNamePrefix="", includeResources=True) + def convertMo2Fmu(self, version: str = "2.0", fmuType: str = "me_cs", + fileNamePrefix: str = "", + includeResources: bool = True) -> str: + """Translate the model into a Functional Mockup Unit. + + Args: + See https://build.openmodelica.org/Documentation/OpenModelica.Scripting.translateModelFMU.html + + Returns: + str: Path to the created '*.fmu' file. + + Examples: + >>> mod.convertMo2Fmu() + '/tmp/tmpmhfx9umo/CauerLowPassAnalog.fmu' + >>> mod.convertMo2Fmu(version="2.0", fmuType="me|cs|me_cs", fileNamePrefix="", includeResources=True) + '/tmp/tmpmhfx9umo/CauerLowPassAnalog.fmu' """ if fileNamePrefix == "": - fileNamePrefix = self.modelName + fileNamePrefix = self._model_name if includeResources: includeResourcesStr = "true" else: includeResourcesStr = "false" properties = f'version="{version}", fmuType="{fmuType}", fileNamePrefix="{fileNamePrefix}", includeResources={includeResourcesStr}' - fmu = self.requestApi('buildModelFMU', self.modelName, properties) + fmu = self._requestApi('buildModelFMU', self._model_name, properties) # report proper error message if not os.path.exists(fmu): @@ -1235,7 +1322,7 @@ def convertFmu2Mo(self, fmuName): # 20 >>> convertFmu2Mo("c:/BouncingBall.Fmu") """ - fileName = self.requestApi('importFMU', fmuName) + fileName = self._requestApi('importFMU', fmuName) # report proper error message if not os.path.exists(fileName): @@ -1243,32 +1330,53 @@ def convertFmu2Mo(self, fmuName): # 20 return fileName - # to optimize model - def optimize(self): # 21 - """ - This method optimizes model according to the optimized options. It can be called: - only without any arguments - usage - >>> optimize() + def optimize(self) -> dict[str, Any]: + """Perform model-based optimization. + + Optimization options set by setOptimizationOptions() are used. + + Returns: + A dict with various values is returned. One of these values is the + path to the result file. + + Examples: + >>> mod.optimize() + {'messages': 'LOG_SUCCESS | info | The initialization finished successfully without homotopy method. ...' + 'resultFile': '/tmp/tmp68guvjhs/BangBang2021_res.mat', + 'simulationOptions': 'startTime = 0.0, stopTime = 1.0, numberOfIntervals = ' + "1000, tolerance = 1e-8, method = 'optimization', " + "fileNamePrefix = 'BangBang2021', options = '', " + "outputFormat = 'mat', variableFilter = '.*', cflags = " + "'', simflags = '-s=\\'optimization\\' " + "-optimizerNP=\\'1\\''", + 'timeBackend': 0.008684897, + 'timeCompile': 0.7546678929999999, + 'timeFrontend': 0.045438053000000006, + 'timeSimCode': 0.0018537170000000002, + 'timeSimulation': 0.266354356, + 'timeTemplates': 0.002007785, + 'timeTotal': 1.079097854} """ - cName = self.modelName - properties = ','.join(f"{key}={val}" for key, val in self.optimizeOptions.items()) + cName = self._model_name + properties = ','.join(f"{key}={val}" for key, val in self._optimization_options.items()) self.setCommandLineOptions("-g=Optimica") - optimizeResult = self.requestApi('optimize', cName, properties) + optimizeResult = self._requestApi('optimize', cName, properties) return optimizeResult def linearize(self, lintime: Optional[float] = None, simflags: Optional[str] = None, simargs: Optional[dict[str, Optional[str | dict[str, str]]]] = None, - timeout: Optional[int] = None) -> LinearizationResult: - """Linearize the model according to linearOptions. + timeout: Optional[float] = None) -> LinearizationResult: + """Linearize the model according to linearization options. + + See setLinearizationOptions. Args: - lintime: Override linearOptions["stopTime"] value. - simflags: A string of extra command line flags for the model - binary. - depreciated in favor of simargs + lintime: Override "stopTime" value. + simflags: String of extra command line flags for the model binary. + This argument is deprecated, use simargs instead. simargs: A dict with command line flags and possible options; example: "simargs={'csvInput': 'a.csv'}" - timeout: Possible timeout for the execution of OM. + timeout: Maximum execution time in seconds. Returns: A LinearizationResult object is returned. This allows several @@ -1288,36 +1396,36 @@ def load_module_from_path(module_name, file_path): return module_def - if self.xmlFile is None: + if self._xml_file is None: raise ModelicaSystemError( "Linearization cannot be performed as the model is not build, " "use ModelicaSystem() to build the model first" ) - om_cmd = ModelicaSystemCmd(runpath=self.tempdir, modelname=self.modelName, timeout=timeout) + om_cmd = ModelicaSystemCmd(runpath=self._tempdir, modelname=self._model_name, timeout=timeout) - overrideLinearFile = self.tempdir / f'{self.modelName}_override_linear.txt' + overrideLinearFile = self._tempdir / f'{self._model_name}_override_linear.txt' with open(file=overrideLinearFile, mode="w", encoding="utf-8") as fh: - for key, value in self.overridevariables.items(): + for key, value in self._override_variables.items(): fh.write(f"{key}={value}\n") - for key, value in self.linearOptions.items(): + for key, value in self._linearization_options.items(): fh.write(f"{key}={value}\n") om_cmd.arg_set(key="overrideFile", val=overrideLinearFile.as_posix()) - if self.inputFlag: + if self._has_inputs: nameVal = self.getInputs() for n in nameVal: tupleList = nameVal.get(n) if tupleList is not None: for l in tupleList: - if l[0] < float(self.simulateOptions["startTime"]): + if l[0] < float(self._simulate_options["startTime"]): raise ModelicaSystemError('Input time value is less than simulation startTime') - self.csvFile = self.createCSVData() - om_cmd.arg_set(key="csvInput", val=self.csvFile.as_posix()) + self._csvFile = self._createCSVData() + om_cmd.arg_set(key="csvInput", val=self._csvFile.as_posix()) - om_cmd.arg_set(key="l", val=str(lintime or self.linearOptions["stopTime"])) + om_cmd.arg_set(key="l", val=str(lintime or self._linearization_options["stopTime"])) # allow runtime simulation flags from user input if simflags is not None: @@ -1330,14 +1438,14 @@ def load_module_from_path(module_name, file_path): if returncode != 0: raise ModelicaSystemError(f"Linearize failed with return code: {returncode}") - self.simulationFlag = True + self._simulated = True # code to get the matrix and linear inputs, outputs and states - linearFile = self.tempdir / "linearized_model.py" + linearFile = self._tempdir / "linearized_model.py" - # support older openmodelica versions before OpenModelica v1.16.2 where linearize() generates "linear_modelname.mo" file + # support older openmodelica versions before OpenModelica v1.16.2 where linearize() generates "linear_model_name.mo" file if not linearFile.exists(): - linearFile = pathlib.Path(f'linear_{self.modelName}.py') + linearFile = pathlib.Path(f'linear_{self._model_name}.py') if not linearFile.exists(): raise ModelicaSystemError(f"Linearization failed: {linearFile} not found!") @@ -1351,34 +1459,22 @@ def load_module_from_path(module_name, file_path): result = module.linearized_model() (n, m, p, x0, u0, A, B, C, D, stateVars, inputVars, outputVars) = result - self.linearinputs = inputVars - self.linearoutputs = outputVars - self.linearstates = stateVars + self._linearized_inputs = inputVars + self._linearized_outputs = outputVars + self._linearized_states = stateVars return LinearizationResult(n, m, p, A, B, C, D, x0, u0, stateVars, inputVars, outputVars) except ModuleNotFoundError as ex: raise ModelicaSystemError("No module named 'linearized_model'") from ex - def getLinearInputs(self): - """ - function which returns the LinearInputs after Linearization is performed - usage - >>> getLinearInputs() - """ - return self.linearinputs + def getLinearInputs(self) -> list[str]: + """Get names of input variables of the linearized model.""" + return self._linearized_inputs - def getLinearOutputs(self): - """ - function which returns the LinearInputs after Linearization is performed - usage - >>> getLinearOutputs() - """ - return self.linearoutputs + def getLinearOutputs(self) -> list[str]: + """Get names of output variables of the linearized model.""" + return self._linearized_outputs - def getLinearStates(self): - """ - function which returns the LinearInputs after Linearization is performed - usage - >>> getLinearStates() - """ - return self.linearstates + def getLinearStates(self) -> list[str]: + """Get names of state variables of the linearized model.""" + return self._linearized_states diff --git a/tests/test_ModelicaSystem.py b/tests/test_ModelicaSystem.py index 156dde03..b4d328e9 100644 --- a/tests/test_ModelicaSystem.py +++ b/tests/test_ModelicaSystem.py @@ -397,7 +397,7 @@ def test_simulate_inputs(tmp_path): "u2=[(0.0, 0), (0.25, 0.5), (0.5, 1.0), (1.0, 0)]", ]) mod.simulate() - assert pathlib.Path(mod.csvFile).read_text() == """time,u1,u2,end + assert pathlib.Path(mod._csvFile).read_text() == """time,u1,u2,end 0.0,0.0,0.0,0 0.25,0.25,0.5,0 0.5,0.5,1.0,0 diff --git a/tests/test_ModelicaSystemCmd.py b/tests/test_ModelicaSystemCmd.py index 420193df..3b28699c 100644 --- a/tests/test_ModelicaSystemCmd.py +++ b/tests/test_ModelicaSystemCmd.py @@ -15,12 +15,18 @@ def model_firstorder(tmp_path): return mod -def test_simflags(model_firstorder): +@pytest.fixture +def mscmd_firstorder(model_firstorder): mod = OMPython.ModelicaSystem(fileName=model_firstorder.as_posix(), modelName="M") - mscmd = OMPython.ModelicaSystemCmd(runpath=mod.tempdir, modelname=mod.modelName) + mscmd = OMPython.ModelicaSystemCmd(runpath=mod.getWorkDirectory(), modelname=mod._model_name) + return mscmd + + +def test_simflags(mscmd_firstorder): + mscmd = mscmd_firstorder + mscmd.args_set({ "noEventEmit": None, - "noRestart": None, "override": {'b': 2} }) with pytest.deprecated_call(): @@ -28,6 +34,7 @@ def test_simflags(model_firstorder): assert mscmd.get_cmd() == [ mscmd.get_exe().as_posix(), - '-noEventEmit', '-noRestart', - '-override=b=2,a=1,x=3' + '-noEventEmit', + '-override=b=2,a=1,x=3', + '-noRestart', ] diff --git a/tests/test_OMSessionCmd.py b/tests/test_OMSessionCmd.py index c76e8ca3..1588fac8 100644 --- a/tests/test_OMSessionCmd.py +++ b/tests/test_OMSessionCmd.py @@ -10,7 +10,7 @@ def test_isPackage(): def test_isPackage2(): mod = OMPython.ModelicaSystem(modelName="Modelica.Electrical.Analog.Examples.CauerLowPassAnalog", lmodel=["Modelica"]) - omccmd = OMPython.OMCSessionCmd(session=mod.getconn) + omccmd = OMPython.OMCSessionCmd(session=mod._getconn) assert omccmd.isPackage('Modelica')