[GH-1221] import snapshots into a workspace (#347)
RR: https://broadinstitute.atlassian.net/browse/GH-1221
In this PR, I'm adding test code that demonstrates how to import data from a TDR snapshot into a Terra Workspace. This includes:

- Querying all the rows in the specified table in the snapshot
- Selecting and renaming the columns of the table to match those of the target entity
- Importing the rows as new entities into the workspace
The renaming works by viewing the rows of the table as a list of maps, mapping the snapshot's column names onto the names the workspace uses, and then reconstructing the table (see the sketch below).
Added dataset/snapshot querying utilities to the datarepo namespace.
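
A minimal sketch of that renaming step, assuming the rows arrive as keyword-keyed maps (the helper name `rename-columns` and the example mapping are illustrative, not the actual test code):

```clojure
;; Sketch only: rename snapshot column names to the names the workspace expects.
;; `column->attribute` maps snapshot column keys to workspace keys; keys not in
;; the mapping are left unchanged.
(defn rename-columns
  [column->attribute rows]
  (map (fn [row]
         (into {} (map (fn [[k v]] [(get column->attribute k k) v]) row)))
       rows))

(comment
  (rename-columns {:sample_id :sample, :cram_path :cram}
                  [{:sample_id "NA12878" :cram_path "gs://bucket/NA12878.cram"}])
  ;; => ({:sample "NA12878", :cram "gs://bucket/NA12878.cram"})
  )
```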

Bug fixes:

"flattening" the big-query rows didn't flatten rows of arrays
ehigham authored Apr 2, 2021
1 parent 9589dc6 commit e3d500d
Showing 17 changed files with 429 additions and 281 deletions.
4 changes: 2 additions & 2 deletions api/src/wfl/environment.clj
@@ -37,7 +37,7 @@
"WFL_COOKIE_SECRET"
#(-> "secret/dsde/gotc/dev/zero" vault-secrets :cookie_secret)
"WFL_TDR_URL"
#(-> "https://jade.datarepo-dev.broadinstitute.org/")
#(-> "https://jade.datarepo-dev.broadinstitute.org")
"WFL_OAUTH2_CLIENT_ID"
#(-> "secret/dsde/gotc/dev/zero" vault-secrets :oauth2_client_id)
"WFL_POSTGRES_PASSWORD"
@@ -47,7 +47,7 @@
"WFL_POSTGRES_USERNAME"
#(-> nil)
"WFL_FIRECLOUD_URL"
#(-> "https://api.firecloud.org/")
#(-> "https://api.firecloud.org")

;; -- variables used in test code below this line --
"WFL_CROMWELL_URL"
13 changes: 3 additions & 10 deletions api/src/wfl/service/cromwell.clj
@@ -263,13 +263,6 @@
(defn wait-for-workflow-complete
"Return status of workflow named by ID when it completes, given Cromwell URL."
[url id]
(work-around-cromwell-fail-bug 9 url id)
(loop [url url id id]
(let [seconds 15
now (status url id)]
(if (#{"Submitted" "Running"} now)
(do (log/infof "%s: Sleeping %s seconds on status: %s"
id seconds now)
(util/sleep-seconds seconds)
(recur url id))
(status url id)))))
(let [finished? (comp (set final-statuses) #(status url id))]
(work-around-cromwell-fail-bug 9 url id)
(util/poll finished? 15 (Integer/MAX_VALUE))))
167 changes: 93 additions & 74 deletions api/src/wfl/service/datarepo.clj
@@ -1,34 +1,38 @@
(ns wfl.service.datarepo
"Do stuff in the data repo."
(:require [clj-http.client :as http]
[clojure.data.json :as json]
[clojure.string :as str]
[wfl.auth :as auth]
[wfl.environment :as env]
[wfl.mime-type :as mime-type]
[wfl.util :as util])
(:require [clj-http.client :as http]
[clojure.data.json :as json]
[clojure.string :as str]
[wfl.auth :as auth]
[wfl.environment :as env]
[wfl.mime-type :as mime-type]
[wfl.service.google.bigquery :as bigquery]
[wfl.util :as util])
(:import (java.time Instant)
(java.util.concurrent TimeUnit)))

(defn ^:private datarepo-url [& parts]
(let [url (util/slashify (env/getenv "WFL_TDR_URL"))]
(apply str url parts)))
(let [url (util/de-slashify (env/getenv "WFL_TDR_URL"))]
(str/join "/" (cons url parts))))

(def ^:private repository
"API URL for Data Repo API."
(partial datarepo-url "api/repository/v1/"))
(partial datarepo-url "api/repository/v1"))

(defn ^:private get-repository-json [& parts]
(-> (apply repository parts)
(http/get {:headers (auth/get-auth-header)})
util/response-body-json))

(defn dataset
"Query the DataRepo for the Dataset with `dataset-id`."
[dataset-id]
(-> (repository "datasets/" dataset-id)
(http/get {:headers (auth/get-service-account-header)})
util/response-body-json))
(get-repository-json "datasets" dataset-id))

(defn ^:private ingest
"Ingest THING to DATASET-ID according to BODY."
[thing dataset-id body]
(-> (repository (format "datasets/%s/%s" dataset-id thing))
(-> (repository "datasets" dataset-id thing)
(http/post {:content-type :application/json
:headers (auth/get-service-account-header)
:body (json/write-str body :escape-slash false)})
@@ -73,25 +77,24 @@
(ingest
"ingest"
dataset-id
{:format "json"
:load_tag "string"
:max_bad_records 0
:path path
:table table}))
{:format "json"
:load_tag (new-load-tag)
:max_bad_records 0
:path path
:table table}))

(defn poll-job
"Return result for JOB-ID in ENVIRONMENT when it stops running."
[job-id]
(let [get-result #(-> (repository "jobs/" job-id "/result")
(http/get {:headers (auth/get-service-account-header)})
util/response-body-json)
running? #(-> (repository "jobs/" job-id)
(http/get {:headers (auth/get-service-account-header)})
util/response-body-json
:job_status
#{"running"})]
(while (running?) (.sleep TimeUnit/SECONDS 1))
(get-result)))
"Poll the job with `job-id` every `seconds` [default: 5] and return its
result."
([job-id seconds]
(let [result #(get-repository-json "jobs" job-id "result")
running? #(-> (get-repository-json "jobs" job-id)
:job_status
#{"running"})]
(while (running?) (.sleep TimeUnit/SECONDS seconds))
(result)))
([job-id]
(poll-job job-id 5)))

(defn create-dataset
"Create a dataset with EDN `dataset-request` and return the id
@@ -111,26 +114,21 @@
:id))

(defn delete-dataset
"Delete the Dataset with `dataset-id`."
"Delete the dataset with `dataset-id`."
[dataset-id]
(-> (repository "datasets/" dataset-id)
(-> (repository "datasets" dataset-id)
(http/delete {:headers (auth/get-service-account-header)})
util/response-body-json
:id
poll-job))

;; Note the TDR is under active development,
;; the endpoint spec is getting changed so the
;; spec in this function is not consistent with
;; the TDR Swagger page in order to make the
;; request work.
;; Note the TDR is under active development, the endpoint spec is getting
;; changed so the spec in this function is not consistent with the TDR Swagger
;; page in order to make the request work.
;; See also https://cloud.google.com/bigquery/docs/reference/standard-sql/migrating-from-legacy-sql
(defn create-snapshot
"Return snapshot-id when the snapshot defined
by `snapshot-request` is ready.
See `SnapshotRequestModel` in the
DataRepo swagger page for more information.
"Return snapshot-id when the snapshot defined by `snapshot-request` is ready.
See `SnapshotRequestModel` in the DataRepo swagger page for more information.
https://jade.datarepo-dev.broadinstitute.org/swagger-ui.html#/"
[snapshot-request]
(-> (repository "snapshots")
@@ -143,9 +141,8 @@
:id))

(defn list-snapshots
"Return snapshots optionally filtered by source dataset,
where dataset-ids identify the source datasets.
Hard-coded to return 999 pages for now.
"Return snapshots optionally filtered by source dataset, where dataset-ids
identify the source datasets. Hard-coded to return 999 pages for now.
Parameters
----------
@@ -155,7 +152,7 @@
Example
-------
(list-snapshots)
(list-snapshots \"48a51f71-6bab-483d-a270-3f9ebfb241cd\" \"85efdfea-52fb-4698-bee6-eef76104a7f4\")"
(list-snapshots \"48a51f71-6bab-483d-a270-3f9ebfb241cd\")"
[& dataset-ids]
(letfn [(maybe-merge [m k v] (if (seq v) (assoc m k {:datasetIds v}) m))]
(-> (http/get (repository "snapshots")
@@ -167,7 +164,7 @@
(defn delete-snapshot
"Delete the Snapshot with `snapshot-id`."
[snapshot-id]
(-> (repository "snapshots/" snapshot-id)
(-> (repository "snapshots" snapshot-id)
(http/delete {:headers (auth/get-service-account-header)})
util/response-body-json
:id
@@ -176,30 +173,7 @@
(defn snapshot
"Return the snapshot with `snapshot-id`."
[snapshot-id]
(-> (repository "snapshots/" snapshot-id)
(http/get {:headers (auth/get-service-account-header)})
util/response-body-json))

;; Note if there are no matching rows between (start, end], TDR will throw
;; a 400 exception.
;; Note TDR prefixes datasets in BigQuery with `datarepo_`.
(defn make-snapshot-query
"Make row-id query payload from `dataset` and `table`,
given a date range specified by exclusive `start` and inclusive `end`.
Parameters
----------
_dataset - Dataset information response from TDR.
table - Name of the table in the dataset schema to query from.
start - The start date object in the timeframe to query exclusively.
end - The end date object in the timeframe to query inclusively."
[{:keys [name dataProject] :as _dataset} table start end]
(let [dataset-name (str "datarepo_" name)
query (str/join \newline ["SELECT datarepo_row_id"
"FROM `%s.%s.%s`"
"WHERE datarepo_ingest_date > '%s'"
"AND datarepo_ingest_date <= '%s'"])]
(format query dataProject dataset-name table start end)))
(get-repository-json "snapshots" snapshot-id))

(defn all-columns
"Return all of the columns of `table` in `dataset` content."
@@ -212,8 +186,8 @@
;; Note TDR uses snapshot names as unique identifier so the
;; name must be unique among snapshots.
(defn make-snapshot-request
"Return a snapshot request for `row-ids`and `columns`
from `table` name in `_dataset`."
"Return a snapshot request for `row-ids`and `columns` from `table` name
in `_dataset`."
[{:keys [name defaultProfileId description] :as _dataset} columns table row-ids]
(let [row-ids (vec row-ids)]
{:contents [{:datasetName name
@@ -224,3 +198,48 @@
:description description
:name name
:profileId defaultProfileId}))

;; hack - TDR adds the "datarepo_" prefix to the dataset name in BigQuery
;; They plan to expose this name via `GET /api/repository/v1/datasets/{id}`
;; in a future release.
(defn ^:private bigquery-name
"Return the BigQuery name of the `dataset-or-snapshot`."
[{:keys [name] :as dataset-or-snapshot}]
(letfn [(snapshot? [x] (util/absent? x :defaultSnapshotId))]
(if (snapshot? dataset-or-snapshot) name (str "datarepo_" name))))

(defn ^:private query-table-impl
([{:keys [dataProject] :as dataset} table col-spec]
(let [bq-name (bigquery-name dataset)]
(->> (format "SELECT %s FROM `%s.%s.%s`" col-spec dataProject bq-name table)
(bigquery/query-sync dataProject)))))

(defn query-table
"Query everything or optionally the `columns` in `table` in the Terra DataRepo
`dataset`, where `dataset` is a DataRepo dataset or a snapshot of a dataset."
([dataset table]
(query-table-impl dataset table "*"))
([dataset table columns]
(->> (util/to-comma-separated-list (map name columns))
(query-table-impl dataset table))))

(defn ^:private query-table-between-impl
[{:keys [dataProject] :as dataset} table [start end] col-spec]
(let [bq-name (bigquery-name dataset)
query "SELECT %s
FROM `%s.%s.%s`
WHERE datarepo_ingest_date > '%s'
AND datarepo_ingest_date <= '%s'"]
(->> (format query col-spec dataProject bq-name table start end)
(bigquery/query-sync dataProject))))

(defn query-table-between
"Query everything or optionally the `columns` in `table` in the Terra DataRepo
`dataset` in the open-closed `interval` of `datarepo_ingest_date`, where
`dataset` is a DataRepo dataset or a snapshot of a dataset. If no rows match
the `interval`, TDR will respond with error 400."
([dataset table interval]
(query-table-between-impl dataset table interval "*"))
([dataset table interval columns]
(->> (util/to-comma-separated-list (map name columns))
(query-table-between-impl dataset table interval))))
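
A hedged usage sketch of the new querying utilities; the table and column names are illustrative, and the snapshot id is the one from the `list-snapshots` docstring example:

```clojure
(comment
  (let [snapshot (snapshot "48a51f71-6bab-483d-a270-3f9ebfb241cd")]
    ;; every column in the table
    (query-table snapshot "flowcell")
    ;; or only selected columns, e.g. before renaming them for a workspace entity
    (query-table snapshot "flowcell" [:datarepo_row_id :sample_id])
    ;; rows ingested in an open-closed date interval
    (query-table-between snapshot "flowcell" ["2021-03-01" "2021-04-01"]
                         [:datarepo_row_id])))
```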