Telemetry API Reference
Complete API documentation for AgentiCraft's telemetry system.
Core Classes
TelemetryConfig
Configuration class for telemetry initialization.
class TelemetryConfig:
"""Telemetry configuration and initialization.
Args:
enabled (bool): Enable/disable telemetry. Default: False
exporter_type (str): Type of exporter to use. Options: "console", "otlp", "prometheus"
service_name (str): Service name for identification. Default: "agenticraft"
otlp_endpoint (str): OTLP collector endpoint. Default: "localhost:4317"
otlp_headers (dict): Optional headers for OTLP exporter
prometheus_port (int): Port for Prometheus metrics endpoint. Default: 8000
auto_instrument (bool): Enable automatic instrumentation. Default: True
sample_rate (float): Sampling rate (0.0-1.0). Default: 1.0
batch_size (int): Batch size for span export. Default: 512
export_interval_ms (int): Export interval in milliseconds. Default: 5000
console_pretty_print (bool): Pretty print console output. Default: True
debug (bool): Enable debug logging. Default: False
"""
def initialize(self) -> None:
"""Initialize telemetry with configured settings."""
def shutdown(self) -> None:
"""Shutdown telemetry and flush remaining data."""
Example Usage
from agenticraft.telemetry import TelemetryConfig
# Development configuration
config = TelemetryConfig(
enabled=True,
exporter_type="console",
debug=True
)
config.initialize()
# Production configuration
config = TelemetryConfig(
enabled=True,
exporter_type="otlp",
otlp_endpoint="telemetry.company.com:4317",
otlp_headers={"Authorization": "Bearer token"},
sample_rate=0.1,
auto_instrument=True
)
config.initialize()
# Cleanup on shutdown
config.shutdown()
Tracing API
create_span
Create a new span for tracing operations.
def create_span(
name: str,
kind: SpanKind = SpanKind.INTERNAL,
attributes: Optional[Dict[str, Any]] = None,
links: Optional[List[Link]] = None
) -> Span:
"""Create a new span.
Args:
name: Span name (use dot notation: "component.operation")
kind: Span kind (INTERNAL, SERVER, CLIENT, PRODUCER, CONSUMER)
attributes: Initial span attributes
links: Links to other spans
Returns:
OpenTelemetry Span object
Example:
with create_span("database.query", attributes={"db.name": "users"}):
result = await db.query("SELECT * FROM users")
"""
get_current_span
Get the currently active span.
def get_current_span() -> Optional[Span]:
"""Get the current active span.
Returns:
Current span or None if no span is active
Example:
span = get_current_span()
if span:
span.add_event("Processing started")
"""
Span Methods
class Span:
"""OpenTelemetry span with extended functionality."""
def set_attribute(self, key: str, value: Any) -> None:
"""Set an attribute on the span."""
def set_attributes(self, attributes: Dict[str, Any]) -> None:
"""Set multiple attributes at once."""
def add_event(
self,
name: str,
attributes: Optional[Dict[str, Any]] = None,
timestamp: Optional[int] = None
) -> None:
"""Add an event to the span timeline."""
def set_status(
self,
status: Union[Status, StatusCode],
description: Optional[str] = None
) -> None:
"""Set the span status (OK, ERROR), with an optional description for ERROR."""
def record_exception(
self,
exception: Exception,
attributes: Optional[Dict[str, Any]] = None,
timestamp: Optional[int] = None,
escaped: bool = False
) -> None:
"""Record an exception with stacktrace."""
Example: Comprehensive Span Usage
from agenticraft.telemetry import create_span
from opentelemetry.trace import StatusCode
async def process_document(doc_id: str, content: str):
with create_span(
"document.process",
attributes={
"document.id": doc_id,
"document.size": len(content),
"processor.version": "2.0"
}
) as span:
try:
# Track progress
span.add_event("Validation started")
validate_document(content)
span.add_event("Validation completed")
# Process document
span.add_event("Processing started")
result = await heavy_processing(content)
# Add result attributes
span.set_attributes({
"result.score": result.score,
"result.category": result.category,
"processing.duration_ms": result.duration
})
span.add_event("Processing completed successfully")
span.set_status(StatusCode.OK)
return result
except ValidationError as e:
span.record_exception(e)
span.set_status(StatusCode.ERROR, "Validation failed")
raise
except Exception as e:
span.record_exception(e)
span.set_status(StatusCode.ERROR, "Processing failed")
raise
Metrics API
record_metric
Record a metric value.
def record_metric(
name: str,
value: Union[int, float],
metric_type: MetricType = MetricType.COUNTER,
attributes: Optional[Dict[str, Any]] = None,
unit: str = ""
) -> None:
"""Record a metric value.
Args:
name: Metric name (use dot notation)
value: Metric value
metric_type: Type of metric (COUNTER, GAUGE, HISTOGRAM)
attributes: Metric attributes/labels
unit: Unit of measurement
Example:
record_metric(
"documents.processed",
value=1,
metric_type=MetricType.COUNTER,
attributes={"type": "pdf", "size": "large"}
)
"""
MetricType Enum
class MetricType(Enum):
"""Types of metrics."""
COUNTER = "counter" # Monotonically increasing value
GAUGE = "gauge" # Point-in-time value
HISTOGRAM = "histogram" # Distribution of values
Creating Metric Instruments
def create_counter(
name: str,
description: str = "",
unit: str = ""
) -> Counter:
"""Create a counter metric.
Args:
name: Metric name
description: Human-readable description
unit: Unit of measurement
Returns:
Counter instrument
Example:
request_counter = create_counter(
"http.requests",
description="Total HTTP requests",
unit="1"
)
request_counter.add(1, {"method": "GET", "status": 200})
"""
def create_histogram(
name: str,
description: str = "",
unit: str = "",
boundaries: Optional[List[float]] = None
) -> Histogram:
"""Create a histogram metric.
Args:
name: Metric name
description: Human-readable description
unit: Unit of measurement
boundaries: Histogram bucket boundaries
Returns:
Histogram instrument
Example:
latency_histogram = create_histogram(
"http.request.duration",
description="HTTP request latency",
unit="ms",
boundaries=[0, 10, 25, 50, 100, 250, 500, 1000]
)
latency_histogram.record(42.5, {"endpoint": "/api/users"})
"""
def create_gauge(
name: str,
description: str = "",
unit: str = ""
) -> ObservableGauge:
"""Create a gauge metric.
Args:
name: Metric name
description: Human-readable description
unit: Unit of measurement
Returns:
ObservableGauge instrument
Example:
def get_queue_size():
return queue.size()
queue_gauge = create_gauge(
"queue.size",
description="Current queue size"
)
queue_gauge.add_callback(get_queue_size)
"""
Automatic Metrics
AgentiCraft automatically records these metrics:
# Token usage
"agenticraft.tokens.prompt" # Prompt tokens used
"agenticraft.tokens.completion" # Completion tokens used
"agenticraft.tokens.total" # Total tokens used
# Latency
"agenticraft.latency.agent" # Agent operation latency
"agenticraft.latency.tool" # Tool execution latency
"agenticraft.latency.provider" # LLM provider latency
"agenticraft.latency.memory" # Memory operation latency
# Errors
"agenticraft.errors.count" # Error count by operation
# Memory
"agenticraft.memory.hits" # Memory cache hits
"agenticraft.memory.misses" # Memory cache misses
"agenticraft.memory.operations" # Total memory operations
Decorators
@trace_method
Decorator for tracing class methods.
from agenticraft.telemetry.decorators import trace_method
class DocumentProcessor:
@trace_method("processor.analyze")
async def analyze(self, document: str) -> dict:
"""This method is automatically traced."""
return {"length": len(document)}
@trace_method(
"processor.validate",
attributes={"validator": "strict", "version": "2.0"}
)
def validate(self, document: str) -> bool:
"""Traced with custom attributes."""
return len(document) > 0
@trace_function
Decorator for tracing standalone functions.
from agenticraft.telemetry.decorators import trace_function
@trace_function("utils.calculate_score")
def calculate_score(data: dict) -> float:
"""This function is automatically traced."""
return sum(data.values()) / len(data)
@trace_function(
"utils.process_batch",
capture_args=True, # Include function arguments as span attributes
capture_result=True # Include return value as span attribute
)
async def process_batch(items: List[str]) -> int:
"""Traced with argument and result capture."""
processed = [item.upper() for item in items]
return len(processed)
@timed_metric
Decorator for recording execution time metrics.
from agenticraft.telemetry.decorators import timed_metric
@timed_metric("custom.processing.duration", unit="ms")
async def process_data(data: dict) -> dict:
"""Execution time is automatically recorded."""
await asyncio.sleep(0.1)
return {"processed": True}
@timed_metric(
"api.request.duration",
attributes_from_args=["endpoint", "method"]
)
def handle_request(endpoint: str, method: str, data: dict) -> dict:
"""Metric includes endpoint and method as attributes."""
return {"status": "ok"}
Context Propagation
set_span_in_context
Manually set a span in a context.
from agenticraft.telemetry import set_span_in_context
span = create_span("parent.operation")
context = set_span_in_context(span)
# Use context for child operations
async with context:
await child_operation() # Will be linked to parent
extract_context / inject_context
For distributed tracing across services.
from agenticraft.telemetry import extract_context, inject_context
# Service A - Inject context into headers
headers = {}
inject_context(headers)
response = await http_client.post(url, headers=headers)
# Service B - Extract context from headers
context = extract_context(request.headers)
with context:
# This span is linked to Service A's span
with create_span("service_b.handle_request"):
process_request()
Integration Helpers
instrument_agent
Automatically instrument an agent instance.
from agenticraft.telemetry import instrument_agent
from agenticraft import Agent
agent = Agent(name="MyAgent")
instrument_agent(agent) # Now all operations are traced
instrument_tool
Automatically instrument a tool instance.
from agenticraft.telemetry import instrument_tool
from agenticraft.tools import WebSearchTool
tool = WebSearchTool()
instrument_tool(tool) # Tool execution is now traced
instrument_provider
Automatically instrument an LLM provider.
from agenticraft.telemetry import instrument_provider
from agenticraft.providers import OpenAIProvider
provider = OpenAIProvider()
instrument_provider(provider) # All LLM calls are traced
Advanced Usage
Custom Span Processors
import time
from agenticraft.telemetry import add_span_processor
from opentelemetry.sdk.trace import SpanProcessor
from opentelemetry.trace import StatusCode
class CustomSpanProcessor(SpanProcessor):
def on_start(self, span, parent_context):
# Called when span starts
span.set_attribute("custom.timestamp", time.time())
def on_end(self, span):
# Called when span ends
if span.status.status_code == StatusCode.ERROR:
# Handle errors
alert_on_error(span)
add_span_processor(CustomSpanProcessor())
Custom Exporters
from agenticraft.telemetry import add_exporter
from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult
class CustomExporter(SpanExporter):
def export(self, spans):
# Send spans to custom backend
for span in spans:
send_to_backend(span)
return SpanExportResult.SUCCESS
add_exporter(CustomExporter())
Sampling Strategies
from agenticraft.telemetry import set_sampler
from opentelemetry.sdk.trace.sampling import (
ALWAYS_OFF,
ALWAYS_ON,
Decision,
ParentBased,
Sampler,
SamplingResult,
TraceIdRatioBased
)
# Sample 10% of traces
set_sampler(TraceIdRatioBased(0.1))
# Sample based on parent
set_sampler(ParentBased(root=TraceIdRatioBased(0.1)))
# Custom sampler
class CustomSampler(Sampler):
def should_sample(self, context, trace_id, name, kind, attributes, links):
# Sample high-priority operations
if attributes.get("priority") == "high":
return SamplingResult(Decision.RECORD_AND_SAMPLE)
return SamplingResult(Decision.DROP)
set_sampler(CustomSampler())
Error Handling
All telemetry operations are designed to fail gracefully:
# Telemetry errors won't crash your application
with create_span("operation") as span:
try:
span.set_attribute("key", value) # Safe even if telemetry fails
except Exception:
# Telemetry errors are logged but don't propagate
pass
To handle telemetry errors explicitly:
from agenticraft.telemetry import set_error_handler
def handle_telemetry_error(error: Exception):
logger.error(f"Telemetry error: {error}")
# Could send to monitoring system
set_error_handler(handle_telemetry_error)
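Performance Tips
- Use batch processing: tune batch_size and export_interval_ms in TelemetryConfig so spans are exported in batches rather than one at a time.
- Limit attribute size: keep span attribute values small; avoid attaching whole documents or payloads as attributes.
- Use sampling in production: set sample_rate below 1.0 (for example 0.1, as in the production configuration above) so only a fraction of traces is recorded.
- Avoid high-cardinality attributes: do not use unbounded values such as user IDs or timestamps as metric attributes.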
Thread Safety
All telemetry APIs are thread-safe and can be used in multi-threaded applications:
import threading
def worker(worker_id):
with create_span(f"worker.{worker_id}"):
# Each thread gets its own span context
process_task()
threads = [
threading.Thread(target=worker, args=(i,))
for i in range(10)
]
for t in threads:
t.start()
Async Safety
Telemetry properly handles async context propagation:
async def parent_operation():
with create_span("parent"):
# Context is preserved across await
await child_operation()
# Even with concurrent operations
await asyncio.gather(
child_operation(),
child_operation(),
child_operation()
)
async def child_operation():
# Automatically linked to parent
with create_span("child"):
await asyncio.sleep(0.1)