diff --git a/OMPython/ModelicaSystem.py b/OMPython/ModelicaSystem.py index cbe23036..b3f55eef 100644 --- a/OMPython/ModelicaSystem.py +++ b/OMPython/ModelicaSystem.py @@ -3,6 +3,7 @@ Definition of main class to run Modelica simulations - ModelicaSystem. """ +import abc import ast from dataclasses import dataclass import itertools @@ -14,7 +15,7 @@ import re import textwrap import threading -from typing import Any, cast, Optional +from typing import Any, cast, Optional, Tuple import warnings import xml.etree.ElementTree as ET @@ -338,28 +339,22 @@ def parse_simflags(simflags: str) -> dict[str, Optional[str | dict[str, Any] | n return simargs -class ModelicaSystem: +class ModelicaSystemABC(metaclass=abc.ABCMeta): """ - Class to simulate a Modelica model using OpenModelica via OMCSession. + Base class to simulate a Modelica models. """ def __init__( self, - command_line_options: Optional[list[str]] = None, + session: OMCSession, work_directory: Optional[str | os.PathLike] = None, - omhome: Optional[str] = None, - session: Optional[OMCSession] = None, ) -> None: """Create a ModelicaSystem instance. To define the model use model() or convertFmu2Mo(). Args: - command_line_options: List with extra command line options as elements. The list elements are - provided to omc via setCommandLineOptions(). If set, the default values will be overridden. - To disable any command line options, use an empty list. work_directory: Path to a directory to be used for temporary files like the model executable. If left unspecified, a tmp directory will be created. - omhome: path to OMC to be used when creating the OMC session (see OMCSession). session: definition of a (local) OMC session to be used. If unspecified, a new local session will be created. """ @@ -385,24 +380,11 @@ def __init__( self._linearized_outputs: list[str] = [] # linearization output list self._linearized_states: list[str] = [] # linearization states list - if session is not None: - self._session = session - else: - self._session = OMCSessionLocal(omhome=omhome) + self._session = session # get OpenModelica version version_str = self._session.get_version() self._version = self._parse_om_version(version=version_str) - # set commandLineOptions using default values or the user defined list - if command_line_options is None: - # set default command line options to improve the performance of linearization and to avoid recompilation if - # the simulation executable is reused in linearize() via the runtime flag '-l' - command_line_options = [ - "--linearizationDumpLanguage=python", - "--generateSymbolicLinearization", - ] - for opt in command_line_options: - self.set_command_line_options(command_line_option=opt) self._simulated = False # True if the model has already been simulated self._result_file: Optional[OMCPath] = None # for storing result file @@ -414,89 +396,6 @@ def __init__( self._file_name: Optional[OMCPath] = None self._variable_filter: Optional[str] = None - def model( - self, - model_name: Optional[str] = None, - model_file: Optional[str | os.PathLike] = None, - libraries: Optional[list[str | tuple[str, str]]] = None, - variable_filter: Optional[str] = None, - build: bool = True, - ) -> None: - """Load and build a Modelica model. - - This method loads the model file and builds it if requested (build == True). - - Args: - model_file: Path to the model file. Either absolute or relative to - the current working directory. - model_name: The name of the model class. If it is contained within - a package, "PackageName.ModelName" should be used. - libraries: List of libraries to be loaded before the model itself is - loaded. Two formats are supported for the list elements: - lmodel=["Modelica"] for just the library name - and lmodel=[("Modelica","3.2.3")] for specifying both the name - and the version. - variable_filter: A regular expression. Only variables fully - matching the regexp will be stored in the result file. - Leaving it unspecified is equivalent to ".*". - build: Boolean controlling whether the model should be - built when constructor is called. If False, the constructor - simply loads the model without compiling. - - Examples: - mod = ModelicaSystem() - # and then one of the lines below - mod.model(name="modelName", file="ModelicaModel.mo", ) - mod.model(name="modelName", file="ModelicaModel.mo", libraries=["Modelica"]) - mod.model(name="modelName", file="ModelicaModel.mo", libraries=[("Modelica","3.2.3"), "PowerSystems"]) - """ - - if self._model_name is not None: - raise ModelicaSystemError("Can not reuse this instance of ModelicaSystem " - f"defined for {repr(self._model_name)}!") - - if model_name is None or not isinstance(model_name, str): - raise ModelicaSystemError("A model name must be provided!") - - if libraries is None: - libraries = [] - - if not isinstance(libraries, list): - raise ModelicaSystemError(f"Invalid input type for libraries: {type(libraries)} - list expected!") - - # set variables - self._model_name = model_name # Model class name - self._libraries = libraries # may be needed if model is derived from other model - self._variable_filter = variable_filter - - if self._libraries: - self._loadLibrary(libraries=self._libraries) - - self._file_name = None - if model_file is not None: - file_path = pathlib.Path(model_file) - # special handling for OMCProcessLocal - consider a relative path - if isinstance(self._session, OMCSessionLocal) and not file_path.is_absolute(): - file_path = pathlib.Path.cwd() / file_path - if not file_path.is_file(): - raise IOError(f"Model file {file_path} does not exist!") - - self._file_name = self.getWorkDirectory() / file_path.name - if (isinstance(self._session, OMCSessionLocal) - and file_path.as_posix() == self._file_name.as_posix()): - pass - elif self._file_name.is_file(): - raise IOError(f"Simulation model file {self._file_name} exist - not overwriting!") - else: - content = file_path.read_text(encoding='utf-8') - self._file_name.write_text(content) - - if self._file_name is not None: - self._loadFile(fileName=self._file_name) - - if build: - self.buildModel(variable_filter) - def get_session(self) -> OMCSession: """ Return the OMC session used for this class. @@ -512,41 +411,6 @@ def get_model_name(self) -> str: return self._model_name - def set_command_line_options(self, command_line_option: str): - """ - Set the provided command line option via OMC setCommandLineOptions(). - """ - expr = f'setCommandLineOptions("{command_line_option}")' - self.sendExpression(expr=expr) - - def _loadFile(self, fileName: OMCPath): - # load file - self.sendExpression(expr=f'loadFile("{fileName.as_posix()}")') - - # for loading file/package, loading model and building model - def _loadLibrary(self, libraries: list): - # load Modelica standard libraries or Modelica files if needed - for element in libraries: - if element is not None: - if isinstance(element, str): - if element.endswith(".mo"): - api_call = "loadFile" - else: - api_call = "loadModel" - self._requestApi(apiName=api_call, entity=element) - elif isinstance(element, tuple): - if not element[1]: - expr_load_lib = f"loadModel({element[0]})" - else: - expr_load_lib = f'loadModel({element[0]}, {{"{element[1]}"}})' - self.sendExpression(expr=expr_load_lib) - else: - raise ModelicaSystemError("loadLibrary() failed, Unknown type detected: " - f"{element} is of type {type(element)}, " - "The following patterns are supported:\n" - '1)["Modelica"]\n' - '2)[("Modelica","3.2.3"), "PowerSystems"]\n') - def setWorkDirectory(self, work_directory: Optional[str | os.PathLike] = None) -> OMCPath: """ Define the work directory for the ModelicaSystem / OpenModelica session. The model is build within this @@ -575,21 +439,10 @@ def getWorkDirectory(self) -> OMCPath: """ return self._work_dir - def buildModel(self, variableFilter: Optional[str] = None): - filter_def: Optional[str] = None - if variableFilter is not None: - filter_def = variableFilter - elif self._variable_filter is not None: - filter_def = self._variable_filter - - if filter_def is not None: - var_filter = f'variableFilter="{filter_def}"' - else: - var_filter = 'variableFilter=".*"' - - build_model_result = self._requestApi(apiName="buildModel", entity=self._model_name, properties=var_filter) - logger.debug("OM model build result: %s", build_model_result) - + def check_model_executable(self): + """ + Check if the model executable is working + """ # check if the executable exists ... om_cmd = ModelExecutionCmd( runpath=self.getWorkDirectory(), @@ -605,41 +458,6 @@ def buildModel(self, variableFilter: Optional[str] = None): if returncode != 0: raise ModelicaSystemError("Model executable not working!") - xml_file = self._session.omcpath(build_model_result[0]).parent / build_model_result[1] - self._xmlparse(xml_file=xml_file) - - def sendExpression(self, expr: str, parsed: bool = True) -> Any: - """ - Wrapper for OMCSession.sendExpression(). - """ - try: - retval = self._session.sendExpression(expr=expr, parsed=parsed) - except OMCSessionException as ex: - raise ModelicaSystemError(f"Error executing {repr(expr)}: {ex}") from ex - - logger.debug(f"Result of executing {repr(expr)}: {textwrap.shorten(repr(retval), width=100)}") - - return retval - - # request to OMC - def _requestApi( - self, - apiName: str, - entity: Optional[str] = None, - properties: Optional[str] = None, - ) -> Any: - if entity is not None and properties is not None: - expr = f'{apiName}({entity}, {properties})' - elif entity is not None and properties is None: - if apiName in ("loadFile", "importFMU"): - expr = f'{apiName}("{entity}")' - else: - expr = f'{apiName}({entity})' - else: - expr = f'{apiName}()' - - return self.sendExpression(expr=expr) - def _xmlparse(self, xml_file: OMCPath): if not xml_file.is_file(): raise ModelicaSystemError(f"XML file not generated: {xml_file}") @@ -782,142 +600,45 @@ def getContinuousInitial( raise ModelicaSystemError("Unhandled input for getContinousInitial()") - def getContinuousFinal( + def getParameters( self, names: Optional[str | list[str]] = None, - ) -> dict[str, np.float64] | list[np.float64]: - """ - Get (final) values of continuous signals (at stopTime). + ) -> dict[str, str] | list[str]: + """Get parameter values. Args: - names: Either None (default), a string with the continuous signal - name, or a list of signal name strings. + names: Either None (default), a string with the parameter name, + or a list of parameter name strings. Returns: If `names` is None, a dict in the format - {signal_name: signal_value} is returned. - If `names` is a string, a single element list [signal_value] is - returned. - If `names` is a list, a list with one value for each signal name - in names is returned: [signal1_value, signal2_value, ...]. + {parameter_name: parameter_value} is returned. + If `names` is a string, a single element list is returned. + If `names` is a list, a list with one value for each parameter name + in names is returned. + In all cases, parameter values are returned as strings. Examples: - >>> mod.getContinuousFinal() - {'x': np.float64(0.68), 'der(x)': np.float64(-0.24), 'y': np.float64(-0.24)} - >>> mod.getContinuousFinal("x") - [np.float64(0.68)] - >>> mod.getContinuousFinal(["y","x"]) - [np.float64(-0.24), np.float64(0.68)] + >>> mod.getParameters() + {'Name1': '1.23', 'Name2': '4.56'} + >>> mod.getParameters("Name1") + ['1.23'] + >>> mod.getParameters(["Name1","Name2"]) + ['1.23', '4.56'] """ - if not self._simulated: - raise ModelicaSystemError("Please use getContinuousInitial() before the simulation was started!") - - def get_continuous_solution(name_list: list[str]) -> None: - for name in name_list: - if name in self._continuous: - value = self.getSolutions(name) - self._continuous[name] = np.float64(value[0][-1]) - else: - raise KeyError(f"{names} is not continuous") - if names is None: - get_continuous_solution(name_list=list(self._continuous.keys())) - return self._continuous - + return self._params if isinstance(names, str): - get_continuous_solution(name_list=[names]) - return [self._continuous[names]] - + return [self._params[names]] if isinstance(names, list): - get_continuous_solution(name_list=names) - values = [] - for name in names: - values.append(self._continuous[name]) - return values + return [self._params[x] for x in names] - raise ModelicaSystemError("Unhandled input for getContinousFinal()") + raise ModelicaSystemError("Unhandled input for getParameters()") - def getContinuous( + def getInputs( self, names: Optional[str | list[str]] = None, - ) -> dict[str, np.float64] | list[np.float64]: - """Get values of continuous signals. - - If called before simulate(), the initial values are returned. - If called after simulate(), the final values (at stopTime) are returned. - The return format is always numpy.float64. - - Args: - names: Either None (default), a string with the continuous signal - name, or a list of signal name strings. - Returns: - If `names` is None, a dict in the format - {signal_name: signal_value} is returned. - If `names` is a string, a single element list [signal_value] is - returned. - If `names` is a list, a list with one value for each signal name - in names is returned: [signal1_value, signal2_value, ...]. - - Examples: - Before simulate(): - >>> mod.getContinuous() - {'x': '1.0', 'der(x)': None, 'y': '-0.4'} - >>> mod.getContinuous("y") - ['-0.4'] - >>> mod.getContinuous(["y","x"]) - ['-0.4', '1.0'] - - After simulate(): - >>> mod.getContinuous() - {'x': np.float64(0.68), 'der(x)': np.float64(-0.24), 'y': np.float64(-0.24)} - >>> mod.getContinuous("x") - [np.float64(0.68)] - >>> mod.getContinuous(["y","x"]) - [np.float64(-0.24), np.float64(0.68)] - """ - if not self._simulated: - return self.getContinuousInitial(names=names) - - return self.getContinuousFinal(names=names) - - def getParameters( - self, - names: Optional[str | list[str]] = None, - ) -> dict[str, str] | list[str]: - """Get parameter values. - - Args: - names: Either None (default), a string with the parameter name, - or a list of parameter name strings. - Returns: - If `names` is None, a dict in the format - {parameter_name: parameter_value} is returned. - If `names` is a string, a single element list is returned. - If `names` is a list, a list with one value for each parameter name - in names is returned. - In all cases, parameter values are returned as strings. - - Examples: - >>> mod.getParameters() - {'Name1': '1.23', 'Name2': '4.56'} - >>> mod.getParameters("Name1") - ['1.23'] - >>> mod.getParameters(["Name1","Name2"]) - ['1.23', '4.56'] - """ - if names is None: - return self._params - if isinstance(names, str): - return [self._params[names]] - if isinstance(names, list): - return [self._params[x] for x in names] - - raise ModelicaSystemError("Unhandled input for getParameters()") - - def getInputs( - self, - names: Optional[str | list[str]] = None, - ) -> dict[str, list[tuple[float, float]]] | list[list[tuple[float, float]]]: - """Get values of input signals. + ) -> dict[str, list[tuple[float, float]]] | list[list[tuple[float, float]]]: + """Get values of input signals. Args: names: Either None (default), a string with the input name, @@ -985,102 +706,6 @@ def getOutputsInitial( raise ModelicaSystemError("Unhandled input for getOutputsInitial()") - def getOutputsFinal( - self, - names: Optional[str | list[str]] = None, - ) -> dict[str, np.float64] | list[np.float64]: - """Get (final) values of output signals (at stopTime). - - Args: - names: Either None (default), a string with the output name, - or a list of output name strings. - Returns: - If `names` is None, a dict in the format - {output_name: output_value} is returned. - If `names` is a string, a single element list [output_value] is - returned. - If `names` is a list, a list with one value for each output name - in names is returned: [output1_value, output2_value, ...]. - - Examples: - >>> mod.getOutputsFinal() - {'out1': np.float64(-0.1234), 'out2': np.float64(2.1)} - >>> mod.getOutputsFinal("out1") - [np.float64(-0.1234)] - >>> mod.getOutputsFinal(["out1","out2"]) - [np.float64(-0.1234), np.float64(2.1)] - """ - if not self._simulated: - raise ModelicaSystemError("Please use getOuputsInitial() before the simulation was started!") - - def get_outputs_solution(name_list: list[str]) -> None: - for name in name_list: - if name in self._outputs: - value = self.getSolutions(name) - self._outputs[name] = np.float64(value[0][-1]) - else: - raise KeyError(f"{names} is not a valid output") - - if names is None: - get_outputs_solution(name_list=list(self._outputs.keys())) - return self._outputs - - if isinstance(names, str): - get_outputs_solution(name_list=[names]) - return [self._outputs[names]] - - if isinstance(names, list): - get_outputs_solution(name_list=names) - values = [] - for name in names: - values.append(self._outputs[name]) - return values - - raise ModelicaSystemError("Unhandled input for getOutputs()") - - def getOutputs( - self, - names: Optional[str | list[str]] = None, - ) -> dict[str, np.float64] | list[np.float64]: - """Get values of output signals. - - If called before simulate(), the initial values are returned. - If called after simulate(), the final values (at stopTime) are returned. - The return format is always numpy.float64. - - Args: - names: Either None (default), a string with the output name, - or a list of output name strings. - Returns: - If `names` is None, a dict in the format - {output_name: output_value} is returned. - If `names` is a string, a single element list [output_value] is - returned. - If `names` is a list, a list with one value for each output name - in names is returned: [output1_value, output2_value, ...]. - - Examples: - Before simulate(): - >>> mod.getOutputs() - {'out1': '-0.4', 'out2': '1.2'} - >>> mod.getOutputs("out1") - ['-0.4'] - >>> mod.getOutputs(["out1","out2"]) - ['-0.4', '1.2'] - - After simulate(): - >>> mod.getOutputs() - {'out1': np.float64(-0.1234), 'out2': np.float64(2.1)} - >>> mod.getOutputs("out1") - [np.float64(-0.1234)] - >>> mod.getOutputs(["out1","out2"]) - [np.float64(-0.1234), np.float64(2.1)] - """ - if not self._simulated: - return self.getOutputsInitial(names=names) - - return self.getOutputsFinal(names=names) - def getSimulationOptions( self, names: Optional[str | list[str]] = None, @@ -1372,151 +997,50 @@ def simulate( self._simulated = True - def plot( - self, - plotdata: str, - resultfile: Optional[str | os.PathLike] = None, - ) -> None: + @staticmethod + def _prepare_input_data( + input_args: Any, + input_kwargs: dict[str, Any], + ) -> dict[str, str]: """ - Plot a variable using OMC; this will work for local OMC usage only (OMCProcessLocal). The reason is that the - plot is created by OMC which needs access to the local display. This is not the case for docker and WSL. + Convert raw input to a structured dictionary {'key1': 'value1', 'key2': 'value2'}. """ - if not isinstance(self._session, OMCSessionLocal): - raise ModelicaSystemError("Plot is using the OMC plot functionality; " - "thus, it is only working if OMC is running locally!") - - if resultfile is not None: - plot_result_file = self._session.omcpath(resultfile) - elif self._result_file is not None: - plot_result_file = self._result_file - else: - raise ModelicaSystemError("No resultfile available - either run simulate() before plotting " - "or provide a result file!") + def prepare_str(str_in: str) -> dict[str, str]: + str_in = str_in.replace(" ", "") + key_val_list: list[str] = str_in.split("=") + if len(key_val_list) != 2: + raise ModelicaSystemError(f"Invalid 'key=value' pair: {str_in}") - if not plot_result_file.is_file(): - raise ModelicaSystemError(f"Provided resultfile {repr(plot_result_file.as_posix())} does not exists!") + input_data_from_str: dict[str, str] = {key_val_list[0]: key_val_list[1]} - expr = f'plot({plotdata}, fileName="{plot_result_file.as_posix()}")' - self.sendExpression(expr=expr) + return input_data_from_str - def getSolutions( - self, - varList: Optional[str | list[str]] = None, - resultfile: Optional[str | os.PathLike] = None, - ) -> tuple[str] | np.ndarray: - """Extract simulation results from a result data file. + input_data: dict[str, str] = {} - Args: - varList: Names of variables to be extracted. Either unspecified to - get names of available variables, or a single variable name - as a string, or a list of variable names. - resultfile: Path to the result file. If unspecified, the result - file created by simulate() is used. + for input_arg in input_args: + if isinstance(input_arg, str): + warnings.warn(message="The definition of values to set should use a dictionary, " + "i.e. {'key1': 'val1', 'key2': 'val2', ...}. Please convert all cases which " + "use a string ('key=val') or list ['key1=val1', 'key2=val2', ...]", + category=DeprecationWarning, + stacklevel=3) + input_data = input_data | prepare_str(input_arg) + elif isinstance(input_arg, list): + warnings.warn(message="The definition of values to set should use a dictionary, " + "i.e. {'key1': 'val1', 'key2': 'val2', ...}. Please convert all cases which " + "use a string ('key=val') or list ['key1=val1', 'key2=val2', ...]", + category=DeprecationWarning, + stacklevel=3) - Returns: - If varList is None, a tuple with names of all variables - is returned. - If varList is a string, a 1D numpy array is returned. - If varList is a list, a 2D numpy array is returned. - - Examples: - >>> mod.getSolutions() - ('a', 'der(x)', 'time', 'x') - >>> mod.getSolutions("x") - np.array([[1. , 0.90483742, 0.81873075]]) - >>> mod.getSolutions(["x", "der(x)"]) - np.array([[1. , 0.90483742 , 0.81873075], - [-1. , -0.90483742, -0.81873075]]) - >>> mod.getSolutions(resultfile="c:/a.mat") - ('a', 'der(x)', 'time', 'x') - >>> mod.getSolutions("x", resultfile="c:/a.mat") - np.array([[1. , 0.90483742, 0.81873075]]) - >>> mod.getSolutions(["x", "der(x)"], resultfile="c:/a.mat") - np.array([[1. , 0.90483742 , 0.81873075], - [-1. , -0.90483742, -0.81873075]]) - """ - if resultfile is None: - if self._result_file is None: - raise ModelicaSystemError("No result file found. Run simulate() first.") - result_file = self._result_file - else: - result_file = self._session.omcpath(resultfile) - - # check if the result file exits - if not result_file.is_file(): - raise ModelicaSystemError(f"Result file does not exist {result_file.as_posix()}") - - # get absolute path - result_file = result_file.absolute() - - result_vars = self.sendExpression(expr=f'readSimulationResultVars("{result_file.as_posix()}")') - self.sendExpression(expr="closeSimulationResultFile()") - if varList is None: - return result_vars - - if isinstance(varList, str): - var_list_checked = [varList] - elif isinstance(varList, list): - var_list_checked = varList - else: - raise ModelicaSystemError("Unhandled input for getSolutions()") - - for var in var_list_checked: - if var == "time": - continue - if var not in result_vars: - raise ModelicaSystemError(f"Requested data {repr(var)} does not exist") - variables = ",".join(var_list_checked) - res = self.sendExpression(expr=f'readSimulationResult("{result_file.as_posix()}",{{{variables}}})') - np_res = np.array(res) - self.sendExpression(expr="closeSimulationResultFile()") - return np_res - - @staticmethod - def _prepare_input_data( - input_args: Any, - input_kwargs: dict[str, Any], - ) -> dict[str, str]: - """ - Convert raw input to a structured dictionary {'key1': 'value1', 'key2': 'value2'}. - """ - - def prepare_str(str_in: str) -> dict[str, str]: - str_in = str_in.replace(" ", "") - key_val_list: list[str] = str_in.split("=") - if len(key_val_list) != 2: - raise ModelicaSystemError(f"Invalid 'key=value' pair: {str_in}") - - input_data_from_str: dict[str, str] = {key_val_list[0]: key_val_list[1]} - - return input_data_from_str - - input_data: dict[str, str] = {} - - for input_arg in input_args: - if isinstance(input_arg, str): - warnings.warn(message="The definition of values to set should use a dictionary, " - "i.e. {'key1': 'val1', 'key2': 'val2', ...}. Please convert all cases which " - "use a string ('key=val') or list ['key1=val1', 'key2=val2', ...]", - category=DeprecationWarning, - stacklevel=3) - input_data = input_data | prepare_str(input_arg) - elif isinstance(input_arg, list): - warnings.warn(message="The definition of values to set should use a dictionary, " - "i.e. {'key1': 'val1', 'key2': 'val2', ...}. Please convert all cases which " - "use a string ('key=val') or list ['key1=val1', 'key2=val2', ...]", - category=DeprecationWarning, - stacklevel=3) - - for item in input_arg: - if not isinstance(item, str): - raise ModelicaSystemError(f"Invalid input data type for set*() function: {type(item)}!") - input_data = input_data | prepare_str(item) - elif isinstance(input_arg, dict): - input_data = input_data | input_arg - else: - raise ModelicaSystemError(f"Invalid input data type for set*() function: {type(input_arg)}!") + for item in input_arg: + if not isinstance(item, str): + raise ModelicaSystemError(f"Invalid input data type for set*() function: {type(item)}!") + input_data = input_data | prepare_str(item) + elif isinstance(input_arg, dict): + input_data = input_data | input_arg + else: + raise ModelicaSystemError(f"Invalid input data type for set*() function: {type(input_arg)}!") if len(input_kwargs): for key, val in input_kwargs.items(): @@ -1806,24 +1330,672 @@ def _createCSVData(self, csvfile: Optional[OMCPath] = None) -> OMCPath: input_names = list(interpolated_inputs.keys()) header = ['time'] + input_names + ['end'] - csv_rows = [header] - for i, t in enumerate(all_times): - row = [ - t, # time - *(interpolated_inputs[name][i] for name in input_names), # input values - 0, # trailing 'end' column - ] - csv_rows.append(row) + csv_rows = [header] + for i, t in enumerate(all_times): + row = [ + t, # time + *(interpolated_inputs[name][i] for name in input_names), # input values + 0, # trailing 'end' column + ] + csv_rows.append(row) + + if csvfile is None: + csvfile = self.getWorkDirectory() / f'{self._model_name}.csv' + + # basic definition of a CSV file using csv_rows as input + csv_content = "\n".join([",".join(map(str, row)) for row in csv_rows]) + "\n" + + csvfile.write_text(csv_content) + + return csvfile + + def linearize( + self, + lintime: Optional[float] = None, + simflags: Optional[str] = None, + simargs: Optional[dict[str, Optional[str | dict[str, Any] | numbers.Number]]] = None, + ) -> LinearizationResult: + """Linearize the model according to linearization options. + + See setLinearizationOptions. + + Args: + lintime: Override "stopTime" value. + simflags: String of extra command line flags for the model binary. + This argument is deprecated, use simargs instead. + simargs: A dict with command line flags and possible options; example: "simargs={'csvInput': 'a.csv'}" + + Returns: + A LinearizationResult object is returned. This allows several + uses: + * `(A, B, C, D) = linearize()` to get just the matrices, + * `result = linearize(); result.A` to get everything and access the + attributes one by one, + * `result = linearize(); A = result[0]` mostly just for backwards + compatibility, because linearize() used to return `[A, B, C, D]`. + """ + if len(self._quantities) == 0: + # if self._quantities has no content, the xml file was not parsed; see self._xmlparse() + raise ModelicaSystemError( + "Linearization cannot be performed as the model is not build, " + "use ModelicaSystemOMC() to build the model first" + ) + + om_cmd = ModelExecutionCmd( + runpath=self.getWorkDirectory(), + cmd_local=self._session.model_execution_local, + cmd_windows=self._session.model_execution_windows, + cmd_prefix=self._session.model_execution_prefix(cwd=self.getWorkDirectory()), + model_name=self._model_name, + ) + + self._process_override_data( + om_cmd=om_cmd, + override_file=self.getWorkDirectory() / f'{self._model_name}_override_linear.txt', + override_var=self._override_variables, + override_sim=self._linearization_options, + ) + + if self._inputs: + for data in self._inputs.values(): + if data is not None: + for value in data: + if value[0] < float(self._simulate_options["startTime"]): + raise ModelicaSystemError('Input time value is less than simulation startTime') + csvfile = self._createCSVData() + om_cmd.arg_set(key="csvInput", val=csvfile.as_posix()) + + if lintime is None: + lintime = float(self._linearization_options["stopTime"]) + if (float(self._linearization_options["startTime"]) > lintime + or float(self._linearization_options["stopTime"]) < lintime): + raise ModelicaSystemError(f"Invalid linearisation time: {lintime=}; " + f"expected value: {self._linearization_options['startTime']} " + f"<= lintime <= {self._linearization_options['stopTime']}") + om_cmd.arg_set(key="l", val=str(lintime)) + + # allow runtime simulation flags from user input + if simflags is not None: + om_cmd.args_set(args=om_cmd.parse_simflags(simflags=simflags)) + + if simargs: + om_cmd.args_set(args=simargs) + + # the file create by the model executable which contains the matrix and linear inputs, outputs and states + linear_file = self.getWorkDirectory() / "linearized_model.py" + linear_file.unlink(missing_ok=True) + + cmd_definition = om_cmd.definition() + returncode = cmd_definition.run() + if returncode != 0: + raise ModelicaSystemError(f"Linearize failed with return code: {returncode}") + if not linear_file.is_file(): + raise ModelicaSystemError(f"Linearization failed: {linear_file} not found!") + + self._simulated = True + + # extract data from the python file with the linearized model using the ast module - this allows to get the + # needed information without executing the created code + linear_data = {} + linear_file_content = linear_file.read_text() + try: + # ignore possible typing errors below (mypy) - these are caught by the try .. except .. block + linear_file_ast = ast.parse(linear_file_content) + for body_part in linear_file_ast.body[0].body: # type: ignore + if not isinstance(body_part, ast.Assign): + continue + + target = body_part.targets[0].id # type: ignore + value_ast = ast.literal_eval(body_part.value) + + linear_data[target] = value_ast + except (AttributeError, IndexError, ValueError, SyntaxError, TypeError) as ex: + raise ModelicaSystemError(f"Error parsing linearization file {linear_file}: {ex}") from ex + + # remove the file + linear_file.unlink() + + self._linearized_inputs = linear_data["inputVars"] + self._linearized_outputs = linear_data["outputVars"] + self._linearized_states = linear_data["stateVars"] + + return LinearizationResult( + n=linear_data["n"], + m=linear_data["m"], + p=linear_data["p"], + x0=linear_data["x0"], + u0=linear_data["u0"], + A=linear_data["A"], + B=linear_data["B"], + C=linear_data["C"], + D=linear_data["D"], + stateVars=linear_data["stateVars"], + inputVars=linear_data["inputVars"], + outputVars=linear_data["outputVars"], + ) + + def getLinearInputs(self) -> list[str]: + """Get names of input variables of the linearized model.""" + return self._linearized_inputs + + def getLinearOutputs(self) -> list[str]: + """Get names of output variables of the linearized model.""" + return self._linearized_outputs + + def getLinearStates(self) -> list[str]: + """Get names of state variables of the linearized model.""" + return self._linearized_states + + +class ModelicaSystemOMC(ModelicaSystemABC): + """ + Class to simulate a Modelica model using OpenModelica via OMCSession. + """ + + def __init__( + self, + command_line_options: Optional[list[str]] = None, + work_directory: Optional[str | os.PathLike] = None, + omhome: Optional[str] = None, + session: Optional[OMCSession] = None, + ) -> None: + """Create a ModelicaSystem instance. To define the model use model() or convertFmu2Mo(). + + Args: + command_line_options: List with extra command line options as elements. The list elements are + provided to omc via setCommandLineOptions(). If set, the default values will be overridden. + To disable any command line options, use an empty list. + work_directory: Path to a directory to be used for temporary + files like the model executable. If left unspecified, a tmp + directory will be created. + omhome: path to OMC to be used when creating the OMC session (see OMCSession). + session: definition of a (local) OMC session to be used. If + unspecified, a new local session will be created. + """ + + if session is None: + session = OMCSessionLocal(omhome=omhome) + + super().__init__( + session=session, + work_directory=work_directory, + ) + + # set commandLineOptions using default values or the user defined list + if command_line_options is None: + # set default command line options to improve the performance of linearization and to avoid recompilation if + # the simulation executable is reused in linearize() via the runtime flag '-l' + command_line_options = [ + "--linearizationDumpLanguage=python", + "--generateSymbolicLinearization", + ] + for opt in command_line_options: + self.set_command_line_options(command_line_option=opt) + + def model( + self, + model_name: Optional[str] = None, + model_file: Optional[str | os.PathLike] = None, + libraries: Optional[list[str | tuple[str, str]]] = None, + variable_filter: Optional[str] = None, + build: bool = True, + ) -> None: + """Load and build a Modelica model. + + This method loads the model file and builds it if requested (build == True). + + Args: + model_file: Path to the model file. Either absolute or relative to + the current working directory. + model_name: The name of the model class. If it is contained within + a package, "PackageName.ModelName" should be used. + libraries: List of libraries to be loaded before the model itself is + loaded. Two formats are supported for the list elements: + lmodel=["Modelica"] for just the library name + and lmodel=[("Modelica","3.2.3")] for specifying both the name + and the version. + variable_filter: A regular expression. Only variables fully + matching the regexp will be stored in the result file. + Leaving it unspecified is equivalent to ".*". + build: Boolean controlling whether the model should be + built when constructor is called. If False, the constructor + simply loads the model without compiling. + + Examples: + mod = ModelicaSystemOMC() + # and then one of the lines below + mod.model(name="modelName", file="ModelicaModel.mo", ) + mod.model(name="modelName", file="ModelicaModel.mo", libraries=["Modelica"]) + mod.model(name="modelName", file="ModelicaModel.mo", libraries=[("Modelica","3.2.3"), "PowerSystems"]) + """ + + if self._model_name is not None: + raise ModelicaSystemError("Can not reuse this instance of ModelicaSystem " + f"defined for {repr(self._model_name)}!") + + if model_name is None or not isinstance(model_name, str): + raise ModelicaSystemError("A model name must be provided!") + + if libraries is None: + libraries = [] + + if not isinstance(libraries, list): + raise ModelicaSystemError(f"Invalid input type for libraries: {type(libraries)} - list expected!") + + # set variables + self._model_name = model_name # Model class name + self._libraries = libraries # may be needed if model is derived from other model + self._variable_filter = variable_filter + + if self._libraries: + self._loadLibrary(libraries=self._libraries) + + self._file_name = None + if model_file is not None: + file_path = pathlib.Path(model_file) + # special handling for OMCProcessLocal - consider a relative path + if isinstance(self._session, OMCSessionLocal) and not file_path.is_absolute(): + file_path = pathlib.Path.cwd() / file_path + if not file_path.is_file(): + raise IOError(f"Model file {file_path} does not exist!") + + self._file_name = self.getWorkDirectory() / file_path.name + if (isinstance(self._session, OMCSessionLocal) + and file_path.as_posix() == self._file_name.as_posix()): + pass + elif self._file_name.is_file(): + raise IOError(f"Simulation model file {self._file_name} exist - not overwriting!") + else: + content = file_path.read_text(encoding='utf-8') + self._file_name.write_text(content) + + if self._file_name is not None: + self._loadFile(fileName=self._file_name) + + if build: + self.buildModel(variable_filter) + + def set_command_line_options(self, command_line_option: str): + """ + Set the provided command line option via OMC setCommandLineOptions(). + """ + expr = f'setCommandLineOptions("{command_line_option}")' + self.sendExpression(expr=expr) + + def _loadFile(self, fileName: OMCPath): + # load file + self.sendExpression(expr=f'loadFile("{fileName.as_posix()}")') + + # for loading file/package, loading model and building model + def _loadLibrary(self, libraries: list): + # load Modelica standard libraries or Modelica files if needed + for element in libraries: + if element is not None: + if isinstance(element, str): + if element.endswith(".mo"): + api_call = "loadFile" + else: + api_call = "loadModel" + self._requestApi(apiName=api_call, entity=element) + elif isinstance(element, tuple): + if not element[1]: + expr_load_lib = f"loadModel({element[0]})" + else: + expr_load_lib = f'loadModel({element[0]}, {{"{element[1]}"}})' + self.sendExpression(expr=expr_load_lib) + else: + raise ModelicaSystemError("loadLibrary() failed, Unknown type detected: " + f"{element} is of type {type(element)}, " + "The following patterns are supported:\n" + '1)["Modelica"]\n' + '2)[("Modelica","3.2.3"), "PowerSystems"]\n') + + def buildModel(self, variableFilter: Optional[str] = None): + filter_def: Optional[str] = None + if variableFilter is not None: + filter_def = variableFilter + elif self._variable_filter is not None: + filter_def = self._variable_filter + + if filter_def is not None: + var_filter = f'variableFilter="{filter_def}"' + else: + var_filter = 'variableFilter=".*"' + + build_model_result = self._requestApi(apiName="buildModel", entity=self._model_name, properties=var_filter) + logger.debug("OM model build result: %s", build_model_result) + + # check if the executable exists ... + self.check_model_executable() + + xml_file = self._session.omcpath(build_model_result[0]).parent / build_model_result[1] + self._xmlparse(xml_file=xml_file) + + def sendExpression(self, expr: str, parsed: bool = True) -> Any: + """ + Wrapper for OMCSession.sendExpression(). + """ + try: + retval = self._session.sendExpression(expr=expr, parsed=parsed) + except OMCSessionException as ex: + raise ModelicaSystemError(f"Error executing {repr(expr)}: {ex}") from ex + + logger.debug(f"Result of executing {repr(expr)}: {textwrap.shorten(repr(retval), width=100)}") + + return retval + + # request to OMC + def _requestApi( + self, + apiName: str, + entity: Optional[str] = None, + properties: Optional[str] = None, + ) -> Any: + if entity is not None and properties is not None: + expr = f'{apiName}({entity}, {properties})' + elif entity is not None and properties is None: + if apiName in ("loadFile", "importFMU"): + expr = f'{apiName}("{entity}")' + else: + expr = f'{apiName}({entity})' + else: + expr = f'{apiName}()' + + return self.sendExpression(expr=expr) + + def getContinuousFinal( + self, + names: Optional[str | list[str]] = None, + ) -> dict[str, np.float64] | list[np.float64]: + """ + Get (final) values of continuous signals (at stopTime). + + Args: + names: Either None (default), a string with the continuous signal + name, or a list of signal name strings. + Returns: + If `names` is None, a dict in the format + {signal_name: signal_value} is returned. + If `names` is a string, a single element list [signal_value] is + returned. + If `names` is a list, a list with one value for each signal name + in names is returned: [signal1_value, signal2_value, ...]. + + Examples: + >>> mod.getContinuousFinal() + {'x': np.float64(0.68), 'der(x)': np.float64(-0.24), 'y': np.float64(-0.24)} + >>> mod.getContinuousFinal("x") + [np.float64(0.68)] + >>> mod.getContinuousFinal(["y","x"]) + [np.float64(-0.24), np.float64(0.68)] + """ + if not self._simulated: + raise ModelicaSystemError("Please use getContinuousInitial() before the simulation was started!") + + def get_continuous_solution(name_list: list[str]) -> None: + for name in name_list: + if name in self._continuous: + value = self.getSolutions(name) + self._continuous[name] = np.float64(value[0][-1]) + else: + raise KeyError(f"{names} is not continuous") + + if names is None: + get_continuous_solution(name_list=list(self._continuous.keys())) + return self._continuous + + if isinstance(names, str): + get_continuous_solution(name_list=[names]) + return [self._continuous[names]] + + if isinstance(names, list): + get_continuous_solution(name_list=names) + values = [] + for name in names: + values.append(self._continuous[name]) + return values + + raise ModelicaSystemError("Unhandled input for getContinousFinal()") + + def getContinuous( + self, + names: Optional[str | list[str]] = None, + ) -> dict[str, np.float64] | list[np.float64]: + """Get values of continuous signals. + + If called before simulate(), the initial values are returned. + If called after simulate(), the final values (at stopTime) are returned. + The return format is always numpy.float64. + + Args: + names: Either None (default), a string with the continuous signal + name, or a list of signal name strings. + Returns: + If `names` is None, a dict in the format + {signal_name: signal_value} is returned. + If `names` is a string, a single element list [signal_value] is + returned. + If `names` is a list, a list with one value for each signal name + in names is returned: [signal1_value, signal2_value, ...]. + + Examples: + Before simulate(): + >>> mod.getContinuous() + {'x': '1.0', 'der(x)': None, 'y': '-0.4'} + >>> mod.getContinuous("y") + ['-0.4'] + >>> mod.getContinuous(["y","x"]) + ['-0.4', '1.0'] + + After simulate(): + >>> mod.getContinuous() + {'x': np.float64(0.68), 'der(x)': np.float64(-0.24), 'y': np.float64(-0.24)} + >>> mod.getContinuous("x") + [np.float64(0.68)] + >>> mod.getContinuous(["y","x"]) + [np.float64(-0.24), np.float64(0.68)] + """ + if not self._simulated: + return self.getContinuousInitial(names=names) + + return self.getContinuousFinal(names=names) + + def getOutputsFinal( + self, + names: Optional[str | list[str]] = None, + ) -> dict[str, np.float64] | list[np.float64]: + """Get (final) values of output signals (at stopTime). + + Args: + names: Either None (default), a string with the output name, + or a list of output name strings. + Returns: + If `names` is None, a dict in the format + {output_name: output_value} is returned. + If `names` is a string, a single element list [output_value] is + returned. + If `names` is a list, a list with one value for each output name + in names is returned: [output1_value, output2_value, ...]. + + Examples: + >>> mod.getOutputsFinal() + {'out1': np.float64(-0.1234), 'out2': np.float64(2.1)} + >>> mod.getOutputsFinal("out1") + [np.float64(-0.1234)] + >>> mod.getOutputsFinal(["out1","out2"]) + [np.float64(-0.1234), np.float64(2.1)] + """ + if not self._simulated: + raise ModelicaSystemError("Please use getOuputsInitial() before the simulation was started!") + + def get_outputs_solution(name_list: list[str]) -> None: + for name in name_list: + if name in self._outputs: + value = self.getSolutions(name) + self._outputs[name] = np.float64(value[0][-1]) + else: + raise KeyError(f"{names} is not a valid output") + + if names is None: + get_outputs_solution(name_list=list(self._outputs.keys())) + return self._outputs + + if isinstance(names, str): + get_outputs_solution(name_list=[names]) + return [self._outputs[names]] + + if isinstance(names, list): + get_outputs_solution(name_list=names) + values = [] + for name in names: + values.append(self._outputs[name]) + return values + + raise ModelicaSystemError("Unhandled input for getOutputs()") + + def getOutputs( + self, + names: Optional[str | list[str]] = None, + ) -> dict[str, np.float64] | list[np.float64]: + """Get values of output signals. + + If called before simulate(), the initial values are returned. + If called after simulate(), the final values (at stopTime) are returned. + The return format is always numpy.float64. + + Args: + names: Either None (default), a string with the output name, + or a list of output name strings. + Returns: + If `names` is None, a dict in the format + {output_name: output_value} is returned. + If `names` is a string, a single element list [output_value] is + returned. + If `names` is a list, a list with one value for each output name + in names is returned: [output1_value, output2_value, ...]. + + Examples: + Before simulate(): + >>> mod.getOutputs() + {'out1': '-0.4', 'out2': '1.2'} + >>> mod.getOutputs("out1") + ['-0.4'] + >>> mod.getOutputs(["out1","out2"]) + ['-0.4', '1.2'] + + After simulate(): + >>> mod.getOutputs() + {'out1': np.float64(-0.1234), 'out2': np.float64(2.1)} + >>> mod.getOutputs("out1") + [np.float64(-0.1234)] + >>> mod.getOutputs(["out1","out2"]) + [np.float64(-0.1234), np.float64(2.1)] + """ + if not self._simulated: + return self.getOutputsInitial(names=names) + + return self.getOutputsFinal(names=names) + + def plot( + self, + plotdata: str, + resultfile: Optional[str | os.PathLike] = None, + ) -> None: + """ + Plot a variable using OMC; this will work for local OMC usage only (OMCProcessLocal). The reason is that the + plot is created by OMC which needs access to the local display. This is not the case for docker and WSL. + """ + + if not isinstance(self._session, OMCSessionLocal): + raise ModelicaSystemError("Plot is using the OMC plot functionality; " + "thus, it is only working if OMC is running locally!") + + if resultfile is not None: + plot_result_file = self._session.omcpath(resultfile) + elif self._result_file is not None: + plot_result_file = self._result_file + else: + raise ModelicaSystemError("No resultfile available - either run simulate() before plotting " + "or provide a result file!") + + if not plot_result_file.is_file(): + raise ModelicaSystemError(f"Provided resultfile {repr(plot_result_file.as_posix())} does not exists!") + + expr = f'plot({plotdata}, fileName="{plot_result_file.as_posix()}")' + self.sendExpression(expr=expr) + + def getSolutions( + self, + varList: Optional[str | list[str]] = None, + resultfile: Optional[str | os.PathLike] = None, + ) -> tuple[str] | np.ndarray: + """Extract simulation results from a result data file. + + Args: + varList: Names of variables to be extracted. Either unspecified to + get names of available variables, or a single variable name + as a string, or a list of variable names. + resultfile: Path to the result file. If unspecified, the result + file created by simulate() is used. + + Returns: + If varList is None, a tuple with names of all variables + is returned. + If varList is a string, a 1D numpy array is returned. + If varList is a list, a 2D numpy array is returned. + + Examples: + >>> mod.getSolutions() + ('a', 'der(x)', 'time', 'x') + >>> mod.getSolutions("x") + np.array([[1. , 0.90483742, 0.81873075]]) + >>> mod.getSolutions(["x", "der(x)"]) + np.array([[1. , 0.90483742 , 0.81873075], + [-1. , -0.90483742, -0.81873075]]) + >>> mod.getSolutions(resultfile="c:/a.mat") + ('a', 'der(x)', 'time', 'x') + >>> mod.getSolutions("x", resultfile="c:/a.mat") + np.array([[1. , 0.90483742, 0.81873075]]) + >>> mod.getSolutions(["x", "der(x)"], resultfile="c:/a.mat") + np.array([[1. , 0.90483742 , 0.81873075], + [-1. , -0.90483742, -0.81873075]]) + """ + if resultfile is None: + if self._result_file is None: + raise ModelicaSystemError("No result file found. Run simulate() first.") + result_file = self._result_file + else: + result_file = self._session.omcpath(resultfile) + + # check if the result file exits + if not result_file.is_file(): + raise ModelicaSystemError(f"Result file does not exist {result_file.as_posix()}") - if csvfile is None: - csvfile = self.getWorkDirectory() / f'{self._model_name}.csv' + # get absolute path + result_file = result_file.absolute() - # basic definition of a CSV file using csv_rows as input - csv_content = "\n".join([",".join(map(str, row)) for row in csv_rows]) + "\n" + result_vars = self.sendExpression(expr=f'readSimulationResultVars("{result_file.as_posix()}")') + self.sendExpression(expr="closeSimulationResultFile()") + if varList is None: + return result_vars - csvfile.write_text(csv_content) + if isinstance(varList, str): + var_list_checked = [varList] + elif isinstance(varList, list): + var_list_checked = varList + else: + raise ModelicaSystemError("Unhandled input for getSolutions()") - return csvfile + for var in var_list_checked: + if var == "time": + continue + if var not in result_vars: + raise ModelicaSystemError(f"Requested data {repr(var)} does not exist") + variables = ",".join(var_list_checked) + res = self.sendExpression(expr=f'readSimulationResult("{result_file.as_posix()}",{{{variables}}})') + np_res = np.array(res) + self.sendExpression(expr="closeSimulationResultFile()") + return np_res def convertMo2Fmu( self, @@ -1929,147 +2101,16 @@ def optimize(self) -> dict[str, Any]: self.set_command_line_options("-g=Optimica") return self._requestApi(apiName='optimize', entity=self._model_name, properties=properties) - def linearize( - self, - lintime: Optional[float] = None, - simflags: Optional[str] = None, - simargs: Optional[dict[str, Optional[str | dict[str, Any] | numbers.Number]]] = None, - ) -> LinearizationResult: - """Linearize the model according to linearization options. - - See setLinearizationOptions. - - Args: - lintime: Override "stopTime" value. - simflags: String of extra command line flags for the model binary. - This argument is deprecated, use simargs instead. - simargs: A dict with command line flags and possible options; example: "simargs={'csvInput': 'a.csv'}" - - Returns: - A LinearizationResult object is returned. This allows several - uses: - * `(A, B, C, D) = linearize()` to get just the matrices, - * `result = linearize(); result.A` to get everything and access the - attributes one by one, - * `result = linearize(); A = result[0]` mostly just for backwards - compatibility, because linearize() used to return `[A, B, C, D]`. - """ - if len(self._quantities) == 0: - # if self._quantities has no content, the xml file was not parsed; see self._xmlparse() - raise ModelicaSystemError( - "Linearization cannot be performed as the model is not build, " - "use ModelicaSystem() to build the model first" - ) - - om_cmd = ModelExecutionCmd( - runpath=self.getWorkDirectory(), - cmd_local=self._session.model_execution_local, - cmd_windows=self._session.model_execution_windows, - cmd_prefix=self._session.model_execution_prefix(cwd=self.getWorkDirectory()), - model_name=self._model_name, - ) - - self._process_override_data( - om_cmd=om_cmd, - override_file=self.getWorkDirectory() / f'{self._model_name}_override_linear.txt', - override_var=self._override_variables, - override_sim=self._linearization_options, - ) - - if self._inputs: - for data in self._inputs.values(): - if data is not None: - for value in data: - if value[0] < float(self._simulate_options["startTime"]): - raise ModelicaSystemError('Input time value is less than simulation startTime') - csvfile = self._createCSVData() - om_cmd.arg_set(key="csvInput", val=csvfile.as_posix()) - - if lintime is None: - lintime = float(self._linearization_options["stopTime"]) - if (float(self._linearization_options["startTime"]) > lintime - or float(self._linearization_options["stopTime"]) < lintime): - raise ModelicaSystemError(f"Invalid linearisation time: {lintime=}; " - f"expected value: {self._linearization_options['startTime']} " - f"<= lintime <= {self._linearization_options['stopTime']}") - om_cmd.arg_set(key="l", val=str(lintime)) - - # allow runtime simulation flags from user input - if simflags is not None: - om_cmd.args_set(args=om_cmd.parse_simflags(simflags=simflags)) - - if simargs: - om_cmd.args_set(args=simargs) - - # the file create by the model executable which contains the matrix and linear inputs, outputs and states - linear_file = self.getWorkDirectory() / "linearized_model.py" - linear_file.unlink(missing_ok=True) - - cmd_definition = om_cmd.definition() - returncode = cmd_definition.run() - if returncode != 0: - raise ModelicaSystemError(f"Linearize failed with return code: {returncode}") - if not linear_file.is_file(): - raise ModelicaSystemError(f"Linearization failed: {linear_file} not found!") - - self._simulated = True - - # extract data from the python file with the linearized model using the ast module - this allows to get the - # needed information without executing the created code - linear_data = {} - linear_file_content = linear_file.read_text() - try: - # ignore possible typing errors below (mypy) - these are caught by the try .. except .. block - linear_file_ast = ast.parse(linear_file_content) - for body_part in linear_file_ast.body[0].body: # type: ignore - if not isinstance(body_part, ast.Assign): - continue - - target = body_part.targets[0].id # type: ignore - value_ast = ast.literal_eval(body_part.value) - - linear_data[target] = value_ast - except (AttributeError, IndexError, ValueError, SyntaxError, TypeError) as ex: - raise ModelicaSystemError(f"Error parsing linearization file {linear_file}: {ex}") from ex - - # remove the file - linear_file.unlink() - - self._linearized_inputs = linear_data["inputVars"] - self._linearized_outputs = linear_data["outputVars"] - self._linearized_states = linear_data["stateVars"] - - return LinearizationResult( - n=linear_data["n"], - m=linear_data["m"], - p=linear_data["p"], - x0=linear_data["x0"], - u0=linear_data["u0"], - A=linear_data["A"], - B=linear_data["B"], - C=linear_data["C"], - D=linear_data["D"], - stateVars=linear_data["stateVars"], - inputVars=linear_data["inputVars"], - outputVars=linear_data["outputVars"], - ) - - def getLinearInputs(self) -> list[str]: - """Get names of input variables of the linearized model.""" - return self._linearized_inputs - def getLinearOutputs(self) -> list[str]: - """Get names of output variables of the linearized model.""" - return self._linearized_outputs - - def getLinearStates(self) -> list[str]: - """Get names of state variables of the linearized model.""" - return self._linearized_states +class ModelicaSystem(ModelicaSystemOMC): + """ + Compatibility class. + """ -class ModelicaSystemDoE: +class ModelicaDoEABC(metaclass=abc.ABCMeta): """ - Class to run DoEs based on a (Open)Modelica model using ModelicaSystem + Base class to run DoEs based on a (Open)Modelica model using ModelicaSystem Example ------- @@ -2108,7 +2149,7 @@ def run_doe(): resdir = mypath / 'DoE' resdir.mkdir(exist_ok=True) - mod = OMPython.ModelicaSystem() + mod = OMPython.ModelicaSystemOMC() mod.model( model_name="M", model_file=model.as_posix(), @@ -2142,7 +2183,7 @@ def run_doe(): def __init__( self, # ModelicaSystem definition to use - mod: ModelicaSystem, + mod: ModelicaSystemABC, # simulation specific input # TODO: add more settings (simulation options, input options, ...) simargs: Optional[dict[str, Optional[str | dict[str, str] | numbers.Number]]] = None, @@ -2155,7 +2196,7 @@ def __init__( ModelicaSystem.simulate(). Additionally, the path to store the result files is needed (= resultpath) as well as a list of parameters to vary for the Doe (= parameters). All possible combinations are considered. """ - if not isinstance(mod, ModelicaSystem): + if not isinstance(mod, ModelicaSystemABC): raise ModelicaSystemError("Missing definition of ModelicaSystem!") self._mod = mod @@ -2211,30 +2252,11 @@ def prepare(self) -> int: param_non_structural_combinations = list(itertools.product(*param_non_structure.values())) for idx_pc_structure, pc_structure in enumerate(param_structure_combinations): - - build_dir = self._resultpath / f"DOE_{idx_pc_structure:09d}" - build_dir.mkdir() - self._mod.setWorkDirectory(work_directory=build_dir) - - sim_param_structure = {} - for idx_structure, pk_structure in enumerate(param_structure.keys()): - sim_param_structure[pk_structure] = pc_structure[idx_structure] - - pk_value = pc_structure[idx_structure] - if isinstance(pk_value, str): - pk_value_str = self.get_session().escape_str(pk_value) - expr = f"setParameterValue({self._model_name}, {pk_structure}, \"{pk_value_str}\")" - elif isinstance(pk_value, bool): - pk_value_bool_str = "true" if pk_value else "false" - expr = f"setParameterValue({self._model_name}, {pk_structure}, {pk_value_bool_str});" - else: - expr = f"setParameterValue({self._model_name}, {pk_structure}, {pk_value})" - res = self._mod.sendExpression(expr=expr) - if not res: - raise ModelicaSystemError(f"Cannot set structural parameter {self._model_name}.{pk_structure} " - f"to {pk_value} using {repr(expr)}") - - self._mod.buildModel() + sim_param_structure = self._prepare_structure_parameters( + idx_pc_structure=idx_pc_structure, + pc_structure=pc_structure, + param_structure=param_structure, + ) for idx_non_structural, pk_non_structural in enumerate(param_non_structural_combinations): sim_param_non_structural = {} @@ -2279,6 +2301,17 @@ def prepare(self) -> int: return len(doe_sim) + @abc.abstractmethod + def _prepare_structure_parameters( + self, + idx_pc_structure: int, + pc_structure: Tuple, + param_structure: dict[str, list[str] | list[int] | list[float]], + ) -> dict[str, str | int | float]: + """ + Handle structural parameters. This should be implemented by the derived class + """ + def get_doe_definition(self) -> Optional[dict[str, dict[str, Any]]]: """ Get the defined DoE as a dict, where each key is the result filename and the value is a dict of simulation @@ -2390,65 +2423,157 @@ def worker(worker_id, task_queue): return doe_def_total == doe_def_done + +class ModelicaDoEOMC(ModelicaDoEABC): + """ + Class to run DoEs based on a (Open)Modelica model using ModelicaSystemOMC + + The example is the same as defined for ModelicaDoEABC + """ + + def __init__( + self, + # ModelicaSystem definition to use + mod: ModelicaSystemOMC, + # simulation specific input + # TODO: add more settings (simulation options, input options, ...) + simargs: Optional[dict[str, Optional[str | dict[str, str] | numbers.Number]]] = None, + # DoE specific inputs + resultpath: Optional[str | os.PathLike] = None, + parameters: Optional[dict[str, list[str] | list[int] | list[float]]] = None, + ) -> None: + + if not isinstance(mod, ModelicaSystemOMC): + raise ModelicaSystemError(f"Invalid definition for mod: {type(mod)} - expect ModelicaSystemOMC!") + + super().__init__( + mod=mod, + simargs=simargs, + resultpath=resultpath, + parameters=parameters, + ) + + def _prepare_structure_parameters( + self, + idx_pc_structure: int, + pc_structure: Tuple, + param_structure: dict[str, list[str] | list[int] | list[float]], + ) -> dict[str, str | int | float]: + build_dir = self._resultpath / f"DOE_{idx_pc_structure:09d}" + build_dir.mkdir() + self._mod.setWorkDirectory(work_directory=build_dir) + + # need to repeat this check to make the linters happy + if not isinstance(self._mod, ModelicaSystemOMC): + raise ModelicaSystemError(f"Invalid definition for mod: {type(self._mod)} - expect ModelicaSystemOMC!") + + sim_param_structure = {} + for idx_structure, pk_structure in enumerate(param_structure.keys()): + sim_param_structure[pk_structure] = pc_structure[idx_structure] + + pk_value = pc_structure[idx_structure] + if isinstance(pk_value, str): + pk_value_str = self.get_session().escape_str(pk_value) + expr = f"setParameterValue({self._model_name}, {pk_structure}, \"{pk_value_str}\")" + elif isinstance(pk_value, bool): + pk_value_bool_str = "true" if pk_value else "false" + expr = f"setParameterValue({self._model_name}, {pk_structure}, {pk_value_bool_str});" + else: + expr = f"setParameterValue({self._model_name}, {pk_structure}, {pk_value})" + res = self._mod.sendExpression(expr=expr) + if not res: + raise ModelicaSystemError(f"Cannot set structural parameter {self._model_name}.{pk_structure} " + f"to {pk_value} using {repr(expr)}") + + self._mod.buildModel() + + return sim_param_structure + def get_doe_solutions( self, var_list: Optional[list] = None, ) -> Optional[tuple[str] | dict[str, dict[str, np.ndarray]]]: """ - Get all solutions of the DoE run. The following return values are possible: + Wrapper for doe_get_solutions() + """ + if not isinstance(self._mod, ModelicaSystemOMC): + raise ModelicaSystemError(f"Invalid definition for mod: {type(self._mod)} - expect ModelicaSystemOMC!") - * A list of variables if val_list == None + return doe_get_solutions( + msomc=self._mod, + resultpath=self._resultpath, + doe_def=self.get_doe_definition(), + var_list=var_list, + ) - * The Solutions as dict[str, pd.DataFrame] if a value list (== val_list) is defined. - The following code snippet can be used to convert the solution data for each run to a pandas dataframe: +def doe_get_solutions( + msomc: ModelicaSystemOMC, + resultpath: OMCPath, + doe_def: Optional[dict] = None, + var_list: Optional[list] = None, +) -> Optional[tuple[str] | dict[str, dict[str, np.ndarray]]]: + """ + Get all solutions of the DoE run. The following return values are possible: - ``` - import pandas as pd + * A list of variables if val_list == None - doe_sol = doe_mod.get_doe_solutions() - for key in doe_sol: - data = doe_sol[key]['data'] - if data: - doe_sol[key]['df'] = pd.DataFrame.from_dict(data=data) - else: - doe_sol[key]['df'] = None - ``` + * The Solutions as dict[str, pd.DataFrame] if a value list (== val_list) is defined. - """ - if not isinstance(self._doe_def, dict): - return None + The following code snippet can be used to convert the solution data for each run to a pandas dataframe: - if len(self._doe_def) == 0: - raise ModelicaSystemError("No result files available - all simulations did fail?") + ``` + import pandas as pd - sol_dict: dict[str, dict[str, Any]] = {} - for resultfilename in self._doe_def: - resultfile = self._resultpath / resultfilename + doe_sol = doe_mod.get_doe_solutions() + for key in doe_sol: + data = doe_sol[key]['data'] + if data: + doe_sol[key]['df'] = pd.DataFrame.from_dict(data=data) + else: + doe_sol[key]['df'] = None + ``` - sol_dict[resultfilename] = {} + """ + if not isinstance(doe_def, dict): + return None - if not self._doe_def[resultfilename][self.DICT_RESULT_AVAILABLE]: - msg = f"No result file available for {resultfilename}" - logger.warning(msg) - sol_dict[resultfilename]['msg'] = msg - sol_dict[resultfilename]['data'] = {} - continue + if len(doe_def) == 0: + raise ModelicaSystemError("No result files available - all simulations did fail?") - if var_list is None: - var_list_row = list(self._mod.getSolutions(resultfile=resultfile)) - else: - var_list_row = var_list - - try: - sol = self._mod.getSolutions(varList=var_list_row, resultfile=resultfile) - sol_data = {var: sol[idx] for idx, var in enumerate(var_list_row)} - sol_dict[resultfilename]['msg'] = 'Simulation available' - sol_dict[resultfilename]['data'] = sol_data - except ModelicaSystemError as ex: - msg = f"Error reading solution for {resultfilename}: {ex}" - logger.warning(msg) - sol_dict[resultfilename]['msg'] = msg - sol_dict[resultfilename]['data'] = {} - - return sol_dict + sol_dict: dict[str, dict[str, Any]] = {} + for resultfilename in doe_def: + resultfile = resultpath / resultfilename + + sol_dict[resultfilename] = {} + + if not doe_def[resultfilename][ModelicaDoEABC.DICT_RESULT_AVAILABLE]: + msg = f"No result file available for {resultfilename}" + logger.warning(msg) + sol_dict[resultfilename]['msg'] = msg + sol_dict[resultfilename]['data'] = {} + continue + + if var_list is None: + var_list_row = list(msomc.getSolutions(resultfile=resultfile)) + else: + var_list_row = var_list + + try: + sol = msomc.getSolutions(varList=var_list_row, resultfile=resultfile) + sol_data = {var: sol[idx] for idx, var in enumerate(var_list_row)} + sol_dict[resultfilename]['msg'] = 'Simulation available' + sol_dict[resultfilename]['data'] = sol_data + except ModelicaSystemError as ex: + msg = f"Error reading solution for {resultfilename}: {ex}" + logger.warning(msg) + sol_dict[resultfilename]['msg'] = msg + sol_dict[resultfilename]['data'] = {} + + return sol_dict + + +class ModelicaSystemDoE(ModelicaDoEOMC): + """ + Compatibility class. + """ diff --git a/OMPython/__init__.py b/OMPython/__init__.py index 7c199ef3..9f4408d5 100644 --- a/OMPython/__init__.py +++ b/OMPython/__init__.py @@ -14,9 +14,13 @@ from OMPython.ModelicaSystem import ( LinearizationResult, ModelicaSystem, + ModelicaSystemOMC, ModelExecutionCmd, ModelicaSystemDoE, + ModelicaDoEOMC, ModelicaSystemError, + + doe_get_solutions, ) from OMPython.OMCSession import ( OMCPath, @@ -43,13 +47,18 @@ 'ModelExecutionException', 'ModelicaSystem', + 'ModelicaSystemOMC', 'ModelExecutionCmd', 'ModelicaSystemDoE', + 'ModelicaDoEOMC', 'ModelicaSystemError', 'OMCPath', 'OMCSession', + + 'doe_get_solutions', + 'OMCSessionCmd', 'OMCSessionDocker', 'OMCSessionDockerContainer', diff --git a/tests/test_FMIExport.py b/tests/test_FMIExport.py index 006d2d17..c7ab038a 100644 --- a/tests/test_FMIExport.py +++ b/tests/test_FMIExport.py @@ -6,7 +6,7 @@ def test_CauerLowPassAnalog(): - mod = OMPython.ModelicaSystem() + mod = OMPython.ModelicaSystemOMC() mod.model( model_name="Modelica.Electrical.Analog.Examples.CauerLowPassAnalog", libraries=["Modelica"], @@ -20,7 +20,7 @@ def test_CauerLowPassAnalog(): def test_DrumBoiler(): - mod = OMPython.ModelicaSystem() + mod = OMPython.ModelicaSystemOMC() mod.model( model_name="Modelica.Fluid.Examples.DrumBoiler.DrumBoiler", libraries=["Modelica"], diff --git a/tests/test_FMIImport.py b/tests/test_FMIImport.py index cb43e0ae..bb3a1201 100644 --- a/tests/test_FMIImport.py +++ b/tests/test_FMIImport.py @@ -22,7 +22,7 @@ def model_firstorder(tmp_path): def test_FMIImport(model_firstorder): # create model & simulate it - mod1 = OMPython.ModelicaSystem() + mod1 = OMPython.ModelicaSystemOMC() mod1.model( model_file=model_firstorder, model_name="M", @@ -35,7 +35,7 @@ def test_FMIImport(model_firstorder): # import FMU & check & simulate # TODO: why is '--allowNonStandardModelica=reinitInAlgorithms' needed? any example without this possible? - mod2 = OMPython.ModelicaSystem(command_line_options=['--allowNonStandardModelica=reinitInAlgorithms']) + mod2 = OMPython.ModelicaSystemOMC(command_line_options=['--allowNonStandardModelica=reinitInAlgorithms']) mo = mod2.convertFmu2Mo(fmu=fmu) assert os.path.exists(mo) diff --git a/tests/test_ModelicaSystemDoE.py b/tests/test_ModelicaDoEOMC.py similarity index 85% rename from tests/test_ModelicaSystemDoE.py rename to tests/test_ModelicaDoEOMC.py index 86c43ce7..143932fc 100644 --- a/tests/test_ModelicaSystemDoE.py +++ b/tests/test_ModelicaDoEOMC.py @@ -51,34 +51,34 @@ def param_doe() -> dict[str, list]: return param -def test_ModelicaSystemDoE_local(tmp_path, model_doe, param_doe): +def test_ModelicaDoEOMC_local(tmp_path, model_doe, param_doe): tmpdir = tmp_path / 'DoE' tmpdir.mkdir(exist_ok=True) - mod = OMPython.ModelicaSystem() + mod = OMPython.ModelicaSystemOMC() mod.model( model_file=model_doe, model_name="M", ) - doe_mod = OMPython.ModelicaSystemDoE( + doe_mod = OMPython.ModelicaDoEOMC( mod=mod, parameters=param_doe, resultpath=tmpdir, simargs={"override": {'stopTime': '1.0'}}, ) - _run_ModelicaSystemDoe(doe_mod=doe_mod) + _run_ModelicaDoEOMC(doe_mod=doe_mod) @skip_on_windows @skip_python_older_312 -def test_ModelicaSystemDoE_docker(tmp_path, model_doe, param_doe): +def test_ModelicaDoEOMC_docker(tmp_path, model_doe, param_doe): omcs = OMPython.OMCSessionDocker(docker="openmodelica/openmodelica:v1.25.0-minimal") omversion = omcs.sendExpression("getVersion()") assert isinstance(omversion, str) and omversion.startswith("OpenModelica") - mod = OMPython.ModelicaSystem( + mod = OMPython.ModelicaSystemOMC( session=omcs, ) mod.model( @@ -86,23 +86,23 @@ def test_ModelicaSystemDoE_docker(tmp_path, model_doe, param_doe): model_name="M", ) - doe_mod = OMPython.ModelicaSystemDoE( + doe_mod = OMPython.ModelicaDoEOMC( mod=mod, parameters=param_doe, simargs={"override": {'stopTime': '1.0'}}, ) - _run_ModelicaSystemDoe(doe_mod=doe_mod) + _run_ModelicaDoEOMC(doe_mod=doe_mod) @pytest.mark.skip(reason="Not able to run WSL on github") @skip_python_older_312 -def test_ModelicaSystemDoE_WSL(tmp_path, model_doe, param_doe): +def test_ModelicaDoEOMC_WSL(tmp_path, model_doe, param_doe): omcs = OMPython.OMCSessionWSL() omversion = omcs.sendExpression("getVersion()") assert isinstance(omversion, str) and omversion.startswith("OpenModelica") - mod = OMPython.ModelicaSystem( + mod = OMPython.ModelicaSystemOMC( session=omcs, ) mod.model( @@ -110,16 +110,16 @@ def test_ModelicaSystemDoE_WSL(tmp_path, model_doe, param_doe): model_name="M", ) - doe_mod = OMPython.ModelicaSystemDoE( + doe_mod = OMPython.ModelicaDoEOMC( mod=mod, parameters=param_doe, simargs={"override": {'stopTime': '1.0'}}, ) - _run_ModelicaSystemDoe(doe_mod=doe_mod) + _run_ModelicaDoEOMC(doe_mod=doe_mod) -def _run_ModelicaSystemDoe(doe_mod): +def _run_ModelicaDoEOMC(doe_mod): doe_count = doe_mod.prepare() assert doe_count == 16 diff --git a/tests/test_ModelicaSystemCmd.py b/tests/test_ModelicaSystemCmd.py index 6fa2658f..3d35376b 100644 --- a/tests/test_ModelicaSystemCmd.py +++ b/tests/test_ModelicaSystemCmd.py @@ -18,7 +18,7 @@ def model_firstorder(tmp_path): @pytest.fixture def mscmd_firstorder(model_firstorder): - mod = OMPython.ModelicaSystem() + mod = OMPython.ModelicaSystemOMC() mod.model( model_file=model_firstorder, model_name="M", diff --git a/tests/test_ModelicaSystem.py b/tests/test_ModelicaSystemOMC.py similarity index 96% rename from tests/test_ModelicaSystem.py rename to tests/test_ModelicaSystemOMC.py index 9bf0a7b9..8dd17ef0 100644 --- a/tests/test_ModelicaSystem.py +++ b/tests/test_ModelicaSystemOMC.py @@ -40,7 +40,7 @@ def model_firstorder(tmp_path, model_firstorder_content): def test_ModelicaSystem_loop(model_firstorder): def worker(): - mod = OMPython.ModelicaSystem() + mod = OMPython.ModelicaSystemOMC() mod.model( model_file=model_firstorder, model_name="M", @@ -56,7 +56,9 @@ def test_setParameters(): omcs = OMPython.OMCSessionLocal() model_path_str = omcs.sendExpression("getInstallationDirectoryPath()") + "/share/doc/omc/testmodels" model_path = omcs.omcpath(model_path_str) - mod = OMPython.ModelicaSystem() + mod = OMPython.ModelicaSystemOMC( + session=omcs, + ) mod.model( model_file=model_path / "BouncingBall.mo", model_name="BouncingBall", @@ -91,7 +93,9 @@ def test_setSimulationOptions(): omcs = OMPython.OMCSessionLocal() model_path_str = omcs.sendExpression("getInstallationDirectoryPath()") + "/share/doc/omc/testmodels" model_path = omcs.omcpath(model_path_str) - mod = OMPython.ModelicaSystem() + mod = OMPython.ModelicaSystemOMC( + session=omcs, + ) mod.model( model_file=model_path / "BouncingBall.mo", model_name="BouncingBall", @@ -128,7 +132,7 @@ def test_relative_path(model_firstorder): model_relative = str(model_file) assert "/" not in model_relative - mod = OMPython.ModelicaSystem() + mod = OMPython.ModelicaSystemOMC() mod.model( model_file=model_relative, model_name="M", @@ -141,7 +145,7 @@ def test_relative_path(model_firstorder): def test_customBuildDirectory(tmp_path, model_firstorder): tmpdir = tmp_path / "tmpdir1" tmpdir.mkdir() - mod = OMPython.ModelicaSystem(work_directory=tmpdir) + mod = OMPython.ModelicaSystemOMC(work_directory=tmpdir) mod.model( model_file=model_firstorder, model_name="M", @@ -157,7 +161,7 @@ def test_customBuildDirectory(tmp_path, model_firstorder): @skip_python_older_312 def test_getSolutions_docker(model_firstorder): omcs = OMPython.OMCSessionDocker(docker="openmodelica/openmodelica:v1.25.0-minimal") - mod = OMPython.ModelicaSystem( + mod = OMPython.ModelicaSystemOMC( session=omcs, ) mod.model( @@ -169,7 +173,7 @@ def test_getSolutions_docker(model_firstorder): def test_getSolutions(model_firstorder): - mod = OMPython.ModelicaSystem() + mod = OMPython.ModelicaSystemOMC() mod.model( model_file=model_firstorder, model_name="M", @@ -217,7 +221,7 @@ def test_getters(tmp_path): y = der(x); end M_getters; """) - mod = OMPython.ModelicaSystem() + mod = OMPython.ModelicaSystemOMC() mod.model( model_file=model_file, model_name="M_getters", @@ -426,7 +430,7 @@ def test_simulate_inputs(tmp_path): y = x; end M_input; """) - mod = OMPython.ModelicaSystem() + mod = OMPython.ModelicaSystemOMC() mod.model( model_file=model_file, model_name="M_input", diff --git a/tests/test_OMSessionCmd.py b/tests/test_OMSessionCmd.py index d3997ecf..7dbb9705 100644 --- a/tests/test_OMSessionCmd.py +++ b/tests/test_OMSessionCmd.py @@ -8,7 +8,7 @@ def test_isPackage(): def test_isPackage2(): - mod = OMPython.ModelicaSystem() + mod = OMPython.ModelicaSystemOMC() mod.model( model_name="Modelica.Electrical.Analog.Examples.CauerLowPassAnalog", libraries=["Modelica"], diff --git a/tests/test_linearization.py b/tests/test_linearization.py index c61462bb..7070a45b 100644 --- a/tests/test_linearization.py +++ b/tests/test_linearization.py @@ -25,7 +25,7 @@ def model_linearTest(tmp_path): def test_example(model_linearTest): - mod = OMPython.ModelicaSystem() + mod = OMPython.ModelicaSystemOMC() mod.model( model_file=model_linearTest, model_name="linearTest", @@ -60,7 +60,7 @@ def test_getters(tmp_path): y2 = phi + u1; end Pendulum; """) - mod = OMPython.ModelicaSystem() + mod = OMPython.ModelicaSystemOMC() mod.model( model_file=model_file, model_name="Pendulum", diff --git a/tests/test_optimization.py b/tests/test_optimization.py index d7494281..823ba1e3 100644 --- a/tests/test_optimization.py +++ b/tests/test_optimization.py @@ -34,7 +34,7 @@ def test_optimization_example(tmp_path): end BangBang2021; """) - mod = OMPython.ModelicaSystem() + mod = OMPython.ModelicaSystemOMC() mod.model( model_file=model_file, model_name="BangBang2021",