Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion cli/cli/Commands/Content/ContentSyncCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ public class ContentSyncCommand : AtomicCommand<ContentSyncCommandArgs, ContentS
private static ConfigurableOption TARGET_MANIFEST_UID_OPTION =
new("target", "If you pass in a Manifest's UID, we'll sync with that as the target. If filters are provided, will only do this for content that matches the filter");

private static Option<int> DOWNLOAD_MAX_PARALLEL_COUNT_OPTION = new(
"--download-max-parallel-count",
() => 64,
"Maximum number of content files to download in parallel while syncing. Use 0 for unbounded parallelism.");

private ContentService _contentService;

public ContentSyncCommand() : base("sync", "Synchronizes the local content matching the filters to the latest content stored in the realm")
Expand All @@ -38,6 +43,7 @@ public override void Configure()
AddOption(SYNC_CONFLICTS_OPTION, (args, b) => args.SyncConflicts = b);
AddOption(SYNC_DELETED_OPTION, (args, b) => args.SyncDeleted = b);
AddOption(TARGET_MANIFEST_UID_OPTION, (args, b) => args.TargetManifestUid = b);
AddOption(DOWNLOAD_MAX_PARALLEL_COUNT_OPTION, (args, i) => args.DownloadMaxParallelCount = i);
}

public override async Task<ContentSyncResult> GetResult(ContentSyncCommandArgs args)
Expand All @@ -49,7 +55,8 @@ public override async Task<ContentSyncResult> GetResult(ContentSyncCommandArgs a
foreach (var manifestId in args.ManifestIdsToReset)
{
var task = _contentService.SyncLocalContent(args.Lifecycle, manifestId, args.FilterType, args.Filter, args.SyncCreated,
args.SyncModified, args.SyncConflicts, args.SyncDeleted, args.TargetManifestUid, this.SendResults<ProgressStreamResultChannel, ContentProgressUpdateData>);
args.SyncModified, args.SyncConflicts, args.SyncDeleted, args.TargetManifestUid, this.SendResults<ProgressStreamResultChannel, ContentProgressUpdateData>,
args.DownloadMaxParallelCount);

resetPromises.Add(task);
}
Expand All @@ -71,6 +78,7 @@ public class ContentSyncCommandArgs : ContentCommandArgs
public bool SyncConflicts;
public bool SyncDeleted;
public string TargetManifestUid;
public int DownloadMaxParallelCount;
}

[CliContractType]
Expand Down
126 changes: 119 additions & 7 deletions cli/cli/Services/Content/ContentService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,19 @@ public partial class ContentService
/// </summary>
private const string FAKE_EMPTY_MANIFEST_UID = "EmptyManifest";

private const int CONTENT_DOWNLOAD_MAX_ATTEMPTS = 4;
private const int DEFAULT_CONTENT_DOWNLOAD_MAX_CONCURRENCY = 64;
private const int CONTENT_DOWNLOAD_RETRY_BASE_DELAY_MS = 250;
private const int CONTENT_DOWNLOAD_RETRY_JITTER_MS = 250;

/// <summary>
/// Shared client for CDN content-file downloads during sync.
/// </summary>
/// <remarks>
/// This intentionally bypasses the generic CLI requester to avoid per-file verbose request/response logging while preserving connection reuse.
/// </remarks>
private static readonly HttpClient _contentDownloadClient = new(CreateContentDownloadHandler());

private readonly CliRequester _requester;
private readonly ConfigService _config;
private readonly IContentApi _contentApi;
Expand Down Expand Up @@ -942,10 +955,11 @@ public async Task PublishContent(AutoSnapshotType autoSnapshotType, int maxLocal
public async Task<ContentSyncReport> SyncLocalContent(AppLifecycle lifecycle, string manifestId,
ContentFilterType filterType = ContentFilterType.ExactIds, string[] filters = null,
bool deleteCreated = true, bool syncModified = true, bool forceSyncConflicts = true, bool syncDeleted = true,
string referenceManifestUid = "", Action<ContentProgressUpdateData> onContentSyncProgressUpdate = null)
string referenceManifestUid = "", Action<ContentProgressUpdateData> onContentSyncProgressUpdate = null,
int downloadMaxParallelCount = DEFAULT_CONTENT_DOWNLOAD_MAX_CONCURRENCY)
{
var targetManifest = await GetManifest(manifestId, referenceManifestUid, replaceLatest: string.IsNullOrEmpty(referenceManifestUid));
return await SyncLocalContent(targetManifest, manifestId, filterType, filters, deleteCreated, syncModified, forceSyncConflicts, syncDeleted, onContentSyncProgressUpdate, lifecycle.CancellationToken);
return await SyncLocalContent(targetManifest, manifestId, filterType, filters, deleteCreated, syncModified, forceSyncConflicts, syncDeleted, onContentSyncProgressUpdate, lifecycle.CancellationToken, downloadMaxParallelCount);
}

/// <summary>
Expand All @@ -954,7 +968,8 @@ public async Task<ContentSyncReport> SyncLocalContent(AppLifecycle lifecycle, st
public async Task<ContentSyncReport> SyncLocalContent(ClientManifestJsonResponse targetManifest, string manifestId,
ContentFilterType filterType = ContentFilterType.ExactIds, string[] filters = null,
bool syncCreated = true, bool syncModified = true, bool forceSyncConflicts = true, bool syncDeleted = true,
Action<ContentProgressUpdateData> onContentSyncProgressUpdate = null, CancellationToken cancellationToken = default)
Action<ContentProgressUpdateData> onContentSyncProgressUpdate = null, CancellationToken cancellationToken = default,
int downloadMaxParallelCount = DEFAULT_CONTENT_DOWNLOAD_MAX_CONCURRENCY)
{
// Reset processed count when calling SyncLocalContent method
_syncProcessedCount = 0;
Expand Down Expand Up @@ -995,10 +1010,14 @@ public async Task<ContentSyncReport> SyncLocalContent(ClientManifestJsonResponse

// Download and overwrite the local content for things that have changed based on the hash or don't exist.
int totalItems = contentToDownload.Length + contentToDelete.Length;
using var contentDownloadSemaphore = downloadMaxParallelCount > 0 ? new SemaphoreSlim(downloadMaxParallelCount) : null;
var downloadPromises = contentToDownload.Select(async c =>
{
Log.Verbose("Downloading content with id. ID={Id}", c.Id);

if (contentDownloadSemaphore != null)
{
await contentDownloadSemaphore.WaitAsync(cancellationToken);
}

ContentProgressUpdateData contentProgress = new ContentProgressUpdateData
{
totalItems = totalItems,
Expand All @@ -1008,8 +1027,7 @@ public async Task<ContentSyncReport> SyncLocalContent(ClientManifestJsonResponse
JsonElement customRequest;
try
{
customRequest = await _requester.CustomRequest(Method.GET, c.ReferenceContent.uri,
parser: s => JsonSerializer.Deserialize<JsonElement>(s));
customRequest = await DownloadContentFile(c, cancellationToken);
}
catch (HttpRequesterException exception)
{
Expand All @@ -1018,6 +1036,17 @@ public async Task<ContentSyncReport> SyncLocalContent(ClientManifestJsonResponse
onContentSyncProgressUpdate?.Invoke(contentProgress);
throw;
}
catch (Exception exception) when (!cancellationToken.IsCancellationRequested)
{
contentProgress.EventType = ContentProgressUpdateData.EVT_TYPE_SyncError;
contentProgress.errorMessage = exception.Message;
onContentSyncProgressUpdate?.Invoke(contentProgress);
throw;
}
finally
{
contentDownloadSemaphore?.Release();
}

contentProgress.EventType = ContentProgressUpdateData.EVT_TYPE_SyncComplete;
onContentSyncProgressUpdate?.Invoke(contentProgress);
Expand Down Expand Up @@ -1163,6 +1192,89 @@ public async Task<ContentSyncReport> SyncLocalContent(ClientManifestJsonResponse
}
}

/// <summary>
/// Downloads a single content file from its remote content URI with transient retry handling.
/// </summary>
/// <remarks>
/// Content sync may download hundreds or thousands of small files. This path keeps those downloads off the generic requester so Unity does not receive a verbose log event for every response body.
/// </remarks>
private async Task<JsonElement> DownloadContentFile(ContentFile contentFile, CancellationToken cancellationToken)
{
for (var attempt = 1; attempt <= CONTENT_DOWNLOAD_MAX_ATTEMPTS; attempt++)
{
try
{
using var response = await _contentDownloadClient.GetAsync(contentFile.ReferenceContent.uri, cancellationToken);
await using var stream = await response.Content.ReadAsStreamAsync(cancellationToken);
using var reader = new StreamReader(stream, Encoding.UTF8);
var rawResponse = await reader.ReadToEndAsync();

if (!response.IsSuccessStatusCode)
{
throw new RequesterException("Cli", Method.GET.ToReadableString(), contentFile.ReferenceContent.uri, (int)response.StatusCode, rawResponse);
}

return JsonSerializer.Deserialize<JsonElement>(rawResponse);
}
catch (Exception exception) when (!cancellationToken.IsCancellationRequested && IsTransientContentDownloadException(exception) && attempt < CONTENT_DOWNLOAD_MAX_ATTEMPTS)
{
var delay = GetContentDownloadRetryDelay(attempt);
Log.Warning($"Transient content download failure. Retrying content-id=[{contentFile.Id}] attempt=[{attempt + 1}/{CONTENT_DOWNLOAD_MAX_ATTEMPTS}] delay-ms=[{delay.TotalMilliseconds}] error=[{exception.GetType().Name}] message=[{exception.Message}]");
await Task.Delay(delay, cancellationToken);
}
}

throw new InvalidOperationException($"Content download retry loop exited unexpectedly. content-id=[{contentFile.Id}]");
}

/// <summary>
/// Creates the HTTP handler used by the shared content download client.
/// </summary>
/// <remarks>
/// The sync command owns concurrency limits separately, so the handler allows a high per-server connection ceiling.
/// </remarks>
private static HttpClientHandler CreateContentDownloadHandler()
{
return new HttpClientHandler
{
MaxConnectionsPerServer = int.MaxValue,
UseCookies = false
};
}

/// <summary>
/// Calculates exponential retry delay with jitter for a failed content download attempt.
/// </summary>
private static TimeSpan GetContentDownloadRetryDelay(int failedAttempt)
{
var exponentialDelayMs = CONTENT_DOWNLOAD_RETRY_BASE_DELAY_MS * (1 << (failedAttempt - 1));
var jitterMs = Random.Shared.Next(0, CONTENT_DOWNLOAD_RETRY_JITTER_MS);
return TimeSpan.FromMilliseconds(exponentialDelayMs + jitterMs);
}

/// <summary>
/// Determines whether a content download failure is likely transient and should be retried.
/// </summary>
private static bool IsTransientContentDownloadException(Exception exception)
{
if (exception is RequesterException requesterException)
{
return requesterException.Status is 408 or 429 || requesterException.Status >= 500 && requesterException.Status < 600;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add comments here on why we consider those requester status code errors transient?

}

if (exception is HttpRequestException || exception is TimeoutException)
{
return true;
}

if (exception is TaskCanceledException)
{
return true;
}

return exception.InnerException != null && IsTransientContentDownloadException(exception.InnerException);
}


public string[] GetContentSnapshots(bool local, string pid = "")
{
Expand Down
7 changes: 6 additions & 1 deletion client/Packages/com.beamable/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,12 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]
### Added
- Added in-window Content Manager progress for sync and revert operations.
- Added editor content sync download concurrency configuration.

### Fixed
- Improved content sync resilience for transient SSL/socket reset download failures.
- Deserialization issue with `properties` field in Score Items of Events
- Fixed an issue where the Unity Editor would not detect changes to Icon subObject (for Sprites in Multiple Mode) and thus not saving it correctly
<<<<<<< fix/contentNullFields
Expand Down Expand Up @@ -1569,4 +1574,4 @@ This is a broken package. It includes changes from the 1.1.0 release. Please do
- Added OnUserLoggingOut event available from API. The event fires before a user switches account.
- Doc Url to package.json.
- Event phase validation. Events can no longer have zero phases. This may lead to disappearing Event Phases if your Beamable version is mismatched.
- Switched MatchmakingService API to point to our new backend matchmaking service.
- Switched MatchmakingService API to point to our new backend matchmaking service.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ public partial class ContentSyncArgs : Beamable.Common.BeamCli.IBeamCommandArgs
public bool syncDeleted;
/// <summary>If you pass in a Manifest's UID, we'll sync with that as the target. If filters are provided, will only do this for content that matches the filter</summary>
public string target;
/// <summary>Maximum number of content files to download in parallel while syncing. Use 0 for unbounded parallelism.</summary>
public int downloadMaxParallelCount;
/// <summary>Serializes the arguments for command line usage.</summary>
public virtual string Serialize()
{
Expand Down Expand Up @@ -75,6 +77,11 @@ public virtual string Serialize()
{
genBeamCommandArgs.Add(("--target=" + this.target));
}
// If the downloadMaxParallelCount value was not default, then add it to the list of args.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was this autogenerated?

if ((this.downloadMaxParallelCount != default(int)))
{
genBeamCommandArgs.Add(("--download-max-parallel-count=" + this.downloadMaxParallelCount));
}
string genBeamCommandStr = "";
// Join all the args with spaces
genBeamCommandStr = string.Join(" ", genBeamCommandArgs);
Expand Down
Loading
Loading