55 "regexp"
66 "strconv"
77 "strings"
8+ "sync"
89 "time"
910
1011 "github.com/akto-api-security/api-gateway-logging/trafficUtil/utils"
@@ -13,15 +14,53 @@ import (
1314 "github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs/types"
1415)
1516
16- // StreamTracker tracks the progress and state of a log stream.
17- type StreamTracker struct {
18- NextToken * string
19- LastChecked time.Time
20- Active bool
21- logs map [string ]* LogEntry
17+ var cloudwatchReadBatchSize = 5
18+ var globalTimestampTracker = NewTimestampTracker ()
19+
20+ const POLL_DURATION = 300000 // 5 minutes in milliseconds
21+ const LOG_STREAM_FETCH_TIME = 5400000 // 1.5 hours in milliseconds
22+ const MAX_STREAM_MAP_SIZE = 10000 // Maximum entries in lastReadTimestamps map
23+
24+ // Simple timestamp tracker for log group + stream combinations
25+ type TimestampTracker struct {
26+ mu sync.RWMutex
27+ lastReadTimestamps map [string ]int64 // "logGroupArn|streamName" -> last read timestamp
2228}
2329
24- var cloudwatchReadBatchSize = 5
30+ func NewTimestampTracker () * TimestampTracker {
31+ return & TimestampTracker {
32+ lastReadTimestamps : make (map [string ]int64 ),
33+ }
34+ }
35+
36+ func (t * TimestampTracker ) GetLastReadTimestamp (logGroupArn , streamName string ) int64 {
37+ t .mu .RLock ()
38+ defer t .mu .RUnlock ()
39+ key := logGroupArn + "|" + streamName
40+ return t .lastReadTimestamps [key ]
41+ }
42+
43+ func (t * TimestampTracker ) UpdateLastReadTimestamp (logGroupArn , streamName string , timestamp int64 ) {
44+ t .mu .Lock ()
45+ defer t .mu .Unlock ()
46+ key := logGroupArn + "|" + streamName
47+ t .lastReadTimestamps [key ] = timestamp
48+
49+ // Cleanup if map gets too large
50+ if len (t .lastReadTimestamps ) > MAX_STREAM_MAP_SIZE {
51+ t .cleanupStaleEntries ()
52+ }
53+ }
54+
55+ func (t * TimestampTracker ) cleanupStaleEntries () {
56+ cutoffTime := time .Now ().Unix ()* 1000 - LOG_STREAM_FETCH_TIME
57+ for key , timestamp := range t .lastReadTimestamps {
58+ if timestamp < cutoffTime {
59+ delete (t .lastReadTimestamps , key )
60+ }
61+ }
62+ utils .DebugLog ("TimestampTracker cleanup completed, entries: %d" , len (t .lastReadTimestamps ))
63+ }
2564
2665func init () {
2766 utils .InitVar ("CLOUDWATCH_READ_BATCH_SIZE" , & cloudwatchReadBatchSize )
@@ -32,56 +71,53 @@ func MonitorLogGroup(ctx context.Context, client *cloudwatchlogs.Client, logGrou
3271 utils .DebugLog ("MonitorLogGroup() - Starting log processor for log group: %s" , logGroupArn )
3372
3473 for {
35- now := time .Now ().Unix () * 1000
36- lastProcessedEventTime := now - 300000
37- utils .DebugLog ("MonitorLogGroup() - Time now: %d" , now )
38- utils .DebugLog ("MonitorLogGroup() - Starting Last processed event time: %d" , lastProcessedEventTime )
74+ cycleStartTime := time .Now ().Unix () * 1000
75+ utils .DebugLog ("MonitorLogGroup() - Starting new monitoring cycle at: %d for logGroup: %s" , cycleStartTime , logGroupArn )
3976
40- logStreams , err := FetchLogStreams (ctx , client , logGroupArn , lastProcessedEventTime )
77+ // Fetch log streams from the last 1.5 hours
78+ lookBackTime := cycleStartTime - LOG_STREAM_FETCH_TIME
79+ logStreams , err := FetchLogStreams (ctx , client , logGroupArn , lookBackTime )
4180
4281 if err != nil {
43- utils .DebugLog ("MonitorLogGroup() - Error fetching log streams: %+v" , err )
82+ utils .DebugLog ("MonitorLogGroup() - Error fetching log streams: %+v for logGroup: %s " , err , logGroupArn )
4483 time .Sleep (10 * time .Second )
4584 continue
4685 }
4786
48- if len (logStreams ) == 0 {
49- utils .DebugLog ("MonitorLogGroup() - No new log streams found" )
50- }
87+ utils .DebugLog ("MonitorLogGroup() - Found %d log streams to process for logGroup: %s" , len (logStreams ), logGroupArn )
5188
52- latestTimestamp := lastProcessedEventTime
89+ // For each stream, query from last read timestamp onwards
5390 for _ , stream := range logStreams {
54- utils .DebugLog ("MonitorLogGroup() - Processing log stream: %s" , * stream .LogStreamName )
55- utils .DebugLog ("MonitorLogGroup() - Last event timestamp: %d" , * stream .LastEventTimestamp )
56- utils .DebugLog ("MonitorLogGroup() - Last processed event time: %d" , lastProcessedEventTime )
57- if stream .LastEventTimestamp != nil && * stream .LastEventTimestamp > lastProcessedEventTime {
58- events , err := getLogEvents (ctx , client , logGroupArn , * stream .LogStreamName , lastProcessedEventTime )
91+ streamName := * stream .LogStreamName
92+ lastReadTime := globalTimestampTracker .GetLastReadTimestamp (logGroupArn , streamName )
5993
60- if err != nil {
61- continue
62- }
94+ // If never read before, start from 10 minutes ago to avoid too much historical data
95+ if lastReadTime == 0 {
96+ lastReadTime = cycleStartTime - 2 * POLL_DURATION
97+ }
6398
64- if len (events ) > 0 {
65- for _ , event := range events {
66- if * event .Timestamp > latestTimestamp {
67- latestTimestamp = * event .Timestamp
68- }
69- }
70- }
99+ utils .DebugLog ("MonitorLogGroup() - Processing stream: %s, reading from: %d for logGroup: %s" , streamName , lastReadTime , logGroupArn )
100+
101+ events , maxTimestamp , err := getLogEvents (ctx , client , logGroupArn , streamName , lastReadTime )
102+ if err != nil {
103+ utils .DebugLog ("MonitorLogGroup() - Error fetching log events for stream %s: %+v for logGroup: %s" , streamName , err , logGroupArn )
104+ continue
71105 }
72- }
73106
74- if latestTimestamp > lastProcessedEventTime {
75- lastProcessedEventTime = latestTimestamp
107+ if len (events ) > 0 {
108+ // Update last read timestamp to the latest event we processed
109+ maxTimestamp = maxTimestamp + 1 // Move forward by 1 ms to avoid re-reading the same event
110+ globalTimestampTracker .UpdateLastReadTimestamp (logGroupArn , streamName , maxTimestamp )
111+ utils .DebugLog ("MonitorLogGroup() - Processed %d new events from stream: %s, updated timestamp to: %d for logGroup: %s" , len (events ), streamName , maxTimestamp , logGroupArn )
112+ }
76113 }
77114
78- elapsed := time .Now ().Unix ()* 1000 - now
79- if elapsed < 300000 {
80- utils .DebugLog ("MonitorLogGroup() - Sleeping for %d milliseconds. Log group: %s" , 300000 - elapsed , logGroupArn )
81- time .Sleep (time .Duration (300000 - elapsed ) * time .Millisecond )
82- } else {
83- utils .DebugLog ("MonitorLogGroup() - Resetting last processed event time" )
84- lastProcessedEventTime = now - 300000
115+ // Sleep for the remainder of the 5-minute cycle
116+ elapsed := time .Now ().Unix ()* 1000 - cycleStartTime
117+ if elapsed < POLL_DURATION {
118+ sleepTime := POLL_DURATION - elapsed
119+ utils .DebugLog ("MonitorLogGroup() - Cycle completed in %d ms, sleeping for %d ms for logGroup: %s" , elapsed , sleepTime , logGroupArn )
120+ time .Sleep (time .Duration (sleepTime ) * time .Millisecond )
85121 }
86122
87123 time .Sleep (10 * time .Second )
@@ -101,26 +137,23 @@ func FetchLogStreams(ctx context.Context, client *cloudwatchlogs.Client, logGrou
101137 })
102138
103139 if output != nil {
104- utils .DebugLog ("FetchLogStreams() - Number of log streams fetched: %+v" , len (output .LogStreams ))
140+ utils .DebugLog ("FetchLogStreams() - Number of log streams fetched: %+v for logGroup: %s " , len (output .LogStreams ), logGroupArn )
105141 }
106142
107143 if err != nil {
108- utils .DebugLog ("FetchLogStreams() - Error fetching log streams: %+v" , err )
144+ utils .DebugLog ("FetchLogStreams() - Error fetching log streams: %+v for logGroup: %s " , err , logGroupArn )
109145 return nil , err
110146 }
111147
112148 for _ , stream := range output .LogStreams {
113149 if stream .LastEventTimestamp != nil && * stream .LastEventTimestamp > lastProcessedEventTime {
114- utils .DebugLog ("FetchLogStreams() - Adding log stream: %s, LastEventTimestamp: %d" , * stream .LogStreamName , * stream .LastEventTimestamp )
150+ utils .DebugLog ("FetchLogStreams() - Adding log stream: %s, LastEventTimestamp: %d for logGroup: %s " , * stream .LogStreamName , * stream .LastEventTimestamp , logGroupArn )
115151 logStreams = append (logStreams , stream )
116- } else {
117- utils .DebugLog ("FetchLogStreams() - Discard all older log streams beyond this stream: %s , Last processed event time: %d, LastEventTimestamp: %d in log group: %s" , * stream .LogStreamName , lastProcessedEventTime , * stream .LastEventTimestamp , logGroupArn )
118- return logStreams , nil
119152 }
120153 }
121154
122155 if output .NextToken == nil {
123- utils .DebugLog ("FetchLogStreams() - No more log streams" )
156+ utils .DebugLog ("FetchLogStreams() - No more log streams for logGroup: %s" , logGroupArn )
124157 break
125158 }
126159 nextToken = output .NextToken
@@ -129,9 +162,11 @@ func FetchLogStreams(ctx context.Context, client *cloudwatchlogs.Client, logGrou
129162 return logStreams , nil
130163}
131164
132- func getLogEvents (ctx context.Context , client * cloudwatchlogs.Client , logGroupArn , logStreamName string , startTime int64 ) ([]types.OutputLogEvent , error ) {
133- utils .DebugLog ("MonitorLogGroup() - Fetching log events for stream: %s" , logStreamName )
165+ func getLogEvents (ctx context.Context , client * cloudwatchlogs.Client , logGroupArn , logStreamName string , startTime int64 ) ([]types.OutputLogEvent , int64 , error ) {
166+ utils .DebugLog ("getLogEvents() - Fetching events for stream: %s, time start: %d for logGroup: %s" , logStreamName , startTime , logGroupArn )
167+
134168 var logEvents []types.OutputLogEvent
169+ var maxTimestamp int64 = startTime
135170
136171 reqIDRegex := regexp .MustCompile (`\(([^)]+)\)` )
137172 httpMethodRegex := regexp .MustCompile (`HTTP Method:\s*(\S+),\s*Resource Path:\s*(\S+)` )
@@ -142,39 +177,42 @@ func getLogEvents(ctx context.Context, client *cloudwatchlogs.Client, logGroupAr
142177 output , err := client .GetLogEvents (ctx , & cloudwatchlogs.GetLogEventsInput {
143178 LogGroupIdentifier : aws .String (logGroupArn ),
144179 LogStreamName : aws .String (logStreamName ),
145- StartTime : aws .Int64 (startTime - 60000 ),
180+ StartTime : aws .Int64 (startTime ),
146181 EndTime : aws .Int64 (time .Now ().Unix () * 1000 ),
147182 NextToken : nextToken ,
148183 })
149184
150185 if err != nil {
151- utils .DebugLog ("getLogEvents() - Error fetching log events: %+v" , err )
152- return nil , err
186+ utils .DebugLog ("getLogEvents() - Error fetching log events: %+v for stream: %s, logGroup: %s, startTime: %d " , err , logStreamName , logGroupArn , startTime )
187+ return nil , maxTimestamp , err
153188 }
154189
155190 if output != nil {
156- utils .DebugLog ("getLogEvents() - Logs output number of events: %+v" , len (output .Events ))
191+ utils .DebugLog ("getLogEvents() - Logs output number of events: %+v for stream: %s, logGroup: %s, startTime: %d " , len (output .Events ), logStreamName , logGroupArn , startTime )
157192 }
158193
159194 if len (output .Events ) == 0 {
160- utils .DebugLog ("getLogEvents() - No new events found" )
195+ utils .DebugLog ("getLogEvents() - No new events found for stream: %s, logGroup: %s, startTime: %d" , logStreamName , logGroupArn , startTime )
161196 break
162197 }
163198
164199 logEntries := make (map [string ]LogEntry )
165200
166201 for _ , event := range output .Events {
202+ eventTime := * event .Timestamp
203+
204+ if eventTime > maxTimestamp {
205+ maxTimestamp = eventTime
206+ }
207+
167208 message := * event .Message
168209 matches := reqIDRegex .FindStringSubmatch (message )
169210 if len (matches ) < 2 {
170- utils .DebugLog ("getLogEvents() - No request ID found in message: %s" , message )
211+ utils .DebugLog ("getLogEvents() - No request ID found in message: %s for stream: %s, logGroup: %s " , message , logStreamName , logGroupArn )
171212 continue
172213 }
173214
174- eventTime := * event .Timestamp
175- utils .DebugLog ("getLogEvents() - Event time: %d" , eventTime )
176- utils .DebugLog ("getLogEvents() - Log message: %s" , message )
177- utils .DebugLog ("getLogEvents() - Request ID: %s" , matches [1 ])
215+ utils .DebugLog ("getLogEvents() - Event time: %d, RequestID: %s, Message: %s for stream: %s, logGroup: %s" , eventTime , matches [1 ], message , logStreamName , logGroupArn )
178216
179217 requestID := matches [1 ]
180218
@@ -189,9 +227,9 @@ func getLogEvents(ctx context.Context, client *cloudwatchlogs.Client, logGroupAr
189227 if len (matches ) == 3 {
190228 logEntry .HTTPMethod = matches [1 ]
191229 logEntry .ResourcePath = matches [2 ]
192- utils .DebugLog ("getLogEvents() - HTTP Method: %s, Resource Path: %s" , logEntry .HTTPMethod , logEntry .ResourcePath )
230+ utils .DebugLog ("getLogEvents() - HTTP Method: %s, Resource Path: %s for stream: %s, logGroup: %s " , logEntry .HTTPMethod , logEntry .ResourcePath , logStreamName , logGroupArn )
193231 } else {
194- utils .DebugLog ("getLogEvents() - Error parsing HTTP method and resource path: %s" , message )
232+ utils .DebugLog ("getLogEvents() - Error parsing HTTP method and resource path: %s for stream: %s, logGroup: %s " , message , logStreamName , logGroupArn )
195233 }
196234 } else if strings .Contains (message , "Method request query string:" ) {
197235 logEntry .QueryParams = extractMap (message , "Method request query string:" )
@@ -211,10 +249,10 @@ func getLogEvents(ctx context.Context, client *cloudwatchlogs.Client, logGroupAr
211249 if err == nil {
212250 logEntry .StatusCode = statusCode
213251 } else {
214- utils .DebugLog ("Error converting status code to integer: %+v" , err )
252+ utils .DebugLog ("Error converting status code to integer: %+v for stream: %s, logGroup: %s " , err , logStreamName , logGroupArn )
215253 }
216254 } else {
217- utils .DebugLog ("Error: Could not find status code in the message: %s" , message )
255+ utils .DebugLog ("Error: Could not find status code in the message: %s for stream: %s, logGroup: %s " , message , logStreamName , logGroupArn )
218256 }
219257 }
220258 }
@@ -223,18 +261,18 @@ func getLogEvents(ctx context.Context, client *cloudwatchlogs.Client, logGroupAr
223261 }
224262
225263 for _ , logEntry := range logEntries {
226- utils .DebugLog ("getLogEvents() - Final log entry: %+v" , logEntry )
264+ utils .DebugLog ("getLogEvents() - Final log entry: %+v for stream: %s, logGroup: %s " , logEntry , logStreamName , logGroupArn )
227265 ParseAndProduce (logEntry )
228266 }
229267
230268 logEvents = append (logEvents , output .Events ... )
231269
232270 if output .NextForwardToken == nil || (nextToken != nil && * nextToken == * output .NextForwardToken ) {
233- utils .DebugLog ("getLogEvents() - No more events" )
271+ utils .DebugLog ("getLogEvents() - No more events for stream: %s, logGroup: %s" , logStreamName , logGroupArn )
234272 break
235273 }
236274 nextToken = output .NextForwardToken
237275 }
238276
239- return logEvents , nil
277+ return logEvents , maxTimestamp , nil
240278}
0 commit comments