Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ADBDEV-6442: Refactor diskquota local_table_stats_map #32

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 28 additions & 44 deletions src/gp_activetable.c
Original file line number Diff line number Diff line change
Expand Up @@ -374,8 +374,8 @@ gp_fetch_active_tables(bool is_init)
Assert(Gp_role == GP_ROLE_DISPATCH);

memset(&ctl, 0, sizeof(ctl));
ctl.keysize = sizeof(TableEntryKey);
ctl.entrysize = sizeof(DiskQuotaActiveTableEntry);
ctl.keysize = sizeof(Oid);
ctl.entrysize = offsetof(ActiveTableEntryCombined, tablesize) + (SEGCOUNT + 1) * sizeof(Size);
ctl.hcxt = CurrentMemoryContext;

local_table_stats_map = diskquota_hash_create("local active table map with relfilenode info", 1024, &ctl,
Expand Down Expand Up @@ -946,14 +946,13 @@ get_active_tables_oid(void)
static void
load_table_size(HTAB *local_table_stats_map)
{
TupleDesc tupdesc;
int i;
bool found;
TableEntryKey key;
DiskQuotaActiveTableEntry *quota_entry;
SPIPlanPtr plan;
Portal portal;
char *sql = "select tableid, size, segid from diskquota.table_size";
TupleDesc tupdesc;
int i;
bool found;
ActiveTableEntryCombined *quota_entry;
SPIPlanPtr plan;
Portal portal;
char *sql = "select tableid, size, segid from diskquota.table_size";

if ((plan = SPI_prepare(sql, 0, NULL)) == NULL)
ereport(ERROR, (errmsg("[diskquota] SPI_prepare(\"%s\") failed", sql)));
Expand Down Expand Up @@ -1016,14 +1015,11 @@ load_table_size(HTAB *local_table_stats_map)
size = DatumGetInt64(dat);
dat = SPI_getbinval(tup, tupdesc, 3, &isnull);
if (isnull) continue;
segid = DatumGetInt16(dat);
key.reloid = reloid;
key.segid = segid;

quota_entry = (DiskQuotaActiveTableEntry *)hash_search(local_table_stats_map, &key, HASH_ENTER, &found);
quota_entry->reloid = reloid;
quota_entry->tablesize = size;
quota_entry->segid = segid;
segid = DatumGetInt16(dat);

quota_entry = (ActiveTableEntryCombined *)hash_search(local_table_stats_map, &reloid, HASH_ENTER, &found);
quota_entry->reloid = reloid;
quota_entry->tablesize[segid + 1] = size;
}
SPI_freetuptable(SPI_tuptable);
SPI_cursor_fetch(portal, true, 10000);
Expand Down Expand Up @@ -1164,12 +1160,11 @@ pull_active_table_size_from_seg(HTAB *local_table_stats_map, char *active_oid_ar
/* sum table size from each segment into local_table_stats_map */
for (i = 0; i < cdb_pgresults.numResults; i++)
{
Size tableSize;
bool found;
Oid reloid;
int segId;
TableEntryKey key;
DiskQuotaActiveTableEntry *entry;
Size tableSize;
bool found;
Oid reloid;
int segId;
ActiveTableEntryCombined *entry;

PGresult *pgresult = cdb_pgresults.pg_results[i];

Expand All @@ -1182,41 +1177,30 @@ pull_active_table_size_from_seg(HTAB *local_table_stats_map, char *active_oid_ar

for (j = 0; j < PQntuples(pgresult); j++)
{
reloid = atooid(PQgetvalue(pgresult, j, 0));
tableSize = (Size)atoll(PQgetvalue(pgresult, j, 1));
key.reloid = reloid;
reloid = atooid(PQgetvalue(pgresult, j, 0));
tableSize = (Size)atoll(PQgetvalue(pgresult, j, 1));
entry = (ActiveTableEntryCombined *)hash_search(local_table_stats_map, &reloid, HASH_ENTER, &found);

/* for diskquota extension version is 1.0, pgresult doesn't contain segid */
if (PQnfields(pgresult) == 3)
{
/* get the segid, tablesize for each table */
segId = atoi(PQgetvalue(pgresult, j, 2));
key.segid = segId;
entry = (DiskQuotaActiveTableEntry *)hash_search(local_table_stats_map, &key, HASH_ENTER, &found);

if (!found)
{
/* receive table size info from the first segment */
entry->reloid = reloid;
entry->segid = segId;
}
entry->tablesize = tableSize;
segId = atoi(PQgetvalue(pgresult, j, 2));
entry->tablesize[segId + 1] = tableSize;
}

/* when segid is -1, the tablesize is the sum of tablesize of master and all segments */
key.segid = -1;
entry = (DiskQuotaActiveTableEntry *)hash_search(local_table_stats_map, &key, HASH_ENTER, &found);
segId = -1;

if (!found)
{
/* receive table size info from the first segment */
entry->reloid = reloid;
entry->tablesize = tableSize;
entry->segid = -1;
entry->tablesize[segId + 1] = tableSize;
}
else
{
/* sum table size from all the segments */
entry->tablesize = entry->tablesize + tableSize;
entry->tablesize[segId + 1] += tableSize;
}
}
}
Expand Down
6 changes: 6 additions & 0 deletions src/gp_activetable.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,12 @@ typedef struct DiskQuotaActiveTableEntry
Size tablesize;
} DiskQuotaActiveTableEntry;

typedef struct ActiveTableEntryCombined
{
Oid reloid;
Size tablesize[1]; /* variable length array */
} ActiveTableEntryCombined;

extern HTAB *gp_fetch_active_tables(bool force);
extern void init_active_table_hook(void);
extern void init_shm_worker_active_tables(void);
Expand Down
54 changes: 26 additions & 28 deletions src/quotamodel.c
Original file line number Diff line number Diff line change
Expand Up @@ -912,19 +912,18 @@ merge_uncommitted_table_to_oidlist(List *oidlist)
static void
calculate_table_disk_usage(bool is_init, HTAB *local_active_table_stat_map)
{
bool table_size_map_found;
bool active_tbl_found;
int64 updated_total_size;
TableSizeEntry *tsentry = NULL;
Oid relOid;
HASH_SEQ_STATUS iter;
DiskQuotaActiveTableEntry *active_table_entry;
TableSizeEntryKey key;
TableEntryKey active_table_key;
List *oidlist;
ListCell *l;
int delete_entries_num = 0;
StringInfoData delete_statement;
bool table_size_map_found;
bool active_tbl_found;
int64 updated_total_size;
TableSizeEntry *tsentry = NULL;
Oid relOid;
HASH_SEQ_STATUS iter;
ActiveTableEntryCombined *active_table_entry;
TableSizeEntryKey key;
List *oidlist;
ListCell *l;
int delete_entries_num = 0;
StringInfoData delete_statement;

initStringInfo(&delete_statement);

Expand Down Expand Up @@ -1042,10 +1041,8 @@ calculate_table_disk_usage(bool is_init, HTAB *local_active_table_stat_map)

/* mark tsentry is_exist */
if (tsentry) set_table_size_entry_flag(tsentry, TABLE_EXIST);
active_table_key.reloid = relOid;
active_table_key.segid = cur_segid;
active_table_entry = (DiskQuotaActiveTableEntry *)hash_search(
local_active_table_stat_map, &active_table_key, HASH_FIND, &active_tbl_found);
active_table_entry = (ActiveTableEntryCombined *)hash_search(local_active_table_stat_map, &relOid,
HASH_FIND, &active_tbl_found);

/* skip to recalculate the tables which are not in active list */
if (active_tbl_found)
Expand All @@ -1055,15 +1052,16 @@ calculate_table_disk_usage(bool is_init, HTAB *local_active_table_stat_map)
/* pretend process as utility mode, and append the table size on master */
Gp_role = GP_ROLE_UTILITY;

active_table_entry->tablesize += calculate_table_size(relOid);
active_table_entry->tablesize[cur_segid + 1] += calculate_table_size(relOid);

Gp_role = GP_ROLE_DISPATCH;
}
/* firstly calculate the updated total size of a table */
updated_total_size = active_table_entry->tablesize - TableSizeEntryGetSize(tsentry, cur_segid);
updated_total_size =
active_table_entry->tablesize[cur_segid + 1] - TableSizeEntryGetSize(tsentry, cur_segid);

/* update the table_size entry */
TableSizeEntrySetSize(tsentry, cur_segid, active_table_entry->tablesize);
TableSizeEntrySetSize(tsentry, cur_segid, active_table_entry->tablesize[cur_segid + 1]);
TableSizeEntrySetFlushFlag(tsentry, cur_segid);

/* update the disk usage, there may be entries in the map whose keys are InvlidOid as the tsentry does
Expand Down Expand Up @@ -1345,14 +1343,14 @@ flush_local_reject_map(void)
static void
dispatch_rejectmap(HTAB *local_active_table_stat_map)
{
HASH_SEQ_STATUS hash_seq;
GlobalRejectMapEntry *rejectmap_entry;
DiskQuotaActiveTableEntry *active_table_entry;
int num_entries, count = 0;
CdbPgResults cdb_pgresults = {NULL, 0};
StringInfoData rows;
StringInfoData active_oids;
StringInfoData sql;
HASH_SEQ_STATUS hash_seq;
GlobalRejectMapEntry *rejectmap_entry;
ActiveTableEntryCombined *active_table_entry;
int num_entries, count = 0;
CdbPgResults cdb_pgresults = {NULL, 0};
StringInfoData rows;
StringInfoData active_oids;
StringInfoData sql;

initStringInfo(&rows);
initStringInfo(&active_oids);
Expand Down
Loading