diff --git a/azure/azevents/azevents.go b/azure/azevents/azevents.go index 439bac1..87f99ee 100644 --- a/azure/azevents/azevents.go +++ b/azure/azevents/azevents.go @@ -247,7 +247,8 @@ func (s *Server) RegisterHandlers(r chi.Router) { portfolioIDs := []pacta.PortfolioID{} var ranAt time.Time now := s.now() - err := s.db.Transactional(r.Context(), func(tx db.Tx) error { + // We use a background context here rather than the one from the request so that it cannot be cancelled upstream. + err := s.db.Transactional(context.Background(), func(tx db.Tx) error { incompleteUploads, err := s.db.IncompleteUploads(tx, req.Data.Request.IncompleteUploadIDs) if err != nil { return fmt.Errorf("reading incomplete uploads: %w", err) @@ -271,7 +272,9 @@ func (s *Server) RegisterHandlers(r chi.Router) { } else if *holdingsDate != *iu.HoldingsDate { return fmt.Errorf("multiple holdings dates found for incomplete uploads: %+v", incompleteUploads) } - ranAt = iu.RanAt + if iu.RanAt.After(ranAt) { + ranAt = iu.RanAt + } } for i, output := range req.Data.Outputs { blobID, err := s.db.CreateBlob(tx, &output.Blob) @@ -294,9 +297,7 @@ func (s *Server) RegisterHandlers(r chi.Router) { err := s.db.UpdateIncompleteUpload( tx, iu.ID, - db.SetIncompleteUploadCompletedAt(now), - db.SetIncompleteUploadFailureMessage(""), - db.SetIncompleteUploadFailureCode("")) + db.SetIncompleteUploadCompletedAt(now)) if err != nil { return fmt.Errorf("updating incomplete upload %d: %w", i, err) } diff --git a/cmd/runner/BUILD.bazel b/cmd/runner/BUILD.bazel index 27b2f8b..080d643 100644 --- a/cmd/runner/BUILD.bazel +++ b/cmd/runner/BUILD.bazel @@ -71,4 +71,4 @@ oci_tarball( name = "image_tarball", image = ":image", repo_tags = [], -) \ No newline at end of file +) diff --git a/cmd/server/pactasrv/BUILD.bazel b/cmd/server/pactasrv/BUILD.bazel index b03dab7..ee7f21b 100644 --- a/cmd/server/pactasrv/BUILD.bazel +++ b/cmd/server/pactasrv/BUILD.bazel @@ -28,5 +28,6 @@ go_library( "@com_github_go_chi_jwtauth_v5//:jwtauth", "@com_github_google_uuid//:uuid", "@org_uber_go_zap//:zap", + "@org_uber_go_zap//zapcore", ], ) diff --git a/cmd/server/pactasrv/incomplete_upload.go b/cmd/server/pactasrv/incomplete_upload.go index c905264..3d3a1f9 100644 --- a/cmd/server/pactasrv/incomplete_upload.go +++ b/cmd/server/pactasrv/incomplete_upload.go @@ -10,11 +10,11 @@ import ( api "github.com/RMI/pacta/openapi/pacta" "github.com/RMI/pacta/pacta" "go.uber.org/zap" + "go.uber.org/zap/zapcore" ) // (GET /incomplete-uploads) func (s *Server) ListIncompleteUploads(ctx context.Context, request api.ListIncompleteUploadsRequestObject) (api.ListIncompleteUploadsResponseObject, error) { - // TODO(#12) Implement Authorization ownerID, err := s.getUserOwnerID(ctx) if err != nil { return nil, err @@ -96,18 +96,19 @@ func (s *Server) checkIncompleteUploadAuthorization(ctx context.Context, id pact return nil, err } // Extracted to a common variable so that we return the same response for not found and unauthorized. - notFoundErr := func(err error) error { - return oapierr.NotFound("incomplete upload not found", zap.String("incomplete_upload_id", string(id)), zap.Error(err)) + notFoundErr := func(fields ...zapcore.Field) error { + fs := append(fields, zap.String("incomplete_upload_id", string(id))) + return oapierr.NotFound("incomplete upload not found", fs...) } iu, err := s.DB.IncompleteUpload(s.DB.NoTxn(ctx), id) if err != nil { if db.IsNotFound(err) { - return nil, notFoundErr(err) + return nil, notFoundErr(zap.Error(err)) } return nil, oapierr.Internal("failed to look up incomplete upload", zap.String("incomplete_upload_id", string(id)), zap.Error(err)) } if iu.Owner.ID != actorOwnerID { - return nil, notFoundErr(fmt.Errorf("incomplete upload does not belong to user: owner=%s actor=%s", iu.Owner.ID, actorOwnerID)) + return nil, notFoundErr(zap.Error(fmt.Errorf("incomplete upload does not belong to user")), zap.String("owner_id", string(iu.Owner.ID)), zap.String("actor_id", string(actorOwnerID))) } return iu, nil } diff --git a/cmd/server/pactasrv/portfolio.go b/cmd/server/pactasrv/portfolio.go index 9826a1f..b33a743 100644 --- a/cmd/server/pactasrv/portfolio.go +++ b/cmd/server/pactasrv/portfolio.go @@ -10,6 +10,7 @@ import ( api "github.com/RMI/pacta/openapi/pacta" "github.com/RMI/pacta/pacta" "go.uber.org/zap" + "go.uber.org/zap/zapcore" ) // (GET /portfolios) @@ -95,18 +96,19 @@ func (s *Server) checkPortfolioAuthorization(ctx context.Context, id pacta.Portf return nil, err } // Extracted to a common variable so that we return the same response for not found and unauthorized. - notFoundErr := func(err error) error { - return oapierr.NotFound("portfolio not found", zap.String("portfolio_id", string(id)), zap.Error(err)) + notFoundErr := func(fields ...zapcore.Field) error { + fs := append(fields, zap.String("portfolio_id", string(id))) + return oapierr.NotFound("portfolio not found", fs...) } iu, err := s.DB.Portfolio(s.DB.NoTxn(ctx), id) if err != nil { if db.IsNotFound(err) { - return nil, notFoundErr(err) + return nil, notFoundErr(zap.Error(err)) } return nil, oapierr.Internal("failed to look up portfolio", zap.String("portfolio_id", string(id)), zap.Error(err)) } if iu.Owner.ID != actorOwnerID { - return nil, notFoundErr(fmt.Errorf("portfolio does not belong to user: owner=%s actor=%s", iu.Owner.ID, actorOwnerID)) + return nil, notFoundErr(zap.Error(fmt.Errorf("portfolio does not belong to user")), zap.String("owner", string(iu.Owner.ID)), zap.String("actor", string(actorOwnerID))) } return iu, nil } diff --git a/cmd/server/pactasrv/upload.go b/cmd/server/pactasrv/upload.go index 22bd3a9..4650b0b 100644 --- a/cmd/server/pactasrv/upload.go +++ b/cmd/server/pactasrv/upload.go @@ -31,10 +31,12 @@ func (s *Server) StartPortfolioUpload(ctx context.Context, request api.StartPort return nil, err } n := len(request.Body.Items) - blobs := make([]*pacta.Blob, n) - resp := api.StartPortfolioUpload200JSONResponse{ - Items: make([]api.StartPortfolioUploadRespItem, n), + if n > 25 { + // TODO(#71) Implement basic limits + return nil, oapierr.BadRequest("too many items") } + blobs := make([]*pacta.Blob, n) + respItems := make([]api.StartPortfolioUploadRespItem, n) for i, item := range request.Body.Items { fn := item.FileName extStr := filepath.Ext(fn) @@ -46,13 +48,13 @@ func (s *Server) StartPortfolioUpload(ctx context.Context, request api.StartPort zap.String("file_name", fn), zap.Error(err)). WithErrorID("INVALID_FILE_EXTENSION"). - WithMessage(fmt.Sprintf("File extension %q from file %q is not supported", extStr, fn)) + WithMessage(fmt.Sprintf(`{"file": %q, "extension": %q}`, fn, extStr)) } blobs[i] = &pacta.Blob{ FileType: ft, FileName: item.FileName, } - resp.Items[i].FileName = string(fn) + respItems[i].FileName = string(fn) } for i := range request.Body.Items { @@ -63,7 +65,7 @@ func (s *Server) StartPortfolioUpload(ctx context.Context, request api.StartPort return nil, oapierr.Internal("failed to sign blob URI", zap.String("uri", uri), zap.Error(err)) } blobs[i].BlobURI = pacta.BlobURI(uri) - resp.Items[i].UploadUrl = signed + respItems[i].UploadUrl = signed } err = s.DB.Transactional(ctx, func(tx db.Tx) error { @@ -83,7 +85,7 @@ func (s *Server) StartPortfolioUpload(ctx context.Context, request api.StartPort if err != nil { return fmt.Errorf("creating incomplete upload %d: %w", i, err) } - resp.Items[i].IncompleteUploadId = string(iuid) + respItems[i].IncompleteUploadId = string(iuid) } return nil }) @@ -91,7 +93,7 @@ func (s *Server) StartPortfolioUpload(ctx context.Context, request api.StartPort return nil, oapierr.Internal("failed to create uploads", zap.Error(err)) } - return resp, nil + return api.StartPortfolioUpload200JSONResponse{Items: respItems}, nil } // Called after uploads of portfolios to cloud storage are complete. @@ -119,7 +121,7 @@ func (s *Server) CompletePortfolioUpload(ctx context.Context, request api.Comple blobIDs := []pacta.BlobID{} for _, id := range ids { iu := ius[id] - if iu == nil || iu.Owner.ID != ownerID { + if iu == nil || iu.Owner == nil || iu.Owner.ID != ownerID { return oapierr.NotFound( fmt.Sprintf("incomplete upload %s does not belong to user", id), zap.String("incomplete_upload_id", string(id)),