176 changes: 176 additions & 0 deletions docs/runbooks/query-log-writer.md
@@ -0,0 +1,176 @@
# Query Log Writer Runbook

Use this when durable rows are missing from `ducklake.system.query_log`, Kafka
query-log lag is rising, or you need to confirm that per-query resource usage is
available after a rollout.

## Mental Model

Duckgres has two query-log paths:

- Direct DuckLake sink: the control plane writes query-log rows straight into
each tenant's `ducklake.system.query_log`. This is the default when no Kafka
query-log producer config is present.
- Kafka sink: control-plane pods publish query-log events to Kafka when
`DUCKGRES_QUERY_LOG_SINK=kafka`; the `duckgres-query-log-writer` Deployment
consumes those events and writes generated rows into the same tenant table.

Both paths write the same logical row shape, including `cpu_time_s` and
`peak_buffer_memory_bytes`. `cpu_time_s` is DuckDB profiling CPU/thread time in
seconds. `peak_buffer_memory_bytes` is DuckDB peak buffer memory, not process
RSS or cgroup memory.

The Kafka writer commits offsets only after a successful durable write or an
intentional drop for an invalid/non-targetable event. Retryable write failures
should hold the partition in place until the writer can write or an operator
fixes the underlying target.
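
The commit rule above can be modeled in a few lines of shell. This is an illustrative sketch only, not the writer's implementation: `committed_offset` and the outcome labels `ok`, `drop`, and `retry` are hypothetical names chosen here to stand for a durable write, an intentional drop, and a retryable write failure on one partition.

```bash
# Hypothetical model of the commit rule for a single partition.
# Arguments are per-event outcomes in offset order: ok | drop | retry.
# Prints how many leading events are safe to commit.
committed_offset() {
  next=0
  for outcome in "$@"; do
    case "$outcome" in
      ok|drop) next=$((next + 1)) ;; # durable write or intentional drop: advance
      *) break ;;                    # retryable failure: hold the partition here
    esac
  done
  echo "$next"
}

committed_offset ok drop retry ok
```

The example call prints `2`: the third event fails with a retryable error, so the fourth event is not committed even though it would succeed on its own.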

## Check Producer Mode

Confirm the control plane is configured to publish to Kafka:

```bash
kubectl -n duckgres get deploy duckgres-control-plane \
-o jsonpath='{range .spec.template.spec.containers[0].env[*]}{.name}{" value="}{.value}{" config_key="}{.valueFrom.configMapKeyRef.key}{" optional="}{.valueFrom.configMapKeyRef.optional}{"\n"}{end}' \
| grep -E 'DUCKGRES_QUERY_LOG'

kubectl -n duckgres get configmap duckgres-query-log-kafka \
-o jsonpath='sink={.data.sink}{"\n"}brokers={.data.brokers}{"\n"}topic={.data.topic}{"\n"}client_id={.data.client_id}{"\n"}'
```

For the Kafka path, expect `DUCKGRES_QUERY_LOG_SINK=kafka` plus non-empty Kafka
broker, topic, and client-id settings. If `DUCKGRES_QUERY_LOG_SINK` is absent
or the ConfigMap has `sink` set to anything other than `kafka`, the control
plane is using the direct DuckLake sink and the writer will not receive new
events.
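
The decision rule above is small enough to script. The sketch below is a hypothetical helper (`query_log_sink_mode` is not part of Duckgres) and assumes the comparison is an exact, case-sensitive match on `kafka`; it takes the sink value you read from the Deployment env or the ConfigMap and reports which path is active.

```bash
# Hypothetical helper: classify the query-log path from a sink value.
# An empty or missing value counts as the direct DuckLake sink.
query_log_sink_mode() { # sink_value
  case "${1:-}" in
    kafka) echo "kafka" ;;
    *) echo "direct" ;;
  esac
}

# Example usage (reads the ConfigMap key shown above):
#   query_log_sink_mode "$(kubectl -n duckgres get configmap \
#     duckgres-query-log-kafka -o jsonpath='{.data.sink}')"
```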

## Check Writer Health

```bash
kubectl -n duckgres get deploy duckgres-query-log-writer
kubectl -n duckgres logs deploy/duckgres-query-log-writer --tail=200
kubectl -n duckgres port-forward svc/duckgres-query-log-writer-metrics 9090:9090
curl -s localhost:9090/metrics | rg 'duckgres_query_log_kafka_writer'
```

Useful signals:

- `outcome="inserted"` should increase as the writer inserts rows.
- `outcome="committed"` should increase after corresponding Kafka offsets are
committed.
- `outcome="retried"` means the writer is preserving Kafka offsets and retrying
a target write.
- `outcome="failed"` means the writer hit a non-recoverable processing error.
- `outcome="dropped"` should be rare and means the event was invalid or could
not be mapped to a tenant target.
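
To compare these signals at a glance, the scraped metrics can be summed per `outcome` label. The sketch below is an assumption-laden convenience, not a supported tool: `summarize_writer_outcomes` is a hypothetical name, it matches any sample whose metric name starts with `duckgres_query_log_kafka_writer`, and it assumes each sample line ends with its value (no trailing timestamp).

```bash
# Hypothetical helper: sum writer samples per outcome label.
# Reads Prometheus text-format metrics on stdin, prints "<outcome> <total>".
summarize_writer_outcomes() {
  awk '
    /^duckgres_query_log_kafka_writer/ && match($0, /outcome="[^"]*"/) {
      # Strip the leading outcome=" (9 chars) and the closing quote.
      label = substr($0, RSTART + 9, RLENGTH - 10)
      totals[label] += $NF
    }
    END { for (l in totals) printf "%s %d\n", l, totals[l] }
  ' | sort
}

# Example usage, with the port-forward above still running:
#   curl -s localhost:9090/metrics | summarize_writer_outcomes
```

Running it twice a minute or so apart shows which outcomes are moving.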

## Check Kafka Lag

Use the cluster Kafka exporter or Kafka admin tooling to inspect Kafka lag for:

- topic: the configured `DUCKGRES_QUERY_LOG_KAFKA_TOPIC`
- consumer group: `duckgres-query-log-writer`, unless overridden by
`DUCKGRES_QUERY_LOG_KAFKA_GROUP_ID`

Kafka lag increasing while `outcome="inserted"` and `outcome="committed"` are
flat usually means the writer is not running, cannot reach Kafka, or is blocked
on tenant DuckLake writes. Kafka lag increasing while `outcome="retried"` is
rising usually means the writer is deliberately holding offsets behind a
retryable write failure.
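
The two readings above can be written down as a small triage function. This is a sketch under stated assumptions: `diagnose_lag` and its output strings are hypothetical, the inputs are integer deltas you computed yourself between two observations, and the final "behind but progressing" branch is an added catch-all that the runbook text does not describe.

```bash
# Hypothetical triage helper. Arguments are integer deltas between two
# observations: lag, inserted, committed, retried.
diagnose_lag() { # lag_delta inserted_delta committed_delta retried_delta
  if [ "$1" -le 0 ]; then
    echo "lag-not-rising"
  elif [ "$4" -gt 0 ]; then
    echo "holding-offsets-behind-retryable-failure"
  elif [ "$2" -eq 0 ] && [ "$3" -eq 0 ]; then
    echo "writer-down-unreachable-or-blocked"
  else
    echo "writer-progressing-but-behind" # catch-all, not from the runbook
  fi
}

diagnose_lag 500 0 0 12
```

The example call prints `holding-offsets-behind-retryable-failure`, which points at the tenant target rather than at Kafka connectivity.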

## Check Tenant Rows

Run this against the affected tenant DuckLake:

```sql
SELECT
event_time,
user_name,
query_duration_ms,
cpu_time_s,
peak_buffer_memory_bytes,
type,
left(query, 160) AS query
FROM ducklake.system.query_log
ORDER BY event_time DESC
LIMIT 20;
```

Top recent resource consumers:

```sql
SELECT
user_name,
normalized_query_hash,
count(*) AS queries,
sum(cpu_time_s) AS total_cpu_time_s,
max(peak_buffer_memory_bytes) AS max_peak_buffer_memory_bytes
FROM ducklake.system.query_log
WHERE event_time > now() - INTERVAL '1 hour'
GROUP BY 1, 2
ORDER BY total_cpu_time_s DESC
LIMIT 20;
```

If `cpu_time_s` or `peak_buffer_memory_bytes` is zero on every row, check
whether queries are completing before DuckDB profiling output is available.
Queries that end in an OOM or a worker crash can also lack profiling data,
because DuckDB may not finish writing the profile.

## Missing Schema Or Table

Check whether the table exists in the tenant DuckLake:

```sql
SELECT catalog_name, schema_name
FROM ducklake.information_schema.schemata
WHERE schema_name = 'system';

SELECT table_catalog, table_schema, table_name
FROM ducklake.information_schema.tables
WHERE table_schema = 'system'
AND table_name = 'query_log';
```

A missing schema or missing table usually means no query-log writer has
successfully attached to that tenant DuckLake yet, query logging is disabled, or
the writer cannot resolve the tenant target. Check writer logs for target
resolution, credential, attach, or `ensureQueryLogTable` errors.

## Recover A Stuck Writer

If Kafka lag is rising and logs show stuck writer connections or repeated target
write failures:

1. Fix the underlying tenant target first: credentials, network access, object
store permissions, or metadata store availability.
2. Restart the writer to clear process-local DuckDB connections:

```bash
kubectl -n duckgres rollout restart deploy/duckgres-query-log-writer
```

If a restart is not enough, scale to zero and back:

```bash
kubectl -n duckgres scale deploy/duckgres-query-log-writer --replicas=0
kubectl -n duckgres scale deploy/duckgres-query-log-writer --replicas=1
```

Because offsets are committed after durable writes, restart/retry should not
skip retryable events. Event IDs make normal single-consumer Kafka replays safe
for the same tenant table; they are not a storage-level uniqueness guarantee
across multiple independent writer groups.

## Roll Back Kafka Query Logging

To return to the direct DuckLake sink:

```bash
kubectl -n duckgres scale deploy/duckgres-query-log-writer --replicas=0
kubectl -n duckgres delete configmap duckgres-query-log-kafka
kubectl -n duckgres rollout restart deploy/duckgres-control-plane
```

After rollback, new control-plane pods should no longer show
`DUCKGRES_QUERY_LOG_SINK=kafka`, and new query-log rows should be written by the
direct sink.
7 changes: 7 additions & 0 deletions tests/e2e-mw-dev/README.md
@@ -64,6 +64,13 @@ client-go:
the pool must then serve a retrying connection. The harness logs whether
backpressure was observed **and** handles it (queries retry through it).
- **activation** — DuckLake **and** Iceberg catalogs attach and read/write.
- **query log** — a successful marker query and a failed marker query both land
in `ducklake.system.query_log` on the cnpg and external-RDS lanes. The
completed marker row must expose non-negative `cpu_time_s` and
`peak_buffer_memory_bytes`, proving the durable resource columns are present
in the tenant DuckLake log table. The harness intentionally leaves the
optional Kafka query-log writer disabled, so this is the direct DuckLake sink
smoke rather than a Kafka deployment test.
- **worker sizing** (TTL-pool model, `docs/design/worker-ttl-pool.md`) — a
client-sized connection (`duckgres.worker_cpu`/`worker_memory`/`worker_ttl`
startup options, sent via `PGOPTIONS`; CP runs `allowClientWorkerProfile=true`
49 changes: 48 additions & 1 deletion tests/e2e-mw-dev/harness.sh
@@ -30,6 +30,9 @@
# session on a cold Iceberg worker must not fail the pg_catalog
# compat-view bind (the CP primes the REST catalog's schema list
# first); asserted on cnpg + ext.
# query log : a successful marker query and a failed marker query land in
# ducklake.system.query_log, with cpu_time_s and
# peak_buffer_memory_bytes present for completed queries.
# sizing : a client-sized connection (duckgres.worker_cpu/memory/ttl)
# spawns a worker pod carrying the requested CPU+memory, and a
# same-shape reconnect reuses that hot-idle worker (no respawn)
@@ -616,6 +619,48 @@ rw_ducklake() { # org password
pg "$1" "$2" ducklake "DROP TABLE $t;"
}

wait_query_log_sql() { # org password label sql_returning_ok_or_wait
  a=0 out=""
  while [ "$a" -lt 30 ]; do
    if out="$(pg_try "$1" "$2" ducklake "$4")"; then
      [ "$out" = "ok" ] && return
    fi
    sleep 2
    a=$((a + 1))
  done
  fail "$1 query_log: timed out waiting for $3 (last=$out)"
}

query_log_smoke() { # org password
  log "query_log smoke on $1"
  base="e2e_ql_$(echo "$1" | tr -c 'a-z0-9' _)_$$_$(date +%s)"
  ok_marker="${base}_ok"
  bad_table="${base}_missing"

  got="$(pg "$1" "$2" ducklake \
    "SELECT '$ok_marker' || '|' || count(*)::VARCHAR FROM range(1000000) t(i);")"
  [ "$got" = "$ok_marker|1000000" ] || \
    fail "$1 query_log marker query returned '$got'"

  wait_query_log_sql "$1" "$2" "completed marker query $ok_marker" \
    "SELECT CASE WHEN count(*) > 0 AND min(cpu_time_s) >= 0 AND min(peak_buffer_memory_bytes) >= 0 THEN 'ok' ELSE 'wait' END
     FROM ducklake.system.query_log
     WHERE query LIKE '%$ok_marker%'
       AND type = 'QueryFinish'
       AND COALESCE(exception_code, '') = '';"

  if out="$(pg_try "$1" "$2" ducklake "SELECT * FROM $bad_table;")"; then
    fail "$1 query_log failed marker query unexpectedly succeeded: $out"
  fi

  wait_query_log_sql "$1" "$2" "failed marker query $bad_table" \
    "SELECT CASE WHEN count(*) > 0 THEN 'ok' ELSE 'wait' END
     FROM ducklake.system.query_log
     WHERE query LIKE '%$bad_table%'
       AND type = 'ExceptionWhileProcessing'
       AND COALESCE(exception, '') <> '';"
}

# Regression for the S3-throttling backfill failure: a sustained HTTP 503
# SlowDown during a DuckLake DELETE used to outlast httpfs's tiny default retry
# budget (http_retries=3, ~0.5s cumulative backoff) and surface as a fatal
@@ -1388,6 +1433,7 @@ lane_cnpg() { # full wire/catalog/concurrency/sizing coverage on the cnpg org
jsonb_concat_semantics "$CNPG" "$cnpg_pw"
cold_burst_absorption "$CNPG" "$cnpg_pw" # early, while this org is mostly cold
rw_ducklake "$CNPG" "$cnpg_pw"
query_log_smoke "$CNPG" "$cnpg_pw"
httpfs_retry_budget "$CNPG" "$cnpg_pw" # S3-503 retry budget raised per worker (applyHTTPFSRetryBudget)
persistent_user_secret "$CNPG" "$cnpg_pw" # after rw_ducklake (org worker hot)
persistent_user_secret_isolation "$CNPG" "$cnpg_pw"
@@ -1463,6 +1509,7 @@ lane_ext() { # external-RDS metadata backend + org default profile
basic_query "$EXT" "$ext_pw"
pg_compat_functions "$EXT" "$ext_pw"
rw_ducklake "$EXT" "$ext_pw"
query_log_smoke "$EXT" "$ext_pw"
httpfs_retry_budget "$EXT" "$ext_pw" # S3-503 retry budget per worker, ext-metadata backend too
persistent_user_secret "$EXT" "$ext_pw" # secret replay on the ext-metadata org too
iceberg_cold_first_connect "$EXT" "$ext_pw" # cold first session must not fail the metadata-init bind (ext backend)
@@ -1544,7 +1591,7 @@ main() {
# mid-run image bump); it stays covered by the controlplane/ unit tests.
log "SKIP version-reaper (needs an in-run image bump; see README)"

log "PASS: admin-no-query-token + wire + malformed-startup-resilience + jsonb-concat + cold-burst-absorption + pipeline-error-recovery + cancel-reuse + activation(DuckLake/Iceberg) + iceberg-cold-first-connect + ext-forks + worker-pod + concurrency + durability + crash-recovery + busy-only-do-not-disrupt + graceful-drain + one-session-per-worker + parallel-cold-burst-ramp + worker-sizing(cnpg DuckLake+Iceberg) + org-default-profile(ext) + persistent-user-secrets(cnpg+ext, cross-user isolation) + isolation + lifecycle-teardown, on cnpg & ext (4 parallel lanes)"
log "PASS: admin-no-query-token + wire + malformed-startup-resilience + jsonb-concat + cold-burst-absorption + pipeline-error-recovery + cancel-reuse + activation(DuckLake/Iceberg) + query-log + iceberg-cold-first-connect + ext-forks + worker-pod + concurrency + durability + crash-recovery + busy-only-do-not-disrupt + graceful-drain + one-session-per-worker + parallel-cold-burst-ramp + worker-sizing(cnpg DuckLake+Iceberg) + org-default-profile(ext) + persistent-user-secrets(cnpg+ext, cross-user isolation) + isolation + lifecycle-teardown, on cnpg & ext (4 parallel lanes)"
}

main "$@"
80 changes: 80 additions & 0 deletions tests/manifests/manifests_test.go
@@ -232,6 +232,70 @@ func TestQueryLogWriterAlertsReferenceWriterMetrics(t *testing.T) {
}
}

func TestE2EHarnessIncludesQueryLogSmoke(t *testing.T) {
	content := readManifest(t, "tests", "e2e-mw-dev", "harness.sh")
	for _, want := range []string{
		"query_log_smoke()",
		"ducklake.system.query_log",
		"cpu_time_s",
		"peak_buffer_memory_bytes",
		"ExceptionWhileProcessing",
	} {
		if !strings.Contains(content, want) {
			t.Fatalf("expected %q in tests/e2e-mw-dev/harness.sh", want)
		}
	}
	for _, tt := range []struct {
		name  string
		parts []string
	}{
		{name: "cnpg lane", parts: []string{"query_log_smoke", `"$CNPG"`, `"$cnpg_pw"`}},
		{name: "ext lane", parts: []string{"query_log_smoke", `"$EXT"`, `"$ext_pw"`}},
	} {
		if !lineContainsAll(content, tt.parts...) {
			t.Fatalf("expected %s to call query_log_smoke in tests/e2e-mw-dev/harness.sh", tt.name)
		}
	}
}

func TestE2EReadmeDocumentsQueryLogSmoke(t *testing.T) {
	content := readManifest(t, "tests", "e2e-mw-dev", "README.md")
	for _, want := range []string{
		"query log",
		"ducklake.system.query_log",
		"cpu_time_s",
		"peak_buffer_memory_bytes",
		"failed marker query",
	} {
		if !strings.Contains(content, want) {
			t.Fatalf("expected %q in tests/e2e-mw-dev/README.md", want)
		}
	}
}

func TestQueryLogWriterRunbookCoversOperatorChecks(t *testing.T) {
	content := readManifest(t, "docs", "runbooks", "query-log-writer.md")
	for _, want := range []string{
		"DUCKGRES_QUERY_LOG_SINK=kafka",
		"duckgres-query-log-writer",
		"Kafka lag",
		"system.query_log",
		"cpu_time_s",
		"peak_buffer_memory_bytes",
		"missing schema",
		"missing table",
		"stuck writer",
		"rollout restart",
		"offsets",
		`outcome="inserted"`,
		`outcome="committed"`,
	} {
		if !strings.Contains(content, want) {
			t.Fatalf("expected %q in docs/runbooks/query-log-writer.md", want)
		}
	}
}

func TestManifestTestsRunInUnitRecipe(t *testing.T) {
content := readManifest(t, "justfile")
if !strings.Contains(content, "./tests/manifests/...") {
@@ -281,3 +345,19 @@ func envBlock(t *testing.T, content, name string) string {
}
return rest
}

// lineContainsAll reports whether any single line of content contains every
// one of the given parts.
func lineContainsAll(content string, parts ...string) bool {
	for _, line := range strings.Split(content, "\n") {
		ok := true
		for _, part := range parts {
			if !strings.Contains(line, part) {
				ok = false
				break
			}
		}
		if ok {
			return true
		}
	}
	return false
}