Skip to content

Commit ba59245

Browse files
authored
Merge pull request #479 from serverlessworkflow/feat-event-read-mode
Implement event read mode
2 parents 48b4968 + b232248 commit ba59245

File tree

1 file changed

+13
-2
lines changed

1 file changed

+13
-2
lines changed

src/runner/Synapse.Runner/Services/Executors/ListenTaskExecutor.cs

+13-2
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,13 @@ protected override async Task DoExecuteAsync(CancellationToken cancellationToken
6262
if (this.Task.Definition.Foreach == null)
6363
{
6464
var context = await this.Task.CorrelateAsync(cancellationToken).ConfigureAwait(false);
65-
await this.SetResultAsync(context, this.Task.Definition.Then, cancellationToken).ConfigureAwait(false);
65+
var events = this.Task.Definition.Listen.Read switch
66+
{
67+
EventReadMode.Data or EventReadMode.Raw => context.Events.Select(e => e.Value.Data),
68+
EventReadMode.Envelope => context.Events.Select(e => e.Value.Data),
69+
_ => throw new NotSupportedException($"The specified event read mode '{this.Task.Definition.Listen.Read}' is not supported")
70+
};
71+
await this.SetResultAsync(events, this.Task.Definition.Then, cancellationToken).ConfigureAwait(false);
6672
}
6773
else
6874
{
@@ -107,7 +113,12 @@ protected virtual async Task OnStreamingEventAsync(IStreamedCloudEvent e)
107113
]
108114
};
109115
var arguments = this.GetExpressionEvaluationArguments();
110-
var eventData = e.Event as object;
116+
var eventData = this.Task.Definition.Listen.Read switch
117+
{
118+
EventReadMode.Data or EventReadMode.Raw => e.Event.Data,
119+
EventReadMode.Envelope => e.Event,
120+
_ => throw new NotSupportedException($"The specified event read mode '{this.Task.Definition.Listen.Read}' is not supported")
121+
};
111122
if (this.Task.Definition.Foreach.Output?.As is string fromExpression) eventData = await this.Task.Workflow.Expressions.EvaluateAsync<object>(fromExpression, eventData ?? new(), arguments, this.CancellationTokenSource!.Token).ConfigureAwait(false);
112123
else if (this.Task.Definition.Foreach.Output?.As != null) eventData = await this.Task.Workflow.Expressions.EvaluateAsync<object>(this.Task.Definition.Foreach.Output.As!, eventData ?? new(), arguments, this.CancellationTokenSource!.Token).ConfigureAwait(false);
113124
if (this.Task.Definition.Foreach.Export?.As is string toExpression)

0 commit comments

Comments
 (0)