The article comes from WeChat official account: Go language circle.
Reference link: www.jb51.net/article/231909.htm
Prerequisites:
- Basic Go syntax
- Message queue concepts: a message queue has three parts, the producer, the consumer, and the queue
Objectives
- Keep the implementation simple. Time is limited, so this is only a mini version; the points below define how far it goes
- Use a doubly linked list as the queue
- Support multiple topics, to which producers publish messages and from which consumers consume them
- Support concurrent writes
- Let consumers read a message and delete it from the queue once it has been acknowledged
- Messages are not lost (persistence)
- High performance (only an aspiration for now)
Design
Overall architecture
Protocol
TCP is the underlying transport. The MQ uses a custom protocol on top of TCP, defined as follows:
type Msg struct {
	Id       int64
	TopicLen int64
	Topic    string
	// 1-consumer 2-producer 3-consumer-ack 4-error
	MsgType int64  // Message type
	Len     int64  // Payload length
	Payload []byte // Message body
}
Payload is a byte slice because, whatever the data is, it is transmitted as bytes. Msg carries four kinds of messages: messages published by producers, messages delivered to consumers, acks, and errors. The first two carry a payload; the last two have an empty payload and a length of zero.
Encoding and decoding the protocol is a matter of byte handling. The following two functions convert from bytes to Msg and from Msg to bytes:
package broker

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
)

// maxFieldLen caps TopicLen and Len so that a malformed frame cannot trigger a
// huge allocation. The value is an arbitrary choice, not part of the protocol.
const maxFieldLen = 1 << 20

// BytesToMsg decodes one Msg from reader. All integers are little-endian and the
// fields arrive in the order Id, TopicLen, Topic, MsgType, Len, Payload.
// On a read error it logs the error and returns an empty Msg.
func BytesToMsg(reader io.Reader) Msg {
	m := Msg{}
	// readInt reads one 8-byte integer field.
	readInt := func(v *int64) bool {
		if err := binary.Read(reader, binary.LittleEndian, v); err != nil {
			fmt.Println("read failed, err:", err)
			return false
		}
		return true
	}
	// readBytes reads exactly n bytes of variable-length data.
	readBytes := func(n int64) ([]byte, bool) {
		if n < 0 || n > maxFieldLen {
			fmt.Println("read failed, invalid length:", n)
			return nil, false
		}
		b := make([]byte, n)
		if _, err := io.ReadFull(reader, b); err != nil {
			fmt.Println("read failed, err:", err)
			return nil, false
		}
		return b, true
	}

	// id, topiclen
	if !readInt(&m.Id) || !readInt(&m.TopicLen) {
		return Msg{}
	}
	// topic
	topic, ok := readBytes(m.TopicLen)
	if !ok {
		return Msg{}
	}
	m.Topic = string(topic)
	// msgtype, then payload length (the field that follows msgtype)
	if !readInt(&m.MsgType) || !readInt(&m.Len) {
		return Msg{}
	}
	if m.Len <= 0 {
		return m
	}
	// payload: exactly Len bytes, without trailing buffer contents
	if m.Payload, ok = readBytes(m.Len); !ok {
		return Msg{}
	}
	return m
}

// MsgToBytes encodes msg in the same field order, filling in TopicLen and Len.
func MsgToBytes(msg Msg) []byte {
	msg.TopicLen = int64(len(msg.Topic))
	msg.Len = int64(len(msg.Payload))

	buf := new(bytes.Buffer)
	binary.Write(buf, binary.LittleEndian, msg.Id)
	binary.Write(buf, binary.LittleEndian, msg.TopicLen)
	buf.WriteString(msg.Topic)
	binary.Write(buf, binary.LittleEndian, msg.MsgType)
	binary.Write(buf, binary.LittleEndian, msg.Len)
	buf.Write(msg.Payload)
	return buf.Bytes()
}
Queue
container/list provides the first-in, first-out behavior. The producer writes at the tail of the queue and the consumer reads from the head.
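package broker

import (
	"container/list"
	"sync"
)

// Queue is a FIFO of Msg values backed by a doubly linked list.
type Queue struct {
	mu   sync.Mutex // guards len and data
	len  int
	data list.List
}

// offer appends msg to the tail of the queue.
func (queue *Queue) offer(msg Msg) {
	queue.mu.Lock()
	defer queue.mu.Unlock()
	queue.data.PushBack(msg)
	queue.len = queue.data.Len()
}

// poll returns the message at the head of the queue without removing it.
// It returns an empty Msg when the queue is empty.
func (queue *Queue) poll() Msg {
	queue.mu.Lock()
	defer queue.mu.Unlock()
	front := queue.data.Front()
	if front == nil {
		return Msg{}
	}
	return front.Value.(Msg)
}

// delete removes the message with the given id, if it is present.
func (queue *Queue) delete(id int64) {
	queue.mu.Lock()
	defer queue.mu.Unlock()
	for msg := queue.data.Front(); msg != nil; msg = msg.Next() {
		if msg.Value.(Msg).Id == id {
			queue.data.Remove(msg)
			queue.len = queue.data.Len()
			break
		}
	}
}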
The offer method appends a message to the Queue, poll reads the message at the head of the Queue, and delete removes a message from the Queue by message ID. Wrapping the List in a Queue struct is worthwhile: the List is only the underlying data structure, and the wrapper hides its lower-level operations and exposes only the basic ones to callers.
The delete operation removes a message from the queue after the consumer has consumed it successfully and sent an ACK. Because consumers can consume several messages at the same time, the queue is locked on entering the critical section (locking will of course cost some performance).
Broker
In the server role, the broker accepts connections, receives requests, and responds to them.
package broker

import (
	"bufio"
	"fmt"
	"net"
	"os" // used by Save
	"sync"
	"time" // used by Save
)

// topics maps a topic name to its *Queue.
var topics = sync.Map{}

// handleErr must be deferred directly, because recover only works when it is
// called by the deferred function itself. On a panic it logs the cause and
// replies with an error message (MsgType 4).
func handleErr(conn net.Conn) {
	if err := recover(); err != nil {
		fmt.Println("recovered from panic:", err)
		conn.Write(MsgToBytes(Msg{MsgType: 4}))
	}
}

// Process handles a single request on conn and then closes the connection.
func Process(conn net.Conn) {
	defer conn.Close()
	defer handleErr(conn)

	reader := bufio.NewReader(conn)
	msg := BytesToMsg(reader)
	queue, ok := topics.Load(msg.Topic)
	var res Msg
	if msg.MsgType == 1 { // consumer
		if !ok || queue.(*Queue).len == 0 {
			return
		}
		msg = queue.(*Queue).poll()
		msg.MsgType = 1
		res = msg
	} else if msg.MsgType == 2 { // producer
		if !ok {
			// LoadOrStore keeps concurrent producers from creating two queues for one topic.
			queue, _ = topics.LoadOrStore(msg.Topic, &Queue{})
		}
		queue.(*Queue).offer(msg)
		res = Msg{Id: msg.Id, MsgType: 2}
	} else if msg.MsgType == 3 { // consumer ack
		if !ok {
			return
		}
		queue.(*Queue).delete(msg.Id)
	}
	conn.Write(MsgToBytes(res))
}
When MsgType is 1, a message is consumed directly. When MsgType is 2, a producer is publishing a message; if no queue exists yet for the topic, a new queue is created and stored under that topic. When MsgType is 3, the consumer has consumed successfully and the message can be removed from the queue.
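Creating a topic's queue on first use is a spot where two producers can race. As a minimal sketch of one way to handle it, the standalone example below uses sync.Map's LoadOrStore so that concurrent callers always end up with the same queue. The function names queueFor and demo and the simplified Queue type are hypothetical and exist only for this illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// Queue is a simplified placeholder for the article's Queue type.
type Queue struct {
	mu   sync.Mutex
	msgs []string
}

// topics maps a topic name to its *Queue.
var topics sync.Map

// queueFor returns the queue for topic, creating it on first use.
// LoadOrStore stores the new queue only if the key is absent, so concurrent
// callers all receive the same *Queue.
func queueFor(topic string) *Queue {
	q, _ := topics.LoadOrStore(topic, &Queue{})
	return q.(*Queue)
}

// demo publishes ten messages to one topic from ten goroutines and returns
// how many ended up in that topic's queue.
func demo() int {
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			q := queueFor("topic-test")
			q.mu.Lock()
			q.msgs = append(q.msgs, fmt.Sprintf("msg-%d", i))
			q.mu.Unlock()
		}(i)
	}
	wg.Wait()
	return len(queueFor("topic-test").msgs)
}

func main() {
	fmt.Println("messages in topic-test:", demo())
}
```

A separate Load followed by Store leaves a window in which two goroutines both see the topic as missing and one queue overwrites the other; LoadOrStore closes that window at the cost of allocating a Queue that is sometimes discarded.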
We said that messages must not be lost. What has been implemented so far does not fully achieve this, so I added persistence (which is itself not fully implemented). The idea is to serialize the messages in each topic's queue to a file in the protocol format, and to recover them from the file when the broker starts.
Persistence raises several questions: whether to save incrementally or in full, and how long the data must be kept. The answers affect both the difficulty of the implementation and its performance (consider how Kafka and Redis handle persistence). Here a simple approach is enough: a timer saves the queues periodically.
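// Save periodically writes every topic's queue to a file named after the topic.
// Each run rewrites the file in full, so acknowledged messages disappear from
// the file on the next save.
func Save() {
	// Assumes the intended interval is 60 seconds; a bare 60 means 60 nanoseconds.
	ticker := time.NewTicker(60 * time.Second)
	defer ticker.Stop()
	for range ticker.C {
		topics.Range(func(key, value interface{}) bool {
			if value == nil {
				return true // skip this topic, keep iterating
			}
			// Open for writing, create the file if missing, and truncate it for a full rewrite.
			file, err := os.OpenFile(key.(string), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
			if err != nil {
				println("open failed:", err.Error())
				return true
			}
			// Note: the list is read here without holding the queue's lock.
			for msg := value.(*Queue).data.Front(); msg != nil; msg = msg.Next() {
				file.Write(MsgToBytes(msg.Value.(Msg)))
			}
			file.Close()
			return true // returning false would stop after the first topic
		})
	}
}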
One question: when the delete operation above runs, must the corresponding message also be deleted from the file? The answer is yes. Otherwise the stale message remains until the next full save overwrites the file, and the file holds dirty data in the meantime.
The following is the startup logic
package main

import (
	"awesomeProject/broker"
	"fmt"
	"net"
)

func main() {
	listen, err := net.Listen("tcp", "127.0.0.1:12345")
	if err != nil {
		fmt.Println("listen failed, err:", err)
		return
	}
	// Persist the queues in the background.
	go broker.Save()
	for {
		conn, err := listen.Accept()
		if err != nil {
			fmt.Println("accept failed, err:", err)
			continue
		}
		// Handle each connection in its own goroutine.
		go broker.Process(conn)
	}
}
Producer
package main

import (
	"awesomeProject/broker"
	"fmt"
	"net"
)

func produce() {
	conn, err := net.Dial("tcp", "127.0.0.1:12345")
	if err != nil {
		fmt.Println("connect failed, err:", err)
		return // conn is nil here, so do not continue
	}
	defer conn.Close()

	// Publish one message to topic-test.
	msg := broker.Msg{Id: 1102, Topic: "topic-test", MsgType: 2, Payload: []byte("I")}
	n, err := conn.Write(broker.MsgToBytes(msg))
	if err != nil {
		fmt.Println("write failed, err:", err)
		return
	}
	fmt.Println(n)
}

func main() {
	produce()
}
Consumer
package main

import (
	"awesomeProject/broker"
	"fmt"
	"net"
)

func consume() {
	conn, err := net.Dial("tcp", "127.0.0.1:12345")
	if err != nil {
		fmt.Println("connect failed, err:", err)
		return // conn is nil here, so do not continue
	}
	defer conn.Close()

	// Ask the broker for the message at the head of the topic's queue.
	msg := broker.Msg{Topic: "topic-test", MsgType: 1}
	n, err := conn.Write(broker.MsgToBytes(msg))
	if err != nil {
		fmt.Println("write failed, err:", err)
		return
	}
	fmt.Println("n", n)

	// Decode the reply straight from the connection.
	receMsg := broker.BytesToMsg(conn)
	fmt.Println(receMsg)

	// ack on a new connection
	ackConn, err := net.Dial("tcp", "127.0.0.1:12345")
	if err != nil {
		fmt.Println("connect failed, err:", err)
		return
	}
	defer ackConn.Close()
	l, err := ackConn.Write(broker.MsgToBytes(broker.Msg{Id: receMsg.Id, Topic: receMsg.Topic, MsgType: 3}))
	if err != nil {
		fmt.Println("write failed, err:", err)
		return
	}
	fmt.Println("l:", l)
}

func main() {
	consume()
}
The consumer opens a new connection for the ack. Reusing the original connection would require the server to keep reading from conn until it is closed. By analogy with RabbitMQ, which offers automatic and manual acks: in this design a manual ack would need a new connection, because the broker does not know when the client will send it. An automatic ack could reuse the same connection, but for simplicity a new connection is created here.
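For comparison, here is a minimal sketch of the alternative mentioned above, in which the server keeps reading frames from one connection until the client closes it, so the consume request and its ack share a connection. It uses net.Pipe in place of a real TCP connection so that it runs on its own. The names serve, demo, encode, decode, and readChunk are hypothetical, and the head message is passed in directly instead of coming from a real queue.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
	"net"
)

// Msg keeps only the fields this sketch needs.
type Msg struct {
	Id      int64
	Topic   string
	MsgType int64
	Payload []byte
}

// encode writes one frame in the protocol's field order (little-endian).
func encode(m Msg) []byte {
	buf := new(bytes.Buffer)
	binary.Write(buf, binary.LittleEndian, m.Id)
	binary.Write(buf, binary.LittleEndian, int64(len(m.Topic)))
	buf.WriteString(m.Topic)
	binary.Write(buf, binary.LittleEndian, m.MsgType)
	binary.Write(buf, binary.LittleEndian, int64(len(m.Payload)))
	buf.Write(m.Payload)
	return buf.Bytes()
}

// readChunk reads an int64 length followed by that many bytes.
func readChunk(r io.Reader) ([]byte, error) {
	var n int64
	if err := binary.Read(r, binary.LittleEndian, &n); err != nil {
		return nil, err
	}
	if n < 0 || n > 1<<20 { // arbitrary safety cap for this sketch
		return nil, fmt.Errorf("invalid length %d", n)
	}
	b := make([]byte, n)
	_, err := io.ReadFull(r, b)
	return b, err
}

// decode reads one frame from r.
func decode(r io.Reader) (Msg, error) {
	var m Msg
	if err := binary.Read(r, binary.LittleEndian, &m.Id); err != nil {
		return m, err
	}
	topic, err := readChunk(r)
	if err != nil {
		return m, err
	}
	m.Topic = string(topic)
	if err := binary.Read(r, binary.LittleEndian, &m.MsgType); err != nil {
		return m, err
	}
	m.Payload, err = readChunk(r)
	return m, err
}

// serve handles frames on one connection until the peer closes it, so a
// consume request and its ack can share the connection. It returns acked ids.
func serve(conn net.Conn, head Msg) []int64 {
	defer conn.Close()
	var acked []int64
	for {
		req, err := decode(conn)
		if err != nil {
			return acked // typically io.EOF: the client closed the connection
		}
		switch req.MsgType {
		case 1: // consume: reply with the head of the queue
			conn.Write(encode(head))
		case 3: // ack: a real broker would delete the message here
			acked = append(acked, req.Id)
		}
	}
}

// demo runs one consume-then-ack exchange over an in-memory connection.
func demo() []int64 {
	client, server := net.Pipe()
	done := make(chan []int64)
	go func() { done <- serve(server, Msg{Id: 1102, Topic: "topic-test", MsgType: 1, Payload: []byte("I")}) }()
	client.Write(encode(Msg{Topic: "topic-test", MsgType: 1}))
	got, _ := decode(client)
	client.Write(encode(Msg{Id: got.Id, Topic: got.Topic, MsgType: 3}))
	client.Close()
	return <-done
}

func main() {
	fmt.Println(demo())
}
```

The cost of this approach is that each connection ties up a goroutine for as long as the client keeps it open, which is the complexity the one-request-per-connection design avoids.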
Start-up
Start the broker first, then the producer to send a message to the queue, and then the consumer to consume it.