diff --git a/.gitignore b/.gitignore index e2ae38d70..60421e4c2 100644 --- a/.gitignore +++ b/.gitignore @@ -8,4 +8,4 @@ IoTuring/Configurator/configurations.json* IoTuring/Configurator/dontmoveconf.itg* .venv build -*.egg-info \ No newline at end of file +*.egg-info diff --git a/IoTuring/Configurator/Configurator.py b/IoTuring/Configurator/Configurator.py index 3e84957ce..da778e2fd 100644 --- a/IoTuring/Configurator/Configurator.py +++ b/IoTuring/Configurator/Configurator.py @@ -74,9 +74,9 @@ def OpenConfigInEditor(self): editor_command = next( (e for e in editors if OsD.CommandExists(e)), "") if editor_command: - subprocess.run(f'{editor_command} "{config_path}"', - shell=True, close_fds=True) - return + OsD.RunCommand(f'{editor_command} "{config_path}"', + shell=True, close_fds=True, capture_output=False) + return self.Log(self.LOG_WARNING, "No editor found") diff --git a/IoTuring/Entity/Deployments/Wifi/Wifi.py b/IoTuring/Entity/Deployments/Wifi/Wifi.py new file mode 100644 index 000000000..eb2670420 --- /dev/null +++ b/IoTuring/Entity/Deployments/Wifi/Wifi.py @@ -0,0 +1,270 @@ +import re +from socket import AddressFamily + +import psutil +import locale + +from IoTuring.Configurator.MenuPreset import MenuPreset +from IoTuring.Entity.Entity import Entity +from IoTuring.Entity.EntityData import EntitySensor +from IoTuring.Entity.ValueFormat import ValueFormatterOptions +from IoTuring.MyApp.SystemConsts import OperatingSystemDetection as OsD + +VALUEFORMATTEROPTIONS_DBM = ValueFormatterOptions(ValueFormatterOptions.TYPE_RADIOPOWER) +VALUEFORMATTEROPTIONS_BYTES = ValueFormatterOptions(ValueFormatterOptions.TYPE_BYTE) +VALUEFORMATTEROPTIONS_BIT_PER_SECOND = ValueFormatterOptions(ValueFormatterOptions.TYPE_BIT_PER_SECOND) + +WIFI_CHOICE_STRING = "Name: {:<15}, IP: {:<16}, MAC: {:<11}" + +CONFIG_KEY_WIFI = "wifi" + +SIGNAL_UNIT = "dBm" # windows also supports "%" +SHOW_NA = False # don't show not available extraAttributes + +KEY_SIGNAL_STRENGTH_DBM = "signal_strength_dbm" + +# LINUX +EXTRA_KEY_NAME = "name" +EXTRA_KEY_DESCRIPTION = "description" +EXTRA_KEY_PHYSICAL_ADDRESS = "physical_address" +EXTRA_KEY_STATE = "state" +EXTRA_KEY_BSSID = "BSSID" +EXTRA_KEY_SSID = "ssid" +EXTRA_KEY_FREQUENCY = "Frequency" +EXTRA_KEY_SIGNAL = "Signal" +EXTRA_KEY_RX_BYTES = "RX_Bytes" +EXTRA_KEY_TX_BYTES = "TX_Bytes" +EXTRA_KEY_RX_BITRATE = "RX_Bitrate" +EXTRA_KEY_TX_BITRATE = "TX_Bitrate" +EXTRA_KEY_BSS_FLAGS = "BSS_Flags" +EXTRA_KEY_DTIM_PERIOD = "DTIM_Period" +EXTRA_KEY_BEACON_INTERVAL = "Beacon_Interval" + +# MACOS +EXTRA_KEY_AGRCTLRSSI = "agrCtlRSSI" +EXTRA_KEY_AGREXTRSSI = "agrExtRSSI" +# state already in linux config +EXTRA_KEY_OP_MODE = "OP_mode" +EXTRA_KEY_LASTTXRATE = "Last_TX_Rate" +EXTRA_KEY_MAXRATE = "Max_Rate" +# BSSID already in linux config +# SSID already in linux config +EXTRA_KEY_CHANNEL = "Channel" + + + +class Wifi(Entity): + NAME = "Wifi" + ALLOW_MULTI_INSTANCE = True + + def Initialize(self): + self.platform = OsD.GetOs() + self.locale_str, _ = locale.getdefaultlocale() + self.language: str = self.locale_str.split("_")[0] + self.showNA = SHOW_NA + + self.wifiInterface = self.GetFromConfigurations(CONFIG_KEY_WIFI) + + self.commands = { + OsD.LINUX: ["iw", "dev", self.wifiInterface, "link"], + OsD.MACOS: [ + "/System/Library/PrivateFrameworks/Apple80211.framework/Versions/Current/Resources/airport", + "-I", + ], + } + self.patterns = { + OsD.LINUX: { + "BSSID": r'Connected to (\S+) \(on \S+\)', + "SSID": r'SSID: (.+)', + "Frequency": r'freq: ([\d.]+)', + "RX_bytes": r'RX: (\d+) bytes \(\d+ packets\)', + "TX_bytes": r'TX: (\d+) bytes \(\d+ packets\)', + "Signal": r'signal: (-?\d+) dBm', + "RX_bitrate": r'rx bitrate: ([\d.]+) MBit/s', + "TX_bitrate": r'tx bitrate: ([\d.]+) MBit/s', + "BSS_flags": r'bss flags: (.+)', + "DTIM_period": r'dtim period: (\d+)', + "Beacon_interval": r'beacon int: (\d+)' + }, + OsD.MACOS: { # no language differentiation in macos: always english + "agrCtlRSSI": r"[^\n][\s]*agrCtlRSSI:\s+(-?\d+)\n", + "agrExtRSSI": r"[^\n][\s]*agrExtRSSI:\s+(-?\d+)\n", + "state": r"[^\n][\s]*state:\s+(\w+)\n", + "op mode": r"[^\n][\s]*op mode:\s+(\w+)\n", + "lastTxRate": r"[^\n][\s]*lastTxRate:\s+(\d+)\n", + "maxRate": r"[^\n][\s]*maxRate:\s+(\d+)\n", + "BSSID": r"[^\n][\s]*BSSID:\s+([\w:]+)\n", + "SSID": r"\n[\s]*SSID:\s+([\w\s]+)\n", + "channel": r"[^\n][\s]*channel:\s+([\d,]+)\n", + }, + } + + self.keySignalStrength = KEY_SIGNAL_STRENGTH_DBM + + self.valueFormatterOptions: dict = { + KEY_SIGNAL_STRENGTH_DBM : VALUEFORMATTEROPTIONS_DBM, + EXTRA_KEY_RX_BITRATE : VALUEFORMATTEROPTIONS_BIT_PER_SECOND, + EXTRA_KEY_RX_BYTES : VALUEFORMATTEROPTIONS_BYTES + } + + self.RegisterEntitySensor( + EntitySensor( + self, + key=self.keySignalStrength, + supportsExtraAttributes=True, + valueFormatterOptions=self.valueFormatterOptions[KEY_SIGNAL_STRENGTH_DBM], + ), + ) + + def Update(self): + p = self.RunCommand(self.commands[self.platform]) + if not p.stdout: + raise Exception("error in GetWirelessInfo\n", p.stderr) + wifiInfo = self.GetWirelessInfo(p.stdout) + if not wifiInfo: + raise Exception("error while parsing wirelessInfo") + # set signal strength + elif self.platform == OsD.LINUX and "Signal" in wifiInfo: + self.SetEntitySensorValue( + key=self.keySignalStrength, value=wifiInfo["Signal"] + ) + elif self.platform == OsD.MACOS and "agrCtlRSSI" in wifiInfo: + self.SetEntitySensorValue( + key=self.keySignalStrength, value=wifiInfo["agrCtlRSSI"] + ) + else: # if there is no signal level found the interface might not be connected to an access point + self.SetEntitySensorValue(key=self.keySignalStrength, value="not connected") + + # Extra attributes + for key in self.patterns[self.platform]: + extraKey = "EXTRA_KEY_" + key.upper().replace(" ", "_").replace(".", "_") + if key in wifiInfo: + attributevalue = wifiInfo[key] + elif self.showNA: + attributevalue = "not available" + else: + continue + + self.SetEntitySensorExtraAttribute( + sensorDataKey=self.keySignalStrength, + attributeKey=globals()[extraKey], + attributeValue=attributevalue, + valueFormatterOptions = self.valueFormatterOptions[extraKey] if extraKey in self.valueFormatterOptions else None + ) + + def GetWirelessInfo(self, stdout): + wifi_info = {} + for key, pattern in self.patterns[self.platform].items(): + match = re.search(pattern, stdout, re.IGNORECASE) + if match: + wifi_info[key] = match.group(1) if match.group(1) else match.group(0) + return wifi_info + + @classmethod + def ConfigurationPreset(cls) -> MenuPreset: + NIC_CHOICES = Wifi.GetWifiNics(getInfo=True) + + preset = MenuPreset() + preset.AddEntry( + name="Interface to check", + key=CONFIG_KEY_WIFI, + mandatory=True, + question_type="select", + choices=NIC_CHOICES, + ) + return preset + + @staticmethod + def GetWifiNics(getInfo=True): + interfaces = psutil.net_if_addrs() + NIC_CHOICES = [] + + def appendNicChoice(interfaceName, nicip4="", nicip6="", nicmac=""): + NIC_CHOICES.append( + { + "name": WIFI_CHOICE_STRING.format( + interfaceName, + nicip4 if nicip4 else nicip6, # defaults to showing ipv4 + nicmac, + ), + "value": interfaceName, + } + ) + + ip4 = "" + ip6 = "" + mac = "" + + if OsD.IsLinux(): + for interface in interfaces: + p = OsD.RunCommand(["iw", "dev", interface, "link"]) + if ( + p.returncode > 0 + ): # if the returncode is 0 iwconfig succeeded, else continue with next interface + continue + if not getInfo: + appendNicChoice(interface) + continue + else: + nicinfo = interfaces[interface] # TODO Typehint + for nicaddr in nicinfo: + if nicaddr.family == AddressFamily.AF_INET: + ip4 = nicaddr.address + continue + elif nicaddr.family == AddressFamily.AF_INET6: + ip6 = nicaddr.address + continue + elif nicaddr.family == psutil.AF_LINK: + mac = nicaddr.address + continue + appendNicChoice(interface, ip4, ip6, mac) + return NIC_CHOICES + + elif OsD.IsMacos(): + for interface in interfaces: + p = OsD.RunCommand(["airport", interface]) + if ( + p.returncode > 0 + ): # if the returncode is 0 iwconfig succeeded, else continue with next interface + continue + nicinfo = interfaces[interface] # TODO Typehint + if not getInfo: + appendNicChoice(interface) + continue + else: + for nicaddr in nicinfo: + if nicaddr.family == AddressFamily.AF_INET: + ip4 = nicaddr.address + continue + elif nicaddr.family == AddressFamily.AF_INET6: + ip6 = nicaddr.address + continue + elif nicaddr.family == psutil.AF_LINK: + mac = nicaddr.address + continue + + appendNicChoice(interface, ip4, ip6, mac) + return NIC_CHOICES + + @classmethod + def CheckSystemSupport(cls): + if OsD.IsLinux(): + if not OsD.CommandExists("iw"): + raise Exception("iw not found") + wifiNics = Wifi.GetWifiNics(getInfo=False) + if not wifiNics: + raise Exception("no wireless interface found") + + elif OsD.IsWindows(): + # Windows support is WIP https://github.com/richibrics/IoTuring/pull/89 + raise Exception("Windows is not supported at the moment") + + elif OsD.IsMacos(): + if not OsD.CommandExists("/System/Library/PrivateFrameworks/Apple80211.framework/Versions/Current/Resources/airport"): + raise Exception("airport not found") + wifiNics = Wifi.GetWifiNics(getInfo=False) + if not wifiNics: + raise Exception("no wireless interface found") + + else: + raise Exception("OS detection failed") diff --git a/IoTuring/Entity/Entity.py b/IoTuring/Entity/Entity.py index 1f5f258ad..06369058f 100644 --- a/IoTuring/Entity/Entity.py +++ b/IoTuring/Entity/Entity.py @@ -187,29 +187,14 @@ def RunCommand(self, subprocess.CompletedProcess: See subprocess docs """ - # different defaults than in subprocess: - defaults = { - "capture_output": True, - "text": True - } - - for param, value in defaults.items(): - if param not in kwargs: - kwargs[param] = value - try: - if shell == False and isinstance(command, str): - runcommand = command.split() - else: - runcommand = command if command_name: command_name = self.NAME + "-" + command_name else: command_name = self.NAME - p = subprocess.run( - runcommand, shell=shell, **kwargs) + p = OsD.RunCommand(command, shell=shell, **kwargs) self.Log(self.LOG_DEBUG, f"Called {command_name} command: {p}") @@ -218,11 +203,12 @@ def RunCommand(self, if p.stderr: self.Log(error_loglevel, f"Error during {command_name} command: {p.stderr}") + + return p except Exception as e: raise Exception(f"Error during {command_name} command: {str(e)}") - return p @classmethod def CheckSystemSupport(cls): diff --git a/IoTuring/Entity/ValueFormat/ValueFormatter.py b/IoTuring/Entity/ValueFormat/ValueFormatter.py index ec943069e..01011d1e5 100644 --- a/IoTuring/Entity/ValueFormat/ValueFormatter.py +++ b/IoTuring/Entity/ValueFormat/ValueFormatter.py @@ -13,11 +13,14 @@ # Lists of measure units BYTE_SIZES = ['B', 'KB', 'MB', 'GB', 'TB', 'PB'] +BYTE_PER_SECOND_SIZES = ['Bps' ,'KBps', 'MBps', 'GBps', 'TBps', 'PBps'] +BIT_PER_SECOND_SIZES = ['bps' ,'Kbps', 'Mbps', 'Gbps', 'Tbps', 'Pbps'] TIME_SIZES = ['s', 'm', 'h', 'd'] FREQUENCY_SIZES = ['Hz', 'kHz', 'MHz', 'GHz'] TIME_SIZES_DIVIDERS = [1, 60, 60, 24] CELSIUS_UNIT = '°C' ROTATION = ['rpm'] +RADIOPOWER =['dBm'] SPACE_BEFORE_UNIT = ' ' @@ -53,11 +56,17 @@ def _ParseValue(value, options: ValueFormatterOptions | None, includeUnit: bool) return ValueFormatter.TemperatureCelsiusFormatter(value, options, includeUnit) elif valueType == ValueFormatterOptions.TYPE_ROTATION: return ValueFormatter.RoundsPerMinuteFormatter(value, options, includeUnit) + elif valueType ==ValueFormatterOptions.TYPE_RADIOPOWER: + return ValueFormatter.RadioPowerFormatter(value, options, includeUnit) elif valueType == ValueFormatterOptions.TYPE_PERCENTAGE: if includeUnit: return str(value) + SPACE_BEFORE_UNIT + '%' else: return str(value) + elif valueType == ValueFormatterOptions.TYPE_BIT_PER_SECOND: + return ValueFormatter.BitPerSecondFormatter(value, options, includeUnit) + elif valueType == ValueFormatterOptions.TYPE_BYTE_PER_SECOND: + return ValueFormatter.BytePerSecondFormatter(value, options, includeUnit) else: return str(value) @@ -168,8 +177,54 @@ def RoundsPerMinuteFormatter(value, options: ValueFormatterOptions, includeUnit: result = result + SPACE_BEFORE_UNIT + ROTATION[0] return result + @staticmethod + def RadioPowerFormatter(value, options: ValueFormatterOptions, includeUnit: bool): + + value = ValueFormatter.roundValue(value, options) + + if includeUnit: + return str(value) + SPACE_BEFORE_UNIT + 'dBm' + else: + return str(value) + + @staticmethod + def BytePerSecondFormatter(value, options: ValueFormatterOptions, includeUnit: bool): + # Get value in hertz, and adjustable + asked_size = options.get_adjust_size() + + if asked_size and asked_size in BYTE_PER_SECOND_SIZES: + index = BYTE_PER_SECOND_SIZES.index(asked_size) + value = value/pow(1000,index) + else: + index = 0 + + value = ValueFormatter.roundValue(value, options) + + if includeUnit: + return str(value) + SPACE_BEFORE_UNIT + BYTE_PER_SECOND_SIZES[index] + else: + return str(value) + + def BitPerSecondFormatter(value, options: ValueFormatterOptions, includeUnit: bool): + # Get value in hertz, and adjustable + asked_size = options.get_adjust_size() + + if asked_size and asked_size in BYTE_PER_SECOND_SIZES: + index = BIT_PER_SECOND_SIZES.index(asked_size) + value = value/pow(1000,index) + else: + index = 0 + + value = ValueFormatter.roundValue(value, options) + + if includeUnit: + return str(value) + SPACE_BEFORE_UNIT + BIT_PER_SECOND_SIZES[index] + else: + return str(value) + @staticmethod def roundValue(value, options: ValueFormatterOptions): if options.get_decimals() != ValueFormatterOptions.DO_NOT_TOUCH_DECIMALS: return round(value, options.get_decimals()) - return value \ No newline at end of file + return value + diff --git a/IoTuring/Entity/ValueFormat/ValueFormatterOptions.py b/IoTuring/Entity/ValueFormat/ValueFormatterOptions.py index 3fc676ecb..272f46c3b 100644 --- a/IoTuring/Entity/ValueFormat/ValueFormatterOptions.py +++ b/IoTuring/Entity/ValueFormat/ValueFormatterOptions.py @@ -7,6 +7,9 @@ class ValueFormatterOptions(): TYPE_MILLISECONDS = 5 TYPE_TEMPERATURE = 6 TYPE_ROTATION = 7 + TYPE_RADIOPOWER = 8 + TYPE_BYTE_PER_SECOND = 9 + TYPE_BIT_PER_SECOND = 10 DO_NOT_TOUCH_DECIMALS = -1 diff --git a/IoTuring/MyApp/SystemConsts/OperatingSystemDetection.py b/IoTuring/MyApp/SystemConsts/OperatingSystemDetection.py index d34c2b947..07f6b9d86 100644 --- a/IoTuring/MyApp/SystemConsts/OperatingSystemDetection.py +++ b/IoTuring/MyApp/SystemConsts/OperatingSystemDetection.py @@ -1,7 +1,10 @@ +from __future__ import annotations + import platform import os import psutil import shutil +import subprocess class OperatingSystemDetection(): OS_NAME = platform.system() @@ -57,6 +60,42 @@ def GetEnv(cls, envvar) -> str: env_value = "" return env_value + @staticmethod + def RunCommand(command: str | list, + shell: bool = False, + **kwargs) -> subprocess.CompletedProcess: + """Safely call a subprocess. Kwargs are other Subprocess options + + Args: + command (str | list): The command to call + shell (bool, optional): Run in shell. Defaults to False. + **kwargs: subprocess args + + Returns: + subprocess.CompletedProcess: See subprocess docs + """ + + # different defaults than in subprocess: + defaults = { + "capture_output": True, + "text": True + } + + for param, value in defaults.items(): + if param not in kwargs: + kwargs[param] = value + + if shell == False and isinstance(command, str): + runcommand = command.split() + else: + runcommand = command + + p = subprocess.run( + runcommand, shell=shell, **kwargs) + + return p + + @staticmethod def CommandExists(command) -> bool: """Check if a command exists""" diff --git a/IoTuring/Warehouse/Deployments/HomeAssistantWarehouse/entities.yaml b/IoTuring/Warehouse/Deployments/HomeAssistantWarehouse/entities.yaml index c5afe11e0..f00f615f0 100644 --- a/IoTuring/Warehouse/Deployments/HomeAssistantWarehouse/entities.yaml +++ b/IoTuring/Warehouse/Deployments/HomeAssistantWarehouse/entities.yaml @@ -97,6 +97,8 @@ Volume: icon: mdi:volume-high unit_of_measurement: "%" custom_type: number +Wifi: + icon: mdi:wifi TerminalPayloadCommand: name: Terminal Command icon: mdi:console-line