From 473db6bbe9fa1790436b1a48c70115cfacce3cf7 Mon Sep 17 00:00:00 2001 From: Justin Brooks Date: Sat, 22 Mar 2025 19:28:54 -0400 Subject: [PATCH] init --- cmd/ctrlc/root/run/exec/exec.go | 84 ++++++++-- cmd/ctrlc/root/run/exec/runner.go | 247 +++++++++++++++++++++++++++--- cmd/ctrlc/root/run/run.go | 3 +- 3 files changed, 303 insertions(+), 31 deletions(-) diff --git a/cmd/ctrlc/root/run/exec/exec.go b/cmd/ctrlc/root/run/exec/exec.go index f0873b0..8e84d4f 100644 --- a/cmd/ctrlc/root/run/exec/exec.go +++ b/cmd/ctrlc/root/run/exec/exec.go @@ -1,7 +1,12 @@ package exec import ( + "context" "fmt" + "os" + "os/signal" + "syscall" + "time" "github.com/charmbracelet/log" "github.com/ctrlplanedev/cli/internal/api" @@ -11,7 +16,13 @@ import ( ) func NewRunExecCmd() *cobra.Command { - return &cobra.Command{ + var ( + name string + workspaceID string + interval time.Duration + ) + + cmd := &cobra.Command{ Use: "exec", Short: "Execute commands directly when a job is received", RunE: func(cmd *cobra.Command, args []string) error { @@ -21,24 +32,79 @@ func NewRunExecCmd() *cobra.Command { if err != nil { return fmt.Errorf("failed to create API client: %w", err) } + + // Create the ExecRunner with API client + runner := NewExecRunner(client) + + // Create job agent config + agentConfig := api.UpsertJobAgentJSONRequestBody{ + Name: name, + Type: "exec", + } + + // Set workspace ID if provided + if workspaceID != "" { + agentConfig.WorkspaceId = workspaceID + } + + // Create job agent ja, err := jobagent.NewJobAgent( client, - api.UpsertJobAgentJSONRequestBody{ - Name: "exec", - Type: "exec", - }, - &ExecRunner{}, + agentConfig, + runner, ) if err != nil { return fmt.Errorf("failed to create job agent: %w", err) } + + log.Info("Exec runner started", "name", name, "workspaceID", workspaceID) + + // Setup signal handling for graceful shutdown + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + sigCh := make(chan os.Signal, 1) + signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM) + + go func() { + <-sigCh + log.Info("Shutting down gracefully...") + runner.ExitAll(true) + cancel() + }() + + // Main polling loop + ticker := time.NewTicker(interval) + defer ticker.Stop() + + // Run initial job check if err := ja.RunQueuedJobs(); err != nil { - log.Error("failed to run queued jobs", "error", err) + log.Error("Failed to run queued jobs", "error", err) } if err := ja.UpdateRunningJobs(); err != nil { - log.Error("failed to check for jobs", "error", err) + log.Error("Failed to check for running jobs", "error", err) + } + + // Polling loop + for { + select { + case <-ctx.Done(): + return nil + case <-ticker.C: + if err := ja.RunQueuedJobs(); err != nil { + log.Error("Failed to run queued jobs", "error", err) + } + if err := ja.UpdateRunningJobs(); err != nil { + log.Error("Failed to check for running jobs", "error", err) + } + } } - return nil }, } + + cmd.Flags().StringVar(&name, "name", "exec-runner", "Name of the job agent") + cmd.Flags().StringVar(&workspaceID, "workspace-id", "", "Workspace ID to pull jobs from") + cmd.Flags().DurationVar(&interval, "interval", 10*time.Second, "Polling interval for checking jobs") + + return cmd } diff --git a/cmd/ctrlc/root/run/exec/runner.go b/cmd/ctrlc/root/run/exec/runner.go index 8717d43..569ada3 100644 --- a/cmd/ctrlc/root/run/exec/runner.go +++ b/cmd/ctrlc/root/run/exec/runner.go @@ -2,50 +2,159 @@ package exec import ( "bytes" + "context" "encoding/json" "fmt" "html/template" "os" "os/exec" + "os/signal" "runtime" "strconv" + "sync" "syscall" + "github.com/charmbracelet/log" "github.com/ctrlplanedev/cli/internal/api" "github.com/ctrlplanedev/cli/pkg/jobagent" ) var _ jobagent.Runner = &ExecRunner{} -type ExecRunner struct{} - type ExecConfig struct { WorkingDir string `json:"workingDir,omitempty"` Script string `json:"script"` } +type RunningJob struct { + cmd *exec.Cmd + jobID string + client *api.ClientWithResponses + exitCode int + cancelled bool +} + +type ExecRunner struct { + runningJobs map[string]*RunningJob + client *api.ClientWithResponses + mu sync.Mutex + wg sync.WaitGroup +} + +// NewExecRunner creates a new ExecRunner +func NewExecRunner(client *api.ClientWithResponses) *ExecRunner { + runner := &ExecRunner{ + runningJobs: make(map[string]*RunningJob), + client: client, + } + + // Set up signal handling for graceful shutdown + c := make(chan os.Signal, 1) + signal.Notify(c, os.Interrupt, syscall.SIGTERM) + go func() { + <-c + log.Info("Received shutdown signal, terminating all jobs") + runner.ExitAll(true) + os.Exit(0) + }() + + return runner +} + func (r *ExecRunner) Status(job api.Job) (api.JobStatus, string) { - externalId, err := strconv.Atoi(*job.ExternalId) - if err != nil { - return api.JobStatusExternalRunNotFound, fmt.Sprintf("invalid process id: %v", err) + r.mu.Lock() + runningJob, exists := r.runningJobs[job.Id.String()] + r.mu.Unlock() + + if !exists || runningJob == nil { + return api.JobStatusSuccessful, "" } - process, err := os.FindProcess(externalId) - if err != nil { - return api.JobStatusExternalRunNotFound, fmt.Sprintf("failed to find process: %v", err) + // Check for completed job with exit code + if runningJob.cmd.ProcessState != nil { + r.mu.Lock() + delete(r.runningJobs, job.Id.String()) + r.mu.Unlock() + + if runningJob.cancelled { + return api.JobStatusCancelled, "Job was cancelled" + } + + if runningJob.exitCode != 0 { + return api.JobStatusFailed, fmt.Sprintf("Job failed with exit code: %d", runningJob.exitCode) + } + + return api.JobStatusSuccessful, "" } - // On Unix systems, FindProcess always succeeds, so we need to send signal 0 - // to check if process exists - err = process.Signal(syscall.Signal(0)) - if err != nil { - return api.JobStatusSuccessful, fmt.Sprintf("process not running: %v", err) + // If process exists but has not completed + if runningJob.cmd.Process != nil { + // Check if process is still running + if err := runningJob.cmd.Process.Signal(syscall.Signal(0)); err != nil { + // Process is no longer running but wasn't captured by ProcessState (unusual case) + r.mu.Lock() + delete(r.runningJobs, job.Id.String()) + r.mu.Unlock() + + return api.JobStatusSuccessful, "" + } + + // Process is still running + return api.JobStatusInProgress, "" } - return api.JobStatusInProgress, fmt.Sprintf("process running with pid %d", externalId) + // Process hasn't started yet + return api.JobStatusInProgress, "" +} + +// ExitAll stops all currently running commands +func (r *ExecRunner) ExitAll(cancelled bool) { + r.mu.Lock() + defer r.mu.Unlock() + + for id, runningJob := range r.runningJobs { + if runningJob != nil && runningJob.cmd != nil && runningJob.cmd.Process != nil { + // Check if process is still running before attempting to kill + if err := runningJob.cmd.Process.Signal(syscall.Signal(0)); err == nil { + log.Info("Killing job", "id", id) + runningJob.cancelled = cancelled + + // Process is running, kill it and its children + if runtime.GOOS == "windows" { + exec.Command("taskkill", "/F", "/T", "/PID", strconv.Itoa(runningJob.cmd.Process.Pid)).Run() + } else { + // Send SIGTERM first for graceful shutdown + runningJob.cmd.Process.Signal(syscall.SIGTERM) + + // Update job status to cancelled + if cancelled { + status := api.JobStatusCancelled + message := "Job was cancelled" + runningJob.client.UpdateJobWithResponse( + context.Background(), + runningJob.jobID, + api.UpdateJobJSONRequestBody{ + Status: &status, + Message: &message, + }, + ) + } + } + } + } + } + + if cancelled { + r.runningJobs = make(map[string]*RunningJob) + } } func (r *ExecRunner) Start(job api.Job) (string, error) { + // Initialize map if nil + if r.runningJobs == nil { + r.runningJobs = make(map[string]*RunningJob) + } + // Create temp script file ext := ".sh" if runtime.GOOS == "windows" { @@ -56,7 +165,17 @@ func (r *ExecRunner) Start(job api.Job) (string, error) { if err != nil { return "", fmt.Errorf("failed to create temp script file: %w", err) } - defer os.Remove(tmpFile.Name()) + + // Delete temp file when command completes + scriptPath := tmpFile.Name() + defer func() { + // Don't remove the file immediately as the command might still be using it + // Schedule it for removal after the command completes + go func() { + r.wg.Wait() + os.Remove(scriptPath) + }() + }() config := ExecConfig{} jsonBytes, err := json.Marshal(job.JobAgentConfig) @@ -88,23 +207,111 @@ func (r *ExecRunner) Start(job api.Job) (string, error) { // Make executable on Unix systems if runtime.GOOS != "windows" { - if err := os.Chmod(tmpFile.Name(), 0700); err != nil { + if err := os.Chmod(scriptPath, 0700); err != nil { return "", fmt.Errorf("failed to make script executable: %w", err) } } - cmd := exec.Command("bash", "-c", tmpFile.Name()) + var cmd *exec.Cmd if runtime.GOOS == "windows" { - cmd = exec.Command("powershell", "-File", tmpFile.Name()) + cmd = exec.Command("powershell", "-File", scriptPath) + } else { + cmd = exec.Command("bash", scriptPath) } cmd.Dir = config.WorkingDir cmd.Stdout = os.Stdout cmd.Stderr = os.Stderr - if err := cmd.Run(); err != nil { - return "", fmt.Errorf("failed to execute script: %w", err) + // Create running job struct + runningJob := &RunningJob{ + cmd: cmd, + jobID: job.Id.String(), + client: r.client, } + r.mu.Lock() + r.runningJobs[job.Id.String()] = runningJob + r.mu.Unlock() + + // Launch command in goroutine + r.wg.Add(1) + go func() { + defer r.wg.Done() + + err := cmd.Start() + if err != nil { + log.Error("Failed to start command", "error", err, "jobID", job.Id.String()) + status := api.JobStatusFailed + message := fmt.Sprintf("Failed to start command: %s", err.Error()) + r.client.UpdateJobWithResponse( + context.Background(), + job.Id.String(), + api.UpdateJobJSONRequestBody{ + Status: &status, + Message: &message, + }, + ) + + r.mu.Lock() + delete(r.runningJobs, job.Id.String()) + r.mu.Unlock() + return + } + + // Update job to InProgress with PID + pid := strconv.Itoa(cmd.Process.Pid) + status := api.JobStatusInProgress + r.client.UpdateJobWithResponse( + context.Background(), + job.Id.String(), + api.UpdateJobJSONRequestBody{ + Status: &status, + ExternalId: &pid, + }, + ) + + // Wait for command to complete + err = cmd.Wait() + exitCode := 0 + if err != nil { + if exitErr, ok := err.(*exec.ExitError); ok { + exitCode = exitErr.ExitCode() + log.Warn("Command failed", "exitCode", exitCode, "jobID", job.Id.String()) + } else { + log.Error("Command error", "error", err, "jobID", job.Id.String()) + exitCode = 1 + } + } + + runningJob.exitCode = exitCode + + // Update job with final status + var finalStatus api.JobStatus + var message string + + r.mu.Lock() + if runningJob.cancelled { + finalStatus = api.JobStatusCancelled + message = "Job was cancelled" + } else if exitCode != 0 { + finalStatus = api.JobStatusFailed + message = fmt.Sprintf("Job failed with exit code: %d", exitCode) + } else { + finalStatus = api.JobStatusSuccessful + message = "Job completed successfully" + } + r.mu.Unlock() + + r.client.UpdateJobWithResponse( + context.Background(), + job.Id.String(), + api.UpdateJobJSONRequestBody{ + Status: &finalStatus, + Message: &message, + }, + ) + }() + return strconv.Itoa(cmd.Process.Pid), nil } diff --git a/cmd/ctrlc/root/run/run.go b/cmd/ctrlc/root/run/run.go index 440f966..d76f633 100644 --- a/cmd/ctrlc/root/run/run.go +++ b/cmd/ctrlc/root/run/run.go @@ -2,7 +2,6 @@ package run import ( "github.com/ctrlplanedev/cli/cmd/ctrlc/root/run/exec" - "github.com/ctrlplanedev/cli/internal/cliutil" "github.com/spf13/cobra" ) @@ -15,7 +14,7 @@ func NewRunCmd() *cobra.Command { }, } - cmd.AddCommand(cliutil.AddIntervalSupport(exec.NewRunExecCmd(), "")) + cmd.AddCommand(exec.NewRunExecCmd()) return cmd }