agenticraft_foundation.integration.csp_orchestration
CSP orchestration adapter: converts workflow DAGs to CSP processes so that they can be checked for deadlock freedom, liveness, and refinement.
CSP Process Algebra integration for workflow orchestration.
This module provides workflow verification using CSP refinement checking.
Key Features:

- Workflow specification using CSP process algebra
- Refinement checking of workflow implementations
- Deadlock freedom verification
- Liveness analysis for workflow progress
WorkflowNodeType
Bases: str, Enum
Types of nodes in a workflow.
WorkflowSpec
dataclass
Specification of a workflow using CSP.

| ATTRIBUTE | DESCRIPTION |
|---|---|
| `name` | Workflow name |
| `process` | The CSP process representing the workflow |
| `events_of_interest` | Events to check for liveness |
| `invariants` | Custom invariant predicates |
from_dag(name, nodes, edges, sync_events=None)
classmethod
Create a workflow spec from a DAG definition.

| PARAMETER | DESCRIPTION |
|---|---|
| `name` | Workflow name |
| `nodes` | Node ID to type mapping |
| `edges` | List of (source, target) edges |
| `sync_events` | Events that require synchronization |

| RETURNS | DESCRIPTION |
|---|---|
| `WorkflowSpec` | WorkflowSpec instance |
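To make the DAG-to-process idea concrete, here is a minimal sketch that does not use the library. It assumes each node contributes a single event that becomes enabled once all of its predecessors have fired, and it enumerates every complete execution order. The helper name `dag_traces` and the `"task"` node type are hypothetical; the encoding actually used by `from_dag` may differ.

```python
from typing import Iterator


def dag_traces(
    nodes: dict[str, str], edges: list[tuple[str, str]]
) -> Iterator[tuple[str, ...]]:
    """Yield every complete execution order allowed by the DAG.

    `nodes` maps node ID to a node type (ignored in this sketch);
    `edges` is a list of (source, target) pairs.
    """
    # Predecessors that must have fired before each node is enabled.
    preds: dict[str, set[str]] = {n: set() for n in nodes}
    for src, dst in edges:
        preds[dst].add(src)

    def extend(done: tuple[str, ...]) -> Iterator[tuple[str, ...]]:
        if len(done) == len(nodes):
            yield done
            return
        finished = set(done)
        for n in sorted(nodes):
            # A node is enabled when it has not run and all predecessors have.
            if n not in finished and preds[n] <= finished:
                yield from extend(done + (n,))

    yield from extend(())


# Diamond-shaped workflow: a fans out to b and c, which join at d.
diamond_nodes = {"a": "task", "b": "task", "c": "task", "d": "task"}
diamond_edges = [("a", "b"), ("a", "c"), ("b", "d"), ("c", "d")]
for trace in dag_traces(diamond_nodes, diamond_edges):
    print(trace)
```

In this sketch a cyclic dependency produces no complete trace at all, which is the kind of stuck workflow a deadlock check is meant to catch.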
sequential_tasks(name, task_ids)
classmethod
Create a sequential workflow spec.

| PARAMETER | DESCRIPTION |
|---|---|
| `name` | Workflow name |
| `task_ids` | Ordered list of task IDs |

| RETURNS | DESCRIPTION |
|---|---|
| `WorkflowSpec` | WorkflowSpec instance |
parallel_tasks(name, task_ids, sync_on_complete=True)
classmethod
Create a parallel workflow spec.

| PARAMETER | DESCRIPTION |
|---|---|
| `name` | Workflow name |
| `task_ids` | List of task IDs to run in parallel |
| `sync_on_complete` | Whether tasks must all complete (sync on TICK) |

| RETURNS | DESCRIPTION |
|---|---|
| `WorkflowSpec` | WorkflowSpec instance |
choice_tasks(name, task_ids)
classmethod
Create a choice (branching) workflow spec.

| PARAMETER | DESCRIPTION |
|---|---|
| `name` | Workflow name |
| `task_ids` | List of alternative task IDs |

| RETURNS | DESCRIPTION |
|---|---|
| `WorkflowSpec` | WorkflowSpec instance |
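The sequential, parallel, and choice constructors differ in which orderings of task events they allow. The sketch below illustrates the difference with plain tuples instead of CSP processes. It assumes each task is a single atomic event (the library may well model tasks with separate start and completion events) and it does not model `sync_on_complete`. All three function names are hypothetical.

```python
from itertools import permutations

Trace = tuple[str, ...]


def sequential_traces(task_ids: list[str]) -> set[Trace]:
    """Sequential composition: exactly one order, the one given."""
    return {tuple(task_ids)}


def parallel_traces(task_ids: list[str]) -> set[Trace]:
    """Parallel composition of independent tasks: every interleaving."""
    return set(permutations(task_ids))


def choice_traces(task_ids: list[str]) -> set[Trace]:
    """Choice: exactly one of the alternatives runs."""
    return {(task,) for task in task_ids}


tasks = ["fetch", "parse", "store"]
print(len(sequential_traces(tasks)))  # one complete trace
print(len(parallel_traces(tasks)))    # 3! = 6 interleavings
print(len(choice_traces(tasks)))      # one trace per alternative
```

The factorial growth of the parallel case is the reason trace enumeration is usually bounded in practice.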
WorkflowVerificationResult
dataclass
Result of workflow verification.

| ATTRIBUTE | DESCRIPTION |
|---|---|
| `is_valid` | Whether the workflow passes all checks |
| `is_deadlock_free` | Whether the workflow cannot deadlock |
| `is_live` | Whether all events of interest are live |
| `refines_spec` | Whether implementation refines specification |
| `violations` | List of verification violations |
| `traces_sample` | Sample of workflow traces |
| `counterexample` | Counterexample trace if refinement fails |
CSPOrchestrationAdapter
Adapter integrating CSP process algebra with OrchestrationService.
This adapter provides workflow verification using CSP refinement checking. It can verify that workflow implementations conform to their specifications and detect potential issues like deadlocks.
Usage:

    adapter = CSPOrchestrationAdapter()

    # Register a workflow specification
    spec = WorkflowSpec.sequential_tasks("my_workflow", ["task1", "task2"])
    adapter.register_spec(spec)

    # Verify a workflow implementation
    result = adapter.verify_workflow("my_workflow", implementation_process)

    # Check deadlock freedom
    is_safe = adapter.check_deadlock_freedom("my_workflow")

    # Analyze liveness
    liveness = adapter.analyze_workflow_liveness("my_workflow")
__init__()
Initialize the adapter.
register_spec(spec)
Register a workflow specification.

| PARAMETER | DESCRIPTION |
|---|---|
| `spec` | The workflow specification |
get_spec(name)
Get a registered workflow specification.

| PARAMETER | DESCRIPTION |
|---|---|
| `name` | Workflow name |

| RETURNS | DESCRIPTION |
|---|---|
| `WorkflowSpec \| None` | The specification, or None if not registered |
register_implementation(name, process)
Register a workflow implementation.

| PARAMETER | DESCRIPTION |
|---|---|
| `name` | Workflow name |
| `process` | The CSP process implementation |
verify_spec(spec)
Verify a workflow specification is valid.

Checks:

- Deadlock freedom
- Liveness of events of interest
- Custom invariants

| PARAMETER | DESCRIPTION |
|---|---|
| `spec` | The workflow specification |

| RETURNS | DESCRIPTION |
|---|---|
| `WorkflowVerificationResult` | Verification result |
verify_workflow(name, implementation=None, mode='trace')
Verify a workflow implementation against its specification.

| PARAMETER | DESCRIPTION |
|---|---|
| `name` | Workflow name |
| `implementation` | Optional implementation process (uses registered if None) |
| `mode` | Refinement mode ("trace", "failures", or "fd") |

| RETURNS | DESCRIPTION |
|---|---|
| `WorkflowVerificationResult` | Verification result |
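As a rough illustration of what the "trace" mode means, the sketch below checks trace refinement on explicit, finite trace sets: an implementation refines a specification when every behaviour of the implementation, including every prefix, is also a behaviour of the specification. The names `prefix_closure` and `trace_refines` are hypothetical and the library's own checker presumably works on process terms rather than enumerated sets.

```python
from typing import Optional

Trace = tuple[str, ...]


def prefix_closure(traces: set[Trace]) -> set[Trace]:
    """Add every prefix of every trace, including the empty trace."""
    return {t[:i] for t in traces for i in range(len(t) + 1)}


def trace_refines(
    spec_traces: set[Trace], impl_traces: set[Trace]
) -> tuple[bool, Optional[Trace]]:
    """Return (refines, counterexample).

    The counterexample is a shortest implementation trace that the
    specification does not allow, or None when refinement holds.
    """
    spec = prefix_closure(spec_traces)
    impl = prefix_closure(impl_traces)
    extra = sorted(impl - spec, key=lambda t: (len(t), t))
    return (not extra, extra[0] if extra else None)


spec = {("task1", "task2")}
print(trace_refines(spec, {("task1", "task2")}))  # same behaviour
print(trace_refines(spec, {("task2", "task1")}))  # wrong order
print(trace_refines(spec, {("task1",)}))          # stops early
```

An implementation that stops early still passes in this sketch, because trace refinement only rules out forbidden behaviour and does not require progress. That gap is what the failures and failures-divergences models of CSP address.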
check_deadlock_freedom(name)
Check if a workflow specification is deadlock-free.

| PARAMETER | DESCRIPTION |
|---|---|
| `name` | Workflow name |

| RETURNS | DESCRIPTION |
|---|---|
| `bool` | True if deadlock-free |
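One common way to think about deadlock freedom is as a reachability question on a state machine: a deadlock is a reachable state that offers no events and is not a successful termination. The sketch below searches a small hand-written transition table breadth-first. The table format and the name `find_deadlock` are assumptions made for illustration and say nothing about how the adapter is implemented internally.

```python
from collections import deque
from typing import Optional

# state -> {event -> next state}; a deterministic transition table.
Transitions = dict[str, dict[str, str]]


def find_deadlock(
    transitions: Transitions, start: str, terminal: set[str]
) -> Optional[tuple[str, ...]]:
    """Return the event path to a stuck, non-terminal state, or None."""
    queue = deque([(start, ())])
    seen = {start}
    while queue:
        state, path = queue.popleft()
        moves = transitions.get(state, {})
        # No outgoing events and not a successful end state: deadlock.
        if not moves and state not in terminal:
            return path
        for event, nxt in sorted(moves.items()):
            if nxt not in seen:
                seen.add(nxt)
                queue.append((nxt, path + (event,)))
    return None


healthy = {"s0": {"task1": "s1"}, "s1": {"task2": "done"}}
broken = {"s0": {"task1": "done", "lock": "stuck"}}

print(find_deadlock(healthy, "s0", {"done"}))
print(find_deadlock(broken, "s0", {"done"}))
```

Because the search is breadth-first, the path it returns is a shortest route to a stuck state, which tends to make the counterexample easier to read.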
analyze_workflow_liveness(name, events=None)
get_workflow_traces(name, max_length=20, max_count=100)
Get sample traces from a workflow.

| PARAMETER | DESCRIPTION |
|---|---|
| `name` | Workflow name |
| `max_length` | Maximum trace length |
| `max_count` | Maximum number of traces |

| RETURNS | DESCRIPTION |
|---|---|
| `list[tuple[Event, ...]]` | List of traces |
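The two bounds matter because a process with a loop has infinitely many traces. The sketch below shows one plausible way to apply them: a breadth-first walk that stops extending a trace at `max_length` events and stops collecting at `max_count` traces. The transition-table format and the name `sample_traces` are hypothetical, and whether the library includes the empty trace or visits traces in this order is not stated in the source.

```python
from collections import deque

# state -> {event -> next state}
Transitions = dict[str, dict[str, str]]


def sample_traces(
    transitions: Transitions,
    start: str,
    max_length: int = 20,
    max_count: int = 100,
) -> list[tuple[str, ...]]:
    """Collect traces shortest-first, respecting both bounds."""
    traces: list[tuple[str, ...]] = []
    queue = deque([(start, ())])
    while queue and len(traces) < max_count:
        state, trace = queue.popleft()
        traces.append(trace)
        # Only extend traces that are still below the length bound.
        if len(trace) < max_length:
            for event, nxt in sorted(transitions.get(state, {}).items()):
                queue.append((nxt, trace + (event,)))
    return traces


# A workflow that can retry forever: without bounds this never ends.
retry_loop = {"s": {"retry": "s"}}
print(sample_traces(retry_loop, "s", max_length=3))
print(sample_traces(retry_loop, "s", max_length=3, max_count=2))
```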
compose_workflows(name, workflow_names, composition_type='parallel', sync_events=None)
Compose multiple workflows into a new workflow.

| PARAMETER | DESCRIPTION |
|---|---|
| `name` | Name for the composed workflow |
| `workflow_names` | Names of workflows to compose |
| `composition_type` | "parallel", "sequential", or "choice" |
| `sync_events` | Events to synchronize on (for parallel) |

| RETURNS | DESCRIPTION |
|---|---|
| `WorkflowSpec \| None` | Composed workflow spec, or None if any workflow not found |
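To show what synchronizing on events means for a parallel composition, the sketch below merges two individual traces in the style of CSP's generalized parallel operator: events outside the synchronization set interleave freely, while events inside it must be performed by both sides at the same step. It works on plain tuples, returns only complete merged traces, and uses the hypothetical name `sync_interleavings`; it is not the library's implementation.

```python
Trace = tuple[str, ...]


def sync_interleavings(left: Trace, right: Trace, sync: set[str]) -> set[Trace]:
    """All complete merges of two traces that synchronize on `sync`."""
    if not left and not right:
        return {()}
    merged: set[Trace] = set()
    # Independent events: either side may move on its own.
    if left and left[0] not in sync:
        for rest in sync_interleavings(left[1:], right, sync):
            merged.add((left[0],) + rest)
    if right and right[0] not in sync:
        for rest in sync_interleavings(left, right[1:], sync):
            merged.add((right[0],) + rest)
    # Shared events: both sides must offer the same event together.
    if left and right and left[0] == right[0] and left[0] in sync:
        for rest in sync_interleavings(left[1:], right[1:], sync):
            merged.add((left[0],) + rest)
    return merged


build = ("compile", "handoff")
deploy = ("provision", "handoff")
print(sorted(sync_interleavings(build, deploy, {"handoff"})))

# Mismatched synchronization events leave no way to proceed.
print(sync_interleavings(("x",), ("y",), {"x", "y"}))
```

The empty result in the second call is the trace-level view of a deadlock introduced by composition, which is why a composed workflow is worth verifying even when its parts pass individually.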
workflow_to_process(dag)
Convert an orchestration DAG to a CSP process.

This method can be used to convert the OrchestrationService's internal DAG representation to a CSP process for verification.

| PARAMETER | DESCRIPTION |
|---|---|
| `dag` | DAG definition with nodes and edges |

| RETURNS | DESCRIPTION |
|---|---|
| `Process` | CSP process representation |
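Before handing a DAG to a converter like this, it can help to confirm that the input really is acyclic. The sketch below does that with the standard library's `graphlib` and returns one valid execution order. The dictionary shape with `"nodes"` and `"edges"` keys is an assumption based on the parameter description, and `linearize` is a hypothetical helper, not part of the module.

```python
from graphlib import CycleError, TopologicalSorter
from typing import Optional


def linearize(dag: dict) -> Optional[list[str]]:
    """Return one dependency-respecting order, or None if the graph has a cycle.

    Assumes dag = {"nodes": [...ids...], "edges": [(source, target), ...]}.
    """
    # TopologicalSorter expects a mapping of node -> predecessors.
    preds: dict[str, set[str]] = {node: set() for node in dag["nodes"]}
    for src, dst in dag["edges"]:
        preds.setdefault(dst, set()).add(src)
    try:
        return list(TopologicalSorter(preds).static_order())
    except CycleError:
        return None


pipeline = {"nodes": ["load", "clean", "train"],
            "edges": [("load", "clean"), ("clean", "train")]}
cyclic = {"nodes": ["a", "b"], "edges": [("a", "b"), ("b", "a")]}

print(linearize(pipeline))
print(linearize(cyclic))
```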