chore: 删除多余文件

This commit is contained in:
gouhanke
2026-02-04 21:51:47 +08:00
parent 30b8ff4d7d
commit 8fce9c89ef
4 changed files with 0 additions and 283 deletions

View File

@@ -1 +0,0 @@
# 下载预训练 VLM 权重

View File

@@ -1,58 +0,0 @@
import hydra
import torch
from omegaconf import DictConfig, OmegaConf
from roboimi.vla.agent import VLAAgent
@hydra.main(version_base=None, config_path="../conf", config_name="config")
def main(cfg: DictConfig):
print(">>> Initializing VLA Agent (Skeleton Phase)...")
# For this test, we override the default agent with our debug config
# In a real run, this would be set via command line or defaults list
from hydra.utils import instantiate
# Instantiate the agent using the debug configuration
# Assuming 'agent' is a key in your root config.yaml that points to debug_vla
# If testing isolated, we instantiate the structure directly.
agent: VLAAgent = instantiate(cfg.agent)
print(f"✅ Agent assembled: {type(agent).__name__}")
print(f" - Backbone: {type(agent.backbone).__name__}")
print(f" - Projector: {type(agent.projector).__name__}")
print(f" - Head: {type(agent.head).__name__}")
# Mock Data
batch_size = 2
dummy_obs = {
'image': torch.randn(batch_size, 3, 224, 224),
'text': ["pick up apple"] * batch_size
}
dummy_actions = torch.randn(batch_size, 16, 7) # (B, Chunk, Act_Dim)
batch = {
'obs': dummy_obs,
'actions': dummy_actions
}
# Forward Pass
print("\n>>> Running Forward Pass...")
outputs = agent(batch)
loss = outputs['loss']
print(f"✅ Forward successful. Loss: {loss.item():.4f}")
# Backward Pass (Check Autograd Graph)
print("\n>>> Running Backward Pass...")
loss.backward()
# Verify gradients exist in the backbone (proving the chain is intact)
# Note: DebugBackbone needs a dummy parameter to show grad
backbone_has_grad = agent.backbone.dummy_param.grad is not None or \
any(p.grad is not None for p in agent.backbone.parameters())
if backbone_has_grad:
print("✅ Backward successful. Gradients reached Backbone.")
else:
print("❌ Warning: No gradients found in Backbone.")
if __name__ == "__main__":
main()

View File

@@ -1,135 +0,0 @@
import os
import cv2
import torch
import numpy as np
import argparse
from torch.utils.data import DataLoader
from roboimi.vla.data.dataset import VLAChunkedDataset
# 颜色常量 (BGR)
COLOR_TEXT = (255, 255, 255)
COLOR_VALID = (0, 255, 0) # 有效动作显示为绿色
COLOR_PAD = (0, 0, 255) # Padding 动作显示为红色
def render_text_block(canvas_width, text_lines):
"""创建一个显示文本信息的图像块"""
h_per_line = 30
h = len(text_lines) * h_per_line + 20
block = np.zeros((h, canvas_width, 3), dtype=np.uint8)
for i, line in enumerate(text_lines):
cv2.putText(block, line, (10, 30 + i * h_per_line),
cv2.FONT_HERSHEY_SIMPLEX, 0.6, COLOR_TEXT, 1)
return block
def visualize_dataset(data_path: str, output_dir: str):
os.makedirs(output_dir, exist_ok=True)
# 1. 实例化 Dataset (使用你最新的定义)
dataset = VLAChunkedDataset(
data_path=data_path,
pred_horizon=16, # 预测未来 16 步
obs_horizon=2, # 观察过去 2 帧
obs_keys=["top", "angle"] # 你的两个视角
)
# 使用 DataLoader 模拟训练时的读取行为
dataloader = DataLoader(dataset, batch_size=1, shuffle=False)
print(f"[VISUALIZE] 开始生成样本检查图至: {output_dir}")
print(f" - 数据总长: {len(dataset)}")
# 我们抽取开头几个,和末尾几个(检查 Mask 逻辑)
indices_to_check = list(range(0, 5)) + list(range(len(dataset)-5, len(dataset)))
for i, batch in enumerate(dataloader):
# 为了演示,只处理我们感兴趣的索引,或者随机抽取
# 这里为了简单,我们遍历前 10 个和最后 5 个
is_start = i < 5
is_end = i > (len(dataset) - 6)
if not (is_start or is_end):
continue
# --- 数据解包 ---
# Batch size = 1, 取 index 0
obs = batch['obs'] # Dict
qpos = batch['qpos'][0].numpy() # [State_Dim]
actions = batch['actions'][0].numpy() # [Pred_Horizon, Action_Dim]
mask = batch['action_mask'][0].numpy() # [Pred_Horizon]
lang = batch['language'][0] # String
# --- 1. 图像渲染 (obs) ---
# 逻辑:将不同视角的历史帧横向拼接,不同视角纵向拼接
view_blocks = []
for key in dataset.obs_keys:
# tensor: [1, T, C, H, W] -> [T, C, H, W]
imgs_tensor = obs[key][0]
T, C, H, W = imgs_tensor.shape
frame_list = []
for t in range(T):
# [C, H, W] -> [H, W, C] -> numpy
img_np = imgs_tensor[t].permute(1, 2, 0).numpy()
img_np = (img_np * 255).astype(np.uint8)
img_bgr = cv2.cvtColor(img_np, cv2.COLOR_RGB2BGR)
# 标记时间步 (t-1, t-0)
label = f"{key} (t - {T-1-t})"
cv2.putText(img_bgr, label, (10, 30),
cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2)
frame_list.append(img_bgr)
# 横向拼接历史帧
view_blocks.append(np.hstack(frame_list))
# 纵向拼接不同视角
visual_block = np.vstack(view_blocks)
H_vis, W_vis, _ = visual_block.shape
# --- 2. 文本信息渲染 (Language & QPos) ---
info_lines = [
f"Sample Index: {i} {'(TRAJECTORY END)' if is_end else ''}",
f"Language: {lang}",
f"Current QPos (First 6): {np.round(qpos[:6], 3)}"
]
info_block = render_text_block(W_vis, info_lines)
# --- 3. 动作块渲染 (Action Chunk & Mask) ---
# 我们创建一个专门的区域来显示 16 个动作的数值和有效性
action_lines = ["Future Action Chunk (Pred Horizon=16):"]
for t_act in range(len(actions)):
# 检查 Mask
is_valid = mask[t_act] > 0.5
status = "[VALID]" if is_valid else "[PAD] "
vals = np.round(actions[t_act][:6], 3) # 只显示前6维
line = f" t+{t_act:02d} {status} {vals}"
action_lines.append(line)
# 动态改变颜色有点复杂,这里用简单的文本块,但在上面画色条
action_block = render_text_block(W_vis, action_lines)
# 给 Action Block 加颜色标记
# 简单处理:如果是 PAD在文字左侧画红条VALID 画绿条
line_h = 30
start_y = 50 # 文本起始偏移
for t_act in range(len(actions)):
is_valid = mask[t_act] > 0.5
color = COLOR_VALID if is_valid else COLOR_PAD
# 画一个小矩形指示器
cv2.rectangle(action_block, (0, start_y + t_act*line_h - 20), (5, start_y + t_act*line_h - 5), color, -1)
# --- 4. 最终合成 ---
final_img = np.vstack([info_block, visual_block, action_block])
save_path = os.path.join(output_dir, f"check_{i:04d}.png")
cv2.imwrite(save_path, final_img)
print(f"\n[SUCCESS] 可视化完成。请重点检查 {output_dir} 中的最后几张图 (Mask 是否变红)。")
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--data", type=str, default="roboimi/demos/dataset/sim_transfer/episode_0.hdf5", help="数据路径")
parser.add_argument("--out", type=str, default="vla_debug_vis", help="输出目录")
args = parser.parse_args()
visualize_dataset(args.data, args.out)

View File

@@ -1,89 +0,0 @@
import h5py
import cv2
import numpy as np
import argparse
import os
from tqdm import tqdm
def visualize_episode(hdf5_path: str, output_path: str, fps: int = 30):
"""
将单个 episode_x.hdf5 转换为带有遥测数据叠加的可视化视频。
"""
if not os.path.exists(hdf5_path):
print(f"错误: 找不到文件 {hdf5_path}")
return
# 如果 output_path 是目录,则自动生成文件名
if os.path.isdir(output_path) or not output_path.endswith('.mp4'):
os.makedirs(output_path, exist_ok=True)
base_name = os.path.splitext(os.path.basename(hdf5_path))[0]
output_path = os.path.join(output_path, f"{base_name}.mp4")
else:
# 确保输出目录存在
output_dir = os.path.dirname(output_path)
if output_dir:
os.makedirs(output_dir, exist_ok=True)
with h5py.File(hdf5_path, 'r') as f:
# 获取基础数据
images_grp = f['observations/images']
qpos = f['observations/qpos'][:]
actions = f['action'][:]
# 获取视角列表
views = list(images_grp.keys()) # ['angle', 'r_vis', 'top']
num_steps = images_grp[views[0]].shape[0]
# 视频参数设置
# 我们将三个视角横向拼接: (H, W*3, 3)
h, w, _ = images_grp[views[0]][0].shape
out_w = w * len(views)
out_h = h + 150 # 底部留出 150 像素显示数据文字
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
video_writer = cv2.VideoWriter(output_path, fourcc, fps, (out_w, out_h))
print(f"正在处理 {num_steps} 帧数据...")
for t in tqdm(range(num_steps)):
# 1. 拼接视角图像
frame_views = []
for view_name in views:
img = images_grp[view_name][t]
# HDF5 通常存为 RGBOpenCV 需要 BGR
img_bgr = cv2.cvtColor(img, cv2.COLOR_RGB2BGR)
# 在图像左上角标记视角名称
cv2.putText(img_bgr, view_name, (20, 40),
cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
frame_views.append(img_bgr)
combined_img = np.hstack(frame_views)
# 2. 创建底部信息栏
info_bar = np.zeros((150, out_w, 3), dtype=np.uint8)
# 3. 渲染数据文字 (qpos 和 action)
# 我们展示前 7 维作为代表(通常是臂的 6 自由度 + 夹持器)
qpos_str = "qpos (0-6): " + " ".join([f"{x:.2f}" for x in qpos[t][:7]])
act_str = "action(0-6): " + " ".join([f"{x:.2f}" for x in actions[t][:7]])
cv2.putText(info_bar, qpos_str, (20, 50),
cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
cv2.putText(info_bar, act_str, (20, 100),
cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2)
cv2.putText(info_bar, f"Step: {t}/{num_steps}", (out_w - 200, 75),
cv2.FONT_HERSHEY_SIMPLEX, 0.7, (150, 150, 150), 2)
# 4. 合并图像与信息栏
final_frame = np.vstack([combined_img, info_bar])
video_writer.write(final_frame)
video_writer.release()
print(f"\n[SUCCESS] 可视化视频已保存至: {output_path}")
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="可视化单个 Episode HDF5 文件")
parser.add_argument("--input", type=str, required=True, help="输入 hdf5 路径")
parser.add_argument("--output", type=str, default="debug_episode.mp4", help="输出视频路径")
args = parser.parse_args()
visualize_episode(args.input, args.output)