diff --git a/.ci/ubuntu/gha-setup.sh b/.ci/ubuntu/gha-setup.sh index 1118c97..2b24f8a 100755 --- a/.ci/ubuntu/gha-setup.sh +++ b/.ci/ubuntu/gha-setup.sh @@ -85,7 +85,7 @@ function start_rabbitmq --network "$docker_network_name" \ --volume "$GITHUB_WORKSPACE/.ci/ubuntu/enabled_plugins:/etc/rabbitmq/enabled_plugins" \ --volume "$GITHUB_WORKSPACE/.ci/ubuntu/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf:ro" \ - --volume "$GITHUB_WORKSPACE/.ci/ubuntu/definitions.json:/etc/rabbitmq/definitions.json:ro" \ + --volume "$GITHUB_WORKSPACE/.ci/ubuntu/definitions.json:/etc/rabbitmq/definitions.json:ro" \ --volume "$GITHUB_WORKSPACE/.ci/ubuntu/advanced.config:/etc/rabbitmq/advanced.config:ro" \ --volume "$GITHUB_WORKSPACE/.ci/certs:/etc/rabbitmq/certs:ro" \ --volume "$GITHUB_WORKSPACE/.ci/ubuntu/log:/var/log/rabbitmq" \ diff --git a/docs/examples/async_confirmation/async_confirmation b/docs/examples/async_confirmation/async_confirmation new file mode 100755 index 0000000..3a4295f Binary files /dev/null and b/docs/examples/async_confirmation/async_confirmation differ diff --git a/docs/examples/async_confirmation/async_confirmation.go b/docs/examples/async_confirmation/async_confirmation.go new file mode 100644 index 0000000..42af465 --- /dev/null +++ b/docs/examples/async_confirmation/async_confirmation.go @@ -0,0 +1,93 @@ +package main + +import ( + "context" + "github.com/Azure/go-amqp" + rmq "github.com/rabbitmq/rabbitmq-amqp-go-client/pkg/rabbitmqamqp" + "sync/atomic" + "time" +) + +func main() { + + // create 10 million UUIDs + + env := rmq.NewEnvironment("amqp://test:test@192.168.1.124:5672/", &rmq.AmqpConnOptions{ + + MaxFrameSize: 2048, + }) + + // Open a connection to the AMQP 1.0 server ( RabbitMQ >= 4.0) + amqpConnection, err := env.NewConnection(context.Background()) + if err != nil { + rmq.Error("Error opening connection", err) + return + } + + amqpConnection.Management().DeleteQueue(context.Background(), "test") + amqpConnection.Management().DeclareQueue(context.Background(), &rmq.StreamQueueSpecification{ + Name: "test", + }) + + publisher, err := amqpConnection.NewPublisher(context.Background(), &rmq.QueueAddress{Queue: "test"}, nil) + if err != nil { + rmq.Error("Error creating publisher", err) + return + } + // start date + + startDate := time.Now() + var confirmed int32 + // publish messages to the stream + f := func(message *amqp.Message, state rmq.DeliveryState, err error) { + + switch state.(type) { + case *amqp.StateAccepted: + if atomic.AddInt32(&confirmed, 1)%50_000 == 0 { + // confirmations per second + rmq.Info("Confirmations per second", "value", float64(confirmed)/time.Since(startDate).Seconds()) + } + + if atomic.LoadInt32(&confirmed) == 2_500_000 { + // time since the start + rmq.Info("Time to confirm all messages", "value", time.Since(startDate).Seconds()) + } + default: + panic("Message not accepted") + + } + + } + // 1kb to bytes + //bytes := make([]byte, 1_000) + + //for i := 0; i < 500_000; i++ { + // _, err := publisher.Publish(context.Background(), rmq.NewMessage(make([]byte, 1))) + // if err != nil { + // rmq.Error("Error publishing message", err) + // } + // if i%20_000 == 0 { + // // message per second + // rmq.Info("Sync Messages per second", "value", float64(i)/time.Since(startDate).Seconds()) + // } + // + // if i == 500_000-1 { + // // time since the start + // rmq.Info("Time to confirm all messages", "value", time.Since(startDate).Seconds()) + // } + //} + + for i := 0; i < 2_500_000; i++ { + err := publisher.PublishAsyncConfirmation(context.Background(), rmq.NewMessage(make([]byte, 1_000)), f) + if err != nil { + rmq.Error("Error publishing message", err) + } + if i%50_000 == 0 { + // message per second + rmq.Info("Messages per second", "value", float64(i)/time.Since(startDate).Seconds()) + } + } + + time.Sleep(20 * time.Second) + +} diff --git a/docs/examples/reliable/reliable.go b/docs/examples/reliable/reliable.go index 36f3928..5ba478c 100644 --- a/docs/examples/reliable/reliable.go +++ b/docs/examples/reliable/reliable.go @@ -125,12 +125,12 @@ func main() { } wg := &sync.WaitGroup{} - for i := 0; i < 1; i++ { + for i := 0; i < 100; i++ { wg.Add(1) go func() { defer wg.Done() for i := 0; i < 500_000; i++ { - publishResult, err := publisher.Publish(context.Background(), rmq.NewMessage([]byte("Hello, World!"+fmt.Sprintf("%d", i)))) + publishResult, err := publisher.Publish(context.Background(), rmq.NewMessage(make([]byte, 10))) if err != nil { // here you need to deal with the error. You can store the message in a local in memory/persistent storage // then retry to send the message as soon as the connection is reestablished diff --git a/pkg/rabbitmqamqp/amqp_publisher.go b/pkg/rabbitmqamqp/amqp_publisher.go index 5fd8f7d..4802382 100644 --- a/pkg/rabbitmqamqp/amqp_publisher.go +++ b/pkg/rabbitmqamqp/amqp_publisher.go @@ -5,6 +5,7 @@ import ( "fmt" "github.com/Azure/go-amqp" "github.com/google/uuid" + "sync" "sync/atomic" ) @@ -20,6 +21,9 @@ type Publisher struct { linkName string destinationAdd string id string + maxInFlight int + condition *sync.Cond + pending int32 } func (m *Publisher) Id() string { @@ -32,7 +36,10 @@ func newPublisher(ctx context.Context, connection *AmqpConnection, destinationAd id = options.id() } - r := &Publisher{connection: connection, linkName: getLinkName(options), destinationAdd: destinationAdd, id: id} + r := &Publisher{connection: connection, linkName: getLinkName(options), + destinationAdd: destinationAdd, id: id, + maxInFlight: 1_000, + condition: sync.NewCond(&sync.Mutex{})} connection.entitiesTracker.storeOrReplaceProducer(r) err := r.createSender(ctx) if err != nil { @@ -111,6 +118,31 @@ func (m *Publisher) Publish(ctx context.Context, message *amqp.Message) (*Publis }, err } +type CallbackConfirmation func(message *amqp.Message, state DeliveryState, err error) + +func (m *Publisher) PublishAsyncConfirmation(ctx context.Context, message *amqp.Message, callback CallbackConfirmation) error { + if atomic.AddInt32(&m.pending, 1) >= int32(m.maxInFlight) { + m.condition.L.Lock() + m.condition.Wait() + m.condition.L.Unlock() + } + + //go func() { + sendReceipt, err := m.sender.Load().SendWithReceipt(ctx, message, nil) + if err != nil { + return err + } + + go func(sr amqp.SendReceipt) { + state, err := sr.Wait(ctx) + atomic.AddInt32(&m.pending, -1) + m.condition.Signal() + callback(message, state, err) + }(sendReceipt) + //}() + return nil +} + // Close closes the publisher. func (m *Publisher) Close(ctx context.Context) error { m.connection.entitiesTracker.removeProducer(m)