Skip to content

Add ErrMaxReconnectAttemptsReached #49

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jun 18, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
# RabbitMQ AMQP 1.0 Golang Client

This library is meant to be used with RabbitMQ 4.0.
Suitable for testing in pre-production environments.


## Getting Started

- [Getting Started](docs/examples/getting_started)
- [Examples](docs/examples)
Inside the `docs/examples` directory you will find several examples to get you started.</br>
Also advanced examples like how to use streams, how to handle reconnections, and how to use TLS.
- Getting started Video tutorial: </br>
[![Getting Started](https://img.youtube.com/vi/iR1JUFh3udI/0.jpg)](https://youtu.be/iR1JUFh3udI)

Expand Down
29 changes: 20 additions & 9 deletions docs/examples/reliable/reliable.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,19 @@ func main() {
var stateAccepted int32
var stateReleased int32
var stateRejected int32
var isRunning bool

var received int32
var failed int32

startTime := time.Now()
isRunning = true
go func() {
for {
for isRunning {
time.Sleep(5 * time.Second)
total := stateAccepted + stateReleased + stateRejected
messagesPerSecond := float64(total) / time.Since(startTime).Seconds()
rmq.Info("[Stats]", "sent", total, "received", received, "failed", failed, "messagesPerSecond", messagesPerSecond)

}
}()

Expand All @@ -41,6 +42,16 @@ func main() {
switch statusChanged.To.(type) {
case *rmq.StateOpen:
signalBlock.Broadcast()
case *rmq.StateReconnecting:
rmq.Info("[connection]", "Reconnecting to the AMQP 1.0 server")
case *rmq.StateClosed:
StateClosed := statusChanged.To.(*rmq.StateClosed)
if errors.Is(StateClosed.GetError(), rmq.ErrMaxReconnectAttemptsReached) {
rmq.Error("[connection]", "Max reconnect attempts reached. Closing connection", StateClosed.GetError())
signalBlock.Broadcast()
isRunning = false
}

}
}
}(stateChanged)
Expand Down Expand Up @@ -87,13 +98,13 @@ func main() {

// Consume messages from the queue
go func(ctx context.Context) {
for {
for isRunning {
deliveryContext, err := consumer.Receive(ctx)
if errors.Is(err, context.Canceled) {
// The consumer was closed correctly
return
}
if err != nil {
if err != nil && isRunning {
// An error occurred receiving the message
// here the consumer could be disconnected from the server due to a network error
signalBlock.L.Lock()
Expand All @@ -107,7 +118,7 @@ func main() {

atomic.AddInt32(&received, 1)
err = deliveryContext.Accept(context.Background())
if err != nil {
if err != nil && isRunning {
// same here the delivery could not be accepted due to a network error
// we wait for 2_500 ms and try again
time.Sleep(2500 * time.Millisecond)
Expand All @@ -124,12 +135,13 @@ func main() {
return
}

wg := &sync.WaitGroup{}
for i := 0; i < 1; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < 500_000; i++ {
if !isRunning {
rmq.Info("[Publisher]", "Publisher is stopped simulation not running, queue", queueName)
return
}
publishResult, err := publisher.Publish(context.Background(), rmq.NewMessage([]byte("Hello, World!"+fmt.Sprintf("%d", i))))
if err != nil {
// here you need to deal with the error. You can store the message in a local in memory/persistent storage
Expand Down Expand Up @@ -160,7 +172,6 @@ func main() {
}
}()
}
wg.Wait()

println("press any key to close the connection")

Expand Down
4 changes: 2 additions & 2 deletions pkg/rabbitmqamqp/amqp_binding.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ func (b *AMQPBinding) Bind(ctx context.Context) (string, error) {
kv[destination] = b.destinationName
kv["arguments"] = make(map[string]any)
_, err := b.management.Request(ctx, kv, path, commandPost, []int{responseCode204})
bindingPathWithExchangeQueueKey := bindingPathWithExchangeQueueKey(b.toQueue, b.sourceName, b.destinationName, b.bindingKey)
return bindingPathWithExchangeQueueKey, err
bindingPathWithExchangeQueueAndKey := bindingPathWithExchangeQueueKey(b.toQueue, b.sourceName, b.destinationName, b.bindingKey)
return bindingPathWithExchangeQueueAndKey, err
}

// Unbind removes a binding between an exchange and a queue or exchange
Expand Down
4 changes: 4 additions & 0 deletions pkg/rabbitmqamqp/amqp_connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,10 @@ func (a *AmqpConnection) maybeReconnect() {
Error("Reconnection attempt failed", "attempt", attempt, "error", err, "ID", a.Id())
}

// If we reach here, all attempts failed
Error("All reconnection attempts failed, closing connection", "ID", a.Id())
a.lifeCycle.SetState(&StateClosed{error: ErrMaxReconnectAttemptsReached})

}

// restartEntities attempts to restart all publishers and consumers after a reconnection
Expand Down
4 changes: 4 additions & 0 deletions pkg/rabbitmqamqp/amqp_connection_recovery.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
package rabbitmqamqp

import (
"errors"
"sync"
"time"
)

// ErrMaxReconnectAttemptsReached typed error when the MaxReconnectAttempts is reached
var ErrMaxReconnectAttemptsReached = errors.New("max reconnect attempts reached, connection will not be recovered")

type RecoveryConfiguration struct {
/*
ActiveRecovery Define if the recovery is activated.
Expand Down
3 changes: 3 additions & 0 deletions pkg/rabbitmqamqp/life_cycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,9 @@ func (s StateChanged) String() string {

switch s.To.(type) {
case *StateClosed:
if s.To.(*StateClosed).error == nil {
return fmt.Sprintf("From: %s, To: %s", statusToString(s.From), statusToString(s.To))
}
return fmt.Sprintf("From: %s, To: %s, Error: %s", statusToString(s.From), statusToString(s.To), s.To.(*StateClosed).error)

}
Expand Down
Loading