Preface
In this article, we will introduce server development and explore, from several angles, how to develop a high-performance, highly concurrent server program. It should be noted that the complexity of large servers lies in their business logic, not in the basic framework of their code.
Large server systems generally consist of multiple services; they may sit behind a CDN, or be so-called "distributed" services. This article will not cover those topics, because no matter how complex the overall structure of a server system is, it is built out of individual service programs. So the focus of this article is the structure of a single service program, and "structure" here means the network communication layer of a single server. If you really understand what I describe, you can carry any business on top of this basic structure, and you can also expand it into a complex group of servers, such as a "distributed" service.
Although the code examples in this article use C++, they apply equally to Java (I am also a Java developer); the principles are the same, except that Java wraps another layer of interfaces over the operating system's basic network communication APIs through its virtual machine (Java even provides ready-made APIs based on common network communication framework ideas, such as NIO). For this reason, this article does not dwell on broad, empty technical terms; instead it describes practical coding approaches that can guide readers in their actual work or help optimize their existing code. The techniques discussed here cover both Windows and Linux platforms.
So-called high performance means that the server can handle every client connection smoothly and respond to client requests with as little delay as possible; so-called high concurrency means not only that the server can support many client connections at the same time, but also that those clients continuously exchange data with the server while connected. Many network libraries claim that a single service can support millions or even tens of millions of concurrent connections; when I actually looked at them, I found that they merely support many simultaneous connections.
If a server can simply accept n connections (n may be very large) but cannot handle the data flowing over those connections in an orderly way, it is meaningless; such a server framework is only a "toy" and has no value for real production applications.
This article covers two aspects: one is the basic network communication components of a server; the other is how to assemble these components into a complete, efficient server framework. Note that "client" in this article is a relative concept, referring to any terminal connected to the service program under discussion; it may be a traditional client program, or another server program that connects to this service.
I. Network Communication Components
Following the ideas described above, we begin with the network communication components of the service program.
Problems to be solved
Since a server program inevitably involves network communication, what problems should its network communication module solve? There are many network communication frameworks available, such as libevent, boost asio and ACE, but the common technical approaches to network communication are much the same; at a minimum, the following problems must be solved:
- How to detect a new client connection?
- How to accept a client connection?
- How to detect whether a client has sent data?
- How to receive the data sent by a client?
- How to detect a connection exception? And how to handle it once detected?
- How to send data to a client?
- How to close a connection after data has been sent to the client?
Anyone with a little network programming background can answer some of these questions: for example, the accept function of the socket API accepts client connections, recv receives client data, send sends data to a client, and IO multiplexing techniques such as select, poll and epoll can check whether a client has a new connection or new data. Indeed, these basic socket APIs form the foundation of server network communication; no matter how cleverly a network communication framework is designed, it is built on top of them. But how to organize these basic socket APIs skillfully is the key. We say a server should be efficient and support high concurrency, but that is merely a description of the implementation; from the point of view of software development it is still just a program. So as long as the program satisfies, as far as possible, the principle of "minimize waiting, or do not wait at all", it is efficient. In other words, efficiency is not a matter of "the busy worked to death and the idle bored to death": it is fine for everyone to be idle, but when there is work to do, everyone should pitch in together, rather than one part busily working through tasks 1, 2, 3, ..., 9 in turn while the other part sits idle doing nothing. This may sound abstract, so let's give some examples.
For example:
- By default, if there is no data to read, the recv function blocks the calling thread.
- By default, if the tcp window is not large enough and the data cannot be written out, the send function blocks.
- By default, the connect function blocks while connecting to the other end.
- Or: we send data to the other end and wait for a reply; if the other side has not replied yet, the current thread is blocked here.
None of the above is the right mindset for developing an efficient server, because these examples do not satisfy the principle of "minimize waiting". Why must we wait at all? It would be better if these operations required no waiting, and better still if we were notified once they complete; then the CPU time slices we used to spend waiting could be spent doing other things. Yes, that is the IO multiplexing technology we will discuss next.
Comparison of Several IO Reuse Mechanisms
At present, Windows supports select, WSAAsyncSelect, WSAEventSelect and the completion port (IOCP), while Linux supports select, poll and epoll. Here we will not go through the usage of each individual function; instead, let's look at something a bit deeper. The API functions listed above can be divided into two levels:
Level 1: select and poll
Level 2: WSAAsyncSelect, WSAEventSelect, Completion Port (IOCP), epoll
Why divide them this way? First, Level 1. In essence, select and poll actively query one or more socket handles for events (readable, writable or error events) within a certain period of time. That is, we still have to perform this check actively every so often. If some events are detected during that interval, the time was not wasted; but what if no events occurred? Then we have done useless work, or to put it bluntly, wasted time, because if a server has many connections and CPU time is limited, we spent some of it checking a group of sockets only to find nothing there, while other work was waiting for us. Why spend that time on such a check when we could have spent it on the work we actually need to do? So for a server program to be efficient, we should try to avoid spending time actively querying sockets for events, and instead let the sockets notify us when something happens and only then handle it. That is exactly what the Level 2 functions do: they replace "actively ask whether there is an event" with "the system tells us when an event occurs, and then we handle it"; as the saying goes, good steel is used on the blade's edge. The Level 2 functions simply differ in how they notify us: WSAAsyncSelect uses the window message queue to notify a window procedure, IOCP reports completion status through GetQueuedCompletionStatus, and epoll reports through the return of epoll_wait.
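To make the contrast concrete, here is a minimal Level 1 sketch (illustrative code, not taken from any library): select() is given a timeout, and we actively ask whether anything happened, spending that slice of time even when the answer is "nothing".

#include <sys/time.h>
#include <sys/select.h>

// Poll one socket with a 100 ms timeout. When select() returns 0, the whole
// interval was spent and nothing happened: exactly the "useless work" described above.
bool PollOnceWithSelect(int fd)
{
    fd_set readset;
    FD_ZERO(&readset);
    FD_SET(fd, &readset);

    timeval tv;
    tv.tv_sec = 0;
    tv.tv_usec = 100 * 1000;

    int n = ::select(fd + 1, &readset, nullptr, nullptr, &tv);
    return n > 0 && FD_ISSET(fd, &readset);   // n > 0: the socket became readable
}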
For example, suppose the connect function is used to connect to the other end and the connecting socket is non-blocking. The connection cannot complete immediately, but connect returns right away without waiting. When the connection does complete, WSAAsyncSelect delivers an FD_CONNECT event telling us the connection succeeded, and epoll generates an EPOLLOUT event, so we also know the connection has finished. Likewise, when a socket has data to read, WSAAsyncSelect generates an FD_READ event and epoll generates an EPOLLIN event, and so on. With the discussion above, we arrive at the right way to detect readable, writable and error events in network communication. This is also the second principle I want to put forward here: minimize the time spent doing useless work. It may not show any advantage while the service has plenty of spare resources, but once there is a large amount of work to handle, it becomes a performance bottleneck.
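As a concrete illustration of the connect example just given, here is a hedged Linux sketch (the function names are mine, not any library's): connect is issued on a non-blocking socket, EPOLLOUT interest is registered, and when epoll later reports the socket writable we check SO_ERROR to see whether the connection really succeeded.

#include <sys/epoll.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <unistd.h>
#include <cerrno>

// Start a non-blocking connect and register it with epoll; returns the fd, or -1 on failure.
int AsyncConnect(int epollfd, const char* ip, unsigned short port)
{
    int fd = ::socket(AF_INET, SOCK_STREAM, 0);
    ::fcntl(fd, F_SETFL, ::fcntl(fd, F_GETFL, 0) | O_NONBLOCK);

    sockaddr_in addr{};
    addr.sin_family = AF_INET;
    addr.sin_port = htons(port);
    ::inet_pton(AF_INET, ip, &addr.sin_addr);

    int ret = ::connect(fd, (sockaddr*)&addr, sizeof(addr));
    if (ret == 0)
        return fd;                      // connected immediately (possible on loopback)
    if (errno != EINPROGRESS)           // real failure
    {
        ::close(fd);
        return -1;
    }

    // The connect is in progress: when it finishes, the socket becomes writable
    // and epoll_wait reports EPOLLOUT on it.
    epoll_event ev{};
    ev.events = EPOLLOUT;
    ev.data.fd = fd;
    ::epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &ev);
    return fd;
}

// Called when epoll reports EPOLLOUT on this fd: did the connect really succeed?
bool ConnectFinishedOk(int fd)
{
    int err = 0;
    socklen_t len = sizeof(err);
    ::getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len);
    return err == 0;
}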
The Right Way to Detect Network Events
Following the introduction above, first to avoid meaningless waiting and second to let the operating system notify us of events rather than actively querying each socket, we set all of our sockets to non-blocking. On that basis, we return to the seven questions raised under heading (1):
- How to detect a new client connection?
- How to accept a client connection?
By default the accept function blocks: if epoll detects an EPOLLIN event on the listening socket, or WSAAsyncSelect detects an FD_ACCEPT event, it means a new connection has arrived, and calling accept at that moment will not block. Of course, the newly accepted socket should also be set to non-blocking, so that we can send and receive data on it.
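Here is a small Linux sketch of the same idea (illustrative names, error handling trimmed): accept is called only after epoll reports EPOLLIN on the listening socket, and every accepted socket is switched to non-blocking before it joins the epoll set.

#include <sys/epoll.h>
#include <sys/socket.h>
#include <fcntl.h>
#include <unistd.h>

void SetNonBlocking(int fd)
{
    int flags = ::fcntl(fd, F_GETFL, 0);
    ::fcntl(fd, F_SETFL, flags | O_NONBLOCK);
}

// Called when epoll reports EPOLLIN on the listening socket.
void OnNewConnection(int epollfd, int listenfd)
{
    while (true)
    {
        int clientfd = ::accept(listenfd, nullptr, nullptr);
        if (clientfd == -1)
            break;   // EWOULDBLOCK/EAGAIN: no more pending connections (other errors are not handled here)

        SetNonBlocking(clientfd);          // so that later recv/send calls never block

        epoll_event ev{};
        ev.events = EPOLLIN;               // start by watching for readable events
        ev.data.fd = clientfd;
        ::epoll_ctl(epollfd, EPOLL_CTL_ADD, clientfd, &ev);
    }
}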
- How to detect whether a client has sent data?
- How to receive the data sent by a client?
Similarly, we should only receive data when there is a readable event on a socket, so that we never sit waiting inside a recv or read call. How much data should be read at a time? We can decide according to our needs; you can even call recv or read repeatedly in a loop, because on a non-blocking socket, when there is no more data, recv or read returns immediately with the error code EWOULDBLOCK, indicating that nothing is left to read for now. Example:
bool CIUSocket::Recv()
{
    int nRet = 0;
    while (true)
    {
        char buff[512];
        nRet = ::recv(m_hSocket, buff, 512, 0);
        if (nRet == SOCKET_ERROR)        //Close Socket as soon as an error occurs
        {
            if (::WSAGetLastError() == WSAEWOULDBLOCK)
                break;
            else
                return false;
        }
        else if (nRet < 1)
            return false;

        m_strRecvBuf.append(buff, nRet);
        ::Sleep(1);
    }
    return true;
}
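For comparison, here is a rough Linux counterpart of the Recv() above (a sketch only, not code from any project): keep reading on the non-blocking socket until EAGAIN/EWOULDBLOCK signals that nothing more is available for now, appending everything read into the receive buffer.

#include <sys/types.h>
#include <sys/socket.h>
#include <cerrno>
#include <string>

// Returns false when the peer closed the connection or a real error occurred;
// returns true when the socket has simply been drained for the moment.
bool RecvAll(int fd, std::string& recvBuf)
{
    char buff[512];
    while (true)
    {
        ssize_t n = ::recv(fd, buff, sizeof(buff), 0);
        if (n > 0)
        {
            recvBuf.append(buff, n);          // accumulate into the receive buffer
            continue;
        }
        if (n == 0)
            return false;                     // peer closed the connection
        if (errno == EWOULDBLOCK || errno == EAGAIN)
            return true;                      // no more data now; wait for the next readable event
        if (errno == EINTR)
            continue;                         // interrupted by a signal, retry
        return false;                         // real error: the caller should close the socket
    }
}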
- How to detect a connection exception? And how to handle it once detected?
Similarly, when we receive an exception event such as EPOLLERR, or a close event such as FD_CLOSE, we know an exception has occurred on the connection, and the usual handling is to close the corresponding socket. In addition, if a send/recv or read/write call on a socket returns 0, it means the other end has already closed the connection; the connection no longer needs to exist, and we can close the corresponding socket as well.
- How to send data to a client?
This is also a common network programming interview question; a Tencent back-end development position asked it one year. Sending data to a client is slightly trickier than receiving and requires some technique. First, we cannot register for writable events right from the start the way we register for readable events, because as long as the other end is receiving data normally, the socket is almost always writable; if we listened for writable events from the beginning, they would be triggered constantly even when we have nothing to send. The correct approach is: when there is data to send, try to send it immediately; if it cannot be sent, or only part of it is sent, cache the remainder and register to detect writable events on this socket. When the next writable event fires, continue sending; if it still cannot all be sent, keep listening for writable events, and so on, until all the data has been sent. Once everything is sent, we must unregister the writable event to avoid useless writable notifications. Notice that if only part of the data is sent at a time, the rest must be stored temporarily; for this we need a buffer, which we call the "send buffer". The send buffer stores not only the data that has not yet been sent, but also new data handed down from the upper layer during the sending process. To preserve order, new data must be appended after the remaining data, and sending always starts from the head of the send buffer. In other words, first in, first sent.
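Below is a minimal sketch of this strategy; the names (SendOrBuffer, enableWritable and so on) are made up for illustration and do not belong to any library. Try a direct send first, cache whatever could not be sent in the send buffer and enable the writable event, then on each writable event flush the buffer and disable the event once it is empty.

#include <sys/types.h>
#include <sys/socket.h>
#include <cerrno>
#include <string>
#include <functional>

// sendBuf is the per-connection "send buffer"; enableWritable/disableWritable stand
// for whatever the event loop uses to add or remove interest in writable events.
bool SendOrBuffer(int fd, std::string& sendBuf, const char* data, size_t len,
                  const std::function<void()>& enableWritable)
{
    ssize_t sent = 0;
    if (sendBuf.empty())                                   // nothing pending: try sending right away
    {
        sent = ::send(fd, data, len, 0);
        if (sent < 0)
        {
            if (errno != EWOULDBLOCK && errno != EAGAIN && errno != EINTR)
                return false;                              // real error: the caller closes the connection
            sent = 0;                                      // kernel buffer full: cache everything
        }
    }
    if ((size_t)sent < len)
    {
        sendBuf.append(data + sent, len - sent);           // keep the remainder, preserving order
        enableWritable();                                  // start watching for writable events
    }
    return true;
}

// Called when the event loop reports a writable event on fd.
bool OnWritable(int fd, std::string& sendBuf, const std::function<void()>& disableWritable)
{
    ssize_t n = ::send(fd, sendBuf.data(), sendBuf.size(), 0);
    if (n < 0 && errno != EWOULDBLOCK && errno != EAGAIN && errno != EINTR)
        return false;
    if (n > 0)
        sendBuf.erase(0, (size_t)n);
    if (sendBuf.empty())
        disableWritable();                                 // all data sent: stop useless writable notifications
    return true;
}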
- How to close a connection after data has been sent to the client?
This problem is harder to handle, because "sent" here does not necessarily mean truly sent: even if send or write returns successfully, we have only written the data into the operating system's protocol stack; whether and when it actually goes out is hard to judge, and whether the other end receives it is harder still. So in practice we can only treat a send or write call that returns the size of the data we asked to send as "the data has been sent out", and then call close or another socket API to close the connection. Of course, you can also call the shutdown function to achieve a so-called "half close". We will open a separate small heading to discuss the topic of closing connections.
Passive Close Connection and Active Close Connection
In practice, passively closing a connection happens because we detect an exception event such as EPOLLERR, or because the peer closes the connection and send or recv returns 0; at that point the connection no longer needs to exist and we are forced to close it.
Actively closing a connection means we call close/closesocket ourselves: for example, the client sends us illegal data, such as probing packets from a network attack, and for security reasons we close the socket connection.
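As a small illustration of the "half close" mentioned above (a sketch assuming a plain TCP socket, not tied to any framework): shutdown(SHUT_WR) announces that we will send nothing more, while we can still read whatever the peer has in flight; the socket is actually closed later, when the event loop sees the peer close its own side (recv returning 0 or FD_CLOSE).

#include <sys/socket.h>
#include <unistd.h>

// Half close: stop sending but keep receiving. Data already queued in the kernel
// is still flushed before the FIN goes out.
void HalfClose(int fd)
{
    ::shutdown(fd, SHUT_WR);
}

// Full close, performed once the peer has closed its side as well.
void FullClose(int fd)
{
    ::close(fd);
}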
Send Buffer and Receive Buffer
The send buffer has been described above and its purpose explained. The same goes for the receive buffer. We could, in principle, unpack received data on the spot, but that is not a good idea, for two reasons. First, unless a common protocol format such as http is used, most servers' business protocols differ; interpreting the data format inside a packet is the business layer's job, and it should be decoupled from the network communication layer. To keep the network layer general, it cannot know what the upper-layer protocol looks like, because different protocols have different formats tied to specific business. Second, even if we did know the protocol format, unpacking and then handling the corresponding business inside the network layer would mean that, if the business processing is time-consuming (complex computation, connecting to a database for password verification, and so on), the network thread would spend a lot of time on those tasks and other network events might not be handled in time. Given these two points, we really need a receive buffer: received data is placed into it, and dedicated business threads or business logic take the data out of the receive buffer and unpack it.
With all that said, how large should the send buffer and the receive buffer be? This is an age-old question: pre-allocated memory is often too small to be enough, while allocating too much may be wasteful. What to do? The answer is to design a buffer that, like string or vector, grows dynamically and allocates on demand, expanding whenever it is not big enough.
Note that each socket connection has its own send buffer and its own receive buffer; this is the most common design. A toy version of such a buffer is sketched below.
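Here is that toy version of a dynamically growing buffer (a sketch only, not the design of any particular library): it is backed by a std::vector<char> that expands on demand, with a read index so that consumed bytes can be retired cheaply.

#include <vector>
#include <cstddef>

class GrowableBuffer
{
public:
    size_t ReadableBytes() const { return m_data.size() - m_readIndex; }
    const char* Peek() const { return m_data.data() + m_readIndex; }

    void Append(const char* data, size_t len)
    {
        m_data.insert(m_data.end(), data, data + len);   // the vector grows automatically
    }

    void Retrieve(size_t len)
    {
        m_readIndex += len;                              // mark len bytes as consumed
        if (m_readIndex == m_data.size())                // everything consumed: reset cheaply
        {
            m_data.clear();
            m_readIndex = 0;
        }
    }

private:
    std::vector<char> m_data;
    size_t m_readIndex = 0;
};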
Design of Protocol
Apart from some general protocols such as http and ftp, most server protocols are defined by the business. Once the protocol is designed, the packet format is laid out according to it. We know that the tcp/ip protocol carries stream-oriented data: like a pipeline, there is no clear boundary between one packet and the next. For example, end A sends three packets to end B in succession, each 50 bytes. B might first receive 10 bytes and then 140 bytes; or first 20, then 20, then 110; or all 150 bytes at once. B can receive these 150 bytes in any combination and in chunks of any size. So the first problem in protocol design is how to define packet boundaries, that is, how the receiver knows the size of each packet. There are currently three common methods:
- Fixed size: this method assumes that every packet has a fixed number of bytes. For example, if every packet in the example above is 50 bytes, the receiver treats every 50 bytes it takes in as one packet.
- A package terminator, such as "\r\n" (carriage return and line feed): when the receiver encounters the terminator, it considers one package complete, and the data that follows belongs to the next package.
- A header carrying the package size: this method combines the two above. The header has a fixed size and contains a field that specifies the size of the package body (or of the whole package). After receiving data, the receiver parses that field in the header and uses the size to delimit the package boundary. A hedged example follows below.
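Here is that hedged example of the third method (the struct and function names are made up for illustration): a fixed-width header carries the body length, and the receiver repeatedly tries to cut one complete package out of the receive buffer.

#include <cstdint>
#include <cstring>
#include <string>

#pragma pack(push, 1)
struct PackageHeader
{
    int32_t packagesize;    // length of the body that follows, in bytes
    int32_t cmd;            // message type, defined by the business protocol
};
#pragma pack(pop)

// Try to cut one complete package out of the receive buffer.
// Returns false if the buffer does not yet contain a whole package.
// (A real protocol would also fix the byte order, e.g. with htonl/ntohl.)
bool TryUnpack(std::string& recvBuf, PackageHeader& header, std::string& body)
{
    if (recvBuf.size() < sizeof(PackageHeader))
        return false;                                    // not even a full header yet
    std::memcpy(&header, recvBuf.data(), sizeof(PackageHeader));
    size_t total = sizeof(PackageHeader) + (size_t)header.packagesize;
    if (recvBuf.size() < total)
        return false;                                    // header is complete, body is not
    body.assign(recvBuf.data() + sizeof(PackageHeader), (size_t)header.packagesize);
    recvBuf.erase(0, total);                             // consume this package from the buffer
    return true;
}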
The second issue in protocol design is to make the protocol as easy to parse as possible; that is, the format fields should be as simple and clear as they can be.
The third issue is that a single packet assembled according to the protocol should be as small as possible (note: a single packet). This has the following advantages: first, for mobile devices, processing power and bandwidth are limited, so small packets not only speed up processing but also save a lot of traffic cost; second, if a single packet is small enough, it greatly reduces the bandwidth pressure on a server that communicates frequently, and the system it runs in also uses less memory. Imagine a stock server: if the packet for one stock is 100 bytes versus 1000 bytes, what a difference that makes across 10,000 stocks!
The fourth issue is that for numeric types we should explicitly specify their length. For example, long is 32 bits (4 bytes) on a 32-bit machine but 64 bits (8 bytes) on a 64-bit machine, even though it is the same long type; because the sender and receiver may run on machines with different word sizes, they could decode with different lengths. So in protocols intended for cross-platform use, it is best to explicitly specify the width of integer fields, such as int32, int64, and so on. Below is an example of a protocol interface; Java programmers should find it familiar:
class BinaryReadStream
{
private:
const char* const ptr;
const size_t len;
const char* cur;
BinaryReadStream(const BinaryReadStream&);
BinaryReadStream& operator=(const BinaryReadStream&);
public:
BinaryReadStream(const char* ptr, size_t len);
virtual const char* GetData() const;
virtual size_t GetSize() const;
bool IsEmpty() const;
bool ReadString(string* str, size_t maxlen, size_t& outlen);
bool ReadCString(char* str, size_t strlen, size_t& len);
bool ReadCCString(const char** str, size_t maxlen, size_t& outlen);
bool ReadInt32(int32_t& i);
bool ReadInt64(int64_t& i);
bool ReadShort(short& i);
bool ReadChar(char& c);
size_t ReadAll(char* szBuffer, size_t iLen) const;
bool IsEnd() const;
const char* GetCurrent() const{ return cur; }
public:
bool ReadLength(size_t & len);
bool ReadLengthWithoutOffset(size_t &headlen, size_t & outlen);
};
class BinaryWriteStream
{
public:
BinaryWriteStream(string* data);
virtual const char* GetData() const;
virtual size_t GetSize() const;
bool WriteCString(const char* str, size_t len);
bool WriteString(const string& str);
bool WriteDouble(double value, bool isNULL = false);
bool WriteInt64(int64_t value, bool isNULL = false);
bool WriteInt32(int32_t i, bool isNULL = false);
bool WriteShort(short i, bool isNULL = false);
bool WriteChar(char c, bool isNULL = false);
size_t GetCurrentPos() const{ return m_data->length(); }
void Flush();
void Clear();
private:
string* m_data;
};
BinaryWriteStream is the encoding class and BinaryReadStream is the decoding class. You can encode and decode in the following way.
Code:
std::string outbuf;
BinaryWriteStream writeStream(&outbuf);
writeStream.WriteInt32(msg_type_register);
writeStream.WriteInt32(m_seq);
writeStream.WriteString(retData);
writeStream.Flush();
Decode:
BinaryReadStream readStream(strMsg.c_str(), strMsg.length());
int32_t cmd;
if (!readStream.ReadInt32(cmd))
{
return false;
}
//int seq;
if (!readStream.ReadInt32(m_seq))
{
return false;
}
std::string data;
size_t datalength;
if (!readStream.ReadString(&data, 0, datalength))
{
return false;
}
II. Organization of Server Program Structure
In the six headings above we discussed many specific details; now it is time to discuss how to organize them. Based on my personal experience, the mainstream approach today is "one loop per thread" combined with the Reactor pattern (there is also the Proactor pattern). Put plainly, one thread runs one loop: the thread function keeps looping, doing a series of things in turn, such as detecting network events, unpacking data, and producing business logic. Let's start with the simplest arrangement: set up some threads that handle network communication in a loop. The pseudocode is as follows:
while (!exit_flag)
{
    // Use IO multiplexing to detect readable and error events on the sockets
    // (writable events are also detected if there is data to send)

    // If there is a readable event: for the listening socket, accept the new connection;
    // for an ordinary socket, read its data into the corresponding receive buffer,
    // and close the connection if an error occurs

    // If there is data to send and a writable event occurs, send the data

    // If there is an error event, close the connection
}
In addition, we set up some threads to process the received data and unpack it into business logic; these can be regarded as business threads. The pseudocode is as follows:
// Take data out of the receive buffer, unpack it, and dispatch it to the different business handlers for processing
The structure above is currently the most common server logic structure, but can we simplify or merge it? Let's try. Have you ever considered the following: suppose the machine has two CPUs (two cores, to be exact), the number of network threads is 2, and the number of business logic threads is also 2. Then it may well happen that while the business threads are running, the network threads are not running and have to wait. If so, why create the two extra threads at all? Apart from perhaps making the program structure slightly clearer, there is no real improvement in performance, and CPU time slices are wasted on thread context switches. So we can merge the network threads and the business logic threads; the merged pseudocode looks like this:
while (!exit_flag)
{
    // Use IO multiplexing to detect readable and error events on the sockets
    // (writable events are also detected if there is data to send)

    // If there is a readable event: for the listening socket, accept the new connection;
    // for an ordinary socket, read its data into the corresponding receive buffer,
    // and close the connection if an error occurs

    // If there is data to send and a writable event occurs, send the data

    // If there is an error event, close the connection

    // Take data out of the receive buffer, unpack it, and dispatch it to the different business handlers
}
That's right, it is a simple merge. After merging, we not only get the same effect as before, but can also handle the business logic we care about promptly when there are no network IO events, and we cut down on unnecessary thread context switches.
Going further, we can even add other tasks to this while loop, such as a queue of program logic tasks, timer events, and so on. The pseudocode is as follows:
while (!exit_flag)
{
    // Handle timer events

    // Use IO multiplexing to detect readable and error events on the sockets
    // (writable events are also detected if there is data to send)

    // If there is a readable event: for the listening socket, accept the new connection;
    // for an ordinary socket, read its data into the corresponding receive buffer,
    // and close the connection if an error occurs

    // If there is data to send and a writable event occurs, send the data

    // If there is an error event, close the connection

    // Take data out of the receive buffer, unpack it, and dispatch it to the different business handlers

    // Program-defined task 1

    // Program-defined task 2
}
Note: the reason timer event handling is placed before the network IO event detection is to prevent timer events from expiring too long before they are handled. If it were placed afterwards, the processing in front of it might take some time, and by the time the timer events were handled the intended moment would already be well past. Although this arrangement cannot guarantee 100% timer accuracy, it gets as close as possible. Of course, Linux also provides fd-based timer objects such as timerfd (and eventfd for generic events), and such timer objects can be treated as fds just like sockets. This is also the idea of libevent, which wraps sockets, timers and signals into a unified object for handling. A minimal timerfd sketch follows.
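Here is that sketch (illustrative code, not taken from libevent or muduo): timerfd_create() turns a timer into a file descriptor that the same epoll instance watches, so timer expirations arrive as ordinary readable events alongside the sockets.

#include <sys/timerfd.h>
#include <sys/epoll.h>
#include <unistd.h>
#include <cstdint>

// Create a periodic timer fd and register it with epoll; expirations show up as EPOLLIN.
int AddPeriodicTimer(int epollfd, int intervalSeconds)
{
    int tfd = ::timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);

    itimerspec spec{};
    spec.it_value.tv_sec = intervalSeconds;      // first expiration
    spec.it_interval.tv_sec = intervalSeconds;   // then repeat at this interval
    ::timerfd_settime(tfd, 0, &spec, nullptr);

    epoll_event ev{};
    ev.events = EPOLLIN;
    ev.data.fd = tfd;
    ::epoll_ctl(epollfd, EPOLL_CTL_ADD, tfd, &ev);
    return tfd;
}

// In the loop, when EPOLLIN fires on the timer fd, read the expiration count
// (this also rearms the readiness) and run the due timer callbacks.
void OnTimerReadable(int tfd)
{
    uint64_t expirations = 0;
    ::read(tfd, &expirations, sizeof(expirations));
    // ... handle the timer events here ...
}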
Having said so much theory, let me introduce a popular open source network library, muduo (author: Chen Shuo). The original library is based on boost; I ported it to a C++11 version and fixed some bugs. My thanks to the original author, Chen Shuo.
The while loop of the core thread function described above is located in eventloop.cpp:
void EventLoop::loop()
{
assert(!looping_);
assertInLoopThread();
looping_ = true;
quit_ = false; // FIXME: what if someone calls quit() before loop() ?
LOG_TRACE << "EventLoop " << this << " start looping";
while (!quit_)
{
activeChannels_.clear();
pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);
++iteration_;
if (Logger::logLevel() <= Logger::TRACE)
{
printActiveChannels();
}
// TODO sort channel by priority
eventHandling_ = true;
for (ChannelList::iterator it = activeChannels_.begin();
it != activeChannels_.end(); ++it)
{
currentActiveChannel_ = *it;
currentActiveChannel_->handleEvent(pollReturnTime_);
}
currentActiveChannel_ = NULL;
eventHandling_ = false;
doPendingFunctors();
if (frameFunctor_)
{
frameFunctor_();
}
}
LOG_TRACE << "EventLoop " << this << " stop looping";
looping_ = false;
}
poller_->poll uses epoll to demultiplex the network events, and the separated events are then processed. Each client socket corresponds to one connection, that is, to a TcpConnection object and a Channel object. currentActiveChannel_->handleEvent(pollReturnTime_) calls the appropriate handler depending on whether the event is readable, writable or an error; these handlers are callback functions set during program initialization:
void Channel::handleEvent(Timestamp receiveTime)
{
std::shared_ptr<void> guard;
if (tied_)
{
guard = tie_.lock();
if (guard)
{
handleEventWithGuard(receiveTime);
}
}
else
{
handleEventWithGuard(receiveTime);
}
}
void Channel::handleEventWithGuard(Timestamp receiveTime)
{
eventHandling_ = true;
LOG_TRACE << reventsToString();
if ((revents_ & POLLHUP) && !(revents_ & POLLIN))
{
if (logHup_)
{
LOG_WARN << "Channel::handle_event() POLLHUP";
}
if (closeCallback_) closeCallback_();
}
if (revents_ & POLLNVAL)
{
LOG_WARN << "Channel::handle_event() POLLNVAL";
}
if (revents_ & (POLLERR | POLLNVAL))
{
if (errorCallback_) errorCallback_();
}
if (revents_ & (POLLIN | POLLPRI | POLLRDHUP))
{
//For the listening socket, readCallback_ points to Acceptor::handleRead
//For a client socket, it calls TcpConnection::handleRead
if (readCallback_) readCallback_(receiveTime);
}
if (revents_ & POLLOUT)
{
//For a socket that is in the connecting state, writeCallback_ points to Connector::handleWrite()
if (writeCallback_) writeCallback_();
}
eventHandling_ = false;
}
Of course, this takes advantage of the "polymorphism" of the Channel object: for an ordinary socket, the readable event calls the preset callback function; for the listening socket, it calls the Acceptor object's handleRead().
To receive new connections:
void Acceptor::handleRead()
{
loop_->assertInLoopThread();
InetAddress peerAddr;
//FIXME loop until no more
int connfd = acceptSocket_.accept(&peerAddr);
if (connfd >= 0)
{
// string hostport = peerAddr.toIpPort();
// LOG_TRACE << "Accepts of " << hostport;
//newConnectionCallback_ actually points to TcpServer::newConnection(int sockfd, const InetAddress& peerAddr)
if (newConnectionCallback_)
{
newConnectionCallback_(connfd, peerAddr);
}
else
{
sockets::close(connfd);
}
}
else
{
LOG_SYSERR << "in Acceptor::handleRead";
// Read the section named "The special problem of
// accept()ing when you can't" in libev's doc.
// By Marc Lehmann, author of libev.
if (errno == EMFILE)
{
::close(idleFd_);
idleFd_ = ::accept(acceptSocket_.fd(), NULL, NULL);
::close(idleFd_);
idleFd_ = ::open("/dev/null", O_RDONLY | O_CLOEXEC);
}
}
}
Business logic processing in the main loop corresponds to:
doPendingFunctors();
if (frameFunctor_)
{
frameFunctor_();
}
void EventLoop::doPendingFunctors()
{
std::vector<Functor> functors;
callingPendingFunctors_ = true;
{
std::unique_lock<std::mutex> lock(mutex_);
functors.swap(pendingFunctors_);
}
for (size_t i = 0; i < functors.size(); ++i)
{
functors[i]();
}
callingPendingFunctors_ = false;
}
Adding business logic here means adding function objects that perform tasks; the added tasks are stored in the member variable pendingFunctors_, which is a vector of function objects. When executed, each function is called in turn. The code above first swaps the function objects in pendingFunctors_ into a stack-local variable and then works on that variable, which shrinks the critical section. Because pendingFunctors_ is also touched when tasks are added, possibly from multiple threads, a lock is taken wherever it is accessed:
void EventLoop::queueInLoop(const Functor& cb)
{
{
std::unique_lock<std::mutex> lock(mutex_);
pendingFunctors_.push_back(cb);
}
if (!isInLoopThread() || callingPendingFunctors_)
{
wakeup();
}
}
frameFunctor_ is simpler: it is just a function pointer that gets set. There is, however, a trick here: so that a newly added task is executed promptly, a wake-up mechanism is used: a few bytes are written to an fd to wake epoll up and make it return immediately (since no other sockets have events at that moment), and the freshly added task is executed on the next pass.
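Here is a hedged sketch of that wake-up trick using eventfd (the function names are mine): the eventfd is registered in epoll like any other fd, and writing eight bytes to it makes epoll_wait return at once, so a task queued from another thread gets executed without waiting for the next network event.

#include <sys/eventfd.h>
#include <unistd.h>
#include <cstdint>

// Create the wake-up fd once and register it for EPOLLIN in the loop's epoll instance.
int CreateWakeupFd()
{
    return ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
}

// Called by the thread that has just queued a task for the loop.
void Wakeup(int wakeupFd)
{
    uint64_t one = 1;
    ::write(wakeupFd, &one, sizeof(one));
}

// Called in the loop when EPOLLIN fires on the wake-up fd: drain the counter
// so the event does not fire again, then go run the pending tasks.
void OnWakeupReadable(int wakeupFd)
{
    uint64_t count = 0;
    ::read(wakeupFd, &count, sizeof(count));
}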
Let's look at the logic of data collection:
void TcpConnection::handleRead(Timestamp receiveTime)
{
loop_->assertInLoopThread();
int savedErrno = 0;
ssize_t n = inputBuffer_.readFd(channel_->fd(), &savedErrno);
if (n > 0)
{
//messageCallback_ points to CTcpSession::OnRead(const std::shared_ptr<TcpConnection>& conn, Buffer* pBuffer, Timestamp receiveTime)
messageCallback_(shared_from_this(), &inputBuffer_, receiveTime);
}
else if (n == 0)
{
handleClose();
}
else
{
errno = savedErrno;
LOG_SYSERR << "TcpConnection::handleRead";
handleError();
}
}
The received data is placed into the receive buffer; it is unpacked later:
void ClientSession::OnRead(const std::shared_ptr<TcpConnection>& conn, Buffer* pBuffer, Timestamp receivTime)
{
while (true)
{
//Not enough yet for a packet header
if (pBuffer->readableBytes() < (size_t)sizeof(msg))
{
LOG_INFO << "buffer is not enough for a package header, pBuffer->readableBytes()=" << pBuffer->readableBytes() << ", sizeof(msg)=" << sizeof(msg);
return;
}
//Not enough for a whole package size
msg header;
memcpy(&header, pBuffer->peek(), sizeof(msg));
if (pBuffer->readableBytes() < (size_t)header.packagesize + sizeof(msg))
return;
pBuffer->retrieve(sizeof(msg));
std::string inbuf;
inbuf.append(pBuffer->peek(), header.packagesize);
pBuffer->retrieve(header.packagesize);
if (!Process(conn, inbuf.c_str(), inbuf.length()))
{
LOG_WARN << "Process error, close TcpConnection";
conn->forceClose();
}
}// end while-loop
}
First check whether the data in the receive buffer is enough for a packet header; if so, check whether it is enough for a whole packet of the size the header specifies; if it is, that packet is processed in the Process function.
Look again at the logic of sending data:
void TcpConnection::sendInLoop(const void* data, size_t len)
{
loop_->assertInLoopThread();
ssize_t nwrote = 0;
size_t remaining = len;
bool faultError = false;
if (state_ == kDisconnected)
{
LOG_WARN << "disconnected, give up writing";
return;
}
// if no thing in output queue, try writing directly
if (!channel_->isWriting() && outputBuffer_.readableBytes() == 0)
{
nwrote = sockets::write(channel_->fd(), data, len);
if (nwrote >= 0)
{
remaining = len - nwrote;
if (remaining == 0 && writeCompleteCallback_)
{
loop_->queueInLoop(std::bind(writeCompleteCallback_, shared_from_this()));
}
}
else // nwrote < 0
{
nwrote = 0;
if (errno != EWOULDBLOCK)
{
LOG_SYSERR << "TcpConnection::sendInLoop";
if (errno == EPIPE || errno == ECONNRESET) // FIXME: any others?
{
faultError = true;
}
}
}
}
assert(remaining <= len);
if (!faultError && remaining > 0)
{
size_t oldLen = outputBuffer_.readableBytes();
if (oldLen + remaining >= highWaterMark_
&& oldLen < highWaterMark_
&& highWaterMarkCallback_)
{
loop_->queueInLoop(std::bind(highWaterMarkCallback_, shared_from_this(), oldLen + remaining));
}
outputBuffer_.append(static_cast<const char*>(data)+nwrote, remaining);
if (!channel_->isWriting())
{
channel_->enableWriting();
}
}
}
If there is still unsent data (remaining > 0), channel_->enableWriting() is called to start listening for writable events, which are handled as follows:
void TcpConnection::handleWrite()
{
loop_->assertInLoopThread();
if (channel_->isWriting())
{
ssize_t n = sockets::write(channel_->fd(),
outputBuffer_.peek(),
outputBuffer_.readableBytes());
if (n > 0)
{
outputBuffer_.retrieve(n);
if (outputBuffer_.readableBytes() == 0)
{
channel_->disableWriting();
if (writeCompleteCallback_)
{
loop_->queueInLoop(std::bind(writeCompleteCallback_, shared_from_this()));
}
if (state_ == kDisconnecting)
{
shutdownInLoop();
}
}
}
else
{
LOG_SYSERR << "TcpConnection::handleWrite";
// if (state_ == kDisconnecting)
// {
// shutdownInLoop();
// }
}
}
else
{
LOG_TRACE << "Connection fd = " << channel_->fd()
<< " is down, no more writing";
}
}
Once all the data has been sent, channel_->disableWriting() is called to remove the interest in writable events.
Many readers may have been wondering: didn't this article say that unpacking data and handling the logic belongs to business code rather than network communication code, yet here the two seem to be mixed together? Actually they are not: the business processing here is carried out by callback functions handed to the framework, and how the data is processed is defined by the framework's user, that is, the business layer itself.
To sum up, it is really just a loop inside a thread function. Don't believe it? Then look at the code of a trading system server project I once worked on:
void CEventDispatcher::Run()
{
m_bShouldRun = true;
while(m_bShouldRun)
{
DispatchIOs();
SyncTime();
CheckTimer();
DispatchEvents();
}
}
void CEpollReactor::DispatchIOs()
{
DWORD dwSelectTimeOut = SR_DEFAULT_EPOLL_TIMEOUT;
if (HandleOtherTask())
{
dwSelectTimeOut = 0;
}
struct epoll_event ev;
CEventHandlerIdMap::iterator itor = m_mapEventHandlerId.begin();
for(; itor!=m_mapEventHandlerId.end(); itor++)
{
CEventHandler *pEventHandler = (CEventHandler *)(*itor).first;
if(pEventHandler == NULL){
continue;
}
ev.data.ptr = pEventHandler;
ev.events = 0;
int nReadID, nWriteID;
pEventHandler->GetIds(&nReadID, &nWriteID);
if (nReadID > 0)
{
ev.events |= EPOLLIN;
}
if (nWriteID > 0)
{
ev.events |= EPOLLOUT;
}
epoll_ctl(m_fdEpoll, EPOLL_CTL_MOD, (*itor).second, &ev);
}
struct epoll_event events[EPOLL_MAX_EVENTS];
int nfds = epoll_wait(m_fdEpoll, events, EPOLL_MAX_EVENTS, dwSelectTimeOut/1000);
for (int i=0; i<nfds; i++)
{
struct epoll_event &evref = events[i];
CEventHandler *pEventHandler = (CEventHandler *)evref.data.ptr;
if ((evref.events|EPOLLIN)!=0 && m_mapEventHandlerId.find(pEventHandler)!=m_mapEventHandlerId.end())
{
pEventHandler->HandleInput();
}
if ((evref.events|EPOLLOUT)!=0 && m_mapEventHandlerId.find(pEventHandler)!=m_mapEventHandlerId.end())
{
pEventHandler->HandleOutput();
}
}
}
void CEventDispatcher::DispatchEvents()
{
CEvent event;
CSyncEvent *pSyncEvent;
while(m_queueEvent.PeekEvent(event))
{
int nRetval;
if(event.pEventHandler != NULL)
{
nRetval = event.pEventHandler->HandleEvent(event.nEventID, event.dwParam, event.pParam);
}
else
{
nRetval = HandleEvent(event.nEventID, event.dwParam, event.pParam);
}
if(event.pAdd != NULL) //Synchronous message
{
pSyncEvent=(CSyncEvent *)event.pAdd;
pSyncEvent->nRetval = nRetval;
pSyncEvent->sem.UnLock();
}
}
}
Now look at the source code of Mogujie's open source TeamTalk (code download address: https://github.com/baloonwj/TeamTalk):
void CEventDispatch::StartDispatch(uint32_t wait_timeout)
{
fd_set read_set, write_set, excep_set;
timeval timeout;
timeout.tv_sec = 0;
timeout.tv_usec = wait_timeout * 1000; // 10 millisecond
if(running)
return;
running = true;
while (running)
{
_CheckTimer();
_CheckLoop();
if (!m_read_set.fd_count && !m_write_set.fd_count && !m_excep_set.fd_count)
{
Sleep(MIN_TIMER_DURATION);
continue;
}
m_lock.lock();
memcpy(&read_set, &m_read_set, sizeof(fd_set));
memcpy(&write_set, &m_write_set, sizeof(fd_set));
memcpy(&excep_set, &m_excep_set, sizeof(fd_set));
m_lock.unlock();
int nfds = select(0, &read_set, &write_set, &excep_set, &timeout);
if (nfds == SOCKET_ERROR)
{
log("select failed, error code: %d", GetLastError());
Sleep(MIN_TIMER_DURATION);
continue; // select again
}
if (nfds == 0)
{
continue;
}
for (u_int i = 0; i < read_set.fd_count; i++)
{
//log("select return read count=%d\n", read_set.fd_count);
SOCKET fd = read_set.fd_array[i];
CBaseSocket* pSocket = FindBaseSocket((net_handle_t)fd);
if (pSocket)
{
pSocket->OnRead();
pSocket->ReleaseRef();
}
}
for (u_int i = 0; i < write_set.fd_count; i++)
{
//log("select return write count=%d\n", write_set.fd_count);
SOCKET fd = write_set.fd_array[i];
CBaseSocket* pSocket = FindBaseSocket((net_handle_t)fd);
if (pSocket)
{
pSocket->OnWrite();
pSocket->ReleaseRef();
}
}
for (u_int i = 0; i < excep_set.fd_count; i++)
{
//log("select return exception count=%d\n", excep_set.fd_count);
SOCKET fd = excep_set.fd_array[i];
CBaseSocket* pSocket = FindBaseSocket((net_handle_t)fd);
if (pSocket)
{
pSocket->OnClose();
pSocket->ReleaseRef();
}
}
}
}
Now look at FileZilla, an FTP tool whose server side uses Windows's WSAAsyncSelect model (code download address: https://github.com/baloonwj/filezilla):
//Processes event notifications sent by the sockets or the layers
static LRESULT CALLBACK WindowProc(HWND hWnd, UINT message, WPARAM wParam, LPARAM lParam)
{
if (message>=WM_SOCKETEX_NOTIFY)
{
//Verify parameters
ASSERT(hWnd);
CAsyncSocketExHelperWindow *pWnd=(CAsyncSocketExHelperWindow *)GetWindowLongPtr(hWnd, GWLP_USERDATA);
ASSERT(pWnd);
if (!pWnd)
return 0;
if (message < static_cast<UINT>(WM_SOCKETEX_NOTIFY+pWnd->m_nWindowDataSize)) //Index is within socket storage
{
//Lookup socket and verify if it's valid
CAsyncSocketEx *pSocket=pWnd->m_pAsyncSocketExWindowData[message - WM_SOCKETEX_NOTIFY].m_pSocket;
SOCKET hSocket = wParam;
if (!pSocket)
return 0;
if (hSocket == INVALID_SOCKET)
return 0;
if (pSocket->m_SocketData.hSocket != hSocket)
return 0;
int nEvent = lParam & 0xFFFF;
int nErrorCode = lParam >> 16;
//Dispatch notification
if (!pSocket->m_pFirstLayer) {
//Dispatch to CAsyncSocketEx instance
switch (nEvent)
{
case FD_READ:
#ifndef NOSOCKETSTATES
if (pSocket->GetState() == connecting && !nErrorCode)
{
pSocket->m_nPendingEvents |= FD_READ;
break;
}
else if (pSocket->GetState() == attached)
pSocket->SetState(connected);
if (pSocket->GetState() != connected)
break;
// Ignore further FD_READ events after FD_CLOSE has been received
if (pSocket->m_SocketData.onCloseCalled)
break;
#endif //NOSOCKETSTATES
#ifndef NOSOCKETSTATES
if (nErrorCode)
pSocket->SetState(aborted);
#endif //NOSOCKETSTATES
if (pSocket->m_lEvent & FD_READ) {
pSocket->OnReceive(nErrorCode);
}
break;
case FD_FORCEREAD: //Forceread does not check if there's data waiting
#ifndef NOSOCKETSTATES
if (pSocket->GetState() == connecting && !nErrorCode)
{
pSocket->m_nPendingEvents |= FD_FORCEREAD;
break;
}
else if (pSocket->GetState() == attached)
pSocket->SetState(connected);
if (pSocket->GetState() != connected)
break;
#endif //NOSOCKETSTATES
if (pSocket->m_lEvent & FD_READ)
{
#ifndef NOSOCKETSTATES
if (nErrorCode)
pSocket->SetState(aborted);
#endif //NOSOCKETSTATES
pSocket->OnReceive(nErrorCode);
}
break;
case FD_WRITE:
#ifndef NOSOCKETSTATES
if (pSocket->GetState() == connecting && !nErrorCode)
{
pSocket->m_nPendingEvents |= FD_WRITE;
break;
}
else if (pSocket->GetState() == attached && !nErrorCode)
pSocket->SetState(connected);
if (pSocket->GetState() != connected)
break;
#endif //NOSOCKETSTATES
if (pSocket->m_lEvent & FD_WRITE)
{
#ifndef NOSOCKETSTATES
if (nErrorCode)
pSocket->SetState(aborted);
#endif //NOSOCKETSTATES
pSocket->OnSend(nErrorCode);
}
break;
case FD_CONNECT:
#ifndef NOSOCKETSTATES
if (pSocket->GetState() == connecting)
{
if (nErrorCode && pSocket->m_SocketData.nextAddr)
{
if (pSocket->TryNextProtocol())
break;
}
pSocket->SetState(connected);
}
else if (pSocket->GetState() == attached && !nErrorCode)
pSocket->SetState(connected);
#endif //NOSOCKETSTATES
if (pSocket->m_lEvent & FD_CONNECT)
pSocket->OnConnect(nErrorCode);
#ifndef NOSOCKETSTATES
if (!nErrorCode)
{
if ((pSocket->m_nPendingEvents&FD_READ) && pSocket->GetState() == connected)
pSocket->OnReceive(0);
if ((pSocket->m_nPendingEvents&FD_FORCEREAD) && pSocket->GetState() == connected)
pSocket->OnReceive(0);
if ((pSocket->m_nPendingEvents&FD_WRITE) && pSocket->GetState() == connected)
pSocket->OnSend(0);
}
pSocket->m_nPendingEvents = 0;
#endif
break;
case FD_ACCEPT:
#ifndef NOSOCKETSTATES
if (pSocket->GetState() != listening && pSocket->GetState() != attached)
break;
#endif //NOSOCKETSTATES
if (pSocket->m_lEvent & FD_ACCEPT)
pSocket->OnAccept(nErrorCode);
break;
case FD_CLOSE:
#ifndef NOSOCKETSTATES
if (pSocket->GetState() != connected && pSocket->GetState() != attached)
break;
// If there are still bytes left to read, call OnReceive instead of
// OnClose and trigger a new OnClose
DWORD nBytes = 0;
if (!nErrorCode && pSocket->IOCtl(FIONREAD, &nBytes))
{
if (nBytes > 0)
{
// Just repeat message.
pSocket->ResendCloseNotify();
pSocket->m_SocketData.onCloseCalled = true;
pSocket->OnReceive(WSAESHUTDOWN);
break;
}
}
pSocket->SetState(nErrorCode ? aborted : closed);
#endif //NOSOCKETSTATES
pSocket->OnClose(nErrorCode);
break;
}
}
else //Dispatch notification to the lowest layer
{
if (nEvent == FD_READ)
{
// Ignore further FD_READ events after FD_CLOSE has been received
if (pSocket->m_SocketData.onCloseCalled)
return 0;
DWORD nBytes;
if (!pSocket->IOCtl(FIONREAD, &nBytes))
nErrorCode = WSAGetLastError();
if (pSocket->m_pLastLayer)
pSocket->m_pLastLayer->CallEvent(nEvent, nErrorCode);
}
else if (nEvent == FD_CLOSE)
{
// If there are still bytes left to read, call OnReceive instead of
// OnClose and trigger a new OnClose
DWORD nBytes = 0;
if (!nErrorCode && pSocket->IOCtl(FIONREAD, &nBytes))
{
if (nBytes > 0)
{
// Just repeat message.
pSocket->ResendCloseNotify();
if (pSocket->m_pLastLayer)
pSocket->m_pLastLayer->CallEvent(FD_READ, 0);
return 0;
}
}
pSocket->m_SocketData.onCloseCalled = true;
if (pSocket->m_pLastLayer)
pSocket->m_pLastLayer->CallEvent(nEvent, nErrorCode);
}
else if (pSocket->m_pLastLayer)
pSocket->m_pLastLayer->CallEvent(nEvent, nErrorCode);
}
}
return 0;
}
else if (message == WM_USER) //Notification event sent by a layer
{
//Verify parameters, lookup socket and notification message
//Verify parameters
ASSERT(hWnd);
CAsyncSocketExHelperWindow *pWnd=(CAsyncSocketExHelperWindow *)GetWindowLongPtr(hWnd, GWLP_USERDATA);
ASSERT(pWnd);
if (!pWnd)
return 0;
if (wParam >= static_cast<UINT>(pWnd->m_nWindowDataSize)) //Index is within socket storage
{
return 0;
}
CAsyncSocketEx *pSocket = pWnd->m_pAsyncSocketExWindowData[wParam].m_pSocket;
CAsyncSocketExLayer::t_LayerNotifyMsg *pMsg = (CAsyncSocketExLayer::t_LayerNotifyMsg *)lParam;
if (!pMsg || !pSocket || pSocket->m_SocketData.hSocket != pMsg->hSocket)
{
delete pMsg;
return 0;
}
int nEvent=pMsg->lEvent&0xFFFF;
int nErrorCode=pMsg->lEvent>>16;
//Dispatch to layer
if (pMsg->pLayer)
pMsg->pLayer->CallEvent(nEvent, nErrorCode);
else
{
//Dispatch to CAsyncSocketEx instance
switch (nEvent)
{
case FD_READ:
#ifndef NOSOCKETSTATES
if (pSocket->GetState() == connecting && !nErrorCode)
{
pSocket->m_nPendingEvents |= FD_READ;
break;
}
else if (pSocket->GetState() == attached && !nErrorCode)
pSocket->SetState(connected);
if (pSocket->GetState() != connected)
break;
#endif //NOSOCKETSTATES
if (pSocket->m_lEvent & FD_READ)
{
#ifndef NOSOCKETSTATES
if (nErrorCode)
pSocket->SetState(aborted);
#endif //NOSOCKETSTATES
pSocket->OnReceive(nErrorCode);
}
break;
case FD_FORCEREAD: //Forceread does not check if there's data waiting
#ifndef NOSOCKETSTATES
if (pSocket->GetState() == connecting && !nErrorCode)
{
pSocket->m_nPendingEvents |= FD_FORCEREAD;
break;
}
else if (pSocket->GetState() == attached && !nErrorCode)
pSocket->SetState(connected);
if (pSocket->GetState() != connected)
break;
#endif //NOSOCKETSTATES
if (pSocket->m_lEvent & FD_READ)
{
#ifndef NOSOCKETSTATES
if (nErrorCode)
pSocket->SetState(aborted);
#endif //NOSOCKETSTATES
pSocket->OnReceive(nErrorCode);
}
break;
case FD_WRITE:
#ifndef NOSOCKETSTATES
if (pSocket->GetState() == connecting && !nErrorCode)
{
pSocket->m_nPendingEvents |= FD_WRITE;
break;
}
else if (pSocket->GetState() == attached && !nErrorCode)
pSocket->SetState(connected);
if (pSocket->GetState() != connected)
break;
#endif //NOSOCKETSTATES
if (pSocket->m_lEvent & FD_WRITE)
{
#ifndef NOSOCKETSTATES
if (nErrorCode)
pSocket->SetState(aborted);
#endif //NOSOCKETSTATES
pSocket->OnSend(nErrorCode);
}
break;
case FD_CONNECT:
#ifndef NOSOCKETSTATES
if (pSocket->GetState() == connecting)
pSocket->SetState(connected);
else if (pSocket->GetState() == attached && !nErrorCode)
pSocket->SetState(connected);
#endif //NOSOCKETSTATES
if (pSocket->m_lEvent & FD_CONNECT)
pSocket->OnConnect(nErrorCode);
#ifndef NOSOCKETSTATES
if (!nErrorCode)
{
if (((pSocket->m_nPendingEvents&FD_READ) && pSocket->GetState() == connected) && (pSocket->m_lEvent & FD_READ))
pSocket->OnReceive(0);
if (((pSocket->m_nPendingEvents&FD_FORCEREAD) && pSocket->GetState() == connected) && (pSocket->m_lEvent & FD_READ))
pSocket->OnReceive(0);
if (((pSocket->m_nPendingEvents&FD_WRITE) && pSocket->GetState() == connected) && (pSocket->m_lEvent & FD_WRITE))
pSocket->OnSend(0);
}
pSocket->m_nPendingEvents = 0;
#endif //NOSOCKETSTATES
break;
case FD_ACCEPT:
#ifndef NOSOCKETSTATES
if ((pSocket->GetState() == listening || pSocket->GetState() == attached) && (pSocket->m_lEvent & FD_ACCEPT))
#endif //NOSOCKETSTATES
{
pSocket->OnAccept(nErrorCode);
}
break;
case FD_CLOSE:
#ifndef NOSOCKETSTATES
if ((pSocket->GetState() == connected || pSocket->GetState() == attached) && (pSocket->m_lEvent & FD_CLOSE))
{
pSocket->SetState(nErrorCode?aborted:closed);
#else
{
#endif //NOSOCKETSTATES
pSocket->OnClose(nErrorCode);
}
break;
}
}
delete pMsg;
return 0;
}
else if (message == WM_USER+1)
{
// WSAAsyncGetHostByName reply
// Verify parameters
ASSERT(hWnd);
CAsyncSocketExHelperWindow *pWnd = (CAsyncSocketExHelperWindow *)GetWindowLongPtr(hWnd, GWLP_USERDATA);
ASSERT(pWnd);
if (!pWnd)
return 0;
CAsyncSocketEx *pSocket = NULL;
for (int i = 0; i < pWnd->m_nWindowDataSize; ++i) {
pSocket = pWnd->m_pAsyncSocketExWindowData[i].m_pSocket;
if (pSocket && pSocket->m_hAsyncGetHostByNameHandle &&
pSocket->m_hAsyncGetHostByNameHandle == (HANDLE)wParam &&
pSocket->m_pAsyncGetHostByNameBuffer)
break;
}
if (!pSocket || !pSocket->m_pAsyncGetHostByNameBuffer)
return 0;
int nErrorCode = lParam >> 16;
if (nErrorCode) {
pSocket->OnConnect(nErrorCode);
return 0;
}
SOCKADDR_IN sockAddr{};
sockAddr.sin_family = AF_INET;
sockAddr.sin_addr.s_addr = ((LPIN_ADDR)((LPHOSTENT)pSocket->m_pAsyncGetHostByNameBuffer)->h_addr)->s_addr;
sockAddr.sin_port = htons(pSocket->m_nAsyncGetHostByNamePort);
BOOL res = pSocket->Connect((SOCKADDR*)&sockAddr, sizeof(sockAddr));
delete [] pSocket->m_pAsyncGetHostByNameBuffer;
pSocket->m_pAsyncGetHostByNameBuffer = 0;
pSocket->m_hAsyncGetHostByNameHandle = 0;
if (!res)
if (GetLastError() != WSAEWOULDBLOCK)
pSocket->OnConnect(GetLastError());
return 0;
}
else if (message == WM_USER + 2)
{
//Verify parameters, lookup socket and notification message
//Verify parameters
if (!hWnd)
return 0;
CAsyncSocketExHelperWindow *pWnd=(CAsyncSocketExHelperWindow *)GetWindowLongPtr(hWnd, GWLP_USERDATA);
if (!pWnd)
return 0;
if (wParam >= static_cast<UINT>(pWnd->m_nWindowDataSize)) //Index is within socket storage
return 0;
CAsyncSocketEx *pSocket = pWnd->m_pAsyncSocketExWindowData[wParam].m_pSocket;
if (!pSocket)
return 0;
// Process pending callbacks
std::list<t_callbackMsg> tmp;
tmp.swap(pSocket->m_pendingCallbacks);
pSocket->OnLayerCallback(tmp);
for (auto & cb : tmp) {
delete [] cb.str;
}
}
else if (message == WM_TIMER)
{
if (wParam != 1)
return 0;
ASSERT(hWnd);
CAsyncSocketExHelperWindow *pWnd=(CAsyncSocketExHelperWindow *)GetWindowLongPtr(hWnd, GWLP_USERDATA);
ASSERT(pWnd && pWnd->m_pThreadData);
if (!pWnd || !pWnd->m_pThreadData)
return 0;
if (pWnd->m_pThreadData->layerCloseNotify.empty())
{
KillTimer(hWnd, 1);
return 0;
}
CAsyncSocketEx* socket = pWnd->m_pThreadData->layerCloseNotify.front();
pWnd->m_pThreadData->layerCloseNotify.pop_front();
if (pWnd->m_pThreadData->layerCloseNotify.empty())
KillTimer(hWnd, 1);
if (socket)
PostMessage(hWnd, socket->m_SocketData.nSocketIndex + WM_SOCKETEX_NOTIFY, socket->m_SocketData.hSocket, FD_CLOSE);
return 0;
}
return DefWindowProc(hWnd, message, wParam, lParam);
}
If you are not familiar with these projects, you probably have little interest in reading their code line by line, but you do need to understand the structural logic I have been describing. Essentially, today's mainstream network frameworks all follow this same set of principles. For example, filezilla's network communication layer is also used in the well-known easyMule.
That concludes my introduction to the framework of a single service program. If you have fully understood what I have been trying to say, I believe you can build a high-performance service program yourself.
In addition, many interesting details can be added to the server framework on top of the design ideas above, such as flow control. Let me give another example from a project I actually worked on.
In real projects, when the number of client connections is large, there are often several sockets with data waiting to be processed at the same time. Because the number of CPU cores is limited, under the "detect I/O events, then process them" model described above, a worker thread may keep handling the first few sockets until their data is exhausted before it ever gets to the data on the later sockets. It is as if, in a restaurant, everyone has ordered, but some tables keep receiving dish after dish while other tables receive nothing at all. That is clearly unacceptable, so let's look at how to avoid this.
int CFtdEngine::HandlePackage(CFTDCPackage *pFTDCPackage, CFTDCSession *pSession)
{
    //NET_IO_LOG0("CFtdEngine::HandlePackage\n");
    FTDC_PACKAGE_DEBUG(pFTDCPackage);
    if (pFTDCPackage->GetTID() != FTD_TID_ReqUserLogin)
    {
        if (!IsSessionLogin(pSession->GetSessionID()))
        {
            SendErrorRsp(pFTDCPackage, pSession, 1, "Customer not logged in");
            return 0;
        }
    }
    CalcFlux(pSession, pFTDCPackage->Length()); // Traffic accounting
    REPORT_EVENT(LOG_DEBUG, "Front/Fgateway", "Login request%0x", pFTDCPackage->GetTID());
    int nRet = 0;
    switch (pFTDCPackage->GetTID())
    {
    case FTD_TID_ReqUserLogin:
        /// huwp: 20070608: clients whose FTD version is too high are not allowed to log in
        if (pFTDCPackage->GetVersion() > FTD_VERSION)
        {
            SendErrorRsp(pFTDCPackage, pSession, 1, "Too High FTD Version");
            return 0;
        }
        nRet = OnReqUserLogin(pFTDCPackage, (CFTDCSession *)pSession);
        FTDRequestIndex.incValue();
        break;
    case FTD_TID_ReqCheckUserLogin:
        nRet = OnReqCheckUserLogin(pFTDCPackage, (CFTDCSession *)pSession);
        FTDRequestIndex.incValue();
        break;
    case FTD_TID_ReqSubscribeTopic:
        nRet = OnReqSubscribeTopic(pFTDCPackage, (CFTDCSession *)pSession);
        FTDRequestIndex.incValue();
        break;
    }
    return 0;
}
When a socket becomes readable, the server receives the data on that socket, unpacks it, and then calls CalcFlux(pSession, pFTDCPackage->Length()) to do the traffic accounting:
void CFrontEngine::CalcFlux(CSession *pSession, const int nFlux)
{
    TFrontSessionInfo *pSessionInfo = m_mapSessionInfo.Find(pSession->GetSessionID());
    if (pSessionInfo != NULL)
    {
        // Flow control is implemented as a simple packet counter
        pSessionInfo->nCommFlux++;
        /// If the traffic exceeds the limit, suspend the session's read operations
        if (pSessionInfo->nCommFlux >= pSessionInfo->nMaxCommFlux)
        {
            pSession->SuspendRead(true);
        }
    }
}
This function first increments the count of packets handled on the connection's session, then checks whether that count has exceeded the maximum allowed; if so, it sets the read-suspend flag:
void CSession::SuspendRead(bool bSuspend)
{
    m_bSuspendRead = bSuspend;
}
The next time sockets are registered for event detection, this socket is excluded from the set being monitored:
void CEpollReactor::RegisterIO(CEventHandler *pEventHandler)
{
    int nReadID, nWriteID;
    pEventHandler->GetIds(&nReadID, &nWriteID);
    if (nWriteID != 0 && nReadID == 0)
    {
        nReadID = nWriteID;
    }
    if (nReadID != 0)
    {
        m_mapEventHandlerId[pEventHandler] = nReadID;
        struct epoll_event ev;
        ev.events = EPOLLIN; // the interest mask must be set; watch for read readiness
        ev.data.ptr = pEventHandler;
        if (epoll_ctl(m_fdEpoll, EPOLL_CTL_ADD, nReadID, &ev) != 0)
        {
            perror("epoll_ctl EPOLL_CTL_ADD");
        }
    }
}

void CSession::GetIds(int *pReadId, int *pWriteId)
{
    m_pChannelProtocol->GetIds(pReadId, pWriteId);
    if (m_bSuspendRead)
    {
        *pReadId = 0;
    }
}
In other words, the socket is no longer checked for readable data. The flag is then cleared by a timer one second later, so that data on this socket can be detected again:
const int SESSION_CHECK_TIMER_ID = 9;
const int SESSION_CHECK_INTERVAL = 1000;

SetTimer(SESSION_CHECK_TIMER_ID, SESSION_CHECK_INTERVAL);

void CFrontEngine::OnTimer(int nIDEvent)
{
    if (nIDEvent == SESSION_CHECK_TIMER_ID)
    {
        CSessionMap::iterator itor = m_mapSession.Begin();
        while (!itor.IsEnd())
        {
            TFrontSessionInfo *pFind = m_mapSessionInfo.Find((*itor)->GetSessionID());
            if (pFind != NULL)
            {
                CheckSession(*itor, pFind);
            }
            itor++;
        }
    }
}

void CFrontEngine::CheckSession(CSession *pSession, TFrontSessionInfo *pSessionInfo)
{
    /// Restart traffic accounting for the new interval
    pSessionInfo->nCommFlux -= pSessionInfo->nMaxCommFlux;
    if (pSessionInfo->nCommFlux < 0)
    {
        pSessionInfo->nCommFlux = 0;
    }
    /// Keep reads suspended only if the traffic still exceeds the limit; otherwise resume reading
    pSession->SuspendRead(pSessionInfo->nCommFlux >= pSessionInfo->nMaxCommFlux);
}
It is much like a restaurant: once a table has received a few dishes and the guests there can start eating, the waiters stop serving that table for a while and serve the tables that are still empty; after everyone has something to eat, they come back to the first table. Real restaurants do work this way. The example above is a sound way to implement flow control within a single service: it guarantees that every client is served in a balanced manner, instead of some clients waiting a long time for a response. Of course, this technique cannot be applied to business with strict ordering requirements, such as sales systems, where requests generally must be handled in the order they arrive.
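To make the mechanism more concrete, here is a minimal, self-contained Linux sketch of the same suspend-and-resume idea built on epoll and a timerfd. It is not the project's actual code: the Conn struct, kMaxPacketsPerTick, and the use of a socketpair as a stand-in client are all assumptions made for the example. Once a connection has used up its packet budget within one tick, its descriptor is removed from the epoll set; the periodic tick resets the counter and re-registers it, just as the timer above clears the read-suspend flag.

// Sketch only: per-connection flow control with epoll + timerfd.
// Names (Conn, kMaxPacketsPerTick) are illustrative, not from the article's project.
#include <sys/epoll.h>
#include <sys/socket.h>
#include <sys/timerfd.h>
#include <unistd.h>
#include <cstdint>
#include <cstdio>
#include <ctime>

struct Conn {
    int fd = -1;
    int packetsThisTick = 0;
    bool suspended = false;
};

static const int kMaxPacketsPerTick = 3; // per-connection packet budget per tick

static void addFd(int epfd, int fd, void *ptr) {
    epoll_event ev{};
    ev.events = EPOLLIN;   // watch for read readiness
    ev.data.ptr = ptr;
    epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev);
}

int main() {
    // A socketpair keeps the example self-contained: sv[1] plays the client.
    int sv[2];
    socketpair(AF_UNIX, SOCK_DGRAM, 0, sv);

    int epfd = epoll_create1(0);
    Conn conn{sv[0]};
    addFd(epfd, conn.fd, &conn);

    // A 1-second tick, analogous to the OnTimer reset above.
    int tfd = timerfd_create(CLOCK_MONOTONIC, 0);
    itimerspec its{};
    its.it_interval.tv_sec = 1;
    its.it_value.tv_sec = 1;
    timerfd_settime(tfd, 0, &its, nullptr);
    addFd(epfd, tfd, &tfd); // data.ptr == &tfd marks the timer event

    // The "client" sends 10 small packets at once.
    for (int i = 0; i < 10; ++i)
        send(sv[1], "ping", 4, 0);

    for (int processed = 0; processed < 10; ) {
        epoll_event events[8];
        int n = epoll_wait(epfd, events, 8, -1);
        for (int i = 0; i < n; ++i) {
            if (events[i].data.ptr == &tfd) {
                uint64_t expirations = 0;
                if (read(tfd, &expirations, sizeof expirations) > 0) {
                    // Tick: reset the counter and resume the suspended connection.
                    conn.packetsThisTick = 0;
                    if (conn.suspended) {
                        conn.suspended = false;
                        addFd(epfd, conn.fd, &conn);
                        printf("tick: connection resumed\n");
                    }
                }
                continue;
            }
            Conn *c = static_cast<Conn *>(events[i].data.ptr);
            char buf[16];
            ssize_t r = recv(c->fd, buf, sizeof buf, 0);
            if (r <= 0)
                continue;
            ++processed;
            if (++c->packetsThisTick >= kMaxPacketsPerTick && !c->suspended) {
                // Over budget: stop watching this fd until the next tick.
                c->suspended = true;
                epoll_ctl(epfd, EPOLL_CTL_DEL, c->fd, nullptr);
                printf("connection suspended after %d packets\n", c->packetsThisTick);
            }
        }
    }
    close(sv[0]); close(sv[1]); close(tfd); close(epfd);
    return 0;
}

Run under g++ on Linux, the sketch suspends the connection after every three packets and resumes it on the next tick, which is exactly the "serve the other tables first" behavior described above; with several connections, the budget keeps any one of them from monopolizing the worker thread.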
In addition, to speed up I/O operations, servers today make extensive use of caching. Caching is essentially a strategy of trading space for time: for information that is used repeatedly but changes infrequently, and that is expensive to load from its original location (for example, from disk or from a database), we can keep a copy in a cache. This is one reason in-memory and embedded key-value stores such as redis, leveldb and fastdb are so popular today. If you work on server development, you should be familiar with at least a few of them.
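As a small illustration of this space-for-time trade, here is a sketch of an application-level cache in C++: a tiny LRU cache placed in front of a slow loader. The LruCache class, its capacity, and loadFromDatabase are invented for the example; in a real server the loader would be a disk or database read, and the cache might instead be redis or another of the stores mentioned above.

// Sketch only: a minimal LRU cache in front of a slow loader.
#include <iostream>
#include <list>
#include <string>
#include <unordered_map>
#include <utility>

class LruCache {
public:
    explicit LruCache(size_t capacity) : capacity_(capacity) {}

    // Returns true and fills value on a hit, moving the entry to the front.
    bool get(const std::string &key, std::string &value) {
        auto it = index_.find(key);
        if (it == index_.end())
            return false;
        items_.splice(items_.begin(), items_, it->second); // mark as most recently used
        value = it->second->second;
        return true;
    }

    void put(const std::string &key, const std::string &value) {
        auto it = index_.find(key);
        if (it != index_.end()) {
            it->second->second = value;
            items_.splice(items_.begin(), items_, it->second);
            return;
        }
        items_.emplace_front(key, value);
        index_[key] = items_.begin();
        if (items_.size() > capacity_) {            // evict the least recently used entry
            index_.erase(items_.back().first);
            items_.pop_back();
        }
    }

private:
    using Item = std::pair<std::string, std::string>;
    size_t capacity_;
    std::list<Item> items_;                                        // front = most recent
    std::unordered_map<std::string, std::list<Item>::iterator> index_;
};

// Stand-in for an expensive load from disk or a database.
static std::string loadFromDatabase(const std::string &key) {
    return "value_of_" + key;
}

std::string lookup(LruCache &cache, const std::string &key) {
    std::string value;
    if (cache.get(key, value))
        return value;               // cache hit: no expensive load
    value = loadFromDatabase(key);  // cache miss: load once, then remember it
    cache.put(key, value);
    return value;
}

int main() {
    LruCache cache(2);
    std::cout << lookup(cache, "user:1001") << "\n"; // miss: loads and caches
    std::cout << lookup(cache, "user:1001") << "\n"; // hit: served from memory
    return 0;
}

The design choice here is the classic list-plus-hash-map structure: the list keeps recency order, the map gives O(1) lookup, and splice moves an entry to the front without invalidating iterators.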
This is my first article on GitChat. Due to limited space, many details could not be expanded, and I have not covered the design techniques of distributed servers here. If conditions permit, I will bring you more technology sharing later. I also thank GitChat for providing such a platform for communicating with all of you.
Given the author's limited ability and experience, there are inevitably mistakes and omissions in this article. Comments and corrections are welcome.