diff --git a/azure/azevents/azevents.go b/azure/azevents/azevents.go index e1413f2..b891887 100644 --- a/azure/azevents/azevents.go +++ b/azure/azevents/azevents.go @@ -34,6 +34,8 @@ type Config struct { AllowedAuthSecrets []string ParsedPortfolioTopicName string + CreatedAuditTopicName string + CreatedReportTopicName string } type DB interface { @@ -45,10 +47,15 @@ type DB interface { IncompleteUploads(tx db.Tx, ids []pacta.IncompleteUploadID) (map[pacta.IncompleteUploadID]*pacta.IncompleteUpload, error) UpdateIncompleteUpload(tx db.Tx, id pacta.IncompleteUploadID, mutations ...db.UpdateIncompleteUploadFn) error + Analysis(tx db.Tx, id pacta.AnalysisID) (*pacta.Analysis, error) + UpdateAnalysis(tx db.Tx, id pacta.AnalysisID, mutations ...db.UpdateAnalysisFn) error + CreateAnalysisArtifact(tx db.Tx, a *pacta.AnalysisArtifact) (pacta.AnalysisArtifactID, error) } const parsedPortfolioPath = "/events/parsed_portfolio" +const createdAuditPath = "/events/created_audit" +const createdReportPath = "/events/created_report" func (c *Config) validate() error { if c.Logger == nil { @@ -66,6 +73,12 @@ func (c *Config) validate() error { if c.ParsedPortfolioTopicName == "" { return errors.New("no parsed portfolio topic name given") } + if c.CreatedAuditTopicName == "" { + return errors.New("no created audit topic name given") + } + if c.CreatedReportTopicName == "" { + return errors.New("no created report topic name given") + } if c.DB == nil { return errors.New("no DB was given") } @@ -102,6 +115,8 @@ func NewServer(cfg *Config) (*Server, error) { now: cfg.Now, pathToTopic: map[string]string{ parsedPortfolioPath: cfg.ParsedPortfolioTopicName, + createdAuditPath: cfg.CreatedAuditTopicName, + createdReportPath: cfg.CreatedReportTopicName, }, }, nil } @@ -212,6 +227,8 @@ func (s *Server) verifyWebhook(next http.Handler) http.Handler { func (s *Server) RegisterHandlers(r chi.Router) { r.Use(s.verifyWebhook) r.Post(parsedPortfolioPath, s.handleParsePortfolioResponse()) + r.Post(createdAuditPath, s.handleCreatedAuditResponse()) + r.Post(createdReportPath, s.handleCreatedReportResponse()) } func (s *Server) handleParsePortfolioResponse() http.HandlerFunc { @@ -326,6 +343,146 @@ func (s *Server) handleParsePortfolioResponse() http.HandlerFunc { } } +func (s *Server) handleCreatedAuditResponse() http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + var reqs []struct { + Data *task.CreateAuditResponse `json:"data"` + EventType string `json:"eventType"` + ID string `json:"id"` + Subject string `json:"subject"` + DataVersion string `json:"dataVersion"` + MetadataVersion string `json:"metadataVersion"` + EventTime time.Time `json:"eventTime"` + Topic string `json:"topic"` + } + if err := json.NewDecoder(r.Body).Decode(&reqs); err != nil { + s.logger.Error("failed to parse webhook request body", zap.Error(err)) + http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest) + return + } + if len(reqs) != 1 { + s.logger.Error("webhook response had unexpected number of events", zap.Int("event_count", len(reqs))) + http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest) + return + } + req := reqs[0] + + if req.Data == nil { + s.logger.Error("webhook response had no payload", zap.String("event_grid_id", req.ID)) + http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest) + return + } + + var ranAt time.Time + now := s.now() + // We use a background context here rather than the one from the request so that it cannot be cancelled upstream. + err := s.db.Transactional(context.Background(), func(tx db.Tx) error { + for _, artifact := range req.Data.Artifacts { + blobID, err := s.db.CreateBlob(tx, &pacta.Blob{ + FileName: artifact.FileName, + FileType: artifact.FileType, + BlobURI: artifact.BlobURI, + }) + if err != nil { + return fmt.Errorf("creating blob: %w", err) + } + _, err = s.db.CreateAnalysisArtifact(tx, &pacta.AnalysisArtifact{ + Blob: &pacta.Blob{ID: blobID}, + AnalysisID: req.Data.Request.AnalysisID, + }) + if err != nil { + return fmt.Errorf("creating analysis artifact: %w", err) + } + } + err := s.db.UpdateAnalysis(tx, req.Data.Request.AnalysisID, db.SetAnalysisCompletedAt(now)) + if err != nil { + return fmt.Errorf("updating analysis: %w", err) + } + return nil + }) + if err != nil { + s.logger.Error("failed to save analysis response to database", zap.Error(err)) + http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError) + return + } + + s.logger.Info("audit completed", + zap.String("task_id", string(req.Data.TaskID)), + zap.Duration("run_time", now.Sub(ranAt)), + zap.String("analysis_id", string(req.Data.Request.AnalysisID))) + } +} + +func (s *Server) handleCreatedReportResponse() http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + var reqs []struct { + Data *task.CreateReportResponse `json:"data"` + EventType string `json:"eventType"` + ID string `json:"id"` + Subject string `json:"subject"` + DataVersion string `json:"dataVersion"` + MetadataVersion string `json:"metadataVersion"` + EventTime time.Time `json:"eventTime"` + Topic string `json:"topic"` + } + if err := json.NewDecoder(r.Body).Decode(&reqs); err != nil { + s.logger.Error("failed to parse webhook request body", zap.Error(err)) + http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest) + return + } + if len(reqs) != 1 { + s.logger.Error("webhook response had unexpected number of events", zap.Int("event_count", len(reqs))) + http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest) + return + } + req := reqs[0] + + if req.Data == nil { + s.logger.Error("webhook response had no payload", zap.String("event_grid_id", req.ID)) + http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest) + return + } + + var ranAt time.Time + now := s.now() + // We use a background context here rather than the one from the request so that it cannot be cancelled upstream. + err := s.db.Transactional(context.Background(), func(tx db.Tx) error { + for _, artifact := range req.Data.Artifacts { + blobID, err := s.db.CreateBlob(tx, &pacta.Blob{ + FileName: artifact.FileName, + FileType: artifact.FileType, + BlobURI: artifact.BlobURI, + }) + if err != nil { + return fmt.Errorf("creating blob: %w", err) + } + _, err = s.db.CreateAnalysisArtifact(tx, &pacta.AnalysisArtifact{ + Blob: &pacta.Blob{ID: blobID}, + AnalysisID: req.Data.Request.AnalysisID, + }) + if err != nil { + return fmt.Errorf("creating analysis artifact: %w", err) + } + } + err := s.db.UpdateAnalysis(tx, req.Data.Request.AnalysisID, db.SetAnalysisCompletedAt(now)) + if err != nil { + return fmt.Errorf("updating analysis: %w", err) + } + return nil + }) + if err != nil { + s.logger.Error("failed to save analysis response to database", zap.Error(err)) + http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError) + return + } + + s.logger.Info("report completed", + zap.String("task_id", string(req.Data.TaskID)), + zap.Duration("run_time", now.Sub(ranAt)), + zap.String("analysis_id", string(req.Data.Request.AnalysisID))) + } +} + func asStrs[T ~string](ts []T) []string { ss := make([]string, len(ts)) for i, t := range ts { diff --git a/task/task.go b/task/task.go index b7ef1bf..ab5a895 100644 --- a/task/task.go +++ b/task/task.go @@ -43,9 +43,16 @@ type CreateAuditRequest struct { BlobURIs []pacta.BlobURI } +type AnalysisArtifact struct { + BlobURI pacta.BlobURI + FileName string + FileType pacta.FileType +} + type CreateAuditResponse struct { - TaskID ID - Request *CreateAuditRequest + TaskID ID + Request *CreateAuditRequest + Artifacts []*AnalysisArtifact } type CreateReportRequest struct { @@ -54,8 +61,9 @@ type CreateReportRequest struct { } type CreateReportResponse struct { - TaskID ID - Request *CreateReportRequest + TaskID ID + Request *CreateReportRequest + Artifacts []*AnalysisArtifact } type EnvVar struct {