Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 32 additions & 9 deletions apps/sim/hooks/use-collaborative-workflow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import { useUndoRedoStore } from '@/stores/undo-redo'
import { useWorkflowDiffStore } from '@/stores/workflow-diff/store'
import { useWorkflowRegistry } from '@/stores/workflows/registry/store'
import { useSubBlockStore } from '@/stores/workflows/subblock/store'
import { filterNewEdges, mergeSubblockState } from '@/stores/workflows/utils'
import { filterNewEdges, filterValidEdges, mergeSubblockState } from '@/stores/workflows/utils'
import { useWorkflowStore } from '@/stores/workflows/workflow/store'
import type { BlockState, Loop, Parallel, Position } from '@/stores/workflows/workflow/types'

Expand Down Expand Up @@ -226,9 +226,12 @@ export function useCollaborativeWorkflow() {
case EDGES_OPERATIONS.BATCH_ADD_EDGES: {
const { edges } = payload
if (Array.isArray(edges) && edges.length > 0) {
const newEdges = filterNewEdges(edges, useWorkflowStore.getState().edges)
const blocks = useWorkflowStore.getState().blocks
const currentEdges = useWorkflowStore.getState().edges
const validEdges = filterValidEdges(edges, blocks)
const newEdges = filterNewEdges(validEdges, currentEdges)
if (newEdges.length > 0) {
useWorkflowStore.getState().batchAddEdges(newEdges)
useWorkflowStore.getState().batchAddEdges(newEdges, { skipValidation: true })
}
}
break
Expand Down Expand Up @@ -1004,7 +1007,11 @@ export function useCollaborativeWorkflow() {

if (edges.length === 0) return false

const newEdges = filterNewEdges(edges, useWorkflowStore.getState().edges)
// Filter out invalid edges (e.g., edges targeting trigger blocks) and duplicates
const blocks = useWorkflowStore.getState().blocks
const currentEdges = useWorkflowStore.getState().edges
const validEdges = filterValidEdges(edges, blocks)
const newEdges = filterNewEdges(validEdges, currentEdges)
if (newEdges.length === 0) return false

const operationId = crypto.randomUUID()
Expand All @@ -1020,7 +1027,7 @@ export function useCollaborativeWorkflow() {
userId: session?.user?.id || 'unknown',
})

useWorkflowStore.getState().batchAddEdges(newEdges)
useWorkflowStore.getState().batchAddEdges(newEdges, { skipValidation: true })

if (!options?.skipUndoRedo) {
newEdges.forEach((edge) => undoRedo.recordAddEdge(edge.id))
Expand Down Expand Up @@ -1484,9 +1491,23 @@ export function useCollaborativeWorkflow() {

if (blocks.length === 0) return false

// Filter out invalid edges (e.g., edges targeting trigger blocks)
// Combine existing blocks with new blocks for validation
const existingBlocks = useWorkflowStore.getState().blocks
const newBlocksMap = blocks.reduce(
(acc, block) => {
acc[block.id] = block
return acc
},
{} as Record<string, BlockState>
)
const allBlocks = { ...existingBlocks, ...newBlocksMap }
const validEdges = filterValidEdges(edges, allBlocks)

logger.info('Batch adding blocks collaboratively', {
blockCount: blocks.length,
edgeCount: edges.length,
edgeCount: validEdges.length,
filteredEdges: edges.length - validEdges.length,
})

const operationId = crypto.randomUUID()
Expand All @@ -1496,16 +1517,18 @@ export function useCollaborativeWorkflow() {
operation: {
operation: BLOCKS_OPERATIONS.BATCH_ADD_BLOCKS,
target: OPERATION_TARGETS.BLOCKS,
payload: { blocks, edges, loops, parallels, subBlockValues },
payload: { blocks, edges: validEdges, loops, parallels, subBlockValues },
},
workflowId: activeWorkflowId || '',
userId: session?.user?.id || 'unknown',
})

useWorkflowStore.getState().batchAddBlocks(blocks, edges, subBlockValues)
useWorkflowStore.getState().batchAddBlocks(blocks, validEdges, subBlockValues, {
skipEdgeValidation: true,
})

if (!options?.skipUndoRedo) {
undoRedo.recordBatchAddBlocks(blocks, edges, subBlockValues)
undoRedo.recordBatchAddBlocks(blocks, validEdges, subBlockValues)
}

return true
Expand Down
29 changes: 28 additions & 1 deletion apps/sim/stores/workflows/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@ import type { Edge } from 'reactflow'
import { v4 as uuidv4 } from 'uuid'
import { getBlockOutputs } from '@/lib/workflows/blocks/block-outputs'
import { mergeSubBlockValues, mergeSubblockStateWithValues } from '@/lib/workflows/subblocks'
import { TriggerUtils } from '@/lib/workflows/triggers/triggers'
import { getBlock } from '@/blocks'
import { normalizeName } from '@/executor/constants'
import { isAnnotationOnlyBlock, normalizeName } from '@/executor/constants'
import { useSubBlockStore } from '@/stores/workflows/subblock/store'
import type {
BlockState,
Expand All @@ -17,6 +18,32 @@ import { TRIGGER_RUNTIME_SUBBLOCK_IDS } from '@/triggers/constants'

const WEBHOOK_SUBBLOCK_FIELDS = ['webhookId', 'triggerPath']

/**
* Checks if an edge is valid (source and target exist, not annotation-only, target is not a trigger)
*/
function isValidEdge(
edge: Edge,
blocks: Record<string, { type: string; triggerMode?: boolean }>
): boolean {
const sourceBlock = blocks[edge.source]
const targetBlock = blocks[edge.target]
if (!sourceBlock || !targetBlock) return false
if (isAnnotationOnlyBlock(sourceBlock.type)) return false
if (isAnnotationOnlyBlock(targetBlock.type)) return false
if (TriggerUtils.isTriggerBlock(targetBlock)) return false
return true
}

/**
* Filters edges to only include valid ones (target exists and is not a trigger block)
*/
export function filterValidEdges(
edges: Edge[],
blocks: Record<string, { type: string; triggerMode?: boolean }>
): Edge[] {
return edges.filter((edge) => isValidEdge(edge, blocks))
}

export function filterNewEdges(edgesToAdd: Edge[], currentEdges: Edge[]): Edge[] {
return edgesToAdd.filter((edge) => {
if (edge.source === edge.target) return false
Expand Down
43 changes: 16 additions & 27 deletions apps/sim/stores/workflows/workflow/store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,17 @@ import { create } from 'zustand'
import { devtools } from 'zustand/middleware'
import { DEFAULT_DUPLICATE_OFFSET } from '@/lib/workflows/autolayout/constants'
import { getBlockOutputs } from '@/lib/workflows/blocks/block-outputs'
import { TriggerUtils } from '@/lib/workflows/triggers/triggers'
import { getBlock } from '@/blocks'
import type { SubBlockConfig } from '@/blocks/types'
import { isAnnotationOnlyBlock, normalizeName, RESERVED_BLOCK_NAMES } from '@/executor/constants'
import { normalizeName, RESERVED_BLOCK_NAMES } from '@/executor/constants'
import { useWorkflowRegistry } from '@/stores/workflows/registry/store'
import { useSubBlockStore } from '@/stores/workflows/subblock/store'
import { filterNewEdges, getUniqueBlockName, mergeSubblockState } from '@/stores/workflows/utils'
import {
filterNewEdges,
filterValidEdges,
getUniqueBlockName,
mergeSubblockState,
} from '@/stores/workflows/utils'
import type {
Position,
SubBlockState,
Expand Down Expand Up @@ -91,26 +95,6 @@ function resolveInitialSubblockValue(config: SubBlockConfig): unknown {
return null
}

function isValidEdge(
edge: Edge,
blocks: Record<string, { type: string; triggerMode?: boolean }>
): boolean {
const sourceBlock = blocks[edge.source]
const targetBlock = blocks[edge.target]
if (!sourceBlock || !targetBlock) return false
if (isAnnotationOnlyBlock(sourceBlock.type)) return false
if (isAnnotationOnlyBlock(targetBlock.type)) return false
if (TriggerUtils.isTriggerBlock(targetBlock)) return false
return true
}

function filterValidEdges(
edges: Edge[],
blocks: Record<string, { type: string; triggerMode?: boolean }>
): Edge[] {
return edges.filter((edge) => isValidEdge(edge, blocks))
}

const initialState = {
blocks: {},
edges: [],
Expand Down Expand Up @@ -356,7 +340,8 @@ export const useWorkflowStore = create<WorkflowStore>()(
data?: Record<string, any>
}>,
edges?: Edge[],
subBlockValues?: Record<string, Record<string, unknown>>
subBlockValues?: Record<string, Record<string, unknown>>,
options?: { skipEdgeValidation?: boolean }
) => {
const currentBlocks = get().blocks
const currentEdges = get().edges
Expand All @@ -381,7 +366,10 @@ export const useWorkflowStore = create<WorkflowStore>()(
}

if (edges && edges.length > 0) {
const validEdges = filterValidEdges(edges, newBlocks)
// Skip validation if already validated by caller (e.g., collaborative layer)
const validEdges = options?.skipEdgeValidation
? edges
: filterValidEdges(edges, newBlocks)
const existingEdgeIds = new Set(currentEdges.map((e) => e.id))
for (const edge of validEdges) {
if (!existingEdgeIds.has(edge.id)) {
Expand Down Expand Up @@ -516,11 +504,12 @@ export const useWorkflowStore = create<WorkflowStore>()(
get().updateLastSaved()
},

batchAddEdges: (edges: Edge[]) => {
batchAddEdges: (edges: Edge[], options?: { skipValidation?: boolean }) => {
const blocks = get().blocks
const currentEdges = get().edges

const validEdges = filterValidEdges(edges, blocks)
// Skip validation if already validated by caller (e.g., collaborative layer)
const validEdges = options?.skipValidation ? edges : filterValidEdges(edges, blocks)
const filtered = filterNewEdges(validEdges, currentEdges)
const newEdges = [...currentEdges]

Expand Down
5 changes: 3 additions & 2 deletions apps/sim/stores/workflows/workflow/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -203,12 +203,13 @@ export interface WorkflowActions {
batchAddBlocks: (
blocks: BlockState[],
edges?: Edge[],
subBlockValues?: Record<string, Record<string, unknown>>
subBlockValues?: Record<string, Record<string, unknown>>,
options?: { skipEdgeValidation?: boolean }
) => void
batchRemoveBlocks: (ids: string[]) => void
batchToggleEnabled: (ids: string[]) => void
batchToggleHandles: (ids: string[]) => void
batchAddEdges: (edges: Edge[]) => void
batchAddEdges: (edges: Edge[], options?: { skipValidation?: boolean }) => void
batchRemoveEdges: (ids: string[]) => void
clear: () => Partial<WorkflowState>
updateLastSaved: () => void
Expand Down