Skip to content

senders: Mitigate issue with GCP logging quota #171

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jun 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions journalpump/senders/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
import re # type: ignore[no-redef]

KAFKA_COMPRESSED_MESSAGE_OVERHEAD = 30

# GCP logging also relies on this MAX message size
MAX_KAFKA_MESSAGE_SIZE = 1024**2 # 1 MiB

MAX_ERROR_MESSAGES = 8
Expand Down
23 changes: 22 additions & 1 deletion journalpump/senders/google_cloud_logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,15 @@ class GoogleCloudLoggingSender(LogSender):
0: "EMERGENCY",
}

# A bit on the safe side, not exactly 256KB but this
# is an approximation anyway
# according to https://cloud.google.com/logging/quotas
_LOG_ENTRY_QUOTA = 250 * 1024

# Somewhat arbitrary maximum message size choosen, this gives a 56K
# headroom for the other fields in the LogEntry
_MAX_MESSAGE_SIZE = 200 * 1024

def __init__(self, *, config, googleapiclient_request_builder=None, **kwargs):
super().__init__(config=config, max_send_interval=config.get("max_send_interval", 0.3), **kwargs)
credentials = None
Expand Down Expand Up @@ -62,6 +71,18 @@ def send_messages(self, *, messages, cursor):
for message in messages:
msg_str = message.decode("utf8")
msg = json.loads(msg_str)

# This might not measure exactly 256K but should be a good enough approximation to handle this error.
# We try truncating the message if it isn't possible then it is skip.
if len(message) > self._LOG_ENTRY_QUOTA:
DEFAULT_MESSAGE = "Log entry can't be logged because its size is greater than GCP logging quota of 256K"
if "MESSAGE" in msg:
msg["MESSAGE"] = f'{msg["MESSAGE"][:self._MAX_MESSAGE_SIZE]}[MESSAGE TRUNCATED]'
messsage_size = len(json.dumps(msg, ensure_ascii=False).encode("utf-8"))
if messsage_size > self._LOG_ENTRY_QUOTA:
msg = {"MESSAGE": DEFAULT_MESSAGE}
else:
msg = {"MESSAGE": DEFAULT_MESSAGE}
timestamp = msg.pop("timestamp", None)
journald_priority = msg.pop("PRIORITY", None)

Expand All @@ -75,7 +96,7 @@ def send_messages(self, *, messages, cursor):
if timestamp is not None:
entry["timestamp"] = timestamp[:26] + "Z" # assume timestamp to be UTC
if journald_priority is not None:
severity = GoogleCloudLoggingSender._SEVERITY_MAPPING.get(journald_priority, "DEFAULT")
severity = self._SEVERITY_MAPPING.get(journald_priority, "DEFAULT")
entry["severity"] = severity
body["entries"].append(entry)

Expand Down
47 changes: 47 additions & 0 deletions test/test_google_cloud_logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,3 +144,50 @@ def test_correct_timestamp(self):
cursor=None,
)
assert sender._sent_count == 1 # pylint: disable=protected-access

def test_big_logentry_is_truncated(self):
"""Check that message was not marked as sent if GoogleApi returns error"""
message_content = "A" * 257_00
request_builder = self._generate_request_builder(
[{"jsonPayload": {"MESSAGE": message_content[: GoogleCloudLoggingSender._MAX_MESSAGE_SIZE]}}],
)

sender = GoogleCloudLoggingSender(
name="googlecloudlogging",
reader=mock.Mock(),
stats=mock.Mock(),
field_filter=None,
config=self.CONFIG,
googleapiclient_request_builder=request_builder,
)
message = {"MESSAGE": message_content}
sender.send_messages(messages=[json.dumps(message).encode()], cursor=None)
assert sender._sent_count == 1

def test_big_logentry_sends_default(self):
"""Check that message was not marked as sent if GoogleApi returns error"""
request_builder = self._generate_request_builder(
[
{
"jsonPayload": {
"MESSAGE": "Log entry can't be logged because its size is greater than GCP logging quota of 256K"
}
}
]
)

sender = GoogleCloudLoggingSender(
name="googlecloudlogging",
reader=mock.Mock(),
stats=mock.Mock(),
field_filter=None,
config=self.CONFIG,
googleapiclient_request_builder=request_builder,
)
message = {"MESSAGE": "A" * 200_000, "OTHER_FIELD": "B" * 200_000}
sender.send_messages(messages=[json.dumps(message).encode()], cursor=None)
assert sender._sent_count == 1

message = {"OTHER_FIELD": "B" * 257_000}
sender.send_messages(messages=[json.dumps(message).encode()], cursor=None)
assert sender._sent_count == 2
Loading