GuardrailsEngine API Reference
The GuardrailsEngine is the core component for policy enforcement and evaluation in the Rizk SDK. It orchestrates multiple guardrail components to process messages, check outputs, and apply policies.
Class Overview
from rizk.sdk import Rizk
from rizk.sdk.guardrails.engine import GuardrailsEngine

# Get guardrails engine instance
guardrails = Rizk.get_guardrails()

# Process a message
result = await guardrails.process_message("Hello world")
print(f"Allowed: {result['allowed']}")
Class Structure
class GuardrailsEngine:
    """Main engine for Rizk guardrails policy enforcement."""

    fast_rules: Optional[FastRulesEngine]
    policy_augmentation: Optional[PolicyAugmentation]
    llm_fallback: Optional[LLMFallback]
    state_manager: Optional[StateManager]
    tracer: Optional[Tracer]
Initialization
GuardrailsEngine.get_instance()
Static method to get or create the singleton GuardrailsEngine instance.
@classmethod
def get_instance(
    cls,
    config: Optional[Dict[str, Any]] = None
) -> "GuardrailsEngine"
Parameters
Parameter | Type | Default | Description |
---|---|---|---|
config | Optional[Dict[str, Any]] | None | Configuration dictionary for initialization |
Configuration Keys
Key | Type | Default | Description |
---|---|---|---|
policies_path | str | Auto-detected | Path to policy files directory |
llm_service | LLMServiceProtocol | DefaultLLMService() | LLM service for policy evaluation |
state_ttl_seconds | int | 3600 | TTL for conversation state |
state_cleanup_interval | int | 300 | State cleanup interval in seconds |
llm_cache_size | int | 1000 | LLM fallback cache size |
Returns
GuardrailsEngine: The singleton guardrails engine instance
Example
# Basic initialization
guardrails = GuardrailsEngine.get_instance()

# With custom configuration
config = {
    "policies_path": "/app/custom-policies",
    "state_ttl_seconds": 7200,
    "llm_cache_size": 5000
}
guardrails = GuardrailsEngine.get_instance(config)
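Because get_instance() manages a singleton, repeated calls return the same engine object; a quick sanity check:

engine_a = GuardrailsEngine.get_instance()
engine_b = GuardrailsEngine.get_instance()
assert engine_a is engine_b  # both names point to the same singleton instance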
GuardrailsEngine.lazy_initialize()
Static method to initialize components if not already initialized.
@classmethod
def lazy_initialize(cls) -> None
Example
# Get instance and ensure initialization
guardrails = GuardrailsEngine.get_instance()
GuardrailsEngine.lazy_initialize()
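Once lazy_initialize() has run, the component attributes listed under Class Structure should be populated. A quick check, for illustration only:

# Inspect which guardrail components are available on the initialized engine
for name in ("fast_rules", "policy_augmentation", "llm_fallback", "state_manager", "tracer"):
    component = getattr(guardrails, name, None)
    print(f"{name}: {'ready' if component is not None else 'not initialized'}")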
Core Methods
process_message()
Process a user message using the configured guardrail decision logic.
async def process_message(
    self,
    message: str,
    context: Optional[Dict[str, Any]] = None
) -> GuardrailProcessingResult
Parameters
Parameter | Type | Default | Description |
---|---|---|---|
message | str | - | The user message to evaluate |
context | Optional[Dict[str, Any]] | None | Optional context dictionary |
Context Keys
Key | Type | Description |
---|---|---|
conversation_id | str | Conversation identifier |
organization_id | str | Organization identifier |
project_id | str | Project identifier |
user_id | str | User identifier |
policy_ids | List[str] | Specific policies to apply |
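For example, a context that scopes evaluation to specific policies via policy_ids might look like the sketch below (the policy IDs are illustrative, not real policy names):

context = {
    "conversation_id": "conv_123",
    "user_id": "user_456",
    "policy_ids": ["pii_protection", "financial_advice"],  # illustrative IDs
}
result = await guardrails.process_message("Which stocks should I buy?", context)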
Returns
GuardrailProcessingResult: Dictionary with evaluation results
GuardrailProcessingResult Structure
Field | Type | Description |
---|---|---|
allowed | bool | Whether the message is allowed |
confidence | float | Confidence score (0.0-1.0) |
decision_layer | str | Layer that made the decision |
violated_policies | Optional[List[str]] | IDs of violated policies |
applied_policies | Optional[List[str]] | IDs of applied policies |
response | Optional[str] | Alternative response if blocked |
blocked_reason | Optional[str] | Reason for blocking |
error | Optional[str] | Error description if occurred |
Examples
Basic message processing:
result = await guardrails.process_message("Can you help me with my account?")print(f"Allowed: {result['allowed']}")print(f"Confidence: {result['confidence']}")print(f"Decision layer: {result['decision_layer']}")
With context:
context = { "conversation_id": "conv_123", "organization_id": "acme_corp", "project_id": "customer_service", "user_id": "user_456"}
result = await guardrails.process_message( "I need help accessing sensitive data", context)
if not result['allowed']: print(f"Blocked: {result['blocked_reason']}") print(f"Violated policies: {result['violated_policies']}")
Error handling:
try:
    result = await guardrails.process_message(message, context)
    if result['error']:
        print(f"Guardrails error: {result['error']}")
    elif result['allowed']:
        # Process the allowed message
        response = process_user_message(message)
    else:
        # Handle blocked message
        response = result.get('response', 'Request blocked by policy')
except Exception as e:
    print(f"Unexpected error: {e}")
    # Implement fallback behavior
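The confidence, decision_layer, and applied_policies fields are also useful for audit logging when a message is allowed. A minimal sketch, assuming a standard logging setup:

import logging

logger = logging.getLogger("guardrails.audit")

result = await guardrails.process_message(message, context)
if result['allowed']:
    # Record how the decision was made and which policies were applied
    logger.info(
        "Message allowed (layer=%s, confidence=%.2f, applied_policies=%s)",
        result['decision_layer'],
        result['confidence'],
        result.get('applied_policies') or [],
    )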
check_output()
Evaluate an AI-generated response using guardrail policies.
async def check_output(
    self,
    ai_response: str,
    context: Optional[Dict[str, Any]] = None
) -> GuardrailOutputCheckResult
Parameters
Parameter | Type | Default | Description |
---|---|---|---|
ai_response | str | - | The AI-generated response to evaluate |
context | Optional[Dict[str, Any]] | None | Optional context dictionary |
Returns
GuardrailOutputCheckResult: Dictionary with output evaluation results
GuardrailOutputCheckResult Structure
Field | Type | Description |
---|---|---|
allowed | bool | Whether the output is allowed |
confidence | float | Confidence score (0.0-1.0) |
decision_layer | str | Layer that made the decision |
violated_policies | Optional[List[str]] | IDs of violated policies |
blocked_reason | Optional[str] | Reason for blocking |
transformed_response | Optional[str] | Redacted/modified response |
error | Optional[str] | Error description if occurred |
Examples
Basic output checking:
ai_response = "Here's your account balance: $1,234.56"result = await guardrails.check_output(ai_response)
if result['allowed']: print("Output approved for delivery")else: print(f"Output blocked: {result['blocked_reason']}") if result['transformed_response']: print(f"Suggested alternative: {result['transformed_response']}")
With conversation context:
context = { "conversation_id": "conv_123", "organization_id": "finance_org", "user_id": "user_456"}
result = await guardrails.check_output( "Your SSN is 123-45-6789", context)
if not result['allowed']: # Use transformed response if available safe_response = result.get( 'transformed_response', 'Information has been redacted for privacy' ) print(f"Safe response: {safe_response}")
evaluate()
Simplified evaluation interface for framework adapters.
async def evaluate(
    self,
    input_text: str,
    context: Optional[Dict[str, Any]] = None,
    direction: str = "inbound"
) -> Decision
Parameters
Parameter | Type | Default | Description |
---|---|---|---|
input_text | str | - | Text to evaluate against policies |
context | Optional[Dict[str, Any]] | None | Context information |
direction | str | "inbound" | Evaluation direction: "inbound" or "outbound" |
Returns
Decision: Simple decision object
Decision Structure
Field | Type | Description |
---|---|---|
allowed | bool | Whether the input is allowed |
reason | Optional[str] | Reason for blocking if not allowed |
policy_id | Optional[str] | ID of the policy that triggered |
confidence | Optional[float] | Confidence score |
Examples
Inbound evaluation:
decision = await guardrails.evaluate(
    "Can you show me user passwords?",
    context={"user_id": "admin_001"},
    direction="inbound"
)

if not decision.allowed:
    print(f"Request blocked: {decision.reason}")
    print(f"Policy: {decision.policy_id}")
Outbound evaluation:
decision = await guardrails.evaluate(
    "Password: secret123",
    direction="outbound"
)

if not decision.allowed:
    print("Output contains sensitive information")
Utility Methods
augment_system_prompt()
Augment a system prompt with policy guidelines.
def augment_system_prompt(
    self,
    original_prompt: str,
    message: str
) -> str
Parameters
Parameter | Type | Description |
---|---|---|
original_prompt | str | The original system prompt |
message | str | User message for context |
Returns
str: Augmented system prompt with guidelines
Example
original = "You are a helpful assistant."user_message = "Help me with financial advice"
augmented = guardrails.augment_system_prompt(original, user_message)print(augmented)# Output: "You are a helpful assistant.\n\nIMPORTANT GUIDELINES:\n- Never provide specific investment advice..."
Context Management
set_current_guidelines()
Set guidelines in the current OpenTelemetry context.
@classmethod
def set_current_guidelines(cls, guidelines: List[str]) -> None
Example
guidelines = [ "Always verify user identity before sharing account information", "Never provide passwords or sensitive authentication details"]GuardrailsEngine.set_current_guidelines(guidelines)
clear_current_guidelines()
Clear guidelines from the current context.
@classmethod
def clear_current_guidelines(cls) -> None
Example
# Clear guidelines after processing
GuardrailsEngine.clear_current_guidelines()
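In practice the two calls are usually paired with try/finally so guidelines never leak across requests. A minimal sketch; generate_response is a hypothetical application helper:

user_message = "What is my current balance?"
guidelines = [
    "Always verify user identity before sharing account information"
]

GuardrailsEngine.set_current_guidelines(guidelines)
try:
    # Run whatever processing should see the guidelines (hypothetical helper)
    response = await generate_response(user_message)
finally:
    # Clear even if processing raises, so guidelines do not leak into other requests
    GuardrailsEngine.clear_current_guidelines()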
Error Handling
The GuardrailsEngine uses the @handle_errors decorator for robust error handling.
Error Response Structure
When errors occur, methods return structured error responses:
# For process_message errors
error_result = {
    "allowed": False,
    "confidence": 1.0,
    "decision_layer": "error",
    "error": "Guardrails processing failed",
    "violated_policies": [],
    "blocked_reason": "Guardrails internal error"
}

# For check_output errors
error_result = {
    "allowed": False,
    "confidence": 1.0,
    "decision_layer": "error",
    "error": "Guardrails output check failed",
    "violated_policies": [],
    "blocked_reason": "Guardrails internal error"
}
Handling Errors
result = await guardrails.process_message(message)

if result.get('error'):
    # Handle guardrails system error
    print(f"System error: {result['error']}")
    # Implement fallback logic
    fallback_result = handle_without_guardrails(message)
elif not result['allowed']:
    # Handle policy violation
    print(f"Policy violation: {result['blocked_reason']}")
    response = result.get('response', 'Request blocked')
else:
    # Message approved
    response = process_approved_message(message)
Thread Safety
The GuardrailsEngine implements thread-safe patterns:
- Singleton Pattern: Thread-safe instance creation with double-checked locking
- Lazy Initialization: Thread-safe component initialization using threading events
- State Management: Thread-safe conversation state with reentrant locks
import threading
import asyncio

# Safe to use across multiple threads
async def worker(worker_id: int):
    guardrails = GuardrailsEngine.get_instance()

    for i in range(10):
        message = f"Message {i} from worker {worker_id}"
        result = await guardrails.process_message(message)
        print(f"Worker {worker_id}: {result['allowed']}")

# Run multiple workers
async def main():
    tasks = [worker(i) for i in range(5)]
    await asyncio.gather(*tasks)
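The double-checked locking mentioned above follows a well-known pattern, sketched below for illustration (this is not the SDK's actual source):

import threading

class DoubleCheckedSingleton:
    _instance = None
    _lock = threading.Lock()

    @classmethod
    def get_instance(cls):
        # Fast path: no lock needed once the instance exists
        if cls._instance is None:
            with cls._lock:
                # Re-check inside the lock so only one thread creates the instance
                if cls._instance is None:
                    cls._instance = cls()
        return cls._instance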
Performance Considerations
Caching
The GuardrailsEngine implements intelligent caching:
- LLM Cache: Caches LLM evaluation results (configurable size)
- Policy Cache: Caches policy matching results
- Framework Detection Cache: Caches framework detection results
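One rough way to observe the caching is to time two identical evaluations; when the LLM fallback layer is involved, the second call is typically faster because the result is served from cache (actual timings depend on your environment and policies):

import time

async def timed_call(text: str) -> float:
    start = time.perf_counter()
    await guardrails.process_message(text)
    return time.perf_counter() - start

first = await timed_call("Can I share customer emails externally?")
second = await timed_call("Can I share customer emails externally?")
print(f"First call: {first:.3f}s, second call: {second:.3f}s")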
Batch Processing
For high-throughput scenarios:
# Process multiple messages
messages = ["message1", "message2", "message3"]
context = {"conversation_id": "conv_123"}

results = await asyncio.gather(*[
    guardrails.process_message(msg, context)
    for msg in messages
])

for i, result in enumerate(results):
    print(f"Message {i}: {result['allowed']}")
Performance Monitoring
import time

# Monitor processing time
start_time = time.time()
result = await guardrails.process_message(message)
processing_time = time.time() - start_time

print(f"Processing took {processing_time:.3f} seconds")
print(f"Decision layer: {result['decision_layer']}")
Integration Examples
With Framework Adapters
# Framework adapter integration
class MyFrameworkAdapter:
    def __init__(self):
        self.guardrails = GuardrailsEngine.get_instance()

    async def apply_input_guardrails(self, user_input: str, context: dict):
        result = await self.guardrails.process_message(user_input, context)

        if not result['allowed']:
            raise PolicyViolationError(result['blocked_reason'])

        return result.get('response', user_input)

    async def apply_output_guardrails(self, ai_output: str, context: dict):
        result = await self.guardrails.check_output(ai_output, context)

        if not result['allowed']:
            return result.get(
                'transformed_response',
                'Response blocked by policy'
            )

        return ai_output
With Custom LLM Services
from rizk.sdk.guardrails.llm_service import LLMServiceProtocol

class CustomLLMService(LLMServiceProtocol):
    async def evaluate_policy(self, message: str, policy: dict) -> dict:
        # Custom LLM evaluation logic
        return {
            "allowed": True,
            "confidence": 0.9,
            "reasoning": "Content appears safe"
        }

# Use custom LLM service
config = {"llm_service": CustomLLMService()}
guardrails = GuardrailsEngine.get_instance(config)
Best Practices
1. Context Management
Always provide context for better policy evaluation:
context = { "conversation_id": conversation_id, "organization_id": "my_org", "project_id": "my_project", "user_id": user_id, "user_role": user_role}
result = await guardrails.process_message(message, context)
2. Error Handling
Implement comprehensive error handling:
async def safe_process_message(message: str, context: dict) -> str:
    try:
        result = await guardrails.process_message(message, context)

        if result.get('error'):
            # Log system error and use fallback
            logger.error(f"Guardrails error: {result['error']}")
            return fallback_process(message)

        if not result['allowed']:
            # Handle policy violation
            return result.get('response', 'Request blocked')

        # Process approved message
        return process_message(message)

    except Exception as e:
        logger.error(f"Unexpected error: {e}")
        return "I'm sorry, I cannot process that request right now."
3. Performance Optimization
Monitor and optimize performance:
# Use async/await properly
async def batch_process(messages: List[str], context: dict):
    # Process in parallel for better performance
    tasks = [
        guardrails.process_message(msg, context)
        for msg in messages
    ]
    results = await asyncio.gather(*tasks, return_exceptions=True)

    return [
        r if not isinstance(r, Exception) else {"allowed": False, "error": str(r)}
        for r in results
    ]
4. Resource Management
Properly manage resources in long-running applications:
# Initialize once, use many times
guardrails = GuardrailsEngine.get_instance({
    "llm_cache_size": 10000,    # Larger cache for production
    "state_ttl_seconds": 7200   # 2-hour conversation TTL
})

# Use throughout application lifecycle
async def handle_request(message: str, context: dict):
    result = await guardrails.process_message(message, context)
    return process_result(result)
Related APIs
- Rizk Class API - SDK initialization and configuration
- Decorators API - Function and class decorators
- Types API - Type definitions and protocols
- Configuration API - Configuration management