Source code for stable_worldmodel.envs.ogbench_cube

"""OGBench Cube manipulation environment with multiple task variants.

This module implements a robotic manipulation environment using cubes with various
task configurations ranging from single cube pick-and-place to complex multi-cube
stacking and rearrangement tasks. The environment supports visual variations including
object colors, sizes, lighting, and camera angles for robust policy learning.

The environment is built on top of the ManipSpaceEnv from OGBench and uses MuJoCo
for physics simulation. It provides both pixel-based and state-based observations,
with support for goal-conditioned learning and data collection modes.

Example:
    Basic usage of the cube environment::

        from stable_worldmodel.envs.ogbench_cube import CubeEnv

        # Create a double cube environment with pixel observations
        env = CubeEnv(env_type='double', ob_type='pixels', multiview=True)

        # Reset with variation sampling
        obs, info = env.reset(options={'variation': ['all']})

        # Run an episode
        for _ in range(100):
            action = env.action_space.sample()
            obs, reward, terminated, truncated, info = env.step(action)
            if info['success']:
                break
.. _OGBench:
   https://github.com/seohongpark/ogbench/
"""

import mujoco
import numpy as np
from dm_control import mjcf
from ogbench.manipspace import lie
from ogbench.manipspace.envs.manipspace_env import ManipSpaceEnv

import stable_worldmodel as swm

from .utils import perturb_camera_angle


[docs] class CubeEnv(ManipSpaceEnv): """Robotic manipulation environment with cube objects and multiple task variants. This environment provides a suite of manipulation tasks involving 1-8 colored cubes that must be moved to target positions. It supports various task types including pick-and-place, stacking, swapping, and cyclic rearrangement. The environment includes comprehensive variation spaces for visual domain randomization. The environment operates in two modes: - 'task': Goal-conditioned mode where the robot must achieve specific configurations - 'data_collection': Mode for collecting demonstrations with random targets Attributes: _env_type (str): Type of environment determining number of cubes. One of: 'single', 'double', 'triple', 'quadruple', 'octuple'. _num_cubes (int): Number of cubes in the environment (1, 2, 3, 4, or 8). _permute_blocks (bool): Whether to randomly permute cube order at task init. _multiview (bool): Whether to render from both front and side cameras. _ob_type (str): Observation type, either 'pixels' or state-based. _cube_colors (ndarray): Array of RGB colors for cubes. _target_task (str): Task type identifier, set to 'cube'. _target_block (int): Index of the target cube in data collection mode. variation_space (Dict): Hierarchical space defining variation ranges for: - cube: color (RGB), size (half-extents) - agent: color (RGB) - floor: color (two RGB values for checkerboard) - camera: angle_delta (yaw/pitch offsets) - light: intensity (diffuse lighting strength) task_infos (list): List of dictionaries defining task configurations, each containing 'task_name', 'init_xyzs', and 'goal_xyzs'. cameras (dict): Dictionary of camera configurations with position and orientation. _cube_geoms_list (list): MuJoCo geom objects for each cube. _cube_target_geoms_list (list): MuJoCo geom objects for target visualizations. _cube_geom_ids_list (list): MuJoCo geom IDs for each cube. _cube_target_mocap_ids (list): MuJoCo mocap body IDs for target positions. _cube_target_geom_ids_list (list): MuJoCo geom IDs for target visualizations. _success (bool): Whether the current task has been completed successfully. _cur_goal_ob (ndarray): Goal observation for goal-conditioned learning. _cur_goal_rendered (ndarray): Rendered image of goal state, if enabled. Note: Inherits from ManipSpaceEnv which provides the underlying robotic arm control, physics simulation, and base functionality. """
[docs] def __init__( self, env_type="single", ob_type="pixels", permute_blocks=True, multiview=False, height=224, width=224, *args, **kwargs, ): """Initialize the CubeEnv with specified configuration. Sets up the manipulation environment with the specified number of cubes and configures observation type, block permutation, and camera views. Initializes the variation space for visual domain randomization. Args: env_type (str): Environment type corresponding to number of cubes. Must be one of: 'single' (1 cube), 'double' (2 cubes), 'triple' (3 cubes), 'quadruple' (4 cubes), 'octuple' (8 cubes). ob_type (str, optional): Type of observation to return. Either 'pixels' for image observations or 'state' for proprioceptive/object states. Defaults to 'pixels'. permute_blocks (bool, optional): Whether to randomly shuffle the order of cubes at the start of each episode. Helps with generalization. Defaults to True. multiview (bool, optional): Whether to render the scene from both front and side camera views simultaneously. Returns stacked images when True. Defaults to False. *args: Variable length argument list passed to parent ManipSpaceEnv. **kwargs: Arbitrary keyword arguments passed to parent ManipSpaceEnv. Raises: ValueError: If env_type is not one of the supported values. Note: The variation_space is automatically configured with appropriate ranges for all visual variations including colors, sizes, lighting, and cameras. """ self._env_type = env_type self._permute_blocks = permute_blocks self._multiview = multiview if self._env_type == "single": self._num_cubes = 1 elif self._env_type == "double": self._num_cubes = 2 elif self._env_type == "triple": self._num_cubes = 3 elif self._env_type == "quadruple": self._num_cubes = 4 elif self._env_type == "octuple": self._num_cubes = 8 else: raise ValueError(f"Invalid env_type: {env_type}") super().__init__(*args, height=height, width=width, **kwargs) self._ob_type = ob_type # Define constants. self._cube_colors = np.array( [ self._colors["red"], self._colors["blue"], self._colors["orange"], self._colors["green"], self._colors["purple"], self._colors["yellow"], self._colors["magenta"], self._colors["gray"], ] ) self._cube_success_colors = np.array( [ self._colors["lightred"], self._colors["lightblue"], self._colors["lightorange"], self._colors["lightgreen"], self._colors["lightpurple"], self._colors["lightyellow"], self._colors["lightmagenta"], self._colors["white"], ] ) # Target info. self._target_task = "cube" # The target cube position is stored in the mocap object. self._target_block = 0 self.variation_space = swm.spaces.Dict( { "cube": swm.spaces.Dict( { # "num": swm.spaces.Discrete(), "color": swm.spaces.Box( low=0.0, high=1.0, shape=(self._num_cubes, 3), dtype=np.float64, init_value=self._cube_colors[: self._num_cubes, :3].copy(), ), "size": swm.spaces.Box( low=0.01, high=0.04, shape=(self._num_cubes,), dtype=np.float64, init_value=0.02 * np.ones((self._num_cubes,), dtype=np.float32), ), } # sampling_order=["num", "color", "size"] ), "agent": swm.spaces.Dict( { "color": swm.spaces.Box( low=0.0, high=1.0, shape=(3,), dtype=np.float64, init_value=self._colors["purple"][:3], ), } ), "floor": swm.spaces.Dict( { "color": swm.spaces.Box( low=0.0, high=1.0, shape=(2, 3), dtype=np.float64, init_value=np.array([[0.08, 0.11, 0.16], [0.15, 0.18, 0.25]]), ), } ), "camera": swm.spaces.Dict( { "angle_delta": swm.spaces.Box( low=-10.0, high=10.0, shape=(2, 2) if self._multiview else (1, 2), dtype=np.float64, init_value=np.zeros([2, 2]) if self._multiview else np.zeros([1, 2]), ), } ), "light": swm.spaces.Dict( { "intensity": swm.spaces.Box( low=0.0, high=1.0, shape=(1,), dtype=np.float64, init_value=[0.6], ), } ), } )
[docs] def set_tasks(self): """Define all task configurations for the environment. Initializes the task_infos list with predefined manipulation tasks appropriate for the current env_type. Each task specifies initial and goal positions for all cubes. Tasks increase in complexity from simple pick-and-place to multi-object stacking and cyclic rearrangements. Task types by environment: - single: 5 tasks (horizontal, vertical movements, diagonals) - double: 5 tasks (single/double pick-place, swap, stack) - triple: 5 tasks (single/triple pick-place, unstack, cycle, stack) - quadruple: 5 tasks (double/quad pick-place, unstack, cycle, stack) - octuple: 5 tasks (quad/octuple pick-place, unstacking, stacking) Note: Also sets the default reward_task_id to 2 if not already configured. All positions are in MuJoCo world coordinates (x, y, z) in meters. """ if self._env_type == "single": self.task_infos = [ { "task_name": "task1_horizontal", "init_xyzs": np.array([[0.425, 0.1, 0.02]]), "goal_xyzs": np.array([[0.425, -0.1, 0.02]]), }, { "task_name": "task2_vertical1", "init_xyzs": np.array([[0.35, 0.0, 0.02]]), "goal_xyzs": np.array([[0.50, 0.0, 0.02]]), }, { "task_name": "task3_vertical2", "init_xyzs": np.array([[0.50, 0.0, 0.02]]), "goal_xyzs": np.array([[0.35, 0.0, 0.02]]), }, { "task_name": "task4_diagonal1", "init_xyzs": np.array([[0.35, -0.2, 0.02]]), "goal_xyzs": np.array([[0.50, 0.2, 0.02]]), }, { "task_name": "task5_diagonal2", "init_xyzs": np.array([[0.35, 0.2, 0.02]]), "goal_xyzs": np.array([[0.50, -0.2, 0.02]]), }, ] elif self._env_type == "double": self.task_infos = [ { "task_name": "task1_single_pnp", "init_xyzs": np.array( [ [0.425, 0.0, 0.02], [0.425, -0.1, 0.02], ] ), "goal_xyzs": np.array( [ [0.425, 0.0, 0.02], [0.425, 0.1, 0.02], ] ), }, { "task_name": "task2_double_pnp1", "init_xyzs": np.array( [ [0.35, -0.1, 0.02], [0.50, -0.1, 0.02], ] ), "goal_xyzs": np.array( [ [0.35, 0.1, 0.02], [0.50, 0.1, 0.02], ] ), }, { "task_name": "task3_double_pnp2", "init_xyzs": np.array( [ [0.35, 0.0, 0.02], [0.50, 0.0, 0.02], ] ), "goal_xyzs": np.array( [ [0.425, -0.2, 0.02], [0.425, 0.2, 0.02], ] ), }, { "task_name": "task4_swap", "init_xyzs": np.array( [ [0.425, -0.1, 0.02], [0.425, 0.1, 0.02], ] ), "goal_xyzs": np.array( [ [0.425, 0.1, 0.02], [0.425, -0.1, 0.02], ] ), }, { "task_name": "task5_stack", "init_xyzs": np.array( [ [0.425, -0.2, 0.02], [0.425, 0.2, 0.02], ] ), "goal_xyzs": np.array( [ [0.425, 0.0, 0.02], [0.425, 0.0, 0.06], ] ), }, ] elif self._env_type == "triple": self.task_infos = [ { "task_name": "task1_single_pnp", "init_xyzs": np.array( [ [0.35, -0.1, 0.02], [0.35, 0.1, 0.02], [0.50, -0.1, 0.02], ] ), "goal_xyzs": np.array( [ [0.35, -0.1, 0.02], [0.35, 0.1, 0.02], [0.50, 0.1, 0.02], ] ), }, { "task_name": "task2_triple_pnp", "init_xyzs": np.array( [ [0.35, -0.2, 0.02], [0.35, 0.0, 0.02], [0.35, 0.2, 0.02], ] ), "goal_xyzs": np.array( [ [0.50, 0.0, 0.02], [0.50, 0.2, 0.02], [0.50, -0.2, 0.02], ] ), }, { "task_name": "task3_pnp_from_stack", "init_xyzs": np.array( [ [0.425, 0.2, 0.02], [0.425, 0.2, 0.06], [0.425, 0.2, 0.10], ] ), "goal_xyzs": np.array( [ [0.35, -0.1, 0.02], [0.50, -0.2, 0.02], [0.50, 0.0, 0.02], ] ), }, { "task_name": "task4_cycle", "init_xyzs": np.array( [ [0.35, 0.0, 0.02], [0.50, -0.1, 0.02], [0.50, 0.1, 0.02], ] ), "goal_xyzs": np.array( [ [0.50, -0.1, 0.02], [0.50, 0.1, 0.02], [0.35, 0.0, 0.02], ] ), }, { "task_name": "task5_stack", "init_xyzs": np.array( [ [0.35, -0.1, 0.02], [0.50, -0.2, 0.02], [0.50, 0.0, 0.02], ] ), "goal_xyzs": np.array( [ [0.425, 0.2, 0.02], [0.425, 0.2, 0.06], [0.425, 0.2, 0.10], ] ), }, ] elif self._env_type == "quadruple": self.task_infos = [ { "task_name": "task1_double_pnp", "init_xyzs": np.array( [ [0.35, -0.1, 0.02], [0.35, 0.1, 0.02], [0.50, -0.1, 0.02], [0.50, 0.1, 0.02], ] ), "goal_xyzs": np.array( [ [0.35, -0.25, 0.02], [0.35, 0.1, 0.02], [0.50, -0.1, 0.02], [0.50, 0.25, 0.02], ] ), }, { "task_name": "task2_quadruple_pnp", "init_xyzs": np.array( [ [0.325, -0.2, 0.02], [0.325, 0.2, 0.02], [0.525, -0.2, 0.02], [0.525, 0.2, 0.02], ] ), "goal_xyzs": np.array( [ [0.375, 0.1, 0.02], [0.475, 0.1, 0.02], [0.375, -0.1, 0.02], [0.475, -0.1, 0.02], ] ), }, { "task_name": "task3_pnp_from_square", "init_xyzs": np.array( [ [0.425, -0.02, 0.02], [0.425, 0.02, 0.02], [0.425, -0.02, 0.06], [0.425, 0.02, 0.06], ] ), "goal_xyzs": np.array( [ [0.525, -0.2, 0.02], [0.325, 0.2, 0.02], [0.325, -0.2, 0.02], [0.525, 0.2, 0.02], ] ), }, { "task_name": "task4_cycle", "init_xyzs": np.array( [ [0.525, -0.1, 0.02], [0.525, 0.1, 0.02], [0.325, 0.1, 0.02], [0.325, -0.1, 0.02], ] ), "goal_xyzs": np.array( [ [0.525, 0.1, 0.02], [0.325, 0.1, 0.02], [0.325, -0.1, 0.02], [0.525, -0.1, 0.02], ] ), }, { "task_name": "task5_stack", "init_xyzs": np.array( [ [0.50, -0.05, 0.02], [0.50, -0.2, 0.02], [0.35, -0.2, 0.02], [0.35, -0.05, 0.02], ] ), "goal_xyzs": np.array( [ [0.425, 0.2, 0.02], [0.425, 0.2, 0.06], [0.425, 0.2, 0.10], [0.425, 0.2, 0.14], ] ), }, ] elif self._env_type == "octuple": self.task_infos = [ { "task_name": "task1_quadruple_pnp", "init_xyzs": np.array( [ [0.325, -0.15, 0.02], [0.425, -0.15, 0.02], [0.425, -0.05, 0.02], [0.525, -0.05, 0.02], [0.325, 0.05, 0.02], [0.425, 0.05, 0.02], [0.425, 0.15, 0.02], [0.525, 0.15, 0.02], ] ), "goal_xyzs": np.array( [ [0.525, 0.05, 0.02], [0.425, -0.15, 0.02], [0.425, -0.05, 0.02], [0.325, 0.15, 0.02], [0.525, -0.15, 0.02], [0.425, 0.05, 0.02], [0.425, 0.15, 0.02], [0.325, -0.05, 0.02], ] ), }, { "task_name": "task2_octuple_pnp1", "init_xyzs": np.array( [ [0.40, -0.15, 0.02], [0.40, -0.05, 0.02], [0.40, 0.05, 0.02], [0.40, 0.15, 0.02], [0.45, -0.15, 0.02], [0.45, -0.05, 0.02], [0.45, 0.05, 0.02], [0.45, 0.15, 0.02], ] ), "goal_xyzs": np.array( [ [0.525, 0.2, 0.02], [0.525, 0.0, 0.02], [0.525, -0.2, 0.02], [0.425, -0.2, 0.02], [0.325, -0.2, 0.02], [0.325, 0.0, 0.02], [0.325, 0.2, 0.02], [0.425, 0.2, 0.02], ] ), }, { "task_name": "task3_octuple_pnp2", "init_xyzs": np.array( [ [0.32, -0.15, 0.02], [0.32, 0.05, 0.02], [0.39, -0.05, 0.02], [0.39, 0.15, 0.02], [0.46, -0.15, 0.02], [0.46, 0.05, 0.02], [0.53, -0.05, 0.02], [0.53, 0.15, 0.02], ] ), "goal_xyzs": np.array( [ [0.32, 0.15, 0.02], [0.46, 0.15, 0.02], [0.39, 0.05, 0.02], [0.53, 0.05, 0.02], [0.32, -0.05, 0.02], [0.46, -0.05, 0.02], [0.39, -0.15, 0.02], [0.53, -0.15, 0.02], ] ), }, { "task_name": "task4_stack1", "init_xyzs": np.array( [ [0.40, -0.025, 0.02], [0.40, -0.025, 0.06], [0.40, 0.025, 0.02], [0.40, 0.025, 0.06], [0.45, -0.025, 0.02], [0.45, -0.025, 0.06], [0.45, 0.025, 0.02], [0.45, 0.025, 0.06], ] ), "goal_xyzs": np.array( [ [0.525, 0.05, 0.02], [0.525, -0.05, 0.02], [0.425, -0.15, 0.06], [0.425, -0.15, 0.02], [0.425, 0.15, 0.06], [0.425, 0.15, 0.02], [0.325, -0.05, 0.02], [0.325, 0.05, 0.02], ] ), }, { "task_name": "task5_stack2", "init_xyzs": np.array( [ [0.50, 0.2, 0.06], [0.50, 0.2, 0.02], [0.50, -0.2, 0.06], [0.50, -0.2, 0.02], [0.35, -0.2, 0.06], [0.35, -0.2, 0.02], [0.35, 0.2, 0.06], [0.35, 0.2, 0.02], ] ), "goal_xyzs": np.array( [ [0.325, 0.0, 0.02], [0.325, 0.0, 0.06], [0.425, 0.2, 0.02], [0.425, 0.2, 0.06], [0.525, 0.0, 0.02], [0.525, 0.0, 0.06], [0.425, -0.2, 0.02], [0.425, -0.2, 0.06], ] ), }, ] if self._reward_task_id == 0: self._reward_task_id = 2 # Default task.
[docs] def reset(self, seed=None, options=None, *args, **kwargs): """Reset the environment to an initial state. Resets the environment and optionally samples from the variation space to create visual diversity. Handles both task mode (with predefined goals) and data collection mode (with random targets). Args: options (dict, optional): Dictionary of reset options. Supported keys: - 'variation': List/tuple of variation names to sample. Use ['all'] to sample all variations, or specify individual ones like ['cube.color', 'light.intensity']. Defaults to None (no variation). *args: Variable length argument list passed to parent reset. **kwargs: Arbitrary keyword arguments passed to parent reset. Returns: tuple: (observation, info) where: - observation: Current observation based on ob_type configuration - info: Dictionary containing reset information and task state Raises: AssertionError: If variation option is not a list/tuple, or if variation values are outside their defined spaces. Example: Reset with all variations enabled:: obs, info = env.reset(options={'variation': ['all']}) Reset with specific variations:: obs, info = env.reset(options={'variation': ['cube.color', 'camera.angle_delta']}) """ if hasattr(self, "variation_space"): self.variation_space.seed(seed) options = options or {} self.variation_options = options.get("variation", {}) self.variation_space.reset() if "variation" in options: assert isinstance(options["variation"], list | tuple), ( "variation option must be a list or tuple containing variation names to sample" ) if len(options["variation"]) == 1 and options["variation"][0] == "all": self.variation_space.sample() else: self.variation_space.update(set(options["variation"])) assert self.variation_space.check(debug=True), "Variation values must be within variation space!" return super().reset(seed=seed, options=options, *args, **kwargs)
[docs] def add_objects(self, arena_mjcf): """Add cube objects and cameras to the MuJoCo scene. Constructs the manipulation scene by loading cube XML descriptions and positioning them appropriately. Sets up multiple camera viewpoints for rendering observations. Args: arena_mjcf (mjcf.RootElement): The MuJoCo XML root element representing the arena where objects and cameras will be added. Note: - Cubes are positioned with 0.05m spacing along the y-axis - Each cube has both a physical object and a semi-transparent target marker - Three cameras are added: 'front', 'front_pixels', and 'side_pixels' - All cube geoms are stored for later color and property modifications """ # Add cube scene. cube_outer_mjcf = mjcf.from_path((self._desc_dir / "cube_outer.xml").as_posix()) arena_mjcf.include_copy(cube_outer_mjcf) # Add `num_cubes` cubes to the scene. distance = 0.05 for i in range(self._num_cubes): cube_mjcf = mjcf.from_path((self._desc_dir / "cube_inner.xml").as_posix()) pos = -distance * (self._num_cubes - 1) + 2 * distance * i cube_mjcf.find("body", "object_0").pos[1] = pos cube_mjcf.find("body", "object_target_0").pos[1] = pos for tag in ["body", "joint", "geom", "site"]: for item in cube_mjcf.find_all(tag): if hasattr(item, "name") and item.name is not None and item.name.endswith("_0"): item.name = item.name[:-2] + f"_{i}" arena_mjcf.include_copy(cube_mjcf) # Save cube geoms. self._cube_geoms_list = [] for i in range(self._num_cubes): self._cube_geoms_list.append(arena_mjcf.find("body", f"object_{i}").find_all("geom")) self._cube_target_geoms_list = [] for i in range(self._num_cubes): self._cube_target_geoms_list.append(arena_mjcf.find("body", f"object_target_{i}").find_all("geom")) # Add cameras. self.cameras = { "front": { "pos": (1.287, 0.000, 0.509), "xyaxes": (0.000, 1.000, 0.000, -0.342, 0.000, 0.940), }, "front_pixels": { "pos": (1.053, -0.014, 0.639), "xyaxes": (0.000, 1.000, 0.000, -0.628, 0.001, 0.778), }, "side_pixels": { "pos": (0.414, -0.753, 0.639), "xyaxes": (1.000, 0.000, 0.000, -0.001, 0.628, 0.778), }, } for camera_name, camera_kwargs in self.cameras.items(): arena_mjcf.worldbody.add("camera", name=camera_name, **camera_kwargs)
[docs] def post_compilation_objects(self): """Extract MuJoCo object IDs after model compilation. Retrieves and stores the integer IDs for all cube geoms, target mocap bodies, and target geoms. These IDs are used for efficient access during simulation and rendering. Note: Must be called after the MuJoCo model has been compiled. IDs are needed for direct manipulation of model properties like colors and positions. """ # Cube geom IDs. self._cube_geom_ids_list = [ [self._model.geom(geom.full_identifier).id for geom in cube_geoms] for cube_geoms in self._cube_geoms_list ] self._cube_target_mocap_ids = [ self._model.body(f"object_target_{i}").mocapid[0] for i in range(self._num_cubes) ] self._cube_target_geom_ids_list = [ [self._model.geom(geom.full_identifier).id for geom in cube_target_geoms] for cube_target_geoms in self._cube_target_geoms_list ]
[docs] def modify_mjcf_model(self, mjcf_model): """Apply visual variations to the MuJoCo model based on variation space. Modifies the MJCF model XML to apply sampled variations including floor colors, robot arm colors, cube sizes, camera angles, and lighting. Only variations that are enabled in the variation_options are applied. Args: mjcf_model (mjcf.RootElement): The MuJoCo XML model to modify. Returns: mjcf.RootElement: The modified model with variations applied. Note: - Variations are only applied if specified in variation_options during reset - Some variations (size, light) call self.mark_dirty() to trigger recompilation - Camera angle perturbations use the perturb_camera_angle helper function """ if "all" in self.variation_options or "floor.color" in self.variation_options: # Modify floor color grid_texture = mjcf_model.find("texture", "grid") grid_texture.rgb1 = self.variation_space["floor"]["color"].value[0] grid_texture.rgb2 = self.variation_space["floor"]["color"].value[1] if "all" in self.variation_options or "agent.color" in self.variation_options: # Modify arm color mjcf_model.find("material", "ur5e/robotiq/black").rgba[:3] = self.variation_space["agent"]["color"].value mjcf_model.find("material", "ur5e/robotiq/pad_gray").rgba[:3] = self.variation_space["agent"][ "color" ].value if "all" in self.variation_options or "cube.size" in self.variation_options: # Modify cube size based on variation space for i in range(self._num_cubes): # Regular cubes body = mjcf_model.find("body", f"object_{i}") if body: for geom in body.find_all("geom"): geom.size = self.variation_space["cube"]["size"].value[i] * np.ones( (3), dtype=np.float32 ) # half-extents (x, y, z) # Target cubes (if any) target_body = mjcf_model.find("body", f"object_target_{i}") if target_body: for geom in target_body.find_all("geom"): geom.size = self.variation_space["cube"]["size"].value[i] * np.ones((3), dtype=np.float32) self.mark_dirty() if "all" in self.variation_options or "camera.angle_delta" in self.variation_options: # Perturb camera angle cameras_to_vary = ["front_pixels", "side_pixels"] if self._multiview else ["front_pixels"] for i, cam_name in enumerate(cameras_to_vary): cam = mjcf_model.find("camera", cam_name) cam.xyaxes = perturb_camera_angle( self.cameras[cam_name]["xyaxes"], self.variation_space["camera"]["angle_delta"].value[i] ) if "all" in self.variation_options or "light.intensity" in self.variation_options: # Modify light intensity light = mjcf_model.find("light", "global") light.diffuse = self.variation_space["light"]["intensity"].value[0] * np.ones((3), dtype=np.float32) self.mark_dirty() return mjcf_model
[docs] def initialize_episode(self): """Initialize the environment state at the start of an episode. Sets up cube colors, arm position, and object placements based on the current mode (task or data_collection). In task mode, creates goal observations and places cubes according to the current task definition. In data collection mode, randomizes cube placements and sets a random target. The initialization process: 1. Apply cube colors from variation space 2. Reset arm to home position 3. In task mode: Set cubes to task-specific positions, generate goal observation 4. In data_collection mode: Randomize cube positions and orientations 5. Run forward kinematics to stabilize the scene Note: - In task mode, goal observation is computed by first placing cubes at goal positions, rendering/observing, then resetting to initial positions - Small random perturbations (±0.01m) are added to initial positions - Random yaw rotations (0-2π) are applied to all cubes """ # Set cube colors. for i in range(self._num_cubes): for gid in self._cube_geom_ids_list[i]: self._model.geom(gid).rgba[:3] = self.variation_space["cube"]["color"].value[i] self._model.geom(gid).rgba[3] = 1.0 for gid in self._cube_target_geom_ids_list[i]: self._model.geom(gid).rgba[:3] = self.variation_space["cube"]["color"].value[i] self._data.qpos[self._arm_joint_ids] = self._home_qpos mujoco.mj_kinematics(self._model, self._data) if self._mode == "data_collection": # Randomize the scene. self.initialize_arm() # Randomize object positions and orientations. for i in range(self._num_cubes): xy = self.np_random.uniform(*self._object_sampling_bounds) obj_pos = (*xy, 0.02) yaw = self.np_random.uniform(0, 2 * np.pi) obj_ori = lie.SO3.from_z_radians(yaw).wxyz.tolist() self._data.joint(f"object_joint_{i}").qpos[:3] = obj_pos self._data.joint(f"object_joint_{i}").qpos[3:] = obj_ori # Set a new target. self.set_new_target(return_info=False) else: # Set object positions and orientations based on the current task. if self._permute_blocks: # Randomize the order of the cubes when there are multiple cubes. permutation = self.np_random.permutation(self._num_cubes) else: permutation = np.arange(self._num_cubes) init_xyzs = self.cur_task_info["init_xyzs"].copy()[permutation] goal_xyzs = self.cur_task_info["goal_xyzs"].copy()[permutation] # First, force set the current scene to the goal state to obtain the goal observation. saved_qpos = self._data.qpos.copy() saved_qvel = self._data.qvel.copy() self.initialize_arm() for i in range(self._num_cubes): self._data.joint(f"object_joint_{i}").qpos[:3] = goal_xyzs[i] self._data.joint(f"object_joint_{i}").qpos[3:] = lie.SO3.identity().wxyz.tolist() self._data.mocap_pos[self._cube_target_mocap_ids[i]] = goal_xyzs[i] self._data.mocap_quat[self._cube_target_mocap_ids[i]] = lie.SO3.identity().wxyz.tolist() mujoco.mj_forward(self._model, self._data) # Do a few random steps to make the scene stable. for _ in range(2): self.step(self.action_space.sample()) # Save the goal observation. self._cur_goal_ob = ( self.compute_oracle_observation() if self._use_oracle_rep else self.compute_observation() ) if self._render_goal: self._cur_goal_rendered = self.render() else: self._cur_goal_rendered = None # Now, do the actual reset. self._data.qpos[:] = saved_qpos self._data.qvel[:] = saved_qvel self.initialize_arm() for i in range(self._num_cubes): # Randomize the position and orientation of the cube slightly. obj_pos = init_xyzs[i].copy() obj_pos[:2] += self.np_random.uniform(-0.01, 0.01, size=2) self._data.joint(f"object_joint_{i}").qpos[:3] = obj_pos yaw = self.np_random.uniform(0, 2 * np.pi) obj_ori = lie.SO3.from_z_radians(yaw).wxyz.tolist() self._data.joint(f"object_joint_{i}").qpos[3:] = obj_ori self._data.mocap_pos[self._cube_target_mocap_ids[i]] = goal_xyzs[i] self._data.mocap_quat[self._cube_target_mocap_ids[i]] = lie.SO3.identity().wxyz.tolist() # Forward kinematics to update site positions. self.pre_step() mujoco.mj_forward(self._model, self._data) self.post_step() self._success = False
[docs] def set_new_target(self, return_info=True, p_stack=0.5): """Set a new random target for data collection mode. Randomly selects one of the "top" cubes (not stacked under another) as the target and assigns it a random goal position. The goal can be either a flat surface position or stacked on top of another cube. Args: return_info (bool, optional): Whether to return the observation and reset info after setting the new target. Defaults to True. p_stack (float, optional): Probability of setting the target to stack on top of another block (when multiple blocks are available). Must be in range [0, 1]. Defaults to 0.5. Returns: tuple or None: If return_info is True, returns (observation, reset_info). Otherwise returns None. Raises: AssertionError: If called when mode is not 'data_collection'. Note: - Only cubes that are not underneath other cubes can be selected as targets - Target markers are made visible for the selected cube, invisible for others - Stacking targets are positioned 0.04m above the base cube's z-position - Non-stacking targets are randomly sampled from the target sampling bounds """ assert self._mode == "data_collection" block_xyzs = np.array([self._data.joint(f"object_joint_{i}").qpos[:3] for i in range(self._num_cubes)]) # Compute the top blocks. top_blocks = [] for i in range(self._num_cubes): for j in range(self._num_cubes): if i == j: continue if ( block_xyzs[j][2] > block_xyzs[i][2] and np.linalg.norm(block_xyzs[i][:2] - block_xyzs[j][:2]) < 0.02 ): break else: top_blocks.append(i) # Pick one of the top cubes as the target. self._target_block = self.np_random.choice(top_blocks) stack = len(top_blocks) >= 2 and self.np_random.uniform() < p_stack if stack: # Stack the target block on top of another block. block_idx = self.np_random.choice(list(set(top_blocks) - {self._target_block})) block_pos = self._data.joint(f"object_joint_{block_idx}").qpos[:3] tar_pos = np.array([block_pos[0], block_pos[1], block_pos[2] + 0.04]) else: # Randomize target position. xy = self.np_random.uniform(*self._target_sampling_bounds) tar_pos = (*xy, 0.02) # Randomize target orientation. yaw = self.np_random.uniform(0, 2 * np.pi) tar_ori = lie.SO3.from_z_radians(yaw).wxyz.tolist() # Only show the target block. for i in range(self._num_cubes): if i == self._target_block: # Set the target position and orientation. self._data.mocap_pos[self._cube_target_mocap_ids[i]] = tar_pos self._data.mocap_quat[self._cube_target_mocap_ids[i]] = tar_ori else: # Move the non-target blocks out of the way. self._data.mocap_pos[self._cube_target_mocap_ids[i]] = (0, 0, -0.3) self._data.mocap_quat[self._cube_target_mocap_ids[i]] = lie.SO3.identity().wxyz.tolist() # Set the target colors. for i in range(self._num_cubes): if self._visualize_info and i == self._target_block: for gid in self._cube_target_geom_ids_list[i]: self._model.geom(gid).rgba[3] = 0.2 else: for gid in self._cube_target_geom_ids_list[i]: self._model.geom(gid).rgba[3] = 0.0 if return_info: return self.compute_observation(), self.get_reset_info()
def _compute_successes(self): """Compute success status for each cube. Checks whether each cube is within the success threshold distance of its corresponding target position. A cube is considered successful if the Euclidean distance to its target is ≤ 0.04m. Returns: list of bool: Boolean list where each element indicates whether the corresponding cube has reached its target position. Length equals the number of cubes in the environment. Note: Success threshold of 0.04m (40mm) allows for small positioning errors while ensuring cubes are substantially at their goals. """ cube_successes = [] for i in range(self._num_cubes): obj_pos = self._data.joint(f"object_joint_{i}").qpos[:3] tar_pos = self._data.mocap_pos[self._cube_target_mocap_ids[i]] if np.linalg.norm(obj_pos - tar_pos) <= 0.04: cube_successes.append(True) else: cube_successes.append(False) return cube_successes
[docs] def post_step(self): """Update environment state after each simulation step. Computes success status and adjusts target marker visibility based on the current mode. In task mode, all cube targets are evaluated. In data collection mode, only the designated target cube is evaluated. Updates: - self._success: Set to True if success conditions are met - Target geom alpha: Made visible (0.2) for relevant targets when visualization is enabled, invisible (0.0) otherwise Note: Success in task mode requires ALL cubes to reach their targets. Success in data collection mode requires only the target cube to succeed. """ # Check if the cubes are in the target positions. cube_successes = self._compute_successes() if self._mode == "data_collection": self._success = cube_successes[self._target_block] else: self._success = all(cube_successes) # Adjust the colors of the cubes based on success. for i in range(self._num_cubes): if self._visualize_info and (self._mode == "task" or i == self._target_block): for gid in self._cube_target_geom_ids_list[i]: self._model.geom(gid).rgba[3] = 0.2 else: for gid in self._cube_target_geom_ids_list[i]: self._model.geom(gid).rgba[3] = 0.0
[docs] def get_reset_info(self): """Get information dictionary at environment reset. Compiles observation info along with goal observations (in task mode) and success status to provide comprehensive reset information. Returns: dict: Dictionary containing: - All keys from compute_ob_info() (proprioception and object states) - 'goal': Goal observation (task mode only) - 'success': Boolean indicating current success status Note: Called after initialize_episode() to provide initial state information. """ reset_info = self.compute_ob_info() if self._mode == "task": reset_info["goal"] = self._cur_goal_ob reset_info["success"] = self._success return reset_info
[docs] def get_step_info(self): """Get information dictionary after each environment step. Compiles current observation info along with goal observations (in task mode) and success status to provide comprehensive step information. Returns: dict: Dictionary containing: - All keys from compute_ob_info() (proprioception and object states) - 'goal': Goal observation (task mode only) - 'success': Boolean indicating whether task is completed Note: Called after each step to provide feedback about current state and progress. """ ob_info = self.compute_ob_info() if self._mode == "task": ob_info["goal"] = self._cur_goal_ob ob_info["success"] = self._success return ob_info
[docs] def add_object_info(self, ob_info): """Add cube-specific information to the observation info dictionary. Augments the info dictionary with privileged state information about all cubes including positions, orientations, and target information (in data collection mode). Args: ob_info (dict): Observation info dictionary to augment. Modified in-place. Adds to ob_info: - 'privileged/block_{i}_pos': 3D position (x, y, z) of cube i - 'privileged/block_{i}_quat': Quaternion (w, x, y, z) of cube i - 'privileged/block_{i}_yaw': Yaw angle in radians of cube i - 'privileged/target_task': Task type string (data collection mode only) - 'privileged/target_block': Index of target cube (data collection mode only) - 'privileged/target_block_pos': Target position (data collection mode only) - 'privileged/target_block_yaw': Target yaw angle (data collection mode only) Note: All positions are in world coordinates. Quaternions use (w, x, y, z) format. Privileged information is typically not available to policies during deployment. """ # Cube positions and orientations. for i in range(self._num_cubes): ob_info[f"privileged/block_{i}_pos"] = self._data.joint(f"object_joint_{i}").qpos[:3].copy() ob_info[f"privileged/block_{i}_quat"] = self._data.joint(f"object_joint_{i}").qpos[3:].copy() ob_info[f"privileged/block_{i}_yaw"] = np.array( [lie.SO3(wxyz=self._data.joint(f"object_joint_{i}").qpos[3:]).compute_yaw_radians()] ) if self._mode == "data_collection": # Target cube info. ob_info["privileged/target_task"] = self._target_task target_mocap_id = self._cube_target_mocap_ids[self._target_block] ob_info["privileged/target_block"] = self._target_block ob_info["privileged/target_block_pos"] = self._data.mocap_pos[target_mocap_id].copy() ob_info["privileged/target_block_yaw"] = np.array( [lie.SO3(wxyz=self._data.mocap_quat[target_mocap_id]).compute_yaw_radians()] )
[docs] def compute_observation(self): """Compute the current observation based on observation type. Generates either pixel-based or state-based observations depending on the ob_type configuration. State observations include scaled proprioceptive and object state information. Returns: ndarray: Observation array. If ob_type is 'pixels', returns image array with shape (H, W, C) or (N, H, W, C) for multiview. If ob_type is not 'pixels', returns flattened state vector containing: - Arm joint positions (6D) and velocities (6D) - End-effector position (3D, scaled), yaw angle (2D: cos/sin) - Gripper opening (1D, scaled) and contact (binary) - For each cube: position (3D, scaled), quaternion (4D), yaw (2D: cos/sin) Note: State observations use a centering offset (0.425, 0.0, 0.0) and scaling factors (10.0 for positions, 3.0 for gripper) to normalize values. Yaw angles are encoded as (cos, sin) pairs for continuity. """ if self._ob_type == "pixels": return self.get_pixel_observation() else: xyz_center = np.array([0.425, 0.0, 0.0]) xyz_scaler = 10.0 gripper_scaler = 3.0 ob_info = self.compute_ob_info() ob = [ ob_info["proprio/joint_pos"], ob_info["proprio/joint_vel"], (ob_info["proprio/effector_pos"] - xyz_center) * xyz_scaler, np.cos(ob_info["proprio/effector_yaw"]), np.sin(ob_info["proprio/effector_yaw"]), ob_info["proprio/gripper_opening"] * gripper_scaler, ob_info["proprio/gripper_contact"], ] for i in range(self._num_cubes): ob.extend( [ (ob_info[f"privileged/block_{i}_pos"] - xyz_center) * xyz_scaler, ob_info[f"privileged/block_{i}_quat"], np.cos(ob_info[f"privileged/block_{i}_yaw"]), np.sin(ob_info[f"privileged/block_{i}_yaw"]), ] ) return np.concatenate(ob)
[docs] def compute_oracle_observation(self): """Compute oracle goal representation containing only cube positions. Returns a compact state representation containing only the positions of all cubes, useful for goal-conditioned learning where the goal is defined by object configurations rather than the full state. Returns: ndarray: Concatenated cube positions with shape (num_cubes * 3,). Each cube contributes its (x, y, z) position, centered and scaled by the same factors used in compute_observation(). Note: This representation excludes robot state and object orientations, focusing only on cube positions. Used primarily in task mode for goal specification. """ xyz_center = np.array([0.425, 0.0, 0.0]) xyz_scaler = 10.0 ob_info = self.compute_ob_info() ob = [] for i in range(self._num_cubes): ob.append((ob_info[f"privileged/block_{i}_pos"] - xyz_center) * xyz_scaler) return np.concatenate(ob)
[docs] def compute_reward(self, ob, action): """Compute the reward for the current step. Calculates reward based on task success. If a specific reward_task_id is set, uses a custom reward function that counts successful cube placements minus the total number of cubes (range: -num_cubes to 0). Otherwise defers to parent class reward computation. Args: ob (ndarray): Current observation (not used in custom reward computation). action (ndarray): Action taken in this step (not used in custom reward). Returns: float: Scalar reward value. Custom reward ranges from -num_cubes (all cubes far from targets) to 0 (all cubes at targets). Parent class reward depends on its implementation. Note: The custom reward provides dense feedback about task progress by counting how many cubes are successfully positioned. Each successful cube adds 1 to the base value of -num_cubes. """ if self._reward_task_id is None: return super().compute_reward(ob, action) # Compute the reward based on the task. successes = self._compute_successes() reward = float(sum(successes) - len(successes)) return reward
[docs] def render( self, camera="front_pixels", *args, **kwargs, ): """Render the current scene from a specified camera view. Generates an RGB image of the current environment state from a single camera viewpoint. This method renders from one camera at a time. Args: camera (str, optional): Camera name to render from. Defaults to 'front_pixels'. Supports any camera defined in self.cameras (e.g., 'front_pixels', 'side_pixels'). *args: Additional positional arguments passed to parent render method. **kwargs: Additional keyword arguments passed to parent render method. Returns: ndarray: Rendered image with shape (H, W, C) where H is height, W is width, and C is the number of color channels (typically 3 for RGB). Note: For rendering from multiple cameras simultaneously, use the `render_multiview()` method instead. """ return super().render(camera=camera, *args, **kwargs)
[docs] def render_multiview( self, camera="front_pixels", *args, **kwargs, ): """Render the current scene from multiple camera views or a fallback single view. When multiview mode is enabled (`_multiview=True`), renders the scene from both 'front_pixels' and 'side_pixels' cameras and returns them as a dictionary. When multiview is disabled, falls back to rendering from a single camera. Args: camera (str, optional): Camera name to use for fallback rendering when multiview is disabled. Defaults to 'front_pixels'. Ignored when multiview is enabled. *args: Additional positional arguments passed to the render method. **kwargs: Additional keyword arguments passed to the render method. Returns: dict or ndarray: If multiview is enabled, returns a dictionary with camera names as keys ('front_pixels', 'side_pixels') and rendered images as values, where each image has shape (H, W, C). If multiview is disabled, returns a single rendered image array with shape (H, W, C). Note: The multiview dictionary format is useful for policies that process multiple viewpoints separately. The 'front_pixels' camera provides an oblique view while 'side_pixels' shows a perpendicular side view. """ if not self._multiview: return self.render(camera=camera, *args, **kwargs) cam_names = ["front_pixels", "side_pixels"] multi_view = {cam: self.render(camera=cam, *args, **kwargs) for cam in cam_names} return multi_view