3030from zha .application .platforms .cover .const import (
3131 ATTR_CURRENT_POSITION ,
3232 ATTR_CURRENT_TILT_POSITION ,
33+ WCT ,
3334 CoverEntityFeature ,
3435 CoverState ,
3536)
3637from zha .exceptions import ZHAException
38+ from zha .zigbee .device import Device
3739
3840Default_Response = zcl_f .GENERAL_COMMANDS [zcl_f .GeneralCommand .Default_Response ].schema
3941
8486
8587WCAttrs = closures .WindowCovering .AttributeDefs
8688WCCmds = closures .WindowCovering .ServerCommandDefs
87- WCT = closures .WindowCovering .WindowCoveringType
8889WCCS = closures .WindowCovering .ConfigStatus
8990
9091
91- async def test_cover_non_tilt_initial_state ( # pylint: disable=unused-argument
92+ async def device_cover_mock (
9293 zha_gateway : Gateway ,
93- ) -> None :
94- """Test ZHA cover platform."""
95-
96- # load up cover domain
97- zigpy_cover_device = create_mock_zigpy_device (zha_gateway , ZIGPY_COVER_DEVICE )
98- cluster = zigpy_cover_device .endpoints [1 ].window_covering
94+ current_position_lift_percentage : int | None ,
95+ current_position_tilt_percentage : int | None ,
96+ window_covering_type : WCT ,
97+ ) -> tuple [Device , zigpy .device .Device ]:
98+ """Return a mock zha cover device and its corresponding zipgy device."""
99+
100+ zigpy_device = create_mock_zigpy_device (zha_gateway , ZIGPY_COVER_DEVICE )
101+ cluster = zigpy_device .endpoints [1 ].window_covering
99102 cluster .PLUGGED_ATTR_READS = {
100- WCAttrs .current_position_lift_percentage .name : 0 ,
101- WCAttrs .current_position_tilt_percentage .name : 0 , # to validate that this is overridden to None in the state attribute
102- WCAttrs .window_covering_type .name : WCT .Drapery ,
103+ WCAttrs .window_covering_type .name : window_covering_type ,
103104 WCAttrs .config_status .name : WCCS (~ WCCS .Open_up_commands_reversed ),
104105 }
106+ if current_position_lift_percentage is not None :
107+ cluster .PLUGGED_ATTR_READS [WCAttrs .current_position_lift_percentage .name ] = (
108+ current_position_lift_percentage
109+ )
110+ if current_position_tilt_percentage is not None :
111+ cluster .PLUGGED_ATTR_READS [WCAttrs .current_position_tilt_percentage .name ] = (
112+ current_position_tilt_percentage
113+ )
105114 update_attribute_cache (cluster )
106- zha_device = await join_zigpy_device (zha_gateway , zigpy_cover_device )
115+ zha_device = await join_zigpy_device (zha_gateway , zigpy_device )
116+
117+ return (zha_device , zigpy_device )
107118
119+
120+ async def test_cover_non_tilt_initial_state ( # pylint: disable=unused-argument
121+ zha_gateway : Gateway ,
122+ ) -> None :
123+ """Test ZHA cover platform."""
124+
125+ # create mock cover device
126+ zha_device , zigpy_cover_device = await device_cover_mock (
127+ zha_gateway ,
128+ current_position_lift_percentage = 0 ,
129+ current_position_tilt_percentage = 0 , # to validate that this is overridden to None in the state attribute
130+ window_covering_type = WCT .Drapery ,
131+ )
132+
133+ cluster = zigpy_cover_device .endpoints [1 ].window_covering
108134 assert (
109135 not zha_device .endpoints [1 ]
110136 .all_cluster_handlers [f"1:0x{ cluster .cluster_id :04x} " ]
111137 .inverted
112138 )
113-
114139 assert cluster .read_attributes .call_count == 3
115140 assert (
116141 WCAttrs .current_position_lift_percentage .name
@@ -122,10 +147,9 @@ async def test_cover_non_tilt_initial_state( # pylint: disable=unused-argument
122147 )
123148
124149 entity = get_entity (zha_device , platform = Platform .COVER )
125- state = entity .state
126- assert state ["state" ] == CoverState .OPEN
127- assert state [ATTR_CURRENT_POSITION ] == 100
128- assert state [ATTR_CURRENT_TILT_POSITION ] is None
150+ assert entity .state ["state" ] == CoverState .OPEN
151+ assert entity .state [ATTR_CURRENT_POSITION ] == 100
152+ assert entity .state [ATTR_CURRENT_TILT_POSITION ] is None
129153 assert entity .supported_features == (
130154 CoverEntityFeature .OPEN
131155 | CoverEntityFeature .CLOSE
@@ -134,16 +158,9 @@ async def test_cover_non_tilt_initial_state( # pylint: disable=unused-argument
134158 )
135159
136160 # test update
137- cluster .PLUGGED_ATTR_READS = {
138- WCAttrs .current_position_lift_percentage .name : 100 ,
139- WCAttrs .window_covering_type .name : WCT .Drapery ,
140- WCAttrs .config_status .name : WCCS (~ WCCS .Open_up_commands_reversed ),
141- }
142- update_attribute_cache (cluster )
143- prev_call_count = cluster .read_attributes .call_count
144- await entity .async_update ()
145- assert cluster .read_attributes .call_count == prev_call_count + 1
146-
161+ await send_attributes_report (
162+ zha_gateway , cluster , {WCAttrs .current_position_lift_percentage .id : 100 }
163+ )
147164 assert entity .state ["state" ] == CoverState .CLOSED
148165 assert entity .state [ATTR_CURRENT_POSITION ] == 0
149166
@@ -153,24 +170,20 @@ async def test_cover_non_lift_initial_state( # pylint: disable=unused-argument
153170) -> None :
154171 """Test ZHA cover platform."""
155172
156- # load up cover domain
157- zigpy_cover_device = create_mock_zigpy_device (zha_gateway , ZIGPY_COVER_DEVICE )
158- cluster = zigpy_cover_device .endpoints [1 ].window_covering
159- cluster .PLUGGED_ATTR_READS = {
160- WCAttrs .current_position_lift_percentage .name : 0 , # to validate that this is overridden to None in the state attribute
161- WCAttrs .current_position_tilt_percentage .name : 0 ,
162- WCAttrs .window_covering_type .name : WCT .Tilt_blind_tilt_only ,
163- WCAttrs .config_status .name : WCCS (~ WCCS .Open_up_commands_reversed ),
164- }
165- update_attribute_cache (cluster )
166- zha_device = await join_zigpy_device (zha_gateway , zigpy_cover_device )
173+ # create mock cover device
174+ zha_device , zigpy_cover_device = await device_cover_mock (
175+ zha_gateway ,
176+ current_position_lift_percentage = 0 , # to validate that this is overridden to None in the state attribute
177+ current_position_tilt_percentage = 0 ,
178+ window_covering_type = WCT .Tilt_blind_tilt_only ,
179+ )
167180
181+ cluster = zigpy_cover_device .endpoints [1 ].window_covering
168182 assert (
169183 not zha_device .endpoints [1 ]
170184 .all_cluster_handlers [f"1:0x{ cluster .cluster_id :04x} " ]
171185 .inverted
172186 )
173-
174187 assert cluster .read_attributes .call_count == 3
175188 assert (
176189 WCAttrs .current_position_lift_percentage .name
@@ -182,10 +195,9 @@ async def test_cover_non_lift_initial_state( # pylint: disable=unused-argument
182195 )
183196
184197 entity = get_entity (zha_device , platform = Platform .COVER )
185- state = entity .state
186- assert state ["state" ] == CoverState .OPEN
187- assert state [ATTR_CURRENT_POSITION ] is None
188- assert state [ATTR_CURRENT_TILT_POSITION ] == 100
198+ assert entity .state ["state" ] == CoverState .OPEN
199+ assert entity .state [ATTR_CURRENT_POSITION ] is None
200+ assert entity .state [ATTR_CURRENT_TILT_POSITION ] == 100
189201 assert entity .supported_features == (
190202 CoverEntityFeature .OPEN_TILT
191203 | CoverEntityFeature .CLOSE_TILT
@@ -194,16 +206,9 @@ async def test_cover_non_lift_initial_state( # pylint: disable=unused-argument
194206 )
195207
196208 # test update
197- cluster .PLUGGED_ATTR_READS = {
198- WCAttrs .current_position_tilt_percentage .name : 100 ,
199- WCAttrs .window_covering_type .name : WCT .Tilt_blind_tilt_only ,
200- WCAttrs .config_status .name : WCCS (~ WCCS .Open_up_commands_reversed ),
201- }
202- update_attribute_cache (cluster )
203- prev_call_count = cluster .read_attributes .call_count
204- await entity .async_update ()
205- assert cluster .read_attributes .call_count == prev_call_count + 1
206-
209+ await send_attributes_report (
210+ zha_gateway , cluster , {WCAttrs .current_position_tilt_percentage .id : 100 }
211+ )
207212 assert entity .state ["state" ] == CoverState .CLOSED
208213 assert entity .state [ATTR_CURRENT_TILT_POSITION ] == 0
209214
@@ -213,25 +218,21 @@ async def test_cover(
213218) -> None :
214219 """Test zha cover platform."""
215220
216- zigpy_cover_device = create_mock_zigpy_device (zha_gateway , ZIGPY_COVER_DEVICE )
217- cluster = zigpy_cover_device .endpoints .get (1 ).window_covering
218- cluster .PLUGGED_ATTR_READS = {
219- WCAttrs .current_position_lift_percentage .name : 0 ,
220- WCAttrs .current_position_tilt_percentage .name : 42 ,
221- WCAttrs .window_covering_type .name : WCT .Tilt_blind_tilt_and_lift ,
222- WCAttrs .config_status .name : WCCS (~ WCCS .Open_up_commands_reversed ),
223- }
224- update_attribute_cache (cluster )
225- zha_device = await join_zigpy_device (zha_gateway , zigpy_cover_device )
221+ # create mock cover device
222+ zha_device , zigpy_cover_device = await device_cover_mock (
223+ zha_gateway ,
224+ current_position_lift_percentage = 0 ,
225+ current_position_tilt_percentage = 42 ,
226+ window_covering_type = WCT .Tilt_blind_tilt_and_lift ,
227+ )
226228
229+ cluster = zigpy_cover_device .endpoints [1 ].window_covering
227230 assert (
228231 not zha_device .endpoints [1 ]
229232 .all_cluster_handlers [f"1:0x{ cluster .cluster_id :04x} " ]
230233 .inverted
231234 )
232-
233235 assert cluster .read_attributes .call_count == 3
234-
235236 assert (
236237 WCAttrs .current_position_lift_percentage .name
237238 in cluster .read_attributes .call_args [0 ][0 ]
@@ -686,16 +687,15 @@ async def test_cover(
686687async def test_cover_failures (zha_gateway : Gateway ) -> None :
687688 """Test ZHA cover platform failure cases."""
688689
689- # load up cover domain
690- zigpy_cover_device = create_mock_zigpy_device (zha_gateway , ZIGPY_COVER_DEVICE )
691- cluster = zigpy_cover_device .endpoints [1 ].window_covering
692- cluster .PLUGGED_ATTR_READS = {
693- WCAttrs .current_position_tilt_percentage .name : 42 ,
694- WCAttrs .window_covering_type .name : WCT .Tilt_blind_tilt_and_lift ,
695- }
696- update_attribute_cache (cluster )
697- zha_device = await join_zigpy_device (zha_gateway , zigpy_cover_device )
690+ # create mock cover device
691+ zha_device , zigpy_cover_device = await device_cover_mock (
692+ zha_gateway ,
693+ current_position_lift_percentage = None ,
694+ current_position_tilt_percentage = 42 ,
695+ window_covering_type = WCT .Tilt_blind_tilt_and_lift ,
696+ )
698697
698+ cluster = zigpy_cover_device .endpoints [1 ].window_covering
699699 entity = get_entity (zha_device , platform = Platform .COVER )
700700
701701 # test to see if it opens
@@ -1071,7 +1071,7 @@ async def test_keen_vent(
10711071async def test_cover_remote (zha_gateway : Gateway ) -> None :
10721072 """Test ZHA cover remote."""
10731073
1074- # load up cover domain
1074+ # create mock cover remote device
10751075 zigpy_cover_remote = create_mock_zigpy_device (zha_gateway , ZIGPY_COVER_REMOTE )
10761076 zha_device = await join_zigpy_device (zha_gateway , zigpy_cover_remote )
10771077 zha_device .emit_zha_event = MagicMock (wraps = zha_device .emit_zha_event )
@@ -1103,18 +1103,58 @@ async def test_cover_remote(zha_gateway: Gateway) -> None:
11031103 assert zha_device .emit_zha_event .call_args [0 ][0 ][ATTR_COMMAND ] == "down_close"
11041104
11051105
1106+ @pytest .mark .parametrize (
1107+ "current_position_lift_percentage, current_position_tilt_percentage, restore_state, interim_state, final_state" ,
1108+ (
1109+ (0 , 0 , CoverState .OPENING , None , CoverState .OPEN ),
1110+ (0 , 0 , CoverState .CLOSING , CoverState .CLOSING , CoverState .OPEN ),
1111+ (100 , 100 , CoverState .OPENING , CoverState .OPENING , CoverState .CLOSED ),
1112+ (100 , 100 , CoverState .CLOSING , None , CoverState .CLOSED ),
1113+ (0 , 0 , CoverState .OPEN , None , CoverState .OPEN ),
1114+ (0 , 0 , CoverState .CLOSED , None , CoverState .OPEN ),
1115+ (100 , 100 , CoverState .OPEN , None , CoverState .CLOSED ),
1116+ (100 , 100 , CoverState .CLOSED , None , CoverState .CLOSED ),
1117+ (0 , None , CoverState .OPENING , None , CoverState .OPEN ),
1118+ (None , 0 , CoverState .OPENING , None , CoverState .OPEN ),
1119+ (None , None , CoverState .OPENING , None , None ),
1120+ ),
1121+ )
11061122async def test_cover_state_restoration (
11071123 zha_gateway : Gateway ,
1124+ current_position_lift_percentage : int | None ,
1125+ current_position_tilt_percentage : int | None ,
1126+ restore_state : CoverState | None ,
1127+ interim_state : CoverState | None ,
1128+ final_state : CoverState | None ,
11081129) -> None :
1109- """Test the cover state restoration."""
1110- zigpy_cover_device = create_mock_zigpy_device (zha_gateway , ZIGPY_COVER_DEVICE )
1111- zha_device = await join_zigpy_device (zha_gateway , zigpy_cover_device )
1112- entity = get_entity (zha_device , platform = Platform .COVER )
1130+ """Test the cover state restoration function."""
11131131
1114- assert entity .state ["state" ] != CoverState .CLOSED
1132+ # create mock cover device
1133+ zha_device , zigpy_cover_device = await device_cover_mock (
1134+ zha_gateway ,
1135+ current_position_lift_percentage ,
1136+ current_position_tilt_percentage ,
1137+ WCT .Tilt_blind_tilt_and_lift ,
1138+ )
11151139
1116- entity .restore_external_state_attributes (
1117- state = CoverState .CLOSED ,
1140+ current_position = (
1141+ 100 - current_position_lift_percentage
1142+ if current_position_lift_percentage is not None
1143+ else None
1144+ )
1145+ current_tilt_position = (
1146+ 100 - current_position_tilt_percentage
1147+ if current_position_tilt_percentage is not None
1148+ else None
11181149 )
11191150
1120- assert entity .state ["state" ] == CoverState .CLOSED
1151+ entity = get_entity (zha_device , platform = Platform .COVER )
1152+ assert entity .state ["state" ] == final_state
1153+ assert entity .state [ATTR_CURRENT_POSITION ] == current_position
1154+ assert entity .state [ATTR_CURRENT_TILT_POSITION ] == current_tilt_position
1155+
1156+ entity .restore_external_state_attributes (state = restore_state )
1157+ if interim_state :
1158+ assert entity .state ["state" ] == interim_state
1159+ await asyncio .sleep (Cover .DEFAULT_MOVEMENT_TIMEOUT )
1160+ assert entity .state ["state" ] == final_state
0 commit comments