diff --git a/kuksa-client/kuksa_client/grpc/__init__.py b/kuksa-client/kuksa_client/grpc/__init__.py index 59cd2d0..fdd1dc6 100644 --- a/kuksa-client/kuksa_client/grpc/__init__.py +++ b/kuksa-client/kuksa_client/grpc/__init__.py @@ -1,5 +1,5 @@ ######################################################################## -# Copyright (c) 2022 Robert Bosch GmbH +# Copyright (c) 2022-2025 Robert Bosch GmbH # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -690,13 +690,16 @@ def from_message(cls, message: val_v1.EntryUpdate): @classmethod def from_tuple(cls, path: str, dp: types_v2.Datapoint): - # we assume here that only one field of Value is set -> we use the first entry. - # This should always be the case. + # we assume here that at max one field of Value is set -> we use the first entry. + # If no field is set the value is currently unknown/not avaialable -> set to None. data = dp.value.ListFields() - field_descriptor, value = data[0] - field_name = field_descriptor.name - value = getattr(dp.value, field_name) - if dp.timestamp.seconds == 0 and dp.timestamp.nanos == 0: + if data: + field_descriptor, value = data[0] + field_name = field_descriptor.name + value = getattr(dp.value, field_name) + else: + value = None + if dp.timestamp is None or (dp.timestamp.seconds == 0 and dp.timestamp.nanos == 0): timestamp = None else: timestamp = dp.timestamp.ToDatetime( @@ -709,6 +712,21 @@ def from_tuple(cls, path: str, dp: types_v2.Datapoint): fields=[Field(value=types_v1.FIELD_VALUE)], ) + @classmethod + def from_actuate_value(cls, path: str, value: types_v2.Value): + # we assume here that exactly one field of Value is set -> we use the first entry. + # This should always be the case. + data = value.ListFields() + field_descriptor, target_value = data[0] + field_name = field_descriptor.name + target_value = getattr(value, field_name) + return cls( + entry=DataEntry( + path=path, actuator_target=Datapoint(value=target_value) + ), + fields=[Field(value=types_v1.FIELD_ACTUATOR_TARGET)], + ) + def to_message(self) -> val_v1.EntryUpdate: message = val_v1.EntryUpdate(entry=self.entry.to_message()) message.fields.extend(field.value for field in self.fields) @@ -750,7 +768,6 @@ def __init__( connected: bool = False, tls_server_name: Optional[str] = None, ): - self.authorization_header = self.get_authorization_header(token) self.target_host = f"{host}:{port}" self.root_certificates = root_certificates @@ -868,26 +885,29 @@ def _prepare_subscribe_request( logger.debug("%s: %s", type(req).__name__, req) return req - def _prepare_subscribev2_request( - self, - entries: Iterable[SubscribeEntry], + def _prepare_v2_subscribe_request( + self, paths: Iterable[str] ) -> val_v2.SubscribeRequest: - paths = [] - for entry in entries: - paths.append(entry.path) + req = val_v2.SubscribeRequest(signal_paths=paths) + logger.debug("%s: %s", type(req).__name__, req) + return req - for field in entry.fields: - if field != Field.VALUE: - raise VSSClientError( - error={ - "code": grpc.StatusCode.INVALID_ARGUMENT.value[0], - "reason": grpc.StatusCode.INVALID_ARGUMENT.value[1], - "message": "Cannot use v2 if specifiying fields other than value", - }, - errors=[], - ) + def _prepare_v2_provide_actuation_request( + self, + paths: Iterable[str], + ) -> List[val_v2.OpenProviderStreamRequest]: + signals = [] + for path in paths: + signals.append(types_v2.SignalID(path=path)) + provide_req = val_v2.ProvideActuationRequest(actuator_identifiers=signals) + req = val_v2.OpenProviderStreamRequest(provide_actuation_request=provide_req) + logger.debug("%s: %s", type(req).__name__, req) + return [req] - req = val_v2.SubscribeRequest(signal_paths=paths) + def _prepare_v2_list_metadata_request( + self, path: str + ) -> val_v2.ListMetadataRequest: + req = val_v2.ListMetadataRequest(root=path) logger.debug("%s: %s", type(req).__name__, req) return req @@ -947,6 +967,8 @@ def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.channel = None self.exit_stack = contextlib.ExitStack() + self.path_to_id_mapping: Dict[str, int] = dict() + self.id_to_path_mapping: Dict[int, str] = dict() def __enter__(self): self.connect() @@ -969,6 +991,12 @@ def wrapper(self, *args, **kwargs): return wrapper def connect(self, target_host=None): + # Reset the id mapping on each new connection to the data broker because the broker + # could have been restarted and assigned new ids to paths in between. + # Furthermore, the specified target host could have changed. + self.path_to_id_mapping.clear() + self.id_to_path_mapping.clear() + creds = self._load_creds() if target_host is None: target_host = self.target_host @@ -1161,15 +1189,25 @@ def subscribe_current_values( for path, dp in updates.items(): print(f"Current value for {path} is now: {dp.value}") """ - for updates in self.subscribe( - entries=( - SubscribeEntry(path, View.CURRENT_VALUE, (Field.VALUE,)) - for path in paths - ), - try_v2=True, - **rpc_kwargs, - ): - yield {update.entry.path: update.entry.value for update in updates} + try: + logger.debug("Try to subscribe current values via v2") + for updates in self.v2_subscribe(paths, **rpc_kwargs): + yield { + update.entry.path: update.entry.value for update in updates + } + except VSSClientError as exc: + if exc.error["code"] != grpc.StatusCode.UNIMPLEMENTED.value[0]: + raise + + logger.debug("v2 not available - falling back to v1 subscribe current values") + for updates in self.subscribe( + entries=( + SubscribeEntry(path, View.CURRENT_VALUE, (Field.VALUE,)) + for path in paths + ), + **rpc_kwargs, + ): + yield {update.entry.path: update.entry.value for update in updates} @check_connected def subscribe_target_values( @@ -1186,16 +1224,27 @@ def subscribe_target_values( for path, dp in updates.items(): print(f"Target value for {path} is now: {dp.value}") """ - for updates in self.subscribe( - entries=( - SubscribeEntry(path, View.TARGET_VALUE, (Field.ACTUATOR_TARGET,)) - for path in paths - ), - **rpc_kwargs, - ): - yield { - update.entry.path: update.entry.actuator_target for update in updates - } + try: + logger.debug("Try to subscribe actuation requests via v2") + for updates in self.v2_subscribe_actuation_requests(paths, **rpc_kwargs): + yield { + update.entry.path: update.entry.actuator_target for update in updates + } + except VSSClientError as exc: + if exc.error["code"] != grpc.StatusCode.UNIMPLEMENTED.value[0]: + raise + + logger.debug("v2 not available - falling back to v1 subscribe target values") + for updates in self.subscribe( + entries=( + SubscribeEntry(path, View.TARGET_VALUE, (Field.ACTUATOR_TARGET,)) + for path in paths + ), + **rpc_kwargs, + ): + yield { + update.entry.path: update.entry.actuator_target for update in updates + } @check_connected def subscribe_metadata( @@ -1295,6 +1344,41 @@ def set( raise VSSClientError.from_grpc_error(exc) from exc self._process_set_response(resp) + def get_path(self, signal_id: types_v2.SignalID) -> str: + if signal_id.HasField("path"): + return signal_id.path + elif signal_id.HasField("id") and signal_id.id in self.id_to_path_mapping: + return self.id_to_path_mapping[signal_id.id] + return "" + + def ensure_id_mapping(self, paths: Iterable[str], **rpc_kwargs): + for path in paths: + if path not in self.path_to_id_mapping: + # Prevent duplicate requests for the same path + self.path_to_id_mapping[path] = None + req = self._prepare_v2_list_metadata_request(path) + try: + resp = self.client_stub_v2.ListMetadata(req, **rpc_kwargs) + logger.debug("%s: %s", type(resp).__name__, resp) + if len(resp.metadata) == 1: + self.path_to_id_mapping[path] = resp.metadata[0].id + self.id_to_path_mapping[resp.metadata[0].id] = path + else: + del self.path_to_id_mapping[path] + raise VSSClientError( + error={ + "code": grpc.StatusCode.NOT_FOUND.value[0], + "reason": grpc.StatusCode.NOT_FOUND.value[1], + "message": f"Path {path} not found on server", + }, + errors=[], + ) + except RpcError as exc: + if exc.code() == grpc.StatusCode.UNIMPLEMENTED: + logger.debug("v2 not available - skip querying ids") + return + raise VSSClientError.from_grpc_error(exc) from exc + @check_connected def subscribe( self, entries: Iterable[SubscribeEntry], try_v2: bool = False, **rpc_kwargs @@ -1305,36 +1389,83 @@ def subscribe( grpc.*MultiCallable kwargs e.g. timeout, metadata, credentials. """ + if try_v2: + raise VSSClientError( + error={ + "code": grpc.StatusCode.INVALID_ARGUMENT.value[0], + "reason": grpc.StatusCode.INVALID_ARGUMENT.value[1], + "message": ("Method subscribe supports v1, only. " + "Use v2_subscribe or v2_subscribe_actuation_requests instead."), + }, + errors=[], + ) + + logger.debug("Try subscribing via v1") rpc_kwargs["metadata"] = self.generate_metadata_header( rpc_kwargs.get("metadata") ) - if try_v2: - logger.debug("Trying v2") - req = self._prepare_subscribev2_request(entries) - resp_stream = self.client_stub_v2.Subscribe(req, **rpc_kwargs) - try: - for resp in resp_stream: - logger.debug("%s: %s", type(resp).__name__, resp) + req = self._prepare_subscribe_request(entries) + resp_stream = self.client_stub_v1.Subscribe(req, **rpc_kwargs) + try: + for resp in resp_stream: + logger.debug("%s: %s", type(resp).__name__, resp) + yield [EntryUpdate.from_message(update) for update in resp.updates] + except RpcError as exc: + raise VSSClientError.from_grpc_error(exc) from exc + + @check_connected + def v2_subscribe( + self, paths: Iterable[str], **rpc_kwargs + ) -> Iterator[List[EntryUpdate]]: + """ + Parameters: + rpc_kwargs + grpc.*MultiCallable kwargs e.g. timeout, metadata, credentials. + """ + + logger.debug("Subscribe current values via v2") + rpc_kwargs["metadata"] = self.generate_metadata_header( + rpc_kwargs.get("metadata") + ) + req = self._prepare_v2_subscribe_request(paths) + resp_stream = self.client_stub_v2.Subscribe(req, **rpc_kwargs) + try: + for resp in resp_stream: + logger.debug("%s: %s", type(resp).__name__, resp) + yield [ + EntryUpdate.from_tuple(path, dp) + for path, dp in resp.entries.items() + ] + except RpcError as exc: + raise VSSClientError.from_grpc_error(exc) from exc + + @check_connected + def v2_subscribe_actuation_requests( + self, paths: Iterable[str], **rpc_kwargs + ) -> Iterator[List[EntryUpdate]]: + """ + Parameters: + rpc_kwargs + grpc.*MultiCallable kwargs e.g. timeout, metadata, credentials. + """ + + logger.debug("Subscribe actuation requests via v2") + rpc_kwargs["metadata"] = self.generate_metadata_header( + rpc_kwargs.get("metadata") + ) + self.ensure_id_mapping(paths, **rpc_kwargs) + req = self._prepare_v2_provide_actuation_request(paths) + resp_stream = self.client_stub_v2.OpenProviderStream(iter(req), **rpc_kwargs) + try: + for resp in resp_stream: + logger.debug("batch %s: %s", type(resp).__name__, resp) + if resp.HasField("batch_actuate_stream_request"): yield [ - EntryUpdate.from_tuple(path, dp) - for path, dp in resp.entries.items() + EntryUpdate.from_actuate_value(self.get_path(actuate_req.signal_id), actuate_req.value) + for actuate_req in resp.batch_actuate_stream_request.actuate_requests ] - except RpcError as exc: - if exc.code() == grpc.StatusCode.UNIMPLEMENTED: - logger.debug("v2 not available fall back to v1 instead") - self.subscribe(entries) - else: - raise VSSClientError.from_grpc_error(exc) from exc - else: - logger.debug("Trying v1") - req = self._prepare_subscribe_request(entries) - resp_stream = self.client_stub_v1.Subscribe(req, **rpc_kwargs) - try: - for resp in resp_stream: - logger.debug("%s: %s", type(resp).__name__, resp) - yield [EntryUpdate.from_message(update) for update in resp.updates] - except RpcError as exc: - raise VSSClientError.from_grpc_error(exc) from exc + except RpcError as exc: + raise VSSClientError.from_grpc_error(exc) from exc @check_connected def authorize(self, token: str, **rpc_kwargs) -> str: diff --git a/kuksa-client/kuksa_client/grpc/aio.py b/kuksa-client/kuksa_client/grpc/aio.py index 2571b2c..41e61da 100644 --- a/kuksa-client/kuksa_client/grpc/aio.py +++ b/kuksa-client/kuksa_client/grpc/aio.py @@ -1,5 +1,5 @@ ######################################################################## -# Copyright (c) 2022 Robert Bosch GmbH +# Copyright (c) 2022-2025 Robert Bosch GmbH # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -33,6 +33,7 @@ from kuksa.val.v1 import val_pb2 as val_v1 from kuksa.val.v1 import val_pb2_grpc as val_grpc_v1 +from kuksa.val.v2 import types_pb2 as types_v2 from kuksa.val.v2 import val_pb2_grpc as val_grpc_v2 from . import BaseVSSClient @@ -57,6 +58,8 @@ def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.channel = None self.exit_stack = contextlib.AsyncExitStack() + self.path_to_id_mapping: Dict[str, int] = dict() + self.id_to_path_mapping: Dict[int, str] = dict() async def __aenter__(self): await self.connect() @@ -66,9 +69,13 @@ async def __aexit__(self, exc_type, exc_value, traceback): await self.disconnect() async def connect(self, target_host=None): + self.path_to_id_mapping.clear() + self.id_to_path_mapping.clear() + creds = self._load_creds() if target_host is None: target_host = self.target_host + if creds is not None: logger.info("Establishing secure channel") if self.tls_server_name: @@ -297,15 +304,25 @@ async def subscribe_current_values( for path, dp in updates.items(): print(f"Current value for {path} is now: {dp.value}") """ - async for updates in self.subscribe( - entries=( - SubscribeEntry(path, View.CURRENT_VALUE, (Field.VALUE,)) - for path in paths - ), - try_v2=True, - **rpc_kwargs, - ): - yield {update.entry.path: update.entry.value for update in updates} + try: + logger.debug("Try to subscribe current values via v2") + async for updates in self.v2_subscribe(paths=paths, **rpc_kwargs): + yield { + update.entry.path: update.entry.value for update in updates + } + except VSSClientError as exc: + if exc.error["code"] != grpc.StatusCode.UNIMPLEMENTED.value[0]: + raise + + logger.debug("v2 not available - falling back to v1 subscribe current values") + async for updates in self.subscribe( + entries=( + SubscribeEntry(path, View.CURRENT_VALUE, (Field.VALUE,)) + for path in paths + ), + **rpc_kwargs, + ): + yield {update.entry.path: update.entry.value for update in updates} @check_connected_async_iter async def subscribe_target_values( @@ -322,17 +339,27 @@ async def subscribe_target_values( for path, dp in updates.items(): print(f"Target value for {path} is now: {dp.value}") """ - async for updates in self.subscribe( - entries=( - SubscribeEntry(path, View.TARGET_VALUE, (Field.ACTUATOR_TARGET,)) - for path in paths - ), - try_v2=True, - **rpc_kwargs, - ): - yield { - update.entry.path: update.entry.actuator_target for update in updates - } + try: + logger.debug("Try to subscribe actuation requests via v2") + async for updates in self.v2_subscribe_actuation_requests(paths=paths, **rpc_kwargs): + yield { + update.entry.path: update.entry.actuator_target for update in updates + } + except VSSClientError as exc: + if exc.error["code"] != grpc.StatusCode.UNIMPLEMENTED.value[0]: + raise + + logger.debug("v2 not available - falling back to v1 subscribe target values") + async for updates in self.subscribe( + entries=( + SubscribeEntry(path, View.TARGET_VALUE, (Field.ACTUATOR_TARGET,)) + for path in paths + ), + **rpc_kwargs, + ): + yield { + update.entry.path: update.entry.actuator_target for update in updates + } @check_connected_async_iter async def subscribe_metadata( @@ -434,6 +461,41 @@ async def set( raise VSSClientError.from_grpc_error(exc) from exc self._process_set_response(resp) + def get_path(self, signal_id: types_v2.SignalID) -> str: + if signal_id.HasField("path"): + return signal_id.path + elif signal_id.HasField("id") and signal_id.id in self.id_to_path_mapping: + return self.id_to_path_mapping[signal_id.id] + return "" + + async def ensure_id_mapping(self, paths: Iterable[str], **rpc_kwargs): + for path in paths: + if path not in self.path_to_id_mapping: + # Prevent duplicate requests for the same path + self.path_to_id_mapping[path] = None + req = self._prepare_v2_list_metadata_request(path) + try: + resp = await self.client_stub_v2.ListMetadata(req, **rpc_kwargs) + logger.debug("%s: %s", type(resp).__name__, resp) + if len(resp.metadata) == 1: + self.path_to_id_mapping[path] = resp.metadata[0].id + self.id_to_path_mapping[resp.metadata[0].id] = path + else: + del self.path_to_id_mapping[path] + raise VSSClientError( + error={ + "code": grpc.StatusCode.NOT_FOUND.value[0], + "reason": grpc.StatusCode.NOT_FOUND.value[1], + "message": f"Path {path} not found on server", + }, + errors=[], + ) + except AioRpcError as exc: + if exc.code() == grpc.StatusCode.UNIMPLEMENTED: + logger.debug("v2 not available - skip querying ids") + return + raise VSSClientError.from_grpc_error(exc) from exc + @check_connected_async_iter async def subscribe( self, @@ -446,36 +508,84 @@ async def subscribe( rpc_kwargs grpc.*MultiCallable kwargs e.g. timeout, metadata, credentials. """ + + if try_v2: + raise VSSClientError( + error={ + "code": grpc.StatusCode.INVALID_ARGUMENT.value[0], + "reason": grpc.StatusCode.INVALID_ARGUMENT.value[1], + "message": ("Method subscribe supports v1, only. " + "Use v2_subscribe or v2_subscribe_actuation_requests instead."), + }, + errors=[], + ) + + logger.debug("Try subscribing via v1") rpc_kwargs["metadata"] = self.generate_metadata_header( rpc_kwargs.get("metadata") ) - if try_v2: - logger.debug("Trying v2") - req = self._prepare_subscribev2_request(entries) - resp_stream = self.client_stub_v2.Subscribe(req, **rpc_kwargs) - try: - async for resp in resp_stream: - logger.debug("%s: %s", type(resp).__name__, resp) + req = self._prepare_subscribe_request(entries) + resp_stream = self.client_stub_v1.Subscribe(req, **rpc_kwargs) + try: + async for resp in resp_stream: + logger.debug("%s: %s", type(resp).__name__, resp) + yield [EntryUpdate.from_message(update) for update in resp.updates] + except AioRpcError as exc: + raise VSSClientError.from_grpc_error(exc) from exc + + @check_connected_async_iter + async def v2_subscribe( + self, paths: Iterable[str], **rpc_kwargs + ) -> AsyncIterator[List[EntryUpdate]]: + """ + Parameters: + rpc_kwargs + grpc.*MultiCallable kwargs e.g. timeout, metadata, credentials. + """ + + logger.debug("Subscribe current values via v2") + rpc_kwargs["metadata"] = self.generate_metadata_header( + rpc_kwargs.get("metadata") + ) + req = self._prepare_v2_subscribe_request(paths) + resp_stream = self.client_stub_v2.Subscribe(req, **rpc_kwargs) + try: + async for resp in resp_stream: + logger.debug("%s: %s", type(resp).__name__, resp) + yield [ + EntryUpdate.from_tuple(path, dp) + for path, dp in resp.entries.items() + ] + except AioRpcError as exc: + raise VSSClientError.from_grpc_error(exc) from exc + + @check_connected_async_iter + async def v2_subscribe_actuation_requests( + self, paths: Iterable[str], **rpc_kwargs + ) -> AsyncIterator[List[EntryUpdate]]: + """ + Parameters: + rpc_kwargs + grpc.*MultiCallable kwargs e.g. timeout, metadata, credentials. + """ + + logger.debug("Subscribe actuation requests via v2") + rpc_kwargs["metadata"] = self.generate_metadata_header( + rpc_kwargs.get("metadata") + ) + await self.ensure_id_mapping(paths, **rpc_kwargs) + req = self._prepare_v2_provide_actuation_request(paths) + resp_stream = self.client_stub_v2.OpenProviderStream(iter(req), **rpc_kwargs) + try: + async for resp in resp_stream: + logger.debug("batch %s: %s", type(resp).__name__, resp) + if resp.HasField("batch_actuate_stream_request"): yield [ - EntryUpdate.from_tuple(path, dp) - for path, dp in resp.entries.items() + EntryUpdate.from_actuate_value(self.get_path(actuate_req.signal_id), actuate_req.value) + for actuate_req in resp.batch_actuate_stream_request.actuate_requests ] - except AioRpcError as exc: - if exc.code() == grpc.StatusCode.UNIMPLEMENTED: - logger.debug("v2 not available fall back to v1 instead") - await self.subscribe(entries) - else: - raise VSSClientError.from_grpc_error(exc) from exc - else: - logger.debug("Trying v1") - req = self._prepare_subscribe_request(entries) - resp_stream = self.client_stub_v1.Subscribe(req, **rpc_kwargs) - try: - async for resp in resp_stream: - logger.debug("%s: %s", type(resp).__name__, resp) - yield [EntryUpdate.from_message(update) for update in resp.updates] - except AioRpcError as exc: - raise VSSClientError.from_grpc_error(exc) from exc + except AioRpcError as exc: + raise VSSClientError.from_grpc_error(exc) from exc @check_connected_async async def authorize(self, token: str, **rpc_kwargs) -> str: diff --git a/kuksa-client/tests/conftest.py b/kuksa-client/tests/conftest.py index 9e79132..8e7e8bc 100644 --- a/kuksa-client/tests/conftest.py +++ b/kuksa-client/tests/conftest.py @@ -1,5 +1,5 @@ ######################################################################## -# Copyright (c) 2022 Robert Bosch GmbH +# Copyright (c) 2022-2025 Robert Bosch GmbH # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -47,6 +47,8 @@ def val_servicer_v1_fixture(mocker): @pytest.fixture(name="val_servicer_v2", scope="function") def val_servicer_v2_fixture(mocker): servicer_v2 = val_v2.VALServicer() + mocker.patch.object(servicer_v2, "ListMetadata", spec=True) + mocker.patch.object(servicer_v2, "OpenProviderStream", spec=True) mocker.patch.object(servicer_v2, "PublishValue", spec=True) mocker.patch.object(servicer_v2, "Subscribe", spec=True) diff --git a/kuksa-client/tests/test_grpc.py b/kuksa-client/tests/test_grpc.py index f837f35..d2f4a05 100644 --- a/kuksa-client/tests/test_grpc.py +++ b/kuksa-client/tests/test_grpc.py @@ -1,5 +1,5 @@ ######################################################################## -# Copyright (c) 2022 Robert Bosch GmbH +# Copyright (c) 2022-2025 Robert Bosch GmbH # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -648,7 +648,7 @@ async def subscribe_response_stream(**kwargs): EntryUpdate(DataEntry('Vehicle.Chassis.Height', value=Datapoint(666)), (Field.VALUE,)), ] - mocker.patch.object(client, 'subscribe', + mocker.patch.object(client, 'v2_subscribe', side_effect=subscribe_response_stream) received_updates = {} @@ -657,13 +657,10 @@ async def subscribe_response_stream(**kwargs): ]): received_updates.update(updates) - assert list(client.subscribe.call_args_list[0][1]['entries']) == [ - SubscribeEntry('Vehicle.Speed', - View.CURRENT_VALUE, (Field.VALUE,)), - SubscribeEntry('Vehicle.ADAS.ABS.IsActive', - View.CURRENT_VALUE, (Field.VALUE,)), - SubscribeEntry('Vehicle.Chassis.Height', - View.CURRENT_VALUE, (Field.VALUE,)), + assert list(client.v2_subscribe.call_args_list[0][1]['paths']) == [ + 'Vehicle.Speed', + 'Vehicle.ADAS.ABS.IsActive', + 'Vehicle.Chassis.Height', ] assert received_updates == { 'Vehicle.Speed': Datapoint(42.0, @@ -686,7 +683,7 @@ async def subscribe_response_stream(**kwargs): EntryUpdate(DataEntry('Vehicle.Chassis.SteeringWheel.Tilt', actuator_target=Datapoint(42)), (Field.ACTUATOR_TARGET,)), ] - mocker.patch.object(client, 'subscribe', + mocker.patch.object(client, 'v2_subscribe_actuation_requests', side_effect=subscribe_response_stream) received_updates = {} @@ -695,11 +692,9 @@ async def subscribe_response_stream(**kwargs): ]): received_updates.update(updates) - assert list(client.subscribe.call_args_list[0][1]['entries']) == [ - SubscribeEntry('Vehicle.ADAS.ABS.IsActive', - View.TARGET_VALUE, (Field.ACTUATOR_TARGET,)), - SubscribeEntry('Vehicle.Chassis.SteeringWheel.Tilt', - View.TARGET_VALUE, (Field.ACTUATOR_TARGET,)), + assert list(client.v2_subscribe_actuation_requests.call_args_list[0][1]['paths']) == [ + 'Vehicle.ADAS.ABS.IsActive', + 'Vehicle.Chassis.SteeringWheel.Tilt', ] assert received_updates == { 'Vehicle.ADAS.ABS.IsActive': Datapoint(True, datetime.datetime(2022, 11, 7, tzinfo=datetime.timezone.utc)), @@ -1768,22 +1763,14 @@ async def test_subscribe_some_entries_v2( ) actual_responses = [] - async for updates in client.subscribe( - entries=( - entry - for entry in ( # generator is intentional (Iterable) - EntryRequest( - "Vehicle.Speed", View.CURRENT_VALUE, (Field.VALUE,) - ), - EntryRequest( - "Vehicle.ADAS.ABS.IsActive", - # Specified View is ignored so we can use anyone :-) - View.METADATA, - (Field.VALUE,), - ), + async for updates in client.v2_subscribe( + paths=( + path + for path in ( # generator is intentional (Iterable) + "Vehicle.Speed", + "Vehicle.ADAS.ABS.IsActive", ) ), - try_v2=True, ): actual_responses.append(updates) @@ -1859,21 +1846,14 @@ async def test_subscribe_some_entries_v2_target( actual_responses = [] with pytest.raises(VSSClientError): - async for updates in client.subscribe( - entries=( - entry - for entry in ( # generator is intentional (Iterable) - EntryRequest( - "Vehicle.Speed", View.TARGET_VALUE, (Field.ACTUATOR_TARGET,) - ), - EntryRequest( - "Vehicle.ADAS.ABS.IsActive", - View.TARGET_VALUE, - (Field.ACTUATOR_TARGET,), - ), + async for updates in client.v2_subscribe_actuation_requests( + paths=( + path + for path in ( # generator is intentional (Iterable) + "Vehicle.Speed", + "Vehicle.ADAS.ABS.IsActive", ) ), - try_v2=True, ): actual_responses.append(updates) @@ -1900,7 +1880,7 @@ async def test_subscribe_no_entries_requested( "127.0.0.1", unused_tcp_port, ensure_startup_connection=False ) as client: with pytest.raises(VSSClientError): - async for _ in client.subscribe(entries=(), try_v2=True): + async for _ in client.v2_subscribe(paths=()): pass @pytest.mark.usefixtures("mocked_databroker") @@ -1926,17 +1906,7 @@ async def test_subscribe_nonexistent_entries( "127.0.0.1", unused_tcp_port, ensure_startup_connection=False ) as client: with pytest.raises(VSSClientError): - async for _ in client.subscribe( - entries=( - entry - for entry in ( # generator is intentional (Iterable) - EntryRequest( - "Does.Not.Exist", View.CURRENT_VALUE, (Field.VALUE,) - ), - ) - ), - try_v2=True, - ): + async for _ in client.v2_subscribe(paths=["Does.Not.Exist"]): pass @pytest.mark.usefixtures("mocked_databroker") @@ -2124,11 +2094,8 @@ async def test_add_subscriber_v2(self, mocker, unused_tcp_port, val_servicer_v2) response for response in responses ) - subscribe_response_stream = client.subscribe( - entries=( - EntryRequest("Vehicle.Speed", View.CURRENT_VALUE, (Field.VALUE,)), - ), - try_v2=True, + subscribe_response_stream = client.v2_subscribe( + paths=("Vehicle.Speed"), ) sub_uid = await subscriber_manager.add_subscriber( subscribe_response_stream, callback=callback @@ -2183,11 +2150,8 @@ async def test_remove_subscriber_v2(self, mocker, unused_tcp_port, val_servicer_ val_servicer_v2.Subscribe.return_value = ( response for response in responses ) - subscribe_response_stream = client.subscribe( - entries=( - EntryRequest("Vehicle.Speed", View.CURRENT_VALUE, (Field.VALUE,)), - ), - try_v2=True, + subscribe_response_stream = client.v2_subscribe( + paths=("Vehicle.Speed"), ) sub_uid = await subscriber_manager.add_subscriber( subscribe_response_stream, callback=mocker.Mock()