Riffl on AWS with EMR

Prerequisites

Environment

Set the environment variables below: the ClusterId (see "Provision an EMR cluster" in the example section for how to create one), the EC2 key pair file, and configuration details.

export CLUSTER_ID=j-2XXXXXXXXXXI
export KEY_PAIR_FILE=<SSH key path>.pem

export CONFIG_PATH=example/
export CONFIG_NAME=application-glue-iceberg.yaml
export RIFFL_VERSION=0.3.0
export FLINK_VERSION=1.15.2
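Every command below interpolates these variables, so a quick guard using plain POSIX parameter expansion catches an unset or misspelled one early (the values here are placeholders mirroring the exports above):

```shell
# ${VAR:?} aborts with a descriptive error if the variable is unset or empty,
# so a typo fails fast instead of producing a malformed EMR step.
export CLUSTER_ID=j-2XXXXXXXXXXI
export KEY_PAIR_FILE=key.pem
export CONFIG_PATH=example/
export CONFIG_NAME=application-glue-iceberg.yaml
: "${CLUSTER_ID:?}" "${KEY_PAIR_FILE:?}" "${CONFIG_PATH:?}" "${CONFIG_NAME:?}"
echo "environment ok"
```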

Dependencies

Core

# Presto filesystem plugin - checkpointing
aws emr add-steps --cluster-id $CLUSTER_ID --steps file://./steps/flink-fs-presto-$FLINK_VERSION.json

# Hadoop filesystem plugin
aws emr add-steps --cluster-id $CLUSTER_ID --steps file://./steps/flink-fs-hadoop-$FLINK_VERSION.json

# Riffl
aws emr add-steps --cluster-id $CLUSTER_ID --steps file://./steps/riffl-$RIFFL_VERSION-$FLINK_VERSION.json

# Configuration files
# NOTE: the correct S3 bucket must be configured in "application-glue-iceberg.yaml" before uploading the files
aws emr put --cluster-id $CLUSTER_ID --src ./$CONFIG_PATH --dest /home/hadoop/ --key-pair-file $KEY_PAIR_FILE
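Each steps/*.json file above is a standard EMR step definition consumed by add-steps. As a purely hypothetical sketch (the actual files in steps/ may differ; the jar location and plugin directory are assumptions), the Presto filesystem step could look like:

```json
[
  {
    "Type": "CUSTOM_JAR",
    "Name": "flink-fs-presto",
    "ActionOnFailure": "CONTINUE",
    "Jar": "command-runner.jar",
    "Args": [
      "bash", "-c",
      "sudo mkdir -p /usr/lib/flink/plugins/s3-fs-presto && sudo cp /usr/lib/flink/opt/flink-s3-fs-presto-*.jar /usr/lib/flink/plugins/s3-fs-presto/"
    ]
  }
]
```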

Optional

Connectors

  • Kafka
aws emr add-steps --cluster-id $CLUSTER_ID --steps file://./steps/flink-kafka-$FLINK_VERSION.json
  • Kinesis
aws emr add-steps --cluster-id $CLUSTER_ID --steps file://./steps/flink-kinesis-$FLINK_VERSION.json
  • Hive

Formats

  • Iceberg on AWS
aws emr add-steps --cluster-id $CLUSTER_ID --steps file://./steps/flink-iceberg-$FLINK_VERSION.json
  • Parquet
aws emr add-steps --cluster-id $CLUSTER_ID --steps file://./steps/flink-format-parquet-$FLINK_VERSION.json
  • Orc
aws emr add-steps --cluster-id $CLUSTER_ID --steps file://./steps/flink-format-orc-$FLINK_VERSION.json
  • Avro
aws emr add-steps --cluster-id $CLUSTER_ID --steps file://./steps/flink-format-avro-$FLINK_VERSION.json

Deploy

aws emr add-steps --cluster-id $CLUSTER_ID --steps '[{"Type": "CUSTOM_JAR", "Name": "Riffl Submit", 
"ActionOnFailure": "CONTINUE", "Jar": "command-runner.jar", 
"Args": ["sudo","-u","flink","flink","run","-m","yarn-cluster", 
"/home/hadoop/riffl-runtime.jar", 
"--application","/home/hadoop/'$CONFIG_PATH$CONFIG_NAME'"]}]'

# SSH
aws emr ssh --cluster-id $CLUSTER_ID --key-pair-file $KEY_PAIR_FILE

# Tunnel
aws emr socks --cluster-id $CLUSTER_ID --key-pair-file $KEY_PAIR_FILE

Example application

The example application 'example/application-glue-iceberg.yaml' is configured with a data-generating "datagen" source and sinks into S3 in the Apache Iceberg format, with metadata stored in Glue. Data is written to two Glue tables: one stores the data as-is, the other applies in-flight optimization. Queries can be executed with either AWS Athena or Apache Trino.

Provision an EMR cluster with Glue as a metastore

aws emr create-cluster --release-label emr-6.7.0 --name Riffl \
--applications Name=Flink Name=Hive Name=Hadoop Name=Trino Name=Spark \
--configurations file://./example/configurations/glue.json \
--region eu-west-1 \
--log-uri s3://<S3 logs bucket>/elasticmapreduce/ \
--instance-fleets \
InstanceFleetType=MASTER,TargetSpotCapacity=1,InstanceTypeConfigs='{InstanceType=m5d.xlarge}' \
InstanceFleetType=CORE,TargetSpotCapacity=2,InstanceTypeConfigs='{InstanceType=m5d.xlarge}' \
--service-role EMR_DefaultRole \
--ec2-attributes KeyName=<EC2 key pair name>,InstanceProfile=EMR_EC2_DefaultRole \
--steps file://./steps/flink-hadoop-user.json

Example output:

ClusterArn: arn:aws:elasticmapreduce:eu-west-1:123456789:cluster/j-2XXXXXXXXXXI
ClusterId: j-2XXXXXXXXXXI
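Alternatively, the ClusterId can be captured straight into the environment with the AWS CLI's standard --query/--output options (a sketch with abbreviated arguments; pass the full create-cluster argument set from the command above):

```shell
# --query applies a JMESPath expression to the JSON response and --output text
# strips the quoting, so the id lands directly in CLUSTER_ID.
# Arguments abbreviated for illustration; use the full set from above.
export CLUSTER_ID=$(aws emr create-cluster \
  --release-label emr-6.7.0 --name Riffl \
  --query ClusterId --output text)
echo "$CLUSTER_ID"
```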

Configure dependencies

Copy the ClusterId and follow the steps above to set up the Environment and Core dependencies, as well as the optional Iceberg on AWS and Parquet dependencies.

Reconfigure Apache Trino to use Glue.

aws emr add-steps --cluster-id $CLUSTER_ID --steps file://./example/steps/trino-iceberg-glue.json

Create tables in Athena

The S3 bucket below needs to be accessible from the EMR cluster.

CREATE DATABASE riffl;

CREATE TABLE riffl.product_optimized (
  id BIGINT,
  type INT,
  name STRING,
  price DECIMAL(10, 2),
  buyer_name STRING,
  buyer_address STRING,
  ts TIMESTAMP,
  dt STRING,
  hr STRING) 
PARTITIONED BY (dt, hr) 
LOCATION 's3://<S3 bucket>/riffl.db/product_optimized' 
TBLPROPERTIES (
  'table_type'='ICEBERG',
  'format'='PARQUET',
  'write_compression'='zstd'
);

CREATE TABLE riffl.product_default (
  id BIGINT,
  type INT,
  name STRING,
  price DECIMAL(10, 2),
  buyer_name STRING,
  buyer_address STRING,
  ts TIMESTAMP,
  dt STRING,
  hr STRING) 
PARTITIONED BY (dt, hr) 
LOCATION 's3://<S3 bucket>/riffl.db/product_default' 
TBLPROPERTIES (
  'table_type'='ICEBERG',
  'format'='PARQUET',
  'write_compression'='zstd'
);

Deploy

aws emr add-steps --cluster-id $CLUSTER_ID --steps '[{"Type": "CUSTOM_JAR", "Name": "Riffl Submit", 
"ActionOnFailure": "CONTINUE", "Jar": "command-runner.jar", 
"Args": ["sudo","-u","flink","flink","run","-m","yarn-cluster", 
"/home/hadoop/riffl-runtime.jar", 
"--application","/home/hadoop/'$CONFIG_PATH$CONFIG_NAME'"]}]'

Monitor

The application UI is available via the YARN ResourceManager -> Tracking UI; follow the EMR web interfaces guide for access.
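The job can also be located from the master node itself (after SSH-ing in as above) with the standard YARN CLI:

```shell
# Run on the EMR master node: lists running YARN applications together with
# their application id, state, and tracking URL (the Flink UI for the Riffl job).
yarn application -list -appStates RUNNING
```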

Run SQL Queries in Apache Trino or Athena

  • Log in to the cluster and start the Trino CLI
aws emr ssh --cluster-id $CLUSTER_ID --key-pair-file $KEY_PAIR_FILE

trino-cli
  • Execute queries
use iceberg.riffl;

SELECT 
  avg(price),
  min(ts),
  max(ts)
FROM product_optimized
WHERE type = 1 
  AND dt = '2022-11-09';
  
SELECT 
  avg(price),
  min(ts),
  max(ts)
FROM product_default
WHERE type = 1 
  AND dt = '2022-11-09';
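To see the effect of the in-flight optimization, Trino's Iceberg connector exposes a "$files" metadata table for every table; comparing data-file counts between the two tables is a quick check (a sketch, using the table names created above and the trino-cli already on the cluster):

```shell
# The optimized table should contain fewer, larger data files per partition
# than the default one. Note the escaped $ so the shell does not expand it.
trino-cli --execute "SELECT count(*) FROM iceberg.riffl.\"product_default\$files\""
trino-cli --execute "SELECT count(*) FROM iceberg.riffl.\"product_optimized\$files\""
```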