Add live stream input Lambda #1

Open · wants to merge 3 commits into master
39 changes: 39 additions & 0 deletions deployment/build-s3-dist.sh
@@ -198,6 +198,7 @@ cp "$workflows_dir/MieCompleteWorkflow.yaml" "$dist_dir/MieCompleteWorkflow.temp
cp "$source_dir/consumers/elastic/media-insights-elasticsearch.yaml" "$dist_dir/media-insights-elasticsearch.template"
cp "$source_dir/consumers/elastic/media-insights-elasticsearch.yaml" "$dist_dir/media-insights-s3.template"
cp "$webapp_dir/media-insights-webapp.yaml" "$dist_dir/media-insights-webapp.template"
cp "$source_dir/livestream/media-insights-livestream.yaml" "$dist_dir/media-insights-livestream.template"
find "$dist_dir"
echo "Updating code source bucket in template files with '$bucket'"
echo "Updating solution version in template files with '$version'"
@@ -218,6 +219,9 @@ sed -i.orig -e "$new_bucket" "$dist_dir/media-insights-s3.template"
sed -i.orig -e "$new_version" "$dist_dir/media-insights-s3.template"
sed -i.orig -e "$new_bucket" "$dist_dir/media-insights-webapp.template"
sed -i.orig -e "$new_version" "$dist_dir/media-insights-webapp.template"
sed -i.orig -e "$new_bucket" "$dist_dir/media-insights-livestream.template"
sed -i.orig -e "$new_version" "$dist_dir/media-insights-livestream.template"


echo "------------------------------------------------------------------------------"
echo "Operators"
@@ -691,6 +695,41 @@ if [ $? -ne 0 ]; then
fi
rm -f ./dist

echo "------------------------------------------------------------------------------"
echo "Live Stream Input Lambda Function"
echo "------------------------------------------------------------------------------"

echo "Building Live Stream Input Lambda function"
cd "$source_dir/livestream" || exit 1

[ -e dist ] && rm -r dist
mkdir -p dist
[ -e package ] && rm -r package
mkdir -p package
echo "preparing packages from requirements.txt"
# Package dependencies listed in requirements.txt
pushd package || exit 1
# Handle distutils install errors with setup.cfg
touch ./setup.cfg
echo "[install]" > ./setup.cfg
echo "prefix= " >> ./setup.cfg
# Try and handle failure if pip version mismatch
if [ -x "$(command -v pip)" ]; then
pip install --quiet -r ../requirements.txt --target .
elif [ -x "$(command -v pip3)" ]; then
echo "pip not found, trying with pip3"
pip3 install --quiet -r ../requirements.txt --target .
elif ! [ -x "$(command -v pip)" ] && ! [ -x "$(command -v pip3)" ]; then
echo "No version of pip installed. This script requires pip. Cleaning up and exiting."
exit 1
fi
zip -q -r9 ../dist/livestreaminput.zip .
popd || exit 1

zip -q -g dist/livestreaminput.zip ./*.py
cp "./dist/livestreaminput.zip" "$dist_dir/livestreaminput.zip"
rm -f ./dist ./package
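Note: since the dependencies are zipped from inside package/ and the handler is appended afterwards, both should sit at the archive root. A quick sanity check of the artifact layout, sketched in Python (the dependency names will vary with requirements.txt):

import zipfile

# The handler must be at the root of the zip for Lambda to resolve
# lambda_handler.lambda_handler; dependencies are unpacked alongside it.
with zipfile.ZipFile('dist/livestreaminput.zip') as z:
    names = z.namelist()
    assert 'lambda_handler.py' in names
    print('\n'.join(sorted(names)[:20]))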

echo "------------------------------------------------------------------------------"
echo "Demo website stack"
echo "------------------------------------------------------------------------------"
34 changes: 34 additions & 0 deletions deployment/media-insights-stack.yaml
@@ -2,6 +2,11 @@ AWSTemplateFormatVersion: "2010-09-09"
Description: Media Insights Engine - Base AWS CloudFormation template that provisions the core Media Insights Engine services and provides parameters for enabling additional functionality.

Parameters:
  LiveStreamBucket:
    Description: Name of the existing S3 bucket that receives live stream video segments
    Type: String
    Default: mie-test-input

  AdminEmail:
    Description: Email address for the MIE Administrator
    Type: String
@@ -1132,6 +1137,7 @@ Resources:
      Parameters:
        DataplaneTableName: !Ref DataplaneTable
        DataplaneBucketName: !Ref Dataplane
        LiveStreamBucketName: !Ref LiveStreamBucket
        UserPoolArn: !GetAtt MieUserPool.Arn
        DeploymentPackageBucket: !FindInMap ["SourceCode", "General", "S3Bucket"]
        DeploymentPackageKey:
@@ -1463,7 +1469,34 @@ Resources:
        IdentityPoolId: !Ref MieIdentityPool
        AwsRegion: !Ref "AWS::Region"
        PoolClientId: !Ref MieWebAppClient

  LiveStreamInputLambda:
    DependsOn:
      - StringFunctions
      - Analytics
      - MediaInsightsWorkflowApi
      - MediaInsightsDataplaneApiStack
    Type: "AWS::CloudFormation::Stack"
    Properties:
      TemplateURL:
        !Join [
          "/",
          [
            "https://s3.amazonaws.com",
            !FindInMap ["SourceCode", "General", "S3Bucket"],
            !FindInMap ["SourceCode", "General", "TemplateKeyPrefix"],
            "media-insights-livestream.template",
          ],
        ]
      Parameters:
        LiveStreamBucket: !Ref LiveStreamBucket # name of the existing live stream S3 bucket
        WorkflowEndpoint: !GetAtt MediaInsightsWorkflowApi.Outputs.EndpointURL
        UserPoolId: !Ref MieUserPool
        PoolClientId: !Ref MieAdminClient
        AwsRegion: !Ref "AWS::Region"

  # TranscriberWebApp:
  #   Condition: DeployTranscriberApp
  #   DependsOn:
@@ -1544,3 +1577,4 @@ Outputs:
  AdminUsername:
    Description: Username of the default MIE admin
    Value: !Ref AdminEmail
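Note: to exercise the new LiveStreamBucket parameter on an existing deployment, a stack update along these lines should work. This is a minimal boto3 sketch; the stack name, template URL, bucket name, and capability list are assumptions, not values from this PR.

import boto3

cfn = boto3.client('cloudformation')

# Hypothetical names; substitute your own stack, template location, and the
# existing bucket that receives live stream segments.
cfn.update_stack(
    StackName='my-mie-stack',
    TemplateURL='https://s3.amazonaws.com/my-code-bucket/media-insights-stack.template',
    Parameters=[
        {'ParameterKey': 'LiveStreamBucket', 'ParameterValue': 'my-livestream-segments'},
        {'ParameterKey': 'AdminEmail', 'UsePreviousValue': True},
    ],
    Capabilities=['CAPABILITY_IAM', 'CAPABILITY_NAMED_IAM', 'CAPABILITY_AUTO_EXPAND'],
)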

11 changes: 6 additions & 5 deletions source/dataplaneapi/app.py
@@ -339,7 +339,7 @@ def create_asset():

     # build key for new s3 object

-    new_key = directory + 'input' + '/' + source_key
+    new_key = directory + 'input' + '/' + source_key.split('/')[-1]

     # Move input media into newly created dataplane s3 directory.

@@ -351,10 +351,11 @@
             CopySource={'Bucket': source_bucket, 'Key': source_key}
         )
         # remove input media from upload/
-        s3_client.delete_object(
-            Bucket=source_bucket,
-            Key=source_key
-        )
+        # Only delete the source when it lives in the dataplane bucket's
+        # upload/ prefix; segments in the live stream bucket are left alone.
+        if source_bucket == dataplane_s3_bucket:
+            s3_client.delete_object(
+                Bucket=source_bucket,
+                Key=source_key
+            )
     except ClientError as e:
         error = e.response['Error']['Message']
         logger.error("Exception occurred during asset creation: {e}".format(e=error))
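Note: the split('/')[-1] change matters for live stream input because segment keys arrive under nested prefixes; only the file name should survive into the dataplane key. A small illustration with made-up names:

directory = 'private/assets/abc123/'
source_key = 'livestream/2020/01/channel1_00042.ts'

# Before: the source prefix leaked into the dataplane key, yielding
# 'private/assets/abc123/input/livestream/2020/01/channel1_00042.ts'.
# After: only the segment file name is kept.
new_key = directory + 'input' + '/' + source_key.split('/')[-1]
assert new_key == 'private/assets/abc123/input/channel1_00042.ts'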
17 changes: 17 additions & 0 deletions source/dataplaneapi/external_resources.json
@@ -8,6 +8,10 @@
     "Type": "String",
     "Description": "Bucket used to store asset media"
   },
   "LiveStreamBucketName": {
     "Type": "String",
     "Description": "Bucket that contains the live stream segments"
   },
   "UserPoolArn": {
     "Type": "String",
     "Description": "Arn of the Mie Cognito user pool"
@@ -46,6 +50,19 @@
       ],
       "Resource": {"Fn::Sub": "arn:aws:s3:::${DataplaneBucketName}/*"}
     },
     {
       "Effect": "Allow",
       "Action": [
         "s3:ReplicateObject",
         "s3:GetObject",
         "s3:RestoreObject",
         "s3:GetObjectVersionAcl",
         "s3:ListBucket",
         "s3:HeadBucket",
         "s3:GetObjectVersion"
       ],
       "Resource": [
         {"Fn::Sub": "arn:aws:s3:::${LiveStreamBucketName}"},
         {"Fn::Sub": "arn:aws:s3:::${LiveStreamBucketName}/*"}
       ]
     },
     {
       "Effect": "Allow",
       "Action": "dynamodb:*",
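Note: this statement is what lets the dataplane's copy_object call in create_asset read segments out of the live stream bucket. CopyObject needs s3:GetObject on the source object, and the existing dataplane statement already covers writes to the destination. A sketch of the cross-bucket copy it enables, with placeholder bucket names:

import boto3

s3_client = boto3.client('s3')

# Reading the source segment requires s3:GetObject on the live stream
# bucket, which the new policy statement grants.
s3_client.copy_object(
    Bucket='my-dataplane-bucket',
    Key='private/assets/abc123/input/channel1_00042.ts',
    CopySource={'Bucket': 'my-livestream-bucket', 'Key': 'livestream/channel1_00042.ts'}
)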
140 changes: 140 additions & 0 deletions source/livestream/lambda_handler.py
@@ -0,0 +1,140 @@
import json
import logging
import os
import urllib.parse

import boto3
import requests

# Make INFO-level messages visible in CloudWatch Logs.
logging.getLogger().setLevel(logging.INFO)

MIE_POOL_ID = os.environ['UserPoolId']
MIE_CLIENT_ID = os.environ['PoolClientId']
MIE_USER_NAME = os.environ['UserName']
MIE_USER_PWD = os.environ['UserPwd']
MIE_WORKFLOW_ENDPOINT = os.environ['WorkflowEndpoint']


def lambda_handler(event, context):
    # Triggered by S3 when a new live stream segment lands in the bucket.
    s3 = event['Records'][0]['s3']
    key = urllib.parse.unquote_plus(s3['object']['key'])
    config = workflow_config(s3['bucket']['name'], key)
    try:
        token = authenticate_and_get_token(MIE_USER_NAME, MIE_USER_PWD, MIE_POOL_ID, MIE_CLIENT_ID)
        run_workflow(config, token)
        logging.info(f'workflow kicked off for: {key}')
    except Exception as e:
        logging.error(f'failed to process message {s3}: {e}')
        raise


def run_workflow(config: str, token: str):
    # POST the workflow configuration to the MIE workflow API.
    resp = requests.post(
        url=MIE_WORKFLOW_ENDPOINT + 'workflow/execution',
        data=config,
        headers={
            'Content-Type': 'application/json',
            'Authorization': token
        }
    )
    resp.raise_for_status()

def authenticate_and_get_token(username: str, password: str,
                               pool_id: str, client_id: str) -> str:
    # Exchange the MIE service user's credentials for a Cognito ID token.
    client = boto3.client('cognito-idp')

    resp = client.admin_initiate_auth(
        UserPoolId=pool_id,
        ClientId=client_id,
        AuthFlow='ADMIN_NO_SRP_AUTH',
        AuthParameters={
            'USERNAME': username,
            'PASSWORD': password
        }
    )
    return resp['AuthenticationResult']['IdToken']

def workflow_config(bucket: str, key: str) -> str:
    # Build the MieCompleteWorkflow configuration for a single video segment
    # and return it as a JSON string ready to POST to workflow/execution.
    config = {
        'Name': 'MieCompleteWorkflow',
        'Configuration': {
            'defaultPrelimVideoStage': {
                'Thumbnail': {
                    'ThumbnailPosition': '10',
                    'Enabled': True
                },
                'Mediainfo': {
                    'Enabled': True
                }
            },
            'defaultVideoStage': {
                'faceDetection': {
                    'Enabled': True
                },
                'technicalCueDetection': {
                    'Enabled': False
                },
                'shotDetection': {
                    'Enabled': False
                },
                'celebrityRecognition': {
                    'Enabled': True
                },
                'labelDetection': {
                    'Enabled': True
                },
                'Mediaconvert': {
                    'Enabled': True
                },
                'contentModeration': {
                    'Enabled': True
                },
                'faceSearch': {
                    'Enabled': False,
                    'CollectionId': 'undefined'
                },
                'textDetection': {
                    'Enabled': True
                },
                'GenericDataLookup': {
                    'Enabled': False,
                    'Bucket': 'mie-dataplane-1oufs3l5cabvb',
                    'Key': 'undefined'
                }
            },
            'defaultAudioStage': {
                'Transcribe': {
                    'Enabled': True,
                    'TranscribeLanguage': 'en-US'
                }
            },
            'defaultTextStage': {
                'Translate': {
                    'Enabled': False,
                    'SourceLanguageCode': 'en',
                    'TargetLanguageCode': 'es'
                },
                'ComprehendEntities': {
                    'Enabled': True
                },
                'ComprehendKeyPhrases': {
                    'Enabled': True
                }
            },
            'defaultTextSynthesisStage': {
                'Polly': {
                    'Enabled': False
                }
            }
        },
        'Input': {
            'Media': {
                'Video': {
                    'S3Bucket': bucket,
                    'S3Key': key
                }
            }
        }
    }
    return json.dumps(config)
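Note: for a local smoke test of the handler without a real S3 event, something like the following works, assuming requirements.txt is installed locally and the placeholder values are replaced with real ones from a deployment:

import os

# Placeholder configuration; the module reads these at import time,
# so they must be set before importing lambda_handler.
os.environ.update({
    'UserPoolId': 'us-east-1_EXAMPLE',
    'PoolClientId': 'example-client-id',
    'UserName': 'mie-service-user',
    'UserPwd': 'example-password',
    'WorkflowEndpoint': 'https://example.execute-api.us-east-1.amazonaws.com/api/',
})

from lambda_handler import lambda_handler

# Minimal shape of the S3 put event the handler expects.
event = {
    'Records': [{
        's3': {
            'bucket': {'name': 'my-livestream-bucket'},
            'object': {'key': 'livestream/channel1_00042.ts'}
        }
    }]
}
lambda_handler(event, None)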