From 92ca25bc31085a311f6fdef4cc0306212a212c5f Mon Sep 17 00:00:00 2001 From: Georgy Shelkovy Date: Fri, 1 Nov 2024 08:37:55 +0500 Subject: [PATCH] Use short-lived SPI contexts (#40) diskquota uses long-lived SPI contexts in most places: at the beginning it calls the SPI_connect function, which creates an SPI context and switches to its use, then it does a lot of work, including SPI_execute, and only at the very end it calls the SPI_finish function, which clears the SPI context. This usage architecture is incorrect and can lead to memory leaks while the SPI context is alive, because the SPI_execute function, for example, allocates memory in it for parsing a SQL query. Correct use of SPI involves opening and closing the SPI as close as possible, so that the SPI context is as short-lived as possible. This patch adds new wrappers for connecting and disconnecting functions from SPI and their usage, and moves the connection to SPI into functions closer to making requests over SPI. Ticket: ADBDEV-6577: --- src/diskquota.c | 93 ++++++++++----------------- src/diskquota.h | 2 + src/diskquota_utility.c | 137 +++++++++++++++++++++------------------- src/gp_activetable.c | 11 +--- src/quotamodel.c | 49 +++++--------- 5 files changed, 125 insertions(+), 167 deletions(-) diff --git a/src/diskquota.c b/src/diskquota.c index cc25b70a..50eb0260 100644 --- a/src/diskquota.c +++ b/src/diskquota.c @@ -176,8 +176,8 @@ static bool is_altering_extension_to_default_version(char *version) { int spi_ret; - bool ret = false; - SPI_connect(); + bool ret = false; + bool connected_in_this_function = SPI_connect_if_not_yet(); spi_ret = SPI_execute("select default_version from pg_available_extensions where name ='diskquota'", true, 0); if (spi_ret != SPI_OK_SELECT) elog(ERROR, "[diskquota] failed to select diskquota default version during diskquota update."); @@ -194,7 +194,7 @@ is_altering_extension_to_default_version(char *version) if (strcmp(version, default_version) == 0) ret = true; } } - SPI_finish(); + SPI_finish_if(connected_in_this_function); return ret; } @@ -959,9 +959,9 @@ static void create_monitor_db_table(void) { const char *sql; - bool connected = false; - bool pushed_active_snap = false; - bool ret = true; + bool connected_in_this_function = false; + bool pushed_active_snap = false; + bool ret = true; /* * Create function diskquota.diskquota_fetch_table_stat in launcher @@ -990,21 +990,14 @@ create_monitor_db_table(void) */ PG_TRY(); { - int ret_code = SPI_connect(); - if (ret_code != SPI_OK_CONNECT) - { - ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), - errmsg("[diskquota launcher] unable to connect to execute internal query. return code: %d.", - ret_code))); - } - connected = true; + connected_in_this_function = SPI_connect_if_not_yet(); PushActiveSnapshot(GetTransactionSnapshot()); pushed_active_snap = true; /* debug_query_string need to be set for SPI_execute utility functions. */ debug_query_string = sql; - ret_code = SPI_execute(sql, false, 0); + int ret_code = SPI_execute(sql, false, 0); if (ret_code != SPI_OK_UTILITY) { int saved_errno = errno; @@ -1024,7 +1017,7 @@ create_monitor_db_table(void) RESUME_INTERRUPTS(); } PG_END_TRY(); - if (connected) SPI_finish(); + SPI_finish_if(connected_in_this_function); if (pushed_active_snap) PopActiveSnapshot(); if (ret) CommitTransactionCommand(); @@ -1054,14 +1047,8 @@ init_database_list(void) StartTransactionCommand(); PushActiveSnapshot(GetTransactionSnapshot()); - ret = SPI_connect(); - if (ret != SPI_OK_CONNECT) - { - int saved_errno = errno; - ereport(ERROR, (errmsg("[diskquota launcher] SPI connect error, reason: %s, return code: %d.", - strerror(saved_errno), ret))); - } - ret = SPI_execute("select dbid from diskquota_namespace.database_list;", true, 0); + bool connected_in_this_function = SPI_connect_if_not_yet(); + ret = SPI_execute("select dbid from diskquota_namespace.database_list;", true, 0); if (ret != SPI_OK_SELECT) { int saved_errno = errno; @@ -1120,6 +1107,7 @@ init_database_list(void) } } num_db = num; + SPI_finish_if(connected_in_this_function); /* As update_monitor_db_mpp needs to execute sql, so can not put in the loop above */ for (int i = 0; i < diskquota_max_monitored_databases; i++) { @@ -1129,7 +1117,6 @@ init_database_list(void) update_monitor_db_mpp(dbEntry->dbid, ADD_DB_TO_MONITOR, LAUNCHER_SCHEMA); } } - SPI_finish(); PopActiveSnapshot(); CommitTransactionCommand(); /* TODO: clean invalid database */ @@ -1181,7 +1168,6 @@ static void do_process_extension_ddl_message(MessageResult *code, ExtensionDDLMessage local_extension_ddl_message) { int old_num_db = num_db; - bool connected = false; bool pushed_active_snap = false; bool ret = true; @@ -1194,13 +1180,6 @@ do_process_extension_ddl_message(MessageResult *code, ExtensionDDLMessage local_ */ PG_TRY(); { - int ret_code = SPI_connect(); - if (ret_code != SPI_OK_CONNECT) - { - ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), - errmsg("unable to connect to execute internal query. return code: %d.", ret_code))); - } - connected = true; PushActiveSnapshot(GetTransactionSnapshot()); pushed_active_snap = true; @@ -1235,7 +1214,6 @@ do_process_extension_ddl_message(MessageResult *code, ExtensionDDLMessage local_ } PG_END_TRY(); - if (connected) SPI_finish(); if (pushed_active_snap) PopActiveSnapshot(); if (ret) CommitTransactionCommand(); @@ -1251,12 +1229,6 @@ do_process_extension_ddl_message(MessageResult *code, ExtensionDDLMessage local_ PushActiveSnapshot(GetTransactionSnapshot()); pushed_active_snap = true; Oid dbid = local_extension_ddl_message.dbid; - int ret_code = SPI_connect(); - if (ret_code != SPI_OK_CONNECT) - { - ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), - errmsg("unable to connect to execute internal query. return code: %d.", ret_code))); - } switch (local_extension_ddl_message.cmd) { case CMD_CREATE_EXTENSION: @@ -1278,7 +1250,6 @@ do_process_extension_ddl_message(MessageResult *code, ExtensionDDLMessage local_ local_extension_ddl_message.cmd))); break; } - SPI_finish(); if (pushed_active_snap) PopActiveSnapshot(); CommitTransactionCommand(); } @@ -1372,8 +1343,9 @@ add_dbid_to_database_list(Oid dbid) { int ret; - Oid argt[1] = {OIDOID}; - Datum argv[1] = {ObjectIdGetDatum(dbid)}; + Oid argt[1] = {OIDOID}; + Datum argv[1] = {ObjectIdGetDatum(dbid)}; + bool connected_in_this_function = SPI_connect_if_not_yet(); ret = SPI_execute_with_args("select * from diskquota_namespace.database_list where dbid = $1", 1, argt, argv, NULL, true, 0); @@ -1391,7 +1363,7 @@ add_dbid_to_database_list(Oid dbid) ereport(WARNING, (errmsg("[diskquota launcher] database id %d is already actived, " "skip database_list update", dbid))); - return; + goto ret; } ret = SPI_execute_with_args("insert into diskquota_namespace.database_list values($1)", 1, argt, argv, NULL, false, @@ -1405,7 +1377,8 @@ add_dbid_to_database_list(Oid dbid) ret, strerror(saved_errno)))); } - return; +ret: + SPI_finish_if(connected_in_this_function); } /* @@ -1415,23 +1388,25 @@ add_dbid_to_database_list(Oid dbid) static void del_dbid_from_database_list(Oid dbid) { - int ret; + bool connected_in_this_function = SPI_connect_if_not_yet(); /* errors will be cached in outer function */ - ret = SPI_execute_with_args("delete from diskquota_namespace.database_list where dbid = $1", 1, - (Oid[]){ - OIDOID, - }, - (Datum[]){ - ObjectIdGetDatum(dbid), - }, - NULL, false, 0); + int ret = SPI_execute_with_args("delete from diskquota_namespace.database_list where dbid = $1", 1, + (Oid[]){ + OIDOID, + }, + (Datum[]){ + ObjectIdGetDatum(dbid), + }, + NULL, false, 0); if (ret != SPI_OK_DELETE) { int saved_errno = errno; ereport(ERROR, (errmsg("[diskquota launcher] del_dbid_from_database_list: reason: %s, ret_code: %d.", strerror(saved_errno), ret))); } + + SPI_finish_if(connected_in_this_function); } /* @@ -1625,10 +1600,8 @@ static const char * diskquota_status_schema_version() { static char ret_version[64]; - int ret = SPI_connect(); - Assert(ret = SPI_OK_CONNECT); - - ret = SPI_execute("select extversion from pg_extension where extname = 'diskquota'", true, 0); + bool connected_in_this_function = SPI_connect_if_not_yet(); + int ret = SPI_execute("select extversion from pg_extension where extname = 'diskquota'", true, 0); if (ret != SPI_OK_SELECT || SPI_processed != 1) { @@ -1655,11 +1628,11 @@ diskquota_status_schema_version() StrNCpy(ret_version, version, sizeof(ret_version) - 1); - SPI_finish(); + SPI_finish_if(connected_in_this_function); return ret_version; fail: - SPI_finish(); + SPI_finish_if(connected_in_this_function); return ""; } diff --git a/src/diskquota.h b/src/diskquota.h index 7c2bbb15..9c47acb5 100644 --- a/src/diskquota.h +++ b/src/diskquota.h @@ -319,4 +319,6 @@ extern HTAB *DiskquotaShmemInitHash(const char *name, long init_size, long max_s extern void refresh_monitored_dbid_cache(void); extern HASHACTION check_hash_fullness(HTAB *hashp, int max_size, const char *warning_message, TimestampTz *last_overflow_report); +bool SPI_connect_if_not_yet(void); +void SPI_finish_if(bool connected_in_this_function); #endif diff --git a/src/diskquota_utility.c b/src/diskquota_utility.c index f306cef8..252d32cd 100644 --- a/src/diskquota_utility.c +++ b/src/diskquota_utility.c @@ -46,6 +46,7 @@ #include "utils/faultinjector.h" #include "utils/fmgroids.h" #include "utils/formatting.h" +#include "utils/memutils.h" #include "utils/numeric.h" #include "libpq-fe.h" #include "funcapi.h" @@ -123,8 +124,6 @@ static void check_role(Oid roleoid, char *rolname, int64 quota_limit_mb); Datum init_table_size_table(PG_FUNCTION_ARGS) { - int ret; - RangeVar *rv; Relation rel; /* @@ -157,10 +156,10 @@ init_table_size_table(PG_FUNCTION_ARGS) * They do not work on entry db since we do not support dispatching * from entry-db currently. */ - SPI_connect(); + bool connected_in_this_function = SPI_connect_if_not_yet(); /* delete all the table size info in table_size if exist. */ - ret = SPI_execute("truncate table diskquota.table_size", false, 0); + int ret = SPI_execute("truncate table diskquota.table_size", false, 0); if (ret != SPI_OK_UTILITY) elog(ERROR, "cannot truncate table_size table: error code %d", ret); ret = SPI_execute( @@ -199,7 +198,7 @@ init_table_size_table(PG_FUNCTION_ARGS) NULL, false, 0); if (ret != SPI_OK_UPDATE) elog(ERROR, "cannot update state table: error code %d", ret); - SPI_finish(); + SPI_finish_if(connected_in_this_function); PG_RETURN_VOID(); } @@ -433,17 +432,10 @@ diskquota_pause(PG_FUNCTION_ARGS) { dbid = PG_GETARG_OID(0); } - if (IS_QUERY_DISPATCHER()) - { - // pause current worker - if (SPI_OK_CONNECT != SPI_connect()) - { - ereport(ERROR, - (errcode(ERRCODE_INTERNAL_ERROR), errmsg("[diskquota] unable to connect to execute SPI query"))); - } - update_monitor_db_mpp(dbid, PAUSE_DB_TO_MONITOR, EXTENSION_SCHEMA); - SPI_finish(); - } + + // pause current worker + if (IS_QUERY_DISPATCHER()) update_monitor_db_mpp(dbid, PAUSE_DB_TO_MONITOR, EXTENSION_SCHEMA); + PG_RETURN_VOID(); } @@ -466,16 +458,7 @@ diskquota_resume(PG_FUNCTION_ARGS) } // active current worker - if (IS_QUERY_DISPATCHER()) - { - if (SPI_OK_CONNECT != SPI_connect()) - { - ereport(ERROR, - (errcode(ERRCODE_INTERNAL_ERROR), errmsg("[diskquota] unable to connect to execute SPI query"))); - } - update_monitor_db_mpp(dbid, RESUME_DB_TO_MONITOR, EXTENSION_SCHEMA); - SPI_finish(); - } + if (IS_QUERY_DISPATCHER()) update_monitor_db_mpp(dbid, RESUME_DB_TO_MONITOR, EXTENSION_SCHEMA); PG_RETURN_VOID(); } @@ -486,7 +469,6 @@ diskquota_resume(PG_FUNCTION_ARGS) static bool is_database_empty(void) { - int ret; TupleDesc tupdesc; bool is_empty = false; @@ -494,9 +476,9 @@ is_database_empty(void) * If error happens in is_database_empty, just return error messages to * the client side. So there is no need to catch the error. */ - SPI_connect(); + bool connected_in_this_function = SPI_connect_if_not_yet(); - ret = SPI_execute( + int ret = SPI_execute( "INSERT INTO diskquota.state SELECT (count(relname) = 0)::int " "FROM " " pg_class AS c, " @@ -531,7 +513,7 @@ is_database_empty(void) /* * And finish our transaction. */ - SPI_finish(); + SPI_finish_if(connected_in_this_function); return is_empty; } @@ -688,9 +670,7 @@ set_role_quota(PG_FUNCTION_ARGS) } check_role(roleoid, rolname, quota_limit_mb); - SPI_connect(); set_quota_config_internal(roleoid, quota_limit_mb, ROLE_QUOTA, INVALID_SEGRATIO, InvalidOid); - SPI_finish(); PG_RETURN_VOID(); } @@ -721,9 +701,7 @@ set_schema_quota(PG_FUNCTION_ARGS) { ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("disk quota can not be set to 0 MB"))); } - SPI_connect(); set_quota_config_internal(namespaceoid, quota_limit_mb, NAMESPACE_QUOTA, INVALID_SEGRATIO, InvalidOid); - SPI_finish(); PG_RETURN_VOID(); } @@ -766,10 +744,8 @@ set_role_tablespace_quota(PG_FUNCTION_ARGS) } check_role(roleoid, rolname, quota_limit_mb); - SPI_connect(); row_id = set_target_internal(roleoid, spcoid, quota_limit_mb, ROLE_TABLESPACE_QUOTA); set_quota_config_internal(row_id, quota_limit_mb, ROLE_TABLESPACE_QUOTA, INVALID_SEGRATIO, spcoid); - SPI_finish(); PG_RETURN_VOID(); } @@ -810,10 +786,8 @@ set_schema_tablespace_quota(PG_FUNCTION_ARGS) ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("disk quota can not be set to 0 MB"))); } - SPI_connect(); row_id = set_target_internal(namespaceoid, spcoid, quota_limit_mb, NAMESPACE_TABLESPACE_QUOTA); set_quota_config_internal(row_id, quota_limit_mb, NAMESPACE_TABLESPACE_QUOTA, INVALID_SEGRATIO, spcoid); - SPI_finish(); PG_RETURN_VOID(); } @@ -833,6 +807,7 @@ set_quota_config_internal(Oid targetoid, int64 quota_limit_mb, QuotaType type, f /* Report error if diskquota is not ready. */ do_check_diskquota_state_is_ready(); + bool connected_in_this_function = SPI_connect_if_not_yet(); /* * If error happens in set_quota_config_internal, just return error messages to * the client side. So there is no need to catch the error. @@ -934,7 +909,7 @@ set_quota_config_internal(Oid targetoid, int64 quota_limit_mb, QuotaType type, f } } - return; + SPI_finish_if(connected_in_this_function); } static int @@ -944,7 +919,7 @@ set_target_internal(Oid primaryoid, Oid spcoid, int64 quota_limit_mb, QuotaType int row_id = -1; bool is_null = false; Datum v; - + bool connected_in_this_function = SPI_connect_if_not_yet(); /* * If error happens in set_target_internal, just return error messages to * the client side. So there is no need to catch the error. @@ -1025,6 +1000,9 @@ set_target_internal(Oid primaryoid, Oid spcoid, int64 quota_limit_mb, QuotaType Assert(is_null == false); row_id = DatumGetInt32(v); } + + SPI_finish_if(connected_in_this_function); + /* No need to update the target table */ return row_id; @@ -1172,10 +1150,7 @@ set_per_segment_quota(PG_FUNCTION_ARGS) ereportif(ratio == 0, ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("per segment quota ratio can not be set to 0"))); - if (SPI_OK_CONNECT != SPI_connect()) - { - ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("unable to connect to execute internal query"))); - } + bool connected_in_this_function = SPI_connect_if_not_yet(); /* * lock table quota_config table in exlusive mode * @@ -1228,7 +1203,7 @@ set_per_segment_quota(PG_FUNCTION_ARGS) /* * And finish our transaction. */ - SPI_finish(); + SPI_finish_if(connected_in_this_function); PG_RETURN_VOID(); } @@ -1236,11 +1211,10 @@ int worker_spi_get_extension_version(int *major, int *minor) { StartTransactionCommand(); - int ret = SPI_connect(); - Assert(ret = SPI_OK_CONNECT); + bool connected_in_this_function = SPI_connect_if_not_yet(); PushActiveSnapshot(GetTransactionSnapshot()); - ret = SPI_execute("select extversion from pg_extension where extname = 'diskquota'", true, 0); + int ret = SPI_execute("select extversion from pg_extension where extname = 'diskquota'", true, 0); if (SPI_processed == 0) { @@ -1283,7 +1257,7 @@ worker_spi_get_extension_version(int *major, int *minor) ret = 0; out: - SPI_finish(); + SPI_finish_if(connected_in_this_function); PopActiveSnapshot(); CommitTransactionCommand(); @@ -1302,23 +1276,23 @@ worker_spi_get_extension_version(int *major, int *minor) List * get_rel_oid_list(bool is_init) { - List *oidlist = NIL; - int ret; + List *oidlist = NIL; + bool connected_in_this_function = SPI_connect_if_not_yet(); #define SELECT_FROM_PG_CATALOG_PG_CLASS "select oid from pg_catalog.pg_class where oid >= $1 and relkind in ('r', 'm')" - ret = SPI_execute_with_args(is_init ? SELECT_FROM_PG_CATALOG_PG_CLASS - " union distinct" - " select tableid from diskquota.table_size where segid = -1" - : SELECT_FROM_PG_CATALOG_PG_CLASS, - 1, - (Oid[]){ - OIDOID, - }, - (Datum[]){ - ObjectIdGetDatum(FirstNormalObjectId), - }, - NULL, false, 0); + int ret = SPI_execute_with_args(is_init ? SELECT_FROM_PG_CATALOG_PG_CLASS + " union distinct" + " select tableid from diskquota.table_size where segid = -1" + : SELECT_FROM_PG_CATALOG_PG_CLASS, + 1, + (Oid[]){ + OIDOID, + }, + (Datum[]){ + ObjectIdGetDatum(FirstNormalObjectId), + }, + NULL, false, 0); #undef SELECT_FROM_PG_CATALOG_PG_CLASS @@ -1336,9 +1310,10 @@ get_rel_oid_list(bool is_init) oid = DatumGetObjectId(SPI_getbinval(tup, tupdesc, 1, &isnull)); if (!isnull) { - List *indexIds; - oidlist = lappend_oid(oidlist, oid); - indexIds = diskquota_get_index_list(oid); + List *indexIds; + MemoryContext oldcontext = MemoryContextSwitchTo(CurTransactionContext); + oidlist = lappend_oid(oidlist, oid); + indexIds = diskquota_get_index_list(oid); if (indexIds != NIL) { foreach (l, indexIds) @@ -1347,8 +1322,10 @@ get_rel_oid_list(bool is_init) } } list_free(indexIds); + MemoryContextSwitchTo(oldcontext); } } + SPI_finish_if(connected_in_this_function); return oidlist; } @@ -1587,6 +1564,7 @@ get_per_segment_ratio(Oid spcoid) if (!OidIsValid(spcoid)) return segratio; + bool connected_in_this_function = SPI_connect_if_not_yet(); /* * using row share lock to lock TABLESPACE_QUTAO * row to avoid concurrently updating the segratio @@ -1620,6 +1598,7 @@ get_per_segment_ratio(Oid spcoid) segratio = DatumGetFloat4(dat); } } + SPI_finish_if(connected_in_this_function); return segratio; } @@ -1709,3 +1688,29 @@ check_hash_fullness(HTAB *hashp, int max_size, const char *warning_message, Time return HASH_FIND; } + +bool +SPI_connect_if_not_yet(void) +{ + if (SPI_context()) return false; + + int rc = SPI_connect(); + + ereportif(rc != SPI_OK_CONNECT, ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), errmsg("[diskquota] SPI_connect failed"), + errdetail("%s", SPI_result_code_string(rc)))); + + return true; +} + +void +SPI_finish_if(bool connected_in_calling_function) +{ + if (!connected_in_calling_function || !SPI_context()) return; + + int rc = SPI_finish(); + + ereportif(rc != SPI_OK_FINISH, ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), errmsg("[diskquota] SPI_finish failed"), + errdetail("%s", SPI_result_code_string(rc)))); +} diff --git a/src/gp_activetable.c b/src/gp_activetable.c index d328c81a..b216a14f 100644 --- a/src/gp_activetable.c +++ b/src/gp_activetable.c @@ -447,13 +447,6 @@ diskquota_fetch_table_stat(PG_FUNCTION_ARGS) { MemoryContext oldcontext; TupleDesc tupdesc; - int ret_code = SPI_connect(); - if (ret_code != SPI_OK_CONNECT) - { - ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), - errmsg("unable to connect to execute internal query. return code: %d.", ret_code))); - } - SPI_finish(); /* create a function context for cross-call persistence */ funcctx = SRF_FIRSTCALL_INIT(); @@ -952,7 +945,8 @@ load_table_size(HTAB *local_table_stats_map) ActiveTableEntryCombined *quota_entry; SPIPlanPtr plan; Portal portal; - char *sql = "select tableid, size, segid from diskquota.table_size"; + char *sql = "select tableid, size, segid from diskquota.table_size"; + bool connected_in_this_function = SPI_connect_if_not_yet(); if ((plan = SPI_prepare(sql, 0, NULL)) == NULL) ereport(ERROR, (errmsg("[diskquota] SPI_prepare(\"%s\") failed", sql))); @@ -1028,6 +1022,7 @@ load_table_size(HTAB *local_table_stats_map) SPI_freetuptable(SPI_tuptable); SPI_cursor_close(portal); SPI_freeplan(plan); + SPI_finish_if(connected_in_this_function); } /* diff --git a/src/quotamodel.c b/src/quotamodel.c index d4dc8a90..2f45180c 100644 --- a/src/quotamodel.c +++ b/src/quotamodel.c @@ -671,10 +671,9 @@ vacuum_disk_quota_model(uint32 id) * Check whether the diskquota state is ready */ bool -check_diskquota_state_is_ready() +check_diskquota_state_is_ready(void) { bool is_ready = false; - bool connected = false; bool pushed_active_snap = false; bool ret = true; @@ -687,12 +686,6 @@ check_diskquota_state_is_ready() */ PG_TRY(); { - if (SPI_OK_CONNECT != SPI_connect()) - { - ereport(ERROR, - (errcode(ERRCODE_INTERNAL_ERROR), errmsg("[diskquota] unable to connect to execute SPI query"))); - } - connected = true; PushActiveSnapshot(GetTransactionSnapshot()); pushed_active_snap = true; is_ready = do_check_diskquota_state_is_ready(); @@ -708,7 +701,6 @@ check_diskquota_state_is_ready() RESUME_INTERRUPTS(); } PG_END_TRY(); - if (connected) SPI_finish(); if (pushed_active_snap) PopActiveSnapshot(); if (ret) CommitTransactionCommand(); @@ -732,7 +724,8 @@ do_check_diskquota_state_is_ready(void) { int ret; TupleDesc tupdesc; - ret = SPI_execute("select state from diskquota.state", true, 0); + bool connected_in_this_function = SPI_connect_if_not_yet(); + ret = SPI_execute("select state from diskquota.state", true, 0); ereportif(ret != SPI_OK_SELECT, ERROR, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("[diskquota] check diskquota state SPI_execute failed: error code %d", ret))); @@ -759,6 +752,8 @@ do_check_diskquota_state_is_ready(void) state = isnull ? DISKQUOTA_UNKNOWN_STATE : DatumGetInt32(dat); bool is_ready = state == DISKQUOTA_READY_STATE; + SPI_finish_if(connected_in_this_function); + if (!is_ready && !diskquota_is_readiness_logged()) { diskquota_set_readiness_logged(); @@ -800,7 +795,6 @@ refresh_disk_quota_model(bool is_init) static void refresh_disk_quota_usage(bool is_init) { - bool connected = false; bool pushed_active_snap = false; bool ret = true; HTAB *local_active_table_stat_map = NULL; @@ -814,12 +808,6 @@ refresh_disk_quota_usage(bool is_init) */ PG_TRY(); { - if (SPI_OK_CONNECT != SPI_connect()) - { - ereport(ERROR, - (errcode(ERRCODE_INTERNAL_ERROR), errmsg("[diskquota] unable to connect to execute SPI query"))); - } - connected = true; PushActiveSnapshot(GetTransactionSnapshot()); pushed_active_snap = true; /* @@ -861,7 +849,6 @@ refresh_disk_quota_usage(bool is_init) RESUME_INTERRUPTS(); } PG_END_TRY(); - if (connected) SPI_finish(); if (pushed_active_snap) PopActiveSnapshot(); if (ret) CommitTransactionCommand(); @@ -1153,7 +1140,6 @@ static void delete_from_table_size_map(char *str) { StringInfoData delete_statement; - int ret; initStringInfo(&delete_statement); appendStringInfo(&delete_statement, @@ -1161,10 +1147,12 @@ delete_from_table_size_map(char *str) "delete from diskquota.table_size " "where (tableid, segid) in ( SELECT * FROM deleted_table );", str); - ret = SPI_execute(delete_statement.data, false, 0); + bool connected_in_this_function = SPI_connect_if_not_yet(); + int ret = SPI_execute(delete_statement.data, false, 0); if (ret != SPI_OK_DELETE) ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("[diskquota] delete_from_table_size_map SPI_execute failed: error code %d", ret))); + SPI_finish_if(connected_in_this_function); pfree(delete_statement.data); } @@ -1172,14 +1160,15 @@ static void insert_into_table_size_map(char *str) { StringInfoData insert_statement; - int ret; initStringInfo(&insert_statement); appendStringInfo(&insert_statement, "insert into diskquota.table_size values %s;", str); - ret = SPI_execute(insert_statement.data, false, 0); + bool connected_in_this_function = SPI_connect_if_not_yet(); + int ret = SPI_execute(insert_statement.data, false, 0); if (ret != SPI_OK_INSERT) ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("[diskquota] insert_into_table_size_map SPI_execute failed: error code %d", ret))); + SPI_finish_if(connected_in_this_function); pfree(insert_statement.data); } @@ -1413,7 +1402,6 @@ truncateStringInfo(StringInfo str, int nchars) static bool load_quotas(void) { - bool connected = false; bool pushed_active_snap = false; bool ret = true; @@ -1426,13 +1414,6 @@ load_quotas(void) */ PG_TRY(); { - int ret_code = SPI_connect(); - if (ret_code != SPI_OK_CONNECT) - { - ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), - errmsg("[diskquota] unable to connect to execute SPI query, return code: %d", ret_code))); - } - connected = true; PushActiveSnapshot(GetTransactionSnapshot()); pushed_active_snap = true; do_load_quotas(); @@ -1448,7 +1429,6 @@ load_quotas(void) RESUME_INTERRUPTS(); } PG_END_TRY(); - if (connected) SPI_finish(); if (pushed_active_snap) PopActiveSnapshot(); if (ret) CommitTransactionCommand(); @@ -1475,6 +1455,7 @@ do_load_quotas(void) */ clean_all_quota_limit(); + bool connected_in_this_function = SPI_connect_if_not_yet(); /* * read quotas from diskquota.quota_config and target table */ @@ -1556,7 +1537,7 @@ do_load_quotas(void) } } - return; + SPI_finish_if(connected_in_this_function); } /* @@ -2301,7 +2282,9 @@ update_monitor_db_mpp(Oid dbid, FetchTableStatType action, const char *schema) "SELECT %s.diskquota_fetch_table_stat(%d, '{%d}'::oid[]) FROM gp_dist_random('gp_id')", schema, action, dbid); /* Add current database to the monitored db cache on all segments */ - int ret = SPI_execute(sql_command.data, true, 0); + bool connected_in_this_function = SPI_connect_if_not_yet(); + int ret = SPI_execute(sql_command.data, true, 0); + SPI_finish_if(connected_in_this_function); pfree(sql_command.data); ereportif(ret != SPI_OK_SELECT, ERROR,