Explanation
Because the code is not yet complete, the full source is not posted here; if you need it, email me and I will send it to you. The most important parts, however, are included below. The picture shows the pipeline: receiving video from an RTSP server, playing it, analysing it, and forwarding it as FLV.
Once the framework is complete, there are two options for the analysis stage: call C++ OpenCV directly, or call into Python. Which one to keep will be decided when the project matures.
RTSP decoding
live555 is used to receive the stream in a dedicated thread:
class c_rtspthread : public c_thread
{
    int v_headlen = 0;
    c_rtsp *v_rtsp = nullptr;
    // 32-bit hash value, e.g. hash("live/1001")
    uint32_t v_key = 0;
    uint32_t _recv_stamp = 0;
    uint32_t _first_stamp = 0;
    sp_buffer _spbuffer;
    c_flvserver *v_flv;
    std::string v_livename; // live/1001
private:
    // members used for decoding
    AVCodec *v_codec = NULL;
    AVCodecContext *v_codecctx = NULL;
    AVFrame *v_frame = NULL;
    c_analyse *v_analyse = NULL;
    int do_decode_init(const char *name, const char *codec);
    int do_decode_unit();
    int width()
    {
        if (v_codecctx != NULL)
            return v_codecctx->width;
        return 0;
    }
    int height()
    {
        if (v_codecctx != NULL)
            return v_codecctx->height;
        return 0;
    }
    int v_width = 0;
    int v_height = 0;
    int v_fps = 0;
    int v_towidth = 0;
    int v_toheight = 0;
    int decode2YUV(uint8_t *src, int srcLen, uint8_t *destYuv, int destw, int desth);
    void decode2RGB(uint8_t *src, int &srcLen);
    struct SwsContext *_img_convert_ctx = NULL;
public:
    void init_start(c_flvserver *flv, const char *url, const char *livename,
                    int towidth, int toheight, uint32_t key);
    int callback(const char *flag, uint8_t *data, long size, uint32_t ts);
    // override the stop function
    void Stop();
    // handles disconnection and reconnection
    void Run();
};
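For orientation, here is a hedged sketch of how such a thread might be started. The c_flvserver setup and the camera URL are assumptions, but init_start()'s signature is the one declared above, and hash_add()/HASH_PRIME_MIDDLE appear later in the websocket server:

// hypothetical wiring; the hash key must match what the flv/ws side computes
c_flvserver flv;
c_rtspthread rtsp;
uint32_t key = hash_add("live/1001", HASH_PRIME_MIDDLE);
rtsp.init_start(&flv, "rtsp://192.168.1.64:554/h264", "live/1001",
                640, 360, key);
// ...Run() keeps receiving/decoding/forwarding and reconnects on errors...
rtsp.Stop();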
Analysis using OpenCV
To work with the OpenCV that everyone already uses, OpenCV is called directly here. Calling Python instead would require bridging FFmpeg's AVFrame with OpenCV's cv::Mat, and the key lines are the following:
AVFrame *dframe = av_frame_alloc();
cv::Mat nmat;
nmat.create(cv::Size(w, h), CV_8UC3);
// point the AVFrame's data pointers at the Mat's buffer (no copy);
// the align argument (16) assumes w*3 is a multiple of 16, otherwise the
// computed linesize will not match the Mat's packed rows -- use 1 in that case
av_image_fill_arrays(dframe->data, dframe->linesize, nmat.data,
                     AV_PIX_FMT_BGR24, w, h, 16);
During decoding, the cv::Mat is bound directly to the AVFrame's memory, so nothing is copied around. OpenCV's Mat is normally in BGR order. If a gray image is needed, decode to YUV and take the Y component directly.
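As a hedged illustration of that last point (assuming the decoder outputs a planar YUV format such as AV_PIX_FMT_YUV420P), the Y plane of the decoded frame can itself be wrapped as a single-channel Mat without copying:

// wrap the luma plane of a decoded AVFrame as a gray cv::Mat (no copy);
// linesize[0] can exceed the visible width, so pass it as the row step
cv::Mat gray(frame->height, frame->width, CV_8UC1,
             frame->data[0], frame->linesize[0]);
// the Mat is only valid while the AVFrame buffer lives; clone() to keep it
cv::Mat owned = gray.clone();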
int c_rtspthread::decode2YUV(uint8_t *src, int srcLen, uint8_t *destYuv, int destw, int desth)
{
    AVPacket pkt;
    av_init_packet(&pkt);
    pkt.data = src;
    pkt.size = srcLen;
    int ret = avcodec_send_packet(v_codecctx, &pkt);
    av_packet_unref(&pkt);
    if (ret < 0) {
        fprintf(stderr, "Error sending a packet for decoding\n");
        return -1;
    }
    // fixme (qianbo): one packet may yield more than one frame
    while (ret >= 0) {
        AVFrame *frame = av_frame_alloc();
        ret = avcodec_receive_frame(v_codecctx, frame);
        if (ret == AVERROR(EAGAIN) || ret == AVERROR_EOF) {
            av_frame_free(&frame);
            return 0;
        } else if (ret < 0) {
            av_frame_free(&frame);
            return 0;
        }
        if (v_analyse != NULL)
            v_analyse->pushdata2(frame); // pushdata2() is assumed to take ownership of frame
    }
    return -1;
}

void c_rtspthread::decode2RGB(uint8_t *src, int &srcLen)
{
    AVPacket pkt;
    av_init_packet(&pkt);
    pkt.data = src;
    pkt.size = srcLen;
    int ret = avcodec_send_packet(v_codecctx, &pkt);
    av_packet_unref(&pkt);
    if (ret < 0) {
        fprintf(stderr, "Error sending a packet for decoding\n");
        return;
    }
    while (ret >= 0) {
        ret = avcodec_receive_frame(v_codecctx, v_frame);
        if (ret == AVERROR(EAGAIN) || ret == AVERROR_EOF)
            break;
        else if (ret < 0)
            break;
        int w = v_towidth;
        int h = v_toheight;
        if (_img_convert_ctx == NULL) {
            _img_convert_ctx = sws_getContext(v_frame->width, v_frame->height,
                                              v_codecctx->pix_fmt,
                                              w, h, AV_PIX_FMT_BGR24,
                                              SWS_BICUBIC, NULL, NULL, NULL);
        }
        AVFrame *dframe = av_frame_alloc();
        cv::Mat nmat;
        nmat.create(cv::Size(w, h), CV_8UC3);
        // bind the AVFrame to the Mat's buffer, then scale/convert into it
        av_image_fill_arrays(dframe->data, dframe->linesize, nmat.data,
                             AV_PIX_FMT_BGR24, w, h, 16);
        sws_scale(_img_convert_ctx, v_frame->data, v_frame->linesize, 0,
                  v_frame->height, dframe->data, dframe->linesize);
        if (v_analyse != NULL)
            v_analyse->pushdata(nmat);
        av_frame_free(&dframe);
    }
}
Forwarding FLV
This part can be done in two ways: push the stream to an existing flvserver, or have this process act as the flvserver itself. In terms of efficiency, acting as the flvserver directly is the preferred option. First, build an httpserver with Boost.Asio coroutines, because the websocketserver is built on top of the httpserver.
class c_http_session : public std::enable_shared_from_this<c_http_session>
{
public:
    uint32_t v_key = 0;
    uint32_t v_start_ts = 0;
    tcp::socket v_socket;
    int v_has_send_meta = 0;
    int v_has_send_video = 0;
    int v_has_send_audio = 0;
    int v_has_sent_key_frame = 0;
    asio::strand<asio::io_context::executor_type> v_strand;

    void close()
    {
        if (v_socket.is_open())
            v_socket.close();
    }

public:
    bool func_hand_shake(boost::asio::yield_context &yield)
    {
        return false; // to be implemented
    }

    void go()
    {
        auto self(shared_from_this());
        boost::asio::spawn(v_strand, [this, self](boost::asio::yield_context yield) {
            if (func_hand_shake(yield) == false) {
                std::cout << "no handshake" << std::endl;
                return;
            }
            for (;;) {
                // message loop to be implemented
            }
        });
    }
};
The class above is not actually used yet, because the httpserver still has to be integrated after the websocket server was written. Apart from the initial header, httpserver traffic is actually smaller than websocket traffic: the websocket server must prepend the size of each frame of data for the peer, which solves TCP's sticky-packet problem, but the FLV tag header already carries this data length, so HTTP-FLV can send the data directly.
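To make that concrete, here is a minimal, hedged sketch of the fixed bytes an HTTP-FLV server sends to a connecting player before streaming tags; the exact response header fields are an assumption, not this project's code:

// plain HTTP response; the body is the live FLV stream, terminated by
// closing the connection (header fields here are assumptions)
static const char http_response[] =
    "HTTP/1.1 200 OK\r\n"
    "Content-Type: video/x-flv\r\n"
    "Connection: close\r\n"
    "\r\n";

// FLV file header: 'F' 'L' 'V', version 1, flags 0x05 (audio + video),
// header size 9, followed by the first PreviousTagSize (always 0)
static const uint8_t flv_header[13] = {
    'F', 'L', 'V', 0x01, 0x05,
    0x00, 0x00, 0x00, 0x09,
    0x00, 0x00, 0x00, 0x00
};
// then: [11-byte tag header | tag data | 4-byte PreviousTagSize] repeated;
// no extra framing is needed because DataSize sits in tag header bytes 1..3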
**According to RFC 6455**, once the principle and the header layout are clear, a concise websocket server is easy to write. Note that the data sent by the browser is masked, so it must be unmasked once on the server. For example, RFC 6455's sample key "dGhlIHNhbXBsZSBub25jZQ==" must produce the accept value "s3pPLMBiTxaQ9kYGzzhZRbK+xOo=". Since the whole process interacts with the browser, debugging is convenient; a small piece of JavaScript for debugging follows.
The following HTML page receives the images returned by the server, so it can serve as a debugging tool.
<!DOCTYPE HTML>
<html>
<head>
<meta charset="utf-8">
<title></title>
</head>
<body>
<div id="imgDiv"></div>
<div id="sse">
    <a href="javascript:WebSocketTest()">function WebSocket</a>
</div>
<script type="text/javascript">
    var canvas, content, img;
    function init() {
        canvas = document.createElement('canvas');
        content = canvas.getContext('2d');
        canvas.width = 320;
        canvas.height = 240;
        content.scale(1, -1);
        content.translate(0, -240);
        document.body.appendChild(canvas);
        img = new Image();
        img.src = "bg1.jpg";
        canvas.style.position = 'absolute';
        img.onload = function () {
            content.drawImage(img, 0, 0, canvas.width, canvas.height);
        };
    }
    init();
    function WebSocketTest() {
        if ("WebSocket" in window) {
            // open a web socket
            var ws = new WebSocket("ws://127.0.0.1:9000/live/image");
            console.log(ws);
            ws.onopen = function (evt) {
                console.log("connected");
            };
            ws.onmessage = function (evt) {
                if (typeof (evt.data) == "string") {
                    // text messages are ignored here
                } else {
                    var reader = new FileReader();
                    reader.onload = function (evt) {
                        if (evt.target.readyState == FileReader.DONE) {
                            var url = evt.target.result;
                            // setting img.src re-triggers img.onload,
                            // which draws the received image onto the canvas
                            img.src = url;
                        }
                    }
                    reader.readAsDataURL(evt.data);
                }
            };
            ws.onclose = function () {
                alert("Connection closed...");
            };
        } else {
            alert("Your browser does not support WebSocket!");
        }
    }
</script>
</body>
</html>
Below is the code of the websocket server. It is fairly simple; since this is a first version, not every case is handled, and readers need to add their own error handling. The author is working on serving http and websocket on one port and handling errors properly.
class c_ws_session : public std::enable_shared_from_this<c_ws_session>
{
private:
    void SetSendBufferSize(int nSize)
    {
        boost::asio::socket_base::send_buffer_size size_option(nSize);
        v_socket.set_option(size_option);
    }

public:
    // the hash key is enough; no need to store the app/stream string
    uint32_t v_key = 0;
    // timestamp record, different for every session
    uint32_t v_start_ts = 0;

public:
    explicit c_ws_session(boost::asio::io_context &io_context, tcp::socket socket)
        : v_socket(std::move(socket)), v_strand(io_context.get_executor())
    {
        SetSendBufferSize(1 * 1024 * 1024);
    }

    /*
    The handshake from the client looks as follows (RFC 6455):
        GET /chat HTTP/1.1
        Host: server.example.com
        Upgrade: websocket
        Connection: Upgrade
        Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==
        Origin: http://example.com
        Sec-WebSocket-Protocol: chat, superchat
        Sec-WebSocket-Version: 13
    */
    bool func_hand_shake(boost::asio::yield_context &yield)
    {
        DEFINE_EC
        asio::streambuf content_;
        size_t length = asio::async_read_until(v_socket, content_, "\r\n\r\n", yield[ec]);
        ERROR_RETURN_FALSE
        asio::streambuf::const_buffers_type bufs = content_.data();
        std::string lines(asio::buffers_begin(bufs), asio::buffers_begin(bufs) + length);
        // the url length must not exceed 1024
        char buf[1024];
        fetch_head_get(lines.c_str(), buf, 1023);
        cout << "get:" << buf << endl; // like this --> live/1001
        std::string response, key, encrypted_key;
        // find the Sec-WebSocket-Key
        size_t pos = lines.find("Sec-WebSocket-Key");
        if (pos == lines.npos)
            return false;
        size_t end = lines.find("\r\n", pos);
        key = lines.substr(pos + 19, end - pos - 19) + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
        // SHA-1 the key, then base64-encode the 20-byte digest
        boost::uuids::detail::sha1 sha1;
        sha1.process_bytes(key.c_str(), key.size());
        unsigned int digest[5];
        sha1.get_digest(digest);
        for (int i = 0; i < 5; i++)
            digest[i] = htonl(digest[i]);
        encrypted_key = base64_encode(reinterpret_cast<const uint8_t *>(&digest[0]), 20);
        /*
        The handshake from the server looks as follows:
            HTTP/1.1 101 Switching Protocols
            Upgrade: websocket
            Connection: Upgrade
            Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
        */
        response.append("HTTP/1.1 101 WebSocket Protocol Handshake\r\n");
        response.append("Upgrade: websocket\r\n");
        response.append("Connection: Upgrade\r\n");
        response.append("Sec-WebSocket-Accept: " + encrypted_key + "\r\n\r\n");
        size_t ret = boost::asio::async_write(v_socket, boost::asio::buffer(response), yield[ec]);
        ERROR_RETURN_FALSE
        // calculate the hash key and register this session
        v_key = hash_add(buf, HASH_PRIME_MIDDLE);
        c_flvhubs::instance()->push_session(v_key, shared_from_this());
        return true;
    }

    bool func_set_head_send(uint8_t *frame, int len /*payload len*/, int framelen,
                            asio::yield_context &yield)
    {
        *frame = 0x81; // 1000 0001: FIN + text frame (0x82 for a binary frame)
        if (len <= 125) {
            // payload length fits in the low 7 bits; mask bit is 0
            *(frame + 1) = (uint8_t)len;
        } else if (len <= 0xFFFF) {
            // 2-byte extended payload length, network byte order
            *(frame + 1) = 126;
            *(frame + 2) = (len & 0x0000FF00) >> 8;
            *(frame + 3) = len & 0x000000FF;
        } else {
            // 8-byte extended payload length, network byte order
            *(frame + 1) = 127;
            *(frame + 2) = 0;
            *(frame + 3) = 0;
            *(frame + 4) = 0;
            *(frame + 5) = 0;
            *(frame + 6) = (len & 0xFF000000) >> 24;
            *(frame + 7) = (len & 0x00FF0000) >> 16;
            *(frame + 8) = (len & 0x0000FF00) >> 8;
            *(frame + 9) = len & 0x000000FF;
        }
        DEFINE_EC
        // send the data
        asio::async_write(v_socket, asio::buffer(frame, framelen), yield[ec]);
        if (ec)
            return false;
        return true;
    }

    bool func_recv_message(asio::yield_context &yield)
    {
        /* RFC 6455 base framing:
         0                   1                   2                   3
         0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
        +-+-+-+-+-------+-+-------------+-------------------------------+
        |F|R|R|R| opcode|M| Payload len |    Extended payload length    |
        |I|S|S|S|  (4)  |A|     (7)     |             (16/64)           |
        |N|V|V|V|       |S|             |   (if payload len==126/127)   |
        | |1|2|3|       |K|             |                               |
        +-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - +
        |     Extended payload length continued, if payload len == 127  |
        + - - - - - - - - - - - - - - - +-------------------------------+
        |                               |Masking-key, if MASK set to 1  |
        +-------------------------------+-------------------------------+
        | Masking-key (continued)       |          Payload Data         |
        +-------------------------------- - - - - - - - - - - - - - - - +
        :                     Payload Data continued ...                :
        + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +
        |                     Payload Data continued ...                |
        +---------------------------------------------------------------+
        */
        DEFINE_EC
        unsigned char code[2];
        size_t n = asio::async_read(v_socket, asio::buffer(&code[0], sizeof(code)), yield[ec]);
        ERROR_RETURN_FALSE
        unsigned char fin = code[0] >> 7;      // 1: last fragment of the message
        unsigned char opcode = code[0] & 0x0f; // low four bits
        if (fin == char(0x00)) {
            // fragmented messages are not handled here
            return false;
        }
        switch (opcode) {
        case 0x01: // a text frame
        case 0x02: // a binary frame
            break;
        case 0x08: // 0x8 denotes connection close (RFC 6455)
            std::cout << "connection close" << std::endl;
            return false;
        case 0x09: // a ping; the server can also send pings to keep the link alive
            std::cout << "a ping came" << std::endl;
            return true;
        case 0x0A: // a pong
            std::cout << "a pong came" << std::endl;
            return true;
        default:
            return false;
        }
        en_data_type type = (en_data_type)opcode;
        unsigned char is_mask = code[1] >> 7;
        // payload len x: 0~125 -> the payload is x bytes;
        // 126 -> the next 2 bytes are an unsigned 16-bit payload length;
        // 127 -> the next 8 bytes are an unsigned 64-bit payload length (high bit 0).
        // qianbo: reserve room in front of the payload so the whole frame can be
        // sent back without copying
        int reserved_len = 1;
        uint64_t payloadlen = code[1] & 0x7F;
        if (payloadlen == 0x7E) { // 0111 1110
            uint16_t len;
            asio::async_read(v_socket, asio::buffer(&len, sizeof(len)), yield[ec]);
            ERROR_RETURN_FALSE
            payloadlen = ntohs(len);
            reserved_len += 3;
        } else if (payloadlen == 0x7F) { // 0111 1111
            uint64_t len;
            asio::async_read(v_socket, asio::buffer(&len, sizeof(len)), yield[ec]);
            ERROR_RETURN_FALSE
            payloadlen = ntohll_1(len);
            reserved_len += 9;
        } else { // qianbo: < 126 bytes
            reserved_len += 1;
        }
        // read the masking key if present
        char mask[4];
        if (is_mask) {
            asio::async_read(v_socket, asio::buffer(mask, 4), yield[ec]);
            ERROR_RETURN_FALSE
        }
        if (payloadlen > 0) {
            // data length + ws head length
            size_t frame_len = (size_t)(payloadlen + reserved_len);
            uint8_t *frame = new uint8_t[frame_len];
            uint8_t *data = frame + reserved_len;
            asio::async_read(v_socket, asio::buffer(data, payloadlen), yield[ec]);
            ERROR_RETURN_FALSE
            if (is_mask) {
                // unmask to get the real data
                for (uint64_t i = 0; i < payloadlen; i++)
                    data[i] = data[i] ^ mask[i % 4];
            }
            if (v_cb != NULL)
                v_cb(type, frame, data, frame_len);
            // echo back to the session if desired:
            // func_set_head_send(frame, payloadlen, frame_len, yield);
            delete[] frame;
        }
        return true;
    }

    void go()
    {
        auto self(shared_from_this());
        boost::asio::spawn(v_strand, [this, self](boost::asio::yield_context yield) {
            if (func_hand_shake(yield) == false) {
                std::cout << "no handshake" << std::endl;
                return;
            }
            for (;;) {
                bool ret = func_recv_message(yield);
                if (!ret) {
                    close();
                    break;
                }
            }
        });
    }

    void func_setcb(cb_wsdata cb, int len)
    {
        v_cb = cb;
    }

protected:
    asio::io_context v_ioctx;
    asio::strand<boost::asio::io_context::executor_type> v_strand;
    cb_wsdata v_cb = NULL;
    std::string v_get; // like /live/1001

public:
    tcp::socket v_socket;
    int v_has_send_meta = 0;
    int v_has_send_video = 0;
    int v_has_send_audio = 0;
    int v_has_sent_key_frame = 0;

    void close()
    {
        if (v_socket.is_open())
            v_socket.close();
    }
};
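Two pieces this class depends on are not shown in the excerpt. First, ntohll_1() must turn the 64-bit extended payload length from network byte order into a host value; a portable sketch (the project's real version may differ):

// assumed helper: 64-bit network-to-host conversion (there is no standard
// ntohll); reassembles from bytes, so it works on any host endianness
static inline uint64_t ntohll_1(uint64_t v)
{
    const uint8_t *p = reinterpret_cast<const uint8_t *>(&v); // bytes in wire order
    uint64_t r = 0;
    for (int i = 0; i < 8; i++)
        r = (r << 8) | p[i];
    return r;
}

Second, a hypothetical accept loop that creates one session per connection, with the port assumed to match the debug page above:

// hypothetical acceptor; spawns one coroutine per incoming connection
boost::asio::io_context ioc;
boost::asio::spawn(ioc, [&](boost::asio::yield_context yield) {
    tcp::acceptor acceptor(ioc, tcp::endpoint(tcp::v4(), 9000));
    for (;;) {
        boost::system::error_code ec;
        tcp::socket socket(ioc);
        acceptor.async_accept(socket, yield[ec]);
        if (!ec)
            std::make_shared<c_ws_session>(ioc, std::move(socket))->go();
    }
});
ioc.run();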
Secondly, besides receiving RTSP, the server can also receive canvas data sent directly by the browser: receive it over websocket, then run the analysis on the encoded file or H.264 data. The following HTML sends a canvas stream to the websocket server.
<!doctype html>
<html>
<head>
<meta charset="utf-8">
<script src="js/canvas.js"></script>
<script>
    document.addEventListener('DOMContentLoaded', () => {
        // note: the selector must match the button's data-action attribute below
        document.querySelector('[data-action="goLive"]').addEventListener('click', (e) => {
            let mediaRecorder;
            let mediaStream;
            var addr = "ws://127.0.0.1:3000/1001";
            const ws = new WebSocket(addr);
            ws.addEventListener('open', (e) => {
                console.log('WebSocket Open', e);
                mediaStream = document.querySelector('canvas').captureStream(20); // 20 FPS
                mediaRecorder = new MediaRecorder(mediaStream, {
                    mimeType: 'video/webm;codecs=h264',
                    videoBitsPerSecond: 500000
                });
                mediaRecorder.addEventListener('dataavailable', (e) => {
                    console.log(e.data);
                    ws.send(e.data);
                });
                mediaRecorder.addEventListener('stop', ws.close.bind(ws));
                mediaRecorder.start(500); // start recording, deliver data every 500 ms
            });
            ws.addEventListener('close', (e) => {
                console.log('WebSocket Close', e);
                mediaRecorder.stop();
            });
        });
    });
</script>
</head>
<body>
<canvas width="640" height="360"></canvas>
<nav>
    <button data-action="goLive">transmission</button>
</nav>
</body>
</html>
The websocket server can thus receive PNG or H.264 data as well and continue with the same pipeline as RTSP.
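A hedged sketch of wiring this up: cb_wsdata's exact typedef is not shown in the excerpt, so the signature below is inferred from the call site v_cb(type, frame, data, frame_len) in func_recv_message, and feed_demuxer() is a hypothetical stand-in for the analysis entry point:

// inferred from the call site; the real typedef may differ
typedef void (*cb_wsdata)(en_data_type type, uint8_t *frame,
                          uint8_t *data, size_t frame_len);

// hypothetical sink: hand the unmasked payload to the analyser/demuxer
static void on_ws_data(en_data_type type, uint8_t *frame,
                       uint8_t *data, size_t frame_len)
{
    // 'data' points at the unmasked payload inside 'frame'; for binary frames
    // this is a WebM/H.264 chunk produced by the browser's MediaRecorder
    size_t payload_len = frame_len - (data - frame);
    // feed_demuxer(data, payload_len); // assumed project function
}

// after accepting a connection:
// session->func_setcb(on_ws_data, 0);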
Forwarding WebRTC
In this part, RTP is converted to SRTP; the page side requests the stream via WebRTC, and the server answers with the corresponding SDP. When returning the SDP, be sure to declare H.264 to the receiver so that no transcoding is needed; that part is relatively easy. The key point is getting at the RTP data directly while acting as the RTSP client: receive the raw RTP, convert it to SRTP, and send it out over UDP. Because there is no direct RTP data callback for the time being, this part is not finished yet. Stay tuned.
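Even so, a hedged sketch of the RTP-to-SRTP step with libsrtp2 shows roughly what the forwarder will do per packet; the key material, socket, and peer address are placeholders that would come from the DTLS-SRTP or SDES negotiation, and srtp_init() must have been called once at startup:

#include <srtp2/srtp.h>
#include <cstring>

// minimal sketch, assuming libsrtp2; master_key is a placeholder:
// a 16-byte AES key plus 14-byte salt from key negotiation
srtp_t make_srtp_session(unsigned char master_key[30])
{
    srtp_policy_t policy;
    memset(&policy, 0, sizeof(policy));
    srtp_crypto_policy_set_rtp_default(&policy.rtp);   // AES-128-CM + HMAC-SHA1-80
    srtp_crypto_policy_set_rtcp_default(&policy.rtcp);
    policy.ssrc.type = ssrc_any_outbound;
    policy.key = master_key;
    srtp_t session = NULL;
    srtp_create(&session, &policy);
    return session;
}

// per packet: protect in place; buf needs SRTP_MAX_TRAILER_LEN spare bytes
// because the auth tag (about 10 bytes) is appended -- exactly why the MTU
// must leave headroom (see the two points below)
bool protect_and_send(srtp_t session, int sock, sockaddr_in &peer,
                      uint8_t *buf, int rtp_len)
{
    int len = rtp_len;
    if (srtp_protect(session, buf, &len) != srtp_err_status_ok)
        return false;
    return sendto(sock, (const char *)buf, len, 0,
                  (sockaddr *)&peer, sizeof(peer)) == len;
}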
Two things remain to be done here:
1. Limiting the MTU: the maximum transmission unit must be capped, because SRTP needs room for its own header and authentication tag. Experiments suggest about 1400 bytes is appropriate (see the fragmentation sketch after this list).
2. live555 must be extended with a direct RTP callback.
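On point 1, keeping packets under roughly 1400 bytes implies fragmenting H.264 NAL units that exceed the budget. A hedged sketch of RFC 6184 FU-A fragmentation follows; send_rtp() is a placeholder for the RTP packetizing and SRTP step above:

#include <algorithm>
#include <cstdint>
#include <cstring>

void send_rtp(const uint8_t *payload, size_t len); // placeholder: RTP+SRTP send

// minimal sketch of RFC 6184 FU-A fragmentation for one H.264 NAL unit,
// keeping every RTP payload at or under the chosen MTU budget
void fragment_nal(const uint8_t *nal, size_t size, size_t mtu = 1400)
{
    if (size <= mtu) {        // fits: send as a single NAL unit packet
        send_rtp(nal, size);
        return;
    }
    uint8_t indicator = (nal[0] & 0xE0) | 28; // keep F+NRI bits, type 28 = FU-A
    uint8_t fu_header = nal[0] & 0x1F;        // original NAL unit type
    size_t pos = 1;                           // skip the original NAL header byte
    while (pos < size) {
        size_t chunk = std::min(mtu - 2, size - pos); // 2 bytes: FU indicator+header
        uint8_t buf[1500]; // assumes mtu <= 1498
        buf[0] = indicator;
        buf[1] = fu_header
               | (pos == 1 ? 0x80 : 0)             // S bit on the first fragment
               | (pos + chunk == size ? 0x40 : 0); // E bit on the last fragment
        memcpy(buf + 2, nal + pos, chunk);
        send_rtp(buf, chunk + 2);
        pos += chunk;
    }
}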