Development Best Practices¶
Overview¶
This document outlines development best practices for OVES Access Management, ensuring code quality, security, and maintainability.
Code Standards¶
Python Code Style¶
Follow PEP 8 with these specific guidelines:
# Good: Clear, descriptive names
def authenticate_user_with_azure_ad(username: str, password: str) -> AuthResult:
"""Authenticate user against Azure AD."""
pass
# Good: Type hints for better maintainability
from typing import Optional, Dict, Any
def get_user_permissions(user_id: str) -> Optional[Dict[str, Any]]:
"""Retrieve user permissions from database."""
pass
Validation Pattern (Preferred)¶
Use the Validator Array Pattern for clean, maintainable validation:
# Preferred approach - simplified validator array
def validate_user_registration(data: Dict[str, Any]) -> ValidationResult:
validators = [
{
'check': lambda: 'email' in data and '@' in data['email'],
'signal': 'INVALID_EMAIL'
},
{
'check': lambda: len(data.get('password', '')) >= 12,
'signal': 'PASSWORD_TOO_SHORT'
},
{
'check': lambda: data.get('username', '').isalnum(),
'signal': 'INVALID_USERNAME'
}
]
for validator in validators:
if not validator['check']():
return ValidationResult(False, validator['signal'])
return ValidationResult(True, 'VALID')
Documentation Standards¶
def process_access_request(request_data: Dict[str, Any]) -> AccessDecision:
"""
Process user access request following OVES security model.
This function implements the core access control logic, ensuring:
- Business owner authority is respected
- Single source of truth is maintained
- Personnel changes don't affect system continuity
Args:
request_data: Dictionary containing:
- user_id: Unique user identifier
- resource: Requested resource path
- action: Requested action (read/write/delete)
- context: Additional request context
Returns:
AccessDecision: Contains decision and reasoning
Raises:
ValidationError: If request_data is malformed
AuthorizationError: If user lacks permission
"""
pass
Security Best Practices¶
Input Validation¶
Always validate and sanitize input:
def sanitize_input(data: str) -> str:
"""Sanitize user input to prevent injection attacks."""
# Remove potentially dangerous characters
import re
return re.sub(r'[<>"\';\\]', '', data)
def validate_user_input(data: Dict[str, Any]) -> bool:
"""Validate user input using validator array pattern."""
validators = [
{
'check': lambda: all(key in data for key in ['username', 'email']),
'signal': 'MISSING_REQUIRED_FIELDS'
},
{
'check': lambda: len(data.get('username', '')) <= 50,
'signal': 'USERNAME_TOO_LONG'
}
]
for validator in validators:
if not validator['check']():
raise ValidationError(validator['signal'])
return True
Password Handling¶
import bcrypt
from secrets import token_urlsafe
def hash_password(password: str) -> str:
"""Hash password using bcrypt."""
salt = bcrypt.gensalt()
return bcrypt.hashpw(password.encode('utf-8'), salt).decode('utf-8')
def verify_password(password: str, hashed: str) -> bool:
"""Verify password against hash."""
return bcrypt.checkpw(password.encode('utf-8'), hashed.encode('utf-8'))
def generate_secure_token() -> str:
"""Generate cryptographically secure token."""
return token_urlsafe(32)
Database Security¶
# Good: Use parameterized queries
def get_user_by_id(user_id: str) -> Optional[User]:
query = "SELECT * FROM users WHERE id = %s"
cursor.execute(query, (user_id,))
return cursor.fetchone()
# Bad: SQL injection vulnerability
def get_user_by_id_bad(user_id: str) -> Optional[User]:
query = f"SELECT * FROM users WHERE id = '{user_id}'"
cursor.execute(query) # NEVER DO THIS
return cursor.fetchone()
Testing Standards¶
Unit Testing¶
import pytest
from unittest.mock import Mock, patch
class TestUserAuthentication:
"""Test user authentication functionality."""
def test_successful_authentication(self):
"""Test successful user authentication."""
# Arrange
user_service = UserService()
username = "test_user"
password = "secure_password"
# Act
result = user_service.authenticate(username, password)
# Assert
assert result.success is True
assert result.user_id is not None
@patch('azure_ad_client.authenticate')
def test_azure_ad_authentication(self, mock_azure_auth):
"""Test Azure AD authentication integration."""
# Arrange
mock_azure_auth.return_value = {"user_id": "123", "email": "test@example.com"}
auth_service = AuthenticationService()
# Act
result = auth_service.authenticate_with_azure_ad("test@example.com", "password")
# Assert
assert result.success is True
mock_azure_auth.assert_called_once()
Integration Testing¶
def test_end_to_end_user_registration():
"""Test complete user registration flow."""
# Test data
user_data = {
"username": "newuser",
"email": "newuser@example.com",
"password": "SecurePassword123!"
}
# Register user
response = client.post("/api/users/register", json=user_data)
assert response.status_code == 201
# Verify user was created
user = User.get_by_email(user_data["email"])
assert user is not None
assert user.username == user_data["username"]
Error Handling¶
Exception Hierarchy¶
class OVESException(Exception):
"""Base exception for OVES system."""
pass
class AuthenticationError(OVESException):
"""Authentication-related errors."""
pass
class AuthorizationError(OVESException):
"""Authorization-related errors."""
pass
class ValidationError(OVESException):
"""Input validation errors."""
def __init__(self, signal: str, details: str = ""):
self.signal = signal
self.details = details
super().__init__(f"{signal}: {details}")
Error Response Format¶
def handle_validation_error(error: ValidationError) -> Dict[str, Any]:
"""Handle validation errors consistently."""
return {
"error": {
"type": "validation_error",
"signal": error.signal,
"message": str(error),
"timestamp": datetime.utcnow().isoformat(),
"request_id": get_request_id()
}
}
Performance Guidelines¶
Database Optimization¶
# Good: Use database indexes and efficient queries
def get_user_permissions_optimized(user_id: str) -> List[Permission]:
"""Get user permissions with optimized query."""
query = """
SELECT p.resource, p.action
FROM permissions p
JOIN user_roles ur ON p.role_id = ur.role_id
WHERE ur.user_id = %s
AND p.active = true
"""
return execute_query(query, (user_id,))
# Good: Use connection pooling
from sqlalchemy.pool import QueuePool
engine = create_engine(
DATABASE_URL,
poolclass=QueuePool,
pool_size=10,
max_overflow=20
)
Caching Strategy¶
from functools import lru_cache
import redis
# In-memory caching for frequently accessed data
@lru_cache(maxsize=1000)
def get_user_roles(user_id: str) -> List[str]:
"""Get user roles with caching."""
return database.get_user_roles(user_id)
# Redis caching for session data
def cache_user_session(session_id: str, user_data: Dict[str, Any], ttl: int = 3600):
"""Cache user session data in Redis."""
redis_client.setex(session_id, ttl, json.dumps(user_data))
Documentation Standards¶
API Documentation¶
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel
class UserRegistrationRequest(BaseModel):
"""User registration request model."""
username: str
email: str
password: str
@router.post("/users/register", response_model=UserResponse)
async def register_user(request: UserRegistrationRequest):
"""
Register new user in OVES system.
This endpoint creates a new user account following OVES security standards:
- Validates input data
- Hashes password securely
- Creates audit log entry
- Maintains business owner authority
Args:
request: User registration data
Returns:
UserResponse: Created user information (without password)
Raises:
HTTPException: 400 if validation fails, 409 if user exists
"""
pass
Architecture Documentation¶
Follow PlantUML standards for diagrams:
@startuml
!include <C4/C4_Context.puml>
title OVES Authentication Flow
Person(User, "User", "System user")
System(OVES, "OVES Access Management", "Core access control system")
System_Ext(AzureAD, "Azure AD", "Identity provider")
Rel(User, OVES, "Authenticate", "HTTPS")
Rel(OVES, AzureAD, "Verify credentials", "OAUTH2")
note top of OVES : Single source of truth\nfor access decisions
@enduml
Deployment Practices¶
Environment Configuration¶
import os
from dataclasses import dataclass
@dataclass
class Config:
"""Application configuration."""
database_url: str
jwt_secret: str
azure_tenant_id: str
debug: bool = False
@classmethod
def from_environment(cls) -> 'Config':
"""Load configuration from environment variables."""
return cls(
database_url=os.getenv('DATABASE_URL', 'sqlite:///app.db'),
jwt_secret=os.getenv('JWT_SECRET', 'dev-secret'),
azure_tenant_id=os.getenv('AZURE_TENANT_ID', ''),
debug=os.getenv('DEBUG', 'false').lower() == 'true'
)
Health Checks¶
async def health_check() -> Dict[str, Any]:
"""Comprehensive system health check."""
health_validators = [
{
'check': lambda: database.is_connected(),
'signal': 'DATABASE_DOWN'
},
{
'check': lambda: azure_ad_client.is_reachable(),
'signal': 'AZURE_AD_UNREACHABLE'
}
]
for validator in health_validators:
if not validator['check']():
return {
"status": "unhealthy",
"error": validator['signal'],
"timestamp": datetime.utcnow().isoformat()
}
return {
"status": "healthy",
"timestamp": datetime.utcnow().isoformat()
}
Key Principles: - Business owner maintains ultimate authority - Single source of truth for all access decisions - System designed for personnel continuity - Security by design, not as an afterthought