Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 20 additions & 8 deletions app/helpers/miscellaneous.py
Original file line number Diff line number Diff line change
Expand Up @@ -1151,14 +1151,26 @@ def keypoints_adjustments(

# --- MANUAL ALIGNMENTS (Sliders) ---
if parameters.get("FaceAdjEnableToggle", False):
kps_5_adj[:, 0] += parameters["KpsXSlider"]
kps_5_adj[:, 1] += parameters["KpsYSlider"]
kps_5_adj[:, 0] -= 255
kps_5_adj[:, 0] *= 1 + parameters["KpsScaleSlider"] / 100.0
kps_5_adj[:, 0] += 255
kps_5_adj[:, 1] -= 255
kps_5_adj[:, 1] *= 1 + parameters["KpsScaleSlider"] / 100.0
kps_5_adj[:, 1] += 255
# 1. Apply spatial translations (X / Y Axis)
# Adjusts the facial keypoints position based on user-defined offsets.
kps_5_adj[:, 0] += parameters.get("KpsXSlider", 0.0)
kps_5_adj[:, 1] += parameters.get("KpsYSlider", 0.0)

# 2. Apply spatial scaling
# Resizes the face representation while maintaining its relative geometry.
scale_val = parameters.get("KpsScaleSlider", 0.0)
if scale_val != 0.0:
scale_factor = 1.0 + (scale_val / 100.0)

# FW-BUG-FIX: Dynamic Centroid Calculation.
# Replaced the hardcoded '255' center with the actual barycenter
# of the face keypoints. This prevents unwanted translation (drift)
# when resizing faces that are not perfectly centered at (255, 255).
centroid = np.mean(kps_5_adj, axis=0) # Returns array([mean_x, mean_y])

# Vectorized scaling: (Point - Centroid) * Scale + Centroid
# Computes both X and Y axes simultaneously for optimal NumPy performance.
kps_5_adj = (kps_5_adj - centroid) * scale_factor + centroid

if (
parameters.get("LandmarksPositionAdjEnableToggle", False)
Expand Down
181 changes: 94 additions & 87 deletions app/processors/face_detectors.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,126 +236,144 @@ def _filter_detections_gpu(
skip_nms=False,
):
"""
Performs GPU-accelerated NMS, sorting, and filtering on raw detections from all angles.
Performs GPU-accelerated NMS, automatic heuristic sorting, and filtering on raw detections.
Designed for fully automated pipelines: filters out bad rotation artifacts via geometric
sanity checks and elects the best candidates using a non-linear Confidence/Area heuristic.

Args:
scores_list (list): List of score arrays (np.ndarray) from each detection angle.
bboxes_list (list): List of bounding box arrays (np.ndarray) from each detection angle.
kpss_list (list): List of keypoint arrays (np.ndarray) from each detection angle.
img_height (int): The *original* height of the source image.
img_width (int): The *original* width of the source image.
det_scale (torch.Tensor): The scaling factor used to resize the image (new_height / original_height).
max_num (int): The maximum number of faces to return, sorted by size and centrality.
det_scale (torch.Tensor): The scaling factor used to resize the image.
max_num (int): The maximum number of faces to return.
skip_nms (bool): If True, skips the Non-Maximum Suppression step.

Returns:
tuple: (det, kpss_final, score_values)
- det (np.ndarray): Final bounding boxes, scaled to original image size.
- kpss_final (np.ndarray): Final keypoints, scaled to original image size.
- score_values (np.ndarray): Scores for the final detections.
"""
if not bboxes_list:
return None, None, None

# Convert all raw detection lists to single GPU tensors.
scores_tensor = (
torch.from_numpy(np.vstack(scores_list))
.to(self.models_processor.device)
.squeeze()
# ----------------------------------------------------------------------
# 1. TENSOR CREATION & THREAD SAFETY
# ----------------------------------------------------------------------
# Direct tensor creation with 'device' forces a strict memory copy to VRAM.
# This prevents Race Conditions if numpy arrays are mutated by CPU threads concurrently.
device = self.models_processor.device

scores_tensor = torch.tensor(
np.vstack(scores_list), dtype=torch.float32, device=device
).squeeze()
bboxes_tensor = torch.tensor(
np.vstack(bboxes_list), dtype=torch.float32, device=device
)
bboxes_tensor = torch.from_numpy(np.vstack(bboxes_list)).to(
self.models_processor.device
kpss_tensor = torch.tensor(
np.vstack(kpss_list), dtype=torch.float32, device=device
)
kpss_tensor = torch.from_numpy(np.vstack(kpss_list)).to(
self.models_processor.device
)

bboxes_tensor = torch.as_tensor(bboxes_tensor, dtype=torch.float32)
scores_tensor = torch.as_tensor(scores_tensor, dtype=torch.float32).reshape(-1)

# --- Validation Block to ensure tensors are well-formed before NMS ---
if bboxes_tensor.numel() == 0:
return None, None, None
if bboxes_tensor.dim() == 1 and bboxes_tensor.numel() == 4:
bboxes_tensor = bboxes_tensor.unsqueeze(0)
if scores_tensor.dim() == 0:
scores_tensor = scores_tensor.unsqueeze(0)
if bboxes_tensor.size(0) != scores_tensor.size(0):
# Mismatch in tensor sizes, aborting.
return None, None, None

# Ensure tensors are contiguous (optimizes NMS)
bboxes_tensor = bboxes_tensor.contiguous()
scores_tensor = scores_tensor.contiguous()

# ----------------------------------------------------------------------
# 2. NON-MAXIMUM SUPPRESSION (NMS)
# ----------------------------------------------------------------------
if not skip_nms:
# Perform Non-Maximum Suppression on the GPU to remove overlapping boxes.
nms_thresh = 0.4
# NMS must rely strictly on raw network confidence.
# Multiplying by area here would allow bloated bad rotations to absorb good ones.
keep_indices = nms(bboxes_tensor, scores_tensor, iou_threshold=nms_thresh)

det_boxes, det_kpss, det_scores = (
bboxes_tensor[keep_indices],
kpss_tensor[keep_indices],
scores_tensor[keep_indices],
)
det_boxes = bboxes_tensor[keep_indices]
det_kpss = kpss_tensor[keep_indices]
det_scores = scores_tensor[keep_indices]
else:
det_boxes, det_kpss, det_scores = (
bboxes_tensor,
kpss_tensor,
scores_tensor,
)

# Sort the remaining detections by their confidence score.
sorted_indices = torch.argsort(det_scores, descending=True)
det_boxes, det_kpss, det_scores = (
det_boxes[sorted_indices],
det_kpss[sorted_indices],
det_scores[sorted_indices],
)

# If more faces are detected than max_num, select the best ones.
det_boxes = bboxes_tensor
det_kpss = kpss_tensor
det_scores = scores_tensor

# ----------------------------------------------------------------------
# 3. GEOMETRIC SANITY CHECK (KPS Boundary Validation)
# ----------------------------------------------------------------------
# Anomalies from incorrect rotations often produce KPS coordinates that
# fall far outside their own bounding box. We filter these geometric impossibilities.
if det_boxes.shape[0] > 0:
box_widths = det_boxes[:, 2] - det_boxes[:, 0]
box_heights = det_boxes[:, 3] - det_boxes[:, 1]

# 15% tolerance margin
margin_x = box_widths * 0.15
margin_y = box_heights * 0.15

min_x = det_boxes[:, 0] - margin_x
min_y = det_boxes[:, 1] - margin_y
max_x = det_boxes[:, 2] + margin_x
max_y = det_boxes[:, 3] + margin_y

# Check if all 5 keypoints are within the expanded bounding box
valid_kps_mask = (
(det_kpss[:, :, 0] >= min_x.unsqueeze(1))
& (det_kpss[:, :, 0] <= max_x.unsqueeze(1))
& (det_kpss[:, :, 1] >= min_y.unsqueeze(1))
& (det_kpss[:, :, 1] <= max_y.unsqueeze(1))
).all(dim=1)

if valid_kps_mask.any():
det_boxes = det_boxes[valid_kps_mask]
det_kpss = det_kpss[valid_kps_mask]
det_scores = det_scores[valid_kps_mask]

# ----------------------------------------------------------------------
# 4. AUTOMATIC NON-LINEAR SORTING HEURISTIC
# ----------------------------------------------------------------------
if max_num > 0 and det_boxes.shape[0] > max_num:
if det_boxes.shape[0] > 1:
# Score faces based on a combination of their size and proximity to the image center.
# This filtering happens on *unscaled* coordinates (relative to the padded detection image).
area = (det_boxes[:, 2] - det_boxes[:, 0]) * (
areas = (det_boxes[:, 2] - det_boxes[:, 0]) * (
det_boxes[:, 3] - det_boxes[:, 1]
)
# The old logic (img_height / det_scale) was mathematically incorrect and
# produced extreme values for non-standard aspect ratios (like VR videos).
# The correct logic is to find the center of the *active image area*
# on the padded canvas.
# new_height_on_canvas = img_height * det_scale
# new_width_on_canvas = img_width * det_scale
det_img_center_y = (img_height * det_scale) / 2.0
det_img_center_x = (img_width * det_scale) / 2.0

center_x = (det_boxes[:, 0] + det_boxes[:, 2]) / 2 - det_img_center_x
center_y = (det_boxes[:, 1] + det_boxes[:, 3]) / 2 - det_img_center_y

offset_dist_squared = center_x**2 + center_y**2
# This score favors large faces (area) that are close to the center
# (low offset_dist_squared).
values = area - offset_dist_squared * 2.0
bindex = torch.argsort(values, descending=True)[:max_num]
det_boxes, det_kpss, det_scores = (
det_boxes[bindex],
det_kpss[bindex],
det_scores[bindex],
)
else:
bindex = torch.arange(
det_boxes.shape[0], device=self.models_processor.device
)[:max_num]
areas = areas.clamp(min=1.0)

# Normalize values to prevent scale dominance
norm_scores = det_scores / (det_scores.max() + 1e-6)
norm_areas = areas / (areas.max() + 1e-6)

# Non-linear heuristic. Squaring the confidence drastically punishes
# uncertain faces. Only highly confident faces can leverage their 'Area'
# to win the sorting battle. This removes the need for manual UI strategies.
combined_values = (norm_scores**2) * 0.8 + (norm_areas * 0.2)

bindex = torch.argsort(combined_values, descending=True)[:max_num]

det_boxes = det_boxes[bindex]
det_kpss = det_kpss[bindex]
det_scores = det_scores[bindex]

# Transfer final results back to CPU and scale them to the original image dimensions.
else:
det_boxes = det_boxes[:max_num]
det_kpss = det_kpss[:max_num]
det_scores = det_scores[:max_num]
else:
# Standard confidence sort if below max_num
sorted_indices = torch.argsort(det_scores, descending=True)
det_boxes = det_boxes[sorted_indices]
det_kpss = det_kpss[sorted_indices]
det_scores = det_scores[sorted_indices]

# ----------------------------------------------------------------------
# 5. CPU TRANSFER & FINAL SCALING
# ----------------------------------------------------------------------
det_scale_val = det_scale.cpu().item()

det = det_boxes.cpu().numpy() / det_scale_val
kpss_final = det_kpss.cpu().numpy() / det_scale_val

score_values = det_scores.cpu().numpy()

return det, kpss_final, score_values
Expand Down Expand Up @@ -480,17 +498,6 @@ def run_detect(
Supports tracking via 'previous_detections'.
"""
rotation_angles = rotation_angles or [0]
use_multi_rotation = len(rotation_angles) > 1

# Multi-angle detection must run the full detector path. Tracking shortcuts
# can reuse stale landmarks from a different orientation and destabilize
# swaps on upside-down faces during recording.
# Also force from_points=True so the secondary landmark model uses kpss-aligned
# crops. Without this, the bbox-only crop path (from_points=False) produces an
# upside-down face crop that landmark detectors cannot handle, which corrupts
# kpss_5 and causes wrong embeddings / ghost-face artifacts.
if use_multi_rotation:
from_points = True

control = (
control_override
Expand Down
19 changes: 17 additions & 2 deletions app/processors/face_landmark_detectors.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ def _kps5_is_degenerate(kps5) -> bool:
"""
if kps5 is None:
return True

# Safely convert PyTorch tensors (especially on CUDA) to numpy arrays.
if torch.is_tensor(kps5):
kps5 = kps5.detach().cpu().numpy()

try:
kps5 = np.asarray(kps5, dtype=np.float32)
except (ValueError, TypeError):
Expand Down Expand Up @@ -279,17 +284,27 @@ def _prepare_crop(
Prepares a cropped and warped face image for a landmark detector.
This helper centralizes the repetitive pre-processing logic of aligning a face
based on either a bounding box or existing keypoints.

Returns:
Tuple[torch.Tensor, np.ndarray, np.ndarray]: The cropped image, the forward transform matrix (M),
and the inverse transform matrix (IM).
"""
import math

if not from_points:
# Align the face using the bounding box center and size.
w, h = (bbox[2] - bbox[0]), (bbox[3] - bbox[1])
center = (bbox[2] + bbox[0]) / 2, (bbox[3] + bbox[1]) / 2
_scale = target_size / (max(w, h) * scale)
aimg, M = faceutil.transform(img, center, target_size, _scale, 0)

# Correct math implementation to upright tilted faces in fallback mode.
angle = 0.0
if det_kpss is not None and len(det_kpss) >= 2:
dx = det_kpss[1][0] - det_kpss[0][0]
dy = det_kpss[1][1] - det_kpss[0][1]
if math.hypot(dx, dy) > 1e-3:
angle = math.degrees(math.atan2(-dy, dx))

aimg, M = faceutil.transform(img, center, target_size, _scale, angle)
IM = faceutil.invertAffineTransform(M)
else:
if det_kpss is None or len(det_kpss) == 0:
Expand Down
Loading