diff --git a/roxie/ccd/ccd.hpp b/roxie/ccd/ccd.hpp index 77b2dd66d3a..53a6c53e338 100644 --- a/roxie/ccd/ccd.hpp +++ b/roxie/ccd/ccd.hpp @@ -178,7 +178,7 @@ class RoxiePacketHeader void init(const RemoteActivityId &_remoteId, ruid_t _uid, unsigned _channel, unsigned _overflowSequence); StringBuffer &toString(StringBuffer &ret) const; bool allChannelsFailed(); - bool retry(bool ack); + bool retry(); void setException(unsigned subChannel); unsigned thisChannelRetries(unsigned subChannel); @@ -265,8 +265,9 @@ interface IRoxieQueryPacket : extends IInterface virtual void noteTimeSent() = 0; virtual void setAcknowledged() = 0; + virtual void clearAcknowledged() = 0; virtual bool isAcknowledged() const = 0; - virtual bool resendNeeded(unsigned timeout, unsigned now) const = 0; + virtual bool resendNeeded(unsigned now) = 0; }; interface IQueryDll; diff --git a/roxie/ccd/ccdqueue.cpp b/roxie/ccd/ccdqueue.cpp index 5e41da7fbbf..b70242587cf 100644 --- a/roxie/ccd/ccdqueue.cpp +++ b/roxie/ccd/ccdqueue.cpp @@ -204,7 +204,7 @@ bool RoxiePacketHeader::allChannelsFailed() return (retries & mask) == mask; } -bool RoxiePacketHeader::retry(bool ack) +bool RoxiePacketHeader::retry() { bool worthRetrying = false; unsigned mask = SUBCHANNEL_MASK; @@ -213,8 +213,7 @@ bool RoxiePacketHeader::retry(bool ack) { unsigned subRetries = (retries & mask) >> (subChannel * SUBCHANNEL_BITS); if (subRetries != SUBCHANNEL_MASK) - if (!subRetries || !ack) - subRetries++; + subRetries++; if (subRetries != SUBCHANNEL_MASK) worthRetrying = true; retries = (retries & ~mask) | (subRetries << (subChannel * SUBCHANNEL_BITS)); @@ -498,6 +497,7 @@ class CRoxieQueryPacket : public CRoxieQueryPacketBase, implements IRoxieQueryPa unsigned contextLength = 0; std::atomic timeFirstSent = 0; std::atomic acknowledged = false; + unsigned resends = 0; public: IMPLEMENT_IINTERFACE; @@ -646,14 +646,22 @@ class CRoxieQueryPacket : public CRoxieQueryPacketBase, implements IRoxieQueryPa acknowledged = true; } + virtual void clearAcknowledged() override + { + acknowledged = false; + } + virtual bool isAcknowledged() const override { return acknowledged; } - virtual bool resendNeeded(unsigned timeout, unsigned now) const override + virtual bool resendNeeded(unsigned now) override { - return timeFirstSent && !acknowledged && now-timeFirstSent > timeout; + bool ret = timeFirstSent && !acknowledged && now-timeFirstSent > packetAcknowledgeTimeout*(resends+1); + if (ret) + resends++; + return ret; } }; @@ -2720,6 +2728,8 @@ class RoxieSocketQueueManager : public RoxieReceiverBase #endif Owned packet = createSerializedRoxiePacket(mb); unsigned retries = header.thisChannelRetries(mySubchannel); + if (retries >= SUBCHANNEL_MASK) + return; // I already failed unrecoverably on this request - ignore it if (acknowledgeAllRequests && (header.activityId & ~ROXIE_PRIORITY_MASK) < ROXIE_ACTIVITY_SPECIAL_FIRST) { #ifdef DEBUG @@ -2739,9 +2749,6 @@ class RoxieSocketQueueManager : public RoxieReceiverBase { // MORE - is this fast enough? By the time I am seeing retries I may already be under load. Could move onto a separate thread assertex(header.channel); // should never see a retry on channel 0 - if (retries >= SUBCHANNEL_MASK) - return; // someone sent a failure or something - ignore it - // Send back an out-of-band immediately, to let Roxie server know that channel is still active if (!(testAgentFailure & 0x800) && !acknowledgeAllRequests) { diff --git a/roxie/ccd/ccdserver.cpp b/roxie/ccd/ccdserver.cpp index 45a76675e46..e81829cd280 100644 --- a/roxie/ccd/ccdserver.cpp +++ b/roxie/ccd/ccdserver.cpp @@ -4158,17 +4158,17 @@ class CRemoteResultAdaptor : implements IEngineRowStream, implements IFinalRoxie } } - void retryPending(unsigned timeout) + void retryPending() { checkDelayed(); unsigned now = 0; - if (timeout) + if (acknowledgeAllRequests) { - if (doTrace(traceRoxiePackets)) - DBGLOG("Checking %d pending packets for ack status", pending.ordinality()); now = msTick(); - if (now-lastRetryCheck < timeout/4) + if (now-lastRetryCheck < packetAcknowledgeTimeout/4) return; + if (doTrace(traceRoxiePackets)) + DBGLOG("Checking %d pending packets for ack status", pending.ordinality()); lastRetryCheck = now; } CriticalBlock b(pendingCrit); @@ -4180,16 +4180,16 @@ class CRemoteResultAdaptor : implements IEngineRowStream, implements IFinalRoxie IRoxieQueryPacket *i = p.queryPacket(); if (i) { - if (timeout) + if (acknowledgeAllRequests) { - if (!i->resendNeeded(timeout, now)) + if (!i->resendNeeded(now)) continue; if (doTrace(traceAcknowledge) || doTrace(traceRoxiePackets)) - activity.queryLogCtx().CTXLOG("Input has not been acknowledged for %u ms - retry required?", timeout); + activity.queryLogCtx().CTXLOG("Input has not been acknowledged for %u ms - retry required?", packetAcknowledgeTimeout); activity.noteStatistic(StNumAckRetries, 1); } - if (!i->queryHeader().retry(timeout!=0)) + if (!i->queryHeader().retry()) { StringBuffer s; IException *E = MakeStringException(ROXIE_MULTICAST_ERROR, "Failed to get response from agent(s) for %s in activity %d", i->queryHeader().toString(s).str(), activity.queryId()); @@ -5023,7 +5023,7 @@ class CRemoteResultAdaptor : implements IEngineRowStream, implements IFinalRoxie activity.queryContext()->checkAbort(); if (acknowledgeAllRequests && !localAgent) { - retryPending(checkInterval); + retryPending(); } bool anyActivity; if (ctxTraceLevel > 5) @@ -5235,7 +5235,9 @@ class CRemoteResultAdaptor : implements IEngineRowStream, implements IFinalRoxie Owned exceptionData = mr->getCursor(rowManager); throwRemoteException(exceptionData); } - // Leave it on pending queue in original location + // One channel has failed, but should be recoverable + // Leave it on pending queue in original location, but clear acknowledged flag + op->clearAcknowledged(); break; case ROXIE_ALIVE: @@ -5315,11 +5317,11 @@ class CRemoteResultAdaptor : implements IEngineRowStream, implements IFinalRoxie else if (!anyActivity && !localAgent && !acknowledgeAllRequests) { unsigned timeNow = msTick(); - if (timeNow-lastActivity >= checkInterval) + if (timeNow-lastActivity >= timeout) { lastActivity = timeNow; - activity.queryLogCtx().CTXLOG("Input has stalled for %u ms - retry required?", checkInterval); - retryPending(0); + activity.queryLogCtx().CTXLOG("Input has stalled for %u ms - retry required?", timeout); + retryPending(); } } }