# exp_type/sawyer_xyz_env_fixed.py
# (reconstructed from a unified diff; in this variant the task rand-vec is
# frozen, i.e. NOT re-randomized at each episode)

import abc
import copy
import pickle

from gym.spaces import Box
from gym.spaces import Discrete
import mujoco_py
import numpy as np

from metaworld.envs.mujoco.mujoco_env import MujocoEnv, _assert_task_is_set


class SawyerMocapBase(MujocoEnv, metaclass=abc.ABCMeta):
    """
    Provides some commonly-shared functions for Sawyer Mujoco envs that use
    mocap for XYZ control.
    """
    # Default mocap clipping bounds; subclasses/instances may override.
    mocap_low = np.array([-0.2, 0.5, 0.06])
    mocap_high = np.array([0.2, 0.7, 0.6])

    def __init__(self, model_name, frame_skip=5):
        MujocoEnv.__init__(self, model_name, frame_skip=frame_skip)
        self.reset_mocap_welds()

    def get_endeff_pos(self):
        """Return a copy of the end effector ('hand' body) world position."""
        return self.data.get_body_xpos('hand').copy()

    def get_env_state(self):
        """Return a deep-copied (joint_state, (mocap_pos, mocap_quat)) snapshot."""
        snapshot = (
            self.sim.get_state(),
            (self.data.mocap_pos, self.data.mocap_quat),
        )
        return copy.deepcopy(snapshot)

    def set_env_state(self, state):
        """Restore a snapshot produced by `get_env_state`."""
        joint_state, (mocap_pos, mocap_quat) = state
        self.sim.set_state(joint_state)
        self.data.set_mocap_pos('mocap', mocap_pos)
        self.data.set_mocap_quat('mocap', mocap_quat)
        self.sim.forward()

    def __getstate__(self):
        # model/sim/data hold C handles and are not picklable; serialize the
        # compiled model binary (mjb) and the env state instead.
        picklable = {
            key: value
            for key, value in self.__dict__.items()
            if key not in ('model', 'sim', 'data')
        }
        return {
            'state': picklable,
            'mjb': self.model.get_mjb(),
            'env_state': self.get_env_state(),
        }

    def __setstate__(self, state):
        # Rebuild the simulator from the serialized model binary, then
        # restore the saved env state on top of it.
        self.__dict__ = state['state']
        self.model = mujoco_py.load_model_from_mjb(state['mjb'])
        self.sim = mujoco_py.MjSim(self.model)
        self.data = self.sim.data
        self.set_env_state(state['env_state'])

    def reset_mocap_welds(self):
        """Resets the mocap welds that we use for actuation."""
        sim = self.sim
        if sim.model.nmocap > 0 and sim.model.eq_data is not None:
            for idx in range(sim.model.eq_data.shape[0]):
                if sim.model.eq_type[idx] != mujoco_py.const.EQ_WELD:
                    continue
                sim.model.eq_data[idx, :] = np.array(
                    [0., 0., 0., 1., 0., 0., 0.])
            sim.forward()


class SawyerXYZEnv(SawyerMocapBase, metaclass=abc.ABCMeta):
    _HAND_SPACE = Box(
        np.array([-0.525, .35, -.0525]),
        np.array([+0.525, 1.025, .525])
    )
    max_path_length = 200

    def __init__(
        self,
        model_name,
        frame_skip=5,
        hand_low=(-0.2, 0.55, 0.05),
        hand_high=(0.2, 0.75, 0.3),
        mocap_low=None,
        mocap_high=None,
        action_scale=1./100,
        action_rot_scale=1.,
    ):
        super().__init__(model_name, frame_skip=frame_skip)
        self.random_init = True
        self.action_scale = action_scale
        self.action_rot_scale = action_rot_scale
        self.hand_low = np.array(hand_low)
        self.hand_high = np.array(hand_high)
        # Mocap bounds default to the hand bounds when not given explicitly.
        mocap_low = hand_low if mocap_low is None else mocap_low
        mocap_high = hand_high if mocap_high is None else mocap_high
        self.mocap_low = np.hstack(mocap_low)
        self.mocap_high = np.hstack(mocap_high)
        self.curr_path_length = 0
        self._freeze_rand_vec = True
        self._last_rand_vec = None

        # We use continuous goal space by default and
        # can discretize the goal space by calling
        # the `discretize_goal_space` method.
        self.discrete_goal_space = None
        self.discrete_goals = []
        self.active_discrete_goal = None

        # 3 mocap deltas + 1 gripper effort, all in [-1, 1].
        self.action_space = Box(
            np.array([-1, -1, -1, -1]),
            np.array([+1, +1, +1, +1]),
        )

        self._pos_obj_max_len = 6
        self._pos_obj_possible_lens = (3, 6)

        self._set_task_called = False
        self._partially_observable = True

        self.hand_init_pos = None  # OVERRIDE ME
        self._target_pos = None  # OVERRIDE ME
        self._random_reset_space = None  # OVERRIDE ME

    def _set_task_inner(self):
        # Doesn't absorb "extra" kwargs, to ensure nothing's missed.
        pass
+ pass + + def set_task(self, task): + self._set_task_called = True + data = pickle.loads(task.data) + assert isinstance(self, data['env_cls']) + del data['env_cls'] + self._last_rand_vec = data['rand_vec'] + # This is the original file (Not random at each episode) + self._freeze_rand_vec = True + self._last_rand_vec = data['rand_vec'] + del data['rand_vec'] + self._partially_observable = data['partially_observable'] + del data['partially_observable'] + self._set_task_inner(**data) + + def set_xyz_action(self, action): + action = np.clip(action, -1, 1) + pos_delta = action * self.action_scale + new_mocap_pos = self.data.mocap_pos + pos_delta[None] + + new_mocap_pos[0, :] = np.clip( + new_mocap_pos[0, :], + self.mocap_low, + self.mocap_high, + ) + self.data.set_mocap_pos('mocap', new_mocap_pos) + self.data.set_mocap_quat('mocap', np.array([1, 0, 1, 0])) + + def discretize_goal_space(self, goals): + assert False + assert len(goals) >= 1 + self.discrete_goals = goals + # update the goal_space to a Discrete space + self.discrete_goal_space = Discrete(len(self.discrete_goals)) + + # Belows are methods for using the new wrappers. + # `sample_goals` is implmented across the sawyer_xyz + # as sampling from the task lists. This will be done + # with the new `discrete_goals`. After all the algorithms + # conform to this API (i.e. using the new wrapper), we can + # just remove the underscore in all method signature. + def sample_goals_(self, batch_size): + assert False + if self.discrete_goal_space is not None: + return [self.discrete_goal_space.sample() for _ in range(batch_size)] + else: + return [self.goal_space.sample() for _ in range(batch_size)] + + def set_goal_(self, goal): + assert False + if self.discrete_goal_space is not None: + self.active_discrete_goal = goal + self.goal = self.discrete_goals[goal] + self._target_pos_idx = np.zeros(len(self.discrete_goals)) + self._target_pos_idx[goal] = 1. 
+ else: + self.goal = goal + + def _set_obj_xyz(self, pos): + qpos = self.data.qpos.flat.copy() + qvel = self.data.qvel.flat.copy() + qpos[9:12] = pos.copy() + qvel[9:15] = 0 + self.set_state(qpos, qvel) + + def _get_site_pos(self, siteName): + _id = self.model.site_names.index(siteName) + return self.data.site_xpos[_id].copy() + + def _set_pos_site(self, name, pos): + """Sets the position of the site corresponding to `name` + + Args: + name (str): The site's name + pos (np.ndarray): Flat, 3 element array indicating site's location + """ + assert isinstance(pos, np.ndarray) + assert pos.ndim == 1 + + self.data.site_xpos[self.model.site_name2id(name)] = pos[:3] + + @property + def _target_site_config(self): + """Retrieves site name(s) and position(s) corresponding to env targets + + :rtype: list of (str, np.ndarray) + """ + return [('goal', self._target_pos)] + + def _get_pos_objects(self): + """Retrieves object position(s) from mujoco properties or instance vars + + Returns: + np.ndarray: Flat array (usually 3 elements) representing the + object(s)' position(s) + """ + # Throw error rather than making this an @abc.abstractmethod so that + # V1 environments don't have to implement it + raise NotImplementedError + + def _get_pos_goal(self): + """Retrieves goal position from mujoco properties or instance vars + + Returns: + np.ndarray: Flat array (3 elements) representing the goal position + """ + assert isinstance(self._target_pos, np.ndarray) + assert self._target_pos.ndim == 1 + return self._target_pos + + def _get_obs(self): + """Combines positions of the end effector, object(s) and goal into a + single flat observation + + Returns: + np.ndarray: The flat observation array (12 elements) + """ + pos_hand = self.get_endeff_pos() + + pos_obj_padded = np.zeros(self._pos_obj_max_len) + pos_obj = self._get_pos_objects() + assert len(pos_obj) in self._pos_obj_possible_lens + pos_obj_padded[:len(pos_obj)] = pos_obj + + pos_goal = self._get_pos_goal() + if 
self._partially_observable: + pos_goal = np.zeros_like(pos_goal) + + return np.hstack((pos_hand, pos_obj_padded, pos_goal)) + + def _get_obs_dict(self): + obs = self._get_obs() + return dict( + state_observation=obs, + state_desired_goal=self._get_pos_goal(), + state_achieved_goal=obs[3:-3], + ) + + @property + def observation_space(self): + obj_low = np.full(6, -np.inf) + obj_high = np.full(6, +np.inf) + goal_low = np.zeros(3) if self._partially_observable \ + else self.goal_space.low + goal_high = np.zeros(3) if self._partially_observable \ + else self.goal_space.high + return Box( + np.hstack((self._HAND_SPACE.low, obj_low, goal_low)), + np.hstack((self._HAND_SPACE.high, obj_high, goal_high)) + ) + + @_assert_task_is_set + def step(self, action): + self.set_xyz_action(action[:3]) + self.do_simulation([action[-1], -action[-1]]) + + for site in self._target_site_config: + self._set_pos_site(*site) + + return self._get_obs() + + def reset(self): + self.curr_path_length = 0 + return super().reset() + + def _reset_hand(self, steps=50): + for _ in range(steps): + self.data.set_mocap_pos('mocap', self.hand_init_pos) + self.data.set_mocap_quat('mocap', np.array([1, 0, 1, 0])) + self.do_simulation([-1, 1], self.frame_skip) + + def _get_state_rand_vec(self): + if self._freeze_rand_vec: + assert self._last_rand_vec is not None + return self._last_rand_vec + else: + rand_vec = np.random.uniform( + self._random_reset_space.low, + self._random_reset_space.high, + size=self._random_reset_space.low.size) + self._last_rand_vec = rand_vec + return rand_vec diff --git a/exp_type/sawyer_xyz_env_random.py b/exp_type/sawyer_xyz_env_random.py new file mode 100644 index 0000000000000000000000000000000000000000..27f5a774a1b40cc57be58178fc062aa0c6a91ae1 --- /dev/null +++ b/exp_type/sawyer_xyz_env_random.py @@ -0,0 +1,308 @@ +import abc +import copy +import pickle + +from gym.spaces import Box +from gym.spaces import Discrete +import mujoco_py +import numpy as np + +from 
class SawyerMocapBase(MujocoEnv, metaclass=abc.ABCMeta):
    """
    Provides some commonly-shared functions for Sawyer Mujoco envs that use
    mocap for XYZ control.
    """
    # Default mocap clipping bounds; subclasses/instances may override.
    mocap_low = np.array([-0.2, 0.5, 0.06])
    mocap_high = np.array([0.2, 0.7, 0.6])

    def __init__(self, model_name, frame_skip=5):
        MujocoEnv.__init__(self, model_name, frame_skip=frame_skip)
        self.reset_mocap_welds()

    def get_endeff_pos(self):
        """Return a copy of the end effector ('hand' body) world position."""
        return self.data.get_body_xpos('hand').copy()

    def get_env_state(self):
        """Return a deep-copied (joint_state, (mocap_pos, mocap_quat)) snapshot."""
        snapshot = (
            self.sim.get_state(),
            (self.data.mocap_pos, self.data.mocap_quat),
        )
        return copy.deepcopy(snapshot)

    def set_env_state(self, state):
        """Restore a snapshot produced by `get_env_state`."""
        joint_state, (mocap_pos, mocap_quat) = state
        self.sim.set_state(joint_state)
        self.data.set_mocap_pos('mocap', mocap_pos)
        self.data.set_mocap_quat('mocap', mocap_quat)
        self.sim.forward()

    def __getstate__(self):
        # model/sim/data hold C handles and are not picklable; serialize the
        # compiled model binary (mjb) and the env state instead.
        picklable = {
            key: value
            for key, value in self.__dict__.items()
            if key not in ('model', 'sim', 'data')
        }
        return {
            'state': picklable,
            'mjb': self.model.get_mjb(),
            'env_state': self.get_env_state(),
        }

    def __setstate__(self, state):
        # Rebuild the simulator from the serialized model binary, then
        # restore the saved env state on top of it.
        self.__dict__ = state['state']
        self.model = mujoco_py.load_model_from_mjb(state['mjb'])
        self.sim = mujoco_py.MjSim(self.model)
        self.data = self.sim.data
        self.set_env_state(state['env_state'])

    def reset_mocap_welds(self):
        """Resets the mocap welds that we use for actuation."""
        sim = self.sim
        if sim.model.nmocap > 0 and sim.model.eq_data is not None:
            for idx in range(sim.model.eq_data.shape[0]):
                if sim.model.eq_type[idx] != mujoco_py.const.EQ_WELD:
                    continue
                sim.model.eq_data[idx, :] = np.array(
                    [0., 0., 0., 1., 0., 0., 0.])
            sim.forward()


class SawyerXYZEnv(SawyerMocapBase, metaclass=abc.ABCMeta):
    _HAND_SPACE = Box(
        np.array([-0.525, .35, -.0525]),
        np.array([+0.525, 1.025, .525])
    )
    max_path_length = 200

    def __init__(
        self,
        model_name,
        frame_skip=5,
        hand_low=(-0.2, 0.55, 0.05),
        hand_high=(0.2, 0.75, 0.3),
        mocap_low=None,
        mocap_high=None,
        action_scale=1./100,
        action_rot_scale=1.,
    ):
        super().__init__(model_name, frame_skip=frame_skip)
        self.random_init = True
        self.action_scale = action_scale
        self.action_rot_scale = action_rot_scale
        self.hand_low = np.array(hand_low)
        self.hand_high = np.array(hand_high)
        # Mocap bounds default to the hand bounds when not given explicitly.
        mocap_low = hand_low if mocap_low is None else mocap_low
        mocap_high = hand_high if mocap_high is None else mocap_high
        self.mocap_low = np.hstack(mocap_low)
        self.mocap_high = np.hstack(mocap_high)
        self.curr_path_length = 0
        self._freeze_rand_vec = True
        self._last_rand_vec = None

        # We use continuous goal space by default and
        # can discretize the goal space by calling
        # the `discretize_goal_space` method.
        self.discrete_goal_space = None
        self.discrete_goals = []
        self.active_discrete_goal = None

        # 3 mocap deltas + 1 gripper effort, all in [-1, 1].
        self.action_space = Box(
            np.array([-1, -1, -1, -1]),
            np.array([+1, +1, +1, +1]),
        )

        self._pos_obj_max_len = 6
        self._pos_obj_possible_lens = (3, 6)

        self._set_task_called = False
        self._partially_observable = True

        self.hand_init_pos = None  # OVERRIDE ME
        self._target_pos = None  # OVERRIDE ME
        self._random_reset_space = None  # OVERRIDE ME

    def _set_task_inner(self):
        # Doesn't absorb "extra" kwargs, to ensure nothing's missed.
        pass
+ pass + + def set_task(self, task): + self._set_task_called = True + data = pickle.loads(task.data) + assert isinstance(self, data['env_cls']) + del data['env_cls'] + self._last_rand_vec = data['rand_vec'] + # Originally set to True + self._freeze_rand_vec = False + self._last_rand_vec = data['rand_vec'] + del data['rand_vec'] + self._partially_observable = data['partially_observable'] + del data['partially_observable'] + self._set_task_inner(**data) + + def set_xyz_action(self, action): + action = np.clip(action, -1, 1) + pos_delta = action * self.action_scale + new_mocap_pos = self.data.mocap_pos + pos_delta[None] + + new_mocap_pos[0, :] = np.clip( + new_mocap_pos[0, :], + self.mocap_low, + self.mocap_high, + ) + self.data.set_mocap_pos('mocap', new_mocap_pos) + self.data.set_mocap_quat('mocap', np.array([1, 0, 1, 0])) + + def discretize_goal_space(self, goals): + assert False + assert len(goals) >= 1 + self.discrete_goals = goals + # update the goal_space to a Discrete space + self.discrete_goal_space = Discrete(len(self.discrete_goals)) + + # Belows are methods for using the new wrappers. + # `sample_goals` is implmented across the sawyer_xyz + # as sampling from the task lists. This will be done + # with the new `discrete_goals`. After all the algorithms + # conform to this API (i.e. using the new wrapper), we can + # just remove the underscore in all method signature. + def sample_goals_(self, batch_size): + assert False + if self.discrete_goal_space is not None: + return [self.discrete_goal_space.sample() for _ in range(batch_size)] + else: + return [self.goal_space.sample() for _ in range(batch_size)] + + def set_goal_(self, goal): + assert False + if self.discrete_goal_space is not None: + self.active_discrete_goal = goal + self.goal = self.discrete_goals[goal] + self._target_pos_idx = np.zeros(len(self.discrete_goals)) + self._target_pos_idx[goal] = 1. 
+ else: + self.goal = goal + + def _set_obj_xyz(self, pos): + qpos = self.data.qpos.flat.copy() + qvel = self.data.qvel.flat.copy() + qpos[9:12] = pos.copy() + qvel[9:15] = 0 + self.set_state(qpos, qvel) + + def _get_site_pos(self, siteName): + _id = self.model.site_names.index(siteName) + return self.data.site_xpos[_id].copy() + + def _set_pos_site(self, name, pos): + """Sets the position of the site corresponding to `name` + + Args: + name (str): The site's name + pos (np.ndarray): Flat, 3 element array indicating site's location + """ + assert isinstance(pos, np.ndarray) + assert pos.ndim == 1 + + self.data.site_xpos[self.model.site_name2id(name)] = pos[:3] + + @property + def _target_site_config(self): + """Retrieves site name(s) and position(s) corresponding to env targets + + :rtype: list of (str, np.ndarray) + """ + return [('goal', self._target_pos)] + + def _get_pos_objects(self): + """Retrieves object position(s) from mujoco properties or instance vars + + Returns: + np.ndarray: Flat array (usually 3 elements) representing the + object(s)' position(s) + """ + # Throw error rather than making this an @abc.abstractmethod so that + # V1 environments don't have to implement it + raise NotImplementedError + + def _get_pos_goal(self): + """Retrieves goal position from mujoco properties or instance vars + + Returns: + np.ndarray: Flat array (3 elements) representing the goal position + """ + assert isinstance(self._target_pos, np.ndarray) + assert self._target_pos.ndim == 1 + return self._target_pos + + def _get_obs(self): + """Combines positions of the end effector, object(s) and goal into a + single flat observation + + Returns: + np.ndarray: The flat observation array (12 elements) + """ + pos_hand = self.get_endeff_pos() + + pos_obj_padded = np.zeros(self._pos_obj_max_len) + pos_obj = self._get_pos_objects() + assert len(pos_obj) in self._pos_obj_possible_lens + pos_obj_padded[:len(pos_obj)] = pos_obj + + pos_goal = self._get_pos_goal() + if 
self._partially_observable: + pos_goal = np.zeros_like(pos_goal) + + return np.hstack((pos_hand, pos_obj_padded, pos_goal)) + + def _get_obs_dict(self): + obs = self._get_obs() + return dict( + state_observation=obs, + state_desired_goal=self._get_pos_goal(), + state_achieved_goal=obs[3:-3], + ) + + @property + def observation_space(self): + obj_low = np.full(6, -np.inf) + obj_high = np.full(6, +np.inf) + goal_low = np.zeros(3) if self._partially_observable \ + else self.goal_space.low + goal_high = np.zeros(3) if self._partially_observable \ + else self.goal_space.high + return Box( + np.hstack((self._HAND_SPACE.low, obj_low, goal_low)), + np.hstack((self._HAND_SPACE.high, obj_high, goal_high)) + ) + + @_assert_task_is_set + def step(self, action): + self.set_xyz_action(action[:3]) + self.do_simulation([action[-1], -action[-1]]) + + for site in self._target_site_config: + self._set_pos_site(*site) + + return self._get_obs() + + def reset(self): + self.curr_path_length = 0 + return super().reset() + + def _reset_hand(self, steps=50): + for _ in range(steps): + self.data.set_mocap_pos('mocap', self.hand_init_pos) + self.data.set_mocap_quat('mocap', np.array([1, 0, 1, 0])) + self.do_simulation([-1, 1], self.frame_skip) + + def _get_state_rand_vec(self): + if self._freeze_rand_vec: + assert self._last_rand_vec is not None + return self._last_rand_vec + else: + rand_vec = np.random.uniform( + self._random_reset_space.low, + self._random_reset_space.high, + size=self._random_reset_space.low.size) + self._last_rand_vec = rand_vec + return rand_vec