diff --git a/docs/perf-harness-runbook.md b/docs/perf-harness-runbook.md index 7cb6a1ea..66ec945c 100644 --- a/docs/perf-harness-runbook.md +++ b/docs/perf-harness-runbook.md @@ -5,6 +5,8 @@ Golden-query performance signal collection for `pgwire` and `flight` protocols. This is observability-only; there is no pass/fail performance gate. +Status: the old prod-us deployed perf runner path is legacy while the dev Duckgres Scenario Runner replacement is being built. Prefer `tests/scenario/scenarios/posthog_frozen_metadata.yaml` for current frozen dataset end-to-end validation; keep this perf harness for local/library use and historical artifact compatibility until the scenario perf adapter is available. + ## Local Prerequisites - Run commands from the Duckgres repository root. diff --git a/docs/runbooks/scenario-runner.md b/docs/runbooks/scenario-runner.md index a04d6f35..42705f14 100644 --- a/docs/runbooks/scenario-runner.md +++ b/docs/runbooks/scenario-runner.md @@ -4,6 +4,8 @@ The scenario runner executes end-to-end managed-warehouse flows against a configured dev environment. The first smoke scenario provisions a warehouse, waits for readiness, runs `SELECT 1` over PGWire with managed-hostname SNI, then deprovisions and verifies cleanup. +The frozen metadata scenario provisions the same fresh dev warehouse shape, creates read-only views over frozen persons/events parquet supplied by `DUCKGRES_SCENARIO_FROZEN_S3_URI`, validates the dataset manifest row, runs lightweight metadata/user-exploration queries, then deprovisions. + ## Required Environment Set these before running a real scenario: @@ -28,6 +30,12 @@ export DUCKGRES_SCENARIO_MAX_RUNTIME="30m" export DUCKGRES_SCENARIO_GO_TEST_TIMEOUT="60m" ``` +Frozen dataset scenarios additionally require: + +```bash +export DUCKGRES_SCENARIO_FROZEN_S3_URI="s3:///frozen_v1/" +``` + Do not commit concrete dev endpoints, secrets, org IDs, or private bucket names. ## Run @@ -44,6 +52,12 @@ Run the dev smoke: just scenario-smoke ``` +Run frozen metadata exploration: + +```bash +just scenario-frozen-metadata +``` + Run a specific scenario file: ```bash @@ -52,6 +66,15 @@ just scenario scenario=tests/scenario/scenarios/provision_smoke.yaml Artifacts are written under `artifacts/scenario//`. +The frozen metadata scenario uses: + +- `tests/scenario/scenarios/posthog_frozen_metadata.yaml` +- `tests/scenario/sql/setup_frozen_views.sql` +- `tests/scenario/sql/metadata_catalog.yaml` + +`DUCKGRES_SCENARIO_FROZEN_S3_URI` must point at a dev-owned frozen dataset prefix with `persons/` and `events/` parquet children. +The provisioned Duckgres worker role also needs read/list access to that prefix; the runner process only supplies the URI, while the worker performs the S3 reads during `read_parquet`. + ## Leaked Dev Warehouse Recovery The smoke scenario has an `always_run` deprovision step, but an interrupted process can still leave dev resources behind. To clean up: diff --git a/justfile b/justfile index 4bd640b4..b139c19e 100644 --- a/justfile +++ b/justfile @@ -374,6 +374,11 @@ scenario scenario="tests/scenario/scenarios/provision_smoke.yaml": scenario-smoke: ./scripts/scenario_run.sh tests/scenario/scenarios/provision_smoke.yaml +# Run the dev frozen dataset metadata exploration scenario +[group('test')] +scenario-frozen-metadata: + ./scripts/scenario_run.sh tests/scenario/scenarios/posthog_frozen_metadata.yaml + # Lint (matches CI — uses golangci-lint, not go vet) [group('test')] lint: diff --git a/scripts/scenario_run.sh b/scripts/scenario_run.sh index a479ccd1..608cd5ba 100755 --- a/scripts/scenario_run.sh +++ b/scripts/scenario_run.sh @@ -20,6 +20,7 @@ Optional environment: DUCKGRES_SCENARIO_PG_CONNECT_TIMEOUT DUCKGRES_SCENARIO_MAX_RUNTIME DUCKGRES_SCENARIO_GO_TEST_TIMEOUT + DUCKGRES_SCENARIO_FROZEN_S3_URI (required by frozen dataset scenarios) USAGE } @@ -64,6 +65,16 @@ while [ "$#" -gt 0 ]; do esac done +script_dir="$(cd -- "$(dirname -- "${BASH_SOURCE[0]}")" && pwd)" +repo_root="$(cd -- "$script_dir/.." && pwd)" + +root_relative_path() { + case "$1" in + /*) printf '%s\n' "$1" ;; + *) printf '%s\n' "$repo_root/$1" ;; + esac +} + required=( DUCKGRES_SCENARIO_API_BASE DUCKGRES_SCENARIO_INTERNAL_SECRET @@ -90,6 +101,9 @@ if [ "$check_env_only" -eq 1 ]; then exit 0 fi +scenario_file="$(root_relative_path "$scenario_file")" +output_base="$(root_relative_path "$output_base")" + args=( go test -count=1 ./tests/scenario -timeout "$go_test_timeout" diff --git a/tests/scenario/core/catalog.go b/tests/scenario/core/catalog.go index b6a13e6c..4854ed36 100644 --- a/tests/scenario/core/catalog.go +++ b/tests/scenario/core/catalog.go @@ -10,9 +10,10 @@ import ( ) type Scenario struct { - Name string `yaml:"name" json:"name"` - RunIDPrefix string `yaml:"run_id_prefix" json:"run_id_prefix,omitempty"` - Steps []Step `yaml:"steps" json:"steps"` + Name string `yaml:"name" json:"name"` + RunIDPrefix string `yaml:"run_id_prefix" json:"run_id_prefix,omitempty"` + RequiredEnv []string `yaml:"required_env" json:"required_env,omitempty"` + Steps []Step `yaml:"steps" json:"steps"` } type Step struct { @@ -26,6 +27,7 @@ type Step struct { type rawScenario struct { Name string `yaml:"name"` RunIDPrefix string `yaml:"run_id_prefix"` + RequiredEnv []string `yaml:"required_env"` Steps []rawStep `yaml:"steps"` } @@ -59,6 +61,7 @@ func normalizeScenario(raw rawScenario) (Scenario, error) { scenario := Scenario{ Name: strings.TrimSpace(raw.Name), RunIDPrefix: strings.TrimSpace(raw.RunIDPrefix), + RequiredEnv: make([]string, 0, len(raw.RequiredEnv)), Steps: make([]Step, 0, len(raw.Steps)), } if scenario.Name == "" { @@ -67,6 +70,13 @@ func normalizeScenario(raw rawScenario) (Scenario, error) { if len(raw.Steps) == 0 { return Scenario{}, fmt.Errorf("scenario steps must include at least one step") } + for i, key := range raw.RequiredEnv { + key = strings.TrimSpace(key) + if key == "" { + return Scenario{}, fmt.Errorf("required_env[%d] must be non-empty", i) + } + scenario.RequiredEnv = append(scenario.RequiredEnv, key) + } seen := make(map[string]struct{}, len(raw.Steps)) var previousID string diff --git a/tests/scenario/core/catalog_test.go b/tests/scenario/core/catalog_test.go index 93c65100..fe47c193 100644 --- a/tests/scenario/core/catalog_test.go +++ b/tests/scenario/core/catalog_test.go @@ -145,6 +145,8 @@ steps: func TestParseScenarioAllowsOpenWithMap(t *testing.T) { raw := []byte(` name: with-map +required_env: + - DUCKGRES_SCENARIO_FROZEN_S3_URI steps: - id: query type: fake @@ -158,7 +160,28 @@ steps: if err != nil { t.Fatalf("ParseScenario returned error: %v", err) } + if got := scenario.RequiredEnv; len(got) != 1 || got[0] != "DUCKGRES_SCENARIO_FROZEN_S3_URI" { + t.Fatalf("required_env = %#v, want frozen S3 URI", got) + } if scenario.Steps[0].With["sql"] != "SELECT 1" { t.Fatalf("unexpected with map: %#v", scenario.Steps[0].With) } } + +func TestParseScenarioRejectsEmptyRequiredEnv(t *testing.T) { + _, err := ParseScenario([]byte(` +name: bad-env +required_env: [DUCKGRES_SCENARIO_FROZEN_S3_URI, ""] +steps: + - id: query + type: fake + with: + sql: SELECT 1 +`)) + if err == nil { + t.Fatal("expected empty required_env entry to fail") + } + if !strings.Contains(err.Error(), "required_env[1] must be non-empty") { + t.Fatalf("error = %v, want required_env index", err) + } +} diff --git a/tests/scenario/core/templates.go b/tests/scenario/core/templates.go new file mode 100644 index 00000000..9dcdd409 --- /dev/null +++ b/tests/scenario/core/templates.go @@ -0,0 +1,29 @@ +package core + +import ( + "fmt" + "os" + "regexp" + "strings" +) + +var envTemplatePattern = regexp.MustCompile(`\$\{env:([A-Za-z_][A-Za-z0-9_]*)\}`) + +// ResolveEnvTemplates replaces ${env:NAME} placeholders without exposing values in errors. +func ResolveEnvTemplates(text string) (string, error) { + var missing []string + out := envTemplatePattern.ReplaceAllStringFunc(text, func(match string) string { + parts := envTemplatePattern.FindStringSubmatch(match) + key := parts[1] + value := os.Getenv(key) + if value == "" { + missing = append(missing, key) + return match + } + return value + }) + if len(missing) != 0 { + return "", fmt.Errorf("missing required env template value(s): %s", strings.Join(missing, ", ")) + } + return out, nil +} diff --git a/tests/scenario/runner_test.go b/tests/scenario/runner_test.go index 0e946027..012321f4 100644 --- a/tests/scenario/runner_test.go +++ b/tests/scenario/runner_test.go @@ -32,7 +32,7 @@ func TestScenarioRunner(t *testing.T) { t.Fatal("-scenario-file is required") } - loaded, err := core.LoadScenario(*scenarioFile) + loaded, _, err := loadScenarioForRun(*scenarioFile) if err != nil { t.Fatalf("load scenario: %v", err) } @@ -40,7 +40,13 @@ func TestScenarioRunner(t *testing.T) { if runID == "" { runID = defaultRunID(loaded) } - loaded = resolveRunTemplates(loaded, runID) + if missing := missingRequiredEnv(loaded); len(missing) != 0 { + t.Fatalf("missing required scenario environment: %s", strings.Join(missing, ", ")) + } + loaded, err = resolveRunTemplates(loaded, runID) + if err != nil { + t.Fatalf("resolve scenario templates: %v", err) + } provisionClient, err := provision.NewClient(provision.Config{ BaseURL: mustEnv(t, "DUCKGRES_SCENARIO_API_BASE"), @@ -91,7 +97,10 @@ func TestProvisionSmokeScenarioUsesRunUniqueSupportedSteps(t *testing.T) { if err != nil { t.Fatalf("load provision smoke: %v", err) } - resolved := resolveRunTemplates(scenario, "scenario-smoke-20260102t030405z") + resolved, err := resolveRunTemplates(scenario, "scenario-smoke-20260102t030405z") + if err != nil { + t.Fatalf("resolve templates: %v", err) + } for _, step := range resolved.Steps { if !dispatchSupports(step.Type) { t.Fatalf("step %s has unsupported type %q", step.ID, step.Type) @@ -115,6 +124,98 @@ func TestProvisionSmokeScenarioUsesRunUniqueSupportedSteps(t *testing.T) { } } +func TestFrozenMetadataScenarioRequiresDatasetURI(t *testing.T) { + t.Setenv("DUCKGRES_SCENARIO_FROZEN_S3_URI", "") + + scenario, err := core.LoadScenario(filepath.Join("scenarios", "posthog_frozen_metadata.yaml")) + if err != nil { + t.Fatalf("load frozen metadata scenario: %v", err) + } + missing := missingRequiredEnv(scenario) + if len(missing) != 1 || missing[0] != "DUCKGRES_SCENARIO_FROZEN_S3_URI" { + t.Fatalf("missing required env = %#v, want frozen S3 URI", missing) + } +} + +func TestFrozenMetadataScenarioResolvesEnvTemplates(t *testing.T) { + t.Setenv("DUCKGRES_SCENARIO_FROZEN_S3_URI", "s3://example-frozen/frozen_v1/") + + scenario, err := core.LoadScenario(filepath.Join("scenarios", "posthog_frozen_metadata.yaml")) + if err != nil { + t.Fatalf("load frozen metadata scenario: %v", err) + } + resolved, err := resolveRunTemplates(scenario, "scenario-frozen-20260102t030405z") + if err != nil { + t.Fatalf("resolve templates: %v", err) + } + for _, step := range resolved.Steps { + if !dispatchSupports(step.Type) { + t.Fatalf("step %s has unsupported type %q", step.ID, step.Type) + } + if containsTemplate(step.With) { + t.Fatalf("step %s still contains unresolved template values: %#v", step.ID, step.With) + } + } +} + +func TestLoadScenarioForRunResolvesScenarioRelativeFiles(t *testing.T) { + scenario, scenarioPath, err := loadScenarioForRun(filepath.Join("scenarios", "posthog_frozen_metadata.yaml")) + if err != nil { + t.Fatalf("loadScenarioForRun returned error: %v", err) + } + if !filepath.IsAbs(scenarioPath) { + t.Fatalf("scenarioPath = %q, want absolute path", scenarioPath) + } + + foundSetupFile := false + foundCatalogFile := false + for _, step := range scenario.Steps { + file, ok := step.With["file"].(string) + if !ok { + continue + } + if !filepath.IsAbs(file) { + t.Fatalf("step %s file path = %q, want absolute path", step.ID, file) + } + if _, err := os.Stat(file); err != nil { + t.Fatalf("step %s file path %q should exist: %v", step.ID, file, err) + } + switch step.ID { + case "setup_frozen_views": + foundSetupFile = strings.HasSuffix(file, filepath.Join("sql", "setup_frozen_views.sql")) + case "metadata_exploration": + foundCatalogFile = strings.HasSuffix(file, filepath.Join("sql", "metadata_catalog.yaml")) + } + } + if !foundSetupFile { + t.Fatal("expected setup_frozen_views file to resolve under sql/") + } + if !foundCatalogFile { + t.Fatal("expected metadata_exploration file to resolve under sql/") + } +} + +func TestResolveRunTemplatesRejectsMissingEnvTemplate(t *testing.T) { + t.Setenv("DUCKGRES_SCENARIO_FROZEN_S3_URI", "") + + _, err := resolveRunTemplates(core.Scenario{ + Name: "env-template", + Steps: []core.Step{{ + ID: "setup", + Type: scenariosql.StepTypeSQL, + With: map[string]any{ + "sql": "SELECT '${env:DUCKGRES_SCENARIO_FROZEN_S3_URI}'", + }, + }}, + }, "scenario-env-20260102t030405z") + if err == nil { + t.Fatal("expected missing env template to fail") + } + if !strings.Contains(err.Error(), "DUCKGRES_SCENARIO_FROZEN_S3_URI") { + t.Fatalf("error = %v, want env var name", err) + } +} + type dispatchExecutor struct { provision *provision.Executor sql *scenariosql.Executor @@ -172,7 +273,52 @@ func defaultRunID(s core.Scenario) string { return fmt.Sprintf("%s-%s", prefix, time.Now().UTC().Format("20060102t150405z")) } -func resolveRunTemplates(s core.Scenario, runID string) core.Scenario { +func loadScenarioForRun(path string) (core.Scenario, string, error) { + absPath, err := filepath.Abs(path) + if err != nil { + return core.Scenario{}, "", fmt.Errorf("resolve scenario path %s: %w", path, err) + } + scenario, err := core.LoadScenario(absPath) + if err != nil { + return core.Scenario{}, "", err + } + return resolveScenarioFilePaths(scenario, filepath.Dir(absPath)), absPath, nil +} + +func resolveScenarioFilePaths(s core.Scenario, baseDir string) core.Scenario { + out := s + out.Steps = make([]core.Step, len(s.Steps)) + for i, step := range s.Steps { + if step.With == nil { + out.Steps[i] = step + continue + } + with := make(map[string]any, len(step.With)) + for k, v := range step.With { + if k == "file" { + if file, ok := v.(string); ok && file != "" && !filepath.IsAbs(file) { + v = filepath.Clean(filepath.Join(baseDir, file)) + } + } + with[k] = v + } + step.With = with + out.Steps[i] = step + } + return out +} + +func missingRequiredEnv(s core.Scenario) []string { + var missing []string + for _, key := range s.RequiredEnv { + if os.Getenv(key) == "" { + missing = append(missing, key) + } + } + return missing +} + +func resolveRunTemplates(s core.Scenario, runID string) (core.Scenario, error) { vars := map[string]string{ "run_id": runID, "run_id_compact": compactRunID(runID), @@ -181,11 +327,15 @@ func resolveRunTemplates(s core.Scenario, runID string) core.Scenario { out.Steps = make([]core.Step, len(s.Steps)) for i, step := range s.Steps { if step.With != nil { - step.With = resolveTemplateValue(step.With, vars).(map[string]any) + resolved, err := resolveTemplateValue(step.With, vars) + if err != nil { + return core.Scenario{}, fmt.Errorf("step %s: %w", step.ID, err) + } + step.With = resolved.(map[string]any) } out.Steps[i] = step } - return out + return out, nil } func compactRunID(runID string) string { @@ -201,28 +351,40 @@ func compactRunID(runID string) string { return b.String() } -func resolveTemplateValue(value any, vars map[string]string) any { +func resolveTemplateValue(value any, vars map[string]string) (any, error) { switch typed := value.(type) { case map[string]any: out := make(map[string]any, len(typed)) for k, v := range typed { - out[k] = resolveTemplateValue(v, vars) + resolved, err := resolveTemplateValue(v, vars) + if err != nil { + return nil, err + } + out[k] = resolved } - return out + return out, nil case []any: out := make([]any, len(typed)) for i, v := range typed { - out[i] = resolveTemplateValue(v, vars) + resolved, err := resolveTemplateValue(v, vars) + if err != nil { + return nil, err + } + out[i] = resolved } - return out + return out, nil case string: out := typed for k, v := range vars { out = strings.ReplaceAll(out, "${"+k+"}", v) } - return out + out, err := core.ResolveEnvTemplates(out) + if err != nil { + return nil, err + } + return out, nil default: - return typed + return typed, nil } } diff --git a/tests/scenario/scenarios/posthog_frozen_metadata.yaml b/tests/scenario/scenarios/posthog_frozen_metadata.yaml new file mode 100644 index 00000000..8582b123 --- /dev/null +++ b/tests/scenario/scenarios/posthog_frozen_metadata.yaml @@ -0,0 +1,65 @@ +name: posthog-frozen-metadata +run_id_prefix: scenario-frozen-metadata +required_env: + - DUCKGRES_SCENARIO_FROZEN_S3_URI +steps: + - id: provision + type: provision_warehouse + with: + org_id: scenario-frozen-${run_id_compact} + request: + database_name: scenario_frozen_${run_id_compact} + metadata_store: + type: cnpg-shard + ducklake: + enabled: true + data_store: + type: s3bucket + + - id: wait_ready + type: wait_warehouse_ready + with: + org_id: scenario-frozen-${run_id_compact} + timeout: 15m + poll_interval: 10s + + - id: setup_frozen_views + type: sql + with: + org_id: scenario-frozen-${run_id_compact} + catalog: ducklake + file: ../sql/setup_frozen_views.sql + max_attempts: 12 + retry_interval: 10s + + - id: validate_dataset_manifest + type: sql + with: + org_id: scenario-frozen-${run_id_compact} + catalog: ducklake + sql: | + SELECT dataset_version + FROM main.dataset_manifest + WHERE dataset_version = 'posthog-file-views-v1' + min_rows: 1 + max_attempts: 3 + retry_interval: 5s + + - id: metadata_exploration + type: sql_catalog + with: + org_id: scenario-frozen-${run_id_compact} + catalog: ducklake + file: ../sql/metadata_catalog.yaml + max_attempts: 3 + retry_interval: 5s + + - id: deprovision + type: deprovision_warehouse + depends_on: [metadata_exploration] + always_run: true + with: + org_id: scenario-frozen-${run_id_compact} + verify_deleted: true + cleanup_timeout: 15m + poll_interval: 10s diff --git a/tests/scenario/sql/metadata_catalog.yaml b/tests/scenario/sql/metadata_catalog.yaml new file mode 100644 index 00000000..424df7ff --- /dev/null +++ b/tests/scenario/sql/metadata_catalog.yaml @@ -0,0 +1,48 @@ +name: posthog-frozen-metadata-v1 +description: Lightweight metadata and user-exploration queries over frozen persons/events parquet views. +queries: + - id: manifest_has_dataset + sql: | + SELECT dataset_version + FROM main.dataset_manifest + WHERE dataset_version = 'posthog-file-views-v1' + + - id: list_frozen_schemata + sql: | + SELECT schema_name + FROM information_schema.schemata + WHERE schema_name IN ('frozen_v1', 'main') + ORDER BY schema_name + + - id: list_frozen_views + sql: | + SELECT table_schema, table_name, table_type + FROM information_schema.tables + WHERE table_schema = 'frozen_v1' + ORDER BY table_name + + - id: describe_persons_columns + sql: | + SELECT column_name, data_type + FROM information_schema.columns + WHERE table_schema = 'frozen_v1' AND table_name = 'persons_file_view' + ORDER BY ordinal_position + + - id: describe_events_columns + sql: | + SELECT column_name, data_type + FROM information_schema.columns + WHERE table_schema = 'frozen_v1' AND table_name = 'events_file_view' + ORDER BY ordinal_position + + - id: persons_sample + sql: | + SELECT * + FROM frozen_v1.persons_file_view + LIMIT 5 + + - id: events_sample + sql: | + SELECT * + FROM frozen_v1.events_file_view + LIMIT 5 diff --git a/tests/scenario/sql/setup_frozen_views.sql b/tests/scenario/sql/setup_frozen_views.sql new file mode 100644 index 00000000..74d30cf4 --- /dev/null +++ b/tests/scenario/sql/setup_frozen_views.sql @@ -0,0 +1,21 @@ +CREATE SCHEMA IF NOT EXISTS frozen_v1; + +CREATE OR REPLACE VIEW frozen_v1.persons_file_view AS +SELECT * +FROM read_parquet('${env:DUCKGRES_SCENARIO_FROZEN_S3_URI}persons/*.parquet', union_by_name = true); + +CREATE OR REPLACE VIEW frozen_v1.events_file_view AS +SELECT * +FROM read_parquet('${env:DUCKGRES_SCENARIO_FROZEN_S3_URI}events/*.parquet', union_by_name = true); + +CREATE TABLE IF NOT EXISTS main.dataset_manifest ( + dataset_version VARCHAR, + dataset_uri VARCHAR, + created_at TIMESTAMPTZ +); + +DELETE FROM main.dataset_manifest +WHERE dataset_version = 'posthog-file-views-v1'; + +INSERT INTO main.dataset_manifest (dataset_version, dataset_uri, created_at) +VALUES ('posthog-file-views-v1', '${env:DUCKGRES_SCENARIO_FROZEN_S3_URI}', now()); diff --git a/tests/scenario/sql/steps.go b/tests/scenario/sql/steps.go index 3e9dfe00..5de40998 100644 --- a/tests/scenario/sql/steps.go +++ b/tests/scenario/sql/steps.go @@ -1,6 +1,7 @@ package sql import ( + "bytes" "context" "fmt" "os" @@ -11,6 +12,7 @@ import ( "github.com/posthog/duckgres/tests/scenario/core" "github.com/posthog/duckgres/tests/scenario/provision" + "gopkg.in/yaml.v3" ) const ( @@ -59,6 +61,18 @@ type querySpec struct { Catalog string } +type catalogFile struct { + Name string `yaml:"name"` + Description string `yaml:"description"` + Queries []catalogFileQuery `yaml:"queries"` +} + +type catalogFileQuery struct { + ID string `yaml:"id"` + SQL string `yaml:"sql"` + Catalog string `yaml:"catalog"` +} + func NewExecutor(cfg ExecutorConfig) *Executor { driver := cfg.Driver if driver == nil { @@ -148,6 +162,11 @@ func (e *Executor) executeQuery(ctx context.Context, step core.Step, spec queryS for attempts = 1; attempts <= retry.MaxAttempts; attempts++ { result, err := e.driver.Execute(ctx, req) if err == nil { + if minRows, ok, err := intFromWith(step, "min_rows"); err != nil { + return err + } else if ok && result.Rows < int64(minRows) { + return classified(ErrorClassSQL, fmt.Errorf("execute SQL step %s query %s returned %d rows, want at least %d", step.ID, spec.ID, result.Rows, minRows)) + } e.state.StoreResult(StepResult{ StepID: resultID, QueryID: spec.ID, @@ -224,9 +243,16 @@ func parseSQLStep(step core.Step) (querySpec, error) { } func parseCatalogStep(step core.Step) ([]querySpec, error) { + file := stringFromWith(step, "file", "") + if file != "" { + if _, ok := step.With["queries"]; ok { + return nil, invalidStep(step.ID, "with.file and with.queries are mutually exclusive") + } + return parseCatalogFile(step, file) + } raw, ok := step.With["queries"] if !ok { - return nil, invalidStep(step.ID, "with.queries is required") + return nil, invalidStep(step.ID, "with.queries or with.file is required") } items, ok := raw.([]any) if !ok || len(items) == 0 { @@ -252,9 +278,48 @@ func parseCatalogStep(step core.Step) ([]querySpec, error) { return specs, nil } +func parseCatalogFile(step core.Step, file string) ([]querySpec, error) { + raw, err := os.ReadFile(file) + if err != nil { + return nil, invalidStep(step.ID, "read catalog file %s: %v", file, err) + } + var catalog catalogFile + dec := yaml.NewDecoder(bytes.NewReader(raw)) + dec.KnownFields(true) + if err := dec.Decode(&catalog); err != nil { + return nil, invalidStep(step.ID, "parse catalog file %s: %v", file, err) + } + if len(catalog.Queries) == 0 { + return nil, invalidStep(step.ID, "catalog file %s must contain at least one query", file) + } + defaultCatalog := stringFromWith(step, "catalog", "ducklake") + specs := make([]querySpec, 0, len(catalog.Queries)) + for i, item := range catalog.Queries { + item.ID = strings.TrimSpace(item.ID) + item.SQL = strings.TrimSpace(item.SQL) + item.Catalog = strings.TrimSpace(item.Catalog) + if item.ID == "" { + return nil, invalidStep(step.ID, "catalog file %s queries[%d].id must be non-empty", file, i) + } + if item.SQL == "" { + return nil, invalidStep(step.ID, "catalog file %s queries[%d].sql must be non-empty", file, i) + } + catalogName := item.Catalog + if catalogName == "" { + catalogName = defaultCatalog + } + specs = append(specs, querySpec{ID: item.ID, SQL: item.SQL, Catalog: catalogName}) + } + return specs, nil +} + func sqlFromStep(step core.Step) (string, error) { if sqlText := stringFromWith(step, "sql", ""); strings.TrimSpace(sqlText) != "" { - return sqlText, nil + resolved, err := core.ResolveEnvTemplates(sqlText) + if err != nil { + return "", invalidStep(step.ID, "%v", err) + } + return resolved, nil } file := stringFromWith(step, "file", "") if file == "" { @@ -267,7 +332,11 @@ func sqlFromStep(step core.Step) (string, error) { if strings.TrimSpace(string(raw)) == "" { return "", invalidStep(step.ID, "SQL file %s is empty", file) } - return string(raw), nil + resolved, err := core.ResolveEnvTemplates(string(raw)) + if err != nil { + return "", invalidStep(step.ID, "%v", err) + } + return resolved, nil } func retryConfigForStep(step core.Step, base RetryConfig) (RetryConfig, error) { diff --git a/tests/scenario/sql/steps_test.go b/tests/scenario/sql/steps_test.go index a378a0ec..636f80ca 100644 --- a/tests/scenario/sql/steps_test.go +++ b/tests/scenario/sql/steps_test.go @@ -3,6 +3,8 @@ package sql import ( "context" "errors" + "os" + "path/filepath" "strings" "testing" "time" @@ -122,6 +124,50 @@ func TestExecutorDoesNotRetryNonTransientSQLErrors(t *testing.T) { } } +func TestExecutorFailsWhenSQLReturnsFewerThanMinRows(t *testing.T) { + provisionState := provision.NewState() + provisionState.StoreProvisionResponse("scenario-org", provision.ProvisionResponse{ + Org: "scenario-org", + Username: "root", + Password: "root-password", + }) + executor := NewExecutor(ExecutorConfig{ + ProvisionState: provisionState, + Connection: ConnectionConfig{ + HostAddr: "10.0.0.10", + SNISuffix: ".dev.example", + Port: 5432, + SSLMode: "require", + }, + Driver: &fakeDriver{ + executeFunc: func(context.Context, QueryRequest) (QueryResult, error) { + return QueryResult{Rows: 0}, nil + }, + }, + }) + + err := executor.ExecuteStep(context.Background(), core.Step{ + ID: "validate_manifest", + Type: StepTypeSQL, + With: map[string]any{ + "org_id": "scenario-org", + "catalog": "ducklake", + "sql": "SELECT 1 WHERE false", + "min_rows": 1, + }, + }) + if err == nil { + t.Fatal("expected min_rows assertion failure") + } + if !strings.Contains(err.Error(), "returned 0 rows, want at least 1") { + t.Fatalf("error = %v, want min_rows message", err) + } + var classified core.ClassifiedError + if !errors.As(err, &classified) || classified.ErrorClass() != ErrorClassSQL { + t.Fatalf("error = %T %v, want class %q", err, err, ErrorClassSQL) + } +} + func TestExecutorRunsInlineSQLCatalog(t *testing.T) { provisionState := provision.NewState() provisionState.StoreProvisionResponse("scenario-org", provision.ProvisionResponse{ @@ -173,6 +219,134 @@ func TestExecutorRunsInlineSQLCatalog(t *testing.T) { } } +func TestExecutorRunsSQLCatalogFile(t *testing.T) { + provisionState := provision.NewState() + provisionState.StoreProvisionResponse("scenario-org", provision.ProvisionResponse{ + Org: "scenario-org", + Username: "root", + Password: "root-password", + }) + catalogFile := filepath.Join(t.TempDir(), "metadata_catalog.yaml") + if err := os.WriteFile(catalogFile, []byte(` +name: metadata-smoke +queries: + - id: schemata + sql: SELECT schema_name FROM information_schema.schemata + - id: tables + catalog: ducklake + sql: SELECT table_name FROM information_schema.tables +`), 0o644); err != nil { + t.Fatalf("write catalog file: %v", err) + } + + var queryIDs []string + driver := &fakeDriver{ + executeFunc: func(_ context.Context, req QueryRequest) (QueryResult, error) { + queryIDs = append(queryIDs, req.QueryID) + return QueryResult{Rows: 1}, nil + }, + } + executor := NewExecutor(ExecutorConfig{ + ProvisionState: provisionState, + Connection: ConnectionConfig{ + HostAddr: "10.0.0.10", + SNISuffix: ".dev.example", + Port: 5432, + SSLMode: "require", + }, + Driver: driver, + }) + + err := executor.ExecuteStep(context.Background(), core.Step{ + ID: "metadata", + Type: StepTypeSQLCatalog, + With: map[string]any{ + "org_id": "scenario-org", + "file": catalogFile, + }, + }) + if err != nil { + t.Fatalf("ExecuteStep returned error: %v", err) + } + if got := strings.Join(queryIDs, ","); got != "schemata,tables" { + t.Fatalf("query IDs = %q, want schemata,tables", got) + } + if _, ok := executor.State().Result("metadata/schemata"); !ok { + t.Fatal("expected schemata result") + } + if _, ok := executor.State().Result("metadata/tables"); !ok { + t.Fatal("expected tables result") + } +} + +func TestRepositoryMetadataCatalogFileParses(t *testing.T) { + step := core.Step{ + ID: "metadata", + Type: StepTypeSQLCatalog, + With: map[string]any{ + "catalog": "ducklake", + "file": filepath.Join("metadata_catalog.yaml"), + }, + } + specs, err := parseCatalogStep(step) + if err != nil { + t.Fatalf("parseCatalogStep returned error: %v", err) + } + if len(specs) == 0 { + t.Fatal("expected repository metadata catalog to contain queries") + } +} + +func TestExecutorTemplatesSQLFileEnvVars(t *testing.T) { + t.Setenv("DUCKGRES_SCENARIO_FROZEN_S3_URI", "s3://example-frozen/frozen_v1/") + provisionState := provision.NewState() + provisionState.StoreProvisionResponse("scenario-org", provision.ProvisionResponse{ + Org: "scenario-org", + Username: "root", + Password: "root-password", + }) + sqlFile := filepath.Join(t.TempDir(), "setup.sql") + if err := os.WriteFile(sqlFile, []byte("SELECT '${env:DUCKGRES_SCENARIO_FROZEN_S3_URI}persons/*.parquet'"), 0o644); err != nil { + t.Fatalf("write SQL file: %v", err) + } + + var gotSQL string + driver := &fakeDriver{ + executeFunc: func(_ context.Context, req QueryRequest) (QueryResult, error) { + gotSQL = req.SQL + return QueryResult{Rows: 1}, nil + }, + } + executor := NewExecutor(ExecutorConfig{ + ProvisionState: provisionState, + Connection: ConnectionConfig{ + HostAddr: "10.0.0.10", + SNISuffix: ".dev.example", + Port: 5432, + SSLMode: "require", + }, + Driver: driver, + }) + + err := executor.ExecuteStep(context.Background(), core.Step{ + ID: "setup_frozen", + Type: StepTypeSQL, + With: map[string]any{ + "org_id": "scenario-org", + "file": sqlFile, + }, + }) + if err != nil { + t.Fatalf("ExecuteStep returned error: %v", err) + } + if !strings.Contains(gotSQL, "s3://example-frozen/frozen_v1/persons/*.parquet") { + t.Fatalf("SQL = %q, want templated frozen S3 URI", gotSQL) + } + if strings.Contains(gotSQL, "${env:") { + t.Fatalf("SQL still contains env template: %q", gotSQL) + } +} + func TestExecutorUsesProvisionCredentialsInDSN(t *testing.T) { provisionState := provision.NewState() provisionState.StoreProvisionResponse("scenario-org", provision.ProvisionResponse{