diff --git a/boxmot/motion/kalman_filters/base_kalman_filter.py b/boxmot/motion/kalman_filters/aabb/base_kalman_filter.py similarity index 100% rename from boxmot/motion/kalman_filters/base_kalman_filter.py rename to boxmot/motion/kalman_filters/aabb/base_kalman_filter.py diff --git a/boxmot/motion/kalman_filters/xyah_kf.py b/boxmot/motion/kalman_filters/aabb/xyah_kf.py similarity index 97% rename from boxmot/motion/kalman_filters/xyah_kf.py rename to boxmot/motion/kalman_filters/aabb/xyah_kf.py index 7db546aeca..63100669fd 100644 --- a/boxmot/motion/kalman_filters/xyah_kf.py +++ b/boxmot/motion/kalman_filters/aabb/xyah_kf.py @@ -1,6 +1,6 @@ import numpy as np from typing import Tuple -from boxmot.motion.kalman_filters.base_kalman_filter import BaseKalmanFilter +from boxmot.motion.kalman_filters.aabb.base_kalman_filter import BaseKalmanFilter class KalmanFilterXYAH(BaseKalmanFilter): diff --git a/boxmot/motion/kalman_filters/xysr_kf.py b/boxmot/motion/kalman_filters/aabb/xysr_kf.py similarity index 100% rename from boxmot/motion/kalman_filters/xysr_kf.py rename to boxmot/motion/kalman_filters/aabb/xysr_kf.py diff --git a/boxmot/motion/kalman_filters/xywh_kf.py b/boxmot/motion/kalman_filters/aabb/xywh_kf.py similarity index 96% rename from boxmot/motion/kalman_filters/xywh_kf.py rename to boxmot/motion/kalman_filters/aabb/xywh_kf.py index c0af94dda4..8c933ca45d 100644 --- a/boxmot/motion/kalman_filters/xywh_kf.py +++ b/boxmot/motion/kalman_filters/aabb/xywh_kf.py @@ -1,6 +1,6 @@ import numpy as np from typing import Tuple -from boxmot.motion.kalman_filters.base_kalman_filter import BaseKalmanFilter +from boxmot.motion.kalman_filters.aabb.base_kalman_filter import BaseKalmanFilter class KalmanFilterXYWH(BaseKalmanFilter): diff --git a/boxmot/motion/kalman_filters/obb/xywha_kf.py b/boxmot/motion/kalman_filters/obb/xywha_kf.py new file mode 100644 index 0000000000..c42f0adc6e --- /dev/null +++ b/boxmot/motion/kalman_filters/obb/xywha_kf.py @@ -0,0 +1,637 @@ +from __future__ import absolute_import, division + +from copy import deepcopy +from math import log, exp, sqrt, pi +import sys +import numpy as np +from numpy import dot, zeros, eye, isscalar +import numpy.linalg as linalg +from filterpy.stats import logpdf +from filterpy.common import pretty_str, reshape_z +from collections import deque + + +def speed_direction_obb(bbox1, bbox2): + cx1, cy1 = bbox1[0], bbox1[1] + cx2, cy2 = bbox2[0], bbox2[1] + speed = np.array([cy2 - cy1, cx2 - cx1]) + norm = np.sqrt((cy2 - cy1) ** 2 + (cx2 - cx1) ** 2) + 1e-6 + return speed / norm + + +class KalmanBoxTrackerOBB(object): + """ + This class represents the internal state of individual tracked objects observed as oriented bbox. + """ + + count = 0 + + def __init__(self, bbox, cls, det_ind, delta_t=3, max_obs=50, Q_xy_scaling = 0.01, Q_a_scaling = 0.01): + """ + Initialises a tracker using initial bounding box. 
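# Illustrative sketch (not part of the patch): `speed_direction_obb` above returns the
# unit direction of centre motion between two OBB observations, in (dy, dx) order, with
# a small epsilon guarding against division by zero. Assumed box layout: [cx, cy, w, h, angle, ...].
import numpy as np

def speed_direction_obb(bbox1, bbox2):
    cx1, cy1 = bbox1[0], bbox1[1]
    cx2, cy2 = bbox2[0], bbox2[1]
    speed = np.array([cy2 - cy1, cx2 - cx1])
    norm = np.sqrt((cy2 - cy1) ** 2 + (cx2 - cx1) ** 2) + 1e-6
    return speed / norm

prev = np.array([100.0, 100.0, 40.0, 20.0, 0.1])
curr = np.array([103.0, 104.0, 40.0, 20.0, 0.1])
direction = speed_direction_obb(prev, curr)          # ~[0.8, 0.6]: dy first, then dx
assert abs(np.linalg.norm(direction) - 1.0) < 1e-3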
+ """ + # define constant velocity model + self.det_ind = det_ind + + self.Q_xy_scaling = Q_xy_scaling + self.Q_a_scaling = Q_a_scaling + + self.kf = KalmanFilterXYWHA(dim_x=10, dim_z=5, max_obs=max_obs) + self.kf.F = np.array( + [ + [1, 0, 0, 0, 0, 1, 0, 0, 0, 0], # cx = cx + vx + [0, 1, 0, 0, 0, 0, 1, 0, 0, 0], # cy = cy + vy + [0, 0, 1, 0, 0, 0, 0, 1, 0, 0], # w = w + vw + [0, 0, 0, 1, 0, 0, 0, 0, 1, 0], # h = h + vh + [0, 0, 0, 0, 1, 0, 0, 0, 0, 1], # a = a + va + [0, 0, 0, 0, 0, 1, 0, 0, 0, 0], + [0, 0, 0, 0, 0, 0, 1, 0, 0, 0], + [0, 0, 0, 0, 0, 0, 0, 1, 0, 0], + [0, 0, 0, 0, 0, 0, 0, 0, 1, 0], + [0, 0, 0, 0, 0, 0, 0, 0, 0, 1] + ] + ) + self.kf.H = np.array( + [ + [1, 0, 0, 0, 0, 0, 0, 0, 0 ,0], # cx + [0, 1, 0, 0, 0, 0, 0, 0, 0, 0], # cy + [0, 0, 1, 0, 0, 0, 0, 0, 0, 0], # w + [0, 0, 0, 1, 0, 0, 0, 0, 0, 0], # h + [0, 0, 0, 0, 1, 0, 0, 0, 0, 0], # angle + ] + ) + + self.kf.R[2:, 2:] *= 10.0 + self.kf.P[ + 5:, 5: + ] *= 1000.0 # give high uncertainty to the unobservable initial velocities + self.kf.P *= 10.0 + + self.kf.Q[5:7, 5:7] *= self.Q_xy_scaling + self.kf.Q[-1, -1] *= self.Q_a_scaling + + self.kf.x[:5] = bbox[:5].reshape((5, 1)) # x, y, w, h, angle (dont take confidence score) + self.time_since_update = 0 + self.id = KalmanBoxTrackerOBB.count + KalmanBoxTrackerOBB.count += 1 + self.max_obs = max_obs + self.history = deque([], maxlen=self.max_obs) + self.hits = 0 + self.hit_streak = 0 + self.age = 0 + self.conf = bbox[-1] + self.cls = cls + """ + NOTE: [-1,-1,-1,-1,-1] is a compromising placeholder for non-observation status, the same for the return of + function k_previous_obs. It is ugly and I do not like it. But to support generate observation array in a + fast and unified way, which you would see below k_observations = np.array([k_previous_obs(...]]), + let's bear it for now. + """ + self.last_observation = np.array([-1, -1, -1, -1, -1, -1]) #WARNING : -1 is a valid angle value + self.observations = dict() + self.history_observations = deque([], maxlen=self.max_obs) + self.velocity = None + self.delta_t = delta_t + + def update(self, bbox, cls, det_ind): + """ + Updates the state vector with observed bbox. + """ + self.det_ind = det_ind + if bbox is not None: + self.conf = bbox[-1] + self.cls = cls + if self.last_observation.sum() >= 0: # no previous observation + previous_box = None + for i in range(self.delta_t): + dt = self.delta_t - i + if self.age - dt in self.observations: + previous_box = self.observations[self.age - dt] + break + if previous_box is None: + previous_box = self.last_observation + """ + Estimate the track speed direction with observations \Delta t steps away + """ + self.velocity = speed_direction_obb(previous_box, bbox) + + """ + Insert new observations. This is a ugly way to maintain both self.observations + and self.history_observations. Bear it for the moment. + """ + self.last_observation = bbox + self.observations[self.age] = bbox + self.history_observations.append(bbox) + + self.time_since_update = 0 + self.hits += 1 + self.hit_streak += 1 + self.kf.update(bbox[:5].reshape((5, 1))) # x, y, w, h, angle as column vector (dont take confidence score) + else: + self.kf.update(bbox) + + def predict(self): + """ + Advances the state vector and returns the predicted bounding box estimate. 
+ """ + if (self.kf.x[7] + self.kf.x[2]) <= 0: # Negative width + self.kf.x[7] *= 0.0 + if (self.kf.x[8] + self.kf.x[3]) <= 0: # Negative Height + self.kf.x[8] *= 0.0 + self.kf.predict() + self.age += 1 + if self.time_since_update > 0: + self.hit_streak = 0 + self.time_since_update += 1 + self.history.append(self.kf.x[0:5].reshape((1, 5))) + return self.history[-1] + + def get_state(self): + """ + Returns the current bounding box estimate. + """ + return self.kf.x[0:5].reshape((1, 5)) + + +class KalmanFilterXYWHA(object): + """ + Implements a Kalman Filter specialized for tracking Oriented Bounding Boxes. + The default state vector is [x, y, w, h, a]^T: + + - (x, y): center of the bounding box + - w, h : width and height of the bounding box + - a : orientation angle (radians) + + This filter supports "freeze" and "unfreeze" methods to handle missing + observations (no measurements) or out-of-sequence (OOS) smoothing logic. + """ + + def __init__(self, dim_x, dim_z, dim_u=0, max_obs=50): + """ + Parameters + ---------- + dim_x : int + Dimensionality of the state vector. Typically 5 if [x, y, w, h, a]. + dim_z : int + Dimensionality of the measurement vector. Typically also 5. + dim_u : int + Dimensionality of the control vector. Default is 0 (no control). + max_obs : int + Maximum number of stored observations for freeze/unfreeze logic. + """ + if dim_x < 1: + raise ValueError('dim_x must be 1 or greater') + if dim_z < 1: + raise ValueError('dim_z must be 1 or greater') + if dim_u < 0: + raise ValueError('dim_u must be 0 or greater') + + self.dim_x = dim_x + self.dim_z = dim_z + self.dim_u = dim_u + + # State: x is a (dim_x, 1) column vector + self.x = zeros((dim_x, 1)) # state + self.P = eye(dim_x) # covariance of the state + self.Q = eye(dim_x) # process noise covariance + self.B = None # control transition matrix + self.F = eye(dim_x) # state transition matrix + self.H = zeros((dim_z, dim_x)) # measurement function + self.R = eye(dim_z) # measurement noise covariance + self._alpha_sq = 1. # fading memory control + self.M = np.zeros((dim_x, dim_z)) # cross correlation (rarely used) + self.z = np.array([[None]*self.dim_z]).T + + # Gains and residuals computed during update + self.K = np.zeros((dim_x, dim_z)) # Kalman gain + self.y = zeros((dim_z, 1)) # residual + self.S = np.zeros((dim_z, dim_z)) # system uncertainty (innovation covariance) + self.SI = np.zeros((dim_z, dim_z)) # inverse system uncertainty + + # Identity matrix (used in update) + self._I = np.eye(dim_x) + + # Save prior (after predict) and posterior (after update) + self.x_prior = self.x.copy() + self.P_prior = self.P.copy() + self.x_post = self.x.copy() + self.P_post = self.P.copy() + + # Internal log-likelihood computations + self._log_likelihood = log(sys.float_info.min) + self._likelihood = sys.float_info.min + self._mahalanobis = None + + # Store recent observations for freeze/unfreeze logic + self.max_obs = max_obs + self.history_obs = deque([], maxlen=self.max_obs) + + # For potential smoothing usage + self.inv = np.linalg.inv + self.attr_saved = None + self.observed = False + self.last_measurement = None + + def apply_affine_correction(self, m, t): + """ + Apply an affine transform to the current state and covariance. + This is useful if the image or reference frame is warped. + + Parameters + ---------- + m : np.array(2x2) + Affine transform (rotation/scale) to be applied to x,y and maybe w,h + t : np.array(2x1) + Translation vector to be added after applying the transform. 
+ + TODO: adapt for oriented bounding box (especially if the orientation + is also changed by the transform). + """ + # For demonstration, we apply the transform to [x, y] and [x_dot, y_dot], etc. + # But for your OBB case, consider carefully how w, h, and angle should transform. + + # Example basic approach: transform x, y + self.x[:2] = m @ self.x[:2] + t + + # Possibly transform w, h. But if w,h are not purely length in x,y directions, + # you might have to do something more elaborate. For demonstration: + self.x[2:4] = np.abs(m @ self.x[2:4]) # naive approach: scale w,h + + # P block for positions: + self.P[:2, :2] = m @ self.P[:2, :2] @ m.T + # P block for widths/heights (again naive if we treat w,h as x,y scale): + self.P[2:4, 2:4] = m @ self.P[2:4, 2:4] @ m.T + + # If angle is included, consider adjusting it or leaving it if the transform + # is purely in the plane with no orientation offset. Could do angle wrap here. + + # If we froze the filter, also update the frozen state + if not self.observed and self.attr_saved is not None: + self.attr_saved["x"][:2] = m @ self.attr_saved["x"][:2] + t + self.attr_saved["x"][2:4] = np.abs(m @ self.attr_saved["x"][2:4]) + self.attr_saved["P"][:2, :2] = m @ self.attr_saved["P"][:2, :2] @ m.T + self.attr_saved["P"][2:4, 2:4] = m @ self.attr_saved["P"][2:4, 2:4] @ m.T + + # last_measurement might need updating similarly + self.attr_saved["last_measurement"][:2] = ( + m @ self.attr_saved["last_measurement"][:2] + t + ) + + def predict(self, u=None, B=None, F=None, Q=None): + """ + Predict next state (prior) using the state transition matrix F + and process noise Q. + + Parameters + ---------- + u : np.array(dim_u, 1), optional + Control vector. If not provided, assumed 0. + B : np.array(dim_x, dim_u), optional + Control transition matrix. If None, self.B is used. + F : np.array(dim_x, dim_x), optional + State transition matrix. If None, self.F is used. + Q : np.array(dim_x, dim_x) or scalar, optional + Process noise matrix. If None, self.Q is used. If scalar, + Q = scalar * I. + """ + if B is None: + B = self.B + if F is None: + F = self.F + if Q is None: + Q = self.Q + elif isscalar(Q): + Q = eye(self.dim_x) * Q + + # x = F x + B u + if B is not None and u is not None: + self.x = dot(F, self.x) + dot(B, u) + else: + self.x = dot(F, self.x) + + # P = F P F^T + Q + self.P = self._alpha_sq * dot(dot(F, self.P), F.T) + Q + + # Save the prior + self.x_prior = self.x.copy() + self.P_prior = self.P.copy() + + # ---- New: Enforce bounding box and angle constraints (if dim_x >= 5) ---- + if self.dim_x >= 5: + # clamp w, h > 0 + self.x[2, 0] = max(self.x[2, 0], 1e-4) + self.x[3, 0] = max(self.x[3, 0], 1e-4) + + # wrap angle to [-pi, pi] + self.x[4, 0] = (self.x[4, 0] + pi) % (2 * pi) - pi + + def freeze(self): + """ + Save the current filter parameters in attr_saved so that if the next + observation is missing, we can revert to these parameters for + out-of-sequence or offline smoothing. + """ + self.attr_saved = deepcopy(self.__dict__) + + def unfreeze(self): + """ + Revert the filter parameters to the saved (frozen) state, then "replay" + the missing measurements from history to smooth the intermediate states. 
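# Sketch of the replay described above: when a gap of missed frames ends, the last two
# real observations are linearly interpolated and each synthetic box is fed back through
# update()/predict() to smooth the in-between states. Pure-numpy illustration; the
# helper below is not the class method itself.
import numpy as np

def interpolate_obb(box1, box2, steps):
    """Yield `steps` boxes linearly spaced from box1 (exclusive) to box2 (inclusive)."""
    box1, box2 = np.asarray(box1, float), np.asarray(box2, float)
    for i in range(1, steps + 1):
        box = box1 + (box2 - box1) * i / steps
        box[4] = (box[4] + np.pi) % (2 * np.pi) - np.pi   # keep the angle in [-pi, pi)
        yield box

last_seen = [100.0, 100.0, 40.0, 20.0, 0.10]    # cx, cy, w, h, angle at the last hit
reacquired = [112.0, 106.0, 40.0, 20.0, 0.16]   # first observation after a 3-frame gap
for synthetic in interpolate_obb(last_seen, reacquired, steps=3):
    pass  # in the filter: self.update(synthetic.reshape(5, 1)) followed by self.predict()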
+ """ + if self.attr_saved is not None: + new_history = deepcopy(list(self.history_obs)) + # revert to the frozen attributes + self.__dict__ = self.attr_saved + # remove last measurement from history (since we'll re-apply them) + self.history_obs = deque(list(self.history_obs)[:-1], maxlen=self.max_obs) + + # naive approach: re-update states between the two known measurements + occur = [int(d is None) for d in new_history] + indices = np.where(np.array(occur) == 0)[0] + if len(indices) < 2: + return # not enough measurements to replay + + index1, index2 = indices[-2], indices[-1] + box1, box2 = new_history[index1], new_history[index2] + x1, y1, w1, h1, a1 = box1 + x2, y2, w2, h2, a2 = box2 + time_gap = index2 - index1 + dx, dy = (x2 - x1) / time_gap, (y2 - y1) / time_gap + dw, dh = (w2 - w1) / time_gap, (h2 - h1) / time_gap + da = (a2 - a1) / time_gap + + for i in range(index2 - index1): + x_ = x1 + (i + 1) * dx + y_ = y1 + (i + 1) * dy + w_ = w1 + (i + 1) * dw + h_ = h1 + (i + 1) * dh + a_ = a1 + (i + 1) * da + + new_box = np.array([x_, y_, w_, h_, a_]).reshape((5, 1)) + self.update(new_box) + if i != (index2 - index1 - 1): + self.predict() + self.history_obs.pop() + self.history_obs.pop() + + def update(self, z, R=None, H=None): + """ + Incorporate a new measurement z into the state estimate. + + Parameters + ---------- + z : np.array(dim_z, 1) + Measurement vector. If None, skip update step (missing measurement). + R : np.array(dim_z, dim_z), scalar, or None + Measurement noise matrix. If None, self.R is used. + H : np.array(dim_z, dim_x) or None + Measurement function. If None, self.H is used. + """ + # reset log-likelihood computations + self._log_likelihood = None + self._likelihood = None + self._mahalanobis = None + + # Save the observation (even if None) + self.history_obs.append(z) + + # If measurement is missing + if z is None: + if self.observed: + # freeze the current parameters for future potential smoothing + self.last_measurement = self.history_obs[-2] + self.freeze() + self.observed = False + self.z = np.array([[None] * self.dim_z]).T + self.x_post = self.x.copy() + self.P_post = self.P.copy() + self.y = zeros((self.dim_z, 1)) + return + + # If we haven't observed for a while, revert to the frozen state + if not self.observed: + self.unfreeze() + self.observed = True + + if R is None: + R = self.R + elif isscalar(R): + R = eye(self.dim_z) * R + + if H is None: + H = self.H + z = reshape_z(z, self.dim_z, self.x.ndim) + + # y = z - Hx (residual) + self.y = z - dot(H, self.x) + + PHT = dot(self.P, H.T) + self.S = dot(H, PHT) + R + self.SI = self.inv(self.S) + + # K = PHT * SI + self.K = PHT.dot(self.SI) + + # Optional gating (commented out): + # mahal_dist = float(self.y.T @ self.SI @ self.y) + # gating_threshold = 9.21 # e.g., chi-square with 2-5 dof + # if mahal_dist > gating_threshold: + # # Outlier measurement, skip or handle differently + # return + + # x = x + K y + self.x = self.x + dot(self.K, self.y) + + # P = (I - K H) P (I - K H)^T + K R K^T + I_KH = self._I - dot(self.K, H) + self.P = dot(dot(I_KH, self.P), I_KH.T) + dot(dot(self.K, R), self.K.T) + + # Save measurement and posterior + self.z = deepcopy(z) + self.x_post = self.x.copy() + self.P_post = self.P.copy() + + # ---- New: Enforce bounding box and angle constraints (if dim_x >= 5) ---- + if self.dim_x >= 5: + # clamp w, h > 0 + self.x[2, 0] = max(self.x[2, 0], 1e-4) + self.x[3, 0] = max(self.x[3, 0], 1e-4) + + # wrap angle to [-pi, pi] + self.x[4, 0] = (self.x[4, 0] + pi) % (2 * pi) - pi + + def 
update_steadystate(self, z, H=None): + """ + Update using precomputed steady-state gain (K_steady_state) and + steady-state covariance P. Only x is updated here. + P remains unchanged. + """ + if z is None: + self.history_obs.append(z) + return + + if H is None: + H = self.H + + # residual + self.y = z - dot(H, self.x) + + # x = x + K_steady_state * y + self.x = self.x + dot(self.K_steady_state, self.y) + + # Save measurement and posterior + self.z = deepcopy(z) + self.x_post = self.x.copy() + + self.history_obs.append(z) + + def log_likelihood_of(self, z=None): + """ + Compute the log-likelihood of measurement z given the current + measurement prediction. This uses logpdf from filterpy.stats. + """ + if z is None: + z = self.z + return logpdf(z, dot(self.H, self.x), self.S) + + def likelihood_of(self, z=None): + """ + Compute the likelihood (probability) of measurement z given + the current measurement prediction. + """ + return exp(self.log_likelihood_of(z)) + + @property + def log_likelihood(self): + """ log-likelihood of the last measurement. """ + return self._log_likelihood + + @property + def likelihood(self): + """ likelihood of the last measurement. """ + return self._likelihood + + +def batch_filter(x, P, zs, Fs, Qs, Hs, Rs, Bs=None, us=None, + update_first=False, saver=None): + """ + Batch processes a sequence of measurements. + + Parameters + ---------- + x : np.array(dim_x, 1) + Initial state. + P : np.array(dim_x, dim_x) + Initial covariance. + zs : list-like + List of measurements at each time step (None for missing). + Fs : list-like + State transition matrices for each step. + Qs : list-like + Process noise covariances for each step. + Hs : list-like + Measurement matrices for each step. + Rs : list-like + Measurement noise covariances for each step. + Bs : list-like, optional + Control transition matrices for each step. + us : list-like, optional + Control vectors for each step. + update_first : bool + If True, apply update->predict. Otherwise predict->update. + saver : filterpy.common.Saver, optional + If provided, saver.save() is called at each step. + + Returns + ------- + means : np.array((n,dim_x,1)) + covariances : np.array((n,dim_x,dim_x)) + means_p : np.array((n,dim_x,1)) + Predictions after each step + covariances_p : np.array((n,dim_x,dim_x)) + Covariances after prediction each step + """ + n = np.size(zs, 0) + dim_x = x.shape[0] + + # Arrays to store results + if x.ndim == 1: + means = np.zeros((n, dim_x)) + means_p = np.zeros((n, dim_x)) + else: + means = np.zeros((n, dim_x, 1)) + means_p = np.zeros((n, dim_x, 1)) + + covariances = np.zeros((n, dim_x, dim_x)) + covariances_p = np.zeros((n, dim_x, dim_x)) + + if us is None: + us = [0.0] * n + Bs = [0.0] * n + + # Procedural version of predict->update or update->predict + for i, (z, F, Q, H, R, B, u) in enumerate(zip(zs, Fs, Qs, Hs, Rs, Bs, us)): + + if update_first: + # Update step + x, P = update(x, P, z, R=R, H=H) + means[i, :] = x + covariances[i, :, :] = P + + # Predict step + x, P = predict(x, P, u=u, B=B, F=F, Q=Q) + means_p[i, :] = x + covariances_p[i, :, :] = P + + else: + # Predict step + x, P = predict(x, P, u=u, B=B, F=F, Q=Q) + means_p[i, :] = x + covariances_p[i, :, :] = P + + # Update step + x, P = update(x, P, z, R=R, H=H) + means[i, :] = x + covariances[i, :, :] = P + + if saver is not None: + saver.save() + + return (means, covariances, means_p, covariances_p) + + +def update(x, P, z, R, H): + """ + Procedural form of the update step of the Kalman Filter. 
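# Worked scalar example of the update equations used throughout this file
# (y = z - Hx, S = HPH' + R, K = PH'S^-1, x' = x + Ky, Joseph-form covariance).
# Values are arbitrary and chosen only so the arithmetic is easy to follow.
import numpy as np

x = np.array([[2.0]]); P = np.array([[4.0]])
H = np.array([[1.0]]); R = np.array([[1.0]])
z = np.array([[3.0]])

y = z - H @ x                                   # residual: 1.0
S = H @ P @ H.T + R                             # innovation covariance: 5.0
K = P @ H.T @ np.linalg.inv(S)                  # Kalman gain: 0.8
x_post = x + K @ y                              # posterior mean: 2.8
I_KH = np.eye(1) - K @ H
P_post = I_KH @ P @ I_KH.T + K @ R @ K.T        # posterior covariance (Joseph form): 0.8

assert np.isclose(x_post[0, 0], 2.8) and np.isclose(P_post[0, 0], 0.8)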
+ """ + if z is None: + return x, P + + # y = z - Hx + y = z - dot(H, x) + PHT = dot(P, H.T) + S = dot(H, PHT) + R + SI = linalg.inv(S) + K = dot(PHT, SI) + + # x = x + Ky + x = x + dot(K, y) + + # P = (I - KH)P(I - KH)' + KRK' + I_KH = np.eye(x.shape[0]) - dot(K, H) + P = dot(dot(I_KH, P), I_KH.T) + dot(dot(K, R), K.T) + + return x, P + + +def predict(x, P, F, Q, B=None, u=None): + """ + Procedural form of the predict step of the Kalman Filter. + """ + if B is not None and u is not None: + x = dot(F, x) + dot(B, u) + else: + x = dot(F, x) + + P = dot(dot(F, P), F.T) + Q + return x, P diff --git a/boxmot/trackers/basetracker.py b/boxmot/trackers/basetracker.py index c56ac0f669..88772e3350 100644 --- a/boxmot/trackers/basetracker.py +++ b/boxmot/trackers/basetracker.py @@ -17,7 +17,8 @@ def __init__( max_obs: int = 50, nr_classes: int = 80, per_class: bool = False, - asso_func: str = 'iou' + asso_func: str = 'iou', + is_obb: bool = False ): """ Initialize the BaseTracker object with detection threshold, maximum age, minimum hits, @@ -41,12 +42,14 @@ def __init__( self.nr_classes = nr_classes self.iou_threshold = iou_threshold self.last_emb_size = None - self.asso_func_name = asso_func - + self.asso_func_name = asso_func+"_obb" if is_obb else asso_func + self.is_obb = is_obb + self.frame_count = 0 self.active_tracks = [] # This might be handled differently in derived classes self.per_class_active_tracks = None self._first_frame_processed = False # Flag to track if the first frame has been processed + self._first_dets_processed = False # Initialize per-class active tracks if self.per_class: @@ -97,7 +100,7 @@ def get_class_dets_n_embs(self, dets, embs, cls_id): return class_dets, class_embs @staticmethod - def on_first_frame_setup(method): + def setup_decorator(method): """ Decorator to perform setup on the first frame only. This ensures that initialization tasks (like setting the association function) only @@ -105,6 +108,17 @@ def on_first_frame_setup(method): """ def wrapper(self, *args, **kwargs): # If setup hasn't been done yet, perform it + # Even if dets is empty (e.g., shape (0, 7)), this check will still pass if it's Nx7 + if not self._first_dets_processed: + dets = args[0] + if dets is not None: + if dets.ndim == 2 and dets.shape[1] == 6: + self.is_obb = False + self._first_dets_processed = True + elif dets.ndim == 2 and dets.shape[1] == 7: + self.is_obb = True + self._first_dets_processed = True + if not self._first_frame_processed: img = args[1] self.h, self.w = img.shape[0:2] @@ -178,9 +192,15 @@ def check_inputs(self, dets, img): assert ( len(dets.shape) == 2 ), "Unsupported 'dets' dimensions, valid number of dimensions is two" - assert ( - dets.shape[1] == 6 - ), "Unsupported 'dets' 2nd dimension lenght, valid lenghts is 6" + if self.is_obb: + assert ( + dets.shape[1] == 7 + ), "Unsupported 'dets' 2nd dimension lenght, valid lenghts is 6 (cx,cy,w,h,angle,conf,cls)" + else : + assert ( + dets.shape[1] == 6 + ), "Unsupported 'dets' 2nd dimension lenght, valid lenghts is 6 (x1,y1,x2,y2,conf,cls)" + def id_to_color(self, id: int, saturation: float = 0.75, value: float = 0.95) -> tuple: """ @@ -233,23 +253,44 @@ def plot_box_on_img(self, img: np.ndarray, box: tuple, conf: float, cls: int, id Returns: - np.ndarray: The image array with the bounding box drawn on it. 
""" + if self.is_obb: + + angle = box[4] * 180.0 / np.pi # Convert radians to degrees + box_poly = ((box[0], box[1]), (box[2], box[3]), angle) + # print((width, height)) + rotrec = cv.boxPoints(box_poly) + box_poly = np.int_(rotrec) # Convert to integer + + # Draw the rectangle on the image + img = cv.polylines(img, [box_poly], isClosed=True, color=self.id_to_color(id), thickness=thickness) + + img = cv.putText( + img, + f'id: {int(id)}, conf: {conf:.2f}, c: {int(cls)}, a: {box[4]:.2f}', + (int(box[0]), int(box[1]) - 10), + cv.FONT_HERSHEY_SIMPLEX, + fontscale, + self.id_to_color(id), + thickness + ) + else : - img = cv.rectangle( - img, - (int(box[0]), int(box[1])), - (int(box[2]), int(box[3])), - self.id_to_color(id), - thickness - ) - img = cv.putText( - img, - f'id: {int(id)}, conf: {conf:.2f}, c: {int(cls)}', - (int(box[0]), int(box[1]) - 10), - cv.FONT_HERSHEY_SIMPLEX, - fontscale, - self.id_to_color(id), - thickness - ) + img = cv.rectangle( + img, + (int(box[0]), int(box[1])), + (int(box[2]), int(box[3])), + self.id_to_color(id), + thickness + ) + img = cv.putText( + img, + f'id: {int(id)}, conf: {conf:.2f}, c: {int(cls)}', + (int(box[0]), int(box[1]) - 10), + cv.FONT_HERSHEY_SIMPLEX, + fontscale, + self.id_to_color(id), + thickness + ) return img @@ -270,14 +311,24 @@ def plot_trackers_trajectories(self, img: np.ndarray, observations: list, id: in """ for i, box in enumerate(observations): trajectory_thickness = int(np.sqrt(float (i + 1)) * 1.2) - img = cv.circle( - img, - (int((box[0] + box[2]) / 2), - int((box[1] + box[3]) / 2)), - 2, - color=self.id_to_color(int(id)), - thickness=trajectory_thickness - ) + if self.is_obb: + img = cv.circle( + img, + (int(box[0]), int(box[1])), + 2, + color=self.id_to_color(int(id)), + thickness=trajectory_thickness + ) + else: + + img = cv.circle( + img, + (int((box[0] + box[2]) / 2), + int((box[1] + box[3]) / 2)), + 2, + color=self.id_to_color(int(id)), + thickness=trajectory_thickness + ) return img diff --git a/boxmot/trackers/botsort/botsort.py b/boxmot/trackers/botsort/botsort.py index 3601d47a09..476f39b5a6 100644 --- a/boxmot/trackers/botsort/botsort.py +++ b/boxmot/trackers/botsort/botsort.py @@ -4,7 +4,7 @@ import numpy as np from pathlib import Path -from boxmot.motion.kalman_filters.xywh_kf import KalmanFilterXYWH +from boxmot.motion.kalman_filters.aabb.xywh_kf import KalmanFilterXYWH from boxmot.appearance.reid_auto_backend import ReidAutoBackend from boxmot.motion.cmc.sof import SOF from boxmot.trackers.botsort.basetrack import BaseTrack, TrackState @@ -84,7 +84,7 @@ def __init__( self.cmc = get_cmc_method(cmc_method)() self.fuse_first_associate = fuse_first_associate - @BaseTracker.on_first_frame_setup + @BaseTracker.setup_decorator @BaseTracker.per_class_decorator def update(self, dets: np.ndarray, img: np.ndarray, embs: np.ndarray = None) -> np.ndarray: self.check_inputs(dets, img) diff --git a/boxmot/trackers/botsort/botsort_track.py b/boxmot/trackers/botsort/botsort_track.py index da7dcdfe2a..9f97b9f9bd 100644 --- a/boxmot/trackers/botsort/botsort_track.py +++ b/boxmot/trackers/botsort/botsort_track.py @@ -2,7 +2,7 @@ from collections import deque from boxmot.trackers.botsort.basetrack import BaseTrack, TrackState -from boxmot.motion.kalman_filters.xywh_kf import KalmanFilterXYWH +from boxmot.motion.kalman_filters.aabb.xywh_kf import KalmanFilterXYWH from boxmot.utils.ops import xywh2xyxy, xyxy2xywh diff --git a/boxmot/trackers/bytetrack/bytetrack.py b/boxmot/trackers/bytetrack/bytetrack.py index a3357327ce..e1d5ff8641 100644 
--- a/boxmot/trackers/bytetrack/bytetrack.py +++ b/boxmot/trackers/bytetrack/bytetrack.py @@ -3,7 +3,7 @@ import numpy as np from collections import deque -from boxmot.motion.kalman_filters.xyah_kf import KalmanFilterXYAH +from boxmot.motion.kalman_filters.aabb.xyah_kf import KalmanFilterXYAH from boxmot.trackers.bytetrack.basetrack import BaseTrack, TrackState from boxmot.utils.matching import fuse_score, iou_distance, linear_assignment from boxmot.utils.ops import tlwh2xyah, xywh2tlwh, xywh2xyxy, xyxy2xywh @@ -150,7 +150,7 @@ def __init__( self.max_time_lost = self.buffer_size self.kalman_filter = KalmanFilterXYAH() - @BaseTracker.on_first_frame_setup + @BaseTracker.setup_decorator @BaseTracker.per_class_decorator def update(self, dets: np.ndarray, img: np.ndarray = None, embs: np.ndarray = None) -> np.ndarray: diff --git a/boxmot/trackers/deepocsort/deepocsort.py b/boxmot/trackers/deepocsort/deepocsort.py index 158a233582..95e069dc35 100644 --- a/boxmot/trackers/deepocsort/deepocsort.py +++ b/boxmot/trackers/deepocsort/deepocsort.py @@ -7,8 +7,7 @@ from boxmot.appearance.reid_auto_backend import ReidAutoBackend from boxmot.motion.cmc import get_cmc_method -from boxmot.motion.kalman_filters.xysr_kf import KalmanFilterXYSR -from boxmot.motion.kalman_filters.xywh_kf import KalmanFilterXYWH +from boxmot.motion.kalman_filters.aabb.xysr_kf import KalmanFilterXYSR from boxmot.utils.association import associate, linear_assignment from boxmot.trackers.basetracker import BaseTracker from boxmot.utils.ops import xyxy2xysr @@ -299,7 +298,7 @@ def __init__( self.cmc_off = cmc_off self.aw_off = aw_off - @BaseTracker.on_first_frame_setup + @BaseTracker.setup_decorator @BaseTracker.per_class_decorator def update(self, dets: np.ndarray, img: np.ndarray, embs: np.ndarray = None) -> np.ndarray: """ diff --git a/boxmot/trackers/hybridsort/hybridsort.py b/boxmot/trackers/hybridsort/hybridsort.py index ed52af1217..d375a93d37 100644 --- a/boxmot/trackers/hybridsort/hybridsort.py +++ b/boxmot/trackers/hybridsort/hybridsort.py @@ -128,7 +128,7 @@ def __init__( """ # define constant velocity model # if not orig and not args.kalman_GPR: - from boxmot.motion.kalman_filters.xysr_kf import KalmanFilterXYSR + from boxmot.motion.kalman_filters.aabb.xysr_kf import KalmanFilterXYSR self.kf = KalmanFilterXYSR(dim_x=9, dim_z=5, max_obs=max_obs) # u, v, s, c, r, ~u, ~v, ~s, ~c @@ -392,7 +392,7 @@ def camera_update(self, trackers, warp_matrix): for tracker in trackers: tracker.camera_update(warp_matrix) - @BaseTracker.on_first_frame_setup + @BaseTracker.setup_decorator @BaseTracker.per_class_decorator def update(self, dets: np.ndarray, img: np.ndarray, embs: np.ndarray = None) -> np.ndarray: """ diff --git a/boxmot/trackers/imprassoc/imprassoctrack.py b/boxmot/trackers/imprassoc/imprassoctrack.py index 3bff68fda0..7f5a4b20ba 100644 --- a/boxmot/trackers/imprassoc/imprassoctrack.py +++ b/boxmot/trackers/imprassoc/imprassoctrack.py @@ -7,7 +7,7 @@ from boxmot.appearance.reid_auto_backend import ReidAutoBackend from boxmot.motion.cmc.sof import SOF -from boxmot.motion.kalman_filters.xywh_kf import KalmanFilterXYWH +from boxmot.motion.kalman_filters.aabb.xywh_kf import KalmanFilterXYWH from boxmot.trackers.imprassoc.basetrack import BaseTrack, TrackState from boxmot.utils.matching import (embedding_distance, fuse_score, iou_distance, linear_assignment, @@ -264,7 +264,7 @@ def __init__( self.cmc = SOF() - @BaseTracker.on_first_frame_setup + @BaseTracker.setup_decorator @BaseTracker.per_class_decorator def update(self, dets: 
np.ndarray, img: np.ndarray, embs: np.ndarray = None) -> np.ndarray: self.check_inputs(dets, img) diff --git a/boxmot/trackers/ocsort/ocsort.py b/boxmot/trackers/ocsort/ocsort.py index 0131463a32..df9f84d251 100644 --- a/boxmot/trackers/ocsort/ocsort.py +++ b/boxmot/trackers/ocsort/ocsort.py @@ -7,15 +7,19 @@ from collections import deque -from boxmot.motion.kalman_filters.xysr_kf import KalmanFilterXYSR +from boxmot.motion.kalman_filters.aabb.xysr_kf import KalmanFilterXYSR from boxmot.utils.association import associate, linear_assignment from boxmot.trackers.basetracker import BaseTracker from boxmot.utils.ops import xyxy2xysr +from boxmot.motion.kalman_filters.obb.xywha_kf import KalmanBoxTrackerOBB -def k_previous_obs(observations, cur_age, k): +def k_previous_obs(observations, cur_age, k, is_obb=False): if len(observations) == 0: - return [-1, -1, -1, -1, -1] + if is_obb: + return [-1, -1, -1, -1, -1, -1] + else : + return [-1, -1, -1, -1, -1] for i in range(k): dt = k - i if cur_age - dt in observations: @@ -208,7 +212,7 @@ def __init__( inertia: float = 0.2, use_byte: bool = False, Q_xy_scaling: float = 0.01, - Q_s_scaling: float = 0.0001 + Q_s_scaling: float = 0.0001, ): super().__init__(max_age=max_age, per_class=per_class, asso_func=asso_func) """ @@ -227,7 +231,7 @@ def __init__( self.Q_s_scaling = Q_s_scaling KalmanBoxTracker.count = 0 - @BaseTracker.on_first_frame_setup + @BaseTracker.setup_decorator @BaseTracker.per_class_decorator def update(self, dets: np.ndarray, img: np.ndarray, embs: np.ndarray = None) -> np.ndarray: """ @@ -245,7 +249,7 @@ def update(self, dets: np.ndarray, img: np.ndarray, embs: np.ndarray = None) -> h, w = img.shape[0:2] dets = np.hstack([dets, np.arange(len(dets)).reshape(-1, 1)]) - confs = dets[:, 4] + confs = dets[:, 4+self.is_obb] inds_low = confs > 0.1 inds_high = confs < self.det_thresh @@ -257,12 +261,12 @@ def update(self, dets: np.ndarray, img: np.ndarray, embs: np.ndarray = None) -> dets = dets[remain_inds] # get predicted locations from existing trackers. 
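# Illustration of the boolean-as-offset indexing used above: `self.is_obb` evaluates to
# 0 for 6-column detections and 1 for 7-column ones, so `4 + self.is_obb` always points
# at the confidence column and the geometry slice grows from 4 to 5 columns.
import numpy as np

aabb = np.array([[10, 20, 50, 80, 0.9, 2]])         # x1, y1, x2, y2, conf, cls
obb = np.array([[30, 50, 40, 60, 0.3, 0.8, 5]])     # cx, cy, w, h, angle, conf, cls

for dets, is_obb in ((aabb, False), (obb, True)):
    confs = dets[:, 4 + is_obb]                     # 0.9 for the AABB row, 0.8 for the OBB row
    boxes = dets[:, :4 + is_obb]                    # 4 geometry columns vs 5
    assert boxes.shape[1] == 4 + is_obb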
- trks = np.zeros((len(self.active_tracks), 5)) + trks = np.zeros((len(self.active_tracks), 5+self.is_obb)) to_del = [] ret = [] for t, trk in enumerate(trks): pos = self.active_tracks[t].predict()[0] - trk[:] = [pos[0], pos[1], pos[2], pos[3], 0] + trk[:] = [pos[i] for i in range(4+self.is_obb)] + [0] if np.any(np.isnan(pos)): to_del.append(t) trks = np.ma.compress_rows(np.ma.masked_invalid(trks)) @@ -276,9 +280,10 @@ def update(self, dets: np.ndarray, img: np.ndarray, embs: np.ndarray = None) -> ] ) last_boxes = np.array([trk.last_observation for trk in self.active_tracks]) + k_observations = np.array( [ - k_previous_obs(trk.observations, trk.age, self.delta_t) + k_previous_obs(trk.observations, trk.age, self.delta_t, is_obb=self.is_obb) for trk in self.active_tracks ] ) @@ -287,10 +292,10 @@ def update(self, dets: np.ndarray, img: np.ndarray, embs: np.ndarray = None) -> First round of association """ matched, unmatched_dets, unmatched_trks = associate( - dets[:, 0:5], trks, self.asso_func, self.asso_threshold, velocities, k_observations, self.inertia, w, h + dets[:, 0:5+self.is_obb], trks, self.asso_func, self.asso_threshold, velocities, k_observations, self.inertia, w, h ) for m in matched: - self.active_tracks[m[1]].update(dets[m[0], :5], dets[m[0], 5], dets[m[0], 6]) + self.active_tracks[m[1]].update(dets[m[0], :-2], dets[m[0], -2], dets[m[0], -1]) """ Second round of associaton by OCR @@ -315,7 +320,7 @@ def update(self, dets: np.ndarray, img: np.ndarray, embs: np.ndarray = None) -> if iou_left[m[0], m[1]] < self.asso_threshold: continue self.active_tracks[trk_ind].update( - dets_second[det_ind, :5], dets_second[det_ind, 5], dets_second[det_ind, 6] + dets_second[det_ind, :-2], dets_second[det_ind, -2], dets_second[det_ind, -1] ) to_remove_trk_indices.append(trk_ind) unmatched_trks = np.setdiff1d( @@ -340,7 +345,7 @@ def update(self, dets: np.ndarray, img: np.ndarray, embs: np.ndarray = None) -> det_ind, trk_ind = unmatched_dets[m[0]], unmatched_trks[m[1]] if iou_left[m[0], m[1]] < self.asso_threshold: continue - self.active_tracks[trk_ind].update(dets[det_ind, :5], dets[det_ind, 5], dets[det_ind, 6]) + self.active_tracks[trk_ind].update(dets[det_ind, :-2], dets[det_ind, -2], dets[det_ind, -1]) to_remove_det_indices.append(det_ind) to_remove_trk_indices.append(trk_ind) unmatched_dets = np.setdiff1d( @@ -355,7 +360,10 @@ def update(self, dets: np.ndarray, img: np.ndarray, embs: np.ndarray = None) -> # create and initialise new trackers for unmatched detections for i in unmatched_dets: - trk = KalmanBoxTracker(dets[i, :5], dets[i, 5], dets[i, 6], delta_t=self.delta_t, Q_xy_scaling=self.Q_xy_scaling, Q_s_scaling=self.Q_s_scaling, max_obs=self.max_obs) + if self.is_obb: + trk = KalmanBoxTrackerOBB(dets[i, :-2], dets[i, -2], dets[i, -1], delta_t=self.delta_t, Q_xy_scaling=self.Q_xy_scaling, Q_a_scaling=self.Q_s_scaling, max_obs=self.max_obs) + else: + trk = KalmanBoxTracker(dets[i, :5], dets[i, 5], dets[i, 6], delta_t=self.delta_t, Q_xy_scaling=self.Q_xy_scaling, Q_s_scaling=self.Q_s_scaling, max_obs=self.max_obs) self.active_tracks.append(trk) i = len(self.active_tracks) for trk in reversed(self.active_tracks): @@ -366,7 +374,7 @@ def update(self, dets: np.ndarray, img: np.ndarray, embs: np.ndarray = None) -> this is optional to use the recent observation or the kalman filter prediction, we didn't notice significant difference here """ - d = trk.last_observation[:4] + d = trk.last_observation[:4+self.is_obb] if (trk.time_since_update < 1) and ( trk.hit_streak >= self.min_hits or 
self.frame_count <= self.min_hits ): diff --git a/boxmot/trackers/strongsort/sort/track.py b/boxmot/trackers/strongsort/sort/track.py index 50d1e10983..c33367819c 100644 --- a/boxmot/trackers/strongsort/sort/track.py +++ b/boxmot/trackers/strongsort/sort/track.py @@ -3,7 +3,7 @@ import os import numpy as np -from boxmot.motion.kalman_filters.xyah_kf import KalmanFilterXYAH +from boxmot.motion.kalman_filters.aabb.xyah_kf import KalmanFilterXYAH class TrackState: diff --git a/boxmot/utils/iou.py b/boxmot/utils/iou.py index f1b53dfc31..19fdcc9f7c 100644 --- a/boxmot/utils/iou.py +++ b/boxmot/utils/iou.py @@ -1,4 +1,35 @@ import numpy as np +import cv2 as cv + +def iou_obb_pair(i, j, bboxes1, bboxes2): + """ + Compute IoU for the rotated rectangles at index i and j in the batches `bboxes1`, `bboxes2` . + """ + rect1 = bboxes1[int(i)] + rect2 = bboxes2[int(j)] + + (cx1, cy1, w1, h1, angle1) = rect1[0:5] + (cx2, cy2, w2, h2, angle2) = rect2[0:5] + + + r1 = ((cx1, cy1), (w1, h1), angle1) + r2 = ((cx2, cy2), (w2, h2), angle2) + + # Compute intersection + ret, intersect = cv.rotatedRectangleIntersection(r1, r2) + if ret == 0 or intersect is None: + return 0.0 # No intersection + + # Calculate intersection area + intersection_area = cv.contourArea(intersect) + + # Calculate union area + area1 = w1 * h1 + area2 = w2 * h2 + union_area = area1 + area2 - intersection_area + + # Compute IoU + return intersection_area / union_area if union_area > 0 else 0.0 class AssociationFunction: def __init__(self, w, h, asso_mode="iou"): @@ -34,7 +65,17 @@ def iou_batch(bboxes1, bboxes2) -> np.ndarray: wh ) return o + + @staticmethod + def iou_batch_obb(bboxes1, bboxes2) -> np.ndarray: + N, M = len(bboxes1), len(bboxes2) + + def wrapper(i, j): + return iou_obb_pair(i, j, bboxes1, bboxes2) + + iou_matrix = np.fromfunction(np.vectorize(wrapper), shape=(N, M), dtype=int) + return iou_matrix @staticmethod def hmiou_batch(bboxes1, bboxes2): @@ -144,6 +185,19 @@ def centroid_batch(self, bboxes1, bboxes2) -> np.ndarray: return 1 - normalized_distances + def centroid_batch_obb(self, bboxes1, bboxes2) -> np.ndarray: + centroids1 = np.stack((bboxes1[..., 0], bboxes1[..., 1]),axis=-1) + centroids2 = np.stack((bboxes2[..., 0], bboxes2[..., 1]),axis=-1) + + centroids1 = np.expand_dims(centroids1, 1) + centroids2 = np.expand_dims(centroids2, 0) + + distances = np.sqrt(np.sum((centroids1 - centroids2) ** 2, axis=-1)) + norm_factor = np.sqrt(self.w ** 2 + self.h ** 2) + normalized_distances = distances / norm_factor + + return 1 - normalized_distances + @staticmethod def ciou_batch(bboxes1, bboxes2) -> np.ndarray: @@ -279,11 +333,13 @@ def _get_asso_func(self, asso_mode): """ ASSO_FUNCS = { "iou": AssociationFunction.iou_batch, + "iou_obb": AssociationFunction.iou_batch_obb, "hmiou": AssociationFunction.hmiou_batch, "giou": AssociationFunction.giou_batch, "ciou": AssociationFunction.ciou_batch, "diou": AssociationFunction.diou_batch, - "centroid": self.centroid_batch # only not being staticmethod + "centroid": self.centroid_batch, # only not being staticmethod + "centroid_obb": self.centroid_batch_obb } if self.asso_mode not in ASSO_FUNCS: diff --git a/examples/det/obb.ipynb b/examples/det/obb.ipynb new file mode 100644 index 0000000000..749c2a6660 --- /dev/null +++ b/examples/det/obb.ipynb @@ -0,0 +1,118 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": 7, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "2\r" + ] + } + ], + "source": [ + "import cv2\n", + "import 
numpy as np\n", + "from pathlib import Path\n", + "\n", + "from boxmot import OcSort\n", + "\n", + "\n", + "# Initialize the tracker\n", + "tracker = OcSort(\n", + " asso_func=\"centroid\",\n", + " min_hits=10,\n", + " asso_threshold=0.98,\n", + " det_thresh = 0.7,\n", + " max_age=20,\n", + " use_byte=True,\n", + " Q_xy_scaling = 0.01,\n", + " Q_s_scaling = 0.0001,\n", + ")\n", + "\n", + "def get_parabolic_array(i):\n", + " \"\"\"\n", + " Generates coordinates where x increases linearly,\n", + " y follows a parabolic curve (y = base + coeff * i^2),\n", + " and the angle matches the tangent of the curve.\n", + " \"\"\"\n", + " # -- FIRST ROW --\n", + " # Define x1, y1\n", + " x1 = 144 + i\n", + " y1 = 212 + 0.01 * (i ** 2)\n", + " \n", + " # Slope = 2 * coeff * i => for 0.01 * i^2, slope = 2*0.01*i = 0.02*i\n", + " slope1 = 0.02 * i\n", + " # Angle in radians\n", + " angle1 = np.arctan(slope1)\n", + " \n", + " # -- SECOND ROW --\n", + " # Define x2, y2 with a different coefficient\n", + " x2 = 425 + i\n", + " y2 = 281 + 0.02 * (i ** 2)\n", + " \n", + " # Slope for 0.02 * i^2 is 2*0.02*i = 0.04*i\n", + " slope2 = 0.04 * i\n", + " # Angle in radians\n", + " angle2 = np.arctan(slope2)\n", + " \n", + " # Build the array\n", + " det = np.array([\n", + " [x1, y1, 45, 30, angle1, 0.82, 0], # row 1\n", + " [x2, y2, 45, 30, angle2, 0.72, 65] # row 2\n", + " ])\n", + " \n", + " return det\n", + "\n", + "\n", + "for i in range(0, 100):\n", + "\n", + " #frame = cv2.imread(str(img_dir / (file.stem + '.png')))\n", + " frame = np.zeros((1080,1080,3))\n", + " \n", + " det = get_parabolic_array(i)\n", + "\n", + " # Update the tracker\n", + " res = tracker.update(det, frame) # --> M X (x, y, x, y, id, conf, cls, ind)\n", + " \n", + " # Plot tracking results on the image\n", + " tracker.plot_results(frame, show_trajectories=True, fontscale=2, thickness=4)\n", + "\n", + " # Display the frame\n", + " cv2.imshow('BoXMOT', frame)\n", + "\n", + " print(len(tracker.active_tracks),end='\\r')\n", + "\n", + " key = cv2.waitKey(1) & 0xFF\n", + " if key == ord('q') or key ==27:\n", + " break\n", + "\n", + "cv2.destroyAllWindows()" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "boxmot-YDNZdsaB-py3.11", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.11.5" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +}
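# Follow-up sketch of the rotated-rectangle IoU used by `iou_obb_pair` in boxmot/utils/iou.py,
# with two hand-picked boxes. OpenCV's rotated-rect tuples take the angle in degrees,
# while the tracker state and the demo above keep angles in radians, so a conversion
# such as np.degrees(angle) is assumed before calling OpenCV here.
import numpy as np
import cv2 as cv

def rotated_iou(r1, r2):
    ret, intersect = cv.rotatedRectangleIntersection(r1, r2)
    if ret == 0 or intersect is None:
        return 0.0                                   # no overlap
    inter = cv.contourArea(intersect)
    union = r1[1][0] * r1[1][1] + r2[1][0] * r2[1][1] - inter
    return inter / union if union > 0 else 0.0

r1 = ((50.0, 50.0), (40.0, 20.0), 0.0)                      # ((cx, cy), (w, h), angle in degrees)
r2 = ((50.0, 50.0), (40.0, 20.0), float(np.degrees(0.1)))   # same box rotated by 0.1 rad
print(rotated_iou(r1, r1))                           # identical boxes -> 1.0 (up to numerical tolerance)
print(rotated_iou(r1, r2))                           # slightly rotated -> just below 1.0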