Skip to content

Commit 029c37d

Browse files
committed
Updates Parse Runner Structure
1 parent 1672438 commit 029c37d

File tree

9 files changed

+86
-65
lines changed

9 files changed

+86
-65
lines changed

azure/azevents/azevents.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -214,6 +214,6 @@ func (s *Server) RegisterHandlers(r chi.Router) {
214214
}
215215

216216
// TODO: Add any database persistence and other things we'd want to do after a portfolio was parsed.
217-
s.logger.Info("parsed portfolio", zap.String("task_id", string(req.Data.TaskID)), zap.Strings("outputs", req.Data.Outputs))
217+
s.logger.Info("parsed portfolio", zap.String("task_id", string(req.Data.TaskID)))
218218
})
219219
}

cmd/runner/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ go_library(
1818
"@com_github_azure_azure_sdk_for_go_sdk_azcore//to",
1919
"@com_github_azure_azure_sdk_for_go_sdk_azidentity//:azidentity",
2020
"@com_github_azure_azure_sdk_for_go_sdk_messaging_azeventgrid//publisher",
21+
"@com_github_google_uuid//:uuid",
2122
"@com_github_namsral_flag//:flag",
2223
"@org_uber_go_zap//:zap",
2324
"@org_uber_go_zap//zapcore",

cmd/runner/README.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,3 +12,14 @@ If you do want to actually run the full `runner` image on Azure, you can use:
1212
# Run the backend, tell it to create tasks as real Azure Container Apps Jobs.
1313
bazel run //scripts:run_apiserver -- --use_azure_runner
1414
```
15+
16+
### Creating a new docker image to run locally
17+
18+
```bash
19+
# Build the runner binary
20+
bazel build --@io_bazel_rules_go//go/config:pure //cmd/runner:image_tarball
21+
# Load the new image into docker, which will output a SHA256 value
22+
docker load < bazel-bin/cmd/runner/image_tarball/tarball.tar
23+
# Tag the runner image in order for it to be picked up locally. Don't push this to the registry!
24+
docker tag <SHA from previous step> rmisa.azurecr.io/runner
25+
```

cmd/runner/configs/local.conf

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
env local
22
min_log_level debug
33

4-
azure_event_parse_portfolio_topic parsed-portfolios-local
4+
azure_event_parse_portfolio_complete_topic parsed-portfolios-local
55
azure_topic_location centralus-1
66

77
azure_storage_account rmipactalocal

cmd/runner/main.go

Lines changed: 55 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
"github.com/RMI/pacta/blob"
2727
"github.com/RMI/pacta/pacta"
2828
"github.com/RMI/pacta/task"
29+
"github.com/google/uuid"
2930
"github.com/namsral/flag"
3031
"go.uber.org/zap"
3132
"go.uber.org/zap/zapcore"
@@ -167,19 +168,15 @@ type handler struct {
167168
}
168169

169170
func parsePortfolioReq() (*task.ParsePortfolioRequest, error) {
170-
rawAssetIDs := os.Getenv("ASSET_IDS")
171-
if rawAssetIDs == "" {
172-
return nil, errors.New("no ASSET_IDS given")
171+
taskStr := os.Getenv("PARSE_PORTFOLIO_REQUEST")
172+
if taskStr == "" {
173+
return nil, errors.New("no PARSE_PORTFOLIO_REQUEST given")
173174
}
174-
175-
var assetIDs []string
176-
if err := json.NewDecoder(strings.NewReader(rawAssetIDs)).Decode(&assetIDs); err != nil {
177-
return nil, fmt.Errorf("failed to load asset IDs: %w", err)
175+
var task task.ParsePortfolioRequest
176+
if err := json.NewDecoder(strings.NewReader(taskStr)).Decode(&task); err != nil {
177+
return nil, fmt.Errorf("failed to load ParsePortfolioRequest: %w", err)
178178
}
179-
180-
return &task.ParsePortfolioRequest{
181-
AssetIDs: assetIDs,
182-
}, nil
179+
return &task, nil
183180
}
184181

185182
func (h *handler) uploadDirectory(ctx context.Context, dirPath, container string) error {
@@ -250,15 +247,15 @@ func (h *handler) downloadBlob(ctx context.Context, srcURI, destPath string) err
250247
return nil
251248
}
252249

250+
// TODO: Send a notification when parsing fails.
253251
func (h *handler) parsePortfolio(ctx context.Context, taskID task.ID, req *task.ParsePortfolioRequest) error {
254252
// Load the portfolio from blob storage, place it in /mnt/raw_portfolios, where
255253
// the `process_portfolios.R` script expects it to be.
256-
for _, assetID := range req.AssetIDs {
257-
srcURI := blob.Join(h.blob.Scheme(), h.sourcePortfolioContainer, assetID)
254+
for _, srcURI := range req.BlobURIs {
255+
id := uuid.New().String()
258256
// TODO: Probably set the CSV extension in the signed upload URL instead.
259-
destPath := filepath.Join("/", "mnt", "raw_portfolios", assetID+".csv")
260-
261-
if err := h.downloadBlob(ctx, srcURI, destPath); err != nil {
257+
destPath := filepath.Join("/", "mnt", "raw_portfolios", fmt.Sprintf("%s.csv", id))
258+
if err := h.downloadBlob(ctx, string(srcURI), destPath); err != nil {
262259
return fmt.Errorf("failed to download raw portfolio blob: %w", err)
263260
}
264261
}
@@ -291,21 +288,38 @@ func (h *handler) parsePortfolio(ctx context.Context, taskID task.ID, req *task.
291288
}
292289

293290
// NOTE: This code could benefit from some concurrency, but I'm opting not to prematurely optimize.
294-
var out []string
291+
var out []*task.ParsePortfolioResponseItem
295292
for _, p := range paths {
296-
destURI := blob.Join(h.blob.Scheme(), h.destPortfolioContainer, filepath.Base(p))
297-
if err := h.uploadBlob(ctx, p, destURI); err != nil {
298-
return fmt.Errorf("failed to copy parsed portfolio from %q to %q: %w", p, destURI, err)
293+
lineCount, err := countLines(p)
294+
if err != nil {
295+
return fmt.Errorf("failed to count lines in file %q: %w", p, err)
296+
}
297+
fileName := filepath.Base(p)
298+
blobURI := pacta.BlobURI(blob.Join(h.blob.Scheme(), h.destPortfolioContainer, fileName))
299+
if err := h.uploadBlob(ctx, p, string(blobURI)); err != nil {
300+
return fmt.Errorf("failed to copy parsed portfolio from %q to %q: %w", p, blobURI, err)
301+
}
302+
extension := filepath.Ext(fileName)
303+
fileType, err := pacta.ParseFileType(extension)
304+
if err != nil {
305+
return fmt.Errorf("failed to parse file type from file name %q: %w", fileName, err)
299306
}
300-
out = append(out, destURI)
307+
out = append(out, &task.ParsePortfolioResponseItem{
308+
Blob: pacta.Blob{
309+
FileName: fileName,
310+
FileType: fileType,
311+
BlobURI: blobURI,
312+
},
313+
LineCount: lineCount,
314+
})
301315
}
302316

303317
events := []publisher.Event{
304318
{
305319
Data: task.ParsePortfolioResponse{
306-
TaskID: taskID,
307-
AssetIDs: req.AssetIDs,
308-
Outputs: out,
320+
TaskID: taskID,
321+
Request: req,
322+
Outputs: out,
309323
},
310324
DataVersion: to.Ptr("1.0"),
311325
EventType: to.Ptr("parse-portfolio-complete"),
@@ -324,6 +338,23 @@ func (h *handler) parsePortfolio(ctx context.Context, taskID task.ID, req *task.
324338
return nil
325339
}
326340

341+
func countLines(path string) (int, error) {
342+
file, err := os.Open(path)
343+
if err != nil {
344+
return 0, fmt.Errorf("opening file failed: %w", err)
345+
}
346+
defer file.Close()
347+
scanner := bufio.NewScanner(file)
348+
lineCount := 0
349+
for scanner.Scan() {
350+
lineCount++
351+
}
352+
if err := scanner.Err(); err != nil {
353+
return 0, fmt.Errorf("scanner.error returned: %w", err)
354+
}
355+
return lineCount, nil
356+
}
357+
327358
func createReportReq() (*task.CreateReportRequest, error) {
328359
pID := os.Getenv("PORTFOLIO_ID")
329360
if pID == "" {

cmd/runner/taskrunner/taskrunner.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -83,18 +83,18 @@ func New(cfg *Config) (*TaskRunner, error) {
8383
}
8484

8585
func (tr *TaskRunner) ParsePortfolio(ctx context.Context, req *task.ParsePortfolioRequest) (task.ID, task.RunnerID, error) {
86-
var buf bytes.Buffer
87-
if err := json.NewEncoder(&buf).Encode(req.AssetIDs); err != nil {
88-
return "", "", fmt.Errorf("failed to encode asset IDs: %w", err)
86+
var taskBuffer bytes.Buffer
87+
if err := json.NewEncoder(&taskBuffer).Encode(req); err != nil {
88+
return "", "", fmt.Errorf("failed to encode ParsePortfolioRequest: %w", err)
8989
}
9090
return tr.run(ctx, []task.EnvVar{
9191
{
9292
Key: "TASK_TYPE",
9393
Value: string(task.ParsePortfolio),
9494
},
9595
{
96-
Key: "ASSET_IDS",
97-
Value: buf.String(),
96+
Key: "PARSE_PORTFOLIO_REQUEST",
97+
Value: taskBuffer.String(),
9898
},
9999
})
100100
}

cmd/server/pactasrv/BUILD.bazel

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ go_library(
2424
"//session",
2525
"//task",
2626
"@com_github_go_chi_jwtauth_v5//:jwtauth",
27-
"@com_github_google_uuid//:uuid",
2827
"@org_uber_go_zap//:zap",
2928
],
3029
)

cmd/server/pactasrv/portfolio.go

Lines changed: 2 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -3,41 +3,16 @@ package pactasrv
33
import (
44
"context"
55

6-
"github.com/RMI/pacta/blob"
76
"github.com/RMI/pacta/oapierr"
87
api "github.com/RMI/pacta/openapi/pacta"
9-
"github.com/RMI/pacta/task"
10-
"github.com/google/uuid"
11-
"go.uber.org/zap"
128
)
139

1410
func (s *Server) CreatePortfolioAsset(ctx context.Context, req api.CreatePortfolioAssetRequestObject) (api.CreatePortfolioAssetResponseObject, error) {
15-
id := uuid.NewString()
16-
uri := blob.Join(s.Blob.Scheme(), s.PorfolioUploadURI, id)
17-
signed, err := s.Blob.SignedUploadURL(ctx, uri)
18-
if err != nil {
19-
return nil, oapierr.Internal("failed to sign blob URI", zap.String("uri", uri), zap.Error(err))
20-
}
21-
return api.CreatePortfolioAsset200JSONResponse{
22-
UploadUrl: signed,
23-
AssetId: id,
24-
}, nil
11+
return nil, oapierr.NotImplemented("no longer implemented")
2512
}
2613

2714
func (s *Server) ParsePortfolio(ctx context.Context, req api.ParsePortfolioRequestObject) (api.ParsePortfolioResponseObject, error) {
28-
taskID, runnerID, err := s.TaskRunner.ParsePortfolio(ctx, &task.ParsePortfolioRequest{
29-
AssetIDs: req.Body.AssetIds,
30-
// PortfolioID: req.Body.PortfolioID,
31-
})
32-
if err != nil {
33-
return nil, oapierr.Internal("failed to start task", zap.Error(err))
34-
}
35-
s.Logger.Info("triggered parse portfolio task",
36-
zap.String("task_id", string(taskID)),
37-
zap.String("task_runner_id", string(runnerID)))
38-
return api.ParsePortfolio200JSONResponse{
39-
TaskId: string(taskID),
40-
}, nil
15+
return nil, oapierr.NotImplemented("no longer implemented")
4116
}
4217

4318
// (GET /portfolios)

task/task.go

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,15 +22,19 @@ const (
2222
)
2323

2424
type ParsePortfolioRequest struct {
25-
// Note: This is temporary just to test the full end-to-end flow. We'll likely
26-
// want to reference assets by the portfolio (group?) they were uploaded to.
27-
AssetIDs []string
25+
IncompleteUploadIDs []pacta.IncompleteUploadID
26+
BlobURIs []pacta.BlobURI
27+
}
28+
29+
type ParsePortfolioResponseItem struct {
30+
Blob pacta.Blob
31+
LineCount int
2832
}
2933

3034
type ParsePortfolioResponse struct {
31-
TaskID ID
32-
AssetIDs []string
33-
Outputs []string
35+
TaskID ID
36+
Request *ParsePortfolioRequest
37+
Outputs []*ParsePortfolioResponseItem
3438
}
3539

3640
type CreateReportRequest struct {

0 commit comments

Comments
 (0)