Performance Tuning Guide
Comprehensive performance optimization strategies for the Rizk SDK, based on real-world deployments.
Performance Overview
Rizk SDK provides multiple layers of performance optimization:
- Multi-layer Caching: L1 (memory) + L2 (Redis) + L3 (future)
- Framework Detection Caching: Compiled regex patterns with LRU cache
- Streaming Optimization: Real-time processing with backpressure handling
- Connection Pooling: Redis connection pools for high throughput
- Lazy Loading: On-demand adapter registration
- Batch Processing: Efficient analytics and telemetry batching (see the batching sketch after this list)
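Batching is the least visible of these layers, so a concrete sketch helps. The `BatchingQueue` below is a hypothetical stand-in for the SDK's internal telemetry batcher, not a Rizk API: it buffers events and flushes when either the batch size or the flush interval is reached, which is the behavior the `batch_size` and `flush_interval` settings in the checklist later in this guide control.

```python
import threading
import time
from typing import Any, Callable, Dict, List


class BatchingQueue:
    """Hypothetical telemetry batcher: flush on size OR interval, whichever comes first."""

    def __init__(self, flush_fn: Callable[[List[Dict[str, Any]]], None],
                 batch_size: int = 200, flush_interval: float = 10.0):
        self.flush_fn = flush_fn
        self.batch_size = batch_size
        self.flush_interval = flush_interval
        self._buffer: List[Dict[str, Any]] = []
        self._lock = threading.Lock()
        self._last_flush = time.monotonic()

    def add(self, event: Dict[str, Any]) -> None:
        with self._lock:
            self._buffer.append(event)
            due = (len(self._buffer) >= self.batch_size
                   or time.monotonic() - self._last_flush >= self.flush_interval)
            if due:
                self._flush_locked()

    def _flush_locked(self) -> None:
        if self._buffer:
            self.flush_fn(self._buffer)  # one network call for the whole batch
            self._buffer = []
        self._last_flush = time.monotonic()


# Usage: 200 events produce a single flush instead of 200 network calls
queue = BatchingQueue(flush_fn=lambda batch: print(f"flushed {len(batch)} events"))
for i in range(200):
    queue.add({"event": "llm_call", "seq": i})
```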
Cache Optimization
1. Cache Hierarchy Configuration
```python
from rizk.sdk.cache.cache_hierarchy import CacheHierarchy, CacheHierarchyConfig
from rizk.sdk.cache.redis_adapter import RedisConfig

# High-performance cache configuration
cache_config = CacheHierarchyConfig(
    # L1: Local memory cache (fastest)
    l1_enabled=True,
    l1_max_size=50000,          # Larger for high-traffic apps
    l1_ttl_seconds=600,         # 10 minutes

    # L2: Distributed Redis cache
    l2_enabled=True,
    l2_ttl_seconds=3600,        # 1 hour
    l2_fallback_on_error=True,

    # Performance optimizations
    async_write_behind=True,    # Non-blocking writes
    promotion_threshold=1,      # Aggressive promotion

    # Monitoring
    metrics_enabled=True,
    health_check_interval=30,
)

# Redis configuration for maximum performance
redis_config = RedisConfig(
    url="redis://redis-cluster:6379",
    max_connections=200,        # High connection pool
    socket_timeout=1.0,         # Fast timeouts
    socket_connect_timeout=2.0,
    retry_on_timeout=True,
    retry_attempts=2,           # Quick retries
    enable_cluster=True,        # Distributed Redis
    key_prefix="perf:rizk:",
    default_ttl=1800,
)

cache_hierarchy = CacheHierarchy(cache_config)
```
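To make `promotion_threshold=1` and `async_write_behind=True` concrete, here is a minimal two-level read/write path. This is an illustration of the semantics, not Rizk's implementation: with a threshold of 1, an L2 hit is copied into L1 on the first access, and with write-behind enabled the L2 write happens off-thread so the caller never blocks on Redis.

```python
import threading
from typing import Any, Dict, Optional


class TwoLevelCacheSketch:
    """Illustrative L1/L2 path; `l2` can be any client with get/set methods."""

    def __init__(self, l2, promotion_threshold: int = 1, async_write_behind: bool = True):
        self.l1: Dict[str, Any] = {}
        self.l2 = l2
        self.hits: Dict[str, int] = {}
        self.promotion_threshold = promotion_threshold
        self.async_write_behind = async_write_behind

    def get(self, key: str) -> Optional[Any]:
        if key in self.l1:                        # L1 hit: in-process, no I/O
            return self.l1[key]
        value = self.l2.get(key)                  # L2 hit: one network round trip
        if value is not None:
            self.hits[key] = self.hits.get(key, 0) + 1
            if self.hits[key] >= self.promotion_threshold:
                self.l1[key] = value              # promote hot keys into memory
        return value

    def set(self, key: str, value: Any) -> None:
        self.l1[key] = value                      # L1 write is always synchronous
        if self.async_write_behind:
            # Sketch only: a real implementation would use a write queue, not a thread per write
            threading.Thread(target=self.l2.set, args=(key, value), daemon=True).start()
        else:
            self.l2.set(key, value)
```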
2. Cache Performance Monitoring
```python
import time
from typing import Any, Dict


class CachePerformanceMonitor:
    """Monitor and optimize cache performance."""

    def __init__(self, cache_hierarchy: CacheHierarchy):
        self.cache = cache_hierarchy
        self.metrics_history = []

    def collect_metrics(self) -> Dict[str, Any]:
        """Collect current cache performance metrics."""
        stats = self.cache.get_stats()

        metrics = {
            "timestamp": time.time(),
            "l1_hit_rate": stats.get("l1_hit_rate", 0),
            "l2_hit_rate": stats.get("l2_hit_rate", 0),
            "overall_hit_rate": stats.get("overall_hit_rate", 0),
            "avg_latency_ms": stats.get("avg_latency_ms", 0),
            "l1_size": stats.get("l1_size", 0),
            "total_requests": stats.get("total_requests", 0),
        }

        self.metrics_history.append(metrics)
        return metrics

    def analyze_performance(self) -> Dict[str, Any]:
        """Analyze cache performance and provide recommendations."""
        if not self.metrics_history:
            return {"status": "no_data"}

        latest = self.metrics_history[-1]
        recommendations = []

        # Hit rate analysis
        if latest["overall_hit_rate"] < 70:
            recommendations.append({
                "issue": "Low cache hit rate",
                "current": f"{latest['overall_hit_rate']:.1f}%",
                "target": ">80%",
                "action": "Increase cache size or TTL",
            })

        # Latency analysis
        if latest["avg_latency_ms"] > 50:
            recommendations.append({
                "issue": "High cache latency",
                "current": f"{latest['avg_latency_ms']:.1f}ms",
                "target": "<20ms",
                "action": "Check Redis connection or reduce cache size",
            })

        # Memory usage analysis
        if latest["l1_size"] > 40000:  # 80% of 50k max
            recommendations.append({
                "issue": "L1 cache near capacity",
                "current": f"{latest['l1_size']} items",
                "target": "<40k items",
                "action": "Increase l1_max_size or reduce TTL",
            })

        return {
            "status": "healthy" if not recommendations else "needs_attention",
            "current_metrics": latest,
            "recommendations": recommendations,
        }


# Usage
monitor = CachePerformanceMonitor(cache_hierarchy)


# Periodic monitoring
def monitor_cache_performance():
    metrics = monitor.collect_metrics()
    analysis = monitor.analyze_performance()

    print(f"Cache Performance: {metrics['overall_hit_rate']:.1f}% hit rate, "
          f"{metrics['avg_latency_ms']:.1f}ms latency")

    if analysis["recommendations"]:
        print("Performance recommendations:")
        for rec in analysis["recommendations"]:
            print(f"  - {rec['issue']}: {rec['action']}")
```
Framework Detection Optimization
1. Optimized Framework Detection Configuration
```python
from rizk.sdk.config import RizkConfig

# High-performance framework detection
config = RizkConfig(
    framework_detection_cache_size=20000,  # Large cache
    lazy_loading=True,                     # Load adapters on demand
    debug_mode=False,                      # Disable debug overhead
    verbose=False,                         # Disable verbose logging
)
```
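`lazy_loading=True` defers adapter imports until a framework is actually detected, so startup cost does not scale with the number of supported frameworks. The pattern is roughly the one sketched below; this is illustrative, not the SDK's internals, and the dotted path in the usage line is hypothetical.

```python
import importlib
from typing import Any, Dict


class LazyAdapterRegistry:
    """Illustrative lazy registry: adapters are imported on first lookup."""

    def __init__(self):
        self._paths: Dict[str, str] = {}   # framework name -> "module:ClassName"
        self._loaded: Dict[str, Any] = {}

    def register(self, name: str, dotted_path: str) -> None:
        self._paths[name] = dotted_path    # cheap: no import happens here

    def get(self, name: str) -> Any:
        if name not in self._loaded:       # import deferred until first use
            module_name, class_name = self._paths[name].split(":")
            module = importlib.import_module(module_name)
            self._loaded[name] = getattr(module, class_name)()
        return self._loaded[name]


# Usage (hypothetical adapter path for illustration)
registry = LazyAdapterRegistry()
registry.register("langchain", "rizk.sdk.adapters.langchain:LangChainAdapter")
# Nothing is imported until: registry.get("langchain")
```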
2. Framework Detection Performance Monitoring
```python
import time
from functools import lru_cache

from rizk.sdk.utils.framework_detection import detect_framework
from rizk.sdk.performance import performance_instrumented


@lru_cache(maxsize=10000)
def cached_framework_detection(context_hash: str) -> str:
    """Cached framework detection for repeated contexts."""
    return detect_framework()


@performance_instrumented("framework_detection", "detect")
def optimized_framework_detection() -> str:
    """Optimized framework detection with caching."""
    # Create context hash for caching
    context_hash = hash(str(globals().keys()))
    return cached_framework_detection(str(context_hash))


# Performance testing
def benchmark_framework_detection(iterations: int = 1000):
    """Benchmark framework detection performance."""

    # Warm up
    for _ in range(10):
        optimized_framework_detection()

    # Benchmark
    start_time = time.time()
    for _ in range(iterations):
        optimized_framework_detection()
    end_time = time.time()

    avg_time_ms = (end_time - start_time) / iterations * 1000
    print(f"Framework detection: {avg_time_ms:.2f}ms average over {iterations} iterations")

    return avg_time_ms


# Run benchmark
benchmark_framework_detection()
```
Streaming Performance Optimization
1. High-Performance Streaming Configuration
```python
from rizk.sdk.streaming.types import StreamConfig

# Optimized streaming configuration
stream_config = StreamConfig(
    # Performance settings
    max_chunk_size=4096,      # Larger chunks for efficiency
    buffer_size=50,           # Large buffer for throughput
    timeout_seconds=120.0,    # Longer timeout for complex operations

    # Guardrail optimization
    enable_guardrails=True,
    realtime_validation=True,
    validation_interval=3,    # Validate every 3 chunks for performance

    # Caching optimization
    enable_caching=True,
    cache_partial_responses=True,
    cache_ttl_seconds=900,    # 15 minutes

    # Monitoring optimization
    enable_metrics=True,
    metrics_interval=10.0,    # Less frequent metrics for performance

    # Framework-specific optimizations
    framework_specific={
        "openai": {
            "stream_options": {
                "include_usage": True,
                "parallel_tool_calls": True,
            }
        },
        "anthropic": {
            "max_tokens": 8192,
            "stream": True,
        },
        "langchain": {
            "streaming": True,
            "chunk_size": 4096,
        },
    },
)
```
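`validation_interval=3` trades guardrail latency for throughput: instead of checking every chunk, the accumulated output is validated every third chunk, with one final check at stream end. A minimal consumer sketch of that policy, with a hypothetical `validate` callback standing in for the SDK's guardrail call:

```python
from typing import AsyncIterator, Callable


async def consume_with_interval_validation(
    chunks: AsyncIterator[str],
    validate: Callable[[str], bool],   # hypothetical guardrail hook
    validation_interval: int = 3,
) -> str:
    """Validate accumulated output every N chunks instead of on every chunk."""
    buffer = []
    count = 0
    async for chunk in chunks:
        buffer.append(chunk)
        count += 1
        if count % validation_interval == 0 and not validate("".join(buffer)):
            raise ValueError("Guardrail violation detected mid-stream")

    text = "".join(buffer)
    # Final check covers the tail chunks that did not land on an interval boundary
    if count % validation_interval != 0 and not validate(text):
        raise ValueError("Guardrail violation detected at stream end")
    return text
```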
Performance Best Practices
1. Configuration Checklist
```python
from typing import List

# High-performance production configuration checklist
PERFORMANCE_CONFIG_CHECKLIST = {
    "cache": {
        "l1_max_size": 50000,        # ✅ Large L1 cache
        "l2_enabled": True,          # ✅ Redis distributed cache
        "async_write_behind": True,  # ✅ Non-blocking writes
        "promotion_threshold": 1,    # ✅ Aggressive promotion
    },
    "framework": {
        "lazy_loading": True,        # ✅ On-demand loading
        "cache_size": 20000,         # ✅ Large detection cache
        "debug_mode": False,         # ✅ No debug overhead
    },
    "streaming": {
        "max_chunk_size": 4096,      # ✅ Large chunks
        "buffer_size": 50,           # ✅ Large buffer
        "validation_interval": 3,    # ✅ Reduced validation
    },
    "redis": {
        "max_connections": 200,      # ✅ High connection pool
        "socket_timeout": 1.0,       # ✅ Fast timeouts
        "enable_cluster": True,      # ✅ Distributed Redis
    },
    "analytics": {
        "batch_size": 200,           # ✅ Large batches
        "flush_interval": 10,        # ✅ Frequent flushes
        "keep_raw_data": False,      # ✅ Memory optimization
    },
}


def validate_performance_config(config: RizkConfig) -> List[str]:
    """Validate configuration against performance best practices."""
    issues = []

    if config.framework_detection_cache_size < 10000:
        issues.append("Framework cache size too small for high performance")

    if config.debug_mode:
        issues.append("Debug mode enabled - disable for production performance")

    if config.verbose:
        issues.append("Verbose logging enabled - disable for production performance")

    return issues
```
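Running the validator at startup keeps misconfigured instances from reaching production. A minimal usage sketch, assuming the `RizkConfig` fields shown earlier:

```python
config = RizkConfig(
    framework_detection_cache_size=20000,
    debug_mode=False,
    verbose=False,
)

issues = validate_performance_config(config)
if issues:
    for issue in issues:
        print(f"CONFIG WARNING: {issue}")
else:
    print("Configuration passes performance checks")
```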
2. Monitoring and Alerting
```python
class PerformanceMonitor:
    """Comprehensive performance monitoring."""

    def __init__(self):
        self.thresholds = {
            "cache_hit_rate": 80,      # Minimum 80% hit rate
            "avg_latency_ms": 50,      # Maximum 50ms latency
            "error_rate": 1,           # Maximum 1% error rate
            "memory_usage_mb": 2000,   # Maximum 2GB memory
            "redis_response_ms": 10,   # Maximum 10ms Redis response
        }

    def check_performance_health(self) -> Dict[str, Any]:
        """Check overall performance health."""
        health_status = {
            "overall": "healthy",
            "components": {},
            "alerts": [],
        }

        # Check cache performance
        cache_stats = cache_hierarchy.get_stats()
        if cache_stats["overall_hit_rate"] < self.thresholds["cache_hit_rate"]:
            health_status["overall"] = "degraded"
            health_status["alerts"].append({
                "component": "cache",
                "issue": (
                    f"Hit rate {cache_stats['overall_hit_rate']:.1f}% below "
                    f"threshold {self.thresholds['cache_hit_rate']}%"
                ),
            })

        return health_status


# Usage
monitor = PerformanceMonitor()


def check_system_health():
    health = monitor.check_performance_health()

    if health["overall"] == "healthy":
        print("✅ System performance: HEALTHY")
    else:
        print("⚠️ Performance issues detected:")
        for alert in health["alerts"]:
            print(f"  - {alert['component']}: {alert['issue']}")
```
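Health checks are only useful if they run continuously. One simple way to schedule them without an external scheduler is a daemon thread; this is a sketch, and in production you would more likely wire `check_system_health` into your existing metrics or alerting loop.

```python
import threading


def start_health_check_loop(interval_seconds: float = 30.0) -> threading.Event:
    """Run check_system_health() every `interval_seconds` until the event is set."""
    stop = threading.Event()

    def loop() -> None:
        # Event.wait() doubles as an interruptible sleep: returns True when stop is set
        while not stop.wait(interval_seconds):
            check_system_health()

    threading.Thread(target=loop, daemon=True).start()
    return stop


# Usage
stop_event = start_health_check_loop(30.0)
# ... later, on shutdown:
# stop_event.set()
```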
Next Steps
- Security Best Practices - Secure your high-performance setup
- Scaling Guide - Scale your optimized configuration
- Production Setup - Deploy your performance-tuned system
Performance Optimization Checklist
✅ Cache hierarchy configured for workload
✅ Redis connection pool optimized
✅ Framework detection cached
✅ Streaming configuration tuned
✅ Analytics batching optimized
✅ Performance monitoring enabled
✅ Benchmarks established
✅ Health checks implemented
Maximum performance for enterprise LLM governance