From b32c343967de8db7e8caace92f07575bfa36c216 Mon Sep 17 00:00:00 2001 From: "Alexander \"Tarh" Date: Sat, 4 May 2024 02:48:58 +0300 Subject: [PATCH] [WIP] Adapted main component for ijai models basic support, fixed initialization of Palette and ImageConfig objects --- .../xiaomi_cloud_map_extractor/__init__.py | 1 - .../xiaomi_cloud_map_extractor/camera.py | 49 +-- .../common/vacuum.py | 72 ----- .../xiaomi_cloud_map_extractor/const.py | 2 +- .../ijai/__init__.py | 0 .../ijai/aes_decryptor.py | 62 ---- .../ijai/image_handler.py | 95 ------ .../ijai/map_data_parser.py | 278 ------------------ .../ijai/parsing_buffer.py | 92 ------ .../xiaomi_cloud_map_extractor/initializer.py | 8 + .../vacuum_platforms/vacuum_base.py | 2 +- .../vacuum_platforms/vacuum_ijai.py | 86 +++--- .../viomi/map_data_parser.py | 268 ----------------- .../viomi/vacuum.py | 27 -- 14 files changed, 91 insertions(+), 951 deletions(-) delete mode 100644 custom_components/xiaomi_cloud_map_extractor/__init__.py delete mode 100644 custom_components/xiaomi_cloud_map_extractor/common/vacuum.py delete mode 100644 custom_components/xiaomi_cloud_map_extractor/ijai/__init__.py delete mode 100644 custom_components/xiaomi_cloud_map_extractor/ijai/aes_decryptor.py delete mode 100644 custom_components/xiaomi_cloud_map_extractor/ijai/image_handler.py delete mode 100644 custom_components/xiaomi_cloud_map_extractor/ijai/map_data_parser.py delete mode 100644 custom_components/xiaomi_cloud_map_extractor/ijai/parsing_buffer.py create mode 100644 custom_components/xiaomi_cloud_map_extractor/initializer.py delete mode 100644 custom_components/xiaomi_cloud_map_extractor/viomi/map_data_parser.py delete mode 100644 custom_components/xiaomi_cloud_map_extractor/viomi/vacuum.py diff --git a/custom_components/xiaomi_cloud_map_extractor/__init__.py b/custom_components/xiaomi_cloud_map_extractor/__init__.py deleted file mode 100644 index 8fd8875..0000000 --- a/custom_components/xiaomi_cloud_map_extractor/__init__.py +++ /dev/null @@ -1 +0,0 @@ -"""Xiaomi cloud map extractor.""" diff --git a/custom_components/xiaomi_cloud_map_extractor/camera.py b/custom_components/xiaomi_cloud_map_extractor/camera.py index 30dd465..eed5f70 100644 --- a/custom_components/xiaomi_cloud_map_extractor/camera.py +++ b/custom_components/xiaomi_cloud_map_extractor/camera.py @@ -2,10 +2,11 @@ import logging from datetime import timedelta from enum import StrEnum +from dataclasses import fields from vacuum_map_parser_base.config.color import ColorsPalette from vacuum_map_parser_base.config.drawable import Drawable -from vacuum_map_parser_base.config.image_config import ImageConfig +from vacuum_map_parser_base.config.image_config import ImageConfig, TrimConfig from vacuum_map_parser_base.config.size import Sizes from vacuum_map_parser_base.config.text import Text @@ -27,6 +28,7 @@ from .vacuum_platforms.vacuum_viomi import ViomiCloudVacuum from .vacuum_platforms.vacuum_ijai import IjaiCloudVacuum from .vacuum_platforms.vacuum_unsupported import UnsupportedCloudVacuum +from .initializer import from_dict from .const import * @@ -122,9 +124,10 @@ }) + async def async_setup_platform(hass, config, async_add_entities, discovery_info=None): await async_setup_reload_service(hass, DOMAIN, PLATFORMS) - + _LOGGER.debug(f"config={config}") host = config[CONF_HOST] token = config[CONF_TOKEN] username = config[CONF_USERNAME] @@ -132,14 +135,14 @@ async def async_setup_platform(hass, config, async_add_entities, discovery_info= country = config[CONF_COUNTRY] name = config[CONF_NAME] should_poll = config[CONF_AUTO_UPDATE] - image_config = config[CONF_MAP_TRANSFORM] + image_config = from_dict(ImageConfig, config[CONF_MAP_TRANSFORM]) colors = config[CONF_COLORS] room_colors = config[CONF_ROOM_COLORS] for room, color in room_colors.items(): colors[f"{COLOR_ROOM_PREFIX}{room}"] = color drawables = config[CONF_DRAW] - sizes = config[CONF_SIZES] - texts = config[CONF_TEXTS] + sizes = Sizes(config[CONF_SIZES]) + texts = from_dict(list, config[CONF_TEXTS]) if DRAWABLE_ALL in drawables: drawables = CONF_AVAILABLE_DRAWABLES[1:] attributes = config[CONF_ATTRIBUTES] @@ -149,7 +152,7 @@ async def async_setup_platform(hass, config, async_add_entities, discovery_info= force_api = config[CONF_FORCE_API] entity_id = generate_entity_id(ENTITY_ID_FORMAT, name, hass=hass) async_add_entities([VacuumCamera(entity_id, host, token, username, password, country, name, should_poll, - image_config, colors, drawables, sizes, texts, attributes, store_map_raw, + image_config, ColorsPalette(colors, room_colors), drawables, sizes, texts, attributes, store_map_raw, store_map_image, store_map_path, force_api)]) @@ -230,6 +233,7 @@ def should_poll(self) -> bool: @staticmethod def extract_attributes(map_data: MapData, attributes_to_return: list[str], country) -> dict[str, any]: + _LOGGER.debug(f"extract_attributes{map_data}, {attributes_to_return}, country") attributes = {} rooms = [] if map_data.rooms is not None: @@ -299,7 +303,7 @@ def _login(self): def _initialize_device(self): _LOGGER.debug("Retrieving device info, country: %s", self._country) - country, user_id, device_id, model, mac = self._connector.get_device_details(self._vacuum.token, self._country) + country, user_id, device_id, model, mac = self._connector.get_device_details(self._token, self._country) if model is not None: self._country = country _LOGGER.debug("Retrieved device model: %s", model) @@ -345,26 +349,29 @@ def _create_device(self, user_id: str, device_id: str, model: str, mac: str) -> self._used_api = self._detect_api(model) store_map_path = self._store_map_path if self._store_map_raw else None vacuum_config = VacuumConfig( - self._connector, - self._country, - user_id, - device_id, - self._host, - self._token, - model, - self._colors, - self._drawables, - self._image_config, - self._sizes, - self._texts, - store_map_path + connector = self._connector, + country = self._country, + user_id = user_id, + device_id = device_id, + host = self._host, + token = self._token, + model = model, + _mac = mac, + palette = self._colors, + drawables = self._drawables, + image_config = self._image_config, + sizes = self._sizes, + texts = self._texts, + store_map_path = store_map_path ) if self._used_api == CONF_AVAILABLE_API_XIAOMI: return RoborockCloudVacuum(vacuum_config) if self._used_api == CONF_AVAILABLE_API_VIOMI: return ViomiCloudVacuum(vacuum_config) if self._used_api == CONF_AVAILABLE_API_IJAI: - return IjaiCloudVacuum(self._connector, self._country, user_id, device_id, model, mac) + _LOGGER.debug(f"palette={vacuum_config.palette}") + _LOGGER.debug(f"image_config={vacuum_config.image_config}") + return IjaiCloudVacuum(vacuum_config) if self._used_api == CONF_AVAILABLE_API_ROIDMI: return RoidmiCloudVacuum(vacuum_config) if self._used_api == CONF_AVAILABLE_API_DREAME: diff --git a/custom_components/xiaomi_cloud_map_extractor/common/vacuum.py b/custom_components/xiaomi_cloud_map_extractor/common/vacuum.py deleted file mode 100644 index c15107f..0000000 --- a/custom_components/xiaomi_cloud_map_extractor/common/vacuum.py +++ /dev/null @@ -1,72 +0,0 @@ -from abc import abstractmethod -from typing import Optional, Tuple - -from custom_components.xiaomi_cloud_map_extractor.common.map_data import MapData -from custom_components.xiaomi_cloud_map_extractor.common.map_data_parser import MapDataParser -from custom_components.xiaomi_cloud_map_extractor.common.xiaomi_cloud_connector import XiaomiCloudConnector -from custom_components.xiaomi_cloud_map_extractor.types import Colors, Drawables, ImageConfig, Sizes, Texts - - -class XiaomiCloudVacuum: - - def __init__(self, connector: XiaomiCloudConnector, country: str, user_id: str, device_id: str, model: str, mac: str): - self._connector = connector - self._country = country - self._user_id = user_id - self._device_id = device_id - self.model = model - self.mac = mac - - def decrypt_map(self, data:bytes, wifi_info_sn:str, user_id:str, device_id:str, model:str, mac:str): - return data - - def get_map(self, - map_name: str, - colors: Colors, - drawables: Drawables, - texts: Texts, - sizes: Sizes, - image_config: ImageConfig, - store_map_path: Optional[str] = None) -> Tuple[Optional[MapData], bool]: - response = self.get_raw_map_data(map_name) - if response is None: - return None, False - map_stored = False - if store_map_path is not None: - raw_map_file = open(f"{store_map_path}/map_data_{self.model}.{self.get_map_archive_extension()}", "wb") - raw_map_file.write(response) - raw_map_file.close() - map_stored = True - - map_data_decrypted = self.decrypt_map(response, "######SD##########", self._user_id, self._device_id, self.model, self.mac) - map_data = self.decode_map(map_data_decrypted, colors, drawables, texts, sizes, image_config) - if map_data is None: - return None, map_stored - map_data.map_name = map_name - return map_data, map_stored - - def get_raw_map_data(self, map_name: Optional[str]) -> Optional[bytes]: - if map_name is None: - return None - map_url = self.get_map_url(map_name) - return self._connector.get_raw_map_data(map_url) - - def decode_map(self, - raw_map: bytes, - colors: Colors, - drawables: Drawables, - texts: Texts, - sizes: Sizes, - image_config: ImageConfig) -> Optional[MapData]: - return MapDataParser.create_empty(colors, f"Vacuum\n{self.model}\nis not supported") - - @abstractmethod - def get_map_url(self, map_name: str) -> Optional[str]: - pass - - @abstractmethod - def should_get_map_from_vacuum(self) -> bool: - pass - - def get_map_archive_extension(self) -> str: - pass diff --git a/custom_components/xiaomi_cloud_map_extractor/const.py b/custom_components/xiaomi_cloud_map_extractor/const.py index f44d50e..6f557af 100644 --- a/custom_components/xiaomi_cloud_map_extractor/const.py +++ b/custom_components/xiaomi_cloud_map_extractor/const.py @@ -43,7 +43,7 @@ CONF_Y = "y" CONF_AVAILABLE_APIS = [CONF_AVAILABLE_API_XIAOMI, CONF_AVAILABLE_API_VIOMI, CONF_AVAILABLE_API_ROIDMI, - CONF_AVAILABLE_API_DREAME] + CONF_AVAILABLE_API_DREAME, CONF_AVAILABLE_API_IJAI] CONF_AVAILABLE_SIZES = [CONF_SIZE_VACUUM_RADIUS, CONF_SIZE_PATH_WIDTH, CONF_SIZE_IGNORED_OBSTACLE_RADIUS, CONF_SIZE_IGNORED_OBSTACLE_WITH_PHOTO_RADIUS, CONF_SIZE_MOP_PATH_WIDTH, diff --git a/custom_components/xiaomi_cloud_map_extractor/ijai/__init__.py b/custom_components/xiaomi_cloud_map_extractor/ijai/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/custom_components/xiaomi_cloud_map_extractor/ijai/aes_decryptor.py b/custom_components/xiaomi_cloud_map_extractor/ijai/aes_decryptor.py deleted file mode 100644 index fdd297e..0000000 --- a/custom_components/xiaomi_cloud_map_extractor/ijai/aes_decryptor.py +++ /dev/null @@ -1,62 +0,0 @@ -from Crypto.Cipher import AES -from Crypto.Hash import MD5 -from Crypto.Util.Padding import pad, unpad -import base64 -import logging - -_LOGGER = logging.getLogger(__name__) - -isEncryptKeyTypeHex = True - -def aesEncrypted(data, key: str): - cipher = AES.new(key.encode("utf-8"), AES.MODE_ECB) - - encryptedData = cipher.encrypt( - pad(data.encode("utf-8"), AES.block_size, 'pkcs7')) - encryptedBase64Str = base64.b64encode(encryptedData).decode("utf-8") - - return encryptedBase64Str - -def aesDecrypted(data, key: str): - parsedKey = key.encode("utf-8") - if isEncryptKeyTypeHex: - parsedKey = bytes.fromhex(key) - - cipher = AES.new(parsedKey, AES.MODE_ECB) - - decryptedBytes = cipher.decrypt(base64.b64decode(data)) - - decryptedData = unpad(decryptedBytes, AES.block_size, 'pkcs7') - - return bytes.fromhex(decryptedData.decode("utf-8")) - - -def md5key(string: str, model: str, device_mac: str): - pjstr = "".join(device_mac.lower().split(":")) - - tempModel = model.split('.')[-1] - - if len(tempModel) == 2: - tempModel = "00" + tempModel - elif len(tempModel) == 3: - tempModel = "0" + tempModel - - tempKey = pjstr + tempModel - aeskey = aesEncrypted(string, tempKey) - - temp = MD5.new(aeskey.encode('utf-8')).hexdigest() - if isEncryptKeyTypeHex: - return temp - else: - return temp[8:-8].upper() - - -def genMD5key(wifi_info_sn: str, owner_id: str, device_id: str, model: str, device_mac: str): - arr = [wifi_info_sn, owner_id, device_id] - tempString = '+'.join(arr) - return md5key(tempString, model, device_mac) - - -def unGzipCommon(data: str, wifi_info_sn: str, owner_id: str, device_id: str, model: str, device_mac: str) -> bytes: - return aesDecrypted(data, genMD5key(wifi_info_sn, owner_id, device_id, model, device_mac)) - diff --git a/custom_components/xiaomi_cloud_map_extractor/ijai/image_handler.py b/custom_components/xiaomi_cloud_map_extractor/ijai/image_handler.py deleted file mode 100644 index f930077..0000000 --- a/custom_components/xiaomi_cloud_map_extractor/ijai/image_handler.py +++ /dev/null @@ -1,95 +0,0 @@ -import logging -from typing import Dict, Optional, Set, Tuple - -from PIL import Image -from PIL.Image import Image as ImageType - -from custom_components.xiaomi_cloud_map_extractor.common.image_handler import ImageHandler -from custom_components.xiaomi_cloud_map_extractor.const import * -from custom_components.xiaomi_cloud_map_extractor.types import Colors, ImageConfig -from custom_components.xiaomi_cloud_map_extractor.ijai.parsing_buffer import ParsingBuffer - -_LOGGER = logging.getLogger(__name__) - - -class ImageHandlerIjai(ImageHandler): - MAP_OUTSIDE = 0x00 - MAP_WALL = 0xff - MAP_SCAN = 0x01 - MAP_NEW_DISCOVERED_AREA = 0x02 - MAP_ROOM_MIN = 10 - MAP_ROOM_MAX = 59 - MAP_SELECTED_ROOM_MIN = 60 - MAP_SELECTED_ROOM_MAX = 109 - @staticmethod - def parse(buf: ParsingBuffer, width: int, height: int, colors: Colors, image_config: ImageConfig, - draw_cleaned_area: bool) \ - -> Tuple[ImageType, Dict[int, Tuple[int, int, int, int]], Set[int], Optional[ImageType]]: - palette = {\ - ImageHandlerIjai.MAP_OUTSIDE: ImageHandler.__get_color__(COLOR_MAP_OUTSIDE, colors),\ - ImageHandlerIjai.MAP_WALL: ImageHandler.__get_color__(COLOR_MAP_WALL_V2, colors),\ - ImageHandlerIjai.MAP_SCAN: ImageHandler.__get_color__(COLOR_SCAN, colors),\ - ImageHandlerIjai.MAP_NEW_DISCOVERED_AREA: ImageHandler.__get_color__(COLOR_NEW_DISCOVERED_AREA, colors)} - rooms = {} - cleaned_areas = set() - scale = image_config[CONF_SCALE] - trim_left = int(image_config[CONF_TRIM][CONF_LEFT] * width / 100) - trim_right = int(image_config[CONF_TRIM][CONF_RIGHT] * width / 100) - trim_top = int(image_config[CONF_TRIM][CONF_TOP] * height / 100) - trim_bottom = int(image_config[CONF_TRIM][CONF_BOTTOM] * height / 100) - trimmed_height = height - trim_top - trim_bottom - trimmed_width = width - trim_left - trim_right - if trimmed_width == 0 or trimmed_height == 0: - return ImageHandler.create_empty_map_image(colors), rooms, cleaned_areas, None - image = Image.new('RGBA', (trimmed_width, trimmed_height)) - pixels = image.load() - cleaned_areas_layer = None - cleaned_areas_pixels = None - if draw_cleaned_area: - cleaned_areas_layer = Image.new('RGBA', (trimmed_width, trimmed_height)) - cleaned_areas_pixels = cleaned_areas_layer.load() - _LOGGER.debug(f"trim_bottom = {trim_bottom}, trim_top = {trim_top}, trim_left = {trim_left}, trim_right = {trim_right}") - buf.skip('trim_bottom', trim_bottom * width) - unknown_pixels = set() - _LOGGER.debug(f"buffer: [{buf._offs:#x}] = {buf.peek_uint32("some_int32")}") - for img_y in range(trimmed_height): - buf.skip('trim_left', trim_left) - for img_x in range(trimmed_width): - pixel_type = buf.get_uint8('pixel') - x = img_x - y = trimmed_height - 1 - img_y - if pixel_type in palette.keys(): - pixels[x, y] = palette[pixel_type] - elif ImageHandlerIjai.MAP_ROOM_MIN <= pixel_type <= ImageHandlerIjai.MAP_SELECTED_ROOM_MAX: - room_x = img_x + trim_left - room_y = img_y + trim_bottom - if pixel_type < ImageHandlerIjai.MAP_SELECTED_ROOM_MIN: - room_number = pixel_type - else: - room_number = pixel_type - ImageHandlerIjai.MAP_SELECTED_ROOM_MIN + ImageHandlerIjai.MAP_ROOM_MIN - cleaned_areas.add(room_number) - if draw_cleaned_area: - cleaned_areas_pixels[x, y] = ImageHandler.__get_color__(COLOR_CLEANED_AREA, colors) - if room_number not in rooms: - rooms[room_number] = (room_x, room_y, room_x, room_y) - else: - rooms[room_number] = (min(rooms[room_number][0], room_x), - min(rooms[room_number][1], room_y), - max(rooms[room_number][2], room_x), - max(rooms[room_number][3], room_y)) - default = ImageHandler.ROOM_COLORS[room_number % len(ImageHandler.ROOM_COLORS)] - pixels[x, y] = ImageHandler.__get_color__(f"{COLOR_ROOM_PREFIX}{room_number}", colors, default) - else: - pixels[x, y] = ImageHandler.__get_color__(COLOR_UNKNOWN, colors) - unknown_pixels.add(pixel_type) - _LOGGER.debug(f"unknown pixel [{x},{y}] = {pixel_type}") - buf.skip('trim_right', trim_right) - buf.skip('trim_top', trim_top * width) - if image_config["scale"] != 1 and trimmed_width != 0 and trimmed_height != 0: - image = image.resize((int(trimmed_width * scale), int(trimmed_height * scale)), resample=Image.NEAREST) - if draw_cleaned_area: - cleaned_areas_layer = cleaned_areas_layer.resize( - (int(trimmed_width * scale), int(trimmed_height * scale)), resample=Image.NEAREST) - if len(unknown_pixels) > 0: - _LOGGER.warning('unknown pixel_types: %s', unknown_pixels) - return image, rooms, cleaned_areas, cleaned_areas_layer diff --git a/custom_components/xiaomi_cloud_map_extractor/ijai/map_data_parser.py b/custom_components/xiaomi_cloud_map_extractor/ijai/map_data_parser.py deleted file mode 100644 index a25de88..0000000 --- a/custom_components/xiaomi_cloud_map_extractor/ijai/map_data_parser.py +++ /dev/null @@ -1,278 +0,0 @@ -import logging -import math -from typing import Dict, List, Optional, Set, Tuple - -from custom_components.xiaomi_cloud_map_extractor.common.map_data import Area, ImageData, MapData, Path, Point, Room, \ - Wall, Zone -from custom_components.xiaomi_cloud_map_extractor.common.map_data_parser import MapDataParser -from custom_components.xiaomi_cloud_map_extractor.const import * -from custom_components.xiaomi_cloud_map_extractor.types import Colors, Drawables, ImageConfig, Sizes, Texts -from custom_components.xiaomi_cloud_map_extractor.ijai.image_handler import ImageHandlerIjai -from custom_components.xiaomi_cloud_map_extractor.ijai.parsing_buffer import ParsingBuffer - -_LOGGER = logging.getLogger(__name__) - - -class MapDataParserIjai(MapDataParser): - FEATURE_ROBOT_STATUS = 0x00000001 - FEATURE_IMAGE = 0x00000002 - FEATURE_HISTORY = 0x00000004 - FEATURE_CHARGE_STATION = 0x00000008 - FEATURE_RESTRICTED_AREAS = 0x00000010 - FEATURE_CLEANING_AREAS = 0x00000020 - FEATURE_NAVIGATE = 0x00000040 - FEATURE_REALTIME = 0x00000080 - FEATURE_ROOMS = 0x00001000 - - POSITION_UNKNOWN = 1100 - - @staticmethod - def parse(raw: bytes, colors: Colors, drawables: Drawables, texts: Texts, sizes: Sizes, - image_config: ImageConfig, *args, **kwargs) -> MapData: - map_data = MapData(0, 1) - buf = ParsingBuffer('header', raw, 0, len(raw)) - - id1 = buf.get_uint8('id1') - magic1 = buf.get_uint16('magic1') - offset1 = buf.get_uint8('offset1') - 1 - - buf.skip('unknown_hdr1', offset1) - _LOGGER.debug(f"Skipping {offset1} bytes, value: {buf._data[buf._offs]:#x}") - - feature_flags = MapDataParserIjai.FEATURE_IMAGE - map_id = buf.peek_uint32('map_id') - - if feature_flags & MapDataParserIjai.FEATURE_ROBOT_STATUS != 0: - MapDataParserIjai.parse_section(buf, 'robot_status', map_id) - buf.skip('unknown1', 0x28) - - if feature_flags & MapDataParserIjai.FEATURE_IMAGE != 0: - buf.set_name('image') - map_data.image, map_data.rooms, map_data.cleaned_rooms = \ - MapDataParserIjai.parse_image(buf, colors, image_config, DRAWABLE_CLEANED_AREA in drawables) - - if feature_flags & MapDataParserIjai.FEATURE_HISTORY != 0: - MapDataParserIjai.parse_section(buf, 'history', map_id) - map_data.path = MapDataParserIjai.parse_history(buf) - - if feature_flags & MapDataParserIjai.FEATURE_CHARGE_STATION != 0: - MapDataParserIjai.parse_section(buf, 'charge_station', map_id) - map_data.charger = MapDataParserIjai.parse_position(buf, 'pos', with_angle=True) - _LOGGER.debug('pos: %s', map_data.charger) - - if feature_flags & MapDataParserIjai.FEATURE_RESTRICTED_AREAS != 0: - MapDataParserIjai.parse_section(buf, 'restricted_areas', map_id) - map_data.walls, map_data.no_go_areas = MapDataParserIjai.parse_restricted_areas(buf) - - if feature_flags & MapDataParserIjai.FEATURE_CLEANING_AREAS != 0: - MapDataParserIjai.parse_section(buf, 'cleaning_areas', map_id) - map_data.zones = MapDataParserIjai.parse_cleaning_areas(buf) - - if feature_flags & MapDataParserIjai.FEATURE_NAVIGATE != 0: - MapDataParserIjai.parse_section(buf, 'navigate', map_id) - buf.skip('unknown1', 4) - map_data.goto = MapDataParserIjai.parse_position(buf, 'pos') - foo = buf.get_float32('foo') - _LOGGER.debug('pos: %s, foo: %f', map_data.goto, foo) - - if feature_flags & MapDataParserIjai.FEATURE_REALTIME != 0: - MapDataParserIjai.parse_section(buf, 'realtime', map_id) - buf.skip('unknown1', 5) - map_data.vacuum_position = MapDataParserIjai.parse_position(buf, 'pos', with_angle=True) - _LOGGER.debug('pos: %s', map_data.vacuum_position) - - if feature_flags & 0x00000800 != 0: - MapDataParserIjai.parse_section(buf, 'unknown1', map_id) - MapDataParserIjai.parse_unknown_section(buf) - - if feature_flags & MapDataParserIjai.FEATURE_ROOMS != 0: - MapDataParserIjai.parse_section(buf, 'rooms', map_id) - MapDataParserIjai.parse_rooms(buf, map_data.rooms) - - if feature_flags & 0x00002000 != 0: - MapDataParserIjai.parse_section(buf, 'unknown2', map_id) - MapDataParserIjai.parse_unknown_section(buf) - - if feature_flags & 0x00004000 != 0: - MapDataParserIjai.parse_section(buf, 'room_outlines', map_id) - MapDataParserIjai.parse_room_outlines(buf) - - buf.check_empty() - - if map_data.rooms is not None: - _LOGGER.debug('rooms: %s', [str(room) for number, room in map_data.rooms.items()]) - if map_data.image is not None and not map_data.image.is_empty: - MapDataParserIjai.draw_elements(colors, drawables, sizes, map_data, image_config) - if len(map_data.rooms) > 0 and map_data.vacuum_position is not None: - map_data.vacuum_room = MapDataParserIjai.get_current_vacuum_room(buf, map_data.vacuum_position) - if map_data.vacuum_room is not None: - map_data.vacuum_room_name = map_data.rooms[map_data.vacuum_room].name - _LOGGER.debug('current vacuum room: %s', map_data.vacuum_room) - ImageHandlerIjai.rotate(map_data.image) - ImageHandlerIjai.draw_texts(map_data.image, texts) - return map_data - - @staticmethod - def map_to_image(p: Point) -> Point: - return Point(p.x * 20 + 400, p.y * 20 + 400) - - @staticmethod - def image_to_map(x: float) -> float: - return (x - 400) / 20 - - @staticmethod - def get_current_vacuum_room(buf: ParsingBuffer, vacuum_position: Point) -> Optional[int]: - vacuum_position_on_image = MapDataParserIjai.map_to_image(vacuum_position) - pixel_type = buf.get_at_image(int(vacuum_position_on_image.y) * 800 + int(vacuum_position_on_image.x)) - if ImageHandlerIjai.MAP_ROOM_MIN <= pixel_type <= ImageHandlerIjai.MAP_ROOM_MAX: - return pixel_type - elif ImageHandlerIjai.MAP_SELECTED_ROOM_MIN <= pixel_type <= ImageHandlerIjai.MAP_SELECTED_ROOM_MAX: - return pixel_type - ImageHandlerIjai.MAP_SELECTED_ROOM_MIN + ImageHandlerIjai.MAP_ROOM_MIN - return None - - @staticmethod - def parse_image(buf: ParsingBuffer, colors: Colors, image_config: ImageConfig, draw_cleaned_area: bool) \ - -> Tuple[ImageData, Dict[int, Room], Set[int]]: - buf.skip('unknown1', 0xA) - - image_top = 0 - image_left = 0 - image_width = buf.get_uint16_remove_parity('image_width') - buf.skip('unknown2', 1) - image_height = buf.get_uint16_remove_parity('image_height') - buf.skip('unknown3', 0x21) - - image_size = image_height * image_width - _LOGGER.debug('width: %d, height: %d', image_width, image_height) - if image_width \ - - image_width * (image_config[CONF_TRIM][CONF_LEFT] + image_config[CONF_TRIM][CONF_RIGHT]) / 100 \ - < MINIMAL_IMAGE_WIDTH: - image_config[CONF_TRIM][CONF_LEFT] = 0 - image_config[CONF_TRIM][CONF_RIGHT] = 0 - if image_height \ - - image_height * (image_config[CONF_TRIM][CONF_TOP] + image_config[CONF_TRIM][CONF_BOTTOM]) / 100 \ - < MINIMAL_IMAGE_HEIGHT: - image_config[CONF_TRIM][CONF_TOP] = 0 - image_config[CONF_TRIM][CONF_BOTTOM] = 0 - buf.mark_as_image_beginning() - image, rooms_raw, cleaned_areas, cleaned_areas_layer = ImageHandlerIjai.parse(buf, image_width, image_height, - colors, image_config, - draw_cleaned_area) - _LOGGER.debug('img: number of rooms: %d, numbers: %s', len(rooms_raw), rooms_raw.keys()) - rooms = {} - for number, room in rooms_raw.items(): - rooms[number] = Room(number, MapDataParserIjai.image_to_map(room[0] + image_left), - MapDataParserIjai.image_to_map(room[1] + image_top), - MapDataParserIjai.image_to_map(room[2] + image_left), - MapDataParserIjai.image_to_map(room[3] + image_top)) - return ImageData(image_size, image_top, image_left, image_height, image_width, image_config, - image, MapDataParserIjai.map_to_image, - additional_layers={DRAWABLE_CLEANED_AREA: cleaned_areas_layer}), rooms, cleaned_areas - - @staticmethod - def parse_history(buf: ParsingBuffer) -> Path: - path_points = [] - buf.skip('unknown1', 4) - history_count = buf.get_uint32('history_count') - for _ in range(history_count): - mode = buf.get_uint8('mode') # 0: taxi, 1: working - path_points.append(MapDataParserIjai.parse_position(buf, 'path')) - return Path(len(path_points), 1, 0, [path_points]) - - @staticmethod - def parse_restricted_areas(buf: ParsingBuffer) -> Tuple[List[Wall], List[Area]]: - walls = [] - areas = [] - buf.skip('unknown1', 4) - area_count = buf.get_uint32('area_count') - for _ in range(area_count): - buf.skip('restricted.unknown1', 12) - p1 = MapDataParserIjai.parse_position(buf, 'p1') - p2 = MapDataParserIjai.parse_position(buf, 'p2') - p3 = MapDataParserIjai.parse_position(buf, 'p3') - p4 = MapDataParserIjai.parse_position(buf, 'p4') - buf.skip('restricted.unknown2', 48) - _LOGGER.debug('restricted: %s %s %s %s', p1, p2, p3, p4) - if p1 == p2 and p3 == p4: - walls.append(Wall(p1.x, p1.y, p3.x, p3.y)) - else: - areas.append(Area(p1.x, p1.y, p2.x, p2.y, p3.x, p3.y, p4.x, p4.y)) - return walls, areas - - @staticmethod - def parse_cleaning_areas(buf: ParsingBuffer) -> List[Zone]: - buf.skip('unknown1', 4) - area_count = buf.get_uint32('area_count') - zones = [] - for _ in range(area_count): - buf.skip('area.unknown1', 12) - p1 = MapDataParserIjai.parse_position(buf, 'p1') - p2 = MapDataParserIjai.parse_position(buf, 'p2') - p3 = MapDataParserIjai.parse_position(buf, 'p3') - p4 = MapDataParserIjai.parse_position(buf, 'p4') - buf.skip('area.unknown2', 48) - zones.append(Zone(p1.x, p1.y, p3.x, p3.y)) - return zones - - @staticmethod - def parse_rooms(buf: ParsingBuffer, map_data_rooms: Dict[int, Room]): - map_name = buf.get_string_len8('map_name') - map_arg = buf.get_uint32('map_arg') - _LOGGER.debug('map#%d: %s', map_arg, map_name) - while map_arg > 1: - map_name = buf.get_string_len8('map_name') - map_arg = buf.get_uint32('map_arg') - _LOGGER.debug('map#%d: %s', map_arg, map_name) - room_count = buf.get_uint32('room_count') - for _ in range(room_count): - room_id = buf.get_uint8('room.id') - room_name = buf.get_string_len8('room.name') - if map_data_rooms is not None and room_id in map_data_rooms: - map_data_rooms[room_id].name = room_name - buf.skip('room.unknown1', 1) - room_text_pos = MapDataParserIjai.parse_position(buf, 'room.text_pos') - _LOGGER.debug('room#%d: %s %s', room_id, room_name, room_text_pos) - buf.skip('unknown1', 6) - - @staticmethod - def parse_room_outlines(buf: ParsingBuffer): - buf.skip('unknown1', 51) - room_count = buf.get_uint32('room_count') - for _ in range(room_count): - room_id = buf.get_uint32('room.id') - segment_count = buf.get_uint32('room.segment_count') - for _ in range(segment_count): - buf.skip('unknown2', 5) - _LOGGER.debug('room#%d: segment_count: %d', room_id, segment_count) - - @staticmethod - def parse_section(buf: ParsingBuffer, name: str, map_id: int): - buf.set_name(name) - magic = buf.get_uint32('magic') - if magic != map_id: - raise ValueError( - f"error parsing section {name} at offset {buf._offs - 4:#x}: magic check failed. " + - f"Magic: {magic:#x}, Map ID: {map_id:#x}") - - @staticmethod - def parse_position(buf: ParsingBuffer, name: str, with_angle: bool = False) -> Optional[Point]: - x = buf.get_float32(name + '.x') - y = buf.get_float32(name + '.y') - if x == MapDataParserIjai.POSITION_UNKNOWN or y == MapDataParserIjai.POSITION_UNKNOWN: - return None - a = None - if with_angle: - a = buf.get_float32(name + '.a') * 180 / math.pi - return Point(x, y, a) - - @staticmethod - def parse_unknown_section(buf: ParsingBuffer) -> bool: - n = buf._data[buf._offs:].find(buf._data[4:8]) - if n >= 0: - buf._offs += n - buf._length -= n - return True - else: - buf._offs += buf._length - buf._length = 0 - return False diff --git a/custom_components/xiaomi_cloud_map_extractor/ijai/parsing_buffer.py b/custom_components/xiaomi_cloud_map_extractor/ijai/parsing_buffer.py deleted file mode 100644 index 738547b..0000000 --- a/custom_components/xiaomi_cloud_map_extractor/ijai/parsing_buffer.py +++ /dev/null @@ -1,92 +0,0 @@ -import logging - -from struct import unpack_from - -_LOGGER = logging.getLogger(__name__) - - -class ParsingBuffer: - def __init__(self, name: str, data: bytes, start_offs: int, length: int): - self._name = name - self._data = data - self._offs = start_offs - self._length = length - self._image_beginning = None - - def set_name(self, name: str): - self._name = name - _LOGGER.debug('SECTION %s: offset 0x%x', self._name, self._offs) - - def mark_as_image_beginning(self): - self._image_beginning = self._offs - - def get_at_image(self, offset) -> int: - return self._data[self._image_beginning + offset - 1] - - def skip(self, field: str, n: int): - if n == 0: - return - if self._length < n: - raise ValueError(f"error parsing {self._name}.{field} at offset {self._offs:#x}: buffer underrun") - self._offs += n - self._length -= n - - def get_uint8(self, field: str) -> int: - if self._length < 1: - raise ValueError(f"error parsing {self._name}.{field} at offset {self._offs:#x}: buffer underrun") - self._offs += 1 - self._length -= 1 - return self._data[self._offs - 1] - - def get_uint16(self, field: str) -> int: - if self._length < 2: - raise ValueError(f"error parsing {self._name}.{field} at offset {self._offs:#x}: buffer underrun") - self._offs += 2 - self._length -= 2 - return unpack_from(' int: - if self._length < 2: - raise ValueError(f"error parsing {self._name}.{field} at offset {self._offs:#x}: buffer underrun") - lo = self._data[self._offs] - hi = self._data[self._offs + 1] - self._offs += 2 - self._length -= 2 - return ((hi^1) << 7) ^ lo - - def get_uint32(self, field: str) -> int: - if self._length < 4: - raise ValueError(f"error parsing {self._name}.{field} at offset {self._offs:#x}: buffer underrun") - self._offs += 4 - self._length -= 4 - return unpack_from(' float: - if self._length < 4: - raise ValueError(f"error parsing {self._name}.{field} at offset {self._offs:#x}: buffer underrun") - self._offs += 4 - self._length -= 4 - return unpack_from(' bytes: - if self._length < count: - raise ValueError(f"error parsing {self._name}.{field} at offset {self._offs:#x}: buffer underrun") - self._offs += count - self._length -= count - return self._data[self._offs - count:self._offs].decode('utf-8') - - def get_string_len8(self, field: str) -> str: - n = self.get_uint8(field + '.len') - return get_bytes(n, field) - - - def peek_uint32(self, field: str) -> int: - if self._length < 4: - raise ValueError(f"error parsing {self._name}.{field} at offset {self._offs:#x}: buffer underrun") - return unpack_from(' str | None: - url = self._connector.get_api_url(self._country) + '/v2/home/get_interim_file_url/pro' - - def decode_map(self, - raw_map: bytes, - colors: Colors, - drawables: Drawables, - texts: Texts, - sizes: Sizes, - image_config: ImageConfig) -> MapData: +class IjaiCloudVacuum(XiaomiCloudVacuumV2): + WIFI_STR_LEN = 18 + WIFI_STR_POS = 11 + def __init__(self, vacuum_config: VacuumConfig): + super().__init__(vacuum_config) + self._token = vacuum_config.token + self._host = vacuum_config.host + self._mac = vacuum_config._mac + self._wifi_info_sn = None - unzipped = zlib.decompress(raw_map) - - return MapDataParserIjai.parse(unzipped, colors, drawables, texts, sizes, image_config) - - def get_map_archive_extension(self) -> str: + self._ijai_map_data_parser = IjaiMapDataParser( + vacuum_config.palette, + vacuum_config.sizes, + vacuum_config.drawables, + vacuum_config.image_config, + vacuum_config.texts + ) + @property + def map_archive_extension(self) -> str: return "zlib" - def decrypt_map(self, data:bytes, wifi_info_sn:str, user_id:str, device_id:str, model:str, mac:str): - return unGzipCommon(data=data, \ - wifi_info_sn=wifi_info_sn, \ - owner_id=str(user_id), \ - device_id=device_id, \ - model=model, \ - device_mac=mac); \ No newline at end of file + @property + def map_data_parser(self) -> IjaiMapDataParser: + return self._ijai_map_data_parser + + def get_map_url(self, map_name: str) -> str | None: + url = self._connector.get_api_url(self._country) + '/v2/home/get_interim_file_url_pro' + params = { + "data": f'{{"obj_name":"{self._user_id}/{self._device_id}/{map_name}"}}' + } + api_response = self._connector.execute_api_call_encrypted(url, params) + if api_response is None or ("result" not in api_response) or (api_response["result"] is None) or ("url" not in api_response["result"]): + self._LOGGER.debug(f"API returned {api_response['code']}" + "(" + api_response["message"] + ")") + return None + return api_response["result"]["url"] + + def decode_and_parse(self, raw_map: bytes): + if self._wifi_info_sn is None or self._wifi_info_sn is "": + device = MiotDevice(self._host, self._token) + props = device.get_property_by(7, 45)[0]["value"].split(',') + self._wifi_info_sn = props[self.WIFI_STR_POS].replace('"','')[:self.WIFI_STR_LEN] + _LOGGER.debug(f"wifi_sn = {self._wifi_info_sn}") + + decoded_map = self.map_data_parser.unpack_map(raw_map, + wifi_sn=self._wifi_info_sn, + owner_id = str(self._user_id), + device_id = str(self._device_id), + model = self.model, + device_mac = self._mac) + return self.map_data_parser.parse(decoded_map) diff --git a/custom_components/xiaomi_cloud_map_extractor/viomi/map_data_parser.py b/custom_components/xiaomi_cloud_map_extractor/viomi/map_data_parser.py deleted file mode 100644 index 076f546..0000000 --- a/custom_components/xiaomi_cloud_map_extractor/viomi/map_data_parser.py +++ /dev/null @@ -1,268 +0,0 @@ -import logging -import math -from typing import Dict, List, Optional, Set, Tuple - -from custom_components.xiaomi_cloud_map_extractor.common.map_data import Area, ImageData, MapData, Path, Point, Room, \ - Wall, Zone -from custom_components.xiaomi_cloud_map_extractor.common.map_data_parser import MapDataParser -from custom_components.xiaomi_cloud_map_extractor.const import * -from custom_components.xiaomi_cloud_map_extractor.types import Colors, Drawables, ImageConfig, Sizes, Texts -from custom_components.xiaomi_cloud_map_extractor.viomi.image_handler import ImageHandlerViomi -from custom_components.xiaomi_cloud_map_extractor.viomi.parsing_buffer import ParsingBuffer - -_LOGGER = logging.getLogger(__name__) - - -class MapDataParserViomi(MapDataParser): - FEATURE_ROBOT_STATUS = 0x00000001 - FEATURE_IMAGE = 0x00000002 - FEATURE_HISTORY = 0x00000004 - FEATURE_CHARGE_STATION = 0x00000008 - FEATURE_RESTRICTED_AREAS = 0x00000010 - FEATURE_CLEANING_AREAS = 0x00000020 - FEATURE_NAVIGATE = 0x00000040 - FEATURE_REALTIME = 0x00000080 - FEATURE_ROOMS = 0x00001000 - - POSITION_UNKNOWN = 1100 - - @staticmethod - def parse(raw: bytes, colors: Colors, drawables: Drawables, texts: Texts, sizes: Sizes, - image_config: ImageConfig, *args, **kwargs) -> MapData: - map_data = MapData(0, 1) - buf = ParsingBuffer('header', raw, 0, len(raw)) - feature_flags = buf.get_uint32('feature_flags') - map_id = buf.peek_uint32('map_id') - _LOGGER.debug('feature_flags: 0x%x, map_id: %d', feature_flags, map_id) - - if feature_flags & MapDataParserViomi.FEATURE_ROBOT_STATUS != 0: - MapDataParserViomi.parse_section(buf, 'robot_status', map_id) - buf.skip('unknown1', 0x28) - - if feature_flags & MapDataParserViomi.FEATURE_IMAGE != 0: - MapDataParserViomi.parse_section(buf, 'image', map_id) - map_data.image, map_data.rooms, map_data.cleaned_rooms = \ - MapDataParserViomi.parse_image(buf, colors, image_config, DRAWABLE_CLEANED_AREA in drawables) - - if feature_flags & MapDataParserViomi.FEATURE_HISTORY != 0: - MapDataParserViomi.parse_section(buf, 'history', map_id) - map_data.path = MapDataParserViomi.parse_history(buf) - - if feature_flags & MapDataParserViomi.FEATURE_CHARGE_STATION != 0: - MapDataParserViomi.parse_section(buf, 'charge_station', map_id) - map_data.charger = MapDataParserViomi.parse_position(buf, 'pos', with_angle=True) - _LOGGER.debug('pos: %s', map_data.charger) - - if feature_flags & MapDataParserViomi.FEATURE_RESTRICTED_AREAS != 0: - MapDataParserViomi.parse_section(buf, 'restricted_areas', map_id) - map_data.walls, map_data.no_go_areas = MapDataParserViomi.parse_restricted_areas(buf) - - if feature_flags & MapDataParserViomi.FEATURE_CLEANING_AREAS != 0: - MapDataParserViomi.parse_section(buf, 'cleaning_areas', map_id) - map_data.zones = MapDataParserViomi.parse_cleaning_areas(buf) - - if feature_flags & MapDataParserViomi.FEATURE_NAVIGATE != 0: - MapDataParserViomi.parse_section(buf, 'navigate', map_id) - buf.skip('unknown1', 4) - map_data.goto = MapDataParserViomi.parse_position(buf, 'pos') - foo = buf.get_float32('foo') - _LOGGER.debug('pos: %s, foo: %f', map_data.goto, foo) - - if feature_flags & MapDataParserViomi.FEATURE_REALTIME != 0: - MapDataParserViomi.parse_section(buf, 'realtime', map_id) - buf.skip('unknown1', 5) - map_data.vacuum_position = MapDataParserViomi.parse_position(buf, 'pos', with_angle=True) - _LOGGER.debug('pos: %s', map_data.vacuum_position) - - if feature_flags & 0x00000800 != 0: - MapDataParserViomi.parse_section(buf, 'unknown1', map_id) - MapDataParserViomi.parse_unknown_section(buf) - - if feature_flags & MapDataParserViomi.FEATURE_ROOMS != 0: - MapDataParserViomi.parse_section(buf, 'rooms', map_id) - MapDataParserViomi.parse_rooms(buf, map_data.rooms) - - if feature_flags & 0x00002000 != 0: - MapDataParserViomi.parse_section(buf, 'unknown2', map_id) - MapDataParserViomi.parse_unknown_section(buf) - - if feature_flags & 0x00004000 != 0: - MapDataParserViomi.parse_section(buf, 'room_outlines', map_id) - MapDataParserViomi.parse_room_outlines(buf) - - buf.check_empty() - - if map_data.rooms is not None: - _LOGGER.debug('rooms: %s', [str(room) for number, room in map_data.rooms.items()]) - if map_data.image is not None and not map_data.image.is_empty: - MapDataParserViomi.draw_elements(colors, drawables, sizes, map_data, image_config) - if len(map_data.rooms) > 0 and map_data.vacuum_position is not None: - map_data.vacuum_room = MapDataParserViomi.get_current_vacuum_room(buf, map_data.vacuum_position) - if map_data.vacuum_room is not None: - map_data.vacuum_room_name = map_data.rooms[map_data.vacuum_room].name - _LOGGER.debug('current vacuum room: %s', map_data.vacuum_room) - ImageHandlerViomi.rotate(map_data.image) - ImageHandlerViomi.draw_texts(map_data.image, texts) - return map_data - - @staticmethod - def map_to_image(p: Point) -> Point: - return Point(p.x * 20 + 400, p.y * 20 + 400) - - @staticmethod - def image_to_map(x: float) -> float: - return (x - 400) / 20 - - @staticmethod - def get_current_vacuum_room(buf: ParsingBuffer, vacuum_position: Point) -> Optional[int]: - vacuum_position_on_image = MapDataParserViomi.map_to_image(vacuum_position) - pixel_type = buf.get_at_image(int(vacuum_position_on_image.y) * 800 + int(vacuum_position_on_image.x)) - if ImageHandlerViomi.MAP_ROOM_MIN <= pixel_type <= ImageHandlerViomi.MAP_ROOM_MAX: - return pixel_type - elif ImageHandlerViomi.MAP_SELECTED_ROOM_MIN <= pixel_type <= ImageHandlerViomi.MAP_SELECTED_ROOM_MAX: - return pixel_type - ImageHandlerViomi.MAP_SELECTED_ROOM_MIN + ImageHandlerViomi.MAP_ROOM_MIN - return None - - @staticmethod - def parse_image(buf: ParsingBuffer, colors: Colors, image_config: ImageConfig, draw_cleaned_area: bool) \ - -> Tuple[ImageData, Dict[int, Room], Set[int]]: - buf.skip('unknown1', 0x08) - image_top = 0 - image_left = 0 - image_height = buf.get_uint32('image_height') - image_width = buf.get_uint32('image_width') - buf.skip('unknown2', 20) - image_size = image_height * image_width - _LOGGER.debug('width: %d, height: %d', image_width, image_height) - if image_width \ - - image_width * (image_config[CONF_TRIM][CONF_LEFT] + image_config[CONF_TRIM][CONF_RIGHT]) / 100 \ - < MINIMAL_IMAGE_WIDTH: - image_config[CONF_TRIM][CONF_LEFT] = 0 - image_config[CONF_TRIM][CONF_RIGHT] = 0 - if image_height \ - - image_height * (image_config[CONF_TRIM][CONF_TOP] + image_config[CONF_TRIM][CONF_BOTTOM]) / 100 \ - < MINIMAL_IMAGE_HEIGHT: - image_config[CONF_TRIM][CONF_TOP] = 0 - image_config[CONF_TRIM][CONF_BOTTOM] = 0 - buf.mark_as_image_beginning() - image, rooms_raw, cleaned_areas, cleaned_areas_layer = ImageHandlerViomi.parse(buf, image_width, image_height, - colors, image_config, - draw_cleaned_area) - _LOGGER.debug('img: number of rooms: %d, numbers: %s', len(rooms_raw), rooms_raw.keys()) - rooms = {} - for number, room in rooms_raw.items(): - rooms[number] = Room(number, MapDataParserViomi.image_to_map(room[0] + image_left), - MapDataParserViomi.image_to_map(room[1] + image_top), - MapDataParserViomi.image_to_map(room[2] + image_left), - MapDataParserViomi.image_to_map(room[3] + image_top)) - return ImageData(image_size, image_top, image_left, image_height, image_width, image_config, - image, MapDataParserViomi.map_to_image, - additional_layers={DRAWABLE_CLEANED_AREA: cleaned_areas_layer}), rooms, cleaned_areas - - @staticmethod - def parse_history(buf: ParsingBuffer) -> Path: - path_points = [] - buf.skip('unknown1', 4) - history_count = buf.get_uint32('history_count') - for _ in range(history_count): - mode = buf.get_uint8('mode') # 0: taxi, 1: working - path_points.append(MapDataParserViomi.parse_position(buf, 'path')) - return Path(len(path_points), 1, 0, [path_points]) - - @staticmethod - def parse_restricted_areas(buf: ParsingBuffer) -> Tuple[List[Wall], List[Area]]: - walls = [] - areas = [] - buf.skip('unknown1', 4) - area_count = buf.get_uint32('area_count') - for _ in range(area_count): - buf.skip('restricted.unknown1', 12) - p1 = MapDataParserViomi.parse_position(buf, 'p1') - p2 = MapDataParserViomi.parse_position(buf, 'p2') - p3 = MapDataParserViomi.parse_position(buf, 'p3') - p4 = MapDataParserViomi.parse_position(buf, 'p4') - buf.skip('restricted.unknown2', 48) - _LOGGER.debug('restricted: %s %s %s %s', p1, p2, p3, p4) - if p1 == p2 and p3 == p4: - walls.append(Wall(p1.x, p1.y, p3.x, p3.y)) - else: - areas.append(Area(p1.x, p1.y, p2.x, p2.y, p3.x, p3.y, p4.x, p4.y)) - return walls, areas - - @staticmethod - def parse_cleaning_areas(buf: ParsingBuffer) -> List[Zone]: - buf.skip('unknown1', 4) - area_count = buf.get_uint32('area_count') - zones = [] - for _ in range(area_count): - buf.skip('area.unknown1', 12) - p1 = MapDataParserViomi.parse_position(buf, 'p1') - p2 = MapDataParserViomi.parse_position(buf, 'p2') - p3 = MapDataParserViomi.parse_position(buf, 'p3') - p4 = MapDataParserViomi.parse_position(buf, 'p4') - buf.skip('area.unknown2', 48) - zones.append(Zone(p1.x, p1.y, p3.x, p3.y)) - return zones - - @staticmethod - def parse_rooms(buf: ParsingBuffer, map_data_rooms: Dict[int, Room]): - map_name = buf.get_string_len8('map_name') - map_arg = buf.get_uint32('map_arg') - _LOGGER.debug('map#%d: %s', map_arg, map_name) - while map_arg > 1: - map_name = buf.get_string_len8('map_name') - map_arg = buf.get_uint32('map_arg') - _LOGGER.debug('map#%d: %s', map_arg, map_name) - room_count = buf.get_uint32('room_count') - for _ in range(room_count): - room_id = buf.get_uint8('room.id') - room_name = buf.get_string_len8('room.name') - if map_data_rooms is not None and room_id in map_data_rooms: - map_data_rooms[room_id].name = room_name - buf.skip('room.unknown1', 1) - room_text_pos = MapDataParserViomi.parse_position(buf, 'room.text_pos') - _LOGGER.debug('room#%d: %s %s', room_id, room_name, room_text_pos) - buf.skip('unknown1', 6) - - @staticmethod - def parse_room_outlines(buf: ParsingBuffer): - buf.skip('unknown1', 51) - room_count = buf.get_uint32('room_count') - for _ in range(room_count): - room_id = buf.get_uint32('room.id') - segment_count = buf.get_uint32('room.segment_count') - for _ in range(segment_count): - buf.skip('unknown2', 5) - _LOGGER.debug('room#%d: segment_count: %d', room_id, segment_count) - - @staticmethod - def parse_section(buf: ParsingBuffer, name: str, map_id: int): - buf.set_name(name) - magic = buf.get_uint32('magic') - if magic != map_id: - raise ValueError( - f"error parsing section {name} at offset {buf._offs - 4:#x}: magic check failed. " + - f"Magic: {magic:#x}, Map ID: {map_id:#x}") - - @staticmethod - def parse_position(buf: ParsingBuffer, name: str, with_angle: bool = False) -> Optional[Point]: - x = buf.get_float32(name + '.x') - y = buf.get_float32(name + '.y') - if x == MapDataParserViomi.POSITION_UNKNOWN or y == MapDataParserViomi.POSITION_UNKNOWN: - return None - a = None - if with_angle: - a = buf.get_float32(name + '.a') * 180 / math.pi - return Point(x, y, a) - - @staticmethod - def parse_unknown_section(buf: ParsingBuffer) -> bool: - n = buf._data[buf._offs:].find(buf._data[4:8]) - if n >= 0: - buf._offs += n - buf._length -= n - return True - else: - buf._offs += buf._length - buf._length = 0 - return False diff --git a/custom_components/xiaomi_cloud_map_extractor/viomi/vacuum.py b/custom_components/xiaomi_cloud_map_extractor/viomi/vacuum.py deleted file mode 100644 index 432d7e4..0000000 --- a/custom_components/xiaomi_cloud_map_extractor/viomi/vacuum.py +++ /dev/null @@ -1,27 +0,0 @@ -import zlib - -from custom_components.xiaomi_cloud_map_extractor.common.map_data import MapData -from custom_components.xiaomi_cloud_map_extractor.common.vacuum_v2 import XiaomiCloudVacuumV2 -from custom_components.xiaomi_cloud_map_extractor.common.xiaomi_cloud_connector import XiaomiCloudConnector -from custom_components.xiaomi_cloud_map_extractor.types import Colors, Drawables, ImageConfig, Sizes, Texts -from custom_components.xiaomi_cloud_map_extractor.viomi.map_data_parser import MapDataParserViomi - -class ViomiVacuum(XiaomiCloudVacuumV2): - - def __init__(self, connector: XiaomiCloudConnector, country: str, user_id: str, device_id: str, model: str, mac:str): - super().__init__(connector, country, user_id, device_id, model, mac) - - def decode_map(self, - raw_map: bytes, - colors: Colors, - drawables: Drawables, - texts: Texts, - sizes: Sizes, - image_config: ImageConfig) -> MapData: - - unzipped = zlib.decompress(raw_map) - - return MapDataParserViomi.parse(unzipped, colors, drawables, texts, sizes, image_config) - - def get_map_archive_extension(self) -> str: - return "zlib"