diff --git a/docs/lewm-imf-experiment-guide.md b/docs/lewm-imf-experiment-guide.md new file mode 100644 index 0000000..e9f147a --- /dev/null +++ b/docs/lewm-imf-experiment-guide.md @@ -0,0 +1,471 @@ +# feat-lewm-imf-fusion 实验操作指南 + +适用 worktree:`/home/droid/project/roboimi/.worktrees/feat-lewm-imf-fusion` + +## 0. 先记住当前常用 recipe + +当前这条分支最常用的训练/验证配方,直接参考: +`experiment_suites/2026-04-21-lewm-fromscratch-old9-epoch50-roll5-val-20260421-153037/` + +核心约定: +- agent:`lewm_resnet_query_imf_attnres` +- from scratch:`train.pretrained_ckpt=null`,`agent.lewm_pretrained_ckpt=null` +- 训练:`batch_size=32`,`lr=1e-4`,`max_steps=109350`,`save_freq=10000` +- 数值验证:`train.val_split=0.0` + `train.val_episode_indices=[100]` +- held-out numeric validation:`train.action_mse_val_freq_epochs=1` +- rollout validation:`train.rollout_val_freq_epochs=5`,`train.rollout_num_episodes=10` +- SwanLab:`train.use_swanlab=true`,project=`roboimi-vla` + +--- + +## 1. 分支结构与关键文件 + +| 路径 | 作用 | +| --- | --- | +| `roboimi/demos/vla_scripts/train_vla.py` | 主训练入口;负责数据集、checkpoint、数值验证、训练期 rollout 验证、SwanLab | +| `roboimi/demos/vla_scripts/eval_vla.py` | 单次 rollout / 离线验证入口;支持 headless、summary、trajectory image/video artifact | +| `roboimi/vla/conf/config.yaml` | 全局 Hydra 配置;训练默认值都在这里 | +| `roboimi/vla/conf/eval/eval.yaml` | eval 默认配置;`eval.ckpt_path`、`eval.num_episodes`、artifact 开关都在这里 | +| `roboimi/vla/conf/agent/lewm_resnet_query_imf_attnres.yaml` | 本分支最常用 agent;LeWM query fusion + IMF AttnRes head | +| `roboimi/vla/conf/backbone/lewm_resnet_query_fusion.yaml` | LeWM 多视角 ResNet query fusion backbone 配置 | +| `roboimi/vla/agent_imf.py` | `IMFVLAAgent` 实现;one-step IMF 推理、LeWM loss、LeWM 预训练组件加载 | +| `roboimi/vla/data/simpe_robot_dataset.py` | HDF5 懒加载数据集;也负责 `episode_indices` 过滤 | +| `roboimi/vla/scripts/calculate_stats.py` | 重算 `dataset_stats.pkl` | +| `experiment_suites/2026-04-21-lewm-fromscratch-old9-epoch50-roll5-val-20260421-153037/` | 当前最常用 suite;manifest、notes、launch log、local 启动脚本都在这里 | + +补充: +- 本分支常用 run name 形如 `lewmimf-q08-ph08-ex08-emb384-l12-fromscratch-epoch50-step109350-5090g0-20260421-153037` +- `q08/ph16/ex08` 这类后缀分别对应 `agent.lewm_query_offsets`、`agent.pred_horizon`、`agent.num_action_steps` + +--- + +## 2. 三台机器与环境 + +| 机器 | GPU | repo / worktree | Python | 常用数据集路径 | +| --- | --- | --- | --- | --- | +| 本地 `droid-z790eagleax` | 1× RTX 5090 32GB | `/home/droid/project/roboimi/.worktrees/feat-lewm-imf-fusion` | `/home/droid/.conda/envs/roboimi/bin/python` | `/home/droid/project/diana_sim/sim_transfer` | +| 5880 节点 `100.73.14.65` | 2× RTX 5880 Ada 48GB | `/home/droid/roboimi_suite_20260416_lewm_imf_fusion` | `/home/droid/miniforge3/envs/roboimi/bin/python` | `/home/droid/sim_dataset/sim_transfer` | +| L20 节点 `100.119.99.14` | 8× NVIDIA L20 46GB | `/data/roboimi_suite_20260416_lewm_imf_fusion` | `/home/droid/miniforge3/envs/roboimi/bin/python` | `/data/simtransfer/current` | + +连接: +- 5880:`ssh droid@100.73.14.65` +- L20:`ssh droid@100.119.99.14` + +经验规则: +- 本地 5090:适合单条 smoke / 小规模主跑 / 本地调参 +- 5880:适合 2 条并行主跑 +- L20:适合大 grid;数据和 run 建议都放 `/data` + +--- + +## 3. 训练流怎么走 + +`train_vla.py` 的实际流程: + +1. 读取 Hydra 配置并打印完整 cfg +2. 通过 `build_train_val_datasets()` 构建 train/val dataset +3. 用 `DataLoader` 建 train/val loader +4. 从 `dataset_dir/dataset_stats.pkl` 读取归一化统计 +5. instantiate `IMFVLAAgent` +6. 可选加载: + - `train.pretrained_ckpt` + - `train.resume_ckpt` + - `agent.lewm_pretrained_ckpt` +7. 训练循环里按 `log_freq` 打 train loss / lr +8. 按 `save_freq` 保存 `checkpoints/vla_model_step_*.pt` +9. 
每个 epoch 结束时,按配置跑: + - held-out action MSE + - rollout validation +10. 最后写: + - `checkpoints/vla_model_best.pt` + - `checkpoints/vla_model_final.pt` + +当前 best model 选择逻辑: +- **第一次拿到 rollout reward 之前**:先用 `val_loss`(或 train loss 回退)挑 best +- **第一次 rollout 之后**:优先用 `rollout_avg_reward` 挑 best + +输出目录一般通过 `hydra.run.dir=...` 固定;否则 Hydra 自己生成。 + +--- + +## 4. 验证流怎么走 + +### 4.1 held-out 数值验证 + +当前常用做法不是随机切 `val_split`,而是: +- `train.val_split=0.0` +- `train.val_episode_indices=[100]` +- `train.action_mse_val_freq_epochs=1` + +这样每个 epoch 结束都会在 `episode_100.hdf5` 上跑一次 `compute_action_mse_validation()`,日志 key 是: +- 控制台 / `train_vla.log`:`held-out action MSE` +- SwanLab:`val/action_mse` + +### 4.2 rollout 验证 + +当前训练内 rollout 验证由 `train_vla.py -> run_rollout_validation() -> eval_vla._run_eval()` 触发。 + +当前这条分支的常用训练内 rollout 约束是: +- `train.rollout_val_freq_epochs=5` +- `train.rollout_num_episodes=10` +- `train.rollout_validate_on_checkpoint=false` +- 强制 headless +- 强制 `verbose_action=false` +- 强制 `record_video=false` +- 强制 `save_trajectory_image=true` +- 强制 `trajectory_image_camera_name=front` +- 强制 `save_summary_json=true` + +当前已经修正为**配置驱动的 rollout device / worker 路径**: +- `train.rollout_device`:默认跟随 `train.device` +- `train.rollout_num_workers`:默认 `null` + - 当 rollout 设备是 CPU 时,自动退化为 `1` + - 当 rollout 设备是 CUDA 时,自动推断为 `min(train.rollout_num_episodes, 8)` +- `train.rollout_cuda_devices`:默认 `null`,等价于当前可见逻辑 GPU `[0]` +- `train.rollout_response_timeout_s` +- `train.rollout_server_startup_timeout_s` + +所以现在: +- 训练在 `cuda` 上时,**训练期 rollout 默认会走 GPU** +- 如果 `rollout_num_workers > 1`,就会自动走并行 rollout +- 可以是 **单 GPU 多 worker 共用一个 inference server** +- 也可以是 **多 GPU 多 server 分摊 worker** + +训练内 rollout artifact 默认落到: +`/rollout_artifacts//` + +常见文件: +- `rollout_summary.json` +- `rollout_front_ep01_trajectory.png` ... `rollout_front_ep10_trajectory.png` + +日志重点看: +- `Epoch X rollout 平均奖励` +- `最佳模型已更新` + +--- + +## 5. 数据集加载与 `val_episode_indices` 机制 + +### 5.1 数据集格式 + +`SimpleRobotDataset` 读取 `dataset_dir` 下的 `episode_*.hdf5`,每个 episode 文件里至少要有: +- `action` +- `observations/qpos` +- `observations/images/{cam_name}` + +当前常用相机: +- `r_vis` +- `top` +- `front` + +### 5.2 懒加载行为 + +`roboimi/vla/data/simpe_robot_dataset.py` 是按帧懒加载,不会一次性把整套 HDF5 全读进内存。 + +它会: +- 扫描目录下的 HDF5 文件 +- 用文件名里的 episode 编号(如 `episode_100.hdf5` -> `100`)建立 `available_episode_indices` +- 在 worker 内做 HDF5 文件句柄 LRU 缓存 + +### 5.3 `val_episode_indices` 怎么切 + +`build_train_val_datasets()` 的逻辑是: + +1. 先 instantiate 一次完整 dataset +2. 读取 `dataset.available_episode_indices` +3. 检查 `train.val_episode_indices` 是否都存在 +4. 用 `episode_indices=` 再各 instantiate 一次: + - train dataset = 全部 episode - held-out episode + - val dataset = 只包含 held-out episode + +因此: +- `train.val_episode_indices=[100]` 的意思是“把 `episode_100.hdf5` 整个拿去做 held-out val” +- 如果 episode 不存在,会直接报错 +- 如果你把所有 episode 都塞进 `val_episode_indices`,也会直接报错,因为训练集会变空 + +### 5.4 图像 resize 与 LeWM 附加字段 + +dataset 侧 resize 默认来自: +- `data.image_resize_shape` +- 如果 backbone 额外覆盖,则优先 `agent.vision_backbone.dataset_image_resize_shape` + +返回 batch 除了常规: +- `observation.state` +- `observation.` +- `action` + +还会在 LeWM 打开时返回: +- `lewm.observation.state` +- `lewm.observation.` +- `lewm.future.state` +- `lewm.future.` + +### 5.5 统计文件 + +训练和推理都默认依赖 `dataset_stats.pkl`。数据集更新后重算: + +```bash +/home/droid/.conda/envs/roboimi/bin/python roboimi/vla/scripts/calculate_stats.py \ + --dataset_dir /home/droid/project/diana_sim/sim_transfer +``` + +远端只要把 `--dataset_dir` 换成对应主机路径即可。 + +--- + +## 6. 
SwanLab 行为 + +当前配置默认值里 `train.use_swanlab=false`,但本分支常用 recipe 基本都显式开: +- `train.use_swanlab=true` +- `train.swanlab_project=roboimi-vla` +- `train.swanlab_run_name=` + +`train_vla.py` 的 SwanLab 行为: +- 初始化时上传 `train` / `data` / `agent` 三段 config +- 训练中记录: + - `train/loss` + - `train/lr` + - `train/best_loss` + - `train/step` +- checkpoint 验证时记录: + - `val/loss` +- held-out 数值验证时记录: + - `val/action_mse` +- rollout 验证时记录: + - `rollout/avg_reward` + - `rollout/epoch` +- 训练结束时记录: + - `final/checkpoint_path` + - `final/best_checkpoint_path` + +训练期 rollout 生成的前视图轨迹 PNG 会 best-effort 上传到 SwanLab;失败只会 warning,不会让训练中断。 + +--- + +## 7. 并行 rollout 说明 + +### 7.1 这套能力从哪里来 + +本分支的并行 rollout 方向不是 DataLoader 并行,而是 **`eval_vla.py` 的 multiprocess rollout path**。 +参考来源: +`/home/droid/project/roboimi/.worktrees/multiprocess-rollout/roboimi/demos/vla_scripts/eval_vla.py` + +那条路径的控制参数是: +- `eval.num_workers` +- `eval.cuda_devices` + +语义是: +- `eval.num_workers`:环境 worker 数,按 episode 切分 +- `eval.cuda_devices`:推理 server 绑定到哪些逻辑 GPU + +### 7.2 两种常见模式 + +1. **单机单卡,多 worker 共用同一张 GPU** + - 典型:本地 5090 只有 1 卡,但想让 4 个 rollout worker 并行跑环境 + - 形式:`eval.device=cuda eval.num_workers=4 'eval.cuda_devices=[0]'` + - 这时是 **1 个 CUDA inference server + 4 个 env worker** + +2. **单机多卡,多 server 分摊 worker** + - 典型:5880 有 2 卡,L20 有多卡 + - 形式:`eval.device=cuda eval.num_workers=8 'eval.cuda_devices=[0,1]'` + - worker 会按 round-robin 分到多个 server 上 + +### 7.3 操作上要注意什么 + +- 并行 rollout 依赖 **多进程 eval 路径**,不是 `train.num_workers` +- `train.num_workers` 是 DataLoader worker,和 rollout 并行不是一回事 +- `eval.num_workers > 1` 时必须 `eval.headless=true` +- worker 数会自动 cap 到 `eval.num_episodes` +- multiprocess rollout 当前已经支持 **per-episode trajectory image PNG**;多 worker 时每个 worker 会在自己的 artifact 子目录下写图,summary 会带回对应路径 +- 但多 worker 时仍然不要同时要求: + - `eval.record_video=true` + - `eval.save_trajectory=true` + - `eval.save_trajectory_npz=true` +- `eval.save_trajectory_image=true` 现在是可以开的,适合并行 reward + 定性检查一起做 + +### 7.4 并行 rollout 命令模板 + +**5090 单卡 4 worker:** + +```bash +/home/droid/.conda/envs/roboimi/bin/python roboimi/demos/vla_scripts/eval_vla.py \ + agent=lewm_resnet_query_imf_attnres \ + data.dataset_dir=/home/droid/project/diana_sim/sim_transfer \ + train.device=cuda eval.device=cuda eval.headless=true eval.verbose_action=false \ + eval.ckpt_path=/home/droid/project/roboimi/.worktrees/feat-lewm-imf-fusion/runs//checkpoints/vla_model_best.pt \ + eval.num_episodes=10 eval.num_workers=4 'eval.cuda_devices=[0]' \ + eval.save_summary_json=true eval.artifact_dir=/tmp/lewm_parallel_eval_5090 +``` + +**5880 双卡 8 worker:** + +```bash +/home/droid/miniforge3/envs/roboimi/bin/python roboimi/demos/vla_scripts/eval_vla.py \ + agent=lewm_resnet_query_imf_attnres \ + data.dataset_dir=/home/droid/sim_dataset/sim_transfer \ + train.device=cuda eval.device=cuda eval.headless=true eval.verbose_action=false \ + eval.ckpt_path=/home/droid/roboimi_suite_20260416_lewm_imf_fusion/runs//checkpoints/vla_model_best.pt \ + eval.num_episodes=10 eval.num_workers=8 'eval.cuda_devices=[0,1]' \ + eval.save_summary_json=true eval.artifact_dir=/tmp/lewm_parallel_eval_5880 +``` + +--- + +## 8. 
当前常用命令 / 脚本 + +### 8.1 本地 5090:直接用 suite 脚本 + +现成脚本: +`experiment_suites/2026-04-21-lewm-fromscratch-old9-epoch50-roll5-val-20260421-153037/launch_local_5090.sh` + +运行: + +```bash +bash experiment_suites/2026-04-21-lewm-fromscratch-old9-epoch50-roll5-val-20260421-153037/launch_local_5090.sh +``` + +### 8.2 本地 5090:手动启动同 recipe + +```bash +/home/droid/.conda/envs/roboimi/bin/python roboimi/demos/vla_scripts/train_vla.py \ + agent=lewm_resnet_query_imf_attnres \ + data.dataset_dir=/home/droid/project/diana_sim/sim_transfer \ + 'agent.lewm_query_offsets=[8]' \ + agent.pred_horizon=8 \ + agent.num_action_steps=8 \ + train.device=cuda \ + train.batch_size=32 \ + train.lr=0.0001 \ + train.max_steps=109350 \ + train.num_workers=4 \ + train.save_freq=10000 \ + train.rollout_validate_on_checkpoint=false \ + train.rollout_val_freq_epochs=5 \ + train.rollout_num_episodes=10 \ + train.val_split=0.0 \ + 'train.val_episode_indices=[100]' \ + train.action_mse_val_freq_epochs=1 \ + train.use_swanlab=true \ + train.swanlab_project=roboimi-vla \ + train.swanlab_run_name=lewmimf-q08-ph08-ex08-emb384-l12-fromscratch-epoch50-step109350-5090g0-20260421-153037 \ + train.pretrained_ckpt=null \ + agent.lewm_pretrained_ckpt=null \ + hydra.run.dir=/home/droid/project/roboimi/.worktrees/feat-lewm-imf-fusion/runs/lewmimf-q08-ph08-ex08-emb384-l12-fromscratch-epoch50-step109350-5090g0-20260421-153037 +``` + +### 8.3 5880:常用命令模板 + +```bash +ssh droid@100.73.14.65 +cd /home/droid/roboimi_suite_20260416_lewm_imf_fusion +/home/droid/miniforge3/envs/roboimi/bin/python roboimi/demos/vla_scripts/train_vla.py \ + agent=lewm_resnet_query_imf_attnres \ + data.dataset_dir=/home/droid/sim_dataset/sim_transfer \ + 'agent.lewm_query_offsets=[8]' \ + agent.pred_horizon=16 \ + agent.num_action_steps=8 \ + train.device=cuda train.batch_size=32 train.lr=0.0001 train.max_steps=109350 \ + train.num_workers=4 train.save_freq=10000 train.rollout_validate_on_checkpoint=false \ + train.rollout_val_freq_epochs=5 train.rollout_num_episodes=10 train.val_split=0.0 \ + 'train.val_episode_indices=[100]' train.action_mse_val_freq_epochs=1 \ + train.use_swanlab=true train.swanlab_project=roboimi-vla \ + train.swanlab_run_name=lewmimf-q08-ph16-ex08-emb384-l12-fromscratch-epoch50-step109350-5880g0-20260421-153037 \ + train.pretrained_ckpt=null agent.lewm_pretrained_ckpt=null \ + hydra.run.dir=/home/droid/roboimi_suite_20260416_lewm_imf_fusion/runs/lewmimf-q08-ph16-ex08-emb384-l12-fromscratch-epoch50-step109350-5880g0-20260421-153037 +``` + +### 8.4 L20:常用命令模板 + +```bash +ssh droid@100.119.99.14 +cd /data/roboimi_suite_20260416_lewm_imf_fusion +/home/droid/miniforge3/envs/roboimi/bin/python roboimi/demos/vla_scripts/train_vla.py \ + agent=lewm_resnet_query_imf_attnres \ + data.dataset_dir=/data/simtransfer/current \ + 'agent.lewm_query_offsets=[16]' \ + agent.pred_horizon=16 \ + agent.num_action_steps=16 \ + train.device=cuda train.batch_size=32 train.lr=0.0001 train.max_steps=109350 \ + train.num_workers=4 train.save_freq=10000 train.rollout_validate_on_checkpoint=false \ + train.rollout_val_freq_epochs=5 train.rollout_num_episodes=10 train.val_split=0.0 \ + 'train.val_episode_indices=[100]' train.action_mse_val_freq_epochs=1 \ + train.use_swanlab=true train.swanlab_project=roboimi-vla \ + train.swanlab_run_name=lewmimf-q16-ph16-ex16-emb384-l12-fromscratch-epoch50-step109350-l20g0-20260421-153037 \ + train.pretrained_ckpt=null agent.lewm_pretrained_ckpt=null \ + 
hydra.run.dir=/data/roboimi_suite_20260416_lewm_imf_fusion/runs/lewmimf-q16-ph16-ex16-emb384-l12-fromscratch-epoch50-step109350-l20g0-20260421-153037 +``` + +### 8.5 单次离线验证(当前分支已支持并行) + +**单 GPU / 4 worker:** + +```bash +/home/droid/.conda/envs/roboimi/bin/python roboimi/demos/vla_scripts/eval_vla.py \ + agent=lewm_resnet_query_imf_attnres \ + data.dataset_dir=/home/droid/project/diana_sim/sim_transfer \ + train.device=cuda eval.device=cuda \ + eval.ckpt_path=/home/droid/project/roboimi/.worktrees/feat-lewm-imf-fusion/runs//checkpoints/vla_model_best.pt \ + eval.num_episodes=10 eval.num_workers=4 'eval.cuda_devices=[0]' \ + eval.headless=true eval.verbose_action=false \ + eval.save_summary_json=true eval.save_trajectory_image=true \ + eval.trajectory_image_camera_name=front \ + eval.artifact_dir=/tmp/lewm_eval_front +``` + +**训练内启用并行 GPU rollout(推荐显式写清楚)**: + +```bash +/home/droid/.conda/envs/roboimi/bin/python roboimi/demos/vla_scripts/train_vla.py \ + agent=lewm_resnet_query_imf_attnres \ + data.dataset_dir=/home/droid/project/diana_sim/sim_transfer \ + 'agent.lewm_query_offsets=[8]' \ + agent.pred_horizon=8 \ + agent.num_action_steps=8 \ + train.device=cuda \ + train.batch_size=32 \ + train.lr=0.0001 \ + train.max_steps=109350 \ + train.num_workers=4 \ + train.save_freq=10000 \ + train.rollout_val_freq_epochs=5 \ + train.rollout_num_episodes=10 \ + train.rollout_device=cuda \ + train.rollout_num_workers=4 \ + 'train.rollout_cuda_devices=[0]' \ + train.rollout_validate_on_checkpoint=false \ + train.val_split=0.0 \ + 'train.val_episode_indices=[100]' \ + train.action_mse_val_freq_epochs=1 \ + train.use_swanlab=true \ + train.swanlab_project=roboimi-vla \ + train.swanlab_run_name= \ + hydra.run.dir=/home/droid/project/roboimi/.worktrees/feat-lewm-imf-fusion/runs/ +``` + +### 8.6 监控日志 + +```bash +tail -f runs//launch.stdout.log +tail -f runs//train_vla.log +``` + +远端就把 `runs/` 换成 manifest 里的绝对路径。 + +--- + +## 9. 
操作建议 + +- **优先以 suite 的 `manifest.json` / `notes.md` / `launch_logs/*.launch.log` 为准**,不要手写一套和历史 run 不一致的命令 +- 要做当前常用验证,就显式加上: + - `train.val_split=0.0` + - `train.val_episode_indices=[100]` + - `train.action_mse_val_freq_epochs=1` + - `train.rollout_val_freq_epochs=5` + - `train.rollout_num_episodes=10` +- 本分支如果要对比不同 horizon / action-step,尽量只改: + - `agent.lewm_query_offsets` + - `agent.pred_horizon` + - `agent.num_action_steps` +- 想复现 2026-04-21 那轮 from-scratch 结果时,记得同时设: + - `train.pretrained_ckpt=null` + - `agent.lewm_pretrained_ckpt=null` diff --git a/roboimi/demos/vla_scripts/eval_vla.py b/roboimi/demos/vla_scripts/eval_vla.py index ff093f2..be00296 100644 --- a/roboimi/demos/vla_scripts/eval_vla.py +++ b/roboimi/demos/vla_scripts/eval_vla.py @@ -15,16 +15,21 @@ import os import json import logging import time +import queue +import concurrent.futures +import multiprocessing import torch import numpy as np import hydra from pathlib import Path +from collections import deque from typing import Any, Dict, Optional from tqdm import tqdm from omegaconf import DictConfig, OmegaConf from hydra.utils import instantiate from einops import rearrange +from roboimi.envs.double_pos_ctrl_env import make_sim_env from roboimi.utils.act_ex_utils import sample_transfer_pose from roboimi.vla.eval_utils import execute_policy_action @@ -45,11 +50,6 @@ def _configure_headless_mujoco_gl(eval_cfg: DictConfig) -> None: log.info('headless eval detected; set MUJOCO_GL=egl') -def make_sim_env(task_name: str, headless: bool = False): - from roboimi.envs.double_pos_ctrl_env import make_sim_env as _make_sim_env_impl - return _make_sim_env_impl(task_name, headless=headless) - - def load_checkpoint( ckpt_path: str, agent_cfg: DictConfig, @@ -121,12 +121,13 @@ def prepare_observation( Returns: agent 格式的观测字典 """ + import cv2 + # 转换图像: numpy -> tensor, HWC -> CHW images = {} for cam_name in camera_names: img = obs['images'][cam_name] if image_resize_shape is not None: - import cv2 img = cv2.resize(img, tuple(image_resize_shape), interpolation=cv2.INTER_LINEAR) img = rearrange(img, 'h w c -> c h w') img = torch.from_numpy(img / 255.0).float() @@ -138,6 +139,204 @@ def prepare_observation( return {'qpos': qpos, 'images': images} +def _resolve_policy_camera_names(cfg: DictConfig) -> list[str]: + agent_cfg = cfg.agent + eval_cfg = cfg.eval + camera_names = agent_cfg.get('camera_names', None) + if camera_names is not None: + return list(camera_names) + + agent_target = str(agent_cfg.get('_target_', '')) + if agent_target.endswith('VLAAgentGr00tDiT'): + return list(eval_cfg.camera_names) + return sorted(eval_cfg.camera_names) + + +def _new_local_policy_queues(obs_horizon: int) -> dict[str, deque]: + return { + 'qpos': deque(maxlen=int(obs_horizon)), + 'images': deque(maxlen=int(obs_horizon)), + 'action': deque(), + } + + +def _populate_local_policy_queues( + queues: dict[str, deque], + observation: Dict[str, torch.Tensor], +) -> None: + if 'qpos' in observation: + queues['qpos'].append(observation['qpos'].detach().clone()) + if 'images' in observation: + queues['images'].append({ + camera_name: image.detach().clone() + for camera_name, image in observation['images'].items() + }) + + +def _prepare_local_policy_batch( + queues: dict[str, deque], + obs_horizon: int, + camera_names: list[str], +) -> Dict[str, torch.Tensor]: + qpos_list = list(queues['qpos']) + if not qpos_list: + raise ValueError('observation queue is empty.') + while len(qpos_list) < int(obs_horizon): + qpos_list.append(qpos_list[-1]) + batch_qpos = 
torch.stack(qpos_list, dim=0).unsqueeze(0) + + images_list = list(queues['images']) + if not images_list: + raise ValueError('image queue is empty.') + while len(images_list) < int(obs_horizon): + images_list.append(images_list[-1]) + + ordered_camera_names = list(camera_names) if camera_names else sorted(images_list[0].keys()) + batch_images = { + camera_name: torch.stack( + [image_history[camera_name] for image_history in images_list], + dim=0, + ).unsqueeze(0) + for camera_name in ordered_camera_names + } + return {'qpos': batch_qpos, 'images': batch_images} + + +def _enqueue_predicted_actions( + queues: dict[str, deque], + predicted_actions: Any, + obs_horizon: int, + num_action_steps: int, +) -> None: + if isinstance(predicted_actions, np.ndarray): + predicted_actions = torch.from_numpy(predicted_actions) + if predicted_actions.ndim == 2: + predicted_actions = predicted_actions.unsqueeze(0) + + start = int(obs_horizon) - 1 + end = start + int(num_action_steps) + executable_actions = predicted_actions[:, start:end] + for action_index in range(executable_actions.shape[1]): + queues['action'].append( + executable_actions[:, action_index].squeeze(0).detach().cpu().clone() + ) + + +def _serialize_policy_batch(batch: Dict[str, torch.Tensor]) -> dict[str, Any]: + return { + 'qpos': batch['qpos'].detach().cpu().numpy().astype(np.float32, copy=True), + 'images': { + camera_name: image.detach().cpu().numpy().astype(np.float32, copy=True) + for camera_name, image in batch['images'].items() + }, + } + + +def _deserialize_policy_batch(batch: dict[str, Any], device: str) -> Dict[str, torch.Tensor]: + return { + 'qpos': torch.as_tensor(batch['qpos'], dtype=torch.float32, device=device), + 'images': { + camera_name: torch.as_tensor(image, dtype=torch.float32, device=device) + for camera_name, image in batch['images'].items() + }, + } + + +class _LocalPolicyRunner: + def __init__(self, agent: torch.nn.Module): + self.agent = agent + self.uses_local_model = True + + def reset(self): + self.agent.reset() + + def select_action( + self, + observation: Dict[str, torch.Tensor], + *, + episode_index: int, + timestep: int, + ) -> tuple[Any, bool]: + del episode_index, timestep + action_queue = getattr(self.agent, '_queues', {}).get('action', None) + model_inference_triggered = len(action_queue) == 0 if action_queue is not None else True + action = self.agent.select_action(observation) + return action, bool(model_inference_triggered) + + +class _RemotePolicyRunner: + def __init__( + self, + *, + worker_index: int, + server_index: int, + request_queue, + response_queue, + camera_names: list[str], + obs_horizon: int, + num_action_steps: int, + response_timeout_s: float = 30.0, + ): + self.worker_index = int(worker_index) + self.server_index = int(server_index) + self.request_queue = request_queue + self.response_queue = response_queue + self.camera_names = list(camera_names) + self.obs_horizon = int(obs_horizon) + self.num_action_steps = int(num_action_steps) + self.response_timeout_s = float(response_timeout_s) + self.local_queues = _new_local_policy_queues(self.obs_horizon) + self.uses_local_model = False + + def reset(self): + self.local_queues = _new_local_policy_queues(self.obs_horizon) + + def select_action( + self, + observation: Dict[str, torch.Tensor], + *, + episode_index: int, + timestep: int, + ) -> tuple[torch.Tensor, bool]: + _populate_local_policy_queues(self.local_queues, observation) + model_inference_triggered = len(self.local_queues['action']) == 0 + if model_inference_triggered: + batch = 
_prepare_local_policy_batch( + self.local_queues, + obs_horizon=self.obs_horizon, + camera_names=self.camera_names, + ) + self.request_queue.put({ + 'type': 'predict_chunk', + 'worker_index': self.worker_index, + 'server_index': self.server_index, + 'episode_index': int(episode_index), + 'timestep': int(timestep), + 'batch': _serialize_policy_batch(batch), + }) + try: + response = self.response_queue.get(timeout=self.response_timeout_s) + except queue.Empty as exc: + raise RuntimeError( + f'worker {self.worker_index} timed out waiting for inference server {self.server_index}' + ) from exc + if response.get('type') != 'predict_chunk_result': + raise RuntimeError( + f'worker {self.worker_index} received unexpected inference response: ' + f'{response.get("type")}' + ) + _enqueue_predicted_actions( + self.local_queues, + predicted_actions=response['actions'], + obs_horizon=self.obs_horizon, + num_action_steps=self.num_action_steps, + ) + + if not self.local_queues['action']: + raise RuntimeError(f'worker {self.worker_index} received no executable action from server') + return self.local_queues['action'].popleft(), bool(model_inference_triggered) + + def _to_numpy_action(action: Any) -> np.ndarray: if isinstance(action, torch.Tensor): return action.detach().cpu().numpy().astype(np.float32, copy=True) @@ -186,6 +385,129 @@ def _summarize_timing_breakdown( } +_TIMING_SAMPLE_KEYS = ( + 'obs_read_time_ms', + 'preprocess_time_ms', + 'inference_time_ms', + 'env_step_time_ms', + 'total_time_ms', +) + + +def _empty_merge_state() -> dict[str, list[float] | list[bool]]: + return { + **{key: [] for key in _TIMING_SAMPLE_KEYS}, + 'model_forward_flags': [], + } + + +def _normalize_num_workers(num_workers: int, num_episodes: int) -> int: + num_episodes = max(int(num_episodes), 0) + if num_episodes == 0: + return 0 + return min(max(int(num_workers), 1), num_episodes) + + +def _split_episode_indices(num_episodes: int, num_workers: int) -> list[list[int]]: + active_workers = _normalize_num_workers(num_workers=num_workers, num_episodes=num_episodes) + if active_workers == 0: + return [] + episode_indices = np.arange(int(num_episodes), dtype=np.int32) + return [ + chunk.tolist() + for chunk in np.array_split(episode_indices, active_workers) + if len(chunk) > 0 + ] + + +def _plan_episode_box_poses( + num_episodes: int, + sampler=None, +) -> list[np.ndarray]: + if sampler is None: + sampler = sample_transfer_pose + return [ + np.asarray(sampler(), dtype=np.float32).copy() + for _ in range(int(num_episodes)) + ] + + +def _merge_worker_summaries( + worker_summaries: list[dict[str, Any]], + artifact_paths: dict[str, Optional[str]], +) -> dict[str, Any]: + merged_episodes = [] + merged_state = _empty_merge_state() + for worker_summary in worker_summaries: + merged_episodes.extend(worker_summary.get('episodes', [])) + merge_state = worker_summary.get('_merge_state', {}) + for key in _TIMING_SAMPLE_KEYS: + merged_state[key].extend(float(value) for value in merge_state.get(key, [])) + merged_state['model_forward_flags'].extend( + bool(value) for value in merge_state.get('model_forward_flags', []) + ) + + merged_episodes = sorted( + merged_episodes, + key=lambda episode: int(episode.get('episode_index', 0)), + ) + episode_rewards = [ + float(episode.get('episode_reward', 0.0)) + for episode in merged_episodes + ] + episode_max_rewards = [ + ( + float(episode['episode_max_reward']) + if episode.get('episode_max_reward') is not None + else None + ) + for episode in merged_episodes + ] + valid_max_rewards = [ + value for 
value in episode_max_rewards + if value is not None + ] + summary = { + 'num_episodes': len(merged_episodes), + 'episode_rewards': episode_rewards, + 'episode_max_rewards': episode_max_rewards, + 'avg_reward': float(np.mean(episode_rewards)) if episode_rewards else 0.0, + 'avg_max_reward': float(np.mean(valid_max_rewards)) if valid_max_rewards else 0.0, + 'episodes': merged_episodes, + 'artifact_dir': artifact_paths.get('output_dir') if artifact_paths else None, + 'artifacts': artifact_paths, + } + + if merged_episodes: + summary.update({ + 'avg_inference_fps': float(np.mean([ + float(episode.get('inference_fps', 0.0)) + for episode in merged_episodes + ])), + 'avg_control_fps': float(np.mean([ + float(episode.get('control_fps', 0.0)) + for episode in merged_episodes + ])), + 'avg_obs_read_time_ms': _mean_or_zero(merged_state['obs_read_time_ms']), + 'avg_preprocess_time_ms': _mean_or_zero(merged_state['preprocess_time_ms']), + 'avg_inference_time_ms': _mean_or_zero(merged_state['inference_time_ms']), + 'avg_env_step_time_ms': _mean_or_zero(merged_state['env_step_time_ms']), + 'avg_total_time_ms': _mean_or_zero(merged_state['total_time_ms']), + 'timing_summary': _summarize_timing_breakdown( + { + 'obs_read': merged_state['obs_read_time_ms'], + 'preprocess': merged_state['preprocess_time_ms'], + 'inference': merged_state['inference_time_ms'], + 'env_step': merged_state['env_step_time_ms'], + 'loop_total': merged_state['total_time_ms'], + }, + merged_state['model_forward_flags'], + ), + }) + + return summary + + def _json_friendly(value: Any) -> Any: if isinstance(value, dict): return {str(key): _json_friendly(item) for key, item in value.items()} @@ -200,7 +522,10 @@ def _json_friendly(value: Any) -> Any: return value -def _resolve_artifact_paths(eval_cfg: DictConfig) -> dict[str, Optional[str]]: +def _resolve_artifact_paths( + eval_cfg: DictConfig, + output_dir_override: Optional[str] = None, +) -> dict[str, Optional[str]]: save_timing = bool(eval_cfg.get('save_timing', False)) save_trajectory = bool( eval_cfg.get('save_trajectory', False) or eval_cfg.get('save_trajectory_npz', False) @@ -215,13 +540,16 @@ def _resolve_artifact_paths(eval_cfg: DictConfig) -> dict[str, Optional[str]]: ]) output_dir: Optional[Path] = None if wants_artifacts: - artifact_dir = eval_cfg.get('artifact_dir', None) - if artifact_dir: - output_dir = Path(str(artifact_dir)).expanduser().resolve() + if output_dir_override: + output_dir = Path(str(output_dir_override)).expanduser().resolve() else: - ckpt_stem = Path(str(eval_cfg.ckpt_path)).stem or 'rollout' - timestamp = time.strftime('%Y%m%d-%H%M%S') - output_dir = (Path.cwd() / 'rollout_artifacts' / f'{ckpt_stem}-{timestamp}').resolve() + artifact_dir = eval_cfg.get('artifact_dir', None) + if artifact_dir: + output_dir = Path(str(artifact_dir)).expanduser().resolve() + else: + ckpt_stem = Path(str(eval_cfg.ckpt_path)).stem or 'rollout' + timestamp = time.strftime('%Y%m%d-%H%M%S') + output_dir = (Path.cwd() / 'rollout_artifacts' / f'{ckpt_stem}-{timestamp}').resolve() output_dir.mkdir(parents=True, exist_ok=True) video_camera_name = None @@ -578,6 +906,12 @@ def _save_summary_json(output_path: str, summary: dict[str, Any]): json.dump(_json_friendly(summary), f, ensure_ascii=False, indent=2) +def _public_summary(summary: dict[str, Any]) -> dict[str, Any]: + public_summary = dict(summary) + public_summary.pop('_merge_state', None) + return _json_friendly(public_summary) + + class ActionSmoother: """ 动作平滑器(指数移动平均) @@ -630,14 +964,7 @@ def _close_env(env): viewer.close() 
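# ---------------------------------------------------------------------------
# Illustrative sketch (standalone, NOT part of eval_vla.py): expected behaviour
# of the parallel-eval helpers defined above, assuming eval.num_episodes=10,
# eval.num_workers=4 and eval.cuda_devices=[0, 1].
import numpy as np

# _split_episode_indices(num_episodes=10, num_workers=4) relies on
# np.array_split, so each worker receives a near-equal contiguous chunk:
chunks = [chunk.tolist() for chunk in np.array_split(np.arange(10, dtype=np.int32), 4)]
assert chunks == [[0, 1, 2], [3, 4, 5], [6, 7], [8, 9]]

# _normalize_num_workers caps the worker count to the episode count, e.g.
# eval.num_workers=8 with only 3 episodes collapses to 3 active workers:
assert min(max(8, 1), 3) == 3

# _build_cuda_server_payloads assigns worker i to server i % len(cuda_devices),
# so 4 workers over cuda_devices=[0, 1] alternate between the two servers:
assert [worker_index % 2 for worker_index in range(4)] == [0, 1, 0, 1]
# ---------------------------------------------------------------------------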
-def _run_eval(cfg: DictConfig): - """ - 使用 agent 内置队列管理的简化版 VLA 评估 - - 所有评估参数来自 vla/conf/eval.yaml,合并到 cfg 中。 - 命令行覆盖: python eval_vla_simple.py eval.ckpt_path=... eval.num_episodes=5 - """ - +def _print_eval_config(cfg: DictConfig): # 打印配置 print("=" * 80) print("VLA 评估配置:") @@ -645,37 +972,45 @@ def _run_eval(cfg: DictConfig): print(OmegaConf.to_yaml(cfg)) print("=" * 80) + +def _build_episode_plans( + num_episodes: int, + box_poses: Optional[list[np.ndarray]] = None, +) -> list[dict[str, Any]]: + if box_poses is None: + return [ + { + 'episode_index': int(episode_index), + } + for episode_index in range(int(num_episodes)) + ] + return [ + { + 'episode_index': int(episode_index), + 'box_pos': np.asarray(box_pos, dtype=np.float32).copy(), + } + for episode_index, box_pos in enumerate(box_poses) + ] + + +def _run_eval_episode_plans( + cfg: DictConfig, + episode_plans: list[dict[str, Any]], + policy_runner, + worker_index: int = 0, + artifact_paths: Optional[dict[str, Optional[str]]] = None, + show_progress: bool = True, +) -> dict[str, Any]: eval_cfg = cfg.eval - _configure_headless_mujoco_gl(eval_cfg) - device = eval_cfg.device + device = str(eval_cfg.device) camera_names = list(eval_cfg.camera_names) - artifact_paths = _resolve_artifact_paths(eval_cfg) + artifact_paths = artifact_paths or _resolve_artifact_paths(eval_cfg) video_recorder = _RolloutVideoRecorder( output_path=artifact_paths['video_mp4'], fps=int(eval_cfg.get('video_fps', 30)), ) rollout_trajectory = _empty_rollout_trajectory() - global_obs_read_times_ms = [] - global_preprocess_times_ms = [] - global_inference_times_ms = [] - global_env_step_times_ms = [] - global_total_times_ms = [] - global_model_forward_flags = [] - - # ========================================================================= - # 加载模型 - # ========================================================================= - log.info(f"🚀 从 {eval_cfg.ckpt_path} 加载模型...") - agent, dataset_stats = load_checkpoint( - ckpt_path=eval_cfg.ckpt_path, - agent_cfg=cfg.agent, - device=device - ) - vision_encoder = getattr(agent, 'vision_encoder', None) - image_resize_shape = getattr(vision_encoder, 'eval_image_resize_shape', (224, 224)) - - # 重置 agent 的队列 - agent.reset() + merge_state = _empty_merge_state() # 可选:动作平滑器 smoother = ActionSmoother(alpha=eval_cfg.smooth_alpha) if eval_cfg.use_smoothing else None @@ -683,25 +1018,32 @@ def _run_eval(cfg: DictConfig): # ========================================================================= # 创建环境 # ========================================================================= - env = make_sim_env(eval_cfg.task_name, headless=eval_cfg.headless) - - # ========================================================================= - # 运行评估回合 - # ========================================================================= - all_stats = [] - episode_rewards = [] - episode_max_rewards = [] + env = None try: - for episode_idx in range(eval_cfg.num_episodes): - print(f"\n{'='*60}") - print(f"回合 {episode_idx + 1}/{eval_cfg.num_episodes}") - print(f"{'='*60}\n") + env = make_sim_env(eval_cfg.task_name, headless=eval_cfg.headless) + + # ========================================================================= + # 运行评估回合 + # ========================================================================= + all_stats = [] + episode_rewards = [] + episode_max_rewards = [] + for plan in episode_plans: + episode_idx = int(plan['episode_index']) + box_pos = plan.get('box_pos') + if box_pos is None: + box_pos = sample_transfer_pose() + box_pos = np.asarray(box_pos, 
dtype=np.float32) + + if show_progress: + print(f"\n{'='*60}") + print(f"回合 {episode_idx + 1}/{eval_cfg.num_episodes}") + print(f"{'='*60}\n") - box_pos = sample_transfer_pose() env.reset(box_pos) - # 为新回合重置 agent 队列 - agent.reset() + # 为新回合重置 rollout policy 状态 + policy_runner.reset() if smoother: smoother.reset() @@ -717,7 +1059,11 @@ def _run_eval(cfg: DictConfig): episode_raw_actions: list[np.ndarray] = [] with torch.inference_mode(): - for t in tqdm(range(eval_cfg.max_timesteps), desc=f"回合 {episode_idx + 1}"): + episode_iterator = range(eval_cfg.max_timesteps) + if show_progress: + episode_iterator = tqdm(episode_iterator, desc=f"回合 {episode_idx + 1}") + + for t in episode_iterator: start_total = time.perf_counter() # 从环境获取观测 @@ -730,20 +1076,22 @@ def _run_eval(cfg: DictConfig): video_recorder.write(video_frame) # 准备给 agent 的观测 - observation = prepare_observation( - obs, - camera_names, - image_resize_shape=image_resize_shape, - ) + observation = prepare_observation(obs, camera_names) end_preprocess = time.perf_counter() - # 选择动作(agent 内部处理队列管理) - action_queue = getattr(agent, '_queues', {}).get('action', None) - model_inference_triggered = len(action_queue) == 0 if action_queue is not None else True + # 选择动作(本地 agent 或远端 inference server) start_inference = time.perf_counter() - action = agent.select_action(observation) + action, model_inference_triggered = policy_runner.select_action( + observation, + episode_index=episode_idx, + timestep=t, + ) - if str(device).startswith('cuda') and torch.cuda.is_available(): + if ( + getattr(policy_runner, 'uses_local_model', False) + and _is_cuda_device(device) + and torch.cuda.is_available() + ): torch.cuda.synchronize() end_inference = time.perf_counter() @@ -793,12 +1141,12 @@ def _run_eval(cfg: DictConfig): env_step_times_ms.append(step_timing_ms['env_step_time_ms']) total_times_ms.append(step_timing_ms['total_time_ms']) model_forward_flags.append(bool(model_inference_triggered)) - global_obs_read_times_ms.append(step_timing_ms['obs_read_time_ms']) - global_preprocess_times_ms.append(step_timing_ms['preprocess_time_ms']) - global_inference_times_ms.append(step_timing_ms['inference_time_ms']) - global_env_step_times_ms.append(step_timing_ms['env_step_time_ms']) - global_total_times_ms.append(step_timing_ms['total_time_ms']) - global_model_forward_flags.append(bool(model_inference_triggered)) + merge_state['obs_read_time_ms'].append(step_timing_ms['obs_read_time_ms']) + merge_state['preprocess_time_ms'].append(step_timing_ms['preprocess_time_ms']) + merge_state['inference_time_ms'].append(step_timing_ms['inference_time_ms']) + merge_state['env_step_time_ms'].append(step_timing_ms['env_step_time_ms']) + merge_state['total_time_ms'].append(step_timing_ms['total_time_ms']) + merge_state['model_forward_flags'].append(bool(model_inference_triggered)) if artifact_paths['trajectory_npz'] is not None: _append_rollout_step( @@ -844,6 +1192,8 @@ def _run_eval(cfg: DictConfig): } stats = { + 'worker_index': int(worker_index), + 'episode_index': int(episode_idx), 'inference_fps': 1000.0 / avg_inference_time_ms if avg_inference_time_ms > 0 else 0.0, 'control_fps': 1000.0 / avg_total_time_ms if avg_total_time_ms > 0 else 0.0, 'avg_obs_read_time_ms': avg_obs_read_time_ms, @@ -864,46 +1214,55 @@ def _run_eval(cfg: DictConfig): } all_stats.append(stats) episode_rewards.append(float(episode_reward)) - if episode_max_reward != float('-inf'): - episode_max_rewards.append(float(episode_max_reward)) + episode_max_rewards.append( + float(episode_max_reward) if 
episode_max_reward != float('-inf') else None + ) - print(f"\n回合 {episode_idx + 1} 完成 ({eval_cfg.max_timesteps} 时间步)") - print(f" 模型推理 FPS: {stats['inference_fps']:.2f} Hz") - print(f" 控制循环 FPS: {stats['control_fps']:.2f} Hz") - print(f" 平均读观测时间: {stats['avg_obs_read_time_ms']:.2f} ms") - print(f" 平均预处理时间: {stats['avg_preprocess_time_ms']:.2f} ms") - print(f" 平均推理时间: {stats['avg_inference_time_ms']:.2f} ms") - print(f" 平均环境步进时间: {stats['avg_env_step_time_ms']:.2f} ms") - print(f" 平均总时间: {stats['avg_total_time_ms']:.2f} ms") - print(f" 总推理次数: {stats['num_inferences']}") - print(f" 回合累计奖励: {stats['episode_reward']:.2f}") + if show_progress: + print(f"\n回合 {episode_idx + 1} 完成 ({eval_cfg.max_timesteps} 时间步)") + print(f" 模型推理 FPS: {stats['inference_fps']:.2f} Hz") + print(f" 控制循环 FPS: {stats['control_fps']:.2f} Hz") + print(f" 平均读观测时间: {stats['avg_obs_read_time_ms']:.2f} ms") + print(f" 平均预处理时间: {stats['avg_preprocess_time_ms']:.2f} ms") + print(f" 平均推理时间: {stats['avg_inference_time_ms']:.2f} ms") + print(f" 平均环境步进时间: {stats['avg_env_step_time_ms']:.2f} ms") + print(f" 平均总时间: {stats['avg_total_time_ms']:.2f} ms") + print(f" 总推理次数: {stats['num_inferences']}") + print(f" 回合累计奖励: {stats['episode_reward']:.2f}") # ========================================================================= # 总体统计 # ========================================================================= - print(f"\n{'='*60}") - print("评估完成!") - print(f"{'='*60}") + if show_progress: + print(f"\n{'='*60}") + print("评估完成!") + print(f"{'='*60}") + + valid_max_rewards = [ + reward for reward in episode_max_rewards + if reward is not None + ] summary = { - 'num_episodes': int(eval_cfg.num_episodes), + 'num_episodes': len(episode_plans), 'episode_rewards': episode_rewards, 'episode_max_rewards': episode_max_rewards, 'avg_reward': float(np.mean(episode_rewards)) if episode_rewards else 0.0, - 'avg_max_reward': float(np.mean(episode_max_rewards)) if episode_max_rewards else 0.0, + 'avg_max_reward': float(np.mean(valid_max_rewards)) if valid_max_rewards else 0.0, 'episodes': all_stats, 'artifact_dir': artifact_paths['output_dir'], 'artifacts': artifact_paths, + '_merge_state': merge_state, } if all_stats: avg_inference_fps = np.mean([s['inference_fps'] for s in all_stats]) avg_control_fps = np.mean([s['control_fps'] for s in all_stats]) - avg_obs_read_time = _mean_or_zero(global_obs_read_times_ms) - avg_preprocess_time = _mean_or_zero(global_preprocess_times_ms) - avg_inference_time = _mean_or_zero(global_inference_times_ms) - avg_env_step_time = _mean_or_zero(global_env_step_times_ms) - avg_total_time = _mean_or_zero(global_total_times_ms) + avg_obs_read_time = _mean_or_zero(merge_state['obs_read_time_ms']) + avg_preprocess_time = _mean_or_zero(merge_state['preprocess_time_ms']) + avg_inference_time = _mean_or_zero(merge_state['inference_time_ms']) + avg_env_step_time = _mean_or_zero(merge_state['env_step_time_ms']) + avg_total_time = _mean_or_zero(merge_state['total_time_ms']) summary.update({ 'avg_inference_fps': float(avg_inference_fps), 'avg_control_fps': float(avg_control_fps), @@ -914,39 +1273,630 @@ def _run_eval(cfg: DictConfig): 'avg_total_time_ms': float(avg_total_time), 'timing_summary': _summarize_timing_breakdown( { - 'obs_read': global_obs_read_times_ms, - 'preprocess': global_preprocess_times_ms, - 'inference': global_inference_times_ms, - 'env_step': global_env_step_times_ms, - 'loop_total': global_total_times_ms, + 'obs_read': merge_state['obs_read_time_ms'], + 'preprocess': merge_state['preprocess_time_ms'], + 'inference': 
merge_state['inference_time_ms'], + 'env_step': merge_state['env_step_time_ms'], + 'loop_total': merge_state['total_time_ms'], }, - global_model_forward_flags, + merge_state['model_forward_flags'], ), }) - print(f"\n总体统计 ({eval_cfg.num_episodes} 个回合):") - print(f" 平均模型推理 FPS: {avg_inference_fps:.2f} Hz") - print(f" 平均控制循环 FPS: {avg_control_fps:.2f} Hz") - print(f" 平均读观测时间: {avg_obs_read_time:.2f} ms") - print(f" 平均预处理时间: {avg_preprocess_time:.2f} ms") - print(f" 平均推理时间: {avg_inference_time:.2f} ms") - print(f" 平均环境步进时间: {avg_env_step_time:.2f} ms") - print(f" 平均总时间: {avg_total_time:.2f} ms") - print(f" 平均累计奖励: {summary['avg_reward']:.2f}") + if show_progress: + print(f"\n总体统计 ({len(episode_plans)} 个回合):") + print(f" 平均模型推理 FPS: {avg_inference_fps:.2f} Hz") + print(f" 平均控制循环 FPS: {avg_control_fps:.2f} Hz") + print(f" 平均读观测时间: {avg_obs_read_time:.2f} ms") + print(f" 平均预处理时间: {avg_preprocess_time:.2f} ms") + print(f" 平均推理时间: {avg_inference_time:.2f} ms") + print(f" 平均环境步进时间: {avg_env_step_time:.2f} ms") + print(f" 平均总时间: {avg_total_time:.2f} ms") + print(f" 平均累计奖励: {summary['avg_reward']:.2f}") if artifact_paths['trajectory_npz'] is not None: _save_rollout_trajectory_npz(artifact_paths['trajectory_npz'], rollout_trajectory) + public_summary = _public_summary(summary) if artifact_paths['summary_json'] is not None: - _save_summary_json(artifact_paths['summary_json'], summary) + _save_summary_json(artifact_paths['summary_json'], public_summary) if artifact_paths['timing_json'] is not None: - _save_summary_json(artifact_paths['timing_json'], summary.get('timing_summary', {})) - print() - return _json_friendly(summary) + _save_summary_json(artifact_paths['timing_json'], public_summary.get('timing_summary', {})) + if show_progress: + print() + return summary finally: video_recorder.close() _close_env(env) +def _run_eval_worker( + cfg: DictConfig, + episode_plans: list[dict[str, Any]], + worker_index: int = 0, + artifact_paths: Optional[dict[str, Optional[str]]] = None, + show_progress: bool = True, +) -> dict[str, Any]: + eval_cfg = cfg.eval + + log.info(f"🚀 从 {eval_cfg.ckpt_path} 加载模型...") + agent, _dataset_stats = load_checkpoint( + ckpt_path=eval_cfg.ckpt_path, + agent_cfg=cfg.agent, + device=str(eval_cfg.device), + ) + policy_runner = _LocalPolicyRunner(agent) + return _run_eval_episode_plans( + cfg, + episode_plans=episode_plans, + policy_runner=policy_runner, + worker_index=worker_index, + artifact_paths=artifact_paths, + show_progress=show_progress, + ) + + +def _run_remote_eval_worker( + cfg: DictConfig, + episode_plans: list[dict[str, Any]], + *, + worker_index: int, + server_index: int, + request_queue, + response_queue, + artifact_paths: Optional[dict[str, Optional[str]]] = None, + show_progress: bool = False, +) -> dict[str, Any]: + eval_cfg = cfg.eval + agent_cfg = cfg.agent + num_action_steps = int(agent_cfg.get('num_action_steps', eval_cfg.get('num_queries', 1))) + policy_runner = _RemotePolicyRunner( + worker_index=worker_index, + server_index=server_index, + request_queue=request_queue, + response_queue=response_queue, + camera_names=_resolve_policy_camera_names(cfg), + obs_horizon=int(agent_cfg.get('obs_horizon', eval_cfg.obs_horizon)), + num_action_steps=num_action_steps, + response_timeout_s=float(eval_cfg.get('response_timeout_s', 300.0)), + ) + return _run_eval_episode_plans( + cfg, + episode_plans=episode_plans, + policy_runner=policy_runner, + worker_index=worker_index, + artifact_paths=artifact_paths, + show_progress=show_progress, + ) + + +def _run_eval_serial(cfg: 
DictConfig): + eval_cfg = cfg.eval + _configure_headless_mujoco_gl(eval_cfg) + artifact_paths = _resolve_artifact_paths(eval_cfg) + episode_plans = _build_episode_plans(eval_cfg.num_episodes) + summary = _run_eval_worker( + cfg, + episode_plans=episode_plans, + worker_index=0, + artifact_paths=artifact_paths, + show_progress=True, + ) + return _public_summary(summary) + + +def _validate_parallel_eval_cfg(eval_cfg: DictConfig): + if not bool(eval_cfg.get('headless', False)): + raise ValueError('eval.num_workers > 1 requires eval.headless=true') + + unsupported_exports = [ + flag_name + for flag_name in ( + 'record_video', + 'save_trajectory', + 'save_trajectory_npz', + ) + if bool(eval_cfg.get(flag_name, False)) + ] + if unsupported_exports: + joined_flags = ', '.join(unsupported_exports) + raise ValueError( + 'eval.num_workers > 1 does not yet support parallel export for ' + f'{joined_flags}' + ) + + +def _is_cuda_device(device: Any) -> bool: + return str(device).lower().startswith('cuda') + + +def _resolve_cuda_devices(eval_cfg: DictConfig) -> list[int]: + if not _is_cuda_device(eval_cfg.get('device', 'cpu')): + return [] + + configured_devices = eval_cfg.get('cuda_devices', None) + if configured_devices is None: + return [0] + + resolved_devices = [int(device_index) for device_index in configured_devices] + if not resolved_devices: + raise ValueError('eval.cuda_devices must not be empty when eval.device is CUDA') + if any(device_index < 0 for device_index in resolved_devices): + raise ValueError('eval.cuda_devices must contain non-negative logical CUDA device indices') + return resolved_devices + + +def _run_spawn_jobs( + payloads: list[dict[str, Any]], + max_workers: int, + worker_fn, +) -> list[Any]: + if not payloads: + return [] + + ctx = multiprocessing.get_context('spawn') + results = [] + with concurrent.futures.ProcessPoolExecutor( + max_workers=int(max_workers), + mp_context=ctx, + ) as executor: + future_to_payload = { + executor.submit(worker_fn, payload): payload + for payload in payloads + } + try: + for future in concurrent.futures.as_completed(future_to_payload): + results.append(future.result()) + except Exception: + for future in future_to_payload: + future.cancel() + raise + return results + + +def _run_eval_worker_entry(payload: dict[str, Any]) -> dict[str, Any]: + if payload.get('_spawn_probe', False): + return { + 'probe_value': int(payload['probe_value']), + 'worker_index': int(payload.get('worker_index', -1)), + } + + cfg = OmegaConf.create(payload['cfg']) + artifact_paths = _resolve_artifact_paths( + cfg.eval, + output_dir_override=payload.get('artifact_dir'), + ) + return _run_eval_worker( + cfg, + episode_plans=list(payload.get('episode_plans', [])), + worker_index=int(payload.get('worker_index', 0)), + artifact_paths=artifact_paths, + show_progress=False, + ) + + +def _build_parallel_worker_payloads( + cfg: DictConfig, + artifact_paths: dict[str, Optional[str]], +) -> tuple[list[dict[str, Any]], int]: + eval_cfg = cfg.eval + requested_workers = int(eval_cfg.get('num_workers', 1)) + episode_splits = _split_episode_indices( + num_episodes=int(eval_cfg.num_episodes), + num_workers=requested_workers, + ) + box_poses = _plan_episode_box_poses(int(eval_cfg.num_episodes)) + resolved_cfg = OmegaConf.to_container(cfg, resolve=True) + payloads = [] + workers_dir = None + if artifact_paths.get('output_dir') is not None: + workers_dir = Path(str(artifact_paths['output_dir'])) / 'workers' + workers_dir.mkdir(parents=True, exist_ok=True) + artifact_paths['workers_dir'] = 
str(workers_dir) + + for worker_index, episode_indices in enumerate(episode_splits): + worker_artifact_dir = None + if workers_dir is not None: + worker_artifact_dir = workers_dir / f'worker_{worker_index:02d}' + worker_artifact_dir.mkdir(parents=True, exist_ok=True) + + worker_cfg = json.loads(json.dumps(resolved_cfg)) + worker_cfg['eval']['artifact_dir'] = ( + str(worker_artifact_dir) if worker_artifact_dir is not None else None + ) + payloads.append({ + 'cfg': worker_cfg, + 'worker_index': int(worker_index), + 'artifact_dir': str(worker_artifact_dir) if worker_artifact_dir is not None else None, + 'episode_plans': [ + { + 'episode_index': int(episode_index), + 'box_pos': np.asarray(box_poses[episode_index], dtype=np.float32).tolist(), + } + for episode_index in episode_indices + ], + }) + + return payloads, len(episode_splits) + + +def _build_cuda_server_payloads( + cfg: DictConfig, + worker_payloads: list[dict[str, Any]], + cuda_devices: list[int], +) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]: + resolved_cfg = OmegaConf.to_container(cfg, resolve=True) + server_payloads = [ + { + 'cfg': json.loads(json.dumps(resolved_cfg)), + 'server_index': int(server_index), + 'device_index': int(device_index), + 'worker_indices': [], + } + for server_index, device_index in enumerate(cuda_devices) + ] + + assigned_workers = [] + for worker_payload in worker_payloads: + server_index = int(worker_payload['worker_index']) % len(server_payloads) + assigned_payload = dict(worker_payload) + assigned_payload['server_index'] = server_index + assigned_payload['device_index'] = int(server_payloads[server_index]['device_index']) + assigned_workers.append(assigned_payload) + server_payloads[server_index]['worker_indices'].append(int(worker_payload['worker_index'])) + + return server_payloads, assigned_workers + + +def _inference_server_main(payload: dict[str, Any]) -> None: + request_queue = payload['request_queue'] + response_queues = payload['response_queues'] + server_index = int(payload.get('server_index', -1)) + + if payload.get('_spawn_probe', False): + while True: + message = request_queue.get() + if message.get('type') == 'shutdown_server': + return + if message.get('type') != 'predict_chunk': + continue + response_queues[int(message['worker_index'])].put({ + 'type': 'predict_chunk_result', + 'server_index': server_index, + 'actions': np.asarray([[[11.0], [22.0], [33.0]]], dtype=np.float32), + }) + return + + result_queue = payload.get('result_queue') + cfg = OmegaConf.create(payload['cfg']) + device = f'cuda:{int(payload["device_index"])}' + try: + agent, _dataset_stats = load_checkpoint( + ckpt_path=cfg.eval.ckpt_path, + agent_cfg=cfg.agent, + device=device, + ) + if result_queue is not None: + result_queue.put({ + 'kind': 'server_ready', + 'server_index': server_index, + }) + while True: + message = request_queue.get() + message_type = message.get('type') + if message_type == 'shutdown_server': + return + if message_type != 'predict_chunk': + raise ValueError(f'unknown server message type: {message_type}') + + batch = _deserialize_policy_batch(message['batch'], device=device) + with torch.inference_mode(): + actions = agent.predict_action_chunk(batch) + if torch.cuda.is_available(): + torch.cuda.synchronize() + response_queues[int(message['worker_index'])].put({ + 'type': 'predict_chunk_result', + 'server_index': server_index, + 'actions': actions.detach().cpu().numpy().astype(np.float32, copy=True), + }) + except Exception as exc: + if result_queue is not None: + result_queue.put({ + 
'kind': 'server_error', + 'server_index': server_index, + 'error': str(exc), + }) + raise + + +def _env_worker_main(payload: dict[str, Any]) -> None: + worker_index = int(payload.get('worker_index', -1)) + server_index = int(payload.get('server_index', -1)) + request_queue = payload['request_queue'] + response_queue = payload['response_queue'] + result_queue = payload.get('result_queue') + + if payload.get('_spawn_probe', False): + request_queue.put({ + 'type': 'predict_chunk', + 'worker_index': worker_index, + 'server_index': server_index, + 'episode_index': 0, + 'timestep': 0, + 'batch': { + 'qpos': np.zeros((1, 1, 1), dtype=np.float32), + 'images': {}, + }, + }) + response = response_queue.get(timeout=10.0) + result_queue.put({ + 'kind': 'worker_result', + 'worker_index': worker_index, + 'summary': { + 'probe_worker_index': worker_index, + 'probe_server_index': server_index, + 'probe_actions': np.asarray(response['actions']).tolist(), + }, + }) + return + + cfg = OmegaConf.create(payload['cfg']) + artifact_paths = _resolve_artifact_paths( + cfg.eval, + output_dir_override=payload.get('artifact_dir'), + ) + + try: + summary = _run_remote_eval_worker( + cfg, + episode_plans=list(payload.get('episode_plans', [])), + worker_index=worker_index, + server_index=server_index, + request_queue=request_queue, + response_queue=response_queue, + artifact_paths=artifact_paths, + show_progress=False, + ) + except Exception as exc: + if result_queue is not None: + result_queue.put({ + 'kind': 'worker_error', + 'worker_index': worker_index, + 'server_index': server_index, + 'error': str(exc), + }) + raise + + if result_queue is not None: + result_queue.put({ + 'kind': 'worker_result', + 'worker_index': worker_index, + 'server_index': server_index, + 'summary': summary, + }) + + +def _shutdown_processes(processes: list[tuple[Any, dict[str, Any]]], *, terminate: bool = False) -> None: + for process, _payload in processes: + if terminate and process.is_alive(): + process.terminate() + for process, _payload in processes: + process.join(timeout=5.0) + if process.is_alive(): + process.terminate() + process.join(timeout=5.0) + + +def _run_cuda_parallel_processes( + server_payloads: list[dict[str, Any]], + worker_payloads: list[dict[str, Any]], +) -> list[dict[str, Any]]: + if not worker_payloads: + return [] + + ctx = multiprocessing.get_context('spawn') + result_queue = ctx.Queue() + response_queues = [ctx.Queue() for _ in range(len(worker_payloads))] + request_queues = {} + server_processes: list[tuple[Any, dict[str, Any]]] = [] + worker_processes: list[tuple[Any, dict[str, Any]]] = [] + should_terminate = False + startup_timeout_s = float( + server_payloads[0]['cfg']['eval'].get('server_startup_timeout_s', 300.0) + ) if server_payloads else 300.0 + + try: + for server_payload in server_payloads: + request_queue = ctx.Queue() + request_queues[int(server_payload['server_index'])] = request_queue + process_payload = dict(server_payload) + process_payload['request_queue'] = request_queue + process_payload['response_queues'] = response_queues + process_payload['result_queue'] = result_queue + process = ctx.Process( + target=_inference_server_main, + args=(process_payload,), + name=f'eval-inference-server-{int(server_payload["server_index"]):02d}', + ) + process.start() + server_processes.append((process, server_payload)) + + pending_servers = {int(payload['server_index']) for _process, payload in server_processes} + startup_deadline = time.monotonic() + startup_timeout_s + while pending_servers: + remaining = 
startup_deadline - time.monotonic() + if remaining <= 0: + raise RuntimeError( + 'Timed out waiting for CUDA inference servers to become ready' + ) + try: + message = result_queue.get(timeout=min(0.2, remaining)) + except queue.Empty: + for process, payload in server_processes: + if process.exitcode not in (None, 0): + raise RuntimeError( + f'CUDA inference server {int(payload["server_index"])} exited with code ' + f'{process.exitcode}' + ) + continue + + message_kind = message.get('kind') + if message_kind == 'server_ready': + pending_servers.discard(int(message['server_index'])) + continue + if message_kind == 'server_error': + raise RuntimeError( + f'CUDA inference server {int(message["server_index"])} failed: {message.get("error")}' + ) + raise RuntimeError(f'Unexpected CUDA startup message: {message_kind}') + + for worker_payload in worker_payloads: + worker_index = int(worker_payload['worker_index']) + process_payload = dict(worker_payload) + process_payload['request_queue'] = request_queues[int(worker_payload['server_index'])] + process_payload['response_queue'] = response_queues[worker_index] + process_payload['result_queue'] = result_queue + process = ctx.Process( + target=_env_worker_main, + args=(process_payload,), + name=f'eval-env-worker-{worker_index:02d}', + ) + process.start() + worker_processes.append((process, worker_payload)) + + pending_workers = {int(payload['worker_index']) for payload in worker_payloads} + worker_summaries = {} + while pending_workers: + try: + message = result_queue.get(timeout=0.2) + except queue.Empty: + for process, payload in server_processes: + if process.exitcode not in (None, 0): + raise RuntimeError( + f'CUDA inference server {int(payload["server_index"])} exited with code ' + f'{process.exitcode}' + ) + for process, payload in worker_processes: + worker_index = int(payload['worker_index']) + if worker_index in pending_workers and process.exitcode not in (None, 0): + raise RuntimeError( + f'CUDA rollout worker {worker_index} exited with code {process.exitcode}' + ) + continue + + message_kind = message.get('kind') + if message_kind == 'worker_result': + worker_index = int(message['worker_index']) + worker_summaries[worker_index] = message['summary'] + pending_workers.discard(worker_index) + continue + if message_kind == 'worker_error': + raise RuntimeError( + f'CUDA rollout worker {int(message["worker_index"])} failed: {message.get("error")}' + ) + if message_kind == 'server_error': + raise RuntimeError( + f'CUDA inference server {int(message["server_index"])} failed: {message.get("error")}' + ) + raise RuntimeError(f'Unexpected CUDA parallel message: {message_kind}') + + return [ + worker_summaries[worker_index] + for worker_index in sorted(worker_summaries) + ] + except Exception: + should_terminate = True + raise + finally: + for request_queue in request_queues.values(): + try: + request_queue.put({'type': 'shutdown_server'}) + except Exception: + continue + _shutdown_processes(worker_processes, terminate=should_terminate) + _shutdown_processes(server_processes, terminate=should_terminate) + + +def _run_eval_parallel(cfg: DictConfig): + eval_cfg = cfg.eval + _configure_headless_mujoco_gl(eval_cfg) + _validate_parallel_eval_cfg(eval_cfg) + if _is_cuda_device(eval_cfg.get('device', 'cpu')): + _resolve_cuda_devices(eval_cfg) + return _run_eval_parallel_cuda(cfg) + return _run_eval_parallel_cpu(cfg) + + +def _run_eval_parallel_cpu(cfg: DictConfig): + eval_cfg = cfg.eval + artifact_paths = _resolve_artifact_paths(eval_cfg) + payloads, 
+
+    try:
+        worker_summaries = _run_spawn_jobs(
+            payloads=payloads,
+            max_workers=max(active_workers, 1),
+            worker_fn=_run_eval_worker_entry,
+        )
+    except Exception as exc:
+        raise RuntimeError(f'Parallel rollout worker failed: {exc}') from exc
+
+    summary = _merge_worker_summaries(worker_summaries, artifact_paths)
+    public_summary = _public_summary(summary)
+    if artifact_paths.get('summary_json') is not None:
+        _save_summary_json(artifact_paths['summary_json'], public_summary)
+    if artifact_paths.get('timing_json') is not None:
+        _save_summary_json(artifact_paths['timing_json'], public_summary.get('timing_summary', {}))
+    return public_summary
+
+
+def _run_eval_parallel_cuda(cfg: DictConfig):
+    eval_cfg = cfg.eval
+    _validate_parallel_eval_cfg(eval_cfg)
+    artifact_paths = _resolve_artifact_paths(eval_cfg)
+    worker_payloads, _active_workers = _build_parallel_worker_payloads(cfg, artifact_paths)
+    cuda_devices = _resolve_cuda_devices(eval_cfg)
+    server_payloads, assigned_worker_payloads = _build_cuda_server_payloads(
+        cfg,
+        worker_payloads=worker_payloads,
+        cuda_devices=cuda_devices,
+    )
+
+    try:
+        worker_summaries = _run_cuda_parallel_processes(
+            server_payloads=server_payloads,
+            worker_payloads=assigned_worker_payloads,
+        )
+    except Exception as exc:
+        raise RuntimeError(f'Parallel CUDA rollout failed: {exc}') from exc
+
+    summary = _merge_worker_summaries(worker_summaries, artifact_paths)
+    public_summary = _public_summary(summary)
+    if artifact_paths.get('summary_json') is not None:
+        _save_summary_json(artifact_paths['summary_json'], public_summary)
+    if artifact_paths.get('timing_json') is not None:
+        _save_summary_json(artifact_paths['timing_json'], public_summary.get('timing_summary', {}))
+    return public_summary
+
+
+def _run_eval(cfg: DictConfig):
+    """
+    Simplified VLA evaluation that relies on the agent's built-in queue management.
+
+    All evaluation parameters come from vla/conf/eval.yaml and are merged into cfg.
+    Command-line overrides: python eval_vla.py eval.ckpt_path=... eval.num_episodes=5
+    """
+
+    _print_eval_config(cfg)
+    requested_workers = int(cfg.eval.get('num_workers', 1))
+    active_workers = _normalize_num_workers(
+        num_workers=requested_workers,
+        num_episodes=int(cfg.eval.get('num_episodes', 0)),
+    )
+    if active_workers <= 1:
+        # Parallelism cannot help with <=1 worker or episode; stay on the serial path.
+        return _run_eval_serial(cfg)
+    return _run_eval_parallel(cfg)
+
 
 @hydra.main(version_base=None, config_path="../../vla/conf", config_name="config")
 def main(cfg: DictConfig):
     return _run_eval(cfg)
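
For reference, the parallel path can also be exercised standalone through this entry point. A hypothetical invocation (the checkpoint path and device list are placeholders; the override keys are the ones added to `eval/eval.yaml` further down) might look like:

```bash
# 10 episodes split across 4 env workers, served by inference
# servers on logical GPUs 0 and 1; parallel eval requires headless=true.
python roboimi/demos/vla_scripts/eval_vla.py \
    eval.ckpt_path=checkpoints/vla_model_best.pt \
    eval.num_episodes=10 \
    eval.num_workers=4 \
    eval.device=cuda \
    'eval.cuda_devices=[0,1]' \
    eval.headless=true
```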
diff --git a/roboimi/demos/vla_scripts/train_vla.py b/roboimi/demos/vla_scripts/train_vla.py
index d4e3842..bace221 100644
--- a/roboimi/demos/vla_scripts/train_vla.py
+++ b/roboimi/demos/vla_scripts/train_vla.py
@@ -838,10 +838,28 @@ def _run_training(cfg: DictConfig):
         from roboimi.demos.vla_scripts import eval_vla
 
         rollout_cfg = OmegaConf.create(OmegaConf.to_container(cfg, resolve=False))
+        rollout_num_episodes = int(cfg.train.get('rollout_num_episodes', 1))
+        rollout_device = str(cfg.train.get('rollout_device', cfg.train.device))
+        configured_rollout_workers = cfg.train.get('rollout_num_workers', None)
+        if configured_rollout_workers is None:
+            if rollout_device.startswith('cuda'):
+                rollout_num_workers = min(max(rollout_num_episodes, 1), 8)
+            else:
+                rollout_num_workers = 1
+        else:
+            rollout_num_workers = int(configured_rollout_workers)
         rollout_cfg.eval.ckpt_path = str(checkpoint_path)
-        rollout_cfg.eval.num_episodes = int(cfg.train.get('rollout_num_episodes', 1))
+        rollout_cfg.eval.num_episodes = rollout_num_episodes
+        rollout_cfg.eval.num_workers = rollout_num_workers
         rollout_cfg.eval.headless = True
-        rollout_cfg.eval.device = 'cpu'
+        rollout_cfg.eval.device = rollout_device
+        rollout_cfg.eval.cuda_devices = cfg.train.get('rollout_cuda_devices', None)
+        rollout_cfg.eval.response_timeout_s = float(
+            cfg.train.get('rollout_response_timeout_s', 300.0)
+        )
+        rollout_cfg.eval.server_startup_timeout_s = float(
+            cfg.train.get('rollout_server_startup_timeout_s', 300.0)
+        )
         rollout_cfg.eval.verbose_action = False
         rollout_cfg.eval.record_video = False
         rollout_cfg.eval.save_trajectory_image = True
@@ -852,9 +870,11 @@ def _run_training(cfg: DictConfig):
         )
 
         log.info(
-            "🎯 Starting checkpoint rollout validation: %s (episodes=%s, headless=True)",
+            "🎯 Starting checkpoint rollout validation: %s (episodes=%s, device=%s, workers=%s, headless=True)",
             checkpoint_path,
             rollout_cfg.eval.num_episodes,
+            rollout_cfg.eval.device,
+            rollout_cfg.eval.num_workers,
         )
         return eval_vla._run_eval(rollout_cfg)
diff --git a/roboimi/vla/conf/config.yaml b/roboimi/vla/conf/config.yaml
index 05818a0..ab9e1ab 100644
--- a/roboimi/vla/conf/config.yaml
+++ b/roboimi/vla/conf/config.yaml
@@ -31,6 +31,11 @@ train:
   rollout_val_freq_epochs: 50 # run rollout validation every this many epochs
   rollout_validate_on_checkpoint: false # whether to run rollout validation right after saving a checkpoint
   rollout_num_episodes: 3 # number of rollout validation episodes
+  rollout_device: ${train.device} # device used for rollout; defaults to the training device
+  rollout_num_workers: null # parallel rollout workers; null auto-infers on CUDA and keeps 1 on CPU
+  rollout_cuda_devices: null # logical CUDA device list for parallel rollout; null defaults to [0]
+  rollout_response_timeout_s: 300.0 # timeout (s) for a rollout worker awaiting an inference-server response
+  rollout_server_startup_timeout_s: 300.0 # timeout (s) for waiting until inference servers are ready
 
   # learning-rate scheduler (with warmup)
   warmup_steps: 2000 # warmup steps (longer is recommended for Transformers)
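
With the keys above, the in-training rollout is fully config-driven: leaving `train.rollout_num_workers` at `null` auto-infers `min(rollout_num_episodes, 8)` workers when the rollout device is CUDA and keeps a single worker on CPU. A sketch of a launch that pins everything explicitly (values are illustrative, not a recorded run):

```bash
# Hypothetical: GPU-parallel rollout validation every 5 epochs,
# 10 episodes across 4 workers on logical GPUs 0 and 1.
python roboimi/demos/vla_scripts/train_vla.py \
    train.rollout_val_freq_epochs=5 \
    train.rollout_num_episodes=10 \
    train.rollout_device=cuda \
    train.rollout_num_workers=4 \
    'train.rollout_cuda_devices=[0,1]' \
    train.rollout_response_timeout_s=300.0 \
    train.rollout_server_startup_timeout_s=300.0
```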
diff --git a/roboimi/vla/conf/eval/eval.yaml b/roboimi/vla/conf/eval/eval.yaml
index 06b47e3..b2e38a2 100644
--- a/roboimi/vla/conf/eval/eval.yaml
+++ b/roboimi/vla/conf/eval/eval.yaml
@@ -2,6 +2,10 @@
 # evaluation settings
 ckpt_path: "checkpoints/vla_model_best.pt" # model checkpoint path
 num_episodes: 3 # number of evaluation episodes
+num_workers: 1 # number of parallel workers; 1 keeps single-process evaluation
+cuda_devices: null # logical device list for parallel CUDA evaluation; null means default [0]
+response_timeout_s: 300.0 # timeout (s) for a worker awaiting an inference-server response
+server_startup_timeout_s: 300.0 # timeout (s) for the parent awaiting inference-server readiness
 max_timesteps: 700 # max timesteps per episode
 device: ${train.device} # keep consistent with training
 task_name: "sim_transfer" # environment task name
diff --git a/tests/test_eval_vla_execution.py b/tests/test_eval_vla_execution.py
index 6a468ac..f9d7a44 100644
--- a/tests/test_eval_vla_execution.py
+++ b/tests/test_eval_vla_execution.py
@@ -1,5 +1,11 @@
 import unittest
+from unittest import mock
+
+import numpy as np
+import torch
+from omegaconf import OmegaConf
+
+from roboimi.demos.vla_scripts import eval_vla
 
 from roboimi.vla.eval_utils import execute_policy_action
 
@@ -14,6 +20,48 @@ class _FakeEnv:
         self.calls.append(("step_jnt", action))
 
 
+class _FakeQueue:
+    def __init__(self, initial_items=None):
+        self.items = list(initial_items or [])
+        self.put_calls = []
+
+    def put(self, item):
+        self.put_calls.append(item)
+        self.items.append(item)
+
+    def get(self, timeout=None):
+        del timeout
+        if not self.items:
+            raise AssertionError("queue unexpectedly empty")
+        return self.items.pop(0)
+
+
+def _make_parallel_cfg(**eval_overrides):
+    eval_cfg = {
+        "ckpt_path": "checkpoints/vla_model_best.pt",
+        "num_episodes": 5,
+        "num_workers": 2,
+        "max_timesteps": 1,
+        "device": "cpu",
+        "task_name": "sim_transfer",
+        "camera_names": ["front"],
+        "use_smoothing": False,
+        "smooth_alpha": 0.3,
+        "verbose_action": False,
+        "headless": True,
+        "artifact_dir": None,
+        "save_artifacts": False,
+        "save_summary_json": False,
+        "save_timing": False,
+        "save_trajectory": False,
+        "save_trajectory_npz": False,
+        "record_video": False,
+        "save_trajectory_image": False,
+    }
+    eval_cfg.update(eval_overrides)
+    return OmegaConf.create({"agent": {}, "eval": eval_cfg})
+
+
 class EvalVLAExecutionTest(unittest.TestCase):
     def test_execute_policy_action_uses_ee_step(self):
         env = _FakeEnv()
@@ -23,6 +71,662 @@ class EvalVLAExecutionTest(unittest.TestCase):
 
         self.assertEqual(env.calls, [("step", action)])
 
+    def test_split_episode_indices_balances_workers(self):
+        self.assertEqual(
+            eval_vla._split_episode_indices(num_episodes=10, num_workers=3),
+            [[0, 1, 2, 3], [4, 5, 6], [7, 8, 9]],
+        )
+
+    def test_normalize_num_workers_caps_worker_count_to_episode_count(self):
+        self.assertEqual(eval_vla._normalize_num_workers(num_workers=5, num_episodes=2), 2)
+
+    def test_plan_episode_box_poses_uses_global_episode_order(self):
+        planned_poses = [
+            np.array([0.1, 0.2, 0.3], dtype=np.float32),
+            np.array([1.1, 1.2, 1.3], dtype=np.float32),
+            np.array([2.1, 2.2, 2.3], dtype=np.float32),
+        ]
+        sampler = mock.Mock(side_effect=planned_poses)
+
+        result = eval_vla._plan_episode_box_poses(num_episodes=3, sampler=sampler)
+
+        self.assertEqual(sampler.call_count, 3)
+        self.assertEqual(len(result), 3)
+        for expected, actual in zip(planned_poses, result):
+            np.testing.assert_array_equal(actual, expected)
+
+    def test_resolve_policy_camera_names_matches_vlaagent_fallback_sorting(self):
+        cfg = OmegaConf.create(
+            {
+                "agent": {
+                    "_target_": "roboimi.vla.agent.VLAAgent",
+                },
+                "eval": {
+                    "camera_names": ["r_vis", "top", "front"],
+                },
+            }
+        )
+
+        self.assertEqual(
+            eval_vla._resolve_policy_camera_names(cfg),
+            ["front", "r_vis", "top"],
+        )
+
+    def test_resolve_policy_camera_names_matches_gr00t_fallback_input_order(self):
+        cfg = OmegaConf.create(
+            {
+                "agent": {
+                    "_target_": "roboimi.vla.agent_gr00t_dit.VLAAgentGr00tDiT",
+                },
+                "eval": {
+                    "camera_names": 
["r_vis", "top", "front"], + }, + } + ) + + self.assertEqual( + eval_vla._resolve_policy_camera_names(cfg), + ["r_vis", "top", "front"], + ) + + def test_build_episode_plans_without_box_poses_keeps_serial_sampling_lazy(self): + plans = eval_vla._build_episode_plans(num_episodes=3) + + self.assertEqual( + plans, + [ + {"episode_index": 0}, + {"episode_index": 1}, + {"episode_index": 2}, + ], + ) + + def test_prepare_local_policy_batch_pads_latest_observation_to_obs_horizon(self): + queues = eval_vla._new_local_policy_queues(obs_horizon=3) + observation = { + "qpos": torch.tensor([1.0, 2.0], dtype=torch.float32), + "images": { + "front": torch.tensor([[[1.0]]], dtype=torch.float32), + }, + } + + eval_vla._populate_local_policy_queues(queues, observation) + batch = eval_vla._prepare_local_policy_batch( + queues, + obs_horizon=3, + camera_names=["front"], + ) + + self.assertEqual(tuple(batch["qpos"].shape), (1, 3, 2)) + self.assertEqual(tuple(batch["images"]["front"].shape), (1, 3, 1, 1, 1)) + np.testing.assert_array_equal( + batch["qpos"][0].cpu().numpy(), + np.array([[1.0, 2.0], [1.0, 2.0], [1.0, 2.0]], dtype=np.float32), + ) + np.testing.assert_array_equal( + batch["images"]["front"][0].cpu().numpy(), + np.array([[[[1.0]]], [[[1.0]]], [[[1.0]]]], dtype=np.float32), + ) + + def test_enqueue_predicted_actions_uses_executable_slice(self): + queues = eval_vla._new_local_policy_queues(obs_horizon=2) + predicted_actions = torch.tensor( + [[[10.0], [20.0], [30.0], [40.0]]], + dtype=torch.float32, + ) + + eval_vla._enqueue_predicted_actions( + queues, + predicted_actions=predicted_actions, + obs_horizon=2, + num_action_steps=2, + ) + + self.assertEqual(len(queues["action"]), 2) + np.testing.assert_array_equal(queues["action"].popleft().numpy(), np.array([20.0], dtype=np.float32)) + np.testing.assert_array_equal(queues["action"].popleft().numpy(), np.array([30.0], dtype=np.float32)) + + def test_remote_policy_runner_only_requests_server_inference_when_local_action_queue_is_empty(self): + request_queue = _FakeQueue() + response_queue = _FakeQueue( + [ + { + "type": "predict_chunk_result", + "actions": np.asarray([[[10.0], [20.0], [30.0]]], dtype=np.float32), + } + ] + ) + runner = eval_vla._RemotePolicyRunner( + worker_index=3, + server_index=1, + request_queue=request_queue, + response_queue=response_queue, + camera_names=["front"], + obs_horizon=2, + num_action_steps=2, + ) + first_observation = { + "qpos": torch.tensor([1.0, 2.0], dtype=torch.float32), + "images": {"front": torch.tensor([[[1.0]]], dtype=torch.float32)}, + } + second_observation = { + "qpos": torch.tensor([3.0, 4.0], dtype=torch.float32), + "images": {"front": torch.tensor([[[2.0]]], dtype=torch.float32)}, + } + + first_action, first_forward = runner.select_action( + first_observation, + episode_index=7, + timestep=0, + ) + second_action, second_forward = runner.select_action( + second_observation, + episode_index=7, + timestep=1, + ) + + self.assertTrue(first_forward) + self.assertFalse(second_forward) + self.assertEqual(len(request_queue.put_calls), 1) + self.assertEqual(request_queue.put_calls[0]["type"], "predict_chunk") + self.assertEqual(request_queue.put_calls[0]["worker_index"], 3) + self.assertEqual(request_queue.put_calls[0]["server_index"], 1) + np.testing.assert_array_equal(first_action.numpy(), np.array([20.0], dtype=np.float32)) + np.testing.assert_array_equal(second_action.numpy(), np.array([30.0], dtype=np.float32)) + + def test_merge_worker_summaries_sorts_episodes_and_recomputes_aggregates(self): + 
worker_summaries = [ + { + "avg_inference_fps": 999.0, + "avg_control_fps": 999.0, + "avg_obs_read_time_ms": 999.0, + "avg_total_time_ms": 999.0, + "timing_summary": {"count": 999, "model_forward_count": 999}, + "episodes": [ + { + "episode_index": 2, + "episode_reward": 9.0, + "episode_max_reward": 4.0, + "inference_fps": 30.0, + "control_fps": 15.0, + } + ], + "_merge_state": { + "obs_read_time_ms": [9.0], + "preprocess_time_ms": [1.0], + "inference_time_ms": [3.0], + "env_step_time_ms": [4.0], + "total_time_ms": [10.0], + "model_forward_flags": [False], + }, + }, + { + "avg_inference_fps": 888.0, + "avg_control_fps": 888.0, + "avg_obs_read_time_ms": 888.0, + "avg_total_time_ms": 888.0, + "timing_summary": {"count": 888, "model_forward_count": 888}, + "episodes": [ + { + "episode_index": 1, + "episode_reward": 6.0, + "episode_max_reward": 3.0, + "inference_fps": 20.0, + "control_fps": 10.0, + }, + { + "episode_index": 0, + "episode_reward": 5.0, + "episode_max_reward": 2.0, + "inference_fps": 10.0, + "control_fps": 5.0, + }, + ], + "_merge_state": { + "obs_read_time_ms": [1.0, 2.0, 12.0], + "preprocess_time_ms": [2.0, 3.0, 4.0], + "inference_time_ms": [4.0, 5.0, 6.0], + "env_step_time_ms": [6.0, 7.0, 8.0], + "total_time_ms": [8.0, 9.0, 20.0], + "model_forward_flags": [True, False, True], + }, + }, + ] + artifact_paths = { + "output_dir": "/tmp/merged", + "summary_json": "/tmp/merged/rollout_summary.json", + "timing_json": "/tmp/merged/timing.json", + "trajectory_npz": None, + "video_mp4": None, + "video_camera_name": None, + } + + merged = eval_vla._merge_worker_summaries(worker_summaries, artifact_paths) + + self.assertEqual([episode["episode_index"] for episode in merged["episodes"]], [0, 1, 2]) + self.assertEqual(merged["episode_rewards"], [5.0, 6.0, 9.0]) + self.assertEqual(merged["episode_max_rewards"], [2.0, 3.0, 4.0]) + self.assertAlmostEqual(merged["avg_reward"], 20.0 / 3.0) + self.assertAlmostEqual(merged["avg_max_reward"], 3.0) + self.assertAlmostEqual(merged["avg_inference_fps"], 20.0) + self.assertAlmostEqual(merged["avg_control_fps"], 10.0) + self.assertAlmostEqual(merged["avg_obs_read_time_ms"], 6.0) + self.assertAlmostEqual(merged["avg_total_time_ms"], 47.0 / 4.0) + self.assertEqual(merged["timing_summary"]["count"], 4) + self.assertEqual(merged["timing_summary"]["model_forward_count"], 2) + self.assertEqual(merged["artifact_dir"], "/tmp/merged") + self.assertEqual(merged["artifacts"], artifact_paths) + + def test_build_cuda_server_payloads_uses_round_robin_worker_assignment(self): + cfg = _make_parallel_cfg(num_episodes=4, num_workers=4, device="cuda", cuda_devices=[0, 1]) + artifact_paths = {"output_dir": None} + + with mock.patch.object( + eval_vla, + "sample_transfer_pose", + side_effect=[ + np.array([0.1, 0.2, 0.3], dtype=np.float32), + np.array([0.4, 0.5, 0.6], dtype=np.float32), + np.array([0.7, 0.8, 0.9], dtype=np.float32), + np.array([1.0, 1.1, 1.2], dtype=np.float32), + ], + ): + worker_payloads, _ = eval_vla._build_parallel_worker_payloads(cfg, artifact_paths) + + server_payloads, assigned_workers = eval_vla._build_cuda_server_payloads( + cfg, + worker_payloads=worker_payloads, + cuda_devices=[0, 1], + ) + + self.assertEqual([payload["device_index"] for payload in server_payloads], [0, 1]) + self.assertEqual([payload["worker_index"] for payload in assigned_workers], [0, 1, 2, 3]) + self.assertEqual([payload["server_index"] for payload in assigned_workers], [0, 1, 0, 1]) + self.assertEqual(server_payloads[0]["worker_indices"], [0, 2]) + 
self.assertEqual(server_payloads[1]["worker_indices"], [1, 3]) + + def test_run_eval_parallel_dispatches_episode_splits_and_box_poses(self): + cfg = _make_parallel_cfg(num_episodes=5, num_workers=2, artifact_dir="/tmp/parallel-root") + planned_poses = [ + np.array([float(index), float(index) + 0.1, float(index) + 0.2], dtype=np.float32) + for index in range(5) + ] + observed_payloads = [] + + def fake_run_spawn_jobs(payloads, max_workers, worker_fn): + del worker_fn + self.assertEqual(max_workers, 2) + observed_payloads.extend(payloads) + return [ + { + "episodes": [ + { + "episode_index": 4, + "episode_reward": 5.0, + "episode_max_reward": 5.0, + "inference_fps": 50.0, + "control_fps": 25.0, + }, + { + "episode_index": 3, + "episode_reward": 4.0, + "episode_max_reward": 4.0, + "inference_fps": 40.0, + "control_fps": 20.0, + }, + ], + "_merge_state": { + "obs_read_time_ms": [4.0, 5.0], + "preprocess_time_ms": [1.0, 1.0], + "inference_time_ms": [2.0, 2.0], + "env_step_time_ms": [3.0, 3.0], + "total_time_ms": [4.0, 5.0], + "model_forward_flags": [True, True], + }, + }, + { + "episodes": [ + { + "episode_index": 2, + "episode_reward": 3.0, + "episode_max_reward": 3.0, + "inference_fps": 30.0, + "control_fps": 15.0, + }, + { + "episode_index": 1, + "episode_reward": 2.0, + "episode_max_reward": 2.0, + "inference_fps": 20.0, + "control_fps": 10.0, + }, + { + "episode_index": 0, + "episode_reward": 1.0, + "episode_max_reward": 1.0, + "inference_fps": 10.0, + "control_fps": 5.0, + }, + ], + "_merge_state": { + "obs_read_time_ms": [1.0, 2.0, 3.0], + "preprocess_time_ms": [1.0, 1.0, 1.0], + "inference_time_ms": [2.0, 2.0, 2.0], + "env_step_time_ms": [3.0, 3.0, 3.0], + "total_time_ms": [1.0, 2.0, 3.0], + "model_forward_flags": [False, True, False], + }, + }, + ] + + with mock.patch.object( + eval_vla, + "sample_transfer_pose", + side_effect=planned_poses, + ), mock.patch.object( + eval_vla, + "_run_spawn_jobs", + side_effect=fake_run_spawn_jobs, + ): + summary = eval_vla._run_eval_parallel(cfg) + + self.assertEqual(len(observed_payloads), 2) + self.assertEqual( + [[plan["episode_index"] for plan in payload["episode_plans"]] for payload in observed_payloads], + [[0, 1, 2], [3, 4]], + ) + for payload in observed_payloads: + for plan in payload["episode_plans"]: + np.testing.assert_array_equal( + np.asarray(plan["box_pos"], dtype=np.float32), + planned_poses[plan["episode_index"]], + ) + self.assertEqual([episode["episode_index"] for episode in summary["episodes"]], [0, 1, 2, 3, 4]) + self.assertEqual(summary["episode_rewards"], [1.0, 2.0, 3.0, 4.0, 5.0]) + self.assertEqual(summary["num_episodes"], 5) + + def test_run_eval_parallel_allows_trajectory_images_and_keeps_worker_artifact_paths(self): + cfg = _make_parallel_cfg( + num_episodes=2, + num_workers=2, + artifact_dir="/tmp/parallel-images", + save_summary_json=True, + save_trajectory_image=True, + ) + observed_payloads = [] + + def fake_run_spawn_jobs(payloads, max_workers, worker_fn): + del worker_fn + self.assertEqual(max_workers, 2) + observed_payloads.extend(payloads) + return [ + { + "episodes": [ + { + "episode_index": 0, + "episode_reward": 1.0, + "episode_max_reward": 1.0, + "inference_fps": 10.0, + "control_fps": 5.0, + "artifact_paths": { + "trajectory_image": f"{payloads[0]['artifact_dir']}/rollout_front_ep01_trajectory.png", + }, + }, + ], + "_merge_state": { + "obs_read_time_ms": [1.0], + "preprocess_time_ms": [1.0], + "inference_time_ms": [1.0], + "env_step_time_ms": [1.0], + "total_time_ms": [1.0], + "model_forward_flags": [True], + 
}, + }, + { + "episodes": [ + { + "episode_index": 1, + "episode_reward": 2.0, + "episode_max_reward": 2.0, + "inference_fps": 20.0, + "control_fps": 10.0, + "artifact_paths": { + "trajectory_image": f"{payloads[1]['artifact_dir']}/rollout_front_ep02_trajectory.png", + }, + }, + ], + "_merge_state": { + "obs_read_time_ms": [2.0], + "preprocess_time_ms": [2.0], + "inference_time_ms": [2.0], + "env_step_time_ms": [2.0], + "total_time_ms": [2.0], + "model_forward_flags": [False], + }, + }, + ] + + with mock.patch.object( + eval_vla, + "sample_transfer_pose", + side_effect=[ + np.array([0.1, 0.2, 0.3], dtype=np.float32), + np.array([0.4, 0.5, 0.6], dtype=np.float32), + ], + ), mock.patch.object( + eval_vla, + "_run_spawn_jobs", + side_effect=fake_run_spawn_jobs, + ): + summary = eval_vla._run_eval_parallel(cfg) + + self.assertEqual(len(observed_payloads), 2) + self.assertTrue(observed_payloads[0]["artifact_dir"].endswith("workers/worker_00")) + self.assertTrue(observed_payloads[1]["artifact_dir"].endswith("workers/worker_01")) + self.assertTrue( + summary["episodes"][0]["artifact_paths"]["trajectory_image"].endswith( + "workers/worker_00/rollout_front_ep01_trajectory.png" + ) + ) + self.assertTrue( + summary["episodes"][1]["artifact_paths"]["trajectory_image"].endswith( + "workers/worker_01/rollout_front_ep02_trajectory.png" + ) + ) + + def test_run_eval_parallel_surfaces_worker_failures(self): + cfg = _make_parallel_cfg(num_episodes=2, num_workers=2) + + with mock.patch.object( + eval_vla, + "sample_transfer_pose", + side_effect=[ + np.array([0.1, 0.2, 0.3], dtype=np.float32), + np.array([0.4, 0.5, 0.6], dtype=np.float32), + ], + ), mock.patch.object( + eval_vla, + "_run_spawn_jobs", + side_effect=RuntimeError("boom"), + ): + with self.assertRaisesRegex(RuntimeError, "Parallel rollout worker failed"): + eval_vla._run_eval_parallel(cfg) + + def test_run_eval_parallel_cuda_builds_server_payloads_and_merges_worker_results(self): + cfg = _make_parallel_cfg( + num_episodes=4, + num_workers=4, + device="cuda", + cuda_devices=[0], + artifact_dir="/tmp/cuda-root", + ) + observed_server_payloads = [] + observed_worker_payloads = [] + + def fake_run_cuda_parallel_processes(server_payloads, worker_payloads): + observed_server_payloads.extend(server_payloads) + observed_worker_payloads.extend(worker_payloads) + return [ + { + "episodes": [ + { + "episode_index": 2, + "episode_reward": 3.0, + "episode_max_reward": 3.0, + "inference_fps": 30.0, + "control_fps": 15.0, + }, + { + "episode_index": 0, + "episode_reward": 1.0, + "episode_max_reward": 1.0, + "inference_fps": 10.0, + "control_fps": 5.0, + }, + ], + "_merge_state": { + "obs_read_time_ms": [1.0, 2.0], + "preprocess_time_ms": [1.0, 1.0], + "inference_time_ms": [2.0, 2.0], + "env_step_time_ms": [3.0, 3.0], + "total_time_ms": [4.0, 4.0], + "model_forward_flags": [True, False], + }, + }, + { + "episodes": [ + { + "episode_index": 3, + "episode_reward": 4.0, + "episode_max_reward": 4.0, + "inference_fps": 40.0, + "control_fps": 20.0, + }, + { + "episode_index": 1, + "episode_reward": 2.0, + "episode_max_reward": 2.0, + "inference_fps": 20.0, + "control_fps": 10.0, + }, + ], + "_merge_state": { + "obs_read_time_ms": [3.0, 4.0], + "preprocess_time_ms": [1.0, 1.0], + "inference_time_ms": [2.0, 2.0], + "env_step_time_ms": [3.0, 3.0], + "total_time_ms": [4.0, 4.0], + "model_forward_flags": [True, True], + }, + }, + ] + + with mock.patch.object( + eval_vla, + "sample_transfer_pose", + side_effect=[ + np.array([0.1, 0.2, 0.3], dtype=np.float32), + 
np.array([0.4, 0.5, 0.6], dtype=np.float32), + np.array([0.7, 0.8, 0.9], dtype=np.float32), + np.array([1.0, 1.1, 1.2], dtype=np.float32), + ], + ), mock.patch.object( + eval_vla, + "_run_cuda_parallel_processes", + side_effect=fake_run_cuda_parallel_processes, + create=True, + ): + summary = eval_vla._run_eval_parallel_cuda(cfg) + + self.assertEqual(len(observed_server_payloads), 1) + self.assertEqual(observed_server_payloads[0]["device_index"], 0) + self.assertEqual(len(observed_worker_payloads), 4) + self.assertTrue(all(payload["server_index"] == 0 for payload in observed_worker_payloads)) + self.assertEqual([episode["episode_index"] for episode in summary["episodes"]], [0, 1, 2, 3]) + self.assertEqual(summary["episode_rewards"], [1.0, 2.0, 3.0, 4.0]) + self.assertEqual(summary["num_episodes"], 4) + + def test_run_eval_parallel_cuda_surfaces_server_failures(self): + cfg = _make_parallel_cfg(num_episodes=2, num_workers=2, device="cuda", cuda_devices=[0]) + + with mock.patch.object( + eval_vla, + "sample_transfer_pose", + side_effect=[ + np.array([0.1, 0.2, 0.3], dtype=np.float32), + np.array([0.4, 0.5, 0.6], dtype=np.float32), + ], + ), mock.patch.object( + eval_vla, + "_run_cuda_parallel_processes", + side_effect=RuntimeError("server boom"), + create=True, + ): + with self.assertRaisesRegex(RuntimeError, "Parallel CUDA rollout failed"): + eval_vla._run_eval_parallel_cuda(cfg) + + def test_run_spawn_jobs_supports_real_spawn_with_actual_eval_worker_entry(self): + payloads = [ + {"_spawn_probe": True, "probe_value": 1, "worker_index": 0}, + {"_spawn_probe": True, "probe_value": 2, "worker_index": 1}, + ] + + results = eval_vla._run_spawn_jobs( + payloads=payloads, + max_workers=2, + worker_fn=eval_vla._run_eval_worker_entry, + ) + + self.assertEqual(sorted(result["probe_value"] for result in results), [1, 2]) + self.assertEqual(sorted(result["worker_index"] for result in results), [0, 1]) + + def test_cuda_server_and_env_worker_entrypoints_support_real_spawn_probe(self): + ctx = eval_vla.multiprocessing.get_context("spawn") + request_queue = ctx.Queue() + response_queue = ctx.Queue() + result_queue = ctx.Queue() + + server = ctx.Process( + target=eval_vla._inference_server_main, + args=( + { + "_spawn_probe": True, + "server_index": 0, + "request_queue": request_queue, + "response_queues": [response_queue], + }, + ), + ) + worker = ctx.Process( + target=eval_vla._env_worker_main, + args=( + { + "_spawn_probe": True, + "worker_index": 0, + "server_index": 0, + "request_queue": request_queue, + "response_queue": response_queue, + "result_queue": result_queue, + }, + ), + ) + + server.start() + worker.start() + + result = result_queue.get(timeout=10.0) + + worker.join(timeout=10.0) + request_queue.put({"type": "shutdown_server"}) + server.join(timeout=10.0) + + self.assertEqual(result["kind"], "worker_result") + self.assertEqual(result["summary"]["probe_worker_index"], 0) + self.assertEqual(result["summary"]["probe_server_index"], 0) + self.assertEqual(result["summary"]["probe_actions"], [[[11.0], [22.0], [33.0]]]) + self.assertEqual(worker.exitcode, 0) + self.assertEqual(server.exitcode, 0) + if __name__ == "__main__": unittest.main() diff --git a/tests/test_eval_vla_headless.py b/tests/test_eval_vla_headless.py index d617855..b921f43 100644 --- a/tests/test_eval_vla_headless.py +++ b/tests/test_eval_vla_headless.py @@ -126,6 +126,26 @@ class EvalVLAHeadlessTest(unittest.TestCase): self.assertIn("headless", eval_cfg) self.assertFalse(eval_cfg.headless) + def 
test_eval_config_exposes_num_workers_default(self): + eval_cfg = OmegaConf.load(Path("roboimi/vla/conf/eval/eval.yaml")) + + self.assertIn("num_workers", eval_cfg) + self.assertEqual(eval_cfg.num_workers, 1) + + def test_eval_config_exposes_cuda_devices_default(self): + eval_cfg = OmegaConf.load(Path("roboimi/vla/conf/eval/eval.yaml")) + + self.assertIn("cuda_devices", eval_cfg) + self.assertIsNone(eval_cfg.cuda_devices) + + def test_eval_config_exposes_parallel_timeout_defaults(self): + eval_cfg = OmegaConf.load(Path("roboimi/vla/conf/eval/eval.yaml")) + + self.assertIn("response_timeout_s", eval_cfg) + self.assertIn("server_startup_timeout_s", eval_cfg) + self.assertEqual(eval_cfg.response_timeout_s, 300.0) + self.assertEqual(eval_cfg.server_startup_timeout_s, 300.0) + def test_make_sim_env_accepts_headless_and_disables_render(self): fake_env = object() @@ -327,6 +347,172 @@ class EvalVLAHeadlessTest(unittest.TestCase): self.assertAlmostEqual(summary["avg_reward"], 3.75) self.assertEqual(summary["num_episodes"], 2) + def test_run_eval_uses_serial_path_when_num_workers_is_one(self): + cfg = OmegaConf.create( + { + "eval": { + "num_workers": 1, + "num_episodes": 3, + } + } + ) + + with mock.patch.object( + eval_vla, + "_run_eval_serial", + return_value={"mode": "serial"}, + ) as run_eval_serial, mock.patch.object( + eval_vla, + "_run_eval_parallel", + ) as run_eval_parallel: + result = eval_vla._run_eval(cfg) + + self.assertEqual(result, {"mode": "serial"}) + run_eval_serial.assert_called_once_with(cfg) + run_eval_parallel.assert_not_called() + + def test_run_eval_uses_serial_path_when_requested_workers_collapse_to_one(self): + cfg = OmegaConf.create( + { + "eval": { + "num_workers": 8, + "num_episodes": 1, + } + } + ) + + with mock.patch.object( + eval_vla, + "_run_eval_serial", + return_value={"mode": "serial"}, + ) as run_eval_serial, mock.patch.object( + eval_vla, + "_run_eval_parallel", + ) as run_eval_parallel: + result = eval_vla._run_eval(cfg) + + self.assertEqual(result, {"mode": "serial"}) + run_eval_serial.assert_called_once_with(cfg) + run_eval_parallel.assert_not_called() + + def test_run_eval_parallel_requires_headless_true(self): + cfg = OmegaConf.create( + { + "agent": {}, + "eval": { + "ckpt_path": "checkpoints/vla_model_best.pt", + "num_episodes": 2, + "num_workers": 2, + "max_timesteps": 1, + "device": "cpu", + "task_name": "sim_transfer", + "camera_names": ["front"], + "use_smoothing": False, + "smooth_alpha": 0.3, + "verbose_action": False, + "headless": False, + }, + } + ) + + with self.assertRaisesRegex(ValueError, "headless=true"): + eval_vla._run_eval_parallel(cfg) + + def test_run_eval_parallel_dispatches_to_cpu_workers_when_device_is_cpu(self): + cfg = OmegaConf.create( + { + "agent": {}, + "eval": { + "ckpt_path": "checkpoints/vla_model_best.pt", + "num_episodes": 2, + "num_workers": 2, + "max_timesteps": 1, + "device": "cpu", + "task_name": "sim_transfer", + "camera_names": ["front"], + "use_smoothing": False, + "smooth_alpha": 0.3, + "verbose_action": False, + "headless": True, + "cuda_devices": None, + }, + } + ) + + with mock.patch.object( + eval_vla, + "_run_eval_parallel_cpu", + return_value={"mode": "cpu"}, + create=True, + ) as run_cpu_parallel, mock.patch.object( + eval_vla, + "_run_eval_parallel_cuda", + create=True, + ) as run_cuda_parallel: + result = eval_vla._run_eval_parallel(cfg) + + self.assertEqual(result, {"mode": "cpu"}) + run_cpu_parallel.assert_called_once_with(cfg) + run_cuda_parallel.assert_not_called() + + def 
test_run_eval_parallel_dispatches_to_cuda_servers_when_device_is_cuda(self): + cfg = OmegaConf.create( + { + "agent": {}, + "eval": { + "ckpt_path": "checkpoints/vla_model_best.pt", + "num_episodes": 2, + "num_workers": 2, + "max_timesteps": 1, + "device": "cuda", + "task_name": "sim_transfer", + "camera_names": ["front"], + "use_smoothing": False, + "smooth_alpha": 0.3, + "verbose_action": False, + "headless": True, + "cuda_devices": [0], + }, + } + ) + + with mock.patch.object( + eval_vla, + "_run_eval_parallel_cpu", + create=True, + ) as run_cpu_parallel, mock.patch.object( + eval_vla, + "_run_eval_parallel_cuda", + return_value={"mode": "cuda"}, + create=True, + ) as run_cuda_parallel: + result = eval_vla._run_eval_parallel(cfg) + + self.assertEqual(result, {"mode": "cuda"}) + run_cpu_parallel.assert_not_called() + run_cuda_parallel.assert_called_once_with(cfg) + + def test_resolve_cuda_devices_defaults_to_single_logical_gpu(self): + cfg = OmegaConf.create( + { + "device": "cuda", + "cuda_devices": None, + } + ) + + self.assertEqual(eval_vla._resolve_cuda_devices(cfg), [0]) + + def test_resolve_cuda_devices_rejects_empty_selection(self): + cfg = OmegaConf.create( + { + "device": "cuda", + "cuda_devices": [], + } + ) + + with self.assertRaisesRegex(ValueError, "cuda_devices"): + eval_vla._resolve_cuda_devices(cfg) + if __name__ == "__main__": unittest.main() diff --git a/tests/test_train_vla_rollout_validation.py b/tests/test_train_vla_rollout_validation.py index 1dbdf9e..78d3159 100644 --- a/tests/test_train_vla_rollout_validation.py +++ b/tests/test_train_vla_rollout_validation.py @@ -158,6 +158,106 @@ class TrainVLARolloutValidationTest(unittest.TestCase): self.assertGreater(float(cfg.train.lr), 5e-5) self.assertGreater(cfg.train.num_workers, 8) self.assertEqual(cfg.train.rollout_val_freq_epochs, 50) + self.assertEqual(cfg.train.rollout_device, cfg.train.device) + self.assertIsNone(cfg.train.rollout_num_workers) + self.assertIsNone(cfg.train.rollout_cuda_devices) + + def test_run_training_rollout_validation_propagates_gpu_parallel_settings(self): + cfg = OmegaConf.create( + { + 'train': { + 'device': 'cpu', + 'batch_size': 1, + 'num_workers': 0, + 'val_split': 0.0, + 'seed': 0, + 'lr': 1e-3, + 'max_steps': 2, + 'log_freq': 1, + 'save_freq': 1000, + 'warmup_steps': 1, + 'scheduler_type': 'constant', + 'min_lr': 0.0, + 'grad_clip': 1.0, + 'weight_decay': 0.0, + 'pretrained_ckpt': None, + 'resume_ckpt': None, + 'use_swanlab': False, + 'rollout_val_freq_epochs': 2, + 'rollout_num_episodes': 5, + 'rollout_device': 'cuda', + 'rollout_num_workers': 4, + 'rollout_cuda_devices': [0, 1], + 'rollout_response_timeout_s': 123.0, + 'rollout_server_startup_timeout_s': 456.0, + }, + 'data': { + 'camera_names': ['front'], + }, + 'agent': { + '_target_': 'fake.agent', + }, + 'eval': { + 'ckpt_path': 'unused.pt', + 'num_episodes': 99, + 'max_timesteps': 1, + 'device': 'cpu', + 'task_name': 'sim_transfer', + 'camera_names': ['front'], + 'use_smoothing': False, + 'smooth_alpha': 0.3, + 'verbose_action': False, + 'headless': False, + }, + } + ) + rollout_mock = mock.Mock(return_value={'avg_reward': 1.0}) + + def fake_instantiate(config_node, **_kwargs): + if config_node is cfg.data: + return _FakeDataset() + if config_node is cfg.agent: + return _FakeAgent() + raise AssertionError(f'unexpected instantiate config: {config_node!r}') + + def fake_dataloader(_dataset, *, shuffle, **_kwargs): + del shuffle, _kwargs + return _FakeLoader( + { + 'observation.front': torch.zeros(1, 3, 2, 2), + 'observation.state': 
torch.zeros(1, 4), + 'action': torch.zeros(1, 2), + 'action_is_pad': torch.zeros(1, 1, dtype=torch.bool), + }, + length=1, + ) + + with tempfile.TemporaryDirectory() as tempdir: + previous_cwd = os.getcwd() + try: + os.chdir(tempdir) + with mock.patch.object(train_vla, 'instantiate', side_effect=fake_instantiate), \ + mock.patch.object(train_vla, 'DataLoader', side_effect=fake_dataloader), \ + mock.patch.object(train_vla, 'build_training_optimizer', return_value=_FakeOptimizer(cfg.train.lr)), \ + mock.patch.object(train_vla, 'get_lr_schedule_with_warmup', return_value=_FakeScheduler()), \ + mock.patch.object(train_vla, 'tqdm', side_effect=lambda iterable, **kwargs: _FakeProgressBar(iterable)), \ + mock.patch.object(train_vla.torch, 'save', return_value=None), \ + mock.patch.object(eval_vla, '_run_eval', rollout_mock, create=True): + train_vla._run_training(cfg) + finally: + os.chdir(previous_cwd) + + rollout_cfg = rollout_mock.call_args.args[0] + self.assertEqual(rollout_cfg.eval.device, 'cuda') + self.assertEqual(rollout_cfg.eval.num_workers, 4) + self.assertEqual(list(rollout_cfg.eval.cuda_devices), [0, 1]) + self.assertEqual(float(rollout_cfg.eval.response_timeout_s), 123.0) + self.assertEqual(float(rollout_cfg.eval.server_startup_timeout_s), 456.0) + self.assertTrue(rollout_cfg.eval.headless) + self.assertEqual(rollout_cfg.eval.num_episodes, 5) + self.assertFalse(rollout_cfg.eval.record_video) + self.assertTrue(rollout_cfg.eval.save_summary_json) + self.assertTrue(rollout_cfg.eval.save_trajectory_image) def test_training_passes_backbone_image_resize_override_to_dataset_instantiation(self): cfg = OmegaConf.create(