The previous article covered queues: producers and consumers connected directly to a queue to publish and consume messages. This article covers exchanges.
There are four types of exchange: direct, topic, headers, and fanout.
Let's start with the fanout exchange. It forwards every message it receives to all the queues bound to it. Producers don't need to know how many queues or consumers exist; they only send messages to the exchange. A consumer that wants those messages binds its own queue to the exchange and reads from that queue.
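The routing rule described above can be sketched without a broker. The following is a toy in-memory model, not RabbitMQ client code: `FanoutExchange`, `Bind`, `Publish`, and `Messages` are hypothetical names made up for this illustration. It only shows that every bound queue receives its own copy of each message and that the routing key plays no part.

```go
package main

import "fmt"

// FanoutExchange is a toy, in-memory stand-in for a fanout exchange.
// It is a hypothetical type for illustration, not part of any RabbitMQ library.
type FanoutExchange struct {
	queues map[string][]string // queue name -> messages delivered so far
}

// NewFanoutExchange returns an exchange with no bound queues.
func NewFanoutExchange() *FanoutExchange {
	return &FanoutExchange{queues: make(map[string][]string)}
}

// Bind attaches a queue to the exchange. Binding the same name twice is a no-op.
func (e *FanoutExchange) Bind(queue string) {
	if _, ok := e.queues[queue]; !ok {
		e.queues[queue] = nil
	}
}

// Publish copies the message to every bound queue. The routing key is
// accepted only to mirror the shape of a real publish call; it is ignored.
func (e *FanoutExchange) Publish(routingKey, body string) {
	for name := range e.queues {
		e.queues[name] = append(e.queues[name], body)
	}
}

// Messages returns the messages delivered to one queue, in publish order.
func (e *FanoutExchange) Messages(queue string) []string {
	return e.queues[queue]
}

func main() {
	ex := NewFanoutExchange()
	ex.Bind("q1")
	ex.Bind("q2")
	ex.Publish("ignored", "hello")
	ex.Publish("", "world")
	fmt.Println(ex.Messages("q1")) // prints [hello world]
	fmt.Println(ex.Messages("q2")) // prints [hello world]
}
```

A queue bound after a message was published does not receive that message in this model, which matches the reason the consumer should be started before the producer.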
The exchange is declared as follows:
err = ch.ExchangeDeclare(
    "fanout_exchange", // name
    "fanout",          // type
    true,              // durable
    false,             // auto-deleted
    false,             // internal
    false,             // no-wait
    nil,               // arguments
)
Declare the queue that will be bound to the exchange:
q, err := ch.QueueDeclare(
    "",    // queue name
    false, // durable
    false, // delete when unused
    true,  // exclusive
    false, // no-wait
    nil,   // arguments
)
No queue name is specified here, so the server generates a random one, which is returned in q.Name.
Because exclusive is true, the queue is deleted when the connection that declared it is closed.
Connecting a queue to an exchange is called binding. It is done as follows:
err = ch.QueueBind(
    q.Name,            // queue name
    "",                // routing key (ignored by a fanout exchange)
    "fanout_exchange", // exchange
    false,             // no-wait
    nil,               // arguments
)
The complete example code is as follows:
conf.go
package config

const (
    RMQADDR      = "amqp://guest:guest@172.17.84.205:5672/"
    EXCHANGENAME = "fanout_exchange"
    CONSUMERCNT  = 3
)
producer.go
package main

import (
    config "binTest/rabbitmqTest/t1/l2/conf"
    "log"
    "os"

    "github.com/streadway/amqp"
)

func main() {
    conn, err := amqp.Dial(config.RMQADDR)
    failOnError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()

    ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    defer ch.Close()

    err = ch.ExchangeDeclare(
        "fanout_exchange", // exchange name
        "fanout",          // exchange kind
        true,              // durable
        false,             // auto-delete
        false,             // internal
        false,             // no-wait
        nil,               // arguments
    )
    failOnError(err, "Failed to declare exchange")

    // Publish each command-line argument as a separate message.
    for _, msgBody := range os.Args[1:] {
        err = ch.Publish(
            "fanout_exchange", // exchange
            "",                // routing key
            false,             // mandatory
            false,             // immediate
            amqp.Publishing{
                ContentType: "text/plain",
                Body:        []byte(msgBody),
            })
        // Check every publish, not only the last one.
        failOnError(err, "Failed to publish a message")
        log.Printf(" [x] Sent %s", msgBody)
    }
}

// failOnError logs the error and exits, so the program never continues
// with a nil connection or channel.
func failOnError(err error, msg string) {
    if err != nil {
        log.Fatalf("%s: %s", msg, err)
    }
}
consumer.go
package main

import (
    config "binTest/rabbitmqTest/t1/l3/conf"
    "log"

    "github.com/streadway/amqp"
)

func main() {
    conn, err := amqp.Dial(config.RMQADDR)
    failOnError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()

    forever := make(chan bool)

    ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    defer ch.Close()

    err = ch.ExchangeDeclare(
        "fanout_exchange", // exchange name
        "fanout",          // exchange kind
        true,              // durable
        false,             // auto-delete
        false,             // internal
        false,             // no-wait
        nil,               // arguments
    )
    failOnError(err, "Failed to declare exchange")

    // Start CONSUMERCNT consumers, each with its own server-named queue.
    for routine := 0; routine < config.CONSUMERCNT; routine++ {
        go func(routineNum int) {
            q, err := ch.QueueDeclare(
                "",    // queue name (generated by the server)
                false, // durable
                false, // delete when unused
                true,  // exclusive
                false, // no-wait
                nil,   // arguments
            )
            failOnError(err, "Failed to declare a queue")

            err = ch.QueueBind(
                q.Name,            // queue name
                "",                // routing key
                "fanout_exchange", // exchange
                false,             // no-wait
                nil,               // arguments
            )
            failOnError(err, "Failed to bind exchange")

            msgs, err := ch.Consume(
                q.Name, // queue
                "",     // consumer tag
                true,   // auto-ack
                false,  // exclusive
                false,  // no-local
                false,  // no-wait
                nil,    // arguments
            )
            failOnError(err, "Failed to register a consumer")

            for msg := range msgs {
                log.Printf("In %d consume a message: %s\n", routineNum, msg.Body)
            }
        }(routine)
    }

    // Block forever so the consumer goroutines keep running.
    <-forever
}

// failOnError logs the error and exits, so the program never continues
// with a nil connection or channel.
func failOnError(err error, msg string) {
    if err != nil {
        log.Fatalf("%s: %s", msg, err)
    }
}
The results are as follows:
[Screenshot: producer output]
[Screenshot: consumer output]
Three queues are bound to the exchange. Each message sent by the producer is delivered to all three queues and consumed by three different consumers.
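The same behaviour can be reproduced without a broker as a rough check of the arithmetic: with N consumers and M messages, each consumer should see M messages. The sketch below is a simulation only. It uses buffered Go channels in place of queues, and the function name `fanout` is hypothetical; nothing here talks to RabbitMQ.

```go
package main

import (
	"fmt"
	"sync"
)

// fanout simulates one producer and several consumers. Each consumer owns
// one queue (a buffered Go channel), and every message is copied to every
// queue. It returns how many messages each consumer received.
func fanout(msgs []string, consumers int) []int {
	queues := make([]chan string, consumers)
	for i := range queues {
		queues[i] = make(chan string, len(msgs))
	}

	counts := make([]int, consumers)
	var wg sync.WaitGroup
	for i := range queues {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			// Each goroutine writes only to its own slot in counts.
			for range queues[id] {
				counts[id]++
			}
		}(i)
	}

	// The "exchange": copy each message to every queue.
	for _, m := range msgs {
		for _, q := range queues {
			q <- m
		}
	}
	// Closing the queues lets the consumer loops finish.
	for _, q := range queues {
		close(q)
	}
	wg.Wait()
	return counts
}

func main() {
	fmt.Println(fanout([]string{"a", "b"}, 3)) // prints [2 2 2]
}
```

Unlike the real consumer, which blocks forever waiting for more deliveries, this simulation closes its queues so that it can terminate and report the counts.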
The code is available at:
[https://github.com/BinWang-sh...]