The use of golang rabbitmq

The last article mainly explained the use of queue. Both producer and consumer are directly connected to the queue for message production and consumption. This article mainly describes the use of Exchange.

There are several types of Exchange: direct,topic,headers and fanout
Let's talk about the exchange of fanout type. The function is to forward the message to all the queues connected with him. All producer s don't need to care about how many queues or consumers there are. They just need to send messages to exchange. As for which consumers need these messages, they can get the desired messages by directly connecting the queue to exchange.

exchange is defined as follows

err = ch.ExchangeDeclare(
  "fanout_exchange",   // name
  "fanout", // type
  true,     // durable
  false,    // auto-deleted
  false,    // internal
  false,    // no-wait
  nil,      // arguments
)

Define the queue connected to exchange

q, err := ch.QueueDeclare(
                "", //queue name
                false, //durable
                false,
                true, //exclusive
                false,
                nil,
            )

Here, the name of the queue is not specified. The name of the generated queue is random
exclusive is true. When the connection is closed, the queue will be deleted.

The process of connecting a queue with an exchange is called binding. The usage is as follows

err = ch.QueueBind(
  q.Name, // queue name
  "",     // routing key
  "fanout_exchange", // exchange
  false,
  nil,
)

The example code is as follows
conf.go

package config

const (
    RMQADDR      = "amqp://guest:guest@172.17.84.205:5672/"
    EXCHANGENAME = "fanout_exchange"
    CONSUMERCNT  = 3
)

producer.go

package main

import (
    config "binTest/rabbitmqTest/t1/l2/conf"
    "fmt"
    "log"
    "os"

    "github.com/streadway/amqp"
)

func main() {

    conn, err := amqp.Dial(config.RMQADDR)
    failOnError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()

    ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    defer ch.Close()

    err = ch.ExchangeDeclare(
        "fanout_exchange", //exchange name
        "fanout",          //exchange kind
        true,              //durable
        false,             //autodelete
        false,
        false,
        nil,
    )

    failOnError(err, "Failed to declare exchange")

    msgs := os.Args[1:]
    msgNum := len(msgs)

    for cnt := 0; cnt < msgNum; cnt++ {
        msgBody := msgs[cnt]
        err = ch.Publish(
            "fanout_exchange", //exchange
            "",                //routing key
            false,
            false,
            amqp.Publishing{
                ContentType: "text/plain",
                Body:        []byte(msgBody),
            })

        log.Printf(" [x] Sent %s", msgBody)
    }
    failOnError(err, "Failed to publish a message")

}

func failOnError(err error, msg string) {
    if err != nil {
        fmt.Printf("%s: %s\n", msg, err)
    }
}

consumer.go

package main

import (
    config "binTest/rabbitmqTest/t1/l3/conf"
    "fmt"
    "log"

    "github.com/streadway/amqp"
)

func main() {

    conn, err := amqp.Dial(config.RMQADDR)
    failOnError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()

    forever := make(chan bool)

    ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    defer ch.Close()

    err = ch.ExchangeDeclare(
        "fanout_exchange", //exchange name
        "fanout",          //exchange kind
        true,              //durable
        false,             //autodelete
        false,
        false,
        nil,
    )

    failOnError(err, "Failed to declare exchange")

    for routine := 0; routine < config.CONSUMERCNT; routine++ {
        go func(routineNum int) {

            q, err := ch.QueueDeclare(
                "",
                false, //durable
                false,
                true,
                false,
                nil,
            )

            failOnError(err, "Failed to declare a queue")

            err = ch.QueueBind(
                q.Name,
                "",
                "fanout_exchange",
                false,
                nil,
            )
            failOnError(err, "Failed to bind exchange")

            msgs, err := ch.Consume(
                q.Name,
                "",
                true, //Auto Ack
                false,
                false,
                false,
                nil,
            )

            if err != nil {
                log.Fatal(err)
            }

            for msg := range msgs {
                log.Printf("In %d consume a message: %s\n", routineNum, msg.Body)
            }

        }(routine)
    }

    <-forever
}

func failOnError(err error, msg string) {
    if err != nil {
        fmt.Printf("%s: %s\n", msg, err)
    }
}

The results are as follows
producer

consumer

Three queues are bound on the exchange. Each message sent by the producer is sent to three queues and consumed by three different consumer s

Code in
[https://github.com/BinWang-sh...]

Keywords: Go github RabbitMQ

Added by coolen on Mon, 18 Nov 2019 22:20:44 +0200