Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature] refactor pdml #450

Merged
merged 1 commit into from
Nov 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/backend/executor/execUtils_px.c
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ FillSliceGangInfo(ExecSlice *slice, int numsegments, DirectDispatchInfo *dd)
slice->segments = NIL;

for(k = 0;k < numsegments; k++)
slice->segments = lappend_int(slice->segments,RW_SEGMENT);
slice->segments = lappend_int(slice->segments, k);
break;
case GANGTYPE_PRIMARY_READER:
slice->planNumSegments = numsegments;
Expand Down
22 changes: 9 additions & 13 deletions src/backend/px/dispatcher/px_conn.c
Original file line number Diff line number Diff line change
Expand Up @@ -99,10 +99,7 @@ pxconn_createWorkerDescriptor(struct PxNodeInfo *pxinfo, int identifier,
pxWorkerDesc->identifier = identifier;
pxWorkerDesc->logicalWorkerInfo.total_count = logicalTotalWorkers;
/* POLAR px */
if (MASTER_CONTENT_ID == logicalWorkerIdx && RW_COUNTER_START <= identifier)
pxWorkerDesc->logicalWorkerInfo.idx = (identifier % RW_COUNTER_START) % logicalTotalWorkers;
else
pxWorkerDesc->logicalWorkerInfo.idx = logicalWorkerIdx;
pxWorkerDesc->logicalWorkerInfo.idx = logicalWorkerIdx;
/* POLAR end */

MemoryContextSwitchTo(oldContext);
Expand All @@ -118,10 +115,7 @@ pxconn_termWorkerDescriptor(PxWorkerDescriptor *pxWorkerDesc)
px_nodes = pxWorkerDesc->pxNodeInfo->px_nodes;

/* put px identifier to free list for reuse */
if (RW_COUNTER_START > pxWorkerDesc->identifier)
{
px_nodes->freeCounterList = lappend_int(px_nodes->freeCounterList, pxWorkerDesc->identifier);
}
px_nodes->freeCounterList = lappend_int(px_nodes->freeCounterList, pxWorkerDesc->identifier);

pxconn_disconnect(pxWorkerDesc);

Expand All @@ -139,7 +133,8 @@ pxconn_termWorkerDescriptor(PxWorkerDescriptor *pxWorkerDesc)
void
pxconn_doConnectStart(PxWorkerDescriptor *pxWorkerDesc,
const char *pxid,
const char *options)
const char *options,
SegmentType segmentType)
{
#define MAX_KEYWORDS 10
#define MAX_INT_STRING_LEN 20
Expand All @@ -148,6 +143,7 @@ pxconn_doConnectStart(PxWorkerDescriptor *pxWorkerDesc,
const char *values[MAX_KEYWORDS];
char portstr[MAX_INT_STRING_LEN];
int nkeywords = 0;
bool should_be_localhost = false;

keywords[nkeywords] = "pxid";
values[nkeywords] = pxid;
Expand All @@ -170,10 +166,10 @@ pxconn_doConnectStart(PxWorkerDescriptor *pxWorkerDesc,
*
* For other PX connections, we set "hostaddr". "host" is not used.
*/
if ((px_role == PX_ROLE_QC &&
(pxWorkerDesc->logicalWorkerInfo.idx == MASTER_CONTENT_ID ||
pxWorkerDesc->identifier >= RW_COUNTER_START)) ||
FAULT_COND(SIMPLE_FAULT_INJECTOR("hostaddr_info") == FaultInjectorTypeEnable))
/* When EntryDB or pdml, hostaddr should be localhost(127.0.0.1) */
should_be_localhost = (pxWorkerDesc->logicalWorkerInfo.idx == MASTER_CONTENT_ID)
|| (segmentType == SEGMENTTYPE_EXPLICT_WRITER);
if (px_role == PX_ROLE_QC && should_be_localhost)
{
keywords[nkeywords] = "hostaddr";
values[nkeywords] = "127.0.0.1";
Expand Down
4 changes: 2 additions & 2 deletions src/backend/px/dispatcher/px_gang.c
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,7 @@ makeOptions(void)

Assert(px_role == PX_ROLE_QC);

qdinfo = pxnode_getPxNodeInfo(MASTER_CONTENT_ID);
qdinfo = pxnode_getPxNodeInfo(MASTER_CONTENT_ID, SEGMENTTYPE_ANY);
appendStringInfo(&string, " -c polar_px_qc_hostname=%s", qdinfo->config->hostip);
appendStringInfo(&string, " -c polar_px_qc_port=%d", qdinfo->config->port);

Expand Down Expand Up @@ -639,7 +639,7 @@ getPxProcessesForQC(int isPrimary)
elog(FATAL, "getPxProcessesForQC: unsupported request for master mirror process");
}

qdinfo = pxnode_getPxNodeInfo(MASTER_CONTENT_ID);
qdinfo = pxnode_getPxNodeInfo(MASTER_CONTENT_ID, SEGMENTTYPE_ANY);

Assert(qdinfo->config->node_idx == -1);
Assert(qdinfo->config->hostip != NULL);
Expand Down
2 changes: 1 addition & 1 deletion src/backend/px/dispatcher/px_gang_async.c
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ pxgang_createGang_async(List *segments, SegmentType segmentType)
options = makeOptions();

/* start connection in asynchronous way */
pxconn_doConnectStart(pxWorkerDesc, pxid, options);
pxconn_doConnectStart(pxWorkerDesc, pxid, options, segmentType);

if (pxconn_isBadConnection(pxWorkerDesc))
ereport(ERROR, (errcode(ERRCODE_PX_INTERCONNECTION_ERROR),
Expand Down
54 changes: 17 additions & 37 deletions src/backend/px/px_util.c
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ static int px_node_configs_generation = POLAR_CLUSTER_INFO_INVALID_GENERATION
static MemoryContext px_worker_context = NULL;
static PxNodeConfigEntry *px_node_configs = NULL;

static int nextPXIdentifer(PxNodes *px_nodes, bool isRW);
static int nextPXIdentifer(PxNodes *px_nodes);
static void GeneratePxNodeConfigs(void);

/* NB: all extern function should switch to this context */
Expand Down Expand Up @@ -249,7 +249,7 @@ GeneratePxNodeConfigs(void)
qc_config->node_idx = -1;
qc_config->dop = 1;
qc_config->port = PostPortNumber;
qc_config->hostip = "127.0.0.1";
qc_config->hostip = "";

px_node_configs = configs;
px_node_configs_size = idx;
Expand Down Expand Up @@ -305,7 +305,6 @@ pxnode_getPxNodes()
px_nodes->numActivePXs = 0;
px_nodes->numIdlePXs = 0;
px_nodes->pxCounter = 0;
px_nodes->rwCounter = RW_COUNTER_START;
px_nodes->freeCounterList = NIL;

px_nodes->pxInfo =
Expand Down Expand Up @@ -368,10 +367,10 @@ pxnode_allocateIdlePX(int logicalWorkerIdx, int logicalTotalWorkers, SegmentType

if (logicalWorkerIdx == -1)
{
pxinfo = pxnode_getPxNodeInfo(-1);
pxinfo = pxnode_getPxNodeInfo(-1, segmentType);
logicalTotalWorkers = getPxWorkerCount();
} else
pxinfo = pxnode_getPxNodeInfo(logicalWorkerIdx);
pxinfo = pxnode_getPxNodeInfo(logicalWorkerIdx, segmentType);

if (pxinfo == NULL)
{
Expand Down Expand Up @@ -414,21 +413,10 @@ pxnode_allocateIdlePX(int logicalWorkerIdx, int logicalTotalWorkers, SegmentType
/* POLAR px */
if (!pxWorkerDesc)
{
if (RW_SEGMENT == logicalWorkerIdx)
{
/* RW */
pxWorkerDesc = pxconn_createWorkerDescriptor(pxinfo,
nextPXIdentifer(pxinfo->px_nodes, true),
MASTER_CONTENT_ID,
logicalTotalWorkers);
}
else
{
pxWorkerDesc = pxconn_createWorkerDescriptor(pxinfo,
nextPXIdentifer(pxinfo->px_nodes, false),
logicalWorkerIdx,
logicalTotalWorkers);
}
pxWorkerDesc = pxconn_createWorkerDescriptor(pxinfo,
nextPXIdentifer(pxinfo->px_nodes),
logicalWorkerIdx,
logicalTotalWorkers);
}
/* POLAR end */

Expand Down Expand Up @@ -504,33 +492,25 @@ pxnode_recycleIdlePX(PxWorkerDescriptor *pxWorkerDesc, bool forceDestroy)
}

static int
nextPXIdentifer(PxNodes *px_nodes, bool isRW)
nextPXIdentifer(PxNodes *px_nodes)
{
int result;
if (isRW)
{
return px_nodes->rwCounter++;
}
else
{
if (!px_nodes->freeCounterList)
{
result = px_nodes->pxCounter;
px_nodes->pxCounter = (px_nodes->pxCounter + 1) % RW_COUNTER_START;
return result;
}

result = linitial_int(px_nodes->freeCounterList);
px_nodes->freeCounterList = list_delete_first(px_nodes->freeCounterList);
if (!px_nodes->freeCounterList)
{
result = px_nodes->pxCounter++;
return result;
}
result = linitial_int(px_nodes->freeCounterList);
px_nodes->freeCounterList = list_delete_first(px_nodes->freeCounterList);
return result;
}

/*
* Find PxNodeInfo in the array by segment index.
*/
PxNodeInfo *
pxnode_getPxNodeInfo(int contentId)
pxnode_getPxNodeInfo(int contentId, SegmentType segmentType)
{
PxNodeInfo *pxInfo = NULL;
PxNodes *px_nodes;
Expand All @@ -546,7 +526,7 @@ pxnode_getPxNodeInfo(int contentId)
* Because the IP and Port of the RW and QC nodes are exactly the same
* , qcInfo can be used directly.
*/
if (RW_SEGMENT == contentId)
if (SEGMENTTYPE_EXPLICT_WRITER == segmentType)
return px_nodes->qcInfo;

if (contentId < 0)
Expand Down
67 changes: 3 additions & 64 deletions src/backend/px_optimizer/libgpopt/include/gpopt/engine/CHint.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,6 @@

/* POLAR px */
#define MAX_INSERT_DOP_NUM ULONG(128)
#define MAX_UPDATE_DOP_NUM ULONG(128)
#define MAX_SELECT_DOP_NUM ULONG(128)
#define MAX_DELETE_DOP_NUM ULONG(128)
/* POLAR px */

namespace gpopt
Expand Down Expand Up @@ -59,17 +56,6 @@ class CHint : public CRefCount
/* POLAR px */
ULONG m_ulInsertDopNum;

ULONG m_ulUpdateDopNum;

ULONG m_ulSelectDopNum;

ULONG m_ulDeleteDopNum;

BOOL m_fRemoveUpdateRedundantMotion;

BOOL m_fRemoveDeleteRedundantMotion;
/* POLAR px */

public:
CHint(const CHint &) = delete;

Expand All @@ -80,12 +66,7 @@ class CHint : public CRefCount
ULONG broadcast_threshold, BOOL enforce_constraint_on_dml,
ULONG push_group_by_below_setop_threshold,
/* POLAR px */
ULONG insert_dop_num,
ULONG update_dop_num,
ULONG select_dop_num,
ULONG delete_dop_num,
BOOL remove_update_redundant_motion,
BOOL remove_delete_redundant_motion)
ULONG insert_dop_num)
: m_ulMinNumOfPartsToRequireSortOnInsert(
min_num_of_parts_to_require_sort_on_insert),
m_ulJoinArityForAssociativityCommutativity(
Expand All @@ -97,12 +78,7 @@ class CHint : public CRefCount
m_ulPushGroupByBelowSetopThreshold(
push_group_by_below_setop_threshold),
/* POLAR px */
m_ulInsertDopNum(insert_dop_num),
m_ulUpdateDopNum(update_dop_num),
m_ulSelectDopNum(select_dop_num),
m_ulDeleteDopNum(delete_dop_num),
m_fRemoveUpdateRedundantMotion(remove_update_redundant_motion),
m_fRemoveDeleteRedundantMotion(remove_delete_redundant_motion)
m_ulInsertDopNum(insert_dop_num)
{
}

Expand Down Expand Up @@ -175,37 +151,6 @@ class CHint : public CRefCount
return m_ulInsertDopNum;
}

ULONG
UlUpdateDopNum() const
{
return m_ulUpdateDopNum;
}

ULONG
UlSelectDopNum() const
{
return m_ulSelectDopNum;
}

ULONG
UlDeleteDopNum() const
{
return m_ulDeleteDopNum;
}

BOOL
FRemoveUpdateRedundantMotion() const
{
return m_fRemoveUpdateRedundantMotion;
}

BOOL
FRemoveDeleteRedundantMotion() const
{
return m_fRemoveDeleteRedundantMotion;
}
/* POLAR px */

// generate default hint configurations, which disables sort during insert on
// append only row-oriented partitioned tables by default
static CHint *
Expand All @@ -220,13 +165,7 @@ class CHint : public CRefCount
true, /* enforce_constraint_on_dml */
PUSH_GROUP_BY_BELOW_SETOP_THRESHOLD, /* push_group_by_below_setop_threshold */
/* POLAR px */
MAX_INSERT_DOP_NUM,
MAX_UPDATE_DOP_NUM,
MAX_SELECT_DOP_NUM,
MAX_DELETE_DOP_NUM,
true,
true
/* POLAR px */
MAX_INSERT_DOP_NUM
);
}

Expand Down
38 changes: 3 additions & 35 deletions src/backend/px_optimizer/libgpopt/src/operators/CPhysicalDML.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -284,42 +284,10 @@ CPhysicalDML::PdsRequired(CMemoryPool *mp,
else
return GPOS_NEW(mp) CDistributionSpecStrictRandom();
}
else if (CLogicalDML::EdmlUpdate == m_edmlop)
// Update and Delete
else if(CLogicalDML::EdmlUpdate == m_edmlop || CLogicalDML::EdmlDelete == m_edmlop)
{
bool remove_redundant_motion = optimizer_config->GetHint()->FRemoveUpdateRedundantMotion();
ULONG update_dop_num = optimizer_config->GetHint()->UlUpdateDopNum();
ULONG select_dop_num = optimizer_config->GetHint()->UlSelectDopNum();
if (!remove_redundant_motion)
return GPOS_NEW(mp) CDistributionSpecStrictRandom();
else
{
if (update_dop_num == select_dop_num)
{
m_pds->AddRef();
return m_pds;
}
else
return GPOS_NEW(mp) CDistributionSpecStrictRandom();
}
}
else if (CLogicalDML::EdmlDelete == m_edmlop)
{
/* delete */
bool remove_redundant_motion = optimizer_config->GetHint()->FRemoveDeleteRedundantMotion();
ULONG delete_dop_num = optimizer_config->GetHint()->UlDeleteDopNum();
ULONG select_dop_num = optimizer_config->GetHint()->UlSelectDopNum();
if (!remove_redundant_motion)
return GPOS_NEW(mp) CDistributionSpecStrictRandom();
else
{
if (delete_dop_num == select_dop_num)
{
m_pds->AddRef();
return m_pds;
}
else
return GPOS_NEW(mp) CDistributionSpecStrictRandom();
}
return GPOS_NEW(mp) CDistributionSpecStrictRandom();
}
else
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,6 @@ using namespace gpopt;

/* POLAR px */
#define DEFAULT_INSERT_DOP_NUM ULONG(1)
#define DEFAULT_UPDATE_DOP_NUM ULONG(1)
#define DEFAULT_SELECT_DOP_NUM ULONG(1)
#define DEFAULT_DELETE_DOP_NUM ULONG(1)
/* POLAR px */

XERCES_CPP_NAMESPACE_USE
Expand Down Expand Up @@ -125,12 +122,7 @@ CParseHandlerHint::StartElement(const XMLCh *const, //element_uri,
join_order_dp_threshold, broadcast_threshold, enforce_constraint_on_dml,
push_group_by_below_setop_threshold,
/* POLAR px */
DEFAULT_INSERT_DOP_NUM,
DEFAULT_UPDATE_DOP_NUM,
DEFAULT_SELECT_DOP_NUM,
DEFAULT_DELETE_DOP_NUM,
true,
true);
DEFAULT_INSERT_DOP_NUM);
}

//---------------------------------------------------------------------------
Expand Down
Loading