nsq deployment / delivery / consumption / cluster example

docker pull nsqio/nsq

Service port and relationship

topic & channel

Cluster mode

Preparation

Because the multi-node deployment here runs as Docker containers on a single machine, the nsqd and nsqadmin containers reach nsqlookupd through the service ports it publishes on the host. Therefore, when creating the nsqd and nsqadmin containers, every address used to communicate with nsqlookupd must be the host machine's IP.

# Get the host intranet ip
ifconfig -a|grep inet|grep -v 127.0.0.1|grep -v inet6|awk '{print $2}'|tr -d "addr:"

For example, mine is 10.10.31.147. Replace it with your own IP in the commands that follow.

nsqlookupd

# 4160: TCP port that nsqd nodes register on
# 4161: HTTP port that nsqadmin and consumers query to find where topics live
docker run -d --name nsqlookupd \
-p 4160:4160 -p 4161:4161 \
nsqio/nsq /nsqlookupd

nsqd

Create two nsqd nodes

nsqd exposes two service ports to producers: a TCP address and an HTTP address.

# --broadcast-address: the address this node advertises for external access
# Set it to the host IP so that nsqadmin can reach the instance and read its statistics
# --tcp-address: listen address for the TCP protocol (producer port)
# --http-address: listen address for the HTTP protocol (producer port)
# --lookupd-tcp-address: the TCP address of nsqlookupd
# --data-path: where persisted data is stored

# nsq0 tcp://127.0.0.1:4150/ http://127.0.0.1:4151/
docker run -d -v /tmp/nsq0:/tmp/nsq \
-p 4150:4150 -p 4151:4151 \
--name nsqd0 nsqio/nsq /nsqd \
--tcp-address :4150 \
--http-address :4151 \
--broadcast-address=10.10.31.147 \
--lookupd-tcp-address=10.10.31.147:4160 \
--data-path /tmp/nsq

# nsq1 tcp://127.0.0.1:4250/ http://127.0.0.1:4251/
docker run -d -v /tmp/nsq1:/tmp/nsq \
-p 4250:4250 -p 4251:4251 \
--name nsqd1 nsqio/nsq /nsqd \
--tcp-address :4250 \
--http-address :4251 \
--broadcast-address=10.10.31.147 \
--lookupd-tcp-address=10.10.31.147:4160 \
--data-path /tmp/nsq

nsqadmin

# 4171: service port of the nsqadmin management UI
# --lookupd-http-address: the HTTP address of nsqlookupd
docker run -d --name nsqadmin \
-p 4171:4171 nsqio/nsq /nsqadmin \
--lookupd-http-address=10.10.31.147:4161

http://127.0.0.1:4171/nodes

topic

# Create the topic on both nsqd nodes
curl -X POST http://127.0.0.1:4151/topic/create?topic=test
curl -X POST http://127.0.0.1:4251/topic/create?topic=test

channel

# A channel is equivalent to a consumer group; channels relate to a topic as subscribers relate to a publication
curl -X POST 'http://127.0.0.1:4151/channel/create?topic=test&channel=chan_4151_1'
curl -X POST 'http://127.0.0.1:4151/channel/create?topic=test&channel=chan_4151_2'

curl -X POST 'http://127.0.0.1:4251/channel/create?topic=test&channel=chan_4251_1'
curl -X POST 'http://127.0.0.1:4251/channel/create?topic=test&channel=chan_4251_2'
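
The same setup can be scripted in Go instead of curl. This is a rough sketch under the assumption that both nsqd nodes listen on the HTTP addresses used above; createURLs is a hypothetical helper that only builds the /topic/create and /channel/create URLs, and main posts to them and prints whatever status comes back.

```go
package main

import (
	"fmt"
	"net/http"
	"net/url"
)

// createURLs builds the /topic/create URL followed by one /channel/create
// URL per channel for a single nsqd HTTP address. (Hypothetical helper.)
func createURLs(nsqdHTTPAddr, topic string, channels []string) []string {
	q := url.Values{"topic": {topic}}
	urls := []string{"http://" + nsqdHTTPAddr + "/topic/create?" + q.Encode()}
	for _, ch := range channels {
		cq := url.Values{"topic": {topic}, "channel": {ch}}
		urls = append(urls, "http://"+nsqdHTTPAddr+"/channel/create?"+cq.Encode())
	}
	return urls
}

func main() {
	// Node addresses and channel names are the ones used in this article.
	nodes := map[string][]string{
		"127.0.0.1:4151": {"chan_4151_1", "chan_4151_2"},
		"127.0.0.1:4251": {"chan_4251_1", "chan_4251_2"},
	}
	for addr, channels := range nodes {
		for _, u := range createURLs(addr, "test", channels) {
			resp, err := http.Post(u, "text/plain", nil)
			if err != nil {
				fmt.Println("POST", u, "err:", err)
				continue
			}
			resp.Body.Close()
			fmt.Println("POST", u, "->", resp.Status)
		}
	}
}
```

Query parameters are encoded with url.Values, so they come out in alphabetical order (channel before topic); the order does not matter to the HTTP endpoint.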

consumer

Here we borrow the nsq command-line tools shipped in the nsqlookupd container.
nsq_to_file acts as a consumer: it queries lookupd for all nodes that hold the specified topic and subscribes to the channel. When a producer sends a message to this topic, the subscribed channel consumes it.

# Find all nodes holding the topic by querying lookupd
# Subscribe to topic > channel

docker exec -it nsqlookupd nsq_to_file \
--topic=test --channel=chan_4151_1 \
--output-dir=/tmp/chan_4151_1 \
--lookupd-http-address=127.0.0.1:4161

docker exec -it nsqlookupd nsq_to_file  \
--topic=test --channel=chan_4251_1 \
--output-dir=/tmp/chan_4251_1 \
--lookupd-http-address=127.0.0.1:4161

producer

# Publish a message to the topic. Every channel receives a copy of the message,
# and within each channel it is load balanced to one of the channel's consumers
curl -d 'hello world 4151' 'http://127.0.0.1:4151/pub?topic=test'
curl -d 'hello world 4251' 'http://127.0.0.1:4251/pub?topic=test'

High availability scenario

Replicas, idempotent consumption

A highly available cluster naturally needs replicas, but an nsq cluster has no mechanism for synchronizing data between nodes. Unlike more advanced queues, it does not maintain synchronized replicas itself, so replicas have to be implemented and maintained at the code level.

Take nsqd0 and nsqd1 as an example. How do we make the cluster highly available? Create a topic and channel with the same names, topic_ha & channel_replic (test_ha and chan_replic in the commands below), on both nsqd0 and nsqd1, and send every message to both nodes when delivering.

When consumers subscribe through lookupd, they subscribe to channel_replic on every node that holds topic_ha. Because the same message is sent to both nsqd0 and nsqd1, topic_ha always has a backup copy, and consumers must process messages idempotently to prevent repeated processing. When one of the nsqd nodes goes down, business messages can still be delivered and consumed normally.

curl -X POST http://127.0.0.1:4151/topic/create?topic=test_ha
curl -X POST http://127.0.0.1:4251/topic/create?topic=test_ha

curl -X POST 'http://127.0.0.1:4151/channel/create?topic=test_ha&channel=chan_replic'
curl -X POST 'http://127.0.0.1:4251/channel/create?topic=test_ha&channel=chan_replic'

# Or use the Go code at the end of the article to try cluster delivery / consumption
docker exec -it nsqlookupd nsq_to_file  \
--topic=test_ha --channel=chan_replic \
--output-dir=/tmp/chan_replic \
--lookupd-http-address=127.0.0.1:4161

You can see that all nodes containing this topic & channel are obtained through nsqlookupd

2022/03/03 09:49:34 INF    1 [test_ha/chan_replic] querying nsqlookupd http://127.0.0.1:4161/lookup?topic=test_ha
2022/03/03 09:49:34 INF    1 [test_ha/chan_replic] (10.10.31.147:4150) connecting to nsqd
2022/03/03 09:49:34 INF    1 [test_ha/chan_replic] (10.10.31.147:4250) connecting to nsqd
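
Since the consumer is connected to both nodes, every message published to both of them arrives twice, and each nsqd assigns its own message ID, so the ID cannot be used to spot the duplicate. Below is a minimal sketch of one way to make consumption idempotent. It is an illustration only: the Deduper type is hypothetical, it assumes that two messages with identical bytes are the same business message, and it keeps its state in memory without any eviction. A real system would more likely carry a unique business ID in the payload and keep the seen set in shared storage.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"sync"
)

// Deduper remembers which payloads have already been processed.
// (Hypothetical type; in-memory only, never evicts entries.)
type Deduper struct {
	mu   sync.Mutex
	seen map[string]struct{}
}

func NewDeduper() *Deduper {
	return &Deduper{seen: make(map[string]struct{})}
}

// FirstTime reports whether body has not been seen before, and records it.
// Assumption: identical bytes mean the same business message.
func (d *Deduper) FirstTime(body []byte) bool {
	sum := sha256.Sum256(body)
	key := hex.EncodeToString(sum[:])
	d.mu.Lock()
	defer d.mu.Unlock()
	if _, dup := d.seen[key]; dup {
		return false
	}
	d.seen[key] = struct{}{}
	return true
}

func main() {
	d := NewDeduper()
	// The same payload arrives once via nsqd0 and once via nsqd1.
	for _, body := range []string{"order-1", "order-1", "order-2"} {
		if d.FirstTime([]byte(body)) {
			fmt.Println("process:", body)
		} else {
			fmt.Println("skip duplicate:", body)
		}
	}
}
```

In a message handler, the check would go first: if FirstTime returns false, return nil so the duplicate is acknowledged without being processed again.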

Basic concepts

  1. nsq's highly available cluster does not synchronize replicas automatically. If you have N nodes, you need to create a topic with the same name on all N nodes, and when delivering a message you need to deliver it to each of these N nodes.
  2. The best way to consume from nsq is for consumers to connect to the lookupd service and query which nodes hold the subscribed topic. Consumers are organized around the topic and subscribe to the message data on every node that holds it.
  3. A channel is a consumer group: messages are load balanced within a group and published/subscribed between groups, much like Kafka's consumer groups. Consumers that join the same group share the load; different groups are each independent subscribers to the topic.
  4. Consumers that subscribe to the same topic and the same channel on a node join the same consumer group, and the load is balanced within the group.
  5. Consumers that subscribe to the same topic but different channels on a node are in a publish/subscribe relationship: at least one consumer in each consumer group receives each message.
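
The relationship between topics, channels, and consumers in points 3 to 5 can be illustrated with a toy in-memory model. This is not how nsqd is implemented: the types below are hypothetical, and the strict round-robin is a simplification (a real nsqd hands a message to whichever connected consumer is ready). It only shows that every channel gets each message while a single consumer per channel handles it.

```go
package main

import "fmt"

// channel is a toy consumer group: each message goes to one consumer in turn.
type channel struct {
	consumers []string
	next      int
}

// topic fans every message out to all of its channels.
type topic struct {
	channels map[string]*channel
}

// publish returns, for every channel name, the consumer that receives msg.
func (t *topic) publish(msg string) map[string]string {
	delivered := make(map[string]string)
	for name, ch := range t.channels {
		if len(ch.consumers) == 0 {
			continue // nobody subscribed on this channel
		}
		delivered[name] = ch.consumers[ch.next%len(ch.consumers)]
		ch.next++
	}
	return delivered
}

func main() {
	tp := &topic{channels: map[string]*channel{
		"chan_a": {consumers: []string{"a1", "a2"}}, // two consumers share the load
		"chan_b": {consumers: []string{"b1"}},       // a separate subscriber group
	}}
	for _, msg := range []string{"m1", "m2", "m3"} {
		fmt.Println(msg, tp.publish(msg))
	}
}
```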

Instance (cluster delivery / consumption)

nsqProducer
Here I also wrap two steps: automatically discovering, through lookupd, all nsqd nodes that hold the topic, and creating a TCP producer for each of them. This way, when delivering messages to a topic spread across multiple nsqd nodes, there is no need to deliver to them by hand one at a time.

package main

import (
    "encoding/json"
    "errors"
    "flag"
    "github.com/nsqio/go-nsq"
    "io/ioutil"
    "log"
    "net/http"
    "os"
    "os/signal"
    "strconv"
    "syscall"
    "time"
)

var TopicProducers map[string][]*nsq.Producer

type LookupTopicRes struct {
    Channels  []string       `json:"channels"`
    Producers []ProducerInfo `json:"producers"`
}

type ProducerInfo struct {
    RemoteAddress    string `json:"remote_address"`
    Hostname         string `json:"hostname"`
    BroadcastAddress string `json:"broadcast_address"`
    TcpPort          int    `json:"tcp_port"`
    HttpPort         int    `json:"http_port"`
    Version          string `json:"version"`
}

func main() {
    var topic string

    flag.StringVar(&topic, "topic", "test", "topic name default test")
    flag.Parse()

    NewTopicProducer(topic)

    go func() {
        timerTicker := time.Tick(2 * time.Second)
        for {
            <-timerTicker
            totalNode, failedNode, err := PublishTopicMsg(topic, []byte("hello nsq "+time.Now().Format("2006-01-02 15:04:05")))
            if err != nil {
                log.Fatalln("PublishTopicMsg err topic", topic, "err", err.Error())
            }
            log.Println("PublishTopicMsg ok topic", topic, "totalNode", totalNode, "failedNode", failedNode)
        }
    }()

    // wait for signal to exit
    sigChan := make(chan os.Signal, 1)
    signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
    sigMsg := <-sigChan
    log.Println("sigMsg", sigMsg)

    // Gracefully stop the producer.
    for _, producers := range TopicProducers {
        for _, producer := range producers {
            producer.Stop()
        }
    }
}

// NewTopicProducer
// Looks up all nsqd nodes of the topic and creates a TCP producer for each
func NewTopicProducer(topic string) {
    TopicProducers = make(map[string][]*nsq.Producer)
    config := nsq.NewConfig()
    config.MaxInFlight = 10 // Consumer-side limit on in-flight messages; it does not affect producers
    topicNodeAddr := getTopicNodeAddrSet(topic)
    var producers []*nsq.Producer
    for _, addr := range topicNodeAddr {
        producer, err := nsq.NewProducer(addr, config)
        if err != nil {
            log.Fatalln("newProducer err topic", topic, "err", err.Error())
        }
        producers = append(producers, producer)
    }
    TopicProducers[topic] = producers
}

// PublishTopicMsg
// Publishes a message to the topic in cluster mode: it is sent to every node that holds the topic
func PublishTopicMsg(topic string, msg []byte) (totalNode int, failedNode int, err error) {
    producers, ok := TopicProducers[topic]
    if !ok {
        return 0, 0, errors.New("PublishTopicMsg err topic not exists")
    }
    totalNode = len(producers)
    for _, producer := range producers {
        errPub := producer.Publish(topic, msg)
        if nil != errPub {
            failedNode++
        }
    }
    return
}

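// getTopicNodeAddrSet queries nsqlookupd and returns the TCP address
// (broadcast_address:tcp_port) of every nsqd node that holds the topic.
func getTopicNodeAddrSet(topic string) (topicNodeAddrArr []string) {
    resp, err := http.Get("http://127.0.0.1:4161/lookup?topic=" + topic)
    if err != nil {
        // Without this check a failed request would panic on resp.Body below.
        log.Fatalln("lookup err topic", topic, "err", err.Error())
    }
    defer func() {
        _ = resp.Body.Close()
    }()

    bodyRaw, err := ioutil.ReadAll(resp.Body)
    if err != nil {
        log.Fatalln("lookup read err topic", topic, "err", err.Error())
    }
    lookupTopicRes := &LookupTopicRes{}
    if err = json.Unmarshal(bodyRaw, lookupTopicRes); err != nil {
        log.Fatalln("lookup decode err topic", topic, "err", err.Error())
    }

    for _, producer := range lookupTopicRes.Producers {
        topicNodeAddrArr = append(topicNodeAddrArr, producer.BroadcastAddress+":"+strconv.Itoa(producer.TcpPort))
    }

    return topicNodeAddrArr
}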
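
One thing to note in PublishTopicMsg above: err stays nil even when every node rejects the message, so the caller only sees the failure in the failedNode count. Below is a sketch of one possible variation that returns an error when no node accepted the message. The publisher interface, publishAll, and fakeNode are hypothetical names for illustration; the interface lists only the Publish method, which *nsq.Producer also provides, so the sketch runs without a live nsqd.

```go
package main

import (
	"errors"
	"fmt"
)

// publisher is the subset of a producer this sketch needs.
type publisher interface {
	Publish(topic string, body []byte) error
}

// publishAll sends body to every node and returns an error only when
// there are no nodes or no node accepted the message.
func publishAll(nodes []publisher, topic string, body []byte) (failed int, err error) {
	if len(nodes) == 0 {
		return 0, errors.New("no nsqd nodes for topic " + topic)
	}
	for _, n := range nodes {
		if pubErr := n.Publish(topic, body); pubErr != nil {
			failed++
		}
	}
	if failed == len(nodes) {
		return failed, errors.New("publish failed on all nodes for topic " + topic)
	}
	return failed, nil
}

// fakeNode is a stand-in for a producer connection, used only for the demo.
type fakeNode struct{ down bool }

func (f fakeNode) Publish(topic string, body []byte) error {
	if f.down {
		return errors.New("node down")
	}
	return nil
}

func main() {
	// One healthy node and one failed node: delivery still succeeds.
	nodes := []publisher{fakeNode{}, fakeNode{down: true}}
	failed, err := publishAll(nodes, "test_ha", []byte("hello"))
	fmt.Println("failed:", failed, "err:", err)
}
```

Treating a partial failure as success matches the high availability goal described earlier: as long as one replica holds the message, consumers can still receive it.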

nsqConsumer
Use lookupd to automatically obtain all nsqd nodes of topic + channel and subscribe to consumption.

package main

import (
    "flag"
    "github.com/nsqio/go-nsq"
    "log"
    "os"
    "os/signal"
    "syscall"
)

type nsqMessageHandler struct{}

// HandleMessage implements the Handler interface.
func (h *nsqMessageHandler) HandleMessage(m *nsq.Message) error {
    if len(m.Body) == 0 {
        // Returning nil will automatically send a FIN command to NSQ to mark the message as processed.
        // In this case, a message with an empty body is simply ignored/discarded.
        return nil
    }

    // do whatever actual message processing is desired
    log.Println("HandleMessage nsqd:", m.NSQDAddress, "msg:", string(m.Body))

    // Returning a non-nil error will automatically send a REQ command to NSQ to re-queue the message.
    return nil
}

func main() {
    var topic string
    var channel string
    var count int
    var consumerGroup []*nsq.Consumer

    flag.StringVar(&topic, "topic", "test", "topic name default test")
    flag.StringVar(&channel, "channel", "test", "channel name default test")
    flag.IntVar(&count, "count", 1, "consumer count default 1")
    flag.Parse()

    // Instantiate a consumer that will subscribe to the provided channel.
    config := nsq.NewConfig()
    config.MaxInFlight = 10 // The maximum number of messages a consumer can process at the same time

    for i := 0; i < count; i++ {
        consumer, err := nsq.NewConsumer(topic, channel, config)
        if err != nil {
            log.Fatalln("NewConsumer err:", err.Error())
        }

        // Set the Handler for messages received by this Consumer. Can be called multiple times.
        // See also AddConcurrentHandlers.
        consumer.AddHandler(&nsqMessageHandler{})

        // Use nsqlookupd to discover nsqd instances.
        // See also ConnectToNSQD, ConnectToNSQDs, ConnectToNSQLookupds.
        // Every nsqd instance that holds the current topic will be subscribed.
        // In cluster mode, the producer sends each message to all instances that hold the topic,
        // so when some instances go down, consumers can still get messages through the others.
        // ! Messages must be processed idempotently here !
        err = consumer.ConnectToNSQLookupd("localhost:4161")
        if err != nil {
            log.Fatalln("ConnectToNSQLookupd err:", err.Error())
        } else {
            log.Println("ConnectToNSQLookupd success topic:", topic, "channel:", channel)
        }

        consumerGroup = append(consumerGroup, consumer)
    }

    // wait for signal to exit
    sigChan := make(chan os.Signal, 1)
    signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
    sigMsg := <-sigChan
    log.Println("sigMsg", sigMsg)

    // Gracefully stop the consumer.
    for _, consumer := range consumerGroup {
        consumer.Stop()
    }
}

Run

go run nsqConsumer.go -topic test_ha -channel chan_replic -count=2
go run nsqProducer.go -topic test_ha

The two consumers load balance with each other and form consumer group A. Consumer group A subscribes to chan_replic on both nsqd0 and nsqd1, and the producer delivers to test_ha in cluster mode. After receiving a message, nsqd0 and nsqd1 each deliver it to consumer group A. Which consumer in group A consumes it depends on load balancing.

Keywords: Docker cluster

Added by mcirl2 on Mon, 07 Mar 2022 12:39:02 +0200