
Conversation

@adityachoudhari26
Contributor

@adityachoudhari26 adityachoudhari26 commented Oct 24, 2025

Summary by CodeRabbit

  • New Features

    • Event-driven job dispatching via messaging for more reliable, distributed job processing.
    • Improved tracing and observability for job creation and dispatch events.
  • Refactor

    • Switched from in-process dispatch to an event-based workflow.
    • Producer configuration moved to environment-driven settings for easier deployment.

@coderabbitai
Contributor

coderabbitai bot commented Oct 24, 2025

Walkthrough

Refactors job dispatch to publish "job.created" events through a Kafka producer package moved to pkg/kafka/producer, adds a traced job-created event handler with asynchronous dispatch and notification, updates the ticker's types and imports accordingly, and extends the manager/executor APIs to expose a JobDispatcher.
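
For orientation, a minimal sketch of what such a traced handler could look like, pieced together from the walkthrough above and the snippets quoted later in this review. This is not the PR's actual code: the tracer setup, the instrumentation name, and the surrounding imports are assumptions, and the real handler may differ.

```go
// Sketch only: reconstructed from snippets quoted in this review; imports and
// package declaration omitted. The tracer name below is assumed.
var tracer = otel.Tracer("workspace-engine/pkg/events/handler/jobs")

func HandleJobCreated(ctx context.Context, ws *workspace.Workspace, event handler.RawEvent) error {
	ctx, span := tracer.Start(ctx, "HandleJobCreated")
	defer span.End()

	job := &oapi.Job{}
	if err := json.Unmarshal(event.Data, job); err != nil {
		span.RecordError(err)
		return err
	}

	span.SetAttributes(attribute.String("workspace.id", ws.ID))

	ws.Jobs().Upsert(ctx, job)

	// Dispatch asynchronously so the consumer loop is not blocked by the
	// integration call; the review comments below discuss detaching this
	// context before handing it to the goroutine.
	if job.Status != oapi.InvalidJobAgent {
		go dispatchAndNotifyJob(ctx, ws, job)
	}
	return nil
}
```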

Changes

  • Main / Producer init
    Files: apps/workspace-engine/main.go, apps/workspace-engine/pkg/kafka/producer/producer.go
    Introduces the new producer package path: moves the Kafka producer code into pkg/kafka/producer, adds env-driven Topic, GroupID, and Brokers variables plus a getEnv helper, and updates main to use producer.NewProducer(). A hedged sketch of the env-driven configuration follows this list.
  • Job event handling & dispatch flow
    Files: apps/workspace-engine/pkg/events/handler/jobs/jobs.go
    Adds HandleJobCreated(ctx, ws, event) with tracing, unmarshalling, and upsert, plus an async dispatchAndNotifyJob that dispatches via the release manager and emits job.updated events on errors (marking the job InvalidIntegration).
  • Executor: emit job.created via Kafka
    Files: apps/workspace-engine/pkg/workspace/releasemanager/deployment/executor.go
    Replaces the in-process dispatch path with creation of a Kafka producer and emission of a job.created event (using a helper to resolve the workspace ID); removes the direct goroutine dispatch/upsert of dispatch results; adds a JobDispatcher() accessor.
  • Manager: expose dispatcher
    Files: apps/workspace-engine/pkg/workspace/releasemanager/manager.go
    Adds a JobDispatcher() *jobs.Dispatcher method returning the executor's dispatcher.
  • Ticker type updates
    Files: apps/workspace-engine/pkg/ticker/ticker.go
    Changes imports and types to reference producer.EventProducer (instead of kafka.EventProducer) and updates the Ticker constructors (NewDefault, New) signatures accordingly.
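
The producer row above mentions env-driven Topic, GroupID, and Brokers values plus a getEnv helper; here is a minimal sketch of that configuration pattern. The environment variable names and default values are assumptions for illustration, not taken from the PR.

```go
package producer

import "os"

// Topic, GroupID, and Brokers are resolved from the environment, as the
// change summary describes. The variable names KAFKA_TOPIC, KAFKA_GROUP_ID,
// and KAFKA_BROKERS and the fallback values here are assumed.
var (
	Topic   = getEnv("KAFKA_TOPIC", "workspace-events")
	GroupID = getEnv("KAFKA_GROUP_ID", "workspace-engine")
	Brokers = getEnv("KAFKA_BROKERS", "localhost:9092")
)

// getEnv returns the value of key, or fallback when the variable is unset or empty.
func getEnv(key, fallback string) string {
	if v := os.Getenv(key); v != "" {
		return v
	}
	return fallback
}
```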

Sequence Diagram(s)

sequenceDiagram
    participant Client as Event Source
    participant Handler as Job Event Handler
    participant Store as Workspace Store
    participant Manager as Release Manager
    participant Dispatcher as jobs.Dispatcher
    participant Producer as Kafka Producer

    Client->>Handler: job.created event
    Handler->>Handler: start span, unmarshal job
    Handler->>Store: upsert job record
    alt job status != InvalidJobAgent
        Handler->>Manager: async dispatch request
        Manager->>Dispatcher: dispatch job
        alt dispatch error (not ErrUnsupportedJobAgent)
            Dispatcher-->>Handler: dispatch failure
            Handler->>Store: mark job InvalidIntegration, set UpdatedAt
        else dispatch success / unsupported
            Dispatcher-->>Handler: returns (success or unsupported)
        end
        Handler->>Producer: Create producer
        Handler->>Producer: Produce job.updated event
        Producer-->>Handler: ack
    end
    Handler-->>Client: return

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

  • Review areas needing extra attention:
    • jobs.HandleJobCreated: replay-mode behavior, span attributes, and async error handling.
    • executor: workspace ID resolution, producer creation/closing, and event payload correctness.
    • producer package: environment variable defaults and exported vars (Topic/GroupID/Brokers).
    • API surface: JobDispatcher exposure on Executor/Manager and ticker signature changes.

Suggested reviewers

  • j7m4
  • jsbroks

Poem

🐰 I hopped from code to eventful stream,
Jobs now hum along Kafka's beam.
Traced and tidied, dispatched with cheer,
A rabbit claps — new events appear! 🥕✨

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)
  • Docstring Coverage (⚠️ Warning): Docstring coverage is 45.45%, below the required threshold of 80.00%. Run @coderabbitai generate docstrings to improve coverage.
✅ Passed checks (2 passed)
  • Description Check (✅ Passed): Check skipped because CodeRabbit's high-level summary is enabled.
  • Title Check (✅ Passed): The pull request title "chore: job created event" directly corresponds to the primary changes: adding HandleJobCreated to process job-creation events, moving from in-process dispatch to Kafka-based emission of the "job.created" event, restructuring the producer package, and updating related type signatures. The title is concise, specific, and accurately captures the main objective without vague terminology or unnecessary detail.

@github-actions

📊 DB Package Test Coverage

pkg/db coverage: 55.6%

View detailed coverage report in artifacts


@coderabbitai coderabbitai bot left a comment (Contributor)

Actionable comments posted: 2

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
apps/workspace-engine/main.go (1)

142-148: Fix double Close and avoid shadowing the imported package name

  • You defer producer.Close() and also call producer.Close() again on Line 187. Closing twice can cause undefined behavior with the underlying client. Remove the explicit Close and keep the defer.
  • The local variable named producer shadows the imported package producer, reducing readability.

Apply this minimal diff:

-	producer, err := producer.NewProducer()
+	kafkaProducer, err := producer.NewProducer()
 	if err != nil {
 		log.Fatal("Failed to create Kafka producer", "error", err)
 	}
-	defer producer.Close()
+	defer kafkaProducer.Close()

-	workspaceTicker := ticker.NewDefault(producer)
+	workspaceTicker := ticker.NewDefault(kafkaProducer)
@@
-	workspaceSaveTicker := ticker.New(producer, 1*time.Hour, string(handler.WorkspaceSave))
+	workspaceSaveTicker := ticker.New(kafkaProducer, 1*time.Hour, string(handler.WorkspaceSave))
@@
-	producer.Close()
+	// explicit close not needed; handled by defer

Also applies to: 149-159, 187-187

🧹 Nitpick comments (8)
apps/workspace-engine/pkg/workspace/releasemanager/manager.go (1)

274-276: Add doc comment and consider returning an interface

  • This is a new exported method but lacks a doc comment starting with “JobDispatcher …”. Please add one to meet Go doc/lint expectations and to explain why this is exposed.
  • To reduce coupling, consider returning an interface (e.g., jobs.Dispatcher interface) instead of the concrete *jobs.Dispatcher. This will make testing and future refactors easier without leaking internals.
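
A hedged sketch of that suggestion; the single-method interface shape is inferred from the DispatchJob call quoted elsewhere in this review, the Manager's internal executor field name is assumed, and imports and surrounding code are omitted.

```go
// JobDispatcher abstracts the concrete *jobs.Dispatcher so callers and tests
// depend only on the behaviour they use. The one-method shape is inferred
// from the DispatchJob usage seen in this review.
type JobDispatcher interface {
	DispatchJob(ctx context.Context, job *oapi.Job) error
}

// JobDispatcher returns the dispatcher used to hand jobs to external
// integrations; it is exposed so event handlers can dispatch outside the
// release flow. The m.executor field name is an assumption for this sketch.
func (m *Manager) JobDispatcher() JobDispatcher {
	return m.executor.JobDispatcher()
}
```
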
apps/workspace-engine/pkg/kafka/producer/producer.go (2)

44-58: Set client.id for observability and broker-side tracing

Adding client.id helps trace producers at the broker and in metrics/dashboards.

 	p, err := kafka.NewProducer(&kafka.ConfigMap{
 		"bootstrap.servers": Brokers,
+		"client.id":         getEnv("KAFKA_CLIENT_ID", "workspace-engine"),
 		// Enable idempotence to prevent duplicate messages
 		"enable.idempotence": true,

90-133: Centralize event type strings

"job.created", "job.updated", "workspace.tick" appear in multiple places. Define shared consts (e.g., in a small events package) to avoid typos and ease changes.

apps/workspace-engine/pkg/workspace/releasemanager/deployment/executor.go (3)

32-39: Validate non-empty workspace ID

If resource exists but WorkspaceId is empty, return a clear error to avoid producing an event with an empty key.

  if !ok {
    return "", fmt.Errorf("resource not found: %s", resourceID)
  }
- return resource.WorkspaceId, nil
+ if resource.WorkspaceId == "" {
+   return "", fmt.Errorf("workspace id missing for resource: %s", resourceID)
+ }
+ return resource.WorkspaceId, nil

78-82: Use shared event type constants

Replace the string literal "job.created" with a shared constant to prevent drift across publishers/consumers.


138-140: Add doc comment for exported accessor

Add a brief comment explaining why this accessor is exposed (e.g., consumed by event handlers to dispatch to integrations).

apps/workspace-engine/pkg/events/handler/jobs/jobs.go (2)

80-92: Record unmarshal error on span

You already have a span; record the error and set status before returning to aid troubleshooting.

 if err := json.Unmarshal(event.Data, &job); err != nil {
-  return err
+  span.RecordError(err)
+  span.SetAttributes(attribute.Bool("job.created", false))
+  return err
 }

112-114: Consider passing an injected producer into dispatch goroutine

If you adopt DI for the producer, thread it through here to avoid a per-call NewProducer inside dispatchAndNotifyJob.

📜 Review details

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Disabled knowledge base sources:

  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between 60df126 and a24e377.

📒 Files selected for processing (6)
  • apps/workspace-engine/main.go (2 hunks)
  • apps/workspace-engine/pkg/events/handler/jobs/jobs.go (2 hunks)
  • apps/workspace-engine/pkg/kafka/producer/producer.go (1 hunks)
  • apps/workspace-engine/pkg/ticker/ticker.go (3 hunks)
  • apps/workspace-engine/pkg/workspace/releasemanager/deployment/executor.go (4 hunks)
  • apps/workspace-engine/pkg/workspace/releasemanager/manager.go (2 hunks)
🧰 Additional context used
📓 Path-based instructions (1)
apps/workspace-engine/**/*.go

📄 CodeRabbit inference engine (apps/workspace-engine/CLAUDE.md)

apps/workspace-engine/**/*.go: Do not add extraneous inline comments that state the obvious
Do not add comments that simply restate what the code does
Do not add comments for standard Go patterns (e.g., noting WaitGroup or semaphore usage)
Write comments that explain why, document complex logic/algorithms, provide non-obvious context, include TODO/FIXME, and document exported functions/types/methods

Files:

  • apps/workspace-engine/pkg/workspace/releasemanager/manager.go
  • apps/workspace-engine/pkg/events/handler/jobs/jobs.go
  • apps/workspace-engine/pkg/workspace/releasemanager/deployment/executor.go
  • apps/workspace-engine/main.go
  • apps/workspace-engine/pkg/ticker/ticker.go
  • apps/workspace-engine/pkg/kafka/producer/producer.go
🧬 Code graph analysis (4)
apps/workspace-engine/pkg/events/handler/jobs/jobs.go (6)
apps/workspace-engine/pkg/workspace/workspace.go (1)
  • Workspace (62-68)
apps/workspace-engine/pkg/oapi/oapi.gen.go (8)
  • Job (219-231)
  • Status (57-57)
  • UpdatedAt (58-58)
  • JobUpdateEvent (246-253)
  • JobAgentId (53-53)
  • ExternalId (50-50)
  • JobUpdateEventFieldsToUpdate (256-256)
  • Id (51-51)
apps/workspace-engine/pkg/workspace/releasemanager/deployment/jobs/dispatcher.go (1)
  • ErrUnsupportedJobAgent (25-25)
apps/workspace-engine/pkg/kafka/producer/producer.go (1)
  • NewProducer (45-88)
apps/workspace-engine/pkg/events/handler/handler.go (1)
  • RawEvent (85-90)
apps/workspace-engine/pkg/workspace/store/store.go (1)
  • Store (39-61)
apps/workspace-engine/pkg/workspace/releasemanager/deployment/executor.go (3)
apps/workspace-engine/pkg/oapi/oapi.gen.go (1)
  • Release (397-403)
apps/workspace-engine/pkg/kafka/producer/producer.go (1)
  • NewProducer (45-88)
apps/workspace-engine/pkg/workspace/releasemanager/deployment/jobs/dispatcher.go (1)
  • Dispatcher (13-15)
apps/workspace-engine/main.go (1)
apps/workspace-engine/pkg/kafka/producer/producer.go (1)
  • NewProducer (45-88)
apps/workspace-engine/pkg/ticker/ticker.go (1)
apps/workspace-engine/pkg/kafka/producer/producer.go (1)
  • EventProducer (29-33)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (5)
  • GitHub Check: workspace-engine-tests
  • GitHub Check: workspace-engine-tests
  • GitHub Check: Typecheck
  • GitHub Check: Lint
  • GitHub Check: build (linux/amd64)
🔇 Additional comments (1)
apps/workspace-engine/pkg/ticker/ticker.go (1)

30-54: LGTM on interface/type switch

Clean swap to producer.EventProducer; no behavior change.

Comment on lines 30 to 59
func dispatchAndNotifyJob(ctx context.Context, ws *workspace.Workspace, job *oapi.Job) {
	if err := ws.ReleaseManager().JobDispatcher().DispatchJob(ctx, job); err != nil && !errors.Is(err, jobs.ErrUnsupportedJobAgent) {
		log.Error("error dispatching job to integration", "error", err.Error())
		job.Status = oapi.InvalidIntegration
		job.UpdatedAt = time.Now()
	}

	kafkaProducer, err := producer.NewProducer()
	if err != nil {
		log.Error("error creating kafka producer", "error", err.Error())
		return
	}
	defer kafkaProducer.Close()

	jobUpdateEvent := &oapi.JobUpdateEvent{
		AgentId:    &job.JobAgentId,
		ExternalId: job.ExternalId,
		FieldsToUpdate: &[]oapi.JobUpdateEventFieldsToUpdate{
			oapi.JobUpdateEventFieldsToUpdate("status"),
		},
		Id:  &job.Id,
		Job: *job,
	}

	err = kafkaProducer.ProduceEvent("job.updated", ws.ID, jobUpdateEvent)
	if err != nil {
		log.Error("error producing job updated event", "error", err.Error())
		return
	}
}

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

Reuse a producer; and ensure UpdatedAt is reflected in the store

  • Creating a Kafka producer per dispatch is expensive. Pass a long‑lived producer into this handler (DI) or obtain it from a shared component to reuse across events.
  • You set job.UpdatedAt here but only include "status" in FieldsToUpdate. If the store doesn’t set UpdatedAt automatically on write, this timestamp won’t persist. Either include "updatedAt" in FieldsToUpdate or rely on the store to set it server‑side.

Possible direction:

-func dispatchAndNotifyJob(ctx context.Context, ws *workspace.Workspace, job *oapi.Job) {
+func dispatchAndNotifyJob(ctx context.Context, ws *workspace.Workspace, ep producer.EventProducer, job *oapi.Job) {
@@
-  kafkaProducer, err := producer.NewProducer()
-  if err != nil { ...; return }
-  defer kafkaProducer.Close()
+  // use injected ep
@@
-  err = kafkaProducer.ProduceEvent("job.updated", ws.ID, jobUpdateEvent)
+  err := ep.ProduceEvent("job.updated", ws.ID, jobUpdateEvent)

And pass the shared producer from the caller.

Can you confirm whether UpdatedAt is auto-managed on write in ws.Jobs().Upsert? If not, include it in FieldsToUpdate.



---

🏁 Script executed:

```shell
#!/bin/bash
# Find the dispatchAndNotifyJob function and its caller context
rg -n "dispatchAndNotifyJob" apps/workspace-engine/pkg/events/handler/jobs/ -A 5 -B 5
```

Length of output: 1836


🏁 Script executed:

#!/bin/bash
# Look for producer.NewProducer usage patterns across the codebase
rg -n "NewProducer\(\)" apps/workspace-engine/ -B 3 -A 3 --type go

Length of output: 2290


🏁 Script executed:

#!/bin/bash
# Find Jobs().Upsert implementation
rg -n "func.*Upsert" apps/workspace-engine/ --type go -A 10

Length of output: 25383


🏁 Script executed:

#!/bin/bash
# Find JobUpdateEventFieldsToUpdate definition
rg -n "JobUpdateEventFieldsToUpdate" apps/workspace-engine/ --type go -B 2 -A 5

Length of output: 3761


🏁 Script executed:

#!/bin/bash
# Look for how producer is passed or managed in similar handlers
rg -n "EventProducer\|ProduceEvent" apps/workspace-engine/ --type go -B 2 -A 2 | head -50

Length of output: 0



---

🏁 Script executed:

```shell
#!/bin/bash
# Check main.go to see how producer is managed and if it could be injected
rg -n "producer\|Producer" apps/workspace-engine/main.go -B 5 -A 10 | head -80
```

Length of output: 0


🏁 Script executed:

#!/bin/bash
# Look for HandleJobCreated signature and context to see if producer could be passed
rg -n "func.*HandleJobCreated|HandleJobCreated" apps/workspace-engine/pkg/events/handler/jobs/ -B 2 -A 15

Length of output: 1812


🏁 Script executed:

#!/bin/bash
# Check if there are other async event handlers that create producers
rg -n "go [a-zA-Z]+\(.*\)" apps/workspace-engine/pkg/events/handler/ --type go -B 2 -A 2 | head -40

Length of output: 375


Add "updatedAt" to FieldsToUpdate and reconsider producer lifecycle

Two issues need addressing:

  1. UpdatedAt not persisted: You set job.UpdatedAt at line 34 but only include "status" in FieldsToUpdate. The store's Upsert method doesn't auto-set UpdatedAt server-side. Add oapi.JobUpdateEventFieldsToUpdate("updatedAt") to the slice at line 48 to ensure the timestamp is reflected in any downstream consumer of this event.

  2. Per-call producer creation: Creating a Kafka producer on every job dispatch (line 37) is resource-intensive. While the goroutine context makes dependency injection challenging, consider obtaining a shared producer instance or refactoring the handler signature to accept one. This pattern appears elsewhere in the codebase (e.g., executor.go line 71), suggesting a systematic opportunity for optimization.

🤖 Prompt for AI Agents
In apps/workspace-engine/pkg/events/handler/jobs/jobs.go around lines 30 to 59,
the job UpdateEvent only lists "status" in FieldsToUpdate so the manually-set
job.UpdatedAt is not persisted—add
oapi.JobUpdateEventFieldsToUpdate("updatedAt") to the FieldsToUpdate slice so
downstream consumers/store upsert will receive the timestamp; additionally,
avoid creating a new Kafka producer per call (around line 37) by refactoring the
handler to accept a shared producer (or use a singleton/shared instance) or pass
the producer in from the caller (see executor.go line 71 for an existing
pattern), updating signatures and call sites accordingly to reuse a single
producer instead of NewProducer on every dispatch.

Comment on lines +65 to +76
+	workspaceID, err := e.getWorkspaceID(releaseToDeploy)
+	if err != nil {
+		span.RecordError(err)
+		return err
+	}
+
-	if e.store.IsReplay() {
-		log.Info("Skipping job dispatch in replay mode", "job.id", newJob.Id)
-		return nil
+	kafkaProducer, err := producer.NewProducer()
+	if err != nil {
+		span.RecordError(err)
+		return err
+	}
+	defer kafkaProducer.Close()

⚠️ Potential issue | 🟠 Major

Avoid per-call Kafka producer; inject a long‑lived EventProducer

Creating a producer for every job is costly (socket setup, goroutine, buffers) and can thrash under load. Inject a long‑lived producer into Executor (or Manager) and reuse it.

Additionally, you cancel outdated jobs before publishing externally. If publishing fails, you may end up with no active job. Consider moving cancellation to the consumer side or adopting an outbox pattern to make writes and publish atomic(ish).

Suggested shape:

 type Executor struct {
   store         *store.Store
   jobFactory    *jobs.Factory
   jobDispatcher *jobs.Dispatcher
+  events        producer.EventProducer
 }
 
-func NewExecutor(store *store.Store) *Executor {
+func NewExecutor(store *store.Store, events producer.EventProducer) *Executor {
   return &Executor{
     store:         store,
     jobFactory:    jobs.NewFactory(store),
     jobDispatcher: jobs.NewDispatcher(store),
+    events:        events,
   }
 }
@@
- kafkaProducer, err := producer.NewProducer()
- if err != nil { ... }
- defer kafkaProducer.Close()
- err = kafkaProducer.ProduceEvent("job.created", workspaceID, newJob)
+ err := e.events.ProduceEvent("job.created", workspaceID, newJob)
 if err != nil { ... }

Committable suggestion skipped: line range outside the PR's diff.
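
On the outbox idea mentioned above, a heavily simplified sketch of the shape such a pattern could take; every name here (PendingEvent, OutboxStore, Relay) is invented for illustration and nothing below exists in this codebase.

```go
package outbox

import (
	"context"
	"time"
)

// PendingEvent is an event recorded alongside the state change that caused it.
type PendingEvent struct {
	EventType   string
	WorkspaceID string
	Payload     []byte
	CreatedAt   time.Time
}

// OutboxStore is assumed to persist the job update and the pending event
// atomically (for example in one transaction), so a later publish failure
// never leaves the store and the event stream disagreeing.
type OutboxStore interface {
	SaveJobAndEvent(ctx context.Context, jobPayload []byte, ev PendingEvent) error
	UnpublishedEvents(ctx context.Context, limit int) ([]PendingEvent, error)
	MarkPublished(ctx context.Context, ev PendingEvent) error
}

// EventProducer mirrors the ProduceEvent(eventType, workspaceID, payload)
// usage seen in this PR; the interface itself is part of the sketch.
type EventProducer interface {
	ProduceEvent(eventType string, workspaceID string, payload any) error
}

// Relay drains the outbox and publishes events. It is safe to rerun because
// MarkPublished only happens after a successful produce; failed rows are
// retried on the next pass.
func Relay(ctx context.Context, store OutboxStore, producer EventProducer) error {
	events, err := store.UnpublishedEvents(ctx, 100)
	if err != nil {
		return err
	}
	for _, ev := range events {
		if err := producer.ProduceEvent(ev.EventType, ev.WorkspaceID, ev.Payload); err != nil {
			return err
		}
		if err := store.MarkPublished(ctx, ev); err != nil {
			return err
		}
	}
	return nil
}
```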

@coderabbitai coderabbitai bot left a comment (Contributor)

Actionable comments posted: 2

♻️ Duplicate comments (1)
apps/workspace-engine/pkg/events/handler/jobs/jobs.go (1)

30-62: Address the two issues flagged in the previous review

Two unresolved concerns from the previous review:

  1. Line 40: Producer created per dispatch — Creating a Kafka producer on every job dispatch is resource-intensive. Consider refactoring to accept a shared producer.EventProducer instance (either via dependency injection or a singleton pattern).

  2. Line 51: UpdatedAt not persisted — You set job.UpdatedAt at line 38, but FieldsToUpdate only includes "status". Add oapi.JobUpdateEventFieldsToUpdate("updatedAt") to the slice so the timestamp is actually persisted by the store's Upsert method.

Apply this diff to include updatedAt:

 	FieldsToUpdate: &[]oapi.JobUpdateEventFieldsToUpdate{
 		oapi.JobUpdateEventFieldsToUpdate("status"),
+		oapi.JobUpdateEventFieldsToUpdate("updatedAt"),
 	},

For the producer lifecycle, consider refactoring the handler signature to accept a shared producer as shown in the previous review's suggestion.

📜 Review details

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Disabled knowledge base sources:

  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between a24e377 and 1627e72.

📒 Files selected for processing (1)
  • apps/workspace-engine/pkg/events/handler/jobs/jobs.go (2 hunks)
🧰 Additional context used
📓 Path-based instructions (1)
apps/workspace-engine/**/*.go

📄 CodeRabbit inference engine (apps/workspace-engine/CLAUDE.md)

apps/workspace-engine/**/*.go: Do not add extraneous inline comments that state the obvious
Do not add comments that simply restate what the code does
Do not add comments for standard Go patterns (e.g., noting WaitGroup or semaphore usage)
Write comments that explain why, document complex logic/algorithms, provide non-obvious context, include TODO/FIXME, and document exported functions/types/methods

Files:

  • apps/workspace-engine/pkg/events/handler/jobs/jobs.go
🧬 Code graph analysis (1)
apps/workspace-engine/pkg/events/handler/jobs/jobs.go (6)
apps/workspace-engine/pkg/workspace/workspace.go (1)
  • Workspace (62-68)
apps/workspace-engine/pkg/oapi/oapi.gen.go (10)
  • Job (219-231)
  • Status (57-57)
  • InvalidIntegration (39-39)
  • UpdatedAt (58-58)
  • JobUpdateEvent (246-253)
  • JobAgentId (53-53)
  • ExternalId (50-50)
  • JobUpdateEventFieldsToUpdate (256-256)
  • Id (51-51)
  • InvalidJobAgent (40-40)
apps/workspace-engine/pkg/workspace/releasemanager/deployment/jobs/dispatcher.go (1)
  • ErrUnsupportedJobAgent (25-25)
apps/workspace-engine/pkg/kafka/producer/producer.go (1)
  • NewProducer (45-88)
apps/workspace-engine/pkg/events/handler/handler.go (1)
  • RawEvent (85-90)
apps/workspace-engine/pkg/workspace/store/store.go (1)
  • Store (39-61)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
  • GitHub Check: Lint
  • GitHub Check: workspace-engine-tests
🔇 Additional comments (2)
apps/workspace-engine/pkg/events/handler/jobs/jobs.go (2)

3-24: LGTM!

The imports and tracer initialization are appropriate for the job-created event handler with tracing and Kafka integration.


88-101: Good tracing instrumentation

The span attributes and events provide excellent observability for debugging job creation flow, including replay mode tracking.

		attribute.String("workspace.id", ws.ID),
	)

	ws.Jobs().Upsert(ctx, job)

⚠️ Potential issue | 🟠 Major

Check for Upsert errors

The Upsert call at line 103 can fail, but the error is neither checked nor logged. This could result in silent failures when persisting the job to the store.

Apply this diff to handle errors:

-	ws.Jobs().Upsert(ctx, job)
+	if err := ws.Jobs().Upsert(ctx, job); err != nil {
+		span.RecordError(err)
+		return fmt.Errorf("failed to upsert job: %w", err)
+	}
🤖 Prompt for AI Agents
In apps/workspace-engine/pkg/events/handler/jobs/jobs.go around line 103, the
call to ws.Jobs().Upsert(ctx, job) ignores the returned error; capture its error
result, check it, and handle it: assign the call to a variable (e.g., err :=
ws.Jobs().Upsert(ctx, job)), and if err != nil log the error with contextual
info (job ID/type) using the handler's existing logger and then return or
propagate the error to the caller (or convert to the appropriate error response)
so failures to persist jobs are not silent.

	}

	if job.Status != oapi.InvalidJobAgent {
		go dispatchAndNotifyJob(ctx, ws, job)

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

Context passed to goroutine may be cancelled prematurely

The parent context ctx is passed to the async dispatchAndNotifyJob goroutine. If the parent request completes and cancels this context, the dispatch operation may fail unexpectedly. For detached async operations, use context.Background() or (in Go 1.21+) context.WithoutCancel(ctx) to preserve tracing metadata while preventing premature cancellation.

Consider this approach:

 	if job.Status != oapi.InvalidJobAgent {
-		go dispatchAndNotifyJob(ctx, ws, job)
+		go dispatchAndNotifyJob(context.Background(), ws, job)
 	}

Or if you need to preserve tracing context (Go 1.21+):

 	if job.Status != oapi.InvalidJobAgent {
-		go dispatchAndNotifyJob(ctx, ws, job)
+		go dispatchAndNotifyJob(context.WithoutCancel(ctx), ws, job)
 	}


---

🏁 Script executed:

```shell
#!/bin/bash
# Inspect the file around line 116 to understand context
cat apps/workspace-engine/pkg/events/handler/jobs/jobs.go | head -n 150 | tail -n 50
```

Length of output: 1311


🏁 Script executed:

#!/bin/bash
# Find the dispatchAndNotifyJob function definition
rg -n "func.*dispatchAndNotifyJob" apps/workspace-engine/

Length of output: 145


🏁 Script executed:

#!/bin/bash
# Check Go version in the project
if [ -f "go.mod" ]; then
  head -3 go.mod
fi

Length of output: 0


🏁 Script executed:

#!/bin/bash
# Search for other goroutine patterns passing ctx to understand codebase conventions
rg -n "go.*\(ctx" apps/workspace-engine/ -A 2 -B 2 | head -n 100

Length of output: 760



---

🏁 Script executed:

```shell
#!/bin/bash
# Get the dispatchAndNotifyJob function implementation
sed -n '30,80p' apps/workspace-engine/pkg/events/handler/jobs/jobs.go
```

Length of output: 1354


🏁 Script executed:

#!/bin/bash
# Get full go.mod for version info
cat go.mod | head -20

Length of output: 38


🏁 Script executed:

#!/bin/bash
# Check the parent handler function to understand context lifecycle
sed -n '95,130p' apps/workspace-engine/pkg/events/handler/jobs/jobs.go

Length of output: 883



---

🏁 Script executed:

```shell
#!/bin/bash
# Find go.mod file in the project
find . -name "go.mod" -type f 2>/dev/null | head -5
```

Length of output: 30


🏁 Script executed:

#!/bin/bash
# Check if there's a go version file or workspace setting
ls -la apps/workspace-engine/ | grep -E "go\.|versions"

Length of output: 115


🏁 Script executed:

#!/bin/bash
# Look for go directives in the jobs handler file or nearby
rg "go 1\." apps/workspace-engine/ --max-count 3

Length of output: 38


Use context.WithoutCancel(ctx) or context.Background() for detached async task

The dispatchAndNotifyJob goroutine calls ws.ReleaseManager().JobDispatcher().DispatchJob(ctx, job) at line 31, which actively uses the passed context. When the parent event handler returns, its context is cancelled, causing the async dispatch operation to fail prematurely. Since this is a fire-and-forget async operation, pass a detached context using context.WithoutCancel(ctx) (Go 1.21+) to preserve any tracing metadata while preventing premature cancellation, or use context.Background() if tracing isn't needed. Your project uses Go 1.24.0, so both approaches are fully supported.

 	if job.Status != oapi.InvalidJobAgent {
-		go dispatchAndNotifyJob(ctx, ws, job)
+		go dispatchAndNotifyJob(context.WithoutCancel(ctx), ws, job)
 	}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
-		go dispatchAndNotifyJob(ctx, ws, job)
+	if job.Status != oapi.InvalidJobAgent {
+		go dispatchAndNotifyJob(context.WithoutCancel(ctx), ws, job)
+	}
🤖 Prompt for AI Agents
In apps/workspace-engine/pkg/events/handler/jobs/jobs.go around line 116, the
goroutine is started with the parent ctx which will be cancelled when the
handler returns causing the fire-and-forget dispatch to fail; change the
goroutine to use a detached context (preserving trace metadata) by creating ctx
:= context.WithoutCancel(ctx) (Go 1.21+) and passing that into
dispatchAndNotifyJob, or if tracing isn’t required use context.Background();
ensure the new detached ctx is created immediately before launching the
goroutine so the async DispatchJob call won’t be cancelled prematurely.
