Skip to main content

Building RAG Systems with Vector Databases

Ryan Dahlberg
Ryan Dahlberg
September 15, 2025 10 min read
Share:
Building RAG Systems with Vector Databases

Building RAG Systems with Vector Databases

Retrieval-Augmented Generation (RAG) has emerged as one of the most practical patterns for building AI applications that need to work with large knowledge bases. By combining semantic search with large language models, RAG systems can provide accurate, contextual responses while maintaining factual grounding and reducing hallucinations.

In this guide, I’ll walk through building a production-ready RAG system, from vector database selection to embedding strategies and integration patterns that actually work in production.

The RAG Architecture

At its core, a RAG system has three main components:

1. Document Processing Pipeline

Your knowledge base needs to be chunked, embedded, and stored efficiently:

from typing import List, Dict
import hashlib

class DocumentProcessor:
    def __init__(self, chunk_size: int = 512, overlap: int = 50):
        self.chunk_size = chunk_size
        self.overlap = overlap

    def chunk_document(self, text: str, metadata: Dict) -> List[Dict]:
        """Split document into overlapping chunks with metadata"""
        chunks = []
        start = 0

        while start < len(text):
            end = start + self.chunk_size
            chunk_text = text[start:end]

            # Generate stable chunk ID
            chunk_id = hashlib.sha256(
                f"{metadata['source']}:{start}".encode()
            ).hexdigest()[:16]

            chunks.append({
                'id': chunk_id,
                'text': chunk_text,
                'metadata': {
                    **metadata,
                    'chunk_start': start,
                    'chunk_end': end,
                    'chunk_index': len(chunks)
                }
            })

            start += self.chunk_size - self.overlap

        return chunks

The vector database is the heart of your RAG system. Here’s a production setup using Qdrant:

from qdrant_client import QdrantClient
from qdrant_client.models import Distance, VectorParams, PointStruct
from sentence_transformers import SentenceTransformer
from typing import List, Dict, Optional

class VectorStore:
    def __init__(
        self,
        collection_name: str,
        embedding_model: str = "all-MiniLM-L6-v2",
        host: str = "localhost",
        port: int = 6333
    ):
        self.client = QdrantClient(host=host, port=port)
        self.collection_name = collection_name
        self.encoder = SentenceTransformer(embedding_model)
        self.vector_size = self.encoder.get_sentence_embedding_dimension()

        # Create collection if it doesn't exist
        self._ensure_collection()

    def _ensure_collection(self):
        """Create collection with appropriate vector configuration"""
        collections = self.client.get_collections().collections

        if not any(c.name == self.collection_name for c in collections):
            self.client.create_collection(
                collection_name=self.collection_name,
                vectors_config=VectorParams(
                    size=self.vector_size,
                    distance=Distance.COSINE
                )
            )

    def add_documents(self, chunks: List[Dict]) -> None:
        """Embed and store document chunks"""
        texts = [chunk['text'] for chunk in chunks]
        embeddings = self.encoder.encode(texts, show_progress_bar=True)

        points = [
            PointStruct(
                id=chunk['id'],
                vector=embedding.tolist(),
                payload={
                    'text': chunk['text'],
                    **chunk['metadata']
                }
            )
            for chunk, embedding in zip(chunks, embeddings)
        ]

        self.client.upsert(
            collection_name=self.collection_name,
            points=points
        )

    def search(
        self,
        query: str,
        limit: int = 5,
        score_threshold: float = 0.7
    ) -> List[Dict]:
        """Semantic search for relevant chunks"""
        query_vector = self.encoder.encode(query).tolist()

        results = self.client.search(
            collection_name=self.collection_name,
            query_vector=query_vector,
            limit=limit,
            score_threshold=score_threshold
        )

        return [
            {
                'id': hit.id,
                'score': hit.score,
                'text': hit.payload['text'],
                'metadata': {
                    k: v for k, v in hit.payload.items()
                    if k != 'text'
                }
            }
            for hit in results
        ]

3. LLM Integration with Context

The final piece ties retrieval to generation:

from anthropic import Anthropic
from typing import List, Dict, Optional

class RAGSystem:
    def __init__(
        self,
        vector_store: VectorStore,
        api_key: str,
        model: str = "claude-3-5-sonnet-20241022"
    ):
        self.vector_store = vector_store
        self.client = Anthropic(api_key=api_key)
        self.model = model

    def query(
        self,
        question: str,
        num_chunks: int = 5,
        temperature: float = 0.3,
        max_tokens: int = 2048
    ) -> Dict:
        """Query the RAG system"""
        # Retrieve relevant chunks
        chunks = self.vector_store.search(
            query=question,
            limit=num_chunks
        )

        if not chunks:
            return {
                'answer': "I couldn't find relevant information to answer that question.",
                'sources': []
            }

        # Build context from retrieved chunks
        context = self._build_context(chunks)

        # Generate answer with LLM
        prompt = self._build_prompt(question, context)

        response = self.client.messages.create(
            model=self.model,
            max_tokens=max_tokens,
            temperature=temperature,
            messages=[{
                "role": "user",
                "content": prompt
            }]
        )

        return {
            'answer': response.content[0].text,
            'sources': [
                {
                    'source': chunk['metadata'].get('source'),
                    'score': chunk['score']
                }
                for chunk in chunks
            ],
            'chunks_used': len(chunks)
        }

    def _build_context(self, chunks: List[Dict]) -> str:
        """Format retrieved chunks as context"""
        context_parts = []

        for i, chunk in enumerate(chunks, 1):
            source = chunk['metadata'].get('source', 'Unknown')
            context_parts.append(
                f"[Source {i}: {source}]\n{chunk['text']}\n"
            )

        return "\n---\n".join(context_parts)

    def _build_prompt(self, question: str, context: str) -> str:
        """Build the final prompt with context"""
        return f"""You are a helpful assistant that answers questions based on provided context.

Context Information:
{context}

Question: {question}

Instructions:
- Answer the question using ONLY the information from the context above
- If the context doesn't contain enough information, say so clearly
- Cite specific sources when making claims
- Be concise but thorough
- If multiple sources conflict, acknowledge the disagreement

Answer:"""

Choosing the Right Vector Database

The vector database landscape has exploded. Here’s my decision matrix:

For Prototyping: Chroma

import chromadb
from chromadb.config import Settings

client = chromadb.Client(Settings(
    chroma_db_impl="duckdb+parquet",
    persist_directory="./chroma_data"
))

collection = client.create_collection(
    name="knowledge_base",
    metadata={"hnsw:space": "cosine"}
)

Pros:

  • Zero setup, embedded database
  • Great for development
  • Simple API

Cons:

  • Limited scalability
  • No multi-tenancy
  • Basic filtering

For Production: Qdrant

from qdrant_client import QdrantClient
from qdrant_client.models import Distance, VectorParams

client = QdrantClient(
    url="https://your-cluster.qdrant.io",
    api_key="your-api-key"
)

client.create_collection(
    collection_name="production_kb",
    vectors_config=VectorParams(
        size=384,
        distance=Distance.COSINE
    ),
    # Enable payload indexing for fast filtering
    optimizers_config={
        "indexing_threshold": 20000
    }
)

Pros:

  • Excellent performance at scale
  • Rich filtering capabilities
  • Great cloud offering
  • Active development

Cons:

  • More operational complexity
  • Costs for cloud version

For Enterprise: Pinecone

Pros:

  • Fully managed
  • Excellent documentation
  • Battle-tested at scale

Cons:

  • Most expensive option
  • Vendor lock-in
  • Less flexible filtering

Embedding Strategies That Matter

Your embedding model choice dramatically impacts RAG quality:

Model Selection

from sentence_transformers import SentenceTransformer

# Lightweight, fast, good for general use
model_small = SentenceTransformer('all-MiniLM-L6-v2')  # 384 dimensions

# Better accuracy, more resource intensive
model_medium = SentenceTransformer('all-mpnet-base-v2')  # 768 dimensions

# Domain-specific: legal documents
model_legal = SentenceTransformer('nlpaueb/legal-bert-base-uncased')

# Multi-lingual support
model_multilingual = SentenceTransformer('paraphrase-multilingual-MiniLM-L12-v2')

Hybrid Search Pattern

Combine semantic and keyword search for best results:

from qdrant_client.models import Filter, FieldCondition, MatchValue

class HybridVectorStore(VectorStore):
    def hybrid_search(
        self,
        query: str,
        filters: Optional[Dict] = None,
        limit: int = 5
    ) -> List[Dict]:
        """Combine semantic search with metadata filtering"""
        query_vector = self.encoder.encode(query).tolist()

        # Build metadata filters
        query_filter = None
        if filters:
            conditions = [
                FieldCondition(
                    key=key,
                    match=MatchValue(value=value)
                )
                for key, value in filters.items()
            ]
            query_filter = Filter(must=conditions)

        results = self.client.search(
            collection_name=self.collection_name,
            query_vector=query_vector,
            query_filter=query_filter,
            limit=limit
        )

        return [self._format_result(hit) for hit in results]

Chunking Strategies

Bad chunking kills RAG performance. Here are battle-tested approaches:

1. Fixed-Size with Overlap

def chunk_by_tokens(text: str, chunk_size: int = 512, overlap: int = 50):
    """Token-aware chunking"""
    import tiktoken

    enc = tiktoken.get_encoding("cl100k_base")
    tokens = enc.encode(text)
    chunks = []

    for i in range(0, len(tokens), chunk_size - overlap):
        chunk_tokens = tokens[i:i + chunk_size]
        chunk_text = enc.decode(chunk_tokens)
        chunks.append(chunk_text)

    return chunks

2. Semantic Chunking

def chunk_by_semantics(text: str, similarity_threshold: float = 0.5):
    """Group sentences by semantic similarity"""
    import nltk
    from sentence_transformers import SentenceTransformer, util

    # Split into sentences
    sentences = nltk.sent_tokenize(text)

    # Embed sentences
    model = SentenceTransformer('all-MiniLM-L6-v2')
    embeddings = model.encode(sentences)

    chunks = []
    current_chunk = [sentences[0]]

    for i in range(1, len(sentences)):
        # Compare with previous sentence
        similarity = util.cos_sim(embeddings[i-1], embeddings[i]).item()

        if similarity > similarity_threshold:
            current_chunk.append(sentences[i])
        else:
            chunks.append(' '.join(current_chunk))
            current_chunk = [sentences[i]]

    if current_chunk:
        chunks.append(' '.join(current_chunk))

    return chunks

3. Document-Structure Aware

def chunk_markdown(text: str) -> List[Dict]:
    """Chunk markdown while preserving structure"""
    import re

    # Split by headers
    sections = re.split(r'\n(#{1,6}\s+.*)\n', text)
    chunks = []

    current_headers = []
    current_content = []

    for i, section in enumerate(sections):
        if re.match(r'^#{1,6}\s+', section):
            # This is a header
            if current_content:
                chunks.append({
                    'text': '\n'.join(current_content),
                    'headers': current_headers.copy(),
                    'type': 'section'
                })
                current_content = []

            # Update header hierarchy
            level = len(re.match(r'^(#+)', section).group(1))
            current_headers = current_headers[:level-1]
            current_headers.append(section.strip('# ').strip())
        else:
            current_content.append(section)

    if current_content:
        chunks.append({
            'text': '\n'.join(current_content),
            'headers': current_headers,
            'type': 'section'
        })

    return chunks

Production Considerations

1. Caching Strategy

from functools import lru_cache
import hashlib

class CachedRAGSystem(RAGSystem):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._embedding_cache = {}

    def _get_cached_embedding(self, text: str):
        """Cache embeddings for frequently used queries"""
        text_hash = hashlib.md5(text.encode()).hexdigest()

        if text_hash not in self._embedding_cache:
            self._embedding_cache[text_hash] = \
                self.vector_store.encoder.encode(text)

        return self._embedding_cache[text_hash]

2. Monitoring and Observability

import time
from dataclasses import dataclass
from typing import Optional

@dataclass
class RAGMetrics:
    query_time: float
    retrieval_time: float
    llm_time: float
    chunks_retrieved: int
    tokens_used: int
    cache_hit: bool

class ObservableRAGSystem(RAGSystem):
    def query(self, question: str, **kwargs) -> Dict:
        start_time = time.time()

        # Track retrieval time
        retrieval_start = time.time()
        chunks = self.vector_store.search(query=question)
        retrieval_time = time.time() - retrieval_start

        # Track LLM time
        llm_start = time.time()
        response = super().query(question, **kwargs)
        llm_time = time.time() - llm_start

        # Log metrics
        metrics = RAGMetrics(
            query_time=time.time() - start_time,
            retrieval_time=retrieval_time,
            llm_time=llm_time,
            chunks_retrieved=len(chunks),
            tokens_used=self._estimate_tokens(response['answer']),
            cache_hit=False
        )

        self._log_metrics(metrics)

        return response

    def _log_metrics(self, metrics: RAGMetrics):
        """Send to your observability platform"""
        print(f"Query completed in {metrics.query_time:.2f}s")
        print(f"  Retrieval: {metrics.retrieval_time:.2f}s")
        print(f"  LLM: {metrics.llm_time:.2f}s")
        print(f"  Chunks: {metrics.chunks_retrieved}")
        print(f"  Tokens: {metrics.tokens_used}")

3. Error Handling and Fallbacks

class RobustRAGSystem(RAGSystem):
    def query(self, question: str, **kwargs) -> Dict:
        try:
            # Try primary retrieval
            return super().query(question, **kwargs)
        except Exception as e:
            # Log error
            print(f"Primary RAG failed: {e}")

            # Fallback to direct LLM
            return self._fallback_query(question)

    def _fallback_query(self, question: str) -> Dict:
        """Fallback when retrieval fails"""
        response = self.client.messages.create(
            model=self.model,
            max_tokens=1024,
            messages=[{
                "role": "user",
                "content": f"Answer this question: {question}\n\n"
                          f"Note: I don't have access to my knowledge base, "
                          f"so I can only provide general information."
            }]
        )

        return {
            'answer': response.content[0].text,
            'sources': [],
            'fallback': True
        }

Advanced Patterns

Multi-Query Retrieval

Generate multiple search queries for better recall:

class MultiQueryRAG(RAGSystem):
    def _generate_query_variants(self, question: str) -> List[str]:
        """Generate alternative phrasings"""
        response = self.client.messages.create(
            model="claude-3-5-haiku-20241022",  # Faster, cheaper
            max_tokens=500,
            messages=[{
                "role": "user",
                "content": f"""Generate 3 alternative phrasings of this question:

{question}

Return only the questions, one per line."""
            }]
        )

        variants = response.content[0].text.strip().split('\n')
        return [question] + variants

    def query(self, question: str, **kwargs) -> Dict:
        """Search with multiple query variants"""
        variants = self._generate_query_variants(question)

        all_chunks = []
        seen_ids = set()

        for variant in variants:
            chunks = self.vector_store.search(
                query=variant,
                limit=3  # Fewer per variant
            )

            for chunk in chunks:
                if chunk['id'] not in seen_ids:
                    all_chunks.append(chunk)
                    seen_ids.add(chunk['id'])

        # Re-rank by score
        all_chunks.sort(key=lambda x: x['score'], reverse=True)
        top_chunks = all_chunks[:kwargs.get('num_chunks', 5)]

        # Continue with standard flow
        context = self._build_context(top_chunks)
        return self._generate_answer(question, context, top_chunks)

Re-Ranking for Better Precision

from sentence_transformers import CrossEncoder

class RerankedRAGSystem(RAGSystem):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Cross-encoder for re-ranking
        self.reranker = CrossEncoder('cross-encoder/ms-marco-MiniLM-L-6-v2')

    def query(self, question: str, **kwargs) -> Dict:
        # Get more candidates
        initial_chunks = self.vector_store.search(
            query=question,
            limit=20  # Over-retrieve
        )

        # Re-rank with cross-encoder
        pairs = [[question, chunk['text']] for chunk in initial_chunks]
        scores = self.reranker.predict(pairs)

        # Combine scores
        for chunk, score in zip(initial_chunks, scores):
            chunk['rerank_score'] = score

        # Sort by re-rank score
        initial_chunks.sort(key=lambda x: x['rerank_score'], reverse=True)
        top_chunks = initial_chunks[:kwargs.get('num_chunks', 5)]

        # Generate answer
        context = self._build_context(top_chunks)
        return self._generate_answer(question, context, top_chunks)

Key Takeaways

Building production RAG systems requires:

  1. Smart chunking - Match your content structure
  2. Quality embeddings - Choose the right model for your domain
  3. Hybrid search - Combine semantic and metadata filtering
  4. Observability - Track metrics at every stage
  5. Error handling - Graceful degradation when things fail
  6. Re-ranking - Don’t trust embedding similarity alone

The patterns I’ve shared here power RAG systems handling millions of queries. Start simple with basic retrieval, then layer in sophistication as you learn where your system struggles.

Further Reading


Building AI systems that actually work in production. One vector at a time.

#LLM Integration #RAG #Vector Databases #Semantic Search #Embeddings #AI Architecture