Skip to content

[Feature] refactor pdml code. #449

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 45 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
b43913f
Merge pull request #196 from ApsaraDB/POLARDB_11_DEV
fengzunbao Feb 28, 2022
57ba212
Merge pull request #198 from ApsaraDB/POLARDB_11_DEV
fengzunbao Mar 15, 2022
674ee0e
Merge pull request #201 from ApsaraDB/POLARDB_11_DEV
fengzunbao Mar 24, 2022
c4a5ada
Merge pull request #204 from ApsaraDB/POLARDB_11_DEV
fengzunbao Mar 28, 2022
72fe095
Merge pull request #210 from ApsaraDB/POLARDB_11_DEV
fengzunbao Apr 1, 2022
14ee112
Merge pull request #214 from ApsaraDB/POLARDB_11_DEV
fengzunbao Apr 4, 2022
a8418a8
Merge pull request #218 from ApsaraDB/POLARDB_11_DEV
mrdrivingduck Apr 6, 2022
398902a
Merge pull request #221 from ApsaraDB/POLARDB_11_DEV
mrdrivingduck Apr 14, 2022
73c86a1
Merge pull request #225 from ApsaraDB/POLARDB_11_DEV
mrdrivingduck May 12, 2022
2474f42
Merge pull request #227 from ApsaraDB/POLARDB_11_DEV
mrdrivingduck May 12, 2022
bba25c1
[Docs] updateCopyright date in NOTICE file
fengzunbao May 13, 2022
8fd8603
Merge pull request #229 from ApsaraDB/fengzunbao-patch-1
fengzunbao May 13, 2022
2a0a926
Merge pull request #236 from ApsaraDB/POLARDB_11_DEV
fengzunbao May 19, 2022
5ca943c
Merge pull request #242 from ApsaraDB/POLARDB_11_DEV
fengzunbao May 23, 2022
1cd5c76
Merge pull request #251 from ApsaraDB/POLARDB_11_DEV
fengzunbao Jun 10, 2022
89af102
Merge pull request #260 from ApsaraDB/POLARDB_11_DEV
mrdrivingduck Jun 15, 2022
887beb4
Merge pull request #263 from ApsaraDB/POLARDB_11_DEV
fengzunbao Jun 22, 2022
49110b0
Merge pull request #266 from ApsaraDB/POLARDB_11_DEV
fengzunbao Jun 29, 2022
a165263
Merge pull request #269 from ApsaraDB/POLARDB_11_DEV
fengzunbao Jun 29, 2022
87cc978
Merge pull request #273 from ApsaraDB:POLARDB_11_DEV
polardb-bot[bot] Jul 8, 2022
39360e5
Merge pull request #289 from ApsaraDB:POLARDB_11_DEV
polardb-bot[bot] Aug 10, 2022
c93ed95
Merge pull request #296 from ApsaraDB:POLARDB_11_DEV
polardb-bot[bot] Aug 27, 2022
a7748dc
Merge pull request #309 from ApsaraDB:POLARDB_11_DEV
polardb-bot[bot] Oct 14, 2022
d058bf8
Merge pull request #314 from ApsaraDB:POLARDB_11_DEV
polardb-bot[bot] Nov 10, 2022
8b51477
Merge pull request #318 from ApsaraDB/POLARDB_11_DEV
mrdrivingduck Dec 19, 2022
dc23dae
Merge pull request #320 from ApsaraDB/POLARDB_11_DEV
mrdrivingduck Dec 22, 2022
a2b30a2
Merge pull request #322 from ApsaraDB/POLARDB_11_DEV
mrdrivingduck Dec 23, 2022
24ee43c
Merge pull request #327 from ApsaraDB/POLARDB_11_DEV
mrdrivingduck Jan 4, 2023
928a156
Merge pull request #331 from ApsaraDB/POLARDB_11_DEV
mrdrivingduck Jan 16, 2023
3f150f4
Merge pull request #337 from ApsaraDB/POLARDB_11_DEV
mrdrivingduck Jan 31, 2023
87bf56e
Merge pull request #352 from ApsaraDB/POLARDB_11_DEV
mrdrivingduck Feb 12, 2023
3f1d4c8
Merge pull request #365 from ApsaraDB/POLARDB_11_DEV
mrdrivingduck Mar 7, 2023
c2495e6
Merge pull request #369 from ApsaraDB/POLARDB_11_DEV
mrdrivingduck Mar 25, 2023
adca767
Merge pull request #374 from ApsaraDB/POLARDB_11_DEV
mrdrivingduck Apr 12, 2023
512dc5b
Merge pull request #377 from ApsaraDB/POLARDB_11_DEV
mrdrivingduck Apr 21, 2023
5c5e5ae
Merge pull request #398 from ApsaraDB/POLARDB_11_DEV
mrdrivingduck Jul 13, 2023
c39c0dd
Merge pull request #404 from ApsaraDB/POLARDB_11_DEV
mrdrivingduck Jul 19, 2023
6237a5e
Merge pull request #410 from ApsaraDB/POLARDB_11_DEV
mrdrivingduck Jul 31, 2023
9844176
Merge pull request #417 from ApsaraDB/POLARDB_11_DEV
mrdrivingduck Aug 15, 2023
3beb381
Merge pull request #428 from ApsaraDB/POLARDB_11_DEV
mrdrivingduck Sep 8, 2023
bf8b5b3
Merge pull request #432 from ApsaraDB/POLARDB_11_DEV
mrdrivingduck Sep 20, 2023
ec10e26
Merge pull request #438 from ApsaraDB/POLARDB_11_DEV
mrdrivingduck Oct 16, 2023
45933cb
Merge pull request #441 from ApsaraDB/POLARDB_11_DEV
mrdrivingduck Oct 22, 2023
2a9496d
Merge pull request #446 from ApsaraDB/POLARDB_11_DEV
mrdrivingduck Nov 8, 2023
6bf3bb7
[Bug] refactor pdml
HBKO Nov 9, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion NOTICE
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
PolarDB for PostgreSQL

Copyright (c) 2020, Alibaba Group Holding Limited
Copyright (c) 2021-2022, Alibaba Group Holding Limited

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down
2 changes: 1 addition & 1 deletion src/backend/executor/execUtils_px.c
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ FillSliceGangInfo(ExecSlice *slice, int numsegments, DirectDispatchInfo *dd)
slice->segments = NIL;

for(k = 0;k < numsegments; k++)
slice->segments = lappend_int(slice->segments,RW_SEGMENT);
slice->segments = lappend_int(slice->segments, k);
break;
case GANGTYPE_PRIMARY_READER:
slice->planNumSegments = numsegments;
Expand Down
22 changes: 9 additions & 13 deletions src/backend/px/dispatcher/px_conn.c
Original file line number Diff line number Diff line change
Expand Up @@ -99,10 +99,7 @@ pxconn_createWorkerDescriptor(struct PxNodeInfo *pxinfo, int identifier,
pxWorkerDesc->identifier = identifier;
pxWorkerDesc->logicalWorkerInfo.total_count = logicalTotalWorkers;
/* POLAR px */
if (MASTER_CONTENT_ID == logicalWorkerIdx && RW_COUNTER_START <= identifier)
pxWorkerDesc->logicalWorkerInfo.idx = (identifier % RW_COUNTER_START) % logicalTotalWorkers;
else
pxWorkerDesc->logicalWorkerInfo.idx = logicalWorkerIdx;
pxWorkerDesc->logicalWorkerInfo.idx = logicalWorkerIdx;
/* POLAR end */

MemoryContextSwitchTo(oldContext);
Expand All @@ -118,10 +115,7 @@ pxconn_termWorkerDescriptor(PxWorkerDescriptor *pxWorkerDesc)
px_nodes = pxWorkerDesc->pxNodeInfo->px_nodes;

/* put px identifier to free list for reuse */
if (RW_COUNTER_START > pxWorkerDesc->identifier)
{
px_nodes->freeCounterList = lappend_int(px_nodes->freeCounterList, pxWorkerDesc->identifier);
}
px_nodes->freeCounterList = lappend_int(px_nodes->freeCounterList, pxWorkerDesc->identifier);

pxconn_disconnect(pxWorkerDesc);

Expand All @@ -139,7 +133,8 @@ pxconn_termWorkerDescriptor(PxWorkerDescriptor *pxWorkerDesc)
void
pxconn_doConnectStart(PxWorkerDescriptor *pxWorkerDesc,
const char *pxid,
const char *options)
const char *options,
SegmentType segmentType)
{
#define MAX_KEYWORDS 10
#define MAX_INT_STRING_LEN 20
Expand All @@ -148,6 +143,7 @@ pxconn_doConnectStart(PxWorkerDescriptor *pxWorkerDesc,
const char *values[MAX_KEYWORDS];
char portstr[MAX_INT_STRING_LEN];
int nkeywords = 0;
bool should_be_localhost = false;

keywords[nkeywords] = "pxid";
values[nkeywords] = pxid;
Expand All @@ -170,10 +166,10 @@ pxconn_doConnectStart(PxWorkerDescriptor *pxWorkerDesc,
*
* For other PX connections, we set "hostaddr". "host" is not used.
*/
if ((px_role == PX_ROLE_QC &&
(pxWorkerDesc->logicalWorkerInfo.idx == MASTER_CONTENT_ID ||
pxWorkerDesc->identifier >= RW_COUNTER_START)) ||
FAULT_COND(SIMPLE_FAULT_INJECTOR("hostaddr_info") == FaultInjectorTypeEnable))
/* When EntryDB or pdml, hostaddr should be localhost(127.0.0.1) */
should_be_localhost = (pxWorkerDesc->logicalWorkerInfo.idx == MASTER_CONTENT_ID)
|| (segmentType == SEGMENTTYPE_EXPLICT_WRITER);
if (px_role == PX_ROLE_QC && should_be_localhost)
{
keywords[nkeywords] = "hostaddr";
values[nkeywords] = "127.0.0.1";
Expand Down
4 changes: 2 additions & 2 deletions src/backend/px/dispatcher/px_gang.c
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,7 @@ makeOptions(void)

Assert(px_role == PX_ROLE_QC);

qdinfo = pxnode_getPxNodeInfo(MASTER_CONTENT_ID);
qdinfo = pxnode_getPxNodeInfo(MASTER_CONTENT_ID, SEGMENTTYPE_ANY);
appendStringInfo(&string, " -c polar_px_qc_hostname=%s", qdinfo->config->hostip);
appendStringInfo(&string, " -c polar_px_qc_port=%d", qdinfo->config->port);

Expand Down Expand Up @@ -639,7 +639,7 @@ getPxProcessesForQC(int isPrimary)
elog(FATAL, "getPxProcessesForQC: unsupported request for master mirror process");
}

qdinfo = pxnode_getPxNodeInfo(MASTER_CONTENT_ID);
qdinfo = pxnode_getPxNodeInfo(MASTER_CONTENT_ID, SEGMENTTYPE_ANY);

Assert(qdinfo->config->node_idx == -1);
Assert(qdinfo->config->hostip != NULL);
Expand Down
2 changes: 1 addition & 1 deletion src/backend/px/dispatcher/px_gang_async.c
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ pxgang_createGang_async(List *segments, SegmentType segmentType)
options = makeOptions();

/* start connection in asynchronous way */
pxconn_doConnectStart(pxWorkerDesc, pxid, options);
pxconn_doConnectStart(pxWorkerDesc, pxid, options, segmentType);

if (pxconn_isBadConnection(pxWorkerDesc))
ereport(ERROR, (errcode(ERRCODE_PX_INTERCONNECTION_ERROR),
Expand Down
54 changes: 17 additions & 37 deletions src/backend/px/px_util.c
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ static int px_node_configs_generation = POLAR_CLUSTER_INFO_INVALID_GENERATION
static MemoryContext px_worker_context = NULL;
static PxNodeConfigEntry *px_node_configs = NULL;

static int nextPXIdentifer(PxNodes *px_nodes, bool isRW);
static int nextPXIdentifer(PxNodes *px_nodes);
static void GeneratePxNodeConfigs(void);

/* NB: all extern function should switch to this context */
Expand Down Expand Up @@ -249,7 +249,7 @@ GeneratePxNodeConfigs(void)
qc_config->node_idx = -1;
qc_config->dop = 1;
qc_config->port = PostPortNumber;
qc_config->hostip = "127.0.0.1";
qc_config->hostip = "";

px_node_configs = configs;
px_node_configs_size = idx;
Expand Down Expand Up @@ -305,7 +305,6 @@ pxnode_getPxNodes()
px_nodes->numActivePXs = 0;
px_nodes->numIdlePXs = 0;
px_nodes->pxCounter = 0;
px_nodes->rwCounter = RW_COUNTER_START;
px_nodes->freeCounterList = NIL;

px_nodes->pxInfo =
Expand Down Expand Up @@ -368,10 +367,10 @@ pxnode_allocateIdlePX(int logicalWorkerIdx, int logicalTotalWorkers, SegmentType

if (logicalWorkerIdx == -1)
{
pxinfo = pxnode_getPxNodeInfo(-1);
pxinfo = pxnode_getPxNodeInfo(-1, segmentType);
logicalTotalWorkers = getPxWorkerCount();
} else
pxinfo = pxnode_getPxNodeInfo(logicalWorkerIdx);
pxinfo = pxnode_getPxNodeInfo(logicalWorkerIdx, segmentType);

if (pxinfo == NULL)
{
Expand Down Expand Up @@ -414,21 +413,10 @@ pxnode_allocateIdlePX(int logicalWorkerIdx, int logicalTotalWorkers, SegmentType
/* POLAR px */
if (!pxWorkerDesc)
{
if (RW_SEGMENT == logicalWorkerIdx)
{
/* RW */
pxWorkerDesc = pxconn_createWorkerDescriptor(pxinfo,
nextPXIdentifer(pxinfo->px_nodes, true),
MASTER_CONTENT_ID,
logicalTotalWorkers);
}
else
{
pxWorkerDesc = pxconn_createWorkerDescriptor(pxinfo,
nextPXIdentifer(pxinfo->px_nodes, false),
logicalWorkerIdx,
logicalTotalWorkers);
}
pxWorkerDesc = pxconn_createWorkerDescriptor(pxinfo,
nextPXIdentifer(pxinfo->px_nodes),
logicalWorkerIdx,
logicalTotalWorkers);
}
/* POLAR end */

Expand Down Expand Up @@ -504,33 +492,25 @@ pxnode_recycleIdlePX(PxWorkerDescriptor *pxWorkerDesc, bool forceDestroy)
}

static int
nextPXIdentifer(PxNodes *px_nodes, bool isRW)
nextPXIdentifer(PxNodes *px_nodes)
{
int result;
if (isRW)
{
return px_nodes->rwCounter++;
}
else
{
if (!px_nodes->freeCounterList)
{
result = px_nodes->pxCounter;
px_nodes->pxCounter = (px_nodes->pxCounter + 1) % RW_COUNTER_START;
return result;
}

result = linitial_int(px_nodes->freeCounterList);
px_nodes->freeCounterList = list_delete_first(px_nodes->freeCounterList);
if (!px_nodes->freeCounterList)
{
result = px_nodes->pxCounter++;
return result;
}
result = linitial_int(px_nodes->freeCounterList);
px_nodes->freeCounterList = list_delete_first(px_nodes->freeCounterList);
return result;
}

/*
* Find PxNodeInfo in the array by segment index.
*/
PxNodeInfo *
pxnode_getPxNodeInfo(int contentId)
pxnode_getPxNodeInfo(int contentId, SegmentType segmentType)
{
PxNodeInfo *pxInfo = NULL;
PxNodes *px_nodes;
Expand All @@ -546,7 +526,7 @@ pxnode_getPxNodeInfo(int contentId)
* Because the IP and Port of the RW and QC nodes are exactly the same
* , qcInfo can be used directly.
*/
if (RW_SEGMENT == contentId)
if (SEGMENTTYPE_EXPLICT_WRITER == segmentType)
return px_nodes->qcInfo;

if (contentId < 0)
Expand Down
67 changes: 3 additions & 64 deletions src/backend/px_optimizer/libgpopt/include/gpopt/engine/CHint.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,6 @@

/* POLAR px */
#define MAX_INSERT_DOP_NUM ULONG(128)
#define MAX_UPDATE_DOP_NUM ULONG(128)
#define MAX_SELECT_DOP_NUM ULONG(128)
#define MAX_DELETE_DOP_NUM ULONG(128)
/* POLAR px */

namespace gpopt
Expand Down Expand Up @@ -59,17 +56,6 @@ class CHint : public CRefCount
/* POLAR px */
ULONG m_ulInsertDopNum;

ULONG m_ulUpdateDopNum;

ULONG m_ulSelectDopNum;

ULONG m_ulDeleteDopNum;

BOOL m_fRemoveUpdateRedundantMotion;

BOOL m_fRemoveDeleteRedundantMotion;
/* POLAR px */

public:
CHint(const CHint &) = delete;

Expand All @@ -80,12 +66,7 @@ class CHint : public CRefCount
ULONG broadcast_threshold, BOOL enforce_constraint_on_dml,
ULONG push_group_by_below_setop_threshold,
/* POLAR px */
ULONG insert_dop_num,
ULONG update_dop_num,
ULONG select_dop_num,
ULONG delete_dop_num,
BOOL remove_update_redundant_motion,
BOOL remove_delete_redundant_motion)
ULONG insert_dop_num)
: m_ulMinNumOfPartsToRequireSortOnInsert(
min_num_of_parts_to_require_sort_on_insert),
m_ulJoinArityForAssociativityCommutativity(
Expand All @@ -97,12 +78,7 @@ class CHint : public CRefCount
m_ulPushGroupByBelowSetopThreshold(
push_group_by_below_setop_threshold),
/* POLAR px */
m_ulInsertDopNum(insert_dop_num),
m_ulUpdateDopNum(update_dop_num),
m_ulSelectDopNum(select_dop_num),
m_ulDeleteDopNum(delete_dop_num),
m_fRemoveUpdateRedundantMotion(remove_update_redundant_motion),
m_fRemoveDeleteRedundantMotion(remove_delete_redundant_motion)
m_ulInsertDopNum(insert_dop_num)
{
}

Expand Down Expand Up @@ -175,37 +151,6 @@ class CHint : public CRefCount
return m_ulInsertDopNum;
}

ULONG
UlUpdateDopNum() const
{
return m_ulUpdateDopNum;
}

ULONG
UlSelectDopNum() const
{
return m_ulSelectDopNum;
}

ULONG
UlDeleteDopNum() const
{
return m_ulDeleteDopNum;
}

BOOL
FRemoveUpdateRedundantMotion() const
{
return m_fRemoveUpdateRedundantMotion;
}

BOOL
FRemoveDeleteRedundantMotion() const
{
return m_fRemoveDeleteRedundantMotion;
}
/* POLAR px */

// generate default hint configurations, which disables sort during insert on
// append only row-oriented partitioned tables by default
static CHint *
Expand All @@ -220,13 +165,7 @@ class CHint : public CRefCount
true, /* enforce_constraint_on_dml */
PUSH_GROUP_BY_BELOW_SETOP_THRESHOLD, /* push_group_by_below_setop_threshold */
/* POLAR px */
MAX_INSERT_DOP_NUM,
MAX_UPDATE_DOP_NUM,
MAX_SELECT_DOP_NUM,
MAX_DELETE_DOP_NUM,
true,
true
/* POLAR px */
MAX_INSERT_DOP_NUM
);
}

Expand Down
38 changes: 3 additions & 35 deletions src/backend/px_optimizer/libgpopt/src/operators/CPhysicalDML.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -284,42 +284,10 @@ CPhysicalDML::PdsRequired(CMemoryPool *mp,
else
return GPOS_NEW(mp) CDistributionSpecStrictRandom();
}
else if (CLogicalDML::EdmlUpdate == m_edmlop)
// Update and Delete
else if(CLogicalDML::EdmlUpdate == m_edmlop || CLogicalDML::EdmlDelete == m_edmlop)
{
bool remove_redundant_motion = optimizer_config->GetHint()->FRemoveUpdateRedundantMotion();
ULONG update_dop_num = optimizer_config->GetHint()->UlUpdateDopNum();
ULONG select_dop_num = optimizer_config->GetHint()->UlSelectDopNum();
if (!remove_redundant_motion)
return GPOS_NEW(mp) CDistributionSpecStrictRandom();
else
{
if (update_dop_num == select_dop_num)
{
m_pds->AddRef();
return m_pds;
}
else
return GPOS_NEW(mp) CDistributionSpecStrictRandom();
}
}
else if (CLogicalDML::EdmlDelete == m_edmlop)
{
/* delete */
bool remove_redundant_motion = optimizer_config->GetHint()->FRemoveDeleteRedundantMotion();
ULONG delete_dop_num = optimizer_config->GetHint()->UlDeleteDopNum();
ULONG select_dop_num = optimizer_config->GetHint()->UlSelectDopNum();
if (!remove_redundant_motion)
return GPOS_NEW(mp) CDistributionSpecStrictRandom();
else
{
if (delete_dop_num == select_dop_num)
{
m_pds->AddRef();
return m_pds;
}
else
return GPOS_NEW(mp) CDistributionSpecStrictRandom();
}
return GPOS_NEW(mp) CDistributionSpecStrictRandom();
}
else
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,6 @@ using namespace gpopt;

/* POLAR px */
#define DEFAULT_INSERT_DOP_NUM ULONG(1)
#define DEFAULT_UPDATE_DOP_NUM ULONG(1)
#define DEFAULT_SELECT_DOP_NUM ULONG(1)
#define DEFAULT_DELETE_DOP_NUM ULONG(1)
/* POLAR px */

XERCES_CPP_NAMESPACE_USE
Expand Down Expand Up @@ -125,12 +122,7 @@ CParseHandlerHint::StartElement(const XMLCh *const, //element_uri,
join_order_dp_threshold, broadcast_threshold, enforce_constraint_on_dml,
push_group_by_below_setop_threshold,
/* POLAR px */
DEFAULT_INSERT_DOP_NUM,
DEFAULT_UPDATE_DOP_NUM,
DEFAULT_SELECT_DOP_NUM,
DEFAULT_DELETE_DOP_NUM,
true,
true);
DEFAULT_INSERT_DOP_NUM);
}

//---------------------------------------------------------------------------
Expand Down
Loading