# Observability
Rizk SDK provides comprehensive observability for LLM applications through OpenTelemetry integration, distributed tracing, and hierarchical context management. This document explains how the observability system works and how to leverage it effectively.
## Overview
The observability system automatically instruments your LLM applications with:
- Distributed Tracing: Track requests across components and services
- Hierarchical Context: Organize traces by organization, project, and agent
- Performance Metrics: Monitor latency, throughput, and error rates
- Custom Attributes: Add business-specific metadata to traces
- Framework Integration: Native support for all major LLM frameworks
```python
from rizk.sdk import Rizk
from rizk.sdk.decorators import workflow

# Initialize observability
rizk = Rizk.init(
    app_name="MyLLMApp",
    api_key="your-api-key",
    enabled=True
)

@workflow(name="customer_support", organization_id="acme", project_id="support")
def handle_customer_query(query: str) -> str:
    # Automatically traced with hierarchical context
    return process_query(query)
```
## OpenTelemetry Integration

### Automatic Instrumentation

Rizk SDK integrates with OpenTelemetry through the Traceloop SDK, providing automatic instrumentation:

```python
# Automatic instrumentation includes:
# - HTTP requests and responses
# - Database queries
# - LLM API calls (OpenAI, Anthropic, etc.)
# - Framework-specific operations
# - Custom application logic
```
### Trace Structure

Each trace follows a hierarchical structure:

```
Trace: customer_support_workflow
├── Span: input_validation
├── Span: llm_processing
│   ├── Span: openai_chat_completion
│   └── Span: response_formatting
├── Span: output_guardrails
└── Span: result_logging
```
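The nesting above comes from span parentage: a span opened while another is active becomes its child. A stdlib-only toy sketch of that bookkeeping (illustrative only — `ToyTracer` is not the Rizk or OpenTelemetry API):

```python
from contextlib import contextmanager

class ToyTracer:
    """Records each span's name and nesting depth, mimicking a trace tree."""
    def __init__(self):
        self.depth = 0
        self.tree = []  # (depth, name) pairs in start order

    @contextmanager
    def span(self, name):
        self.tree.append((self.depth, name))
        self.depth += 1
        try:
            yield
        finally:
            self.depth -= 1

tracer = ToyTracer()
with tracer.span("customer_support_workflow"):
    with tracer.span("input_validation"):
        pass
    with tracer.span("llm_processing"):
        with tracer.span("openai_chat_completion"):
            pass

for depth, name in tracer.tree:
    print("  " * depth + name)  # indentation mirrors the tree shown above
```

In real instrumentation the SDK does this via OpenTelemetry context propagation, so decorated functions that call each other produce the same parent/child structure automatically.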
### Span Attributes

Spans are enriched with contextual attributes:

```python
# Automatic attributes added to spans:
span_attributes = {
    # Hierarchical context
    "organization.id": "acme_corp",
    "project.id": "customer_support",
    "agent.id": "support_assistant",
    "conversation.id": "conv_12345",
    "user.id": "user_6789",

    # Framework information
    "framework.name": "langchain",
    "framework.version": "0.1.0",

    # Function metadata
    "function.name": "handle_customer_query",
    "function.version": 1,

    # Performance metrics
    "duration.ms": 1250,
    "tokens.input": 150,
    "tokens.output": 75,
    "cost.usd": 0.0023
}
```
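Attributes like `cost.usd` are typically derived from the token counts on the same span. A minimal sketch, assuming hypothetical per-1K-token prices (`price_in_per_1k` and `price_out_per_1k` are placeholders; real prices depend on your provider and model):

```python
def estimate_cost_usd(input_tokens: int, output_tokens: int,
                      price_in_per_1k: float = 0.01,
                      price_out_per_1k: float = 0.02) -> float:
    """Estimate an LLM call's cost from token counts (placeholder prices)."""
    return (input_tokens / 1000) * price_in_per_1k \
         + (output_tokens / 1000) * price_out_per_1k

# Token counts from the span attributes above
cost = estimate_cost_usd(150, 75)  # 0.0015 + 0.0015 = 0.003
```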
## Hierarchical Context Management

### Context Levels

Rizk SDK supports multiple levels of hierarchical context:

```python
# Level 1: Organization
organization_id = "acme_corp"

# Level 2: Project
project_id = "customer_support"

# Level 3: Agent/Service
agent_id = "support_assistant"

# Level 4: Conversation
conversation_id = "conv_12345"

# Level 5: User
user_id = "user_6789"
```
### Context Propagation

Context is automatically propagated through your application:

```python
from rizk.sdk.decorators import workflow, task, agent

@workflow(
    name="support_workflow",
    organization_id="acme_corp",
    project_id="customer_support"
)
def handle_support_request(request: dict) -> dict:
    """Top-level workflow with context."""

    @agent(
        name="support_agent",
        organization_id="acme_corp",    # Inherited
        project_id="customer_support",  # Inherited
        agent_id="support_assistant"
    )
    def create_support_agent():
        """Agent inherits workflow context."""
        return {"agent": "support_assistant"}

    @task(
        name="process_request",
        organization_id="acme_corp",    # Inherited
        project_id="customer_support",  # Inherited
        task_id="request_processing"
    )
    def process_request(data: dict):
        """Task inherits workflow context."""
        return {"processed": True}

    agent = create_support_agent()
    result = process_request(request)
    return result
```
### Dynamic Context

Context can be set dynamically during execution:

```python
import time

from rizk.sdk import Rizk
from rizk.sdk.decorators import workflow

def handle_user_request(user_id: str, request: str):
    """Handle request with dynamic user context."""

    # Set dynamic context
    Rizk.set_association_properties({
        "user.id": user_id,
        "conversation.id": f"conv_{user_id}_{int(time.time())}",
        "request.type": "support_inquiry"
    })

    @workflow(name="user_request_workflow")
    def process_user_request():
        # Context automatically included in traces
        return f"Processed request for user {user_id}"

    return process_user_request()
```
## Tracing Configuration

### Basic Configuration

```python
from rizk.sdk import Rizk

# Basic tracing setup
rizk = Rizk.init(
    app_name="MyApp",
    api_key="your-api-key",
    enabled=True,             # Enable tracing
    telemetry_enabled=False,  # Disable anonymous telemetry
    trace_content=True        # Include content in traces
)
```
### Custom OTLP Endpoints

```python
# Send traces to a custom endpoint
rizk = Rizk.init(
    app_name="MyApp",
    api_key="your-api-key",
    opentelemetry_endpoint="https://your-otlp-endpoint.com",
    headers={
        "Authorization": "Bearer your-token",
        "X-Custom-Header": "custom-value"
    }
)
```
### Advanced Configuration

```python
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter

# Custom exporter and processor
custom_exporter = OTLPSpanExporter(
    endpoint="https://your-endpoint.com",
    headers={"authorization": "Bearer token"}
)

custom_processor = BatchSpanProcessor(
    span_exporter=custom_exporter,
    max_queue_size=2048,
    max_export_batch_size=512,
    export_timeout_millis=30000
)

rizk = Rizk.init(
    app_name="MyApp",
    api_key="your-api-key",
    processor=custom_processor,
    exporter=custom_exporter
)
```
## Framework-Specific Observability

### OpenAI Agents

```python
from rizk.sdk.decorators import workflow, agent, tool

@tool(name="search_tool", organization_id="demo", project_id="agents")
def search_web(query: str) -> str:
    """Search tool with automatic tracing."""
    # Tool usage automatically traced
    return f"Search results for: {query}"

@agent(name="research_agent", organization_id="demo", project_id="agents")
def create_research_agent():
    """Agent creation with tracing."""
    # Agent lifecycle automatically traced
    return {"agent": "researcher", "tools": [search_web]}

@workflow(name="research_workflow", organization_id="demo", project_id="agents")
def run_research(topic: str):
    """Complete workflow tracing."""
    # Full workflow execution traced
    agent = create_research_agent()
    return f"Research completed on: {topic}"
```
### LangChain Integration

```python
from rizk.sdk import Rizk
from rizk.sdk.decorators import workflow, agent
from langchain.callbacks import get_openai_callback

@workflow(name="langchain_workflow", organization_id="demo", project_id="langchain")
def run_langchain_process(query: str):
    """LangChain process with enhanced tracing."""

    # Automatic callback integration
    with get_openai_callback() as cb:
        # LangChain operations automatically traced
        # Token usage and costs captured
        result = process_with_langchain(query)

        # Cost information added to trace
        Rizk.set_association_properties({
            "tokens.total": cb.total_tokens,
            "tokens.prompt": cb.prompt_tokens,
            "tokens.completion": cb.completion_tokens,
            "cost.total_usd": cb.total_cost
        })

    return result

def process_with_langchain(query: str):
    """Simulate LangChain processing."""
    return f"LangChain processed: {query}"
```
### CrewAI Integration

```python
from rizk.sdk.decorators import crew, agent, task

@agent(name="writer", organization_id="demo", project_id="crewai")
def create_writer():
    """Writer agent with tracing."""
    return {"role": "writer", "goal": "create content"}

@task(name="writing_task", organization_id="demo", project_id="crewai")
def create_writing_task():
    """Writing task with tracing."""
    return {"task": "write article", "agent": "writer"}

@crew(name="content_crew", organization_id="demo", project_id="crewai")
def run_content_crew(topic: str):
    """Crew execution with comprehensive tracing."""
    writer = create_writer()
    task = create_writing_task()

    # Crew execution automatically traced
    # Individual agent and task performance captured
    return f"Content crew completed work on: {topic}"
```
## Custom Metrics and Attributes

### Adding Custom Attributes

```python
from opentelemetry import trace

from rizk.sdk import Rizk
from rizk.sdk.decorators import workflow

@workflow(name="custom_attributes_workflow")
def process_with_custom_attributes(user_data: dict):
    """Add custom attributes to traces."""

    # Get the current span
    current_span = trace.get_current_span()

    # Add custom attributes
    current_span.set_attribute("user.tier", user_data.get("tier", "standard"))
    current_span.set_attribute("request.priority", "high")
    current_span.set_attribute("feature.flags", "new_ui,beta_features")

    # Or use Rizk's association properties
    Rizk.set_association_properties({
        "business.unit": "customer_success",
        "geo.region": "us-west-2",
        "experiment.variant": "control"
    })

    return {"processed": True}
```
### Performance Metrics

```python
import time

from rizk.sdk import Rizk
from rizk.sdk.decorators import workflow

@workflow(name="performance_monitoring")
def monitor_performance():
    """Monitor custom performance metrics."""
    start_time = time.time()

    # Simulate processing
    time.sleep(0.1)

    processing_time = time.time() - start_time

    # Add performance metrics
    Rizk.set_association_properties({
        "performance.processing_time_ms": processing_time * 1000,
        "performance.memory_usage_mb": 45.2,
        "performance.cpu_usage_percent": 23.5
    })

    return {"status": "completed"}
```
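The memory figure above is hard-coded for illustration. If you want measured values, the standard library can supply timing and peak allocation: a sketch using `time.perf_counter` and `tracemalloc`, where the resulting `measured` dict is the kind of payload you would pass to `Rizk.set_association_properties`:

```python
import time
import tracemalloc

tracemalloc.start()
start = time.perf_counter()

# Simulated work whose cost we want to measure
data = [i * i for i in range(100_000)]

elapsed_ms = (time.perf_counter() - start) * 1000
_, peak_bytes = tracemalloc.get_traced_memory()
tracemalloc.stop()

# Measured values to attach to the trace
measured = {
    "performance.processing_time_ms": elapsed_ms,
    "performance.peak_memory_mb": peak_bytes / (1024 * 1024),
}
```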
## Error Tracking and Debugging

### Automatic Error Capture

```python
from opentelemetry import trace

from rizk.sdk.decorators import workflow

@workflow(name="error_handling_workflow")
def process_with_error_handling(data: dict):
    """Automatic error capture and tracing."""
    try:
        # Simulate processing that might fail
        if data.get("invalid"):
            raise ValueError("Invalid data provided")

        return {"processed": True}

    except Exception as e:
        # Errors automatically captured in traces
        # Stack traces and error details included
        current_span = trace.get_current_span()
        current_span.set_status(trace.Status(trace.StatusCode.ERROR, str(e)))
        current_span.set_attribute("error.type", type(e).__name__)
        current_span.set_attribute("error.message", str(e))

        # Re-raise or handle as needed
        raise
```
### Debug Tracing

```python
import logging

from rizk.sdk.decorators import workflow

# Enable debug logging
logging.getLogger("rizk").setLevel(logging.DEBUG)
logging.getLogger("opentelemetry").setLevel(logging.DEBUG)

@workflow(name="debug_workflow")
def debug_process():
    """Process with debug-level tracing."""
    # Detailed trace information logged
    # Span creation and completion logged
    # Attribute setting logged
    return {"debug": "enabled"}
```
## Observability Best Practices

### 1. Meaningful Span Names

```python
# ✅ Good - Descriptive, consistent naming
@workflow(name="customer_onboarding_v2")
@task(name="validate_customer_email")
@agent(name="onboarding_assistant")

# ❌ Avoid - Generic or unclear names
@workflow(name="process")
@task(name="step1")
@agent(name="bot")
```
### 2. Appropriate Context Granularity

```python
# ✅ Good - Balanced context depth
@workflow(
    name="order_processing",
    organization_id="ecommerce",
    project_id="orders"
)
def process_order():

    @task(
        name="payment_validation",
        task_id="payment_check"
    )
    def validate_payment():
        pass

# ❌ Avoid - Too much or too little context
@workflow(name="order_processing")  # Missing context
def process_order():

    @task(
        name="payment_validation",
        organization_id="ecommerce",
        project_id="orders",
        service_id="payments",
        region_id="us-west",
        datacenter_id="dc1"  # Too granular
    )
    def validate_payment():
        pass
```
### 3. Sensitive Data Handling

```python
from rizk.sdk import Rizk
from rizk.sdk.decorators import workflow

# ✅ Good - Exclude sensitive data
@workflow(name="user_authentication")
def authenticate_user(username: str, password: str):
    """Authenticate user without logging sensitive data."""

    # Don't include the password in traces
    Rizk.set_association_properties({
        "user.username": username,
        "auth.method": "password",
        # "user.password": password  # ❌ Never do this
    })

    return {"authenticated": True}

# ✅ Good - Hash or mask sensitive data
def process_payment(card_number: str):
    """Process payment with masked card data."""

    masked_card = f"****-****-****-{card_number[-4:]}"

    Rizk.set_association_properties({
        "payment.card_last_four": card_number[-4:],
        "payment.card_masked": masked_card,
        # "payment.card_number": card_number  # ❌ Never do this
    })
```
### 4. Performance Optimization

```python
# ✅ Good - Conditional detailed tracing
import os

from rizk.sdk import Rizk
from rizk.sdk.decorators import workflow

@workflow(name="performance_optimized")
def optimized_process():
    """Optimize tracing for performance."""

    # Detailed tracing only in development
    if os.getenv("ENVIRONMENT") == "development":
        Rizk.set_association_properties({
            "debug.detailed_timing": True,
            "debug.memory_tracking": True
        })

    # Always include essential metrics
    Rizk.set_association_properties({
        "request.id": "req_12345",
        "user.tier": "premium"
    })
```
## Monitoring and Alerting

### Key Metrics to Monitor

```python
# Essential observability metrics:
metrics_to_monitor = {
    "request_latency": "95th percentile response time",
    "error_rate": "Percentage of failed requests",
    "throughput": "Requests per second",
    "token_usage": "LLM token consumption",
    "cost_per_request": "Average cost per request",
    "guardrails_violations": "Policy violation rate",
    "framework_errors": "Framework-specific errors"
}
```
### Custom Dashboards

```python
# Dashboard queries (PromQL) for common metrics:
dashboard_queries = {
    "latency_p95": "histogram_quantile(0.95, rate(request_duration_seconds_bucket[5m]))",
    "error_rate": "rate(request_errors_total[5m]) / rate(requests_total[5m])",
    "cost_trend": "rate(llm_cost_usd_total[1h])",
    "top_users": "topk(10, sum by (user_id) (rate(requests_total[24h])))",
    "framework_distribution": "sum by (framework_name) (rate(requests_total[24h]))"
}
```
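For intuition about what the `latency_p95` query computes: the 95th percentile is the smallest value with at least 95% of samples at or below it. Computed exactly over raw latency samples (nearest-rank method) it looks like this in plain Python:

```python
import math

def percentile(samples: list, p: float) -> float:
    """Nearest-rank percentile: smallest sample with at least p% of values at or below it."""
    if not samples:
        raise ValueError("no samples")
    ordered = sorted(samples)
    rank = max(1, math.ceil(len(ordered) * p / 100))
    return ordered[rank - 1]

latencies_ms = [120, 85, 430, 95, 250, 110, 90, 380, 105, 1250]
p95 = percentile(latencies_ms, 95)  # the single slowest of 10 samples: 1250
```

Prometheus's `histogram_quantile` interpolates within bucket boundaries, so it returns an approximation rather than an exact sample value.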
## Integration with Observability Platforms

### Popular Platforms

```python
# Jaeger
rizk = Rizk.init(
    app_name="MyApp",
    opentelemetry_endpoint="http://jaeger:14268/api/traces"
)

# Datadog
rizk = Rizk.init(
    app_name="MyApp",
    opentelemetry_endpoint="https://trace.agent.datadoghq.com",
    headers={"DD-API-KEY": "your-dd-api-key"}
)

# New Relic
rizk = Rizk.init(
    app_name="MyApp",
    opentelemetry_endpoint="https://otlp.nr-data.net:4317",
    headers={"api-key": "your-nr-license-key"}
)

# Honeycomb
rizk = Rizk.init(
    app_name="MyApp",
    opentelemetry_endpoint="https://api.honeycomb.io",
    headers={"x-honeycomb-team": "your-honeycomb-key"}
)
```
## Summary

Rizk SDK's observability system provides:

- ✅ Comprehensive Tracing - Full request lifecycle visibility
- ✅ Hierarchical Context - Enterprise-grade organization
- ✅ Framework Integration - Native support for all major LLM frameworks
- ✅ Performance Monitoring - Latency, cost, and usage metrics
- ✅ Error Tracking - Automatic error capture and debugging
- ✅ Custom Attributes - Business-specific metadata support
- ✅ Platform Integration - Works with all major observability platforms

The observability system gives you complete visibility into your LLM applications, enabling proactive monitoring, debugging, and optimization at enterprise scale.