DISCLAIMER: The predictions made by this application are presented for the purpose of demonstrating how to integrate machine learning into a FHIR application, and are not medical advice. All data in this demo is generated and not based on any patient health data.
- Create a new project on Google Cloud.
- Enable the Cloud Healthcare API.
- Enable Cloud AI Platform.
- Enable Cloud Functions.
- Launch the Google Cloud Shell.
You will now convert a Synthea dataset to TensorFlow (TF) records for training a new model. Remember, this is sample data and a sample modeling exercise. Do not use this data or model in live clinical settings.
Set required shell variables used throughout this lab:
PROJECT_ID=`gcloud config get-value project`
DATASET_BUCKET=synthea-fhir-demo-dataset
BUCKET=${PROJECT_ID?}-data
REGION=us-central1
If you get a warning message from the previous command that says you do not currently have a project selected, use gcloud to change the active project. You can see your project ID below the username and password in this lab.
gcloud config set project PROJECT_ID
Cloud Storage is a blob storage service on Google Cloud. You will use Cloud Storage to persist generated synthetic data for this lab. A "bucket" is a container for data within Cloud Storage controlled by the access rules of the parent Google Cloud project.
Create a new Cloud Storage bucket to hold data for this lab using gsutil. gsutil is a command-line utility for interacting with Cloud Storage; it has operations that work similarly to common local file system commands.
gsutil mb -c regional -l ${REGION?} gs://${BUCKET?}
TensorFlow is an open-source machine learning framework. A TensorFlow record is an efficient data format for passing the features of a problem domain to TensorFlow for training.
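To make the format concrete, here is a minimal sketch (in Python, assuming TensorFlow 2.x) of how a single training example could be serialized into a TFRecord file. The feature names used here (age, weight, is_smoker, has_lung_cancer) are illustrative assumptions, not necessarily the names used by the lab's scripts.

import tensorflow as tf

# One hypothetical patient, expressed as a tf.train.Example.
# Feature names and values are for illustration only.
example = tf.train.Example(features=tf.train.Features(feature={
    "age": tf.train.Feature(float_list=tf.train.FloatList(value=[34.0])),
    "weight": tf.train.Feature(float_list=tf.train.FloatList(value=[71.0])),
    "is_smoker": tf.train.Feature(int64_list=tf.train.Int64List(value=[0])),
    "has_lung_cancer": tf.train.Feature(int64_list=tf.train.Int64List(value=[0])),
}))

# Serialize the example and append it to a TFRecord file.
with tf.io.TFRecordWriter("training.tfrecord") as writer:
    writer.write(example.SerializeToString())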
You will now convert Patient
bundles to
TensorFlow records. A Python script called assemble_training_data.py
has been provided within the cloned repository. This script performs
several steps required before a TensorFlow model can be trained.
- Feature extraction: inspect each Patient bundle to determine whether the Patient was ever diagnosed with lung cancer. This information comes from a Condition with a SNOMED code of 254637007, 424132000, or 162573006. The other features are patient weight, smoking status, and age (see the sketch after this list).
- Data conversion: Once the features are extracted, they are combined into TensorFlow records.
- Dataset splitting: The dataset must be split into a training set and an evaluation set. We use 80% of the dataset for training.
- Upload: TensorFlow records generated by this script will be written to the Cloud Storage bucket created previously.
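As a rough illustration of the feature-extraction step, the sketch below scans a Patient bundle for a lung-cancer Condition. It is a simplified stand-in for the logic in assemble_training_data.py; the function name is hypothetical and error handling is omitted.

# Simplified sketch of the lung-cancer label check described above.
# assemble_training_data.py also extracts weight, smoking status, and age.
LUNG_CANCER_CODES = {"254637007", "424132000", "162573006"}

def has_lung_cancer(bundle):
    """Return True if any Condition in the FHIR bundle carries a lung-cancer SNOMED code."""
    for entry in bundle.get("entry", []):
        resource = entry.get("resource", {})
        if resource.get("resourceType") != "Condition":
            continue
        for coding in resource.get("code", {}).get("coding", []):
            if coding.get("code") in LUNG_CANCER_CODES:
                return True
    return False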
python3 -m scripts.assemble_training_data \
--src_bucket=${DATASET_BUCKET?} \
--src_folder=synthea \
--dst_bucket=${BUCKET?} \
--dst_folder=tfrecords
Verify that the TensorFlow records were generated correctly by listing the contents of the destination folder on Cloud Storage:
gsutil ls gs://${BUCKET?}/tfrecords
You will now train a TensorFlow model using the newly created TensorFlow records.
A Python script called model.py
has been provided within the cloned
repository. This script will perform model training and export a
serialized snapshot of the model to your Cloud Storage bucket. This
script accepts as input several hyperparameters for model training.
To train a TensorFlow model from the data on Cloud Storage, invoke model.py, passing in the paths to the training and evaluation data created in the last step, as well as some other training hyperparameters. (Production models should adjust such parameters based on the characteristics of the source data and performance requirements.) For the purposes of this lab, a simple model is defined that should only take a few seconds to train on a single CPU:
python3 -m models.trainer.model \
--training_data=gs://${BUCKET?}/tfrecords/training.tfrecord \
--eval_data=gs://${BUCKET?}/tfrecords/eval.tfrecord \
--model_dir=gs://${BUCKET?}/model \
--training_steps=3000 \
--eval_steps=1000 \
--learning_rate=0.1 \
--export_model_dir=gs://${BUCKET?}/saved_model
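For orientation, here is a minimal sketch of the kind of input pipeline and model such a trainer might define. It is not the provided models/trainer/model.py; the feature schema, TensorFlow 2.x Keras style, and paths are assumptions for illustration only.

import tensorflow as tf

# Assumed feature schema; the real trainer defines its own.
FEATURES = {
    "age": tf.io.FixedLenFeature([], tf.float32),
    "weight": tf.io.FixedLenFeature([], tf.float32),
    "is_smoker": tf.io.FixedLenFeature([], tf.int64),
    "has_lung_cancer": tf.io.FixedLenFeature([], tf.int64),
}

def parse(record):
    # Decode one serialized tf.train.Example into (features, label).
    ex = tf.io.parse_single_example(record, FEATURES)
    label = ex.pop("has_lung_cancer")
    feats = tf.stack([ex["age"], ex["weight"], tf.cast(ex["is_smoker"], tf.float32)])
    return feats, label

train_ds = (tf.data.TFRecordDataset("gs://YOUR_BUCKET/tfrecords/training.tfrecord")
            .map(parse).shuffle(1024).batch(32))

model = tf.keras.Sequential([
    tf.keras.layers.Dense(16, activation="relu", input_shape=(3,)),
    tf.keras.layers.Dense(1, activation="sigmoid"),
])
model.compile(optimizer=tf.keras.optimizers.SGD(learning_rate=0.1),
              loss="binary_crossentropy", metrics=["accuracy"])
model.fit(train_ds, epochs=5)

# Export a timestamped SavedModel directory, the layout used in the next step.
model.save("gs://YOUR_BUCKET/saved_model/1558719881")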
The newly trained model is now available in your Cloud Storage bucket
within a directory labeled saved_model
and a subdirectory labeled with
a timestamp. Find the timestamp using gsutil
and save it as an
environment variable.
TIMESTAMP=`gsutil ls gs://${BUCKET?}/saved_model/ | grep -oE '[0-9]+'`
AI Platform is a Google Cloud product that can host TensorFlow models. It exposes a REST interface for your model that accepts the same inputs the model was trained on, this time as JSON, and returns a prediction. Models are created from the snapshot
generated by training. Once you have created your model on AI Platform,
you will create a version of that model. A version
is an implementation
of a model, represented as a serialized TensorFlow graph with trained
parameters.
We will call the model devdaysdemo and create a version of it called v1.
Here we use gcloud to interact with AI Platform. gcloud is a
command-line interface that makes working with many of Google Cloud's APIs
easier.
export MODEL=devdaysdemo
export VERSION=v1
gcloud ai-platform models create --regions ${REGION?} ${MODEL?}
gcloud ai-platform versions create ${VERSION?} \
--async \
--model ${MODEL?} \
--origin gs://${BUCKET?}/saved_model/${TIMESTAMP?}
You used the --async
flag to create a model version; the operation will
finish within a few minutes. You can poll AI Platform to check on the
status of the operation:
gcloud ai-platform operations list
Once the AI Platform operation returns DONE, test your model using some
sample data:
python3 -m scripts.predict \
--project ${PROJECT_ID?} \
--model ${MODEL?} \
--version ${VERSION?}
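Under the hood, scripts/predict sends a JSON prediction request to AI Platform. A minimal sketch of such a request using the googleapiclient library (which may differ from what the script actually does) looks like this; the instance field names are assumptions and must match whatever the deployed model expects.

from googleapiclient import discovery

# Build a client for the AI Platform (ml.googleapis.com) API.
service = discovery.build("ml", "v1")
name = "projects/YOUR_PROJECT/models/devdaysdemo/versions/v1"

# Feature names here are illustrative; they must match the model's signature.
body = {"instances": [{"age": 34, "weight": 71.0, "is_smoker": 0}]}

response = service.projects().predict(name=name, body=body).execute()
print(response)  # e.g. {"predictions": [...]} containing the model's risk score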
Tip. You can continue the lab and return to this step once the model version is available.
Cloud Healthcare API is a data storage and processing service for healthcare data. Using Cloud Healthcare API, you can create FHIR stores within your Google Cloud project. A FHIR store persists FHIR resources to disk and exposes a FHIR API as its interface. You will use a FHIR store to hold newly generated Patient data.
Every store in Cloud Healthcare API can be associated with a Cloud Pub/Sub Topic. Cloud Pub/Sub is a managed event messaging service. A Cloud Pub/Sub Topic is a named resource (or channel) to which messages are sent. Thus, a change notification is sent through the Cloud Pub/Sub Topic whenever FHIR data is written, updated, or deleted within a FHIR store.
A Cloud Healthcare API dataset contains one or more stores. Set environment variables needed to create and subsequently identify the Cloud Healthcare dataset and FHIR store for this lab; in addition, set the identifier for the Pub/Sub Topic that will be assigned to the FHIR store:
DATASET_ID=devdays
FHIR_STORE_ID=lung-cancer
PUBSUB_TOPIC=fhir
Using gcloud
, create the Cloud Pub/Sub Topic, Cloud Healthcare dataset,
and Cloud Healthcare FHIR store:
gcloud pubsub topics create ${PUBSUB_TOPIC?}
gcloud alpha healthcare datasets create ${DATASET_ID?}
gcloud alpha healthcare fhir-stores create \
--dataset ${DATASET_ID?} \
--pubsub-topic "projects/${PROJECT_ID?}/topics/${PUBSUB_TOPIC?}" \
--enable-update-create \
${FHIR_STORE_ID?}
You created a FHIR store with two optional fields configured, pubsubTopic and enableUpdateCreate. pubsubTopic provides the name of the topic to which the FHIR store will publish create, update, delete, and similar events. enableUpdateCreate allows a PUT to act as a POST when the resource does not exist. We use this in this lab to provide specific IDs for resources, instead of having the FHIR store generate them. For other parameters, see the documentation for creating a FHIR store.
gcloud alpha healthcare fhir-stores create --help
Tip: Cloud Healthcare API also supports HL7v2 and DICOM stores; however, those are not used in this lab.
Take a look at the file inference/main.py. This is a Python script that will be invoked by Cloud Functions each time it receives a Pub/Sub notification. Cloud Functions is an event-driven, serverless computing service. The Python script receives a message object that looks like this:
{
"message":{
"attributes":{
"action":"CreateResource",
"resourceType":"Observation"
},
"data":"<BASE64 encoded resource name>",
"messageId":"0000000000000000",
"publishTime":"2018-11-14T00:00:00Z"
}
}
Cloud Pub/Sub notifications generated by Cloud Healthcare API FHIR stores contain the full resource path of the modified resource within the base64-encoded data parameter of the notification. In the previous example, a new FHIR Observation was created.
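For example, a Python background Cloud Function could recover the resource path from the notification as in the minimal sketch below; the (event, context) signature follows the Cloud Functions Pub/Sub convention, and the function name is hypothetical.

import base64

def on_fhir_change(event, context):
    """Background Cloud Function triggered by a Pub/Sub message from the FHIR store."""
    # event["data"] is the base64-encoded full resource path, for example:
    # projects/PROJECT/locations/REGION/datasets/DATASET/fhirStores/STORE/fhir/Observation/ID
    resource_name = base64.b64decode(event["data"]).decode("utf-8")
    resource_type = resource_name.split("/fhir/")[-1].split("/")[0]
    print("Change to %s: %s" % (resource_type, resource_name))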
When main.py receives an update, it checks whether the resource type is relevant to the model we are running: Observation, Condition, or Patient. If so, the function uses the resource to retrieve the Patient bundle that has been updated, using Patient/$everything. After extracting the model features (using the same methods as assemble_training_data.py did), the Cloud Function makes a REST call to the model you deployed to AI Platform. The response is a risk prediction (expressed as a percentage), so the Cloud Function uses this to generate a RiskAssessment resource, which is inserted into your FHIR store:
{
"basis": [
{
"reference": "Patient/a6021478-3793-45e2-b059-72cb3ad89200"
}
],
"id": "7f3cf100-7ea4-441e-b9a3-5aa892650471",
"meta": {
"lastUpdated": "2000-01-01T12:00:00+00:00",
"versionId": "MTU1ODcxOTg4MTQ2NzUwMDAwMA"
},
"occurrenceDateTime": "2000-01-01T12:00:00Z",
"prediction": [
{
"outcome": {
"coding": [
{
"code": "162573006",
"display": "Suspected lung cancer (situation)",
"system": "http://snomed.info/sct"
}
],
"text": "Suspected lung cancer (situation)"
},
"qualitativeRisk": {
"coding": [
{
"code": "moderate",
"system": "http://hl7.org/fhir/risk-probability"
}
]
}
}
],
"resourceType": "RiskAssessment",
"status": "final",
"subject": {
"reference": "Patient/a6021478-3793-45e2-b059-72cb3ad89200"
}
}
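Putting those steps together, the core of such a Cloud Function might look roughly like the sketch below. This is an illustrative outline only: get_base_url, resolve_patient_id, extract_features, call_model, and build_risk_assessment are hypothetical helpers standing in for the logic in inference/main.py.

import google.auth
from google.auth.transport.requests import AuthorizedSession

RELEVANT_TYPES = {"Observation", "Condition", "Patient"}

def handle_resource(resource_name):
    # resource_name looks like .../fhirStores/STORE/fhir/Observation/ID
    resource_type, resource_id = resource_name.split("/fhir/")[-1].split("/")
    if resource_type not in RELEVANT_TYPES:
        return

    credentials, _ = google.auth.default()
    session = AuthorizedSession(credentials)
    base_url = get_base_url(resource_name)  # hypothetical helper

    # Retrieve the full Patient bundle with Patient/$everything.
    patient_id = resolve_patient_id(session, base_url, resource_type, resource_id)  # hypothetical
    bundle = session.get(f"{base_url}/Patient/{patient_id}/$everything").json()

    # Extract features, call the AI Platform model, and upsert a RiskAssessment.
    risk = call_model(extract_features(bundle))  # hypothetical helpers
    risk_assessment = build_risk_assessment(patient_id, risk)  # hypothetical helper
    session.put(
        f"{base_url}/RiskAssessment/{risk_assessment['id']}",
        json=risk_assessment,
        headers={"Content-Type": "application/json+fhir; charset=utf-8"},
    )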
Tip. This lab uses Cloud Functions for its simplicity, ability to isolate business logic, and low maintenance cost, but other options such as App Engine and Cloud Run are available.
A shell script called deploy.sh
has been provided within the cloned
repository. It is a wrapper around gcloud beta functions deploy
that
also copies some of the Cloud Function’s dependencies into the correct
directory. The deploy.sh
script accepts a name for the Cloud Function
(we use the same name as the model it calls), the Pub/Sub topic name
that will trigger the Cloud Function, as well as the model name and
version, so the Cloud Function knows how to construct a request to AI
Platform.
inference/deploy.sh \
--name ${MODEL?} \
--topic ${PUBSUB_TOPIC?} \
--env_vars MODEL=${MODEL?},VERSION=${VERSION?}
Cloud Function deployment will finish in a few minutes.
Set environment variables needed to make subsequent REST calls easier to read; TOKEN is the authentication token for interacting with Cloud Healthcare API and your FHIR store over HTTPS while BASE_URL represents the Base URL of your FHIR store (FHIR server):
TOKEN="Authorization: Bearer $(gcloud auth print-access-token)"
CT="Content-Type: application/json+fhir; charset=utf-8"
BASE_URL="https://healthcare.googleapis.com/v1beta1/projects/${PROJECT_ID?}/locations/${REGION?}/datasets/${DATASET_ID?}/fhirStores/${FHIR_STORE_ID?}/fhir"
Download sample data for the Patient against which you will generate a prediction. The demo patient is a 34-year-old female with no history of smoking who currently weighs 71 kg (roughly 156 pounds):
gsutil cp gs://${DATASET_BUCKET}/synthea/patient_bundle.json .
Send the Patient bundle to the FHIR store:
curl -X POST -H "${TOKEN?}" -H "${CT?}" \
-d @patient_bundle.json \
"${BASE_URL?}"
Your Cloud Healthcare FHIR store will receive and process the Patient data. The FHIR store will generate a notification to the Pub/Sub Topic you associated with the store. That notification will trigger an invocation of your Cloud Function. Because the bundle is processed within a single database transaction, the Cloud Function will retrieve the complete Patient bundle, extract the model input parameters, invoke your TensorFlow model, and write the resulting prediction back to the FHIR store as a RiskAssessment. 🥳
Search for your newly created Patient to ensure she is in your FHIR store:
curl -H "${TOKEN?}" "${BASE_URL?}/Patient?name=Amberly"
Search for any RiskAssessments. You should see one created by your Cloud Function. Amberly's risk will be negligible:
curl -H "${TOKEN?}" "${BASE_URL?}/RiskAssessment"
In the synthetic data, smoking status is determined based on a response to a "Tobacco smoking status NHIS survey" the patient receives during a hypothetical visit to the clinic. To change that level of risk, update one of Amberly's checkups to change her smoking status response from Never smoker to Every day smoker.
gsutil cp gs://${DATASET_BUCKET}/synthea/smoking_survey.json .
curl -X PUT -H "${TOKEN?}" -H "${CT?}" \
-d @smoking_survey.json \
"${BASE_URL?}/Observation/a39bb260-4768-4989-8e1b-730c71085f58"
Search for the RiskAssessments again. The RiskAssessment's risk has been updated from negligible to moderate:
curl -H "${TOKEN?}" "${BASE_URL?}/RiskAssessment"