Workflows Quick Reference
Basic Workflow Creation
from agenticraft.agents.workflow import WorkflowAgent
from agenticraft.core.workflow import Workflow, Step

# Simple workflow
workflow = Workflow(
    name="data_pipeline",
    steps=[
        Step("fetch", "Fetch data"),
        Step("process", "Process data"),
        Step("save", "Save results")
    ]
)
# With dependencies
workflow = Workflow(
    name="complex_pipeline",
    steps=[
        Step("fetch_a", "Fetch dataset A"),
        Step("fetch_b", "Fetch dataset B"),
        Step("merge", "Merge datasets", depends_on=["fetch_a", "fetch_b"]),
        Step("analyze", "Analyze merged data", depends_on=["merge"])
    ]
)
Visualization
from agenticraft.workflows import visualize_workflow

# Quick visualization (renamed to avoid shadowing the ascii/json builtins)
mermaid = visualize_workflow(workflow)  # Default: mermaid
ascii_viz = visualize_workflow(workflow, format="ascii")
json_viz = visualize_workflow(workflow, format="json")
html_viz = visualize_workflow(workflow, format="html")

# With progress (execution_result comes from a prior run_workflow call)
viz = visualize_workflow(
    workflow,
    show_progress=True,
    progress_data=execution_result.progress
)
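The html output is a plain string, so persisting it for a browser takes only standard file I/O (nothing AgentiCraft-specific; html_viz comes from the snippet above):

# Save the HTML visualization so it can be opened or shared
from pathlib import Path

Path("workflow.html").write_text(html_viz)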
Workflow Patterns
Parallel Tasks
from agenticraft.workflows.patterns import WorkflowPatterns

parallel = WorkflowPatterns.parallel_tasks(
    name="parallel_work",
    tasks=[task1, task2, task3],
    max_concurrent=3
)
Conditional Branch
conditional = WorkflowPatterns.conditional_branch(
    name="decision",
    condition="score > 0.8",
    if_branch=[approve_step],
    else_branch=[review_step]
)
Retry Loop
retry = WorkflowPatterns.retry_loop(
    name="resilient_task",
    task=risky_step,
    max_retries=3,
    backoff_factor=2.0
)
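With backoff_factor=2.0, each retry should wait roughly twice as long as the one before. A quick illustration of the resulting schedule, assuming a 1-second base delay (the base is an assumption; it is not part of the signature above):

# Illustrative only: the 1-second base delay is an assumption, not part of the API
base_delay = 1.0
delays = [base_delay * 2.0 ** attempt for attempt in range(3)]
print(delays)  # [1.0, 2.0, 4.0]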
Map-Reduce
mapreduce = WorkflowPatterns.map_reduce(
    name="data_aggregation",
    map_tasks=[process1, process2, process3],
    reduce_task=aggregate_step
)
Workflow Templates
Research
from agenticraft.workflows.templates import WorkflowTemplates

research = WorkflowTemplates.research_workflow(
    topic="AI Safety",
    sources=["academic", "news"],
    depth="comprehensive"
)
Content Pipeline
content = WorkflowTemplates.content_pipeline(
    content_type="blog_post",
    target_audience="developers",
    seo_optimized=True
)
Data Processing
etl = WorkflowTemplates.data_processing(
    input_format="csv",
    output_format="parquet",
    transformations=["clean", "validate", "aggregate"]
)
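Templates presumably return ready-to-run Workflow objects; assuming so, they plug straight into the WorkflowAgent covered in the next section:

# Sketch: assumes templates return Workflow instances accepted by run_workflow
agent = WorkflowAgent(name="Researcher")
result = await agent.run_workflow(
    task="Research AI Safety",
    workflow=research
)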
Enhanced WorkflowAgent
Basic Usage
agent = WorkflowAgent(
    name="Processor",
    enable_checkpoints=True,
    enable_visualization=True
)

# Execute workflow
result = await agent.run_workflow(
    task="Process Q4 data",
    workflow=workflow
)
With Checkpoints
# Enable checkpointing
agent = WorkflowAgent(enable_checkpoints=True)

# Execute with checkpoint
result = await agent.run_workflow(
    "Long task",
    workflow,
    checkpoint_id="task_001",
    resume_from_checkpoint=True
)
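One way to use this, relying only on the arguments shown above, is to reuse the same checkpoint_id across process restarts so a re-run picks up where the last one stopped (a sketch, assuming resume_from_checkpoint skips already-completed steps):

# First run: checkpoints accumulate under "task_001" as steps complete
try:
    result = await agent.run_workflow("Long task", workflow, checkpoint_id="task_001")
except Exception:
    # After a crash or restart: resume from the last checkpoint instead of starting over
    result = await agent.run_workflow(
        "Long task",
        workflow,
        checkpoint_id="task_001",
        resume_from_checkpoint=True
    )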
Streaming Progress
# Stream execution progress
async for progress in agent.stream_workflow("Task", workflow):
    print(f"{progress.current_step}: {progress.percentage:.0f}%")
    if progress.status == "failed":
        print(f"Error: {progress.message}")
Visual Planning
# AI-powered planning
planned = await agent.plan_workflow(
    task="Create marketing campaign",
    requirements={"channels": ["email", "social"]},
    output_format="workflow"  # or "mermaid"
)
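With output_format="workflow" the result is presumably a Workflow object, so it can be executed directly; a sketch under that assumption:

# Assumes `planned` is a Workflow when output_format="workflow"
result = await agent.run_workflow(
    task="Create marketing campaign",
    workflow=planned
)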
Common Configurations
High Reliability
reliable_agent = WorkflowAgent(
    retry_failed_steps=True,
    max_retries=3,
    retry_strategy="exponential_backoff",
    enable_checkpoints=True,
    checkpoint_interval=300  # Every 5 minutes
)
High Performance
fast_agent = WorkflowAgent(
    max_parallel_steps=20,
    enable_caching=True,
    cache_ttl=3600,
    batch_size=1000
)
Development Mode
dev_agent = WorkflowAgent(
    enable_visualization=True,
    enable_streaming=True,
    verbose_logging=True,
    step_timeout=60  # Quick timeout for testing
)
Step Configuration
# Full step configuration
step = Step(
    name="process_data",
    description="Process the dataset",
    tool=processor_tool,           # Optional tool
    depends_on=["fetch_data"],     # Dependencies
    retry_count=3,                 # Retries on failure
    timeout=300,                   # 5-minute timeout
    condition="len(data) > 0",     # Conditional execution
    parallel=True,                 # Allow parallel execution
    checkpoint=True,               # Checkpoint after this step
    on_error="continue",           # Error handling policy
    metadata={"priority": "high"}  # Custom metadata
)
Error Handling
# Workflow-level error handling
# (import WorkflowExecutionError from your agenticraft version's exceptions module)
try:
    result = await agent.run_workflow(task, workflow)
except WorkflowExecutionError as e:
    print(f"Failed at: {e.failed_step}")
    print(f"Completed: {list(e.partial_results.keys())}")

    # Get partial results
    for step, output in e.partial_results.items():
        if output.success:
            print(f"✓ {step}: {output.data}")

# Step-level error handling
workflow = Workflow(
    name="resilient",
    steps=[
        Step("risky", "Risky operation",
             on_error="retry",
             fallback="safe_operation"),
        Step("safe_operation", "Fallback",
             skip_by_default=True)
    ]
)
Workflow Modification
# Dynamic modification
modified = agent.modify_workflow(
    workflow,
    modifications={
        "add_steps": [
            Step("validate", "Validate results")
        ],
        "remove_steps": ["old_step"],
        "modify_steps": {
            "process": {"timeout": 600}
        },
        "reorder_steps": ["fetch", "validate", "process"]
    }
)
Performance Tips
- Use Parallel Patterns for independent tasks
- Enable Caching for repeated workflows
- Set Resource Limits to prevent overload
- Use Checkpoints for long workflows
- Batch Operations when possible

# Optimized configuration
import os

agent = WorkflowAgent(
    max_parallel_steps=os.cpu_count(),
    enable_caching=True,
    resource_limits={
        "max_memory": "4GB",
        "max_concurrent_api_calls": 10
    },
    batch_size=100
)
Visualization Options
Supported output formats are mermaid (the default), ascii, json, and html; see the format argument of visualize_workflow in the Visualization section above.
Quick Patterns
ETL Pipeline
etl = WorkflowPatterns.sequential_pipeline(
    name="etl",
    stages=[
        # A stage given as a list groups steps that can run concurrently
        [Step("extract_db", "From DB"), Step("extract_api", "From API")],
        Step("transform", "Transform data"),
        Step("load", "Load to warehouse")
    ]
)
Approval Flow
approval = WorkflowPatterns.conditional_branch(
    name="approval",
    condition_step=Step("evaluate", "Evaluate request"),
    condition="risk_level == 'low'",
    if_branch=[Step("auto_approve", "Approve")],
    else_branch=[Step("manual_review", "Review")]
)
Batch Processing
batch = WorkflowPatterns.map_reduce(
    name="batch_process",
    map_tasks=[Step(f"process_{i}", f"Process batch {i}") for i in range(10)],
    reduce_task=Step("combine", "Combine results")
)
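Assuming pattern builders return ordinary Workflow objects, the batch job executes through the same agent API as any other workflow:

# Sketch: run the fan-out/fan-in batch job with the agent API shown earlier
agent = WorkflowAgent(name="BatchRunner", max_parallel_steps=10)
result = await agent.run_workflow(task="Process all batches", workflow=batch)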
Need more details? See the full documentation.