diff --git a/src/main/java/com/hms_networks/americas/sc/thingworx/TWConnectorMain.java b/src/main/java/com/hms_networks/americas/sc/thingworx/TWConnectorMain.java index ac5e111..a90ee69 100644 --- a/src/main/java/com/hms_networks/americas/sc/thingworx/TWConnectorMain.java +++ b/src/main/java/com/hms_networks/americas/sc/thingworx/TWConnectorMain.java @@ -4,9 +4,16 @@ import com.ewon.ewonitf.TagControl; import com.hms_networks.americas.sc.extensions.config.exceptions.ConfigFileException; import com.hms_networks.americas.sc.extensions.config.exceptions.ConfigFileWriteException; +import com.hms_networks.americas.sc.extensions.historicaldata.CircularizedFileException; +import com.hms_networks.americas.sc.extensions.historicaldata.EbdTimeoutException; import com.hms_networks.americas.sc.extensions.historicaldata.HistoricalDataQueueManager; +import com.hms_networks.americas.sc.extensions.historicaldata.TimeTrackerUnrecoverableException; import com.hms_networks.americas.sc.extensions.json.JSONException; import com.hms_networks.americas.sc.extensions.logging.Logger; +import com.hms_networks.americas.sc.extensions.retry.AutomaticRetryCode; +import com.hms_networks.americas.sc.extensions.retry.AutomaticRetryCodeLinear; +import com.hms_networks.americas.sc.extensions.retry.AutomaticRetryResult; +import com.hms_networks.americas.sc.extensions.retry.AutomaticRetryState; import com.hms_networks.americas.sc.extensions.system.http.SCHttpUtility; import com.hms_networks.americas.sc.extensions.system.tags.SCTagUtils; import com.hms_networks.americas.sc.extensions.system.time.SCTimeUnit; @@ -120,99 +127,188 @@ private static void runGrabData() { } } + // Create queue poll retry task + final long queuePollRetryTaskLinearSlopeMillis = 1000; + final long queuePollRetryTaskMaxDelayMillisBeforeRetry = 3000; + final int queuePollRetryTaskMaxRetries = 3; + AutomaticRetryCode queuePollRetryTask = + new AutomaticRetryCodeLinear() { + private int tryCounter = 0; + + protected long getLinearSlopeMillis() { + return queuePollRetryTaskLinearSlopeMillis; + } + + protected long getMaxDelayMillisBeforeRetry() { + return queuePollRetryTaskMaxDelayMillisBeforeRetry; + } + + protected int getMaxRetries() { + return queuePollRetryTaskMaxRetries; + } + + protected void codeToRetry() { + try { + // Increment try counter + tryCounter++; + + // Check if a queue force reset has been requested + boolean forceReset = false; + if (queueDiagnosticForceResetTag != null) { + forceReset = + queueDiagnosticForceResetTag.getTagValueAsInt() + == TWConnectorConsts.QUEUE_DIAGNOSTIC_TAG_FORCE_RESET_TRUE_VALUE; + + if (forceReset) { + Logger.LOG_SERIOUS( + "A force reset of the queue has been requested using the tag `" + + TWConnectorConsts.QUEUE_DIAGNOSTIC_TAG_FORCE_RESET_NAME + + "`. A new queue time tracker will be created at the current time!"); + } + } + + // Read data points from queue + ArrayList datapointsReadFromQueue; + if (HistoricalDataQueueManager.doesTimeTrackerExist() && !forceReset) { + final boolean startNewTimeTracker = false; + datapointsReadFromQueue = + HistoricalDataQueueManager.getFifoNextSpanDataAllGroups(startNewTimeTracker); + } else { + final boolean startNewTimeTracker = true; + datapointsReadFromQueue = + HistoricalDataQueueManager.getFifoNextSpanDataAllGroups(startNewTimeTracker); + } + + // Reset queue force reset tag value to false, if true + if (queueDiagnosticForceResetTag != null && forceReset) { + queueDiagnosticForceResetTag.setTagValueAsInt( + TWConnectorConsts.QUEUE_DIAGNOSTIC_TAG_FORCE_RESET_FALSE_VALUE); + } + + Logger.LOG_DEBUG( + "Read " + + datapointsReadFromQueue.size() + + " data points from the historical log."); + + // Send data to Thingworx + TWDataManager.addDataPointsToPending(datapointsReadFromQueue); + + // Check if queue is behind + try { + long queueBehindMillis = HistoricalDataQueueManager.getQueueTimeBehindMillis(); + if (queueBehindMillis >= TWConnectorConsts.QUEUE_DATA_POLL_BEHIND_MILLIS_WARN) { + String timeBehindString = + SCTimeUtils.getDayHourMinSecsForMillis( + (int) queueBehindMillis, + DAYS_STRING, + HOURS_STRING, + MINUTES_STRING, + SECONDS_STRING); + Logger.LOG_WARN( + "The historical data queue is running behind by " + timeBehindString + "."); + } else { + // Queue is not past running behind threshold, reset to 0 for updating debug tag + queueBehindMillis = 0; + } + + // Enable queue double data rate if running behind + if (!queueDoubleDataRateEnabled && queueBehindMillis > 0) { + long doubleQueueDataPollSizeMins = + connectorConfig.getQueueDataPollSizeMinutes() * 2; + HistoricalDataQueueManager.setQueueFifoTimeSpanMins( + doubleQueueDataPollSizeMins); + Logger.LOG_SERIOUS( + "The size of data polled on each interval has been doubled to " + + doubleQueueDataPollSizeMins + + " minutes while the queue is running behind!"); + queueDoubleDataRateEnabled = true; + } + // Disable queue double data rate if no longer running behind + else if (queueDoubleDataRateEnabled && queueBehindMillis == 0) { + long queueDataPollSizeMins = connectorConfig.getQueueDataPollSizeMinutes(); + HistoricalDataQueueManager.setQueueFifoTimeSpanMins(queueDataPollSizeMins); + Logger.LOG_SERIOUS( + "The size of data polled on each interval has been restored to " + + queueDataPollSizeMins + + " minutes."); + queueDoubleDataRateEnabled = false; + } + + // Update queue debug tag + if (queueDiagnosticRunningBehindSecondsTag != null) { + queueDiagnosticRunningBehindSecondsTag.setTagValueAsLong( + SCTimeUnit.MILLISECONDS.toSeconds(queueBehindMillis)); + } + + setState(AutomaticRetryState.FINISHED); + if (getCurrentTryNumber() > 1) { + Logger.LOG_SERIOUS( + "Successfully retried and read data from the historical log. This is attempt #" + + tryCounter + + " of " + + getMaxRetries() + + "."); + } + } catch (IOException e) { + Logger.LOG_SERIOUS( + "Unable to detect if historical data queue is running behind."); + Logger.LOG_EXCEPTION(e); + } + + } catch (Exception e) { + Logger.LOG_CRITICAL( + "An error occurred while reading data from the historical log. This is attempt #" + + tryCounter + + " of " + + getMaxRetries() + + "."); + Logger.LOG_EXCEPTION(e); + setState(AutomaticRetryState.ERROR_RETRY); + } + } + }; + + // Run queue poll retry task + AutomaticRetryResult queuePollRetryTaskResult = null; try { - // Check if a queue force reset has been requested - boolean forceReset = false; - if (queueDiagnosticForceResetTag != null) { - forceReset = - queueDiagnosticForceResetTag.getTagValueAsInt() - == TWConnectorConsts.QUEUE_DIAGNOSTIC_TAG_FORCE_RESET_TRUE_VALUE; - - if (forceReset) { - Logger.LOG_SERIOUS( - "A force reset of the queue has been requested using the tag `" - + TWConnectorConsts.QUEUE_DIAGNOSTIC_TAG_FORCE_RESET_NAME - + "`. A new queue time tracker will be created at the current time!"); - } - } - - // Read data points from queue - ArrayList datapointsReadFromQueue; - if (HistoricalDataQueueManager.doesTimeTrackerExist() && !forceReset) { - final boolean startNewTimeTracker = false; - datapointsReadFromQueue = - HistoricalDataQueueManager.getFifoNextSpanDataAllGroups(startNewTimeTracker); - } else { - final boolean startNewTimeTracker = true; - datapointsReadFromQueue = - HistoricalDataQueueManager.getFifoNextSpanDataAllGroups(startNewTimeTracker); - } - - // Reset queue force reset tag value to false, if true - if (queueDiagnosticForceResetTag != null && forceReset) { - queueDiagnosticForceResetTag.setTagValueAsInt( - TWConnectorConsts.QUEUE_DIAGNOSTIC_TAG_FORCE_RESET_FALSE_VALUE); - } - - Logger.LOG_DEBUG( - "Read " + datapointsReadFromQueue.size() + " data points from the historical log."); - - // Send data to Thingworx - TWDataManager.addDataPointsToPending(datapointsReadFromQueue); + queuePollRetryTaskResult = queuePollRetryTask.run(); + } catch (InterruptedException e) { + Logger.LOG_CRITICAL( + "An interruption prevented reading data from the historical log from completing!"); + Logger.LOG_EXCEPTION(e); + } - // Check if queue is behind + // Check queue poll retry task result + boolean doQueueAdvanceTracker = false; + if (queuePollRetryTaskResult == AutomaticRetryResult.FAIL_RETRY_LIMIT) { + doQueueAdvanceTracker = true; + Logger.LOG_CRITICAL( + "Reading data from the historical log was unsuccessful after " + + queuePollRetryTaskMaxRetries + + " retries. Skipping to next time interval - data loss may result!"); + } else if (queuePollRetryTaskResult == AutomaticRetryResult.FAIL_CRITICAL_STOP) { + doQueueAdvanceTracker = true; + Logger.LOG_CRITICAL( + "Reading data from the historical log failed due to an exception. " + + "Skipping to next time interval - data loss may result!"); + } else if (queuePollRetryTaskResult == null) { + Logger.LOG_CRITICAL( + "Reading data from the historical log failed to run for an unknown reason!"); + } + if (doQueueAdvanceTracker) { + Logger.LOG_CRITICAL( + "The historical data queue time tracker has been advanced to the next interval!"); try { - long queueBehindMillis = HistoricalDataQueueManager.getQueueTimeBehindMillis(); - if (queueBehindMillis >= TWConnectorConsts.QUEUE_DATA_POLL_BEHIND_MILLIS_WARN) { - String timeBehindString = - SCTimeUtils.getDayHourMinSecsForMillis( - (int) queueBehindMillis, - DAYS_STRING, - HOURS_STRING, - MINUTES_STRING, - SECONDS_STRING); - Logger.LOG_WARN( - "The historical data queue is running behind by " + timeBehindString + "."); - } else { - // Queue is not past running behind threshold, reset to 0 for updating debug tag - queueBehindMillis = 0; - } - - // Enable queue double data rate if running behind - if (!queueDoubleDataRateEnabled && queueBehindMillis > 0) { - long doubleQueueDataPollSizeMins = connectorConfig.getQueueDataPollSizeMinutes() * 2; - HistoricalDataQueueManager.setQueueFifoTimeSpanMins(doubleQueueDataPollSizeMins); - Logger.LOG_SERIOUS( - "The size of data polled on each interval has been doubled to " - + doubleQueueDataPollSizeMins - + " minutes while the queue is running behind!"); - queueDoubleDataRateEnabled = true; - } - // Disable queue double data rate if no longer running behind - else if (queueDoubleDataRateEnabled && queueBehindMillis == 0) { - long queueDataPollSizeMins = connectorConfig.getQueueDataPollSizeMinutes(); - HistoricalDataQueueManager.setQueueFifoTimeSpanMins(queueDataPollSizeMins); - Logger.LOG_SERIOUS( - "The size of data polled on each interval has been restored to " - + queueDataPollSizeMins - + " minutes."); - queueDoubleDataRateEnabled = false; - } - - // Update queue debug tag - if (queueDiagnosticRunningBehindSecondsTag != null) { - queueDiagnosticRunningBehindSecondsTag.setTagValueAsLong( - SCTimeUnit.MILLISECONDS.toSeconds(queueBehindMillis)); - } - + HistoricalDataQueueManager.advanceTrackingStartTime(); } catch (IOException e) { - Logger.LOG_SERIOUS("Unable to detect if historical data queue is running behind."); - Logger.LOG_EXCEPTION(e); + throw new RuntimeException(e); + } catch (TimeTrackerUnrecoverableException e) { + throw new RuntimeException(e); + } catch (CircularizedFileException e) { + throw new RuntimeException(e); + } catch (EbdTimeoutException e) { + throw new RuntimeException(e); } - - } catch (Exception e) { - Logger.LOG_CRITICAL("An error occurred while reading data from the historical log."); - Logger.LOG_EXCEPTION(e); } } }