-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathpayments_agent_api.py
More file actions
211 lines (174 loc) · 7.76 KB
/
payments_agent_api.py
File metadata and controls
211 lines (174 loc) · 7.76 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
# payments_agent_api.py
#
# FastAPI wrapper around the NL-to-SQL pipeline.
# Exposes three endpoints:
# POST /ask -> the main one, takes a question and returns SQL + rows + summary
# GET /health -> just returns ok, useful for n8n preflight checks
# GET /schema -> dumps the DB schema as text, good for debugging the LLM prompts
#
# Run with:
# python -m uvicorn payments_agent_api:app --reload
#
# Then open http://localhost:8000/docs to try it interactively (Swagger UI).
#
# ---- LESSONS LEARNED ----
# 1. Pydantic v2 changed how Field examples work — you now pass a list to "examples"
# instead of a single value in "example". Burned 20 min on that.
# 2. CORS middleware needs to go before any route definitions or it doesn't always apply.
# not 100% sure why, just moved it to the top and it worked.
# 3. The timing middleware was fun to add — shows up as X-Process-Time in response headers.
# Useful for spotting when the LLM is slow.
# 4. Figured out that HTTPException with status_code=422 is the right code for
# "your question produced unsafe SQL" — it's a client error, not a server error.
# 5. FastAPI serializes tuples as lists in JSON automatically, which is what we want.
# didn't need to do anything special for the rows field.
# -------------------------
import logging
import time
from typing import Any
from fastapi import FastAPI, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from pydantic import BaseModel, Field
# pull in the pipeline from the agent module
from payments_nl_sql_agent import answer_question, get_schema_description
# Root logging setup: INFO and above, with a clock-only timestamp on each line.
_LOG_FORMAT = "%(asctime)s [%(levelname)s] %(message)s"
_LOG_DATEFMT = "%H:%M:%S"
logging.basicConfig(level=logging.INFO, format=_LOG_FORMAT, datefmt=_LOG_DATEFMT)

# Module-level logger shared by the endpoints and handlers in this file.
log = logging.getLogger(__name__)
# ---- app setup ---------------------------------------------------------------
# Text shown at the top of the generated API docs.
_API_DESCRIPTION = (
    "Ask questions about B2B LatAm payment data in plain English or Spanish. "
    "Get back the SQL, raw rows, and a business summary. "
    "LLM calls are currently stubbed — see payments_nl_sql_agent.py to add Claude."
)

# The application object that uvicorn serves (payments_agent_api:app).
app = FastAPI(
    title="B2B Payments AI Assistant",
    description=_API_DESCRIPTION,
    version="1.0.0",
    docs_url="/docs",  # Swagger UI
    redoc_url="/redoc",
)
# CORS is wide open for now: any origin, any method, any header.
# FIXME: replace the wildcard origin with a real allowlist before going live.
# NOTE: credentials are deliberately disabled. Browsers reject credentialed
#       cross-origin requests when the allowed origin is "*", so
#       allow_credentials=True together with the wildcard is an invalid combo.
#       Keep it False until a real origin allowlist exists.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
    allow_credentials=False,
)
# ---- middleware --------------------------------------------------------------
@app.middleware("http")
async def add_timing_header(request: Request, call_next):
    # Stamps every response with an X-Process-Time header (seconds, three
    # decimals, "s" suffix) so slow requests are easy to spot, e.g. when the
    # real LLM API is running instead of the stub.
    started = time.perf_counter()
    response = await call_next(request)
    # Measured after the downstream handler has produced the response.
    response.headers["X-Process-Time"] = f"{time.perf_counter() - started:.3f}s"
    return response
# ---- request/response models -------------------------------------------------
class QuestionRequest(BaseModel):
    # Request body for POST /ask.
    #
    # Leading/trailing whitespace is stripped from string fields before the
    # length limits are checked, so a padded or whitespace-only question can
    # no longer satisfy min_length and reach the pipeline.
    model_config = {"str_strip_whitespace": True}

    # min_length=5 rejects empty strings and throwaway input such as "hi";
    # max_length=500 caps the question length.
    question: str = Field(
        ...,
        min_length=5,
        max_length=500,
        description="A question about the payments data, in English or Spanish.",
        examples=[
            "What was the total payment volume last month?",
            "Which clients have the highest number of failed payments?",
            "¿Cuál fue el volumen total de pagos fallidos en enero para clientes enterprise?",
        ],
    )
class AnswerResponse(BaseModel):
    # Response body for POST /ask.

    # The question is echoed back so the caller does not have to track it.
    question: str
    # SQL text returned by the pipeline for this question.
    sql: str
    # Result rows; a list of tuples, which FastAPI serializes as JSON lists.
    rows: list[Any]
    # Summary text returned by the pipeline.
    summary: str
class SchemaResponse(BaseModel):
    # Response body for GET /schema: a thin wrapper around the schema text so
    # it renders cleanly in Swagger.
    schema_description: str
class HealthResponse(BaseModel):
    # Response body for GET /health.
    status: str
    service: str
    version: str
# ---- endpoints ---------------------------------------------------------------
@app.get("/health", response_model=HealthResponse, tags=["ops"])
def health_check() -> HealthResponse:
    # Simple liveness check: a 200 from here means the service is up.
    # Useful as a preflight in n8n before hitting /ask.
    #
    # The version is read from the app object rather than repeated as a
    # literal, so it cannot drift from the one passed to FastAPI(...).
    return HealthResponse(
        status="ok",
        service="payments-ai-assistant",
        version=app.version,
    )
@app.get("/schema", response_model=SchemaResponse, tags=["debug"])
def get_schema():
    # Exposes the schema text that gets injected into the LLM prompts.
    # Handy for checking that the DB looks right or for tuning a prompt by hand.
    try:
        # The response is built inside the try so that any failure while
        # producing it is also logged and reported as a 500 with the error text.
        return SchemaResponse(schema_description=get_schema_description())
    except Exception as exc:
        log.exception("couldn't read schema")
        raise HTTPException(status_code=500, detail=str(exc)) from exc
# Lower-case substrings that mark a pipeline error as "the generated SQL was
# rejected" (reported as 422) rather than a server-side failure (500).
# NOTE(review): "select" is a broad marker and will match any error message
# that mentions SELECT, whatever produced it — confirm this is intended.
_UNSAFE_SQL_MARKERS = ("guardrail", "select", "forbidden")


def _status_for_pipeline_error(message: str) -> int:
    # Maps a pipeline error message to the HTTP status used by /ask:
    # 422 if it contains any unsafe-SQL marker (case-insensitive), else 500.
    lowered = message.lower()
    if any(marker in lowered for marker in _UNSAFE_SQL_MARKERS):
        return 422
    return 500


@app.post("/ask", response_model=AnswerResponse, tags=["analytics"])
def ask_question(request: QuestionRequest):
    # Main endpoint: passes the question to the NL-to-SQL pipeline and returns
    # the question, SQL, rows and summary it produced.
    #
    # Error mapping (the part that took longest to settle):
    #   the LLM gave us bad SQL  -> 422 (client-ish problem)
    #   anything else in "error" -> 500 (server problem)
    # Pipeline errors are raised as HTTPException so that n8n / Make can route
    # to their error branch.
    #
    # n8n example:
    #   HTTP Request node
    #   Method: POST
    #   URL: http://localhost:8000/ask
    #   Body (JSON): { "question": "What was the total payment volume last month?" }
    #
    # Make (Integromat) example:
    #   HTTP module -> Make a request
    #   URL: http://localhost:8000/ask
    #   Method: POST
    #   Body type: Raw (application/json)
    #   Body: { "question": "..." }

    # Lazy %-style arguments: the message is only formatted if it is emitted.
    log.info("POST /ask | question=%r", request.question)
    result = answer_question(request.question)

    pipeline_error = result.get("error")
    if pipeline_error:
        # Coerce to str so a non-string error value cannot crash the
        # classification step with an AttributeError on .lower().
        detail = str(pipeline_error)
        status = _status_for_pipeline_error(detail)
        log.warning("POST /ask failed | status=%d | error=%s", status, detail)
        raise HTTPException(status_code=status, detail=detail)

    return AnswerResponse(
        question=result["question"],
        sql=result["sql"],
        rows=result["rows"],
        summary=result["summary"],
    )
# ---- global error handler ----------------------------------------------------
@app.exception_handler(Exception)
async def catch_all(request: Request, exc: Exception):
    # Last-resort handler registered for Exception: covers errors an endpoint
    # did not convert into an HTTP error itself (note that /ask calls the
    # pipeline without a try/except of its own).
    # Returns structured JSON instead of a raw 500 page — important for
    # n8n/Make, which need to parse the response body even on errors.
    #
    # exc_info is passed explicitly so the traceback is logged from the
    # exception object itself, without relying on an active `except` context.
    log.error(
        "unhandled error on %s %s",
        request.method,
        request.url.path,
        exc_info=exc,
    )
    # NOTE(review): "error" echoes str(exc) to the caller, which may expose
    # internal details — consider dropping it before going live.
    return JSONResponse(
        status_code=500,
        content={
            "detail": "something went wrong in the payments assistant",
            "error": str(exc),
        },
    )
# ---- dev convenience ---------------------------------------------------------
if __name__ == "__main__":
    # Dev shortcut: `python payments_agent_api.py` starts a local server with
    # auto-reload, bound to loopback only.
    # For anything more serious use:
    #   uvicorn payments_agent_api:app --host 0.0.0.0 --port 8000
    import uvicorn

    uvicorn.run(
        "payments_agent_api:app",
        host="127.0.0.1",
        port=8000,
        reload=True,
    )