Skip to content

Commit

Permalink
Merge pull request #19050 from richardkchapman/HPCC-32540
Browse files Browse the repository at this point in the history
HPCC-32540 Roxie may flood NIC of target agents if no agents running on a channel

Reviewed-by: Mark Kelly mark.kelly@lexisnexisrisk.com
Reviewed-by: Gavin Halliday <ghalliday@hpccsystems.com>
Merged-by: Gavin Halliday <ghalliday@hpccsystems.com>
  • Loading branch information
ghalliday authored Oct 9, 2024
2 parents de8b4d8 + c348e4f commit 66d1b8c
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 24 deletions.
5 changes: 3 additions & 2 deletions roxie/ccd/ccd.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ class RoxiePacketHeader
void init(const RemoteActivityId &_remoteId, ruid_t _uid, unsigned _channel, unsigned _overflowSequence);
StringBuffer &toString(StringBuffer &ret) const;
bool allChannelsFailed();
bool retry(bool ack);
bool retry();
void setException(unsigned subChannel);
unsigned thisChannelRetries(unsigned subChannel);

Expand Down Expand Up @@ -265,8 +265,9 @@ interface IRoxieQueryPacket : extends IInterface

virtual void noteTimeSent() = 0;
virtual void setAcknowledged() = 0;
virtual void clearAcknowledged() = 0;
virtual bool isAcknowledged() const = 0;
virtual bool resendNeeded(unsigned timeout, unsigned now) const = 0;
virtual bool resendNeeded(unsigned now) = 0;
};

interface IQueryDll;
Expand Down
23 changes: 15 additions & 8 deletions roxie/ccd/ccdqueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ bool RoxiePacketHeader::allChannelsFailed()
return (retries & mask) == mask;
}

bool RoxiePacketHeader::retry(bool ack)
bool RoxiePacketHeader::retry()
{
bool worthRetrying = false;
unsigned mask = SUBCHANNEL_MASK;
Expand All @@ -213,8 +213,7 @@ bool RoxiePacketHeader::retry(bool ack)
{
unsigned subRetries = (retries & mask) >> (subChannel * SUBCHANNEL_BITS);
if (subRetries != SUBCHANNEL_MASK)
if (!subRetries || !ack)
subRetries++;
subRetries++;
if (subRetries != SUBCHANNEL_MASK)
worthRetrying = true;
retries = (retries & ~mask) | (subRetries << (subChannel * SUBCHANNEL_BITS));
Expand Down Expand Up @@ -498,6 +497,7 @@ class CRoxieQueryPacket : public CRoxieQueryPacketBase, implements IRoxieQueryPa
unsigned contextLength = 0;
std::atomic<unsigned> timeFirstSent = 0;
std::atomic<bool> acknowledged = false;
unsigned resends = 0;

public:
IMPLEMENT_IINTERFACE;
Expand Down Expand Up @@ -646,14 +646,22 @@ class CRoxieQueryPacket : public CRoxieQueryPacketBase, implements IRoxieQueryPa
acknowledged = true;
}

virtual void clearAcknowledged() override
{
acknowledged = false;
}

virtual bool isAcknowledged() const override
{
return acknowledged;
}

virtual bool resendNeeded(unsigned timeout, unsigned now) const override
virtual bool resendNeeded(unsigned now) override
{
return timeFirstSent && !acknowledged && now-timeFirstSent > timeout;
bool ret = timeFirstSent && !acknowledged && now-timeFirstSent > packetAcknowledgeTimeout*(resends+1);
if (ret)
resends++;
return ret;
}
};

Expand Down Expand Up @@ -2724,6 +2732,8 @@ class RoxieSocketQueueManager : public RoxieReceiverBase
#endif
Owned<ISerializedRoxieQueryPacket> packet = createSerializedRoxiePacket(mb);
unsigned retries = header.thisChannelRetries(mySubchannel);
if (retries >= SUBCHANNEL_MASK)
return; // I already failed unrecoverably on this request - ignore it
if (acknowledgeAllRequests && (header.activityId & ~ROXIE_PRIORITY_MASK) < ROXIE_ACTIVITY_SPECIAL_FIRST)
{
#ifdef DEBUG
Expand All @@ -2743,9 +2753,6 @@ class RoxieSocketQueueManager : public RoxieReceiverBase
{
// MORE - is this fast enough? By the time I am seeing retries I may already be under load. Could move onto a separate thread
assertex(header.channel); // should never see a retry on channel 0
if (retries >= SUBCHANNEL_MASK)
return; // someone sent a failure or something - ignore it

// Send back an out-of-band immediately, to let Roxie server know that channel is still active
if (!(testAgentFailure & 0x800) && !acknowledgeAllRequests)
{
Expand Down
30 changes: 16 additions & 14 deletions roxie/ccd/ccdserver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4158,17 +4158,17 @@ class CRemoteResultAdaptor : implements IEngineRowStream, implements IFinalRoxie
}
}

void retryPending(unsigned timeout)
void retryPending()
{
checkDelayed();
unsigned now = 0;
if (timeout)
if (acknowledgeAllRequests)
{
if (doTrace(traceRoxiePackets))
DBGLOG("Checking %d pending packets for ack status", pending.ordinality());
now = msTick();
if (now-lastRetryCheck < timeout/4)
if (now-lastRetryCheck < packetAcknowledgeTimeout/4)
return;
if (doTrace(traceRoxiePackets))
DBGLOG("Checking %d pending packets for ack status", pending.ordinality());
lastRetryCheck = now;
}
CriticalBlock b(pendingCrit);
Expand All @@ -4180,16 +4180,16 @@ class CRemoteResultAdaptor : implements IEngineRowStream, implements IFinalRoxie
IRoxieQueryPacket *i = p.queryPacket();
if (i)
{
if (timeout)
if (acknowledgeAllRequests)
{
if (!i->resendNeeded(timeout, now))
if (!i->resendNeeded(now))
continue;
if (doTrace(traceAcknowledge) || doTrace(traceRoxiePackets))
activity.queryLogCtx().CTXLOG("Input has not been acknowledged for %u ms - retry required?", timeout);
activity.queryLogCtx().CTXLOG("Input has not been acknowledged for %u ms - retry required?", packetAcknowledgeTimeout);
activity.noteStatistic(StNumAckRetries, 1);

}
if (!i->queryHeader().retry(timeout!=0))
if (!i->queryHeader().retry())
{
StringBuffer s;
IException *E = MakeStringException(ROXIE_MULTICAST_ERROR, "Failed to get response from agent(s) for %s in activity %d", i->queryHeader().toString(s).str(), activity.queryId());
Expand Down Expand Up @@ -5026,7 +5026,7 @@ class CRemoteResultAdaptor : implements IEngineRowStream, implements IFinalRoxie
activity.queryContext()->checkAbort();
if (acknowledgeAllRequests && !localAgent)
{
retryPending(checkInterval);
retryPending();
}
bool anyActivity;
if (ctxTraceLevel > 5)
Expand Down Expand Up @@ -5238,7 +5238,9 @@ class CRemoteResultAdaptor : implements IEngineRowStream, implements IFinalRoxie
Owned<IMessageUnpackCursor> exceptionData = mr->getCursor(rowManager);
throwRemoteException(exceptionData);
}
// Leave it on pending queue in original location
// One channel has failed, but should be recoverable
// Leave it on pending queue in original location, but clear acknowledged flag
op->clearAcknowledged();
break;

case ROXIE_ALIVE:
Expand Down Expand Up @@ -5318,11 +5320,11 @@ class CRemoteResultAdaptor : implements IEngineRowStream, implements IFinalRoxie
else if (!anyActivity && !localAgent && !acknowledgeAllRequests)
{
unsigned timeNow = msTick();
if (timeNow-lastActivity >= checkInterval)
if (timeNow-lastActivity >= timeout)
{
lastActivity = timeNow;
activity.queryLogCtx().CTXLOG("Input has stalled for %u ms - retry required?", checkInterval);
retryPending(0);
activity.queryLogCtx().CTXLOG("Input has stalled for %u ms - retry required?", timeout);
retryPending();
}
}
}
Expand Down

0 comments on commit 66d1b8c

Please sign in to comment.