import math
from collections.abc import Sequence
import cv2
import gymnasium as gym
import numpy as np
import pygame
import pymunk
from gymnasium import spaces
from pymunk.vec2d import Vec2d
import stable_worldmodel as swm
from .utils import DrawOptions, light_color, pymunk_to_shapely, to_pygame
# Variation names handed to ``variation_space.update`` by ``TwoRoomEnv.reset``
# when the ``options`` dict carries no "variation" entry.
DEFAULT_VARIATIONS = ("agent.position", "goal.position")
[docs]
class TwoRoomEnv(gym.Env):
"""A simple navigation two-room environment.

A circular agent moves in a square arena that a wall with one or more doors
splits into two rooms, and has to overlap a circular goal.

Observations are a dict with two float64 vectors:
``"state"`` = (agent_x, agent_y, goal_x, goal_y, energy, speed) and
``"proprio"`` = (agent_x, agent_y, energy, speed).
Actions are 2-D vectors in ``[-1, 1]``.

An episode terminates once at least half of the goal or half of the agent
is covered by the other shape, and is truncated when the energy budget
(decremented by one per step) reaches zero.
"""
# Gymnasium metadata; "render_fps" is also used as the control frequency.
metadata = {
"render_modes": ["human", "rgb_array"],
"render_fps": 10,
}
def __init__(
    self,
    render_size=224,
    render_mode="rgb_array",
):
    """Build the observation, action and variation spaces of the environment.

    No physics objects are created here; ``_setup`` (called from ``reset``)
    builds them from the current variation values.

    Args:
        render_size: Side length, in pixels, of the square image returned by
            ``render``.
        render_mode: One of ``metadata["render_modes"]``.
    """
    # gym
    assert render_mode in self.metadata["render_modes"]
    self.render_mode = render_mode

    # render
    self.window_size = 512
    self.border_size = bs = 9
    self.energy_bound = 200
    self.size = self.window_size - 2 * self.border_size
    self.render_size = render_size

    # physics
    self.control_hz = self.metadata["render_fps"]
    self.dt = 0.01

    # attributes
    self.max_door = 3
    self.max_speed = 20.0
    self.wall_pos = math.ceil(self.size / 2)
    self.max_step_norm = 2.45

    # Both observations end with (energy, speed). Energy is decremented down
    # to 0 by ``step`` and speed is drawn from [10, max_speed], so the two
    # boxes must share these lower bounds; otherwise late-episode states fall
    # outside the declared "state" space.
    min_energy, min_speed = 0, 10
    tail_high = [self.energy_bound, self.max_speed]
    self.observation_space = spaces.Dict(
        {
            "proprio": spaces.Box(
                low=np.array([bs, bs, min_energy, min_speed]),
                high=np.array(2 * [self.size] + tail_high),
                dtype=np.float64,
            ),
            "state": spaces.Box(
                low=np.array([bs, bs, bs, bs, min_energy, min_speed]),
                high=np.array(4 * [self.size] + tail_high),
                dtype=np.float64,
            ),
        }
    )

    # gym spaces
    self.action_space = spaces.Box(low=-1.0, high=1.0, shape=(2,), dtype=np.float32)

    # variation space
    self.variation_space = swm.spaces.Dict(
        {
            "agent": swm.spaces.Dict(
                {
                    "color": swm.spaces.RGBBox(init_value=np.array([255, 0, 0], dtype=np.uint8)),
                    "radius": swm.spaces.Box(
                        low=np.array([15], dtype=np.float32),
                        high=np.array([30], dtype=np.float32),
                        init_value=np.array([15], dtype=np.float32),
                        shape=(1,),
                        dtype=np.float32,
                    ),
                    "position": swm.spaces.Box(
                        low=np.array([bs, bs], dtype=np.float32),
                        high=np.array([self.size, self.size], dtype=np.float32),
                        shape=(2,),
                        dtype=np.float32,
                        init_value=np.array([50.0, 50.0], dtype=np.float32),
                        # the agent must not start inside a border or the wall
                        constrain_fn=lambda x: not self.check_collide(x, entity="agent"),
                    ),
                    "max_energy": swm.spaces.Discrete(self.energy_bound - 50, start=50, init_value=100),
                    "speed": swm.spaces.Box(
                        low=np.array([10], dtype=np.float32),
                        high=np.array([self.max_speed], dtype=np.float32),
                        init_value=np.array([10.0], dtype=np.float32),
                        shape=(1,),
                        dtype=np.float32,
                    ),
                },
                sampling_order=[
                    "color",
                    "radius",
                    "position",
                    "max_energy",
                    "speed",
                ],
            ),
            "goal": swm.spaces.Dict(
                {
                    "color": swm.spaces.RGBBox(init_value=np.array([0, 255, 0], dtype=np.uint8)),
                    "radius": swm.spaces.Box(
                        low=np.array([15], dtype=np.float32),
                        high=np.array([30], dtype=np.float32),
                        init_value=np.array([15], dtype=np.float32),
                        shape=(1,),
                        dtype=np.float32,
                    ),
                    # the goal must be collision-free and in the room the
                    # agent is not in
                    "position": swm.spaces.Box(
                        low=np.array([bs, bs], dtype=np.float32),
                        high=np.array([self.size, self.size], dtype=np.float32),
                        shape=(2,),
                        dtype=np.float32,
                        init_value=np.array([450.0, 450.0], dtype=np.float32),
                        constrain_fn=lambda x: not self.check_collide(x, entity="goal")
                        and self.check_other_room(x),
                    ),
                },
                sampling_order=["color", "radius", "position"],
            ),
            "wall": swm.spaces.Dict(
                {
                    "color": swm.spaces.RGBBox(init_value=np.array([115, 127, 145], dtype=np.uint8)),
                    "thickness": swm.spaces.Discrete(25, start=9, init_value=19),
                    # 0: horizontal, 1: vertical
                    "axis": swm.spaces.Discrete(2, init_value=1),
                    # "position": swm.spaces.Discrete(
                    #     self.size,
                    #     init_value=self.size // 2,
                    # ),
                    "border_color": swm.spaces.RGBBox(init_value=np.array([180, 189, 204], dtype=np.uint8)),
                },
                sampling_order=["color", "border_color", "thickness", "axis"],
            ),
            "door": swm.spaces.Dict(
                {
                    "color": swm.spaces.RGBBox(init_value=np.array([255, 255, 255], dtype=np.uint8)),
                    "number": swm.spaces.Discrete(self.max_door, start=1, init_value=1),
                    # at least one of the active doors must let the agent through
                    "size": swm.spaces.MultiDiscrete(
                        nvec=[50] * self.max_door,
                        start=[35] * self.max_door,
                        init_value=[75] * self.max_door,
                        constrain_fn=self.check_one_door_fit,
                    ),
                    "position": swm.spaces.MultiDiscrete(
                        nvec=[self.size] * self.max_door,
                        init_value=[self.size // 2] * self.max_door,
                    ),
                },
                sampling_order=["color", "number", "size", "position"],
            ),
            "background": swm.spaces.Dict(
                {
                    "color": swm.spaces.RGBBox(init_value=np.array([255, 255, 255], dtype=np.uint8)),
                }
            ),
        },
        sampling_order=["background", "wall", "agent", "door", "goal"],
    )

    # rendering handles, created lazily by ``_render_frame``
    self.window = None
    self.clock = None
    self.screen = None
[docs]
def reset(self, seed=None, options=None):
    """Start a new episode and return ``(observation, info)``.

    Args:
        seed: Seed forwarded to ``gym.Env.reset`` and to the variation space.
        options: Optional dict; its "variation" entry (a sequence of
            variation names, ``DEFAULT_VARIATIONS`` when absent) is passed to
            ``variation_space.update``.

    Raises:
        ValueError: If the "variation" option is not a ``Sequence``.
    """
    super().reset(seed=seed, options=options)
    if hasattr(self, "variation_space"):
        self.variation_space.seed(seed)

    opts = options or {}
    self.variation_space.reset()
    requested = opts.get("variation", DEFAULT_VARIATIONS)
    if not isinstance(requested, Sequence):
        raise ValueError("variation option must be a Sequence containing variations names to sample")
    self.variation_space.update(requested)
    assert self.variation_space.check(debug=True), "Variation values must be within variation space!"

    self._setup()

    # Render the goal image with the agent placed on top of the goal.
    goal_xy = self.variation_space["goal"]["position"].value
    self._set_state(np.concatenate([goal_xy, goal_xy]))
    self._goal = self.render()

    # Put the agent back at its sampled start position.
    agent_xy = self.variation_space["agent"]["position"].value
    self._set_state(np.concatenate([agent_xy, goal_xy]))

    # "proprio" keeps the agent position and the trailing (energy, speed).
    state = self._get_obs()
    observation = {
        "proprio": np.concatenate((state[:2], state[-2:])),
        "state": state,
    }

    info = self._get_info()
    info.update(fraction_of_goal=0.0, fraction_of_agent=0.0)
    return observation, info
[docs]
def step(self, action):
    """Advance the simulation by one control step.

    Args:
        action: 2-D displacement command (array-like). Its norm is clipped
            to ``max_step_norm`` before being turned into a velocity.

    Returns:
        ``(observation, reward, terminated, truncated, info)`` where
        ``reward`` is 1.0 on termination and -0.01 otherwise, ``terminated``
        is set when at least half of the goal or of the agent is covered by
        the other shape, and ``truncated`` is set when energy reaches zero.
    """
    self.n_contact_points = 0
    n_steps = int(1 / (self.dt * self.control_hz))
    control_period = n_steps * self.dt

    # Accept lists/tuples as well as arrays; the arithmetic below needs an
    # ndarray. ``asarray`` leaves an existing array (and its dtype) untouched.
    action = np.asarray(action)
    action_norm = np.linalg.norm(action)
    if action_norm > self.max_step_norm:
        action = (action / action_norm) * self.max_step_norm
    velocity = action / control_period
    speed = self.variation_space["agent"]["speed"].value.item()
    self.latest_action = action

    # The commanded velocity is constant over the control step, but it is
    # re-applied before every physics sub-step because the solver may have
    # modified the body's velocity in between.
    commanded = Vec2d(0, 0) + velocity * speed
    for _ in range(n_steps):
        self.agent.velocity = commanded
        self.space.step(self.dt)

    self.energy -= 1  # TODO energy proportional to action norm?

    state = self._get_obs()
    proprio = np.concatenate((state[:2], state[-2:]))
    observation = {
        "proprio": proprio,
        "state": state,
    }
    info = self._get_info()

    # termination: overlap between the goal and agent shapes
    goal_geom = pymunk_to_shapely(self.goal, self.goal.shapes)
    agent_geom = pymunk_to_shapely(self.agent, self.agent.shapes)
    intersection_area = goal_geom.intersection(agent_geom).area
    fraction_of_goal = intersection_area / goal_geom.area
    fraction_of_agent = intersection_area / agent_geom.area
    info["fraction_of_goal"] = fraction_of_goal
    info["fraction_of_agent"] = fraction_of_agent

    terminated = (
        fraction_of_goal >= 0.5  # at least 50% of goal covered
        or fraction_of_agent >= 0.5  # at least 50% of agent inside
    )
    truncated = self.energy <= 0
    reward = 1.0 if terminated else -0.01
    return observation, reward, terminated, truncated, info
# def add_circle(
# self,
# position,
# radius,
# color,
# ):
# body = pymunk.Body(body_type=pymunk.Body.DYNAMIC)
# body.position = position
# body.friction = 1
# shape = pymunk.Circle(body, radius)
# shape.color = pygame.Color(color)
# self.space.add(body, shape)
# return body
[docs]
def add_circle(self, position, radius, color, *, is_goal=False):
    """Create a circular body and return it.

    A regular circle is a dynamic body of mass 1 that is added to the space.
    A goal circle is a kinematic body whose shape is a sensor, and it is NOT
    added to the space.

    Args:
        position: Initial body position.
        radius: Circle radius.
        color: Value accepted by ``pygame.Color``.
        is_goal: Whether the circle represents the goal.

    Returns:
        The created ``pymunk.Body``.
    """
    if is_goal:
        body = pymunk.Body(body_type=pymunk.Body.KINEMATIC)
    else:
        unit_mass = 1.0
        inertia = pymunk.moment_for_circle(unit_mass, 0, radius)
        body = pymunk.Body(unit_mass, inertia, body_type=pymunk.Body.DYNAMIC)
    body.position = position
    body.friction = 1

    circle = pymunk.Circle(body, radius)
    circle.sensor = is_goal
    circle.color = pygame.Color(color)
    circle.friction = 0.8
    circle.elasticity = 0.0

    if is_goal:
        return body
    self.space.add(body, circle)
    return body
def _add_segment(self, a, b, size, color, collision=True):
    """Build a rectangular static polygon of width ``size`` running from ``a`` to ``b``.

    The shape is attached to the space's static body but not added to the
    space; the caller decides whether to add it. With ``collision=False`` the
    shape is flagged as a sensor.
    """
    start = Vec2d(*a)
    end = Vec2d(*b)
    direction = (end - start).normalized()
    half_width = direction.perpendicular() * (size / 2)
    corners = [
        start + half_width,
        end + half_width,
        end - half_width,
        start - half_width,
    ]
    poly = pymunk.Poly(self.space.static_body, corners)
    poly.color = pygame.Color(color)
    poly.sensor = not collision
    poly.z_order = 1
    return poly
def _setup(self):
    """Rebuild the pymunk space from the current variation values.

    Creates a fresh space, the dividing wall with its door openings, the
    four borders, the agent and the goal, resets the energy budget and
    registers the collision callback that counts contact points.
    """
    self.space = pymunk.Space()
    self.space.gravity = 0, 0
    self.space.damping = 0
    self.render_buffer = []

    # -- wall and doors
    wall_color = self.variation_space["wall"]["color"].value.tolist()
    wall_thickness = self.variation_space["wall"]["thickness"].value
    wall_axis = self.variation_space["wall"]["axis"].value
    door_number = self.variation_space["door"]["number"].value
    door_positions = self.variation_space["door"]["position"].value[:door_number]
    door_sizes = self.variation_space["door"]["size"].value[:door_number]
    door_color = self.variation_space["door"]["color"].value.tolist()

    # Sort doors along the wall and fuse the ones that touch or overlap into
    # a single opening, so that no solid wall piece ends up across a door.
    openings = []
    for pos, size in sorted(zip(door_positions, door_sizes), key=lambda d: d[0]):
        if openings and pos <= openings[-1][0] + openings[-1][1]:
            start, previous_size = openings[-1]
            new_end = max(start + previous_size, pos + size)
            openings[-1] = (start, new_end - start)
        else:
            openings.append((pos, size))

    def pt(t):
        # Map an offset along the wall to a point, depending on orientation.
        return (self.wall_pos, self.border_size + t) if wall_axis == 1 else (self.border_size + t, self.wall_pos)

    wall_segments = []
    door_segments = []
    current = 0
    for pos, size in openings:
        door_end = pos + size
        door = self._add_segment(
            pt(pos),
            pt(door_end),
            wall_thickness,
            door_color,
            collision=False,
        )
        door_segments.append(door)
        # Solid wall between the previous opening and this one; skipped when
        # the span is empty (e.g. a door starting at the very beginning).
        wall_end = pos - 1
        if wall_end > current:
            wall_segments.append(self._add_segment(pt(current), pt(wall_end), wall_thickness, wall_color))
        current = door_end + 1

    # last wall segment, unless the final opening already reaches the end
    if self.size > current:
        wall_segments.append(self._add_segment(pt(current), pt(self.size), wall_thickness, wall_color))

    # Doors are kept for rendering only; they are never added to the space.
    self.doors = door_segments
    self.space.add(*wall_segments)

    # -- border
    border_dict = {
        "bottom": ((0, 0), (self.window_size - 1, 0)),
        "left": ((0, 0), (0, self.window_size)),
        "right": ((self.window_size, 0), (self.window_size - 1, self.window_size)),
        "top": ((0, self.window_size), (self.window_size, self.window_size)),
    }
    border_color = self.variation_space["wall"]["border_color"].value.tolist()
    border = [self._add_segment(a, b, self.border_size, border_color) for (a, b) in border_dict.values()]
    self.space.add(*border)

    # -- agent
    agent_pos = self.variation_space["agent"]["position"].value.tolist()
    agent_radius = self.variation_space["agent"]["radius"].value.item()
    agent_color = self.variation_space["agent"]["color"].value.tolist()
    self.agent = self.add_circle(agent_pos, agent_radius, agent_color)

    # -- goal
    goal_pos = self.variation_space["goal"]["position"].value.tolist()
    goal_radius = self.variation_space["goal"]["radius"].value.item()
    goal_color = self.variation_space["goal"]["color"].value.tolist()
    self.goal = self.add_circle(goal_pos, goal_radius, goal_color, is_goal=True)

    # -- energy
    self.energy = self.variation_space["agent"]["max_energy"].value

    # add collision handler
    self.space.on_collision(0, 0, post_solve=self._handle_collision)
    self.n_contact_points = 0
def _set_state(self, state):
if isinstance(state, np.ndarray):
state = state.tolist()
pos_agent = state[:2]
pos_goal = state[2:4]
# energy = state[-1]
self.agent.position = pos_agent
self.goal.position = pos_goal
self.space.step(self.dt)
def _get_obs(self):
speed = self.variation_space["agent"]["speed"].value.item()
obs = tuple(self.agent.position) + tuple(self.goal.position) + (self.energy, speed)
return np.array(obs, dtype=np.float64)
def _get_info(self):
n_steps = int(1 / self.dt * self.control_hz)
n_contact_points_per_step = int(np.ceil(self.n_contact_points / n_steps))
info = {
"pos_agent": np.array(self.agent.position),
"pos_goal": np.array(self.goal.position),
"n_contacts": n_contact_points_per_step,
"goal_pos": self.variation_space["goal"]["position"].value,
"goal": self._goal,
"energy": self.energy,
"max_energy": self.variation_space["agent"]["max_energy"].value,
}
return info
def _set_body_color(self, body, color):
    """Assign ``color`` to every shape attached to ``body``.

    ``color`` may be a ``pygame.Color`` or a sequence of channel values that
    is unpacked into one.
    """
    if not isinstance(color, pygame.Color):
        color = pygame.Color(*color)
    for shape in body.shapes:
        shape.color = color
[docs]
def render(self):
    """Render the current scene using the mode chosen at construction time."""
    frame = self._render_frame(self.render_mode)
    return frame
def _get_pose_body(self, pose):
    """Return a free-standing body placed at ``pose`` = (x, y, angle).

    The body has unit mass and the moment of inertia of a 50x100 box; it is
    not added to the space.
    """
    unit_mass = 1
    box_moment = pymunk.moment_for_box(unit_mass, (50, 100))
    pose_body = pymunk.Body(unit_mass, box_moment)
    xy = pose[:2].tolist()
    pose_body.position = xy
    pose_body.angle = pose[2]
    return pose_body
def _render_frame(self, mode):
    """Draw the scene on an off-screen surface and return it as an image.

    In "human" mode the pygame window and clock are created on first use.
    The returned value is always an array of shape
    ``(render_size, render_size, 3)`` resized with OpenCV.
    """
    if mode == "human":
        if self.window is None:
            pygame.init()
            pygame.display.init()
            self.window = pygame.display.set_mode((self.window_size, self.window_size))
        if self.clock is None:
            self.clock = pygame.time.Clock()

    side = self.window_size
    canvas = pygame.Surface((side, side))
    canvas.fill(self.variation_space["background"]["color"].value)
    self.screen = canvas
    draw_options = DrawOptions(canvas)

    goal_rgb = self.variation_space["goal"]["color"].value.tolist()
    self._set_body_color(self.goal, goal_rgb)

    # Doors are not part of the space, so they are drawn by hand.
    for door in self.doors:
        outline = []
        for vertex in door.get_vertices():
            world_point = door.body.local_to_world(vertex)
            outline.append(pymunk.pygame_util.to_pygame(world_point, draw_options.surface))
        outline.append(outline[0])  # close shape
        pygame.draw.polygon(canvas, door.color, outline)

    # The goal is also outside the space: a filled disc with a lighter core.
    for shape in self.goal.shapes:
        center = to_pygame(self.goal.position, draw_options.surface)
        pygame.draw.circle(canvas, shape.color, center, round(shape.radius), 0)
        inner_color = light_color(shape.color).as_int()
        pygame.draw.circle(canvas, inner_color, center, round(shape.radius - 4), 0)

    agent_rgb = self.variation_space["agent"]["color"].value.tolist()
    self._set_body_color(self.agent, agent_rgb)

    # Everything that lives in the space (walls, borders, agent).
    self.space.debug_draw(draw_options)

    # pygame surfaces are indexed (x, y); swap to (row, col) before resizing.
    pixels = np.array(pygame.surfarray.pixels3d(canvas))
    img = np.transpose(pixels, axes=(1, 0, 2))
    return cv2.resize(img, (self.render_size, self.render_size))
def _handle_collision(self, arbiter, space, data):
self.n_contact_points += len(arbiter.contact_point_set.points)
[docs]
def seed(self, seed=None):
    """Seed the environment's random generators and all of its spaces.

    When ``seed`` is ``None`` a value in ``[0, 25536)`` is drawn from
    NumPy's global random state.
    """
    chosen = np.random.randint(0, 25536) if seed is None else seed
    self._seed = chosen
    self.np_random = np.random.default_rng(chosen)
    self.random_state = np.random.RandomState(chosen)
    for space in (self.observation_space, self.action_space, self.variation_space):
        space.seed(chosen)
[docs]
def close(self):
    """Shut down pygame if a window was opened.

    The window and clock handles are cleared afterwards so that calling
    ``close`` again is a no-op and a later human-mode render re-creates the
    display instead of reusing a dead one.
    """
    if self.window is not None:
        pygame.display.quit()
        pygame.quit()
        self.window = None
        self.clock = None
#### constraint functions for variation space ####
[docs]
def check_one_door_fit(self, x):
    """Return True if at least one active door is wide enough for the agent.

    Only the first ``door.number`` entries of ``x`` (door sizes) are
    considered; a door fits when its size is at least 2.5 times the agent
    radius.
    """
    current = self.variation_space.value
    active_doors = current["door"]["number"]
    min_width = 2.5 * current["agent"]["radius"].item()
    return any(width >= min_width for width in x[:active_doors])
[docs]
def check_other_room(self, x):
    """Return whether position ``x`` lies strictly on the opposite side of the wall from the agent."""
    current = self.variation_space.value
    agent_pos = current["agent"]["position"]
    wall_axis = current["wall"]["axis"]
    divider = self.wall_pos

    # A horizontal wall (axis 0) separates along y, a vertical one along x.
    coord = 0 if wall_axis != 0 else 1
    agent_c, other_c = agent_pos[coord], x[coord]

    agent_low_other_high = agent_c < divider and other_c > divider
    agent_high_other_low = agent_c > divider and other_c < divider
    return agent_low_other_high or agent_high_other_low
[docs]
def check_collide(self, x, entity="agent"):
    """Return True if a circle of ``entity``'s radius centred at ``x`` touches a border or the wall.

    Door openings are not taken into account: the wall is treated as a solid
    band of the configured thickness.
    """
    assert entity in ["agent", "goal"]
    cx, cy = x
    r = self.variation_space.value[entity]["radius"]

    # border: x is checked first, then y
    for centre in (cx, cy):
        if (centre - r) <= self.border_size or (centre + r) >= self.size:
            return True

    # dividing wall: a horizontal wall (axis 0) is compared along y,
    # a vertical one along x
    wall_axis = self.variation_space.value["wall"]["axis"]
    wall_pos = self.wall_pos
    wall_thickness = self.variation_space.value["wall"]["thickness"]
    across = cy if wall_axis == 0 else cx
    return bool(abs(across - wall_pos) <= (wall_thickness / 2 + r))
# if __name__ == "__main__":
# env = TwoRoomEnv()
# obs = env.reset(options={"variation": ["all"]})
# img = env.render()
# plt.imshow(img)
# plt.axis("off")
# plt.savefig("test.png")