Skip to content

Commit d563be5

Browse files
authored
agent: implements logic to register agents and scan images and upload reports (#90)
* adds logic to register the agent and poll * adds field scanner in policies * adds field scanner in scans tables and updates test cases across * adds foreign key integrationID to table scans and updates test cases * adds field scanner in config yaml and supporting logic, adds enum for status fields * adds field scanner in config yaml and supporting logic, adds enum for status fields * updates logic to add an image:tag string in scans.image and integrationID in scans.integrationsID rendering its value using policy.image.registery which is equivalent to integrations.name * fix executing of migration just once at the start of config loads * restructure code for v0 * updates the get list call logic i.e orders by created_at ASC and adds its test case * modularizing registries like dockerhub, gcr * updates scan.Report logic to hold the json result of scanner * modularizes the scanner tools * modularizes the scanner tools * some todo * updates the get call to query by status * updates the CRUD to use pointers as certain fields are optional or conditionally updated * agent implementation workflow * assigner: implements task assignment for scans and dummy mock test (#95) * updates the get call of scan to query by status * updates the GET call of agents to query by status * updates the GET call of agents to query by status or scan_id * adds task assignment logic * adds task assignment logic * fix lints * fix lints
1 parent 0a82338 commit d563be5

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

90 files changed

+4161
-285
lines changed

cmd/agent.go

Lines changed: 393 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,393 @@
1+
package cmd
2+
3+
// Task Fetching:
4+
// Execute Tasks
5+
// Publish Results
6+
7+
import (
8+
"bytes"
9+
"encoding/json"
10+
"fmt"
11+
"github.com/google/uuid"
12+
"github.com/shinobistack/gokakashi/internal/restapi/v1/agents"
13+
"github.com/shinobistack/gokakashi/internal/restapi/v1/agenttasks"
14+
"github.com/shinobistack/gokakashi/internal/restapi/v1/integrations"
15+
"github.com/shinobistack/gokakashi/internal/restapi/v1/scans"
16+
"github.com/shinobistack/gokakashi/pkg/registry/v1"
17+
"github.com/shinobistack/gokakashi/pkg/scanner/v1"
18+
"github.com/spf13/cobra"
19+
"log"
20+
"net/http"
21+
"os"
22+
"time"
23+
)
24+
25+
var agentCmd = &cobra.Command{
26+
Use: "agent",
27+
Short: "Manage agents for GoKakashi",
28+
}
29+
30+
var agentStartCmd = &cobra.Command{
31+
Use: "start",
32+
Short: "Register an agent and start polling for tasks",
33+
Run: agentRegister,
34+
}
35+
36+
var (
37+
server string
38+
token string
39+
workspace string
40+
)
41+
42+
//ToDo: for any table status which results to error should we upload err message or just status error
43+
44+
func agentRegister(cmd *cobra.Command, args []string) {
45+
// Validate inputs
46+
if server == "" || token == "" || workspace == "" {
47+
log.Fatalf("Error: Missing required inputs. Please provide --server, --token, and --workspace.")
48+
}
49+
50+
// log.Printf("Server: %s, Token: %s, Workspace: %s", server, token, workspace)
51+
52+
// Register the agent
53+
agentID, err := registerAgent(server, token, workspace)
54+
if err != nil {
55+
log.Fatalf("Failed to register the agent: %v", err)
56+
}
57+
58+
log.Printf("Agent registered successfully! Agent ID: %d", agentID)
59+
60+
// Start polling for tasks
61+
pollTasks(server, token, agentID, workspace)
62+
}
63+
64+
func registerAgent(server, token, workspace string) (int, error) {
65+
reqBody := agents.RegisterAgentRequest{
66+
Server: server,
67+
Token: token,
68+
Workspace: workspace,
69+
}
70+
reqBodyJSON, _ := json.Marshal(reqBody)
71+
72+
req, err := http.NewRequest("POST", fmt.Sprintf("%s/api/v1/agents", server), bytes.NewBuffer(reqBodyJSON))
73+
if err != nil {
74+
return 0, fmt.Errorf("failed to create registration request: %w", err)
75+
}
76+
req.Header.Set("Authorization", "Bearer "+token)
77+
req.Header.Set("Content-Type", "application/json")
78+
79+
resp, err := http.DefaultClient.Do(req)
80+
if err != nil {
81+
return 0, fmt.Errorf("failed to send registration request: %w", err)
82+
}
83+
defer resp.Body.Close()
84+
85+
if resp.StatusCode != http.StatusOK {
86+
return 0, fmt.Errorf("server responded with status: %d", resp.StatusCode)
87+
}
88+
89+
var response agents.RegisterAgentResponse
90+
if err := json.NewDecoder(resp.Body).Decode(&response); err != nil {
91+
return 0, fmt.Errorf("failed to decode registration response: %w", err)
92+
}
93+
return response.ID, nil
94+
}
95+
96+
func pollTasks(server, token string, agentID int, workspace string) {
97+
for {
98+
// Process only tasks with status "pending" in the order returned (created_at ASC)
99+
tasks, err := fetchTasks(server, token, agentID, "pending")
100+
if err != nil {
101+
log.Printf("Error fetching tasks: %v", err)
102+
time.Sleep(10 * time.Second)
103+
continue
104+
}
105+
106+
if len(tasks) == 0 {
107+
log.Println("No pending tasks. Retrying after 10 seconds.")
108+
time.Sleep(10 * time.Second)
109+
continue
110+
}
111+
112+
for _, task := range tasks {
113+
// Update task status to "in_progress"
114+
err := updateAgentTaskStatus(server, token, task.ID, agentID, "in_progress")
115+
if err != nil {
116+
log.Printf("Failed to update agent_task status to 'in_progress': %v", err)
117+
return
118+
}
119+
processTask(server, token, task, workspace, agentID)
120+
continue
121+
}
122+
// Todo: Polling interval time decide
123+
// Sleep for a defined interval
124+
time.Sleep(10 * time.Second)
125+
}
126+
}
127+
128+
func fetchTasks(server, token string, agentID int, status string) ([]agenttasks.GetAgentTaskResponse, error) {
129+
req, err := http.NewRequest("GET", fmt.Sprintf("%s/api/v1/agents/%d/tasks?status=%s", server, agentID, status), nil)
130+
if err != nil {
131+
return nil, fmt.Errorf("failed to create task polling request: %w", err)
132+
}
133+
134+
req.Header.Set("Authorization", "Bearer "+token)
135+
136+
resp, err := http.DefaultClient.Do(req)
137+
if err != nil {
138+
return nil, fmt.Errorf("failed to send task polling request: %w", err)
139+
}
140+
defer resp.Body.Close()
141+
142+
if resp.StatusCode != http.StatusOK {
143+
return nil, fmt.Errorf("server responded with status: %d", resp.StatusCode)
144+
}
145+
146+
var tasks []agenttasks.GetAgentTaskResponse
147+
if err := json.NewDecoder(resp.Body).Decode(&tasks); err != nil {
148+
return nil, fmt.Errorf("failed to decode task polling response: %w", err)
149+
}
150+
151+
return tasks, nil
152+
}
153+
154+
func updateAgentTaskStatus(server, token string, taskID uuid.UUID, agentID int, status string) error {
155+
reqBody := agenttasks.UpdateAgentTaskRequest{
156+
ID: taskID,
157+
AgentID: intPtr(agentID),
158+
Status: strPtr(status),
159+
}
160+
161+
reqBodyJSON, _ := json.Marshal(reqBody)
162+
163+
req, err := http.NewRequest("PUT", fmt.Sprintf("%s/api/v1/agents/%d/tasks/%s", server, agentID, taskID), bytes.NewBuffer(reqBodyJSON))
164+
if err != nil {
165+
return fmt.Errorf("failed to create task status update request: %w", err)
166+
}
167+
req.Header.Set("Authorization", "Bearer "+token)
168+
req.Header.Set("Content-Type", "application/json")
169+
170+
resp, err := http.DefaultClient.Do(req)
171+
if err != nil {
172+
return fmt.Errorf("failed to update task status: %w", err)
173+
}
174+
defer resp.Body.Close()
175+
176+
if resp.StatusCode != http.StatusOK {
177+
return fmt.Errorf("server responded with status: %d", resp.StatusCode)
178+
}
179+
180+
return nil
181+
}
182+
183+
func processTask(server, token string, task agenttasks.GetAgentTaskResponse, workspace string, agentID int) {
184+
// Step 1: Fetch scan details
185+
scan, err := fetchScan(server, token, task.ScanID)
186+
if err != nil {
187+
log.Printf("Failed to fetch scan details: %v", err)
188+
return
189+
}
190+
191+
// Step 2: Fetch integration details
192+
integration, err := fetchIntegration(server, token, scan.IntegrationID)
193+
if err != nil {
194+
log.Printf("Failed to fetch integration details: %v", err)
195+
return
196+
}
197+
198+
// Step 3: Authenticate and pull the image
199+
if err := authenticateAndPullImage(scan.Image, integration); err != nil {
200+
log.Printf("Failed to authenticate or pull image: %v", err)
201+
return
202+
}
203+
204+
err = updateScanStatus(server, token, scan.ID, "scan_in_progress")
205+
if err != nil {
206+
log.Printf("Failed to update scan status to 'scan_in_progress': %v", err)
207+
}
208+
// Step 4: Perform the scan
209+
// severityLevels := []string{"HIGH", "CRITICAL"}
210+
reportPath, err := performScan(scan.Image, scan.Scanner)
211+
if err != nil {
212+
log.Printf("Failed to perform scan: %v", err)
213+
if err := updateScanStatus(server, token, scan.ID, "error"); err != nil {
214+
log.Printf("Failed to update scan status to 'error': %v", err)
215+
}
216+
return
217+
}
218+
219+
// Step 5: Upload the scan report
220+
if err := uploadReport(server, token, scan.ID, reportPath); err != nil {
221+
log.Printf("Failed to upload scan report: %v", err)
222+
if err := updateScanStatus(server, token, scan.ID, "error"); err != nil {
223+
log.Printf("Failed to update scan status to 'error': %v", err)
224+
}
225+
return
226+
}
227+
228+
if err := updateAgentTaskStatus(server, token, task.ID, agentID, "complete"); err != nil {
229+
log.Printf("Failed to update agent_task status to 'complete': %v", err)
230+
}
231+
232+
log.Printf("AgentTaskID completed successfully: %v", task.ID)
233+
}
234+
235+
func updateScanStatus(server, token string, scanID uuid.UUID, status string) error {
236+
reqBody := scans.UpdateScanRequest{
237+
ID: scanID,
238+
Status: strPtr(status),
239+
}
240+
reqBodyJSON, _ := json.Marshal(reqBody)
241+
242+
req, err := http.NewRequest("PUT", fmt.Sprintf("%s/api/v1/scans/%s", server, scanID), bytes.NewBuffer(reqBodyJSON))
243+
if err != nil {
244+
return fmt.Errorf("failed to create scan status update request: %w", err)
245+
}
246+
req.Header.Set("Authorization", "Bearer "+token)
247+
req.Header.Set("Content-Type", "application/json")
248+
249+
resp, err := http.DefaultClient.Do(req)
250+
if err != nil {
251+
return fmt.Errorf("failed to update scan status: %w", err)
252+
}
253+
defer resp.Body.Close()
254+
255+
if resp.StatusCode != http.StatusOK {
256+
return fmt.Errorf("server responded with status: %d", resp.StatusCode)
257+
}
258+
259+
return nil
260+
}
261+
262+
func fetchScan(server, token string, scanID uuid.UUID) (*scans.GetScanResponse, error) {
263+
req, err := http.NewRequest("GET", fmt.Sprintf("%s/api/v1/scans/%s", server, scanID), nil)
264+
if err != nil {
265+
return nil, fmt.Errorf("failed to create scan request: %w", err)
266+
}
267+
req.Header.Set("Authorization", "Bearer "+token)
268+
269+
resp, err := http.DefaultClient.Do(req)
270+
if err != nil {
271+
return nil, fmt.Errorf("failed to fetch scan details: %w", err)
272+
}
273+
defer resp.Body.Close()
274+
275+
if resp.StatusCode != http.StatusOK {
276+
return nil, fmt.Errorf("server responded with status: %d", resp.StatusCode)
277+
}
278+
279+
var scan scans.GetScanResponse
280+
if err := json.NewDecoder(resp.Body).Decode(&scan); err != nil {
281+
return nil, fmt.Errorf("failed to decode scan response: %w", err)
282+
}
283+
284+
return &scan, nil
285+
}
286+
287+
func fetchIntegration(server, token string, integrationID uuid.UUID) (*integrations.GetIntegrationResponse, error) {
288+
req, err := http.NewRequest("GET", fmt.Sprintf("%s/api/v1/integrations/%s", server, integrationID), nil)
289+
if err != nil {
290+
return nil, fmt.Errorf("failed to create integration fetch request: %w", err)
291+
}
292+
req.Header.Set("Authorization", "Bearer "+token)
293+
294+
resp, err := http.DefaultClient.Do(req)
295+
if err != nil {
296+
return nil, fmt.Errorf("failed to fetch integration details: %w", err)
297+
}
298+
defer resp.Body.Close()
299+
300+
if resp.StatusCode != http.StatusOK {
301+
return nil, fmt.Errorf("server responded with status: %d", resp.StatusCode)
302+
}
303+
304+
var integration integrations.GetIntegrationResponse
305+
if err := json.NewDecoder(resp.Body).Decode(&integration); err != nil {
306+
return nil, fmt.Errorf("failed to decode integration response: %w", err)
307+
}
308+
309+
return &integration, nil
310+
}
311+
312+
func authenticateAndPullImage(image string, integration *integrations.GetIntegrationResponse) error {
313+
// Select the registry integration based on type
314+
log.Printf("Registry: %s | Image: %s | Step: Authentication started", integration.Type, image)
315+
reg, err := registry.NewRegistry(integration.Type, integration.Config)
316+
if err != nil {
317+
return err
318+
}
319+
320+
// Authenticate
321+
if err := reg.Authenticate(); err != nil {
322+
return fmt.Errorf("authentication failed: %w", err)
323+
}
324+
325+
// Pull image
326+
if err := reg.PullImage(image); err != nil {
327+
return fmt.Errorf("failed to pull image: %w", err)
328+
}
329+
330+
return nil
331+
}
332+
333+
func performScan(image, scannerType string) (string, error) {
334+
// Initialize the scanner using the factory function.
335+
scanner, err := scanner.NewScanner(scannerType)
336+
if err != nil {
337+
return "", fmt.Errorf("failed to initialize scanner: %w", err)
338+
}
339+
340+
// Perform scan
341+
// Todo: to add feature for severity args or tool args.
342+
reportPath, err := scanner.Scan(image, nil)
343+
if err != nil {
344+
return "", fmt.Errorf("scan failed: %w", err)
345+
}
346+
347+
return reportPath, nil
348+
}
349+
350+
func uploadReport(server, token string, scanID uuid.UUID, reportPath string) error {
351+
report, err := os.ReadFile(reportPath)
352+
if err != nil {
353+
return fmt.Errorf("failed to read report file: %w", err)
354+
}
355+
356+
reqBody := scans.UpdateScanRequest{
357+
ID: scanID,
358+
Report: json.RawMessage(report),
359+
Status: strPtr("success"),
360+
}
361+
reqBodyJSON, err := json.Marshal(reqBody)
362+
if err != nil {
363+
return fmt.Errorf("failed to marshal request body: %w", err)
364+
}
365+
366+
req, err := http.NewRequest("PUT", fmt.Sprintf("%s/api/v1/scans/%s", server, scanID), bytes.NewBuffer(reqBodyJSON))
367+
if err != nil {
368+
return fmt.Errorf("failed to create report upload request: %w", err)
369+
}
370+
371+
req.Header.Set("Authorization", "Bearer "+token)
372+
req.Header.Set("Content-Type", "application/json")
373+
374+
resp, err := http.DefaultClient.Do(req)
375+
if err != nil {
376+
return fmt.Errorf("failed to upload scan report: %w", err)
377+
}
378+
defer resp.Body.Close()
379+
380+
if resp.StatusCode != http.StatusOK {
381+
return fmt.Errorf("server responded with status: %d", resp.StatusCode)
382+
}
383+
384+
return nil
385+
}
386+
387+
func strPtr(s string) *string {
388+
return &s
389+
}
390+
391+
func intPtr(i int) *int {
392+
return &i
393+
}

0 commit comments

Comments
 (0)