Skip to content

Commit

Permalink
Merge pull request #54 from Senzing/issue-52.dockter.2
Browse files Browse the repository at this point in the history
issue-52 Add the ability to delay processing
  • Loading branch information
docktermj authored Jul 24, 2019
2 parents 3bf2735 + 744bf7c commit a36b12b
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 9 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,12 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
[markdownlint](https://dlaa.me/markdownlint/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [1.1.1] - 2019-07-23

### Added in 1.1.1

- Added the ability to delay processing via `SENZING_DELAY_IN_SECONDS`

## [1.1.0] - 2019-07-23

### Added in 1.1.0
Expand Down
29 changes: 20 additions & 9 deletions stream-loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
__all__ = []
__version__ = 1.0
__date__ = '2018-10-29'
__updated__ = '2019-07-21'
__updated__ = '2019-07-23'

SENZING_PRODUCT_ID = "5001" # See https://github.com/Senzing/knowledge-base/blob/master/lists/senzing-product-ids.md
log_format = '%(asctime)s %(message)s'
Expand Down Expand Up @@ -79,6 +79,11 @@
"env": "SENZING_DEBUG",
"cli": "debug"
},
"delay_in_seconds": {
"default": 0,
"env": "SENZING_DELAY_IN_SECONDS",
"cli": "delay-in-seconds"
},
"entity_type": {
"default": None,
"env": "SENZING_ENTITY_TYPE",
Expand Down Expand Up @@ -210,6 +215,7 @@ def get_parser():
subparser_1 = subparsers.add_parser('kafka', help='Read JSON Lines from Apache Kafka topic.')
subparser_1.add_argument("--data-source", dest="data_source", metavar="SENZING_DATA_SOURCE", help="Data Source.")
subparser_1.add_argument("--debug", dest="debug", action="store_true", help="Enable debugging. (SENZING_DEBUG) Default: False")
subparser_1.add_argument("--delay-in-seconds", dest="delay_in_seconds", metavar="SENZING_DELAY_IN_SECONDS", help="Delay before processing in seconds. DEFAULT: 0")
subparser_1.add_argument("--entity-type", dest="entity_type", metavar="SENZING_ENTITY_TYPE", help="Entity type.")
subparser_1.add_argument("--kafka-bootstrap-server", dest="kafka_bootstrap_server", metavar="SENZING_KAFKA_BOOTSTRAP_SERVER", help="Kafka bootstrap server. Default: localhost:9092")
subparser_1.add_argument("--kafka-group", dest="kafka_group", metavar="SENZING_KAFKA_GROUP", help="Kafka group. Default: senzing-kafka-group")
Expand All @@ -225,6 +231,7 @@ def get_parser():
subparser_5 = subparsers.add_parser('url', help='Read JSON Lines from URL-addressable file.')
subparser_5.add_argument("--data-source", dest="data_source", metavar="SENZING_DATA_SOURCE", help="Data Source.")
subparser_5.add_argument("--debug", dest="debug", action="store_true", help="Enable debugging. (SENZING_DEBUG) Default: False")
subparser_5.add_argument("--delay-in-seconds", dest="delay_in_seconds", metavar="SENZING_DELAY_IN_SECONDS", help="Delay before processing in seconds. DEFAULT: 0")
subparser_5.add_argument("--entity-type", dest="entity_type", metavar="SENZING_ENTITY_TYPE", help="Entity type.")
subparser_5.add_argument("--input-url", dest="input_url", metavar="SENZING_INPUT_URL", help="URL to file of JSON lines.")
subparser_5.add_argument("--monitoring-period-in-seconds", dest="monitoring_period_in_seconds", metavar="SENZING_MONITORING_PERIOD_IN_SECONDS", help="Period, in seconds, between monitoring reports. Default: 300")
Expand All @@ -244,6 +251,7 @@ def get_parser():
subparser_8 = subparsers.add_parser('rabbitmq', help='Read JSON Lines from RabbitMQ queue.')
subparser_8.add_argument("--data-source", dest="data_source", metavar="SENZING_DATA_SOURCE", help="Data Source.")
subparser_8.add_argument("--debug", dest="debug", action="store_true", help="Enable debugging. (SENZING_DEBUG) Default: False")
subparser_8.add_argument("--delay-in-seconds", dest="delay_in_seconds", metavar="SENZING_DELAY_IN_SECONDS", help="Delay before processing in seconds. DEFAULT: 0")
subparser_8.add_argument("--entity-type", dest="entity_type", metavar="SENZING_ENTITY_TYPE", help="Entity type.")
subparser_8.add_argument("--rabbitmq-host", dest="rabbitmq_host", metavar="SENZING_rabbitmq_host", help="RabbitMQ host. Default: localhost:5672")
subparser_8.add_argument("--rabbitmq-queue", dest="rabbitmq_queue", metavar="SENZING_RABBITMQ_QUEUE", help="RabbitMQ queue. Default: senzing-rabbitmq-queue")
Expand Down Expand Up @@ -280,6 +288,7 @@ def get_parser():

message_dictionary = {
"100": "senzing-" + SENZING_PRODUCT_ID + "{0:04d}I",
"120": "Sleeping for requested delay of {0} seconds.",
"121": "Adding JSON to failure queue: {0}",
"122": "Quitting time!",
"123": "Total memory: {0:>15} bytes",
Expand Down Expand Up @@ -597,6 +606,7 @@ def get_configuration(args):
# Special case: Change integer strings to integers.

integers = ['configuration_check_frequency_in_seconds',
'delay_in_seconds',
'expiration_warning_in_days',
'log_license_period_in_seconds',
'monitoring_period_in_seconds',
Expand Down Expand Up @@ -1671,10 +1681,8 @@ def run(self):
# -----------------------------------------------------------------------------


def cleanup_after_past_invocations():
'''Remove residual artifacts from prior invocations of loader.'''
for filename in glob('pyG2*'):
os.remove(filename)
def bootstrap_signal_handler(signal, frame):
sys.exit(0)


def create_signal_handler_function(args):
Expand All @@ -1689,8 +1697,11 @@ def result_function(signal_number, frame):
return result_function


def bootstrap_signal_handler(signal, frame):
sys.exit(0)
def delay(config):
delay_in_seconds = config.get('delay_in_seconds')
if delay_in_seconds > 0:
logging.info(message_info(120, delay_in_seconds))
time.sleep(delay_in_seconds)


def entry_template(config):
Expand Down Expand Up @@ -1973,9 +1984,9 @@ def common_prolog(config):

logging.info(entry_template(config))

# Cleanup after previous invocations.
# If requested, delay start.

cleanup_after_past_invocations()
delay(config)

# Write license information to log.

Expand Down

0 comments on commit a36b12b

Please sign in to comment.