Skip to content
This repository has been archived by the owner on Nov 1, 2024. It is now read-only.

Commit

Permalink
Update queue read logic
Browse files Browse the repository at this point in the history
Enclose the queue data reading logic
in an automatic retry task to better
handle exceptions and errors.
  • Loading branch information
alexjhawk committed Jun 8, 2022
1 parent a46e9d8 commit bbb8cb9
Showing 1 changed file with 184 additions and 88 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,16 @@
import com.ewon.ewonitf.TagControl;
import com.hms_networks.americas.sc.extensions.config.exceptions.ConfigFileException;
import com.hms_networks.americas.sc.extensions.config.exceptions.ConfigFileWriteException;
import com.hms_networks.americas.sc.extensions.historicaldata.CircularizedFileException;
import com.hms_networks.americas.sc.extensions.historicaldata.EbdTimeoutException;
import com.hms_networks.americas.sc.extensions.historicaldata.HistoricalDataQueueManager;
import com.hms_networks.americas.sc.extensions.historicaldata.TimeTrackerUnrecoverableException;
import com.hms_networks.americas.sc.extensions.json.JSONException;
import com.hms_networks.americas.sc.extensions.logging.Logger;
import com.hms_networks.americas.sc.extensions.retry.AutomaticRetryCode;
import com.hms_networks.americas.sc.extensions.retry.AutomaticRetryCodeLinear;
import com.hms_networks.americas.sc.extensions.retry.AutomaticRetryResult;
import com.hms_networks.americas.sc.extensions.retry.AutomaticRetryState;
import com.hms_networks.americas.sc.extensions.system.http.SCHttpUtility;
import com.hms_networks.americas.sc.extensions.system.tags.SCTagUtils;
import com.hms_networks.americas.sc.extensions.system.time.SCTimeUnit;
Expand Down Expand Up @@ -120,99 +127,188 @@ private static void runGrabData() {
}
}

// Create queue poll retry task
final long queuePollRetryTaskLinearSlopeMillis = 1000;
final long queuePollRetryTaskMaxDelayMillisBeforeRetry = 3000;
final int queuePollRetryTaskMaxRetries = 3;
AutomaticRetryCode queuePollRetryTask =
new AutomaticRetryCodeLinear() {
private int tryCounter = 0;

protected long getLinearSlopeMillis() {
return queuePollRetryTaskLinearSlopeMillis;
}

protected long getMaxDelayMillisBeforeRetry() {
return queuePollRetryTaskMaxDelayMillisBeforeRetry;
}

protected int getMaxRetries() {
return queuePollRetryTaskMaxRetries;
}

protected void codeToRetry() {
try {
// Increment try counter
tryCounter++;

// Check if a queue force reset has been requested
boolean forceReset = false;
if (queueDiagnosticForceResetTag != null) {
forceReset =
queueDiagnosticForceResetTag.getTagValueAsInt()
== TWConnectorConsts.QUEUE_DIAGNOSTIC_TAG_FORCE_RESET_TRUE_VALUE;

if (forceReset) {
Logger.LOG_SERIOUS(
"A force reset of the queue has been requested using the tag `"
+ TWConnectorConsts.QUEUE_DIAGNOSTIC_TAG_FORCE_RESET_NAME
+ "`. A new queue time tracker will be created at the current time!");
}
}

// Read data points from queue
ArrayList datapointsReadFromQueue;
if (HistoricalDataQueueManager.doesTimeTrackerExist() && !forceReset) {
final boolean startNewTimeTracker = false;
datapointsReadFromQueue =
HistoricalDataQueueManager.getFifoNextSpanDataAllGroups(startNewTimeTracker);
} else {
final boolean startNewTimeTracker = true;
datapointsReadFromQueue =
HistoricalDataQueueManager.getFifoNextSpanDataAllGroups(startNewTimeTracker);
}

// Reset queue force reset tag value to false, if true
if (queueDiagnosticForceResetTag != null && forceReset) {
queueDiagnosticForceResetTag.setTagValueAsInt(
TWConnectorConsts.QUEUE_DIAGNOSTIC_TAG_FORCE_RESET_FALSE_VALUE);
}

Logger.LOG_DEBUG(
"Read "
+ datapointsReadFromQueue.size()
+ " data points from the historical log.");

// Send data to Thingworx
TWDataManager.addDataPointsToPending(datapointsReadFromQueue);

// Check if queue is behind
try {
long queueBehindMillis = HistoricalDataQueueManager.getQueueTimeBehindMillis();
if (queueBehindMillis >= TWConnectorConsts.QUEUE_DATA_POLL_BEHIND_MILLIS_WARN) {
String timeBehindString =
SCTimeUtils.getDayHourMinSecsForMillis(
(int) queueBehindMillis,
DAYS_STRING,
HOURS_STRING,
MINUTES_STRING,
SECONDS_STRING);
Logger.LOG_WARN(
"The historical data queue is running behind by " + timeBehindString + ".");
} else {
// Queue is not past running behind threshold, reset to 0 for updating debug tag
queueBehindMillis = 0;
}

// Enable queue double data rate if running behind
if (!queueDoubleDataRateEnabled && queueBehindMillis > 0) {
long doubleQueueDataPollSizeMins =
connectorConfig.getQueueDataPollSizeMinutes() * 2;
HistoricalDataQueueManager.setQueueFifoTimeSpanMins(
doubleQueueDataPollSizeMins);
Logger.LOG_SERIOUS(
"The size of data polled on each interval has been doubled to "
+ doubleQueueDataPollSizeMins
+ " minutes while the queue is running behind!");
queueDoubleDataRateEnabled = true;
}
// Disable queue double data rate if no longer running behind
else if (queueDoubleDataRateEnabled && queueBehindMillis == 0) {
long queueDataPollSizeMins = connectorConfig.getQueueDataPollSizeMinutes();
HistoricalDataQueueManager.setQueueFifoTimeSpanMins(queueDataPollSizeMins);
Logger.LOG_SERIOUS(
"The size of data polled on each interval has been restored to "
+ queueDataPollSizeMins
+ " minutes.");
queueDoubleDataRateEnabled = false;
}

// Update queue debug tag
if (queueDiagnosticRunningBehindSecondsTag != null) {
queueDiagnosticRunningBehindSecondsTag.setTagValueAsLong(
SCTimeUnit.MILLISECONDS.toSeconds(queueBehindMillis));
}

setState(AutomaticRetryState.FINISHED);
if (getCurrentTryNumber() > 1) {
Logger.LOG_SERIOUS(
"Successfully retried and read data from the historical log. This is attempt #"
+ tryCounter
+ " of "
+ getMaxRetries()
+ ".");
}
} catch (IOException e) {
Logger.LOG_SERIOUS(
"Unable to detect if historical data queue is running behind.");
Logger.LOG_EXCEPTION(e);
}

} catch (Exception e) {
Logger.LOG_CRITICAL(
"An error occurred while reading data from the historical log. This is attempt #"
+ tryCounter
+ " of "
+ getMaxRetries()
+ ".");
Logger.LOG_EXCEPTION(e);
setState(AutomaticRetryState.ERROR_RETRY);
}
}
};

// Run queue poll retry task
AutomaticRetryResult queuePollRetryTaskResult = null;
try {
// Check if a queue force reset has been requested
boolean forceReset = false;
if (queueDiagnosticForceResetTag != null) {
forceReset =
queueDiagnosticForceResetTag.getTagValueAsInt()
== TWConnectorConsts.QUEUE_DIAGNOSTIC_TAG_FORCE_RESET_TRUE_VALUE;

if (forceReset) {
Logger.LOG_SERIOUS(
"A force reset of the queue has been requested using the tag `"
+ TWConnectorConsts.QUEUE_DIAGNOSTIC_TAG_FORCE_RESET_NAME
+ "`. A new queue time tracker will be created at the current time!");
}
}

// Read data points from queue
ArrayList datapointsReadFromQueue;
if (HistoricalDataQueueManager.doesTimeTrackerExist() && !forceReset) {
final boolean startNewTimeTracker = false;
datapointsReadFromQueue =
HistoricalDataQueueManager.getFifoNextSpanDataAllGroups(startNewTimeTracker);
} else {
final boolean startNewTimeTracker = true;
datapointsReadFromQueue =
HistoricalDataQueueManager.getFifoNextSpanDataAllGroups(startNewTimeTracker);
}

// Reset queue force reset tag value to false, if true
if (queueDiagnosticForceResetTag != null && forceReset) {
queueDiagnosticForceResetTag.setTagValueAsInt(
TWConnectorConsts.QUEUE_DIAGNOSTIC_TAG_FORCE_RESET_FALSE_VALUE);
}

Logger.LOG_DEBUG(
"Read " + datapointsReadFromQueue.size() + " data points from the historical log.");

// Send data to Thingworx
TWDataManager.addDataPointsToPending(datapointsReadFromQueue);
queuePollRetryTaskResult = queuePollRetryTask.run();
} catch (InterruptedException e) {
Logger.LOG_CRITICAL(
"An interruption prevented reading data from the historical log from completing!");
Logger.LOG_EXCEPTION(e);
}

// Check if queue is behind
// Check queue poll retry task result
boolean doQueueAdvanceTracker = false;
if (queuePollRetryTaskResult == AutomaticRetryResult.FAIL_RETRY_LIMIT) {
doQueueAdvanceTracker = true;
Logger.LOG_CRITICAL(
"Reading data from the historical log was unsuccessful after "
+ queuePollRetryTaskMaxRetries
+ " retries. Skipping to next time interval - data loss may result!");
} else if (queuePollRetryTaskResult == AutomaticRetryResult.FAIL_CRITICAL_STOP) {
doQueueAdvanceTracker = true;
Logger.LOG_CRITICAL(
"Reading data from the historical log failed due to an exception. "
+ "Skipping to next time interval - data loss may result!");
} else if (queuePollRetryTaskResult == null) {
Logger.LOG_CRITICAL(
"Reading data from the historical log failed to run for an unknown reason!");
}
if (doQueueAdvanceTracker) {
Logger.LOG_CRITICAL(
"The historical data queue time tracker has been advanced to the next interval!");
try {
long queueBehindMillis = HistoricalDataQueueManager.getQueueTimeBehindMillis();
if (queueBehindMillis >= TWConnectorConsts.QUEUE_DATA_POLL_BEHIND_MILLIS_WARN) {
String timeBehindString =
SCTimeUtils.getDayHourMinSecsForMillis(
(int) queueBehindMillis,
DAYS_STRING,
HOURS_STRING,
MINUTES_STRING,
SECONDS_STRING);
Logger.LOG_WARN(
"The historical data queue is running behind by " + timeBehindString + ".");
} else {
// Queue is not past running behind threshold, reset to 0 for updating debug tag
queueBehindMillis = 0;
}

// Enable queue double data rate if running behind
if (!queueDoubleDataRateEnabled && queueBehindMillis > 0) {
long doubleQueueDataPollSizeMins = connectorConfig.getQueueDataPollSizeMinutes() * 2;
HistoricalDataQueueManager.setQueueFifoTimeSpanMins(doubleQueueDataPollSizeMins);
Logger.LOG_SERIOUS(
"The size of data polled on each interval has been doubled to "
+ doubleQueueDataPollSizeMins
+ " minutes while the queue is running behind!");
queueDoubleDataRateEnabled = true;
}
// Disable queue double data rate if no longer running behind
else if (queueDoubleDataRateEnabled && queueBehindMillis == 0) {
long queueDataPollSizeMins = connectorConfig.getQueueDataPollSizeMinutes();
HistoricalDataQueueManager.setQueueFifoTimeSpanMins(queueDataPollSizeMins);
Logger.LOG_SERIOUS(
"The size of data polled on each interval has been restored to "
+ queueDataPollSizeMins
+ " minutes.");
queueDoubleDataRateEnabled = false;
}

// Update queue debug tag
if (queueDiagnosticRunningBehindSecondsTag != null) {
queueDiagnosticRunningBehindSecondsTag.setTagValueAsLong(
SCTimeUnit.MILLISECONDS.toSeconds(queueBehindMillis));
}

HistoricalDataQueueManager.advanceTrackingStartTime();
} catch (IOException e) {
Logger.LOG_SERIOUS("Unable to detect if historical data queue is running behind.");
Logger.LOG_EXCEPTION(e);
throw new RuntimeException(e);
} catch (TimeTrackerUnrecoverableException e) {
throw new RuntimeException(e);
} catch (CircularizedFileException e) {
throw new RuntimeException(e);
} catch (EbdTimeoutException e) {
throw new RuntimeException(e);
}

} catch (Exception e) {
Logger.LOG_CRITICAL("An error occurred while reading data from the historical log.");
Logger.LOG_EXCEPTION(e);
}
}
}
Expand Down

0 comments on commit bbb8cb9

Please sign in to comment.