Skip to content

Commit 73ea028

Browse files
authored
Finishes Backend for Uploads (#72)
1 parent 9e88ed4 commit 73ea028

20 files changed

+831
-87
lines changed

azure/azevents/BUILD.bazel

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ go_library(
66
importpath = "github.com/RMI/pacta/azure/azevents",
77
visibility = ["//visibility:public"],
88
deps = [
9+
"//db",
10+
"//pacta",
911
"//task",
1012
"@com_github_go_chi_chi_v5//:chi",
1113
"@org_uber_go_zap//:zap",

azure/azevents/azevents.go

Lines changed: 112 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
package azevents
55

66
import (
7+
"context"
78
"encoding/json"
89
"errors"
910
"fmt"
@@ -12,13 +13,17 @@ import (
1213
"strings"
1314
"time"
1415

16+
"github.com/RMI/pacta/db"
17+
"github.com/RMI/pacta/pacta"
1518
"github.com/RMI/pacta/task"
1619
"github.com/go-chi/chi/v5"
1720
"go.uber.org/zap"
1821
)
1922

2023
type Config struct {
2124
Logger *zap.Logger
25+
DB DB
26+
Now func() time.Time
2227

2328
Subscription string
2429
ResourceGroup string
@@ -31,6 +36,16 @@ type Config struct {
3136
ParsedPortfolioTopicName string
3237
}
3338

39+
type DB interface {
40+
Transactional(context.Context, func(tx db.Tx) error) error
41+
42+
CreateBlob(tx db.Tx, b *pacta.Blob) (pacta.BlobID, error)
43+
CreatePortfolio(tx db.Tx, p *pacta.Portfolio) (pacta.PortfolioID, error)
44+
45+
IncompleteUploads(tx db.Tx, ids []pacta.IncompleteUploadID) (map[pacta.IncompleteUploadID]*pacta.IncompleteUpload, error)
46+
UpdateIncompleteUpload(tx db.Tx, id pacta.IncompleteUploadID, mutations ...db.UpdateIncompleteUploadFn) error
47+
}
48+
3449
const parsedPortfolioPath = "/events/parsed_portfolio"
3550

3651
func (c *Config) validate() error {
@@ -49,6 +64,12 @@ func (c *Config) validate() error {
4964
if c.ParsedPortfolioTopicName == "" {
5065
return errors.New("no parsed portfolio topic name given")
5166
}
67+
if c.DB == nil {
68+
return errors.New("no DB was given")
69+
}
70+
if c.Now == nil {
71+
return errors.New("no Now function was given")
72+
}
5273
return nil
5374
}
5475

@@ -61,6 +82,8 @@ type Server struct {
6182
subscription string
6283
resourceGroup string
6384
pathToTopic map[string]string
85+
db DB
86+
now func() time.Time
6487
}
6588

6689
func NewServer(cfg *Config) (*Server, error) {
@@ -73,6 +96,8 @@ func NewServer(cfg *Config) (*Server, error) {
7396
allowedAuthSecrets: cfg.AllowedAuthSecrets,
7497
subscription: cfg.Subscription,
7598
resourceGroup: cfg.ResourceGroup,
99+
db: cfg.DB,
100+
now: cfg.Now,
76101
pathToTopic: map[string]string{
77102
parsedPortfolioPath: cfg.ParsedPortfolioTopicName,
78103
},
@@ -213,7 +238,92 @@ func (s *Server) RegisterHandlers(r chi.Router) {
213238
return
214239
}
215240

216-
// TODO: Add any database persistence and other things we'd want to do after a portfolio was parsed.
217-
s.logger.Info("parsed portfolio", zap.String("task_id", string(req.Data.TaskID)))
241+
if len(req.Data.Outputs) == 0 {
242+
s.logger.Error("webhook response had no processed portfolios", zap.String("event_grid_id", req.ID))
243+
http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
244+
return
245+
}
246+
247+
portfolioIDs := []pacta.PortfolioID{}
248+
var ranAt time.Time
249+
now := s.now()
250+
// We use a background context here rather than the one from the request so that it cannot be cancelled upstream.
251+
err := s.db.Transactional(context.Background(), func(tx db.Tx) error {
252+
incompleteUploads, err := s.db.IncompleteUploads(tx, req.Data.Request.IncompleteUploadIDs)
253+
if err != nil {
254+
return fmt.Errorf("reading incomplete uploads: %w", err)
255+
}
256+
if len(incompleteUploads) == 0 {
257+
return fmt.Errorf("no incomplete uploads found for ids: %v", req.Data.Request.IncompleteUploadIDs)
258+
}
259+
var holdingsDate *pacta.HoldingsDate
260+
var ownerID pacta.OwnerID
261+
for _, iu := range incompleteUploads {
262+
if ownerID == "" {
263+
ownerID = iu.Owner.ID
264+
} else if ownerID != iu.Owner.ID {
265+
return fmt.Errorf("multiple owners found for incomplete uploads: %+v", incompleteUploads)
266+
}
267+
if iu.HoldingsDate == nil {
268+
return fmt.Errorf("incomplete upload %s had no holdings date", iu.ID)
269+
}
270+
if holdingsDate == nil {
271+
holdingsDate = iu.HoldingsDate
272+
} else if *holdingsDate != *iu.HoldingsDate {
273+
return fmt.Errorf("multiple holdings dates found for incomplete uploads: %+v", incompleteUploads)
274+
}
275+
if iu.RanAt.After(ranAt) {
276+
ranAt = iu.RanAt
277+
}
278+
}
279+
for i, output := range req.Data.Outputs {
280+
blobID, err := s.db.CreateBlob(tx, &output.Blob)
281+
if err != nil {
282+
return fmt.Errorf("creating blob %d: %w", i, err)
283+
}
284+
portfolioID, err := s.db.CreatePortfolio(tx, &pacta.Portfolio{
285+
Owner: &pacta.Owner{ID: ownerID},
286+
Name: output.Blob.FileName,
287+
NumberOfRows: output.LineCount,
288+
Blob: &pacta.Blob{ID: blobID},
289+
HoldingsDate: holdingsDate,
290+
})
291+
if err != nil {
292+
return fmt.Errorf("creating portfolio %d: %w", i, err)
293+
}
294+
portfolioIDs = append(portfolioIDs, portfolioID)
295+
}
296+
for i, iu := range incompleteUploads {
297+
err := s.db.UpdateIncompleteUpload(
298+
tx,
299+
iu.ID,
300+
db.SetIncompleteUploadCompletedAt(now))
301+
if err != nil {
302+
return fmt.Errorf("updating incomplete upload %d: %w", i, err)
303+
}
304+
}
305+
return nil
306+
})
307+
if err != nil {
308+
s.logger.Error("failed to save response to database", zap.Error(err))
309+
http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
310+
return
311+
}
312+
313+
s.logger.Info("parsed portfolio",
314+
zap.String("task_id", string(req.Data.TaskID)),
315+
zap.Duration("run_time", now.Sub(ranAt)),
316+
zap.Strings("incomplete_upload_ids", asStrs(req.Data.Request.IncompleteUploadIDs)),
317+
zap.Int("incomplete_upload_count", len(req.Data.Request.IncompleteUploadIDs)),
318+
zap.Strings("portfolio_ids", asStrs(portfolioIDs)),
319+
zap.Int("portfolio_count", len(portfolioIDs)))
218320
})
219321
}
322+
323+
func asStrs[T ~string](ts []T) []string {
324+
ss := make([]string, len(ts))
325+
for i, t := range ts {
326+
ss[i] = string(t)
327+
}
328+
return ss
329+
}

cmd/runner/taskrunner/taskrunner.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ func (tr *TaskRunner) ParsePortfolio(ctx context.Context, req *task.ParsePortfol
9898
},
9999
{
100100
Key: "PARSE_PORTFOLIO_REQUEST",
101-
Value: buf.String(),
101+
Value: value,
102102
},
103103
})
104104
}

cmd/server/main.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -234,6 +234,7 @@ func run(args []string) error {
234234
}
235235
runner = tmp
236236
} else {
237+
logger.Info("initializing local task runner client")
237238
tmp, err := dockertask.NewRunner(logger, &dockertask.ServicePrincipal{
238239
TenantID: *localDockerTenantID,
239240
ClientID: *localDockerClientID,
@@ -270,6 +271,7 @@ func run(args []string) error {
270271
Logger: logger,
271272
DB: db,
272273
TaskRunner: tr,
274+
Now: time.Now,
273275
}
274276

275277
pactaStrictHandler := oapipacta.NewStrictHandlerWithOptions(srv, nil /* middleware */, oapipacta.StrictHTTPServerOptions{
@@ -290,6 +292,8 @@ func run(args []string) error {
290292
Subscription: *azEventSubscription,
291293
ResourceGroup: *azEventResourceGroup,
292294
ParsedPortfolioTopicName: *azEventParsedPortfolioTopic,
295+
DB: db,
296+
Now: time.Now,
293297
})
294298
if err != nil {
295299
return fmt.Errorf("failed to init Azure Event Grid handler: %w", err)
@@ -313,7 +317,7 @@ func run(args []string) error {
313317
// LogEntry created by the logging middleware.
314318
chimiddleware.RequestID,
315319
chimiddleware.RealIP,
316-
zaphttplog.NewMiddleware(logger, zaphttplog.WithConcise(true)),
320+
zaphttplog.NewMiddleware(logger, zaphttplog.WithConcise(false)),
317321
chimiddleware.Recoverer,
318322

319323
jwtauth.Verifier(jwtauth.New("EdDSA", nil, jwKey)),

cmd/server/pactasrv/BUILD.bazel

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library")
33
go_library(
44
name = "pactasrv",
55
srcs = [
6+
"incomplete_upload.go",
67
"initiative.go",
78
"initiative_invitation.go",
89
"initiative_user_relationship.go",
@@ -25,6 +26,8 @@ go_library(
2526
"//session",
2627
"//task",
2728
"@com_github_go_chi_jwtauth_v5//:jwtauth",
29+
"@com_github_google_uuid//:uuid",
2830
"@org_uber_go_zap//:zap",
31+
"@org_uber_go_zap//zapcore",
2932
],
3033
)

cmd/server/pactasrv/conv/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library")
33
go_library(
44
name = "conv",
55
srcs = [
6+
"helpers.go",
67
"oapi_to_pacta.go",
78
"pacta_to_oapi.go",
89
],

cmd/server/pactasrv/conv/helpers.go

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
package conv
2+
3+
import "time"
4+
5+
func ptr[T any](t T) *T {
6+
return &t
7+
}
8+
9+
func ifNil[T any](t *T, fallback T) T {
10+
if t == nil {
11+
return fallback
12+
}
13+
return *t
14+
}
15+
16+
func timeToNilable(t time.Time) *time.Time {
17+
if t.IsZero() {
18+
return nil
19+
}
20+
return &t
21+
}
22+
23+
func stringToNilable[T ~string](t T) *string {
24+
if t == "" {
25+
return nil
26+
}
27+
s := string(t)
28+
return &s
29+
}
30+
31+
func convAll[I any, O any](is []I, f func(I) (O, error)) ([]O, error) {
32+
os := make([]O, len(is))
33+
for i, v := range is {
34+
o, err := f(v)
35+
if err != nil {
36+
return nil, err
37+
}
38+
os[i] = o
39+
}
40+
return os, nil
41+
}

cmd/server/pactasrv/conv/oapi_to_pacta.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -67,9 +67,11 @@ func InitiativeInvitationFromOAPI(i *api.InitiativeInvitationCreate) (*pacta.Ini
6767
}, nil
6868
}
6969

70-
func ifNil[T any](t *T, fallback T) T {
71-
if t == nil {
72-
return fallback
70+
func HoldingsDateFromOAPI(hd *api.HoldingsDate) (*pacta.HoldingsDate, error) {
71+
if hd == nil {
72+
return nil, nil
7373
}
74-
return *t
74+
return &pacta.HoldingsDate{
75+
Time: hd.Time,
76+
}, nil
7577
}

cmd/server/pactasrv/conv/pacta_to_oapi.go

Lines changed: 57 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"github.com/RMI/pacta/oapierr"
55
api "github.com/RMI/pacta/openapi/pacta"
66
"github.com/RMI/pacta/pacta"
7+
"go.uber.org/zap"
78
)
89

910
func InitiativeToOAPI(i *pacta.Initiative) (*api.Initiative, error) {
@@ -94,6 +95,60 @@ func InitiativeUserRelationshipToOAPI(i *pacta.InitiativeUserRelationship) (*api
9495
}, nil
9596
}
9697

97-
func ptr[T any](t T) *T {
98-
return &t
98+
func HoldingsDateToOAPI(hd *pacta.HoldingsDate) (*api.HoldingsDate, error) {
99+
if hd == nil {
100+
return nil, nil
101+
}
102+
return &api.HoldingsDate{
103+
Time: hd.Time,
104+
}, nil
105+
}
106+
107+
func IncompleteUploadsToOAPI(ius []*pacta.IncompleteUpload) ([]*api.IncompleteUpload, error) {
108+
return convAll(ius, IncompleteUploadToOAPI)
109+
}
110+
111+
func IncompleteUploadToOAPI(iu *pacta.IncompleteUpload) (*api.IncompleteUpload, error) {
112+
if iu == nil {
113+
return nil, oapierr.Internal("incompleteUploadToOAPI: can't convert nil pointer")
114+
}
115+
hd, err := HoldingsDateToOAPI(iu.HoldingsDate)
116+
if err != nil {
117+
return nil, oapierr.Internal("incompleteUploadToOAPI: holdingsDateToOAPI failed", zap.Error(err))
118+
}
119+
return &api.IncompleteUpload{
120+
Id: string(iu.ID),
121+
Name: iu.Name,
122+
Description: iu.Description,
123+
HoldingsDate: hd,
124+
CreatedAt: iu.CreatedAt,
125+
RanAt: timeToNilable(iu.RanAt),
126+
CompletedAt: timeToNilable(iu.CompletedAt),
127+
FailureCode: stringToNilable(iu.FailureCode),
128+
FailureMessage: stringToNilable(iu.FailureMessage),
129+
AdminDebugEnabled: iu.AdminDebugEnabled,
130+
}, nil
131+
}
132+
133+
func PortfoliosToOAPI(ius []*pacta.Portfolio) ([]*api.Portfolio, error) {
134+
return convAll(ius, PortfolioToOAPI)
135+
}
136+
137+
func PortfolioToOAPI(p *pacta.Portfolio) (*api.Portfolio, error) {
138+
if p == nil {
139+
return nil, oapierr.Internal("portfolioToOAPI: can't convert nil pointer")
140+
}
141+
hd, err := HoldingsDateToOAPI(p.HoldingsDate)
142+
if err != nil {
143+
return nil, oapierr.Internal("portfolioToOAPI: holdingsDateToOAPI failed", zap.Error(err))
144+
}
145+
return &api.Portfolio{
146+
Id: string(p.ID),
147+
Name: p.Name,
148+
Description: p.Description,
149+
HoldingsDate: hd,
150+
CreatedAt: p.CreatedAt,
151+
NumberOfRows: p.NumberOfRows,
152+
AdminDebugEnabled: p.AdminDebugEnabled,
153+
}, nil
99154
}

0 commit comments

Comments
 (0)