-
Notifications
You must be signed in to change notification settings - Fork 807
Decompose attribute_name
tuple to new DP mapping list format
#3986
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: dev
Are you sure you want to change the base?
Changes from all commits
b81860e
15db871
05e9bfb
f7f9186
3f2168b
91268af
c9fc19a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1456,6 +1456,46 @@ | |
converter: Callable[[Any], Any] | None = None | ||
endpoint_id: int | None = None | ||
|
||
def __init__( | ||
self, | ||
ep_attribute: str, | ||
attribute_name: str | tuple[str, ...], | ||
converter: Callable[[Any], Any] | None = None, | ||
endpoint_id: int | None = None, | ||
): | ||
"""Init DPToAttributeMapping.""" | ||
self.ep_attribute = ep_attribute | ||
self.attribute_name = attribute_name | ||
self.converter = converter | ||
self.endpoint_id = endpoint_id | ||
|
||
if not isinstance(attribute_name, str): | ||
_LOGGER.info( | ||
"Using tuple attribute_name is deprecated, please multiple " | ||
"DPToAttributeMapping instances instead. %s", | ||
attribute_name, | ||
) | ||
Comment on lines
+1473
to
+1477
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Also, we may want to expand this message to include a message that (custom) ZHA quirks are affected and need to be updated. Maybe something like the following? "Using a tuple for Also, we may want to bump this to EDIT: Ah, if we still use this in our own codebase, maybe just |
||
|
||
def decompose_attributes(self) -> list[DPToAttributeMapping]: | ||
"""Decompose attributes into multiple mappings.""" | ||
|
||
def wrap_converter(converter: Callable[[Any], Any] | None, attr_index: int): | ||
if converter is None: | ||
return None | ||
return lambda args: converter(args)[attr_index] | ||
|
||
if isinstance(self.attribute_name, tuple): | ||
return [ | ||
DPToAttributeMapping( | ||
self.ep_attribute, | ||
attr_name, | ||
wrap_converter(self.converter, attr_index), | ||
self.endpoint_id, | ||
) | ||
for attr_index, attr_name in enumerate(self.attribute_name) | ||
] | ||
return [self] | ||
|
||
|
||
@dataclasses.dataclass | ||
class AttributeWithMask: | ||
|
@@ -1531,8 +1571,10 @@ | |
super().__init__(*args, **kwargs) | ||
|
||
self._dp_to_attributes: dict[int, list[DPToAttributeMapping]] = { | ||
dp: attr if isinstance(attr, list) else [attr] | ||
for dp, attr in self.dp_to_attribute.items() | ||
dp: [x for mapping in mappings for x in mapping.decompose_attributes()] | ||
if isinstance(mappings, list) | ||
else mappings.decompose_attributes() | ||
for dp, mappings in self.dp_to_attribute.items() | ||
} | ||
for dp_map in self._dp_to_attributes.values(): | ||
# get the endpoint that is being mapped to | ||
|
@@ -1650,15 +1692,9 @@ | |
if mapped_attr.converter: | ||
value = mapped_attr.converter(value) | ||
|
||
if isinstance(mapped_attr.attribute_name, tuple): | ||
for k, v in zip(mapped_attr.attribute_name, value): | ||
if isinstance(v, AttributeWithMask): | ||
v = cluster.get(k, 0) & (~v.mask) | v.value | ||
cluster.update_attribute(k, v) | ||
else: | ||
if isinstance(value, AttributeWithMask): | ||
value = ( | ||
cluster.get(mapped_attr.attribute_name, 0) & (~value.mask) | ||
| value.value | ||
) | ||
cluster.update_attribute(mapped_attr.attribute_name, value) | ||
if isinstance(value, AttributeWithMask): | ||
value = ( | ||
cluster.get(mapped_attr.attribute_name, 0) & (~value.mask) | ||
| value.value | ||
) | ||
cluster.update_attribute(mapped_attr.attribute_name, value) |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,6 +4,7 @@ | |
|
||
from collections.abc import Callable | ||
import datetime | ||
import logging | ||
from typing import Any | ||
|
||
import zigpy.types as t | ||
|
@@ -28,6 +29,8 @@ | |
TuyaTimePayload, | ||
) | ||
|
||
_LOGGER = logging.getLogger(__name__) | ||
|
||
# New manufacturer attributes | ||
ATTR_MCU_VERSION = 0xEF00 | ||
|
||
|
@@ -49,6 +52,12 @@ | |
"""Init method for compatibility with previous quirks using positional arguments.""" | ||
super().__init__(ep_attribute, attribute_name, converter, endpoint_id) | ||
self.dp_converter = dp_converter | ||
if dp_converter: | ||
_LOGGER.info( | ||
"DPToAttributeMapping with dp_converter is deprecated, use TuyaQuirksBuilder " | ||
"(or TuyaMCUCluster.attributes_to_dp_converters) instead. attribute_name: %s", | ||
attribute_name, | ||
) | ||
Comment on lines
+56
to
+60
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's |
||
|
||
|
||
class TuyaClusterData(t.Struct): | ||
|
@@ -121,6 +130,7 @@ | |
|
||
set_time_offset = 1970 # MCU timestamp from 1/1/1970 | ||
set_time_local_offset = None | ||
attributes_to_dp_converters: dict[int, Callable[[Any], Any]] = {} | ||
|
||
class AttributeDefs(TuyaNewManufCluster.AttributeDefs): | ||
"""Attribute Definitions.""" | ||
|
@@ -199,6 +209,21 @@ | |
def __init__(self, *args, **kwargs): | ||
"""Init.""" | ||
super().__init__(*args, **kwargs) | ||
|
||
self._attributes_to_dp_converters: dict[int, Callable[[Any], Any]] | ||
if self.attributes_to_dp_converters: | ||
self._attributes_to_dp_converters = self.attributes_to_dp_converters | ||
else: | ||
# convert from legacy DP2AttributeMapping with attribute_name tuple to new | ||
# DP2AttributeMapping with single attribute_name | ||
self._attributes_to_dp_converters = {} | ||
for dp, mappings in self.dp_to_attribute.items(): | ||
if not isinstance(mappings, list): | ||
mappings = [mappings] | ||
for dp_mapping in mappings: | ||
if hasattr(dp_mapping, "dp_converter") and dp_mapping.dp_converter: | ||
self._attributes_to_dp_converters[dp] = dp_mapping.dp_converter | ||
|
||
# Cluster for endpoint: 1 (listen MCU commands) | ||
self.endpoint.device.command_bus = Bus() | ||
self.endpoint.device.command_bus.add_listener(self) | ||
|
@@ -223,20 +248,19 @@ | |
cmd_payload.tsn = self.endpoint.device.application.get_sequence() | ||
|
||
val = data.attr_value | ||
if mapping.dp_converter: | ||
|
||
if attr_to_dp_converter := self._attributes_to_dp_converters.get(dp): | ||
args = [] | ||
if isinstance(mapping.attribute_name, tuple): | ||
for dp_attr in self._dp_to_attributes[dp]: | ||
if dp_attr.attribute_name == data.cluster_attr: | ||
args.append(val) | ||
continue | ||
endpoint = self.endpoint | ||
if mapping.endpoint_id: | ||
if dp_attr.endpoint_id: | ||
endpoint = endpoint.device.endpoints[mapping.endpoint_id] | ||
cluster = getattr(endpoint, mapping.ep_attribute) | ||
for attr in mapping.attribute_name: | ||
args.append( | ||
val if attr == data.cluster_attr else cluster.get(attr) | ||
) | ||
else: | ||
args.append(val) | ||
val = mapping.dp_converter(*args) | ||
cluster = getattr(endpoint, dp_attr.ep_attribute) | ||
args.append(cluster.get(dp_attr.attribute_name)) | ||
val = attr_to_dp_converter(*args) | ||
self.debug("value: %s", val) | ||
|
||
dpd = TuyaDatapointData(dp, val) | ||
|
@@ -285,21 +309,20 @@ | |
result: dict[int, DPToAttributeMapping] = {} | ||
for dp, dp_mapping in self._dp_to_attributes.items(): | ||
for mapped_attr in dp_mapping: | ||
if ( | ||
attribute_name == mapped_attr.attribute_name | ||
or ( | ||
isinstance(mapped_attr.attribute_name, tuple) | ||
and attribute_name in mapped_attr.attribute_name | ||
) | ||
) and ( | ||
if attribute_name != mapped_attr.attribute_name: | ||
continue | ||
if not ( | ||
( | ||
mapped_attr.endpoint_id is None | ||
and endpoint_id == self.endpoint.endpoint_id | ||
) | ||
or (endpoint_id == mapped_attr.endpoint_id) | ||
): | ||
self.debug("get_dp_mapping --> found DP: %s", dp) | ||
result[dp] = mapped_attr | ||
continue | ||
self.debug("get_dp_mapping --> found DP: %s", dp) | ||
result[dp] = mapped_attr | ||
break | ||
|
||
return result | ||
|
||
def handle_mcu_version_response(self, payload: MCUVersion) -> foundation.Status: | ||
|
@@ -315,7 +338,7 @@ | |
self.debug("handle_set_time_request payload: %s", payload) | ||
payload_rsp = TuyaTimePayload() | ||
|
||
utc_now = datetime.datetime.utcnow() # noqa: DTZ003 | ||
Check warning on line 341 in zhaquirks/tuya/mcu/__init__.py
|
||
now = datetime.datetime.now() | ||
|
||
offset_time = datetime.datetime(self.set_time_offset, 1, 1) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The verb, likely "use", is missing here.