
Conversation


@Mangaal Mangaal commented Sep 16, 2025

This PR introduces a unidirectional (agent → principal) log streaming service and wires it into the resource-proxy path so the Principal can serve Kubernetes pod logs to the Argo CD UI. The Agent handles both static logs (follow=false) and live streaming (follow=true) with resume support.

What’s included:

  • New LogStreaming service (gRPC) — Agent opens a client-streaming RPC and pushes log chunks keyed by request_uuid; Principal writes directly to the HTTP response stream and returns a final status when the stream ends.
  • Principal resource-proxy integration — /…/pods/{name}/log requests are recognised, the HTTP writer is registered, and a log event is enqueued to the Agent.
  • Agent log workers — static and live log handlers; chunks are flushed on a time window or at 64KiB, whichever comes first; live streaming resumes (via SinceTime) after transient errors. A simplified sketch of the flush policy follows this list.
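For illustration, here is a minimal sketch of that flush policy: buffer until 64KiB accumulates or the time window expires, then push a chunk. All names here (forwardLogs, sendChunk, window) are hypothetical stand-ins, not the identifiers used in agent/log.go.

package logsketch

import (
	"context"
	"io"
	"time"
)

// sendChunk stands in for the agent's gRPC send call.
type sendChunk func(data []byte, eof bool) error

// forwardLogs buffers log bytes and pushes a chunk when either 64KiB has
// accumulated or the flush window has elapsed, whichever comes first.
func forwardLogs(ctx context.Context, rc io.ReadCloser, send sendChunk, window time.Duration) error {
	defer rc.Close()

	const chunkMax = 64 * 1024
	buf := make([]byte, 0, chunkMax)
	readBuf := make([]byte, 4096)
	deadline := time.Now().Add(window)

	flush := func() error {
		if len(buf) == 0 {
			return nil
		}
		err := send(buf, false)
		buf = buf[:0]
		deadline = time.Now().Add(window)
		return err
	}

	for {
		if ctx.Err() != nil {
			return ctx.Err()
		}
		n, err := rc.Read(readBuf)
		if n > 0 {
			buf = append(buf, readBuf[:n]...)
			// Flush on size or on an expired time window. A production
			// version would also flush from a timer, not only after reads.
			if len(buf) >= chunkMax || time.Now().After(deadline) {
				if ferr := flush(); ferr != nil {
					return ferr
				}
			}
		}
		if err == io.EOF {
			if ferr := flush(); ferr != nil {
				return ferr
			}
			return send(nil, true) // signal end of stream
		}
		if err != nil {
			return err
		}
	}
}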

Key features:

  • Principal LogStream gRPC server & HTTP bridge.
  • Agent log streaming implementation (static + live + resume).
  • Principal resource proxy: log-subresource branch & handoff to LogStream.

Assisted-by: Cursor/Gemini, etc.

logs.mov

Summary by CodeRabbit

  • New Features
    • Container log streaming: request-and-stream support for static (one-time) and live (follow) logs with resume-capable reconnection, duplicate-request protection, cancellation, and HTTP/gRPC delivery; principal now exposes a log streaming service and proxying for per-request log streams.
  • Tests
    • Extensive unit and end-to-end tests validating streaming workflows, timestamp extraction/resume, Kubernetes log retrieval, HTTP/gRPC interactions, and error/cancellation paths.

@Mangaal Mangaal force-pushed the log-streaming branch 2 times, most recently from 4d5e132 to 30aab14 on September 16, 2025 14:58
@Mangaal Mangaal closed this Sep 16, 2025
@Mangaal Mangaal reopened this Sep 16, 2025
Signed-off-by: Mangaal <angommeeteimangaal@gmail.com>
(cherry picked from commit 2a08301)
Signed-off-by: Mangaal <angommeeteimangaal@gmail.com>
(cherry picked from commit d07df62)
Signed-off-by: Mangaal <angommeeteimangaal@gmail.com>
(cherry picked from commit 161f2a4)
Signed-off-by: Mangaal <angommeeteimangaal@gmail.com>
(cherry picked from commit 30aab14)
Signed-off-by: Mangaal <angommeeteimangaal@gmail.com>
(cherry picked from commit f8a6666)
Signed-off-by: Mangaal <angommeeteimangaal@gmail.com>
(cherry picked from commit e820c35)
Signed-off-by: Mangaal <angommeeteimangaal@gmail.com>

codecov-commenter commented Sep 17, 2025

Codecov Report

❌ Patch coverage is 45.13109% with 293 lines in your changes missing coverage. Please review.
✅ Project coverage is 45.54%. Comparing base (e90cc07) to head (152e239).
⚠️ Report is 1 commit behind head on main.

File | Patch % | Missing lines
agent/log.go | 34.78% | 153 missing, 12 partials ⚠️
principal/resource.go | 12.50% | 49 missing, 7 partials ⚠️
internal/event/event.go | 0.00% | 43 missing ⚠️
principal/apis/logstreamapi/logstream.go | 83.73% | 23 missing, 4 partials ⚠️
agent/inbound.go | 0.00% | 2 missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main     #569      +/-   ##
==========================================
+ Coverage   45.52%   45.54%   +0.02%     
==========================================
  Files          90       92       +2     
  Lines        9957    10488     +531     
==========================================
+ Hits         4533     4777     +244     
- Misses       4959     5226     +267     
- Partials      465      485      +20     


@Mangaal Mangaal marked this pull request as ready for review September 17, 2025 12:01
@chetan-rns
Collaborator

@Mangaal I see this error intermittently on the UI. Works fine after requesting the logs again. I guess we are not handling EOF somewhere?

Get "https://rathole-container-internal:9090/api/v1/namespaces/
guestbook/pods/kustomize-guestbook-ui-7689b675bc-cbv8h/log?container=guestbook-ui&follow=true&
tailLines=1000&timestamps=true": EOF

Signed-off-by: Mangaal <angommeeteimangaal@gmail.com>

Mangaal commented Sep 30, 2025

@chetan-rns, thanks for reviewing my PR. I’ve updated it and addressed your suggestions. Please take a look when you get a chance.

@chetan-rns chetan-rns (Collaborator) left a comment

@Mangaal Sorry for the delay. Added a few questions around simplifying the agent logic. IMO, the agent should only propagate the options from the argocd server to the client-go's GetLogs(). Read the bytes from the reader until EOF, forward them back in chunks, and return any errors. We can avoid extracting timestamps to modify the sinceTime dynamically. I think we can rely on the argocd server to handle the chunks. This way the agent doesn't have to do any extra work. WDYT @jannfis
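For reference, a minimal sketch of the simplified loop suggested above, assuming hypothetical names (relayPodLogs, send); only the client-go calls (GetLogs, Stream) are real API:

package logsketch

import (
	"context"
	"io"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/client-go/kubernetes"
)

// relayPodLogs passes the caller's options straight through to client-go,
// reads until EOF, and forwards raw chunks via send.
func relayPodLogs(ctx context.Context, cs kubernetes.Interface, ns, pod string,
	opts *corev1.PodLogOptions, send func([]byte) error) error {

	rc, err := cs.CoreV1().Pods(ns).GetLogs(pod, opts).Stream(ctx)
	if err != nil {
		return err
	}
	defer rc.Close()

	buf := make([]byte, 64*1024)
	for {
		n, rerr := rc.Read(buf)
		if n > 0 {
			if serr := send(buf[:n]); serr != nil {
				return serr
			}
		}
		if rerr == io.EOF {
			return nil // clean end of stream
		}
		if rerr != nil {
			return rerr // surface the error to the principal
		}
	}
}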

Signed-off-by: Mangaal <angommeeteimangaal@gmail.com>
@chetan-rns chetan-rns (Collaborator) left a comment

Thanks @Mangaal! The overall PR looks good to me.

@jannfis @jgwest Please take a look when you have a moment.

@jannfis jannfis (Collaborator) commented Oct 31, 2025

@coderabbitai full review


coderabbitai bot commented Oct 31, 2025

✅ Actions performed

Full review triggered.


coderabbitai bot commented Oct 31, 2025

Walkthrough

Adds end-to-end container log streaming: agents accept ContainerLogRequest events, stream pod logs (static and resumable live) to principal via a new gRPC LogStream service; principal exposes HTTP streaming endpoints, protobufs, server implementation, tests, mocks, and a backoff dependency.

Changes

Cohort / File(s) — Summary

  • Agent core & inbound (agent/agent.go, agent/inbound.go) — Added inflightMu sync.Mutex and inflightLogs map[string]context.CancelFunc on Agent; inbound routing now dispatches TargetContainerLog events to processIncomingContainerLogRequest.
  • Agent log implementation (agent/log.go) — New container log handling: request deduplication/in-flight registry, static and resume-capable live streaming, Kubernetes PodLog stream creation, chunked gRPC streaming to principal, timestamp extraction for resume, exponential backoff, concurrency and cleanup logic, and gRPC client creation.
  • Agent unit tests (agent/log_test.go) — Added test scaffolding and mocks (MockLogStreamClient, MockReadCloser), helpers and tests for timestamp extraction, kube stream creation, dedupe behavior, static/live flows, cancellation, and stream lifecycle.
  • Event model (internal/event/event.go) — Added TargetContainerLog constant, ContainerLogRequest struct, Event.ContainerLogRequest() extractor, and EventSource.NewLogRequestEvent to build container log events; extended Target recognition.
  • Principal server wiring (principal/server.go, principal/listen.go) — Added logStream *logstream.Server to Server and registered the LogStreamService in gRPC server setup.
  • Principal resource handling (principal/resource.go) — Special-case handling for the log subresource: validate namespace/pod, register HTTP writer for streaming, submit log event, and handle follow (stream) vs static (wait for completion/timeout) semantics.
  • LogStream protobuf (principal/apis/logstreamapi/logstream.proto) — New proto with LogStreamData (request_uuid, data, eof, error), LogStreamResponse, and LogStreamService RPC StreamLogs (client-streaming).
  • LogStream server impl (principal/apis/logstreamapi/logstream.go) — New exported logstream.Server with session map, RegisterHTTP to attach HTTP writers, StreamLogs RPC handling, message processing (data/EOF/errors), WaitForCompletion, safe flushing, cancellation propagation, and session finalization (a sketch of this session-registry pattern follows the list).
  • LogStream server tests (principal/apis/logstreamapi/logstream_test.go) — Tests for server lifecycle, HTTP registration, stream workflows (data, EOF, errors), cancellation, completion waiting, and panic/safety scenarios.
  • LogStream mocks (principal/apis/logstreamapi/mock/mock.go) — Added MockLogStreamServer, MockHTTPResponseWriter, PanicFlusher, and MockWriterWithoutFlusher to support unit tests.
  • E2E client helpers (test/e2e/fixture/argoclient.go) — Added GetLogs and GetApplicationLogs to fetch pod/application logs (container, tailLines query options).
  • E2E tests (test/e2e/logs_test.go) — New LogsStreamingTestSuite with tests creating apps, waiting for sync/health, locating pods, fetching logs, and asserting non-empty logs.
  • Dependencies (go.mod) — Added dependency github.com/cenkalti/backoff/v4 v4.3.0.
  • Tooling (hack/generate-proto.sh) — Added proto generation target for principal/apis/logstreamapi.
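To make the LogStream handoff concrete, here is a rough sketch of the session-registry pattern described above for principal/apis/logstreamapi/logstream.go: an HTTP handler registers a writer keyed by request UUID, the gRPC side writes and flushes chunks into it, and completion unblocks the waiting handler. Type and method names besides RegisterHTTP are illustrative, not the PR's actual API.

package logsketch

import (
	"fmt"
	"net/http"
	"sync"
)

type session struct {
	w    http.ResponseWriter
	done chan error // receives the final status when the stream ends
}

type Registry struct {
	mu       sync.Mutex
	sessions map[string]*session
}

func NewRegistry() *Registry {
	return &Registry{sessions: make(map[string]*session)}
}

// RegisterHTTP attaches the HTTP response writer for a request UUID so the
// gRPC handler can stream into it; the returned channel unblocks the caller.
func (r *Registry) RegisterHTTP(uuid string, w http.ResponseWriter) <-chan error {
	r.mu.Lock()
	defer r.mu.Unlock()
	s := &session{w: w, done: make(chan error, 1)}
	r.sessions[uuid] = s
	return s.done
}

// Write copies one chunk to the registered writer and flushes if supported,
// so live log lines reach the browser promptly.
func (r *Registry) Write(uuid string, data []byte) error {
	r.mu.Lock()
	s, ok := r.sessions[uuid]
	r.mu.Unlock()
	if !ok {
		return fmt.Errorf("no session registered for request %s", uuid)
	}
	if _, err := s.w.Write(data); err != nil {
		return err
	}
	if f, ok := s.w.(http.Flusher); ok {
		f.Flush()
	}
	return nil
}

// Finish removes the session and delivers the final status (nil or error).
func (r *Registry) Finish(uuid string, err error) {
	r.mu.Lock()
	s, ok := r.sessions[uuid]
	delete(r.sessions, uuid)
	r.mu.Unlock()
	if ok {
		s.done <- err
	}
}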

Sequence Diagram(s)

sequenceDiagram
    participant Client as HTTP Client
    participant Principal as Principal
    participant Agent as Agent
    participant K8s as Kubernetes API

    Client->>Principal: GET /logs (follow or static)
    Principal->>Principal: RegisterHTTP(requestUUID) → open writer/session
    Principal->>Agent: Emit ContainerLogRequest event

    Agent->>Agent: startLogStreamIfNew (dedupe via inflightLogs)
    alt Static (follow=false)
        Agent->>K8s: PodLogs (follow=false)
        K8s-->>Agent: log bytes + EOF
        Agent->>Principal: StreamLogs (send chunks, EOF)
        Principal->>Client: write & flush, then close
    else Live (follow=true)
        Agent->>Agent: streamLogsWithResume (background goroutine)
        loop resume/retry with backoff
            Agent->>K8s: PodLogs (follow=true, SinceTime)
            K8s-->>Agent: new log bytes
            Agent->>Principal: StreamLogs (data + timestamps)
            Principal->>Client: write & flush
        end
    end

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

  • Focus review on:
    • agent/log.go (streaming logic, resume/backoff, timestamp parsing, inflight concurrency)
    • principal/resource.go (HTTP writer lifecycle, blocking vs streaming paths)
    • principal/apis/logstreamapi/logstream.go (session concurrency, safeFlush, cancellation)
    • Tests and mocks in both agent and principal packages for correctness and flakiness

Poem

🐰 I stitched the streams from pod to shore,
bytes and timestamps hop-hop more.
Backoff breaths and resumes bright,
tails rejoin in moonlit byte.
Hooray — the rabbit streams the night!

Pre-merge checks and finishing touches

✅ Passed checks (3 passed)
  • Description Check — ✅ Passed. Check skipped: CodeRabbit’s high-level summary is enabled.
  • Title Check — ✅ Passed. The title 'feat(logstream): Log streaming for argocd agent' accurately summarizes the main change: adding log streaming capability to the Argo CD agent.
  • Docstring Coverage — ✅ Passed. No functions found in the changed files to evaluate docstring coverage; skipping the check.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 4

📜 Review details


📥 Commits

Reviewing files that changed from the base of the PR and between f957238 and 66bace8.

⛔ Files ignored due to path filters (3)
  • go.sum is excluded by !**/*.sum
  • pkg/api/grpc/logstreamapi/logstream.pb.go is excluded by !**/*.pb.go
  • pkg/api/grpc/logstreamapi/logstream_grpc.pb.go is excluded by !**/*.pb.go
📒 Files selected for processing (15)
  • agent/agent.go (1 hunks)
  • agent/inbound.go (1 hunks)
  • agent/log.go (1 hunks)
  • agent/log_test.go (1 hunks)
  • go.mod (1 hunks)
  • internal/event/event.go (5 hunks)
  • principal/apis/logstream/logstream.go (1 hunks)
  • principal/apis/logstream/logstream.proto (1 hunks)
  • principal/apis/logstream/logstream_test.go (1 hunks)
  • principal/apis/logstream/mock/mock.go (1 hunks)
  • principal/listen.go (2 hunks)
  • principal/resource.go (2 hunks)
  • principal/server.go (3 hunks)
  • test/e2e/fixture/argoclient.go (1 hunks)
  • test/e2e/logs_test.go (1 hunks)
🧰 Additional context used
🪛 Buf (1.59.0)
principal/apis/logstream/logstream.proto

17-17: Files with package "logstreamapi" must be within a directory "logstreamapi" relative to root but were in directory "principal/apis/logstream".

(PACKAGE_DIRECTORY_MATCH)

Comment on lines 17 to 19
package logstreamapi;

option go_package = "github.com/argoproj-labs/argocd-agent/pkg/api/grpc/logstreamapi";

⚠️ Potential issue | 🟠 Major

Fix proto package directory mismatch

Line 17 declares package logstreamapi, but the file lives under principal/apis/logstream/…. Buf 1.59.0 enforces PACKAGE_DIRECTORY_MATCH, so buf lint (and any proto codegen driven by it) will fail until the directory matches the declared package. Please either relocate the proto to principal/apis/logstreamapi/logstream.proto (matching the existing go_package) or adjust the package name/directory structure so Buf’s rule passes. Based on static analysis hints.

🧰 Tools
🪛 Buf (1.59.0)

17-17: Files with package "logstreamapi" must be within a directory "logstreamapi" relative to root but were in directory "principal/apis/logstream".

(PACKAGE_DIRECTORY_MATCH)


Signed-off-by: Mangaal <angommeeteimangaal@gmail.com>
…og-streaming

Signed-off-by: Mangaal <angommeeteimangaal@gmail.com>
@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

♻️ Duplicate comments (1)
principal/apis/logstreamapi/logstream.proto (1)

17-20: Fix Buf PACKAGE_DIRECTORY_MATCH failure.

buf lint still reports PACKAGE_DIRECTORY_MATCH because the proto package logstreamapi must live directly under a logstreamapi directory relative to the module root, while the current path principal/apis/logstreamapi still violates that rule. Adjust either the package declaration (e.g. package principal.apis.logstreamapi;) or the directory/module configuration so the package path and filesystem layout align; otherwise, Buf-based lint/codegen will continue to fail. Based on static analysis hints.

🧹 Nitpick comments (1)
principal/apis/logstreamapi/logstream.go (1)

289-293: Lower the per-chunk log level to avoid flooding principal logs.

Writing an Info entry for every chunk sent to the HTTP client will spam logs during live streams and adds measurable overhead. Please drop this log or demote it to Trace, keeping high-volume traffic out of the default log level.

-	logCtx.WithFields(logrus.Fields{
-		"data_length": len(data),
-		"request_id":  reqID,
-	}).Info("HTTP write and flush successful")
+	logCtx.WithFields(logrus.Fields{
+		"data_length": len(data),
+		"request_id":  reqID,
+	}).Trace("HTTP write and flush successful")
📜 Review details


📥 Commits

Reviewing files that changed from the base of the PR and between 66bace8 and 6f649a0.

⛔ Files ignored due to path filters (1)
  • go.sum is excluded by !**/*.sum
📒 Files selected for processing (12)
  • agent/agent.go (2 hunks)
  • agent/inbound.go (1 hunks)
  • agent/log.go (1 hunks)
  • agent/log_test.go (1 hunks)
  • go.mod (1 hunks)
  • internal/event/event.go (5 hunks)
  • principal/apis/logstreamapi/logstream.go (1 hunks)
  • principal/apis/logstreamapi/logstream.proto (1 hunks)
  • principal/apis/logstreamapi/logstream_test.go (1 hunks)
  • principal/apis/logstreamapi/mock/mock.go (1 hunks)
  • principal/server.go (3 hunks)
  • test/e2e/fixture/argoclient.go (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
  • go.mod
  • agent/agent.go
🧰 Additional context used
🧬 Code graph analysis (6)
principal/apis/logstreamapi/logstream_test.go (2)
principal/apis/logstreamapi/logstream.go (1)
  • NewServer (66-71)
principal/apis/logstreamapi/mock/mock.go (4)
  • NewMockHTTPResponseWriter (119-123)
  • MockWriterWithoutFlusher (168-172)
  • NewMockLogStreamServer (41-46)
  • PanicFlusher (161-161)
principal/server.go (1)
principal/apis/logstreamapi/logstream.go (2)
  • Server (32-36)
  • NewServer (66-71)
principal/apis/logstreamapi/logstream.go (2)
principal/server.go (2)
  • Server (71-164)
  • NewServer (183-388)
pkg/api/grpc/logstreamapi/logstream_grpc.pb.go (1)
  • LogStreamService_StreamLogsServer (104-108)
agent/log_test.go (5)
principal/apis/logstreamapi/mock/mock.go (2)
  • MockLogStreamServer (30-39)
  • NewMockLogStreamServer (41-46)
pkg/api/grpc/logstreamapi/logstream.pb.go (6)
  • LogStreamData (24-37)
  • LogStreamData (52-52)
  • LogStreamData (67-69)
  • LogStreamResponse (100-109)
  • LogStreamResponse (124-124)
  • LogStreamResponse (139-141)
agent/agent.go (1)
  • Agent (62-116)
test/fake/kube/kubernetes.go (1)
  • NewKubernetesFakeClientWithResources (69-78)
internal/event/event.go (1)
  • ContainerLogRequest (959-973)
agent/inbound.go (1)
internal/event/event.go (1)
  • TargetContainerLog (85-85)
agent/log.go (4)
agent/agent.go (1)
  • Agent (62-116)
internal/event/event.go (1)
  • ContainerLogRequest (959-973)
pkg/api/grpc/logstreamapi/logstream.pb.go (3)
  • LogStreamData (24-37)
  • LogStreamData (52-52)
  • LogStreamData (67-69)
pkg/api/grpc/logstreamapi/logstream_grpc.pb.go (1)
  • LogStreamService_StreamLogsClient (46-50)
🪛 Buf (1.59.0)
principal/apis/logstreamapi/logstream.proto

17-17: Files with package "logstreamapi" must be within a directory "logstreamapi" relative to root but were in directory "principal/apis/logstreamapi".

(PACKAGE_DIRECTORY_MATCH)

Signed-off-by: Mangaal <angommeeteimangaal@gmail.com>

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 2

🧹 Nitpick comments (2)
agent/log.go (2)

238-246: Consider propagating actual error details to the principal.

Line 244 sends a generic error message "log stream read failed" to the principal. Sending the actual error message (e.g., err.Error()) would aid debugging on the principal/UI side, similar to how line 119 and line 294 already do this for other error paths.

Apply this diff if you want more specific error messages:

 		if errors.Is(err, io.EOF) {
 			_ = stream.Send(&logstreamapi.LogStreamData{RequestUuid: logReq.UUID, Eof: true})
 			return nil
 		}
 		logCtx.WithError(err).Error("Error reading log stream")
-		_ = stream.Send(&logstreamapi.LogStreamData{RequestUuid: logReq.UUID, Error: "log stream read failed"})
+		_ = stream.Send(&logstreamapi.LogStreamData{RequestUuid: logReq.UUID, Error: err.Error()})
 		return err

268-272: Consider extracting the hardcoded overlap duration to a named constant.

The -100ms overlap at line 270 ensures no log lines are lost during resume, which is important. However, it's hardcoded and lacks explanation. Extracting it to a named constant would improve readability and make the intent clear.

For example, add this constant near line 251:

const (
	waitForReconnect  = 10 * time.Second // how long we poll IsConnected() after Unauthenticated
	pollEvery         = 1 * time.Second
	resumeOverlap     = 100 * time.Millisecond // overlap to prevent losing log lines during resume
)

Then update line 270:

 		resumeReq := *logReq
 		if lastTimestamp != nil {
-			t := lastTimestamp.Add(-100 * time.Millisecond)
+			t := lastTimestamp.Add(-resumeOverlap)
 			resumeReq.SinceTime = t.Format(time.RFC3339)
 		}
📜 Review details


📥 Commits

Reviewing files that changed from the base of the PR and between 6f649a0 and 0bcdf95.

📒 Files selected for processing (1)
  • agent/log.go (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
agent/log.go (4)
agent/agent.go (1)
  • Agent (62-116)
internal/event/event.go (1)
  • ContainerLogRequest (959-973)
pkg/api/grpc/logstreamapi/logstream.pb.go (3)
  • LogStreamData (24-37)
  • LogStreamData (52-52)
  • LogStreamData (67-69)
pkg/api/grpc/logstreamapi/logstream_grpc.pb.go (2)
  • LogStreamService_StreamLogsClient (46-50)
  • NewLogStreamServiceClient (33-35)
🔇 Additional comments (10)
agent/log.go (10)

17-34: LGTM!

The imports are well-organized and all dependencies are appropriate for the log streaming functionality.


36-58: LGTM!

Clean entry point with appropriate error handling and structured logging.


60-97: LGTM!

The duplicate detection and lifecycle management logic is correct. The cleanup function properly invokes cancel() and removes the inflight entry, addressing the critical issues from previous reviews.


148-165: LGTM!

The goroutine correctly defers cleanup() at line 156, ensuring the inflight entry is always released when the live stream ends. The panic handler only logs the panic, relying on the deferred cleanup for resource management—this is the correct pattern.


167-176: LGTM!

Simple and correct helper for creating the gRPC log stream.


178-201: LGTM!

The Kubernetes log stream configuration is correct. Setting Timestamps to true is essential for the resume capability, and the SinceTime parsing properly supports retry logic.


282-286: Verify that the initial empty data message is necessary.

The code sends an empty LogStreamData message at the start of each attempt for "health checks" (lines 282-286). While the comment explains this, it's worth confirming whether the principal actually requires this initial message or if it could be omitted to simplify the protocol.

If the principal doesn't strictly require this empty message, consider removing it to simplify the streaming flow.


322-346: LGTM, but consider simplifying the reconnection wait loop if possible.

The authentication failure handling is sophisticated and correct: it pauses exponential backoff and waits for reconnection by polling IsConnected(). The manual ticker loop (lines 327-344) works correctly, though it could potentially be simplified. Given the complexity is warranted for reliable auth handling, this is acceptable as-is.


402-408: Verify EOF handling is intentional for live streaming.

The code does NOT send an EOF message when rc.Read() returns io.EOF (lines 402-408), instead treating it as an error. In contrast, streamLogsToCompletion (line 239) handles EOF as clean termination. For live streaming with follow=true, is it intentional to treat EOF as an error and retry?

This might be related to the intermittent EOF errors reported in the PR comments. If the Kubernetes stream returns EOF prematurely (e.g., during reconnection), treating it as an error would trigger a retry with the resume logic, which seems correct. However, sending err.Error() (which would be "EOF") to the principal as an error message might surface in the UI.

Consider adding explicit EOF handling to distinguish between expected EOF (if any) and unexpected errors:

if err != nil {
	if errors.Is(err, io.EOF) {
		// For follow=true, EOF from k8s might indicate temporary disruption
		// Let the resume logic handle it
		logCtx.Info("EOF from k8s stream, will resume")
		return lastTimestamp, err
	}
	logCtx.WithError(err).Error("Error reading log stream")
	_ = stream.Send(&logstreamapi.LogStreamData{RequestUuid: logReq.UUID, Error: err.Error()})
	return lastTimestamp, err
}

412-438: LGTM!

The timestamp extraction logic is well-optimized with appropriate guards and early exits. Trying both RFC3339Nano and RFC3339 formats ensures compatibility with Kubernetes timestamp variations.
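For readers following along, a hedged reconstruction of what such a helper can look like — Kubernetes prefixes each line with an RFC3339(Nano) timestamp and a space; the exact guards and bounds in agent/log.go may differ:

package logsketch

import (
	"strings"
	"time"
)

// extractTimestamp parses the leading timestamp of one log line, or returns
// nil if the prefix does not look like a timestamp.
func extractTimestamp(line string) *time.Time {
	// Fast-path guards: need a space delimiter and a plausible token length
	// before attempting to parse.
	sp := strings.IndexByte(line, ' ')
	if sp < len("2006-01-02T15:04:05Z") || sp > 64 {
		return nil
	}
	token := line[:sp]
	for _, layout := range []string{time.RFC3339Nano, time.RFC3339} {
		if ts, err := time.Parse(layout, token); err == nil {
			return &ts
		}
	}
	return nil
}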

Signed-off-by: Mangaal <angommeeteimangaal@gmail.com>
@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

♻️ Duplicate comments (1)
agent/log.go (1)

99-146: Remove redundant defer rc.Close() at line 123.

The rc.Close() is deferred at both line 123 and inside streamLogsToCompletion at line 213, resulting in a double-close. While most io.ReadCloser implementations handle this gracefully, calling Close() twice is a code smell and not guaranteed safe across all implementations.

Apply this diff to remove the redundant defer:

 	// Create Kubernetes log stream
 	rc, err := a.createKubernetesLogStream(ctx, logReq)
 	if err != nil {
 		_ = stream.Send(&logstreamapi.LogStreamData{RequestUuid: logReq.UUID, Eof: true, Error: err.Error()})
 		_, _ = stream.CloseAndRecv()
 		return err
 	}
-	defer rc.Close()
 
 	err = a.streamLogsToCompletion(ctx, stream, rc, logReq, logCtx)
🧹 Nitpick comments (2)
agent/log.go (2)

280-286: Clarify the purpose of the initial empty message.

The comment "Used for health checks" may be misleading. This initial message appears to establish the stream connection rather than perform a health check. Consider clarifying the comment to accurately reflect its purpose (e.g., "Send initial message to establish stream connection").


383-392: Consider adding a comment to explain the timestamp extraction logic.

The logic for extracting the timestamp from the last complete line in the buffer is correct but somewhat dense. A brief comment explaining why we search backwards for the last complete line (to enable resume from the most recent known timestamp) would improve maintainability.

📜 Review details


📥 Commits

Reviewing files that changed from the base of the PR and between 0bcdf95 and 17637b3.

⛔ Files ignored due to path filters (2)
  • pkg/api/grpc/logstreamapi/logstream.pb.go is excluded by !**/*.pb.go
  • pkg/api/grpc/logstreamapi/logstream_grpc.pb.go is excluded by !**/*.pb.go
📒 Files selected for processing (4)
  • agent/log.go (1 hunks)
  • hack/generate-proto.sh (1 hunks)
  • principal/apis/logstreamapi/logstream.proto (1 hunks)
  • principal/server.go (3 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • principal/server.go
🧰 Additional context used
🧬 Code graph analysis (1)
agent/log.go (4)
agent/agent.go (1)
  • Agent (62-116)
internal/event/event.go (1)
  • ContainerLogRequest (959-973)
pkg/api/grpc/logstreamapi/logstream.pb.go (3)
  • LogStreamData (38-51)
  • LogStreamData (66-66)
  • LogStreamData (81-83)
pkg/api/grpc/logstreamapi/logstream_grpc.pb.go (1)
  • LogStreamService_StreamLogsClient (46-50)
🔇 Additional comments (7)
hack/generate-proto.sh (1)

26-26: Proto code generation target addition looks good.

The new logstreamapi entry follows the established pattern and will be processed identically to existing API targets by the generation loop.

principal/apis/logstreamapi/logstream.proto (2)

17-19: Proto package-to-directory alignment appears to resolve prior Buf lint issue.

The file location principal/apis/logstreamapi/logstream.proto now correctly matches the declared package principal.apis.logstreamapi and go_package option, satisfying Buf's PACKAGE_DIRECTORY_MATCH rule that was flagged in the previous review.


22-44: Proto schema structure aligns with unidirectional client-streaming design.

LogStreamData messages (with request_uuid correlation, data payload, eof marker, and error field) enable the agent to push log chunks to the principal, which the principal writes directly to HTTP response streams. The single LogStreamResponse returned after stream closure provides final status and statistics. The StreamLogs RPC's client-streaming semantics correctly implement the agent→principal push model described in the PR objectives.
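As a usage illustration, a sketch of how an agent-side caller would drive this client-streaming RPC through the generated API (NewLogStreamServiceClient, StreamLogs, Send, CloseAndRecv); the helper name pushChunks and the pre-sliced chunks are invented for the example:

package logsketch

import (
	"context"

	"google.golang.org/grpc"

	"github.com/argoproj-labs/argocd-agent/pkg/api/grpc/logstreamapi"
)

// pushChunks opens the client-streaming RPC, sends data chunks tagged with
// the request UUID, marks EOF, and reads the single final response.
func pushChunks(ctx context.Context, conn *grpc.ClientConn, uuid string, chunks [][]byte) error {
	client := logstreamapi.NewLogStreamServiceClient(conn)
	stream, err := client.StreamLogs(ctx)
	if err != nil {
		return err
	}
	for _, c := range chunks {
		if err := stream.Send(&logstreamapi.LogStreamData{RequestUuid: uuid, Data: c}); err != nil {
			break // the definitive error surfaces from CloseAndRecv below
		}
	}
	_ = stream.Send(&logstreamapi.LogStreamData{RequestUuid: uuid, Eof: true})
	_, err = stream.CloseAndRecv() // returns the final LogStreamResponse
	return err
}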

agent/log.go (4)

60-97: LGTM: Cleanup and duplicate detection logic is correct.

The in-flight tracking with cleanup function properly calls cancel() before deleting the map entry, and the separation between static (with deferred cleanup) and live streaming (with cleanup passed to goroutine) correctly handles resource lifecycle for both paths.


148-165: LGTM: Live streaming goroutine properly defers cleanup.

The goroutine correctly defers cleanup() at the top, ensuring the in-flight entry and context cancellation are released when streaming ends, regardless of how it terminates (success, error, or panic).


250-360: LGTM: Resume logic with exponential backoff is well-designed.

The error handling properly distinguishes between terminal errors (Canceled/NotFound), authentication errors requiring reconnection wait, and transient errors handled with exponential backoff. The 100ms timestamp overlap prevents log loss during resume.
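A condensed sketch of that retry shape, using the backoff/v4 dependency this PR adds; the names (streamWithResume, streamOnce, isTerminal) are illustrative, and the real control flow in agent/log.go differs in detail:

package logsketch

import (
	"context"
	"time"

	"github.com/cenkalti/backoff/v4"
)

// streamOnce is a stand-in for one pass over the Kubernetes log stream; it
// returns the timestamp of the last line it forwarded.
type streamOnce func(since *time.Time) (*time.Time, error)

// streamWithResume retries transient failures with exponential backoff,
// resuming from just before the last seen timestamp so no lines are lost.
func streamWithResume(ctx context.Context, run streamOnce, isTerminal func(error) bool) error {
	var last *time.Time
	op := func() error {
		var since *time.Time
		if last != nil {
			// Re-read with a small overlap across the resume boundary.
			t := last.Add(-100 * time.Millisecond)
			since = &t
		}
		ts, err := run(since)
		if ts != nil {
			last = ts
		}
		if err == nil {
			return nil
		}
		if isTerminal(err) {
			return backoff.Permanent(err) // e.g. Canceled/NotFound: stop retrying
		}
		return err // transient: retry with exponential backoff
	}
	return backoff.Retry(op, backoff.WithContext(backoff.NewExponentialBackOff(), ctx))
}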


411-437: LGTM: Timestamp extraction is robust and efficient.

The function properly guards against edge cases (short lines, missing spaces, long tokens) and handles both RFC3339 and RFC3339Nano formats. The fast-path checks prevent unnecessary parsing attempts.

Signed-off-by: Mangaal <angommeeteimangaal@gmail.com>
@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

📜 Review details


📥 Commits

Reviewing files that changed from the base of the PR and between 17637b3 and 5e00613.

📒 Files selected for processing (1)
  • agent/log.go (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
agent/log.go (4)
agent/agent.go (1)
  • Agent (62-116)
internal/event/event.go (1)
  • ContainerLogRequest (959-973)
pkg/api/grpc/logstreamapi/logstream.pb.go (3)
  • LogStreamData (38-51)
  • LogStreamData (66-66)
  • LogStreamData (81-83)
pkg/api/grpc/logstreamapi/logstream_grpc.pb.go (1)
  • LogStreamService_StreamLogsClient (46-50)

Signed-off-by: Mangaal <angommeeteimangaal@gmail.com>
@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

📜 Review details


📥 Commits

Reviewing files that changed from the base of the PR and between 5e00613 and 152e239.

📒 Files selected for processing (2)
  • agent/log.go (1 hunks)
  • agent/log_test.go (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
agent/log_test.go (5)
principal/apis/logstreamapi/mock/mock.go (2)
  • MockLogStreamServer (30-39)
  • NewMockLogStreamServer (41-46)
pkg/api/grpc/logstreamapi/logstream.pb.go (6)
  • LogStreamData (38-51)
  • LogStreamData (66-66)
  • LogStreamData (81-83)
  • LogStreamResponse (114-123)
  • LogStreamResponse (138-138)
  • LogStreamResponse (153-155)
agent/agent.go (1)
  • Agent (62-116)
test/fake/kube/kubernetes.go (1)
  • NewKubernetesFakeClientWithResources (69-78)
internal/event/event.go (1)
  • ContainerLogRequest (959-973)
agent/log.go (4)
agent/agent.go (1)
  • Agent (62-116)
internal/event/event.go (1)
  • ContainerLogRequest (959-973)
pkg/api/grpc/logstreamapi/logstream.pb.go (3)
  • LogStreamData (38-51)
  • LogStreamData (66-66)
  • LogStreamData (81-83)
pkg/api/grpc/logstreamapi/logstream_grpc.pb.go (2)
  • LogStreamService_StreamLogsClient (46-50)
  • NewLogStreamServiceClient (33-35)

Comment on lines +348 to +395
	const chunkMax = 64 * 1024 // 64KB chunks
	var lastTimestamp *time.Time
	readBuf := make([]byte, chunkMax)

	for {
		select {
		case <-ctx.Done():
			return lastTimestamp, ctx.Err()
		case <-stream.Context().Done():
			return lastTimestamp, stream.Context().Err()
		default:
		}
		n, err := rc.Read(readBuf)
		if n > 0 {
			b := readBuf[:n]
			// Extract timestamp from the last complete line in the buffer to enable resume capability.
			if end := bytes.LastIndexByte(b, '\n'); end >= 0 {
				start := bytes.LastIndexByte(b[:end], '\n') + 1
				line := b[start:end]
				if len(line) > 0 && line[len(line)-1] == '\r' {
					line = line[:len(line)-1]
				}
				if ts := extractTimestamp(string(line)); ts != nil {
					lastTimestamp = ts
				}
			}
			if sendErr := stream.Send(&logstreamapi.LogStreamData{
				RequestUuid: logReq.UUID,
				Data:        readBuf[:n],
			}); sendErr != nil {
				// For client side streaming, the actual gRPC error may only surface
				// after stream closure. Attempt to close and return the final error.
				if _, closedErr := stream.CloseAndRecv(); closedErr != nil {
					return lastTimestamp, closedErr
				}
				return lastTimestamp, sendErr
			}
		}
		if err != nil {
			if errors.Is(err, io.EOF) {
				_ = stream.Send(&logstreamapi.LogStreamData{RequestUuid: logReq.UUID, Eof: true})
				return lastTimestamp, nil
			}
			_ = stream.Send(&logstreamapi.LogStreamData{RequestUuid: logReq.UUID, Error: err.Error()})
			return lastTimestamp, err
		}
	}
}

⚠️ Potential issue | 🟠 Major

Ensure the Kubernetes log ReadCloser is closed

streamLogs never closes the rc it reads from, so every successful or failed pass leaks the Kubernetes HTTP response. Over time the agent will exhaust the underlying sockets/descriptors and degrade streaming/retry behaviour. Please close the reader as soon as the function starts so all exit paths release the resource.

 func (a *Agent) streamLogs(ctx context.Context, stream logstreamapi.LogStreamService_StreamLogsClient, rc io.ReadCloser, logReq *event.ContainerLogRequest, logCtx *logrus.Entry) (*time.Time, error) {
 	const chunkMax = 64 * 1024 // 64KB chunks
+	defer rc.Close()
 	var lastTimestamp *time.Time
 	readBuf := make([]byte, chunkMax)
