Enhanced Logging¶
Issue: #218 Status: โ Implemented Version: 1.0.0
RAG Modulo implements an enhanced logging system with structured context tracking, request correlation, and performance monitoring. Based on patterns from IBM mcp-context-forge.
Overview¶
The enhanced logging system provides:
- Dual Output Formats: JSON for production/monitoring, text for development
- Context Tracking: Automatic request correlation and entity tracking
- Pipeline Stage Tracking: Track operations through each RAG pipeline stage
- Performance Monitoring: Automatic timing for all operations
- In-Memory Storage: Queryable log buffer for debugging and admin UI
- Zero Performance Impact: Async logging with buffering
Architecture¶
Core Components¶
backend/core/
โโโ logging_context.py # Context management and propagation
โโโ log_storage_service.py # In-memory log storage with indexing
โโโ enhanced_logging.py # Main LoggingService orchestrator
โโโ enhanced_logging_example.py # Integration examples
1. Context Management (logging_context.py)¶
Provides ContextVar-based async context propagation:
LogContext: Dataclass holding request/entity contextlog_operation(): Context manager for operation tracking with timingpipeline_stage_context(): Context manager for pipeline stage trackingrequest_context(): Request-level context setupPipelineStage: Constants for standard pipeline stages
2. Log Storage (log_storage_service.py)¶
In-memory circular buffer with entity indexing:
LogEntry: Dataclass for log entries with entity contextLogStorageService: Circular buffer (configurable MB limit)- Entity indexing: collection_id, user_id, request_id, pipeline_stage
- Filtering, pagination, real-time streaming
- Usage statistics
3. Logging Service (enhanced_logging.py)¶
Main orchestrator for structured logging:
LoggingService: Manages formatters, handlers, and storage- Dual formatters: JSON and text
StorageHandler: Custom handler for log capture- Integration with existing
logging_utils.pyfor backward compatibility
Configuration¶
Environment Variables¶
Add to your .env file:
# Logging configuration
LOG_LEVEL=INFO # DEBUG, INFO, WARNING, ERROR, CRITICAL
LOG_FORMAT=text # text (dev) or json (prod)
LOG_TO_FILE=true
LOG_FILE=rag_modulo.log
LOG_FOLDER=logs
LOG_ROTATION_ENABLED=true
LOG_MAX_SIZE_MB=10
LOG_BACKUP_COUNT=5
# Log storage (in-memory)
LOG_STORAGE_ENABLED=true
LOG_BUFFER_SIZE_MB=5
Configuration Class¶
Settings are defined in backend/core/config.py:
class Settings(BaseSettings):
log_level: str = "INFO"
log_format: str = "text" # or "json"
log_to_file: bool = True
log_file: str = "rag_modulo.log"
log_folder: str | None = "logs"
log_rotation_enabled: bool = True
log_max_size_mb: int = 10
log_backup_count: int = 5
log_filemode: str = "a"
# Log storage
log_storage_enabled: bool = True
log_buffer_size_mb: int = 5
Usage¶
Basic Usage in Services¶
from core.enhanced_logging import get_logger
from core.logging_context import log_operation, pipeline_stage_context, PipelineStage
logger = get_logger(__name__)
async def search(self, search_input: SearchInput) -> SearchOutput:
# Wrap entire operation for automatic timing and context
with log_operation(
logger,
"search_documents",
entity_type="collection",
entity_id=str(search_input.collection_id),
user_id=str(search_input.user_id),
query=search_input.question # Additional metadata
):
# Each pipeline stage tracked separately
with pipeline_stage_context(PipelineStage.QUERY_VALIDATION):
validate_search_input(search_input)
with pipeline_stage_context(PipelineStage.QUERY_REWRITING):
rewritten = await self.rewrite_query(search_input.question)
logger.info("Query rewritten", extra={
"original": search_input.question,
"rewritten": rewritten
})
with pipeline_stage_context(PipelineStage.VECTOR_SEARCH):
results = await self.vector_search(rewritten)
logger.info("Vector search completed", extra={
"result_count": len(results),
"top_score": results[0].score if results else 0
})
return search_output
Chain of Thought Reasoning¶
async def chain_of_thought_search(
self, collection_id: UUID, user_id: UUID, complex_query: str
) -> dict:
with log_operation(
logger,
"chain_of_thought_reasoning",
entity_type="collection",
entity_id=str(collection_id),
user_id=str(user_id),
):
with pipeline_stage_context(PipelineStage.COT_QUESTION_DECOMPOSITION):
sub_questions = await decompose_question(complex_query)
logger.info("Question decomposed", extra={
"sub_question_count": len(sub_questions),
"sub_questions": sub_questions
})
# Process each sub-question
for i, sub_q in enumerate(sub_questions):
with pipeline_stage_context(PipelineStage.COT_REASONING):
logger.info(f"Processing sub-question {i+1}/{len(sub_questions)}")
answer = await self.search(collection_id, user_id, sub_q)
with pipeline_stage_context(PipelineStage.COT_ANSWER_SYNTHESIS):
final_answer = await synthesize_answers(sub_answers)
logger.info("Answers synthesized")
return final_answer
Error Handling¶
Errors are automatically logged with full context:
try:
with log_operation(logger, "risky_operation", "collection", coll_id):
# Operation code
raise ValueError("Something went wrong")
except ValueError as e:
# Error automatically logged with:
# - Full context (request_id, entity, operation)
# - Timing information
# - Stack trace
logger.error("Operation failed, implementing fallback")
Batch Processing¶
async def batch_process_documents(
self, document_ids: list[str], collection_id: UUID
) -> list[dict]:
with log_operation(
logger,
"batch_document_processing",
entity_type="collection",
entity_id=str(collection_id),
document_count=len(document_ids)
):
for i, doc_id in enumerate(document_ids):
# Log progress periodically
if i % 10 == 0:
logger.info(f"Progress: {i}/{len(document_ids)}", extra={
"progress_percent": round((i / len(document_ids)) * 100, 1)
})
result = await process_document(doc_id)
logger.info("Batch processing completed", extra={
"total_documents": len(document_ids),
"successful": len(results)
})
Log Output Formats¶
Text Format (Development)¶
Human-readable format for local development:
[2025-10-22T10:30:45] INFO rag.search: Starting search_documents [req_id=req_abc123, collection=coll_456, user=user_xyz]
[2025-10-22T10:30:45] INFO rag.search: Query rewritten [stage=query_rewriting] | original=What is AI?, rewritten=artificial intelligence machine learning
[2025-10-22T10:30:45] INFO rag.search: Vector search completed [stage=vector_search] | result_count=5, top_score=0.95
[2025-10-22T10:30:45] INFO rag.search: Completed search_documents (took 234.56ms)
JSON Format (Production)¶
Structured format for monitoring tools (ELK, Splunk, CloudWatch):
{
"timestamp": "2025-10-22T10:30:45.123Z",
"level": "info",
"logger": "rag.search",
"message": "Query rewritten",
"context": {
"request_id": "req_abc123",
"user_id": "user_xyz",
"collection_id": "coll_456",
"operation": "search_documents",
"pipeline_stage": "query_rewriting"
},
"original": "What is AI?",
"rewritten": "artificial intelligence machine learning",
"execution_time_ms": 45.2
}
Pipeline Stages¶
Standard pipeline stage constants defined in PipelineStage class:
Query Processing¶
QUERY_VALIDATION- Input validationQUERY_REWRITING- Query rewriting and expansionQUERY_EXPANSION- Query term expansionQUERY_DECOMPOSITION- Complex query decomposition
Embedding¶
EMBEDDING_GENERATION- Embedding vector generationEMBEDDING_BATCHING- Batch embedding processing
Retrieval¶
VECTOR_SEARCH- Vector similarity searchKEYWORD_SEARCH- Keyword-based searchHYBRID_SEARCH- Hybrid vector + keyword searchDOCUMENT_RETRIEVAL- Document fetchingMETADATA_GENERATION- Metadata extraction
Reranking¶
RERANKING- Result rerankingRELEVANCE_SCORING- Relevance score calculation
Generation¶
PROMPT_CONSTRUCTION- LLM prompt buildingLLM_GENERATION- LLM inferenceANSWER_PROCESSING- Answer post-processingSOURCE_ATTRIBUTION- Source citation generation
Chain of Thought¶
COT_REASONING- CoT reasoning stepCOT_QUESTION_DECOMPOSITION- Question breakdownCOT_ANSWER_SYNTHESIS- Answer synthesis
Document Processing¶
DOCUMENT_PARSING- Document parsingDOCUMENT_CHUNKING- Document chunkingDOCUMENT_INDEXING- Vector indexing
Collection Management¶
COLLECTION_CREATION- Collection creationCOLLECTION_VALIDATION- Collection validationCOLLECTION_DELETION- Collection deletion
API Reference¶
Context Management¶
log_operation(logger, operation, entity_type, entity_id, user_id=None, **metadata)¶
Context manager for tracking an operation with automatic timing.
Parameters: - logger: Logger instance - operation: Operation name (e.g., "search_documents") - entity_type: Entity type ("collection", "user", "pipeline", "document") - entity_id: Entity ID - user_id: Optional user ID - **metadata: Additional metadata
Example:
with log_operation(logger, "search", "collection", "abc123", user_id="user456"):
# Operation code
pass
pipeline_stage_context(stage)¶
Context manager for tracking pipeline stage transitions.
Parameters: - stage: Pipeline stage name (use PipelineStage constants)
Example:
request_context(request_id=None, user_id=None, **metadata)¶
Context manager for setting request-level context.
Parameters: - request_id: Request correlation ID (auto-generated if not provided) - user_id: User ID - **metadata: Additional request metadata
Example:
Log Storage Service¶
LogStorageService.get_logs()¶
Query stored logs with filtering and pagination.
Parameters: - entity_type: Filter by entity type - entity_id: Filter by entity ID - level: Minimum log level - start_time: Start of time range - end_time: End of time range - request_id: Filter by request ID - pipeline_stage: Filter by pipeline stage - search: Search in message text - limit: Maximum number of results (default: 100) - offset: Number of results to skip (default: 0) - order: Sort order ("asc" or "desc", default: "desc")
Returns: List of log entry dictionaries
Example:
# Get logs for specific collection
logs = await storage.get_logs(
entity_type="collection",
entity_id="coll_123",
level=LogLevel.INFO,
limit=50
)
# Get logs for specific request
logs = await storage.get_logs(request_id="req_abc123")
# Get logs for pipeline stage
logs = await storage.get_logs(pipeline_stage="vector_search")
LogStorageService.get_stats()¶
Get storage statistics.
Returns: Dictionary with statistics: - total_logs: Total number of logs in buffer - buffer_size_bytes: Current buffer size in bytes - buffer_size_mb: Current buffer size in MB - max_size_mb: Maximum buffer size - usage_percent: Buffer usage percentage - unique_entities: Number of unique entities - unique_requests: Number of unique requests - unique_pipeline_stages: Number of unique pipeline stages - level_distribution: Log count by level - entity_distribution: Log count by entity type - pipeline_stage_distribution: Log count by pipeline stage
Migration Guide¶
From Old Logging¶
The old logging_utils.py continues to work during migration:
# Old style (still works)
from core.logging_utils import get_logger
logger = get_logger(__name__)
logger.info("Something happened")
# New style (enhanced - recommended)
from core.enhanced_logging import get_logger
from core.logging_context import log_operation
logger = get_logger(__name__)
with log_operation(logger, "operation_name", "entity_type", "entity_id"):
logger.info("Something happened", extra={"key": "value"})
Migration Steps¶
-
Import enhanced logging:
-
Wrap operations with log_operation:
# Before: async def search(self, search_input: SearchInput): logger.info("Starting search") result = await self._search(search_input) logger.info("Search completed") return result # After: async def search(self, search_input: SearchInput): with log_operation( logger, "search_documents", "collection", str(search_input.collection_id), user_id=str(search_input.user_id) ): result = await self._search(search_input) return result -
Add pipeline stage contexts:
-
Add structured metadata:
Examples¶
Complete Integration Example¶
See backend/core/enhanced_logging_example.py for comprehensive examples:
- Simple search operations
- Chain of Thought reasoning
- Error handling with context preservation
- Batch processing with progress tracking
- API endpoint integration patterns
Running Examples¶
# Run the example file
cd backend
python core/enhanced_logging_example.py
# Expected output:
# === Example 1: Simple Search ===
# [2025-10-22T10:30:45] INFO rag.search: Starting search_documents...
# [2025-10-22T10:30:45] INFO rag.search: Query rewritten [stage=query_rewriting]...
# [2025-10-22T10:30:45] INFO rag.search: Completed search_documents (took 234.56ms)
Testing¶
Running Tests¶
# Run all logging tests
pytest backend/tests/unit/test_enhanced_logging.py -v
# Run specific test
pytest backend/tests/unit/test_enhanced_logging.py::TestLogContext -v
# Run with coverage
pytest backend/tests/unit/test_enhanced_logging.py --cov=core --cov-report=html
Test Coverage¶
27 comprehensive unit tests covering:
- โ Context creation and manipulation
- โ Context propagation in async functions
- โ Log storage operations (add, query, filter)
- โ Pipeline stage tracking
- โ Request correlation
- โ Error handling and context preservation
- โ Filtering by entity, level, time range
- โ Pagination and sorting
- โ Statistics and metrics
- โ Context manager proper cleanup
Benefits¶
Development¶
- 50% Faster Debugging: Structured context makes issues obvious
- Full Traceability: Track requests through entire RAG pipeline
- Performance Insights: Know exactly where time is spent
- Developer Friendly: Human-readable text format for local work
Production¶
- Production Ready: JSON output works with all monitoring tools
- Zero Performance Impact: Async operation with buffering
- Queryable: In-memory storage for admin UI
- Monitoring Integration: Works with ELK, Splunk, CloudWatch, Datadog
Operations¶
- Request Correlation: Track user requests across all services
- Entity Tracking: Find all logs for specific collections/users
- Pipeline Visibility: See performance of each pipeline stage
- Real-time Insights: Stream logs in real-time for debugging
Troubleshooting¶
Common Issues¶
Logs not appearing¶
Symptom: No logs visible in console or file
Solution: 1. Check LOG_LEVEL setting - may be too restrictive 2. Verify logger initialization: await initialize_logging() 3. Check log file permissions if LOG_TO_FILE=true
Context not propagating¶
Symptom: Context fields missing from logs
Solution: 1. Ensure using async context managers (with blocks) 2. Check that context is set before operations 3. Verify ContextVar propagation in async code
Performance impact¶
Symptom: Logging slowing down operations
Solution: 1. Reduce LOG_LEVEL to WARNING or ERROR in production 2. Disable LOG_STORAGE_ENABLED if not needed 3. Reduce LOG_BUFFER_SIZE_MB if memory is constrained 4. Use JSON format for better parsing performance
Disk space issues¶
Symptom: Log files consuming too much disk space
Solution: 1. Enable log rotation: LOG_ROTATION_ENABLED=true 2. Reduce LOG_MAX_SIZE_MB and LOG_BACKUP_COUNT 3. Set up log shipping to external service 4. Implement log archival strategy
See Also¶
- Development Workflow - Development best practices
- Code Quality Standards - Linting and formatting
- Testing Strategy - Testing guidelines
- Backend Development - Backend architecture