From e4d1eca02b8dc2a763e311aad21e3799b909b76b Mon Sep 17 00:00:00 2001 From: Bill Guowei Yang Date: Thu, 25 Jun 2026 14:39:50 -0400 Subject: [PATCH] Add query log e2e smoke and runbook --- docs/runbooks/query-log-writer.md | 176 ++++++++++++++++++++++++++++++ tests/e2e-mw-dev/README.md | 7 ++ tests/e2e-mw-dev/harness.sh | 49 ++++++++- tests/manifests/manifests_test.go | 80 ++++++++++++++ 4 files changed, 311 insertions(+), 1 deletion(-) create mode 100644 docs/runbooks/query-log-writer.md diff --git a/docs/runbooks/query-log-writer.md b/docs/runbooks/query-log-writer.md new file mode 100644 index 00000000..c6b21636 --- /dev/null +++ b/docs/runbooks/query-log-writer.md @@ -0,0 +1,176 @@ +# Query Log Writer Runbook + +Use this when durable rows are missing from `ducklake.system.query_log`, Kafka +query-log lag is rising, or you need to confirm that per-query resource usage is +available after a rollout. + +## Mental Model + +Duckgres has two query-log paths: + +- Direct DuckLake sink: the control plane writes query-log rows straight into + each tenant's `ducklake.system.query_log`. This is the default when no Kafka + query-log producer config is present. +- Kafka sink: control-plane pods publish query-log events to Kafka when + `DUCKGRES_QUERY_LOG_SINK=kafka`; the `duckgres-query-log-writer` Deployment + consumes those events and writes generated rows into the same tenant table. + +Both paths write the same logical row shape, including `cpu_time_s` and +`peak_buffer_memory_bytes`. `cpu_time_s` is DuckDB profiling CPU/thread time in +seconds. `peak_buffer_memory_bytes` is DuckDB peak buffer memory, not process +RSS or cgroup memory. + +The Kafka writer commits offsets only after a successful durable write or an +intentional drop for an invalid/non-targetable event. Retryable write failures +should hold the partition in place until the writer can write or an operator +fixes the underlying target. + +## Check Producer Mode + +Confirm the control plane is configured to publish to Kafka: + +```bash +kubectl -n duckgres get deploy duckgres-control-plane \ + -o jsonpath='{range .spec.template.spec.containers[0].env[*]}{.name}{" value="}{.value}{" config_key="}{.valueFrom.configMapKeyRef.key}{" optional="}{.valueFrom.configMapKeyRef.optional}{"\n"}{end}' \ + | grep -E 'DUCKGRES_QUERY_LOG' + +kubectl -n duckgres get configmap duckgres-query-log-kafka \ + -o jsonpath='sink={.data.sink}{"\n"}brokers={.data.brokers}{"\n"}topic={.data.topic}{"\n"}client_id={.data.client_id}{"\n"}' +``` + +For the Kafka path, expect `DUCKGRES_QUERY_LOG_SINK=kafka` plus non-empty Kafka +broker, topic, and client-id settings. If `DUCKGRES_QUERY_LOG_SINK` is absent +or the ConfigMap has `sink` set to anything other than `kafka`, the control +plane is using the direct DuckLake sink and the writer will not receive new +events. + +## Check Writer Health + +```bash +kubectl -n duckgres get deploy duckgres-query-log-writer +kubectl -n duckgres logs deploy/duckgres-query-log-writer --tail=200 +kubectl -n duckgres port-forward svc/duckgres-query-log-writer-metrics 9090:9090 +curl -s localhost:9090/metrics | rg 'duckgres_query_log_kafka_writer' +``` + +Useful signals: + +- `outcome="inserted"` should increase while writer rows are inserted. +- `outcome="committed"` should increase after corresponding Kafka offsets are + committed. +- `outcome="retried"` means the writer is preserving Kafka offsets and retrying + a target write. +- `outcome="failed"` means the writer hit a non-recoverable processing error. +- `outcome="dropped"` should be rare and means the event was invalid or could + not be mapped to a tenant target. + +## Check Kafka Lag + +Use the cluster Kafka exporter or Kafka admin tooling to inspect Kafka lag for: + +- topic: the configured `DUCKGRES_QUERY_LOG_KAFKA_TOPIC` +- consumer group: `duckgres-query-log-writer`, unless overridden by + `DUCKGRES_QUERY_LOG_KAFKA_GROUP_ID` + +Kafka lag increasing while `outcome="inserted"` and `outcome="committed"` are +flat usually means the writer is not running, cannot reach Kafka, or is blocked +on tenant DuckLake writes. Kafka lag increasing while `outcome="retried"` is +rising usually means the writer is deliberately holding offsets behind a +retryable write failure. + +## Check Tenant Rows + +Run this against the affected tenant DuckLake: + +```sql +SELECT + event_time, + user_name, + query_duration_ms, + cpu_time_s, + peak_buffer_memory_bytes, + type, + left(query, 160) AS query +FROM ducklake.system.query_log +ORDER BY event_time DESC +LIMIT 20; +``` + +Top recent resource consumers: + +```sql +SELECT + user_name, + normalized_query_hash, + count(*) AS queries, + sum(cpu_time_s) AS total_cpu_time_s, + max(peak_buffer_memory_bytes) AS max_peak_buffer_memory_bytes +FROM ducklake.system.query_log +WHERE event_time > now() - INTERVAL '1 hour' +GROUP BY 1, 2 +ORDER BY total_cpu_time_s DESC +LIMIT 20; +``` + +If `cpu_time_s` or `peak_buffer_memory_bytes` is always zero, check whether the +query completed before profiling output was available. OOMs or worker crashes +can still miss profiling data because DuckDB may not finish writing the profile. + +## Missing Schema Or Table + +Check whether the table exists in the tenant DuckLake: + +```sql +SELECT catalog_name, schema_name +FROM ducklake.information_schema.schemata +WHERE schema_name = 'system'; + +SELECT table_catalog, table_schema, table_name +FROM ducklake.information_schema.tables +WHERE table_schema = 'system' + AND table_name = 'query_log'; +``` + +A missing schema or missing table usually means no query-log writer has +successfully attached to that tenant DuckLake yet, query logging is disabled, or +the writer cannot resolve the tenant target. Check writer logs for target +resolution, credential, attach, or `ensureQueryLogTable` errors. + +## Recover A Stuck Writer + +If Kafka lag is rising and logs show stuck writer connections or repeated target +write failures: + +1. Fix the underlying tenant target first: credentials, network access, object + store permissions, or metadata store availability. +2. Restart the writer to clear process-local DuckDB connections: + +```bash +kubectl -n duckgres rollout restart deploy/duckgres-query-log-writer +``` + +If restart is not enough, scale to zero and back: + +```bash +kubectl -n duckgres scale deploy/duckgres-query-log-writer --replicas=0 +kubectl -n duckgres scale deploy/duckgres-query-log-writer --replicas=1 +``` + +Because offsets are committed after durable writes, restart/retry should not +skip retryable events. Event IDs make normal single-consumer Kafka replays safe +for the same tenant table; they are not a storage-level uniqueness guarantee +across multiple independent writer groups. + +## Roll Back Kafka Query Logging + +To return to the direct DuckLake sink: + +```bash +kubectl -n duckgres scale deploy/duckgres-query-log-writer --replicas=0 +kubectl -n duckgres delete configmap duckgres-query-log-kafka +kubectl -n duckgres rollout restart deploy/duckgres-control-plane +``` + +After rollback, new control-plane pods should no longer show +`DUCKGRES_QUERY_LOG_SINK=kafka`, and new query-log rows should be written by the +direct sink. diff --git a/tests/e2e-mw-dev/README.md b/tests/e2e-mw-dev/README.md index b500db6e..aabb51a9 100644 --- a/tests/e2e-mw-dev/README.md +++ b/tests/e2e-mw-dev/README.md @@ -64,6 +64,13 @@ client-go: the pool must then serve a retrying connection. The harness logs whether backpressure was observed **and** handles it (queries retry through it). - **activation** — DuckLake **and** Iceberg catalogs attach and read/write. +- **query log** — a successful marker query and a failed marker query both land + in `ducklake.system.query_log` on the cnpg and external-RDS lanes. The + completed marker row must expose non-negative `cpu_time_s` and + `peak_buffer_memory_bytes`, proving the durable resource columns are present + in the tenant DuckLake log table. The harness intentionally leaves the + optional Kafka query-log writer disabled, so this is the direct DuckLake sink + smoke rather than a Kafka deployment test. - **worker sizing** (TTL-pool model, `docs/design/worker-ttl-pool.md`) — a client-sized connection (`duckgres.worker_cpu`/`worker_memory`/`worker_ttl` startup options, sent via `PGOPTIONS`; CP runs `allowClientWorkerProfile=true` diff --git a/tests/e2e-mw-dev/harness.sh b/tests/e2e-mw-dev/harness.sh index d348184f..905127f3 100755 --- a/tests/e2e-mw-dev/harness.sh +++ b/tests/e2e-mw-dev/harness.sh @@ -30,6 +30,9 @@ # session on a cold Iceberg worker must not fail the pg_catalog # compat-view bind (the CP primes the REST catalog's schema list # first); asserted on cnpg + ext. +# query log : a successful marker query and a failed marker query land in +# ducklake.system.query_log, with cpu_time_s and +# peak_buffer_memory_bytes present for completed queries. # sizing : a client-sized connection (duckgres.worker_cpu/memory/ttl) # spawns a worker pod carrying the requested CPU+memory, and a # same-shape reconnect reuses that hot-idle worker (no respawn) @@ -616,6 +619,48 @@ rw_ducklake() { # org password pg "$1" "$2" ducklake "DROP TABLE $t;" } +wait_query_log_sql() { # org password label sql_returning_ok_or_wait + a=0 out="" + while [ "$a" -lt 30 ]; do + if out="$(pg_try "$1" "$2" ducklake "$4")"; then + [ "$out" = "ok" ] && return + fi + sleep 2 + a=$((a + 1)) + done + fail "$1 query_log: timed out waiting for $3 (last=$out)" +} + +query_log_smoke() { # org password + log "query_log smoke on $1" + base="e2e_ql_$(echo "$1" | tr -c 'a-z0-9' _)_$$_$(date +%s)" + ok_marker="${base}_ok" + bad_table="${base}_missing" + + got="$(pg "$1" "$2" ducklake \ + "SELECT '$ok_marker' || '|' || count(*)::VARCHAR FROM range(1000000) t(i);")" + [ "$got" = "$ok_marker|1000000" ] || \ + fail "$1 query_log marker query returned '$got'" + + wait_query_log_sql "$1" "$2" "completed marker query $ok_marker" \ + "SELECT CASE WHEN count(*) > 0 AND min(cpu_time_s) >= 0 AND min(peak_buffer_memory_bytes) >= 0 THEN 'ok' ELSE 'wait' END + FROM ducklake.system.query_log + WHERE query LIKE '%$ok_marker%' + AND type = 'QueryFinish' + AND COALESCE(exception_code, '') = '';" + + if out="$(pg_try "$1" "$2" ducklake "SELECT * FROM $bad_table;")"; then + fail "$1 query_log failed marker query unexpectedly succeeded: $out" + fi + + wait_query_log_sql "$1" "$2" "failed marker query $bad_table" \ + "SELECT CASE WHEN count(*) > 0 THEN 'ok' ELSE 'wait' END + FROM ducklake.system.query_log + WHERE query LIKE '%$bad_table%' + AND type = 'ExceptionWhileProcessing' + AND COALESCE(exception, '') <> '';" +} + # Regression for the S3-throttling backfill failure: a sustained HTTP 503 # SlowDown during a DuckLake DELETE used to outlast httpfs's tiny default retry # budget (http_retries=3, ~0.5s cumulative backoff) and surface as a fatal @@ -1388,6 +1433,7 @@ lane_cnpg() { # full wire/catalog/concurrency/sizing coverage on the cnpg org jsonb_concat_semantics "$CNPG" "$cnpg_pw" cold_burst_absorption "$CNPG" "$cnpg_pw" # early, while this org is mostly cold rw_ducklake "$CNPG" "$cnpg_pw" + query_log_smoke "$CNPG" "$cnpg_pw" httpfs_retry_budget "$CNPG" "$cnpg_pw" # S3-503 retry budget raised per worker (applyHTTPFSRetryBudget) persistent_user_secret "$CNPG" "$cnpg_pw" # after rw_ducklake (org worker hot) persistent_user_secret_isolation "$CNPG" "$cnpg_pw" @@ -1463,6 +1509,7 @@ lane_ext() { # external-RDS metadata backend + org default profile basic_query "$EXT" "$ext_pw" pg_compat_functions "$EXT" "$ext_pw" rw_ducklake "$EXT" "$ext_pw" + query_log_smoke "$EXT" "$ext_pw" httpfs_retry_budget "$EXT" "$ext_pw" # S3-503 retry budget per worker, ext-metadata backend too persistent_user_secret "$EXT" "$ext_pw" # secret replay on the ext-metadata org too iceberg_cold_first_connect "$EXT" "$ext_pw" # cold first session must not fail the metadata-init bind (ext backend) @@ -1544,7 +1591,7 @@ main() { # mid-run image bump); it stays covered by the controlplane/ unit tests. log "SKIP version-reaper (needs an in-run image bump; see README)" - log "PASS: admin-no-query-token + wire + malformed-startup-resilience + jsonb-concat + cold-burst-absorption + pipeline-error-recovery + cancel-reuse + activation(DuckLake/Iceberg) + iceberg-cold-first-connect + ext-forks + worker-pod + concurrency + durability + crash-recovery + busy-only-do-not-disrupt + graceful-drain + one-session-per-worker + parallel-cold-burst-ramp + worker-sizing(cnpg DuckLake+Iceberg) + org-default-profile(ext) + persistent-user-secrets(cnpg+ext, cross-user isolation) + isolation + lifecycle-teardown, on cnpg & ext (4 parallel lanes)" + log "PASS: admin-no-query-token + wire + malformed-startup-resilience + jsonb-concat + cold-burst-absorption + pipeline-error-recovery + cancel-reuse + activation(DuckLake/Iceberg) + query-log + iceberg-cold-first-connect + ext-forks + worker-pod + concurrency + durability + crash-recovery + busy-only-do-not-disrupt + graceful-drain + one-session-per-worker + parallel-cold-burst-ramp + worker-sizing(cnpg DuckLake+Iceberg) + org-default-profile(ext) + persistent-user-secrets(cnpg+ext, cross-user isolation) + isolation + lifecycle-teardown, on cnpg & ext (4 parallel lanes)" } main "$@" diff --git a/tests/manifests/manifests_test.go b/tests/manifests/manifests_test.go index e7f36d1b..2b55fe6c 100644 --- a/tests/manifests/manifests_test.go +++ b/tests/manifests/manifests_test.go @@ -232,6 +232,70 @@ func TestQueryLogWriterAlertsReferenceWriterMetrics(t *testing.T) { } } +func TestE2EHarnessIncludesQueryLogSmoke(t *testing.T) { + content := readManifest(t, "tests", "e2e-mw-dev", "harness.sh") + for _, want := range []string{ + "query_log_smoke()", + "ducklake.system.query_log", + "cpu_time_s", + "peak_buffer_memory_bytes", + "ExceptionWhileProcessing", + } { + if !strings.Contains(content, want) { + t.Fatalf("expected %q in tests/e2e-mw-dev/harness.sh", want) + } + } + for _, tt := range []struct { + name string + parts []string + }{ + {name: "cnpg lane", parts: []string{"query_log_smoke", `"$CNPG"`, `"$cnpg_pw"`}}, + {name: "ext lane", parts: []string{"query_log_smoke", `"$EXT"`, `"$ext_pw"`}}, + } { + if !lineContainsAll(content, tt.parts...) { + t.Fatalf("expected %s to call query_log_smoke in tests/e2e-mw-dev/harness.sh", tt.name) + } + } +} + +func TestE2EReadmeDocumentsQueryLogSmoke(t *testing.T) { + content := readManifest(t, "tests", "e2e-mw-dev", "README.md") + for _, want := range []string{ + "query log", + "ducklake.system.query_log", + "cpu_time_s", + "peak_buffer_memory_bytes", + "failed marker query", + } { + if !strings.Contains(content, want) { + t.Fatalf("expected %q in tests/e2e-mw-dev/README.md", want) + } + } +} + +func TestQueryLogWriterRunbookCoversOperatorChecks(t *testing.T) { + content := readManifest(t, "docs", "runbooks", "query-log-writer.md") + for _, want := range []string{ + "DUCKGRES_QUERY_LOG_SINK=kafka", + "duckgres-query-log-writer", + "Kafka lag", + "system.query_log", + "cpu_time_s", + "peak_buffer_memory_bytes", + "missing schema", + "missing table", + "stuck writer", + "rollout restart", + "offsets", + `outcome="inserted"`, + `outcome="committed"`, + } { + if !strings.Contains(content, want) { + t.Fatalf("expected %q in docs/runbooks/query-log-writer.md", want) + } + } +} + func TestManifestTestsRunInUnitRecipe(t *testing.T) { content := readManifest(t, "justfile") if !strings.Contains(content, "./tests/manifests/...") { @@ -281,3 +345,19 @@ func envBlock(t *testing.T, content, name string) string { } return rest } + +func lineContainsAll(content string, parts ...string) bool { + for _, line := range strings.Split(content, "\n") { + ok := true + for _, part := range parts { + if !strings.Contains(line, part) { + ok = false + break + } + } + if ok { + return true + } + } + return false +}