diff --git a/docs/runbooks/scenario-runner.md b/docs/runbooks/scenario-runner.md index 04d73356..57686cdf 100644 --- a/docs/runbooks/scenario-runner.md +++ b/docs/runbooks/scenario-runner.md @@ -4,7 +4,7 @@ The scenario runner executes end-to-end managed-warehouse flows against a configured dev environment. The first smoke scenario provisions a warehouse, waits for readiness, runs `SELECT 1` over PGWire with managed-hostname SNI, then deprovisions and verifies cleanup. -The frozen metadata and perf scenarios provision the same fresh dev warehouse shape, create read-only views over frozen persons/events parquet supplied by `DUCKGRES_SCENARIO_FROZEN_S3_URI`, run their workload, then deprovision. +The frozen metadata, perf, and dbt scenarios provision the same fresh dev warehouse shape, create read-only views over frozen persons/events parquet supplied by `DUCKGRES_SCENARIO_FROZEN_S3_URI`, run their workload, then deprovision. ## Required Environment @@ -26,6 +26,7 @@ export DUCKGRES_SCENARIO_OUTPUT_BASE="artifacts/scenario" export DUCKGRES_SCENARIO_RUN_ID="scenario-smoke-manual" export DUCKGRES_SCENARIO_PG_PORT="5432" export DUCKGRES_SCENARIO_PG_CONNECT_TIMEOUT="10" +export DUCKGRES_SCENARIO_DBT_BIN="dbt" export DUCKGRES_SCENARIO_MAX_RUNTIME="30m" export DUCKGRES_SCENARIO_GO_TEST_TIMEOUT="60m" ``` @@ -71,6 +72,12 @@ Run frozen perf queries: just scenario-frozen-perf ``` +Run frozen dbt lifecycle: + +```bash +just scenario-frozen-dbt +``` + Run a specific scenario file: ```bash @@ -92,6 +99,13 @@ The frozen perf scenario uses: Perf artifacts are written under `artifacts/scenario//perf/` using the existing `tests/perf/core` artifact schema, including `query_results.csv`, `summary.json`, and `server_metrics.prom`. +The frozen dbt scenario uses: + +- `tests/scenario/scenarios/posthog_frozen_dbt.yaml` +- `tests/scenario/dbt/posthog_frozen_project/` + +dbt artifacts are written under `artifacts/scenario//dbt/`, including per-command stdout/stderr logs, `target/` artifacts, and dbt logs. Install `dbt-postgres` locally or set `DUCKGRES_SCENARIO_DBT_BIN` to the dbt executable to use. + `DUCKGRES_SCENARIO_FROZEN_S3_URI` must point at a dev-owned frozen dataset prefix with `persons/` and `events/` parquet children. The provisioned Duckgres worker role also needs read/list access to that prefix; the runner process only supplies the URI, while the worker performs the S3 reads during `read_parquet`. diff --git a/justfile b/justfile index 63eddb55..82a246b2 100644 --- a/justfile +++ b/justfile @@ -384,6 +384,11 @@ scenario-frozen-metadata: scenario-frozen-perf: ./scripts/scenario_run.sh tests/scenario/scenarios/posthog_frozen_perf.yaml +# Run the dev frozen dataset dbt scenario +[group('test')] +scenario-frozen-dbt: + ./scripts/scenario_run.sh tests/scenario/scenarios/posthog_frozen_dbt.yaml + # Lint (matches CI — uses golangci-lint, not go vet) [group('test')] lint: diff --git a/scripts/scenario_run.sh b/scripts/scenario_run.sh index a3a62ae1..39181703 100755 --- a/scripts/scenario_run.sh +++ b/scripts/scenario_run.sh @@ -20,6 +20,7 @@ Optional environment: DUCKGRES_SCENARIO_PG_CONNECT_TIMEOUT DUCKGRES_SCENARIO_FLIGHT_ADDR (required by frozen perf scenarios) DUCKGRES_SCENARIO_FLIGHT_INSECURE_SKIP_VERIFY + DUCKGRES_SCENARIO_DBT_BIN DUCKGRES_SCENARIO_MAX_RUNTIME DUCKGRES_SCENARIO_GO_TEST_TIMEOUT DUCKGRES_SCENARIO_FROZEN_S3_URI (required by frozen dataset scenarios) diff --git a/tests/scenario/dbt/errors.go b/tests/scenario/dbt/errors.go new file mode 100644 index 00000000..c2f12f8b --- /dev/null +++ b/tests/scenario/dbt/errors.go @@ -0,0 +1,28 @@ +package dbt + +const ( + ErrorClassConfig = "dbt_config" + ErrorClassDBT = "dbt_execution" + ErrorClassUnsupportedStep = "unsupported_step" +) + +type classifiedError struct { + class string + err error +} + +func (e classifiedError) Error() string { + return e.err.Error() +} + +func (e classifiedError) Unwrap() error { + return e.err +} + +func (e classifiedError) ErrorClass() string { + return e.class +} + +func classified(class string, err error) error { + return classifiedError{class: class, err: err} +} diff --git a/tests/scenario/dbt/posthog_frozen_project/dbt_project.yml b/tests/scenario/dbt/posthog_frozen_project/dbt_project.yml new file mode 100644 index 00000000..9fd96002 --- /dev/null +++ b/tests/scenario/dbt/posthog_frozen_project/dbt_project.yml @@ -0,0 +1,13 @@ +name: posthog_frozen_scenario +version: "1.0.0" +profile: duckgres_scenario + +model-paths: ["models"] +test-paths: ["tests"] + +models: + posthog_frozen_scenario: + staging: + +materialized: view + marts: + +materialized: table diff --git a/tests/scenario/dbt/posthog_frozen_project/models/marts/daily_event_counts.sql b/tests/scenario/dbt/posthog_frozen_project/models/marts/daily_event_counts.sql new file mode 100644 index 00000000..18ae9585 --- /dev/null +++ b/tests/scenario/dbt/posthog_frozen_project/models/marts/daily_event_counts.sql @@ -0,0 +1,6 @@ +SELECT + date_trunc('day', event_timestamp) AS event_day, + event, + count(*) AS events +FROM {{ ref('stg_events') }} +GROUP BY 1, 2 diff --git a/tests/scenario/dbt/posthog_frozen_project/models/marts/person_event_rollup.sql b/tests/scenario/dbt/posthog_frozen_project/models/marts/person_event_rollup.sql new file mode 100644 index 00000000..419ba36d --- /dev/null +++ b/tests/scenario/dbt/posthog_frozen_project/models/marts/person_event_rollup.sql @@ -0,0 +1,8 @@ +SELECT + person_id, + count(*) AS events, + min(event_timestamp) AS first_event_timestamp, + max(event_timestamp) AS last_event_timestamp +FROM {{ ref('stg_events') }} +WHERE person_id IS NOT NULL +GROUP BY 1 diff --git a/tests/scenario/dbt/posthog_frozen_project/models/schema.yml b/tests/scenario/dbt/posthog_frozen_project/models/schema.yml new file mode 100644 index 00000000..2eecb610 --- /dev/null +++ b/tests/scenario/dbt/posthog_frozen_project/models/schema.yml @@ -0,0 +1,38 @@ +version: 2 + +models: + - name: stg_persons_daily + columns: + - name: person_day + tests: + - not_null + - name: persons + tests: + - not_null + + - name: stg_events + columns: + - name: event + tests: + - not_null + - name: event_timestamp + tests: + - not_null + + - name: daily_event_counts + columns: + - name: event_day + tests: + - not_null + - name: events + tests: + - not_null + + - name: person_event_rollup + columns: + - name: person_id + tests: + - not_null + - name: events + tests: + - not_null diff --git a/tests/scenario/dbt/posthog_frozen_project/models/sources.yml b/tests/scenario/dbt/posthog_frozen_project/models/sources.yml new file mode 100644 index 00000000..55a22057 --- /dev/null +++ b/tests/scenario/dbt/posthog_frozen_project/models/sources.yml @@ -0,0 +1,8 @@ +version: 2 + +sources: + - name: frozen_v1 + schema: frozen_v1 + tables: + - name: persons_file_view + - name: events_file_view diff --git a/tests/scenario/dbt/posthog_frozen_project/models/staging/stg_events.sql b/tests/scenario/dbt/posthog_frozen_project/models/staging/stg_events.sql new file mode 100644 index 00000000..7dda6c3c --- /dev/null +++ b/tests/scenario/dbt/posthog_frozen_project/models/staging/stg_events.sql @@ -0,0 +1,6 @@ +SELECT + event, + person_id, + "timestamp" AS event_timestamp +FROM {{ source('frozen_v1', 'events_file_view') }} +WHERE "timestamp" IS NOT NULL diff --git a/tests/scenario/dbt/posthog_frozen_project/models/staging/stg_persons_daily.sql b/tests/scenario/dbt/posthog_frozen_project/models/staging/stg_persons_daily.sql new file mode 100644 index 00000000..80a017b1 --- /dev/null +++ b/tests/scenario/dbt/posthog_frozen_project/models/staging/stg_persons_daily.sql @@ -0,0 +1,6 @@ +SELECT + date_trunc('day', _timestamp) AS person_day, + count(*) AS persons +FROM {{ source('frozen_v1', 'persons_file_view') }} +WHERE _timestamp IS NOT NULL +GROUP BY 1 diff --git a/tests/scenario/dbt/posthog_frozen_project/profiles.yml b/tests/scenario/dbt/posthog_frozen_project/profiles.yml new file mode 100644 index 00000000..5f23bfed --- /dev/null +++ b/tests/scenario/dbt/posthog_frozen_project/profiles.yml @@ -0,0 +1,14 @@ +duckgres_scenario: + target: dev + outputs: + dev: + type: postgres + host: "{{ env_var('DUCKGRES_DBT_HOST') }}" + port: "{{ env_var('DUCKGRES_DBT_PORT') | int }}" + user: "{{ env_var('DUCKGRES_DBT_USER') }}" + pass: "{{ env_var('DBT_ENV_SECRET_DUCKGRES_PASSWORD') }}" + dbname: "{{ env_var('DUCKGRES_DBT_DBNAME') }}" + schema: "{{ env_var('DUCKGRES_DBT_SCHEMA') }}" + sslmode: "{{ env_var('DUCKGRES_DBT_SSLMODE', 'require') }}" + connect_timeout: "{{ env_var('DUCKGRES_DBT_CONNECT_TIMEOUT', '10') | int }}" + threads: 1 diff --git a/tests/scenario/dbt/runner_test.go b/tests/scenario/dbt/runner_test.go new file mode 100644 index 00000000..cde4ef0f --- /dev/null +++ b/tests/scenario/dbt/runner_test.go @@ -0,0 +1,249 @@ +package dbt + +import ( + "context" + "errors" + "os" + "path/filepath" + "reflect" + "strings" + "testing" + + "github.com/posthog/duckgres/tests/scenario/core" + "github.com/posthog/duckgres/tests/scenario/provision" + scenariosql "github.com/posthog/duckgres/tests/scenario/sql" +) + +func TestExecutorRunsCommandsCapturesLogsAndCopiesTargetArtifacts(t *testing.T) { + projectDir := writeDBTProject(t) + provisionState := provision.NewState() + provisionState.StoreProvisionResponse("scenario-org", provision.ProvisionResponse{ + Org: "scenario-org", + Username: "root", + Password: "root-password", + }) + runner := &fakeCommandRunner{ + onRun: func(req CommandRequest) CommandResult { + if req.CommandName == "run" { + targetDir := argValue(req.Args, "--target-path") + if targetDir == "" { + t.Fatal("missing --target-path") + } + if err := os.MkdirAll(targetDir, 0o755); err != nil { + t.Fatalf("create target dir: %v", err) + } + if err := os.WriteFile(filepath.Join(targetDir, "run_results.json"), []byte(`{"metadata":{}}`), 0o644); err != nil { + t.Fatalf("write run_results: %v", err) + } + } + return CommandResult{ + Stdout: "connected with root-password", + Stderr: "warning root-password", + } + }, + } + executor := NewExecutor(ExecutorConfig{ + ProvisionState: provisionState, + Connection: scenariosql.ConnectionConfig{ + HostAddr: "10.0.0.10", + SNISuffix: ".dev.example", + Port: 5432, + SSLMode: "require", + ConnectTimeout: 10, + ApplicationName: "duckgres-scenario-runner", + }, + OutputDir: t.TempDir(), + CommandRunner: runner, + }) + + err := executor.ExecuteStep(context.Background(), core.Step{ + ID: "dbt_models", + Type: StepTypeDBTRun, + With: map[string]any{ + "org_id": "scenario-org", + "project_dir": projectDir, + "commands": []any{"debug", "run", "test", "docs_generate"}, + }, + }) + if err != nil { + t.Fatalf("ExecuteStep returned error: %v", err) + } + if got := runner.commandNames(); !reflect.DeepEqual(got, []string{"debug", "run", "test", "docs_generate"}) { + t.Fatalf("commands = %#v, want debug/run/test/docs_generate", got) + } + first := runner.requests[0] + if first.Dir != projectDir { + t.Fatalf("command dir = %q, want project dir", first.Dir) + } + if !containsAll(first.Args, "--project-dir", projectDir, "--profiles-dir", projectDir) { + t.Fatalf("dbt args = %#v, want project/profiles dirs", first.Args) + } + if targetPath := argValue(first.Args, "--target-path"); targetPath != filepath.Join(executor.OutputDir(), "dbt", "target") { + t.Fatalf("target path = %q", targetPath) + } + if logPath := argValue(first.Args, "--log-path"); logPath != filepath.Join(executor.OutputDir(), "dbt", "logs") { + t.Fatalf("log path = %q", logPath) + } + if envValue(first.Env, "DUCKGRES_DBT_HOST") != "scenario-org.dev.example" { + t.Fatalf("DUCKGRES_DBT_HOST = %q", envValue(first.Env, "DUCKGRES_DBT_HOST")) + } + if envValue(first.Env, "PGHOSTADDR") != "10.0.0.10" { + t.Fatalf("PGHOSTADDR = %q", envValue(first.Env, "PGHOSTADDR")) + } + if envValue(first.Env, "DBT_ENV_SECRET_DUCKGRES_PASSWORD") != "root-password" { + t.Fatal("expected command environment to include provision password as a dbt secret") + } + if envValue(first.Env, "DUCKGRES_DBT_PASSWORD") != "" { + t.Fatal("expected command environment to avoid non-secret dbt password variable") + } + + dbtDir := filepath.Join(executor.OutputDir(), "dbt") + for _, path := range []string{ + filepath.Join(dbtDir, "debug.stdout.log"), + filepath.Join(dbtDir, "run.stderr.log"), + filepath.Join(dbtDir, "target", "run_results.json"), + } { + if _, err := os.Stat(path); err != nil { + t.Fatalf("expected artifact %s: %v", path, err) + } + } + logBytes, err := os.ReadFile(filepath.Join(dbtDir, "debug.stdout.log")) + if err != nil { + t.Fatalf("read dbt stdout log: %v", err) + } + if strings.Contains(string(logBytes), "root-password") || !strings.Contains(string(logBytes), "[REDACTED]") { + t.Fatalf("stdout log was not redacted: %q", string(logBytes)) + } + result, ok := executor.State().Result("dbt_models") + if !ok { + t.Fatal("expected dbt result to be recorded") + } + if result.CommandsRun != 4 || result.OutputDir != dbtDir { + t.Fatalf("result = %+v", result) + } +} + +func TestExecutorStopsAfterFailedDBTCommand(t *testing.T) { + projectDir := writeDBTProject(t) + provisionState := provision.NewState() + provisionState.StoreProvisionResponse("scenario-org", provision.ProvisionResponse{ + Org: "scenario-org", + Username: "root", + Password: "root-password", + }) + runner := &fakeCommandRunner{ + onRun: func(req CommandRequest) CommandResult { + if req.CommandName == "run" { + return CommandResult{ExitCode: 2, Stderr: "database error"} + } + return CommandResult{} + }, + } + executor := NewExecutor(ExecutorConfig{ + ProvisionState: provisionState, + Connection: scenariosql.ConnectionConfig{ + HostAddr: "10.0.0.10", + SNISuffix: ".dev.example", + SSLMode: "require", + }, + OutputDir: t.TempDir(), + CommandRunner: runner, + }) + + err := executor.ExecuteStep(context.Background(), core.Step{ + ID: "dbt_models", + Type: StepTypeDBTRun, + With: map[string]any{ + "org_id": "scenario-org", + "project_dir": projectDir, + "commands": []any{"debug", "run", "test"}, + }, + }) + if err == nil { + t.Fatal("expected dbt run failure") + } + if !strings.Contains(err.Error(), "dbt command run failed with exit code 2") { + t.Fatalf("error = %v, want failed command", err) + } + var classified core.ClassifiedError + if !errors.As(err, &classified) || classified.ErrorClass() != ErrorClassDBT { + t.Fatalf("error = %T %v, want class %q", err, err, ErrorClassDBT) + } + if got := runner.commandNames(); !reflect.DeepEqual(got, []string{"debug", "run"}) { + t.Fatalf("commands = %#v, want debug/run only", got) + } +} + +func writeDBTProject(t *testing.T) string { + t.Helper() + projectDir := t.TempDir() + if err := os.MkdirAll(filepath.Join(projectDir, "models"), 0o755); err != nil { + t.Fatalf("create models dir: %v", err) + } + for path, body := range map[string]string{ + "dbt_project.yml": "name: scenario_dbt\nversion: '1.0'\nprofile: duckgres_scenario\nmodel-paths: [models]\n", + "profiles.yml": "duckgres_scenario:\n target: dev\n outputs:\n dev:\n type: postgres\n host: \"{{ env_var('DUCKGRES_DBT_HOST') }}\"\n", + "models/model.sql": "SELECT 1 AS id\n", + } { + if err := os.WriteFile(filepath.Join(projectDir, path), []byte(body), 0o644); err != nil { + t.Fatalf("write %s: %v", path, err) + } + } + return projectDir +} + +type fakeCommandRunner struct { + requests []CommandRequest + onRun func(CommandRequest) CommandResult +} + +func (r *fakeCommandRunner) Run(_ context.Context, req CommandRequest) CommandResult { + r.requests = append(r.requests, req) + if r.onRun != nil { + return r.onRun(req) + } + return CommandResult{} +} + +func (r *fakeCommandRunner) commandNames() []string { + names := make([]string, 0, len(r.requests)) + for _, req := range r.requests { + names = append(names, req.CommandName) + } + return names +} + +func containsAll(values []string, wants ...string) bool { + for _, want := range wants { + found := false + for _, value := range values { + if value == want { + found = true + break + } + } + if !found { + return false + } + } + return true +} + +func envValue(env []string, key string) string { + prefix := key + "=" + for _, item := range env { + if strings.HasPrefix(item, prefix) { + return strings.TrimPrefix(item, prefix) + } + } + return "" +} + +func argValue(args []string, key string) string { + for i := 0; i < len(args)-1; i++ { + if args[i] == key { + return args[i+1] + } + } + return "" +} diff --git a/tests/scenario/dbt/steps.go b/tests/scenario/dbt/steps.go new file mode 100644 index 00000000..efb7859f --- /dev/null +++ b/tests/scenario/dbt/steps.go @@ -0,0 +1,380 @@ +package dbt + +import ( + "bytes" + "context" + "fmt" + "os" + "os/exec" + "path/filepath" + "strconv" + "strings" + "sync" + + "github.com/posthog/duckgres/tests/scenario/core" + "github.com/posthog/duckgres/tests/scenario/provision" + scenariosql "github.com/posthog/duckgres/tests/scenario/sql" +) + +const StepTypeDBTRun = "dbt_run" + +type CommandRunner interface { + Run(context.Context, CommandRequest) CommandResult +} + +type CommandRequest struct { + Binary string + CommandName string + Args []string + Env []string + Dir string +} + +type CommandResult struct { + ExitCode int + Stdout string + Stderr string + Err error +} + +type ExecutorConfig struct { + ProvisionState *provision.State + Connection scenariosql.ConnectionConfig + OutputDir string + DBTBinary string + CommandRunner CommandRunner + State *State +} + +type Executor struct { + provisionState *provision.State + connection scenariosql.ConnectionConfig + outputDir string + dbtBinary string + commandRunner CommandRunner + state *State +} + +type State struct { + mu sync.Mutex + results map[string]StepResult +} + +type StepResult struct { + StepID string + OutputDir string + CommandsRun int +} + +type stepSpec struct { + OrgID string + Username string + Password string + ProjectDir string + ProfilesDir string + OutputSubdir string + DBTBinary string + Database string + Schema string + SSLMode string + ConnectTimeout int + Commands []commandSpec +} + +type commandSpec struct { + Name string + Args []string +} + +type defaultCommandRunner struct{} + +func NewExecutor(cfg ExecutorConfig) *Executor { + runner := cfg.CommandRunner + if runner == nil { + runner = defaultCommandRunner{} + } + state := cfg.State + if state == nil { + state = NewState() + } + dbtBinary := cfg.DBTBinary + if dbtBinary == "" { + dbtBinary = "dbt" + } + return &Executor{ + provisionState: cfg.ProvisionState, + connection: cfg.Connection, + outputDir: cfg.OutputDir, + dbtBinary: dbtBinary, + commandRunner: runner, + state: state, + } +} + +func NewState() *State { + return &State{results: make(map[string]StepResult)} +} + +func (e *Executor) State() *State { + return e.state +} + +func (e *Executor) OutputDir() string { + return e.outputDir +} + +func (s *State) StoreResult(result StepResult) { + s.mu.Lock() + defer s.mu.Unlock() + s.results[result.StepID] = result +} + +func (s *State) Result(stepID string) (StepResult, bool) { + s.mu.Lock() + defer s.mu.Unlock() + result, ok := s.results[stepID] + return result, ok +} + +func (e *Executor) ExecuteStep(ctx context.Context, step core.Step) error { + if step.Type != StepTypeDBTRun { + return classified(ErrorClassUnsupportedStep, fmt.Errorf("unsupported dbt step type %q", step.Type)) + } + spec, err := e.parseStep(step) + if err != nil { + return err + } + dbtDir := filepath.Join(e.outputDir, spec.OutputSubdir) + if err := os.MkdirAll(dbtDir, 0o755); err != nil { + return classified(ErrorClassConfig, fmt.Errorf("create dbt artifact dir: %w", err)) + } + + commandsRun := 0 + for _, command := range spec.Commands { + req := CommandRequest{ + Binary: spec.DBTBinary, + CommandName: command.Name, + Args: append(append([]string{}, command.Args...), + "--project-dir", spec.ProjectDir, + "--profiles-dir", spec.ProfilesDir, + "--target-path", filepath.Join(dbtDir, "target"), + "--log-path", filepath.Join(dbtDir, "logs"), + ), + Env: e.commandEnv(spec), + Dir: spec.ProjectDir, + } + result := e.commandRunner.Run(ctx, req) + commandsRun++ + if err := writeCommandLogs(dbtDir, command.Name, result, spec.Password); err != nil { + return err + } + if result.Err != nil { + e.state.StoreResult(StepResult{StepID: step.ID, OutputDir: dbtDir, CommandsRun: commandsRun}) + return classified(ErrorClassDBT, fmt.Errorf("dbt command %s failed: %w", command.Name, result.Err)) + } + if result.ExitCode != 0 { + e.state.StoreResult(StepResult{StepID: step.ID, OutputDir: dbtDir, CommandsRun: commandsRun}) + return classified(ErrorClassDBT, fmt.Errorf("dbt command %s failed with exit code %d", command.Name, result.ExitCode)) + } + } + e.state.StoreResult(StepResult{StepID: step.ID, OutputDir: dbtDir, CommandsRun: commandsRun}) + return nil +} + +func (e *Executor) parseStep(step core.Step) (stepSpec, error) { + orgID, err := requiredString(step, "org_id") + if err != nil { + return stepSpec{}, err + } + projectDir, err := requiredString(step, "project_dir") + if err != nil { + return stepSpec{}, err + } + if e.outputDir == "" { + return stepSpec{}, classified(ErrorClassConfig, fmt.Errorf("dbt output dir is required")) + } + + username := stringFromWith(step, "username", "root") + password := stringFromWith(step, "password", "") + if password == "" { + if e.provisionState == nil { + return stepSpec{}, classified(ErrorClassConfig, fmt.Errorf("provision state is required when with.password is omitted")) + } + resp, ok := e.provisionState.ProvisionResponse(orgID) + if !ok { + return stepSpec{}, classified(ErrorClassConfig, fmt.Errorf("no provision response found for org %q", orgID)) + } + if resp.Username != "" { + username = resp.Username + } + password = resp.Password + } + + commands, err := commandsFromStep(step) + if err != nil { + return stepSpec{}, err + } + return stepSpec{ + OrgID: orgID, + Username: username, + Password: password, + ProjectDir: projectDir, + ProfilesDir: stringFromWith(step, "profiles_dir", projectDir), + OutputSubdir: stringFromWith(step, "output_subdir", "dbt"), + DBTBinary: stringFromWith(step, "dbt_bin", e.dbtBinary), + Database: stringFromWith(step, "catalog", "ducklake"), + Schema: stringFromWith(step, "schema", "dbt_models"), + SSLMode: stringFromWith(step, "sslmode", "require"), + ConnectTimeout: intFromWith(step, "connect_timeout", e.connection.ConnectTimeout), + Commands: commands, + }, nil +} + +func (e *Executor) commandEnv(spec stepSpec) []string { + port := e.connection.Port + if port == 0 { + port = 5432 + } + connectTimeout := spec.ConnectTimeout + if connectTimeout == 0 { + connectTimeout = 10 + } + return []string{ + "DUCKGRES_DBT_HOST=" + spec.OrgID + e.connection.SNISuffix, + "DUCKGRES_DBT_HOSTADDR=" + e.connection.HostAddr, + "DUCKGRES_DBT_PORT=" + strconv.Itoa(port), + "DUCKGRES_DBT_USER=" + spec.Username, + "DBT_ENV_SECRET_DUCKGRES_PASSWORD=" + spec.Password, + "DUCKGRES_DBT_DBNAME=" + spec.Database, + "DUCKGRES_DBT_SCHEMA=" + spec.Schema, + "DUCKGRES_DBT_SSLMODE=" + spec.SSLMode, + "DUCKGRES_DBT_CONNECT_TIMEOUT=" + strconv.Itoa(connectTimeout), + "PGHOSTADDR=" + e.connection.HostAddr, + } +} + +func commandsFromStep(step core.Step) ([]commandSpec, error) { + raw, ok := step.With["commands"] + if !ok { + return []commandSpec{ + {Name: "debug", Args: []string{"debug"}}, + {Name: "run", Args: []string{"run"}}, + {Name: "test", Args: []string{"test"}}, + {Name: "docs_generate", Args: []string{"docs", "generate"}}, + }, nil + } + items, ok := raw.([]any) + if !ok || len(items) == 0 { + return nil, classified(ErrorClassConfig, fmt.Errorf("step %s with.commands must be a non-empty list", step.ID)) + } + commands := make([]commandSpec, 0, len(items)) + for i, item := range items { + name, ok := item.(string) + if !ok || strings.TrimSpace(name) == "" { + return nil, classified(ErrorClassConfig, fmt.Errorf("step %s with.commands[%d] must be a non-empty string", step.ID, i)) + } + command, err := parseCommandName(strings.TrimSpace(name)) + if err != nil { + return nil, classified(ErrorClassConfig, fmt.Errorf("step %s with.commands[%d]: %w", step.ID, i, err)) + } + commands = append(commands, command) + } + return commands, nil +} + +func parseCommandName(name string) (commandSpec, error) { + switch name { + case "debug": + return commandSpec{Name: name, Args: []string{"debug"}}, nil + case "run": + return commandSpec{Name: name, Args: []string{"run"}}, nil + case "test": + return commandSpec{Name: name, Args: []string{"test"}}, nil + case "docs_generate", "docs generate": + return commandSpec{Name: "docs_generate", Args: []string{"docs", "generate"}}, nil + default: + return commandSpec{}, fmt.Errorf("unsupported dbt command %q", name) + } +} + +func writeCommandLogs(dir, commandName string, result CommandResult, secret string) error { + for suffix, body := range map[string]string{ + "stdout": result.Stdout, + "stderr": result.Stderr, + } { + path := filepath.Join(dir, commandName+"."+suffix+".log") + redacted := redactSecret(body, secret) + if err := os.WriteFile(path, []byte(redacted), 0o644); err != nil { + return classified(ErrorClassConfig, fmt.Errorf("write dbt %s log: %w", suffix, err)) + } + } + return nil +} + +func redactSecret(text, secret string) string { + if secret == "" { + return text + } + return strings.ReplaceAll(text, secret, "[REDACTED]") +} + +func (defaultCommandRunner) Run(ctx context.Context, req CommandRequest) CommandResult { + cmd := exec.CommandContext(ctx, req.Binary, req.Args...) + cmd.Dir = req.Dir + cmd.Env = append(os.Environ(), req.Env...) + var stdout bytes.Buffer + var stderr bytes.Buffer + cmd.Stdout = &stdout + cmd.Stderr = &stderr + err := cmd.Run() + result := CommandResult{ + Stdout: stdout.String(), + Stderr: stderr.String(), + } + if err == nil { + return result + } + result.Err = err + if exitErr, ok := err.(*exec.ExitError); ok { + result.ExitCode = exitErr.ExitCode() + result.Err = nil + } + return result +} + +func requiredString(step core.Step, key string) (string, error) { + value, ok := step.With[key].(string) + if !ok || value == "" { + return "", classified(ErrorClassConfig, fmt.Errorf("step %s with.%s must be a non-empty string", step.ID, key)) + } + return value, nil +} + +func stringFromWith(step core.Step, key, fallback string) string { + value, ok := step.With[key].(string) + if !ok || value == "" { + return fallback + } + return value +} + +func intFromWith(step core.Step, key string, fallback int) int { + raw, ok := step.With[key] + if !ok { + return fallback + } + switch value := raw.(type) { + case int: + return value + case int64: + return int(value) + case float64: + return int(value) + case string: + parsed, err := strconv.Atoi(value) + if err == nil { + return parsed + } + } + return fallback +} diff --git a/tests/scenario/runner_test.go b/tests/scenario/runner_test.go index 96ba0de8..02ae205b 100644 --- a/tests/scenario/runner_test.go +++ b/tests/scenario/runner_test.go @@ -12,6 +12,7 @@ import ( "time" "github.com/posthog/duckgres/tests/scenario/core" + scenariodbt "github.com/posthog/duckgres/tests/scenario/dbt" scenarioperf "github.com/posthog/duckgres/tests/scenario/perf" "github.com/posthog/duckgres/tests/scenario/provision" scenariosql "github.com/posthog/duckgres/tests/scenario/sql" @@ -92,13 +93,26 @@ func TestScenarioRunner(t *testing.T) { FlightAddr: os.Getenv("DUCKGRES_SCENARIO_FLIGHT_ADDR"), FlightInsecureSkipVerify: boolEnv(t, "DUCKGRES_SCENARIO_FLIGHT_INSECURE_SKIP_VERIFY", true), }) + dbtExecutor := scenariodbt.NewExecutor(scenariodbt.ExecutorConfig{ + ProvisionState: provisionState, + Connection: scenariosql.ConnectionConfig{ + HostAddr: mustEnv(t, "DUCKGRES_SCENARIO_PG_HOST"), + SNISuffix: mustEnv(t, "DUCKGRES_SCENARIO_SNI_SUFFIX"), + Port: intEnv(t, "DUCKGRES_SCENARIO_PG_PORT", 5432), + SSLMode: "require", + ConnectTimeout: intEnv(t, "DUCKGRES_SCENARIO_PG_CONNECT_TIMEOUT", 10), + ApplicationName: "duckgres-scenario-runner", + }, + OutputDir: scenarioOutputDir, + DBTBinary: envOrDefault("DUCKGRES_SCENARIO_DBT_BIN", "dbt"), + }) ctx, cancel := context.WithTimeout(context.Background(), *scenarioMaxRuntime) defer cancel() runner := core.NewRunner(core.RunnerConfig{ RunID: runID, Scenario: loaded, - Executor: dispatchExecutor{provision: provisionExecutor, sql: sqlExecutor, perf: perfExecutor}, + Executor: dispatchExecutor{provision: provisionExecutor, sql: sqlExecutor, perf: perfExecutor, dbt: dbtExecutor}, OutputDir: scenarioOutputDir, WriteFiles: true, CleanupTimeout: 15 * time.Minute, @@ -255,6 +269,43 @@ func TestFrozenPerfScenarioUsesSupportedStepsAndRelativeCatalog(t *testing.T) { } } +func TestFrozenDBTScenarioUsesSupportedStepsAndRelativeProject(t *testing.T) { + t.Setenv("DUCKGRES_SCENARIO_FROZEN_S3_URI", "s3://example-frozen/frozen_v1/") + + scenario, _, err := loadScenarioForRun(filepath.Join("scenarios", "posthog_frozen_dbt.yaml")) + if err != nil { + t.Fatalf("load frozen dbt scenario: %v", err) + } + resolved, err := resolveRunTemplates(scenario, "scenario-frozen-dbt-20260102t030405z") + if err != nil { + t.Fatalf("resolve templates: %v", err) + } + + foundDBT := false + for _, step := range resolved.Steps { + if !dispatchSupports(step.Type) { + t.Fatalf("step %s has unsupported type %q", step.ID, step.Type) + } + if containsTemplate(step.With) { + t.Fatalf("step %s still contains unresolved template values: %#v", step.ID, step.With) + } + if step.Type != scenariodbt.StepTypeDBTRun { + continue + } + foundDBT = true + projectDir, ok := step.With["project_dir"].(string) + if !ok || !filepath.IsAbs(projectDir) { + t.Fatalf("dbt project_dir = %#v, want absolute path", step.With["project_dir"]) + } + if _, err := os.Stat(filepath.Join(projectDir, "dbt_project.yml")); err != nil { + t.Fatalf("dbt project should exist at %q: %v", projectDir, err) + } + } + if !foundDBT { + t.Fatal("expected frozen dbt scenario to include a dbt_run step") + } +} + func TestResolveRunTemplatesRejectsMissingEnvTemplate(t *testing.T) { t.Setenv("DUCKGRES_SCENARIO_FROZEN_S3_URI", "") @@ -280,6 +331,7 @@ type dispatchExecutor struct { provision *provision.Executor sql *scenariosql.Executor perf *scenarioperf.Executor + dbt *scenariodbt.Executor } func (e dispatchExecutor) ExecuteStep(ctx context.Context, step core.Step) error { @@ -290,6 +342,8 @@ func (e dispatchExecutor) ExecuteStep(ctx context.Context, step core.Step) error return e.sql.ExecuteStep(ctx, step) case scenarioperf.StepTypePerfQueries: return e.perf.ExecuteStep(ctx, step) + case scenariodbt.StepTypeDBTRun: + return e.dbt.ExecuteStep(ctx, step) default: return fmt.Errorf("unsupported scenario step type %q", step.Type) } @@ -303,6 +357,8 @@ func dispatchSupports(stepType string) bool { return true case scenarioperf.StepTypePerfQueries: return true + case scenariodbt.StepTypeDBTRun: + return true default: return false } @@ -343,6 +399,14 @@ func boolEnv(t *testing.T, key string, fallback bool) bool { return parsed } +func envOrDefault(key, fallback string) string { + value := os.Getenv(key) + if value == "" { + return fallback + } + return value +} + func defaultRunID(s core.Scenario) string { prefix := s.RunIDPrefix if prefix == "" { @@ -373,7 +437,7 @@ func resolveScenarioFilePaths(s core.Scenario, baseDir string) core.Scenario { } with := make(map[string]any, len(step.With)) for k, v := range step.With { - if k == "file" || k == "catalog_file" { + if k == "file" || k == "catalog_file" || k == "project_dir" || k == "profiles_dir" { if file, ok := v.(string); ok && file != "" && !filepath.IsAbs(file) { v = filepath.Clean(filepath.Join(baseDir, file)) } diff --git a/tests/scenario/scenarios/posthog_frozen_dbt.yaml b/tests/scenario/scenarios/posthog_frozen_dbt.yaml new file mode 100644 index 00000000..61816343 --- /dev/null +++ b/tests/scenario/scenarios/posthog_frozen_dbt.yaml @@ -0,0 +1,52 @@ +name: posthog-frozen-dbt +run_id_prefix: scenario-frozen-dbt +required_env: + - DUCKGRES_SCENARIO_FROZEN_S3_URI +steps: + - id: provision + type: provision_warehouse + with: + org_id: scenario-frozen-dbt-${run_id_compact} + request: + database_name: scenario_frozen_dbt_${run_id_compact} + metadata_store: + type: cnpg-shard + ducklake: + enabled: true + data_store: + type: s3bucket + + - id: wait_ready + type: wait_warehouse_ready + with: + org_id: scenario-frozen-dbt-${run_id_compact} + timeout: 15m + poll_interval: 10s + + - id: setup_frozen_views + type: sql + with: + org_id: scenario-frozen-dbt-${run_id_compact} + catalog: ducklake + file: ../sql/setup_frozen_views.sql + max_attempts: 12 + retry_interval: 10s + + - id: dbt_models + type: dbt_run + with: + org_id: scenario-frozen-dbt-${run_id_compact} + catalog: ducklake + schema: dbt_frozen + project_dir: ../dbt/posthog_frozen_project + commands: [debug, run, test, docs_generate] + + - id: deprovision + type: deprovision_warehouse + depends_on: [dbt_models] + always_run: true + with: + org_id: scenario-frozen-dbt-${run_id_compact} + verify_deleted: true + cleanup_timeout: 15m + poll_interval: 10s