import numpy as np import math import time import threading from collections import deque import plotly.graph_objects as go class PoseSimilarityAnalyzer: """Analyzes pose similarity based on joint angles.""" def __init__(self): self.similarity_history = deque(maxlen=500) self.frame_timestamps = deque(maxlen=500) self.start_time = None self._lock = threading.Lock() # Thread safety for shared data self.keypoint_map = { 'nose': 0, 'neck': 1, 'left_shoulder': 2, 'left_elbow': 3, 'left_wrist': 4, 'right_shoulder': 5, 'right_elbow': 6, 'right_wrist': 7, 'left_hip': 8, 'left_knee': 9, 'left_ankle': 10, 'right_hip': 11, 'right_knee': 12, 'right_ankle': 13, 'left_eye': 14, 'right_eye': 15, 'left_ear': 16, 'right_ear': 17 } self.joint_angles = { 'left_elbow': ['left_shoulder', 'left_elbow', 'left_wrist'], 'right_elbow': ['right_shoulder', 'right_elbow', 'right_wrist'], 'left_shoulder': ['left_elbow', 'left_shoulder', 'neck'], 'right_shoulder': ['right_elbow', 'right_shoulder', 'neck'], 'left_knee': ['left_hip', 'left_knee', 'left_ankle'], 'right_knee': ['right_hip', 'right_knee', 'right_ankle'], 'left_hip': ['left_knee', 'left_hip', 'neck'], 'right_hip': ['right_knee', 'right_hip', 'neck'], } self.joint_weights = { 'left_elbow': 1.2, 'right_elbow': 1.2, 'left_shoulder': 1.0, 'right_shoulder': 1.0, 'left_knee': 1.3, 'right_knee': 1.3, 'left_hip': 1.1, 'right_hip': 1.1 } def calculate_angle(self, p1, p2, p3): """Calculates the angle formed by three points.""" try: v1 = np.array([p1[0] - p2[0], p1[1] - p2[1]], dtype=np.float64) v2 = np.array([p3[0] - p2[0], p3[1] - p2[1]], dtype=np.float64) v1_norm = np.linalg.norm(v1) v2_norm = np.linalg.norm(v2) if v1_norm == 0 or v2_norm == 0: return None cos_angle = np.dot(v1, v2) / (v1_norm * v2_norm) cos_angle = np.clip(cos_angle, -1.0, 1.0) angle = np.arccos(cos_angle) return np.degrees(angle) except Exception: return None def extract_joint_angles(self, keypoints, scores, confidence_threshold=0.3): """Extracts all defined joint angles from keypoints.""" if keypoints is None or len(keypoints) == 0: return None try: person_kpts = keypoints[0] if len(keypoints.shape) > 2 else keypoints person_scores = scores[0] if len(scores.shape) > 1 else scores angles = {} for joint, (p1_n, p2_n, p3_n) in self.joint_angles.items(): p1_idx, p2_idx, p3_idx = self.keypoint_map[p1_n], self.keypoint_map[p2_n], self.keypoint_map[p3_n] if max(p1_idx, p2_idx, p3_idx) >= len(person_scores): continue if all(s > confidence_threshold for s in [person_scores[p1_idx], person_scores[p2_idx], person_scores[p3_idx]]): angle = self.calculate_angle(person_kpts[p1_idx], person_kpts[p2_idx], person_kpts[p3_idx]) if angle is not None: angles[joint] = angle return angles except Exception: return None def calculate_similarity(self, angles1, angles2): """Calculates similarity score between two sets of joint angles.""" if not angles1 or not angles2: return 0.0 common_joints = set(angles1.keys()) & set(angles2.keys()) if not common_joints: return 0.0 total_weight, weighted_similarity = 0, 0 for joint in common_joints: angle_diff = abs(angles1[joint] - angles2[joint]) similarity = math.exp(-(angle_diff ** 2) / (2 * (30 ** 2))) weight = self.joint_weights.get(joint, 1.0) weighted_similarity += similarity * weight total_weight += weight final_similarity = (weighted_similarity / total_weight) * 100 if total_weight > 0 else 0 return min(max(final_similarity, 0), 100) def calculate_joint_similarities(self, angles1, angles2): """计算每个关节的相似度""" joint_similarities = {} if not angles1 or not angles2: return joint_similarities common_joints = set(angles1.keys()) & set(angles2.keys()) for joint in common_joints: angle_diff = abs(angles1[joint] - angles2[joint]) similarity = math.exp(-(angle_diff ** 2) / (2 * (30 ** 2))) joint_similarities[joint] = min(max(similarity * 100, 0), 100) return joint_similarities def add_similarity_score(self, score, timestamp=None): """Adds a similarity score to the history.""" with self._lock: if self.start_time is None: self.start_time = time.time() timestamp = timestamp if timestamp is not None else time.time() - self.start_time self.similarity_history.append(float(score)) self.frame_timestamps.append(float(timestamp)) def get_similarity_plot(self): """Generates a Plotly figure for the similarity history.""" with self._lock: if len(self.similarity_history) < 2: return None # Create copies to avoid data changes during plotting timestamps_copy = list(self.frame_timestamps) history_copy = list(self.similarity_history) fig = go.Figure() fig.add_trace(go.Scatter(x=timestamps_copy, y=history_copy, mode='lines+markers', name='Similarity', line=dict(color='#2E86AB', width=2), marker=dict(size=4))) avg_score = sum(history_copy) / len(history_copy) fig.add_hline(y=avg_score, line_dash="dash", line_color="red", annotation_text=f"Avg: {avg_score:.1f}%") fig.update_layout(title='Similarity Trend', xaxis_title='Time (s)', yaxis_title='Score (%)', yaxis=dict(range=[0, 100]), height=250, margin=dict(l=50, r=50, t=50, b=50), showlegend=False) return fig def reset(self): """Resets the analyzer's history.""" with self._lock: self.similarity_history.clear() self.frame_timestamps.clear() self.start_time = None