muduo Learning Notes: Implementing the TCP Network Programming Library in the net Section (Connector)

Similar to TcpServer passively accepting connections using Acceptor, TcpClient actively initiates connections using Connector.

Key points about Connector:

  • In non-blocking network programming, the basic way to initiate a connection is to call connect(2). The connection is established once the socket becomes writable, and the various kinds of errors must be handled.
  • Connector is only responsible for establishing the socket connection, not for creating a TcpConnection (the TcpClient class in a later section creates the TcpConnection). The parameter of its NewConnectionCallback is the socket fd.
  • Error handling: a writable socket does not necessarily mean that the connection was established. When a connection attempt fails, the socket descriptor becomes both readable and writable, so we call getsockopt(2) with SO_ERROR to fetch the pending error on the socket.
  • In non-blocking network programming, the sockfd passed to connect(2) is single-use: once an error occurs (for example, the peer refuses the connection) it cannot be recovered and can only be closed before starting over. A Connector, however, is reused, so each connection attempt uses a new socket file descriptor and a new Channel object. Pay attention to the lifetime management of the Channel.
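
The first and third points can be tried out without muduo. The following is a minimal sketch using plain POSIX calls, assuming IPv4 and a blocking poll(2) in place of an event loop. The names connectNonBlocking, pendingSocketError, and listenOnLoopback are hypothetical helpers for this illustration, not muduo functions.

```cpp
#include <arpa/inet.h>
#include <cassert>
#include <cerrno>
#include <fcntl.h>
#include <netinet/in.h>
#include <poll.h>
#include <sys/socket.h>
#include <unistd.h>

// Returns the pending error on fd as reported by SO_ERROR (0 means none).
int pendingSocketError(int fd) {
  int err = 0;
  socklen_t len = sizeof err;
  if (::getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0) return errno;
  return err;
}

// Hypothetical helper: starts a non-blocking connect, waits up to timeoutMs
// for the socket to become writable, then confirms the result with SO_ERROR.
// Returns a connected fd, or -1 with errno set.
int connectNonBlocking(const sockaddr_in& addr, int timeoutMs) {
  int fd = ::socket(AF_INET, SOCK_STREAM, 0);
  if (fd < 0) return -1;
  int flags = ::fcntl(fd, F_GETFL, 0);
  ::fcntl(fd, F_SETFL, flags | O_NONBLOCK);

  int ret = ::connect(fd, reinterpret_cast<const sockaddr*>(&addr), sizeof addr);
  if (ret == 0) return fd;  // connected immediately
  int err = errno;
  if (err == EINPROGRESS) {
    pollfd pfd;
    pfd.fd = fd;
    pfd.events = POLLOUT;
    pfd.revents = 0;
    int n = ::poll(&pfd, 1, timeoutMs);
    if (n > 0) err = pendingSocketError(fd);  // writable is not enough: check SO_ERROR
    else if (n == 0) err = ETIMEDOUT;
    else err = errno;
    if (err == 0) return fd;
  }
  ::close(fd);  // a failed sockfd is single-use: close it, retry with a new one
  errno = err;
  return -1;
}

// Demo-only helper: listens on 127.0.0.1 with a kernel-chosen port and
// stores the bound address in *bound.
int listenOnLoopback(sockaddr_in* bound) {
  int lfd = ::socket(AF_INET, SOCK_STREAM, 0);
  if (lfd < 0) return -1;
  sockaddr_in addr{};
  addr.sin_family = AF_INET;
  addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
  addr.sin_port = 0;
  socklen_t len = sizeof addr;
  if (::bind(lfd, reinterpret_cast<sockaddr*>(&addr), sizeof addr) < 0 ||
      ::listen(lfd, 1) < 0 ||
      ::getsockname(lfd, reinterpret_cast<sockaddr*>(&addr), &len) < 0) {
    ::close(lfd);
    return -1;
  }
  *bound = addr;
  return lfd;
}
```

Connector follows the same sequence, except that it waits for writability through a Channel registered with the Poller rather than through a blocking poll(2) call.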

1. Definition of Connector

A Connector is created by TcpClient, which passes in the EventLoop it belongs to and the address of the server. Only five public member functions are exposed besides the constructor and destructor. Most of the connect work is performed in the loop thread.

class Connector : noncopyable, 
				  public std::enable_shared_from_this<Connector>
{
 public:
  typedef std::function<void (int sockfd)> NewConnectionCallback; // Callback type invoked when a connection succeeds

  Connector(EventLoop* loop, const InetAddress& serverAddr);  // Constructor
  ~Connector();

  void setNewConnectionCallback(const NewConnectionCallback& cb)
  { newConnectionCallback_ = cb; }

  void start();  // Thread-safe; actually calls EventLoop::runInLoop()
  void restart();  // Not thread-safe; may only be called in the loop thread
  void stop();  // Thread-safe; actually calls EventLoop::queueInLoop()

  const InetAddress& serverAddress() const { return serverAddr_; }

 private:
  enum States { kDisconnected, kConnecting, kConnected };
  static const int kMaxRetryDelayMs = 30*1000;  // Maximum delay time for reconnection
  static const int kInitRetryDelayMs = 500;     // Delay time for initial reconnection

  void setState(States s) { state_ = s; }
  void startInLoop();           // Runs in the loop thread; does the real work of start()
  void stopInLoop();            // Runs in the loop thread; does the real work of stop()
  void connect();               // Create sockfd and call the ::connect(2) system call
  void connecting(int sockfd);  // ::connect(2) in progress: construct the Channel, register events
  void handleWrite();           // Handle the writable event on the connecting sockfd
  void handleError();           // Handle the error event on the connecting sockfd
  void retry(int sockfd);       // Close sockfd and try to reconnect
  int removeAndResetChannel();  // Unregister the Channel, schedule its reset, return its sockfd
  void resetChannel();          // Destroy the Channel

  EventLoop* loop_;                             // Owning event loop
  InetAddress serverAddr_;                      // Server address to connect to
  bool connect_; // atomic                      // Whether to connect
  States state_;  // FIXME: use atomic variable
  std::unique_ptr<Channel> channel_;            // Channel wrapping the sockfd while connecting
  NewConnectionCallback newConnectionCallback_; // Callback invoked on success
  int retryDelayMs_;                            // Reconnection delay in ms
};

2. Implementation of Connector

2.1. Construction and Destruction

The constructor initializes the member variables in its initializer list. The destructor performs no cleanup beyond asserting that channel_ has already been released; connections are closed by dedicated functions.

Connector::Connector(EventLoop* loop, const InetAddress& serverAddr)
  : loop_(loop),                        // Owning EventLoop
    serverAddr_(serverAddr),            // Server address to connect to
    connect_(false),                    // Do not connect yet
    state_(kDisconnected),              // Initial connection state
    retryDelayMs_(kInitRetryDelayMs)    // Initial reconnection delay
{
  LOG_DEBUG << "ctor[" << this << "]";
}

Connector::~Connector()
{
  LOG_DEBUG << "dtor[" << this << "]";
  assert(!channel_);
}

2.2. Starting the connection: start()

TcpClient, the owner of the Connector, calls start() to begin connecting. start() calls loop_->runInLoop() so that startInLoop() executes in the loop thread, and startInLoop() normally calls connect(), which works in the following steps:

  • Create a non-blocking socket with sockets::createNonblockingOrDie(), which returns the sockfd.
  • Call the ::connect(2) system call. The outcome, judged from the return value and errno, falls into three cases:
    • The return value is 0 (connected immediately), or errno is EINPROGRESS, EINTR, or EISCONN: the connection is in progress or already established. Wait until sockfd becomes writable to confirm it, and reconnect if that check fails.
    • Temporary local or server-side errors: try to reconnect.
    • Other errors, such as permission or protocol errors: close the socket directly.
  • Dispatch to connecting(), retry(), or closing the socket according to that outcome.

Note that connection processing starts only when connect_ is true. If stop() has been called to disconnect, the code here does not attempt the connection.

void Connector::start()
{
  connect_ = true;
  loop_->runInLoop(std::bind(&Connector::startInLoop, this)); // FIXME: unsafe
}

void Connector::startInLoop()
{
  loop_->assertInLoopThread();
  assert(state_ == kDisconnected);
  if (connect_){    // Proceed only when the connect flag is true
    connect();
  }
  else{
    LOG_DEBUG << "do not connect";
  }
}

void Connector::connect()
{
  int sockfd = sockets::createNonblockingOrDie(serverAddr_.family());
  int ret = sockets::connect(sockfd, serverAddr_.getSockAddr());
  int savedErrno = (ret == 0) ? 0 : errno;
  switch (savedErrno)
  {
    case 0:
    case EINPROGRESS:       // Non-blocking connect cannot complete immediately; check SO_ERROR once sockfd becomes writable
    case EINTR:             // Interrupted by a signal
    case EISCONN:           // The socket is already connected
      connecting(sockfd);
      break;

    case EAGAIN:            // No free local ports, or insufficient entries in the routing cache
    case EADDRINUSE:        // Local address already in use
    case EADDRNOTAVAIL:     // No local port available for the connection
    case ECONNREFUSED:      // No socket is listening on the server address
    case ENETUNREACH:       // Network unreachable (firewall?)
      retry(sockfd);
      break;

    case EACCES:
    case EPERM:             // EACCES/EPERM: connecting to a broadcast address without the broadcast flag set, or local firewall rules forbid the connection
    case EAFNOSUPPORT:      // The address family in sa_family is not supported
    case EALREADY:          // The socket is non-blocking and a previous connection attempt has not completed yet
    case EBADF:             // sockfd is not a valid descriptor
    case EFAULT:            // The socket address structure is outside the user address space
    case ENOTSOCK:          // fd is not a socket
      LOG_SYSERR << "connect error in Connector::startInLoop " << savedErrno;
      sockets::close(sockfd);
      break;

    default:
      LOG_SYSERR << "Unexpected error in Connector::startInLoop " << savedErrno;
      sockets::close(sockfd);
      // connectErrorCallback_();
      break;
  }
}

2.3. Connection in progress: connecting()

connecting() handles a connection attempt that is in progress: it sets the state to kConnecting, wraps the current sockfd in a Channel, and registers the writable and error events with the Poller. Whether the connection succeeded is then determined from the writable event that the Poller reports.

void Connector::connecting(int sockfd)
{
  setState(kConnecting);  // Set state
  assert(!channel_);
  // Create channel_ and set its writable-event and error-event callbacks
  channel_.reset(new Channel(loop_, sockfd));
  channel_->setWriteCallback(std::bind(&Connector::handleWrite, this)); // FIXME: unsafe
  channel_->setErrorCallback(std::bind(&Connector::handleError, this)); // FIXME: unsafe

  // channel_->tie(shared_from_this()); is not working,
  // as channel_ is not managed by shared_ptr
  channel_->enableWriting();   // Register with Poller
}

2.4. Handling events: handleWrite(), handleError()

handleWrite() runs when sockfd becomes writable and confirms whether the connection was established. Nothing is done if the current state is kDisconnected.

If the current state is kConnecting, channel_ is reset and SO_ERROR is read from sockfd. If SO_ERROR is non-zero, or the socket is self-connected, retry(sockfd) attempts to reconnect. An SO_ERROR value of 0 indicates success, and the callback passes the connected sockfd to TcpClient. handleError() likewise resets channel_ and tries to reconnect when the state is kConnecting.

Note that removeAndResetChannel() returns the sockfd, because at this point the sockfd is connected to the server. TcpClient then takes over the sockfd, creates a TcpConnection, and registers the sockfd's IO events with the Poller for client-server communication, so the Connector no longer needs to manage the descriptor once the connection is established.

void Connector::handleWrite()
{
  LOG_TRACE << "Connector::handleWrite " << state_;

  if (state_ == kConnecting){   // A connection attempt is in progress
    int sockfd = removeAndResetChannel();  // Unregister the Channel and take back its sockfd
    int err = sockets::getSocketError(sockfd);  // Read SO_ERROR
    if (err){  // Connection failed
      LOG_WARN << "Connector::handleWrite - SO_ERROR = " << err << " " << strerror_tl(err);
      retry(sockfd);
    }
    else if (sockets::isSelfConnect(sockfd)){   // Self-connection
      LOG_WARN << "Connector::handleWrite - Self connect";
      retry(sockfd);
    }
    else{  // SO_ERROR is 0: connection succeeded
      setState(kConnected);    // Enter the kConnected state
      if (connect_){
        newConnectionCallback_(sockfd); // Hand the connected sockfd to TcpClient through its callback
      }
      else{
        sockets::close(sockfd); // stop() was called in the meantime, so close the connected sockfd
      }
    }
  }
  else {
    // what happened?
    assert(state_ == kDisconnected);
  }
}

void Connector::handleError()
{
  LOG_ERROR << "Connector::handleError state=" << state_;
  if (state_ == kConnecting){  
    int sockfd = removeAndResetChannel();  // Unregister the Channel and take back its sockfd
    int err = sockets::getSocketError(sockfd); // Read SO_ERROR for logging
    LOG_TRACE << "SO_ERROR = " << err << " " << strerror_tl(err);
    retry(sockfd);  // Try to reconnect
  }
}
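
handleWrite() treats a self-connection as a failure. A self-connection can occur when a client connects to a port on its own host and the kernel happens to pick the destination port as the source port, so the socket ends up connected to itself. The following is a minimal IPv4-only sketch of such a check, assuming it is enough to compare the local and peer endpoints; sameEndpoint, looksSelfConnected, and makeEndpoint are hypothetical names for this illustration, not muduo's implementation.

```cpp
#include <arpa/inet.h>
#include <cassert>
#include <cstdint>
#include <netinet/in.h>
#include <sys/socket.h>

// True when both endpoints carry the same IPv4 address and port.
bool sameEndpoint(const sockaddr_in& a, const sockaddr_in& b) {
  return a.sin_port == b.sin_port && a.sin_addr.s_addr == b.sin_addr.s_addr;
}

// Hypothetical check: a socket is self-connected when its local endpoint
// equals its peer endpoint. Returns false if either lookup fails.
bool looksSelfConnected(int sockfd) {
  sockaddr_in local{};
  sockaddr_in peer{};
  socklen_t len = sizeof local;
  if (::getsockname(sockfd, reinterpret_cast<sockaddr*>(&local), &len) < 0) {
    return false;
  }
  len = sizeof peer;
  if (::getpeername(sockfd, reinterpret_cast<sockaddr*>(&peer), &len) < 0) {
    return false;
  }
  return sameEndpoint(local, peer);
}

// Demo-only helper that builds an IPv4 endpoint from text and a port.
sockaddr_in makeEndpoint(const char* ip, uint16_t port) {
  sockaddr_in a{};
  a.sin_family = AF_INET;
  a.sin_port = htons(port);
  ::inet_pton(AF_INET, ip, &a.sin_addr);
  return a;
}
```

A self-connected socket has no server behind it, which is why retrying with a fresh sockfd is the reasonable reaction.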

2.5. Removing and resetting the Channel: removeAndResetChannel()

When an error occurs and a reconnection attempt is made, the Channel must be reset because the current sockfd can no longer be reused. This function does not close the sockfd itself; the sockfd is closed in retry().

int Connector::removeAndResetChannel()
{
  channel_->disableAll();
  channel_->remove();
  int sockfd = channel_->fd();
  // Can't reset channel_ here, because we are inside Channel::handleEvent
  loop_->queueInLoop(std::bind(&Connector::resetChannel, this)); // FIXME: unsafe
  return sockfd;
}

void Connector::resetChannel()
{
  channel_.reset();
}
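
The comment in removeAndResetChannel() says the Channel cannot be destroyed while its own handleEvent() is still on the call stack, so the reset is queued and runs later. The following toy sketch shows that pattern under simplified assumptions: ToyLoop, ToyChannel, and ToyConnector are hypothetical stand-ins written for this illustration, not muduo's classes.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <utility>
#include <vector>

// Toy loop: only stores functors and runs them later on request.
struct ToyLoop {
  std::vector<std::function<void()>> pending;
  void queueInLoop(std::function<void()> f) { pending.push_back(std::move(f)); }
  void doPendingFunctors() {
    std::vector<std::function<void()>> functors;
    functors.swap(pending);
    for (auto& f : functors) f();
  }
};

// Toy channel: dispatches one callback and touches its own members afterwards,
// so destroying it from inside the callback would be a use-after-free.
struct ToyChannel {
  int fd;
  bool handling = false;
  std::function<void()> writeCallback;
  explicit ToyChannel(int f) : fd(f) {}
  ~ToyChannel() { assert(!handling); }  // must never die mid-dispatch
  void handleEvent() {
    handling = true;
    writeCallback();
    handling = false;  // *this must still be alive here
  }
};

// Toy connector: takes the fd out of the channel and defers the reset.
struct ToyConnector {
  ToyLoop* loop;
  std::unique_ptr<ToyChannel> channel;
  int handedOverFd = -1;
  ToyConnector(ToyLoop* l, int fd) : loop(l), channel(new ToyChannel(fd)) {
    channel->writeCallback = [this] { handedOverFd = removeAndResetChannel(); };
  }
  int removeAndResetChannel() {
    int fd = channel->fd;
    // Destroying channel here would destroy the object whose handleEvent()
    // is currently running, so the reset is queued instead.
    loop->queueInLoop([this] { channel.reset(); });
    return fd;
  }
};
```

After handleEvent() returns, the channel still exists; it is destroyed only when the loop runs its pending functors.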

2.6. Trying to reconnect: retry()

retry() closes the sockfd used by the failed attempt and sets the connection state to kDisconnected.
Reconnection is scheduled with a timer, and the delay doubles after each attempt, from kInitRetryDelayMs (500 ms) up to a cap of kMaxRetryDelayMs (30 s).

Closing the socket here also lets stopInLoop() reuse retry() as the single place where a sockfd is closed on disconnect.

void Connector::retry(int sockfd)
{
  sockets::close(sockfd);  // Close the sockfd used by the failed connection attempt
  setState(kDisconnected); // Set the current state to kDisconnected
  if (connect_){
    LOG_INFO << "Connector::retry - Retry connecting to " << serverAddr_.toIpPort()
             << " in " << retryDelayMs_ << " milliseconds. ";
    loop_->runAfter(retryDelayMs_/1000.0, std::bind(&Connector::startInLoop, shared_from_this()));
    retryDelayMs_ = std::min(retryDelayMs_ * 2, kMaxRetryDelayMs);
  }
  else{
    LOG_DEBUG << "do not connect";
  }
}
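
The doubling in retry() produces a capped exponential backoff. The following sketch lists the delays that successive failed attempts would wait, reusing the two constants from the class definition; retryDelaysMs is a hypothetical helper written for this illustration.

```cpp
#include <algorithm>
#include <cassert>
#include <vector>

constexpr int kInitRetryDelayMs = 500;       // same value as in Connector
constexpr int kMaxRetryDelayMs = 30 * 1000;  // same value as in Connector

// Hypothetical helper: returns the delay, in milliseconds, used before each
// of the first `attempts` reconnection attempts.
std::vector<int> retryDelaysMs(int attempts) {
  std::vector<int> delays;
  int delay = kInitRetryDelayMs;
  for (int i = 0; i < attempts; ++i) {
    delays.push_back(delay);
    delay = std::min(delay * 2, kMaxRetryDelayMs);  // double, but never exceed the cap
  }
  return delays;
}
```

With these constants the delay reaches the 30 s cap on the seventh attempt and stays there until restart() resets it to kInitRetryDelayMs.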

2.7. Stopping and restarting: stop(), restart()

Disconnecting also runs in the loop thread, and the Connector can be restarted after it has been stopped.

void Connector::stop()
{
  connect_ = false;   // Clear the connect flag
  loop_->queueInLoop(std::bind(&Connector::stopInLoop, this)); // FIXME: unsafe
  // FIXME: cancel timer
}

void Connector::stopInLoop()
{
  loop_->assertInLoopThread();
  if (state_ == kConnecting)  // A connection attempt is in progress
  {
    setState(kDisconnected); // Set state
    int sockfd = removeAndResetChannel();  // Unregister the Channel and take back its sockfd
    retry(sockfd);  // connect_ is false, so retry() only closes sockfd and does not reconnect
  }
  }
}

void Connector::restart()
{
  loop_->assertInLoopThread();
  setState(kDisconnected);
  retryDelayMs_ = kInitRetryDelayMs;  // Reset initial reconnection timeout
  connect_ = true;
  startInLoop();	// Start connecting again
}
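
start() hands its work to the loop with runInLoop(), while stop() uses queueInLoop(). In muduo, runInLoop() runs the callback immediately when it is called from the loop thread and queues it otherwise, whereas queueInLoop() always queues. The following single-threaded toy sketch shows that difference; SketchLoop is a hypothetical class written for this illustration, and it omits the mutex and wakeup that a real multi-threaded loop needs.

```cpp
#include <cassert>
#include <functional>
#include <thread>
#include <utility>
#include <vector>

// Toy stand-in for an event loop; not muduo's EventLoop.
class SketchLoop {
 public:
  SketchLoop() : owner_(std::this_thread::get_id()) {}

  bool isInLoopThread() const { return std::this_thread::get_id() == owner_; }

  // Run now if already in the loop thread, otherwise defer.
  void runInLoop(std::function<void()> f) {
    if (isInLoopThread()) {
      f();
    } else {
      queueInLoop(std::move(f));
    }
  }

  // Always defer until the loop processes its pending functors.
  void queueInLoop(std::function<void()> f) { pending_.push_back(std::move(f)); }

  // Called by the loop once per iteration in a real implementation.
  void doPendingFunctors() {
    std::vector<std::function<void()>> functors;
    functors.swap(pending_);
    for (auto& f : functors) f();
  }

 private:
  std::thread::id owner_;  // thread that created the loop
  std::vector<std::function<void()>> pending_;
};
```

Either way the callback ends up executing in the loop thread, which is what lets startInLoop() and stopInLoop() assert assertInLoopThread().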

Keywords: tcp net muduo

Added by JasonMWaldo on Fri, 03 Sep 2021 19:29:20 +0300