Skip to content

Commit

Permalink
Added support for ijai.vacuum.xxx map image
Browse files Browse the repository at this point in the history
  • Loading branch information
Alexander "Tarh committed Apr 13, 2024
1 parent 2475e84 commit 4d10e4d
Show file tree
Hide file tree
Showing 13 changed files with 606 additions and 20 deletions.
11 changes: 7 additions & 4 deletions custom_components/xiaomi_cloud_map_extractor/camera.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from custom_components.xiaomi_cloud_map_extractor.roidmi.vacuum import RoidmiVacuum
from custom_components.xiaomi_cloud_map_extractor.unsupported.vacuum import UnsupportedVacuum
from custom_components.xiaomi_cloud_map_extractor.viomi.vacuum import ViomiVacuum
from custom_components.xiaomi_cloud_map_extractor.ijai.vacuum import IjaiVacuum
from custom_components.xiaomi_cloud_map_extractor.xiaomi.vacuum import XiaomiVacuum

_LOGGER = logging.getLogger(__name__)
Expand Down Expand Up @@ -299,11 +300,11 @@ def _handle_login(self):

def _handle_device(self):
_LOGGER.debug("Retrieving device info, country: %s", self._country)
country, user_id, device_id, model = self._connector.get_device_details(self._vacuum.token, self._country)
country, user_id, device_id, model, mac = self._connector.get_device_details(self._vacuum.token, self._country)
if model is not None:
self._country = country
_LOGGER.debug("Retrieved device model: %s", model)
self._device = self._create_device(user_id, device_id, model)
self._device = self._create_device(user_id, device_id, model, mac)
_LOGGER.debug("Created device, used api: %s", self._used_api)
else:
_LOGGER.error("Failed to retrieve model")
Expand Down Expand Up @@ -363,12 +364,14 @@ def _set_map_data(self, map_data: MapData):
self._map_data = map_data
self._store_image()

def _create_device(self, user_id: str, device_id: str, model: str) -> XiaomiCloudVacuum:
def _create_device(self, user_id: str, device_id: str, model: str, mac: str) -> XiaomiCloudVacuum:
self._used_api = self._detect_api(model)
if self._used_api == CONF_AVAILABLE_API_XIAOMI:
return XiaomiVacuum(self._connector, self._country, user_id, device_id, model)
if self._used_api == CONF_AVAILABLE_API_VIOMI:
return ViomiVacuum(self._connector, self._country, user_id, device_id, model)
return ViomiVacuum(self._connector, self._country, user_id, device_id, model, mac)
if self._used_api == CONF_AVAILABLE_API_IJAI:
return IjaiVacuum(self._connector, self._country, user_id, device_id, model, mac)
if self._used_api == CONF_AVAILABLE_API_ROIDMI:
return RoidmiVacuum(self._connector, self._country, user_id, device_id, model)
if self._used_api == CONF_AVAILABLE_API_DREAME:
Expand Down
10 changes: 8 additions & 2 deletions custom_components/xiaomi_cloud_map_extractor/common/vacuum.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,16 @@

class XiaomiCloudVacuum:

def __init__(self, connector: XiaomiCloudConnector, country: str, user_id: str, device_id: str, model: str):
def __init__(self, connector: XiaomiCloudConnector, country: str, user_id: str, device_id: str, model: str, mac: str):
self._connector = connector
self._country = country
self._user_id = user_id
self._device_id = device_id
self.model = model
self.mac = mac

def decrypt_map(self, data:bytes, wifi_info_sn:str, user_id:str, device_id:str, model:str, mac:str):
return data

def get_map(self,
map_name: str,
Expand All @@ -33,7 +37,9 @@ def get_map(self,
raw_map_file.write(response)
raw_map_file.close()
map_stored = True
map_data = self.decode_map(response, colors, drawables, texts, sizes, image_config)

map_data_decrypted = self.decrypt_map(response, "######SD##########", self._user_id, self._device_id, self.model, self.mac)
map_data = self.decode_map(map_data_decrypted, colors, drawables, texts, sizes, image_config)
if map_data is None:
return None, map_stored
map_data.map_name = map_name
Expand Down
15 changes: 9 additions & 6 deletions custom_components/xiaomi_cloud_map_extractor/common/vacuum_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,23 @@

from custom_components.xiaomi_cloud_map_extractor.common.vacuum import XiaomiCloudVacuum
from custom_components.xiaomi_cloud_map_extractor.common.xiaomi_cloud_connector import XiaomiCloudConnector

import logging

class XiaomiCloudVacuumV2(XiaomiCloudVacuum):

def __init__(self, connector: XiaomiCloudConnector, country: str, user_id: str, device_id: str, model: str):
super().__init__(connector, country, user_id, device_id, model)

_LOGGER = logging.getLogger(__name__)

def __init__(self, connector: XiaomiCloudConnector, country: str, user_id: str, device_id: str, model: str, mac: str):
super().__init__(connector, country, user_id, device_id, model, mac)

def get_map_url(self, map_name: str) -> Optional[str]:
url = self._connector.get_api_url(self._country) + '/v2/home/get_interim_file_url'
url = self._connector.get_api_url(self._country) + '/v2/home/get_interim_file_url_pro'
params = {
"data": f'{{"obj_name":"{self._user_id}/{self._device_id}/{map_name}"}}'
}
api_response = self._connector.execute_api_call_encrypted(url, params)
if api_response is None or "result" not in api_response or "url" not in api_response["result"]:
if api_response is None or ("result" not in api_response) or (api_response["result"] is None) or ("url" not in api_response["result"]):
self._LOGGER.debug(f"API returned {api_response['code']}" + "(" + api_response["message"] + ")")
return None
return api_response["result"]["url"]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ def get_raw_map_data(self, map_url) -> Optional[bytes]:
return None

def get_device_details(self, token: str,
country: str) -> Tuple[Optional[str], Optional[str], Optional[str], Optional[str]]:
country: str) -> Tuple[Optional[str], Optional[str], Optional[str], Optional[str], Optional[str]]:
countries_to_check = CONF_AVAILABLE_COUNTRIES
if country is not None:
countries_to_check = [country]
Expand All @@ -146,8 +146,9 @@ def get_device_details(self, token: str,
user_id = found[0]["uid"]
device_id = found[0]["did"]
model = found[0]["model"]
return country, user_id, device_id, model
return None, None, None, None
mac = found[0]["mac"]
return country, user_id, device_id, model, mac
return None, None, None, None, None

def get_devices(self, country: str) -> Any:
url = self.get_api_url(country) + "/home/device_list"
Expand All @@ -168,7 +169,7 @@ def execute_api_call_encrypted(self, url: str, params: Dict[str, str]) -> Any:
"userId": str(self._userId),
"yetAnotherServiceToken": str(self._serviceToken),
"serviceToken": str(self._serviceToken),
"locale": "en_GB",
"locale": "en_US",
"timezone": "GMT+02:00",
"is_daylight": "1",
"dst_offset": "3600000",
Expand Down
2 changes: 2 additions & 0 deletions custom_components/xiaomi_cloud_map_extractor/const.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
CONF_AVAILABLE_API_DREAME = "dreame"
CONF_AVAILABLE_API_ROIDMI = "roidmi"
CONF_AVAILABLE_API_VIOMI = "viomi"
CONF_AVAILABLE_API_IJAI = "ijai"
CONF_AVAILABLE_API_XIAOMI = "xiaomi"
CONF_AVAILABLE_COUNTRIES = ["cn", "de", "us", "ru", "tw", "sg", "in", "i2"]
CONF_BOTTOM = "bottom"
Expand Down Expand Up @@ -204,6 +205,7 @@
CONF_AVAILABLE_API_DREAME: ["dreame.vacuum."],
CONF_AVAILABLE_API_ROIDMI: ["roidmi.vacuum.", "zhimi.vacuum."],
CONF_AVAILABLE_API_VIOMI: ["viomi.vacuum."],
CONF_AVAILABLE_API_IJAI: ["ijai.vacuum."],
CONF_AVAILABLE_API_XIAOMI: ["roborock.vacuum", "rockrobo.vacuum"]
}

Expand Down
Empty file.
66 changes: 66 additions & 0 deletions custom_components/xiaomi_cloud_map_extractor/ijai/aes_decryptor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
from Crypto.Cipher import AES
from Crypto.Hash import MD5
from Crypto.Util.Padding import pad, unpad
import base64
import logging

_LOGGER = logging.getLogger(__name__)

isEncryptKeyTypeHex = True

def aesEncrypted(data, key: str):
cipher = AES.new(key.encode("utf-8"), AES.MODE_ECB)

encryptedData = cipher.encrypt(
pad(data.encode("utf-8"), AES.block_size, 'pkcs7'))
encryptedBase64Str = base64.b64encode(encryptedData).decode("utf-8")

return encryptedBase64Str

def aesDecrypted(data, key: str):
parsedKey = key.encode("utf-8")
if isEncryptKeyTypeHex:
parsedKey = bytes.fromhex(key)

cipher = AES.new(parsedKey, AES.MODE_ECB)

decryptedBytes = cipher.decrypt(base64.b64decode(data))

decryptedData = unpad(decryptedBytes, AES.block_size, 'pkcs7')

return bytes.fromhex(decryptedData.decode("utf-8"))


def md5key(string: str, model: str, device_mac: str):
pjstr = "".join(device_mac.lower().split(":"))

tempModel = model.split('.')[-1]

if len(tempModel) == 2:
tempModel = "00" + tempModel
elif len(tempModel) == 3:
tempModel = "0" + tempModel

tempKey = pjstr + tempModel
aeskey = aesEncrypted(string, tempKey)
#aeskey = string

temp = MD5.new(aeskey.encode('utf-8')).hexdigest()
if isEncryptKeyTypeHex:
return temp
else:
return temp[8:-8].upper()


def genMD5key(wifi_info_sn: str, owner_id: str, device_id: str, model: str, device_mac: str):
arr = [wifi_info_sn, owner_id, device_id]
tempString = '+'.join(arr)
return md5key(tempString, model, device_mac)


def unGzipCommon(data: str, wifi_info_sn: str, owner_id: str, device_id: str, model: str, device_mac: str) -> bytes:
#base64map = base64.b64encode(data)
# with open("0.encrypted.map", 'wb') as file:
# file.write(data)
return aesDecrypted(data, genMD5key(wifi_info_sn, owner_id, device_id, model, device_mac))

95 changes: 95 additions & 0 deletions custom_components/xiaomi_cloud_map_extractor/ijai/image_handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
import logging
from typing import Dict, Optional, Set, Tuple

from PIL import Image
from PIL.Image import Image as ImageType

from custom_components.xiaomi_cloud_map_extractor.common.image_handler import ImageHandler
from custom_components.xiaomi_cloud_map_extractor.const import *
from custom_components.xiaomi_cloud_map_extractor.types import Colors, ImageConfig
from custom_components.xiaomi_cloud_map_extractor.ijai.parsing_buffer import ParsingBuffer

_LOGGER = logging.getLogger(__name__)


class ImageHandlerIjai(ImageHandler):
MAP_OUTSIDE = 0x00
MAP_WALL = 0xff
MAP_SCAN = 0x01
MAP_NEW_DISCOVERED_AREA = 0x02
MAP_ROOM_MIN = 10
MAP_ROOM_MAX = 59
MAP_SELECTED_ROOM_MIN = 60
MAP_SELECTED_ROOM_MAX = 109
@staticmethod
def parse(buf: ParsingBuffer, width: int, height: int, colors: Colors, image_config: ImageConfig,
draw_cleaned_area: bool) \
-> Tuple[ImageType, Dict[int, Tuple[int, int, int, int]], Set[int], Optional[ImageType]]:
palette = {\
ImageHandlerIjai.MAP_OUTSIDE: ImageHandler.__get_color__(COLOR_MAP_OUTSIDE, colors),\
ImageHandlerIjai.MAP_WALL: ImageHandler.__get_color__(COLOR_MAP_WALL_V2, colors),\
ImageHandlerIjai.MAP_SCAN: ImageHandler.__get_color__(COLOR_SCAN, colors),\
ImageHandlerIjai.MAP_NEW_DISCOVERED_AREA: ImageHandler.__get_color__(COLOR_NEW_DISCOVERED_AREA, colors)}
rooms = {}
cleaned_areas = set()
scale = image_config[CONF_SCALE]
trim_left = int(image_config[CONF_TRIM][CONF_LEFT] * width / 100)
trim_right = int(image_config[CONF_TRIM][CONF_RIGHT] * width / 100)
trim_top = int(image_config[CONF_TRIM][CONF_TOP] * height / 100)
trim_bottom = int(image_config[CONF_TRIM][CONF_BOTTOM] * height / 100)
trimmed_height = height - trim_top - trim_bottom
trimmed_width = width - trim_left - trim_right
if trimmed_width == 0 or trimmed_height == 0:
return ImageHandler.create_empty_map_image(colors), rooms, cleaned_areas, None
image = Image.new('RGBA', (trimmed_width, trimmed_height))
pixels = image.load()
cleaned_areas_layer = None
cleaned_areas_pixels = None
if draw_cleaned_area:
cleaned_areas_layer = Image.new('RGBA', (trimmed_width, trimmed_height))
cleaned_areas_pixels = cleaned_areas_layer.load()
_LOGGER.debug(f"trim_bottom = {trim_bottom}, trim_top = {trim_top}, trim_left = {trim_left}, trim_right = {trim_right}")
buf.skip('trim_bottom', trim_bottom * width)
unknown_pixels = set()
_LOGGER.debug(f"buffer: [{buf._offs:#x}] = {buf.peek_uint32("some_int32")}")
for img_y in range(trimmed_height):
buf.skip('trim_left', trim_left)
for img_x in range(trimmed_width):
pixel_type = buf.get_uint8('pixel')
x = img_x
y = trimmed_height - 1 - img_y
if pixel_type in palette.keys():
pixels[x, y] = palette[pixel_type]
elif ImageHandlerIjai.MAP_ROOM_MIN <= pixel_type <= ImageHandlerIjai.MAP_SELECTED_ROOM_MAX:
room_x = img_x + trim_left
room_y = img_y + trim_bottom
if pixel_type < ImageHandlerIjai.MAP_SELECTED_ROOM_MIN:
room_number = pixel_type
else:
room_number = pixel_type - ImageHandlerIjai.MAP_SELECTED_ROOM_MIN + ImageHandlerIjai.MAP_ROOM_MIN
cleaned_areas.add(room_number)
if draw_cleaned_area:
cleaned_areas_pixels[x, y] = ImageHandler.__get_color__(COLOR_CLEANED_AREA, colors)
if room_number not in rooms:
rooms[room_number] = (room_x, room_y, room_x, room_y)
else:
rooms[room_number] = (min(rooms[room_number][0], room_x),
min(rooms[room_number][1], room_y),
max(rooms[room_number][2], room_x),
max(rooms[room_number][3], room_y))
default = ImageHandler.ROOM_COLORS[room_number % len(ImageHandler.ROOM_COLORS)]
pixels[x, y] = ImageHandler.__get_color__(f"{COLOR_ROOM_PREFIX}{room_number}", colors, default)
else:
pixels[x, y] = ImageHandler.__get_color__(COLOR_UNKNOWN, colors)
unknown_pixels.add(pixel_type)
_LOGGER.debug(f"unknown pixel [{x},{y}] = {pixel_type}")
buf.skip('trim_right', trim_right)
buf.skip('trim_top', trim_top * width)
if image_config["scale"] != 1 and trimmed_width != 0 and trimmed_height != 0:
image = image.resize((int(trimmed_width * scale), int(trimmed_height * scale)), resample=Image.NEAREST)
if draw_cleaned_area:
cleaned_areas_layer = cleaned_areas_layer.resize(
(int(trimmed_width * scale), int(trimmed_height * scale)), resample=Image.NEAREST)
if len(unknown_pixels) > 0:
_LOGGER.warning('unknown pixel_types: %s', unknown_pixels)
return image, rooms, cleaned_areas, cleaned_areas_layer
Loading

0 comments on commit 4d10e4d

Please sign in to comment.