Building RAG Systems with Vector Databases
Retrieval-Augmented Generation (RAG) has emerged as one of the most practical patterns for building AI applications that need to work with large knowledge bases. By combining semantic search with large language models, RAG systems can provide accurate, contextual responses while maintaining factual grounding and reducing hallucinations.
In this guide, I’ll walk through building a production-ready RAG system, from vector database selection to embedding strategies and the integration patterns that hold up under real workloads.
The RAG Architecture
At its core, a RAG system has three main components:
1. Document Processing Pipeline
Your knowledge base needs to be chunked, embedded, and stored efficiently:
```python
from typing import List, Dict
import uuid


class DocumentProcessor:
    def __init__(self, chunk_size: int = 512, overlap: int = 50):
        self.chunk_size = chunk_size
        self.overlap = overlap

    def chunk_document(self, text: str, metadata: Dict) -> List[Dict]:
        """Split document into overlapping chunks with metadata"""
        chunks = []
        start = 0
        while start < len(text):
            end = start + self.chunk_size
            chunk_text = text[start:end]

            # Generate a stable, deterministic chunk ID.
            # Qdrant point IDs must be unsigned integers or UUIDs,
            # so derive a UUID from the source and offset.
            chunk_id = str(uuid.uuid5(
                uuid.NAMESPACE_URL,
                f"{metadata['source']}:{start}"
            ))

            chunks.append({
                'id': chunk_id,
                'text': chunk_text,
                'metadata': {
                    **metadata,
                    'chunk_start': start,
                    'chunk_end': end,
                    'chunk_index': len(chunks)
                }
            })

            start += self.chunk_size - self.overlap

        return chunks
```
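A quick illustration of the chunker; the document text and source path below are made up for the example:

```python
processor = DocumentProcessor(chunk_size=512, overlap=50)

# Hypothetical document; in practice this comes from your ingestion pipeline
chunks = processor.chunk_document(
    text="Our refund policy allows returns within 30 days..." * 50,
    metadata={"source": "policies/refunds.md", "title": "Refund Policy"}
)

print(len(chunks), "chunks")
print(chunks[0]['metadata']['chunk_start'], "-", chunks[0]['metadata']['chunk_end'])
```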
2. Vector Database for Semantic Search
The vector database is the heart of your RAG system. Here’s a production setup using Qdrant:
```python
from qdrant_client import QdrantClient
from qdrant_client.models import Distance, VectorParams, PointStruct
from sentence_transformers import SentenceTransformer
from typing import List, Dict, Optional


class VectorStore:
    def __init__(
        self,
        collection_name: str,
        embedding_model: str = "all-MiniLM-L6-v2",
        host: str = "localhost",
        port: int = 6333
    ):
        self.client = QdrantClient(host=host, port=port)
        self.collection_name = collection_name
        self.encoder = SentenceTransformer(embedding_model)
        self.vector_size = self.encoder.get_sentence_embedding_dimension()

        # Create collection if it doesn't exist
        self._ensure_collection()

    def _ensure_collection(self):
        """Create collection with appropriate vector configuration"""
        collections = self.client.get_collections().collections
        if not any(c.name == self.collection_name for c in collections):
            self.client.create_collection(
                collection_name=self.collection_name,
                vectors_config=VectorParams(
                    size=self.vector_size,
                    distance=Distance.COSINE
                )
            )

    def add_documents(self, chunks: List[Dict]) -> None:
        """Embed and store document chunks"""
        texts = [chunk['text'] for chunk in chunks]
        embeddings = self.encoder.encode(texts, show_progress_bar=True)

        points = [
            PointStruct(
                id=chunk['id'],
                vector=embedding.tolist(),
                payload={
                    'text': chunk['text'],
                    **chunk['metadata']
                }
            )
            for chunk, embedding in zip(chunks, embeddings)
        ]

        self.client.upsert(
            collection_name=self.collection_name,
            points=points
        )

    def search(
        self,
        query: str,
        limit: int = 5,
        score_threshold: float = 0.7
    ) -> List[Dict]:
        """Semantic search for relevant chunks"""
        query_vector = self.encoder.encode(query).tolist()

        results = self.client.search(
            collection_name=self.collection_name,
            query_vector=query_vector,
            limit=limit,
            score_threshold=score_threshold
        )

        return [
            {
                'id': hit.id,
                'score': hit.score,
                'text': hit.payload['text'],
                'metadata': {
                    k: v for k, v in hit.payload.items()
                    if k != 'text'
                }
            }
            for hit in results
        ]
```
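Ingestion then becomes a two-step pipeline: chunk the documents, hand the chunks to the store, and query. The file path and question below are placeholders:

```python
# Hypothetical ingestion pipeline built from the two classes above
processor = DocumentProcessor()
store = VectorStore(collection_name="knowledge_base")

with open("docs/handbook.md") as f:  # placeholder path
    text = f.read()

chunks = processor.chunk_document(text, metadata={"source": "docs/handbook.md"})
store.add_documents(chunks)

for hit in store.search("What is the on-call rotation?", limit=3):
    print(f"{hit['score']:.3f}  {hit['metadata']['source']}")
```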
3. LLM Integration with Context
The final piece ties retrieval to generation:
```python
from anthropic import Anthropic
from typing import List, Dict, Optional


class RAGSystem:
    def __init__(
        self,
        vector_store: VectorStore,
        api_key: str,
        model: str = "claude-3-5-sonnet-20241022"
    ):
        self.vector_store = vector_store
        self.client = Anthropic(api_key=api_key)
        self.model = model

    def query(
        self,
        question: str,
        num_chunks: int = 5,
        temperature: float = 0.3,
        max_tokens: int = 2048
    ) -> Dict:
        """Query the RAG system"""
        # Retrieve relevant chunks
        chunks = self.vector_store.search(
            query=question,
            limit=num_chunks
        )

        if not chunks:
            return {
                'answer': "I couldn't find relevant information to answer that question.",
                'sources': []
            }

        # Build context from retrieved chunks, then generate the answer
        context = self._build_context(chunks)
        return self._generate_answer(
            question, context, chunks,
            temperature=temperature,
            max_tokens=max_tokens
        )

    def _generate_answer(
        self,
        question: str,
        context: str,
        chunks: List[Dict],
        temperature: float = 0.3,
        max_tokens: int = 2048
    ) -> Dict:
        """Call the LLM with the assembled context and package the response"""
        prompt = self._build_prompt(question, context)

        response = self.client.messages.create(
            model=self.model,
            max_tokens=max_tokens,
            temperature=temperature,
            messages=[{
                "role": "user",
                "content": prompt
            }]
        )

        return {
            'answer': response.content[0].text,
            'sources': [
                {
                    'source': chunk['metadata'].get('source'),
                    'score': chunk['score']
                }
                for chunk in chunks
            ],
            'chunks_used': len(chunks)
        }

    def _build_context(self, chunks: List[Dict]) -> str:
        """Format retrieved chunks as context"""
        context_parts = []
        for i, chunk in enumerate(chunks, 1):
            source = chunk['metadata'].get('source', 'Unknown')
            context_parts.append(
                f"[Source {i}: {source}]\n{chunk['text']}\n"
            )
        return "\n---\n".join(context_parts)

    def _build_prompt(self, question: str, context: str) -> str:
        """Build the final prompt with context"""
        return f"""You are a helpful assistant that answers questions based on provided context.

Context Information:
{context}

Question: {question}

Instructions:
- Answer the question using ONLY the information from the context above
- If the context doesn't contain enough information, say so clearly
- Cite specific sources when making claims
- Be concise but thorough
- If multiple sources conflict, acknowledge the disagreement

Answer:"""
```
Choosing the Right Vector Database
The vector database landscape has exploded. Here’s my decision matrix:
For Prototyping: Chroma
```python
import chromadb

# PersistentClient stores data on disk (chromadb >= 0.4);
# older releases used Settings(chroma_db_impl="duckdb+parquet", persist_directory=...)
client = chromadb.PersistentClient(path="./chroma_data")

collection = client.create_collection(
    name="knowledge_base",
    metadata={"hnsw:space": "cosine"}
)
```
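For a quick sanity check, Chroma embeds documents with its default embedding function, so a round trip is only a few lines; the IDs and text below are made up:

```python
# Hypothetical round trip; Chroma embeds the documents with its default model
collection.add(
    ids=["doc-1", "doc-2"],
    documents=["Refunds are issued within 30 days.", "Support is available 24/7."],
    metadatas=[{"source": "refunds.md"}, {"source": "support.md"}]
)

results = collection.query(query_texts=["How long do refunds take?"], n_results=1)
print(results["documents"][0][0])
```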
Pros:
- Zero setup, embedded database
- Great for development
- Simple API
Cons:
- Limited scalability
- No multi-tenancy
- Basic filtering
For Production: Qdrant
```python
from qdrant_client import QdrantClient
from qdrant_client.models import Distance, VectorParams, OptimizersConfigDiff

client = QdrantClient(
    url="https://your-cluster.qdrant.io",
    api_key="your-api-key"
)

client.create_collection(
    collection_name="production_kb",
    vectors_config=VectorParams(
        size=384,  # must match your embedding model's output dimension
        distance=Distance.COSINE
    ),
    # Build the HNSW index only once the collection holds enough
    # vector data (threshold is in KB of vectors)
    optimizers_config=OptimizersConfigDiff(
        indexing_threshold=20000
    )
)
```
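Payload (metadata) indexes are created separately. A minimal sketch, indexing a source field like the chunk metadata used earlier (the field name is an assumption, not from the original setup):

```python
from qdrant_client.models import PayloadSchemaType

# Index the 'source' payload field so filtered searches stay fast
client.create_payload_index(
    collection_name="production_kb",
    field_name="source",
    field_schema=PayloadSchemaType.KEYWORD
)
```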
Pros:
- Excellent performance at scale
- Rich filtering capabilities
- Great cloud offering
- Active development
Cons:
- More operational complexity
- Costs for cloud version
For Enterprise: Pinecone
Pros:
- Fully managed
- Excellent documentation
- Battle-tested at scale
Cons:
- Most expensive option
- Vendor lock-in
- Less flexible filtering
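For reference, a minimal Pinecone setup might look like the sketch below, using the current pinecone Python SDK (v3+); the index name, cloud, and region are placeholders, not values from this article:

```python
from pinecone import Pinecone, ServerlessSpec

pc = Pinecone(api_key="your-api-key")

pc.create_index(
    name="production-kb",          # placeholder name
    dimension=384,                 # must match your embedding model
    metric="cosine",
    spec=ServerlessSpec(cloud="aws", region="us-east-1")  # placeholder region
)

index = pc.Index("production-kb")
```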
Embedding Strategies That Matter
Your embedding model choice dramatically impacts RAG quality:
Model Selection
```python
from sentence_transformers import SentenceTransformer

# Lightweight, fast, good for general use
model_small = SentenceTransformer('all-MiniLM-L6-v2')  # 384 dimensions

# Better accuracy, more resource intensive
model_medium = SentenceTransformer('all-mpnet-base-v2')  # 768 dimensions

# Domain-specific: legal documents
model_legal = SentenceTransformer('nlpaueb/legal-bert-base-uncased')

# Multi-lingual support
model_multilingual = SentenceTransformer('paraphrase-multilingual-MiniLM-L12-v2')
```
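Whichever model you pick, the collection’s vector size must match its output dimension, and switching models later means re-embedding the entire corpus. A quick check:

```python
# The Qdrant collection above was created with size=384, which matches
# all-MiniLM-L6-v2 but not all-mpnet-base-v2
print(model_small.get_sentence_embedding_dimension())   # 384
print(model_medium.get_sentence_embedding_dimension())  # 768
```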
Hybrid Search Pattern
Pure vector similarity misses exact constraints like source or document type. Combine semantic search with metadata filtering to narrow results to the right slice of your knowledge base:
```python
from typing import Dict, List, Optional
from qdrant_client.models import Filter, FieldCondition, MatchValue


class HybridVectorStore(VectorStore):
    def hybrid_search(
        self,
        query: str,
        filters: Optional[Dict] = None,
        limit: int = 5
    ) -> List[Dict]:
        """Combine semantic search with metadata filtering"""
        query_vector = self.encoder.encode(query).tolist()

        # Build metadata filters
        query_filter = None
        if filters:
            conditions = [
                FieldCondition(
                    key=key,
                    match=MatchValue(value=value)
                )
                for key, value in filters.items()
            ]
            query_filter = Filter(must=conditions)

        results = self.client.search(
            collection_name=self.collection_name,
            query_vector=query_vector,
            query_filter=query_filter,
            limit=limit
        )

        return [self._format_result(hit) for hit in results]

    def _format_result(self, hit) -> Dict:
        """Format a Qdrant hit the same way VectorStore.search does"""
        return {
            'id': hit.id,
            'score': hit.score,
            'text': hit.payload['text'],
            'metadata': {
                k: v for k, v in hit.payload.items()
                if k != 'text'
            }
        }
```
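Usage is the same as before, with an extra filter dict; the collection name and filter value here are placeholders:

```python
store = HybridVectorStore(collection_name="production_kb")

results = store.hybrid_search(
    query="How do I rotate API keys?",
    filters={"source": "docs/security.md"},  # hypothetical metadata value
    limit=5
)
for r in results:
    print(f"{r['score']:.3f}  {r['metadata'].get('source')}")
```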
Chunking Strategies
Bad chunking kills RAG performance. Here are battle-tested approaches:
1. Fixed-Size with Overlap
```python
import tiktoken


def chunk_by_tokens(text: str, chunk_size: int = 512, overlap: int = 50):
    """Token-aware chunking"""
    enc = tiktoken.get_encoding("cl100k_base")
    tokens = enc.encode(text)

    chunks = []
    for i in range(0, len(tokens), chunk_size - overlap):
        chunk_tokens = tokens[i:i + chunk_size]
        chunk_text = enc.decode(chunk_tokens)
        chunks.append(chunk_text)

    return chunks
```
2. Semantic Chunking
```python
import nltk
from sentence_transformers import SentenceTransformer, util


def chunk_by_semantics(text: str, similarity_threshold: float = 0.5):
    """Group sentences by semantic similarity"""
    # Split into sentences (requires the punkt tokenizer: nltk.download('punkt'))
    sentences = nltk.sent_tokenize(text)
    if not sentences:
        return []

    # Embed sentences
    model = SentenceTransformer('all-MiniLM-L6-v2')
    embeddings = model.encode(sentences)

    chunks = []
    current_chunk = [sentences[0]]

    for i in range(1, len(sentences)):
        # Compare with the previous sentence
        similarity = util.cos_sim(embeddings[i-1], embeddings[i]).item()

        if similarity > similarity_threshold:
            current_chunk.append(sentences[i])
        else:
            chunks.append(' '.join(current_chunk))
            current_chunk = [sentences[i]]

    if current_chunk:
        chunks.append(' '.join(current_chunk))

    return chunks
```
3. Document-Structure Aware
```python
import re
from typing import List, Dict


def chunk_markdown(text: str) -> List[Dict]:
    """Chunk markdown while preserving structure"""
    # Split by headers, keeping the header lines as separate items
    sections = re.split(r'\n(#{1,6}\s+.*)\n', text)

    chunks = []
    current_headers = []
    current_content = []

    for section in sections:
        if re.match(r'^#{1,6}\s+', section):
            # This is a header: flush the content collected so far
            if current_content:
                chunks.append({
                    'text': '\n'.join(current_content),
                    'headers': current_headers.copy(),
                    'type': 'section'
                })
                current_content = []

            # Update the header hierarchy
            level = len(re.match(r'^(#+)', section).group(1))
            current_headers = current_headers[:level-1]
            current_headers.append(section.strip('# ').strip())
        else:
            current_content.append(section)

    if current_content:
        chunks.append({
            'text': '\n'.join(current_content),
            'headers': current_headers,
            'type': 'section'
        })

    return chunks
```
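These structure-aware chunks still need to be mapped into the {'id', 'text', 'metadata'} shape that VectorStore.add_documents expects. One option, a sketch rather than part of the original pipeline, is to prepend the header path so each chunk carries its own context when embedded:

```python
import uuid
from typing import List, Dict


def flatten_markdown_chunks(md_chunks: List[Dict], source: str) -> List[Dict]:
    """Sketch: adapt chunk_markdown output for VectorStore.add_documents."""
    flattened = []
    for i, chunk in enumerate(md_chunks):
        header_path = " > ".join(chunk['headers'])
        # Prepend the header path so the embedding sees the section context
        text = f"{header_path}\n\n{chunk['text']}" if header_path else chunk['text']
        flattened.append({
            'id': str(uuid.uuid5(uuid.NAMESPACE_URL, f"{source}:{i}")),
            'text': text,
            'metadata': {'source': source, 'headers': chunk['headers']}
        })
    return flattened
```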
Production Considerations
1. Caching Strategy
```python
import hashlib


class CachedRAGSystem(RAGSystem):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._embedding_cache = {}

    def _get_cached_embedding(self, text: str):
        """Cache embeddings for frequently used queries"""
        text_hash = hashlib.md5(text.encode()).hexdigest()
        if text_hash not in self._embedding_cache:
            self._embedding_cache[text_hash] = \
                self.vector_store.encoder.encode(text)
        return self._embedding_cache[text_hash]
```
2. Monitoring and Observability
```python
import time
from dataclasses import dataclass


@dataclass
class RAGMetrics:
    query_time: float
    retrieval_time: float
    llm_time: float
    chunks_retrieved: int
    tokens_used: int
    cache_hit: bool


class ObservableRAGSystem(RAGSystem):
    def query(self, question: str, **kwargs) -> Dict:
        start_time = time.time()

        # Track retrieval time separately
        retrieval_start = time.time()
        chunks = self.vector_store.search(query=question)
        retrieval_time = time.time() - retrieval_start

        # Track generation time. Note: super().query() runs its own
        # retrieval, so this figure includes a second retrieval pass;
        # good enough for rough monitoring, refactor if you need precision.
        llm_start = time.time()
        response = super().query(question, **kwargs)
        llm_time = time.time() - llm_start

        # Log metrics
        metrics = RAGMetrics(
            query_time=time.time() - start_time,
            retrieval_time=retrieval_time,
            llm_time=llm_time,
            chunks_retrieved=len(chunks),
            tokens_used=self._estimate_tokens(response['answer']),
            cache_hit=False
        )
        self._log_metrics(metrics)

        return response

    def _estimate_tokens(self, text: str) -> int:
        """Rough token estimate (~4 characters per token for English text)"""
        return len(text) // 4

    def _log_metrics(self, metrics: RAGMetrics):
        """Send to your observability platform"""
        print(f"Query completed in {metrics.query_time:.2f}s")
        print(f"  Retrieval: {metrics.retrieval_time:.2f}s")
        print(f"  LLM: {metrics.llm_time:.2f}s")
        print(f"  Chunks: {metrics.chunks_retrieved}")
        print(f"  Tokens: {metrics.tokens_used}")
```
3. Error Handling and Fallbacks
```python
class RobustRAGSystem(RAGSystem):
    def query(self, question: str, **kwargs) -> Dict:
        try:
            # Try primary retrieval
            return super().query(question, **kwargs)
        except Exception as e:
            # Log error
            print(f"Primary RAG failed: {e}")
            # Fallback to direct LLM
            return self._fallback_query(question)

    def _fallback_query(self, question: str) -> Dict:
        """Fallback when retrieval fails"""
        response = self.client.messages.create(
            model=self.model,
            max_tokens=1024,
            messages=[{
                "role": "user",
                "content": f"Answer this question: {question}\n\n"
                           "Note: I don't have access to my knowledge base, "
                           "so I can only provide general information."
            }]
        )

        return {
            'answer': response.content[0].text,
            'sources': [],
            'fallback': True
        }
```
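Transient failures (rate limits, network blips) are usually worth retrying before falling back. A minimal, library-free retry helper, offered as a sketch to adapt to your stack:

```python
import time


def with_retries(fn, attempts: int = 3, base_delay: float = 1.0):
    """Retry a flaky call (vector DB or LLM request) with exponential backoff."""
    for attempt in range(attempts):
        try:
            return fn()
        except Exception:
            if attempt == attempts - 1:
                raise
            time.sleep(base_delay * (2 ** attempt))


# e.g. with_retries(lambda: store.search("What is the on-call rotation?"))
```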
Advanced Patterns
Multi-Query Retrieval
Generate multiple search queries for better recall:
```python
class MultiQueryRAG(RAGSystem):
    def _generate_query_variants(self, question: str) -> List[str]:
        """Generate alternative phrasings"""
        response = self.client.messages.create(
            model="claude-3-5-haiku-20241022",  # Faster, cheaper
            max_tokens=500,
            messages=[{
                "role": "user",
                "content": f"""Generate 3 alternative phrasings of this question:

{question}

Return only the questions, one per line."""
            }]
        )

        variants = response.content[0].text.strip().split('\n')
        return [question] + variants

    def query(self, question: str, **kwargs) -> Dict:
        """Search with multiple query variants"""
        variants = self._generate_query_variants(question)

        all_chunks = []
        seen_ids = set()

        for variant in variants:
            chunks = self.vector_store.search(
                query=variant,
                limit=3  # Fewer per variant
            )
            for chunk in chunks:
                if chunk['id'] not in seen_ids:
                    all_chunks.append(chunk)
                    seen_ids.add(chunk['id'])

        # Re-rank by similarity score
        all_chunks.sort(key=lambda x: x['score'], reverse=True)
        top_chunks = all_chunks[:kwargs.get('num_chunks', 5)]

        # Continue with the standard flow (see _generate_answer in RAGSystem)
        context = self._build_context(top_chunks)
        return self._generate_answer(question, context, top_chunks)
```
Re-Ranking for Better Precision
```python
from sentence_transformers import CrossEncoder


class RerankedRAGSystem(RAGSystem):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Cross-encoder for re-ranking
        self.reranker = CrossEncoder('cross-encoder/ms-marco-MiniLM-L-6-v2')

    def query(self, question: str, **kwargs) -> Dict:
        # Get more candidates than we need
        initial_chunks = self.vector_store.search(
            query=question,
            limit=20  # Over-retrieve
        )

        # Re-rank with the cross-encoder
        pairs = [[question, chunk['text']] for chunk in initial_chunks]
        scores = self.reranker.predict(pairs)

        # Attach re-rank scores
        for chunk, score in zip(initial_chunks, scores):
            chunk['rerank_score'] = score

        # Sort by re-rank score and keep the best
        initial_chunks.sort(key=lambda x: x['rerank_score'], reverse=True)
        top_chunks = initial_chunks[:kwargs.get('num_chunks', 5)]

        # Generate the answer
        context = self._build_context(top_chunks)
        return self._generate_answer(question, context, top_chunks)
```
Key Takeaways
Building production RAG systems requires:
- Smart chunking - Match your content structure
- Quality embeddings - Choose the right model for your domain
- Hybrid search - Combine semantic and metadata filtering
- Observability - Track metrics at every stage
- Error handling - Graceful degradation when things fail
- Re-ranking - Don’t trust embedding similarity alone
The patterns I’ve shared here power RAG systems handling millions of queries. Start simple with basic retrieval, then layer in sophistication as you learn where your system struggles.
Building AI systems that actually work in production. One vector at a time.