agenticraft_foundation.integration.csp_orchestration

CSP orchestration adapter — converts workflow DAGs to CSP processes for deadlock freedom, liveness analysis, and refinement checking.

CSP Process Algebra integration for workflow orchestration.

This module provides workflow verification using CSP refinement checking.

Key Features:
- Workflow specification using CSP process algebra
- Refinement checking of workflow implementations
- Deadlock freedom verification
- Liveness analysis for workflow progress

WorkflowNodeType

Bases: str, Enum

Types of nodes in a workflow.

WorkflowSpec dataclass

Specification of a workflow using CSP.

ATTRIBUTE DESCRIPTION
name

Workflow name

TYPE: str

process

The CSP process representing the workflow

TYPE: Process

events_of_interest

Events to check for liveness

TYPE: frozenset[Event]

invariants

Custom invariant predicates

TYPE: list[Callable[[Process], bool]]

from_dag(name, nodes, edges, sync_events=None) classmethod

Create a workflow spec from a DAG definition.

PARAMETER DESCRIPTION
name

Workflow name

TYPE: str

nodes

Node ID to type mapping

TYPE: dict[str, WorkflowNodeType]

edges

List of (source, target) edges

TYPE: list[tuple[str, str]]

sync_events

Events that require synchronization

TYPE: set[str] | None DEFAULT: None

RETURNS DESCRIPTION
WorkflowSpec

WorkflowSpec instance
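To make the shape of the `nodes` and `edges` arguments concrete, the sketch below enumerates the completion orders that a small DAG allows. It uses only the standard library and plain strings in place of `WorkflowNodeType` members; the helper name `linearizations` and the node names are hypothetical, and this is not how `from_dag` builds its `Process`.

```python
from itertools import permutations


def linearizations(nodes, edges):
    """Return every ordering of `nodes` that respects all (source, target) edges."""
    valid = []
    for order in permutations(sorted(nodes)):
        position = {node: index for index, node in enumerate(order)}
        if all(position[src] < position[dst] for src, dst in edges):
            valid.append(order)
    return valid


# A diamond: "fetch" fans out to two independent tasks that join at "merge".
# The string "task" stands in for a node type; the real enum members are not shown here.
nodes = {"fetch": "task", "clean": "task", "score": "task", "merge": "task"}
edges = [("fetch", "clean"), ("fetch", "score"), ("clean", "merge"), ("score", "merge")]

orders = linearizations(nodes, edges)
for order in orders:
    print(" -> ".join(order))
```

Brute-force permutation is only practical for a handful of nodes, but it shows which orderings a faithful encoding of the DAG should admit: here exactly two, differing in whether `clean` or `score` runs first.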

sequential_tasks(name, task_ids) classmethod

Create a sequential workflow spec.

PARAMETER DESCRIPTION
name

Workflow name

TYPE: str

task_ids

Ordered list of task IDs

TYPE: list[str]

RETURNS DESCRIPTION
WorkflowSpec

WorkflowSpec instance

parallel_tasks(name, task_ids, sync_on_complete=True) classmethod

Create a parallel workflow spec.

PARAMETER DESCRIPTION
name

Workflow name

TYPE: str

task_ids

List of task IDs to run in parallel

TYPE: list[str]

sync_on_complete

Whether tasks must all complete (sync on TICK)

TYPE: bool DEFAULT: True

RETURNS DESCRIPTION
WorkflowSpec

WorkflowSpec instance
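Running tasks in parallel without shared events means their events may interleave in any order that preserves each task's own sequence. The following standalone sketch lists those interleavings for two tasks; the event names (`a.start`, `a.done`, ...) are assumptions made for illustration, and the sketch does not model the TICK synchronization controlled by `sync_on_complete`.

```python
def interleavings(left, right):
    """Return all merges of two event sequences that keep each one's internal order."""
    if not left:
        return [tuple(right)]
    if not right:
        return [tuple(left)]
    # Either the next event comes from the left task or from the right task.
    left_first = [(left[0],) + rest for rest in interleavings(left[1:], right)]
    right_first = [(right[0],) + rest for rest in interleavings(left, right[1:])]
    return left_first + right_first


# Hypothetical event names for two independent tasks.
task_a = ("a.start", "a.done")
task_b = ("b.start", "b.done")

traces = interleavings(task_a, task_b)
print(len(traces))  # → 6
```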

choice_tasks(name, task_ids) classmethod

Create a choice (branching) workflow spec.

PARAMETER DESCRIPTION
name

Workflow name

TYPE: str

task_ids

List of alternative task IDs

TYPE: list[str]

RETURNS DESCRIPTION
WorkflowSpec

WorkflowSpec instance

WorkflowVerificationResult dataclass

Result of workflow verification.

ATTRIBUTE DESCRIPTION
is_valid

Whether the workflow passes all checks

TYPE: bool

is_deadlock_free

Whether the workflow cannot deadlock

TYPE: bool

is_live

Whether all events of interest are live

TYPE: bool

refines_spec

Whether implementation refines specification

TYPE: bool

violations

List of verification violations

TYPE: list[str]

traces_sample

Sample of workflow traces

TYPE: list[tuple[Event, ...]]

counterexample

Counterexample trace if refinement fails

TYPE: tuple[Event, ...] | None

passed(traces_sample=None) classmethod

Create a passing verification result.

failed(violations, is_deadlock_free=True, is_live=True, refines_spec=True, counterexample=None) classmethod

Create a failing verification result.
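The two constructors can be pictured with a stand-in dataclass that mirrors the documented attributes and signatures. `VerificationResultSketch` is a hypothetical name, and the choices that `passed` sets every flag to `True` while `failed` sets `is_valid` to `False` are assumptions, not confirmed behavior of `WorkflowVerificationResult`.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class VerificationResultSketch:
    """Hypothetical stand-in mirroring the documented attributes."""

    is_valid: bool
    is_deadlock_free: bool = True
    is_live: bool = True
    refines_spec: bool = True
    violations: list = field(default_factory=list)
    traces_sample: list = field(default_factory=list)
    counterexample: Optional[tuple] = None

    @classmethod
    def passed(cls, traces_sample=None):
        # Assumption: a passing result leaves every check flag at True.
        return cls(is_valid=True, traces_sample=list(traces_sample or []))

    @classmethod
    def failed(cls, violations, is_deadlock_free=True, is_live=True,
               refines_spec=True, counterexample=None):
        # Assumption: any failure makes the overall result invalid.
        return cls(
            is_valid=False,
            is_deadlock_free=is_deadlock_free,
            is_live=is_live,
            refines_spec=refines_spec,
            violations=list(violations),
            counterexample=counterexample,
        )


ok = VerificationResultSketch.passed([("task1", "task2")])
bad = VerificationResultSketch.failed(
    ["deadlock after task1"], is_deadlock_free=False, counterexample=("task1",)
)
```

Callers would typically branch on `is_valid` first and only then inspect `violations` and `counterexample`.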

CSPOrchestrationAdapter

Adapter integrating CSP process algebra with OrchestrationService.

This adapter provides workflow verification using CSP refinement checking. It can verify that workflow implementations conform to their specifications and detect potential issues like deadlocks.

Usage:

    from agenticraft_foundation.integration.csp_orchestration import (
        CSPOrchestrationAdapter,
        WorkflowSpec,
    )

    adapter = CSPOrchestrationAdapter()

    # Register a workflow specification
    spec = WorkflowSpec.sequential_tasks("my_workflow", ["task1", "task2"])
    adapter.register_spec(spec)

    # Verify a workflow implementation
    # (implementation_process is a CSP Process built elsewhere)
    result = adapter.verify_workflow("my_workflow", implementation_process)

    # Check deadlock freedom
    is_safe = adapter.check_deadlock_freedom("my_workflow")

    # Analyze liveness
    liveness = adapter.analyze_workflow_liveness("my_workflow")

__init__()

Initialize the adapter.

register_spec(spec)

Register a workflow specification.

PARAMETER DESCRIPTION
spec

The workflow specification

TYPE: WorkflowSpec

get_spec(name)

Get a registered workflow specification.

PARAMETER DESCRIPTION
name

Workflow name

TYPE: str

RETURNS DESCRIPTION
WorkflowSpec | None

The specification, or None if not registered

register_implementation(name, process)

Register a workflow implementation.

PARAMETER DESCRIPTION
name

Workflow name

TYPE: str

process

The CSP process implementation

TYPE: Process

verify_spec(spec)

Verify a workflow specification is valid.

Checks:
- Deadlock freedom
- Liveness of events of interest
- Custom invariants

PARAMETER DESCRIPTION
spec

The workflow specification

TYPE: WorkflowSpec

RETURNS DESCRIPTION
WorkflowVerificationResult

Verification result

verify_workflow(name, implementation=None, mode='trace')

Verify a workflow implementation against its specification.

PARAMETER DESCRIPTION
name

Workflow name

TYPE: str

implementation

Optional implementation process (uses registered if None)

TYPE: Process | None DEFAULT: None

mode

Refinement mode: "trace", "failures", or "fd" (failures-divergences)

TYPE: str DEFAULT: 'trace'

RETURNS DESCRIPTION
WorkflowVerificationResult

Verification result
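In the traces model, an implementation refines a specification when every trace of the implementation is also a trace of the specification. The sketch below checks this on finite, explicitly listed traces and returns a shortest counterexample when the check fails. It is a standalone illustration of the `"trace"` mode only; the function names are hypothetical, and the `"failures"` and `"fd"` modes need more information than trace sets carry.

```python
def prefix_closure(traces):
    """Return the set of all prefixes of the given traces (including the empty trace)."""
    closed = set()
    for trace in traces:
        trace = tuple(trace)
        closed.update(trace[:i] for i in range(len(trace) + 1))
    return closed


def trace_refines(spec_traces, impl_traces):
    """Check traces(impl) ⊆ traces(spec); return (ok, counterexample)."""
    extra = prefix_closure(impl_traces) - prefix_closure(spec_traces)
    if not extra:
        return True, None
    # The shortest offending trace pinpoints the first event the spec forbids.
    return False, min(extra, key=lambda t: (len(t), t))


spec = [("task1", "task2")]

good = trace_refines(spec, [("task1",)])          # does less than the spec allows
bad = trace_refines(spec, [("task2", "task1")])   # runs the tasks out of order

print(good)  # → (True, None)
print(bad)   # → (False, ('task2',))
```

Note that an implementation that does nothing at all passes a trace-refinement check, which is one reason the stronger modes exist.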

check_deadlock_freedom(name)

Check if a workflow specification is deadlock-free.

PARAMETER DESCRIPTION
name

Workflow name

TYPE: str

RETURNS DESCRIPTION
bool

True if deadlock-free
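A deadlock is a reachable state in which no event can occur and the process has not terminated successfully. The sketch below searches a small labelled transition system for such a state. The dictionary encoding of transitions, the state names, and the function `find_deadlock` are assumptions for illustration, not the adapter's internal representation.

```python
from collections import deque


def find_deadlock(transitions, start, terminated):
    """Return the trace leading to a deadlocked state, or None if there is none.

    `transitions` maps a state to a list of (event, next_state) pairs;
    `terminated` is the set of states that count as successful termination.
    """
    seen = {start}
    queue = deque([(start, ())])
    while queue:
        state, trace = queue.popleft()
        moves = transitions.get(state, [])
        if not moves and state not in terminated:
            return trace  # stuck without having finished
        for event, next_state in moves:
            if next_state not in seen:
                seen.add(next_state)
                queue.append((next_state, trace + (event,)))
    return None


healthy = {"s0": [("task1", "s1")], "s1": [("task2", "done")]}
stuck = {"s0": [("task1", "s1"), ("task2", "wait")], "s1": [("task2", "done")]}

print(find_deadlock(healthy, "s0", {"done"}))  # → None
print(find_deadlock(stuck, "s0", {"done"}))    # → ('task2',)
```

Breadth-first search means the returned trace is a shortest path to a deadlock, which tends to make the diagnosis easier to read.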

analyze_workflow_liveness(name, events=None)

Analyze liveness of events in a workflow.

PARAMETER DESCRIPTION
name

Workflow name

TYPE: str

events

Events to check (uses spec's events_of_interest if None)

TYPE: set[Event] | None DEFAULT: None

RETURNS DESCRIPTION
dict[Event, bool]

Mapping of event to liveness status
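The return shape, one boolean per event, can be illustrated with a deliberately simple reading of liveness: an event counts as live if it labels at least one transition reachable from the start state. This definition is an assumption chosen for brevity; the adapter may use a stronger notion. The transition encoding and function names are hypothetical.

```python
def reachable_events(transitions, start):
    """Collect every event that labels a transition reachable from `start`."""
    seen = {start}
    stack = [start]
    events = set()
    while stack:
        state = stack.pop()
        for event, next_state in transitions.get(state, []):
            events.add(event)
            if next_state not in seen:
                seen.add(next_state)
                stack.append(next_state)
    return events


def liveness_report(transitions, start, events):
    """Map each event of interest to True if it can ever occur."""
    fired = reachable_events(transitions, start)
    return {event: event in fired for event in events}


transitions = {
    "s0": [("task1", "s1")],
    "s1": [("task2", "done")],
    "orphan": [("task3", "done")],  # never reached from s0
}

report = liveness_report(transitions, "s0", {"task1", "task2", "task3"})
```

Here `task3` is reported as not live because its only transition starts from a state that the workflow can never enter.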

get_workflow_traces(name, max_length=20, max_count=100)

Get sample traces from a workflow.

PARAMETER DESCRIPTION
name

Workflow name

TYPE: str

max_length

Maximum trace length

TYPE: int DEFAULT: 20

max_count

Maximum number of traces

TYPE: int DEFAULT: 100

RETURNS DESCRIPTION
list[tuple[Event, ...]]

List of traces
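The two bounds matter because a workflow with a loop has infinitely many traces. A minimal sketch of bounded enumeration, assuming a dictionary-based transition system rather than the library's `Process` type, might look like this; `sample_traces` is a hypothetical name whose defaults mirror the documented ones.

```python
from collections import deque


def sample_traces(transitions, start, max_length=20, max_count=100):
    """Enumerate traces breadth-first, shortest first, within both bounds."""
    traces = []
    queue = deque([(start, ())])
    while queue and len(traces) < max_count:
        state, trace = queue.popleft()
        traces.append(trace)
        if len(trace) == max_length:
            continue  # do not extend traces past the length bound
        for event, next_state in transitions.get(state, []):
            queue.append((next_state, trace + (event,)))
    return traces


# A two-state loop: without bounds the enumeration would never finish.
loop = {"idle": [("start", "busy")], "busy": [("finish", "idle")]}

by_length = sample_traces(loop, "idle", max_length=3, max_count=10)
by_count = sample_traces(loop, "idle", max_length=20, max_count=2)
```

With `max_length=3` the loop yields four traces (the empty trace plus one of each length up to three), while `max_count=2` cuts the enumeration off after the two shortest.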

compose_workflows(name, workflow_names, composition_type='parallel', sync_events=None)

Compose multiple workflows into a new workflow.

PARAMETER DESCRIPTION
name

Name for the composed workflow

TYPE: str

workflow_names

Names of workflows to compose

TYPE: list[str]

composition_type

"parallel", "sequential", or "choice"

TYPE: str DEFAULT: 'parallel'

sync_events

Events to synchronize on (for parallel)

TYPE: set[str] | None DEFAULT: None

RETURNS DESCRIPTION
WorkflowSpec | None

Composed workflow spec, or None if any workflow not found
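For the parallel case, CSP's interface parallel operator makes both components take part in every event in the synchronization set and lets all other events interleave. The sketch below builds that product for two small transition systems. The encoding, the state and event names, and the function `parallel` are assumptions for illustration; the adapter composes `Process` objects, not dictionaries.

```python
def parallel(left, right, start, sync):
    """Build the product transition system of `left` and `right`.

    Events in `sync` need a matching move on both sides; others interleave.
    """
    product = {}
    stack = [start]
    while stack:
        state = stack.pop()
        if state in product:
            continue
        l_state, r_state = state
        moves = []
        for event, l_next in left.get(l_state, []):
            if event in sync:
                # Both sides must offer the event for it to happen.
                moves += [(event, (l_next, r_next))
                          for other, r_next in right.get(r_state, [])
                          if other == event]
            else:
                moves.append((event, (l_next, r_state)))
        for event, r_next in right.get(r_state, []):
            if event not in sync:
                moves.append((event, (l_state, r_next)))
        product[state] = moves
        stack.extend(next_state for _, next_state in moves)
    return product


# Two hypothetical workflows that must agree on "release".
build = {"l0": [("build", "l1")], "l1": [("release", "l2")]}
test = {"r0": [("test", "r1")], "r1": [("release", "r2")]}

composed = parallel(build, test, ("l0", "r0"), sync={"release"})
```

In the composed system `release` becomes possible only in state `("l1", "r1")`, after both `build` and `test` have happened in either order.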

workflow_to_process(dag)

Convert an orchestration DAG to a CSP process.

This method can be used to convert the OrchestrationService's internal DAG representation to a CSP process for verification.

PARAMETER DESCRIPTION
dag

DAG definition with nodes and edges

TYPE: dict[str, Any]

RETURNS DESCRIPTION
Process

CSP process representation
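Before handing a DAG dictionary to a converter, it can be worth confirming that it really is acyclic. The sketch below does so with the standard library's `graphlib`; the `"nodes"` and `"edges"` keys are assumed from the parameter description, since the exact schema is not documented here, and `execution_order` is a hypothetical helper.

```python
from graphlib import CycleError, TopologicalSorter


def execution_order(dag):
    """Return one valid execution order, or raise CycleError if `dag` has a cycle."""
    predecessors = {node: set() for node in dag["nodes"]}
    for source, target in dag["edges"]:
        predecessors[target].add(source)
    return list(TopologicalSorter(predecessors).static_order())


# Assumed schema: a list of node IDs plus (source, target) edge pairs.
dag = {"nodes": ["a", "b", "c"], "edges": [("a", "b"), ("b", "c")]}
order = execution_order(dag)
print(order)  # → ['a', 'b', 'c']

cyclic = {"nodes": ["a", "b"], "edges": [("a", "b"), ("b", "a")]}
try:
    execution_order(cyclic)
    cycle_rejected = False
except CycleError:
    cycle_rejected = True
```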