|
20 | 20 | using Synapse.Events.Tasks;
|
21 | 21 | using Synapse.Events.Workflows;
|
22 | 22 | using System.Net.Mime;
|
| 23 | +using System.Reactive.Disposables; |
| 24 | +using System.Reactive.Threading.Tasks; |
23 | 25 |
|
24 | 26 | namespace Synapse.Runner.Services;
|
25 | 27 |
|
@@ -304,11 +306,125 @@ public virtual async Task ResumeAsync(CancellationToken cancellationToken = defa
|
304 | 306 | this.Logger.LogInformation("The workflow's execution has been resumed.");
|
305 | 307 | }
|
306 | 308 |
|
| 309 | + /// <inheritdoc/> |
| 310 | + public virtual async Task<IObservable<IStreamedCloudEvent>> StreamAsync(ITaskExecutionContext task, CancellationToken cancellationToken = default) |
| 311 | + { |
| 312 | + ArgumentNullException.ThrowIfNull(task); |
| 313 | + if (task.Definition is not ListenTaskDefinition listenTask) throw new ArgumentException("The specified task's definition must be a 'listen' task", nameof(task)); |
| 314 | + if (listenTask.Foreach == null) throw new ArgumentException($"Since the specified listen task doesn't use streaming, the {nameof(CorrelateAsync)} method must be used instead"); |
| 315 | + if (this.Instance.Status?.Correlation?.Contexts?.TryGetValue(task.Instance.Reference.OriginalString, out var context) == true && context != null) return Observable.Empty<IStreamedCloudEvent>(); |
| 316 | + var @namespace = task.Workflow.Instance.GetNamespace()!; |
| 317 | + var name = $"{task.Workflow.Instance.GetName()}.{task.Instance.Id}"; |
| 318 | + Correlation? correlation = null; |
| 319 | + try { correlation = await this.Api.Correlations.GetAsync(name, @namespace, cancellationToken).ConfigureAwait(false); } |
| 320 | + catch { } |
| 321 | + if (correlation == null) |
| 322 | + { |
| 323 | + correlation = await this.Api.Correlations.CreateAsync(new() |
| 324 | + { |
| 325 | + Metadata = new() |
| 326 | + { |
| 327 | + Namespace = @namespace, |
| 328 | + Name = name, |
| 329 | + Labels = new Dictionary<string, string>() |
| 330 | + { |
| 331 | + { SynapseDefaults.Resources.Labels.WorkflowInstance, this.Instance.GetQualifiedName() } |
| 332 | + } |
| 333 | + }, |
| 334 | + Spec = new() |
| 335 | + { |
| 336 | + Source = new ResourceReference<WorkflowInstance>(task.Workflow.Instance.GetName(), task.Workflow.Instance.GetNamespace()), |
| 337 | + Lifetime = CorrelationLifetime.Ephemeral, |
| 338 | + Events = listenTask.Listen.To, |
| 339 | + Stream = true, |
| 340 | + Expressions = task.Workflow.Definition.Evaluate ?? new(), |
| 341 | + Outcome = new() |
| 342 | + { |
| 343 | + Correlate = new() |
| 344 | + { |
| 345 | + Instance = task.Workflow.Instance.GetQualifiedName(), |
| 346 | + Task = task.Instance.Reference.OriginalString |
| 347 | + } |
| 348 | + } |
| 349 | + } |
| 350 | + }, cancellationToken).ConfigureAwait(false); |
| 351 | + } |
| 352 | + var taskCompletionSource = new TaskCompletionSource<CorrelationContext>(); |
| 353 | + var cancellationTokenRegistration = cancellationToken.Register(() => taskCompletionSource.TrySetCanceled()); |
| 354 | + var correlationSubscription = this.Api.WorkflowInstances.MonitorAsync(this.Instance.GetName(), this.Instance.GetNamespace()!, cancellationToken) |
| 355 | + .ToObservable() |
| 356 | + .Where(e => e.Type == ResourceWatchEventType.Updated) |
| 357 | + .Select(e => e.Resource.Status?.Correlation?.Contexts) |
| 358 | + .Scan((Previous: (EquatableDictionary<string, CorrelationContext>?)null, Current: (EquatableDictionary<string, CorrelationContext>?)null), (accumulator, current) => (accumulator.Current ?? [], current)) |
| 359 | + .Where(v => v.Current?.Count > v.Previous?.Count) //ensures we are not handling changes in a circular loop: if length of current is smaller than previous, it means a context has been processed |
| 360 | + .Subscribe(value => |
| 361 | + { |
| 362 | + var patch = JsonPatchUtility.CreateJsonPatchFromDiff(value.Previous, value.Current); |
| 363 | + var patchOperation = patch.Operations.FirstOrDefault(o => o.Op == OperationType.Add && o.Path[0] == task.Instance.Reference.OriginalString); |
| 364 | + if (patchOperation == null) return; |
| 365 | + context = this.JsonSerializer.Deserialize<CorrelationContext>(patchOperation.Value!)!; |
| 366 | + taskCompletionSource.SetResult(context); |
| 367 | + }); |
| 368 | + var endOfStream = false; |
| 369 | + var stopObservable = taskCompletionSource.Task.ToObservable(); |
| 370 | + var stopSubscription = stopObservable.Take(1).Subscribe(_ => endOfStream = true); |
| 371 | + return Observable.Create<StreamedCloudEvent>(observer => |
| 372 | + { |
| 373 | + var subscription = Observable.Using( |
| 374 | + () => new CompositeDisposable |
| 375 | + { |
| 376 | + cancellationTokenRegistration, |
| 377 | + correlationSubscription |
| 378 | + }, |
| 379 | + disposable => this.Api.Correlations.MonitorAsync(correlation.GetName(), correlation.GetNamespace()!, cancellationToken) |
| 380 | + .ToObservable() |
| 381 | + .Where(e => e.Type == ResourceWatchEventType.Updated) |
| 382 | + .Select(e => e.Resource.Status?.Contexts?.FirstOrDefault()) |
| 383 | + .Where(c => c != null) |
| 384 | + .SelectMany(c => |
| 385 | + { |
| 386 | + var acknowledgedOffset = c!.Offset.HasValue ? (int)c.Offset.Value : 0; |
| 387 | + return c.Events.Values |
| 388 | + .Skip(acknowledgedOffset) |
| 389 | + .Select((evt, index) => new |
| 390 | + { |
| 391 | + ContextId = c.Id, |
| 392 | + Event = evt, |
| 393 | + Offset = (uint)(acknowledgedOffset + index + 1) |
| 394 | + }); |
| 395 | + }) |
| 396 | + .Distinct(e => e.Offset) |
| 397 | + .Select(e => new StreamedCloudEvent(e.Event, e.Offset, async (offset, token) => |
| 398 | + { |
| 399 | + var original = await this.Api.Correlations.GetAsync(name, @namespace, token).ConfigureAwait(false); |
| 400 | + var updated = original.Clone()!; |
| 401 | + var context = updated.Status?.Contexts.FirstOrDefault(c => c.Id == e.ContextId); |
| 402 | + if (context == null) |
| 403 | + { |
| 404 | + this.Logger.LogError("Failed to find a context with the specified id '{contextId}' in correlation '{name}.{@namespace}'", e.ContextId, name, @namespace); |
| 405 | + throw new Exception($"Failed to find a context with the specified id '{e.ContextId}' in correlation '{name}.{@namespace}'"); |
| 406 | + } |
| 407 | + context.Offset = offset; |
| 408 | + var patch = JsonPatchUtility.CreateJsonPatchFromDiff(original, updated); |
| 409 | + await this.Api.Correlations.PatchStatusAsync(name, @namespace, new Patch(PatchType.JsonPatch, patch), cancellationToken: token).ConfigureAwait(false); |
| 410 | + }))) |
| 411 | + .Subscribe(e => |
| 412 | + { |
| 413 | + observer.OnNext(e); |
| 414 | + if (endOfStream) observer.OnCompleted(); |
| 415 | + }, |
| 416 | + ex => observer.OnError(ex), |
| 417 | + () => observer.OnCompleted()); |
| 418 | + return new CompositeDisposable(subscription, stopSubscription); |
| 419 | + }); |
| 420 | + } |
| 421 | + |
307 | 422 | /// <inheritdoc/>
|
308 | 423 | public virtual async Task<CorrelationContext> CorrelateAsync(ITaskExecutionContext task, CancellationToken cancellationToken = default)
|
309 | 424 | {
|
310 | 425 | ArgumentNullException.ThrowIfNull(task);
|
311 | 426 | if (task.Definition is not ListenTaskDefinition listenTask) throw new ArgumentException("The specified task's definition must be a 'listen' task", nameof(task));
|
| 427 | + if (listenTask.Foreach == null) throw new ArgumentException($"Since the specified listen task uses streaming, the {nameof(StreamAsync)} method must be used instead"); |
312 | 428 | if (this.Instance.Status?.Correlation?.Contexts?.TryGetValue(task.Instance.Reference.OriginalString, out var context) == true && context != null) return context;
|
313 | 429 | var @namespace = task.Workflow.Instance.GetNamespace()!;
|
314 | 430 | var name = $"{task.Workflow.Instance.GetName()}.{task.Instance.Id}";
|
|
0 commit comments