Migrating to Enhanced Workflows¶
This guide helps you migrate from basic workflows to AgentiCraft's Enhanced Workflows introduced in v0.2.0.
What's Changed¶
Before (v0.1.x)¶
# Basic workflow as list of tuples
workflow = [
("fetch_data", "Get data from API"),
("process", "Process the data"),
("save", "Save results")
]
agent = WorkflowAgent(name="Processor")
result = agent.run_workflow("Execute pipeline", workflow)
After (v0.2.0)¶
# Rich workflow with full features
import asyncio

from agenticraft.agents.workflow import WorkflowAgent
from agenticraft.core.workflow import Workflow, Step
from agenticraft.workflows import visualize_workflow

# Declare the pipeline; each step names the steps it depends on.
workflow = Workflow(
    name="data_pipeline",
    steps=[
        Step("fetch_data", "Get data from API", retry_count=3),
        Step("process", "Process the data", depends_on=["fetch_data"]),
        Step("save", "Save results", depends_on=["process"]),
    ],
)

agent = WorkflowAgent(
    name="Processor",
    enable_checkpoints=True,
    enable_visualization=True,
)

# Visualize before execution
print(visualize_workflow(workflow))


async def main():
    """Execute the pipeline and print each progress update as it arrives."""
    # Execute with progress tracking
    async for progress in agent.stream_workflow("Execute pipeline", workflow):
        print(f"{progress.percentage:.0f}%: {progress.current_step}")


# `async for` is only valid inside a coroutine, so drive it with asyncio.run().
asyncio.run(main())
Migration Steps¶
1. Update Your Imports¶
# Old
from agenticraft import WorkflowAgent
# New
from agenticraft.agents.workflow import WorkflowAgent
from agenticraft.core.workflow import Workflow, Step
from agenticraft.workflows import visualize_workflow
from agenticraft.workflows.patterns import WorkflowPatterns
from agenticraft.workflows.templates import WorkflowTemplates
2. Convert Workflow Definitions¶
Simple Sequential Workflow¶
# Old
workflow = [
("step1", "First step"),
("step2", "Second step"),
("step3", "Third step")
]
# New - Basic conversion
workflow = Workflow(
name="my_workflow",
steps=[
Step("step1", "First step"),
Step("step2", "Second step"),
Step("step3", "Third step")
]
)
# New - With enhancements
workflow = Workflow(
name="my_workflow",
description="My enhanced workflow",
steps=[
Step("step1", "First step", timeout=60),
Step("step2", "Second step", retry_count=2),
Step("step3", "Third step", checkpoint=True)
]
)
Workflow with Dependencies¶
# Old - No native dependency support
workflow = [
("fetch_users", "Get users"),
("fetch_orders", "Get orders"),
("merge", "Merge data"), # Had to handle deps manually
("analyze", "Analyze")
]
# New - Explicit dependencies
workflow = Workflow(
name="data_analysis",
steps=[
Step("fetch_users", "Get users"),
Step("fetch_orders", "Get orders"),
Step("merge", "Merge data", depends_on=["fetch_users", "fetch_orders"]),
Step("analyze", "Analyze", depends_on=["merge"])
]
)
3. Update Agent Creation¶
# Old
agent = WorkflowAgent(name="MyAgent")
# New - With enhanced features
agent = WorkflowAgent(
name="MyAgent",
enable_checkpoints=True, # Save progress
enable_visualization=True, # Visualize workflows
enable_streaming=True, # Stream progress
max_parallel_steps=5, # Parallel execution
retry_failed_steps=True # Auto-retry failures
)
4. Update Execution Code¶
Synchronous to Asynchronous¶
# Old - Synchronous
result = agent.run_workflow("Task", workflow)
print(result)
# New - Asynchronous with more options
result = await agent.run_workflow(
task="Task",
workflow=workflow,
checkpoint_id="task_001", # Enable resume
context={"user_id": 123} # Pass context
)
# Access detailed results
for step_name, step_result in result.steps.items():
print(f"{step_name}: {step_result.status}")
if step_result.output:
print(f" Output: {step_result.output}")
Add Progress Tracking¶
# Old - No progress visibility
result = agent.run_workflow("Long task", workflow)
# New - Real-time progress
async for progress in agent.stream_workflow("Long task", workflow):
print(f"Step: {progress.current_step}")
print(f"Status: {progress.status}")
print(f"Progress: {progress.percentage:.1f}%")
# Update UI
update_progress_bar(progress.percentage)
5. Leverage New Features¶
Workflow Visualization¶
# Visualize before execution
visualization = visualize_workflow(workflow, format="mermaid")
print(visualization)
# Or as ASCII for terminals
ascii_viz = visualize_workflow(workflow, format="ascii")
print(ascii_viz)
# Interactive HTML
html_viz = visualize_workflow(workflow, format="html", interactive=True)
save_to_file("workflow.html", html_viz)
Use Workflow Patterns¶
# Old - Manual parallel setup
# Complex manual implementation...
# New - Use patterns
parallel_workflow = WorkflowPatterns.parallel_tasks(
name="parallel_processing",
tasks=[
Step("task1", "Process dataset 1"),
Step("task2", "Process dataset 2"),
Step("task3", "Process dataset 3")
],
max_concurrent=3
)
Use Templates¶
# Old - Build from scratch
workflow = [
("research", "Research topic"),
("outline", "Create outline"),
("draft", "Write draft"),
# ... many more steps
]
# New - Use templates
content_workflow = WorkflowTemplates.content_pipeline(
content_type="blog_post",
target_audience="developers",
tone="technical",
seo_optimized=True
)
Code Examples¶
Example 1: Data Processing Pipeline¶
Before¶
def process_data():
    """Run the daily data pipeline using the v0.1.x tuple-list workflow format."""
    processor = WorkflowAgent(name="DataProcessor")
    # Each entry is a (step name, step description) pair.
    pipeline = [
        ("extract", "Extract from database"),
        ("validate", "Validate data"),
        ("transform", "Transform format"),
        ("load", "Load to warehouse"),
    ]
    return processor.run_workflow("Process daily data", pipeline)
After¶
async def process_data():
    """Run the daily data pipeline from a template, with checkpoints enabled.

    Prints the total duration on success, or the reported error otherwise.

    Returns:
        The result object produced by ``agent.run_workflow``.
    """
    # Imported here so the example runs without relying on a module-level import.
    from datetime import datetime

    # Use template
    workflow = WorkflowTemplates.data_processing(
        input_format="database",
        output_format="warehouse",
        transformations=["validate", "clean", "transform"],
        validation_rules={
            "required_fields": ["id", "timestamp", "value"],
            "value_range": (0, 1000000),
        },
    )

    # Enhanced agent
    agent = WorkflowAgent(
        name="DataProcessor",
        enable_checkpoints=True,
        checkpoint_dir="./etl_checkpoints",
    )

    # Execute with monitoring; the checkpoint id embeds today's date, so each
    # calendar day gets its own id.
    result = await agent.run_workflow(
        "Process daily data",
        workflow,
        checkpoint_id=f"daily_{datetime.now().date()}",
    )

    # Check results
    if result.status == "completed":
        print(f"Processed in {result.total_duration:.2f}s")
    else:
        print(f"Failed at: {result.error}")
    return result
Example 2: Multi-Step Analysis¶
Before¶
class Analyzer:
    """Analysis helper built on the v0.1.x tuple-list workflow format."""

    def __init__(self):
        self.agent = WorkflowAgent(name="Analyzer")

    def analyze(self, data):
        """Run the three-step analysis workflow for ``data`` and return the agent's result."""
        steps = [
            ("preprocess", "Prepare data"),
            ("analyze", "Run analysis"),
            ("report", "Generate report"),
        ]
        task = f"Analyze {data}"
        return self.agent.run_workflow(task, steps)
After¶
class Analyzer:
    """Analysis helper that streams progress from a pattern-based workflow."""

    def __init__(self):
        self.agent = WorkflowAgent(
            name="Analyzer",
            enable_visualization=True,
            enable_streaming=True,
        )

    async def analyze(self, data):
        """Build the analysis pipeline, print its visualization, and yield progress updates.

        This is an async generator; consume it with ``async for``.
        """
        # Parallel preprocessing
        preprocess_stage = WorkflowPatterns.parallel_tasks(
            name="preprocess",
            tasks=[
                Step("clean", "Clean data"),
                Step("normalize", "Normalize values"),
                Step("validate", "Validate integrity"),
            ],
        )

        # Analysis with retries
        analysis_stage = WorkflowPatterns.retry_loop(
            name="analyze",
            task=Step("ml_analysis", "Run ML analysis"),
            max_retries=3,
        )

        # Conditional reporting
        report_stage = WorkflowPatterns.conditional_branch(
            name="report",
            condition="confidence > 0.8",
            if_branch=[Step("auto_report", "Generate report")],
            else_branch=[Step("manual_review", "Flag for review")],
        )

        # Create workflow with patterns
        pipeline = WorkflowPatterns.sequential_pipeline(
            name="analysis_pipeline",
            stages=[preprocess_stage, analysis_stage, report_stage],
            checkpoints=True,
        )

        # Visualize the plan
        print(visualize_workflow(pipeline))

        # Execute with progress
        task = f"Analyze {data}"
        async for update in self.agent.stream_workflow(task, pipeline):
            yield update  # Stream to UI
Example 3: Conditional Workflow¶
Before¶
# Manual condition handling
def approval_workflow(request):
    """Run the approval workflow, choosing the steps in code from ``request.amount``."""
    approver = WorkflowAgent(name="Approver")
    # Had to handle conditions in code
    steps = (
        [("auto_approve", "Automatic approval")]
        if request.amount < 1000
        else [
            ("review", "Manual review"),
            ("approve", "Approval decision"),
        ]
    )
    return approver.run_workflow("Approval", steps)
After¶
async def approval_workflow(request):
    """Run the approval workflow using the conditional-branch pattern.

    ``request.amount`` is passed to the workflow through ``context`` under the
    ``"amount"`` key, which is the name the condition string refers to.

    Returns:
        The result object produced by ``agent.run_workflow``.
    """
    # Use conditional pattern
    workflow = WorkflowPatterns.conditional_branch(
        name="approval_flow",
        condition_step=Step("evaluate", "Evaluate request"),
        # Plain string: there is nothing to interpolate here.
        condition="amount < 1000",
        if_branch=[
            Step("auto_approve", "Automatic approval"),
            Step("notify", "Send notification"),
        ],
        else_branch=[
            Step("assign_reviewer", "Assign to reviewer"),
            Step("review", "Manual review"),
            Step("decision", "Make decision"),
            Step("notify", "Send notification"),
        ],
    )

    agent = WorkflowAgent(name="Approver")

    # Execute with context
    result = await agent.run_workflow(
        "Approval request",
        workflow,
        context={"amount": request.amount},
    )
    return result
Async Considerations¶
Converting Sync to Async¶
# Old synchronous code
def run_workflow(task):
    """Run ``workflow`` for ``task`` with a fresh agent and return the result (blocking call)."""
    return WorkflowAgent().run_workflow(task, workflow)
# New async code
async def run_workflow(task):
    """Await ``workflow`` for ``task`` on a fresh agent and return the result."""
    runner = WorkflowAgent()
    return await runner.run_workflow(task, workflow)
# With streaming
async def run_workflow_with_updates(task):
    """Stream ``workflow`` for ``task``, printing each update, and return all updates as a list."""
    streaming_agent = WorkflowAgent(enable_streaming=True)
    updates = []
    async for update in streaming_agent.stream_workflow(task, workflow):
        print(f"Progress: {update.percentage}%")
        updates.append(update)
    return updates
Integration with FastAPI¶
from fastapi import FastAPI, WebSocket, WebSocketDisconnect

app = FastAPI()


@app.websocket("/ws/workflow/{workflow_id}")
async def workflow_progress(websocket: WebSocket, workflow_id: str):
    """Stream progress of the workflow identified by ``workflow_id`` over a WebSocket.

    Each progress update is sent as a JSON object with ``step``, ``status`` and
    ``percentage`` keys. Any other failure while streaming is reported to the
    client as ``{"error": "<message>"}``.
    """
    await websocket.accept()
    agent = WorkflowAgent(enable_streaming=True)
    workflow = get_workflow(workflow_id)
    try:
        async for progress in agent.stream_workflow("Execute", workflow):
            await websocket.send_json({
                "step": progress.current_step,
                "status": progress.status,
                "percentage": progress.percentage,
            })
    except WebSocketDisconnect:
        # The client has gone away; sending an error message would only
        # raise again on the closed connection.
        return
    except Exception as e:
        await websocket.send_json({"error": str(e)})
Performance Improvements¶
Old Performance Characteristics¶
- No parallel execution
- No caching
- No progress visibility
- No checkpoint/resume
New Performance Features¶
# Optimize for performance
agent = WorkflowAgent(
max_parallel_steps=10, # Parallel execution
enable_caching=True, # Cache step results
cache_ttl=3600, # 1 hour cache
batch_size=1000, # Batch processing
resource_limits={
"max_memory": "4GB",
"max_concurrent_api_calls": 20
}
)
# Monitor performance
result = await agent.run_workflow(task, workflow, collect_metrics=True)
print(f"Execution time: {result.metrics.total_duration}s")
print(f"Parallel efficiency: {result.metrics.parallelism_efficiency:.0%}")
Rollback Plan¶
If you need to temporarily use old-style workflows:
class LegacyWorkflowAdapter:
    """Adapter for old-style workflows."""

    @staticmethod
    def convert_legacy_workflow(legacy_workflow: list[tuple[str, str]]) -> Workflow:
        """Convert old format to new.

        Args:
            legacy_workflow: Old-style workflow, a list of
                ``(step name, step description)`` pairs.

        Returns:
            A ``Workflow`` named ``"legacy_workflow"`` with one ``Step`` per
            pair, in the same order. Only ``name`` and ``description`` are set
            on each step.
        """
        # Built-in generics (Python 3.9+) are used in the annotation so the
        # snippet does not need a `typing` import.
        steps = [
            Step(name=name, description=desc)
            for name, desc in legacy_workflow
        ]
        return Workflow(
            name="legacy_workflow",
            steps=steps,
        )

    @staticmethod
    async def run_legacy_workflow(agent, task, legacy_workflow):
        """Run old-style workflow with new agent.

        Converts ``legacy_workflow`` with ``convert_legacy_workflow`` and
        awaits ``agent.run_workflow(task, workflow)``, returning its result.
        """
        workflow = LegacyWorkflowAdapter.convert_legacy_workflow(legacy_workflow)
        return await agent.run_workflow(task, workflow)
# Use adapter
legacy_workflow = [("step1", "Do something"), ("step2", "Do more")]
result = await LegacyWorkflowAdapter.run_legacy_workflow(
agent, "Task", legacy_workflow
)
Common Issues and Solutions¶
Issue: Workflow execution is now async¶
Solution: Update calling code to use async/await, or wrap the call in asyncio.run() when calling from synchronous code
Issue: Old workflows don't have dependencies¶
Solution: Add dependencies where needed or use sequential execution
Issue: No progress visibility¶
Solution: Enable streaming and add progress handlers
Issue: Complex manual workflows¶
Solution: Replace with patterns and templates
Benefits of Migration¶
- Visualization: See workflows before execution
- Reliability: Checkpoints and retries
- Performance: Parallel execution
- Monitoring: Real-time progress
- Reusability: Patterns and templates
- Maintainability: Clear structure
Next Steps¶
- Start with simple workflow conversion
- Add visualization to existing workflows
- Enable checkpoints for long-running tasks
- Explore patterns for complex scenarios
- Use templates for common workflows
Getting Help¶
Enhanced Workflows provide significantly better control, visibility, and reliability. The migration is straightforward, and the benefits include visualization, checkpointing, progress tracking, and production-ready patterns.