Build a RAG (Retrieval-Augmented Generation) chatbot step by step using Python (FastAPI) and Couchbase Vector Search.
You will start with a simple OpenAI chatbot and progressively add vector search, conversation history, and semantic caching — all without switching branches.
- Python 3.11+
- Node.js 18+ (for the frontend)
- An OpenAI API key
- A Couchbase Capella account (needed from Exercise 3 onwards)
# Clone this branch
git clone -b python-workshop https://github.com/couchbaselabs/ais-hol
cd ais-hol
# Install Python dependencies (once, covers all exercises)
cd backend
poetry env use 3.13
eval $(poetry env activate)
poetry install
# Copy and fill in environment variables
cp .env.example .env
# Edit .env — at minimum set OPENAI_API_KEY for Exercise 1
# Install frontend dependencies (once)
cd ../frontend
npm install

A chatbot that accepts a message and an optional system prompt, calls OpenAI, and returns a plain text response. Use the Simple Chat tab in the UI.
Edit backend/.env and set:
INFERENCE_MODEL_API_KEY=your_openai_api_key_here
INFERENCE_MODEL_BASE_URL=https://api.openai.com/v1
INFERENCE_MODEL=gpt-4o-mini
PORT=5000

Open backend/services/openai_service.py and find the generate_response function. Replace the placeholder with a real OpenAI call:
async def generate_response(message: str, system_prompt: str | None = None) -> str:
    client = _get_inference_client()
    default_prompt = "You are a helpful AI assistant. Be concise and friendly."
    final_prompt = system_prompt or default_prompt
    completion = await client.chat.completions.create(
        model=INFERENCE_MODEL,
        messages=[
            {"role": "system", "content": final_prompt},
            {"role": "user", "content": message},
        ],
        max_tokens=1000,
        temperature=0.7,
    )
    return completion.choices[0].message.content.strip()

# Terminal 1 — backend
cd backend
eval $(poetry env activate)
python main.py
# Terminal 2 — frontend
cd frontend
npm run dev

Open the app, select the Simple Chat tab, and send a message. You should get a real AI response.
API: POST /api/chat — { "message": "...", "systemPrompt": "..." } returns { "response": "...", "timestamp": "..." }
This can also be verified using curl:
curl 'localhost:5000/api/chat' \
  -H 'content-type: application/json' \
  --data-raw $'{"message":"Hello?","systemPrompt":"You are a helpful AI assistant. Please respond to the user\'s message in a friendly and helpful manner. Keep your responses concise but informative."}'

Before building the RAG app you need chunked documents stored in Couchbase and their vector embeddings generated. This exercise uses two steps:
- Import — use cbsh to chunk and import raw markdown into a collection named documentation (no embedding yet)
- Vectorize — use the Capella AI Services vectorization workflow to generate embeddings automatically inside the database

cbsh is pre-installed by the devcontainer postCreateCommand — no manual install needed. Run all cbsh commands from the repository root so that scripts/ paths resolve correctly.
- Sign up at cloud.couchbase.com/signup
- Create a cluster (Couchbase Server 8.0+, Search Service and Eventing Service enabled)
- Inside the cluster create:
  - Bucket: shared
  - Scope: public
  - Collection: documentation
- Go to Organization Settings → API Keys → Generate Key and copy the access key and secret
We will start by configuring your org and API key. yourOrgIdentifier can be whatever you want; it will be used later on to associate an API key with a cluster configuration.
- Create a folder named .cbsh in the folder where the Couchbase Shell executable will be run, or in your home directory (~/.cbsh/)
- Open/create ~/.cbsh/config and edit it with the following content (code ~/.cbsh/config if you are using a Codespace):
version = 1
[[capella-organization]]
identifier = "yourOrgIdentifier"
access-key = "yourAccessKey"
secret-key = "yourSecretKey"
default-project = "Trial - Project"
Note

- The value of the identifier key in this config file will also be used in step 2.3
Let's launch Couchbase Shell and explore its interactive command-line interface.
cbsh

You should see the Couchbase Shell prompt:
# MacOS/Linux
👤 🏠
>
# Windows PowerShell
>
You'll now tell Couchbase Shell how to connect to your cloud cluster by providing the connection string, username, and password you created earlier.
Select the Capella project you will be working on:
# List all your projects
projects

# Select a Project
projects | cb-env project $in.0.name

Here we use the first row with index 0; if you have multiple projects, the number refers to the row you want.
⚠ Understanding the following is important for the rest of the workshop, as we will manipulate JSON documents, which are all treated as dataframes in a Couchbase Shell context.

Couchbase Shell is based on nushell, where everything structured is managed as a dataframe and every command can be piped. Here projects returns the list of projects your API key gives you access to. You can type projects to display the list. It's piped into the next command, cb-env project, which requires a string argument (type cb-env project -h to see the details of the command). $in refers to whatever was piped into that command. As it's a list of records, $in.0.name gets the first element of the list, then the value of its name field.
Now that the Project has been selected, we can list available clusters by running the clusters command.
We can assign the name of our Free Tier cluster to a variable by running:
let cluster_name = clusters | $in.0.name

This variable will be accessible as $cluster_name until you exit Couchbase Shell.
The following command allows you to register the cluster:
Note
Please be sure that the --capella-organization parameter has the same value as the identifier key you defined in your config file in step 2.1
# Register your cluster
( clusters get $cluster_name | cb-env register $cluster_name $in."connection string"
--capella-organization "yourOrgIdentifier"
--project (projects | $in.0.name)
--default-bucket shared
--default-scope public
--default-collection documentation
--username cbsh
--password yourPassword123!
    --save )

cb-env cluster $cluster_name

Note
Replace:
yourPassword123! with the password you will create (yes, we are setting up the connection before creating the user). The password must contain an uppercase letter, a lowercase letter, a number, and a special character, and be at least 8 characters long.
With an active Project and Cluster, we can create the cluster user.
credentials create --read --write --username cbsh --password yourPassword123!

Note
As you have modified the cbsh configuration, you have to exit and reenter cbsh to take it into account.
Run cbsh from the repository root:
cbsh
cb-env cluster <your-cluster-identifier>
use scripts/couchbase.nu *
use scripts/importers.nu *
# Import raw chunks — no embedding step
import_markdown_no_embed scripts/content/files/en-us/glossary1/ "glossary" "a glossary of IT terms"
This reads all markdown files, chunks them, assigns a content hash as document ID, and upserts into the documentation collection. No OpenAI calls are made.
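The content-hash document ID keeps re-imports idempotent: the same chunk text always maps to the same key, so re-running the import upserts rather than duplicates. A sketch of the idea (the exact hash cbsh uses may differ):

```python
import hashlib

def chunk_doc_id(chunk: str) -> str:
    # Deterministic ID: identical chunk text always hashes to the same key,
    # so upserting the same content twice overwrites instead of duplicating.
    return hashlib.sha256(chunk.encode("utf-8")).hexdigest()
```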
Now use the Capella AI Services vectorization workflow to generate embeddings for all documents in documentation and create a vector search index automatically.
- In Capella, go to AI Services → Workflows → Create New Workflow
- Click Data from Capella
- Give the workflow a name and click Setup Workflow
- Under Data Source, select your cluster, then:
  - Bucket: shared
  - Scope: public
  - Collection: documentation
- Under Source Fields, click Map all source fields to a single vector field
  - Set the Vector Field name to vector
- Click Next
- Under Embedding Model, click Capella Model
- Select your available embedding model
- Add your API key ID and Token
- Click Next, verify the configuration, then click Run Workflow
The workflow generates a vector field on every document in documentation and creates a vector search index. Wait for the workflow status to show all documents processed before moving to Exercise 3.
See: Vectorize Structured Data from Capella
Add to backend/.env:
COUCHBASE_SEARCH_INDEX_NAME=<index-name-created-by-the-workflow>

The index name is shown in the Capella AI Services workflow detail page after the workflow completes.
query "CREATE PRIMARY INDEX ON `default`:`shared`.`public`.`documentation`"

The backend embeds the user query, searches Couchbase for the most relevant documents, injects them into the prompt, and streams the OpenAI response. Switch to the RAG Chat tab in the UI.
Add to backend/.env:
COUCHBASE_CONNECTION_STRING=couchbases://your-cluster-endpoint
COUCHBASE_USERNAME=your-username
COUCHBASE_PASSWORD=your-password
COUCHBASE_BUCKET_NAME=shared

In backend/services/openai_service.py:
async def get_embedding(text: str) -> list[float]:
    client = _get_embeddings_client()
    response = await client.embeddings.create(model=EMBEDDING_MODEL, input=text)
    return response.data[0].embedding

In backend/services/couchbase_service.py:
The Capella AI Services workflow creates a SQL++ GSI vector index (not an FTS index). Query it using ORDER BY ANN_DISTANCE() via SQL++, not scope.search().
async def get_relevant_documents(embedding: list[float], name: str | None = None) -> list[dict]:
    cluster = _get_cluster()
    bucket_name = os.environ["COUCHBASE_BUCKET_NAME"]
    index_name = os.environ["COUCHBASE_SEARCH_INDEX_NAME"]
    sql = f"""
        SELECT META(d).id AS id,
               d.filepath,
               d.content,
               ANN_DISTANCE(d.vector, $embedding, "L2") AS score
        FROM `{bucket_name}`.`{SCOPE_NAME}`.`documentation` AS d
        USE INDEX ({index_name} USING GSI)
        ORDER BY ANN_DISTANCE(d.vector, $embedding, "L2")
        LIMIT 4
    """
    result = cluster.query(sql, QueryOptions(named_parameters={"embedding": embedding}))
    documents = []
    for row in result.rows():
        documents.append({
            "id": row.get("id", ""),
            "filepath": row.get("filepath", ""),
            "content": row.get("content", ""),
            "score": row.get("score", 0.0),
        })
    return documents

In backend/services/openai_service.py:
async def stream_completion(prompt: str):
    client = _get_inference_client()
    stream = await client.chat.completions.create(
        model=INFERENCE_MODEL,
        messages=[
            {"role": "system", "content": "Return plain text, no markdown. Be informal and conversational."},
            {"role": "user", "content": prompt},
        ],
        stream=True,
    )
    async for chunk in stream:
        if not chunk.choices:
            continue
        token = chunk.choices[0].delta.content
        if token:
            yield token

In backend/main.py, replace the placeholder in the query function:
@app.post("/api/query")
async def query(body: QueryRequest):
    if not body.q or not body.q.strip():
        raise HTTPException(status_code=400, detail="Query is required.")
    embedding = await get_embedding(body.q)
    documents = await get_relevant_documents(embedding)
    document_list = "\n\n".join(
        f"Document {i+1}:\n ID: {doc['id']}\n Filepath: {doc['filepath']}\n Score: {doc['score']}\n Content: {doc['content']}"
        for i, doc in enumerate(documents)
    )
    prompt = (
        "You are a Web MDN Documentation expert.\n"
        "Answer the user query using the documents below.\n\n"
        f"{document_list}\n\n"
        f"User Query: {body.q}\n\n"
        "Reference document IDs and filepaths where relevant."
    )
    return StreamingResponse(stream_completion(prompt), media_type="text/plain; charset=utf-8")

Restart the backend, then click the RAG Chat tab. Ask something like "What is an array?" — the response will stream in and reference MDN documentation.
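The ranking relies on L2 (Euclidean) distance, where a smaller score means a closer, more similar vector — which is why the SQL++ query sorts ascending. A tiny pure-Python illustration:

```python
import math

def l2_distance(a: list[float], b: list[float]) -> float:
    """Euclidean (L2) distance — the metric ANN_DISTANCE uses in the query above."""
    return math.sqrt(sum((x - y) ** 2 for x, y in zip(a, b)))

query_vec = [1.0, 0.0]
near = [0.9, 0.1]   # almost the same direction and magnitude
far = [-1.0, 0.0]   # opposite direction

# The nearer vector yields the smaller distance, so ascending order = best first.
nearest = min([near, far], key=lambda v: l2_distance(query_vec, v))
```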
Every message is stored in Couchbase so the model can answer follow-up questions like "What did I just ask?".
In Couchbase Capella Query Workbench:
CREATE COLLECTION `shared`.`_default`.`conversations`;
CREATE INDEX idx_conversation_session
ON `shared`.`_default`.`conversations`(session_id, timestamp)
WHERE type = "chat_message";

Or with cbsh:
query 'CREATE COLLECTION `shared`.`_default`.`conversations`;'
query 'CREATE INDEX idx_conversation_session ON `shared`.`_default`.`conversations`(session_id, timestamp) WHERE type = "chat_message";'
In backend/services/conversation_service.py, implement all four functions:
add_message:
async def add_message(session_id: str, content: str, role: str) -> None:
    cluster = _get_cluster()
    collection = cluster.bucket(BUCKET_NAME).scope(SCOPE).collection(COLLECTION)
    doc = {
        "session_id": session_id,
        "role": role,
        "content": content,
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "type": "chat_message",
    }
    key = f"{session_id}_{int(datetime.now().timestamp() * 1000)}_{role}"
    collection.insert(key, doc)

get_conversation_history:
async def get_conversation_history(session_id: str, limit: int = 10) -> list[dict]:
    cluster = _get_cluster()
    sql = f"""
        SELECT content, `role`, timestamp
        FROM `{BUCKET_NAME}`.`{SCOPE}`.`{COLLECTION}`
        WHERE session_id = $session_id AND type = "chat_message"
        ORDER BY timestamp DESC LIMIT $limit
    """
    result = cluster.query(sql, QueryOptions(named_parameters={"session_id": session_id, "limit": limit}))
    messages = [{"role": r["role"], "content": r["content"], "timestamp": r["timestamp"]} for r in result.rows()]
    messages.reverse()
    return messages

format_conversation_history:
def format_conversation_history(messages: list[dict]) -> str:
    if not messages:
        return "No previous conversation history."
    return "\n".join(
        f"{'User' if m['role'] == 'user' else 'Assistant'}: {m['content']}"
        for m in messages
    )

clear_conversation_history:
async def clear_conversation_history(session_id: str) -> None:
    cluster = _get_cluster()
    sql = f"""
        DELETE FROM `{BUCKET_NAME}`.`{SCOPE}`.`{COLLECTION}`
        WHERE session_id = $session_id AND type = "chat_message"
    """
    cluster.query(sql, QueryOptions(named_parameters={"session_id": session_id}))

Replace the route body in main.py:
@app.post("/api/query")
async def query(body: QueryRequest):
    if not body.q or not body.q.strip():
        raise HTTPException(status_code=400, detail="Query is required.")
    session_id = body.session_id or "default-session"
    await add_message(session_id, body.q, "user")
    history = await get_conversation_history(session_id, limit=10)
    formatted_history = format_conversation_history(history)
    embedding = await get_embedding(body.q)
    documents = await get_relevant_documents(embedding)
    document_list = "\n\n".join(
        f"Document {i+1}:\n ID: {doc['id']}\n Filepath: {doc['filepath']}\n Score: {doc['score']}\n Content: {doc['content']}"
        for i, doc in enumerate(documents)
    )
    prompt = (
        "You are a Web MDN Documentation expert with access to conversation history.\n\n"
        f"CONVERSATION HISTORY:\n{formatted_history}\n\n"
        f"RELEVANT DOCUMENTS:\n{document_list}\n\n"
        f"CURRENT QUERY: {body.q}\n\n"
        "Answer using the documents and history. Reference document IDs and filepaths where relevant."
    )

    async def generate_and_store():
        full_response = ""
        async for token in stream_completion(prompt):
            full_response += token
            yield token
        await add_message(session_id, full_response, "assistant")

    return StreamingResponse(generate_and_store(), media_type="text/plain; charset=utf-8")

In main.py, replace the placeholder bodies:
@app.get("/api/conversation/history")
async def get_history(session_id: str, limit: int = 10):
    messages = await get_conversation_history(session_id, limit)
    return {"session_id": session_id, "messages": messages, "count": len(messages)}

@app.delete("/api/conversation/clear")
async def clear_history(body: ClearRequest):
    await clear_conversation_history(body.session_id)
    return {"success": True}

Now you can try the conversation feature in the RAG chat. For instance, state your name in a first message, then ask "What is my name?".
Instead of passing raw message history to the prompt, use Couchbase Capella's built-in
ai_summary SQL++ function to compress it. The summarization runs inside the database
— no extra API call from the backend is needed.
Enable the Summarization AI Function on your Capella cluster:
- In Capella, go to AI Services → AI Functions
- Click Enable AI Functions
- Select Summarization and click Next
- Choose your LLM model (OpenAI, Bedrock, or Capella Model Service) and configure credentials
- Select your operational cluster and click Complete Setup
- Wait for the status to show Healthy before proceeding
See: Capella AI Functions — Summarization
In backend/services/conversation_service.py:
async def summarize_conversation(session_id: str, max_words: int = 150) -> str:
    cluster = _get_cluster()
    history = await get_conversation_history(session_id)
    if len(history) < 2:
        return "No conversation to summarize."
    text = "\n".join(
        f"{'User' if m['role'] == 'user' else 'Assistant'}: {m['content']}"
        for m in history
    )
    sql = """
        SELECT default:ai_summary({
            "text": $text,
            "max_words": $max_words,
            "temperature": 0.3
        }) AS summary
    """
    result = cluster.query(
        sql,
        QueryOptions(named_parameters={"text": text, "max_words": max_words})
    )
    rows = list(result.rows())
    return rows[0]["summary"][0]["response"]

In main.py, replace formatted_history in the prompt with the summary:
# Replace this:
history = await get_conversation_history(session_id, limit=10)
formatted_history = format_conversation_history(history)

# With this:
formatted_history = await summarize_conversation(session_id)

The prompt stays compact regardless of how long the conversation grows.
Restart the backend and try in the RAG Chat tab:
- Ask: "What is the JavaScript Array.map() method?"
- Ask: "What was my previous question?"
- Ask: "Can you explain that in simpler terms?"
Check the Capella Query Workbench to see the ai_summary function being called.
Semantically similar queries are served from cache without calling OpenAI, reducing latency and cost.
In Couchbase Capella:
- Create a new bucket named semantic_cache
- Inside it, create a collection named semantic in the _default scope
- Create a primary index on this collection
- Create a SQL++ vector index on the collection. In the Capella Query tab run:
CREATE VECTOR INDEX `semantic_cache_vector_idx`
ON `semantic_cache`.`_default`.`semantic`(`vector` VECTOR)
WITH {
"dimension": 2048,
"similarity": "L2",
"description": "IVF,SQ8"
}

Note
- Adjust "dimension" to match your embedding model output size (2048 for nvidia/llama-3.2-nv-embedqa-1b-v2, 1536 for text-embedding-3-small).
- You will most likely see an error due to the absence of existing documents, but the index will still be created.
Or with cbsh:
buckets create semantic_cache 200
collections create --bucket semantic_cache --scope _default semantic
query 'CREATE PRIMARY INDEX ON `default`:`semantic_cache`.`_default`.`semantic`'
query 'CREATE VECTOR INDEX `semantic_cache_vector_idx` ON `semantic_cache`.`_default`.`semantic`(`vector` VECTOR) WITH { "dimension": 2048, "similarity": "L2", "description": "IVF,SQ8"}'
Add to backend/.env:

CACHE_INDEX=semantic_cache_vector_idx

In backend/services/semantic_cache_service.py:
cache_get:
async def cache_get(prompt, embedding, llm_signature, similarity_threshold=0.85, k=3):
    cluster = _get_cluster()
    try:
        sql = f"""
            SELECT META(c).id AS id,
                   c.llm_signature,
                   c.response,
                   ANN_DISTANCE(c.vector, $embedding, "L2") AS score
            FROM `{CACHE_BUCKET}`.`{CACHE_SCOPE}`.`{CACHE_COLLECTION}` AS c
            USE INDEX (`semantic_cache_vector_idx` USING GSI)
            ORDER BY ANN_DISTANCE(c.vector, $embedding, "L2")
            LIMIT {k}
        """
        result = cluster.query(
            sql, QueryOptions(named_parameters={"embedding": embedding})
        )
        for row in result.rows():
            # ANN_DISTANCE with L2: lower = more similar, so skip if score is too high
            if row.get("score", 1.0) > similarity_threshold:
                continue
            if row.get("llm_signature") == llm_signature:
                print(f"Cache HIT (score={row.get('score', 0.0):.3f})")
                return row["response"]
    except Exception as e:
        print(f"Cache lookup error: {e}")
    return None

cache_put:
async def cache_put(prompt, embedding, llm_signature, response, ttl_minutes=1440):
    import uuid
    from couchbase.options import UpsertOptions

    cluster = _get_cluster()
    collection = (
        cluster.bucket(CACHE_BUCKET).scope(CACHE_SCOPE).collection(CACHE_COLLECTION)
    )
    doc = {
        "prompt": prompt,
        "response": response,
        "llm_signature": llm_signature,
        "vector": embedding,
    }
    collection.upsert(
        str(uuid.uuid4()),
        doc,
        UpsertOptions(expiry=timedelta(minutes=ttl_minutes)),
    )

Wrap the RAG pipeline with cache check/store in main.py:
@app.post("/api/query")
async def query(body: QueryRequest):
    if not body.q or not body.q.strip():
        raise HTTPException(status_code=400, detail="Query is required.")
    session_id = body.session_id or "default-session"
    llm_sig = create_llm_signature(INFERENCE_MODEL, 0.7, 1000, "MDN expert")
    embedding = await get_embedding(body.q)
    cached = await cache_get(body.q, embedding, llm_sig)
    if cached:
        async def from_cache():
            yield cached
        return StreamingResponse(from_cache(), media_type="text/plain; charset=utf-8")
    await add_message(session_id, body.q, "user")
    history = await get_conversation_history(session_id, limit=10)
    formatted_history = format_conversation_history(history)
    documents = await get_relevant_documents(embedding)
    document_list = "\n\n".join(
        f"Document {i+1}:\n ID: {doc['id']}\n Filepath: {doc['filepath']}\n Score: {doc['score']}\n Content: {doc['content']}"
        for i, doc in enumerate(documents)
    )
    prompt = (
        "You are a Web MDN Documentation expert with access to conversation history.\n\n"
        f"CONVERSATION HISTORY:\n{formatted_history}\n\n"
        f"RELEVANT DOCUMENTS:\n{document_list}\n\n"
        f"CURRENT QUERY: {body.q}\n\n"
        "Answer using the documents and history. Reference document IDs and filepaths where relevant."
    )

    async def generate_and_store():
        full_response = ""
        async for token in stream_completion(prompt):
            full_response += token
            yield token
        await add_message(session_id, full_response, "assistant")
        await cache_put(body.q, embedding, llm_sig, full_response)

    return StreamingResponse(generate_and_store(), media_type="text/plain; charset=utf-8")

Restart the backend and send the same query twice. The second response should be instant. Check the terminal for Cache HIT log messages.
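create_llm_signature is provided by the backend and isn't shown in this exercise; the idea is to fingerprint the generation settings so a cache entry is only reused under the same model configuration. A hypothetical sketch:

```python
import hashlib

def create_llm_signature(model: str, temperature: float, max_tokens: int, persona: str) -> str:
    """Hypothetical sketch: hash the generation settings into a stable fingerprint.

    Two requests only share cache entries when the model, sampling settings,
    and persona all match — changing any one of them changes the signature.
    """
    raw = f"{model}|{temperature}|{max_tokens}|{persona}"
    return hashlib.sha256(raw.encode("utf-8")).hexdigest()
```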
# Backend
cd backend
eval $(poetry env activate)
python main.py
# Frontend (separate terminal)
cd frontend
npm run dev

The app runs at http://localhost:3000.
- Simple Chat tab — Exercise 1 chatbot
- RAG Chat tab — Exercises 3–5 RAG application
- Agent Chat tab — Exercises 6–7 multi-agent system
A multi-agent system using LangGraph and the Couchbase Agent Catalog. A router agent classifies each user message and either answers directly or hands off to a math agent equipped with calculation tools. The Agent Catalog manages tool discovery and versioning.
User message
│
[router] ──── direct answer ────▶ response
│
└── math question ──▶ [math_agent] ──▶ response
Add to backend/.env:
AGENT_CATALOG_CONN_STRING=couchbases://your-cluster-endpoint
AGENT_CATALOG_USERNAME=your-username
AGENT_CATALOG_PASSWORD=your-password
AGENT_CATALOG_BUCKET=shared

Initialise the catalog. Run from the repository root (where .git lives) so agentc can install its post-commit hook:
cd /path/to/ais-hol
PYTHONPATH=/workspaces/ais-hol/backend/ agentc init

Open backend/agents/math_tools.py. The file defines five functions decorated with @agentc_tool (imported from agentc_core.tool):
from agentc_core.tool import tool as agentc_tool

@agentc_tool
def add(a: float, b: float) -> float:
    """Add two numbers and return the result."""
    return a + b

@agentc_tool
def evaluate_expression(expression: str) -> float:
    """Evaluate a mathematical expression string (e.g. 'sqrt(144) + 10')."""
    return _safe_eval(expression)

The evaluate_expression tool uses a whitelist-based safe eval — only names from Python's math module are permitted.
The agent's system prompt and tool list are declared in backend/agents/prompts/math_agent.yaml:
record_kind: prompt
name: math_agent
description: System prompt and tools for the math agent.
content:
  agent_instructions: >
    You are a precise math assistant. Use the available tools to evaluate
    the user's calculation request. Always use a tool — do not compute
    answers in your head.
  tools:
    - name: add
    - name: subtract
    - name: multiply
    - name: divide
    - name: evaluate_expression

agentc index resolves the tools list at index time. At runtime, catalog.find("prompt", name="math_agent") returns the prompt with tool functions already attached — no manual catalog.find("tool", ...) calls needed.
Run from the **root directory** after exporting env vars:
export $(grep -v '^#' backend/.env | grep -v '^$' | xargs)
PYTHONPATH=/workspaces/ais-hol/backend/ agentc index ./backend/agents/prompts/
PYTHONPATH=/workspaces/ais-hol/backend/ agentc index ./backend/agents/
PYTHONPATH=/workspaces/ais-hol/backend/ agentc publish

Setting PYTHONPATH is required so that from agents.state import AgentState resolves when agentc imports the tool files. Both tools and prompts are indexed and published in one pass.
Important: publish requires a clean git working tree — commit any changes before running it. Re-run index then publish every time you modify a tool or prompt file. This can be quickly achieved with something like:
git checkout -b testbranch    # optionally switch to a new branch
git add .                     # stage everything in the repo
git commit -m "your commit message"

Open backend/agents/router_agent.py. The router uses an LLM with structured output to classify the message:
class RouterDecision(BaseModel):
    route: Literal["direct", "math", "faq"]
    answer: str | None = None
    missing_topic: str | None = None

- "direct" → router answers immediately
- "math" → Command(goto="math_agent")
- "faq" → FAQ catalog lookup (Exercise 7)
Open backend/agents/math_agent.py. It extends agentc_langgraph.ReActAgent, which fetches the math_agent prompt (and its attached tools) from the catalog and wraps each invocation in an agentc Span for activity logging:
class MathAgent(agentc_langgraph.agent.ReActAgent):
    def __init__(self, catalog: agentc.Catalog, span: agentc.Span):
        super().__init__(
            chat_model=_get_llm(),
            catalog=catalog,
            span=span,
            prompt_name="math_agent",  # resolves prompts/math_agent.yaml
        )

    async def _ainvoke(self, span, state, config):
        agent = self.create_react_agent(span)  # attaches ToolNode + Callback
        result = await agent.ainvoke({"messages": [("user", state["message"])], ...})
        return Command(goto="__end__", update={"answer": result["messages"][-1].content})

create_react_agent(span) wraps the tool node with agentc_langgraph.ToolNode (logs tool results) and attaches a Callback to the chat model (logs completions and tool calls).
Open backend/agents/graph.py. The graph is wrapped in agentc_langgraph.GraphRunnable, which creates a root Span and encloses every invocation in it. catalog and span are injected into the math and FAQ nodes via functools.partial:
class AgentGraph(agentc_langgraph.graph.GraphRunnable):
    async def acompile(self):
        builder = StateGraph(AgentState)
        builder.add_node("router", router_node)
        builder.add_node("math_agent",
            functools.partial(math_agent_node, catalog=self.catalog, span=self.span))
        builder.set_entry_point("router")
        return builder.compile()

agent_graph = AgentGraph(catalog=agentc.Catalog())

In backend/main.py the route is already wired. Note that previous_node must be initialised to None in the input state — it is used internally by the agentc span logging:
@app.post("/api/agent")
async def agent(body: AgentRequest):
    result = await agent_graph.ainvoke({"message": body.message, "previous_node": None})
    return {
        "response": result.get("answer", ""),
        "routed_to": result.get("routed_to", "router"),
        ...
    }

Restart the backend and open the Agent Chat tab.
- Ask "What is 2 + 2?" → routed to Math Agent, badge shows MATH AGENT
- Ask "What is the capital of France?" → answered directly, badge shows ROUTER
- Ask "sqrt(144) + 10" → routed to Math Agent, returns 22.0
API: POST /api/agent — { "message": "..." } returns { "response": "...", "routed_to": "...", "faq_collection": null, "missing_topic": null, "timestamp": "..." }
Extend the multi-agent graph with a FAQ search agent. Multiple FAQ PDFs are each ingested into their own Couchbase collection. The router embeds the user question, compares it against FAQ metadata stored in a faq_catalog collection, and routes to the FAQ search agent when a match is found. When no FAQ covers the topic, the router returns an informative message.
User message
│
[router] ──── direct answer ──────────────────────▶ response
│
├── math question ──▶ [math_agent] ─────────────▶ response
│
├── FAQ match found ──▶ [faq_search_agent] ──────▶ response (badge: FAQ Search · hr_policy)
│
└── no FAQ match ──── informative message ───────▶ response (badge: No FAQ found · topic)
Create an S3 bucket (or use an existing one). Upload one or more FAQ PDFs, each representing a distinct topic. Choose a short snake_case name for each (e.g. hr_policy, product_manual) — this will become the Couchbase collection name.
For each FAQ PDF:
- In Capella, go to AI Services → Workflows → Create New Workflow
- Click Data from S3
- Give the workflow a name and click Start Workflow
- Under Data Source, configure:
  - S3 Bucket URL: your S3 bucket URL (e.g. s3://my-bucket/hr_policy.pdf)
  - AWS Access Key ID and Secret Access Key
- Under Target, select your cluster, then:
  - Bucket: shared
  - Scope: public
  - Collection: your chosen name (e.g. hr_policy)
- Under Embedding Model, click External Model
  - Select text-embedding-3-small from the OpenAI model list
  - Add your OpenAI API key
- Click Next, verify, then click Run Workflow
Wait for the workflow to complete. Each document in the collection will have content and vector fields.
Rename the vector index after the workflow completes. The Capella workflow creates a vector index with an auto-generated name. Rename it to shared.public.<collection_name>_vector_idx (e.g. shared.public.hr_policy_vector_idx) so the hybrid_faq_search tool can find it. You can rename it in the Capella Search UI or via cbsh:

search index update shared.public.<auto-generated-name> --new-name shared.public.hr_policy_vector_idx
Run cbsh from the repository root, then create a Full-Text Search index on the new collection.
Index naming is required. The hybrid_faq_search tool looks up indexes by the convention shared.public.<collection_name>_fts_idx. Use exactly this pattern — replace hr_policy with your collection name.
cb-env cluster <your-cluster-identifier>
# Replace hr_policy with your collection name
search index create shared.public.hr_policy_fts_idx \
--type fulltext-index \
--source-name shared \
--source-type couchbase \
--params '{
"mapping": {
"default_mapping": {
"enabled": true,
"dynamic": false,
"properties": {
"content": { "enabled": true, "dynamic": false,
"fields": [{ "name": "content", "type": "text", "analyzer": "standard", "index": true }]
}
}
},
"default_type": "_default",
"default_analyzer": "standard"
},
"store": { "indexType": "scorch" }
}'
Repeat for each FAQ collection, changing the index name and collection accordingly.
The faq_catalog collection stores metadata embeddings used by the router to match questions to FAQs. Create its vector index.
Index naming is required. The FAQ catalog service looks up this index by the fixed name shared.public.faq_catalog_idx. Do not change it.
search index create shared.public.faq_catalog_idx \
--type fulltext-index \
--source-name shared \
--source-type couchbase \
--params '{
"mapping": {
"default_mapping": {
"enabled": true,
"dynamic": false,
"properties": {
"vector": { "enabled": true, "dynamic": false,
"fields": [{ "name": "vector", "type": "vector",
"dims": 1536, "similarity": "dot_product" }]
}
}
}
},
"store": { "indexType": "scorch" }
}'
After ingestion, register the FAQ so the router can discover it. Run this once per FAQ from a Python shell inside backend/:
import asyncio
from services.faq_catalog_service import register_faq

asyncio.run(register_faq(
    collection_name="hr_policy",
    display_name="HR Policy FAQ",
    description="Answers to common HR questions about leave, benefits, conduct, and payroll.",
))

This upserts a metadata document with an embedding of the description into the faq_catalog collection.
Open backend/services/faq_catalog_service.py. The key function is find_best_faq:
async def find_best_faq(question_embedding: list[float]) -> dict | None:
    # Runs a VectorQuery against faq_catalog_idx
    # Returns the top FAQ metadata doc if score >= FAQ_SIMILARITY_THRESHOLD
    # Returns None otherwise

The threshold is controlled by FAQ_SIMILARITY_THRESHOLD in .env (default 0.75).
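The selection logic amounts to: score every registered FAQ against the question embedding, take the best, and reject it below the threshold. A pure-Python sketch using dot-product similarity (the metric faq_catalog_idx is configured with; the real service runs a VectorQuery instead of scoring in Python):

```python
def pick_best_faq(question_vec, faqs, threshold=0.75):
    """Sketch of the catalog match: faqs is a list of (metadata, embedding) pairs."""
    def dot(a, b):
        return sum(x * y for x, y in zip(a, b))

    best, best_score = None, float("-inf")
    for meta, vec in faqs:
        score = dot(question_vec, vec)
        if score > best_score:
            best, best_score = meta, score
    # Below the similarity threshold, the router reports "no FAQ found" instead.
    return best if best_score >= threshold else None
```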
Open backend/agents/faq_search_tools.py. The hybrid_faq_search tool runs both a vector search and an FTS search against the target collection, then merges and deduplicates results by document ID:
from agentc_core.tool import tool as agentc_tool

@agentc_tool
def hybrid_faq_search(query: str, collection_name: str) -> list[dict]:
    """Search a FAQ collection using both vector similarity and full-text search."""
    ...

The tool looks up indexes by a fixed naming convention — your index names in cbsh must match exactly:
| Index type | Expected name |
|---|---|
| Vector | shared.public.<collection_name>_vector_idx |
| FTS | shared.public.<collection_name>_fts_idx |
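The merge-and-deduplicate step in hybrid_faq_search can be sketched as follows (hypothetical hit shapes — the real tool works with SDK result objects): hits from both searches are keyed by document ID, keeping the higher score on collision:

```python
def merge_results(vector_hits: list[dict], fts_hits: list[dict]) -> list[dict]:
    """Merge two hit lists, deduplicating by id and keeping the better score."""
    merged: dict[str, dict] = {}
    for hit in vector_hits + fts_hits:
        current = merged.get(hit["id"])
        if current is None or hit["score"] > current["score"]:
            merged[hit["id"]] = hit
    # Best results first
    return sorted(merged.values(), key=lambda h: h["score"], reverse=True)
```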
The agent's system prompt is declared in backend/agents/prompts/faq_search_agent.yaml:
record_kind: prompt
name: faq_search_agent
description: System prompt and tools for the FAQ search agent.
content:
  agent_instructions: >
    You are a helpful assistant that answers questions using FAQ documentation.
    Use the hybrid_faq_search tool to find relevant content, then synthesise a
    clear, accurate answer based only on what the documents say.
  tools:
    - name: hybrid_faq_search

After implementing, commit your changes, then re-index and publish:
cd backend
export $(grep -v '^#' .env | grep -v '^$' | xargs)
AGENT_CATALOG_CONN_ROOT_CERTIFICATE=/path/to/ais-hol/backend/certificate \
PYTHONPATH=. \
.venv/bin/agentc --no-config index ./agents/
AGENT_CATALOG_CONN_ROOT_CERTIFICATE=/path/to/ais-hol/backend/certificate \
.venv/bin/agentc --no-config publish --bucket shared

Open backend/agents/faq_search_agent.py. It extends agentc_langgraph.ReActAgent and binds collection_name into the tool before the ReAct loop runs — the LLM only needs to supply the query argument:
class FaqSearchAgent(agentc_langgraph.agent.ReActAgent):
    def __init__(self, catalog, span, collection_name):
        super().__init__(chat_model=_get_llm(), catalog=catalog, span=span,
                         prompt_name="faq_search_agent")
        # Pre-fill collection_name so the LLM only sees query
        self.tools = [self._bind_collection(t, collection_name) for t in self.tools]

Open backend/agents/router_agent.py. The router now:
- Embeds the user question with get_embedding()
- Calls find_best_faq(embedding) to check the catalog
- If a match is found → Command(goto="faq_search_agent") with faq_collection in state
- If no match → returns an informative message with missing_topic set
Open backend/agents/graph.py — add the faq_search_agent node with catalog and span injected via functools.partial:
builder.add_node("faq_search_agent",
    functools.partial(faq_search_agent_node, catalog=self.catalog, span=self.span))

Ingest and register two different FAQ PDFs (e.g. hr_policy and product_manual), then restart the backend and test in the Agent Chat tab:
- Ask "How many days of annual leave do I get?" → badge: FAQ SEARCH · hr policy
- Ask "How do I reset my product license?" → badge: FAQ SEARCH · product manual
- Ask "What is the refund policy?" (no matching FAQ) → badge: NO FAQ FOUND · refund policy
- Ask "What is 15 * 7?" → badge: MATH AGENT (Exercise 6 still works)