From 4d10e4dbd1e4d05557ec07110974ce5356fc7c02 Mon Sep 17 00:00:00 2001 From: "Alexander \"Tarh" Date: Sun, 14 Apr 2024 01:32:02 +0300 Subject: [PATCH] Added support for ijai.vacuum.xxx map image --- .../xiaomi_cloud_map_extractor/camera.py | 11 +- .../common/vacuum.py | 10 +- .../common/vacuum_v2.py | 15 +- .../common/xiaomi_cloud_connector.py | 9 +- .../xiaomi_cloud_map_extractor/const.py | 2 + .../ijai/__init__.py | 0 .../ijai/aes_decryptor.py | 66 ++++ .../ijai/image_handler.py | 95 ++++++ .../ijai/map_data_parser.py | 281 ++++++++++++++++++ .../ijai/parsing_buffer.py | 92 ++++++ .../xiaomi_cloud_map_extractor/ijai/vacuum.py | 36 +++ .../viomi/map_data_parser.py | 2 +- .../viomi/vacuum.py | 7 +- 13 files changed, 606 insertions(+), 20 deletions(-) create mode 100644 custom_components/xiaomi_cloud_map_extractor/ijai/__init__.py create mode 100644 custom_components/xiaomi_cloud_map_extractor/ijai/aes_decryptor.py create mode 100644 custom_components/xiaomi_cloud_map_extractor/ijai/image_handler.py create mode 100644 custom_components/xiaomi_cloud_map_extractor/ijai/map_data_parser.py create mode 100644 custom_components/xiaomi_cloud_map_extractor/ijai/parsing_buffer.py create mode 100644 custom_components/xiaomi_cloud_map_extractor/ijai/vacuum.py diff --git a/custom_components/xiaomi_cloud_map_extractor/camera.py b/custom_components/xiaomi_cloud_map_extractor/camera.py index 0568631..f7edb04 100644 --- a/custom_components/xiaomi_cloud_map_extractor/camera.py +++ b/custom_components/xiaomi_cloud_map_extractor/camera.py @@ -28,6 +28,7 @@ from custom_components.xiaomi_cloud_map_extractor.roidmi.vacuum import RoidmiVacuum from custom_components.xiaomi_cloud_map_extractor.unsupported.vacuum import UnsupportedVacuum from custom_components.xiaomi_cloud_map_extractor.viomi.vacuum import ViomiVacuum +from custom_components.xiaomi_cloud_map_extractor.ijai.vacuum import IjaiVacuum from custom_components.xiaomi_cloud_map_extractor.xiaomi.vacuum import XiaomiVacuum _LOGGER = logging.getLogger(__name__) @@ -299,11 +300,11 @@ def _handle_login(self): def _handle_device(self): _LOGGER.debug("Retrieving device info, country: %s", self._country) - country, user_id, device_id, model = self._connector.get_device_details(self._vacuum.token, self._country) + country, user_id, device_id, model, mac = self._connector.get_device_details(self._vacuum.token, self._country) if model is not None: self._country = country _LOGGER.debug("Retrieved device model: %s", model) - self._device = self._create_device(user_id, device_id, model) + self._device = self._create_device(user_id, device_id, model, mac) _LOGGER.debug("Created device, used api: %s", self._used_api) else: _LOGGER.error("Failed to retrieve model") @@ -363,12 +364,14 @@ def _set_map_data(self, map_data: MapData): self._map_data = map_data self._store_image() - def _create_device(self, user_id: str, device_id: str, model: str) -> XiaomiCloudVacuum: + def _create_device(self, user_id: str, device_id: str, model: str, mac: str) -> XiaomiCloudVacuum: self._used_api = self._detect_api(model) if self._used_api == CONF_AVAILABLE_API_XIAOMI: return XiaomiVacuum(self._connector, self._country, user_id, device_id, model) if self._used_api == CONF_AVAILABLE_API_VIOMI: - return ViomiVacuum(self._connector, self._country, user_id, device_id, model) + return ViomiVacuum(self._connector, self._country, user_id, device_id, model, mac) + if self._used_api == CONF_AVAILABLE_API_IJAI: + return IjaiVacuum(self._connector, self._country, user_id, device_id, model, mac) if self._used_api == CONF_AVAILABLE_API_ROIDMI: return RoidmiVacuum(self._connector, self._country, user_id, device_id, model) if self._used_api == CONF_AVAILABLE_API_DREAME: diff --git a/custom_components/xiaomi_cloud_map_extractor/common/vacuum.py b/custom_components/xiaomi_cloud_map_extractor/common/vacuum.py index 6b46841..c15107f 100644 --- a/custom_components/xiaomi_cloud_map_extractor/common/vacuum.py +++ b/custom_components/xiaomi_cloud_map_extractor/common/vacuum.py @@ -9,12 +9,16 @@ class XiaomiCloudVacuum: - def __init__(self, connector: XiaomiCloudConnector, country: str, user_id: str, device_id: str, model: str): + def __init__(self, connector: XiaomiCloudConnector, country: str, user_id: str, device_id: str, model: str, mac: str): self._connector = connector self._country = country self._user_id = user_id self._device_id = device_id self.model = model + self.mac = mac + + def decrypt_map(self, data:bytes, wifi_info_sn:str, user_id:str, device_id:str, model:str, mac:str): + return data def get_map(self, map_name: str, @@ -33,7 +37,9 @@ def get_map(self, raw_map_file.write(response) raw_map_file.close() map_stored = True - map_data = self.decode_map(response, colors, drawables, texts, sizes, image_config) + + map_data_decrypted = self.decrypt_map(response, "######SD##########", self._user_id, self._device_id, self.model, self.mac) + map_data = self.decode_map(map_data_decrypted, colors, drawables, texts, sizes, image_config) if map_data is None: return None, map_stored map_data.map_name = map_name diff --git a/custom_components/xiaomi_cloud_map_extractor/common/vacuum_v2.py b/custom_components/xiaomi_cloud_map_extractor/common/vacuum_v2.py index 6375a53..aafe381 100644 --- a/custom_components/xiaomi_cloud_map_extractor/common/vacuum_v2.py +++ b/custom_components/xiaomi_cloud_map_extractor/common/vacuum_v2.py @@ -2,20 +2,23 @@ from custom_components.xiaomi_cloud_map_extractor.common.vacuum import XiaomiCloudVacuum from custom_components.xiaomi_cloud_map_extractor.common.xiaomi_cloud_connector import XiaomiCloudConnector - +import logging class XiaomiCloudVacuumV2(XiaomiCloudVacuum): - - def __init__(self, connector: XiaomiCloudConnector, country: str, user_id: str, device_id: str, model: str): - super().__init__(connector, country, user_id, device_id, model) + + _LOGGER = logging.getLogger(__name__) + + def __init__(self, connector: XiaomiCloudConnector, country: str, user_id: str, device_id: str, model: str, mac: str): + super().__init__(connector, country, user_id, device_id, model, mac) def get_map_url(self, map_name: str) -> Optional[str]: - url = self._connector.get_api_url(self._country) + '/v2/home/get_interim_file_url' + url = self._connector.get_api_url(self._country) + '/v2/home/get_interim_file_url_pro' params = { "data": f'{{"obj_name":"{self._user_id}/{self._device_id}/{map_name}"}}' } api_response = self._connector.execute_api_call_encrypted(url, params) - if api_response is None or "result" not in api_response or "url" not in api_response["result"]: + if api_response is None or ("result" not in api_response) or (api_response["result"] is None) or ("url" not in api_response["result"]): + self._LOGGER.debug(f"API returned {api_response['code']}" + "(" + api_response["message"] + ")") return None return api_response["result"]["url"] diff --git a/custom_components/xiaomi_cloud_map_extractor/common/xiaomi_cloud_connector.py b/custom_components/xiaomi_cloud_map_extractor/common/xiaomi_cloud_connector.py index 94fb641..4f4ec42 100644 --- a/custom_components/xiaomi_cloud_map_extractor/common/xiaomi_cloud_connector.py +++ b/custom_components/xiaomi_cloud_map_extractor/common/xiaomi_cloud_connector.py @@ -132,7 +132,7 @@ def get_raw_map_data(self, map_url) -> Optional[bytes]: return None def get_device_details(self, token: str, - country: str) -> Tuple[Optional[str], Optional[str], Optional[str], Optional[str]]: + country: str) -> Tuple[Optional[str], Optional[str], Optional[str], Optional[str], Optional[str]]: countries_to_check = CONF_AVAILABLE_COUNTRIES if country is not None: countries_to_check = [country] @@ -146,8 +146,9 @@ def get_device_details(self, token: str, user_id = found[0]["uid"] device_id = found[0]["did"] model = found[0]["model"] - return country, user_id, device_id, model - return None, None, None, None + mac = found[0]["mac"] + return country, user_id, device_id, model, mac + return None, None, None, None, None def get_devices(self, country: str) -> Any: url = self.get_api_url(country) + "/home/device_list" @@ -168,7 +169,7 @@ def execute_api_call_encrypted(self, url: str, params: Dict[str, str]) -> Any: "userId": str(self._userId), "yetAnotherServiceToken": str(self._serviceToken), "serviceToken": str(self._serviceToken), - "locale": "en_GB", + "locale": "en_US", "timezone": "GMT+02:00", "is_daylight": "1", "dst_offset": "3600000", diff --git a/custom_components/xiaomi_cloud_map_extractor/const.py b/custom_components/xiaomi_cloud_map_extractor/const.py index 5bdd5c6..e7bf3b2 100644 --- a/custom_components/xiaomi_cloud_map_extractor/const.py +++ b/custom_components/xiaomi_cloud_map_extractor/const.py @@ -6,6 +6,7 @@ CONF_AVAILABLE_API_DREAME = "dreame" CONF_AVAILABLE_API_ROIDMI = "roidmi" CONF_AVAILABLE_API_VIOMI = "viomi" +CONF_AVAILABLE_API_IJAI = "ijai" CONF_AVAILABLE_API_XIAOMI = "xiaomi" CONF_AVAILABLE_COUNTRIES = ["cn", "de", "us", "ru", "tw", "sg", "in", "i2"] CONF_BOTTOM = "bottom" @@ -204,6 +205,7 @@ CONF_AVAILABLE_API_DREAME: ["dreame.vacuum."], CONF_AVAILABLE_API_ROIDMI: ["roidmi.vacuum.", "zhimi.vacuum."], CONF_AVAILABLE_API_VIOMI: ["viomi.vacuum."], + CONF_AVAILABLE_API_IJAI: ["ijai.vacuum."], CONF_AVAILABLE_API_XIAOMI: ["roborock.vacuum", "rockrobo.vacuum"] } diff --git a/custom_components/xiaomi_cloud_map_extractor/ijai/__init__.py b/custom_components/xiaomi_cloud_map_extractor/ijai/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/custom_components/xiaomi_cloud_map_extractor/ijai/aes_decryptor.py b/custom_components/xiaomi_cloud_map_extractor/ijai/aes_decryptor.py new file mode 100644 index 0000000..25bb701 --- /dev/null +++ b/custom_components/xiaomi_cloud_map_extractor/ijai/aes_decryptor.py @@ -0,0 +1,66 @@ +from Crypto.Cipher import AES +from Crypto.Hash import MD5 +from Crypto.Util.Padding import pad, unpad +import base64 +import logging + +_LOGGER = logging.getLogger(__name__) + +isEncryptKeyTypeHex = True + +def aesEncrypted(data, key: str): + cipher = AES.new(key.encode("utf-8"), AES.MODE_ECB) + + encryptedData = cipher.encrypt( + pad(data.encode("utf-8"), AES.block_size, 'pkcs7')) + encryptedBase64Str = base64.b64encode(encryptedData).decode("utf-8") + + return encryptedBase64Str + +def aesDecrypted(data, key: str): + parsedKey = key.encode("utf-8") + if isEncryptKeyTypeHex: + parsedKey = bytes.fromhex(key) + + cipher = AES.new(parsedKey, AES.MODE_ECB) + + decryptedBytes = cipher.decrypt(base64.b64decode(data)) + + decryptedData = unpad(decryptedBytes, AES.block_size, 'pkcs7') + + return bytes.fromhex(decryptedData.decode("utf-8")) + + +def md5key(string: str, model: str, device_mac: str): + pjstr = "".join(device_mac.lower().split(":")) + + tempModel = model.split('.')[-1] + + if len(tempModel) == 2: + tempModel = "00" + tempModel + elif len(tempModel) == 3: + tempModel = "0" + tempModel + + tempKey = pjstr + tempModel + aeskey = aesEncrypted(string, tempKey) + #aeskey = string + + temp = MD5.new(aeskey.encode('utf-8')).hexdigest() + if isEncryptKeyTypeHex: + return temp + else: + return temp[8:-8].upper() + + +def genMD5key(wifi_info_sn: str, owner_id: str, device_id: str, model: str, device_mac: str): + arr = [wifi_info_sn, owner_id, device_id] + tempString = '+'.join(arr) + return md5key(tempString, model, device_mac) + + +def unGzipCommon(data: str, wifi_info_sn: str, owner_id: str, device_id: str, model: str, device_mac: str) -> bytes: + #base64map = base64.b64encode(data) +# with open("0.encrypted.map", 'wb') as file: +# file.write(data) + return aesDecrypted(data, genMD5key(wifi_info_sn, owner_id, device_id, model, device_mac)) + diff --git a/custom_components/xiaomi_cloud_map_extractor/ijai/image_handler.py b/custom_components/xiaomi_cloud_map_extractor/ijai/image_handler.py new file mode 100644 index 0000000..f930077 --- /dev/null +++ b/custom_components/xiaomi_cloud_map_extractor/ijai/image_handler.py @@ -0,0 +1,95 @@ +import logging +from typing import Dict, Optional, Set, Tuple + +from PIL import Image +from PIL.Image import Image as ImageType + +from custom_components.xiaomi_cloud_map_extractor.common.image_handler import ImageHandler +from custom_components.xiaomi_cloud_map_extractor.const import * +from custom_components.xiaomi_cloud_map_extractor.types import Colors, ImageConfig +from custom_components.xiaomi_cloud_map_extractor.ijai.parsing_buffer import ParsingBuffer + +_LOGGER = logging.getLogger(__name__) + + +class ImageHandlerIjai(ImageHandler): + MAP_OUTSIDE = 0x00 + MAP_WALL = 0xff + MAP_SCAN = 0x01 + MAP_NEW_DISCOVERED_AREA = 0x02 + MAP_ROOM_MIN = 10 + MAP_ROOM_MAX = 59 + MAP_SELECTED_ROOM_MIN = 60 + MAP_SELECTED_ROOM_MAX = 109 + @staticmethod + def parse(buf: ParsingBuffer, width: int, height: int, colors: Colors, image_config: ImageConfig, + draw_cleaned_area: bool) \ + -> Tuple[ImageType, Dict[int, Tuple[int, int, int, int]], Set[int], Optional[ImageType]]: + palette = {\ + ImageHandlerIjai.MAP_OUTSIDE: ImageHandler.__get_color__(COLOR_MAP_OUTSIDE, colors),\ + ImageHandlerIjai.MAP_WALL: ImageHandler.__get_color__(COLOR_MAP_WALL_V2, colors),\ + ImageHandlerIjai.MAP_SCAN: ImageHandler.__get_color__(COLOR_SCAN, colors),\ + ImageHandlerIjai.MAP_NEW_DISCOVERED_AREA: ImageHandler.__get_color__(COLOR_NEW_DISCOVERED_AREA, colors)} + rooms = {} + cleaned_areas = set() + scale = image_config[CONF_SCALE] + trim_left = int(image_config[CONF_TRIM][CONF_LEFT] * width / 100) + trim_right = int(image_config[CONF_TRIM][CONF_RIGHT] * width / 100) + trim_top = int(image_config[CONF_TRIM][CONF_TOP] * height / 100) + trim_bottom = int(image_config[CONF_TRIM][CONF_BOTTOM] * height / 100) + trimmed_height = height - trim_top - trim_bottom + trimmed_width = width - trim_left - trim_right + if trimmed_width == 0 or trimmed_height == 0: + return ImageHandler.create_empty_map_image(colors), rooms, cleaned_areas, None + image = Image.new('RGBA', (trimmed_width, trimmed_height)) + pixels = image.load() + cleaned_areas_layer = None + cleaned_areas_pixels = None + if draw_cleaned_area: + cleaned_areas_layer = Image.new('RGBA', (trimmed_width, trimmed_height)) + cleaned_areas_pixels = cleaned_areas_layer.load() + _LOGGER.debug(f"trim_bottom = {trim_bottom}, trim_top = {trim_top}, trim_left = {trim_left}, trim_right = {trim_right}") + buf.skip('trim_bottom', trim_bottom * width) + unknown_pixels = set() + _LOGGER.debug(f"buffer: [{buf._offs:#x}] = {buf.peek_uint32("some_int32")}") + for img_y in range(trimmed_height): + buf.skip('trim_left', trim_left) + for img_x in range(trimmed_width): + pixel_type = buf.get_uint8('pixel') + x = img_x + y = trimmed_height - 1 - img_y + if pixel_type in palette.keys(): + pixels[x, y] = palette[pixel_type] + elif ImageHandlerIjai.MAP_ROOM_MIN <= pixel_type <= ImageHandlerIjai.MAP_SELECTED_ROOM_MAX: + room_x = img_x + trim_left + room_y = img_y + trim_bottom + if pixel_type < ImageHandlerIjai.MAP_SELECTED_ROOM_MIN: + room_number = pixel_type + else: + room_number = pixel_type - ImageHandlerIjai.MAP_SELECTED_ROOM_MIN + ImageHandlerIjai.MAP_ROOM_MIN + cleaned_areas.add(room_number) + if draw_cleaned_area: + cleaned_areas_pixels[x, y] = ImageHandler.__get_color__(COLOR_CLEANED_AREA, colors) + if room_number not in rooms: + rooms[room_number] = (room_x, room_y, room_x, room_y) + else: + rooms[room_number] = (min(rooms[room_number][0], room_x), + min(rooms[room_number][1], room_y), + max(rooms[room_number][2], room_x), + max(rooms[room_number][3], room_y)) + default = ImageHandler.ROOM_COLORS[room_number % len(ImageHandler.ROOM_COLORS)] + pixels[x, y] = ImageHandler.__get_color__(f"{COLOR_ROOM_PREFIX}{room_number}", colors, default) + else: + pixels[x, y] = ImageHandler.__get_color__(COLOR_UNKNOWN, colors) + unknown_pixels.add(pixel_type) + _LOGGER.debug(f"unknown pixel [{x},{y}] = {pixel_type}") + buf.skip('trim_right', trim_right) + buf.skip('trim_top', trim_top * width) + if image_config["scale"] != 1 and trimmed_width != 0 and trimmed_height != 0: + image = image.resize((int(trimmed_width * scale), int(trimmed_height * scale)), resample=Image.NEAREST) + if draw_cleaned_area: + cleaned_areas_layer = cleaned_areas_layer.resize( + (int(trimmed_width * scale), int(trimmed_height * scale)), resample=Image.NEAREST) + if len(unknown_pixels) > 0: + _LOGGER.warning('unknown pixel_types: %s', unknown_pixels) + return image, rooms, cleaned_areas, cleaned_areas_layer diff --git a/custom_components/xiaomi_cloud_map_extractor/ijai/map_data_parser.py b/custom_components/xiaomi_cloud_map_extractor/ijai/map_data_parser.py new file mode 100644 index 0000000..e83b978 --- /dev/null +++ b/custom_components/xiaomi_cloud_map_extractor/ijai/map_data_parser.py @@ -0,0 +1,281 @@ +import logging +import math +from typing import Dict, List, Optional, Set, Tuple + +from custom_components.xiaomi_cloud_map_extractor.common.map_data import Area, ImageData, MapData, Path, Point, Room, \ + Wall, Zone +from custom_components.xiaomi_cloud_map_extractor.common.map_data_parser import MapDataParser +from custom_components.xiaomi_cloud_map_extractor.const import * +from custom_components.xiaomi_cloud_map_extractor.types import Colors, Drawables, ImageConfig, Sizes, Texts +from custom_components.xiaomi_cloud_map_extractor.ijai.image_handler import ImageHandlerIjai +from custom_components.xiaomi_cloud_map_extractor.ijai.parsing_buffer import ParsingBuffer + +_LOGGER = logging.getLogger(__name__) + + +class MapDataParserIjai(MapDataParser): + FEATURE_ROBOT_STATUS = 0x00000001 + FEATURE_IMAGE = 0x00000002 + FEATURE_HISTORY = 0x00000004 + FEATURE_CHARGE_STATION = 0x00000008 + FEATURE_RESTRICTED_AREAS = 0x00000010 + FEATURE_CLEANING_AREAS = 0x00000020 + FEATURE_NAVIGATE = 0x00000040 + FEATURE_REALTIME = 0x00000080 + FEATURE_ROOMS = 0x00001000 + + POSITION_UNKNOWN = 1100 + + @staticmethod + def parse(raw: bytes, colors: Colors, drawables: Drawables, texts: Texts, sizes: Sizes, + image_config: ImageConfig, *args, **kwargs) -> MapData: + map_data = MapData(0, 1) + buf = ParsingBuffer('header', raw, 0, len(raw)) + + id1 = buf.get_uint8('id1') + magic1 = buf.get_uint16('magic1') + offset1 = buf.get_uint8('offset1') - 1 + + buf.skip('unknown_hdr1', offset1) + _LOGGER.debug(f"Skipping {offset1} bytes, value: {buf._data[buf._offs]:#x}") + + feature_flags = MapDataParserIjai.FEATURE_IMAGE + #feature_flags = buf.get_uint32('feature_flags') + map_id = buf.peek_uint32('map_id') + #_LOGGER.debug('feature_flags: 0x%x, map_id: %d, some_hash: %s', feature_flags, map_id, some_hash) + + if feature_flags & MapDataParserIjai.FEATURE_ROBOT_STATUS != 0: + MapDataParserIjai.parse_section(buf, 'robot_status', map_id) + buf.skip('unknown1', 0x28) + + if feature_flags & MapDataParserIjai.FEATURE_IMAGE != 0: + #MapDataParserIjai.parse_section(buf, 'image', map_id) + buf.set_name('image') + map_data.image, map_data.rooms, map_data.cleaned_rooms = \ + MapDataParserIjai.parse_image(buf, colors, image_config, DRAWABLE_CLEANED_AREA in drawables) + + if feature_flags & MapDataParserIjai.FEATURE_HISTORY != 0: + MapDataParserIjai.parse_section(buf, 'history', map_id) + map_data.path = MapDataParserIjai.parse_history(buf) + + if feature_flags & MapDataParserIjai.FEATURE_CHARGE_STATION != 0: + MapDataParserIjai.parse_section(buf, 'charge_station', map_id) + map_data.charger = MapDataParserIjai.parse_position(buf, 'pos', with_angle=True) + _LOGGER.debug('pos: %s', map_data.charger) + + if feature_flags & MapDataParserIjai.FEATURE_RESTRICTED_AREAS != 0: + MapDataParserIjai.parse_section(buf, 'restricted_areas', map_id) + map_data.walls, map_data.no_go_areas = MapDataParserIjai.parse_restricted_areas(buf) + + if feature_flags & MapDataParserIjai.FEATURE_CLEANING_AREAS != 0: + MapDataParserIjai.parse_section(buf, 'cleaning_areas', map_id) + map_data.zones = MapDataParserIjai.parse_cleaning_areas(buf) + + if feature_flags & MapDataParserIjai.FEATURE_NAVIGATE != 0: + MapDataParserIjai.parse_section(buf, 'navigate', map_id) + buf.skip('unknown1', 4) + map_data.goto = MapDataParserIjai.parse_position(buf, 'pos') + foo = buf.get_float32('foo') + _LOGGER.debug('pos: %s, foo: %f', map_data.goto, foo) + + if feature_flags & MapDataParserIjai.FEATURE_REALTIME != 0: + MapDataParserIjai.parse_section(buf, 'realtime', map_id) + buf.skip('unknown1', 5) + map_data.vacuum_position = MapDataParserIjai.parse_position(buf, 'pos', with_angle=True) + _LOGGER.debug('pos: %s', map_data.vacuum_position) + + if feature_flags & 0x00000800 != 0: + MapDataParserIjai.parse_section(buf, 'unknown1', map_id) + MapDataParserIjai.parse_unknown_section(buf) + + if feature_flags & MapDataParserIjai.FEATURE_ROOMS != 0: + MapDataParserIjai.parse_section(buf, 'rooms', map_id) + MapDataParserIjai.parse_rooms(buf, map_data.rooms) + + if feature_flags & 0x00002000 != 0: + MapDataParserIjai.parse_section(buf, 'unknown2', map_id) + MapDataParserIjai.parse_unknown_section(buf) + + if feature_flags & 0x00004000 != 0: + MapDataParserIjai.parse_section(buf, 'room_outlines', map_id) + MapDataParserIjai.parse_room_outlines(buf) + + buf.check_empty() + + if map_data.rooms is not None: + _LOGGER.debug('rooms: %s', [str(room) for number, room in map_data.rooms.items()]) + if map_data.image is not None and not map_data.image.is_empty: + MapDataParserIjai.draw_elements(colors, drawables, sizes, map_data, image_config) + if len(map_data.rooms) > 0 and map_data.vacuum_position is not None: + map_data.vacuum_room = MapDataParserIjai.get_current_vacuum_room(buf, map_data.vacuum_position) + if map_data.vacuum_room is not None: + map_data.vacuum_room_name = map_data.rooms[map_data.vacuum_room].name + _LOGGER.debug('current vacuum room: %s', map_data.vacuum_room) + ImageHandlerIjai.rotate(map_data.image) + ImageHandlerIjai.draw_texts(map_data.image, texts) + return map_data + + @staticmethod + def map_to_image(p: Point) -> Point: + return Point(p.x * 20 + 400, p.y * 20 + 400) + + @staticmethod + def image_to_map(x: float) -> float: + return (x - 400) / 20 + + @staticmethod + def get_current_vacuum_room(buf: ParsingBuffer, vacuum_position: Point) -> Optional[int]: + vacuum_position_on_image = MapDataParserIjai.map_to_image(vacuum_position) + pixel_type = buf.get_at_image(int(vacuum_position_on_image.y) * 800 + int(vacuum_position_on_image.x)) + if ImageHandlerIjai.MAP_ROOM_MIN <= pixel_type <= ImageHandlerIjai.MAP_ROOM_MAX: + return pixel_type + elif ImageHandlerIjai.MAP_SELECTED_ROOM_MIN <= pixel_type <= ImageHandlerIjai.MAP_SELECTED_ROOM_MAX: + return pixel_type - ImageHandlerIjai.MAP_SELECTED_ROOM_MIN + ImageHandlerIjai.MAP_ROOM_MIN + return None + + @staticmethod + def parse_image(buf: ParsingBuffer, colors: Colors, image_config: ImageConfig, draw_cleaned_area: bool) \ + -> Tuple[ImageData, Dict[int, Room], Set[int]]: + buf.skip('unknown1', 0xA) + + image_top = 0 + image_left = 0 + image_width = buf.get_uint16_remove_parity('image_width') + buf.skip('unknown2', 1) + image_height = buf.get_uint16_remove_parity('image_height') + buf.skip('unknown3', 0x21) + + image_size = image_height * image_width + _LOGGER.debug('width: %d, height: %d', image_width, image_height) + if image_width \ + - image_width * (image_config[CONF_TRIM][CONF_LEFT] + image_config[CONF_TRIM][CONF_RIGHT]) / 100 \ + < MINIMAL_IMAGE_WIDTH: + image_config[CONF_TRIM][CONF_LEFT] = 0 + image_config[CONF_TRIM][CONF_RIGHT] = 0 + if image_height \ + - image_height * (image_config[CONF_TRIM][CONF_TOP] + image_config[CONF_TRIM][CONF_BOTTOM]) / 100 \ + < MINIMAL_IMAGE_HEIGHT: + image_config[CONF_TRIM][CONF_TOP] = 0 + image_config[CONF_TRIM][CONF_BOTTOM] = 0 + buf.mark_as_image_beginning() + image, rooms_raw, cleaned_areas, cleaned_areas_layer = ImageHandlerIjai.parse(buf, image_width, image_height, + colors, image_config, + draw_cleaned_area) + _LOGGER.debug('img: number of rooms: %d, numbers: %s', len(rooms_raw), rooms_raw.keys()) + rooms = {} + for number, room in rooms_raw.items(): + rooms[number] = Room(number, MapDataParserIjai.image_to_map(room[0] + image_left), + MapDataParserIjai.image_to_map(room[1] + image_top), + MapDataParserIjai.image_to_map(room[2] + image_left), + MapDataParserIjai.image_to_map(room[3] + image_top)) + return ImageData(image_size, image_top, image_left, image_height, image_width, image_config, + image, MapDataParserIjai.map_to_image, + additional_layers={DRAWABLE_CLEANED_AREA: cleaned_areas_layer}), rooms, cleaned_areas + + @staticmethod + def parse_history(buf: ParsingBuffer) -> Path: + path_points = [] + buf.skip('unknown1', 4) + history_count = buf.get_uint32('history_count') + for _ in range(history_count): + mode = buf.get_uint8('mode') # 0: taxi, 1: working + path_points.append(MapDataParserIjai.parse_position(buf, 'path')) + return Path(len(path_points), 1, 0, [path_points]) + + @staticmethod + def parse_restricted_areas(buf: ParsingBuffer) -> Tuple[List[Wall], List[Area]]: + walls = [] + areas = [] + buf.skip('unknown1', 4) + area_count = buf.get_uint32('area_count') + for _ in range(area_count): + buf.skip('restricted.unknown1', 12) + p1 = MapDataParserIjai.parse_position(buf, 'p1') + p2 = MapDataParserIjai.parse_position(buf, 'p2') + p3 = MapDataParserIjai.parse_position(buf, 'p3') + p4 = MapDataParserIjai.parse_position(buf, 'p4') + buf.skip('restricted.unknown2', 48) + _LOGGER.debug('restricted: %s %s %s %s', p1, p2, p3, p4) + if p1 == p2 and p3 == p4: + walls.append(Wall(p1.x, p1.y, p3.x, p3.y)) + else: + areas.append(Area(p1.x, p1.y, p2.x, p2.y, p3.x, p3.y, p4.x, p4.y)) + return walls, areas + + @staticmethod + def parse_cleaning_areas(buf: ParsingBuffer) -> List[Zone]: + buf.skip('unknown1', 4) + area_count = buf.get_uint32('area_count') + zones = [] + for _ in range(area_count): + buf.skip('area.unknown1', 12) + p1 = MapDataParserIjai.parse_position(buf, 'p1') + p2 = MapDataParserIjai.parse_position(buf, 'p2') + p3 = MapDataParserIjai.parse_position(buf, 'p3') + p4 = MapDataParserIjai.parse_position(buf, 'p4') + buf.skip('area.unknown2', 48) + zones.append(Zone(p1.x, p1.y, p3.x, p3.y)) + return zones + + @staticmethod + def parse_rooms(buf: ParsingBuffer, map_data_rooms: Dict[int, Room]): + map_name = buf.get_string_len8('map_name') + map_arg = buf.get_uint32('map_arg') + _LOGGER.debug('map#%d: %s', map_arg, map_name) + while map_arg > 1: + map_name = buf.get_string_len8('map_name') + map_arg = buf.get_uint32('map_arg') + _LOGGER.debug('map#%d: %s', map_arg, map_name) + room_count = buf.get_uint32('room_count') + for _ in range(room_count): + room_id = buf.get_uint8('room.id') + room_name = buf.get_string_len8('room.name') + if map_data_rooms is not None and room_id in map_data_rooms: + map_data_rooms[room_id].name = room_name + buf.skip('room.unknown1', 1) + room_text_pos = MapDataParserIjai.parse_position(buf, 'room.text_pos') + _LOGGER.debug('room#%d: %s %s', room_id, room_name, room_text_pos) + buf.skip('unknown1', 6) + + @staticmethod + def parse_room_outlines(buf: ParsingBuffer): + buf.skip('unknown1', 51) + room_count = buf.get_uint32('room_count') + for _ in range(room_count): + room_id = buf.get_uint32('room.id') + segment_count = buf.get_uint32('room.segment_count') + for _ in range(segment_count): + buf.skip('unknown2', 5) + _LOGGER.debug('room#%d: segment_count: %d', room_id, segment_count) + + @staticmethod + def parse_section(buf: ParsingBuffer, name: str, map_id: int): + buf.set_name(name) + magic = buf.get_uint32('magic') + if magic != map_id: + raise ValueError( + f"error parsing section {name} at offset {buf._offs - 4:#x}: magic check failed. " + + f"Magic: {magic:#x}, Map ID: {map_id:#x}") + + @staticmethod + def parse_position(buf: ParsingBuffer, name: str, with_angle: bool = False) -> Optional[Point]: + x = buf.get_float32(name + '.x') + y = buf.get_float32(name + '.y') + if x == MapDataParserIjai.POSITION_UNKNOWN or y == MapDataParserIjai.POSITION_UNKNOWN: + return None + a = None + if with_angle: + a = buf.get_float32(name + '.a') * 180 / math.pi + return Point(x, y, a) + + @staticmethod + def parse_unknown_section(buf: ParsingBuffer) -> bool: + n = buf._data[buf._offs:].find(buf._data[4:8]) + if n >= 0: + buf._offs += n + buf._length -= n + return True + else: + buf._offs += buf._length + buf._length = 0 + return False diff --git a/custom_components/xiaomi_cloud_map_extractor/ijai/parsing_buffer.py b/custom_components/xiaomi_cloud_map_extractor/ijai/parsing_buffer.py new file mode 100644 index 0000000..738547b --- /dev/null +++ b/custom_components/xiaomi_cloud_map_extractor/ijai/parsing_buffer.py @@ -0,0 +1,92 @@ +import logging + +from struct import unpack_from + +_LOGGER = logging.getLogger(__name__) + + +class ParsingBuffer: + def __init__(self, name: str, data: bytes, start_offs: int, length: int): + self._name = name + self._data = data + self._offs = start_offs + self._length = length + self._image_beginning = None + + def set_name(self, name: str): + self._name = name + _LOGGER.debug('SECTION %s: offset 0x%x', self._name, self._offs) + + def mark_as_image_beginning(self): + self._image_beginning = self._offs + + def get_at_image(self, offset) -> int: + return self._data[self._image_beginning + offset - 1] + + def skip(self, field: str, n: int): + if n == 0: + return + if self._length < n: + raise ValueError(f"error parsing {self._name}.{field} at offset {self._offs:#x}: buffer underrun") + self._offs += n + self._length -= n + + def get_uint8(self, field: str) -> int: + if self._length < 1: + raise ValueError(f"error parsing {self._name}.{field} at offset {self._offs:#x}: buffer underrun") + self._offs += 1 + self._length -= 1 + return self._data[self._offs - 1] + + def get_uint16(self, field: str) -> int: + if self._length < 2: + raise ValueError(f"error parsing {self._name}.{field} at offset {self._offs:#x}: buffer underrun") + self._offs += 2 + self._length -= 2 + return unpack_from(' int: + if self._length < 2: + raise ValueError(f"error parsing {self._name}.{field} at offset {self._offs:#x}: buffer underrun") + lo = self._data[self._offs] + hi = self._data[self._offs + 1] + self._offs += 2 + self._length -= 2 + return ((hi^1) << 7) ^ lo + + def get_uint32(self, field: str) -> int: + if self._length < 4: + raise ValueError(f"error parsing {self._name}.{field} at offset {self._offs:#x}: buffer underrun") + self._offs += 4 + self._length -= 4 + return unpack_from(' float: + if self._length < 4: + raise ValueError(f"error parsing {self._name}.{field} at offset {self._offs:#x}: buffer underrun") + self._offs += 4 + self._length -= 4 + return unpack_from(' bytes: + if self._length < count: + raise ValueError(f"error parsing {self._name}.{field} at offset {self._offs:#x}: buffer underrun") + self._offs += count + self._length -= count + return self._data[self._offs - count:self._offs].decode('utf-8') + + def get_string_len8(self, field: str) -> str: + n = self.get_uint8(field + '.len') + return get_bytes(n, field) + + + def peek_uint32(self, field: str) -> int: + if self._length < 4: + raise ValueError(f"error parsing {self._name}.{field} at offset {self._offs:#x}: buffer underrun") + return unpack_from(' MapData: + + unzipped = zlib.decompress(raw_map) + + return MapDataParserIjai.parse(unzipped, colors, drawables, texts, sizes, image_config) + + def get_map_archive_extension(self) -> str: + return "zlib" + + def decrypt_map(self, data:bytes, wifi_info_sn:str, user_id:str, device_id:str, model:str, mac:str): + return unGzipCommon(data=data, \ + wifi_info_sn=wifi_info_sn, \ + owner_id=str(user_id), \ + device_id=device_id, \ + model=model, \ + device_mac=mac); \ No newline at end of file diff --git a/custom_components/xiaomi_cloud_map_extractor/viomi/map_data_parser.py b/custom_components/xiaomi_cloud_map_extractor/viomi/map_data_parser.py index 49a62c0..076f546 100644 --- a/custom_components/xiaomi_cloud_map_extractor/viomi/map_data_parser.py +++ b/custom_components/xiaomi_cloud_map_extractor/viomi/map_data_parser.py @@ -94,7 +94,7 @@ def parse(raw: bytes, colors: Colors, drawables: Drawables, texts: Texts, sizes: if map_data.rooms is not None: _LOGGER.debug('rooms: %s', [str(room) for number, room in map_data.rooms.items()]) - if not map_data.image.is_empty: + if map_data.image is not None and not map_data.image.is_empty: MapDataParserViomi.draw_elements(colors, drawables, sizes, map_data, image_config) if len(map_data.rooms) > 0 and map_data.vacuum_position is not None: map_data.vacuum_room = MapDataParserViomi.get_current_vacuum_room(buf, map_data.vacuum_position) diff --git a/custom_components/xiaomi_cloud_map_extractor/viomi/vacuum.py b/custom_components/xiaomi_cloud_map_extractor/viomi/vacuum.py index 619aeda..432d7e4 100644 --- a/custom_components/xiaomi_cloud_map_extractor/viomi/vacuum.py +++ b/custom_components/xiaomi_cloud_map_extractor/viomi/vacuum.py @@ -6,11 +6,10 @@ from custom_components.xiaomi_cloud_map_extractor.types import Colors, Drawables, ImageConfig, Sizes, Texts from custom_components.xiaomi_cloud_map_extractor.viomi.map_data_parser import MapDataParserViomi - class ViomiVacuum(XiaomiCloudVacuumV2): - def __init__(self, connector: XiaomiCloudConnector, country: str, user_id: str, device_id: str, model: str): - super().__init__(connector, country, user_id, device_id, model) + def __init__(self, connector: XiaomiCloudConnector, country: str, user_id: str, device_id: str, model: str, mac:str): + super().__init__(connector, country, user_id, device_id, model, mac) def decode_map(self, raw_map: bytes, @@ -19,7 +18,9 @@ def decode_map(self, texts: Texts, sizes: Sizes, image_config: ImageConfig) -> MapData: + unzipped = zlib.decompress(raw_map) + return MapDataParserViomi.parse(unzipped, colors, drawables, texts, sizes, image_config) def get_map_archive_extension(self) -> str: