Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 4 additions & 13 deletions pkgs/core/schemas/0100_function__cascade_force_skip_steps.sql
Original file line number Diff line number Diff line change
Expand Up @@ -108,19 +108,10 @@ BEGIN
WHERE r.run_id = _cascade_force_skip_steps.run_id
AND skipped_count.count > 0
)
SELECT COUNT(*) INTO v_total_skipped FROM skipped;

-- Archive queued/started task messages for all steps that were just skipped
-- (query step_states since CTE state is no longer accessible)
PERFORM pgmq.archive(v_flow_slug, ARRAY_AGG(st.message_id))
FROM pgflow.step_tasks st
JOIN pgflow.step_states ss ON ss.run_id = st.run_id AND ss.step_slug = st.step_slug
WHERE st.run_id = _cascade_force_skip_steps.run_id
AND st.status IN ('queued', 'started')
AND st.message_id IS NOT NULL
AND ss.status = 'skipped'
AND ss.skipped_at >= now() - interval '1 second' -- Only recently skipped
HAVING COUNT(st.message_id) > 0;
SELECT skipped_count.count
INTO v_total_skipped
FROM (SELECT COUNT(*) AS count FROM skipped) skipped_count
LEFT JOIN archived_messages ON true;

RETURN v_total_skipped;
END;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,11 @@ BEGIN
FROM steps_with_conditions swc
LEFT JOIN step_deps_output sdo ON sdo.step_slug = swc.step_slug
)
SELECT flow_slug, step_slug, required_input_pattern, forbidden_input_pattern
SELECT
flow_slug,
step_slug,
required_input_pattern,
forbidden_input_pattern
INTO v_first_fail
FROM condition_evaluations
WHERE NOT condition_met AND when_unmet = 'fail'
Expand Down
29 changes: 8 additions & 21 deletions pkgs/core/schemas/0100_function_fail_task.sql
Original file line number Diff line number Diff line change
Expand Up @@ -49,19 +49,17 @@ IF EXISTS (SELECT 1 FROM pgflow.runs WHERE pgflow.runs.run_id = fail_task.run_id
RETURN;
END IF;

-- Late callback guard: if step is not 'started', don't mutate step/run state
-- Capture previous status BEFORE any CTE updates (for transition-based decrement)
SELECT ss.status INTO v_prev_step_status
FROM pgflow.step_states ss
-- Late callback guard: lock run + step rows and use current step status
-- under lock so concurrent fail_task calls cannot read stale status.
SELECT ss.status, r.flow_slug INTO v_prev_step_status, v_flow_slug
FROM pgflow.runs r
JOIN pgflow.step_states ss ON ss.run_id = r.run_id
WHERE ss.run_id = fail_task.run_id
AND ss.step_slug = fail_task.step_slug;
AND ss.step_slug = fail_task.step_slug
FOR UPDATE OF r, ss;

IF v_prev_step_status IS NOT NULL AND v_prev_step_status != 'started' THEN
-- Archive the task message if present
SELECT r.flow_slug INTO v_flow_slug
FROM pgflow.runs r
WHERE r.run_id = fail_task.run_id;

PERFORM pgmq.archive(v_flow_slug, ARRAY_AGG(st.message_id))
FROM pgflow.step_tasks st
WHERE st.run_id = fail_task.run_id
Expand All @@ -77,18 +75,7 @@ IF v_prev_step_status IS NOT NULL AND v_prev_step_status != 'started' THEN
RETURN;
END IF;

WITH run_lock AS (
SELECT * FROM pgflow.runs
WHERE pgflow.runs.run_id = fail_task.run_id
FOR UPDATE
),
step_lock AS (
SELECT * FROM pgflow.step_states
WHERE pgflow.step_states.run_id = fail_task.run_id
AND pgflow.step_states.step_slug = fail_task.step_slug
FOR UPDATE
),
flow_info AS (
WITH flow_info AS (
SELECT r.flow_slug
FROM pgflow.runs r
WHERE r.run_id = fail_task.run_id
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -430,19 +430,10 @@ BEGIN
WHERE r.run_id = _cascade_force_skip_steps.run_id
AND skipped_count.count > 0
)
SELECT COUNT(*) INTO v_total_skipped FROM skipped;

-- Archive queued/started task messages for all steps that were just skipped
-- (query step_states since CTE state is no longer accessible)
PERFORM pgmq.archive(v_flow_slug, ARRAY_AGG(st.message_id))
FROM pgflow.step_tasks st
JOIN pgflow.step_states ss ON ss.run_id = st.run_id AND ss.step_slug = st.step_slug
WHERE st.run_id = _cascade_force_skip_steps.run_id
AND st.status IN ('queued', 'started')
AND st.message_id IS NOT NULL
AND ss.status = 'skipped'
AND ss.skipped_at >= now() - interval '1 second' -- Only recently skipped
HAVING COUNT(st.message_id) > 0;
SELECT skipped_count.count
INTO v_total_skipped
FROM (SELECT COUNT(*) AS count FROM skipped) skipped_count
LEFT JOIN archived_messages ON true;

RETURN v_total_skipped;
END;
Expand Down Expand Up @@ -532,7 +523,11 @@ BEGIN
FROM steps_with_conditions swc
LEFT JOIN step_deps_output sdo ON sdo.step_slug = swc.step_slug
)
SELECT flow_slug, step_slug, required_input_pattern, forbidden_input_pattern
SELECT
flow_slug,
step_slug,
required_input_pattern,
forbidden_input_pattern
INTO v_first_fail
FROM condition_evaluations
WHERE NOT condition_met AND when_unmet = 'fail'
Expand Down Expand Up @@ -1327,19 +1322,17 @@ IF EXISTS (SELECT 1 FROM pgflow.runs WHERE pgflow.runs.run_id = fail_task.run_id
RETURN;
END IF;

-- Late callback guard: if step is not 'started', don't mutate step/run state
-- Capture previous status BEFORE any CTE updates (for transition-based decrement)
SELECT ss.status INTO v_prev_step_status
FROM pgflow.step_states ss
-- Late callback guard: lock run + step rows and use current step status
-- under lock so concurrent fail_task calls cannot read stale status.
SELECT ss.status, r.flow_slug INTO v_prev_step_status, v_flow_slug
FROM pgflow.runs r
JOIN pgflow.step_states ss ON ss.run_id = r.run_id
WHERE ss.run_id = fail_task.run_id
AND ss.step_slug = fail_task.step_slug;
AND ss.step_slug = fail_task.step_slug
FOR UPDATE OF r, ss;

IF v_prev_step_status IS NOT NULL AND v_prev_step_status != 'started' THEN
-- Archive the task message if present
SELECT r.flow_slug INTO v_flow_slug
FROM pgflow.runs r
WHERE r.run_id = fail_task.run_id;

PERFORM pgmq.archive(v_flow_slug, ARRAY_AGG(st.message_id))
FROM pgflow.step_tasks st
WHERE st.run_id = fail_task.run_id
Expand All @@ -1355,18 +1348,7 @@ IF v_prev_step_status IS NOT NULL AND v_prev_step_status != 'started' THEN
RETURN;
END IF;

WITH run_lock AS (
SELECT * FROM pgflow.runs
WHERE pgflow.runs.run_id = fail_task.run_id
FOR UPDATE
),
step_lock AS (
SELECT * FROM pgflow.step_states
WHERE pgflow.step_states.run_id = fail_task.run_id
AND pgflow.step_states.step_slug = fail_task.step_slug
FOR UPDATE
),
flow_info AS (
WITH flow_info AS (
SELECT r.flow_slug
FROM pgflow.runs r
WHERE r.run_id = fail_task.run_id
Expand Down
4 changes: 2 additions & 2 deletions pkgs/core/supabase/migrations/atlas.sum
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
h1:ZZJEI67KUViUzd0rVHGMZPpbUXU2MFSXdTIe/yyJqyE=
h1:jDc+2bvTL4ZYqATAMfBXbTYtMlx8RPvDUvRJjrP537w=
20250429164909_pgflow_initial.sql h1:I3n/tQIg5Q5nLg7RDoU3BzqHvFVjmumQxVNbXTPG15s=
20250517072017_pgflow_fix_poll_for_tasks_to_use_separate_statement_for_polling.sql h1:wTuXuwMxVniCr3ONCpodpVWJcHktoQZIbqMZ3sUHKMY=
20250609105135_pgflow_add_start_tasks_and_started_status.sql h1:ggGanW4Wyt8Kv6TWjnZ00/qVb3sm+/eFVDjGfT8qyPg=
Expand All @@ -18,4 +18,4 @@ h1:ZZJEI67KUViUzd0rVHGMZPpbUXU2MFSXdTIe/yyJqyE=
20260103145141_pgflow_step_output_storage.sql h1:mgVHSFDLdtYy//SZ6C03j9Str1iS9xCM8Rz/wyFwn3o=
20260120205547_pgflow_requeue_stalled_tasks.sql h1:4wCBBvjtETCgJf1eXmlH5wCTKDUhiLi0uzsFG1V528E=
20260124113408_pgflow_auth_secret_support.sql h1:i/s1JkBqRElN6FOYFQviJt685W08SuSo30aP25lNlLc=
20260214181656_pgflow_step_conditions.sql h1:nG52qhydTJMeLTd4AoI4buATJNHdEN2C1ZJdKp+i7wE=
20260214181656_pgflow_step_conditions.sql h1:rHQnXCeZ/QGxPlChdTMxumtsTtYHr1ej183Dd+auw34=
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
\set ON_ERROR_STOP on
\set QUIET on

begin;
select plan(4);

select pgflow_tests.reset_db();

select pgflow.create_flow('cascade_skip_preexisting');
select pgflow.add_step('cascade_skip_preexisting', 'target', '{}', step_type => 'map');
select pgflow.add_step('cascade_skip_preexisting', 'already_skipped', '{}', step_type => 'map');

select pgflow.start_flow('cascade_skip_preexisting', '[1, 2]'::jsonb);

select ok(
(
select count(*) > 0
from pgmq.q_cascade_skip_preexisting q
join pgflow.step_tasks st on st.message_id = q.msg_id
where st.flow_slug = 'cascade_skip_preexisting'
and st.step_slug = 'already_skipped'
),
'Setup: already_skipped has queued messages before cascade call'
);

update pgflow.step_states
set status = 'skipped',
skip_reason = 'preexisting_skip',
skipped_at = now(),
remaining_tasks = null
where flow_slug = 'cascade_skip_preexisting'
and step_slug = 'already_skipped';

select pgflow._cascade_force_skip_steps(
(select run_id from pgflow.runs where flow_slug = 'cascade_skip_preexisting'),
'target',
'condition_unmet'
);

select is_empty(
$$
select 1
from pgmq.q_cascade_skip_preexisting q
join pgflow.step_tasks st on st.message_id = q.msg_id
where st.flow_slug = 'cascade_skip_preexisting'
and st.step_slug = 'target'
$$,
'Target step messages should be archived'
);

select isnt_empty(
$$
select 1
from pgmq.q_cascade_skip_preexisting q
join pgflow.step_tasks st on st.message_id = q.msg_id
where st.flow_slug = 'cascade_skip_preexisting'
and st.step_slug = 'already_skipped'
$$,
'Preexisting skipped step messages should remain queued'
);

select is(
(select status from pgflow.step_states where flow_slug = 'cascade_skip_preexisting' and step_slug = 'target'),
'skipped'::text,
'Target step should be marked skipped'
);

select * from finish();
rollback;
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
begin;
select plan(5);
select plan(6);

select pgflow_tests.reset_db();

Expand Down Expand Up @@ -92,6 +92,17 @@ select is(
'previously active messages should be in archive'
);

select is(
(
select error_message
from pgflow.step_states
where run_id = (select run_id from run_ids)
and step_slug = 'checker'
),
'Condition not met',
'checker failure should use stable condition error message'
);

drop table if exists run_ids;
drop table if exists pre_failure_msgs;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,12 @@ select is(
'Step with matched ifNot pattern and whenUnmet=fail should be failed'
);

-- Test 2: Error message should indicate condition not met
-- Test 2: Error message should remain stable and minimal
select is(
(select error_message from pgflow.step_states
where run_id = (select run_id from run_ids) and step_slug = 'no_admin_step'),
'Condition not met',
'Error message should indicate condition not met'
'Error message should use stable condition error message'
);

-- Test 3: No task should be created for failed step
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
-- Verifies that a root step with unmet condition and whenUnmet='fail'
-- causes the run to fail immediately
begin;
select plan(4);
select plan(5);

-- Reset database
select pgflow_tests.reset_db();
Expand Down Expand Up @@ -30,11 +30,12 @@ select is(
'Step with unmet condition and whenUnmet=fail should be failed'
);

-- Test 2: error_message should indicate condition unmet
select ok(
-- Test 2: error_message should remain stable and minimal
select is(
(select error_message from pgflow.step_states
where run_id = (select run_id from run_ids) and step_slug = 'checked_step') ILIKE '%condition%',
'Failed step should have error message about condition'
where run_id = (select run_id from run_ids) and step_slug = 'checked_step'),
'Condition not met',
'Failed step should use stable condition error message'
);

-- Test 3: No task should be created
Expand All @@ -52,6 +53,19 @@ select is(
'Run should fail when step condition fails with fail mode'
);

-- Test 5: Run-level error event should use same stable message
select is(
(
select payload->>'error_message'
from pgflow_tests.get_realtime_message(
event_type => 'run:failed',
run_id => (select run_id from run_ids)
)
),
'Condition not met',
'Run failed event should use stable condition error message'
);

-- Clean up
drop table if exists run_ids;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
begin;
select plan(6);
select plan(7);

select pgflow_tests.reset_db();

Expand Down Expand Up @@ -82,6 +82,19 @@ select is(
'run:failed payload should include failed status'
);

select is(
(
select payload->>'error_message'
from pgflow_tests.get_realtime_message(
event_type => 'step:failed',
run_id => (select run_id from run_ids),
step_slug => 'guarded'
)
),
'Condition not met',
'step:failed payload should use stable condition error message'
);

drop table if exists run_ids;

select finish();
Expand Down
Loading