Skip to content

aync send #40

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .ci/ubuntu/gha-setup.sh
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ function start_rabbitmq
--network "$docker_network_name" \
--volume "$GITHUB_WORKSPACE/.ci/ubuntu/enabled_plugins:/etc/rabbitmq/enabled_plugins" \
--volume "$GITHUB_WORKSPACE/.ci/ubuntu/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf:ro" \
--volume "$GITHUB_WORKSPACE/.ci/ubuntu/definitions.json:/etc/rabbitmq/definitions.json:ro" \
--volume "$GITHUB_WORKSPACE/.ci/ubuntu/definitions.json:/etc/rabbitmq/definitions.json:ro" \
--volume "$GITHUB_WORKSPACE/.ci/ubuntu/advanced.config:/etc/rabbitmq/advanced.config:ro" \
--volume "$GITHUB_WORKSPACE/.ci/certs:/etc/rabbitmq/certs:ro" \
--volume "$GITHUB_WORKSPACE/.ci/ubuntu/log:/var/log/rabbitmq" \
Expand Down
Binary file not shown.
93 changes: 93 additions & 0 deletions docs/examples/async_confirmation/async_confirmation.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package main

import (
"context"
"github.com/Azure/go-amqp"
rmq "github.com/rabbitmq/rabbitmq-amqp-go-client/pkg/rabbitmqamqp"
"sync/atomic"
"time"
)

func main() {

// create 10 million UUIDs

env := rmq.NewEnvironment("amqp://test:[email protected]:5672/", &rmq.AmqpConnOptions{

MaxFrameSize: 2048,
})

// Open a connection to the AMQP 1.0 server ( RabbitMQ >= 4.0)
amqpConnection, err := env.NewConnection(context.Background())
if err != nil {
rmq.Error("Error opening connection", err)
return
}

amqpConnection.Management().DeleteQueue(context.Background(), "test")
amqpConnection.Management().DeclareQueue(context.Background(), &rmq.StreamQueueSpecification{
Name: "test",
})

publisher, err := amqpConnection.NewPublisher(context.Background(), &rmq.QueueAddress{Queue: "test"}, nil)
if err != nil {
rmq.Error("Error creating publisher", err)
return
}
// start date

startDate := time.Now()
var confirmed int32
// publish messages to the stream
f := func(message *amqp.Message, state rmq.DeliveryState, err error) {

switch state.(type) {
case *amqp.StateAccepted:
if atomic.AddInt32(&confirmed, 1)%50_000 == 0 {
// confirmations per second
rmq.Info("Confirmations per second", "value", float64(confirmed)/time.Since(startDate).Seconds())
}

if atomic.LoadInt32(&confirmed) == 2_500_000 {
// time since the start
rmq.Info("Time to confirm all messages", "value", time.Since(startDate).Seconds())
}
default:
panic("Message not accepted")

}

}
// 1kb to bytes
//bytes := make([]byte, 1_000)

//for i := 0; i < 500_000; i++ {
// _, err := publisher.Publish(context.Background(), rmq.NewMessage(make([]byte, 1)))
// if err != nil {
// rmq.Error("Error publishing message", err)
// }
// if i%20_000 == 0 {
// // message per second
// rmq.Info("Sync Messages per second", "value", float64(i)/time.Since(startDate).Seconds())
// }
//
// if i == 500_000-1 {
// // time since the start
// rmq.Info("Time to confirm all messages", "value", time.Since(startDate).Seconds())
// }
//}

for i := 0; i < 2_500_000; i++ {
err := publisher.PublishAsyncConfirmation(context.Background(), rmq.NewMessage(make([]byte, 1_000)), f)
if err != nil {
rmq.Error("Error publishing message", err)
}
if i%50_000 == 0 {
// message per second
rmq.Info("Messages per second", "value", float64(i)/time.Since(startDate).Seconds())
}
}

time.Sleep(20 * time.Second)

}
4 changes: 2 additions & 2 deletions docs/examples/reliable/reliable.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,12 +125,12 @@ func main() {
}

wg := &sync.WaitGroup{}
for i := 0; i < 1; i++ {
for i := 0; i < 100; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < 500_000; i++ {
publishResult, err := publisher.Publish(context.Background(), rmq.NewMessage([]byte("Hello, World!"+fmt.Sprintf("%d", i))))
publishResult, err := publisher.Publish(context.Background(), rmq.NewMessage(make([]byte, 10)))
if err != nil {
// here you need to deal with the error. You can store the message in a local in memory/persistent storage
// then retry to send the message as soon as the connection is reestablished
Expand Down
34 changes: 33 additions & 1 deletion pkg/rabbitmqamqp/amqp_publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"github.com/Azure/go-amqp"
"github.com/google/uuid"
"sync"
"sync/atomic"
)

Expand All @@ -20,6 +21,9 @@ type Publisher struct {
linkName string
destinationAdd string
id string
maxInFlight int
condition *sync.Cond
pending int32
}

func (m *Publisher) Id() string {
Expand All @@ -32,7 +36,10 @@ func newPublisher(ctx context.Context, connection *AmqpConnection, destinationAd
id = options.id()
}

r := &Publisher{connection: connection, linkName: getLinkName(options), destinationAdd: destinationAdd, id: id}
r := &Publisher{connection: connection, linkName: getLinkName(options),
destinationAdd: destinationAdd, id: id,
maxInFlight: 1_000,
condition: sync.NewCond(&sync.Mutex{})}
connection.entitiesTracker.storeOrReplaceProducer(r)
err := r.createSender(ctx)
if err != nil {
Expand Down Expand Up @@ -111,6 +118,31 @@ func (m *Publisher) Publish(ctx context.Context, message *amqp.Message) (*Publis
}, err
}

type CallbackConfirmation func(message *amqp.Message, state DeliveryState, err error)

func (m *Publisher) PublishAsyncConfirmation(ctx context.Context, message *amqp.Message, callback CallbackConfirmation) error {
if atomic.AddInt32(&m.pending, 1) >= int32(m.maxInFlight) {
m.condition.L.Lock()
m.condition.Wait()
m.condition.L.Unlock()
}

//go func() {
sendReceipt, err := m.sender.Load().SendWithReceipt(ctx, message, nil)
if err != nil {
return err
}

go func(sr amqp.SendReceipt) {
state, err := sr.Wait(ctx)
atomic.AddInt32(&m.pending, -1)
m.condition.Signal()
callback(message, state, err)
}(sendReceipt)
//}()
return nil
}

// Close closes the publisher.
func (m *Publisher) Close(ctx context.Context) error {
m.connection.entitiesTracker.removeProducer(m)
Expand Down
Loading