docker pull nsqio/nsq
Service port and relationship
topic & channel
Cluster mode
Preparation
Because the multi-node deployment runs as Docker containers on a single machine, the nsqd and nsqadmin containers reach nsqlookupd through the service ports that nsqlookupd publishes on the host. Therefore, when creating the nsqd and nsqadmin containers, use the host machine's IP for every nsqlookupd-related address.
# Get the host's intranet IP
ifconfig -a | grep inet | grep -v 127.0.0.1 | grep -v inet6 | awk '{print $2}' | tr -d "addr:"
For example, mine is 10.10.31.147. Replace it with your own address in the commands that follow.
nsqlookupd
# 4160: TCP port, used by nsqd to register
# 4161: HTTP port, used by nsqadmin and consumers to look up services
docker run -d --name nsqlookupd \
  -p 4160:4160 -p 4161:4161 \
  nsqio/nsq /nsqlookupd
nsqd
Create two nsqd nodes
nsqd exposes two service ports for producers: a TCP address and an HTTP address.
# --broadcast-address: the address this node registers with lookupd, used for external access.
#   Set it to the host IP so that nsqadmin can reach the node and show its statistics.
# --tcp-address: producer port for the TCP protocol
# --http-address: producer port for the HTTP protocol
# --lookupd-tcp-address: TCP address of nsqlookupd
# --data-path: directory for persisted data

# nsqd0: tcp://127.0.0.1:4150/ http://127.0.0.1:4151/
docker run -d -v /tmp/nsq0:/tmp/nsq \
  -p 4150:4150 -p 4151:4151 \
  --name nsqd0 nsqio/nsq /nsqd \
  --tcp-address :4150 \
  --http-address :4151 \
  --broadcast-address=10.10.31.147 \
  --lookupd-tcp-address=10.10.31.147:4160 \
  --data-path /tmp/nsq

# nsqd1: tcp://127.0.0.1:4250/ http://127.0.0.1:4251/
docker run -d -v /tmp/nsq1:/tmp/nsq \
  -p 4250:4250 -p 4251:4251 \
  --name nsqd1 nsqio/nsq /nsqd \
  --tcp-address :4250 \
  --http-address :4251 \
  --broadcast-address=10.10.31.147 \
  --lookupd-tcp-address=10.10.31.147:4160 \
  --data-path /tmp/nsq
nsqadmin
# 4171: service port of the nsqadmin management UI
# --lookupd-http-address: HTTP address of nsqlookupd
docker run -d --name nsqadmin \
  -p 4171:4171 nsqio/nsq /nsqadmin \
  --lookupd-http-address=10.10.31.147:4161
topic
# Create the topic on both nodes
curl -X POST 'http://127.0.0.1:4151/topic/create?topic=test'
curl -X POST 'http://127.0.0.1:4251/topic/create?topic=test'
channel
# A channel is equivalent to a consumer group; channels relate to a topic as subscribers relate to a publisher
curl -X POST 'http://127.0.0.1:4151/channel/create?topic=test&channel=chan_4151_1'
curl -X POST 'http://127.0.0.1:4151/channel/create?topic=test&channel=chan_4151_2'
curl -X POST 'http://127.0.0.1:4251/channel/create?topic=test&channel=chan_4251_1'
curl -X POST 'http://127.0.0.1:4251/channel/create?topic=test&channel=chan_4251_2'
consumer
Here we borrow the NSQ command-line utilities bundled in the nsqlookupd container.
Acting as a consumer, nsq_to_file queries lookupd for all nodes that host the specified topic and subscribes to the given channel. When a producer publishes a message to the topic, the subscribed channel receives it and the consumer processes it.
# Find the nodes hosting the topic by querying lookupd
# Subscribe to topic > channel (run each command in its own terminal)
docker exec -it nsqlookupd nsq_to_file \
  --topic=test --channel=chan_4151_1 \
  --output-dir=/tmp/chan_4151_1 \
  --lookupd-http-address=127.0.0.1:4161

docker exec -it nsqlookupd nsq_to_file \
  --topic=test --channel=chan_4251_1 \
  --output-dir=/tmp/chan_4251_1 \
  --lookupd-http-address=127.0.0.1:4161
producer
# Publish a message to the topic. Every channel of the topic receives a copy,
# and within a channel the copy is load-balanced to one of its consumers.
curl -d 'hello world 4151' 'http://127.0.0.1:4151/pub?topic=test'
curl -d 'hello world 4251' 'http://127.0.0.1:4251/pub?topic=test'
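The two curl calls can also be issued from Go against the same nsqd HTTP /pub endpoint. The following is a minimal sketch under the port layout used in this article; pubURL and publishHTTP are illustrative helper names, not part of any NSQ library.

```go
package main

import (
	"bytes"
	"fmt"
	"net/http"
	"net/url"
	"time"
)

// pubURL (hypothetical helper) builds the nsqd HTTP publish endpoint for a topic.
func pubURL(baseURL, topic string) string {
	return baseURL + "/pub?topic=" + url.QueryEscape(topic)
}

// publishHTTP (hypothetical helper) posts one message body to a single nsqd node.
func publishHTTP(client *http.Client, baseURL, topic string, body []byte) error {
	resp, err := client.Post(pubURL(baseURL, topic), "application/octet-stream", bytes.NewReader(body))
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("nsqd %s returned %s", baseURL, resp.Status)
	}
	return nil
}

func main() {
	client := &http.Client{Timeout: 3 * time.Second}
	// HTTP addresses of nsqd0 and nsqd1 as published on the host above.
	nodes := []string{"http://127.0.0.1:4151", "http://127.0.0.1:4251"}
	for _, node := range nodes {
		if err := publishHTTP(client, node, "test", []byte("hello world")); err != nil {
			fmt.Println("publish failed:", err)
			continue
		}
		fmt.Println("published to", node)
	}
}
```

A node that is down only produces a log line here, so the remaining node still receives the message.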
High availability scenario
Replicas and idempotent consumption
A high-availability cluster naturally needs replicas, but an NSQ cluster has no mechanism for synchronizing data between nodes. Unlike more advanced queues, which maintain replicas through built-in data synchronization, NSQ leaves replica handling to application code.
Take nsqd0 and nsqd1 as examples. How can the cluster be made highly available? Create a topic and a channel with the same names on both nsqd0 and nsqd1 (topic_ha and channel_replic; test_ha and chan_replic in the commands below), and send every message to both nodes when publishing.
When consumers subscribe through lookupd, they subscribe to channel_replic on every node that hosts topic_ha. Because the same message is sent to both nsqd0 and nsqd1, topic_ha effectively keeps a backup copy, and consumers must process messages idempotently so that the same message is not handled twice. When one of the nsqd nodes goes down, business messages can still be delivered and consumed normally.
curl -X POST 'http://127.0.0.1:4151/topic/create?topic=test_ha'
curl -X POST 'http://127.0.0.1:4251/topic/create?topic=test_ha'
curl -X POST 'http://127.0.0.1:4151/channel/create?topic=test_ha&channel=chan_replic'
curl -X POST 'http://127.0.0.1:4251/channel/create?topic=test_ha&channel=chan_replic'

# Or use the Go code at the end of the article to try cluster delivery / consumption
docker exec -it nsqlookupd nsq_to_file \
  --topic=test_ha --channel=chan_replic \
  --output-dir=/tmp/chan_replic \
  --lookupd-http-address=127.0.0.1:4161
The log shows that all nodes hosting this topic and channel are discovered through nsqlookupd:
2022/03/03 09:49:34 INF 1 [test_ha/chan_replic] querying nsqlookupd http://127.0.0.1:4161/lookup?topic=test_ha
2022/03/03 09:49:34 INF 1 [test_ha/chan_replic] (10.10.31.147:4150) connecting to nsqd
2022/03/03 09:49:34 INF 1 [test_ha/chan_replic] (10.10.31.147:4250) connecting to nsqd
Basic concepts
- An NSQ high-availability cluster does not synchronize replicas automatically. If you have N nodes, you must create the topic with the same name on all N nodes and publish each message to every one of them.
- The best way to consume from NSQ is to connect to the lookupd service and query which nodes host the subscribed topic. Consumers subscribe by topic and receive messages from every node that hosts it.
- A channel is a consumer group: messages are load-balanced among the consumers inside a group, while separate groups relate to the topic as independent subscribers, much like consumer groups in Kafka. Consumers that join the same group share the load; different groups each receive every message published to the topic.
- On the same node, consumers that subscribe to the same topic and the same channel join one consumer group, and messages are load-balanced within that group.
- On the same node, consumers that subscribe to the same topic through different channels are in a publish/subscribe relationship: each message reaches at least one consumer in every consumer group.
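The channel semantics in the list above can be illustrated with a toy in-memory model. It is only a sketch of the concept, not how nsqd is implemented: the types Topic, Channel and Delivery are invented here, and strict round-robin stands in for NSQ's real dispatching, which hands a message to whichever connected consumer is ready.

```go
package main

import "fmt"

// Channel models a consumer group: each message copy goes to one member.
type Channel struct {
	Name      string
	Consumers []string
	next      int
}

// Topic models a topic on a single nsqd node.
type Topic struct {
	Channels []*Channel
}

// Delivery records which consumer of which channel received a message.
type Delivery struct {
	Channel, Consumer, Msg string
}

// Publish copies msg to every channel (publish/subscribe between groups)
// and picks one consumer per channel (load balancing inside a group).
func (t *Topic) Publish(msg string) []Delivery {
	var out []Delivery
	for _, ch := range t.Channels {
		if len(ch.Consumers) == 0 {
			continue // real nsqd would queue the copy until a consumer connects
		}
		c := ch.Consumers[ch.next%len(ch.Consumers)]
		ch.next++
		out = append(out, Delivery{Channel: ch.Name, Consumer: c, Msg: msg})
	}
	return out
}

func main() {
	t := &Topic{Channels: []*Channel{
		{Name: "chan_a", Consumers: []string{"c1", "c2"}},
		{Name: "chan_b", Consumers: []string{"c3"}},
	}}
	for _, msg := range []string{"m1", "m2"} {
		for _, d := range t.Publish(msg) {
			fmt.Printf("%s -> %s/%s\n", d.Msg, d.Channel, d.Consumer)
		}
	}
}
```

In this model m1 goes to c1 and c3, and m2 goes to c2 and c3: both channels see every message, while the two members of chan_a take turns.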
Example (cluster delivery / consumption)
nsqProducer
Here I wrap two steps into helpers: looking up all nsqd nodes that host a topic through lookupd, and creating a TCP producer for each of them. This way, publishing to a topic that is spread across several nsqd nodes no longer requires sending to each node by hand.
package main

import (
	"encoding/json"
	"errors"
	"flag"
	"fmt"
	"log"
	"net/http"
	"net/url"
	"os"
	"os/signal"
	"strconv"
	"syscall"
	"time"

	"github.com/nsqio/go-nsq"
)

// TopicProducers maps a topic to one TCP producer per nsqd node hosting it.
var TopicProducers map[string][]*nsq.Producer

// LookupTopicRes is the response of nsqlookupd's /lookup endpoint.
type LookupTopicRes struct {
	Channels  []string       `json:"channels"`
	Producers []ProducerInfo `json:"producers"`
}

type ProducerInfo struct {
	RemoteAddress    string `json:"remote_address"`
	Hostname         string `json:"hostname"`
	BroadcastAddress string `json:"broadcast_address"`
	TcpPort          int    `json:"tcp_port"`
	HttpPort         int    `json:"http_port"`
	Version          string `json:"version"`
}

func main() {
	var topic string
	flag.StringVar(&topic, "topic", "test", "topic name default test")
	flag.Parse()

	NewTopicProducer(topic)

	// Publish one message every two seconds.
	go func() {
		timerTicker := time.Tick(2 * time.Second)
		for {
			<-timerTicker
			totalNode, failedNode, err := PublishTopicMsg(topic, []byte("hello nsq "+time.Now().Format("2006-01-02 15:04:05")))
			if err != nil {
				log.Fatalln("PublishTopicMsg err topic", topic, "err", err.Error())
			}
			log.Println("PublishTopicMsg ok topic", topic, "totalNode", totalNode, "failedNode", failedNode)
		}
	}()

	// wait for signal to exit
	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
	sigMsg := <-sigChan
	log.Println("sigMsg", sigMsg)

	// Gracefully stop the producers.
	for _, producers := range TopicProducers {
		for _, producer := range producers {
			producer.Stop()
		}
	}
}

// NewTopicProducer
// Obtain all nsqd nodes of topic and establish a TCP producer for each
func NewTopicProducer(topic string) {
	TopicProducers = make(map[string][]*nsq.Producer)
	config := nsq.NewConfig()

	topicNodeAddr, err := getTopicNodeAddrSet(topic)
	if err != nil {
		log.Fatalln("getTopicNodeAddrSet err topic", topic, "err", err.Error())
	}

	var producers []*nsq.Producer
	for _, addr := range topicNodeAddr {
		producer, err := nsq.NewProducer(addr, config)
		if err != nil {
			log.Fatalln("newProducer err topic", topic, "err", err.Error())
		}
		producers = append(producers, producer)
	}
	TopicProducers[topic] = producers
}

// PublishTopicMsg
// Cluster-mode publish: send the message to every node that hosts the topic
func PublishTopicMsg(topic string, msg []byte) (totalNode int, failedNode int, err error) {
	producers, ok := TopicProducers[topic]
	if !ok {
		return 0, 0, errors.New("PublishTopicMsg err topic not exists")
	}
	totalNode = len(producers)
	for _, producer := range producers {
		if errPub := producer.Publish(topic, msg); errPub != nil {
			// A failed node is counted, not fatal: the other nodes still get the message.
			log.Println("Publish err node", producer.String(), "err", errPub.Error())
			failedNode++
		}
	}
	return
}

// Get the nsqd node collection of topic
func getTopicNodeAddrSet(topic string) ([]string, error) {
	resp, err := http.Get("http://127.0.0.1:4161/lookup?topic=" + url.QueryEscape(topic))
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("lookupd returned %s", resp.Status)
	}

	lookupTopicRes := &LookupTopicRes{}
	if err := json.NewDecoder(resp.Body).Decode(lookupTopicRes); err != nil {
		return nil, err
	}

	var topicNodeAddrArr []string
	for _, producer := range lookupTopicRes.Producers {
		topicNodeAddrArr = append(topicNodeAddrArr, producer.BroadcastAddress+":"+strconv.Itoa(producer.TcpPort))
	}
	return topicNodeAddrArr, nil
}
nsqConsumer
The consumer uses lookupd to discover every nsqd node that hosts the topic and channel, then subscribes to all of them.
package main

import (
	"flag"
	"log"
	"os"
	"os/signal"
	"syscall"

	"github.com/nsqio/go-nsq"
)

type nsqMessageHandler struct{}

// HandleMessage implements the Handler interface.
func (h *nsqMessageHandler) HandleMessage(m *nsq.Message) error {
	if len(m.Body) == 0 {
		// Returning nil will automatically send a FIN command to NSQ to mark the message as processed.
		// In this case, a message with an empty body is simply ignored/discarded.
		return nil
	}

	// do whatever actual message processing is desired
	log.Println("HandleMessage nsqd:", m.NSQDAddress, "msg:", string(m.Body))

	// Returning a non-nil error will automatically send a REQ command to NSQ to re-queue the message.
	return nil
}

func main() {
	var topic string
	var channel string
	var count int
	var consumerGroup []*nsq.Consumer

	flag.StringVar(&topic, "topic", "test", "topic name default test")
	flag.StringVar(&channel, "channel", "test", "channel name default test")
	flag.IntVar(&count, "count", 1, "consumer count default 1")
	flag.Parse()

	// Instantiate consumers that will subscribe to the provided channel.
	config := nsq.NewConfig()
	config.MaxInFlight = 10 // The maximum number of messages a consumer can process at the same time

	for i := 0; i < count; i++ {
		consumer, err := nsq.NewConsumer(topic, channel, config)
		if err != nil {
			log.Fatalln("NewConsumer err:", err.Error())
		}

		// Set the Handler for messages received by this Consumer. Can be called multiple times.
		// See also AddConcurrentHandlers.
		consumer.AddHandler(&nsqMessageHandler{})

		// Use nsqlookupd to discover nsqd instances.
		// See also ConnectToNSQD, ConnectToNSQDs, ConnectToNSQLookupds.
		// All nsqd instances that host the current topic will be subscribed.
		// In cluster mode the producer sends each message to every instance hosting the topic,
		// so when some instances go down, consumers still get messages through the others.
		// ! Messages must be processed idempotently here!
		err = consumer.ConnectToNSQLookupd("localhost:4161")
		if err != nil {
			log.Fatalln("ConnectToNSQLookupd err:", err.Error())
		}
		log.Println("ConnectToNSQLookupd success topic:", topic, "channel:", channel)

		consumerGroup = append(consumerGroup, consumer)
	}

	// wait for signal to exit
	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
	sigMsg := <-sigChan
	log.Println("sigMsg", sigMsg)

	// Gracefully stop the consumers and wait until each one has shut down.
	for _, consumer := range consumerGroup {
		consumer.Stop()
		<-consumer.StopChan
	}
}
Run
# Run each command in its own terminal
go run nsqConsumer.go -topic test_ha -channel chan_replic -count=2
go run nsqProducer.go -topic test_ha
The two consumers load-balance with each other and form consumer group A. Consumer group A subscribes to chan_replic on both nsqd0 and nsqd1. The producer publishes to test_ha in cluster mode, so nsqd0 and nsqd1 each receive the message and each deliver their copy to consumer group A. Which consumer in group A handles a given copy depends on load balancing.