Skip to content

Development Best Practices

Overview

This document outlines development best practices for OVES Access Management, ensuring code quality, security, and maintainability.

Code Standards

Python Code Style

Follow PEP 8 with these specific guidelines:

# Good: Clear, descriptive names
def authenticate_user_with_azure_ad(username: str, password: str) -> AuthResult:
    """Authenticate user against Azure AD."""
    pass

# Good: Type hints for better maintainability
from typing import Optional, Dict, Any

def get_user_permissions(user_id: str) -> Optional[Dict[str, Any]]:
    """Retrieve user permissions from database."""
    pass

Validation Pattern (Preferred)

Use the Validator Array Pattern for clean, maintainable validation:

# Preferred approach - simplified validator array
def validate_user_registration(data: Dict[str, Any]) -> ValidationResult:
    validators = [
        {
            'check': lambda: 'email' in data and '@' in data['email'],
            'signal': 'INVALID_EMAIL'
        },
        {
            'check': lambda: len(data.get('password', '')) >= 12,
            'signal': 'PASSWORD_TOO_SHORT'
        },
        {
            'check': lambda: data.get('username', '').isalnum(),
            'signal': 'INVALID_USERNAME'
        }
    ]

    for validator in validators:
        if not validator['check']():
            return ValidationResult(False, validator['signal'])

    return ValidationResult(True, 'VALID')

Documentation Standards

def process_access_request(request_data: Dict[str, Any]) -> AccessDecision:
    """
    Process user access request following OVES security model.

    This function implements the core access control logic, ensuring:
    - Business owner authority is respected
    - Single source of truth is maintained
    - Personnel changes don't affect system continuity

    Args:
        request_data: Dictionary containing:
            - user_id: Unique user identifier
            - resource: Requested resource path
            - action: Requested action (read/write/delete)
            - context: Additional request context

    Returns:
        AccessDecision: Contains decision and reasoning

    Raises:
        ValidationError: If request_data is malformed
        AuthorizationError: If user lacks permission
    """
    pass

Security Best Practices

Input Validation

Always validate and sanitize input:

def sanitize_input(data: str) -> str:
    """Sanitize user input to prevent injection attacks."""
    # Remove potentially dangerous characters
    import re
    return re.sub(r'[<>"\';\\]', '', data)

def validate_user_input(data: Dict[str, Any]) -> bool:
    """Validate user input using validator array pattern."""
    validators = [
        {
            'check': lambda: all(key in data for key in ['username', 'email']),
            'signal': 'MISSING_REQUIRED_FIELDS'
        },
        {
            'check': lambda: len(data.get('username', '')) <= 50,
            'signal': 'USERNAME_TOO_LONG'
        }
    ]

    for validator in validators:
        if not validator['check']():
            raise ValidationError(validator['signal'])

    return True

Password Handling

import bcrypt
from secrets import token_urlsafe

def hash_password(password: str) -> str:
    """Hash password using bcrypt."""
    salt = bcrypt.gensalt()
    return bcrypt.hashpw(password.encode('utf-8'), salt).decode('utf-8')

def verify_password(password: str, hashed: str) -> bool:
    """Verify password against hash."""
    return bcrypt.checkpw(password.encode('utf-8'), hashed.encode('utf-8'))

def generate_secure_token() -> str:
    """Generate cryptographically secure token."""
    return token_urlsafe(32)

Database Security

# Good: Use parameterized queries
def get_user_by_id(user_id: str) -> Optional[User]:
    query = "SELECT * FROM users WHERE id = %s"
    cursor.execute(query, (user_id,))
    return cursor.fetchone()

# Bad: SQL injection vulnerability
def get_user_by_id_bad(user_id: str) -> Optional[User]:
    query = f"SELECT * FROM users WHERE id = '{user_id}'"
    cursor.execute(query)  # NEVER DO THIS
    return cursor.fetchone()

Testing Standards

Unit Testing

import pytest
from unittest.mock import Mock, patch

class TestUserAuthentication:
    """Test user authentication functionality."""

    def test_successful_authentication(self):
        """Test successful user authentication."""
        # Arrange
        user_service = UserService()
        username = "test_user"
        password = "secure_password"

        # Act
        result = user_service.authenticate(username, password)

        # Assert
        assert result.success is True
        assert result.user_id is not None

    @patch('azure_ad_client.authenticate')
    def test_azure_ad_authentication(self, mock_azure_auth):
        """Test Azure AD authentication integration."""
        # Arrange
        mock_azure_auth.return_value = {"user_id": "123", "email": "test@example.com"}
        auth_service = AuthenticationService()

        # Act
        result = auth_service.authenticate_with_azure_ad("test@example.com", "password")

        # Assert
        assert result.success is True
        mock_azure_auth.assert_called_once()

Integration Testing

def test_end_to_end_user_registration():
    """Test complete user registration flow."""
    # Test data
    user_data = {
        "username": "newuser",
        "email": "newuser@example.com",
        "password": "SecurePassword123!"
    }

    # Register user
    response = client.post("/api/users/register", json=user_data)
    assert response.status_code == 201

    # Verify user was created
    user = User.get_by_email(user_data["email"])
    assert user is not None
    assert user.username == user_data["username"]

Error Handling

Exception Hierarchy

class OVESException(Exception):
    """Base exception for OVES system."""
    pass

class AuthenticationError(OVESException):
    """Authentication-related errors."""
    pass

class AuthorizationError(OVESException):
    """Authorization-related errors."""
    pass

class ValidationError(OVESException):
    """Input validation errors."""
    def __init__(self, signal: str, details: str = ""):
        self.signal = signal
        self.details = details
        super().__init__(f"{signal}: {details}")

Error Response Format

def handle_validation_error(error: ValidationError) -> Dict[str, Any]:
    """Handle validation errors consistently."""
    return {
        "error": {
            "type": "validation_error",
            "signal": error.signal,
            "message": str(error),
            "timestamp": datetime.utcnow().isoformat(),
            "request_id": get_request_id()
        }
    }

Performance Guidelines

Database Optimization

# Good: Use database indexes and efficient queries
def get_user_permissions_optimized(user_id: str) -> List[Permission]:
    """Get user permissions with optimized query."""
    query = """
        SELECT p.resource, p.action 
        FROM permissions p
        JOIN user_roles ur ON p.role_id = ur.role_id
        WHERE ur.user_id = %s
        AND p.active = true
    """
    return execute_query(query, (user_id,))

# Good: Use connection pooling
from sqlalchemy.pool import QueuePool

engine = create_engine(
    DATABASE_URL,
    poolclass=QueuePool,
    pool_size=10,
    max_overflow=20
)

Caching Strategy

from functools import lru_cache
import redis

# In-memory caching for frequently accessed data
@lru_cache(maxsize=1000)
def get_user_roles(user_id: str) -> List[str]:
    """Get user roles with caching."""
    return database.get_user_roles(user_id)

# Redis caching for session data
def cache_user_session(session_id: str, user_data: Dict[str, Any], ttl: int = 3600):
    """Cache user session data in Redis."""
    redis_client.setex(session_id, ttl, json.dumps(user_data))

Documentation Standards

API Documentation

from fastapi import APIRouter, HTTPException
from pydantic import BaseModel

class UserRegistrationRequest(BaseModel):
    """User registration request model."""
    username: str
    email: str
    password: str

@router.post("/users/register", response_model=UserResponse)
async def register_user(request: UserRegistrationRequest):
    """
    Register new user in OVES system.

    This endpoint creates a new user account following OVES security standards:
    - Validates input data
    - Hashes password securely
    - Creates audit log entry
    - Maintains business owner authority

    Args:
        request: User registration data

    Returns:
        UserResponse: Created user information (without password)

    Raises:
        HTTPException: 400 if validation fails, 409 if user exists
    """
    pass

Architecture Documentation

Follow PlantUML standards for diagrams:

@startuml
!include <C4/C4_Context.puml>

title OVES Authentication Flow

Person(User, "User", "System user")
System(OVES, "OVES Access Management", "Core access control system")
System_Ext(AzureAD, "Azure AD", "Identity provider")

Rel(User, OVES, "Authenticate", "HTTPS")
Rel(OVES, AzureAD, "Verify credentials", "OAUTH2")

note top of OVES : Single source of truth\nfor access decisions
@enduml

Deployment Practices

Environment Configuration

import os
from dataclasses import dataclass

@dataclass
class Config:
    """Application configuration."""
    database_url: str
    jwt_secret: str
    azure_tenant_id: str
    debug: bool = False

    @classmethod
    def from_environment(cls) -> 'Config':
        """Load configuration from environment variables."""
        return cls(
            database_url=os.getenv('DATABASE_URL', 'sqlite:///app.db'),
            jwt_secret=os.getenv('JWT_SECRET', 'dev-secret'),
            azure_tenant_id=os.getenv('AZURE_TENANT_ID', ''),
            debug=os.getenv('DEBUG', 'false').lower() == 'true'
        )

Health Checks

async def health_check() -> Dict[str, Any]:
    """Comprehensive system health check."""
    health_validators = [
        {
            'check': lambda: database.is_connected(),
            'signal': 'DATABASE_DOWN'
        },
        {
            'check': lambda: azure_ad_client.is_reachable(),
            'signal': 'AZURE_AD_UNREACHABLE'
        }
    ]

    for validator in health_validators:
        if not validator['check']():
            return {
                "status": "unhealthy",
                "error": validator['signal'],
                "timestamp": datetime.utcnow().isoformat()
            }

    return {
        "status": "healthy",
        "timestamp": datetime.utcnow().isoformat()
    }

Key Principles: - Business owner maintains ultimate authority - Single source of truth for all access decisions - System designed for personnel continuity - Security by design, not as an afterthought