Workflow Patterns API Reference¶
Overview¶
Workflow Patterns provide pre-built, reusable workflow structures for common scenarios. These patterns implement best practices and can be customized for specific use cases.
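Every pattern returns a standard Workflow, so it runs like any hand-built workflow. A minimal end-to-end sketch; the WorkflowAgent import path here is an assumption, see the WorkflowAgent reference linked below:
from agenticraft.agents import WorkflowAgent  # assumed import path
from agenticraft.workflows.patterns import WorkflowPatterns

workflow = WorkflowPatterns.parallel_tasks(
    name="demo",
    tasks=[
        {"name": "task_a", "description": "First task"},
        {"name": "task_b", "description": "Second task"}
    ]
)

agent = WorkflowAgent()
result = await agent.run_workflow("Run demo tasks", workflow)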
Class Reference¶
WorkflowPatterns¶
class WorkflowPatterns:
    """
    Collection of common workflow patterns.

    Static methods that generate workflow structures for
    parallel execution, conditional logic, loops, and more.
    """
Pattern Methods¶
parallel_tasks()¶
@staticmethod
def parallel_tasks(
    name: str,
    tasks: List[Union[Step, Dict[str, Any]]],
    max_concurrent: Optional[int] = None,
    timeout: Optional[float] = None,
    error_handling: str = "fail_fast"
) -> Workflow
Create a workflow that executes multiple tasks in parallel.
Parameters:
- name: Workflow name
- tasks: List of tasks to execute in parallel
- max_concurrent: Maximum concurrent executions (None = unlimited)
- timeout: Overall timeout in seconds
- error_handling: How to handle errors ("fail_fast", "continue", "collect")
Returns:
- Workflow: Configured parallel workflow
Example:
from agenticraft.workflows.patterns import WorkflowPatterns

# Create parallel data fetching workflow
parallel_fetch = WorkflowPatterns.parallel_tasks(
    name="fetch_all_data",
    tasks=[
        {"name": "fetch_users", "description": "Get user data from API"},
        {"name": "fetch_orders", "description": "Get order data from API"},
        {"name": "fetch_products", "description": "Get product catalog"}
    ],
    max_concurrent=3,
    error_handling="continue"
)

# With Step objects
parallel_process = WorkflowPatterns.parallel_tasks(
    name="process_files",
    tasks=[
        Step("process_csv", "Process CSV files", tool=csv_processor),
        Step("process_json", "Process JSON files", tool=json_processor),
        Step("process_xml", "Process XML files", tool=xml_processor)
    ],
    timeout=300
)
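The max_concurrent bound behaves like a semaphore over the task set. A conceptual sketch of that scheduling, not the library's internals:
import asyncio

async def run_bounded(coros, max_concurrent=3):
    # The semaphore caps how many coroutines run at any moment
    sem = asyncio.Semaphore(max_concurrent)

    async def guarded(coro):
        async with sem:
            return await coro

    # error_handling="continue" corresponds roughly to return_exceptions=True
    return await asyncio.gather(*(guarded(c) for c in coros),
                                return_exceptions=True)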
conditional_branch()¶
@staticmethod
def conditional_branch(
    name: str,
    condition: Union[str, Callable],
    if_branch: List[Step],
    else_branch: Optional[List[Step]] = None,
    condition_step: Optional[Step] = None
) -> Workflow
Create a workflow with conditional branching logic.
Parameters:
- name: Workflow name
- condition: Condition expression or callable
- if_branch: Steps to execute if the condition is true
- else_branch: Steps to execute if the condition is false
- condition_step: Optional step that evaluates the condition
Returns:
- Workflow: Configured conditional workflow
Example:
# Simple conditional
conditional_flow = WorkflowPatterns.conditional_branch(
    name="quality_check",
    condition="score > 0.8",
    if_branch=[
        Step("approve", "Approve the submission"),
        Step("publish", "Publish to production")
    ],
    else_branch=[
        Step("review", "Send for manual review"),
        Step("notify", "Notify reviewers")
    ]
)

# With condition evaluation step
validation_flow = WorkflowPatterns.conditional_branch(
    name="validate_data",
    condition_step=Step("validate", "Validate data quality"),
    condition=lambda result: result.get("is_valid", False),
    if_branch=[Step("process", "Process valid data")],
    else_branch=[Step("cleanup", "Clean invalid data")]
)
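String conditions such as "score > 0.8" are evaluated against values produced by earlier steps. A simplified illustration of the idea; the library's actual evaluator and context keys may differ:
# Hypothetical result context from earlier steps
context = {"score": 0.92}

condition = "score > 0.8"
# Evaluate against a restricted namespace (illustrative only)
take_if_branch = eval(condition, {"__builtins__": {}}, context)
print(take_if_branch)  # True -> the if_branch steps would run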
retry_loop()¶
@staticmethod
def retry_loop(
    name: str,
    task: Union[Step, List[Step]],
    max_retries: int = 3,
    backoff_factor: float = 2.0,
    retry_conditions: Optional[List[str]] = None,
    fallback: Optional[Step] = None
) -> Workflow
Create a workflow with retry logic and exponential backoff.
Parameters:
- name: Workflow name
- task: Task(s) to retry
- max_retries: Maximum retry attempts
- backoff_factor: Exponential backoff multiplier
- retry_conditions: Conditions that trigger a retry
- fallback: Fallback step if all retries fail
Returns:
- Workflow: Configured retry workflow
Example:
# Simple retry
retry_api = WorkflowPatterns.retry_loop(
    name="api_call_with_retry",
    task=Step("call_api", "Call external API", tool=api_tool),
    max_retries=5,
    backoff_factor=2.0,
    retry_conditions=["timeout", "rate_limit", "500_error"]
)

# With fallback
resilient_fetch = WorkflowPatterns.retry_loop(
    name="fetch_with_fallback",
    task=[
        Step("fetch_primary", "Fetch from primary source"),
        Step("parse", "Parse response")
    ],
    max_retries=3,
    fallback=Step("fetch_cache", "Get from cache")
)
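For intuition about backoff_factor: the wait before each retry grows geometrically. A quick sketch assuming a 1-second base delay, which is an assumption rather than a documented default:
base_delay = 1.0      # assumed; not part of the documented signature
backoff_factor = 2.0
max_retries = 5

for attempt in range(1, max_retries + 1):
    delay = base_delay * backoff_factor ** (attempt - 1)
    print(f"retry {attempt}: wait {delay:.0f}s")
# retry 1: wait 1s, retry 2: wait 2s, ... retry 5: wait 16s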
map_reduce()¶
@staticmethod
def map_reduce(
    name: str,
    map_tasks: List[Step],
    reduce_task: Step,
    batch_size: Optional[int] = None,
    map_timeout: Optional[float] = None
) -> Workflow
Create a map-reduce pattern for data processing.
Parameters:
- name: Workflow name
- map_tasks: Tasks to execute in parallel (map phase)
- reduce_task: Task that aggregates results (reduce phase)
- batch_size: Process map tasks in batches of this size
- map_timeout: Timeout in seconds for each map task
Returns:
- Workflow: Configured map-reduce workflow
Example:
# Data aggregation workflow
analytics = WorkflowPatterns.map_reduce(
    name="sales_analytics",
    map_tasks=[
        Step("analyze_north", "Analyze North region", tool=analyzer),
        Step("analyze_south", "Analyze South region", tool=analyzer),
        Step("analyze_east", "Analyze East region", tool=analyzer),
        Step("analyze_west", "Analyze West region", tool=analyzer)
    ],
    reduce_task=Step("aggregate", "Combine regional data", tool=aggregator),
    batch_size=2,
    map_timeout=300
)

# Document processing
doc_processor = WorkflowPatterns.map_reduce(
    name="process_documents",
    map_tasks=[Step(f"process_doc_{i}", f"Process document {i}")
               for i in range(10)],
    reduce_task=Step("merge_results", "Merge all processed documents")
)
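With batch_size=2, the four regional analyses above run in two waves of two before the reduce step. A conceptual scheduling sketch, not the library's internals:
import asyncio

async def map_in_batches(map_coros, batch_size):
    results = []
    # Run each batch to completion before starting the next
    for i in range(0, len(map_coros), batch_size):
        batch = map_coros[i:i + batch_size]
        results.extend(await asyncio.gather(*batch))
    return results  # handed to the reduce task afterwards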
sequential_pipeline()¶
@staticmethod
def sequential_pipeline(
    name: str,
    stages: List[Union[Step, List[Step]]],
    error_handling: str = "stop_on_error",
    checkpoints: bool = False
) -> Workflow
Create a sequential pipeline with optional stage grouping.
Parameters:
- name: Workflow name
- stages: List of stages (a single step or a group of steps)
- error_handling: Error strategy ("stop_on_error", "skip_failed", "compensate")
- checkpoints: Enable checkpointing between stages
Returns:
- Workflow: Configured pipeline workflow
Example:
# ETL pipeline
etl = WorkflowPatterns.sequential_pipeline(
    name="customer_etl",
    stages=[
        # Extract stage
        [
            Step("extract_db", "Extract from database"),
            Step("extract_api", "Extract from API")
        ],
        # Transform stage
        Step("transform", "Transform and clean data"),
        # Load stage
        [
            Step("load_warehouse", "Load to data warehouse"),
            Step("load_cache", "Update cache")
        ]
    ],
    checkpoints=True
)

# Simple pipeline
process_pipeline = WorkflowPatterns.sequential_pipeline(
    name="document_pipeline",
    stages=[
        Step("parse", "Parse document"),
        Step("analyze", "Analyze content"),
        Step("summarize", "Generate summary"),
        Step("store", "Store results")
    ],
    error_handling="compensate"
)
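Based on the ETL example, a stage given as a list is a group whose steps complete together before the next stage starts. A sketch of the assumed execution order:
import asyncio

async def run_step(step):
    ...  # hypothetical executor for a single Step

async def run_pipeline(stages):
    for stage in stages:                # stages stay strictly ordered
        if isinstance(stage, list):
            # Grouped steps run together and must all finish first
            await asyncio.gather(*(run_step(s) for s in stage))
        else:
            await run_step(stage)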
fan_out_fan_in()¶
@staticmethod
def fan_out_fan_in(
    name: str,
    splitter: Step,
    processors: List[Step],
    combiner: Step,
    dynamic_processors: bool = False
) -> Workflow
Create a fan-out/fan-in pattern for dynamic parallelism.
Parameters:
- name: Workflow name
- splitter: Step that splits work into chunks
- processors: Steps that process chunks in parallel
- combiner: Step that combines results
- dynamic_processors: Allow a dynamic number of processors
Returns:
- Workflow: Configured fan-out/fan-in workflow
Example:
# Dynamic parallel processing
batch_processor = WorkflowPatterns.fan_out_fan_in(
    name="batch_processing",
    splitter=Step("split", "Split into batches", tool=splitter_tool),
    processors=[
        Step("process_batch", "Process a batch", tool=processor_tool)
    ],
    combiner=Step("combine", "Combine results", tool=combiner_tool),
    dynamic_processors=True
)
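With dynamic_processors=True, the single processor step serves as a template that is replicated once per chunk the splitter emits. A sketch of the assumed data flow:
import asyncio

async def fan_out_fan_in(split, process, combine, data):
    chunks = await split(data)                 # fan out: one chunk per processor
    processed = await asyncio.gather(
        *(process(chunk) for chunk in chunks)  # template replicated per chunk
    )
    return await combine(processed)            # fan in: one combined result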
iterative_refinement()¶
@staticmethod
def iterative_refinement(
    name: str,
    initial_step: Step,
    refinement_step: Step,
    evaluation_step: Step,
    max_iterations: int = 5,
    target_condition: Optional[str] = None
) -> Workflow
Create an iterative refinement pattern.
Parameters:
- name: Workflow name
- initial_step: Initial processing step
- refinement_step: Step that refines the result
- evaluation_step: Step that evaluates quality
- max_iterations: Maximum refinement iterations
- target_condition: Condition that, once met, ends the loop
Returns:
- Workflow: Configured iterative workflow
Example:
# Content refinement
content_refiner = WorkflowPatterns.iterative_refinement(
    name="refine_content",
    initial_step=Step("draft", "Create initial draft"),
    refinement_step=Step("improve", "Improve content"),
    evaluation_step=Step("evaluate", "Evaluate quality"),
    max_iterations=3,
    target_condition="quality_score > 0.9"
)
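The pattern loops evaluate-then-refine until the target condition holds or max_iterations is reached. In plain Python the control flow looks roughly like this; a sketch in which initial, refine, and evaluate stand in for the configured steps:
def iterative_refinement(initial, refine, evaluate, max_iterations=5,
                         target=lambda score: score > 0.9):
    result = initial()
    for _ in range(max_iterations):
        score = evaluate(result)
        if target(score):
            break                  # target_condition met; stop early
        result = refine(result)    # otherwise refine and re-evaluate
    return result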
Pattern Combinations¶
Nested Patterns¶
# Combine multiple patterns
complex_workflow = WorkflowPatterns.sequential_pipeline(
    name="complex_data_pipeline",
    stages=[
        # Parallel data fetching
        WorkflowPatterns.parallel_tasks(
            name="fetch_stage",
            tasks=[
                Step("fetch_a", "Fetch dataset A"),
                Step("fetch_b", "Fetch dataset B")
            ]
        ),
        # Conditional processing
        WorkflowPatterns.conditional_branch(
            name="process_stage",
            condition="len(data) > 1000",
            if_branch=[
                WorkflowPatterns.map_reduce(
                    name="large_data_process",
                    map_tasks=[...],
                    reduce_task=Step("aggregate", "Aggregate results")
                )
            ],
            else_branch=[
                Step("simple_process", "Process small dataset")
            ]
        ),
        # Retry for reliability
        WorkflowPatterns.retry_loop(
            name="save_stage",
            task=Step("save", "Save results"),
            max_retries=3
        )
    ]
)
Pattern Factory¶
class WorkflowFactory:
    """Factory for creating customized workflow patterns."""

    @staticmethod
    def create_data_pipeline(
        source_type: str,
        processing_type: str,
        destination_type: str,
        **options
    ) -> Workflow:
        """Create a data pipeline based on types."""
        # Select appropriate patterns
        if source_type == "multiple":
            extract = WorkflowPatterns.parallel_tasks(...)
        else:
            extract = Step("extract", f"Extract from {source_type}")

        if processing_type == "batch":
            process = WorkflowPatterns.map_reduce(...)
        elif processing_type == "stream":
            process = WorkflowPatterns.sequential_pipeline(...)
        else:
            process = Step("process", "Process data")

        # Build the load step for the destination
        load = Step("load", f"Load to {destination_type}")

        # Combine into pipeline
        return WorkflowPatterns.sequential_pipeline(
            name="data_pipeline",
            stages=[extract, process, load],
            **options
        )
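A hypothetical invocation; the checkpoints flag is forwarded to sequential_pipeline through **options:
pipeline = WorkflowFactory.create_data_pipeline(
    source_type="multiple",
    processing_type="batch",
    destination_type="warehouse",
    checkpoints=True
)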
Configuration Options¶
Error Handling Strategies¶
# Fail fast - stop on first error
fail_fast = WorkflowPatterns.parallel_tasks(
    name="critical_tasks",
    tasks=[...],
    error_handling="fail_fast"
)

# Continue on error - complete all possible tasks
continue_on_error = WorkflowPatterns.parallel_tasks(
    name="best_effort_tasks",
    tasks=[...],
    error_handling="continue"
)

# Collect errors - gather all errors for analysis
collect_errors = WorkflowPatterns.parallel_tasks(
    name="validation_tasks",
    tasks=[...],
    error_handling="collect"
)
Timeout Configuration¶
# Global timeout (parallel_tasks accepts an overall timeout)
timed_workflow = WorkflowPatterns.parallel_tasks(
    name="timed_tasks",
    tasks=[...],
    timeout=3600  # 1 hour total
)

# Per-step timeout
steps_with_timeout = [
    Step("quick_task", "Fast operation", timeout=10),
    Step("slow_task", "Slow operation", timeout=300),
    Step("critical_task", "Important task", timeout=None)  # No timeout
]
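Per-step timeouts are conceptually equivalent to wrapping each step's execution in asyncio.wait_for; a sketch of assumed semantics, not the library's internals:
import asyncio

async def run_with_timeout(step_coro, timeout):
    if timeout is None:
        return await step_coro  # no limit
    # Raises asyncio.TimeoutError if the step exceeds its budget
    return await asyncio.wait_for(step_coro, timeout=timeout)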
Checkpoint Options¶
# Enable checkpointing between stages
checkpointed = WorkflowPatterns.sequential_pipeline(
    name="long_running",
    stages=[...],
    checkpoints=True
)
Performance Considerations¶
Pattern Performance Characteristics¶
| Pattern | Overhead | Best For | Avoid When |
|---|---|---|---|
| Parallel Tasks | Low | I/O-bound tasks | Sequential dependencies |
| Conditional Branch | Minimal | Decision trees | Complex conditions |
| Retry Loop | Variable | Unreliable operations | Non-idempotent tasks |
| Map-Reduce | Medium | Data processing | Small datasets |
| Sequential Pipeline | Low | Step-by-step processes | Parallel opportunities |
Optimization Tips¶
import os

# Optimize parallel execution
optimized_parallel = WorkflowPatterns.parallel_tasks(
    name="optimized",
    tasks=tasks,
    max_concurrent=os.cpu_count(),  # match CPU cores for CPU-bound work
    error_handling="continue"       # don't block on a single failure
)

# Optimize map-reduce
optimized_mapreduce = WorkflowPatterns.map_reduce(
    name="optimized_mr",
    map_tasks=map_tasks,
    reduce_task=reduce_task,
    batch_size=100  # process in batches to reduce scheduling overhead
)
Error Handling¶
Pattern-Specific Error Handling¶
try:
    workflow = WorkflowPatterns.parallel_tasks(
        name="tasks",
        tasks=invalid_tasks
    )
except PatternValidationError as e:
    if e.pattern == "parallel_tasks":
        print(f"Invalid parallel configuration: {e.message}")
        # Handle specific pattern errors

# Runtime error handling
try:
    result = await agent.run_workflow("Execute", workflow)
except WorkflowExecutionError as e:
    if e.pattern == "retry_loop":
        print(f"All retries exhausted: {e.last_error}")
    elif e.pattern == "conditional_branch":
        print(f"Condition evaluation failed: {e.condition}")
See Also¶
- Workflow Visualization - Visualizing patterns
- Workflow Templates - Complete workflow templates
- WorkflowAgent - Pattern execution
- Examples - Pattern examples