diff --git a/IoTuring/ClassManager/ClassManager.py b/IoTuring/ClassManager/ClassManager.py index ea30c2763..9d611f44f 100644 --- a/IoTuring/ClassManager/ClassManager.py +++ b/IoTuring/ClassManager/ClassManager.py @@ -1,56 +1,97 @@ from __future__ import annotations -import os -from pathlib import Path -from os import path import importlib.util import importlib.machinery import sys import inspect +from pathlib import Path + from IoTuring.Logger.LogObject import LogObject -# from IoTuring.ClassManager import consts -# This is a parent class +class ClassManager(LogObject): + """Base class for ClassManagers + + This class is used to find and load classes without importing them + The important this is that the class is inside a folder that exactly the same name of the Class and of the file (obviously not talking about extensions) + """ -# Implement subclasses in this way: + # Set up these class variables in subclasses: + classesRelativePath = None # Change in subclasses -# def __init__(self): -# ClassManager.__init__(self) -# self.baseClass = Entity : Select the class to find -# self.GetModulesFilename(consts.ENTITIES_PATH) : Select path where it should look for classes and add all classes to found list + def __init__(self) -> None: -# This class is used to find and load classes without importing them -# The important this is that the class is inside a folder that exactly the same name of the Class and of the file (obviously not talking about extensions) + classmanager_file_path = sys.modules[self.__class__.__module__].__file__ + if not classmanager_file_path: + raise Exception("Error getting path: " + + str(classmanager_file_path)) + self.rootPath = Path(classmanager_file_path).parents[1] + + # Store loaded classes here: + self.loadedClasses = [] + + # Collect paths + self.moduleFilePaths = self.GetModuleFilePaths() + + def GetModuleFilePaths(self) -> list[Path]: + """Get the paths of of python files of this class""" + + if not self.classesRelativePath: + raise Exception("Path to deployments not defined") + + classesRootPath = self.rootPath.joinpath(self.classesRelativePath) + + if not classesRootPath.exists: + raise Exception(f"Path does not exist: {classesRootPath}") + + self.Log(self.LOG_DEVELOPMENT, + f'Looking for python files in "{classesRootPath}"...') + + python_files = classesRootPath.rglob("*.py") + + # TO check if a py files is in a folder !!!! with the same name !!! (same without extension) + filepaths = [f for f in python_files if f.stem == f.parent.stem] + + self.Log(self.LOG_DEVELOPMENT, + f"Found {str(len(filepaths))} modules files") + + return filepaths + + def GetClassFromName(self, wantedName: str) -> type | None: + """Get the class of given name, and load it + + Args: + wantedName (str): The name to look for + + Returns: + type | None: The class if found, None if not found + """ + + # Check from already loaded classes: + module_class = next( + (m for m in self.loadedClasses if m.__name__ == wantedName), None) + + if module_class: + return module_class + + modulePath = next( + (m for m in self.moduleFilePaths if m.stem == wantedName), None) + + if modulePath: + + loadedModule = self.LoadModule(modulePath) + loadedClass = self.GetClassFromModule(loadedModule) + self.loadedClasses.append(loadedClass) + return loadedClass -class ClassManager(LogObject): - def __init__(self): - self.modulesFilename = [] - module_path = sys.modules[self.__class__.__module__].__file__ - if not module_path: - raise Exception("Error getting path: " + str(module_path)) else: - self.mainPath = path.dirname(path.abspath(module_path)) - # THIS MUST BE IMPLEMENTED IN SUBCLASSES, IS THE CLASS I WANT TO SEARCH !!!! - self.baseClass = None - - def GetClassFromName(self, wantedName) -> type | None: - # From name, load the correct module and extract the class - for module in self.modulesFilename: # Search the module file - moduleName = self.ModuleNameFromPath(module) - # Check if the module name matches the given name - if wantedName == moduleName: - # Load the module - loadedModule = self.LoadModule(module) - # Now get the class - return self.GetClassFromModule(loadedModule) - return None - - def LoadModule(self, path): # Get module and load it from the path + return None + + def LoadModule(self, module_path: Path): # Get module and load it from the path try: loader = importlib.machinery.SourceFileLoader( - self.ModuleNameFromPath(path), path) + module_path.stem, str(module_path)) spec = importlib.util.spec_from_loader(loader.name, loader) if not spec: @@ -58,50 +99,25 @@ def LoadModule(self, path): # Get module and load it from the path module = importlib.util.module_from_spec(spec) loader.exec_module(module) - moduleName = os.path.split(path)[1][:-3] - sys.modules[moduleName] = module + sys.modules[module_path.stem] = module return module except Exception as e: - self.Log(self.LOG_ERROR, "Error while loading module " + - path + ": " + str(e)) + self.Log(self.LOG_ERROR, + f"Error while loading module {module_path.stem}: {str(e)}") # From the module passed, I search for a Class that has className=moduleName def GetClassFromModule(self, module): for name, obj in inspect.getmembers(module): if inspect.isclass(obj): - if(name == module.__name__): + if (name == module.__name__): return obj raise Exception(f"No class found: {module.__name__}") - # List files in the _path directory and get only files in subfolders - def GetModulesFilename(self, _path): - classesRootPath = path.join(self.mainPath, _path) - if os.path.exists(classesRootPath): - self.Log(self.LOG_DEVELOPMENT, - "Looking for python files in \"" + _path + os.sep + "\"...") - result = list(Path(classesRootPath).rglob("*.py")) - entities = [] - for file in result: - filename = str(file) - # TO check if a py files is in a folder !!!! with the same name !!! (same without extension) - pathList = filename.split(os.sep) - if len(pathList) >= 2: - if pathList[len(pathList)-1][:-3] == pathList[len(pathList)-2]: - entities.append(filename) - - self.modulesFilename = self.modulesFilename + entities - self.Log(self.LOG_DEVELOPMENT, "Found " + - str(len(entities)) + " modules files") - - def ModuleNameFromPath(self, path): - classname = os.path.split(path) - return classname[1][:-3] - - def ListAvailableClassesNames(self) -> list: - res = [] - for py in self.modulesFilename: - res.append(path.basename(py).split(".py")[0]) - return res - def ListAvailableClasses(self) -> list: - return [self.GetClassFromName(n) for n in self.ListAvailableClassesNames()] + """Get all classes of this ClassManager + + Returns: + list: The list of classes + """ + + return [self.GetClassFromName(f.stem) for f in self.moduleFilePaths] diff --git a/IoTuring/ClassManager/EntityClassManager.py b/IoTuring/ClassManager/EntityClassManager.py index 5f6dee5e6..5c2328136 100644 --- a/IoTuring/ClassManager/EntityClassManager.py +++ b/IoTuring/ClassManager/EntityClassManager.py @@ -1,12 +1,7 @@ from IoTuring.ClassManager.ClassManager import ClassManager from IoTuring.ClassManager import consts -from IoTuring.Entity.Entity import Entity -# Class to load Entities from the Entitties dir and get them from name +# Class to load Entities from the Entitties dir class EntityClassManager(ClassManager): - def __init__(self): - ClassManager.__init__(self) - self.baseClass = Entity - self.GetModulesFilename(consts.ENTITIES_PATH) - # self.GetModulesFilename(consts.CUSTOM_ENTITIES_PATH) # TODO Decide if I'll use customs + classesRelativePath = consts.ENTITIES_PATH diff --git a/IoTuring/ClassManager/SettingsClassManager.py b/IoTuring/ClassManager/SettingsClassManager.py new file mode 100644 index 000000000..6e3064436 --- /dev/null +++ b/IoTuring/ClassManager/SettingsClassManager.py @@ -0,0 +1,10 @@ +from IoTuring.ClassManager.ClassManager import ClassManager +from IoTuring.ClassManager import consts + + +# Class to load Entities from the Entitties dir +class SettingsClassManager(ClassManager): + + classesRelativePath = consts.SETTINGS_PATH + + diff --git a/IoTuring/ClassManager/WarehouseClassManager.py b/IoTuring/ClassManager/WarehouseClassManager.py index 0bedb2ae6..ee295dd6c 100644 --- a/IoTuring/ClassManager/WarehouseClassManager.py +++ b/IoTuring/ClassManager/WarehouseClassManager.py @@ -1,11 +1,9 @@ from IoTuring.ClassManager.ClassManager import ClassManager from IoTuring.ClassManager import consts -from IoTuring.Warehouse.Warehouse import Warehouse -# Class to load Entities from the Entitties dir and get them from name +# Class to load Warehouses from the Warehouses dir + class WarehouseClassManager(ClassManager): - def __init__(self): - ClassManager.__init__(self) - self.baseClass = Warehouse - self.GetModulesFilename(consts.WAREHOUSES_PATH) + + classesRelativePath = consts.WAREHOUSES_PATH diff --git a/IoTuring/ClassManager/consts.py b/IoTuring/ClassManager/consts.py index 5a424e024..573a08771 100644 --- a/IoTuring/ClassManager/consts.py +++ b/IoTuring/ClassManager/consts.py @@ -1,2 +1,3 @@ -ENTITIES_PATH = "../Entity/Deployments/" -WAREHOUSES_PATH = "../Warehouse/Deployments/" +ENTITIES_PATH = "Entity/Deployments" +WAREHOUSES_PATH = "Warehouse/Deployments" +SETTINGS_PATH = "Settings/Deployments" diff --git a/IoTuring/Configurator/Configuration.py b/IoTuring/Configurator/Configuration.py new file mode 100644 index 000000000..f49f3d8ba --- /dev/null +++ b/IoTuring/Configurator/Configuration.py @@ -0,0 +1,240 @@ +from __future__ import annotations + +# config categories: +KEY_ACTIVE_ENTITIES = "active_entities" +KEY_ACTIVE_WAREHOUSES = "active_warehouses" +KEY_SETTINGS = "settings" + +CONFIG_CATEGORY_NAME = { + KEY_ACTIVE_ENTITIES: "Entity", + KEY_ACTIVE_WAREHOUSES: "Warehouse", + KEY_SETTINGS: "Setting" +} + + +BLANK_CONFIGURATION = { + KEY_ACTIVE_ENTITIES: [{"type": "AppInfo"}], + KEY_ACTIVE_WAREHOUSES: [], + KEY_SETTINGS: [] +} + +KEY_ENTITY_TAG = "tag" +KEY_ENTITY_TYPE = "type" + + +class FullConfiguration: + """Full configuration of all classes""" + + def __init__(self, config_dict: dict = BLANK_CONFIGURATION) -> None: + + self.configs = [] + + for config_category in config_dict: + for single_config_dict in config_dict[config_category]: + self.configs.append(SingleConfiguration( + config_category, single_config_dict)) + + def GetConfigsInCategory(self, config_category: str) -> list["SingleConfiguration"]: + """Return all configurations in a category + + Args: + config_category (str): KEY_ACTIVE_ENTITIES, KEY_ACTIVE_WAREHOUSES or KEY_SETTINGS + + Returns: + list: Configurations in the category. Empty list if none found. + """ + return [config for config in self.configs if config.config_category == config_category] + + def GetAllConfigsOfType(self, config_type: str, config_category: str = "") -> list["SingleConfiguration"]: + """Return all configs with the given type, from the given category + + Args: + config_type (str): The type of config to return + config_category (str, optional): Optional filter for the category. Defaults to no filter. + + Returns: + list: Configurations of the given type. Empty list if none found. + """ + if config_category: + config_list = self.GetConfigsInCategory(config_category) + else: + config_list = self.configs + + return [config for config in config_list if config.GetType() == config_type] + + def LoadSingleConfig(self, config_type: str, config_category: str) -> SingleConfiguration: + """ Return the only configuration of the given type. Raises exception if multiple found. + Add the config if not found. + + Args: + config_type (str): The type of config to return + config_category (str): KEY_ACTIVE_ENTITIES, KEY_ACTIVE_WAREHOUSES or KEY_SETTINGS + + Raises: + Exception: Multiple config found + + Returns: + SingleConfiguration: The config + """ + configs = self.GetAllConfigsOfType(config_type=config_type) + if not configs: + return self.AddConfiguration( + config_category, {}, config_type) + + if len(configs) > 1: + raise Exception("Multiple configs found!") + + return configs[0] + + def RemoveActiveConfiguration(self, config: "SingleConfiguration") -> None: + """Remove a configuration from the list of active configurations""" + if config in self.configs: + self.configs.remove(config) + else: + raise ValueError("Configuration not found") + + def AddConfiguration(self, config_category: str, single_config_dict: dict, config_type: str = "") -> "SingleConfiguration": + """Add a new configuration to the list of active configurations + + Args: + config_category (str): KEY_ACTIVE_ENTITIES, KEY_ACTIVE_WAREHOUSES or KEY_SETTINGS + single_config_dict (dict): all settings as a dict + config_type (str, optional): The type of the configuration, if not included in the dict. + + Raises: + ValueError: Config type not defined in the dict nor in the function call + + Returns: + SingleConfiguration: The new single config + """ + + if KEY_ENTITY_TYPE not in single_config_dict: + if config_type: + single_config_dict[KEY_ENTITY_TYPE] = config_type + else: + raise ValueError("Configuration type not specified") + + single_config = SingleConfiguration( + config_category, single_config_dict) + + self.configs.append(single_config) + + return single_config + + def ToDict(self) -> dict: + """Full configuration as a dict, for saving to file """ + config_dict = {} + for config_category in BLANK_CONFIGURATION: + config_dict[config_category] = [] + for single_config in self.GetConfigsInCategory(config_category): + config_dict[config_category].append(single_config.ToDict()) + + return config_dict + + +class SingleConfiguration: + """Single configuration of an entity or warehouse or setting""" + + config_category: str + type: str + configurations: dict + + def __init__(self, config_category: str = "", config_dict: dict = {}) -> None: + """Create a new SingleConfiguration + + Args: + config_category (str): KEY_ACTIVE_ENTITIES, KEY_ACTIVE_WAREHOUSES or KEY_SETTINGS + config_dict (dict): All options as in config file + Both optional, it will create an empty configuration without them + """ + self.config_category = config_category + + # self.type: + type_name = "" + if KEY_ENTITY_TYPE in config_dict: + type_name = config_dict.pop(KEY_ENTITY_TYPE) + + setattr(self, KEY_ENTITY_TYPE, type_name) + + self.configurations = config_dict + + def GetType(self) -> str: + """ Get the type name of entity""" + return getattr(self, KEY_ENTITY_TYPE) + + def GetTag(self) -> str: + """ Get the tag of entity""" + if KEY_ENTITY_TAG in self.configurations: + return self.configurations[KEY_ENTITY_TAG] + else: + return "" + + def GetLabel(self) -> str: + """ Get the type name of this configuration, add tag if multi""" + + label = self.GetType() + + if self.GetTag(): + label += f" with tag {self.GetTag()}" + + return label + + def GetLongName(self) -> str: + """ Add category name to the end """ + return str(self.GetType() + self.GetCategoryName()) + + def GetCategoryName(self) -> str: + """ Get human readable singular name of the category of this configuration""" + return CONFIG_CATEGORY_NAME[self.config_category] + + def GetConfigValue(self, config_key: str): + """Get the value of a config key + + Args: + config_key (str): The key of the configuration + + Raises: + ValueError: If the key is not found + + Returns: + The value of the key. + """ + if config_key in self.configurations: + return self.configurations[config_key] + else: + raise ValueError("Config key not set") + + def UpdateConfigValue(self, config_key: str, config_value: str) -> None: + """Update the value of the configuration. Overwrites existing value + + Args: + config_key (str): The key of the configuration + config_value (str): The preferred value + """ + self.configurations[config_key] = config_value + + def UpdateConfigDict(self, config_dict: dict) -> None: + """Update all configurations with a dict. Overwrites existing values + + Args: + config_dict (dict): The dict of configurations + """ + self.configurations.update(config_dict) + + def HasConfigKey(self, config_key: str) -> bool: + """Check if key has a value + + Args: + config_key (str): The key of the configuration + + Returns: + bool: If it has a value + """ + return bool(config_key in self.configurations) + + def ToDict(self, include_type: bool = True) -> dict: + """Full configuration as a dict, as it would be saved to a file """ + full_dict = self.configurations + if include_type: + full_dict[KEY_ENTITY_TYPE] = getattr(self, KEY_ENTITY_TYPE) + return full_dict diff --git a/IoTuring/Configurator/Configurator.py b/IoTuring/Configurator/Configurator.py index 136f1aa83..8300dd509 100644 --- a/IoTuring/Configurator/Configurator.py +++ b/IoTuring/Configurator/Configurator.py @@ -3,11 +3,16 @@ import shutil import sys +from IoTuring.Configurator.MenuPreset import QuestionPreset +from IoTuring.Configurator.Configuration import FullConfiguration, SingleConfiguration, KEY_ACTIVE_ENTITIES, KEY_ACTIVE_WAREHOUSES, KEY_SETTINGS, CONFIG_CATEGORY_NAME + from IoTuring.Logger.LogObject import LogObject +from IoTuring.Logger.Logger import Logger from IoTuring.Exceptions.Exceptions import UserCancelledException from IoTuring.ClassManager.EntityClassManager import EntityClassManager from IoTuring.ClassManager.WarehouseClassManager import WarehouseClassManager +from IoTuring.ClassManager.SettingsClassManager import SettingsClassManager from IoTuring.Configurator import ConfiguratorIO from IoTuring.Configurator import messages @@ -18,17 +23,6 @@ from InquirerPy.separator import Separator -BLANK_CONFIGURATION = {'active_entities': [ - {"type": "AppInfo"}], 'active_warehouses': []} - -KEY_ACTIVE_ENTITIES = "active_entities" -KEY_ACTIVE_WAREHOUSES = "active_warehouses" - -KEY_WAREHOUSE_TYPE = "type" - -KEY_ENTITY_TYPE = "type" -KEY_ENTITY_TAG = "tag" - CHOICE_GO_BACK = "< Go back" @@ -39,11 +33,14 @@ def __init__(self) -> None: self.pinned_lines = 1 self.configuratorIO = ConfiguratorIO.ConfiguratorIO() - self.config = self.LoadConfigurations() - def GetConfigurations(self) -> dict: - """ Return a copy of the configurations dict""" - return self.config.copy() # Safe return + # Load configuration from file, create Configuration object: + config_dict_from_file = self.configuratorIO.readConfigurations() + if config_dict_from_file: + self.config = FullConfiguration(config_dict_from_file) + else: + # Create blank config: + self.config = FullConfiguration() def CheckFile(self) -> None: """ Make sure config file exists or can be created """ @@ -97,6 +94,7 @@ def Menu(self) -> None: mainMenuChoices = [ {"name": "Manage entities", "value": self.ManageEntities}, {"name": "Manage warehouses", "value": self.ManageWarehouses}, + {"name": "Settings", "value": self.ManageSettings}, {"name": "Start IoTuring", "value": self.WriteConfigurations}, {"name": "Help", "value": self.DisplayHelp}, {"name": "Quit", "value": self.Quit}, @@ -115,13 +113,13 @@ def ManageEntities(self) -> None: manageEntitiesChoices = [] - for entityConfig in self.config[KEY_ACTIVE_ENTITIES]: + for entityConfig in self.config.GetConfigsInCategory(KEY_ACTIVE_ENTITIES): manageEntitiesChoices.append( - {"name": self.GetEntityLabel(entityConfig), + {"name": entityConfig.GetLabel(), "value": entityConfig} ) - manageEntitiesChoices.sort(key=lambda d: d['name']) + manageEntitiesChoices.sort(key=lambda d: d['name']) manageEntitiesChoices = [ CHOICE_GO_BACK, @@ -150,17 +148,15 @@ def ManageWarehouses(self) -> None: manageWhChoices = [] - availableWarehouses = wcm.ListAvailableClassesNames() - for whName in availableWarehouses: - short_wh_name = whName.replace("Warehouse", "") + availableWarehouses = wcm.ListAvailableClasses() + for whClass in availableWarehouses: - enabled_sign = " " - if self.IsWarehouseActive(short_wh_name): - enabled_sign = "X" + enabled_sign = "X" \ + if self.config.GetAllConfigsOfType(whClass.NAME) else " " manageWhChoices.append( - {"name": f"[{enabled_sign}] - {short_wh_name}", - "value": short_wh_name}) + {"name": f"[{enabled_sign}] - {whClass.NAME}", + "value": whClass}) choice = self.DisplayMenu( choices=manageWhChoices, @@ -170,9 +166,40 @@ def ManageWarehouses(self) -> None: if choice == CHOICE_GO_BACK: self.Menu() else: - self.ManageSingleWarehouse(choice, wcm) + self.ManageSingleWarehouse(choice) + + def ManageSettings(self) -> None: + """ UI for App and Log Settings """ + + scm = SettingsClassManager() + + choices = [] + + availableSettings = scm.ListAvailableClasses() + for sClass in availableSettings: + + choices.append( + {"name": sClass.NAME + " Settings", + "value": sClass}) + + choice = self.DisplayMenu( + choices=choices, + message=f"Select settings to edit" + ) + + if choice == CHOICE_GO_BACK: + self.Menu() + + else: + settings_config = self.config.LoadSingleConfig( + choice.NAME, KEY_SETTINGS) + + # Edit: + self.EditActiveClass(choice, settings_config) + self.ManageSettings() def DisplayHelp(self) -> None: + """" Display the help message, and load the main menu """ self.DisplayMessage(messages.HELP_MESSAGE) # Help message is too long: self.pinned_lines = 1 @@ -183,22 +210,16 @@ def Quit(self) -> None: self.WriteConfigurations() sys.exit(0) - def LoadConfigurations(self) -> dict: - """ Reads the configuration file and returns configuration dictionary. - If not available, returns the blank configuration """ - read_config = self.configuratorIO.readConfigurations() - if read_config is None: - read_config = BLANK_CONFIGURATION - return read_config - def WriteConfigurations(self) -> None: """ Save to configurations file """ - self.configuratorIO.writeConfigurations(self.config) + self.configuratorIO.writeConfigurations(self.config.ToDict()) - def ManageSingleWarehouse(self, warehouseName, wcm: WarehouseClassManager): + def ManageSingleWarehouse(self, whClass): """UI for single Warehouse settings""" - if self.IsWarehouseActive(warehouseName): + whConfigList = self.config.GetAllConfigsOfType(whClass.NAME) + + if whConfigList: manageWhChoices = [ {"name": "Edit the warehouse settings", "value": "Edit"}, {"name": "Remove the warehouse", "value": "Remove"} @@ -209,25 +230,25 @@ def ManageSingleWarehouse(self, warehouseName, wcm: WarehouseClassManager): choice = self.DisplayMenu( choices=manageWhChoices, - message=f"Manage warehouse {warehouseName}" + message=f"Manage warehouse {whClass.NAME}" ) if choice == CHOICE_GO_BACK: self.ManageWarehouses() - elif choice == "Edit": - self.EditActiveWarehouse(warehouseName, wcm) elif choice == "Add": - self.AddActiveWarehouse(warehouseName, wcm) - elif choice == "Remove": - confirm = inquirer.confirm(message="Are you sure?").execute() - - if confirm: - self.RemoveActiveWarehouse(warehouseName) - else: + self.AddActiveClass(whClass, KEY_ACTIVE_WAREHOUSES) + self.ManageWarehouses() + elif whConfigList: + whConfig = whConfigList[0] + if choice == "Edit": + self.EditActiveClass(whClass, whConfig) + self.ManageSingleWarehouse(whClass) + elif choice == "Remove": + self.RemoveActiveConfiguration(whConfig) self.ManageWarehouses() - def ManageSingleEntity(self, entityConfig, ecm: EntityClassManager): - """ UI to manage an active warehouse (edit config/remove) """ + def ManageSingleEntity(self, entityConfig: SingleConfiguration, ecm: EntityClassManager): + """ UI to manage an active entity (edit config/remove) """ manageEntityChoices = [ {"name": "Edit the entity settings", "value": "Edit"}, @@ -236,47 +257,42 @@ def ManageSingleEntity(self, entityConfig, ecm: EntityClassManager): choice = self.DisplayMenu( choices=manageEntityChoices, - message=f"Manage entity {self.GetEntityLabel(entityConfig)}" + message=f"Manage entity {entityConfig.GetLabel()}" ) if choice == CHOICE_GO_BACK: self.ManageEntities() elif choice == "Edit": - self.EditActiveEntity(entityConfig, ecm) # type: ignore - elif choice == "Remove": - confirm = inquirer.confirm(message="Are you sure?").execute() + entityClass = ecm.GetClassFromName(entityConfig.GetType()) + self.EditActiveClass( + entityClass, entityConfig) + self.ManageSingleEntity(entityConfig, ecm) - if confirm: - self.RemoveActiveEntity(entityConfig) - else: - self.ManageEntities() + elif choice == "Remove": + self.RemoveActiveConfiguration(entityConfig) + self.ManageEntities() def SelectNewEntity(self, ecm: EntityClassManager): """ UI to add a new Entity """ - # entity classnames without unsupported entities: - entityList = [ - e.NAME for e in ecm.ListAvailableClasses() if e.SystemSupported()] - - # Now I remove the entities that are active and that do not allow multi instances - for activeEntity in self.config[KEY_ACTIVE_ENTITIES]: - entityClass = ecm.GetClassFromName( - activeEntity[KEY_ENTITY_TYPE]) + # entity classes without unsupported entities: + entityClasses = [ + e for e in ecm.ListAvailableClasses() if e.SystemSupported()] - # Malformed entities, from different versions in config, just skip: - if entityClass is None: - continue + entityChoices = [] - # If the Allow Multi Instance option was changed: - if activeEntity[KEY_ENTITY_TYPE] not in entityList: - continue + for entityClass in entityClasses: + if self.config.GetAllConfigsOfType(entityClass.NAME): + if not entityClass.AllowMultiInstance(): + continue + entityChoices.append( + {"name": entityClass.NAME, "value": entityClass} + ) - # not multi, remove: - if not entityClass.AllowMultiInstance(): # type: ignore - entityList.remove(activeEntity[KEY_ENTITY_TYPE]) + entityChoices.sort(key=lambda d: d['name']) choice = self.DisplayMenu( - choices=sorted(entityList), + choices=entityChoices, message="Available entities:", instruction="if you don't see the entity, it may be already active and not accept another activation, or not supported by your system" ) @@ -284,7 +300,8 @@ def SelectNewEntity(self, ecm: EntityClassManager): if choice == CHOICE_GO_BACK: self.ManageEntities() else: - self.AddActiveEntity(choice, ecm) + self.AddActiveClass(choice, KEY_ACTIVE_ENTITIES) + self.ManageEntities() def ShowUnsupportedEntities(self, ecm: EntityClassManager): """ UI to show unsupported entities """ @@ -306,138 +323,118 @@ def ShowUnsupportedEntities(self, ecm: EntityClassManager): self.ManageEntities() - def AddActiveEntity(self, entityName, ecm: EntityClassManager): - """ From entity name, get its class and retrieve the configuration preset, then add to configuration dict """ - entityClass = ecm.GetClassFromName(entityName) - try: - if not entityClass: - raise Exception(f"Entityclass not found: {entityName}") + def RemoveActiveConfiguration(self, singleConfig: SingleConfiguration) -> None: + """ Remove configuration (wh or entity) """ + confirm = inquirer.confirm(message="Are you sure?").execute() + if confirm: + self.config.RemoveActiveConfiguration(singleConfig) + self.DisplayMessage( + f"{singleConfig.GetCategoryName()} removed: {singleConfig.GetLabel()}") - preset = entityClass.ConfigurationPreset() + def AddActiveClass(self, ioClass, config_category: str) -> None: + """Add a wh or Entity to configuration. + + Args: + ioClass: the WH or Entity class + config_category (str): KEY_ACTIVE_ENTITIES or KEY_ACTIVE_WAREHOUSES + """ + try: + preset = ioClass.ConfigurationPreset() if preset.HasQuestions(): - # Ask for Tag if the entity allows multi-instance - multi-instance has sense only if a preset is available - if entityClass.AllowMultiInstance(): - preset.AddTagQuestion() + + if config_category == KEY_ACTIVE_ENTITIES: + # Ask for Tag if the entity allows multi-instance - multi-instance has sense only if a preset is available + if ioClass.AllowMultiInstance(): + preset.AddTagQuestion() self.DisplayMessage(messages.PRESET_RULES) - self.DisplayMessage(f"Configure {entityName} Entity") + self.DisplayMessage( + f"Configure {ioClass.NAME} {CONFIG_CATEGORY_NAME[config_category]}") preset.AskQuestions() self.ClearScreen(force_clear=True) else: self.DisplayMessage( - "No configuration needed for this Entity :)") + f"No configuration needed for this {CONFIG_CATEGORY_NAME[config_category]} :)") + + self.config.AddConfiguration( + config_category, preset.GetDict(), ioClass.NAME) - self.EntityMenuPresetToConfiguration(entityName, preset) except UserCancelledException: self.DisplayMessage("Configuration cancelled", force_clear=True) except Exception as e: - print("Error during entity preset loading: " + str(e)) - - self.ManageEntities() + print( + f"Error during {CONFIG_CATEGORY_NAME[config_category]} preset loading: {str(e)}") - def IsEntityActive(self, entityName) -> bool: - """ Return True if an Entity is active """ - for entity in self.config[KEY_ACTIVE_ENTITIES]: - if entityName == entity[KEY_ENTITY_TYPE]: - return True - return False - - def GetEntityLabel(self, entityConfig) -> str: - """ Get the type name of entity, add tag if multi""" - entityLabel = entityConfig[KEY_ENTITY_TYPE] - if KEY_ENTITY_TAG in entityConfig: - entityLabel += f" with tag {entityConfig[KEY_ENTITY_TAG]}" - return entityLabel - - def RemoveActiveEntity(self, entityConfig) -> None: - """ Remove entity name from the list of active entities if present """ - if entityConfig in self.config[KEY_ACTIVE_ENTITIES]: - self.config[KEY_ACTIVE_ENTITIES].remove(entityConfig) - - self.DisplayMessage( - f"Entity removed: {self.GetEntityLabel(entityConfig)}") - self.ManageEntities() + def EditActiveClass(self, ioClass, single_config: "SingleConfiguration") -> None: + """ UI for changing settings """ + preset = ioClass.ConfigurationPreset() - def IsWarehouseActive(self, warehouseName) -> bool: - """ Return True if a warehouse is active """ - for wh in self.config[KEY_ACTIVE_WAREHOUSES]: - if warehouseName == wh[KEY_WAREHOUSE_TYPE]: - return True - return False + if preset.HasQuestions(): - def AddActiveWarehouse(self, warehouseName, wcm: WarehouseClassManager) -> None: - """ Add warehouse to the preferences using a menu with the warehouse preset if available """ + choices = [] - whClass = wcm.GetClassFromName(warehouseName + "Warehouse") - try: - preset = whClass.ConfigurationPreset() # type: ignore + # Add tag: + if single_config.GetTag(): + preset.AddTagQuestion() - if preset.HasQuestions(): - self.DisplayMessage(messages.PRESET_RULES) - preset.AskQuestions() - self.ClearScreen(force_clear=True) - - else: - self.DisplayMessage( - "No configuration needed for this Warehouse :)") + for entry in preset.presets: + # Load config instead of default: + if single_config.HasConfigKey(entry.key): + value = single_config.GetConfigValue(entry.key) + if entry.question_type == "secret": + value = "*" * len(value) + else: + value = entry.default - # Save added settings - self.WarehouseMenuPresetToConfiguration(warehouseName, preset) + # Nice display for None: + if value is None: + value = "" - except UserCancelledException: - self.DisplayMessage("Configuration cancelled", force_clear=True) + choices.append({ + "name": f"{entry.name}: {value}", + "value": entry.key + }) - except Exception as e: - print("Error during warehouse preset loading: " + str(e)) + choice = self.DisplayMenu( + choices=choices, + message="Select config to edit") - self.ManageWarehouses() + if choice == CHOICE_GO_BACK: + return + else: + q_preset = preset.GetPresetByKey(choice) + if q_preset: + self.EditSinglePreset(q_preset, single_config) + self.EditActiveClass(ioClass, single_config) + else: + self.DisplayMessage(f"Question preset not found: {choice}") - def EditActiveWarehouse(self, warehouseName, wcm: WarehouseClassManager) -> None: - """ UI for single Warehouse settings edit """ - self.DisplayMessage( - "You can't do that at the moment, change the configuration file manually. Sorry for the inconvenience") + else: + self.DisplayMessage( + f"No configuration for this {single_config.GetCategoryName()} :)") - self.ManageWarehouses() + def EditSinglePreset(self, q_preset: QuestionPreset, single_config: SingleConfiguration): + """ UI for changing a single setting """ - # TODO Implement - # WarehouseMenuPresetToConfiguration appends a warehosue to the conf so here I should remove it to read it later - # TO implement only when I know how to add removable value while editing configurations + try: + # Load config as default: + if single_config.HasConfigKey(q_preset.key): + if q_preset.default and q_preset.question_type != "yesno": + q_preset.instruction = f"Default: {q_preset.default}" + q_preset.default = single_config.GetConfigValue(q_preset.key) - def EditActiveEntity(self, entityConfig, ecm: WarehouseClassManager) -> None: - """ UI for single Entity settings edit """ - self.DisplayMessage( - "You can't do that at the moment, change the configuration file manually. Sorry for the inconvenience") + value = q_preset.Ask() - self.ManageEntities() + # If no default and not changed, do not save: + if value or q_preset.default is not None: + # Add to config: + single_config.UpdateConfigValue(q_preset.key, value) - # TODO Implement - - def RemoveActiveWarehouse(self, warehouseName) -> None: - """ Remove warehouse name from the list of active warehouses if present """ - for wh in self.config[KEY_ACTIVE_WAREHOUSES]: - if warehouseName == wh[KEY_WAREHOUSE_TYPE]: - # I remove this wh from the list - self.config[KEY_ACTIVE_WAREHOUSES].remove(wh) - - self.DisplayMessage(f"Warehouse removed: {warehouseName}") - self.ManageWarehouses() - - def WarehouseMenuPresetToConfiguration(self, whName, preset) -> None: - """ Get a MenuPreset with responses and add the entries to the configurations dict in warehouse part """ - _dict = preset.GetDict() - _dict[KEY_WAREHOUSE_TYPE] = whName.replace("Warehouse", "") - self.config[KEY_ACTIVE_WAREHOUSES].append(_dict) - self.DisplayMessage("Configuration added for \""+whName+"\" :)") - - def EntityMenuPresetToConfiguration(self, entityName, preset) -> None: - """ Get a MenuPreset with responses and add the entries to the configurations dict in entity part """ - _dict = preset.GetDict() - _dict[KEY_ENTITY_TYPE] = entityName - self.config[KEY_ACTIVE_ENTITIES].append(_dict) - self.DisplayMessage("Configuration added for \""+entityName+"\" :)") + except UserCancelledException: + self.DisplayMessage("Configuration cancelled", force_clear=True) def ClearScreen(self, force_clear=False): """ Clear the screen on any platform. If self.pinned_lines greater than zero, it won't be cleared. diff --git a/IoTuring/Configurator/ConfiguratorLoader.py b/IoTuring/Configurator/ConfiguratorLoader.py index 0c02f7ec6..fccf855f0 100644 --- a/IoTuring/Configurator/ConfiguratorLoader.py +++ b/IoTuring/Configurator/ConfiguratorLoader.py @@ -3,38 +3,40 @@ from IoTuring.Entity.Entity import Entity from IoTuring.Logger.LogObject import LogObject -from IoTuring.Configurator.Configurator import KEY_ENTITY_TYPE, Configurator, KEY_ACTIVE_ENTITIES, KEY_ACTIVE_WAREHOUSES, KEY_WAREHOUSE_TYPE +from IoTuring.Configurator.Configurator import Configurator, KEY_ACTIVE_ENTITIES, KEY_ACTIVE_WAREHOUSES, KEY_SETTINGS from IoTuring.ClassManager.WarehouseClassManager import WarehouseClassManager from IoTuring.ClassManager.EntityClassManager import EntityClassManager +from IoTuring.ClassManager.SettingsClassManager import SettingsClassManager from IoTuring.Warehouse.Warehouse import Warehouse + class ConfiguratorLoader(LogObject): configurator = None def __init__(self, configurator: Configurator) -> None: - self.configurations = configurator.GetConfigurations() + self.configurations = configurator.config # Return list of instances initialized using their configurations def LoadWarehouses(self) -> list[Warehouse]: warehouses = [] wcm = WarehouseClassManager() - if not KEY_ACTIVE_WAREHOUSES in self.configurations: + if not self.configurations.GetConfigsInCategory(KEY_ACTIVE_WAREHOUSES): self.Log( self.LOG_ERROR, "You have to enable at least one warehouse: configure it using -c argument") sys.exit("No warehouse enabled") - for activeWarehouse in self.configurations[KEY_ACTIVE_WAREHOUSES]: + for whConfig in self.configurations.GetConfigsInCategory(KEY_ACTIVE_WAREHOUSES): # Get WareHouse named like in config type field, then init it with configs and add it to warehouses list - whClass = wcm.GetClassFromName( - activeWarehouse[KEY_WAREHOUSE_TYPE]+"Warehouse") + whClass = wcm.GetClassFromName(whConfig.GetLongName()) if whClass is None: - self.Log(self.LOG_ERROR, "Can't find " + - activeWarehouse[KEY_WAREHOUSE_TYPE] + " warehouse, check your configurations.") + self.Log( + self.LOG_ERROR, f"Can't find {whConfig.GetType()} warehouse, check your configurations.") else: - whClass(activeWarehouse).AddMissingDefaultConfigs() - self.Log(self.LOG_DEBUG, f"Full configuration with defaults: {whClass(activeWarehouse).configurations}") - warehouses.append(whClass(activeWarehouse)) + wh = whClass(whConfig) + self.Log( + self.LOG_DEBUG, f"Full configuration with defaults: {wh.configurations.ToDict()}") + warehouses.append(wh) return warehouses # warehouses[0].AddEntity(eM.NewEntity(eM.EntityNameToClass("Username")).getInstance()): may be useful @@ -42,19 +44,20 @@ def LoadWarehouses(self) -> list[Warehouse]: def LoadEntities(self) -> list[Entity]: entities = [] ecm = EntityClassManager() - if not KEY_ACTIVE_ENTITIES in self.configurations: + if not self.configurations.GetConfigsInCategory(KEY_ACTIVE_ENTITIES): self.Log( self.LOG_ERROR, "You have to enable at least one entity: configure it using -c argument") sys.exit("No entity enabled") - for activeEntity in self.configurations[KEY_ACTIVE_ENTITIES]: - entityClass = ecm.GetClassFromName(activeEntity[KEY_ENTITY_TYPE]) + for entityConfig in self.configurations.GetConfigsInCategory(KEY_ACTIVE_ENTITIES): + entityClass = ecm.GetClassFromName(entityConfig.GetType()) if entityClass is None: - self.Log(self.LOG_ERROR, "Can't find " + - activeEntity[KEY_ENTITY_TYPE] + " entity, check your configurations.") + self.Log( + self.LOG_ERROR, f"Can't find {entityConfig.GetType()} entity, check your configurations.") else: - entityClass(activeEntity).AddMissingDefaultConfigs() - self.Log(self.LOG_DEBUG, f"Full configuration with defaults: {entityClass(activeEntity).configurations}") - entities.append(entityClass(activeEntity)) # Entity instance + ec = entityClass(entityConfig) + self.Log( + self.LOG_DEBUG, f"Full configuration with defaults: {ec.configurations.ToDict()}") + entities.append(ec) # Entity instance return entities # How Warehouse configurations works: @@ -64,3 +67,21 @@ def LoadEntities(self) -> list[Entity]: # - for each one: # - pass the configuration to the warehouse function that uses the configuration to init the Warehouse # - append the Warehouse to the list + + def LoadSettings(self) -> list: + scm = SettingsClassManager() + settings = [] + settingsClasses = scm.ListAvailableClasses() + + for settingsClass in settingsClasses: + settingsConfig = self.configurations.LoadSingleConfig( + settingsClass.NAME, KEY_SETTINGS) + sc = settingsClass(settingsConfig) + settings.append(sc) + return settings + + # How SettingsConfigurations work: + # - LogSettings are added to Logger in LogSettings.__init__() + # - Other settings' configs are added to SettingsManager singleton on main __init__ + # - Entities can get configurations from this singleton, e.g.: + # SettingsManager().GetFromConfigurations(CONFIG_KEY_UPDATE_INTERVAL) \ No newline at end of file diff --git a/IoTuring/Configurator/ConfiguratorObject.py b/IoTuring/Configurator/ConfiguratorObject.py index 6dfbf9b18..275aa376e 100644 --- a/IoTuring/Configurator/ConfiguratorObject.py +++ b/IoTuring/Configurator/ConfiguratorObject.py @@ -1,21 +1,34 @@ -from IoTuring.Configurator.MenuPreset import BooleanAnswers -from IoTuring.Configurator.MenuPreset import MenuPreset +from IoTuring.Configurator.MenuPreset import BooleanAnswers, MenuPreset +from IoTuring.Configurator.Configuration import SingleConfiguration class ConfiguratorObject: """ Base class for configurable classes """ + NAME = "Unnamed" - def __init__(self, configurations) -> None: - self.configurations = configurations + def __init__(self, single_configuration: SingleConfiguration) -> None: + self.configurations = single_configuration - def GetConfigurations(self) -> dict: - """ Safe return configurations dict """ - return self.configurations.copy() + # Add missing default values: + preset = self.ConfigurationPreset() + defaults = preset.GetDefaults() + + if defaults: + for default_key, default_value in defaults.items(): + if not self.GetConfigurations().HasConfigKey(default_key): + self.GetConfigurations().UpdateConfigValue(default_key, default_value) + + def GetConfigurations(self) -> SingleConfiguration: + """ Safe return single_configuration object """ + if self.configurations: + return self.configurations + else: + raise Exception(f"Configuration not loaded for {self.NAME}") def GetFromConfigurations(self, key): - """ Get value from confiugurations with key (if not present raise Exception) """ - if key in self.GetConfigurations(): - return self.GetConfigurations()[key] + """ Get value from confiugurations with key (if not present raise Exception).""" + if self.GetConfigurations().HasConfigKey(key): + return self.GetConfigurations().GetConfigValue(key) else: raise Exception("Can't find key " + key + " in configurations") @@ -27,16 +40,6 @@ def GetTrueOrFalseFromConfigurations(self, key) -> bool: else: return False - def AddMissingDefaultConfigs(self) -> None: - """ If some default values are missing add them to the running configuration""" - preset = self.ConfigurationPreset() - defaults = preset.GetDefaults() - - if defaults: - for default_key in defaults: - if default_key not in self.GetConfigurations(): - self.configurations[default_key] = defaults[default_key] - @classmethod def ConfigurationPreset(cls) -> MenuPreset: """ Prepare a preset to manage settings insert/edit for the warehouse or entity """ diff --git a/IoTuring/Configurator/MenuPreset.py b/IoTuring/Configurator/MenuPreset.py index 72c709a03..1fe949751 100644 --- a/IoTuring/Configurator/MenuPreset.py +++ b/IoTuring/Configurator/MenuPreset.py @@ -28,7 +28,13 @@ def __init__(self, self.value = None self.question = self.name - if mandatory: + + # yesno question cannot be mandatory: + if self.question_type == "yesno": + self.mandatory = False + + # Add mandatory mark: + if self.mandatory: self.question += " {!}" def ShouldDisplay(self, menupreset: "MenuPreset") -> bool: @@ -63,6 +69,82 @@ def ShouldDisplay(self, menupreset: "MenuPreset") -> bool: return should_display + def Ask(self): + """Ask a single question preset""" + + question_options = {} + + if self.mandatory: + def validate(x): return bool(x) + question_options.update({ + "validate": validate, + "invalid_message": "You must provide a value for this key" + }) + + question_options["message"] = self.question + ":" + + if self.default is not None: + # yesno questions need boolean default: + if self.question_type == "yesno": + question_options["default"] = \ + bool(str(self.default).lower() + in BooleanAnswers.TRUE_ANSWERS) + elif self.question_type == "integer": + question_options["default"] = int(self.default) + else: + question_options["default"] = self.default + else: + if self.question_type == "integer": + # The default integer is 0, overwrite to None: + question_options["default"] = None + + # text: + prompt_function = inquirer.text + + if self.question_type == "secret": + prompt_function = inquirer.secret + + elif self.question_type == "yesno": + prompt_function = inquirer.confirm + question_options.update({ + "filter": lambda x: "Y" if x else "N" + }) + + elif self.question_type == "select": + prompt_function = inquirer.select + question_options.update({ + "choices": self.choices + }) + + elif self.question_type == "integer": + prompt_function = inquirer.number + question_options["float_allowed"] = False + + elif self.question_type == "filepath": + prompt_function = inquirer.filepath + + # Ask the question: + prompt = prompt_function( + instruction=self.instruction, + **question_options + ) + + self.cancelled = False + + @prompt.register_kb("escape") + def _handle_esc(event): + prompt._mandatory = False + prompt._handle_skip(event) + # exception raised here catched by inquirer. + self.cancelled = True + + value = prompt.execute() + + if self.cancelled: + raise UserCancelledException + + return value + class MenuPreset(): @@ -132,75 +214,7 @@ def AskQuestions(self) -> None: # It should be displayed, ask question: if q_preset.ShouldDisplay(self): - question_options = {} - - if q_preset.mandatory: - def validate(x): return bool(x) - question_options.update({ - "validate": validate, - "invalid_message": "You must provide a value for this key" - }) - - question_options["message"] = q_preset.question + ":" - - if q_preset.default is not None: - # yesno questions need boolean default: - if q_preset.question_type == "yesno": - question_options["default"] = \ - bool(str(q_preset.default).lower() - in BooleanAnswers.TRUE_ANSWERS) - elif q_preset.question_type == "integer": - question_options["default"] = int(q_preset.default) - else: - question_options["default"] = q_preset.default - else: - if q_preset.question_type == "integer": - # The default default is 0, overwrite to None: - question_options["default"] = None - - # text: - prompt_function = inquirer.text - - if q_preset.question_type == "secret": - prompt_function = inquirer.secret - - elif q_preset.question_type == "yesno": - prompt_function = inquirer.confirm - question_options.update({ - "filter": lambda x: "Y" if x else "N" - }) - - elif q_preset.question_type == "select": - prompt_function = inquirer.select - question_options.update({ - "choices": q_preset.choices - }) - - elif q_preset.question_type == "integer": - prompt_function = inquirer.number - question_options["float_allowed"] = False - - elif q_preset.question_type == "filepath": - prompt_function = inquirer.filepath - - # Create the prompt: - prompt = prompt_function( - instruction=q_preset.instruction, - **question_options - ) - - # Handle escape keypress: - @prompt.register_kb("escape") - def _handle_esc(event): - prompt._mandatory = False - prompt._handle_skip(event) - # exception raised here catched by inquirer. - self.cancelled = True - - value = prompt.execute() - - if self.cancelled: - raise UserCancelledException + value = q_preset.Ask() if value: q_preset.value = value @@ -215,6 +229,10 @@ def _handle_esc(event): def GetAnsweredPresetByKey(self, key: str) -> QuestionPreset | None: return next((entry for entry in self.results if entry.key == key), None) + def GetPresetByKey(self, key: str) -> QuestionPreset | None: + """Get the QuestionPreset of this key. Returns None if not found""" + return next((entry for entry in self.presets if entry.key == key), None) + def GetDict(self) -> dict: """ Get a dict with keys and responses""" return {entry.key: entry.value for entry in self.results} diff --git a/IoTuring/Entity/Deployments/FileSwitch/FileSwitch.py b/IoTuring/Entity/Deployments/FileSwitch/FileSwitch.py index c12c5c7c6..a440e6d74 100644 --- a/IoTuring/Entity/Deployments/FileSwitch/FileSwitch.py +++ b/IoTuring/Entity/Deployments/FileSwitch/FileSwitch.py @@ -17,7 +17,7 @@ class FileSwitch(Entity): def Initialize(self): try: - self.config_path = self.GetConfigurations()[CONFIG_KEY_PATH] + self.config_path = self.GetFromConfigurations(CONFIG_KEY_PATH) except Exception as e: raise Exception("Configuration error: " + str(e)) diff --git a/IoTuring/Entity/Deployments/Notify/Notify.py b/IoTuring/Entity/Deployments/Notify/Notify.py index d26a3ecba..7b3105088 100644 --- a/IoTuring/Entity/Deployments/Notify/Notify.py +++ b/IoTuring/Entity/Deployments/Notify/Notify.py @@ -46,13 +46,13 @@ class Notify(Entity): def Initialize(self): # Check if both config is defined or both is empty: - if not bool(self.GetConfigurations()[CONFIG_KEY_TITLE]) == bool(self.GetConfigurations()[CONFIG_KEY_MESSAGE]): + if not bool(self.GetFromConfigurations(CONFIG_KEY_TITLE)) == bool(self.GetFromConfigurations(CONFIG_KEY_MESSAGE)): raise Exception( "Configuration error: Both title and message should be defined, or both should be empty!") try: - self.config_title = self.GetConfigurations()[CONFIG_KEY_TITLE] - self.config_message = self.GetConfigurations()[CONFIG_KEY_MESSAGE] + self.config_title = self.GetFromConfigurations(CONFIG_KEY_TITLE) + self.config_message = self.GetFromConfigurations(CONFIG_KEY_MESSAGE) self.data_mode = MODE_DATA_VIA_CONFIG except Exception as e: self.data_mode = MODE_DATA_VIA_PAYLOAD @@ -67,7 +67,7 @@ def Initialize(self): self.Log(self.LOG_INFO, "Using data from payload") # Set and check icon path: - self.config_icon_path = self.GetConfigurations()[CONFIG_KEY_ICON_PATH] + self.config_icon_path = self.GetFromConfigurations(CONFIG_KEY_ICON_PATH) if not os.path.exists(self.config_icon_path): self.Log( diff --git a/IoTuring/Entity/Deployments/Terminal/Terminal.py b/IoTuring/Entity/Deployments/Terminal/Terminal.py index b2212c87d..dd048e885 100644 --- a/IoTuring/Entity/Deployments/Terminal/Terminal.py +++ b/IoTuring/Entity/Deployments/Terminal/Terminal.py @@ -61,8 +61,8 @@ class Terminal(Entity): def Initialize(self): - self.config_entity_type = self.GetConfigurations()[ - CONFIG_KEY_ENTITY_TYPE] + self.config_entity_type = self.GetFromConfigurations( + CONFIG_KEY_ENTITY_TYPE) # sanitize entity type: self.entity_type = str( @@ -86,16 +86,16 @@ def Initialize(self): # payload_command if self.entity_type == ENTITY_TYPE_KEYS["PAYLOAD_COMMAND"]: self.config_command_regex = \ - self.GetConfigurations()[CONFIG_KEY_COMMAND_REGEX] + self.GetFromConfigurations(CONFIG_KEY_COMMAND_REGEX) # Check if it's a correct regex: if not re.search(r"^\^.*\$$", self.config_command_regex): raise Exception( f"Configuration error: Invalid regex: {self.config_command_regex}") # Get max length: - if self.GetConfigurations()[CONFIG_KEY_LENGTH]: + if self.GetFromConfigurations(CONFIG_KEY_LENGTH): self.config_length = int( - self.GetConfigurations()[CONFIG_KEY_LENGTH]) + self.GetFromConfigurations(CONFIG_KEY_LENGTH)) else: # Fall back to infinite: self.config_length = float("inf") @@ -104,52 +104,52 @@ def Initialize(self): # button elif self.entity_type == ENTITY_TYPE_KEYS["BUTTON"]: - self.config_command = self.GetConfigurations()[ - CONFIG_KEY_COMMAND_ON] + self.config_command = self.GetFromConfigurations( + CONFIG_KEY_COMMAND_ON) self.has_command = True # switch elif self.entity_type == ENTITY_TYPE_KEYS["SWITCH"]: self.config_command_on = \ - self.GetConfigurations()[CONFIG_KEY_COMMAND_ON] + self.GetFromConfigurations(CONFIG_KEY_COMMAND_ON) self.config_command_off = \ - self.GetConfigurations()[CONFIG_KEY_COMMAND_OFF] + self.GetFromConfigurations(CONFIG_KEY_COMMAND_OFF) self.has_command = True - if self.GetConfigurations()[CONFIG_KEY_COMMAND_STATE]: + if self.GetFromConfigurations(CONFIG_KEY_COMMAND_STATE): self.config_command_state = \ - self.GetConfigurations()[CONFIG_KEY_COMMAND_STATE] + self.GetFromConfigurations(CONFIG_KEY_COMMAND_STATE) self.has_binary_state = True # sensor elif self.entity_type == ENTITY_TYPE_KEYS["SENSOR"]: self.config_command_state = \ - self.GetConfigurations()[CONFIG_KEY_COMMAND_STATE] + self.GetFromConfigurations(CONFIG_KEY_COMMAND_STATE) self.has_state = True - self.config_unit = self.GetConfigurations()[CONFIG_KEY_UNIT] + self.config_unit = self.GetFromConfigurations(CONFIG_KEY_UNIT) if self.config_unit: self.custom_payload["unit_of_measurement"] = self.config_unit - if self.GetConfigurations()[CONFIG_KEY_DECIMALS]: + if self.GetFromConfigurations(CONFIG_KEY_DECIMALS): self.value_formatter_options = \ ValueFormatterOptions(value_type=ValueFormatterOptions.TYPE_NONE, - decimals=int(self.GetConfigurations()[CONFIG_KEY_DECIMALS])) + decimals=int(self.GetFromConfigurations(CONFIG_KEY_DECIMALS))) # binary sensor elif self.entity_type == ENTITY_TYPE_KEYS["BINARY_SENSOR"]: self.config_command_state = \ - self.GetConfigurations()[CONFIG_KEY_COMMAND_STATE] + self.GetFromConfigurations(CONFIG_KEY_COMMAND_STATE) self.has_binary_state = True # cover elif self.entity_type == ENTITY_TYPE_KEYS["COVER"]: self.config_cover_commands = { - "OPEN": self.GetConfigurations()[CONFIG_KEY_COMMAND_OPEN], - "CLOSE": self.GetConfigurations()[CONFIG_KEY_COMMAND_CLOSE] + "OPEN": self.GetFromConfigurations(CONFIG_KEY_COMMAND_OPEN), + "CLOSE": self.GetFromConfigurations(CONFIG_KEY_COMMAND_CLOSE) } - stop_command = self.GetConfigurations()[CONFIG_KEY_COMMAND_STOP] + stop_command = self.GetFromConfigurations(CONFIG_KEY_COMMAND_STOP) if stop_command: self.config_cover_commands["STOP"] = stop_command @@ -159,7 +159,7 @@ def Initialize(self): self.has_command = True self.config_command_state = \ - self.GetConfigurations()[CONFIG_KEY_COMMAND_STATE] + self.GetFromConfigurations(CONFIG_KEY_COMMAND_STATE) if self.config_command_state: self.has_state = True diff --git a/IoTuring/Entity/Entity.py b/IoTuring/Entity/Entity.py index e8a992e30..1213d0b5f 100644 --- a/IoTuring/Entity/Entity.py +++ b/IoTuring/Entity/Entity.py @@ -3,31 +3,32 @@ import subprocess from IoTuring.Configurator.ConfiguratorObject import ConfiguratorObject +from IoTuring.Configurator.Configuration import SingleConfiguration +from IoTuring.Settings.Deployments.AppSettings.AppSettings import CONFIG_KEY_UPDATE_INTERVAL +from IoTuring.Settings.SettingsManager import SettingsManager from IoTuring.Exceptions.Exceptions import UnknownEntityKeyException from IoTuring.Logger.LogObject import LogObject from IoTuring.Entity.EntityData import EntityData, EntitySensor, EntityCommand, ExtraAttribute from IoTuring.MyApp.SystemConsts import OperatingSystemDetection as OsD -KEY_ENTITY_TAG = 'tag' # from Configurator.Configurator - -DEFAULT_UPDATE_TIMEOUT = 10 - class Entity(LogObject, ConfiguratorObject): NAME = "Unnamed" ALLOW_MULTI_INSTANCE = False - def __init__(self, configurations) -> None: + def __init__(self, single_configuration: SingleConfiguration) -> None: + # Prepare the entity self.entitySensors = [] self.entityCommands = [] - self.configurations = configurations - self.SetTagFromConfiguration() + self.configurations = single_configuration + self.tag = self.configurations.GetTag() # When I update the values this number changes (randomly) so each warehouse knows I have updated self.valuesID = 0 - self.updateTimeout = DEFAULT_UPDATE_TIMEOUT + + self.updateTimeout = float(SettingsManager().GetFromConfigurations(CONFIG_KEY_UPDATE_INTERVAL)) def Initialize(self): """ Must be implemented in sub-classes, may be useful here to use the configuration """ @@ -145,7 +146,7 @@ def GetEntityName(self) -> str: def GetEntityTag(self) -> str: """ Return entity identifier tag """ - return self.tag # Set from SetTagFromConfiguration on entity init + return self.tag def GetEntityNameWithTag(self) -> str: """ Return entity name and tag combined (or name alone if no tag is present) """ @@ -163,13 +164,6 @@ def GetEntityId(self) -> str: def LogSource(self): return self.GetEntityId() - def SetTagFromConfiguration(self): - """ Set tag from configuration or set it blank if not present there """ - if self.GetConfigurations() is not None and KEY_ENTITY_TAG in self.GetConfigurations(): - self.tag = self.GetConfigurations()[KEY_ENTITY_TAG] - else: - self.tag = "" - def RunCommand(self, command: str | list, command_name: str = "", diff --git a/IoTuring/Logger/LogLevel.py b/IoTuring/Logger/LogLevel.py index 01575055e..14f6e7d91 100644 --- a/IoTuring/Logger/LogLevel.py +++ b/IoTuring/Logger/LogLevel.py @@ -27,15 +27,6 @@ def __str__(self) -> str: def __int__(self) -> int: return self.number - def get_colored_string(self, string: str) -> str: - """ Get colored text according to LogLevel """ - if self.color: - out_string = self.color + string + Colors.reset - else: - out_string = string - return out_string - - class LogLevelObject: """ Base class for loglevel properties """ diff --git a/IoTuring/Logger/Logger.py b/IoTuring/Logger/Logger.py index 4cc098b45..8caaf2c5c 100644 --- a/IoTuring/Logger/Logger.py +++ b/IoTuring/Logger/Logger.py @@ -1,15 +1,21 @@ -import sys -import os -import inspect +from __future__ import annotations + from datetime import datetime import json import threading -from io import TextIOWrapper +from pathlib import Path + + from IoTuring.Logger import consts from IoTuring.Logger.LogLevel import LogLevelObject, LogLevel +from IoTuring.Logger.Colors import Colors from IoTuring.Exceptions.Exceptions import UnknownLoglevelException +from IoTuring.MyApp.SystemConsts import OperatingSystemDetection as OsD +from IoTuring.MyApp.SystemConsts import TerminalDetection as TD + + class Singleton(type): """ Metaclass for singleton classes """ @@ -18,7 +24,7 @@ class Singleton(type): _instances = {} - def __call__(cls): + def __call__(cls): # type: ignore if cls not in cls._instances: cls._instances[cls] = super(Singleton, cls).__call__() return cls._instances[cls] @@ -26,48 +32,96 @@ def __call__(cls): class Logger(LogLevelObject, metaclass=Singleton): - lock = threading.Lock() + startupTimeString = datetime.now().strftime( + consts.LOG_FILENAME_FORMAT).replace(":", "_") - log_filename = "" - log_file_descriptor = None + terminalSupportsColors = TD.CheckTerminalSupportsColors() # Default log levels: - console_log_level = LogLevel(consts.CONSOLE_LOG_LEVEL) - file_log_level = LogLevel(consts.FILE_LOG_LEVEL) + console_log_level = LogLevel(consts.DEFAULT_LOG_LEVEL) + file_log_level = LogLevel(consts.DEFAULT_LOG_LEVEL) + + # File logs stored here, before configurator loaded. + file_log_buffer = [] + + # For writing to file: + file_log_descriptor = None + lock = None def __init__(self) -> None: - self.terminalSupportsColors = Logger.checkTerminalSupportsColors() + # Set loglevel from envvar: + self.SetConsoleLogLevel() - # Prepare the log - self.SetLogFilename() - # Open the file descriptor - self.GetLogFileDescriptor() + diag_strings = [ + "Logger Init", + f"Console Loglevel: {str(self.console_log_level)}", + f"File Loglevel: {str(self.file_log_level)}" + ] + + self.Log(self.LOG_DEVELOPMENT, "Logger", diag_strings) + + + + def SetConsoleLogLevel(self, loglevel_string:str = ""): + new_loglevel = None # Override log level from envvar: + if OsD.GetEnv("IOTURING_LOG_LEVEL"): + new_loglevel = self.SanitizeLoglevel(OsD.GetEnv("IOTURING_LOG_LEVEL")) + + # Read from config: + if not new_loglevel and loglevel_string: + new_loglevel = self.SanitizeLoglevel(loglevel_string) + + if new_loglevel: + self.console_log_level = new_loglevel + self.Log(self.LOG_DEBUG, "Logger", + f"Set Console Loglevel to: {str(self.console_log_level)}") + else: + self.Log(self.LOG_DEBUG, "Logger", + f"Console Loglevel not changed.") + + def SanitizeLoglevel(self, loglevel_string) -> LogLevel | None: try: - if os.getenv("IOTURING_LOG_LEVEL"): - level_override = LogLevel(str(os.getenv("IOTURING_LOG_LEVEL"))) - self.console_log_level = level_override + l = LogLevel(str(loglevel_string)) + return l except UnknownLoglevelException as e: - self.Log(self.LOG_ERROR, "Logger", + self.Log(self.LOG_WARNING, "Logger", f"Unknown Loglevel: {e.loglevel}") + return None + + # Called from LogSettings + def StartFileLogging(self, loglevel_string:str, log_dir_path:Path) -> None: + + self.file_log_level = self.SanitizeLoglevel(loglevel_string) or self.file_log_level + + + if not log_dir_path.exists(): + log_dir_path.mkdir(parents=True) + + file_log_filename = log_dir_path.joinpath(self.startupTimeString) + + self.file_log_descriptor = \ + open(file_log_filename, "a", encoding="utf-8") + + self.lock = threading.Lock() - def SetLogFilename(self) -> str: - """ Set filename with timestamp and also call setup folder """ - dateTimeObj = datetime.now() - self.log_filename = os.path.join( - self.SetupFolder(), dateTimeObj.strftime(consts.LOG_FILENAME_FORMAT).replace(":", "_")) - return self.log_filename + self.Log(self.LOG_DEBUG, "Logger", + f"File Log setup finished.") - def SetupFolder(self) -> str: - """ Check if exists (or create) the folder of logs inside this file's folder """ - thisFolder = os.path.dirname(inspect.getfile(Logger)) - newFolder = os.path.join(thisFolder, consts.LOGS_FOLDER) - if not os.path.exists(newFolder): - os.mkdir(newFolder) + # Write buffer to disk: + while self.file_log_buffer: + line = self.file_log_buffer[0] + self.WriteFileLogLine(line["string"], line["loglevel"]) + del self.file_log_buffer[0] - return newFolder + # Called from LogSettings + def DisableFileLogging(self) -> None: + self.file_log_buffer = None + self.CloseFile() + self.Log(self.LOG_DEBUG, "Logger", + f"File logging disabled.") def GetMessageDatetimeString(self) -> str: now = datetime.now() @@ -75,14 +129,12 @@ def GetMessageDatetimeString(self) -> str: # LOG - def Log(self, loglevel: LogLevel, source: str, message, printToConsole=True, writeToFile=True) -> None: + def Log(self, loglevel: LogLevel, source: str, message, **kwargs) -> None: if type(message) == dict: - self.LogDict(loglevel, source, message, - printToConsole, writeToFile) + self.LogDict(loglevel, source, message, **kwargs) return # Log dict will call this function so I don't need to go down at the moment elif type(message) == list: - self.LogList(loglevel, source, message, - printToConsole, writeToFile) + self.LogList(loglevel, source, message, **kwargs) return # Log list will call this function so I don't need to go down at the moment message = str(message) @@ -90,8 +142,7 @@ def Log(self, loglevel: LogLevel, source: str, message, printToConsole=True, wri messageLines = message.split("\n") if len(messageLines) > 1: for line in messageLines: - self.Log(loglevel, source, line, - printToConsole, writeToFile) + self.Log(loglevel, source, line, **kwargs) return # Stop the function then because I've already called this function from each line so I don't have to go down here prestring = f"[ {self.GetMessageDatetimeString()} | {str(loglevel).center(consts.STRINGS_LENGTH[0])} | " \ @@ -105,74 +156,75 @@ def Log(self, loglevel: LogLevel, source: str, message, printToConsole=True, wri # then I add the dash to the row if (len(message) > 0 and string[-1] != " " and string[-1] != "." and string[-1] != ","): string = string + '-' # Print new line indicator if I will go down in the next iteration - self.PrintAndSave(string, loglevel, printToConsole, writeToFile) + self.PrintAndSave(string, loglevel, **kwargs) # -1 + space cause if the char in the prestring isn't a space, it will be directly attached to my message without a space prestring = (len(prestring)-consts.PRESTRING_MESSAGE_SEPARATOR_LEN) * \ consts.LONG_MESSAGE_PRESTRING_CHAR+consts.PRESTRING_MESSAGE_SEPARATOR_LEN*' ' - def LogDict(self, loglevel, source, message_dict: dict, *args): + def LogDict(self, loglevel, source, message_dict: dict, **kwargs): try: string = json.dumps(message_dict, indent=4, sort_keys=False, default=lambda o: '') lines = string.splitlines() for line in lines: - self.Log(loglevel, source, "> "+line, *args) + self.Log(loglevel, source, "> "+line, **kwargs) except Exception as e: - self.Log(self.LOG_ERROR, source, "Can't print dictionary content") + self.Log(self.LOG_ERROR, source, + "Can't print dictionary content", **kwargs) - def LogList(self, loglevel, source, message_list: list, *args): + def LogList(self, loglevel, source, message_list: list, **kwargs): try: for index, item in enumerate(message_list): if type(item) == dict or type(item) == list: - self.Log(loglevel, source, "Item #"+str(index), *args) - self.Log(loglevel, source, item, *args) + self.Log(loglevel, source, "Item #"+str(index), **kwargs) + self.Log(loglevel, source, item, **kwargs) else: self.Log(loglevel, source, - f"{str(index)}: {str(item)}", *args) + f"{str(index)}: {str(item)}", **kwargs) except: - self.Log(self.LOG_ERROR, source, "Can't print dictionary content") + self.Log(self.LOG_ERROR, source, + "Can't print dictionary content", **kwargs) # Both print and save to file - def PrintAndSave(self, string, loglevel: LogLevel, printToConsole=True, writeToFile=True) -> None: - - if printToConsole and int(loglevel) <= int(self.console_log_level): - self.ColoredPrint(string, loglevel) - - if writeToFile and int(loglevel) <= int(self.file_log_level): + def PrintAndSave(self, string: str, loglevel: LogLevel, **kwargs) -> None: + # kwargs defaults: + printToConsole = True if "printToConsole" not in kwargs else kwargs["printToConsole"] + writeToFile = True if "writeToFile" not in kwargs else kwargs["writeToFile"] + color = loglevel.color if "color" not in kwargs else kwargs["color"] + + if printToConsole and (int(loglevel) <= int(self.console_log_level)): + if self.terminalSupportsColors and color: + print(color + string + Colors.reset) + else: + print(string) + + # Config is not loaded, write to buffer: + if self.file_log_buffer is not None: + self.file_log_buffer.append({ + "string": string, + "loglevel": loglevel + }) + + # Real file logging + elif self.file_log_descriptor and writeToFile: + self.WriteFileLogLine(string, loglevel) + + def WriteFileLogLine(self, string: str, loglevel: LogLevel) -> None: + if not self.file_log_descriptor \ + or not self.lock: + raise Exception("File logging error! Descriptor or lock missing!") + if int(loglevel) <= int(self.file_log_level): # acquire the lock with self.lock: - self.GetLogFileDescriptor().write(string+' \n') + self.file_log_descriptor.write(string+' \n') # so I can see the log in real time from a reader - self.GetLogFileDescriptor().flush() - - def ColoredPrint(self, string, loglevel: LogLevel) -> None: - if not self.terminalSupportsColors: - print(string) - else: - print(loglevel.get_colored_string(string)) + self.file_log_descriptor.flush() - def GetLogFileDescriptor(self) -> TextIOWrapper: - if self.log_file_descriptor is None: - self.log_file_descriptor = open(self.log_filename, "a", encoding="utf-8") + def CloseFile(self) -> None: + if self.file_log_descriptor is not None: + self.file_log_descriptor.close() + self.file_log_descriptor = None - return self.log_file_descriptor - def CloseFile(self) -> None: - if self.log_file_descriptor is not None: - self.log_file_descriptor.close() - self.log_file_descriptor = None - - @staticmethod - def checkTerminalSupportsColors(): - """ - Returns True if the running system's terminal supports color, and False - otherwise. - """ - plat = sys.platform - supported_platform = plat != 'Pocket PC' and (plat != 'win32' or - 'ANSICON' in os.environ) - # isatty is not always implemented, #6223. - is_a_tty = hasattr(sys.stdout, 'isatty') and sys.stdout.isatty() - return supported_platform and is_a_tty diff --git a/IoTuring/Logger/consts.py b/IoTuring/Logger/consts.py index 8678c2e5a..416409d4c 100644 --- a/IoTuring/Logger/consts.py +++ b/IoTuring/Logger/consts.py @@ -56,7 +56,6 @@ # before those spaces I add this string LONG_MESSAGE_PRESTRING_CHAR = ' ' -CONSOLE_LOG_LEVEL = "LOG_INFO" -FILE_LOG_LEVEL = "LOG_INFO" +DEFAULT_LOG_LEVEL = "LOG_INFO" MESSAGE_WIDTH = 95 diff --git a/IoTuring/MyApp/App.py b/IoTuring/MyApp/App.py index 0467394ce..1442c9722 100644 --- a/IoTuring/MyApp/App.py +++ b/IoTuring/MyApp/App.py @@ -1,4 +1,5 @@ from importlib.metadata import metadata +from pathlib import Path class App(): METADATA = metadata('IoTuring') @@ -40,6 +41,10 @@ def getUrlHomepage() -> str: def getUrlReleases() -> str: return App.URL_RELEASES + @staticmethod + def getRootPath() -> Path: + return Path(__file__).parents[1] + def __str__(self) -> str: msg = "" msg += "Name: " + App.getName() + "\n" diff --git a/IoTuring/MyApp/SystemConsts/TerminalDetection.py b/IoTuring/MyApp/SystemConsts/TerminalDetection.py new file mode 100644 index 000000000..7d0c9f5ef --- /dev/null +++ b/IoTuring/MyApp/SystemConsts/TerminalDetection.py @@ -0,0 +1,16 @@ +import sys +import os + +class TerminalDetection: + @staticmethod + def CheckTerminalSupportsColors() -> bool: + """ + Returns True if the running system's terminal supports color, and False + otherwise. + """ + plat = sys.platform + supported_platform = plat != 'Pocket PC' and (plat != 'win32' or + 'ANSICON' in os.environ) + # isatty is not always implemented, #6223. + is_a_tty = hasattr(sys.stdout, 'isatty') and sys.stdout.isatty() + return supported_platform and is_a_tty \ No newline at end of file diff --git a/IoTuring/MyApp/SystemConsts/__init__.py b/IoTuring/MyApp/SystemConsts/__init__.py index dc709cddb..d30843129 100644 --- a/IoTuring/MyApp/SystemConsts/__init__.py +++ b/IoTuring/MyApp/SystemConsts/__init__.py @@ -1,2 +1,3 @@ from .DesktopEnvironmentDetection import DesktopEnvironmentDetection -from .OperatingSystemDetection import OperatingSystemDetection \ No newline at end of file +from .OperatingSystemDetection import OperatingSystemDetection +from .TerminalDetection import TerminalDetection \ No newline at end of file diff --git a/IoTuring/Settings/Deployments/AppSettings/AppSettings.py b/IoTuring/Settings/Deployments/AppSettings/AppSettings.py new file mode 100644 index 000000000..575492bc6 --- /dev/null +++ b/IoTuring/Settings/Deployments/AppSettings/AppSettings.py @@ -0,0 +1,25 @@ +from IoTuring.Configurator.MenuPreset import MenuPreset +from IoTuring.Configurator.ConfiguratorObject import ConfiguratorObject + + +CONFIG_KEY_UPDATE_INTERVAL = "update_interval" +CONFIG_KEY_SLOW_INTERVAL = "slow_interval" + + +class AppSettings(ConfiguratorObject): + """Singleton for storing AppSettings, not related to Entites or Warehouses """ + NAME = "App" + + @classmethod + def ConfigurationPreset(cls): + preset = MenuPreset() + + preset.AddEntry(name="Main update interval in seconds", + key=CONFIG_KEY_UPDATE_INTERVAL, mandatory=True, + question_type="integer", default=10) + + # preset.AddEntry(name="Secondary update interval in minutes", + # key=CONFIG_KEY_SLOW_INTERVAL, mandatory=True, + # question_type="integer", default=10) + + return preset diff --git a/IoTuring/Settings/Deployments/LogSettings/LogSettings.py b/IoTuring/Settings/Deployments/LogSettings/LogSettings.py new file mode 100644 index 000000000..d2c613913 --- /dev/null +++ b/IoTuring/Settings/Deployments/LogSettings/LogSettings.py @@ -0,0 +1,96 @@ +from pathlib import Path + +from IoTuring.Configurator.MenuPreset import MenuPreset +from IoTuring.Logger.Logger import Logger +from IoTuring.Configurator.ConfiguratorObject import ConfiguratorObject +from IoTuring.Configurator.Configuration import SingleConfiguration +from IoTuring.MyApp.App import App +from IoTuring.Logger import consts +from IoTuring.MyApp.SystemConsts import OperatingSystemDetection as OsD + + +# macOS dep (in PyObjC) +try: + from AppKit import * # type:ignore + from Foundation import * # type:ignore + macos_support = True +except: + macos_support = False + + +CONFIG_KEY_CONSOLE_LOG_LEVEL = "console_log_level" +CONFIG_KEY_FILE_LOG_LEVEL = "file_log_level" +CONFIG_KEY_FILE_LOG_ENABLED = "file_log_enabled" +CONFIG_KEY_FILE_LOG_PATH = "file_log_path" + + +LogLevelChoices = [{"name": l["string"], "value": l["const"]} + for l in consts.LOG_LEVELS] + + +class LogSettings(ConfiguratorObject): + NAME = "Log" + + def __init__(self, single_configuration: SingleConfiguration) -> None: + super().__init__(single_configuration) + + # Load settings to logger: + logger = Logger() + + logger.SetConsoleLogLevel( + self.GetFromConfigurations(CONFIG_KEY_CONSOLE_LOG_LEVEL)) + + if self.GetTrueOrFalseFromConfigurations(CONFIG_KEY_FILE_LOG_ENABLED): + try: + logger.StartFileLogging(self.GetFromConfigurations(CONFIG_KEY_FILE_LOG_LEVEL), + Path(self.GetFromConfigurations(CONFIG_KEY_FILE_LOG_PATH))) + except: + logger.DisableFileLogging() + else: + logger.DisableFileLogging() + + @classmethod + def ConfigurationPreset(cls): + preset = MenuPreset() + + preset.AddEntry(name="Console log level", key=CONFIG_KEY_CONSOLE_LOG_LEVEL, + question_type="select", mandatory=True, default=consts.DEFAULT_LOG_LEVEL, + instruction="IOTURING_LOG_LEVEL envvar overwrites this setting!", + choices=LogLevelChoices) + + preset.AddEntry(name="Enable file logging", key=CONFIG_KEY_FILE_LOG_ENABLED, + question_type="yesno", default="N") + + preset.AddEntry(name="File log level", key=CONFIG_KEY_FILE_LOG_LEVEL, + question_type="select", mandatory=True, default=consts.DEFAULT_LOG_LEVEL, + choices=LogLevelChoices) + + preset.AddEntry(name="File log path", key=CONFIG_KEY_FILE_LOG_PATH, + question_type="filepath", mandatory=True, default=cls.GetDefaultLogPath(), + instruction="Directory where log files will be saved") + + return preset + + @staticmethod + def GetDefaultLogPath() -> str: + + default_path = App.getRootPath().joinpath("Logger") + base_path = None + + if OsD.IsMacos() and macos_support: + base_path = \ + Path(NSSearchPathForDirectoriesInDomains( # type: ignore + NSLibraryDirectory, # type: ignore + NSUserDomainMask, True)[0]) # type: ignore + elif OsD.IsWindows(): + base_path = Path(OsD.GetEnv("LOCALAPPDATA")) + elif OsD.IsLinux(): + if OsD.GetEnv("XDG_CACHE_HOME"): + base_path = Path(OsD.GetEnv("XDG_CACHE_HOME")) + elif OsD.GetEnv("HOME"): + base_path = Path(OsD.GetEnv("HOME")).joinpath(".cache") + + if base_path: + default_path = base_path.joinpath(App.getName()) + + return str(default_path.joinpath(consts.LOGS_FOLDER)) diff --git a/IoTuring/Settings/SettingsManager.py b/IoTuring/Settings/SettingsManager.py new file mode 100644 index 000000000..e6ef5bd6a --- /dev/null +++ b/IoTuring/Settings/SettingsManager.py @@ -0,0 +1,20 @@ +from __future__ import annotations + +from IoTuring.Logger.Logger import Singleton +from IoTuring.Configurator.ConfiguratorObject import ConfiguratorObject +from IoTuring.Configurator.Configuration import SingleConfiguration + + +class SettingsManager(ConfiguratorObject, metaclass=Singleton): + """Singleton for storing configurations of Settings""" + + def __init__(self) -> None: + self.configurations = SingleConfiguration() + + def AddSettings(self, setting_entities: list[ConfiguratorObject]) -> None: + for setting_entity in setting_entities: + + conf_dict = setting_entity.GetConfigurations().ToDict(include_type=False) + + for conf_key, conf_value in conf_dict.items(): + self.GetConfigurations().UpdateConfigValue(conf_key, conf_value) diff --git a/IoTuring/Warehouse/Warehouse.py b/IoTuring/Warehouse/Warehouse.py index 82a84855e..3178b5c58 100644 --- a/IoTuring/Warehouse/Warehouse.py +++ b/IoTuring/Warehouse/Warehouse.py @@ -3,19 +3,21 @@ from IoTuring.Logger.LogObject import LogObject from IoTuring.Configurator.ConfiguratorObject import ConfiguratorObject from IoTuring.Entity.EntityManager import EntityManager +from IoTuring.Settings.Deployments.AppSettings.AppSettings import CONFIG_KEY_UPDATE_INTERVAL +from IoTuring.Settings.SettingsManager import SettingsManager +from IoTuring.Configurator.Configuration import SingleConfiguration from threading import Thread import time -DEFAULT_LOOP_TIMEOUT = 10 - class Warehouse(LogObject, ConfiguratorObject): NAME = "Unnamed" - def __init__(self, configurations) -> None: - self.loopTimeout = DEFAULT_LOOP_TIMEOUT - self.configurations = configurations + def __init__(self, single_configuration: SingleConfiguration) -> None: + self.loopTimeout = float(SettingsManager().GetFromConfigurations(CONFIG_KEY_UPDATE_INTERVAL)) + self.configurations = single_configuration + def Start(self) -> None: """ Initial configuration and start the thread that will loop the Warehouse.Loop() function""" diff --git a/IoTuring/__init__.py b/IoTuring/__init__.py index 6dfa3682c..c5ed87c45 100644 --- a/IoTuring/__init__.py +++ b/IoTuring/__init__.py @@ -11,6 +11,7 @@ from IoTuring.Entity.EntityManager import EntityManager from IoTuring.Logger.Logger import Logger from IoTuring.Logger.Colors import Colors +from IoTuring.Settings.SettingsManager import SettingsManager warehouses = [] entities = [] @@ -52,6 +53,9 @@ def loop(): logger = Logger() configurator = Configurator() + # Load Logger settings before everything: + ConfiguratorLoader(configurator).LoadSettings() + logger.Log(Logger.LOG_DEBUG, "App", f"Selected options: {vars(args)}") if args.configurator: @@ -72,17 +76,24 @@ def loop(): # This have to start after configurator.Menu(), otherwise won't work starting from the menu signal.signal(signal.SIGINT, Exit_SIGINT_handler) + # Load Settings: + settings = ConfiguratorLoader(configurator).LoadSettings() + sM = SettingsManager() + sM.AddSettings(settings) + logger.Log(Logger.LOG_INFO, "App", App()) # Print App info - logger.Log(Logger.LOG_INFO, "Configurator", - "Run the script with -c to enter configuration mode") - eM = EntityManager() + # Add help if not started from Configurator + if not args.configurator: + logger.Log(Logger.LOG_INFO, "Configurator", + "Run the script with -c to enter configuration mode") # These will be done from the configuration reader entities = ConfiguratorLoader(configurator).LoadEntities() warehouses = ConfiguratorLoader(configurator).LoadWarehouses() # Add entites to the EntityManager + eM = EntityManager() for entity in entities: eM.AddActiveEntity(entity) @@ -108,18 +119,10 @@ def Exit_SIGINT_handler(sig=None, frame=None): logger.Log(Logger.LOG_INFO, "Main", "Application closed by SigInt", printToConsole=False) # to file - messages = ["Exiting...", - "Thanks for using IoTuring !"] print() # New line - for message in messages: - text = "" - if (Logger.checkTerminalSupportsColors()): - text += Colors.cyan - text += message - if (Logger.checkTerminalSupportsColors()): - text += Colors.reset - logger.Log(Logger.LOG_INFO, "Main", text, - writeToFile=False) # to terminal + goodByeMessage = "Exiting...\nThanks for using IoTuring !" + logger.Log(Logger.LOG_INFO, "Main", goodByeMessage, + writeToFile=False, color=Colors.cyan) # to terminal logger.CloseFile() sys.exit(0)