# Source file: roboimi/tests/test_eval_vla_rollout_artifacts.py
#
# Repository viewer metadata: 345 lines, 13 KiB, Python

import json
import tempfile
import unittest
from pathlib import Path
from unittest import mock
import numpy as np
import torch
from omegaconf import OmegaConf
from roboimi.demos.vla_scripts import eval_vla
class _FakeAgent:
    """Scripted stand-in for the policy object used by the rollout tests.

    The agent replays a fixed queue of actions in order, one per
    ``select_action`` call, and counts how often ``reset`` is invoked.
    """

    def __init__(self, actions):
        """Queue ``actions`` for replay.

        Args:
            actions: Iterable of array-likes; each one is converted to a
                ``torch.float32`` tensor up front.
        """
        self._actions = [torch.tensor(action, dtype=torch.float32) for action in actions]
        self.reset_calls = 0

    def eval(self):
        """Return ``self`` so the call can be chained."""
        return self

    def to(self, _device):
        """Ignore the requested device and return ``self``."""
        return self

    def reset(self):
        """Count the call; the scripted action queue is left untouched."""
        self.reset_calls += 1

    def select_action(self, observation):
        """Return the next scripted action, ignoring ``observation``.

        Raises:
            IndexError: If every scripted action has already been consumed.
        """
        del observation
        if not self._actions:
            # Same exception type as popping an empty list, but with a
            # message that points at the actual cause in the test setup.
            raise IndexError(
                '_FakeAgent has no scripted actions left; '
                'queue one action per expected select_action call'
            )
        return self._actions.pop(0)
class _FakeEnv:
    """Deterministic stand-in for the simulation environment.

    Every value the environment reports is derived from ``step_count``, so
    tests can predict rewards, camera frames and end-effector poses exactly.
    """

    def __init__(self):
        self.step_count = 0
        self.rew = 0.0
        self.render_calls = 0
        # One copied ``box_pos`` array per ``reset`` call, in call order.
        self.reset_calls = []

    def reset(self, box_pos):
        """Record a copy of ``box_pos`` and rewind the step counter and reward."""
        self.reset_calls.append(np.array(box_pos, copy=True))
        self.step_count = 0
        self.rew = 0.0

    def _get_image_obs(self):
        """Return constant-valued ``front`` and ``top`` frames of shape (6, 8, 3).

        ``front`` is filled with the step count and ``top`` with the step
        count plus 20, both as ``uint8``.
        """
        # Wrap into the uint8 range explicitly: an out-of-range Python int
        # passed to np.full is handled differently across NumPy versions
        # (wrap vs. error), which would make long rollouts version-dependent.
        front_value = self.step_count % 256
        top_value = (self.step_count + 20) % 256
        front = np.full((6, 8, 3), fill_value=front_value, dtype=np.uint8)
        top = np.full((6, 8, 3), fill_value=top_value, dtype=np.uint8)
        return {"images": {"front": front, "top": top}}

    def _get_qpos_obs(self):
        """Return a fixed 16-element ``float32`` ``qpos`` vector (0..15)."""
        return {"qpos": np.arange(16, dtype=np.float32)}

    def step(self, action):
        """Ignore ``action``, advance one step and set the reward to the step count."""
        del action
        self.step_count += 1
        self.rew = float(self.step_count)

    def render(self):
        """Count the call; nothing is drawn."""
        self.render_calls += 1

    def getBodyPos(self, name):
        """Return a step-dependent position for ``eef_left`` or ``eef_right``.

        Raises:
            KeyError: For any other body name.
        """
        base = float(self.step_count)
        if name == 'eef_left':
            return np.array([base, base + 0.1, base + 0.2], dtype=np.float32)
        if name == 'eef_right':
            return np.array([base + 1.0, base + 1.1, base + 1.2], dtype=np.float32)
        raise KeyError(name)

    def getBodyQuat(self, name):
        """Return a step-dependent 4-vector for ``eef_left`` or ``eef_right``.

        Raises:
            KeyError: For any other body name.
        """
        base = float(self.step_count)
        if name == 'eef_left':
            return np.array([1.0, base, 0.0, 0.0], dtype=np.float32)
        if name == 'eef_right':
            return np.array([1.0, 0.0, base, 0.0], dtype=np.float32)
        raise KeyError(name)
class _FakeVideoWriter:
    """In-memory stand-in for the video writer returned by ``_open_video_writer``.

    Frames are kept in ``frames`` instead of being encoded; a placeholder
    file is still created at ``output_path`` so existence checks succeed.
    """

    def __init__(self, output_path):
        """Create the parent directory and an empty file at ``output_path``."""
        self.output_path = Path(output_path)
        self.output_path.parent.mkdir(parents=True, exist_ok=True)
        self.output_path.write_bytes(b'')
        self.frames = []
        self.released = False

    def isOpened(self):
        """Always report the writer as open."""
        return True

    def write(self, frame):
        """Store a copy of ``frame`` so later mutation by the caller is not observed."""
        self.frames.append(np.array(frame, copy=True))

    def release(self):
        """Mark the writer as released and replace the file with placeholder bytes."""
        self.released = True
        self.output_path.write_bytes(b'fake-mp4')
class EvalVLARolloutArtifactsTest(unittest.TestCase):
    """Checks the rollout artifacts that ``eval_vla._run_eval`` reports and writes.

    The rollout tests patch checkpoint loading, environment creation, pose
    sampling and the progress bar on ``eval_vla`` with the fakes defined in
    this file, so no checkpoint or simulator is needed.
    """

    def test_eval_config_exposes_rollout_artifact_defaults(self):
        """The shipped eval config keeps every artifact export switched off."""
        # Look for the config under each ancestor directory of this file so
        # the test does not depend on the working directory; fall back to the
        # original CWD-relative path when no ancestor contains it.
        relative_config = Path('roboimi/vla/conf/eval/eval.yaml')
        config_path = next(
            (
                ancestor / relative_config
                for ancestor in Path(__file__).resolve().parents
                if (ancestor / relative_config).is_file()
            ),
            relative_config,
        )
        eval_cfg = OmegaConf.load(config_path)

        self.assertIn('artifact_dir', eval_cfg)
        self.assertFalse(eval_cfg.save_summary_json)
        self.assertFalse(eval_cfg.save_trajectory_npz)
        self.assertFalse(eval_cfg.save_trajectory_image)
        self.assertFalse(eval_cfg.record_video)
        self.assertIsNone(eval_cfg.artifact_dir)
        self.assertIsNone(eval_cfg.trajectory_image_camera_name)
        self.assertIsNone(eval_cfg.video_camera_name)
        self.assertEqual(eval_cfg.video_fps, 30)

    def test_run_eval_exports_npz_summary_and_video_artifacts(self):
        """A one-episode, two-step rollout writes NPZ, JSON, image and video artifacts."""
        actions = [
            np.arange(16, dtype=np.float32),
            np.arange(16, dtype=np.float32) + 10.0,
        ]
        fake_agent = _FakeAgent(actions)
        fake_env = _FakeEnv()

        with tempfile.TemporaryDirectory() as tmpdir:
            cfg = OmegaConf.create(
                {
                    'agent': {},
                    'eval': {
                        'ckpt_path': 'checkpoints/vla_model_best.pt',
                        'num_episodes': 1,
                        'max_timesteps': 2,
                        'device': 'cpu',
                        'task_name': 'sim_transfer',
                        'camera_names': ['front', 'top'],
                        'use_smoothing': True,
                        'smooth_alpha': 0.5,
                        'verbose_action': False,
                        'headless': True,
                        'artifact_dir': tmpdir,
                        'save_summary_json': True,
                        'save_trajectory_npz': True,
                        'save_trajectory_image': True,
                        'trajectory_image_camera_name': 'front',
                        'record_video': True,
                        'video_camera_name': 'front',
                        'video_fps': 12,
                    },
                }
            )
            # Lets the assertions below reach the writer created inside _run_eval.
            writer_holder = {}

            def fake_open_video_writer(output_path, frame_size, fps):
                # The fake frames are (6, 8, 3) arrays, so (8, 6) is the
                # width-first size the writer is expected to be opened with.
                self.assertEqual(frame_size, (8, 6))
                self.assertEqual(fps, 12)
                writer = _FakeVideoWriter(output_path)
                writer_holder['writer'] = writer
                return writer

            with mock.patch.object(
                eval_vla,
                'load_checkpoint',
                return_value=(fake_agent, None),
            ), mock.patch.object(
                eval_vla,
                'make_sim_env',
                return_value=fake_env,
            ), mock.patch.object(
                eval_vla,
                'sample_transfer_pose',
                return_value=np.array([0.1, 0.2, 0.3], dtype=np.float32),
            ), mock.patch.object(
                eval_vla,
                'tqdm',
                side_effect=lambda iterable, **kwargs: iterable,
            ), mock.patch.object(
                eval_vla,
                '_open_video_writer',
                side_effect=fake_open_video_writer,
            ):
                summary = eval_vla._run_eval(cfg)

            artifacts = summary['artifacts']
            trajectory_path = Path(artifacts['trajectory_npz'])
            summary_path = Path(artifacts['summary_json'])
            video_path = Path(artifacts['video_mp4'])
            trajectory_image_path = Path(
                summary['episodes'][0]['artifact_paths']['trajectory_image']
            )

            self.assertEqual(Path(artifacts['output_dir']), Path(tmpdir))
            self.assertEqual(artifacts['video_camera_name'], 'front')
            self.assertTrue(trajectory_path.exists())
            self.assertTrue(summary_path.exists())
            self.assertTrue(video_path.exists())
            self.assertTrue(trajectory_image_path.exists())

            # Close the archive before the temporary directory is removed;
            # np.load keeps the .npz file open until the NpzFile is closed.
            with np.load(trajectory_path) as rollout_npz:
                np.testing.assert_array_equal(rollout_npz['episode_index'], np.array([0, 0]))
                np.testing.assert_array_equal(rollout_npz['timestep'], np.array([0, 1]))
                # _FakeEnv.step sets the reward to the step count: 1.0, then 2.0.
                np.testing.assert_array_equal(
                    rollout_npz['reward'],
                    np.array([1.0, 2.0], dtype=np.float32),
                )
                np.testing.assert_array_equal(rollout_npz['raw_predicted_ee_action'][0], actions[0])
                np.testing.assert_array_equal(rollout_npz['raw_predicted_ee_action'][1], actions[1])
                np.testing.assert_array_equal(rollout_npz['executed_ee_action'][0], actions[0])
                # With smooth_alpha=0.5 the second executed action is expected
                # to be the mean of the two raw actions.
                np.testing.assert_array_equal(
                    rollout_npz['executed_ee_action'][1],
                    (actions[0] + actions[1]) / 2.0,
                )
                # Expected poses are the fake env's values at step counts 1 and 2.
                np.testing.assert_array_equal(
                    rollout_npz['left_ee_pos'],
                    np.array([[1.0, 1.1, 1.2], [2.0, 2.1, 2.2]], dtype=np.float32),
                )
                np.testing.assert_array_equal(
                    rollout_npz['right_ee_pos'],
                    np.array([[2.0, 2.1, 2.2], [3.0, 3.1, 3.2]], dtype=np.float32),
                )
                # Timings are not deterministic, so only their shape is checked.
                self.assertEqual(rollout_npz['obs_read_time_ms'].shape, (2,))
                self.assertEqual(rollout_npz['preprocess_time_ms'].shape, (2,))
                self.assertEqual(rollout_npz['inference_time_ms'].shape, (2,))
                self.assertEqual(rollout_npz['env_step_time_ms'].shape, (2,))
                self.assertEqual(rollout_npz['total_time_ms'].shape, (2,))

            writer = writer_holder['writer']
            self.assertTrue(writer.released)
            self.assertEqual(len(writer.frames), 2)
            # Expected frames are the fake 'front' images at step counts 0 and 1.
            np.testing.assert_array_equal(writer.frames[0], np.zeros((6, 8, 3), dtype=np.uint8))
            np.testing.assert_array_equal(writer.frames[1], np.full((6, 8, 3), 1, dtype=np.uint8))

            with summary_path.open('r', encoding='utf-8') as fh:
                saved_summary = json.load(fh)

            self.assertEqual(saved_summary['artifacts']['trajectory_npz'], str(trajectory_path))
            self.assertEqual(saved_summary['artifacts']['video_mp4'], str(video_path))
            self.assertEqual(
                saved_summary['episodes'][0]['artifact_paths']['trajectory_image'],
                str(trajectory_image_path),
            )
            # 3.0 matches the sum of the per-step rewards 1.0 and 2.0.
            self.assertEqual(saved_summary['episode_rewards'], [3.0])
            self.assertAlmostEqual(summary['avg_reward'], 3.0)
            self.assertIn('avg_obs_read_time_ms', summary)
            self.assertIn('avg_env_step_time_ms', summary)

    def test_run_eval_exports_front_trajectory_images_without_video_dependency(self):
        """Two episodes each get a trajectory image while no video writer is opened."""
        actions = [
            np.arange(16, dtype=np.float32),
            np.arange(16, dtype=np.float32) + 10.0,
            np.arange(16, dtype=np.float32) + 100.0,
            np.arange(16, dtype=np.float32) + 110.0,
        ]
        fake_agent = _FakeAgent(actions)
        fake_env = _FakeEnv()

        with tempfile.TemporaryDirectory() as tmpdir:
            # No trajectory_image_camera_name is configured and 'front' is
            # listed second; the assertions below still expect 'front'.
            cfg = OmegaConf.create(
                {
                    'agent': {},
                    'eval': {
                        'ckpt_path': 'checkpoints/vla_model_best.pt',
                        'num_episodes': 2,
                        'max_timesteps': 2,
                        'device': 'cpu',
                        'task_name': 'sim_transfer',
                        'camera_names': ['top', 'front'],
                        'use_smoothing': True,
                        'smooth_alpha': 0.5,
                        'verbose_action': False,
                        'headless': True,
                        'artifact_dir': tmpdir,
                        'save_trajectory_image': True,
                        'record_video': False,
                    },
                }
            )
            trajectory_image_calls = []

            def fake_save_rollout_trajectory_image(
                env,
                output_path,
                raw_actions,
                camera_name,
                *,
                line_radius=0.004,
                max_markers=1500,
            ):
                del env, line_radius, max_markers
                # Copy the actions so later mutation by the caller cannot
                # change what was recorded for this call.
                trajectory_image_calls.append(
                    {
                        'output_path': output_path,
                        'camera_name': camera_name,
                        'raw_actions': [np.array(action, copy=True) for action in raw_actions],
                    }
                )
                if output_path is None:
                    return None
                output_path = Path(output_path)
                output_path.parent.mkdir(parents=True, exist_ok=True)
                output_path.write_bytes(b'fake-png')
                return str(output_path)

            with mock.patch.object(
                eval_vla,
                'load_checkpoint',
                return_value=(fake_agent, None),
            ), mock.patch.object(
                eval_vla,
                'make_sim_env',
                return_value=fake_env,
            ), mock.patch.object(
                eval_vla,
                'sample_transfer_pose',
                return_value=np.array([0.1, 0.2, 0.3], dtype=np.float32),
            ), mock.patch.object(
                eval_vla,
                'tqdm',
                side_effect=lambda iterable, **kwargs: iterable,
            ), mock.patch.object(
                eval_vla,
                '_save_rollout_trajectory_image',
                side_effect=fake_save_rollout_trajectory_image,
            ) as save_trajectory_image_mock, mock.patch.object(
                eval_vla,
                '_open_video_writer',
            ) as open_video_writer_mock:
                summary = eval_vla._run_eval(cfg)

            self.assertEqual(save_trajectory_image_mock.call_count, 2)
            open_video_writer_mock.assert_not_called()
            self.assertIsNone(summary['artifacts']['video_mp4'])
            self.assertEqual(summary['artifacts']['trajectory_image_camera_name'], 'front')
            self.assertEqual(
                [call['camera_name'] for call in trajectory_image_calls],
                ['front', 'front'],
            )

            first_episode_path = Path(summary['episodes'][0]['artifact_paths']['trajectory_image'])
            second_episode_path = Path(summary['episodes'][1]['artifact_paths']['trajectory_image'])
            self.assertTrue(first_episode_path.exists())
            self.assertTrue(second_episode_path.exists())
            self.assertNotEqual(first_episode_path, second_episode_path)
            self.assertEqual(first_episode_path.parent, Path(tmpdir))
            self.assertEqual(second_episode_path.parent, Path(tmpdir))

            # Each episode's image must be built from that episode's own raw actions.
            np.testing.assert_array_equal(trajectory_image_calls[0]['raw_actions'][0], actions[0])
            np.testing.assert_array_equal(trajectory_image_calls[0]['raw_actions'][1], actions[1])
            np.testing.assert_array_equal(trajectory_image_calls[1]['raw_actions'][0], actions[2])
            np.testing.assert_array_equal(trajectory_image_calls[1]['raw_actions'][1], actions[3])
# Run the tests in this module when the file is executed directly.
if __name__ == '__main__':
    unittest.main()