Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 13 additions & 13 deletions lib/datadog/tracing/contrib/karafka/patcher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -66,22 +66,12 @@ def each(&block)
end
end

module AppPatch
ONLY_ONCE_PER_APP = Hash.new { |h, key| h[key] = Core::Utils::OnlyOnce.new }

def initialized!
ONLY_ONCE_PER_APP[self].run do
# Activate tracing on components related to Karafka (e.g. WaterDrop)
Contrib::Karafka::Framework.setup
end
super
end
end

# Patcher enables patching of 'karafka' module.
module Patcher
include Contrib::Patcher

ACTIVATE_FRAMEWORK_ONLY_ONCE = Core::Utils::OnlyOnce.new

module_function

def target_version
Expand All @@ -91,10 +81,20 @@ def target_version
def patch
require_relative 'monitor'
require_relative 'framework'
require_relative '../waterdrop'

::Karafka::Instrumentation::Monitor.prepend(Monitor)
::Karafka::Messages::Messages.prepend(MessagesPatch)
::Karafka::App.singleton_class.prepend(AppPatch)

if Contrib::WaterDrop::Integration.compatible?
Copy link
Contributor Author

@Drowze Drowze Dec 8, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As of now, the WaterDrop integration requires version >= 2.8.8.rc1 (as that version introduced producer.configured events, which we hook onto to add our middleware to just-initialized waterdrop instances)
Meanwhile, the lowest Karafka version we support (2.3.0) requires waterdrop >= 2.6.12.

Because of that version discrepancy, it's perfectly possible to have Karafka compatible and WaterDrop incompatible at the same time (in fact, our karafka_min gemfiles fall into exactly this)

::Karafka.monitor.subscribe('app.initialized') do |event|
ACTIVATE_FRAMEWORK_ONLY_ONCE.run do
Contrib::Karafka::Framework.setup
end

Contrib::WaterDrop::Patcher.add_middleware(::Karafka.producer)
end
end
end
end
end
Expand Down
9 changes: 6 additions & 3 deletions lib/datadog/tracing/contrib/waterdrop/patcher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,7 @@ def patch
::WaterDrop::Producer.prepend(Producer)
::WaterDrop.instrumentation.subscribe('producer.configured') do |event|
producer = event[:producer]

included_middlewares = producer.middleware.instance_variable_get(:@steps)
producer.middleware.append(Middleware) unless included_middlewares.include?(Middleware)
add_middleware(producer)

if Datadog.configuration.data_streams.enabled
producer.monitor.subscribe('message.acknowledged') do |ack_event|
Expand All @@ -39,6 +37,11 @@ def patch
end
end
end

def add_middleware(producer)
included_middlewares = producer.middleware.instance_variable_get(:@steps)
producer.middleware.append(Middleware) unless included_middlewares.include?(Middleware)
end
end
end
end
Expand Down
98 changes: 98 additions & 0 deletions spec/datadog/tracing/contrib/karafka/patcher_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -80,4 +80,102 @@
expect(span.resource).to eq 'ABC#consume'
end
end

describe 'framework auto-instrumentation' do
around do |example|
# Reset before and after each example; don't allow global state to linger.
Datadog.registry[:waterdrop].reset_configuration!
example.run
Datadog.registry[:waterdrop].reset_configuration!

# reset Karafka internal state as well
Karafka::App.config.internal.status.reset!
Karafka::App.config.producer = nil
Karafka.refresh!
end

let(:producer_middlewares) { Karafka.producer.middleware.instance_variable_get(:@steps) }

def waterdrop_compatible?
Datadog::Tracing::Contrib::WaterDrop::Integration.compatible?
end

it 'automatically enables WaterDrop instrumentation' do
skip 'WaterDrop is not activated unless it is on a supported version' unless waterdrop_compatible?

Karafka::App.setup do |c|
c.kafka = {"bootstrap.servers": '127.0.0.1:9092'}
end

expect(Datadog.configuration.tracing[:karafka][:enabled]).to be true
expect(Datadog.configuration.tracing[:karafka][:distributed_tracing]).to be true

expect(Datadog.configuration.tracing[:waterdrop][:enabled]).to be true
expect(Datadog.configuration.tracing[:waterdrop][:distributed_tracing]).to be true
end

context 'when user does not supply a custom producer' do
it 'sets up Karafka.producer with the datadog waterdrop middleware' do
skip 'WaterDrop is not activated unless it is on a supported version' unless waterdrop_compatible?

Karafka::App.setup do |c|
c.kafka = {"bootstrap.servers": '127.0.0.1:9092'}
end

expect(producer_middlewares).to eq([
Datadog::Tracing::Contrib::WaterDrop::Middleware
])
end
end

context 'when the user does supply a custom producer with custom middlewares' do
let(:custom_middleware) { ->(message) { messsage } }

it 'appends the datadog middleware at the end of the Karafka.producer middleware stack' do
skip 'WaterDrop is not activated unless it is on a supported version' unless waterdrop_compatible?

Karafka::App.setup do |c|
c.kafka = {"bootstrap.servers": '127.0.0.1:9092'}
c.producer = WaterDrop::Producer.new do |producer_config|
producer_config.kafka = {"bootstrap.servers": '127.0.0.1:9092'}
producer_config.middleware.append(custom_middleware)
end
end

expect(producer_middlewares).to eq([
custom_middleware,
Datadog::Tracing::Contrib::WaterDrop::Middleware
])
end
end

context 'when the waterdrop integration is manually configured' do
it 'appends the datadog middleware to Karafka.producer only once' do
skip 'WaterDrop is not activated unless it is on a supported version' unless waterdrop_compatible?

Datadog.configure do |c|
c.tracing.instrument :waterdrop, configuration_options
end
Karafka::App.setup do |c|
c.kafka = {"bootstrap.servers": '127.0.0.1:9092'}
end

expect(producer_middlewares).to eq([
Datadog::Tracing::Contrib::WaterDrop::Middleware
])
end
end

context 'when the waterdrop integration is not on a compatbile version' do
it 'does not attempt to activate waterdrop or append any producer middleware' do
skip 'WaterDrop is not activated unless it is on a supported version' if waterdrop_compatible?

Karafka::App.setup do |c|
c.kafka = {"bootstrap.servers": '127.0.0.1:9092'}
end

expect(producer_middlewares).to be_empty
end
end
end
end