From 35edb9c999674a1cf02331fe766b37fe141c7bd5 Mon Sep 17 00:00:00 2001
From: Chris Kalafarski
Date: Tue, 6 Jan 2026 12:42:14 -0500
Subject: [PATCH 1/2] Use waitForTaskToken for Fargate tasks

---
 src/containers/ftp/Gemfile                    |   1 +
 src/containers/ftp/ftp.rb                     |  64 ++--
 src/containers/transcode/Gemfile              |   1 +
 src/containers/transcode/transcode.rb         | 330 ++++++++++--------
 src/lambdas/ftp-copy-task-output/index.js     |  68 ----
 src/lambdas/ftp-copy-task-output/package.json |   5 -
 src/lambdas/transcode-task-output/index.js    |  54 ---
 .../transcode-task-output/package.json        |   5 -
 state-machine.asl.yml                         |  73 +---
 template.yml                                  | 147 +-------
 10 files changed, 236 insertions(+), 512 deletions(-)
 delete mode 100644 src/lambdas/ftp-copy-task-output/index.js
 delete mode 100644 src/lambdas/ftp-copy-task-output/package.json
 delete mode 100644 src/lambdas/transcode-task-output/index.js
 delete mode 100644 src/lambdas/transcode-task-output/package.json

diff --git a/src/containers/ftp/Gemfile b/src/containers/ftp/Gemfile
index 4ce38765..48c90323 100644
--- a/src/containers/ftp/Gemfile
+++ b/src/containers/ftp/Gemfile
@@ -7,6 +7,7 @@ gem "aws-sdk-cloudwatch", "~> 1"
 gem "aws-sdk-ec2", "~> 1"
 gem "aws-sdk-ecs", "~> 1"
 gem "aws-sdk-s3", "~> 1"
+gem "aws-sdk-states", "~> 1"
 gem "net-ftp"
 gem "net-sftp"
diff --git a/src/containers/ftp/ftp.rb b/src/containers/ftp/ftp.rb
index 4a39a799..f087e359 100755
--- a/src/containers/ftp/ftp.rb
+++ b/src/containers/ftp/ftp.rb
@@ -12,6 +12,8 @@
 # STATE_MACHINE_ARTIFACT_BUCKET_NAME
 # STATE_MACHINE_ARTIFACT_OBJECT_KEY
 # STATE_MACHINE_TASK_JSON
+# STATE_MACHINE_TASK_TOKEN
+# STATE_MACHINE_TASK_TYPE
 
 # Set elsewhere
 # FTP_LISTEN_PORT
 # PUBLIC_IP
@@ -22,6 +24,7 @@
 require "rubygems"
 require "bundler/setup"
 require "aws-sdk-cloudwatch"
+require "aws-sdk-states"
 require "net/sftp"
 require "net/ftp"
 require "logger"
@@ -42,6 +45,10 @@
 }))
 
 begin
+  task_result = {
+    Task: ENV["STATE_MACHINE_TASK_TYPE"]
+  }
+
   # Count the transfers in CloudWatch Metrics
   recorder = Recorder.new(
@@ -85,41 +92,23 @@
     }
 
     used_mode = ftp_files.upload_file(uri, file, ftp_options)
-    if used_mode
-      logger.debug(JSON.dump({
-        msg: "Copying state machine results file",
-        bucket_name: bucket,
-        object_key: RESULT_KEY
-      }))
-      s3.put_object(
-        bucket: bucket,
-        key: RESULT_KEY,
-        body: JSON.dump({
-          # All properties listed here will be included in the task result for
-          # this task.
-          Mode: used_mode
-        })
-      )
-    end
+    task_result["Mode"] = used_mode if used_mode
   elsif uri.scheme == "sftp"
     sftp_files = SftpFiles.new(logger, recorder)
     sftp_files.upload_file(uri, file, md5: md5, timeout: timeout)
-
-    logger.debug(JSON.dump({
-      msg: "Copying state machine results file",
-      bucket_name: bucket,
-      object_key: RESULT_KEY
-    }))
-    s3.put_object(
-      bucket: bucket,
-      key: RESULT_KEY,
-      body: JSON.dump({
-        # All properties listed here will be included in the task result for
-        # this task.
-        # Foo: "bar"
-      })
-    )
   end
+
+  task_result["URL"] = task["URL"]
+
+  now = Time.now
+  task_result["Time"] = now.getutc.iso8601
+  task_result["Timestamp"] = now.to_i
+
+  puts JSON.dump({msg: "Task output", output: task_result})
+  sf.send_task_success({
+    task_token: ENV["STATE_MACHINE_TASK_TOKEN"],
+    output: task_result.to_json
+  })
 rescue => e
   puts e.class.name
   puts e.message
@@ -130,14 +119,11 @@
     bucket_name: bucket,
     object_key: RESULT_KEY
   }))
-  s3.put_object(
-    bucket: bucket,
-    key: RESULT_KEY,
-    body: JSON.dump({
-      Error: e.class.name,
-      ErrorMessage: e.message
-    })
-  )
+  sf.send_task_failure({
+    task_token: ENV["STATE_MACHINE_TASK_TOKEN"],
+    error: e.class.name,
+    cause: e.message
+  })
 end
 
 # Count the transfers in CloudWatch Metrics
diff --git a/src/containers/transcode/Gemfile b/src/containers/transcode/Gemfile
index fd256491..192bc011 100644
--- a/src/containers/transcode/Gemfile
+++ b/src/containers/transcode/Gemfile
@@ -4,5 +4,6 @@ source "https://rubygems.org"
 
 gem "aws-sdk-cloudwatch", "~> 1"
 gem "aws-sdk-s3", "~> 1"
+gem "aws-sdk-states", "~> 1"
 gem "aws-sdk-sts", "~> 1"
 gem "nokogiri"
diff --git a/src/containers/transcode/transcode.rb b/src/containers/transcode/transcode.rb
index 863d7921..b81920ff 100644
--- a/src/containers/transcode/transcode.rb
+++ b/src/containers/transcode/transcode.rb
@@ -21,6 +21,8 @@
 # STATE_MACHINE_FFMPEG_OUTPUT_FILE_OPTIONS
 # STATE_MACHINE_TASK_JSON
 # STATE_MACHINE_ARTIFACT_JSON
+# STATE_MACHINE_TASK_TOKEN
+# STATE_MACHINE_TASK_TYPE
 
 # Raising an error will cause the Fargate container to exit, resulting in a
 # States.TaskFailed error in the Step Function state. These will follow the
@@ -28,9 +30,12 @@
 
 require "aws-sdk-cloudwatch"
 require "aws-sdk-s3"
+require "aws-sdk-states"
 require "aws-sdk-sts"
 require "json"
+require "time"
+require "open3"
 
 class String
   def underscore
@@ -42,166 +47,191 @@ def underscore
   end
 end
 
-artifact = JSON.parse(ENV["STATE_MACHINE_ARTIFACT_JSON"])
-
-cloudwatch = Aws::CloudWatch::Client.new
-get_artifact_s3tm = Aws::S3::TransferManager.new
-
-start_time = Time.now.to_i
-
-# Count the transcode in CloudWatch Metrics
-cloudwatch.put_metric_data({
-  namespace: "PRX/Porter",
-  metric_data: [
-    {
-      metric_name: "Transcodes",
-      dimensions: [
-        {
-          name: "StateMachineName",
-          value: ENV["STATE_MACHINE_NAME"]
-        }
-      ],
-      value: 1,
-      unit: "Count"
-    }
-  ]
-})
-
-# Get the artifact file from S3
-puts "Downloading artifact"
-get_artifact_s3tm.download_file("artifact.file", bucket: ENV["STATE_MACHINE_ARTIFACT_BUCKET_NAME"], key: ENV["STATE_MACHINE_ARTIFACT_OBJECT_KEY"])
-
-if ENV["STATE_MACHINE_DESTINATION_FORMAT"] == "INHERIT" && !artifact.dig("Descriptor", "Extension")
-  raise StandardError, "Output format could not be inheritted"
-end
-
-# Execute the transcode
-global_opts = ENV["STATE_MACHINE_FFMPEG_GLOBAL_OPTIONS"]
-input_opts = ENV["STATE_MACHINE_FFMPEG_INPUT_FILE_OPTIONS"]
-output_opts = ENV["STATE_MACHINE_FFMPEG_OUTPUT_FILE_OPTIONS"]
-output_format = (ENV["STATE_MACHINE_DESTINATION_FORMAT"] == "INHERIT") ? artifact["Descriptor"]["Extension"] : ENV["STATE_MACHINE_DESTINATION_FORMAT"]
artifact["Descriptor"]["Extension"] : ENV["STATE_MACHINE_DESTINATION_FORMAT"] -ffmpeg_cmd = [ - "./ffmpeg-bin/ffmpeg", - global_opts, - "#{input_opts} -i artifact.file", - "#{output_opts} -f #{output_format} output.file" -].join(" ") - -puts "Calling FFmpeg" -puts ffmpeg_cmd - -raise StandardError, "FFmpeg failed" unless system ffmpeg_cmd - -end_time = Time.now.to_i -duration = end_time - start_time - -# Probe the output of the transcode -ffprobe_cmd = [ - "./ffmpeg-bin/ffprobe", - "-v error", - "-show_streams", - "-show_format", - "-i output.file", - "-print_format json", - "> ffprobe.json" -].join(" ") - -raise StandardError, "FFmpeg probe failed" unless system ffprobe_cmd - -# Write the probe output to S3 -puts "Writing probe output to S3 artifact bucket" - -put_probe_s3_client = Aws::S3::Client.new(region: ENV["STATE_MACHINE_AWS_REGION"]) -put_probe_s3tm = Aws::S3::TransferManager.new(client: put_probe_s3_client) -bucket_name = ENV["STATE_MACHINE_ARTIFACT_BUCKET_NAME"] -object_key = "#{ENV["STATE_MACHINE_EXECUTION_ID"]}/transcode/ffprobe-#{ENV["STATE_MACHINE_TASK_INDEX"]}.json" -put_probe_s3tm.upload_file("ffprobe.json", bucket: bucket_name, key: object_key) - -# Record transcode duration in CloudWatch Metrics -cloudwatch.put_metric_data({ - namespace: "PRX/Porter", - metric_data: [ - { - metric_name: "TranscodeDuration", - dimensions: [ - { - name: "StateMachineName", - value: ENV["STATE_MACHINE_NAME"] - } - ], - value: duration, - unit: "Seconds" - } - ] -}) - -destination = JSON.parse(ENV["STATE_MACHINE_DESTINATION_JSON"]) - -if destination["Mode"] == "AWS/S3" - region = ENV["STATE_MACHINE_AWS_REGION"] - - sts = Aws::STS::Client.new(endpoint: "https://sts.#{region}.amazonaws.com") - - # Assume a role that will have access to the S3 destination bucket, and use - # that role's credentials for the S3 upload - role = sts.assume_role({ - role_arn: ENV["STATE_MACHINE_S3_DESTINATION_WRITER_ROLE"], - role_session_name: "porter_transcode_task" +begin + artifact = JSON.parse(ENV["STATE_MACHINE_ARTIFACT_JSON"]) + + sf = Aws::States::Client.new + cloudwatch = Aws::CloudWatch::Client.new + get_artifact_s3tm = Aws::S3::TransferManager.new + + task_result = { + Task: ENV["STATE_MACHINE_TASK_TYPE"] + } + + start_time = Time.now.to_i + + # Count the transcode in CloudWatch Metrics + cloudwatch.put_metric_data({ + namespace: "PRX/Porter", + metric_data: [ + { + metric_name: "Transcodes", + dimensions: [ + { + name: "StateMachineName", + value: ENV["STATE_MACHINE_NAME"] + } + ], + value: 1, + unit: "Count" + } + ] }) - credentials = Aws::Credentials.new( - role.credentials.access_key_id, - role.credentials.secret_access_key, - role.credentials.session_token - ) - - # The Ruby AWS SDK does not intelligently handle cases where the client isn't - # explicitly set for the region where the bucket exists. We have to detect - # the region using HeadBucket, and then create the client with the returned - # region. - # TODO This isn't necessary when the bucket and the client are in the same - # region. It would be possible to catch the error and do the lookup only when - # necessary. 
-
-  # Create a client with permission to HeadBucket
-  begin
-    s3_writer = Aws::S3::Client.new(credentials: credentials, endpoint: "https://s3.amazonaws.com")
-    bucket_head = s3_writer.head_bucket({bucket: ENV["STATE_MACHINE_DESTINATION_BUCKET_NAME"]})
-    bucket_region = bucket_head.context.http_response.headers["x-amz-bucket-region"]
-  rescue Aws::S3::Errors::Http301Error, Aws::S3::Errors::PermanentRedirect => e
-    bucket_region = e.context.http_response.headers["x-amz-bucket-region"]
+  # Get the artifact file from S3
+  puts "Downloading artifact"
+  get_artifact_s3tm.download_file("artifact.file", bucket: ENV["STATE_MACHINE_ARTIFACT_BUCKET_NAME"], key: ENV["STATE_MACHINE_ARTIFACT_OBJECT_KEY"])
+
+  if ENV["STATE_MACHINE_DESTINATION_FORMAT"] == "INHERIT" && !artifact.dig("Descriptor", "Extension")
+    raise StandardError, "Output format could not be inherited"
   end
 
-  puts "Destination bucket in region: #{bucket_region}"
+  # Execute the transcode
+  global_opts = ENV["STATE_MACHINE_FFMPEG_GLOBAL_OPTIONS"]
+  input_opts = ENV["STATE_MACHINE_FFMPEG_INPUT_FILE_OPTIONS"]
+  output_opts = ENV["STATE_MACHINE_FFMPEG_OUTPUT_FILE_OPTIONS"]
+  output_format = (ENV["STATE_MACHINE_DESTINATION_FORMAT"] == "INHERIT") ? artifact["Descriptor"]["Extension"] : ENV["STATE_MACHINE_DESTINATION_FORMAT"]
+  ffmpeg_cmd = [
+    "./ffmpeg-bin/ffmpeg",
+    global_opts,
+    "#{input_opts} -i artifact.file",
+    "#{output_opts} -f #{output_format} output.file"
+  ].join(" ")
+
+  puts "Calling FFmpeg"
+  puts ffmpeg_cmd
+
+  raise StandardError, "FFmpeg failed" unless system ffmpeg_cmd
+
+  end_time = Time.now.to_i
+  duration = end_time - start_time
+
+  # Probe the output of the transcode
+  ffprobe_cmd = [
+    "./ffmpeg-bin/ffprobe",
+    "-v error",
+    "-show_streams",
+    "-show_format",
+    "-i output.file",
+    "-print_format json",
+    "> ffprobe.json"
+  ].join(" ")
+
+  stdout, _stderr, status = Open3.capture3(ffprobe_cmd)
+  raise StandardError, "FFmpeg probe failed" unless status.success?
+
+  # Add the probe results for this output to the task result
+  probe_results = JSON.parse(stdout)
+  task_result[:Duration] = probe_results["format"]["duration"].to_f * 1000
+  task_result[:Size] = probe_results["format"]["size"].to_f
+
+  # Record transcode duration in CloudWatch Metrics
+  cloudwatch.put_metric_data({
+    namespace: "PRX/Porter",
+    metric_data: [
+      {
+        metric_name: "TranscodeDuration",
+        dimensions: [
+          {
+            name: "StateMachineName",
+            value: ENV["STATE_MACHINE_NAME"]
+          }
+        ],
+        value: duration,
+        unit: "Seconds"
+      }
+    ]
+  })
 
+  destination = JSON.parse(ENV["STATE_MACHINE_DESTINATION_JSON"])
+
+  if destination["Mode"] == "AWS/S3"
+    region = ENV["STATE_MACHINE_AWS_REGION"]
+
+    # Add S3 destination details to the task result
+    task_result[:BucketName] = ENV["STATE_MACHINE_DESTINATION_BUCKET_NAME"]
+    task_result[:ObjectKey] = ENV["STATE_MACHINE_DESTINATION_OBJECT_KEY"]
+
+    sts = Aws::STS::Client.new(endpoint: "https://sts.#{region}.amazonaws.com")
+
+    # Assume a role that will have access to the S3 destination bucket, and use
+    # that role's credentials for the S3 upload
+    role = sts.assume_role({
+      role_arn: ENV["STATE_MACHINE_S3_DESTINATION_WRITER_ROLE"],
+      role_session_name: "porter_transcode_task"
+    })
+
+    credentials = Aws::Credentials.new(
+      role.credentials.access_key_id,
+      role.credentials.secret_access_key,
+      role.credentials.session_token
+    )
+
+    # The Ruby AWS SDK does not intelligently handle cases where the client isn't
+    # explicitly set for the region where the bucket exists. We have to detect
+    # the region using HeadBucket, and then create the client with the returned
+    # region.
+    # TODO This isn't necessary when the bucket and the client are in the same
+    # region. It would be possible to catch the error and do the lookup only when
+    # necessary.
+
+    # Create a client with permission to HeadBucket
+    begin
+      s3_writer = Aws::S3::Client.new(credentials: credentials, endpoint: "https://s3.amazonaws.com")
+      bucket_head = s3_writer.head_bucket({bucket: ENV["STATE_MACHINE_DESTINATION_BUCKET_NAME"]})
+      bucket_region = bucket_head.context.http_response.headers["x-amz-bucket-region"]
+    rescue Aws::S3::Errors::Http301Error, Aws::S3::Errors::PermanentRedirect => e
+      bucket_region = e.context.http_response.headers["x-amz-bucket-region"]
     end
 
+    puts "Destination bucket in region: #{bucket_region}"
 
-  # Create a new client with the permissions and the correct region
-  s3_writer = Aws::S3::Client.new(credentials: credentials, region: bucket_region)
+    # Create a new client with the permissions and the correct region
+    s3_writer = Aws::S3::Client.new(credentials: credentials, region: bucket_region)
+
+    put_object_params = {}
 
-  put_object_params = {}
-
-  # When the optional `ContentType` property is set to `REPLACE`, if a MIME is
-  # included with the artifact, that should be used as the new file's
-  # content type
-  if destination["ContentType"] == "REPLACE" && artifact.dig("Descriptor", "MIME")
-    put_object_params[:content_type] = artifact["Descriptor"]["MIME"]
-  end
-
-  # For historical reasons, the available parameters match ALLOWED_UPLOAD_ARGS
-  # from Boto3's S3Transfer class.
-  # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/customizations/s3.html
-  # If any parameters are included on the destination config, they are
-  # reformatted to snake case, and added to the put_object params as symbols.
-  if destination.key?("Parameters")
-    destination["Parameters"].each do |k, v|
-      put_object_params[k.underscore.to_sym] = v
+    # When the optional `ContentType` property is set to `REPLACE`, if a MIME is
+    # included with the artifact, that should be used as the new file's
+    # content type
+    if destination["ContentType"] == "REPLACE" && artifact.dig("Descriptor", "MIME")
+      put_object_params[:content_type] = artifact["Descriptor"]["MIME"]
     end
+
+    # For historical reasons, the available parameters match ALLOWED_UPLOAD_ARGS
+    # from Boto3's S3Transfer class.
+    # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/customizations/s3.html
+    # If any parameters are included on the destination config, they are
+    # reformatted to snake case, and added to the put_object params as symbols.
+    if destination.key?("Parameters")
+      destination["Parameters"].each do |k, v|
+        put_object_params[k.underscore.to_sym] = v
+      end
+    end
+
+    put_object_params[:bucket] = ENV["STATE_MACHINE_DESTINATION_BUCKET_NAME"]
+    put_object_params[:key] = ENV["STATE_MACHINE_DESTINATION_OBJECT_KEY"]
+
+    # Upload the encoded file to the S3
+    puts "Writing output to S3 destination"
+    put_output_s3tm = Aws::S3::TransferManager.new(client: s3_writer)
+    put_output_s3tm.upload_file("output.file", **put_object_params)
   end
 
-  put_object_params[:bucket] = ENV["STATE_MACHINE_DESTINATION_BUCKET_NAME"]
-  put_object_params[:key] = ENV["STATE_MACHINE_DESTINATION_OBJECT_KEY"]
+  now = Time.now
+  task_result["Time"] = now.getutc.iso8601
+  task_result["Timestamp"] = now.to_i
 
-  # Upload the encoded file to the S3
-  puts "Writing output to S3 destination"
-  put_ouput_s3tm = Aws::S3::TransferManager.new(client: s3_writer)
-  put_ouput_s3tm.upload_file("output.file", **put_object_params)
+  puts JSON.dump({msg: "Task output", output: task_result})
+  sf.send_task_success({
+    task_token: ENV["STATE_MACHINE_TASK_TOKEN"],
+    output: task_result.to_json
+  })
+rescue => e
+  puts JSON.dump({msg: "Task failed!", error: e.class.name, cause: e.message})
+  sf.send_task_failure({
+    task_token: ENV["STATE_MACHINE_TASK_TOKEN"],
+    error: e.class.name,
+    cause: e.message
+  })
 end
diff --git a/src/lambdas/ftp-copy-task-output/index.js b/src/lambdas/ftp-copy-task-output/index.js
deleted file mode 100644
index f770a2de..00000000
--- a/src/lambdas/ftp-copy-task-output/index.js
+++ /dev/null
@@ -1,68 +0,0 @@
-/* eslint-disable max-classes-per-file */
-// Because the result of a Fargate task is not sufficient for sending a proper
-// callback, this function takes the entire task input and builds a better
-// result that gets passed to the callback task. The Fargate tasks should
-// always write a JSON file with information about the execution to S3. If it
-// includes an Error property, the FTP execution was unsuccessful, and this
-// Lambda should throw an error, to proxy the failure up to the state machine.
-// If the FTP operation was successful, there won't be an Error property, but
-// could be other properties like Mode, etc. All such properties will be
-// included in the task result.
-import { GetObjectCommand, S3Client } from "@aws-sdk/client-s3";
-
-const s3 = new S3Client({
-  apiVersion: "2006-03-01",
-  followRegionRedirects: true,
-});
-
-class MissingFtpResultsError extends Error {
-  constructor(...params) {
-    super(...params);
-    this.name = "MissingFtpResultsError";
-  }
-}
-
-class FtpOperationError extends Error {
-  constructor(...params) {
-    super(...params);
-    this.name = "FtpOperationError";
-  }
-}
-
-export const handler = async (event) => {
-  console.log(JSON.stringify({ msg: "State input", input: event }));
-
-  const file = await s3.send(
-    new GetObjectCommand({
-      Bucket: process.env.ARTIFACT_BUCKET_NAME,
-      Key: `${event.Execution.Id}/copy/ftp-result-${event.TaskIteratorIndex}.json`,
-    }),
-  );
-  const json = await file.Body.transformToString();
-  const ftpResult = JSON.parse(json);
-
-  if (!ftpResult) {
-    throw new MissingFtpResultsError("No Fargate results file found");
-  } else if (ftpResult.Error) {
-    // If the Fargate experienced an issue, reraise it here so it's visible
-    // within the state machine
-    // TODO Possible to throw the actual error class?
-    throw new FtpOperationError(
-      `${ftpResult.Error}: ${ftpResult.ErrorMessage}`,
-    );
-  } else {
-    const now = new Date();
-
-    const result = {
-      Task: event.Task.Type,
-      URL: event.Task.URL,
-      ...ftpResult,
-      Time: now.toISOString(),
-      Timestamp: +now / 1000,
-    };
-
-    console.log(JSON.stringify({ msg: "Result", result }));
-
-    return result;
-  }
-};
diff --git a/src/lambdas/ftp-copy-task-output/package.json b/src/lambdas/ftp-copy-task-output/package.json
deleted file mode 100644
index a4feaebf..00000000
--- a/src/lambdas/ftp-copy-task-output/package.json
+++ /dev/null
@@ -1,5 +0,0 @@
-{
-  "name": "ftp-copy-task-output",
-  "version": "1.0.0",
-  "type": "module"
-}
diff --git a/src/lambdas/transcode-task-output/index.js b/src/lambdas/transcode-task-output/index.js
deleted file mode 100644
index 0f2ace62..00000000
--- a/src/lambdas/transcode-task-output/index.js
+++ /dev/null
@@ -1,54 +0,0 @@
-// Because the result of a Fargate task is not sufficient for sending a proper
-// callback, this function takes the entire task input and builds a better
-// result that gets passed to the callback task.
-import { GetObjectCommand, S3Client } from "@aws-sdk/client-s3";
-import { NodeHttpHandler } from "@smithy/node-http-handler";
-import { ConfiguredRetryStrategy } from "@smithy/util-retry";
-
-const retryStrategy = new ConfiguredRetryStrategy(
-  5, // Max attempts
-  (attempt) => 100 + attempt * 500,
-);
-
-const requestHandler = new NodeHttpHandler({
-  connectionTimeout: 800,
-  requestTimeout: 2000,
-  socketTimeout: 500,
-});
-
-const s3 = new S3Client({
-  apiVersion: "2006-03-01",
-  followRegionRedirects: true,
-  retryStrategy,
-  requestHandler,
-});
-
-export const handler = async (event) => {
-  console.log(JSON.stringify({ msg: "State input", input: event }));
-
-  // Get ffprobe results
-  const file = await s3.send(
-    new GetObjectCommand({
-      Bucket: process.env.ARTIFACT_BUCKET_NAME,
-      Key: `${event.Execution.Id}/transcode/ffprobe-${event.TaskIteratorIndex}.json`,
-    }),
-  );
-  const json = await file.Body.transformToString();
-  const ffprobe = JSON.parse(json);
-
-  const now = new Date();
-
-  const result = {
-    Task: event.Task.Type,
-    BucketName: event.Task.Destination.BucketName,
-    ObjectKey: event.Task.Destination.ObjectKey,
-    Duration: +ffprobe.format.duration * 1000,
-    Size: +ffprobe.format.size,
-    Time: now.toISOString(),
-    Timestamp: +now / 1000,
-  };
-
-  console.log(JSON.stringify({ msg: "Result", result }));
-
-  return result;
-};
diff --git a/src/lambdas/transcode-task-output/package.json b/src/lambdas/transcode-task-output/package.json
deleted file mode 100644
index 14bf97d4..00000000
--- a/src/lambdas/transcode-task-output/package.json
+++ /dev/null
@@ -1,5 +0,0 @@
-{
-  "name": "transcode-task-output",
-  "version": "1.0.0",
-  "type": "module"
-}
diff --git a/state-machine.asl.yml b/state-machine.asl.yml
index b43a0cde..d1237923 100644
--- a/state-machine.asl.yml
+++ b/state-machine.asl.yml
@@ -305,7 +305,7 @@ States:
   FTP Copy Task Fargate Execution:
     Type: Task
     Comment: Sends artifact to an FTP destination
-    Resource: arn:aws:states:::ecs:runTask.sync
+    Resource: arn:aws:states:::ecs:runTask.waitForTaskToken
     InputPath: "$"
     Parameters:
       Cluster: "${EcsClusterArn}"
@@ -331,6 +331,10 @@ States:
                 Value.$: "$.Artifact.ObjectKey"
              - Name: STATE_MACHINE_TASK_JSON
                Value.$: States.JsonToString($.Task)
+              - Name: STATE_MACHINE_TASK_TOKEN
+                Value.$: "$$.Task.Token"
+              - Name: STATE_MACHINE_TASK_TYPE
+                Value.$: "$.Task.Type"
            Name: "${FtpCopyContainerName}"
      NetworkConfiguration:
        AwsvpcConfiguration:
@@ -342,42 +346,13 @@ States:
            - "${VpcPublicSubnet2}"
      PropagateTags: TASK_DEFINITION
      TaskDefinition: "${FtpCopyEcsTaskDefinitionArn}"
-    ResultPath: "$.Void"
-    OutputPath: "$"
-    Next: FTP Copy Task Results Formatter
-    Retry:
-      - ErrorEquals:
-          - States.ALL
-        IntervalSeconds: 10
-        MaxAttempts: 3
-        BackoffRate: 2
-    Catch:
-      - ErrorEquals:
-          - States.ALL
-        ResultPath: "$.Error"
-        Next: TaskResult Error Callback Map
-  FTP Copy Task Results Formatter:
-    Type: Task
-    Comment: Formats the output of an FTP copy task
-    Resource: "${FtpCopyTaskOutputLambdaFunctionArn}"
-    InputPath: "$"
-    Parameters:
-      Job:
-        Id.$: "$.Job.Id"
-      Execution:
-        Id.$: "$$.Execution.Id"
-      Task.$: "$.Task"
-      TaskIteratorIndex.$: "$.TaskIndex"
    ResultPath: "$.TaskResult"
    OutputPath: "$"
    Next: TaskResult Callbacks Map
    Retry:
-      - ErrorEquals:
-          - FtpOperationError
-        MaxAttempts: 0
      - ErrorEquals:
          - States.ALL
-        IntervalSeconds: 5
+        IntervalSeconds: 10
        MaxAttempts: 3
        BackoffRate: 2
    Catch:
      - ErrorEquals:
@@ -388,7 +363,7 @@ States:
  Transcode Task Fargate Execution:
    Type: Task
    Comment: Transcodes multimedia artifact
-    Resource: arn:aws:states:::ecs:runTask.sync
+    Resource: arn:aws:states:::ecs:runTask.waitForTaskToken
    InputPath: "$"
    Parameters:
      Cluster: "${EcsClusterArn}"
@@ -438,6 +413,10 @@ States:
                Value.$: States.JsonToString($.Task)
              - Name: STATE_MACHINE_ARTIFACT_JSON
                Value.$: States.JsonToString($.Artifact)
+              - Name: STATE_MACHINE_TASK_TOKEN
+                Value.$: "$$.Task.Token"
+              - Name: STATE_MACHINE_TASK_TYPE
+                Value.$: "$.Task.Type"
            Name: "${TranscodeContainerName}"
      NetworkConfiguration:
        AwsvpcConfiguration:
          Subnets:
            - "${VpcPublicSubnet1}"
            - "${VpcPublicSubnet2}"
      PropagateTags: TASK_DEFINITION
      TaskDefinition: "${TranscodeEcsTaskDefinitionArn}"
-    ResultPath: "$.Void"
-    OutputPath: "$"
-    Next: Transcode Task Results Formatter
-    Retry:
-      - ErrorEquals:
-          - States.ALL
-        IntervalSeconds: 15
-        MaxAttempts: 5
-        BackoffRate: 2
-    Catch:
-      - ErrorEquals:
-          - States.ALL
-        ResultPath: "$.Error"
-        Next: TaskResult Error Callback Map
-  Transcode Task Results Formatter:
-    Type: Task
-    Comment: Formats the output of a transcode task
-    Resource: "${TranscodeTaskOutputLambdaFunctionArn}"
-    InputPath: "$"
-    Parameters:
-      Job:
-        Id.$: "$.Job.Id"
-      Execution:
-        Id.$: "$$.Execution.Id"
-      Task.$: "$.Task"
-      TaskIteratorIndex.$: "$.TaskIndex"
    ResultPath: "$.TaskResult"
    OutputPath: "$"
    Next: TaskResult Callbacks Map
    Retry:
      - ErrorEquals:
          - States.ALL
-        IntervalSeconds: 5
-        MaxAttempts: 3
+        IntervalSeconds: 15
+        MaxAttempts: 5
        BackoffRate: 2
    Catch:
      - ErrorEquals:
diff --git a/template.yml b/template.yml
index a94a9827..4af3b1ff 100644
--- a/template.yml
+++ b/template.yml
@@ -1097,6 +1097,14 @@ Resources:
                Effect: Allow
                Resource: "*"
            Version: "2012-10-17"
+        # Allow it to send task results
+        - PolicyName: SendTaskStatus
+          PolicyDocument:
+            Statement:
+              - Action: states:SendTask*
+                Effect: Allow
+                Resource: "*"
+            Version: "2012-10-17"
      ManagedPolicyArns:
        # Transcode operations are always pulling FROM the artifact bucket
        - !Ref ArtifactBucketReadOnlyAccessPolicy
@@ -1217,6 +1225,14 @@ Resources:
                Effect: Allow
                Resource: "*"
            Version: "2012-10-17"
+        # Allow it to send task results
+        - PolicyName: SendTaskStatus
+          PolicyDocument:
+            Statement:
+              - Action: states:SendTask*
+                Effect: Allow
+                Resource: "*"
+            Version: "2012-10-17"
      ManagedPolicyArns:
        - !Ref ArtifactBucketReadOnlyAccessPolicy
        - !Ref ArtifactBucketWriteAccessPolicy
@@ -1705,133 +1721,6 @@ Resources:
      - { Key: prx:cloudformation:stack-id, Value: !Ref AWS::StackId }
      - { Key: prx:ops:environment, Value: !Ref EnvironmentType }
      - { Key: prx:dev:application, Value: Porter }
-  # FTP Copy Output Lambda
-  FtpCopyTaskOutputLambdaFunction:
-    Type: AWS::Serverless::Function
-    Properties:
-      Architectures: [arm64]
-      CodeUri: src/lambdas/ftp-copy-task-output/
-      Description: >-
-        Formats the output of a transcode task result
-      Environment:
-        Variables:
-          ARTIFACT_BUCKET_NAME: !Ref ArtifactBucket
-      Handler: index.handler
-      Layers:
-        - !Ref AwsSdkJsV3LambdaLayer
-      MemorySize: 128
-      Policies:
-        - !Ref ArtifactBucketReadOnlyAccessPolicy
-      Runtime: nodejs24.x
-      Tags:
-        prx:meta:tagging-version: "2021-04-07"
-        prx:cloudformation:stack-name: !Ref AWS::StackName
-        prx:cloudformation:stack-id: !Ref AWS::StackId
-        prx:ops:environment: !Ref EnvironmentType
-        prx:dev:application: Porter
-      Timeout: 4
-  FtpCopyTaskOutputLambdaLogGroup:
-    Type: AWS::Logs::LogGroup
-    DeletionPolicy: Delete
-    UpdateReplacePolicy: Delete
-    Properties:
-      LogGroupName: !Sub /aws/lambda/${FtpCopyTaskOutputLambdaFunction}
-      RetentionInDays: 30
-      Tags:
-        - { Key: prx:meta:tagging-version, Value: "2021-04-07" }
-        - { Key: prx:cloudformation:stack-name, Value: !Ref AWS::StackName }
-        - { Key: prx:cloudformation:stack-id, Value: !Ref AWS::StackId }
-        - { Key: prx:ops:environment, Value: !Ref EnvironmentType }
-        - { Key: prx:dev:application, Value: Porter }
-  FtpCopyTaskOutputLambdaErrorAlarm:
-    Type: AWS::CloudWatch::Alarm
-    Condition: CreateProductionResources
-    Properties:
-      AlarmName: !Sub MINOR [Porter] FTP output Lambda <${EnvironmentTypeAbbreviation}> INVOCATION FAILED (${AWS::StackName})
-      AlarmDescription: >-
-        FTP copy output function has failed. This is generally an indication
-        that an FTP copy task failed. (Fargate FTP copy tasks cannot trigger
-        alarms themselves, so this Lambda function is used as a proxy.) The
-        Lambda logs should include the FTP error message.
-      ComparisonOperator: GreaterThanThreshold
-      EvaluationPeriods: 1
-      MetricName: LambdaFunctionsFailed
-      Namespace: AWS/States
-      Period: 60
-      Statistic: Sum
-      Tags:
-        - { Key: prx:meta:tagging-version, Value: "2021-04-07" }
-        - { Key: prx:cloudformation:stack-name, Value: !Ref AWS::StackName }
-        - { Key: prx:cloudformation:stack-id, Value: !Ref AWS::StackId }
-        - { Key: prx:ops:environment, Value: !Ref EnvironmentType }
-        - { Key: prx:dev:application, Value: Porter }
-      Threshold: 0
-      TreatMissingData: notBreaching
-      Dimensions:
-        - Name: LambdaFunctionArn
-          Value: !GetAtt FtpCopyTaskOutputLambdaFunction.Arn
-  # Transcode Output Lambda
-  TranscodeTaskOutputLambdaFunction:
-    Type: AWS::Serverless::Function
-    Properties:
-      Architectures: [arm64]
-      CodeUri: src/lambdas/transcode-task-output/
-      Description: >-
-        Formats the output of a transcode task result
-      Environment:
-        Variables:
-          ARTIFACT_BUCKET_NAME: !Ref ArtifactBucket
-      Handler: index.handler
-      Layers:
-        - !Ref AwsSdkJsV3LambdaLayer
-      MemorySize: 128
-      Policies:
-        - !Ref ArtifactBucketReadOnlyAccessPolicy
-      Runtime: nodejs24.x
-      Tags:
-        prx:meta:tagging-version: "2021-04-07"
-        prx:cloudformation:stack-name: !Ref AWS::StackName
-        prx:cloudformation:stack-id: !Ref AWS::StackId
-        prx:ops:environment: !Ref EnvironmentType
-        prx:dev:application: Porter
-      Timeout: 30
-  TranscodeTaskOutputLambdaLogGroup:
-    Type: AWS::Logs::LogGroup
-    DeletionPolicy: Delete
-    UpdateReplacePolicy: Delete
-    Properties:
-      LogGroupName: !Sub /aws/lambda/${TranscodeTaskOutputLambdaFunction}
-      RetentionInDays: 30
-      Tags:
-        - { Key: prx:meta:tagging-version, Value: "2021-04-07" }
-        - { Key: prx:cloudformation:stack-name, Value: !Ref AWS::StackName }
-        - { Key: prx:cloudformation:stack-id, Value: !Ref AWS::StackId }
-        - { Key: prx:ops:environment, Value: !Ref EnvironmentType }
-        - { Key: prx:dev:application, Value: Porter }
-  TranscodeTaskOutputLambdaErrorAlarm:
-    Type: AWS::CloudWatch::Alarm
-    Condition: CreateProductionResources
-    Properties:
-      AlarmName: !Sub MINOR [Porter] Transcode output Lambda <${EnvironmentTypeAbbreviation}> INVOCATION FAILED (${AWS::StackName})
-      AlarmDescription: >-
-        Transcode output format function has encountered an invocation error
-      ComparisonOperator: GreaterThanThreshold
-      EvaluationPeriods: 1
-      MetricName: LambdaFunctionsFailed
-      Namespace: AWS/States
-      Period: 60
-      Statistic: Sum
-      Tags:
-        - { Key: prx:meta:tagging-version, Value: "2021-04-07" }
-        - { Key: prx:cloudformation:stack-name, Value: !Ref AWS::StackName }
-        - { Key: prx:cloudformation:stack-id, Value: !Ref AWS::StackId }
-        - { Key: prx:ops:environment, Value: !Ref EnvironmentType }
-        - { Key: prx:dev:application, Value: Porter }
-      Threshold: 0
-      TreatMissingData: notBreaching
-      Dimensions:
-        - Name: LambdaFunctionArn
-          Value: !GetAtt TranscodeTaskOutputLambdaFunction.Arn
   # Image Manipulation Lambda
  ImageTransformLambdaIamRole:
    Type: AWS::IAM::Role
@@ -2435,10 +2324,8 @@ Resources:
        - !GetAtt TranscriptionJobStartLambdaFunction.Arn
        - !GetAtt TranscriptionJobResultsLambdaFunction.Arn
        - !GetAtt CallbackLambdaFunction.Arn
-        - !GetAtt TranscodeTaskOutputLambdaFunction.Arn
        - !GetAtt NormalizeOutputLambdaFunction.Arn
        - !GetAtt JobSerializerLambdaFunction.Arn
-        - !GetAtt FtpCopyTaskOutputLambdaFunction.Arn
        - !GetAtt SilenceDetectionLambdaFunction.Arn
        - !GetAtt ToneDetectionLambdaFunction.Arn
        - !GetAtt WaveformLambdaFunction.Arn
@@ -2523,10 +2410,8 @@ Resources:
        SourceTypeLambdaFunctionArn: !GetAtt SourceTypeLambdaFunction.Arn
        FtpCopyContainerName: !Sub ${AWS::StackName}-ftp-copy-container
        FtpCopyEcsTaskDefinitionArn: !Ref FtpCopyEcsTaskDefinition
-        FtpCopyTaskOutputLambdaFunctionArn: !GetAtt FtpCopyTaskOutputLambdaFunction.Arn
        TranscodeContainerName: !Sub ${AWS::StackName}-transcode-container
        TranscodeEcsTaskDefinitionArn: !Ref TranscodeEcsTaskDefinition
-        TranscodeTaskOutputLambdaFunctionArn: !GetAtt TranscodeTaskOutputLambdaFunction.Arn
        TranscriptionJobResultsLambdaFunctionArn: !GetAtt TranscriptionJobResultsLambdaFunction.Arn
        TranscriptionJobStartLambdaFunctionArn: !GetAtt TranscriptionJobStartLambdaFunction.Arn
        VpcPublicSubnet1: !Ref PublicSubnet1

From 5f408d7d3f9b319ab469ac5710e8af5c547d18ff Mon Sep 17 00:00:00 2001
From: Chris Kalafarski
Date: Tue, 13 Jan 2026 11:00:59 -0500
Subject: [PATCH 2/2] Report FFprobe size as integer

---
 src/containers/transcode/transcode.rb | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/containers/transcode/transcode.rb b/src/containers/transcode/transcode.rb
index b81920ff..dfb970e7 100644
--- a/src/containers/transcode/transcode.rb
+++ b/src/containers/transcode/transcode.rb
@@ -123,7 +123,7 @@ def underscore
   # Add the probe results for this output to the task result
   probe_results = JSON.parse(stdout)
   task_result[:Duration] = probe_results["format"]["duration"].to_f * 1000
-  task_result[:Size] = probe_results["format"]["size"].to_f
+  task_result[:Size] = probe_results["format"]["size"].to_i
 
   # Record transcode duration in CloudWatch Metrics
   cloudwatch.put_metric_data({
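
Illustrative note (not part of either patch): with runTask.waitForTaskToken, the Fargate containers no longer write a results file to S3; they report directly back to Step Functions using the task token that the state injects into the container environment. Below is a minimal Ruby sketch of that callback pattern, assuming only what the patches themselves introduce (the aws-sdk-states gem and the STATE_MACHINE_TASK_TOKEN / STATE_MACHINE_TASK_TYPE environment variables); the placeholder work inside the begin block stands in for the real FTP or transcode logic.

# illustrative_callback.rb -- sketch only, not part of the patches above
require "json"
require "time"
require "aws-sdk-states"

sf = Aws::States::Client.new
token = ENV["STATE_MACHINE_TASK_TOKEN"]

begin
  task_result = {Task: ENV["STATE_MACHINE_TASK_TYPE"]}

  # ... do the container's real work here and merge its details into task_result ...

  now = Time.now
  task_result["Time"] = now.getutc.iso8601
  task_result["Timestamp"] = now.to_i

  # Resumes the paused state; `output` becomes the state's TaskResult
  sf.send_task_success(task_token: token, output: task_result.to_json)
rescue => e
  # Fails the paused state with an error name and cause
  sf.send_task_failure(task_token: token, error: e.class.name, cause: e.message)
end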