#!/usr/bin/env python3
"""
Convert HDF5 dataset episodes into videos for visual inspection.

Features:
1. Convert a single episode into a video
2. Compare the videos of several episodes side by side
3. Slow down playback to make observation easier
"""

import glob
import os
import sys

import cv2
import h5py
import numpy as np

# Default locations used when the caller does not override them.
DEFAULT_DATASET_DIR = "roboimi/demos/dataset/sim_transfer"
DEFAULT_OUTPUT_DIR = '/tmp/dataset_videos'

# HDF5 paths read by this script.
IMAGES_GROUP = '/observations/images'
QPOS_PATH = '/observations/qpos'


def _playback_fps(fps, slow_factor):
    """Return the frame rate to encode with for a given slow-down factor.

    True division is used so that non-divisible combinations (e.g. 30 fps
    slowed 4x) keep their fractional rate instead of being truncated, and so
    that a large slow_factor can never collapse the rate to 0.

    Raises:
        ValueError: if fps or slow_factor is not strictly positive.
    """
    if fps <= 0:
        raise ValueError(f"fps must be positive, got {fps}")
    if slow_factor <= 0:
        raise ValueError(f"slow_factor must be positive, got {slow_factor}")
    return fps / slow_factor


def _open_writer(output_path, fps, frame_size):
    """Open an mp4v VideoWriter, creating the parent directory if needed.

    Args:
        output_path: destination video file
        fps: frame rate to encode with
        frame_size: (width, height) of every frame that will be written

    Raises:
        OSError: if OpenCV could not open the writer. Without this check a
            failed open is silent and every write() is a no-op.
    """
    parent = os.path.dirname(output_path)
    if parent:
        os.makedirs(parent, exist_ok=True)

    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    writer = cv2.VideoWriter(output_path, fourcc, fps, frame_size)
    if not writer.isOpened():
        writer.release()
        raise OSError(f"无法打开视频写入器: {output_path}")
    return writer


def _list_episode_files(dataset_dir):
    """Return the sorted list of episode_*.hdf5 files found in dataset_dir."""
    return sorted(glob.glob(os.path.join(dataset_dir, "episode_*.hdf5")))


def _qpos_label(qpos_row):
    """Format the first three qpos values as an overlay text line."""
    return f"qpos[0:3]: [{qpos_row[0]:.2f}, {qpos_row[1]:.2f}, {qpos_row[2]:.2f}]"


def episode_to_video(episode_file, output_path, camera='top', fps=30, slow_factor=1):
    """
    Convert a single episode into a video.

    Each frame is overlaid with the episode name, the frame counter and the
    first three qpos values.

    Args:
        episode_file: path of the HDF5 file
        output_path: path of the output video
        camera: name of the camera to use
        fps: nominal frame rate
        slow_factor: slow-motion factor (1 = normal, 2 = half speed)

    Returns:
        True if the video was written, False if the camera is missing or any
        error occurred (the error is printed, not raised).
    """
    try:
        with h5py.File(episode_file, 'r') as f:
            # Read the image sequence
            img_path = f'{IMAGES_GROUP}/{camera}'
            if img_path not in f:
                print(f" ❌ 相机 {camera} 不存在")
                return False

            images = f[img_path][:]  # indexed below as (T, H, W, ...)
            qpos = f[QPOS_PATH][:]

        total_frames = len(images)
        height, width = images.shape[1], images.shape[2]
        actual_fps = _playback_fps(fps, slow_factor)
        episode_name = os.path.basename(episode_file).replace('.hdf5', '')

        # Create the video writer
        out = _open_writer(output_path, actual_fps, (width, height))
        try:
            # Write frame by frame
            for i, image in enumerate(images):
                # astype() copies, so the overlay never modifies `images`.
                # NOTE(review): OpenCV writes frames as BGR; if the dataset
                # stores RGB the colours will be swapped - confirm.
                frame = image.astype(np.uint8)

                # Overlay information on the image
                info_text = [
                    f"Episode: {episode_name}",
                    f"Frame: {i}/{total_frames}",
                    _qpos_label(qpos[i]),
                ]
                for j, text in enumerate(info_text):
                    cv2.putText(frame, text, (10, 30 + j*30),
                                cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)

                out.write(frame)
        finally:
            # Always finalise the file, even if a frame failed.
            out.release()

        print(f" ✅ 保存: {output_path}")
        print(f" 帧数: {total_frames}, 尺寸: {width}x{height}, FPS: {actual_fps:g}")
        return True

    except Exception as e:
        # Best-effort tool: report the problem and let the caller continue.
        print(f" ❌ 错误: {e}")
        return False


def generate_all_videos(camera='top', num_episodes=5, slow_factor=1,
                        dataset_dir=DEFAULT_DATASET_DIR,
                        output_dir=DEFAULT_OUTPUT_DIR):
    """Generate one video for each of the first N episodes.

    Args:
        camera: name of the camera to use
        num_episodes: maximum number of episodes to convert
        slow_factor: slow-motion factor passed to episode_to_video
        dataset_dir: directory containing episode_*.hdf5 files
        output_dir: directory the videos are written to (created if missing)
    """
    episode_files = _list_episode_files(dataset_dir)

    if not episode_files:
        print(f"❌ 没有找到数据文件: {dataset_dir}")
        return

    # Create the output directory
    os.makedirs(output_dir, exist_ok=True)

    count = min(num_episodes, len(episode_files))
    print(f"找到 {len(episode_files)} 个 episode 文件")
    print(f"将生成前 {count} 个 episode 的视频\n")

    # Generate the videos
    for i in range(count):
        ep_file = episode_files[i]
        ep_name = os.path.basename(ep_file).replace('.hdf5', '')
        output_path = f"{output_dir}/{ep_name}_{camera}.mp4"

        print(f"[{i+1}/{count}] {ep_name}")
        episode_to_video(ep_file, output_path, camera=camera, slow_factor=slow_factor)
        print()

    print(f"✅ 所有视频已保存到: {output_dir}")
    print(f"\n播放方法:")
    print(f" # 播放单个视频")
    print(f" vlc {output_dir}/*.mp4")
    print(f" ")
    print(f" # 或用文件管理器")
    print(f" nautilus {output_dir}")


def generate_multi_camera_video(episode_idx=0, slow_factor=1,
                                dataset_dir=DEFAULT_DATASET_DIR,
                                output_dir=DEFAULT_OUTPUT_DIR, fps=30):
    """Generate a split-screen video containing every camera of one episode.

    Cameras are laid out on a grid with at most two columns; each tile is
    labelled with its camera name.

    Args:
        episode_idx: index into the sorted list of episode files
        slow_factor: slow-motion factor (1 = normal, 2 = half speed)
        dataset_dir: directory containing episode_*.hdf5 files
        output_dir: directory the video is written to (created if missing)
        fps: nominal frame rate before slow-down
    """
    episode_files = _list_episode_files(dataset_dir)

    if episode_idx >= len(episode_files):
        print(f"❌ Episode {episode_idx} 不存在")
        return

    ep_file = episode_files[episode_idx]

    try:
        with h5py.File(ep_file, 'r') as f:
            # Discover the cameras from the group the images are actually
            # read from. Scanning the top-level keys for 'images' finds
            # nothing when the images live under /observations/images.
            cameras = list(f[IMAGES_GROUP].keys()) if IMAGES_GROUP in f else []
            print(f"Episode {episode_idx} 的相机: {cameras}")

            # Read the images of every camera
            all_images = {cam: f[f'{IMAGES_GROUP}/{cam}'][:] for cam in cameras}

        if not all_images:
            print("❌ 没有找到图像数据")
            return

        # Tile size comes from the first camera; other cameras are resized.
        first_images = next(iter(all_images.values()))
        height, width = first_images.shape[1], first_images.shape[2]
        # Use the shortest stream so no camera is indexed past its end.
        total_frames = min(len(imgs) for imgs in all_images.values())

        # Build the multi-camera layout
        num_cams = len(all_images)
        cols = min(2, num_cams)
        rows = (num_cams + cols - 1) // cols

        canvas_width = width * cols
        canvas_height = height * rows

        # Create the video writer
        output_path = os.path.join(output_dir, f'episode_{episode_idx}_all_cameras.mp4')
        out = _open_writer(output_path, _playback_fps(fps, slow_factor),
                           (canvas_width, canvas_height))

        try:
            # Compose frame by frame
            for i in range(total_frames):
                canvas = np.zeros((canvas_height, canvas_width, 3), dtype=np.uint8)

                for cam_idx, (cam_name, cam_images) in enumerate(all_images.items()):
                    img = cam_images[i]

                    # Position of this tile on the canvas
                    row, col = divmod(cam_idx, cols)
                    y_start = row * height
                    x_start = col * width

                    # Resize if needed
                    if img.shape[:2] != (height, width):
                        img = cv2.resize(img, (width, height))

                    # Place on the canvas
                    canvas[y_start:y_start + height, x_start:x_start + width] = img

                    # Add the camera name
                    cv2.putText(canvas, cam_name, (x_start + 10, y_start + 30),
                                cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 2)

                # Add the frame counter
                cv2.putText(canvas, f"Frame: {i}/{total_frames}", (10, canvas_height - 10),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)

                out.write(canvas)
        finally:
            out.release()

        print(f"✅ 保存多相机视频: {output_path}")

    except Exception as e:
        # Best-effort tool: report the problem instead of crashing.
        print(f"❌ 错误: {e}")


def compare_episodes(camera='top', slow_factor=2,
                     dataset_dir=DEFAULT_DATASET_DIR,
                     output_dir=DEFAULT_OUTPUT_DIR,
                     episode_indices=(0, 1, 2, 3, 4), fps=30):
    """Generate a video that shows several episodes side by side.

    Episodes that do not exist, lack the requested camera, or cannot be read
    are skipped. The video length is that of the shortest episode.

    Args:
        camera: name of the camera to use
        slow_factor: slow-motion factor (1 = normal, 2 = half speed)
        dataset_dir: directory containing episode_*.hdf5 files
        output_dir: directory the video is written to (created if missing)
        episode_indices: indices of the episodes to compare (default: first 5)
        fps: nominal frame rate before slow-down

    Raises:
        ValueError: if fps or slow_factor is not strictly positive.
    """
    episode_files = _list_episode_files(dataset_dir)

    # Episodes selected for comparison
    episodes_to_compare = list(episode_indices)

    print(f"对比 Episodes: {episodes_to_compare}")

    # Load the data of every selected episode
    img_path = f'{IMAGES_GROUP}/{camera}'
    all_data = []
    for ep_idx in episodes_to_compare:
        if ep_idx >= len(episode_files):
            continue

        try:
            with h5py.File(episode_files[ep_idx], 'r') as f:
                if img_path in f:
                    all_data.append({
                        'idx': ep_idx,
                        'images': f[img_path][:],
                        'qpos': f[QPOS_PATH][:]
                    })
        except Exception as e:
            # Keep going with the remaining episodes, but say what was skipped
            # rather than hiding the failure.
            print(f"⚠️ 跳过 Episode {ep_idx}: {e}")

    if not all_data:
        print("❌ 没有数据")
        return

    # Derive the layout parameters
    first_data = all_data[0]
    height, width = first_data['images'].shape[1], first_data['images'].shape[2]
    # Bounded by both arrays so qpos is never indexed past its end.
    total_frames = min(min(len(d['images']), len(d['qpos'])) for d in all_data)

    # Side-by-side layout
    num_compare = len(all_data)
    canvas_width = width * num_compare
    canvas_height = height

    # Create the video
    output_path = os.path.join(output_dir, f'compare_{camera}.mp4')
    actual_fps = _playback_fps(fps, slow_factor)
    try:
        out = _open_writer(output_path, actual_fps, (canvas_width, canvas_height))
    except OSError as e:
        print(f"❌ 错误: {e}")
        return

    print(f"生成对比视频,共 {total_frames} 帧...")

    try:
        # Compare frame by frame
        for i in range(total_frames):
            canvas = np.zeros((canvas_height, canvas_width, 3), dtype=np.uint8)

            for j, data in enumerate(all_data):
                img = data['images'][i]
                qpos = data['qpos'][i]

                # Resize if needed
                if img.shape[:2] != (height, width):
                    img = cv2.resize(img, (width, height))

                # Place on the canvas
                x_start = j * width
                canvas[:, x_start:x_start + width] = img

                # Add episode label and qpos values
                ep_name = f"Ep {data['idx']}"
                cv2.putText(canvas, ep_name, (x_start + 10, 30),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 255), 2)
                cv2.putText(canvas, _qpos_label(qpos),
                            (x_start + 10, height - 10),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 1)

            # Add the frame number
            cv2.putText(canvas, f"Frame: {i}/{total_frames}", (10, canvas_height - 30),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)

            out.write(canvas)

            if i % 100 == 0:
                print(f" 进度: {i}/{total_frames}")
    finally:
        out.release()

    print(f"✅ 保存对比视频: {output_path}")


def main(argv=None):
    """Command-line entry point.

    Usage:
        (no arguments)      generate videos for the first 5 episodes
        compare [camera]    side-by-side comparison of several episodes
        multi [episode_idx] split-screen video of all cameras

    Args:
        argv: argument list including the program name; defaults to sys.argv.
    """
    argv = sys.argv if argv is None else argv

    print("="*60)
    print("数据集视频生成工具")
    print("="*60)

    if len(argv) > 1:
        command = argv[1]

        if command == 'compare':
            # Compare several episodes
            camera = argv[2] if len(argv) > 2 else 'top'
            compare_episodes(camera=camera, slow_factor=2)

        elif command == 'multi':
            # Multi-camera video
            ep_idx = int(argv[2]) if len(argv) > 2 else 0
            generate_multi_camera_video(episode_idx=ep_idx, slow_factor=1)

        else:
            print("未知命令")
    else:
        # Default: generate videos for the first 5 episodes
        print("\n生成前 5 个 episode 的视频(top 相机,慢放 2x)...")
        print("="*60 + "\n")
        generate_all_videos(camera='top', num_episodes=5, slow_factor=2)

        print("\n" + "="*60)
        print("其他用法:")
        print(" python generate_dataset_videos.py compare top # 对比多个 episode")
        print(" python generate_dataset_videos.py multi 0 # 多相机视频")
        print("="*60)


if __name__ == "__main__":
    main()