Skip to content

Commit 19491d1

Browse files
Update parallel workflow using async gem (#11)
* Add Async for Parallel workflow * add specs for parallel workflow and error aggregator * update rubocop * move require line to lib/mars --------- Co-authored-by: Andres Garcia <[email protected]>
1 parent 2dcf9b5 commit 19491d1

File tree

10 files changed

+155
-41
lines changed

10 files changed

+155
-41
lines changed

.rubocop.yml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,14 @@ AllCops:
99
Lint/MissingSuper:
1010
Enabled: false
1111

12+
Metrics/MethodLength:
13+
Enabled: true
14+
Exclude:
15+
- "lib/mars/workflows/parallel.rb"
16+
17+
RSpec/ExampleLength:
18+
Enabled: false
19+
1220
Style/Documentation:
1321
Enabled: false
1422

examples/parallel_workflow/diagram.md

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,15 @@
22
flowchart LR
33
in((In))
44
out((Out))
5-
parallel_workflow_aggregator[Parallel workflow Aggregator]
5+
aggregator[Aggregator]
66
llm_1[LLM 1]
77
llm_2[LLM 2]
88
llm_3[LLM 3]
99
in --> llm_1
1010
in --> llm_2
1111
in --> llm_3
12-
llm_1 --> parallel_workflow_aggregator
13-
parallel_workflow_aggregator --> out
14-
llm_2 --> parallel_workflow_aggregator
15-
llm_3 --> parallel_workflow_aggregator
12+
llm_1 --> aggregator
13+
aggregator --> out
14+
llm_2 --> aggregator
15+
llm_3 --> aggregator
1616
```

examples/parallel_workflow/examples.md

Lines changed: 0 additions & 26 deletions
This file was deleted.

examples/parallel_workflow/generator.rb

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,13 @@
1010

1111
llm3 = Mars::Agent.new(name: "LLM 3")
1212

13+
aggregator = Mars::Aggregator.new("Aggregator", operation: lambda(&:sum))
14+
1315
# Create the parallel workflow (LLM 1, LLM 2, LLM 3)
1416
parallel_workflow = Mars::Workflows::Parallel.new(
1517
"Parallel workflow",
16-
steps: [llm1, llm2, llm3]
18+
steps: [llm1, llm2, llm3],
19+
aggregator: aggregator
1720
)
1821

1922
# Generate and save the diagram

lib/mars.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
# frozen_string_literal: true
22

33
require "zeitwerk"
4+
require "async"
45

56
loader = Zeitwerk::Loader.for_gem
67
loader.setup

lib/mars/aggregator.rb

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,15 @@
22

33
module Mars
44
class Aggregator < Runnable
5-
attr_reader :name
5+
attr_reader :name, :operation
66

7-
def initialize(name = "Aggregator")
7+
def initialize(name = "Aggregator", operation: nil)
88
@name = name
9+
@operation = operation || ->(inputs) { inputs.join("\n") }
910
end
1011

1112
def run(inputs)
12-
return yield if block_given?
13-
14-
inputs.join("\n")
13+
operation.call(inputs)
1514
end
1615
end
1716
end
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
# frozen_string_literal: true
2+
3+
module Mars
4+
module Workflows
5+
class AggregateError < StandardError
6+
attr_reader :errors
7+
8+
def initialize(errors)
9+
@errors = errors
10+
super(errors.map { |error| "#{error[:step_name]}: #{error[:error].message}" }.join("\n"))
11+
end
12+
end
13+
end
14+
end

lib/mars/workflows/parallel.rb

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,22 @@ def initialize(name, steps:, aggregator: nil)
1212
end
1313

1414
def run(input)
15-
inputs = @steps.map do |step|
16-
step.run(input)
17-
end
15+
errors = []
16+
results = Async do |workflow|
17+
tasks = @steps.map do |step|
18+
workflow.async do
19+
step.run(input)
20+
rescue StandardError => e
21+
errors << { error: e, step_name: step.name }
22+
end
23+
end
1824

19-
aggregator.run(inputs)
25+
tasks.map(&:wait)
26+
end.result
27+
28+
raise AggregateError, errors if errors.any?
29+
30+
aggregator.run(results)
2031
end
2132

2233
private

mars.gemspec

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ Gem::Specification.new do |spec|
3535
spec.require_paths = ["lib"]
3636

3737
# Uncomment to register a new dependency of your gem
38+
spec.add_dependency "async", "~> 2.34"
3839
spec.add_dependency "ruby_llm", "~> 1.0"
3940
spec.add_dependency "zeitwerk", "~> 2.7"
4041

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
# frozen_string_literal: true
2+
3+
RSpec.describe Mars::Workflows::Parallel do
4+
let(:add_step_class) do
5+
Class.new do
6+
def initialize(value)
7+
@value = value
8+
end
9+
10+
def run(input)
11+
sleep 0.1
12+
puts "add step: #{input}"
13+
input + @value
14+
end
15+
end
16+
end
17+
18+
let(:multiply_step_class) do
19+
Class.new do
20+
def initialize(multiplier)
21+
@multiplier = multiplier
22+
end
23+
24+
def run(input)
25+
puts "multiply step: #{input}"
26+
input * @multiplier
27+
end
28+
end
29+
end
30+
31+
let(:error_step_class) do
32+
Class.new do
33+
attr_reader :name
34+
35+
def initialize(message, name)
36+
@message = message
37+
@name = name
38+
end
39+
40+
def run(_input)
41+
puts "error step: #{@name}"
42+
raise StandardError, @message
43+
end
44+
end
45+
end
46+
47+
describe "#run" do
48+
it "executes steps in parallel" do
49+
add_five = add_step_class.new(5)
50+
multiply_three = multiply_step_class.new(3)
51+
add_two = add_step_class.new(2)
52+
53+
workflow = described_class.new("math_workflow", steps: [add_five, multiply_three, add_two])
54+
55+
# 10 + 5 = 15, 10 * 3 = 30, 10 + 2 = 12
56+
expect(workflow.run(10)).to eq("15\n30\n12")
57+
end
58+
59+
it "executes steps in parallel with a custom aggregator" do
60+
add_five = add_step_class.new(5)
61+
multiply_three = multiply_step_class.new(3)
62+
add_two = add_step_class.new(2)
63+
aggregator = Mars::Aggregator.new("Custom Aggregator", operation: lambda(&:sum))
64+
workflow = described_class.new("math_workflow", steps: [add_five, multiply_three, add_two],
65+
aggregator: aggregator)
66+
67+
expect(workflow.run(10)).to eq(57)
68+
end
69+
70+
it "handles single step" do
71+
multiply_step = multiply_step_class.new(7)
72+
workflow = described_class.new("single_step", steps: [multiply_step])
73+
74+
expect(workflow.run(6)).to eq("42")
75+
end
76+
77+
it "returns input unchanged when no steps" do
78+
workflow = described_class.new("empty", steps: [])
79+
80+
expect(workflow.run(42)).to eq("")
81+
end
82+
83+
it "propagates errors from steps" do
84+
add_step = add_step_class.new(5)
85+
error_step = error_step_class.new("Step failed", "error_step_one")
86+
error_step_two = error_step_class.new("Step failed two", "error_step_two")
87+
88+
workflow = described_class.new("error_workflow", steps: [add_step, error_step, error_step_two])
89+
90+
expect { workflow.run(10) }.to raise_error(
91+
Mars::Workflows::AggregateError,
92+
"error_step_one: Step failed\nerror_step_two: Step failed two"
93+
)
94+
end
95+
end
96+
97+
describe "inheritance" do
98+
it "inherits from Mars::Runnable" do
99+
workflow = described_class.new("test", steps: [])
100+
expect(workflow).to be_a(Mars::Runnable)
101+
end
102+
end
103+
end

0 commit comments

Comments
 (0)