Skip to content

Commit ee04943

Browse files
committed
senders: Skips messages larger than 256K
GCP quotas page for the logging v2(https://cloud.google.com/logging/quotas) specifies that a LogEntry should have at maximum 256KB in size, that is not a hard limit, but an estimation. In anycase we should check if any of our LogEntry objects are bigger than the quota allows and not send those logs and instead let the user know what happened through a custom message
1 parent 9803f9c commit ee04943

File tree

3 files changed

+33
-1
lines changed

3 files changed

+33
-1
lines changed

journalpump/senders/base.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414
import re # type: ignore[no-redef]
1515

1616
KAFKA_COMPRESSED_MESSAGE_OVERHEAD = 30
17+
18+
# GCP logging also relies on this MAX message size
1719
MAX_KAFKA_MESSAGE_SIZE = 1024**2 # 1 MiB
1820

1921
MAX_ERROR_MESSAGES = 8

journalpump/senders/google_cloud_logging.py

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,11 @@ class GoogleCloudLoggingSender(LogSender):
2323
0: "EMERGENCY",
2424
}
2525

26+
# A bit on the safe side, not exactly 256KB but this
27+
# is an approximation anyway
28+
# according to https://cloud.google.com/logging/quotas
29+
_LOG_ENTRY_QUOTA = 250 * 1024
30+
2631
def __init__(self, *, config, googleapiclient_request_builder=None, **kwargs):
2732
super().__init__(config=config, max_send_interval=config.get("max_send_interval", 0.3), **kwargs)
2833
credentials = None
@@ -61,7 +66,14 @@ def send_messages(self, *, messages, cursor):
6166

6267
for message in messages:
6368
msg_str = message.decode("utf8")
64-
msg = json.loads(msg_str)
69+
# This might not measure exactly 256K but should be
70+
# a good enough approximation to handle this error.
71+
# In anycase, we are handling this error in the
72+
# exception handler below also.
73+
if len(message) > GoogleCloudLoggingSender._LOG_ENTRY_QUOTA:
74+
msg = { "MESSAGE": "Log entry cannot be logged because its size is greater than GCP logging quota of 256K" }
75+
else:
76+
msg = json.loads(msg_str)
6577
timestamp = msg.pop("timestamp", None)
6678
journald_priority = msg.pop("PRIORITY", None)
6779

test/test_google_cloud_logging.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,3 +144,21 @@ def test_correct_timestamp(self):
144144
cursor=None,
145145
)
146146
assert sender._sent_count == 1 # pylint: disable=protected-access
147+
148+
def test_big_logentry_is_skip(self):
149+
"""Check that message was not marked as sent if GoogleApi returns error"""
150+
request_builder = self._generate_request_builder(
151+
[{"jsonPayload": { "MESSAGE": "Log entry cannot be logged because its size is greater than GCP logging quota of 256K" }}],
152+
)
153+
154+
sender = GoogleCloudLoggingSender(
155+
name="googlecloudlogging",
156+
reader=mock.Mock(),
157+
stats=mock.Mock(),
158+
field_filter=None,
159+
config=self.CONFIG,
160+
googleapiclient_request_builder=request_builder,
161+
)
162+
message = f'{{"MESSAGE": "{"A" * 257_000}"}}'
163+
sender.send_messages(messages=[message.encode()], cursor=None)
164+
assert sender._sent_count == 1

0 commit comments

Comments
 (0)