Skip to content

Commit 03a5fdb

Browse files
committed
feat(sidekiq): scheduled jobs improvements - scheduled operation and add links
When a Job is pushed via `perform_in` or friends, it goes through the following life-cycle: 1. First the Job is pushed with `at` attribute in Redis from `Sidekiq::Client` - this will now create `<...> scheduled` span (before it was `<...> publish` 2. Then the Job is handled from `Sidekiq::Scheduled#enqueue` and pushed without `at` attribute in Redis from `Sidekiq::Client` - this will keep creating `<...> publish` span 3. Then the Job is handled by the Worker as usual Additionally when using `:propagation_style = :link` and `:trace_poller_enqueue = true` a handy set of links are added between the 3 actors above.
1 parent b1fe968 commit 03a5fdb

File tree

2 files changed

+17
-3
lines changed

2 files changed

+17
-3
lines changed

instrumentation/sidekiq/lib/opentelemetry/instrumentation/sidekiq/middlewares/client/tracer_middleware.rb

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,14 +24,25 @@ def call(_worker_class, job, _queue, _redis_pool)
2424
}
2525
attributes[SemanticConventions::Trace::PEER_SERVICE] = instrumentation_config[:peer_service] if instrumentation_config[:peer_service]
2626

27+
scheduled_at = job['at']
28+
op = scheduled_at.nil? ? "publish" : "scheduled"
29+
2730
span_name = case instrumentation_config[:span_naming]
28-
when :job_class then "#{job['wrapped']&.to_s || job['class']} publish"
29-
else "#{job['queue']} publish"
31+
when :job_class then "#{job['wrapped']&.to_s || job['class']} #{op}"
32+
else "#{job['queue']} #{op}"
3033
end
3134

32-
tracer.in_span(span_name, attributes: attributes, kind: :producer) do |span|
35+
# In case this is Scheduled job, there is already context injected, so link to that context
36+
# NOTE: :propagation_style = :child is not supported as it is quite tricky when :trace_poller_enqueue = true
37+
extracted_context = OpenTelemetry.propagation.extract(job, context: OpenTelemetry::Trace::SpanContext::INVALID)
38+
links = []
39+
span_context = OpenTelemetry::Trace.current_span(extracted_context).context
40+
links << OpenTelemetry::Trace::Link.new(span_context) if instrumentation_config[:propagation_style] == :link && span_context.valid?
41+
42+
tracer.in_span(span_name, attributes: attributes, links: links, kind: :producer) do |span|
3343
OpenTelemetry.propagation.inject(job)
3444
span.add_event('created_at', timestamp: time_from_timestamp(job['created_at']))
45+
span.add_event('scheduled_at', timestamp: time_from_timestamp(scheduled_at)) unless scheduled_at.nil?
3546
yield
3647
end
3748
end

instrumentation/sidekiq/lib/opentelemetry/instrumentation/sidekiq/middlewares/server/tracer_middleware.rb

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,11 +33,13 @@ def call(_worker, msg, _queue)
3333
extracted_context = OpenTelemetry.propagation.extract(msg)
3434
created_at = time_from_timestamp(msg['created_at'])
3535
enqueued_at = time_from_timestamp(msg['created_at'])
36+
scheduled_at = time_from_timestamp(msg['at']) unless msg['at'].nil?
3637
OpenTelemetry::Context.with_current(extracted_context) do
3738
if instrumentation_config[:propagation_style] == :child
3839
tracer.in_span(span_name, attributes: attributes, kind: :consumer) do |span|
3940
span.add_event('created_at', timestamp: created_at)
4041
span.add_event('enqueued_at', timestamp: enqueued_at)
42+
span.add_event('scheduled_at', timestamp: scheduled_at) unless scheduled_at.nil?
4143
yield
4244
end
4345
else
@@ -48,6 +50,7 @@ def call(_worker, msg, _queue)
4850
OpenTelemetry::Trace.with_span(span) do
4951
span.add_event('created_at', timestamp: created_at)
5052
span.add_event('enqueued_at', timestamp: enqueued_at)
53+
span.add_event('scheduled_at', timestamp: scheduled_at) unless scheduled_at.nil?
5154
yield
5255
rescue Exception => e # rubocop:disable Lint/RescueException
5356
span.record_exception(e)

0 commit comments

Comments
 (0)