Skip to content

Commit

Permalink
Add support for audits (#220)
Browse files Browse the repository at this point in the history
  • Loading branch information
bcspragu authored Jan 15, 2025

Unverified

This commit is not signed, but one or more authors require that any commit attributed to them be signed.
1 parent 104beb1 commit 6c542ef
Showing 10 changed files with 164 additions and 20 deletions.
4 changes: 2 additions & 2 deletions WORKSPACE
Original file line number Diff line number Diff line change
@@ -104,8 +104,8 @@ oci_pull(

oci_pull(
name = "runner_base",
# This digest is of the nightly/main tag as of 2024-12-06
digest = "sha256:35c8eaac721350ca6ef3bfb3e6080c5412ddb9061299c62bb4fd2fc6df8d0227",
# This digest is of the nightly/main tag as of 2024-12-28
digest = "sha256:b1bcfaf52c418faabcd6b6fcbbec0f27f543a10b4a9384d75811d03dbba0d4d7",
image = "ghcr.io/rmi-pacta/workflow.pacta.webapp",
platforms = ["linux/amd64"],
)
107 changes: 104 additions & 3 deletions async/async.go
Original file line number Diff line number Diff line change
@@ -86,7 +86,6 @@ func New(cfg *Config) (*Handler, error) {

// TODO: Send a notification when parsing fails.
func (h *Handler) ParsePortfolio(ctx context.Context, taskID task.ID, req *task.ParsePortfolioRequest, destPortfolioContainer string) error {

// Make the directories we require first. We use these instead of
// /mnt/{input,output} because the base image (quite reasonably) uses a non-root
// user, so we can't be creating directories in the root filesystem all willy
@@ -231,8 +230,99 @@ func (h *Handler) ParsePortfolio(ctx context.Context, taskID task.ID, req *task.
return nil
}

func (h *Handler) CreateAudit(ctx context.Context, taskID task.ID, req *task.CreateAuditRequest) error {
return errors.New("not implemented")
// CreateAudit runs the PACTA audit script against a single parsed portfolio
// and publishes the results.
//
// It downloads the one CSV blob named in req.BlobURIs into the audit
// environment's portfolio directory, invokes run_audit.sh with a JSON-encoded
// AuditInput as its only argument, uploads every output directory to
// auditContainer, and finally publishes a "created-audit" event carrying the
// uploaded artifacts.
//
// It returns an error if req does not contain exactly one blob URI, if that
// blob's name does not end in ".csv", or if any of the download, script,
// upload, or publish steps fail.
func (h *Handler) CreateAudit(ctx context.Context, taskID task.ID, req *task.CreateAuditRequest, auditContainer string) error {
	if n := len(req.BlobURIs); n != 1 {
		return fmt.Errorf("expected exactly one blob URI as input, got %d", n)
	}
	blobURI := req.BlobURIs[0]

	// We use this instead of /mnt/... because the base image (quite
	// reasonably) uses a non-root user, so we can't be creating directories in
	// the root filesystem all willy nilly.
	baseDir := filepath.Join("/", "home", "workflow-pacta-webapp")

	// We don't use the benchmark or PACTA data here, it's just convenient to
	// use the same directory-creating harness for everything.
	auditEnv, err := initEnv(h.benchmarkDir, h.pactaDataDir, baseDir, "create-audit")
	if err != nil {
		return fmt.Errorf("failed to init audit env: %w", err)
	}

	// Load the parsed portfolio from blob storage and place it in the
	// portfolios directory of the audit environment, so the audit script can
	// find it by the file name we pass in AuditInput.
	fileNameWithExt := filepath.Base(string(blobURI))
	if !strings.HasSuffix(fileNameWithExt, ".csv") {
		return fmt.Errorf("given blob wasn't a CSV-formatted portfolio, %q", fileNameWithExt)
	}
	destPath := filepath.Join(auditEnv.pathForDir(PortfoliosDir), fileNameWithExt)
	if err := h.downloadBlob(ctx, string(blobURI), destPath); err != nil {
		return fmt.Errorf("failed to download processed portfolio blob: %w", err)
	}

	inp := AuditInput{
		Portfolio: AuditInputPortfolio{
			Files:        []string{fileNameWithExt},
			HoldingsDate: "2023-12-31",   // TODO(#206)
			Name:         "FooPortfolio", // TODO(#206)
		},
		Inherit: "GENERAL_2023Q4", // TODO(#206): Should this be configurable
	}

	// The script takes its parameters as a single JSON document on the command
	// line. Note that Encode terminates the document with a newline.
	var inpJSON bytes.Buffer
	if err := json.NewEncoder(&inpJSON).Encode(inp); err != nil {
		return fmt.Errorf("failed to encode audit input as JSON: %w", err)
	}

	cmd := exec.CommandContext(ctx, "/workflow.pacta.webapp/inst/extdata/scripts/run_audit.sh", inpJSON.String())
	// cmd.Env starts out nil, so the script sees only the variables set here,
	// not the runner's own environment.
	cmd.Env = append(cmd.Env, auditEnv.asEnvVars()...)
	cmd.Env = append(cmd.Env,
		"LOG_LEVEL=DEBUG",
		"HOME=/root", /* Required by pandoc */
	)
	cmd.Stdout = os.Stdout
	cmd.Stderr = os.Stderr

	if err := cmd.Run(); err != nil {
		return fmt.Errorf("failed to run audit script: %w", err)
	}

	// Upload everything the script produced, collecting the resulting
	// artifacts so they can be reported in the completion event.
	var artifacts []*task.AnalysisArtifact
	for _, outDir := range auditEnv.outputDirs() {
		aas, err := h.uploadDirectory(ctx, outDir, auditContainer, req.AnalysisID)
		if err != nil {
			return fmt.Errorf("failed to upload audit artifacts from %q: %w", outDir, err)
		}
		artifacts = append(artifacts, aas...)
	}

	events := []publisher.Event{
		{
			Data: task.CreateAuditResponse{
				TaskID:    taskID,
				Request:   req,
				Artifacts: artifacts,
			},
			DataVersion: to.Ptr("1.0"),
			EventType:   to.Ptr("created-audit"),
			EventTime:   to.Ptr(time.Now()),
			ID:          to.Ptr(string(taskID)),
			Subject:     to.Ptr(string(taskID)),
		},
	}

	if _, err := h.pubsub.PublishEvents(ctx, events, nil); err != nil {
		return fmt.Errorf("failed to publish event: %w", err)
	}

	h.logger.Info("created audit", zap.String("task_id", string(taskID)))

	return nil
}

type ReportInput struct {
@@ -246,6 +336,17 @@ type ReportInputPortfolio struct {
Name string `json:"name"`
}

// AuditInput is the parameter document for an audit run. CreateAudit
// JSON-encodes it and passes the result as the sole command-line argument to
// run_audit.sh.
type AuditInput struct {
// Portfolio describes the portfolio file(s) to audit.
Portfolio AuditInputPortfolio `json:"portfolio"`
// Inherit is currently hard-coded by CreateAudit to "GENERAL_2023Q4".
// NOTE(review): presumably names a base parameter set the script inherits
// defaults from — confirm against the workflow.pacta.webapp docs.
Inherit string `json:"inherit"`
}

// AuditInputPortfolio is the "portfolio" section of an AuditInput.
type AuditInputPortfolio struct {
// Files lists portfolio file names. CreateAudit fills this with the base
// name (not the full path) of the CSV it downloaded into the portfolios
// directory.
Files []string `json:"files"`
// HoldingsDate is currently hard-coded by CreateAudit to "2023-12-31".
// NOTE(review): presumably a YYYY-MM-DD date — confirm expected format.
HoldingsDate string `json:"holdingsDate"`
// Name is the portfolio's name; currently a placeholder value set by
// CreateAudit.
Name string `json:"name"`
}

type DashboardInput struct {
Portfolio DashboardInputPortfolio `json:"portfolio"`
Inherit string `json:"inherit"`
1 change: 1 addition & 0 deletions cmd/runner/configs/dev.conf
Original file line number Diff line number Diff line change
@@ -6,3 +6,4 @@ azure_topic_location centralus-1

azure_storage_account rmipactadev
azure_report_container reports
azure_audit_container audits
1 change: 1 addition & 0 deletions cmd/runner/configs/local.conf
Original file line number Diff line number Diff line change
@@ -6,6 +6,7 @@ azure_topic_location centralus-1

azure_storage_account rmipactalocal
azure_report_container reports
azure_audit_container audits

benchmark_dir /mnt/workflow-data/benchmarks/2023Q4_20240529T002355Z
pacta_data_dir /mnt/workflow-data/pacta-data/2023Q4_20240424T120055Z
1 change: 1 addition & 0 deletions cmd/runner/configs/test.conf
Original file line number Diff line number Diff line change
@@ -6,3 +6,4 @@ azure_topic_location westeurope-1

azure_storage_account rmipactatest
azure_report_container reports
azure_audit_container audits
5 changes: 4 additions & 1 deletion cmd/runner/main.go
Original file line number Diff line number Diff line change
@@ -45,6 +45,7 @@ func run(args []string) error {

azStorageAccount = fs.String("azure_storage_account", "", "The storage account to authenticate against for blob operations")
azReportContainer = fs.String("azure_report_container", "", "The container in the storage account where we write generated portfolio reports to")
azAuditContainer = fs.String("azure_audit_container", "", "The container in the storage account where we write generated portfolio audits to")

minLogLevel zapcore.Level = zapcore.DebugLevel
)
@@ -97,7 +98,9 @@ func run(args []string) error {
task.CreateReport: toRunFn(async.LoadCreateReportRequestFromEnv, func(ctx context.Context, id task.ID, req *task.CreateReportRequest) error {
return h.CreateReport(ctx, id, req, *azReportContainer)
}),
task.CreateAudit: toRunFn(async.LoadCreateAuditRequestFromEnv, h.CreateAudit),
task.CreateAudit: toRunFn(async.LoadCreateAuditRequestFromEnv, func(ctx context.Context, id task.ID, req *task.CreateAuditRequest) error {
return h.CreateAudit(ctx, id, req, *azAuditContainer)
}),
}

taskID := task.ID(os.Getenv("TASK_ID"))
47 changes: 33 additions & 14 deletions frontend/components/analysis/AccessButtons.vue
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
<script setup lang="ts">
import { type Analysis, type AccessBlobContentReqItem, type AccessBlobContentResp } from '@/openapi/generated/pacta'
import { type Analysis, type AccessBlobContentReqItem, type AccessBlobContentResp, AnalysisType } from '@/openapi/generated/pacta'
import JSZip from 'jszip'
const { t } = useI18n()
@@ -33,28 +33,45 @@ const canAccessAsOwner = computed(() => {
const canAccess = computed(() => {
return canAccessAsPublic.value || canAccessAsAdmin.value || canAccessAsOwner.value
})
// True once the analysis has a completion timestamp and no failure message.
const isCompleted = computed(() => {
  const current = props.analysis
  if (current.completedAt === undefined) {
    return false
  }
  return current.failureMessage === undefined
})
// True for completed analyses of a type that can be opened in the browser
// (dashboards and reports).
const isViewable = computed(() => {
  if (!isCompleted.value) {
    return false
  }
  switch (props.analysis.analysisType) {
    case AnalysisType.ANALYSIS_TYPE_DASHBOARD:
    case AnalysisType.ANALYSIS_TYPE_REPORT:
      return true
    default:
      return false
  }
})
const downloadInProgress = useState<boolean>(`${statePrefix}.downloadInProgress`, () => false)
const doDownload = async () => {
downloadInProgress.value = true
// Just download the audit_file.csv for audits
const artifactsToDownload = props.analysis.analysisType === AnalysisType.ANALYSIS_TYPE_AUDIT ? props.analysis.artifacts.filter((a) => a.blob.fileName === 'audit_file.csv') : props.analysis.artifacts
const response: AccessBlobContentResp = await pactaClient.accessBlobContent({
items: props.analysis.artifacts.map((asset): AccessBlobContentReqItem => ({
items: artifactsToDownload.map((asset): AccessBlobContentReqItem => ({
blobId: asset.blob.id,
})),
})
const zip = new JSZip()
await Promise.all(response.items.map(
async (item): Promise<void> => {
const response = await fetch(item.downloadUrl)
const data = await response.blob()
const blob = presentOrFileBug(props.analysis.artifacts.find((artifact) => artifact.blob.id === item.blobId)).blob
const fileName = `${blob.fileName}`
zip.file(fileName, data)
}),
)
const content = await zip.generateAsync({ type: 'blob' })
let content: Blob | null = null
let fileName: string | null = null
if (response.items.length === 1) {
const resp = await fetch(response.items[0].downloadUrl)
content = await resp.blob()
fileName = artifactsToDownload[0].blob.fileName
} else {
const zip = new JSZip()
await Promise.all(response.items.map(
async (item): Promise<void> => {
const response = await fetch(item.downloadUrl)
const data = await response.blob()
const blob = presentOrFileBug(props.analysis.artifacts.find((artifact) => artifact.blob.id === item.blobId)).blob
const fileName = `${blob.fileName}`
zip.file(fileName, data)
}),
)
content = await zip.generateAsync({ type: 'blob' })
fileName = `${props.analysis.name}.zip`
}
const element = document.createElement('a')
element.href = URL.createObjectURL(content)
const fileName = `${props.analysis.name}.zip`
element.download = fileName
document.body.appendChild(element)
element.click()
@@ -76,13 +93,15 @@ const openReport = () => navigateTo(`${apiServerURL}/report/${props.analysis.id}
class="flex gap-1 align-items-center w-fit"
>
<PVButton
v-if="isViewable"
icon="pi pi-external-link"
:disabled="!canAccess"
class="p-button-secondary p-button-outlined p-button-xs"
:label="tt('View')"
@click="openReport"
/>
<PVButton
v-if="isCompleted"
v-tooltip="canAccess ? tt('Download') : ''"
:disabled="downloadInProgress || !canAccess"
:loading="downloadInProgress"
7 changes: 7 additions & 0 deletions frontend/components/portfolio/ListView.vue
Original file line number Diff line number Diff line change
@@ -377,6 +377,13 @@ const auditLogURL = (id: string) => {
{{ slotProps.data.analyses.filter((a: Analysis) => a.analysisType === AnalysisType.ANALYSIS_TYPE_AUDIT).length }}
{{ tt('Audits') }}
</PVInlineMessage>
<PVInlineMessage
severity="success"
icon="pi pi-copy"
>
{{ slotProps.data.analyses.filter((a: Analysis) => a.analysisType === AnalysisType.ANALYSIS_TYPE_DASHBOARD).length }}
{{ tt('Dashboards') }}
</PVInlineMessage>
</div>
</CommonAccordionHeader>
</template>
2 changes: 2 additions & 0 deletions frontend/lang/en.json
Original file line number Diff line number Diff line change
@@ -208,6 +208,7 @@
"Details": "Details",
"Groups": "Groups",
"Reports": "Reports",
"Dashboards": "Dashboards",
"Discard Changes": "Discard Changes",
"InitiativesHelpText": "Initiatives allow you to contribute your data for bulk analysis as part of a regulatory or other project. Adding a portfolio to an initiative enables the initiative owner to run analysis over your data as part of bulk runs, and enables them to download your data.",
"Audits": "Audits",
@@ -259,6 +260,7 @@
},
"components/analysis/RunButton": {
"Run": "Run",
"Running": "Running",
"AnalysisTypeAUDIT": "Audit",
"AnalysisTypeREPORT": "Report",
"AnalysisTypeDASHBOARD": "Dashboard (Beta)",
9 changes: 9 additions & 0 deletions taskrunner/taskrunner.go
Original file line number Diff line number Diff line change
@@ -157,6 +157,15 @@ func (tr *TaskRunner) CreateAudit(ctx context.Context, req *task.CreateAuditRequ
Key: "CREATE_AUDIT_REQUEST",
Value: value,
},
// TODO(brandon): Unhardcode these
{
Key: "BENCHMARK_DIR",
Value: "/mnt/benchmark-data/65c1a416721b22a98c7925999ae03bc4",
},
{
Key: "PACTA_DATA_DIR",
Value: "/mnt/pacta-data/2023Q4_20240718T150252Z",
},
})
}

0 comments on commit 6c542ef

Please sign in to comment.