# Workflow Templates API Reference

## Overview
Workflow Templates provide production-ready, customizable workflows for common business scenarios. Each template implements best practices and can be configured for specific needs.
## Class Reference

### WorkflowTemplates
```python
class WorkflowTemplates:
    """
    Collection of production-ready workflow templates.

    Static methods that generate complete workflows for
    research, content creation, data processing, and more.
    """
```
## Template Methods

### research_workflow()
```python
@staticmethod
def research_workflow(
    topic: str,
    sources: List[str] = ["web", "academic", "news"],
    depth: str = "comprehensive",
    output_format: str = "report",
    tools: Optional[Dict[str, BaseTool]] = None,
    max_sources: int = 10,
    quality_threshold: float = 0.7
) -> Workflow
```
Create a comprehensive research workflow.
Parameters:

- `topic`: Research topic
- `sources`: Information sources to use
- `depth`: Research depth ("quick", "standard", "comprehensive")
- `output_format`: Output format ("summary", "report", "presentation")
- `tools`: Custom tools for research
- `max_sources`: Maximum number of sources to analyze
- `quality_threshold`: Minimum quality score for sources

Returns:

- `Workflow`: Configured research workflow
Example:
```python
from agenticraft.workflows.templates import WorkflowTemplates

# Comprehensive research workflow
research = WorkflowTemplates.research_workflow(
    topic="AI Safety and Alignment",
    sources=["academic", "news", "blogs", "forums"],
    depth="comprehensive",
    output_format="report",
    max_sources=20,
    quality_threshold=0.8
)

# Quick research
quick_research = WorkflowTemplates.research_workflow(
    topic="Latest AI developments",
    sources=["news", "blogs"],
    depth="quick",
    output_format="summary"
)
```
Generated Workflow Structure:

1. Define Research Scope
2. Parallel Source Gathering
    - Academic papers
    - News articles
    - Blog posts
    - Forum discussions
3. Quality Filtering
4. Information Extraction
5. Fact Verification
6. Synthesis and Analysis
7. Report Generation
8. Citation Compilation
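The returned workflow executes like any other; a minimal sketch, assuming a `WorkflowAgent` (see the See Also section) whose constructor takes a `name` and whose `run_workflow` call mirrors the usage shown under Monitoring and Metrics below:

```python
# Minimal sketch: executing the research workflow created above.
# The WorkflowAgent constructor arguments here are an assumption.
agent = WorkflowAgent(name="researcher")
result = await agent.run_workflow(
    "Run the research template",
    research
)
print(result)
```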
### content_pipeline()
```python
@staticmethod
def content_pipeline(
    content_type: str,
    target_audience: str = "general",
    tone: str = "professional",
    length: str = "medium",
    seo_optimized: bool = False,
    stages: Optional[List[str]] = None,
    review_loops: int = 1
) -> Workflow
```
Create a content creation and publishing workflow.
Parameters:

- `content_type`: Type of content ("blog_post", "article", "social_media", "video_script")
- `target_audience`: Target audience description
- `tone`: Writing tone (e.g. "professional", "casual", "academic", "creative")
- `length`: Content length ("short", "medium", "long")
- `seo_optimized`: Include SEO optimization
- `stages`: Custom pipeline stages
- `review_loops`: Number of review iterations

Returns:

- `Workflow`: Configured content pipeline
Example:
```python
# Blog post pipeline
blog_pipeline = WorkflowTemplates.content_pipeline(
    content_type="blog_post",
    target_audience="developers",
    tone="technical",
    length="long",
    seo_optimized=True,
    review_loops=2
)

# Social media content
social_pipeline = WorkflowTemplates.content_pipeline(
    content_type="social_media",
    target_audience="general",
    tone="casual",
    length="short",
    stages=["ideation", "creation", "optimization", "scheduling"]
)
```
Default Stages:
1. Topic Research
2. Outline Creation
3. Content Drafting
4. Review and Edit
5. SEO Optimization (if enabled)
6. Final Review
7. Publishing Preparation
8. Distribution
### data_processing()
```python
@staticmethod
def data_processing(
    input_format: str,
    output_format: str,
    transformations: List[str],
    validation_rules: Optional[Dict[str, Any]] = None,
    error_handling: str = "log_and_continue",
    batch_size: Optional[int] = None,
    parallel_processing: bool = True
) -> Workflow
```
Create a data processing pipeline.
Parameters:

- `input_format`: Input data format (e.g. "csv", "json", "xml", "database")
- `output_format`: Output data format
- `transformations`: List of transformations to apply
- `validation_rules`: Data validation rules
- `error_handling`: Error handling strategy (e.g. "log_and_continue", "dead_letter_queue")
- `batch_size`: Number of records per batch; when set, data is processed in batches
- `parallel_processing`: Enable parallel processing

Returns:

- `Workflow`: Configured data processing workflow
Example:
```python
# ETL pipeline
etl = WorkflowTemplates.data_processing(
    input_format="csv",
    output_format="parquet",
    transformations=[
        "remove_duplicates",
        "clean_missing",
        "normalize_dates",
        "calculate_metrics",
        "aggregate_by_region"
    ],
    validation_rules={
        "required_columns": ["id", "date", "amount"],
        "date_format": "YYYY-MM-DD",
        "amount_range": (0, 1000000)
    },
    batch_size=10000,
    parallel_processing=True
)

# Real-time processing
streaming = WorkflowTemplates.data_processing(
    input_format="json_stream",
    output_format="database",
    transformations=["validate", "enrich", "store"],
    error_handling="dead_letter_queue"
)
```
Pipeline Structure:

1. Data Ingestion
2. Format Validation
3. Parallel Transformation
    - Clean data
    - Apply business rules
    - Calculate derived fields
4. Quality Checks
5. Output Generation
6. Delivery/Storage
### multi_agent_collaboration()
```python
@staticmethod
def multi_agent_collaboration(
    task: str,
    agents: List[Dict[str, Any]],
    coordination_style: str = "orchestrated",
    communication_pattern: str = "hub_spoke",
    decision_making: str = "consensus",
    timeout: Optional[float] = None
) -> Workflow
```
Create a multi-agent collaboration workflow.
Parameters:

- `task`: Collaborative task description
- `agents`: List of agent configurations
- `coordination_style`: How agents coordinate ("orchestrated", "choreographed", "hybrid")
- `communication_pattern`: Communication pattern ("hub_spoke", "mesh", "chain")
- `decision_making`: Decision strategy ("consensus", "voting", "hierarchical")
- `timeout`: Overall timeout for the collaboration

Returns:

- `Workflow`: Configured multi-agent workflow
Example:
```python
# Research team collaboration
research_team = WorkflowTemplates.multi_agent_collaboration(
    task="Comprehensive market analysis",
    agents=[
        {"name": "data_analyst", "role": "Analyze quantitative data"},
        {"name": "market_researcher", "role": "Research competitors"},
        {"name": "strategist", "role": "Develop recommendations"},
        {"name": "coordinator", "role": "Synthesize findings"}
    ],
    coordination_style="orchestrated",
    communication_pattern="hub_spoke",
    decision_making="consensus"
)

# Creative team
creative_team = WorkflowTemplates.multi_agent_collaboration(
    task="Design new product campaign",
    agents=[
        {"name": "copywriter", "role": "Create messaging"},
        {"name": "designer", "role": "Design visuals"},
        {"name": "strategist", "role": "Define strategy"},
        {"name": "reviewer", "role": "Quality control"}
    ],
    communication_pattern="mesh",
    decision_making="voting"
)
```
### customer_service()
```python
@staticmethod
def customer_service(
    channels: List[str] = ["email", "chat"],
    escalation_levels: int = 3,
    knowledge_base: Optional[str] = None,
    sentiment_analysis: bool = True,
    auto_responses: bool = True,
    sla_config: Optional[Dict[str, int]] = None
) -> Workflow
```
Create a customer service workflow.
Parameters:

- `channels`: Support channels to handle
- `escalation_levels`: Number of escalation levels
- `knowledge_base`: Knowledge base identifier
- `sentiment_analysis`: Enable sentiment analysis
- `auto_responses`: Enable automatic responses
- `sla_config`: Service level agreement configuration (times in seconds)

Returns:

- `Workflow`: Configured customer service workflow
Example:
```python
# Omnichannel support
support = WorkflowTemplates.customer_service(
    channels=["email", "chat", "phone", "social"],
    escalation_levels=3,
    knowledge_base="support_kb_v2",
    sentiment_analysis=True,
    sla_config={
        "response_time": 3600,    # 1 hour
        "resolution_time": 86400  # 24 hours
    }
)
```
### code_review()
```python
@staticmethod
def code_review(
    review_types: List[str] = ["style", "security", "performance"],
    languages: List[str] = ["python", "javascript"],
    tools: Optional[Dict[str, Any]] = None,
    auto_fix: bool = False,
    threshold: float = 0.8
) -> Workflow
```
Create a code review workflow.
Parameters:

- `review_types`: Types of review to perform
- `languages`: Programming languages to support
- `tools`: Code analysis tools
- `auto_fix`: Enable automatic fixes
- `threshold`: Quality threshold

Returns:

- `Workflow`: Configured code review workflow
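Example (a minimal sketch using only the parameters documented above):

```python
# Security- and performance-focused review of Python code,
# with automatic fixes enabled
review = WorkflowTemplates.code_review(
    review_types=["security", "performance"],
    languages=["python"],
    auto_fix=True,
    threshold=0.9
)
```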
## Template Customization

### Extending Templates
```python
# Get base template
base_research = WorkflowTemplates.research_workflow(
    topic="AI Ethics",
    sources=["academic"]
)

# Customize by adding steps
custom_research = base_research.add_steps([
    Step("peer_review", "Get peer review of findings"),
    Step("publish", "Publish to repository")
])

# Modify existing steps
custom_research.modify_step(
    "synthesis",
    new_description="Synthesize with ethical framework"
)
```
### Template Composition
```python
# Combine multiple templates
class CompositeTemplates:
    @staticmethod
    def research_and_content(topic: str) -> Workflow:
        """Research topic then create content."""
        # Research phase
        research = WorkflowTemplates.research_workflow(
            topic=topic,
            output_format="summary"
        )

        # Content phase
        content = WorkflowTemplates.content_pipeline(
            content_type="blog_post",
            stages=["outline", "draft", "edit", "publish"]
        )

        # Combine workflows
        return Workflow.combine(
            name="research_to_content",
            workflows=[research, content],
            connection_type="sequential"
        )
```
### Dynamic Template Generation
```python
class DynamicTemplateGenerator:
    """Generate templates based on requirements."""

    @staticmethod
    def generate_from_requirements(
        requirements: Dict[str, Any]
    ) -> Workflow:
        """Generate workflow from requirements."""
        # Analyze requirements
        complexity = requirements.get("complexity", "medium")
        domain = requirements.get("domain", "general")
        constraints = requirements.get("constraints", {})

        # Select base template
        if domain == "research":
            base = WorkflowTemplates.research_workflow(...)
        elif domain == "content":
            base = WorkflowTemplates.content_pipeline(...)
        else:
            base = WorkflowTemplates.data_processing(...)

        # Apply constraints
        if constraints.get("time_limit"):
            base.set_timeout(constraints["time_limit"])
        if constraints.get("parallel_execution"):
            base.enable_parallelism()

        return base
```
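A hypothetical call to the generator sketched above; the requirement keys match the ones the method reads:

```python
# Hypothetical usage: select a research template and apply constraints
workflow = DynamicTemplateGenerator.generate_from_requirements({
    "domain": "research",
    "constraints": {"time_limit": 600, "parallel_execution": True}
})
```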
## Configuration Presets

### Industry-Specific Presets
```python
class IndustryPresets:
    """Pre-configured templates for specific industries."""

    @staticmethod
    def healthcare_data_processing() -> Workflow:
        """HIPAA-compliant data processing."""
        return WorkflowTemplates.data_processing(
            input_format="hl7",
            output_format="fhir",
            transformations=[
                "validate_phi",
                "deidentify",
                "standardize_codes",
                "quality_metrics"
            ],
            validation_rules={
                "hipaa_compliant": True,
                "encryption": "AES-256"
            }
        )

    @staticmethod
    def financial_reporting() -> Workflow:
        """Financial reporting workflow."""
        return WorkflowTemplates.data_processing(
            input_format="database",
            output_format="report",
            transformations=[
                "reconciliation",
                "regulatory_checks",
                "risk_calculations",
                "report_generation"
            ],
            validation_rules={
                "sox_compliant": True,
                "audit_trail": True
            }
        )
```
### Scale Presets
```python
# Small scale
small_scale = {
    "batch_size": 100,
    "parallel_processing": False,
    "timeout": 300,
    "resources": "minimal"
}

# Enterprise scale
enterprise_scale = {
    "batch_size": 10000,
    "parallel_processing": True,
    "timeout": 3600,
    "resources": "auto_scale",
    "checkpointing": True,
    "monitoring": True
}
```
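These presets can feed template parameters directly; a minimal sketch passing only the preset keys that overlap with the documented `data_processing()` signature (`timeout`, `resources`, `checkpointing`, and `monitoring` would need template support not documented here):

```python
# Apply the enterprise preset's documented keys to a data pipeline
pipeline = WorkflowTemplates.data_processing(
    input_format="csv",
    output_format="parquet",
    transformations=["remove_duplicates", "clean_missing"],
    batch_size=enterprise_scale["batch_size"],
    parallel_processing=enterprise_scale["parallel_processing"]
)
```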
## Performance Optimization

### Template Performance Profiles
| Template | Typical Duration | Resource Usage | Scalability |
|----------|------------------|----------------|-------------|
| Research Workflow | 5-30 min | Medium | Horizontal |
| Content Pipeline | 10-60 min | Low | Vertical |
| Data Processing | Variable | High | Both |
| Multi-Agent | 15-45 min | High | Horizontal |
### Optimization Strategies
```python
# Optimize research workflow
optimized_research = WorkflowTemplates.research_workflow(
    topic="Quick research",
    sources=["web"],               # Restrict source types
    depth="quick",                 # Reduce depth
    max_sources=5,                 # Cap the number of sources
    parallel_source_fetching=True  # Fetch sources in parallel
)

# Optimize data processing
optimized_data = WorkflowTemplates.data_processing(
    input_format="parquet",        # Efficient columnar format
    output_format="parquet",
    transformations=["essential_only"],
    batch_size=50000,              # Large batches
    parallel_processing=True,
    cache_intermediate=True        # Cache intermediate results
)
```
## Monitoring and Metrics

### Built-in Metrics
```python
# Execute with metrics collection
result = await agent.run_workflow(
    "Execute template",
    template_workflow,
    collect_metrics=True
)

# Access metrics
metrics = result.metrics
print(f"Total duration: {metrics.total_duration}")
print(f"Steps completed: {metrics.steps_completed}")
print(f"Resource usage: {metrics.resource_usage}")
print(f"Error rate: {metrics.error_rate}")
```
### Custom Metrics
```python
# Add custom metrics to templates.
# Counter and Histogram stand in for metric collector types
# from your metrics library of choice.
template = WorkflowTemplates.research_workflow(
    topic="AI Safety",
    custom_metrics={
        "sources_analyzed": Counter(),
        "facts_extracted": Counter(),
        "confidence_scores": Histogram()
    }
)
```
## Error Recovery

### Template-Specific Recovery
```python
# Research workflow with recovery
research = WorkflowTemplates.research_workflow(
    topic="Complex topic",
    error_recovery={
        "source_unavailable": "use_cache",
        "parsing_error": "try_alternative_parser",
        "synthesis_failure": "fallback_to_summary"
    }
)

# Data processing with recovery
data_pipeline = WorkflowTemplates.data_processing(
    input_format="csv",
    output_format="database",
    error_recovery={
        "validation_error": "quarantine_record",
        "transformation_error": "log_and_skip",
        "database_error": "retry_with_backoff"
    }
)
```
## See Also
- Workflow Patterns - Building blocks for templates
- Workflow Visualization - Visualizing templates
- WorkflowAgent - Template execution
- Examples - Template examples