diff --git a/src/worker.c b/src/worker.c index 666159d..3daa5d6 100644 --- a/src/worker.c +++ b/src/worker.c @@ -121,7 +121,7 @@ header_cb(void *contents, size_t size, size_t nmemb, void *userp) return realsize; } -static CURLMcode init(CURLM *cm, char *method, char *url, int timeout_milliseconds, struct curl_slist *request_headers, char *reqBody, int64 id, CurlData *cdata) +static void init(CURLM *cm, char *method, char *url, int timeout_milliseconds, struct curl_slist *request_headers, char *reqBody, int64 id, CurlData *cdata) { CURL *eh = curl_easy_init(); @@ -132,9 +132,12 @@ static CURLMcode init(CURLM *cm, char *method, char *url, int timeout_millisecon (void)pushJsonbValue(&response_headers, WJB_BEGIN_OBJECT, NULL); cdata->response_headers = response_headers; cdata->id = id; - cdata->request_headers = request_headers; - request_headers = curl_slist_append(request_headers, "User-Agent: pg_net/" EXTVERSION); + struct curl_slist *new_headers = curl_slist_append(request_headers, "User-Agent: pg_net/" EXTVERSION); + if(new_headers == NULL) + ereport(ERROR, errmsg("curl_slist_append returned NULL")); + + cdata->request_headers = new_headers; if (strcasecmp(method, "GET") == 0) { if (reqBody) { @@ -174,7 +177,10 @@ static CURLMcode init(CURLM *cm, char *method, char *url, int timeout_millisecon #else CURL_EZ_SETOPT(eh, CURLOPT_PROTOCOLS, CURLPROTO_HTTP | CURLPROTO_HTTPS); #endif - return curl_multi_add_handle(cm, eh); + + CURLMcode code = curl_multi_add_handle(cm, eh); + if(code != CURLM_OK) + ereport(ERROR, errmsg("curl_multi_add_handle returned %s", curl_multi_strerror(code))); } bool is_extension_loaded(){ @@ -190,7 +196,7 @@ bool is_extension_loaded(){ void worker_main(Datum main_arg) { - CURLM *cm=NULL; + CURLM *curl_mhandle=NULL; CURL *eh=NULL; CURLMsg *msg=NULL; int still_running=0, msgs_left=0; @@ -280,10 +286,14 @@ worker_main(Datum main_arg) res = curl_global_init(CURL_GLOBAL_ALL); if(res) { - elog(ERROR, "error: curl_global_init() returned %d\n", res); + ereport(ERROR, errmsg("curl_global_init() returned %s\n", curl_easy_strerror(res))); } - cm = curl_multi_init(); + curl_mhandle = curl_multi_init(); + + if(!curl_mhandle) { + ereport(ERROR, errmsg("curl_multi_init()")); + } for (int j = 0; j < SPI_processed; j++) { @@ -308,7 +318,7 @@ worker_main(Datum main_arg) CurlData *cdata; if (strcasecmp(method, "GET") != 0 && strcasecmp(method, "POST") != 0 && strcasecmp(method, "DELETE") != 0) { - elog(ERROR, "error: Unsupported request method %s\n", method); + ereport(ERROR, errmsg("Unsupported request method %s", method)); } headersBin = SPI_getbinval(SPI_tuptable->vals[j], SPI_tuptable->tupdesc, 5, &tupIsNull); @@ -321,11 +331,7 @@ worker_main(Datum main_arg) cdata = palloc(sizeof(CurlData)); - res = init(cm, method, url, timeout_milliseconds, request_headers, body, id, cdata); - - if(res) { - elog(ERROR, "error: init() returned %d\n", res); - } + init(curl_mhandle, method, url, timeout_milliseconds, request_headers, body, id, cdata); } } else @@ -337,22 +343,22 @@ worker_main(Datum main_arg) do { int numfds=0; - res = curl_multi_perform(cm, &still_running); + res = curl_multi_perform(curl_mhandle, &still_running); if(res != CURLM_OK) { - elog(ERROR, "error: curl_multi_perform() returned %d\n", res); + ereport(ERROR, errmsg("error: curl_multi_perform() returned %d", res)); } /*wait at least 1 second(1000 ms) in case all responses are slow*/ /*this avoids busy waiting and higher CPU usage*/ - res = curl_multi_wait(cm, NULL, 0, 1000, &numfds); + res = curl_multi_wait(curl_mhandle, NULL, 0, 1000, &numfds); if(res != CURLM_OK) { - elog(ERROR, "error: curl_multi_wait() returned %d\n", res); + ereport(ERROR, errmsg("error: curl_multi_wait() returned %d", res)); } } while(still_running); - while ((msg = curl_multi_info_read(cm, &msgs_left))) { + while ((msg = curl_multi_info_read(curl_mhandle, &msgs_left))) { if (msg->msg == CURLMSG_DONE) { CURLcode return_code = msg->data.result; eh = msg->easy_handle; @@ -414,14 +420,19 @@ worker_main(Datum main_arg) pfree(cdata); } - curl_multi_remove_handle(cm, eh); + res = curl_multi_remove_handle(curl_mhandle, eh); + if(res != CURLM_OK) + ereport(ERROR, errmsg("curl_multi_remove_handle: %s", curl_multi_strerror(res))); + curl_easy_cleanup(eh); } else { - elog(ERROR, "error: after curl_multi_info_read(), CURLMsg=%d\n", msg->msg); + ereport(ERROR, errmsg("curl_multi_info_read(), CURLMsg=%d\n", msg->msg)); } } - curl_multi_cleanup(cm); + res = curl_multi_cleanup(curl_mhandle); + if(res != CURLM_OK) + ereport(ERROR, errmsg("curl_multi_cleanup: %s", curl_multi_strerror(res))); SPI_finish(); PopActiveSnapshot();