[source code analysis] machine learning parameter server PS Lite (2) -- communication module Van
0x00 summary
This is the second part of the parameter server series, which introduces the communication module Van of PS Lite.
Other articles in this series are:
[source code analysis] machine learning parameter server PS Lite (1) -- postoffice
0x01 function overview
Now that the post office has an address book, it needs trucks to haul and deliver the mail. Van is the communication module of the whole Parameter Server; its characteristics are as follows.
- When the PostOffice class is instantiated, an instance of the Van class will be created as a member variable. The life cycle of this Van instance is the same as that of its PostOffice instance (each node has only one object);
- Van is responsible for the concrete inter-node communication. Specifically, it establishes the interconnections between nodes (such as the connection between Worker and Scheduler) and starts a local receiving thread that listens for incoming messages.
Van currently has two implementations:
- ZMQVan is the ZeroMQ-based implementation of Van, i.e. the low-level connection details are handled by the ZMQ library (an open-source library that wraps sockets well, making socket programming simpler and more performant).
- IBVerbsVan is ByteDance's RDMA (InfiniBand verbs) implementation, which we will not study in depth here.
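Van::Create is the factory that picks an implementation by name; a sketch modeled on ps-lite's van.cc (the exact branch set varies by version):

Van* Van::Create(const std::string& type) {
  if (type == "zmq") {
    return new ZMQVan();        // the default ZeroMQ implementation
#ifdef DMLC_USE_IBVERBS
  } else if (type == "ibverbs") {
    return new IBVerbsVan();    // the RDMA implementation
#endif
  } else {
    LOG(FATAL) << "unsupported van type: " << type;
    return nullptr;
  }
}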
0x02 definition
2.1 UML diagram
Firstly, the UML diagram is given.
2.2 main description
Next, we describe only the key member variables and functions of the Van object.
The main variables are as follows:
- Node scheduler_ : the scheduler node's parameters; every node records the scheduler node's information;
- Node my_node_ : this node's parameters; if this node is the scheduler, my_node_ equals scheduler_;
- bool is_scheduler_ : whether this node is the scheduler;
- std::unique_ptr<std::thread> receiver_thread_ : pointer to the message-receiving thread;
- std::unique_ptr<std::thread> heartbeat_thread_ : pointer to the heartbeat-sending thread;
- std::vector<int> barrier_count_ : barrier counters, used to record the number of registered nodes; only after all nodes have registered is the system ready, at which point the scheduler sends a ready message to all nodes and the system officially starts;
- Resender *resender_ = nullptr : pointer to the message resender;
- std::atomic<int> timestamp_{0} : auto-incrementing message id, an atomic variable;
- std::unordered_map<std::string, int> connected_nodes_ : records which nodes are currently connected.
Its main functions are as follows:
- Start: initialize communication;
- Receiving: the processing function of the message-receiving thread;
- Heartbeat: the processing function of the heartbeat-sending thread;
- ProcessAddNodeCommandAtScheduler: the scheduler's ADD_NODE handler;
- ProcessHearbeat: the heartbeat-packet handler;
- ProcessDataMsg: the data message (push & pull) handler;
- ProcessAddNodeCommand: the workers' and servers' ADD_NODE handler;
- ProcessBarrierCommand: the BARRIER handler;
2.3 thread management
The three roles defined by PS Lite all work in a multithreaded fashion; each thread has a specific responsibility and is created when its Van instance starts.
The specific description is as follows:
- The Van instances of Scheduler, Worker, and Server each hold a thread that receives data.
- The Van instances of Worker and Server additionally hold a thread that periodically sends heartbeats to the Scheduler.
- If the environment variable PS_RESEND is defined with a non-zero value, the Scheduler, Worker, and Server each also start a monitoring thread (inside Resender).
2.4 class definition
The detailed code (summary) is as follows:
class Van {
 public:
  static Van *Create(const std::string &type);
  virtual void Start(int customer_id);
  int Send(const Message &msg);
  virtual void Stop();
  inline int GetTimestamp() { return timestamp_++; }
  inline bool IsReady() { return ready_; }

 protected:
  // connect to a node
  virtual void Connect(const Node &node) = 0;
  // bind to my own node
  virtual int Bind(const Node &node, int max_retry) = 0;
  // block until a message is received
  virtual int RecvMsg(Message *msg) = 0;
  // send a message
  virtual int SendMsg(const Message &msg) = 0;
  /** \brief pack meta into a string */
  void PackMeta(const Meta &meta, char **meta_buf, int *buf_size);
  /** \brief pack meta into protobuf */
  void PackMetaPB(const Meta &meta, PBMeta *pb);
  /** \brief unpack meta from a string */
  void UnpackMeta(const char *meta_buf, int buf_size, Meta *meta);

  Node scheduler_;
  Node my_node_;
  bool is_scheduler_;
  std::mutex start_mu_;

 private:
  /** thread function for receiving */
  void Receiving();
  /** thread function for heartbeat */
  void Heartbeat();

  // node's address string (i.e. ip:port) -> node id
  // this map is updated when ip:port is received for the first time
  std::unordered_map<std::string, int> connected_nodes_;
  // maps the id of a node added later to the id of the node
  // with the same ip:port that was added first
  std::unordered_map<int, int> shared_node_mapping_;

  /** whether it is ready for sending */
  std::atomic<bool> ready_{false};
  std::atomic<size_t> send_bytes_{0};
  size_t recv_bytes_ = 0;
  int num_servers_ = 0;
  int num_workers_ = 0;
  /** the thread for receiving messages */
  std::unique_ptr<std::thread> receiver_thread_;
  /** the thread for sending heartbeat */
  std::unique_ptr<std::thread> heartbeat_thread_;
  std::vector<int> barrier_count_;
  /** msg resender */
  Resender *resender_ = nullptr;
  int drop_rate_ = 0;
  std::atomic<int> timestamp_{0};
  int init_stage = 0;

  // handlers for the various message types
  void ProcessAddNodeCommandAtScheduler(Message *msg, Meta *nodes, Meta *recovery_nodes);
  void ProcessTerminateCommand();
  void ProcessAddNodeCommand(Message *msg, Meta *nodes, Meta *recovery_nodes);
  void ProcessBarrierCommand(Message *msg);
  void ProcessHearbeat(Message *msg);
  void ProcessDataMsg(Message *msg);
  // update the local node id
  void UpdateLocalID(Message *msg, std::unordered_set<int> *deadnodes_set, Meta *nodes, Meta *recovery_nodes);

  const char *heartbeat_timeout_val = Environment::Get()->find("PS_HEARTBEAT_TIMEOUT");
  int heartbeat_timeout_ = heartbeat_timeout_val ? atoi(heartbeat_timeout_val) : 0;

  DISALLOW_COPY_AND_ASSIGN(Van);
};
0x03 initialization
The initialization function of the Van object configures itself according to the type of the local node: it starts the port, establishes a connection to the scheduler, and starts the message-receiving thread and the heartbeat thread, after which communication can proceed. The details are as follows:
- First, obtain the relevant information from environment variables, such as the scheduler's ip and port (both preset) and the role of this node (Worker/Server/Scheduler), and then initialize the member variable scheduler_;
- If this node is the scheduler, assign scheduler_ to my_node_;
- If this node is not the scheduler:
- Obtain the node's ip from the system;
- Obtain a port with GetAvailablePort;
- Bind to the port with Bind;
- Call Connect to establish the connection to the scheduler (the scheduler also connects to itself, on its preset fixed port);
- Start the local node's message-receiving thread receiver_thread_, which runs Van::Receiving;
- If this node is not the scheduler, send an ADD_NODE message to the scheduler, informing it of the local node's information, i.e. register with the scheduler;
- Then enter the waiting state, waiting for the scheduler to announce ready (the scheduler waits until all nodes have registered and then sends ready to everyone); note that the scheduler node also waits here, but this does not prevent its receiving thread from accepting and processing messages;
- After ready, start the Heartbeat thread, which maintains the heartbeat towards the Scheduler.
A further explanation of the registration and waiting steps:
- When the worker and server nodes have bound their ip and port, they send an ADD_NODE message to the scheduler node.
- After the scheduler has received the ADD_NODE messages of all workers and servers, it replies in turn with an ADD_NODE message.
- During this process, each node waits on the atomic variable ready_ for the steps above to complete.
The specific code is as follows:
void Van::Start(int customer_id) {
  // get scheduler info
  start_mu_.lock();
  if (init_stage == 0) {
    // initialize the member variable scheduler_
    scheduler_.hostname = std::string(
        CHECK_NOTNULL(Environment::Get()->find("DMLC_PS_ROOT_URI")));
    scheduler_.port =
        atoi(CHECK_NOTNULL(Environment::Get()->find("DMLC_PS_ROOT_PORT")));
    scheduler_.role = Node::SCHEDULER;
    scheduler_.id = kScheduler;
    // check whether this node is the scheduler
    is_scheduler_ = Postoffice::Get()->is_scheduler();

    // get my node info
    if (is_scheduler_) {
      // this node is the scheduler, so assign directly
      my_node_ = scheduler_;
    } else {
      auto role = Postoffice::Get()->is_worker() ? Node::WORKER : Node::SERVER;
      const char* nhost = Environment::Get()->find("DMLC_NODE_HOST");
      std::string ip;
      if (nhost) ip = std::string(nhost);
      if (ip.empty()) {
        const char* itf = Environment::Get()->find("DMLC_INTERFACE");
        std::string interface;
        if (itf) interface = std::string(itf);
        if (interface.size()) {
          GetIP(interface, &ip);
        } else {
          GetAvailableInterfaceAndIP(&interface, &ip);
        }
      }
      int port = GetAvailablePort();
      const char* pstr = Environment::Get()->find("PORT");
      if (pstr) port = atoi(pstr);
      my_node_.hostname = ip;
      my_node_.role = role;
      my_node_.port = port;
      // cannot determine my id now, the scheduler will assign it later
      // set it explicitly to make re-register within a same process possible
      my_node_.id = Node::kEmpty;
      my_node_.customer_id = customer_id;
    }

    // bind the node to the socket of ip:port; this initializes receiver_
    my_node_.port = Bind(my_node_, is_scheduler_ ? 0 : 40);

    // connect to the scheduler. This initializes senders_; since there may be
    // many peers to send to, senders_ is a map<int, void*>. Here it sets
    // senders_[1] = socket_1 and gives the socket the identity "ps1...".
    // Note that this only links; nothing is sent yet.
    Connect(scheduler_);

    // for debug use
    if (Environment::Get()->find("PS_DROP_MSG")) {
      drop_rate_ = atoi(Environment::Get()->find("PS_DROP_MSG"));
    }

    // start a thread that receives and processes messages
    receiver_thread_ =
        std::unique_ptr<std::thread>(new std::thread(&Van::Receiving, this));
    init_stage++;
  }
  start_mu_.unlock();

  if (!is_scheduler_) {
    // let the scheduler know myself: workers and servers send an ADD_NODE
    // message carrying the local node's role, ip, port, ...
    Message msg;
    Node customer_specific_node = my_node_;
    customer_specific_node.customer_id = customer_id;
    msg.meta.recver = kScheduler;
    msg.meta.control.cmd = Control::ADD_NODE;
    msg.meta.control.node.push_back(customer_specific_node);
    msg.meta.timestamp = timestamp_++;
    Send(msg);
  }

  // wait until ready_ turns from false to true. The scheduler blocks here
  // until all workers and servers have arrived; a worker/server blocks here
  // until the scheduler announces that the system is ready.
  while (!ready_.load()) {
    std::this_thread::sleep_for(std::chrono::milliseconds(100));
  }

  start_mu_.lock();
  if (init_stage == 1) {
    // if timeout-based resending is configured, initialize resender_
    if (Environment::Get()->find("PS_RESEND") &&
        atoi(Environment::Get()->find("PS_RESEND")) != 0) {
      int timeout = 1000;
      if (Environment::Get()->find("PS_RESEND_TIMEOUT")) {
        timeout = atoi(Environment::Get()->find("PS_RESEND_TIMEOUT"));
      }
      resender_ = new Resender(timeout, 10, this);
    }

    if (!is_scheduler_) {
      // start the heartbeat thread
      heartbeat_thread_ =
          std::unique_ptr<std::thread>(new std::thread(&Van::Heartbeat, this));
    }
    init_stage++;
  }
  start_mu_.unlock();
}
0x04 receiving messages
We first introduce how the background thread runs, and then analyze how to process various messages.
4.1 background processing message thread
PS Lite starts a background thread receiver_thread_ to accept and process messages.
// start receiver
receiver_thread_ = std::unique_ptr<std::thread>(new std::thread(&Van::Receiving, this));
4.2 processing function
receiver_thread_ uses the Receiving function to process messages.
4.2.1 control messages
In addition to the data message for transmitting parameters, the control information between nodes includes:
- ADD_NODE: the workers and servers register themselves with the scheduler;
- BARRIER: synchronization blocking message between nodes;
- HEARTBEAT: HEARTBEAT signal between nodes;
- TERMINATE: node exit signal;
- ACK: confirmation message. The ACK type appears only when the Resender class is enabled.
- EMPTY: not a control message at all, i.e. an ordinary data (push or pull) message;
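For reference, these commands live on the Control structure carried inside Meta; a sketch based on ps-lite's message.h (fields abbreviated):

struct Control {
  // all commands; EMPTY means the message is not a control message
  enum Command { EMPTY, TERMINATE, ADD_NODE, BARRIER, ACK, HEARTBEAT };
  inline bool empty() const { return cmd == EMPTY; }
  Command cmd;                 // the command
  std::vector<Node> node;      // node infos carried by ADD_NODE etc.
  int barrier_group;           // the node group of a BARRIER, e.g. kWorkerGroup
  uint64_t msg_sig;            // message signature used by ACK
};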
Therefore, Receiving calls a different handler for each message type:
- ProcessTerminateCommand: handles TERMINATE;
- ProcessAddNodeCommand: handles ADD_NODE;
- ProcessBarrierCommand: handles BARRIER (the barrier mechanism was analyzed in the previous article; a sketch of the handler follows this list);
- ProcessHearbeat: handles HEARTBEAT;
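The BARRIER handler is small enough to show here; a sketch of Van::ProcessBarrierCommand adapted from ps-lite (details may vary slightly by version). On a barrier request, the scheduler counts arrivals per barrier group and, once the whole group has arrived, replies to every member; a barrier response is handed to Postoffice, which wakes the thread blocked in Postoffice::Barrier.

void Van::ProcessBarrierCommand(Message* msg) {
  auto& ctrl = msg->meta.control;
  if (msg->meta.request) {
    // scheduler side: count the nodes that entered this barrier group
    if (barrier_count_.empty()) barrier_count_.resize(8, 0);
    int group = ctrl.barrier_group;
    ++barrier_count_[group];
    if (barrier_count_[group] ==
        static_cast<int>(Postoffice::Get()->GetNodeIDs(group).size())) {
      // everyone has arrived: reset the counter and release the whole group
      barrier_count_[group] = 0;
      Message res;
      res.meta.request = false;
      res.meta.app_id = msg->meta.app_id;
      res.meta.customer_id = msg->meta.customer_id;
      res.meta.control.cmd = Control::BARRIER;
      for (int r : Postoffice::Get()->GetNodeIDs(group)) {
        if (shared_node_mapping_.find(r) == shared_node_mapping_.end()) {
          res.meta.recver = r;
          res.meta.timestamp = timestamp_++;
          Send(res);
        }
      }
    }
  } else {
    // non-request side: the response releases the local wait
    Postoffice::Get()->Manage(*msg);
  }
}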
4.2.2 thread-wide variables
Two variables in this thread deserve attention: because they are declared outside the while (true) loop, they act as thread-wide state. Keep this in mind when reading the code.
- nodes: used only by the scheduler when processing ADD_NODE, to store all nodes the scheduler currently knows about;
- recovery_nodes: used only by the scheduler when processing ADD_NODE, to store the recovered (restarted) nodes it currently knows about;
4.2.3 specific implementation
The Receiving logic is as follows:
- Call RecvMsg (implemented by the derived class) to obtain the latest message;
- If a drop rate is configured (for debugging), randomly drop messages;
- If retransmission is enabled, use resender_->AddIncomming(msg) to detect and handle duplicate messages;
- Dispatch to the control-message or data-message handling;
The specific code is as follows:
void Van::Receiving() {
  Meta nodes;           // these two variables act as thread-wide state
  Meta recovery_nodes;  // store recovery nodes, i.e. nodes that restarted
  recovery_nodes.control.cmd = Control::ADD_NODE;  // a recovered node re-registers via ADD_NODE

  while (true) {
    Message msg;
    int recv_bytes = RecvMsg(&msg);  // get a message via receiver_
    // for debug, drop received message
    if (ready_.load() && drop_rate_ > 0) {
      unsigned seed = time(NULL) + my_node_.id;
      if (rand_r(&seed) % 100 < drop_rate_) {
        LOG(WARNING) << "Drop message " << msg.DebugString();
        continue;
      }
    }
    CHECK_NE(recv_bytes, -1);
    recv_bytes_ += recv_bytes;  // accumulate the number of received bytes
    if (Postoffice::Get()->verbose() >= 2) {
      PS_VLOG(2) << msg.DebugString();
    }
    // duplicated message, handled by the ack/retransmission mechanism
    if (resender_ && resender_->AddIncomming(msg)) continue;

    if (!msg.meta.control.empty()) {
      // control msg
      auto& ctrl = msg.meta.control;
      if (ctrl.cmd == Control::TERMINATE) {
        ProcessTerminateCommand();
        break;
      } else if (ctrl.cmd == Control::ADD_NODE) {
        ProcessAddNodeCommand(&msg, &nodes, &recovery_nodes);
      } else if (ctrl.cmd == Control::BARRIER) {
        ProcessBarrierCommand(&msg);
      } else if (ctrl.cmd == Control::HEARTBEAT) {
        ProcessHearbeat(&msg);  // sends an ack back to the heartbeat sender
      } else {
        LOG(WARNING) << "Drop unknown typed message " << msg.DebugString();
      }
    } else {
      // data msg
      ProcessDataMsg(&msg);
    }
  }
}
4.3 handling ADD_NODE messages
ADD_NODE is the control message that the worker / server uses to register itself with the scheduler.
4.3.1 registration logic
First recall the basic idea of registration.
- When the worker and server nodes have bound their ip and port, they send an ADD_NODE message to the scheduler node.
- After the scheduler has received the ADD_NODE messages of all workers and servers, it replies in turn; note that the reply is also an ADD_NODE message.
- Each node (scheduler, worker, server) waits on the atomic variable ready_ for the above process to complete.
4.3.2 ProcessAddNodeCommand
The ProcessAddNodeCommand logic is as follows.
- Find the ids whose heartbeats have timed out and put them into dead_set.
- Get the control information from the received message.
- Call UpdateLocalID, in which:
- if it is a new node, the Scheduler records the new node;
- if the node was produced by a restart, the old node's information is updated.
- If this node is the scheduler:
- call ProcessAddNodeCommandAtScheduler: after the ADD_NODE messages of all workers and servers have arrived, assign the node ids and respond, i.e. set the latest rank of every node and send it to all workers and servers.
- If this node is not the scheduler, i.e. a worker or server has received the ADD_NODE reply from the scheduler, then:
- an existing node that cannot find the new node among its connections in connected_nodes_ calls Connect to establish a connection with the new node;
- a new node connects to all existing nodes of a different role;
- update the global node information in connected_nodes_, including the global rank (this is where receiver_thread_ obtains the local node's global rank and related information);
- finally, set ready_ = true, unblocking the main thread that is waiting on it.
The specific code is as follows:
void Van::ProcessAddNodeCommand(Message* msg, Meta* nodes, Meta* recovery_nodes) {
  // find the ids whose heartbeats have timed out and put them into dead_set
  auto dead_nodes = Postoffice::Get()->GetDeadNodes(heartbeat_timeout_);
  std::unordered_set<int> dead_set(dead_nodes.begin(), dead_nodes.end());
  auto& ctrl = msg->meta.control;  // the control info of the received message

  UpdateLocalID(msg, &dead_set, nodes, recovery_nodes);

  if (is_scheduler_) {  // scheduler node
    ProcessAddNodeCommandAtScheduler(msg, nodes, recovery_nodes);
  } else {  // worker & server node
    for (const auto& node : ctrl.node) {
      std::string addr_str = node.hostname + ":" + std::to_string(node.port);
      if (connected_nodes_.find(addr_str) == connected_nodes_.end()) {
        // an existing node cannot find this new node among its connections,
        // so it connects to the new node (same-role nodes are skipped
        // inside Connect)
        Connect(node);
        connected_nodes_[addr_str] = node.id;
      }
      if (!node.is_recovery && node.role == Node::SERVER) ++num_servers_;
      if (!node.is_recovery && node.role == Node::WORKER) ++num_workers_;
    }
    ready_ = true;
  }
}
4.3.3 UpdateLocalID
This function updates the node id information inside a node; there are two cases. The logic is as follows:
- If msg->meta.sender is Meta::kEmpty, i.e. the sender is not set, then it must be the Scheduler that is processing this message, and we enter the if branch.
- If nodes->control.node holds fewer entries than "number of configured servers + number of configured workers", the system is still in the startup phase, so the node info of the current message is appended to nodes->control.node.
- Otherwise, the system is already running, and some node must have died, restarted, and reconnected. In that case, look through nodes->control.node for a dead node id whose role matches the current message (same type), assign that node id to the restarted node, and update nodes->control.node and recovery_nodes.
- What follows is the logic executed by an ordinary node:
- search, in order, through all the node information sent back by the scheduler to find the node whose ip and port match its own;
- if found, update the local node information (node_id is not set at startup and must be assigned uniformly by the scheduler; according to the comments, this also makes re-registration possible), including the global rank.
The specific code is as follows:
void Van::UpdateLocalID(Message* msg, std::unordered_set<int>* deadnodes_set,
                        Meta* nodes, Meta* recovery_nodes) {
  auto& ctrl = msg->meta.control;
  size_t num_nodes =
      Postoffice::Get()->num_servers() + Postoffice::Get()->num_workers();
  // assign an id
  if (msg->meta.sender == Meta::kEmpty) {
    // the sender is not set, so this must be running on the scheduler
    CHECK(is_scheduler_);
    CHECK_EQ(ctrl.node.size(), 1);  // the message carries one node: the sender itself
    if (nodes->control.node.size() < num_nodes) {
      // not all nodes have registered yet
      nodes->control.node.push_back(ctrl.node[0]);
    } else {
      // all workers and servers have arrived before, so
      // some node died and restarted
      CHECK(ready_.load());
      for (size_t i = 0; i < nodes->control.node.size() - 1; ++i) {
        const auto& node = nodes->control.node[i];
        if (deadnodes_set->find(node.id) != deadnodes_set->end() &&
            node.role == ctrl.node[0].role) {
          auto& recovery_node = ctrl.node[0];
          // assign the dead node's previous id to the restarted node
          recovery_node.id = node.id;
          recovery_node.is_recovery = true;
          nodes->control.node[i] = recovery_node;
          recovery_nodes->control.node.push_back(recovery_node);
          break;
        }
      }
    }
  }

  // update my id: an ordinary node updates its own rank here; on the
  // scheduler this loop finds nothing. If the local ip and port match a node
  // in the message sent by the scheduler, change the local node's id (which
  // was kEmpty at initialization) to the id assigned by the scheduler.
  for (size_t i = 0; i < ctrl.node.size(); ++i) {
    const auto& node = ctrl.node[i];
    if (my_node_.hostname == node.hostname && my_node_.port == node.port) {
      if (getenv("DMLC_RANK") == nullptr || my_node_.id == Meta::kEmpty) {
        my_node_ = node;
        std::string rank = std::to_string(Postoffice::IDtoRank(node.id));
#ifdef _MSC_VER
        _putenv_s("DMLC_RANK", rank.c_str());
#else
        setenv("DMLC_RANK", rank.c_str(), true);
#endif
      }
    }
  }
}
4.3.4 ProcessAddNodeCommandAtScheduler
ProcessAddNodeCommandAtScheduler runs on the Scheduler and handles this control message.
After the scheduler has received the ADD_NODE messages of all workers and servers, it assigns node ids and responds, i.e. it sets the latest global rank of every node and sends it to all workers and servers.
- After receiving the registration messages of all workers & servers (nodes->control.node.size() == num_nodes):
- Sort the nodes by their ip:port combination.
- The scheduler establishes a connection with every registered node, updates its heartbeat timestamp, and assigns a global rank to each node connected to it.
- Send the ADD_NODE message (carrying all node information known to the scheduler) to every worker and server.
- Set ready_ = true, i.e. the scheduler is ready, regardless of whether the workers and servers have confirmed receipt of the ADD_NODE message.
- On the receiving end (worker & server), the receiver_thread_ (in other functions) obtains the local node's global rank and related information from the nodes info returned by the scheduler.
- If !recovery_nodes->control.node.empty(), this is the registration of a restarted node:
- Find the ids whose heartbeats have timed out and put them into dead_set.
- Establish a connection with the restarted node only (an ADD_NODE has just been received from it); the code's CHECK_EQ(recovery_nodes->control.node.size(), 1) confirms there is exactly one restarted node.
- Update the restarted node's heartbeat.
- Because a new restarted node has joined, one round of sending achieves two purposes:
- send the ADD_NODE message (carrying all current node information in the scheduler) to the restarted worker/server;
- send the recovery node's information to the live nodes.
- In this way, each node receiving the message establishes a connection with the new node.
The specific code is as follows:
void Van::ProcessAddNodeCommandAtScheduler(Message* msg, Meta* nodes,
                                           Meta* recovery_nodes) {
  recovery_nodes->control.cmd = Control::ADD_NODE;
  time_t t = time(NULL);
  size_t num_nodes =
      Postoffice::Get()->num_servers() + Postoffice::Get()->num_workers();
  // the scheduler assigns ids and replies only after the ADD_NODE messages
  // of all workers and servers have arrived
  if (nodes->control.node.size() == num_nodes) {
    // sort the nodes by ip:port so workers and servers are ordered
    std::sort(nodes->control.node.begin(), nodes->control.node.end(),
              [](const Node& a, const Node& b) {
                return (a.hostname.compare(b.hostname) | (a.port < b.port)) > 0;
              });
    // assign node rank
    for (auto& node : nodes->control.node) {
      // connect, update the heartbeat timestamp, and assign a global rank
      std::string node_host_ip =
          node.hostname + ":" + std::to_string(node.port);
      if (connected_nodes_.find(node_host_ip) == connected_nodes_.end()) {
        // this ip:port has not been seen before
        CHECK_EQ(node.id, Node::kEmpty);  // the node must be uninitialized
        int id = node.role == Node::SERVER
                     ? Postoffice::ServerRankToID(num_servers_)
                     : Postoffice::WorkerRankToID(num_workers_);
        node.id = id;   // assign the generated id to this new node
        Connect(node);  // connect to the new node: create a socket and store
                        // it as senders_[id] for later use
        Postoffice::Get()->UpdateHeartbeat(node.id, t);  // update heartbeat
        connected_nodes_[node_host_ip] = id;  // record it as connected
      } else {
        // the same ip:port registered again (e.g. another customer in the
        // same process): reuse the first node's id
        int id = node.role == Node::SERVER
                     ? Postoffice::ServerRankToID(num_servers_)
                     : Postoffice::WorkerRankToID(num_workers_);
        shared_node_mapping_[id] = connected_nodes_[node_host_ip];
        node.id = connected_nodes_[node_host_ip];
      }
      if (node.role == Node::SERVER) num_servers_++;
      if (node.role == Node::WORKER) num_workers_++;
    }
    nodes->control.node.push_back(my_node_);  // include the scheduler itself
    nodes->control.cmd = Control::ADD_NODE;
    Message back;
    back.meta = *nodes;
    // send the ADD_NODE reply to every worker and server
    for (int r : Postoffice::Get()->GetNodeIDs(kWorkerGroup + kServerGroup)) {
      int recver_id = r;
      if (shared_node_mapping_.find(r) == shared_node_mapping_.end()) {
        back.meta.recver = recver_id;
        back.meta.timestamp = timestamp_++;
        Send(back);
      }
    }
    ready_ = true;  // the scheduler is ready
  } else if (!recovery_nodes->control.node.empty()) {
    // a restarted node re-registers while the system is running
    auto dead_nodes = Postoffice::Get()->GetDeadNodes(heartbeat_timeout_);
    std::unordered_set<int> dead_set(dead_nodes.begin(), dead_nodes.end());
    // send back the recovery node
    CHECK_EQ(recovery_nodes->control.node.size(), 1);
    Connect(recovery_nodes->control.node[0]);
    Postoffice::Get()->UpdateHeartbeat(recovery_nodes->control.node[0].id, t);
    Message back;
    for (int r : Postoffice::Get()->GetNodeIDs(kWorkerGroup + kServerGroup)) {
      if (r != recovery_nodes->control.node[0].id &&
          dead_set.find(r) != dead_set.end()) {
        // do not try to send anything to a dead node
        continue;
      }
      // only send recovery_node to nodes that already exist,
      // but send all nodes to the recovery_node
      back.meta = (r == recovery_nodes->control.node[0].id) ? *nodes
                                                            : *recovery_nodes;
      back.meta.recver = r;
      back.meta.timestamp = timestamp_++;
      Send(back);
    }
  }
}
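For reference, the id arithmetic used above (ServerRankToID / WorkerRankToID / IDtoRank) comes from Postoffice, covered in the previous article; a sketch based on ps-lite's postoffice.h:

// the scheduler has id 1 (kScheduler); ids below 8 are reserved for node
// groups; servers get even ids starting at 8, workers odd ids starting at 9
static inline int ServerRankToID(int rank) { return rank * 2 + 8; }
static inline int WorkerRankToID(int rank) { return rank * 2 + 9; }
static inline int IDtoRank(int id) { return std::max((id - 8) / 2, 0); }

With two servers and two workers, for example, the servers are assigned ids 8 and 10 and the workers ids 9 and 11.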
The process logic of this part is as follows:
- Scheduler: Postoffice::Start → Van::Start → Connect (does nothing) → start receiver_thread_ → wait until ready. Its receiver_thread_ runs ProcessAddNodeCommand; once all nodes are registered it sets the ranks, sends the ADD_NODE response (nodes info) to every node, and sets ready_ = true → Postoffice::Barrier.
- Worker: Postoffice::Start → Van::Start → Connect (to the scheduler) → start receiver_thread_ → Send ADD_NODE → wait until ready. Its receiver_thread_ runs ProcessAddNodeCommand on the scheduler's ADD_NODE response and sets ready_ = true → Postoffice::Barrier.
4.3.5 the sequence for a newly added node
The interconnection process can be divided into three steps:
Step 1: when a worker/server node initializes, it sends its connection information to the scheduler node; assume it is node 2.
if (!is_scheduler_) {
  // let the scheduler know myself
  Message msg;
  Node customer_specific_node = my_node_;
  customer_specific_node.customer_id = customer_id;
  msg.meta.recver = kScheduler;
  msg.meta.control.cmd = Control::ADD_NODE;
  msg.meta.control.node.push_back(customer_specific_node);
  msg.meta.timestamp = timestamp_++;
  Send(msg);  // send to the scheduler to register the link information
}
Step 2: after the Scheduler node receives the message, it first establishes a connection with node 2 in ProcessAddNodeCommandAtScheduler. It then broadcasts this "node joined" information to all worker/server nodes that already have a connection with the scheduler, putting node 2's connection information into the meta data.
// assign node rank
for (auto& node : nodes->control.node) {
  std::string node_host_ip = node.hostname + ":" + std::to_string(node.port);
  if (connected_nodes_.find(node_host_ip) == connected_nodes_.end()) {
    int id = node.role == Node::SERVER
                 ? Postoffice::ServerRankToID(num_servers_)
                 : Postoffice::WorkerRankToID(num_workers_);
    node.id = id;
    Connect(node);  // connect to the new node: create a socket, store it as
                    // senders_[id] for later use
    Postoffice::Get()->UpdateHeartbeat(node.id, t);
    connected_nodes_[node_host_ip] = id;
  } else {
    int id = node.role == Node::SERVER
                 ? Postoffice::ServerRankToID(num_servers_)
                 : Postoffice::WorkerRankToID(num_workers_);
    shared_node_mapping_[id] = connected_nodes_[node_host_ip];
    node.id = connected_nodes_[node_host_ip];
  }
  if (node.role == Node::SERVER) num_servers_++;
  if (node.role == Node::WORKER) num_workers_++;
}
nodes->control.node.push_back(my_node_);
nodes->control.cmd = Control::ADD_NODE;

Message back;
back.meta = *nodes;
// broadcast the "node joined" information (including node 2's connection
// info) to all workers/servers already connected to the scheduler
for (int r : Postoffice::Get()->GetNodeIDs(kWorkerGroup + kServerGroup)) {
  int recver_id = r;
  if (shared_node_mapping_.find(r) == shared_node_mapping_.end()) {
    back.meta.recver = recver_id;
    back.meta.timestamp = timestamp_++;
    Send(back);
  }
}
Step 3: after receiving this command, each existing worker/server node establishes a connection with node 2 in ProcessAddNodeCommand.
for (const auto& node : ctrl.node) {
  std::string addr_str = node.hostname + ":" + std::to_string(node.port);
  if (connected_nodes_.find(addr_str) == connected_nodes_.end()) {
    // this new node is not among the existing connections
    Connect(node);  // connect to the new node
    connected_nodes_[addr_str] = node.id;
  }
  if (!node.is_recovery && node.role == Node::SERVER) ++num_servers_;
  if (!node.is_recovery && node.role == Node::WORKER) ++num_workers_;
}
This completes the process: whenever a new node joins, the existing nodes learn about it through the scheduler and establish connections with it.
4.4 processing HEARTBEAT messages
We next analyze the heartbeat mechanism.
4.4.1 heartbeat mechanism
To record the reachability of the network, PS Lite designs a heartbeat mechanism. Specifically:
- The Postoffice singleton of each node maintains a map storing heartbeat liveness information; the key is a node id and the value is the timestamp at which its last HEARTBEAT message was received.
- A Worker/Server only records the Scheduler's heartbeat, while the Scheduler records the heartbeats of all nodes. Given the timestamps and a heartbeat timeout, all dead nodes can be determined.
- Each Worker/Server node creates a heartbeat thread that sends a HEARTBEAT message to the Scheduler every PS_HEARTBEAT_INTERVAL seconds;
- The Scheduler node responds with a HEARTBEAT message of its own after receiving one.
- The scheduler judges liveness by the difference between the current time and the time the heartbeat packet was received.
- The Scheduler identifies dead nodes from the heartbeat timestamps. If a newly arriving node matches a node in the dead-node container, the node is treated as recovered; the new node and the existing nodes are then interconnected through the scheduler's relaying.
The details are as follows:
4.4.2 data structure
std::unordered_map<int, time_t> heartbeats_ stores the heartbeat liveness information: the key is a node id, the value is the timestamp at which its last HEARTBEAT message was received.
UpdateHeartbeat refreshes a node's heartbeat timestamp.
void UpdateHeartbeat(int node_id, time_t t) {
  std::lock_guard<std::mutex> lk(heartbeat_mu_);
  heartbeats_[node_id] = t;
}

std::unordered_map<int, time_t> heartbeats_;
4.4.3 Worker / Server sends heartbeat
On these two kinds of nodes, each Worker/Server starts a thread that sends a heartbeat message to the Scheduler every PS_HEARTBEAT_INTERVAL seconds:
if (!is_scheduler_) {
  // start heartbeat thread
  heartbeat_thread_ =
      std::unique_ptr<std::thread>(new std::thread(&Van::Heartbeat, this));
}
The heartbeat function itself is:
void Van::Heartbeat() {
  const char* val = Environment::Get()->find("PS_HEARTBEAT_INTERVAL");
  const int interval = val ? atoi(val) : kDefaultHeartbeatInterval;
  while (interval > 0 && ready_.load()) {
    std::this_thread::sleep_for(std::chrono::seconds(interval));
    Message msg;
    msg.meta.recver = kScheduler;
    msg.meta.control.cmd = Control::HEARTBEAT;
    msg.meta.control.node.push_back(my_node_);
    msg.meta.timestamp = timestamp_++;
    Send(msg);
  }
}
4.4.4 Scheduler node handles heartbeat
After receiving a HEARTBEAT message, the Scheduler node responds with a HEARTBEAT message of its own; UpdateHeartbeat refreshes the stored heartbeat timestamp.
void Van::ProcessHearbeat(Message* msg) {
  auto& ctrl = msg->meta.control;
  time_t t = time(NULL);
  for (auto& node : ctrl.node) {
    Postoffice::Get()->UpdateHeartbeat(node.id, t);
    if (is_scheduler_) {
      Message heartbeat_ack;
      heartbeat_ack.meta.recver = node.id;
      heartbeat_ack.meta.control.cmd = Control::HEARTBEAT;
      heartbeat_ack.meta.control.node.push_back(my_node_);
      heartbeat_ack.meta.timestamp = timestamp_++;
      // send the heartbeat ack back
      Send(heartbeat_ack);
    }
  }
}
4.4.5 dead nodes
While processing ADD_NODE messages, the Scheduler checks for dead nodes. Specifically, it judges liveness by the difference between the current timestamp and the timestamp at which the last heartbeat packet was received.
std::vector<int> Postoffice::GetDeadNodes(int t) {
  std::vector<int> dead_nodes;
  if (!van_->IsReady() || t == 0) return dead_nodes;

  time_t curr_time = time(NULL);
  const auto& nodes = is_scheduler_
                          ? GetNodeIDs(kWorkerGroup + kServerGroup)
                          : GetNodeIDs(kScheduler);
  {
    std::lock_guard<std::mutex> lk(heartbeat_mu_);
    for (int r : nodes) {
      auto it = heartbeats_.find(r);
      if ((it == heartbeats_.end() || it->second + t < curr_time) &&
          start_time_ + t < curr_time) {
        dead_nodes.push_back(r);
      }
    }
  }
  return dead_nodes;
}
The logic is as follows:
- Scheduler: receiver_thread_ dispatches each incoming HEARTBEAT to ProcessHearbeat, which updates heartbeats_ and sends a HEARTBEAT response back to the sender.
- Worker / Server: heartbeat_thread_ runs Heartbeat, periodically sending HEARTBEAT to the Scheduler; receiver_thread_ dispatches the Scheduler's response to ProcessHearbeat, which updates the local heartbeats_.
4.5 processing TERMINATE messages
ProcessTerminateCommand handles the termination message by setting ready_ to false.
From then on, Van is no longer in a ready state and stops processing.
void Van::ProcessTerminateCommand() {
  PS_VLOG(1) << my_node().ShortDebugString() << " is stopped";
  ready_ = false;
}

inline bool IsReady() { return ready_; }
4.6 processing ACK messages
4.6.1 Ack mechanism
In distributed systems, communication is unreliable: packet loss and delay must be accounted for. PS Lite provides the Resender class to improve the reliability of communication. It introduces an ACK mechanism, namely:
- For every non-ACK/TERMINATE message it receives, a node must reply with an ACK message.
- Every non-ACK/TERMINATE message a node sends must be cached locally. The store is a map whose unique key is produced from the message content (see the sketch after this list).
- When a node receives an ACK message, it removes the corresponding message from the local cache according to the key carried back.
- Each node runs a monitoring thread that wakes every PS_RESEND_TIMEOUT milliseconds to scan the local cache; based on each message's send timestamp and the current time, it finds the timed-out messages, resends them, and increments their retry counts.
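The key mentioned above can be built by packing the identities of the two endpoints and the message timestamp into a single integer. A sketch modeled on Resender::GetKey in ps-lite's resender.h (the exact field layout is illustrative):

#include <cstdint>

// Pack the caching node's id, the sender, the receiver, the timestamp and
// the request flag into one uint64_t; timestamps are assumed to fit 31 bits.
uint64_t MakeKey(uint16_t my_id, uint8_t sender, uint8_t recver,
                 int timestamp, bool request) {
  return (static_cast<uint64_t>(my_id) << 48) |
         (static_cast<uint64_t>(sender) << 40) |
         (static_cast<uint64_t>(recver) << 32) |
         (static_cast<uint64_t>(timestamp) << 1) |
         static_cast<uint64_t>(request);
}

Because the timestamp is a per-node auto-incrementing counter (timestamp_ in Van), the same logical message always maps to the same key, which is what lets a receiver detect duplicates and lets a sender match an ACK's msg_sig against send_buff_.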
4.6.2 Resender class
The definition is as follows: send_buff_ is the send cache, storing the messages that have been sent; acked_ holds the keys of the messages that have been acknowledged.
class Resender {
  std::thread* monitor_;
  std::unordered_set<uint64_t> acked_;
  std::atomic<bool> exit_{false};
  std::mutex mu_;
  int timeout_;
  int max_num_retry_;
  Van* van_;
  using Time = std::chrono::milliseconds;
  // the buffer entry
  struct Entry {
    Message msg;
    Time send;
    int num_retry = 0;
  };
  std::unordered_map<uint64_t, Entry> send_buff_;
};
4.6.3 monitoring thread
The monitoring thread and its function are as follows. Whenever it wakes up, it scans send_buff_ (the local cache), compares each message's send timestamp with the current time, resends the timed-out messages, and increments their retry counts:
monitor_ = new std::thread(&Resender::Monitoring, this);

void Monitoring() {
  while (!exit_) {
    std::this_thread::sleep_for(Time(timeout_));
    std::vector<Message> resend;
    Time now = Now();
    mu_.lock();
    for (auto& it : send_buff_) {
      if (it.second.send + Time(timeout_) * (1 + it.second.num_retry) < now) {
        resend.push_back(it.second.msg);
        ++it.second.num_retry;
        CHECK_LT(it.second.num_retry, max_num_retry_);
      }
    }
    mu_.unlock();
    for (const auto& msg : resend) van_->Send(msg);
  }
}
4.6.4 caching when sending
When Van sends a message, if retransmission is configured, it calls the AddOutgoing function to add the message to the send cache.
int Van::Send(const Message& msg) {
  int send_bytes = SendMsg(msg);
  CHECK_NE(send_bytes, -1);
  send_bytes_ += send_bytes;
  if (resender_) resender_->AddOutgoing(msg);
  if (Postoffice::Get()->verbose() >= 2) {
    PS_VLOG(2) << msg.DebugString();
  }
  return send_bytes;
}
The following function adds a message to the send cache.
/**
 * \brief add an outgoing message
 */
void AddOutgoing(const Message& msg) {
  if (msg.meta.control.cmd == Control::ACK) return;
  CHECK_NE(msg.meta.timestamp, Meta::kEmpty) << msg.DebugString();
  auto key = GetKey(msg);
  std::lock_guard<std::mutex> lk(mu_);
  // already buffered, which is often due to Send being called by the
  // monitor thread
  if (send_buff_.find(key) != send_buff_.end()) return;

  auto& ent = send_buff_[key];
  ent.msg = msg;
  ent.send = Now();
  ent.num_retry = 0;
}
4.6.5 clearing the cache
The following function does two things:
- it checks whether an incoming message is a duplicate, and acknowledges every non-ACK message it receives;
- if the incoming message is an ACK, it removes the corresponding entry from the send cache.
/**
 * \brief add an incoming message
 * \brief return true if msg has been added before or is an ACK message
 */
bool AddIncomming(const Message& msg) {
  // a message can be received multiple times
  if (msg.meta.control.cmd == Control::TERMINATE) {
    return false;
  } else if (msg.meta.control.cmd == Control::ACK) {
    mu_.lock();
    auto key = msg.meta.control.msg_sig;
    auto it = send_buff_.find(key);
    if (it != send_buff_.end()) send_buff_.erase(it);
    mu_.unlock();
    return true;
  } else {
    mu_.lock();
    auto key = GetKey(msg);
    auto it = acked_.find(key);
    bool duplicated = it != acked_.end();
    if (!duplicated) acked_.insert(key);
    mu_.unlock();
    // send back an ack message (even if it is duplicated)
    Message ack;
    ack.meta.recver = msg.meta.sender;
    ack.meta.sender = msg.meta.recver;
    ack.meta.control.cmd = Control::ACK;
    ack.meta.control.msg_sig = key;
    van_->Send(ack);
    // warn on duplicates
    if (duplicated) LOG(WARNING) << "Duplicated message: " << msg.DebugString();
    return duplicated;
  }
}
4.7 processing data messages
ProcessDataMsg handles the data messages sent by workers (e.g. the gradients a worker pushes to a server). It obtains the corresponding Customer and calls the Customer's method, which simply places the msg into the Customer's processing queue.
We will introduce Customer in a later article.
void Van::ProcessDataMsg(Message* msg) {
  // data msg
  int app_id = msg->meta.app_id;
  int customer_id =
      Postoffice::Get()->is_worker() ? msg->meta.customer_id : app_id;
  auto* obj = Postoffice::Get()->GetCustomer(app_id, customer_id, 5);
  obj->Accept(*msg);  // hand the message over to the Customer
}
0x05 ZMQVan
ZMQVan is the ZeroMQ-based implementation of Van: the low-level details of the connections are implemented with the ZMQ library (an open-source library that wraps sockets well, making socket programming simpler and more performant).
5.1 definitions
ZMQVan is defined as follows:
ZMQVan inherits from Van and adds two member variables to this class, namely:
- std::unordered_map<int, void*> senders_ : the set of this node's sending sockets, i.e. a mapping from node id to socket. For example, if node 8 wants to send a message to node 9, it just looks up the pair (9, socket_9) and calls socket_9.send(message).
- void *receiver_ = nullptr : the socket obtained by the Bind function; since this is the receiving end, a single socket suffices.
The details are as follows:
class ZMQVan : public Van {
  void *context_ = nullptr;
  /**
   * \brief node_id to the socket for sending data to this node
   */
  std::unordered_map<int, void*> senders_;
  std::mutex mu_;
  void *receiver_ = nullptr;
};
5.2 Van function
The following functions of the Van class call into ZMQVan or are called by it.
5.2.1 sending messages
The Send function calls ZMQVan's SendMsg to send a message; after sending, if the ACK mechanism is enabled, it calls resender_->AddOutgoing.
int Van::Send(const Message& msg) {
  int send_bytes = SendMsg(msg);
  CHECK_NE(send_bytes, -1);
  send_bytes_ += send_bytes;
  if (resender_) resender_->AddOutgoing(msg);
  if (Postoffice::Get()->verbose() >= 2) {
    PS_VLOG(2) << msg.DebugString();
  }
  return send_bytes;
}
5.2.2 Meta class
Meta encapsulates the metadata of a message: sender, receiver, timestamp, whether it is a request or a response, and so on.
/**
 * \brief meta info of a message
 */
struct Meta {
  /** \brief the empty value */
  static const int kEmpty;
  /** \brief an int head */
  int head;
  /** \brief the unique id of the application this message is for */
  int app_id;
  /** \brief customer id */
  int customer_id;
  /** \brief the timestamp of this message */
  int timestamp;
  /** \brief the node id of the sender of this message */
  int sender;
  /** \brief the node id of the receiver of this message */
  int recver;
  /** \brief whether or not this is a request message */
  bool request;
  /** \brief whether or not a push message */
  bool push;
  /** \brief whether or not a pull message */
  bool pull;
  /** \brief whether or not it's for SimpleApp */
  bool simple_app;
  /** \brief a string body */
  std::string body;
  /** \brief data type of message.data[i] */
  std::vector<DataType> data_type;
  /** \brief system control message */
  Control control;
  /** \brief the byte size */
  int data_size = 0;
  /** \brief message priority */
  int priority = 0;
};
In order to alleviate the communication pressure, PS Lite uses Protobuf to compress Meta data.
5.2.3 PackMeta
PackMeta compresses the Meta data, i.e. serializes it via protobuf.
void Van::PackMeta(const Meta& meta, char** meta_buf, int* buf_size) {
  // convert into protobuf
  PBMeta pb;
  pb.set_head(meta.head);
  if (meta.app_id != Meta::kEmpty) pb.set_app_id(meta.app_id);
  if (meta.timestamp != Meta::kEmpty) pb.set_timestamp(meta.timestamp);
  if (meta.body.size()) pb.set_body(meta.body);
  pb.set_push(meta.push);
  pb.set_pull(meta.pull);
  pb.set_request(meta.request);
  pb.set_simple_app(meta.simple_app);
  pb.set_priority(meta.priority);
  pb.set_customer_id(meta.customer_id);
  for (auto d : meta.data_type) pb.add_data_type(d);
  if (!meta.control.empty()) {
    auto ctrl = pb.mutable_control();
    ctrl->set_cmd(meta.control.cmd);
    if (meta.control.cmd == Control::BARRIER) {
      ctrl->set_barrier_group(meta.control.barrier_group);
    } else if (meta.control.cmd == Control::ACK) {
      ctrl->set_msg_sig(meta.control.msg_sig);
    }
    for (const auto& n : meta.control.node) {
      auto p = ctrl->add_node();
      p->set_id(n.id);
      p->set_role(n.role);
      p->set_port(n.port);
      p->set_hostname(n.hostname);
      p->set_is_recovery(n.is_recovery);
      p->set_customer_id(n.customer_id);
    }
  }

  // to string
  *buf_size = pb.ByteSize();
  *meta_buf = new char[*buf_size + 1];
  CHECK(pb.SerializeToArray(*meta_buf, *buf_size))
      << "failed to serialize protobuf";
}
5.2.4 UnpackMeta
UnpackMeta decompresses the buffer back into a Meta according to the protobuf-generated PBMeta format.
void Van::UnpackMeta(const char* meta_buf, int buf_size, Meta* meta) {
  // to protobuf
  PBMeta pb;
  CHECK(pb.ParseFromArray(meta_buf, buf_size))
      << "failed to parse string into protobuf";

  // to meta
  meta->head = pb.head();
  meta->app_id = pb.has_app_id() ? pb.app_id() : Meta::kEmpty;
  meta->timestamp = pb.has_timestamp() ? pb.timestamp() : Meta::kEmpty;
  meta->request = pb.request();
  meta->push = pb.push();
  meta->pull = pb.pull();
  meta->simple_app = pb.simple_app();
  meta->priority = pb.priority();
  meta->body = pb.body();
  meta->customer_id = pb.customer_id();
  meta->data_type.resize(pb.data_type_size());
  for (int i = 0; i < pb.data_type_size(); ++i) {
    meta->data_type[i] = static_cast<DataType>(pb.data_type(i));
  }
  if (pb.has_control()) {
    const auto& ctrl = pb.control();
    meta->control.cmd = static_cast<Control::Command>(ctrl.cmd());
    meta->control.barrier_group = ctrl.barrier_group();
    meta->control.msg_sig = ctrl.msg_sig();
    for (int i = 0; i < ctrl.node_size(); ++i) {
      const auto& p = ctrl.node(i);
      Node n;
      n.role = static_cast<Node::Role>(p.role());
      n.port = p.port();
      n.hostname = p.hostname();
      n.id = p.has_id() ? p.id() : Node::kEmpty;
      n.is_recovery = p.is_recovery();
      n.customer_id = p.customer_id();
      meta->control.node.push_back(n);
    }
  } else {
    meta->control.cmd = Control::EMPTY;
  }
}
5.2.5 PackMetaPB
According to the comments, PackMetaPB was contributed by ByteDance and is mainly used by ibverbs_van.h, so we do not study it in depth here.
void Van::PackMetaPB(const Meta& meta, PBMeta* pb) {
  pb->set_head(meta.head);
  if (meta.app_id != Meta::kEmpty) pb->set_app_id(meta.app_id);
  if (meta.timestamp != Meta::kEmpty) pb->set_timestamp(meta.timestamp);
  if (meta.body.size()) pb->set_body(meta.body);
  pb->set_push(meta.push);
  pb->set_request(meta.request);
  pb->set_simple_app(meta.simple_app);
  pb->set_priority(meta.priority);
  pb->set_customer_id(meta.customer_id);
  for (auto d : meta.data_type) pb->add_data_type(d);
  if (!meta.control.empty()) {
    auto ctrl = pb->mutable_control();
    ctrl->set_cmd(meta.control.cmd);
    if (meta.control.cmd == Control::BARRIER) {
      ctrl->set_barrier_group(meta.control.barrier_group);
    } else if (meta.control.cmd == Control::ACK) {
      ctrl->set_msg_sig(meta.control.msg_sig);
    }
    for (const auto& n : meta.control.node) {
      auto p = ctrl->add_node();
      p->set_id(n.id);
      p->set_role(n.role);
      p->set_port(n.port);
      p->set_hostname(n.hostname);
      p->set_is_recovery(n.is_recovery);
      p->set_customer_id(n.customer_id);
    }
  }
  pb->set_data_size(meta.data_size);
}
5.3 ZMQVan derived functions
ZMQVan has the following important derived functions.
5.3.1 Bind
Bind logic is as follows:
- Use zmq_bind() to bind a socket to a local endpoint, then start receiving messages sent to that endpoint.
- The endpoint address is a string consisting of a protocol, followed by ://, followed by an address.
- The Bind function chooses ipc mode or tcp mode according to the environment variable DMLC_LOCAL, and builds the endpoint address accordingly.
- When called on the scheduler node, no port has to be searched for (it is preset); workers and servers must find an available local port.
- A maximum number of retries bounds the port search.
int Bind(const Node& node, int max_retry) override {
  receiver_ = zmq_socket(context_, ZMQ_ROUTER);
  int local = GetEnv("DMLC_LOCAL", 0);
  std::string hostname = node.hostname.empty() ? "*" : node.hostname;
  int use_kubernetes = GetEnv("DMLC_USE_KUBERNETES", 0);
  if (use_kubernetes > 0 && node.role == Node::SCHEDULER) {
    hostname = "0.0.0.0";
  }
  std::string addr = local ? "ipc:///tmp/" : "tcp://" + hostname + ":";
  int port = node.port;
  unsigned seed = static_cast<unsigned>(time(NULL) + port);
  for (int i = 0; i < max_retry + 1; ++i) {
    auto address = addr + std::to_string(port);
    if (zmq_bind(receiver_, address.c_str()) == 0) break;
    if (i == max_retry) {
      port = -1;
    } else {
      port = 10000 + rand_r(&seed) % 40000;  // retry on a random port
    }
  }
  return port;
}
5.3.2 Connect
Connect mainly initializes senders_. The logic is as follows:
- If a socket for the target id already exists, close that socket.
- If the target node has the same role as this node and is not this node itself, return: a worker never sends to another worker, nor a server to another server (while a node, e.g. the Scheduler, may connect to itself).
- Create a ZMQ socket, held in sender as an opaque pointer.
- If the local node already knows its id (the scheduler knows its id is 1 from the very beginning), configure the socket and bind the id into the socket identity.
- Connect the sender socket to the destination address.
- Store the socket under the target id, i.e. add it to senders_.
The details are as follows:
void Connect(const Node& node) override {
  int id = node.id;
  auto it = senders_.find(id);
  if (it != senders_.end()) {
    zmq_close(it->second);  // a socket for this id exists: close it first
  }
  // worker doesn't need to connect to the other workers; same for server
  if ((node.role == my_node_.role) && (node.id != my_node_.id)) {
    return;
  }
  void *sender = zmq_socket(context_, ZMQ_DEALER);  // create a socket
  // if the local id is already known (the scheduler knows id = 1 from the
  // start), bind the id into the socket identity
  if (my_node_.id != Node::kEmpty) {
    std::string my_id = "ps" + std::to_string(my_node_.id);
    zmq_setsockopt(sender, ZMQ_IDENTITY, my_id.data(), my_id.size());
    const char* watermark = Environment::Get()->find("DMLC_PS_WATER_MARK");
    if (watermark) {
      const int hwm = atoi(watermark);
      zmq_setsockopt(sender, ZMQ_SNDHWM, &hwm, sizeof(hwm));
    }
  }
  // connect
  std::string addr = "tcp://" + node.hostname + ":" + std::to_string(node.port);
  if (GetEnv("DMLC_LOCAL", 0)) {
    addr = "ipc:///tmp/" + std::to_string(node.port);
  }
  if (zmq_connect(sender, addr.c_str()) != 0) {
    LOG(FATAL) << "connect to " + addr + " failed: " + zmq_strerror(errno);
  }
  senders_[id] = sender;  // store the socket under the target id for later use
}
5.3.3 SendMsg
The logic is as follows:
- Look up the previously stored socket for the receiver in senders_;
- Pack (compress) the meta;
- Send the meta;
- Send the data blocks one by one in a loop.
int SendMsg(const Message& msg) override {
  std::lock_guard<std::mutex> lk(mu_);
  // find the socket
  int id = msg.meta.recver;
  CHECK_NE(id, Meta::kEmpty);
  auto it = senders_.find(id);
  if (it == senders_.end()) {
    LOG(WARNING) << "there is no socket to node " << id;
    return -1;
  }
  void *socket = it->second;

  // send meta
  int meta_size;
  char* meta_buf;
  PackMeta(msg.meta, &meta_buf, &meta_size);
  int tag = ZMQ_SNDMORE;
  int n = msg.data.size();
  if (n == 0) tag = 0;
  zmq_msg_t meta_msg;
  zmq_msg_init_data(&meta_msg, meta_buf, meta_size, FreeData, NULL);
  while (true) {
    if (zmq_msg_send(&meta_msg, socket, tag) == meta_size) break;
    if (errno == EINTR) continue;
    return -1;
  }
  // zmq_msg_close(&meta_msg);
  int send_bytes = meta_size;

  // send data, one zmq message per data block
  for (int i = 0; i < n; ++i) {
    zmq_msg_t data_msg;
    SArray<char>* data = new SArray<char>(msg.data[i]);
    int data_size = data->size();
    zmq_msg_init_data(&data_msg, data->data(), data->size(), FreeData, data);
    if (i == n - 1) tag = 0;  // the last part clears ZMQ_SNDMORE
    while (true) {
      if (zmq_msg_send(&data_msg, socket, tag) == data_size) break;
      if (errno == EINTR) continue;
      return -1;
    }
    // zmq_msg_close(&data_msg);
    send_bytes += data_size;
  }
  return send_bytes;
}
5.3.4 RecvMsg
RecvMsg receives messages on the bound port.
While receiving, it determines which frame of the multipart message it is looking at and handles each frame accordingly.
int RecvMsg(Message* msg) override {
  msg->data.clear();
  size_t recv_bytes = 0;
  for (int i = 0; ; ++i) {
    zmq_msg_t* zmsg = new zmq_msg_t;
    CHECK(zmq_msg_init(zmsg) == 0) << zmq_strerror(errno);
    while (true) {
      if (zmq_msg_recv(zmsg, receiver_, 0) != -1) break;
      if (errno == EINTR) {
        std::cout << "interrupted";
        continue;
      }
      return -1;
    }
    char* buf = CHECK_NOTNULL((char *)zmq_msg_data(zmsg));
    size_t size = zmq_msg_size(zmsg);
    recv_bytes += size;

    if (i == 0) {
      // the first frame is the sender's identity
      msg->meta.sender = GetNodeID(buf, size);
      msg->meta.recver = my_node_.id;
      CHECK(zmq_msg_more(zmsg));
      zmq_msg_close(zmsg);
      delete zmsg;
    } else if (i == 1) {
      // the second frame is the packed meta
      UnpackMeta(buf, size, &(msg->meta));
      zmq_msg_close(zmsg);
      bool more = zmq_msg_more(zmsg);
      delete zmsg;
      if (!more) break;
    } else {
      // remaining frames are data blocks, received zero-copy
      SArray<char> data;
      data.reset(buf, size, [zmsg, size](char* buf) {
        zmq_msg_close(zmsg);
        delete zmsg;
      });
      msg->data.push_back(data);
      if (!zmq_msg_more(zmsg)) { break; }
    }
  }
  return recv_bytes;
}
The GetNodeID function is
/**
 * return the node id given the received identity
 * \return -1 if not found
 */
int GetNodeID(const char* buf, size_t size) {
  if (size > 2 && buf[0] == 'p' && buf[1] == 's') {
    int id = 0;
    size_t i = 2;
    for (; i < size; ++i) {
      if (buf[i] >= '0' && buf[i] <= '9') {
        id = id * 10 + buf[i] - '0';
      } else {
        break;
      }
    }
    if (i == size) return id;
  }
  return Meta::kEmpty;
}
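To see how the identity round-trips: Connect sets ZMQ_IDENTITY to "ps" + id on the sending socket, the ROUTER socket prepends that identity as the first frame of every received multipart message, and GetNodeID parses the id back out. A minimal standalone illustration (ParseNodeID is a hypothetical copy of the parsing rule above):

#include <cstdio>
#include <cstring>

int ParseNodeID(const char* buf, size_t size) {
  if (size > 2 && buf[0] == 'p' && buf[1] == 's') {
    int id = 0;
    size_t i = 2;
    for (; i < size; ++i) {
      if (buf[i] >= '0' && buf[i] <= '9') {
        id = id * 10 + (buf[i] - '0');
      } else {
        break;
      }
    }
    if (i == size) return id;  // the whole suffix was numeric
  }
  return -1;  // ps-lite returns Meta::kEmpty here
}

int main() {
  const char identity[] = "ps9";  // what node 9 sets via ZMQ_IDENTITY
  std::printf("%d\n", ParseNodeID(identity, std::strlen(identity)));  // 9
}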
0x06 summary
Finally, let's make a summary:
Now that the post office has an address book, it needs trucks to haul and deliver the mail. Van is the communication module of the whole Parameter Server; its characteristics are as follows.
- When the PostOffice class is instantiated, an instance of the Van class will be created as a member variable. The life cycle of this Van instance is the same as that of its PostOffice instance (each node has only one object);
- Van is responsible for the concrete inter-node communication. Specifically, it establishes the interconnections between nodes (such as the connection between Worker and Scheduler) and starts a local receiving thread that listens for incoming messages.
- The initialization function of the Van object configures itself according to the type of the local node: it starts the port, establishes the connection from the local node to the scheduler, and starts the message-receiving thread and the heartbeat thread, after which communication can proceed.
- The Parameter Server accepts and processes messages in the background thread receiver_thread_. Besides the data messages that transmit parameters, the control messages between nodes include:
- ADD_NODE: the workers and servers register themselves with the scheduler;
- BARRIER: synchronization blocking message between nodes;
- HEARTBEAT: HEARTBEAT signal between nodes;
- TERMINATE: node exit signal;
- ACK: confirmation message. The ACK type appears only when the Resender class is enabled.
- EMPTY: not a control message at all, i.e. an ordinary data (push or pull) message;
0xEE personal information
★★★★★★★ thinking about life and technology ★★★★★★
Wechat public account: Rossi's thinking
If you want timely notifications of new articles, or want to see the technical materials I recommend, please follow it.