I haven't blogged for a long time. I happen to be studying PBFT recently. Let's start with PBFT!
First, I'll give you the PBFT code written by the boss @byron1st: https://github.com/bigpicturelabs/simple_pbft
I will take you through the source code! Because many blogs on the Internet copy each other, it doesn't help you at all
The whole code logic is around this diagram!
Basic theory of PBFT algorithm: https://www.jianshu.com/p/cf1010f39b84
1. Service startup main function
func main() { nodeID := os.Args[1] // Here is the name of the company server := network.NewServer(nodeID) server.Start() }
This is the main function of the whole PBFT startup. You can see that you can get the value entered by the user as nodeID, then call the NewServer function in the network package to define a service (which can be understood as a client window of a node), and then start server.Start().
2.NewServer function
func NewServer(nodeID string) *Server { // A new node is created according to the passed NodeID. The default view of the node is 1000000, and the node starts three processes: dispatchMsg, alarmToDispatcher and resolveMsg node := NewNode(nodeID) // Start service server := &Server{node.NodeTable[nodeID], node} // Set route server.setRoute() return server }
We can see that the NewServer function mainly does three things:
- Create a node according to the passed nodeID
- Create a server service
- Set route
2.1 NewNode(nodeID)
Let's first look at the first thing, which is to create a node, enter the node class and see what the node is doing!
func NewNode(nodeID string) *Node { const viewID = 10000000000 // temporary. node := &Node{ // Hard-coded for test. NodeID: nodeID, NodeTable: map[string]string{ "Apple": "localhost:1111", "MS": "localhost:1112", "Google": "localhost:1113", "IBM": "localhost:1114", }, View: &View{ ID: viewID, Primary: "Apple", // The primary node is Apple }, // Consensus-related struct CurrentState: nil, CommittedMsgs: make([]*consensus.RequestMsg, 0), // Submitted information MsgBuffer: &MsgBuffer{ ReqMsgs: make([]*consensus.RequestMsg, 0), PrePrepareMsgs: make([]*consensus.PrePrepareMsg, 0), PrepareMsgs: make([]*consensus.VoteMsg, 0), CommitMsgs: make([]*consensus.VoteMsg, 0), }, // Channels MsgEntrance: make(chan interface{}), // Unbuffered information receiving channel MsgDelivery: make(chan interface{}), // Unbuffered message transmission channel Alarm: make(chan bool), } // Start message scheduler go node.dispatchMsg() // Start alarm trigger go node.alarmToDispatcher() // Start information voting go node.resolveMsg() return node }
The NewNode here actually creates a new node and defines some attribute information in the node structure. The node structure is as follows:
// node type Node struct { NodeID string NodeTable map[string]string // key=nodeID, value=url View *View CurrentState *consensus.State CommittedMsgs []*consensus.RequestMsg // kinda block. MsgBuffer *MsgBuffer MsgEntrance chan interface{} MsgDelivery chan interface{} Alarm chan bool }
We analyze NewNode step by step
- Initialization view number const viewid = 1000000000
- Defines some attribute information in the node structure
attribute | value | explain |
---|---|---|
NodeID | nodeID | Node ID |
NodeTable | map[string]string | Node index table |
View | &View | Set view number and master node |
CurrentState | nil | The default status of the current node is nil |
CommittedMsgs | make([]*consensus.RequestMsg, 0) | Submitted information |
MsgBuffer | &MsgBuffer | Buffer list of four message types |
MsgEntrance | make(chan interface{}) | Unbuffered information receiving channel |
MsgDelivery | make(chan interface{}) | Unbuffered message transmission channel |
Alarm | make(chan bool) | Warning channel |
- Then we can see that each node has three goroutine s open
- dispatchMsg
- alarmToDispatcher
- resolveMsg
Let's focus on two cooperative processes goroutine
2.1. 1 collaboration 1: dispatchMsg
func (node *Node) dispatchMsg() { for { select { case msg := <-node.MsgEntrance: // If a message is sent from the MsgEntrance channel, get msg err := node.routeMsg(msg) // routeMsg if err != nil { fmt.Println(err) // TODO: send err to ErrorChannel } case <-node.Alarm: err := node.routeMsgWhenAlarmed() if err != nil { fmt.Println(err) // TODO: send err to ErrorChannel } } } }
Here you need to know something about for select multiplexing!
We can see from the code of dispatchMsg that as long as there is a value in the MsgEnrance channel, it will be passed to an intermediate variable msg, and then the message will be routed and forwarded to routeMsg.
func (node *Node) routeMsg(msg interface{}) []error { switch msg.(type) { case *consensus.RequestMsg: if node.CurrentState == nil { // Copy buffered messages first. msgs := make([]*consensus.RequestMsg, len(node.MsgBuffer.ReqMsgs)) copy(msgs, node.MsgBuffer.ReqMsgs) // Append a newly arrived message. msgs = append(msgs, msg.(*consensus.RequestMsg)) // Empty the buffer. node.MsgBuffer.ReqMsgs = make([]*consensus.RequestMsg, 0) // Send messages. node.MsgDelivery <- msgs } else { node.MsgBuffer.ReqMsgs = append(node.MsgBuffer.ReqMsgs, msg.(*consensus.RequestMsg)) } ... Other types of information data return nil }
We first take RequestMsg as an example, because the execution logic of other types of message data is almost the same as RequestMsg! There are two options for nodes:
- When the current state is nil, it will consume Copy the information of requestmsg to the intermediate variable msgs, and then clear and reset! And send the information in msgs to MsgDelivery channel! (we still remember that three goroutines have been opened on the node. The other goroutines solvemsg will immediately receive the information in this channel.)
- When the current state is not nil, add it directly to the MsgBuffer buffer channel, which will involve the problem of array expansion (you can learn about it)
2.1. 2 collaboration 2: resolveMsg
func (node *Node) resolveMsg() { for { // Get cache information from scheduler msgs := <-node.MsgDelivery switch msgs.(type) { case []*consensus.RequestMsg: // Node voting decision information errs := node.resolveRequestMsg(msgs.([]*consensus.RequestMsg)) if len(errs) != 0 { for _, err := range errs { fmt.Println(err) } // TODO: send err to ErrorChannel } ... For other types of information data } } }
Here, I'll just explain it with RequestMsg! We can see node clearly MsgDelivery is waiting for the dispatcher dispatchMsg to deliver messages. Because the two buffer channels are unbuffered, no messages will be blocked here all the time!
Here is how to execute the resolveRequestMsg function:
// Information of node voting request stage func (node *Node) resolveRequestMsg(msgs []*consensus.RequestMsg) []error { errs := make([]error, 0) // Voting information for _, reqMsg := range msgs { err := node.GetReq(reqMsg) if err != nil { errs = append(errs, err) } } if len(errs) != 0 { return errs } return nil }
In the resolveRequestMsg code block, we can see that the main execution logic is to call the GetReq function
// The master node starts the global consensus func (node *Node) GetReq(reqMsg *consensus.RequestMsg) error { LogMsg(reqMsg) // Create a new state for the new consensus. err := node.createStateForNewConsensus() if err != nil { return err } // Start consensus process prePrepareMsg, err := node.CurrentState.StartConsensus(reqMsg) if err != nil { return err } LogStage(fmt.Sprintf("Consensus Process (ViewID:%d)", node.CurrentState.ViewID), false) // Getprepare information found // Here, the master node starts sending pre preparation messages to other nodes if prePrepareMsg != nil { node.Broadcast(prePrepareMsg, "/preprepare") LogStage("Pre-prepare", true) } return nil }
- Create a new consensus state createStateForNewConsensus
- Start consensus on the current state of the node startconnections
- After the consensus is completed, the master node broadcasts the information of the preparemsg phase to other nodes!
Let's look at chapter 3, the consensus process
2.2 setRoute function
func (server *Server) setRoute() { http.HandleFunc("/req", server.getReq) http.HandleFunc("/preprepare", server.getPrePrepare) http.HandleFunc("/prepare", server.getPrepare) http.HandleFunc("/commit", server.getCommit) http.HandleFunc("/reply", server.getReply) }
3.Request consensus process
3.0 consensus flow chart
This flow chart is for your convenience. When you sort out the whole process and look back at the flow chart I drew, your ideas should be clearer!
An Muxi is my B-stop account. You can also pay attention to o(´ ^ ') O
3.1 create status createStateForNewConsensus
func (node *Node) createStateForNewConsensus() error { // Check if there is an ongoing consensus process. if node.CurrentState != nil { return errors.New("another consensus is ongoing") } // Get the last sequence ID var lastSequenceID int64 if len(node.CommittedMsgs) == 0 { lastSequenceID = -1 } else { lastSequenceID = node.CommittedMsgs[len(node.CommittedMsgs) - 1].SequenceID } // Create a new state for this new consensus process in the Primary node.CurrentState = consensus.CreateState(node.View.ID, lastSequenceID) LogStage("Create the replica status", true) return nil }
- It is preferred to judge whether the status of the current node is nil, that is, whether the current node is in other stages (pre preparation stage or preparation stage, etc.)
- Judge whether messages have been sent in the current stage. If consensus is reached for the first time, the lastSequenceID of the previous serial number is set to - 1, otherwise we take out the previous serial number
- Create a statecreatestate, which is mainly returned after initializing the State
func CreateState(viewID int64, lastSequenceID int64) *State { return &State{ ViewID: viewID, MsgLogs: &MsgLogs{ ReqMsg:nil, PrepareMsgs:make(map[string]*VoteMsg), CommitMsgs:make(map[string]*VoteMsg), }, LastSequenceID: lastSequenceID, CurrentStage: Idle, } }
3.2 start consensus
// Master node start consensus func (state *State) StartConsensus(request *RequestMsg) (*PrePrepareMsg, error) { // The sequence number of the message is sequenceID, that is, the current time sequenceID := time.Now().UnixNano() // Find the unique and largest number for the sequence ID if state.LastSequenceID != -1 { for state.LastSequenceID >= sequenceID { sequenceID += 1 } } // Assign a new sequence ID to the request message object. request.SequenceID = sequenceID // Save ReqMsgs to its logs. state.MsgLogs.ReqMsg = request // Get the summary of the client request message requestMsg digest, err := digest(request) if err != nil { fmt.Println(err) return nil, err } // Convert the current phase to the prepared phase state.CurrentStage = PrePrepared // This is actually the format of messages sent by the master node to other nodes: View ID, request serial number, request information and summary of request information return &PrePrepareMsg{ ViewID: state.ViewID, SequenceID: sequenceID, Digest: digest, RequestMsg: request, }, nil }
- First, use the current timestamp as the serial number
- If the previous serial number > = the current serial number, the current serial number + 1. The purpose is very simple. Every time the master node starts a consensus, the serial number + 1
- Update the serial number SequenceID, obtain the summary digest(request) of the request information, and convert the current status to prepared
3.3 Broadcast message
// Node broadcast function func (node *Node) Broadcast(msg interface{}, path string) map[string]error { errorMap := make(map[string]error) for nodeID, url := range node.NodeTable { // Because you don't need to broadcast to yourself, you just skip it if nodeID == node.NodeID { continue } // Encoding msg information into json format jsonMsg, err := json.Marshal(msg) if err != nil { errorMap[nodeID] = err continue } // Pass json format to other nodes send(url + path, jsonMsg) // URL localhost: 1111 path: / prepare, etc // send function: http Post("http://"+url, "application/json", buff) } if len(errorMap) == 0 { return nil } else { return errorMap } }
- Traverse the node list that comes with node initialization NodeTable
- Encode msg information into json format Marshal (msg), and then send it to other nodes (URL + path, jsonmsg)
func send(url string, msg []byte) { buff := bytes.NewBuffer(msg) _, _ = http.Post("http://"+url, "application/json", buff) }