How I Implemented a Super Mini Message Queue in Go

This article comes from the WeChat official account "Go language circle".
Reference link: www.jb51.net/article/231909.htm

Prerequisites:

  • Basic Go syntax
  • The message queue concept: producer, consumer, and queue

Objectives

  • Don't overthink implementation complexity; time is limited, so keep it mini. To what extent:
  • Use a doubly linked list data structure as the queue
  • Support multiple topics, with producers producing messages and consumers consuming them
  • Support concurrent writes
  • Support consumers reading messages, with deletion from the queue after ACK
  • Messages are not lost (persistence)
  • High performance (something to think about first)


Design
Overall architecture


Protocol
The communication layer uses TCP at the bottom; mq is a custom protocol built on top of TCP. The protocol is as follows:

type Msg struct {
   Id int64
   TopicLen int64
   Topic string
   // 1-consumer 2-producer 3-consumer-ack 4-error
   MsgType int64 // message type
   Len int64 // payload length
   Payload []byte // message payload
}

Payload is a byte array because, whatever the data is, it can only be carried as bytes. Msg carries messages produced by producers, messages consumed by consumers, ACKs, and error messages. The first two carry payloads; for the latter two the payload is empty and the length is zero.
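For intuition, here is the wire layout of a sample message, a sketch derived from the encoder shown below (every integer field is an 8-byte little-endian int64):

Msg{Id: 1102, Topic: "topic-test", MsgType: 2, Payload: []byte("I")}

bytes [0, 8)    Id       = 1102
bytes [8, 16)   TopicLen = 10
bytes [16, 26)  Topic    = "topic-test"
bytes [26, 34)  MsgType  = 2
bytes [34, 42)  Len      = 1
bytes [42, 43)  Payload  = "I"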

The codec for the protocol is just byte processing. Next come two functions: one from bytes to Msg, and one from Msg to bytes.

func BytesToMsg(reader io.Reader) Msg {

   m := Msg{}
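   // assume the whole message arrives in one read and fits in 128 bytes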
   var buf [128]byte
   n, err := reader.Read(buf[:])
   if err != nil {
      fmt.Println("read failed, err:", err)
      return m
   }
   fmt.Println("read bytes:", n)
   // id
   buff := bytes.NewBuffer(buf[0:8])
   binary.Read(buff, binary.LittleEndian, &m.Id)
   // topiclen
   buff = bytes.NewBuffer(buf[8:16])
   binary.Read(buff, binary.LittleEndian, &m.TopicLen)
   // topic
   msgLastIndex := 16 + m.TopicLen
   m.Topic = string(buf[16: msgLastIndex])
   // msgtype
   buff = bytes.NewBuffer(buf[msgLastIndex : msgLastIndex+8])
   binary.Read(buff, binary.LittleEndian, &m.MsgType)
   // len starts after the 8 MsgType bytes, at msgLastIndex+8
   buff = bytes.NewBuffer(buf[msgLastIndex+8 : msgLastIndex+16])
   binary.Read(buff, binary.LittleEndian, &m.Len)

   if m.Len <= 0 {
      return m
   }

   // bound the payload by Len so trailing zero bytes are excluded
   m.Payload = buf[msgLastIndex+16 : msgLastIndex+16+m.Len]
   return m
}

func MsgToBytes(msg Msg) []byte {
   msg.TopicLen = int64(len([]byte(msg.Topic)))
   msg.Len = int64(len(msg.Payload)) // Payload is already a byte slice

   var data []byte
   buf := bytes.NewBuffer([]byte{})
   binary.Write(buf, binary.LittleEndian, msg.Id)
   data = append(data, buf.Bytes()...)

   buf = bytes.NewBuffer([]byte{})
   binary.Write(buf, binary.LittleEndian, msg.TopicLen)
   data = append(data, buf.Bytes()...)

   data = append(data, []byte(msg.Topic)...)

   buf = bytes.NewBuffer([]byte{})
   binary.Write(buf, binary.LittleEndian, msg.MsgType)
   data = append(data, buf.Bytes()...)

   buf = bytes.NewBuffer([]byte{})
   binary.Write(buf, binary.LittleEndian, msg.Len)
   data = append(data, buf.Bytes()...)
   data = append(data, msg.Payload...)

   return data
}
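As a quick sanity check, the two functions should round-trip. A hypothetical snippet, assuming it runs inside the same package with bytes and fmt imported:

msg := Msg{Id: 1, Topic: "topic-test", MsgType: 2, Payload: []byte("hello")}
data := MsgToBytes(msg)
decoded := BytesToMsg(bytes.NewReader(data))
fmt.Println(decoded.Id, decoded.Topic, string(decoded.Payload)) // 1 topic-test hello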


Queue
Use container/list to implement first-in-first-out: the producer writes at the tail of the queue and the consumer reads from the head.

package broker

import (
   "container/list"
   "sync"
)

type Queue struct {
   len int
   data list.List
}

var lock sync.Mutex

func (queue *Queue) offer(msg Msg) {
   // lock so concurrent producers cannot corrupt the list
   lock.Lock()
   queue.data.PushBack(msg)
   queue.len = queue.data.Len()
   lock.Unlock()
}

func (queue *Queue) poll() Msg {
   lock.Lock()
   defer lock.Unlock()
   if queue.len == 0 {
      return Msg{}
   }
   // read the head without removing it; removal happens on ACK via delete
   msg := queue.data.Front()
   return msg.Value.(Msg)
}

func (queue *Queue) delete(id int64) {
   lock.Lock()
   for msg := queue.data.Front(); msg != nil; msg = msg.Next() {
      if msg.Value.(Msg).Id == id {
         queue.data.Remove(msg)
         queue.len = queue.data.Len()
         break
      }
   }
   lock.Unlock()
}

The method offer appends a message to the queue, poll reads the element at the head, and delete removes a message from the queue by message ID. Wrapping the List in a Queue struct hides the underlying operations and exposes only the basic ones to callers.

The delete operation removes a message from the queue only after the consumer has consumed it successfully and sent an ACK. Because multiple consumers may consume at the same time, we lock when entering the critical section (hmm, locking will certainly affect performance).
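A minimal usage sketch of the three operations (hypothetical, same package):

q := &Queue{}
q.data.Init() // a list.List zero value is also ready to use
q.offer(Msg{Id: 1})
q.offer(Msg{Id: 2})
head := q.poll()  // returns Msg{Id: 1} but leaves it in the queue
q.delete(head.Id) // the message is removed only on ACK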


Broker
As the server, the broker is responsible for accepting connections and for receiving and responding to requests.

package broker

import (
   "bufio"
   "fmt"
   "net"
   "os"
   "sync"
   "time"
)

var topics = sync.Map{}

// handleErr reports a panic in Process back to the client as an error
// message (MsgType 4). It must be called via defer so recover() works.
func handleErr(conn net.Conn) {
   if err := recover(); err != nil {
      fmt.Println(err)
      conn.Write(MsgToBytes(Msg{MsgType: 4}))
   }
}

func Process(conn net.Conn) {
   defer conn.Close()
   defer handleErr(conn)
   reader := bufio.NewReader(conn)
   msg := BytesToMsg(reader)
   queue, ok := topics.Load(msg.Topic)
   var res Msg
   if msg.MsgType == 1 {
      // consumer
      if queue == nil || queue.(*Queue).len == 0 {
         return
      }
      msg = queue.(*Queue).poll()
      msg.MsgType = 1
      res = msg
   } else if msg.MsgType == 2 {
      // producer
      if !ok {
         queue = &Queue{}
         queue.(*Queue).data.Init()
         topics.Store(msg.Topic, queue)
      }
      queue.(*Queue).offer(msg)
      res = Msg{Id: msg.Id, MsgType: 2}
   } else if msg.MsgType == 3 {
      // consumer ack
      if queue == nil {
         return
      }
      queue.(*Queue).delete(msg.Id)

   }
   conn.Write(MsgToBytes(res))

}

When MsgType equals 1, a message is consumed directly; when MsgType equals 2, a producer is producing a message, and if the topic does not have a queue yet, a new one is created and stored under that topic; when MsgType equals 3, the consumer has consumed successfully and the message can be deleted from the queue.


Persistence
We said messages should not be lost. The implementation here is incomplete: persistence is only partially realized. The idea is to serialize the messages in each topic's queue in the protocol format, and restore them from the files when the broker starts.

Persistence has to consider incremental vs. full snapshots and how long data needs to be kept; these choices affect implementation difficulty and performance (think about how Kafka and Redis persist). Here we simply implement the easy version: a timer saves everything periodically.

func Save() {
   // tick once a minute; note that time.NewTicker takes a Duration,
   // so a bare 60 would mean 60 nanoseconds
   ticker := time.NewTicker(60 * time.Second)
   defer ticker.Stop()
   for range ticker.C {
      topics.Range(func(key, value interface{}) bool {
         if value == nil {
            return true
         }
         // os.Create truncates, so every run writes a full snapshot
         file, err := os.Create(key.(string))
         if err != nil {
            return true
         }
         for msg := value.(*Queue).data.Front(); msg != nil; msg = msg.Next() {
            file.Write(MsgToBytes(msg.Value.(Msg)))
         }
         file.Close()
         // return true to keep iterating over the remaining topics
         return true
      })
   }
}

One question: when the delete operation above runs, should the file also delete the corresponding message? The answer is yes. If it doesn't, the stale entry lingers until the next full snapshot overwrites it, and there is dirty data in between.
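Restoring from the file at broker startup is left unimplemented here. A possible sketch (the function name Load is an assumption, as is calling it once per topic before accepting connections; it assumes encoding/binary, io, and os are imported in package broker):

// Load re-parses the variable-length frames that Save wrote back-to-back
// and rebuilds the topic's queue. It parses field by field rather than
// reusing BytesToMsg, which reads a fixed 128-byte chunk.
func Load(topic string) {
   file, err := os.Open(topic)
   if err != nil {
      return // no snapshot for this topic yet
   }
   defer file.Close()

   queue := &Queue{}
   queue.data.Init()
   for {
      var m Msg
      if err := binary.Read(file, binary.LittleEndian, &m.Id); err != nil {
         break // io.EOF: no more frames
      }
      binary.Read(file, binary.LittleEndian, &m.TopicLen)
      topicBuf := make([]byte, m.TopicLen)
      io.ReadFull(file, topicBuf)
      m.Topic = string(topicBuf)
      binary.Read(file, binary.LittleEndian, &m.MsgType)
      binary.Read(file, binary.LittleEndian, &m.Len)
      if m.Len > 0 {
         m.Payload = make([]byte, m.Len)
         io.ReadFull(file, m.Payload)
      }
      queue.offer(m)
   }
   topics.Store(topic, queue)
}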


The following is the startup logic

package main

import (
   "awesomeProject/broker"
   "fmt"
   "net"
)

func main() {
   listen, err := net.Listen("tcp", "127.0.0.1:12345")
   if err != nil {
      fmt.Print("listen failed, err:", err)
      return
   }
   go broker.Save()
   for {
      conn, err := listen.Accept()
      if err != nil {
         fmt.Print("accept failed, err:", err)
         continue
      }
      go broker.Process(conn)

   }
}


Producer

package main

import (
   "awesomeProject/broker"
   "fmt"
   "net"
)

func produce() {
   conn, err := net.Dial("tcp", "127.0.0.1:12345")
   if err != nil {
      fmt.Print("connect failed, err:", err)
      return // without this, conn would be nil and Close would panic
   }
   defer conn.Close()

   msg := broker.Msg{Id: 1102, Topic: "topic-test", MsgType: 2, Payload: []byte("I")}
   n, err := conn.Write(broker.MsgToBytes(msg))
   if err != nil {
      fmt.Print("write failed, err:", err)
   }

   fmt.Println("written bytes:", n)
}


Consumer

package main

import (
   "awesomeProject/broker"
   "bytes"
   "fmt"
   "net"
)

func consume() {
   conn, err := net.Dial("tcp", "127.0.0.1:12345")
   if err != nil {
      fmt.Print("connect failed, err:", err)
      return
   }
   defer conn.Close()

   msg := broker.Msg{Topic: "topic-test", MsgType: 1}

   n, err := conn.Write(broker.MsgToBytes(msg))
   if err != nil {
      fmt.Println("write failed, err:", err)
      return
   }
   fmt.Println("n", n)

   var res [128]byte
   conn.Read(res[:])
   buf := bytes.NewBuffer(res[:])
   receMsg := broker.BytesToMsg(buf)
   fmt.Print(receMsg)

   // ack on a fresh connection
   ackConn, err := net.Dial("tcp", "127.0.0.1:12345")
   if err != nil {
      fmt.Println("connect failed, err:", err)
      return
   }
   defer ackConn.Close()
   l, e := ackConn.Write(broker.MsgToBytes(broker.Msg{Id: receMsg.Id, Topic: receMsg.Topic, MsgType: 3}))
   if e != nil {
      fmt.Println("write failed, err:", e)
   }
   fmt.Println("l:", l)
}

The consumer creates a new connection for the ACK. If it reused the old connection, the server would need to keep reading data from the conn until it ends. Think about it: RabbitMQ, for example, has automatic and manual ACKs. A manual ACK effectively requires a new request, because the server doesn't know when the client will send it; with automatic ACK the same connection could be used. Here we simply create a new connection.


Startup
Start the broker first, then the producer to send messages to the queue, and finally the consumer.
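Note that produce and consume above are plain functions; to actually run them, give each program a small main of its own (a minimal sketch):

package main

func main() {
   produce() // in the consumer program, call consume() instead
}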
