Implementing an asynchronous, high-concurrency service with curl_multi
Contents
1. Introduction
2. curl_multi asynchronous implementation
2.1 Asynchronous curl with curl_multi_poll
2.1.1 Function call steps
2.1.2 Implementation scheme
2.1.3 Problems encountered
2.1.4 curl_multi_poll asynchronous service wrapper class example
2.2 Asynchronous curl with multi_socket
1. Introduction
https://curl.se/libcurl/c/libcurl-multi.html
The libcurl easy interface executes requests synchronously: each call blocks until its transfer has finished. When there are many requests, or requests must run with high concurrency, this blocking mode becomes a bottleneck, with low throughput, long delays, high CPU usage and a stalled program. The multi interface is meant for these high-concurrency scenarios. It runs multiple requests simultaneously in a single thread, waits for activity on libcurl's file descriptors or on user-defined ones, and processes the results as they arrive. Driven by an event loop, it can transfer data on thousands of parallel connections.
2. curl_multi asynchronous implementation
Issuing synchronous requests from many threads drives CPU utilization up and can make the user interface stutter. libcurl offers two ways to make requests asynchronously instead:
(1) The older method: call curl_multi_perform and use select (or curl_multi_wait / curl_multi_poll) to detect when a request has results.
(2) multi_socket.
2.1 Asynchronous curl with curl_multi_poll
2.1.1 Function call steps
(1) curl_multi_init initializes a multi handle
(2) curl_easy_init initializes an easy handle
(3) curl_easy_setopt sets the parameters of the easy handle
(4) curl_multi_add_handle adds the easy handle to the multi handle
(5) curl_multi_perform drives the requests without blocking. Each call reports the number of requests still running in the multi handle. A value of 0 means all of them have ended, which does not say whether each one succeeded or failed. The function therefore has to be called in a loop. To keep that loop from consuming CPU, wait between calls with curl_multi_poll, or with curl_multi_fdset combined with select, both of which return when a transfer has activity to process. curl_multi_timeout provides a suitable timeout for select.
(6) curl_multi_info_read reads one message from the result queue. Call it repeatedly until the queue is empty. Each message carries the easy handle that identifies the request it belongs to.
(7) curl_multi_remove_handle removes a finished easy handle from the multi handle, which then no longer manages it. The easy handle can be destroyed, or given a new URL and parameters and added again so that the connection is reused.
(8) curl_easy_cleanup releases an easy handle that is no longer needed. Release the easy handles before the multi handle.
(9) curl_multi_cleanup releases the multi handle
2.1.2 Implementation scheme
(1) Implement a service. The program calls the service's addTask interface to keep adding tasks.
(2) Create thread 1, which continuously takes tasks from the task queue, obtains an easy handle for each one, sets parameters such as the URL on it, and adds it to the multi handle to execute the request. It also stores the easy handle and its task in a map that represents the tasks in progress.
(3) Create thread 2, which continuously calls select, curl_multi_poll or curl_multi_wait to check whether the multi handle has data to return, and reads the data when it does. curl_multi_poll and curl_multi_wait are better than select because they are not subject to select's limit of 1024 file descriptors. curl_multi_poll and curl_multi_wait differ in two ways. First, while curl_multi_poll is waiting it can be woken early by a call to curl_multi_wakeup, whereas curl_multi_wait returns only when an event occurs or the timeout expires. Second, when there are no file descriptors to wait on, curl_multi_wait returns immediately, while curl_multi_poll still waits until the timeout.
(4) Each result that is read contains the easy handle. Use it to look up the corresponding task in the map, process the data according to the task's attributes, and call the callback function to return the data to the program.
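The hand-off between addTask and thread 1 in this scheme can be sketched with a mutex-protected list and a condition variable. This is a simplified illustration using only the standard library: TaskQueue, its capacity argument and the reduced Task struct are hypothetical stand-ins, not the types of the service class shown later.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <list>
#include <memory>
#include <mutex>
#include <string>
#include <utility>

// Hypothetical minimal task; a real task would also carry headers, a response
// buffer and a callback.
struct Task {
    int taskid = 0;
    std::string url;
};

class TaskQueue {
public:
    explicit TaskQueue(std::size_t capacity) : capacity_(capacity) {}

    // Called by the program. Returns false if the queue is full and the task
    // was rejected.
    bool push(std::shared_ptr<Task> task) {
        assert(task != nullptr);
        {
            std::lock_guard<std::mutex> lock(mutex_);
            if (tasks_.size() >= capacity_) return false;
            tasks_.push_back(std::move(task));
        }
        ready_.notify_one();  // wake the worker thread if it is waiting
        return true;
    }

    // Called by the worker thread. Waits up to `timeout` for work, then takes
    // everything queued so far in a single swap so the lock is held briefly.
    std::list<std::shared_ptr<Task>> drain(std::chrono::milliseconds timeout) {
        std::unique_lock<std::mutex> lock(mutex_);
        ready_.wait_for(lock, timeout, [this] { return !tasks_.empty(); });
        std::list<std::shared_ptr<Task>> batch;
        batch.swap(tasks_);
        return batch;
    }

private:
    std::size_t capacity_;
    std::mutex mutex_;
    std::condition_variable ready_;
    std::list<std::shared_ptr<Task>> tasks_;
};
```

Waiting on the condition variable with a timeout lets the worker sleep while the queue is empty and still notice a shutdown flag between waits, instead of polling with a fixed sleep.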
2.1.3 Problems encountered
(1) Crashes, possibly caused by calling the libcurl interface from multiple threads.
(2) curl_multi_add_handle failed to add an easy handle with error code 8, CURLM_RECURSIVE_API_CALL, which means that an API function was called from inside a callback. No direct solution was found, and it may be related to the multithreaded calls. What solved the problem was setting a timeout with curl_easy_setopt(curl, CURLOPT_TIMEOUT, 30L); (a timeout of 0, which means no timeout, led to crashes) and disabling signals with curl_easy_setopt(curl, CURLOPT_NOSIGNAL, 1L);.
(3) When too many easy handles are in use (around 200), error code 7, CURLE_COULDNT_CONNECT, appears: the connect() to the host or proxy failed because the connection was refused. Too many socket connections have to be created and the server limits the number of connections it accepts, so some of them fail.
(4) After 1000 tasks were added to the multi handle at once and the running count reported by curl_multi_perform had fallen from 1000 to 0, only about 600 results had been read. curl_multi_info_read has to be called repeatedly to read all of them.
2.1.4 curl_multi_poll asynchronous service wrapper class example
This code example uses curl_multi_poll to wait for asynchronous results, which is more efficient than select and removes select's upper limit of 1024 file descriptors. The asynchronous request flow is wrapped as a service: any asynchronous request can be handed to the service for execution, and the result is returned through a callback function. The easy handles are kept in a pool so that they, and the connections they hold, can be reused to improve request performance. The code uses C++11 features, including smart pointers, std::thread and std::move.
(1) Wrapper class header file
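The handle pool idea can be shown in isolation. The sketch below is a generic, standard-library-only version with a cap on how many handles are ever created: HandlePool and its members are hypothetical names, and the pool does not destroy the handles it hands out, so the owner still has to release them at shutdown.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <list>
#include <mutex>
#include <utility>

// Generic pool of reusable handles with an upper bound on creation.
template <typename Handle>
class HandlePool {
public:
    HandlePool(std::size_t max_handles, std::function<Handle*()> create)
        : max_handles_(max_handles), create_(std::move(create)) {}

    // Returns an idle handle if there is one, creates a new one while under
    // the cap, and returns nullptr once the cap is reached.
    Handle* acquire() {
        std::lock_guard<std::mutex> lock(mutex_);
        if (!idle_.empty()) {
            Handle* handle = idle_.front();
            idle_.pop_front();
            return handle;
        }
        if (created_ >= max_handles_) return nullptr;
        Handle* handle = create_();
        if (handle != nullptr) ++created_;
        return handle;
    }

    // Puts a handle back so a later acquire() can reuse it.
    void release(Handle* handle) {
        assert(handle != nullptr);
        std::lock_guard<std::mutex> lock(mutex_);
        idle_.push_back(handle);
    }

private:
    std::size_t max_handles_;
    std::function<Handle*()> create_;
    std::mutex mutex_;
    std::list<Handle*> idle_;
    std::size_t created_ = 0;
};
```

With libcurl the pool could be instantiated as HandlePool<CURL> pool(100, curl_easy_init); a caller that receives nullptr waits until a finished request releases its handle.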
#pragma once
#include "curl.h"
#include <atomic>
#include <condition_variable>
#include <list>
#include <map>
#include <memory>
#include <mutex>
#include <thread>
#include "BaseDefine.h"

class CurlSelectMulti
{
public:
    CurlSelectMulti();
    ~CurlSelectMulti();
    // Global initialization
    static void GlobalInit();
    // Global cleanup
    void GlobleFint();
    // Initialization
    int init();
    // Cleanup
    void finit();
    // Add a task to the queue
    void addTask(std::shared_ptr<Task>& task);

private:
    // Task processing: loop, take tasks from the queue and add them to the multi handle
    void dealTask();
    // Check whether any tasks have completed
    void handTaskResult();
    // Read completed tasks for parsing
    void readTaskResult();
    // Take an easy handle from the pool; create a new one if the pool is empty
    CURL* GetCurl();
    // Create a new easy handle
    CURL* CreateCurl();
    // Put a used easy handle back into the pool
    void PutCurl(CURL* curl);
    // Add tasks to the multi handle for execution
    void addTaskToMultiRequest(std::list<std::shared_ptr<Task>>& listTask);
    // Set the task's parameters on the easy handle
    int setTaskParameter(CURL* easyhand, std::shared_ptr<Task>& task);

    bool m_bDebug = false;
    CURLM* m_pMultiHand = nullptr;                  // multi handle
    std::list<std::shared_ptr<Task>> m_listTask;    // task list
    std::mutex m_taskMutex;                         // lock for the task list
    std::mutex m_easyHandMutex;                     // lock for the easy handle pool
    std::list<CURL*> m_listEasyHand;                // easy handle pool
    std::atomic<bool> m_bRunning{true};             // thread control flag, read by both worker threads
    std::thread m_taskAddThread;                    // thread that submits tasks
    std::thread m_taskHandThread;                   // thread that checks task status and processes results
    std::condition_variable m_conVarTask;
    std::map<CURL*, std::shared_ptr<Task>> m_mapRuningTask; // tasks in progress
    std::mutex m_runningTaskMutex;
    std::mutex m_curlApiMutex;                      // calling the curl interface from several threads crashes, so it is locked
    int m_curlnum = 0;                              // number of easy handles created
    int m_successnum = 0;
    int m_failednum = 0;
    int m_addmultFailed = 0;
};
(2) Wrapper class source file
#include "stdafx.h"
#include "CurlSelectMulti.h"

// Debug callback. pData is not null-terminated, so exactly size bytes are printed.
static int OnDebug(CURL*, curl_infotype itype, char* pData, size_t size, void*)
{
    if (itype == CURLINFO_TEXT)
    {
        //printf("[TEXT]%s\n", pData);
    }
    else if (itype == CURLINFO_HEADER_IN)
    {
        printf("[HEADER_IN]%.*s\n", (int)size, pData);
    }
    else if (itype == CURLINFO_HEADER_OUT)
    {
        printf("[HEADER_OUT]%.*s\n", (int)size, pData);
    }
    else if (itype == CURLINFO_DATA_IN)
    {
        printf("[DATA_IN]%.*s\n", (int)size, pData);
    }
    else if (itype == CURLINFO_DATA_OUT)
    {
        printf("[DATA_OUT]%.*s\n", (int)size, pData);
    }
    return 0;
}

// Write callback: appends the received bytes to the std::string passed via CURLOPT_WRITEDATA
static size_t OnWriteData(void* buffer, size_t size, size_t nmemb, void* lpVoid)
{
    std::string* str = static_cast<std::string*>(lpVoid);
    if (NULL == str || NULL == buffer)
    {
        return 0; // a value different from size * nmemb makes libcurl abort the transfer
    }
    str->append(static_cast<char*>(buffer), size * nmemb);
    return size * nmemb; // libcurl expects the number of bytes handled
}

CurlSelectMulti::CurlSelectMulti()
{
}

CurlSelectMulti::~CurlSelectMulti()
{
    finit();
}

void CurlSelectMulti::GlobalInit()
{
    curl_global_init(CURL_GLOBAL_ALL);
}

void CurlSelectMulti::GlobleFint()
{
    curl_global_cleanup();
}

// Returns true (1) on success and false (0) on failure
int CurlSelectMulti::init()
{
    //Create a multi handle
    m_pMultiHand = curl_multi_init();
    if (m_pMultiHand == nullptr)
    {
        return false;
    }
    m_bRunning = true;
    m_taskAddThread = std::thread(&CurlSelectMulti::dealTask, this);
    m_taskHandThread = std::thread(&CurlSelectMulti::handTaskResult, this);
    return true;
}

void CurlSelectMulti::finit()
{
    //Let the threads exit
    m_bRunning = false;
    //Wake up anything that is waiting
    m_conVarTask.notify_all();
    //Wait for both threads before releasing the handles they use;
    //destroying a joinable std::thread would terminate the program
    if (m_taskAddThread.joinable())
    {
        m_taskAddThread.join();
    }
    if (m_taskHandThread.joinable())
    {
        m_taskHandThread.join();
    }
    //Release all pooled easy handles
    while (m_listEasyHand.size() > 0)
    {
        CURL* curl = m_listEasyHand.front();
        curl_multi_remove_handle(m_pMultiHand, curl);
        curl_easy_cleanup(curl);
        m_listEasyHand.pop_front();
    }
    //Release the multi handle
    if (m_pMultiHand != nullptr)
    {
        curl_multi_cleanup(m_pMultiHand);
        m_pMultiHand = nullptr;
    }
}

void CurlSelectMulti::addTask(std::shared_ptr<Task>& task)
{
    //The size check is done under the lock, because dealTask swaps the list from another thread
    std::unique_lock<std::mutex> lk(m_taskMutex);
    if (m_listTask.size() > 5000)
    {
        //printf("task is full size %d ,abord task %d", m_listTask.size(),task->taskid);
        return;
    }
    m_listTask.push_back(task);
    //m_conVarTask.notify_one();//Notification of task addition
    lk.unlock();
}

CURL* CurlSelectMulti::GetCurl()
{
    CURL* curl = NULL;
    m_easyHandMutex.lock();
    if (m_listEasyHand.size() > 0)
    {
        curl = m_listEasyHand.front();
        m_listEasyHand.pop_front();
    }
    m_easyHandMutex.unlock();
    if (curl == NULL)
    {
        curl = CreateCurl();
    }
    return curl;
}

CURL* CurlSelectMulti::CreateCurl()
{
    if (m_curlnum > 100)//Too many connections lead to connection errors
    {
        return NULL;
    }
    CURL* curl = curl_easy_init();
    if (NULL == curl)
    {
        return NULL;
    }
    m_curlnum++;
    printf("curl num %d", m_curlnum);
    if (m_bDebug)
    {
        curl_easy_setopt(curl, CURLOPT_VERBOSE, 1L);
        curl_easy_setopt(curl, CURLOPT_DEBUGFUNCTION, OnDebug);
    }
    //curl_easy_setopt(curl, CURLOPT_URL, strUrl.c_str());
    curl_easy_setopt(curl, CURLOPT_READFUNCTION, NULL);
    curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, OnWriteData);
    //curl_easy_setopt(curl, CURLOPT_WRITEDATA, (void *)&strResponse);
    /* enable TCP keep-alive for this transfer */
    curl_easy_setopt(curl, CURLOPT_TCP_KEEPALIVE, 1L);
    /* keep-alive idle time: 300 seconds */
    curl_easy_setopt(curl, CURLOPT_TCP_KEEPIDLE, 300L);
    /* interval between keep-alive probes: 200 seconds */
    curl_easy_setopt(curl, CURLOPT_TCP_KEEPINTVL, 200L);
    curl_easy_setopt(curl, CURLOPT_TIMEOUT, 100L);
    //Support redirection
    curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L);
    //Start a new cookie session
    curl_easy_setopt(curl, CURLOPT_COOKIESESSION, 1L);
    //The original called curl_share_setopt(curl, CURLSHOPT_SHARE, CURL_LOCK_DATA_DNS) here.
    //That function takes a CURLSH* from curl_share_init, not an easy handle, so the call is removed.
    //Easy handles added to the same multi handle already share its DNS cache.
    //Do not validate the host name
    curl_easy_setopt(curl, CURLOPT_SSL_VERIFYHOST, 0L);
    //Do not validate the peer certificate
    curl_easy_setopt(curl, CURLOPT_SSL_VERIFYPEER, 0L);
    //curl_easy_setopt(curl, CURLOPT_CAINFO, c->msg._caPath.c_str());
    /**
    * When multiple threads use timeout handling and the main thread sleeps or waits,
    * libcurl will raise a signal that interrupts the wait and makes the program exit
    * unless this option is set.
    */
    curl_easy_setopt(curl, CURLOPT_NOSIGNAL, 1L);
    curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT, 100L);
    return curl;
}

void CurlSelectMulti::PutCurl(CURL* curl)
{
    m_easyHandMutex.lock();
    m_listEasyHand.push_back(curl);
    m_easyHandMutex.unlock();
    //m_conVarEasyHand.notify_all();
}

void CurlSelectMulti::dealTask()
{
    while (m_bRunning)
    {
        std::unique_lock<std::mutex> lk(m_taskMutex);
        //m_conVarTask.wait(lk);//Waiting for tasks to be added
        if (m_listTask.size() > 0)
        {
            std::list<std::shared_ptr<Task>> listTask;
            listTask.swap(m_listTask);
            lk.unlock();
            addTaskToMultiRequest(listTask);
        }
        else
        {
            lk.unlock();
            Sleep(20);
        }
    }
}

void CurlSelectMulti::handTaskResult()
{
    CURLMcode mc = CURLM_OK;
    int still_running = 0;
    int ret = 0;
    while (m_bRunning)
    {
        m_curlApiMutex.lock();
        //Execute the requests; still_running receives the number of requests in progress
        mc = curl_multi_perform(m_pMultiHand, &still_running);
        //printf("still running num%d,%d\n", still_running, mc);
        m_curlApiMutex.unlock();
        //Wait for activity: returns immediately when there is some, otherwise after 1000 ms.
        //ret receives the number of file descriptors with events.
        //NOTE: this call is not covered by m_curlApiMutex, so it can still overlap with
        //curl_multi_add_handle in the other thread; libcurl does not support using one
        //multi handle from two threads at the same time.
        mc = curl_multi_poll(m_pMultiHand, NULL, 0, 1000, &ret);
        if (mc == CURLM_OK)
        {
            readTaskResult();
        }
        else
        {
            printf("curl_multi_poll error %d", mc);
        }
        //if (still_running>0)//There are requested tasks in progress
        //{
        //    while (still_running>0)
        //    {
        //        //Execute curl_multi_perform again to update still_running
        //        mc = curl_multi_perform(m_pMultiHand, &still_running);
        //        //printf("still running num%d\n", still_running);
        //    }
        //    printf("task finish\n");
        //}
        //else//If there is no task, wait a while to avoid spinning with high CPU usage
        //{
        //    Sleep(100);
        //}
    }
}

void CurlSelectMulti::readTaskResult()
{
    CURLMsg* m = NULL;
    do
    {
        int msgq = 0;
        CURL* e = NULL;
        CURLcode result = CURLE_OK;
        bool done = false;
        m_curlApiMutex.lock();
        m = curl_multi_info_read(m_pMultiHand, &msgq);
        if (m && (m->msg == CURLMSG_DONE))
        {
            //Copy what is needed: the message is invalid once the handle is removed
            e = m->easy_handle;
            result = m->data.result;
            done = true;
            //Remove the easy handle from the multi handle
            curl_multi_remove_handle(m_pMultiHand, e);
        }
        m_curlApiMutex.unlock();
        if (done)
        {
            //Look up the task; the map is shared with the task-adding thread, so lock it
            m_runningTaskMutex.lock();
            auto it = m_mapRuningTask.find(e);
            if (it != m_mapRuningTask.end())
            {
                if (result != CURLE_OK)
                {
                    m_failednum++;
                    printf("request error %d,failednum%d,taskid%d,%s\n", result, m_failednum, it->second->taskid, it->second->strUrl.c_str());
                }
                else
                {
                    m_successnum++;
                    printf("request success successnum%d,id %d, \n ", m_successnum, it->second->taskid);//, it->second->strResponse.c_str()
                }
                if (it->second->headers != nullptr)//Free the header list
                {
                    curl_slist_free_all(it->second->headers);
                }
                //Remove from the map
                m_mapRuningTask.erase(it);
            }
            else
            {
                printf("find map key failed");
            }
            m_runningTaskMutex.unlock();
            //Put the easy handle back into the pool for reuse
            PutCurl(e);
        }
    } while (m);
}

void CurlSelectMulti::addTaskToMultiRequest(std::list<std::shared_ptr<Task>>& listTask)
{
    //Also stop when the service is shutting down, so that finit() can join this thread
    while (listTask.empty() == false && m_bRunning)
    {
        auto item = listTask.front();
        CURL* easyhand = GetCurl();//Obtain an easy handle
        if (easyhand == NULL)
        {
            //unique_lock<mutex> lk(m_easyHandMutex);
            //m_conVarEasyHand.wait(lk);//Waiting for an easy handle to be put back
            Sleep(1);
            continue;
        }
        //Set the task's url, timeout and other parameters on the easy handle
        if (setTaskParameter(easyhand, item) != 0)
        {
            Sleep(2);
            PutCurl(easyhand);
            continue;
        }
        //Add the configured easy handle to the multi handle
        m_curlApiMutex.lock();
        //When the number of tasks is too large, this fails with error code 8 and sometimes crashes
        CURLMcode code = curl_multi_add_handle(m_pMultiHand, easyhand);
        m_curlApiMutex.unlock();
        if (code != CURLM_OK)
        {
            m_addmultFailed++;
            std::string strError = curl_multi_strerror(code);
            printf("curl_multi_add_handle failed%d,%s\n", m_addmultFailed, strError.c_str());
            PutCurl(easyhand);
            continue;
        }
        //After a successful add, store the easy handle and task in the map,
        //so the task can be found from the easy handle when the result arrives
        m_runningTaskMutex.lock();
        m_mapRuningTask.insert({ easyhand, item });
        printf("running task size:%zu\n", m_mapRuningTask.size());
        m_runningTaskMutex.unlock();
        listTask.pop_front();
    }
}

int CurlSelectMulti::setTaskParameter(CURL* easyhand, std::shared_ptr<Task>& task)
{
    CURLcode code = CURLE_OK;
    do
    {
        if (task->iType == HttpType::HTTP_POST || task->iType == HttpType::HTTPS_POST)
        {
            code = curl_easy_setopt(easyhand, CURLOPT_POST, 1L); //post method
            if (code != CURLE_OK)
            {
                printf("curl_easy_setopt error %d", code);
                break;
            }
            code = curl_easy_setopt(easyhand, CURLOPT_POSTFIELDSIZE, (long)task->strPostContent.size());
            if (code != CURLE_OK)
            {
                printf("curl_easy_setopt error %d", code);
                break;
            }
            code = curl_easy_setopt(easyhand, CURLOPT_POSTFIELDS, task->strPostContent.data());
            if (code != CURLE_OK)
            {
                printf("curl_easy_setopt error %d", code);
                break;
            }
        }
        else
        {
            //A pooled easy handle keeps its options, so switch back to GET after an earlier POST
            code = curl_easy_setopt(easyhand, CURLOPT_HTTPGET, 1L);
            if (code != CURLE_OK)
            {
                printf("curl_easy_setopt error %d", code);
                break;
            }
        }
        //Set the url
        code = curl_easy_setopt(easyhand, CURLOPT_URL, task->strUrl.c_str());
        if (code != CURLE_OK)
        {
            printf("curl_easy_setopt error %d", code);
            break;
        }
        code = curl_easy_setopt(easyhand, CURLOPT_WRITEDATA, (void *)&(task->strResponse));
        if (code != CURLE_OK)
        {
            printf("curl_easy_setopt error %d", code);
            break;
        }
        //Set the protocol headers. This is done even when headers is NULL, which resets the
        //option, so a reused handle never keeps a pointer to a list that was already freed
        code = curl_easy_setopt(easyhand, CURLOPT_HTTPHEADER, task->headers);
        if (code != CURLE_OK)
        {
            printf("curl_easy_setopt error %d", code);
            break;
        }
    } while (0);
    if (code != CURLE_OK)
    {
        //The caller returns the easy handle to the pool; doing it here as well
        //would put the same handle into the pool twice
        printf("setTaskParameter error");
        return -1;
    }
    return 0;
}
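The contract of a write callback such as OnWriteData is easy to get wrong, so it is sketched here on its own with only the standard library. libcurl hands the callback size * nmemb bytes and expects that same number back; any other return value aborts the transfer. The name append_to_string is hypothetical, and the sketch assumes CURLOPT_WRITEDATA was set to the address of a std::string.

```cpp
#include <cassert>
#include <cstddef>
#include <string>

// Has the parameter list libcurl uses for CURLOPT_WRITEFUNCTION callbacks.
// `userdata` is whatever was passed through CURLOPT_WRITEDATA.
std::size_t append_to_string(char* data, std::size_t size, std::size_t nmemb,
                             void* userdata) {
    assert(userdata != nullptr && "CURLOPT_WRITEDATA must point at a std::string");
    auto* out = static_cast<std::string*>(userdata);
    const std::size_t total = size * nmemb;  // number of bytes in this chunk
    if (total > 0) {
        out->append(data, total);  // the chunk is not null-terminated
    }
    return total;  // report every byte as handled
}
```

A response usually arrives in several chunks, so the callback is called many times per request and the string accumulates the body across calls.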
(3) Usage example of the service wrapper class
// CurlMultiServer.cpp : Defines the entry point for the console application.
//
#include "stdafx.h"
#include <iostream>
#include <fstream>
#include <memory>
#include <thread>
#include "CurlSelectMulti.h"
#include "CurlSocketMulti.h"
using namespace std;

int main()
{
    CurlSelectMulti m_multServer;
    m_multServer.GlobalInit();
    m_multServer.init();
    int taskid = 0;
    int time = 0;
    //while (true)
    {
        for (int i = 0; i < 10000; i++)
        {
            shared_ptr<Task> task = std::make_shared<Task>();
            task->iType = HttpType::HTTPS_GET;
            task->strUrl = "";
            taskid++;
            task->taskid = taskid;
            m_multServer.addTask(task);
        }
        //Sleep(1000);
        //printf("using time %ds,task num %d",(time++) * 1, time * 100);
    }
    getchar();
    return 0;
}
(4) Performance test
On the test machine, 10000 HTTP requests took 14 seconds in total, an average of 1.4 milliseconds per request, or about 714 requests per second.
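The two figures follow directly from the request count and the elapsed time. A small sketch of the arithmetic, with the hypothetical names Throughput and summarize:

```cpp
#include <cassert>

struct Throughput {
    double ms_per_request;       // elapsed time divided by request count
    double requests_per_second;  // request count divided by elapsed time
};

// Derives both figures from a request count and the total elapsed seconds.
Throughput summarize(int requests, double elapsed_seconds) {
    assert(requests > 0 && elapsed_seconds > 0.0);
    return { elapsed_seconds * 1000.0 / requests, requests / elapsed_seconds };
}
```

For 10000 requests in 14 seconds this gives 1.4 ms per request and about 714.3 requests per second. The 1.4 ms is elapsed time divided by the number of requests; because many requests are in flight at once, the latency of an individual request is larger than that.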
In addition, multi_socket can be combined with the libevent event library; that is covered in the next installment.