feat(motion_app): implement asynchronous similarity calculation

This commit is contained in:
game-loader
2025-06-22 15:17:07 +08:00
parent 53979d9501
commit 1d94e77557
3 changed files with 128 additions and 44 deletions

View File

@@ -1,6 +1,7 @@
import numpy as np
import math
import time
import threading
from collections import deque
import plotly.graph_objects as go
@@ -11,6 +12,7 @@ class PoseSimilarityAnalyzer:
self.similarity_history = deque(maxlen=500)
self.frame_timestamps = deque(maxlen=500)
self.start_time = None
self._lock = threading.Lock() # Thread safety for shared data
self.keypoint_map = {
'nose': 0, 'neck': 1, 'left_shoulder': 2, 'left_elbow': 3, 'left_wrist': 4,
@@ -94,21 +96,27 @@ class PoseSimilarityAnalyzer:
def add_similarity_score(self, score, timestamp=None):
"""Adds a similarity score to the history."""
if self.start_time is None: self.start_time = time.time()
timestamp = timestamp if timestamp is not None else time.time() - self.start_time
self.similarity_history.append(float(score))
self.frame_timestamps.append(float(timestamp))
with self._lock:
if self.start_time is None: self.start_time = time.time()
timestamp = timestamp if timestamp is not None else time.time() - self.start_time
self.similarity_history.append(float(score))
self.frame_timestamps.append(float(timestamp))
def get_similarity_plot(self):
"""Generates a Plotly figure for the similarity history."""
if len(self.similarity_history) < 2: return None
with self._lock:
if len(self.similarity_history) < 2: return None
# Create copies to avoid data changes during plotting
timestamps_copy = list(self.frame_timestamps)
history_copy = list(self.similarity_history)
fig = go.Figure()
fig.add_trace(go.Scatter(x=list(self.frame_timestamps), y=list(self.similarity_history),
fig.add_trace(go.Scatter(x=timestamps_copy, y=history_copy,
mode='lines+markers', name='Similarity',
line=dict(color='#2E86AB', width=2), marker=dict(size=4)))
avg_score = sum(self.similarity_history) / len(self.similarity_history)
avg_score = sum(history_copy) / len(history_copy)
fig.add_hline(y=avg_score, line_dash="dash", line_color="red",
annotation_text=f"Avg: {avg_score:.1f}%")
@@ -119,6 +127,7 @@ class PoseSimilarityAnalyzer:
def reset(self):
"""Resets the analyzer's history."""
self.similarity_history.clear()
self.frame_timestamps.clear()
self.start_time = None
with self._lock:
self.similarity_history.clear()
self.frame_timestamps.clear()
self.start_time = None