Skip to content

Conversation

jumski
Copy link
Contributor

@jumski jumski commented Sep 18, 2025

This PR fixes a critical issue where messages for pending tasks remain in the queue indefinitely when a run fails, causing performance degradation and resource waste.

Problem

  • Failed runs left queued messages orphaned, causing workers to poll them forever
  • Map steps with N tasks would leave N-1 messages orphaned when one task failed
  • Type constraint violations would retry unnecessarily despite being deterministic failures

Solution

  • Archive all queued messages when a run fails
  • Handle type violations gracefully (fail immediately, no retries)
  • Prevent any retries when the run is already failed
  • Add index for efficient message archiving

Testing

  • Added comprehensive tests for map task failures and type violations
  • All existing tests pass without regression

Copy link

changeset-bot bot commented Sep 18, 2025

🦋 Changeset detected

Latest commit: 842f929

The changes in this PR will be included in the next version bump.

This PR includes changesets to release 7 packages
Name Type
@pgflow/core Patch
pgflow Patch
@pgflow/client Patch
@pgflow/edge-worker Patch
@pgflow/example-flows Patch
@pgflow/dsl Patch
@pgflow/website Patch

Not sure what this means? Click here to learn what changesets are.

Click here if you're a maintainer who wants to add another changeset to this PR

Copy link
Contributor

coderabbitai bot commented Sep 18, 2025

Important

Review skipped

Auto reviews are disabled on this repository.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

Note

Other AI code review bot(s) detected

CodeRabbit has detected other AI code review bot(s) in this pull request and will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review.

✨ Finishing touches
🧪 Generate unit tests
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch 09-18-fix-orphaned-messages-on-fail

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@jumski jumski changed the title docs: add instructions for fixing SQL tests and updating functions Fix Orphaned Messages on Run Failure Sep 18, 2025
@jumski jumski marked this pull request as ready for review September 18, 2025 15:38
Comment on lines +115 to +121
select is(
(select count(*)::integer from pgflow.step_tasks
where run_id = :'test_run_id'::uuid
and step_slug = 'parallel_single'),
0,
'Parallel single task should not exist after type constraint violation (transaction rolled back)'
);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test comment indicates that parallel_single tasks won't exist due to transaction rollback, but the PR changes the behavior to handle type violations gracefully without rolling back transactions. With the new implementation, these tasks might actually exist in a failed state rather than not existing at all.

Consider updating this test to match the new behavior - either by checking for failed status instead of non-existence, or by updating the comment to reflect the current implementation's expected behavior. This will ensure the test accurately validates the intended behavior of the type violation handling.

Suggested change
select is(
(select count(*)::integer from pgflow.step_tasks
where run_id = :'test_run_id'::uuid
and step_slug = 'parallel_single'),
0,
'Parallel single task should not exist after type constraint violation (transaction rolled back)'
);
select is(
(select count(*)::integer from pgflow.step_tasks
where run_id = :'test_run_id'::uuid
and step_slug = 'parallel_single'
and status = 'failed'),
1,
'Parallel single task should exist but be in failed state after type constraint violation (graceful handling)'
);

Spotted by Diamond

Fix in Graphite


Is this helpful? React 👍 or 👎 to let us know.

Copy link

nx-cloud bot commented Sep 18, 2025

View your CI Pipeline Execution ↗ for commit 842f929

Command Status Duration Result
nx run-many -t build --projects client,dsl --co... ✅ Succeeded 6s View ↗
nx affected -t build --configuration=production... ✅ Succeeded 4s View ↗
nx affected -t lint typecheck test --parallel -... ✅ Succeeded 6m 3s View ↗

☁️ Nx Cloud last updated this comment at 2025-09-19 10:29:00 UTC

@jumski jumski force-pushed the 09-18-fix-orphaned-messages-on-fail branch 2 times, most recently from c4b287c to 6602788 Compare September 18, 2025 16:04
@jumski jumski force-pushed the 09-18-fix-orphaned-messages-on-fail branch 2 times, most recently from f303c04 to 79502f5 Compare September 18, 2025 20:52
@jumski jumski force-pushed the 09-18-fix-orphaned-messages-on-fail branch from 79502f5 to ae9a6ee Compare September 19, 2025 08:55
@jumski jumski force-pushed the 09-18-fix-orphaned-messages-on-fail branch from ae9a6ee to 104f337 Compare September 19, 2025 09:17
Comment on lines 172 to 182
PERFORM pgmq.archive(r.flow_slug, st.message_id)
FROM pgflow.step_tasks st
JOIN pgflow.runs r ON st.run_id = r.run_id
WHERE st.run_id = fail_task.run_id
AND st.status IN ('queued', 'started')
AND st.message_id IS NOT NULL;
END IF;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Performance and correctness issue: The archive operation uses individual pgmq.archive() calls in a loop rather than batch archiving. This is inefficient for large numbers of messages and could cause partial archiving if one call fails. Should collect message IDs and use batch archiving like in complete_task, or use a single query with array_agg() to archive all messages atomically.

Suggested change
PERFORM pgmq.archive(r.flow_slug, st.message_id)
FROM pgflow.step_tasks st
JOIN pgflow.runs r ON st.run_id = r.run_id
WHERE st.run_id = fail_task.run_id
AND st.status IN ('queued', 'started')
AND st.message_id IS NOT NULL;
END IF;
WITH messages_to_archive AS (
SELECT r.flow_slug, array_agg(st.message_id) AS message_ids
FROM pgflow.step_tasks st
JOIN pgflow.runs r ON st.run_id = r.run_id
WHERE st.run_id = fail_task.run_id
AND st.status IN ('queued', 'started')
AND st.message_id IS NOT NULL
GROUP BY r.flow_slug
)
SELECT pgmq.archive_batch(flow_slug, message_ids)
FROM messages_to_archive
WHERE array_length(message_ids, 1) > 0;
END IF;

Spotted by Diamond

Fix in Graphite


Is this helpful? React 👍 or 👎 to let us know.

Provides guidance on fixing invalid tests, updating SQL functions, and rerunning tests
without creating migrations or using nx, to streamline test maintenance and debugging.
Copy link
Contributor

🔍 Preview Deployment: Website

Deployment successful!

🔗 Preview URL: https://pr-220.pgflow.pages.dev

📝 Details:

  • Branch: 09-18-fix-orphaned-messages-on-fail
  • Commit: a15ec309424d4173af51fd0ade9a345872c889bd
  • View Logs

_Last updated: _

Copy link
Contributor

🔍 Preview Deployment: Playground

Deployment successful!

🔗 Preview URL: https://pr-220--pgflow-demo.netlify.app

📝 Details:

  • Branch: 09-18-fix-orphaned-messages-on-fail
  • Commit: a15ec309424d4173af51fd0ade9a345872c889bd
  • View Logs

_Last updated: _

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant