Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions pkgs/core/schemas/0100_function_archive_task_message.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
create or replace function pgflow._archive_task_message(
p_run_id uuid,
p_step_slug text,
p_task_index int
)
returns void
language sql
volatile
set search_path to ''
as $$
SELECT pgmq.archive(
r.flow_slug,
ARRAY_AGG(st.message_id)
)
FROM pgflow.step_tasks st
JOIN pgflow.runs r ON st.run_id = r.run_id
WHERE st.run_id = p_run_id
AND st.step_slug = p_step_slug
AND st.task_index = p_task_index
AND st.message_id IS NOT NULL
GROUP BY r.flow_slug
HAVING COUNT(st.message_id) > 0;
$$;
38 changes: 38 additions & 0 deletions pkgs/core/schemas/0100_function_cascade_resolve_conditions.sql
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,43 @@ BEGIN
failed_at = now()
WHERE pgflow.runs.run_id = cascade_resolve_conditions.run_id;

PERFORM realtime.send(
jsonb_build_object(
'event_type', 'step:failed',
'run_id', cascade_resolve_conditions.run_id,
'step_slug', v_first_fail.step_slug,
'status', 'failed',
'error_message', 'Condition not met',
'failed_at', now()
),
concat('step:', v_first_fail.step_slug, ':failed'),
concat('pgflow:run:', cascade_resolve_conditions.run_id),
false
);

PERFORM realtime.send(
jsonb_build_object(
'event_type', 'run:failed',
'run_id', cascade_resolve_conditions.run_id,
'flow_slug', v_first_fail.flow_slug,
'status', 'failed',
'error_message', 'Condition not met',
'failed_at', now()
),
'run:failed',
concat('pgflow:run:', cascade_resolve_conditions.run_id),
false
);

PERFORM pgmq.archive(r.flow_slug, ARRAY_AGG(st.message_id))
FROM pgflow.step_tasks st
JOIN pgflow.runs r ON st.run_id = r.run_id
WHERE st.run_id = cascade_resolve_conditions.run_id
AND st.status IN ('queued', 'started')
AND st.message_id IS NOT NULL
GROUP BY r.flow_slug
HAVING COUNT(st.message_id) > 0;

RETURN false;
END IF;

Expand Down Expand Up @@ -180,6 +217,7 @@ BEGIN
FROM unmet_skip_steps uss
WHERE ss.run_id = cascade_resolve_conditions.run_id
AND ss.step_slug = uss.step_slug
AND ss.status = 'created'
RETURNING
ss.*,
realtime.send(
Expand Down
8 changes: 6 additions & 2 deletions pkgs/core/schemas/0100_function_complete_task.sql
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,12 @@ IF v_dependent_map_slug IS NOT NULL THEN
AND st.task_index = complete_task.task_index
AND st.message_id IS NOT NULL;

-- Return empty result
RETURN QUERY SELECT * FROM pgflow.step_tasks WHERE false;
-- Return the failed task row (API contract: always return task row)
RETURN QUERY
SELECT * FROM pgflow.step_tasks st
WHERE st.run_id = complete_task.run_id
AND st.step_slug = complete_task.step_slug
AND st.task_index = complete_task.task_index;
RETURN;
END IF;

Expand Down
4 changes: 2 additions & 2 deletions pkgs/core/schemas/0100_function_create_flow_from_shape.sql
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ BEGIN
timeout => (v_step_options->>'timeout')::int,
start_delay => (v_step_options->>'startDelay')::int,
step_type => v_step->>'stepType',
when_unmet => v_step->>'whenUnmet',
when_exhausted => v_step->>'whenExhausted',
when_unmet => COALESCE(v_step->>'whenUnmet', 'skip'),
when_exhausted => COALESCE(v_step->>'whenExhausted', 'fail'),
required_input_pattern => CASE
WHEN (v_step->'requiredInputPattern'->>'defined')::boolean
THEN v_step->'requiredInputPattern'->'value'
Expand Down
84 changes: 48 additions & 36 deletions pkgs/core/schemas/0100_function_fail_task.sql
Original file line number Diff line number Diff line change
Expand Up @@ -140,45 +140,42 @@ maybe_fail_step AS (
WHERE pgflow.step_states.run_id = fail_task.run_id
AND pgflow.step_states.step_slug = fail_task.step_slug
RETURNING pgflow.step_states.*
),
run_update AS (
-- Update run status: only fail when when_exhausted='fail' and step was failed
UPDATE pgflow.runs
SET status = CASE
WHEN (select status from maybe_fail_step) = 'failed' THEN 'failed'
ELSE status
END,
failed_at = CASE
WHEN (select status from maybe_fail_step) = 'failed' THEN now()
ELSE NULL
END,
-- Decrement remaining_steps when step was skipped (not failed, run continues)
remaining_steps = CASE
WHEN (select status from maybe_fail_step) = 'skipped' THEN pgflow.runs.remaining_steps - 1
ELSE pgflow.runs.remaining_steps
END
WHERE pgflow.runs.run_id = fail_task.run_id
RETURNING pgflow.runs.status
)
-- Update run status: only fail when when_exhausted='fail' and step was failed
UPDATE pgflow.runs
SET status = CASE
WHEN (select status from maybe_fail_step) = 'failed' THEN 'failed'
ELSE status
END,
failed_at = CASE
WHEN (select status from maybe_fail_step) = 'failed' THEN now()
ELSE NULL
END,
-- Decrement remaining_steps when step was skipped (not failed, run continues)
remaining_steps = CASE
WHEN (select status from maybe_fail_step) = 'skipped' THEN pgflow.runs.remaining_steps - 1
ELSE pgflow.runs.remaining_steps
END
WHERE pgflow.runs.run_id = fail_task.run_id
RETURNING (status = 'failed') INTO v_run_failed;
SELECT
COALESCE((SELECT status = 'failed' FROM run_update), false),
COALESCE((SELECT status = 'failed' FROM maybe_fail_step), false),
COALESCE((SELECT status = 'skipped' FROM maybe_fail_step), false),
COALESCE((SELECT is_exhausted FROM task_status), false)
INTO v_run_failed, v_step_failed, v_step_skipped, v_task_exhausted;

-- Capture when_exhausted mode and check if step was skipped for later processing
-- Capture when_exhausted mode for later skip handling
SELECT s.when_exhausted INTO v_when_exhausted
FROM pgflow.steps s
JOIN pgflow.runs r ON r.flow_slug = s.flow_slug
WHERE r.run_id = fail_task.run_id
AND s.step_slug = fail_task.step_slug;

SELECT (status = 'skipped') INTO v_step_skipped
FROM pgflow.step_states
WHERE pgflow.step_states.run_id = fail_task.run_id
AND pgflow.step_states.step_slug = fail_task.step_slug;

-- Check if step failed by querying the step_states table
SELECT (status = 'failed') INTO v_step_failed
FROM pgflow.step_states
WHERE pgflow.step_states.run_id = fail_task.run_id
AND pgflow.step_states.step_slug = fail_task.step_slug;
WHERE r.run_id = fail_task.run_id
AND s.step_slug = fail_task.step_slug;

-- Send broadcast event for step failure if the step was failed
IF v_step_failed THEN
IF v_task_exhausted AND v_step_failed THEN
PERFORM realtime.send(
jsonb_build_object(
'event_type', 'step:failed',
Expand All @@ -194,8 +191,8 @@ IF v_step_failed THEN
);
END IF;

-- Handle step skipping (when_exhausted = 'skip' or 'skip-cascade')
IF v_step_skipped THEN
-- Handle step skipping (when_exhausted = 'skip' or 'skip-cascade')
IF v_task_exhausted AND v_step_skipped THEN
-- Send broadcast event for step skipped
PERFORM realtime.send(
jsonb_build_object(
Expand Down Expand Up @@ -237,11 +234,26 @@ END IF;
AND dep.dep_slug = fail_task.step_slug
AND child_state.step_slug = dep.step_slug;

-- Start any steps that became ready after decrementing remaining_deps
PERFORM pgflow.start_ready_steps(fail_task.run_id);
-- Evaluate conditions on newly-ready dependent steps
-- This must happen before cascade_complete_taskless_steps so that
-- skipped steps can set initial_tasks=0 for their map dependents
IF NOT pgflow.cascade_resolve_conditions(fail_task.run_id) THEN
-- Run was failed due to a condition with when_unmet='fail'
-- Archive the failed task's message before returning
PERFORM pgflow._archive_task_message(fail_task.run_id, fail_task.step_slug, fail_task.task_index);
-- Return the task row (API contract)
RETURN QUERY SELECT * FROM pgflow.step_tasks
WHERE pgflow.step_tasks.run_id = fail_task.run_id
AND pgflow.step_tasks.step_slug = fail_task.step_slug
AND pgflow.step_tasks.task_index = fail_task.task_index;
RETURN;
END IF;

-- Auto-complete taskless steps (e.g., map steps with initial_tasks=0 from skipped dep)
PERFORM pgflow.cascade_complete_taskless_steps(fail_task.run_id);

-- Start steps that became ready after condition resolution and taskless completion
PERFORM pgflow.start_ready_steps(fail_task.run_id);
END IF;

-- Try to complete the run (remaining_steps may now be 0)
Expand Down
4 changes: 4 additions & 0 deletions pkgs/core/src/database-types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,10 @@ export type Database = {
[_ in never]: never
}
Functions: {
_archive_task_message: {
Args: { p_run_id: string; p_step_slug: string; p_task_index: number }
Returns: undefined
}
_cascade_force_skip_steps: {
Args: { run_id: string; skip_reason: string; step_slug: string }
Returns: number
Expand Down
Loading