Skip to content

Commit

Permalink
HPCC-XXXXX RoxieSocketQueueManager::run may be blocked by actCrit
Browse files Browse the repository at this point in the history
Avoid the need to obtain the critical section (actCrit) just to check whether a worker thread's packet matches

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
  • Loading branch information
richardkchapman committed Oct 2, 2024
1 parent 318d964 commit a22655b
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 43 deletions.
3 changes: 2 additions & 1 deletion roxie/ccd/ccd.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ class RoxiePacketHeader
unsigned activityId = 0; // identifies the helper factory to be used (activityId in graph)
hash64_t queryHash = 0; // identifies the query

ruid_t uid = 0; // unique id
std::atomic<ruid_t> uid = 0; // unique id
ServerIdentifier serverId;
#ifdef SUBCHANNELS_IN_HEADER
ServerIdentifier subChannels[MAX_SUBCHANNEL];
Expand All @@ -173,6 +173,7 @@ class RoxiePacketHeader

static unsigned getSubChannelMask(unsigned subChannel);
unsigned priorityHash() const;
void clear();
void copy(const RoxiePacketHeader &oh);
bool matchPacket(const RoxiePacketHeader &oh) const;
void init(const RemoteActivityId &_remoteId, ruid_t _uid, unsigned _channel, unsigned _overflowSequence);
Expand Down
76 changes: 35 additions & 41 deletions roxie/ccd/ccdqueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ RoxiePacketHeader::RoxiePacketHeader(const RoxiePacketHeader &source, unsigned _
{
// Used to create the header to send a callback to originating server or an IBYTI to a buddy
activityId = _activityId;
uid = source.uid;
uid.store(source.uid);
queryHash = source.queryHash;
channel = source.channel;
overflowSequence = source.overflowSequence;
Expand Down Expand Up @@ -88,15 +88,21 @@ unsigned RoxiePacketHeader::priorityHash() const

void RoxiePacketHeader::copy(const RoxiePacketHeader &oh)
{
// used for saving away kill packets for later matching by match
uid = oh.uid;
// used for saving away info for later matching by match, without having to lock
overflowSequence = oh.overflowSequence;
continueSequence = oh.continueSequence;
serverId = oh.serverId;
channel = oh.channel;
uid.store(oh.uid);
// MORE - would it be safer, maybe even faster to copy the rest too?
}

void RoxiePacketHeader::clear()
{
// Marks this saved header as identifying no packet. CRoxieWorker::setPacket calls this
// when it is handed a null packet (copy() is used for the non-null case).
// Only uid is reset; the other fields keep whatever copy() last stored.
uid = RUID_NONE; // Will never match a queued packet
}

bool RoxiePacketHeader::matchPacket(const RoxiePacketHeader &oh) const
{
// used when matching up a kill packet against a pending one...
Expand Down Expand Up @@ -156,7 +162,7 @@ StringBuffer &RoxiePacketHeader::toString(StringBuffer &ret) const
ret.appendf(" (fetch part)");
break;
}
ret.appendf(" uid=" RUIDF " pri=", uid);
ret.appendf(" uid=" RUIDF " pri=", uid.load());
switch(activityId & ROXIE_PRIORITY_MASK)
{
case ROXIE_SLA_PRIORITY: ret.append("SLA"); break;
Expand Down Expand Up @@ -1184,7 +1190,6 @@ class RoxieQueue : public CInterface, implements IThreadFactory


virtual IPooledThread *createNew();
void abortChannel(unsigned channel);

void start()
{
Expand Down Expand Up @@ -1381,6 +1386,7 @@ class CRoxieWorker : public CInterface, implements IPooledThread
Owned<const ITopologyServer> topology;
#endif
AgentContextLogger logctx;
RoxiePacketHeader packetHeader;

public:
IMPLEMENT_IINTERFACE;
Expand Down Expand Up @@ -1409,37 +1415,38 @@ class CRoxieWorker : public CInterface, implements IPooledThread
}
// Replaces this worker's current activity with `act`, under actCrit.
inline void setActivity(IRoxieAgentActivity *act)
{
// Take care to avoid having destructor called inside critsec:
// `goer` is declared before the CriticalBlock, so it is destroyed after `b`,
// which means the previous activity is only let go once `b` has gone out of scope.
Owned<IRoxieAgentActivity> goer;
CriticalBlock b(actCrit);
goer.swap(activity); // previous activity (if any) is now held by goer
activity.setown(act);
}
inline bool match(RoxiePacketHeader &h)
{
// There is a window between getting packet from queue and being able to match it.
// This could cause some deduping to fail, but it does not matter if it does (so long as it is rare!)
CriticalBlock b(actCrit);
return packet && packet->queryHeader().matchPacket(h);
}

void abortChannel(unsigned channel)
inline void setPacket(IRoxieQueryPacket *p)
{
CriticalBlock b(actCrit);
if (packet && packet->queryHeader().channel==channel)
if (p)
{
abortLaunch = true;
#ifndef NEW_IBYTI
if (doIbytiDelay)
ibytiSem.signal();
#endif
if (activity)
activity->abort();
packet.setown(p);
packetHeader.copy(p->queryHeader());
}
else
{
packetHeader.clear();
packet.setown(p);
}
}
// Returns whether this worker's saved packetHeader matches `h`.
inline bool match(RoxiePacketHeader &h)
{
// There is a window between getting packet from queue and being able to match it.
// This could cause some deduping to fail, but it does not matter if it does (so long as it is rare!)
// No CriticalBlock is taken here: the comparison uses the packetHeader member that
// setPacket fills in via copy()/clear(), not the packet itself.
// NOTE(review): only RoxiePacketHeader::uid is declared std::atomic; if matchPacket compares
// other fields, those reads may overlap with copy() on the worker thread - confirm that is acceptable.
return packetHeader.matchPacket(h);
}

bool checkAbort(RoxiePacketHeader &h, bool checkRank, bool &queryFound, bool &preActivity)
{
CriticalBlock b(actCrit);
if (packet && packet->queryHeader().matchPacket(h))
if (packetHeader.matchPacket(h))
{
CriticalBlock b(actCrit);
queryFound = true;
abortLaunch = true;
#ifndef NEW_IBYTI
Expand Down Expand Up @@ -1760,7 +1767,7 @@ class CRoxieWorker : public CInterface, implements IPooledThread
#ifdef NEW_IBYTI
logctx.setStatistic(StTimeIBYTIDelay, next->queryIBYTIDelayTime());
#endif
packet.setown(next->deserialize());
setPacket(next->deserialize());
next.clear();
RoxiePacketHeader &header = packet->queryHeader();
#ifndef SUBCHANNELS_IN_HEADER
Expand Down Expand Up @@ -1792,8 +1799,7 @@ class CRoxieWorker : public CInterface, implements IPooledThread
}
workerThreadBusy = false;
{
CriticalBlock b(actCrit);
packet.clear();
setPacket(nullptr);
#ifndef SUBCHANNELS_IN_HEADER
topology.clear();
#endif
Expand All @@ -1803,12 +1809,11 @@ class CRoxieWorker : public CInterface, implements IPooledThread
}
catch(IException *E)
{
CriticalBlock b(actCrit);
EXCLOG(E);
if (packet)
{
throwRemoteException(E, NULL, packet, false);
packet.clear();
setPacket(nullptr);
}
else
E->Release();
Expand All @@ -1818,13 +1823,12 @@ class CRoxieWorker : public CInterface, implements IPooledThread
}
catch(...)
{
CriticalBlock b(actCrit);
Owned<IException> E = MakeStringException(ROXIE_INTERNAL_ERROR, "Unexpected exception in Roxie worker thread");
EXCLOG(E);
if (packet)
{
throwRemoteException(E.getClear(), NULL, packet, false);
packet.clear();
setPacket(nullptr);
}
#ifndef SUBCHANNELS_IN_HEADER
topology.clear();
Expand All @@ -1839,16 +1843,6 @@ IPooledThread *RoxieQueue::createNew()
return new CRoxieWorker;
}

// Calls abortChannel(channel) on every worker returned by workers->running().
void RoxieQueue::abortChannel(unsigned channel)
{
Owned<IPooledThreadIterator> wi = workers->running();
ForEach(*wi)
{
// Presumably every pooled thread here is a CRoxieWorker (createNew() returns one), hence the cast.
CRoxieWorker &w = (CRoxieWorker &) wi->query();
w.abortChannel(channel);
}
}

//=================================================================================

class CallbackEntry : implements IPendingCallback, public CInterface
Expand Down
3 changes: 2 additions & 1 deletion roxie/udplib/udplib.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ typedef unsigned ruid_t; // at 1000/sec recycle every 49 days
#define RUIDF "0x%.8x"
#define RUID_PING 0
#define RUID_DISCARD 1
#define RUID_FIRST 2
#define RUID_NONE 2
#define RUID_FIRST 3

typedef unsigned RecordLengthType;
#define MAX_RECORD_LENGTH 0xffffffff
Expand Down

0 comments on commit a22655b

Please sign in to comment.