Skip to content

Commit 2625436

Browse files
authored
Add support for running dashboards (#218)
1 parent 4ee80b0 commit 2625436

32 files changed

+587
-39
lines changed

WORKSPACE

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,3 +117,11 @@ oci_pull(
117117
image = "ghcr.io/rmi-pacta/workflow.portfolio.parsing",
118118
platforms = ["linux/amd64"],
119119
)
120+
121+
oci_pull(
122+
name = "dashboard_base",
123+
# This digest is of the 'main' tag as of 2024-12-19
124+
digest = "sha256:dea705bac105a0847a6070f1914298fb7145273bc4a56b76ba8ec80ce427e269",
125+
image = "ghcr.io/rmi-pacta/workflow.pacta.dashboard",
126+
platforms = ["linux/amd64"],
127+
)

async/async.go

Lines changed: 118 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -246,27 +246,38 @@ type ReportInputPortfolio struct {
246246
Name string `json:"name"`
247247
}
248248

249-
type ReportEnv struct {
249+
// Input payload types for the dashboard preparation script. CreateDashboard
// JSON-encodes a DashboardInput and passes it as the final command-line
// argument to prepare_dashboard_data.R.
type (
	// DashboardInput is the top-level JSON document given to the script.
	DashboardInput struct {
		// Portfolio describes the portfolio to build the dashboard for.
		Portfolio DashboardInputPortfolio `json:"portfolio"`
		// Inherit is serialized under the "inherit" key.
		// NOTE(review): presumably names a base configuration for the
		// script to inherit from (e.g. "GENERAL_2023Q4") - confirm against
		// the workflow.pacta.dashboard documentation.
		Inherit string `json:"inherit"`
	}

	// DashboardInputPortfolio identifies the portfolio files and metadata
	// for a single dashboard run.
	DashboardInputPortfolio struct {
		// Files holds the portfolio file names (not full paths).
		Files []string `json:"files"`
		// HoldingsDate is a date string, e.g. "2023-12-31".
		HoldingsDate string `json:"holdingsDate"`
		// Name is the portfolio's name.
		Name string `json:"name"`
	}
)
259+
260+
// TaskEnv describes the directory layout a single task invocation works in.
type TaskEnv struct {
	// rootDir is the per-invocation temporary directory; paths handed out
	// by pathForDir live underneath it.
	rootDir string

	// Both of these are mounted in from externally, and are exposed to the
	// task as BENCHMARKS_DIR and PACTA_DATA_DIR respectively.
	benchmarksDir, pactaDataDir string
}
256267

257-
func initReportEnv(benchmarkDir, pactaDataDir, baseDir string) (*ReportEnv, error) {
268+
func initEnv(benchmarkDir, pactaDataDir, baseDir, taskName string) (*TaskEnv, error) {
258269
// Make sure the base directory exists first.
259270
if err := os.MkdirAll(baseDir, 0700); err != nil {
260271
return nil, fmt.Errorf("failed to create base input dir: %w", err)
261272
}
262273
// We create temp subdirectories, because while this code currently executes in
263274
// a new container for each invocation, that might not always be the case.
264-
rootDir, err := os.MkdirTemp(baseDir, "create-report")
275+
rootDir, err := os.MkdirTemp(baseDir, taskName)
265276
if err != nil {
266277
return nil, fmt.Errorf("failed to create temp dir for input CSVs: %w", err)
267278
}
268279

269-
re := &ReportEnv{
280+
re := &TaskEnv{
270281
rootDir: rootDir,
271282
benchmarksDir: benchmarkDir,
272283
pactaDataDir: pactaDataDir,
@@ -293,15 +304,15 @@ const (
293304
SummaryOutputDir = ReportDir("summary-output")
294305
)
295306

296-
func (r *ReportEnv) outputDirs() []string {
307+
func (r *TaskEnv) outputDirs() []string {
297308
return []string{
298309
r.pathForDir(AnalysisOutputDir),
299310
r.pathForDir(ReportOutputDir),
300311
r.pathForDir(SummaryOutputDir),
301312
}
302313
}
303314

304-
func (r *ReportEnv) asEnvVars() []string {
315+
func (r *TaskEnv) asEnvVars() []string {
305316
return []string{
306317
"BENCHMARKS_DIR=" + r.benchmarksDir,
307318
"PACTA_DATA_DIR=" + r.pactaDataDir,
@@ -315,11 +326,11 @@ func (r *ReportEnv) asEnvVars() []string {
315326
}
316327
}
317328

318-
func (r *ReportEnv) pathForDir(d ReportDir) string {
329+
// pathForDir returns the location of directory d underneath the
// environment's root directory.
func (r *TaskEnv) pathForDir(d ReportDir) string {
	sub := string(d)
	return filepath.Join(r.rootDir, sub)
}
321332

322-
func (r *ReportEnv) makeDirectories() error {
333+
func (r *TaskEnv) makeDirectories() error {
323334
var rErr error
324335
makeDir := func(reportDir ReportDir) {
325336
if rErr != nil {
@@ -349,6 +360,104 @@ func (r *ReportEnv) makeDirectories() error {
349360
return nil
350361
}
351362

363+
func (h *Handler) CreateDashboard(ctx context.Context, taskID task.ID, req *task.CreateDashboardRequest, dashboardContainer string) error {
364+
if n := len(req.BlobURIs); n != 1 {
365+
return fmt.Errorf("expected exactly one blob URI as input, got %d", n)
366+
}
367+
blobURI := req.BlobURIs[0]
368+
369+
// We use this instead of /mnt/... because the base image (quite
370+
// reasonably) uses a non-root user, so we can't be creating directories in the
371+
// root filesystem all willy nilly.
372+
baseDir := filepath.Join("/", "home", "workflow-pacta-webapp")
373+
374+
dashEnv, err := initEnv(h.benchmarkDir, h.pactaDataDir, baseDir, "create-dashboard")
375+
if err != nil {
376+
return fmt.Errorf("failed to init report env: %w", err)
377+
}
378+
379+
// Load the parsed portfolio from blob storage, place it in our PORFOLIO_DIR,
380+
// where the `prepare_dashboard_data.R` script expects it to be.
381+
fileNameWithExt := filepath.Base(string(blobURI))
382+
if !strings.HasSuffix(fileNameWithExt, ".csv") {
383+
return fmt.Errorf("given blob wasn't a CSV-formatted portfolio, %q", fileNameWithExt)
384+
}
385+
destPath := filepath.Join(dashEnv.pathForDir(PortfoliosDir), fileNameWithExt)
386+
if err := h.downloadBlob(ctx, string(blobURI), destPath); err != nil {
387+
return fmt.Errorf("failed to download processed portfolio blob: %w", err)
388+
}
389+
390+
inp := DashboardInput{
391+
Portfolio: DashboardInputPortfolio{
392+
Files: []string{fileNameWithExt},
393+
HoldingsDate: "2023-12-31", // TODO(#206)
394+
Name: "FooPortfolio", // TODO(#206)
395+
},
396+
Inherit: "GENERAL_2023Q4", // TODO(#206): Should this be configurable
397+
}
398+
399+
var inpJSON bytes.Buffer
400+
if err := json.NewEncoder(&inpJSON).Encode(inp); err != nil {
401+
return fmt.Errorf("failed to encode report input as JSON: %w", err)
402+
}
403+
404+
cmd := exec.CommandContext(ctx,
405+
"/usr/local/bin/Rscript",
406+
"--vanilla", "/workflow.pacta.dashboard/inst/extdata/scripts/prepare_dashboard_data.R",
407+
inpJSON.String())
408+
409+
cmd.Env = append(cmd.Env, dashEnv.asEnvVars()...)
410+
cmd.Env = append(cmd.Env,
411+
"LOG_LEVEL=DEBUG",
412+
"HOME=/root", /* Required by pandoc */
413+
)
414+
cmd.Stdout = os.Stdout
415+
cmd.Stderr = os.Stderr
416+
417+
if err := cmd.Run(); err != nil {
418+
return fmt.Errorf("failed to run pacta dashboard script: %w", err)
419+
}
420+
421+
var artifacts []*task.AnalysisArtifact
422+
uploadDir := func(dir string) error {
423+
aas, err := h.uploadDirectory(ctx, dir, dashboardContainer, req.AnalysisID)
424+
if err != nil {
425+
return fmt.Errorf("failed to upload report directory: %w", err)
426+
}
427+
artifacts = append(artifacts, aas...)
428+
return nil
429+
}
430+
431+
for _, outDir := range dashEnv.outputDirs() {
432+
if err := uploadDir(outDir); err != nil {
433+
return fmt.Errorf("failed to upload artifacts %q: %w", outDir, err)
434+
}
435+
}
436+
437+
events := []publisher.Event{
438+
{
439+
Data: task.CreateDashboardResponse{
440+
TaskID: taskID,
441+
Request: req,
442+
Artifacts: artifacts,
443+
},
444+
DataVersion: to.Ptr("1.0"),
445+
EventType: to.Ptr("created-dashboard"),
446+
EventTime: to.Ptr(time.Now()),
447+
ID: to.Ptr(string(taskID)),
448+
Subject: to.Ptr(string(taskID)),
449+
},
450+
}
451+
452+
if _, err := h.pubsub.PublishEvents(ctx, events, nil); err != nil {
453+
return fmt.Errorf("failed to publish event: %w", err)
454+
}
455+
456+
h.logger.Info("created report", zap.String("task_id", string(taskID)))
457+
458+
return nil
459+
}
460+
352461
func (h *Handler) CreateReport(ctx context.Context, taskID task.ID, req *task.CreateReportRequest, reportContainer string) error {
353462
if n := len(req.BlobURIs); n != 1 {
354463
return fmt.Errorf("expected exactly one blob URI as input, got %d", n)
@@ -360,7 +469,7 @@ func (h *Handler) CreateReport(ctx context.Context, taskID task.ID, req *task.Cr
360469
// root filesystem all willy nilly.
361470
baseDir := filepath.Join("/", "home", "workflow-pacta-webapp")
362471

363-
reportEnv, err := initReportEnv(h.benchmarkDir, h.pactaDataDir, baseDir)
472+
reportEnv, err := initEnv(h.benchmarkDir, h.pactaDataDir, baseDir, "create-report")
364473
if err != nil {
365474
return fmt.Errorf("failed to init report env: %w", err)
366475
}

async/req.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,10 @@ func LoadParsePortfolioRequestFromEnv() (*task.ParsePortfolioRequest, error) {
1414
return loadFromEnv[task.ParsePortfolioRequest]("PARSE_PORTFOLIO_REQUEST", "ParsePortfolioRequest")
1515
}
1616

17+
func LoadCreateDashboardRequestFromEnv() (*task.CreateDashboardRequest, error) {
18+
return loadFromEnv[task.CreateDashboardRequest]("CREATE_DASHBOARD_REQUEST", "CreateDashboardRequest")
19+
}
20+
1721
// LoadCreateAuditRequestFromEnv loads a task.CreateAuditRequest by delegating
// to loadFromEnv with the CREATE_AUDIT_REQUEST key.
func LoadCreateAuditRequestFromEnv() (*task.CreateAuditRequest, error) {
	const (
		envVar = "CREATE_AUDIT_REQUEST"
		// Human-readable name of the request type.
		// NOTE(review): presumably used by loadFromEnv in error messages - confirm.
		typeName = "CreateAuditRequest"
	)
	return loadFromEnv[task.CreateAuditRequest](envVar, typeName)
}

azure/azevents/azevents.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -276,6 +276,14 @@ func (s *Server) handleEventGrid(w http.ResponseWriter, r *http.Request) {
276276
return
277277
}
278278
s.handleCreatedReport(req.ID, &resp, w)
279+
case "created-dashboard":
280+
var resp task.CreateDashboardResponse
281+
if err := json.Unmarshal(req.Data, &resp); err != nil {
282+
s.logger.Error("failed to parse event data as CreateDashboardResponse", zap.String("event_grid_id", req.ID), zap.Error(err))
283+
http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
284+
return
285+
}
286+
s.handleCreatedDashboard(req.ID, &resp, w)
279287
default:
280288
s.logger.Error("unexpected event type", zap.String("event_grid_id", req.ID), zap.String("event_type", req.EventType))
281289
http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
@@ -400,6 +408,15 @@ func (s *Server) handleCreatedReport(id string, resp *task.CreateReportResponse,
400408
w)
401409
}
402410

411+
func (s *Server) handleCreatedDashboard(id string, resp *task.CreateDashboardResponse, w http.ResponseWriter) {
412+
s.handleCompletedAnalysis(
413+
pacta.AnalysisType_Dashboard,
414+
resp.Request.AnalysisID,
415+
id,
416+
resp.Artifacts,
417+
w)
418+
}
419+
403420
func (s *Server) handleCompletedAnalysis(
404421
analysisType pacta.AnalysisType,
405422
analysisID pacta.AnalysisID,

cmd/dashboard/BUILD.bazel

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
load("@io_bazel_rules_go//go:def.bzl", "go_binary", "go_library")
2+
load("@rules_pkg//:pkg.bzl", "pkg_tar")
3+
load("@rules_oci//oci:defs.bzl", "oci_image", "oci_push", "oci_tarball")
4+
5+
go_library(
6+
name = "dashboard_lib",
7+
srcs = ["main.go"],
8+
importpath = "github.com/RMI/pacta/cmd/dashboard",
9+
visibility = ["//visibility:private"],
10+
deps = [
11+
"//async",
12+
"//azure/azblob",
13+
"//azure/azcreds",
14+
"//azure/azlog",
15+
"//task",
16+
"@com_github_azure_azure_sdk_for_go_sdk_azidentity//:azidentity",
17+
"@com_github_azure_azure_sdk_for_go_sdk_messaging_azeventgrid//publisher",
18+
"@com_github_namsral_flag//:flag",
19+
"@org_uber_go_zap//:zap",
20+
"@org_uber_go_zap//zapcore",
21+
"@org_uber_go_zap_exp//zapfield",
22+
],
23+
)
24+
25+
go_binary(
26+
name = "dashboard",
27+
embed = [":dashboard_lib"],
28+
visibility = ["//visibility:public"],
29+
)
30+
31+
pkg_tar(
32+
name = "dashboard_tar",
33+
srcs = [":dashboard"],
34+
)
35+
36+
filegroup(
37+
name = "configs",
38+
srcs = glob(["configs/**"]),
39+
visibility = ["//visibility:public"],
40+
)
41+
42+
pkg_tar(
43+
name = "configs_tar",
44+
srcs = [":configs"],
45+
package_dir = "/configs",
46+
strip_prefix = "/cmd/dashboard/configs",
47+
)
48+
49+
oci_image(
50+
name = "image",
51+
base = "@dashboard_base",
52+
entrypoint = ["/dashboard"],
53+
tars = [
54+
":dashboard_tar",
55+
":configs_tar",
56+
],
57+
)
58+
59+
oci_push(
60+
name = "push_image",
61+
image = ":image",
62+
remote_tags = ["latest"],
63+
repository = "rmisppactaweupatdev.azurecr.io/pactadashboard",
64+
)
65+
66+
# Note: This tarball is provided for local testing of the Docker image, see the README.md for details on usage.
67+
oci_tarball(
68+
name = "image_tarball",
69+
image = ":image",
70+
repo_tags = [],
71+
)

cmd/dashboard/README.md

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
# Dashboard
2+
3+
This directory contains the `dashboard` binary, which acts as a thin shim around the PACTA [`workflow.pacta.dashboard` tooling](https://github.com/RMI-PACTA/workflow.pacta.dashboard), running tasks created via either Azure Container App Jobs (via the `aztask` package) or local Docker (`dockertask`), loading relevant blobs, and writing relevant outputs.
4+
5+
## Running locally
6+
7+
The `dashboard` binary doesn't need to be run locally in order to test PACTA processing. By default, the backend API server will trigger PACTA dashboard creation runs against a local Docker daemon, testing most of the run-handling code in the process (e.g. file handling, task execution, etc.).
8+
9+
If you do want to actually run the full `dashboard` image on Azure, you can use:
10+
11+
```bash
12+
# Run the backend, tell it to create tasks as real Azure Container Apps Jobs.
13+
bazel run //scripts:run_server -- --use_azure_runner
14+
```
15+
16+
### Creating a new Docker image to run locally
17+
18+
When developing the runner, you have two options:
19+
20+
* **Test against local Docker** - Run the server **without** the `--use_azure_runner` flag, which means async tasks will run locally, using `docker run ...`. To test local runner changes, you can build and tag a runner image locally with `bazel run //scripts:build_and_load_dashboard`.
21+
* After running the script, the updated runner will immediately be available, no need to restart the server.
22+
* This is the option you'll want to use most of the time.
23+
* **Test against Azure Container Apps Jobs** - Run the server **with** the `--use_azure_runner` flag, which means async tasks will be run on Azure, created via the Azure API. To test changes here, you can build and tag a runner image locally with `bazel run //scripts:build_and_load_dashboard`, and then push it to Azure with `docker push rmisa.azurecr.io/pactadashboard:latest`.
24+
* You generally won't need to use this option unless you're testing something very specific about the runner's integration with Azure, as the runner code is identical whether run locally or on Azure.
25+
26+
### Cleaning up old dashboard containers
27+
28+
By default, we don't auto-remove stopped containers (i.e. finished dashboard tasks), to give developers a chance to review the logs (e.g. with `docker logs <sha>`). To clean up all completed runs at once, run:
29+
30+
```bash
31+
docker rm $(docker ps -a -q -f "status=exited" -f "ancestor=rmisa.azurecr.io/pactadashboard:latest")
32+
```

cmd/dashboard/configs/dev.conf

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
env dev
2+
min_log_level warn
3+
4+
azure_event_topic pacta-events-dev
5+
azure_topic_location centralus-1
6+
7+
azure_storage_account rmipactadev
8+
azure_dest_portfolio_container parsedportfolios

cmd/dashboard/configs/local.conf

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
env local
2+
min_log_level debug
3+
4+
azure_event_topic pacta-events-local
5+
azure_topic_location centralus-1
6+
7+
azure_storage_account rmipactalocal
8+
azure_dest_portfolio_container parsedportfolios

cmd/dashboard/configs/test.conf

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
env test
2+
min_log_level warn
3+
4+
azure_event_topic pacta-events-test
5+
azure_topic_location westeurope-1
6+
7+
azure_storage_account rmipactatest
8+
azure_dest_portfolio_container parsedportfolios
9+

0 commit comments

Comments
 (0)