"""Tests for the rollout artifacts written by ``eval_vla._run_eval``."""

import json
import tempfile
import unittest
from pathlib import Path
from unittest import mock

import numpy as np
import torch
from omegaconf import OmegaConf

from roboimi.demos.vla_scripts import eval_vla


class _FakeAgent:
    """Agent double that replays a fixed sequence of actions in order."""

    def __init__(self, actions):
        self._actions = [torch.tensor(action, dtype=torch.float32) for action in actions]
        self.reset_calls = 0

    def eval(self):
        return self

    def to(self, _device):
        return self

    def reset(self):
        self.reset_calls += 1

    def select_action(self, observation):
        """Return the next scripted action; the observation is ignored."""
        del observation
        return self._actions.pop(0)


class _FakeEnv:
    """Environment double whose observations, rewards and end-effector poses
    are all deterministic functions of ``step_count``."""

    def __init__(self):
        self.step_count = 0
        self.rew = 0.0
        self.render_calls = 0
        self.reset_calls = []

    def reset(self, box_pos):
        """Record the requested box position and rewind the step counter."""
        self.reset_calls.append(np.array(box_pos, copy=True))
        self.step_count = 0
        self.rew = 0.0

    def _get_image_obs(self):
        # Constant-valued frames make it trivial to tell which step a frame
        # was captured at: front == step_count, top == step_count + 20.
        frame_value = self.step_count
        front = np.full((6, 8, 3), fill_value=frame_value, dtype=np.uint8)
        top = np.full((6, 8, 3), fill_value=frame_value + 20, dtype=np.uint8)
        return {"images": {"front": front, "top": top}}

    def _get_qpos_obs(self):
        return {"qpos": np.arange(16, dtype=np.float32)}

    def step(self, action):
        """Advance one step; the reward equals the new step count."""
        del action
        self.step_count += 1
        self.rew = float(self.step_count)

    def render(self):
        self.render_calls += 1

    def getBodyPos(self, name):
        """Return a step-dependent position for ``eef_left`` / ``eef_right``.

        Raises:
            KeyError: for any other body name.
        """
        base = float(self.step_count)
        if name == 'eef_left':
            return np.array([base, base + 0.1, base + 0.2], dtype=np.float32)
        if name == 'eef_right':
            return np.array([base + 1.0, base + 1.1, base + 1.2], dtype=np.float32)
        raise KeyError(name)

    def getBodyQuat(self, name):
        """Return a step-dependent quaternion for ``eef_left`` / ``eef_right``.

        Raises:
            KeyError: for any other body name.
        """
        base = float(self.step_count)
        if name == 'eef_left':
            return np.array([1.0, base, 0.0, 0.0], dtype=np.float32)
        if name == 'eef_right':
            return np.array([1.0, 0.0, base, 0.0], dtype=np.float32)
        raise KeyError(name)


class _FakeVideoWriter:
    """Video-writer double that stores frames in memory and touches the
    output file so existence checks on the path succeed."""

    def __init__(self, output_path):
        self.output_path = Path(output_path)
        self.output_path.parent.mkdir(parents=True, exist_ok=True)
        self.output_path.write_bytes(b'')
        self.frames = []
        self.released = False

    def isOpened(self):
        return True

    def write(self, frame):
        # Copy so later mutation of the caller's buffer cannot change what
        # the test inspects.
        self.frames.append(np.array(frame, copy=True))

    def release(self):
        self.released = True
        self.output_path.write_bytes(b'fake-mp4')


class EvalVLARolloutArtifactsTest(unittest.TestCase):
    """Checks the artifact-related eval config defaults and the files and
    summary entries produced by ``eval_vla._run_eval``."""

    def test_eval_config_exposes_rollout_artifact_defaults(self):
        """All artifact outputs are off / unset in the shipped eval config."""
        # NOTE: the path is relative, so this test must run from the
        # directory that contains ``roboimi/``.
        eval_cfg = OmegaConf.load(Path('roboimi/vla/conf/eval/eval.yaml'))

        self.assertIn('artifact_dir', eval_cfg)
        self.assertFalse(eval_cfg.save_summary_json)
        self.assertFalse(eval_cfg.save_trajectory_npz)
        self.assertFalse(eval_cfg.save_trajectory_image)
        self.assertFalse(eval_cfg.record_video)
        self.assertIsNone(eval_cfg.artifact_dir)
        self.assertIsNone(eval_cfg.trajectory_image_camera_name)
        self.assertIsNone(eval_cfg.video_camera_name)
        self.assertEqual(eval_cfg.video_fps, 30)

    def test_run_eval_exports_npz_summary_and_video_artifacts(self):
        """One two-step episode with every artifact enabled writes the NPZ,
        summary JSON, video and trajectory image with the expected content."""
        actions = [
            np.arange(16, dtype=np.float32),
            np.arange(16, dtype=np.float32) + 10.0,
        ]
        fake_agent = _FakeAgent(actions)
        fake_env = _FakeEnv()

        with tempfile.TemporaryDirectory() as tmpdir:
            cfg = OmegaConf.create(
                {
                    'agent': {},
                    'eval': {
                        'ckpt_path': 'checkpoints/vla_model_best.pt',
                        'num_episodes': 1,
                        'max_timesteps': 2,
                        'device': 'cpu',
                        'task_name': 'sim_transfer',
                        'camera_names': ['front', 'top'],
                        'use_smoothing': True,
                        'smooth_alpha': 0.5,
                        'verbose_action': False,
                        'headless': True,
                        'artifact_dir': tmpdir,
                        'save_summary_json': True,
                        'save_trajectory_npz': True,
                        'save_trajectory_image': True,
                        'trajectory_image_camera_name': 'front',
                        'record_video': True,
                        'video_camera_name': 'front',
                        'video_fps': 12,
                    },
                }
            )
            writer_holder = {}

            def fake_open_video_writer(output_path, frame_size, fps):
                # Fake frames are 6 rows x 8 columns, so (8, 6) is
                # (width, height).
                self.assertEqual(frame_size, (8, 6))
                self.assertEqual(fps, 12)
                writer = _FakeVideoWriter(output_path)
                writer_holder['writer'] = writer
                return writer

            with mock.patch.object(
                eval_vla,
                'load_checkpoint',
                return_value=(fake_agent, None),
            ), mock.patch.object(
                eval_vla,
                'make_sim_env',
                return_value=fake_env,
            ), mock.patch.object(
                eval_vla,
                'sample_transfer_pose',
                return_value=np.array([0.1, 0.2, 0.3], dtype=np.float32),
            ), mock.patch.object(
                eval_vla,
                'tqdm',
                side_effect=lambda iterable, **kwargs: iterable,
            ), mock.patch.object(
                eval_vla,
                '_open_video_writer',
                side_effect=fake_open_video_writer,
            ):
                summary = eval_vla._run_eval(cfg)

            artifacts = summary['artifacts']
            trajectory_path = Path(artifacts['trajectory_npz'])
            summary_path = Path(artifacts['summary_json'])
            video_path = Path(artifacts['video_mp4'])
            trajectory_image_path = Path(
                summary['episodes'][0]['artifact_paths']['trajectory_image']
            )

            self.assertEqual(Path(artifacts['output_dir']), Path(tmpdir))
            self.assertEqual(artifacts['video_camera_name'], 'front')
            self.assertTrue(trajectory_path.exists())
            self.assertTrue(summary_path.exists())
            self.assertTrue(video_path.exists())
            self.assertTrue(trajectory_image_path.exists())

            # Close the NPZ handle before the temporary directory is removed;
            # an open handle leaks a descriptor and can block cleanup.
            with np.load(trajectory_path) as rollout_npz:
                np.testing.assert_array_equal(rollout_npz['episode_index'], np.array([0, 0]))
                np.testing.assert_array_equal(rollout_npz['timestep'], np.array([0, 1]))
                np.testing.assert_array_equal(
                    rollout_npz['reward'],
                    np.array([1.0, 2.0], dtype=np.float32),
                )
                np.testing.assert_array_equal(
                    rollout_npz['raw_predicted_ee_action'][0], actions[0]
                )
                np.testing.assert_array_equal(
                    rollout_npz['raw_predicted_ee_action'][1], actions[1]
                )
                np.testing.assert_array_equal(rollout_npz['executed_ee_action'][0], actions[0])
                # With smooth_alpha=0.5 the second executed action is expected
                # to be the mean of the two raw actions.
                np.testing.assert_array_equal(
                    rollout_npz['executed_ee_action'][1],
                    (actions[0] + actions[1]) / 2.0,
                )
                # Expected poses are the fake env's values at step_count 1 and 2.
                np.testing.assert_array_equal(
                    rollout_npz['left_ee_pos'],
                    np.array([[1.0, 1.1, 1.2], [2.0, 2.1, 2.2]], dtype=np.float32),
                )
                np.testing.assert_array_equal(
                    rollout_npz['right_ee_pos'],
                    np.array([[2.0, 2.1, 2.2], [3.0, 3.1, 3.2]], dtype=np.float32),
                )
                self.assertEqual(rollout_npz['obs_read_time_ms'].shape, (2,))
                self.assertEqual(rollout_npz['preprocess_time_ms'].shape, (2,))
                self.assertEqual(rollout_npz['inference_time_ms'].shape, (2,))
                self.assertEqual(rollout_npz['env_step_time_ms'].shape, (2,))
                self.assertEqual(rollout_npz['total_time_ms'].shape, (2,))

            writer = writer_holder['writer']
            self.assertTrue(writer.released)
            self.assertEqual(len(writer.frames), 2)
            # Recorded frames equal the front camera at step_count 0 and 1.
            np.testing.assert_array_equal(
                writer.frames[0], np.zeros((6, 8, 3), dtype=np.uint8)
            )
            np.testing.assert_array_equal(
                writer.frames[1], np.full((6, 8, 3), 1, dtype=np.uint8)
            )

            with summary_path.open('r', encoding='utf-8') as fh:
                saved_summary = json.load(fh)

            self.assertEqual(saved_summary['artifacts']['trajectory_npz'], str(trajectory_path))
            self.assertEqual(saved_summary['artifacts']['video_mp4'], str(video_path))
            self.assertEqual(
                saved_summary['episodes'][0]['artifact_paths']['trajectory_image'],
                str(trajectory_image_path),
            )
            # 3.0 is the sum of the two per-step rewards (1.0 + 2.0) emitted
            # by the fake env.
            self.assertEqual(saved_summary['episode_rewards'], [3.0])
            self.assertAlmostEqual(summary['avg_reward'], 3.0)
            self.assertIn('avg_obs_read_time_ms', summary)
            self.assertIn('avg_env_step_time_ms', summary)

    def test_run_eval_exports_front_trajectory_images_without_video_dependency(self):
        """Two episodes with only trajectory images enabled produce one image
        per episode and never open a video writer."""
        actions = [
            np.arange(16, dtype=np.float32),
            np.arange(16, dtype=np.float32) + 10.0,
            np.arange(16, dtype=np.float32) + 100.0,
            np.arange(16, dtype=np.float32) + 110.0,
        ]
        fake_agent = _FakeAgent(actions)
        fake_env = _FakeEnv()

        with tempfile.TemporaryDirectory() as tmpdir:
            # No trajectory_image_camera_name is configured and 'front' is
            # listed second; the test expects 'front' to be selected anyway.
            cfg = OmegaConf.create(
                {
                    'agent': {},
                    'eval': {
                        'ckpt_path': 'checkpoints/vla_model_best.pt',
                        'num_episodes': 2,
                        'max_timesteps': 2,
                        'device': 'cpu',
                        'task_name': 'sim_transfer',
                        'camera_names': ['top', 'front'],
                        'use_smoothing': True,
                        'smooth_alpha': 0.5,
                        'verbose_action': False,
                        'headless': True,
                        'artifact_dir': tmpdir,
                        'save_trajectory_image': True,
                        'record_video': False,
                    },
                }
            )
            trajectory_image_calls = []

            def fake_save_rollout_trajectory_image(
                env,
                output_path,
                raw_actions,
                camera_name,
                *,
                line_radius=0.004,
                max_markers=1500,
            ):
                del env, line_radius, max_markers
                # Snapshot the arguments so the assertions below see the
                # values as they were at call time.
                trajectory_image_calls.append(
                    {
                        'output_path': output_path,
                        'camera_name': camera_name,
                        'raw_actions': [np.array(action, copy=True) for action in raw_actions],
                    }
                )
                if output_path is None:
                    return None
                output_path = Path(output_path)
                output_path.parent.mkdir(parents=True, exist_ok=True)
                output_path.write_bytes(b'fake-png')
                return str(output_path)

            with mock.patch.object(
                eval_vla,
                'load_checkpoint',
                return_value=(fake_agent, None),
            ), mock.patch.object(
                eval_vla,
                'make_sim_env',
                return_value=fake_env,
            ), mock.patch.object(
                eval_vla,
                'sample_transfer_pose',
                return_value=np.array([0.1, 0.2, 0.3], dtype=np.float32),
            ), mock.patch.object(
                eval_vla,
                'tqdm',
                side_effect=lambda iterable, **kwargs: iterable,
            ), mock.patch.object(
                eval_vla,
                '_save_rollout_trajectory_image',
                side_effect=fake_save_rollout_trajectory_image,
            ) as save_trajectory_image_mock, mock.patch.object(
                eval_vla,
                '_open_video_writer',
            ) as open_video_writer_mock:
                summary = eval_vla._run_eval(cfg)

            self.assertEqual(save_trajectory_image_mock.call_count, 2)
            open_video_writer_mock.assert_not_called()
            self.assertIsNone(summary['artifacts']['video_mp4'])
            self.assertEqual(summary['artifacts']['trajectory_image_camera_name'], 'front')
            self.assertEqual(
                [call['camera_name'] for call in trajectory_image_calls],
                ['front', 'front'],
            )

            first_episode_path = Path(
                summary['episodes'][0]['artifact_paths']['trajectory_image']
            )
            second_episode_path = Path(
                summary['episodes'][1]['artifact_paths']['trajectory_image']
            )
            self.assertTrue(first_episode_path.exists())
            self.assertTrue(second_episode_path.exists())
            self.assertNotEqual(first_episode_path, second_episode_path)
            self.assertEqual(first_episode_path.parent, Path(tmpdir))
            self.assertEqual(second_episode_path.parent, Path(tmpdir))

            # Each episode's image receives only that episode's raw actions.
            np.testing.assert_array_equal(
                trajectory_image_calls[0]['raw_actions'][0], actions[0]
            )
            np.testing.assert_array_equal(
                trajectory_image_calls[0]['raw_actions'][1], actions[1]
            )
            np.testing.assert_array_equal(
                trajectory_image_calls[1]['raw_actions'][0], actions[2]
            )
            np.testing.assert_array_equal(
                trajectory_image_calls[1]['raw_actions'][1], actions[3]
            )


if __name__ == '__main__':
    unittest.main()