mustafa2ak's picture
Create tracking.py
3bc696d verified
"""
tracking.py - DeepSORT Multi-Dog Tracking
"""
import numpy as np
from scipy.optimize import linear_sum_assignment
from scipy.spatial.distance import cosine
from collections import deque
from typing import List, Optional, Tuple
class KalmanFilter:
def __init__(self):
self.F = np.eye(8)
for i in range(4):
self.F[i, i+4] = 1
self.H = np.eye(4, 8)
self.Q = np.eye(8)
self.Q[4:, 4:] *= 0.01
self.R = np.eye(4) * 10
self.P = np.eye(8) * 1000
self.x = np.zeros(8)
def initiate(self, measurement: np.ndarray):
self.x[:4] = measurement
self.x[4:] = 0
def predict(self):
self.x = self.F @ self.x
self.P = self.F @ self.P @ self.F.T + self.Q
return self.x[:4]
def update(self, measurement: np.ndarray):
y = measurement - self.H @ self.x
S = self.H @ self.P @ self.H.T + self.R
K = self.P @ self.H.T @ np.linalg.inv(S)
self.x = self.x + K @ y
self.P = (np.eye(8) - K @ self.H) @ self.P
return self.x[:4]
def get_state(self) -> np.ndarray:
return self.x[:4]
class DeepSORTTrack:
_id_counter = 1
def __init__(self, detection, track_id: Optional[int] = None):
self.track_id = track_id if track_id else DeepSORTTrack._id_counter
DeepSORTTrack._id_counter += 1
x1, y1, x2, y2 = detection.bbox
cx = (x1 + x2) / 2
cy = (y1 + y2) / 2
w = x2 - x1
h = y2 - y1
self.kf = KalmanFilter()
self.kf.initiate(np.array([cx, cy, w, h]))
self.detections = [detection]
self.confidence = detection.confidence
self.hits = 1
self.age = 1
self.time_since_update = 0
self.state = 'tentative'
self.features = deque(maxlen=100)
if hasattr(detection, 'features') and detection.features is not None:
self.features.append(detection.features)
self.trajectory = deque(maxlen=30)
self.trajectory.append((cx, cy))
self.avg_confidence = self.confidence
self.consecutive_misses = 0
@property
def bbox(self) -> List[float]:
cx, cy, w, h = self.kf.get_state()
return [cx - w/2, cy - h/2, cx + w/2, cy + h/2]
def predict(self):
self.age += 1
self.time_since_update += 1
self.consecutive_misses += 1
self.kf.predict()
def update(self, detection):
x1, y1, x2, y2 = detection.bbox
cx = (x1 + x2) / 2
cy = (y1 + y2) / 2
w = x2 - x1
h = y2 - y1
self.kf.update(np.array([cx, cy, w, h]))
self.detections.append(detection)
self.confidence = detection.confidence
self.avg_confidence = 0.9 * self.avg_confidence + 0.1 * self.confidence
self.hits += 1
self.time_since_update = 0
self.consecutive_misses = 0
if hasattr(detection, 'features') and detection.features is not None:
self.features.append(detection.features)
self.trajectory.append((cx, cy))
if self.state == 'tentative' and self.hits >= 3:
self.state = 'confirmed'
if len(self.detections) > 5:
for old_det in self.detections[:-5]:
if hasattr(old_det, 'image_crop'):
old_det.image_crop = None
self.detections = self.detections[-5:]
def mark_missed(self):
if self.state == 'confirmed':
if self.consecutive_misses > 30 or self.time_since_update > 60:
self.state = 'deleted'
elif self.state == 'tentative':
if self.consecutive_misses > 5:
self.state = 'deleted'
def get_feature(self) -> Optional[np.ndarray]:
if not self.features:
return None
features_array = np.array(list(self.features))
weights = np.exp(np.linspace(-1, 0, len(features_array)))
weights /= weights.sum()
return np.average(features_array, axis=0, weights=weights)
class DeepSORTTracker:
def __init__(self, max_iou_distance: float = 0.7, max_age: int = 30,
n_init: int = 3, use_appearance: bool = True):
self.max_iou_distance = max_iou_distance
self.max_age = max_age
self.n_init = n_init
self.use_appearance = use_appearance
self.tracks: List[DeepSORTTrack] = []
self.gating_threshold_iou = 0.3
self.gating_threshold_appearance = 0.5
print(f"✅ DeepSORT Tracker initialized")
print(f" Max IoU distance: {max_iou_distance}")
print(f" Max age: {max_age}")
def update(self, detections: List) -> List[DeepSORTTrack]:
for track in self.tracks:
track.predict()
matches, unmatched_tracks, unmatched_detections = self._match(
detections, self.tracks
)
for track_idx, det_idx in matches:
self.tracks[track_idx].update(detections[det_idx])
for track_idx in unmatched_tracks:
self.tracks[track_idx].mark_missed()
for det_idx in unmatched_detections:
self._initiate_track(detections[det_idx])
self.tracks = [t for t in self.tracks if t.state != 'deleted']
return [t for t in self.tracks if t.state == 'confirmed']
def _match(self, detections: List, tracks: List[DeepSORTTrack]) -> Tuple:
if not tracks or not detections:
return [], list(range(len(tracks))), list(range(len(detections)))
confirmed_tracks = [i for i, t in enumerate(tracks) if t.state == 'confirmed']
unconfirmed_tracks = [i for i, t in enumerate(tracks) if t.state == 'tentative']
matches_a, unmatched_tracks_a, unmatched_detections = \
self._matching_cascade(detections, tracks, confirmed_tracks)
iou_track_candidates = unconfirmed_tracks + unmatched_tracks_a
remaining_detections = [detections[i] for i in unmatched_detections]
matches_b, unmatched_tracks_b, unmatched_detections_b = \
self._match_iou(remaining_detections, tracks, iou_track_candidates)
matches_b = [(t, unmatched_detections[d]) for t, d in matches_b]
unmatched_detections = [unmatched_detections[i] for i in unmatched_detections_b]
matches = matches_a + matches_b
unmatched_tracks = unmatched_tracks_b
return matches, unmatched_tracks, unmatched_detections
def _matching_cascade(self, detections: List, tracks: List[DeepSORTTrack],
track_indices: List[int]) -> Tuple:
matches = []
unmatched_detections = list(range(len(detections)))
for level in range(self.max_age):
if len(unmatched_detections) == 0:
break
track_indices_l = [
k for k in track_indices
if tracks[k].time_since_update == level
]
if len(track_indices_l) == 0:
continue
detection_subset = [detections[i] for i in unmatched_detections]
matches_l, _, unmatched_subset_indices = self._match_features_and_iou(
detection_subset, tracks, track_indices_l
)
matches_l_remapped = [(t, unmatched_detections[d]) for t, d in matches_l]
matches.extend(matches_l_remapped)
unmatched_detections = [unmatched_detections[i] for i in unmatched_subset_indices]
unmatched_tracks = [
k for k in track_indices
if k not in [t for t, _ in matches]
]
return matches, unmatched_tracks, unmatched_detections
def _match_features_and_iou(self, detections: List, tracks: List[DeepSORTTrack],
track_indices: List[int]) -> Tuple:
if not track_indices or not detections:
return [], track_indices, list(range(len(detections)))
cost_matrix = np.zeros((len(track_indices), len(detections)))
for i, track_idx in enumerate(track_indices):
track = tracks[track_idx]
track_feature = track.get_feature()
for j, detection in enumerate(detections):
iou_cost = 1 - self._calculate_iou(track.bbox, detection.bbox)
if self.use_appearance and track_feature is not None:
if hasattr(detection, 'features') and detection.features is not None:
appearance_cost = cosine(track_feature, detection.features)
cost_matrix[i, j] = 0.5 * iou_cost + 0.5 * appearance_cost
else:
cost_matrix[i, j] = iou_cost
else:
cost_matrix[i, j] = iou_cost
row_ind, col_ind = linear_sum_assignment(cost_matrix)
matches = []
unmatched_tracks = list(range(len(track_indices)))
unmatched_detections = list(range(len(detections)))
for r, c in zip(row_ind, col_ind):
if cost_matrix[r, c] < 0.7:
matches.append((track_indices[r], c))
unmatched_tracks.remove(r)
unmatched_detections.remove(c)
unmatched_tracks = [track_indices[i] for i in unmatched_tracks]
return matches, unmatched_tracks, unmatched_detections
def _match_iou(self, detections: List, tracks: List[DeepSORTTrack],
track_indices: List[int]) -> Tuple:
if not track_indices or not detections:
return [], track_indices, list(range(len(detections)))
cost_matrix = np.zeros((len(track_indices), len(detections)))
for i, track_idx in enumerate(track_indices):
for j, detection in enumerate(detections):
cost_matrix[i, j] = 1 - self._calculate_iou(
tracks[track_idx].bbox, detection.bbox
)
row_ind, col_ind = linear_sum_assignment(cost_matrix)
matches = []
unmatched_tracks = list(range(len(track_indices)))
unmatched_detections = list(range(len(detections)))
for r, c in zip(row_ind, col_ind):
if cost_matrix[r, c] < self.max_iou_distance:
matches.append((track_indices[r], c))
unmatched_tracks.remove(r)
unmatched_detections.remove(c)
unmatched_tracks = [track_indices[i] for i in unmatched_tracks]
return matches, unmatched_tracks, unmatched_detections
def _calculate_iou(self, bbox1: List[float], bbox2: List[float]) -> float:
x1_1, y1_1, x2_1, y2_1 = bbox1
x1_2, y1_2, x2_2, y2_2 = bbox2
xi1 = max(x1_1, x1_2)
yi1 = max(y1_1, y1_2)
xi2 = min(x2_1, x2_2)
yi2 = min(y2_1, y2_2)
inter_area = max(0, xi2 - xi1) * max(0, yi2 - yi1)
box1_area = (x2_1 - x1_1) * (y2_1 - y1_1)
box2_area = (x2_2 - x1_2) * (y2_2 - y1_2)
union_area = box1_area + box2_area - inter_area
if union_area == 0:
return 0.0
return inter_area / union_area
def _initiate_track(self, detection):
track = DeepSORTTrack(detection)
self.tracks.append(track)
def reset(self):
self.tracks.clear()
DeepSORTTrack._id_counter = 1
print("Tracker reset")