posedet/motion_app_qt.py
2025-06-22 16:51:48 +08:00

146 lines
5.6 KiB
Python

import cv2
import time
import os
import numpy as np
import torch
import threading
import queue
from rtmlib import Body, draw_skeleton
from audio_player import AudioPlayer
from pose_analyzer import PoseSimilarityAnalyzer
from config import REALSENSE_AVAILABLE
if REALSENSE_AVAILABLE:
import pyrealsense2 as rs
class MotionComparisonAppQt:
"""Qt version of the motion comparison application."""
def __init__(self):
self.body_detector = None
self.is_running = False
self.standard_video_path = None
self.webcam_cap = None
self.standard_cap = None
self.similarity_analyzer = PoseSimilarityAnalyzer()
self.frame_counter = 0
self.audio_player = AudioPlayer()
self.display_settings = {'resolution_mode': 'medium', 'target_width': 960, 'target_height': 720}
self.realsense_pipeline = None
self.is_realsense_active = False
self.last_error_time = 0
self.error_count = 0
# Async processing components
self.pose_data_queue = queue.Queue(maxsize=50)
self.similarity_thread = None
self.similarity_stop_flag = threading.Event()
def get_display_resolution(self):
modes = {'high': (1280, 800), 'medium': (960, 720), 'low': (640, 480)}
mode = self.display_settings.get('resolution_mode', 'medium')
return modes.get(mode, (960, 720))
def initialize_detector(self):
if self.body_detector is None:
try:
device = 'cuda' if torch.cuda.is_available() else 'cpu'
self.body_detector = Body(mode='lightweight', to_openpose=True, backend='onnxruntime', device=device)
print(f"Keypoint detector initialized on device: {device}")
return True
except Exception as e:
print(f"Detector initialization failed: {e}")
return False
return True
def initialize_camera(self):
if REALSENSE_AVAILABLE:
try:
self.realsense_pipeline = rs.pipeline()
config = rs.config()
width, height = self.get_display_resolution()
config.enable_stream(rs.stream.color, width, height, rs.format.bgr8, 30)
profile = self.realsense_pipeline.start(config)
device = profile.get_device().get_info(rs.camera_info.name)
print(f"✅ RealSense摄像头初始化成功: {device} ({width}x{height})")
self.is_realsense_active = True
return True
except Exception as e:
print(f"RealSense初始化失败: {e}. 切换到USB摄像头.")
return self._initialize_webcam()
else:
return self._initialize_webcam()
def _initialize_webcam(self):
try:
self.webcam_cap = cv2.VideoCapture(0)
if self.webcam_cap.isOpened():
self.webcam_cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc('M', 'J', 'P', 'G'))
width, height = self.get_display_resolution()
self.webcam_cap.set(cv2.CAP_PROP_FRAME_WIDTH, width)
self.webcam_cap.set(cv2.CAP_PROP_FRAME_HEIGHT, height)
self.webcam_cap.set(cv2.CAP_PROP_FPS, 30)
actual_w = int(self.webcam_cap.get(cv2.CAP_PROP_FRAME_WIDTH))
actual_h = int(self.webcam_cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
print(f"✅ USB摄像头初始化成功 ({actual_w}x{actual_h})")
return True
else:
print("❌ 无法打开USB摄像头")
return False
except Exception as e:
print(f"❌ USB摄像头初始化失败: {e}")
return False
def initialize_all(self):
"""Initialize both detector and camera."""
detector_success = self.initialize_detector()
camera_success = self.initialize_camera()
return detector_success and camera_success
def read_camera_frame(self):
if self.is_realsense_active and self.realsense_pipeline:
try:
frames = self.realsense_pipeline.wait_for_frames(timeout_ms=1000)
color_frame = frames.get_color_frame()
if not color_frame:
return False, None
return True, np.asanyarray(color_frame.get_data())
except Exception:
return False, None
elif self.webcam_cap and self.webcam_cap.isOpened():
return self.webcam_cap.read()
return False, None
def get_camera_preview_frame(self):
ret, frame = self.read_camera_frame()
if not ret or frame is None:
return None
frame = cv2.flip(frame, 1)
if self.body_detector:
try:
keypoints, scores = self.body_detector(frame)
frame = draw_skeleton(frame.copy(), keypoints, scores, openpose_skeleton=True, kpt_thr=0.43)
except Exception:
pass
return cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
def cleanup(self):
"""Cleans up all resources."""
# Stop similarity calculation thread
if self.similarity_thread and self.similarity_thread.is_alive():
self.similarity_stop_flag.set()
self.similarity_thread.join(timeout=2)
if self.standard_cap:
self.standard_cap.release()
if self.webcam_cap:
self.webcam_cap.release()
if self.is_realsense_active and self.realsense_pipeline:
self.realsense_pipeline.stop()
self.audio_player.cleanup()
self.is_running = False