Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 77 additions & 17 deletions PenumbraModForwarder.Common/Services/FileWatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ public class FileWatcher : IFileWatcher, IDisposable
private readonly ConcurrentDictionary<string, bool> _processingFiles;
private readonly ConcurrentDictionary<string, DateTime> _trackedFiles;
private readonly ConcurrentDictionary<string, FileDownloadInfo> _downloadProgress;
private readonly ConcurrentDictionary<string, int> _retryAttempts;
private readonly HashSet<string> _processedFiles;

private readonly TimeSpan _initialDelay = TimeSpan.FromMilliseconds(500);
Expand All @@ -35,6 +36,7 @@ public class FileWatcher : IFileWatcher, IDisposable
private readonly TimeSpan _progressTimeout = TimeSpan.FromMinutes(5);
private readonly int _requiredStabilityChecks = 2;
private readonly long _minSizeProgress = 1024;
private readonly int _maxRetryAttempts = 5;

private readonly string[] _archiveExtensions = { ".zip", ".rar", ".7z" };
private readonly string[] _modExtensions = { ".pmp", ".ttmp2", ".ttmp", ".rpvsp" };
Expand All @@ -57,6 +59,7 @@ public FileWatcher(ILogger<FileWatcher> logger, IConfigurationService configurat
_processingFiles = new ConcurrentDictionary<string, bool>();
_trackedFiles = new ConcurrentDictionary<string, DateTime>();
_downloadProgress = new ConcurrentDictionary<string, FileDownloadInfo>();
_retryAttempts = new ConcurrentDictionary<string, int>();
_processedFiles = new HashSet<string>();

_debounceTimer = CreateDebounceTimer();
Expand Down Expand Up @@ -150,7 +153,6 @@ private void OnFileRenamed(object sender, RenamedEventArgs e)
{
_logger.LogInformation($"File renamed: {e.FullPath}");
if (!_configurationService.GetConfigValue(o => o.AutoLoad)) return;
// Check if this file is already being tracked before enqueueing
if (!_trackedFiles.ContainsKey(e.FullPath) && !_processingFiles.ContainsKey(e.FullPath))
{
EnqueueFileEvent(e.FullPath);
Expand All @@ -165,7 +167,6 @@ private void OnFileCreated(object sender, FileSystemEventArgs e)
{
_logger.LogInformation($"File created: {e.FullPath}");
if (!_configurationService.GetConfigValue(o => o.AutoLoad)) return;
// Check if this file is already being tracked before enqueueing
if (!_trackedFiles.ContainsKey(e.FullPath) && !_processingFiles.ContainsKey(e.FullPath))
{
EnqueueFileEvent(e.FullPath);
Expand All @@ -179,15 +180,13 @@ private void OnFileCreated(object sender, FileSystemEventArgs e)
private async void EnqueueFileEvent(string filePath)
{
var fileExtension = Path.GetExtension(filePath).ToLower();
// Combine both extension arrays for initial check
var validExtensions = _archiveExtensions.Concat(_modExtensions);
if (!validExtensions.Contains(fileExtension))
{
_logger.LogInformation($"Ignored file: {filePath}, unsupported extension.");
return;
}

// Initial delay - slightly longer for archives
if (_archiveExtensions.Contains(fileExtension))
{
_logger.LogDebug($"Archive file detected, using extended initial delay: {filePath}");
Expand All @@ -202,7 +201,6 @@ private async void EnqueueFileEvent(string filePath)
{
var now = DateTime.UtcNow;

// Early return if file is already being tracked or processed
if (_trackedFiles.ContainsKey(filePath) ||
_processingFiles.ContainsKey(filePath) ||
_processedFiles.Contains(filePath))
Expand All @@ -220,7 +218,6 @@ private async void EnqueueFileEvent(string filePath)
}
}

// Initialize download tracking only if not already tracked
if (!_downloadProgress.ContainsKey(filePath))
{
_downloadProgress[filePath] = new FileDownloadInfo
Expand All @@ -242,6 +239,12 @@ private async Task<bool> WaitForFileStability(string filePath, CancellationToken
if (!_downloadProgress.TryGetValue(filePath, out var downloadInfo))
{
_logger.LogWarning($"No download info found for file: {filePath}");
if (!File.Exists(filePath))
{
_logger.LogInformation($"File no longer exists, removing from tracking: {filePath}");
CleanupFile(filePath);
return false;
}
return false;
}

Expand All @@ -265,18 +268,17 @@ private async Task<bool> WaitForFileStability(string filePath, CancellationToken
if (!fileInfo.Exists)
{
_logger.LogWarning($"File no longer exists: {filePath}");
CleanupFile(filePath);
return false;
}

// Refresh FileInfo to get current size
fileInfo.Refresh();
var currentSize = fileInfo.Length;

if (currentSize > 0)
{
if (currentSize > downloadInfo.LastSize + _minSizeProgress)
{
// Progress detected
downloadInfo.LastSize = currentSize;
downloadInfo.LastProgressTime = now;
stableCount = 0;
Expand All @@ -288,12 +290,10 @@ private async Task<bool> WaitForFileStability(string filePath, CancellationToken
return false;
}

// Check if file size is stable
if (currentSize == lastSize)
{
if (isArchive)
{
// For archives, we need additional checks
if (await IsArchiveAccessible(filePath))
{
stableCount++;
Expand All @@ -312,13 +312,11 @@ private async Task<bool> WaitForFileStability(string filePath, CancellationToken

if (stableCount >= _requiredStabilityChecks)
{
// For archives, add an additional delay after stability is confirmed
if (isArchive)
{
_logger.LogDebug($"Adding additional stability delay for archive: {filePath}");
await Task.Delay(_archiveStabilityDelay, cancellationToken);

// One final check after the delay
if (!await IsArchiveAccessible(filePath))
{
_logger.LogWarning($"Archive failed final accessibility check: {filePath}");
Expand All @@ -344,6 +342,11 @@ private async Task<bool> WaitForFileStability(string filePath, CancellationToken
catch (Exception ex)
{
_logger.LogError(ex, $"Error checking file stability: {filePath}");
if (!IncrementRetryAttempt(filePath))
{
CleanupFile(filePath);
return false;
}
return false;
}
finally
Expand All @@ -358,10 +361,8 @@ private async Task<bool> IsArchiveAccessible(string filePath)
{
try
{
// First check if we can open the file
using (var stream = File.Open(filePath, FileMode.Open, FileAccess.Read, FileShare.Read))
{
// Read the first few bytes to verify file access
var buffer = new byte[4096];
await stream.ReadAsync(buffer, 0, buffer.Length);
}
Expand Down Expand Up @@ -393,7 +394,7 @@ private void ProcessFileChanges()
ProcessFileAsync(file, _cancellationTokenSource.Token);
}
}

private async void ProcessFileAsync(string file, CancellationToken cancellationToken)
{
try
Expand Down Expand Up @@ -430,6 +431,7 @@ private void ProcessRetryQueue()
_logger.LogDebug("Processing retry queue.");
var retryQueueCopy = new List<string>();

// Transfer items from the retry queue to a temporary list
while (_retryQueue.TryDequeue(out var filePath))
{
retryQueueCopy.Add(filePath);
Expand All @@ -441,6 +443,13 @@ private void ProcessRetryQueue()
{
try
{
if (!IncrementRetryAttempt(filePath))
{
// Max retry attempts reached; clean up file and stop retrying
CleanupFile(filePath);
return;
}

if (await WaitForFileStability(filePath, _cancellationTokenSource.Token))
{
lock (_lock)
Expand All @@ -457,19 +466,27 @@ private void ProcessRetryQueue()
}
else
{
_retryQueue.Enqueue(filePath);
// Re-enqueue the file back into the retry queue
lock (_retryQueue)
{
_retryQueue.Enqueue(filePath);
}
_logger.LogInformation($"File {filePath} is not stable, re-queuing");
}
}
catch (Exception ex)
{
lock (_retryQueue)
{
_retryQueue.Enqueue(filePath);
}
_logger.LogError(ex, $"Error processing file {filePath}");
_retryQueue.Enqueue(filePath);
}
});
}
}


private bool IsFileReady(string filePath)
{
try
Expand All @@ -493,6 +510,49 @@ private void RestartDebounceTimer()
_debounceTimer.Stop();
_debounceTimer.Start();
}

private bool IncrementRetryAttempt(string filePath)
{
var attempts = _retryAttempts.AddOrUpdate(
filePath,
1,
(_, count) => count + 1
);

if (attempts >= _maxRetryAttempts)
{
_logger.LogWarning($"Max retry attempts ({_maxRetryAttempts}) reached for file: {filePath}");
return false;
}

return true;
}

private void CleanupFile(string filePath)
{
_downloadProgress.TryRemove(filePath, out _);
_trackedFiles.TryRemove(filePath, out _);
_processingFiles.TryRemove(filePath, out _);
_retryAttempts.TryRemove(filePath, out _);

// Clear out the retry queue while preserving other items
var tempQueue = new ConcurrentQueue<string>();
while (_retryQueue.TryDequeue(out var path))
{
if (path != filePath)
{
tempQueue.Enqueue(path);
}
}

// Re-add items back to the original _retryQueue
while (tempQueue.TryDequeue(out var path))
{
_retryQueue.Enqueue(path);
}

_logger.LogInformation($"Cleaned up tracking for file: {filePath}");
}

private void TryClearProcessedFiles()
{
Expand Down
Loading