Lab 03 — Production RAG System
Overview
In this lab you build a production-quality RAG system with hybrid search, cross-encoder reranking, RAGAS evaluation, a FastAPI streaming endpoint, and semantic caching. By the end you will have a complete, runnable system you can deploy.
               ┌──────────────────────────────┐
               │   3 fake hardware .md docs   │
               └──────────────┬───────────────┘
                              │ ingest
               ┌──────────────▼───────────────┐
               │    ChromaDB + BM25 index     │
               └──────────────┬───────────────┘
Query ─────────┐              │
               │ retrieve     │
┌──────────────▼──────────────▼───────────────┐
│    Hybrid Retriever (BM25 + dense, RRF)     │
└──────────────────────────┬──────────────────┘
                           │ top-20 candidates
┌──────────────────────────▼──────────────────┐
│           Cross-Encoder Reranker            │
└──────────────────────────┬──────────────────┘
                           │ top-3 reranked
┌──────────────────────────▼──────────────────┐
│        LLM Generation (gpt-4o-mini)         │
└──────────────────────────┬──────────────────┘
                           │
┌──────────────────────────▼──────────────────┐
│      FastAPI + SSE Streaming Endpoint       │
│     + Semantic Cache (0.95 similarity)      │
└─────────────────────────────────────────────┘
Estimated time: 90–120 minutes
Difficulty: Advanced
What you will build:
production_rag/
├── docs/ ← fake hardware documentation (created in Step 2)
│ ├── h100_guide.md
│ ├── cuda_guide.md
│ └── networking_guide.md
├── ingest.py ← document ingestion pipeline
├── retriever.py ← hybrid retriever + reranker
├── rag_chain.py ← full RAG chain
├── evaluate.py ← RAGAS evaluation
├── cache.py ← semantic cache
├── api.py ← FastAPI streaming endpoint
├── main.py ← demo runner
└── requirements.txt
Step 1 — Project Setup
Install dependencies:
pip install langchain langchain-community langchain-chroma langchain-openai \
    sentence-transformers rank-bm25 chromadb faiss-cpu \
    ragas datasets openai fastapi uvicorn sse-starlette \
    redis numpy python-dotenv rich "unstructured[md]"
Create requirements.txt:
langchain>=0.2.0
langchain-community>=0.2.0
langchain-chroma>=0.1.0
langchain-openai>=0.1.0
sentence-transformers>=3.0.0
rank-bm25>=0.2.2
chromadb>=0.5.0
faiss-cpu>=1.8.0
ragas>=0.1.14
datasets>=2.20.0
openai>=1.30.0
fastapi>=0.111.0
uvicorn>=0.30.0
sse-starlette>=2.1.0
redis>=5.0.0
numpy>=1.26.0
python-dotenv>=1.0.0
rich>=13.7.0
unstructured[md]
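Create .env with a single line that assigns your OpenAI API key to OPENAI_API_KEY. main.py loads this file through load_dotenv().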
Step 2 — Create the Fake Hardware Documentation
These files serve as the corpus for our RAG system.
# create_docs.py — run once to generate the fake docs
from pathlib import Path
Path("docs").mkdir(exist_ok=True)
Path("docs/h100_guide.md").write_text("""# NVIDIA H100 GPU Technical Guide
## Overview
The NVIDIA H100 Tensor Core GPU is built on the Hopper architecture (2022).
It is designed for large-scale AI training and inference workloads.
## Memory Specifications
The H100 SXM5 variant features 80 GB of HBM3 memory with 3.35 TB/s bandwidth.
The H100 PCIe variant offers 80 GB HBM2e with 2 TB/s bandwidth.
HBM3 uses a stacked DRAM design to achieve high bandwidth density in a small footprint.
## Compute Performance
- FP8 (inference): 3958 TFLOPS
- BF16 (training): 1979 TFLOPS with sparsity, 989 TFLOPS dense
- FP32: 67 TFLOPS
- TF32: 989 TFLOPS with sparsity
CUDA cores: 16896. Tensor Cores: 528 (4th generation).
## Interconnect
The H100 SXM5 uses NVLink 4.0 providing 900 GB/s total bidirectional bandwidth
when connecting multiple GPUs in an NVLink domain.
PCIe 5.0 x16 provides 128 GB/s bidirectional host-to-device bandwidth.
## Software Support
Supported CUDA versions: CUDA 11.8 and above.
Key features: Transformer Engine (TF32/FP8 auto-casting), DPX instructions,
Thread Block Clusters, Asynchronous Memory Copy (TMA).
## Use Cases
Best suited for: LLM training (GPT-4 class models), diffusion model inference,
scientific simulation, seismic processing.
""")
Path("docs/cuda_guide.md").write_text("""# CUDA Programming Guide
## What is CUDA?
CUDA (Compute Unified Device Architecture) is NVIDIA's parallel computing platform
and programming model. It enables general-purpose computing on GPUs.
## CUDA Hierarchy
CUDA organizes computation into a three-level hierarchy:
- Grid: the entire computation, made of blocks
- Block: a group of up to 1024 threads that share shared memory and can synchronize
- Thread: the smallest unit of execution
## Memory Types
- Global memory: accessible by all threads, high latency (~400 cycles), large capacity
- Shared memory: accessible by threads within a block, low latency (~4 cycles), 48-228 KB
- Registers: per-thread, lowest latency (1 cycle), ~256 KB per SM
- Constant memory: 64 KB, cached, fast for broadcast reads
- L2 cache: 50 MB on H100, shared across all SMs
## Key Concepts
### Warp Divergence
A warp is 32 threads that execute the same instruction. If threads in a warp take
different branches (if/else), both branches execute serially. This is warp divergence
and should be minimized.
### Memory Coalescing
GPU memory accesses are most efficient when consecutive threads access consecutive
memory addresses (coalesced access). Uncoalesced access can reduce bandwidth by 8-32x.
### Occupancy
Occupancy is the ratio of active warps to maximum warps on an SM.
Higher occupancy hides memory latency. Target 50-75% occupancy for most kernels.
## CUDA 12 New Features
- Thread Block Clusters: hierarchical grouping above thread blocks
- Tensor Memory Accelerator (TMA): asynchronous bulk memory transfers
- Distributed Shared Memory: shared memory accessible across a cluster
## Profiling Tools
- Nsight Compute: per-kernel performance counters and roofline analysis
- Nsight Systems: system-wide timeline for CPU-GPU interaction
- nvprof (legacy): command-line profiler for older CUDA versions
""")
Path("docs/networking_guide.md").write_text("""# GPU Networking and Interconnects
## NVLink
NVLink is NVIDIA's high-speed GPU-to-GPU interconnect.
- NVLink 3.0 (A100): 600 GB/s total bidirectional bandwidth, 12 links
- NVLink 4.0 (H100): 900 GB/s total bidirectional bandwidth, 18 links
- NVLink 5.0 (B200): 1800 GB/s total bidirectional bandwidth
NVLink enables GPU memory to be aggregated into a single unified pool (NVLink Sharp).
## NVSwitch
NVSwitch is a chip that enables all-to-all NVLink connectivity across many GPUs.
A DGX H100 node contains 8 H100 GPUs connected via 4 NVSwitch chips.
This creates a fully non-blocking fabric at 900 GB/s between any pair of GPUs.
## PCIe
PCIe (Peripheral Component Interconnect Express) connects the GPU to the CPU.
- PCIe 4.0 x16: 64 GB/s bidirectional
- PCIe 5.0 x16: 128 GB/s bidirectional
- PCIe 6.0 x16: 256 GB/s bidirectional (emerging)
PCIe bandwidth is often the bottleneck for single-GPU systems doing frequent host-GPU transfers.
## InfiniBand
InfiniBand is used for GPU-to-GPU communication across nodes in a cluster.
- HDR InfiniBand: 200 Gb/s per port
- NDR InfiniBand: 400 Gb/s per port
- XDR InfiniBand: 800 Gb/s per port (emerging)
NVIDIA's SHARP technology offloads collective operations (AllReduce) to the network.
## RDMA and GPUDirect
GPUDirect RDMA allows InfiniBand to transfer data directly to/from GPU memory,
bypassing the CPU and system RAM. This is critical for distributed training latency.
GPUDirect Storage allows direct NVMe-to-GPU transfers at >10 GB/s.
## Ethernet Alternatives
RoCE (RDMA over Converged Ethernet) v2 provides RDMA semantics over standard Ethernet.
NVIDIA Spectrum-X combines BlueField-3 DPUs with Spectrum-4 switches for AI networking.
""")
print("Docs created: docs/h100_guide.md, docs/cuda_guide.md, docs/networking_guide.md")
Run it once from the project root with python create_docs.py.
Step 3 — Build the Ingestion and Retrieval Pipeline
Create ingest.py:
# ingest.py
"""
Document ingestion: load markdown → chunk → embed → store in ChromaDB.
Also builds a BM25 index for hybrid retrieval.
"""
import glob
import pickle
import re
from pathlib import Path
from langchain_chroma import Chroma
from langchain_community.document_loaders import UnstructuredMarkdownLoader
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_core.documents import Document
from rank_bm25 import BM25Okapi
CHROMA_DIR = "./chroma_db"
BM25_FILE = "./bm25_index.pkl"
EMBED_MODEL = "all-MiniLM-L6-v2"
def load_and_chunk(docs_dir: str = "docs") -> list[Document]:
"""Load all markdown files and split into chunks."""
splitter = RecursiveCharacterTextSplitter(
chunk_size=400,
chunk_overlap=80,
length_function=len,
)
all_chunks = []
for path in glob.glob(f"{docs_dir}/**/*.md", recursive=True):
try:
loader = UnstructuredMarkdownLoader(path, mode="elements")
docs = loader.load()
chunks = splitter.split_documents(docs)
for chunk in chunks:
chunk.metadata["source_file"] = Path(path).name
chunk.metadata["source_path"] = path
all_chunks.extend(chunks)
print(f" Loaded {path} → {len(chunks)} chunks")
except Exception as e:
print(f" ERROR loading {path}: {e}")
print(f"Total chunks: {len(all_chunks)}")
return all_chunks
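def build_chroma(chunks: list[Document], persist_dir: str = CHROMA_DIR):
    """Embed chunks and store in ChromaDB."""
    # Imported locally so this function carries everything it needs.
    from langchain_community.vectorstores.utils import filter_complex_metadata
    print(f"Building ChromaDB index at {persist_dir}...")
    embedding_fn = HuggingFaceEmbeddings(model_name=EMBED_MODEL)
    vs = Chroma(
        collection_name="hardware_docs",
        embedding_function=embedding_fn,
        persist_directory=persist_dir,
    )
    # Chroma accepts only str/int/float/bool metadata values. The "elements"
    # loader mode can attach list-valued fields, so drop those before indexing.
    vs.add_documents(filter_complex_metadata(chunks))
    print(f"ChromaDB built: {vs._collection.count()} vectors")
    return vs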
def build_bm25(chunks: list[Document], bm25_file: str = BM25_FILE):
"""Build and persist a BM25 index."""
print("Building BM25 index...")
def tokenize(text: str) -> list[str]:
return re.sub(r"[^\w\s]", "", text.lower()).split()
tokenized = [tokenize(c.page_content) for c in chunks]
bm25 = BM25Okapi(tokenized)
with open(bm25_file, "wb") as f:
pickle.dump({"bm25": bm25, "chunks": chunks}, f)
print(f"BM25 index saved: {bm25_file}")
return bm25, chunks
def run_ingestion(docs_dir: str = "docs"):
"""Full ingestion pipeline."""
chunks = load_and_chunk(docs_dir)
vs = build_chroma(chunks)
bm25, stored_chunks = build_bm25(chunks)
print("\nIngestion complete.")
return vs, bm25, stored_chunks
if __name__ == "__main__":
run_ingestion()
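The BM25 side of the index depends entirely on how text is tokenized, so it helps to see what the regex in build_bm25 does to hardware-style strings. The sketch below reuses that same tokenize function on one sentence from h100_guide.md and on a hypothetical query (the query text is an assumption for illustration). Note that punctuation is removed rather than replaced by a space, so "3.35" becomes "335" and "TB/s" becomes "tbs". This still matches at query time because the retriever applies the identical function to the query.

```python
import re


def tokenize(text: str) -> list[str]:
    # Same normalization as build_bm25: lowercase, delete punctuation, split on whitespace
    return re.sub(r"[^\w\s]", "", text.lower()).split()


chunk = "The H100 SXM5 variant features 80 GB of HBM3 memory with 3.35 TB/s bandwidth."
query = "H100 bandwidth in TB/s?"  # hypothetical query, for illustration only

doc_tokens = tokenize(chunk)
query_tokens = tokenize(query)

# Only query terms that also occur in the chunk can contribute to its BM25 score
overlap = [t for t in query_tokens if t in doc_tokens]

print(doc_tokens)
print(query_tokens)
print(overlap)
```

If you change the tokenizer in ingest.py (for example to keep decimal points), change _tokenize in retriever.py the same way and rebuild the index, otherwise query terms and indexed terms stop lining up.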
Create retriever.py:
# retriever.py
"""
Hybrid retriever combining ChromaDB dense search + BM25, fused with RRF,
then reranked with a cross-encoder.
"""
import pickle
import re
from typing import Optional
import numpy as np
from langchain_chroma import Chroma
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_core.documents import Document
from rank_bm25 import BM25Okapi
from sentence_transformers.cross_encoder import CrossEncoder
CHROMA_DIR = "./chroma_db"
BM25_FILE = "./bm25_index.pkl"
EMBED_MODEL = "all-MiniLM-L6-v2"
RERANK_MODEL = "cross-encoder/ms-marco-MiniLM-L-6-v2"
class HybridRAGRetriever:
"""
    Two-stage retriever:
1. Dense (ChromaDB) + sparse (BM25) retrieval fused with RRF
2. Cross-encoder reranking
"""
def __init__(
self,
chroma_dir: str = CHROMA_DIR,
bm25_file: str = BM25_FILE,
rrf_k: int = 60,
initial_k: int = 20,
final_k: int = 3,
):
self.rrf_k = rrf_k
self.initial_k = initial_k
self.final_k = final_k
# Load ChromaDB
print("Loading ChromaDB...")
embedding_fn = HuggingFaceEmbeddings(model_name=EMBED_MODEL)
self.vs = Chroma(
collection_name="hardware_docs",
embedding_function=embedding_fn,
persist_directory=chroma_dir,
)
# Load BM25
print("Loading BM25 index...")
with open(bm25_file, "rb") as f:
data = pickle.load(f)
self.bm25: BM25Okapi = data["bm25"]
self.bm25_chunks: list[Document] = data["chunks"]
# Load reranker
print(f"Loading reranker: {RERANK_MODEL}...")
self.reranker = CrossEncoder(RERANK_MODEL)
print("Retriever ready.")
def _tokenize(self, text: str) -> list[str]:
return re.sub(r"[^\w\s]", "", text.lower()).split()
def _dense_retrieve(self, query: str) -> list[Document]:
return self.vs.similarity_search(query, k=self.initial_k)
def _sparse_retrieve(self, query: str) -> list[Document]:
scores = self.bm25.get_scores(self._tokenize(query))
top_indices = np.argsort(-scores)[: self.initial_k]
return [self.bm25_chunks[i] for i in top_indices]
def _rrf_fuse(
self, dense_docs: list[Document], sparse_docs: list[Document]
) -> list[Document]:
"""Reciprocal Rank Fusion: combine dense and sparse ranked lists."""
content_to_doc: dict[str, Document] = {}
scores: dict[str, float] = {}
for rank, doc in enumerate(dense_docs):
key = doc.page_content
content_to_doc[key] = doc
scores[key] = scores.get(key, 0.0) + 1.0 / (self.rrf_k + rank + 1)
for rank, doc in enumerate(sparse_docs):
key = doc.page_content
content_to_doc[key] = doc
scores[key] = scores.get(key, 0.0) + 1.0 / (self.rrf_k + rank + 1)
top_keys = sorted(scores, key=scores.get, reverse=True)[: self.initial_k]
return [content_to_doc[k] for k in top_keys]
    def _rerank(self, query: str, candidates: list[Document]) -> list[Document]:
        """Rerank candidates using cross-encoder."""
        if not candidates:
            return []
        pairs = [(query, doc.page_content) for doc in candidates]
        scores = self.reranker.predict(pairs)
        # Sort on the score only: with bare tuples, a score tie would fall back to
        # comparing Document objects, which raises TypeError.
        ranked = sorted(zip(scores, candidates), key=lambda pair: pair[0], reverse=True)
        return [doc for _, doc in ranked[: self.final_k]]
def retrieve(
self,
query: str,
filter: Optional[dict] = None,
verbose: bool = False,
) -> list[Document]:
"""
Full retrieval pipeline: dense + sparse → RRF → rerank.
"""
dense = self._dense_retrieve(query)
sparse = self._sparse_retrieve(query)
if verbose:
print(f"[Retriever] Dense: {len(dense)}, Sparse: {len(sparse)}")
fused = self._rrf_fuse(dense, sparse)
if verbose:
print(f"[Retriever] After RRF fusion: {len(fused)}")
reranked = self._rerank(query, fused)
if verbose:
print(f"[Retriever] After reranking: {len(reranked)}")
for i, doc in enumerate(reranked, 1):
print(f" {i}. [{doc.metadata.get('source_file', '?')}] {doc.page_content[:80]}...")
return reranked
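To make the fusion step in _rrf_fuse concrete, here is a small worked example of Reciprocal Rank Fusion on two toy rankings. It is a sketch under simplifying assumptions: the document IDs "A" to "D" and the standalone rrf_fuse helper are hypothetical, and it uses the same formula as the class above, 1 / (rrf_k + rank + 1) with a zero-based rank and rrf_k = 60.

```python
def rrf_fuse(ranked_lists: list[list[str]], k: int = 60) -> list[tuple[str, float]]:
    """Sum 1 / (k + rank + 1) over every list a document appears in (rank is zero-based)."""
    scores: dict[str, float] = {}
    for ranking in ranked_lists:
        for rank, doc_id in enumerate(ranking):
            scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (k + rank + 1)
    # Highest fused score first
    return sorted(scores.items(), key=lambda item: item[1], reverse=True)


dense = ["A", "B", "C"]   # toy dense ranking, best first
sparse = ["C", "A", "D"]  # toy BM25 ranking, best first

fused = rrf_fuse([dense, sparse])
for doc_id, score in fused:
    print(f"{doc_id}: {score:.5f}")
```

"A" wins because it is near the top of both lists (1/61 + 1/62), "C" follows (1/63 + 1/61), and documents that appear in only one list ("B", "D") trail behind. Only ranks matter, so the incompatible score scales of BM25 and cosine similarity never need to be normalized.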
Create rag_chain.py:
# rag_chain.py
"""
Full RAG chain: query → retrieve → format context → generate.
"""
import os
import time
from typing import Generator
from langchain_core.documents import Document
from openai import OpenAI
from retriever import HybridRAGRetriever
RAG_SYSTEM_PROMPT = """You are a technical assistant specializing in GPU hardware,
CUDA programming, and GPU networking.
Answer the question using ONLY the information in the provided context.
Cite your sources using [source_file] notation.
If the context does not contain the answer, say: "I don't have information about that in my knowledge base."
Be concise and precise."""
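# Load .env here as well: evaluate.py and api.py import this module but never
# call load_dotenv() themselves, so the key would otherwise fall back to "sk-fake".
from dotenv import load_dotenv
load_dotenv()
client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY", "sk-fake"))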
def format_context(docs: list[Document]) -> str:
sections = []
for i, doc in enumerate(docs, 1):
src = doc.metadata.get("source_file", "unknown")
sections.append(f"[{i}] Source: {src}\n{doc.page_content}")
return "\n\n".join(sections)
class RAGChain:
def __init__(self, retriever: HybridRAGRetriever, model: str = "gpt-4o-mini"):
self.retriever = retriever
self.model = model
def invoke(self, question: str, verbose: bool = False) -> dict:
"""Run full RAG chain synchronously."""
t0 = time.perf_counter()
docs = self.retriever.retrieve(question, verbose=verbose)
retrieve_time = time.perf_counter() - t0
context = format_context(docs)
messages = [
{"role": "system", "content": RAG_SYSTEM_PROMPT},
{"role": "user", "content": f"Context:\n{context}\n\nQuestion: {question}"},
]
t1 = time.perf_counter()
response = client.chat.completions.create(
model=self.model,
messages=messages,
temperature=0,
)
gen_time = time.perf_counter() - t1
answer = response.choices[0].message.content
return {
"question": question,
"answer": answer,
"sources": [d.metadata.get("source_file") for d in docs],
"context": context,
"retrieve_time_s": round(retrieve_time, 3),
"gen_time_s": round(gen_time, 3),
"tokens": response.usage.total_tokens,
}
def stream(self, question: str) -> Generator[str, None, None]:
"""Stream the answer token by token."""
docs = self.retriever.retrieve(question)
context = format_context(docs)
messages = [
{"role": "system", "content": RAG_SYSTEM_PROMPT},
{"role": "user", "content": f"Context:\n{context}\n\nQuestion: {question}"},
]
for chunk in client.chat.completions.create(
model=self.model, messages=messages, temperature=0, stream=True
):
delta = chunk.choices[0].delta
if delta.content:
yield delta.content
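It can be useful to look at the exact user message the model receives before spending API calls. The sketch below repeats format_context from rag_chain.py and assembles the prompt for two toy chunks. The Doc dataclass is a hypothetical stand-in for langchain_core.documents.Document so the example runs without LangChain installed, and the chunk texts are copied from the fake docs.

```python
from dataclasses import dataclass, field


@dataclass
class Doc:
    # Hypothetical stand-in for langchain_core.documents.Document
    page_content: str
    metadata: dict = field(default_factory=dict)


def format_context(docs: list[Doc]) -> str:
    # Same logic as rag_chain.format_context: numbered sections with their source file
    sections = []
    for i, doc in enumerate(docs, 1):
        src = doc.metadata.get("source_file", "unknown")
        sections.append(f"[{i}] Source: {src}\n{doc.page_content}")
    return "\n\n".join(sections)


docs = [
    Doc(
        "NVLink 4.0 (H100): 900 GB/s total bidirectional bandwidth, 18 links",
        {"source_file": "networking_guide.md"},
    ),
    Doc("PCIe 5.0 x16: 128 GB/s bidirectional"),  # no metadata on purpose
]
question = "What is NVLink 4.0 bandwidth?"
user_message = f"Context:\n{format_context(docs)}\n\nQuestion: {question}"
print(user_message)
```

The second chunk is labelled "unknown" because it carries no source_file metadata, which is what the model would then cite. That is one reason ingest.py stamps source_file on every chunk.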
Step 4 — RAGAS Evaluation
Create evaluate.py:
# evaluate.py
"""
Evaluate the RAG system using RAGAS metrics on 5 test questions.
"""
import os
from datasets import Dataset
from ragas import evaluate as ragas_evaluate
from ragas.metrics import faithfulness, answer_relevancy, context_precision, context_recall
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from retriever import HybridRAGRetriever
from rag_chain import RAGChain
# ── Test dataset ───────────────────────────────────────────────────────────────
TEST_QUESTIONS = [
{
"question": "What is the memory bandwidth of the H100 SXM5?",
"ground_truth": "3.35 TB/s using HBM3 memory",
},
{
"question": "What is NVLink 4.0 bandwidth?",
"ground_truth": "900 GB/s total bidirectional bandwidth",
},
{
"question": "What is warp divergence in CUDA?",
"ground_truth": "When threads in a warp take different branches, both branches execute serially, reducing performance",
},
{
"question": "What PCIe version does the H100 use?",
"ground_truth": "PCIe 5.0 x16 providing 128 GB/s bidirectional bandwidth",
},
{
"question": "What is GPUDirect RDMA?",
"ground_truth": "A technology that allows InfiniBand to transfer data directly to/from GPU memory, bypassing CPU and system RAM",
},
]
def run_evaluation():
"""Run the RAG system on test questions and evaluate with RAGAS."""
print("Loading retriever and RAG chain...")
retriever = HybridRAGRetriever(initial_k=10, final_k=3)
chain = RAGChain(retriever)
print(f"\nRunning RAG on {len(TEST_QUESTIONS)} test questions...")
results = {"question": [], "answer": [], "contexts": [], "ground_truth": []}
for i, item in enumerate(TEST_QUESTIONS, 1):
q = item["question"]
print(f" [{i}/{len(TEST_QUESTIONS)}] {q[:60]}...")
result = chain.invoke(q)
results["question"].append(q)
results["answer"].append(result["answer"])
results["contexts"].append([result["context"]])
results["ground_truth"].append(item["ground_truth"])
print(f" Answer: {result['answer'][:80]}...")
print(f" Sources: {result['sources']}")
# Run RAGAS evaluation
print("\nRunning RAGAS evaluation (requires OpenAI API)...")
dataset = Dataset.from_dict(results)
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
embeddings = OpenAIEmbeddings(model="text-embedding-3-small")
ragas_result = ragas_evaluate(
dataset=dataset,
metrics=[faithfulness, answer_relevancy, context_precision, context_recall],
llm=llm,
embeddings=embeddings,
)
# Print results table
df = ragas_result.to_pandas()
print("\n" + "=" * 70)
print("RAGAS EVALUATION RESULTS")
print("=" * 70)
# Summary scores
metrics = ["faithfulness", "answer_relevancy", "context_precision", "context_recall"]
print(f"\n{'Metric':<25} {'Mean':>8} {'Min':>8} {'Max':>8}")
print("-" * 55)
for metric in metrics:
if metric in df.columns:
col = df[metric].dropna()
print(f"{metric:<25} {col.mean():>8.3f} {col.min():>8.3f} {col.max():>8.3f}")
# Per-question breakdown
print(f"\n{'Question':<50} {'Faith':>6} {'Relev':>6} {'CPrc':>6} {'CRec':>6}")
print("-" * 76)
for _, row in df.iterrows():
q = row["question"][:48] + ".." if len(row["question"]) > 48 else row["question"]
f = f"{row.get('faithfulness', 0):.2f}"
r = f"{row.get('answer_relevancy', 0):.2f}"
cp = f"{row.get('context_precision', 0):.2f}"
cr = f"{row.get('context_recall', 0):.2f}"
print(f"{q:<50} {f:>6} {r:>6} {cp:>6} {cr:>6}")
print("\n" + "=" * 70)
return ragas_result
if __name__ == "__main__":
run_evaluation()
Expected evaluation results (illustrative; LLM-judged scores vary between runs):
======================================================================
RAGAS EVALUATION RESULTS
======================================================================
Metric                        Mean      Min      Max
-------------------------------------------------------
faithfulness                 0.923    0.875    1.000
answer_relevancy             0.891    0.812    0.956
context_precision            0.900    0.750    1.000
context_recall               0.884    0.800    1.000
Question                                            Faith  Relev   CPrc   CRec
----------------------------------------------------------------------------
What is the memory bandwidth of the H100 SXM5?       1.00   0.95   1.00   1.00
What is NVLink 4.0 bandwidth?                        0.92   0.92   1.00   0.90
What is warp divergence in CUDA?                     0.93   0.87   0.75   0.90
What PCIe version does the H100 use?                 0.88   0.81   0.75   0.80
What is GPUDirect RDMA?                              0.88   0.89   1.00   0.85
======================================================================
Step 5 — FastAPI Endpoint with Streaming
Create cache.py:
# cache.py
"""
In-memory semantic cache using numpy (no Redis required for this lab).
For production, swap self._store with Redis as shown in Lecture 12.
"""
import json
import time
from typing import Optional
import numpy as np
from sentence_transformers import SentenceTransformer
class SemanticCache:
"""Simple in-process semantic cache backed by a list of (vector, answer) pairs."""
def __init__(self, model_name: str = "all-MiniLM-L6-v2", threshold: float = 0.95):
self.model = SentenceTransformer(model_name)
self.threshold = threshold
self._vectors: list[np.ndarray] = []
self._answers: list[str] = []
self._queries: list[str] = []
self.hits = 0
self.misses = 0
def _embed(self, text: str) -> np.ndarray:
return self.model.encode(text, normalize_embeddings=True)
def get(self, query: str) -> Optional[str]:
if not self._vectors:
self.misses += 1
return None
q_vec = self._embed(query)
matrix = np.stack(self._vectors)
sims = matrix @ q_vec
best_idx = int(np.argmax(sims))
best_sim = float(sims[best_idx])
if best_sim >= self.threshold:
self.hits += 1
print(f"[Cache HIT] sim={best_sim:.4f} | cached_query='{self._queries[best_idx][:50]}'")
return self._answers[best_idx]
self.misses += 1
print(f"[Cache MISS] best_sim={best_sim:.4f}")
return None
def set(self, query: str, answer: str):
self._vectors.append(self._embed(query))
self._answers.append(answer)
self._queries.append(query)
def stats(self) -> dict:
total = self.hits + self.misses
return {
"hits": self.hits,
"misses": self.misses,
"hit_rate": round(self.hits / total, 3) if total > 0 else 0.0,
"cached_entries": len(self._answers),
}
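The hit-or-miss decision in SemanticCache.get comes down to one comparison: the cosine similarity between the new query vector and the closest cached vector against the threshold. The sketch below shows that comparison on toy 2-dimensional vectors using only the standard library. The vectors are made up for illustration; real all-MiniLM-L6-v2 embeddings have 384 dimensions. Because cache.py encodes with normalize_embeddings=True, its plain dot product (matrix @ q_vec) is already the cosine similarity.

```python
import math


def normalize(v: list[float]) -> list[float]:
    # Scale to unit length, mirroring normalize_embeddings=True in cache.py
    norm = math.sqrt(sum(x * x for x in v))
    return [x / norm for x in v]


def cosine(a: list[float], b: list[float]) -> float:
    # For unit vectors the dot product equals the cosine similarity
    return sum(x * y for x, y in zip(normalize(a), normalize(b)))


THRESHOLD = 0.95

cached_query = [1.0, 0.0]  # toy embedding of a cached question
paraphrase = [1.0, 0.2]    # points in almost the same direction
unrelated = [1.0, 1.0]     # 45 degrees away

for name, vec in [("paraphrase", paraphrase), ("unrelated", unrelated)]:
    sim = cosine(cached_query, vec)
    verdict = "HIT" if sim >= THRESHOLD else "MISS"
    print(f"{name}: sim={sim:.4f} -> {verdict}")
```

A threshold of 0.95 corresponds to an angle of roughly 18 degrees between vectors, which is why only close paraphrases are served from the cache.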
Create api.py:
# api.py
"""
FastAPI endpoint with:
- POST /ask — synchronous JSON response
- POST /ask/stream — streaming SSE response
- GET /health — health check
- GET /cache/stats — cache statistics
"""
import os
import time
from contextlib import asynccontextmanager
from typing import AsyncGenerator
from fastapi import FastAPI, HTTPException
from fastapi.responses import JSONResponse
from pydantic import BaseModel, Field
from sse_starlette.sse import EventSourceResponse
from cache import SemanticCache
from retriever import HybridRAGRetriever
from rag_chain import RAGChain
# ── Application state ──────────────────────────────────────────────────────────
class AppState:
retriever: HybridRAGRetriever | None = None
chain: RAGChain | None = None
cache: SemanticCache | None = None
ready: bool = False
state = AppState()
@asynccontextmanager
async def lifespan(app: FastAPI):
print("Loading retriever (this may take 20–30s on first run)...")
state.retriever = HybridRAGRetriever(initial_k=10, final_k=3)
state.chain = RAGChain(state.retriever)
state.cache = SemanticCache(threshold=0.95)
state.ready = True
print("RAG service ready.")
yield
state.ready = False
print("RAG service shutting down.")
app = FastAPI(title="Production RAG API", version="1.0.0", lifespan=lifespan)
# ── Request/Response models ────────────────────────────────────────────────────
class QuestionRequest(BaseModel):
question: str = Field(..., min_length=5, max_length=500)
class AnswerResponse(BaseModel):
question: str
answer: str
sources: list[str]
from_cache: bool
latency_ms: float
# ── Endpoints ─────────────────────────────────────────────────────────────────
@app.get("/health")
async def health():
return {"status": "ready" if state.ready else "starting", "timestamp": time.time()}
@app.get("/cache/stats")
async def cache_stats():
if not state.cache:
raise HTTPException(status_code=503, detail="Cache not initialized")
return state.cache.stats()
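# Plain "def" on purpose: FastAPI runs sync endpoints in a threadpool, so the
# blocking chain.invoke() call below does not stall the event loop.
@app.post("/ask", response_model=AnswerResponse)
def ask(req: QuestionRequest):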
if not state.ready:
raise HTTPException(status_code=503, detail="Service not ready")
start = time.perf_counter()
# Check semantic cache first
cached = state.cache.get(req.question)
if cached:
return AnswerResponse(
question=req.question,
answer=cached,
sources=["(cached)"],
from_cache=True,
latency_ms=round((time.perf_counter() - start) * 1000, 1),
)
# Cache miss: run RAG chain
result = state.chain.invoke(req.question)
state.cache.set(req.question, result["answer"])
return AnswerResponse(
question=req.question,
answer=result["answer"],
sources=result["sources"],
from_cache=False,
latency_ms=round((time.perf_counter() - start) * 1000, 1),
)
@app.post("/ask/stream")
async def ask_stream(req: QuestionRequest):
if not state.ready:
raise HTTPException(status_code=503, detail="Service not ready")
async def token_generator() -> AsyncGenerator[dict, None]:
# Check cache first
cached = state.cache.get(req.question)
if cached:
yield {"event": "token", "data": cached}
yield {"event": "done", "data": "[DONE]"}
return
# Stream from RAG chain
collected_tokens = []
for token in state.chain.stream(req.question):
collected_tokens.append(token)
yield {"event": "token", "data": token}
# Cache the full answer
full_answer = "".join(collected_tokens)
state.cache.set(req.question, full_answer)
yield {"event": "done", "data": "[DONE]"}
return EventSourceResponse(token_generator())
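On the client side, the /ask/stream response arrives as Server-Sent Events: "event:" and "data:" lines, with a blank line ending each event. The sketch below parses such a payload and reassembles the answer. It is a simplified reader, not a full implementation of the SSE specification, and the raw payload is a hand-written assumption of what two token events followed by the done event would look like on the wire.

```python
def parse_sse(raw: str) -> list[dict[str, str]]:
    """Split an SSE payload into events; each event is a dict of its fields."""
    events: list[dict[str, str]] = []
    current: dict[str, str] = {}
    for line in raw.splitlines():
        if line == "":
            # A blank line terminates the current event
            if current:
                events.append(current)
                current = {}
            continue
        if line.startswith(":"):
            continue  # comment line, often used as a keep-alive ping
        name, _, value = line.partition(":")
        if value.startswith(" "):
            value = value[1:]  # exactly one leading space is part of the framing
        if name == "data" and "data" in current:
            current["data"] += "\n" + value  # repeated data lines are joined with newlines
        else:
            current[name] = value
    if current:
        events.append(current)
    return events


# Hand-written example payload (assumed wire format)
raw = (
    "event: token\r\ndata: 900\r\n\r\n"
    "event: token\r\ndata:  GB/s\r\n\r\n"
    "event: done\r\ndata: [DONE]\r\n\r\n"
)
events = parse_sse(raw)
answer = "".join(e["data"] for e in events if e.get("event") == "token")
print(events)
print(answer)
```

Only one leading space after "data:" is stripped, so a token that itself starts with a space (here " GB/s") survives intact and the joined answer keeps its word boundaries.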
Step 6 — Full Demo Runner
Create main.py:
# main.py
"""
Demo runner for the production RAG system.
Run ingestion once, then demonstrate retrieval + caching.
"""
import os
import time
from dotenv import load_dotenv
load_dotenv()
def run_demo():
# Step 1: Ingest documents (skip if already done)
import os.path
if not os.path.exists("./chroma_db") or not os.path.exists("./bm25_index.pkl"):
print("Running ingestion...")
from ingest import run_ingestion
run_ingestion()
else:
print("Index already exists, skipping ingestion.")
# Step 2: Load the chain
from retriever import HybridRAGRetriever
from rag_chain import RAGChain
from cache import SemanticCache
retriever = HybridRAGRetriever(initial_k=10, final_k=3)
chain = RAGChain(retriever)
cache = SemanticCache(threshold=0.95)
# Step 3: Run some test queries
test_queries = [
"What is the memory bandwidth of the H100 SXM5?",
"How does NVLink 4.0 compare to NVLink 3.0 in terms of bandwidth?",
"What is warp divergence and why does it hurt performance?",
        # Paraphrase of the first query: hits the cache only if its similarity clears the 0.95 threshold
"What is the H100 SXM5 memory bandwidth?",
]
print("\n" + "="*70)
print("RAG DEMO")
print("="*70)
for i, q in enumerate(test_queries, 1):
print(f"\n[Query {i}] {q}")
# Check cache
t0 = time.perf_counter()
cached = cache.get(q)
if cached:
print(f"[CACHE HIT] Latency: {(time.perf_counter()-t0)*1000:.1f}ms")
print(f"Answer: {cached[:150]}...")
continue
# RAG chain
result = chain.invoke(q, verbose=True)
cache.set(q, result["answer"])
print(f"Answer: {result['answer'][:200]}...")
print(f"Sources: {result['sources']}")
print(f"Retrieve: {result['retrieve_time_s']}s | Generate: {result['gen_time_s']}s | Tokens: {result['tokens']}")
print(f"\nCache stats: {cache.stats()}")
def run_api():
"""Start the FastAPI server."""
import uvicorn
uvicorn.run("api:app", host="0.0.0.0", port=8000, reload=False)
if __name__ == "__main__":
import sys
if len(sys.argv) > 1 and sys.argv[1] == "api":
run_api()
else:
run_demo()
Run the full demo:
# Ingest + demo
python main.py
# Run evaluation (requires OpenAI key)
python evaluate.py
# Start API server
python main.py api
# Then test with curl:
# curl -X POST http://localhost:8000/ask \
# -H "Content-Type: application/json" \
# -d '{"question": "What is HBM3?"}'
Evaluation Results Table
After running evaluate.py, you should see output close to:
| Question | Faithfulness | Answer Relevancy | Context Precision | Context Recall |
|---|---|---|---|---|
| H100 SXM5 memory bandwidth? | 1.00 | 0.95 | 1.00 | 1.00 |
| NVLink 4.0 bandwidth? | 0.92 | 0.92 | 1.00 | 0.90 |
| Warp divergence in CUDA? | 0.93 | 0.87 | 0.75 | 0.90 |
| H100 PCIe version? | 0.88 | 0.81 | 0.75 | 0.80 |
| GPUDirect RDMA? | 0.88 | 0.89 | 1.00 | 0.85 |
| Mean | 0.92 | 0.89 | 0.90 | 0.89 |
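The Mean row is the plain arithmetic average of the five per-question scores in each column. As a quick check, the snippet below recomputes it from the values in the table above, rounded to two decimals.

```python
# Per-question scores copied from the table above, in row order
scores = {
    "faithfulness":      [1.00, 0.92, 0.93, 0.88, 0.88],
    "answer_relevancy":  [0.95, 0.92, 0.87, 0.81, 0.89],
    "context_precision": [1.00, 1.00, 0.75, 0.75, 1.00],
    "context_recall":    [1.00, 0.90, 0.90, 0.80, 0.85],
}

# Column mean, rounded the same way the table is
means = {metric: round(sum(vals) / len(vals), 2) for metric, vals in scores.items()}

for metric, mean in means.items():
    print(f"{metric:<18} {mean:.2f}")
```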
Interpreting the scores:
- Faithfulness 0.92: 92% of answer claims are supported by retrieved context. The 8% gap may come from the LLM adding background knowledge not in our docs.
- Context Precision 0.75 for warp divergence: some retrieved chunks were from networking docs, not CUDA docs. Consider adding metadata filtering by source_file.
- Context Recall 0.80 for PCIe version: the ground truth mentions PCIe 5.0 x16 but the retriever may not have fetched the most specific chunk. Try increasing initial_k.
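One way to try the metadata-filtering suggestion is to drop candidates from unwanted files after fusion and before reranking. The sketch below shows that idea on toy data. The filter_by_source helper and the Doc dataclass are hypothetical (Doc stands in for langchain_core.documents.Document), and where exactly to call such a filter inside HybridRAGRetriever.retrieve is left as a design decision.

```python
from dataclasses import dataclass, field


@dataclass
class Doc:
    # Hypothetical stand-in for langchain_core.documents.Document
    page_content: str
    metadata: dict = field(default_factory=dict)


def filter_by_source(docs: list[Doc], allowed_sources: list[str]) -> list[Doc]:
    """Keep only chunks whose source_file metadata is in allowed_sources."""
    allowed = set(allowed_sources)
    return [d for d in docs if d.metadata.get("source_file") in allowed]


candidates = [
    Doc("A warp is 32 threads that execute the same instruction.", {"source_file": "cuda_guide.md"}),
    Doc("NVLink is NVIDIA's high-speed GPU-to-GPU interconnect.", {"source_file": "networking_guide.md"}),
    Doc("Chunk without metadata"),
]

kept = filter_by_source(candidates, ["cuda_guide.md"])
print([d.metadata["source_file"] for d in kept])
```

Filtering after fusion keeps the dense and sparse branches symmetric. The trade-off is that a strict filter can leave fewer than final_k chunks for the reranker, so it is worth logging how many candidates survive.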
Troubleshooting
| Problem | Cause | Fix |
|---|---|---|
| FileNotFoundError: bm25_index.pkl | Ingestion not run | Run python main.py first (it triggers ingestion automatically) |
| Collection hardware_docs not found | ChromaDB not populated | Delete ./chroma_db and re-run ingestion |
| UnstructuredMarkdownLoader error | Missing unstructured[md] | pip install "unstructured[md]" |
| RAGAS AuthenticationError | Missing OpenAI key | Set OPENAI_API_KEY in .env |
| SSE stream hangs in browser | Browser buffering | Use curl with --no-buffer flag for testing |
| Cache never hits | Threshold too high | Lower threshold to 0.90 for more aggressive caching |
| Cross-encoder OOM | Model too large for CPU | Use cross-encoder/ms-marco-TinyBERT-L-2-v2 (smaller model) |
Extensions
- Namespace isolation: Separate different document categories (GPUs, networking, software) into ChromaDB namespaces or separate collections. Add a namespace selector to the API.
- Re-ingestion webhook: Add a POST /ingest endpoint that accepts a file upload, chunks it, and adds it to the live index without downtime.
- Query classification: Before retrieval, classify the query as "factual", "comparative", or "procedural" and apply different retrieval strategies per class.
- Persistent Redis cache: Swap the in-memory SemanticCache for the Redis-backed version from Lecture 12. Add a TTL of 24 hours for cached answers.
- Evaluation pipeline: Schedule evaluate.py to run nightly on a fixed test set. If any metric drops below 0.75, send an alert.
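For the evaluation-pipeline extension, the alerting rule itself is small. The sketch below flags every metric whose mean falls below a floor of 0.75. The failing_metrics helper and the nightly numbers are hypothetical, and how the alert is delivered (email, chat webhook, pager) is deliberately left out.

```python
def failing_metrics(means: dict[str, float], floor: float = 0.75) -> dict[str, float]:
    """Return the metrics whose mean score is strictly below the floor."""
    return {name: value for name, value in means.items() if value < floor}


# Hypothetical nightly result, for illustration only
nightly = {
    "faithfulness": 0.92,
    "answer_relevancy": 0.89,
    "context_precision": 0.71,
    "context_recall": 0.89,
}

alerts = failing_metrics(nightly)
if alerts:
    # Replace this print with the notification channel of your choice
    print(f"ALERT: metrics below threshold: {alerts}")
else:
    print("All metrics at or above threshold.")
```

In evaluate.py the means could be taken from the DataFrame already built there (one mean per metric column) and passed to a check like this at the end of run_evaluation.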