diff --git a/roxie/ccd/ccd.hpp b/roxie/ccd/ccd.hpp index 53a6c53e338..86a901a7c71 100644 --- a/roxie/ccd/ccd.hpp +++ b/roxie/ccd/ccd.hpp @@ -159,7 +159,7 @@ class RoxiePacketHeader unsigned activityId = 0; // identifies the helper factory to be used (activityId in graph) hash64_t queryHash = 0; // identifies the query - ruid_t uid = 0; // unique id + std::atomic uid = 0; // unique id ServerIdentifier serverId; #ifdef SUBCHANNELS_IN_HEADER ServerIdentifier subChannels[MAX_SUBCHANNEL]; @@ -173,6 +173,7 @@ class RoxiePacketHeader static unsigned getSubChannelMask(unsigned subChannel); unsigned priorityHash() const; + void clear(); void copy(const RoxiePacketHeader &oh); bool matchPacket(const RoxiePacketHeader &oh) const; void init(const RemoteActivityId &_remoteId, ruid_t _uid, unsigned _channel, unsigned _overflowSequence); @@ -295,7 +296,6 @@ extern unsigned callbackTimeout; extern unsigned lowTimeout; extern unsigned highTimeout; extern unsigned slaTimeout; -extern unsigned headRegionSize; extern unsigned ccdMulticastPort; extern IPropertyTree *topology; extern MapStringTo *preferredClusters; @@ -389,6 +389,7 @@ extern bool ignoreFileDateMismatches; extern bool ignoreFileSizeMismatches; extern int fileTimeFuzzySeconds; extern SinkMode defaultSinkMode; +extern bool limitWaitingWorkers; #if defined(_CONTAINERIZED) || defined(SUBCHANNELS_IN_HEADER) static constexpr bool roxieMulticastEnabled = false; diff --git a/roxie/ccd/ccdmain.cpp b/roxie/ccd/ccdmain.cpp index 9d2680aa8ee..d0af342b917 100644 --- a/roxie/ccd/ccdmain.cpp +++ b/roxie/ccd/ccdmain.cpp @@ -75,7 +75,6 @@ unsigned numRequestArrayThreads = 5; bool blockedLocalAgent = true; bool acknowledgeAllRequests = true; unsigned packetAcknowledgeTimeout = 100; -unsigned headRegionSize; unsigned ccdMulticastPort; bool enableHeartBeat = true; unsigned parallelLoopFlowLimit = 100; @@ -203,6 +202,7 @@ unsigned maxGraphLoopIterations; bool steppingEnabled = true; bool simpleLocalKeyedJoins = true; bool adhocRoxie = false; +bool limitWaitingWorkers = false; unsigned __int64 minFreeDiskSpace = 1024 * 0x100000; // default to 1 GB unsigned socketCheckInterval = 5000; @@ -999,7 +999,6 @@ int CCD_API roxie_main(int argc, const char *argv[], const char * defaultYaml) minPayloadSize = topology->getPropInt("@minPayloadSize", minPayloadSize); blockedLocalAgent = topology->getPropBool("@blockedLocalAgent", blockedLocalAgent); acknowledgeAllRequests = topology->getPropBool("@acknowledgeAllRequests", acknowledgeAllRequests); - headRegionSize = topology->getPropInt("@headRegionSize", 0); packetAcknowledgeTimeout = topology->getPropInt("@packetAcknowledgeTimeout", packetAcknowledgeTimeout); ccdMulticastPort = topology->getPropInt("@multicastPort", CCD_MULTICAST_PORT); statsExpiryTime = topology->getPropInt("@statsExpiryTime", 3600); @@ -1279,6 +1278,7 @@ int CCD_API roxie_main(int argc, const char *argv[], const char * defaultYaml) const char *sinkModeText = topology->queryProp("@sinkMode"); if (sinkModeText) defaultSinkMode = getSinkMode(sinkModeText); + limitWaitingWorkers = topology->getPropBool("@limitWaitingWorkers", limitWaitingWorkers); cacheReportPeriodSeconds = topology->getPropInt("@cacheReportPeriodSeconds", 5*60); setLegacyAES(topology->getPropBool("expert/@useLegacyAES", false)); @@ -1448,7 +1448,6 @@ int CCD_API roxie_main(int argc, const char *argv[], const char * defaultYaml) DBGLOG("Loading all packages took %ums", loadPackageTimer.elapsedMs()); ROQ = createOutputQueueManager(numAgentThreads, encryptInTransit); - ROQ->setHeadRegionSize(headRegionSize); ROQ->start(); Owned packetDiscarder = createPacketDiscarder(); #if defined(WIN32) && defined(_DEBUG) && defined(_DEBUG_HEAP_FULL) diff --git a/roxie/ccd/ccdqueue.cpp b/roxie/ccd/ccdqueue.cpp index 5d9b47ea209..e0ad44803d8 100644 --- a/roxie/ccd/ccdqueue.cpp +++ b/roxie/ccd/ccdqueue.cpp @@ -56,7 +56,7 @@ RoxiePacketHeader::RoxiePacketHeader(const RoxiePacketHeader &source, unsigned _ { // Used to create the header to send a callback to originating server or an IBYTI to a buddy activityId = _activityId; - uid = source.uid; + uid.store(source.uid); queryHash = source.queryHash; channel = source.channel; overflowSequence = source.overflowSequence; @@ -88,15 +88,21 @@ unsigned RoxiePacketHeader::priorityHash() const void RoxiePacketHeader::copy(const RoxiePacketHeader &oh) { - // used for saving away kill packets for later matching by match - uid = oh.uid; + // used for saving away info for later matching by match, without having to lock overflowSequence = oh.overflowSequence; continueSequence = oh.continueSequence; serverId = oh.serverId; channel = oh.channel; + uid.store(oh.uid); // MORE - would it be safer, maybe even faster to copy the rest too? } +void RoxiePacketHeader::clear() +{ + // used for saving away kill packets for later matching by match + uid = RUID_NONE; // Will never match a queued packet +} + bool RoxiePacketHeader::matchPacket(const RoxiePacketHeader &oh) const { // used when matching up a kill packet against a pending one... @@ -156,7 +162,7 @@ StringBuffer &RoxiePacketHeader::toString(StringBuffer &ret) const ret.appendf(" (fetch part)"); break; } - ret.appendf(" uid=" RUIDF " pri=", uid); + ret.appendf(" uid=" RUIDF " pri=", uid.load()); switch(activityId & ROXIE_PRIORITY_MASK) { case ROXIE_SLA_PRIORITY: ret.append("SLA"); break; @@ -1151,8 +1157,8 @@ class RoxieQueue : public CInterface, implements IThreadFactory Owned workers; QueueOf waiting; Semaphore available; + CriticalSection availCrit; // Semaphore post may be slow with a lot of waiters - this crit may be used to limit to a single waiter CriticalSection qcrit; - unsigned headRegionSize; unsigned numWorkers; RelaxedAtomic started; std::atomic idle; @@ -1174,9 +1180,8 @@ class RoxieQueue : public CInterface, implements IThreadFactory public: IMPLEMENT_IINTERFACE; - RoxieQueue(unsigned _headRegionSize, unsigned _numWorkers) + RoxieQueue(unsigned _numWorkers) { - headRegionSize = _headRegionSize; numWorkers = _numWorkers; workers.setown(createThreadPool("RoxieWorkers", this, false, nullptr, numWorkers)); started = 0; @@ -1192,7 +1197,6 @@ class RoxieQueue : public CInterface, implements IThreadFactory virtual IPooledThread *createNew(); - void abortChannel(unsigned channel); void start() { @@ -1319,7 +1323,10 @@ class RoxieQueue : public CInterface, implements IThreadFactory void wait() { idle++; - available.wait(); + { + CLeavableCriticalBlock b(availCrit, limitWaitingWorkers); + available.wait(); + } idle--; } @@ -1331,31 +1338,7 @@ class RoxieQueue : public CInterface, implements IThreadFactory ISerializedRoxieQueryPacket *dequeue() { CriticalBlock qc(qcrit); - unsigned lim = waiting.ordinality(); - if (lim) - { - if (headRegionSize) - { - if (lim > headRegionSize) - lim = headRegionSize; - return waiting.dequeue(fastRand() % lim); - } - return waiting.dequeue(); - } - else - return NULL; - } - - unsigned getHeadRegionSize() const - { - return headRegionSize; - } - - unsigned setHeadRegionSize(unsigned newsize) - { - unsigned ret = headRegionSize; - headRegionSize = newsize; - return ret; + return waiting.dequeue(); } void noteOrphanIBYTI(const RoxiePacketHeader &hdr) @@ -1389,6 +1372,7 @@ class CRoxieWorker : public CInterface, implements IPooledThread Owned topology; #endif AgentContextLogger logctx; + RoxiePacketHeader packetHeader; public: IMPLEMENT_IINTERFACE; @@ -1417,41 +1401,41 @@ class CRoxieWorker : public CInterface, implements IPooledThread } inline void setActivity(IRoxieAgentActivity *act) { - //Ensure that the activity is released outside of the critical section Owned temp(act); { CriticalBlock b(actCrit); activity.swap(temp); } } - inline bool match(RoxiePacketHeader &h) - { - // There is a window between getting packet from queue and being able to match it. - // This could cause some deduping to fail, but it does not matter if it does (so long as it is rare!) - CriticalBlock b(actCrit); - return packet && packet->queryHeader().matchPacket(h); - } - - void abortChannel(unsigned channel) + inline void setPacket(IRoxieQueryPacket *p) { + Owned temp(p); CriticalBlock b(actCrit); - if (packet && packet->queryHeader().channel==channel) + if (p) { - abortLaunch = true; -#ifndef NEW_IBYTI - if (doIbytiDelay) - ibytiSem.signal(); -#endif - if (activity) - activity->abort(); + packet.swap(temp); + packetHeader.copy(p->queryHeader()); + } + else + { + packetHeader.clear(); + packet.swap(temp); } } + inline bool match(RoxiePacketHeader &h) + { + // There is a window between getting packet from queue and being able to match it. + // This could cause some deduping to fail, but it does not matter if it does (so long as it is rare!) + return packetHeader.matchPacket(h); + } bool checkAbort(RoxiePacketHeader &h, bool checkRank, bool &queryFound, bool &preActivity) { - CriticalBlock b(actCrit); - if (packet && packet->queryHeader().matchPacket(h)) + if (packetHeader.matchPacket(h)) { + CriticalBlock b(actCrit); + if (!packetHeader.matchPacket(h)) + return false; queryFound = true; abortLaunch = true; #ifndef NEW_IBYTI @@ -1772,7 +1756,7 @@ class CRoxieWorker : public CInterface, implements IPooledThread #ifdef NEW_IBYTI logctx.setStatistic(StTimeIBYTIDelay, next->queryIBYTIDelayTime()); #endif - packet.setown(next->deserialize()); + setPacket(next->deserialize()); next.clear(); RoxiePacketHeader &header = packet->queryHeader(); #ifndef SUBCHANNELS_IN_HEADER @@ -1804,8 +1788,7 @@ class CRoxieWorker : public CInterface, implements IPooledThread } workerThreadBusy = false; { - CriticalBlock b(actCrit); - packet.clear(); + setPacket(nullptr); #ifndef SUBCHANNELS_IN_HEADER topology.clear(); #endif @@ -1815,12 +1798,11 @@ class CRoxieWorker : public CInterface, implements IPooledThread } catch(IException *E) { - CriticalBlock b(actCrit); EXCLOG(E); if (packet) { throwRemoteException(E, NULL, packet, false); - packet.clear(); + setPacket(nullptr); } else E->Release(); @@ -1830,13 +1812,12 @@ class CRoxieWorker : public CInterface, implements IPooledThread } catch(...) { - CriticalBlock b(actCrit); Owned E = MakeStringException(ROXIE_INTERNAL_ERROR, "Unexpected exception in Roxie worker thread"); EXCLOG(E); if (packet) { throwRemoteException(E.getClear(), NULL, packet, false); - packet.clear(); + setPacket(nullptr); } #ifndef SUBCHANNELS_IN_HEADER topology.clear(); @@ -1851,16 +1832,6 @@ IPooledThread *RoxieQueue::createNew() return new CRoxieWorker; } -void RoxieQueue::abortChannel(unsigned channel) -{ - Owned wi = workers->running(); - ForEach(*wi) - { - CRoxieWorker &w = (CRoxieWorker &) wi->query(); - w.abortChannel(channel); - } -} - //================================================================================= class CallbackEntry : implements IPendingCallback, public CInterface @@ -1917,20 +1888,8 @@ class RoxieReceiverBase : implements IRoxieOutputQueueManager, public CInterface public: IMPLEMENT_IINTERFACE; - RoxieReceiverBase(unsigned _numWorkers) : slaQueue(headRegionSize, _numWorkers), hiQueue(headRegionSize, _numWorkers), loQueue(headRegionSize, _numWorkers), numWorkers(_numWorkers) - { - } - - virtual unsigned getHeadRegionSize() const - { - return loQueue.getHeadRegionSize(); - } - - virtual void setHeadRegionSize(unsigned newSize) + RoxieReceiverBase(unsigned _numWorkers) : slaQueue(_numWorkers), hiQueue(_numWorkers), loQueue(_numWorkers), numWorkers(_numWorkers) { - slaQueue.setHeadRegionSize(newSize); - hiQueue.setHeadRegionSize(newSize); - loQueue.setHeadRegionSize(newSize); } virtual void start() diff --git a/roxie/udplib/udplib.hpp b/roxie/udplib/udplib.hpp index 89190424dd2..56d75c36d35 100644 --- a/roxie/udplib/udplib.hpp +++ b/roxie/udplib/udplib.hpp @@ -33,7 +33,8 @@ typedef unsigned ruid_t; // at 1000/sec recycle every 49 days #define RUIDF "0x%.8x" #define RUID_PING 0 #define RUID_DISCARD 1 -#define RUID_FIRST 2 +#define RUID_NONE 2 +#define RUID_FIRST 3 typedef unsigned RecordLengthType; #define MAX_RECORD_LENGTH 0xffffffff @@ -176,9 +177,6 @@ interface IRoxieOutputQueueManager : public IInterface virtual bool replyPending(RoxiePacketHeader &x) = 0; virtual bool abortCompleted(RoxiePacketHeader &x) = 0; - virtual unsigned getHeadRegionSize() const = 0; - virtual void setHeadRegionSize(unsigned newsize) = 0; - virtual void start() = 0; virtual void stop() = 0; virtual void join() = 0;