diff --git a/README.md b/README.md index 4ac68bc..f09a821 100644 --- a/README.md +++ b/README.md @@ -30,10 +30,12 @@ pip install valetudo_map_parser ``` ### Requirements: -- Python 3.12 or higher +- Python 3.13 or higher - Dependencies: - - Pillow (PIL) for image processing - - NumPy for array operations + - Pillow (>=10.3.0) for image processing + - NumPy (>=1.26.4) for array operations + - SciPy (>=1.12.0) for scientific computing + - mvcrender (==0.0.7) for high-performance rendering ### Usage: The library is configured using a dictionary format. See our [sample code](https://github.com/sca075/Python-package-valetudo-map-parser/blob/main/tests/test.py) for implementation examples. @@ -46,7 +48,7 @@ Key functionalities: - Supports asynchronous operations ### Development Status: -Current version: 0.1.9.b41 +Current version: 0.2.0 - Full functionality available in versions >= 0.1.9 - Actively maintained and enhanced - Uses Poetry for dependency management diff --git a/SCR/valetudo_map_parser/__init__.py b/SCR/valetudo_map_parser/__init__.py index 7fb2898..68ba02c 100644 --- a/SCR/valetudo_map_parser/__init__.py +++ b/SCR/valetudo_map_parser/__init__.py @@ -1,5 +1,5 @@ """Valetudo map parser. -Version: 0.1.14""" +Version: 0.2.0""" from pathlib import Path diff --git a/SCR/valetudo_map_parser/config/colors.py b/SCR/valetudo_map_parser/config/colors.py index f37fd46..085964c 100644 --- a/SCR/valetudo_map_parser/config/colors.py +++ b/SCR/valetudo_map_parser/config/colors.py @@ -2,7 +2,7 @@ from __future__ import annotations -from enum import StrEnum +from enum import IntEnum, StrEnum from typing import Dict, List, Tuple import numpy as np @@ -134,6 +134,7 @@ color_material_tile, # [11] ] +# Might be not used in the future we may remove it. color_array = [ base_colors_array[0], # color_wall base_colors_array[6], # color_no_go @@ -150,6 +151,26 @@ ] +class ColorIndex(IntEnum): + """ + Named indices for user_colors array. + This prevents hardcoded indices and makes the code maintainable. + The order must match the order in set_initial_colours() base_color_keys. + """ + WALL = 0 + ZONE_CLEAN = 1 + ROBOT = 2 + BACKGROUND = 3 + MOVE = 4 + CHARGER = 5 + CARPET = 6 + NO_GO = 7 + GO_TO = 8 + TEXT = 9 + MATERIAL_WOOD = 10 + MATERIAL_TILE = 11 + + class SupportedColor(StrEnum): """Color of a supported map element.""" @@ -383,17 +404,30 @@ def set_initial_colours(self, device_info: dict) -> None: def initialize_user_colors(self, device_info: dict) -> List[Color]: """ Initialize user-defined colors with defaults as fallback. + The order MUST match ColorIndex enum and set_initial_colours() base_color_keys. :param device_info: Dictionary containing user-defined colors. :return: List of RGBA colors for map elements. """ + # Define the canonical order matching ColorIndex enum + base_color_keys = [ + (COLOR_WALL, color_wall, ALPHA_WALL), + (COLOR_ZONE_CLEAN, color_zone_clean, ALPHA_ZONE_CLEAN), + (COLOR_ROBOT, color_robot, ALPHA_ROBOT), + (COLOR_BACKGROUND, color_background, ALPHA_BACKGROUND), + (COLOR_MOVE, color_move, ALPHA_MOVE), + (COLOR_CHARGER, color_charger, ALPHA_CHARGER), + (COLOR_CARPET, color_carpet, ALPHA_CARPET), + (COLOR_NO_GO, color_no_go, ALPHA_NO_GO), + (COLOR_GO_TO, color_go_to, ALPHA_GO_TO), + (COLOR_TEXT, color_text, ALPHA_TEXT), + (COLOR_MATERIAL_WOOD, color_material_wood, ALPHA_MATERIAL_WOOD), + (COLOR_MATERIAL_TILE, color_material_tile, ALPHA_MATERIAL_TILE), + ] + colors = [] - for key in SupportedColor: - if key.startswith(SupportedColor.COLOR_ROOM_PREFIX): - continue # Skip room colors for user_colors - rgb = device_info.get(key, DefaultColors.COLORS_RGB.get(key)) - alpha = device_info.get( - f"alpha_{key}", DefaultColors.DEFAULT_ALPHA.get(f"alpha_{key}") - ) + for color_key, default_color, alpha_key in base_color_keys: + rgb = device_info.get(color_key, default_color[:3] if default_color else (0, 0, 0)) + alpha = device_info.get(alpha_key, 255.0) colors.append(self.add_alpha_to_color(rgb, alpha)) return colors diff --git a/SCR/valetudo_map_parser/config/drawable_elements.py b/SCR/valetudo_map_parser/config/drawable_elements.py index dd7cea9..7484e5f 100644 --- a/SCR/valetudo_map_parser/config/drawable_elements.py +++ b/SCR/valetudo_map_parser/config/drawable_elements.py @@ -148,27 +148,17 @@ def enable_element(self, element_code: DrawableElement) -> None: """Enable drawing of a specific element.""" if element_code in self._enabled_elements: self._enabled_elements[element_code] = True - LOGGER.info( + LOGGER.debug( "Enabled element %s (%s)", element_code.name, element_code.value ) - LOGGER.info( - "Element %s is now enabled: %s", - element_code.name, - self._enabled_elements[element_code], - ) def disable_element(self, element_code: DrawableElement) -> None: """Disable drawing of a specific element.""" if element_code in self._enabled_elements: self._enabled_elements[element_code] = False - LOGGER.info( + LOGGER.debug( "Disabled element %s (%s)", element_code.name, element_code.value ) - LOGGER.info( - "Element %s is now enabled: %s", - element_code.name, - self._enabled_elements[element_code], - ) def set_elements(self, element_codes: List[DrawableElement]) -> None: """Enable only the specified elements, disable all others.""" @@ -317,9 +307,6 @@ def update_from_device_info(self, device_info: dict) -> None: for disable_key, element in element_disable_mapping.items(): if device_info.get(disable_key, False): self.disable_element(element) - LOGGER.info( - "Disabled %s element from device_info setting", element.name - ) # Process room disable flags (1-15) for room_id in range(1, 16): @@ -327,6 +314,3 @@ def update_from_device_info(self, device_info: dict) -> None: if device_info.get(disable_key, False): room_element = getattr(DrawableElement, f"ROOM_{room_id}") self.disable_element(room_element) - LOGGER.info( - "Disabled ROOM_%d element from device_info setting", room_id - ) diff --git a/SCR/valetudo_map_parser/config/material.py b/SCR/valetudo_map_parser/config/material.py index 1f10f7d..fe13869 100644 --- a/SCR/valetudo_map_parser/config/material.py +++ b/SCR/valetudo_map_parser/config/material.py @@ -206,18 +206,3 @@ def tile_block(tile: NumpyArray, r0: int, r1: int, c0: int, c1: int) -> NumpyArr rows = (np.arange(r0, r1) % th).astype(np.intp, copy=False) cols = (np.arange(c0, c1) % tw).astype(np.intp, copy=False) return tile[rows[:, None], cols[None, :], :] - - @staticmethod - def apply_overlay_on_region( - image: NumpyArray, - tile: NumpyArray, - r0: int, - r1: int, - c0: int, - c1: int, - ) -> None: - region = image[r0:r1, c0:c1] - overlay = MaterialTileRenderer.tile_block(tile, r0, r1, c0, c1) - mask = overlay[..., 3] > 0 - if np.any(mask): - region[mask] = overlay[mask] diff --git a/SCR/valetudo_map_parser/config/shared.py b/SCR/valetudo_map_parser/config/shared.py index aa62f07..a543f6b 100755 --- a/SCR/valetudo_map_parser/config/shared.py +++ b/SCR/valetudo_map_parser/config/shared.py @@ -124,6 +124,7 @@ def __init__(self, file_name): self.current_floor: str = "floor_0" self.skip_room_ids: List[str] = [] self.device_info = None + self.mop_mode: bool = False self._battery_state = None def vacuum_bat_charged(self) -> bool: diff --git a/SCR/valetudo_map_parser/config/utils.py b/SCR/valetudo_map_parser/config/utils.py index 2830a39..95bfe1b 100644 --- a/SCR/valetudo_map_parser/config/utils.py +++ b/SCR/valetudo_map_parser/config/utils.py @@ -13,6 +13,7 @@ from ..map_data import HyperMapData from .async_utils import AsyncNumPy +from .colors import ColorIndex from .drawable import Drawable from .drawable_elements import DrawingConfig from .status_text.status_text import StatusText @@ -205,7 +206,7 @@ async def _add_status_text(self, new_image: PilPNG): Drawable.status_text( new_image, img_text[1], - self.shared.user_colors[8], + self.shared.user_colors[ColorIndex.TEXT], img_text[0], self.shared.vacuum_status_font, self.shared.vacuum_status_position, diff --git a/SCR/valetudo_map_parser/const.py b/SCR/valetudo_map_parser/const.py index a6514d1..66a0f77 100644 --- a/SCR/valetudo_map_parser/const.py +++ b/SCR/valetudo_map_parser/const.py @@ -44,6 +44,8 @@ "no_go", "go_to", "text", + "material_wood", + "material_tile", ] SENSOR_NO_DATA = { diff --git a/SCR/valetudo_map_parser/hypfer_draw.py b/SCR/valetudo_map_parser/hypfer_draw.py index 18f7562..2ed68f4 100755 --- a/SCR/valetudo_map_parser/hypfer_draw.py +++ b/SCR/valetudo_map_parser/hypfer_draw.py @@ -8,6 +8,7 @@ import logging +from .config.colors import ColorIndex from .config.drawable_elements import DrawableElement from .config.material import MaterialColors, MaterialTileRenderer from .config.types import Color, JsonType, NumpyArray, RobotPosition, RoomStore @@ -24,6 +25,7 @@ class ImageDraw: def __init__(self, image_handler): self.img_h = image_handler self.file_name = self.img_h.shared.file_name + self.robot_size = self.img_h.shared.robot_size async def draw_go_to_flag( self, np_array: NumpyArray, entity_dict: dict, color_go_to: Color @@ -130,11 +132,10 @@ async def _process_room_layer( materials = getattr(self.img_h.json_data, "materials", {}) material = materials.get(segment_id) if material and material != "generic": - # Get material colors from shared.user_colors - # Index 10 = wood, Index 11 = tile + # Get material colors from shared.user_colors using named indices material_colors = MaterialColors( - wood_rgba=self.img_h.shared.user_colors[10], - tile_rgba=self.img_h.shared.user_colors[11] + wood_rgba=self.img_h.shared.user_colors[ColorIndex.MATERIAL_WOOD], + tile_rgba=self.img_h.shared.user_colors[ColorIndex.MATERIAL_TILE] ) img_np_array = self._apply_material_overlay( img_np_array, @@ -380,9 +381,9 @@ async def async_draw_virtual_walls( virtual_walls = self.img_h.data.find_virtual_walls(m_json) except (ValueError, KeyError): virtual_walls = None - else: - _LOGGER.info("%s: Got virtual walls.", self.file_name) + if virtual_walls: + _LOGGER.debug("%s: Got virtual walls.", self.file_name) np_array = await self.img_h.draw.draw_virtual_walls( np_array, virtual_walls, color_no_go ) @@ -415,6 +416,12 @@ async def async_draw_paths( np_array, predicted_pat2, 2, color_gray ) if path_pixels: + # Calculate path width based on mop mode + if self.img_h.shared.mop_mode: + path_width = max(1, self.robot_size - 2) + else: + path_width = 5 # Default width + for path in path_pixels: # Get the points from the current path and extend multiple paths. points = path.get("points", []) @@ -423,7 +430,7 @@ async def async_draw_paths( sublist, 2 ) np_array = await self.img_h.draw.lines( - np_array, self.img_h.shared.map_new_path, 5, color_move + np_array, self.img_h.shared.map_new_path, path_width, color_move ) return np_array diff --git a/SCR/valetudo_map_parser/hypfer_handler.py b/SCR/valetudo_map_parser/hypfer_handler.py index 3df4f76..92a4778 100644 --- a/SCR/valetudo_map_parser/hypfer_handler.py +++ b/SCR/valetudo_map_parser/hypfer_handler.py @@ -215,9 +215,9 @@ async def _prepare_data_tasks(self, m_json, entity_dict): data_tasks.append(self._prepare_goto_data(entity_dict)) path_enabled = self.drawing_config.is_enabled(DrawableElement.PATH) - LOGGER.info("%s: PATH element enabled: %s", self.file_name, path_enabled) + LOGGER.debug("%s: PATH element enabled: %s", self.file_name, path_enabled) if path_enabled: - LOGGER.info("%s: Drawing path", self.file_name) + LOGGER.debug("%s: Drawing path", self.file_name) data_tasks.append(self._prepare_path_data(m_json)) if data_tasks: @@ -248,7 +248,7 @@ async def _draw_dynamic_elements( img_np_array, m_json, colors["move"], self.color_grey ) else: - LOGGER.info("%s: Skipping path drawing", self.file_name) + LOGGER.debug("%s: Skipping path drawing", self.file_name) return img_np_array @@ -269,7 +269,7 @@ async def _draw_robot_if_enabled( y=robot_position[1], angle=robot_position_angle, fill=robot_color, - radius=self.shared.robot_size, + radius=self.imd.robot_size, robot_state=self.shared.vacuum_state, ) @@ -334,7 +334,7 @@ async def async_get_image_from_json( room_id, robot_position, robot_position_angle ) - LOGGER.info("%s: Completed base Layers", self.file_name) + LOGGER.debug("%s: Completed base Layers", self.file_name) # Copy the new array in base layer. # Delete old base layer before creating new one to free memory if self.img_base_layer is not None: @@ -384,7 +384,7 @@ async def async_get_image_from_json( # Synchronize zooming state from ImageDraw to handler before auto-crop self.zooming = self.imd.img_h.zooming - # Resize the image + # Resize the image - auto_trim_and_zoom_image creates a new array img_np_array = self.auto_trim_and_zoom_image( img_np_array, colors["background"], @@ -436,7 +436,7 @@ def get_calibration_data(self, rotation_angle: int = 0) -> CalibrationPoints: """Get the calibration data from the JSON data. this will create the attribute calibration points.""" calibration_data = [] - LOGGER.info("Getting %s Calibrations points.", self.file_name) + LOGGER.debug("Getting %s Calibrations points.", self.file_name) # Define the map points (fixed) map_points = self.get_map_points() diff --git a/SCR/valetudo_map_parser/rand256_handler.py b/SCR/valetudo_map_parser/rand256_handler.py index 8e150e8..933db68 100644 --- a/SCR/valetudo_map_parser/rand256_handler.py +++ b/SCR/valetudo_map_parser/rand256_handler.py @@ -139,12 +139,12 @@ async def get_image_from_rrm( try: if (m_json is not None) and (not isinstance(m_json, tuple)): - LOGGER.info("%s: Composing the image for the camera.", self.file_name) + LOGGER.debug("%s: Composing the image for the camera.", self.file_name) self.json_data = m_json size_x, size_y = self.data.get_rrm_image_size(m_json) self.img_size = DEFAULT_IMAGE_SIZE self.json_id = str(uuid.uuid4()) # image id - LOGGER.info("Vacuum Data ID: %s", self.json_id) + LOGGER.debug("Vacuum Data ID: %s", self.json_id) ( img_np_array, @@ -220,7 +220,7 @@ async def _initialize_base_layer( colors["background"], DEFAULT_PIXEL_SIZE, ) - LOGGER.info("%s: Completed base Layers", self.file_name) + LOGGER.debug("%s: Completed base Layers", self.file_name) if room_id > 0 and not self.room_propriety: self.room_propriety = await self.get_rooms_attributes(destinations) @@ -526,7 +526,7 @@ def get_calibration_data(self, rotation_angle: int = 0) -> Any: """Return the map calibration data.""" if not self.calibration_data and self.crop_img_size: self.calibration_data = [] - LOGGER.info( + LOGGER.debug( "%s: Getting Calibrations points %s", self.file_name, str(self.crop_area), diff --git a/SCR/valetudo_map_parser/reimg_draw.py b/SCR/valetudo_map_parser/reimg_draw.py index 63c1604..aa52948 100644 --- a/SCR/valetudo_map_parser/reimg_draw.py +++ b/SCR/valetudo_map_parser/reimg_draw.py @@ -65,7 +65,7 @@ async def async_segment_data( ) except ValueError as e: self.img_h.segment_data = None - LOGGER.info("%s: No segments data found: %s", self.file_name, e) + LOGGER.debug("%s: No segments data found: %s", self.file_name, e) async def async_draw_base_layer( self, @@ -82,13 +82,13 @@ async def async_draw_base_layer( walls_data = self.data.get_rrm_walls(m_json) floor_data = self.data.get_rrm_floor(m_json) - LOGGER.info("%s: Empty image with background color", self.file_name) + LOGGER.debug("%s: Empty image with background color", self.file_name) img_np_array = await self.draw.create_empty_image( self.img_h.img_size["x"], self.img_h.img_size["y"], color_background ) room_id = 0 if self.img_h.frame_number == 0: - LOGGER.info("%s: Overlapping Layers", self.file_name) + LOGGER.debug("%s: Overlapping Layers", self.file_name) # checking if there are segments too (sorted pixels in the raw data). await self.async_segment_data(m_json, size_x, size_y, pos_top, pos_left) @@ -143,10 +143,10 @@ async def _draw_segments( room_id = 0 rooms_list = [color_wall] if not segment_data: - LOGGER.info("%s: No segments data found.", self.file_name) + LOGGER.debug("%s: No segments data found.", self.file_name) return room_id, img_np_array - LOGGER.info("%s: Drawing segments.", self.file_name) + LOGGER.debug("%s: Drawing segments.", self.file_name) for pixels in segment_data: room_color = self.img_h.shared.rooms_colors[room_id] rooms_list.append(room_color) @@ -233,7 +233,7 @@ async def async_draw_zones( zone_clean = None if zone_clean: - LOGGER.info("%s: Got zones.", self.file_name) + LOGGER.debug("%s: Got zones.", self.file_name) return await self.draw.zones(np_array, zone_clean, color_zone_clean) return np_array @@ -247,7 +247,7 @@ async def async_draw_virtual_restrictions( virtual_walls = None if virtual_walls: - LOGGER.info("%s: Got virtual walls.", self.file_name) + LOGGER.debug("%s: Got virtual walls.", self.file_name) np_array = await self.draw.draw_virtual_walls( np_array, virtual_walls, color_no_go ) @@ -291,8 +291,9 @@ async def async_get_entity_data(self, m_json: JsonType) -> dict or None: entity_dict = self.data_sup.find_points_entities(m_json) except (ValueError, KeyError): entity_dict = None - else: - LOGGER.info("%s: Got the points in the json.", self.file_name) + + if entity_dict: + LOGGER.debug("%s: Got the points in the json.", self.file_name) return entity_dict async def async_get_robot_position(self, m_json: JsonType) -> tuple | None: diff --git a/pyproject.toml b/pyproject.toml index 6122fda..3a3c88d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "valetudo-map-parser" -version = "0.1.17" +version = "0.2.0" description = "A Python library to parse Valetudo map data returning a PIL Image object." authors = ["Sandro Cantarella "] license = "Apache-2.0"