Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions lib/gen/temporal/api/command/v1/message_pb.rb

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions lib/gen/temporal/api/common/v1/message_pb.rb

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions lib/temporal/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ def initialize(config)
# options[:signal_input] is specified.
# @option options [String, Array, nil] :signal_input corresponds to the 'input' argument to signal_workflow
# @option options [Hash] :retry_policy check Temporal::RetryPolicy for available options
# @option options [Hash] :priority check Temporal::Priority for available options
# @option options [Hash] :timeouts check Temporal::Configuration::DEFAULT_TIMEOUTS
# @option options [Hash] :headers
# @option options [Hash] :search_attributes
Expand Down Expand Up @@ -68,6 +69,7 @@ def start_workflow(workflow, *input, options: {}, **args)
headers: config.header_propagator_chain.inject(execution_options.headers),
memo: execution_options.memo,
search_attributes: Workflow::Context::Helpers.process_search_attributes(execution_options.search_attributes),
priority: execution_options.priority,
start_delay: execution_options.start_delay
)
else
Expand All @@ -86,6 +88,7 @@ def start_workflow(workflow, *input, options: {}, **args)
headers: config.header_propagator_chain.inject(execution_options.headers),
memo: execution_options.memo,
search_attributes: Workflow::Context::Helpers.process_search_attributes(execution_options.search_attributes),
priority: execution_options.priority,
signal_name: signal_name,
signal_input: signal_input,
start_delay: execution_options.start_delay
Expand All @@ -109,6 +112,7 @@ def start_workflow(workflow, *input, options: {}, **args)
# @option options [String] :namespace
# @option options [String] :task_queue
# @option options [Hash] :retry_policy check Temporal::RetryPolicy for available options
# @option options [Hash] :priority check Temporal::Priority for available options
# @option options [Hash] :timeouts check Temporal::Configuration::DEFAULT_TIMEOUTS
# @option options [Hash] :headers
# @option options [Hash] :search_attributes
Expand Down Expand Up @@ -137,6 +141,7 @@ def schedule_workflow(workflow, cron_schedule, *input, options: {}, **args)
cron_schedule: cron_schedule,
memo: execution_options.memo,
search_attributes: Workflow::Context::Helpers.process_search_attributes(execution_options.search_attributes),
priority: execution_options.priority,
)

response.run_id
Expand Down
5 changes: 5 additions & 0 deletions lib/temporal/concerns/executable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ def headers(*args)
return @headers if args.empty?
@headers = args.first
end

def priority(*args)
return @priority if args.empty?
@priority = args.first
end
end
end
end
9 changes: 7 additions & 2 deletions lib/temporal/connection/grpc.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
require 'temporal/connection/serializer/failure'
require 'temporal/connection/serializer/backfill'
require 'temporal/connection/serializer/schedule'
require 'temporal/connection/serializer/priority'
require 'temporal/connection/serializer/workflow_id_reuse_policy'

module Temporal
Expand Down Expand Up @@ -117,6 +118,7 @@ def start_workflow_execution(
task_timeout:,
input: nil,
workflow_id_reuse_policy: nil,
priority: nil,
headers: nil,
cron_schedule: nil,
memo: nil,
Expand Down Expand Up @@ -149,7 +151,8 @@ def start_workflow_execution(
),
search_attributes: Temporalio::Api::Common::V1::SearchAttributes.new(
indexed_fields: converter.to_payload_map_without_codec(search_attributes || {})
)
),
priority: priority ? Temporal::Connection::Serializer::Priority.new(priority, converter).to_proto : nil
)

client.start_workflow_execution(request)
Expand Down Expand Up @@ -378,6 +381,7 @@ def signal_with_start_workflow_execution(
task_queue:,
execution_timeout:, run_timeout:, task_timeout:, signal_name:, signal_input:, input: nil,
workflow_id_reuse_policy: nil,
priority: nil,
headers: nil,
cron_schedule: nil,
memo: nil,
Expand Down Expand Up @@ -422,7 +426,8 @@ def signal_with_start_workflow_execution(
),
search_attributes: Temporalio::Api::Common::V1::SearchAttributes.new(
indexed_fields: converter.to_payload_map_without_codec(search_attributes || {})
)
),
priority: priority ? Temporal::Connection::Serializer::Priority.new(priority, converter).to_proto : nil
)

client.signal_with_start_workflow_execution(request)
Expand Down
20 changes: 20 additions & 0 deletions lib/temporal/connection/serializer/priority.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
require 'temporal/connection/serializer/base'

module Temporal
module Connection
module Serializer
class Priority < Base
def to_proto
return unless object

# Create a Priority proto object
Temporalio::Api::Common::V1::Priority.new(
priority_key: object.priority_key || 0,
fairness_key: object.fairness_key || '',
fairness_weight: object.fairness_weight || 0.0
)
end
end
end
end
end
2 changes: 2 additions & 0 deletions lib/temporal/connection/serializer/start_child_workflow.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
require 'temporal/connection/serializer/base'
require 'temporal/connection/serializer/retry_policy'
require 'temporal/connection/serializer/priority'
require 'temporal/connection/serializer/workflow_id_reuse_policy'

module Temporal
Expand All @@ -26,6 +27,7 @@ def to_proto
workflow_run_timeout: object.timeouts[:run],
workflow_task_timeout: object.timeouts[:task],
retry_policy: Temporal::Connection::Serializer::RetryPolicy.new(object.retry_policy, converter).to_proto,
priority: Temporal::Connection::Serializer::Priority.new(object.priority, converter).to_proto,
parent_close_policy: serialize_parent_close_policy(object.parent_close_policy),
header: serialize_headers(object.headers),
cron_schedule: object.cron_schedule,
Expand Down
12 changes: 11 additions & 1 deletion lib/temporal/execution_options.rb
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
require 'temporal/concerns/executable'
require 'temporal/retry_policy'
require 'temporal/priority'

module Temporal
class ExecutionOptions
attr_reader :name, :namespace, :task_queue, :retry_policy, :timeouts, :headers, :memo, :search_attributes,
attr_reader :name, :namespace, :task_queue, :retry_policy, :priority, :timeouts, :headers, :memo, :search_attributes,
:start_delay

def initialize(object, options, defaults = nil)
Expand All @@ -12,6 +13,7 @@ def initialize(object, options, defaults = nil)
@namespace = options[:namespace]
@task_queue = options[:task_queue] || options[:task_list]
@retry_policy = options[:retry_policy] || {}
@priority = options[:priority] || {}
@timeouts = options[:timeouts] || {}
@headers = options[:headers] || {}
@memo = options[:memo] || {}
Expand All @@ -23,6 +25,7 @@ def initialize(object, options, defaults = nil)
@namespace ||= object.namespace
@task_queue ||= object.task_queue
@retry_policy = object.retry_policy.merge(@retry_policy) if object.retry_policy
@priority = object.priority.merge(@priority) if object.priority
@timeouts = object.timeouts.merge(@timeouts) if object.timeouts
@headers = object.headers.merge(@headers) if object.headers
end
Expand All @@ -43,6 +46,13 @@ def initialize(object, options, defaults = nil)
@retry_policy.validate!
end

if @priority.empty?
@priority = nil
else
@priority = Temporal::Priority.new(@priority)
@priority.validate!
end

freeze
end

Expand Down
35 changes: 35 additions & 0 deletions lib/temporal/priority.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
require 'temporal/errors'

module Temporal
# Priority configuration for workflow execution
# Similar to retry_policy, priority is represented as a JSON object with predefined keys
class Priority < Struct.new(:priority_key, :fairness_key, :fairness_weight, keyword_init: true)

class InvalidPriority < ClientError; end

def validate!
# PriorityKey should be a number if provided
if priority_key && !priority_key.is_a?(Numeric)
raise InvalidPriority, 'PriorityKey must be a number'
end

# FairnessKey should be a string if provided
if fairness_key && !fairness_key.is_a?(String)
raise InvalidPriority, 'FairnessKey must be a string'
end

# FairnessWeight should be a number if provided
if fairness_weight && !fairness_weight.is_a?(Numeric)
raise InvalidPriority, 'FairnessWeight must be a number'
end
end

def to_hash
hash = {}
hash['PriorityKey'] = priority_key if priority_key
hash['FairnessKey'] = fairness_key if fairness_key
hash['FairnessWeight'] = fairness_weight if fairness_weight
hash
end
end
end
2 changes: 1 addition & 1 deletion lib/temporal/workflow/command.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ class Workflow
module Command
# TODO: Move these classes into their own directories under workflow/command/*
ScheduleActivity = Struct.new(:activity_type, :activity_id, :input, :task_queue, :retry_policy, :timeouts, :headers, keyword_init: true)
StartChildWorkflow = Struct.new(:workflow_type, :workflow_id, :input, :namespace, :task_queue, :retry_policy, :parent_close_policy, :timeouts, :headers, :cron_schedule, :memo, :workflow_id_reuse_policy, :search_attributes, keyword_init: true)
StartChildWorkflow = Struct.new(:workflow_type, :workflow_id, :input, :namespace, :task_queue, :retry_policy, :priority, :parent_close_policy, :timeouts, :headers, :cron_schedule, :memo, :workflow_id_reuse_policy, :search_attributes, keyword_init: true)
ContinueAsNew = Struct.new(:workflow_type, :task_queue, :input, :timeouts, :retry_policy, :headers, :memo, :search_attributes, keyword_init: true)
RequestActivityCancellation = Struct.new(:activity_id, keyword_init: true)
RecordMarker = Struct.new(:name, :details, keyword_init: true)
Expand Down
1 change: 1 addition & 0 deletions lib/temporal/workflow/context.rb
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ def execute_workflow(workflow_class, *input, **args)
namespace: execution_options.namespace,
task_queue: execution_options.task_queue,
retry_policy: execution_options.retry_policy,
priority: execution_options.priority,
parent_close_policy: parent_close_policy,
timeouts: execution_options.timeouts,
headers: config.header_propagator_chain.inject(execution_options.headers),
Expand Down
16 changes: 16 additions & 0 deletions spec/shared_examples/an_executable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -62,4 +62,20 @@
expect(described_class.instance_variable_get(:@timeouts)).to eq(:test)
end
end

describe '.priority' do
after { described_class.remove_instance_variable(:@priority) }

it 'gets current priority' do
described_class.instance_variable_set(:@priority, :test)

expect(described_class.priority).to eq(:test)
end

it 'sets new priority' do
described_class.priority(:test)

expect(described_class.instance_variable_get(:@priority)).to eq(:test)
end
end
end
32 changes: 32 additions & 0 deletions spec/unit/lib/temporal/client_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ def inject!(header)
headers: { 'test' => 'asdf' },
memo: {},
search_attributes: {},
priority: nil,
start_delay: 0
)
end
Expand Down Expand Up @@ -95,6 +96,7 @@ def inject!(header)
headers: {},
memo: {},
search_attributes: {},
priority: nil,
start_delay: 0
)
end
Expand Down Expand Up @@ -234,6 +236,35 @@ def inject!(header)
search_attributes: {},
start_delay: 0
)
end

it 'starts a workflow with priority options' do
subject.start_workflow(
TestStartWorkflow,
42,
options: {
priority: { priority_key: 10, fairness_key: 'production', fairness_weight: 0.8 }
}
)

expect(connection)
.to have_received(:start_workflow_execution)
.with(
namespace: 'default-test-namespace',
workflow_id: an_instance_of(String),
workflow_name: 'TestStartWorkflow',
task_queue: 'default-test-task-queue',
input: [42],
task_timeout: config.timeouts[:task],
run_timeout: config.timeouts[:run],
execution_timeout: config.timeouts[:execution],
workflow_id_reuse_policy: nil,
headers: {},
memo: {},
search_attributes: {},
priority: an_instance_of(Temporal::Priority),
start_delay: 0
)
end
end
end
Expand Down Expand Up @@ -263,6 +294,7 @@ def expect_signal_with_start(expected_arguments, expected_signal_argument)
search_attributes: {},
signal_name: 'the question',
signal_input: expected_signal_argument,
priority: nil,
start_delay: 0
)
end
Expand Down
Loading