146 lines
5.6 KiB
Python
146 lines
5.6 KiB
Python
import cv2
|
|
import time
|
|
import os
|
|
import numpy as np
|
|
import torch
|
|
import threading
|
|
import queue
|
|
from rtmlib import Body, draw_skeleton
|
|
|
|
from audio_player import AudioPlayer
|
|
from pose_analyzer import PoseSimilarityAnalyzer
|
|
from config import REALSENSE_AVAILABLE
|
|
|
|
if REALSENSE_AVAILABLE:
|
|
import pyrealsense2 as rs
|
|
|
|
class MotionComparisonAppQt:
|
|
"""Qt version of the motion comparison application."""
|
|
|
|
def __init__(self):
|
|
self.body_detector = None
|
|
self.is_running = False
|
|
self.standard_video_path = None
|
|
self.webcam_cap = None
|
|
self.standard_cap = None
|
|
self.similarity_analyzer = PoseSimilarityAnalyzer()
|
|
self.frame_counter = 0
|
|
self.audio_player = AudioPlayer()
|
|
|
|
self.display_settings = {'resolution_mode': 'medium', 'target_width': 960, 'target_height': 720}
|
|
self.realsense_pipeline = None
|
|
self.is_realsense_active = False
|
|
self.last_error_time = 0
|
|
self.error_count = 0
|
|
|
|
# Async processing components
|
|
self.pose_data_queue = queue.Queue(maxsize=50)
|
|
self.similarity_thread = None
|
|
self.similarity_stop_flag = threading.Event()
|
|
|
|
def get_display_resolution(self):
|
|
modes = {'high': (1280, 800), 'medium': (960, 720), 'low': (640, 480)}
|
|
mode = self.display_settings.get('resolution_mode', 'medium')
|
|
return modes.get(mode, (960, 720))
|
|
|
|
def initialize_detector(self):
|
|
if self.body_detector is None:
|
|
try:
|
|
device = 'cuda' if torch.cuda.is_available() else 'cpu'
|
|
self.body_detector = Body(mode='lightweight', to_openpose=True, backend='onnxruntime', device=device)
|
|
print(f"Keypoint detector initialized on device: {device}")
|
|
return True
|
|
except Exception as e:
|
|
print(f"Detector initialization failed: {e}")
|
|
return False
|
|
return True
|
|
|
|
def initialize_camera(self):
|
|
if REALSENSE_AVAILABLE:
|
|
try:
|
|
self.realsense_pipeline = rs.pipeline()
|
|
config = rs.config()
|
|
width, height = self.get_display_resolution()
|
|
config.enable_stream(rs.stream.color, width, height, rs.format.bgr8, 30)
|
|
profile = self.realsense_pipeline.start(config)
|
|
device = profile.get_device().get_info(rs.camera_info.name)
|
|
print(f"✅ RealSense摄像头初始化成功: {device} ({width}x{height})")
|
|
self.is_realsense_active = True
|
|
return True
|
|
except Exception as e:
|
|
print(f"RealSense初始化失败: {e}. 切换到USB摄像头.")
|
|
return self._initialize_webcam()
|
|
else:
|
|
return self._initialize_webcam()
|
|
|
|
def _initialize_webcam(self):
|
|
try:
|
|
self.webcam_cap = cv2.VideoCapture(0)
|
|
if self.webcam_cap.isOpened():
|
|
self.webcam_cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc('M', 'J', 'P', 'G'))
|
|
width, height = self.get_display_resolution()
|
|
self.webcam_cap.set(cv2.CAP_PROP_FRAME_WIDTH, width)
|
|
self.webcam_cap.set(cv2.CAP_PROP_FRAME_HEIGHT, height)
|
|
self.webcam_cap.set(cv2.CAP_PROP_FPS, 30)
|
|
actual_w = int(self.webcam_cap.get(cv2.CAP_PROP_FRAME_WIDTH))
|
|
actual_h = int(self.webcam_cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
|
|
print(f"✅ USB摄像头初始化成功 ({actual_w}x{actual_h})")
|
|
return True
|
|
else:
|
|
print("❌ 无法打开USB摄像头")
|
|
return False
|
|
except Exception as e:
|
|
print(f"❌ USB摄像头初始化失败: {e}")
|
|
return False
|
|
|
|
def initialize_all(self):
|
|
"""Initialize both detector and camera."""
|
|
detector_success = self.initialize_detector()
|
|
camera_success = self.initialize_camera()
|
|
return detector_success and camera_success
|
|
|
|
def read_camera_frame(self):
|
|
if self.is_realsense_active and self.realsense_pipeline:
|
|
try:
|
|
frames = self.realsense_pipeline.wait_for_frames(timeout_ms=1000)
|
|
color_frame = frames.get_color_frame()
|
|
if not color_frame:
|
|
return False, None
|
|
return True, np.asanyarray(color_frame.get_data())
|
|
except Exception:
|
|
return False, None
|
|
elif self.webcam_cap and self.webcam_cap.isOpened():
|
|
return self.webcam_cap.read()
|
|
return False, None
|
|
|
|
def get_camera_preview_frame(self):
|
|
ret, frame = self.read_camera_frame()
|
|
if not ret or frame is None:
|
|
return None
|
|
|
|
frame = cv2.flip(frame, 1)
|
|
if self.body_detector:
|
|
try:
|
|
keypoints, scores = self.body_detector(frame)
|
|
frame = draw_skeleton(frame.copy(), keypoints, scores, openpose_skeleton=True, kpt_thr=0.43)
|
|
except Exception:
|
|
pass
|
|
|
|
return cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
|
|
|
|
def cleanup(self):
|
|
"""Cleans up all resources."""
|
|
# Stop similarity calculation thread
|
|
if self.similarity_thread and self.similarity_thread.is_alive():
|
|
self.similarity_stop_flag.set()
|
|
self.similarity_thread.join(timeout=2)
|
|
|
|
if self.standard_cap:
|
|
self.standard_cap.release()
|
|
if self.webcam_cap:
|
|
self.webcam_cap.release()
|
|
if self.is_realsense_active and self.realsense_pipeline:
|
|
self.realsense_pipeline.stop()
|
|
self.audio_player.cleanup()
|
|
self.is_running = False
|