Memory Performance Guide¶
Optimization techniques and best practices for high-performance memory operations in AgentiCraft.
Overview¶
This guide covers:

- Performance benchmarks and expectations
- Optimization strategies for vector and graph memory
- Scaling considerations
- Monitoring and debugging performance issues
Performance Benchmarks¶
Expected Performance Metrics¶
| Operation | Vector Memory | Knowledge Graph | Target Latency |
|---|---|---|---|
| Store (single) | 5-10ms | 2-5ms | <10ms |
| Retrieve (by key) | 2-5ms | 1-2ms | <5ms |
| Search (semantic) | 20-50ms | N/A | <50ms |
| Graph traversal | N/A | 5-20ms | <20ms |
| Batch store (100) | 50-100ms | 20-50ms | <100ms |
| Memory scan (10k) | 100-200ms | 50-100ms | <200ms |
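To verify your own deployment against these targets, a quick micro-benchmark is usually enough. The sketch below assumes the async store/search API of ChromaDBMemory used throughout this guide; adjust names to your setup.
import asyncio
import time

from agenticraft.memory.vector import ChromaDBMemory

async def benchmark_memory(memory, iterations: int = 100):
    """Report average store and search latency in milliseconds."""
    start = time.perf_counter()
    for i in range(iterations):
        await memory.store(f"bench_{i}", f"benchmark document {i}")
    store_ms = (time.perf_counter() - start) / iterations * 1000

    start = time.perf_counter()
    for _ in range(iterations):
        await memory.search("benchmark document", limit=10)
    search_ms = (time.perf_counter() - start) / iterations * 1000

    print(f"store:  {store_ms:.1f}ms (target <10ms)")
    print(f"search: {search_ms:.1f}ms (target <50ms)")

# asyncio.run(benchmark_memory(ChromaDBMemory("benchmark")))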
Hardware Requirements¶
# Recommended specifications for different scales
SMALL_SCALE = {
"memory_items": "< 10,000",
"ram": "4GB",
"cpu": "2 cores",
"storage": "10GB SSD",
"concurrent_users": "< 10"
}
MEDIUM_SCALE = {
"memory_items": "10,000 - 100,000",
"ram": "16GB",
"cpu": "4-8 cores",
"storage": "100GB SSD",
"concurrent_users": "10-100"
}
LARGE_SCALE = {
"memory_items": "> 100,000",
"ram": "32GB+",
"cpu": "8-16 cores",
"storage": "500GB+ NVMe SSD",
"concurrent_users": "> 100"
}
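A small helper (illustrative, not part of AgentiCraft) makes the profile choice explicit in code:
def pick_scale(expected_items: int) -> dict:
    """Map an expected item count to the matching hardware profile."""
    if expected_items < 10_000:
        return SMALL_SCALE
    if expected_items <= 100_000:
        return MEDIUM_SCALE
    return LARGE_SCALE

print(pick_scale(25_000)["ram"])  # "16GB"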
Vector Memory Optimization¶
ChromaDB Configuration¶
from agenticraft.memory.vector import ChromaDBMemory
import chromadb
# Optimized configuration for performance
def create_optimized_memory(collection_name: str, scale: str = "medium"):
"""Create optimized vector memory for different scales."""
# HNSW parameters based on scale
hnsw_configs = {
"small": {
"hnsw:space": "cosine",
"hnsw:construction_ef": 100, # Lower for faster indexing
"hnsw:M": 16, # Fewer connections
"hnsw:search_ef": 50, # Faster search
"hnsw:num_threads": 2
},
"medium": {
"hnsw:space": "cosine",
"hnsw:construction_ef": 200, # Balanced
"hnsw:M": 32, # Default connections
"hnsw:search_ef": 100, # Good accuracy
"hnsw:num_threads": 4
},
"large": {
"hnsw:space": "cosine",
"hnsw:construction_ef": 400, # Better quality
"hnsw:M": 48, # More connections
"hnsw:search_ef": 200, # Higher accuracy
"hnsw:num_threads": 8
}
}
    # Create client with performance settings.
    # PersistentClient already persists to `path`, so no extra
    # persistence settings are needed here.
    client = chromadb.PersistentClient(
        path="./chroma_db",
        settings=chromadb.Settings(anonymized_telemetry=False)
    )
# Create collection with optimized settings
collection = client.get_or_create_collection(
name=collection_name,
metadata=hnsw_configs.get(scale, hnsw_configs["medium"])
)
return ChromaDBMemory(
collection_name=collection_name,
persist_directory="./chroma_db"
)
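Calling the factory is then a one-liner; the scale string selects one of the HNSW profiles defined above (a usage sketch):
# "large" raises construction_ef/M/search_ef for better recall at higher cost
memory = create_optimized_memory("support_docs", scale="medium")
Higher construction_ef and M improve recall but slow indexing and increase memory per vector, which is why each profile scales them together.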
Embedding Optimization¶
import numpy as np
from sentence_transformers import SentenceTransformer
from typing import List, Union
import torch
class OptimizedEmbeddings:
"""Optimized embedding generation."""
def __init__(
self,
model_name: str = "all-MiniLM-L6-v2",
device: str = None,
batch_size: int = 32
):
# Auto-detect device
if device is None:
device = "cuda" if torch.cuda.is_available() else "cpu"
self.device = device
self.batch_size = batch_size
# Load model with optimizations
self.model = SentenceTransformer(model_name)
self.model.to(device)
# Enable eval mode for inference
self.model.eval()
# Dimension reduction (optional)
self.use_pca = False
self.pca = None
def encode(
self,
texts: Union[str, List[str]],
normalize: bool = True
) -> np.ndarray:
"""Encode texts to embeddings with optimization."""
if isinstance(texts, str):
texts = [texts]
# Batch processing
embeddings = []
with torch.no_grad(): # Disable gradients for inference
for i in range(0, len(texts), self.batch_size):
batch = texts[i:i + self.batch_size]
# Encode batch
batch_embeddings = self.model.encode(
batch,
convert_to_numpy=True,
normalize_embeddings=normalize,
device=self.device
)
embeddings.append(batch_embeddings)
# Concatenate results
embeddings = np.vstack(embeddings)
# Apply dimension reduction if enabled
if self.use_pca and self.pca is not None:
embeddings = self.pca.transform(embeddings)
return embeddings
    def enable_dimension_reduction(
        self,
        sample_texts: List[str],
        target_dim: int = 128
    ):
        """Enable PCA dimension reduction.

        PCA requires at least `target_dim` samples to fit, so pass a
        representative corpus rather than a handful of strings.
        """
        from sklearn.decomposition import PCA
        if len(sample_texts) < target_dim:
            raise ValueError(
                f"Need at least {target_dim} sample texts, "
                f"got {len(sample_texts)}"
            )
        # Fit on raw embeddings before enabling the transform
        self.use_pca = False
        sample_embeddings = self.encode(sample_texts, normalize=False)
        self.pca = PCA(n_components=target_dim)
        self.pca.fit(sample_embeddings)
        self.use_pca = True
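Typical usage (a sketch; all-MiniLM-L6-v2 produces 384-dimensional vectors):
embedder = OptimizedEmbeddings(batch_size=64)

# One call encodes everything in batches of 64 under torch.no_grad()
texts = [f"support ticket number {i}" for i in range(1000)]
vectors = embedder.encode(texts)
print(vectors.shape)  # (1000, 384)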
Search Optimization¶
from typing import Dict, List

from agenticraft.memory.vector import ChromaDBMemory

class OptimizedVectorSearch:
"""Optimized vector search strategies."""
def __init__(self, memory: ChromaDBMemory):
self.memory = memory
self.search_cache = {}
self.cache_size = 1000
async def cached_search(
self,
query: str,
limit: int = 10,
**kwargs
) -> List[Dict]:
"""Search with caching."""
# Create cache key
cache_key = f"{query}:{limit}:{str(kwargs)}"
        # Check cache; re-insert on hit so eviction order stays LRU
        if cache_key in self.search_cache:
            self.search_cache[cache_key] = self.search_cache.pop(cache_key)
            return self.search_cache[cache_key]
# Perform search
results = await self.memory.search(query, limit, **kwargs)
# Update cache
self._update_cache(cache_key, results)
return results
async def approximate_search(
self,
query: str,
limit: int = 10,
sample_rate: float = 0.1
) -> List[Dict]:
"""Approximate search for large collections."""
# Get collection size
stats = self.memory.get_stats()
total_items = stats["total_memories"]
if total_items < 10000:
# Use exact search for small collections
return await self.memory.search(query, limit)
# Sample subset for approximate search
sample_size = int(total_items * sample_rate)
# Perform search on sample
# In production, implement proper sampling
results = await self.memory.search(
query,
limit=min(limit * 2, sample_size)
)
return results[:limit]
def _update_cache(self, key: str, value: List[Dict]):
"""Update cache with LRU eviction."""
if len(self.search_cache) >= self.cache_size:
            # First key is least recently used (hits re-insert at the end)
            lru_key = next(iter(self.search_cache))
            del self.search_cache[lru_key]
self.search_cache[key] = value
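Putting the cache in front of an existing memory is transparent to callers. Note that the cache key includes limit and any extra kwargs, so the same query with different parameters is cached separately (usage sketch):
search = OptimizedVectorSearch(memory)

# First call hits the index; the repeat is answered from the cache
results = await search.cached_search("refund policy", limit=5)
results = await search.cached_search("refund policy", limit=5)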
Knowledge Graph Optimization¶
Graph Structure Optimization¶
from collections import defaultdict
from datetime import datetime
from typing import Dict, List, Set
class OptimizedKnowledgeGraph:
"""Optimized knowledge graph implementation."""
def __init__(self, capacity: int = 100000):
self.capacity = capacity
# Use efficient data structures
self.entities = {} # entity_name -> Entity
self.entity_index = defaultdict(set) # entity_type -> Set[entity_names]
# Adjacency list for relationships
self.forward_edges = defaultdict(list) # from -> [(relation, to)]
self.backward_edges = defaultdict(list) # to -> [(relation, from)]
# Relationship index
self.relation_index = defaultdict(set) # relation -> Set[(from, to)]
# Cache for frequent queries
self.path_cache = {}
self.subgraph_cache = {}
def add_entity_optimized(
self,
name: str,
entity_type: str,
attributes: Dict = None
):
"""Add entity with indexing."""
if len(self.entities) >= self.capacity:
self._evict_lru_entity()
self.entities[name] = {
"type": entity_type,
"attributes": attributes or {},
"access_count": 0,
"last_access": datetime.now()
}
# Update index
self.entity_index[entity_type].add(name)
def add_relationship_optimized(
self,
from_entity: str,
relation: str,
to_entity: str
):
"""Add relationship with dual indexing."""
# Forward edge
self.forward_edges[from_entity].append((relation, to_entity))
# Backward edge for reverse lookups
self.backward_edges[to_entity].append((relation, from_entity))
# Relation index
self.relation_index[relation].add((from_entity, to_entity))
# Invalidate caches
self._invalidate_caches(from_entity, to_entity)
def find_paths_optimized(
self,
start: str,
end: str,
max_depth: int = 3
) -> List[List[str]]:
"""Find paths using bidirectional search."""
# Check cache
cache_key = f"{start}:{end}:{max_depth}"
if cache_key in self.path_cache:
return self.path_cache[cache_key]
# Bidirectional BFS
forward_frontier = {start: [[start]]}
backward_frontier = {end: [[end]]}
for depth in range(max_depth // 2 + 1):
# Expand smaller frontier
if len(forward_frontier) <= len(backward_frontier):
new_forward = self._expand_frontier(
forward_frontier,
self.forward_edges
)
# Check for intersection
paths = self._find_intersections(
new_forward,
backward_frontier
)
if paths:
self.path_cache[cache_key] = paths
return paths
forward_frontier = new_forward
else:
new_backward = self._expand_frontier(
backward_frontier,
self.backward_edges
)
# Check for intersection
paths = self._find_intersections(
forward_frontier,
new_backward,
reverse=True
)
if paths:
self.path_cache[cache_key] = paths
return paths
backward_frontier = new_backward
return []
def _expand_frontier(
self,
frontier: Dict[str, List[List[str]]],
edges: Dict[str, List]
) -> Dict[str, List[List[str]]]:
"""Expand search frontier."""
new_frontier = {}
for node, paths in frontier.items():
for relation, neighbor in edges.get(node, []):
if neighbor not in new_frontier:
new_frontier[neighbor] = []
for path in paths:
if neighbor not in path: # Avoid cycles
new_path = path + [relation, neighbor]
new_frontier[neighbor].append(new_path)
        return new_frontier

    def _find_intersections(
        self,
        forward: Dict[str, List[List[str]]],
        backward: Dict[str, List[List[str]]],
        reverse: bool = False
    ) -> List[List[str]]:
        """Join forward and backward paths that meet at a common node.

        Backward paths are built outward from the end node, so they are
        reversed before joining; the `reverse` flag is accepted for
        call-site symmetry, but the join is identical either way.
        """
        paths = []
        for node in set(forward) & set(backward):
            for f_path in forward[node]:
                for b_path in backward[node]:
                    # Drop the shared meeting node from the reversed half
                    paths.append(f_path + b_path[::-1][1:])
        return paths
def _invalidate_caches(self, *entities):
"""Invalidate caches for affected entities."""
# Clear path cache entries containing these entities
keys_to_remove = []
for key in self.path_cache:
if any(entity in key for entity in entities):
keys_to_remove.append(key)
for key in keys_to_remove:
del self.path_cache[key]
# Clear subgraph cache
for entity in entities:
self.subgraph_cache.pop(entity, None)
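Because the search follows forward_edges from the start and backward_edges from the end, it finds directed paths. A minimal usage sketch (assuming the _find_intersections helper above):
graph = OptimizedKnowledgeGraph(capacity=50_000)
graph.add_entity_optimized("Alice", "person")
graph.add_entity_optimized("Acme", "company")
graph.add_entity_optimized("Berlin", "city")
graph.add_relationship_optimized("Alice", "works_at", "Acme")
graph.add_relationship_optimized("Acme", "located_in", "Berlin")

# The frontiers meet and the joined path is returned with relations inline
print(graph.find_paths_optimized("Alice", "Berlin", max_depth=4))
# [['Alice', 'works_at', 'Acme', 'located_in', 'Berlin']]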
Query Optimization¶
import time
from collections import defaultdict
from typing import Dict, List

class GraphQueryOptimizer:
    """Optimize graph queries."""
    def __init__(self, graph: OptimizedKnowledgeGraph):
        self.graph = graph
# Query statistics
self.query_stats = defaultdict(lambda: {
"count": 0,
"total_time": 0,
"avg_time": 0
})
async def optimized_entity_search(
self,
entity_type: str = None,
pattern: str = None,
limit: int = 100
) -> List[Dict]:
"""Optimized entity search."""
start_time = time.time()
if entity_type:
# Use type index
candidates = self.graph.entity_index.get(entity_type, set())
else:
candidates = set(self.graph.entities.keys())
# Apply pattern filter if provided
if pattern:
pattern_lower = pattern.lower()
candidates = {
name for name in candidates
if pattern_lower in name.lower()
}
# Sort by access count for relevance
sorted_entities = sorted(
candidates,
key=lambda x: self.graph.entities[x]["access_count"],
reverse=True
)
# Update statistics
query_time = time.time() - start_time
self._update_stats("entity_search", query_time)
return [
{
"name": name,
"type": self.graph.entities[name]["type"],
"attributes": self.graph.entities[name]["attributes"]
}
for name in sorted_entities[:limit]
]
    async def batch_relationship_query(
        self,
        entity_names: List[str]
    ) -> Dict[str, Dict[str, List[Dict]]]:
        """Batch query outgoing and incoming relationships per entity."""
results = {}
# Single pass through relationships
for entity in entity_names:
forward = self.graph.forward_edges.get(entity, [])
backward = self.graph.backward_edges.get(entity, [])
results[entity] = {
"outgoing": [
{"relation": rel, "to": to}
for rel, to in forward
],
"incoming": [
{"relation": rel, "from": from_e}
for rel, from_e in backward
]
}
return results
def _update_stats(self, query_type: str, duration: float):
"""Update query statistics."""
stats = self.query_stats[query_type]
stats["count"] += 1
stats["total_time"] += duration
stats["avg_time"] = stats["total_time"] / stats["count"]
Scaling Strategies¶
Horizontal Scaling¶
import asyncio
import hashlib
from typing import Any, Dict, List, Optional

class DistributedMemory:
    """Distributed memory across multiple nodes."""
def __init__(self, nodes: List[str]):
self.nodes = nodes
self.node_count = len(nodes)
# Consistent hashing for distribution
self.hash_ring = self._create_hash_ring()
        # Connection pool for each node; _create_connection is
        # backend-specific (e.g., a remote memory client per node)
        self.connections = {
            node: self._create_connection(node)
            for node in nodes
        }
def _get_node_for_key(self, key: str) -> str:
"""Get node responsible for key."""
key_hash = hashlib.md5(key.encode()).hexdigest()
# Find node in hash ring
for node_hash, node in sorted(self.hash_ring.items()):
if key_hash <= node_hash:
return node
# Wrap around to first node
return self.hash_ring[min(self.hash_ring.keys())]
async def store(self, key: str, value: Any, **kwargs):
"""Store in appropriate node."""
node = self._get_node_for_key(key)
connection = self.connections[node]
return await connection.store(key, value, **kwargs)
async def retrieve(self, key: str) -> Optional[Any]:
"""Retrieve from appropriate node."""
node = self._get_node_for_key(key)
connection = self.connections[node]
return await connection.retrieve(key)
async def search(self, query: str, limit: int = 10) -> List[Dict]:
"""Fan-out search across all nodes."""
# Parallel search on all nodes
tasks = [
connection.search(query, limit=limit)
for connection in self.connections.values()
]
all_results = await asyncio.gather(*tasks)
# Merge and sort results
merged = []
for results in all_results:
merged.extend(results)
# Sort by similarity and return top results
merged.sort(key=lambda x: x.get("similarity", 0), reverse=True)
return merged[:limit]
def _create_hash_ring(self) -> Dict[str, str]:
"""Create consistent hash ring."""
ring = {}
for node in self.nodes:
# Multiple virtual nodes for better distribution
for i in range(150):
virtual_node = f"{node}:{i}"
node_hash = hashlib.md5(virtual_node.encode()).hexdigest()
ring[node_hash] = node
return ring
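Node addresses below are placeholders; any key-addressable memory backend works behind _create_connection. Using 150 virtual nodes per physical node keeps the key distribution close to even, and adding a node remaps only about 1/N of the keys (usage sketch):
memory = DistributedMemory([
    "memory-node-1:9000",  # placeholder addresses
    "memory-node-2:9000",
    "memory-node-3:9000"
])

# Reads and writes route to the key's owner; search fans out to all nodes
await memory.store("user:42:profile", {"name": "Alice"})
results = await memory.search("alice profile", limit=10)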
Memory Partitioning¶
import asyncio
from datetime import datetime
from typing import Any, Dict, List

from agenticraft.memory.vector import ChromaDBMemory

class PartitionedMemory:
"""Partition memory by criteria."""
def __init__(self):
# Partition by time
self.time_partitions = {
"hot": ChromaDBMemory("hot_data"), # Last 24 hours
"warm": ChromaDBMemory("warm_data"), # Last week
"cold": ChromaDBMemory("cold_data") # Older
}
# Partition by type
self.type_partitions = {
"conversations": ChromaDBMemory("conversations"),
"facts": ChromaDBMemory("facts"),
"documents": ChromaDBMemory("documents")
}
async def store(self, key: str, value: Any, metadata: Dict):
"""Store in appropriate partition."""
# Time-based partition
timestamp = metadata.get("timestamp", datetime.now().isoformat())
partition = self._get_time_partition(timestamp)
await partition.store(key, value, metadata)
# Type-based partition (if applicable)
data_type = metadata.get("type")
if data_type in self.type_partitions:
await self.type_partitions[data_type].store(
key, value, metadata
)
async def search(self, query: str, **kwargs) -> List[Dict]:
"""Search across partitions."""
# Determine which partitions to search
search_hot = kwargs.get("include_recent", True)
search_warm = kwargs.get("include_week", True)
search_cold = kwargs.get("include_old", False)
tasks = []
if search_hot:
tasks.append(self.time_partitions["hot"].search(query))
if search_warm:
tasks.append(self.time_partitions["warm"].search(query))
if search_cold:
tasks.append(self.time_partitions["cold"].search(query))
# Parallel search
results = await asyncio.gather(*tasks)
# Merge results
merged = []
for partition_results in results:
merged.extend(partition_results)
        # Sort by relevance (default to 0 if a backend omits similarity)
        merged.sort(key=lambda x: x.get("similarity", 0), reverse=True)
return merged[:kwargs.get("limit", 10)]
def _get_time_partition(self, timestamp: str):
"""Determine time partition."""
ts = datetime.fromisoformat(timestamp)
age = datetime.now() - ts
if age.days < 1:
return self.time_partitions["hot"]
elif age.days < 7:
return self.time_partitions["warm"]
else:
return self.time_partitions["cold"]
async def migrate_partitions(self):
"""Migrate data between partitions."""
# Move from hot to warm
hot_data = await self.time_partitions["hot"].search(
"", # All data
limit=10000
)
for item in hot_data:
ts = item["metadata"].get("timestamp")
if ts:
age = datetime.now() - datetime.fromisoformat(ts)
if age.days >= 1:
# Move to warm
await self.time_partitions["warm"].store(
item["id"],
item["content"],
item["metadata"]
)
await self.time_partitions["hot"].delete(item["id"])
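Migration only helps if it runs regularly. A minimal scheduling loop (a sketch, not an AgentiCraft API):
import asyncio

async def run_migrations(memory: PartitionedMemory, interval_hours: float = 6):
    """Demote aging items from hot to warm storage on a fixed interval."""
    while True:
        await memory.migrate_partitions()
        await asyncio.sleep(interval_hours * 3600)

# Schedule alongside the application's event loop:
# asyncio.create_task(run_migrations(memory))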
Monitoring and Debugging¶
Performance Monitoring¶
import time
from typing import Dict

import psutil
from prometheus_client import Counter, Gauge, Histogram
class MemoryPerformanceMonitor:
"""Monitor memory system performance."""
def __init__(self):
# Metrics
self.operation_counter = Counter(
'memory_operations_total',
'Total memory operations',
['operation', 'memory_type']
)
self.operation_duration = Histogram(
'memory_operation_duration_seconds',
'Memory operation duration',
['operation', 'memory_type']
)
self.memory_size = Gauge(
'memory_items_total',
'Total items in memory',
['memory_type']
)
self.error_counter = Counter(
'memory_errors_total',
'Total memory errors',
['operation', 'error_type']
)
# System metrics
self.cpu_percent = Gauge('memory_cpu_percent', 'CPU usage')
self.memory_percent = Gauge('memory_ram_percent', 'RAM usage')
async def monitor_operation(
self,
operation: str,
memory_type: str,
func,
*args,
**kwargs
):
"""Monitor a memory operation."""
start_time = time.time()
try:
# Execute operation
result = await func(*args, **kwargs)
# Record success
self.operation_counter.labels(
operation=operation,
memory_type=memory_type
).inc()
duration = time.time() - start_time
self.operation_duration.labels(
operation=operation,
memory_type=memory_type
).observe(duration)
return result
except Exception as e:
# Record error
self.error_counter.labels(
operation=operation,
error_type=type(e).__name__
).inc()
raise
finally:
# Update system metrics
self.cpu_percent.set(psutil.cpu_percent())
self.memory_percent.set(psutil.virtual_memory().percent)
    def get_operation_stats(self) -> Dict:
        """Summarize metrics via the public collect() API.

        Avoids reaching into prometheus_client internals, which are
        private and differ between labeled and unlabeled metrics.
        """
        def _sum(metric, suffix: str = "_total") -> float:
            return sum(
                sample.value
                for family in metric.collect()
                for sample in family.samples
                if sample.name.endswith(suffix)
            )

        total_duration = _sum(self.operation_duration, "_sum")
        total_count = _sum(self.operation_duration, "_count")
        return {
            "operations": {"total": _sum(self.operation_counter)},
            "performance": {
                "avg_duration": (
                    total_duration / total_count if total_count else 0
                )
            },
            "errors": {"total": _sum(self.error_counter)},
            "system": {
                "cpu_percent": psutil.cpu_percent(),
                "memory_percent": psutil.virtual_memory().percent
            }
        }
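To make these metrics scrapeable, start prometheus_client's built-in HTTP exporter once at application startup:
from prometheus_client import start_http_server

# Serves the default registry (including the metrics above) on :8000/metrics
start_http_server(8000)

monitor = MemoryPerformanceMonitor()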
Debug Utilities¶
import time
from typing import Dict, List

from agenticraft.memory.base import BaseMemory  # assumed import path; adjust to your install

class MemoryDebugger:
"""Debug utilities for memory systems."""
@staticmethod
async def analyze_memory_usage(memory: BaseMemory) -> Dict:
"""Analyze memory usage patterns."""
stats = memory.get_stats()
# Sample queries for analysis
test_queries = [
"test query short",
"this is a longer test query with more words",
"specific technical query about machine learning"
]
query_performance = []
for query in test_queries:
start = time.time()
results = await memory.search(query)
duration = time.time() - start
query_performance.append({
"query": query,
"duration_ms": duration * 1000,
"results": len(results),
"avg_similarity": sum(r.get("similarity", 0) for r in results) / len(results) if results else 0
})
return {
"stats": stats,
"query_performance": query_performance,
"recommendations": MemoryDebugger._get_recommendations(stats, query_performance)
}
@staticmethod
def _get_recommendations(stats: Dict, performance: List[Dict]) -> List[str]:
"""Get optimization recommendations."""
recommendations = []
# Check memory size
if stats.get("total_memories", 0) > 50000:
recommendations.append(
"Consider partitioning or archiving old memories"
)
# Check query performance
avg_duration = sum(p["duration_ms"] for p in performance) / len(performance)
if avg_duration > 100:
recommendations.append(
"Query performance is slow. Consider indexing optimization"
)
# Check similarity scores
avg_similarity = sum(p["avg_similarity"] for p in performance) / len(performance)
if avg_similarity < 0.5:
recommendations.append(
"Low similarity scores. Consider improving embeddings"
)
return recommendations
@staticmethod
async def profile_memory_operation(
memory: BaseMemory,
operation: str,
*args,
**kwargs
):
"""Profile a specific operation."""
import cProfile
import pstats
from io import StringIO
profiler = cProfile.Profile()
# Profile operation
profiler.enable()
try:
if operation == "store":
result = await memory.store(*args, **kwargs)
elif operation == "search":
result = await memory.search(*args, **kwargs)
elif operation == "retrieve":
result = await memory.retrieve(*args, **kwargs)
else:
raise ValueError(f"Unknown operation: {operation}")
finally:
profiler.disable()
# Get profile results
stream = StringIO()
stats = pstats.Stats(profiler, stream=stream)
stats.sort_stats('cumulative')
stats.print_stats(20) # Top 20 functions
return {
"result": result,
"profile": stream.getvalue()
}
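Both utilities run inside an async context; a quick health check might look like this (usage sketch):
report = await MemoryDebugger.analyze_memory_usage(memory)
for tip in report["recommendations"]:
    print(f"- {tip}")

profiled = await MemoryDebugger.profile_memory_operation(
    memory, "search", "test query", limit=5
)
print(profiled["profile"][:500])  # top of the cProfile report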
Best Practices Summary¶
1. Choose the Right Configuration¶
# Match configuration to your use case
if data_volume < 10000:
config = "small"
elif data_volume < 100000:
config = "medium"
else:
config = "large"
2. Monitor Performance¶
# Always monitor in production
monitor = MemoryPerformanceMonitor()
memory = ChromaDBMemory()
# Wrap operations
result = await monitor.monitor_operation(
"search",
"vector",
memory.search,
query
)
3. Use Appropriate Caching¶
# Cache frequently accessed data
cache = OptimizedVectorSearch(memory)
results = await cache.cached_search(query)
4. Plan for Scale¶
Start on the small profile, watch item counts and latencies against the benchmarks above, and move to partitioning (PartitionedMemory) or multi-node distribution (DistributedMemory) before you outgrow a single instance.
Next Steps¶
- Memory Overview - Memory system concepts
- Memory Patterns - Usage patterns
- API Reference - Complete API docs
- Examples - Working examples