SRS streaming media server - source code analysis - simple analysis of basic process
preface
- The Department plans to use SRS to build a live broadcast source station, which is my responsibility. Therefore, I take this opportunity to learn about SRS, which is also an opportunity for me to improve. Recently, I also started to build clusters of various modes on my Alibaba cloud server and read some source codes.
- I am a person with poor memory, so I like to summarize and sort out, so it is not so easy to forget, and I am required to do my best.
- Although many contents are described in detail in the wiki on the SRS official website, the source code analysis doesn't seem to be seen much.
- The goal of SRS is to lower the threshold of bass video. My goal is to lower the threshold of SRS learning, so I want to open a series, which is divided into three parts:
- SRS environment construction.
- SRS source code analysis.
- Problems and solutions encountered in the actual combat of SRS.
- I'm still a beginner, so I don't understand enough or say something wrong. Please point out, or I can say any good suggestions. I'll improve and hope to make progress together.
- If the flow chart cannot be enlarged, please refer to the language bird document to enlarge the flow chart 1. SRS basic process
- The first SRS source code analysis will first sort out a general process, and then analyze the contents of RTMP handshake, alliance building and push-pull stream in detail.
catalogue
- main(), domain(), and run_master()
- SrsServer::listen()
- SrsServer::listen_rtmp()
- SrsBufferListener::listen(),SrsTcpListener::listen()
- SrsTcpListener::cycle()
- SrsBufferListener::on_tcp_client()
- SrsServer::accept_client(),SrsServer::fd2conn()
- SrsSTCoroutine::start(),SrsConnection::cycle()
- SrsRtmpConn::do_cycle()
0. Basic flow chart
1. main(), domain() and run_master()
- The file where the main function is located is in main/srs_main_server.cpp.
- Some global variables are defined:
a. _srs_config: global configuration file
b. _srs_log: Global log file - main() calls domain() to execute the process. The following domain() internal function analysis is not the main process, but it is introduced too much
_srs_config->parse_options(argc, argv) //Parsing command line parameters
_srs_config->get_work_dir() //Set working directory and current directory
_srs_config->initialize_cwd()
_srs_log->initialize() //Initialize log
- The point is that the SrsServer object will be created and run
_srs_server = new SrsServer();
run(_srs_server)
- HTTP is also initialized when SrsServer is created_ api_ MUX and http_server
http_api_mux = new SrsHttpServeMux(); // HTTP request multiplexer, not HTTP streaming
http_server = new SrsHttpServer(this); // http service
- run(SrsServer* svr) initializes the server and gets the daemon configuration in_daemon (false by default), if in_ If daemon is false, execute run directly_ master(SrsServer* svr)
svr->initialize(NULL) //initialize server
_srs_config->get_daemon() //Get the daemon and configure it. The default is false. If it is true, srs will fork the child process and let the child process execute run_master
run_master(svr)
- run_ In the master (SrsServer* SVR) function, the server does some initialization work and calls listern to listen to the connection of the client, then calls do_. Cycle function (dead cycle), do some monitoring, update time, cache, etc.
svr->initialize_st() //Initialize st collaboration Library
svr->initialize_signal() //Initialization signal
svr->acquire_pid_file() //Write pid thread to file
svr->listen() //Listen for client requests
svr->register_signal() //Registration signal
svr->http_handle() //Register http processing module
svr->ingest() //Turn on stream collection
svr->cycle() //Message loop processing
- When initializing signal, you need to convert the signal into IO when using state threads, and create a co process to process the signal io.
a. svr->initialize_ Signal() initializes the signal, and the signal inside_ Manager - > initialize() creates a pipe.
b. svr->register_ Signal() to register the signal_ Manager - > start() registers the signal and starts the signal listening process.
c. Srsignalmanager:: cycle() performs the final listening, circularly performs the listening operation, and calls SrsServer::on_signal()
d. SrsServer::on_signal() will detect each signal. If the signal occurs, set the corresponding bool variable to true
e. Finally, in srsserver:: do_ Check the signal in cycle () and process it - Focus on SrsServer::listen()
2. SrsServer::listen()
- SrsServer::listen() will listen to rtmp/http and other client requests.
listen_rtmp() //Monitor rtmp
listen_http_api() //Listen http
listen_http_stream() //Listen to HTTP stream
listen_stream_caster() //Listen to the conversion stream
conn_manager->start() //Start the connection management process
- First analyze the rtmp push-pull flow process, so focus on SrsServer::listen_rtmp(), which will be introduced later, such as listen_http_stream().
3. SrsServer::listen_rtmp()
- SrsServer::listen_ The rtmp() function is as follows
srs_error_t SrsServer::listen_rtmp()
{
srs_error_t err = srs_success;
// stream service port. Get the list of ports to listen in the configuration file
std::vector<std::string> ip_ports = _srs_config->get_listens();
srs_assert((int)ip_ports.size() > 0);
//Close the listener of type SrsListenerRtmpStream and delete the listening object from the listeners manager.
close_listeners(SrsListenerRtmpStream);
for (int i = 0; i < (int)ip_ports.size(); i++) { //Traversal ip_ports list. The pointer listener of the parent class SrsListener points to the object of the newly constructed subclass SrsBufferListener
SrsListener* listener = new SrsBufferListener(this, SrsListenerRtmpStream);
listeners.push_back(listener); //Add to the list of listeners Manager
int port; string ip; //Split ip address (if any) and port
srs_parse_endpoint(ip_ports[i], ip, port);
//Polymorphism: call the member function listen of subclass SrsBufferListener
if ((err = listener->listen(ip, port)) != srs_success) {
srs_error_wrap(err, "rtmp listen %s:%d", ip.c_str(), port);
}
}
return err;
}
- The types of monitoring are:
// The listener type for server to identify the connection,
// that is, use different type to process the connection.
enum SrsListenerType
{
// RTMP client,
SrsListenerRtmpStream = 0,
// HTTP api,
SrsListenerHttpApi = 1,
// HTTP stream, HDS/HLS/DASH
SrsListenerHttpStream = 2,
// UDP stream, MPEG-TS over udp.
SrsListenerMpegTsOverUdp = 3,
// TCP stream, RTSP stream.
SrsListenerRtsp = 4,
// TCP stream, FLV stream over HTTP.
SrsListenerFlv = 5,
};
- close_listeners is to remove elements of type from listeners.
void SrsServer::close_listeners(SrsListenerType type)
{
std::vector<SrsListener*>::iterator it;
for (it = listeners.begin(); it != listeners.end();) {
SrsListener* listener = *it;
if (listener->listen_type() != type) { //Different type s, continue
++it;
continue;
}
srs_freep(listener);
it = listeners.erase(it); //Remove from listeners (vector)
}
}
4. SrsBufferListener::listen(),SrsTcpListener::listen()
- listern_rtmp() calls listen() of SrsBufferListener to listen
srs_error_t SrsBufferListener::listen(string i, int p) //i default to "0.0.0.0"
{
srs_error_t err = srs_success;
ip = i;
port = p;
srs_freep(listener);
listener = new SrsTcpListener(this, ip, port); //Create TCP listener
if ((err = listener->listen()) != srs_success) { //Enter monitoring
return srs_error_wrap(err, "buffered tcp listen");
}
string v = srs_listener_type2string(type);
srs_trace("%s listen at tcp://%s:%d, fd=%d", v.c_str(), ip.c_str(), port, listener->fd());
return err;
}
- The listener will call SrsTcpListener::listen()
srs_error_t SrsTcpListener::listen()
{
srs_error_t err = srs_success;
if ((err = srs_tcp_listen(ip, port, &lfd)) != srs_success) { //Create a listening fd and register the fd with the st library
return srs_error_wrap(err, "listen at %s:%d", ip.c_str(), port);
}
srs_freep(trd);
trd = new SrsSTCoroutine("tcp", this); //Create a collaboration
if ((err = trd->start()) != srs_success) { //Start the collaboration process and enter SrsSTCoroutine::cycle()
return srs_error_wrap(err, "start coroutine");
}
return err;
}
- The SrsTcpListener class actually listens, creates the monitored fd through socket - > bind - > Listen (completed in the srs_tcp_listen function), and registers the fd to the st library. After that, the events on the fd are monitored and processed by the st library.
- Create a tcp coroutine to handle the connection. The coroutine starts and enters the SrsSTCoroutine::cycle() function.
a. The cycle() function is used to handle client connections. - Some functions are noted as follows:
srs_tcp_listen(ip, port, &lfd)
fd = socket(r->ai_family, r->ai_socktype, r->ai_protocol) //Create socket
do_srs_tcp_listen(fd, r, pfd)
srs_fd_closeexec(fd) //Set FD_CLOEXEC
srs_fd_reuseaddr(fd) //Reuse fd
bind(fd, r->ai_addr, r->ai_addrlen) //Binding service ip and port
::listen(fd, SERVER_LISTEN_BACKLOG) //In_ Enable listening on fd
(*pfd = srs_netfd_open_socket(fd)) //Register fd with the st library, and then all requests of fd will be handled by the library
trd = new SrsSTCoroutine("tcp", this) //Create a collaboration
trd->start() //Start the collaboration process and enter SrsSTCoroutine::cycle()
5. SrsTcpListener::cycle()
- Srstcoroutine:: cycle() will finally call SrsTcpListener::cycle()
- SrsTcpListener::cycle() listens to the process. After accepting the connection request, the process passes the execution logic to the BufferListener for processing.
srs_error_t SrsTcpListener::cycle()
{
srs_error_t err = srs_success;
while (true) {
if ((err = trd->pull()) != srs_success) {
return srs_error_wrap(err, "tcp listener");
}
//Receive connection
srs_netfd_t fd = srs_accept(lfd, NULL, NULL, SRS_UTIME_NO_TIMEOUT);
if(fd == NULL){
return srs_error_new(ERROR_SOCKET_ACCEPT, "accept at fd=%d", srs_netfd_fileno(lfd));
}
if ((err = srs_fd_closeexec(srs_netfd_fileno(fd))) != srs_success) {
return srs_error_wrap(err, "set closeexec");
}
//This listening process only processes connection requests, and the specific execution logic is handed over to the BufferListener for processing
if ((err = handler->on_tcp_client(fd)) != srs_success) {
return srs_error_wrap(err, "handle fd=%d", srs_netfd_fileno(fd));
}
}
return err;
}
6. SrsBufferListener::on_tcp_client()
- SrsBufferListener::on_ tcp_ The client () code is as follows:
srs_error_t SrsBufferListener::on_tcp_client(srs_netfd_t stfd)
{
srs_error_t err = server->accept_client(type, stfd); //Submit to SrsServer for processing
if (err != srs_success) {
srs_warn("accept client failed, err is %s", srs_error_desc(err).c_str());
srs_freep(err);
}
return srs_success;
}
- Finally, the accept of SrsServer is called_ Client processing
7. SrsServer::accept_client(),SrsServer::fd2conn()
- SrsServer::accept_ The client () code is as follows:
a. First get the SrsConnection of the connection according to the type
b. Add SrsConnection to conns, which stores all connections
c. Open a connection process for each SrsConnection
srs_error_t SrsServer::accept_client(SrsListenerType type, srs_netfd_t stfd)
{
srs_error_t err = srs_success;
SrsConnection* conn = NULL;
if ((err = fd2conn(type, stfd, &conn)) != srs_success) { //Get the SrsConnection of the connection according to the type
if (srs_error_code(err) == ERROR_SOCKET_GET_PEER_IP && _srs_config->empty_ip_ok()) {
srs_close_stfd(stfd); srs_error_reset(err);
return srs_success;
}
return srs_error_wrap(err, "fd2conn");
}
srs_assert(conn);
// directly enqueue, the cycle thread will remove the client.
conns.push_back(conn); // Join conns, which stores all connections
// cycle will start process thread and when finished remove the client.
// @remark never use the conn, for it maybe destroyed.
if ((err = conn->start()) != srs_success) { //Each connection opens a connection process
return srs_error_wrap(err, "start conn coroutine");
}
return err;
}
- Get the main code of the connection:
a. Because now the type is SrsListenerRtmpStream, and all Conns return srsrrtmp conn.
srs_error_t SrsServer::fd2conn(SrsListenerType type, srs_netfd_t stfd, SrsConnection** pconn)
{
srs_error_t err = srs_success;
int fd = srs_netfd_fileno(stfd);
string ip = srs_get_peer_ip(fd);
// for some keep alive application, for example, the keepalived,
// will send some tcp packet which we cann't got the ip,
// we just ignore it.
if (ip.empty()) { //Ignore if ip cannot be obtained
return srs_error_new(ERROR_SOCKET_GET_PEER_IP, "ignore empty ip, fd=%d", fd);
}
// check connection limitation.
int max_connections = _srs_config->get_max_connections(); //Get the maximum number of connections
if (handler && (err = handler->on_accept_client(max_connections, (int)conns.size())) != srs_success) {
return srs_error_wrap(err, "drop client fd=%d, max=%d, cur=%d for err: %s",
fd, max_connections, (int)conns.size(), srs_error_desc(err).c_str());
}
if ((int)conns.size() >= max_connections) { //If the connection limit is exceeded, the connection will be rejected directly
return srs_error_new(ERROR_EXCEED_CONNECTIONS,
"drop fd=%d, max=%d, cur=%d for exceed connection limits",
fd, max_connections, (int)conns.size());
}
// avoid fd leak when fork.
// @see https://github.com/ossrs/srs/issues/518
if (true) {
int val;
if ((val = fcntl(fd, F_GETFD, 0)) < 0) {
return srs_error_new(ERROR_SYSTEM_PID_GET_FILE_INFO, "fnctl F_GETFD error! fd=%d", fd);
}
val |= FD_CLOEXEC;
if (fcntl(fd, F_SETFD, val) < 0) {
return srs_error_new(ERROR_SYSTEM_PID_SET_FILE_INFO, "fcntl F_SETFD error! fd=%d", fd);
}
}
if (type == SrsListenerRtmpStream) {
*pconn = new SrsRtmpConn(this, stfd, ip); //Create RTMP connection
} else if (type == SrsListenerHttpApi) {
*pconn = new SrsHttpApi(this, stfd, http_api_mux, ip);
} else if (type == SrsListenerHttpStream) {
*pconn = new SrsResponseOnlyHttpConn(this, stfd, http_server, ip);
} else {
srs_warn("close for no service handler. fd=%d, ip=%s", fd, ip.c_str());
srs_close_stfd(stfd);
return err;
}
return err;
}
8. SrsSTCoroutine::start(),SrsConnection::cycle()
- SrsConnection::start() code is as follows:
srs_error_t SrsConnection::start()
{
srs_error_t err = srs_success;
if ((err = skt->initialize(stfd)) != srs_success) {
return srs_error_wrap(err, "init socket");
}
//Start the conn process, and finally execute to SrsConnection::cycle()
if ((err = trd->start()) != srs_success) {
return srs_error_wrap(err, "coroutine");
}
return err;
}
- The SrsConnection::cycle() code is as follows:
srs_error_t SrsConnection::cycle()
{
srs_error_t err = do_cycle(); //SrsRtmpConn::do_cycle
// Notify manager to remove it.
manager->remove(this);
// success.
if (err == srs_success) {
srs_trace("client finished.");
return err;
}
// client close peer.
// TODO: FIXME: Only reset the error when client closed it.
if (srs_is_client_gracefully_close(err)) {
srs_warn("client disconnect peer. ret=%d", srs_error_code(err));
} else if (srs_is_server_gracefully_close(err)) {
srs_warn("server disconnect. ret=%d", srs_error_code(err));
} else {
srs_error("serve error %s", srs_error_desc(err).c_str());
}
srs_freep(err);
return srs_success;
}
9. SrsRtmpConn::do_cycle()
- If there is a streaming event, it will enter srsrrtmpconn:: do_ Cycle(), which is responsible for the specific execution of RTMP processes, including handshaking, receiving connect requests, sending response connect responses, and receiving audio and video stream data.
srs_error_t SrsRtmpConn::do_cycle()
{
srs_error_t err = srs_success;
srs_trace("RTMP client ip=%s, fd=%d", ip.c_str(), srs_netfd_fileno(stfd));
rtmp->set_recv_timeout(SRS_CONSTS_RTMP_TIMEOUT); //Set receive timeout
rtmp->set_send_timeout(SRS_CONSTS_RTMP_TIMEOUT); //Set send timeout
if ((err = rtmp->handshake()) != srs_success) { //rtmp handshake
return srs_error_wrap(err, "rtmp handshake");
}
uint32_t rip = rtmp->proxy_real_ip();
if (rip > 0) {
srs_trace("RTMP proxy real client ip=%d.%d.%d.%d",
uint8_t(rip>>24), uint8_t(rip>>16), uint8_t(rip>>8), uint8_t(rip));
}
SrsRequest* req = info->req;
if ((err = rtmp->connect_app(req)) != srs_success) { //After the handshake is successful, srs will receive and parse the RTMP message connect sent by the client
return srs_error_wrap(err, "rtmp connect tcUrl");
}
// set client ip to request.
req->ip = ip;
srs_trace("connect app, tcUrl=%s, pageUrl=%s, swfUrl=%s, schema=%s, vhost=%s, port=%d, app=%s, args=%s",
req->tcUrl.c_str(), req->pageUrl.c_str(), req->swfUrl.c_str(),
req->schema.c_str(), req->vhost.c_str(), req->port,
req->app.c_str(), (req->args? "(obj)":"null"));
// show client identity
if(req->args) {
std::string srs_version;
std::string srs_server_ip;
int srs_pid = 0;
int srs_id = 0;
SrsAmf0Any* prop = NULL;
if ((prop = req->args->ensure_property_string("srs_version")) != NULL) {
srs_version = prop->to_str();
}
if ((prop = req->args->ensure_property_string("srs_server_ip")) != NULL) {
srs_server_ip = prop->to_str();
}
if ((prop = req->args->ensure_property_number("srs_pid")) != NULL) {
srs_pid = (int)prop->to_number();
}
if ((prop = req->args->ensure_property_number("srs_id")) != NULL) {
srs_id = (int)prop->to_number();
}
if (srs_pid > 0) {
srs_trace("edge-srs ip=%s, version=%s, pid=%d, id=%d",
srs_server_ip.c_str(), srs_version.c_str(), srs_pid, srs_id);
}
}
//Service cycle
if ((err = service_cycle()) != srs_success) {
err = srs_error_wrap(err, "service cycle");
}
srs_error_t r0 = srs_success;
if ((r0 = on_disconnect()) != srs_success) {
err = srs_error_wrap(err, "on disconnect %s", srs_error_desc(r0).c_str());
srs_freep(r0);
}
// If client is redirect to other servers, we already logged the event.
if (srs_error_code(err) == ERROR_CONTROL_REDIRECT) {
srs_error_reset(err);
}
return err;
}