Building Production RAG Pipelines: The Complete Guide
Retrieval-Augmented Generation (RAG) has become the dominant pattern for building LLM applications that need access to private, current, or domain-specific knowledge. This guide is a comprehensive, code-first walkthrough of every component in a production RAG pipeline - from document ingestion and chunking strategies through vector databases, retrieval, reranking, generation, evaluation, and advanced patterns like Graph RAG and Corrective RAG. Every code example is real, working Python you can copy into your project today.
1. What is RAG?
Retrieval-Augmented Generation (RAG) is an architecture pattern that enhances Large Language Model outputs by retrieving relevant information from external knowledge sources and injecting it into the prompt context before generation. Instead of relying solely on the model's parametric knowledge (what it learned during training), RAG grounds responses in actual documents, databases, or APIs - dramatically reducing hallucinations and enabling access to private or up-to-date information.
The term was coined by Patrick Lewis et al. in their 2020 paper "Retrieval-Augmented Generation for Knowledge-Intensive NLP Tasks", but the pattern has evolved far beyond the original formulation. In 2026, RAG is the default architecture for enterprise LLM applications, powering everything from customer support chatbots to legal research tools to internal knowledge bases.
Why RAG Beats Fine-Tuning for Most Use Cases
The first question teams face when building LLM applications with domain-specific knowledge is: should we fine-tune or use RAG? For the vast majority of use cases, RAG wins:
| Factor | RAG | Fine-Tuning |
|---|---|---|
| Data freshness | Update documents anytime, instant effect | Requires retraining to incorporate new data |
| Cost | Embedding cost is one-time; retrieval is cheap | GPU hours for training; repeated for updates |
| Hallucination control | Grounded in retrieved documents; auditable | Model may still hallucinate confidently |
| Transparency | Can cite exact source documents | Knowledge is opaque, baked into weights |
| Setup complexity | Moderate - vector DB + retrieval pipeline | High - data curation, training infra, eval |
| Best for | Knowledge retrieval, Q&A, search, support | Style/tone adaptation, specialized reasoning |
Rule of thumb: Use RAG when you need the model to know specific facts. Use fine-tuning when you need the model to behave differently (output format, tone, domain-specific reasoning patterns). Use both together for the best results in complex applications.
The RAG Pipeline
Every RAG system follows the same fundamental pipeline, regardless of complexity:
┌──────────┐   ┌──────────┐   ┌──────────┐   ┌──────────┐   ┌──────────┐   ┌──────────┐
│  Ingest  │──▶│  Chunk   │──▶│  Embed   │──▶│  Store   │──▶│ Retrieve │──▶│ Generate │
│          │   │          │   │          │   │          │   │          │   │          │
│  PDFs,   │   │  Split   │   │ Convert  │   │  Vector  │   │ Semantic │   │  LLM +   │
│ Web, DB  │   │  into    │   │ text to  │   │ database │   │ search + │   │ context  │
│ Markdown │   │ segments │   │ vectors  │   │ (index)  │   │  rerank  │   │ = answer │
└──────────┘   └──────────┘   └──────────┘   └──────────┘   └──────────┘   └──────────┘
Each stage has multiple implementation options with different tradeoffs. The rest of this guide covers each stage in depth.
Naive RAG vs Advanced RAG vs Modular RAG
The RAG ecosystem has evolved into three distinct paradigms:
Naive RAG is the simplest implementation: chunk documents, embed them, store in a vector database, retrieve top-k similar chunks for a query, and pass them to an LLM. This works surprisingly well for simple use cases but suffers from several limitations - poor retrieval precision, no handling of multi-hop questions, chunk boundary issues, and no quality control on retrieved context.
Advanced RAG adds pre-retrieval and post-retrieval optimizations to the naive pipeline. Pre-retrieval improvements include query rewriting, HyDE (Hypothetical Document Embeddings), and query decomposition. Post-retrieval improvements include reranking, contextual compression, and filtering. Advanced RAG also uses better chunking strategies (semantic chunking, parent-child relationships) and hybrid search (combining dense and sparse retrieval).
Modular RAG breaks the pipeline into composable, independently optimizable modules. Each module (routing, retrieval, reranking, generation, evaluation) can be swapped, chained, or run in parallel. This enables patterns like adaptive retrieval (deciding whether to retrieve at all), iterative retrieval (multiple retrieval rounds), and self-reflective RAG (the model evaluates its own output and re-retrieves if needed). Frameworks like LangGraph and LlamaIndex Workflows make modular RAG practical.
| Paradigm | Complexity | Quality | Best For |
|---|---|---|---|
| Naive RAG | Low | Baseline | Prototypes, simple Q&A, internal tools |
| Advanced RAG | Medium | High | Production apps, customer-facing products |
| Modular RAG | High | Highest | Complex multi-step reasoning, enterprise |
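Before drilling into each stage, note that the entire naive pipeline fits in a page of code. Below is a minimal, self-contained sketch - the `embed` and `llm_complete` helpers are toy stand-ins (a hashed bag-of-words vector and an echo function), not real models; in practice you would swap in a real embedding model and LLM client:

```python
import numpy as np

# Toy stand-ins for the real components (hypothetical helpers): embed() hashes
# words into a bag-of-words vector; llm_complete() just echoes its prompt.
def embed(text: str, dim: int = 512) -> np.ndarray:
    vec = np.zeros(dim)
    for word in text.lower().split():
        vec[hash(word.strip(".,?!")) % dim] += 1.0
    norm = np.linalg.norm(vec)
    return vec / norm if norm else vec

def llm_complete(prompt: str) -> str:
    return f"(answer grounded in the prompt below)\n{prompt}"

# 1-2. Ingest + chunk: here each document is already a single chunk
chunks = [
    "RAG retrieves relevant documents and injects them into the prompt.",
    "Fine-tuning bakes knowledge into model weights.",
    "Vector databases index embeddings for fast similarity search.",
]
# 3-4. Embed and store in an in-memory "index"
index = np.stack([embed(c) for c in chunks])
# 5. Retrieve top-k chunks by cosine similarity to the query
query = "How does RAG inject documents into the prompt?"
scores = index @ embed(query)
top_k = np.argsort(scores)[::-1][:2]
# 6. Generate: pass retrieved context plus the question to the LLM
context = "\n".join(chunks[i] for i in top_k)
answer = llm_complete(f"Context:\n{context}\n\nQuestion: {query}")
print(answer)
```

Every production system elaborates on this skeleton; none escapes it.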
2. Document Ingestion
The quality of your RAG system is bounded by the quality of your ingestion pipeline. Garbage in, garbage out - no amount of clever retrieval can compensate for poorly extracted text. Document ingestion involves loading raw content from various sources and converting it into clean, structured text that can be chunked and embedded.
Loading PDFs with PyMuPDF
PDFs are the most common document format in enterprise RAG systems and also the most problematic. PDF is a presentation format, not a content format - text extraction requires reconstructing reading order from positioned glyphs. PyMuPDF (imported as fitz) is among the fastest and most reliable Python PDF libraries:
import fitz # PyMuPDF
def load_pdf(path: str) -> list[str]:
"""Extract text from each page of a PDF."""
doc = fitz.open(path)
return [page.get_text() for page in doc]
# Usage
pages = load_pdf("technical_manual.pdf")
print(f"Loaded {len(pages)} pages")
print(pages[0][:500]) # Preview first page
For PDFs with complex layouts (tables, multi-column, headers/footers), use PyMuPDF's advanced extraction:
import fitz
def load_pdf_structured(path: str) -> list[dict]:
"""Extract text with metadata from PDF pages."""
doc = fitz.open(path)
results = []
for i, page in enumerate(doc):
# Extract text blocks with position info
blocks = page.get_text("dict")["blocks"]
text_blocks = []
for block in blocks:
if block["type"] == 0: # Text block
for line in block["lines"]:
text = " ".join(span["text"] for span in line["spans"])
text_blocks.append(text)
results.append({
"page": i + 1,
"text": "\n".join(text_blocks),
"metadata": {
"source": path,
"page": i + 1,
"total_pages": len(doc)
}
})
return results
Loading Web Pages with BeautifulSoup
import requests
from bs4 import BeautifulSoup
def load_webpage(url: str) -> str:
"""Extract main text content from a web page."""
response = requests.get(url, timeout=10)
response.raise_for_status()
soup = BeautifulSoup(response.text, "html.parser")
# Remove script, style, and nav elements
for tag in soup(["script", "style", "nav", "footer", "header"]):
tag.decompose()
# Extract text and clean whitespace
text = soup.get_text(separator="\n", strip=True)
lines = [line.strip() for line in text.splitlines() if line.strip()]
return "\n".join(lines)
# Usage
content = load_webpage("https://docs.python.org/3/tutorial/index.html")
Loading Markdown, CSV, and Databases
import csv
import pathlib
import sqlite3
def load_markdown(path: str) -> str:
"""Load a Markdown file as plain text."""
return pathlib.Path(path).read_text(encoding="utf-8")
def load_csv(path: str, text_columns: list[str]) -> list[str]:
"""Load specific columns from a CSV as text chunks."""
documents = []
with open(path, newline="", encoding="utf-8") as f:
reader = csv.DictReader(f)
for row in reader:
text = " | ".join(f"{col}: {row[col]}" for col in text_columns)
documents.append(text)
return documents
def load_from_database(db_path: str, query: str) -> list[str]:
"""Load text from a SQLite database."""
conn = sqlite3.connect(db_path)
cursor = conn.execute(query)
rows = cursor.fetchall()
columns = [desc[0] for desc in cursor.description]
conn.close()
return [
" | ".join(f"{col}: {val}" for col, val in zip(columns, row))
for row in rows
]
LangChain Document Loaders
LangChain provides a unified interface for 160+ document sources. Each loader returns a list of Document objects with page_content and metadata fields:
from langchain_community.document_loaders import (
PyMuPDFLoader,
WebBaseLoader,
CSVLoader,
DirectoryLoader,
UnstructuredMarkdownLoader,
NotionDirectoryLoader,
ConfluenceLoader,
)
# PDF
pdf_docs = PyMuPDFLoader("report.pdf").load()
# Web page
web_docs = WebBaseLoader("https://example.com/docs").load()
# CSV with specific column
csv_docs = CSVLoader("data.csv", source_column="url").load()
# Entire directory of mixed file types
dir_docs = DirectoryLoader(
"./knowledge_base/",
glob="**/*.*",
show_progress=True,
use_multithreading=True,
).load()
# Notion export
notion_docs = NotionDirectoryLoader("./notion_export/").load()
print(f"Loaded {len(dir_docs)} documents")
for doc in dir_docs[:3]:
print(f" Source: {doc.metadata['source']}")
print(f" Length: {len(doc.page_content)} chars")
Pro tip: Always preserve metadata during ingestion. Source file path, page number, section heading, and timestamp are critical for citation, filtering, and debugging retrieval issues in production.
3. Chunking Strategies
Chunking is arguably the most impactful and underappreciated stage of the RAG pipeline. How you split documents into chunks directly determines retrieval quality. Too large and you dilute relevant information with noise. Too small and you lose context. The wrong splitting strategy can break sentences mid-thought or separate a heading from its content.
Fixed-Size Chunking
The simplest approach: split text into chunks of exactly N characters (or tokens) with optional overlap. Fast and predictable, but completely unaware of document structure:
def fixed_size_chunks(text: str, chunk_size: int = 500, overlap: int = 50) -> list[str]:
"""Split text into fixed-size chunks with overlap."""
chunks = []
start = 0
while start < len(text):
end = start + chunk_size
chunks.append(text[start:end])
start = end - overlap
return chunks
# Usage
chunks = fixed_size_chunks(document_text, chunk_size=512, overlap=50)
Recursive Character Splitting
The most popular strategy in production. Recursively splits on a hierarchy of separators ("\n\n" → "\n" → ". " → " " → "") to respect document structure while staying within size limits:
from langchain_text_splitters import RecursiveCharacterTextSplitter
splitter = RecursiveCharacterTextSplitter(
chunk_size=512,
chunk_overlap=50,
separators=["\n\n", "\n", ". ", " ", ""],
length_function=len,
)
chunks = splitter.split_text(document_text)
print(f"Created {len(chunks)} chunks")
print(f"Average chunk size: {sum(len(c) for c in chunks) / len(chunks):.0f} chars")
# With LangChain Documents (preserves metadata)
from langchain_text_splitters import RecursiveCharacterTextSplitter
doc_splitter = RecursiveCharacterTextSplitter(
chunk_size=512,
chunk_overlap=50,
)
split_docs = doc_splitter.split_documents(documents)
Semantic Chunking
Uses embedding similarity to find natural breakpoints in text. Adjacent sentences with low similarity indicate a topic shift - that's where we split:
from langchain_experimental.text_splitter import SemanticChunker
from langchain_openai import OpenAIEmbeddings
semantic_splitter = SemanticChunker(
OpenAIEmbeddings(model="text-embedding-3-small"),
breakpoint_threshold_type="percentile",
breakpoint_threshold_amount=75, # Split at 75th percentile dissimilarity
)
semantic_chunks = semantic_splitter.split_text(document_text)
Semantic chunking produces higher-quality chunks but is significantly slower and more expensive (requires embedding every sentence). Use it for high-value documents where retrieval quality is critical.
Document-Structure-Aware Chunking
For Markdown, HTML, or other structured documents, split on structural boundaries (headings, sections) rather than arbitrary character counts:
from langchain_text_splitters import MarkdownHeaderTextSplitter
headers_to_split_on = [
("#", "h1"),
("##", "h2"),
("###", "h3"),
]
md_splitter = MarkdownHeaderTextSplitter(
headers_to_split_on=headers_to_split_on,
strip_headers=False,
)
md_chunks = md_splitter.split_text(markdown_text)
for chunk in md_chunks[:3]:
print(f"Headers: {chunk.metadata}")
print(f"Content: {chunk.page_content[:100]}...")
print()
Chunking Strategy Comparison
| Strategy | Best For | Chunk Size | Overlap | Pros | Cons |
|---|---|---|---|---|---|
| Fixed-size | Homogeneous text, logs | 256-1024 chars | 10-20% | Simple, fast, predictable | Breaks mid-sentence, ignores structure |
| Recursive character | General-purpose (default choice) | 512-1000 chars | 50-100 chars | Respects paragraphs/sentences, reliable | Still character-based, not semantic |
| Semantic | High-value docs, research papers | Variable | None (natural breaks) | Topic-coherent chunks, best quality | Slow, expensive (embedding calls), variable sizes |
| Document-structure | Markdown, HTML, code docs | Per-section | None | Preserves headings/hierarchy as metadata | Requires structured input, sections may be too large |
Chunk size vs retrieval quality: Smaller chunks (256-512 chars) improve retrieval precision - the retrieved chunk is more likely to be exactly relevant. Larger chunks (1000-2000 chars) improve context - the LLM gets more surrounding information. The sweet spot for most applications is 512-1000 characters with 50-100 character overlap. Always benchmark with your actual data.
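A first benchmarking step is simply seeing how candidate settings behave on your corpus before committing (a full benchmark would go on to measure retrieval hit rate on query/answer pairs). A quick sketch - `fixed_chunks` and `chunk_stats` are hypothetical helpers, not library functions:

```python
def fixed_chunks(text: str, size: int, overlap: int) -> list[str]:
    """Simple fixed-size splitter; substitute your real splitter."""
    step = size - overlap
    return [text[i:i + size] for i in range(0, max(len(text) - overlap, 1), step)]

def chunk_stats(text: str, sizes=(256, 512, 1024), overlap_pct=0.1) -> dict:
    """Report chunk count and length distribution for each candidate size."""
    stats = {}
    for size in sizes:
        chunks = fixed_chunks(text, size, int(size * overlap_pct))
        lengths = [len(c) for c in chunks]
        stats[size] = {
            "count": len(chunks),
            "avg_len": sum(lengths) / len(lengths),
            "min_len": min(lengths),
        }
    return stats

corpus = "word " * 2000  # stand-in for your actual document text
for size, s in chunk_stats(corpus).items():
    print(f"chunk_size={size}: {s['count']} chunks, avg {s['avg_len']:.0f} chars")
```

Watch for many very short chunks (noise for the retriever) or sizes that exceed your embedding model's context window.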
4. Embeddings
Embeddings are dense vector representations of text that capture semantic meaning in a high-dimensional space. Two pieces of text with similar meaning will have vectors that are close together (high cosine similarity), even if they share no words in common. Embeddings are the bridge between human-readable text and machine-searchable vector space - they're what makes semantic search possible.
When you embed the chunk "The mitochondria is the powerhouse of the cell" and the query "What organelle produces energy?", the resulting vectors will be close in embedding space despite having zero word overlap. This is the fundamental advantage over keyword search.
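"Close together" is usually measured with cosine similarity - the dot product of the two vectors divided by the product of their norms. A minimal sketch with toy 3-dimensional vectors standing in for real 1000+-dimensional embeddings:

```python
import numpy as np

def cosine_similarity(a: np.ndarray, b: np.ndarray) -> float:
    """Cosine similarity: 1.0 = same direction, 0.0 = orthogonal."""
    return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))

# Toy vectors: imagine these came from an embedding model
chunk_vec = np.array([0.9, 0.1, 0.2])   # "mitochondria ... powerhouse of the cell"
query_vec = np.array([0.8, 0.2, 0.1])   # "What organelle produces energy?"
other_vec = np.array([0.1, 0.9, 0.3])   # unrelated text

print(cosine_similarity(chunk_vec, query_vec))  # high: same direction
print(cosine_similarity(chunk_vec, other_vec))  # low: different direction
```

Many embedding APIs return unit-normalized vectors, in which case cosine similarity reduces to a plain dot product.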
OpenAI Embeddings
from openai import OpenAI
client = OpenAI() # Uses OPENAI_API_KEY env var
def embed_openai(texts: list[str], model: str = "text-embedding-3-small") -> list[list[float]]:
"""Generate embeddings using OpenAI's API."""
response = client.embeddings.create(input=texts, model=model)
return [item.embedding for item in response.data]
# Embed chunks
chunk_embeddings = embed_openai(chunks)
print(f"Embedding dimensions: {len(chunk_embeddings[0])}") # 1536 for small, 3072 for large
# Embed a query
query_embedding = embed_openai(["How does RAG work?"])[0]
Cohere Embed v3
import cohere
co = cohere.Client() # Uses CO_API_KEY env var
# Cohere distinguishes between document and query embeddings
doc_embeddings = co.embed(
texts=chunks,
model="embed-english-v3.0",
input_type="search_document",
).embeddings
query_embedding = co.embed(
texts=["How does RAG work?"],
model="embed-english-v3.0",
input_type="search_query",
).embeddings[0]
Open-Source Embeddings with Sentence Transformers
For full control, privacy, and zero API costs, open-source embedding models are excellent. The BGE and Nomic families consistently rank at the top of the MTEB leaderboard:
from sentence_transformers import SentenceTransformer
import numpy as np
# Load model (downloads on first use, ~1.3GB for BGE-large)
model = SentenceTransformer("BAAI/bge-large-en-v1.5")
# Embed documents - BGE v1.5 needs no instruction prefix on the document side
embeddings = model.encode(chunks, show_progress_bar=True, batch_size=32)
print(f"Shape: {embeddings.shape}") # (num_chunks, 1024)
# Embed a query - BGE v1.5 recommends prepending the instruction
# "Represent this sentence for searching relevant passages: " to short queries
query_embedding = model.encode(["Represent this sentence for searching relevant passages: How does RAG work?"])
# Compute cosine similarity
from numpy.linalg import norm
similarities = np.dot(embeddings, query_embedding.T).flatten() / (
norm(embeddings, axis=1) * norm(query_embedding)
)
top_indices = np.argsort(similarities)[::-1][:5]
for idx in top_indices:
print(f" Score: {similarities[idx]:.4f} | {chunks[idx][:80]}...")
Other excellent open-source options:
# Nomic Embed - strong quality, 768 dimensions, Apache 2.0 license
model = SentenceTransformer("nomic-ai/nomic-embed-text-v1.5", trust_remote_code=True)
# GTE-large - strong general-purpose English embeddings
model = SentenceTransformer("thenlper/gte-large")
# E5-mistral - instruction-tuned, very high quality
model = SentenceTransformer("intfloat/e5-mistral-7b-instruct")
Embedding Model Comparison
| Model | Dimensions | MTEB Score | Speed | Cost |
|---|---|---|---|---|
| text-embedding-3-small (OpenAI) | 1536 | 62.3 | Fast (API) | $0.02 / 1M tokens |
| text-embedding-3-large (OpenAI) | 3072 | 64.6 | Fast (API) | $0.13 / 1M tokens |
| embed-english-v3.0 (Cohere) | 1024 | 64.5 | Fast (API) | $0.10 / 1M tokens |
| BAAI/bge-large-en-v1.5 | 1024 | 63.6 | ~500 docs/sec (GPU) | Free (self-hosted) |
| nomic-embed-text-v1.5 | 768 | 62.3 | ~800 docs/sec (GPU) | Free (Apache 2.0) |
| e5-mistral-7b-instruct | 4096 | 66.6 | ~50 docs/sec (GPU) | Free (self-hosted, needs 16GB+ VRAM) |
Choosing an embedding model: For most teams, text-embedding-3-small is the best starting point - cheap, fast, and good quality. If you need to keep data on-premise or want zero marginal cost, bge-large-en-v1.5 and nomic-embed-text-v1.5 are excellent. If you need maximum quality and have the GPU budget, e5-mistral-7b-instruct leads the benchmarks.
5. Vector Databases
Vector databases are purpose-built storage systems optimized for storing, indexing, and querying high-dimensional vectors. They use approximate nearest neighbor (ANN) algorithms - HNSW, IVF, ScaNN - to find similar vectors in milliseconds, even across millions of documents. Choosing the right vector database depends on your scale, infrastructure preferences, and whether you need hybrid search.
ChromaDB - Local, Easy, Great for Prototyping
ChromaDB is the easiest vector database to get started with. It runs in-process, requires no external services, and handles embedding automatically if you configure a model:
import chromadb
# Persistent storage (survives restarts)
client = chromadb.PersistentClient(path="./chroma_db")
# Create the collection with cosine similarity (get_or_create is idempotent across runs)
collection = client.get_or_create_collection(
name="docs",
metadata={"hnsw:space": "cosine"}
)
# Add documents - ChromaDB can auto-embed if you configure an embedding function
collection.add(
documents=chunks,
ids=[f"doc_{i}" for i in range(len(chunks))],
metadatas=[{"source": "manual.pdf", "page": i // 5} for i in range(len(chunks))],
)
# Query
results = collection.query(
query_texts=["How does RAG work?"],
n_results=5,
where={"source": "manual.pdf"}, # Optional metadata filter
)
for doc, dist in zip(results["documents"][0], results["distances"][0]):
    print(f"Distance: {dist:.4f} | {doc[:80]}...")  # cosine distance: lower is closer
# Update a document
collection.update(
ids=["doc_0"],
documents=["Updated content for document 0"],
metadatas=[{"source": "manual_v2.pdf", "page": 1}],
)
# Delete documents
collection.delete(ids=["doc_0", "doc_1"])
# Delete by metadata filter
collection.delete(where={"source": "outdated.pdf"})
pgvector - PostgreSQL Extension
If you already run PostgreSQL, pgvector adds vector similarity search without introducing a new database. This is the pragmatic choice for teams that want to keep their stack simple:
from sqlalchemy import create_engine, Column, Integer, String, Text
from sqlalchemy.orm import declarative_base, Session
from pgvector.sqlalchemy import Vector
Base = declarative_base()
class Document(Base):
__tablename__ = "documents"
id = Column(Integer, primary_key=True)
content = Column(Text, nullable=False)
source = Column(String(500))
embedding = Column(Vector(1024)) # Match your embedding dimensions
# Connect and create tables
engine = create_engine("postgresql://user:pass@localhost:5432/ragdb")
Base.metadata.create_all(engine)
# Insert documents with embeddings
with Session(engine) as session:
for i, (chunk, emb) in enumerate(zip(chunks, embeddings)):
doc = Document(content=chunk, source="manual.pdf", embedding=emb.tolist())
session.add(doc)
session.commit()
# Similarity search using cosine distance
from sqlalchemy import text
with Session(engine) as session:
query_emb = query_embedding.tolist()
results = session.execute(
text("""
SELECT content, source,
1 - (embedding <=> CAST(:query_emb AS vector)) AS similarity
FROM documents
ORDER BY embedding <=> CAST(:query_emb AS vector)
LIMIT 5
"""),
{"query_emb": str(query_emb)},
).fetchall()
for row in results:
print(f"Score: {row.similarity:.4f} | {row.content[:80]}...")
Create an HNSW index for fast approximate search:
-- Create HNSW index for cosine distance
CREATE INDEX ON documents
USING hnsw (embedding vector_cosine_ops)
WITH (m = 16, ef_construction = 64);
-- Set search parameters at query time
SET hnsw.ef_search = 100;
Pinecone - Managed, Scalable
from pinecone import Pinecone, ServerlessSpec
pc = Pinecone() # Uses PINECONE_API_KEY env var
# Create index
pc.create_index(
name="rag-docs",
dimension=1024,
metric="cosine",
spec=ServerlessSpec(cloud="aws", region="us-east-1"),
)
index = pc.Index("rag-docs")
# Upsert vectors
vectors = [
{"id": f"doc_{i}", "values": emb.tolist(), "metadata": {"text": chunk, "source": "manual.pdf"}}
for i, (chunk, emb) in enumerate(zip(chunks, embeddings))
]
index.upsert(vectors=vectors, batch_size=100)
# Query
results = index.query(vector=query_embedding.tolist(), top_k=5, include_metadata=True)
for match in results.matches:
print(f"Score: {match.score:.4f} | {match.metadata['text'][:80]}...")
Vector Database Comparison
| Database | Type | Best For | Scaling | Hybrid Search | Managed Option |
|---|---|---|---|---|---|
| ChromaDB | Embedded | Prototyping, small-medium datasets | Single node | No | No |
| Pinecone | Managed SaaS | Production, zero-ops teams | Automatic | Yes (sparse-dense) | Yes (only option) |
| Weaviate | Self-hosted / Cloud | Hybrid search, multi-modal | Horizontal | Yes (BM25 + vector) | Yes |
| Qdrant | Self-hosted / Cloud | High performance, filtering | Horizontal | Yes (sparse vectors) | Yes |
| pgvector | PostgreSQL extension | Existing Postgres users, simplicity | Vertical (Postgres limits) | Yes (with pg_trgm/tsvector) | Yes (any managed Postgres) |
| Milvus | Self-hosted / Cloud | Massive scale (billions of vectors) | Horizontal (distributed) | Yes | Yes (Zilliz Cloud) |
Recommendation: Start with ChromaDB for prototyping. Move to pgvector if you already use PostgreSQL and have <10M vectors. Use Pinecone or Qdrant Cloud for production workloads where you want managed infrastructure. Use Milvus or Weaviate for billion-scale deployments.
6. Retrieval Strategies
Retrieval is where most RAG pipelines succeed or fail. The naive approach - embed the query, find the top-k most similar chunks - works for simple cases but breaks down with complex queries, ambiguous phrasing, or when relevant information is spread across multiple chunks. Advanced retrieval strategies dramatically improve both precision (are the retrieved chunks relevant?) and recall (did we find all the relevant chunks?).
Naive Similarity Search
The baseline: embed the query, compute cosine similarity against all document embeddings, return the top-k results:
import numpy as np
def naive_search(query_embedding, doc_embeddings, chunks, k=5):
"""Simple cosine similarity search."""
similarities = np.dot(doc_embeddings, query_embedding) / (
np.linalg.norm(doc_embeddings, axis=1) * np.linalg.norm(query_embedding)
)
top_k = np.argsort(similarities)[::-1][:k]
return [(chunks[i], similarities[i]) for i in top_k]
Maximum Marginal Relevance (MMR)
MMR balances relevance with diversity. Instead of returning the 5 most similar chunks (which may all say the same thing), MMR iteratively selects chunks that are both relevant to the query AND different from already-selected chunks:
from langchain_community.vectorstores import Chroma
vectorstore = Chroma(
collection_name="docs",
persist_directory="./chroma_db",
embedding_function=embedding_model,
)
# MMR search - lambda_mult controls relevance vs diversity tradeoff
# 1.0 = pure relevance, 0.0 = pure diversity
results = vectorstore.max_marginal_relevance_search(
query="How does RAG work?",
k=5,
fetch_k=20, # Fetch 20 candidates, then select 5 diverse ones
lambda_mult=0.7, # 70% relevance, 30% diversity
)
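Under the hood, MMR is a greedy loop: at each step, pick the candidate with the best weighted combination of query relevance and dissimilarity to what has already been selected. A from-scratch sketch, assuming unit-normalized embeddings so dot products are cosine similarities:

```python
import numpy as np

def mmr(query_vec: np.ndarray, doc_vecs: np.ndarray, k: int = 5,
        lambda_mult: float = 0.7) -> list[int]:
    """Greedy MMR selection over unit-normalized vectors."""
    relevance = doc_vecs @ query_vec  # cosine similarity to the query
    selected: list[int] = []
    candidates = list(range(len(doc_vecs)))
    while candidates and len(selected) < k:
        def score(i: int) -> float:
            # Redundancy = max similarity to anything already selected
            redundancy = max((float(doc_vecs[i] @ doc_vecs[j]) for j in selected),
                             default=0.0)
            return lambda_mult * float(relevance[i]) - (1 - lambda_mult) * redundancy
        best = max(candidates, key=score)
        selected.append(best)
        candidates.remove(best)
    return selected

# Two near-duplicate documents plus one distinct document
docs = np.array([[1.0, 0.0], [0.995, 0.1], [0.0, 1.0]])
docs = docs / np.linalg.norm(docs, axis=1, keepdims=True)
query = np.array([0.8, 0.6])  # already unit length
# Pure relevance would return the two near-duplicates; MMR swaps in the distinct doc
print(mmr(query, docs, k=2, lambda_mult=0.5))
```

With lambda_mult=1.0 this degenerates to plain top-k similarity; lowering it trades relevance for coverage.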
Hybrid Search (Dense + Sparse/BM25)
Combines semantic search (dense embeddings) with keyword search (BM25/sparse). Dense search excels at understanding meaning; sparse search excels at exact term matching. Together, they cover each other's weaknesses:
from langchain.retrievers import EnsembleRetriever
from langchain_community.retrievers import BM25Retriever
from langchain_community.vectorstores import Chroma
# Dense retriever (semantic)
vectorstore = Chroma(persist_directory="./chroma_db", embedding_function=embeddings)
dense_retriever = vectorstore.as_retriever(search_kwargs={"k": 10})
# Sparse retriever (keyword/BM25)
bm25_retriever = BM25Retriever.from_documents(documents)
bm25_retriever.k = 10
# Ensemble - weighted combination
hybrid_retriever = EnsembleRetriever(
retrievers=[dense_retriever, bm25_retriever],
weights=[0.6, 0.4], # 60% semantic, 40% keyword
)
results = hybrid_retriever.invoke("What is the maximum token limit for GPT-4?")
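EnsembleRetriever merges the two ranked lists with weighted Reciprocal Rank Fusion (RRF): each document's fused score is the sum, across lists, of weight / (k + rank). The fusion itself is only a few lines - a sketch with hypothetical doc IDs:

```python
def weighted_rrf(rankings: list[list[str]], weights: list[float],
                 k: int = 60) -> list[str]:
    """Fuse ranked lists: each doc scores the sum of weight / (k + rank)."""
    scores: dict[str, float] = {}
    for ranking, weight in zip(rankings, weights):
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] = scores.get(doc_id, 0.0) + weight / (k + rank)
    return sorted(scores, key=scores.get, reverse=True)

dense_ranking = ["doc_a", "doc_b", "doc_c"]   # semantic results, best first
sparse_ranking = ["doc_c", "doc_a", "doc_d"]  # BM25 results, best first
fused = weighted_rrf([dense_ranking, sparse_ranking], weights=[0.6, 0.4])
print(fused)
```

RRF works on ranks rather than raw scores, which sidesteps the problem that cosine similarities and BM25 scores live on incompatible scales; the constant k=60 damps the influence of any single top position.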
Multi-Query Retrieval
A single query may not capture all aspects of the user's information need. Multi-query retrieval uses an LLM to generate multiple reformulations of the original query, retrieves for each, and merges the results:
from langchain.retrievers.multi_query import MultiQueryRetriever
from langchain_openai import ChatOpenAI
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0.3)
multi_query_retriever = MultiQueryRetriever.from_llm(
retriever=vectorstore.as_retriever(search_kwargs={"k": 5}),
llm=llm,
)
# This generates 3 query variations, retrieves for each, and deduplicates
results = multi_query_retriever.invoke("How do I optimize RAG pipeline latency?")
# Internally generates queries like:
# - "What are techniques to reduce RAG response time?"
# - "How to speed up retrieval augmented generation?"
# - "RAG pipeline performance optimization methods"
Contextual Compression
Retrieved chunks often contain irrelevant information alongside the relevant parts. Contextual compression uses an LLM to extract only the relevant portions from each retrieved chunk:
from langchain.retrievers import ContextualCompressionRetriever
from langchain.retrievers.document_compressors import LLMChainExtractor
from langchain_openai import ChatOpenAI
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
compressor = LLMChainExtractor.from_llm(llm)
compression_retriever = ContextualCompressionRetriever(
base_compressor=compressor,
base_retriever=vectorstore.as_retriever(search_kwargs={"k": 10}),
)
# Returns compressed chunks - only the relevant sentences
results = compression_retriever.invoke("What embedding model should I use?")
Parent Document Retriever
A clever strategy that solves the chunk-size dilemma: embed small chunks for precise retrieval, but return the parent (larger) chunk to the LLM for more context:
from langchain.retrievers import ParentDocumentRetriever
from langchain.storage import InMemoryStore
from langchain_text_splitters import RecursiveCharacterTextSplitter
# Small chunks for retrieval (high precision)
child_splitter = RecursiveCharacterTextSplitter(chunk_size=200, chunk_overlap=20)
# Large chunks for context (high recall)
parent_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=100)
store = InMemoryStore()
parent_retriever = ParentDocumentRetriever(
vectorstore=vectorstore,
docstore=store,
child_splitter=child_splitter,
parent_splitter=parent_splitter,
)
# Index documents (creates both parent and child chunks)
parent_retriever.add_documents(documents)
# Query returns parent chunks (1000 chars) matched via child chunks (200 chars)
results = parent_retriever.invoke("How does RAG work?")
7. Reranking
Retrieval is a two-stage problem: recall (find all potentially relevant documents) and precision (rank the truly relevant ones at the top). Embedding-based retrieval is excellent at recall but mediocre at precision - it uses a bi-encoder that embeds query and document independently, so it can't model fine-grained query-document interactions. Reranking adds a second stage that uses a cross-encoder to jointly process the query and each candidate document, producing much more accurate relevance scores.
The typical pattern: retrieve 20-50 candidates with fast vector search, then rerank to select the top 5. This gives you the speed of bi-encoders with the accuracy of cross-encoders.
Cross-Encoder Reranking
from sentence_transformers import CrossEncoder
import numpy as np
# Load cross-encoder model (~80MB)
reranker = CrossEncoder("cross-encoder/ms-marco-MiniLM-L-6-v2")
query = "How does RAG reduce hallucinations?"
retrieved_docs = [doc.page_content for doc in initial_results] # 20 candidates
# Score each query-document pair
pairs = [(query, doc) for doc in retrieved_docs]
scores = reranker.predict(pairs)
# Sort by reranker score (descending)
ranked_indices = np.argsort(scores)[::-1]
top_docs = [retrieved_docs[i] for i in ranked_indices[:5]]
top_scores = [scores[i] for i in ranked_indices[:5]]
for score, doc in zip(top_scores, top_docs):
print(f"Score: {score:.4f} | {doc[:80]}...")
Cohere Rerank API
Cohere's Rerank API is the easiest way to add reranking without self-hosting a model. It consistently outperforms open-source cross-encoders:
import cohere
co = cohere.Client() # Uses CO_API_KEY env var
results = co.rerank(
query="How does RAG reduce hallucinations?",
documents=retrieved_docs,
model="rerank-english-v3.0",
top_n=5,
)
for result in results.results:
print(f"Score: {result.relevance_score:.4f} | Index: {result.index}")
print(f" {retrieved_docs[result.index][:80]}...")
ColBERT - Late Interaction Reranking
ColBERT uses a "late interaction" mechanism - it generates token-level embeddings for both query and document, then computes fine-grained similarity via MaxSim. This gives cross-encoder-level quality with much better efficiency because document embeddings can be precomputed:
from ragatouille import RAGPretrainedModel
# Load ColBERT model
colbert = RAGPretrainedModel.from_pretrained("colbert-ir/colbertv2.0")
# Index documents (precomputes token embeddings)
colbert.index(
collection=chunks,
index_name="my_index",
split_documents=False,
)
# Search with ColBERT reranking built-in
results = colbert.search(query="How does RAG reduce hallucinations?", k=5)
for result in results:
print(f"Score: {result['score']:.4f} | {result['content'][:80]}...")
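The MaxSim operation at the heart of late interaction is simple to state: for each query token, take its maximum similarity over all document tokens, then sum those maxima. A NumPy sketch with random vectors standing in for real token embeddings:

```python
import numpy as np

def maxsim_score(query_tokens: np.ndarray, doc_tokens: np.ndarray) -> float:
    """ColBERT-style MaxSim over unit-normalized token embeddings: sum over
    query tokens of the max similarity to any document token."""
    sim_matrix = query_tokens @ doc_tokens.T    # (n_query, n_doc) similarities
    return float(sim_matrix.max(axis=1).sum())  # best doc token per query token

def normalize(x: np.ndarray) -> np.ndarray:
    return x / np.linalg.norm(x, axis=-1, keepdims=True)

rng = np.random.default_rng(42)
query = normalize(rng.normal(size=(4, 128)))         # 4 query token embeddings
doc_similar = normalize(query + 0.05 * rng.normal(size=(4, 128)))  # near-copy
doc_random = normalize(rng.normal(size=(30, 128)))   # unrelated document

print(maxsim_score(query, doc_similar))  # high: every query token finds a close match
print(maxsim_score(query, doc_random))   # lower: only chance alignments
```

Because the document-side token embeddings are computed at index time, scoring a candidate at query time is just this matrix operation - that is the efficiency win over full cross-encoders.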
When to use reranking: Always, in production. The quality improvement is substantial (10-30% better precision) and the latency cost is small (50-200ms for 20 candidates). Start with Cohere Rerank for simplicity, or cross-encoder/ms-marco-MiniLM-L-6-v2 for self-hosted. Use ColBERT when you need both quality and speed at scale.
8. Generation
Generation is the final stage: taking the retrieved (and reranked) context and producing a grounded, accurate answer. The quality of generation depends heavily on prompt engineering - how you structure the context, what instructions you give the model, and how you handle edge cases like insufficient context.
The RAG Prompt Template
A well-designed RAG prompt has four components: system instructions, retrieved context, the user's question, and output formatting guidelines:
RAG_SYSTEM_PROMPT = """You are a helpful assistant that answers questions based on the provided context.
RULES:
1. Answer ONLY based on the provided context. Do not use prior knowledge.
2. If the context does not contain enough information to answer, say "I don't have enough information to answer this question based on the available documents."
3. Cite your sources by referencing [Source N] after each claim.
4. Be concise and direct. Do not repeat the question.
5. If the context contains conflicting information, acknowledge the conflict and present both perspectives.
"""
RAG_USER_PROMPT = """CONTEXT:
{context}
QUESTION: {question}
Answer the question based on the context above. Cite sources using [Source N] notation."""
def format_context(retrieved_docs: list[dict]) -> str:
"""Format retrieved documents with source numbers."""
formatted = []
for i, doc in enumerate(retrieved_docs, 1):
source = doc.get("source", "Unknown")
formatted.append(f"[Source {i}] (from {source}):\n{doc['content']}")
return "\n\n".join(formatted)
Complete Generation with Streaming
from openai import OpenAI
client = OpenAI()
def generate_answer(question: str, retrieved_docs: list[dict], stream: bool = True):
"""Generate a RAG answer with optional streaming."""
context = format_context(retrieved_docs)
messages = [
{"role": "system", "content": RAG_SYSTEM_PROMPT},
{"role": "user", "content": RAG_USER_PROMPT.format(
context=context, question=question
)},
]
if stream:
response = client.chat.completions.create(
model="gpt-4o",
messages=messages,
temperature=0.1, # Low temperature for factual accuracy
max_tokens=1024,
stream=True,
)
full_response = ""
for chunk in response:
if chunk.choices[0].delta.content:
token = chunk.choices[0].delta.content
print(token, end="", flush=True)
full_response += token
print()
return full_response
else:
response = client.chat.completions.create(
model="gpt-4o",
messages=messages,
temperature=0.1,
max_tokens=1024,
)
return response.choices[0].message.content
Handling No Context
A critical production concern: what happens when retrieval returns nothing relevant? Naive systems will hallucinate an answer. Robust systems detect this and respond appropriately:
def should_answer(retrieved_docs: list[dict], threshold: float = 0.3) -> bool:
"""Check if retrieved documents are relevant enough to answer."""
if not retrieved_docs:
return False
# Check if the best retrieval score meets the threshold
best_score = max(doc.get("score", 0) for doc in retrieved_docs)
return best_score >= threshold
def generate_with_fallback(question: str, retrieved_docs: list[dict]):
"""Generate answer with graceful fallback for low-confidence retrieval."""
if not should_answer(retrieved_docs):
return (
"I don't have enough relevant information in my knowledge base to "
"answer this question confidently. Could you rephrase your question, "
"or let me know which specific documents I should search?"
)
return generate_answer(question, retrieved_docs, stream=False)
Anti-Hallucination Techniques
Beyond prompt engineering, several techniques reduce hallucination in RAG systems:
- Low temperature (0.0-0.2): Reduces creative generation, keeps answers grounded in context.
- Citation enforcement: Require the model to cite [Source N] for every claim. Claims without citations are likely hallucinated.
- Confidence scoring: Ask the model to rate its confidence (1-5) based on context support. Filter or flag low-confidence answers.
- Faithfulness checking: Use a second LLM call to verify that every claim in the answer is supported by the retrieved context.
- Retrieval score thresholds: Don't generate if the best retrieval score is below a threshold - admit ignorance instead.
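The faithfulness-checking technique above can be sketched as a second, cheaper LLM pass. The prompt wording and the injected `ask` callable are illustrative, not a fixed API; in practice `ask` would wrap a gpt-4o-mini call:

```python
from typing import Callable

def check_faithfulness(answer: str, context: str,
                       ask: Callable[[str], str]) -> bool:
    """Second-pass verification: ask an LLM whether every claim in the
    answer is supported by the retrieved context, then parse its YES/NO."""
    prompt = (
        "Does the CONTEXT fully support every claim in the ANSWER?\n"
        f"ANSWER: {answer}\n\nCONTEXT: {context}\n\n"
        "Reply with exactly YES or NO."
    )
    return ask(prompt).strip().upper().startswith("YES")

# Wiring it to OpenAI would look like:
# ask = lambda p: client.chat.completions.create(
#     model="gpt-4o-mini", temperature=0.0,
#     messages=[{"role": "user", "content": p}],
# ).choices[0].message.content
```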
9. Complete Production Pipeline
Here's a complete, end-to-end RAG pipeline class that combines all the components we've covered. This is production-ready code you can use as a starting point for your own system:
"""Complete production RAG pipeline combining ingestion, retrieval, and generation."""
import fitz
import chromadb
import numpy as np
from sentence_transformers import SentenceTransformer, CrossEncoder
from langchain_text_splitters import RecursiveCharacterTextSplitter
from openai import OpenAI
class RAGPipeline:
"""End-to-end RAG pipeline with ingestion, retrieval, reranking, and generation."""
def __init__(
self,
collection_name: str = "documents",
embed_model: str = "BAAI/bge-large-en-v1.5",
rerank_model: str = "cross-encoder/ms-marco-MiniLM-L-6-v2",
llm_model: str = "gpt-4o",
chroma_path: str = "./chroma_db",
chunk_size: int = 512,
chunk_overlap: int = 50,
):
self.llm_model = llm_model
self.chunk_size = chunk_size
self.chunk_overlap = chunk_overlap
# Initialize components
self.embedder = SentenceTransformer(embed_model)
self.reranker = CrossEncoder(rerank_model)
self.llm = OpenAI()
self.splitter = RecursiveCharacterTextSplitter(
chunk_size=chunk_size, chunk_overlap=chunk_overlap
)
# Initialize vector store
self.client = chromadb.PersistentClient(path=chroma_path)
self.collection = self.client.get_or_create_collection(
name=collection_name, metadata={"hnsw:space": "cosine"}
)
def ingest(self, file_path: str) -> int:
"""Ingest a PDF file: extract text, chunk, embed, and store."""
# Extract text
doc = fitz.open(file_path)
pages = [page.get_text() for page in doc]
full_text = "\n\n".join(pages)
# Chunk
chunks = self.splitter.split_text(full_text)
# Embed
embeddings = self.embedder.encode(chunks, show_progress_bar=True).tolist()
# Store with metadata
existing = self.collection.count()
ids = [f"doc_{existing + i}" for i in range(len(chunks))]
metadatas = [{"source": file_path, "chunk_index": i} for i in range(len(chunks))]
self.collection.add(
documents=chunks, embeddings=embeddings, ids=ids, metadatas=metadatas
)
return len(chunks)
def query(self, question: str, top_k: int = 5, rerank_top_n: int = 20) -> dict:
"""Query the pipeline: retrieve, rerank, and generate an answer."""
# Embed query
query_embedding = self.embedder.encode([question]).tolist()
# Retrieve candidates
results = self.collection.query(
query_embeddings=query_embedding, n_results=rerank_top_n
)
candidates = results["documents"][0]
metadatas = results["metadatas"][0]
if not candidates:
return {"answer": "No relevant documents found.", "sources": []}
# Rerank
pairs = [(question, doc) for doc in candidates]
scores = self.reranker.predict(pairs)
ranked = sorted(
zip(scores, candidates, metadatas), key=lambda x: x[0], reverse=True
)
top_docs = ranked[:top_k]
# Format context
context_parts = []
sources = []
for i, (score, text, meta) in enumerate(top_docs, 1):
context_parts.append(f"[Source {i}] (score: {score:.3f}):\n{text}")
sources.append({"source": meta["source"], "score": float(score), "text": text})
context = "\n\n".join(context_parts)
# Generate
response = self.llm.chat.completions.create(
model=self.llm_model,
messages=[
{"role": "system", "content": (
"Answer based ONLY on the provided context. "
"Cite sources as [Source N]. If the context is insufficient, say so."
)},
{"role": "user", "content": f"CONTEXT:\n{context}\n\nQUESTION: {question}"},
],
temperature=0.1,
max_tokens=1024,
)
return {
"answer": response.choices[0].message.content,
"sources": sources,
"tokens_used": response.usage.total_tokens,
}
def evaluate(self, test_cases: list[dict]) -> dict:
"""Evaluate pipeline on test cases with question/expected_answer pairs."""
results = {"total": len(test_cases), "scores": []}
for case in test_cases:
result = self.query(case["question"])
# Simple keyword overlap score (use RAGAS for production eval)
expected_words = set(case["expected_answer"].lower().split())
answer_words = set(result["answer"].lower().split())
overlap = len(expected_words & answer_words) / max(len(expected_words), 1)
results["scores"].append({
"question": case["question"],
"overlap_score": round(overlap, 3),
"tokens_used": result["tokens_used"],
})
results["avg_score"] = round(
sum(s["overlap_score"] for s in results["scores"]) / len(results["scores"]), 3
)
return results
# Usage
if __name__ == "__main__":
rag = RAGPipeline()
# Ingest documents
num_chunks = rag.ingest("technical_manual.pdf")
print(f"Ingested {num_chunks} chunks")
# Query
result = rag.query("How does the authentication system work?")
print(f"\nAnswer: {result['answer']}")
print(f"\nSources ({len(result['sources'])}):")
for src in result["sources"]:
print(f" {src['source']} (score: {src['score']:.3f})")
# Evaluate
test_cases = [
{"question": "What is the default timeout?", "expected_answer": "The default timeout is 30 seconds."},
{"question": "How do I reset my password?", "expected_answer": "Navigate to settings and click reset password."},
]
eval_results = rag.evaluate(test_cases)
print(f"\nEvaluation: avg_score={eval_results['avg_score']}")
10. Evaluation
You can't improve what you can't measure. RAG evaluation is uniquely challenging because you need to assess both the retrieval quality (did we find the right documents?) and the generation quality (did the LLM produce a faithful, relevant answer?). The RAGAS (Retrieval Augmented Generation Assessment) framework has emerged as the standard for RAG evaluation, providing automated metrics that correlate well with human judgment.
RAGAS Metrics
RAGAS defines four core metrics that together give a comprehensive picture of RAG quality:
- Faithfulness: Does the answer contain only information supported by the retrieved context? Measures hallucination. Score 0-1, higher is better.
- Answer Relevancy: Is the answer relevant to the question? Penalizes incomplete or off-topic answers. Score 0-1.
- Context Precision: Are the relevant documents ranked higher than irrelevant ones? Measures retrieval ranking quality. Score 0-1.
- Context Recall: Were all the relevant documents retrieved? Requires ground-truth answers to compute. Score 0-1.
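As a rough mental model (RAGAS's actual implementation decomposes the answer into statements via an LLM before judging each one), faithfulness reduces to a ratio of supported claims:

```python
def faithfulness_score(claim_verdicts: list[bool]) -> float:
    """Fraction of answer claims judged supported by the context.
    claim_verdicts would come from an LLM judging each extracted claim."""
    if not claim_verdicts:
        return 0.0
    return sum(claim_verdicts) / len(claim_verdicts)

faithfulness_score([True, True, False, True])  # → 0.75
```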
Running RAGAS Evaluation
from ragas import evaluate
from ragas.metrics import (
faithfulness,
answer_relevancy,
context_precision,
context_recall,
)
from datasets import Dataset
# Prepare evaluation dataset
eval_data = {
"question": [
"What is the default timeout for API requests?",
"How do I configure authentication?",
"What databases are supported?",
],
"answer": [
"The default timeout is 30 seconds, configurable via the TIMEOUT env var [Source 1].",
"Authentication is configured via OAuth2. Set CLIENT_ID and CLIENT_SECRET [Source 1].",
"PostgreSQL, MySQL, and SQLite are supported out of the box [Source 2].",
],
"contexts": [
["The API timeout defaults to 30 seconds. Set TIMEOUT=60 for longer requests."],
["Configure OAuth2 by setting CLIENT_ID and CLIENT_SECRET environment variables."],
["Supported databases: PostgreSQL, MySQL, SQLite. MongoDB support is experimental."],
],
"ground_truth": [
"The default timeout is 30 seconds.",
"Authentication uses OAuth2 with CLIENT_ID and CLIENT_SECRET.",
"PostgreSQL, MySQL, and SQLite are supported.",
],
}
dataset = Dataset.from_dict(eval_data)
# Run evaluation
results = evaluate(
dataset,
metrics=[faithfulness, answer_relevancy, context_precision, context_recall],
)
print(results)
# {'faithfulness': 0.95, 'answer_relevancy': 0.92,
# 'context_precision': 0.88, 'context_recall': 0.90}
# Per-question breakdown
df = results.to_pandas()
print(df[["question", "faithfulness", "answer_relevancy"]].to_string())
Building an Evaluation Dataset
The hardest part of RAG evaluation is creating the test dataset. Here's a practical approach using an LLM to generate question-answer pairs from your documents:
from openai import OpenAI
client = OpenAI()
def generate_eval_pairs(chunks: list[str], n_per_chunk: int = 2) -> list[dict]:
"""Generate question-answer pairs from document chunks for evaluation."""
eval_pairs = []
for chunk in chunks[:50]: # Limit to avoid excessive API calls
response = client.chat.completions.create(
model="gpt-4o-mini",
messages=[{
"role": "user",
                "content": f"""Generate {n_per_chunk} question-answer pairs from this text.
Return a JSON object with a "pairs" key: {{"pairs": [{{"question": "...", "answer": "...", "context": "..."}}]}}
TEXT:
{chunk}""",
}],
temperature=0.7,
response_format={"type": "json_object"},
)
import json
pairs = json.loads(response.choices[0].message.content)
if "pairs" in pairs:
eval_pairs.extend(pairs["pairs"])
elif isinstance(pairs, list):
eval_pairs.extend(pairs)
return eval_pairs
# Generate and save evaluation dataset
eval_pairs = generate_eval_pairs(chunks)
print(f"Generated {len(eval_pairs)} evaluation pairs")
Evaluation cadence: Run RAGAS evaluation after every change to your chunking strategy, embedding model, retrieval parameters, or prompt template. Track metrics over time in a dashboard. Set alerts for faithfulness drops below 0.85 - that indicates a hallucination regression.
11. Advanced Patterns
Beyond the standard retrieve-and-generate pipeline, several advanced patterns have emerged that significantly improve RAG quality for complex use cases. These patterns address specific failure modes: ambiguous queries, multi-hop reasoning, unreliable retrieval, and multi-modal content.
HyDE - Hypothetical Document Embeddings
The core insight: a hypothetical answer to a question is more semantically similar to the actual answer than the question itself. HyDE uses the LLM to generate a hypothetical answer, embeds that, and uses it for retrieval:
from openai import OpenAI
client = OpenAI()
def hyde_query(question: str, embedder, vectorstore, k: int = 5):
"""HyDE: Generate hypothetical answer, embed it, retrieve with it."""
# Step 1: Generate hypothetical answer
response = client.chat.completions.create(
model="gpt-4o-mini",
messages=[{
"role": "user",
"content": f"Write a short paragraph answering: {question}",
}],
temperature=0.7,
max_tokens=200,
)
hypothetical_answer = response.choices[0].message.content
# Step 2: Embed the hypothetical answer (not the question)
hyde_embedding = embedder.encode([hypothetical_answer])
# Step 3: Retrieve using the hypothetical answer embedding
results = vectorstore.query(query_embeddings=hyde_embedding.tolist(), n_results=k)
return results
Step-Back Prompting
For specific questions, step back to a more general question first, retrieve for both, and combine the context:
def step_back_query(question: str, llm, retriever):
"""Generate a broader step-back question for better retrieval."""
response = llm.chat.completions.create(
model="gpt-4o-mini",
messages=[{
"role": "user",
"content": (
f"Given the question: '{question}'\n"
"Generate a more general step-back question that would help "
"retrieve broader context. Return only the question."
),
}],
temperature=0.3,
)
step_back_q = response.choices[0].message.content
# Retrieve for both original and step-back questions
original_docs = retriever.invoke(question)
stepback_docs = retriever.invoke(step_back_q)
# Combine and deduplicate
all_docs = {doc.page_content: doc for doc in original_docs + stepback_docs}
return list(all_docs.values())
Self-RAG - Self-Reflective Retrieval
Self-RAG adds reflection tokens that let the model decide: (1) whether to retrieve at all, (2) whether retrieved documents are relevant, and (3) whether the generated answer is supported by the context:
def self_rag_query(question: str, retriever, llm):
"""Self-RAG: Retrieve, assess relevance, generate, verify faithfulness."""
# Step 1: Decide if retrieval is needed
needs_retrieval = llm.chat.completions.create(
model="gpt-4o-mini",
messages=[{
"role": "user",
"content": (
f"Does this question require external knowledge to answer?\n"
f"Question: {question}\nAnswer YES or NO only."
),
}],
).choices[0].message.content.strip().upper()
    if "NO" in needs_retrieval:
        # Answer directly: the RAG prompt forbids prior knowledge,
        # so calling it with empty context would just refuse
        return llm.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": question}],
        ).choices[0].message.content
    # Step 2: Retrieve and filter relevant documents
    # (Documents are converted to the dict shape generate_answer expects)
    docs = [
        {"content": d.page_content, "source": d.metadata.get("source", "unknown")}
        for d in retriever.invoke(question)
    ]
    relevant_docs = []
    for doc in docs:
        is_relevant = llm.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{
                "role": "user",
                "content": (
                    f"Is this document relevant to the question?\n"
                    f"Question: {question}\nDocument: {doc['content'][:500]}\n"
                    f"Answer YES or NO only."
                ),
            }],
        ).choices[0].message.content.strip().upper()
        if "YES" in is_relevant:
            relevant_docs.append(doc)
# Step 3: Generate answer
answer = generate_answer(question, relevant_docs)
# Step 4: Verify faithfulness
is_faithful = llm.chat.completions.create(
model="gpt-4o-mini",
messages=[{
"role": "user",
"content": (
f"Is this answer fully supported by the context?\n"
f"Answer: {answer}\nContext: {format_context(relevant_docs)}\n"
f"Answer YES or NO only."
),
}],
).choices[0].message.content.strip().upper()
if "NO" in is_faithful:
return "I cannot provide a reliable answer based on the available documents."
return answer
Corrective RAG (CRAG)
CRAG evaluates retrieval quality and falls back to web search when the knowledge base doesn't have good answers:
def corrective_rag(question: str, retriever, web_search_fn, llm, threshold: float = 0.5):
"""CRAG: Evaluate retrieval confidence, fall back to web search if needed."""
    # Retrieve and convert to the dict shape generate_answer expects
    docs = [
        {"content": d.page_content, "source": d.metadata.get("source", "unknown")}
        for d in retriever.invoke(question)
    ]
    # Assess retrieval quality
    assessment = llm.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{
            "role": "user",
            "content": (
                f"Rate how well these documents answer the question (0.0-1.0).\n"
                f"Question: {question}\n"
                f"Documents: {[d['content'][:200] for d in docs[:3]]}\n"
f"Return only a number."
),
}],
    ).choices[0].message.content.strip()
    try:
        confidence = float(assessment)
    except ValueError:
        confidence = 0.0  # Unparseable rating: treat retrieval as low-confidence
if confidence >= threshold:
return generate_answer(question, docs) # Knowledge base is sufficient
else:
        # Fall back to web search (web_search_fn is assumed to return the same dict shape as docs)
        web_results = web_search_fn(question)
combined = docs + web_results
return generate_answer(question, combined)
Graph RAG - Knowledge Graphs + RAG
Graph RAG builds a knowledge graph from documents and uses graph traversal alongside vector search for retrieval. This excels at multi-hop reasoning and relationship queries:
from neo4j import GraphDatabase
import networkx as nx
def build_knowledge_graph(chunks: list[str], llm) -> nx.DiGraph:
"""Extract entities and relationships from chunks to build a knowledge graph."""
graph = nx.DiGraph()
for chunk in chunks:
# Use LLM to extract (subject, predicate, object) triples
response = llm.chat.completions.create(
model="gpt-4o-mini",
messages=[{
"role": "user",
"content": (
f"Extract entity-relationship triples from this text.\n"
f"Format: subject | predicate | object (one per line)\n\n{chunk}"
),
}],
)
for line in response.choices[0].message.content.strip().split("\n"):
parts = [p.strip() for p in line.split("|")]
if len(parts) == 3:
graph.add_edge(parts[0], parts[2], relation=parts[1])
return graph
def graph_rag_query(question: str, graph: nx.DiGraph, vectorstore, llm):
"""Combine graph traversal with vector search for multi-hop retrieval."""
# Vector search for initial entities
vector_results = vectorstore.query(query_texts=[question], n_results=5)
# Extract mentioned entities and traverse graph for related context
    entities = extract_entities(question, llm)  # LLM-based entity extraction (helper assumed, not shown)
graph_context = []
for entity in entities:
if entity in graph:
neighbors = list(graph.neighbors(entity))[:5]
for neighbor in neighbors:
edge = graph[entity][neighbor]
graph_context.append(f"{entity} {edge['relation']} {neighbor}")
    # Combine vector and graph context, wrapped as dicts for generate_answer
    combined_context = [{"content": t, "source": "vector"} for t in vector_results["documents"][0]]
    combined_context += [{"content": t, "source": "graph"} for t in graph_context]
    return generate_answer(question, combined_context)
Multi-Modal RAG
Multi-modal RAG extends the pipeline to handle images, tables, and diagrams alongside text. Use vision-language models to describe visual content, then embed and retrieve those descriptions:
import base64
from openai import OpenAI
client = OpenAI()
def describe_image(image_path: str) -> str:
"""Use a vision model to generate a text description of an image."""
with open(image_path, "rb") as f:
image_b64 = base64.b64encode(f.read()).decode()
response = client.chat.completions.create(
model="gpt-4o",
messages=[{
"role": "user",
"content": [
{"type": "text", "text": "Describe this image in detail for a knowledge base."},
{"type": "image_url", "image_url": {
"url": f"data:image/png;base64,{image_b64}"
}},
],
}],
max_tokens=500,
)
return response.choices[0].message.content
def ingest_multimodal(pdf_path: str, vectorstore, embedder):
"""Ingest both text and images from a PDF."""
doc = fitz.open(pdf_path)
for page in doc:
# Extract text
text = page.get_text()
if text.strip():
embedding = embedder.encode([text]).tolist()
vectorstore.add(documents=[text], embeddings=embedding, ids=[f"text_{page.number}"])
# Extract images
for img_index, img in enumerate(page.get_images(full=True)):
xref = img[0]
            pix = fitz.Pixmap(doc, xref)
            if pix.n - pix.alpha >= 4:  # CMYK images can't be saved as PNG directly
                pix = fitz.Pixmap(fitz.csRGB, pix)
            img_path = f"/tmp/page{page.number}_img{img_index}.png"
            pix.save(img_path)
description = describe_image(img_path)
embedding = embedder.encode([description]).tolist()
vectorstore.add(
documents=[description],
embeddings=embedding,
ids=[f"img_{page.number}_{img_index}"],
metadatas=[{"type": "image", "source": pdf_path}],
)
12. Production Checklist
Shipping a RAG system to production requires more than a working pipeline. Here's a comprehensive checklist covering the operational concerns that separate a demo from a reliable product.
Monitoring
- Retrieval metrics: Track retrieval latency (p50, p95, p99), number of results returned, average similarity scores, and empty-result rate. Alert on score degradation.
- Generation metrics: Track LLM latency, token usage (input/output), cost per query, and error rates. Monitor for rate limiting and timeouts.
- Quality metrics: Run RAGAS evaluation on a sample of production queries weekly. Track faithfulness and answer relevancy trends.
- User feedback: Implement thumbs up/down on answers. This is the most valuable signal. Log the query, retrieved context, and answer for every negative feedback.
import time
import logging
from dataclasses import dataclass
logger = logging.getLogger("rag_pipeline")
@dataclass
class QueryMetrics:
query: str
retrieval_latency_ms: float
generation_latency_ms: float
num_results: int
top_score: float
tokens_used: int
total_latency_ms: float
def query_with_metrics(pipeline, question: str) -> tuple[dict, QueryMetrics]:
"""Wrap pipeline query with comprehensive metrics collection."""
start = time.perf_counter()
    # Retrieval phase (timed separately here; note pipeline.query() below re-runs retrieval)
t0 = time.perf_counter()
results = pipeline.collection.query(
query_embeddings=pipeline.embedder.encode([question]).tolist(),
n_results=20,
)
retrieval_ms = (time.perf_counter() - t0) * 1000
# Generation phase
t0 = time.perf_counter()
answer = pipeline.query(question)
generation_ms = (time.perf_counter() - t0) * 1000
total_ms = (time.perf_counter() - start) * 1000
metrics = QueryMetrics(
query=question,
retrieval_latency_ms=round(retrieval_ms, 1),
generation_latency_ms=round(generation_ms, 1),
num_results=len(results["documents"][0]),
        top_score=min(results["distances"][0]) if results["distances"][0] else 0,  # Chroma returns distances; lower is closer
tokens_used=answer.get("tokens_used", 0),
total_latency_ms=round(total_ms, 1),
)
logger.info(f"Query metrics: {metrics}")
return answer, metrics
Caching
RAG queries are expensive (embedding + vector search + LLM call). Caching identical or semantically similar queries saves cost and latency:
import hashlib

import numpy as np
class SemanticCache:
"""Cache RAG responses with semantic similarity matching."""
def __init__(self, embedder, similarity_threshold: float = 0.95):
self.embedder = embedder
self.threshold = similarity_threshold
self.cache = {} # {hash: {"embedding": [...], "response": {...}}}
def _hash_query(self, query: str) -> str:
return hashlib.sha256(query.encode()).hexdigest()
def get(self, query: str):
"""Check cache for exact or semantically similar query."""
# Exact match
key = self._hash_query(query)
if key in self.cache:
return self.cache[key]["response"]
# Semantic match
query_emb = self.embedder.encode([query])[0]
for entry in self.cache.values():
similarity = float(
np.dot(query_emb, entry["embedding"])
/ (np.linalg.norm(query_emb) * np.linalg.norm(entry["embedding"]))
)
if similarity >= self.threshold:
return entry["response"]
return None
def set(self, query: str, response: dict):
key = self._hash_query(query)
self.cache[key] = {
"embedding": self.embedder.encode([query])[0].tolist(),
"response": response,
}
Rate Limiting
import time
from collections import deque
class RateLimiter:
    """Sliding-window rate limiter for API calls."""
def __init__(self, max_requests: int = 100, window_seconds: int = 60):
self.max_requests = max_requests
self.window = window_seconds
self.requests = deque()
def acquire(self) -> bool:
"""Check if a request is allowed. Returns True if allowed."""
now = time.time()
# Remove expired entries
while self.requests and self.requests[0] < now - self.window:
self.requests.popleft()
if len(self.requests) < self.max_requests:
self.requests.append(now)
return True
return False
def wait_and_acquire(self):
"""Block until a request is allowed."""
while not self.acquire():
time.sleep(0.1)
Cost Optimization
- Embedding caching: Cache embeddings for frequently queried terms. Don't re-embed the same query twice.
- Model tiering: Use gpt-4o-mini for simple queries, gpt-4o for complex ones. Route based on query complexity.
- Batch embedding: Embed documents in batches of 100+ rather than one at a time. Most APIs charge per request, not per token.
- Context window management: Don't stuff the entire top-20 into the prompt. Rerank to top 3-5 and use contextual compression to minimize tokens.
- Open-source embeddings: Switch from OpenAI embeddings to bge-large or nomic-embed for zero marginal embedding cost.
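The batch-embedding point can be sketched as a small wrapper over any batch-capable backend; `embed_fn` here is an assumption standing in for e.g. `lambda batch: embedder.encode(batch).tolist()` or a wrapper around a hosted embeddings endpoint:

```python
from typing import Callable

def embed_in_batches(texts: list[str],
                     embed_fn: Callable[[list[str]], list[list[float]]],
                     batch_size: int = 128) -> list[list[float]]:
    """Embed texts in large batches instead of one request per text,
    cutting per-request overhead (and per-request API charges)."""
    vectors: list[list[float]] = []
    for start in range(0, len(texts), batch_size):
        vectors.extend(embed_fn(texts[start:start + batch_size]))
    return vectors
```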
A/B Testing Retrieval Strategies
- Run two retrieval configurations in parallel (e.g., cosine vs hybrid search, chunk_size=512 vs 1000).
- Route 50% of traffic to each variant.
- Compare RAGAS metrics and user feedback between variants.
- Use statistical significance testing (chi-squared for thumbs up/down, t-test for continuous metrics) before declaring a winner.
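For the 50/50 split, a hash-based assignment keeps each user pinned to one variant across sessions (unlike `random.choice`), which the significance test requires. A minimal sketch:

```python
import hashlib

def assign_variant(user_id: str, variants: tuple[str, str] = ("A", "B")) -> str:
    """Stable 50/50 traffic split: hash the user id and bucket by parity,
    so the same user always lands in the same variant."""
    digest = hashlib.sha256(user_id.encode()).hexdigest()
    return variants[int(digest, 16) % 2]
```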
Handling Document Updates
- Incremental ingestion: Track document hashes. Only re-embed documents that have changed since the last ingestion run.
- Versioning: Store document version in metadata. When a document is updated, add new chunks and mark old ones as deprecated. Delete deprecated chunks after a grace period.
- Scheduled re-indexing: For rapidly changing data sources (wikis, Confluence), run a nightly job that diffs the source against the vector store and updates accordingly.
- TTL (Time-to-Live): For time-sensitive content (news, pricing), set a TTL on chunks and automatically expire them.
import hashlib
from datetime import datetime, timedelta
def incremental_ingest(pipeline, file_path: str, doc_hashes: dict) -> int:
"""Only re-ingest documents that have changed."""
with open(file_path, "rb") as f:
current_hash = hashlib.sha256(f.read()).hexdigest()
if doc_hashes.get(file_path) == current_hash:
print(f"Skipping {file_path} (unchanged)")
return 0
# Delete old chunks for this document
pipeline.collection.delete(where={"source": file_path})
# Re-ingest
num_chunks = pipeline.ingest(file_path)
doc_hashes[file_path] = current_hash
print(f"Re-ingested {file_path}: {num_chunks} chunks")
return num_chunks
The Complete Production Checklist
| Category | Item | Priority |
|---|---|---|
| Retrieval | Hybrid search (dense + BM25) | High |
| Retrieval | Reranking (cross-encoder or Cohere) | High |
| Retrieval | Metadata filtering (source, date, type) | Medium |
| Quality | RAGAS evaluation pipeline | High |
| Quality | User feedback collection (thumbs up/down) | High |
| Quality | Faithfulness monitoring and alerts | High |
| Performance | Semantic caching for repeated queries | Medium |
| Performance | Rate limiting on LLM API calls | High |
| Performance | Streaming responses for UX | Medium |
| Cost | Model tiering (cheap model for simple queries) | Medium |
| Cost | Open-source embeddings to eliminate API costs | Medium |
| Ops | Incremental document ingestion | High |
| Ops | Latency monitoring (p50/p95/p99) | High |
| Ops | A/B testing framework for retrieval strategies | Low |
| Security | Input sanitization (prompt injection defense) | High |
| Security | Document-level access control in retrieval | High |
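The document-level access control item usually comes down to a metadata filter applied at query time. A minimal sketch for Chroma (the `access_group` field is an assumption; it would need to be set on every chunk at ingestion, and you should check your Chroma version's filter syntax):

```python
def acl_filter(user_groups: list[str]) -> dict:
    """Build a Chroma where-filter so retrieval only sees chunks whose
    access_group metadata matches one of the caller's groups."""
    return {"access_group": {"$in": user_groups}}

# Passed straight into the existing query call:
# collection.query(
#     query_embeddings=...,
#     n_results=5,
#     where=acl_filter(["engineering", "all-staff"]),
# )
```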
RAG is not a "set it and forget it" system. Treat it like any production service: monitor, measure, iterate. The teams that invest in evaluation and observability consistently build the best RAG applications.