Source code for stable_worldmodel.envs.pusht

from collections.abc import Sequence

import cv2
import gymnasium as gym
import numpy as np
import pygame
import pymunk
import pymunk.pygame_util
from gymnasium import spaces
from loguru import logger as logging
from pymunk.vec2d import Vec2d

import stable_worldmodel as swm

from .utils import DrawOptions


DEFAULT_VARIATIONS = ("agent.start_position", "block.start_position", "block.angle")


[docs] class PushT(gym.Env): metadata = { "render_modes": ["human", "rgb_array"], "video.frames_per_second": 10, "render_fps": 10, } reward_range = (0.0, 1.0) def __init__( self, block_cog=None, damping=None, render_action=False, resolution=224, with_target=True, render_mode="rgb_array", fix_action_sample=True, relative=True, ): self._seed = None self.window_size = ws = 512 # The size of the PyGame window self.render_size = resolution self.relative = relative self.action_scale = 100 # physics self.control_hz = self.metadata["render_fps"] self.k_p, self.k_v = 100, 20 self.dt = 0.01 self.shapes = ["o", "L", "T", "Z", "square", "I", "small_tee", "+"] self.observation_space = spaces.Dict( { "proprio": spaces.Box( low=np.array([0, 0, 0, 0]), high=np.array([ws, ws, ws, ws]), dtype=np.float64, ), "state": spaces.Box( low=np.array([0, 0, 0, 0, 0, 0, 0]), high=np.array([ws, ws, ws, ws, np.pi * 2, ws, ws]), dtype=np.float64, ), } ) # positional goal for agent self.action_space = spaces.Box(low=-1.0, high=1.0, shape=(2,), dtype=np.float32) self.variation_space = swm.spaces.Dict( { "agent": swm.spaces.Dict( { # "shape": swm.spaces.Categorical( # categories=["circle", "square", "triangle"], SHOULD IMPLEMENT THIS # init_value="circle", # ), "color": swm.spaces.RGBBox(init_value=np.array(pygame.Color("RoyalBlue")[:3], dtype=np.uint8)), "scale": swm.spaces.Box( low=20, high=60, init_value=40, shape=(), dtype=np.float32, ), "shape": swm.spaces.Discrete(len(self.shapes), start=0, init_value=0), "angle": swm.spaces.Box( low=-2 * np.pi, high=2 * np.pi, init_value=0.0, shape=(), dtype=np.float64, ), "start_position": swm.spaces.Box( low=50, high=450, init_value=np.array((256, 400), dtype=np.float64), shape=(2,), dtype=np.float64, ), "velocity": swm.spaces.Box( low=0, high=ws, init_value=np.array((0.0, 0.0), dtype=np.float64), shape=(2,), dtype=np.float64, ), } ), "block": swm.spaces.Dict( { "color": swm.spaces.RGBBox( init_value=np.array(pygame.Color("LightSlateGray")[:3], dtype=np.uint8) ), "scale": swm.spaces.Box( low=20, high=60, init_value=40, shape=(), dtype=np.float32, ), "shape": swm.spaces.Discrete(len(self.shapes) - 1, start=1, init_value=2), "angle": swm.spaces.Box( low=-2 * np.pi, high=2 * np.pi, init_value=0.0, shape=(), dtype=np.float64, ), "start_position": swm.spaces.Box( low=100, high=400, init_value=np.array((400, 100), dtype=np.float64), shape=(2,), dtype=np.float64, ), } ), "goal": swm.spaces.Dict( { "color": swm.spaces.RGBBox( init_value=np.array(pygame.Color("LightGreen")[:3], dtype=np.uint8) ), "scale": swm.spaces.Box( low=20, high=60, init_value=40, shape=(), dtype=np.float32, ), "angle": swm.spaces.Box( low=-2 * np.pi, high=2 * np.pi, init_value=np.pi / 4, shape=(), dtype=np.float64, ), "position": swm.spaces.Box( low=50, high=450, init_value=np.array([256, 256], dtype=np.float64), shape=(2,), dtype=np.float64, ), } ), "background": swm.spaces.Dict( { "color": swm.spaces.RGBBox(init_value=np.array(np.array([255, 255, 255], dtype=np.uint8))), } ), }, sampling_order=["background", "goal", "block", "agent"], ) # TODO ADD CONSTRAINT TO NOT SAMPLE OVERLAPPING START POSITIONS (block and agent) self.block_cog = block_cog self.damping = damping self.render_action = render_action self.render_mode = render_mode if fix_action_sample: self.fix_action_sample() """ If human-rendering is used, `self.window` will be a reference to the window that we draw to. `self.clock` will be a clock that is used to ensure that the environment is rendered at the correct framerate in human-mode. They will remain `None` until human-mode is used for the first time. """ self.window = None self.clock = None self.screen = None self.space = None self.render_buffer = None self.latest_action = None self.with_target = with_target self.coverage_arr = []
[docs] def reset(self, seed=None, options=None): super().reset(seed=seed, options=options) self.rng = np.random.default_rng(seed) if hasattr(self, "variation_space"): self.variation_space.seed(seed) options = options or {} self.variation_space.reset() variations = options.get("variation", DEFAULT_VARIATIONS) if not isinstance(variations, Sequence): raise ValueError("variation option must be a Sequence containing variations names to sample") self.variation_space.update(variations) assert self.variation_space.check(debug=True), "Variation values must be within variation space!" ### setup pymunk space self._setup() if self.block_cog is not None: self.block.center_of_gravity = self.block_cog if self.damping is not None: self.space.damping = self.damping ### get the state goal_state = np.concatenate( [ self.variation_space["agent"]["start_position"].sample(set_value=False).tolist(), self.variation_space["block"]["start_position"].sample(set_value=False).tolist(), [self.variation_space["block"]["angle"].sample(set_value=False)], self.variation_space["agent"]["velocity"].value.tolist(), ] ) ### generate goal self.goal_state = goal_state self._set_state(goal_state) self._goal = self.render() # restore original pos state = np.concatenate( [ self.variation_space["agent"]["start_position"].value.tolist(), self.variation_space["block"]["start_position"].value.tolist(), [self.variation_space["block"]["angle"].value], self.variation_space["agent"]["velocity"].value.tolist(), ] ) self._set_state(state) #### OBS state = self._get_obs() proprio = np.concatenate((state[:2], state[-2:])) observation = {"proprio": proprio, "state": state} info = self._get_info() return observation, info
[docs] def step(self, action): self.n_contact_points = 0 n_steps = int(1 / (self.dt * self.control_hz)) self.latest_action = action if self.relative: action = self.agent.position + action * self.action_scale action = np.clip(action, 0, self.window_size) for _ in range(n_steps): # Step PD control. acceleration = self.k_p * (action - self.agent.position) + self.k_v * (Vec2d(0, 0) - self.agent.velocity) self.agent.velocity += acceleration * self.dt # Step physics. self.space.step(self.dt) # make the observation state = self._get_obs() # print(state) proprio = np.concatenate((state[:2], state[-2:])) observation = {"proprio": proprio, "state": state} # collect info info = self._get_info() # compute reward and termination terminated, distance = self.eval_state(self.goal_state, state) reward = -distance # the closer the better truncated = False return observation, reward, terminated, truncated, info
[docs] def eval_state(self, goal_state, cur_state): # success if position difference is < 20, and angle difference < np.pi/9 pos_diff = np.linalg.norm(goal_state[:4] - cur_state[:4]) angle_diff = np.abs(goal_state[4] - cur_state[4]) angle_diff = np.minimum(angle_diff, 2 * np.pi - angle_diff) success = pos_diff < 20 and angle_diff < np.pi / 9 state_dist = np.linalg.norm(goal_state - cur_state) return success, state_dist
[docs] def render(self): return self._render_frame(self.render_mode)
def _get_obs(self): obs = ( tuple(self.agent.position) + tuple(self.block.position) + (self.block.angle % (2 * np.pi),) + tuple(self.agent.velocity) ) return np.array(obs, dtype=np.float64) def _get_goal_pose_body(self, pose): mass = 1 inertia = pymunk.moment_for_box(mass, (50, 100)) body = pymunk.Body(mass, inertia) # preserving the legacy assignment order for compatibility # the order here doesn't matter somehow, maybe because CoM is aligned with body origin body.position = pose[:2].tolist() body.angle = pose[2] return body def _get_info(self): n_steps = int(1 / self.dt * self.control_hz) n_contact_points_per_step = int(np.ceil(self.n_contact_points / n_steps)) goal_proprio = np.concatenate((self.goal_state[:2], self.goal_state[-2:])) info = { "pos_agent": np.array(self.agent.position), "vel_agent": np.array(self.agent.velocity), "block_pose": np.array(list(self.block.position) + [self.block.angle]), "goal_pose": self.goal_pose, "goal_state": self.goal_state, "goal_proprio": goal_proprio, "n_contacts": n_contact_points_per_step, "goal": self._goal, } return info def _render_frame(self, mode): if self.window is None and mode == "human": pygame.init() pygame.display.init() self.window = pygame.display.set_mode((self.window_size, self.window_size)) if self.clock is None and mode == "human": self.clock = pygame.time.Clock() canvas = pygame.Surface((self.window_size, self.window_size)) canvas.fill(self.variation_space["background"]["color"].value) self.screen = canvas draw_options = DrawOptions(canvas) # Draw goal pose. goal_body = self._get_goal_pose_body(self.goal_pose) for shape in self.block.shapes: if isinstance(shape, pymunk.Circle): center_pg = pymunk.pygame_util.to_pygame(goal_body.local_to_world(shape.offset), draw_options.surface) pygame.draw.circle( canvas, self.variation_space["goal"]["color"].value, (int(center_pg[0]), int(center_pg[1])), int(shape.radius), ) else: goal_points = [ pymunk.pygame_util.to_pygame(goal_body.local_to_world(v), draw_options.surface) for v in shape.get_vertices() ] goal_points += [goal_points[0]] pygame.draw.polygon( canvas, self.variation_space["goal"]["color"].value, goal_points, ) # change agent color self._set_body_color(self.agent, self.variation_space["agent"]["color"].value.tolist()) # change block color self._set_body_color(self.block, self.variation_space["block"]["color"].value.tolist()) # Draw agent and block. self.space.debug_draw(draw_options) if mode == "human": # The following line copies our drawings from `canvas` to the visible window self.window.blit(canvas, canvas.get_rect()) pygame.event.pump() pygame.display.update() # the clock is already ticked during in step for "human" img = np.transpose(np.array(pygame.surfarray.pixels3d(canvas)), axes=(1, 0, 2)) img = cv2.resize(img, (self.render_size, self.render_size)) if self.render_action: if self.render_action and (self.latest_action is not None): action = np.array(self.latest_action) coord = (action / 512 * 96).astype(np.int32) marker_size = int(8 / 96 * self.render_size) thickness = int(1 / 96 * self.render_size) cv2.drawMarker( img, coord, color=(255, 0, 0), markerType=cv2.MARKER_CROSS, markerSize=marker_size, thickness=thickness, ) return img def _set_body_color(self, body, color): color = pygame.Color(*color) if not isinstance(color, pygame.Color) else color for s in body.shapes: s.color = color
[docs] def close(self): if self.window is not None: pygame.display.quit() pygame.quit()
def _handle_collision(self, arbiter, space, data): self.n_contact_points += len(arbiter.contact_point_set.points) def _set_state(self, state): if isinstance(state, np.ndarray): state = state.tolist() pos_agent = state[:2] pos_block = state[2:4] rot_block = state[4] vel_block = tuple(state[-2:]) self.agent.velocity = vel_block self.agent.position = pos_agent self.block.angle = rot_block self.block.position = pos_block # Run physics to take effect self.space.step(self.dt) def _setup(self): ## create the space with physics self.space = pymunk.Space() self.space.gravity = 0, 0 # TODO add physics support self.space.damping = 0 self.render_buffer = [] # Add walls. walls = [ self._add_segment((5, 506), (5, 5), 2), self._add_segment((5, 5), (506, 5), 2), self._add_segment((506, 5), (506, 506), 2), self._add_segment((5, 506), (506, 506), 2), ] self.space.add(*walls) #### agent #### agent_params = { "position": self.variation_space["agent"]["start_position"].value.tolist(), "angle": self.variation_space["agent"]["angle"].value, "scale": self.variation_space["agent"]["scale"].value, "color": self.variation_space["agent"]["color"].value.tolist(), "shape": self.shapes[self.variation_space["agent"]["shape"].value], } self.agent = self.add_shape(**agent_params) #### block #### block_params = { "position": self.variation_space["block"]["start_position"].value.tolist(), "angle": self.variation_space["block"]["angle"].value, "scale": self.variation_space["block"]["scale"].value, "color": self.variation_space["block"]["color"].value.tolist(), "shape": self.shapes[self.variation_space["block"]["shape"].value], } self.block = self.add_shape(**block_params) self.goal_pose = np.concatenate( [ self.variation_space["goal"]["position"].value, [self.variation_space["goal"]["angle"].value], ] ) # Add collision handling self.space.on_collision(0, 0, post_solve=self._handle_collision) self.n_contact_points = 0 self.max_score = 50 * 100 self.success_threshold = 0.95 # 95% coverage. def _add_segment(self, a, b, radius): shape = pymunk.Segment(self.space.static_body, a, b, radius) shape.color = pygame.Color("LightGray") # https://htmlcolorcodes.com/color-names return shape
[docs] def add_circle( self, position, angle=0, scale=1, color="RoyalBlue", ): base_radius = 0.375 body = pymunk.Body(body_type=pymunk.Body.KINEMATIC) body.position = position body.friction = 1 shape = pymunk.Circle(body, base_radius * scale) shape.color = pygame.Color(color) self.space.add(body, shape) return body
[docs] def add_box(self, position, height, width, color="LightSlateGray", scale=1, angle=0): mass = 1 inertia = pymunk.moment_for_box(mass, (height * scale, width * scale)) body = pymunk.Body(mass, inertia) body.position = position shape = pymunk.Poly.create_box(body, (height * scale, width * scale)) shape.color = pygame.Color(color) self.space.add(body, shape) return body
[docs] def add_tee( self, position, angle, scale=30, color="LightSlateGray", mask=pymunk.ShapeFilter.ALL_MASKS(), ): scale = 30 mass = 1 length = 4 vertices1 = [ (-length * scale / 2, scale), (length * scale / 2, scale), (length * scale / 2, 0), (-length * scale / 2, 0), ] inertia1 = pymunk.moment_for_poly(mass, vertices=vertices1) vertices2 = [ (-scale / 2, scale), (-scale / 2, length * scale), (scale / 2, length * scale), (scale / 2, scale), ] inertia2 = pymunk.moment_for_poly(mass, vertices=vertices1) body = pymunk.Body(mass, inertia1 + inertia2) shape1 = pymunk.Poly(body, vertices1) shape2 = pymunk.Poly(body, vertices2) shape1.color = pygame.Color(color) shape2.color = pygame.Color(color) shape1.filter = pymunk.ShapeFilter(mask=mask) shape2.filter = pymunk.ShapeFilter(mask=mask) body.center_of_gravity = (shape1.center_of_gravity + shape2.center_of_gravity) / 2 body.position = position body.angle = angle body.friction = 1 self.space.add(body, shape1, shape2) return body
[docs] def add_small_tee( self, position, angle, scale=30, color="LightSlateGray", mask=pymunk.ShapeFilter.ALL_MASKS(), ): mass = 1 vertices1 = [ (-3 * scale / 2, scale), (3 * scale / 2, scale), (3 * scale / 2, 0), (-3 * scale / 2, 0), ] inertia1 = pymunk.moment_for_poly(mass, vertices=vertices1) vertices2 = [ (-scale / 2, scale), (-scale / 2, 2 * scale), (scale / 2, 2 * scale), (scale / 2, scale), ] inertia2 = pymunk.moment_for_poly(mass, vertices=vertices2) body = pymunk.Body(mass, inertia1 + inertia2) shape1 = pymunk.Poly(body, vertices1) shape2 = pymunk.Poly(body, vertices2) shape1.color = pygame.Color(color) shape2.color = pygame.Color(color) shape1.filter = pymunk.ShapeFilter(mask=mask) shape2.filter = pymunk.ShapeFilter(mask=mask) body.center_of_gravity = (shape1.center_of_gravity + shape2.center_of_gravity) / 2 body.position = position body.angle = angle body.friction = 1 self.space.add(body, shape1, shape2) return body
[docs] def add_plus( self, position, angle, scale=30, color="LightSlateGray", mask=pymunk.ShapeFilter.ALL_MASKS(), ): mass = 1 vertices1 = [ (-3 * scale / 2, scale / 2), (3 * scale / 2, scale / 2), (3 * scale / 2, -scale / 2), (-3 * scale / 2, -scale / 2), ] inertia1 = pymunk.moment_for_poly(mass, vertices=vertices1) vertices2 = [ (-scale / 2, scale / 2), (-scale / 2, 3 * scale / 2), (scale / 2, scale / 2), (scale / 2, 3 * scale / 2), ] vertices3 = [ (-scale / 2, -scale / 2), (-scale / 2, -3 * scale / 2), (scale / 2, -scale / 2), (scale / 2, -3 * scale / 2), ] inertia2 = pymunk.moment_for_poly(mass, vertices=vertices1) inertia3 = pymunk.moment_for_poly(mass, vertices=vertices1) body = pymunk.Body(mass, inertia1 + inertia2 + inertia3) shape1 = pymunk.Poly(body, vertices1) shape2 = pymunk.Poly(body, vertices2) shape3 = pymunk.Poly(body, vertices3) shape1.color = pygame.Color(color) shape2.color = pygame.Color(color) shape3.color = pygame.Color(color) shape1.filter = pymunk.ShapeFilter(mask=mask) shape2.filter = pymunk.ShapeFilter(mask=mask) shape3.filter = pymunk.ShapeFilter(mask=mask) body.center_of_gravity = (shape1.center_of_gravity + shape2.center_of_gravity + shape3.center_of_gravity) / 3 body.position = position body.angle = angle body.friction = 1 self.space.add(body, shape1, shape2, shape3) return body
[docs] def add_L( self, position, angle, scale=30, color="LightSlateGray", mask=pymunk.ShapeFilter.ALL_MASKS(), ): mass = 1 length = 2 vertices1 = [ (0, 0), (0, scale * length), (scale * length / 2, scale * length), (scale * length / 2, 0), ] inertia1 = pymunk.moment_for_poly(mass, vertices=vertices1) vertices2 = [ (0, 0), (scale * length, 0), (scale * length, -scale * length / 2), (0, -scale * length / 2), ] inertia2 = pymunk.moment_for_poly(mass, vertices=vertices2) body = pymunk.Body(mass, inertia1 + inertia2) shape1 = pymunk.Poly(body, vertices1) shape2 = pymunk.Poly(body, vertices2) shape1.color = pygame.Color(color) shape2.color = pygame.Color(color) shape1.filter = pymunk.ShapeFilter(mask=mask) shape2.filter = pymunk.ShapeFilter(mask=mask) body.center_of_gravity = (shape1.center_of_gravity + shape2.center_of_gravity) / 2 body.position = position body.angle = angle body.friction = 1 self.space.add(body, shape1, shape2) return body
[docs] def add_Z( self, position, angle, scale=30, color="LightSlateGray", mask=pymunk.ShapeFilter.ALL_MASKS(), ): mass = 1 length = 2 vertices1 = [ (0, 0), (0, length * scale / 2), (length * scale, length * scale / 2), (length * scale, 0), ] inertia1 = pymunk.moment_for_poly(mass, vertices=vertices1) vertices2 = [ (-length * scale / 2, 0), (length * scale / 2, 0), (length * scale / 2, -length * scale / 2), (-length * scale / 2, -length * scale / 2), ] inertia2 = pymunk.moment_for_poly(mass, vertices=vertices2) body = pymunk.Body(mass, inertia1 + inertia2) shape1 = pymunk.Poly(body, vertices1) shape2 = pymunk.Poly(body, vertices2) shape1.color = pygame.Color(color) shape2.color = pygame.Color(color) shape1.filter = pymunk.ShapeFilter(mask=mask) shape2.filter = pymunk.ShapeFilter(mask=mask) body.center_of_gravity = (shape1.center_of_gravity + shape2.center_of_gravity) / 2 body.position = position body.angle = angle body.friction = 1 self.space.add(body, shape1, shape2) return body
[docs] def add_square( self, position, angle, scale=30, color="LightSlateGray", mask=pymunk.ShapeFilter.ALL_MASKS(), ): mass = 1 vertices1 = [ (-scale, -scale), (-scale, scale), (scale, scale), (scale, -scale), ] inertia1 = pymunk.moment_for_poly(mass, vertices=vertices1) body = pymunk.Body(mass, inertia1) shape1 = pymunk.Poly(body, vertices1) shape1.color = pygame.Color(color) shape1.filter = pymunk.ShapeFilter(mask=mask) body.center_of_gravity = shape1.center_of_gravity body.position = position body.angle = angle body.friction = 1 self.space.add(body, shape1) return body
[docs] def add_I( self, position, angle, scale=30, color="LightSlateGray", mask=pymunk.ShapeFilter.ALL_MASKS(), ): mass = 1 vertices1 = [ (-scale / 2, -scale * 2), (-scale / 2, scale * 2), (scale / 2, scale * 2), (scale / 2, -scale * 2), ] inertia1 = pymunk.moment_for_poly(mass, vertices=vertices1) body = pymunk.Body(mass, inertia1) shape1 = pymunk.Poly(body, vertices1) shape1.color = pygame.Color(color) shape1.filter = pymunk.ShapeFilter(mask=mask) body.center_of_gravity = shape1.center_of_gravity body.position = position body.angle = angle body.friction = 1 self.space.add(body, shape1) return body
[docs] def add_shape(self, shape, *args, **kwargs): # Dispatch method based on the 'shape' parameter if shape == "L": return self.add_L(*args, **kwargs) elif shape == "T": return self.add_tee(*args, **kwargs) elif shape == "Z": return self.add_Z(*args, **kwargs) elif shape == "o": return self.add_circle(*args, **kwargs) elif shape == "square": return self.add_square(*args, **kwargs) elif shape == "I": return self.add_I(*args, **kwargs) elif shape == "small_tee": return self.add_small_tee(*args, **kwargs) if shape == "+": return self.add_plus(*args, **kwargs) else: raise ValueError(f"Unknown shape type: {shape}")
[docs] def fix_action_sample(self): logging.warning( "The action space sample method is being overridden to improve sampling. " "This is a temporary fix and will be removed in future versions." ) # Save original sample method self.original_sample = self.action_space.sample def better_sample(): # sample in a 100x100 box around the block block_pos = np.array((self.block.position.x, self.block.position.y)) action = self.rng.uniform(block_pos - 50, block_pos + 50) - self.agent.position # Clip to action space bounds action = np.clip(action, 0, self.window_size) return action # Override with new method self.action_space.sample = better_sample