
Add ExecuteCallback support to AwsLambdaExecutor#63035

Open
SameerMesiah97 wants to merge 3 commits into apache:main from SameerMesiah97:62887-Lambda-Executor-Callback-Support

Conversation

@SameerMesiah97
Contributor

Description

This change adds support for callback workloads (ExecuteCallback) to the AwsLambdaExecutor.

Previously, the executor only supported task workloads (ExecuteTask). This update extends the executor to accept, queue, and execute callback workloads alongside task workloads. The executor now maintains a queued_callbacks structure and updates queue_workload() to register ExecuteCallback workloads using the callback identifier (callback.id) as the workload key.

The workload processing flow has been updated to support callbacks throughout the executor lifecycle. _process_workloads() now handles both ExecuteTask and ExecuteCallback workloads, dispatching them through the same execution pathway. execute_async() has been extended to serialize both workload types and forward them to the Lambda runtime using python -m airflow.sdk.execution_time.execute_workload. Additionally, attempt_task_runs() and related task tracking logic have been updated to support string-based workload identifiers for callbacks while maintaining JSON-serialized TaskInstanceKey identifiers for task workloads.
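As a rough illustration of the queueing convention described above — a minimal sketch with stand-in classes, not the provider's actual code (`TaskInstanceKey`, `ExecuteTask`, and `ExecuteCallback` are simplified stand-ins here):

```python
import json
from dataclasses import asdict, dataclass, field


@dataclass
class TaskInstanceKey:
    # Stand-in for Airflow's TaskInstanceKey.
    dag_id: str
    task_id: str
    run_id: str
    try_number: int


@dataclass
class ExecuteTask:
    # Stand-in for the Task SDK's ExecuteTask workload.
    ti_key: TaskInstanceKey


@dataclass
class Callback:
    id: str
    data: dict = field(default_factory=dict)


@dataclass
class ExecuteCallback:
    # Stand-in for the Task SDK's ExecuteCallback workload.
    callback: Callback


class LambdaExecutorSketch:
    """Illustrates the dual-key bookkeeping only, not the real executor."""

    def __init__(self) -> None:
        self.queued_tasks: dict = {}
        self.queued_callbacks: dict = {}

    def queue_workload(self, workload) -> None:
        if isinstance(workload, ExecuteCallback):
            # Callbacks are keyed by their identifier string.
            self.queued_callbacks[workload.callback.id] = workload
        else:
            # Tasks keep the JSON-serialized TaskInstanceKey as their key.
            key = json.dumps(asdict(workload.ti_key))
            self.queued_tasks[key] = workload
```

Queueing one workload of each type then yields one entry per mapping, with the callback keyed by its `callback.id`.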

Rationale

The ExecutorCallback framework introduced in Airflow 3.2 allows executors to execute synchronous callbacks on worker infrastructure. Executors must therefore be able to accept and dispatch ExecuteCallback workloads in addition to task workloads.

AwsLambdaExecutor delegates execution to the Task SDK runtime by invoking airflow.sdk.execution_time.execute_workload inside the Lambda environment. As reflected in the direction of PR #62645, the expectation is that handling of both task and callback execution will ultimately occur within the Task SDK runtime rather than inside individual executors. Forwarding callback workloads to the same runtime entrypoint used for tasks aligns the Lambda executor with this model.
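The forwarding described above can be pictured roughly like this. The payload field names are illustrative assumptions, not the provider's actual schema; only the entrypoint command comes from the PR description:

```python
import json


def build_lambda_payload(workload_key: str, serialized_workload: str) -> bytes:
    """Sketch of an invocation payload sent to the Lambda runtime."""
    payload = {
        # Echoed back by the Lambda function so the executor can match
        # results on the SQS results queue to the queued workload.
        "task_key": workload_key,
        # Both ExecuteTask and ExecuteCallback go through the same
        # Task SDK runtime entrypoint.
        "command": ["python", "-m", "airflow.sdk.execution_time.execute_workload"],
        "workload": serialized_workload,
    }
    return json.dumps(payload).encode("utf-8")
```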

Tests

Added unit tests verifying that:

  • Callback workloads are queued, processed, and dispatched to Lambda using the Task SDK runtime command.
  • Callback workloads correctly propagate queue overrides specified in callback.data["queue"].
  • Callback workloads are correctly adopted using string-based external_executor_id values.

Documentation

Docstrings have been updated in AwsLambdaExecutor to describe support for both task and callback workloads.

Backwards Compatibility

This change preserves existing behavior for task workloads.

Task workloads continue to use JSON-serialized TaskInstanceKey identifiers for executor tracking and scheduler adoption. Callback workloads use their callback identifier string as the workload key.
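One practical consequence of this dual convention is that a consumer can tell the two key kinds apart by whether the key parses as a JSON object. A hypothetical helper (not part of the PR) sketching that check:

```python
import json


def is_task_key(workload_key: str) -> bool:
    """Return True if the key is a JSON-serialized TaskInstanceKey.

    Callback keys are bare identifier strings (e.g. a UUID), which
    either fail to parse as JSON or do not parse to an object.
    """
    try:
        return isinstance(json.loads(workload_key), dict)
    except json.JSONDecodeError:
        return False
```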

Related: #62887

@boring-cyborg boring-cyborg bot added the area:providers and provider:amazon (AWS/Amazon - related issues) labels Mar 7, 2026
@SameerMesiah97 SameerMesiah97 force-pushed the 62887-Lambda-Executor-Callback-Support branch 4 times, most recently from 91cc6df to b1496cd Compare March 7, 2026 17:28
@SameerMesiah97
Contributor Author

@ferruzzi

Tagging you here as requested for tracking.

@ferruzzi ferruzzi mentioned this pull request Mar 10, 2026
Contributor

@ferruzzi ferruzzi left a comment


Good start, left some comments and suggestions. Thanks for taking this on!

Contributor

@o-nikolas o-nikolas left a comment


Thanks for the contribution! This looks like a good start 🙂
But it's the bare minimum of changes. Please be sure to clean up your code before submitting a PR: make sure types are correct and all comments, variables, and method names are updated. It takes a long time for maintainers to review code, so we want to use that time wisely.

Comment on lines +534 to +539
    data = json.loads(ser_task_key)
    task_key = TaskInstanceKey.from_dict(data)
except Exception:
    # If that task fails to deserialize, we should just skip it.
    self.log.exception(
        "Task failed to be adopted because the key could not be deserialized"
    )
    continue
# Callback workloads use string keys.
task_key = ser_task_key

Contributor

@ferruzzi can callbacks be adopted? If not, then we don't need these changes here, or if anything, only to discard callbacks (we probably don't want to log for each one).

Contributor Author

I have kept the functional aspects of try_adopt_task_instances the same as in my first attempt for now. Once @ferruzzi confirms whether or not this needs to handle callbacks (even if it is just logging exceptions), I will adjust this accordingly.

Contributor

Sorry for the delay. I don't believe they support adoption yet, but that might/should/could be added in the future (not this PR, obviously).

Contributor Author

@SameerMesiah97 Mar 14, 2026

@ferruzzi So you are okay with keeping it as is? Or revert it to the state before the implementation with exception logging removed?

Contributor

I think revert? @o-nikolas ?

@@ -411,7 +467,7 @@ def process_queue(self, queue_url: str):
task_key = self.running_tasks[ser_task_key]
except KeyError:
Contributor

Shouldn't all these mentions of task(s) in variables and comments be updated?

Contributor Author

Where appropriate I have changed 'task' to 'workload' in variable names, method signatures, comments, docstrings and log messages. However, some variables are best left as they are, because renaming them would be misleading. For example, ser_task_key is derived from the value whose key name is task_key in the JSON payload that constitutes the SQS message. It would not make sense to change ser_task_key to ser_workload_key just for the sake of consistency.
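For illustration, the results-message shape this comment describes might look like the following. The field names are assumed from the discussion, not verified against the provider:

```python
import json

# The SQS results message body: the value stored under "task_key" is
# itself the serialized workload key the executor queued earlier.
message_body = json.dumps(
    {
        "task_key": json.dumps(
            {"dag_id": "d", "task_id": "t", "run_id": "r", "try_number": 1}
        ),
        "return_code": 0,
    }
)

# ser_task_key is derived from the "task_key" field of the payload,
# hence the variable name the comment argues for keeping.
ser_task_key = json.loads(message_body)["task_key"]
```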

Contributor

That's interesting yeah. I suppose that key should really be workload_key in the return schema for the lambda executor. It should still continue to accept task, but we should try to migrate towards using workload there. But I agree that is out of scope for this PR 🙂

Contributor

Agreed on all points.

 if task_key:
     if return_code == 0:
-        self.success(task_key)
+        self.success(task_key)  # type: ignore[arg-type]
Contributor

Why are we just ignoring this instead of getting the typing right?

Here and just below as well.

Contributor Author

I have removed the # type: ignore in favour of using cast. Arguably that is another way of doing the same thing, but self.success and self.fail are defined in BaseExecutor (which I am not going to touch in this PR), so this limits what can be done to satisfy typing while keeping the code explicit. If you have an alternative suggestion in mind, I am all ears.
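For readers unfamiliar with the pattern, a minimal sketch of `typing.cast` at a call site — stand-in types here, not the executor's real classes:

```python
from typing import NamedTuple, cast


class TaskInstanceKey(NamedTuple):
    # Stand-in for Airflow's TaskInstanceKey; fields are illustrative.
    dag_id: str
    task_id: str
    run_id: str
    try_number: int


def success(key: TaskInstanceKey) -> str:
    # Stand-in for BaseExecutor.success(), which expects a TaskInstanceKey.
    return f"success:{key.dag_id}.{key.task_id}"


# workload_key may hold a TaskInstanceKey (task) or a str (callback).
workload_key = TaskInstanceKey("dag", "task", "run", 1)

# cast() documents the narrowing at the call site and satisfies the
# type checker; it has no runtime effect.
result = success(cast(TaskInstanceKey, workload_key))
```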

# Add the serialized task key as the info, this will be assigned on the ti as the external_executor_id
self.running_state(task_key, ser_task_key)

def sync_running_tasks(self):
Contributor

Should this method be renamed?

Contributor Author

Done

@@ -263,7 +313,7 @@ def execute_async(self, key: TaskInstanceKey, command: CommandType, queue=None,

def attempt_task_runs(self):
Contributor

Rename this method?

Contributor Author

Done.


def execute_async(
    self,
    key: TaskInstanceKey | str,
Contributor

WorkloadKey

Contributor Author

Done

@SameerMesiah97 SameerMesiah97 marked this pull request as draft March 11, 2026 23:44
@SameerMesiah97
Contributor Author

Thanks for the contribution! This looks like a good start 🙂 But it's the bare minimum of changes. Please be sure to clean up your code before submitting a PR: make sure types are correct and all comments, variables, and method names are updated. It takes a long time for maintainers to review code, so we want to use that time wisely.

@o-nikolas @ferruzzi

This means the PR is not currently reviewable. I have converted it to draft until I can get into an acceptable state.

@o-nikolas
Contributor

Thanks for the contribution! This looks like a good start 🙂 But it's the bare minimum of changes. Please be sure to clean up your code before submitting a PR: make sure types are correct and all comments, variables, and method names are updated. It takes a long time for maintainers to review code, so we want to use that time wisely.

@o-nikolas @ferruzzi

This means the PR is not currently reviewable. I have converted it to draft until I can get into an acceptable state.

Wonderful, thanks!

@SameerMesiah97 SameerMesiah97 force-pushed the 62887-Lambda-Executor-Callback-Support branch 5 times, most recently from 8ad7f59 to 4ec2290 Compare March 12, 2026 23:56
@SameerMesiah97 SameerMesiah97 marked this pull request as ready for review March 13, 2026 01:09
@SameerMesiah97
Contributor Author

SameerMesiah97 commented Mar 13, 2026

@ferruzzi @o-nikolas

I have just pushed my latest changes. Here is a brief summary of the last commit:

  1. All variable names, method signatures, docstrings and comments have been updated where appropriate. For some sections of the code where there is some ambiguity (such as try_adopt_task_instances), I have left the naming as is. Please refer to my comments responding to your feedback.

  2. The interaction between strict mypy conventions and the multiple branches needed to support different Airflow versions made explicit typing difficult in a few places. Where possible, I avoided # type: ignore, but in some cases using it was the clearest option.

  3. You may notice the diff size increased, as most of the tests had to be updated to accommodate the changes in variable names and method signatures. Also, while searching for occurrences of "task(s)" to replace with "workload(s)", I cleaned up surrounding comments (typos, capitalization, redundant wording, missing punctuation), which further increased the diff size.

@SameerMesiah97 SameerMesiah97 marked this pull request as draft March 13, 2026 09:02
@SameerMesiah97 SameerMesiah97 marked this pull request as ready for review March 13, 2026 09:02
Contributor

@ferruzzi ferruzzi left a comment


I only got about halfway through this pass. Lots of improvements! I left a few more thoughts.


def _process_workloads(self, workload_items: Sequence[workloads.All]) -> None:
from airflow.executors import workloads

Contributor

I agree with Niko. I know this was existing code and that it was a miss in an earlier review, but w is a terrible variable name. Can you fix it while you are in here, please?

Contributor Author

@SameerMesiah97 Mar 15, 2026

@ferruzzi

Can you be more specific about what you are agreeing on with Niko? I am not sure what action is required from me on this comment besides changing the variable name?

Contributor

Sorry I wasn't clear, the variable name was what I was referring to. I see that at least some of it was existing code, but we should clean that up, not just fix your new changes.

Contributor Author

Sorry I wasn't clear, the variable name was what I was referring to. I see that at least some of it was existing code, but we should clean that up, not just fix your new changes.

Fixed in latest commit.

 self.log_task_event(
     event="lambda invoke failure",
-    ti_key=task_key,
+    ti_key=workload_key,
Contributor

Non-blocking: Looks like we missed this parameter name in base executor... we'll have to get that later.

Contributor

When a callback workload exceeds max submit attempts, log_task_event is called with ti_key=workload_key. For callbacks, this key is a string UUID, not a TaskInstanceKey named tuple, which will cause errors since Log(task_instance=...) expects a TaskInstanceKey. Is this a problem @ferruzzi ?

Contributor

Yup, good catch. I think the cleanest solution for now will be to add an optional workload parameter to Log() which will work for either type, and a note that we need to clean that up in the future. Don't remove the task_instance parameter since that will break things, but add the new workload and use it here.
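A rough sketch of the suggested change — hypothetical and not yet implemented; the real Airflow Log model is a SQLAlchemy class with more fields — keeping task_instance for existing callers while adding an optional workload that accepts either key type:

```python
class Log:
    """Sketch only, to illustrate the suggested dual-parameter API."""

    def __init__(self, event, task_instance=None, workload=None, **extra):
        self.event = event
        # Prefer the new workload parameter (TaskInstanceKey or str);
        # fall back to task_instance so existing callers keep working.
        # TODO: migrate callers to workload and retire task_instance.
        self.workload = workload if workload is not None else task_instance
        self.extra = extra
```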


Sameer Mesiah added 3 commits March 18, 2026 19:03
Extend AwsLambdaExecutor to support ExecuteCallback workloads
introduced in Airflow 3.2.

Callback workloads are queued using the callback identifier and
processed alongside task workloads. Both workload types are serialized
and dispatched to AWS Lambda using the Task SDK runtime entrypoint
`python -m airflow.sdk.execution_time.execute_workload`.

Update executor lifecycle methods to support string-based callback
identifiers while preserving JSON-serialized TaskInstanceKey handling
for task workloads.

Add unit tests covering callback workload execution, queue override
propagation, and adoption of callback workloads using string-based
external_executor_id values.
…typing to align with the workload-based executor model.
@SameerMesiah97 SameerMesiah97 force-pushed the 62887-Lambda-Executor-Callback-Support branch from 4ec2290 to 4760635 Compare March 18, 2026 19:31

4 participants