Skip to content

Commit

Permalink
Refactor closing connection to pxf external-table and fdw
Browse files Browse the repository at this point in the history
The C-part of the PXF external table releases the context
(cleanup_context) only on the last call in the pxfprotocol_export
and pxfprotocol_import functions.
The C-part of the PXF fdw releases the context (PxfBridgeCleanup)
only in the FinishForeignModify function.
That is, on errors, the context is not released, including
curl-connections to the Java-part are not closed.
Therefore, I added a callback to release resources on errors,
which releases the context, including closing curl-connections.
  • Loading branch information
RekGRpth authored and andr-sokolov committed Oct 6, 2023
1 parent c6eed7e commit b244843
Show file tree
Hide file tree
Showing 6 changed files with 201 additions and 122 deletions.
144 changes: 88 additions & 56 deletions external-table/src/libchurl.c
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "utils/fmgroids.h"
#include "utils/guc.h"
#include "utils/jsonapi.h"
#include "utils/resowner.h"

/* include libcurl without typecheck.
* This allows wrapping curl_easy_setopt to be wrapped
Expand All @@ -46,9 +47,17 @@ typedef struct
} churl_buffer;

/*
* internal context of libchurl
* holds http header properties
*/
typedef struct
{
struct curl_slist *headers;
} churl_settings;

/*
* internal context of libchurl
*/
typedef struct churl_context
{
/* curl easy API handle */
CURL *curl_handle;
Expand All @@ -58,6 +67,8 @@ typedef struct
*/
CURLM *multi_handle;

churl_settings churl_headers;

/*
* curl API puts internal errors in this buffer used for error reporting
*/
Expand All @@ -70,10 +81,10 @@ typedef struct
int curl_still_running;

/* internal buffer for download */
churl_buffer *download_buffer;
churl_buffer download_buffer;

/* internal buffer for upload */
churl_buffer *upload_buffer;
churl_buffer upload_buffer;

/*
* holds http error code returned from remote server
Expand All @@ -82,15 +93,11 @@ typedef struct

/* true on upload, false on download */
bool upload;
} churl_context;

/*
* holds http header properties
*/
typedef struct
{
struct curl_slist *headers;
} churl_settings;
ResourceOwner owner; /* owner of this handle */
struct churl_context *next;
struct churl_context *prev;
} churl_context;

/* the null action object used for pure validation */
static JsonSemAction nullSemAction =
Expand All @@ -99,7 +106,10 @@ static JsonSemAction nullSemAction =
NULL, NULL, NULL, NULL, NULL
};

churl_context *churl_new_context(void);
static churl_context *open_curl_handles;
static bool churl_resowner_callback_registered;

static churl_context *churl_new_context(void);
static void create_curl_handle(churl_context *context);
static void set_curl_option(churl_context *context, CURLoption option, const void *data);
static size_t read_callback(void *ptr, size_t size, size_t nmemb, void *userdata);
Expand All @@ -112,7 +122,6 @@ static void enlarge_internal_buffer(churl_buffer *buffer, size_t required);
static void finish_upload(churl_context *context);
static void cleanup_curl_handle(churl_context *context);
static void multi_remove_handle(churl_context *context);
static void cleanup_internal_buffer(churl_buffer *buffer);
static void churl_cleanup_context(churl_context *context);
static size_t write_callback(char *buffer, size_t size, size_t nitems, void *userp);
static void fill_internal_buffer(churl_context *context, int want);
Expand Down Expand Up @@ -307,8 +316,6 @@ churl_headers_cleanup(CHURL_HEADERS headers)

if (settings->headers)
curl_slist_free_all(settings->headers);

pfree(settings);
}

/*
Expand Down Expand Up @@ -352,7 +359,9 @@ log_curl_debug(CURL *handle, curl_infotype type, char *data, size_t size, void *
static CHURL_HANDLE
churl_init(const char *url, CHURL_HEADERS headers)
{
churl_settings *settings = headers;
churl_context *context = churl_new_context();
context->churl_headers.headers = settings->headers;

create_curl_handle(context);
clear_error_buffer(context);
Expand Down Expand Up @@ -445,7 +454,7 @@ size_t
churl_write(CHURL_HANDLE handle, const char *buf, size_t bufsize)
{
churl_context *context = (churl_context *) handle;
churl_buffer *context_buffer = context->upload_buffer;
churl_buffer *context_buffer = &context->upload_buffer;

Assert(context->upload);

Expand Down Expand Up @@ -483,7 +492,7 @@ churl_read(CHURL_HANDLE handle, char *buf, size_t max_size)
{
int n = 0;
churl_context *context = (churl_context *) handle;
churl_buffer *context_buffer = context->download_buffer;
churl_buffer *context_buffer = &context->download_buffer;

Assert(!context->upload);

Expand Down Expand Up @@ -525,18 +534,59 @@ churl_cleanup(CHURL_HANDLE handle, bool after_error)
}

cleanup_curl_handle(context);
cleanup_internal_buffer(context->download_buffer);
cleanup_internal_buffer(context->upload_buffer);
churl_headers_cleanup(&context->churl_headers);
churl_cleanup_context(context);
}

churl_context *
churl_new_context()
static void
churl_abort_callback(ResourceReleasePhase phase,
bool isCommit,
bool isTopLevel,
void *arg)
{
churl_context *curr;
churl_context *next;

if (phase != RESOURCE_RELEASE_AFTER_LOCKS)
return;

next = open_curl_handles;
while (next)
{
curr = next;
next = curr->next;

if (curr->owner == CurrentResourceOwner)
{
if (isCommit)
elog(LOG, "pxf reference leak: %p still referenced", curr);

churl_cleanup(curr, !isCommit);
}
}
}

static churl_context *
churl_new_context(void)
{
churl_context *context = palloc0(sizeof(churl_context));
churl_context *context = MemoryContextAllocZero(TopMemoryContext, sizeof(churl_context));

context->owner = CurrentResourceOwner;

if (open_curl_handles)
{
context->next = open_curl_handles;
open_curl_handles->prev = context;
}

open_curl_handles = context;

if (!churl_resowner_callback_registered)
{
RegisterResourceReleaseCallback(churl_abort_callback, NULL);
churl_resowner_callback_registered = true;
}

context->download_buffer = palloc0(sizeof(churl_buffer));
context->upload_buffer = palloc0(sizeof(churl_buffer));
return context;
}

Expand Down Expand Up @@ -575,7 +625,7 @@ static size_t
read_callback(void *ptr, size_t size, size_t nmemb, void *userdata)
{
churl_context *context = (churl_context *) userdata;
churl_buffer *context_buffer = context->upload_buffer;
churl_buffer *context_buffer = &context->upload_buffer;

int written = Min(size * nmemb, context_buffer->top - context_buffer->bot);

Expand Down Expand Up @@ -638,7 +688,7 @@ internal_buffer_large_enough(churl_buffer *buffer, size_t required)
static void
flush_internal_buffer(churl_context *context)
{
churl_buffer *context_buffer = context->upload_buffer;
churl_buffer *context_buffer = &context->upload_buffer;

if (context_buffer->top == 0)
return;
Expand Down Expand Up @@ -736,36 +786,18 @@ multi_remove_handle(churl_context *context)
curl_error, curl_easy_strerror(curl_error));
}

static void
cleanup_internal_buffer(churl_buffer *buffer)
{
if ((buffer) && (buffer->ptr))
{
pfree(buffer->ptr);
buffer->ptr = NULL;
buffer->bot = 0;
buffer->top = 0;
buffer->max = 0;
}
}

static void
churl_cleanup_context(churl_context *context)
{
if (context)
{
if (context->download_buffer)
{
if (context->download_buffer->ptr)
pfree(context->download_buffer->ptr);
pfree(context->download_buffer);
}
if (context->upload_buffer)
{
if (context->upload_buffer->ptr)
pfree(context->upload_buffer->ptr);
pfree(context->upload_buffer);
}
/* unlink from linked list first */
if (context->prev)
context->prev->next = context->next;
else
open_curl_handles = open_curl_handles->next;
if (context->next)
context->next->prev = context->prev;

pfree(context);
}
Expand All @@ -780,7 +812,7 @@ static size_t
write_callback(char *buffer, size_t size, size_t nitems, void *userp)
{
churl_context *context = (churl_context *) userp;
churl_buffer *context_buffer = context->download_buffer;
churl_buffer *context_buffer = &context->download_buffer;
const int nbytes = size * nitems;

if (!internal_buffer_large_enough(context_buffer, nbytes))
Expand Down Expand Up @@ -812,7 +844,7 @@ fill_internal_buffer(churl_context *context, int want)

/* attempt to fill buffer */
while (context->curl_still_running &&
((context->download_buffer->top - context->download_buffer->bot) < want))
((context->download_buffer.top - context->download_buffer.bot) < want))
{
FD_ZERO(&fdread);
FD_ZERO(&fdwrite);
Expand Down Expand Up @@ -967,10 +999,10 @@ check_response_code(churl_context *context)
initStringInfo(&err);

/* prepare response text if any */
if (context->download_buffer->ptr)
if (context->download_buffer.ptr)
{
context->download_buffer->ptr[context->download_buffer->top] = '\0';
response_text = context->download_buffer->ptr + context->download_buffer->bot;
context->download_buffer.ptr[context->download_buffer.top] = '\0';
response_text = context->download_buffer.ptr + context->download_buffer.bot;
}

appendStringInfo(&err, "PXF server error");
Expand Down
4 changes: 0 additions & 4 deletions external-table/src/pxfbridge.c
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,6 @@ gpbridge_cleanup(gphadoop_context *context)
return;

churl_cleanup(context->churl_handle, false);
context->churl_handle = NULL;

churl_headers_cleanup(context->churl_headers);
context->churl_headers = NULL;

if (context->gphd_uri != NULL)
{
Expand Down
Loading

0 comments on commit b244843

Please sign in to comment.