diff --git a/docs/superpowers/plans/2026-03-31-raw-action-trajectory-viewer.md b/docs/superpowers/plans/2026-03-31-raw-action-trajectory-viewer.md new file mode 100644 index 0000000..93b42c7 --- /dev/null +++ b/docs/superpowers/plans/2026-03-31-raw-action-trajectory-viewer.md @@ -0,0 +1,26 @@ +# Raw Action Trajectory Viewer Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** 在可交互 MuJoCo 仿真窗口中,把 rollout 导出的 raw EE action 轨迹用红色轨迹标出来并启动仿真供人工查看。 + +**Architecture:** 读取已有 trajectory artifact 中的 raw_action / step 数据,生成左右臂末端轨迹点,并在 viewer 渲染循环中持续注入红色 marker。实现尽量独立为一个可复用的小脚本,避免影响训练/评估主路径。 + +**Tech Stack:** Python, NumPy, MuJoCo viewer, unittest/mock. + +--- + +### Task 1: 抽取 raw_action 轨迹并生成可视化点集 +- [ ] 写失败测试,验证从 trajectory.npz 提取左右臂轨迹点 +- [ ] 实现最小 helper +- [ ] 运行测试确认通过 + +### Task 2: 在 viewer 中渲染红色轨迹并支持交互查看 +- [ ] 写失败测试,验证 marker 配置/调用 +- [ ] 实现 viewer 可视化脚本 +- [ ] 运行测试确认通过 + +### Task 3: 启动真实仿真窗口供人工查看 +- [ ] 用现有 trajectory artifact 启动 viewer +- [ ] 确认窗口可交互、红线出现 +- [ ] 向用户汇报启动方式与脚本路径 diff --git a/roboimi/demos/view_raw_action_trajectory.py b/roboimi/demos/view_raw_action_trajectory.py new file mode 100644 index 0000000..f44d756 --- /dev/null +++ b/roboimi/demos/view_raw_action_trajectory.py @@ -0,0 +1,36 @@ +import argparse +import numpy as np + +from roboimi.utils.raw_action_trajectory_viewer import launch_raw_action_trajectory_viewer + + +def parse_args(): + parser = argparse.ArgumentParser(description="Launch an interactive MuJoCo viewer with raw-action trajectory overlay.") + parser.add_argument("trajectory_path", help="Path to raw_action.npy or trajectory.npz") + parser.add_argument("--task-name", default="sim_transfer") + parser.add_argument("--line-radius", type=float, default=0.004) + parser.add_argument("--max-markers", type=int, default=1500) + parser.add_argument( + "--box-pos", + type=float, + nargs=3, + default=None, + help="Optional box xyz to use when resetting the environment", + ) + return parser.parse_args() + + +def main(): + args = parse_args() + box_pos = np.asarray(args.box_pos, dtype=np.float32) if args.box_pos is not None else None + launch_raw_action_trajectory_viewer( + args.trajectory_path, + task_name=args.task_name, + line_radius=args.line_radius, + max_markers=args.max_markers, + box_pos=box_pos, + ) + + +if __name__ == "__main__": + main() diff --git a/roboimi/utils/raw_action_trajectory_viewer.py b/roboimi/utils/raw_action_trajectory_viewer.py new file mode 100644 index 0000000..6731729 --- /dev/null +++ b/roboimi/utils/raw_action_trajectory_viewer.py @@ -0,0 +1,176 @@ +from __future__ import annotations + +import math +import time +from pathlib import Path +from typing import Iterable + +import cv2 +import mujoco +import numpy as np + +from roboimi.assets.robots.diana_med import BiDianaMed +from roboimi.envs.mujoco_base import MujocoEnv +from roboimi.envs.double_pos_ctrl_env import make_sim_env +from roboimi.utils.act_ex_utils import sample_transfer_pose + + +def _load_raw_action_array(path: str | Path) -> np.ndarray: + path = Path(path) + if path.suffix == ".npy": + raw_action = np.load(path) + elif path.suffix == ".npz": + archive = np.load(path) + if "raw_action" in archive: + raw_action = archive["raw_action"] + elif "raw_predicted_ee_action" in archive: + raw_action = archive["raw_predicted_ee_action"] + else: + raise KeyError(f"{path} does not contain raw_action") + else: + raise ValueError(f"unsupported trajectory file: {path}") + raw_action = np.asarray(raw_action, dtype=np.float32) + if raw_action.ndim != 2 or raw_action.shape[1] < 10: + raise ValueError(f"raw_action must have shape (T, 16)-like, got {raw_action.shape}") + return raw_action + + +def disable_cv2_highgui(cv2_module=cv2): + original = { + "namedWindow": cv2_module.namedWindow, + "imshow": cv2_module.imshow, + "waitKey": cv2_module.waitKey, + } + + cv2_module.namedWindow = lambda *args, **kwargs: None + cv2_module.imshow = lambda *args, **kwargs: None + cv2_module.waitKey = lambda *args, **kwargs: 1 + + def restore(): + cv2_module.namedWindow = original["namedWindow"] + cv2_module.imshow = original["imshow"] + cv2_module.waitKey = original["waitKey"] + + return restore + + +def set_transfer_box_pose(mj_data, box_pos: np.ndarray) -> None: + box_pos = np.asarray(box_pos, dtype=np.float64) + if box_pos.shape != (3,): + raise ValueError(f"box_pos must have shape (3,), got {box_pos.shape}") + joint = mj_data.joint("red_box_joint") + joint.qpos[0] = box_pos[0] + joint.qpos[1] = box_pos[1] + joint.qpos[2] = box_pos[2] + joint.qpos[3] = 1.0 + joint.qpos[4] = 0.0 + joint.qpos[5] = 0.0 + joint.qpos[6] = 0.0 + + +def load_raw_action_positions(path: str | Path) -> dict[str, np.ndarray]: + raw_action = _load_raw_action_array(path) + return { + "left": raw_action[:, :3].astype(np.float32, copy=True), + "right": raw_action[:, 7:10].astype(np.float32, copy=True), + } + + +def _downsample_points(points: np.ndarray, stride: int) -> np.ndarray: + sampled = points[::stride] + if len(sampled) == 0: + return points + if not np.array_equal(sampled[-1], points[-1]): + sampled = np.concatenate([sampled, points[-1:]], axis=0) + return sampled + + +def build_trajectory_capsule_markers( + positions: dict[str, np.ndarray], + *, + max_markers: int, + radius: float = 0.003, + rgba: tuple[float, float, float, float] = (1.0, 0.0, 0.0, 1.0), +) -> list[dict]: + total_segments = sum(max(len(points) - 1, 0) for points in positions.values()) + if total_segments == 0: + return [] + stride = max(1, math.ceil(total_segments / max_markers)) + markers = [] + for points in positions.values(): + sampled = _downsample_points(np.asarray(points, dtype=np.float64), stride) + for idx in range(len(sampled) - 1): + markers.append( + { + "from": sampled[idx], + "to": sampled[idx + 1], + "rgba": rgba, + "radius": float(radius), + } + ) + return markers[:max_markers] + + +def apply_capsule_markers_to_scene(user_scn, markers: Iterable[dict]) -> None: + user_scn.ngeom = 0 + for marker in markers: + if user_scn.ngeom >= user_scn.maxgeom: + break + geom = user_scn.geoms[user_scn.ngeom] + mujoco.mjv_initGeom( + geom, + mujoco.mjtGeom.mjGEOM_CAPSULE, + np.zeros(3, dtype=np.float64), + np.zeros(3, dtype=np.float64), + np.eye(3, dtype=np.float64).reshape(-1), + np.asarray(marker["rgba"], dtype=np.float32), + ) + mujoco.mjv_connector( + geom, + mujoco.mjtGeom.mjGEOM_CAPSULE, + float(marker["radius"]), + np.asarray(marker["from"], dtype=np.float64), + np.asarray(marker["to"], dtype=np.float64), + ) + user_scn.ngeom += 1 + + +def launch_raw_action_trajectory_viewer( + trajectory_path: str | Path, + *, + task_name: str = "sim_transfer", + line_radius: float = 0.004, + max_markers: int = 1500, + box_pos: np.ndarray | None = None, + disable_camera_window: bool = True, +): + positions = load_raw_action_positions(trajectory_path) + if task_name != "sim_transfer": + raise NotImplementedError(f"unsupported task_name: {task_name}") + if box_pos is None: + box_pos = sample_transfer_pose() + + robot = BiDianaMed() + viewer_env = MujocoEnv(robot=robot, is_render=True, renderer="viewer", control_freq=30) + viewer_env.reset() + set_transfer_box_pose(viewer_env.mj_data, box_pos) + mujoco.mj_forward(viewer_env.mj_model, viewer_env.mj_data) + markers = build_trajectory_capsule_markers( + positions, + max_markers=max_markers, + radius=line_radius, + ) + + if viewer_env.viewer is None or getattr(viewer_env.viewer, "user_scn", None) is None: + raise RuntimeError("viewer does not expose user_scn; cannot render trajectory overlay") + + try: + while viewer_env.viewer.is_running() and not viewer_env.exit_flag: + with viewer_env.viewer.lock(): + apply_capsule_markers_to_scene(viewer_env.viewer.user_scn, markers) + viewer_env.render() + time.sleep(1 / 60.0) + finally: + viewer_env.exit_flag = True + if getattr(viewer_env, "viewer", None) is not None: + viewer_env.viewer.close() diff --git a/tests/test_raw_action_trajectory_viewer.py b/tests/test_raw_action_trajectory_viewer.py new file mode 100644 index 0000000..8a15524 --- /dev/null +++ b/tests/test_raw_action_trajectory_viewer.py @@ -0,0 +1,119 @@ +import tempfile +import unittest +from pathlib import Path +from types import SimpleNamespace +from unittest import mock + +import numpy as np + +from roboimi.utils import raw_action_trajectory_viewer as traj_view + + +class RawActionTrajectoryViewerTest(unittest.TestCase): + def test_set_transfer_box_pose_writes_joint_qpos(self): + joint_qpos = np.zeros(7, dtype=np.float64) + + class _FakeJoint: + def __init__(self, qpos): + self.qpos = qpos + + class _FakeData: + def joint(self, name): + assert name == "red_box_joint" + return _FakeJoint(joint_qpos) + + traj_view.set_transfer_box_pose(_FakeData(), np.array([0.2, -0.1, 1.05], dtype=np.float64)) + + np.testing.assert_array_equal( + joint_qpos, + np.array([0.2, -0.1, 1.05, 1.0, 0.0, 0.0, 0.0], dtype=np.float64), + ) + + def test_disable_cv2_highgui_temporarily_replaces_gui_calls(self): + fake_cv2 = SimpleNamespace( + namedWindow=lambda *args, **kwargs: "named", + imshow=lambda *args, **kwargs: "imshow", + waitKey=lambda *args, **kwargs: "wait", + ) + + restore = traj_view.disable_cv2_highgui(fake_cv2) + self.assertIsNone(fake_cv2.namedWindow("x")) + self.assertIsNone(fake_cv2.imshow("x", None)) + self.assertEqual(fake_cv2.waitKey(1), 1) + + restore() + self.assertEqual(fake_cv2.namedWindow("x"), "named") + self.assertEqual(fake_cv2.imshow("x", None), "imshow") + self.assertEqual(fake_cv2.waitKey(1), "wait") + + def test_load_raw_action_positions_from_npz(self): + raw_action = np.array( + [ + [1.0, 2.0, 3.0, 0, 0, 0, 1, 11.0, 12.0, 13.0, 0, 0, 0, 1, -1, -1], + [4.0, 5.0, 6.0, 0, 0, 0, 1, 14.0, 15.0, 16.0, 0, 0, 0, 1, -1, -1], + ], + dtype=np.float32, + ) + with tempfile.TemporaryDirectory() as tmpdir: + path = Path(tmpdir) / "trajectory.npz" + np.savez(path, raw_action=raw_action) + + positions = traj_view.load_raw_action_positions(path) + + np.testing.assert_array_equal( + positions["left"], + np.array([[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]], dtype=np.float32), + ) + np.testing.assert_array_equal( + positions["right"], + np.array([[11.0, 12.0, 13.0], [14.0, 15.0, 16.0]], dtype=np.float32), + ) + + def test_build_red_capsule_segments_downsamples_to_fit_scene_limit(self): + left = np.stack([np.array([float(i), 0.0, 0.0], dtype=np.float32) for i in range(6)]) + right = np.stack([np.array([float(i), 1.0, 0.0], dtype=np.float32) for i in range(6)]) + + markers = traj_view.build_trajectory_capsule_markers( + {"left": left, "right": right}, + max_markers=4, + radius=0.01, + ) + + self.assertLessEqual(len(markers), 4) + self.assertTrue(all(marker["rgba"] == (1.0, 0.0, 0.0, 1.0) for marker in markers)) + self.assertTrue(all(marker["radius"] == 0.01 for marker in markers)) + + def test_apply_capsule_markers_populates_user_scene(self): + fake_scene = SimpleNamespace( + maxgeom=3, + ngeom=99, + geoms=[object(), object(), object()], + ) + markers = [ + { + "from": np.array([0.0, 0.0, 0.0], dtype=np.float64), + "to": np.array([1.0, 0.0, 0.0], dtype=np.float64), + "rgba": (1.0, 0.0, 0.0, 1.0), + "radius": 0.01, + }, + { + "from": np.array([0.0, 1.0, 0.0], dtype=np.float64), + "to": np.array([1.0, 1.0, 0.0], dtype=np.float64), + "rgba": (1.0, 0.0, 0.0, 1.0), + "radius": 0.01, + }, + ] + + with mock.patch.object(traj_view.mujoco, "mjv_initGeom") as init_geom, mock.patch.object( + traj_view.mujoco, + "mjv_connector", + ) as connector: + traj_view.apply_capsule_markers_to_scene(fake_scene, markers) + + self.assertEqual(fake_scene.ngeom, 2) + self.assertEqual(init_geom.call_count, 2) + self.assertEqual(connector.call_count, 2) + + +if __name__ == "__main__": + unittest.main()