From bf120afa47c1fade137bb475c9f4064fbfd94a9f Mon Sep 17 00:00:00 2001 From: Richard Chapman Date: Wed, 28 Aug 2024 13:31:10 +0100 Subject: [PATCH 1/3] HPCC-32540 Refactor roxie retryPending code to make backoff possible This change should not affect functionality at all, but makes it possible to make future changes allowing per-channel back-off. Signed-off-by: Richard Chapman --- roxie/ccd/ccdserver.cpp | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/roxie/ccd/ccdserver.cpp b/roxie/ccd/ccdserver.cpp index 45a76675e46..a872fc3828d 100644 --- a/roxie/ccd/ccdserver.cpp +++ b/roxie/ccd/ccdserver.cpp @@ -4158,16 +4158,16 @@ class CRemoteResultAdaptor : implements IEngineRowStream, implements IFinalRoxie } } - void retryPending(unsigned timeout) + void retryPending() { checkDelayed(); unsigned now = 0; - if (timeout) + if (acknowledgeAllRequests) { if (doTrace(traceRoxiePackets)) DBGLOG("Checking %d pending packets for ack status", pending.ordinality()); now = msTick(); - if (now-lastRetryCheck < timeout/4) + if (now-lastRetryCheck < packetAcknowledgeTimeout/4) return; lastRetryCheck = now; } @@ -4180,16 +4180,16 @@ class CRemoteResultAdaptor : implements IEngineRowStream, implements IFinalRoxie IRoxieQueryPacket *i = p.queryPacket(); if (i) { - if (timeout) + if (acknowledgeAllRequests) { - if (!i->resendNeeded(timeout, now)) + if (!i->resendNeeded(packetAcknowledgeTimeout, now)) continue; if (doTrace(traceAcknowledge) || doTrace(traceRoxiePackets)) - activity.queryLogCtx().CTXLOG("Input has not been acknowledged for %u ms - retry required?", timeout); + activity.queryLogCtx().CTXLOG("Input has not been acknowledged for %u ms - retry required?", packetAcknowledgeTimeout); activity.noteStatistic(StNumAckRetries, 1); } - if (!i->queryHeader().retry(timeout!=0)) + if (!i->queryHeader().retry(acknowledgeAllRequests)) { StringBuffer s; IException *E = MakeStringException(ROXIE_MULTICAST_ERROR, "Failed to get response from agent(s) for %s in activity %d", i->queryHeader().toString(s).str(), activity.queryId()); @@ -5023,7 +5023,7 @@ class CRemoteResultAdaptor : implements IEngineRowStream, implements IFinalRoxie activity.queryContext()->checkAbort(); if (acknowledgeAllRequests && !localAgent) { - retryPending(checkInterval); + retryPending(); } bool anyActivity; if (ctxTraceLevel > 5) @@ -5319,7 +5319,7 @@ class CRemoteResultAdaptor : implements IEngineRowStream, implements IFinalRoxie { lastActivity = timeNow; activity.queryLogCtx().CTXLOG("Input has stalled for %u ms - retry required?", checkInterval); - retryPending(0); + retryPending(); } } } From ae93d7ad28812bd7649cee2e18cc70a3a17e4dce Mon Sep 17 00:00:00 2001 From: Richard Chapman Date: Wed, 28 Aug 2024 14:23:36 +0100 Subject: [PATCH 2/3] HPCC-32540 Don't retry indefinitely when a server request is not acknowledged Signed-off-by: Richard Chapman --- roxie/ccd/ccd.hpp | 4 ++-- roxie/ccd/ccdqueue.cpp | 13 ++++++++----- roxie/ccd/ccdserver.cpp | 8 ++++---- 3 files changed, 14 insertions(+), 11 deletions(-) diff --git a/roxie/ccd/ccd.hpp b/roxie/ccd/ccd.hpp index 77b2dd66d3a..7bac891cc3e 100644 --- a/roxie/ccd/ccd.hpp +++ b/roxie/ccd/ccd.hpp @@ -178,7 +178,7 @@ class RoxiePacketHeader void init(const RemoteActivityId &_remoteId, ruid_t _uid, unsigned _channel, unsigned _overflowSequence); StringBuffer &toString(StringBuffer &ret) const; bool allChannelsFailed(); - bool retry(bool ack); + bool retry(); void setException(unsigned subChannel); unsigned thisChannelRetries(unsigned subChannel); @@ -266,7 +266,7 @@ interface IRoxieQueryPacket : extends IInterface virtual void noteTimeSent() = 0; virtual void setAcknowledged() = 0; virtual bool isAcknowledged() const = 0; - virtual bool resendNeeded(unsigned timeout, unsigned now) const = 0; + virtual bool resendNeeded(unsigned now) = 0; }; interface IQueryDll; diff --git a/roxie/ccd/ccdqueue.cpp b/roxie/ccd/ccdqueue.cpp index 5e41da7fbbf..f74004bfeb8 100644 --- a/roxie/ccd/ccdqueue.cpp +++ b/roxie/ccd/ccdqueue.cpp @@ -204,7 +204,7 @@ bool RoxiePacketHeader::allChannelsFailed() return (retries & mask) == mask; } -bool RoxiePacketHeader::retry(bool ack) +bool RoxiePacketHeader::retry() { bool worthRetrying = false; unsigned mask = SUBCHANNEL_MASK; @@ -213,8 +213,7 @@ bool RoxiePacketHeader::retry(bool ack) { unsigned subRetries = (retries & mask) >> (subChannel * SUBCHANNEL_BITS); if (subRetries != SUBCHANNEL_MASK) - if (!subRetries || !ack) - subRetries++; + subRetries++; if (subRetries != SUBCHANNEL_MASK) worthRetrying = true; retries = (retries & ~mask) | (subRetries << (subChannel * SUBCHANNEL_BITS)); @@ -498,6 +497,7 @@ class CRoxieQueryPacket : public CRoxieQueryPacketBase, implements IRoxieQueryPa unsigned contextLength = 0; std::atomic timeFirstSent = 0; std::atomic acknowledged = false; + unsigned resends = 0; public: IMPLEMENT_IINTERFACE; @@ -651,9 +651,12 @@ class CRoxieQueryPacket : public CRoxieQueryPacketBase, implements IRoxieQueryPa return acknowledged; } - virtual bool resendNeeded(unsigned timeout, unsigned now) const override + virtual bool resendNeeded(unsigned now) override { - return timeFirstSent && !acknowledged && now-timeFirstSent > timeout; + bool ret = timeFirstSent && !acknowledged && now-timeFirstSent > packetAcknowledgeTimeout*(resends+1); + if (ret) + resends++; + return ret; } }; diff --git a/roxie/ccd/ccdserver.cpp b/roxie/ccd/ccdserver.cpp index a872fc3828d..0a9ac8869dc 100644 --- a/roxie/ccd/ccdserver.cpp +++ b/roxie/ccd/ccdserver.cpp @@ -4182,14 +4182,14 @@ class CRemoteResultAdaptor : implements IEngineRowStream, implements IFinalRoxie { if (acknowledgeAllRequests) { - if (!i->resendNeeded(packetAcknowledgeTimeout, now)) + if (!i->resendNeeded(now)) continue; if (doTrace(traceAcknowledge) || doTrace(traceRoxiePackets)) activity.queryLogCtx().CTXLOG("Input has not been acknowledged for %u ms - retry required?", packetAcknowledgeTimeout); activity.noteStatistic(StNumAckRetries, 1); } - if (!i->queryHeader().retry(acknowledgeAllRequests)) + if (!i->queryHeader().retry()) { StringBuffer s; IException *E = MakeStringException(ROXIE_MULTICAST_ERROR, "Failed to get response from agent(s) for %s in activity %d", i->queryHeader().toString(s).str(), activity.queryId()); @@ -5315,10 +5315,10 @@ class CRemoteResultAdaptor : implements IEngineRowStream, implements IFinalRoxie else if (!anyActivity && !localAgent && !acknowledgeAllRequests) { unsigned timeNow = msTick(); - if (timeNow-lastActivity >= checkInterval) + if (timeNow-lastActivity >= timeout) { lastActivity = timeNow; - activity.queryLogCtx().CTXLOG("Input has stalled for %u ms - retry required?", checkInterval); + activity.queryLogCtx().CTXLOG("Input has stalled for %u ms - retry required?", timeout); retryPending(); } } From c348e4f9b9af137f7a89aa0550c0e53494a80172 Mon Sep 17 00:00:00 2001 From: Richard Chapman Date: Wed, 4 Sep 2024 14:51:24 +0100 Subject: [PATCH 3/3] HPCC-32504 Recoverable failures on agents not handled properly with acknowledge enabled Signed-off-by: Richard Chapman --- roxie/ccd/ccd.hpp | 1 + roxie/ccd/ccdqueue.cpp | 10 +++++++--- roxie/ccd/ccdserver.cpp | 8 +++++--- 3 files changed, 13 insertions(+), 6 deletions(-) diff --git a/roxie/ccd/ccd.hpp b/roxie/ccd/ccd.hpp index 7bac891cc3e..53a6c53e338 100644 --- a/roxie/ccd/ccd.hpp +++ b/roxie/ccd/ccd.hpp @@ -265,6 +265,7 @@ interface IRoxieQueryPacket : extends IInterface virtual void noteTimeSent() = 0; virtual void setAcknowledged() = 0; + virtual void clearAcknowledged() = 0; virtual bool isAcknowledged() const = 0; virtual bool resendNeeded(unsigned now) = 0; }; diff --git a/roxie/ccd/ccdqueue.cpp b/roxie/ccd/ccdqueue.cpp index f74004bfeb8..b70242587cf 100644 --- a/roxie/ccd/ccdqueue.cpp +++ b/roxie/ccd/ccdqueue.cpp @@ -646,6 +646,11 @@ class CRoxieQueryPacket : public CRoxieQueryPacketBase, implements IRoxieQueryPa acknowledged = true; } + virtual void clearAcknowledged() override + { + acknowledged = false; + } + virtual bool isAcknowledged() const override { return acknowledged; @@ -2723,6 +2728,8 @@ class RoxieSocketQueueManager : public RoxieReceiverBase #endif Owned packet = createSerializedRoxiePacket(mb); unsigned retries = header.thisChannelRetries(mySubchannel); + if (retries >= SUBCHANNEL_MASK) + return; // I already failed unrecoverably on this request - ignore it if (acknowledgeAllRequests && (header.activityId & ~ROXIE_PRIORITY_MASK) < ROXIE_ACTIVITY_SPECIAL_FIRST) { #ifdef DEBUG @@ -2742,9 +2749,6 @@ class RoxieSocketQueueManager : public RoxieReceiverBase { // MORE - is this fast enough? By the time I am seeing retries I may already be under load. Could move onto a separate thread assertex(header.channel); // should never see a retry on channel 0 - if (retries >= SUBCHANNEL_MASK) - return; // someone sent a failure or something - ignore it - // Send back an out-of-band immediately, to let Roxie server know that channel is still active if (!(testAgentFailure & 0x800) && !acknowledgeAllRequests) { diff --git a/roxie/ccd/ccdserver.cpp b/roxie/ccd/ccdserver.cpp index 0a9ac8869dc..e81829cd280 100644 --- a/roxie/ccd/ccdserver.cpp +++ b/roxie/ccd/ccdserver.cpp @@ -4164,11 +4164,11 @@ class CRemoteResultAdaptor : implements IEngineRowStream, implements IFinalRoxie unsigned now = 0; if (acknowledgeAllRequests) { - if (doTrace(traceRoxiePackets)) - DBGLOG("Checking %d pending packets for ack status", pending.ordinality()); now = msTick(); if (now-lastRetryCheck < packetAcknowledgeTimeout/4) return; + if (doTrace(traceRoxiePackets)) + DBGLOG("Checking %d pending packets for ack status", pending.ordinality()); lastRetryCheck = now; } CriticalBlock b(pendingCrit); @@ -5235,7 +5235,9 @@ class CRemoteResultAdaptor : implements IEngineRowStream, implements IFinalRoxie Owned exceptionData = mr->getCursor(rowManager); throwRemoteException(exceptionData); } - // Leave it on pending queue in original location + // One channel has failed, but should be recoverable + // Leave it on pending queue in original location, but clear acknowledged flag + op->clearAcknowledged(); break; case ROXIE_ALIVE: