diff --git a/zha/application/platforms/sensor/__init__.py b/zha/application/platforms/sensor/__init__.py index fcfa8558e..7b534e26c 100644 --- a/zha/application/platforms/sensor/__init__.py +++ b/zha/application/platforms/sensor/__init__.py @@ -11,7 +11,7 @@ import logging import numbers import typing -from typing import TYPE_CHECKING, Any +from typing import TYPE_CHECKING, Any, cast from zhaquirks.danfoss import thermostat as danfoss_thermostat from zhaquirks.quirk_ids import DANFOSS_ALLY_THERMOSTAT @@ -21,7 +21,11 @@ from zigpy.zcl import foundation from zigpy.zcl.clusters.closures import WindowCovering from zigpy.zcl.clusters.general import Basic -from zigpy.zcl.clusters.smartenergy import Metering +from zigpy.zcl.clusters.smartenergy import ( + Metering, + MeteringUnitofMeasure, + NumberFormatting, +) from zha.application import Platform from zha.application.platforms import ( @@ -41,7 +45,10 @@ SensorDeviceClass, SensorStateClass, ) -from zha.application.platforms.sensor.helpers import resolution_to_decimal_precision +from zha.application.platforms.sensor.helpers import ( + create_number_formatter, + resolution_to_decimal_precision, +) from zha.application.registries import PLATFORM_ENTITIES from zha.decorators import periodic from zha.units import ( @@ -96,6 +103,12 @@ from zha.zigbee.device import Device from zha.zigbee.endpoint import Endpoint +DEFAULT_FORMATTING = NumberFormatting( + num_digits_right_of_decimal=1, + num_digits_left_of_decimal=15, + suppress_leading_zeros=1, +) + BATTERY_SIZES = { 0: "No battery", 1: "Built in", @@ -1100,9 +1113,43 @@ def state(self) -> dict[str, Any]: response["zcl_unit_of_measurement"] = self._cluster_handler.unit_of_measurement return response + @property + def _multiplier(self) -> int | float | None: + return self._cluster_handler.multiplier + + @_multiplier.setter + def _multiplier(self, value: int | float | None) -> None: + raise AttributeError("Cannot set multiplier directly") + + @property + def _divisor(self) -> int | float | None: + return self._cluster_handler.divisor + + @_divisor.setter + def _divisor(self, value: int | float | None) -> None: + raise AttributeError("Cannot set divisor directly") + def formatter(self, value: int) -> int | float: - """Pass through cluster handler formatter.""" - return self._cluster_handler.demand_formatter(value) + """Metering formatter.""" + # TODO: improve typing for base class + scaled_value = cast(float, super().formatter(value)) + + if ( + self._cluster_handler.unit_of_measurement + == MeteringUnitofMeasure.Kwh_and_Kwh_binary + ): + # Zigbee spec power unit is kW, but we show the value in W + value_watt = scaled_value * 1000 + if value_watt < 100: + return round(value_watt, 1) + return round(value_watt) + + demand_formater = create_number_formatter( + self._cluster_handler.demand_formatting + if self._cluster_handler.demand_formatting is not None + else DEFAULT_FORMATTING + ) + return float(demand_formater.format(scaled_value)) @dataclass(frozen=True, kw_only=True) @@ -1184,14 +1231,22 @@ class SmartEnergySummation(SmartEnergyMetering): } def formatter(self, value: int) -> int | float: - """Numeric pass-through formatter.""" - if self._cluster_handler.unit_of_measurement != 0: - return self._cluster_handler.summa_formatter(value) + """Metering summation formatter.""" + # TODO: improve typing for base class + scaled_value = cast(float, Sensor.formatter(self, value)) + + if ( + self._cluster_handler.unit_of_measurement + == MeteringUnitofMeasure.Kwh_and_Kwh_binary + ): + return scaled_value - return ( - float(self._cluster_handler.multiplier * value) - / self._cluster_handler.divisor + summation_formater = create_number_formatter( + self._cluster_handler.summation_formatting + if self._cluster_handler.summation_formatting is not None + else DEFAULT_FORMATTING ) + return float(summation_formater.format(scaled_value)) @MULTI_MATCH( diff --git a/zha/application/platforms/sensor/helpers.py b/zha/application/platforms/sensor/helpers.py index 316af4003..d7e1c3a60 100644 --- a/zha/application/platforms/sensor/helpers.py +++ b/zha/application/platforms/sensor/helpers.py @@ -1,7 +1,10 @@ """Helpers for sensor platform.""" +import functools from math import ceil, log10 +from zigpy.zcl.clusters.smartenergy import NumberFormatting + def resolution_to_decimal_precision( resolution: float, *, epsilon: float = 2**-23, max_digits: int = 16 @@ -27,3 +30,22 @@ def resolution_to_decimal_precision( # If nothing was found, fall back to the number of decimal places in epsilon return ceil(-log10(epsilon)) + + +@functools.lru_cache(maxsize=32) +def create_number_formatter(formatting: int) -> str: + """Return a formatting string, given the formatting value.""" + formatting_obj = NumberFormatting(formatting) + r_digits = formatting_obj.num_digits_right_of_decimal + l_digits = formatting_obj.num_digits_left_of_decimal + + if l_digits == 0: + l_digits = 15 + + width = r_digits + l_digits + (1 if r_digits > 0 else 0) + + if formatting_obj.suppress_leading_zeros: + # suppress leading 0 + return f"{{:{width}.{r_digits}f}}" + + return f"{{:0{width}.{r_digits}f}}" diff --git a/zha/zigbee/cluster_handlers/smartenergy.py b/zha/zigbee/cluster_handlers/smartenergy.py index 9b29a7111..723e4baaf 100644 --- a/zha/zigbee/cluster_handlers/smartenergy.py +++ b/zha/zigbee/cluster_handlers/smartenergy.py @@ -3,7 +3,6 @@ from __future__ import annotations import enum -from functools import partialmethod from typing import TYPE_CHECKING import zigpy.zcl @@ -17,7 +16,6 @@ MduPairing, Messaging, Metering, - NumberFormatting, Prepayment, Price, Tunneling, @@ -34,13 +32,6 @@ from zha.zigbee.endpoint import Endpoint -DEFAULT_FORMATTING = NumberFormatting( - num_digits_right_of_decimal=1, - num_digits_left_of_decimal=15, - suppress_leading_zeros=1, -) - - @registries.CLUSTER_HANDLER_REGISTRY.register(Calendar.cluster_id) class CalendarClusterHandler(ClusterHandler): """Calendar cluster handler.""" @@ -306,18 +297,15 @@ def unit_of_measurement(self) -> int: """Return unit of measurement.""" return self.cluster.get(Metering.AttributeDefs.unit_of_measure.name) - async def async_initialize_cluster_handler_specific(self, from_cache: bool) -> None: # pylint: disable=unused-argument - """Fetch config from device and updates format specifier.""" - - fmting = self.cluster.get( - Metering.AttributeDefs.demand_formatting.name, DEFAULT_FORMATTING - ) - self._format_spec = self.get_formatting(fmting) + @property + def demand_formatting(self) -> int | None: + """Return demand formatting.""" + return self.cluster.get(Metering.AttributeDefs.demand_formatting.name) - fmting = self.cluster.get( - Metering.AttributeDefs.summation_formatting.name, DEFAULT_FORMATTING - ) - self._summa_format = self.get_formatting(fmting) + @property + def summation_formatting(self) -> int | None: + """Return summation formatting.""" + return self.cluster.get(Metering.AttributeDefs.summation_formatting.name) async def async_update(self) -> None: """Retrieve latest state.""" @@ -330,44 +318,6 @@ async def async_update(self) -> None: ] await self.get_attributes(attrs, from_cache=False, only_cache=False) - @staticmethod - def get_formatting(formatting: int) -> str: - """Return a formatting string, given the formatting value.""" - formatting_obj = NumberFormatting(formatting) - r_digits = formatting_obj.num_digits_right_of_decimal - l_digits = formatting_obj.num_digits_left_of_decimal - - if l_digits == 0: - l_digits = 15 - - width = r_digits + l_digits + (1 if r_digits > 0 else 0) - - if formatting_obj.suppress_leading_zeros: - # suppress leading 0 - return f"{{:{width}.{r_digits}f}}" - - return f"{{:0{width}.{r_digits}f}}" - - def _formatter_function( - self, selector: FormatSelector, value: int - ) -> int | float | str: - """Return formatted value for display.""" - value_float = value * self.multiplier / self.divisor - if self.unit_of_measurement == 0: - # Zigbee spec power unit is kW, but we show the value in W - value_watt = value_float * 1000 - if value_watt < 100: - return round(value_watt, 1) - return round(value_watt) - if selector == self.FormatSelector.SUMMATION: - assert self._summa_format - return float(self._summa_format.format(value_float).lstrip()) - assert self._format_spec - return float(self._format_spec.format(value_float).lstrip()) - - demand_formatter = partialmethod(_formatter_function, FormatSelector.DEMAND) - summa_formatter = partialmethod(_formatter_function, FormatSelector.SUMMATION) - @registries.CLUSTER_HANDLER_REGISTRY.register(Prepayment.cluster_id) class PrepaymentClusterHandler(ClusterHandler):