# mustafa2ak's picture
# Create health_assessment.py
# e6fe86b verified
"""
health_assessment.py - Dog Health Analysis from Keypoints
"""
import logging
from collections import deque

import numpy as np
class DogHealthAssessor:
    """Heuristic posture and gait scorer driven by 24 pose keypoints per frame.

    Every frame is an array with 24 rows of ``(x, y, confidence)``.  The row
    indices used here are the ones this file names: 0 nose, 1/2 eyes,
    3/4 ears, 7-14 spine (10 withers, 14 tail), 9 shoulder, 17/19 front paws,
    18/20 front elbows, 21/22 hips.

    Frames that pass the visibility and side-view gates are kept in a bounded
    buffer.  With fewer than 8 buffered frames a static "quick" assessment is
    returned; from 8 frames on, a temporal assessment that adds gait and
    joint scores is returned.  Note that a ``temporal_window`` below 8 can
    therefore never reach the temporal path.

    NOTE(review): the absolute thresholds (15, 12, 20, 50, and the curvature
    limits) presumably assume pixel coordinates at a fixed image scale --
    confirm against the keypoint producer.
    """

    _log = logging.getLogger(__name__)

    def __init__(self, temporal_window=15):
        """Create an assessor that keeps at most ``temporal_window`` frames."""
        self.temporal_window = temporal_window
        self.frame_buffer = deque(maxlen=temporal_window)
        # Inclusive [first, last] keypoint index of each spine segment.
        self.spine_segments = {
            'cervical': [7, 10],
            'thoracic': [10, 12],
            'lumbar': [12, 14]
        }
        # NOTE(review): both front legs share shoulder index 9 -- confirm
        # this matches the keypoint schema.
        self.joints = {
            'front_left': {'shoulder': 9, 'elbow': 18, 'paw': 17},
            'front_right': {'shoulder': 9, 'elbow': 20, 'paw': 19}
        }
        # A keypoint counts as visible when its confidence exceeds this.
        self.vis_threshold = 0.5
        # Whole-spine quadratic coefficient of the last "healthy" frame.
        self.reference_spine_coeffs = None

    # ------------------------------------------------------------------
    # Small shared helpers
    # ------------------------------------------------------------------
    def _is_visible(self, *points):
        """Return True when every given keypoint exceeds ``vis_threshold``."""
        return all(p[2] > self.vis_threshold for p in points)

    @staticmethod
    def _inclination(vec):
        """Signed angle of ``vec`` against the horizontal axis, in radians.

        ``|dx|`` is used so that the result does not depend on whether the
        dog faces left or right in the image.
        """
        return np.arctan2(vec[1], np.abs(vec[0]))

    @staticmethod
    def _status(overall):
        """Map an overall 0-10 score onto the status label."""
        if overall >= 8:
            return 'Healthy'
        if overall >= 5:
            return 'Monitor'
        return 'Concern'

    def _whole_spine_curvature(self, kpts):
        """Quadratic coefficient of a parabola fitted through keypoints 7-14.

        Returns ``None`` when fewer than 5 of those keypoints are visible.
        """
        pts = [kpts[i][:2] for i in range(7, 15) if self._is_visible(kpts[i])]
        if len(pts) < 5:
            return None
        pts = np.array(pts)
        return np.polyfit(pts[:, 0], pts[:, 1], 2)[0]

    @staticmethod
    def _vertical_speeds(traj):
        """Absolute vertical speed between consecutive ``(ts, x, y)`` samples.

        Sample pairs whose time step is not positive are skipped.
        """
        speeds = []
        for (t0, _, y0), (t1, _, y1) in zip(traj, traj[1:]):
            dt = t1 - t0
            if dt > 1e-7:
                speeds.append(np.abs((y1 - y0) / (dt + 1e-7)))
        return speeds

    # ------------------------------------------------------------------
    # Public entry point
    # ------------------------------------------------------------------
    def assess_health(self, keypoints_24, timestamp=0.0):
        """Assess one frame and return a result dict, or ``None``.

        Args:
            keypoints_24: array-like with 24 rows of ``(x, y, confidence)``.
            timestamp: time of the frame; only differences between frames
                are used (for paw velocities).

        Returns:
            ``None`` when the input is missing or malformed, fewer than 12
            keypoints are visible, or the side-view score is below 0.7.
            Otherwise a dict with ``overall``, ``status``, per-part scores,
            ``alerts``, ``recommendations``, ``confidence``,
            ``view_quality`` and ``physical_measurements``.
        """
        if keypoints_24 is None:
            return None
        try:
            keypoints_24 = np.asarray(keypoints_24)
        except (TypeError, ValueError):
            return None
        # Reject anything that is not 24 rows of at least (x, y, confidence)
        # instead of failing later on the column access.
        if (keypoints_24.ndim != 2 or keypoints_24.shape[0] != 24
                or keypoints_24.shape[1] < 3):
            return None

        visible = np.sum(keypoints_24[:, 2] > self.vis_threshold)
        if visible < 12:
            return None

        view_score = self._validate_side_view(keypoints_24)
        if view_score < 0.7:
            return None

        self.frame_buffer.append({
            'kpts': keypoints_24.copy(),
            'ts': timestamp,
            'view': view_score
        })

        if len(self.frame_buffer) < 8:
            result = self._quick_assess(keypoints_24, view_score)
            # NOTE(review): _quick_assess already scales confidence by 0.6,
            # so the quick path ends at 0.36 * view_score -- confirm that
            # this double discount is intended.
            result['confidence'] *= 0.6
            return result

        result = self._temporal_assess_advanced(view_score)
        # Only confidently healthy frames may redefine the spine baseline.
        if result['overall'] >= 8.5 and result['confidence'] > 0.8:
            self._update_baseline(keypoints_24)
        return result

    # ------------------------------------------------------------------
    # View gate
    # ------------------------------------------------------------------
    def _validate_side_view(self, kpts):
        """Score in [0, 1] for how side-on the pose looks; 0.0 on error."""
        score = 1.0
        try:
            l_eye, r_eye = kpts[1], kpts[2]
            l_ear, r_ear = kpts[3], kpts[4]
            nose, withers, tail = kpts[0], kpts[10], kpts[14]
            l_paw, r_paw = kpts[17], kpts[19]

            # Eyes far apart relative to the nose-withers distance.
            if self._is_visible(l_eye, r_eye):
                eye_dist = np.linalg.norm(l_eye[:2] - r_eye[:2])
                head_len = np.linalg.norm(nose[:2] - withers[:2])
                if head_len > 1e-7 and (eye_dist / (head_len + 1e-7)) > 0.25:
                    score -= 0.3

            # Ear-to-ear direction more than 45 degrees off vertical.
            if self._is_visible(l_ear, r_ear):
                ear_vec = r_ear[:2] - l_ear[:2]
                angle = np.abs(np.arctan2(ear_vec[1], ear_vec[0]))
                if np.abs(angle - np.pi / 2) > np.pi / 4:
                    score -= 0.3

            # Front paws far apart horizontally relative to body length.
            body_len = np.linalg.norm(nose[:2] - tail[:2])
            if self._is_visible(l_paw, r_paw):
                paw_gap = np.abs(l_paw[0] - r_paw[0])
                if paw_gap > 1e-7 and (body_len / (paw_gap + 1e-7)) < 1.5:
                    score -= 0.2
                if body_len > 1e-7 and (paw_gap / (body_len + 1e-7)) > 0.3:
                    score -= 0.2
        except Exception:
            # Best effort: a frame that cannot be evaluated is treated as
            # unusable rather than crashing the caller.
            self._log.debug("side-view validation failed", exc_info=True)
            score = 0.0
        return max(0.0, min(1.0, score))

    # ------------------------------------------------------------------
    # Aggregation
    # ------------------------------------------------------------------
    def _quick_assess(self, kpts, view_score):
        """Static single-frame assessment (head 2 + spine 3 + legs 3 + body 2)."""
        h_score, h_alerts = self._assess_head(kpts)
        s_score, s_alerts = self._assess_spine_segmented(kpts)
        l_score, l_alerts = self._assess_legs_static(kpts)
        b_score, b_alerts = self._assess_body(kpts)

        overall = h_score + s_score + l_score + b_score
        all_alerts = h_alerts + s_alerts + l_alerts + b_alerts
        return {
            'overall': overall,
            'status': self._status(overall),
            'head': h_score,
            'spine': s_score,
            'legs': l_score,
            'body': b_score,
            'alerts': all_alerts,
            'recommendations': self._gen_recs(
                overall, h_score, s_score, l_score, b_score
            ),
            'confidence': view_score * 0.6,
            'view_quality': view_score,
            'physical_measurements': {}
        }

    def _temporal_assess_advanced(self, view_score):
        """Assessment over the frame buffer, adding gait and joint scores.

        The five part scores sum to at most 12 and are rescaled to 0-10.
        """
        frames = list(self.frame_buffer)
        curr_kpts = frames[-1]['kpts']

        h_score, h_alerts = self._assess_head(curr_kpts)
        s_score, s_alerts = self._assess_spine_segmented(curr_kpts)
        l_score, l_alerts, l_phys = self._assess_gait_advanced(frames)
        j_score, j_alerts = self._assess_joints(curr_kpts)
        b_score, b_alerts = self._assess_body(curr_kpts)

        overall = (h_score + s_score + l_score + j_score + b_score) / 12 * 10
        temporal_consistency = self._calc_consistency(frames)
        all_alerts = h_alerts + s_alerts + l_alerts + j_alerts + b_alerts
        return {
            'overall': overall,
            'status': self._status(overall),
            'head': h_score,
            'spine': s_score,
            'legs': l_score,
            'joints': j_score,
            'body': b_score,
            'alerts': all_alerts,
            'recommendations': self._gen_recs_advanced(
                overall, h_score, s_score, l_score, j_score, b_score
            ),
            'confidence': min(1.0, view_score * temporal_consistency),
            'view_quality': view_score,
            'physical_measurements': l_phys
        }

    # ------------------------------------------------------------------
    # Per-part assessments
    # ------------------------------------------------------------------
    def _assess_head(self, kpts):
        """Score the head (max 2.0); returns ``(score, alerts)``."""
        score, alerts = 2.0, []
        try:
            nose, withers = kpts[0], kpts[10]
            if self._is_visible(nose, withers):
                # Inclination of the withers->nose line against horizontal,
                # independent of the direction the dog is facing.
                tilt = np.abs(self._inclination(nose[:2] - withers[:2]))
                if tilt > np.pi / 3:
                    score -= 0.7
                    alerts.append("Abnormal head angle")
                elif tilt > np.pi / 6:
                    score -= 0.3
                    alerts.append("Mild head tilt")

            l_eye, r_eye = kpts[1], kpts[2]
            if self._is_visible(l_eye, r_eye):
                h_diff = np.abs(l_eye[1] - r_eye[1])
                eye_dist = np.linalg.norm(l_eye[:2] - r_eye[:2])
                if eye_dist > 1e-7 and (h_diff / (eye_dist + 1e-7)) > 0.3:
                    score -= 0.5
                    alerts.append("Head tilt detected")
        except Exception:
            self._log.debug("head assessment failed", exc_info=True)
        return max(0.0, score), alerts

    def _assess_spine_segmented(self, kpts):
        """Score spine curvature (max 3.0); returns ``(score, alerts)``.

        A parabola is fitted per segment with at least 3 visible points and
        its quadratic coefficient is compared with fixed limits.  When a
        baseline exists, the whole-spine coefficient is compared with it.
        """
        score, alerts = 3.0, []
        try:
            segment_results = {}
            for seg_name, (first, last) in self.spine_segments.items():
                pts = [kpts[i][:2] for i in range(first, last + 1)
                       if self._is_visible(kpts[i])]
                if len(pts) < 3:
                    continue
                pts = np.array(pts)
                a = np.polyfit(pts[:, 0], pts[:, 1], 2)[0]
                segment_results[seg_name] = a

                if seg_name == 'thoracic':
                    if np.abs(a) > 0.003:
                        score -= 1.5
                        alerts.append("Mid-back excessive curve")
                    elif np.abs(a) > 0.002:
                        score -= 0.8
                        alerts.append("Mid-back moderate curve")
                elif np.abs(a) > 0.006:
                    score -= 0.5
                    alerts.append(f"{seg_name} section high curve")

            if segment_results:
                avg_a = np.mean(list(segment_results.values()))
                if np.abs(avg_a) > 0.005:
                    score -= 0.5
                    alerts.append("Overall spine posture concern")

            # `is not None`: a baseline of exactly 0.0 is still a baseline.
            if self.reference_spine_coeffs is not None:
                # Compare like with like: the baseline is a whole-spine fit,
                # so the current value must be one as well.
                current = self._whole_spine_curvature(kpts)
                if (current is not None
                        and np.abs(current - self.reference_spine_coeffs) > 0.001):
                    score -= 0.3
                    alerts.append("Spine changed from baseline")
        except Exception:
            self._log.debug("spine assessment failed", exc_info=True)
        return max(0.0, score), alerts

    def _assess_legs_static(self, kpts):
        """Score single-frame leg symmetry (max 3.0); returns ``(score, alerts)``."""
        score, alerts = 3.0, []
        try:
            l_paw, r_paw = kpts[17], kpts[19]
            if self._is_visible(l_paw, r_paw):
                if np.abs(l_paw[1] - r_paw[1]) > 15:
                    score -= 1.0
                    alerts.append("Front leg asymmetry")

            l_hip, r_hip = kpts[21], kpts[22]
            if self._is_visible(l_hip, r_hip):
                if np.abs(l_hip[1] - r_hip[1]) > 12:
                    score -= 0.5
                    alerts.append("Hip asymmetry")
        except Exception:
            self._log.debug("static leg assessment failed", exc_info=True)
        return max(0.0, score), alerts

    def _assess_gait_advanced(self, frames):
        """Score front-paw gait over buffered frames (max 3.0).

        Args:
            frames: list of dicts with ``'kpts'`` and ``'ts'`` entries.

        Returns:
            ``(score, alerts, physical_measurements)``.  With fewer than 8
            frames, or fewer than 3 visible samples of either paw, the
            untouched defaults ``(3.0, [], {})`` are returned.
        """
        score, alerts, phys = 3.0, [], {}
        try:
            if len(frames) < 8:
                return score, alerts, phys

            l_traj, r_traj, paired_gaps = [], [], []
            for f in frames:
                kpts, ts = f['kpts'], f['ts']
                l_paw, r_paw = kpts[17], kpts[19]
                l_seen = self._is_visible(l_paw)
                r_seen = self._is_visible(r_paw)
                if l_seen:
                    l_traj.append((ts, l_paw[0], l_paw[1]))
                if r_seen:
                    r_traj.append((ts, r_paw[0], r_paw[1]))
                # Height gaps are only meaningful within the same frame, so
                # collect them here instead of pairing the two trajectories
                # by list position afterwards.
                if l_seen and r_seen:
                    paired_gaps.append(np.abs(l_paw[1] - r_paw[1]))

            if len(l_traj) < 3 or len(r_traj) < 3:
                return score, alerts, phys

            l_heights = [y for _, _, y in l_traj]
            r_heights = [y for _, _, y in r_traj]
            pos_asym = np.abs(np.mean(l_heights) - np.mean(r_heights))
            threshold_pos = 20

            l_velocities = self._vertical_speeds(l_traj)
            r_velocities = self._vertical_speeds(r_traj)
            if l_velocities and r_velocities:
                mean_vel_l = np.mean(l_velocities)
                mean_vel_r = np.mean(r_velocities)
                vel_asym = np.abs(mean_vel_l - mean_vel_r)
                max_vel = max(mean_vel_l, mean_vel_r)
                vel_asym_ratio = vel_asym / (max_vel + 1e-7) if max_vel > 1e-7 else 0
                phys['velocity_asymmetry'] = f"{vel_asym_ratio*100:.1f}%"
                if vel_asym_ratio > 0.3:
                    score -= 0.8
                    alerts.append(f"Velocity asymmetry detected ({vel_asym_ratio*100:.0f}%)")

            # Share of all frames in which both paws were seen and their
            # heights differed clearly; frames with a hidden paw count as
            # "no evidence of asymmetry".
            consistent_count = sum(
                1 for gap in paired_gaps if gap > threshold_pos * 0.5
            )
            consistency_ratio = consistent_count / (len(frames) + 1e-7)
            if pos_asym > threshold_pos and consistency_ratio > 0.6:
                score -= 1.2
                alerts.append("Limping detected (position asymmetry)")
            elif pos_asym > threshold_pos * 0.6 and consistency_ratio > 0.5:
                score -= 0.6
                alerts.append("Possible limping (mild asymmetry)")

            # NOTE(review): irregularity is only checked on the left paw --
            # confirm whether the right paw should be checked as well.
            if len(l_heights) > 2:
                l_var = np.var(np.diff(l_heights))
                if l_var > 50:
                    score -= 0.4
                    alerts.append("Irregular gait pattern")
        except Exception:
            self._log.debug("gait assessment failed", exc_info=True)
        return max(0.0, score), alerts, phys

    def _assess_joints(self, kpts):
        """Score front elbow angles (max 2.0); returns ``(score, alerts)``."""
        score, alerts = 2.0, []
        try:
            for side, jnts in self.joints.items():
                shoulder = kpts[jnts['shoulder']]
                elbow = kpts[jnts['elbow']]
                paw = kpts[jnts['paw']]
                if not self._is_visible(shoulder, elbow, paw):
                    continue

                # Angle at the elbow between the shoulder and paw directions.
                vec_a = shoulder[:2] - elbow[:2]
                vec_b = paw[:2] - elbow[:2]
                mag_a = np.linalg.norm(vec_a) + 1e-7
                mag_b = np.linalg.norm(vec_b) + 1e-7
                cos_angle = np.clip(np.dot(vec_a, vec_b) / (mag_a * mag_b), -1.0, 1.0)
                angle_deg = np.degrees(np.arccos(cos_angle))

                if angle_deg > 165:
                    score -= 0.7
                    alerts.append(f"{side} leg stiff ({angle_deg:.0f}° elbow)")
                elif angle_deg < 100:
                    score -= 0.5
                    alerts.append(f"{side} leg excessive flexion")
        except Exception:
            self._log.debug("joint assessment failed", exc_info=True)
        return max(0.0, score), alerts

    def _assess_body(self, kpts):
        """Score the withers-tail line (max 2.0); returns ``(score, alerts)``."""
        score, alerts = 2.0, []
        try:
            withers, tail = kpts[10], kpts[14]
            if self._is_visible(withers, tail):
                # Inclination against horizontal, independent of the
                # direction the dog is facing.
                body_angle = self._inclination(tail[:2] - withers[:2])
                if np.abs(body_angle) > np.pi / 6:
                    score -= 0.5
                    alerts.append("Body alignment issue")
        except Exception:
            self._log.debug("body assessment failed", exc_info=True)
        return max(0.0, score), alerts

    # ------------------------------------------------------------------
    # Temporal helpers
    # ------------------------------------------------------------------
    def _calc_consistency(self, frames):
        """Return a 0.5-1.0 factor from the spread of buffered view scores.

        Falls back to 0.7 with fewer than 3 frames or on error.
        """
        if len(frames) < 3:
            return 0.7
        try:
            view_scores = [f['view'] for f in frames]
            consistency = 1.0 - (np.std(view_scores) / (np.mean(view_scores) + 1e-7))
            return max(0.5, min(1.0, consistency))
        except Exception:
            self._log.debug("consistency calculation failed", exc_info=True)
            return 0.7

    def _update_baseline(self, kpts):
        """Store the whole-spine curvature of ``kpts`` as the new baseline.

        The baseline is left unchanged when fewer than 5 spine keypoints
        are visible or the fit fails.
        """
        try:
            curvature = self._whole_spine_curvature(kpts)
            if curvature is not None:
                self.reference_spine_coeffs = curvature
        except Exception:
            self._log.debug("baseline update failed", exc_info=True)

    # ------------------------------------------------------------------
    # Recommendations
    # ------------------------------------------------------------------
    def _gen_recs(self, overall, h, s, l, b):
        """Recommendations for the quick assessment (``b`` is not used)."""
        recs = []
        if h < 1.5:
            recs.append("Monitor head posture")
        if s < 2.0:
            recs.append("Check spine alignment")
        if l < 2.0:
            recs.append("Examine leg symmetry")
        if overall < 5:
            recs.append("Veterinary examination recommended")
        return recs

    def _gen_recs_advanced(self, overall, h, s, l, j, b):
        """Recommendations for the temporal assessment (``b`` is not used)."""
        recs = []
        if h < 1.5:
            recs.append("Monitor head posture")
        if s < 2.0:
            recs.append("Check spine alignment")
        if l < 2.0:
            recs.append("Examine gait and leg movement")
        if j < 1.5:
            recs.append("Check joint flexibility")
        if overall < 5:
            recs.append("Veterinary examination recommended")
        elif overall < 7:
            recs.append("Continue monitoring")
        return recs