147 lines
6.5 KiB
Python
147 lines
6.5 KiB
Python
import numpy as np
|
|
import math
|
|
import time
|
|
import threading
|
|
from collections import deque
|
|
import plotly.graph_objects as go
|
|
|
|
class PoseSimilarityAnalyzer:
|
|
"""Analyzes pose similarity based on joint angles."""
|
|
|
|
def __init__(self):
|
|
self.similarity_history = deque(maxlen=500)
|
|
self.frame_timestamps = deque(maxlen=500)
|
|
self.start_time = None
|
|
self._lock = threading.Lock() # Thread safety for shared data
|
|
|
|
self.keypoint_map = {
|
|
'nose': 0, 'neck': 1, 'left_shoulder': 2, 'left_elbow': 3, 'left_wrist': 4,
|
|
'right_shoulder': 5, 'right_elbow': 6, 'right_wrist': 7, 'left_hip': 8,
|
|
'left_knee': 9, 'left_ankle': 10, 'right_hip': 11, 'right_knee': 12,
|
|
'right_ankle': 13, 'left_eye': 14, 'right_eye': 15, 'left_ear': 16, 'right_ear': 17
|
|
}
|
|
|
|
self.joint_angles = {
|
|
'left_elbow': ['left_shoulder', 'left_elbow', 'left_wrist'],
|
|
'right_elbow': ['right_shoulder', 'right_elbow', 'right_wrist'],
|
|
'left_shoulder': ['left_elbow', 'left_shoulder', 'neck'],
|
|
'right_shoulder': ['right_elbow', 'right_shoulder', 'neck'],
|
|
'left_knee': ['left_hip', 'left_knee', 'left_ankle'],
|
|
'right_knee': ['right_hip', 'right_knee', 'right_ankle'],
|
|
'left_hip': ['left_knee', 'left_hip', 'neck'],
|
|
'right_hip': ['right_knee', 'right_hip', 'neck'],
|
|
}
|
|
|
|
self.joint_weights = {
|
|
'left_elbow': 1.2, 'right_elbow': 1.2, 'left_shoulder': 1.0, 'right_shoulder': 1.0,
|
|
'left_knee': 1.3, 'right_knee': 1.3, 'left_hip': 1.1, 'right_hip': 1.1
|
|
}
|
|
|
|
def calculate_angle(self, p1, p2, p3):
|
|
"""Calculates the angle formed by three points."""
|
|
try:
|
|
v1 = np.array([p1[0] - p2[0], p1[1] - p2[1]], dtype=np.float64)
|
|
v2 = np.array([p3[0] - p2[0], p3[1] - p2[1]], dtype=np.float64)
|
|
v1_norm = np.linalg.norm(v1)
|
|
v2_norm = np.linalg.norm(v2)
|
|
if v1_norm == 0 or v2_norm == 0: return None
|
|
|
|
cos_angle = np.dot(v1, v2) / (v1_norm * v2_norm)
|
|
cos_angle = np.clip(cos_angle, -1.0, 1.0)
|
|
angle = np.arccos(cos_angle)
|
|
return np.degrees(angle)
|
|
except Exception:
|
|
return None
|
|
|
|
def extract_joint_angles(self, keypoints, scores, confidence_threshold=0.3):
|
|
"""Extracts all defined joint angles from keypoints."""
|
|
if keypoints is None or len(keypoints) == 0:
|
|
return None
|
|
|
|
try:
|
|
person_kpts = keypoints[0] if len(keypoints.shape) > 2 else keypoints
|
|
person_scores = scores[0] if len(scores.shape) > 1 else scores
|
|
|
|
angles = {}
|
|
for joint, (p1_n, p2_n, p3_n) in self.joint_angles.items():
|
|
p1_idx, p2_idx, p3_idx = self.keypoint_map[p1_n], self.keypoint_map[p2_n], self.keypoint_map[p3_n]
|
|
|
|
if max(p1_idx, p2_idx, p3_idx) >= len(person_scores): continue
|
|
|
|
if all(s > confidence_threshold for s in [person_scores[p1_idx], person_scores[p2_idx], person_scores[p3_idx]]):
|
|
angle = self.calculate_angle(person_kpts[p1_idx], person_kpts[p2_idx], person_kpts[p3_idx])
|
|
if angle is not None:
|
|
angles[joint] = angle
|
|
return angles
|
|
except Exception:
|
|
return None
|
|
|
|
def calculate_similarity(self, angles1, angles2):
|
|
"""Calculates similarity score between two sets of joint angles."""
|
|
if not angles1 or not angles2: return 0.0
|
|
|
|
common_joints = set(angles1.keys()) & set(angles2.keys())
|
|
if not common_joints: return 0.0
|
|
|
|
total_weight, weighted_similarity = 0, 0
|
|
for joint in common_joints:
|
|
angle_diff = abs(angles1[joint] - angles2[joint])
|
|
similarity = math.exp(-(angle_diff ** 2) / (2 * (30 ** 2)))
|
|
weight = self.joint_weights.get(joint, 1.0)
|
|
weighted_similarity += similarity * weight
|
|
total_weight += weight
|
|
|
|
final_similarity = (weighted_similarity / total_weight) * 100 if total_weight > 0 else 0
|
|
return min(max(final_similarity, 0), 100)
|
|
|
|
def calculate_joint_similarities(self, angles1, angles2):
|
|
"""计算每个关节的相似度"""
|
|
joint_similarities = {}
|
|
if not angles1 or not angles2: return joint_similarities
|
|
|
|
common_joints = set(angles1.keys()) & set(angles2.keys())
|
|
for joint in common_joints:
|
|
angle_diff = abs(angles1[joint] - angles2[joint])
|
|
similarity = math.exp(-(angle_diff ** 2) / (2 * (30 ** 2)))
|
|
joint_similarities[joint] = min(max(similarity * 100, 0), 100)
|
|
|
|
return joint_similarities
|
|
|
|
def add_similarity_score(self, score, timestamp=None):
|
|
"""Adds a similarity score to the history."""
|
|
with self._lock:
|
|
if self.start_time is None: self.start_time = time.time()
|
|
timestamp = timestamp if timestamp is not None else time.time() - self.start_time
|
|
self.similarity_history.append(float(score))
|
|
self.frame_timestamps.append(float(timestamp))
|
|
|
|
def get_similarity_plot(self):
|
|
"""Generates a Plotly figure for the similarity history."""
|
|
with self._lock:
|
|
if len(self.similarity_history) < 2: return None
|
|
|
|
# Create copies to avoid data changes during plotting
|
|
timestamps_copy = list(self.frame_timestamps)
|
|
history_copy = list(self.similarity_history)
|
|
|
|
fig = go.Figure()
|
|
fig.add_trace(go.Scatter(x=timestamps_copy, y=history_copy,
|
|
mode='lines+markers', name='Similarity',
|
|
line=dict(color='#2E86AB', width=2), marker=dict(size=4)))
|
|
|
|
avg_score = sum(history_copy) / len(history_copy)
|
|
fig.add_hline(y=avg_score, line_dash="dash", line_color="red",
|
|
annotation_text=f"Avg: {avg_score:.1f}%")
|
|
|
|
fig.update_layout(title='Similarity Trend', xaxis_title='Time (s)',
|
|
yaxis_title='Score (%)', yaxis=dict(range=[0, 100]),
|
|
height=250, margin=dict(l=50, r=50, t=50, b=50), showlegend=False)
|
|
return fig
|
|
|
|
def reset(self):
|
|
"""Resets the analyzer's history."""
|
|
with self._lock:
|
|
self.similarity_history.clear()
|
|
self.frame_timestamps.clear()
|
|
self.start_time = None
|