From 2d2e3e6cad2b62131330aba11e7800da64e2e4f5 Mon Sep 17 00:00:00 2001 From: Xinjie Yao Date: Fri, 5 Dec 2025 18:05:54 -0800 Subject: [PATCH 1/3] poc seralization & deseralization --- .../environments/arena_env_builder.py | 17 +- .../examples/policy_runner_clean.py | 112 +++ isaaclab_arena/utils/config_serialization.py | 945 ++++++++++++++++++ 3 files changed, 1069 insertions(+), 5 deletions(-) create mode 100644 isaaclab_arena/examples/policy_runner_clean.py create mode 100644 isaaclab_arena/utils/config_serialization.py diff --git a/isaaclab_arena/environments/arena_env_builder.py b/isaaclab_arena/environments/arena_env_builder.py index 436f0481b..273333076 100644 --- a/isaaclab_arena/environments/arena_env_builder.py +++ b/isaaclab_arena/environments/arena_env_builder.py @@ -12,6 +12,7 @@ from isaaclab.envs.manager_based_env import ManagerBasedEnv from isaaclab.managers.recorder_manager import RecorderManagerBaseCfg from isaaclab.scene import InteractiveSceneCfg +from isaaclab.utils.io import dump_yaml, load_yaml from isaaclab_tasks.utils import parse_env_cfg from isaaclab_arena.environments.isaaclab_arena_environment import IsaacLabArenaEnvironment @@ -184,7 +185,7 @@ def get_entry_point(self) -> str | type[ManagerBasedRLMimicEnv]: return "isaaclab.envs:ManagerBasedRLEnv" def build_registered( - self, env_cfg: None | IsaacLabArenaManagerBasedRLEnvCfg = None + self, env_cfg: None | IsaacLabArenaManagerBasedRLEnvCfg = None, serialize: bool = False ) -> tuple[str, IsaacLabArenaManagerBasedRLEnvCfg]: """Register Gym env and parse runtime cfg.""" name = self.arena_env.name @@ -194,6 +195,8 @@ def build_registered( # THIS IS A WORKAROUND TO ALLOW USER TO GRADUALLY MOVE TO THE NEW CONFIGURATION SYSTEM. # THIS WILL BE REMOVED IN THE FUTURE. cfg_entry = self.modify_env_cfg(cfg_entry) + if serialize: + dump_yaml(f"/tmp/{name}_cfg_entry.yaml", cfg_entry) entry_point = self.get_entry_point() gym.register( id=name, @@ -209,12 +212,16 @@ def build_registered( ) return name, cfg - def make_registered(self, env_cfg: None | IsaacLabArenaManagerBasedRLEnvCfg = None) -> ManagerBasedEnv: - env, _ = self.make_registered_and_return_cfg(env_cfg) + def make_registered(self, env_cfg: None | IsaacLabArenaManagerBasedRLEnvCfg = None, serialize: bool = False) -> ManagerBasedEnv: + env, _ = self.make_registered_and_return_cfg(env_cfg, serialize=serialize) return env def make_registered_and_return_cfg( - self, env_cfg: None | IsaacLabArenaManagerBasedRLEnvCfg = None + self, env_cfg: None | IsaacLabArenaManagerBasedRLEnvCfg = None, serialize: bool = False ) -> tuple[ManagerBasedEnv, IsaacLabArenaManagerBasedRLEnvCfg]: - name, cfg = self.build_registered(env_cfg) + name, cfg = self.build_registered(env_cfg, serialize=serialize) return gym.make(name, cfg=cfg).unwrapped, cfg + + def return_cfg(self, env_cfg: None | IsaacLabArenaManagerBasedRLEnvCfg = None) -> IsaacLabArenaManagerBasedRLEnvCfg: + name, cfg = self.build_registered(env_cfg) + return cfg diff --git a/isaaclab_arena/examples/policy_runner_clean.py b/isaaclab_arena/examples/policy_runner_clean.py new file mode 100644 index 000000000..10df85737 --- /dev/null +++ b/isaaclab_arena/examples/policy_runner_clean.py @@ -0,0 +1,112 @@ +# Copyright (c) 2025, The Isaac Lab Arena Project Developers (https://github.com/isaac-sim/IsaacLab-Arena/blob/main/CONTRIBUTORS.md). +# All rights reserved. +# +# SPDX-License-Identifier: Apache-2.0 + +"""Script to run an IsaacLab Arena environment with a policy by deserializing from YAML config.""" + +import gymnasium as gym +import numpy as np +import random +import torch +import tqdm + +from isaaclab_arena.cli.isaaclab_arena_cli import get_isaaclab_arena_cli_parser +from isaaclab_arena.examples.policy_runner_cli import create_policy, setup_policy_argument_parser +from isaaclab_arena.utils.isaaclab_utils.simulation_app import SimulationAppContext + + +from isaaclab_arena.examples.example_environments.cli import get_arena_builder_from_cli + + +def main(): + """Script to run an IsaacLab Arena environment with a policy from serialized config.""" + + # Parse CLI arguments + args_parser = get_isaaclab_arena_cli_parser() + args_cli, _ = args_parser.parse_known_args() + + # Start the simulation app + with SimulationAppContext(args_cli): + # Add policy-related arguments to the parser + args_parser = setup_policy_argument_parser(args_parser) + args_cli = args_parser.parse_args() + + # Load environment configuration from YAML (metrics are reconstructed from YAML) + print("[INFO] Loading environment configuration from YAML...") + from isaaclab_arena.utils.config_serialization import load_env_cfg_from_yaml + cfg = load_env_cfg_from_yaml("/datasets/cfg_entry.yaml") + + arena_builder = get_arena_builder_from_cli(args_cli) + cli_cfg = arena_builder.return_cfg() + print("[DEBUG] Expected cfg.metrics", cli_cfg.metrics) + print("[DEBUG] Actual cfg.metrics", cfg.metrics) + + + # Register environment with gymnasium + name = "kitchen_pick_and_place" + entry_point = "isaaclab.envs:ManagerBasedRLEnv" + + gym.register( + id=name, + entry_point=entry_point, + kwargs={"env_cfg_entry_point": cfg}, + disable_env_checker=True, + ) + + # Parse environment config for runtime + from isaaclab_tasks.utils.parse_cfg import parse_env_cfg + cfg = parse_env_cfg( + name, + device="cuda:0", + num_envs=1, + use_fabric=False, + ) + + # Create environment + print("[INFO] Creating environment...") + env = gym.make(name, cfg=cfg).unwrapped + + # Set random seeds + if args_cli.seed is not None: + env.seed(args_cli.seed) + torch.manual_seed(args_cli.seed) + np.random.seed(args_cli.seed) + random.seed(args_cli.seed) + + # Reset environment + obs, _ = env.reset() + + # Create policy + print("[INFO] Creating policy...") + policy, num_steps = create_policy(args_cli) + + # Lazy import to prevent app stalling + from isaaclab_arena.metrics.metrics import compute_metrics + + # Run policy + print(f"[INFO] Running policy for {num_steps} steps...") + for _ in tqdm.tqdm(range(num_steps)): + with torch.inference_mode(): + actions = policy.get_action(env, obs) + obs, _, terminated, truncated, _ = env.step(actions) + + if terminated.any() or truncated.any(): + print( + f"Resetting policy for terminated env_ids: {terminated.nonzero().flatten()}" + f" and truncated env_ids: {truncated.nonzero().flatten()}" + ) + env_ids = (terminated | truncated).nonzero().flatten() + policy.reset(env_ids=env_ids) + + # Compute and print metrics + metrics = compute_metrics(env) + print(f"Metrics: {metrics}") + + # Close the environment + env.close() + + +if __name__ == "__main__": + main() + diff --git a/isaaclab_arena/utils/config_serialization.py b/isaaclab_arena/utils/config_serialization.py new file mode 100644 index 000000000..8bac3113f --- /dev/null +++ b/isaaclab_arena/utils/config_serialization.py @@ -0,0 +1,945 @@ +# Copyright (c) 2025, The Isaac Lab Arena Project Developers (https://github.com/isaac-sim/IsaacLab-Arena/blob/main/CONTRIBUTORS.md). +# All rights reserved. +# +# SPDX-License-Identifier: Apache-2.0 + +"""Utilities for serializing and deserializing IsaacLab Arena environment configs.""" + +# Standard library +import builtins +from dataclasses import fields + +# Third party +import numpy as np +import yaml + +# IsaacLab imports +from isaaclab.assets import AssetBaseCfg +from isaaclab.devices.openxr import XrCfg +from isaaclab.managers import ( + CommandTermCfg, + CurriculumTermCfg, + EventTermCfg, + ObservationGroupCfg, + ObservationTermCfg, + RecorderManagerBaseCfg, + RewardTermCfg, + TerminationTermCfg, +) +from isaaclab.managers.scene_entity_cfg import SceneEntityCfg +from isaaclab.scene import InteractiveSceneCfg +from isaaclab.sim.schemas import ( + ArticulationRootPropertiesCfg, + CollisionPropertiesCfg, + MassPropertiesCfg, + RigidBodyPropertiesCfg, +) +from isaaclab.sim.spawners.from_files.from_files_cfg import UsdFileCfg +from isaaclab.sim.spawners.materials import PreviewSurfaceCfg, VisualMaterialCfg +from isaaclab.sim.spawners.shapes.shapes_cfg import CapsuleCfg, ConeCfg, CuboidCfg, CylinderCfg, SphereCfg +from isaaclab.utils.string import string_to_callable + +# IsaacLab Arena imports +from isaaclab_arena.environments.isaaclab_arena_environment import IsaacLabArenaEnvironment +from isaaclab_arena.environments.isaaclab_arena_manager_based_env import IsaacLabArenaManagerBasedRLEnvCfg +from isaaclab_arena.utils.configclass import make_configclass +from isaaclab_arena.utils.pose import Pose + + +def _get_config_class_patterns(class_type_str): + """Generate potential config class patterns from a class_type string.""" + if ':' not in class_type_str: + return [] + + module_path, class_name = class_type_str.rsplit(':', 1) + config_patterns = [] + + # Pattern 1: same module + Cfg (e.g., joint_actions:JointPositionActionCfg) + config_patterns.append(f'{module_path}:{class_name}Cfg') + + # Pattern 2: module + _cfg + Cfg (e.g., rigid_object_cfg:RigidObjectCfg) + config_patterns.append(f'{module_path}_cfg:{class_name}Cfg') + + # Pattern 3: For nested modules like actions.joint_actions, try parent.parent_cfg + # e.g., isaaclab.envs.mdp.actions.joint_actions -> isaaclab.envs.mdp.actions.actions_cfg + if '.' in module_path: + parent_parts = module_path.rsplit('.', 1) + if len(parent_parts) == 2: + parent_module, last_module = parent_parts + parent_name = parent_module.split('.')[-1] + config_patterns.append(f'{parent_module}.{parent_name}_cfg:{class_name}Cfg') + + # Pattern 4: For actuators specifically, try replacing last module with 'actuator_cfg' + # e.g., isaaclab.actuators.actuator_pd:ImplicitActuator -> isaaclab.actuators.actuator_cfg:ImplicitActuatorCfg + if last_module.startswith('actuator'): + config_patterns.append(f'{parent_module}.actuator_cfg:{class_name}Cfg') + + return config_patterns + +def register_yaml_constructors(): + """Register custom YAML constructors for numpy types and Python builtins.""" + + def slice_constructor(loader, node): + """Construct Python slice objects from YAML.""" + args = loader.construct_sequence(node, deep=True) + return builtins.slice(*args) + + def numpy_scalar_constructor(loader, node): + """Construct numpy scalar objects from YAML binary data.""" + dtype_node, data_node = node.value + dtype = loader.construct_object(dtype_node, deep=True) + data = loader.construct_object(data_node, deep=True) + scalar = np.frombuffer(data, dtype=dtype, count=1)[0] + return scalar + + def numpy_dtype_constructor(loader, node): + """Construct numpy dtype objects from YAML.""" + if isinstance(node, yaml.MappingNode): + mapping = loader.construct_mapping(node, deep=True) + if 'args' in mapping: + return np.dtype(*mapping['args']) + raise yaml.YAMLError(f'Could not reconstruct numpy.dtype from node: {node}') + + # Register constructors + yaml.add_constructor( + 'tag:yaml.org,2002:python/object/apply:builtins.slice', + slice_constructor, + Loader=yaml.FullLoader + ) + yaml.add_constructor( + 'tag:yaml.org,2002:python/object/apply:numpy.dtype', + numpy_dtype_constructor, + Loader=yaml.FullLoader + ) + yaml.add_constructor( + 'tag:yaml.org,2002:python/object/apply:numpy.core.multiarray.scalar', + numpy_scalar_constructor, + Loader=yaml.FullLoader + ) + + +def _discover_metric_classes(): + """Discover all MetricBase subclasses in the metrics module. + + Returns: + Dict mapping metric class names to their classes + """ + import importlib + import inspect + import pkgutil + from pathlib import Path + + try: + import isaaclab_arena.metrics as metrics_module + from isaaclab_arena.metrics.metric_base import MetricBase + except ImportError as e: + print(f"[WARNING] Could not import metrics module: {e}") + return {} + + metric_classes = {} + + # Get the metrics module path + metrics_path = Path(metrics_module.__file__).parent + + # Iterate through all .py files in the metrics directory + for module_info in pkgutil.iter_modules([str(metrics_path)]): + if module_info.name.startswith('_') or module_info.name == 'metric_base': + continue + + try: + # Import the module + module = importlib.import_module(f'isaaclab_arena.metrics.{module_info.name}') + + # Find all MetricBase subclasses in the module + for name, obj in inspect.getmembers(module, inspect.isclass): + if issubclass(obj, MetricBase) and obj is not MetricBase: + metric_classes[name] = obj + print(f"[DEBUG] Discovered metric class: {name} from module {module_info.name}") + except Exception as e: + print(f"[WARNING] Could not import metric module {module_info.name}: {e}") + import traceback + traceback.print_exc() + + return metric_classes + + +def _reconstruct_asset_from_dict(asset_dict): + """Reconstruct an Asset object from a dictionary. + + Args: + asset_dict: Dictionary containing asset data + + Returns: + Asset instance + """ + from isaaclab_arena.assets.asset import Asset + + return Asset( + name=asset_dict.get('_name', asset_dict.get('name')), + tags=asset_dict.get('tags') + ) + + +def _reconstruct_metrics_from_yaml(metrics_list): + """Reconstruct metric objects from YAML data by auto-discovering metric classes. + + This function automatically discovers all MetricBase subclasses in the metrics folder + and attempts to match YAML data to their __init__ signatures. + + Args: + metrics_list: List of metric dictionaries from YAML + + Returns: + List of MetricBase objects + """ + import inspect + + if not metrics_list: + return [] + + # Discover all available metric classes + metric_classes = _discover_metric_classes() + + if not metric_classes: + print("[WARNING] No metric classes discovered") + return [] + + print(f"[INFO] Discovered {len(metric_classes)} metric classes: {list(metric_classes.keys())}") + + reconstructed_metrics = [] + + for idx, metric_data in enumerate(metrics_list): + metric_created = False + print(f"[DEBUG] Processing metric {idx}: {metric_data}") + + # Empty dict - try metrics with no required parameters + if not metric_data or metric_data == {}: + print(f"[DEBUG] Empty dict detected, trying to match to no-param metrics") + for metric_name, metric_class in metric_classes.items(): + try: + # Try to instantiate with no arguments + metric_instance = metric_class() + reconstructed_metrics.append(metric_instance) + metric_created = True + print(f"[INFO] Matched empty dict to {metric_name}") + break + except TypeError as e: + # This metric requires parameters, try next one + print(f"[DEBUG] {metric_name} requires parameters: {e}") + continue + except Exception as e: + # Other error, skip this metric + print(f"[DEBUG] Failed to instantiate {metric_name}: {type(e).__name__}: {e}") + continue + else: + # Try to match metric data to metric class signatures + for metric_name, metric_class in metric_classes.items(): + try: + sig = inspect.signature(metric_class.__init__) + params = {name: p for name, p in sig.parameters.items() if name != 'self'} + + # Prepare arguments for the metric class + args = {} + + # Handle 'object' parameter specially - reconstruct Asset + if 'object' in params and 'object' in metric_data: + args['object'] = _reconstruct_asset_from_dict(metric_data['object']) + + # Handle other parameters by direct mapping + for param_name, param in params.items(): + if param_name == 'object': + continue # Already handled + + # Try to find matching key in metric_data + if param_name in metric_data: + args[param_name] = metric_data[param_name] + elif param.default == inspect.Parameter.empty: + # Required parameter not found, can't use this class + raise ValueError(f"Required parameter {param_name} not found") + + # Try to instantiate the metric + metric_instance = metric_class(**args) + reconstructed_metrics.append(metric_instance) + metric_created = True + print(f"[INFO] Matched metric data to {metric_name}") + break + + except Exception as e: + # This metric class doesn't match, try next one + print(f"[DEBUG] Failed to match {metric_name}: {e}") + continue + + if not metric_created: + print(f"[WARNING] Could not match metric data to any known metric class: {metric_data}") + + return reconstructed_metrics + + +def load_env_cfg_from_yaml(yaml_path: str): + """Load an IsaacLab Arena environment config from a YAML file. + + This function deserializes a config saved using dump_yaml(), handling complex + nested structures, numpy types, and configclass objects. + + Metrics are automatically discovered from the isaaclab_arena/metrics folder and + reconstructed by matching YAML data to their __init__ signatures. This supports + custom user-defined metrics - just add them to the metrics folder. + + Args: + yaml_path: Path to the YAML config file + + Returns: + IsaacLabArenaManagerBasedRLEnvCfg: The deserialized environment configuration + with metrics auto-discovered and reconstructed + """ + # Register YAML constructors + register_yaml_constructors() + + # Load YAML + with open(yaml_path, encoding='utf-8') as f: + cfg_dict = yaml.load(f, Loader=yaml.FullLoader) + + # Create base config + cfg = IsaacLabArenaManagerBasedRLEnvCfg() + + # Extract and handle sections with dynamic configs + for section_name, create_func in [ + ('scene', _create_scene_config), + ('recorders', _create_recorders_config), + ('actions', _create_actions_config), + ('events', _create_events_config), + ('observations', _create_observations_config), + ]: + section_dict = cfg_dict.pop(section_name, None) + if section_dict is not None: + setattr(cfg, section_name, create_func(section_dict)) + + # Default observations if not present + if not hasattr(cfg, 'observations') or cfg.observations is None: + cfg.observations = ObservationGroupCfg() + + # Handle terminations + terminations_dict = cfg_dict.pop('terminations', None) + if terminations_dict is not None: + _create_terminations_config(cfg, terminations_dict) + + # Handle rewards, curriculum, commands - create dynamic configs + for config_name, term_cfg_class in [ + ('rewards', RewardTermCfg), + ('curriculum', CurriculumTermCfg), + ('commands', CommandTermCfg) + ]: + config_dict = cfg_dict.pop(config_name, None) + if config_dict is not None: + if config_dict: # Non-empty dict + result_cfg = _create_dynamic_manager_config( + config_dict, None, term_cfg_class, f'{config_name.capitalize()}Cfg' + ) + else: # Empty dict + EmptyCfg = make_configclass(f'{config_name.capitalize()}Cfg', []) + result_cfg = EmptyCfg() + setattr(cfg, config_name, result_cfg) + + # Handle metrics - reconstruct metric objects from YAML data + metrics_list = cfg_dict.pop('metrics', None) + if metrics_list is not None: + cfg.metrics = _reconstruct_metrics_from_yaml(metrics_list) + else: + cfg.metrics = [] + + # Handle nested configs (isaaclab_arena_env, XR) + for section_name, config_class in [ + ('isaaclab_arena_env', IsaacLabArenaEnvironment), + ('xr', XrCfg), + ]: + section_dict = cfg_dict.pop(section_name, None) + if section_dict is not None: + section_cfg = config_class() + _populate_from_dict(section_cfg, section_dict) + setattr(cfg, section_name, section_cfg) + + # Use from_dict for all remaining fields + cfg.from_dict(cfg_dict) + + return cfg + + +def _convert_markers_to_spawner_configs(markers_dict): + """Convert markers dictionary to proper spawner config objects (SphereCfg, etc.). + + Markers are spawner configs identified by their func field, not class_type. + + Args: + markers_dict: Dictionary where each value is a marker dict with 'func' field + + Returns: + Dictionary with each marker converted to its spawner config object + """ + result_cfg = {} + for marker_name, marker_dict in markers_dict.items(): + if isinstance(marker_dict, dict) and 'func' in marker_dict: + # Recursively convert nested configs first + marker_dict_converted = _convert_funcs_in_dict(marker_dict) + + # Convert visual_material dict to PreviewSurfaceCfg if present + if 'visual_material' in marker_dict_converted and isinstance(marker_dict_converted['visual_material'], dict): + visual_mat = PreviewSurfaceCfg() + _populate_from_dict(visual_mat, marker_dict_converted['visual_material']) + marker_dict_converted['visual_material'] = visual_mat + + # Determine spawner config type from func + func = marker_dict_converted.get('func') + marker_cfg = None + + if func: + func_name = func.__name__ if callable(func) else str(func) + # Map func name to config class + if 'sphere' in func_name.lower(): + marker_cfg = SphereCfg() + elif 'cone' in func_name.lower(): + marker_cfg = ConeCfg() + elif 'cylinder' in func_name.lower(): + marker_cfg = CylinderCfg() + elif 'cuboid' in func_name.lower() or 'cube' in func_name.lower(): + marker_cfg = CuboidCfg() + elif 'capsule' in func_name.lower(): + marker_cfg = CapsuleCfg() + + if marker_cfg: + _populate_from_dict(marker_cfg, marker_dict_converted, skip_conversion=True) + result_cfg[marker_name] = marker_cfg + else: + # Unknown marker type, keep as converted dict + result_cfg[marker_name] = marker_dict_converted + else: + result_cfg[marker_name] = marker_dict_converted + else: + result_cfg[marker_name] = marker_dict + + return result_cfg + + +def _convert_class_type_dict_to_config(items_dict, context_name=''): + """Convert a dict of items with class_type to a dict of config objects. + + Args: + items_dict: Dictionary where each value is a dict with 'class_type' field + context_name: Context name for error messages (e.g. 'Actuator', 'Marker') + + Returns: + Dictionary with each value converted to its config object + """ + result_cfg = {} + for item_name, item_dict in items_dict.items(): + if isinstance(item_dict, dict) and 'class_type' in item_dict: + # Recursively convert this item dict first (but NOT actuators, to avoid infinite recursion) + item_dict_converted = item_dict.copy() + for k, v in item_dict.items(): + if k == 'class_type' and isinstance(v, str): + try: + item_dict_converted[k] = string_to_callable(v) + except (ImportError, AttributeError, ValueError): + item_dict_converted[k] = v + elif k != 'actuators' and isinstance(v, (dict, list)): + item_dict_converted[k] = _convert_funcs_in_dict(v) + else: + item_dict_converted[k] = v + + class_type = item_dict_converted['class_type'] + + # Try to get the Cfg class directly by appending 'Cfg' to class name + item_cfg = None + if not isinstance(class_type, str): + # class_type is already a class object + # Try multiple approaches to get the config class + class_module = class_type.__module__ + class_name = class_type.__name__ + cfg_class_name = class_name + 'Cfg' + + # Approach 1: Try direct string_to_callable + try: + cfg_class = string_to_callable(f"{class_module}:{cfg_class_name}") + item_cfg = cfg_class() + _populate_from_dict(item_cfg, item_dict_converted, skip_conversion=True) + result_cfg[item_name] = item_cfg + except (ImportError, AttributeError, ValueError) as e: + # Approach 2: Try patterns + config_patterns = _get_config_class_patterns(f"{class_module}:{class_name}") + for config_class_str in config_patterns: + try: + config_class = string_to_callable(config_class_str) + item_cfg = config_class() + _populate_from_dict(item_cfg, item_dict_converted, skip_conversion=True) + result_cfg[item_name] = item_cfg + break + except (ImportError, AttributeError, ValueError) as e2: + continue + else: + # class_type is still a string + config_patterns = _get_config_class_patterns(class_type) + for config_class_str in config_patterns: + try: + config_class = string_to_callable(config_class_str) + item_cfg = config_class() + _populate_from_dict(item_cfg, item_dict_converted, skip_conversion=True) + result_cfg[item_name] = item_cfg + break + except (ImportError, AttributeError, ValueError) as e: + continue + + if item_cfg is None: + # If conversion failed, keep as converted dict + result_cfg[item_name] = item_dict_converted + else: + # Not a dict with class_type, just convert funcs + result_cfg[item_name] = _convert_funcs_in_dict(item_dict) if isinstance(item_dict, dict) else item_dict + + return result_cfg + + +def _convert_funcs_in_dict(data): + """Recursively convert all 'func' and '*_class_type' string fields to callables and actuators to config objects in nested dicts.""" + if isinstance(data, dict): + result = {} + for key, value in data.items(): + if (key == 'func' or key == 'class_type' or key.endswith('_class_type')) and isinstance(value, str): + try: + result[key] = string_to_callable(value) + except (ImportError, AttributeError, ValueError) as e: + # If conversion fails, keep as string + print(f"Warning: Could not convert {key}='{value}' to callable: {e}") + result[key] = value + elif key == 'actuators' and isinstance(value, dict): + # Convert actuators dict to proper config objects + result[key] = _convert_class_type_dict_to_config(value, 'Actuator') + elif key == 'markers' and isinstance(value, dict): + # Convert markers dict (used in visualizer_cfg) to proper spawner config objects + # Markers don't have class_type, they're spawner configs determined by func + result[key] = _convert_markers_to_spawner_configs(value) + elif isinstance(value, (dict, list)): + result[key] = _convert_funcs_in_dict(value) + else: + result[key] = value + return result + elif isinstance(data, list): + return [_convert_funcs_in_dict(item) for item in data] + else: + return data + + +def _populate_from_dict(obj, data_dict, skip_conversion=False): + """Recursively populate an object from a dictionary for nested config structures. + + Args: + obj: The object to populate + data_dict: The dictionary with values + skip_conversion: If True, skip the _convert_funcs_in_dict call (assume already converted) + """ + if not isinstance(data_dict, dict): + return + + # Convert all func strings to callables first (recursively) + # This also handles actuators conversion + if not skip_conversion: + data_dict = _convert_funcs_in_dict(data_dict) + + # Replace all MISSING fields with None first + _replace_all_missing(obj) + + for key, value in data_dict.items(): + if not hasattr(obj, key): + # Dynamically add the attribute - just set the value + setattr(obj, key, value) + else: + existing_attr = getattr(obj, key) + is_missing = type(existing_attr).__name__ == '_MISSING_TYPE' + + if is_missing or existing_attr is None: + # Just set the value directly + object.__setattr__(obj, key, value) + elif isinstance(value, dict) and hasattr(existing_attr, '__dict__') and not isinstance(existing_attr, dict): + # Recurse into nested objects (only if it's a proper object, not a dict) + _populate_from_dict(existing_attr, value) + else: + # Simple assignment + setattr(obj, key, value) + + +def _replace_all_missing(obj): + """Replace all MISSING fields in a dataclass with None.""" + if hasattr(obj, '__dataclass_fields__'): + for field_name in obj.__dataclass_fields__: + field_value = getattr(obj, field_name) + if type(field_value).__name__ == '_MISSING_TYPE': + object.__setattr__(obj, field_name, None) + + +def _create_dynamic_manager_config(cfg_dict, base_fields, term_cfg_class, config_name): + """Create a dynamic manager config (terminations, rewards, etc.). + + Args: + cfg_dict: Dictionary of term configurations + base_fields: Base fields from the parent class (if any) + term_cfg_class: The term config class (e.g., TerminationTermCfg, RewardTermCfg) + config_name: Name for the dynamic config class + + Returns: + Dynamic configclass instance + """ + if not cfg_dict: + return None + + # Create term instances + term_instances = {} + for key, term_dict in cfg_dict.items(): + # Convert funcs and scene entities in params + term_dict = _convert_funcs_in_dict(term_dict) + if 'params' in term_dict and isinstance(term_dict['params'], dict): + term_dict['params'] = _convert_scene_entity_dicts(term_dict['params']) + + term_cfg = term_cfg_class() + _populate_from_dict(term_cfg, term_dict, skip_conversion=True) + term_instances[key] = term_cfg + + # Create dynamic config class + term_fields = [(key, term_cfg_class, inst) for key, inst in term_instances.items()] + DynamicCfg = make_configclass(config_name, term_fields, bases=base_fields if base_fields else ()) + return DynamicCfg() + + +def _convert_spawn_to_config(asset_dict): + """Convert spawn dictionary to proper spawn config object (UsdFileCfg, etc.). + + Also converts nested config objects within spawn (rigid_props, articulation_props, etc.) + + Args: + asset_dict: Asset dictionary that may contain a 'spawn' dict + + Returns: + Asset dictionary with spawn dict converted to proper config object + """ + if not isinstance(asset_dict, dict) or 'spawn' not in asset_dict: + return asset_dict + + spawn_dict = asset_dict.get('spawn') + if spawn_dict is None or not isinstance(spawn_dict, dict): + return asset_dict + + # Convert nested config dicts to proper config objects + spawn_dict_copy = spawn_dict.copy() + + # Handle rigid_props + if 'rigid_props' in spawn_dict_copy and isinstance(spawn_dict_copy['rigid_props'], dict): + rigid_cfg = RigidBodyPropertiesCfg() + _populate_from_dict(rigid_cfg, spawn_dict_copy['rigid_props']) + spawn_dict_copy['rigid_props'] = rigid_cfg + + # Handle collision_props + if 'collision_props' in spawn_dict_copy and isinstance(spawn_dict_copy['collision_props'], dict): + collision_cfg = CollisionPropertiesCfg() + _populate_from_dict(collision_cfg, spawn_dict_copy['collision_props']) + spawn_dict_copy['collision_props'] = collision_cfg + + # Handle mass_props + if 'mass_props' in spawn_dict_copy and isinstance(spawn_dict_copy['mass_props'], dict): + mass_cfg = MassPropertiesCfg() + _populate_from_dict(mass_cfg, spawn_dict_copy['mass_props']) + spawn_dict_copy['mass_props'] = mass_cfg + + # Handle articulation_props + if 'articulation_props' in spawn_dict_copy and isinstance(spawn_dict_copy['articulation_props'], dict): + articulation_cfg = ArticulationRootPropertiesCfg() + _populate_from_dict(articulation_cfg, spawn_dict_copy['articulation_props']) + spawn_dict_copy['articulation_props'] = articulation_cfg + + # Handle visual_material + if 'visual_material' in spawn_dict_copy and isinstance(spawn_dict_copy['visual_material'], dict): + visual_mat_cfg = VisualMaterialCfg() + _populate_from_dict(visual_mat_cfg, spawn_dict_copy['visual_material']) + spawn_dict_copy['visual_material'] = visual_mat_cfg + + # Create the UsdFileCfg with converted nested configs + spawn_cfg = UsdFileCfg() + _populate_from_dict(spawn_cfg, spawn_dict_copy) + + # Create a copy of asset_dict with the converted spawn + result = asset_dict.copy() + result['spawn'] = spawn_cfg + return result + + +def _convert_scene_entity_dicts(params_dict): + """Convert dictionaries that match Pose or SceneEntityCfg patterns to actual objects. + + Handles conversion of: + - Pose objects (dicts with 'position_xyz' and 'rotation_wxyz') + - SceneEntityCfg objects (dicts with 'name' and 'joint_names') + + Args: + params_dict: Dictionary of parameters that may contain Pose or SceneEntityCfg dicts + + Returns: + Dictionary with Pose/SceneEntityCfg dicts converted to proper objects + """ + result = {} + for key, value in params_dict.items(): + # Check if this dict looks like a Pose (has position_xyz and rotation_wxyz) + if isinstance(value, dict) and 'position_xyz' in value and 'rotation_wxyz' in value: + # Convert to Pose + result[key] = Pose(**value) + # Check if this dict looks like a SceneEntityCfg (has 'name' and other typical fields) + elif isinstance(value, dict) and 'name' in value and 'joint_names' in value: + # Convert to SceneEntityCfg + result[key] = SceneEntityCfg(**value) + else: + result[key] = value + + return result + + +def _create_config_from_class_type(item_name, item_dict, context='item'): + """Create a config object from a dict with class_type field. + + Args: + item_name: Name of the item + item_dict: Dictionary containing class_type and other fields + context: Context for error messages (e.g., 'asset', 'recorder term') + + Returns: + Configured object matching the class_type + """ + class_type = item_dict.get('class_type') + if class_type is None: + raise ValueError(f"{context} '{item_name}' has no class_type") + + # Get class_type string + if not isinstance(class_type, str): + class_type_str = f"{class_type.__module__}:{class_type.__name__}" + else: + class_type_str = class_type + + # Try to find and instantiate the config class + config_patterns = _get_config_class_patterns(class_type_str) + + for config_class_str in config_patterns: + try: + config_class = string_to_callable(config_class_str) + config_obj = config_class() + _populate_from_dict(config_obj, item_dict, skip_conversion=True) + return config_obj + except (ImportError, AttributeError, ValueError): + continue + + # All patterns failed + raise ValueError( + f"Failed to find config class for {context} '{item_name}' with class_type '{class_type_str}'. " + f"Tried patterns: {config_patterns}" + ) + + +def _create_asset_config(asset_name, asset_dict): + """Create a single asset config object from dictionary. + + Args: + asset_name: Name of the asset + asset_dict: Dictionary containing asset configuration + + Returns: + Configured asset object (RigidObjectCfg, ArticulationCfg, or AssetBaseCfg) + """ + # Convert funcs and spawn to proper configs + asset_dict = _convert_funcs_in_dict(asset_dict) + asset_dict = _convert_spawn_to_config(asset_dict) + + # If no class_type, it's a BASE asset + if asset_dict.get('class_type') is None: + asset_cfg = AssetBaseCfg() + _populate_from_dict(asset_cfg, asset_dict, skip_conversion=True) + return asset_cfg + + # Otherwise, use class_type to create the appropriate config + return _create_config_from_class_type(asset_name, asset_dict, 'asset') + + +def _create_scene_config(scene_dict): + """Create scene config with dynamic assets. + + Args: + scene_dict: Dictionary containing scene configuration + + Returns: + Dynamic SceneCfg with all assets properly configured + """ + if not scene_dict: + return InteractiveSceneCfg() + + # Separate base fields from dynamic assets + base_scene_fields = {f.name for f in fields(InteractiveSceneCfg)} + base_fields_dict = {k: v for k, v in scene_dict.items() if k in base_scene_fields} + dynamic_assets_dict = {k: v for k, v in scene_dict.items() if k not in base_scene_fields} + + # Create asset instances + asset_instances = {} + for asset_name, asset_dict in dynamic_assets_dict.items(): + if isinstance(asset_dict, dict): + asset_instances[asset_name] = _create_asset_config(asset_name, asset_dict) + else: + asset_instances[asset_name] = asset_dict + + # Create dynamic SceneCfg + if asset_instances: + asset_fields = [(name, type(inst), inst) for name, inst in asset_instances.items()] + SceneCfg = make_configclass('SceneCfg', asset_fields, bases=(InteractiveSceneCfg,)) + scene_cfg = SceneCfg() + else: + scene_cfg = InteractiveSceneCfg() + + # Populate base fields + if base_fields_dict: + _populate_from_dict(scene_cfg, base_fields_dict) + + return scene_cfg + + +def _create_recorders_config(recorders_dict): + """Create recorders config with dynamic terms. + + Args: + recorders_dict: Dictionary containing recorder configuration + + Returns: + Dynamic RecorderManagerCfg with all terms properly configured + """ + if not recorders_dict: + return RecorderManagerBaseCfg() + + # Separate base fields from dynamic terms + base_recorder_fields = {f.name for f in fields(RecorderManagerBaseCfg)} + base_fields_dict = {k: v for k, v in recorders_dict.items() if k in base_recorder_fields} + dynamic_terms_dict = {k: v for k, v in recorders_dict.items() if k not in base_recorder_fields} + + # Create recorder term configs using class_type + recorder_term_instances = {} + for term_name, term_dict in dynamic_terms_dict.items(): + if isinstance(term_dict, dict) and 'class_type' in term_dict and term_dict['class_type'] is not None: + term_dict = _convert_funcs_in_dict(term_dict) + recorder_term_instances[term_name] = _create_config_from_class_type(term_name, term_dict, 'recorder term') + else: + recorder_term_instances[term_name] = term_dict + + # Create dynamic RecorderManagerCfg + if recorder_term_instances: + recorder_fields = [(name, type(inst), inst) for name, inst in recorder_term_instances.items()] + RecorderManagerCfg = make_configclass('RecorderManagerCfg', recorder_fields, bases=(RecorderManagerBaseCfg,)) + recorder_cfg = RecorderManagerCfg() + else: + recorder_cfg = RecorderManagerBaseCfg() + + # Populate base fields + if base_fields_dict: + _populate_from_dict(recorder_cfg, base_fields_dict) + + return recorder_cfg + + +def _create_actions_config(actions_dict): + """Create actions config with dynamic terms. + + Args: + actions_dict: Dictionary containing action terms + + Returns: + Dynamic ActionsCfg with all terms properly configured + """ + if not actions_dict: + return None + + # Create action term configs using class_type + action_terms = {} + for term_name, term_dict in actions_dict.items(): + if isinstance(term_dict, dict) and 'class_type' in term_dict and term_dict['class_type'] is not None: + term_dict = _convert_funcs_in_dict(term_dict) + action_terms[term_name] = _create_config_from_class_type(term_name, term_dict, 'action term') + else: + action_terms[term_name] = term_dict + + # Create dynamic ActionsCfg + if action_terms: + action_fields = [(name, type(term), term) for name, term in action_terms.items()] + return make_configclass('ActionsCfg', action_fields)() + + return None + + +def _create_events_config(events_dict): + """Create events config with dynamic terms.""" + if not events_dict: + return None + + return _create_dynamic_manager_config( + events_dict, + None, + EventTermCfg, + 'EventsCfg' + ) + + +def _create_observations_config(observations_dict): + """Create observations config with proper structure.""" + if not observations_dict: + return ObservationGroupCfg() + + # Get base fields of ObservationGroupCfg + base_group_field_names = {f.name for f in fields(ObservationGroupCfg)} + + # Create observation groups + obs_groups = {} + for group_name, group_dict in observations_dict.items(): + if not isinstance(group_dict, dict): + obs_groups[group_name] = group_dict + continue + + # Separate base attributes from observation terms + group_base_attrs = {k: v for k, v in group_dict.items() if k in base_group_field_names} + obs_terms_dict = {k: v for k, v in group_dict.items() if k not in base_group_field_names} + + # Create observation term configs + obs_term_instances = {} + for term_name, term_dict in obs_terms_dict.items(): + if isinstance(term_dict, dict) and 'func' in term_dict: + term_dict = _convert_funcs_in_dict(term_dict) + if 'params' in term_dict and isinstance(term_dict['params'], dict): + term_dict['params'] = _convert_scene_entity_dicts(term_dict['params']) + term_cfg = ObservationTermCfg() + _populate_from_dict(term_cfg, term_dict, skip_conversion=True) + obs_term_instances[term_name] = term_cfg + + # Create dynamic group config or use base + if obs_term_instances: + obs_term_fields = [(name, ObservationTermCfg, cfg) for name, cfg in obs_term_instances.items()] + ObsGroupCfg = make_configclass(f'{group_name.capitalize()}ObsGroupCfg', obs_term_fields, bases=(ObservationGroupCfg,)) + obs_group_cfg = ObsGroupCfg() + else: + obs_group_cfg = ObservationGroupCfg() + + # Populate base attributes + if group_base_attrs: + _populate_from_dict(obs_group_cfg, group_base_attrs) + + obs_groups[group_name] = obs_group_cfg + + # Create dynamic ObservationCfg with all groups + if obs_groups: + obs_fields = [(name, type(group), group) for name, group in obs_groups.items()] + ObservationCfg = make_configclass('ObservationCfg', obs_fields) + return ObservationCfg() + + return ObservationGroupCfg() + + +def _create_terminations_config(cfg, terminations_dict): + """Create terminations config.""" + cfg.terminations = _create_dynamic_manager_config( + terminations_dict, None, TerminationTermCfg, 'TerminationCfg' + ) + From a0164d8c2bb43dd18026d68dabe125df1147a7a6 Mon Sep 17 00:00:00 2001 From: Xinjie Yao Date: Mon, 8 Dec 2025 10:47:37 -0800 Subject: [PATCH 2/3] wip --- isaaclab_arena/utils/config_serialization.py | 82 +++++++++++--------- 1 file changed, 47 insertions(+), 35 deletions(-) diff --git a/isaaclab_arena/utils/config_serialization.py b/isaaclab_arena/utils/config_serialization.py index 8bac3113f..a790d39b3 100644 --- a/isaaclab_arena/utils/config_serialization.py +++ b/isaaclab_arena/utils/config_serialization.py @@ -5,16 +5,14 @@ """Utilities for serializing and deserializing IsaacLab Arena environment configs.""" -# Standard library import builtins from dataclasses import fields +from typing import Any -# Third party import numpy as np import yaml -# IsaacLab imports -from isaaclab.assets import AssetBaseCfg +from isaaclab.assets import AssetBaseCfg, RigidObjectCfg, ArticulationCfg from isaaclab.devices.openxr import XrCfg from isaaclab.managers import ( CommandTermCfg, @@ -34,6 +32,7 @@ MassPropertiesCfg, RigidBodyPropertiesCfg, ) + from isaaclab.sim.spawners.from_files.from_files_cfg import UsdFileCfg from isaaclab.sim.spawners.materials import PreviewSurfaceCfg, VisualMaterialCfg from isaaclab.sim.spawners.shapes.shapes_cfg import CapsuleCfg, ConeCfg, CuboidCfg, CylinderCfg, SphereCfg @@ -737,47 +736,60 @@ def _create_config_from_class_type(item_name, item_dict, context='item'): ) -def _create_asset_config(asset_name, asset_dict): +def _create_asset_config(asset_name: str, asset_dict: dict[str, Any]) -> AssetBaseCfg | RigidObjectCfg | ArticulationCfg: """Create a single asset config object from dictionary. - + + If no class_type, it's a AssetBaseCfg. If class_type is provided, it could be RigidObjectCfg, ArticulationCfg, AssetBaseCfg. Assets all come with + spawn configuration, which is converted to proper config objects in the _convert_spawn_to_config function. + Args: asset_name: Name of the asset asset_dict: Dictionary containing asset configuration - + Returns: Configured asset object (RigidObjectCfg, ArticulationCfg, or AssetBaseCfg) """ # Convert funcs and spawn to proper configs + # With conversion: spawn_cfg.func = asset_dict = _convert_funcs_in_dict(asset_dict) + # The spawn field has a known, specific structure with standard sub-properties (rigid_props, articulation_props, etc.) that need specific config classes. asset_dict = _convert_spawn_to_config(asset_dict) - + # If no class_type, it's a BASE asset if asset_dict.get('class_type') is None: asset_cfg = AssetBaseCfg() + # string callables have been converted to callables by now, only poulating the items _populate_from_dict(asset_cfg, asset_dict, skip_conversion=True) return asset_cfg - + # Otherwise, use class_type to create the appropriate config return _create_config_from_class_type(asset_name, asset_dict, 'asset') -def _create_scene_config(scene_dict): +def _create_scene_config(scene_dict: dict[str, Any]): """Create scene config with dynamic assets. - + + If no assets are listed, a default InteractiveSceneCfg is returned. + If assets are listed, an InteractiveSceneCfg is returned with those dynamically created assets added to the scene. + Args: scene_dict: Dictionary containing scene configuration - + Returns: Dynamic SceneCfg with all assets properly configured """ + # Default scene config + scene_cfg = InteractiveSceneCfg() if not scene_dict: - return InteractiveSceneCfg() - + return scene_cfg + # Separate base fields from dynamic assets + # e.g. num_envs, env_spacing, replicate_physics, filter_collisions, clone_in_fabric, etc. base_scene_fields = {f.name for f in fields(InteractiveSceneCfg)} base_fields_dict = {k: v for k, v in scene_dict.items() if k in base_scene_fields} + # background/objects/destinations/robots/sensors etc. are added dynamically to the scene dynamic_assets_dict = {k: v for k, v in scene_dict.items() if k not in base_scene_fields} - + # Create asset instances asset_instances = {} for asset_name, asset_dict in dynamic_assets_dict.items(): @@ -785,64 +797,64 @@ def _create_scene_config(scene_dict): asset_instances[asset_name] = _create_asset_config(asset_name, asset_dict) else: asset_instances[asset_name] = asset_dict - + # Create dynamic SceneCfg if asset_instances: asset_fields = [(name, type(inst), inst) for name, inst in asset_instances.items()] SceneCfg = make_configclass('SceneCfg', asset_fields, bases=(InteractiveSceneCfg,)) scene_cfg = SceneCfg() - else: - scene_cfg = InteractiveSceneCfg() - + # Populate base fields if base_fields_dict: _populate_from_dict(scene_cfg, base_fields_dict) - + return scene_cfg -def _create_recorders_config(recorders_dict): - """Create recorders config with dynamic terms. - +def _create_recorders_config(recorders_dict: dict[str, Any]): + """Create recorders config with dynamic terms. If no metrics are listed, a default + RecorderManagerCfg is returned. If metrics are listed, a RecorderManagerCfg is returned with + those metrics recorder. Recorder terms are added dynamically to the recorder manager config. + Args: recorders_dict: Dictionary containing recorder configuration - + Returns: - Dynamic RecorderManagerCfg with all terms properly configured + Dynamic RecorderManagerCfg with metircs recorder terms added if any """ + recorder_cfg = RecorderManagerBaseCfg() if not recorders_dict: - return RecorderManagerBaseCfg() - + return recorder_cfg + # Separate base fields from dynamic terms base_recorder_fields = {f.name for f in fields(RecorderManagerBaseCfg)} base_fields_dict = {k: v for k, v in recorders_dict.items() if k in base_recorder_fields} dynamic_terms_dict = {k: v for k, v in recorders_dict.items() if k not in base_recorder_fields} - + # Create recorder term configs using class_type recorder_term_instances = {} + # recorder manager config contains metrics listed in the recorder manager config if any for term_name, term_dict in dynamic_terms_dict.items(): if isinstance(term_dict, dict) and 'class_type' in term_dict and term_dict['class_type'] is not None: term_dict = _convert_funcs_in_dict(term_dict) recorder_term_instances[term_name] = _create_config_from_class_type(term_name, term_dict, 'recorder term') else: - recorder_term_instances[term_name] = term_dict - + raise ValueError(f"Recorder term '{term_name}' is not a dictionary with a 'class_type' field") + # Create dynamic RecorderManagerCfg if recorder_term_instances: recorder_fields = [(name, type(inst), inst) for name, inst in recorder_term_instances.items()] RecorderManagerCfg = make_configclass('RecorderManagerCfg', recorder_fields, bases=(RecorderManagerBaseCfg,)) recorder_cfg = RecorderManagerCfg() - else: - recorder_cfg = RecorderManagerBaseCfg() - + # Populate base fields if base_fields_dict: _populate_from_dict(recorder_cfg, base_fields_dict) - + return recorder_cfg -def _create_actions_config(actions_dict): +def _create_actions_config(actions_dict: dict[str, Any]): """Create actions config with dynamic terms. Args: From a095cdec1e63b16fe3726a3f71d199075c2d84a7 Mon Sep 17 00:00:00 2001 From: Xinjie Yao Date: Mon, 8 Dec 2025 23:38:01 -0800 Subject: [PATCH 3/3] poc for nonmimic env --- .../environments/arena_env_builder.py | 37 +- .../examples/policy_runner_clean.py | 112 --- isaaclab_arena/tasks/generic_task.py | 64 ++ .../tests/test_config_serializations.py | 126 ++++ .../utils/arena_env_reconstruction.py | 361 ++++++++++ isaaclab_arena/utils/config_serialization.py | 648 ++++++++++-------- 6 files changed, 948 insertions(+), 400 deletions(-) delete mode 100644 isaaclab_arena/examples/policy_runner_clean.py create mode 100644 isaaclab_arena/tasks/generic_task.py create mode 100644 isaaclab_arena/tests/test_config_serializations.py create mode 100644 isaaclab_arena/utils/arena_env_reconstruction.py diff --git a/isaaclab_arena/environments/arena_env_builder.py b/isaaclab_arena/environments/arena_env_builder.py index 273333076..ae7f3db84 100644 --- a/isaaclab_arena/environments/arena_env_builder.py +++ b/isaaclab_arena/environments/arena_env_builder.py @@ -7,12 +7,13 @@ import argparse import gymnasium as gym +from pathlib import Path from isaaclab.envs import ManagerBasedRLMimicEnv from isaaclab.envs.manager_based_env import ManagerBasedEnv from isaaclab.managers.recorder_manager import RecorderManagerBaseCfg from isaaclab.scene import InteractiveSceneCfg -from isaaclab.utils.io import dump_yaml, load_yaml +from isaaclab.utils.io import dump_yaml from isaaclab_tasks.utils import parse_env_cfg from isaaclab_arena.environments.isaaclab_arena_environment import IsaacLabArenaEnvironment @@ -29,9 +30,15 @@ class ArenaEnvBuilder: DEFAULT_SCENE_CFG = InteractiveSceneCfg(num_envs=4096, env_spacing=30.0, replicate_physics=False) - def __init__(self, arena_env: IsaacLabArenaEnvironment, args: argparse.Namespace): + def __init__( + self, arena_env: IsaacLabArenaEnvironment, args: argparse.Namespace, serialization_file_path: str = None + ): self.arena_env = arena_env self.args = args + self.serialization_file_path = serialization_file_path if serialization_file_path is not None else "/tmp/" + self.serialization_file_path = Path(self.serialization_file_path).joinpath( + f"{self.arena_env.name}_cfg_entry.yaml" + ) def orchestrate(self) -> None: """Orchestrate the environment member interaction""" @@ -184,19 +191,27 @@ def get_entry_point(self) -> str | type[ManagerBasedRLMimicEnv]: else: return "isaaclab.envs:ManagerBasedRLEnv" - def build_registered( + def build_cfg_entry( self, env_cfg: None | IsaacLabArenaManagerBasedRLEnvCfg = None, serialize: bool = False - ) -> tuple[str, IsaacLabArenaManagerBasedRLEnvCfg]: - """Register Gym env and parse runtime cfg.""" - name = self.arena_env.name + ) -> IsaacLabArenaManagerBasedRLEnvCfg: # orchestrate the environment member interaction self.orchestrate() cfg_entry = env_cfg if env_cfg is not None else self.compose_manager_cfg() # THIS IS A WORKAROUND TO ALLOW USER TO GRADUALLY MOVE TO THE NEW CONFIGURATION SYSTEM. # THIS WILL BE REMOVED IN THE FUTURE. cfg_entry = self.modify_env_cfg(cfg_entry) + # serialize the configuration if requested if serialize: - dump_yaml(f"/tmp/{name}_cfg_entry.yaml", cfg_entry) + dump_yaml(self.serialization_file_path, cfg_entry) + return cfg_entry + + def build_registered( + self, env_cfg: None | IsaacLabArenaManagerBasedRLEnvCfg = None, serialize: bool = False + ) -> tuple[str, IsaacLabArenaManagerBasedRLEnvCfg]: + """Register Gym env and parse runtime cfg.""" + name = self.arena_env.name + cfg_entry = self.build_cfg_entry(env_cfg, serialize=serialize) + entry_point = self.get_entry_point() gym.register( id=name, @@ -212,7 +227,9 @@ def build_registered( ) return name, cfg - def make_registered(self, env_cfg: None | IsaacLabArenaManagerBasedRLEnvCfg = None, serialize: bool = False) -> ManagerBasedEnv: + def make_registered( + self, env_cfg: None | IsaacLabArenaManagerBasedRLEnvCfg = None, serialize: bool = False + ) -> ManagerBasedEnv: env, _ = self.make_registered_and_return_cfg(env_cfg, serialize=serialize) return env @@ -221,7 +238,3 @@ def make_registered_and_return_cfg( ) -> tuple[ManagerBasedEnv, IsaacLabArenaManagerBasedRLEnvCfg]: name, cfg = self.build_registered(env_cfg, serialize=serialize) return gym.make(name, cfg=cfg).unwrapped, cfg - - def return_cfg(self, env_cfg: None | IsaacLabArenaManagerBasedRLEnvCfg = None) -> IsaacLabArenaManagerBasedRLEnvCfg: - name, cfg = self.build_registered(env_cfg) - return cfg diff --git a/isaaclab_arena/examples/policy_runner_clean.py b/isaaclab_arena/examples/policy_runner_clean.py deleted file mode 100644 index 10df85737..000000000 --- a/isaaclab_arena/examples/policy_runner_clean.py +++ /dev/null @@ -1,112 +0,0 @@ -# Copyright (c) 2025, The Isaac Lab Arena Project Developers (https://github.com/isaac-sim/IsaacLab-Arena/blob/main/CONTRIBUTORS.md). -# All rights reserved. -# -# SPDX-License-Identifier: Apache-2.0 - -"""Script to run an IsaacLab Arena environment with a policy by deserializing from YAML config.""" - -import gymnasium as gym -import numpy as np -import random -import torch -import tqdm - -from isaaclab_arena.cli.isaaclab_arena_cli import get_isaaclab_arena_cli_parser -from isaaclab_arena.examples.policy_runner_cli import create_policy, setup_policy_argument_parser -from isaaclab_arena.utils.isaaclab_utils.simulation_app import SimulationAppContext - - -from isaaclab_arena.examples.example_environments.cli import get_arena_builder_from_cli - - -def main(): - """Script to run an IsaacLab Arena environment with a policy from serialized config.""" - - # Parse CLI arguments - args_parser = get_isaaclab_arena_cli_parser() - args_cli, _ = args_parser.parse_known_args() - - # Start the simulation app - with SimulationAppContext(args_cli): - # Add policy-related arguments to the parser - args_parser = setup_policy_argument_parser(args_parser) - args_cli = args_parser.parse_args() - - # Load environment configuration from YAML (metrics are reconstructed from YAML) - print("[INFO] Loading environment configuration from YAML...") - from isaaclab_arena.utils.config_serialization import load_env_cfg_from_yaml - cfg = load_env_cfg_from_yaml("/datasets/cfg_entry.yaml") - - arena_builder = get_arena_builder_from_cli(args_cli) - cli_cfg = arena_builder.return_cfg() - print("[DEBUG] Expected cfg.metrics", cli_cfg.metrics) - print("[DEBUG] Actual cfg.metrics", cfg.metrics) - - - # Register environment with gymnasium - name = "kitchen_pick_and_place" - entry_point = "isaaclab.envs:ManagerBasedRLEnv" - - gym.register( - id=name, - entry_point=entry_point, - kwargs={"env_cfg_entry_point": cfg}, - disable_env_checker=True, - ) - - # Parse environment config for runtime - from isaaclab_tasks.utils.parse_cfg import parse_env_cfg - cfg = parse_env_cfg( - name, - device="cuda:0", - num_envs=1, - use_fabric=False, - ) - - # Create environment - print("[INFO] Creating environment...") - env = gym.make(name, cfg=cfg).unwrapped - - # Set random seeds - if args_cli.seed is not None: - env.seed(args_cli.seed) - torch.manual_seed(args_cli.seed) - np.random.seed(args_cli.seed) - random.seed(args_cli.seed) - - # Reset environment - obs, _ = env.reset() - - # Create policy - print("[INFO] Creating policy...") - policy, num_steps = create_policy(args_cli) - - # Lazy import to prevent app stalling - from isaaclab_arena.metrics.metrics import compute_metrics - - # Run policy - print(f"[INFO] Running policy for {num_steps} steps...") - for _ in tqdm.tqdm(range(num_steps)): - with torch.inference_mode(): - actions = policy.get_action(env, obs) - obs, _, terminated, truncated, _ = env.step(actions) - - if terminated.any() or truncated.any(): - print( - f"Resetting policy for terminated env_ids: {terminated.nonzero().flatten()}" - f" and truncated env_ids: {truncated.nonzero().flatten()}" - ) - env_ids = (terminated | truncated).nonzero().flatten() - policy.reset(env_ids=env_ids) - - # Compute and print metrics - metrics = compute_metrics(env) - print(f"Metrics: {metrics}") - - # Close the environment - env.close() - - -if __name__ == "__main__": - main() - diff --git a/isaaclab_arena/tasks/generic_task.py b/isaaclab_arena/tasks/generic_task.py new file mode 100644 index 000000000..6639a5852 --- /dev/null +++ b/isaaclab_arena/tasks/generic_task.py @@ -0,0 +1,64 @@ +# Copyright (c) 2025, The Isaac Lab Arena Project Developers (https://github.com/isaac-sim/IsaacLab-Arena/blob/main/CONTRIBUTORS.md). +# All rights reserved. +# +# SPDX-License-Identifier: Apache-2.0 + +from isaaclab_arena.tasks.task_base import TaskBase + + +class GenericTask(TaskBase): + """Generic task wrapper for deserialized task data.""" + + def __init__(self, scene_cfg, events_cfg, termination_cfg, + observation_cfg, rewards_cfg, curriculum_cfg, commands_cfg, + episode_length_s=None): + super().__init__(episode_length_s=episode_length_s) + + # Store deserialized config attributes + self.scene_config = scene_cfg + self.events_cfg_data = events_cfg + self.termination_cfg_data = termination_cfg + self.observation_cfg_data = observation_cfg + self.rewards_cfg_data = rewards_cfg + self.curriculum_cfg_data = curriculum_cfg + self.commands_cfg_data = commands_cfg + + def get_scene_cfg(self): + """Returns task-specific scene config if available.""" + return self.scene_config + + def get_termination_cfg(self): + """Returns task-specific termination config.""" + return self.termination_cfg_data + + def get_events_cfg(self): + """Returns task-specific events config.""" + return self.events_cfg_data + + def get_observation_cfg(self): + """Returns task-specific observation config.""" + return self.observation_cfg_data + + def get_rewards_cfg(self): + """Returns task-specific rewards config.""" + return self.rewards_cfg_data + + def get_curriculum_cfg(self): + """Returns task-specific curriculum config.""" + return self.curriculum_cfg_data + + def get_commands_cfg(self): + """Returns task-specific commands config.""" + return self.commands_cfg_data + + def get_prompt(self): + """Returns task prompt if available.""" + return self.task_data.get('prompt', '') + + def get_mimic_env_cfg(self, embodiment_name: str): + """Returns mimic env config (not available in generic task).""" + return None + + def get_metrics(self): + """Returns empty metrics list (metrics are stored at cfg level).""" + return [] diff --git a/isaaclab_arena/tests/test_config_serializations.py b/isaaclab_arena/tests/test_config_serializations.py new file mode 100644 index 000000000..618de8ffd --- /dev/null +++ b/isaaclab_arena/tests/test_config_serializations.py @@ -0,0 +1,126 @@ +# Copyright (c) 2025, The Isaac Lab Arena Project Developers (https://github.com/isaac-sim/IsaacLab-Arena/blob/main/CONTRIBUTORS.md). +# All rights reserved. +# +# SPDX-License-Identifier: Apache-2.0 + +from pathlib import Path +import gymnasium as gym +from tqdm import tqdm +import torch + +from isaaclab_arena.tests.utils.subprocess import run_simulation_app_function + +CFG_YAML_PATH = "/tmp/test_config_serializations.yaml" +NUM_STEPS = 100 +HEADLESS = True + +def _test_config_serializations(simulation_app) -> bool: + + from isaaclab_arena.utils.config_serialization import load_env_cfg_from_yaml + from isaaclab_arena.metrics.metrics import compute_metrics + from isaaclab_tasks.utils.parse_cfg import parse_env_cfg + from isaaclab_arena.assets.asset_registry import AssetRegistry + from isaaclab_arena.assets.object_reference import ObjectReference + from isaaclab_arena.scene.scene import Scene + from isaaclab_arena.environments.isaaclab_arena_environment import IsaacLabArenaEnvironment + from isaaclab_arena.tasks.pick_and_place_task import PickAndPlaceTask + from isaaclab_arena.utils.pose import Pose + from isaaclab_arena.assets.object_base import ObjectType + from isaaclab_arena.environments.arena_env_builder import ArenaEnvBuilder + from isaaclab_arena.cli.isaaclab_arena_cli import get_isaaclab_arena_cli_parser + + args_cli = get_isaaclab_arena_cli_parser().parse_args([]) + asset_registry = AssetRegistry() + background = asset_registry.get_asset_by_name("kitchen")() + cracker_box = asset_registry.get_asset_by_name("cracker_box")() + embodiment = asset_registry.get_asset_by_name("gr1_joint")() + + cracker_box.set_initial_pose( + Pose( + position_xyz=(0.4, 0.0, 0.1), + rotation_wxyz=(1.0, 0.0, 0.0, 0.0), + ) + ) + destination_location = ObjectReference( + name="destination_location", + prim_path="{ENV_REGEX_NS}/kitchen/Cabinet_B_02", + parent_asset=background, + object_type=ObjectType.RIGID, + ) + + scene = Scene(assets=[background, cracker_box, destination_location]) + isaaclab_arena_environment = IsaacLabArenaEnvironment( + name="kitchen_pick_and_place", + embodiment=embodiment, + scene=scene, + task=PickAndPlaceTask(cracker_box, destination_location, background), + teleop_device=None, + ) + + try: + env_builder = ArenaEnvBuilder(isaaclab_arena_environment, args_cli) + env_builder.serialization_file_path = CFG_YAML_PATH + cfg_entry_from_cli = env_builder.build_cfg_entry(serialize=True) + except Exception as e: + print(f"Error: {e}") + return False + + assert Path(CFG_YAML_PATH).exists() + + cfg_entry_from_yaml = load_env_cfg_from_yaml(CFG_YAML_PATH) + # test env can be created from the yaml file + name = "kitchen_pick_and_place" + entry_point = "isaaclab.envs:ManagerBasedRLEnv" + try: + gym.register( + id=name, + entry_point=entry_point, + kwargs={"env_cfg_entry_point": cfg_entry_from_yaml}, + disable_env_checker=True, + ) + + cfg = parse_env_cfg( + name, + device="cuda:0", + num_envs=1, + use_fabric=False, + ) + + # Create environment + print("[INFO] Creating environment...") + env = gym.make(name, cfg=cfg).unwrapped + env.reset() + except Exception as e: + print(f"Error: {e}") + return False + + try: + + # Run some zero actions. + for _ in tqdm(range(NUM_STEPS)): + with torch.inference_mode(): + actions = torch.zeros(env.action_space.shape, device=env.unwrapped.device) + env.step(actions) + + metrics = compute_metrics(env) + assert metrics is not None + assert "num_episodes" in metrics + assert "success_rate" in metrics + assert "object_moved_rate" in metrics + + finally: + env.close() + + return True + + +def test_config_serializations(): + result = run_simulation_app_function( + _test_config_serializations, + headless=HEADLESS, + ) + assert result, f"Test {test_config_serializations.__name__} failed" + + +if __name__ == "__main__": + test_config_serializations() diff --git a/isaaclab_arena/utils/arena_env_reconstruction.py b/isaaclab_arena/utils/arena_env_reconstruction.py new file mode 100644 index 000000000..03ceb9f16 --- /dev/null +++ b/isaaclab_arena/utils/arena_env_reconstruction.py @@ -0,0 +1,361 @@ +# Copyright (c) 2025, The Isaac Lab Arena Project Developers (https://github.com/isaac-sim/IsaacLab-Arena/blob/main/CONTRIBUTORS.md). +# All rights reserved. +# +# SPDX-License-Identifier: Apache-2.0 + +"""Utilities for reconstructing IsaacLabArenaEnvironment components from YAML.""" + +# IsaacLab Arena imports +from isaaclab_arena.assets.background import Background +from isaaclab_arena.assets.object import Object +from isaaclab_arena.assets.object_base import ObjectType +from isaaclab_arena.assets.object_reference import ObjectReference +from isaaclab_arena.embodiments.embodiment_base import EmbodimentBase +from isaaclab_arena.scene.scene import Scene +from isaaclab_arena.tasks.generic_task import GenericTask +from isaaclab_arena.utils.configclass import make_configclass +from isaaclab_arena.utils.pose import Pose + + +def _extract_config_from_merged_cfg(embodiment_dict, dict_key, cfg, cfg_attr_name): + """Extract a config from the merged cfg by looking up the first key in the dict. + + This helper function handles the common pattern of: + - Check if dict_key exists in embodiment_dict + - Iterate through the keys in embodiment_dict[dict_key] + - Use getattr to extract the corresponding config from cfg.cfg_attr_name + + Args: + embodiment_dict: Dictionary containing embodiment configuration + dict_key: Key to look up in embodiment_dict (e.g., 'scene_config') + cfg: The main IsaacLabArenaManagerBasedRLEnvCfg + cfg_attr_name: Attribute name in cfg to extract from (e.g., 'scene') + + Returns: + The extracted config object, or None if not found + """ + if dict_key in embodiment_dict and embodiment_dict[dict_key] is not None: + #TODO (xinjieyao, 2025-12-08): Handle mimic_env + if dict_key == 'mimic_env': + return None + for config_name, _ in embodiment_dict[dict_key].items(): + cfg_object = getattr(cfg, cfg_attr_name, None) + if cfg_object is not None: + return getattr(cfg_object, config_name, None) + return None + + +def reconstruct_embodiment(embodiment_dict, cfg): + """Reconstruct EmbodimentBase from YAML dictionary. + + Stores embodiment-specific configs from embodiment_dict. These are the individual + contributions from the embodiment that will be combined with scene and task configs. + Uses deserialization functions to convert YAML dicts to proper config objects. + + Args: + embodiment_dict: Dictionary containing embodiment configuration and metadata + cfg: The main IsaacLabArenaManagerBasedRLEnvCfg with deserialized top-level configs + + Returns: + EmbodimentBase instance populated with embodiment-specific configs + """ + # Extract initialization parameters from YAML + enable_cameras = embodiment_dict.get('enable_cameras', False) + initial_pose_dict = embodiment_dict.get('initial_pose') + initial_pose = None + if initial_pose_dict: + initial_pose = Pose( + position_xyz=tuple(initial_pose_dict.get('position_xyz', (0, 0, 0))), + rotation_wxyz=tuple(initial_pose_dict.get('rotation_wxyz', (1, 0, 0, 0))) + ) + + # Create embodiment instance + embodiment = EmbodimentBase( + enable_cameras=enable_cameras, + initial_pose=initial_pose + ) + + # Camera config needs special handling + if 'camera_config' in embodiment_dict: + embodiment.camera_config = _deserialize_camera_config(embodiment_dict['camera_config']) + + # Define config mappings: (dict_key, cfg_attr, embodiment_attr) + config_mappings = [ + ('scene_config', 'scene', 'scene_config'), + ('action_config', 'actions', 'action_config'), + ('observation_config', 'observations', 'observation_config'), + ('event_config', 'events', 'event_config'), + ('reward_config', 'rewards', 'reward_config'), + ('curriculum_config', 'curriculum', 'curriculum_config'), + ('command_config', 'commands', 'command_config'), + ('termination_cfg', 'terminations', 'termination_cfg'), + ('xr', 'xr', 'xr'), + ('mimic_env', 'mimic_env', 'mimic_env'), + ] + + # Extract all configs using a loop + for dict_key, cfg_attr, embodiment_attr in config_mappings: + config = _extract_config_from_merged_cfg(embodiment_dict, dict_key, cfg, cfg_attr) + if config is not None: + setattr(embodiment, embodiment_attr, config) + + return embodiment + + +def _extract_asset_metadata(asset_data): + """Extract common asset metadata from YAML dictionary. + + Args: + asset_data: Dictionary containing asset data + + Returns: + Tuple of (tags, prim_path, usd_path, scale, object_type, initial_pose) + """ + tags = asset_data.get('tags') + prim_path = asset_data.get('prim_path') + usd_path = asset_data.get('usd_path') + scale = tuple(asset_data.get('scale', (1.0, 1.0, 1.0))) + + # Extract object type + asset_type_enum_dict = asset_data.get('object_type', {}) + object_type_value = asset_type_enum_dict.get('_value_', 'base') + object_type = { + 'base': ObjectType.BASE, + 'rigid': ObjectType.RIGID, + 'articulation': ObjectType.ARTICULATION, + 'spawner': ObjectType.SPAWNER, + }.get(object_type_value, ObjectType.BASE) + + # Extract initial pose + initial_pose_dict = asset_data.get('initial_pose') + initial_pose = None + if initial_pose_dict: + initial_pose = Pose( + position_xyz=tuple(initial_pose_dict.get('position_xyz', (0, 0, 0))), + rotation_wxyz=tuple(initial_pose_dict.get('rotation_wxyz', (1, 0, 0, 0))) + ) + + return tags, prim_path, usd_path, scale, object_type, initial_pose + + +def reconstruct_scene(scene_dict, cfg): + """Reconstruct Scene from YAML dictionary. + + Reconstructs Asset objects and stores scene-specific configs from scene_dict. + These are the individual contributions from the scene that will be combined. + + Args: + scene_dict: Dictionary containing scene metadata, assets, and configs + cfg: The main IsaacLabArenaManagerBasedRLEnvCfg (for reference, not directly used) + + Returns: + Scene instance populated with assets and scene-specific configs + """ + # Create scene instance + scene = Scene() + + # Reconstruct assets from the metadata in scene_dict and configs from cfg.scene + assets_dict = scene_dict.get('assets', {}) + + # First pass: Create regular assets (Object, Background) + for asset_name, asset_data in assets_dict.items(): + if not isinstance(asset_data, dict) or 'parent_asset' in asset_data: + continue + + asset_cfg = getattr(cfg.scene, asset_name, None) + if asset_cfg is None: + continue + + # Extract metadata + tags, prim_path, usd_path, scale, object_type, initial_pose = _extract_asset_metadata(asset_data) + + try: + # Create the appropriate Asset type + if tags and 'background' in tags: + asset = Background( + name=asset_name, + usd_path=usd_path, + object_min_z=asset_data.get('object_min_z', -0.2), + prim_path=prim_path, + initial_pose=initial_pose, + scale=scale, + tags=tags + ) + else: + asset = Object( + name=asset_name, + prim_path=prim_path, + object_type=object_type, + usd_path=usd_path, + scale=scale, + initial_pose=initial_pose, + tags=tags + ) + + asset.object_cfg = asset_cfg + scene.add_asset(asset) + except Exception as exc: + raise Exception(f"Failed to create asset {asset_name}: {exc}") from exc + + # Second pass: Create ObjectReference assets (they need parent assets to exist first) + for asset_name, asset_data in assets_dict.items(): + if not isinstance(asset_data, dict) or 'parent_asset' not in asset_data: + continue + + asset_cfg = getattr(cfg.scene, asset_name, None) + if asset_cfg is None: + continue + + try: + # Get parent asset from scene + parent_asset_name = asset_data.get('parent_asset', {}).get('_name') + if parent_asset_name is None: + continue + + parent_asset = scene.assets.get(parent_asset_name) + if parent_asset is None: + raise ValueError(f"Parent asset '{parent_asset_name}' not found for '{asset_name}'") + + # Extract metadata + tags, prim_path, _, _, object_type, _ = _extract_asset_metadata(asset_data) + + # Create ObjectReference + asset_ref = ObjectReference( + parent_asset=parent_asset, + name=asset_name, + prim_path=prim_path, + object_type=object_type, + tags=tags + ) + + asset_ref.object_cfg = asset_cfg + scene.add_asset(asset_ref) + except Exception as exc: + raise Exception(f"Failed to create ObjectReference {asset_name}: {exc}") from exc + + # Define config mappings: (dict_key, cfg_attr, scene_attr) + config_mappings = [ + ('observation_cfg', 'observations', 'observation_cfg'), + ('events_cfg', 'events', 'events_cfg'), + ('termination_cfg', 'terminations', 'termination_cfg'), + ('rewards_cfg', 'rewards', 'rewards_cfg'), + ('curriculum_cfg', 'curriculum', 'curriculum_cfg'), + ('commands_cfg', 'commands', 'commands_cfg'), + ] + + # Extract all configs using a loop + for dict_key, cfg_attr, scene_attr in config_mappings: + config = _extract_config_from_merged_cfg(scene_dict, dict_key, cfg, cfg_attr) + if config is not None: + setattr(scene, scene_attr, config) + + return scene + + +def reconstruct_task(task_dict, cfg): + """Reconstruct TaskBase from YAML dictionary. + + Stores task-specific configs from task_dict. These are the individual + contributions from the task that will be combined with embodiment and scene. + Uses deserialization functions to convert YAML dicts to proper config objects. + + Args: + task_dict: Dictionary containing task metadata and configs + cfg: The main IsaacLabArenaManagerBasedRLEnvCfg with deserialized top-level configs + + Returns: + TaskBase-like object with task-specific data and configs + """ + # Extract task-specific configs using the helper function + # Define config mappings: (dict_key, cfg_attr) + config_keys = [ + ('scene_config', 'scene'), + ('events_cfg', 'events'), + ('termination_cfg', 'terminations'), + ('observation_cfg', 'observations'), + ('rewards_cfg', 'rewards'), + ('curriculum_cfg', 'curriculum'), + ('commands_cfg', 'commands'), + ] + + # Extract all configs + extracted_configs = {} + for dict_key, cfg_attr in config_keys: + config = _extract_config_from_merged_cfg(task_dict, dict_key, cfg, cfg_attr) + extracted_configs[dict_key] = config + + return GenericTask( + scene_cfg=extracted_configs.get('scene_config'), + events_cfg=extracted_configs.get('events_cfg'), + termination_cfg=extracted_configs.get('termination_cfg'), + observation_cfg=extracted_configs.get('observation_cfg'), + rewards_cfg=extracted_configs.get('rewards_cfg'), + curriculum_cfg=extracted_configs.get('curriculum_cfg'), + commands_cfg=extracted_configs.get('commands_cfg'), + episode_length_s=task_dict.get('episode_length_s'), + ) + + +def _deserialize_camera_config(camera_config_dict): + """Deserialize camera config dictionary into proper camera sensor config objects. + + Camera configs contain sensor configurations (e.g., TiledCamera) that need to be + properly instantiated with the correct config classes. + + Args: + camera_config_dict: Dictionary containing camera configurations + Returns: + Dynamically created config class with camera sensor configs + """ + # Import here to avoid circular dependency + from isaaclab_arena.utils.config_serialization import _create_asset_config + + # Fields that should NOT be in camera configs (these are for spawn configs of robots/objects) + INVALID_CAMERA_FIELDS = { + 'activateContactSensors', 'activate_contact_sensors', + 'rigid_props', 'articulation_props', 'collision_props', + 'actuators', 'joint_names', 'fixed_tendons', + } + + def _filter_dict_recursive(data): + """Recursively filter out invalid fields from nested dicts.""" + if not isinstance(data, dict): + return data + + filtered = {} + for k, v in data.items(): + # Skip invalid fields + if k in INVALID_CAMERA_FIELDS: + continue + # Recursively filter nested dicts + if isinstance(v, dict): + filtered[k] = _filter_dict_recursive(v) + else: + filtered[k] = v + return filtered + + # Deserialize each camera sensor configuration + camera_configs = {} + for camera_name, camera_data in camera_config_dict.items(): + if camera_name.startswith('_'): + # Skip metadata fields like _is_tiled_camera, _camera_offset + continue + + if isinstance(camera_data, dict): + # Filter out invalid fields recursively (including in spawn config) + filtered_camera_data = _filter_dict_recursive(camera_data) + + # Deserialize camera sensor config using _create_asset_config + camera_configs[camera_name] = _create_asset_config(camera_name, filtered_camera_data) + else: + # Keep as-is if not a dict + camera_configs[camera_name] = camera_data + + # Create dynamic config class containing all cameras + if camera_configs: + camera_fields = [(name, type(cfg), cfg) for name, cfg in camera_configs.items()] + CameraConfigClass = make_configclass('CameraConfig', camera_fields) + return CameraConfigClass() + + return None + diff --git a/isaaclab_arena/utils/config_serialization.py b/isaaclab_arena/utils/config_serialization.py index a790d39b3..c4c9207b2 100644 --- a/isaaclab_arena/utils/config_serialization.py +++ b/isaaclab_arena/utils/config_serialization.py @@ -6,13 +6,16 @@ """Utilities for serializing and deserializing IsaacLab Arena environment configs.""" import builtins -from dataclasses import fields -from typing import Any - +import importlib +import inspect import numpy as np +import pkgutil import yaml +from dataclasses import fields +from pathlib import Path +from typing import Any -from isaaclab.assets import AssetBaseCfg, RigidObjectCfg, ArticulationCfg +from isaaclab.assets import ArticulationCfg, AssetBaseCfg, RigidObjectCfg from isaaclab.devices.openxr import XrCfg from isaaclab.managers import ( CommandTermCfg, @@ -32,7 +35,6 @@ MassPropertiesCfg, RigidBodyPropertiesCfg, ) - from isaaclab.sim.spawners.from_files.from_files_cfg import UsdFileCfg from isaaclab.sim.spawners.materials import PreviewSurfaceCfg, VisualMaterialCfg from isaaclab.sim.spawners.shapes.shapes_cfg import CapsuleCfg, ConeCfg, CuboidCfg, CylinderCfg, SphereCfg @@ -41,48 +43,50 @@ # IsaacLab Arena imports from isaaclab_arena.environments.isaaclab_arena_environment import IsaacLabArenaEnvironment from isaaclab_arena.environments.isaaclab_arena_manager_based_env import IsaacLabArenaManagerBasedRLEnvCfg +from isaaclab_arena.utils.arena_env_reconstruction import reconstruct_embodiment, reconstruct_scene, reconstruct_task from isaaclab_arena.utils.configclass import make_configclass from isaaclab_arena.utils.pose import Pose def _get_config_class_patterns(class_type_str): """Generate potential config class patterns from a class_type string.""" - if ':' not in class_type_str: + if ":" not in class_type_str: return [] - - module_path, class_name = class_type_str.rsplit(':', 1) + + module_path, class_name = class_type_str.rsplit(":", 1) config_patterns = [] - + # Pattern 1: same module + Cfg (e.g., joint_actions:JointPositionActionCfg) - config_patterns.append(f'{module_path}:{class_name}Cfg') - + config_patterns.append(f"{module_path}:{class_name}Cfg") + # Pattern 2: module + _cfg + Cfg (e.g., rigid_object_cfg:RigidObjectCfg) - config_patterns.append(f'{module_path}_cfg:{class_name}Cfg') - + config_patterns.append(f"{module_path}_cfg:{class_name}Cfg") + # Pattern 3: For nested modules like actions.joint_actions, try parent.parent_cfg # e.g., isaaclab.envs.mdp.actions.joint_actions -> isaaclab.envs.mdp.actions.actions_cfg - if '.' in module_path: - parent_parts = module_path.rsplit('.', 1) + if "." in module_path: + parent_parts = module_path.rsplit(".", 1) if len(parent_parts) == 2: parent_module, last_module = parent_parts - parent_name = parent_module.split('.')[-1] - config_patterns.append(f'{parent_module}.{parent_name}_cfg:{class_name}Cfg') - + parent_name = parent_module.split(".")[-1] + config_patterns.append(f"{parent_module}.{parent_name}_cfg:{class_name}Cfg") + # Pattern 4: For actuators specifically, try replacing last module with 'actuator_cfg' # e.g., isaaclab.actuators.actuator_pd:ImplicitActuator -> isaaclab.actuators.actuator_cfg:ImplicitActuatorCfg - if last_module.startswith('actuator'): - config_patterns.append(f'{parent_module}.actuator_cfg:{class_name}Cfg') - + if last_module.startswith("actuator"): + config_patterns.append(f"{parent_module}.actuator_cfg:{class_name}Cfg") + return config_patterns + def register_yaml_constructors(): """Register custom YAML constructors for numpy types and Python builtins.""" - + def slice_constructor(loader, node): """Construct Python slice objects from YAML.""" args = loader.construct_sequence(node, deep=True) return builtins.slice(*args) - + def numpy_scalar_constructor(loader, node): """Construct numpy scalar objects from YAML binary data.""" dtype_node, data_node = node.value @@ -90,65 +94,57 @@ def numpy_scalar_constructor(loader, node): data = loader.construct_object(data_node, deep=True) scalar = np.frombuffer(data, dtype=dtype, count=1)[0] return scalar - + def numpy_dtype_constructor(loader, node): """Construct numpy dtype objects from YAML.""" if isinstance(node, yaml.MappingNode): mapping = loader.construct_mapping(node, deep=True) - if 'args' in mapping: - return np.dtype(*mapping['args']) - raise yaml.YAMLError(f'Could not reconstruct numpy.dtype from node: {node}') - + if "args" in mapping: + return np.dtype(*mapping["args"]) + raise yaml.YAMLError(f"Could not reconstruct numpy.dtype from node: {node}") + # Register constructors yaml.add_constructor( - 'tag:yaml.org,2002:python/object/apply:builtins.slice', - slice_constructor, - Loader=yaml.FullLoader + "tag:yaml.org,2002:python/object/apply:builtins.slice", slice_constructor, Loader=yaml.FullLoader ) yaml.add_constructor( - 'tag:yaml.org,2002:python/object/apply:numpy.dtype', - numpy_dtype_constructor, - Loader=yaml.FullLoader + "tag:yaml.org,2002:python/object/apply:numpy.dtype", numpy_dtype_constructor, Loader=yaml.FullLoader ) yaml.add_constructor( - 'tag:yaml.org,2002:python/object/apply:numpy.core.multiarray.scalar', + "tag:yaml.org,2002:python/object/apply:numpy.core.multiarray.scalar", numpy_scalar_constructor, - Loader=yaml.FullLoader + Loader=yaml.FullLoader, ) def _discover_metric_classes(): """Discover all MetricBase subclasses in the metrics module. - + Returns: Dict mapping metric class names to their classes """ - import importlib - import inspect - import pkgutil - from pathlib import Path - + try: import isaaclab_arena.metrics as metrics_module from isaaclab_arena.metrics.metric_base import MetricBase except ImportError as e: print(f"[WARNING] Could not import metrics module: {e}") return {} - + metric_classes = {} - + # Get the metrics module path metrics_path = Path(metrics_module.__file__).parent - + # Iterate through all .py files in the metrics directory for module_info in pkgutil.iter_modules([str(metrics_path)]): - if module_info.name.startswith('_') or module_info.name == 'metric_base': + if module_info.name.startswith("_") or module_info.name == "metric_base": continue - + try: # Import the module - module = importlib.import_module(f'isaaclab_arena.metrics.{module_info.name}') - + module = importlib.import_module(f"isaaclab_arena.metrics.{module_info.name}") + # Find all MetricBase subclasses in the module for name, obj in inspect.getmembers(module, inspect.isclass): if issubclass(obj, MetricBase) and obj is not MetricBase: @@ -157,136 +153,176 @@ def _discover_metric_classes(): except Exception as e: print(f"[WARNING] Could not import metric module {module_info.name}: {e}") import traceback + traceback.print_exc() - + return metric_classes def _reconstruct_asset_from_dict(asset_dict): """Reconstruct an Asset object from a dictionary. - + Args: asset_dict: Dictionary containing asset data - + Returns: Asset instance """ from isaaclab_arena.assets.asset import Asset - - return Asset( - name=asset_dict.get('_name', asset_dict.get('name')), - tags=asset_dict.get('tags') - ) + + return Asset(name=asset_dict.get("_name", asset_dict.get("name")), tags=asset_dict.get("tags")) def _reconstruct_metrics_from_yaml(metrics_list): """Reconstruct metric objects from YAML data by auto-discovering metric classes. - + This function automatically discovers all MetricBase subclasses in the metrics folder and attempts to match YAML data to their __init__ signatures. - + Args: metrics_list: List of metric dictionaries from YAML - + Returns: List of MetricBase objects """ - import inspect - + if not metrics_list: return [] - + # Discover all available metric classes metric_classes = _discover_metric_classes() - + if not metric_classes: print("[WARNING] No metric classes discovered") return [] - + print(f"[INFO] Discovered {len(metric_classes)} metric classes: {list(metric_classes.keys())}") - + reconstructed_metrics = [] - + for idx, metric_data in enumerate(metrics_list): metric_created = False print(f"[DEBUG] Processing metric {idx}: {metric_data}") - + # Empty dict - try metrics with no required parameters if not metric_data or metric_data == {}: - print(f"[DEBUG] Empty dict detected, trying to match to no-param metrics") + # note (xinjieyao, 2025-12-08): empty init params are allowed for metrics, e.g. SuccessRateMetric. for metric_name, metric_class in metric_classes.items(): try: # Try to instantiate with no arguments metric_instance = metric_class() reconstructed_metrics.append(metric_instance) metric_created = True - print(f"[INFO] Matched empty dict to {metric_name}") break - except TypeError as e: - # This metric requires parameters, try next one - print(f"[DEBUG] {metric_name} requires parameters: {e}") - continue except Exception as e: - # Other error, skip this metric - print(f"[DEBUG] Failed to instantiate {metric_name}: {type(e).__name__}: {e}") + print(f"[DEBUG] Failed to instantiate {metric_name}: {type(e).__name__}: {e}, skipping") continue else: # Try to match metric data to metric class signatures for metric_name, metric_class in metric_classes.items(): try: sig = inspect.signature(metric_class.__init__) - params = {name: p for name, p in sig.parameters.items() if name != 'self'} - + params = {name: p for name, p in sig.parameters.items() if name != "self"} + # Prepare arguments for the metric class args = {} - + # Handle 'object' parameter specially - reconstruct Asset - if 'object' in params and 'object' in metric_data: - args['object'] = _reconstruct_asset_from_dict(metric_data['object']) - + if "object" in params and "object" in metric_data: + args["object"] = _reconstruct_asset_from_dict(metric_data["object"]) + # Handle other parameters by direct mapping for param_name, param in params.items(): - if param_name == 'object': + if param_name == "object": continue # Already handled - + # Try to find matching key in metric_data + # TODO(xinjieyao, 2025-12-08): check other metrics for required parameters. if param_name in metric_data: args[param_name] = metric_data[param_name] elif param.default == inspect.Parameter.empty: # Required parameter not found, can't use this class - raise ValueError(f"Required parameter {param_name} not found") - + print(f"[DEBUG] Required parameter {param_name} not found for {metric_name}, skipping") + continue + # Try to instantiate the metric metric_instance = metric_class(**args) reconstructed_metrics.append(metric_instance) metric_created = True - print(f"[INFO] Matched metric data to {metric_name}") break - + except Exception as e: # This metric class doesn't match, try next one print(f"[DEBUG] Failed to match {metric_name}: {e}") continue - + if not metric_created: print(f"[WARNING] Could not match metric data to any known metric class: {metric_data}") - + return reconstructed_metrics +def _reconstruct_isaaclab_arena_env(arena_env_dict, cfg): + """Reconstruct isaaclab_arena_env using already-deserialized top-level configs. + + This function constructs proper typed objects for embodiment, scene, and task + from the YAML dictionary data, using the already-deserialized top-level configs. + + Args: + arena_env_dict: The YAML dictionary for isaaclab_arena_env (for metadata like 'name') + cfg: The main IsaacLabArenaManagerBasedRLEnvCfg with deserialized sections + + Returns: + IsaacLabArenaEnvironment instance or None if arena_env_dict is empty + """ + if not arena_env_dict: + return None + + arena_env = IsaacLabArenaEnvironment() + + # Populate name from YAML + if "name" in arena_env_dict: + arena_env.name = arena_env_dict["name"] + + # Reconstruct embodiment + if "embodiment" in arena_env_dict: + arena_env.embodiment = reconstruct_embodiment(arena_env_dict["embodiment"], cfg) + + # Reconstruct scene + if "scene" in arena_env_dict: + arena_env.scene = reconstruct_scene(arena_env_dict["scene"], cfg) + + # Reconstruct task + if "task" in arena_env_dict: + arena_env.task = reconstruct_task(arena_env_dict["task"], cfg) + + # Handle teleop_device and orchestrator - keep as dicts for now since they're optional + # TODO(xinjieyao, 2025-12-08): test on teleop device + if "teleop_device" in arena_env_dict: + arena_env.teleop_device = arena_env_dict["teleop_device"] + + if "orchestrator" in arena_env_dict: + arena_env.orchestrator = arena_env_dict["orchestrator"] + + if "env_cfg_callback" in arena_env_dict: + arena_env.env_cfg_callback = arena_env_dict["env_cfg_callback"] + + return arena_env + + def load_env_cfg_from_yaml(yaml_path: str): """Load an IsaacLab Arena environment config from a YAML file. - + This function deserializes a config saved using dump_yaml(), handling complex nested structures, numpy types, and configclass objects. - + Metrics are automatically discovered from the isaaclab_arena/metrics folder and reconstructed by matching YAML data to their __init__ signatures. This supports custom user-defined metrics - just add them to the metrics folder. - + Args: yaml_path: Path to the YAML config file - + Returns: IsaacLabArenaManagerBasedRLEnvCfg: The deserialized environment configuration with metrics auto-discovered and reconstructed @@ -295,115 +331,115 @@ def load_env_cfg_from_yaml(yaml_path: str): register_yaml_constructors() # Load YAML - with open(yaml_path, encoding='utf-8') as f: + with open(yaml_path, encoding="utf-8") as f: cfg_dict = yaml.load(f, Loader=yaml.FullLoader) - + # Create base config cfg = IsaacLabArenaManagerBasedRLEnvCfg() - + # Extract and handle sections with dynamic configs for section_name, create_func in [ - ('scene', _create_scene_config), - ('recorders', _create_recorders_config), - ('actions', _create_actions_config), - ('events', _create_events_config), - ('observations', _create_observations_config), + ("scene", _create_scene_config), + ("recorders", _create_recorders_config), + ("actions", _create_actions_config), + ("events", _create_events_config), + ("observations", _create_observations_config), ]: section_dict = cfg_dict.pop(section_name, None) if section_dict is not None: setattr(cfg, section_name, create_func(section_dict)) - + # Default observations if not present - if not hasattr(cfg, 'observations') or cfg.observations is None: + if not hasattr(cfg, "observations") or cfg.observations is None: cfg.observations = ObservationGroupCfg() - + # Handle terminations - terminations_dict = cfg_dict.pop('terminations', None) + terminations_dict = cfg_dict.pop("terminations", None) if terminations_dict is not None: _create_terminations_config(cfg, terminations_dict) - + # Handle rewards, curriculum, commands - create dynamic configs for config_name, term_cfg_class in [ - ('rewards', RewardTermCfg), - ('curriculum', CurriculumTermCfg), - ('commands', CommandTermCfg) + ("rewards", RewardTermCfg), + ("curriculum", CurriculumTermCfg), + ("commands", CommandTermCfg), ]: config_dict = cfg_dict.pop(config_name, None) if config_dict is not None: if config_dict: # Non-empty dict result_cfg = _create_dynamic_manager_config( - config_dict, None, term_cfg_class, f'{config_name.capitalize()}Cfg' + config_dict, None, term_cfg_class, f"{config_name.capitalize()}Cfg" ) else: # Empty dict - EmptyCfg = make_configclass(f'{config_name.capitalize()}Cfg', []) + EmptyCfg = make_configclass(f"{config_name.capitalize()}Cfg", []) result_cfg = EmptyCfg() setattr(cfg, config_name, result_cfg) - + # Handle metrics - reconstruct metric objects from YAML data - metrics_list = cfg_dict.pop('metrics', None) + metrics_list = cfg_dict.pop("metrics", None) if metrics_list is not None: cfg.metrics = _reconstruct_metrics_from_yaml(metrics_list) else: cfg.metrics = [] - - # Handle nested configs (isaaclab_arena_env, XR) - for section_name, config_class in [ - ('isaaclab_arena_env', IsaacLabArenaEnvironment), - ('xr', XrCfg), - ]: - section_dict = cfg_dict.pop(section_name, None) - if section_dict is not None: - section_cfg = config_class() - _populate_from_dict(section_cfg, section_dict) - setattr(cfg, section_name, section_cfg) - - # Use from_dict for all remaining fields - cfg.from_dict(cfg_dict) - + + # Handle XR config + xr_dict = cfg_dict.pop("xr", None) + if xr_dict is not None: + xr_cfg = XrCfg() + _populate_from_dict(xr_cfg, xr_dict) + cfg.xr = xr_cfg + + # Reconstruct isaaclab_arena_env using already-deserialized top-level configs + arena_env_dict = cfg_dict.pop("isaaclab_arena_env", None) + if arena_env_dict is not None: + cfg.isaaclab_arena_env = _reconstruct_isaaclab_arena_env(arena_env_dict, cfg) + return cfg def _convert_markers_to_spawner_configs(markers_dict): """Convert markers dictionary to proper spawner config objects (SphereCfg, etc.). - + Markers are spawner configs identified by their func field, not class_type. - + Args: markers_dict: Dictionary where each value is a marker dict with 'func' field - + Returns: Dictionary with each marker converted to its spawner config object """ result_cfg = {} for marker_name, marker_dict in markers_dict.items(): - if isinstance(marker_dict, dict) and 'func' in marker_dict: + if isinstance(marker_dict, dict) and "func" in marker_dict: # Recursively convert nested configs first marker_dict_converted = _convert_funcs_in_dict(marker_dict) - + # Convert visual_material dict to PreviewSurfaceCfg if present - if 'visual_material' in marker_dict_converted and isinstance(marker_dict_converted['visual_material'], dict): + if "visual_material" in marker_dict_converted and isinstance( + marker_dict_converted["visual_material"], dict + ): visual_mat = PreviewSurfaceCfg() - _populate_from_dict(visual_mat, marker_dict_converted['visual_material']) - marker_dict_converted['visual_material'] = visual_mat - + _populate_from_dict(visual_mat, marker_dict_converted["visual_material"]) + marker_dict_converted["visual_material"] = visual_mat + # Determine spawner config type from func - func = marker_dict_converted.get('func') + func = marker_dict_converted.get("func") marker_cfg = None - + if func: func_name = func.__name__ if callable(func) else str(func) # Map func name to config class - if 'sphere' in func_name.lower(): + if "sphere" in func_name.lower(): marker_cfg = SphereCfg() - elif 'cone' in func_name.lower(): + elif "cone" in func_name.lower(): marker_cfg = ConeCfg() - elif 'cylinder' in func_name.lower(): + elif "cylinder" in func_name.lower(): marker_cfg = CylinderCfg() - elif 'cuboid' in func_name.lower() or 'cube' in func_name.lower(): + elif "cuboid" in func_name.lower() or "cube" in func_name.lower(): marker_cfg = CuboidCfg() - elif 'capsule' in func_name.lower(): + elif "capsule" in func_name.lower(): marker_cfg = CapsuleCfg() - + if marker_cfg: _populate_from_dict(marker_cfg, marker_dict_converted, skip_conversion=True) result_cfg[marker_name] = marker_cfg @@ -414,38 +450,38 @@ def _convert_markers_to_spawner_configs(markers_dict): result_cfg[marker_name] = marker_dict_converted else: result_cfg[marker_name] = marker_dict - + return result_cfg -def _convert_class_type_dict_to_config(items_dict, context_name=''): +def _convert_class_type_dict_to_config(items_dict, context_name=""): """Convert a dict of items with class_type to a dict of config objects. - + Args: items_dict: Dictionary where each value is a dict with 'class_type' field context_name: Context name for error messages (e.g. 'Actuator', 'Marker') - + Returns: Dictionary with each value converted to its config object """ result_cfg = {} for item_name, item_dict in items_dict.items(): - if isinstance(item_dict, dict) and 'class_type' in item_dict: + if isinstance(item_dict, dict) and "class_type" in item_dict: # Recursively convert this item dict first (but NOT actuators, to avoid infinite recursion) item_dict_converted = item_dict.copy() for k, v in item_dict.items(): - if k == 'class_type' and isinstance(v, str): + if k == "class_type" and isinstance(v, str): try: item_dict_converted[k] = string_to_callable(v) except (ImportError, AttributeError, ValueError): item_dict_converted[k] = v - elif k != 'actuators' and isinstance(v, (dict, list)): + elif k != "actuators" and isinstance(v, (dict, list)): item_dict_converted[k] = _convert_funcs_in_dict(v) else: item_dict_converted[k] = v - - class_type = item_dict_converted['class_type'] - + + class_type = item_dict_converted["class_type"] + # Try to get the Cfg class directly by appending 'Cfg' to class name item_cfg = None if not isinstance(class_type, str): @@ -453,8 +489,8 @@ def _convert_class_type_dict_to_config(items_dict, context_name=''): # Try multiple approaches to get the config class class_module = class_type.__module__ class_name = class_type.__name__ - cfg_class_name = class_name + 'Cfg' - + cfg_class_name = class_name + "Cfg" + # Approach 1: Try direct string_to_callable try: cfg_class = string_to_callable(f"{class_module}:{cfg_class_name}") @@ -462,6 +498,10 @@ def _convert_class_type_dict_to_config(items_dict, context_name=''): _populate_from_dict(item_cfg, item_dict_converted, skip_conversion=True) result_cfg[item_name] = item_cfg except (ImportError, AttributeError, ValueError) as e: + print( + f"[DEBUG] Failed to instantiate {item_name} with {class_module}:{cfg_class_name}:" + f" {type(e).__name__}: {e}, skipping" + ) # Approach 2: Try patterns config_patterns = _get_config_class_patterns(f"{class_module}:{class_name}") for config_class_str in config_patterns: @@ -472,6 +512,10 @@ def _convert_class_type_dict_to_config(items_dict, context_name=''): result_cfg[item_name] = item_cfg break except (ImportError, AttributeError, ValueError) as e2: + print( + f"[DEBUG] Failed to instantiate {item_name} with {config_class_str}:" + f" {type(e2).__name__}: {e2}, skipping" + ) continue else: # class_type is still a string @@ -484,15 +528,19 @@ def _convert_class_type_dict_to_config(items_dict, context_name=''): result_cfg[item_name] = item_cfg break except (ImportError, AttributeError, ValueError) as e: + print( + f"[DEBUG] Failed to instantiate {item_name} with {config_class_str}:" + f" {type(e).__name__}: {e}, skipping" + ) continue - + if item_cfg is None: # If conversion failed, keep as converted dict result_cfg[item_name] = item_dict_converted else: # Not a dict with class_type, just convert funcs result_cfg[item_name] = _convert_funcs_in_dict(item_dict) if isinstance(item_dict, dict) else item_dict - + return result_cfg @@ -501,17 +549,17 @@ def _convert_funcs_in_dict(data): if isinstance(data, dict): result = {} for key, value in data.items(): - if (key == 'func' or key == 'class_type' or key.endswith('_class_type')) and isinstance(value, str): + if (key == "func" or key == "class_type" or key.endswith("_class_type")) and isinstance(value, str): try: result[key] = string_to_callable(value) except (ImportError, AttributeError, ValueError) as e: # If conversion fails, keep as string print(f"Warning: Could not convert {key}='{value}' to callable: {e}") result[key] = value - elif key == 'actuators' and isinstance(value, dict): + elif key == "actuators" and isinstance(value, dict): # Convert actuators dict to proper config objects - result[key] = _convert_class_type_dict_to_config(value, 'Actuator') - elif key == 'markers' and isinstance(value, dict): + result[key] = _convert_class_type_dict_to_config(value, "Actuator") + elif key == "markers" and isinstance(value, dict): # Convert markers dict (used in visualizer_cfg) to proper spawner config objects # Markers don't have class_type, they're spawner configs determined by func result[key] = _convert_markers_to_spawner_configs(value) @@ -528,7 +576,7 @@ def _convert_funcs_in_dict(data): def _populate_from_dict(obj, data_dict, skip_conversion=False): """Recursively populate an object from a dictionary for nested config structures. - + Args: obj: The object to populate data_dict: The dictionary with values @@ -536,27 +584,27 @@ def _populate_from_dict(obj, data_dict, skip_conversion=False): """ if not isinstance(data_dict, dict): return - + # Convert all func strings to callables first (recursively) # This also handles actuators conversion if not skip_conversion: data_dict = _convert_funcs_in_dict(data_dict) - + # Replace all MISSING fields with None first _replace_all_missing(obj) - + for key, value in data_dict.items(): if not hasattr(obj, key): # Dynamically add the attribute - just set the value setattr(obj, key, value) else: existing_attr = getattr(obj, key) - is_missing = type(existing_attr).__name__ == '_MISSING_TYPE' - + is_missing = type(existing_attr).__name__ == "_MISSING_TYPE" + if is_missing or existing_attr is None: # Just set the value directly object.__setattr__(obj, key, value) - elif isinstance(value, dict) and hasattr(existing_attr, '__dict__') and not isinstance(existing_attr, dict): + elif isinstance(value, dict) and hasattr(existing_attr, "__dict__") and not isinstance(existing_attr, dict): # Recurse into nested objects (only if it's a proper object, not a dict) _populate_from_dict(existing_attr, value) else: @@ -566,40 +614,42 @@ def _populate_from_dict(obj, data_dict, skip_conversion=False): def _replace_all_missing(obj): """Replace all MISSING fields in a dataclass with None.""" - if hasattr(obj, '__dataclass_fields__'): + if hasattr(obj, "__dataclass_fields__"): for field_name in obj.__dataclass_fields__: field_value = getattr(obj, field_name) - if type(field_value).__name__ == '_MISSING_TYPE': + if type(field_value).__name__ == "_MISSING_TYPE": object.__setattr__(obj, field_name, None) def _create_dynamic_manager_config(cfg_dict, base_fields, term_cfg_class, config_name): """Create a dynamic manager config (terminations, rewards, etc.). - + Args: cfg_dict: Dictionary of term configurations base_fields: Base fields from the parent class (if any) term_cfg_class: The term config class (e.g., TerminationTermCfg, RewardTermCfg) config_name: Name for the dynamic config class - + Returns: Dynamic configclass instance """ if not cfg_dict: return None - + # Create term instances term_instances = {} for key, term_dict in cfg_dict.items(): # Convert funcs and scene entities in params + # e.g. Event with pose in params is converted to Pose objects (params: pose: position_xyz and rotation_wxyz) + # e.g. Termination with object_cfg in params is converted to SceneEntityCfg term_dict = _convert_funcs_in_dict(term_dict) - if 'params' in term_dict and isinstance(term_dict['params'], dict): - term_dict['params'] = _convert_scene_entity_dicts(term_dict['params']) - + if "params" in term_dict and isinstance(term_dict["params"], dict): + term_dict["params"] = _convert_term_params_to_objects(term_dict["params"]) + term_cfg = term_cfg_class() _populate_from_dict(term_cfg, term_dict, skip_conversion=True) term_instances[key] = term_cfg - + # Create dynamic config class term_fields = [(key, term_cfg_class, inst) for key, inst in term_instances.items()] DynamicCfg = make_configclass(config_name, term_fields, bases=base_fields if base_fields else ()) @@ -608,127 +658,157 @@ def _create_dynamic_manager_config(cfg_dict, base_fields, term_cfg_class, config def _convert_spawn_to_config(asset_dict): """Convert spawn dictionary to proper spawn config object (UsdFileCfg, etc.). - + Also converts nested config objects within spawn (rigid_props, articulation_props, etc.) - + Args: asset_dict: Asset dictionary that may contain a 'spawn' dict - + Returns: Asset dictionary with spawn dict converted to proper config object """ - if not isinstance(asset_dict, dict) or 'spawn' not in asset_dict: + if not isinstance(asset_dict, dict) or "spawn" not in asset_dict: return asset_dict - - spawn_dict = asset_dict.get('spawn') + + spawn_dict = asset_dict.get("spawn") if spawn_dict is None or not isinstance(spawn_dict, dict): return asset_dict - + # Convert nested config dicts to proper config objects spawn_dict_copy = spawn_dict.copy() - + # Handle rigid_props - if 'rigid_props' in spawn_dict_copy and isinstance(spawn_dict_copy['rigid_props'], dict): + if "rigid_props" in spawn_dict_copy and isinstance(spawn_dict_copy["rigid_props"], dict): rigid_cfg = RigidBodyPropertiesCfg() - _populate_from_dict(rigid_cfg, spawn_dict_copy['rigid_props']) - spawn_dict_copy['rigid_props'] = rigid_cfg - + _populate_from_dict(rigid_cfg, spawn_dict_copy["rigid_props"]) + spawn_dict_copy["rigid_props"] = rigid_cfg + # Handle collision_props - if 'collision_props' in spawn_dict_copy and isinstance(spawn_dict_copy['collision_props'], dict): + if "collision_props" in spawn_dict_copy and isinstance(spawn_dict_copy["collision_props"], dict): collision_cfg = CollisionPropertiesCfg() - _populate_from_dict(collision_cfg, spawn_dict_copy['collision_props']) - spawn_dict_copy['collision_props'] = collision_cfg - + _populate_from_dict(collision_cfg, spawn_dict_copy["collision_props"]) + spawn_dict_copy["collision_props"] = collision_cfg + # Handle mass_props - if 'mass_props' in spawn_dict_copy and isinstance(spawn_dict_copy['mass_props'], dict): + if "mass_props" in spawn_dict_copy and isinstance(spawn_dict_copy["mass_props"], dict): mass_cfg = MassPropertiesCfg() - _populate_from_dict(mass_cfg, spawn_dict_copy['mass_props']) - spawn_dict_copy['mass_props'] = mass_cfg - + _populate_from_dict(mass_cfg, spawn_dict_copy["mass_props"]) + spawn_dict_copy["mass_props"] = mass_cfg + # Handle articulation_props - if 'articulation_props' in spawn_dict_copy and isinstance(spawn_dict_copy['articulation_props'], dict): + if "articulation_props" in spawn_dict_copy and isinstance(spawn_dict_copy["articulation_props"], dict): articulation_cfg = ArticulationRootPropertiesCfg() - _populate_from_dict(articulation_cfg, spawn_dict_copy['articulation_props']) - spawn_dict_copy['articulation_props'] = articulation_cfg - + _populate_from_dict(articulation_cfg, spawn_dict_copy["articulation_props"]) + spawn_dict_copy["articulation_props"] = articulation_cfg + # Handle visual_material - if 'visual_material' in spawn_dict_copy and isinstance(spawn_dict_copy['visual_material'], dict): + if "visual_material" in spawn_dict_copy and isinstance(spawn_dict_copy["visual_material"], dict): visual_mat_cfg = VisualMaterialCfg() - _populate_from_dict(visual_mat_cfg, spawn_dict_copy['visual_material']) - spawn_dict_copy['visual_material'] = visual_mat_cfg - + _populate_from_dict(visual_mat_cfg, spawn_dict_copy["visual_material"]) + spawn_dict_copy["visual_material"] = visual_mat_cfg + + # Check if this is a camera spawn config (has spawn_camera func) + func_str = spawn_dict_copy.get("func", "") + is_camera_spawn = "spawn_camera" in str(func_str) + + # Remove fields that are invalid for camera spawn configs + if is_camera_spawn: + INVALID_CAMERA_SPAWN_FIELDS = { + "activate_contact_sensors", + "activateContactSensors", + "rigid_props", + "articulation_props", + "collision_props", + } + for field in INVALID_CAMERA_SPAWN_FIELDS: + spawn_dict_copy.pop(field, None) + # Create the UsdFileCfg with converted nested configs spawn_cfg = UsdFileCfg() _populate_from_dict(spawn_cfg, spawn_dict_copy) - + # Create a copy of asset_dict with the converted spawn result = asset_dict.copy() - result['spawn'] = spawn_cfg + result["spawn"] = spawn_cfg return result -def _convert_scene_entity_dicts(params_dict): - """Convert dictionaries that match Pose or SceneEntityCfg patterns to actual objects. - - Handles conversion of: +def _convert_term_params_to_objects(params_dict): + """Convert term parameter dictionaries to typed objects (Pose, SceneEntityCfg). + + Event and termination functions expect typed parameters, not plain dicts. + This function pattern-matches parameter dictionaries and converts them to proper types: - Pose objects (dicts with 'position_xyz' and 'rotation_wxyz') - SceneEntityCfg objects (dicts with 'name' and 'joint_names') - + + Note: + If the parameter is not a Pose or SceneEntityCfg, it is returned as is. Args: params_dict: Dictionary of parameters that may contain Pose or SceneEntityCfg dicts - + Returns: Dictionary with Pose/SceneEntityCfg dicts converted to proper objects """ result = {} for key, value in params_dict.items(): # Check if this dict looks like a Pose (has position_xyz and rotation_wxyz) - if isinstance(value, dict) and 'position_xyz' in value and 'rotation_wxyz' in value: + if isinstance(value, dict) and "position_xyz" in value and "rotation_wxyz" in value: # Convert to Pose result[key] = Pose(**value) # Check if this dict looks like a SceneEntityCfg (has 'name' and other typical fields) - elif isinstance(value, dict) and 'name' in value and 'joint_names' in value: + elif isinstance(value, dict) and "name" in value and "joint_names" in value: # Convert to SceneEntityCfg result[key] = SceneEntityCfg(**value) else: + # e.g. isaaclab_observation_config: params: link_name: "right_wrist_yaw_link" + # This is not a Pose or SceneEntityCfg, so we just return the value as is result[key] = value - + return result -def _create_config_from_class_type(item_name, item_dict, context='item'): +def _create_config_from_class_type(item_name, item_dict, context="item"): """Create a config object from a dict with class_type field. - + + E.g.for actuator: + isaaclab.actuators.actuator_pd:ImplicitActuator -> isaaclab.actuators.actuator_cfg:ImplicitActuatorCfg + E.g.for asset: + isaaclab.assets.rigid_object.rigid_object:RigidObject -> isaaclab.assets.rigid_object_cfg:RigidObjectCfg + E.g.for metric in recorder manager config: + isaaclab_arena.metrics.success_rate:SuccessRecorder -> isaaclab_arena.metrics.success_rate_cfg:SuccessRecorderCfg + Args: item_name: Name of the item item_dict: Dictionary containing class_type and other fields context: Context for error messages (e.g., 'asset', 'recorder term') - + Returns: Configured object matching the class_type """ - class_type = item_dict.get('class_type') + class_type = item_dict.get("class_type") if class_type is None: raise ValueError(f"{context} '{item_name}' has no class_type") - + # Get class_type string if not isinstance(class_type, str): class_type_str = f"{class_type.__module__}:{class_type.__name__}" else: class_type_str = class_type - + # Try to find and instantiate the config class config_patterns = _get_config_class_patterns(class_type_str) - + for config_class_str in config_patterns: try: + # class type conversion config_class = string_to_callable(config_class_str) config_obj = config_class() + # populate data fields _populate_from_dict(config_obj, item_dict, skip_conversion=True) return config_obj except (ImportError, AttributeError, ValueError): continue - + # All patterns failed raise ValueError( f"Failed to find config class for {context} '{item_name}' with class_type '{class_type_str}'. " @@ -736,7 +816,9 @@ def _create_config_from_class_type(item_name, item_dict, context='item'): ) -def _create_asset_config(asset_name: str, asset_dict: dict[str, Any]) -> AssetBaseCfg | RigidObjectCfg | ArticulationCfg: +def _create_asset_config( + asset_name: str, asset_dict: dict[str, Any] +) -> AssetBaseCfg | RigidObjectCfg | ArticulationCfg: """Create a single asset config object from dictionary. If no class_type, it's a AssetBaseCfg. If class_type is provided, it could be RigidObjectCfg, ArticulationCfg, AssetBaseCfg. Assets all come with @@ -756,14 +838,14 @@ def _create_asset_config(asset_name: str, asset_dict: dict[str, Any]) -> AssetBa asset_dict = _convert_spawn_to_config(asset_dict) # If no class_type, it's a BASE asset - if asset_dict.get('class_type') is None: + if asset_dict.get("class_type") is None: asset_cfg = AssetBaseCfg() # string callables have been converted to callables by now, only poulating the items _populate_from_dict(asset_cfg, asset_dict, skip_conversion=True) return asset_cfg # Otherwise, use class_type to create the appropriate config - return _create_config_from_class_type(asset_name, asset_dict, 'asset') + return _create_config_from_class_type(asset_name, asset_dict, "asset") def _create_scene_config(scene_dict: dict[str, Any]): @@ -801,7 +883,7 @@ def _create_scene_config(scene_dict: dict[str, Any]): # Create dynamic SceneCfg if asset_instances: asset_fields = [(name, type(inst), inst) for name, inst in asset_instances.items()] - SceneCfg = make_configclass('SceneCfg', asset_fields, bases=(InteractiveSceneCfg,)) + SceneCfg = make_configclass("SceneCfg", asset_fields, bases=(InteractiveSceneCfg,)) scene_cfg = SceneCfg() # Populate base fields @@ -835,16 +917,16 @@ def _create_recorders_config(recorders_dict: dict[str, Any]): recorder_term_instances = {} # recorder manager config contains metrics listed in the recorder manager config if any for term_name, term_dict in dynamic_terms_dict.items(): - if isinstance(term_dict, dict) and 'class_type' in term_dict and term_dict['class_type'] is not None: + if isinstance(term_dict, dict) and "class_type" in term_dict and term_dict["class_type"] is not None: term_dict = _convert_funcs_in_dict(term_dict) - recorder_term_instances[term_name] = _create_config_from_class_type(term_name, term_dict, 'recorder term') + recorder_term_instances[term_name] = _create_config_from_class_type(term_name, term_dict, "recorder term") else: raise ValueError(f"Recorder term '{term_name}' is not a dictionary with a 'class_type' field") # Create dynamic RecorderManagerCfg if recorder_term_instances: recorder_fields = [(name, type(inst), inst) for name, inst in recorder_term_instances.items()] - RecorderManagerCfg = make_configclass('RecorderManagerCfg', recorder_fields, bases=(RecorderManagerBaseCfg,)) + RecorderManagerCfg = make_configclass("RecorderManagerCfg", recorder_fields, bases=(RecorderManagerBaseCfg,)) recorder_cfg = RecorderManagerCfg() # Populate base fields @@ -856,102 +938,116 @@ def _create_recorders_config(recorders_dict: dict[str, Any]): def _create_actions_config(actions_dict: dict[str, Any]): """Create actions config with dynamic terms. - + Args: actions_dict: Dictionary containing action terms - + Returns: Dynamic ActionsCfg with all terms properly configured """ + actions_cfg = None if not actions_dict: - return None - + return actions_cfg + # Create action term configs using class_type action_terms = {} for term_name, term_dict in actions_dict.items(): - if isinstance(term_dict, dict) and 'class_type' in term_dict and term_dict['class_type'] is not None: + if isinstance(term_dict, dict) and "class_type" in term_dict and term_dict["class_type"] is not None: term_dict = _convert_funcs_in_dict(term_dict) - action_terms[term_name] = _create_config_from_class_type(term_name, term_dict, 'action term') + action_terms[term_name] = _create_config_from_class_type(term_name, term_dict, "action term") else: - action_terms[term_name] = term_dict - + raise ValueError(f"Action term '{term_name}' is not a dictionary with a 'class_type' field") + # Create dynamic ActionsCfg if action_terms: action_fields = [(name, type(term), term) for name, term in action_terms.items()] - return make_configclass('ActionsCfg', action_fields)() - - return None + ActionsCfg = make_configclass("ActionsCfg", action_fields) + actions_cfg = ActionsCfg() + + return actions_cfg -def _create_events_config(events_dict): - """Create events config with dynamic terms.""" +def _create_events_config(events_dict: dict[str, Any]): + """Create events config with dynamic terms. + + Args: + events_dict: Dictionary containing event terms + + Returns: + Dynamic EventsCfg with all terms properly configured + """ if not events_dict: return None - - return _create_dynamic_manager_config( - events_dict, - None, - EventTermCfg, - 'EventsCfg' - ) + return _create_dynamic_manager_config(events_dict, None, EventTermCfg, "EventsCfg") + + +def _create_observations_config(observations_dict: dict[str, Any]): + """Create observations config with proper structure. + + Args: + observations_dict: Dictionary containing observation groups + + Returns: + Dynamic ObservationCfg with all groups properly configured -def _create_observations_config(observations_dict): - """Create observations config with proper structure.""" + If no observation groups are listed, a default ObservationGroupCfg is returned. + If observation groups are listed, an ObservationCfg is returned with those observation groups added to the observation config. + """ if not observations_dict: return ObservationGroupCfg() - + # Get base fields of ObservationGroupCfg base_group_field_names = {f.name for f in fields(ObservationGroupCfg)} - + # Create observation groups obs_groups = {} for group_name, group_dict in observations_dict.items(): if not isinstance(group_dict, dict): - obs_groups[group_name] = group_dict - continue - + raise ValueError(f"Observation group '{group_name}' is not a dictionary") + # obs_groups[group_name] = group_dict + # continue + # Separate base attributes from observation terms group_base_attrs = {k: v for k, v in group_dict.items() if k in base_group_field_names} obs_terms_dict = {k: v for k, v in group_dict.items() if k not in base_group_field_names} - + # Create observation term configs obs_term_instances = {} for term_name, term_dict in obs_terms_dict.items(): - if isinstance(term_dict, dict) and 'func' in term_dict: + if isinstance(term_dict, dict) and "func" in term_dict: term_dict = _convert_funcs_in_dict(term_dict) - if 'params' in term_dict and isinstance(term_dict['params'], dict): - term_dict['params'] = _convert_scene_entity_dicts(term_dict['params']) + if "params" in term_dict and isinstance(term_dict["params"], dict): + term_dict["params"] = _convert_term_params_to_objects(term_dict["params"]) term_cfg = ObservationTermCfg() _populate_from_dict(term_cfg, term_dict, skip_conversion=True) obs_term_instances[term_name] = term_cfg - + # Create dynamic group config or use base if obs_term_instances: obs_term_fields = [(name, ObservationTermCfg, cfg) for name, cfg in obs_term_instances.items()] - ObsGroupCfg = make_configclass(f'{group_name.capitalize()}ObsGroupCfg', obs_term_fields, bases=(ObservationGroupCfg,)) + ObsGroupCfg = make_configclass( + f"{group_name.capitalize()}ObsGroupCfg", obs_term_fields, bases=(ObservationGroupCfg,) + ) obs_group_cfg = ObsGroupCfg() else: obs_group_cfg = ObservationGroupCfg() - + # Populate base attributes if group_base_attrs: _populate_from_dict(obs_group_cfg, group_base_attrs) - + obs_groups[group_name] = obs_group_cfg - + # Create dynamic ObservationCfg with all groups if obs_groups: obs_fields = [(name, type(group), group) for name, group in obs_groups.items()] - ObservationCfg = make_configclass('ObservationCfg', obs_fields) + ObservationCfg = make_configclass("ObservationCfg", obs_fields) return ObservationCfg() - + return ObservationGroupCfg() def _create_terminations_config(cfg, terminations_dict): """Create terminations config.""" - cfg.terminations = _create_dynamic_manager_config( - terminations_dict, None, TerminationTermCfg, 'TerminationCfg' - ) - + cfg.terminations = _create_dynamic_manager_config(terminations_dict, None, TerminationTermCfg, "TerminationCfg")