|
|
""" |
|
|
tracking.py - DeepSORT Multi-Dog Tracking |
|
|
""" |
|
|
|
|
|
import numpy as np |
|
|
from scipy.optimize import linear_sum_assignment |
|
|
from scipy.spatial.distance import cosine |
|
|
from collections import deque |
|
|
from typing import List, Optional, Tuple |
|
|
|
|
|
|
|
|
class KalmanFilter: |
|
|
def __init__(self): |
|
|
self.F = np.eye(8) |
|
|
for i in range(4): |
|
|
self.F[i, i+4] = 1 |
|
|
|
|
|
self.H = np.eye(4, 8) |
|
|
self.Q = np.eye(8) |
|
|
self.Q[4:, 4:] *= 0.01 |
|
|
self.R = np.eye(4) * 10 |
|
|
self.P = np.eye(8) * 1000 |
|
|
self.x = np.zeros(8) |
|
|
|
|
|
def initiate(self, measurement: np.ndarray): |
|
|
self.x[:4] = measurement |
|
|
self.x[4:] = 0 |
|
|
|
|
|
def predict(self): |
|
|
self.x = self.F @ self.x |
|
|
self.P = self.F @ self.P @ self.F.T + self.Q |
|
|
return self.x[:4] |
|
|
|
|
|
def update(self, measurement: np.ndarray): |
|
|
y = measurement - self.H @ self.x |
|
|
S = self.H @ self.P @ self.H.T + self.R |
|
|
K = self.P @ self.H.T @ np.linalg.inv(S) |
|
|
self.x = self.x + K @ y |
|
|
self.P = (np.eye(8) - K @ self.H) @ self.P |
|
|
return self.x[:4] |
|
|
|
|
|
def get_state(self) -> np.ndarray: |
|
|
return self.x[:4] |
|
|
|
|
|
|
|
|
class DeepSORTTrack: |
|
|
_id_counter = 1 |
|
|
|
|
|
def __init__(self, detection, track_id: Optional[int] = None): |
|
|
self.track_id = track_id if track_id else DeepSORTTrack._id_counter |
|
|
DeepSORTTrack._id_counter += 1 |
|
|
|
|
|
x1, y1, x2, y2 = detection.bbox |
|
|
cx = (x1 + x2) / 2 |
|
|
cy = (y1 + y2) / 2 |
|
|
w = x2 - x1 |
|
|
h = y2 - y1 |
|
|
|
|
|
self.kf = KalmanFilter() |
|
|
self.kf.initiate(np.array([cx, cy, w, h])) |
|
|
|
|
|
self.detections = [detection] |
|
|
self.confidence = detection.confidence |
|
|
self.hits = 1 |
|
|
self.age = 1 |
|
|
self.time_since_update = 0 |
|
|
self.state = 'tentative' |
|
|
|
|
|
self.features = deque(maxlen=100) |
|
|
if hasattr(detection, 'features') and detection.features is not None: |
|
|
self.features.append(detection.features) |
|
|
|
|
|
self.trajectory = deque(maxlen=30) |
|
|
self.trajectory.append((cx, cy)) |
|
|
|
|
|
self.avg_confidence = self.confidence |
|
|
self.consecutive_misses = 0 |
|
|
|
|
|
@property |
|
|
def bbox(self) -> List[float]: |
|
|
cx, cy, w, h = self.kf.get_state() |
|
|
return [cx - w/2, cy - h/2, cx + w/2, cy + h/2] |
|
|
|
|
|
def predict(self): |
|
|
self.age += 1 |
|
|
self.time_since_update += 1 |
|
|
self.consecutive_misses += 1 |
|
|
self.kf.predict() |
|
|
|
|
|
def update(self, detection): |
|
|
x1, y1, x2, y2 = detection.bbox |
|
|
cx = (x1 + x2) / 2 |
|
|
cy = (y1 + y2) / 2 |
|
|
w = x2 - x1 |
|
|
h = y2 - y1 |
|
|
|
|
|
self.kf.update(np.array([cx, cy, w, h])) |
|
|
|
|
|
self.detections.append(detection) |
|
|
self.confidence = detection.confidence |
|
|
self.avg_confidence = 0.9 * self.avg_confidence + 0.1 * self.confidence |
|
|
|
|
|
self.hits += 1 |
|
|
self.time_since_update = 0 |
|
|
self.consecutive_misses = 0 |
|
|
|
|
|
if hasattr(detection, 'features') and detection.features is not None: |
|
|
self.features.append(detection.features) |
|
|
|
|
|
self.trajectory.append((cx, cy)) |
|
|
|
|
|
if self.state == 'tentative' and self.hits >= 3: |
|
|
self.state = 'confirmed' |
|
|
|
|
|
if len(self.detections) > 5: |
|
|
for old_det in self.detections[:-5]: |
|
|
if hasattr(old_det, 'image_crop'): |
|
|
old_det.image_crop = None |
|
|
self.detections = self.detections[-5:] |
|
|
|
|
|
def mark_missed(self): |
|
|
if self.state == 'confirmed': |
|
|
if self.consecutive_misses > 30 or self.time_since_update > 60: |
|
|
self.state = 'deleted' |
|
|
elif self.state == 'tentative': |
|
|
if self.consecutive_misses > 5: |
|
|
self.state = 'deleted' |
|
|
|
|
|
def get_feature(self) -> Optional[np.ndarray]: |
|
|
if not self.features: |
|
|
return None |
|
|
|
|
|
features_array = np.array(list(self.features)) |
|
|
weights = np.exp(np.linspace(-1, 0, len(features_array))) |
|
|
weights /= weights.sum() |
|
|
|
|
|
return np.average(features_array, axis=0, weights=weights) |
|
|
|
|
|
|
|
|
class DeepSORTTracker: |
|
|
def __init__(self, max_iou_distance: float = 0.7, max_age: int = 30, |
|
|
n_init: int = 3, use_appearance: bool = True): |
|
|
self.max_iou_distance = max_iou_distance |
|
|
self.max_age = max_age |
|
|
self.n_init = n_init |
|
|
self.use_appearance = use_appearance |
|
|
|
|
|
self.tracks: List[DeepSORTTrack] = [] |
|
|
self.gating_threshold_iou = 0.3 |
|
|
self.gating_threshold_appearance = 0.5 |
|
|
|
|
|
print(f"✅ DeepSORT Tracker initialized") |
|
|
print(f" Max IoU distance: {max_iou_distance}") |
|
|
print(f" Max age: {max_age}") |
|
|
|
|
|
def update(self, detections: List) -> List[DeepSORTTrack]: |
|
|
for track in self.tracks: |
|
|
track.predict() |
|
|
|
|
|
matches, unmatched_tracks, unmatched_detections = self._match( |
|
|
detections, self.tracks |
|
|
) |
|
|
|
|
|
for track_idx, det_idx in matches: |
|
|
self.tracks[track_idx].update(detections[det_idx]) |
|
|
|
|
|
for track_idx in unmatched_tracks: |
|
|
self.tracks[track_idx].mark_missed() |
|
|
|
|
|
for det_idx in unmatched_detections: |
|
|
self._initiate_track(detections[det_idx]) |
|
|
|
|
|
self.tracks = [t for t in self.tracks if t.state != 'deleted'] |
|
|
|
|
|
return [t for t in self.tracks if t.state == 'confirmed'] |
|
|
|
|
|
def _match(self, detections: List, tracks: List[DeepSORTTrack]) -> Tuple: |
|
|
if not tracks or not detections: |
|
|
return [], list(range(len(tracks))), list(range(len(detections))) |
|
|
|
|
|
confirmed_tracks = [i for i, t in enumerate(tracks) if t.state == 'confirmed'] |
|
|
unconfirmed_tracks = [i for i, t in enumerate(tracks) if t.state == 'tentative'] |
|
|
|
|
|
matches_a, unmatched_tracks_a, unmatched_detections = \ |
|
|
self._matching_cascade(detections, tracks, confirmed_tracks) |
|
|
|
|
|
iou_track_candidates = unconfirmed_tracks + unmatched_tracks_a |
|
|
remaining_detections = [detections[i] for i in unmatched_detections] |
|
|
|
|
|
matches_b, unmatched_tracks_b, unmatched_detections_b = \ |
|
|
self._match_iou(remaining_detections, tracks, iou_track_candidates) |
|
|
|
|
|
matches_b = [(t, unmatched_detections[d]) for t, d in matches_b] |
|
|
unmatched_detections = [unmatched_detections[i] for i in unmatched_detections_b] |
|
|
|
|
|
matches = matches_a + matches_b |
|
|
unmatched_tracks = unmatched_tracks_b |
|
|
|
|
|
return matches, unmatched_tracks, unmatched_detections |
|
|
|
|
|
def _matching_cascade(self, detections: List, tracks: List[DeepSORTTrack], |
|
|
track_indices: List[int]) -> Tuple: |
|
|
matches = [] |
|
|
unmatched_detections = list(range(len(detections))) |
|
|
|
|
|
for level in range(self.max_age): |
|
|
if len(unmatched_detections) == 0: |
|
|
break |
|
|
|
|
|
track_indices_l = [ |
|
|
k for k in track_indices |
|
|
if tracks[k].time_since_update == level |
|
|
] |
|
|
|
|
|
if len(track_indices_l) == 0: |
|
|
continue |
|
|
|
|
|
detection_subset = [detections[i] for i in unmatched_detections] |
|
|
|
|
|
matches_l, _, unmatched_subset_indices = self._match_features_and_iou( |
|
|
detection_subset, tracks, track_indices_l |
|
|
) |
|
|
|
|
|
matches_l_remapped = [(t, unmatched_detections[d]) for t, d in matches_l] |
|
|
matches.extend(matches_l_remapped) |
|
|
|
|
|
unmatched_detections = [unmatched_detections[i] for i in unmatched_subset_indices] |
|
|
|
|
|
unmatched_tracks = [ |
|
|
k for k in track_indices |
|
|
if k not in [t for t, _ in matches] |
|
|
] |
|
|
|
|
|
return matches, unmatched_tracks, unmatched_detections |
|
|
|
|
|
def _match_features_and_iou(self, detections: List, tracks: List[DeepSORTTrack], |
|
|
track_indices: List[int]) -> Tuple: |
|
|
if not track_indices or not detections: |
|
|
return [], track_indices, list(range(len(detections))) |
|
|
|
|
|
cost_matrix = np.zeros((len(track_indices), len(detections))) |
|
|
|
|
|
for i, track_idx in enumerate(track_indices): |
|
|
track = tracks[track_idx] |
|
|
track_feature = track.get_feature() |
|
|
|
|
|
for j, detection in enumerate(detections): |
|
|
iou_cost = 1 - self._calculate_iou(track.bbox, detection.bbox) |
|
|
|
|
|
if self.use_appearance and track_feature is not None: |
|
|
if hasattr(detection, 'features') and detection.features is not None: |
|
|
appearance_cost = cosine(track_feature, detection.features) |
|
|
cost_matrix[i, j] = 0.5 * iou_cost + 0.5 * appearance_cost |
|
|
else: |
|
|
cost_matrix[i, j] = iou_cost |
|
|
else: |
|
|
cost_matrix[i, j] = iou_cost |
|
|
|
|
|
row_ind, col_ind = linear_sum_assignment(cost_matrix) |
|
|
|
|
|
matches = [] |
|
|
unmatched_tracks = list(range(len(track_indices))) |
|
|
unmatched_detections = list(range(len(detections))) |
|
|
|
|
|
for r, c in zip(row_ind, col_ind): |
|
|
if cost_matrix[r, c] < 0.7: |
|
|
matches.append((track_indices[r], c)) |
|
|
unmatched_tracks.remove(r) |
|
|
unmatched_detections.remove(c) |
|
|
|
|
|
unmatched_tracks = [track_indices[i] for i in unmatched_tracks] |
|
|
|
|
|
return matches, unmatched_tracks, unmatched_detections |
|
|
|
|
|
def _match_iou(self, detections: List, tracks: List[DeepSORTTrack], |
|
|
track_indices: List[int]) -> Tuple: |
|
|
if not track_indices or not detections: |
|
|
return [], track_indices, list(range(len(detections))) |
|
|
|
|
|
cost_matrix = np.zeros((len(track_indices), len(detections))) |
|
|
|
|
|
for i, track_idx in enumerate(track_indices): |
|
|
for j, detection in enumerate(detections): |
|
|
cost_matrix[i, j] = 1 - self._calculate_iou( |
|
|
tracks[track_idx].bbox, detection.bbox |
|
|
) |
|
|
|
|
|
row_ind, col_ind = linear_sum_assignment(cost_matrix) |
|
|
|
|
|
matches = [] |
|
|
unmatched_tracks = list(range(len(track_indices))) |
|
|
unmatched_detections = list(range(len(detections))) |
|
|
|
|
|
for r, c in zip(row_ind, col_ind): |
|
|
if cost_matrix[r, c] < self.max_iou_distance: |
|
|
matches.append((track_indices[r], c)) |
|
|
unmatched_tracks.remove(r) |
|
|
unmatched_detections.remove(c) |
|
|
|
|
|
unmatched_tracks = [track_indices[i] for i in unmatched_tracks] |
|
|
|
|
|
return matches, unmatched_tracks, unmatched_detections |
|
|
|
|
|
def _calculate_iou(self, bbox1: List[float], bbox2: List[float]) -> float: |
|
|
x1_1, y1_1, x2_1, y2_1 = bbox1 |
|
|
x1_2, y1_2, x2_2, y2_2 = bbox2 |
|
|
|
|
|
xi1 = max(x1_1, x1_2) |
|
|
yi1 = max(y1_1, y1_2) |
|
|
xi2 = min(x2_1, x2_2) |
|
|
yi2 = min(y2_1, y2_2) |
|
|
|
|
|
inter_area = max(0, xi2 - xi1) * max(0, yi2 - yi1) |
|
|
|
|
|
box1_area = (x2_1 - x1_1) * (y2_1 - y1_1) |
|
|
box2_area = (x2_2 - x1_2) * (y2_2 - y1_2) |
|
|
|
|
|
union_area = box1_area + box2_area - inter_area |
|
|
|
|
|
if union_area == 0: |
|
|
return 0.0 |
|
|
|
|
|
return inter_area / union_area |
|
|
|
|
|
def _initiate_track(self, detection): |
|
|
track = DeepSORTTrack(detection) |
|
|
self.tracks.append(track) |
|
|
|
|
|
def reset(self): |
|
|
self.tracks.clear() |
|
|
DeepSORTTrack._id_counter = 1 |
|
|
print("Tracker reset") |