import sys
import threading
import time

import numpy as np

try:
    import cv2
except ImportError:
    # OpenCV is optional: it is only needed for the camera preview windows.
    cv2 = None
    print("Could not import cv2, please install it to enable camera viewer.")


def _require_cv2():
    """Return the ``cv2`` module, raising a clear error when OpenCV is missing."""
    if cv2 is None:
        raise RuntimeError(
            "cv2 is not installed; install opencv-python to use the camera viewer."
        )
    return cv2


class MjBasicRenderer:
    """MuJoCo passive viewer plus an off-screen image renderer.

    The viewer is only launched by ``_init_renderer``; the constructor just
    stores the model/data handles and builds the off-screen renderer.
    """

    def __init__(self, mj_model=None, mj_data=None):
        # Imported here so that importing this module does not require mujoco.
        import mujoco

        # Flags toggled from the viewer keyboard callback.
        # NOTE: despite the name, ``render`` syncs the viewer while
        # ``render_paused`` is True; pressing space flips it.
        self.render_paused = True
        self.exit_flag = False
        # Simulation handles.
        self.mj_model = mj_model
        self.mj_data = mj_data
        self.renderer = "viewer"  # default
        self.viewer = None
        self._image = None

        # Off-screen renderer used by camera rendering.
        self.image_renderer = mujoco.Renderer(self.mj_model)

    def _init_renderer(self):
        """Launch the official passive viewer (the only supported renderer).

        Raises:
            ValueError: if ``self.renderer`` is not ``"viewer"``.
        """
        if self.renderer != "viewer":
            raise ValueError("Invalid renderer for some reason.")

        from mujoco import viewer

        def key_callback(keycode):
            if keycode == 32:  # space
                self.render_paused = not self.render_paused
            elif keycode == 256:  # escape
                self.exit_flag = not self.exit_flag

        # launch_passive does not block, allowing user code to continue.
        self.viewer = viewer.launch_passive(
            self.mj_model,
            self.mj_data,
            key_callback=key_callback,
            show_left_ui=False,
            show_right_ui=False,
        )
        self.set_renderer_config()

    def render(self):
        """Sync the viewer with the simulation state, or close it on exit."""
        if self.viewer is None or self.render_paused is not True:
            return
        if self.viewer.is_running() and self.exit_flag is False:
            self.viewer.sync()
        else:
            self.viewer.close()

    def set_renderer_config(self):
        """Apply the default camera and visualization options to the viewer.

        All viewer state (``cam`` and ``opt``) is modified while holding the
        viewer lock, because the viewer runs in its own thread.
        """
        import mujoco

        with self.viewer.lock():
            self.viewer.cam.lookat = np.array([0.4, 0, 0.5])
            self.viewer.cam.azimuth -= 0.005
            self.viewer.opt.flags[mujoco.mjtVisFlag.mjVIS_CONTACTPOINT] = int(
                self.mj_data.time % 2
            )


class MjMultiRenderer(MjBasicRenderer):
    """Renderer that adds optional OpenCV camera (and depth) preview windows."""

    def __init__(
        self,
        mj_model=None,
        mj_data=None,
        renderer=None,
        enable_camera_viewer=False,
        enable_depth=False,
    ):
        super().__init__(mj_model, mj_data)
        self._depth = None
        self.renderer = renderer
        self._init_renderer()
        self.enable_camera_viewer = enable_camera_viewer
        if self.enable_camera_viewer:
            self.enable_depth = enable_depth
            self._init_window()
        else:
            self.enable_depth = False
            print("No Camera View")

    def __del__(self):
        # close() tolerates partially constructed instances, so this is safe
        # even when __init__ raised before all attributes were assigned.
        self.close()

    def _init_renderer(self):
        """Initialize the renderer selected by ``self.renderer``.

        Raises:
            ValueError: for ``"unity"`` (not supported yet) or any unknown name.
        """
        if self.renderer == "unity":
            # TODO: Support unity renderer.
            raise ValueError("Unity renderer init failed for no supporting reason")
        if self.renderer != "viewer":
            raise ValueError("renderer init failed for some reason.")
        super()._init_renderer()
        print("mujoco viewer init !")

    def _init_window(self, name="Camera view"):
        """Create the OpenCV preview window(s); a second one when depth is on."""
        cv = _require_cv2()
        cv.namedWindow(name, cv.WINDOW_NORMAL)
        if self.enable_depth:
            cv.namedWindow("Camera depth view", cv.WINDOW_NORMAL)

    def render(self):
        """Render one frame with the configured renderer.

        Raises:
            ValueError: if the configured renderer is not ``"viewer"``.
        """
        if self.renderer == "viewer":
            super().render()
        elif self.renderer == "unity":
            # TODO: Support unity renderer.
            raise ValueError("Unity renderer not supported now.")
        else:
            raise ValueError("Invalid renderer for some reason.")

    def camera_render(self, cam=None):
        """Show the image (and depth, if enabled) rendered from camera ``cam``."""
        if not self.enable_camera_viewer:
            print("camera info disable")
            return

        cv = _require_cv2()
        rgb, depth = self.render_from_camera(cam)
        if self.enable_depth:
            cv.imshow("Camera view", rgb)
            cv.imshow("Camera depth view", depth)
        else:
            rgb = cv.resize(rgb, (1920, 1600))
            cv.imshow("Camera view", rgb)
        cv.waitKey(1)

    def render_from_camera(self, cam=None):
        """Render off-screen from ``cam`` and return ``(image, depth)``.

        Args:
            cam: camera passed to the off-screen renderer; ``None`` selects
                camera ``-1``.

        Returns:
            ``image`` is the rendered frame with its last axis reversed.
            ``depth`` is the depth render when depth is enabled, otherwise a
            zero array with the same height and width as ``image``.
        """
        camera = -1 if cam is None else cam
        self.image_renderer.update_scene(self.mj_data, camera=camera)
        if self.enable_depth:
            self.image_renderer.enable_depth_rendering()
            try:
                depth = self.image_renderer.render()
            finally:
                # Always leave the renderer in colour mode, even on failure.
                self.image_renderer.disable_depth_rendering()
            image = self.image_renderer.render()[:, :, ::-1]
        else:
            image = self.image_renderer.render()[:, :, ::-1]
            depth = np.zeros(image.shape[:2])
        return image, depth

    def close(self):
        """Close the preview windows and the viewer; safe to call repeatedly."""
        if getattr(self, "enable_camera_viewer", False) and cv2 is not None:
            cv2.destroyAllWindows()
        viewer_handle = getattr(self, "viewer", None)
        if viewer_handle is not None:
            viewer_handle.close()

    # def get_cam_intrinsic(self, fovy=45.0, width=320, height=240):
    #     aspect = width * 1.0 / height
    #     fovx = np.degrees(2 * np.arctan(aspect * np.tan(np.radians(fovy / 2))))

    #     cx = 0.5 * width
    #     cy = 0.5 * height
    #     fx = cx / np.tan(fovx * np.pi / 180 * 0.5)
    #     fy = cy / np.tan(fovy * np.pi / 180 * 0.5)

    #     K = np.array([[fx, 0, cx],
    #                   [0, fy, cy],
    #                   [0, 0, 1]], dtype=np.float32)
a/roboimi/demos/diana_air_insert_policy.py +++ b/roboimi/demos/diana_air_insert_policy.py @@ -12,12 +12,11 @@ class TestAirInsertPolicy(PolicyBase): LEGACY_GRASP_STRATEGY = "legacy" SOCKET_HOLD_Z = 0.85 PEG_INSERT_START_OFFSET = np.array([0.105, 0.0, 0.0], dtype=np.float64) - INSERT_START_T = 650 - INSERT_END_T = 730 + INSERT_END_T = 580 LEFT_SOCKET_GRIPPER_CLOSED = -100 RIGHT_PEG_GRIPPER_CLOSED = -100 SOCKET_APPROACH_Z = 1.05 - EPISODE_END_T = 1000 + EPISODE_END_T = 600 def __init__(self, inject_noise=False, grasp_strategy=SOCKET_OUTER_GRASP_STRATEGY): super().__init__(inject_noise=inject_noise) @@ -120,13 +119,7 @@ class TestAirInsertPolicy(PolicyBase): "gripper": self.LEFT_SOCKET_GRIPPER_CLOSED, }, { - "t": 450, - "xyz": socket_hold_action, - "quat": left_pick_quat, - "gripper": self.LEFT_SOCKET_GRIPPER_CLOSED, - }, - { - "t": 750, + "t": 350, "xyz": socket_hold_action, "quat": left_pick_quat, "gripper": self.LEFT_SOCKET_GRIPPER_CLOSED, @@ -165,19 +158,13 @@ class TestAirInsertPolicy(PolicyBase): "gripper": self.RIGHT_PEG_GRIPPER_CLOSED, }, { - "t": 450, + "t": 350, "xyz": peg_init_xyz, "quat": right_pick_quat, "gripper": self.RIGHT_PEG_GRIPPER_CLOSED, }, { - "t": 550, - "xyz": peg_lift_center, - "quat": right_pick_quat, - "gripper": self.RIGHT_PEG_GRIPPER_CLOSED, - }, - { - "t": self.INSERT_START_T, + "t": 450, "xyz": peg_lift_center, "quat": right_pick_quat, "gripper": self.RIGHT_PEG_GRIPPER_CLOSED, @@ -188,12 +175,6 @@ class TestAirInsertPolicy(PolicyBase): "quat": right_pick_quat, "gripper": self.RIGHT_PEG_GRIPPER_CLOSED, }, - { - "t": 750, - "xyz": peg_insert_end_center, - "quat": right_pick_quat, - "gripper": self.RIGHT_PEG_GRIPPER_CLOSED, - }, { "t": self.EPISODE_END_T, "xyz": peg_insert_end_center, diff --git a/roboimi/demos/vla_scripts/eval_vla.py b/roboimi/demos/vla_scripts/eval_vla.py index 437ea7d..fc30df2 100644 --- a/roboimi/demos/vla_scripts/eval_vla.py +++ b/roboimi/demos/vla_scripts/eval_vla.py @@ -15,10 +15,14 @@ import os import json import 
def _resolve_policy_camera_names(cfg: "DictConfig") -> list[str]:
    """Return the camera names, in order, used to build policy image batches.

    An explicit ``cfg.agent.camera_names`` wins. Otherwise the eval camera list
    is used: in its configured order when the agent ``_target_`` ends with
    ``VLAAgentGr00tDiT``, sorted for every other agent.
    """
    agent_cfg = cfg.agent
    explicit_names = agent_cfg.get('camera_names', None)
    if explicit_names is not None:
        return list(explicit_names)

    eval_camera_names = cfg.eval.camera_names
    if str(agent_cfg.get('_target_', '')).endswith('VLAAgentGr00tDiT'):
        return list(eval_camera_names)
    return sorted(eval_camera_names)


def _new_local_policy_queues(obs_horizon: int) -> dict[str, deque]:
    """Create empty observation/action queues for one rollout.

    Raises:
        ValueError: if ``obs_horizon`` is smaller than 1 (a zero-length history
            queue would silently drop every observation).
    """
    horizon = int(obs_horizon)
    if horizon < 1:
        raise ValueError(f'obs_horizon must be >= 1, got {obs_horizon}')
    return {
        'qpos': deque(maxlen=horizon),
        'images': deque(maxlen=horizon),
        'action': deque(),
    }


def _populate_local_policy_queues(
    queues: dict[str, deque],
    observation: Dict[str, torch.Tensor],
) -> None:
    """Append detached copies of the newest observation to the history queues."""
    if 'qpos' in observation:
        queues['qpos'].append(observation['qpos'].detach().clone())
    if 'images' in observation:
        queues['images'].append({
            camera_name: image.detach().clone()
            for camera_name, image in observation['images'].items()
        })


def _pad_history_to_horizon(history: list, obs_horizon: int) -> list:
    """Repeat the newest entry at the end until ``obs_horizon`` entries exist."""
    missing = int(obs_horizon) - len(history)
    if missing > 0:
        history = history + [history[-1]] * missing
    return history


def _prepare_local_policy_batch(
    queues: dict[str, deque],
    obs_horizon: int,
    camera_names: list[str],
) -> Dict[str, torch.Tensor]:
    """Stack the queued history into a batch with a leading batch axis of 1.

    Histories shorter than ``obs_horizon`` are padded by repeating the newest
    entry. When ``camera_names`` is empty, the sorted camera keys of the oldest
    queued image dict are used.

    Raises:
        ValueError: if the qpos or image queue is empty.
    """
    qpos_history = list(queues['qpos'])
    if not qpos_history:
        raise ValueError('observation queue is empty.')
    qpos_history = _pad_history_to_horizon(qpos_history, obs_horizon)
    batch_qpos = torch.stack(qpos_history, dim=0).unsqueeze(0)

    image_history = list(queues['images'])
    if not image_history:
        raise ValueError('image queue is empty.')
    image_history = _pad_history_to_horizon(image_history, obs_horizon)

    ordered_camera_names = (
        list(camera_names) if camera_names else sorted(image_history[0].keys())
    )
    batch_images = {
        camera_name: torch.stack(
            [frame_by_camera[camera_name] for frame_by_camera in image_history],
            dim=0,
        ).unsqueeze(0)
        for camera_name in ordered_camera_names
    }
    return {'qpos': batch_qpos, 'images': batch_images}


def _enqueue_predicted_actions(
    queues: dict[str, deque],
    predicted_actions: Any,
    obs_horizon: int,
    num_action_steps: int,
) -> None:
    """Queue the executable slice of a predicted action chunk.

    ``predicted_actions`` may be a tensor, a NumPy array or a nested sequence.
    A 2-D input gets a leading batch axis. Steps
    ``[obs_horizon - 1, obs_horizon - 1 + num_action_steps)`` along axis 1 are
    appended to ``queues['action']`` as detached CPU tensors.
    """
    actions = torch.as_tensor(predicted_actions)
    if actions.ndim == 2:
        actions = actions.unsqueeze(0)

    start = int(obs_horizon) - 1
    end = start + int(num_action_steps)
    executable_actions = actions[:, start:end]
    for step_index in range(executable_actions.shape[1]):
        queues['action'].append(
            executable_actions[:, step_index].squeeze(0).detach().cpu().clone()
        )


def _serialize_policy_batch(batch: Dict[str, torch.Tensor]) -> dict[str, Any]:
    """Convert a policy batch into float32 NumPy copies for queue transport."""
    return {
        'qpos': batch['qpos'].detach().cpu().numpy().astype(np.float32, copy=True),
        'images': {
            camera_name: image.detach().cpu().numpy().astype(np.float32, copy=True)
            for camera_name, image in batch['images'].items()
        },
    }


def _deserialize_policy_batch(batch: dict[str, Any], device: str) -> Dict[str, torch.Tensor]:
    """Rebuild float32 tensors on ``device`` from a serialized policy batch."""
    return {
        'qpos': torch.as_tensor(batch['qpos'], dtype=torch.float32, device=device),
        'images': {
            camera_name: torch.as_tensor(image, dtype=torch.float32, device=device)
            for camera_name, image in batch['images'].items()
        },
    }


class _LocalPolicyRunner:
    """Policy runner that calls an in-process agent directly."""

    def __init__(self, agent: torch.nn.Module):
        self.agent = agent
        self.uses_local_model = True

    def reset(self):
        """Reset the wrapped agent at the start of an episode."""
        self.agent.reset()

    def select_action(
        self,
        observation: Dict[str, torch.Tensor],
        *,
        episode_index: int,
        timestep: int,
    ) -> tuple[Any, bool]:
        """Return ``(action, model_inference_triggered)``.

        The flag is True when the agent's ``_queues['action']`` is empty or
        missing before the call, i.e. when a model forward pass is expected.
        """
        # Accepted only for signature parity with the remote runner.
        del episode_index, timestep
        agent_queues = getattr(self.agent, '_queues', None) or {}
        pending_actions = agent_queues.get('action', None)
        model_inference_triggered = pending_actions is None or len(pending_actions) == 0
        action = self.agent.select_action(observation)
        return action, bool(model_inference_triggered)


class _RemotePolicyRunner:
    """Policy runner that sends observation batches to an inference server.

    Observation history and the pending action chunk are kept locally; a
    request is sent only when the local action queue is empty.
    """

    def __init__(
        self,
        *,
        worker_index: int,
        server_index: int,
        request_queue,
        response_queue,
        camera_names: list[str],
        obs_horizon: int,
        num_action_steps: int,
        response_timeout_s: float = 30.0,
    ):
        self.worker_index = int(worker_index)
        self.server_index = int(server_index)
        self.request_queue = request_queue
        self.response_queue = response_queue
        self.camera_names = list(camera_names)
        self.obs_horizon = int(obs_horizon)
        self.num_action_steps = int(num_action_steps)
        self.response_timeout_s = float(response_timeout_s)
        self.local_queues = _new_local_policy_queues(self.obs_horizon)
        self.uses_local_model = False

    def reset(self):
        """Drop all queued observations and actions for a new episode."""
        self.local_queues = _new_local_policy_queues(self.obs_horizon)

    def select_action(
        self,
        observation: Dict[str, torch.Tensor],
        *,
        episode_index: int,
        timestep: int,
    ) -> tuple[torch.Tensor, bool]:
        """Return the next action and whether a server request was made.

        Raises:
            RuntimeError: on response timeout, on an unexpected response type,
                or when the response yields no executable action.
        """
        _populate_local_policy_queues(self.local_queues, observation)
        model_inference_triggered = len(self.local_queues['action']) == 0
        if model_inference_triggered:
            self._request_action_chunk(episode_index=episode_index, timestep=timestep)

        if not self.local_queues['action']:
            raise RuntimeError(f'worker {self.worker_index} received no executable action from server')
        return self.local_queues['action'].popleft(), bool(model_inference_triggered)

    def _request_action_chunk(self, *, episode_index: int, timestep: int) -> None:
        """Send the current history to the server and queue the returned actions."""
        batch = _prepare_local_policy_batch(
            self.local_queues,
            obs_horizon=self.obs_horizon,
            camera_names=self.camera_names,
        )
        self.request_queue.put({
            'type': 'predict_chunk',
            'worker_index': self.worker_index,
            'server_index': self.server_index,
            'episode_index': int(episode_index),
            'timestep': int(timestep),
            'batch': _serialize_policy_batch(batch),
        })
        try:
            response = self.response_queue.get(timeout=self.response_timeout_s)
        except queue.Empty as exc:
            raise RuntimeError(
                f'worker {self.worker_index} timed out waiting for inference server {self.server_index}'
            ) from exc
        # NOTE(review): the response is not matched against episode/timestep, so a
        # late reply to a timed-out request could be consumed here - confirm protocol.
        if response.get('type') != 'predict_chunk_result':
            raise RuntimeError(
                f'worker {self.worker_index} received unexpected inference response: '
                f'{response.get("type")}'
            )
        _enqueue_predicted_actions(
            self.local_queues,
            predicted_actions=response['actions'],
            obs_horizon=self.obs_horizon,
            num_action_steps=self.num_action_steps,
        )


# Per-step timing sample lists carried in a worker summary's ``_merge_state``.
_TIMING_SAMPLE_KEYS = (
    'obs_read_time_ms',
    'preprocess_time_ms',
    'inference_time_ms',
    'env_step_time_ms',
    'total_time_ms',
)


def _empty_merge_state() -> dict[str, list]:
    """Return a merge state with an empty list per timing key and flag list."""
    merge_state: dict[str, list] = {key: [] for key in _TIMING_SAMPLE_KEYS}
    merge_state['model_forward_flags'] = []
    return merge_state


def _normalize_num_workers(num_workers: int, num_episodes: int) -> int:
    """Clamp the worker count to ``[1, num_episodes]``; 0 when there are no episodes."""
    episode_count = max(int(num_episodes), 0)
    if episode_count == 0:
        return 0
    return min(max(int(num_workers), 1), episode_count)


def _split_episode_indices(num_episodes: int, num_workers: int) -> list[list[int]]:
    """Split ``range(num_episodes)`` into contiguous, near-equal chunks per worker."""
    active_workers = _normalize_num_workers(num_workers=num_workers, num_episodes=num_episodes)
    if active_workers == 0:
        return []
    episode_indices = np.arange(int(num_episodes), dtype=np.int32)
    return [
        chunk.tolist()
        for chunk in np.array_split(episode_indices, active_workers)
        if len(chunk) > 0
    ]


def _plan_episode_box_poses(
    num_episodes: int,
    sampler=None,
) -> list[np.ndarray]:
    """Sample one float32 pose array per episode.

    ``sampler`` is a zero-argument callable; it defaults to the module-level
    ``sample_transfer_pose``.
    """
    if sampler is None:
        sampler = sample_transfer_pose
    return [
        np.asarray(sampler(), dtype=np.float32).copy()
        for _ in range(int(num_episodes))
    ]


def _merge_worker_summaries(
    worker_summaries: list[dict[str, Any]],
    artifact_paths: dict[str, Optional[str]],
) -> dict[str, Any]:
    """Combine per-worker summaries into one summary sorted by episode index.

    Timing samples and model-forward flags are concatenated from each worker's
    ``_merge_state``; FPS values are averaged per episode.
    """
    merged_episodes: list[dict[str, Any]] = []
    merged_state = _empty_merge_state()
    for worker_summary in worker_summaries:
        merged_episodes.extend(worker_summary.get('episodes', []))
        worker_state = worker_summary.get('_merge_state', {})
        for key in _TIMING_SAMPLE_KEYS:
            merged_state[key].extend(float(value) for value in worker_state.get(key, []))
        merged_state['model_forward_flags'].extend(
            bool(value) for value in worker_state.get('model_forward_flags', [])
        )

    merged_episodes.sort(key=lambda episode: int(episode.get('episode_index', 0)))

    episode_rewards = [
        float(episode.get('episode_reward', 0.0))
        for episode in merged_episodes
    ]
    # Episodes without a max reward keep a None placeholder so list positions
    # stay aligned with ``episodes``; they are excluded from the average.
    episode_max_rewards = [
        (
            float(episode['episode_max_reward'])
            if episode.get('episode_max_reward') is not None
            else None
        )
        for episode in merged_episodes
    ]
    valid_max_rewards = [value for value in episode_max_rewards if value is not None]

    summary = {
        'num_episodes': len(merged_episodes),
        'episode_rewards': episode_rewards,
        'episode_max_rewards': episode_max_rewards,
        'avg_reward': float(np.mean(episode_rewards)) if episode_rewards else 0.0,
        'avg_max_reward': float(np.mean(valid_max_rewards)) if valid_max_rewards else 0.0,
        'episodes': merged_episodes,
        'artifact_dir': artifact_paths.get('output_dir') if artifact_paths else None,
        'artifacts': artifact_paths,
    }
    if not merged_episodes:
        return summary

    summary.update({
        'avg_inference_fps': float(np.mean([
            float(episode.get('inference_fps', 0.0))
            for episode in merged_episodes
        ])),
        'avg_control_fps': float(np.mean([
            float(episode.get('control_fps', 0.0))
            for episode in merged_episodes
        ])),
        'avg_obs_read_time_ms': _mean_or_zero(merged_state['obs_read_time_ms']),
        'avg_preprocess_time_ms': _mean_or_zero(merged_state['preprocess_time_ms']),
        'avg_inference_time_ms': _mean_or_zero(merged_state['inference_time_ms']),
        'avg_env_step_time_ms': _mean_or_zero(merged_state['env_step_time_ms']),
        'avg_total_time_ms': _mean_or_zero(merged_state['total_time_ms']),
        'timing_summary': _summarize_timing_breakdown(
            {
                'obs_read': merged_state['obs_read_time_ms'],
                'preprocess': merged_state['preprocess_time_ms'],
                'inference': merged_state['inference_time_ms'],
                'env_step': merged_state['env_step_time_ms'],
                'loop_total': merged_state['total_time_ms'],
            },
            merged_state['model_forward_flags'],
        ),
    })
    return summary
None) - if artifact_dir: - output_dir = Path(str(artifact_dir)).expanduser().resolve() + if output_dir_override: + output_dir = Path(str(output_dir_override)).expanduser().resolve() else: - ckpt_stem = Path(str(eval_cfg.ckpt_path)).stem or 'rollout' - timestamp = time.strftime('%Y%m%d-%H%M%S') - output_dir = (Path.cwd() / 'rollout_artifacts' / f'{ckpt_stem}-{timestamp}').resolve() + artifact_dir = eval_cfg.get('artifact_dir', None) + if artifact_dir: + output_dir = Path(str(artifact_dir)).expanduser().resolve() + else: + ckpt_stem = Path(str(eval_cfg.ckpt_path)).stem or 'rollout' + timestamp = time.strftime('%Y%m%d-%H%M%S') + output_dir = (Path.cwd() / 'rollout_artifacts' / f'{ckpt_stem}-{timestamp}').resolve() output_dir.mkdir(parents=True, exist_ok=True) video_camera_name = None @@ -582,6 +909,12 @@ def _save_summary_json(output_path: str, summary: dict[str, Any]): json.dump(_json_friendly(summary), f, ensure_ascii=False, indent=2) +def _public_summary(summary: dict[str, Any]) -> dict[str, Any]: + public_summary = dict(summary) + public_summary.pop('_merge_state', None) + return _json_friendly(public_summary) + + class ActionSmoother: """ 动作平滑器(指数移动平均) @@ -642,14 +975,7 @@ def _sample_task_reset_state(task_name: str): raise NotImplementedError(f'Unsupported eval task reset sampling: {task_name}') -def _run_eval(cfg: DictConfig): - """ - 使用 agent 内置队列管理的简化版 VLA 评估 - - 所有评估参数来自 vla/conf/eval.yaml,合并到 cfg 中。 - 命令行覆盖: python eval_vla_simple.py eval.ckpt_path=... 
def _build_episode_plans(
    num_episodes: int,
    box_poses: Optional[list[np.ndarray]] = None,
) -> list[dict[str, Any]]:
    """Build one plan dict per episode.

    Without ``box_poses`` the result has ``num_episodes`` plans that only carry
    ``episode_index``. With ``box_poses`` there is one plan per pose (the pose
    count, not ``num_episodes``, decides the length) and each plan also carries
    ``box_pos`` as an independent float32 copy of the pose.
    """
    if box_poses is None:
        return [
            {'episode_index': int(episode_index)}
            for episode_index in range(int(num_episodes))
        ]
    return [
        {
            'episode_index': int(episode_index),
            # Copy so later edits to the caller's array cannot change the plan.
            'box_pos': np.asarray(box_pos, dtype=np.float32).copy(),
        }
        for episode_index, box_pos in enumerate(box_poses)
    ]
'eval_image_resize_shape', (224, 224)) - - # 重置 agent 的队列 - agent.reset() + merge_state = _empty_merge_state() # 可选:动作平滑器 smoother = ActionSmoother(alpha=eval_cfg.smooth_alpha) if eval_cfg.use_smoothing else None @@ -695,25 +1029,34 @@ def _run_eval(cfg: DictConfig): # ========================================================================= # 创建环境 # ========================================================================= - env = make_sim_env(eval_cfg.task_name, headless=eval_cfg.headless) - - # ========================================================================= - # 运行评估回合 - # ========================================================================= - all_stats = [] - episode_rewards = [] - episode_max_rewards = [] + env = None try: - for episode_idx in range(eval_cfg.num_episodes): - print(f"\n{'='*60}") - print(f"回合 {episode_idx + 1}/{eval_cfg.num_episodes}") - print(f"{'='*60}\n") + env = make_sim_env(eval_cfg.task_name, headless=eval_cfg.headless) + + # ========================================================================= + # 运行评估回合 + # ========================================================================= + all_stats = [] + episode_rewards = [] + episode_max_rewards = [] + for plan in episode_plans: + episode_idx = int(plan['episode_index']) + task_state = plan.get('box_pos') + if task_state is None: + task_state = _sample_task_reset_state(str(eval_cfg.task_name)) + elif isinstance(task_state, np.ndarray): + task_state = np.asarray(task_state, dtype=np.float32) + + + if show_progress: + print(f"\n{'='*60}") + print(f"回合 {episode_idx + 1}/{eval_cfg.num_episodes}") + print(f"{'='*60}\n") - task_state = _sample_task_reset_state(str(eval_cfg.task_name)) env.reset(task_state) - # 为新回合重置 agent 队列 - agent.reset() + # 为新回合重置 rollout policy 状态 + policy_runner.reset() if smoother: smoother.reset() @@ -729,7 +1072,11 @@ def _run_eval(cfg: DictConfig): episode_raw_actions: list[np.ndarray] = [] with torch.inference_mode(): - for t in 
tqdm(range(eval_cfg.max_timesteps), desc=f"回合 {episode_idx + 1}"): + episode_iterator = range(eval_cfg.max_timesteps) + if show_progress: + episode_iterator = tqdm(episode_iterator, desc=f"回合 {episode_idx + 1}") + + for t in episode_iterator: start_total = time.perf_counter() # 从环境获取观测 @@ -742,20 +1089,22 @@ def _run_eval(cfg: DictConfig): video_recorder.write(video_frame) # 准备给 agent 的观测 - observation = prepare_observation( - obs, - camera_names, - image_resize_shape=image_resize_shape, - ) + observation = prepare_observation(obs, camera_names) end_preprocess = time.perf_counter() - # 选择动作(agent 内部处理队列管理) - action_queue = getattr(agent, '_queues', {}).get('action', None) - model_inference_triggered = len(action_queue) == 0 if action_queue is not None else True + # 选择动作(本地 agent 或远端 inference server) start_inference = time.perf_counter() - action = agent.select_action(observation) + action, model_inference_triggered = policy_runner.select_action( + observation, + episode_index=episode_idx, + timestep=t, + ) - if str(device).startswith('cuda') and torch.cuda.is_available(): + if ( + getattr(policy_runner, 'uses_local_model', False) + and _is_cuda_device(device) + and torch.cuda.is_available() + ): torch.cuda.synchronize() end_inference = time.perf_counter() @@ -805,12 +1154,12 @@ def _run_eval(cfg: DictConfig): env_step_times_ms.append(step_timing_ms['env_step_time_ms']) total_times_ms.append(step_timing_ms['total_time_ms']) model_forward_flags.append(bool(model_inference_triggered)) - global_obs_read_times_ms.append(step_timing_ms['obs_read_time_ms']) - global_preprocess_times_ms.append(step_timing_ms['preprocess_time_ms']) - global_inference_times_ms.append(step_timing_ms['inference_time_ms']) - global_env_step_times_ms.append(step_timing_ms['env_step_time_ms']) - global_total_times_ms.append(step_timing_ms['total_time_ms']) - global_model_forward_flags.append(bool(model_inference_triggered)) + 
merge_state['obs_read_time_ms'].append(step_timing_ms['obs_read_time_ms']) + merge_state['preprocess_time_ms'].append(step_timing_ms['preprocess_time_ms']) + merge_state['inference_time_ms'].append(step_timing_ms['inference_time_ms']) + merge_state['env_step_time_ms'].append(step_timing_ms['env_step_time_ms']) + merge_state['total_time_ms'].append(step_timing_ms['total_time_ms']) + merge_state['model_forward_flags'].append(bool(model_inference_triggered)) if artifact_paths['trajectory_npz'] is not None: _append_rollout_step( @@ -856,6 +1205,8 @@ def _run_eval(cfg: DictConfig): } stats = { + 'worker_index': int(worker_index), + 'episode_index': int(episode_idx), 'inference_fps': 1000.0 / avg_inference_time_ms if avg_inference_time_ms > 0 else 0.0, 'control_fps': 1000.0 / avg_total_time_ms if avg_total_time_ms > 0 else 0.0, 'avg_obs_read_time_ms': avg_obs_read_time_ms, @@ -876,46 +1227,55 @@ def _run_eval(cfg: DictConfig): } all_stats.append(stats) episode_rewards.append(float(episode_reward)) - if episode_max_reward != float('-inf'): - episode_max_rewards.append(float(episode_max_reward)) + episode_max_rewards.append( + float(episode_max_reward) if episode_max_reward != float('-inf') else None + ) - print(f"\n回合 {episode_idx + 1} 完成 ({eval_cfg.max_timesteps} 时间步)") - print(f" 模型推理 FPS: {stats['inference_fps']:.2f} Hz") - print(f" 控制循环 FPS: {stats['control_fps']:.2f} Hz") - print(f" 平均读观测时间: {stats['avg_obs_read_time_ms']:.2f} ms") - print(f" 平均预处理时间: {stats['avg_preprocess_time_ms']:.2f} ms") - print(f" 平均推理时间: {stats['avg_inference_time_ms']:.2f} ms") - print(f" 平均环境步进时间: {stats['avg_env_step_time_ms']:.2f} ms") - print(f" 平均总时间: {stats['avg_total_time_ms']:.2f} ms") - print(f" 总推理次数: {stats['num_inferences']}") - print(f" 回合累计奖励: {stats['episode_reward']:.2f}") + if show_progress: + print(f"\n回合 {episode_idx + 1} 完成 ({eval_cfg.max_timesteps} 时间步)") + print(f" 模型推理 FPS: {stats['inference_fps']:.2f} Hz") + print(f" 控制循环 FPS: {stats['control_fps']:.2f} Hz") + 
print(f" 平均读观测时间: {stats['avg_obs_read_time_ms']:.2f} ms") + print(f" 平均预处理时间: {stats['avg_preprocess_time_ms']:.2f} ms") + print(f" 平均推理时间: {stats['avg_inference_time_ms']:.2f} ms") + print(f" 平均环境步进时间: {stats['avg_env_step_time_ms']:.2f} ms") + print(f" 平均总时间: {stats['avg_total_time_ms']:.2f} ms") + print(f" 总推理次数: {stats['num_inferences']}") + print(f" 回合累计奖励: {stats['episode_reward']:.2f}") # ========================================================================= # 总体统计 # ========================================================================= - print(f"\n{'='*60}") - print("评估完成!") - print(f"{'='*60}") + if show_progress: + print(f"\n{'='*60}") + print("评估完成!") + print(f"{'='*60}") + + valid_max_rewards = [ + reward for reward in episode_max_rewards + if reward is not None + ] summary = { - 'num_episodes': int(eval_cfg.num_episodes), + 'num_episodes': len(episode_plans), 'episode_rewards': episode_rewards, 'episode_max_rewards': episode_max_rewards, 'avg_reward': float(np.mean(episode_rewards)) if episode_rewards else 0.0, - 'avg_max_reward': float(np.mean(episode_max_rewards)) if episode_max_rewards else 0.0, + 'avg_max_reward': float(np.mean(valid_max_rewards)) if valid_max_rewards else 0.0, 'episodes': all_stats, 'artifact_dir': artifact_paths['output_dir'], 'artifacts': artifact_paths, + '_merge_state': merge_state, } if all_stats: avg_inference_fps = np.mean([s['inference_fps'] for s in all_stats]) avg_control_fps = np.mean([s['control_fps'] for s in all_stats]) - avg_obs_read_time = _mean_or_zero(global_obs_read_times_ms) - avg_preprocess_time = _mean_or_zero(global_preprocess_times_ms) - avg_inference_time = _mean_or_zero(global_inference_times_ms) - avg_env_step_time = _mean_or_zero(global_env_step_times_ms) - avg_total_time = _mean_or_zero(global_total_times_ms) + avg_obs_read_time = _mean_or_zero(merge_state['obs_read_time_ms']) + avg_preprocess_time = _mean_or_zero(merge_state['preprocess_time_ms']) + avg_inference_time = 
_mean_or_zero(merge_state['inference_time_ms']) + avg_env_step_time = _mean_or_zero(merge_state['env_step_time_ms']) + avg_total_time = _mean_or_zero(merge_state['total_time_ms']) summary.update({ 'avg_inference_fps': float(avg_inference_fps), 'avg_control_fps': float(avg_control_fps), @@ -926,39 +1286,630 @@ def _run_eval(cfg: DictConfig): 'avg_total_time_ms': float(avg_total_time), 'timing_summary': _summarize_timing_breakdown( { - 'obs_read': global_obs_read_times_ms, - 'preprocess': global_preprocess_times_ms, - 'inference': global_inference_times_ms, - 'env_step': global_env_step_times_ms, - 'loop_total': global_total_times_ms, + 'obs_read': merge_state['obs_read_time_ms'], + 'preprocess': merge_state['preprocess_time_ms'], + 'inference': merge_state['inference_time_ms'], + 'env_step': merge_state['env_step_time_ms'], + 'loop_total': merge_state['total_time_ms'], }, - global_model_forward_flags, + merge_state['model_forward_flags'], ), }) - print(f"\n总体统计 ({eval_cfg.num_episodes} 个回合):") - print(f" 平均模型推理 FPS: {avg_inference_fps:.2f} Hz") - print(f" 平均控制循环 FPS: {avg_control_fps:.2f} Hz") - print(f" 平均读观测时间: {avg_obs_read_time:.2f} ms") - print(f" 平均预处理时间: {avg_preprocess_time:.2f} ms") - print(f" 平均推理时间: {avg_inference_time:.2f} ms") - print(f" 平均环境步进时间: {avg_env_step_time:.2f} ms") - print(f" 平均总时间: {avg_total_time:.2f} ms") - print(f" 平均累计奖励: {summary['avg_reward']:.2f}") + if show_progress: + print(f"\n总体统计 ({len(episode_plans)} 个回合):") + print(f" 平均模型推理 FPS: {avg_inference_fps:.2f} Hz") + print(f" 平均控制循环 FPS: {avg_control_fps:.2f} Hz") + print(f" 平均读观测时间: {avg_obs_read_time:.2f} ms") + print(f" 平均预处理时间: {avg_preprocess_time:.2f} ms") + print(f" 平均推理时间: {avg_inference_time:.2f} ms") + print(f" 平均环境步进时间: {avg_env_step_time:.2f} ms") + print(f" 平均总时间: {avg_total_time:.2f} ms") + print(f" 平均累计奖励: {summary['avg_reward']:.2f}") if artifact_paths['trajectory_npz'] is not None: _save_rollout_trajectory_npz(artifact_paths['trajectory_npz'], rollout_trajectory) + 
public_summary = _public_summary(summary) if artifact_paths['summary_json'] is not None: - _save_summary_json(artifact_paths['summary_json'], summary) + _save_summary_json(artifact_paths['summary_json'], public_summary) if artifact_paths['timing_json'] is not None: - _save_summary_json(artifact_paths['timing_json'], summary.get('timing_summary', {})) - print() - return _json_friendly(summary) + _save_summary_json(artifact_paths['timing_json'], public_summary.get('timing_summary', {})) + if show_progress: + print() + return summary finally: video_recorder.close() _close_env(env) +def _run_eval_worker( + cfg: DictConfig, + episode_plans: list[dict[str, Any]], + worker_index: int = 0, + artifact_paths: Optional[dict[str, Optional[str]]] = None, + show_progress: bool = True, +) -> dict[str, Any]: + eval_cfg = cfg.eval + + log.info(f"🚀 从 {eval_cfg.ckpt_path} 加载模型...") + agent, _dataset_stats = load_checkpoint( + ckpt_path=eval_cfg.ckpt_path, + agent_cfg=cfg.agent, + device=str(eval_cfg.device), + ) + policy_runner = _LocalPolicyRunner(agent) + return _run_eval_episode_plans( + cfg, + episode_plans=episode_plans, + policy_runner=policy_runner, + worker_index=worker_index, + artifact_paths=artifact_paths, + show_progress=show_progress, + ) + + +def _run_remote_eval_worker( + cfg: DictConfig, + episode_plans: list[dict[str, Any]], + *, + worker_index: int, + server_index: int, + request_queue, + response_queue, + artifact_paths: Optional[dict[str, Optional[str]]] = None, + show_progress: bool = False, +) -> dict[str, Any]: + eval_cfg = cfg.eval + agent_cfg = cfg.agent + num_action_steps = int(agent_cfg.get('num_action_steps', eval_cfg.get('num_queries', 1))) + policy_runner = _RemotePolicyRunner( + worker_index=worker_index, + server_index=server_index, + request_queue=request_queue, + response_queue=response_queue, + camera_names=_resolve_policy_camera_names(cfg), + obs_horizon=int(agent_cfg.get('obs_horizon', eval_cfg.obs_horizon)), + num_action_steps=num_action_steps, + 
response_timeout_s=float(eval_cfg.get('response_timeout_s', 300.0)), + ) + return _run_eval_episode_plans( + cfg, + episode_plans=episode_plans, + policy_runner=policy_runner, + worker_index=worker_index, + artifact_paths=artifact_paths, + show_progress=show_progress, + ) + + +def _run_eval_serial(cfg: DictConfig): + eval_cfg = cfg.eval + _configure_headless_mujoco_gl(eval_cfg) + artifact_paths = _resolve_artifact_paths(eval_cfg) + episode_plans = _build_episode_plans(eval_cfg.num_episodes) + summary = _run_eval_worker( + cfg, + episode_plans=episode_plans, + worker_index=0, + artifact_paths=artifact_paths, + show_progress=True, + ) + return _public_summary(summary) + + +def _validate_parallel_eval_cfg(eval_cfg: DictConfig): + if not bool(eval_cfg.get('headless', False)): + raise ValueError('eval.num_workers > 1 requires eval.headless=true') + + unsupported_exports = [ + flag_name + for flag_name in ( + 'record_video', + 'save_trajectory', + 'save_trajectory_npz', + ) + if bool(eval_cfg.get(flag_name, False)) + ] + if unsupported_exports: + joined_flags = ', '.join(unsupported_exports) + raise ValueError( + 'eval.num_workers > 1 does not yet support parallel export for ' + f'{joined_flags}' + ) + + +def _is_cuda_device(device: Any) -> bool: + return str(device).lower().startswith('cuda') + + +def _resolve_cuda_devices(eval_cfg: DictConfig) -> list[int]: + if not _is_cuda_device(eval_cfg.get('device', 'cpu')): + return [] + + configured_devices = eval_cfg.get('cuda_devices', None) + if configured_devices is None: + return [0] + + resolved_devices = [int(device_index) for device_index in configured_devices] + if not resolved_devices: + raise ValueError('eval.cuda_devices must not be empty when eval.device is CUDA') + if any(device_index < 0 for device_index in resolved_devices): + raise ValueError('eval.cuda_devices must contain non-negative logical CUDA device indices') + return resolved_devices + + +def _run_spawn_jobs( + payloads: list[dict[str, Any]], + 
max_workers: int, + worker_fn, +) -> list[Any]: + if not payloads: + return [] + + ctx = multiprocessing.get_context('spawn') + results = [] + with concurrent.futures.ProcessPoolExecutor( + max_workers=int(max_workers), + mp_context=ctx, + ) as executor: + future_to_payload = { + executor.submit(worker_fn, payload): payload + for payload in payloads + } + try: + for future in concurrent.futures.as_completed(future_to_payload): + results.append(future.result()) + except Exception: + for future in future_to_payload: + future.cancel() + raise + return results + + +def _run_eval_worker_entry(payload: dict[str, Any]) -> dict[str, Any]: + if payload.get('_spawn_probe', False): + return { + 'probe_value': int(payload['probe_value']), + 'worker_index': int(payload.get('worker_index', -1)), + } + + cfg = OmegaConf.create(payload['cfg']) + artifact_paths = _resolve_artifact_paths( + cfg.eval, + output_dir_override=payload.get('artifact_dir'), + ) + return _run_eval_worker( + cfg, + episode_plans=list(payload.get('episode_plans', [])), + worker_index=int(payload.get('worker_index', 0)), + artifact_paths=artifact_paths, + show_progress=False, + ) + + +def _build_parallel_worker_payloads( + cfg: DictConfig, + artifact_paths: dict[str, Optional[str]], +) -> tuple[list[dict[str, Any]], int]: + eval_cfg = cfg.eval + requested_workers = int(eval_cfg.get('num_workers', 1)) + episode_splits = _split_episode_indices( + num_episodes=int(eval_cfg.num_episodes), + num_workers=requested_workers, + ) + box_poses = _plan_episode_box_poses(int(eval_cfg.num_episodes)) + resolved_cfg = OmegaConf.to_container(cfg, resolve=True) + payloads = [] + workers_dir = None + if artifact_paths.get('output_dir') is not None: + workers_dir = Path(str(artifact_paths['output_dir'])) / 'workers' + workers_dir.mkdir(parents=True, exist_ok=True) + artifact_paths['workers_dir'] = str(workers_dir) + + for worker_index, episode_indices in enumerate(episode_splits): + worker_artifact_dir = None + if workers_dir is 
not None: + worker_artifact_dir = workers_dir / f'worker_{worker_index:02d}' + worker_artifact_dir.mkdir(parents=True, exist_ok=True) + + worker_cfg = json.loads(json.dumps(resolved_cfg)) + worker_cfg['eval']['artifact_dir'] = ( + str(worker_artifact_dir) if worker_artifact_dir is not None else None + ) + payloads.append({ + 'cfg': worker_cfg, + 'worker_index': int(worker_index), + 'artifact_dir': str(worker_artifact_dir) if worker_artifact_dir is not None else None, + 'episode_plans': [ + { + 'episode_index': int(episode_index), + 'box_pos': np.asarray(box_poses[episode_index], dtype=np.float32).tolist(), + } + for episode_index in episode_indices + ], + }) + + return payloads, len(episode_splits) + + +def _build_cuda_server_payloads( + cfg: DictConfig, + worker_payloads: list[dict[str, Any]], + cuda_devices: list[int], +) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]: + resolved_cfg = OmegaConf.to_container(cfg, resolve=True) + server_payloads = [ + { + 'cfg': json.loads(json.dumps(resolved_cfg)), + 'server_index': int(server_index), + 'device_index': int(device_index), + 'worker_indices': [], + } + for server_index, device_index in enumerate(cuda_devices) + ] + + assigned_workers = [] + for worker_payload in worker_payloads: + server_index = int(worker_payload['worker_index']) % len(server_payloads) + assigned_payload = dict(worker_payload) + assigned_payload['server_index'] = server_index + assigned_payload['device_index'] = int(server_payloads[server_index]['device_index']) + assigned_workers.append(assigned_payload) + server_payloads[server_index]['worker_indices'].append(int(worker_payload['worker_index'])) + + return server_payloads, assigned_workers + + +def _inference_server_main(payload: dict[str, Any]) -> None: + request_queue = payload['request_queue'] + response_queues = payload['response_queues'] + server_index = int(payload.get('server_index', -1)) + + if payload.get('_spawn_probe', False): + while True: + message = request_queue.get() + if 
message.get('type') == 'shutdown_server': + return + if message.get('type') != 'predict_chunk': + continue + response_queues[int(message['worker_index'])].put({ + 'type': 'predict_chunk_result', + 'server_index': server_index, + 'actions': np.asarray([[[11.0], [22.0], [33.0]]], dtype=np.float32), + }) + return + + result_queue = payload.get('result_queue') + cfg = OmegaConf.create(payload['cfg']) + device = f'cuda:{int(payload["device_index"])}' + try: + agent, _dataset_stats = load_checkpoint( + ckpt_path=cfg.eval.ckpt_path, + agent_cfg=cfg.agent, + device=device, + ) + if result_queue is not None: + result_queue.put({ + 'kind': 'server_ready', + 'server_index': server_index, + }) + while True: + message = request_queue.get() + message_type = message.get('type') + if message_type == 'shutdown_server': + return + if message_type != 'predict_chunk': + raise ValueError(f'unknown server message type: {message_type}') + + batch = _deserialize_policy_batch(message['batch'], device=device) + with torch.inference_mode(): + actions = agent.predict_action_chunk(batch) + if torch.cuda.is_available(): + torch.cuda.synchronize() + response_queues[int(message['worker_index'])].put({ + 'type': 'predict_chunk_result', + 'server_index': server_index, + 'actions': actions.detach().cpu().numpy().astype(np.float32, copy=True), + }) + except Exception as exc: + if result_queue is not None: + result_queue.put({ + 'kind': 'server_error', + 'server_index': server_index, + 'error': str(exc), + }) + raise + + +def _env_worker_main(payload: dict[str, Any]) -> None: + worker_index = int(payload.get('worker_index', -1)) + server_index = int(payload.get('server_index', -1)) + request_queue = payload['request_queue'] + response_queue = payload['response_queue'] + result_queue = payload.get('result_queue') + + if payload.get('_spawn_probe', False): + request_queue.put({ + 'type': 'predict_chunk', + 'worker_index': worker_index, + 'server_index': server_index, + 'episode_index': 0, + 'timestep': 
0, + 'batch': { + 'qpos': np.zeros((1, 1, 1), dtype=np.float32), + 'images': {}, + }, + }) + response = response_queue.get(timeout=10.0) + result_queue.put({ + 'kind': 'worker_result', + 'worker_index': worker_index, + 'summary': { + 'probe_worker_index': worker_index, + 'probe_server_index': server_index, + 'probe_actions': np.asarray(response['actions']).tolist(), + }, + }) + return + + cfg = OmegaConf.create(payload['cfg']) + artifact_paths = _resolve_artifact_paths( + cfg.eval, + output_dir_override=payload.get('artifact_dir'), + ) + + try: + summary = _run_remote_eval_worker( + cfg, + episode_plans=list(payload.get('episode_plans', [])), + worker_index=worker_index, + server_index=server_index, + request_queue=request_queue, + response_queue=response_queue, + artifact_paths=artifact_paths, + show_progress=False, + ) + except Exception as exc: + if result_queue is not None: + result_queue.put({ + 'kind': 'worker_error', + 'worker_index': worker_index, + 'server_index': server_index, + 'error': str(exc), + }) + raise + + if result_queue is not None: + result_queue.put({ + 'kind': 'worker_result', + 'worker_index': worker_index, + 'server_index': server_index, + 'summary': summary, + }) + + +def _shutdown_processes(processes: list[tuple[Any, dict[str, Any]]], *, terminate: bool = False) -> None: + for process, _payload in processes: + if terminate and process.is_alive(): + process.terminate() + for process, _payload in processes: + process.join(timeout=5.0) + if process.is_alive(): + process.terminate() + process.join(timeout=5.0) + + +def _run_cuda_parallel_processes( + server_payloads: list[dict[str, Any]], + worker_payloads: list[dict[str, Any]], +) -> list[dict[str, Any]]: + if not worker_payloads: + return [] + + ctx = multiprocessing.get_context('spawn') + result_queue = ctx.Queue() + response_queues = [ctx.Queue() for _ in range(len(worker_payloads))] + request_queues = {} + server_processes: list[tuple[Any, dict[str, Any]]] = [] + worker_processes: 
list[tuple[Any, dict[str, Any]]] = [] + should_terminate = False + startup_timeout_s = float( + server_payloads[0]['cfg']['eval'].get('server_startup_timeout_s', 300.0) + ) if server_payloads else 300.0 + + try: + for server_payload in server_payloads: + request_queue = ctx.Queue() + request_queues[int(server_payload['server_index'])] = request_queue + process_payload = dict(server_payload) + process_payload['request_queue'] = request_queue + process_payload['response_queues'] = response_queues + process_payload['result_queue'] = result_queue + process = ctx.Process( + target=_inference_server_main, + args=(process_payload,), + name=f'eval-inference-server-{int(server_payload["server_index"]):02d}', + ) + process.start() + server_processes.append((process, server_payload)) + + pending_servers = {int(payload['server_index']) for _process, payload in server_processes} + startup_deadline = time.monotonic() + startup_timeout_s + while pending_servers: + remaining = startup_deadline - time.monotonic() + if remaining <= 0: + raise RuntimeError( + 'Timed out waiting for CUDA inference servers to become ready' + ) + try: + message = result_queue.get(timeout=min(0.2, remaining)) + except queue.Empty: + for process, payload in server_processes: + if process.exitcode not in (None, 0): + raise RuntimeError( + f'CUDA inference server {int(payload["server_index"])} exited with code ' + f'{process.exitcode}' + ) + continue + + message_kind = message.get('kind') + if message_kind == 'server_ready': + pending_servers.discard(int(message['server_index'])) + continue + if message_kind == 'server_error': + raise RuntimeError( + f'CUDA inference server {int(message["server_index"])} failed: {message.get("error")}' + ) + raise RuntimeError(f'Unexpected CUDA startup message: {message_kind}') + + for worker_payload in worker_payloads: + worker_index = int(worker_payload['worker_index']) + process_payload = dict(worker_payload) + process_payload['request_queue'] = 
request_queues[int(worker_payload['server_index'])] + process_payload['response_queue'] = response_queues[worker_index] + process_payload['result_queue'] = result_queue + process = ctx.Process( + target=_env_worker_main, + args=(process_payload,), + name=f'eval-env-worker-{worker_index:02d}', + ) + process.start() + worker_processes.append((process, worker_payload)) + + pending_workers = {int(payload['worker_index']) for payload in worker_payloads} + worker_summaries = {} + while pending_workers: + try: + message = result_queue.get(timeout=0.2) + except queue.Empty: + for process, payload in server_processes: + if process.exitcode not in (None, 0): + raise RuntimeError( + f'CUDA inference server {int(payload["server_index"])} exited with code ' + f'{process.exitcode}' + ) + for process, payload in worker_processes: + worker_index = int(payload['worker_index']) + if worker_index in pending_workers and process.exitcode not in (None, 0): + raise RuntimeError( + f'CUDA rollout worker {worker_index} exited with code {process.exitcode}' + ) + continue + + message_kind = message.get('kind') + if message_kind == 'worker_result': + worker_index = int(message['worker_index']) + worker_summaries[worker_index] = message['summary'] + pending_workers.discard(worker_index) + continue + if message_kind == 'worker_error': + raise RuntimeError( + f'CUDA rollout worker {int(message["worker_index"])} failed: {message.get("error")}' + ) + if message_kind == 'server_error': + raise RuntimeError( + f'CUDA inference server {int(message["server_index"])} failed: {message.get("error")}' + ) + raise RuntimeError(f'Unexpected CUDA parallel message: {message_kind}') + + return [ + worker_summaries[worker_index] + for worker_index in sorted(worker_summaries) + ] + except Exception: + should_terminate = True + raise + finally: + for request_queue in request_queues.values(): + try: + request_queue.put({'type': 'shutdown_server'}) + except Exception: + continue + 
_shutdown_processes(worker_processes, terminate=should_terminate) + _shutdown_processes(server_processes, terminate=should_terminate) + + +def _run_eval_parallel(cfg: DictConfig): + eval_cfg = cfg.eval + _configure_headless_mujoco_gl(eval_cfg) + _validate_parallel_eval_cfg(eval_cfg) + if _is_cuda_device(eval_cfg.get('device', 'cpu')): + _resolve_cuda_devices(eval_cfg) + return _run_eval_parallel_cuda(cfg) + return _run_eval_parallel_cpu(cfg) + + +def _run_eval_parallel_cpu(cfg: DictConfig): + eval_cfg = cfg.eval + artifact_paths = _resolve_artifact_paths(eval_cfg) + payloads, active_workers = _build_parallel_worker_payloads(cfg, artifact_paths) + + try: + worker_summaries = _run_spawn_jobs( + payloads=payloads, + max_workers=max(active_workers, 1), + worker_fn=_run_eval_worker_entry, + ) + except Exception as exc: + raise RuntimeError(f'Parallel rollout worker failed: {exc}') from exc + + summary = _merge_worker_summaries(worker_summaries, artifact_paths) + public_summary = _public_summary(summary) + if artifact_paths.get('summary_json') is not None: + _save_summary_json(artifact_paths['summary_json'], public_summary) + if artifact_paths.get('timing_json') is not None: + _save_summary_json(artifact_paths['timing_json'], public_summary.get('timing_summary', {})) + return public_summary + + +def _run_eval_parallel_cuda(cfg: DictConfig): + eval_cfg = cfg.eval + _validate_parallel_eval_cfg(eval_cfg) + artifact_paths = _resolve_artifact_paths(eval_cfg) + worker_payloads, _active_workers = _build_parallel_worker_payloads(cfg, artifact_paths) + cuda_devices = _resolve_cuda_devices(eval_cfg) + server_payloads, assigned_worker_payloads = _build_cuda_server_payloads( + cfg, + worker_payloads=worker_payloads, + cuda_devices=cuda_devices, + ) + + try: + worker_summaries = _run_cuda_parallel_processes( + server_payloads=server_payloads, + worker_payloads=assigned_worker_payloads, + ) + except Exception as exc: + raise RuntimeError(f'Parallel CUDA rollout failed: {exc}') from 
exc + + summary = _merge_worker_summaries(worker_summaries, artifact_paths) + public_summary = _public_summary(summary) + if artifact_paths.get('summary_json') is not None: + _save_summary_json(artifact_paths['summary_json'], public_summary) + if artifact_paths.get('timing_json') is not None: + _save_summary_json(artifact_paths['timing_json'], public_summary.get('timing_summary', {})) + return public_summary + + +def _run_eval(cfg: DictConfig): + """ + 使用 agent 内置队列管理的简化版 VLA 评估 + + 所有评估参数来自 vla/conf/eval.yaml,合并到 cfg 中。 + 命令行覆盖: python eval_vla_simple.py eval.ckpt_path=... eval.num_episodes=5 + """ + + _print_eval_config(cfg) + requested_workers = int(cfg.eval.get('num_workers', 1)) + active_workers = _normalize_num_workers( + num_workers=requested_workers, + num_episodes=int(cfg.eval.get('num_episodes', 0)), + ) + if active_workers <= 1: + return _run_eval_serial(cfg) + return _run_eval_parallel(cfg) + + @hydra.main(version_base=None, config_path="../../vla/conf", config_name="config") def main(cfg: DictConfig): return _run_eval(cfg) diff --git a/roboimi/demos/vla_scripts/train_vla.py b/roboimi/demos/vla_scripts/train_vla.py index e4a063c..2607fa3 100644 --- a/roboimi/demos/vla_scripts/train_vla.py +++ b/roboimi/demos/vla_scripts/train_vla.py @@ -702,10 +702,28 @@ def _run_training(cfg: DictConfig): from roboimi.demos.vla_scripts import eval_vla rollout_cfg = OmegaConf.create(OmegaConf.to_container(cfg, resolve=False)) + rollout_num_episodes = int(cfg.train.get('rollout_num_episodes', 1)) + rollout_device = str(cfg.train.get('rollout_device', cfg.train.device)) + configured_rollout_workers = cfg.train.get('rollout_num_workers', None) + if configured_rollout_workers is None: + if rollout_device.startswith('cuda'): + rollout_num_workers = min(max(rollout_num_episodes, 1), 8) + else: + rollout_num_workers = 1 + else: + rollout_num_workers = int(configured_rollout_workers) rollout_cfg.eval.ckpt_path = str(checkpoint_path) - rollout_cfg.eval.num_episodes = 
int(cfg.train.get('rollout_num_episodes', 1)) + rollout_cfg.eval.num_episodes = rollout_num_episodes + rollout_cfg.eval.num_workers = rollout_num_workers rollout_cfg.eval.headless = True - rollout_cfg.eval.device = 'cpu' + rollout_cfg.eval.device = rollout_device + rollout_cfg.eval.cuda_devices = cfg.train.get('rollout_cuda_devices', None) + rollout_cfg.eval.response_timeout_s = float( + cfg.train.get('rollout_response_timeout_s', 300.0) + ) + rollout_cfg.eval.server_startup_timeout_s = float( + cfg.train.get('rollout_server_startup_timeout_s', 300.0) + ) rollout_cfg.eval.verbose_action = False rollout_cfg.eval.record_video = False rollout_cfg.eval.save_trajectory_image = True @@ -716,9 +734,11 @@ def _run_training(cfg: DictConfig): ) log.info( - "🎯 开始 checkpoint rollout 验证: %s (episodes=%s, headless=True)", + "🎯 开始 checkpoint rollout 验证: %s (episodes=%s, device=%s, workers=%s, headless=True)", checkpoint_path, rollout_cfg.eval.num_episodes, + rollout_cfg.eval.device, + rollout_cfg.eval.num_workers, ) return eval_vla._run_eval(rollout_cfg) diff --git a/roboimi/envs/double_base.py b/roboimi/envs/double_base.py index 28392b8..014aae6 100644 --- a/roboimi/envs/double_base.py +++ b/roboimi/envs/double_base.py @@ -91,7 +91,6 @@ class DualDianaMed(MujocoEnv): def step(self,action): self.compute_qpos = action #for observation ! 
- self.obs = self._get_obs() if self.interpolator_left is not None and self.interpolator_right is not None: self.interpolator_left.updateInput(action[:7], control_cycle=self.base_time) self.interpolator_right.updateInput(action[7:-2], control_cycle=self.base_time) @@ -104,6 +103,7 @@ class DualDianaMed(MujocoEnv): super().step(action) self.base_time = time.time() - ctrl_cur_time + self.obs = self._get_obs() def preStep(self, action): diff --git a/roboimi/vla/conf/config.yaml b/roboimi/vla/conf/config.yaml index 7f991e0..6e43491 100644 --- a/roboimi/vla/conf/config.yaml +++ b/roboimi/vla/conf/config.yaml @@ -29,6 +29,11 @@ train: rollout_val_freq_epochs: 50 # 每隔多少个 epoch 执行一次 rollout 验证 rollout_validate_on_checkpoint: false # 是否在保存 checkpoint 后立即运行 rollout 验证 rollout_num_episodes: 3 # rollout 验证的回合数 + rollout_device: ${train.device} # rollout 使用的设备;默认跟随训练设备 + rollout_num_workers: null # rollout 并行 worker 数;null 时 CUDA 自动推断,CPU 保持 1 + rollout_cuda_devices: null # rollout CUDA 并行使用的逻辑 device 列表;null 时默认 [0] + rollout_response_timeout_s: 300.0 # rollout worker 等待 inference server 响应的超时时间 + rollout_server_startup_timeout_s: 300.0 # rollout 等待 inference server 就绪的超时时间 # 学习率调度器(带预热) warmup_steps: 2000 # 预热步数(Transformer建议更长) diff --git a/roboimi/vla/conf/eval/eval.yaml b/roboimi/vla/conf/eval/eval.yaml index 06b47e3..b2e38a2 100644 --- a/roboimi/vla/conf/eval/eval.yaml +++ b/roboimi/vla/conf/eval/eval.yaml @@ -2,6 +2,10 @@ # 评估配置 ckpt_path: "checkpoints/vla_model_best.pt" # 模型检查点路径 num_episodes: 3 # 评估回合数 +num_workers: 1 # 并行 worker 数;1 表示保持单进程评估 +cuda_devices: null # CUDA 并行评估时使用的逻辑设备列表;null 表示默认 [0] +response_timeout_s: 300.0 # worker 等待 inference server 响应的超时时间(秒) +server_startup_timeout_s: 300.0 # parent 等待 inference server 就绪的超时时间(秒) max_timesteps: 700 # 每回合最大时间步 device: ${train.device} # 与训练保持一致 task_name: "sim_transfer" # 环境任务名称 diff --git a/tests/test_eval_vla_execution.py b/tests/test_eval_vla_execution.py index 6a468ac..f9d7a44 100644 --- 
a/tests/test_eval_vla_execution.py +++ b/tests/test_eval_vla_execution.py @@ -1,5 +1,11 @@ import unittest +from unittest import mock +import numpy as np +import torch +from omegaconf import OmegaConf + +from roboimi.demos.vla_scripts import eval_vla from roboimi.vla.eval_utils import execute_policy_action @@ -14,6 +20,48 @@ class _FakeEnv: self.calls.append(("step_jnt", action)) +class _FakeQueue: + def __init__(self, initial_items=None): + self.items = list(initial_items or []) + self.put_calls = [] + + def put(self, item): + self.put_calls.append(item) + self.items.append(item) + + def get(self, timeout=None): + del timeout + if not self.items: + raise AssertionError("queue unexpectedly empty") + return self.items.pop(0) + + +def _make_parallel_cfg(**eval_overrides): + eval_cfg = { + "ckpt_path": "checkpoints/vla_model_best.pt", + "num_episodes": 5, + "num_workers": 2, + "max_timesteps": 1, + "device": "cpu", + "task_name": "sim_transfer", + "camera_names": ["front"], + "use_smoothing": False, + "smooth_alpha": 0.3, + "verbose_action": False, + "headless": True, + "artifact_dir": None, + "save_artifacts": False, + "save_summary_json": False, + "save_timing": False, + "save_trajectory": False, + "save_trajectory_npz": False, + "record_video": False, + "save_trajectory_image": False, + } + eval_cfg.update(eval_overrides) + return OmegaConf.create({"agent": {}, "eval": eval_cfg}) + + class EvalVLAExecutionTest(unittest.TestCase): def test_execute_policy_action_uses_ee_step(self): env = _FakeEnv() @@ -23,6 +71,662 @@ class EvalVLAExecutionTest(unittest.TestCase): self.assertEqual(env.calls, [("step", action)]) + def test_split_episode_indices_balances_workers(self): + self.assertEqual( + eval_vla._split_episode_indices(num_episodes=10, num_workers=3), + [[0, 1, 2, 3], [4, 5, 6], [7, 8, 9]], + ) + + def test_normalize_num_workers_caps_worker_count_to_episode_count(self): + self.assertEqual(eval_vla._normalize_num_workers(num_workers=5, num_episodes=2), 2) + + def 
test_plan_episode_box_poses_uses_global_episode_order(self): + planned_poses = [ + np.array([0.1, 0.2, 0.3], dtype=np.float32), + np.array([1.1, 1.2, 1.3], dtype=np.float32), + np.array([2.1, 2.2, 2.3], dtype=np.float32), + ] + sampler = mock.Mock(side_effect=planned_poses) + + result = eval_vla._plan_episode_box_poses(num_episodes=3, sampler=sampler) + + self.assertEqual(sampler.call_count, 3) + self.assertEqual(len(result), 3) + for expected, actual in zip(planned_poses, result): + np.testing.assert_array_equal(actual, expected) + + def test_resolve_policy_camera_names_matches_vlaagent_fallback_sorting(self): + cfg = OmegaConf.create( + { + "agent": { + "_target_": "roboimi.vla.agent.VLAAgent", + }, + "eval": { + "camera_names": ["r_vis", "top", "front"], + }, + } + ) + + self.assertEqual( + eval_vla._resolve_policy_camera_names(cfg), + ["front", "r_vis", "top"], + ) + + def test_resolve_policy_camera_names_matches_gr00t_fallback_input_order(self): + cfg = OmegaConf.create( + { + "agent": { + "_target_": "roboimi.vla.agent_gr00t_dit.VLAAgentGr00tDiT", + }, + "eval": { + "camera_names": ["r_vis", "top", "front"], + }, + } + ) + + self.assertEqual( + eval_vla._resolve_policy_camera_names(cfg), + ["r_vis", "top", "front"], + ) + + def test_build_episode_plans_without_box_poses_keeps_serial_sampling_lazy(self): + plans = eval_vla._build_episode_plans(num_episodes=3) + + self.assertEqual( + plans, + [ + {"episode_index": 0}, + {"episode_index": 1}, + {"episode_index": 2}, + ], + ) + + def test_prepare_local_policy_batch_pads_latest_observation_to_obs_horizon(self): + queues = eval_vla._new_local_policy_queues(obs_horizon=3) + observation = { + "qpos": torch.tensor([1.0, 2.0], dtype=torch.float32), + "images": { + "front": torch.tensor([[[1.0]]], dtype=torch.float32), + }, + } + + eval_vla._populate_local_policy_queues(queues, observation) + batch = eval_vla._prepare_local_policy_batch( + queues, + obs_horizon=3, + camera_names=["front"], + ) + + 
self.assertEqual(tuple(batch["qpos"].shape), (1, 3, 2)) + self.assertEqual(tuple(batch["images"]["front"].shape), (1, 3, 1, 1, 1)) + np.testing.assert_array_equal( + batch["qpos"][0].cpu().numpy(), + np.array([[1.0, 2.0], [1.0, 2.0], [1.0, 2.0]], dtype=np.float32), + ) + np.testing.assert_array_equal( + batch["images"]["front"][0].cpu().numpy(), + np.array([[[[1.0]]], [[[1.0]]], [[[1.0]]]], dtype=np.float32), + ) + + def test_enqueue_predicted_actions_uses_executable_slice(self): + queues = eval_vla._new_local_policy_queues(obs_horizon=2) + predicted_actions = torch.tensor( + [[[10.0], [20.0], [30.0], [40.0]]], + dtype=torch.float32, + ) + + eval_vla._enqueue_predicted_actions( + queues, + predicted_actions=predicted_actions, + obs_horizon=2, + num_action_steps=2, + ) + + self.assertEqual(len(queues["action"]), 2) + np.testing.assert_array_equal(queues["action"].popleft().numpy(), np.array([20.0], dtype=np.float32)) + np.testing.assert_array_equal(queues["action"].popleft().numpy(), np.array([30.0], dtype=np.float32)) + + def test_remote_policy_runner_only_requests_server_inference_when_local_action_queue_is_empty(self): + request_queue = _FakeQueue() + response_queue = _FakeQueue( + [ + { + "type": "predict_chunk_result", + "actions": np.asarray([[[10.0], [20.0], [30.0]]], dtype=np.float32), + } + ] + ) + runner = eval_vla._RemotePolicyRunner( + worker_index=3, + server_index=1, + request_queue=request_queue, + response_queue=response_queue, + camera_names=["front"], + obs_horizon=2, + num_action_steps=2, + ) + first_observation = { + "qpos": torch.tensor([1.0, 2.0], dtype=torch.float32), + "images": {"front": torch.tensor([[[1.0]]], dtype=torch.float32)}, + } + second_observation = { + "qpos": torch.tensor([3.0, 4.0], dtype=torch.float32), + "images": {"front": torch.tensor([[[2.0]]], dtype=torch.float32)}, + } + + first_action, first_forward = runner.select_action( + first_observation, + episode_index=7, + timestep=0, + ) + second_action, second_forward = 
runner.select_action( + second_observation, + episode_index=7, + timestep=1, + ) + + self.assertTrue(first_forward) + self.assertFalse(second_forward) + self.assertEqual(len(request_queue.put_calls), 1) + self.assertEqual(request_queue.put_calls[0]["type"], "predict_chunk") + self.assertEqual(request_queue.put_calls[0]["worker_index"], 3) + self.assertEqual(request_queue.put_calls[0]["server_index"], 1) + np.testing.assert_array_equal(first_action.numpy(), np.array([20.0], dtype=np.float32)) + np.testing.assert_array_equal(second_action.numpy(), np.array([30.0], dtype=np.float32)) + + def test_merge_worker_summaries_sorts_episodes_and_recomputes_aggregates(self): + worker_summaries = [ + { + "avg_inference_fps": 999.0, + "avg_control_fps": 999.0, + "avg_obs_read_time_ms": 999.0, + "avg_total_time_ms": 999.0, + "timing_summary": {"count": 999, "model_forward_count": 999}, + "episodes": [ + { + "episode_index": 2, + "episode_reward": 9.0, + "episode_max_reward": 4.0, + "inference_fps": 30.0, + "control_fps": 15.0, + } + ], + "_merge_state": { + "obs_read_time_ms": [9.0], + "preprocess_time_ms": [1.0], + "inference_time_ms": [3.0], + "env_step_time_ms": [4.0], + "total_time_ms": [10.0], + "model_forward_flags": [False], + }, + }, + { + "avg_inference_fps": 888.0, + "avg_control_fps": 888.0, + "avg_obs_read_time_ms": 888.0, + "avg_total_time_ms": 888.0, + "timing_summary": {"count": 888, "model_forward_count": 888}, + "episodes": [ + { + "episode_index": 1, + "episode_reward": 6.0, + "episode_max_reward": 3.0, + "inference_fps": 20.0, + "control_fps": 10.0, + }, + { + "episode_index": 0, + "episode_reward": 5.0, + "episode_max_reward": 2.0, + "inference_fps": 10.0, + "control_fps": 5.0, + }, + ], + "_merge_state": { + "obs_read_time_ms": [1.0, 2.0, 12.0], + "preprocess_time_ms": [2.0, 3.0, 4.0], + "inference_time_ms": [4.0, 5.0, 6.0], + "env_step_time_ms": [6.0, 7.0, 8.0], + "total_time_ms": [8.0, 9.0, 20.0], + "model_forward_flags": [True, False, True], + }, + }, + ] 
+ artifact_paths = { + "output_dir": "/tmp/merged", + "summary_json": "/tmp/merged/rollout_summary.json", + "timing_json": "/tmp/merged/timing.json", + "trajectory_npz": None, + "video_mp4": None, + "video_camera_name": None, + } + + merged = eval_vla._merge_worker_summaries(worker_summaries, artifact_paths) + + self.assertEqual([episode["episode_index"] for episode in merged["episodes"]], [0, 1, 2]) + self.assertEqual(merged["episode_rewards"], [5.0, 6.0, 9.0]) + self.assertEqual(merged["episode_max_rewards"], [2.0, 3.0, 4.0]) + self.assertAlmostEqual(merged["avg_reward"], 20.0 / 3.0) + self.assertAlmostEqual(merged["avg_max_reward"], 3.0) + self.assertAlmostEqual(merged["avg_inference_fps"], 20.0) + self.assertAlmostEqual(merged["avg_control_fps"], 10.0) + self.assertAlmostEqual(merged["avg_obs_read_time_ms"], 6.0) + self.assertAlmostEqual(merged["avg_total_time_ms"], 47.0 / 4.0) + self.assertEqual(merged["timing_summary"]["count"], 4) + self.assertEqual(merged["timing_summary"]["model_forward_count"], 2) + self.assertEqual(merged["artifact_dir"], "/tmp/merged") + self.assertEqual(merged["artifacts"], artifact_paths) + + def test_build_cuda_server_payloads_uses_round_robin_worker_assignment(self): + cfg = _make_parallel_cfg(num_episodes=4, num_workers=4, device="cuda", cuda_devices=[0, 1]) + artifact_paths = {"output_dir": None} + + with mock.patch.object( + eval_vla, + "sample_transfer_pose", + side_effect=[ + np.array([0.1, 0.2, 0.3], dtype=np.float32), + np.array([0.4, 0.5, 0.6], dtype=np.float32), + np.array([0.7, 0.8, 0.9], dtype=np.float32), + np.array([1.0, 1.1, 1.2], dtype=np.float32), + ], + ): + worker_payloads, _ = eval_vla._build_parallel_worker_payloads(cfg, artifact_paths) + + server_payloads, assigned_workers = eval_vla._build_cuda_server_payloads( + cfg, + worker_payloads=worker_payloads, + cuda_devices=[0, 1], + ) + + self.assertEqual([payload["device_index"] for payload in server_payloads], [0, 1]) + self.assertEqual([payload["worker_index"] for 
payload in assigned_workers], [0, 1, 2, 3]) + self.assertEqual([payload["server_index"] for payload in assigned_workers], [0, 1, 0, 1]) + self.assertEqual(server_payloads[0]["worker_indices"], [0, 2]) + self.assertEqual(server_payloads[1]["worker_indices"], [1, 3]) + + def test_run_eval_parallel_dispatches_episode_splits_and_box_poses(self): + cfg = _make_parallel_cfg(num_episodes=5, num_workers=2, artifact_dir="/tmp/parallel-root") + planned_poses = [ + np.array([float(index), float(index) + 0.1, float(index) + 0.2], dtype=np.float32) + for index in range(5) + ] + observed_payloads = [] + + def fake_run_spawn_jobs(payloads, max_workers, worker_fn): + del worker_fn + self.assertEqual(max_workers, 2) + observed_payloads.extend(payloads) + return [ + { + "episodes": [ + { + "episode_index": 4, + "episode_reward": 5.0, + "episode_max_reward": 5.0, + "inference_fps": 50.0, + "control_fps": 25.0, + }, + { + "episode_index": 3, + "episode_reward": 4.0, + "episode_max_reward": 4.0, + "inference_fps": 40.0, + "control_fps": 20.0, + }, + ], + "_merge_state": { + "obs_read_time_ms": [4.0, 5.0], + "preprocess_time_ms": [1.0, 1.0], + "inference_time_ms": [2.0, 2.0], + "env_step_time_ms": [3.0, 3.0], + "total_time_ms": [4.0, 5.0], + "model_forward_flags": [True, True], + }, + }, + { + "episodes": [ + { + "episode_index": 2, + "episode_reward": 3.0, + "episode_max_reward": 3.0, + "inference_fps": 30.0, + "control_fps": 15.0, + }, + { + "episode_index": 1, + "episode_reward": 2.0, + "episode_max_reward": 2.0, + "inference_fps": 20.0, + "control_fps": 10.0, + }, + { + "episode_index": 0, + "episode_reward": 1.0, + "episode_max_reward": 1.0, + "inference_fps": 10.0, + "control_fps": 5.0, + }, + ], + "_merge_state": { + "obs_read_time_ms": [1.0, 2.0, 3.0], + "preprocess_time_ms": [1.0, 1.0, 1.0], + "inference_time_ms": [2.0, 2.0, 2.0], + "env_step_time_ms": [3.0, 3.0, 3.0], + "total_time_ms": [1.0, 2.0, 3.0], + "model_forward_flags": [False, True, False], + }, + }, + ] + + with 
mock.patch.object( + eval_vla, + "sample_transfer_pose", + side_effect=planned_poses, + ), mock.patch.object( + eval_vla, + "_run_spawn_jobs", + side_effect=fake_run_spawn_jobs, + ): + summary = eval_vla._run_eval_parallel(cfg) + + self.assertEqual(len(observed_payloads), 2) + self.assertEqual( + [[plan["episode_index"] for plan in payload["episode_plans"]] for payload in observed_payloads], + [[0, 1, 2], [3, 4]], + ) + for payload in observed_payloads: + for plan in payload["episode_plans"]: + np.testing.assert_array_equal( + np.asarray(plan["box_pos"], dtype=np.float32), + planned_poses[plan["episode_index"]], + ) + self.assertEqual([episode["episode_index"] for episode in summary["episodes"]], [0, 1, 2, 3, 4]) + self.assertEqual(summary["episode_rewards"], [1.0, 2.0, 3.0, 4.0, 5.0]) + self.assertEqual(summary["num_episodes"], 5) + + def test_run_eval_parallel_allows_trajectory_images_and_keeps_worker_artifact_paths(self): + cfg = _make_parallel_cfg( + num_episodes=2, + num_workers=2, + artifact_dir="/tmp/parallel-images", + save_summary_json=True, + save_trajectory_image=True, + ) + observed_payloads = [] + + def fake_run_spawn_jobs(payloads, max_workers, worker_fn): + del worker_fn + self.assertEqual(max_workers, 2) + observed_payloads.extend(payloads) + return [ + { + "episodes": [ + { + "episode_index": 0, + "episode_reward": 1.0, + "episode_max_reward": 1.0, + "inference_fps": 10.0, + "control_fps": 5.0, + "artifact_paths": { + "trajectory_image": f"{payloads[0]['artifact_dir']}/rollout_front_ep01_trajectory.png", + }, + }, + ], + "_merge_state": { + "obs_read_time_ms": [1.0], + "preprocess_time_ms": [1.0], + "inference_time_ms": [1.0], + "env_step_time_ms": [1.0], + "total_time_ms": [1.0], + "model_forward_flags": [True], + }, + }, + { + "episodes": [ + { + "episode_index": 1, + "episode_reward": 2.0, + "episode_max_reward": 2.0, + "inference_fps": 20.0, + "control_fps": 10.0, + "artifact_paths": { + "trajectory_image": 
f"{payloads[1]['artifact_dir']}/rollout_front_ep02_trajectory.png", + }, + }, + ], + "_merge_state": { + "obs_read_time_ms": [2.0], + "preprocess_time_ms": [2.0], + "inference_time_ms": [2.0], + "env_step_time_ms": [2.0], + "total_time_ms": [2.0], + "model_forward_flags": [False], + }, + }, + ] + + with mock.patch.object( + eval_vla, + "sample_transfer_pose", + side_effect=[ + np.array([0.1, 0.2, 0.3], dtype=np.float32), + np.array([0.4, 0.5, 0.6], dtype=np.float32), + ], + ), mock.patch.object( + eval_vla, + "_run_spawn_jobs", + side_effect=fake_run_spawn_jobs, + ): + summary = eval_vla._run_eval_parallel(cfg) + + self.assertEqual(len(observed_payloads), 2) + self.assertTrue(observed_payloads[0]["artifact_dir"].endswith("workers/worker_00")) + self.assertTrue(observed_payloads[1]["artifact_dir"].endswith("workers/worker_01")) + self.assertTrue( + summary["episodes"][0]["artifact_paths"]["trajectory_image"].endswith( + "workers/worker_00/rollout_front_ep01_trajectory.png" + ) + ) + self.assertTrue( + summary["episodes"][1]["artifact_paths"]["trajectory_image"].endswith( + "workers/worker_01/rollout_front_ep02_trajectory.png" + ) + ) + + def test_run_eval_parallel_surfaces_worker_failures(self): + cfg = _make_parallel_cfg(num_episodes=2, num_workers=2) + + with mock.patch.object( + eval_vla, + "sample_transfer_pose", + side_effect=[ + np.array([0.1, 0.2, 0.3], dtype=np.float32), + np.array([0.4, 0.5, 0.6], dtype=np.float32), + ], + ), mock.patch.object( + eval_vla, + "_run_spawn_jobs", + side_effect=RuntimeError("boom"), + ): + with self.assertRaisesRegex(RuntimeError, "Parallel rollout worker failed"): + eval_vla._run_eval_parallel(cfg) + + def test_run_eval_parallel_cuda_builds_server_payloads_and_merges_worker_results(self): + cfg = _make_parallel_cfg( + num_episodes=4, + num_workers=4, + device="cuda", + cuda_devices=[0], + artifact_dir="/tmp/cuda-root", + ) + observed_server_payloads = [] + observed_worker_payloads = [] + + def 
fake_run_cuda_parallel_processes(server_payloads, worker_payloads): + observed_server_payloads.extend(server_payloads) + observed_worker_payloads.extend(worker_payloads) + return [ + { + "episodes": [ + { + "episode_index": 2, + "episode_reward": 3.0, + "episode_max_reward": 3.0, + "inference_fps": 30.0, + "control_fps": 15.0, + }, + { + "episode_index": 0, + "episode_reward": 1.0, + "episode_max_reward": 1.0, + "inference_fps": 10.0, + "control_fps": 5.0, + }, + ], + "_merge_state": { + "obs_read_time_ms": [1.0, 2.0], + "preprocess_time_ms": [1.0, 1.0], + "inference_time_ms": [2.0, 2.0], + "env_step_time_ms": [3.0, 3.0], + "total_time_ms": [4.0, 4.0], + "model_forward_flags": [True, False], + }, + }, + { + "episodes": [ + { + "episode_index": 3, + "episode_reward": 4.0, + "episode_max_reward": 4.0, + "inference_fps": 40.0, + "control_fps": 20.0, + }, + { + "episode_index": 1, + "episode_reward": 2.0, + "episode_max_reward": 2.0, + "inference_fps": 20.0, + "control_fps": 10.0, + }, + ], + "_merge_state": { + "obs_read_time_ms": [3.0, 4.0], + "preprocess_time_ms": [1.0, 1.0], + "inference_time_ms": [2.0, 2.0], + "env_step_time_ms": [3.0, 3.0], + "total_time_ms": [4.0, 4.0], + "model_forward_flags": [True, True], + }, + }, + ] + + with mock.patch.object( + eval_vla, + "sample_transfer_pose", + side_effect=[ + np.array([0.1, 0.2, 0.3], dtype=np.float32), + np.array([0.4, 0.5, 0.6], dtype=np.float32), + np.array([0.7, 0.8, 0.9], dtype=np.float32), + np.array([1.0, 1.1, 1.2], dtype=np.float32), + ], + ), mock.patch.object( + eval_vla, + "_run_cuda_parallel_processes", + side_effect=fake_run_cuda_parallel_processes, + create=True, + ): + summary = eval_vla._run_eval_parallel_cuda(cfg) + + self.assertEqual(len(observed_server_payloads), 1) + self.assertEqual(observed_server_payloads[0]["device_index"], 0) + self.assertEqual(len(observed_worker_payloads), 4) + self.assertTrue(all(payload["server_index"] == 0 for payload in observed_worker_payloads)) + 
self.assertEqual([episode["episode_index"] for episode in summary["episodes"]], [0, 1, 2, 3]) + self.assertEqual(summary["episode_rewards"], [1.0, 2.0, 3.0, 4.0]) + self.assertEqual(summary["num_episodes"], 4) + + def test_run_eval_parallel_cuda_surfaces_server_failures(self): + cfg = _make_parallel_cfg(num_episodes=2, num_workers=2, device="cuda", cuda_devices=[0]) + + with mock.patch.object( + eval_vla, + "sample_transfer_pose", + side_effect=[ + np.array([0.1, 0.2, 0.3], dtype=np.float32), + np.array([0.4, 0.5, 0.6], dtype=np.float32), + ], + ), mock.patch.object( + eval_vla, + "_run_cuda_parallel_processes", + side_effect=RuntimeError("server boom"), + create=True, + ): + with self.assertRaisesRegex(RuntimeError, "Parallel CUDA rollout failed"): + eval_vla._run_eval_parallel_cuda(cfg) + + def test_run_spawn_jobs_supports_real_spawn_with_actual_eval_worker_entry(self): + payloads = [ + {"_spawn_probe": True, "probe_value": 1, "worker_index": 0}, + {"_spawn_probe": True, "probe_value": 2, "worker_index": 1}, + ] + + results = eval_vla._run_spawn_jobs( + payloads=payloads, + max_workers=2, + worker_fn=eval_vla._run_eval_worker_entry, + ) + + self.assertEqual(sorted(result["probe_value"] for result in results), [1, 2]) + self.assertEqual(sorted(result["worker_index"] for result in results), [0, 1]) + + def test_cuda_server_and_env_worker_entrypoints_support_real_spawn_probe(self): + ctx = eval_vla.multiprocessing.get_context("spawn") + request_queue = ctx.Queue() + response_queue = ctx.Queue() + result_queue = ctx.Queue() + + server = ctx.Process( + target=eval_vla._inference_server_main, + args=( + { + "_spawn_probe": True, + "server_index": 0, + "request_queue": request_queue, + "response_queues": [response_queue], + }, + ), + ) + worker = ctx.Process( + target=eval_vla._env_worker_main, + args=( + { + "_spawn_probe": True, + "worker_index": 0, + "server_index": 0, + "request_queue": request_queue, + "response_queue": response_queue, + "result_queue": 
result_queue, + }, + ), + ) + + server.start() + worker.start() + + result = result_queue.get(timeout=10.0) + + worker.join(timeout=10.0) + request_queue.put({"type": "shutdown_server"}) + server.join(timeout=10.0) + + self.assertEqual(result["kind"], "worker_result") + self.assertEqual(result["summary"]["probe_worker_index"], 0) + self.assertEqual(result["summary"]["probe_server_index"], 0) + self.assertEqual(result["summary"]["probe_actions"], [[[11.0], [22.0], [33.0]]]) + self.assertEqual(worker.exitcode, 0) + self.assertEqual(server.exitcode, 0) + if __name__ == "__main__": unittest.main() diff --git a/tests/test_eval_vla_headless.py b/tests/test_eval_vla_headless.py index 416c6cf..768965f 100644 --- a/tests/test_eval_vla_headless.py +++ b/tests/test_eval_vla_headless.py @@ -216,6 +216,31 @@ class EvalVLAHeadlessTest(unittest.TestCase): self.assertIsNotNone(env.top) self.assertIsNotNone(env.front) + def test_dual_diana_step_refreshes_obs_after_physics_step(self): + env = DualDianaMed.__new__(DualDianaMed) + env.compute_qpos = np.zeros(16) + env.interpolator_left = None + env.interpolator_right = None + env.control_timestep = 0.001 + env.model_timestep = 0.001 + env.base_time = 0.0 + events = [] + + def fake_get_obs(): + events.append("obs") + return {"images": {}, "qpos": np.zeros(16, dtype=np.float32)} + + env._get_obs = fake_get_obs + + with mock.patch( + "roboimi.envs.double_base.MujocoEnv.step", + autospec=True, + side_effect=lambda _self, _action: events.append("physics"), + ): + env.step(np.zeros(16)) + + self.assertEqual(events, ["physics", "obs"]) + def test_eval_main_headless_skips_render_and_still_executes_policy(self): fake_env = _FakeEnv() fake_agent = _FakeAgent() @@ -323,6 +348,193 @@ class EvalVLAHeadlessTest(unittest.TestCase): self.assertAlmostEqual(summary["avg_reward"], 3.75) self.assertEqual(summary["num_episodes"], 2) + + def test_eval_config_exposes_num_workers_default(self): + eval_cfg = 
OmegaConf.load(Path("roboimi/vla/conf/eval/eval.yaml")) + + self.assertIn("num_workers", eval_cfg) + self.assertEqual(eval_cfg.num_workers, 1) + + def test_eval_config_exposes_cuda_devices_default(self): + eval_cfg = OmegaConf.load(Path("roboimi/vla/conf/eval/eval.yaml")) + + self.assertIn("cuda_devices", eval_cfg) + self.assertIsNone(eval_cfg.cuda_devices) + + def test_eval_config_exposes_parallel_timeout_defaults(self): + eval_cfg = OmegaConf.load(Path("roboimi/vla/conf/eval/eval.yaml")) + + self.assertIn("response_timeout_s", eval_cfg) + self.assertIn("server_startup_timeout_s", eval_cfg) + self.assertEqual(eval_cfg.response_timeout_s, 300.0) + self.assertEqual(eval_cfg.server_startup_timeout_s, 300.0) + + def test_run_eval_uses_serial_path_when_num_workers_is_one(self): + cfg = OmegaConf.create( + { + "eval": { + "num_workers": 1, + "num_episodes": 3, + } + } + ) + + with mock.patch.object( + eval_vla, + "_run_eval_serial", + return_value={"mode": "serial"}, + ) as run_eval_serial, mock.patch.object( + eval_vla, + "_run_eval_parallel", + ) as run_eval_parallel: + result = eval_vla._run_eval(cfg) + + self.assertEqual(result, {"mode": "serial"}) + run_eval_serial.assert_called_once_with(cfg) + run_eval_parallel.assert_not_called() + + def test_run_eval_uses_serial_path_when_requested_workers_collapse_to_one(self): + cfg = OmegaConf.create( + { + "eval": { + "num_workers": 8, + "num_episodes": 1, + } + } + ) + + with mock.patch.object( + eval_vla, + "_run_eval_serial", + return_value={"mode": "serial"}, + ) as run_eval_serial, mock.patch.object( + eval_vla, + "_run_eval_parallel", + ) as run_eval_parallel: + result = eval_vla._run_eval(cfg) + + self.assertEqual(result, {"mode": "serial"}) + run_eval_serial.assert_called_once_with(cfg) + run_eval_parallel.assert_not_called() + + def test_run_eval_parallel_requires_headless_true(self): + cfg = OmegaConf.create( + { + "agent": {}, + "eval": { + "ckpt_path": "checkpoints/vla_model_best.pt", + "num_episodes": 2, + 
"num_workers": 2, + "max_timesteps": 1, + "device": "cpu", + "task_name": "sim_transfer", + "camera_names": ["front"], + "use_smoothing": False, + "smooth_alpha": 0.3, + "verbose_action": False, + "headless": False, + }, + } + ) + + with self.assertRaisesRegex(ValueError, "headless=true"): + eval_vla._run_eval_parallel(cfg) + + def test_run_eval_parallel_dispatches_to_cpu_workers_when_device_is_cpu(self): + cfg = OmegaConf.create( + { + "agent": {}, + "eval": { + "ckpt_path": "checkpoints/vla_model_best.pt", + "num_episodes": 2, + "num_workers": 2, + "max_timesteps": 1, + "device": "cpu", + "task_name": "sim_transfer", + "camera_names": ["front"], + "use_smoothing": False, + "smooth_alpha": 0.3, + "verbose_action": False, + "headless": True, + "cuda_devices": None, + }, + } + ) + + with mock.patch.object( + eval_vla, + "_run_eval_parallel_cpu", + return_value={"mode": "cpu"}, + create=True, + ) as run_cpu_parallel, mock.patch.object( + eval_vla, + "_run_eval_parallel_cuda", + create=True, + ) as run_cuda_parallel: + result = eval_vla._run_eval_parallel(cfg) + + self.assertEqual(result, {"mode": "cpu"}) + run_cpu_parallel.assert_called_once_with(cfg) + run_cuda_parallel.assert_not_called() + + def test_run_eval_parallel_dispatches_to_cuda_servers_when_device_is_cuda(self): + cfg = OmegaConf.create( + { + "agent": {}, + "eval": { + "ckpt_path": "checkpoints/vla_model_best.pt", + "num_episodes": 2, + "num_workers": 2, + "max_timesteps": 1, + "device": "cuda", + "task_name": "sim_transfer", + "camera_names": ["front"], + "use_smoothing": False, + "smooth_alpha": 0.3, + "verbose_action": False, + "headless": True, + "cuda_devices": [0], + }, + } + ) + + with mock.patch.object( + eval_vla, + "_run_eval_parallel_cpu", + create=True, + ) as run_cpu_parallel, mock.patch.object( + eval_vla, + "_run_eval_parallel_cuda", + return_value={"mode": "cuda"}, + create=True, + ) as run_cuda_parallel: + result = eval_vla._run_eval_parallel(cfg) + + self.assertEqual(result, {"mode": 
"cuda"}) + run_cpu_parallel.assert_not_called() + run_cuda_parallel.assert_called_once_with(cfg) + + def test_resolve_cuda_devices_defaults_to_single_logical_gpu(self): + cfg = OmegaConf.create( + { + "device": "cuda", + "cuda_devices": None, + } + ) + + self.assertEqual(eval_vla._resolve_cuda_devices(cfg), [0]) + + def test_resolve_cuda_devices_rejects_empty_selection(self): + cfg = OmegaConf.create( + { + "device": "cuda", + "cuda_devices": [], + } + ) + + with self.assertRaisesRegex(ValueError, "cuda_devices"): + eval_vla._resolve_cuda_devices(cfg) + def test_run_eval_uses_air_insert_sampler_for_socket_peg_task(self): self.assertTrue( hasattr(eval_vla, "sample_air_insert_socket_peg_state"), diff --git a/tests/test_eval_vla_rollout_artifacts.py b/tests/test_eval_vla_rollout_artifacts.py index 7cb316a..dc41d2a 100644 --- a/tests/test_eval_vla_rollout_artifacts.py +++ b/tests/test_eval_vla_rollout_artifacts.py @@ -102,10 +102,8 @@ class EvalVLARolloutArtifactsTest(unittest.TestCase): self.assertIn('artifact_dir', eval_cfg) self.assertFalse(eval_cfg.save_summary_json) self.assertFalse(eval_cfg.save_trajectory_npz) - self.assertFalse(eval_cfg.save_trajectory_image) self.assertFalse(eval_cfg.record_video) self.assertIsNone(eval_cfg.artifact_dir) - self.assertIsNone(eval_cfg.trajectory_image_camera_name) self.assertIsNone(eval_cfg.video_camera_name) self.assertEqual(eval_cfg.video_fps, 30) @@ -135,8 +133,6 @@ class EvalVLARolloutArtifactsTest(unittest.TestCase): 'artifact_dir': tmpdir, 'save_summary_json': True, 'save_trajectory_npz': True, - 'save_trajectory_image': True, - 'trajectory_image_camera_name': 'front', 'record_video': True, 'video_camera_name': 'front', 'video_fps': 12, @@ -180,14 +176,12 @@ class EvalVLARolloutArtifactsTest(unittest.TestCase): trajectory_path = Path(artifacts['trajectory_npz']) summary_path = Path(artifacts['summary_json']) video_path = Path(artifacts['video_mp4']) - trajectory_image_path = 
Path(summary['episodes'][0]['artifact_paths']['trajectory_image']) self.assertEqual(Path(artifacts['output_dir']), Path(tmpdir)) self.assertEqual(artifacts['video_camera_name'], 'front') self.assertTrue(trajectory_path.exists()) self.assertTrue(summary_path.exists()) self.assertTrue(video_path.exists()) - self.assertTrue(trajectory_image_path.exists()) rollout_npz = np.load(trajectory_path) np.testing.assert_array_equal(rollout_npz['episode_index'], np.array([0, 0])) @@ -224,120 +218,267 @@ class EvalVLARolloutArtifactsTest(unittest.TestCase): saved_summary = json.load(fh) self.assertEqual(saved_summary['artifacts']['trajectory_npz'], str(trajectory_path)) self.assertEqual(saved_summary['artifacts']['video_mp4'], str(video_path)) - self.assertEqual( - saved_summary['episodes'][0]['artifact_paths']['trajectory_image'], - str(trajectory_image_path), - ) self.assertEqual(saved_summary['episode_rewards'], [3.0]) self.assertAlmostEqual(summary['avg_reward'], 3.0) self.assertIn('avg_obs_read_time_ms', summary) self.assertIn('avg_env_step_time_ms', summary) - def test_run_eval_exports_front_trajectory_images_without_video_dependency(self): - actions = [ - np.arange(16, dtype=np.float32), - np.arange(16, dtype=np.float32) + 10.0, - np.arange(16, dtype=np.float32) + 100.0, - np.arange(16, dtype=np.float32) + 110.0, + def test_run_eval_parallel_rejects_trajectory_and_video_exports(self): + unsupported_flags = [ + "record_video", + "save_trajectory", + "save_trajectory_npz", ] - fake_agent = _FakeAgent(actions) - fake_env = _FakeEnv() + for flag_name in unsupported_flags: + with self.subTest(flag_name=flag_name): + cfg = OmegaConf.create( + { + "agent": {}, + "eval": { + "ckpt_path": "checkpoints/vla_model_best.pt", + "num_episodes": 2, + "num_workers": 2, + "max_timesteps": 1, + "device": "cpu", + "task_name": "sim_transfer", + "camera_names": ["front"], + "use_smoothing": False, + "smooth_alpha": 0.3, + "verbose_action": False, + "headless": True, + "save_artifacts": True, 
+ flag_name: True, + }, + } + ) + + with self.assertRaisesRegex(ValueError, flag_name): + eval_vla._run_eval_parallel(cfg) + + def test_run_eval_parallel_writes_merged_summary_timing_and_worker_dirs(self): with tempfile.TemporaryDirectory() as tmpdir: cfg = OmegaConf.create( { - 'agent': {}, - 'eval': { - 'ckpt_path': 'checkpoints/vla_model_best.pt', - 'num_episodes': 2, - 'max_timesteps': 2, - 'device': 'cpu', - 'task_name': 'sim_transfer', - 'camera_names': ['top', 'front'], - 'use_smoothing': True, - 'smooth_alpha': 0.5, - 'verbose_action': False, - 'headless': True, - 'artifact_dir': tmpdir, - 'save_trajectory_image': True, - 'record_video': False, + "agent": {}, + "eval": { + "ckpt_path": "checkpoints/vla_model_best.pt", + "num_episodes": 3, + "num_workers": 2, + "max_timesteps": 1, + "device": "cpu", + "task_name": "sim_transfer", + "camera_names": ["front"], + "use_smoothing": False, + "smooth_alpha": 0.3, + "verbose_action": False, + "headless": True, + "artifact_dir": tmpdir, + "save_summary_json": True, + "save_timing": True, }, } ) - trajectory_image_calls = [] - - def fake_save_rollout_trajectory_image( - env, - output_path, - raw_actions, - camera_name, - *, - line_radius=0.004, - max_markers=1500, - ): - del env, line_radius, max_markers - trajectory_image_calls.append( + def fake_run_spawn_jobs(payloads, max_workers, worker_fn): + del max_workers, worker_fn + return [ { - 'output_path': output_path, - 'camera_name': camera_name, - 'raw_actions': [np.array(action, copy=True) for action in raw_actions], - } - ) - if output_path is None: - return None - output_path = Path(output_path) - output_path.parent.mkdir(parents=True, exist_ok=True) - output_path.write_bytes(b'fake-png') - return str(output_path) + "episodes": [ + { + "episode_index": 2, + "episode_reward": 3.0, + "episode_max_reward": 3.0, + "inference_fps": 30.0, + "control_fps": 15.0, + } + ], + "_merge_state": { + "obs_read_time_ms": [3.0], + "preprocess_time_ms": [1.0], + 
"inference_time_ms": [2.0], + "env_step_time_ms": [4.0], + "total_time_ms": [5.0], + "model_forward_flags": [True], + }, + }, + { + "episodes": [ + { + "episode_index": 1, + "episode_reward": 2.0, + "episode_max_reward": 2.0, + "inference_fps": 20.0, + "control_fps": 10.0, + }, + { + "episode_index": 0, + "episode_reward": 1.0, + "episode_max_reward": 1.0, + "inference_fps": 10.0, + "control_fps": 5.0, + }, + ], + "_merge_state": { + "obs_read_time_ms": [1.0, 2.0], + "preprocess_time_ms": [1.0, 1.0], + "inference_time_ms": [2.0, 2.0], + "env_step_time_ms": [4.0, 4.0], + "total_time_ms": [5.0, 5.0], + "model_forward_flags": [False, True], + }, + }, + ] with mock.patch.object( eval_vla, - 'load_checkpoint', - return_value=(fake_agent, None), + "sample_transfer_pose", + side_effect=[ + np.array([0.1, 0.2, 0.3], dtype=np.float32), + np.array([0.4, 0.5, 0.6], dtype=np.float32), + np.array([0.7, 0.8, 0.9], dtype=np.float32), + ], ), mock.patch.object( eval_vla, - 'make_sim_env', - return_value=fake_env, - ), mock.patch.object( - eval_vla, - 'sample_transfer_pose', - return_value=np.array([0.1, 0.2, 0.3], dtype=np.float32), - ), mock.patch.object( - eval_vla, - 'tqdm', - side_effect=lambda iterable, **kwargs: iterable, - ), mock.patch.object( - eval_vla, - '_save_rollout_trajectory_image', - side_effect=fake_save_rollout_trajectory_image, - ) as save_trajectory_image_mock, mock.patch.object( - eval_vla, - '_open_video_writer', - ) as open_video_writer_mock: - summary = eval_vla._run_eval(cfg) + "_run_spawn_jobs", + side_effect=fake_run_spawn_jobs, + ): + summary = eval_vla._run_eval_parallel(cfg) - self.assertEqual(save_trajectory_image_mock.call_count, 2) - open_video_writer_mock.assert_not_called() - self.assertIsNone(summary['artifacts']['video_mp4']) - self.assertEqual(summary['artifacts']['trajectory_image_camera_name'], 'front') - self.assertEqual( - [call['camera_name'] for call in trajectory_image_calls], - ['front', 'front'], + summary_path = Path(tmpdir) / 
"rollout_summary.json" + timing_path = Path(tmpdir) / "timing.json" + worker_00_dir = Path(tmpdir) / "workers" / "worker_00" + worker_01_dir = Path(tmpdir) / "workers" / "worker_01" + + self.assertTrue(summary_path.exists()) + self.assertTrue(timing_path.exists()) + self.assertTrue(worker_00_dir.is_dir()) + self.assertTrue(worker_01_dir.is_dir()) + self.assertEqual(summary["episode_rewards"], [1.0, 2.0, 3.0]) + + with summary_path.open("r", encoding="utf-8") as fh: + saved_summary = json.load(fh) + with timing_path.open("r", encoding="utf-8") as fh: + saved_timing = json.load(fh) + + self.assertEqual(saved_summary["episode_rewards"], [1.0, 2.0, 3.0]) + self.assertEqual(saved_summary["artifact_dir"], tmpdir) + self.assertEqual(saved_timing["count"], 3) + self.assertEqual(saved_timing["model_forward_count"], 2) + + def test_run_eval_parallel_cuda_writes_merged_summary_timing_and_worker_dirs(self): + with tempfile.TemporaryDirectory() as tmpdir: + cfg = OmegaConf.create( + { + "agent": {}, + "eval": { + "ckpt_path": "checkpoints/vla_model_best.pt", + "num_episodes": 3, + "num_workers": 2, + "cuda_devices": [0], + "max_timesteps": 1, + "device": "cuda", + "task_name": "sim_transfer", + "camera_names": ["front"], + "use_smoothing": False, + "smooth_alpha": 0.3, + "verbose_action": False, + "headless": True, + "artifact_dir": tmpdir, + "save_summary_json": True, + "save_timing": True, + }, + } ) - first_episode_path = Path(summary['episodes'][0]['artifact_paths']['trajectory_image']) - second_episode_path = Path(summary['episodes'][1]['artifact_paths']['trajectory_image']) - self.assertTrue(first_episode_path.exists()) - self.assertTrue(second_episode_path.exists()) - self.assertNotEqual(first_episode_path, second_episode_path) - self.assertEqual(first_episode_path.parent, Path(tmpdir)) - self.assertEqual(second_episode_path.parent, Path(tmpdir)) + def fake_run_cuda_parallel_processes(server_payloads, worker_payloads): + self.assertEqual(len(server_payloads), 1) + 
self.assertEqual(server_payloads[0]["device_index"], 0) + self.assertEqual([payload["server_index"] for payload in worker_payloads], [0, 0]) + return [ + { + "episodes": [ + { + "episode_index": 2, + "episode_reward": 3.0, + "episode_max_reward": 3.0, + "inference_fps": 30.0, + "control_fps": 15.0, + } + ], + "_merge_state": { + "obs_read_time_ms": [3.0], + "preprocess_time_ms": [1.0], + "inference_time_ms": [2.0], + "env_step_time_ms": [4.0], + "total_time_ms": [5.0], + "model_forward_flags": [True], + }, + }, + { + "episodes": [ + { + "episode_index": 1, + "episode_reward": 2.0, + "episode_max_reward": 2.0, + "inference_fps": 20.0, + "control_fps": 10.0, + }, + { + "episode_index": 0, + "episode_reward": 1.0, + "episode_max_reward": 1.0, + "inference_fps": 10.0, + "control_fps": 5.0, + }, + ], + "_merge_state": { + "obs_read_time_ms": [1.0, 2.0], + "preprocess_time_ms": [1.0, 1.0], + "inference_time_ms": [2.0, 2.0], + "env_step_time_ms": [4.0, 4.0], + "total_time_ms": [5.0, 5.0], + "model_forward_flags": [False, True], + }, + }, + ] - np.testing.assert_array_equal(trajectory_image_calls[0]['raw_actions'][0], actions[0]) - np.testing.assert_array_equal(trajectory_image_calls[0]['raw_actions'][1], actions[1]) - np.testing.assert_array_equal(trajectory_image_calls[1]['raw_actions'][0], actions[2]) - np.testing.assert_array_equal(trajectory_image_calls[1]['raw_actions'][1], actions[3]) + with mock.patch.object( + eval_vla, + "sample_transfer_pose", + side_effect=[ + np.array([0.1, 0.2, 0.3], dtype=np.float32), + np.array([0.4, 0.5, 0.6], dtype=np.float32), + np.array([0.7, 0.8, 0.9], dtype=np.float32), + ], + ), mock.patch.object( + eval_vla, + "_run_cuda_parallel_processes", + side_effect=fake_run_cuda_parallel_processes, + create=True, + ): + summary = eval_vla._run_eval_parallel_cuda(cfg) + + summary_path = Path(tmpdir) / "rollout_summary.json" + timing_path = Path(tmpdir) / "timing.json" + worker_00_dir = Path(tmpdir) / "workers" / "worker_00" + worker_01_dir = 
Path(tmpdir) / "workers" / "worker_01" + + self.assertTrue(summary_path.exists()) + self.assertTrue(timing_path.exists()) + self.assertTrue(worker_00_dir.is_dir()) + self.assertTrue(worker_01_dir.is_dir()) + self.assertEqual(summary["episode_rewards"], [1.0, 2.0, 3.0]) + + with summary_path.open("r", encoding="utf-8") as fh: + saved_summary = json.load(fh) + with timing_path.open("r", encoding="utf-8") as fh: + saved_timing = json.load(fh) + + self.assertEqual(saved_summary["episode_rewards"], [1.0, 2.0, 3.0]) + self.assertEqual(saved_summary["artifact_dir"], tmpdir) + self.assertEqual(saved_timing["count"], 3) + self.assertEqual(saved_timing["model_forward_count"], 2) if __name__ == '__main__': diff --git a/tests/test_train_vla_rollout_validation.py b/tests/test_train_vla_rollout_validation.py index 1dbdf9e..2afd791 100644 --- a/tests/test_train_vla_rollout_validation.py +++ b/tests/test_train_vla_rollout_validation.py @@ -158,6 +158,101 @@ class TrainVLARolloutValidationTest(unittest.TestCase): self.assertGreater(float(cfg.train.lr), 5e-5) self.assertGreater(cfg.train.num_workers, 8) self.assertEqual(cfg.train.rollout_val_freq_epochs, 50) + self.assertEqual(cfg.train.rollout_device, cfg.train.device) + self.assertIsNone(cfg.train.rollout_num_workers) + self.assertIsNone(cfg.train.rollout_cuda_devices) + + + def test_run_training_rollout_validation_propagates_gpu_parallel_settings(self): + cfg = OmegaConf.create( + { + 'train': { + 'device': 'cpu', + 'batch_size': 1, + 'num_workers': 0, + 'val_split': 0.0, + 'seed': 0, + 'lr': 1e-3, + 'max_steps': 2, + 'log_freq': 1, + 'save_freq': 1000, + 'warmup_steps': 1, + 'scheduler_type': 'constant', + 'min_lr': 0.0, + 'grad_clip': 1.0, + 'weight_decay': 0.0, + 'pretrained_ckpt': None, + 'resume_ckpt': None, + 'use_swanlab': False, + 'rollout_val_freq_epochs': 2, + 'rollout_num_episodes': 5, + 'rollout_device': 'cuda', + 'rollout_num_workers': 4, + 'rollout_cuda_devices': [0, 1], + 'rollout_response_timeout_s': 123.0, + 
'rollout_server_startup_timeout_s': 456.0, + }, + 'data': { + 'camera_names': ['front'], + }, + 'agent': { + '_target_': 'fake.agent', + }, + 'eval': { + 'ckpt_path': 'unused.pt', + 'num_episodes': 99, + 'max_timesteps': 1, + 'device': 'cpu', + 'task_name': 'sim_transfer', + 'camera_names': ['front'], + 'use_smoothing': False, + 'smooth_alpha': 0.3, + 'verbose_action': False, + 'headless': False, + }, + } + ) + rollout_mock = mock.Mock(return_value={'avg_reward': 1.0}) + + def fake_instantiate(config_node, **_kwargs): + if config_node is cfg.data: + return _FakeDataset() + if config_node is cfg.agent: + return _FakeAgent() + raise AssertionError(f'unexpected instantiate config: {config_node!r}') + + def fake_dataloader(_dataset, *, shuffle, **_kwargs): + del shuffle, _kwargs + return _FakeLoader( + { + 'observation.front': torch.zeros(1, 3, 2, 2), + 'observation.state': torch.zeros(1, 4), + 'action': torch.zeros(1, 2), + 'action_is_pad': torch.zeros(1, 1, dtype=torch.bool), + }, + length=1, + ) + + with tempfile.TemporaryDirectory() as tempdir: + previous_cwd = os.getcwd() + try: + os.chdir(tempdir) + with mock.patch.object(train_vla, 'instantiate', side_effect=fake_instantiate), mock.patch.object(train_vla, 'DataLoader', side_effect=fake_dataloader), mock.patch.object(train_vla, 'build_training_optimizer', return_value=_FakeOptimizer(cfg.train.lr)), mock.patch.object(train_vla, 'get_lr_schedule_with_warmup', return_value=_FakeScheduler()), mock.patch.object(train_vla, 'tqdm', side_effect=lambda iterable, **kwargs: _FakeProgressBar(iterable)), mock.patch.object(train_vla.torch, 'save', return_value=None), mock.patch.object(eval_vla, '_run_eval', rollout_mock, create=True): + train_vla._run_training(cfg) + finally: + os.chdir(previous_cwd) + + rollout_cfg = rollout_mock.call_args.args[0] + self.assertEqual(rollout_cfg.eval.device, 'cuda') + self.assertEqual(rollout_cfg.eval.num_workers, 4) + self.assertEqual(list(rollout_cfg.eval.cuda_devices), [0, 1]) + 
self.assertEqual(float(rollout_cfg.eval.response_timeout_s), 123.0) + self.assertEqual(float(rollout_cfg.eval.server_startup_timeout_s), 456.0) + self.assertTrue(rollout_cfg.eval.headless) + self.assertEqual(rollout_cfg.eval.num_episodes, 5) + self.assertFalse(rollout_cfg.eval.record_video) + self.assertTrue(rollout_cfg.eval.save_summary_json) + self.assertTrue(rollout_cfg.eval.save_trajectory_image) def test_training_passes_backbone_image_resize_override_to_dataset_instantiation(self): cfg = OmegaConf.create(