feat(eval): export rollout video timing and ee trajectory

This commit is contained in:
Logic
2026-03-31 15:34:28 +08:00
parent cb79e00546
commit 424c265823
5 changed files with 886 additions and 101 deletions

View File

@@ -0,0 +1,228 @@
import json
import tempfile
import unittest
from pathlib import Path
from unittest import mock
import numpy as np
import torch
from omegaconf import OmegaConf
from roboimi.demos.vla_scripts import eval_vla
class _FakeAgent:
    """Scripted policy double that replays a fixed sequence of actions."""

    def __init__(self, actions):
        """Queue ``actions`` as float32 tensors and zero the reset counter."""
        queued = []
        for step_action in actions:
            queued.append(torch.tensor(step_action, dtype=torch.float32))
        self._actions = queued
        self.reset_calls = 0

    def eval(self):
        """Do nothing and return the agent itself."""
        return self

    def to(self, _device):
        """Ignore the requested device and return the agent itself."""
        return self

    def reset(self):
        """Record that the agent was reset once more."""
        self.reset_calls = self.reset_calls + 1

    def select_action(self, observation):
        """Return the next queued action; the observation is ignored."""
        del observation
        # Actions are consumed front-to-back, so an exhausted queue raises
        # IndexError from the list itself.
        next_action = self._actions.pop(0)
        return next_action
class _FakeEnv:
    """Simulator double whose observations and poses derive from a step counter."""

    def __init__(self):
        """Start with no steps taken, zero reward and empty call records."""
        self.reset_calls = []
        self.render_calls = 0
        self.step_count = 0
        self.rew = 0.0

    def reset(self, box_pos):
        """Store a copy of ``box_pos`` and rewind the step counter and reward."""
        self.reset_calls.append(np.array(box_pos, copy=True))
        self.step_count, self.rew = 0, 0.0

    def _get_image_obs(self):
        """Return uniform 6x8x3 uint8 frames for the "front" and "top" cameras.

        The "front" frame is filled with the current step count and the
        "top" frame with the step count plus 20.
        """
        shape = (6, 8, 3)
        images = {
            camera: np.full(shape, fill_value=self.step_count + offset, dtype=np.uint8)
            for camera, offset in (("front", 0), ("top", 20))
        }
        return {"images": images}

    def _get_qpos_obs(self):
        """Return a fixed 16-element float32 ramp under the "qpos" key."""
        qpos = np.arange(16, dtype=np.float32)
        return {"qpos": qpos}

    def step(self, action):
        """Advance one step; the action is ignored and reward equals the step count."""
        del action
        self.step_count = self.step_count + 1
        self.rew = float(self.step_count)

    def render(self):
        """Count the render request without drawing anything."""
        self.render_calls = self.render_calls + 1

    def getBodyPos(self, name):
        """Return a float32 position for 'eef_left' or 'eef_right'.

        Raises:
            KeyError: if ``name`` is neither of the two known bodies.
        """
        base = float(self.step_count)
        if name == 'eef_left':
            offsets = (0.0, 0.1, 0.2)
        elif name == 'eef_right':
            offsets = (1.0, 1.1, 1.2)
        else:
            raise KeyError(name)
        return np.array([base + delta for delta in offsets], dtype=np.float32)

    def getBodyQuat(self, name):
        """Return a float32 4-vector for 'eef_left' or 'eef_right'.

        The first component is always 1.0; the step count is written into
        index 1 for the left body and index 2 for the right body.

        Raises:
            KeyError: if ``name`` is neither of the two known bodies.
        """
        base = float(self.step_count)
        if name == 'eef_left':
            slot = 1
        elif name == 'eef_right':
            slot = 2
        else:
            raise KeyError(name)
        components = [1.0, 0.0, 0.0, 0.0]
        components[slot] = base
        return np.array(components, dtype=np.float32)
class _FakeVideoWriter:
    """In-memory video writer double that also leaves a placeholder file on disk."""

    def __init__(self, output_path):
        """Create the parent directory and an empty file at ``output_path``."""
        target = Path(output_path)
        target.parent.mkdir(parents=True, exist_ok=True)
        target.write_bytes(b'')
        self.output_path = target
        self.frames = []
        self.released = False

    def isOpened(self):
        """Always report the writer as open."""
        return True

    def write(self, frame):
        """Keep a private copy of ``frame`` so later caller mutations do not leak in."""
        snapshot = np.array(frame, copy=True)
        self.frames.append(snapshot)

    def release(self):
        """Mark the writer released and overwrite the file with a marker payload."""
        self.released = True
        self.output_path.write_bytes(b'fake-mp4')
class EvalVLARolloutArtifactsTest(unittest.TestCase):
    """Tests for the rollout artifact options (NPZ, JSON summary, video) of ``eval_vla``."""

    def test_eval_config_exposes_rollout_artifact_defaults(self):
        """The eval YAML declares every artifact option, all switched off by default."""
        # NOTE: the path is relative, so it is resolved against the current
        # working directory of the test run.
        eval_cfg = OmegaConf.load(Path('roboimi/vla/conf/eval/eval.yaml'))
        self.assertIn('artifact_dir', eval_cfg)
        self.assertFalse(eval_cfg.save_summary_json)
        self.assertFalse(eval_cfg.save_trajectory_npz)
        self.assertFalse(eval_cfg.record_video)
        self.assertIsNone(eval_cfg.artifact_dir)
        self.assertIsNone(eval_cfg.video_camera_name)
        self.assertEqual(eval_cfg.video_fps, 30)

    def test_run_eval_exports_npz_summary_and_video_artifacts(self):
        """A two-step rollout with all artifact flags on writes NPZ, JSON and video outputs.

        The checkpoint loader, environment factory, pose sampler, progress bar
        and video-writer factory of ``eval_vla`` are all patched, so the test
        only exercises ``_run_eval`` against the in-file fakes.
        """
        actions = [
            np.arange(16, dtype=np.float32),
            np.arange(16, dtype=np.float32) + 10.0,
        ]
        fake_agent = _FakeAgent(actions)
        fake_env = _FakeEnv()

        with tempfile.TemporaryDirectory() as tmpdir:
            cfg = OmegaConf.create(
                {
                    'agent': {},
                    'eval': {
                        'ckpt_path': 'checkpoints/vla_model_best.pt',
                        'num_episodes': 1,
                        'max_timesteps': 2,
                        'device': 'cpu',
                        'task_name': 'sim_transfer',
                        'camera_names': ['front', 'top'],
                        'use_smoothing': True,
                        'smooth_alpha': 0.5,
                        'verbose_action': False,
                        'headless': True,
                        'artifact_dir': tmpdir,
                        'save_summary_json': True,
                        'save_trajectory_npz': True,
                        'record_video': True,
                        'video_camera_name': 'front',
                        'video_fps': 12,
                    },
                }
            )

            # Lets the assertions below reach the writer created inside _run_eval.
            writer_holder = {}

            def fake_open_video_writer(output_path, frame_size, fps):
                # The fake frames are 6 rows by 8 columns; the writer is
                # expected to receive the size as (8, 6).
                self.assertEqual(frame_size, (8, 6))
                self.assertEqual(fps, 12)
                writer = _FakeVideoWriter(output_path)
                writer_holder['writer'] = writer
                return writer

            with mock.patch.object(
                eval_vla,
                'load_checkpoint',
                return_value=(fake_agent, None),
            ), mock.patch.object(
                eval_vla,
                'make_sim_env',
                return_value=fake_env,
            ), mock.patch.object(
                eval_vla,
                'sample_transfer_pose',
                return_value=np.array([0.1, 0.2, 0.3], dtype=np.float32),
            ), mock.patch.object(
                eval_vla,
                'tqdm',
                side_effect=lambda iterable, **kwargs: iterable,
            ), mock.patch.object(
                eval_vla,
                '_open_video_writer',
                side_effect=fake_open_video_writer,
            ):
                summary = eval_vla._run_eval(cfg)

            artifacts = summary['artifacts']
            trajectory_path = Path(artifacts['trajectory_npz'])
            summary_path = Path(artifacts['summary_json'])
            video_path = Path(artifacts['video_mp4'])

            self.assertEqual(Path(artifacts['output_dir']), Path(tmpdir))
            self.assertEqual(artifacts['video_camera_name'], 'front')
            self.assertTrue(trajectory_path.exists())
            self.assertTrue(summary_path.exists())
            self.assertTrue(video_path.exists())

            # np.load on an .npz archive keeps the file open until it is
            # closed; use it as a context manager so the handle is released
            # before TemporaryDirectory removes the directory.
            with np.load(trajectory_path) as rollout_npz:
                np.testing.assert_array_equal(rollout_npz['episode_index'], np.array([0, 0]))
                np.testing.assert_array_equal(rollout_npz['timestep'], np.array([0, 1]))
                np.testing.assert_array_equal(
                    rollout_npz['reward'],
                    np.array([1.0, 2.0], dtype=np.float32),
                )
                np.testing.assert_array_equal(rollout_npz['raw_predicted_ee_action'][0], actions[0])
                np.testing.assert_array_equal(rollout_npz['raw_predicted_ee_action'][1], actions[1])
                # With smooth_alpha = 0.5 the second executed action is expected
                # to be the midpoint of the two raw predictions.
                np.testing.assert_array_equal(rollout_npz['executed_ee_action'][0], actions[0])
                np.testing.assert_array_equal(
                    rollout_npz['executed_ee_action'][1],
                    (actions[0] + actions[1]) / 2.0,
                )
                # Expected poses match the fake env's values at step counts 1
                # and 2, i.e. what it reports after each env.step call.
                np.testing.assert_array_equal(
                    rollout_npz['left_ee_pos'],
                    np.array([[1.0, 1.1, 1.2], [2.0, 2.1, 2.2]], dtype=np.float32),
                )
                np.testing.assert_array_equal(
                    rollout_npz['right_ee_pos'],
                    np.array([[2.0, 2.1, 2.2], [3.0, 3.1, 3.2]], dtype=np.float32),
                )
                # Timing values are wall-clock dependent, so only shapes are pinned.
                self.assertEqual(rollout_npz['obs_read_time_ms'].shape, (2,))
                self.assertEqual(rollout_npz['preprocess_time_ms'].shape, (2,))
                self.assertEqual(rollout_npz['inference_time_ms'].shape, (2,))
                self.assertEqual(rollout_npz['env_step_time_ms'].shape, (2,))
                self.assertEqual(rollout_npz['total_time_ms'].shape, (2,))

            writer = writer_holder['writer']
            self.assertTrue(writer.released)
            self.assertEqual(len(writer.frames), 2)
            # Expected frames carry the fake env's "front" values at step
            # counts 0 and 1.
            np.testing.assert_array_equal(writer.frames[0], np.zeros((6, 8, 3), dtype=np.uint8))
            np.testing.assert_array_equal(writer.frames[1], np.full((6, 8, 3), 1, dtype=np.uint8))

            with summary_path.open('r', encoding='utf-8') as fh:
                saved_summary = json.load(fh)

            self.assertEqual(saved_summary['artifacts']['trajectory_npz'], str(trajectory_path))
            self.assertEqual(saved_summary['artifacts']['video_mp4'], str(video_path))
            # The fake env's per-step rewards are 1.0 and 2.0.
            self.assertEqual(saved_summary['episode_rewards'], [3.0])
            self.assertAlmostEqual(summary['avg_reward'], 3.0)
            self.assertIn('avg_obs_read_time_ms', summary)
            self.assertIn('avg_env_step_time_ms', summary)
# Run the test cases when this module is executed directly as a script.
if __name__ == '__main__':
    unittest.main()