Graph Executor

The Graph Executor runs a node graph in dependency order, so each node executes only after the nodes it depends on have run.

Features

  • Topological Sorting: Uses Kahn's algorithm to determine execution order
  • Function Execution: Executes regular Python functions from ops.py
  • Progress Node Execution: Executes progress-aware classes from progress_ops.py with real-time progress reporting
  • Streaming Execution: Provides real-time status updates via Server-Sent Events
  • Error Handling: Gracefully handles execution errors and reports them

Progress Node Support

Progress nodes are classes with __call__ methods that can report progress during execution. The executor:

  1. Detects progress nodes by checking the progress_class_registry
  2. Instantiates the class
  3. Sets up a progress callback on the _progress_reporter attribute
  4. Executes the __call__ method in a separate thread
  5. Streams progress updates via SSE during execution
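The steps above can be sketched roughly as follows. This is a minimal illustration, not the executor's actual code: `CountToThree`, `run_progress_node`, the registry key, and the shape of the yielded dictionaries are all hypothetical, and the callback is assumed to take a single fraction argument. Only the `_progress_reporter` attribute, the `progress_class_registry` name, and the use of a separate thread come from the description above.

```python
import queue
import threading


class CountToThree:
    """Hypothetical progress node: a class whose __call__ reports progress."""

    def __init__(self):
        # The executor is described as attaching a callback to this attribute.
        self._progress_reporter = None

    def __call__(self, steps=3):
        for i in range(1, steps + 1):
            if self._progress_reporter is not None:
                self._progress_reporter(i / steps)  # assumed: fraction in [0, 1]
        return steps


def run_progress_node(node_cls, **kwargs):
    """Run a progress node in a worker thread and yield updates as they arrive."""
    updates = queue.Queue()
    node = node_cls()  # step 2: instantiate the class
    # Step 3: the callback pushes each progress value onto a thread-safe queue.
    node._progress_reporter = lambda fraction: updates.put(("progress", fraction))

    def worker():
        try:
            updates.put(("done", node(**kwargs)))
        except Exception as exc:  # report failures instead of losing them
            updates.put(("error", str(exc)))

    thread = threading.Thread(target=worker, daemon=True)
    thread.start()  # step 4: __call__ runs in a separate thread
    while True:
        kind, payload = updates.get()  # step 5: forward updates to the caller
        yield {"status": kind, "value": payload}
        if kind in ("done", "error"):
            break
    thread.join()


# Step 1: look the class up in a registry (hypothetical key).
progress_class_registry = {"count_to_three": CountToThree}
events = list(run_progress_node(progress_class_registry["count_to_three"]))
```

The queue decouples the worker thread from the consumer, which is one common way to turn callback-style progress into a stream that can later be encoded as SSE messages.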

See the Progress Nodes Guide for details on creating progress nodes.

API Reference

psynapse_backend.executor

GraphExecutor

GraphExecutor executes a node graph. It resolves dependencies by topologically sorting the nodes with Kahn's algorithm, then runs the nodes in that order.

Parameters:

  • nodepacks_dir (str, required): The directory containing the nodepacks.

Attributes:

  • nodepacks_dir: The directory containing the nodepacks.
  • function_registry: A dictionary of functions from the nodepacks.
  • progress_class_registry: A dictionary of progress classes from the nodepacks.
  • stream_class_registry: A dictionary of stream classes from the nodepacks.

topological_sort(nodes, edges)

Perform a topological sort using Kahn's algorithm and return the node IDs in execution order.

Parameters:

  • nodes (list[dict], required): The nodes of the graph.
  • edges (list[dict], required): The edges of the graph.

Returns:

  • list[str]: A list of node IDs in execution order.
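For readers unfamiliar with Kahn's algorithm, here is a minimal standalone sketch. It is not the library's implementation: the function name `kahn_topological_sort` is hypothetical, and it assumes each node dict has an `"id"` key and each edge dict has `"source"` and `"target"` keys, which this page does not confirm. Raising on a cycle is also an assumption about how such a case might be handled.

```python
from collections import deque


def kahn_topological_sort(nodes, edges):
    """Return node IDs so that every edge's source precedes its target."""
    # Assumed shapes: node = {"id": ...}, edge = {"source": ..., "target": ...}
    in_degree = {node["id"]: 0 for node in nodes}
    successors = {node["id"]: [] for node in nodes}
    for edge in edges:
        successors[edge["source"]].append(edge["target"])
        in_degree[edge["target"]] += 1

    # Start with nodes that have no unmet dependencies.
    ready = deque(node_id for node_id, degree in in_degree.items() if degree == 0)
    order = []
    while ready:
        current = ready.popleft()
        order.append(current)
        # "Remove" the node: each successor has one fewer unmet dependency.
        for nxt in successors[current]:
            in_degree[nxt] -= 1
            if in_degree[nxt] == 0:
                ready.append(nxt)

    # Nodes left unprocessed can only be part of a cycle.
    if len(order) != len(nodes):
        raise ValueError("graph contains a cycle")
    return order


nodes = [{"id": "view"}, {"id": "add"}, {"id": "a"}, {"id": "b"}]
edges = [
    {"source": "a", "target": "add"},
    {"source": "b", "target": "add"},
    {"source": "add", "target": "view"},
]
order = kahn_topological_sort(nodes, edges)
print(order)  # ['a', 'b', 'add', 'view']
```

The order of the input list does not matter here: `view` is listed first but runs last, because it depends on `add`.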

execute_graph(nodes, edges, env_vars=None)

Execute the node graph and return results for ViewNodes.

Parameters:

  • nodes (list[dict], required): The nodes of the graph.
  • edges (list[dict], required): The edges of the graph.
  • env_vars (dict[str, str] | None, default None): Optional environment variables to set during execution.

Returns:

  • dict[str, Any]: A dictionary of node IDs and their results.
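One plausible way to apply `env_vars` only "during execution" is a context manager that sets the variables and restores the previous values afterwards. The sketch below illustrates that pattern. It is an assumption about the behavior, not the executor's confirmed implementation, and `temporary_env` and `PSYNAPSE_DEMO_TOKEN` are hypothetical names.

```python
import os
from contextlib import contextmanager


@contextmanager
def temporary_env(env_vars=None):
    """Set environment variables for the duration of a block, then restore them."""
    env_vars = env_vars or {}
    # Remember prior values (None means the variable was not set).
    previous = {name: os.environ.get(name) for name in env_vars}
    os.environ.update(env_vars)
    try:
        yield
    finally:
        for name, old_value in previous.items():
            if old_value is None:
                os.environ.pop(name, None)
            else:
                os.environ[name] = old_value


with temporary_env({"PSYNAPSE_DEMO_TOKEN": "abc123"}):
    # Node functions running here would see the variable.
    inside = os.environ.get("PSYNAPSE_DEMO_TOKEN")
after = os.environ.get("PSYNAPSE_DEMO_TOKEN")
```

Restoring in a `finally` block keeps the process environment clean even if a node raises.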

execute_graph_streaming(nodes, edges, env_vars=None)

Execute the node graph and yield status updates for each node.

Parameters:

  • nodes (list[dict], required): The nodes of the graph.
  • edges (list[dict], required): The edges of the graph.
  • env_vars (dict[str, str] | None, default None): Optional environment variables to set during execution.

Yields:

  • Any: Status dictionaries containing execution progress information.
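Since the status updates are delivered via Server-Sent Events, each yielded dictionary presumably ends up serialized as an SSE `data:` message. The sketch below shows that encoding step with a stand-in generator. `fake_status_stream`, `to_sse`, and the `node_id`/`status` fields are hypothetical, and the real status dictionaries may carry different keys.

```python
import json


def fake_status_stream():
    """Stand-in for execute_graph_streaming; the real status fields may differ."""
    yield {"node_id": "a", "status": "executing"}
    yield {"node_id": "a", "status": "completed"}


def to_sse(status_updates):
    """Encode each status dictionary as one Server-Sent Events message."""
    for status in status_updates:
        # An SSE message is a "data:" line terminated by a blank line.
        yield f"data: {json.dumps(status)}\n\n"


frames = list(to_sse(fake_status_stream()))
```

Because both functions are generators, each message can be sent to the client as soon as the corresponding node reports its status, rather than after the whole graph finishes.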