chore: 添加测试文件

- check_all_episodes.py:检查各个episode是否有重复帧。
- check_specific_frames.py:检查前几帧是否位于正确初始位置。
- generate_dataset_videos.py:根据hdf5生成视频
This commit is contained in:
gouhanke
2026-02-26 13:59:47 +08:00
parent 3deeffb9fe
commit 40c40695dd
3 changed files with 617 additions and 0 deletions

324
generate_dataset_videos.py Normal file
View File

@@ -0,0 +1,324 @@
#!/usr/bin/env python3
"""
将 HDF5 数据集转换为视频,用于可视化检查
功能:
1. 将单个 episode 转换为视频
2. 对比多个 episode 的视频
3. 放慢播放速度便于观察
"""
import os
import h5py
import glob
import cv2
import numpy as np
def episode_to_video(episode_file, output_path, camera='top', fps=30, slow_factor=1):
"""
将单个 episode 转换为视频
Args:
episode_file: HDF5 文件路径
output_path: 输出视频路径
camera: 要使用的相机名称
fps: 帧率
slow_factor: 慢放倍数1=正常2=半速)
"""
try:
with h5py.File(episode_file, 'r') as f:
# 读取图像序列
img_path = f'/observations/images/{camera}'
if img_path not in f:
print(f" ❌ 相机 {camera} 不存在")
return False
images = f[img_path][:] # shape: (T, H, W, C)
qpos = f['/observations/qpos'][:]
actions = f['/action'][:]
total_frames = len(images)
height, width = images.shape[1], images.shape[2]
# 创建视频写入器
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
actual_fps = fps // slow_factor
out = cv2.VideoWriter(output_path, fourcc, actual_fps, (width, height))
# 逐帧写入
for i in range(total_frames):
frame = images[i].astype(np.uint8)
# 在图像上添加信息
info_text = [
f"Episode: {os.path.basename(episode_file).replace('.hdf5', '')}",
f"Frame: {i}/{total_frames}",
f"qpos[0:3]: [{qpos[i, 0]:.2f}, {qpos[i, 1]:.2f}, {qpos[i, 2]:.2f}]",
]
for j, text in enumerate(info_text):
cv2.putText(frame, text, (10, 30 + j*30),
cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)
out.write(frame)
out.release()
print(f" ✅ 保存: {output_path}")
print(f" 帧数: {total_frames}, 尺寸: {width}x{height}, FPS: {actual_fps}")
return True
except Exception as e:
print(f" ❌ 错误: {e}")
return False
def generate_all_videos(camera='top', num_episodes=5, slow_factor=1):
"""生成前 N 个 episode 的视频"""
dataset_dir = "roboimi/demos/dataset/sim_transfer"
episode_files = sorted(glob.glob(os.path.join(dataset_dir, "episode_*.hdf5")))
if len(episode_files) == 0:
print(f"❌ 没有找到数据文件: {dataset_dir}")
return
# 创建输出目录
output_dir = '/tmp/dataset_videos'
os.makedirs(output_dir, exist_ok=True)
print(f"找到 {len(episode_files)} 个 episode 文件")
print(f"将生成前 {min(num_episodes, len(episode_files))} 个 episode 的视频\n")
# 生成视频
for i in range(min(num_episodes, len(episode_files))):
ep_file = episode_files[i]
ep_name = os.path.basename(ep_file).replace('.hdf5', '')
output_path = f"{output_dir}/{ep_name}_{camera}.mp4"
print(f"[{i+1}/{min(num_episodes, len(episode_files))}] {ep_name}")
episode_to_video(ep_file, output_path, camera=camera, slow_factor=slow_factor)
print()
print(f"✅ 所有视频已保存到: {output_dir}")
print(f"\n播放方法:")
print(f" # 播放单个视频")
print(f" vlc {output_dir}/*.mp4")
print(f" ")
print(f" # 或用文件管理器")
print(f" nautilus {output_dir}")
def generate_multi_camera_video(episode_idx=0, slow_factor=1):
"""生成包含多个相机的视频(分屏显示)"""
dataset_dir = "roboimi/demos/dataset/sim_transfer"
episode_files = sorted(glob.glob(os.path.join(dataset_dir, "episode_*.hdf5")))
if episode_idx >= len(episode_files):
print(f"❌ Episode {episode_idx} 不存在")
return
ep_file = episode_files[episode_idx]
try:
with h5py.File(ep_file, 'r') as f:
# 获取所有相机
cameras = []
for key in f.keys():
if 'images' in key:
for cam_name in f[key].keys():
if cam_name not in cameras:
cameras.append(cam_name)
print(f"Episode {episode_idx} 的相机: {cameras}")
# 读取所有相机的图像
all_images = {}
for cam in cameras:
img_path = f'/observations/images/{cam}'
if img_path in f:
all_images[cam] = f[img_path][:]
if not all_images:
print("❌ 没有找到图像数据")
return
# 获取第一个相机的尺寸
first_cam = list(all_images.keys())[0]
total_frames = len(all_images[first_cam])
height, width = all_images[first_cam].shape[1], all_images[first_cam].shape[2]
# 创建多相机布局
num_cams = len(all_images)
cols = min(2, num_cams)
rows = (num_cams + cols - 1) // cols
canvas_width = width * cols
canvas_height = height * rows
# 创建视频写入器
output_path = f'/tmp/dataset_videos/episode_{episode_idx}_all_cameras.mp4'
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter(output_path, fourcc, 30 // slow_factor, (canvas_width, canvas_height))
# 逐帧合成
for i in range(total_frames):
canvas = np.zeros((canvas_height, canvas_width, 3), dtype=np.uint8)
for cam_idx, cam_name in enumerate(all_images.keys()):
img = all_images[cam_name][i]
# 计算在画布上的位置
row = cam_idx // cols
col = cam_idx % cols
y_start = row * height
y_end = y_start + height
x_start = col * width
x_end = x_start + width
# 调整大小(如果需要)
if img.shape[:2] != (height, width):
img = cv2.resize(img, (width, height))
# 放到画布上
canvas[y_start:y_end, x_start:x_end] = img
# 添加相机名称
cv2.putText(canvas, cam_name, (x_start + 10, y_start + 30),
cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 2)
# 添加帧信息
cv2.putText(canvas, f"Frame: {i}/{total_frames}", (10, canvas_height - 10),
cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)
out.write(canvas)
out.release()
print(f"✅ 保存多相机视频: {output_path}")
except Exception as e:
print(f"❌ 错误: {e}")
def compare_episodes(camera='top', slow_factor=2):
"""并排对比多个 episode 的视频"""
dataset_dir = "roboimi/demos/dataset/sim_transfer"
episode_files = sorted(glob.glob(os.path.join(dataset_dir, "episode_*.hdf5")))
# 选择要对比的 episode
episodes_to_compare = [0, 1, 2, 3, 4] # 对比前 5 个
print(f"对比 Episodes: {episodes_to_compare}")
# 读取所有 episode 的数据
all_data = []
for ep_idx in episodes_to_compare:
if ep_idx >= len(episode_files):
continue
try:
with h5py.File(episode_files[ep_idx], 'r') as f:
img_path = f'/observations/images/{camera}'
if img_path in f:
all_data.append({
'idx': ep_idx,
'images': f[img_path][:],
'qpos': f['/observations/qpos'][:]
})
except:
pass
if len(all_data) == 0:
print("❌ 没有数据")
return
# 获取参数
first_data = all_data[0]
height, width = first_data['images'].shape[1], first_data['images'].shape[2]
total_frames = min([d['images'].shape[0] for d in all_data])
# 创建并排布局
num_compare = len(all_data)
canvas_width = width * num_compare
canvas_height = height
# 创建视频
output_path = f'/tmp/dataset_videos/compare_{camera}.mp4'
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter(output_path, fourcc, 30 // slow_factor, (canvas_width, canvas_height))
print(f"生成对比视频,共 {total_frames} 帧...")
# 逐帧对比
for i in range(total_frames):
canvas = np.zeros((canvas_height, canvas_width, 3), dtype=np.uint8)
for j, data in enumerate(all_data):
img = data['images'][i]
qpos = data['qpos'][i]
# 调整大小(如果需要)
if img.shape[:2] != (height, width):
img = cv2.resize(img, (width, height))
# 放到画布上
x_start = j * width
x_end = x_start + width
canvas[:, x_start:x_end] = img
# 添加信息
ep_name = f"Ep {data['idx']}"
cv2.putText(canvas, ep_name, (x_start + 10, 30),
cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 255), 2)
cv2.putText(canvas, f"qpos[0:3]: [{qpos[0]:.2f}, {qpos[1]:.2f}, {qpos[2]:.2f}]",
(x_start + 10, height - 10),
cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 1)
# 添加帧号
cv2.putText(canvas, f"Frame: {i}/{total_frames}", (10, canvas_height - 30),
cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
out.write(canvas)
if i % 100 == 0:
print(f" 进度: {i}/{total_frames}")
out.release()
print(f"✅ 保存对比视频: {output_path}")
if __name__ == "__main__":
import sys
print("="*60)
print("数据集视频生成工具")
print("="*60)
if len(sys.argv) > 1:
command = sys.argv[1]
if command == 'compare':
# 对比多个 episode
camera = sys.argv[2] if len(sys.argv) > 2 else 'top'
compare_episodes(camera=camera, slow_factor=2)
elif command == 'multi':
# 多相机视频
ep_idx = int(sys.argv[2]) if len(sys.argv) > 2 else 0
generate_multi_camera_video(episode_idx=ep_idx, slow_factor=1)
else:
print("未知命令")
else:
# 默认:生成前 5 个 episode 的视频
print("\n生成前 5 个 episode 的视频top 相机,慢放 2x...")
print("="*60 + "\n")
generate_all_videos(camera='top', num_episodes=5, slow_factor=2)
print("\n" + "="*60)
print("其他用法:")
print(" python generate_dataset_videos.py compare top # 对比多个 episode")
print(" python generate_dataset_videos.py multi 0 # 多相机视频")
print("="*60)