SRS streaming media server - source code analysis - simple analysis of basic process

SRS streaming media server - source code analysis - simple analysis of basic process

preface

  1. The Department plans to use SRS to build a live broadcast source station, which is my responsibility. Therefore, I take this opportunity to learn about SRS, which is also an opportunity for me to improve. Recently, I also started to build clusters of various modes on my Alibaba cloud server and read some source codes.
  2. I am a person with poor memory, so I like to summarize and sort out, so it is not so easy to forget, and I am required to do my best.
  3. Although many contents are described in detail in the wiki on the SRS official website, the source code analysis doesn't seem to be seen much.
  4. The goal of SRS is to lower the threshold of bass video. My goal is to lower the threshold of SRS learning, so I want to open a series, which is divided into three parts:
    1. SRS environment construction.
    2. SRS source code analysis.
    3. Problems and solutions encountered in the actual combat of SRS.
  5. I'm still a beginner, so I don't understand enough or say something wrong. Please point out, or I can say any good suggestions. I'll improve and hope to make progress together.
  6. If the flow chart cannot be enlarged, please refer to the language bird document to enlarge the flow chart 1. SRS basic process
  7. The first SRS source code analysis will first sort out a general process, and then analyze the contents of RTMP handshake, alliance building and push-pull stream in detail.

catalogue

  1. main(), domain(), and run_master()
  2. SrsServer::listen()
  3. SrsServer::listen_rtmp()
  4. SrsBufferListener::listen(),SrsTcpListener::listen()
  5. SrsTcpListener::cycle()
  6. SrsBufferListener::on_tcp_client()
  7. SrsServer::accept_client(),SrsServer::fd2conn()
  8. SrsSTCoroutine::start(),SrsConnection::cycle()
  9. SrsRtmpConn::do_cycle()

0. Basic flow chart

1. main(), domain() and run_master()

  1. The file where the main function is located is in main/srs_main_server.cpp.
  2. Some global variables are defined:
    a. _srs_config: global configuration file
    b. _srs_log: Global log file
  3. main() calls domain() to execute the process. The following domain() internal function analysis is not the main process, but it is introduced too much
_srs_config->parse_options(argc, argv)	//Parsing command line parameters

_srs_config->get_work_dir() //Set working directory and current directory
_srs_config->initialize_cwd()    

_srs_log->initialize()	//Initialize log
  1. The point is that the SrsServer object will be created and run
_srs_server = new SrsServer();
run(_srs_server)
  1. HTTP is also initialized when SrsServer is created_ api_ MUX and http_server
http_api_mux = new SrsHttpServeMux(); 	// HTTP request multiplexer, not HTTP streaming
http_server = new SrsHttpServer(this); // http service
  1. run(SrsServer* svr) initializes the server and gets the daemon configuration in_daemon (false by default), if in_ If daemon is false, execute run directly_ master(SrsServer* svr)
svr->initialize(NULL) //initialize server
_srs_config->get_daemon() //Get the daemon and configure it. The default is false. If it is true, srs will fork the child process and let the child process execute run_master
run_master(svr) 
  1. run_ In the master (SrsServer* SVR) function, the server does some initialization work and calls listern to listen to the connection of the client, then calls do_. Cycle function (dead cycle), do some monitoring, update time, cache, etc.
svr->initialize_st() 		//Initialize st collaboration Library
svr->initialize_signal()	//Initialization signal
svr->acquire_pid_file()		//Write pid thread to file
svr->listen()				//Listen for client requests
svr->register_signal()		//Registration signal
svr->http_handle()    		//Register http processing module
svr->ingest()				//Turn on stream collection
svr->cycle()				//Message loop processing
  1. When initializing signal, you need to convert the signal into IO when using state threads, and create a co process to process the signal io.
    a. svr->initialize_ Signal() initializes the signal, and the signal inside_ Manager - > initialize() creates a pipe.
    b. svr->register_ Signal() to register the signal_ Manager - > start() registers the signal and starts the signal listening process.
    c. Srsignalmanager:: cycle() performs the final listening, circularly performs the listening operation, and calls SrsServer::on_signal()
    d. SrsServer::on_signal() will detect each signal. If the signal occurs, set the corresponding bool variable to true
    e. Finally, in srsserver:: do_ Check the signal in cycle () and process it
  2. Focus on SrsServer::listen()

2. SrsServer::listen()

  1. SrsServer::listen() will listen to rtmp/http and other client requests.
listen_rtmp()			//Monitor rtmp
listen_http_api()    	//Listen http
listen_http_stream()	//Listen to HTTP stream
listen_stream_caster()  //Listen to the conversion stream
conn_manager->start()	//Start the connection management process    
  1. First analyze the rtmp push-pull flow process, so focus on SrsServer::listen_rtmp(), which will be introduced later, such as listen_http_stream().

3. SrsServer::listen_rtmp()

  1. SrsServer::listen_ The rtmp() function is as follows
srs_error_t SrsServer::listen_rtmp()
{
    srs_error_t err = srs_success;
    
    // stream service port.  Get the list of ports to listen in the configuration file
    std::vector<std::string> ip_ports = _srs_config->get_listens();
    srs_assert((int)ip_ports.size() > 0);
    //Close the listener of type SrsListenerRtmpStream and delete the listening object from the listeners manager.
    close_listeners(SrsListenerRtmpStream);
    
    for (int i = 0; i < (int)ip_ports.size(); i++) { //Traversal ip_ports list. The pointer listener of the parent class SrsListener points to the object of the newly constructed subclass SrsBufferListener
        SrsListener* listener = new SrsBufferListener(this, SrsListenerRtmpStream);
        listeners.push_back(listener); //Add to the list of listeners Manager

        int port; string ip; //Split ip address (if any) and port
        srs_parse_endpoint(ip_ports[i], ip, port);
        //Polymorphism: call the member function listen of subclass SrsBufferListener
        if ((err = listener->listen(ip, port)) != srs_success) {
            srs_error_wrap(err, "rtmp listen %s:%d", ip.c_str(), port);
        }
    }
    
    return err;
}
  1. The types of monitoring are:
// The listener type for server to identify the connection,
// that is, use different type to process the connection.
enum SrsListenerType
{
    // RTMP client,
    SrsListenerRtmpStream = 0,
    // HTTP api,
    SrsListenerHttpApi = 1,
    // HTTP stream, HDS/HLS/DASH
    SrsListenerHttpStream = 2,
    // UDP stream, MPEG-TS over udp.
    SrsListenerMpegTsOverUdp = 3,
    // TCP stream, RTSP stream.
    SrsListenerRtsp = 4,
    // TCP stream, FLV stream over HTTP.
    SrsListenerFlv = 5,
};
  1. close_listeners is to remove elements of type from listeners.
void SrsServer::close_listeners(SrsListenerType type)
{
    std::vector<SrsListener*>::iterator it;
    for (it = listeners.begin(); it != listeners.end();) {
        SrsListener* listener = *it;
        
        if (listener->listen_type() != type) { //Different type s, continue
            ++it;
            continue;
        }
        
        srs_freep(listener);
        it = listeners.erase(it); //Remove from listeners (vector)
    }
}

4. SrsBufferListener::listen(),SrsTcpListener::listen()

  1. listern_rtmp() calls listen() of SrsBufferListener to listen
srs_error_t SrsBufferListener::listen(string i, int p) //i default to "0.0.0.0"
{
    srs_error_t err = srs_success;
    
    ip = i;
    port = p;
    
    srs_freep(listener);
    listener = new SrsTcpListener(this, ip, port); //Create TCP listener
    
    if ((err = listener->listen()) != srs_success) { //Enter monitoring
        return srs_error_wrap(err, "buffered tcp listen");
    }
    
    string v = srs_listener_type2string(type);
    srs_trace("%s listen at tcp://%s:%d, fd=%d", v.c_str(), ip.c_str(), port, listener->fd());
    
    return err;
}
  1. The listener will call SrsTcpListener::listen()
srs_error_t SrsTcpListener::listen()
{
    srs_error_t err = srs_success;

    if ((err = srs_tcp_listen(ip, port, &lfd)) != srs_success) { //Create a listening fd and register the fd with the st library
        return srs_error_wrap(err, "listen at %s:%d", ip.c_str(), port);
    }
    
    srs_freep(trd);
    trd = new SrsSTCoroutine("tcp", this); //Create a collaboration
    if ((err = trd->start()) != srs_success) {  //Start the collaboration process and enter SrsSTCoroutine::cycle()
        return srs_error_wrap(err, "start coroutine");
    }
    
    return err;
}
  1. The SrsTcpListener class actually listens, creates the monitored fd through socket - > bind - > Listen (completed in the srs_tcp_listen function), and registers the fd to the st library. After that, the events on the fd are monitored and processed by the st library.
  2. Create a tcp coroutine to handle the connection. The coroutine starts and enters the SrsSTCoroutine::cycle() function.
    a. The cycle() function is used to handle client connections.
  3. Some functions are noted as follows:
srs_tcp_listen(ip, port, &lfd)
    fd = socket(r->ai_family, r->ai_socktype, r->ai_protocol) //Create socket
    do_srs_tcp_listen(fd, r, pfd)
        srs_fd_closeexec(fd)				//Set FD_CLOEXEC
        srs_fd_reuseaddr(fd)				//Reuse fd
        bind(fd, r->ai_addr, r->ai_addrlen)	//Binding service ip and port
        ::listen(fd, SERVER_LISTEN_BACKLOG)	//In_ Enable listening on fd
        (*pfd = srs_netfd_open_socket(fd))	//Register fd with the st library, and then all requests of fd will be handled by the library
trd = new SrsSTCoroutine("tcp", this)		//Create a collaboration
trd->start()								//Start the collaboration process and enter SrsSTCoroutine::cycle()

5. SrsTcpListener::cycle()

  1. Srstcoroutine:: cycle() will finally call SrsTcpListener::cycle()
  2. SrsTcpListener::cycle() listens to the process. After accepting the connection request, the process passes the execution logic to the BufferListener for processing.
srs_error_t SrsTcpListener::cycle()
{
    srs_error_t err = srs_success;
    
    while (true) {
        if ((err = trd->pull()) != srs_success) {
            return srs_error_wrap(err, "tcp listener");
        }
        //Receive connection
        srs_netfd_t fd = srs_accept(lfd, NULL, NULL, SRS_UTIME_NO_TIMEOUT);
        if(fd == NULL){
            return srs_error_new(ERROR_SOCKET_ACCEPT, "accept at fd=%d", srs_netfd_fileno(lfd));
        }
        
	    if ((err = srs_fd_closeexec(srs_netfd_fileno(fd))) != srs_success) {
	        return srs_error_wrap(err, "set closeexec");
	    }
        //This listening process only processes connection requests, and the specific execution logic is handed over to the BufferListener for processing
        if ((err = handler->on_tcp_client(fd)) != srs_success) {
            return srs_error_wrap(err, "handle fd=%d", srs_netfd_fileno(fd));
        }
    }
    
    return err;
}

6. SrsBufferListener::on_tcp_client()

  1. SrsBufferListener::on_ tcp_ The client () code is as follows:
srs_error_t SrsBufferListener::on_tcp_client(srs_netfd_t stfd)
{
    srs_error_t err = server->accept_client(type, stfd); //Submit to SrsServer for processing
    if (err != srs_success) {
        srs_warn("accept client failed, err is %s", srs_error_desc(err).c_str());
        srs_freep(err);
    }
    
    return srs_success;
}
  1. Finally, the accept of SrsServer is called_ Client processing

7. SrsServer::accept_client(),SrsServer::fd2conn()

  1. SrsServer::accept_ The client () code is as follows:
    a. First get the SrsConnection of the connection according to the type
    b. Add SrsConnection to conns, which stores all connections
    c. Open a connection process for each SrsConnection
srs_error_t SrsServer::accept_client(SrsListenerType type, srs_netfd_t stfd)
{
    srs_error_t err = srs_success;
    
    SrsConnection* conn = NULL;
    
    if ((err = fd2conn(type, stfd, &conn)) != srs_success) { //Get the SrsConnection of the connection according to the type
        if (srs_error_code(err) == ERROR_SOCKET_GET_PEER_IP && _srs_config->empty_ip_ok()) {
            srs_close_stfd(stfd); srs_error_reset(err);
            return srs_success;
        }
        return srs_error_wrap(err, "fd2conn");
    }
    srs_assert(conn);
    
    // directly enqueue, the cycle thread will remove the client.
    conns.push_back(conn); // Join conns, which stores all connections
    
    // cycle will start process thread and when finished remove the client.
    // @remark never use the conn, for it maybe destroyed.
    if ((err = conn->start()) != srs_success) { //Each connection opens a connection process
        return srs_error_wrap(err, "start conn coroutine");
    }
    
    return err;
}
  1. Get the main code of the connection:
    a. Because now the type is SrsListenerRtmpStream, and all Conns return srsrrtmp conn.
srs_error_t SrsServer::fd2conn(SrsListenerType type, srs_netfd_t stfd, SrsConnection** pconn)
{
    srs_error_t err = srs_success;
    
    int fd = srs_netfd_fileno(stfd);
    string ip = srs_get_peer_ip(fd);
    
    // for some keep alive application, for example, the keepalived,
    // will send some tcp packet which we cann't got the ip,
    // we just ignore it. 
    if (ip.empty()) { //Ignore if ip cannot be obtained
        return srs_error_new(ERROR_SOCKET_GET_PEER_IP, "ignore empty ip, fd=%d", fd);
    }
    
    // check connection limitation.
    int max_connections = _srs_config->get_max_connections(); //Get the maximum number of connections
    if (handler && (err = handler->on_accept_client(max_connections, (int)conns.size())) != srs_success) {
        return srs_error_wrap(err, "drop client fd=%d, max=%d, cur=%d for err: %s",
            fd, max_connections, (int)conns.size(), srs_error_desc(err).c_str());
    }
    if ((int)conns.size() >= max_connections) { //If the connection limit is exceeded, the connection will be rejected directly
        return srs_error_new(ERROR_EXCEED_CONNECTIONS,
            "drop fd=%d, max=%d, cur=%d for exceed connection limits",
            fd, max_connections, (int)conns.size());
    }
    
    // avoid fd leak when fork.
    // @see https://github.com/ossrs/srs/issues/518
    if (true) {
        int val;
        if ((val = fcntl(fd, F_GETFD, 0)) < 0) {
            return srs_error_new(ERROR_SYSTEM_PID_GET_FILE_INFO, "fnctl F_GETFD error! fd=%d", fd);
        }
        val |= FD_CLOEXEC;
        if (fcntl(fd, F_SETFD, val) < 0) {
            return srs_error_new(ERROR_SYSTEM_PID_SET_FILE_INFO, "fcntl F_SETFD error! fd=%d", fd);
        }
    }
    
    if (type == SrsListenerRtmpStream) {
        *pconn = new SrsRtmpConn(this, stfd, ip); //Create RTMP connection
    } else if (type == SrsListenerHttpApi) {
        *pconn = new SrsHttpApi(this, stfd, http_api_mux, ip);
    } else if (type == SrsListenerHttpStream) {
        *pconn = new SrsResponseOnlyHttpConn(this, stfd, http_server, ip);
    } else {
        srs_warn("close for no service handler. fd=%d, ip=%s", fd, ip.c_str());
        srs_close_stfd(stfd);
        return err;
    }
    
    return err;
}

8. SrsSTCoroutine::start(),SrsConnection::cycle()

  1. SrsConnection::start() code is as follows:
srs_error_t SrsConnection::start()
{
    srs_error_t err = srs_success;

    if ((err = skt->initialize(stfd)) != srs_success) {
        return srs_error_wrap(err, "init socket");
    }
    //Start the conn process, and finally execute to SrsConnection::cycle()
    if ((err = trd->start()) != srs_success) { 
        return srs_error_wrap(err, "coroutine");
    }
    
    return err;
}
  1. The SrsConnection::cycle() code is as follows:
srs_error_t SrsConnection::cycle()
{
    srs_error_t err = do_cycle(); //SrsRtmpConn::do_cycle
    
    // Notify manager to remove it.
    manager->remove(this);
    
    // success.
    if (err == srs_success) {
        srs_trace("client finished.");
        return err;
    }
    
    // client close peer.
    // TODO: FIXME: Only reset the error when client closed it.
    if (srs_is_client_gracefully_close(err)) {
        srs_warn("client disconnect peer. ret=%d", srs_error_code(err));
    } else if (srs_is_server_gracefully_close(err)) {
        srs_warn("server disconnect. ret=%d", srs_error_code(err));
    } else {
        srs_error("serve error %s", srs_error_desc(err).c_str());
    }
    
    srs_freep(err);
    return srs_success;
}

9. SrsRtmpConn::do_cycle()

  1. If there is a streaming event, it will enter srsrrtmpconn:: do_ Cycle(), which is responsible for the specific execution of RTMP processes, including handshaking, receiving connect requests, sending response connect responses, and receiving audio and video stream data.
srs_error_t SrsRtmpConn::do_cycle()
{
    srs_error_t err = srs_success;
    
    srs_trace("RTMP client ip=%s, fd=%d", ip.c_str(), srs_netfd_fileno(stfd));
    
    rtmp->set_recv_timeout(SRS_CONSTS_RTMP_TIMEOUT); //Set receive timeout
    rtmp->set_send_timeout(SRS_CONSTS_RTMP_TIMEOUT); //Set send timeout

    if ((err = rtmp->handshake()) != srs_success) { //rtmp handshake
        return srs_error_wrap(err, "rtmp handshake");
    }

    uint32_t rip = rtmp->proxy_real_ip();
    if (rip > 0) {
        srs_trace("RTMP proxy real client ip=%d.%d.%d.%d",
            uint8_t(rip>>24), uint8_t(rip>>16), uint8_t(rip>>8), uint8_t(rip));
    }
    
    SrsRequest* req = info->req;
    if ((err = rtmp->connect_app(req)) != srs_success) { //After the handshake is successful, srs will receive and parse the RTMP message connect sent by the client
        return srs_error_wrap(err, "rtmp connect tcUrl");
    }
    
    // set client ip to request.
    req->ip = ip;
    
    srs_trace("connect app, tcUrl=%s, pageUrl=%s, swfUrl=%s, schema=%s, vhost=%s, port=%d, app=%s, args=%s",
        req->tcUrl.c_str(), req->pageUrl.c_str(), req->swfUrl.c_str(),
        req->schema.c_str(), req->vhost.c_str(), req->port,
        req->app.c_str(), (req->args? "(obj)":"null"));
    
    // show client identity
    if(req->args) {
        std::string srs_version;
        std::string srs_server_ip;
        int srs_pid = 0;
        int srs_id = 0;
        
        SrsAmf0Any* prop = NULL;
        if ((prop = req->args->ensure_property_string("srs_version")) != NULL) {
            srs_version = prop->to_str();
        }
        if ((prop = req->args->ensure_property_string("srs_server_ip")) != NULL) {
            srs_server_ip = prop->to_str();
        }
        if ((prop = req->args->ensure_property_number("srs_pid")) != NULL) {
            srs_pid = (int)prop->to_number();
        }
        if ((prop = req->args->ensure_property_number("srs_id")) != NULL) {
            srs_id = (int)prop->to_number();
        }
        
        if (srs_pid > 0) {
            srs_trace("edge-srs ip=%s, version=%s, pid=%d, id=%d",
                srs_server_ip.c_str(), srs_version.c_str(), srs_pid, srs_id);
        }
    }
    //Service cycle
    if ((err = service_cycle()) != srs_success) {
        err = srs_error_wrap(err, "service cycle");
    }
    
    srs_error_t r0 = srs_success;
    if ((r0 = on_disconnect()) != srs_success) {
        err = srs_error_wrap(err, "on disconnect %s", srs_error_desc(r0).c_str());
        srs_freep(r0);
    }
    
    // If client is redirect to other servers, we already logged the event.
    if (srs_error_code(err) == ERROR_CONTROL_REDIRECT) {
        srs_error_reset(err);
    }
    
    return err;
}

Keywords: Operation & Maintenance server

Added by mikeatrpi on Fri, 28 Jan 2022 07:29:52 +0200