Security Hardening¶
This guide covers security hardening strategies for RAG Modulo deployment on IBM Cloud, ensuring comprehensive protection of data, applications, and infrastructure.
Overview¶
The security hardening strategy provides:
- Defense in Depth: Multiple layers of security controls
- Zero Trust Architecture: Never trust, always verify
- Compliance: Meet regulatory and industry standards
- Monitoring: Continuous security monitoring and alerting
- Incident Response: Rapid response to security incidents
Security Architecture¶
graph TB
subgraph "External Threats"
ATTACK[Attackers]
MALWARE[Malware]
BOT[Botnets]
end
subgraph "Security Layers"
WAF[Web Application Firewall]
DDoS[DDoS Protection]
SSL[SSL/TLS Termination]
IAM[Identity & Access Management]
SECRETS[Secrets Management]
ENCRYPT[Encryption]
MONITOR[Security Monitoring]
end
subgraph "Applications"
FE[Frontend App]
BE[Backend App]
end
subgraph "Data Layer"
PG[PostgreSQL]
OS[Object Storage]
ZL[Zilliz Cloud]
ES[Event Streams]
end
subgraph "Network Security"
VPC[VPC]
NSG[Network Security Groups]
NLB[Network Load Balancer]
VPN[VPN Gateway]
end
ATTACK --> WAF
MALWARE --> DDoS
BOT --> SSL
WAF --> IAM
DDoS --> SECRETS
SSL --> ENCRYPT
IAM --> FE
SECRETS --> BE
ENCRYPT --> MONITOR
FE --> VPC
BE --> NSG
VPC --> NLB
NSG --> VPN
NLB --> PG
VPN --> OS
PG --> ZL
OS --> ES Network Security¶
1. VPC Configuration¶
VPC Setup¶
# VPC configuration
resource "ibm_is_vpc" "rag_modulo_vpc" {
name = "${var.project_name}-vpc"
resource_group = var.resource_group_id
tags = [
"project:${var.project_name}",
"environment:${var.environment}",
"security:high"
]
}
# Public gateway for outbound internet access
resource "ibm_is_public_gateway" "rag_modulo_pgw" {
name = "${var.project_name}-pgw"
vpc = ibm_is_vpc.rag_modulo_vpc.id
zone = "${var.region}-1"
resource_group = var.resource_group_id
}
# Subnet for applications
resource "ibm_is_subnet" "rag_modulo_subnet" {
name = "${var.project_name}-subnet"
vpc = ibm_is_vpc.rag_modulo_vpc.id
zone = "${var.region}-1"
ipv4_cidr_block = "10.240.0.0/24"
public_gateway = ibm_is_public_gateway.rag_modulo_pgw.id
resource_group = var.resource_group_id
}
Network Security Groups¶
# Network Security Group for applications
resource "ibm_is_security_group" "rag_modulo_sg" {
name = "${var.project_name}-sg"
vpc = ibm_is_vpc.rag_modulo_vpc.id
resource_group = var.resource_group_id
}
# Allow HTTPS inbound
resource "ibm_is_security_group_rule" "https_inbound" {
group = ibm_is_security_group.rag_modulo_sg.id
direction = "inbound"
remote = "0.0.0.0/0"
tcp {
port_min = 443
port_max = 443
}
}
# Allow HTTP inbound (redirected to HTTPS)
resource "ibm_is_security_group_rule" "http_inbound" {
group = ibm_is_security_group.rag_modulo_sg.id
direction = "inbound"
remote = "0.0.0.0/0"
tcp {
port_min = 80
port_max = 80
}
}
# Allow outbound HTTPS
resource "ibm_is_security_group_rule" "https_outbound" {
group = ibm_is_security_group.rag_modulo_sg.id
direction = "outbound"
remote = "0.0.0.0/0"
tcp {
port_min = 443
port_max = 443
}
}
# Allow outbound PostgreSQL
resource "ibm_is_security_group_rule" "postgresql_outbound" {
group = ibm_is_security_group.rag_modulo_sg.id
direction = "outbound"
remote = "0.0.0.0/0"
tcp {
port_min = 5432
port_max = 5432
}
}
2. Load Balancer Security¶
Application Load Balancer¶
# Application Load Balancer
resource "ibm_is_lb" "rag_modulo_lb" {
name = "${var.project_name}-lb"
type = "public"
subnets = [ibm_is_subnet.rag_modulo_subnet.id]
resource_group = var.resource_group_id
}
# HTTPS listener
resource "ibm_is_lb_listener" "rag_modulo_https" {
lb = ibm_is_lb.rag_modulo_lb.id
port = 443
protocol = "https"
certificate = ibm_is_lb_certificate.rag_modulo_cert.crn
default_pool = ibm_is_lb_pool.rag_modulo_pool.id
}
# SSL certificate
resource "ibm_is_lb_certificate" "rag_modulo_cert" {
name = "${var.project_name}-cert"
lb = ibm_is_lb.rag_modulo_lb.id
certificate = var.ssl_certificate
private_key = var.ssl_private_key
}
# Load balancer pool
resource "ibm_is_lb_pool" "rag_modulo_pool" {
name = "${var.project_name}-pool"
lb = ibm_is_lb.rag_modulo_lb.id
algorithm = "round_robin"
protocol = "https"
health_delay = 5
health_retries = 2
health_timeout = 2
health_type = "https"
health_monitor = "https://backend-app.example.com/health"
}
Identity and Access Management¶
1. IAM Configuration¶
Service IDs¶
# Service ID for applications
resource "ibm_iam_service_id" "rag_modulo_service_id" {
name = "${var.project_name}-service-id"
description = "Service ID for RAG Modulo applications"
}
# Service ID for Terraform
resource "ibm_iam_service_id" "terraform_service_id" {
name = "${var.project_name}-terraform-service-id"
description = "Service ID for Terraform operations"
}
IAM Policies¶
# Policy for Code Engine access
resource "ibm_iam_service_policy" "code_engine_policy" {
iam_service_id = ibm_iam_service_id.rag_modulo_service_id.id
roles = ["Code Engine Developer", "Code Engine Administrator"]
resources {
service = "codeengine"
}
}
# Policy for database access
resource "ibm_iam_service_policy" "database_policy" {
iam_service_id = ibm_iam_service_id.rag_modulo_service_id.id
roles = ["Database Administrator"]
resources {
service = "databases-for-postgresql"
resource_group_id = var.resource_group_id
}
}
# Policy for object storage access
resource "ibm_iam_service_policy" "object_storage_policy" {
iam_service_id = ibm_iam_service_id.rag_modulo_service_id.id
roles = ["Object Storage Manager"]
resources {
service = "cloud-object-storage"
resource_group_id = var.resource_group_id
}
}
2. API Key Management¶
API Key Rotation¶
#!/bin/bash
# API key rotation script
set -e
# Configuration
OLD_API_KEY="$1"
NEW_API_KEY="$2"
SERVICE_ID="$3"
if [ -z "$OLD_API_KEY" ] || [ -z "$NEW_API_KEY" ] || [ -z "$SERVICE_ID" ]; then
echo "Usage: $0 <old_api_key> <new_api_key> <service_id>"
exit 1
fi
# Create new API key
echo "Creating new API key..."
ibmcloud iam service-api-key-create "rag-modulo-api-key-$(date +%Y%m%d)" "$SERVICE_ID" --description "RAG Modulo API key created on $(date)"
# Update applications with new API key
echo "Updating applications with new API key..."
ibmcloud ce app update rag-modulo-backend --env "IBMCLOUD_API_KEY=$NEW_API_KEY"
ibmcloud ce app update rag-modulo-frontend --env "IBMCLOUD_API_KEY=$NEW_API_KEY"
# Verify applications are working
echo "Verifying applications..."
sleep 30
curl -f "https://backend-app.example.com/health" || exit 1
curl -f "https://frontend-app.example.com/health" || exit 1
# Delete old API key
echo "Deleting old API key..."
ibmcloud iam service-api-key-delete "$OLD_API_KEY" "$SERVICE_ID" --force
echo "API key rotation completed successfully"
Secrets Management¶
1. IBM Cloud Secrets Manager¶
Secrets Configuration¶
# Secrets Manager instance
resource "ibm_resource_instance" "secrets_manager" {
name = "${var.project_name}-secrets-manager"
service = "secrets-manager"
plan = "standard"
location = var.region
resource_group_id = var.resource_group_id
}
# Database password secret
resource "ibm_sm_secret" "database_password" {
instance_id = ibm_resource_instance.secrets_manager.guid
secret_type = "arbitrary"
name = "rag-modulo-database-password"
description = "Database password for RAG Modulo"
secret_data = jsonencode({
password = var.postgresql_admin_password
})
}
# API keys secret
resource "ibm_sm_secret" "api_keys" {
instance_id = ibm_resource_instance.secrets_manager.guid
secret_type = "arbitrary"
name = "rag-modulo-api-keys"
description = "API keys for RAG Modulo"
secret_data = jsonencode({
ibmcloud_api_key = var.ibmcloud_api_key
zilliz_api_key = var.zilliz_api_key
event_streams_api_key = var.event_streams_api_key
})
}
Secrets Integration¶
# Ansible playbook for secrets integration
---
- name: Configure secrets management
hosts: localhost
gather_facts: false
vars:
secrets_manager_instance_id: "{{ secrets_manager_instance_id }}"
tasks:
- name: Get database password from Secrets Manager
ansible.builtin.shell: |
ibmcloud secrets-manager secret get "rag-modulo-database-password" \
--instance-id "$secrets_manager_instance_id" \
--output json | jq -r '.secret_data.password'
register: database_password
no_log: true
- name: Get API keys from Secrets Manager
ansible.builtin.shell: |
ibmcloud secrets-manager secret get "rag-modulo-api-keys" \
--instance-id "$secrets_manager_instance_id" \
--output json | jq -r '.secret_data'
register: api_keys
no_log: true
- name: Update application with secrets
ansible.builtin.shell: |
ibmcloud ce app update rag-modulo-backend \
--env "DATABASE_PASSWORD=$database_password" \
--env "ZILLIZ_API_KEY=$(echo '$api_keys' | jq -r '.zilliz_api_key')" \
--env "EVENT_STREAMS_API_KEY=$(echo '$api_keys' | jq -r '.event_streams_api_key')"
environment:
IBMCLOUD_API_KEY: "{{ ibmcloud_api_key }}"
2. Environment Variable Security¶
Secure Environment Configuration¶
# Secure environment variables
secure_env_vars:
# Database configuration
DATABASE_URL: "postgresql://username:${DATABASE_PASSWORD}@host:port/database?sslmode=require"
DATABASE_PASSWORD: "{{ vault_database_password }}"
# API keys
IBMCLOUD_API_KEY: "{{ vault_ibmcloud_api_key }}"
ZILLIZ_API_KEY: "{{ vault_zilliz_api_key }}"
EVENT_STREAMS_API_KEY: "{{ vault_event_streams_api_key }}"
# Security settings
JWT_SECRET: "{{ vault_jwt_secret }}"
ENCRYPTION_KEY: "{{ vault_encryption_key }}"
# Production safeguards
SKIP_AUTH: "false"
DEBUG: "false"
LOG_LEVEL: "INFO"
Data Encryption¶
1. Encryption at Rest¶
Database Encryption¶
# PostgreSQL encryption configuration
postgresql_encryption:
enabled: true
encryption_key: "{{ vault_database_encryption_key }}"
key_rotation: "90d"
# Encryption settings
settings:
ssl_mode: "require"
ssl_cert: "{{ vault_ssl_cert }}"
ssl_key: "{{ vault_ssl_key }}"
ssl_ca: "{{ vault_ssl_ca }}"
Object Storage Encryption¶
# Object Storage encryption configuration
object_storage_encryption:
enabled: true
encryption_type: "AES256"
key_management: "ibm-cloud-key-protect"
# Bucket encryption
bucket_encryption:
- bucket: "rag-modulo-app-data"
encryption: "AES256"
key_id: "{{ vault_object_storage_key_id }}"
2. Encryption in Transit¶
TLS Configuration¶
# TLS configuration
tls_config:
enabled: true
version: "TLS 1.2"
ciphers: "ECDHE-RSA-AES256-GCM-SHA384:ECDHE-RSA-AES128-GCM-SHA256"
# Certificate management
certificate:
provider: "letsencrypt"
auto_renewal: true
renewal_threshold: "30d"
# HSTS configuration
hsts:
enabled: true
max_age: "31536000"
include_subdomains: true
preload: true
Application TLS¶
# Application TLS configuration
import ssl
from fastapi import FastAPI
from fastapi.middleware.httpsredirect import HTTPSRedirectMiddleware
app = FastAPI()
# Force HTTPS redirect
app.add_middleware(HTTPSRedirectMiddleware)
# TLS configuration
ssl_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
ssl_context.load_cert_chain("cert.pem", "key.pem")
ssl_context.set_ciphers("ECDHE-RSA-AES256-GCM-SHA384:ECDHE-RSA-AES128-GCM-SHA256")
ssl_context.minimum_version = ssl.TLSVersion.TLSv1_2
Application Security¶
1. Input Validation¶
Request Validation¶
# Input validation
from pydantic import BaseModel, validator
import re
class SearchRequest(BaseModel):
query: str
collection_id: str
limit: int = 10
@validator('query')
def validate_query(cls, v):
if not v or len(v.strip()) == 0:
raise ValueError('Query cannot be empty')
if len(v) > 1000:
raise ValueError('Query too long')
# Check for SQL injection patterns
if re.search(r'[;\'"]', v):
raise ValueError('Invalid characters in query')
return v.strip()
@validator('collection_id')
def validate_collection_id(cls, v):
if not re.match(r'^[a-zA-Z0-9-_]+$', v):
raise ValueError('Invalid collection ID format')
return v
@validator('limit')
def validate_limit(cls, v):
if v < 1 or v > 100:
raise ValueError('Limit must be between 1 and 100')
return v
SQL Injection Prevention¶
# SQL injection prevention
import psycopg2
from psycopg2 import sql
def safe_query(cursor, query_template, params):
"""Execute query with parameterized statements"""
try:
cursor.execute(query_template, params)
return cursor.fetchall()
except psycopg2.Error as e:
logger.error(f"Database error: {e}")
raise HTTPException(status_code=500, detail="Database error")
# Example usage
def search_documents(collection_id: str, query: str, limit: int):
with get_db_connection() as conn:
with conn.cursor() as cursor:
# Use parameterized query
query_template = """
SELECT id, title, content, created_at
FROM documents
WHERE collection_id = %s
AND content ILIKE %s
ORDER BY created_at DESC
LIMIT %s
"""
params = (collection_id, f"%{query}%", limit)
return safe_query(cursor, query_template, params)
2. Authentication and Authorization¶
JWT Authentication¶
# JWT authentication
import jwt
from datetime import datetime, timedelta
from fastapi import HTTPException, Depends
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
security = HTTPBearer()
class JWTAuth:
def __init__(self, secret_key: str, algorithm: str = "HS256"):
self.secret_key = secret_key
self.algorithm = algorithm
def create_token(self, user_id: str, expires_delta: timedelta = None):
"""Create JWT token"""
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(hours=24)
payload = {
"user_id": user_id,
"exp": expire,
"iat": datetime.utcnow()
}
return jwt.encode(payload, self.secret_key, algorithm=self.algorithm)
def verify_token(self, token: str):
"""Verify JWT token"""
try:
payload = jwt.decode(token, self.secret_key, algorithms=[self.algorithm])
return payload
except jwt.ExpiredSignatureError:
raise HTTPException(status_code=401, detail="Token expired")
except jwt.JWTError:
raise HTTPException(status_code=401, detail="Invalid token")
# Authentication dependency
async def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)):
jwt_auth = JWTAuth(os.getenv("JWT_SECRET"))
payload = jwt_auth.verify_token(credentials.credentials)
return payload["user_id"]
Role-Based Access Control¶
# Role-based access control
from enum import Enum
from functools import wraps
class Role(Enum):
ADMIN = "admin"
USER = "user"
READONLY = "readonly"
def require_role(required_role: Role):
"""Decorator to require specific role"""
def decorator(func):
@wraps(func)
async def wrapper(*args, **kwargs):
user_id = kwargs.get("current_user")
user_role = get_user_role(user_id)
if not has_permission(user_role, required_role):
raise HTTPException(status_code=403, detail="Insufficient permissions")
return await func(*args, **kwargs)
return wrapper
return decorator
def has_permission(user_role: Role, required_role: Role) -> bool:
"""Check if user has required permission"""
role_hierarchy = {
Role.ADMIN: [Role.ADMIN, Role.USER, Role.READONLY],
Role.USER: [Role.USER, Role.READONLY],
Role.READONLY: [Role.READONLY]
}
return required_role in role_hierarchy.get(user_role, [])
# Usage example
@app.post("/api/collections")
@require_role(Role.ADMIN)
async def create_collection(
collection: CollectionCreate,
current_user: str = Depends(get_current_user)
):
# Only admins can create collections
pass
3. Rate Limiting¶
API Rate Limiting¶
# Rate limiting
from fastapi import FastAPI, Request
from fastapi.middleware.base import BaseHTTPMiddleware
import time
from collections import defaultdict
class RateLimitMiddleware(BaseHTTPMiddleware):
def __init__(self, app, calls: int = 100, period: int = 60):
super().__init__(app)
self.calls = calls
self.period = period
self.clients = defaultdict(list)
async def dispatch(self, request: Request, call_next):
client_ip = request.client.host
now = time.time()
# Clean old requests
self.clients[client_ip] = [
req_time for req_time in self.clients[client_ip]
if now - req_time < self.period
]
# Check rate limit
if len(self.clients[client_ip]) >= self.calls:
return JSONResponse(
status_code=429,
content={"detail": "Rate limit exceeded"}
)
# Add current request
self.clients[client_ip].append(now)
response = await call_next(request)
return response
# Apply rate limiting
app.add_middleware(RateLimitMiddleware, calls=100, period=60)
Security Monitoring¶
1. Security Event Monitoring¶
Security Event Collection¶
# Security event collection
import logging
from datetime import datetime
from typing import Dict, Any
class SecurityEventLogger:
def __init__(self):
self.logger = logging.getLogger("security")
self.logger.setLevel(logging.INFO)
# Create security event handler
handler = logging.StreamHandler()
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
handler.setFormatter(formatter)
self.logger.addHandler(handler)
def log_auth_failure(self, user_id: str, ip_address: str, reason: str):
"""Log authentication failure"""
self.logger.warning(
f"Authentication failure - User: {user_id}, IP: {ip_address}, Reason: {reason}"
)
def log_suspicious_activity(self, activity: str, details: Dict[str, Any]):
"""Log suspicious activity"""
self.logger.warning(
f"Suspicious activity - {activity}: {details}"
)
def log_security_event(self, event_type: str, details: Dict[str, Any]):
"""Log general security event"""
self.logger.info(
f"Security event - {event_type}: {details}"
)
# Global security logger
security_logger = SecurityEventLogger()
Security Metrics¶
# Security metrics
from prometheus_client import Counter, Histogram, Gauge
# Security event counters
auth_failures = Counter('auth_failures_total', 'Total authentication failures', ['user_id', 'reason'])
suspicious_activities = Counter('suspicious_activities_total', 'Total suspicious activities', ['activity_type'])
security_events = Counter('security_events_total', 'Total security events', ['event_type'])
# Security response time
security_response_time = Histogram('security_response_time_seconds', 'Security response time')
# Active security threats
active_threats = Gauge('active_threats', 'Number of active security threats')
# Example usage
def log_auth_failure(user_id: str, reason: str):
auth_failures.labels(user_id=user_id, reason=reason).inc()
security_logger.log_auth_failure(user_id, get_client_ip(), reason)
def log_suspicious_activity(activity: str, details: Dict[str, Any]):
suspicious_activities.labels(activity_type=activity).inc()
security_logger.log_suspicious_activity(activity, details)
2. Security Alerting¶
Alert Rules¶
# Security alert rules
security_alerts:
- name: "high_auth_failures"
condition: "rate(auth_failures_total[5m]) > 10"
duration: "2m"
severity: "critical"
description: "High rate of authentication failures"
- name: "suspicious_activity_detected"
condition: "rate(suspicious_activities_total[5m]) > 5"
duration: "1m"
severity: "warning"
description: "Suspicious activity detected"
- name: "security_event_spike"
condition: "rate(security_events_total[5m]) > 20"
duration: "5m"
severity: "warning"
description: "Unusual spike in security events"
Incident Response¶
# Incident response automation
import asyncio
from datetime import datetime
from typing import List, Dict
class SecurityIncidentResponse:
def __init__(self):
self.active_incidents = {}
self.response_team = ["devops@company.com", "security@company.com"]
async def handle_security_alert(self, alert: Dict[str, Any]):
"""Handle security alert"""
incident_id = f"SEC-{datetime.now().strftime('%Y%m%d-%H%M%S')}"
# Create incident
incident = {
"id": incident_id,
"type": alert["type"],
"severity": alert["severity"],
"timestamp": datetime.now(),
"status": "open",
"details": alert["details"]
}
self.active_incidents[incident_id] = incident
# Notify response team
await self.notify_response_team(incident)
# Take automated actions
await self.take_automated_actions(incident)
return incident_id
async def notify_response_team(self, incident: Dict[str, Any]):
"""Notify security response team"""
# Send email notification
await self.send_email_notification(incident)
# Send Slack notification
await self.send_slack_notification(incident)
async def take_automated_actions(self, incident: Dict[str, Any]):
"""Take automated security actions"""
if incident["severity"] == "critical":
# Block suspicious IP
await self.block_suspicious_ip(incident["details"]["ip_address"])
# Increase monitoring
await self.increase_monitoring(incident["details"]["user_id"])
# Generate security report
await self.generate_security_report(incident)
# Global incident response
incident_response = SecurityIncidentResponse()
Compliance and Auditing¶
1. Audit Logging¶
Audit Event Collection¶
# Audit logging
import json
from datetime import datetime
from typing import Dict, Any
class AuditLogger:
def __init__(self):
self.logger = logging.getLogger("audit")
self.logger.setLevel(logging.INFO)
# Create audit handler
handler = logging.StreamHandler()
formatter = logging.Formatter('%(message)s')
handler.setFormatter(formatter)
self.logger.addHandler(handler)
def log_user_action(self, user_id: str, action: str, resource: str, details: Dict[str, Any]):
"""Log user action"""
audit_event = {
"timestamp": datetime.utcnow().isoformat(),
"event_type": "user_action",
"user_id": user_id,
"action": action,
"resource": resource,
"details": details,
"ip_address": get_client_ip(),
"user_agent": get_user_agent()
}
self.logger.info(json.dumps(audit_event))
def log_system_event(self, event_type: str, details: Dict[str, Any]):
"""Log system event"""
audit_event = {
"timestamp": datetime.utcnow().isoformat(),
"event_type": "system_event",
"system_event_type": event_type,
"details": details
}
self.logger.info(json.dumps(audit_event))
def log_security_event(self, event_type: str, details: Dict[str, Any]):
"""Log security event"""
audit_event = {
"timestamp": datetime.utcnow().isoformat(),
"event_type": "security_event",
"security_event_type": event_type,
"details": details
}
self.logger.info(json.dumps(audit_event))
# Global audit logger
audit_logger = AuditLogger()
Compliance Reporting¶
# Compliance reporting
from datetime import datetime, timedelta
from typing import List, Dict
class ComplianceReporter:
def __init__(self):
self.audit_logger = AuditLogger()
def generate_compliance_report(self, start_date: datetime, end_date: datetime) -> Dict[str, Any]:
"""Generate compliance report"""
report = {
"report_period": {
"start": start_date.isoformat(),
"end": end_date.isoformat()
},
"user_actions": self.get_user_actions(start_date, end_date),
"system_events": self.get_system_events(start_date, end_date),
"security_events": self.get_security_events(start_date, end_date),
"compliance_summary": self.get_compliance_summary(start_date, end_date)
}
return report
def get_user_actions(self, start_date: datetime, end_date: datetime) -> List[Dict[str, Any]]:
"""Get user actions for compliance report"""
# Query audit logs for user actions
# This would typically query a database or log aggregation system
pass
def get_system_events(self, start_date: datetime, end_date: datetime) -> List[Dict[str, Any]]:
"""Get system events for compliance report"""
# Query audit logs for system events
pass
def get_security_events(self, start_date: datetime, end_date: datetime) -> List[Dict[str, Any]]:
"""Get security events for compliance report"""
# Query audit logs for security events
pass
def get_compliance_summary(self, start_date: datetime, end_date: datetime) -> Dict[str, Any]:
"""Get compliance summary"""
return {
"total_events": 0,
"security_incidents": 0,
"compliance_score": 100,
"recommendations": []
}
# Global compliance reporter
compliance_reporter = ComplianceReporter()
2. Data Privacy¶
Data Classification¶
# Data classification
from enum import Enum
from typing import Dict, Any
class DataClassification(Enum):
PUBLIC = "public"
INTERNAL = "internal"
CONFIDENTIAL = "confidential"
RESTRICTED = "restricted"
class DataClassifier:
def __init__(self):
self.classification_rules = {
"email": DataClassification.CONFIDENTIAL,
"phone": DataClassification.CONFIDENTIAL,
"ssn": DataClassification.RESTRICTED,
"credit_card": DataClassification.RESTRICTED,
"api_key": DataClassification.RESTRICTED,
"password": DataClassification.RESTRICTED
}
def classify_data(self, data: Dict[str, Any]) -> Dict[str, DataClassification]:
"""Classify data based on content"""
classifications = {}
for key, value in data.items():
classification = DataClassification.INTERNAL # Default
for pattern, data_class in self.classification_rules.items():
if pattern.lower() in key.lower():
classification = data_class
break
classifications[key] = classification
return classifications
def apply_data_protection(self, data: Dict[str, Any], classifications: Dict[str, DataClassification]) -> Dict[str, Any]:
"""Apply data protection based on classification"""
protected_data = {}
for key, value in data.items():
classification = classifications.get(key, DataClassification.INTERNAL)
if classification == DataClassification.RESTRICTED:
# Mask or remove restricted data
protected_data[key] = "***REDACTED***"
elif classification == DataClassification.CONFIDENTIAL:
# Partially mask confidential data
if isinstance(value, str) and len(value) > 4:
protected_data[key] = value[:2] + "***" + value[-2:]
else:
protected_data[key] = "***MASKED***"
else:
protected_data[key] = value
return protected_data
# Global data classifier
data_classifier = DataClassifier()
Security Testing¶
1. Vulnerability Scanning¶
Container Security Scanning¶
# Container security scanning
container_security_scanning:
enabled: true
tools:
- name: "trivy"
image: "aquasec/trivy"
command: "trivy image --exit-code 1 --severity HIGH,CRITICAL"
targets:
- "rag-modulo-backend:latest"
- "rag-modulo-frontend:latest"
- name: "dockle"
image: "goodwithtech/dockle"
command: "dockle --exit-code 1"
targets:
- "rag-modulo-backend:latest"
- "rag-modulo-frontend:latest"
schedule: "0 2 * * *" # Daily at 2 AM
reporting:
- format: "json"
output: "/reports/security-scan.json"
- format: "html"
output: "/reports/security-scan.html"
Application Security Testing¶
# Application security testing
application_security_testing:
enabled: true
tools:
- name: "owasp-zap"
image: "owasp/zap2docker-stable"
command: "zap-baseline.py -t https://backend-app.example.com"
- name: "nikto"
image: "sullo/nikto"
command: "nikto -h https://frontend-app.example.com"
schedule: "0 3 * * *" # Daily at 3 AM
reporting:
- format: "json"
output: "/reports/owasp-scan.json"
- format: "html"
output: "/reports/owasp-scan.html"
2. Penetration Testing¶
Penetration Testing Script¶
#!/bin/bash
# Penetration testing script
set -e
# Configuration
TARGET_URL="$1"
REPORT_DIR="/reports/penetration-test"
DATE=$(date +%Y%m%d_%H%M%S)
if [ -z "$TARGET_URL" ]; then
echo "Usage: $0 <target_url>"
exit 1
fi
# Create report directory
mkdir -p "$REPORT_DIR"
# Run penetration tests
echo "Running penetration tests on $TARGET_URL..."
# SQL injection testing
echo "Testing for SQL injection..."
sqlmap -u "$TARGET_URL/api/search?query=test" --batch --output-dir="$REPORT_DIR/sqlmap"
# XSS testing
echo "Testing for XSS vulnerabilities..."
xsser -u "$TARGET_URL" --output="$REPORT_DIR/xsser.txt"
# Directory traversal testing
echo "Testing for directory traversal..."
dirb "$TARGET_URL" "$REPORT_DIR/dirb.txt"
# SSL/TLS testing
echo "Testing SSL/TLS configuration..."
testssl.sh "$TARGET_URL" > "$REPORT_DIR/testssl.txt"
# Generate summary report
echo "Generating penetration test summary..."
cat > "$REPORT_DIR/summary.txt" << EOF
Penetration Test Summary
=======================
Target: $TARGET_URL
Date: $(date)
Tester: Automated Security Testing
Tests Performed:
- SQL Injection (sqlmap)
- XSS (xsser)
- Directory Traversal (dirb)
- SSL/TLS (testssl.sh)
Reports:
- SQL Injection: $REPORT_DIR/sqlmap/
- XSS: $REPORT_DIR/xsser.txt
- Directory Traversal: $REPORT_DIR/dirb.txt
- SSL/TLS: $REPORT_DIR/testssl.txt
EOF
echo "Penetration testing completed. Reports saved to $REPORT_DIR"
Security Best Practices¶
1. Development Security¶
- Secure Coding: Follow secure coding practices
- Code Review: Security-focused code reviews
- Dependency Management: Regular dependency updates
- Secret Management: Never hardcode secrets
2. Deployment Security¶
- Least Privilege: Use minimal required permissions
- Network Segmentation: Isolate different components
- Regular Updates: Keep all components updated
- Monitoring: Continuous security monitoring
3. Operational Security¶
- Incident Response: Clear incident response procedures
- Regular Audits: Periodic security audits
- Training: Regular security training for team
- Documentation: Maintain security documentation