From a22655b79c3935419ca3709be09026c2ce02d71f Mon Sep 17 00:00:00 2001 From: Richard Chapman Date: Wed, 2 Oct 2024 13:43:45 +0100 Subject: [PATCH] HPCC-XXXXX RoxieSocketQueueManager::run may be blocked by actCrit Avoid need to obtain critsec just to check if a worker thread's packet matches Signed-off-by: Richard Chapman --- roxie/ccd/ccd.hpp | 3 +- roxie/ccd/ccdqueue.cpp | 76 +++++++++++++++++++---------------------- roxie/udplib/udplib.hpp | 3 +- 3 files changed, 39 insertions(+), 43 deletions(-) diff --git a/roxie/ccd/ccd.hpp b/roxie/ccd/ccd.hpp index 77b2dd66d3a..bd5ca6dccab 100644 --- a/roxie/ccd/ccd.hpp +++ b/roxie/ccd/ccd.hpp @@ -159,7 +159,7 @@ class RoxiePacketHeader unsigned activityId = 0; // identifies the helper factory to be used (activityId in graph) hash64_t queryHash = 0; // identifies the query - ruid_t uid = 0; // unique id + std::atomic uid = 0; // unique id ServerIdentifier serverId; #ifdef SUBCHANNELS_IN_HEADER ServerIdentifier subChannels[MAX_SUBCHANNEL]; @@ -173,6 +173,7 @@ class RoxiePacketHeader static unsigned getSubChannelMask(unsigned subChannel); unsigned priorityHash() const; + void clear(); void copy(const RoxiePacketHeader &oh); bool matchPacket(const RoxiePacketHeader &oh) const; void init(const RemoteActivityId &_remoteId, ruid_t _uid, unsigned _channel, unsigned _overflowSequence); diff --git a/roxie/ccd/ccdqueue.cpp b/roxie/ccd/ccdqueue.cpp index 5e41da7fbbf..ec03c8a988e 100644 --- a/roxie/ccd/ccdqueue.cpp +++ b/roxie/ccd/ccdqueue.cpp @@ -56,7 +56,7 @@ RoxiePacketHeader::RoxiePacketHeader(const RoxiePacketHeader &source, unsigned _ { // Used to create the header to send a callback to originating server or an IBYTI to a buddy activityId = _activityId; - uid = source.uid; + uid.store(source.uid); queryHash = source.queryHash; channel = source.channel; overflowSequence = source.overflowSequence; @@ -88,15 +88,21 @@ unsigned RoxiePacketHeader::priorityHash() const void RoxiePacketHeader::copy(const RoxiePacketHeader &oh) { - // used for saving away kill packets for later matching by match - uid = oh.uid; + // used for saving away info for later matching by match, without having to lock overflowSequence = oh.overflowSequence; continueSequence = oh.continueSequence; serverId = oh.serverId; channel = oh.channel; + uid.store(oh.uid); // MORE - would it be safer, maybe even faster to copy the rest too? } +void RoxiePacketHeader::clear() +{ + // used for saving away kill packets for later matching by match + uid = RUID_NONE; // Will never match a queued packet +} + bool RoxiePacketHeader::matchPacket(const RoxiePacketHeader &oh) const { // used when matching up a kill packet against a pending one... @@ -156,7 +162,7 @@ StringBuffer &RoxiePacketHeader::toString(StringBuffer &ret) const ret.appendf(" (fetch part)"); break; } - ret.appendf(" uid=" RUIDF " pri=", uid); + ret.appendf(" uid=" RUIDF " pri=", uid.load()); switch(activityId & ROXIE_PRIORITY_MASK) { case ROXIE_SLA_PRIORITY: ret.append("SLA"); break; @@ -1184,7 +1190,6 @@ class RoxieQueue : public CInterface, implements IThreadFactory virtual IPooledThread *createNew(); - void abortChannel(unsigned channel); void start() { @@ -1381,6 +1386,7 @@ class CRoxieWorker : public CInterface, implements IPooledThread Owned topology; #endif AgentContextLogger logctx; + RoxiePacketHeader packetHeader; public: IMPLEMENT_IINTERFACE; @@ -1409,37 +1415,38 @@ class CRoxieWorker : public CInterface, implements IPooledThread } inline void setActivity(IRoxieAgentActivity *act) { + // Take care to avoid having destructor called inside critsec + Owned goer; CriticalBlock b(actCrit); + goer.swap(activity); activity.setown(act); } - inline bool match(RoxiePacketHeader &h) - { - // There is a window between getting packet from queue and being able to match it. - // This could cause some deduping to fail, but it does not matter if it does (so long as it is rare!) - CriticalBlock b(actCrit); - return packet && packet->queryHeader().matchPacket(h); - } - - void abortChannel(unsigned channel) + inline void setPacket(IRoxieQueryPacket *p) { CriticalBlock b(actCrit); - if (packet && packet->queryHeader().channel==channel) + if (p) { - abortLaunch = true; -#ifndef NEW_IBYTI - if (doIbytiDelay) - ibytiSem.signal(); -#endif - if (activity) - activity->abort(); + packet.setown(p); + packetHeader.copy(p->queryHeader()); + } + else + { + packetHeader.clear(); + packet.setown(p); } } + inline bool match(RoxiePacketHeader &h) + { + // There is a window between getting packet from queue and being able to match it. + // This could cause some deduping to fail, but it does not matter if it does (so long as it is rare!) + return packetHeader.matchPacket(h); + } bool checkAbort(RoxiePacketHeader &h, bool checkRank, bool &queryFound, bool &preActivity) { - CriticalBlock b(actCrit); - if (packet && packet->queryHeader().matchPacket(h)) + if (packetHeader.matchPacket(h)) { + CriticalBlock b(actCrit); queryFound = true; abortLaunch = true; #ifndef NEW_IBYTI @@ -1760,7 +1767,7 @@ class CRoxieWorker : public CInterface, implements IPooledThread #ifdef NEW_IBYTI logctx.setStatistic(StTimeIBYTIDelay, next->queryIBYTIDelayTime()); #endif - packet.setown(next->deserialize()); + setPacket(next->deserialize()); next.clear(); RoxiePacketHeader &header = packet->queryHeader(); #ifndef SUBCHANNELS_IN_HEADER @@ -1792,8 +1799,7 @@ class CRoxieWorker : public CInterface, implements IPooledThread } workerThreadBusy = false; { - CriticalBlock b(actCrit); - packet.clear(); + setPacket(nullptr); #ifndef SUBCHANNELS_IN_HEADER topology.clear(); #endif @@ -1803,12 +1809,11 @@ class CRoxieWorker : public CInterface, implements IPooledThread } catch(IException *E) { - CriticalBlock b(actCrit); EXCLOG(E); if (packet) { throwRemoteException(E, NULL, packet, false); - packet.clear(); + setPacket(nullptr); } else E->Release(); @@ -1818,13 +1823,12 @@ class CRoxieWorker : public CInterface, implements IPooledThread } catch(...) { - CriticalBlock b(actCrit); Owned E = MakeStringException(ROXIE_INTERNAL_ERROR, "Unexpected exception in Roxie worker thread"); EXCLOG(E); if (packet) { throwRemoteException(E.getClear(), NULL, packet, false); - packet.clear(); + setPacket(nullptr); } #ifndef SUBCHANNELS_IN_HEADER topology.clear(); @@ -1839,16 +1843,6 @@ IPooledThread *RoxieQueue::createNew() return new CRoxieWorker; } -void RoxieQueue::abortChannel(unsigned channel) -{ - Owned wi = workers->running(); - ForEach(*wi) - { - CRoxieWorker &w = (CRoxieWorker &) wi->query(); - w.abortChannel(channel); - } -} - //================================================================================= class CallbackEntry : implements IPendingCallback, public CInterface diff --git a/roxie/udplib/udplib.hpp b/roxie/udplib/udplib.hpp index 89190424dd2..ebd674376df 100644 --- a/roxie/udplib/udplib.hpp +++ b/roxie/udplib/udplib.hpp @@ -33,7 +33,8 @@ typedef unsigned ruid_t; // at 1000/sec recycle every 49 days #define RUIDF "0x%.8x" #define RUID_PING 0 #define RUID_DISCARD 1 -#define RUID_FIRST 2 +#define RUID_NONE 2 +#define RUID_FIRST 3 typedef unsigned RecordLengthType; #define MAX_RECORD_LENGTH 0xffffffff