Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 66 additions & 11 deletions zha/application/platforms/sensor/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import logging
import numbers
import typing
from typing import TYPE_CHECKING, Any
from typing import TYPE_CHECKING, Any, cast

from zhaquirks.danfoss import thermostat as danfoss_thermostat
from zhaquirks.quirk_ids import DANFOSS_ALLY_THERMOSTAT
Expand All @@ -21,7 +21,11 @@
from zigpy.zcl import foundation
from zigpy.zcl.clusters.closures import WindowCovering
from zigpy.zcl.clusters.general import Basic
from zigpy.zcl.clusters.smartenergy import Metering
from zigpy.zcl.clusters.smartenergy import (
Metering,
MeteringUnitofMeasure,
NumberFormatting,
)

from zha.application import Platform
from zha.application.platforms import (
Expand All @@ -41,7 +45,10 @@
SensorDeviceClass,
SensorStateClass,
)
from zha.application.platforms.sensor.helpers import resolution_to_decimal_precision
from zha.application.platforms.sensor.helpers import (
create_number_formatter,
resolution_to_decimal_precision,
)
from zha.application.registries import PLATFORM_ENTITIES
from zha.decorators import periodic
from zha.units import (
Expand Down Expand Up @@ -96,6 +103,12 @@
from zha.zigbee.device import Device
from zha.zigbee.endpoint import Endpoint

DEFAULT_FORMATTING = NumberFormatting(
num_digits_right_of_decimal=1,
num_digits_left_of_decimal=15,
suppress_leading_zeros=1,
)

BATTERY_SIZES = {
0: "No battery",
1: "Built in",
Expand Down Expand Up @@ -1100,9 +1113,43 @@ def state(self) -> dict[str, Any]:
response["zcl_unit_of_measurement"] = self._cluster_handler.unit_of_measurement
return response

@property
def _multiplier(self) -> int | float | None:
return self._cluster_handler.multiplier

@_multiplier.setter
def _multiplier(self, value: int | float | None) -> None:
raise AttributeError("Cannot set multiplier directly")

@property
def _divisor(self) -> int | float | None:
return self._cluster_handler.divisor

@_divisor.setter
def _divisor(self, value: int | float | None) -> None:
raise AttributeError("Cannot set divisor directly")

def formatter(self, value: int) -> int | float:
"""Pass through cluster handler formatter."""
return self._cluster_handler.demand_formatter(value)
"""Metering formatter."""
# TODO: improve typing for base class
scaled_value = cast(float, super().formatter(value))

if (
self._cluster_handler.unit_of_measurement
== MeteringUnitofMeasure.Kwh_and_Kwh_binary
):
# Zigbee spec power unit is kW, but we show the value in W
value_watt = scaled_value * 1000
if value_watt < 100:
return round(value_watt, 1)
return round(value_watt)

demand_formater = create_number_formatter(
self._cluster_handler.demand_formatting
if self._cluster_handler.demand_formatting is not None
else DEFAULT_FORMATTING
)
return float(demand_formater.format(scaled_value))


@dataclass(frozen=True, kw_only=True)
Expand Down Expand Up @@ -1184,14 +1231,22 @@ class SmartEnergySummation(SmartEnergyMetering):
}

def formatter(self, value: int) -> int | float:
"""Numeric pass-through formatter."""
if self._cluster_handler.unit_of_measurement != 0:
return self._cluster_handler.summa_formatter(value)
"""Metering summation formatter."""
# TODO: improve typing for base class
scaled_value = cast(float, Sensor.formatter(self, value))

if (
self._cluster_handler.unit_of_measurement
== MeteringUnitofMeasure.Kwh_and_Kwh_binary
):
return scaled_value

return (
float(self._cluster_handler.multiplier * value)
/ self._cluster_handler.divisor
summation_formater = create_number_formatter(
self._cluster_handler.summation_formatting
if self._cluster_handler.summation_formatting is not None
else DEFAULT_FORMATTING
)
return float(summation_formater.format(scaled_value))


@MULTI_MATCH(
Expand Down
22 changes: 22 additions & 0 deletions zha/application/platforms/sensor/helpers.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
"""Helpers for sensor platform."""

import functools
from math import ceil, log10

from zigpy.zcl.clusters.smartenergy import NumberFormatting


def resolution_to_decimal_precision(
resolution: float, *, epsilon: float = 2**-23, max_digits: int = 16
Expand All @@ -27,3 +30,22 @@ def resolution_to_decimal_precision(

# If nothing was found, fall back to the number of decimal places in epsilon
return ceil(-log10(epsilon))


@functools.lru_cache(maxsize=32)
def create_number_formatter(formatting: int) -> str:
"""Return a formatting string, given the formatting value."""
formatting_obj = NumberFormatting(formatting)
r_digits = formatting_obj.num_digits_right_of_decimal
l_digits = formatting_obj.num_digits_left_of_decimal

if l_digits == 0:
l_digits = 15

width = r_digits + l_digits + (1 if r_digits > 0 else 0)

if formatting_obj.suppress_leading_zeros:
# suppress leading 0
return f"{{:{width}.{r_digits}f}}"

return f"{{:0{width}.{r_digits}f}}"
66 changes: 8 additions & 58 deletions zha/zigbee/cluster_handlers/smartenergy.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
from __future__ import annotations

import enum
from functools import partialmethod
from typing import TYPE_CHECKING

import zigpy.zcl
Expand All @@ -17,7 +16,6 @@
MduPairing,
Messaging,
Metering,
NumberFormatting,
Prepayment,
Price,
Tunneling,
Expand All @@ -34,13 +32,6 @@
from zha.zigbee.endpoint import Endpoint


DEFAULT_FORMATTING = NumberFormatting(
num_digits_right_of_decimal=1,
num_digits_left_of_decimal=15,
suppress_leading_zeros=1,
)


@registries.CLUSTER_HANDLER_REGISTRY.register(Calendar.cluster_id)
class CalendarClusterHandler(ClusterHandler):
"""Calendar cluster handler."""
Expand Down Expand Up @@ -306,18 +297,15 @@ def unit_of_measurement(self) -> int:
"""Return unit of measurement."""
return self.cluster.get(Metering.AttributeDefs.unit_of_measure.name)

async def async_initialize_cluster_handler_specific(self, from_cache: bool) -> None: # pylint: disable=unused-argument
"""Fetch config from device and updates format specifier."""

fmting = self.cluster.get(
Metering.AttributeDefs.demand_formatting.name, DEFAULT_FORMATTING
)
self._format_spec = self.get_formatting(fmting)
@property
def demand_formatting(self) -> int | None:
"""Return demand formatting."""
return self.cluster.get(Metering.AttributeDefs.demand_formatting.name)

fmting = self.cluster.get(
Metering.AttributeDefs.summation_formatting.name, DEFAULT_FORMATTING
)
self._summa_format = self.get_formatting(fmting)
@property
def summation_formatting(self) -> int | None:
"""Return summation formatting."""
return self.cluster.get(Metering.AttributeDefs.summation_formatting.name)

async def async_update(self) -> None:
"""Retrieve latest state."""
Expand All @@ -330,44 +318,6 @@ async def async_update(self) -> None:
]
await self.get_attributes(attrs, from_cache=False, only_cache=False)

@staticmethod
def get_formatting(formatting: int) -> str:
"""Return a formatting string, given the formatting value."""
formatting_obj = NumberFormatting(formatting)
r_digits = formatting_obj.num_digits_right_of_decimal
l_digits = formatting_obj.num_digits_left_of_decimal

if l_digits == 0:
l_digits = 15

width = r_digits + l_digits + (1 if r_digits > 0 else 0)

if formatting_obj.suppress_leading_zeros:
# suppress leading 0
return f"{{:{width}.{r_digits}f}}"

return f"{{:0{width}.{r_digits}f}}"

def _formatter_function(
self, selector: FormatSelector, value: int
) -> int | float | str:
"""Return formatted value for display."""
value_float = value * self.multiplier / self.divisor
if self.unit_of_measurement == 0:
# Zigbee spec power unit is kW, but we show the value in W
value_watt = value_float * 1000
if value_watt < 100:
return round(value_watt, 1)
return round(value_watt)
if selector == self.FormatSelector.SUMMATION:
assert self._summa_format
return float(self._summa_format.format(value_float).lstrip())
assert self._format_spec
return float(self._format_spec.format(value_float).lstrip())

demand_formatter = partialmethod(_formatter_function, FormatSelector.DEMAND)
summa_formatter = partialmethod(_formatter_function, FormatSelector.SUMMATION)


@registries.CLUSTER_HANDLER_REGISTRY.register(Prepayment.cluster_id)
class PrepaymentClusterHandler(ClusterHandler):
Expand Down
Loading