Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
gbdubs committed Nov 20, 2023
1 parent 1672438 commit 6505758
Show file tree
Hide file tree
Showing 62 changed files with 2,429 additions and 182 deletions.
24 changes: 24 additions & 0 deletions azure/azblob/azblob.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,30 @@ func (c *Client) ReadBlob(ctx context.Context, uri string) (io.ReadCloser, error
return resp.Body, nil
}

// DeleteBlob deletes the single blob addressed by uri.
//
// The URI is split into a container and blob name with blob.SplitURI using
// this package's Scheme; a URI that cannot be split is rejected without
// contacting the storage service. Any error from the underlying client's
// DeleteBlob call is wrapped and returned.
func (c *Client) DeleteBlob(ctx context.Context, uri string) error {
	container, name, valid := blob.SplitURI(Scheme, uri)
	if !valid {
		return fmt.Errorf("malformed URI %q is not for Azure", uri)
	}

	if _, err := c.client.DeleteBlob(ctx, container, name, nil); err != nil {
		return fmt.Errorf("failed to delete blob: %w", err)
	}
	return nil
}

// DeleteBlobs deletes the given blobs one at a time, in slice order, by
// calling DeleteBlob for each URI.
//
// It stops at the first failure and returns that error unchanged, so blobs
// later in the slice are left untouched when an earlier delete fails. A nil
// or empty slice is a no-op.
func (c *Client) DeleteBlobs(ctx context.Context, uris []string) error {
	// TODO: Implement parallel delete if slow.
	for i := 0; i < len(uris); i++ {
		err := c.DeleteBlob(ctx, uris[i])
		if err != nil {
			return err
		}
	}
	return nil
}

// SignedUploadURL returns a URL that is allowed to upload to the given URI.
// See https://pkg.go.dev/github.com/Azure/azure-sdk-for-go/sdk/storage/azblob@v1.0.0/sas#example-package-UserDelegationSAS
func (c *Client) SignedUploadURL(ctx context.Context, uri string) (string, error) {
Expand Down
2 changes: 2 additions & 0 deletions azure/azevents/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ go_library(
importpath = "github.com/RMI/pacta/azure/azevents",
visibility = ["//visibility:public"],
deps = [
"//db",
"//pacta",
"//task",
"@com_github_go_chi_chi_v5//:chi",
"@org_uber_go_zap//:zap",
Expand Down
114 changes: 112 additions & 2 deletions azure/azevents/azevents.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package azevents

import (
"context"
"encoding/json"
"errors"
"fmt"
Expand All @@ -12,13 +13,17 @@ import (
"strings"
"time"

"github.com/RMI/pacta/db"
"github.com/RMI/pacta/pacta"
"github.com/RMI/pacta/task"
"github.com/go-chi/chi/v5"
"go.uber.org/zap"
)

type Config struct {
Logger *zap.Logger
DB DB
Now func() time.Time

Subscription string
ResourceGroup string
Expand All @@ -31,6 +36,16 @@ type Config struct {
ParsedPortfolioTopicName string
}

// DB is the set of database operations the Server uses to persist the outcome
// of a parsed-portfolio event. It is supplied through Config.DB.
type DB interface {
// Transactional invokes the given function with a transaction handle.
// NOTE(review): presumably the transaction is committed when the function
// returns nil and rolled back otherwise — confirm against the implementation.
Transactional(context.Context, func(tx db.Tx) error) error

// CreateBlob stores a record for b and returns the ID assigned to it.
CreateBlob(tx db.Tx, b *pacta.Blob) (pacta.BlobID, error)
// CreatePortfolio stores a record for p and returns the ID assigned to it.
CreatePortfolio(tx db.Tx, p *pacta.Portfolio) (pacta.PortfolioID, error)

// IncompleteUploads looks up the incomplete uploads with the given IDs and
// returns them keyed by ID.
IncompleteUploads(tx db.Tx, ids []pacta.IncompleteUploadID) (map[pacta.IncompleteUploadID]*pacta.IncompleteUpload, error)
// UpdateIncompleteUpload applies the given mutations to the incomplete
// upload identified by id.
UpdateIncompleteUpload(tx db.Tx, id pacta.IncompleteUploadID, mutations ...db.UpdateIncompleteUploadFn) error
}

const parsedPortfolioPath = "/events/parsed_portfolio"

func (c *Config) validate() error {
Expand All @@ -49,6 +64,12 @@ func (c *Config) validate() error {
if c.ParsedPortfolioTopicName == "" {
return errors.New("no parsed portfolio topic name given")
}
if c.DB == nil {
return errors.New("no DB was given")
}
if c.Now == nil {
return errors.New("no Now function was given")
}
return nil
}

Expand All @@ -61,6 +82,8 @@ type Server struct {
subscription string
resourceGroup string
pathToTopic map[string]string
db DB
now func() time.Time
}

func NewServer(cfg *Config) (*Server, error) {
Expand All @@ -73,6 +96,8 @@ func NewServer(cfg *Config) (*Server, error) {
allowedAuthSecrets: cfg.AllowedAuthSecrets,
subscription: cfg.Subscription,
resourceGroup: cfg.ResourceGroup,
db: cfg.DB,
now: cfg.Now,
pathToTopic: map[string]string{
parsedPortfolioPath: cfg.ParsedPortfolioTopicName,
},
Expand Down Expand Up @@ -206,14 +231,99 @@ func (s *Server) RegisterHandlers(r chi.Router) {
return
}
req := reqs[0]
s.logger.Info("starting to process portfolio task", zap.String("task_id", string(req.Data.TaskID)))

if req.Data == nil {
s.logger.Error("webhook response had no payload", zap.String("event_grid_id", req.ID))
http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
return
}

// TODO: Add any database persistence and other things we'd want to do after a portfolio was parsed.
s.logger.Info("parsed portfolio", zap.String("task_id", string(req.Data.TaskID)), zap.Strings("outputs", req.Data.Outputs))
if len(req.Data.Outputs) == 0 {
s.logger.Error("webhook response had no processed portfolios", zap.String("event_grid_id", req.ID))
http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
return
}

portfolioIDs := []pacta.PortfolioID{}
var ranAt time.Time
now := s.now()
err := s.db.Transactional(r.Context(), func(tx db.Tx) error {
incompleteUploads, err := s.db.IncompleteUploads(tx, req.Data.Request.IncompleteUploadIDs)
if err != nil {
return fmt.Errorf("reading incomplete uploads: %w", err)
}
if len(incompleteUploads) == 0 {
return fmt.Errorf("no incomplete uploads found for ids: %v", req.Data.Request.IncompleteUploadIDs)
}
var holdingsDate *pacta.HoldingsDate
var ownerID pacta.OwnerID
for _, iu := range incompleteUploads {
if ownerID == "" {
ownerID = iu.Owner.ID
} else if ownerID != iu.Owner.ID {
return fmt.Errorf("multiple owners found for incomplete uploads: %+v", incompleteUploads)
}
if iu.HoldingsDate == nil {
return fmt.Errorf("incomplete upload %s had no holdings date", iu.ID)
}
if holdingsDate == nil {
holdingsDate = iu.HoldingsDate
} else if *holdingsDate != *iu.HoldingsDate {
return fmt.Errorf("multiple holdings dates found for incomplete uploads: %+v", incompleteUploads)
}
ranAt = iu.RanAt
}
for i, output := range req.Data.Outputs {
blobID, err := s.db.CreateBlob(tx, &output.Blob)
if err != nil {
return fmt.Errorf("creating blob %d: %w", i, err)
}
portfolioID, err := s.db.CreatePortfolio(tx, &pacta.Portfolio{
Owner: &pacta.Owner{ID: ownerID},
Name: output.Blob.FileName,
NumberOfRows: output.LineCount,
Blob: &pacta.Blob{ID: blobID},
HoldingsDate: holdingsDate,
})
if err != nil {
return fmt.Errorf("creating portfolio %d: %w", i, err)
}
portfolioIDs = append(portfolioIDs, portfolioID)
}
for i, iu := range incompleteUploads {
err := s.db.UpdateIncompleteUpload(
tx,
iu.ID,
db.SetIncompleteUploadCompletedAt(now),
db.SetIncompleteUploadFailureMessage(""),
db.SetIncompleteUploadFailureCode(""))
if err != nil {
return fmt.Errorf("updating incomplete upload %d: %w", i, err)
}
}
return nil
})
if err != nil {
s.logger.Error("failed to save response to database", zap.Error(err))
http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
return
}

s.logger.Info("parsed portfolio",
zap.String("task_id", string(req.Data.TaskID)),
zap.Duration("run_time", now.Sub(ranAt)),
zap.Strings("incomplete_upload_ids", asStrs(req.Data.Request.IncompleteUploadIDs)),
zap.Int("incomplete_upload_count", len(req.Data.Request.IncompleteUploadIDs)),
zap.Strings("portfolio_ids", asStrs(portfolioIDs)),
zap.Int("portfolio_count", len(portfolioIDs)))
})
}

// asStrs converts a slice of any string-based type (for example an ID type
// declared as `type X string`) into a plain []string, preserving order.
//
// The result is always a freshly allocated, non-nil slice, even when ts is
// nil or empty.
func asStrs[T ~string](ts []T) []string {
	out := make([]string, 0, len(ts))
	for _, t := range ts {
		out = append(out, string(t))
	}
	return out
}
1 change: 1 addition & 0 deletions cmd/runner/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ go_library(
"@com_github_azure_azure_sdk_for_go_sdk_azcore//to",
"@com_github_azure_azure_sdk_for_go_sdk_azidentity//:azidentity",
"@com_github_azure_azure_sdk_for_go_sdk_messaging_azeventgrid//publisher",
"@com_github_google_uuid//:uuid",
"@com_github_namsral_flag//:flag",
"@org_uber_go_zap//:zap",
"@org_uber_go_zap//zapcore",
Expand Down
7 changes: 7 additions & 0 deletions cmd/runner/README.md2
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@

bazel build --@io_bazel_rules_go//go/config:pure //cmd/runner:image_tarball;
docker load < bazel-bin/cmd/runner/image_tarball/tarball.tar;
docker tag <that SHA> rmisa.azurecr.io/runner


RUN server with --with_public_endpoint
2 changes: 1 addition & 1 deletion cmd/runner/configs/local.conf
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
env local
min_log_level debug

azure_event_parse_portfolio_topic parsed-portfolios-local
azure_event_parse_portfolio_complete_topic parsed-portfolios-local
azure_topic_location centralus-1

azure_storage_account rmipactalocal
Expand Down
81 changes: 58 additions & 23 deletions cmd/runner/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/RMI/pacta/blob"
"github.com/RMI/pacta/pacta"
"github.com/RMI/pacta/task"
"github.com/google/uuid"
"github.com/namsral/flag"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
Expand Down Expand Up @@ -167,19 +168,15 @@ type handler struct {
}

func parsePortfolioReq() (*task.ParsePortfolioRequest, error) {
rawAssetIDs := os.Getenv("ASSET_IDS")
if rawAssetIDs == "" {
return nil, errors.New("no ASSET_IDS given")
taskStr := os.Getenv("PARSE_PORTFOLIO_REQUEST")
if taskStr == "" {
return nil, errors.New("no PARSE_PORTFOLIO_REQUEST given")
}

var assetIDs []string
if err := json.NewDecoder(strings.NewReader(rawAssetIDs)).Decode(&assetIDs); err != nil {
return nil, fmt.Errorf("failed to load asset IDs: %w", err)
var task task.ParsePortfolioRequest
if err := json.NewDecoder(strings.NewReader(taskStr)).Decode(&task); err != nil {
return nil, fmt.Errorf("failed to load ParsePortfolioRequest: %w", err)
}

return &task.ParsePortfolioRequest{
AssetIDs: assetIDs,
}, nil
return &task, nil
}

func (h *handler) uploadDirectory(ctx context.Context, dirPath, container string) error {
Expand Down Expand Up @@ -250,15 +247,17 @@ func (h *handler) downloadBlob(ctx context.Context, srcURI, destPath string) err
return nil
}

// TODO: Send a notification when parsing fails.
func (h *handler) parsePortfolio(ctx context.Context, taskID task.ID, req *task.ParsePortfolioRequest) error {
h.logger.Info("cmd/runner/parsePortfolio-start")
// Load the portfolio from blob storage, place it in /mnt/raw_portfolios, where
// the `process_portfolios.R` script expects it to be.
for _, assetID := range req.AssetIDs {
srcURI := blob.Join(h.blob.Scheme(), h.sourcePortfolioContainer, assetID)
for _, srcURI := range req.BlobURIs {
id := uuid.New().String()
// TODO: Probably set the CSV extension in the signed upload URL instead.
destPath := filepath.Join("/", "mnt", "raw_portfolios", assetID+".csv")
destPath := filepath.Join("/", "mnt", "raw_portfolios", fmt.Sprintf("%s.csv", id))

if err := h.downloadBlob(ctx, srcURI, destPath); err != nil {
if err := h.downloadBlob(ctx, string(srcURI), destPath); err != nil {
return fmt.Errorf("failed to download raw portfolio blob: %w", err)
}
}
Expand Down Expand Up @@ -291,21 +290,38 @@ func (h *handler) parsePortfolio(ctx context.Context, taskID task.ID, req *task.
}

// NOTE: This code could benefit from some concurrency, but I'm opting not to prematurely optimize.
var out []string
var out []*task.ParsePortfolioResponseItem
for _, p := range paths {
destURI := blob.Join(h.blob.Scheme(), h.destPortfolioContainer, filepath.Base(p))
if err := h.uploadBlob(ctx, p, destURI); err != nil {
return fmt.Errorf("failed to copy parsed portfolio from %q to %q: %w", p, destURI, err)
lineCount, err := countLines(p)
if err != nil {
return fmt.Errorf("failed to count lines in file %q: %w", p, err)
}
out = append(out, destURI)
fileName := filepath.Base(p)
blobURI := pacta.BlobURI(blob.Join(h.blob.Scheme(), h.destPortfolioContainer, fileName))
if err := h.uploadBlob(ctx, p, string(blobURI)); err != nil {
return fmt.Errorf("failed to copy parsed portfolio from %q to %q: %w", p, blobURI, err)
}
extension := filepath.Ext(fileName)
fileType, err := pacta.ParseFileType(extension)
if err != nil {
return fmt.Errorf("failed to parse file type from file name %q: %w", fileName, err)
}
out = append(out, &task.ParsePortfolioResponseItem{
Blob: pacta.Blob{
FileName: fileName,
FileType: fileType,
BlobURI: blobURI,
},
LineCount: lineCount,
})
}

events := []publisher.Event{
{
Data: task.ParsePortfolioResponse{
TaskID: taskID,
AssetIDs: req.AssetIDs,
Outputs: out,
TaskID: taskID,
Request: req,
Outputs: out,
},
DataVersion: to.Ptr("1.0"),
EventType: to.Ptr("parse-portfolio-complete"),
Expand All @@ -315,6 +331,8 @@ func (h *handler) parsePortfolio(ctx context.Context, taskID task.ID, req *task.
},
}

h.logger.Info("cmd/runner/parsePortfolio-prepub")

if _, err := h.pubsub.PublishEvents(ctx, events, nil); err != nil {
return fmt.Errorf("failed to publish event: %w", err)
}
Expand All @@ -324,6 +342,23 @@ func (h *handler) parsePortfolio(ctx context.Context, taskID task.ID, req *task.
return nil
}

// countLines returns the number of lines in the file at path.
//
// Lines are delimited by '\n'. A final line without a trailing newline is
// counted, and an empty file has zero lines. An error is returned if the file
// cannot be opened or read, or if a single line exceeds maxLineBytes.
func countLines(path string) (int, error) {
	// bufio.Scanner rejects any line longer than bufio.MaxScanTokenSize
	// (64 KiB) with bufio.ErrTooLong unless a larger limit is configured.
	// One unusually wide row should not fail the whole count, so allow
	// lines up to this size. The scanner's buffer only grows on demand, so
	// ordinary files never allocate anywhere near the cap.
	const maxLineBytes = 16 << 20

	file, err := os.Open(path)
	if err != nil {
		return 0, fmt.Errorf("opening file failed: %w", err)
	}
	defer file.Close()

	scanner := bufio.NewScanner(file)
	scanner.Buffer(make([]byte, 0, bufio.MaxScanTokenSize), maxLineBytes)

	lineCount := 0
	for scanner.Scan() {
		lineCount++
	}
	if err := scanner.Err(); err != nil {
		return 0, fmt.Errorf("scanner.error returned: %w", err)
	}
	return lineCount, nil
}

func createReportReq() (*task.CreateReportRequest, error) {
pID := os.Getenv("PORTFOLIO_ID")
if pID == "" {
Expand Down
Loading

0 comments on commit 6505758

Please sign in to comment.