Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
gbdubs committed Dec 6, 2023
1 parent 9e88ed4 commit 152218f
Show file tree
Hide file tree
Showing 21 changed files with 825 additions and 88 deletions.
2 changes: 2 additions & 0 deletions azure/azevents/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ go_library(
importpath = "github.com/RMI/pacta/azure/azevents",
visibility = ["//visibility:public"],
deps = [
"//db",
"//pacta",
"//task",
"@com_github_go_chi_chi_v5//:chi",
"@org_uber_go_zap//:zap",
Expand Down
113 changes: 111 additions & 2 deletions azure/azevents/azevents.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package azevents

import (
"context"
"encoding/json"
"errors"
"fmt"
Expand All @@ -12,13 +13,17 @@ import (
"strings"
"time"

"github.com/RMI/pacta/db"
"github.com/RMI/pacta/pacta"
"github.com/RMI/pacta/task"
"github.com/go-chi/chi/v5"
"go.uber.org/zap"
)

type Config struct {
Logger *zap.Logger
DB DB
Now func() time.Time

Subscription string
ResourceGroup string
Expand All @@ -31,6 +36,16 @@ type Config struct {
ParsedPortfolioTopicName string
}

type DB interface {
Transactional(context.Context, func(tx db.Tx) error) error

CreateBlob(tx db.Tx, b *pacta.Blob) (pacta.BlobID, error)
CreatePortfolio(tx db.Tx, p *pacta.Portfolio) (pacta.PortfolioID, error)

IncompleteUploads(tx db.Tx, ids []pacta.IncompleteUploadID) (map[pacta.IncompleteUploadID]*pacta.IncompleteUpload, error)
UpdateIncompleteUpload(tx db.Tx, id pacta.IncompleteUploadID, mutations ...db.UpdateIncompleteUploadFn) error
}

const parsedPortfolioPath = "/events/parsed_portfolio"

func (c *Config) validate() error {
Expand All @@ -49,6 +64,12 @@ func (c *Config) validate() error {
if c.ParsedPortfolioTopicName == "" {
return errors.New("no parsed portfolio topic name given")
}
if c.DB == nil {
return errors.New("no DB was given")
}
if c.Now == nil {
return errors.New("no Now function was given")
}
return nil
}

Expand All @@ -61,6 +82,8 @@ type Server struct {
subscription string
resourceGroup string
pathToTopic map[string]string
db DB
now func() time.Time
}

func NewServer(cfg *Config) (*Server, error) {
Expand All @@ -73,6 +96,8 @@ func NewServer(cfg *Config) (*Server, error) {
allowedAuthSecrets: cfg.AllowedAuthSecrets,
subscription: cfg.Subscription,
resourceGroup: cfg.ResourceGroup,
db: cfg.DB,
now: cfg.Now,
pathToTopic: map[string]string{
parsedPortfolioPath: cfg.ParsedPortfolioTopicName,
},
Expand Down Expand Up @@ -213,7 +238,91 @@ func (s *Server) RegisterHandlers(r chi.Router) {
return
}

// TODO: Add any database persistence and other things we'd want to do after a portfolio was parsed.
s.logger.Info("parsed portfolio", zap.String("task_id", string(req.Data.TaskID)))
if len(req.Data.Outputs) == 0 {
s.logger.Error("webhook response had no processed portfolios", zap.String("event_grid_id", req.ID))
http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
return
}

portfolioIDs := []pacta.PortfolioID{}
var ranAt time.Time
now := s.now()
err := s.db.Transactional(r.Context(), func(tx db.Tx) error {
incompleteUploads, err := s.db.IncompleteUploads(tx, req.Data.Request.IncompleteUploadIDs)
if err != nil {
return fmt.Errorf("reading incomplete uploads: %w", err)
}
if len(incompleteUploads) == 0 {
return fmt.Errorf("no incomplete uploads found for ids: %v", req.Data.Request.IncompleteUploadIDs)
}
var holdingsDate *pacta.HoldingsDate
var ownerID pacta.OwnerID
for _, iu := range incompleteUploads {
if ownerID == "" {
ownerID = iu.Owner.ID
} else if ownerID != iu.Owner.ID {
return fmt.Errorf("multiple owners found for incomplete uploads: %+v", incompleteUploads)
}
if iu.HoldingsDate == nil {
return fmt.Errorf("incomplete upload %s had no holdings date", iu.ID)
}
if holdingsDate == nil {
holdingsDate = iu.HoldingsDate
} else if *holdingsDate != *iu.HoldingsDate {
return fmt.Errorf("multiple holdings dates found for incomplete uploads: %+v", incompleteUploads)
}
ranAt = iu.RanAt
}
for i, output := range req.Data.Outputs {
blobID, err := s.db.CreateBlob(tx, &output.Blob)
if err != nil {
return fmt.Errorf("creating blob %d: %w", i, err)
}
portfolioID, err := s.db.CreatePortfolio(tx, &pacta.Portfolio{
Owner: &pacta.Owner{ID: ownerID},
Name: output.Blob.FileName,
NumberOfRows: output.LineCount,
Blob: &pacta.Blob{ID: blobID},
HoldingsDate: holdingsDate,
})
if err != nil {
return fmt.Errorf("creating portfolio %d: %w", i, err)
}
portfolioIDs = append(portfolioIDs, portfolioID)
}
for i, iu := range incompleteUploads {
err := s.db.UpdateIncompleteUpload(
tx,
iu.ID,
db.SetIncompleteUploadCompletedAt(now),
db.SetIncompleteUploadFailureMessage(""),
db.SetIncompleteUploadFailureCode(""))
if err != nil {
return fmt.Errorf("updating incomplete upload %d: %w", i, err)
}
}
return nil
})
if err != nil {
s.logger.Error("failed to save response to database", zap.Error(err))
http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
return
}

s.logger.Info("parsed portfolio",
zap.String("task_id", string(req.Data.TaskID)),
zap.Duration("run_time", now.Sub(ranAt)),
zap.Strings("incomplete_upload_ids", asStrs(req.Data.Request.IncompleteUploadIDs)),
zap.Int("incomplete_upload_count", len(req.Data.Request.IncompleteUploadIDs)),
zap.Strings("portfolio_ids", asStrs(portfolioIDs)),
zap.Int("portfolio_count", len(portfolioIDs)))
})
}

func asStrs[T ~string](ts []T) []string {
ss := make([]string, len(ts))
for i, t := range ts {
ss[i] = string(t)
}
return ss
}
2 changes: 1 addition & 1 deletion cmd/runner/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -71,4 +71,4 @@ oci_tarball(
name = "image_tarball",
image = ":image",
repo_tags = [],
)
)
2 changes: 1 addition & 1 deletion cmd/runner/taskrunner/taskrunner.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func (tr *TaskRunner) ParsePortfolio(ctx context.Context, req *task.ParsePortfol
},
{
Key: "PARSE_PORTFOLIO_REQUEST",
Value: buf.String(),
Value: value,
},
})
}
Expand Down
6 changes: 5 additions & 1 deletion cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ func run(args []string) error {
}
runner = tmp
} else {
logger.Info("initializing local task runner client")
tmp, err := dockertask.NewRunner(logger, &dockertask.ServicePrincipal{
TenantID: *localDockerTenantID,
ClientID: *localDockerClientID,
Expand Down Expand Up @@ -270,6 +271,7 @@ func run(args []string) error {
Logger: logger,
DB: db,
TaskRunner: tr,
Now: time.Now,
}

pactaStrictHandler := oapipacta.NewStrictHandlerWithOptions(srv, nil /* middleware */, oapipacta.StrictHTTPServerOptions{
Expand All @@ -290,6 +292,8 @@ func run(args []string) error {
Subscription: *azEventSubscription,
ResourceGroup: *azEventResourceGroup,
ParsedPortfolioTopicName: *azEventParsedPortfolioTopic,
DB: db,
Now: time.Now,
})
if err != nil {
return fmt.Errorf("failed to init Azure Event Grid handler: %w", err)
Expand All @@ -313,7 +317,7 @@ func run(args []string) error {
// LogEntry created by the logging middleware.
chimiddleware.RequestID,
chimiddleware.RealIP,
zaphttplog.NewMiddleware(logger, zaphttplog.WithConcise(true)),
zaphttplog.NewMiddleware(logger, zaphttplog.WithConcise(false)),
chimiddleware.Recoverer,

jwtauth.Verifier(jwtauth.New("EdDSA", nil, jwKey)),
Expand Down
2 changes: 2 additions & 0 deletions cmd/server/pactasrv/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "pactasrv",
srcs = [
"incomplete_upload.go",
"initiative.go",
"initiative_invitation.go",
"initiative_user_relationship.go",
Expand All @@ -25,6 +26,7 @@ go_library(
"//session",
"//task",
"@com_github_go_chi_jwtauth_v5//:jwtauth",
"@com_github_google_uuid//:uuid",
"@org_uber_go_zap//:zap",
],
)
1 change: 1 addition & 0 deletions cmd/server/pactasrv/conv/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "conv",
srcs = [
"helpers.go",
"oapi_to_pacta.go",
"pacta_to_oapi.go",
],
Expand Down
41 changes: 41 additions & 0 deletions cmd/server/pactasrv/conv/helpers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package conv

import "time"

func ptr[T any](t T) *T {
return &t
}

func ifNil[T any](t *T, fallback T) T {
if t == nil {
return fallback
}
return *t
}

func timeToNilable(t time.Time) *time.Time {
if t.IsZero() {
return nil
}
return &t
}

func stringToNilable[T ~string](t T) *string {
if t == "" {
return nil
}
s := string(t)
return &s
}

func convAll[I any, O any](is []I, f func(I) (O, error)) ([]O, error) {
os := make([]O, len(is))
for i, v := range is {
o, err := f(v)
if err != nil {
return nil, err
}
os[i] = o
}
return os, nil
}
10 changes: 6 additions & 4 deletions cmd/server/pactasrv/conv/oapi_to_pacta.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,11 @@ func InitiativeInvitationFromOAPI(i *api.InitiativeInvitationCreate) (*pacta.Ini
}, nil
}

func ifNil[T any](t *T, fallback T) T {
if t == nil {
return fallback
func HoldingsDateFromOAPI(hd *api.HoldingsDate) (*pacta.HoldingsDate, error) {
if hd == nil {
return nil, nil
}
return *t
return &pacta.HoldingsDate{
Time: hd.Time,
}, nil
}
59 changes: 57 additions & 2 deletions cmd/server/pactasrv/conv/pacta_to_oapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"github.com/RMI/pacta/oapierr"
api "github.com/RMI/pacta/openapi/pacta"
"github.com/RMI/pacta/pacta"
"go.uber.org/zap"
)

func InitiativeToOAPI(i *pacta.Initiative) (*api.Initiative, error) {
Expand Down Expand Up @@ -94,6 +95,60 @@ func InitiativeUserRelationshipToOAPI(i *pacta.InitiativeUserRelationship) (*api
}, nil
}

func ptr[T any](t T) *T {
return &t
func HoldingsDateToOAPI(hd *pacta.HoldingsDate) (*api.HoldingsDate, error) {
if hd == nil {
return nil, nil
}
return &api.HoldingsDate{
Time: hd.Time,
}, nil
}

func IncompleteUploadsToOAPI(ius []*pacta.IncompleteUpload) ([]*api.IncompleteUpload, error) {
return convAll(ius, IncompleteUploadToOAPI)
}

func IncompleteUploadToOAPI(iu *pacta.IncompleteUpload) (*api.IncompleteUpload, error) {
if iu == nil {
return nil, oapierr.Internal("incompleteUploadToOAPI: can't convert nil pointer")
}
hd, err := HoldingsDateToOAPI(iu.HoldingsDate)
if err != nil {
return nil, oapierr.Internal("incompleteUploadToOAPI: holdingsDateToOAPI failed", zap.Error(err))
}
return &api.IncompleteUpload{
Id: string(iu.ID),
Name: iu.Name,
Description: iu.Description,
HoldingsDate: hd,
CreatedAt: iu.CreatedAt,
RanAt: timeToNilable(iu.RanAt),
CompletedAt: timeToNilable(iu.CompletedAt),
FailureCode: stringToNilable(iu.FailureCode),
FailureMessage: stringToNilable(iu.FailureMessage),
AdminDebugEnabled: iu.AdminDebugEnabled,
}, nil
}

func PortfoliosToOAPI(ius []*pacta.Portfolio) ([]*api.Portfolio, error) {
return convAll(ius, PortfolioToOAPI)
}

func PortfolioToOAPI(p *pacta.Portfolio) (*api.Portfolio, error) {
if p == nil {
return nil, oapierr.Internal("portfolioToOAPI: can't convert nil pointer")
}
hd, err := HoldingsDateToOAPI(p.HoldingsDate)
if err != nil {
return nil, oapierr.Internal("portfolioToOAPI: holdingsDateToOAPI failed", zap.Error(err))
}
return &api.Portfolio{
Id: string(p.ID),
Name: p.Name,
Description: p.Description,
HoldingsDate: hd,
CreatedAt: p.CreatedAt,
NumberOfRows: p.NumberOfRows,
AdminDebugEnabled: p.AdminDebugEnabled,
}, nil
}
Loading

0 comments on commit 152218f

Please sign in to comment.