Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

using k8s;
using k8s.Models;
using Neuroglia.Serialization.Yaml;

Expand Down Expand Up @@ -108,7 +109,7 @@ public static V1PodTemplateSpec LoadPodTemplate()
var templateFilePath = Environment.GetEnvironmentVariable(SynapseDefaults.EnvironmentVariables.Runtime.Kubernetes.Pod);
if (string.IsNullOrWhiteSpace(templateFilePath) || !File.Exists(templateFilePath)) return DefaultPodTemplate;
var yaml = File.ReadAllText(templateFilePath);
return YamlSerializer.Default.Deserialize<V1PodTemplateSpec>(yaml)!;
return KubernetesYaml.Deserialize<V1PodTemplateSpec>(yaml);
}

}
13 changes: 6 additions & 7 deletions src/operator/Synapse.Operator/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,6 @@
.ConfigureServices((context, services) =>
{
services.Configure<OperatorOptions>(context.Configuration);
services.AddSingleton(provider =>
{
var options = provider.GetRequiredService<IOptionsMonitor<OperatorOptions>>().CurrentValue;
return Options.Create(options.Runner);
});
services.AddLogging(builder =>
{
builder.AddSimpleConsole(options =>
Expand Down Expand Up @@ -67,9 +62,13 @@
services.AddScoped<WorkflowInstanceController>();
services.AddScoped<IResourceController<WorkflowInstance>>(provider => provider.GetRequiredService<WorkflowInstanceController>());

services.AddHostedService<OperatorApplication>();
services.AddSingleton<OperatorApplication>();
services.AddHostedService(provider => provider.GetRequiredService<OperatorApplication>());
services.AddSingleton<RunnerConfigurationMonitor>();
services.AddHostedService(provider => provider.GetRequiredService<RunnerConfigurationMonitor>());
services.AddSingleton<IOptionsMonitor<RunnerConfiguration>>(provider => provider.GetRequiredService<RunnerConfigurationMonitor>());
});

using var app = builder.Build();

await app.RunAsync();
await app.RunAsync();
11 changes: 6 additions & 5 deletions src/operator/Synapse.Operator/Services/OperatorApplication.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,17 @@ internal class OperatorApplication(IServiceProvider serviceProvider)
readonly IServiceScope _scope = serviceProvider.CreateScope();
IServiceProvider ServiceProvider => this._scope.ServiceProvider;

OperatorController _operatorController = null!;
WorkflowController _workflowController = null!;
WorkflowInstanceController _workflowInstanceController = null!;

internal OperatorController OperatorController { get; private set; } = null!;

public async Task StartAsync(CancellationToken cancellationToken)
{
this._operatorController = this.ServiceProvider.GetRequiredService<OperatorController>();
this.OperatorController = this.ServiceProvider.GetRequiredService<OperatorController>();
this._workflowController = this.ServiceProvider.GetRequiredService<WorkflowController>();
this._workflowInstanceController = this.ServiceProvider.GetRequiredService<WorkflowInstanceController>();
await this._operatorController.StartAsync(cancellationToken).ConfigureAwait(false);
await this.OperatorController.StartAsync(cancellationToken).ConfigureAwait(false);
await Task.WhenAll(
[
this._workflowController.StartAsync(cancellationToken),
Expand All @@ -41,12 +42,12 @@ public async Task StopAsync(CancellationToken cancellationToken)
{
await Task.WhenAll(
[
this._operatorController.StopAsync(cancellationToken),
this.OperatorController.StopAsync(cancellationToken),
this._workflowController.StopAsync(cancellationToken),
this._workflowInstanceController.StopAsync(cancellationToken)
]).ConfigureAwait(false);
}

void IDisposable.Dispose() => this._scope.Dispose();

}
}
11 changes: 1 addition & 10 deletions src/operator/Synapse.Operator/Services/OperatorController.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@ namespace Synapse.Operator.Services;
/// </summary>
/// <param name="repository">The service used to manage <see cref="IResource"/>s</param>
/// <param name="options">The current <see cref="OperatorOptions"/></param>
/// <param name="runnerOptions">The current <see cref="RunnerConfiguration"/></param>
public class OperatorController(IResourceRepository repository, IOptionsMonitor<OperatorOptions> options, IOptionsMonitor<RunnerConfiguration> runnerOptions)
public class OperatorController(IResourceRepository repository, IOptionsMonitor<OperatorOptions> options)
: IOperatorController
{

Expand All @@ -33,11 +32,6 @@ public class OperatorController(IResourceRepository repository, IOptionsMonitor<
/// </summary>
protected OperatorOptions Options => options.CurrentValue;

/// <summary>
/// Gets the current <see cref="RunnerConfiguration"/>
/// </summary>
protected RunnerConfiguration RunnerOptions => runnerOptions.CurrentValue;

/// <inheritdoc/>
public IResourceMonitor<Resources.Operator> Operator { get; protected set; } = null!;

Expand Down Expand Up @@ -90,9 +84,6 @@ protected virtual async Task SetOperatorStatusPhaseAsync(string phase, Cancellat
protected virtual void OnOperatorSpecChanged()
{
this.Options.Runner = this.Operator.Resource.Spec.Runner;
this.RunnerOptions.Api = this.Options.Runner.Api;
this.RunnerOptions.Runtime = this.Options.Runner.Runtime;
this.RunnerOptions.Certificates = this.Options.Runner.Certificates;
}

/// <inheritdoc/>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
// Copyright © 2024-Present The Synapse Authors
//
// Licensed under the Apache License, Version 2.0 (the "License"),
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

namespace Synapse.Operator.Services;

/// <summary>
/// Represents the service used to monitor the operator's <see cref="RunnerConfiguration"/>.
/// </summary>
/// <param name="application">The service used to monitor the current operator resource</param>
internal sealed class RunnerConfigurationMonitor(OperatorApplication application)
: IHostedService, IOptionsMonitor<RunnerConfiguration>
{

ConcurrentHashSet<RunnerConfigurationChangeTokenSource> _changeTokenSources = [];
IDisposable? _subscription;

Check warning on line 25 in src/operator/Synapse.Operator/Services/RunnerConfigurationMonitor.cs

View workflow job for this annotation

GitHub Actions / build (9.0.x)

Field 'RunnerConfigurationMonitor._subscription' is never assigned to, and will always have its default value null

Check warning on line 25 in src/operator/Synapse.Operator/Services/RunnerConfigurationMonitor.cs

View workflow job for this annotation

GitHub Actions / build (9.0.x)

Field 'RunnerConfigurationMonitor._subscription' is never assigned to, and will always have its default value null
IResourceMonitor<Resources.Operator> _operator = null!;

/// <inheritdoc/>
public RunnerConfiguration CurrentValue => _operator.Resource.Spec.Runner;

/// <inheritdoc/>
public Task StartAsync(CancellationToken cancellationToken)
{
_operator = application.OperatorController.Operator;
_operator.Where(e => e.Type == ResourceWatchEventType.Updated).Select(e => e.Resource.Spec.Runner).Distinct().Subscribe(OnConfigurationChanged);
return Task.CompletedTask;
}

/// <inheritdoc/>
public Task StopAsync(CancellationToken cancellationToken)
{
_subscription?.Dispose();
return Task.CompletedTask;
}

/// <inheritdoc/>
public RunnerConfiguration Get(string? name) => CurrentValue;

void OnConfigurationChanged(RunnerConfiguration configuration)
{
foreach (var changeTokenSource in _changeTokenSources) changeTokenSource.Invoke(configuration, null);
}

IDisposable? IOptionsMonitor<RunnerConfiguration>.OnChange(Action<RunnerConfiguration, string?> listener)
{
var changeTokenSource = new RunnerConfigurationChangeTokenSource(listener);
changeTokenSource.OnDisposed += OnChangeTokenSourceDisposed;
_changeTokenSources.Add(changeTokenSource);
return changeTokenSource;
}

void OnChangeTokenSourceDisposed(object? sender, EventArgs e)
{
if (sender is not RunnerConfigurationChangeTokenSource changeTokenSource) return;
changeTokenSource.OnDisposed -= OnChangeTokenSourceDisposed;
_changeTokenSources.Remove(changeTokenSource);
}

class RunnerConfigurationChangeTokenSource(Action<RunnerConfiguration, string?> listener)
: IDisposable
{

public event EventHandler? OnDisposed;

public void Invoke(RunnerConfiguration configuration, string? name) => listener(configuration, name);

public void Dispose()
{
OnDisposed?.Invoke(this, EventArgs.Empty);
GC.SuppressFinalize(this);
}

}

}
41 changes: 23 additions & 18 deletions src/runtime/Synapse.Runtime.Docker/Services/DockerRuntime.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ namespace Synapse.Runtime.Docker.Services;
/// <param name="serviceProvider">The current <see cref="IServiceProvider"/></param>
/// <param name="loggerFactory">The service used to create <see cref="ILogger"/>s</param>
/// <param name="environment">The current <see cref="IHostEnvironment"/></param>
/// <param name="runner">The service used to access the current <see cref="RunnerConfiguration"/></param>
public class DockerRuntime(IServiceProvider serviceProvider, ILoggerFactory loggerFactory, IHostEnvironment environment, IOptions<RunnerConfiguration> runner)
/// <param name="runnerConfigurationMonitor">The service used to access the current <see cref="Resources.RunnerConfiguration"/></param>
public class DockerRuntime(IServiceProvider serviceProvider, ILoggerFactory loggerFactory, IHostEnvironment environment, IOptionsMonitor<RunnerConfiguration> runnerConfigurationMonitor)
: WorkflowRuntimeBase(loggerFactory)
{

Expand All @@ -41,9 +41,14 @@ public class DockerRuntime(IServiceProvider serviceProvider, ILoggerFactory logg
protected IHostEnvironment Environment { get; } = environment;

/// <summary>
/// Gets the current <see cref="RunnerConfiguration"/>
/// Gets the service used to access the current <see cref="Resources.RunnerConfiguration"/>
/// </summary>
protected RunnerConfiguration Runner => runner.Value;
protected IOptionsMonitor<RunnerConfiguration> RunnerConfigurationMonitor { get; } = runnerConfigurationMonitor;

/// <summary>
/// Gets the current <see cref="Resources.RunnerConfiguration"/>
/// </summary>
protected RunnerConfiguration RunnerConfiguration => RunnerConfigurationMonitor.CurrentValue;

/// <summary>
/// Gets the service used to interact with the Docker API
Expand All @@ -62,9 +67,9 @@ public class DockerRuntime(IServiceProvider serviceProvider, ILoggerFactory logg
/// <returns>A new awaitable <see cref="Task"/></returns>
protected virtual Task InitializeAsync(CancellationToken cancellationToken = default)
{
if (this.Runner.Runtime.Docker == null) throw new NullReferenceException($"Failed to initialize the Docker Runtime because the operator is not configured to use Docker as a runtime");
var dockerConfiguration = new DockerClientConfiguration(this.Runner.Runtime.Docker.Api.Endpoint);
this.Docker = dockerConfiguration.CreateClient(string.IsNullOrWhiteSpace(this.Runner.Runtime.Docker.Api.Version) ? null : System.Version.Parse(this.Runner.Runtime.Docker.Api.Version!));
if (this.RunnerConfiguration.Runtime.Docker == null) throw new NullReferenceException($"Failed to initialize the Docker Runtime because the operator is not configured to use Docker as a runtime");
var dockerConfiguration = new DockerClientConfiguration(this.RunnerConfiguration.Runtime.Docker.Api.Endpoint);
this.Docker = dockerConfiguration.CreateClient(string.IsNullOrWhiteSpace(this.RunnerConfiguration.Runtime.Docker.Api.Version) ? null : System.Version.Parse(this.RunnerConfiguration.Runtime.Docker.Api.Version!));
return Task.CompletedTask;
}

Expand All @@ -78,7 +83,7 @@ public override async Task<IWorkflowProcess> CreateProcessAsync(Workflow workflo
{
this.Logger.LogDebug("Creating a new Docker container for workflow instance '{workflowInstance}'...", workflowInstance.GetQualifiedName());
if (this.Docker == null) await this.InitializeAsync(cancellationToken).ConfigureAwait(false);
var container = this.Runner.Runtime.Docker!.ContainerTemplate.Clone()!;
var container = this.RunnerConfiguration.Runtime.Docker!.ContainerTemplate.Clone()!;
try
{
await this.Docker!.Images.InspectImageAsync(container.Image, cancellationToken).ConfigureAwait(false);
Expand All @@ -93,24 +98,24 @@ public override async Task<IWorkflowProcess> CreateProcessAsync(Workflow workflo
}
container.SetEnvironmentVariable(SynapseDefaults.EnvironmentVariables.Runner.Namespace, workflowInstance.GetNamespace()!);
container.SetEnvironmentVariable(SynapseDefaults.EnvironmentVariables.Runner.Name, $"{workflowInstance.GetName()}-{Guid.NewGuid().ToString("N")[..12].ToLowerInvariant()}");
container.SetEnvironmentVariable(SynapseDefaults.EnvironmentVariables.Api.Uri, this.Runner.Api.Uri.OriginalString);
container.SetEnvironmentVariable(SynapseDefaults.EnvironmentVariables.Runner.ContainerPlatform, this.Runner.ContainerPlatform);
container.SetEnvironmentVariable(SynapseDefaults.EnvironmentVariables.Runner.LifecycleEvents, (this.Runner.PublishLifecycleEvents ?? true).ToString());
container.SetEnvironmentVariable(SynapseDefaults.EnvironmentVariables.Secrets.Directory, this.Runner.Runtime.Docker.Secrets.MountPath);
container.SetEnvironmentVariable(SynapseDefaults.EnvironmentVariables.Api.Uri, this.RunnerConfiguration.Api.Uri.OriginalString);
container.SetEnvironmentVariable(SynapseDefaults.EnvironmentVariables.Runner.ContainerPlatform, this.RunnerConfiguration.ContainerPlatform);
container.SetEnvironmentVariable(SynapseDefaults.EnvironmentVariables.Runner.LifecycleEvents, (this.RunnerConfiguration.PublishLifecycleEvents ?? true).ToString());
container.SetEnvironmentVariable(SynapseDefaults.EnvironmentVariables.Secrets.Directory, this.RunnerConfiguration.Runtime.Docker.Secrets.MountPath);
container.SetEnvironmentVariable(SynapseDefaults.EnvironmentVariables.ServiceAccount.Name, serviceAccount.GetQualifiedName());
container.SetEnvironmentVariable(SynapseDefaults.EnvironmentVariables.ServiceAccount.Key, serviceAccount.Spec.Key);
container.SetEnvironmentVariable(SynapseDefaults.EnvironmentVariables.Workflow.Instance, workflowInstance.GetQualifiedName());
container.SetEnvironmentVariable("DOCKER_HOST", "unix:///var/run/docker.sock");
container.User = "root";
if (this.Runner.Certificates?.Validate == false) container.SetEnvironmentVariable(SynapseDefaults.EnvironmentVariables.SkipCertificateValidation, "true");
var hostConfig = this.Runner.Runtime.Docker.HostConfig?.Clone()! ?? new();
if (!Directory.Exists(this.Runner.Runtime.Docker.Secrets.Directory)) Directory.CreateDirectory(this.Runner.Runtime.Docker.Secrets.Directory);
if (this.RunnerConfiguration.Certificates?.Validate == false) container.SetEnvironmentVariable(SynapseDefaults.EnvironmentVariables.SkipCertificateValidation, "true");
var hostConfig = this.RunnerConfiguration.Runtime.Docker.HostConfig?.Clone()! ?? new();
if (!Directory.Exists(this.RunnerConfiguration.Runtime.Docker.Secrets.Directory)) Directory.CreateDirectory(this.RunnerConfiguration.Runtime.Docker.Secrets.Directory);
hostConfig.Mounts ??= [];
hostConfig.Mounts.Insert(0, new()
{
Type = "bind",
Source = this.Runner.Runtime.Docker.Secrets.Directory,
Target = this.Runner.Runtime.Docker.Secrets.MountPath
Source = this.RunnerConfiguration.Runtime.Docker.Secrets.Directory,
Target = this.RunnerConfiguration.Runtime.Docker.Secrets.MountPath
});
hostConfig.Mounts.Insert(1, new()
{
Expand All @@ -128,7 +133,7 @@ public override async Task<IWorkflowProcess> CreateProcessAsync(Workflow workflo
HostConfig = hostConfig
};
var result = await this.Docker!.Containers.CreateContainerAsync(parameters, cancellationToken).ConfigureAwait(false);
if (this.Environment.RunsInDocker()) await this.Docker.Networks.ConnectNetworkAsync(this.Runner.Runtime.Docker.Network, new NetworkConnectParameters() { Container = result.ID }, cancellationToken);
if (this.Environment.RunsInDocker()) await this.Docker.Networks.ConnectNetworkAsync(this.RunnerConfiguration.Runtime.Docker.Network, new NetworkConnectParameters() { Container = result.ID }, cancellationToken);
if (result.Warnings.Count > 0) this.Logger.LogWarning("Warnings have been raised during container creation: {warnings}", string.Join(System.Environment.NewLine, result.Warnings));
var process = ActivatorUtilities.CreateInstance<DockerWorkflowProcess>(this.ServiceProvider, this.Docker!, result.ID);
this.Processes.TryAdd(process.Id, process);
Expand Down
Loading