Data Flow¶
This document describes the data flow through RAG Modulo, from user requests to responses, covering the complete lifecycle of search queries, document processing, and authentication.
Search Request Flow¶
Overview¶
RAG Modulo uses a 6-stage pipeline architecture for processing search queries:
1. Pipeline Resolution โ 2. Query Enhancement โ 3. Retrieval
โ 4. Reranking โ 5. Reasoning (CoT) โ 6. Generation
Detailed Flow¶
โโโโโโโโโโโโโโโ
โ User โ
โ (Frontend) โ
โโโโโโโโฌโโโโโโโ
โ POST /api/search { question, collection_id }
โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
โ API Gateway (FastAPI) โ
โ - JWT Middleware validates token โ
โ - Extract user_id from token (not client input) โ
โ - Inject dependencies (SearchService, db) โ
โโโโโโโโฌโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
โ SearchRouter (search_router.py) โ
โ - Validate SearchInput schema โ
โ - Override user_id from JWT token (security) โ
โ - Call SearchService.search() โ
โโโโโโโโฌโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
โ SearchService (search_service.py) โ
โ โ
โ Stage 1: Pipeline Resolution โ
โ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ โ
โ โ - Get user's default pipeline โ โ
โ โ - Create pipeline if none exists โ โ
โ โ - Load pipeline configuration โ โ
โ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ โ
โ โ โ
โ Stage 2: Query Enhancement โ
โ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ โ
โ โ - Analyze query complexity โ โ
โ โ - Rewrite query for better retrieval โ โ
โ โ - Add conversation context if available โ โ
โ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ โ
โ โ โ
โ Stage 3: Retrieval โ
โ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ โ
โ โ - Generate embeddings for query โ โ
โ โ - Search vector database (Milvus) โ โ
โ โ - Retrieve top_k documents โ โ
โ โ - Filter by collection_id โ โ
โ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ โ
โ โ โ
โ Stage 4: Reranking โ
โ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ โ
โ โ - Apply cross-encoder reranking โ โ
โ โ - Score documents for relevance โ โ
โ โ - Reorder results by score โ โ
โ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ โ
โ โ โ
โ Stage 5: Reasoning (Chain of Thought) โ
โ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ โ
โ โ - Detect if complex question โ โ
โ โ - Decompose into sub-questions โ โ
โ โ - Execute iterative reasoning โ โ
โ โ - Accumulate context across steps โ โ
โ โ - Apply quality scoring (0.0-1.0) โ โ
โ โ - Retry if quality < 0.6 (up to 3x) โ โ
โ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ โ
โ โ โ
โ Stage 6: Generation โ
โ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ โ
โ โ - Build context from retrieved docs โ โ
โ โ - Load prompt template โ โ
โ โ - Call LLM provider (WatsonX/OpenAI/etc) โ โ
โ โ - Parse structured output (XML/JSON) โ โ
โ โ - Extract answer and thinking โ โ
โ โ - Track token usage โ โ
โ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ โ
โ โ
โโโโโโโโฌโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
โ SearchOutput Response โ
โ { โ
โ answer: "Generated answer...", โ
โ documents: [{...}, {...}], โ
โ query_results: [...], โ
โ rewritten_query: "Enhanced query", โ
โ cot_output: { โ
โ reasoning_steps: [...], โ
โ quality_score: 0.85 โ
โ }, โ
โ token_warning: {...}, โ
โ execution_time: 2.45 โ
โ } โ
โโโโโโโโฌโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
โ
โโโโโโโโโโโโโโโโ
โ Frontend โ
โ - Display โ
โ - Sources โ
โ - CoT Steps โ
โโโโโโโโโโโโโโโโ
Request Processing Details¶
Stage 1: Pipeline Resolution¶
async def _resolve_user_default_pipeline(self, user_id: UUID4) -> Pipeline:
# Get user's LLM provider configuration
provider = await self.llm_provider_service.get_user_provider(user_id)
# Get or create default pipeline
pipeline = await self.pipeline_service.get_user_default_pipeline(
user_id=user_id,
provider=provider
)
# Create pipeline if none exists
if not pipeline:
pipeline = await self.pipeline_service.create_default_pipeline(
user_id=user_id,
provider=provider
)
return pipeline
Stage 2: Query Enhancement¶
async def _enhance_query(self, query: str, context: SearchContext) -> str:
# Check if query needs enhancement
if len(query.split()) < 5:
return query # Short queries don't need enhancement
# Use query rewriter
rewriter = await self.pipeline_service.get_query_rewriter(
context.user_id
)
# Enhance with conversation context
conversation_history = await self._get_conversation_history(
context.conversation_id
)
enhanced = await rewriter.rewrite(
query=query,
conversation_history=conversation_history
)
return enhanced
Stage 3: Retrieval¶
async def _retrieve_documents(
self,
query: str,
collection_id: UUID4,
top_k: int = 10
) -> list[QueryResult]:
# Generate embeddings
embeddings = await self.embedding_service.embed_text(query)
# Search vector database
results = await self.vector_store.search(
collection_name=str(collection_id),
query_vector=embeddings,
top_k=top_k,
filters={"collection_id": str(collection_id)}
)
return results
Stage 5: Chain of Thought Reasoning¶
async def _apply_chain_of_thought(
self,
question: str,
documents: list[Document],
context: SearchContext
) -> ChainOfThoughtOutput:
# 1. Classify question
classification = await self.cot_service.classify_question(question)
if not classification.requires_cot:
return None # Skip CoT for simple questions
# 2. Decompose question
decomposition = await self.cot_service.decompose_question(
question=question,
max_depth=3
)
# 3. Execute reasoning steps
reasoning_steps = []
accumulated_context = []
for sub_question in decomposition.sub_questions:
# Search for each sub-question
sub_result = await self._retrieve_documents(
query=sub_question,
collection_id=context.collection_id
)
# Build reasoning with accumulated context
reasoning = await self._build_reasoning_step(
sub_question=sub_question,
documents=sub_result,
accumulated_context=accumulated_context
)
# Quality check with retry logic
if reasoning.quality_score < 0.6:
reasoning = await self._retry_reasoning(
sub_question,
documents,
max_retries=3
)
reasoning_steps.append(reasoning)
accumulated_context.append(reasoning.reasoning)
# 4. Synthesize final answer
final_answer = await self.cot_service.synthesize(
original_question=question,
reasoning_steps=reasoning_steps
)
return ChainOfThoughtOutput(
reasoning_steps=reasoning_steps,
final_answer=final_answer,
quality_score=final_answer.quality_score
)
Stage 6: Generation¶
async def _generate_answer(
self,
question: str,
documents: list[Document],
cot_output: ChainOfThoughtOutput | None,
context: SearchContext
) -> str:
# Build context from documents
context_text = self._build_context(documents)
# Add CoT reasoning if available
if cot_output:
context_text += f"\n\nReasoning:\n{cot_output.final_answer}"
# Load prompt template
template = await self.prompt_service.get_template(
name="rag_generation",
user_id=context.user_id
)
# Format prompt
prompt = template.format(
question=question,
context=context_text
)
# Call LLM provider
provider = await self.llm_provider_factory.get_provider(
provider_name=context.provider_name,
model_id=context.model_id
)
# Generate with structured output
response = await provider.generate_response(
prompt=prompt,
max_tokens=1024,
temperature=0.7
)
# Parse structured output (XML tags)
parsed = self._parse_llm_response(response)
# Track token usage
await self.token_service.track_usage(
user_id=context.user_id,
tokens=parsed.token_count
)
return parsed.answer
Document Processing Flow¶
Upload and Processing Pipeline¶
โโโโโโโโโโโโโโโ
โ User โ
โ Uploads โ
โ Document โ
โโโโโโโโฌโโโโโโโ
โ POST /api/files/upload
โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
โ FileRouter (file_router.py) โ
โ - Validate file type and size โ
โ - Create Collection if needed โ
โ - Store file in MinIO โ
โโโโโโโโฌโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
โ FileManagementService (file_service.py) โ
โ - Save file to object storage (MinIO) โ
โ - Create File record in database โ
โ - Set status = PROCESSING โ
โ - Trigger document processing โ
โโโโโโโโฌโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
โ DocumentProcessor (document_processor.py) โ
โ โ
โ 1. Detect file format (.pdf, .docx, etc) โ
โ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ โ
โ โ - Use file extension โ โ
โ โ - Select appropriate processor โ โ
โ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ โ
โ โ โ
โ 2. Extract content using Docling โ
โ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ โ
โ โ PDF: Text + tables + OCR images โ โ
โ โ DOCX: Structured document parsing โ โ
โ โ PPTX: Slide content extraction โ โ
โ โ XLSX: Sheet and table processing โ โ
โ โ Images: OCR text extraction โ โ
โ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ โ
โ โ โ
โ 3. Generate chunks (hierarchical strategy) โ
โ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ โ
โ โ - Split into semantic chunks โ โ
โ โ - Maintain context across chunks โ โ
โ โ - Add metadata (page, section, etc) โ โ
โ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ โ
โ โ โ
โ 4. Generate embeddings โ
โ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ โ
โ โ - Call embedding model โ โ
โ โ - Create vector representations โ โ
โ โ - Batch processing for efficiency โ โ
โ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ โ
โ โ โ
โ 5. Store in vector database โ
โ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ โ
โ โ - Create collection if needed โ โ
โ โ - Insert vectors with metadata โ โ
โ โ - Index for fast retrieval โ โ
โ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ โ
โ โ
โโโโโโโโฌโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
โ Update File Status โ
โ - Set status = COMPLETED โ
โ - Update collection status โ
โ - Generate suggested questions โ
โโโโโโโโฌโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
โ
โโโโโโโโโโโโโโโโ
โ Notify โ
โ Frontend โ
โ (WebSocket) โ
โโโโโโโโโโโโโโโโ
Document Processing Code Example¶
async def process_document(
self,
file_path: str,
collection_id: UUID4,
user_id: UUID4
) -> ProcessingResult:
# 1. Detect file type
file_type = self._detect_file_type(file_path)
# 2. Get appropriate processor
processor = self._get_processor(file_type)
# 3. Extract content
documents = []
async for doc in processor.process(file_path, collection_id):
documents.append(doc)
# 4. Generate embeddings
for doc in documents:
doc.embedding = await self.embedding_service.embed_text(doc.content)
# 5. Store in vector database
await self.vector_store.insert(
collection_name=str(collection_id),
documents=documents
)
# 6. Update status
await self.file_repository.update_status(
file_id=file_id,
status=FileStatus.COMPLETED
)
return ProcessingResult(
success=True,
chunks_created=len(documents),
collection_id=collection_id
)
Authentication Flow¶
OIDC Authentication with IBM Cloud Identity¶
โโโโโโโโโโโโโโโ
โ User โ
โ (Browser) โ
โโโโโโโโฌโโโโโโโ
โ Click "Login"
โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
โ Frontend (React) โ
โ - Redirect to OIDC provider โ
โ - URL: /api/auth/login โ
โโโโโโโโฌโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
โ IBM Cloud Identity (OIDC Provider) โ
โ - User enters credentials โ
โ - Two-factor authentication (if enabled) โ
โ - Generate authorization code โ
โโโโโโโโฌโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
โ Redirect to callback URL
โ with authorization code
โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
โ AuthRouter (auth_router.py) โ
โ - Exchange code for tokens โ
โ - Validate ID token signature โ
โ - Extract user info (sub, email, name) โ
โโโโโโโโฌโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
โ UserService (user_service.py) โ
โ - Find or create user in database โ
โ - Update user profile โ
โ - Initialize user defaults (pipeline, provider) โ
โโโโโโโโฌโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
โ Generate JWT Token โ
โ - Sign token with JWT_SECRET_KEY โ
โ - Include: user_id, email, name, role โ
โ - Set expiration (24 hours) โ
โโโโโโโโฌโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
โ Return to Frontend โ
โ - Store JWT in localStorage โ
โ - Include in all API requests โ
โ - Header: Authorization: Bearer <token> โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
Subsequent API Requests¶
โโโโโโโโโโโโโโโ
โ User โ
โ (Frontend) โ
โโโโโโโโฌโโโโโโโ
โ API Request + JWT Token
โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
โ Authentication Middleware โ
โ 1. Extract token from Authorization header โ
โ 2. Verify JWT signature โ
โ 3. Check expiration โ
โ 4. Validate issuer and audience โ
โ 5. Extract user claims โ
โ 6. Add user to request.state.user โ
โโโโโโโโฌโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
โ Router Endpoint โ
โ - Use get_current_user() dependency โ
โ - Access user_id from JWT (never client input) โ
โ - Verify resource ownership โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
Conversation Flow¶
Chat History Management¶
โโโโโโโโโโโโโโโ
โ User โ
โ Sends โ
โ Message โ
โโโโโโโโฌโโโโโโโ
โ POST /api/conversations/message
โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
โ ConversationRouter (conversation_router.py) โ
โ - Get or create conversation session โ
โ - Store user message โ
โโโโโโโโฌโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
โ SearchService (search_service.py) โ
โ - Load conversation history โ
โ - Add context to query enhancement โ
โ - Execute search pipeline โ
โ - Generate answer โ
โโโโโโโโฌโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
โ Store Assistant Response โ
โ - Save message to database โ
โ - Update conversation summary โ
โ - Track token usage โ
โโโโโโโโฌโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
โ
โโโโโโโโโโโโโโโโ
โ Return to โ
โ Frontend โ
โ (WebSocket) โ
โโโโโโโโโโโโโโโโ
Data Persistence¶
Database Operations¶
PostgreSQL stores all metadata: - User accounts and profiles - Collection metadata - File metadata and status - Conversation history - Pipeline configurations - LLM provider settings - Token usage tracking
Milvus stores vector embeddings: - Document chunk embeddings - Collection-based isolation - Efficient similarity search - Automatic indexing
MinIO stores binary files: - Uploaded documents - Generated podcasts - Temporary processing files - Model artifacts
Related Documentation¶
- Components - System architecture
- Security - Authentication details
- Performance - Optimization strategies