Similar to TcpServer passively accepting connections using Acceptor, TcpClient actively initiates connections using Connector.
Key points about Connector:
- In non-blocking network programming, the basic way to initiate a connection is to call connect(2). The attempt has completed when the socket becomes writable, and the various kinds of errors must then be handled.
- Connector is only responsible for establishing the socket connection, not for creating a TcpConnection (the TcpClient class in a later section creates the TcpConnection). Its NewConnectionCallback receives the connected socket fd as its parameter.
- For error handling, a writable socket does not necessarily mean the connection was established successfully. When connection establishment fails, the socket descriptor becomes both readable and writable, and we can call getsockopt to fetch the pending error on the socket (SO_ERROR).
- In non-blocking network programming, the sockfd passed to connect(2) is single-use. Once an error occurs (such as the peer refusing the connection), it cannot be recovered; it can only be closed and a new attempt started. A Connector, however, is used repeatedly, so each connection attempt uses a new socket file descriptor and a new Channel object, and attention must be paid to the Channel's lifetime management.
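The points above can be sketched with plain POSIX calls, outside of muduo. This is a minimal illustration only: listenOnLoopback() and connectNonBlocking() are hypothetical helper names, the code is IPv4 and loopback only, and it uses poll(2) directly where Connector would use a Channel registered with the Poller.

```cpp
#include <arpa/inet.h>
#include <cassert>
#include <cerrno>
#include <cstdint>
#include <fcntl.h>
#include <netinet/in.h>
#include <poll.h>
#include <sys/socket.h>
#include <unistd.h>

// Hypothetical helper: listen on 127.0.0.1 with a kernel-chosen port.
// Returns the listening fd and stores the port in *port, or returns -1.
int listenOnLoopback(uint16_t* port) {
    int fd = ::socket(AF_INET, SOCK_STREAM, 0);
    if (fd < 0) return -1;
    sockaddr_in addr{};
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
    addr.sin_port = 0;  // let the kernel pick a free port
    socklen_t len = sizeof addr;
    if (::bind(fd, reinterpret_cast<sockaddr*>(&addr), sizeof addr) < 0 ||
        ::listen(fd, 1) < 0 ||
        ::getsockname(fd, reinterpret_cast<sockaddr*>(&addr), &len) < 0) {
        ::close(fd);
        return -1;
    }
    *port = ntohs(addr.sin_port);
    return fd;
}

// Hypothetical helper: non-blocking connect to 127.0.0.1:port.
// Returns a connected fd, or -1 after closing the failed socket.
int connectNonBlocking(uint16_t port, int timeoutMs) {
    int fd = ::socket(AF_INET, SOCK_STREAM, 0);
    if (fd < 0) return -1;
    int flags = ::fcntl(fd, F_GETFL, 0);
    if (flags < 0 || ::fcntl(fd, F_SETFL, flags | O_NONBLOCK) < 0) {
        ::close(fd);
        return -1;
    }
    sockaddr_in addr{};
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
    addr.sin_port = htons(port);

    int ret = ::connect(fd, reinterpret_cast<sockaddr*>(&addr), sizeof addr);
    if (ret < 0 && errno != EINPROGRESS) {
        ::close(fd);  // a failed sockfd is single-use: close it
        return -1;
    }
    if (ret < 0) {
        // Connection in progress: wait until the socket becomes writable.
        pollfd pfd{};
        pfd.fd = fd;
        pfd.events = POLLOUT;
        if (::poll(&pfd, 1, timeoutMs) <= 0) {
            ::close(fd);
            return -1;
        }
    }
    // Writable does not imply success: SO_ERROR holds the real outcome.
    int err = 0;
    socklen_t len = sizeof err;
    if (::getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0 || err != 0) {
        ::close(fd);
        return -1;
    }
    return fd;
}
```

A caller that gets -1 back has nothing to recover: the next attempt must start from a fresh socket, which is why Connector creates a new sockfd and a new Channel for every attempt.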
1. Definition of Connector
Connector is created by TcpClient, which passes in the owning EventLoop and the server address. Only five public interfaces are exposed. Most of the connect operations run in the loop thread.
class Connector : noncopyable,
                  public std::enable_shared_from_this<Connector>
{
 public:
  typedef std::function<void (int sockfd)> NewConnectionCallback;  // Called with the connected sockfd

  Connector(EventLoop* loop, const InetAddress& serverAddr);
  ~Connector();

  void setNewConnectionCallback(const NewConnectionCallback& cb)
  { newConnectionCallback_ = cb; }

  void start();    // Callable from any thread; dispatches via EventLoop::runInLoop()
  void restart();  // Not thread-safe; must be called in the loop thread
  void stop();     // Callable from any thread; dispatches via EventLoop::queueInLoop()

  const InetAddress& serverAddress() const { return serverAddr_; }

 private:
  enum States { kDisconnected, kConnecting, kConnected };
  static const int kMaxRetryDelayMs = 30*1000;  // Maximum reconnection delay
  static const int kInitRetryDelayMs = 500;     // Initial reconnection delay

  void setState(States s) { state_ = s; }
  void startInLoop();             // Runs in the loop thread on behalf of start()
  void stopInLoop();              // Runs in the loop thread on behalf of stop()
  void connect();                 // Create the sockfd and call the ::connect(2) system call
  void connecting(int sockfd);    // After ::connect(2): construct the Channel, register events
  void handleWrite();             // Handle the writable event of the connecting sockfd
  void handleError();             // Handle the error event of the connecting sockfd
  void retry(int sockfd);         // Close sockfd and try to reconnect
  int removeAndResetChannel();    // Unregister the Channel, schedule its reset, return its sockfd
  void resetChannel();

  EventLoop* loop_;                // Owning event loop
  InetAddress serverAddr_;         // Server address to connect to
  bool connect_;                   // atomic; whether connecting is wanted
  States state_;                   // FIXME: use atomic variable
  std::unique_ptr<Channel> channel_;  // Channel wrapping the sockfd of the current attempt
  NewConnectionCallback newConnectionCallback_;  // Called when the connection succeeds
  int retryDelayMs_;               // Current reconnection delay in ms
};
2. Implementation of Connector
2.1. Construction and Destruction
The constructor assigns the member variables in its initializer list. The destructor does no cleanup of its own; it only asserts that channel_ has already been released, because connections are closed through dedicated functions.
Connector::Connector(EventLoop* loop, const InetAddress& serverAddr)
  : loop_(loop),                      // Owning EventLoop
    serverAddr_(serverAddr),          // Server address to connect to
    connect_(false),                  // Do not connect yet
    state_(kDisconnected),            // Initial connection state
    retryDelayMs_(kInitRetryDelayMs)  // Initial reconnection delay
{
  LOG_DEBUG << "ctor[" << this << "]";
}

Connector::~Connector()
{
  LOG_DEBUG << "dtor[" << this << "]";
  assert(!channel_);
}
2.2. Preparing to connect start()
Connector's owner, TcpClient, calls start() to begin connecting. start() dispatches startInLoop() to the loop thread through loop_->runInLoop(), and startInLoop() normally calls connect(), which proceeds in the following steps:
- Create a non-blocking socket with sockets::createNonblockingOrDie(), which returns the sockfd.
- Call the ::connect() system call and judge the outcome from its return value and errno. There are three cases:
  - A return value of 0, or an errno of EINPROGRESS, EINTR, or EISCONN: the connection is established or in progress. Wait until the sockfd becomes writable to confirm the result, and retry if it failed.
  - Temporary local or server-side errors: reconnection can be attempted.
  - Other errors, such as permission or protocol errors: close the socket directly.
- Depending on the case, continue with connection handling, retry, or close the socket.
Note that connection handling is initiated only when connect_ is true. If stop() has been called, the code here does not attempt to connect.
void Connector::start()
{
  connect_ = true;
  loop_->runInLoop(std::bind(&Connector::startInLoop, this)); // FIXME: unsafe
}

void Connector::startInLoop()
{
  loop_->assertInLoopThread();
  assert(state_ == kDisconnected);
  if (connect_){  // Connect only if the connect flag is still true
    connect();
  }
  else{
    LOG_DEBUG << "do not connect";
  }
}

void Connector::connect()
{
  int sockfd = sockets::createNonblockingOrDie(serverAddr_.family());
  int ret = sockets::connect(sockfd, serverAddr_.getSockAddr());
  int savedErrno = (ret == 0) ? 0 : errno;
  switch (savedErrno)
  {
    case 0:
    case EINPROGRESS:   // Non-blocking connect in progress; check SO_ERROR once sockfd is writable
    case EINTR:         // Interrupted by a signal
    case EISCONN:       // Already connected
      connecting(sockfd);
      break;

    case EAGAIN:        // No free local port, or routing cache entries exhausted
    case EADDRINUSE:    // Local address already in use
    case EADDRNOTAVAIL: // No local ephemeral port available for the connection
    case ECONNREFUSED:  // Nothing is listening on the server address
    case ENETUNREACH:   // Network unreachable (firewall?)
      retry(sockfd);
      break;

    case EACCES:
    case EPERM:         // Broadcast address without the broadcast flag set, or blocked by a local firewall rule
    case EAFNOSUPPORT:  // The address family in sa_family is not supported
    case EALREADY:      // Non-blocking socket with a previous connection attempt still in progress
    case EBADF:         // sockfd is not a valid descriptor
    case EFAULT:        // The socket address structure is outside the user address space
    case ENOTSOCK:      // The fd is not a socket
      LOG_SYSERR << "connect error in Connector::startInLoop " << savedErrno;
      sockets::close(sockfd);
      break;

    default:
      LOG_SYSERR << "Unexpected error in Connector::startInLoop " << savedErrno;
      sockets::close(sockfd);
      // connectErrorCallback_();
      break;
  }
}
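The three-way errno dispatch in connect() can be isolated as a pure function, which makes the grouping easy to check. This is a sketch only: ConnectAction and classifyConnectErrno() are illustrative names that do not exist in muduo, and every errno outside the first two groups is folded into a single "give up" result.

```cpp
#include <cassert>
#include <cerrno>

// Hypothetical result type for the three cases described above.
enum class ConnectAction { kProceed, kRetry, kGiveUp };

// Map the errno saved after ::connect(2) to the action Connector takes.
ConnectAction classifyConnectErrno(int savedErrno) {
    switch (savedErrno) {
        case 0:            // connected immediately
        case EINPROGRESS:  // non-blocking connect still in progress
        case EINTR:        // interrupted by a signal
        case EISCONN:      // already connected
            return ConnectAction::kProceed;  // wait for writability
        case EAGAIN:
        case EADDRINUSE:
        case EADDRNOTAVAIL:
        case ECONNREFUSED:
        case ENETUNREACH:
            return ConnectAction::kRetry;    // transient: close and retry later
        default:
            return ConnectAction::kGiveUp;   // permission, protocol, or bad-fd errors
    }
}
```

For example, classifyConnectErrno(ECONNREFUSED) yields kRetry, while classifyConnectErrno(EBADF) yields kGiveUp.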
2.3. Handling an in-progress connection connecting()
connecting() handles a connection that is in progress: it sets the state to kConnecting, wraps the current sockfd in a Channel, and registers writable and error events with the Poller. The connection result is then determined from the writable event that the Poller reports.
void Connector::connecting(int sockfd)
{
  setState(kConnecting);  // Set state
  assert(!channel_);
  // Create channel_ and set the writable and error event callbacks
  channel_.reset(new Channel(loop_, sockfd));
  channel_->setWriteCallback(std::bind(&Connector::handleWrite, this)); // FIXME: unsafe
  channel_->setErrorCallback(std::bind(&Connector::handleError, this)); // FIXME: unsafe

  // channel_->tie(shared_from_this()); is not working,
  // as channel_ is not managed by shared_ptr
  channel_->enableWriting();  // Register with the Poller
}
2.4. Handling events handleWrite(), handleError()
handleWrite() runs when the sockfd becomes writable and confirms whether the connection was actually established. If the current state is kDisconnected, nothing is done.
If the state is kConnecting, channel_ is reset and SO_ERROR is read from the sockfd. If SO_ERROR is non-zero, or the socket is self-connected, retry(sockfd) attempts to reconnect. An SO_ERROR of 0 indicates a successful connection, and the callback passes the connected sockfd to TcpClient. handleError() likewise resets channel_ and tries to reconnect when the state is kConnecting.
Note that removeAndResetChannel() returns the sockfd, because at this point the sockfd may already be connected to the server. TcpClient then takes over the sockfd, creates a TcpConnection, and registers the sockfd's IO events with the Poller for client-server communication, so Connector no longer manages the descriptor once the connection is established.
void Connector::handleWrite()
{
  LOG_TRACE << "Connector::handleWrite " << state_;

  if (state_ == kConnecting){  // A connection attempt is in progress
    int sockfd = removeAndResetChannel();       // Unregister the Channel and take back its sockfd
    int err = sockets::getSocketError(sockfd);  // Read SO_ERROR
    if (err){  // Connection failed
      LOG_WARN << "Connector::handleWrite - SO_ERROR = "
               << err << " " << strerror_tl(err);
      retry(sockfd);
    }
    else if (sockets::isSelfConnect(sockfd)){  // Self-connection
      LOG_WARN << "Connector::handleWrite - Self connect";
      retry(sockfd);
    }
    else{  // SO_ERROR is 0: connection succeeded
      setState(kConnected);
      if (connect_){
        newConnectionCallback_(sockfd);  // Hand the connected sockfd to TcpClient
      }
      else{
        sockets::close(sockfd);  // stop() was called in the meantime; close the connected sockfd
      }
    }
  }
  else {
    // what happened?
    assert(state_ == kDisconnected);
  }
}

void Connector::handleError()
{
  LOG_ERROR << "Connector::handleError state=" << state_;
  if (state_ == kConnecting){
    int sockfd = removeAndResetChannel();       // Unregister the Channel and take back its sockfd
    int err = sockets::getSocketError(sockfd);  // Read SO_ERROR for logging
    LOG_TRACE << "SO_ERROR = " << err << " " << strerror_tl(err);
    retry(sockfd);                              // Try to reconnect
  }
}
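The self-connect check in handleWrite() deserves a word: when a client connects to a local port in the ephemeral range that nobody is listening on, the kernel may pick that same port as the source port, and TCP simultaneous open then connects the socket to itself. A minimal IPv4-only sketch of such a check follows. The names sameEndpoint() and isSelfConnectV4() are hypothetical; muduo's own sockets::isSelfConnect() is not reproduced here.

```cpp
#include <arpa/inet.h>
#include <cassert>
#include <netinet/in.h>
#include <sys/socket.h>

// True when two IPv4 endpoints have the same address and port.
bool sameEndpoint(const sockaddr_in& a, const sockaddr_in& b) {
    return a.sin_family == AF_INET && b.sin_family == AF_INET &&
           a.sin_port == b.sin_port &&
           a.sin_addr.s_addr == b.sin_addr.s_addr;
}

// True when sockfd's local endpoint equals its peer endpoint.
// Returns false if either address cannot be obtained (e.g. not connected).
bool isSelfConnectV4(int sockfd) {
    sockaddr_in local{};
    sockaddr_in peer{};
    socklen_t len = sizeof local;
    if (::getsockname(sockfd, reinterpret_cast<sockaddr*>(&local), &len) < 0) {
        return false;
    }
    len = sizeof peer;
    if (::getpeername(sockfd, reinterpret_cast<sockaddr*>(&peer), &len) < 0) {
        return false;
    }
    return sameEndpoint(local, peer);
}
```

A self-connected socket reports SO_ERROR of 0, so without this extra comparison it would be handed to TcpClient as if it were a real connection to the server.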
2.5. Removing and resetting the channel removeAndResetChannel()
When an error occurs and reconnection is attempted, the Channel must be reset because the current sockfd cannot be reused. The reset itself is deferred with queueInLoop(), since this function runs inside Channel::handleEvent. The sockfd is not closed here; it is closed in retry().
int Connector::removeAndResetChannel()
{
  channel_->disableAll();
  channel_->remove();
  int sockfd = channel_->fd();
  // Can't reset channel_ here, because we are inside Channel::handleEvent
  loop_->queueInLoop(std::bind(&Connector::resetChannel, this)); // FIXME: unsafe
  return sockfd;
}

void Connector::resetChannel()
{
  channel_.reset();
}
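Why the reset is queued rather than done on the spot can be shown with a toy model. MiniChannel and MiniConnector below are hypothetical stand-ins, not muduo classes: the pending vector plays the role of EventLoop::queueInLoop(), and the destructor assertion models the rule that a channel must not be destroyed while its own handleEvent() is still on the call stack.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <vector>

// Stand-in for Channel: runs a callback from handleEvent().
struct MiniChannel {
    std::function<void()> writeCallback;
    bool handlingEvent = false;

    void handleEvent() {
        handlingEvent = true;
        if (writeCallback) writeCallback();
        handlingEvent = false;  // touches *this after the callback returns
    }
    ~MiniChannel() { assert(!handlingEvent); }  // must not die mid-event
};

// Stand-in for Connector plus a one-shot event loop iteration.
struct MiniConnector {
    std::unique_ptr<MiniChannel> channel;
    std::vector<std::function<void()>> pending;  // models queueInLoop()
    bool aliveDuringCallback = false;

    void arm() {
        channel.reset(new MiniChannel);
        channel->writeCallback = [this] { onWritable(); };
    }

    void onWritable() {
        // Calling channel.reset() here would destroy the object whose
        // handleEvent() is our caller. Defer it instead.
        pending.push_back([this] { channel.reset(); });
        aliveDuringCallback = (channel != nullptr);
    }

    void runOnce() {
        if (channel) channel->handleEvent();  // event dispatch phase
        std::vector<std::function<void()>> functors;
        functors.swap(pending);               // then run queued functors
        for (auto& f : functors) f();
    }
};
```

After arm() and a single runOnce(), the channel is still alive while the callback runs and is released only once handleEvent() has returned.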
2.6. Try to reconnect retry()
retry() closes the sockfd used by the failed attempt and sets the connection state to kDisconnected.
Reconnection is scheduled with a timer, and the delay doubles after each attempt, up to kMaxRetryDelayMs.
The socket is closed inside retry() so that stop() can go through the same path to close the sockfd when disconnecting.
void Connector::retry(int sockfd)
{
  sockets::close(sockfd);   // Close the sockfd used by the failed attempt
  setState(kDisconnected);  // Set the state to kDisconnected
  if (connect_){
    LOG_INFO << "Connector::retry - Retry connecting to " << serverAddr_.toIpPort()
             << " in " << retryDelayMs_ << " milliseconds. ";
    loop_->runAfter(retryDelayMs_/1000.0,
                    std::bind(&Connector::startInLoop, shared_from_this()));
    retryDelayMs_ = std::min(retryDelayMs_ * 2, kMaxRetryDelayMs);
  }
  else{
    LOG_DEBUG << "do not connect";
  }
}
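The delay schedule produced by retry() can be tabulated with a few lines of standalone code. The constants are copied from the class definition above; retryDelays() is a hypothetical helper written for this illustration.

```cpp
#include <algorithm>
#include <cassert>
#include <vector>

constexpr int kInitRetryDelayMs = 500;       // same value as in Connector
constexpr int kMaxRetryDelayMs = 30 * 1000;  // same value as in Connector

// Delays (in ms) used by the first `attempts` retries.
std::vector<int> retryDelays(int attempts) {
    std::vector<int> delays;
    int delayMs = kInitRetryDelayMs;
    for (int i = 0; i < attempts; ++i) {
        delays.push_back(delayMs);
        delayMs = std::min(delayMs * 2, kMaxRetryDelayMs);  // double, then cap
    }
    return delays;
}
```

The first eight delays are 500, 1000, 2000, 4000, 8000, 16000, 30000, and 30000 ms: the seventh would be 32000 ms without the cap, and every later retry stays at 30 seconds until restart() resets the delay to kInitRetryDelayMs.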
2.7. Close and restart stop(), restart()
Stopping the client connection is also performed in the loop thread, and the Connector can be restarted after it has been stopped.
void Connector::stop()
{
  connect_ = false;  // Mark that connecting is no longer wanted
  loop_->queueInLoop(std::bind(&Connector::stopInLoop, this)); // FIXME: unsafe
  // FIXME: cancel timer
}

void Connector::stopInLoop()
{
  loop_->assertInLoopThread();
  if (state_ == kConnecting)  // A connection attempt is in progress
  {
    setState(kDisconnected);               // Set state
    int sockfd = removeAndResetChannel();  // Unregister the Channel and take back its sockfd
    retry(sockfd);  // connect_ is false, so this only closes sockfd and does not reconnect
  }
}

void Connector::restart()
{
  loop_->assertInLoopThread();
  setState(kDisconnected);
  retryDelayMs_ = kInitRetryDelayMs;  // Reset to the initial reconnection delay
  connect_ = true;
  startInLoop();                      // Start connecting again
}