Skip to content

Commit

Permalink
transaction
Browse files Browse the repository at this point in the history
  • Loading branch information
RekGRpth committed Oct 28, 2024
1 parent 7fabd15 commit d628f7c
Show file tree
Hide file tree
Showing 5 changed files with 78 additions and 32 deletions.
19 changes: 11 additions & 8 deletions src/diskquota.c
Original file line number Diff line number Diff line change
Expand Up @@ -962,6 +962,7 @@ create_monitor_db_table(void)
bool connected = false;
bool pushed_active_snap = false;
bool ret = true;
bool transaction = true;

/*
* Create function diskquota.diskquota_fetch_table_stat in launcher
Expand All @@ -988,7 +989,7 @@ create_monitor_db_table(void)
*/
PG_TRY();
{
SPI_connect_my(&connected, &pushed_active_snap, &ret);
SPI_connect_my(&connected, &pushed_active_snap, &ret, &transaction);

/* debug_query_string need to be set for SPI_execute utility functions. */
debug_query_string = sql;
Expand All @@ -1013,7 +1014,7 @@ create_monitor_db_table(void)
RESUME_INTERRUPTS();
}
PG_END_TRY();
SPI_finish_my(connected, pushed_active_snap, ret);
SPI_finish_my(connected, pushed_active_snap, ret, transaction);

debug_query_string = NULL;
}
Expand All @@ -1029,6 +1030,7 @@ init_database_list(void)
bool connected = false;
bool pushed_active_snap = false;
bool commit = true;
bool transaction = true;
int num = 0;
int ret;
int i;
Expand All @@ -1038,7 +1040,7 @@ init_database_list(void)
* startup worker for diskquota launcher. If error happens, we just let
* launcher exits.
*/
SPI_connect_my(&connected, &pushed_active_snap, &commit);
SPI_connect_my(&connected, &pushed_active_snap, &commit, &transaction);

ret = SPI_execute("select dbid from diskquota_namespace.database_list;", true, 0);
if (ret != SPI_OK_SELECT)
Expand Down Expand Up @@ -1108,7 +1110,7 @@ init_database_list(void)
update_monitor_db_mpp(dbEntry->dbid, ADD_DB_TO_MONITOR, LAUNCHER_SCHEMA);
}
}
SPI_finish_my(connected, pushed_active_snap, commit);
SPI_finish_my(connected, pushed_active_snap, commit, transaction);
/* TODO: clean invalid database */
if (num_db > diskquota_max_workers) DiskquotaLauncherShmem->isDynamicWorker = true;
}
Expand Down Expand Up @@ -1161,6 +1163,7 @@ do_process_extension_ddl_message(MessageResult *code, ExtensionDDLMessage local_
bool connected = false;
bool pushed_active_snap = false;
bool ret = true;
bool transaction = true;

/*
* Cache Errors during SPI functions, for example a segment may be down
Expand All @@ -1169,7 +1172,7 @@ do_process_extension_ddl_message(MessageResult *code, ExtensionDDLMessage local_
*/
PG_TRY();
{
SPI_connect_my(&connected, &pushed_active_snap, &ret);
SPI_connect_my(&connected, &pushed_active_snap, &ret, &transaction);

switch (local_extension_ddl_message.cmd)
{
Expand Down Expand Up @@ -1202,7 +1205,7 @@ do_process_extension_ddl_message(MessageResult *code, ExtensionDDLMessage local_
}
PG_END_TRY();

SPI_finish_my(connected, pushed_active_snap, ret);
SPI_finish_my(connected, pushed_active_snap, ret, transaction);

/* update something in memory after transaction committed */
if (ret)
Expand All @@ -1211,7 +1214,7 @@ do_process_extension_ddl_message(MessageResult *code, ExtensionDDLMessage local_
{
/* update_monitor_db_mpp runs sql to distribute dbid to segments */
Oid dbid = local_extension_ddl_message.dbid;
SPI_connect_my(&connected, &pushed_active_snap, &ret);
SPI_connect_my(&connected, &pushed_active_snap, &ret, &transaction);
switch (local_extension_ddl_message.cmd)
{
case CMD_CREATE_EXTENSION:
Expand Down Expand Up @@ -1244,7 +1247,7 @@ do_process_extension_ddl_message(MessageResult *code, ExtensionDDLMessage local_
}
PG_END_TRY();

SPI_finish_my(connected, pushed_active_snap, ret);
SPI_finish_my(connected, pushed_active_snap, ret, transaction);
}
DisconnectAndDestroyAllGangs(false);
}
Expand Down
4 changes: 2 additions & 2 deletions src/diskquota.h
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,6 @@ extern HTAB *DiskquotaShmemInitHash(const char *name, long init_size, long max_s
extern void refresh_monitored_dbid_cache(void);
extern HASHACTION check_hash_fullness(HTAB *hashp, int max_size, const char *warning_message,
TimestampTz *last_overflow_report);
void SPI_connect_my(bool *connected, bool *pushed_active_snap, bool *ret);
void SPI_finish_my(bool connected, bool pushed_active_snap, bool ret);
void SPI_connect_my(bool *connected, bool *pushed_active_snap, bool *ret, bool *transaction);
void SPI_finish_my(bool connected, bool pushed_active_snap, bool ret, bool transaction);
#endif
45 changes: 33 additions & 12 deletions src/diskquota_utility.c
Original file line number Diff line number Diff line change
Expand Up @@ -1238,9 +1238,10 @@ worker_spi_get_extension_version(int *major, int *minor)
bool connected = false;
bool pushed_active_snap = false;
bool commit = true;
bool transaction = true;
int ret;

SPI_connect_my(&connected, &pushed_active_snap, &commit);
SPI_connect_my(&connected, &pushed_active_snap, &commit, &transaction);

ret = SPI_execute("select extversion from pg_extension where extname = 'diskquota'", true, 0);

Expand Down Expand Up @@ -1285,7 +1286,7 @@ worker_spi_get_extension_version(int *major, int *minor)
ret = 0;

out:
SPI_finish_my(connected, pushed_active_snap, commit);
SPI_finish_my(connected, pushed_active_snap, commit, transaction);

return ret;
}
Expand All @@ -1304,6 +1305,12 @@ get_rel_oid_list(bool is_init)
{
List *oidlist = NIL;
int ret;
bool connected = false;
bool pushed_active_snap = false;
bool commit = true;
bool transaction = true;

SPI_connect_my(&connected, &pushed_active_snap, &commit, &transaction);

#define SELECT_FROM_PG_CATALOG_PG_CLASS "select oid from pg_catalog.pg_class where oid >= $1 and relkind in ('r', 'm')"

Expand Down Expand Up @@ -1336,9 +1343,10 @@ get_rel_oid_list(bool is_init)
oid = DatumGetObjectId(SPI_getbinval(tup, tupdesc, 1, &isnull));
if (!isnull)
{
List *indexIds;
oidlist = lappend_oid(oidlist, oid);
indexIds = diskquota_get_index_list(oid);
List *indexIds;
MemoryContext oldcontext = MemoryContextSwitchTo(TopMemoryContext);
oidlist = lappend_oid(oidlist, oid);
indexIds = diskquota_get_index_list(oid);
if (indexIds != NIL)
{
foreach (l, indexIds)
Expand All @@ -1347,8 +1355,10 @@ get_rel_oid_list(bool is_init)
}
}
list_free(indexIds);
MemoryContextSwitchTo(oldcontext);
}
}
SPI_finish_my(connected, pushed_active_snap, commit, transaction);
return oidlist;
}

Expand Down Expand Up @@ -1711,11 +1721,19 @@ check_hash_fullness(HTAB *hashp, int max_size, const char *warning_message, Time
}

void
SPI_connect_my(bool *connected, bool *pushed_active_snap, bool *ret)
SPI_connect_my(bool *connected, bool *pushed_active_snap, bool *ret, bool *transaction)
{
int rc;
SetCurrentStatementStartTimestamp();
StartTransactionCommand();
if (IsTransactionState())
{
*transaction = false;
}
else
{
StartTransactionCommand();
*transaction = true;
}
if ((rc = SPI_connect()) != SPI_OK_CONNECT)
ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("[diskquota] SPI_connect failed"),
errdetail("%s", SPI_result_code_string(rc))));
Expand All @@ -1726,15 +1744,18 @@ SPI_connect_my(bool *connected, bool *pushed_active_snap, bool *ret)
}

void
SPI_finish_my(bool connected, bool pushed_active_snap, bool ret)
SPI_finish_my(bool connected, bool pushed_active_snap, bool ret, bool transaction)
{
int rc;
if (pushed_active_snap) PopActiveSnapshot();
if (connected && (rc = SPI_finish()) != SPI_OK_FINISH)
ereport(WARNING, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("[diskquota] SPI_finish failed"),
errdetail("%s", SPI_result_code_string(rc))));
if (ret)
CommitTransactionCommand();
else
AbortCurrentTransaction();
if (transaction)
{
if (ret)
CommitTransactionCommand();
else
AbortCurrentTransaction();
}
}
9 changes: 8 additions & 1 deletion src/gp_activetable.c
Original file line number Diff line number Diff line change
Expand Up @@ -952,7 +952,13 @@ load_table_size(HTAB *local_table_stats_map)
ActiveTableEntryCombined *quota_entry;
SPIPlanPtr plan;
Portal portal;
char *sql = "select tableid, size, segid from diskquota.table_size";
char *sql = "select tableid, size, segid from diskquota.table_size";
bool connected = false;
bool pushed_active_snap = false;
bool commit = true;
bool transaction = true;

SPI_connect_my(&connected, &pushed_active_snap, &commit, &transaction);

if ((plan = SPI_prepare(sql, 0, NULL)) == NULL)
ereport(ERROR, (errmsg("[diskquota] SPI_prepare(\"%s\") failed", sql)));
Expand Down Expand Up @@ -1028,6 +1034,7 @@ load_table_size(HTAB *local_table_stats_map)
SPI_freetuptable(SPI_tuptable);
SPI_cursor_close(portal);
SPI_freeplan(plan);
SPI_finish_my(connected, pushed_active_snap, commit, transaction);
}

/*
Expand Down
33 changes: 24 additions & 9 deletions src/quotamodel.c
Original file line number Diff line number Diff line change
Expand Up @@ -677,6 +677,7 @@ check_diskquota_state_is_ready()
bool connected = false;
bool pushed_active_snap = false;
bool ret = true;
bool transaction = true;

/*
* Cache Errors during SPI functions, for example a segment may be down
Expand All @@ -685,7 +686,7 @@ check_diskquota_state_is_ready()
*/
PG_TRY();
{
SPI_connect_my(&connected, &pushed_active_snap, &ret);
SPI_connect_my(&connected, &pushed_active_snap, &ret, &transaction);
is_ready = do_check_diskquota_state_is_ready();
}
PG_CATCH();
Expand All @@ -699,7 +700,7 @@ check_diskquota_state_is_ready()
RESUME_INTERRUPTS();
}
PG_END_TRY();
SPI_finish_my(connected, pushed_active_snap, ret);
SPI_finish_my(connected, pushed_active_snap, ret, transaction);
return is_ready;
}

Expand Down Expand Up @@ -786,8 +787,6 @@ refresh_disk_quota_model(bool is_init)
static void
refresh_disk_quota_usage(bool is_init)
{
bool connected = false;
bool pushed_active_snap = false;
bool ret = true;
HTAB *local_active_table_stat_map = NULL;

Expand All @@ -798,7 +797,7 @@ refresh_disk_quota_usage(bool is_init)
*/
PG_TRY();
{
SPI_connect_my(&connected, &pushed_active_snap, &ret);
StartTransactionCommand();
/*
* initialization stage all the tables are active. later loop, only the
* tables whose disk size changed will be treated as active
Expand Down Expand Up @@ -838,7 +837,10 @@ refresh_disk_quota_usage(bool is_init)
RESUME_INTERRUPTS();
}
PG_END_TRY();
SPI_finish_my(connected, pushed_active_snap, ret);
if (ret)
CommitTransactionCommand();
else
AbortCurrentTransaction();
return;
}

Expand Down Expand Up @@ -1125,17 +1127,23 @@ delete_from_table_size_map(char *str)
{
StringInfoData delete_statement;
int ret;
bool connected = false;
bool pushed_active_snap = false;
bool commit = true;
bool transaction = true;

initStringInfo(&delete_statement);
appendStringInfo(&delete_statement,
"WITH deleted_table AS ( VALUES %s ) "
"delete from diskquota.table_size "
"where (tableid, segid) in ( SELECT * FROM deleted_table );",
str);
SPI_connect_my(&connected, &pushed_active_snap, &commit, &transaction);
ret = SPI_execute(delete_statement.data, false, 0);
if (ret != SPI_OK_DELETE)
ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR),
errmsg("[diskquota] delete_from_table_size_map SPI_execute failed: error code %d", ret)));
SPI_finish_my(connected, pushed_active_snap, commit, transaction);
pfree(delete_statement.data);
}

Expand All @@ -1144,13 +1152,19 @@ insert_into_table_size_map(char *str)
{
StringInfoData insert_statement;
int ret;
bool connected = false;
bool pushed_active_snap = false;
bool commit = true;
bool transaction = true;

initStringInfo(&insert_statement);
appendStringInfo(&insert_statement, "insert into diskquota.table_size values %s;", str);
SPI_connect_my(&connected, &pushed_active_snap, &commit, &transaction);
ret = SPI_execute(insert_statement.data, false, 0);
if (ret != SPI_OK_INSERT)
ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR),
errmsg("[diskquota] insert_into_table_size_map SPI_execute failed: error code %d", ret)));
SPI_finish_my(connected, pushed_active_snap, commit, transaction);
pfree(insert_statement.data);
}

Expand Down Expand Up @@ -1200,7 +1214,7 @@ flush_to_table_size(void)
}
}
/* update the table size by delete+insert in table table_size */
else if (TableSizeEntryGetFlushFlag(tsentry, i))
else // if (TableSizeEntryGetFlushFlag(tsentry, i))
{
appendStringInfo(&delete_statement, "%s(%u,%d)", (delete_entries_num == 0) ? " " : ", ",
tsentry->key.reloid, i);
Expand Down Expand Up @@ -1387,6 +1401,7 @@ load_quotas(void)
bool connected = false;
bool pushed_active_snap = false;
bool ret = true;
bool transaction = true;

/*
* Cache Errors during SPI functions, for example a segment may be down
Expand All @@ -1395,7 +1410,7 @@ load_quotas(void)
*/
PG_TRY();
{
SPI_connect_my(&connected, &pushed_active_snap, &ret);
SPI_connect_my(&connected, &pushed_active_snap, &ret, &transaction);
do_load_quotas();
}
PG_CATCH();
Expand All @@ -1409,7 +1424,7 @@ load_quotas(void)
RESUME_INTERRUPTS();
}
PG_END_TRY();
SPI_finish_my(connected, pushed_active_snap, ret);
SPI_finish_my(connected, pushed_active_snap, ret, transaction);
return ret;
}

Expand Down

0 comments on commit d628f7c

Please sign in to comment.