From d628f7cb6275b1a96062ebe7176a9812db33fb40 Mon Sep 17 00:00:00 2001 From: Georgy Shelkovy Date: Mon, 28 Oct 2024 14:51:33 +0500 Subject: [PATCH] transaction --- src/diskquota.c | 19 +++++++++-------- src/diskquota.h | 4 ++-- src/diskquota_utility.c | 45 ++++++++++++++++++++++++++++++----------- src/gp_activetable.c | 9 ++++++++- src/quotamodel.c | 33 +++++++++++++++++++++--------- 5 files changed, 78 insertions(+), 32 deletions(-) diff --git a/src/diskquota.c b/src/diskquota.c index ff9101d8..3a852d25 100644 --- a/src/diskquota.c +++ b/src/diskquota.c @@ -962,6 +962,7 @@ create_monitor_db_table(void) bool connected = false; bool pushed_active_snap = false; bool ret = true; + bool transaction = true; /* * Create function diskquota.diskquota_fetch_table_stat in launcher @@ -988,7 +989,7 @@ create_monitor_db_table(void) */ PG_TRY(); { - SPI_connect_my(&connected, &pushed_active_snap, &ret); + SPI_connect_my(&connected, &pushed_active_snap, &ret, &transaction); /* debug_query_string need to be set for SPI_execute utility functions. */ debug_query_string = sql; @@ -1013,7 +1014,7 @@ create_monitor_db_table(void) RESUME_INTERRUPTS(); } PG_END_TRY(); - SPI_finish_my(connected, pushed_active_snap, ret); + SPI_finish_my(connected, pushed_active_snap, ret, transaction); debug_query_string = NULL; } @@ -1029,6 +1030,7 @@ init_database_list(void) bool connected = false; bool pushed_active_snap = false; bool commit = true; + bool transaction = true; int num = 0; int ret; int i; @@ -1038,7 +1040,7 @@ init_database_list(void) * startup worker for diskquota launcher. If error happens, we just let * launcher exits. */ - SPI_connect_my(&connected, &pushed_active_snap, &commit); + SPI_connect_my(&connected, &pushed_active_snap, &commit, &transaction); ret = SPI_execute("select dbid from diskquota_namespace.database_list;", true, 0); if (ret != SPI_OK_SELECT) @@ -1108,7 +1110,7 @@ init_database_list(void) update_monitor_db_mpp(dbEntry->dbid, ADD_DB_TO_MONITOR, LAUNCHER_SCHEMA); } } - SPI_finish_my(connected, pushed_active_snap, commit); + SPI_finish_my(connected, pushed_active_snap, commit, transaction); /* TODO: clean invalid database */ if (num_db > diskquota_max_workers) DiskquotaLauncherShmem->isDynamicWorker = true; } @@ -1161,6 +1163,7 @@ do_process_extension_ddl_message(MessageResult *code, ExtensionDDLMessage local_ bool connected = false; bool pushed_active_snap = false; bool ret = true; + bool transaction = true; /* * Cache Errors during SPI functions, for example a segment may be down @@ -1169,7 +1172,7 @@ do_process_extension_ddl_message(MessageResult *code, ExtensionDDLMessage local_ */ PG_TRY(); { - SPI_connect_my(&connected, &pushed_active_snap, &ret); + SPI_connect_my(&connected, &pushed_active_snap, &ret, &transaction); switch (local_extension_ddl_message.cmd) { @@ -1202,7 +1205,7 @@ do_process_extension_ddl_message(MessageResult *code, ExtensionDDLMessage local_ } PG_END_TRY(); - SPI_finish_my(connected, pushed_active_snap, ret); + SPI_finish_my(connected, pushed_active_snap, ret, transaction); /* update something in memory after transaction committed */ if (ret) @@ -1211,7 +1214,7 @@ do_process_extension_ddl_message(MessageResult *code, ExtensionDDLMessage local_ { /* update_monitor_db_mpp runs sql to distribute dbid to segments */ Oid dbid = local_extension_ddl_message.dbid; - SPI_connect_my(&connected, &pushed_active_snap, &ret); + SPI_connect_my(&connected, &pushed_active_snap, &ret, &transaction); switch (local_extension_ddl_message.cmd) { case CMD_CREATE_EXTENSION: @@ -1244,7 +1247,7 @@ do_process_extension_ddl_message(MessageResult *code, ExtensionDDLMessage local_ } PG_END_TRY(); - SPI_finish_my(connected, pushed_active_snap, ret); + SPI_finish_my(connected, pushed_active_snap, ret, transaction); } DisconnectAndDestroyAllGangs(false); } diff --git a/src/diskquota.h b/src/diskquota.h index 284ac6ee..7ec1e26c 100644 --- a/src/diskquota.h +++ b/src/diskquota.h @@ -319,6 +319,6 @@ extern HTAB *DiskquotaShmemInitHash(const char *name, long init_size, long max_s extern void refresh_monitored_dbid_cache(void); extern HASHACTION check_hash_fullness(HTAB *hashp, int max_size, const char *warning_message, TimestampTz *last_overflow_report); -void SPI_connect_my(bool *connected, bool *pushed_active_snap, bool *ret); -void SPI_finish_my(bool connected, bool pushed_active_snap, bool ret); +void SPI_connect_my(bool *connected, bool *pushed_active_snap, bool *ret, bool *transaction); +void SPI_finish_my(bool connected, bool pushed_active_snap, bool ret, bool transaction); #endif diff --git a/src/diskquota_utility.c b/src/diskquota_utility.c index 0862772a..fc06d754 100644 --- a/src/diskquota_utility.c +++ b/src/diskquota_utility.c @@ -1238,9 +1238,10 @@ worker_spi_get_extension_version(int *major, int *minor) bool connected = false; bool pushed_active_snap = false; bool commit = true; + bool transaction = true; int ret; - SPI_connect_my(&connected, &pushed_active_snap, &commit); + SPI_connect_my(&connected, &pushed_active_snap, &commit, &transaction); ret = SPI_execute("select extversion from pg_extension where extname = 'diskquota'", true, 0); @@ -1285,7 +1286,7 @@ worker_spi_get_extension_version(int *major, int *minor) ret = 0; out: - SPI_finish_my(connected, pushed_active_snap, commit); + SPI_finish_my(connected, pushed_active_snap, commit, transaction); return ret; } @@ -1304,6 +1305,12 @@ get_rel_oid_list(bool is_init) { List *oidlist = NIL; int ret; + bool connected = false; + bool pushed_active_snap = false; + bool commit = true; + bool transaction = true; + + SPI_connect_my(&connected, &pushed_active_snap, &commit, &transaction); #define SELECT_FROM_PG_CATALOG_PG_CLASS "select oid from pg_catalog.pg_class where oid >= $1 and relkind in ('r', 'm')" @@ -1336,9 +1343,10 @@ get_rel_oid_list(bool is_init) oid = DatumGetObjectId(SPI_getbinval(tup, tupdesc, 1, &isnull)); if (!isnull) { - List *indexIds; - oidlist = lappend_oid(oidlist, oid); - indexIds = diskquota_get_index_list(oid); + List *indexIds; + MemoryContext oldcontext = MemoryContextSwitchTo(TopMemoryContext); + oidlist = lappend_oid(oidlist, oid); + indexIds = diskquota_get_index_list(oid); if (indexIds != NIL) { foreach (l, indexIds) @@ -1347,8 +1355,10 @@ get_rel_oid_list(bool is_init) } } list_free(indexIds); + MemoryContextSwitchTo(oldcontext); } } + SPI_finish_my(connected, pushed_active_snap, commit, transaction); return oidlist; } @@ -1711,11 +1721,19 @@ check_hash_fullness(HTAB *hashp, int max_size, const char *warning_message, Time } void -SPI_connect_my(bool *connected, bool *pushed_active_snap, bool *ret) +SPI_connect_my(bool *connected, bool *pushed_active_snap, bool *ret, bool *transaction) { int rc; SetCurrentStatementStartTimestamp(); - StartTransactionCommand(); + if (IsTransactionState()) + { + *transaction = false; + } + else + { + StartTransactionCommand(); + *transaction = true; + } if ((rc = SPI_connect()) != SPI_OK_CONNECT) ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("[diskquota] SPI_connect failed"), errdetail("%s", SPI_result_code_string(rc)))); @@ -1726,15 +1744,18 @@ SPI_connect_my(bool *connected, bool *pushed_active_snap, bool *ret) } void -SPI_finish_my(bool connected, bool pushed_active_snap, bool ret) +SPI_finish_my(bool connected, bool pushed_active_snap, bool ret, bool transaction) { int rc; if (pushed_active_snap) PopActiveSnapshot(); if (connected && (rc = SPI_finish()) != SPI_OK_FINISH) ereport(WARNING, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("[diskquota] SPI_finish failed"), errdetail("%s", SPI_result_code_string(rc)))); - if (ret) - CommitTransactionCommand(); - else - AbortCurrentTransaction(); + if (transaction) + { + if (ret) + CommitTransactionCommand(); + else + AbortCurrentTransaction(); + } } diff --git a/src/gp_activetable.c b/src/gp_activetable.c index d328c81a..742d3425 100644 --- a/src/gp_activetable.c +++ b/src/gp_activetable.c @@ -952,7 +952,13 @@ load_table_size(HTAB *local_table_stats_map) ActiveTableEntryCombined *quota_entry; SPIPlanPtr plan; Portal portal; - char *sql = "select tableid, size, segid from diskquota.table_size"; + char *sql = "select tableid, size, segid from diskquota.table_size"; + bool connected = false; + bool pushed_active_snap = false; + bool commit = true; + bool transaction = true; + + SPI_connect_my(&connected, &pushed_active_snap, &commit, &transaction); if ((plan = SPI_prepare(sql, 0, NULL)) == NULL) ereport(ERROR, (errmsg("[diskquota] SPI_prepare(\"%s\") failed", sql))); @@ -1028,6 +1034,7 @@ load_table_size(HTAB *local_table_stats_map) SPI_freetuptable(SPI_tuptable); SPI_cursor_close(portal); SPI_freeplan(plan); + SPI_finish_my(connected, pushed_active_snap, commit, transaction); } /* diff --git a/src/quotamodel.c b/src/quotamodel.c index 73400f6d..2718fc53 100644 --- a/src/quotamodel.c +++ b/src/quotamodel.c @@ -677,6 +677,7 @@ check_diskquota_state_is_ready() bool connected = false; bool pushed_active_snap = false; bool ret = true; + bool transaction = true; /* * Cache Errors during SPI functions, for example a segment may be down @@ -685,7 +686,7 @@ check_diskquota_state_is_ready() */ PG_TRY(); { - SPI_connect_my(&connected, &pushed_active_snap, &ret); + SPI_connect_my(&connected, &pushed_active_snap, &ret, &transaction); is_ready = do_check_diskquota_state_is_ready(); } PG_CATCH(); @@ -699,7 +700,7 @@ check_diskquota_state_is_ready() RESUME_INTERRUPTS(); } PG_END_TRY(); - SPI_finish_my(connected, pushed_active_snap, ret); + SPI_finish_my(connected, pushed_active_snap, ret, transaction); return is_ready; } @@ -786,8 +787,6 @@ refresh_disk_quota_model(bool is_init) static void refresh_disk_quota_usage(bool is_init) { - bool connected = false; - bool pushed_active_snap = false; bool ret = true; HTAB *local_active_table_stat_map = NULL; @@ -798,7 +797,7 @@ refresh_disk_quota_usage(bool is_init) */ PG_TRY(); { - SPI_connect_my(&connected, &pushed_active_snap, &ret); + StartTransactionCommand(); /* * initialization stage all the tables are active. later loop, only the * tables whose disk size changed will be treated as active @@ -838,7 +837,10 @@ refresh_disk_quota_usage(bool is_init) RESUME_INTERRUPTS(); } PG_END_TRY(); - SPI_finish_my(connected, pushed_active_snap, ret); + if (ret) + CommitTransactionCommand(); + else + AbortCurrentTransaction(); return; } @@ -1125,6 +1127,10 @@ delete_from_table_size_map(char *str) { StringInfoData delete_statement; int ret; + bool connected = false; + bool pushed_active_snap = false; + bool commit = true; + bool transaction = true; initStringInfo(&delete_statement); appendStringInfo(&delete_statement, @@ -1132,10 +1138,12 @@ delete_from_table_size_map(char *str) "delete from diskquota.table_size " "where (tableid, segid) in ( SELECT * FROM deleted_table );", str); + SPI_connect_my(&connected, &pushed_active_snap, &commit, &transaction); ret = SPI_execute(delete_statement.data, false, 0); if (ret != SPI_OK_DELETE) ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("[diskquota] delete_from_table_size_map SPI_execute failed: error code %d", ret))); + SPI_finish_my(connected, pushed_active_snap, commit, transaction); pfree(delete_statement.data); } @@ -1144,13 +1152,19 @@ insert_into_table_size_map(char *str) { StringInfoData insert_statement; int ret; + bool connected = false; + bool pushed_active_snap = false; + bool commit = true; + bool transaction = true; initStringInfo(&insert_statement); appendStringInfo(&insert_statement, "insert into diskquota.table_size values %s;", str); + SPI_connect_my(&connected, &pushed_active_snap, &commit, &transaction); ret = SPI_execute(insert_statement.data, false, 0); if (ret != SPI_OK_INSERT) ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("[diskquota] insert_into_table_size_map SPI_execute failed: error code %d", ret))); + SPI_finish_my(connected, pushed_active_snap, commit, transaction); pfree(insert_statement.data); } @@ -1200,7 +1214,7 @@ flush_to_table_size(void) } } /* update the table size by delete+insert in table table_size */ - else if (TableSizeEntryGetFlushFlag(tsentry, i)) + else // if (TableSizeEntryGetFlushFlag(tsentry, i)) { appendStringInfo(&delete_statement, "%s(%u,%d)", (delete_entries_num == 0) ? " " : ", ", tsentry->key.reloid, i); @@ -1387,6 +1401,7 @@ load_quotas(void) bool connected = false; bool pushed_active_snap = false; bool ret = true; + bool transaction = true; /* * Cache Errors during SPI functions, for example a segment may be down @@ -1395,7 +1410,7 @@ load_quotas(void) */ PG_TRY(); { - SPI_connect_my(&connected, &pushed_active_snap, &ret); + SPI_connect_my(&connected, &pushed_active_snap, &ret, &transaction); do_load_quotas(); } PG_CATCH(); @@ -1409,7 +1424,7 @@ load_quotas(void) RESUME_INTERRUPTS(); } PG_END_TRY(); - SPI_finish_my(connected, pushed_active_snap, ret); + SPI_finish_my(connected, pushed_active_snap, ret, transaction); return ret; }