diff --git a/docs/examples/README.md b/docs/examples/README.md index d765d94..9bb3334 100644 --- a/docs/examples/README.md +++ b/docs/examples/README.md @@ -8,4 +8,5 @@ - [Publisher per message target](publisher_msg_targets) - An example of how to use a single publisher to send messages in different queues with the address to the message target in the message properties. - [Video](video) - From the YouTube tutorial [AMQP 1.0 with Golang](https://youtu.be/iR1JUFh3udI) - [TLS](tls) - An example of how to use TLS with the AMQP 1.0 client. -- [Advanced Settings](advanced_settings) - An example of how to use the advanced connection settings of the AMQP 1.0 client. \ No newline at end of file +- [Advanced Settings](advanced_settings) - An example of how to use the advanced connection settings of the AMQP 1.0 client. +- [Broadcast](broadcast) - An example of how to use fanout to broadcast messages to multiple auto-deleted queues. \ No newline at end of file diff --git a/docs/examples/broadcast/broadcast.go b/docs/examples/broadcast/broadcast.go new file mode 100644 index 0000000..eed6779 --- /dev/null +++ b/docs/examples/broadcast/broadcast.go @@ -0,0 +1,110 @@ +package main + +import ( + "context" + "fmt" + rmq "github.com/rabbitmq/rabbitmq-amqp-go-client/pkg/rabbitmqamqp" + "time" +) + +func main() { + + env := rmq.NewEnvironment("amqp://guest:guest@localhost:5672/", nil) + + // Open a connection to the AMQP 1.0 server ( RabbitMQ >= 4.0) + amqpConnection, err := env.NewConnection(context.Background()) + + if err != nil { + rmq.Error("Error opening connection", err) + return + } + const broadcastExchange = "broadcast" + // Create the management interface for the connection + // so we can declare exchanges, queues, and bindings + management := amqpConnection.Management() + _, err = management.DeclareExchange(context.Background(), &rmq.FanOutExchangeSpecification{ + Name: broadcastExchange, + }) + + if err != nil { + rmq.Error("Error declaring exchange", err) + return + } + + for i := 0; i < 5; i++ { + // create temp queues + q, err := management.DeclareQueue(context.Background(), &rmq.AutoGeneratedQueueSpecification{ + IsAutoDelete: true, + IsExclusive: true, + }) + + if err != nil { + rmq.Error("Error DeclareQueue", err) + return + } + + _, err = management.Bind(context.TODO(), &rmq.ExchangeToQueueBindingSpecification{ + SourceExchange: broadcastExchange, + DestinationQueue: q.Name(), + }) + + if err != nil { + rmq.Error("Error binding", err) + return + } + + go func(idx int) { + consumer, err := amqpConnection.NewConsumer(context.Background(), q.Name(), nil) + if err != nil { + rmq.Error("Error creating consumer", err) + return + } + for { + dcx, err1 := consumer.Receive(context.Background()) + if err1 != nil { + rmq.Error("Error receiving message", err) + return + } + rmq.Info("[Consumer]", "index", idx, "msg", fmt.Sprintf("%s", dcx.Message().Data), "[queue]", q.Name()) + err1 = dcx.Accept(context.Background()) + if err1 != nil { + rmq.Error("Error accepting message", err) + return + } + } + }(i) + } + + publisher, err := amqpConnection.NewPublisher(context.Background(), &rmq.ExchangeAddress{ + Exchange: broadcastExchange, + }, nil) + + if err != nil { + rmq.Error("Error creating publisher", err) + return + } + + for i := 0; i < 10_000; i++ { + publishResult, err := publisher.Publish(context.Background(), + rmq.NewMessage([]byte("Hello AMQP 1.0 - id:"+fmt.Sprintf("%d", i)))) + if err != nil { + rmq.Error("Error publishing message", err) + return + } + + switch publishResult.Outcome.(type) { + // publish result + case *rmq.StateAccepted: + rmq.Info("[Publisher] Message accepted", "message", publishResult.Message.GetData()) + default: + rmq.Error("[Publisher] Message not accepted", "outcome", publishResult.Outcome) + } + time.Sleep(1 * time.Second) + } + + // press any key to close the connection + + var input string + _, _ = fmt.Scanln(&input) + +}