How the Reader reads the Writer's data (underlying logic)
The previous article (upper logic) described how the Reader reads data, but it did not cover the underlying Transport layer, which is what actually performs the inter-process communication. Let's analyze it here.
ReceiverManager::GetReceiver calls transport::Transport::Instance()->CreateReceiver to create the Receiver:
template <typename M>
auto Transport::CreateReceiver(
    const RoleAttributes& attr,
    const typename Receiver<M>::MessageListener& msg_listener,
    const OptionalMode& mode) -> typename std::shared_ptr<Receiver<M>> {
  if (is_shutdown_.load()) {
    AINFO << "transport has been shut down.";
    return nullptr;
  }

  std::shared_ptr<Receiver<M>> receiver = nullptr;
  RoleAttributes modified_attr = attr;
  if (!modified_attr.has_qos_profile()) {
    modified_attr.mutable_qos_profile()->CopyFrom(
        QosProfileConf::QOS_PROFILE_DEFAULT);
  }

  // The Receiver is created according to the communication mode. By default,
  // Apollo uses OptionalMode::HYBRID, that is, the default branch.
  switch (mode) {
    case OptionalMode::INTRA:
      receiver =
          std::make_shared<IntraReceiver<M>>(modified_attr, msg_listener);
      break;

    case OptionalMode::SHM:
      receiver = std::make_shared<ShmReceiver<M>>(modified_attr, msg_listener);
      break;

    case OptionalMode::RTPS:
      receiver =
          std::make_shared<RtpsReceiver<M>>(modified_attr, msg_listener);
      break;

    default:
      receiver = std::make_shared<HybridReceiver<M>>(
          modified_attr, msg_listener, participant());
      break;
  }

  RETURN_VAL_IF_NULL(receiver, nullptr);
  if (mode != OptionalMode::HYBRID) {
    receiver->Enable();
  }
  return receiver;
}
The OptionalMode::HYBRID communication mode actually wraps the three modes above. For each peer it selects the mode that fits where the peer runs relative to us: in the same process, in another process on the same host, or on another host.
Therefore, we analyze HybridReceiver here:
template <typename M>
HybridReceiver<M>::HybridReceiver(
    const RoleAttributes& attr,
    const typename Receiver<M>::MessageListener& msg_listener,
    const ParticipantPtr& participant)
    : Receiver<M>(attr, msg_listener),
      history_(nullptr),
      participant_(participant) {
  InitMode();
  ObtainConfig();
  InitHistory();
  InitReceivers();
  InitTransmitters();
}

template <typename M>
void HybridReceiver<M>::InitMode() {
  mode_ = std::make_shared<proto::CommunicationMode>();
  mapping_table_[SAME_PROC] = mode_->same_proc();
  mapping_table_[DIFF_PROC] = mode_->diff_proc();
  mapping_table_[DIFF_HOST] = mode_->diff_host();
}

template <typename M>
void HybridReceiver<M>::ObtainConfig() {
  auto& global_conf = common::GlobalData::Instance()->Config();
  if (!global_conf.has_transport_conf()) {
    return;
  }
  if (!global_conf.transport_conf().has_communication_mode()) {
    return;
  }
  mode_->CopyFrom(global_conf.transport_conf().communication_mode());

  mapping_table_[SAME_PROC] = mode_->same_proc();
  mapping_table_[DIFF_PROC] = mode_->diff_proc();
  mapping_table_[DIFF_HOST] = mode_->diff_host();
}

// Create the history cache from the QoS settings in the configuration
// (history policy and depth). So HybridReceiver also keeps a message cache
// on the read side.
// Note that this cache is only used for RTPS mode, so it only matters when
// the message's QoS requires a local cache.
template <typename M>
void HybridReceiver<M>::InitHistory() {
  HistoryAttributes history_attr(this->attr_.qos_profile().history(),
                                 this->attr_.qos_profile().depth());
  history_ = std::make_shared<History<M>>(history_attr);
  if (this->attr_.qos_profile().durability() ==
      QosDurabilityPolicy::DURABILITY_TRANSIENT_LOCAL) {
    history_->Enable();
  }
}

template <typename M>
void HybridReceiver<M>::InitReceivers() {
  std::set<OptionalMode> modes;
  modes.insert(mode_->same_proc());
  modes.insert(mode_->diff_proc());
  modes.insert(mode_->diff_host());
  auto listener = std::bind(&HybridReceiver<M>::OnNewMessage, this,
                            std::placeholders::_1, std::placeholders::_2);
  for (auto& mode : modes) {
    switch (mode) {
      case OptionalMode::INTRA:
        receivers_[mode] =
            std::make_shared<IntraReceiver<M>>(this->attr_, listener);
        break;
      case OptionalMode::SHM:
        receivers_[mode] =
            std::make_shared<ShmReceiver<M>>(this->attr_, listener);
        break;
      default:
        receivers_[mode] =
            std::make_shared<RtpsReceiver<M>>(this->attr_, listener);
        break;
    }
  }
}

template <typename M>
void HybridReceiver<M>::InitTransmitters() {
  std::unordered_map<uint64_t, RoleAttributes> empty;
  for (auto& item : receivers_) {
    transmitters_[item.first] = empty;
  }
}
The functions called by the HybridReceiver constructor perform the initialization, mainly of mapping_table_, receivers_ and transmitters_. mapping_table_ deserves a closer look:
// SAME_PROC: peers in the same process use INTRA mode by default. This covers
// communication inside one module, such as between different components of
// the same module, or a component talking to itself.
// DIFF_PROC: peers in different processes on the same host use SHM by
// default. This is the usual case when different modules on a single machine
// communicate with each other, and it is also the most used mode.
// DIFF_HOST: peers in different processes on different hosts use RTPS by
// default. It is only used for communication across hosts.
// The GetRelation function determines which relation applies.
template <typename M>
Relation HybridReceiver<M>::GetRelation(const RoleAttributes& opposite_attr) {
  if (opposite_attr.channel_name() != this->attr_.channel_name()) {
    return NO_RELATION;
  }
  if (opposite_attr.host_ip() != this->attr_.host_ip()) {
    return DIFF_HOST;
  }
  if (opposite_attr.process_id() != this->attr_.process_id()) {
    return DIFF_PROC;
  }
  return SAME_PROC;
}
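To make the relation-to-mode lookup concrete, here is a minimal, self-contained sketch. It is not Cyber RT code: Peer, Mode, DefaultMappingTable and ModeFor are hypothetical stand-ins, and the table simply hard-codes the defaults named in the comments above instead of reading them from the configuration.

```cpp
#include <cassert>
#include <map>
#include <string>

// Hypothetical stand-ins for the Cyber RT types; only the fields that
// GetRelation inspects are modelled.
enum class Mode { INTRA, SHM, RTPS };
enum class Relation { NO_RELATION, DIFF_HOST, DIFF_PROC, SAME_PROC };

struct Peer {
  std::string channel_name;
  std::string host_ip;
  int process_id;
};

// Same decision order as HybridReceiver<M>::GetRelation: channel first,
// then host, then process.
Relation GetRelation(const Peer& self, const Peer& other) {
  if (other.channel_name != self.channel_name) return Relation::NO_RELATION;
  if (other.host_ip != self.host_ip) return Relation::DIFF_HOST;
  if (other.process_id != self.process_id) return Relation::DIFF_PROC;
  return Relation::SAME_PROC;
}

// Assumed defaults taken from the comments above; the real table is filled
// from proto::CommunicationMode and may be overridden by configuration.
std::map<Relation, Mode> DefaultMappingTable() {
  return {{Relation::SAME_PROC, Mode::INTRA},
          {Relation::DIFF_PROC, Mode::SHM},
          {Relation::DIFF_HOST, Mode::RTPS}};
}

// Looks up the mode for a peer that is known to share our channel.
Mode ModeFor(const std::map<Relation, Mode>& table, Relation relation) {
  assert(relation != Relation::NO_RELATION);
  return table.at(relation);
}
```

With these definitions, a peer on the same channel and host but with a different process id resolves to DIFF_PROC and therefore to SHM, which is the path the rest of this article follows.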
HybridReceiver::HybridReceiver creates one receiver per configured communication mode (three with the defaults). Here we follow ShmReceiver, the one used most often in Apollo:
template <typename M>
ShmReceiver<M>::ShmReceiver(
    const RoleAttributes& attr,
    const typename Receiver<M>::MessageListener& msg_listener)
    : Receiver<M>(attr, msg_listener) {
  dispatcher_ = ShmDispatcher::Instance();
}
ShmDispatcher acts as the data distributor of ShmReceiver:
ShmDispatcher::ShmDispatcher() : host_id_(0) { Init(); }

bool ShmDispatcher::Init() {
  host_id_ = common::Hash(GlobalData::Instance()->HostIp());
  notifier_ = NotifierFactory::CreateNotifier();
  thread_ = std::thread(&ShmDispatcher::ThreadFunc, this);
  // Thread property configuration
  scheduler::Instance()->SetInnerThreadAttr("shm_disp", &thread_);
  return true;
}
Similar to the upper layer communication, the lower layer ShmDispatcher also has a notifier_:
auto NotifierFactory::CreateNotifier() -> NotifierPtr {
  std::string notifier_type(ConditionNotifier::Type());
  auto& g_conf = GlobalData::Instance()->Config();
  if (g_conf.has_transport_conf() && g_conf.transport_conf().has_shm_conf() &&
      g_conf.transport_conf().shm_conf().has_notifier_type()) {
    notifier_type = g_conf.transport_conf().shm_conf().notifier_type();
  }

  ADEBUG << "notifier type: " << notifier_type;

  if (notifier_type == MulticastNotifier::Type()) {
    return CreateMulticastNotifier();
  } else if (notifier_type == ConditionNotifier::Type()) {
    return CreateConditionNotifier();
  }

  AINFO << "unknown notifier, we use default notifier: " << notifier_type;
  return CreateConditionNotifier();
}

auto NotifierFactory::CreateConditionNotifier() -> NotifierPtr {
  return ConditionNotifier::Instance();
}

auto NotifierFactory::CreateMulticastNotifier() -> NotifierPtr {
  return MulticastNotifier::Instance();
}
A ConditionNotifier or a MulticastNotifier is created according to notifier_type. ConditionNotifier itself also uses shared memory: when the write end sends data, it calls Notify to write a record into that shared memory, and the read end can then call Listen to read it. That record tells the read end how to fetch the sent message from the shared memory used for data communication. MulticastNotifier notifies over a socket instead (IP 239.255.0.100, port 8888). The specific code is not analyzed here. We continue with the rest of ShmDispatcher::Init().
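The Notify/Listen contract can be illustrated with an in-process stand-in. This is only a sketch under stated assumptions: InProcessNotifier is a hypothetical class that uses a mutex, a condition variable and a queue, whereas the real ConditionNotifier keeps its records in shared memory so that they cross process boundaries. The three ReadableInfo fields mirror the accessors used later in ThreadFunc, and treating the first Listen argument as milliseconds is an assumption.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <deque>
#include <mutex>

// Hypothetical record: "block block_index of channel channel_id, written on
// host host_id, is ready to be read".
struct ReadableInfo {
  uint64_t host_id;
  uint64_t channel_id;
  uint32_t block_index;
};

class InProcessNotifier {
 public:
  // Write end: publish a record and wake up one waiting listener.
  void Notify(const ReadableInfo& info) {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      pending_.push_back(info);
    }
    cv_.notify_one();
  }

  // Read end: wait up to timeout_ms for a record. Returns false on timeout.
  bool Listen(int timeout_ms, ReadableInfo* info) {
    assert(info != nullptr);
    std::unique_lock<std::mutex> lock(mutex_);
    if (!cv_.wait_for(lock, std::chrono::milliseconds(timeout_ms),
                      [this] { return !pending_.empty(); })) {
      return false;
    }
    *info = pending_.front();
    pending_.pop_front();
    return true;
  }

 private:
  std::mutex mutex_;
  std::condition_variable cv_;
  std::deque<ReadableInfo> pending_;
};
```

The point of the sketch is the division of labour: the notifier only carries a small "where to look" record, while the message bytes travel through a separate shared memory segment. The remaining statements of ShmDispatcher::Init() are: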
thread_ = std::thread(&ShmDispatcher::ThreadFunc, this);
scheduler::Instance()->SetInnerThreadAttr("shm_disp", &thread_);
The processing thread of ShmDispatcher is created here. This thread is the key to data reading, but the objects it reads through have not been created yet, so we come back to it later. That is the end of ShmDispatcher::Init(), and we continue with Transport::CreateReceiver.
if (mode != OptionalMode::HYBRID) { receiver->Enable(); }
The receiver is started right away only when the mode is not OptionalMode::HYBRID, but HYBRID is exactly the mode we are in. So where is HybridReceiver::Enable called?
It is actually called in Reader::JoinTheTopology(), which the previous article left out:
template <typename MessageT>
void Reader<MessageT>::JoinTheTopology() {
  // add listener
  change_conn_ = channel_manager_->AddChangeListener(std::bind(
      &Reader<MessageT>::OnChannelChange, this, std::placeholders::_1));

  // get peer writers
  const std::string& channel_name = this->role_attr_.channel_name();
  std::vector<proto::RoleAttributes> writers;
  channel_manager_->GetWritersOfChannel(channel_name, &writers);
  for (auto& writer : writers) {
    receiver_->Enable(writer);
  }
  channel_manager_->Join(this->role_attr_, proto::RoleType::ROLE_READER,
                         message::HasSerializer<MessageT>::value);
}

template <typename MessageT>
void Reader<MessageT>::OnChannelChange(const proto::ChangeMsg& change_msg) {
  if (change_msg.role_type() != proto::RoleType::ROLE_WRITER) {
    return;
  }

  auto& writer_attr = change_msg.role_attr();
  if (writer_attr.channel_name() != this->role_attr_.channel_name()) {
    return;
  }

  auto operate_type = change_msg.operate_type();
  if (operate_type == proto::OperateType::OPT_JOIN) {
    receiver_->Enable(writer_attr);
  } else {
    receiver_->Disable(writer_attr);
  }
}
The details of Reader::JoinTheTopology() are not analyzed here; a brief explanation is enough. channel_manager_->AddChangeListener registers Reader::OnChannelChange, which is triggered whenever the channel the reader belongs to changes. If a Writer joins (operate_type == proto::OperateType::OPT_JOIN), Enable is called to start the receiver; otherwise Disable is called. If writers of the channel are already known, JoinTheTopology simply iterates over them and enables the receiver for each one. JoinTheTopology belongs to service discovery, which a later article will explain. Here we go straight to the Enable function:
template <typename M>
void HybridReceiver<M>::Enable(const RoleAttributes& opposite_attr) {
  auto relation = GetRelation(opposite_attr);
  RETURN_IF(relation == NO_RELATION);

  uint64_t id = opposite_attr.id();
  std::lock_guard<std::mutex> lock(mutex_);
  if (transmitters_[mapping_table_[relation]].count(id) == 0) {
    transmitters_[mapping_table_[relation]].insert(
        std::make_pair(id, opposite_attr));
    receivers_[mapping_table_[relation]]->Enable(opposite_attr);
    ReceiveHistoryMsg(opposite_attr);
  }
}
HybridReceiver::Enable starts the receiver of the communication mode that matches the relation between the two nodes, and it does so only once per writer id. Here we follow ShmReceiver:
template <typename M>
void ShmReceiver<M>::Enable(const RoleAttributes& opposite_attr) {
  dispatcher_->AddListener<M>(
      this->attr_, opposite_attr,
      std::bind(&ShmReceiver<M>::OnNewMessage, this, std::placeholders::_1,
                std::placeholders::_2));
}
dispatcher_->AddListener is called here to add the callback function:
template <typename MessageT>
void ShmDispatcher::AddListener(const RoleAttributes& self_attr,
                                const RoleAttributes& opposite_attr,
                                const MessageListener<MessageT>& listener) {
  // FIXME: make it more clean
  auto listener_adapter = [listener](const std::shared_ptr<ReadableBlock>& rb,
                                     const MessageInfo& msg_info) {
    auto msg = std::make_shared<MessageT>();
    RETURN_IF(!message::ParseFromArray(
        rb->buf, static_cast<int>(rb->block->msg_size()), msg.get()));
    listener(msg, msg_info);
  };

  Dispatcher::AddListener<ReadableBlock>(self_attr, opposite_attr,
                                         listener_adapter);
  AddSegment(self_attr);
}
A new callback (listener_adapter) is built here. It parses the raw block into a MessageT and then calls the original listener. After that, the parent class's AddListener is called.
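The adapter pattern used there, wrapping a typed listener in a callback that accepts raw bytes, can be shown in isolation. The following is a simplified sketch, not the Cyber RT implementation: RawBlock, TextMsg, RawListener and MakeAdapter are hypothetical names, the MessageInfo argument is left out, and ParseFrom stands in for message::ParseFromArray.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <string>

// Hypothetical raw block: the bytes as they would sit in shared memory.
struct RawBlock {
  std::string bytes;
};

// Hypothetical message type with a trivial parser that rejects empty input.
struct TextMsg {
  std::string text;
  bool ParseFrom(const std::string& bytes) {
    if (bytes.empty()) return false;
    text = bytes;
    return true;
  }
};

using RawListener = std::function<void(const std::shared_ptr<RawBlock>&)>;

// Wraps a typed listener so that it can be stored and invoked as a
// raw-block listener. Blocks that fail to parse are dropped silently.
template <typename MessageT>
RawListener MakeAdapter(
    std::function<void(const std::shared_ptr<MessageT>&)> listener) {
  assert(listener != nullptr);
  return [listener](const std::shared_ptr<RawBlock>& rb) {
    auto msg = std::make_shared<MessageT>();
    if (!msg->ParseFrom(rb->bytes)) return;
    listener(msg);
  };
}
```

Because every adapter has the same raw-block signature, the dispatcher can keep a single handler type per channel regardless of the message type the reader asked for. The parent class's Dispatcher::AddListener, which receives the adapter, is: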
template <typename MessageT>
void Dispatcher::AddListener(const RoleAttributes& self_attr,
                             const RoleAttributes& opposite_attr,
                             const MessageListener<MessageT>& listener) {
  if (is_shutdown_.load()) {
    return;
  }

  uint64_t channel_id = self_attr.channel_id();

  std::shared_ptr<ListenerHandler<MessageT>> handler;
  ListenerHandlerBasePtr* handler_base = nullptr;
  if (msg_listeners_.Get(channel_id, &handler_base)) {
    handler =
        std::dynamic_pointer_cast<ListenerHandler<MessageT>>(*handler_base);
    if (handler == nullptr) {
      AERROR << "please ensure that readers with the same channel["
             << self_attr.channel_name()
             << "] in the same process have the same message type";
      return;
    }
  } else {
    ADEBUG << "new reader for channel:"
           << GlobalData::GetChannelById(channel_id);
    handler.reset(new ListenerHandler<MessageT>());
    msg_listeners_.Set(channel_id, handler);
  }
  handler->Connect(self_attr.id(), opposite_attr.id(), listener);
}
Dispatcher::AddListener's main job is to create a handler (or reuse the existing one for the channel), store it in msg_listeners_, and pass the callback to handler->Connect:
template <typename MessageT>
void ListenerHandler<MessageT>::Connect(uint64_t self_id, uint64_t oppo_id,
                                        const Listener& listener) {
  WriteLockGuard<AtomicRWLock> lock(rw_lock_);
  if (signals_.find(oppo_id) == signals_.end()) {
    signals_[oppo_id] = std::make_shared<MessageSignal>();
  }

  auto connection = signals_[oppo_id]->Connect(listener);
  if (!connection.IsConnected()) {
    AWARN << oppo_id << " " << self_id << " connect failed!";
    return;
  }

  if (signals_conns_.find(oppo_id) == signals_conns_.end()) {
    signals_conns_[oppo_id] = ConnectionMap();
  }

  signals_conns_[oppo_id][self_id] = connection;
}
Connect fills in the signal maps according to the self_id and oppo_id passed in. The signal here imitates Qt's signal and slot mechanism. In short, multiple slots (callback functions) can be bound to one signal, or one slot can be bound to multiple signals (Apollo appears to implement the former here). In Apollo's implementation, when a signal's overloaded call operator is invoked, it calls every slot bound to it. A slot is bound to a signal through the Connect function, so here the listener is bound to signals_[oppo_id]. What remains of ShmDispatcher::AddListener is the call to AddSegment.
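The signal and slot idea can be sketched in a few lines. This is a minimal illustration under stated assumptions, not Apollo's signal class: the Signal template below is hypothetical, is not thread-safe, and identifies connections by a plain integer id rather than a connection object.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <map>
#include <utility>

// Minimal signal: any number of slots can be bound to it, and emitting the
// signal through operator() calls each of them in connection order.
template <typename... Args>
class Signal {
 public:
  using Slot = std::function<void(Args...)>;

  // Binds a slot and returns an id that can later be used to disconnect it.
  std::size_t Connect(Slot slot) {
    assert(slot != nullptr);
    slots_[next_id_] = std::move(slot);
    return next_id_++;
  }

  // Unbinds a slot. Returns false if the id is unknown.
  bool Disconnect(std::size_t id) { return slots_.erase(id) > 0; }

  // Emits the signal: every bound slot receives the same arguments.
  void operator()(Args... args) const {
    for (const auto& entry : slots_) {
      entry.second(args...);
    }
  }

 private:
  std::map<std::size_t, Slot> slots_;
  std::size_t next_id_ = 0;
};
```

In the handler, one such signal exists per writer (keyed by oppo_id), so a message from a given writer only reaches the listeners that were connected for that writer. AddSegment and the segment factory it uses are: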
void ShmDispatcher::AddSegment(const RoleAttributes& self_attr) {
  uint64_t channel_id = self_attr.channel_id();
  WriteLockGuard<AtomicRWLock> lock(segments_lock_);
  if (segments_.count(channel_id) > 0) {
    return;
  }
  auto segment = SegmentFactory::CreateSegment(channel_id);
  segments_[channel_id] = segment;
  previous_indexes_[channel_id] = UINT32_MAX;
}

auto SegmentFactory::CreateSegment(uint64_t channel_id) -> SegmentPtr {
  std::string segment_type(XsiSegment::Type());
  auto& shm_conf = GlobalData::Instance()->Config();
  if (shm_conf.has_transport_conf() &&
      shm_conf.transport_conf().has_shm_conf() &&
      shm_conf.transport_conf().shm_conf().has_shm_type()) {
    segment_type = shm_conf.transport_conf().shm_conf().shm_type();
  }

  ADEBUG << "segment type: " << segment_type;

  if (segment_type == PosixSegment::Type()) {
    return std::make_shared<PosixSegment>(channel_id);
  }

  return std::make_shared<XsiSegment>(channel_id);
}
AddSegment adds a Segment, the class that operates on the shared memory. There are two implementations, PosixSegment and XsiSegment. The difference between the two is not analyzed here; both encapsulate the shared memory operations, and those who are interested can look into them. The Segment object is stored in segments_, a std::unordered_map that maps the uint64_t channel id to a std::shared_ptr holding the Segment.
After this analysis of Enable, a question should come up: Enable only creates and initializes the related classes, and no data is actually read. Where does the reading happen?
To answer that, we go back to the thread function of the ShmDispatcher processing thread that was skipped earlier:
void ShmDispatcher::ThreadFunc() {
  ReadableInfo readable_info;
  while (!is_shutdown_.load()) {
    if (!notifier_->Listen(100, &readable_info)) {
      ADEBUG << "listen failed.";
      continue;
    }

    if (readable_info.host_id() != host_id_) {
      ADEBUG << "shm readable info from other host.";
      continue;
    }

    uint64_t channel_id = readable_info.channel_id();
    uint32_t block_index = readable_info.block_index();

    {
      ReadLockGuard<AtomicRWLock> lock(segments_lock_);
      if (segments_.count(channel_id) == 0) {
        continue;
      }
      // check block index
      if (previous_indexes_.count(channel_id) == 0) {
        previous_indexes_[channel_id] = UINT32_MAX;
      }
      uint32_t& previous_index = previous_indexes_[channel_id];
      if (block_index != 0 && previous_index != UINT32_MAX) {
        if (block_index == previous_index) {
          ADEBUG << "Receive SAME index " << block_index << " of channel "
                 << channel_id;
        } else if (block_index < previous_index) {
          ADEBUG << "Receive PREVIOUS message. last: " << previous_index
                 << ", now: " << block_index;
        } else if (block_index - previous_index > 1) {
          ADEBUG << "Receive JUMP message. last: " << previous_index
                 << ", now: " << block_index;
        }
      }
      previous_index = block_index;

      ReadMessage(channel_id, block_index);
    }
  }
}
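The block index bookkeeping in the middle of ThreadFunc is easier to read when pulled out as a pure function. The sketch below reproduces the same comparisons; IndexCheck and ClassifyIndex are hypothetical names that do not exist in Cyber RT, where the outcome is only written to the debug log and the message is read in every case.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical labels for the cases that ThreadFunc logs.
enum class IndexCheck { IN_ORDER, SAME, PREVIOUS, JUMP };

// previous_index == UINT32_MAX means "nothing received yet on this channel".
// block_index == 0 is skipped by the check as well, as in ThreadFunc.
IndexCheck ClassifyIndex(uint32_t previous_index, uint32_t block_index) {
  if (block_index == 0 || previous_index == UINT32_MAX) {
    return IndexCheck::IN_ORDER;
  }
  if (block_index == previous_index) return IndexCheck::SAME;
  if (block_index < previous_index) return IndexCheck::PREVIOUS;
  if (block_index - previous_index > 1) return IndexCheck::JUMP;
  return IndexCheck::IN_ORDER;
}
```

For example, going from block 3 to block 6 is classified as a jump, while going from 3 to 4 is the normal in-order case.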
ThreadFunc is the thread function of ShmDispatcher, and the key is notifier_->Listen. As mentioned above, when the write side sends data it calls Notify to write a record into the notifier's shared memory, and the read side can then call Listen to read it. ThreadFunc polls Listen in a loop. When Listen returns a record, ThreadFunc works out from it (channel_id and block_index) where the message sits in the shared memory used for data communication, and then calls ReadMessage to read the data:
void ShmDispatcher::ReadMessage(uint64_t channel_id, uint32_t block_index) {
  ADEBUG << "Reading sharedmem message: "
         << GlobalData::GetChannelById(channel_id)
         << " from block: " << block_index;
  auto rb = std::make_shared<ReadableBlock>();
  rb->index = block_index;
  if (!segments_[channel_id]->AcquireBlockToRead(rb.get())) {
    AWARN << "fail to acquire block, channel: "
          << GlobalData::GetChannelById(channel_id)
          << " index: " << block_index;
    return;
  }

  MessageInfo msg_info;
  const char* msg_info_addr =
      reinterpret_cast<char*>(rb->buf) + rb->block->msg_size();

  if (msg_info.DeserializeFrom(msg_info_addr, rb->block->msg_info_size())) {
    OnMessage(channel_id, rb, msg_info);
  } else {
    AERROR << "error msg info of channel:"
           << GlobalData::GetChannelById(channel_id);
  }
  segments_[channel_id]->ReleaseReadBlock(*rb);
}
ReadMessage takes the Segment (a std::shared_ptr) for the channel out of segments_, acquires the block from shared memory through it, deserializes the MessageInfo stored right after the message bytes, and hands the block to OnMessage.
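The pointer arithmetic in ReadMessage implies a simple layout inside a block: the serialized message comes first, and the message info starts msg_size bytes into the buffer. The sketch below illustrates only that layout. Block, Trailer, Pack and Unpack are hypothetical, and the fixed two-field trailer is an assumption made for the example rather than the real serialized form of MessageInfo.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <string>
#include <vector>

// Hypothetical fixed-size trailer standing in for the serialized MessageInfo.
struct Trailer {
  uint64_t sender_id;
  uint64_t seq_num;
};

// Hypothetical block: one buffer plus the two sizes needed to split it.
struct Block {
  std::vector<char> buf;
  std::size_t msg_size = 0;
  std::size_t msg_info_size = 0;
};

// Write side: payload first, trailer immediately after it.
Block Pack(const std::string& payload, const Trailer& info) {
  Block b;
  b.msg_size = payload.size();
  b.msg_info_size = sizeof(Trailer);
  b.buf.resize(b.msg_size + b.msg_info_size);
  std::memcpy(b.buf.data(), payload.data(), b.msg_size);
  std::memcpy(b.buf.data() + b.msg_size, &info, sizeof(Trailer));
  return b;
}

// Read side: the trailer starts at buf + msg_size, as in ReadMessage.
bool Unpack(const Block& b, std::string* payload, Trailer* info) {
  assert(payload != nullptr && info != nullptr);
  if (b.msg_info_size != sizeof(Trailer)) return false;
  if (b.buf.size() < b.msg_size + b.msg_info_size) return false;
  payload->assign(b.buf.data(), b.msg_size);
  std::memcpy(info, b.buf.data() + b.msg_size, sizeof(Trailer));
  return true;
}
```

Keeping the info next to the payload means a single block acquisition gives the reader both the message bytes and the sender identity that ListenerHandler::Run later uses to pick the signal. ShmDispatcher::OnMessage, which receives the block, is: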
void ShmDispatcher::OnMessage(uint64_t channel_id,
                              const std::shared_ptr<ReadableBlock>& rb,
                              const MessageInfo& msg_info) {
  if (is_shutdown_.load()) {
    return;
  }
  ListenerHandlerBasePtr* handler_base = nullptr;
  if (msg_listeners_.Get(channel_id, &handler_base)) {
    auto handler = std::dynamic_pointer_cast<ListenerHandler<ReadableBlock>>(
        *handler_base);
    handler->Run(rb, msg_info);
  } else {
    AERROR << "Cannot find " << GlobalData::GetChannelById(channel_id)
           << "'s handler.";
  }
}
OnMessage takes the ListenerHandler for the channel out of msg_listeners_ and executes its Run function:
template <typename MessageT>
void ListenerHandler<MessageT>::Run(const Message& msg,
                                    const MessageInfo& msg_info) {
  signal_(msg, msg_info);
  uint64_t oppo_id = msg_info.sender_id().HashValue();
  ReadLockGuard<AtomicRWLock> lock(rw_lock_);
  if (signals_.find(oppo_id) == signals_.end()) {
    return;
  }

  (*signals_[oppo_id])(msg, msg_info);
}
The call (*signals_[oppo_id])(msg, msg_info) is the signal emission mentioned above, and it executes all slot functions bound to the signal. What is that slot function? Readers who have followed along from the previous article will see that, despite the layers of forwarding and wrapping, the function at the very start of the chain is the lambda passed to transport::Transport::Instance()->CreateReceiver in ReceiverManager::GetReceiver. At this point the flow of data from the bottom layer to the top should be clear. The process can be summarized as follows:
For shm:
Write end:
writer -> Segment::AcquireBlockToWrite -> notifier::Notify
Read end:
ShmDispatcher::ThreadFunc -> notifier::Listen -> Segment::AcquireBlockToRead -> ListenerHandler::Run -> (*signals_[oppo_id])(msg, msg_info) -> DataDispatcher -> DataVisitor::buffer -> DataNotifier::Instance()->Notify -> callback -> Scheduler::NotifyProcessor -> run the coroutine -> take data from DataVisitor::buffer -> put it into the Reader's Blocker -> execute the callback passed in by the user (if any)
Reference link:
Baidu Apollo system learning - Cyber RT communication - bottom layer