Skip to content

Commit 87e8ac3

Browse files
wip
1 parent b78e04a commit 87e8ac3

File tree

7 files changed

+37
-59
lines changed

7 files changed

+37
-59
lines changed

lib/mongo/collection/view/aggregation.rb

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,14 @@ class View
2525
#
2626
# @since 2.0.0
2727
class Aggregation
28+
extend Forwardable
2829
include Behavior
2930

3031
# @return [ Array<Hash> ] pipeline The aggregation pipeline.
3132
attr_reader :pipeline
3233

34+
def_delegators :view, :tracer
35+
3336
# Initialize the aggregation for the provided collection view, pipeline
3437
# and options.
3538
#
@@ -80,7 +83,7 @@ def new(options)
8083
Aggregation.new(view, pipeline, options)
8184
end
8285

83-
def initial_query_op(session, read_preference)
86+
def initial_query_op(session, read_preference = nil)
8487
Operation::Aggregate.new(aggregate_spec(session, read_preference))
8588
end
8689

lib/mongo/collection/view/aggregation/behavior.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ def server_selector
8888
@view.send(:server_selector)
8989
end
9090

91-
def aggregate_spec(session, read_preference)
91+
def aggregate_spec(session, read_preference = nil)
9292
Builder::Aggregation.new(
9393
pipeline,
9494
view,

lib/mongo/collection/view/iterable.rb

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,15 @@ def select_cursor(session)
8989
view: self
9090
)
9191
op = initial_query_op(session)
92-
tracer.trace_operation('get_more', op, context) do
92+
op_name = case op
93+
when Mongo::Operation::Find
94+
'find'
95+
when Mongo::Operation::Aggregate
96+
'aggregate'
97+
else
98+
op.class.name.split('::').last.downcase
99+
end
100+
tracer.trace_operation(op_name, op, context) do
93101
if respond_to?(:write?, true) && write?
94102
server = server_selector.select_server(cluster, nil, session, write_aggregation: true)
95103
result = send_initial_query(server, context)

lib/mongo/cursor.rb

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -514,6 +514,12 @@ def unregister
514514
end
515515

516516
def execute_operation(op, context: nil)
517+
op_name = case op
518+
when Mongo::Operation::GetMore
519+
'get_more'
520+
when Mongo::Operation::Close
521+
'close'
522+
end
517523
op_context = context || possibly_refreshed_context
518524
tracer.trace_operation('find', op, op_context) do
519525
if @connection.nil?

lib/mongo/operation/shared/specifiable.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -233,7 +233,7 @@ def documents
233233
#
234234
# @since 2.0.0
235235
def coll_name
236-
spec.fetch(COLL_NAME)
236+
spec[COLL_NAME]
237237
end
238238

239239
# The id of the cursor created on the server.

lib/mongo/tracing/open_telemetry/operation_tracer.rb

Lines changed: 15 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -28,30 +28,29 @@ def initialize(otel_tracer, parent_tracer)
2828
end
2929

3030
def trace_operation(name, operation, operation_context)
31-
parent_context = parent_context_for(operation_context, operation.cursor_id)
32-
operation_context.tracer = @parent_tracer
33-
span = @otel_tracer.start_span(
31+
@otel_tracer.in_span(
3432
operation_span_name(name, operation),
3533
attributes: span_attributes(name, operation),
36-
with_parent: parent_context,
3734
kind: :client
38-
)
39-
::OpenTelemetry::Trace.with_span(span) do |_s, c|
35+
) do |span, _context|
36+
operation_context.tracer = @parent_tracer
4037
yield.tap do |result|
41-
process_cursor_context(result, operation.cursor_id, c)
38+
if result.is_a?(Cursor) && result.id.positive?
39+
span.set_attribute('db.mongodb.cursor_id', result.id)
40+
end
4241
end
4342
end
44-
rescue Exception => e
45-
span&.record_exception(e)
46-
span&.status = ::OpenTelemetry::Trace::Status.error("Unhandled exception of type: #{e.class}")
47-
raise e
4843
ensure
49-
span&.finish
5044
operation_context.tracer = nil
5145
end
5246

5347
private
5448

49+
# Returns a hash of attributes for the OpenTelemetry span for the operation.
50+
#
51+
# @param name [String] The name of the operation.
52+
# @param operation [Operation] The operation being traced.
53+
# @return [Hash] A hash of attributes for the span.
5554
def span_attributes(name, operation)
5655
{
5756
'db.system' => 'mongodb',
@@ -63,49 +62,11 @@ def span_attributes(name, operation)
6362
}.compact
6463
end
6564

66-
def parent_context_for(operation_context, cursor_id)
67-
if (key = transaction_map_key(operation_context.session))
68-
transaction_context_map[key]
69-
elsif cursor_id
70-
cursor_context_map[cursor_id]
71-
end
72-
end
73-
74-
# This map is used to store OpenTelemetry context for cursor_id.
75-
# This allows to group all operations related to a cursor under the same context.
76-
#
77-
# # @return [Hash] a map of cursor_id to OpenTelemetry context.
78-
def cursor_context_map
79-
@cursor_context_map ||= {}
80-
end
81-
82-
def process_cursor_context(result, cursor_id, context)
83-
return unless result.is_a?(Cursor)
84-
85-
if result.id.zero?
86-
# If the cursor is closed, remove it from the context map.
87-
cursor_context_map.delete(cursor_id)
88-
elsif result.id && cursor_id.nil?
89-
# New cursor created, store its context.
90-
cursor_context_map[result.id] = context
91-
end
92-
end
93-
94-
# This map is used to store OpenTelemetry context for transaction.
95-
# This allows to group all operations related to a transaction under the same context.
65+
# Returns the name of the span for the operation.
9666
#
97-
# @return [Hash] a map of transaction_id to OpenTelemetry context.
98-
def transaction_context_map
99-
@transaction_context_map ||= {}
100-
end
101-
102-
# @param session [Mongo::Session] the session for which to get the transaction map key.
103-
def transaction_map_key(session)
104-
return if session.nil? || session.implicit? || !session.in_transaction?
105-
106-
"#{session.id}-#{session.txn_num}"
107-
end
108-
67+
# @param name [String] The name of the operation.
68+
# @param operation [Operation] The operation being traced.
69+
# # @return [String] The name of the span.
10970
def operation_span_name(name, operation)
11071
if operation.coll_name
11172
"#{name} #{operation.db_name}.#{operation.coll_name}"

spec/runners/unified/test.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -200,7 +200,7 @@ def generate_entities(es)
200200
if observe_spans
201201
opts[:tracing] = {
202202
enabled: true,
203-
tracer: tracer,
203+
# tracer: tracer,
204204
}
205205
end
206206

0 commit comments

Comments
 (0)