From 73571f6fe2ffcadd697c521569910b7fe2a01475 Mon Sep 17 00:00:00 2001 From: Richard Chapman Date: Wed, 9 Oct 2024 16:48:45 +0100 Subject: [PATCH] WIP Signed-off-by: Richard Chapman --- roxie/ccd/ccdqueue.cpp | 31 +++++++++++++++++++++++-------- 1 file changed, 23 insertions(+), 8 deletions(-) diff --git a/roxie/ccd/ccdqueue.cpp b/roxie/ccd/ccdqueue.cpp index 4167c0e3b82..60b8f79746b 100644 --- a/roxie/ccd/ccdqueue.cpp +++ b/roxie/ccd/ccdqueue.cpp @@ -837,7 +837,12 @@ extern ISerializedRoxieQueryPacket *createSerializedRoxiePacket(MemoryBuffer &m) { unsigned length = m.length(); // don't make assumptions about evaluation order of parameters... if (true || encryptInTransit) - return new CSerializedRoxieQueryPacket(m.detachOwn(), length); + { +// void *data = m.detachOwn(); + void *data = malloc(length); + memcpy(data, m.bufferBase(), length); + return new CSerializedRoxieQueryPacket(data, length); + } else return new CNocryptRoxieQueryPacket(m.detachOwn(), length); } @@ -2393,7 +2398,7 @@ class RoxieSocketQueueManager : public RoxieReceiverBase DelayedPacketQueueManager delayed; #endif - class WorkerUdpTracker : public TimeDivisionTracker<11, false> + class WorkerUdpTracker : public TimeDivisionTracker<12, false> { public: enum @@ -2408,10 +2413,11 @@ class RoxieSocketQueueManager : public RoxieReceiverBase decoding, acknowledging, retrying, - creatingPacket + creatingPacket, + getTimeout }; - WorkerUdpTracker(const char *name, unsigned reportIntervalSeconds) : TimeDivisionTracker<11, false>(name, reportIntervalSeconds) + WorkerUdpTracker(const char *name, unsigned reportIntervalSeconds) : TimeDivisionTracker<12, false>(name, reportIntervalSeconds) { stateNames[other] = "other"; stateNames[waiting] = "waiting"; @@ -2424,6 +2430,7 @@ class RoxieSocketQueueManager : public RoxieReceiverBase stateNames[acknowledging] = "acknowledging"; stateNames[retrying] = "retrying"; stateNames[creatingPacket] = "creating packet"; + stateNames[getTimeout] = "getting timeout"; } } timeTracker; @@ -2799,7 +2806,9 @@ class RoxieSocketQueueManager : public RoxieReceiverBase { StringBuffer xx; logctx.CTXLOG("Retry %d received on subchannel %u for %s", retries+1, mySubchannel, header.toString(xx).str()); } - WorkerUdpTracker::TimeDivision division(timeTracker, WorkerUdpTracker::pushing); + division.switchState(WorkerUdpTracker::creatingPacket); + Owned packet = createSerializedRoxiePacket(mb); + division.switchState(WorkerUdpTracker::pushing); #ifdef NEW_IBYTI // It's debatable whether we should delay for the primary here - they had one chance already... // But then again, so did we, assuming the timeout is longer than the IBYTIdelay @@ -2809,7 +2818,6 @@ class RoxieSocketQueueManager : public RoxieReceiverBase for (unsigned subChannel = 0; subChannel < mySubchannel; subChannel++) delay += getIbytiDelay(header.subChannels[subChannel]); } - Owned packet = createSerializedRoxiePacket(mb); if (delay) delayed.queryQueue(header.channel, mySubchannel).append(packet.getClear(), msTick()+delay); else @@ -2849,12 +2857,12 @@ class RoxieSocketQueueManager : public RoxieReceiverBase WorkerUdpTracker::TimeDivision division(timeTracker, WorkerUdpTracker::other); for (;;) { - mb.clear(); try { // NOTE - this thread needs to do as little as possible - just read packets and queue them up - otherwise we can get packet loss due to buffer overflow // DO NOT put tracing on this thread except at very high tracelevels! #ifdef NEW_IBYTI + division.switchState(WorkerUdpTracker::getTimeout); unsigned timeout = delayed.timeout(msTick()); if (timeout>5000) timeout = 5000; @@ -2862,6 +2870,7 @@ class RoxieSocketQueueManager : public RoxieReceiverBase unsigned timeout = 5000; #endif division.switchState(WorkerUdpTracker::allocating); + mb.clear(); void * buffer = mb.reserve(maxPacketSize); division.switchState(WorkerUdpTracker::waiting); @@ -2923,11 +2932,17 @@ class RoxieSocketQueueManager : public RoxieReceiverBase } #ifdef NEW_IBYTI division.switchState(WorkerUdpTracker::checkingExpired); - delayed.checkExpired(msTick(), slaQueue, hiQueue, loQueue); + unsigned now = msTick(); + if (now != lastCheck) + { + lastCheck = now; + delayed.checkExpired(now, slaQueue, hiQueue, loQueue); + } #endif } return 0; } + unsigned lastCheck = 0; void start() {