A data lake is a centralized repository that lets you store structured and unstructured data at any scale. Organizations adopt data lakes because they make it easier to retrieve and use actionable business intelligence, and organizations that do so have been reported to outperform their peers. A data lake analytics solution has four essential elements:
- Data Movement (batch and/or streaming)
- Data Catalog and Security
- Analytics
- Machine Learning
This project falls under the first element, data movement. It provides an example pattern for designing an incremental ingestion pipeline on the AWS cloud using AWS Step Functions in combination with other AWS services such as Amazon S3, Amazon DynamoDB, Amazon EMR (Elastic MapReduce) and an Amazon CloudWatch Events rule. The pattern does not replace what AWS Glue or AWS Data Pipeline already provide; it is an example for engineers who want to combine AWS services to achieve a similar result.
The pattern uses the following AWS services:
- Amazon CloudWatch Events
- AWS Step Functions
- AWS Lambda
- Amazon DynamoDB
- Apache Spark on Amazon EMR
- Amazon S3
- Amazon Aurora (RDS)
- AWS CloudFormation

To follow along you need:
- An AWS account
- The AWS CLI, installed and configured

The walkthrough has two parts:
- Create an Aurora RDS database (optional if you already have an Aurora PostgreSQL database) and load sample data into its tables
- Execute the incremental ingestion pipeline
#### Prerequisites:
- A VPC with at least one private and one public subnet
- (Optional) An EC2 instance that can be used as a bastion host for connecting to the created database
- Clone the repository
- Create an S3 bucket and sync the repository to it: `aws s3 sync . s3://`
- Create an Aurora RDS database using the CloudFormation template glue/postgredb.yml.
- Navigate to the /glue folder and open aws-glue-etl-job.py. Replace the database value (etl) with your database name, save the file and upload it to S3.
- Create the Glue crawler and job load stack using the aws-etl-load-rds.yml CloudFormation template. The stack creates Glue crawlers that crawl the public S3 bucket locations (dfw-meetup-emr/Deposits, Loans, Investments and Shipments) and a Glue job that loads the data from those locations into the Aurora database you created. The data in these locations is fictitious.
- Use the following parameter values for this stack:
Parameter Name | Parameter Value |
---|---|
CFNConnectionName | cfn-connection-spark-1 |
CFNDatabaseName | cfn-database-s3 |
CFNDepositsCrawlerName | cfn-crawler-spark-dep |
CFNInvestmentsCrawlerName | cfn-crawler-spark-inv |
CFNJDBCPassword | |
CFNJDBCString | |
CFNJDBCUser | |
CFNJobName | cfn-glue-job-s3-to-JDBC |
CFNLoansCrawlerName | cfn-crawler-spark-loa |
CFNS3PATHDEPOSIT | s3://dfw-meetup-emr/Deposits |
CFNS3PATHINV | s3://dfw-meetup-emr/Investments |
CFNS3PATHLOAN | s3://dfw-meetup-emr/Loans |
CFNS3PATHSHIP | s3://dfw-meetup-emr/Shipments |
CFNScriptLocation | s3:///aws-glue-etl-job.py |
CFNShipmentsCrawlerName | cfn-crawler-spark-shi |
CFNTablePrefixName | cfn_s3_sprk_1_ |
GlueCrawlerCustomKey | glue/aws-etl-start-crawler-custom-resource.py.zip |
GlueCrawlerCustomModule | aws-etl-start-crawler-custom-resource |
GlueJobCustomKey | glue/aws-etl-start-job-custom-resource.py.zip |
GlueJobCustomModule | aws-etl-start-job-custom-resource |
S3Bucket | |
SubnetId | |
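The CFNJDBCString value is left blank in the table above because it depends on your own database. As a minimal sketch, assuming a PostgreSQL-compatible Aurora endpoint, the connection string follows the standard `jdbc:postgresql://host:port/database` form. The host name below is a hypothetical placeholder; `etl` is the database name mentioned earlier.

```python
def build_postgres_jdbc_url(host: str, database: str, port: int = 5432) -> str:
    """Return a PostgreSQL JDBC URL of the form jdbc:postgresql://host:port/database."""
    if not host or not database:
        raise ValueError("host and database are both required")
    return f"jdbc:postgresql://{host}:{port}/{database}"


# Hypothetical endpoint, for illustration only; use your Aurora cluster endpoint.
print(build_postgres_jdbc_url("my-aurora-endpoint.example.internal", "etl"))
```

The printed string is what you would paste into CFNJDBCString, with CFNJDBCUser and CFNJDBCPassword supplied separately.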
##### At the end of this part we will have:
- An Amazon Aurora database
- Glue crawlers and a Glue job that populate the Aurora database with sample data
- Sample data loaded into the Aurora database tables

In this context, the Aurora database stands in for an on-premises database.
#### Prerequisites:
- An S3 bucket
- An EC2 key pair
- A private subnet in the VPC
- Use cfn/aws-sns-topic.yml to create an SNS topic. The stack creates a CloudFormation export whose value is imported into the aws-etl-stepfunction stack. Confirm the subscription.
- Use cfn/aws-roles.yml to create the roles used by the Step Functions and Lambda ETL process. The stack creates a CloudFormation export whose values are imported into the aws-etl-stepfunction stack.
- Use cfn/emr-roles to create the EMR roles. The stack creates a CloudFormation export whose values are imported into the aws-etl-stepfunction stack.
- Use cfn/emr-security-groups.yml to create the EMR security groups. The stack creates a CloudFormation export for the security groups, and its values are imported into the aws-etl-stepfunction stack.
- Navigate to the lambdas folder and upload all the zip files to the S3 location <my_bucket_name>/lambdas: `aws s3 sync lambdas s3://<my_bucket_name>/lambdas/`
- Note the location and the names of the Lambda functions; they are used in the CloudFormation stack that kicks off the incremental ingestion run.
- Create your database secrets with the following AWS CLI commands. The sample Spark script requires these parameters.
  - `aws ssm put-parameter --name postgre-psswd --type SecureString --value P@ssw0rd`
  - `aws ssm put-parameter --name postgre-user --type SecureString --value`
  - `aws ssm put-parameter --name postgre-jdbcurl --type String --value <jdbc:postgresql://-instance.2.rds.amazonaws.com:5432/example>`
- Download the PostgreSQL JDBC jar from https://jdbc.postgresql.org/download.html and upload it to an S3 location. Note this location. `aws s3 cp postgresql-42.2.6.jar s3:///`
- Navigate to the ba folder in the repository, open bootstrap-emr-step.sh and replace the location of the PostgreSQL JDBC jar with your own. Save the file, then upload the ba and spark folders to S3: `aws s3 sync ba s3:///ba/` and `aws s3 sync spark s3:///spark/`
- Modify cfn/config.txt, replacing the table names in columns 7, 8 and 9 with yours. Save the file and sync it to the S3 bucket folder: `aws s3 sync cfn s3:///cfn/`
- Open the cfn/aws-etl-stepfunction.json template and the cfn/stepfunction-parameters.json file, and replace the parameter values with your own.
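The SSM parameters created in the steps above are meant to be read by the sample Spark script. The repository's actual lookup code is not shown here, so the following is only a sketch of one way to do it. It assumes an object that behaves like `boto3.client("ssm")` is passed in; the function name `load_db_settings` is hypothetical.

```python
from typing import Any, Dict

# Parameter names created by the `aws ssm put-parameter` commands above.
PARAMETER_NAMES = {
    "user": "postgre-user",
    "password": "postgre-psswd",
    "url": "postgre-jdbcurl",
}


def load_db_settings(ssm_client: Any) -> Dict[str, str]:
    """Fetch the three database settings from SSM Parameter Store.

    `ssm_client` is expected to behave like boto3.client("ssm").
    WithDecryption=True is required for SecureString values and is
    ignored for plain String parameters.
    """
    settings = {}
    for key, name in PARAMETER_NAMES.items():
        response = ssm_client.get_parameter(Name=name, WithDecryption=True)
        settings[key] = response["Parameter"]["Value"]
    return settings
```

With boto3 installed and credentials configured, a call would look like `load_db_settings(boto3.client("ssm", region_name="us-west-2"))`. Passing the client in keeps the function easy to exercise with a stub.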
Parameters to change in stepfunction-parameters.json
ParameterKey | ParameterValue |
---|---|
CreateEMRModuleName | aws_etl_emr_cluster_create |
AllJobsCompletedModule | aws_etl_all_steps_completed |
AllJobsCompletedS3Key | lambdas/aws_etl_all_steps_completed.zip |
ClusterStatusModuleName | aws_etl_emr_cluster_status |
ClusterStatusS3Key | lambdas/aws_etl_emr_cluster_status.zip |
configtable | aws_etl_conf |
CreateEMRS3Key | lambdas/aws_etl_emr_cluster_create.zip |
ec2keyname | <my_ec2_key_name> |
ec2subnetid | <my_subnet_id> |
emrbalocation | s3://<my_bucket_name>/ba/bootstrap-emr.sh |
emrname | AWS_SF_ETL_CLUSTER |
emrsteplocation | s3://<my_bucket_name>/ba/bootstrap-emr-step.sh |
EMRStepStatusModuleName | aws_etl_emr_step_status |
EMRStepStatusS3Key | lambdas/aws_etl_emr_step_status.zip |
EMRStepSubmitModuleName | aws_etl_add_emr_step |
EMRStepSubmitS3Key | lambdas/aws_etl_add_emr_step.zip |
GetNextEMRJobModule | aws_etl_iterator |
GetNextEMRJobS3Key | lambdas/aws_etl_iterator.zip |
historytable | aws_etl_history |
loguri | s3n://aws-logs-<MY_ACCOUNT_NUMBER>-us-west-2/elasticmapreduce |
regionname | us-west-2 |
releaselabel | emr-5.17.0 |
S3Bucket | <my_bucket_name> |
DDBConfigModule | aws_etl_conf_jobs_custom_resource |
DDBConfigS3Key | lambdas/aws_etl_conf_jobs_custom_resource.zip |
CustomResourceS3Key | cfn/config.txt |
emrcommandrunnerscript | s3://us-west-2.elasticmapreduce/libs/script-runner/script-runner.jar |
Environment | NonProd |
SubEnvironment | dev2 |
AccountName | aws-etl-state-machine |
ETLStateMachineDateRotationS3Key | lambdas/aws_etl_date_rotation.zip |
ETLStateMachineDateRotationModuleName | aws_etl_date_rotation |
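The AWS CLI accepts stack parameters as a JSON list of `ParameterKey`/`ParameterValue` objects. A minimal sketch of generating that layout from a plain mapping, and of catching placeholders such as `<my_bucket_name>` that were never replaced, might look like this. The helper name and the example values are hypothetical.

```python
import json
from typing import Dict, List


def to_cloudformation_parameters(values: Dict[str, str]) -> List[Dict[str, str]]:
    """Convert a flat mapping into the list-of-objects layout that
    `aws cloudformation create-stack --parameters file://...` accepts."""
    # Treat any value still containing <...> as an unreplaced placeholder.
    unresolved = [k for k, v in values.items() if "<" in v and ">" in v]
    if unresolved:
        raise ValueError(
            "placeholders not replaced for: " + ", ".join(sorted(unresolved))
        )
    return [{"ParameterKey": k, "ParameterValue": v} for k, v in values.items()]


# A few keys from the table above; the bucket, key pair and subnet values
# are hypothetical stand-ins for your own.
example = {
    "S3Bucket": "my-example-bucket",
    "ec2keyname": "my-example-key",
    "ec2subnetid": "subnet-0123456789abcdef0",
    "regionname": "us-west-2",
    "releaselabel": "emr-5.17.0",
}

print(json.dumps(to_cloudformation_parameters(example), indent=2))
```

Failing early on an unreplaced placeholder is a design choice of this sketch; it surfaces the mistake before CloudFormation starts creating resources.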
- In the AWS Management Console for CloudFormation, browse to the cfn folder and load aws-roles.yml to create the roles used by the pipeline.
- Modify config.txt, replacing the bucket name values with your bucket name. For example:
job_name | load_date | load_window_start | load_window_stop | job_flow_id | job_status | output_dir | script_source | database_name | table_name | window_db_column | partition_by_col | lower_bound | upper_bound | num_partitions |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
deposit | 11/5/18 | 2018-11-04 00:00:000 | 2018-11-05 00:00:000 | j-0000000000000 | PENDING | s3://my-bucketholder/RAW/ | s3://my-bucketholder/spark/ingest_on_prem_db_tables.py | spark | cfn_s3_sprk_1_deposits | shipmt_date_tstmp | quarter | 1 | 1000 | 10 |
investment | 11/5/18 | 2018-11-04 00:00:000 | 2018-11-05 00:00:000 | j-0000000000000 | PENDING | s3://my-bucketholder/RAW/ | s3://my-bucketholder/spark/ingest_on_prem_db_tables.py | spark | cfn_s3_sprk_1_investments | shipmt_date_tstmp | quarter | 1 | 1000 | 10 |
loan | 11/5/18 | 2018-11-04 00:00:000 | 2018-11-05 00:00:000 | j-0000000000000 | PENDING | s3://my-bucketholder/RAW/ | s3://my-bucketholder/spark/ingest_on_prem_db_tables.py | spark | cfn_s3_sprk_1_loans | shipmt_date_tstmp | quarter | 1 | 1000 | 10 |
shipment | 11/5/18 | 2018-11-04 00:00:000 | 2018-11-05 00:00:000 | j-0000000000000 | PENDING | s3://my-bucketholder/RAW/ | s3://my-bucketholder/spark/ingest_on_prem_db_tables.py | spark | cfn_s3_sprk_1_shipments | shipmt_date_tstmp | quarter | 1 | 1000 | 10 |
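The columns in the config rows above line up naturally with Spark's JDBC reader options (`partitionColumn`, `lowerBound`, `upperBound`, `numPartitions`). How ingest_on_prem_db_tables.py actually consumes them is not shown here, so the following is a sketch under assumptions: the window is treated as half-open (start inclusive, stop exclusive), and the function name is hypothetical.

```python
from typing import Dict


def jdbc_read_options(job: Dict[str, str], jdbc_url: str) -> Dict[str, str]:
    """Translate one config row into Spark JDBC reader options.

    The subquery restricts the read to rows whose window column falls in
    [load_window_start, load_window_stop). Values are interpolated directly,
    so this is only suitable for trusted config rows.
    """
    column = job["window_db_column"]
    subquery = (
        f"(SELECT * FROM {job['table_name']} "
        f"WHERE {column} >= '{job['load_window_start']}' "
        f"AND {column} < '{job['load_window_stop']}') AS incremental"
    )
    return {
        "url": jdbc_url,
        "dbtable": subquery,
        "partitionColumn": job["partition_by_col"],
        "lowerBound": str(job["lower_bound"]),
        "upperBound": str(job["upper_bound"]),
        "numPartitions": str(job["num_partitions"]),
    }


# Values copied from the "deposit" row above.
deposit = {
    "table_name": "cfn_s3_sprk_1_deposits",
    "window_db_column": "shipmt_date_tstmp",
    "load_window_start": "2018-11-04 00:00:000",
    "load_window_stop": "2018-11-05 00:00:000",
    "partition_by_col": "quarter",
    "lower_bound": "1",
    "upper_bound": "1000",
    "num_partitions": "10",
}

# The JDBC URL here is a hypothetical placeholder.
options = jdbc_read_options(deposit, "jdbc:postgresql://example-host:5432/spark")
print(options["dbtable"])
```

In a Spark job the result could be passed as `spark.read.format("jdbc").options(**options)`, with the user, password and driver options added before calling `.load()`.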
- Navigate to the cfn folder and, from the AWS command line, run the following command to create the CloudFormation stack:
`aws cloudformation create-stack --stack-name gwfstepfunction --template-body file://aws-etl-stepfunction.json --region us-west-2 --capabilities CAPABILITY_IAM --parameters file://stepfunction-parameters.json`
Below is an example folder structure for an S3 data lake:

RAW (immutable)
- RAW-us-east-1/sourcename/tablename/original/full (full load) partitioned by arrival date as-is
- RAW-us-east-1/sourcename/tablename/original/incremental (changes/updates/inserts/deletes) partitioned by arrival date as-is incoming format
- FORMAT-us-east-1/sourcename/tablename/masked/full (w/ sensitive data masked, if any) partitioned by arrival date as-is incoming format
- FORMAT-us-east-1/sourcename/tablename/masked/incremental (w/ sensitive data masked, if any) partitioned by arrival date as-is

FORMAT (mutable)
- FORMAT-us-east-1/sourcename/tablename/original/full (w/ original data) partitioned by
- xx-FORMAT-us-east-1/sourcename/tablename/original/incremental (w/ original data)
- xx-FORMAT-us-east-1/sourcename/tablename/masked/full (w/ sensitive data masked, if any)
- xx-FORMAT-us-east-1/sourcename/tablename/masked/incremental (w/ sensitive data masked, if any)
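The layout above follows a regular zone-region/source/table/variant/load-type shape, so prefixes can be generated rather than typed by hand. The sketch below is illustrative only: the helper name is hypothetical, and the `arrival_date=YYYY-MM-DD` partition folder is an assumption, since the layout says "partitioned by arrival date" without giving a naming scheme.

```python
from datetime import date

# Vocabulary taken from the example layout above.
ZONES = ("RAW", "FORMAT")
VARIANTS = ("original", "masked")
LOAD_TYPES = ("full", "incremental")


def lake_prefix(zone: str, region: str, source: str, table: str,
                variant: str, load_type: str, arrival: date) -> str:
    """Build an S3 key prefix of the form
    ZONE-region/source/table/variant/load_type/arrival_date=YYYY-MM-DD/."""
    if zone not in ZONES:
        raise ValueError(f"unknown zone: {zone}")
    if variant not in VARIANTS:
        raise ValueError(f"unknown variant: {variant}")
    if load_type not in LOAD_TYPES:
        raise ValueError(f"unknown load type: {load_type}")
    # The arrival_date=... partition naming is an assumption of this sketch.
    return (f"{zone}-{region}/{source}/{table}/{variant}/{load_type}/"
            f"arrival_date={arrival.isoformat()}/")


print(lake_prefix("RAW", "us-east-1", "sourcename", "tablename",
                  "original", "incremental", date(2018, 11, 5)))
```

Validating the zone, variant and load type up front keeps misspelled folder names from silently creating a parallel tree in the bucket.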
#### At the end of this part we will have created the following:
- An EMR cluster
- Two DynamoDB tables (Config and History)
- An AWS Step Functions state machine
- Eight Lambda functions
- A CloudWatch Events scheduled rule
- A CloudFormation custom resource backed by a Lambda function
- SSM parameters
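For ingestion to stay incremental across scheduled runs, each job's load window has to move forward after a successful run. The `aws_etl_date_rotation` module name suggests a Lambda function handles this, but its logic is not shown here. The following is only a sketch of the idea, with a hypothetical function name, assuming the new window starts where the previous one stopped and keeps the same length.

```python
from datetime import datetime
from typing import Tuple


def rotate_window(start: datetime, stop: datetime) -> Tuple[datetime, datetime]:
    """Advance a load window by its own length.

    The previous stop becomes the new start, so consecutive windows
    neither overlap nor leave gaps.
    """
    if stop <= start:
        raise ValueError("load_window_stop must be after load_window_start")
    return stop, stop + (stop - start)


# The one-day window used in the sample config rows.
new_start, new_stop = rotate_window(datetime(2018, 11, 4), datetime(2018, 11, 5))
print(new_start, new_stop)
```

Working with `datetime` objects rather than strings sidesteps the question of how the config table formats its timestamps, which this sketch does not attempt to parse.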
Confirm that data has been added to the output directory that you specified in your config.txt.
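One way to confirm this without browsing the console is to ask S3 whether any object exists under the output prefix. The sketch below assumes an object that behaves like `boto3.client("s3")` is passed in; the helper names are hypothetical.

```python
from typing import Any, Tuple
from urllib.parse import urlparse


def split_s3_uri(uri: str) -> Tuple[str, str]:
    """Split s3://bucket/key/prefix into (bucket, key prefix)."""
    parsed = urlparse(uri)
    if parsed.scheme != "s3" or not parsed.netloc:
        raise ValueError(f"not an S3 URI: {uri!r}")
    return parsed.netloc, parsed.path.lstrip("/")


def output_exists(s3_client: Any, output_dir: str) -> bool:
    """Return True if at least one object exists under output_dir.

    `s3_client` is expected to behave like boto3.client("s3").
    MaxKeys=1 keeps the request cheap; only existence is checked.
    """
    bucket, prefix = split_s3_uri(output_dir)
    response = s3_client.list_objects_v2(Bucket=bucket, Prefix=prefix, MaxKeys=1)
    return response.get("KeyCount", 0) > 0
```

With boto3 installed and credentials configured, `output_exists(boto3.client("s3"), "s3://my-bucketholder/RAW/")` would check the output_dir used in the sample config rows.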
When you are done, tear down the CloudFormation stacks and delete the DynamoDB tables.