import cv2 import time import os import numpy as np import torch import threading import queue from rtmlib import Body, draw_skeleton from audio_player import AudioPlayer from pose_analyzer import PoseSimilarityAnalyzer from config import REALSENSE_AVAILABLE if REALSENSE_AVAILABLE: import pyrealsense2 as rs class MotionComparisonAppQt: """Qt version of the motion comparison application.""" def __init__(self): self.body_detector = None self.is_running = False self.standard_video_path = None self.webcam_cap = None self.standard_cap = None self.similarity_analyzer = PoseSimilarityAnalyzer() self.frame_counter = 0 self.audio_player = AudioPlayer() self.display_settings = {'resolution_mode': 'medium', 'target_width': 960, 'target_height': 720} self.realsense_pipeline = None self.is_realsense_active = False self.last_error_time = 0 self.error_count = 0 # Async processing components self.pose_data_queue = queue.Queue(maxsize=50) self.similarity_thread = None self.similarity_stop_flag = threading.Event() def get_display_resolution(self): modes = {'high': (1280, 800), 'medium': (960, 720), 'low': (640, 480)} mode = self.display_settings.get('resolution_mode', 'medium') return modes.get(mode, (960, 720)) def initialize_detector(self): if self.body_detector is None: try: device = 'cuda' if torch.cuda.is_available() else 'cpu' self.body_detector = Body(mode='lightweight', to_openpose=True, backend='onnxruntime', device=device) print(f"Keypoint detector initialized on device: {device}") return True except Exception as e: print(f"Detector initialization failed: {e}") return False return True def initialize_camera(self): if REALSENSE_AVAILABLE: try: self.realsense_pipeline = rs.pipeline() config = rs.config() width, height = self.get_display_resolution() config.enable_stream(rs.stream.color, width, height, rs.format.bgr8, 30) profile = self.realsense_pipeline.start(config) device = profile.get_device().get_info(rs.camera_info.name) print(f"✅ RealSense摄像头初始化成功: {device} ({width}x{height})") self.is_realsense_active = True return True except Exception as e: print(f"RealSense初始化失败: {e}. 切换到USB摄像头.") return self._initialize_webcam() else: return self._initialize_webcam() def _initialize_webcam(self): try: self.webcam_cap = cv2.VideoCapture(0) if self.webcam_cap.isOpened(): self.webcam_cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc('M', 'J', 'P', 'G')) width, height = self.get_display_resolution() self.webcam_cap.set(cv2.CAP_PROP_FRAME_WIDTH, width) self.webcam_cap.set(cv2.CAP_PROP_FRAME_HEIGHT, height) self.webcam_cap.set(cv2.CAP_PROP_FPS, 30) actual_w = int(self.webcam_cap.get(cv2.CAP_PROP_FRAME_WIDTH)) actual_h = int(self.webcam_cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) print(f"✅ USB摄像头初始化成功 ({actual_w}x{actual_h})") return True else: print("❌ 无法打开USB摄像头") return False except Exception as e: print(f"❌ USB摄像头初始化失败: {e}") return False def initialize_all(self): """Initialize both detector and camera.""" detector_success = self.initialize_detector() camera_success = self.initialize_camera() return detector_success and camera_success def read_camera_frame(self): if self.is_realsense_active and self.realsense_pipeline: try: frames = self.realsense_pipeline.wait_for_frames(timeout_ms=1000) color_frame = frames.get_color_frame() if not color_frame: return False, None return True, np.asanyarray(color_frame.get_data()) except Exception: return False, None elif self.webcam_cap and self.webcam_cap.isOpened(): return self.webcam_cap.read() return False, None def get_camera_preview_frame(self): ret, frame = self.read_camera_frame() if not ret or frame is None: return None frame = cv2.flip(frame, 1) if self.body_detector: try: keypoints, scores = self.body_detector(frame) frame = draw_skeleton(frame.copy(), keypoints, scores, openpose_skeleton=True, kpt_thr=0.43) except Exception: pass return cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) def cleanup(self): """Cleans up all resources.""" # Stop similarity calculation thread if self.similarity_thread and self.similarity_thread.is_alive(): self.similarity_stop_flag.set() self.similarity_thread.join(timeout=2) if self.standard_cap: self.standard_cap.release() if self.webcam_cap: self.webcam_cap.release() if self.is_realsense_active and self.realsense_pipeline: self.realsense_pipeline.stop() self.audio_player.cleanup() self.is_running = False