Skip to content

Commit 8c5d5df

Browse files
committed
push more fixes
1 parent 3172ade commit 8c5d5df

File tree

2 files changed

+116
-31
lines changed

2 files changed

+116
-31
lines changed

AmplifyPlugins/Logging/Sources/AWSCloudWatchLoggingPlugin/AWSCloudWatchLoggingSessionController.swift

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -132,14 +132,19 @@ final class AWSCloudWatchLoggingSessionController {
132132
}
133133
self.batchSubscription = producer.logBatchPublisher.sink { [weak self] batch in
134134
guard self?.networkMonitor.isOnline == true else { return }
135+
136+
// Capture strong references to consumer and batch before the async task
137+
let strongConsumer = consumer
138+
let strongBatch = batch
139+
135140
Task {
136141
do {
137-
try await consumer.consume(batch: batch)
142+
try await strongConsumer.consume(batch: strongBatch)
138143
} catch {
139144
Amplify.Logging.default.error("Error flushing logs with error \(error.localizedDescription)")
140145
let payload = HubPayload(eventName: HubPayload.EventName.Logging.flushLogFailure, context: error.localizedDescription)
141146
Amplify.Hub.dispatch(to: HubChannel.logging, payload: payload)
142-
try batch.complete()
147+
try strongBatch.complete()
143148
}
144149
}
145150
}
@@ -178,8 +183,15 @@ final class AWSCloudWatchLoggingSessionController {
178183
}
179184

180185
private func consumeLogBatch(_ batch: LogBatch) async throws {
186+
// Check if consumer exists before trying to use it
187+
guard let consumer = self.consumer else {
188+
// If consumer is nil, still mark the batch as completed to prevent memory leaks
189+
try batch.complete()
190+
return
191+
}
192+
181193
do {
182-
try await consumer?.consume(batch: batch)
194+
try await consumer.consume(batch: batch)
183195
} catch {
184196
Amplify.Logging.default.error("Error flushing logs with error \(error.localizedDescription)")
185197
let payload = HubPayload(eventName: HubPayload.EventName.Logging.flushLogFailure, context: error.localizedDescription)

AmplifyPlugins/Logging/Sources/AWSCloudWatchLoggingPlugin/Consumer/CloudWatchLoggingConsumer.swift

Lines changed: 101 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -40,25 +40,50 @@ extension CloudWatchLoggingConsumer: LogBatchConsumer {
4040
}
4141
await ensureLogStreamExists()
4242

43-
let batchByteSize = try encoder.encode(entries).count
43+
// Add safety check for nil logStreamName
44+
guard let _ = self.logStreamName else {
45+
Amplify.Logging.error("Log stream name is nil, cannot send logs")
46+
try batch.complete()
47+
return
48+
}
49+
50+
// Wrap encoding in do-catch to prevent crashes
51+
var batchByteSize: Int
52+
do {
53+
batchByteSize = try encoder.encode(entries).count
54+
} catch {
55+
Amplify.Logging.error("Failed to encode log entries: \(error)")
56+
try batch.complete()
57+
return
58+
}
59+
4460
if entries.count > AWSCloudWatchConstants.maxLogEvents {
4561
let smallerEntries = entries.chunked(into: AWSCloudWatchConstants.maxLogEvents)
4662
for entries in smallerEntries {
47-
let entrySize = try encoder.encode(entries).count
48-
if entrySize > AWSCloudWatchConstants.maxBatchByteSize {
49-
let chunks = try chunk(entries, into: AWSCloudWatchConstants.maxBatchByteSize)
50-
for chunk in chunks {
51-
try await sendLogEvents(chunk)
63+
// Wrap in do-catch to prevent crashes
64+
do {
65+
let entrySize = try encoder.encode(entries).count
66+
if entrySize > AWSCloudWatchConstants.maxBatchByteSize {
67+
let chunks = try chunk(entries, into: AWSCloudWatchConstants.maxBatchByteSize)
68+
for chunk in chunks {
69+
try await sendLogEvents(chunk)
70+
}
71+
} else {
72+
try await sendLogEvents(entries)
5273
}
53-
} else {
54-
try await sendLogEvents(entries)
74+
} catch {
75+
Amplify.Logging.error("Error processing log batch: \(error)")
76+
continue
5577
}
5678
}
57-
5879
} else if batchByteSize > AWSCloudWatchConstants.maxBatchByteSize {
59-
let smallerEntries = try chunk(entries, into: AWSCloudWatchConstants.maxBatchByteSize)
60-
for entries in smallerEntries {
61-
try await sendLogEvents(entries)
80+
do {
81+
let smallerEntries = try chunk(entries, into: AWSCloudWatchConstants.maxBatchByteSize)
82+
for entries in smallerEntries {
83+
try await sendLogEvents(entries)
84+
}
85+
} catch {
86+
Amplify.Logging.error("Error chunking log entries: \(error)")
6287
}
6388
} else {
6489
try await sendLogEvents(entries)
@@ -126,22 +151,49 @@ extension CloudWatchLoggingConsumer: LogBatchConsumer {
126151
}
127152

128153
private func sendLogEvents(_ entries: [LogEntry]) async throws {
154+
// Safety check for empty entries
155+
if entries.isEmpty {
156+
return
157+
}
158+
159+
// Safety check for logStreamName
160+
guard let logStreamName = self.logStreamName, !logStreamName.isEmpty else {
161+
Amplify.Logging.error("Cannot send log events: Log stream name is nil or empty")
162+
return
163+
}
164+
129165
let events = convertToCloudWatchInputLogEvents(for: entries)
130-
let response = try await self.client.putLogEvents(input: PutLogEventsInput(
131-
logEvents: events,
132-
logGroupName: self.logGroupName,
133-
logStreamName: self.logStreamName,
134-
sequenceToken: nil
135-
))
136-
let retriableEntries = retriable(entries: entries, in: response)
137-
if !retriableEntries.isEmpty {
138-
let retriableEvents = convertToCloudWatchInputLogEvents(for: retriableEntries)
139-
_ = try await self.client.putLogEvents(input: PutLogEventsInput(
140-
logEvents: retriableEvents,
166+
167+
// Safety check for empty events
168+
if events.isEmpty {
169+
Amplify.Logging.warn("No valid events to send to CloudWatch")
170+
return
171+
}
172+
173+
do {
174+
let response = try await self.client.putLogEvents(input: PutLogEventsInput(
175+
logEvents: events,
141176
logGroupName: self.logGroupName,
142-
logStreamName: self.logStreamName,
177+
logStreamName: logStreamName,
143178
sequenceToken: nil
144179
))
180+
181+
// Handle retriable entries
182+
let retriableEntries = retriable(entries: entries, in: response)
183+
if !retriableEntries.isEmpty {
184+
let retriableEvents = convertToCloudWatchInputLogEvents(for: retriableEntries)
185+
if !retriableEvents.isEmpty {
186+
_ = try await self.client.putLogEvents(input: PutLogEventsInput(
187+
logEvents: retriableEvents,
188+
logGroupName: self.logGroupName,
189+
logStreamName: logStreamName,
190+
sequenceToken: nil
191+
))
192+
}
193+
}
194+
} catch {
195+
Amplify.Logging.error("Failed to send log events: \(error)")
196+
throw error
145197
}
146198
}
147199

@@ -176,19 +228,40 @@ extension CloudWatchLoggingConsumer: LogBatchConsumer {
176228
var chunks: [[LogEntry]] = []
177229
var chunk: [LogEntry] = []
178230
var currentChunkSize = 0
231+
179232
for entry in entries {
180-
let entrySize = try encoder.encode(entry).count
233+
// Wrap the encoding in a do-catch to handle potential errors
234+
var entrySize: Int
235+
do {
236+
entrySize = try encoder.encode(entry).count
237+
} catch {
238+
Amplify.Logging.error("Failed to encode log entry: \(error)")
239+
// Skip this entry and continue with the next one
240+
continue
241+
}
242+
181243
if currentChunkSize + entrySize < maxByteSize {
182244
chunk.append(entry)
183245
currentChunkSize = currentChunkSize + entrySize
184246
} else {
185-
chunks.append(chunk)
247+
// Only add non-empty chunks
248+
if !chunk.isEmpty {
249+
chunks.append(chunk)
250+
}
186251
chunk = [entry]
187-
currentChunkSize = currentChunkSize + entrySize
252+
currentChunkSize = entrySize
188253
}
189254
}
190-
255+
256+
// Add the last chunk if it's not empty
257+
if !chunk.isEmpty {
258+
chunks.append(chunk)
259+
}
260+
261+
// Return even if chunks is empty to avoid null pointer issues
191262
return chunks
192263
}
193264
// swiftlint:enable shorthand_operator
194265
}
266+
267+

0 commit comments

Comments
 (0)