Files
roboimi/generate_dataset_videos.py
gouhanke 40c40695dd chore: 添加测试文件
- check_all_episodes.py:检查各个episode是否有重复帧。
- check_specific_frames.py:检查前几帧是否位于正确初始位置。
- generate_dataset_videos.py:根据hdf5生成视频
2026-02-26 13:59:47 +08:00

325 lines
11 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
将 HDF5 数据集转换为视频,用于可视化检查
功能:
1. 将单个 episode 转换为视频
2. 对比多个 episode 的视频
3. 放慢播放速度便于观察
"""
import os
import h5py
import glob
import cv2
import numpy as np
def episode_to_video(episode_file, output_path, camera='top', fps=30, slow_factor=1):
    """Render one camera stream of an HDF5 episode to an MP4 file.

    Every frame is overlaid with the episode name, the frame index and the
    first three ``qpos`` values before it is appended to the video.

    Args:
        episode_file: Path of the HDF5 episode file.
        output_path: Path of the video file to write.
        camera: Name of the dataset under ``/observations/images`` to render.
        fps: Nominal frame rate of the recording.
        slow_factor: Slow-motion divisor (1 = normal speed, 2 = half speed).
            Must be >= 1.

    Returns:
        True when the video was written; False on any failure (the reason is
        printed instead of raised).
    """
    # Reject 0 / negative factors up front: they would divide by zero or
    # produce a negative frame rate.
    if slow_factor < 1:
        print(f" ❌ slow_factor 必须 >= 1: {slow_factor}")
        return False

    # Integer division reaches 0 when slow_factor > fps; keep at least 1 FPS
    # so the writer gets a usable frame rate.
    actual_fps = max(1, fps // slow_factor)

    out = None
    try:
        with h5py.File(episode_file, 'r') as f:
            img_path = f'/observations/images/{camera}'
            if img_path not in f:
                print(f" ❌ 相机 {camera} 不存在")
                return False

            # Load everything into memory so the file can be closed before
            # encoding starts. Indexed below as (frame, height, width, ...).
            images = f[img_path][:]
            qpos = f['/observations/qpos'][:]

        # Never index qpos past its end if it is shorter than the image stack.
        total_frames = min(len(images), len(qpos))
        height, width = images.shape[1], images.shape[2]

        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(output_path, fourcc, actual_fps, (width, height))
        if not out.isOpened():
            # Without this check an unwritable path would be reported as success.
            print(f" ❌ 无法创建视频文件: {output_path}")
            return False

        episode_name = os.path.basename(episode_file).replace('.hdf5', '')
        # NOTE(review): cv2 encodes frames as BGR; if the dataset stores RGB the
        # colours in the video will be swapped — confirm against the recorder.
        for i in range(total_frames):
            # astype() copies, so the overlay never modifies the loaded data.
            frame = images[i].astype(np.uint8)
            info_text = [
                f"Episode: {episode_name}",
                f"Frame: {i}/{total_frames}",
                f"qpos[0:3]: [{qpos[i, 0]:.2f}, {qpos[i, 1]:.2f}, {qpos[i, 2]:.2f}]",
            ]
            for j, text in enumerate(info_text):
                cv2.putText(frame, text, (10, 30 + j * 30),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)
            out.write(frame)
    except Exception as e:
        # Script-level boundary: report and signal failure to the caller.
        print(f" ❌ 错误: {e}")
        return False
    finally:
        # Always finalize the container, even when a frame fails mid-way.
        if out is not None:
            out.release()

    print(f" ✅ 保存: {output_path}")
    print(f" 帧数: {total_frames}, 尺寸: {width}x{height}, FPS: {actual_fps}")
    return True
def generate_all_videos(camera='top', num_episodes=5, slow_factor=1,
                        dataset_dir="roboimi/demos/dataset/sim_transfer",
                        output_dir='/tmp/dataset_videos'):
    """Generate one video per episode for the first ``num_episodes`` episodes.

    Args:
        camera: Camera name passed through to ``episode_to_video``.
        num_episodes: Maximum number of episodes to convert.
        slow_factor: Slow-motion divisor passed through to ``episode_to_video``.
        dataset_dir: Directory searched for ``episode_*.hdf5`` files.
        output_dir: Directory the videos are written to (created if missing).

    Returns:
        None. Progress and results are printed.
    """

    def _episode_number(path):
        # Order numerically so episode_10 follows episode_9 rather than
        # episode_1; names without a plain number sort last, by path.
        stem = os.path.basename(path)[len("episode_"):-len(".hdf5")]
        return (0, int(stem), path) if stem.isdecimal() else (1, 0, path)

    episode_files = sorted(
        glob.glob(os.path.join(dataset_dir, "episode_*.hdf5")),
        key=_episode_number,
    )
    if not episode_files:
        print(f"❌ 没有找到数据文件: {dataset_dir}")
        return

    os.makedirs(output_dir, exist_ok=True)

    # A negative request selects nothing, as the original range() did.
    selected = episode_files[:max(num_episodes, 0)]
    total = len(selected)
    print(f"找到 {len(episode_files)} 个 episode 文件")
    print(f"将生成前 {total} 个 episode 的视频\n")

    failures = 0
    for position, ep_file in enumerate(selected, start=1):
        ep_name = os.path.basename(ep_file).replace('.hdf5', '')
        output_path = f"{output_dir}/{ep_name}_{camera}.mp4"
        print(f"[{position}/{total}] {ep_name}")
        if not episode_to_video(ep_file, output_path, camera=camera,
                                slow_factor=slow_factor):
            failures += 1
        print()

    if failures:
        # Do not let the summary below hide episodes that failed to render.
        print(f"⚠️ {failures} 个 episode 生成失败")
    print(f"✅ 所有视频已保存到: {output_dir}")
    print("\n播放方法:")
    print(" # 播放单个视频")
    print(f" vlc {output_dir}/*.mp4")
    print(" ")
    print(" # 或用文件管理器")
    print(f" nautilus {output_dir}")
def generate_multi_camera_video(episode_idx=0, slow_factor=1,
                                dataset_dir="roboimi/demos/dataset/sim_transfer",
                                output_dir='/tmp/dataset_videos'):
    """Write a split-screen video showing every camera of one episode.

    Cameras are tiled on a grid at most two columns wide; each tile is
    labelled with its camera name and the frame counter is drawn at the
    bottom-left of the canvas.

    Args:
        episode_idx: Zero-based position of the episode in the numerically
            sorted list of ``episode_*.hdf5`` files.
        slow_factor: Slow-motion divisor applied to the 30 FPS base rate
            (1 = normal speed, 2 = half speed). Must be >= 1.
        dataset_dir: Directory searched for ``episode_*.hdf5`` files.
        output_dir: Directory the video is written to (created if missing).

    Returns:
        None. Results and errors are printed.
    """
    if slow_factor < 1:
        # 0 would divide by zero, negatives would give a negative frame rate.
        print(f"❌ slow_factor 必须 >= 1: {slow_factor}")
        return

    def _episode_number(path):
        # Order numerically so index N maps to episode_N even past episode_9.
        stem = os.path.basename(path)[len("episode_"):-len(".hdf5")]
        return (0, int(stem), path) if stem.isdecimal() else (1, 0, path)

    episode_files = sorted(
        glob.glob(os.path.join(dataset_dir, "episode_*.hdf5")),
        key=_episode_number,
    )
    # Negative indices are rejected as well: on an empty list they used to
    # raise IndexError outside of any handler.
    if not 0 <= episode_idx < len(episode_files):
        print(f"❌ Episode {episode_idx} 不存在")
        return
    ep_file = episode_files[episode_idx]

    out = None
    saved = False
    try:
        with h5py.File(ep_file, 'r') as f:
            # Cameras are the members of the group the frames are read from.
            # (Scanning the top-level keys for 'images' never matches when
            # the images live under /observations/images.)
            group_path = '/observations/images'
            cameras = list(f[group_path].keys()) if group_path in f else []
            print(f"Episode {episode_idx} 的相机: {cameras}")
            all_images = {cam: f[f'{group_path}/{cam}'][:] for cam in cameras}

        if not all_images:
            print("❌ 没有找到图像数据")
            return

        # The first camera defines the tile size; others are resized to it.
        first_cam = next(iter(all_images))
        height, width = all_images[first_cam].shape[1], all_images[first_cam].shape[2]
        # Stop at the shortest stream so no camera is indexed past its end.
        total_frames = min(len(frames) for frames in all_images.values())

        num_cams = len(all_images)
        cols = min(2, num_cams)
        rows = (num_cams + cols - 1) // cols  # ceiling division
        canvas_width = width * cols
        canvas_height = height * rows

        os.makedirs(output_dir, exist_ok=True)
        output_path = os.path.join(output_dir, f'episode_{episode_idx}_all_cameras.mp4')
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        # Keep at least 1 FPS when slow_factor exceeds the 30 FPS base rate.
        out = cv2.VideoWriter(output_path, fourcc, max(1, 30 // slow_factor),
                              (canvas_width, canvas_height))
        if not out.isOpened():
            print(f"❌ 无法创建视频文件: {output_path}")
            return

        for i in range(total_frames):
            canvas = np.zeros((canvas_height, canvas_width, 3), dtype=np.uint8)
            for cam_idx, (cam_name, frames) in enumerate(all_images.items()):
                img = frames[i]
                row, col = divmod(cam_idx, cols)
                y_start, x_start = row * height, col * width
                if img.shape[:2] != (height, width):
                    img = cv2.resize(img, (width, height))
                canvas[y_start:y_start + height, x_start:x_start + width] = img
                cv2.putText(canvas, cam_name, (x_start + 10, y_start + 30),
                            cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 2)
            cv2.putText(canvas, f"Frame: {i}/{total_frames}", (10, canvas_height - 10),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)
            out.write(canvas)
        saved = True
    except Exception as e:
        # Script-level boundary: report the problem instead of crashing.
        print(f"❌ 错误: {e}")
    finally:
        # Finalize the container even if compositing failed part-way.
        if out is not None:
            out.release()

    if saved:
        print(f"✅ 保存多相机视频: {output_path}")
def compare_episodes(camera='top', slow_factor=2, episode_indices=(0, 1, 2, 3, 4),
                     dataset_dir="roboimi/demos/dataset/sim_transfer",
                     output_dir='/tmp/dataset_videos'):
    """Write a video showing the same camera of several episodes side by side.

    Each episode gets one column labelled with its index and the first three
    ``qpos`` values; the video stops at the shortest episode.

    Args:
        camera: Name of the dataset under ``/observations/images`` to show.
        slow_factor: Slow-motion divisor applied to the 30 FPS base rate
            (1 = normal speed, 2 = half speed). Must be >= 1.
        episode_indices: Zero-based positions, in the numerically sorted list
            of ``episode_*.hdf5`` files, of the episodes to compare. Indices
            outside the list are skipped.
        dataset_dir: Directory searched for ``episode_*.hdf5`` files.
        output_dir: Directory the video is written to (created if missing).

    Returns:
        None. Progress and results are printed.
    """
    if slow_factor < 1:
        # 0 would divide by zero, negatives would give a negative frame rate.
        print(f"❌ slow_factor 必须 >= 1: {slow_factor}")
        return

    def _episode_number(path):
        # Order numerically so the "Ep N" label matches episode_N past 9.
        stem = os.path.basename(path)[len("episode_"):-len(".hdf5")]
        return (0, int(stem), path) if stem.isdecimal() else (1, 0, path)

    episode_files = sorted(
        glob.glob(os.path.join(dataset_dir, "episode_*.hdf5")),
        key=_episode_number,
    )
    episodes_to_compare = list(episode_indices)
    print(f"对比 Episodes: {episodes_to_compare}")

    img_path = f'/observations/images/{camera}'
    all_data = []
    for ep_idx in episodes_to_compare:
        if not 0 <= ep_idx < len(episode_files):
            continue
        try:
            with h5py.File(episode_files[ep_idx], 'r') as f:
                if img_path not in f:
                    continue
                all_data.append({
                    'idx': ep_idx,
                    'images': f[img_path][:],
                    'qpos': f['/observations/qpos'][:],
                })
        except Exception as e:
            # Still best-effort, but say which episode was dropped and why.
            print(f"⚠️ 跳过 Episode {ep_idx}: {e}")

    if not all_data:
        print("❌ 没有数据")
        return

    # The first loaded episode defines the tile size; others are resized to it.
    first_images = all_data[0]['images']
    height, width = first_images.shape[1], first_images.shape[2]
    # Stop at the shortest stream (images or qpos) of any episode.
    total_frames = min(min(len(d['images']), len(d['qpos'])) for d in all_data)

    canvas_width = width * len(all_data)
    canvas_height = height

    os.makedirs(output_dir, exist_ok=True)
    output_path = os.path.join(output_dir, f'compare_{camera}.mp4')
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    # Keep at least 1 FPS when slow_factor exceeds the 30 FPS base rate.
    out = cv2.VideoWriter(output_path, fourcc, max(1, 30 // slow_factor),
                          (canvas_width, canvas_height))
    if not out.isOpened():
        out.release()
        print(f"❌ 无法创建视频文件: {output_path}")
        return

    print(f"生成对比视频,共 {total_frames} 帧...")
    try:
        for i in range(total_frames):
            canvas = np.zeros((canvas_height, canvas_width, 3), dtype=np.uint8)
            for j, data in enumerate(all_data):
                img = data['images'][i]
                qpos = data['qpos'][i]
                if img.shape[:2] != (height, width):
                    img = cv2.resize(img, (width, height))
                x_start = j * width
                canvas[:, x_start:x_start + width] = img
                cv2.putText(canvas, f"Ep {data['idx']}", (x_start + 10, 30),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 255), 2)
                cv2.putText(canvas, f"qpos[0:3]: [{qpos[0]:.2f}, {qpos[1]:.2f}, {qpos[2]:.2f}]",
                            (x_start + 10, height - 10),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 1)
            cv2.putText(canvas, f"Frame: {i}/{total_frames}", (10, canvas_height - 30),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
            out.write(canvas)
            if i % 100 == 0:
                print(f" 进度: {i}/{total_frames}")
    finally:
        # Finalize the container even if compositing raises part-way.
        out.release()
    print(f"✅ 保存对比视频: {output_path}")
if __name__ == "__main__":
    # Command-line entry point:
    #   (no arguments)      render the first 5 episodes from the 'top' camera
    #   compare [camera]    side-by-side comparison of several episodes
    #   multi [episode_idx] split-screen video of every camera of one episode
    import sys

    banner = "=" * 60
    print(banner)
    print("数据集视频生成工具")
    print(banner)

    cli_args = sys.argv[1:]
    if not cli_args:
        # Default: first 5 episodes at half speed, followed by a usage hint.
        print("\n生成前 5 个 episode 的视频top 相机,慢放 2x...")
        print(banner + "\n")
        generate_all_videos(camera='top', num_episodes=5, slow_factor=2)
        print("\n" + banner)
        print("其他用法:")
        print(" python generate_dataset_videos.py compare top # 对比多个 episode")
        print(" python generate_dataset_videos.py multi 0 # 多相机视频")
        print(banner)
    elif cli_args[0] == 'compare':
        # Optional second argument selects the camera.
        chosen_camera = cli_args[1] if len(cli_args) > 1 else 'top'
        compare_episodes(camera=chosen_camera, slow_factor=2)
    elif cli_args[0] == 'multi':
        # Optional second argument selects the episode index.
        chosen_episode = int(cli_args[1]) if len(cli_args) > 1 else 0
        generate_multi_camera_video(episode_idx=chosen_episode, slow_factor=1)
    else:
        print("未知命令")