Skip to content

Commit

Permalink
Merge pull request #19009 from timothyklemm/hpcc-32455-wssql-otel
Browse files Browse the repository at this point in the history
HPCC-32455 Upgrade WsSQL instrumentation

Reviewed-By: Rodrigo Pastrana <rodrigo.pastrana@lexisnexisrisk.com>
Reviewed-by: Gavin Halliday <ghalliday@hpccsystems.com>
Merged-by: Gavin Halliday <ghalliday@hpccsystems.com>
  • Loading branch information
ghalliday authored Oct 4, 2024
2 parents a6cacdc + 6c748d6 commit de8b4d8
Showing 1 changed file with 109 additions and 53 deletions.
162 changes: 109 additions & 53 deletions esp/services/ws_sql/ws_sqlService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -685,44 +685,58 @@ printTree(sqlAST, 0);

bool CwssqlEx::getWUResult(IEspContext &context, const char * wuid, StringBuffer &result, unsigned start, unsigned count, int sequence, const char * dsname, const char * schemaname)
{
context.addTraceSummaryTimeStamp(LogMin, "StrtgetReslts");
if (wuid && *wuid)
OwnedSpanScope resultSpanScope(queryThreadedActiveSpan()->createInternalSpan("get_wu_result"));
try
{
Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
Owned<IConstWorkUnit> cw = factory->openWorkUnit(wuid, false);
if (wuid && *wuid)
{
Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
Owned<IConstWorkUnit> cw = factory->openWorkUnit(wuid, false);

if (!cw)
throw MakeStringException(ECLWATCH_CANNOT_UPDATE_WORKUNIT,"Cannot open workunit %s.", wuid);
if (!cw)
throw MakeStringException(ECLWATCH_CANNOT_UPDATE_WORKUNIT,"Cannot open workunit %s.", wuid);

SCMStringBuffer stateDesc;
SCMStringBuffer stateDesc;

switch (cw->getState())
{
case WUStateCompleted:
case WUStateFailed:
case WUStateUnknown:
case WUStateCompiled:
{
StringBufferAdaptor resultXML(result);
Owned<IResultSetFactory> factory = getResultSetFactory(context.queryUserId(), context.queryPassword());
Owned<INewResultSet> nr = factory->createNewResultSet(wuid, sequence, NULL);
if (nr.get())
{
context.addTraceSummaryTimeStamp(LogMax, "strtgetXMLRslts");
getResultXml(resultXML, nr.get(), dsname, start, count, schemaname);
context.addTraceSummaryTimeStamp(LogMax, "endgetXMLRslts");
}
else
return false;
break;
}
default:
break;
switch (cw->getState())
{
case WUStateCompleted:
case WUStateFailed:
case WUStateUnknown:
case WUStateCompiled:
{
StringBufferAdaptor resultXML(result);
Owned<IResultSetFactory> factory = getResultSetFactory(context.queryUserId(), context.queryPassword());
Owned<INewResultSet> nr = factory->createNewResultSet(wuid, sequence, NULL);
if (nr.get())
{
OwnedSpanScope xmlSpanScope(queryThreadedActiveSpan()->createInternalSpan("get_result_xml"));
try
{
getResultXml(resultXML, nr.get(), dsname, start, count, schemaname);
}
catch(IException* e)
{
xmlSpanScope->recordException(e);
throw;
}
}
else
return false;
break;
}
default:
break;
}
return true;
}
context.addTraceSummaryTimeStamp(LogMin, "ExitgetRslts");
return true;
}
context.addTraceSummaryTimeStamp(LogMin, "ExitgetRslts");
catch(IException* e)
{
resultSpanScope->recordException(e);
throw;
}

return false;
}

Expand Down Expand Up @@ -840,9 +854,9 @@ void CwssqlEx::processMultipleClusterOption(StringArray & clusters, const char

bool CwssqlEx::onExecuteSQL(IEspContext &context, IEspExecuteSQLRequest &req, IEspExecuteSQLResponse &resp)
{
OwnedSpanScope exSpanScope(queryThreadedActiveSpan()->createInternalSpan("on_execute_sql"));
try
{
context.addTraceSummaryTimeStamp(LogMin, "StrtOnExecuteSQL");
context.ensureFeatureAccess(WSSQLACCESS, SecAccess_Write, -1, "WsSQL::ExecuteSQL: Permission denied.");

double version = context.getClientVersion();
Expand Down Expand Up @@ -949,13 +963,21 @@ bool CwssqlEx::onExecuteSQL(IEspContext &context, IEspExecuteSQLRequest &req, IE
if (querytype == SQLTypeCreateAndLoad)
clonable = false;

context.addTraceSummaryTimeStamp(LogNormal, "StartECLGenerate");
ECLEngine::generateECL(parsedSQL, ecltext);

if (hashoptions.length() > 0)
ecltext.insert(0, hashoptions.str());
{
OwnedSpanScope eclGenSpanScope(queryThreadedActiveSpan()->createInternalSpan("generate_ecl"));
try
{
ECLEngine::generateECL(parsedSQL, ecltext);

context.addTraceSummaryTimeStamp(LogNormal, "EndECLGenerate");
if (hashoptions.length() > 0)
ecltext.insert(0, hashoptions.str());
}
catch(IException* e)
{
eclGenSpanScope->recordException(e);
throw;
}
}

if (isEmpty(ecltext))
throw MakeStringException(1,"Could not generate ECL from SQL.");
Expand Down Expand Up @@ -986,10 +1008,18 @@ bool CwssqlEx::onExecuteSQL(IEspContext &context, IEspExecuteSQLRequest &req, IE
wu->commit();
wu.clear();

context.addTraceSummaryTimeStamp(LogNormal, "strtWUCompile");
WsWuHelpers::submitWsWorkunit(context, compiledwuid.str(), cluster, nullptr, 0, 0, true, false, false, nullptr, nullptr, nullptr, nullptr);
waitForWorkUnitToCompile(compiledwuid.str(), req.getWait());
context.addTraceSummaryTimeStamp(LogNormal, "endWUCompile");
OwnedSpanScope wuCompileSpanScope(queryThreadedActiveSpan()->createInternalSpan("submit_ws_workunits"));
try
{
WsWuHelpers::submitWsWorkunit(context, compiledwuid.str(), cluster, nullptr, 0, 0, true, false, false, nullptr, nullptr, nullptr, nullptr);
wuCompileSpanScope->setSpanAttribute("compiled_wuid", compiledwuid.str());
waitForWorkUnitToCompile(compiledwuid.str(), req.getWait());
}
catch(IException* e)
{
wuCompileSpanScope->recordException(e);
throw;
}
}
}

Expand Down Expand Up @@ -1022,17 +1052,36 @@ bool CwssqlEx::onExecuteSQL(IEspContext &context, IEspExecuteSQLRequest &req, IE

if (clonable)
{
context.addTraceSummaryTimeStamp(LogNormal, "StartWUCloneExe");
cloneAndExecuteWU(context, compiledwuid.str(), runningwuid, xmlparams.str(), NULL, NULL, cluster);
context.addTraceSummaryTimeStamp(LogNormal, "EndWUCloneExe");
OwnedSpanScope wuCloneSpanScope(queryThreadedActiveSpan()->createInternalSpan("clone_and_execute_wu"));
try
{
wuCloneSpanScope->setSpanAttribute("compiled_wuid", compiledwuid.str());
wuCloneSpanScope->setSpanAttribute("running_wuid", runningwuid.str());
wuCloneSpanScope->setSpanAttribute("cluster", cluster ? cluster : "");
cloneAndExecuteWU(context, compiledwuid.str(), runningwuid, xmlparams.str(), NULL, NULL, cluster);
}
catch(IException* e)
{
wuCloneSpanScope->recordException(e);
throw;
}
wuCloneSpanScope.clear();
if(cacheeligible && !isQueryCached(normalizedSQL.str()))
addQueryToCache(normalizedSQL.str(), compiledwuid.str());
}
else
{
context.addTraceSummaryTimeStamp(LogNormal, "StartWUSubmit");
WsWuHelpers::submitWsWorkunit(context, compiledwuid.str(), cluster, nullptr, 0, 0, false, true, true, nullptr, nullptr, nullptr, nullptr);
context.addTraceSummaryTimeStamp(LogNormal, "EndWUSubmit");
OwnedSpanScope wuSubmitSpanScope(queryThreadedActiveSpan()->createInternalSpan("submit_ws_workunit"));
try
{
WsWuHelpers::submitWsWorkunit(context, compiledwuid.str(), cluster, nullptr, 0, 0, false, true, true, nullptr, nullptr, nullptr, nullptr);
}
catch (IException* e)
{
wuSubmitSpanScope->recordException(e);
throw;
}
wuSubmitSpanScope.clear();
runningwuid.set(compiledwuid.str());
if (cacheeligible)
addQueryToCache(normalizedSQL.str(), runningwuid.str());
Expand All @@ -1041,9 +1090,16 @@ bool CwssqlEx::onExecuteSQL(IEspContext &context, IEspExecuteSQLRequest &req, IE
int timeToWait = req.getWait();
if (timeToWait != 0)
{
context.addTraceSummaryTimeStamp(LogNormal, "StartWUProcessWait");
waitForWorkUnitToComplete(runningwuid.str(), timeToWait);
context.addTraceSummaryTimeStamp(LogNormal, "EndWUProcessWait");
OwnedSpanScope wuProcessSpanScope(queryThreadedActiveSpan()->createInternalSpan("wait_for_workunit_to_complete"));
try
{
waitForWorkUnitToComplete(runningwuid.str(), timeToWait);
}
catch(IException* e)
{
wuProcessSpanScope->recordException(e);
throw;
}
}

if (strcmp(runningwuid.str(), compiledwuid.str())!=0)
Expand Down Expand Up @@ -1074,13 +1130,13 @@ bool CwssqlEx::onExecuteSQL(IEspContext &context, IEspExecuteSQLRequest &req, IE
}
catch(IException* e)
{
exSpanScope->recordException(e);
FORWARDEXCEPTION(context, e, -1);
}
//catch (...)
//{
// me->append(*MakeStringException(0,"Unknown exception submitting %s",wuid.str()));
//}
context.addTraceSummaryTimeStamp(LogMin, "EndOnExecuteSQL");
return true;
}

Expand Down

0 comments on commit de8b4d8

Please sign in to comment.