Enhanced WorkflowAgent API Reference¶
Overview¶
The Enhanced WorkflowAgent extends the base WorkflowAgent with advanced capabilities including visual planning, dynamic workflow modification, checkpoint/resume support, and real-time progress streaming.
Class Reference¶
WorkflowAgent¶
class WorkflowAgent(Agent):
"""
Advanced agent for executing complex workflows with enhanced features.
Provides visual planning, dynamic modification, checkpointing,
and progress streaming capabilities.
"""
Initialization¶
from agenticraft.agents.workflow import WorkflowAgent
agent = WorkflowAgent(
name: str = "WorkflowExecutor",
model: str = "gpt-4",
provider: Optional[str] = None,
enable_checkpoints: bool = False,
checkpoint_dir: Optional[str] = None,
enable_visualization: bool = True,
enable_streaming: bool = False,
progress_callback: Optional[Callable] = None,
max_parallel_steps: int = 5,
step_timeout: Optional[float] = None,
retry_failed_steps: bool = True,
**kwargs
)
Parameters¶
| Parameter | Type | Default | Description |
|---|---|---|---|
| name | str | "WorkflowExecutor" | Agent name |
| model | str | "gpt-4" | LLM model to use |
| provider | Optional[str] | None | LLM provider |
| enable_checkpoints | bool | False | Enable checkpoint/resume |
| checkpoint_dir | Optional[str] | None | Directory for checkpoint files |
| enable_visualization | bool | True | Enable workflow visualization |
| enable_streaming | bool | False | Enable progress streaming |
| progress_callback | Optional[Callable] | None | Callback invoked with progress updates |
| max_parallel_steps | int | 5 | Maximum number of steps executed in parallel |
| step_timeout | Optional[float] | None | Default step timeout |
| retry_failed_steps | bool | True | Retry failed steps automatically |
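For example, an agent configured for a long-running pipeline, using only the parameters documented above (the name and checkpoint directory are illustrative):
from agenticraft.agents.workflow import WorkflowAgent

agent = WorkflowAgent(
    name="PipelineRunner",
    model="gpt-4",
    enable_checkpoints=True,
    checkpoint_dir="./checkpoints",  # where checkpoint files are stored
    enable_streaming=True,
    max_parallel_steps=8,
    retry_failed_steps=True,
)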
Core Methods¶
run_workflow()¶
async def run_workflow(
self,
task: str,
workflow: Union[Workflow, List[Step], Dict],
context: Optional[Dict[str, Any]] = None,
checkpoint_id: Optional[str] = None,
resume_from_checkpoint: bool = False,
**kwargs
) -> WorkflowResult
Execute a workflow with optional checkpoint/resume.
Parameters:
- task: Task description
- workflow: Workflow to execute
- context: Execution context
- checkpoint_id: Unique checkpoint identifier
- resume_from_checkpoint: Resume from an existing checkpoint
Returns:
- WorkflowResult: Execution results with per-step outputs
Example:
# Execute with checkpointing
result = await agent.run_workflow(
task="Process quarterly data",
workflow=data_pipeline,
checkpoint_id="q4_2024_processing",
resume_from_checkpoint=True
)
# Access results
for step_name, step_result in result.steps.items():
print(f"{step_name}: {step_result.status}")
if step_result.output:
print(f" Output: {step_result.output}")
stream_workflow()¶
async def stream_workflow(
self,
task: str,
workflow: Union[Workflow, List[Step], Dict],
context: Optional[Dict[str, Any]] = None,
**kwargs
) -> AsyncIterator[WorkflowProgress]
Stream workflow execution progress in real-time.
Parameters:
- task: Task description
- workflow: Workflow to execute
- context: Execution context
Yields:
- WorkflowProgress: Progress updates during execution
Example:
# Stream execution progress
async for progress in agent.stream_workflow(
"Analyze customer feedback",
feedback_workflow
):
print(f"Step: {progress.current_step}")
print(f"Status: {progress.status}")
print(f"Progress: {progress.percentage:.1f}%")
if progress.step_output:
print(f"Output: {progress.step_output}")
plan_workflow()¶
async def plan_workflow(
self,
task: str,
requirements: Optional[Dict[str, Any]] = None,
constraints: Optional[Dict[str, Any]] = None,
output_format: str = "workflow",
visualize: bool = True
) -> Union[Workflow, str]
Use AI to plan a workflow from a task description.
Parameters:
- task: Task to plan a workflow for
- requirements: Specific requirements
- constraints: Constraints (time, resources, etc.)
- output_format: Output format ("workflow", "mermaid", or "json")
- visualize: Generate a visualization
Returns:
- Union[Workflow, str]: The planned workflow, or its visualization as a string
Example:
# AI-powered workflow planning
planned_workflow = await agent.plan_workflow(
task="Create a competitive analysis report",
requirements={
"sources": ["web", "industry_reports"],
"depth": "comprehensive",
"deliverables": ["report", "presentation"]
},
constraints={
"time_limit": "2 hours",
"budget": "$100"
},
output_format="workflow"
)
# Get visual plan
visual_plan = await agent.plan_workflow(
task="Data migration pipeline",
output_format="mermaid",
visualize=True
)
print(visual_plan) # Mermaid diagram
modify_workflow()¶
Dynamically modify a workflow's structure.
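The signature is not reproduced in this reference; inferred from the parameters and example below (and called without await in the example, so presumably synchronous), a plausible sketch is:
def modify_workflow(
    self,
    workflow: Workflow,
    modifications: Dict[str, Any]
) -> Workflow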
Parameters:
- workflow: Workflow to modify
- modifications: Modifications to apply
Returns:
- Workflow: The modified workflow
Modification Types:
modifications = {
"add_steps": [Step(...)],
"remove_steps": ["step_name"],
"modify_steps": {
"step_name": {"description": "New description"}
},
"reorder_steps": ["step1", "step3", "step2"],
"add_dependencies": {
"step_name": ["dependency1", "dependency2"]
}
}
Example:
# Add error handling step
modified = agent.modify_workflow(
original_workflow,
{
"add_steps": [
Step("error_handler", "Handle errors",
condition="any_step_failed")
],
"modify_steps": {
"risky_step": {"retry_count": 3}
}
}
)
visualize_execution()¶
def visualize_execution(
self,
workflow: Workflow,
execution_result: Optional[WorkflowResult] = None,
format: str = "mermaid",
show_timing: bool = True,
show_outputs: bool = False
) -> str
Visualize a workflow, optionally overlaying execution results.
Parameters:
- workflow: Workflow to visualize
- execution_result: Execution results to overlay
- format: Visualization format
- show_timing: Show execution times
- show_outputs: Show step outputs
Returns:
- str: Visualization annotated with execution data
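Example (etl_workflow here is an illustrative workflow defined elsewhere):
# Run the workflow, then overlay timings on the diagram
result = await agent.run_workflow("Nightly ETL", etl_workflow)
diagram = agent.visualize_execution(
    etl_workflow,
    execution_result=result,
    format="mermaid",
    show_timing=True,
)
print(diagram)  # Mermaid source annotated with execution data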
checkpoint_workflow()¶
async def checkpoint_workflow(
self,
checkpoint_id: str,
workflow: Workflow,
current_state: Dict[str, Any],
completed_steps: List[str],
step_outputs: Dict[str, Any]
) -> bool
Save a workflow checkpoint so execution can be resumed later.
Parameters:
- checkpoint_id: Unique checkpoint identifier
- workflow: Workflow being executed
- current_state: Current execution state
- completed_steps: Names of completed steps
- step_outputs: Outputs from completed steps
Returns:
- bool: Success status
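Example (a manual checkpoint from a custom execution loop; raw_data and clean_data are illustrative values):
saved = await agent.checkpoint_workflow(
    checkpoint_id="etl_2024_q4",
    workflow=etl_workflow,
    current_state={"stage": "transform"},
    completed_steps=["extract", "validate"],
    step_outputs={"extract": raw_data, "validate": clean_data},
)
if not saved:
    print("Warning: checkpoint could not be written")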
resume_from_checkpoint()¶
async def resume_from_checkpoint(
self,
checkpoint_id: str
) -> Tuple[Workflow, Dict[str, Any], List[str], Dict[str, Any]]
Restore a workflow's execution state from a saved checkpoint.
Returns:
- Tuple: (workflow, state, completed_steps, outputs)
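Example (continuing the checkpoint above):
workflow, state, completed, outputs = await agent.resume_from_checkpoint(
    "etl_2024_q4"
)
print(f"Resuming after {len(completed)} completed steps")
result = await agent.run_workflow(
    task="Resume ETL",
    workflow=workflow,
    checkpoint_id="etl_2024_q4",
    resume_from_checkpoint=True,
)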
WorkflowResult¶
@dataclass
class WorkflowResult:
"""Result of workflow execution."""
workflow_id: str
status: WorkflowStatus
steps: Dict[str, StepResult]
start_time: datetime
end_time: Optional[datetime]
total_duration: Optional[float]
error: Optional[str]
metadata: Dict[str, Any]
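A typical inspection of the result, assuming WorkflowStatus exposes a COMPLETED member (report_workflow is illustrative):
result = await agent.run_workflow("Quarterly report", report_workflow)
if result.status == WorkflowStatus.COMPLETED:
    print(f"Completed in {result.total_duration:.1f}s")
else:
    print(f"Failed: {result.error}")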
WorkflowProgress¶
@dataclass
class WorkflowProgress:
"""Real-time workflow progress update."""
workflow_id: str
current_step: str
status: StepStatus
percentage: float
elapsed_time: float
estimated_remaining: Optional[float]
step_output: Optional[Any]
message: Optional[str]
Advanced Features¶
Parallel Execution¶
# Configure parallel execution
agent = WorkflowAgent(
max_parallel_steps=10,
parallel_strategy="adaptive" # or "fixed", "resource_based"
)
# Define parallel workflow
parallel_workflow = Workflow(
name="parallel_processing",
steps=[
Step("task1", "Process dataset 1", parallel_group="group1"),
Step("task2", "Process dataset 2", parallel_group="group1"),
Step("task3", "Process dataset 3", parallel_group="group1"),
Step("merge", "Merge results", depends_on=["task1", "task2", "task3"])
]
)
result = await agent.run_workflow("Parallel processing", parallel_workflow)
Dynamic Workflow Generation¶
class DynamicWorkflowAgent(WorkflowAgent):
"""Agent that generates workflows dynamically."""
async def generate_adaptive_workflow(
self,
task: str,
initial_analysis: Dict[str, Any]
) -> Workflow:
"""Generate workflow based on initial analysis."""
# Analyze task complexity
complexity = await self.analyze_complexity(task)
# Generate appropriate workflow
if complexity > 0.8:
return WorkflowPatterns.map_reduce(...)
elif complexity > 0.5:
return WorkflowPatterns.parallel_tasks(...)
else:
return WorkflowPatterns.sequential_pipeline(...)
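Usage might look like this, assuming analyze_complexity is implemented on the subclass:
agent = DynamicWorkflowAgent(name="AdaptivePlanner")
workflow = await agent.generate_adaptive_workflow(
    task="Summarize 10,000 support tickets",
    initial_analysis={"volume": 10_000, "format": "text"},
)
result = await agent.run_workflow("Adaptive run", workflow)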
Progress Monitoring¶
# With callback
def progress_handler(progress: WorkflowProgress):
print(f"[{progress.percentage:.0f}%] {progress.current_step}: {progress.status}")
if progress.estimated_remaining:
print(f" ETA: {progress.estimated_remaining:.0f}s")
agent = WorkflowAgent(
progress_callback=progress_handler,
progress_update_interval=1.0 # Update every second
)
# With async iteration
async for progress in agent.stream_workflow(task, workflow):
await update_ui(progress)
if progress.status == StepStatus.FAILED:
await alert_user(progress.message)
Workflow Optimization¶
# Enable workflow optimization
agent = WorkflowAgent(
enable_optimization=True,
optimization_strategy="performance" # or "cost", "balanced"
)
# Optimize existing workflow
optimized = await agent.optimize_workflow(
workflow,
constraints={
"max_duration": 3600,
"max_cost": 100,
"required_quality": 0.9
}
)
# Get optimization suggestions
suggestions = await agent.analyze_workflow(workflow)
print(suggestions)
# {
# "parallel_opportunities": [...],
# "redundant_steps": [...],
# "optimization_potential": 0.35
# }
Error Handling and Recovery¶
# Configure error handling
agent = WorkflowAgent(
retry_failed_steps=True,
retry_strategy="exponential_backoff",
max_retries=3,
error_handlers={
"network_error": lambda e: wait_and_retry(e),
"validation_error": lambda e: fix_and_retry(e),
"critical_error": lambda e: alert_and_stop(e)
}
)
# Execute with error recovery
try:
result = await agent.run_workflow(
"Critical process",
workflow,
on_error="continue_with_defaults" # or "stop", "skip"
)
except WorkflowExecutionError as e:
# Access partial results
partial = e.partial_results
failed_step = e.failed_at
# Attempt recovery
recovery_workflow = agent.create_recovery_workflow(
original_workflow=workflow,
failed_step=failed_step,
partial_results=partial
)
result = await agent.run_workflow(
"Recovery process",
recovery_workflow
)
Integration Examples¶
With Reasoning Patterns¶
# Combine with reasoning for intelligent execution
reasoning_agent = ReasoningAgent(reasoning_pattern="chain_of_thought")
workflow_agent = WorkflowAgent()
# Plan with reasoning
reasoning_result = await reasoning_agent.think_and_act(
"Plan the optimal workflow for data migration"
)
# Parse and execute
workflow = workflow_agent.parse_reasoning_to_workflow(reasoning_result)
result = await workflow_agent.run_workflow("Execute plan", workflow)
With Streaming¶
# Stream workflow execution with detailed updates
agent = WorkflowAgent(enable_streaming=True)
async def process_with_ui_updates():
async for progress in agent.stream_workflow(task, workflow):
# Update progress bar
update_progress_bar(progress.percentage)
# Show current step
update_step_display(progress.current_step, progress.status)
# Log outputs
if progress.step_output:
log_output(progress.step_output)
With Templates¶
# Use templates with enhanced execution
template = WorkflowTemplates.research_workflow(
topic="Market Analysis",
depth="comprehensive"
)
# Execute with enhancements
agent = WorkflowAgent(
enable_checkpoints=True,
enable_visualization=True
)
# Visualize before execution
preview = agent.visualize_execution(template)
display(preview)
# Execute with monitoring
result = await agent.run_workflow(
"Q4 Market Analysis",
template,
checkpoint_id="market_analysis_q4"
)
Performance Optimization¶
Execution Strategies¶
# Configure execution strategy
agent = WorkflowAgent(
execution_strategy="adaptive", # Dynamically adjust parallelism
resource_limits={
"max_memory": "4GB",
"max_cpu": 4,
"max_concurrent_api_calls": 10
},
performance_tracking=True
)
# Get performance metrics
metrics = agent.get_performance_metrics()
print(f"Average step duration: {metrics.avg_step_duration}s")
print(f"Parallelism efficiency: {metrics.parallelism_efficiency:.2%}")
Caching¶
# Enable caching
agent = WorkflowAgent(
enable_caching=True,
cache_strategy="content_based", # Cache based on inputs
cache_ttl=3600 # 1 hour
)
# Manual cache management
agent.cache_step_result("data_fetch", result_data)
cached = agent.get_cached_result("data_fetch")
Best Practices¶
- Use Checkpoints for Long Workflows: enable for workflows > 5 minutes
- Monitor Progress: use callbacks or streaming for user feedback
- Plan Before Execution: use plan_workflow() for complex tasks
- Visualize Complex Workflows: always visualize before execution
- Handle Errors Gracefully: configure appropriate error handlers
- Optimize Parallel Execution: group independent steps
- Cache Expensive Operations: enable caching for repeated workflows
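A minimal sketch combining several of these practices (the task string and checkpoint ID are illustrative):
# Plan, preview, then execute with checkpointing
workflow = await agent.plan_workflow(task="Quarterly data audit")
print(agent.visualize_execution(workflow))  # inspect before running
result = await agent.run_workflow(
    "Quarterly data audit",
    workflow,
    checkpoint_id="audit_run_1",
)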
See Also¶
- Workflow Patterns - Pre-built workflow patterns
- Workflow Templates - Production-ready templates
- Workflow Visualization - Visualization options
- Examples - Complete examples