-
Notifications
You must be signed in to change notification settings - Fork 154
Add Opentelemetry Support #304
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
This PR extracts opentelemetry utility functions from my private project and adds them to this project without calling them. It resolves rabbitmq#43 I'd like a broader discussion about whether these should be automatically called by the library where possible, or if they should simply be provided to clients to use if they so wish. I did my best to follow OpenTelemetry semantic conventions as described here https://opentelemetry.io/docs/specs/semconv/messaging/messaging-spans/, but they are at times ambiguous for rabbitmq-- e.g. is the destination for a message the Queue or the Consumer Tag the message was delivered to. Given the channel based approaches of this library, it is impossible for the library to know the full execution of a consumer. Unless autoack=false, we cannot actually know when to end the span associated with a delivery, so at least in the consumer case, it's probably best to allow the client to manage spans for themselves. We *can* manage spans on the producer side, and at the very least extract span identifiers to include on published headers automatically, and provide utilities for pulling them back out again. My intention with putting this PR up is to move the conversation forward. Because the PR *only* provides private methods (if I left members public please call them out), it can be safely merged while these questions are worked out.
…an/amqp091-go into feat/opentelemetry
…an/amqp091-go into feat/opentelemetry
delivery.go
Outdated
| } | ||
| } | ||
|
|
||
| func (d Delivery) Settle(ctx context.Context, response DeliveryResponse, multiple, requeue bool) error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A review comment on the old PR suggested swapping this for {Ack,Nack,Reject}Ctx methods instead.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, thank you.. Have updated the code
| ) | ||
|
|
||
| // tracer is the tracer used by the package | ||
| var tracer = otel.Tracer("amqp091") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
there's actually an issue with instantiating this like this-- namely this tracer instances will be created before configuration has been applied to it, which will make it no-op I think.
Better to make this a function that retrieves a trace with the right name.
| var tracer = otel.Tracer("amqp091") | |
| func tracer() trace.Tracer { | |
| return otel.Tracer("amqp091") | |
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Working my way back through the otel code this will work as one would hope. Basically it gets stored until a tracer delegate is loaded when otel is setup, at which point it updates the tracer with the delegate.
A quick check of GitHub suggests that this is a common pattern to define the tracer once as a global.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Okay, i'll let this be as-is then
Co-authored-by: AndrewWinterman <[email protected]>
|
Thank you for this contribution! I've been busy with other responsibilities. I'm hoping to get to this PR today or tomorrow 👨💻 Thank you too to @AndrewWinterman and @adcharre for reviewing this PR 👀 |
| func (ch *Channel) PublishWithContext(_ context.Context, exchange, key string, mandatory, immediate bool, msg Publishing) error { | ||
| return ch.Publish(exchange, key, mandatory, immediate, msg) | ||
| func (ch *Channel) PublishWithContext(ctx context.Context, exchange, key string, mandatory, immediate bool, msg Publishing) error { | ||
| _, err := ch.PublishWithDeferredConfirmWithContext(ctx, exchange, key, mandatory, immediate, msg) | ||
| return err |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Leaving a note to myself to revisit this change. I think it would be preferable to duplicate some code, instead of executing all the code for the deferred confirm and ignore the return value.
| module github.com/rabbitmq/amqp091-go | ||
|
|
||
| go 1.20 | ||
| go 1.22.0 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What features are we using of Go 1.22 for this bump?
We are quite conservative with go directive bumps, specially since recent versions of Go have set this as a hard requirement for minimum version.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This appears to have come-in as part of the otel-go package https://github.com/open-telemetry/opentelemetry-go/releases/tag/v1.34.0, which requires a minimum go version of 1.22.0.
We will revisit the approach for this PR, aiming to adopt a subpackage-middleware strategy without making major changes to the main package. (discussions around this in the other PR #272 (comment))
Does this make sense @Zerpet ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@Zerpet / @AndrewWinterman - Just to flesh this out a bit more.
The OpenTelemetry packages have a requirement for the latest version of Go, for example the most recent release bumps the requirement to 1.23.
There are 2 possible solutions to including OpenTelemetry into this package:
- Simply accept the Go version requirement from the Otel package and add the Otel packages to this packages dependencies.
- Introduce a
Middlewareconcept to break the dependancy with Otel and maintain the existing Go version requirement. OpenTelemetry instrumentation would then be implemented as a new middleware module, within this repo but with it's own dependencies.
Option 1 is what's implemented currently in this PR.
Option 2, would add a new interface and methods to set the middleware of a Connection and an otel/ directory with it's own go.md.
I envision that the Middleware would have pre and post functions for all the methods that we had instrumented so far.
type Middleware interface {
PrePublishWithContext(ctx context.Context, exchange, key string, mandatory, immediate bool, msg Publishing)
PostPublishWithContext(ctx context.Context, exchange, key string, mandatory, immediate bool, msg Publishing, err Error) // err is result of the call to PublishWithContext.
... // Other methods Pre/Post
}and then in the PublishWithContext function
func (ch *Channel) PublishWithContext(ctx context.Context, exchange, key string, mandatory, immediate bool, msg Publishing) error {
if ch.middleware != nil {
ch.middleware.PrePublishWithContext(ctx, exchange, key, mandatory, immediate, msg)
}
_, err := ch.PublishWithDeferredConfirmWithContext(ctx, exchange, key, mandatory, immediate, msg)
if ch.middleware != nil {
ch.middleware.PostPublishWithContext(ctx, exchange, key, mandatory, immediate, msg, err)
}
return err
}If we think Option 2 is the way to go, We can add a new PR with the proposed Middleware implementation and agree on that before working on the OpenTelemetry middleware.
Comments?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm still drinking my coffee, so apologies in advance if I miss something obvious 😴
I'm not sure how option 2 solves the problem. I understand that we can move OTEL bits to its own module e.g. otel/go.mod, however, this module will still require OTEL version X, which forces a go directive of 1.22 or 1.23 to the middleware module. From the code snippet, I understand that the client module amqp091-go will import the middleware module in otel/go.mod, which will in turn force the client module to bump its Go directive. This is somewhat "recent" since the Go toolchain has become very picky about importing modules of higher go directive. In short, I don't think this middleware approach would solve the problem.
Am I missing something?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think I wasn't clear enough....
The middleware would be an interface and there would be no explicit dependency between the original ampq091-go module and the new otel module. The new Otel module would have a new class that implemented the methods defined in the Middleware interface.
Any code using the ampq091-go module which didn't want to use otel would not need changing or see an change to the go version required.
Code that wanted otel instrumentation would import the otel module, instantiate the Otel middleware and apply it to the client/channel.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Another idea to make option 1 acceptable would be to stay behind on OTEL releases. I personally think that the biggest version bump is to 1.21, because it introduces the toolchain directive, which is not interpreted by earlier versions.
I'm ok to bump to Go 1.22 as part of this PR, and bump to Go 1.23 in a few months time, when there's a valid justification (like supporting OTEL). I'm just on the lookout for Go version bumps for the sake of bumping it 👀
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I actually like the middleware approach, but I thought we could likely drop down to the protocol level. Likely still need some way to get the span off of a delivery or a return, but that can live in a different package
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think I wasn't clear enough....
The middleware would be an interface and there would be no explicit dependency between the original ampq091-go module and the new otel module. The new Otel module would have a new class that implemented the methods defined in the Middleware interface.
Any code using the ampq091-go module which didn't want to use otel would not need changing or see an change to the go version required.
Code that wanted otel instrumentation would import the otel module, instantiate the Otel middleware and apply it to the client/channel.
Thank for the explanation, I think I get the idea now. The "middleware" interface (I think the name should be more specific) will be part of amqp091-go module, and the module with OTEL will implement this interface. Then any user who wants to use automatic instrumentation will initialise the middleware and inject/set it in the Connection or anywhere TBD that makes sense.
If my understanding is correct, I like this idea 👍 and I agree is the right thing to do. FWIW, other data services like Redis also separate the OTEL module from the main library.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't really have bandwidth to make a middleware module, but I really think it's a good way to go for this sort of thing. Also would let libraries consistently set e.g. app id, user id, and timestamp headers
| require ( | ||
| github.com/go-logr/logr v1.4.2 // indirect | ||
| github.com/go-logr/stdr v1.2.2 // indirect | ||
| go.opentelemetry.io/auto/sdk v1.1.0 // indirect |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder what's causing this indirect import 🤔 I was under the impression that libraries should only import APIs and not the SDK, according to OpenTelemetry docs.
| func (c amqpHeaderCarrier) Get(key string) string { | ||
| v, ok := c[key] | ||
| if !ok { | ||
| return "" | ||
| } | ||
| s, ok := v.(string) | ||
| if ok { | ||
| return s | ||
| } | ||
| return "" | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not convinced by this implementation. This implementation is basically ignoring any value type other than strings. I'm not sure this is the right thing to do, because we are basically silently dropping information that could be relevant. For example, re-delivery information, which IIRC it's an integer. I'm ok with a text-map implementation, but it should set conversion rules between non-string types to string types, and document them.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TBH this was me being a bit lazy in my original implementation, justified by my understanding that this was basically only used for the tracestate, which basically a big string: https://opentelemetry.io/docs/specs/otel/trace/tracestate-handling/
| // semconv.NetPeerIP("localhost") | ||
| // semconv.ServerAddress("localhost") | ||
| ), | ||
| trace.WithNewRoot(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure about this 🤔 Couldn't we be inside a nested span here?
For example, and application that has its own app tracing, and calls our library to send a message. I would expect to see our "library spans" as nested spans to the application traces. What's the reasoning to always treat this span as root?
| // spanForDelivery creates a span for the delivered messages | ||
| // returns a new context with the span headers and the span. | ||
| func spanForDelivery(ctx context.Context, delivery *Delivery, options ...trace.SpanStartOption) (context.Context, trace.Span) { | ||
| spanName := fmt.Sprintf("consume %s %s", delivery.Exchange, delivery.RoutingKey) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've been thinking about this for a bit. It's quite unfortunate that the delivery frame does not contain the queue name. I find awkward to set use the exchange name as part of the "consume" span. After reading the messaging guidelines from OTEL regarding subscriptions:
Subscriptions represent entities within messaging systems that allow multiple consumers to receive messages from the topic following subscription-specific consumption behavior that includes load balancing, durability, filtering, or other system-specific capabilities.
Named subscriptions and consumers groups are semantically different mechanisms messaging systems use for similar scenarios such as load-balancing or broadcasting.
I think the correct behaviour would be to use the "named subscription" name, which would be either the queue name, or the routing key, or perhaps both! Given that the queue name is not available, I think it's sensible to use the routing key.
What do you think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I pretty much agree-- having the queue name would be best but we don't have it, so I'm doing the best with what we do have.
| exchange, routinKey string, | ||
| immediate bool, | ||
| ) (context.Context, Publishing, func(err error)) { | ||
| spanName := fmt.Sprintf("%s publish", routinKey) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The operation name i.e. publish should be the first word of the span name, according to OTEL guidelines.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sorry, assuming this was propagated from my initial attempt, this is my dyslexia.
| spanName := fmt.Sprintf("%s publish", routinKey) | |
| spanName := fmt.Sprintf("publish %s", routinKey) |
| semconv.MessagingMessageID(publishing.MessageId), | ||
| semconv.MessagingMessageConversationID(publishing.CorrelationId), | ||
| semconv.MessagingSystemRabbitmq, | ||
| semconv.MessagingClientIDKey.String(publishing.AppId), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same comment as earlier regarding User ID vs App ID.
| return ctx, publishing, func(err error) { | ||
| if err != nil { | ||
| span.RecordError(err) | ||
| amqpErr := &Error{} | ||
| if errors.As(err, &amqpErr) { | ||
| span.SetAttributes( | ||
| semconv.ErrorTypeKey.String(amqpErr.Reason), | ||
| ) | ||
| } | ||
| span.SetStatus(codes.Error, err.Error()) | ||
| } | ||
| span.End() | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see the idea now, and I think this is going to be tricky. Have you tested what happens when you publish an un-routable message with mandatory flag set?
I foresee that this Span will be marked as successful, and eventually the server (rabbit) will send a return frame, stating that the message is undeliverable. This will only happen with publisher confirmations enabled, of course. In this situation, it's up for debate whether the "publish" has succeeded. From the application point of view, one could argue that publish did not succeed, because my message was not accepted by the server. Another could argue that the basic.publish frame was sent successfully to the wire, therefore the publish succeeded.
I think it's important to agree on the definition of "success" for publish, and write it down in the public function code doc.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it's just mirroring the protocol-- it reports the publish as succeeded even though it will eventually asynchronously report a failure. Without doing some sort of higher level wrapping I think this is the best we can do.
Co-authored-by: Aitor Pérez Cedres <[email protected]>
|
Since the 28th of July 2025, all external contributors to RabbitMQ will have to sign a CLA again. Unfortunately this time around, we were asked to use a good old Word document because reasons [1][2]. Note companies that contribute to RabbitMQ only need to sign the CLA once, there is no need to make every I apologize for how "simplistic" this CLA method is but our team does not have any control over this matter. https://github.com/rabbitmq/cla |
About
This is based on @AndrewWinterman's PR #272.
These changes have been tested and are currently running in our Production environment.
The changes from this PR provided out-of-the-box instrumentation for our publish operations. It also enabled seamless cross-language context propagation between our Go service and a NodeJS service.
On the consumer side, since AMQP uses a channel-based approach, we had to manually instrument the message processing logic. By adding spans around the relevant operations, we were able to achieve the desired tracing behavior.
Changes
messaging.rabbitmq.routing_key. This attribute is used by NodeJS instrumentation (https://www.npmjs.com/package/@opentelemetry/instrumentation-amqplib). (They are following an older semantic convention)Example for Consumption
Below is sample code showing how we instrumented message consumption from a RabbitMQ queue: