Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HPCC-32781 RoxieSocketQueueManager::run may be blocked by actCrit #19171

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions roxie/ccd/ccd.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ class RoxiePacketHeader
unsigned activityId = 0; // identifies the helper factory to be used (activityId in graph)
hash64_t queryHash = 0; // identifies the query

ruid_t uid = 0; // unique id
std::atomic<ruid_t> uid = 0; // unique id
ServerIdentifier serverId;
#ifdef SUBCHANNELS_IN_HEADER
ServerIdentifier subChannels[MAX_SUBCHANNEL];
Expand All @@ -173,6 +173,7 @@ class RoxiePacketHeader

static unsigned getSubChannelMask(unsigned subChannel);
unsigned priorityHash() const;
void clear();
void copy(const RoxiePacketHeader &oh);
bool matchPacket(const RoxiePacketHeader &oh) const;
void init(const RemoteActivityId &_remoteId, ruid_t _uid, unsigned _channel, unsigned _overflowSequence);
Expand Down Expand Up @@ -295,7 +296,6 @@ extern unsigned callbackTimeout;
extern unsigned lowTimeout;
extern unsigned highTimeout;
extern unsigned slaTimeout;
extern unsigned headRegionSize;
extern unsigned ccdMulticastPort;
extern IPropertyTree *topology;
extern MapStringTo<int> *preferredClusters;
Expand Down Expand Up @@ -389,6 +389,7 @@ extern bool ignoreFileDateMismatches;
extern bool ignoreFileSizeMismatches;
extern int fileTimeFuzzySeconds;
extern SinkMode defaultSinkMode;
extern bool limitWaitingWorkers;

#if defined(_CONTAINERIZED) || defined(SUBCHANNELS_IN_HEADER)
static constexpr bool roxieMulticastEnabled = false;
Expand Down
5 changes: 2 additions & 3 deletions roxie/ccd/ccdmain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ unsigned numRequestArrayThreads = 5;
bool blockedLocalAgent = true;
bool acknowledgeAllRequests = true;
unsigned packetAcknowledgeTimeout = 100;
unsigned headRegionSize;
unsigned ccdMulticastPort;
bool enableHeartBeat = true;
unsigned parallelLoopFlowLimit = 100;
Expand Down Expand Up @@ -203,6 +202,7 @@ unsigned maxGraphLoopIterations;
bool steppingEnabled = true;
bool simpleLocalKeyedJoins = true;
bool adhocRoxie = false;
bool limitWaitingWorkers = false;

unsigned __int64 minFreeDiskSpace = 1024 * 0x100000; // default to 1 GB
unsigned socketCheckInterval = 5000;
Expand Down Expand Up @@ -999,7 +999,6 @@ int CCD_API roxie_main(int argc, const char *argv[], const char * defaultYaml)
minPayloadSize = topology->getPropInt("@minPayloadSize", minPayloadSize);
blockedLocalAgent = topology->getPropBool("@blockedLocalAgent", blockedLocalAgent);
acknowledgeAllRequests = topology->getPropBool("@acknowledgeAllRequests", acknowledgeAllRequests);
headRegionSize = topology->getPropInt("@headRegionSize", 0);
packetAcknowledgeTimeout = topology->getPropInt("@packetAcknowledgeTimeout", packetAcknowledgeTimeout);
ccdMulticastPort = topology->getPropInt("@multicastPort", CCD_MULTICAST_PORT);
statsExpiryTime = topology->getPropInt("@statsExpiryTime", 3600);
Expand Down Expand Up @@ -1279,6 +1278,7 @@ int CCD_API roxie_main(int argc, const char *argv[], const char * defaultYaml)
const char *sinkModeText = topology->queryProp("@sinkMode");
if (sinkModeText)
defaultSinkMode = getSinkMode(sinkModeText);
limitWaitingWorkers = topology->getPropBool("@limitWaitingWorkers", limitWaitingWorkers);

cacheReportPeriodSeconds = topology->getPropInt("@cacheReportPeriodSeconds", 5*60);
setLegacyAES(topology->getPropBool("expert/@useLegacyAES", false));
Expand Down Expand Up @@ -1448,7 +1448,6 @@ int CCD_API roxie_main(int argc, const char *argv[], const char * defaultYaml)
DBGLOG("Loading all packages took %ums", loadPackageTimer.elapsedMs());

ROQ = createOutputQueueManager(numAgentThreads, encryptInTransit);
ROQ->setHeadRegionSize(headRegionSize);
ROQ->start();
Owned<IPacketDiscarder> packetDiscarder = createPacketDiscarder();
#if defined(WIN32) && defined(_DEBUG) && defined(_DEBUG_HEAP_FULL)
Expand Down
127 changes: 43 additions & 84 deletions roxie/ccd/ccdqueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ RoxiePacketHeader::RoxiePacketHeader(const RoxiePacketHeader &source, unsigned _
{
// Used to create the header to send a callback to originating server or an IBYTI to a buddy
activityId = _activityId;
uid = source.uid;
uid.store(source.uid);
queryHash = source.queryHash;
channel = source.channel;
overflowSequence = source.overflowSequence;
Expand Down Expand Up @@ -88,15 +88,21 @@ unsigned RoxiePacketHeader::priorityHash() const

void RoxiePacketHeader::copy(const RoxiePacketHeader &oh)
{
// used for saving away kill packets for later matching by match
uid = oh.uid;
// used for saving away info for later matching by match, without having to lock
overflowSequence = oh.overflowSequence;
continueSequence = oh.continueSequence;
serverId = oh.serverId;
channel = oh.channel;
uid.store(oh.uid);
// MORE - would it be safer, maybe even faster to copy the rest too?
}

void RoxiePacketHeader::clear()
{
// Resets the saved uid to RUID_NONE so that this header no longer identifies any packet.
// Only uid is changed; the other header fields are left as they were.
uid = RUID_NONE; // Will never match a queued packet
}

bool RoxiePacketHeader::matchPacket(const RoxiePacketHeader &oh) const
{
// used when matching up a kill packet against a pending one...
Expand Down Expand Up @@ -156,7 +162,7 @@ StringBuffer &RoxiePacketHeader::toString(StringBuffer &ret) const
ret.appendf(" (fetch part)");
break;
}
ret.appendf(" uid=" RUIDF " pri=", uid);
ret.appendf(" uid=" RUIDF " pri=", uid.load());
switch(activityId & ROXIE_PRIORITY_MASK)
{
case ROXIE_SLA_PRIORITY: ret.append("SLA"); break;
Expand Down Expand Up @@ -1151,8 +1157,8 @@ class RoxieQueue : public CInterface, implements IThreadFactory
Owned <IThreadPool> workers;
QueueOf<ISerializedRoxieQueryPacket, true> waiting;
Semaphore available;
CriticalSection availCrit; // Semaphore post may be slow with a lot of waiters - this crit may be used to limit to a single waiter
CriticalSection qcrit;
unsigned headRegionSize;
unsigned numWorkers;
RelaxedAtomic<unsigned> started;
std::atomic<unsigned> idle;
Expand All @@ -1174,9 +1180,8 @@ class RoxieQueue : public CInterface, implements IThreadFactory
public:
IMPLEMENT_IINTERFACE;

RoxieQueue(unsigned _headRegionSize, unsigned _numWorkers)
RoxieQueue(unsigned _numWorkers)
{
headRegionSize = _headRegionSize;
numWorkers = _numWorkers;
workers.setown(createThreadPool("RoxieWorkers", this, false, nullptr, numWorkers));
started = 0;
Expand All @@ -1192,7 +1197,6 @@ class RoxieQueue : public CInterface, implements IThreadFactory


virtual IPooledThread *createNew();
void abortChannel(unsigned channel);

void start()
{
Expand Down Expand Up @@ -1319,7 +1323,10 @@ class RoxieQueue : public CInterface, implements IThreadFactory
void wait()
{
idle++;
available.wait();
{
CLeavableCriticalBlock b(availCrit, limitWaitingWorkers);
available.wait();
}
idle--;
}

Expand All @@ -1331,31 +1338,7 @@ class RoxieQueue : public CInterface, implements IThreadFactory
ISerializedRoxieQueryPacket *dequeue()
{
CriticalBlock qc(qcrit);
unsigned lim = waiting.ordinality();
if (lim)
{
if (headRegionSize)
{
if (lim > headRegionSize)
lim = headRegionSize;
return waiting.dequeue(fastRand() % lim);
}
return waiting.dequeue();
}
else
return NULL;
}

unsigned getHeadRegionSize() const
{
return headRegionSize;
}

unsigned setHeadRegionSize(unsigned newsize)
{
unsigned ret = headRegionSize;
headRegionSize = newsize;
return ret;
return waiting.dequeue();
}

void noteOrphanIBYTI(const RoxiePacketHeader &hdr)
Expand Down Expand Up @@ -1389,6 +1372,7 @@ class CRoxieWorker : public CInterface, implements IPooledThread
Owned<const ITopologyServer> topology;
#endif
AgentContextLogger logctx;
RoxiePacketHeader packetHeader;

public:
IMPLEMENT_IINTERFACE;
Expand Down Expand Up @@ -1417,41 +1401,41 @@ class CRoxieWorker : public CInterface, implements IPooledThread
}
inline void setActivity(IRoxieAgentActivity *act)
{
    // Take ownership of the incoming activity before locking. After the swap
    // 'outgoing' holds whatever activity was previously installed.
    // Declaration order matters: 'guard' is declared after 'outgoing', so it is
    // destroyed first and actCrit is already released by the time the previous
    // activity is released.
    Owned<IRoxieAgentActivity> outgoing(act);
    CriticalBlock guard(actCrit);
    activity.swap(outgoing);
}
inline bool match(RoxiePacketHeader &h)
{
// There is a window between getting packet from queue and being able to match it.
// This could cause some deduping to fail, but it does not matter if it does (so long as it is rare!)
CriticalBlock b(actCrit);
return packet && packet->queryHeader().matchPacket(h);
}

void abortChannel(unsigned channel)
inline void setPacket(IRoxieQueryPacket *p)
{
Owned<IRoxieQueryPacket> temp(p);
CriticalBlock b(actCrit);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should probably use the same pattern as setActivity, to avoid releasing the old packet inside the critical section:

  Owned<X> temp(p);
  {
     CriticalSection x;
     packet.swap(temp);

and also worth searching the entire source code for similar examples.

if (packet && packet->queryHeader().channel==channel)
if (p)
{
abortLaunch = true;
#ifndef NEW_IBYTI
if (doIbytiDelay)
ibytiSem.signal();
#endif
if (activity)
activity->abort();
packet.swap(temp);
packetHeader.copy(p->queryHeader());
}
else
{
packetHeader.clear();
packet.swap(temp);
}
}
inline bool match(RoxiePacketHeader &h)
{
// Reports whether the header passed in matches the header snapshot held in packetHeader.
// No lock is taken here; the comparison is made against packetHeader only.
// There is a window between getting packet from queue and being able to match it.
// This could cause some deduping to fail, but it does not matter if it does (so long as it is rare!)
// NOTE(review): setPacket() updates packetHeader while holding actCrit, but this read is unlocked -
// confirm that matchPacket() only relies on fields that are safe to read concurrently.
return packetHeader.matchPacket(h);
}

bool checkAbort(RoxiePacketHeader &h, bool checkRank, bool &queryFound, bool &preActivity)
{
CriticalBlock b(actCrit);
if (packet && packet->queryHeader().matchPacket(h))
if (packetHeader.matchPacket(h))
{
CriticalBlock b(actCrit);
if (!packetHeader.matchPacket(h))
return false;
queryFound = true;
abortLaunch = true;
#ifndef NEW_IBYTI
Expand Down Expand Up @@ -1772,7 +1756,7 @@ class CRoxieWorker : public CInterface, implements IPooledThread
#ifdef NEW_IBYTI
logctx.setStatistic(StTimeIBYTIDelay, next->queryIBYTIDelayTime());
#endif
packet.setown(next->deserialize());
setPacket(next->deserialize());
next.clear();
RoxiePacketHeader &header = packet->queryHeader();
#ifndef SUBCHANNELS_IN_HEADER
Expand Down Expand Up @@ -1804,8 +1788,7 @@ class CRoxieWorker : public CInterface, implements IPooledThread
}
workerThreadBusy = false;
{
CriticalBlock b(actCrit);
packet.clear();
setPacket(nullptr);
#ifndef SUBCHANNELS_IN_HEADER
topology.clear();
#endif
Expand All @@ -1815,12 +1798,11 @@ class CRoxieWorker : public CInterface, implements IPooledThread
}
catch(IException *E)
{
CriticalBlock b(actCrit);
EXCLOG(E);
if (packet)
{
throwRemoteException(E, NULL, packet, false);
packet.clear();
setPacket(nullptr);
}
else
E->Release();
Expand All @@ -1830,13 +1812,12 @@ class CRoxieWorker : public CInterface, implements IPooledThread
}
catch(...)
{
CriticalBlock b(actCrit);
Owned<IException> E = MakeStringException(ROXIE_INTERNAL_ERROR, "Unexpected exception in Roxie worker thread");
EXCLOG(E);
if (packet)
{
throwRemoteException(E.getClear(), NULL, packet, false);
packet.clear();
setPacket(nullptr);
}
#ifndef SUBCHANNELS_IN_HEADER
topology.clear();
Expand All @@ -1851,16 +1832,6 @@ IPooledThread *RoxieQueue::createNew()
return new CRoxieWorker;
}

void RoxieQueue::abortChannel(unsigned channel)
{
Owned<IPooledThreadIterator> wi = workers->running();
ForEach(*wi)
{
CRoxieWorker &w = (CRoxieWorker &) wi->query();
w.abortChannel(channel);
}
}

//=================================================================================

class CallbackEntry : implements IPendingCallback, public CInterface
Expand Down Expand Up @@ -1917,20 +1888,8 @@ class RoxieReceiverBase : implements IRoxieOutputQueueManager, public CInterface
public:
IMPLEMENT_IINTERFACE;

RoxieReceiverBase(unsigned _numWorkers) : slaQueue(headRegionSize, _numWorkers), hiQueue(headRegionSize, _numWorkers), loQueue(headRegionSize, _numWorkers), numWorkers(_numWorkers)
{
}

virtual unsigned getHeadRegionSize() const
{
return loQueue.getHeadRegionSize();
}

virtual void setHeadRegionSize(unsigned newSize)
RoxieReceiverBase(unsigned _numWorkers) : slaQueue(_numWorkers), hiQueue(_numWorkers), loQueue(_numWorkers), numWorkers(_numWorkers)
{
slaQueue.setHeadRegionSize(newSize);
hiQueue.setHeadRegionSize(newSize);
loQueue.setHeadRegionSize(newSize);
}

virtual void start()
Expand Down
6 changes: 2 additions & 4 deletions roxie/udplib/udplib.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ typedef unsigned ruid_t; // at 1000/sec recycle every 49 days
#define RUIDF "0x%.8x"
#define RUID_PING 0
#define RUID_DISCARD 1
#define RUID_FIRST 2
#define RUID_NONE 2
#define RUID_FIRST 3

typedef unsigned RecordLengthType;
#define MAX_RECORD_LENGTH 0xffffffff
Expand Down Expand Up @@ -176,9 +177,6 @@ interface IRoxieOutputQueueManager : public IInterface
virtual bool replyPending(RoxiePacketHeader &x) = 0;
virtual bool abortCompleted(RoxiePacketHeader &x) = 0;

virtual unsigned getHeadRegionSize() const = 0;
virtual void setHeadRegionSize(unsigned newsize) = 0;

virtual void start() = 0;
virtual void stop() = 0;
virtual void join() = 0;
Expand Down
Loading