diff --git a/.pylintrc b/.pylintrc
index 13d5196..7983703 100644
--- a/.pylintrc
+++ b/.pylintrc
@@ -20,6 +20,7 @@ disable=
     too-many-instance-attributes,
     too-many-lines,
     too-many-locals,
+    too-many-nested-blocks,
     too-many-statements,
     unused-argument,
 ignore=
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 2dbc659..f928769 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -10,6 +10,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 -
 
+## [2.2.0] - 2022-10-18
+
+### Added in 2.2.0
+
+- Added `rabbitmq-custom` subcommand
+
 ## [2.1.2] - 2022-10-11
 
 ### Changed in 2.1.2
diff --git a/Dockerfile b/Dockerfile
index 59097e8..22c4396 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -6,11 +6,11 @@ ARG BASE_IMAGE=senzing/senzingapi-runtime:3.3.1
 
 FROM ${BASE_IMAGE} AS builder
 
-ENV REFRESHED_AT=2022-10-11
+ENV REFRESHED_AT=2022-10-18
 
 LABEL Name="senzing/stream-loader" \
       Maintainer="support@senzing.com" \
-      Version="2.1.2"
+      Version="2.2.0"
 
 # Run as "root" for system installation.
 
@@ -55,11 +55,11 @@ RUN curl -X GET \
 
 FROM ${BASE_IMAGE} AS runner
 
-ENV REFRESHED_AT=2022-10-11
+ENV REFRESHED_AT=2022-10-18
 
 LABEL Name="senzing/stream-loader" \
       Maintainer="support@senzing.com" \
-      Version="2.1.0"
+      Version="2.2.0"
 
 # Define health check.
 
diff --git a/requirements.txt b/requirements.txt
index 859e95d..3456cdc 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,6 +1,7 @@
 azure-servicebus==7.8.1
 boto3==1.24.92
 confluent-kafka==1.9.2
+orjson==3.8.0
 pika==1.3.0
 psutil==5.9.2
 psycopg2-binary==2.9.4
diff --git a/stream-loader.py b/stream-loader.py
index 0a4e3ea..bb95672 100755
--- a/stream-loader.py
+++ b/stream-loader.py
@@ -6,6 +6,7 @@
 
 # Import from standard library. https://docs.python.org/3/library/
 
 import argparse
+import concurrent.futures
 import datetime
 import functools
@@ -26,6 +27,7 @@
 import sys
 import threading
 import time
+import traceback
 from urllib.parse import urlparse, urlunparse
 from urllib.request import urlopen
@@ -36,6 +38,7 @@
 import confluent_kafka
+import orjson
 import pika
 import psutil
 
 # Determine "Major" version of Senzing SDK.
 
@@ -65,9 +68,9 @@
 # Metadata
 
 __all__ = []
-__version__ = "2.1.1"  # See https://www.python.org/dev/peps/pep-0396/
+__version__ = "2.2.0"  # See https://www.python.org/dev/peps/pep-0396/
 __date__ = '2018-10-29'
-__updated__ = '2022-09-28'
+__updated__ = '2022-10-18'
 
 SENZING_PRODUCT_ID = "5001"  # See https://github.com/Senzing/knowledge-base/blob/main/lists/senzing-product-ids.md
 log_format = '%(asctime)s %(message)s'
@@ -81,6 +84,15 @@
 MINIMUM_TOTAL_MEMORY_IN_GIGABYTES = 8
 MINIMUM_AVAILABLE_MEMORY_IN_GIGABYTES = 6
 
+# Constants for stream-loader.py rabbitmq-custom.
+
+MSG_FRAME = 0
+MSG_BODY = 2
+
+TUPLE_MSG = 0
+TUPLE_STARTTIME = 1
+TUPLE_ACKED = 2
+
 # Lists from https://www.ietf.org/rfc/rfc1738.txt
 
 safe_character_list = ['$', '-', '_', '.', '+', '!', '*', '(', ')', ',', '"'] + list(string.ascii_letters)
@@ -91,6 +103,16 @@
 # 1) Command line options, 2) Environment variables, 3) Configuration files, 4) Default values
 
 configuration_locator = {
+    "add_record_withinfo": {
+        "default": False,
+        "env": "SENZING_ADD_RECORD_WITHINFO",
+        "cli": "add-record-withinfo"
+    },
+    "amqp_url": {
+        "default": None,
+        "env": "SENZING_AMQP_URL",
+        "cli": "amqp-url"
+    },
     "azure_failure_connection_string": {
         "default": None,
         "env": "SENZING_AZURE_FAILURE_CONNECTION_STRING",
@@ -244,6 +266,21 @@
         "env": "SENZING_LOG_LICENSE_PERIOD_IN_SECONDS",
         "cli": "log-license-period-in-seconds"
     },
+    "long_record": {
+        "default": 300,
+        "env": "SENZING_LONG_RECORD",
+        "cli": "long-record",
+    },
+    "max_workers": {
+        "default": None,
+        "env": "SENZING_MAX_WORKERS",
+        "cli": "max-workers",
+    },
+    "message_interval": {
+        "default": 10000,
+        "env": "SENZING_MESSAGE_INTERVAL",
+        "cli": "message-interval",
+    },
     "monitoring_check_frequency_in_seconds": {
         "default": 20,
         "env": "SENZING_MONITORING_CHECK_FREQUENCY_IN_SECONDS",
         "cli": "monitoring-check-frequency-in-seconds"
     },
@@ -583,6 +620,37 @@ def get_parser():
             "help": 'Read JSON Lines from RabbitMQ queue.',
             "argument_aspects": ["common", "rabbitmq_base"],
         },
+        'rabbitmq-custom': {
+            "help": 'Read JSON Lines from RabbitMQ queue.',
+            "argument_aspects": ["common", "rabbitmq_base"],
+            "arguments": {
+                "--add-record-withinfo": {
+                    "dest": "add_record_withinfo",
+                    "action": "store_true",
+                    "help": "Return withInfo when adding record. (SENZING_ADD_RECORD_WITHINFO) Default: False"
+                },
+                "--amqp-url": {
+                    "dest": "amqp_url",
+                    "metavar": "SENZING_AMQP_URL",
+                    "help": "AMQP URL for attaching to RabbitMQ. Default: none"
+                },
+                "--long-record": {
+                    "dest": "long_record",
+                    "metavar": "SENZING_LONG_RECORD",
+                    "help": "Number of seconds after which an in-flight record is considered long running. Default: 300"
+                },
+                "--max-workers": {
+                    "dest": "max_workers",
+                    "metavar": "SENZING_MAX_WORKERS",
+                    "help": "Maximum number of worker threads. Default: none"
+                },
+                "--message-interval": {
+                    "dest": "message_interval",
+                    "metavar": "SENZING_MESSAGE_INTERVAL",
+                    "help": "Number of messages processed between performance log messages. Default: 10,000"
+                },
+            }
+        },
         'rabbitmq-withinfo': {
             "help": 'Read JSON Lines from RabbitMQ queue. Return info to a queue.',
             "argument_aspects": ["common", "rabbitmq_base"],
@@ -957,6 +1025,9 @@ def get_parser():
     "201": "Python 'psutil' not installed. Could not report memory. Error: {0}",
     "203": "          WARNING: License will expire soon. Only {0} days left.",
     "221": "AWS SQS redrive: {0}",
+    "222": "WithInfo result: {0}",
+    "223": "Processed {0} add records at a rate of {1} records per second",
+    "224": "Total records added: {0}",
     "292": "Configuration change detected. Old: {0} New: {1}",
     "293": "For information on warnings and errors, see https://github.com/Senzing/stream-loader#errors",
     "294": "Version: {0} Updated: {1}",
@@ -975,6 +1046,10 @@
     "413": "SQS queue: {0} Unknown SQS error: {1}; DATA_SOURCE: {2}; RECORD_ID: {3}",
     "417": "RabbitMQ exchange: {0} routing key {1}: Lost connection to server. Waiting {2} seconds and attempting to reconnect. Message: {3}",
     "418": "Exceeded the requested number of attempts ({0}) to reconnect to RabbitMQ broker at {1}:{2} with no success. Exiting.",
+    "420": "Rejecting a long running record. DATA_SOURCE: {0}; RECORD_ID: {1}",
+    "421": "Still processing a long running record. Duration {0:.3g} minutes; Rejected: {1}; DATA_SOURCE: {2}; RECORD_ID: {3}",
+    "422": "Threads are stuck on long running records. Number of threads: {0}",
+    "423": "Running recovery.",
     "499": "{0}",
     "500": "senzing-" + SENZING_PRODUCT_ID + "{0:04d}E",
     "551": "Missing G2 database URL.",
@@ -1023,6 +1098,10 @@
     "813": "G2ConfigMgr.init() error. Configuration Manager Name: {0}; Configuration JSON: {1} Error: {2}",
     "814": "G2Diagnostic.init() error. Diagnostic Name: {0}; Configuration JSON: {1} Error: {2}",
     "815": "G2Product.init() error. Diagnostic Name: {0}; Configuration JSON: {1} Error: {2}",
+    "820": "G2Engine.addRecord() error. Message: {0} Error: {1}",
+    "821": "G2Engine.addRecordWithInfo() error. Message: {0} Error: {1}",
+    "822": "Error from future. Error type: {0} Error: {1}",
+    "823": "Shutting down due to error. Error type: {0} Error: {1}",
     "879": "Senzing SDK was not imported.",
     "880": "Unspecific error when {1}. Error: {0}",
     "881": "Could not G2Engine.primeEngine with '{0}'. Error: {1}",
@@ -1272,6 +1351,7 @@ def get_configuration(args):
     # Special case: Change boolean strings to booleans.
 
     booleans = [
+        'add_record_withinfo',
         'debug',
         'delay_randomized',
         'exit_on_empty_queue',
@@ -1298,6 +1378,8 @@
         'delay_in_seconds',
         'expiration_warning_in_days',
         'log_license_period_in_seconds',
+        'long_record',
+        'message_interval',
         'monitoring_check_frequency_in_seconds',
         'monitoring_period_in_seconds',
         'queue_maxsize',
@@ -1314,6 +1396,11 @@
         integer_string = result.get(integer)
         result[integer] = int(integer_string)
 
+    # Special case: Null or integer
+
+    if result.get('max_workers') is not None:
+        result['max_workers'] = int(result.get('max_workers'))
+
     # Special case: Tailored database URL
 
     result['g2_database_url_specific'] = get_g2_database_url_specific(result.get("g2_database_url_generic"))
@@ -1400,7 +1487,7 @@ def __init__(self, *args, g2_engine=None, hint=None, **kwargs):
         self.hint = hint
 
     def govern(self, *args, **kwargs):
-        return
+        return 0
 
     def close(self):
         return
@@ -1480,7 +1567,8 @@ def filter_info_message(self, message=None):
         return self.info_filter.filter(message=message)
 
     def govern(self):
-        return self.governor.govern()
+        sleep_time = self.governor.govern()
+        time.sleep(sleep_time)
 
     def is_time_to_check_g2_configuration(self):
         now = time.time()
@@ -3736,6 +3824,7 @@ def log_license(config):
 
     g2_product.destroy()
 
+
 def log_performance(config):
     '''Log performance estimates.'''
     logging.debug(message_debug(950, sys._getframe().f_code.co_name))
@@ -3962,6 +4051,28 @@ def dohelper_thread_runner(args, threadClass, options_to_defaults_map):
 
     logging.info(exit_template(config))
 
+
+def process_rabbitmq_message(g2engine, message):
+    try:
+        record = orjson.loads(message)
+        assert isinstance(record, dict), f'Message must be a JSON Object, not a JSON list. Message: {message}'
+        g2engine.addRecord(record['DATA_SOURCE'], record['RECORD_ID'], message.decode())
+    except Exception as err:
+        logging.error(message_error(820, message, err))
+        raise
+
+
+def process_rabbitmq_message_withinfo(g2engine, message):
+    try:
+        record = orjson.loads(message)
+        assert isinstance(record, dict), f'Message must be a JSON Object, not a JSON list. Message: {message}'
+        response = bytearray()
+        g2engine.addRecordWithInfo(record['DATA_SOURCE'], record['RECORD_ID'], message.decode(), response)
+    except Exception as err:
+        logging.error(message_error(821, message, err))
+        raise
+    return response.decode()
+
 # -----------------------------------------------------------------------------
 # do_* functions
 # Common function signature: do_XXX(args)
@@ -4220,6 +4331,198 @@ def do_rabbitmq(args):
 
     logging.info(exit_template(config))
 
+
+def do_rabbitmq_custom(args):
+    ''' Read from rabbitmq. '''
+
+    # Keep Pika logging to a minimum.
+
+    logging.getLogger("pika").setLevel(logging.WARNING)
+
+    # Get context from CLI, environment variables, and ini files.
+
+    config = get_configuration(args)
+
+    # Perform common initialization tasks.
+
+    common_prolog(config)
+
+    # Pull values from configuration.
+
+    amqp_url = config.get('amqp_url')
+    long_record = config.get('long_record')
+    message_interval = config.get('message_interval')
+    rabbitmq_queue = config.get('rabbitmq_queue')
+    threads_per_process = config.get('threads_per_process')
+    add_record_withinfo = config.get('add_record_withinfo')
+
+    max_workers = config.get('max_workers')
+    if threads_per_process:
+        max_workers = int(threads_per_process)
+
+    # Get the Senzing G2 resources.
+
+    g2_engine = get_g2_engine(config)
+    governor = Governor(g2_engine=g2_engine, hint="stream-loader")
+
+    # Construct Pika parameters.
+
+    if amqp_url:
+        pika_parameters = pika.URLParameters(amqp_url)
+    else:
+        pika_parameters = pika.ConnectionParameters(
+            host=config.get("rabbitmq_host"),
+            port=config.get("rabbitmq_port"),
+            virtual_host=config.get("rabbitmq_virtual_host"),
+            credentials=pika.PlainCredentials(
+                config.get("rabbitmq_username"),
+                config.get("rabbitmq_password")),
+            heartbeat=config.get('rabbitmq_heartbeat_in_seconds'),
+        )
+
+    # Main loop.
+
+    log_check_time = time.time()
+    last_message_time = log_check_time
+    futures = {}
+    message_count = 0
+
+    with pika.BlockingConnection(pika_parameters) as connection:
+        channel = connection.channel()
+        channel.queue_declare(queue=rabbitmq_queue, passive=True)
+        with concurrent.futures.ThreadPoolExecutor(max_workers) as executor:
+            try:
+                channel.basic_qos(prefetch_count=executor._max_workers)  # always have 1 record prefetched for each thread
+                while True:
+                    loop_start_time = time.time()
+
+                    # If any futures exist, process the completed futures.
+
+                    if futures:
+                        completed_futures, _ = concurrent.futures.wait(futures, timeout=10, return_when=concurrent.futures.FIRST_COMPLETED)
+
+                        # Handle completed futures.
+
+                        for future in completed_futures:
+
+                            # TODO: Handle WithInfo results better.
+
+                            result = future.result()
+                            if result:
+                                logging.info(message_info(222, result))
+
+                            # Remove completed future from list of futures.
+
+                            message = futures.pop(future)
+                            if not message[TUPLE_ACKED]:  # if we rejected a message before, we should not ack it here
+                                channel.basic_ack(message[TUPLE_MSG][MSG_FRAME].delivery_tag)
+                            message_count += 1
+
+                            # Periodically, log statistics.
+
+                            if message_count % message_interval == 0:  # display rate stats
+                                time_difference = loop_start_time - last_message_time
+                                speed = -1
+                                if time_difference > 0.0:
+                                    speed = int(message_interval / time_difference)
+                                logging.info(message_info(223, message_count, speed))
+                                last_message_time = loop_start_time
+
+                    # Periodically log G2Engine statistics and handle stuck or rejected records.
+
+                    if last_message_time > log_check_time + (long_record / 2):  # log long running records
+                        log_check_time = loop_start_time
+
+                        # Log G2Engine statistics.
+
+                        g2_engine_stats_response = bytearray()
+                        g2_engine.stats(g2_engine_stats_response)
+                        g2_engine_stats_dictionary = json.loads(g2_engine_stats_response.decode())
+                        logging.info(message_info(125, json.dumps(g2_engine_stats_dictionary, sort_keys=True)))
+
+                        # Handle stuck or rejected records.
+
+                        number_stuck = 0
+                        number_rejected = 0
+                        for future, message in futures.items():
+                            if not future.done():
+                                duration = loop_start_time - message[TUPLE_STARTTIME]
+                                record = orjson.loads(message[TUPLE_MSG][MSG_BODY])
+                                if duration > 2 * long_record:  # a record taking this long should be rejected to the dead letter queue
+                                    number_rejected += 1
+                                    if not message[TUPLE_ACKED]:
+                                        logging.warning(message_warning(420, record["DATA_SOURCE"], record["RECORD_ID"]))
+                                        channel.basic_reject(message[TUPLE_MSG][MSG_FRAME].delivery_tag, requeue=False)
+                                        futures[future] = (message[TUPLE_MSG], message[TUPLE_STARTTIME], True)
+                                        message = futures[future]
+                                if duration > long_record:
+                                    number_stuck += 1
+                                    logging.warning(message_warning(421, duration / 60, message[TUPLE_ACKED], record["DATA_SOURCE"], record["RECORD_ID"]))
+                        if number_stuck >= executor._max_workers:
+                            logging.warning(message_warning(422, executor._max_workers))
+                        if number_rejected >= executor._max_workers:
+                            logging.warning(message_warning(423))
+                            channel.basic_recover()  # supposedly this causes unacked messages to redeliver, should prevent the server from disconnecting us
+
+                    # Perform governance on system.
+
+                    pause_seconds = governor.govern()
+
+                    # Really want something that forces an "I'm alive" to the server.
+                    # Either the governor fully triggered or our executor is full;
+                    # either way, we are not going to get more messages.
+                    if pause_seconds < 0.0:
+                        connection.sleep(1)  # process rabbitmq protocol for 1s
+                        continue
+                    if len(futures) >= executor._max_workers:
+                        connection.sleep(1)  # process rabbitmq protocol for 1s
+                        continue
+                    if pause_seconds > 0.0:
+                        connection.sleep(pause_seconds)
+
+                    # When needed, generate more workers.
+
+                    while len(futures) < executor._max_workers:
+                        try:
+                            message = channel.basic_get(rabbitmq_queue)
+                            if not message[MSG_FRAME]:
+                                if len(futures) > 0:
+                                    connection.sleep(.1)
+                                break
+                            if add_record_withinfo:
+                                futures[executor.submit(process_rabbitmq_message_withinfo, g2_engine, message[MSG_BODY])] = (message, time.time(), False)
+                            else:
+                                futures[executor.submit(process_rabbitmq_message, g2_engine, message[MSG_BODY])] = (message, time.time(), False)
+                        except Exception as err:
+                            logging.error(message_error(822, type(err).__name__, err))
+                            raise
+                logging.info(message_info(224, message_count))
+
+            except Exception as err:
+                logging.error(message_error(823, type(err).__name__, err))
+                traceback.print_exc()
+                loop_start_time = time.time()
+                for future, message in futures.items():
+                    if not future.done():
+                        duration = loop_start_time - message[TUPLE_STARTTIME]
+                        record = orjson.loads(message[TUPLE_MSG][MSG_BODY])
+                        logging.warning(message_warning(421, duration / 60, message[TUPLE_ACKED], record["DATA_SOURCE"], record["RECORD_ID"]))
+                executor.shutdown()
+                connection.close()
+                sys.exit(-1)
+
+    # Cleanup.
+
+    try:
+        g2_engine.destroy()
+    except Exception as err:
+        logging.error(message_error(810, err))
+        raise err
+
+    # Epilog.
+
+    logging.info(exit_template(config))
+
+
 def do_rabbitmq_withinfo(args):
     ''' Read from rabbitmq. '''
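Reviewer note: the consume pattern at the heart of `do_rabbitmq_custom` (poll with `basic_get`, hand each body to a `ThreadPoolExecutor`, and ack only after the corresponding future completes) can be exercised without any Senzing plumbing. Below is a minimal sketch, assuming a local RabbitMQ broker with default credentials, a pre-declared queue named `test-queue`, and a hypothetical `process_message` callable standing in for the `G2Engine.addRecord()` call:

```python
import concurrent.futures
import time

import pika

MSG_FRAME = 0  # index of the method frame in the basic_get() result tuple
MSG_BODY = 2   # index of the message body in the basic_get() result tuple


def process_message(body):
    ''' Hypothetical stand-in for the G2Engine.addRecord() call. '''
    time.sleep(0.1)


def consume(queue="test-queue", max_workers=4):
    connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
    channel = connection.channel()
    channel.queue_declare(queue=queue, passive=True)
    futures = {}
    with concurrent.futures.ThreadPoolExecutor(max_workers) as executor:
        while True:

            # Reap completed futures; ack a delivery only after its worker has finished.

            if futures:
                done, _ = concurrent.futures.wait(futures, timeout=10, return_when=concurrent.futures.FIRST_COMPLETED)
                for future in done:
                    future.result()  # surfaces any exception raised in the worker
                    message = futures.pop(future)
                    channel.basic_ack(message[MSG_FRAME].delivery_tag)

            # Top up the pool with newly fetched messages.

            while len(futures) < max_workers:
                message = channel.basic_get(queue)
                if not message[MSG_FRAME]:  # queue is empty
                    connection.sleep(0.1)
                    break
                futures[executor.submit(process_message, message[MSG_BODY])] = message
```

Deferring the ack is what makes the long-record handling in the main loop possible: a delivery that has not yet been acked can still be rejected with `basic_reject(..., requeue=False)` to a dead letter queue if its worker runs too long.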