abalone_processing.py
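"""Glue/PySpark preprocessing job for the Abalone dataset.

Reads the raw abalone CSV from S3, feature-encodes it with a Spark ML pipeline
(StringIndexer + OneHotEncoder + VectorAssembler), writes 80/20 train and
validation splits back to S3 in CSV format, and serializes the fitted pipeline
with MLeap as a model.tar.gz artifact for later use with SageMaker inference.
"""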
from __future__ import print_function, unicode_literals

import os
import sys

import boto3
from awsglue.utils import getResolvedOptions
from mleap.pyspark.spark_support import SimpleSparkSerializer
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.sql import SparkSession
from pyspark.sql.types import DoubleType, StringType, StructField, StructType


def csv_line(data):
    """Format a (label, feature-vector) record as a single CSV line: label first, then the features."""
    r = ",".join(str(d) for d in data[1])
    return str(data[0]) + "," + r
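# For illustration (hypothetical values): csv_line((15.0, [0.0, 1.0, 0.455, 0.365]))
# returns "15.0,0.0,1.0,0.455,0.365" -- the label first, followed by the feature values.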


def main():
spark = SparkSession.builder.appName("PySparkAbalone").getOrCreate()
args = getResolvedOptions(
sys.argv,
[
"S3_INPUT_BUCKET",
"S3_INPUT_KEY_PREFIX",
"S3_OUTPUT_BUCKET",
"S3_OUTPUT_KEY_PREFIX",
"S3_MODEL_BUCKET",
"S3_MODEL_KEY_PREFIX",
],
)
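    # getResolvedOptions expects these values to be supplied as Glue job arguments of the
    # form "--S3_INPUT_BUCKET <bucket>". A hypothetical invocation (names are placeholders):
    #   aws glue start-job-run --job-name abalone-preprocess \
    #     --arguments '{"--S3_INPUT_BUCKET":"my-bucket","--S3_INPUT_KEY_PREFIX":"input/abalone", ...}'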
    # Needed to save RDDs, which is the only way to write the nested DataFrame out in CSV format
spark.sparkContext._jsc.hadoopConfiguration().set(
"mapred.output.committer.class", "org.apache.hadoop.mapred.FileOutputCommitter"
)
    # Define the schema of the input data; the input CSV has no header row
schema = StructType(
[
StructField("sex", StringType(), True),
StructField("length", DoubleType(), True),
StructField("diameter", DoubleType(), True),
StructField("height", DoubleType(), True),
StructField("whole_weight", DoubleType(), True),
StructField("shucked_weight", DoubleType(), True),
StructField("viscera_weight", DoubleType(), True),
StructField("shell_weight", DoubleType(), True),
StructField("rings", DoubleType(), True),
]
)
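    # Each input row matches this schema, e.g. (from the UCI Abalone dataset):
    #   M,0.455,0.365,0.095,0.514,0.2245,0.101,0.15,15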
    # Read the data from S3 into a DataFrame
total_df = spark.read.csv(
(
"s3://"
+ os.path.join(args["S3_INPUT_BUCKET"], args["S3_INPUT_KEY_PREFIX"], "abalone.csv")
),
header=False,
schema=schema,
)
    # StringIndexer on the sex column, which holds categorical values
sex_indexer = StringIndexer(inputCol="sex", outputCol="indexed_sex")
    # One-hot encode the string-indexed sex column (indexed_sex)
sex_encoder = OneHotEncoder(inputCol="indexed_sex", outputCol="sex_vec")
    # VectorAssembler combines all the features into a single vector column so the data can be saved in CSV format
assembler = VectorAssembler(
inputCols=[
"sex_vec",
"length",
"diameter",
"height",
"whole_weight",
"shucked_weight",
"viscera_weight",
"shell_weight",
],
outputCol="features",
)
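    # Note: with OneHotEncoder's default dropLast=True, sex_vec has 2 elements for the
    # 3 categories (M, F, I), so "features" is a 9-element vector (2 + 7 numeric columns).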
    # The pipeline consists of the stages defined above
pipeline = Pipeline(stages=[sex_indexer, sex_encoder, assembler])
    # Fit the feature transformers; the fitted pipeline model is what gets serialized with MLeap and saved to S3 later
model = pipeline.fit(total_df)
# This step transforms the dataset with information obtained from the previous fit
transformed_total_df = model.transform(total_df)
    # Split the overall dataset 80/20 into training and validation sets
(train_df, validation_df) = transformed_total_df.randomSplit([0.8, 0.2])
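    # randomSplit is non-deterministic by default; for reproducible splits a seed can be
    # passed, e.g. transformed_total_df.randomSplit([0.8, 0.2], seed=42) (illustrative only).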
    # Convert the training DataFrame to an RDD to save it in CSV format and upload it to S3
train_rdd = train_df.rdd.map(lambda x: (x.rings, x.features))
train_lines = train_rdd.map(csv_line)
train_lines.saveAsTextFile(
"s3://" + os.path.join(args["S3_OUTPUT_BUCKET"], args["S3_OUTPUT_KEY_PREFIX"], "train")
)
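    # saveAsTextFile writes the lines as multiple part-* files under the "train" prefix;
    # a SageMaker training channel pointed at this S3 prefix will read all of them.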
    # Convert the validation DataFrame to an RDD to save it in CSV format and upload it to S3
validation_rdd = validation_df.rdd.map(lambda x: (x.rings, x.features))
validation_lines = validation_rdd.map(csv_line)
validation_lines.saveAsTextFile(
"s3://" + os.path.join(args["S3_OUTPUT_BUCKET"], args["S3_OUTPUT_KEY_PREFIX"], "validation")
)
# Serialize and store the model via MLeap
SimpleSparkSerializer().serializeToBundle(model, "jar:file:/tmp/model.zip", validation_df)
# Unzip the model as SageMaker expects a .tar.gz file but MLeap produces a .zip file
import zipfile
with zipfile.ZipFile("/tmp/model.zip") as zf:
zf.extractall("/tmp/model")
    # Write the content back as a .tar.gz file
import tarfile
with tarfile.open("/tmp/model.tar.gz", "w:gz") as tar:
tar.add("/tmp/model/bundle.json", arcname="bundle.json")
tar.add("/tmp/model/root", arcname="root")
# Upload the model in tar.gz format to S3 so that it can be used with SageMaker for inference later
s3 = boto3.resource("s3")
file_name = os.path.join(args["S3_MODEL_KEY_PREFIX"], "model.tar.gz")
s3.Bucket(args["S3_MODEL_BUCKET"]).upload_file("/tmp/model.tar.gz", file_name)
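    # The uploaded s3://<S3_MODEL_BUCKET>/<S3_MODEL_KEY_PREFIX>/model.tar.gz can later be
    # referenced as the model artifact (ModelDataUrl) when creating a SageMaker model, e.g.
    # with the SageMaker SparkML Serving container (that usage is assumed, not part of this script).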


if __name__ == "__main__":
main()