Skip to content

Commit

Permalink
Updates Parse Runner Structure (#70)
Browse files Browse the repository at this point in the history
  • Loading branch information
gbdubs authored Dec 6, 2023

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
1 parent 46b31cf commit 9e88ed4
Showing 9 changed files with 89 additions and 63 deletions.
2 changes: 1 addition & 1 deletion azure/azevents/azevents.go
Original file line number Diff line number Diff line change
@@ -214,6 +214,6 @@ func (s *Server) RegisterHandlers(r chi.Router) {
}

// TODO: Add any database persistence and other things we'd want to do after a portfolio was parsed.
s.logger.Info("parsed portfolio", zap.String("task_id", string(req.Data.TaskID)), zap.Strings("outputs", req.Data.Outputs))
s.logger.Info("parsed portfolio", zap.String("task_id", string(req.Data.TaskID)))
})
}
1 change: 1 addition & 0 deletions cmd/runner/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -18,6 +18,7 @@ go_library(
"@com_github_azure_azure_sdk_for_go_sdk_azcore//to",
"@com_github_azure_azure_sdk_for_go_sdk_azidentity//:azidentity",
"@com_github_azure_azure_sdk_for_go_sdk_messaging_azeventgrid//publisher",
"@com_github_google_uuid//:uuid",
"@com_github_namsral_flag//:flag",
"@org_uber_go_zap//:zap",
"@org_uber_go_zap//zapcore",
11 changes: 11 additions & 0 deletions cmd/runner/README.md
Original file line number Diff line number Diff line change
@@ -12,3 +12,14 @@ If you do want to actually run the full `runner` image on Azure, you can use:
# Run the backend, tell it to create tasks as real Azure Container Apps Jobs.
bazel run //scripts:run_apiserver -- --use_azure_runner
```

### Creating a new docker image to run locally

```bash
# Build the runner image tarball
bazel build --@io_bazel_rules_go//go/config:pure //cmd/runner:image_tarball
# Load the new image into docker, which will output a SHA256 value
docker load < bazel-bin/cmd/runner/image_tarball/tarball.tar
# Tag the runner image in order for it to be picked up locally. Don't push this to the registry!
docker tag <SHA from previous step> rmisa.azurecr.io/runner
```
2 changes: 1 addition & 1 deletion cmd/runner/configs/local.conf
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
env local
min_log_level debug

azure_event_parse_portfolio_topic parsed-portfolios-local
azure_event_parse_portfolio_complete_topic parsed-portfolios-local
azure_topic_location centralus-1

azure_storage_account rmipactalocal
80 changes: 56 additions & 24 deletions cmd/runner/main.go
Original file line number Diff line number Diff line change
@@ -26,6 +26,7 @@ import (
"github.com/RMI/pacta/blob"
"github.com/RMI/pacta/pacta"
"github.com/RMI/pacta/task"
"github.com/google/uuid"
"github.com/namsral/flag"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
@@ -167,19 +168,15 @@ type handler struct {
}

func parsePortfolioReq() (*task.ParsePortfolioRequest, error) {
rawAssetIDs := os.Getenv("ASSET_IDS")
if rawAssetIDs == "" {
return nil, errors.New("no ASSET_IDS given")
taskStr := os.Getenv("PARSE_PORTFOLIO_REQUEST")
if taskStr == "" {
return nil, errors.New("no PARSE_PORTFOLIO_REQUEST given")
}

var assetIDs []string
if err := json.NewDecoder(strings.NewReader(rawAssetIDs)).Decode(&assetIDs); err != nil {
return nil, fmt.Errorf("failed to load asset IDs: %w", err)
var task task.ParsePortfolioRequest
if err := json.NewDecoder(strings.NewReader(taskStr)).Decode(&task); err != nil {
return nil, fmt.Errorf("failed to load ParsePortfolioRequest: %w", err)
}

return &task.ParsePortfolioRequest{
AssetIDs: assetIDs,
}, nil
return &task, nil
}

func (h *handler) uploadDirectory(ctx context.Context, dirPath, container string) error {
@@ -250,15 +247,15 @@ func (h *handler) downloadBlob(ctx context.Context, srcURI, destPath string) err
return nil
}

// TODO: Send a notification when parsing fails.
func (h *handler) parsePortfolio(ctx context.Context, taskID task.ID, req *task.ParsePortfolioRequest) error {
// Load the portfolio from blob storage, place it in /mnt/raw_portfolios, where
// the `process_portfolios.R` script expects it to be.
for _, assetID := range req.AssetIDs {
srcURI := blob.Join(h.blob.Scheme(), h.sourcePortfolioContainer, assetID)
for _, srcURI := range req.BlobURIs {
id := uuid.New().String()
// TODO: Probably set the CSV extension in the signed upload URL instead.
destPath := filepath.Join("/", "mnt", "raw_portfolios", assetID+".csv")

if err := h.downloadBlob(ctx, srcURI, destPath); err != nil {
destPath := filepath.Join("/", "mnt", "raw_portfolios", fmt.Sprintf("%s.csv", id))
if err := h.downloadBlob(ctx, string(srcURI), destPath); err != nil {
return fmt.Errorf("failed to download raw portfolio blob: %w", err)
}
}
@@ -291,21 +288,38 @@ func (h *handler) parsePortfolio(ctx context.Context, taskID task.ID, req *task.
}

// NOTE: This code could benefit from some concurrency, but I'm opting not to prematurely optimize.
var out []string
var out []*task.ParsePortfolioResponseItem
for _, p := range paths {
destURI := blob.Join(h.blob.Scheme(), h.destPortfolioContainer, filepath.Base(p))
if err := h.uploadBlob(ctx, p, destURI); err != nil {
return fmt.Errorf("failed to copy parsed portfolio from %q to %q: %w", p, destURI, err)
lineCount, err := countCSVLines(p)
if err != nil {
return fmt.Errorf("failed to count lines in file %q: %w", p, err)
}
fileName := filepath.Base(p)
blobURI := pacta.BlobURI(blob.Join(h.blob.Scheme(), h.destPortfolioContainer, fileName))
if err := h.uploadBlob(ctx, p, string(blobURI)); err != nil {
return fmt.Errorf("failed to copy parsed portfolio from %q to %q: %w", p, blobURI, err)
}
extension := filepath.Ext(fileName)
fileType, err := pacta.ParseFileType(extension)
if err != nil {
return fmt.Errorf("failed to parse file type from file name %q: %w", fileName, err)
}
out = append(out, destURI)
out = append(out, &task.ParsePortfolioResponseItem{
Blob: pacta.Blob{
FileName: fileName,
FileType: fileType,
BlobURI: blobURI,
},
LineCount: lineCount,
})
}

events := []publisher.Event{
{
Data: task.ParsePortfolioResponse{
TaskID: taskID,
AssetIDs: req.AssetIDs,
Outputs: out,
TaskID: taskID,
Request: req,
Outputs: out,
},
DataVersion: to.Ptr("1.0"),
EventType: to.Ptr("parse-portfolio-complete"),
@@ -324,6 +338,24 @@ func (h *handler) parsePortfolio(ctx context.Context, taskID task.ID, req *task.
return nil
}

// countCSVLines returns the number of data rows in the CSV file at path,
// i.e. the number of physical lines minus one for the header row.
//
// An empty file (no header at all) yields 0 rather than a negative count.
// Lines are counted physically, so a quoted field containing an embedded
// newline is counted as more than one line.
//
// It returns an error if the file cannot be opened or read, or if a single
// line exceeds maxLineBytes.
func countCSVLines(path string) (int, error) {
	// bufio.Scanner rejects tokens longer than 64 KiB by default, which a wide
	// CSV row can exceed; allow the buffer to grow up to this cap instead.
	const maxLineBytes = 16 * 1024 * 1024

	file, err := os.Open(path)
	if err != nil {
		return 0, fmt.Errorf("opening file failed: %w", err)
	}
	defer file.Close()

	scanner := bufio.NewScanner(file)
	scanner.Buffer(make([]byte, 0, bufio.MaxScanTokenSize), maxLineBytes)

	lineCount := 0
	for scanner.Scan() {
		lineCount++
	}
	if err := scanner.Err(); err != nil {
		return 0, fmt.Errorf("scanner.error returned: %w", err)
	}

	// With no lines there is no header to subtract; never report a negative count.
	if lineCount == 0 {
		return 0, nil
	}
	// Subtract 1 for the header row.
	return lineCount - 1, nil
}

func createReportReq() (*task.CreateReportRequest, error) {
pID := os.Getenv("PORTFOLIO_ID")
if pID == "" {
10 changes: 7 additions & 3 deletions cmd/runner/taskrunner/taskrunner.go
Original file line number Diff line number Diff line change
@@ -84,16 +84,20 @@ func New(cfg *Config) (*TaskRunner, error) {

func (tr *TaskRunner) ParsePortfolio(ctx context.Context, req *task.ParsePortfolioRequest) (task.ID, task.RunnerID, error) {
var buf bytes.Buffer
if err := json.NewEncoder(&buf).Encode(req.AssetIDs); err != nil {
return "", "", fmt.Errorf("failed to encode asset IDs: %w", err)
if err := json.NewEncoder(&buf).Encode(req); err != nil {
return "", "", fmt.Errorf("failed to encode ParsePortfolioRequest: %w", err)
}
value := buf.String()
if len(value) > 128*1024 {
return "", "", fmt.Errorf("ParsePortfolioRequest is too large: %d bytes > 128 kb", len(value))
}
return tr.run(ctx, []task.EnvVar{
{
Key: "TASK_TYPE",
Value: string(task.ParsePortfolio),
},
{
Key: "ASSET_IDS",
Key: "PARSE_PORTFOLIO_REQUEST",
Value: buf.String(),
},
})
1 change: 0 additions & 1 deletion cmd/server/pactasrv/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -25,7 +25,6 @@ go_library(
"//session",
"//task",
"@com_github_go_chi_jwtauth_v5//:jwtauth",
"@com_github_google_uuid//:uuid",
"@org_uber_go_zap//:zap",
],
)
29 changes: 2 additions & 27 deletions cmd/server/pactasrv/portfolio.go
Original file line number Diff line number Diff line change
@@ -3,41 +3,16 @@ package pactasrv
import (
"context"

"github.com/RMI/pacta/blob"
"github.com/RMI/pacta/oapierr"
api "github.com/RMI/pacta/openapi/pacta"
"github.com/RMI/pacta/task"
"github.com/google/uuid"
"go.uber.org/zap"
)

func (s *Server) CreatePortfolioAsset(ctx context.Context, req api.CreatePortfolioAssetRequestObject) (api.CreatePortfolioAssetResponseObject, error) {
id := uuid.NewString()
uri := blob.Join(s.Blob.Scheme(), s.PorfolioUploadURI, id)
signed, err := s.Blob.SignedUploadURL(ctx, uri)
if err != nil {
return nil, oapierr.Internal("failed to sign blob URI", zap.String("uri", uri), zap.Error(err))
}
return api.CreatePortfolioAsset200JSONResponse{
UploadUrl: signed,
AssetId: id,
}, nil
return nil, oapierr.NotImplemented("no longer implemented")
}

func (s *Server) ParsePortfolio(ctx context.Context, req api.ParsePortfolioRequestObject) (api.ParsePortfolioResponseObject, error) {
taskID, runnerID, err := s.TaskRunner.ParsePortfolio(ctx, &task.ParsePortfolioRequest{
AssetIDs: req.Body.AssetIds,
// PortfolioID: req.Body.PortfolioID,
})
if err != nil {
return nil, oapierr.Internal("failed to start task", zap.Error(err))
}
s.Logger.Info("triggered parse portfolio task",
zap.String("task_id", string(taskID)),
zap.String("task_runner_id", string(runnerID)))
return api.ParsePortfolio200JSONResponse{
TaskId: string(taskID),
}, nil
return nil, oapierr.NotImplemented("no longer implemented")
}

// (GET /portfolios)
16 changes: 10 additions & 6 deletions task/task.go
Original file line number Diff line number Diff line change
@@ -22,15 +22,19 @@ const (
)

type ParsePortfolioRequest struct {
// Note: This is temporary just to test the full end-to-end flow. We'll likely
// want to reference assets by the portfolio (group?) they were uploaded to.
AssetIDs []string
IncompleteUploadIDs []pacta.IncompleteUploadID
BlobURIs []pacta.BlobURI
}

type ParsePortfolioResponseItem struct {
Blob pacta.Blob
LineCount int
}

type ParsePortfolioResponse struct {
TaskID ID
AssetIDs []string
Outputs []string
TaskID ID
Request *ParsePortfolioRequest
Outputs []*ParsePortfolioResponseItem
}

type CreateReportRequest struct {

0 comments on commit 9e88ed4

Please sign in to comment.