Skip to content

Improve cover state restoration from the HA entity cache #412

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
206 changes: 123 additions & 83 deletions tests/test_cover.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,12 @@
from zha.application.platforms.cover.const import (
ATTR_CURRENT_POSITION,
ATTR_CURRENT_TILT_POSITION,
WCT,
CoverEntityFeature,
CoverState,
)
from zha.exceptions import ZHAException
from zha.zigbee.device import Device

Default_Response = zcl_f.GENERAL_COMMANDS[zcl_f.GeneralCommand.Default_Response].schema

Expand Down Expand Up @@ -88,33 +90,56 @@

WCAttrs = closures.WindowCovering.AttributeDefs
WCCmds = closures.WindowCovering.ServerCommandDefs
WCT = closures.WindowCovering.WindowCoveringType
WCCS = closures.WindowCovering.ConfigStatus


async def test_cover_non_tilt_initial_state( # pylint: disable=unused-argument
async def device_cover_mock(
zha_gateway: Gateway,
) -> None:
"""Test ZHA cover platform."""

# load up cover domain
zigpy_cover_device = create_mock_zigpy_device(zha_gateway, ZIGPY_COVER_DEVICE)
cluster = zigpy_cover_device.endpoints[1].window_covering
current_position_lift_percentage: int | None,
current_position_tilt_percentage: int | None,
window_covering_type: WCT,
) -> tuple[Device, zigpy.device.Device]:
"""Return a mock zha cover device and its corresponding zipgy device."""

zigpy_device = create_mock_zigpy_device(zha_gateway, ZIGPY_COVER_DEVICE)
cluster = zigpy_device.endpoints[1].window_covering
cluster.PLUGGED_ATTR_READS = {
WCAttrs.current_position_lift_percentage.name: 0,
WCAttrs.current_position_tilt_percentage.name: 0, # to validate that this is overridden to None in the state attribute
WCAttrs.window_covering_type.name: WCT.Drapery,
WCAttrs.window_covering_type.name: window_covering_type,
WCAttrs.config_status.name: WCCS(~WCCS.Open_up_commands_reversed),
}
if current_position_lift_percentage is not None:
cluster.PLUGGED_ATTR_READS[WCAttrs.current_position_lift_percentage.name] = (
current_position_lift_percentage
)
if current_position_tilt_percentage is not None:
cluster.PLUGGED_ATTR_READS[WCAttrs.current_position_tilt_percentage.name] = (
current_position_tilt_percentage
)
update_attribute_cache(cluster)
zha_device = await join_zigpy_device(zha_gateway, zigpy_cover_device)
zha_device = await join_zigpy_device(zha_gateway, zigpy_device)

return (zha_device, zigpy_device)


async def test_cover_non_tilt_initial_state( # pylint: disable=unused-argument
zha_gateway: Gateway,
) -> None:
"""Test ZHA cover platform."""

# create mock cover device
zha_device, zigpy_cover_device = await device_cover_mock(
zha_gateway,
current_position_lift_percentage=0,
current_position_tilt_percentage=0, # to validate that this is overridden to None in the state attribute
window_covering_type=WCT.Drapery,
)

cluster = zigpy_cover_device.endpoints[1].window_covering
assert (
not zha_device.endpoints[1]
.all_cluster_handlers[f"1:0x{cluster.cluster_id:04x}"]
.inverted
)

assert cluster.read_attributes.call_count == 3
assert (
WCAttrs.current_position_lift_percentage.name
Expand All @@ -126,10 +151,9 @@ async def test_cover_non_tilt_initial_state( # pylint: disable=unused-argument
)

entity = get_entity(zha_device, platform=Platform.COVER)
state = entity.state
assert state["state"] == CoverState.OPEN
assert state[ATTR_CURRENT_POSITION] == 100
assert state[ATTR_CURRENT_TILT_POSITION] is None
assert entity.state["state"] == CoverState.OPEN
assert entity.state[ATTR_CURRENT_POSITION] == 100
assert entity.state[ATTR_CURRENT_TILT_POSITION] is None
assert entity.supported_features == (
CoverEntityFeature.OPEN
| CoverEntityFeature.CLOSE
Expand All @@ -138,16 +162,9 @@ async def test_cover_non_tilt_initial_state( # pylint: disable=unused-argument
)

# test update
cluster.PLUGGED_ATTR_READS = {
WCAttrs.current_position_lift_percentage.name: 100,
WCAttrs.window_covering_type.name: WCT.Drapery,
WCAttrs.config_status.name: WCCS(~WCCS.Open_up_commands_reversed),
}
update_attribute_cache(cluster)
prev_call_count = cluster.read_attributes.call_count
await entity.async_update()
assert cluster.read_attributes.call_count == prev_call_count + 1

await send_attributes_report(
zha_gateway, cluster, {WCAttrs.current_position_lift_percentage.id: 100}
)
assert entity.state["state"] == CoverState.CLOSED
assert entity.state[ATTR_CURRENT_POSITION] == 0

Expand All @@ -157,24 +174,20 @@ async def test_cover_non_lift_initial_state( # pylint: disable=unused-argument
) -> None:
"""Test ZHA cover platform."""

# load up cover domain
zigpy_cover_device = create_mock_zigpy_device(zha_gateway, ZIGPY_COVER_DEVICE)
cluster = zigpy_cover_device.endpoints[1].window_covering
cluster.PLUGGED_ATTR_READS = {
WCAttrs.current_position_lift_percentage.name: 0, # to validate that this is overridden to None in the state attribute
WCAttrs.current_position_tilt_percentage.name: 0,
WCAttrs.window_covering_type.name: WCT.Tilt_blind_tilt_only,
WCAttrs.config_status.name: WCCS(~WCCS.Open_up_commands_reversed),
}
update_attribute_cache(cluster)
zha_device = await join_zigpy_device(zha_gateway, zigpy_cover_device)
# create mock cover device
zha_device, zigpy_cover_device = await device_cover_mock(
zha_gateway,
current_position_lift_percentage=0, # to validate that this is overridden to None in the state attribute
current_position_tilt_percentage=0,
window_covering_type=WCT.Tilt_blind_tilt_only,
)

cluster = zigpy_cover_device.endpoints[1].window_covering
assert (
not zha_device.endpoints[1]
.all_cluster_handlers[f"1:0x{cluster.cluster_id:04x}"]
.inverted
)

assert cluster.read_attributes.call_count == 3
assert (
WCAttrs.current_position_lift_percentage.name
Expand All @@ -186,10 +199,9 @@ async def test_cover_non_lift_initial_state( # pylint: disable=unused-argument
)

entity = get_entity(zha_device, platform=Platform.COVER)
state = entity.state
assert state["state"] == CoverState.OPEN
assert state[ATTR_CURRENT_POSITION] is None
assert state[ATTR_CURRENT_TILT_POSITION] == 100
assert entity.state["state"] == CoverState.OPEN
assert entity.state[ATTR_CURRENT_POSITION] is None
assert entity.state[ATTR_CURRENT_TILT_POSITION] == 100
assert entity.supported_features == (
CoverEntityFeature.OPEN_TILT
| CoverEntityFeature.CLOSE_TILT
Expand All @@ -198,16 +210,9 @@ async def test_cover_non_lift_initial_state( # pylint: disable=unused-argument
)

# test update
cluster.PLUGGED_ATTR_READS = {
WCAttrs.current_position_tilt_percentage.name: 100,
WCAttrs.window_covering_type.name: WCT.Tilt_blind_tilt_only,
WCAttrs.config_status.name: WCCS(~WCCS.Open_up_commands_reversed),
}
update_attribute_cache(cluster)
prev_call_count = cluster.read_attributes.call_count
await entity.async_update()
assert cluster.read_attributes.call_count == prev_call_count + 1

await send_attributes_report(
zha_gateway, cluster, {WCAttrs.current_position_tilt_percentage.id: 100}
)
assert entity.state["state"] == CoverState.CLOSED
assert entity.state[ATTR_CURRENT_TILT_POSITION] == 0

Expand All @@ -217,25 +222,21 @@ async def test_cover(
) -> None:
"""Test zha cover platform."""

zigpy_cover_device = create_mock_zigpy_device(zha_gateway, ZIGPY_COVER_DEVICE)
cluster = zigpy_cover_device.endpoints.get(1).window_covering
cluster.PLUGGED_ATTR_READS = {
WCAttrs.current_position_lift_percentage.name: 0,
WCAttrs.current_position_tilt_percentage.name: 42,
WCAttrs.window_covering_type.name: WCT.Tilt_blind_tilt_and_lift,
WCAttrs.config_status.name: WCCS(~WCCS.Open_up_commands_reversed),
}
update_attribute_cache(cluster)
zha_device = await join_zigpy_device(zha_gateway, zigpy_cover_device)
# create mock cover device
zha_device, zigpy_cover_device = await device_cover_mock(
zha_gateway,
current_position_lift_percentage=0,
current_position_tilt_percentage=42,
window_covering_type=WCT.Tilt_blind_tilt_and_lift,
)

cluster = zigpy_cover_device.endpoints[1].window_covering
assert (
not zha_device.endpoints[1]
.all_cluster_handlers[f"1:0x{cluster.cluster_id:04x}"]
.inverted
)

assert cluster.read_attributes.call_count == 3

assert (
WCAttrs.current_position_lift_percentage.name
in cluster.read_attributes.call_args[0][0]
Expand Down Expand Up @@ -688,16 +689,15 @@ async def test_cover(
async def test_cover_failures(zha_gateway: Gateway) -> None:
"""Test ZHA cover platform failure cases."""

# load up cover domain
zigpy_cover_device = create_mock_zigpy_device(zha_gateway, ZIGPY_COVER_DEVICE)
cluster = zigpy_cover_device.endpoints[1].window_covering
cluster.PLUGGED_ATTR_READS = {
WCAttrs.current_position_tilt_percentage.name: 42,
WCAttrs.window_covering_type.name: WCT.Tilt_blind_tilt_and_lift,
}
update_attribute_cache(cluster)
zha_device = await join_zigpy_device(zha_gateway, zigpy_cover_device)
# create mock cover device
zha_device, zigpy_cover_device = await device_cover_mock(
zha_gateway,
current_position_lift_percentage=None,
current_position_tilt_percentage=42,
window_covering_type=WCT.Tilt_blind_tilt_and_lift,
)

cluster = zigpy_cover_device.endpoints[1].window_covering
entity = get_entity(zha_device, platform=Platform.COVER)

# test to see if it opens
Expand Down Expand Up @@ -1074,7 +1074,7 @@ async def test_keen_vent(
async def test_cover_remote(zha_gateway: Gateway) -> None:
"""Test ZHA cover remote."""

# load up cover domain
# create mock cover remote device
zigpy_cover_remote = create_mock_zigpy_device(zha_gateway, ZIGPY_COVER_REMOTE)
zha_device = await join_zigpy_device(zha_gateway, zigpy_cover_remote)
zha_device.emit_zha_event = MagicMock(wraps=zha_device.emit_zha_event)
Expand Down Expand Up @@ -1106,18 +1106,58 @@ async def test_cover_remote(zha_gateway: Gateway) -> None:
assert zha_device.emit_zha_event.call_args[0][0][ATTR_COMMAND] == "down_close"


@pytest.mark.parametrize(
"current_position_lift_percentage, current_position_tilt_percentage, restore_state, interim_state, final_state",
(
(0, 0, CoverState.OPENING, None, CoverState.OPEN),
(0, 0, CoverState.CLOSING, CoverState.CLOSING, CoverState.OPEN),
(100, 100, CoverState.OPENING, CoverState.OPENING, CoverState.CLOSED),
(100, 100, CoverState.CLOSING, None, CoverState.CLOSED),
(0, 0, CoverState.OPEN, None, CoverState.OPEN),
(0, 0, CoverState.CLOSED, None, CoverState.OPEN),
(100, 100, CoverState.OPEN, None, CoverState.CLOSED),
(100, 100, CoverState.CLOSED, None, CoverState.CLOSED),
(0, None, CoverState.OPENING, None, CoverState.OPEN),
(None, 0, CoverState.OPENING, None, CoverState.OPEN),
(None, None, CoverState.OPENING, None, None),
),
)
async def test_cover_state_restoration(
zha_gateway: Gateway,
current_position_lift_percentage: int | None,
current_position_tilt_percentage: int | None,
restore_state: CoverState | None,
interim_state: CoverState | None,
final_state: CoverState | None,
) -> None:
"""Test the cover state restoration."""
zigpy_cover_device = create_mock_zigpy_device(zha_gateway, ZIGPY_COVER_DEVICE)
zha_device = await join_zigpy_device(zha_gateway, zigpy_cover_device)
entity = get_entity(zha_device, platform=Platform.COVER)
"""Test the cover state restoration function."""

assert entity.state["state"] != CoverState.CLOSED
# create mock cover device
zha_device, zigpy_cover_device = await device_cover_mock(
zha_gateway,
current_position_lift_percentage,
current_position_tilt_percentage,
WCT.Tilt_blind_tilt_and_lift,
)

entity.restore_external_state_attributes(
state=CoverState.CLOSED,
current_position = (
100 - current_position_lift_percentage
if current_position_lift_percentage is not None
else None
)
current_tilt_position = (
100 - current_position_tilt_percentage
if current_position_tilt_percentage is not None
else None
)

assert entity.state["state"] == CoverState.CLOSED
entity = get_entity(zha_device, platform=Platform.COVER)
assert entity.state["state"] == final_state
assert entity.state[ATTR_CURRENT_POSITION] == current_position
assert entity.state[ATTR_CURRENT_TILT_POSITION] == current_tilt_position

entity.restore_external_state_attributes(state=restore_state)
if interim_state:
assert entity.state["state"] == interim_state
await asyncio.sleep(DEFAULT_MOVEMENT_TIMEOUT)
assert entity.state["state"] == final_state
28 changes: 23 additions & 5 deletions zha/application/platforms/cover/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ def __init__(
self._lift_transition_timer: asyncio.TimerHandle | None = None
self._tilt_transition_timer: asyncio.TimerHandle | None = None

self._state: CoverState | None = CoverState.OPEN
self._state: CoverState | None = None
self._determine_cover_state(refresh=True)

def recompute_capabilities(self) -> None:
Expand Down Expand Up @@ -213,12 +213,30 @@ def restore_external_state_attributes(
self,
*,
state: CoverState | None,
target_lift_position: int | None = None,
target_tilt_position: int | None = None,
**kwargs: Any, # pylint: disable=unused-argument
):
"""Restore external state attributes."""
"""Restore external state attributes.

If the state is OPENING or CLOSING, a callback is scheduled
to determine the final state after the default timeout period.
"""
if not self._state or state not in (CoverState.OPENING, CoverState.CLOSING):
return
if state == CoverState.CLOSING and self.is_closed:
return
if (
state == CoverState.OPENING
and self._state == CoverState.OPEN
and self.current_cover_position in (100, None)
and self.current_cover_tilt_position in (100, None)
):
return

self._state = state
# Target positions have been removed
self._loop.call_later(
DEFAULT_MOVEMENT_TIMEOUT,
functools.partial(self._determine_cover_state, refresh=True),
)

@property
def supported_features(self) -> CoverEntityFeature:
Expand Down
Loading