From 069d6cdcad1f3d75dfb7649e8de6aa7f29aad506 Mon Sep 17 00:00:00 2001
From: Ben Hearsum
Date: Fri, 25 Sep 2020 15:22:37 -0400
Subject: [PATCH] Log GCS upload duration (#1516)

* Adjust log levels to match GCP

* Log GCS upload duration

* Fix busted tests
---
 src/auslib/db.py      | 24 ++++++++++++++++++++++++
 src/auslib/log.py     | 13 ++++++++++---
 tests/test_logging.py |  4 ++--
 3 files changed, 36 insertions(+), 5 deletions(-)

diff --git a/src/auslib/db.py b/src/auslib/db.py
index 9100770e91..f29e5acc47 100644
--- a/src/auslib/db.py
+++ b/src/auslib/db.py
@@ -835,23 +835,35 @@ def forInsert(self, insertedKeys, columns, changed_by, trans):
         identifier = "-".join([columns.get(i) for i in self.identifier_columns])
         for data_version, ts, data in ((None, timestamp - 1, ""), (columns.get("data_version"), timestamp, json.dumps(columns[self.data_column]))):
             bname = "{}/{}-{}-{}.json".format(identifier, data_version, ts, changed_by)
+            start = time.time()
+            logging.info(f"Beginning GCS upload of {bname}")
             bucket = self._getBucket(identifier)(use_gcloud_aio=False)
             blob = bucket.blob(bname)
             blob.upload_from_string(data, content_type="application/json")
+            duration = time.time() - start
+            logging.info(f"Completed GCS upload of {bname}, took {duration} seconds", extra={"duration": duration})
 
     def forDelete(self, rowData, changed_by, trans):
         identifier = "-".join([rowData.get(i) for i in self.identifier_columns])
         bname = "{}/{}-{}-{}.json".format(identifier, rowData.get("data_version"), getMillisecondTimestamp(), changed_by)
+        start = time.time()
+        logging.info(f"Beginning GCS upload of {bname}")
         bucket = self._getBucket(identifier)(use_gcloud_aio=False)
         blob = bucket.blob(bname)
         blob.upload_from_string("", content_type="application/json")
+        duration = time.time() - start
+        logging.info(f"Completed GCS upload of {bname}, took {duration} seconds", extra={"duration": duration})
 
     def forUpdate(self, rowData, changed_by, trans):
         identifier = "-".join([rowData.get(i) for i in self.identifier_columns])
         bname = "{}/{}-{}-{}.json".format(identifier, rowData.get("data_version"), getMillisecondTimestamp(), changed_by)
+        start = time.time()
+        logging.info(f"Beginning GCS upload of {bname}")
         bucket = self._getBucket(identifier)(use_gcloud_aio=False)
         blob = bucket.blob(bname)
         blob.upload_from_string(json.dumps(rowData[self.data_column]), content_type="application/json")
+        duration = time.time() - start
+        logging.info(f"Completed GCS upload of {bname}, took {duration} seconds", extra={"duration": duration})
 
     def getChange(self, change_id=None, column_values=None, data_version=None, transaction=None):
         if not set(self.identifier_columns).issubset(column_values.keys()) or not data_version:
@@ -883,6 +895,8 @@ async def forInsert(self, insertedKeys, columns, changed_by, trans):
         identifier = "-".join([columns.get(i) for i in self.identifier_columns])
         for data_version, ts, data in ((None, timestamp - 1, ""), (columns.get("data_version"), timestamp, json.dumps(columns[self.data_column]))):
             bname = "{}/{}-{}-{}.json".format(identifier, data_version, ts, changed_by)
+            start = time.time()
+            logging.info(f"Beginning GCS upload of {bname}")
             # Using a separate session for each request is not ideal, but it's
             # the only thing that seems to work. Ideally, we'd share one session
             # for the entire application, but we can't for two reasons:
@@ -894,22 +908,32 @@ async def forInsert(self, insertedKeys, columns, changed_by, trans):
                 bucket = self._getBucket(identifier)(session=session)
                 blob = bucket.new_blob(bname)
                 await blob.upload(data, session=session)
+            duration = time.time() - start
+            logging.info(f"Completed GCS upload of {bname}, took {duration} seconds", extra={"duration": duration})
 
     async def forDelete(self, rowData, changed_by, trans):
         identifier = "-".join([rowData.get(i) for i in self.identifier_columns])
         bname = "{}/{}-{}-{}.json".format(identifier, rowData.get("data_version"), getMillisecondTimestamp(), changed_by)
+        start = time.time()
+        logging.info(f"Beginning GCS upload of {bname}")
         async with ClientSession() as session:
             bucket = self._getBucket(identifier)(session=session)
             blob = bucket.new_blob(bname)
             await blob.upload("", session=session)
+        duration = time.time() - start
+        logging.info(f"Completed GCS upload of {bname}, took {duration} seconds", extra={"duration": duration})
 
     async def forUpdate(self, rowData, changed_by, trans):
         identifier = "-".join([rowData.get(i) for i in self.identifier_columns])
         bname = "{}/{}-{}-{}.json".format(identifier, rowData.get("data_version"), getMillisecondTimestamp(), changed_by)
+        start = time.time()
+        logging.info(f"Beginning GCS upload of {bname}")
         async with ClientSession() as session:
             bucket = self._getBucket(identifier)(session=session)
             blob = bucket.new_blob(bname)
             await blob.upload(json.dumps(rowData[self.data_column]), session=session)
+        duration = time.time() - start
+        logging.info(f"Completed GCS upload of {bname}, took {duration} seconds", extra={"duration": duration})
 
 
 class HistoryTable(AUSTable):
diff --git a/src/auslib/log.py b/src/auslib/log.py
index 93ad95bad6..d2c0a426f0 100644
--- a/src/auslib/log.py
+++ b/src/auslib/log.py
@@ -52,11 +52,18 @@ class JsonLogFormatter(logging.Formatter):
 
     LOGGING_FORMAT_VERSION = "2.0"
 
-    # Map from Python logging to Syslog severity levels
-    SYSLOG_LEVEL_MAP = {logging.DEBUG: 2, logging.ERROR: 3, logging.WARNING: 4, logging.INFO: 6, logging.DEBUG: 7}
+    # Map from Python logging to GCP severity levels
+    # From https://cloud.google.com/logging/docs/reference/v2/rest/v2/LogEntry#LogSeverity
+    SYSLOG_LEVEL_MAP = {
+        logging.CRITICAL: 600,
+        logging.ERROR: 500,
+        logging.WARNING: 400,
+        logging.INFO: 200,
+        logging.DEBUG: 100,
+    }
 
     # Syslog level to use when/if python level isn't found in map
-    DEFAULT_SYSLOG_LEVEL = 7
+    DEFAULT_SYSLOG_LEVEL = 200
 
     EXCLUDED_LOGRECORD_ATTRS = set(
         (
diff --git a/tests/test_logging.py b/tests/test_logging.py
index 37de8b6e33..724bcb6e87 100644
--- a/tests/test_logging.py
+++ b/tests/test_logging.py
@@ -17,7 +17,7 @@ def test_logger(caplog):
     assert r.message == "TEST OUTPUT"
 
     o = json.loads(stream.getvalue())
-    assert o["Severity"] == 6
+    assert o["Severity"] == 200
     assert o["Fields"]["message"] == "TEST OUTPUT"
 
 
@@ -37,7 +37,7 @@ def test_exception(caplog):
     assert r.exc_info
 
     o = json.loads(stream.getvalue())
-    assert o["Severity"] == 3
+    assert o["Severity"] == 500
     assert o["Fields"]["message"] == "TEST OUTPUT"
     assert o["Fields"]["error"].startswith("ValueError")
 
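
The timing pattern the patch adds to each history writer can be exercised on its own. The sketch below is illustrative only: fake_upload(), upload_with_timing(), and the example blob name are stand-ins invented here and are not part of auslib; only the start/duration bookkeeping, the log messages, and the GCP severity values mirror the patch.

# Standalone sketch of the duration-logging pattern, assuming a dummy
# fake_upload() in place of blob.upload_from_string() / blob.upload().
import json
import logging
import time

logging.basicConfig(level=logging.INFO)

# GCP LogSeverity values, matching the new JsonLogFormatter.SYSLOG_LEVEL_MAP
GCP_SEVERITY = {logging.CRITICAL: 600, logging.ERROR: 500, logging.WARNING: 400, logging.INFO: 200, logging.DEBUG: 100}


def fake_upload(data):
    """Stand-in for the real GCS upload call (hypothetical)."""
    time.sleep(0.01)


def upload_with_timing(bname, data):
    start = time.time()
    logging.info(f"Beginning GCS upload of {bname}")
    fake_upload(data)
    duration = time.time() - start
    # extra={"duration": ...} surfaces the duration as a structured field in
    # the JSON log output, alongside the human-readable message.
    logging.info(f"Completed GCS upload of {bname}, took {duration} seconds", extra={"duration": duration})


if __name__ == "__main__":
    upload_with_timing("example-rule/2-1601061757000-someone.json", json.dumps({"example": True}))
    print(GCP_SEVERITY[logging.INFO])  # 200, the Severity the formatter emits for INFO records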