Log GCS upload duration (#1516)
* Adjust log levels to match GCP

* Log GCS upload duration

* Fix busted tests
bhearsum authored Sep 25, 2020
1 parent 65cc3db · commit 069d6cd
Showing 3 changed files with 36 additions and 5 deletions.
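
For reference, the change wraps each GCS upload in the changed history writers in the same measure-then-log pattern. Below is a minimal standalone sketch of that pattern; `upload` is a hypothetical stand-in for the actual google-cloud-storage call made in db.py, and `extra={"duration": ...}` attaches the duration to the log record so a structured (JSON) formatter can emit it as its own field:

import logging
import time


def timed_upload(bname, data, upload):
    # Record when the upload starts and announce it.
    start = time.time()
    logging.info(f"Beginning GCS upload of {bname}")
    # `upload` is a placeholder for the real blob upload call.
    upload(bname, data)
    # Log the elapsed time both in the message and as a structured field.
    duration = time.time() - start
    logging.info(f"Completed GCS upload of {bname}, took {duration} seconds", extra={"duration": duration})
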
24 changes: 24 additions & 0 deletions src/auslib/db.py
@@ -835,23 +835,35 @@ def forInsert(self, insertedKeys, columns, changed_by, trans):
identifier = "-".join([columns.get(i) for i in self.identifier_columns])
for data_version, ts, data in ((None, timestamp - 1, ""), (columns.get("data_version"), timestamp, json.dumps(columns[self.data_column]))):
bname = "{}/{}-{}-{}.json".format(identifier, data_version, ts, changed_by)
start = time.time()
logging.info(f"Beginning GCS upload of {bname}")
bucket = self._getBucket(identifier)(use_gcloud_aio=False)
blob = bucket.blob(bname)
blob.upload_from_string(data, content_type="application/json")
duration = time.time() - start
logging.info(f"Completed GCS upload of {bname}, took {duration} seconds", extra={"duration": duration})

def forDelete(self, rowData, changed_by, trans):
identifier = "-".join([rowData.get(i) for i in self.identifier_columns])
bname = "{}/{}-{}-{}.json".format(identifier, rowData.get("data_version"), getMillisecondTimestamp(), changed_by)
start = time.time()
logging.info(f"Beginning GCS upload of {bname}")
bucket = self._getBucket(identifier)(use_gcloud_aio=False)
blob = bucket.blob(bname)
blob.upload_from_string("", content_type="application/json")
duration = time.time() - start
logging.info(f"Completed GCS upload of {bname}, took {duration} seconds", extra={"duration": duration})

def forUpdate(self, rowData, changed_by, trans):
identifier = "-".join([rowData.get(i) for i in self.identifier_columns])
bname = "{}/{}-{}-{}.json".format(identifier, rowData.get("data_version"), getMillisecondTimestamp(), changed_by)
start = time.time()
logging.info(f"Beginning GCS upload of {bname}")
bucket = self._getBucket(identifier)(use_gcloud_aio=False)
blob = bucket.blob(bname)
blob.upload_from_string(json.dumps(rowData[self.data_column]), content_type="application/json")
duration = time.time() - start
logging.info(f"Completed GCS upload of {bname}, took {duration} seconds", extra={"duration": duration})

def getChange(self, change_id=None, column_values=None, data_version=None, transaction=None):
if not set(self.identifier_columns).issubset(column_values.keys()) or not data_version:
@@ -883,6 +895,8 @@ async def forInsert(self, insertedKeys, columns, changed_by, trans):
identifier = "-".join([columns.get(i) for i in self.identifier_columns])
for data_version, ts, data in ((None, timestamp - 1, ""), (columns.get("data_version"), timestamp, json.dumps(columns[self.data_column]))):
bname = "{}/{}-{}-{}.json".format(identifier, data_version, ts, changed_by)
start = time.time()
logging.info(f"Beginning GCS upload of {bname}")
# Using a separate session for each request is not ideal, but it's
# the only thing that seems to work. Ideally, we'd share one session
# for the entire application, but we can't for two reasons:
@@ -894,22 +908,32 @@ async def forInsert(self, insertedKeys, columns, changed_by, trans):
bucket = self._getBucket(identifier)(session=session)
blob = bucket.new_blob(bname)
await blob.upload(data, session=session)
duration = time.time() - start
logging.info(f"Completed GCS upload of {bname}, took {duration} seconds", extra={"duration": duration})

async def forDelete(self, rowData, changed_by, trans):
identifier = "-".join([rowData.get(i) for i in self.identifier_columns])
bname = "{}/{}-{}-{}.json".format(identifier, rowData.get("data_version"), getMillisecondTimestamp(), changed_by)
start = time.time()
logging.info(f"Beginning GCS upload of {bname}")
async with ClientSession() as session:
bucket = self._getBucket(identifier)(session=session)
blob = bucket.new_blob(bname)
await blob.upload("", session=session)
duration = time.time() - start
logging.info(f"Completed GCS upload of {bname}, took {duration} seconds", extra={"duration": duration})

async def forUpdate(self, rowData, changed_by, trans):
identifier = "-".join([rowData.get(i) for i in self.identifier_columns])
bname = "{}/{}-{}-{}.json".format(identifier, rowData.get("data_version"), getMillisecondTimestamp(), changed_by)
start = time.time()
logging.info(f"Beginning GCS upload of {bname}")
async with ClientSession() as session:
bucket = self._getBucket(identifier)(session=session)
blob = bucket.new_blob(bname)
await blob.upload(json.dumps(rowData[self.data_column]), session=session)
duration = time.time() - start
logging.info(f"Completed GCS upload of {bname}, took {duration} seconds", extra={"duration": duration})


class HistoryTable(AUSTable):
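
The async history writers above follow the same timing pattern, with the wrinkle (per the in-code comment) that each upload opens its own ClientSession rather than sharing one across the application. A hedged sketch of that shape, assuming aiohttp's ClientSession and mirroring the `new_blob`/`upload` calls from the diff; `bucket_for` is a hypothetical stand-in for `self._getBucket(identifier)(session=session)`:

import logging
import time

from aiohttp import ClientSession


async def timed_async_upload(bname, data, bucket_for):
    start = time.time()
    logging.info(f"Beginning GCS upload of {bname}")
    # One short-lived session per upload, as the in-code comment explains.
    async with ClientSession() as session:
        bucket = bucket_for(session)
        blob = bucket.new_blob(bname)
        await blob.upload(data, session=session)
    duration = time.time() - start
    logging.info(f"Completed GCS upload of {bname}, took {duration} seconds", extra={"duration": duration})
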
13 changes: 10 additions & 3 deletions src/auslib/log.py
@@ -52,11 +52,18 @@ class JsonLogFormatter(logging.Formatter):

LOGGING_FORMAT_VERSION = "2.0"

# Map from Python logging to Syslog severity levels
SYSLOG_LEVEL_MAP = {logging.DEBUG: 2, logging.ERROR: 3, logging.WARNING: 4, logging.INFO: 6, logging.DEBUG: 7}
# Map from Python logging to GCP severity levels
# From https://cloud.google.com/logging/docs/reference/v2/rest/v2/LogEntry#LogSeverity
SYSLOG_LEVEL_MAP = {
logging.CRITICAL: 600,
logging.ERROR: 500,
logging.WARNING: 400,
logging.INFO: 200,
logging.DEBUG: 100,
}

# Syslog level to use when/if python level isn't found in map
DEFAULT_SYSLOG_LEVEL = 7
DEFAULT_SYSLOG_LEVEL = 200

EXCLUDED_LOGRECORD_ATTRS = set(
(
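
With the new GCP map, the updated tests below expect Severity 200 for an info record and 500 for an error. A sketch of the lookup the formatter presumably performs; `severity_for` is an illustrative helper name, not the module's actual API:

import logging

SYSLOG_LEVEL_MAP = {
    logging.CRITICAL: 600,
    logging.ERROR: 500,
    logging.WARNING: 400,
    logging.INFO: 200,
    logging.DEBUG: 100,
}

# Fallback when a record's level isn't in the map.
DEFAULT_SYSLOG_LEVEL = 200


def severity_for(record: logging.LogRecord) -> int:
    # e.g. an INFO record maps to 200 and an ERROR record to 500,
    # matching the assertions in tests/test_logging.py.
    return SYSLOG_LEVEL_MAP.get(record.levelno, DEFAULT_SYSLOG_LEVEL)
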
4 changes: 2 additions & 2 deletions tests/test_logging.py
@@ -17,7 +17,7 @@ def test_logger(caplog):
assert r.message == "TEST OUTPUT"

o = json.loads(stream.getvalue())
assert o["Severity"] == 6
assert o["Severity"] == 200
assert o["Fields"]["message"] == "TEST OUTPUT"


@@ -37,7 +37,7 @@ def test_exception(caplog):
assert r.exc_info

o = json.loads(stream.getvalue())
assert o["Severity"] == 3
assert o["Severity"] == 500
assert o["Fields"]["message"] == "TEST OUTPUT"
assert o["Fields"]["error"].startswith("ValueError")
