diff --git a/external-table/src/libchurl.c b/external-table/src/libchurl.c index e616781487..b209021beb 100644 --- a/external-table/src/libchurl.c +++ b/external-table/src/libchurl.c @@ -23,6 +23,7 @@ #include "utils/fmgroids.h" #include "utils/guc.h" #include "utils/jsonapi.h" +#include "utils/resowner.h" /* include libcurl without typecheck. * This allows wrapping curl_easy_setopt to be wrapped @@ -46,9 +47,17 @@ typedef struct } churl_buffer; /* - * internal context of libchurl + * holds http header properties */ typedef struct +{ + struct curl_slist *headers; +} churl_settings; + +/* + * internal context of libchurl + */ +typedef struct churl_context { /* curl easy API handle */ CURL *curl_handle; @@ -58,6 +67,8 @@ typedef struct */ CURLM *multi_handle; + churl_settings churl_headers; + /* * curl API puts internal errors in this buffer used for error reporting */ @@ -70,10 +81,10 @@ typedef struct int curl_still_running; /* internal buffer for download */ - churl_buffer *download_buffer; + churl_buffer download_buffer; /* internal buffer for upload */ - churl_buffer *upload_buffer; + churl_buffer upload_buffer; /* * holds http error code returned from remote server @@ -82,15 +93,11 @@ typedef struct /* true on upload, false on download */ bool upload; -} churl_context; -/* - * holds http header properties - */ -typedef struct -{ - struct curl_slist *headers; -} churl_settings; + ResourceOwner owner; /* owner of this handle */ + struct churl_context *next; + struct churl_context *prev; +} churl_context; /* the null action object used for pure validation */ static JsonSemAction nullSemAction = @@ -99,7 +106,10 @@ static JsonSemAction nullSemAction = NULL, NULL, NULL, NULL, NULL }; -churl_context *churl_new_context(void); +static churl_context *open_curl_handles; +static bool churl_resowner_callback_registered; + +static churl_context *churl_new_context(void); static void create_curl_handle(churl_context *context); static void set_curl_option(churl_context *context, CURLoption option, const void *data); static size_t read_callback(void *ptr, size_t size, size_t nmemb, void *userdata); @@ -112,7 +122,6 @@ static void enlarge_internal_buffer(churl_buffer *buffer, size_t required); static void finish_upload(churl_context *context); static void cleanup_curl_handle(churl_context *context); static void multi_remove_handle(churl_context *context); -static void cleanup_internal_buffer(churl_buffer *buffer); static void churl_cleanup_context(churl_context *context); static size_t write_callback(char *buffer, size_t size, size_t nitems, void *userp); static void fill_internal_buffer(churl_context *context, int want); @@ -307,8 +316,6 @@ churl_headers_cleanup(CHURL_HEADERS headers) if (settings->headers) curl_slist_free_all(settings->headers); - - pfree(settings); } /* @@ -352,7 +359,9 @@ log_curl_debug(CURL *handle, curl_infotype type, char *data, size_t size, void * static CHURL_HANDLE churl_init(const char *url, CHURL_HEADERS headers) { + churl_settings *settings = headers; churl_context *context = churl_new_context(); + context->churl_headers.headers = settings->headers; create_curl_handle(context); clear_error_buffer(context); @@ -445,7 +454,7 @@ size_t churl_write(CHURL_HANDLE handle, const char *buf, size_t bufsize) { churl_context *context = (churl_context *) handle; - churl_buffer *context_buffer = context->upload_buffer; + churl_buffer *context_buffer = &context->upload_buffer; Assert(context->upload); @@ -483,7 +492,7 @@ churl_read(CHURL_HANDLE handle, char *buf, size_t max_size) { int n = 0; churl_context *context = (churl_context *) handle; - churl_buffer *context_buffer = context->download_buffer; + churl_buffer *context_buffer = &context->download_buffer; Assert(!context->upload); @@ -525,18 +534,59 @@ churl_cleanup(CHURL_HANDLE handle, bool after_error) } cleanup_curl_handle(context); - cleanup_internal_buffer(context->download_buffer); - cleanup_internal_buffer(context->upload_buffer); + churl_headers_cleanup(&context->churl_headers); churl_cleanup_context(context); } -churl_context * -churl_new_context() +static void +churl_abort_callback(ResourceReleasePhase phase, + bool isCommit, + bool isTopLevel, + void *arg) +{ + churl_context *curr; + churl_context *next; + + if (phase != RESOURCE_RELEASE_AFTER_LOCKS) + return; + + next = open_curl_handles; + while (next) + { + curr = next; + next = curr->next; + + if (curr->owner == CurrentResourceOwner) + { + if (isCommit) + elog(LOG, "pxf reference leak: %p still referenced", curr); + + churl_cleanup(curr, !isCommit); + } + } +} + +static churl_context * +churl_new_context(void) { - churl_context *context = palloc0(sizeof(churl_context)); + churl_context *context = MemoryContextAllocZero(TopMemoryContext, sizeof(churl_context)); + + context->owner = CurrentResourceOwner; + + if (open_curl_handles) + { + context->next = open_curl_handles; + open_curl_handles->prev = context; + } + + open_curl_handles = context; + + if (!churl_resowner_callback_registered) + { + RegisterResourceReleaseCallback(churl_abort_callback, NULL); + churl_resowner_callback_registered = true; + } - context->download_buffer = palloc0(sizeof(churl_buffer)); - context->upload_buffer = palloc0(sizeof(churl_buffer)); return context; } @@ -575,7 +625,7 @@ static size_t read_callback(void *ptr, size_t size, size_t nmemb, void *userdata) { churl_context *context = (churl_context *) userdata; - churl_buffer *context_buffer = context->upload_buffer; + churl_buffer *context_buffer = &context->upload_buffer; int written = Min(size * nmemb, context_buffer->top - context_buffer->bot); @@ -638,7 +688,7 @@ internal_buffer_large_enough(churl_buffer *buffer, size_t required) static void flush_internal_buffer(churl_context *context) { - churl_buffer *context_buffer = context->upload_buffer; + churl_buffer *context_buffer = &context->upload_buffer; if (context_buffer->top == 0) return; @@ -736,36 +786,18 @@ multi_remove_handle(churl_context *context) curl_error, curl_easy_strerror(curl_error)); } -static void -cleanup_internal_buffer(churl_buffer *buffer) -{ - if ((buffer) && (buffer->ptr)) - { - pfree(buffer->ptr); - buffer->ptr = NULL; - buffer->bot = 0; - buffer->top = 0; - buffer->max = 0; - } -} - static void churl_cleanup_context(churl_context *context) { if (context) { - if (context->download_buffer) - { - if (context->download_buffer->ptr) - pfree(context->download_buffer->ptr); - pfree(context->download_buffer); - } - if (context->upload_buffer) - { - if (context->upload_buffer->ptr) - pfree(context->upload_buffer->ptr); - pfree(context->upload_buffer); - } + /* unlink from linked list first */ + if (context->prev) + context->prev->next = context->next; + else + open_curl_handles = open_curl_handles->next; + if (context->next) + context->next->prev = context->prev; pfree(context); } @@ -780,7 +812,7 @@ static size_t write_callback(char *buffer, size_t size, size_t nitems, void *userp) { churl_context *context = (churl_context *) userp; - churl_buffer *context_buffer = context->download_buffer; + churl_buffer *context_buffer = &context->download_buffer; const int nbytes = size * nitems; if (!internal_buffer_large_enough(context_buffer, nbytes)) @@ -812,7 +844,7 @@ fill_internal_buffer(churl_context *context, int want) /* attempt to fill buffer */ while (context->curl_still_running && - ((context->download_buffer->top - context->download_buffer->bot) < want)) + ((context->download_buffer.top - context->download_buffer.bot) < want)) { FD_ZERO(&fdread); FD_ZERO(&fdwrite); @@ -967,10 +999,10 @@ check_response_code(churl_context *context) initStringInfo(&err); /* prepare response text if any */ - if (context->download_buffer->ptr) + if (context->download_buffer.ptr) { - context->download_buffer->ptr[context->download_buffer->top] = '\0'; - response_text = context->download_buffer->ptr + context->download_buffer->bot; + context->download_buffer.ptr[context->download_buffer.top] = '\0'; + response_text = context->download_buffer.ptr + context->download_buffer.bot; } appendStringInfo(&err, "PXF server error"); diff --git a/external-table/src/pxfbridge.c b/external-table/src/pxfbridge.c index 713c1d306a..fee67840b5 100644 --- a/external-table/src/pxfbridge.c +++ b/external-table/src/pxfbridge.c @@ -41,10 +41,6 @@ gpbridge_cleanup(gphadoop_context *context) return; churl_cleanup(context->churl_handle, false); - context->churl_handle = NULL; - - churl_headers_cleanup(context->churl_headers); - context->churl_headers = NULL; if (context->gphd_uri != NULL) { diff --git a/fdw/libchurl.c b/fdw/libchurl.c index 083ad24bc2..2fcb5658c7 100644 --- a/fdw/libchurl.c +++ b/fdw/libchurl.c @@ -29,6 +29,7 @@ #include "utils/fmgroids.h" #include "utils/guc.h" #include "utils/jsonapi.h" +#include "utils/resowner.h" /* include libcurl without typecheck. * This allows wrapping curl_easy_setopt to be wrapped @@ -51,10 +52,18 @@ typedef struct churl_buffer } churl_buffer; +/* + * holds http header properties + */ +typedef struct +{ + struct curl_slist *headers; +} churl_settings; + /* * internal context of libchurl */ -typedef struct churl_handle +typedef struct churl_context { /* curl easy API handle */ CURL *curl_handle; @@ -64,6 +73,8 @@ typedef struct churl_handle */ CURLM *multi_handle; + churl_settings churl_headers; + /* * curl API puts internal errors in this buffer used for error reporting */ @@ -76,10 +87,10 @@ typedef struct churl_handle int curl_still_running; /* internal buffer for download */ - churl_buffer *download_buffer; + churl_buffer download_buffer; /* internal buffer for upload */ - churl_buffer *upload_buffer; + churl_buffer upload_buffer; /* * holds http error code returned from remote server @@ -88,15 +99,11 @@ typedef struct churl_handle /* true on upload, false on download */ bool upload; -} churl_context; -/* - * holds http header properties - */ -typedef struct churl_settings -{ - struct curl_slist *headers; -} churl_settings; + ResourceOwner owner; /* owner of this handle */ + struct churl_context *next; + struct churl_context *prev; +} churl_context; /* the null action object used for pure validation */ static JsonSemAction nullSemAction = @@ -105,7 +112,10 @@ static JsonSemAction nullSemAction = NULL, NULL, NULL, NULL, NULL }; -churl_context *churl_new_context(void); +static churl_context *open_curl_handles; +static bool churl_resowner_callback_registered; + +static churl_context *churl_new_context(void); static void create_curl_handle(churl_context *context); static void set_curl_option(churl_context *context, CURLoption option, const void *data); static size_t read_callback(void *ptr, size_t size, size_t nmemb, void *userdata); @@ -118,7 +128,6 @@ static void enlarge_internal_buffer(churl_buffer *buffer, size_t required); static void finish_upload(churl_context *context); static void cleanup_curl_handle(churl_context *context); static void multi_remove_handle(churl_context *context); -static void cleanup_internal_buffer(churl_buffer *buffer); static void churl_cleanup_context(churl_context *context); static size_t write_callback(char *buffer, size_t size, size_t nitems, void *userp); static void fill_internal_buffer(churl_context *context, int want); @@ -313,8 +322,6 @@ churl_headers_cleanup(CHURL_HEADERS headers) if (settings->headers) curl_slist_free_all(settings->headers); - - pfree(settings); } /* @@ -358,7 +365,9 @@ log_curl_debug(CURL *handle, curl_infotype type, char *data, size_t size, void * static CHURL_HANDLE churl_init(const char *url, CHURL_HEADERS headers) { + churl_settings *settings = headers; churl_context *context = churl_new_context(); + context->churl_headers.headers = settings->headers; create_curl_handle(context); clear_error_buffer(context); @@ -452,7 +461,7 @@ size_t churl_write(CHURL_HANDLE handle, const char *buf, size_t bufsize) { churl_context *context = (churl_context *) handle; - churl_buffer *context_buffer = context->upload_buffer; + churl_buffer *context_buffer = &context->upload_buffer; Assert(context->upload); @@ -490,7 +499,7 @@ churl_read(CHURL_HANDLE handle, char *buf, size_t max_size) { int n = 0; churl_context *context = (churl_context *) handle; - churl_buffer *context_buffer = context->download_buffer; + churl_buffer *context_buffer = &context->download_buffer; Assert(!context->upload); @@ -532,18 +541,59 @@ churl_cleanup(CHURL_HANDLE handle, bool after_error) } cleanup_curl_handle(context); - cleanup_internal_buffer(context->download_buffer); - cleanup_internal_buffer(context->upload_buffer); + churl_headers_cleanup(&context->churl_headers); churl_cleanup_context(context); } -churl_context * -churl_new_context() +static void +churl_abort_callback(ResourceReleasePhase phase, + bool isCommit, + bool isTopLevel, + void *arg) { - churl_context *context = palloc0(sizeof(churl_context)); + churl_context *curr; + churl_context *next; + + if (phase != RESOURCE_RELEASE_AFTER_LOCKS) + return; + + next = open_curl_handles; + while (next) + { + curr = next; + next = curr->next; + + if (curr->owner == CurrentResourceOwner) + { + if (isCommit) + elog(LOG, "pxf reference leak: %p still referenced", curr); + + churl_cleanup(curr, !isCommit); + } + } +} + +static churl_context * +churl_new_context(void) +{ + churl_context *context = MemoryContextAllocZero(TopMemoryContext, sizeof(churl_context)); + + context->owner = CurrentResourceOwner; + + if (open_curl_handles) + { + context->next = open_curl_handles; + open_curl_handles->prev = context; + } + + open_curl_handles = context; + + if (!churl_resowner_callback_registered) + { + RegisterResourceReleaseCallback(churl_abort_callback, NULL); + churl_resowner_callback_registered = true; + } - context->download_buffer = palloc0(sizeof(churl_buffer)); - context->upload_buffer = palloc0(sizeof(churl_buffer)); return context; } @@ -582,7 +632,7 @@ static size_t read_callback(void *ptr, size_t size, size_t nmemb, void *userdata) { churl_context *context = (churl_context *) userdata; - churl_buffer *context_buffer = context->upload_buffer; + churl_buffer *context_buffer = &context->upload_buffer; int written = Min(size * nmemb, context_buffer->top - context_buffer->bot); @@ -645,7 +695,7 @@ internal_buffer_large_enough(churl_buffer *buffer, size_t required) static void flush_internal_buffer(churl_context *context) { - churl_buffer *context_buffer = context->upload_buffer; + churl_buffer *context_buffer = &context->upload_buffer; if (context_buffer->top == 0) return; @@ -661,8 +711,6 @@ flush_internal_buffer(churl_context *context) multi_perform(context); } - check_response(context); - if ((context->curl_still_running == 0) && ((context_buffer->top - context_buffer->bot) > 0)) elog(ERROR, "failed sending to remote component %s", get_dest_address(context->curl_handle)); @@ -746,36 +794,18 @@ multi_remove_handle(churl_context *context) curl_error, curl_easy_strerror(curl_error)); } -static void -cleanup_internal_buffer(churl_buffer *buffer) -{ - if ((buffer) && (buffer->ptr)) - { - pfree(buffer->ptr); - buffer->ptr = NULL; - buffer->bot = 0; - buffer->top = 0; - buffer->max = 0; - } -} - static void churl_cleanup_context(churl_context *context) { if (context) { - if (context->download_buffer) - { - if (context->download_buffer->ptr) - pfree(context->download_buffer->ptr); - pfree(context->download_buffer); - } - if (context->upload_buffer) - { - if (context->upload_buffer->ptr) - pfree(context->upload_buffer->ptr); - pfree(context->upload_buffer); - } + /* unlink from linked list first */ + if (context->prev) + context->prev->next = context->next; + else + open_curl_handles = open_curl_handles->next; + if (context->next) + context->next->prev = context->prev; pfree(context); } @@ -790,7 +820,7 @@ static size_t write_callback(char *buffer, size_t size, size_t nitems, void *userp) { churl_context *context = (churl_context *) userp; - churl_buffer *context_buffer = context->download_buffer; + churl_buffer *context_buffer = &context->download_buffer; const int nbytes = size * nitems; if (!internal_buffer_large_enough(context_buffer, nbytes)) @@ -822,7 +852,7 @@ fill_internal_buffer(churl_context *context, int want) /* attempt to fill buffer */ while (context->curl_still_running && - ((context->download_buffer->top - context->download_buffer->bot) < want)) + ((context->download_buffer.top - context->download_buffer.bot) < want)) { FD_ZERO(&fdread); FD_ZERO(&fdwrite); @@ -984,10 +1014,10 @@ check_response_code(churl_context *context) initStringInfo(&err); /* prepare response text if any */ - if (context->download_buffer->ptr) + if (context->download_buffer.ptr) { - context->download_buffer->ptr[context->download_buffer->top] = '\0'; - response_text = context->download_buffer->ptr + context->download_buffer->bot; + context->download_buffer.ptr[context->download_buffer.top] = '\0'; + response_text = context->download_buffer.ptr + context->download_buffer.bot; } appendStringInfo(&err, "PXF server error"); diff --git a/fdw/pxf_bridge.c b/fdw/pxf_bridge.c index b56a2510f3..dc777b25b3 100644 --- a/fdw/pxf_bridge.c +++ b/fdw/pxf_bridge.c @@ -43,10 +43,6 @@ PxfBridgeCleanup(PxfFdwModifyState *pxfmstate) return; churl_cleanup(pxfmstate->churl_handle, false); - pxfmstate->churl_handle = NULL; - - churl_headers_cleanup(pxfmstate->churl_headers); - pxfmstate->churl_headers = NULL; if (pxfmstate->uri.data) { @@ -59,6 +55,28 @@ PxfBridgeCleanup(PxfFdwModifyState *pxfmstate) } } +/* + * Clean up churl related data structures from the PXF FDW scan state. + */ +void +PxfBridgeScanCleanup(PxfFdwScanState *pxfsstate) +{ + if (pxfsstate == NULL) + return; + + churl_cleanup(pxfsstate->churl_handle, false); + + if (pxfsstate->uri.data) + { + pfree(pxfsstate->uri.data); + } + + if (pxfsstate->options) + { + pfree(pxfsstate->options); + } +} + /* * Sets up data before starting import */ diff --git a/fdw/pxf_bridge.h b/fdw/pxf_bridge.h index b8a866a497..6d2a1209e1 100644 --- a/fdw/pxf_bridge.h +++ b/fdw/pxf_bridge.h @@ -76,6 +76,7 @@ typedef struct PxfFdwModifyState /* Clean up churl related data structures from the context */ void PxfBridgeCleanup(PxfFdwModifyState *context); +void PxfBridgeScanCleanup(PxfFdwScanState *context); /* Sets up data before starting import */ void PxfBridgeImportStart(PxfFdwScanState *pxfsstate); diff --git a/fdw/pxf_fdw.c b/fdw/pxf_fdw.c index a4a1cc107a..c9672fba44 100644 --- a/fdw/pxf_fdw.c +++ b/fdw/pxf_fdw.c @@ -568,6 +568,8 @@ pxfEndForeignScan(ForeignScanState *node) if (pxfsstate) EndCopyFrom(pxfsstate->cstate); + PxfBridgeScanCleanup(pxfsstate); + elog(DEBUG5, "pxf_fdw: pxfEndForeignScan ends on segment: %d", PXF_SEGMENT_ID); }