Skip to content

Commit

Permalink
Add support for messageId, correlationId, and type in RabbitMQ bindings
Browse files Browse the repository at this point in the history
Signed-off-by: Andre Bossard <[email protected]>
  • Loading branch information
Andre Bossard committed Feb 5, 2025
1 parent 1132db5 commit d0c75f6
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 0 deletions.
15 changes: 15 additions & 0 deletions bindings/rabbitmq/rabbitmq.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,21 @@ func (r *RabbitMQ) Invoke(ctx context.Context, req *bindings.InvokeRequest) (*bi
pub.Priority = priority
}

messageId, ok := metadata.TryGetMessageId(req.Metadata)
if ok {
pub.MessageId = messageId
}

correlationId, ok := metadata.TryGetCorrelationId(req.Metadata)
if ok {
pub.CorrelationId = correlationId
}

aType, ok := metadata.TryGetType(req.Metadata)
if ok {
pub.Type = aType
}

err = ch.PublishWithContext(ctx, "", r.metadata.QueueName, false, false, pub)
if err != nil {
return nil, fmt.Errorf("failed to publish message: %w", err)
Expand Down
23 changes: 23 additions & 0 deletions metadata/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,21 @@ func IsRawPayload(props map[string]string) (bool, error) {

return false, nil
}
func TryGetMessageId(props map[string]string) (string, bool) {
if val, ok := props["messageId"]; ok && val != "" {
return val, true
}

return "", false
}

func TryGetCorrelationId(props map[string]string) (string, bool) {
if val, ok := props["correlationId"]; ok && val != "" {
return val, true
}

return "", false
}

func TryGetContentType(props map[string]string) (string, bool) {
if val, ok := props[ContentType]; ok && val != "" {
Expand All @@ -122,6 +137,14 @@ func TryGetContentType(props map[string]string) (string, bool) {
return "", false
}

func TryGetType(props map[string]string) (string, bool) {
if val, ok := props["type"]; ok && val != "" {
return val, true
}

return "", false
}

func TryGetQueryIndexName(props map[string]string) (string, bool) {
if val, ok := props[QueryIndexName]; ok && val != "" {
return val, true
Expand Down
20 changes: 20 additions & 0 deletions pubsub/rabbitmq/rabbitmq.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,26 @@ func (r *rabbitMQ) publishSync(ctx context.Context, req *pubsub.PublishRequest)
p.Priority = priority
}

contentType, ok := metadata.TryGetContentType(req.Metadata)
if ok {
p.ContentType = contentType
}

messageId, ok := metadata.TryGetMessageId(req.Metadata)
if ok {
p.MessageId = messageId
}

correlationId, ok := metadata.TryGetCorrelationId(req.Metadata)
if ok {
p.CorrelationId = correlationId
}

aType, ok := metadata.TryGetType(req.Metadata)
if ok {
p.Type = aType
}

confirm, err := r.channel.PublishWithDeferredConfirmWithContext(ctx, req.Topic, routingKey, false, false, p)
if err != nil {
r.logger.Errorf("%s publishing to %s failed in channel.Publish: %v", logMessagePrefix, req.Topic, err)
Expand Down

0 comments on commit d0c75f6

Please sign in to comment.