diff --git a/app/processors/frame_edits.py b/app/processors/frame_edits.py
index dc9b813f..8577743a 100644
--- a/app/processors/frame_edits.py
+++ b/app/processors/frame_edits.py
@@ -80,6 +80,7 @@ def _apply_kornia_warp(
# for the RAM-to-VRAM transfer to complete via the PCIe bus.
M_c2o_tensor = (
torch.from_numpy(M_c2o)
.float()
+ .pin_memory()  # pin after .float(): the cast may copy to pageable memory
.unsqueeze(0)
.to(out.device, non_blocking=True)
@@ -137,7 +138,7 @@ def apply_face_expression_restorer(
else contextlib.nullcontext()
)
- with stream_context:
+ with stream_context, torch.inference_mode():
# --- CONFIGURATION ---
use_mean_eyes = parameters.get("LandmarkMeanEyesToggle", False)
# Sanitized Mode Selection
@@ -294,54 +295,161 @@ def apply_face_expression_restorer(
t_anchor[..., 2].fill_(0)
scale_anchor = x_s_info["scale"]
- # Load Lip Array (Neutral reference for lips)
- lp_lip_array = torch.from_numpy(self.models_processor.lp_lip_array).to(
- dtype=torch.float32, device=self.models_processor.device
- )
+ # Upload the lip array to the device only once: the tensor is cached on this instance (not refreshed if the device changes).
+ if not hasattr(self, "_cached_lp_lip_tensor"):
+ self._cached_lp_lip_tensor = torch.from_numpy(
+ self.models_processor.lp_lip_array
+ ).to(dtype=torch.float32, device=self.models_processor.device)
+ lp_lip_array = self._cached_lp_lip_tensor
# --- SHARED HELPER FUNCTION ---
def get_component_motion(
- indices,
- driving_exp,
- multiplier,
- extra_delta=0,
- is_relative=False,
- neutral_ref=None,
- use_boost=False,
- ):
+ indices: list[int],
+ driving_exp: torch.Tensor,
+ multiplier: float,
+ extra_delta: torch.Tensor | int | float = 0,
+ is_relative: bool = False,
+ neutral_ref: torch.Tensor | int | None = None,
+ use_boost: bool = False,
+ ) -> torch.Tensor:
"""
Helper to calculate motion with 'Smart Dynamic Boost' and 'Neutral Factor'.
+ Encapsulates Z-Axis Gaze Lock with Automated Perceptual Pitch Compensation
+ and Eyelid Compensation (Anti-Bulging Fix).
"""
delta_local = x_s_info["exp"].clone()
+ force_camera_gaze = parameters.get(
+ "FaceExpressionCameraGazeToggle", False
+ )
if is_relative:
- # Relative Motion Calculation
ref = neutral_ref if neutral_ref is not None else 0
if isinstance(ref, torch.Tensor) and ref.shape[-2] == 21:
ref_part = ref[..., indices, :]
else:
ref_part = ref
- # Calculate the raw difference (motion intent)
raw_diff = driving_exp[:, indices, :] - ref_part
# --- SMART DYNAMIC BOOST ---
boost_val = micro_expression_boost if use_boost else 1.0
-
if use_boost and boost_val > 1.0:
magnitude = torch.abs(raw_diff)
decay = torch.exp(-10.0 * magnitude)
- dynamic_scale = 1.0 + (boost_val - 1.0) * decay
+ noise_gate = torch.clamp(magnitude / 0.005, 0.0, 1.0)
+ dynamic_scale = 1.0 + (boost_val - 1.0) * decay * noise_gate
diff = raw_diff * dynamic_scale
else:
diff = raw_diff * boost_val
- # --- NEUTRAL FACTOR (Anti-Surenchère) ---
+ # --- NEUTRAL FACTOR ---
diff = diff * neutral_factor
delta_local[:, indices, :] = x_s_info["exp"][:, indices, :] + diff
+
+ # --- GAZE STABILIZATION PIPELINE ---
+ if 11 in indices and 15 in indices:
+ idx_11, idx_15 = indices.index(11), indices.index(15)
+
+ if force_camera_gaze:
+ import math
+
+ # 1. 3D Z-Axis Projection (Mona Lisa Effect)
+ cam_world = torch.tensor(
+ [0.0, 0.0, 1.0],
+ dtype=torch.float32,
+ device=delta_local.device,
+ )
+ R_inv = R_anchor.squeeze(0).transpose(0, 1)
+ cam_local = torch.matmul(R_inv, cam_world)
+
+ # 2. Fetch UI Parameters
+ strength = parameters.get(
+ "FaceExpressionCameraGazeStrengthDecimalSlider", 0.50
+ )
+ vertical_offset_ui = parameters.get(
+ "FaceExpressionCameraGazeVerticalOffsetDecimalSlider",
+ 0.0,
+ )
+
+ # 3. DYNAMIC PERCEPTUAL COMPENSATION (Auto-Offset for Video)
+ # Extract the real-time head pitch to evaluate chin elevation.
+ head_pitch_deg = faceutil.headpose_pred_to_degree(
+ x_s_info["pitch"]
+ ).item()
+
+ # We dynamically shift the gaze down when the chin is down, and up when chin is up.
+ # -0.0005 is the empirical constant mapping physical degrees to latent sclera occlusion.
+ auto_perceptual_offset = head_pitch_deg * -0.0005 * strength
+
+ # 4. Raw Latent Shift Calculation
+ raw_gaze_x = cam_local[0].item() * 0.035 * strength
+
+ # Mathematically compute the 3D target, add the AUTO offset, and add the MANUAL fallback
+ raw_gaze_y = (
+ (cam_local[1].item() * 0.020 * strength)
+ + auto_perceptual_offset
+ + (vertical_offset_ui * 0.015)
+ )
+
+ # 5. SELECTIVE SOFT CLAMPING (Safety Shield)
+ safe_gaze_x = (
+ 0.040 * math.tanh(raw_gaze_x / 0.040)
+ if raw_gaze_x != 0
+ else 0.0
+ )
+ safe_gaze_y = (
+ 0.020 * math.tanh(raw_gaze_y / 0.020)
+ if raw_gaze_y != 0
+ else 0.0
+ )
+
+ # 6. Horizontal Overwrite & Vertical Safe Addition (Pupils)
+ delta_local[:, 11, 0] = safe_gaze_x
+ delta_local[:, 15, 0] = safe_gaze_x
+
+ delta_local[:, 11, 1] = (
+ x_s_info["exp"][:, 11, 1]
+ + diff[:, idx_11, 1]
+ + safe_gaze_y
+ )
+ delta_local[:, 15, 1] = (
+ x_s_info["exp"][:, 15, 1]
+ + diff[:, idx_15, 1]
+ + safe_gaze_y
+ )
+
+ # 7. EYELID COMPENSATION (Anti-Bulging Fix)
+ # Eyelids must follow the pupil's vertical shift to prevent sclera over-exposure.
+ if 13 in indices and 16 in indices:
+ idx_13, idx_16 = indices.index(13), indices.index(16)
+ eyelid_comp = (
+ safe_gaze_y * 0.60
+ ) # 60% follow-through ratio
+
+ delta_local[:, 13, 1] = (
+ x_s_info["exp"][:, 13, 1]
+ + diff[:, idx_13, 1]
+ + eyelid_comp
+ )
+ delta_local[:, 16, 1] = (
+ x_s_info["exp"][:, 16, 1]
+ + diff[:, idx_16, 1]
+ + eyelid_comp
+ )
+
+ else:
+ # Standard Dampening
+ gaze_dampening = 0.50
+ delta_local[:, 11, 0] = x_s_info["exp"][:, 11, 0] + (
+ diff[:, idx_11, 0] * gaze_dampening
+ )
+ delta_local[:, 15, 0] = x_s_info["exp"][:, 15, 0] + (
+ diff[:, idx_15, 0] * gaze_dampening
+ )
+
else:
- # Absolute Motion
+ # Absolute Motion Calculation
target_exp = driving_exp[:, indices, :]
current_exp = x_s_info["exp"][:, indices, :]
@@ -349,6 +457,87 @@ def get_component_motion(
current_exp * (1 - neutral_factor) + target_exp * neutral_factor
)
+ # --- GAZE STABILIZATION PIPELINE (Absolute Mode) ---
+ if 11 in indices and 15 in indices:
+ idx_11, idx_15 = indices.index(11), indices.index(15)
+
+ if force_camera_gaze:
+ import math
+
+ cam_world = torch.tensor(
+ [0.0, 0.0, 1.0],
+ dtype=torch.float32,
+ device=delta_local.device,
+ )
+ R_inv = R_anchor.squeeze(0).transpose(0, 1)
+ cam_local = torch.matmul(R_inv, cam_world)
+
+ strength = parameters.get(
+ "FaceExpressionCameraGazeStrengthDecimalSlider", 0.50
+ )
+ vertical_offset_ui = parameters.get(
+ "FaceExpressionCameraGazeVerticalOffsetDecimalSlider",
+ 0.0,
+ )
+
+ # Dynamic Perceptual Compensation (Absolute Mode)
+ head_pitch_deg = faceutil.headpose_pred_to_degree(
+ x_s_info["pitch"]
+ ).item()
+ auto_perceptual_offset = head_pitch_deg * -0.0005 * strength
+
+ raw_gaze_x = cam_local[0].item() * 0.035 * strength
+ raw_gaze_y = (
+ (cam_local[1].item() * 0.020 * strength)
+ + auto_perceptual_offset
+ + (vertical_offset_ui * 0.015)
+ )
+
+ # Selective Soft Clamping
+ safe_gaze_x = (
+ 0.040 * math.tanh(raw_gaze_x / 0.040)
+ if raw_gaze_x != 0
+ else 0.0
+ )
+ safe_gaze_y = (
+ 0.020 * math.tanh(raw_gaze_y / 0.020)
+ if raw_gaze_y != 0
+ else 0.0
+ )
+
+ # Absolute Overwrite & Safe Addition (Pupils)
+ delta_local[:, 11, 0] = safe_gaze_x
+ delta_local[:, 15, 0] = safe_gaze_x
+
+ delta_local[:, 11, 1] = delta_local[:, 11, 1] + safe_gaze_y
+ delta_local[:, 15, 1] = delta_local[:, 15, 1] + safe_gaze_y
+
+ # EYELID COMPENSATION (Anti-Bulging Fix for Absolute Mode)
+ if 13 in indices and 16 in indices:
+ idx_13, idx_16 = indices.index(13), indices.index(16)
+ eyelid_comp = safe_gaze_y * 0.60
+
+ delta_local[:, 13, 1] = (
+ delta_local[:, 13, 1] + eyelid_comp
+ )
+ delta_local[:, 16, 1] = (
+ delta_local[:, 16, 1] + eyelid_comp
+ )
+
+ else:
+ # Standard Blend
+ gaze_x_blend = 0.60
+ delta_local[:, 11, 0] = torch.lerp(
+ current_exp[:, idx_11, 0],
+ target_exp[:, idx_11, 0],
+ gaze_x_blend,
+ )
+ delta_local[:, 15, 0] = torch.lerp(
+ current_exp[:, idx_15, 0],
+ target_exp[:, idx_15, 0],
+ gaze_x_blend,
+ )
+
# Projection & Refinement
x_proj = scale_anchor * (x_c_s @ R_anchor + delta_local) + t_anchor
raw_delta = self.models_processor.lp_stitch(
@@ -360,8 +549,10 @@ def get_component_motion(
return (x_target - x_s) * multiplier
def merge_eye_motion_candidates(
- relative_motion, absolute_motion, normalize_eyes_enabled=False
- ):
+ relative_motion: torch.Tensor,
+ absolute_motion: torch.Tensor,
+ normalize_eyes_enabled: bool = False,
+ ) -> torch.Tensor:
"""
Relative Lids + Retargeted Gaze eye merge:
- keep horizontal gaze direction from the absolute + retargeted eye motion
@@ -371,10 +562,16 @@ def merge_eye_motion_candidates(
"""
merged_motion = relative_motion.clone()
- # Landmark 11/15 X is the clearest eyeball-direction signal.
- # Keep it fully from the absolute + retargeted branch for better gaze stability.
- merged_motion[:, 11, 0] = absolute_motion[:, 11, 0]
- merged_motion[:, 15, 0] = absolute_motion[:, 15, 0]
+ # --- GAZE PRECISION (X-Axis) ---
+ # 50% Absolute provides enough authority to direct the iris precisely,
+ # while leaving 50% Relative to prevent IPD tearing on profile angles.
+ gaze_blend = 0.50
+ merged_motion[:, 11, 0] = torch.lerp(
+ relative_motion[:, 11, 0], absolute_motion[:, 11, 0], gaze_blend
+ )
+ merged_motion[:, 15, 0] = torch.lerp(
+ relative_motion[:, 15, 0], absolute_motion[:, 15, 0], gaze_blend
+ )
# Vertical eye motion carries both lid state and some gaze drift.
# Blend a limited amount of the retargeted branch back in so the
@@ -383,19 +580,31 @@ def merge_eye_motion_candidates(
eyelid_blend = 0.45 if normalize_eyes_enabled else 0.30
eye_center_blend = 0.35 if normalize_eyes_enabled else 0.20
+ # 11, 15: Eye Centers (Iris vertical position & depth)
for idx in (11, 15):
merged_motion[:, idx, 1] = torch.lerp(
relative_motion[:, idx, 1],
absolute_motion[:, idx, 1],
eye_center_blend,
)
+ merged_motion[:, idx, 2] = torch.lerp(
+ relative_motion[:, idx, 2],
+ absolute_motion[:, idx, 2],
+ eye_center_blend,
+ )
+ # 13, 16: Eyelids (Blink & Squint)
for idx in (13, 16):
merged_motion[:, idx, 1] = torch.lerp(
relative_motion[:, idx, 1],
absolute_motion[:, idx, 1],
eyelid_blend,
)
+ merged_motion[:, idx, 2] = torch.lerp(
+ relative_motion[:, idx, 2],
+ absolute_motion[:, idx, 2],
+ eyelid_blend,
+ )
return merged_motion
@@ -504,33 +713,26 @@ def merge_eye_motion_candidates(
eyes_normalize_max = parameters.get(
"FaceExpressionNormalizeEyesMaxBothDecimalSlider", 0.50
)
- combined_eyes_ratio_normalize = None
- if flag_normalize_eyes and source_lmk is not None:
- c_d_eyes_normalize = c_d_eyes_lst
- eyes_ratio = np.array([c_d_eyes_normalize[0][0]], dtype=np.float32)
- eyes_ratio_normalize = max(eyes_ratio, 0.10)
- eyes_ratio_l = min(c_d_eyes_normalize[0][0], eyes_normalize_max)
- eyes_ratio_r = min(c_d_eyes_normalize[0][1], eyes_normalize_max)
- eyes_ratio_max = np.array(
- [[eyes_ratio_l, eyes_ratio_r]], dtype=np.float32
- )
+ # --- EYE NORMALIZATION PRE-PROCESSING ---
+ # Default baseline is the raw driving ratio
+ eyes_target_array = c_d_eyes_lst
- if eyes_ratio_normalize > eyes_normalize_threshold:
- combined_eyes_ratio_normalize = (
- faceutil.calc_combined_eye_ratio_norm(
- eyes_ratio_max,
- source_lmk,
- device=self.models_processor.device,
- )
- )
- else:
- combined_eyes_ratio_normalize = (
- faceutil.calc_combined_eye_ratio(
- eyes_ratio_max,
- source_lmk,
- device=self.models_processor.device,
- )
+ if flag_normalize_eyes and source_lmk is not None:
+ # Check if the overall eye openness exceeds the user's threshold
+ current_max_openness = max(c_d_eyes_lst[0][0], c_d_eyes_lst[0][1])
+
+ if current_max_openness > eyes_normalize_threshold:
+ # Clamp both eyes independently to the max allowed value
+ # This prevents the "surprised" look while preserving winks
+ eyes_target_array = np.array(
+ [
+ [
+ min(c_d_eyes_lst[0][0], eyes_normalize_max),
+ min(c_d_eyes_lst[0][1], eyes_normalize_max),
+ ]
+ ],
+ dtype=np.float32,
)
if flag_activate_eyes:
@@ -541,22 +743,28 @@ def merge_eye_motion_candidates(
1.0,
)
- if (
- flag_normalize_eyes
- and combined_eyes_ratio_normalize is not None
- ):
- target_eye_ratio = combined_eyes_ratio_normalize
- else:
- target_eye_ratio = faceutil.calc_combined_eye_ratio(
- c_d_eyes_lst,
- source_lmk,
- device=self.models_processor.device,
- )
+ # 1. Get Independent Tensors for each eye to feed into the MLPs
+ ratio_left, ratio_right = faceutil.calc_independent_eye_ratios(
+ eyes_target_array,
+ source_lmk,
+ device=self.models_processor.device,
+ )
- eyes_retarget_delta = self.models_processor.lp_retarget_eye(
- x_s, target_eye_ratio * eye_mult, face_editor_type
+ # 2. Double MLP Inference
+ delta_left_sym = self.models_processor.lp_retarget_eye(
+ x_s, ratio_left * eye_mult, face_editor_type
+ )
+ delta_right_sym = self.models_processor.lp_retarget_eye(
+ x_s, ratio_right * eye_mult, face_editor_type
)
+ # 3. Latent Splicing: Stitch Left and Right expressions
+ # Indices: 15 (Right pupil/center), 16 (Right eyelid)
+ eyes_retarget_delta = delta_left_sym.clone()
+ eyes_retarget_delta[:, [15, 16], :] = delta_right_sym[
+ :, [15, 16], :
+ ]
+
if (
flag_stable_gaze_eyes
and flag_relative_eyes
@@ -600,9 +808,11 @@ def merge_eye_motion_candidates(
if flag_activate_lips:
lips_retarget_delta = 0
- if parameters.get(
+ flag_retarget_lips = parameters.get(
"FaceExpressionRetargetingLipsBothEnableToggle", False
- ):
+ )
+
+ if flag_retarget_lips:
lip_mult = parameters.get(
"FaceExpressionRetargetingLipsMultiplierBothDecimalSlider",
1.0,
@@ -614,15 +824,59 @@ def merge_eye_motion_candidates(
x_s, c_d_lip * lip_mult, face_editor_type
)
- accumulated_motion += get_component_motion(
- lip_indices,
- x_d_i_info["exp"],
- driving_multiplier_lips,
- extra_delta=lips_retarget_delta,
- is_relative=flag_relative_lips,
- neutral_ref=lp_lip_array,
- use_boost=True,
- )
+ if flag_relative_lips and flag_retarget_lips:
+ # 1. Pure Relative Branch: Captures shape (smirk, width, pout) on X-axis
+ relative_lip_motion = get_component_motion(
+ lip_indices,
+ x_d_i_info["exp"],
+ 1.0,
+ extra_delta=0, # No retargeting here
+ is_relative=True,
+ neutral_ref=lp_lip_array,
+ use_boost=True,
+ )
+
+ # 2. Pure Absolute Branch: Captures precise jaw drop and mouth opening on Y/Z-axis
+ absolute_retarget_lip_motion = get_component_motion(
+ lip_indices,
+ x_d_i_info["exp"],
+ 1.0,
+ extra_delta=lips_retarget_delta,
+ is_relative=False,
+ neutral_ref=lp_lip_array,
+ use_boost=True,
+ )
+
+ # 3. Structural Decoupling Merge (Softened)
+ # We use Lerp to blend Relative and Absolute on the Y axis.
+ # 0.5 means 50% relative influence, 50% retargeting influence.
+ merged_lip_motion = relative_lip_motion.clone()
+ blend_factor = 0.50
+
+ for idx in lip_indices:
+ merged_lip_motion[:, idx, 1] = torch.lerp(
+ relative_lip_motion[:, idx, 1],
+ absolute_retarget_lip_motion[:, idx, 1],
+ blend_factor,
+ )
+ merged_lip_motion[:, idx, 2] = absolute_retarget_lip_motion[
+ :, idx, 2
+ ] # Depth stays absolute
+
+ accumulated_motion += (
+ merged_lip_motion * driving_multiplier_lips
+ )
+ else:
+ # Standard behavior if only one mode (or neither) is used
+ accumulated_motion += get_component_motion(
+ lip_indices,
+ x_d_i_info["exp"],
+ driving_multiplier_lips,
+ extra_delta=lips_retarget_delta,
+ is_relative=flag_relative_lips,
+ neutral_ref=lp_lip_array,
+ use_boost=True,
+ )
if flag_activate_brows:
accumulated_motion += get_component_motion(
@@ -657,10 +911,6 @@ def merge_eye_motion_candidates(
out = self._apply_kornia_warp(out, M_c2o, dsize)
out = out.mul_(255.0).clamp_(0, 255)
- # Sync the stream safely
- if local_stream:
- local_stream.synchronize()
-
return out.type(torch.float32)
def swap_edit_face_core(
@@ -705,7 +955,7 @@ def swap_edit_face_core(
else contextlib.nullcontext()
)
- with stream_context:
+ with stream_context, torch.inference_mode():
init_source_eye_ratio = 0.0
init_source_lip_ratio = 0.0
@@ -929,10 +1179,6 @@ def swap_edit_face_core(
img = out
img = img.mul_(255.0).clamp_(0, 255).type(torch.float32)
- # Sync the stream safely
- if local_stream:
- local_stream.synchronize()
-
return img
def swap_edit_face_core_makeup(
diff --git a/app/processors/models_processor.py b/app/processors/models_processor.py
index e3b86d01..4d4686bf 100644
--- a/app/processors/models_processor.py
+++ b/app/processors/models_processor.py
@@ -564,6 +564,117 @@ def _check_tensorrt_cache(self, model_name: str, onnx_path: str) -> bool:
print(f"[ERROR] Failed TensorRT cache check: {e}")
return False
+    def _clean_tensorrt_cache(self, onnx_path: str, trt_options: dict) -> None:
+        """
+        Cleans up potentially corrupted TensorRT cache files for a specific model.
+
+        The cleanup is best-effort: missing or locked files produce a warning
+        and never raise, so a failed cleanup cannot mask the build failure
+        that triggered it.
+
+        Removed artifacts:
+          * the ``<model>_ctx.onnx`` context file in the cache directory;
+          * every ``TensorrtExecutionProvider_*.engine`` file name found in the
+            context file (all matches are handled, not only the first one);
+          * model-specific ``.profile`` / ``.cache`` / ``.timing`` files, the
+            generic ``timing.cache`` and the ``TensorrtExecutionProvider_*``
+            ``.timing`` / ``.profile`` files found in the cache directory.
+
+        Args:
+            onnx_path (str): The local path to the ONNX model.
+            trt_options (dict): The TensorRT options containing the dynamic cache
+                path under ``trt_engine_cache_path`` (default: "tensorrt-engines").
+        """
+        import os
+        import re
+
+        cache_dir = trt_options.get("trt_engine_cache_path", "tensorrt-engines")
+        base_onnx_name = os.path.splitext(os.path.basename(onnx_path))[0]
+        ctx_file_path = os.path.join(cache_dir, f"{base_onnx_name}_ctx.onnx")
+
+        def remove_file(path: str, deleted_msg: str, failed_msg: str) -> None:
+            # Single deletion point: skip anything that is not a regular file
+            # and downgrade OS-level failures (locks, races) to a warning.
+            if not os.path.isfile(path):
+                return
+            try:
+                os.remove(path)
+                print(deleted_msg.format(path=path))
+            except OSError as e:
+                print(failed_msg.format(path=path, e=e))
+
+        # 1. Read the context file to find its engine files before deleting it.
+        #    The pattern rejects control bytes and path separators so a damaged
+        #    file cannot yield a name that points outside the cache directory.
+        engine_names: list[str] = []
+        if os.path.isfile(ctx_file_path):
+            try:
+                with open(ctx_file_path, "rb") as f:
+                    content = f.read()
+                # Engine names generated by ONNX Runtime, de-duplicated in order.
+                pattern = rb"TensorrtExecutionProvider_[^\x00-\x1f/\\]*?\.engine"
+                for raw_name in dict.fromkeys(re.findall(pattern, content)):
+                    engine_names.append(raw_name.decode("utf-8"))
+            except (OSError, UnicodeDecodeError) as e:
+                print(
+                    f"[WARN] Could not read corrupted context file {ctx_file_path} to find engine name: {e}"
+                )
+
+        # 2. Delete the context file
+        remove_file(
+            ctx_file_path,
+            "[INFO] Deleted corrupted TensorRT context file: {path}",
+            "[WARN] Failed to delete {path} (it might be locked or missing): {e}",
+        )
+
+        # 3. Delete the engine file(s) if we found them
+        # Failsafe: ORT pathing behavior varies, so each name is tried both
+        # directly in the cache directory and in a same-named subdirectory.
+        engine_subdirectory_name = os.path.basename(cache_dir)
+        for engine_name in engine_names:
+            for engine_path in (
+                os.path.join(cache_dir, engine_name),
+                os.path.join(cache_dir, engine_subdirectory_name, engine_name),
+            ):
+                remove_file(
+                    engine_path,
+                    "[INFO] Deleted corrupted TensorRT engine file: {path}",
+                    "[WARN] Failed to delete engine file {path}: {e}",
+                )
+
+        # 4. Delete any associated timing cache, profile files, or general cache files
+        if not os.path.isdir(cache_dir):
+            return
+        try:
+            file_names = os.listdir(cache_dir)
+        except OSError as e:
+            print(
+                f"[WARN] Failed to clean profile/timing/cache files in {cache_dir}: {e}"
+            )
+            return
+
+        for file_name in file_names:
+            # Catch model-specific files (e.g., SomeModel.profile)
+            is_model_specific = file_name.startswith(
+                base_onnx_name
+            ) and file_name.endswith((".profile", ".cache", ".timing"))
+
+            # Catch exact generic names (like DFM's "timing.cache")
+            is_generic_timing = file_name == "timing.cache"
+
+            # Catch ORT's global architecture-based timing caches
+            # Example: TensorrtExecutionProvider_cache_sm120.timing
+            is_ort_global_timing = file_name.startswith(
+                "TensorrtExecutionProvider_"
+            ) and file_name.endswith((".timing", ".profile"))
+
+            if is_model_specific or is_generic_timing or is_ort_global_timing:
+                remove_file(
+                    os.path.join(cache_dir, file_name),
+                    "[INFO] Deleted TensorRT auxiliary/timing file: {path}",
+                    "[WARN] Failed to delete auxiliary file {path}: {e}",
+                )
+
def load_model(self, model_name, session_options=None):
"""
Loads an AI model (ONNX) with thread safety.
@@ -680,6 +791,15 @@ def load_model(self, model_name, session_options=None):
)
probe_process.terminate()
probe_process.join()
+
+ # Clean up corrupted caches caused by the timeout before raising
+ print(
+ f"[INFO] Cleaning up corrupted TensorRT cache for {model_name} due to timeout..."
+ )
+ self._clean_tensorrt_cache(
+ onnx_path, model_trt_options
+ )
+
raise RuntimeError(
"TensorRT Engine build timed out."
)
@@ -700,6 +820,15 @@ def load_model(self, model_name, session_options=None):
print(
f"[WARN] Probe attempt {attempt + 1} failed with exit code {exitcode}."
)
+
+ # Wipe corrupted artifacts before attempting the next retry
+ print(
+ f"[INFO] Cleaning up potentially corrupted TensorRT cache for {model_name}..."
+ )
+ self._clean_tensorrt_cache(
+ onnx_path, model_trt_options
+ )
+
if attempt < max_retries - 1:
print("[INFO] Retrying in 2 seconds...")
time.sleep(2.0)
diff --git a/app/processors/utils/faceutil.py b/app/processors/utils/faceutil.py
index 67dadd8e..27920608 100644
--- a/app/processors/utils/faceutil.py
+++ b/app/processors/utils/faceutil.py
@@ -2001,8 +2001,67 @@ def calculate_distance_ratio(
def calc_eye_close_ratio(
lmk: np.ndarray, target_eye_ratio: np.ndarray = None
) -> np.ndarray:
- lefteye_close_ratio = calculate_distance_ratio(lmk, 6, 18, 0, 12)
- righteye_close_ratio = calculate_distance_ratio(lmk, 30, 42, 24, 36)
+ """
+ Calculates the Eye Aspect Ratio (EAR) with strict projection safeguards.
+ Includes Profile Occlusion Detection and Symmetric Blink Harmonization
+ to completely eliminate "fisheyes" and "lazy eyes".
+
+ Args:
+ lmk: Array of shape (N, 203, 2) or (1, 203, 2) containing landmarks.
+ target_eye_ratio: Optional target ratio to concatenate.
+
+ Returns:
+ np.ndarray: The safely clamped and harmonized eye ratios.
+ """
+ # 1. Calculate raw horizontal width of the eyes
+ raw_left_width = np.linalg.norm(lmk[:, 0] - lmk[:, 12], axis=1, keepdims=True)
+ raw_right_width = np.linalg.norm(lmk[:, 24] - lmk[:, 36], axis=1, keepdims=True)
+
+ # SAFEGUARD A: Profile Occlusion Detection (The Fisheye Fix)
+ # If one eye is significantly narrower horizontally than the other (< 55%),
+ # the face is turned. The hidden eye's 2D landmarks are unreliable.
+ left_occluded = raw_left_width < (raw_right_width * 0.55)
+ right_occluded = raw_right_width < (raw_left_width * 0.55)
+
+ # SAFEGUARD B: Clamp minimum width to avoid dividing by a (near-)zero width on extreme squishing
+ min_eye_width = 4.0
+ left_eye_width = np.maximum(raw_left_width, min_eye_width)
+ right_eye_width = np.maximum(raw_right_width, min_eye_width)
+
+ # 2. Calculate vertical height of the eyes
+ left_eye_height = np.linalg.norm(lmk[:, 6] - lmk[:, 18], axis=1, keepdims=True)
+ right_eye_height = np.linalg.norm(lmk[:, 30] - lmk[:, 42], axis=1, keepdims=True)
+
+ # 3. Calculate Base Ratios
+ lefteye_close_ratio = left_eye_height / left_eye_width
+ righteye_close_ratio = right_eye_height / right_eye_width
+
+ # SAFEGUARD C: Apply Occlusion Lock
+ # Force the hidden eye to perfectly mirror the visible eye's EAR.
+ # This prevents the network from rendering a bulging wide-open eye.
+ lefteye_close_ratio = np.where(
+ left_occluded, righteye_close_ratio, lefteye_close_ratio
+ )
+ righteye_close_ratio = np.where(
+ right_occluded, lefteye_close_ratio, righteye_close_ratio
+ )
+
+ # SAFEGUARD D: Symmetric Blink Harmonization (Anti "Lazy-Eye")
+ blink_threshold = 0.28
+ is_blinking = (lefteye_close_ratio < blink_threshold) & (
+ righteye_close_ratio < blink_threshold
+ )
+
+ avg_ratio = (lefteye_close_ratio + righteye_close_ratio) / 2.0
+
+ lefteye_close_ratio = np.where(is_blinking, avg_ratio, lefteye_close_ratio)
+ righteye_close_ratio = np.where(is_blinking, avg_ratio, righteye_close_ratio)
+
+ # SAFEGUARD E: Hard clamp the final ratio to biologically plausible limits.
+ max_safe_ear = 0.45
+ lefteye_close_ratio = np.clip(lefteye_close_ratio, 0.0, max_safe_ear)
+ righteye_close_ratio = np.clip(righteye_close_ratio, 0.0, max_safe_ear)
+
if target_eye_ratio is not None:
return np.concatenate(
[lefteye_close_ratio, righteye_close_ratio, target_eye_ratio], axis=1
@@ -2012,8 +2071,42 @@ def calc_eye_close_ratio(
# imported from https://github.com/KwaiVGI/LivePortrait/blob/main/src/utils/live_portrait_wrapper.py
+# Replaces the former one-line wrapper `calculate_distance_ratio(lmk, 90, 102, 48, 66)`
+# with an explicit computation that clamps the mouth width and the final ratio.
def calc_lip_close_ratio(lmk: np.ndarray) -> np.ndarray:
- return calculate_distance_ratio(lmk, 90, 102, 48, 66)
+ """
+ Calculates the Mouth Aspect Ratio (MAR) with strict projection safeguards.
+ Prevents division by zero on profile faces or extreme pouting,
+ which causes the lower face to collapse or the mouth to stretch unnaturally.
+
+ Args:
+ lmk: Array of shape (N, 203, 2) or (1, 203, 2) containing landmarks.
+
+ Returns:
+ np.ndarray: The clamped lip ratios to safely feed the retargeting network.
+ """
+ # 1. Calculate horizontal width of the mouth (Denominator)
+ # Indices based on 203-point format: Left mouth corner (48), Right mouth corner (66)
+ mouth_width = np.linalg.norm(lmk[:, 48] - lmk[:, 66], axis=1, keepdims=True)
+
+ # SAFEGUARD A: Clamp minimum width to prevent MAR explosion.
+ # A mouth width below 8.0 pixels implies extreme profile, heavy occlusion, or severe pout.
+ min_mouth_width = 8.0
+ mouth_width = np.maximum(mouth_width, min_mouth_width)
+
+ # 2. Calculate vertical height of the lips (Numerator)
+ # Indices: Upper lip center (90), Lower lip center (102)
+ lip_height = np.linalg.norm(lmk[:, 90] - lmk[:, 102], axis=1, keepdims=True)
+
+ # 3. Calculate Base Ratio
+ mar = lip_height / mouth_width
+
+ # SAFEGUARD B: Hard clamp the final ratio to biologically plausible limits (0.0 to 0.85).
+ # Normal human mouth aspect ratio rarely exceeds 0.75 even when shouting or yawning.
+ max_safe_mar = 0.85
+ mar = np.clip(mar, 0.0, max_safe_mar)
+
+ return mar
# imported from https://github.com/KwaiVGI/LivePortrait/blob/main/src/utils/camera.py
@@ -2310,40 +2403,60 @@ def update_delta_new_mov_y(mov_y, delta_new, **kwargs):
# imported from https://github.com/KwaiVGI/LivePortrait/blob/main/src/utils/live_portrait_wrapper.py
def calc_combined_eye_ratio(c_d_eyes_i, source_lmk, device="cuda"):
+ """
+ FIX: Averages the driving eye ratios to prevent left-eye dominance bias.
+ Ensures symmetric baseline retargeting for the LivePortrait generator.
+ """
c_s_eyes = calc_eye_close_ratio(source_lmk[None])
c_s_eyes_tensor = torch.from_numpy(c_s_eyes).float().to(device)
- # c_d_eyes_i_tensor = torch.Tensor([c_d_eyes_i[0][0]]).reshape(1, 1).to(device)
- c_d_eyes_i_numpy_m = np.array(
- [c_d_eyes_i[0][0]], dtype=np.float32
- ) # Assicurati che sia un array NumPy
- c_d_eyes_i_numpy = np.array(
- [max(c_d_eyes_i_numpy_m, 0.08)], dtype=np.float32
- ) # Mini 0.08 otherwise eyelids overlap
+
+ # Safely extract left and right eye ratios
+ left_eye_ratio = c_d_eyes_i[0][0]
+ right_eye_ratio = c_d_eyes_i[0][1] if len(c_d_eyes_i[0]) > 1 else left_eye_ratio
+
+ # Calculate the mean to harmonize the retargeting delta
+ mean_eye_ratio = (left_eye_ratio + right_eye_ratio) / 2.0
+
+ c_d_eyes_i_numpy_m = np.array([mean_eye_ratio], dtype=np.float32)
+
+ # Minimum 0.08 clamp to prevent eyelid mesh overlapping (Z-fighting)
+ c_d_eyes_i_numpy = np.array([max(c_d_eyes_i_numpy_m[0], 0.08)], dtype=np.float32)
c_d_eyes_i_tensor = torch.from_numpy(c_d_eyes_i_numpy).reshape(1, 1).to(device)
- # [c_s,eyes, c_d,eyes,i]
+
+ # Format: [c_s,eyes, c_d,eyes,i]
combined_eye_ratio_tensor = torch.cat([c_s_eyes_tensor, c_d_eyes_i_tensor], dim=1)
return combined_eye_ratio_tensor
-def calc_combined_eye_ratio_norm(c_d_eyes_i, source_lmk, device="cuda"):
+def calc_independent_eye_ratios(
+ c_d_eyes_i: np.ndarray, source_lmk: np.ndarray, device: str = "cuda"
+) -> tuple[torch.Tensor, torch.Tensor]:
+ """
+ Calculates separate retargeting tensors for Left and Right eyes.
+ Enables the 'Split-Eye' asymmetric blink (winking) trick.
+ """
c_s_eyes = calc_eye_close_ratio(source_lmk[None])
c_s_eyes_tensor = torch.from_numpy(c_s_eyes).float().to(device)
- # c_d_eyes_i_tensor = torch.Tensor([c_d_eyes_i[0][0]]).reshape(1, 1).to(device)
- c_d_eyes_i_numpy_l = np.array(
- [c_d_eyes_i[0][0]], dtype=np.float32
- ) # Assicurati che sia un array NumPy
- c_d_eyes_i_numpy_r = np.array(
- [c_d_eyes_i[0][1]], dtype=np.float32
- ) # Assicurati che sia un array NumPy
- c_d_eyes_i_numpy = np.array(
- [max(min(c_d_eyes_i_numpy_l, c_d_eyes_i_numpy_r), 0.08)], dtype=np.float32
- ) # Mini 0.08 otherwise eyelids overlap
- c_d_eyes_i_tensor = torch.from_numpy(c_d_eyes_i_numpy).reshape(1, 1).to(device)
- # [c_s,eyes, c_d,eyes,i]
- combined_eye_ratio_tensor = torch.cat([c_s_eyes_tensor, c_d_eyes_i_tensor], dim=1)
- return combined_eye_ratio_tensor
+ # Safely extract left and right eye ratios
+ left_eye_ratio = float(c_d_eyes_i[0][0])
+ right_eye_ratio = (
+ float(c_d_eyes_i[0][1]) if len(c_d_eyes_i[0]) > 1 else left_eye_ratio
+ )
+
+ # Clamp to 0.08 minimum to avoid 3D mesh overlap (Z-fighting on eyelids)
+ left_val = np.array([max(left_eye_ratio, 0.08)], dtype=np.float32)
+ right_val = np.array([max(right_eye_ratio, 0.08)], dtype=np.float32)
+
+ left_tensor = torch.from_numpy(left_val).reshape(1, 1).to(device)
+ right_tensor = torch.from_numpy(right_val).reshape(1, 1).to(device)
+
+ # Format: [c_s_left, c_s_right, target_specific_eye]
+ ratio_left_target = torch.cat([c_s_eyes_tensor, left_tensor], dim=1)
+ ratio_right_target = torch.cat([c_s_eyes_tensor, right_tensor], dim=1)
+
+ return ratio_left_target, ratio_right_target
# imported from https://github.com/KwaiVGI/LivePortrait/blob/main/src/utils/live_portrait_wrapper.py
diff --git a/app/processors/video_processor.py b/app/processors/video_processor.py
index 13bf8376..b8fac871 100644
--- a/app/processors/video_processor.py
+++ b/app/processors/video_processor.py
@@ -16,7 +16,6 @@
import numpy
import torch
import pyvirtualcam
-import math
import copy
from PySide6.QtCore import QObject, QTimer, Signal, Slot
@@ -30,6 +29,7 @@
from app.ui.widgets.actions import list_view_actions
from app.ui.widgets.actions import save_load_actions
from app.ui.widgets.settings_layout_data import CAMERA_BACKENDS
+from app.processors.video_utils.video_encoding import FFmpegEncoder, FFmpegPostProcessor
import app.helpers.miscellaneous as misc_helpers
from app.helpers.typing_helper import (
ControlTypes,
@@ -217,9 +217,7 @@ def __init__(self, main_window: "MainWindow", num_threads=2):
# --- Subprocesses ---
self.virtcam: pyvirtualcam.Camera | None = None
- self.recording_sp: subprocess.Popen | None = (
- None # FFmpeg process for both recording styles
- )
+ self.encoder = FFmpegEncoder()
self.ffplay_sound_sp: subprocess.Popen | None = (
None # ffplay process for live audio
)
@@ -1238,24 +1236,19 @@ def display_next_frame(self):
# Write to FFmpeg
if self.is_processing_segments or self.recording:
- if (
- self.recording_sp
- and self.recording_sp.stdin
- and not self.recording_sp.stdin.closed
- ):
- try:
- self.recording_sp.stdin.write(frame.tobytes())
+ if self.encoder.is_running():
+ if self.encoder.write_frame(frame):
# update counters for duration calculation
self.frames_written += 1
self.last_displayed_frame = frame_number_to_display
- except OSError as e:
+ else:
log_prefix = (
f"segment {self.current_segment_index + 1}"
if self.is_processing_segments
else "recording"
)
print(
- f"[WARN] Error writing frame {frame_number_to_display} to FFmpeg stdin during {log_prefix}: {e}"
+ f"[WARN] Error writing frame {frame_number_to_display} to FFmpeg encoder during {log_prefix}."
)
else:
log_prefix = (
@@ -1264,7 +1257,7 @@ def display_next_frame(self):
else "recording"
)
print(
- f"[WARN] FFmpeg stdin not available for {log_prefix} when trying to write frame {frame_number_to_display}."
+ f"[WARN] FFmpeg encoder not available for {log_prefix} when trying to write frame {frame_number_to_display}."
)
# Update UI
@@ -1537,7 +1530,26 @@ def process_video(self):
# DELAYED FFMPEG CREATION
if self.recording:
- if not self.create_ffmpeg_subprocess(output_filename=None):
+ self.temp_file = self._prepare_default_temp_file()
+ if os.path.exists(self.temp_file):
+ try:
+ os.remove(self.temp_file)
+ except OSError:
+ pass
+
+ frame_height, frame_width, _ = self.current_frame.shape
+
+ success = self.encoder.start_process(
+ output_filename=self.temp_file,
+ frame_width=frame_width,
+ frame_height=frame_height,
+ fps=self.fps,
+ control=self.main_window.control,
+ is_segment=False,
+ media_path=self.media_path,
+ )
+
+ if not success:
print("[ERROR] Failed to start FFmpeg for default-style recording.")
self.stop_processing() # Abort the start
return
@@ -1753,11 +1765,20 @@ def start_frame_worker(
return prev
def process_current_frame(
- self, synchronous: bool = False, fit_on_complete: bool = False
- ):
+ self,
+ synchronous: bool = False,
+ fit_on_complete: bool = False,
+ suppress_raw_preview: bool = False,
+ ) -> "FrameWorker | None":
"""
Process the single, currently selected frame (e.g., after seek or for image).
This is a one-shot operation, not part of the metronome.
+
+ Args:
+ synchronous: If True, blocks until processing is done.
+ fit_on_complete: If True, auto-fits the view after generation.
+ suppress_raw_preview: If True, skips displaying the unprocessed raw frame
+ while waiting for the AI worker. Prevents UI flashing.
"""
if self.processing or self.is_processing_segments:
print("[INFO] Stopping active processing to process single frame.")
@@ -1908,7 +1929,20 @@ def process_current_frame(
# --- Process if read was successful ---
if read_successful and frame_to_process is not None:
- if frame_changed:
+ # Check if the UI is currently simulating a navigation step
+ is_stepping = getattr(self.main_window, "_is_stepping_media", False)
+ is_compare_active = getattr(
+ self.main_window, "view_face_compare_enabled", False
+ )
+ is_mask_active = getattr(self.main_window, "view_face_mask_enabled", False)
+
+ # Block the raw image preview IF explicitly requested (e.g., Stop button)
+ # OR IF we are actively stepping through navigation with a special preview mode active
+ force_suppression = suppress_raw_preview or (
+ is_stepping and (is_compare_active or is_mask_active)
+ )
+
+ if frame_changed and not force_suppression:
frame_bgr_preview = numpy.ascontiguousarray(frame_to_process[..., ::-1])
self.display_current_frame(
generation=0,
@@ -2017,24 +2051,10 @@ def stop_processing(self) -> bool:
self.join_and_clear_threads()
print("[INFO] Worker threads joined.")
- # 5. Stop and cleanup FFmpeg subprocess
- if self.recording_sp:
- print("[INFO] Closing and waiting for active FFmpeg subprocess...")
- if self.recording_sp.stdin and not self.recording_sp.stdin.closed:
- try:
- self.recording_sp.stdin.close()
- except OSError as e:
- print(f"[WARN] Error closing ffmpeg stdin during abort: {e}")
- try:
- self.recording_sp.wait(timeout=5)
- print("[INFO] FFmpeg subprocess terminated.")
- except subprocess.TimeoutExpired:
- print("[WARN] FFmpeg subprocess did not terminate gracefully, killing.")
- self.recording_sp.kill()
- self.recording_sp.wait()
- except Exception as e:
- print(f"[ERROR] Error waiting for FFmpeg subprocess: {e}")
- self.recording_sp = None
+ # 5. Stop and clean up the FFmpeg encoder
+ if self.encoder.is_running():
+ print("[INFO] Closing and waiting for active FFmpeg encoder...")
+ self.encoder.close_process()
# 6. Cleanup temp files based on stopped mode.
if was_processing_segments:
@@ -2083,21 +2103,29 @@ def stop_processing(self) -> bool:
elif self.file_type == "webcam":
# For webcam, re-opening essentially prepares it for the next 'Play' click.
try:
- webcam_index = int(self.main_window.control.get("WebcamDeviceSelection", 0))
-
- backend_name = self.main_window.control.get("WebcamBackendSelection", "Default")
+ webcam_index = int(
+ self.main_window.control.get("WebcamDeviceSelection", 0)
+ )
+
+ backend_name = str(
+ self.main_window.control.get("WebcamBackendSelection", "Default")
+ )
backend_id = CAMERA_BACKENDS.get(backend_name, cv2.CAP_ANY)
-
+
self.media_capture = cv2.VideoCapture(webcam_index, backend_id)
-
+
if self.media_capture.isOpened():
try:
fourcc = cv2.VideoWriter_fourcc(*"MJPG")
self.media_capture.set(cv2.CAP_PROP_FOURCC, fourcc)
except Exception:
pass
-
- res_str = self.main_window.control.get("WebcamMaxResSelection", "1280x720")
+
+ res_str = str(
+ self.main_window.control.get(
+ "WebcamMaxResSelection", "1280x720"
+ )
+ )
target_width, target_height = map(int, res_str.split("x"))
self.media_capture.set(cv2.CAP_PROP_FRAME_WIDTH, target_width)
self.media_capture.set(cv2.CAP_PROP_FRAME_HEIGHT, target_height)
@@ -2164,10 +2192,11 @@ def stop_processing(self) -> bool:
was_recording_default_style or was_processing_segments
):
print(
- "[INFO] Stop Processing: Triggering final frame refresh to match UI state."
+ "[INFO] Stop Processing: Triggering final frame refresh to match UI state (raw preview suppressed)."
)
- # We call this asynchronously to let the UI finish its current state cleanup first
- self.process_current_frame(synchronous=False)
+ # We call this asynchronously to let the UI finish its current state cleanup first.
+ # suppress_raw_preview=True ensures the UI doesn't flash the original image while computing.
+ self.process_current_frame(synchronous=False, suppress_raw_preview=True)
self.processing_stopped_signal.emit()
@@ -2433,551 +2462,48 @@ def _log_processing_summary(
except Exception as e:
print(f"[WARN] Could not calculate average FPS: {e}\n")
- # --- FFmpeg and Finalization ---
-
- @staticmethod
- def _parse_ffprobe_fps(rate_text: Any) -> float | None:
- """Parse ffprobe frame-rate strings such as "30000/1001" safely."""
- if rate_text is None:
- return None
- try:
- text = str(rate_text).strip()
- if not text:
- return None
- if "/" in text:
- num_s, den_s = text.split("/", 1)
- num = float(num_s)
- den = float(den_s)
- if den == 0:
- return None
- value = num / den
- else:
- value = float(text)
- return value if value > 0 else None
- except Exception:
- return None
-
- def _probe_source_video_metrics(self, file_path: str) -> Dict[str, Any] | None:
- """Probe source video metrics needed for quality matching.
-
- Returns a dictionary with keys: bit_rate, width, height, fps.
- """
- if not file_path or not os.path.isfile(file_path):
- return None
-
- try:
- import json
-
- args = [
- "ffprobe",
- "-v",
- "quiet",
- "-print_format",
- "json",
- "-select_streams",
- "v:0",
- "-show_entries",
- "stream=codec_type,codec_name,width,height,bit_rate,avg_frame_rate,r_frame_rate:format=bit_rate",
- file_path,
- ]
- result = subprocess.run(args, capture_output=True, text=True, timeout=30)
- if result.returncode != 0:
- return None
-
- probe_data = json.loads(result.stdout)
- video_stream = next(
- (
- s
- for s in probe_data.get("streams", [])
- if s.get("codec_type") == "video"
- ),
- None,
- )
- if not isinstance(video_stream, dict):
- return None
-
- width = int(video_stream.get("width") or 0)
- height = int(video_stream.get("height") or 0)
-
- bit_rate_raw = video_stream.get("bit_rate")
- if not bit_rate_raw:
- bit_rate_raw = probe_data.get("format", {}).get("bit_rate")
- bit_rate = float(bit_rate_raw) if bit_rate_raw else 0.0
-
- fps = self._parse_ffprobe_fps(video_stream.get("avg_frame_rate"))
- if not fps:
- fps = self._parse_ffprobe_fps(video_stream.get("r_frame_rate"))
-
- if width <= 0 or height <= 0 or not fps or bit_rate <= 0:
- return None
-
- return {
- "bit_rate": bit_rate,
- "width": float(width),
- "height": float(height),
- "fps": float(fps),
- "codec_name": str(video_stream.get("codec_name") or "").lower(),
- }
- except Exception:
- return None
-
- @staticmethod
- def _source_codec_to_hevc_factor(codec_name: str) -> float:
- """Map source codec efficiency relative to HEVC for quality matching."""
- codec = (codec_name or "").lower()
- if codec in {"hevc", "h265"}:
- return 1.00
- if codec in {"h264", "avc"}:
- return 0.78
- if codec == "av1":
- return 1.28
- if codec == "vp9":
- return 1.18
- if codec in {"mpeg2video", "mpeg4", "msmpeg4v3"}:
- return 0.68
- # Unknown codecs: use a conservative middle-ground.
- return 0.90
-
- def _get_adaptive_recording_quality(
- self,
- control: Mapping[str, Any],
- quality_value: int,
- output_width: int,
- output_height: int,
- source_metrics: Mapping[str, Any] | None = None,
- output_fps: float | None = None,
- ) -> int:
- """Auto-compute CQ/CRF from source metrics to keep perceived quality close.
-
- When auto-match is enabled, this method computes an absolute target quality
- from source bitrate density instead of applying a small delta to the manual
- slider value. This keeps behavior robust even if manual FFQualitySlider is
- set to an unreasonable value.
- """
- if not (
- bool(control.get("FFMpegOptionsToggle", False))
- and bool(control.get("FFAutoMatchSourceQualityToggle", False))
- ):
- return quality_value
-
- if source_metrics is None:
- source_metrics = self._probe_source_video_metrics(self.media_path or "")
- if not source_metrics:
- print(
- "[INFO] Source-quality auto match enabled, but probe failed. Using manual Quality unchanged."
- )
- return quality_value
-
- src_w = max(1.0, source_metrics["width"])
- src_h = max(1.0, source_metrics["height"])
- src_fps = max(0.001, source_metrics["fps"])
- src_bitrate = max(1.0, source_metrics["bit_rate"])
- src_codec = str(source_metrics.get("codec_name", "") or "").lower()
- out_fps = float(output_fps) if output_fps and output_fps > 0 else src_fps
-
- # Bits-per-pixel-per-frame (bpppf) is a lightweight content/quality proxy.
- src_bpppf = src_bitrate / (src_w * src_h * src_fps)
-
- src_pixels = src_w * src_h
- out_pixels = float(max(1, output_width) * max(1, output_height))
- scale_ratio = out_pixels / src_pixels
-
- # Convert source density to a HEVC-equivalent density baseline.
- codec_factor = self._source_codec_to_hevc_factor(src_codec)
- target_bpppf = src_bpppf * codec_factor
-
- # Temporal adjustment for output fps changes. Keep it intentionally gentle
- # so fps differences do not dominate quality estimation.
- temporal_ratio = max(0.5, min(2.0, out_fps / src_fps))
- target_bpppf *= temporal_ratio**0.35
-
- # Resolution-aware density adjustment:
- # - Upscale: allow more density to preserve restored detail.
- # - Downscale: allow less density to avoid wasting bits.
- if scale_ratio > 1.0:
- up_steps = math.log2(scale_ratio)
- target_bpppf *= min(1.35, 1.0 + 0.15 * up_steps)
- elif scale_ratio < 1.0:
- down_steps = math.log2(1.0 / max(scale_ratio, 1e-6))
- target_bpppf *= max(0.70, 1.0 - 0.20 * down_steps)
-
- # Map target bpppf to an absolute CQ/CRF target (lower is higher quality).
- # Tuned to stay in a practical range for SDR NVENC CQ and HDR x265 CRF.
- if target_bpppf >= 0.25:
- auto_quality = 14
- elif target_bpppf >= 0.16:
- auto_quality = 16
- elif target_bpppf >= 0.11:
- auto_quality = 18
- elif target_bpppf >= 0.08:
- auto_quality = 20
- elif target_bpppf >= 0.055:
- auto_quality = 22
- elif target_bpppf >= 0.038:
- auto_quality = 24
- elif target_bpppf >= 0.028:
- auto_quality = 26
- elif target_bpppf >= 0.020:
- auto_quality = 28
- elif target_bpppf >= 0.014:
- auto_quality = 30
- else:
- auto_quality = 33
-
- adapted_quality = max(12, min(36, int(auto_quality)))
-
- print(
- "[INFO] Source-quality auto match: "
- f"source={src_w:.0f}x{src_h:.0f}@{src_fps:.3f} "
- f"codec={src_codec or 'unknown'} bitrate={src_bitrate / 1_000_000:.3f}Mbps "
- f"src_bpppf={src_bpppf:.5f} target_bpppf={target_bpppf:.5f} "
- f"out_fps={out_fps:.3f} temporal_ratio={temporal_ratio:.3f}, "
- f"manual_quality={quality_value} auto_quality={adapted_quality}"
- )
- return adapted_quality
-
- def create_ffmpeg_subprocess(self, output_filename: str):
+ def _prepare_default_temp_file(self) -> str:
"""
- Creates the FFmpeg subprocess for recording.
- This is a merged function used by both default-style and multi-segment recording.
-
- :param output_filename: The direct output path. If None, it's default-style
- recording and a temp file will be generated.
+ Prepares the temporary directory and generates a temp file path for default recording.
+ Removes stale temp files (older than 24 hours) left behind by previous crashed sessions.
"""
- control = self.main_window.control.copy()
- is_segment = output_filename is not None
-
- # 1. Guards
- if (
- not isinstance(self.current_frame, numpy.ndarray)
- or self.current_frame.size == 0
- ):
- print("[ERROR] Current frame invalid. Cannot get dimensions.")
- return False
- if not self.media_path or not Path(self.media_path).is_file():
- print("[ERROR] Original media path invalid.")
- return False
- if self.fps <= 0:
- print("[ERROR] Invalid FPS.")
- return False
-
- start_time_sec = 0.0
- end_time_sec = 0.0
-
- if is_segment:
- if self.current_segment_index < 0 or self.current_segment_index >= len(
- self.segments_to_process
- ):
- print(f"[ERROR] Invalid segment index {self.current_segment_index}.")
- return False
- start_frame, end_frame = self.segments_to_process[
- self.current_segment_index
- ]
- start_time_sec = start_frame / self.fps
- end_time_sec = end_frame / self.fps
-
- # 2. Frame Dimensions
- frame_height, frame_width, _ = self.current_frame.shape
- # VP-28: Apply enhancer dimension scaling for BOTH segment and default recording modes.
- if control["FrameEnhancerEnableToggle"]:
- if control["FrameEnhancerTypeSelection"] in (
- "RealEsrgan-x2-Plus",
- "BSRGan-x2",
- ):
- frame_height = frame_height * 2
- frame_width = frame_width * 2
- elif control["FrameEnhancerTypeSelection"] in (
- "RealEsrgan-x4-Plus",
- "BSRGan-x4",
- "UltraSharp-x4",
- "UltraMix-x4",
- "RealEsr-General-x4v3",
- ):
- frame_height = frame_height * 4
- frame_width = frame_width * 4
-
- # Calculate downscale dimensions
- frame_height_down = frame_height
- frame_width_down = frame_width
- if control["FrameEnhancerDownToggle"]:
- if frame_width != 1920 or frame_height != 1080:
- frame_width_down_mult = frame_width / 1920
- # VP-27: Force even dimensions — most video codecs (h264/hevc) require
- # width and height to be multiples of 2.
- frame_height_down = math.ceil(frame_height / frame_width_down_mult) & ~1
- frame_width_down = 1920
- else:
- print("[WARN] Already 1920*1080")
-
- # 3. Output File Path and Logging
- if is_segment:
- segment_num = self.current_segment_index + 1
- print(
- f"[INFO] Creating FFmpeg (Segment {segment_num}): Video Dim={frame_width}x{frame_height}, FPS={self.fps}, Output='{output_filename}'"
- )
- print(
- f"[INFO] Audio Segment: Start={start_time_sec:.3f}s, End={end_time_sec:.3f}s (Frames {start_frame}-{end_frame})"
- )
+ date_and_time = datetime.now().strftime(r"%Y_%m_%d_%H_%M_%S")
+ try:
+ base_temp_dir = os.path.join(os.getcwd(), "temp_files", "default")
+ os.makedirs(base_temp_dir, exist_ok=True)
- if Path(output_filename).is_file():
- try:
- os.remove(output_filename)
- except OSError as e:
- print(
- f"[WARN] Could not remove existing segment file {output_filename}: {e}"
- )
- else:
- # Default-style: create a unique temp file
- date_and_time = datetime.now().strftime(r"%Y_%m_%d_%H_%M_%S")
try:
- base_temp_dir = os.path.join(os.getcwd(), "temp_files", "default")
- os.makedirs(base_temp_dir, exist_ok=True)
-
- # Clean up orphaned temp files from previous crashed sessions.
- # These are left behind when the application exits uncleanly during
- # a recording. Only remove files older than 24 hours to avoid
- # accidentally deleting files from a recording that is still active
- # in another instance.
- try:
- _cutoff = time.time() - 86400 # 24 hours
- for _stale in Path(base_temp_dir).glob("temp_output_*.mp4"):
- try:
- if _stale.stat().st_mtime < _cutoff:
- _stale.unlink()
- print(f"[INFO] Removed stale temp file: {_stale.name}")
- except OSError:
- pass
-
- _stale_audio_dir = Path(base_temp_dir) / "temp_audio"
- if _stale_audio_dir.is_dir():
- for _stale_audio_file in _stale_audio_dir.iterdir():
- try:
- if _stale_audio_file.stat().st_mtime < _cutoff:
- if _stale_audio_file.is_dir():
- shutil.rmtree(
- _stale_audio_file, ignore_errors=True
- )
- else:
- _stale_audio_file.unlink()
- print(
- f"[INFO] Removed stale temp audio artifact: {_stale_audio_file.name}"
- )
- except OSError:
- pass
+ _cutoff = time.time() - 86400 # 24 hours
+ for _stale in Path(base_temp_dir).glob("temp_output_*.mp4"):
+ try:
+ if _stale.stat().st_mtime < _cutoff:
+ _stale.unlink()
+ print(f"[INFO] Removed stale temp file: {_stale.name}")
+ except OSError:
+ pass
+ _stale_audio_dir = Path(base_temp_dir) / "temp_audio"
+ if _stale_audio_dir.is_dir():
+ for _stale_audio_file in _stale_audio_dir.iterdir():
try:
- next(_stale_audio_dir.iterdir())
- except StopIteration:
- try:
- _stale_audio_dir.rmdir()
- print("[INFO] Removed empty stale temp audio directory")
- except OSError:
- pass
- except Exception:
- pass # Non-critical; never block recording startup
-
- self.temp_file = os.path.join(
- base_temp_dir, f"temp_output_{date_and_time}.mp4"
- )
- print(f"[INFO] Default temp file will be created at: {self.temp_file}")
- except Exception as e:
- print(f"[ERROR] Failed to create temporary directory/file path: {e}")
- self.temp_file = f"temp_output_{date_and_time}.mp4"
- print(
- f"[WARN] Falling back to local directory for temp file: {self.temp_file}"
- )
-
- print(
- f"[INFO] Creating FFmpeg : Video Dim={frame_width}x{frame_height}, FPS={self.fps}, Temp Output='{self.temp_file}'"
- )
-
- if Path(self.temp_file).is_file():
- try:
- os.remove(self.temp_file)
- except OSError as e:
- print(
- f"[WARN] Could not remove existing temp file {self.temp_file}: {e}"
- )
-
- # 4. Build FFmpeg Arguments
- hdrpreset = control["FFPresetsHDRSelection"]
- sdrpreset = control["FFPresetsSDRSelection"]
- ffquality = int(control["FFQualitySlider"])
- ffspatial = int(control["FFSpatialAQToggle"])
- fftemporal = int(control["FFTemporalAQToggle"])
-
- output_width_for_quality = (
- frame_width_down if control["FrameEnhancerDownToggle"] else frame_width
- )
- output_height_for_quality = (
- frame_height_down if control["FrameEnhancerDownToggle"] else frame_height
- )
+ if _stale_audio_file.stat().st_mtime < _cutoff:
+ if _stale_audio_file.is_dir():
+ import shutil
- source_metrics: Mapping[str, Any] | None = None
- if bool(control.get("FFAutoMatchSourceQualityToggle", False)):
- media_path = self.media_path or ""
- source_metrics_cache = getattr(self, "_source_metrics_cache", None)
- if source_metrics_cache is None:
- source_metrics_cache = {}
- setattr(self, "_source_metrics_cache", source_metrics_cache)
- source_metrics = source_metrics_cache.get(media_path)
- if source_metrics is None:
- source_metrics = self._probe_source_video_metrics(media_path)
- source_metrics_cache[media_path] = source_metrics
-
- ffquality = self._get_adaptive_recording_quality(
- control,
- ffquality,
- output_width_for_quality,
- output_height_for_quality,
- source_metrics=source_metrics,
- output_fps=self.fps,
- )
-
- # Base args: read raw video from stdin.
- # VP-12: Frames written to stdin are in BGR24 byte order.
- # FrameWorker returns numpy arrays in BGR channel order (OpenCV convention).
- # display_next_frame writes frame.tobytes() directly, so the pixel format
- # passed to FFmpeg MUST remain "bgr24" to match the raw bytes.
- args = [
- "ffmpeg",
- "-hide_banner",
- "-loglevel",
- "error",
- "-f",
- "rawvideo",
- "-pix_fmt",
- "bgr24", # The processed frame from FrameWorker is BGR
- "-s",
- f"{frame_width}x{frame_height}",
- "-r",
- str(self.fps),
- "-i",
- "pipe:0", # Read from stdin
- ]
-
- if is_segment:
- # For segments, add the audio source and time limits
- args.extend(
- [
- "-ss",
- str(start_time_sec),
- "-to",
- str(end_time_sec),
- "-i",
- self.media_path,
- "-map",
- "0:v:0", # Map video from stdin
- "-map",
- "1:a:0?", # Map audio from media_path (if exists)
- "-c:a",
- "aac",
- "-shortest",
- ]
- )
-
- # Video codec args
- if control["HDREncodeToggle"]:
- # HDR uses X265
- args.extend(
- [
- "-c:v",
- "libx265",
- "-profile:v",
- "main10",
- "-preset",
- str(hdrpreset),
- "-pix_fmt",
- "yuv420p10le",
- "-x265-params",
- f"crf={ffquality}:vbv-bufsize=10000:vbv-maxrate=10000:selective-sao=0:no-sao=1:strong-intra-smoothing=0:rect=0:aq-mode={ffspatial}:t-aq={fftemporal}:hdr-opt=1:repeat-headers=1:colorprim=bt2020:range=limited:transfer=smpte2084:colormatrix=bt2020nc:master-display='G(13250,34500)B(7500,3000)R(34000,16000)WP(15635,16450)L(10000000,1)':max-cll=1000,400",
- ]
- )
- else:
- # NVENC for SDR
- args.extend(
- [
- "-c:v",
- "hevc_nvenc",
- "-preset",
- str(sdrpreset),
- "-profile:v",
- "main10",
- "-cq",
- str(ffquality),
- "-pix_fmt",
- "yuv420p10le",
- "-colorspace",
- "rgb",
- "-color_primaries",
- "bt709",
- "-color_trc",
- "bt709",
- "-spatial-aq",
- str(ffspatial),
- "-temporal-aq",
- str(fftemporal),
- "-tier",
- "high",
- "-tag:v",
- "hvc1",
- ]
- )
-
- target_matrix = "bt2020nc" if control["HDREncodeToggle"] else "bt709"
- scale_params = f"in_range=pc:out_range=tv:out_color_matrix={target_matrix}"
-
- if control["FrameEnhancerDownToggle"]:
- args.extend(
- [
- "-vf",
- f"scale={frame_width_down}x{frame_height_down}:{scale_params}:flags=lanczos+accurate_rnd+full_chroma_int",
- ]
- )
- else:
- args.extend(
- [
- "-vf",
- f"scale={scale_params}",
- ]
- )
-
- # Output file
- if is_segment:
- args.extend([output_filename])
- else:
- args.extend([self.temp_file])
+ shutil.rmtree(_stale_audio_file, ignore_errors=True)
+ else:
+ _stale_audio_file.unlink()
+ except OSError:
+ pass
+ except Exception:
+ pass # Non-critical; never block recording startup
- # 5. Start Subprocess
- try:
- self.recording_sp = subprocess.Popen(
- args, stdin=subprocess.PIPE, bufsize=-1
- )
- # reset write counters each time we start a new FFmpeg session
- self.frames_written = 0
- self.last_displayed_frame = None
- return True
- except FileNotFoundError:
- print(
- "[ERROR] FFmpeg command not found. Ensure FFmpeg is installed and in system PATH."
- )
- self.main_window.display_messagebox_signal.emit(
- "FFmpeg Error", "FFmpeg command not found.", self.main_window
- )
- return False
+ temp_path = os.path.join(base_temp_dir, f"temp_output_{date_and_time}.mp4")
+ print(f"[INFO] Default temp file will be created at: {temp_path}")
+ return temp_path
except Exception as e:
- print(f"[ERROR] Failed to start FFmpeg subprocess : {e}")
- if is_segment:
- self.main_window.display_messagebox_signal.emit(
- "FFmpeg Error",
- f"Failed to start FFmpeg for segment {segment_num}:\n{e}",
- self.main_window,
- )
- else:
- self.main_window.display_messagebox_signal.emit(
- "FFmpeg Error", f"Failed to start FFmpeg:\n{e}", self.main_window
- )
- return False
+ print(f"[ERROR] Failed to create temporary directory/file path: {e}")
+ return f"temp_output_{date_and_time}.mp4"
def _identify_frame_segments(self, actual_end_frame: int) -> List[Tuple[int, int]]:
"""
@@ -3712,211 +3238,6 @@ def build_result(cancelled: bool) -> dict[str, Any]:
)
misc_helpers.release_capture(capture)
- def _extract_audio_segments(
- self, segments: List[Tuple[int, int]], temp_audio_dir: str
- ) -> Tuple[bool, List[str]]:
- """
- Extract audio from the original media for each frame segment.
-
- Returns: (success: bool, audio_files: List[str])
- - success: True if all segments extracted successfully
- - audio_files: List of paths to extracted audio files
- """
- audio_files = []
-
- for idx, (start_frame, end_frame) in enumerate(segments):
- # Convert frame numbers to time (seconds)
- start_time = start_frame / self.fps if self.fps > 0 else 0
- # end_time is exclusive (one frame after the last frame we want)
- end_time = (end_frame + 1) / self.fps if self.fps > 0 else 0
-
- # Skip empty segments (should not happen with our segment identification, but safety check)
- if start_time >= end_time:
- print(
- f"[WARN] Skipping empty audio segment {idx + 1} (start_time={start_time:.3f}s >= end_time={end_time:.3f}s)"
- )
- continue
-
- # Use a containerized AAC output rather than raw ADTS .aac.
- # Raw AAC concatenation is brittle on some skipped-frame rebuilds,
- # especially for MKV-derived inputs with awkward timestamps.
- audio_file = os.path.join(temp_audio_dir, f"audio_segment_{idx:04d}.m4a")
- audio_files.append(audio_file)
-
- # Always normalize skipped-frame rebuild audio to AAC-in-M4A.
- # This keeps the concat/remux path codec-agnostic for any source
- # audio format that FFmpeg can decode from the input media.
- media_path: str = self.media_path # type: ignore[assignment]
- args: list[str] = [
- "ffmpeg",
- "-hide_banner",
- "-loglevel",
- "warning",
- "-err_detect",
- "ignore_err",
- "-i",
- media_path,
- "-ss",
- str(start_time),
- "-to",
- str(end_time),
- "-vn",
- "-map",
- "0:a:0?",
- "-af",
- "aresample=async=1:first_pts=0",
- "-c:a",
- "aac",
- "-b:a",
- "192k",
- "-y",
- audio_file,
- ]
-
- try:
- print(
- f"[INFO] Extracting audio segment {idx + 1}/{len(segments)}: {start_time:.3f}s → {end_time:.3f}s"
- )
- subprocess.run(args, check=True, capture_output=True, text=True)
-
- # Validate output; if it's not valid, retry once with the same
- # normalized extraction settings to rule out a transient failure.
- if not self._validate_audio_file(audio_file):
- print(
- f"[WARN] Validation failed for segment {idx + 1}, retrying extraction once"
- )
- re_args: list[str] = [
- "ffmpeg",
- "-hide_banner",
- "-loglevel",
- "warning",
- "-err_detect",
- "ignore_err",
- "-i",
- media_path,
- "-ss",
- str(start_time),
- "-to",
- str(end_time),
- "-vn",
- "-map",
- "0:a:0?",
- "-af",
- "aresample=async=1:first_pts=0",
- "-c:a",
- "aac",
- "-b:a",
- "192k",
- "-y",
- audio_file,
- ]
- try:
- subprocess.run(
- re_args, check=True, capture_output=True, text=True
- )
- except subprocess.CalledProcessError as e2:
- print(
- f"[ERROR] Retry extraction failed for segment {idx + 1}: {e2}"
- )
- print(f"[ERROR] FFmpeg stderr: {e2.stderr}")
- for audio in audio_files:
- try:
- os.remove(audio)
- except OSError:
- pass
- return False, []
- if not self._validate_audio_file(audio_file):
- print(
- f"[ERROR] Retried segment {idx + 1} is still invalid after validation"
- )
- for audio in audio_files:
- try:
- os.remove(audio)
- except OSError:
- pass
- return False, []
-
- print(f"[INFO] Segment {idx + 1} extracted successfully")
- except subprocess.CalledProcessError as e:
- print(f"[ERROR] Failed to extract audio segment {idx + 1}: {e}")
- print(f"[ERROR] FFmpeg stderr: {e.stderr}")
- print(f"[ERROR] FFmpeg command: {' '.join(args)}")
- # Cleanup partial files
- for audio in audio_files:
- try:
- os.remove(audio)
- except OSError:
- pass
- return False, []
- except FileNotFoundError:
- print("[ERROR] FFmpeg not found. Cannot extract audio segments.")
- return False, []
-
- print(f"[INFO] All {len(segments)} audio segment(s) extracted successfully")
- return True, audio_files
-
- def _validate_audio_file(self, audio_file_path: str) -> bool:
- """
- Validate that an audio file can be properly decoded by FFmpeg.
- Returns True if audio is valid, False if corrupted.
- """
- if not os.path.exists(audio_file_path):
- print(f"[ERROR] Audio file does not exist: {audio_file_path}")
- return False
-
- try:
- # Try to probe the audio file with ffprobe
- args = [
- "ffprobe",
- "-v",
- "quiet",
- "-print_format",
- "json",
- "-show_format",
- "-show_streams",
- audio_file_path,
- ]
- result = subprocess.run(args, capture_output=True, text=True, timeout=30)
-
- if result.returncode != 0:
- print(f"[WARN] ffprobe failed for {audio_file_path}: {result.stderr}")
- return False
-
- # Check if we got valid JSON output
- import json
-
- probe_data = json.loads(result.stdout)
-
- # Check if there's an audio stream
- audio_streams = [
- s
- for s in probe_data.get("streams", [])
- if s.get("codec_type") == "audio"
- ]
- if not audio_streams:
- print(f"[WARN] No audio stream found in {audio_file_path}")
- return False
-
- # Check duration
- format_info = probe_data.get("format", {})
- duration = format_info.get("duration")
- if duration is None or float(duration) <= 0:
- print(f"[WARN] Invalid or zero duration in {audio_file_path}")
- return False
-
- print(f"[INFO] Audio validation passed: {duration}s duration")
- return True
-
- except subprocess.TimeoutExpired:
- print(f"[WARN] Audio validation timed out for {audio_file_path}")
- return False
- except json.JSONDecodeError:
- print(f"[WARN] Invalid ffprobe output for {audio_file_path}")
- return False
- except Exception as e:
- print(f"[WARN] Audio validation failed for {audio_file_path}: {e}")
- return False
-
def _probe_video_duration(self, file_path: str) -> float | None:
"""
Return the duration (in seconds) of the video file at `file_path` using
@@ -3972,156 +3293,14 @@ def _compute_play_end(self) -> Tuple[float, int, int, float | None]:
return play_end, end_frame, frames_processed, duration
- def _concatenate_audio_segments(
- self, audio_files: List[str], temp_audio_dir: str
- ) -> Optional[str]:
- """
- Concatenate multiple audio files into a single audio file using FFmpeg concat demuxer.
-
- Returns: Path to concatenated audio file, or None if failed
- """
-
- if not audio_files:
- print("[ERROR] No audio segments to concatenate")
- return None
-
- if len(audio_files) == 1:
- # Only one segment, return it directly
- print("[INFO] Only one audio segment, no concatenation needed")
- return audio_files[0]
-
- # Create concat manifest file
- concat_file = os.path.join(temp_audio_dir, "concat_manifest.txt")
- try:
- with open(concat_file, "w") as f:
- for audio_file in audio_files:
- # FFmpeg concat demuxer expects absolute paths
- abs_path = os.path.abspath(audio_file)
- formatted_path = abs_path.replace("\\", "/")
- f.write(f"file '{formatted_path}'\n")
- print(f"[INFO] Created concat manifest with {len(audio_files)} segments")
- except OSError as e:
- print(f"[ERROR] Failed to create concat manifest: {e}")
- return None
-
- output_audio = os.path.join(temp_audio_dir, "audio_concatenated.m4a")
-
- # FFmpeg concat demuxer command
- args = [
- "ffmpeg",
- "-hide_banner",
- "-loglevel",
- "error",
- "-f",
- "concat",
- "-safe",
- "0", # Allow absolute filenames
- "-i",
- concat_file,
- "-vn",
- # Re-encode once here to flatten the segment timestamps into a
- # single monotonic audio stream before the final mux.
- "-af",
- "aresample=async=1:first_pts=0",
- "-c:a",
- "aac",
- "-b:a",
- "192k",
- "-y",
- output_audio,
- ]
-
- try:
- print(f"[INFO] Concatenating {len(audio_files)} audio segment(s)...")
- subprocess.run(args, check=True)
- print("[INFO] ✓ Successfully concatenated audio segments")
- return output_audio
- except subprocess.CalledProcessError as e:
- print(f"[ERROR] Failed to concatenate audio segments: {e}")
- print(f"[ERROR] FFmpeg command: {' '.join(args)}")
- return None
- except FileNotFoundError:
- print("[ERROR] FFmpeg not found. Cannot concatenate audio.")
- return None
-
- def _write_video_only_output(self, source_video: str, output_video: str) -> bool:
- """Fallback writer: produce a playable video-only output when audio handling fails."""
- if not source_video or not os.path.exists(source_video):
- print(f"[ERROR] Video-only fallback source missing: {source_video}")
- return False
-
- if output_video and os.path.exists(output_video):
- try:
- os.remove(output_video)
- except OSError:
- pass
-
- args = [
- "ffmpeg",
- "-hide_banner",
- "-loglevel",
- "error",
- "-i",
- source_video,
- "-map",
- "0:v:0",
- "-c:v",
- "copy",
- "-an",
- "-y",
- output_video,
- ]
-
- try:
- subprocess.run(args, check=True)
- print(
- f"[WARN] Audio processing failed; emitted video-only output: {output_video}"
- )
- return True
- except Exception as e:
- print(f"[ERROR] Video-only remux fallback failed: {e}")
- return False
-
- def _concatenate_segments_video_only(
- self, list_file_path: str, final_file_path: str
- ) -> bool:
- """Fallback concatenation for segment mode when audio concat fails."""
- args = [
- "ffmpeg",
- "-hide_banner",
- "-loglevel",
- "error",
- "-f",
- "concat",
- "-safe",
- "0",
- "-i",
- list_file_path,
- "-map",
- "0:v:0",
- "-c:v",
- "copy",
- "-an",
- "-y",
- final_file_path,
- ]
-
- try:
- subprocess.run(args, check=True)
- print(
- f"[WARN] Segment audio concat failed; emitted video-only output: {final_file_path}"
- )
- return True
- except Exception as e:
- print(f"[ERROR] Segment video-only fallback concat failed: {e}")
- return False
-
def _attempt_segment_video_only_fallback(
self, list_file_path: str, final_file_path: str, failure_message: str
) -> bool:
"""Try segment video-only concat fallback and show UI error if it fails."""
print("[WARN] Attempting segment video-only fallback concatenation...")
- if self._concatenate_segments_video_only(list_file_path, final_file_path):
+ if FFmpegPostProcessor.concatenate_segments_video_only(
+ list_file_path, final_file_path
+ ):
return True
self.main_window.display_messagebox_signal.emit(
@@ -4180,8 +3359,11 @@ def _rebuild_segment_audio_if_needed(self, segment_num: int) -> None:
f"[INFO] Segment {segment_num}: rebuilding audio for skipped frames "
f"(manual dropped={self.manual_dropped_skip_count}, read errors={self.read_error_skip_count})."
)
- audio_ok, audio_files = self._extract_audio_segments(
- keep_segments, temp_audio_dir
+ audio_ok, audio_files = FFmpegPostProcessor.extract_audio_segments(
+ media_path=str(self.media_path),
+ fps=self.fps,
+ segments=keep_segments,
+ temp_audio_dir=temp_audio_dir,
)
if not (audio_ok and audio_files):
print(
@@ -4189,8 +3371,8 @@ def _rebuild_segment_audio_if_needed(self, segment_num: int) -> None:
)
return
- corrected_audio = self._concatenate_audio_segments(
- audio_files, temp_audio_dir
+ corrected_audio = FFmpegPostProcessor.concatenate_audio_segments(
+ audio_files=audio_files, temp_audio_dir=temp_audio_dir
)
if not corrected_audio:
print(
@@ -4281,36 +3463,16 @@ def _finalize_default_style_recording(self):
print("[INFO] Worker threads joined.")
# 6. Finalize FFmpeg (close stdin, wait for file to be written)
- if self.recording_sp:
- if self.recording_sp.stdin and not self.recording_sp.stdin.closed:
- try:
- print("[INFO] Closing FFmpeg stdin...")
- self.recording_sp.stdin.close()
- except OSError as e:
- print(
- f"[WARN] Error closing FFmpeg stdin during finalization: {e}"
- )
+ if self.encoder.is_running():
+ print("[INFO] Closing FFmpeg encoder...")
# VP-29: Mark recording stopped early.
self.recording = False
- print("[INFO] Waiting for FFmpeg subprocess to finish writing...")
- try:
- self.recording_sp.wait(timeout=10)
- print("[INFO] FFmpeg subprocess finished.")
- except subprocess.TimeoutExpired:
- print(
- "[WARN] FFmpeg subprocess timed out during finalization, killing."
- )
- self.recording_sp.kill()
- self.recording_sp.wait()
- except Exception as e:
- print(
- f"[ERROR] Error waiting for FFmpeg subprocess during finalization: {e}"
- )
- self.recording_sp = None
+
+ # Safely close the pipe and wait for the file to finalize
+ self.encoder.close_process()
+
# VP-HEVC-INFO: Notify the user about Windows Explorer thumbnail
- # support for HEVC outputs. Default codec is hevc_nvenc / libx265,
- # both produce H.265 streams that Windows 10 does NOT thumbnail
- # natively without the "HEVC Video Extensions" Store package.
+ # support for HEVC outputs. Default codec is hevc_nvenc / libx265.
self._log_hevc_thumbnail_hint_once()
# 7. Calculate audio segment times
@@ -4435,14 +3597,21 @@ def _finalize_default_style_recording(self):
f"invalid frame boundaries: start={start_frame_for_calc}, end={actual_end_frame}"
)
segments = self._identify_frame_segments(actual_end_frame)
- audio_ok, audio_files = self._extract_audio_segments(
- segments, temp_audio_dir
+ audio_ok, audio_files = (
+ FFmpegPostProcessor.extract_audio_segments(
+ media_path=str(self.media_path),
+ fps=self.fps,
+ segments=segments,
+ temp_audio_dir=temp_audio_dir,
+ )
)
if not audio_ok or not audio_files:
raise RuntimeError("failed to extract segmented audio")
- final_audio_path = self._concatenate_audio_segments(
- audio_files, temp_audio_dir
+ final_audio_path = (
+ FFmpegPostProcessor.concatenate_audio_segments(
+ audio_files=audio_files, temp_audio_dir=temp_audio_dir
+ )
)
if not final_audio_path:
raise RuntimeError("failed to concatenate segmented audio")
@@ -4504,8 +3673,8 @@ def _finalize_default_style_recording(self):
print(
"[WARN] Falling back to video-only output for default-style recording."
)
- if not self._write_video_only_output(
- self.temp_file, final_file_path
+ if not FFmpegPostProcessor.write_video_only_output(
+ source_video=self.temp_file, output_video=final_file_path
):
self.main_window.display_messagebox_signal.emit(
"Recording Error",
@@ -4863,13 +4032,30 @@ def process_next_segment(self):
self.worker_threads.append(worker)
# 6. Setup FFmpeg subprocess for this segment
- # create_ffmpeg_subprocess uses self.current_frame.shape, so it will automatically
- # pick up the resized dimensions we set in step 4.
temp_segment_filename = f"segment_{self.current_segment_index:03d}.mp4"
temp_segment_path = os.path.join(self.segment_temp_dir, temp_segment_filename)
self.temp_segment_files.append(temp_segment_path)
- if not self.create_ffmpeg_subprocess(output_filename=temp_segment_path):
+ frame_height, frame_width, _ = self.current_frame.shape
+ start_frame, end_frame = self.segments_to_process[self.current_segment_index]
+
+ # Calculate time boundaries for audio extraction mapping
+ start_time_sec = start_frame / self.fps if self.fps > 0 else 0.0
+ end_time_sec = end_frame / self.fps if self.fps > 0 else 0.0
+
+ success = self.encoder.start_process(
+ output_filename=temp_segment_path,
+ frame_width=frame_width,
+ frame_height=frame_height,
+ fps=self.fps,
+ control=self.main_window.control,
+ is_segment=True,
+ media_path=self.media_path,
+ start_time_sec=start_time_sec,
+ end_time_sec=end_time_sec,
+ )
+
+ if not success:
print(
f"[ERROR] Failed to create ffmpeg subprocess for segment {segment_num}. Aborting."
)
@@ -4960,35 +4146,14 @@ def stop_current_segment(self):
self.frames_to_display.clear()
# 3. Finalize FFmpeg for this segment
- if self.recording_sp:
- if self.recording_sp.stdin and not self.recording_sp.stdin.closed:
- try:
- print(f"[INFO] Closing FFmpeg stdin for segment {segment_num}...")
- self.recording_sp.stdin.close()
- except OSError as e:
- print(
- f"[WARN] Error closing FFmpeg stdin for segment {segment_num}: {e}"
- )
+ if self.encoder.is_running():
print(
- f"[INFO] Waiting for FFmpeg subprocess (segment {segment_num}) to finish writing..."
+ f"[INFO] Closing and waiting for active FFmpeg encoder (segment {segment_num})..."
)
- try:
- self.recording_sp.wait(timeout=10)
- print(f"[INFO] FFmpeg subprocess (segment {segment_num}) finished.")
- except subprocess.TimeoutExpired:
- print(
- f"[WARN] FFmpeg subprocess (segment {segment_num}) timed out, killing."
- )
- self.recording_sp.kill()
- self.recording_sp.wait()
- except Exception as e:
- print(
- f"[ERROR] Error waiting for FFmpeg subprocess (segment {segment_num}): {e}"
- )
- self.recording_sp = None
+ self.encoder.close_process()
else:
print(
- f"[WARN] No active FFmpeg subprocess found when stopping segment {segment_num}."
+ f"[WARN] No active FFmpeg encoder found when stopping segment {segment_num}."
)
if self.temp_segment_files and not os.path.exists(self.temp_segment_files[-1]):
@@ -5015,32 +4180,12 @@ def finalize_segment_concatenation(self):
)
# Failsafe: If this is called while an ffmpeg process is still running
- if self.recording_sp:
+ if self.encoder.is_running():
segment_num = self.current_segment_index + 1
print(
f"[INFO] Finalizing: Stopping active FFmpeg process for segment {segment_num}..."
)
- if self.recording_sp.stdin and not self.recording_sp.stdin.closed:
- try:
- self.recording_sp.stdin.close()
- except OSError as e:
- print(
- f"[WARN] Error closing FFmpeg stdin during early finalization: {e}"
- )
- try:
- self.recording_sp.wait(timeout=10)
- print(
- f"[INFO] FFmpeg subprocess (segment {segment_num}) finished writing."
- )
- except subprocess.TimeoutExpired:
- print(
- f"[WARN] FFmpeg subprocess (segment {segment_num}) timed out, killing."
- )
- self.recording_sp.kill()
- self.recording_sp.wait()
- except Exception as e:
- print(f"[ERROR] Error waiting for FFmpeg subprocess: {e}")
- self.recording_sp = None
+ self.encoder.close_process()
was_triggered_by_job = self.triggered_by_job_manager
diff --git a/app/processors/video_utils/video_encoding.py b/app/processors/video_utils/video_encoding.py
new file mode 100644
index 00000000..a44d77d1
--- /dev/null
+++ b/app/processors/video_utils/video_encoding.py
@@ -0,0 +1,704 @@
+import os
+import math
+import json
+import subprocess
+from typing import Dict, Any, Optional, Mapping, Tuple, List
+import numpy
+
+
+class FFmpegEncoder:
+ """
+ Handles FFmpeg subprocess lifecycle, argument generation, and raw frame encoding.
+ This class isolates OS-level subprocess management from the main video processing loop
+ to prevent thread blocking and simplify recording logic.
+ """
+
+    def __init__(self) -> None:
+        """Create an idle encoder: no FFmpeg process, no frames written, empty probe cache."""
+        # Handle of the FFmpeg child process; None whenever no process is attached.
+        self.recording_sp: Optional[subprocess.Popen] = None
+        # Number of frames successfully written to the current process' stdin.
+        self.frames_written: int = 0
+        # ffprobe results keyed by source file path (filled by probe_source_video_metrics).
+        self._source_metrics_cache: Dict[str, Dict[str, Any]] = {}
+
+    @staticmethod
+    def _parse_ffprobe_fps(rate_text: Any) -> Optional[float]:
+        """
+        Parse an ffprobe frame-rate value such as '30000/1001' or '25'.
+
+        Args:
+            rate_text: Raw value from ffprobe; it is converted with str() and stripped.
+
+        Returns:
+            The rate as a positive, finite float, or None when the value is missing,
+            empty, unparsable, has a zero denominator, or is not positive and finite.
+        """
+        if rate_text is None:
+            return None
+        try:
+            text = str(rate_text).strip()
+            if not text:
+                return None
+            num_s, sep, den_s = text.partition("/")
+            value = float(num_s)
+            if sep:
+                den = float(den_s)
+                if den == 0:
+                    return None
+                value /= den
+        except Exception:
+            return None
+        # 'inf', 'nan' and overflowing literals parse as floats but are unusable as a rate
+        return value if math.isfinite(value) and value > 0 else None
+
+    def probe_source_video_metrics(self, file_path: str) -> Optional[Dict[str, Any]]:
+        """
+        Run ffprobe on `file_path` and collect the values used for quality matching.
+
+        Returns a dict with `bit_rate`, `width`, `height`, `fps` (floats) and
+        `codec_name` (lower-case str), or None when the path is not a file, ffprobe
+        fails, or one of the numeric values is missing or not positive. Successful
+        results are cached per path; failures are not cached.
+        """
+        if not file_path or not os.path.isfile(file_path):
+            return None
+
+        # Repeated lookups are answered from the cache instead of spawning ffprobe again
+        if file_path in self._source_metrics_cache:
+            return self._source_metrics_cache[file_path]
+
+        try:
+            command = [
+                "ffprobe",
+                "-v",
+                "quiet",
+                "-print_format",
+                "json",
+                "-select_streams",
+                "v:0",
+                "-show_entries",
+                "stream=codec_type,codec_name,width,height,bit_rate,avg_frame_rate,r_frame_rate:format=bit_rate",
+                file_path,
+            ]
+            completed = subprocess.run(
+                command, capture_output=True, text=True, timeout=30
+            )
+            if completed.returncode != 0:
+                return None
+
+            probe_data = json.loads(completed.stdout)
+            video_stream = None
+            for stream in probe_data.get("streams", []):
+                if stream.get("codec_type") == "video":
+                    video_stream = stream
+                    break
+            if not isinstance(video_stream, dict):
+                return None
+
+            width = int(video_stream.get("width") or 0)
+            height = int(video_stream.get("height") or 0)
+
+            bit_rate_raw = video_stream.get("bit_rate")
+            if not bit_rate_raw:
+                # Fall back to the container-level bitrate when the stream reports none
+                bit_rate_raw = probe_data.get("format", {}).get("bit_rate")
+            bit_rate = float(bit_rate_raw) if bit_rate_raw else 0.0
+
+            # Prefer the average rate; use r_frame_rate only when it cannot be parsed
+            fps = self._parse_ffprobe_fps(
+                video_stream.get("avg_frame_rate")
+            ) or self._parse_ffprobe_fps(video_stream.get("r_frame_rate"))
+
+            if width <= 0 or height <= 0 or not fps or bit_rate <= 0:
+                return None
+
+            metrics = {
+                "bit_rate": bit_rate,
+                "width": float(width),
+                "height": float(height),
+                "fps": float(fps),
+                "codec_name": str(video_stream.get("codec_name") or "").lower(),
+            }
+            self._source_metrics_cache[file_path] = metrics
+            return metrics
+        except Exception as e:
+            print(f"[WARN] Failed to probe source metrics for {file_path}: {e}")
+            return None
+
+    @staticmethod
+    def _source_codec_to_hevc_factor(codec_name: str) -> float:
+        """Return the efficiency factor of the source codec relative to HEVC."""
+        # Names are matched case-insensitively; unknown or empty names fall back to 0.90
+        factors = {
+            "hevc": 1.00,
+            "h265": 1.00,
+            "h264": 0.78,
+            "avc": 0.78,
+            "av1": 1.28,
+            "vp9": 1.18,
+            "mpeg2video": 0.68,
+            "mpeg4": 0.68,
+            "msmpeg4v3": 0.68,
+        }
+        return factors.get((codec_name or "").lower(), 0.90)
+
+    def get_adaptive_recording_quality(
+        self,
+        control: Mapping[str, Any],
+        quality_value: int,
+        output_width: int,
+        output_height: int,
+        source_metrics: Optional[Dict[str, Any]] = None,
+        output_fps: Optional[float] = None,
+    ) -> int:
+        """
+        Derive a CQ/CRF value from the source's bits per pixel per frame.
+
+        `quality_value` is returned unchanged unless both `FFMpegOptionsToggle` and
+        `FFAutoMatchSourceQualityToggle` are enabled in `control` and
+        `source_metrics` is available. Otherwise the source density is scaled by a
+        codec factor, the fps ratio and the resolution ratio, then mapped to a
+        quality step between 14 and 33.
+        """
+        if not control.get("FFMpegOptionsToggle", False) or not control.get(
+            "FFAutoMatchSourceQualityToggle", False
+        ):
+            return quality_value
+
+        if not source_metrics:
+            print(
+                "[INFO] Source-quality auto match enabled, but probe failed. Using manual Quality unchanged."
+            )
+            return quality_value
+
+        # Lower bounds keep the divisions below well-defined
+        src_w = max(1.0, source_metrics["width"])
+        src_h = max(1.0, source_metrics["height"])
+        src_fps = max(0.001, source_metrics["fps"])
+        src_bitrate = max(1.0, source_metrics["bit_rate"])
+        src_codec = str(source_metrics.get("codec_name", "") or "").lower()
+        out_fps = float(output_fps) if output_fps and output_fps > 0 else src_fps
+
+        # bpppf = bits per pixel per frame of the source
+        src_bpppf = src_bitrate / (src_w * src_h * src_fps)
+        src_pixels = src_w * src_h
+        out_pixels = float(max(1, output_width) * max(1, output_height))
+        scale_ratio = out_pixels / src_pixels
+
+        target_bpppf = src_bpppf * self._source_codec_to_hevc_factor(src_codec)
+        temporal_ratio = max(0.5, min(2.0, out_fps / src_fps))
+        target_bpppf *= temporal_ratio**0.35
+
+        # Resolution change, measured in doublings/halvings of the pixel count
+        if scale_ratio > 1.0:
+            up_steps = math.log2(scale_ratio)
+            target_bpppf *= min(1.35, 1.0 + 0.15 * up_steps)
+        elif scale_ratio < 1.0:
+            down_steps = math.log2(1.0 / max(scale_ratio, 1e-6))
+            target_bpppf *= max(0.70, 1.0 - 0.20 * down_steps)
+
+        # (minimum bpppf, quality) pairs, densest first; anything below the last is 33
+        quality_steps = (
+            (0.25, 14),
+            (0.16, 16),
+            (0.11, 18),
+            (0.08, 20),
+            (0.055, 22),
+            (0.038, 24),
+            (0.028, 26),
+            (0.020, 28),
+            (0.014, 30),
+        )
+        auto_quality = next(
+            (quality for floor, quality in quality_steps if target_bpppf >= floor),
+            33,
+        )
+
+        adapted_quality = max(12, min(36, int(auto_quality)))
+
+        print(
+            "[INFO] Source-quality auto match: "
+            f"source={src_w:.0f}x{src_h:.0f}@{src_fps:.3f} "
+            f"codec={src_codec} bitrate={src_bitrate / 1_000_000:.3f}Mbps "
+            f"src_bpppf={src_bpppf:.5f} target_bpppf={target_bpppf:.5f} "
+            f"out_fps={out_fps:.3f} temporal_ratio={temporal_ratio:.3f}, "
+            f"manual_quality={quality_value} auto_quality={adapted_quality}"
+        )
+        return adapted_quality
+
+    def start_process(
+        self,
+        output_filename: str,
+        frame_width: int,
+        frame_height: int,
+        fps: float,
+        control: Mapping[str, Any],
+        is_segment: bool = False,
+        media_path: Optional[str] = None,
+        start_time_sec: float = 0.0,
+        end_time_sec: float = 0.0,
+    ) -> bool:
+        """
+        Build the FFmpeg command line and start the encoder with a piped stdin.
+
+        FFmpeg is told to read raw bgr24 frames from stdin. The declared input size
+        is the given frame size multiplied by the selected enhancer factor (x2/x4)
+        when `FrameEnhancerEnableToggle` is set.
+
+        Args:
+            output_filename: Output path, passed to FFmpeg as the last argument.
+            frame_width: Frame width before enhancer scaling.
+            frame_height: Frame height before enhancer scaling.
+            fps: Input frame rate; must be greater than zero.
+            control: Settings mapping (enhancer, quality, preset, HDR and AQ options).
+            is_segment: When True and `media_path` is set, `media_path` is added as a
+                second input and its first audio stream, if present, is encoded as AAC.
+            media_path: Source media used for the audio input and for quality probing.
+            start_time_sec: Start of the audio range read from `media_path`.
+            end_time_sec: End of that range; only applied when it is after the start.
+
+        Returns:
+            True when the subprocess was started, False for an invalid fps or when
+            FFmpeg could not be launched.
+        """
+        if fps <= 0:
+            print("[ERROR] Invalid FPS provided to encoder.")
+            return False
+
+        # Apply enhancer dimension scaling
+        if control.get("FrameEnhancerEnableToggle"):
+            enhancer_type = control.get("FrameEnhancerTypeSelection", "")
+            if enhancer_type in ("RealEsrgan-x2-Plus", "BSRGan-x2"):
+                frame_height *= 2
+                frame_width *= 2
+            elif enhancer_type in (
+                "RealEsrgan-x4-Plus",
+                "BSRGan-x4",
+                "UltraSharp-x4",
+                "UltraMix-x4",
+                "RealEsr-General-x4v3",
+            ):
+                frame_height *= 4
+                frame_width *= 4
+
+        # Output size: unchanged unless the downscale toggle targets a 1920-wide frame
+        frame_height_down = frame_height
+        frame_width_down = frame_width
+        if control.get("FrameEnhancerDownToggle"):
+            if frame_width != 1920 or frame_height != 1080:
+                frame_width_down_mult = frame_width / 1920
+                # "& ~1" clears the lowest bit so the scaled height is even
+                frame_height_down = math.ceil(frame_height / frame_width_down_mult) & ~1
+                frame_width_down = 1920
+
+        # Quality Adaptation
+        auto_match = bool(
+            control.get("FFMpegOptionsToggle", False)
+            and control.get("FFAutoMatchSourceQualityToggle", False)
+        )
+        # ffprobe is a blocking subprocess call, so only run it when its result is used
+        source_metrics = (
+            self.probe_source_video_metrics(media_path)
+            if auto_match and media_path
+            else None
+        )
+        # The *_down values equal the plain frame size whenever no downscale applies
+        ffquality = self.get_adaptive_recording_quality(
+            control=control,
+            quality_value=int(control.get("FFQualitySlider", 20)),
+            output_width=frame_width_down,
+            output_height=frame_height_down,
+            source_metrics=source_metrics,
+            output_fps=fps,
+        )
+
+        args = [
+            "ffmpeg",
+            "-hide_banner",
+            "-loglevel",
+            "error",
+            "-f",
+            "rawvideo",
+            "-pix_fmt",
+            "bgr24",
+            "-s",
+            f"{frame_width}x{frame_height}",
+            "-r",
+            str(fps),
+            "-i",
+            "pipe:0",
+        ]
+
+        if is_segment and media_path:
+            args.extend(["-ss", str(start_time_sec)])
+            # FFmpeg aborts when -to is not after -ss (e.g. the 0.0/0.0 defaults), so
+            # the range end is only passed when valid; -shortest still bounds the audio.
+            if end_time_sec > start_time_sec:
+                args.extend(["-to", str(end_time_sec)])
+            args.extend(
+                [
+                    "-i",
+                    media_path,
+                    "-map",
+                    "0:v:0",
+                    "-map",
+                    "1:a:0?",
+                    "-c:a",
+                    "aac",
+                    "-shortest",
+                ]
+            )
+
+        # Video codec args
+        if control.get("HDREncodeToggle"):
+            args.extend(
+                [
+                    "-c:v",
+                    "libx265",
+                    "-profile:v",
+                    "main10",
+                    "-preset",
+                    str(control.get("FFPresetsHDRSelection", "medium")),
+                    "-pix_fmt",
+                    "yuv420p10le",
+                    "-x265-params",
+                    f"crf={ffquality}:vbv-bufsize=10000:vbv-maxrate=10000:selective-sao=0:no-sao=1:strong-intra-smoothing=0:rect=0:aq-mode={int(control.get('FFSpatialAQToggle', 0))}:t-aq={int(control.get('FFTemporalAQToggle', 0))}:hdr-opt=1:repeat-headers=1:colorprim=bt2020:range=limited:transfer=smpte2084:colormatrix=bt2020nc:master-display='G(13250,34500)B(7500,3000)R(34000,16000)WP(15635,16450)L(10000000,1)':max-cll=1000,400",
+                ]
+            )
+        else:
+            args.extend(
+                [
+                    "-c:v",
+                    "hevc_nvenc",
+                    "-preset",
+                    str(control.get("FFPresetsSDRSelection", "p4")),
+                    "-profile:v",
+                    "main10",
+                    "-cq",
+                    str(ffquality),
+                    "-pix_fmt",
+                    "yuv420p10le",
+                    "-colorspace",
+                    "rgb",
+                    "-color_primaries",
+                    "bt709",
+                    "-color_trc",
+                    "bt709",
+                    "-spatial-aq",
+                    str(int(control.get("FFSpatialAQToggle", 0))),
+                    "-temporal-aq",
+                    str(int(control.get("FFTemporalAQToggle", 0))),
+                    "-tier",
+                    "high",
+                    "-tag:v",
+                    "hvc1",
+                ]
+            )
+
+        target_matrix = "bt2020nc" if control.get("HDREncodeToggle") else "bt709"
+        scale_params = f"in_range=pc:out_range=tv:out_color_matrix={target_matrix}"
+
+        if control.get("FrameEnhancerDownToggle"):
+            args.extend(
+                [
+                    "-vf",
+                    f"scale={frame_width_down}x{frame_height_down}:{scale_params}:flags=lanczos+accurate_rnd+full_chroma_int",
+                ]
+            )
+        else:
+            args.extend(["-vf", f"scale={scale_params}"])
+
+        args.append(output_filename)
+
+        try:
+            self.recording_sp = subprocess.Popen(
+                args, stdin=subprocess.PIPE, bufsize=-1
+            )
+            self.frames_written = 0
+            return True
+        except FileNotFoundError:
+            print(
+                "[ERROR] FFmpeg command not found. Ensure FFmpeg is installed and in system PATH."
+            )
+            return False
+        except Exception as e:
+            print(f"[ERROR] Failed to start FFmpeg subprocess: {e}")
+            return False
+
+    def write_frame(self, frame: numpy.ndarray) -> bool:
+        """
+        Send one frame's raw bytes (`frame.tobytes()`) to FFmpeg's stdin.
+
+        Returns True when the bytes were written and `frames_written` was increased,
+        False when there is no open stdin pipe or the write raised OSError.
+        """
+        pipe = self.recording_sp.stdin if self.recording_sp else None
+        if not pipe or pipe.closed:
+            return False
+        try:
+            pipe.write(frame.tobytes())
+        except OSError as e:
+            print(f"[WARN] Error writing frame to FFmpeg stdin: {e}")
+            return False
+        self.frames_written += 1
+        return True
+
+    def close_process(self, timeout: int = 120) -> None:
+        """
+        Close FFmpeg's stdin and wait for the process to end, escalating if it hangs.
+
+        Args:
+            timeout: Seconds to wait for a normal exit after stdin has been closed.
+
+        Does nothing when no process is attached. Otherwise the handle is dropped
+        (`recording_sp = None`) once the wait/terminate/kill sequence has run.
+        """
+        process = self.recording_sp
+        if not process:
+            return
+
+        # 1. Closing stdin signals end of input to FFmpeg
+        pipe = process.stdin
+        if pipe and not pipe.closed:
+            try:
+                pipe.close()
+            except OSError as e:
+                print(f"[WARN] Error closing FFmpeg stdin: {e}")
+
+        try:
+            # 2. Normal path: give FFmpeg `timeout` seconds to finish writing and exit
+            process.wait(timeout=timeout)
+        except subprocess.TimeoutExpired:
+            print(
+                f"[WARN] FFmpeg subprocess timed out after {timeout}s. Attempting graceful terminate..."
+            )
+            # 3. First escalation: terminate(), followed by a 5 second grace period
+            process.terminate()
+            try:
+                process.wait(timeout=5)
+                print("[INFO] FFmpeg closed cleanly after terminate signal.")
+            except subprocess.TimeoutExpired:
+                # 4. Second escalation: kill() and reap the process
+                print(
+                    "[ERROR] FFmpeg ignored terminate signal and is hanging. Forcing kill (SIGKILL)."
+                )
+                process.kill()
+                process.wait()
+        except Exception as e:
+            print(f"[ERROR] Error waiting for FFmpeg subprocess: {e}")
+
+        self.recording_sp = None
+
+    def is_running(self) -> bool:
+        """Return True while an FFmpeg process is attached and has not exited yet."""
+        process = self.recording_sp
+        if process is None:
+            return False
+        # poll() returns None for as long as the child process is still alive
+        return process.poll() is None
+
+
+class FFmpegPostProcessor:
+ """
+ Handles stateless post-processing operations via FFmpeg:
+ Audio extraction, audio concatenation, and fallback video-only muxing.
+ """
+
+    @staticmethod
+    def validate_audio_file(audio_file_path: str) -> bool:
+        """
+        Check with ffprobe that a file has an audio stream and a positive duration.
+
+        Returns True only when the file exists, ffprobe exits with 0, at least one
+        stream has codec_type "audio" and the reported format duration is greater
+        than zero. Probe failures are logged and reported as False.
+        """
+        if not os.path.exists(audio_file_path):
+            print(f"[ERROR] Audio file does not exist: {audio_file_path}")
+            return False
+
+        command = [
+            "ffprobe",
+            "-v",
+            "quiet",
+            "-print_format",
+            "json",
+            "-show_format",
+            "-show_streams",
+            audio_file_path,
+        ]
+        try:
+            probe = subprocess.run(command, capture_output=True, text=True, timeout=30)
+            if probe.returncode != 0:
+                print(f"[WARN] ffprobe failed for {audio_file_path}: {probe.stderr}")
+                return False
+
+            probe_data = json.loads(probe.stdout)
+            audio_streams = [
+                stream
+                for stream in probe_data.get("streams", [])
+                if stream.get("codec_type") == "audio"
+            ]
+            if not audio_streams:
+                print(f"[WARN] No audio stream found in {audio_file_path}")
+                return False
+
+            duration = probe_data.get("format", {}).get("duration")
+            if duration is None or float(duration) <= 0:
+                print(f"[WARN] Invalid or zero duration in {audio_file_path}")
+                return False
+
+            print(f"[INFO] Audio validation passed: {duration}s duration")
+            return True
+
+        except subprocess.TimeoutExpired:
+            print(f"[WARN] Audio validation timed out for {audio_file_path}")
+            return False
+        except json.JSONDecodeError:
+            print(f"[WARN] Invalid ffprobe output for {audio_file_path}")
+            return False
+        except Exception as e:
+            print(f"[WARN] Audio validation failed for {audio_file_path}: {e}")
+            return False
+
+    @staticmethod
+    def _discard_files(paths: List[str]) -> None:
+        """Best-effort removal of partial outputs; files that cannot be removed are ignored."""
+        for path in paths:
+            try:
+                os.remove(path)
+            except OSError:
+                pass
+
+    @staticmethod
+    def extract_audio_segments(
+        media_path: str,
+        fps: float,
+        segments: List[Tuple[int, int]],
+        temp_audio_dir: str,
+    ) -> Tuple[bool, List[str]]:
+        """
+        Extract the audio of `media_path` for each frame segment into its own .m4a file.
+
+        Args:
+            media_path: Source media handed to FFmpeg as input.
+            fps: Frame rate used to convert frame indices to seconds.
+            segments: (start_frame, end_frame) pairs; end_frame is treated as inclusive.
+            temp_audio_dir: Directory that receives `audio_segment_NNNN.m4a` files.
+
+        Returns:
+            (True, files) with one path per extracted segment, in order. Segments whose
+            time range is empty (always the case when fps <= 0) are skipped, so the
+            list can be shorter than `segments`. On the first failure every file
+            produced so far is deleted and (False, []) is returned.
+        """
+        audio_files: List[str] = []
+        for idx, (start_frame, end_frame) in enumerate(segments):
+            start_time = start_frame / fps if fps > 0 else 0
+            # +1 so the clip also covers the duration of the last (inclusive) frame
+            end_time = (end_frame + 1) / fps if fps > 0 else 0
+
+            if start_time >= end_time:
+                continue
+
+            audio_file = os.path.join(temp_audio_dir, f"audio_segment_{idx:04d}.m4a")
+            # Registered before extraction so a partial file is cleaned up on failure
+            audio_files.append(audio_file)
+
+            args = [
+                "ffmpeg",
+                "-hide_banner",
+                "-loglevel",
+                "warning",
+                "-err_detect",
+                "ignore_err",
+                "-i",
+                media_path,
+                "-ss",
+                str(start_time),
+                "-to",
+                str(end_time),
+                "-vn",
+                "-map",
+                "0:a:0?",
+                "-af",
+                "aresample=async=1:first_pts=0",
+                "-c:a",
+                "aac",
+                "-b:a",
+                "192k",
+                "-y",
+                audio_file,
+            ]
+
+            try:
+                print(
+                    f"[INFO] Extracting audio segment {idx + 1}/{len(segments)}: {start_time:.3f}s → {end_time:.3f}s"
+                )
+                subprocess.run(args, check=True, capture_output=True, text=True)
+
+                if not FFmpegPostProcessor.validate_audio_file(audio_file):
+                    print(
+                        f"[WARN] Validation failed for segment {idx + 1}, retrying extraction once"
+                    )
+                    subprocess.run(args, check=True, capture_output=True, text=True)
+                    if not FFmpegPostProcessor.validate_audio_file(audio_file):
+                        print(
+                            f"[ERROR] Retried segment {idx + 1} is still invalid after validation"
+                        )
+                        FFmpegPostProcessor._discard_files(audio_files)
+                        return False, []
+
+                print(f"[INFO] Segment {idx + 1} extracted successfully")
+            except Exception as e:
+                # FFmpeg's output is captured, so the exception text alone lacks the
+                # actual reason; append the captured stderr when there is any.
+                stderr_text = (getattr(e, "stderr", None) or "").strip()
+                reason = f"{e} | {stderr_text}" if stderr_text else f"{e}"
+                print(f"[ERROR] Failed to extract audio segment {idx + 1}: {reason}")
+                FFmpegPostProcessor._discard_files(audio_files)
+                return False, []
+
+        # Report what was actually extracted; skipped segments are not counted
+        print(f"[INFO] All {len(audio_files)} audio segment(s) extracted successfully")
+        return True, audio_files
+
+    @staticmethod
+    def concatenate_audio_segments(
+        audio_files: List[str], temp_audio_dir: str
+    ) -> Optional[str]:
+        """
+        Join several audio files into one AAC file with FFmpeg's concat demuxer.
+
+        Args:
+            audio_files: Paths of the clips, in playback order.
+            temp_audio_dir: Directory for `concat_manifest.txt` and the result
+                `audio_concatenated.m4a`.
+
+        Returns:
+            None for an empty list or on any failure; the single input path unchanged
+            when only one file is given; otherwise the path of the concatenated file.
+        """
+        if not audio_files:
+            return None
+        if len(audio_files) == 1:
+            return audio_files[0]
+
+        concat_file = os.path.join(temp_audio_dir, "concat_manifest.txt")
+        try:
+            # Write the list as UTF-8 instead of the platform default encoding, which
+            # cannot represent every path (e.g. non-ASCII names on Windows).
+            with open(concat_file, "w", encoding="utf-8") as f:
+                for audio_file in audio_files:
+                    formatted_path = os.path.abspath(audio_file).replace("\\", "/")
+                    # A quote cannot be escaped inside '...': close, add \', reopen
+                    formatted_path = formatted_path.replace("'", "'\\''")
+                    f.write(f"file '{formatted_path}'\n")
+        except (OSError, UnicodeError) as e:
+            print(f"[ERROR] Failed to create concat manifest: {e}")
+            return None
+
+        output_audio = os.path.join(temp_audio_dir, "audio_concatenated.m4a")
+        args = [
+            "ffmpeg",
+            "-hide_banner",
+            "-loglevel",
+            "error",
+            "-f",
+            "concat",
+            "-safe",
+            "0",
+            "-i",
+            concat_file,
+            "-vn",
+            "-af",
+            "aresample=async=1:first_pts=0",
+            "-c:a",
+            "aac",
+            "-b:a",
+            "192k",
+            "-y",
+            output_audio,
+        ]
+
+        try:
+            print(f"[INFO] Concatenating {len(audio_files)} audio segment(s)...")
+            subprocess.run(args, check=True)
+            print("[INFO] ✓ Successfully concatenated audio segments")
+            return output_audio
+        except Exception as e:
+            print(f"[ERROR] Failed to concatenate audio segments: {e}")
+            return None
+
+    @staticmethod
+    def write_video_only_output(source_video: str, output_video: str) -> bool:
+        """
+        Fallback for failed audio handling: copy only the first video stream.
+
+        The stream is copied (`-c:v copy`) and audio is dropped (`-an`). Returns True
+        when FFmpeg succeeds, False when the source is missing or the remux fails.
+        """
+        source_missing = not source_video or not os.path.exists(source_video)
+        if source_missing:
+            print(f"[ERROR] Video-only fallback source missing: {source_video}")
+            return False
+
+        # Remove a stale output first; a failure is ignored since -y overwrites anyway
+        if output_video and os.path.exists(output_video):
+            try:
+                os.remove(output_video)
+            except OSError:
+                pass
+
+        command = ["ffmpeg", "-hide_banner", "-loglevel", "error"]
+        command += ["-i", source_video]
+        command += ["-map", "0:v:0", "-c:v", "copy", "-an"]
+        command += ["-y", output_video]
+
+        try:
+            subprocess.run(command, check=True)
+            print(
+                f"[WARN] Audio processing failed; emitted video-only output: {output_video}"
+            )
+            return True
+        except Exception as e:
+            print(f"[ERROR] Video-only remux fallback failed: {e}")
+            return False
+
+    @staticmethod
+    def concatenate_segments_video_only(
+        list_file_path: str, final_file_path: str
+    ) -> bool:
+        """
+        Segment-mode fallback: concatenate the listed segments without audio.
+
+        `list_file_path` is read with FFmpeg's concat demuxer and only the first
+        video stream is copied into `final_file_path`. Returns True on success and
+        False when FFmpeg fails or cannot be started.
+        """
+        command = ["ffmpeg", "-hide_banner", "-loglevel", "error"]
+        command += ["-f", "concat", "-safe", "0", "-i", list_file_path]
+        command += ["-map", "0:v:0", "-c:v", "copy", "-an"]
+        command += ["-y", final_file_path]
+
+        try:
+            subprocess.run(command, check=True)
+            print(
+                f"[WARN] Segment audio concat failed; emitted video-only output: {final_file_path}"
+            )
+            return True
+        except Exception as e:
+            print(f"[ERROR] Segment video-only fallback concat failed: {e}")
+            return False
diff --git a/app/ui/main_ui.py b/app/ui/main_ui.py
index bb72d19c..8598cbf7 100644
--- a/app/ui/main_ui.py
+++ b/app/ui/main_ui.py
@@ -1291,6 +1291,8 @@ def _install_view_navigation_actions(self):
face_compare_action = QtGui.QAction("Face Compare", self.menuView)
face_compare_action.setCheckable(True)
+ face_compare_action.setShortcut(QtGui.QKeySequence("X"))
+ face_compare_action.setShortcutContext(QtCore.Qt.ShortcutContext.WindowShortcut)
face_compare_action.triggered.connect(
lambda checked: (
self._set_compare_mode("compare", checked),
diff --git a/app/ui/widgets/actions/list_view_actions.py b/app/ui/widgets/actions/list_view_actions.py
index af1133b2..6e584422 100644
--- a/app/ui/widgets/actions/list_view_actions.py
+++ b/app/ui/widgets/actions/list_view_actions.py
@@ -943,6 +943,7 @@ def show_shortcuts(main_window: "MainWindow"):
"Ctrl+0 : Fit to View
"
"Ctrl+1 : 100% Zoom
"
"Middle Mouse Drag : Pan view
"
+ "X : Face Compare
"
"Right Click : Viewport menu (Fit to View, 100% Zoom, Save Image)
"
"
"
)
diff --git a/app/ui/widgets/actions/video_control_actions.py b/app/ui/widgets/actions/video_control_actions.py
index a8e2182e..6e95f4bc 100644
--- a/app/ui/widgets/actions/video_control_actions.py
+++ b/app/ui/widgets/actions/video_control_actions.py
@@ -1541,17 +1541,31 @@ def advance_video_slider_by_n_frames(main_window: "MainWindow", n=None):
if new_position > video_processor.max_frame_number:
new_position = video_processor.max_frame_number
- # 1. Setting the value triggers 'on_change_video_seek_slider' automatically.
- # Since the slider is not being dragged (isSliderDown() == False),
- # that slot will naturally execute 'run_post_seek_actions' ONCE.
- main_window.videoSeekSlider.setValue(new_position)
+ # --- CONTEXT-AWARE NAVIGATION (STEPPING) ---
+ is_compare_active = getattr(main_window, "view_face_compare_enabled", False)
+ is_mask_active = getattr(main_window, "view_face_mask_enabled", False)
+ suppress_flash = is_compare_active or is_mask_active
+
+ # Raise the flag to prevent raw frame rendering during slider update
+ if suppress_flash:
+ main_window._is_stepping_media = True
+
+ try:
+ # 1. Setting the value triggers 'on_change_video_seek_slider' automatically.
+ # Since the slider is not being dragged (isSliderDown() == False),
+ # that slot will naturally execute 'run_post_seek_actions' ONCE.
+ main_window.videoSeekSlider.setValue(new_position)
+ finally:
+ # Always drop the flag safely
+ if suppress_flash:
+ main_window._is_stepping_media = False
# 2. Check if this is a single frame step (like 'V' key)
is_single_frame_step = n == 1
- # 3. Run AI models. Runs synchronously only for single steps to prevent "flash".
+ # 3. Run AI models. Explicitly suppress raw preview if special mode is active!
main_window.video_processor.process_current_frame(
- synchronous=is_single_frame_step
+ synchronous=is_single_frame_step, suppress_raw_preview=suppress_flash
)
@@ -1572,16 +1586,30 @@ def rewind_video_slider_by_n_frames(main_window: "MainWindow", n=None):
if new_position < 0:
new_position = 0
- # 1. Setting the value triggers 'on_change_video_seek_slider' automatically.
- # Prevents double execution of heavy Face Detection.
- main_window.videoSeekSlider.setValue(new_position)
+ # --- CONTEXT-AWARE NAVIGATION (STEPPING) ---
+ is_compare_active = getattr(main_window, "view_face_compare_enabled", False)
+ is_mask_active = getattr(main_window, "view_face_mask_enabled", False)
+ suppress_flash = is_compare_active or is_mask_active
+
+ # Raise the flag to prevent raw frame rendering during slider update
+ if suppress_flash:
+ main_window._is_stepping_media = True
+
+ try:
+ # 1. Setting the value triggers 'on_change_video_seek_slider' automatically.
+ # Prevents double execution of heavy Face Detection.
+ main_window.videoSeekSlider.setValue(new_position)
+ finally:
+ # Always drop the flag safely
+ if suppress_flash:
+ main_window._is_stepping_media = False
# 2. Check if this is a single frame step (like 'C' key)
is_single_frame_step = n == 1
- # 3. Run AI models. Runs synchronously only for single steps to prevent "flash".
+ # 3. Run AI models. Explicitly suppress raw preview if special mode is active!
main_window.video_processor.process_current_frame(
- synchronous=is_single_frame_step
+ synchronous=is_single_frame_step, suppress_raw_preview=suppress_flash
)
@@ -2250,15 +2278,25 @@ def on_change_video_seek_slider(main_window: "MainWindow", new_position=0):
# Cache the raw frame so process_current_frame() can use it as a
# fallback when the near-EOF re-read fails (OpenCV reliability issue).
video_processor._seek_cached_frame = (new_position, frame)
- # For preview, show the raw frame immediately.
- # The processed frame will be shown when the slider is released.
- pixmap = common_widget_actions.get_pixmap_from_frame(main_window, frame)
- graphics_view_actions.update_graphics_view(
- main_window,
- pixmap,
- new_position,
- size_mode="native_pixmap_size",
- )
+
+ # --- HYBRID NAVIGATION PREVIEW ---
+ is_stepping = getattr(main_window, "_is_stepping_media", False)
+ is_compare_active = getattr(main_window, "view_face_compare_enabled", False)
+ is_mask_active = getattr(main_window, "view_face_mask_enabled", False)
+
+ # Suppress raw frame display ONLY if we are stepping via actions/shortcuts
+ # AND a special preview mode (Compare/Mask) is currently active.
+ suppress_flash = is_stepping and (is_compare_active or is_mask_active)
+
+ if not suppress_flash:
+ # Standard scrubbing: push the raw frame to the UI immediately for fast response
+ pixmap = common_widget_actions.get_pixmap_from_frame(main_window, frame)
+ graphics_view_actions.update_graphics_view(
+ main_window,
+ pixmap,
+ new_position,
+ size_mode="native_pixmap_size",
+ )
else:
# VP-34: Read failed. Trigger a stop/reopen cycle to recover from silent handle failures.
@@ -2268,6 +2306,7 @@ def on_change_video_seek_slider(main_window: "MainWindow", new_position=0):
video_processor._seek_cached_frame = None
main_window.last_seek_read_failed = True
video_processor.stop_processing()
+
# Only update parameters and widgets if the slider is NOT being actively dragged.
# This ensures playback, clicks, and button presses update the UI,
# but fast scrubbing does not cause lag or skip marker updates.
diff --git a/app/ui/widgets/common_layout_data.py b/app/ui/widgets/common_layout_data.py
index 0c3cbd78..3825ab06 100644
--- a/app/ui/widgets/common_layout_data.py
+++ b/app/ui/widgets/common_layout_data.py
@@ -350,6 +350,44 @@
"requiredSelectionValue": "Advanced",
"help": "Activate the eyes face expression restorer",
},
+ "FaceExpressionCameraGazeToggle": {
+ "level": 4,
+ "label": "Camera Gaze Lock",
+ "default": False,
+ "parentToggle": "FaceExpressionEnableBothToggle & FaceExpressionEyesToggle",
+ "requiredToggleValue": True,
+ "parentSelection": "FaceExpressionModeSelection",
+ "requiredSelectionValue": "Advanced",
+ "help": "Forces the eyes to look directly at the camera. Overrides original and driving gaze.",
+ },
+ "FaceExpressionCameraGazeStrengthDecimalSlider": {
+ "level": 5,
+ "label": "Gaze Strength",
+ "min_value": "0.00",
+ "max_value": "1.00",
+ "default": "0.50",
+ "decimals": 2,
+ "step": 0.05,
+ "parentToggle": "FaceExpressionEnableBothToggle & FaceExpressionEyesToggle & FaceExpressionCameraGazeToggle",
+ "requiredToggleValue": True,
+ "parentSelection": "FaceExpressionModeSelection",
+ "requiredSelectionValue": "Advanced",
+ "help": "Controls the strength of the camera gaze lock.",
+ },
+ "FaceExpressionCameraGazeVerticalOffsetDecimalSlider": {
+ "level": 5,
+ "label": "Gaze Vertical Fine-Tune",
+ "min_value": "-1.00",
+ "max_value": "1.00",
+ "default": "0.00",
+ "decimals": 2,
+ "step": 0.05,
+ "parentToggle": "FaceExpressionEnableBothToggle & FaceExpressionEyesToggle & FaceExpressionCameraGazeToggle",
+ "requiredToggleValue": True,
+ "parentSelection": "FaceExpressionModeSelection",
+ "requiredSelectionValue": "Advanced",
+ "help": "Micro-adjust the vertical gaze up or down to fix perceptual eye contact issues caused by eyelid shape.",
+ },
"FaceExpressionStableGazeEyesToggle": {
"level": 4,
"label": "Relative Lids + Retargeted Gaze",
diff --git a/app/ui/widgets/event_filters.py b/app/ui/widgets/event_filters.py
index 7df822a9..187d009f 100644
--- a/app/ui/widgets/event_filters.py
+++ b/app/ui/widgets/event_filters.py
@@ -62,29 +62,31 @@ def __init__(self, main_window: "MainWindow", parent=None):
def eventFilter(self, slider, event):
if event.type() == QtCore.QEvent.Type.KeyPress:
- if event.key() in {QtCore.Qt.Key_Left, QtCore.Qt.Key_Right}:
- # Allow default slider movement
- result = super().eventFilter(slider, event)
-
- # After the slider moves, call the custom processing function
- QtCore.QTimer.singleShot(
- 0, self.main_window.video_processor.process_current_frame
+ if event.key() == QtCore.Qt.Key_Right:
+ # Advance exactly one frame through our controlled stepping pipeline
+ video_control_actions.advance_video_slider_by_n_frames(
+ self.main_window, 1
)
+ return True # Consume the event so Qt does not apply its default key handling
- return result # Return the result of the default handling
+ elif event.key() == QtCore.Qt.Key_Left:
+ # Rewind exactly one frame through our controlled stepping pipeline
+ video_control_actions.rewind_video_slider_by_n_frames(
+ self.main_window, 1
+ )
+ return True # Consume the event so Qt does not apply its default key handling
elif event.type() == QtCore.QEvent.Type.Wheel:
- # Intercept mousewheel to force FrameSkipStepSlider
+ # Intercept mousewheel to use FrameSkipStepSlider logic
delta = event.angleDelta().y()
if delta > 0:
# If wheel up (Advance)
video_control_actions.advance_video_slider_by_n_frames(self.main_window)
elif delta < 0:
- # If wheel up (Rewind)
+ # If wheel down (Rewind)
video_control_actions.rewind_video_slider_by_n_frames(self.main_window)
- # Return True to stop QT from applying default values
- return True
+ return True # Consume the event so Qt does not apply its default wheel handling
# For other events, use the default behavior
return super().eventFilter(slider, event)
diff --git a/app/ui/widgets/widget_components.py b/app/ui/widgets/widget_components.py
index ab5aa664..2870a7cc 100644
--- a/app/ui/widgets/widget_components.py
+++ b/app/ui/widgets/widget_components.py
@@ -361,13 +361,13 @@ def load_media(self):
].split("x")
media_capture = cv2.VideoCapture(self.webcam_index, self.webcam_backend)
-
+
try:
fourcc = cv2.VideoWriter_fourcc(*"MJPG")
media_capture.set(cv2.CAP_PROP_FOURCC, fourcc)
except Exception:
pass
-
+
media_capture.set(cv2.CAP_PROP_FRAME_WIDTH, int(res_width))
media_capture.set(cv2.CAP_PROP_FRAME_HEIGHT, int(res_height))
max_frames_number = 999999