import numpy as np import time import copy as cp from roboimi.envs.mujoco_base import MujocoEnv import cv2 from roboimi.utils.interpolators import OTG import mujoco as mj import threading import collections import time from roboimi.utils.interpolators import OTG class DualDianaMed(MujocoEnv): """ :param is_render: Choose if use the renderer to render the scene or not. :param renderer: Choose official renderer with "viewer", another renderer with "mujoco_viewer" :param control_freq: Upper-layer control frequency. Note that high frequency will cause high time-lag. :param is_interpolate: Use interpolator while stepping. """ def __init__(self, robot=None, is_render=False, renderer="viewer", control_freq=20, cam_view=None, is_interpolate=False ): super().__init__( robot=robot, is_render=is_render, renderer=renderer, control_freq=control_freq ) self.arm_left = self.robot.left_arm self.arm_right = self.robot.right_arm self.base_time = self.control_timestep self.cam = cam_view self.interpolator_left = None self.interpolator_right = None self.compute_qpos = np.array([1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16]) if is_interpolate: self._initInterpolator(self.robot.arms) #camera view setting self.r_vis = None self.l_vis = None self.top = None self.front = None self.obs = None self.rew = None self._offscreen_renderer = None def actuate_J(self, q_target, qdot_target, Arm): """ Compute desired torque with robot dynamics modeling: > M(q)qdd + C(q, qd)qd + G(q) + tau_F(qd) = tau_ctrl + tau_env :param q_target: joint position :param qdot_target: joint velocity """ # print(Arm.kdl_solver.NbOfJnt) acc_desire = [ (self.robot.kp[i] * (q_target[i] - Arm.arm_qpos[i]) - (self.robot.kd[i] * Arm.arm_qvel[i])) for i in range(Arm.jnt_num)] qM = Arm.kdl_solver.getInertiaMat([Arm.arm_qpos[i] for i in range(len(Arm.joint_index))]) tau_target = np.dot(qM, acc_desire) + np.array( [self.mj_data.joint(Arm.joint_index[i]).qfrc_bias[0] for i in range(len(Arm.joint_index))]) # Send torque to simulation for i in range(7): self.mj_data.actuator(Arm.actuator_index[i]).ctrl = tau_target[i] def reset(self): self._setInitPose() mj.mj_step(self.mj_model, self.mj_data) if self.interpolator_left is not None and self.interpolator_right is not None: self.interpolator_left.setOTGParam(self.robot.arms[0].arm_qpos, np.zeros(len(self.robot.arms[0].joint_index))) self.interpolator_right.setOTGParam(self.robot.arms[1].arm_qpos, np.zeros(len(self.robot.arms[1].joint_index))) def step(self,action): self.compute_qpos = action #for observation ! if self.interpolator_left is not None and self.interpolator_right is not None: self.interpolator_left.updateInput(action[:7], control_cycle=self.base_time) self.interpolator_right.updateInput(action[7:-2], control_cycle=self.base_time) ctrl_cur_time = time.time() for i in range(int(self.control_timestep / self.model_timestep)): if int(self.control_timestep / self.model_timestep) == 0: raise ValueError("Control frequency is too low. Checkout you are not in renderer mode." "Current Model-Timestep:{}".format(self.model_timestep)) super().step(action) self.base_time = time.time() - ctrl_cur_time self.obs = self._get_obs() def preStep(self, action): ''' action size: left joints: 7 right joints: 7 left gripper left: 1 right gripper left: 1 total: 16 ''' if len(action) == 16: action_gripper_left = action[-2] action_gripper_right = action[-1] q_target_l = action[:7], q_target_r= action[7:-2], if self.interpolator_left is not None and self.interpolator_right is not None: try: q_target_l, _ = self.interpolator_left.updateState() q_target_r, _ = self.interpolator_right.updateState() except ValueError: print(action) self.actuate_J(q_target_l, np.zeros(7), self.arm_left) self.actuate_J(q_target_r, np.zeros(7), self.arm_right) self.gripper_ctrl(action_gripper_left,self.arm_left) self.gripper_ctrl(action_gripper_right,self.arm_right) else: raise AttributeError def gripper_ctrl(self,ctrl,Arm): ''' open: 100 close: -100 ''' if -100 <= ctrl <= 100: self.mj_data.actuator(Arm.actuator_index[7]).ctrl = ctrl else: if ctrl>100: self.mj_data.actuator(Arm.actuator_index[7]).ctrl = 100 else: self.mj_data.actuator(Arm.actuator_index[7]).ctrl = -100 # raise ValueError("the torq should be given between -100 and 100 ") @property def get_obs_qpos(self): qpos_left = np.zeros(self.arm_left.jnt_num) qpos_right = np.zeros(self.arm_right.jnt_num) for i in range(self.arm_left.jnt_num): qpos_left[i] = cp.deepcopy(self.mj_data.joint(self.arm_left.joint_index[i]).qpos[0]) for i in range(self.arm_right.jnt_num): qpos_right[i] = cp.deepcopy(self.mj_data.joint(self.arm_right.joint_index[i]).qpos[0]) return np.hstack([qpos_left,self.mj_data.actuator(self.arm_left.actuator_index[7]).ctrl, qpos_right,self.mj_data.actuator(self.arm_right.actuator_index[7]).ctrl]) def _get_obs(self): if not self.is_render: self._update_camera_images_sync() obs = collections.OrderedDict() obs['qpos'] = self.get_obs_qpos obs['action'] = self.compute_qpos obs['images'] = dict() obs['images']['r_vis'] = self.r_vis obs['images']['l_vis'] = self.l_vis obs['images']['top'] = self.top obs['images']['front'] = self.front return obs def _get_image_obs(self): if not self.is_render: self._update_camera_images_sync() obs = collections.OrderedDict() obs['images'] = dict() obs['images']['r_vis'] = self.r_vis obs['images']['l_vis'] = self.l_vis obs['images']['top'] = self.top obs['images']['front'] = self.front return obs def _get_qpos_obs(self): obs = collections.OrderedDict() obs['qpos'] = self.get_obs_qpos return obs def get_env_state(self): raise NotImplementedError def _get_reward(self): raise NotImplementedError @property def cam_view(self): if self.cam == 'r_vis': return self.r_vis elif self.cam == 'l_vis': return self.l_vis elif self.cam == 'top': return self.top elif self.cam == 'front': return self.front else: raise AttributeError("please input right name") def _get_or_create_offscreen_renderer(self): renderer = getattr(self, '_offscreen_renderer', None) if renderer is None: renderer = mj.Renderer(self.mj_model, height=480, width=640) self._offscreen_renderer = renderer return renderer def _render_camera_set(self, img_renderer): img_renderer.update_scene(self.mj_data, camera="rs_cam_right") self.r_vis = img_renderer.render()[:, :, ::-1] img_renderer.update_scene(self.mj_data, camera="rs_cam_left") self.l_vis = img_renderer.render()[:, :, ::-1] img_renderer.update_scene(self.mj_data, camera="top") self.top = img_renderer.render()[:, :, ::-1] img_renderer.update_scene(self.mj_data, camera="front") self.front = img_renderer.render()[:, :, ::-1] def _update_camera_images_sync(self): img_renderer = self._get_or_create_offscreen_renderer() self._render_camera_set(img_renderer) def camera_viewer(self): img_renderer = self._get_or_create_offscreen_renderer() show_gui = self.is_render if show_gui: cv2.namedWindow('Cam view',cv2.WINDOW_NORMAL) while not self.exit_flag: self._render_camera_set(img_renderer) if show_gui: if self.cam_view is not None: cv2.imshow('Cam view', self.cam_view) cv2.waitKey(1) def cam_start(self): if not self.is_render: self.cam_thread = None return self.cam_thread = threading.Thread(target=self.camera_viewer,daemon=True) self.cam_thread.start() def _initInterpolator(self, Arm): self.interpolator_left = OTG( OTG_Dof=self.robot.left_arm.jnt_num, control_cycle=0.0005, max_velocity=0.05, max_acceleration=0.1, max_jerk=0.2 ) self.interpolator_left.setOTGParam(Arm[0].arm_qpos, Arm[0].arm_qvel) self.interpolator_right = OTG( OTG_Dof=self.robot.right_arm.jnt_num, control_cycle=0.0005, max_velocity=0.05, max_acceleration=0.1, max_jerk=0.2 ) self.interpolator_right.setOTGParam(Arm[1].arm_qpos, Arm[1].arm_qvel) if __name__ == "__main__": from roboimi.assets.robots.diana_med import BiDianaMed env = DualDianaMed( robot=BiDianaMed(), is_render=True, control_freq=200, cam_view='top', is_interpolate=True ) env.reset() for t in range(int(1e6)): action_l = np.array([0.0, 0.0, 0.0, 0.0, 0.0 ,0.0, 0.0]) action_r = np.array([0.0, 0.0, 0.0, 0.0, 0.0 ,0.0, 0.0]) gripper = np.array([-100.0,-100.0]) action = np.hstack((action_l,action_r,gripper)) env.step(action) l_q = np.zeros(7) l_q[3] = 0.0 r_q = np.zeros(7) r_q[3] = 0.0 print("\nkdl fk left =",env.arm_left.kdl_solver.getEEtf(l_q)) # print("\n left pos=",env.robot.robot_data.body("left_link7").xpos,"\n") # print("\nleft xmat",env.robot.robot_data.body("left_link7").xmat,"\n") print("============================") print("\nkdl fk right =",env.arm_right.kdl_solver.getEEtf(r_q)) # print("\n right pos=",env.robot.robot_data.body("right_link7").xpos,"\n") # print("\nright xmat",env.robot.robot_data.body("right_link7").xmat,"\n") # quat_left = np.zeros(4) # quat_right = np.zeros(4) # mj._functions.mju_mat2Quat(quat_left, rot_left) # print("quat_left =",quat_left,"\n") # mj._functions.mju_mat2Quat(quat_right, rot_right) # print("quat_right =",quat_right,"\n") if env.is_render: env.render()