Graph Executor
The Graph Executor runs node graphs in dependency order: it resolves the dependencies between nodes and executes each node only after the nodes it depends on.
Features
- Topological Sorting: Uses Kahn's algorithm to determine execution order
- Function Execution: Executes regular Python functions from ops.py
- Progress Node Execution: Executes progress-aware classes from progress_ops.py with real-time progress reporting
- Streaming Execution: Provides real-time status updates via Server-Sent Events
- Error Handling: Gracefully handles execution errors and reports them
Progress Node Support
Progress nodes are classes with __call__ methods that can report progress during execution. The executor:
- Detects progress nodes by checking the progress_class_registry
- Instantiates the class
- Sets up a progress callback on the _progress_reporter attribute
- Executes the __call__ method in a separate thread
- Streams progress updates via SSE during execution
See the Progress Nodes Guide for details on creating progress nodes.
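The steps above can be sketched in plain Python. This is a minimal illustration, not the executor's actual code: the CountingNode class, the run_with_progress helper, the (current, total) callback signature, and the keys of the emitted dictionaries are all assumptions made for the example. Only the _progress_reporter attribute, the __call__ method, and the use of a separate thread come from the description above.

```python
import queue
import threading


class CountingNode:
    """Hypothetical progress node: a class whose __call__ reports progress."""

    def __init__(self):
        # The executor is described as attaching a callback to this attribute.
        self._progress_reporter = None

    def __call__(self, steps: int) -> int:
        for done in range(1, steps + 1):
            if self._progress_reporter is not None:
                self._progress_reporter(done, steps)
        return steps


def run_with_progress(node_cls, **inputs):
    """Run a progress node in a worker thread and yield updates in order."""
    events = queue.Queue()
    node = node_cls()
    # Assumed callback signature: (current, total).
    node._progress_reporter = lambda cur, total: events.put(
        {"status": "progress", "current": cur, "total": total}
    )

    def worker():
        try:
            events.put({"status": "completed", "result": node(**inputs)})
        except Exception as exc:  # report the failure instead of crashing
            events.put({"status": "error", "error": str(exc)})
        finally:
            events.put(None)  # sentinel: no more events will follow

    threading.Thread(target=worker, daemon=True).start()
    # Drain the queue until the worker signals that it is finished.
    while (event := events.get()) is not None:
        yield event
```

Running the node in a worker thread while the caller drains a queue lets progress updates be forwarded as they happen, instead of only after the node returns.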
API Reference
psynapse_backend.executor
GraphExecutor
GraphExecutor executes the node graph. Nodes run in dependency order, which is determined by a topological sort using Kahn's algorithm.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| nodepacks_dir | str | The directory containing the nodepacks. | required |
Attributes:
| Name | Type | Description |
|---|---|---|
| nodepacks_dir | | The directory containing the nodepacks. |
| function_registry | | A dictionary of functions from the nodepacks. |
| progress_class_registry | | A dictionary of progress classes from the nodepacks. |
| stream_class_registry | | A dictionary of stream classes from the nodepacks. |
topological_sort(nodes, edges)
Perform a topological sort using Kahn's algorithm. Returns a list of node IDs in execution order.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| nodes | list[dict] | The nodes of the graph. | required |
| edges | list[dict] | The edges of the graph. | required |
Returns:
| Type | Description |
|---|---|
| list[str] | A list of node IDs in execution order. |
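As a rough illustration of the ordering step, the sketch below implements Kahn's algorithm over a nodes/edges pair of the same general shape. The function name kahn_order and the dictionary keys "id", "source", and "target" are assumptions made for this example, as is raising ValueError on a cycle; the dictionaries and error behavior of the real method may differ.

```python
from collections import deque


def kahn_order(nodes: list[dict], edges: list[dict]) -> list[str]:
    """Return node IDs in dependency order (illustrative sketch only)."""
    # Assumed shape: nodes carry "id"; edges carry "source" and "target".
    in_degree = {node["id"]: 0 for node in nodes}
    successors = {node["id"]: [] for node in nodes}
    for edge in edges:
        successors[edge["source"]].append(edge["target"])
        in_degree[edge["target"]] += 1

    # Start from the nodes that have no unmet dependencies.
    ready = deque(nid for nid, deg in in_degree.items() if deg == 0)
    order = []
    while ready:
        current = ready.popleft()
        order.append(current)
        # Finishing a node releases one dependency of each successor.
        for nxt in successors[current]:
            in_degree[nxt] -= 1
            if in_degree[nxt] == 0:
                ready.append(nxt)

    # Any node left unprocessed is part of a cycle.
    if len(order) != len(nodes):
        raise ValueError("graph contains a cycle")
    return order
```

A node enters the ready queue only once its in-degree drops to zero, so every node appears in the result after all of the nodes that feed into it.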
execute_graph(nodes, edges, env_vars=None)
Execute the node graph and return results for ViewNodes.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| nodes | list[dict] | The nodes of the graph. | required |
| edges | list[dict] | The edges of the graph. | required |
| env_vars | dict[str, str] \| None | Optional environment variables to set during execution. | None |
Returns:
| Type | Description |
|---|---|
| dict[str, Any] | A dictionary of node IDs and their results. |
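One plausible way to apply env_vars only for the duration of a run is a context manager that sets the variables and then restores the previous values. The sketch below is an assumption about how this could be done, not a description of the executor's internals: the helper name temporary_env is hypothetical, and the reference above does not say whether the variables are restored afterwards.

```python
import os
from contextlib import contextmanager
from typing import Optional


@contextmanager
def temporary_env(env_vars: Optional[dict] = None):
    """Set environment variables for the duration of a block, then restore."""
    env_vars = env_vars or {}
    # Remember the previous values; None means the variable was unset.
    previous = {key: os.environ.get(key) for key in env_vars}
    os.environ.update(env_vars)
    try:
        yield
    finally:
        # Restore the old value, or remove the variable if it did not exist.
        for key, old in previous.items():
            if old is None:
                os.environ.pop(key, None)
            else:
                os.environ[key] = old
```

Restoring in a finally block means the environment is put back even when a node raises during execution.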
execute_graph_streaming(nodes, edges, env_vars=None)
Execute the node graph and yield status updates for each node.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| nodes | list[dict] | The nodes of the graph. | required |
| edges | list[dict] | The edges of the graph. | required |
| env_vars | dict[str, str] \| None | Optional environment variables to set during execution. | None |
Yields:
| Type | Description |
|---|---|
| Any | Status dictionaries containing execution progress information. |
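Since status updates are delivered via Server-Sent Events, a consumer of this generator would typically encode each yielded dictionary as an SSE frame. The sketch below shows one way to do that with the standard library. The helper name to_sse and the keys in example_updates ("node_id", "status") are hypothetical; the actual status dictionaries may carry different fields.

```python
import json
from typing import Any, Iterable, Iterator


def to_sse(updates: Iterable[dict[str, Any]]) -> Iterator[str]:
    """Encode each status dictionary as one Server-Sent Events frame."""
    for update in updates:
        # An SSE frame is a "data: <payload>" line followed by a blank line.
        yield f"data: {json.dumps(update)}\n\n"


# Hypothetical status dictionaries; the real keys may differ.
example_updates = [
    {"node_id": "n1", "status": "executing"},
    {"node_id": "n1", "status": "completed"},
]
frames = list(to_sse(example_updates))
```

Because to_sse is itself a generator, frames can be written to the response as soon as each update is produced rather than after the whole graph has finished.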