diff --git a/OMPython/ModelicaSystem.py b/OMPython/ModelicaSystem.py index e9c247b3..d41782b6 100644 --- a/OMPython/ModelicaSystem.py +++ b/OMPython/ModelicaSystem.py @@ -32,20 +32,24 @@ CONDITIONS OF OSMC-PL. """ +import ast import csv from dataclasses import dataclass import importlib +import itertools import logging import numbers import numpy as np import os import pathlib import platform +import queue import re import subprocess import tempfile import textwrap -from typing import Optional, Any +import threading +from typing import Any, Optional import warnings import xml.etree.ElementTree as ET @@ -133,7 +137,7 @@ def __init__(self, runpath: pathlib.Path, modelname: str, timeout: Optional[int] self._args: dict[str, str | None] = {} self._arg_override: dict[str, str] = {} - def arg_set(self, key: str, val: Optional[str | dict] = None) -> None: + def arg_set(self, key: str, val: Optional[str | dict[str, Any]] = None) -> None: """ Set one argument for the executable model. @@ -176,7 +180,7 @@ def arg_get(self, key: str) -> Optional[str | dict]: return None - def args_set(self, args: dict[str, Optional[str | dict[str, str]]]) -> None: + def args_set(self, args: dict[str, Optional[str | dict[str, Any]]]) -> None: """ Define arguments for the model executable. @@ -419,7 +423,6 @@ def __init__( self.inputFlag = False # for model with input quantity self.simulationFlag = False # if the model is simulated? self.outputFlag = False - self.csvFile: Optional[pathlib.Path] = None # for storing inputs condition self.resultfile: Optional[pathlib.Path] = None # for storing result file self.variableFilter = variableFilter @@ -867,29 +870,39 @@ def getOptimizationOptions(self, names=None): # 10 raise ModelicaSystemError("Unhandled input for getOptimizationOptions()") - def simulate(self, resultfile: Optional[str] = None, simflags: Optional[str] = None, - simargs: Optional[dict[str, Optional[str | dict[str, str]]]] = None, - timeout: Optional[int] = None): # 11 + def simulate_cmd( + self, + resultfile: pathlib.Path, + simflags: Optional[str] = None, + simargs: Optional[dict[str, Optional[str | dict[str, str]]]] = None, + timeout: Optional[int] = None, + ) -> ModelicaSystemCmd: """ - This method simulates model according to the simulation options. - usage - >>> simulate() - >>> simulate(resultfile="a.mat") - >>> simulate(simflags="-noEventEmit -noRestart -override=e=0.3,g=10") # set runtime simulation flags - >>> simulate(simargs={"noEventEmit": None, "noRestart": None, "override": "e=0.3,g=10"}) # using simargs + This method prepares the simulates model according to the simulation options. It returns an instance of + ModelicaSystemCmd which can be used to run the simulation. + + Due to the tempdir being unique for the ModelicaSystem instance, *NEVER* use this to create several simulations + with the same instance of ModelicaSystem! Restart each simulation process with a new instance of ModelicaSystem. + + However, if only non-structural parameters are used, it is possible to reuse an existing instance of + ModelicaSystem to create several version ModelicaSystemCmd to run the model using different settings. + + Parameters + ---------- + resultfile + simflags + simargs + timeout + + Returns + ------- + An instance if ModelicaSystemCmd to run the requested simulation. """ om_cmd = ModelicaSystemCmd(runpath=self.tempdir, modelname=self.modelName, timeout=timeout) - if resultfile is None: - # default result file generated by OM - self.resultfile = self.tempdir / f"{self.modelName}_res.mat" - elif os.path.exists(resultfile): - self.resultfile = pathlib.Path(resultfile) - else: - self.resultfile = self.tempdir / resultfile - # always define the resultfile to use - om_cmd.arg_set(key="r", val=self.resultfile.as_posix()) + # always define the result file to use + om_cmd.arg_set(key="r", val=resultfile.as_posix()) # allow runtime simulation flags from user input if simflags is not None: @@ -910,6 +923,9 @@ def simulate(self, resultfile: Optional[str] = None, simflags: Optional[str] = N om_cmd.arg_set(key="overrideFile", val=overrideFile.as_posix()) if self.inputFlag: # if model has input quantities + # csvfile is based on name used for result file + csvfile = resultfile.parent / f"{resultfile.stem}.csv" + for i in self.inputlist: val = self.inputlist[i] if val is None: @@ -921,9 +937,35 @@ def simulate(self, resultfile: Optional[str] = None, simflags: Optional[str] = N raise ModelicaSystemError(f"startTime not matched for Input {i}!") if float(self.simulateOptions["stopTime"]) != val[-1][0]: raise ModelicaSystemError(f"stopTime not matched for Input {i}!") - self.csvFile = self.createCSVData() # create csv file - om_cmd.arg_set(key="csvInput", val=self.csvFile.as_posix()) + # write csv file and store the name + csvfile = self.createCSVData(csvfile=csvfile) + + om_cmd.arg_set(key="csvInput", val=csvfile.as_posix()) + + return om_cmd + + def simulate(self, resultfile: Optional[str] = None, simflags: Optional[str] = None, + simargs: Optional[dict[str, Optional[str | dict[str, str]]]] = None, + timeout: Optional[int] = None): # 11 + """ + This method simulates model according to the simulation options. + usage + >>> simulate() + >>> simulate(resultfile="a.mat") + >>> simulate(simflags="-noEventEmit -noRestart -override=e=0.3,g=10") # set runtime simulation flags + >>> simulate(simargs={"noEventEmit": None, "noRestart": None, "override": "e=0.3,g=10"}) # using simargs + """ + + if resultfile is None: + # default result file generated by OM + self.resultfile = self.tempdir / f"{self.modelName}_res.mat" + elif os.path.exists(resultfile): + self.resultfile = pathlib.Path(resultfile) + else: + self.resultfile = self.tempdir / resultfile + + om_cmd = self.simulate_cmd(resultfile=self.resultfile, simflags=simflags, simargs=simargs, timeout=timeout) # delete resultfile ... if self.resultfile.is_file(): @@ -992,166 +1034,258 @@ def getSolutions(self, varList=None, resultfile=None): # 12 return np_res @staticmethod - def _strip_space(name): - if isinstance(name, str): - return name.replace(" ", "") - - if isinstance(name, list): - return [x.replace(" ", "") for x in name] - - raise ModelicaSystemError("Unhandled input for strip_space()") - - def setMethodHelper(self, args1, args2, args3, args4=None): - """ - Helper function for setParameter(),setContinuous(),setSimulationOptions(),setLinearizationOption(),setOptimizationOption() - args1 - string or list of string given by user - args2 - dict() containing the values of different variables(eg:, parameter,continuous,simulation parameters) - args3 - function name (eg; continuous, parameter, simulation, linearization,optimization) - args4 - dict() which stores the new override variables list, - """ - def apply_single(args1): - args1 = self._strip_space(args1) - value = args1.split("=") - if value[0] in args2: - if args3 == "parameter" and self.isParameterChangeable(value[0], value[1]): - args2[value[0]] = value[1] - if args4 is not None: - args4[value[0]] = value[1] - elif args3 != "parameter": - args2[value[0]] = value[1] - if args4 is not None: - args4[value[0]] = value[1] - - return True + def _prepare_input_data( + raw_input: str | list[str] | dict[str, Any], + ) -> dict[str, str]: + """ + Convert raw input to a structured dictionary {'key1': 'value1', 'key2': 'value2'}. + """ - else: + def prepare_str(str_in: str) -> dict[str, str]: + str_in = str_in.replace(" ", "") + key_val_list: list[str] = str_in.split("=") + if len(key_val_list) != 2: + raise ModelicaSystemError(f"Invalid 'key=value' pair: {str_in}") + + input_data_from_str: dict[str, str] = {key_val_list[0]: key_val_list[1]} + + return input_data_from_str + + input_data: dict[str, str] = {} + + if isinstance(raw_input, str): + warnings.warn(message="The definition of values to set should use a dictionary, " + "i.e. {'key1': 'val1', 'key2': 'val2', ...}. Please convert all cases which " + "use a string ('key=val') or list ['key1=val1', 'key2=val2', ...]", + category=DeprecationWarning, + stacklevel=3) + return prepare_str(raw_input) + + if isinstance(raw_input, list): + warnings.warn(message="The definition of values to set should use a dictionary, " + "i.e. {'key1': 'val1', 'key2': 'val2', ...}. Please convert all cases which " + "use a string ('key=val') or list ['key1=val1', 'key2=val2', ...]", + category=DeprecationWarning, + stacklevel=3) + + for item in raw_input: + input_data |= prepare_str(item) + + return input_data + + if isinstance(raw_input, dict): + for key, val in raw_input.items(): + # convert all values to strings to align it on one type: dict[str, str] + # spaces have to be removed as setInput() could take list of tuples as input and spaces would + str_val = str(val).replace(' ', '') + if ' ' in key or ' ' in str_val: + raise ModelicaSystemError(f"Spaces not allowed in key/value pairs: {repr(key)} = {repr(val)}!") + input_data[key] = str_val + + return input_data + + raise ModelicaSystemError(f"Invalid type of input: {type(raw_input)}") + + def _set_method_helper( + self, + inputdata: dict[str, str], + classdata: dict[str, Any], + datatype: str, + overwritedata: Optional[dict[str, str]] = None, + ) -> bool: + """ + Helper function for: + * setParameter() + * setContinuous() + * setSimulationOptions() + * setLinearizationOption() + * setOptimizationOption() + * setInputs() + + Parameters + ---------- + inputdata + string or list of string given by user + classdata + dict() containing the values of different variables (eg: parameter, continuous, simulation parameters) + datatype + type identifier (eg; continuous, parameter, simulation, linearization, optimization) + overwritedata + dict() which stores the new override variables list, + """ + + inputdata_status: dict[str, bool] = {} + for key, val in inputdata.items(): + if key not in classdata: raise ModelicaSystemError("Unhandled case in setMethodHelper.apply_single() - " - f"{repr(value[0])} is not a {repr(args3)} variable") + f"{repr(key)} is not a {repr(datatype)} variable") + + status = False + if datatype == "parameter" and not self.isParameterChangeable(key): + logger.debug(f"It is not possible to set the parameter {repr(key)}. It seems to be " + "structural, final, protected, evaluated or has a non-constant binding. " + "Use sendExpression(...) and rebuild the model using buildModel() API; example: " + "sendExpression(\"setParameterValue(" + f"{self.modelName}, {key}, {val if val is not None else ''}" + ")\") ") + else: + classdata[key] = val + if overwritedata is not None: + overwritedata[key] = val + status = True - result = [] - if isinstance(args1, str): - result = [apply_single(args1)] + inputdata_status[key] = status - elif isinstance(args1, list): - result = [] - args1 = self._strip_space(args1) - for var in args1: - result.append(apply_single(var)) + return all(inputdata_status.values()) - return all(result) + def isParameterChangeable( + self, + name: str, + ) -> bool: + q = self.getQuantities(name) + if q[0]["changeable"] == "false": + return False + return True - def setContinuous(self, cvals): # 13 + def setContinuous(self, cvals: str | list[str] | dict[str, Any]) -> bool: """ This method is used to set continuous values. It can be called: with a sequence of continuous name and assigning corresponding values as arguments as show in the example below: usage - >>> setContinuous("Name=value") - >>> setContinuous(["Name1=value1","Name2=value2"]) + >>> setContinuous("Name=value") # depreciated + >>> setContinuous(["Name1=value1","Name2=value2"]) # depreciated + >>> setContinuous(cvals={"Name1": "value1", "Name2": "value2"}) """ - return self.setMethodHelper(cvals, self.continuouslist, "continuous", self.overridevariables) + inputdata = self._prepare_input_data(raw_input=cvals) + + return self._set_method_helper( + inputdata=inputdata, + classdata=self.continuouslist, + datatype="continuous", + overwritedata=self.overridevariables) - def setParameters(self, pvals): # 14 + def setParameters(self, pvals: str | list[str] | dict[str, Any]) -> bool: """ This method is used to set parameter values. It can be called: with a sequence of parameter name and assigning corresponding value as arguments as show in the example below: usage - >>> setParameters("Name=value") - >>> setParameters(["Name1=value1","Name2=value2"]) + >>> setParameters("Name=value") # depreciated + >>> setParameters(["Name1=value1","Name2=value2"]) # depreciated + >>> setParameters(pvals={"Name1": "value1", "Name2": "value2"}) """ - return self.setMethodHelper(pvals, self.paramlist, "parameter", self.overridevariables) + inputdata = self._prepare_input_data(raw_input=pvals) - def isParameterChangeable(self, name, value): - q = self.getQuantities(name) - if q[0]["changeable"] == "false": - logger.debug(f"setParameters() failed : It is not possible to set the following signal {repr(name)}. " - "It seems to be structural, final, protected or evaluated or has a non-constant binding, " - f"use sendExpression(\"setParameterValue({self.modelName}, {name}, {value})\") " - "and rebuild the model using buildModel() API") - return False - return True + return self._set_method_helper( + inputdata=inputdata, + classdata=self.paramlist, + datatype="parameter", + overwritedata=self.overridevariables) - def setSimulationOptions(self, simOptions): # 16 + def setSimulationOptions(self, simOptions: str | list[str] | dict[str, Any]) -> bool: """ This method is used to set simulation options. It can be called: with a sequence of simulation options name and assigning corresponding values as arguments as show in the example below: usage - >>> setSimulationOptions("Name=value") - >>> setSimulationOptions(["Name1=value1","Name2=value2"]) + >>> setSimulationOptions("Name=value") # depreciated + >>> setSimulationOptions(["Name1=value1","Name2=value2"]) # depreciated + >>> setSimulationOptions(simOptions={"Name1": "value1", "Name2": "value2"}) """ - return self.setMethodHelper(simOptions, self.simulateOptions, "simulation-option", self.simoptionsoverride) + inputdata = self._prepare_input_data(raw_input=simOptions) - def setLinearizationOptions(self, linearizationOptions): # 18 + return self._set_method_helper( + inputdata=inputdata, + classdata=self.simulateOptions, + datatype="simulation-option", + overwritedata=self.simoptionsoverride) + + def setLinearizationOptions(self, linearizationOptions: str | list[str] | dict[str, Any]) -> bool: """ This method is used to set linearization options. It can be called: with a sequence of linearization options name and assigning corresponding value as arguments as show in the example below usage - >>> setLinearizationOptions("Name=value") - >>> setLinearizationOptions(["Name1=value1","Name2=value2"]) + >>> setLinearizationOptions("Name=value") # depreciated + >>> setLinearizationOptions(["Name1=value1","Name2=value2"]) # depreciated + >>> setLinearizationOptions(linearizationOtions={"Name1": "value1", "Name2": "value2"}) """ - return self.setMethodHelper(linearizationOptions, self.linearOptions, "Linearization-option", None) + inputdata = self._prepare_input_data(raw_input=linearizationOptions) + + return self._set_method_helper( + inputdata=inputdata, + classdata=self.linearOptions, + datatype="Linearization-option", + overwritedata=None) - def setOptimizationOptions(self, optimizationOptions): # 17 + def setOptimizationOptions(self, optimizationOptions: str | list[str] | dict[str, Any]) -> bool: """ This method is used to set optimization options. It can be called: with a sequence of optimization options name and assigning corresponding values as arguments as show in the example below: usage - >>> setOptimizationOptions("Name=value") - >>> setOptimizationOptions(["Name1=value1","Name2=value2"]) + >>> setOptimizationOptions("Name=value") # depreciated + >>> setOptimizationOptions(["Name1=value1","Name2=value2"]) # depreciated + >>> setOptimizationOptions(optimizationOptions={"Name1": "value1", "Name2": "value2"}) """ - return self.setMethodHelper(optimizationOptions, self.optimizeOptions, "optimization-option", None) + inputdata = self._prepare_input_data(raw_input=optimizationOptions) + + return self._set_method_helper( + inputdata=inputdata, + classdata=self.optimizeOptions, + datatype="optimization-option", + overwritedata=None) - def setInputs(self, name): # 15 + def setInputs(self, name: str | list[str] | dict[str, Any]) -> bool: """ - This method is used to set input values. It can be called: - with a sequence of input name and assigning corresponding values as arguments as show in the example below: - usage - >>> setInputs("Name=value") - >>> setInputs(["Name1=value1","Name2=value2"]) - """ - if isinstance(name, str): - name = self._strip_space(name) - value = name.split("=") - if value[0] in self.inputlist: - tmpvalue = eval(value[1]) - if isinstance(tmpvalue, (int, float)): - self.inputlist[value[0]] = [(float(self.simulateOptions["startTime"]), float(value[1])), - (float(self.simulateOptions["stopTime"]), float(value[1]))] - elif isinstance(tmpvalue, list): - self.checkValidInputs(tmpvalue) - self.inputlist[value[0]] = tmpvalue - self.inputFlag = True - else: - raise ModelicaSystemError(f"{value[0]} is not an input") - elif isinstance(name, list): - name = self._strip_space(name) - for var in name: - value = var.split("=") - if value[0] in self.inputlist: - tmpvalue = eval(value[1]) - if isinstance(tmpvalue, (int, float)): - self.inputlist[value[0]] = [(float(self.simulateOptions["startTime"]), float(value[1])), - (float(self.simulateOptions["stopTime"]), float(value[1]))] - elif isinstance(tmpvalue, list): - self.checkValidInputs(tmpvalue) - self.inputlist[value[0]] = tmpvalue - self.inputFlag = True - else: - raise ModelicaSystemError(f"{value[0]} is not an input!") - - def checkValidInputs(self, name): - if name != sorted(name, key=lambda x: x[0]): - raise ModelicaSystemError('Time value should be in increasing order') - for l in name: - if isinstance(l, tuple): - # if l[0] < float(self.simValuesList[0]): - if l[0] < float(self.simulateOptions["startTime"]): - raise ModelicaSystemError('Input time value is less than simulation startTime') - if len(l) != 2: - raise ModelicaSystemError(f'Value for {l} is in incorrect format!') + This method is used to set input values. It can be called with a sequence of input name and assigning + corresponding values as arguments as show in the example below. Compared to other set*() methods this is a + special case as value could be a list of tuples - these are converted to a string in _prepare_input_data() + and restored here via ast.literal_eval(). + + >>> setInputs("Name=value") # depreciated + >>> setInputs(["Name1=value1","Name2=value2"]) # depreciated + >>> setInputs(name={"Name1": "value1", "Name2": "value2"}) + """ + inputdata = self._prepare_input_data(raw_input=name) + + for key, val in inputdata.items(): + if key not in self.inputlist: + raise ModelicaSystemError(f"{key} is not an input") + + if not isinstance(val, str): + raise ModelicaSystemError(f"Invalid data in input for {repr(key)}: {repr(val)}") + + val_evaluated = ast.literal_eval(val) + + if isinstance(val_evaluated, (int, float)): + self.inputlist[key] = [(float(self.simulateOptions["startTime"]), float(val)), + (float(self.simulateOptions["stopTime"]), float(val))] + elif isinstance(val_evaluated, list): + if not all([isinstance(item, tuple) for item in val_evaluated]): + raise ModelicaSystemError("Value for setInput() must be in tuple format; " + f"got {repr(val_evaluated)}") + if val_evaluated != sorted(val_evaluated, key=lambda x: x[0]): + raise ModelicaSystemError("Time value should be in increasing order; " + f"got {repr(val_evaluated)}") + + for item in val_evaluated: + if item[0] < float(self.simulateOptions["startTime"]): + raise ModelicaSystemError(f"Time value in {repr(item)} of {repr(val_evaluated)} is less " + "than the simulation start time") + if len(item) != 2: + raise ModelicaSystemError(f"Value {repr(item)} of {repr(val_evaluated)} " + "is in incorrect format!") + + self.inputlist[key] = val_evaluated else: - raise ModelicaSystemError('Error!!! Value must be in tuple format') + raise ModelicaSystemError(f"Data cannot be evaluated for {repr(key)}: {repr(val)}") + + self.inputFlag = True - def createCSVData(self) -> pathlib.Path: + return True + + def createCSVData(self, csvfile: Optional[pathlib.Path] = None) -> pathlib.Path: + """ + Create a csv file with inputs for the simulation/optimization of the model. If csvfile is provided as argument, + this file is used; else a generic file name is created. + """ start_time: float = float(self.simulateOptions["startTime"]) stop_time: float = float(self.simulateOptions["stopTime"]) @@ -1192,13 +1326,14 @@ def createCSVData(self) -> pathlib.Path: ] csv_rows.append(row) - csvFile = self.tempdir / f'{self.modelName}.csv' + if csvfile is None: + csvfile = self.tempdir / f'{self.modelName}.csv' - with open(file=csvFile, mode="w", encoding="utf-8", newline="") as fh: + with open(file=csvfile, mode="w", encoding="utf-8", newline="") as fh: writer = csv.writer(fh) writer.writerows(csv_rows) - return csvFile + return csvfile # to convert Modelica model to FMU def convertMo2Fmu(self, version="2.0", fmuType="me_cs", fileNamePrefix="", includeResources=True): # 19 @@ -1314,8 +1449,8 @@ def load_module_from_path(module_name, file_path): for l in tupleList: if l[0] < float(self.simulateOptions["startTime"]): raise ModelicaSystemError('Input time value is less than simulation startTime') - self.csvFile = self.createCSVData() - om_cmd.arg_set(key="csvInput", val=self.csvFile.as_posix()) + csvfile = self.createCSVData() + om_cmd.arg_set(key="csvInput", val=csvfile.as_posix()) om_cmd.arg_set(key="l", val=str(lintime or self.linearOptions["stopTime"])) @@ -1382,3 +1517,373 @@ def getLinearStates(self): >>> getLinearStates() """ return self.linearstates + + +class ModelicaSystemDoE: + """ + Class to run DoEs based on a (Open)Modelica model using ModelicaSystem + + Example + ------- + ``` + import OMPython + import pathlib + + + def run_doe(): + mypath = pathlib.Path('.') + + model = mypath / "M.mo" + model.write_text( + " model M\n" + " parameter Integer p=1;\n" + " parameter Integer q=1;\n" + " parameter Real a = -1;\n" + " parameter Real b = -1;\n" + " Real x[p];\n" + " Real y[q];\n" + " equation\n" + " der(x) = a * fill(1.0, p);\n" + " der(y) = b * fill(1.0, q);\n" + " end M;\n" + ) + + param = { + # structural + 'p': [1, 2], + 'q': [3, 4], + # simple + 'a': [5, 6], + 'b': [7, 8], + } + + resdir = mypath / 'DoE' + resdir.mkdir(exist_ok=True) + + doe_mod = OMPython.ModelicaSystemDoE( + fileName=model.as_posix(), + modelName="M", + parameters=param, + resultpath=resdir, + simargs={"override": {'stopTime': 1.0}}, + ) + doe_mod.prepare() + doe_dict = doe_mod.get_doe() + doe_mod.simulate() + doe_sol = doe_mod.get_solutions() + + # ... work with doe_df and doe_sol ... + + + if __name__ == "__main__": + run_doe() + ``` + + """ + + DICT_RESULT_FILENAME: str = 'result filename' + DICT_RESULT_AVAILABLE: str = 'result available' + + def __init__( + self, + fileName: Optional[str | os.PathLike | pathlib.Path] = None, + modelName: Optional[str] = None, + lmodel: Optional[list[str | tuple[str, str]]] = None, + commandLineOptions: Optional[str] = None, + variableFilter: Optional[str] = None, + customBuildDirectory: Optional[str | os.PathLike | pathlib.Path] = None, + omhome: Optional[str] = None, + + simargs: Optional[dict[str, Optional[str | dict[str, str]]]] = None, + timeout: Optional[int] = None, + + resultpath: Optional[pathlib.Path] = None, + parameters: Optional[dict[str, list[str] | list[int] | list[float]]] = None, + ) -> None: + """ + Initialisation of ModelicaSystemDoE. The parameters are based on: ModelicaSystem.__init__() and + ModelicaSystem.simulate(). Additionally, the path to store the result files is needed (= resultpath) as well as + a list of parameters to vary for the Doe (= parameters). All possible combinations are considered. + """ + self._lmodel = lmodel + self._modelName = modelName + self._fileName = fileName + + self._CommandLineOptions = commandLineOptions + self._variableFilter = variableFilter + self._customBuildDirectory = customBuildDirectory + self._omhome = omhome + + # reference for the model; not used for any simulations but to evaluate parameters, etc. + self._mod = ModelicaSystem( + fileName=self._fileName, + modelName=self._modelName, + lmodel=self._lmodel, + commandLineOptions=self._CommandLineOptions, + variableFilter=self._variableFilter, + customBuildDirectory=self._customBuildDirectory, + omhome=self._omhome, + ) + + self._simargs = simargs + self._timeout = timeout + + if isinstance(resultpath, pathlib.Path): + self._resultpath = resultpath + else: + self._resultpath = pathlib.Path('.') + + if isinstance(parameters, dict): + self._parameters = parameters + else: + self._parameters = {} + + self._sim_dict: Optional[dict[str, dict[str, Any]]] = None + self._sim_task_query: queue.Queue = queue.Queue() + + def prepare(self) -> int: + """ + Prepare the DoE by evaluating the parameters. Each structural parameter requires a new instance of + ModelicaSystem while the non-structural parameters can just be set on the executable. + + The return value is the number of simulation defined. + """ + + param_structure = {} + param_simple = {} + for param_name in self._parameters.keys(): + changeable = self._mod.isParameterChangeable(name=param_name) + logger.info(f"Parameter {repr(param_name)} is changeable? {changeable}") + + if changeable: + param_simple[param_name] = self._parameters[param_name] + else: + param_structure[param_name] = self._parameters[param_name] + + param_structure_combinations = list(itertools.product(*param_structure.values())) + param_simple_combinations = list(itertools.product(*param_simple.values())) + + self._sim_dict = {} + for idx_pc_structure, pc_structure in enumerate(param_structure_combinations): + mod_structure = ModelicaSystem( + fileName=self._fileName, + modelName=self._modelName, + lmodel=self._lmodel, + commandLineOptions=self._CommandLineOptions, + variableFilter=self._variableFilter, + customBuildDirectory=self._customBuildDirectory, + omhome=self._omhome, + build=False, + ) + + sim_param_structure = {} + for idx_structure, pk_structure in enumerate(param_structure.keys()): + sim_param_structure[pk_structure] = pc_structure[idx_structure] + + pk_value = pc_structure[idx_structure] + if isinstance(pk_value, str): + expression = f"setParameterValue({self._modelName}, {pk_structure}, \"{pk_value}\")" + elif isinstance(pk_value, bool): + pk_value_bool_str = "true" if pk_value else "false" + expression = f"setParameterValue({self._modelName}, {pk_structure}, {pk_value_bool_str});" + else: + expression = f"setParameterValue({self._modelName}, {pk_structure}, {pk_value})" + res = mod_structure.sendExpression(expression) + if not res: + raise ModelicaSystemError(f"Cannot set structural parameter {self._modelName}.{pk_structure} " + f"to {pk_value} using {repr(expression)}") + + mod_structure.buildModel(variableFilter=self._variableFilter) + + for idx_pc_simple, pc_simple in enumerate(param_simple_combinations): + sim_param_simple = {} + for idx_simple, pk_simple in enumerate(param_simple.keys()): + sim_param_simple[pk_simple] = pc_simple[idx_simple] + + resfilename = f"DOE_{idx_pc_structure:09d}_{idx_pc_simple:09d}.mat" + logger.info(f"use result file {repr(resfilename)} " + f"for structural parameters: {sim_param_structure} " + f"and simple parameters: {sim_param_simple}") + resultfile = self._resultpath / resfilename + + df_data = ( + { + 'ID structure': idx_pc_structure, + } + | sim_param_structure + | { + 'ID non-structure': idx_pc_simple, + } + | sim_param_simple + | { + self.DICT_RESULT_AVAILABLE: False, + } + ) + + self._sim_dict[resfilename] = df_data + + mscmd = mod_structure.simulate_cmd( + resultfile=resultfile.absolute().resolve(), + timeout=self._timeout, + ) + if self._simargs is not None: + mscmd.args_set(args=self._simargs) + mscmd.args_set(args={"override": sim_param_simple}) + + self._sim_task_query.put(mscmd) + + logger.info(f"Prepared {self._sim_task_query.qsize()} simulation definitions for the defined DoE.") + + return self._sim_task_query.qsize() + + def get_doe(self) -> Optional[dict[str, dict[str, Any]]]: + """ + Get the defined DoE as a dict, where each key is the result filename and the value is a dict of simulation + settings including structural and non-structural parameters. + + The following code snippet can be used to convert the data to a pandas dataframe: + + ``` + import pandas as pd + + doe_dict = doe_mod.get_doe() + doe_df = pd.DataFrame.from_dict(data=doe_dict, orient='index') + ``` + + """ + return self._sim_dict + + def simulate( + self, + num_workers: int = 3, + ) -> bool: + """ + Simulate the DoE using the defined number of workers. + + Returns True if all simulations were done successfully, else False. + """ + + sim_query_total = self._sim_task_query.qsize() + if not isinstance(self._sim_dict, dict) or len(self._sim_dict) == 0: + raise ModelicaSystemError("Missing Doe Summary!") + sim_dict_total = len(self._sim_dict) + + def worker(worker_id, task_queue): + while True: + try: + # Get the next task from the queue + mscmd = task_queue.get(block=False) + except queue.Empty: + logger.info(f"[Worker {worker_id}] No more simulations to run.") + break + + if mscmd is None: + raise ModelicaSystemError("Missing simulation definition!") + + resultfile = mscmd.arg_get(key='r') + resultpath = pathlib.Path(resultfile) + + logger.info(f"[Worker {worker_id}] Performing task: {resultpath.name}") + + try: + mscmd.run() + except ModelicaSystemError as ex: + logger.warning(f"Simulation error for {resultpath.name}: {ex}") + + # Mark the task as done + task_queue.task_done() + + sim_query_done = sim_query_total - self._sim_task_query.qsize() + logger.info(f"[Worker {worker_id}] Task completed: {resultpath.name} " + f"({sim_query_total - sim_query_done}/{sim_query_total} = " + f"{(sim_query_total - sim_query_done) / sim_query_total * 100:.2f}% of tasks left)") + + logger.info(f"Start simulations for DoE with {sim_query_total} simulations " + f"using {num_workers} workers ...") + + # Create and start worker threads + threads = [] + for i in range(num_workers): + thread = threading.Thread(target=worker, args=(i, self._sim_task_query)) + thread.start() + threads.append(thread) + + # Wait for all threads to complete + for thread in threads: + thread.join() + + sim_dict_done = 0 + for resultfilename in self._sim_dict: + resultfile = self._resultpath / resultfilename + + # include check for an empty (=> 0B) result file which indicates a crash of the model executable + # see: https://github.com/OpenModelica/OMPython/issues/261 + # https://github.com/OpenModelica/OpenModelica/issues/13829 + if resultfile.is_file() and resultfile.stat().st_size > 0: + self._sim_dict[resultfilename][self.DICT_RESULT_AVAILABLE] = True + sim_dict_done += 1 + + logger.info(f"All workers finished ({sim_dict_done} of {sim_dict_total} simulations with a result file).") + + return sim_dict_total == sim_dict_done + + def get_solutions( + self, + var_list: Optional[list] = None, + ) -> Optional[tuple[str] | dict[str, dict[str, np.ndarray]]]: + """ + Get all solutions of the DoE run. The following return values are possible: + + * A list of variables if val_list == None + + * The Solutions as dict[str, pd.DataFrame] if a value list (== val_list) is defined. + + The following code snippet can be used to convert the solution data for each run to a pandas dataframe: + + ``` + import pandas as pd + + doe_sol = doe_mod.get_solutions() + for key in doe_sol: + data = doe_sol[key]['data'] + if data: + doe_sol[key]['df'] = pd.DataFrame.from_dict(data=data) + else: + doe_sol[key]['df'] = None + ``` + + """ + if not isinstance(self._sim_dict, dict): + return None + + if len(self._sim_dict) == 0: + raise ModelicaSystemError("No result files available - all simulations did fail?") + + sol_dict: dict[str, dict[str, Any]] = {} + for resultfilename in self._sim_dict: + resultfile = self._resultpath / resultfilename + + sol_dict[resultfilename] = {} + + if not self._sim_dict[resultfilename][self.DICT_RESULT_AVAILABLE]: + sol_dict[resultfilename]['msg'] = 'No result file available!' + sol_dict[resultfilename]['data'] = {} + continue + + if var_list is None: + var_list_row = list(self._mod.getSolutions(resultfile=resultfile)) + else: + var_list_row = var_list + + try: + sol = self._mod.getSolutions(varList=var_list_row, resultfile=resultfile) + sol_data = {var: sol[idx] for idx, var in enumerate(var_list_row)} + sol_dict[resultfilename]['msg'] = 'Simulation available' + sol_dict[resultfilename]['data'] = sol_data + except ModelicaSystemError as ex: + msg = f"Error reading solution for {resultfilename}: {ex}" + logger.warning(msg) + sol_dict[resultfilename]['msg'] = msg + sol_dict[resultfilename]['data'] = {} + + return sol_dict diff --git a/OMPython/__init__.py b/OMPython/__init__.py index 1da0a0a3..8a0584fd 100644 --- a/OMPython/__init__.py +++ b/OMPython/__init__.py @@ -36,7 +36,8 @@ CONDITIONS OF OSMC-PL. """ -from OMPython.ModelicaSystem import LinearizationResult, ModelicaSystem, ModelicaSystemCmd, ModelicaSystemError +from OMPython.ModelicaSystem import (LinearizationResult, ModelicaSystem, ModelicaSystemCmd, ModelicaSystemDoE, + ModelicaSystemError) from OMPython.OMCSession import (OMCSessionCmd, OMCSessionException, OMCSessionZMQ, OMCProcessPort, OMCProcessLocal, OMCProcessDocker, OMCProcessDockerContainer, OMCProcessWSL) @@ -46,6 +47,7 @@ 'LinearizationResult', 'ModelicaSystem', 'ModelicaSystemCmd', + 'ModelicaSystemDoE', 'ModelicaSystemError', 'OMCSessionCmd', diff --git a/tests/test_ModelicaSystem.py b/tests/test_ModelicaSystem.py index 156dde03..b3026279 100644 --- a/tests/test_ModelicaSystem.py +++ b/tests/test_ModelicaSystem.py @@ -35,8 +35,8 @@ def test_setParameters(): mod = OMPython.ModelicaSystem(model_path + "BouncingBall.mo", "BouncingBall") # method 1 - mod.setParameters("e=1.234") - mod.setParameters("g=321.0") + mod.setParameters(pvals={"e": 1.234}) + mod.setParameters(pvals={"g": 321.0}) assert mod.getParameters("e") == ["1.234"] assert mod.getParameters("g") == ["321.0"] assert mod.getParameters() == { @@ -47,7 +47,7 @@ def test_setParameters(): mod.getParameters("thisParameterDoesNotExist") # method 2 - mod.setParameters(["e=21.3", "g=0.12"]) + mod.setParameters(pvals={"e": 21.3, "g": 0.12}) assert mod.getParameters() == { "e": "21.3", "g": "0.12", @@ -64,8 +64,8 @@ def test_setSimulationOptions(): mod = OMPython.ModelicaSystem(fileName=model_path + "BouncingBall.mo", modelName="BouncingBall") # method 1 - mod.setSimulationOptions("stopTime=1.234") - mod.setSimulationOptions("tolerance=1.1e-08") + mod.setSimulationOptions(simOptions={"stopTime": 1.234}) + mod.setSimulationOptions(simOptions={"tolerance": 1.1e-08}) assert mod.getSimulationOptions("stopTime") == ["1.234"] assert mod.getSimulationOptions("tolerance") == ["1.1e-08"] assert mod.getSimulationOptions(["tolerance", "stopTime"]) == ["1.1e-08", "1.234"] @@ -77,7 +77,7 @@ def test_setSimulationOptions(): mod.getSimulationOptions("thisOptionDoesNotExist") # method 2 - mod.setSimulationOptions(["stopTime=2.1", "tolerance=1.2e-08"]) + mod.setSimulationOptions(simOptions={"stopTime": 2.1, "tolerance": "1.2e-08"}) d = mod.getSimulationOptions() assert d["stopTime"] == "2.1" assert d["tolerance"] == "1.2e-08" @@ -119,7 +119,7 @@ def test_getSolutions(model_firstorder): a = -1 tau = -1 / a stopTime = 5*tau - mod.setSimulationOptions([f"stopTime={stopTime}", "stepSize=0.1", "tolerance=1e-8"]) + mod.setSimulationOptions(simOptions={"stopTime": stopTime, "stepSize": 0.1, "tolerance": 1e-8}) mod.simulate() x = mod.getSolutions("x") @@ -298,7 +298,7 @@ def test_getters(tmp_path): x0 = 1.0 x_analytical = -b/a + (x0 + b/a) * np.exp(a * stopTime) dx_analytical = (x0 + b/a) * a * np.exp(a * stopTime) - mod.setSimulationOptions(f"stopTime={stopTime}") + mod.setSimulationOptions(simOptions={"stopTime": stopTime}) mod.simulate() # getOutputs after simulate() @@ -327,7 +327,7 @@ def test_getters(tmp_path): mod.getContinuous("a") # a is a parameter with pytest.raises(OMPython.ModelicaSystemError): - mod.setSimulationOptions("thisOptionDoesNotExist=3") + mod.setSimulationOptions(simOptions={"thisOptionDoesNotExist": 3}) def test_simulate_inputs(tmp_path): @@ -345,7 +345,7 @@ def test_simulate_inputs(tmp_path): """) mod = OMPython.ModelicaSystem(fileName=model_file.as_posix(), modelName="M_input") - mod.setSimulationOptions("stopTime=1.0") + mod.setSimulationOptions(simOptions={"stopTime": 1.0}) # integrate zero (no setInputs call) - it should default to None -> 0 assert mod.getInputs() == { @@ -357,7 +357,7 @@ def test_simulate_inputs(tmp_path): assert np.isclose(y[-1], 0.0) # integrate a constant - mod.setInputs("u1=2.5") + mod.setInputs(name={"u1": 2.5}) assert mod.getInputs() == { "u1": [ (0.0, 2.5), @@ -370,7 +370,7 @@ def test_simulate_inputs(tmp_path): assert np.isclose(y[-1], 2.5) # now let's integrate the sum of two ramps - mod.setInputs("u1=[(0.0, 0.0), (0.5, 2), (1.0, 0)]") + mod.setInputs(name={"u1": [(0.0, 0.0), (0.5, 2), (1.0, 0)]}) assert mod.getInputs("u1") == [[ (0.0, 0.0), (0.5, 2.0), @@ -383,25 +383,25 @@ def test_simulate_inputs(tmp_path): # let's try some edge cases # unmatched startTime with pytest.raises(OMPython.ModelicaSystemError): - mod.setInputs("u1=[(-0.5, 0.0), (1.0, 1)]") + mod.setInputs(name={"u1": [(-0.5, 0.0), (1.0, 1)]}) mod.simulate() # unmatched stopTime with pytest.raises(OMPython.ModelicaSystemError): - mod.setInputs("u1=[(0.0, 0.0), (0.5, 1)]") + mod.setInputs(name={"u1": [(0.0, 0.0), (0.5, 1)]}) mod.simulate() # Let's use both inputs, but each one with different number of of # samples. This has an effect when generating the csv file. - mod.setInputs([ - "u1=[(0.0, 0), (1.0, 1)]", - "u2=[(0.0, 0), (0.25, 0.5), (0.5, 1.0), (1.0, 0)]", - ]) - mod.simulate() - assert pathlib.Path(mod.csvFile).read_text() == """time,u1,u2,end + mod.setInputs(name={"u1": [(0.0, 0), (1.0, 1)], + "u2": [(0.0, 0), (0.25, 0.5), (0.5, 1.0), (1.0, 0)]}) + csv_file = mod.createCSVData() + assert pathlib.Path(csv_file).read_text() == """time,u1,u2,end 0.0,0.0,0.0,0 0.25,0.25,0.5,0 0.5,0.5,1.0,0 1.0,1.0,0.0,0 """ + + mod.simulate() y = mod.getSolutions("y")[0] assert np.isclose(y[-1], 1.0) diff --git a/tests/test_ModelicaSystemDoE.py b/tests/test_ModelicaSystemDoE.py new file mode 100644 index 00000000..40fed90d --- /dev/null +++ b/tests/test_ModelicaSystemDoE.py @@ -0,0 +1,83 @@ +import numpy as np +import OMPython +import pathlib +import pytest + + +@pytest.fixture +def model_doe(tmp_path: pathlib.Path) -> pathlib.Path: + # see: https://trac.openmodelica.org/OpenModelica/ticket/4052 + mod = tmp_path / "M.mo" + mod.write_text(""" +model M + parameter Integer p=1; + parameter Integer q=1; + parameter Real a = -1; + parameter Real b = -1; + Real x[p]; + Real y[q]; +equation + der(x) = a * fill(1.0, p); + der(y) = b * fill(1.0, q); +end M; +""") + return mod + + +@pytest.fixture +def param_doe() -> dict[str, list]: + param = { + # structural + 'p': [1, 2], + 'q': [3, 4], + # simple + 'a': [5, 6], + 'b': [7, 8], + } + return param + + +def test_ModelicaSystemDoE(tmp_path, model_doe, param_doe): + tmpdir = tmp_path / 'DoE' + tmpdir.mkdir(exist_ok=True) + + doe_mod = OMPython.ModelicaSystemDoE( + fileName=model_doe.as_posix(), + modelName="M", + parameters=param_doe, + resultpath=tmpdir, + simargs={"override": {'stopTime': 1.0}}, + ) + doe_count = doe_mod.prepare() + assert doe_count == 16 + + doe_dict = doe_mod.get_doe() + assert isinstance(doe_dict, dict) + assert len(doe_dict.keys()) == 16 + + doe_status = doe_mod.simulate() + assert doe_status is True + + doe_sol = doe_mod.get_solutions() + + for resultfilename in doe_dict: + row = doe_dict[resultfilename] + + assert resultfilename in doe_sol + sol = doe_sol[resultfilename] + + var_dict = { + # simple / non-structural parameters + 'a': float(row['a']), + 'b': float(row['b']), + # structural parameters + 'p': float(row['p']), + 'q': float(row['q']), + # variables using the structural parameters + f"x[{row['p']}]": float(row['a']), + f"y[{row['p']}]": float(row['b']), + } + + for var in var_dict: + assert var in sol['data'] + assert np.isclose(sol['data'][var][-1], var_dict[var]) diff --git a/tests/test_linearization.py b/tests/test_linearization.py index 2c79190c..6af565c6 100644 --- a/tests/test_linearization.py +++ b/tests/test_linearization.py @@ -62,10 +62,10 @@ def test_getters(tmp_path): assert "startTime" in d assert "stopTime" in d assert mod.getLinearizationOptions(["stopTime", "startTime"]) == [d["stopTime"], d["startTime"]] - mod.setLinearizationOptions("stopTime=0.02") + mod.setLinearizationOptions(linearizationOptions={"stopTime": 0.02}) assert mod.getLinearizationOptions("stopTime") == ["0.02"] - mod.setInputs(["u1=10", "u2=0"]) + mod.setInputs(name={"u1": 10, "u2": 0}) [A, B, C, D] = mod.linearize() g = float(mod.getParameters("g")[0]) l = float(mod.getParameters("l")[0]) diff --git a/tests/test_optimization.py b/tests/test_optimization.py index aa74df79..b4164397 100644 --- a/tests/test_optimization.py +++ b/tests/test_optimization.py @@ -35,13 +35,15 @@ def test_optimization_example(tmp_path): mod = OMPython.ModelicaSystem(fileName=model_file.as_posix(), modelName="BangBang2021") - mod.setOptimizationOptions(["numberOfIntervals=16", "stopTime=1", - "stepSize=0.001", "tolerance=1e-8"]) + mod.setOptimizationOptions(optimizationOptions={"numberOfIntervals": 16, + "stopTime": 1, + "stepSize": 0.001, + "tolerance": 1e-8}) # test the getter assert mod.getOptimizationOptions()["stopTime"] == "1" assert mod.getOptimizationOptions("stopTime") == ["1"] - assert mod.getOptimizationOptions(["tolerance", "stopTime"]) == ["1e-8", "1"] + assert mod.getOptimizationOptions(["tolerance", "stopTime"]) == ["1e-08", "1"] r = mod.optimize() # it is necessary to specify resultfile, otherwise it wouldn't find it.