"""
health_assessment.py - Dog Health Analysis from Keypoints
"""
import logging
from collections import deque

import numpy as np
class DogHealthAssessor:
    """Heuristic posture and gait scoring for a dog from pose keypoints.

    Every frame is a ``(24, 3)`` array whose rows are read as
    ``(x, y, visibility)``.  The rows this class uses are 0 (nose), 1-2
    (eyes), 3-4 (ears), 7-14 (spine; 9 shoulder, 10 withers, 14 tail), 17/19
    (front paws), 18/20 (elbows) and 21/22 (hips).  Distance thresholds are
    absolute coordinate units (presumably image pixels -- confirm against the
    pose model feeding this class).

    Each body region starts from a fixed budget (head 2, spine 3, legs 3,
    joints 2, body 2) and loses points per finding; ``overall`` is always
    reported on a 0-10 scale.
    """

    # Frames that must be buffered before the temporal assessment is used.
    _MIN_TEMPORAL_FRAMES = 8
    # First and last keypoint index of the full spine chain.
    _SPINE_RANGE = (7, 14)
    # Errors that mean "this measurement could not be computed from the given
    # keypoints".  They are logged and the partial score is kept; anything
    # else (KeyboardInterrupt, programming errors, ...) propagates.
    _RECOVERABLE_ERRORS = (
        IndexError, KeyError, TypeError, ValueError, ArithmeticError,
    )
    _log = logging.getLogger(__name__)

    def __init__(self, temporal_window=15):
        """Create an assessor that buffers up to ``temporal_window`` frames.

        Note that the temporal assessment needs 8 buffered frames, so a
        ``temporal_window`` below 8 means only quick assessments are produced.
        """
        self.temporal_window = temporal_window
        self.frame_buffer = deque(maxlen=temporal_window)
        # Inclusive keypoint index ranges of each spine segment.
        self.spine_segments = {
            'cervical': [7, 10],
            'thoracic': [10, 12],
            'lumbar': [12, 14],
        }
        # Keypoint indices of each front-leg joint chain.
        self.joints = {
            'front_left': {'shoulder': 9, 'elbow': 18, 'paw': 17},
            'front_right': {'shoulder': 9, 'elbow': 20, 'paw': 19},
        }
        self.vis_threshold = 0.5
        # Whole-spine quadratic coefficient of the last "healthy" frame.
        self.reference_spine_coeffs = None

    # ------------------------------------------------------------------
    # Public entry point
    # ------------------------------------------------------------------
    def assess_health(self, keypoints_24, timestamp=0.0):
        """Assess one frame and return a result dict, or ``None`` if unusable.

        Args:
            keypoints_24: ``(24, >=3)`` array-like of ``(x, y, visibility)``.
            timestamp: Frame time; only differences between frames are used.

        Returns:
            ``None`` when the input is missing or mis-shaped, fewer than 12
            keypoints are visible, or the side-view score is below 0.7.
            Otherwise a dict with ``overall``, ``status``, the per-region
            scores, ``alerts``, ``recommendations``, ``confidence``,
            ``view_quality`` and ``physical_measurements``.  Until 8 usable
            frames are buffered the single-frame assessment is returned (no
            ``joints`` key) and its confidence is scaled by a further 0.6 on
            top of the 0.6 applied in ``_quick_assess``.
        """
        if keypoints_24 is None:
            return None
        try:
            keypoints_24 = np.asarray(keypoints_24)
        except (TypeError, ValueError):
            return None
        if (keypoints_24.ndim != 2 or keypoints_24.shape[0] != 24
                or keypoints_24.shape[1] < 3):
            return None

        visible = np.count_nonzero(keypoints_24[:, 2] > self.vis_threshold)
        if visible < 12:
            return None

        view_score = self._validate_side_view(keypoints_24)
        if view_score < 0.7:
            return None

        self.frame_buffer.append({
            'kpts': keypoints_24.copy(),
            'ts': timestamp,
            'view': view_score,
        })

        if len(self.frame_buffer) < self._MIN_TEMPORAL_FRAMES:
            result = self._quick_assess(keypoints_24, view_score)
            result['confidence'] *= 0.6
            return result

        result = self._temporal_assess_advanced(view_score)
        # Only a confidently healthy frame is allowed to become the baseline.
        if result['overall'] >= 8.5 and result['confidence'] > 0.8:
            self._update_baseline(keypoints_24)
        return result

    # ------------------------------------------------------------------
    # Small shared helpers
    # ------------------------------------------------------------------
    def _visible(self, *points):
        """Return True when every given keypoint exceeds ``vis_threshold``."""
        return all(p[2] > self.vis_threshold for p in points)

    @staticmethod
    def _inclination(vec):
        """Angle between ``vec`` and the horizontal axis, in ``[0, pi/2]``.

        Folding both components to their absolute value makes the result
        independent of whether the dog faces left or right in the image.
        """
        return float(np.arctan2(abs(vec[1]), abs(vec[0])))

    @staticmethod
    def _status(overall):
        """Map a 0-10 overall score to its status label."""
        if overall >= 8:
            return 'Healthy'
        if overall >= 5:
            return 'Monitor'
        return 'Concern'

    def _visible_xy(self, kpts, first, last):
        """Return the (x, y) of visible keypoints ``first..last`` inclusive."""
        return np.array([
            kpts[i][:2] for i in range(first, last + 1)
            if kpts[i][2] > self.vis_threshold
        ])

    @staticmethod
    def _quadratic_coeff(points):
        """Leading coefficient of the degree-2 fit ``y = a*x^2 + b*x + c``."""
        return np.polyfit(points[:, 0], points[:, 1], 2)[0]

    def _whole_spine_curvature(self, kpts):
        """Quadratic coefficient over the whole spine, or None if < 5 points."""
        pts = self._visible_xy(kpts, *self._SPINE_RANGE)
        if len(pts) < 5:
            return None
        return self._quadratic_coeff(pts)

    @staticmethod
    def _vertical_speeds(track):
        """Absolute vertical speed between consecutive ``(ts, y)`` samples.

        Pairs whose timestamps do not advance are skipped.
        """
        speeds = []
        for (t0, y0), (t1, y1) in zip(track, track[1:]):
            dt = t1 - t0
            if dt > 1e-7:
                speeds.append(abs((y1 - y0) / dt))
        return speeds

    # ------------------------------------------------------------------
    # View validation
    # ------------------------------------------------------------------
    def _validate_side_view(self, kpts):
        """Score in ``[0, 1]`` how much the frame looks like a side view.

        Starts at 1.0 and deducts for widely separated eyes, an ear pair that
        is more horizontal than vertical, and front paws that are far apart
        relative to the nose-tail length.  Any computation error yields 0.0.
        """
        score = 1.0
        try:
            nose, withers, tail = kpts[0], kpts[10], kpts[14]
            eye_a, eye_b = kpts[1], kpts[2]
            ear_a, ear_b = kpts[3], kpts[4]
            left_paw, right_paw = kpts[17], kpts[19]

            if self._visible(eye_a, eye_b):
                eye_dist = np.linalg.norm(eye_a[:2] - eye_b[:2])
                head_len = np.linalg.norm(nose[:2] - withers[:2])
                if head_len > 1e-7 and eye_dist / head_len > 0.25:
                    score -= 0.3

            if self._visible(ear_a, ear_b):
                ear_vec = ear_b[:2] - ear_a[:2]
                angle = abs(np.arctan2(ear_vec[1], ear_vec[0]))
                if abs(angle - np.pi / 2) > np.pi / 4:
                    score -= 0.3

            # Paw coordinates are only meaningful when both paws are visible,
            # so both paw-based checks live under the same guard.
            if self._visible(left_paw, right_paw):
                body_len = np.linalg.norm(nose[:2] - tail[:2])
                paw_gap = abs(left_paw[0] - right_paw[0])
                if paw_gap > 1e-7 and body_len / paw_gap < 1.5:
                    score -= 0.2
                if body_len > 1e-7 and paw_gap / body_len > 0.3:
                    score -= 0.2
        except self._RECOVERABLE_ERRORS:
            self._log.debug("Side-view validation failed", exc_info=True)
            score = 0.0
        return max(0.0, min(1.0, score))

    # ------------------------------------------------------------------
    # Frame-level aggregations
    # ------------------------------------------------------------------
    def _quick_assess(self, kpts, view_score):
        """Single-frame assessment (head + spine + static legs + body = 10)."""
        h_score, h_alerts = self._assess_head(kpts)
        s_score, s_alerts = self._assess_spine_segmented(kpts)
        l_score, l_alerts = self._assess_legs_static(kpts)
        b_score, b_alerts = self._assess_body(kpts)

        overall = h_score + s_score + l_score + b_score
        return {
            'overall': overall,
            'status': self._status(overall),
            'head': h_score,
            'spine': s_score,
            'legs': l_score,
            'body': b_score,
            'alerts': h_alerts + s_alerts + l_alerts + b_alerts,
            'recommendations': self._gen_recs(
                overall, h_score, s_score, l_score, b_score
            ),
            'confidence': view_score * 0.6,
            'view_quality': view_score,
            'physical_measurements': {},
        }

    def _temporal_assess_advanced(self, view_score):
        """Assessment over the buffered frames, adding gait and joint scores.

        Posture scores come from the newest buffered frame; gait uses the
        whole buffer.  The five budgets sum to 12 and are rescaled to 0-10.
        """
        frames = list(self.frame_buffer)
        latest = frames[-1]['kpts']

        h_score, h_alerts = self._assess_head(latest)
        s_score, s_alerts = self._assess_spine_segmented(latest)
        l_score, l_alerts, l_phys = self._assess_gait_advanced(frames)
        j_score, j_alerts = self._assess_joints(latest)
        b_score, b_alerts = self._assess_body(latest)

        overall = (h_score + s_score + l_score + j_score + b_score) / 12 * 10
        temporal_consistency = self._calc_consistency(frames)
        return {
            'overall': overall,
            'status': self._status(overall),
            'head': h_score,
            'spine': s_score,
            'legs': l_score,
            'joints': j_score,
            'body': b_score,
            'alerts': h_alerts + s_alerts + l_alerts + j_alerts + b_alerts,
            'recommendations': self._gen_recs_advanced(
                overall, h_score, s_score, l_score, j_score, b_score
            ),
            'confidence': min(1.0, view_score * temporal_consistency),
            'view_quality': view_score,
            'physical_measurements': l_phys,
        }

    # ------------------------------------------------------------------
    # Region assessments
    # ------------------------------------------------------------------
    def _assess_head(self, kpts):
        """Score the head (max 2.0); returns ``(score, alerts)``.

        Deducts for a steep nose-withers line and for eyes at clearly
        different heights.
        """
        score, alerts = 2.0, []
        try:
            nose, withers = kpts[0], kpts[10]
            if self._visible(nose, withers):
                # Inclination from horizontal, so a left-facing and a
                # right-facing dog in the same pose score identically.
                angle = self._inclination(nose[:2] - withers[:2])
                if angle > np.pi / 3:
                    score -= 0.7
                    alerts.append("Abnormal head angle")
                elif angle > np.pi / 6:
                    score -= 0.3
                    alerts.append("Mild head tilt")

            l_eye, r_eye = kpts[1], kpts[2]
            if self._visible(l_eye, r_eye):
                h_diff = abs(l_eye[1] - r_eye[1])
                eye_dist = np.linalg.norm(l_eye[:2] - r_eye[:2])
                if eye_dist > 1e-7 and h_diff / eye_dist > 0.3:
                    score -= 0.5
                    alerts.append("Head tilt detected")
        except self._RECOVERABLE_ERRORS:
            self._log.debug("Head assessment incomplete", exc_info=True)
        return max(0.0, score), alerts

    def _assess_spine_segmented(self, kpts):
        """Score the spine (max 3.0); returns ``(score, alerts)``.

        Fits ``y = a*x^2 + b*x + c`` to each spine segment that has at least
        three visible points and deducts when ``|a|`` is large, then compares
        the whole-spine fit against the stored baseline, if any.
        """
        score, alerts = 3.0, []
        try:
            segment_curvature = {}
            for seg_name, indices in self.spine_segments.items():
                pts = self._visible_xy(kpts, indices[0], indices[1])
                if len(pts) < 3:
                    continue
                a = self._quadratic_coeff(pts)
                segment_curvature[seg_name] = a
                bend = abs(a)
                if seg_name == 'thoracic':
                    if bend > 0.003:
                        score -= 1.5
                        alerts.append("Mid-back excessive curve")
                    elif bend > 0.002:
                        score -= 0.8
                        alerts.append("Mid-back moderate curve")
                elif bend > 0.006:
                    score -= 0.5
                    alerts.append(f"{seg_name} section high curve")

            if segment_curvature:
                avg_a = np.mean(list(segment_curvature.values()))
                if abs(avg_a) > 0.005:
                    score -= 0.5
                    alerts.append("Overall spine posture concern")

            # `is not None`: a baseline of exactly 0.0 (straight spine) is a
            # valid reference.  The baseline is a whole-spine coefficient, so
            # it is compared against the same whole-spine fit of this frame.
            if self.reference_spine_coeffs is not None:
                current = self._whole_spine_curvature(kpts)
                if (current is not None
                        and abs(current - self.reference_spine_coeffs) > 0.001):
                    score -= 0.3
                    alerts.append("Spine changed from baseline")
        except self._RECOVERABLE_ERRORS:
            self._log.debug("Spine assessment incomplete", exc_info=True)
        return max(0.0, score), alerts

    def _assess_legs_static(self, kpts):
        """Score leg symmetry from one frame (max 3.0); ``(score, alerts)``."""
        score, alerts = 3.0, []
        try:
            l_paw, r_paw = kpts[17], kpts[19]
            if self._visible(l_paw, r_paw) and abs(l_paw[1] - r_paw[1]) > 15:
                score -= 1.0
                alerts.append("Front leg asymmetry")

            l_hip, r_hip = kpts[21], kpts[22]
            if self._visible(l_hip, r_hip) and abs(l_hip[1] - r_hip[1]) > 12:
                score -= 0.5
                alerts.append("Hip asymmetry")
        except self._RECOVERABLE_ERRORS:
            self._log.debug("Static leg assessment incomplete", exc_info=True)
        return max(0.0, score), alerts

    def _assess_gait_advanced(self, frames):
        """Score front-paw gait over buffered frames (max 3.0).

        Args:
            frames: Sequence of dicts with ``'kpts'`` and ``'ts'`` entries.

        Returns:
            ``(score, alerts, physical_measurements)``.  With fewer than 8
            frames, or fewer than 3 visible samples of either front paw, the
            full score is returned with no findings.
        """
        score, alerts, phys = 3.0, [], {}
        try:
            if len(frames) < self._MIN_TEMPORAL_FRAMES:
                return score, alerts, phys

            left_track, right_track, paired_gaps = [], [], []
            for frame in frames:
                kpts, ts = frame['kpts'], frame['ts']
                left_paw, right_paw = kpts[17], kpts[19]
                left_ok = self._visible(left_paw)
                right_ok = self._visible(right_paw)
                if left_ok:
                    left_track.append((ts, left_paw[1]))
                if right_ok:
                    right_track.append((ts, right_paw[1]))
                # Per-frame asymmetry must compare both paws in the SAME
                # frame; pairing the two tracks by list position misaligns
                # them as soon as one paw is hidden in any frame.
                if left_ok and right_ok:
                    paired_gaps.append(abs(left_paw[1] - right_paw[1]))

            if len(left_track) < 3 or len(right_track) < 3:
                return score, alerts, phys

            left_heights = [y for _, y in left_track]
            right_heights = [y for _, y in right_track]
            pos_asym = abs(np.mean(left_heights) - np.mean(right_heights))
            threshold_pos = 20

            left_speeds = self._vertical_speeds(left_track)
            right_speeds = self._vertical_speeds(right_track)
            if left_speeds and right_speeds:
                mean_vel_l = np.mean(left_speeds)
                mean_vel_r = np.mean(right_speeds)
                fastest = max(mean_vel_l, mean_vel_r)
                if fastest > 1e-7:
                    vel_asym_ratio = abs(mean_vel_l - mean_vel_r) / fastest
                else:
                    vel_asym_ratio = 0
                phys['velocity_asymmetry'] = f"{vel_asym_ratio*100:.1f}%"
                if vel_asym_ratio > 0.3:
                    score -= 0.8
                    alerts.append(
                        f"Velocity asymmetry detected ({vel_asym_ratio*100:.0f}%)"
                    )

            # Share of ALL buffered frames showing a clear height gap; frames
            # where a paw is hidden count as "no evidence".
            asym_frames = sum(
                1 for gap in paired_gaps if gap > threshold_pos * 0.5
            )
            consistency_ratio = asym_frames / len(frames)
            if pos_asym > threshold_pos and consistency_ratio > 0.6:
                score -= 1.2
                alerts.append("Limping detected (position asymmetry)")
            elif pos_asym > threshold_pos * 0.6 and consistency_ratio > 0.5:
                score -= 0.6
                alerts.append("Possible limping (mild asymmetry)")

            # Erratic step-to-step height change on either front paw (the
            # penalty is applied once, whichever paw triggers it).
            if any(np.var(np.diff(heights)) > 50
                   for heights in (left_heights, right_heights)):
                score -= 0.4
                alerts.append("Irregular gait pattern")
        except self._RECOVERABLE_ERRORS:
            self._log.debug("Gait assessment incomplete", exc_info=True)
        return max(0.0, score), alerts, phys

    def _assess_joints(self, kpts):
        """Score front-leg elbow angles (max 2.0); ``(score, alerts)``.

        The angle is measured at the elbow between the shoulder and the paw:
        above 165 degrees counts as stiff, below 100 as excessive flexion.
        """
        score, alerts = 2.0, []
        try:
            for side, jnts in self.joints.items():
                shoulder = kpts[jnts['shoulder']]
                elbow = kpts[jnts['elbow']]
                paw = kpts[jnts['paw']]
                if not self._visible(shoulder, elbow, paw):
                    continue

                to_shoulder = shoulder[:2] - elbow[:2]
                to_paw = paw[:2] - elbow[:2]
                # The epsilon keeps a degenerate (zero-length) limb from
                # dividing by zero; clip guards arccos against rounding.
                denom = ((np.linalg.norm(to_shoulder) + 1e-7)
                         * (np.linalg.norm(to_paw) + 1e-7))
                cos_angle = np.clip(np.dot(to_shoulder, to_paw) / denom,
                                    -1.0, 1.0)
                angle_deg = np.degrees(np.arccos(cos_angle))

                if angle_deg > 165:
                    score -= 0.7
                    alerts.append(f"{side} leg stiff ({angle_deg:.0f}° elbow)")
                elif angle_deg < 100:
                    score -= 0.5
                    alerts.append(f"{side} leg excessive flexion")
        except self._RECOVERABLE_ERRORS:
            self._log.debug("Joint assessment incomplete", exc_info=True)
        return max(0.0, score), alerts

    def _assess_body(self, kpts):
        """Score the withers-tail line (max 2.0); ``(score, alerts)``."""
        score, alerts = 2.0, []
        try:
            withers, tail = kpts[10], kpts[14]
            if self._visible(withers, tail):
                # Inclination from horizontal, independent of facing side.
                body_angle = self._inclination(tail[:2] - withers[:2])
                if body_angle > np.pi / 6:
                    score -= 0.5
                    alerts.append("Body alignment issue")
        except self._RECOVERABLE_ERRORS:
            self._log.debug("Body assessment incomplete", exc_info=True)
        return max(0.0, score), alerts

    # ------------------------------------------------------------------
    # Confidence and baseline
    # ------------------------------------------------------------------
    def _calc_consistency(self, frames):
        """Return a ``[0.5, 1.0]`` factor from the spread of view scores.

        Computed as one minus the coefficient of variation of the buffered
        ``'view'`` values; 0.7 is returned for fewer than 3 frames or when
        the statistic cannot be computed.
        """
        if len(frames) < 3:
            return 0.7
        try:
            view_scores = [frame['view'] for frame in frames]
            spread = np.std(view_scores) / (np.mean(view_scores) + 1e-7)
            return max(0.5, min(1.0, 1.0 - spread))
        except self._RECOVERABLE_ERRORS:
            self._log.debug("Consistency estimate failed", exc_info=True)
            return 0.7

    def _update_baseline(self, kpts):
        """Store this frame's whole-spine curvature as the new baseline.

        Leaves the baseline untouched when fewer than 5 spine points are
        visible or the fit fails.
        """
        try:
            curvature = self._whole_spine_curvature(kpts)
        except self._RECOVERABLE_ERRORS:
            self._log.debug("Baseline update skipped", exc_info=True)
            return
        if curvature is not None:
            self.reference_spine_coeffs = curvature

    # ------------------------------------------------------------------
    # Recommendations
    # ------------------------------------------------------------------
    def _gen_recs(self, overall, h, s, l, b):
        """Recommendations for a quick assessment (``b`` is accepted, unused)."""
        recs = []
        if h < 1.5:
            recs.append("Monitor head posture")
        if s < 2.0:
            recs.append("Check spine alignment")
        if l < 2.0:
            recs.append("Examine leg symmetry")
        if overall < 5:
            recs.append("Veterinary examination recommended")
        return recs

    def _gen_recs_advanced(self, overall, h, s, l, j, b):
        """Recommendations for a temporal assessment (``b`` is unused)."""
        recs = []
        if h < 1.5:
            recs.append("Monitor head posture")
        if s < 2.0:
            recs.append("Check spine alignment")
        if l < 2.0:
            recs.append("Examine gait and leg movement")
        if j < 1.5:
            recs.append("Check joint flexibility")
        if overall < 5:
            recs.append("Veterinary examination recommended")
        elif overall < 7:
            recs.append("Continue monitoring")
        return recs