diff --git a/CHANGELOG.md b/CHANGELOG.md index 333fd4e..a935008 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,12 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), [markdownlint](https://dlaa.me/markdownlint/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [1.1.1] - 2019-07-23 + +### Added in 1.1.1 + +- Added the ability to delay processing via `SENZING_DELAY_IN_SECONDS` + ## [1.1.0] - 2019-07-23 ### Added in 1.1.0 diff --git a/stream-loader.py b/stream-loader.py index 1cbe959..f1f35d3 100755 --- a/stream-loader.py +++ b/stream-loader.py @@ -40,7 +40,7 @@ __all__ = [] __version__ = 1.0 __date__ = '2018-10-29' -__updated__ = '2019-07-21' +__updated__ = '2019-07-23' SENZING_PRODUCT_ID = "5001" # See https://github.com/Senzing/knowledge-base/blob/master/lists/senzing-product-ids.md log_format = '%(asctime)s %(message)s' @@ -79,6 +79,11 @@ "env": "SENZING_DEBUG", "cli": "debug" }, + "delay_in_seconds": { + "default": 0, + "env": "SENZING_DELAY_IN_SECONDS", + "cli": "delay-in-seconds" + }, "entity_type": { "default": None, "env": "SENZING_ENTITY_TYPE", @@ -210,6 +215,7 @@ def get_parser(): subparser_1 = subparsers.add_parser('kafka', help='Read JSON Lines from Apache Kafka topic.') subparser_1.add_argument("--data-source", dest="data_source", metavar="SENZING_DATA_SOURCE", help="Data Source.") subparser_1.add_argument("--debug", dest="debug", action="store_true", help="Enable debugging. (SENZING_DEBUG) Default: False") + subparser_1.add_argument("--delay-in-seconds", dest="delay_in_seconds", metavar="SENZING_DELAY_IN_SECONDS", help="Delay before processing in seconds. DEFAULT: 0") subparser_1.add_argument("--entity-type", dest="entity_type", metavar="SENZING_ENTITY_TYPE", help="Entity type.") subparser_1.add_argument("--kafka-bootstrap-server", dest="kafka_bootstrap_server", metavar="SENZING_KAFKA_BOOTSTRAP_SERVER", help="Kafka bootstrap server. Default: localhost:9092") subparser_1.add_argument("--kafka-group", dest="kafka_group", metavar="SENZING_KAFKA_GROUP", help="Kafka group. Default: senzing-kafka-group") @@ -225,6 +231,7 @@ def get_parser(): subparser_5 = subparsers.add_parser('url', help='Read JSON Lines from URL-addressable file.') subparser_5.add_argument("--data-source", dest="data_source", metavar="SENZING_DATA_SOURCE", help="Data Source.") subparser_5.add_argument("--debug", dest="debug", action="store_true", help="Enable debugging. (SENZING_DEBUG) Default: False") + subparser_5.add_argument("--delay-in-seconds", dest="delay_in_seconds", metavar="SENZING_DELAY_IN_SECONDS", help="Delay before processing in seconds. DEFAULT: 0") subparser_5.add_argument("--entity-type", dest="entity_type", metavar="SENZING_ENTITY_TYPE", help="Entity type.") subparser_5.add_argument("--input-url", dest="input_url", metavar="SENZING_INPUT_URL", help="URL to file of JSON lines.") subparser_5.add_argument("--monitoring-period-in-seconds", dest="monitoring_period_in_seconds", metavar="SENZING_MONITORING_PERIOD_IN_SECONDS", help="Period, in seconds, between monitoring reports. Default: 300") @@ -244,6 +251,7 @@ def get_parser(): subparser_8 = subparsers.add_parser('rabbitmq', help='Read JSON Lines from RabbitMQ queue.') subparser_8.add_argument("--data-source", dest="data_source", metavar="SENZING_DATA_SOURCE", help="Data Source.") subparser_8.add_argument("--debug", dest="debug", action="store_true", help="Enable debugging. (SENZING_DEBUG) Default: False") + subparser_8.add_argument("--delay-in-seconds", dest="delay_in_seconds", metavar="SENZING_DELAY_IN_SECONDS", help="Delay before processing in seconds. DEFAULT: 0") subparser_8.add_argument("--entity-type", dest="entity_type", metavar="SENZING_ENTITY_TYPE", help="Entity type.") subparser_8.add_argument("--rabbitmq-host", dest="rabbitmq_host", metavar="SENZING_rabbitmq_host", help="RabbitMQ host. Default: localhost:5672") subparser_8.add_argument("--rabbitmq-queue", dest="rabbitmq_queue", metavar="SENZING_RABBITMQ_QUEUE", help="RabbitMQ queue. Default: senzing-rabbitmq-queue") @@ -280,6 +288,7 @@ def get_parser(): message_dictionary = { "100": "senzing-" + SENZING_PRODUCT_ID + "{0:04d}I", + "120": "Sleeping for requested delay of {0} seconds.", "121": "Adding JSON to failure queue: {0}", "122": "Quitting time!", "123": "Total memory: {0:>15} bytes", @@ -597,6 +606,7 @@ def get_configuration(args): # Special case: Change integer strings to integers. integers = ['configuration_check_frequency_in_seconds', + 'delay_in_seconds', 'expiration_warning_in_days', 'log_license_period_in_seconds', 'monitoring_period_in_seconds', @@ -1671,10 +1681,8 @@ def run(self): # ----------------------------------------------------------------------------- -def cleanup_after_past_invocations(): - '''Remove residual artifacts from prior invocations of loader.''' - for filename in glob('pyG2*'): - os.remove(filename) +def bootstrap_signal_handler(signal, frame): + sys.exit(0) def create_signal_handler_function(args): @@ -1689,8 +1697,11 @@ def result_function(signal_number, frame): return result_function -def bootstrap_signal_handler(signal, frame): - sys.exit(0) +def delay(config): + delay_in_seconds = config.get('delay_in_seconds') + if delay_in_seconds > 0: + logging.info(message_info(120, delay_in_seconds)) + time.sleep(delay_in_seconds) def entry_template(config): @@ -1973,9 +1984,9 @@ def common_prolog(config): logging.info(entry_template(config)) - # Cleanup after previous invocations. + # If requested, delay start. - cleanup_after_past_invocations() + delay(config) # Write license information to log.