diff --git a/docs/runbooks/scenario-runner.md b/docs/runbooks/scenario-runner.md index 42705f14..04d73356 100644 --- a/docs/runbooks/scenario-runner.md +++ b/docs/runbooks/scenario-runner.md @@ -4,7 +4,7 @@ The scenario runner executes end-to-end managed-warehouse flows against a configured dev environment. The first smoke scenario provisions a warehouse, waits for readiness, runs `SELECT 1` over PGWire with managed-hostname SNI, then deprovisions and verifies cleanup. -The frozen metadata scenario provisions the same fresh dev warehouse shape, creates read-only views over frozen persons/events parquet supplied by `DUCKGRES_SCENARIO_FROZEN_S3_URI`, validates the dataset manifest row, runs lightweight metadata/user-exploration queries, then deprovisions. +The frozen metadata and perf scenarios provision the same fresh dev warehouse shape, create read-only views over frozen persons/events parquet supplied by `DUCKGRES_SCENARIO_FROZEN_S3_URI`, run their workload, then deprovision. ## Required Environment @@ -36,6 +36,13 @@ Frozen dataset scenarios additionally require: export DUCKGRES_SCENARIO_FROZEN_S3_URI="s3:///frozen_v1/" ``` +Frozen perf scenarios additionally require the Flight SQL address because the perf catalog exercises both PGWire and Flight: + +```bash +export DUCKGRES_SCENARIO_FLIGHT_ADDR="" +export DUCKGRES_SCENARIO_FLIGHT_INSECURE_SKIP_VERIFY="true" +``` + Do not commit concrete dev endpoints, secrets, org IDs, or private bucket names. ## Run @@ -58,6 +65,12 @@ Run frozen metadata exploration: just scenario-frozen-metadata ``` +Run frozen perf queries: + +```bash +just scenario-frozen-perf +``` + Run a specific scenario file: ```bash @@ -72,6 +85,13 @@ The frozen metadata scenario uses: - `tests/scenario/sql/setup_frozen_views.sql` - `tests/scenario/sql/metadata_catalog.yaml` +The frozen perf scenario uses: + +- `tests/scenario/scenarios/posthog_frozen_perf.yaml` +- `tests/perf/queries/ducklake_frozen.yaml` + +Perf artifacts are written under `artifacts/scenario//perf/` using the existing `tests/perf/core` artifact schema, including `query_results.csv`, `summary.json`, and `server_metrics.prom`. + `DUCKGRES_SCENARIO_FROZEN_S3_URI` must point at a dev-owned frozen dataset prefix with `persons/` and `events/` parquet children. The provisioned Duckgres worker role also needs read/list access to that prefix; the runner process only supplies the URI, while the worker performs the S3 reads during `read_parquet`. diff --git a/justfile b/justfile index b139c19e..63eddb55 100644 --- a/justfile +++ b/justfile @@ -379,6 +379,11 @@ scenario-smoke: scenario-frozen-metadata: ./scripts/scenario_run.sh tests/scenario/scenarios/posthog_frozen_metadata.yaml +# Run the dev frozen dataset perf scenario +[group('test')] +scenario-frozen-perf: + ./scripts/scenario_run.sh tests/scenario/scenarios/posthog_frozen_perf.yaml + # Lint (matches CI — uses golangci-lint, not go vet) [group('test')] lint: diff --git a/scripts/scenario_run.sh b/scripts/scenario_run.sh index 608cd5ba..a3a62ae1 100755 --- a/scripts/scenario_run.sh +++ b/scripts/scenario_run.sh @@ -18,6 +18,8 @@ Optional environment: DUCKGRES_SCENARIO_RUN_ID DUCKGRES_SCENARIO_PG_PORT DUCKGRES_SCENARIO_PG_CONNECT_TIMEOUT + DUCKGRES_SCENARIO_FLIGHT_ADDR (required by frozen perf scenarios) + DUCKGRES_SCENARIO_FLIGHT_INSECURE_SKIP_VERIFY DUCKGRES_SCENARIO_MAX_RUNTIME DUCKGRES_SCENARIO_GO_TEST_TIMEOUT DUCKGRES_SCENARIO_FROZEN_S3_URI (required by frozen dataset scenarios) @@ -75,12 +77,36 @@ root_relative_path() { esac } +scenario_file="$(root_relative_path "$scenario_file")" +output_base="$(root_relative_path "$output_base")" + +scenario_required_env() { + awk ' + /^[^[:space:]]/ { in_required = 0 } + /^required_env:[[:space:]]*$/ { in_required = 1; next } + in_required && /^[[:space:]]*-[[:space:]]*/ { + value = $0 + sub(/^[[:space:]]*-[[:space:]]*/, "", value) + gsub(/^["'\'']|["'\'']$/, "", value) + if (value != "") print value + } + ' "$1" +} + required=( DUCKGRES_SCENARIO_API_BASE DUCKGRES_SCENARIO_INTERNAL_SECRET DUCKGRES_SCENARIO_PG_HOST DUCKGRES_SCENARIO_SNI_SUFFIX ) +if [ -f "$scenario_file" ]; then + while IFS= read -r name; do + required+=("$name") + done < <(scenario_required_env "$scenario_file") +elif [ "$check_env_only" -eq 1 ]; then + echo "Scenario file not found: $scenario_file" >&2 + exit 2 +fi missing=() for name in "${required[@]}"; do if [ -z "${!name:-}" ]; then @@ -101,9 +127,6 @@ if [ "$check_env_only" -eq 1 ]; then exit 0 fi -scenario_file="$(root_relative_path "$scenario_file")" -output_base="$(root_relative_path "$output_base")" - args=( go test -count=1 ./tests/scenario -timeout "$go_test_timeout" diff --git a/tests/perf/drivers/flight/driver.go b/tests/perf/drivers/flight/driver.go index 0e535d1e..b1446802 100644 --- a/tests/perf/drivers/flight/driver.go +++ b/tests/perf/drivers/flight/driver.go @@ -24,17 +24,33 @@ type Driver struct { exec Executor } +type ConnectionConfig struct { + Addr string + ServerName string + Username string + Password string + InsecureSkipVerify bool +} + func NewWithExecutor(exec Executor) *Driver { return &Driver{exec: exec} } func NewFromAddress(addr, username, password string, insecureSkipVerify bool) (*Driver, error) { - tlsCfg := &tls.Config{InsecureSkipVerify: insecureSkipVerify} // test/perf env only + return NewFromConfig(ConnectionConfig{ + Addr: addr, + Username: username, + Password: password, + InsecureSkipVerify: insecureSkipVerify, + }) +} + +func NewFromConfig(cfg ConnectionConfig) (*Driver, error) { client, err := flightsql.NewClient( - addr, + cfg.Addr, nil, nil, - grpc.WithTransportCredentials(credentials.NewTLS(tlsCfg)), + grpc.WithTransportCredentials(credentials.NewTLS(tlsConfigForConnection(cfg))), grpc.WithDefaultCallOptions( grpc.MaxCallRecvMsgSize(flightclient.MaxGRPCMessageSize), grpc.MaxCallSendMsgSize(flightclient.MaxGRPCMessageSize), @@ -44,7 +60,7 @@ func NewFromAddress(addr, username, password string, insecureSkipVerify bool) (* return nil, fmt.Errorf("create Flight SQL client: %w", err) } - token, err := bootstrapSessionToken(client, username, password) + token, err := bootstrapSessionToken(client, cfg.Username, cfg.Password) if err != nil { _ = client.Close() return nil, err @@ -58,6 +74,14 @@ func NewFromAddress(addr, username, password string, insecureSkipVerify bool) (* }, nil } +func tlsConfigForConnection(cfg ConnectionConfig) *tls.Config { + tlsCfg := &tls.Config{InsecureSkipVerify: cfg.InsecureSkipVerify} // test/perf env only + if cfg.ServerName != "" { + tlsCfg.ServerName = cfg.ServerName + } + return tlsCfg +} + func (d *Driver) Protocol() core.Protocol { return core.ProtocolFlight } diff --git a/tests/perf/drivers/flight/driver_test.go b/tests/perf/drivers/flight/driver_test.go index 366aaa80..1500f975 100644 --- a/tests/perf/drivers/flight/driver_test.go +++ b/tests/perf/drivers/flight/driver_test.go @@ -34,3 +34,16 @@ func TestDriverUsesDuckhogVariant(t *testing.T) { t.Fatalf("expected duckhog SQL, got %q", exec.lastQuery) } } + +func TestTLSConfigUsesExplicitServerName(t *testing.T) { + cfg := tlsConfigForConnection(ConnectionConfig{ + ServerName: "scenario-org.dev.example", + InsecureSkipVerify: true, + }) + if cfg.ServerName != "scenario-org.dev.example" { + t.Fatalf("server name = %q, want explicit managed hostname", cfg.ServerName) + } + if !cfg.InsecureSkipVerify { + t.Fatal("expected insecure skip verify to pass through") + } +} diff --git a/tests/scenario/perf/adapter_test.go b/tests/scenario/perf/adapter_test.go new file mode 100644 index 00000000..943e87e7 --- /dev/null +++ b/tests/scenario/perf/adapter_test.go @@ -0,0 +1,288 @@ +package perf + +import ( + "context" + "errors" + "os" + "path/filepath" + "strings" + "testing" + "time" + + perfcore "github.com/posthog/duckgres/tests/perf/core" + "github.com/posthog/duckgres/tests/scenario/core" + "github.com/posthog/duckgres/tests/scenario/provision" + scenariosql "github.com/posthog/duckgres/tests/scenario/sql" +) + +func TestExecutorRunsPerfStepAndWritesArtifacts(t *testing.T) { + catalogPath := writePerfCatalog(t, []perfcore.Protocol{perfcore.ProtocolPGWire, perfcore.ProtocolFlight}) + provisionState := provision.NewState() + provisionState.StoreProvisionResponse("scenario-org", provision.ProvisionResponse{ + Org: "scenario-org", + Username: "root", + Password: "root-password", + }) + factory := &fakeDriverFactory{} + executor := NewExecutor(ExecutorConfig{ + ProvisionState: provisionState, + Connection: scenariosql.ConnectionConfig{ + HostAddr: "10.0.0.10", + SNISuffix: ".dev.example", + Port: 5432, + SSLMode: "require", + ConnectTimeout: 10, + ApplicationName: "duckgres-scenario-runner", + }, + OutputDir: t.TempDir(), + FlightAddr: "flight.dev.example:443", + DriverFactory: factory, + Now: func() time.Time { + return time.Unix(1700000000, 0) + }, + }) + + err := executor.ExecuteStep(context.Background(), core.Step{ + ID: "perf_queries", + Type: StepTypePerfQueries, + With: map[string]any{ + "org_id": "scenario-org", + "catalog_file": catalogPath, + "run_id": "scenario-run-1", + "dataset_version": "posthog-file-views-v1", + }, + }) + if err != nil { + t.Fatalf("ExecuteStep returned error: %v", err) + } + + result, ok := executor.State().Result("perf_queries") + if !ok { + t.Fatal("expected perf result to be recorded") + } + if result.Summary.RunID != "scenario-run-1" || result.Summary.TotalQueries != 2 || result.Summary.TotalErrors != 0 { + t.Fatalf("summary = %+v", result.Summary) + } + if got := factory.pgwireDSN; !strings.Contains(got, "host=scenario-org.dev.example") || !strings.Contains(got, "password=root-password") { + t.Fatalf("pgwire dsn = %q, want scenario org host and provision password", got) + } + if factory.flightAddr != "flight.dev.example:443" || factory.flightUsername != "root" || factory.flightPassword != "root-password" { + t.Fatalf("flight config = %q/%q/%q", factory.flightAddr, factory.flightUsername, factory.flightPassword) + } + if factory.flightServerName != "scenario-org.dev.example" { + t.Fatalf("flight server name = %q, want managed scenario hostname", factory.flightServerName) + } + if factory.flightInsecureSkipVerify { + t.Fatal("flight insecure skip verify should come from executor config when step does not override it") + } + + perfDir := filepath.Join(executor.OutputDir(), "perf") + for _, name := range []string{"summary.json", "query_results.csv", "server_metrics.prom"} { + if _, err := os.Stat(filepath.Join(perfDir, name)); err != nil { + t.Fatalf("expected perf artifact %s: %v", name, err) + } + } + csvBytes, err := os.ReadFile(filepath.Join(perfDir, "query_results.csv")) + if err != nil { + t.Fatalf("read query_results.csv: %v", err) + } + csvText := string(csvBytes) + if !strings.Contains(csvText, "query_id,intent_id,measure_iteration,protocol,status,error,error_class,rows,duration_ms,started_at") { + t.Fatalf("query_results.csv header changed: %q", csvText) + } + if !strings.Contains(csvText, "\nq1,i1,1,pgwire,ok,") || !strings.Contains(csvText, "\nq1,i1,1,flight,ok,") { + t.Fatalf("query_results.csv missing measured pgwire/flight rows: %q", csvText) + } +} + +func TestExecutorFailsPerfStepWhenMeasuredQueryErrors(t *testing.T) { + catalogPath := writePerfCatalog(t, []perfcore.Protocol{perfcore.ProtocolPGWire}) + provisionState := provision.NewState() + provisionState.StoreProvisionResponse("scenario-org", provision.ProvisionResponse{ + Org: "scenario-org", + Username: "root", + Password: "root-password", + }) + executor := NewExecutor(ExecutorConfig{ + ProvisionState: provisionState, + Connection: scenariosql.ConnectionConfig{ + HostAddr: "10.0.0.10", + SNISuffix: ".dev.example", + SSLMode: "require", + }, + OutputDir: t.TempDir(), + DriverFactory: &fakeDriverFactory{ + pgwireErr: errors.New("query failed"), + }, + }) + + err := executor.ExecuteStep(context.Background(), core.Step{ + ID: "perf_queries", + Type: StepTypePerfQueries, + With: map[string]any{ + "org_id": "scenario-org", + "catalog_file": catalogPath, + "run_id": "scenario-run-1", + }, + }) + if err == nil { + t.Fatal("expected perf query error to fail the scenario step") + } + if !strings.Contains(err.Error(), "recorded 1 query error") { + t.Fatalf("error = %v, want query error count", err) + } + var classified core.ClassifiedError + if !errors.As(err, &classified) || classified.ErrorClass() != ErrorClassPerf { + t.Fatalf("error = %T %v, want class %q", err, err, ErrorClassPerf) + } + if _, err := os.Stat(filepath.Join(executor.OutputDir(), "perf", "query_results.csv")); err != nil { + t.Fatalf("expected perf artifacts to be closed before failure: %v", err) + } +} + +func TestExecutorRequiresFlightAddrWhenCatalogTargetsFlight(t *testing.T) { + catalogPath := writePerfCatalog(t, []perfcore.Protocol{perfcore.ProtocolFlight}) + provisionState := provision.NewState() + provisionState.StoreProvisionResponse("scenario-org", provision.ProvisionResponse{ + Org: "scenario-org", + Username: "root", + Password: "root-password", + }) + executor := NewExecutor(ExecutorConfig{ + ProvisionState: provisionState, + Connection: scenariosql.ConnectionConfig{ + HostAddr: "10.0.0.10", + SNISuffix: ".dev.example", + SSLMode: "require", + }, + OutputDir: t.TempDir(), + DriverFactory: &fakeDriverFactory{}, + }) + + err := executor.ExecuteStep(context.Background(), core.Step{ + ID: "perf_queries", + Type: StepTypePerfQueries, + With: map[string]any{ + "org_id": "scenario-org", + "catalog_file": catalogPath, + "run_id": "scenario-run-1", + }, + }) + if err == nil || !strings.Contains(err.Error(), "flight_addr") { + t.Fatalf("error = %v, want missing flight_addr", err) + } +} + +func TestExecutorClosesCreatedDriversWhenLaterDriverCreationFails(t *testing.T) { + catalogPath := writePerfCatalog(t, []perfcore.Protocol{perfcore.ProtocolPGWire, perfcore.ProtocolFlight}) + provisionState := provision.NewState() + provisionState.StoreProvisionResponse("scenario-org", provision.ProvisionResponse{ + Org: "scenario-org", + Username: "root", + Password: "root-password", + }) + factory := &fakeDriverFactory{flightErr: errors.New("flight unavailable")} + executor := NewExecutor(ExecutorConfig{ + ProvisionState: provisionState, + Connection: scenariosql.ConnectionConfig{ + HostAddr: "10.0.0.10", + SNISuffix: ".dev.example", + SSLMode: "require", + }, + OutputDir: t.TempDir(), + FlightAddr: "flight.dev.example:443", + DriverFactory: factory, + }) + + err := executor.ExecuteStep(context.Background(), core.Step{ + ID: "perf_queries", + Type: StepTypePerfQueries, + With: map[string]any{ + "org_id": "scenario-org", + "catalog_file": catalogPath, + "run_id": "scenario-run-1", + }, + }) + if err == nil || !strings.Contains(err.Error(), "create flight perf driver") { + t.Fatalf("error = %v, want flight driver creation failure", err) + } + if factory.pgwireDriver == nil || !factory.pgwireDriver.closed { + t.Fatalf("expected pgwire driver to be closed after flight creation failure") + } +} + +func writePerfCatalog(t *testing.T, targets []perfcore.Protocol) string { + t.Helper() + var targetLines strings.Builder + for _, target := range targets { + targetLines.WriteString(" - ") + targetLines.WriteString(string(target)) + targetLines.WriteByte('\n') + } + path := filepath.Join(t.TempDir(), "perf_catalog.yaml") + body := "name: scenario-perf\n" + + "description: perf adapter test\n" + + "seed: 42\n" + + "dataset_scale: 1\n" + + "targets:\n" + targetLines.String() + + "warmup_iterations: 1\n" + + "measure_iterations: 1\n" + + "queries:\n" + + " - query_id: q1\n" + + " intent_id: i1\n" + + " tags: [test]\n" + + " params: {}\n" + + " pgwire_sql: SELECT 1\n" + + " duckhog_sql: SELECT 1\n" + if err := os.WriteFile(path, []byte(body), 0o644); err != nil { + t.Fatalf("write perf catalog: %v", err) + } + return path +} + +type fakeDriverFactory struct { + pgwireDSN string + pgwireErr error + pgwireDriver *fakeProtocolDriver + flightAddr string + flightServerName string + flightUsername string + flightPassword string + flightInsecureSkipVerify bool + flightErr error +} + +func (f *fakeDriverFactory) NewPGWire(dsn string) (perfcore.ProtocolDriver, error) { + f.pgwireDSN = dsn + f.pgwireDriver = &fakeProtocolDriver{protocol: perfcore.ProtocolPGWire, err: f.pgwireErr} + return f.pgwireDriver, nil +} + +func (f *fakeDriverFactory) NewFlight(addr, serverName, username, password string, insecureSkipVerify bool) (perfcore.ProtocolDriver, error) { + f.flightAddr = addr + f.flightServerName = serverName + f.flightUsername = username + f.flightPassword = password + f.flightInsecureSkipVerify = insecureSkipVerify + if f.flightErr != nil { + return nil, f.flightErr + } + return &fakeProtocolDriver{protocol: perfcore.ProtocolFlight}, nil +} + +type fakeProtocolDriver struct { + protocol perfcore.Protocol + err error + closed bool +} + +func (d *fakeProtocolDriver) Protocol() perfcore.Protocol { return d.protocol } + +func (d *fakeProtocolDriver) Execute(context.Context, perfcore.Query, []any) (perfcore.ExecutionResult, error) { + return perfcore.ExecutionResult{Rows: 1, Duration: time.Millisecond}, d.err +} + +func (d *fakeProtocolDriver) Close() error { + d.closed = true + return nil +} diff --git a/tests/scenario/perf/errors.go b/tests/scenario/perf/errors.go new file mode 100644 index 00000000..244ff218 --- /dev/null +++ b/tests/scenario/perf/errors.go @@ -0,0 +1,28 @@ +package perf + +const ( + ErrorClassConfig = "perf_config" + ErrorClassPerf = "perf_execution" + ErrorClassUnsupportedStep = "unsupported_step" +) + +type classifiedError struct { + class string + err error +} + +func (e classifiedError) Error() string { + return e.err.Error() +} + +func (e classifiedError) Unwrap() error { + return e.err +} + +func (e classifiedError) ErrorClass() string { + return e.class +} + +func classified(class string, err error) error { + return classifiedError{class: class, err: err} +} diff --git a/tests/scenario/perf/steps.go b/tests/scenario/perf/steps.go new file mode 100644 index 00000000..b663cbd8 --- /dev/null +++ b/tests/scenario/perf/steps.go @@ -0,0 +1,364 @@ +package perf + +import ( + "context" + "fmt" + "path/filepath" + "strconv" + "sync" + "time" + + perfcore "github.com/posthog/duckgres/tests/perf/core" + flightdriver "github.com/posthog/duckgres/tests/perf/drivers/flight" + pgdriver "github.com/posthog/duckgres/tests/perf/drivers/pgwire" + "github.com/posthog/duckgres/tests/scenario/core" + "github.com/posthog/duckgres/tests/scenario/provision" + scenariosql "github.com/posthog/duckgres/tests/scenario/sql" +) + +const StepTypePerfQueries = "perf_queries" + +type DriverFactory interface { + NewPGWire(dsn string) (perfcore.ProtocolDriver, error) + NewFlight(addr, serverName, username, password string, insecureSkipVerify bool) (perfcore.ProtocolDriver, error) +} + +type ExecutorConfig struct { + ProvisionState *provision.State + Connection scenariosql.ConnectionConfig + OutputDir string + FlightAddr string + FlightInsecureSkipVerify bool + DriverFactory DriverFactory + State *State + Now func() time.Time +} + +type Executor struct { + provisionState *provision.State + connection scenariosql.ConnectionConfig + outputDir string + flightAddr string + flightInsecureSkipVerify bool + driverFactory DriverFactory + state *State + now func() time.Time +} + +type State struct { + mu sync.Mutex + results map[string]StepResult +} + +type StepResult struct { + StepID string + OutputDir string + Summary perfcore.RunSummary +} + +type stepSpec struct { + OrgID string + Username string + Password string + CatalogFile string + RunID string + DatasetVersion string + Database string + OutputSubdir string + ReadOnly bool + FlightAddr string + FlightServerName string + FlightInsecureSkipVerify bool +} + +type defaultDriverFactory struct{} + +func NewExecutor(cfg ExecutorConfig) *Executor { + factory := cfg.DriverFactory + if factory == nil { + factory = defaultDriverFactory{} + } + state := cfg.State + if state == nil { + state = NewState() + } + now := cfg.Now + if now == nil { + now = time.Now + } + return &Executor{ + provisionState: cfg.ProvisionState, + connection: cfg.Connection, + outputDir: cfg.OutputDir, + flightAddr: cfg.FlightAddr, + flightInsecureSkipVerify: cfg.FlightInsecureSkipVerify, + driverFactory: factory, + state: state, + now: now, + } +} + +func NewState() *State { + return &State{results: make(map[string]StepResult)} +} + +func (e *Executor) State() *State { + return e.state +} + +func (e *Executor) OutputDir() string { + return e.outputDir +} + +func (s *State) StoreResult(result StepResult) { + s.mu.Lock() + defer s.mu.Unlock() + s.results[result.StepID] = result +} + +func (s *State) Result(stepID string) (StepResult, bool) { + s.mu.Lock() + defer s.mu.Unlock() + result, ok := s.results[stepID] + return result, ok +} + +func (e *Executor) ExecuteStep(ctx context.Context, step core.Step) error { + if step.Type != StepTypePerfQueries { + return classified(ErrorClassUnsupportedStep, fmt.Errorf("unsupported perf step type %q", step.Type)) + } + spec, err := e.parseStep(step) + if err != nil { + return err + } + catalog, err := perfcore.LoadCatalog(spec.CatalogFile) + if err != nil { + return classified(ErrorClassConfig, err) + } + if spec.ReadOnly { + if err := perfcore.ValidateReadOnlyCatalog(catalog); err != nil { + return classified(ErrorClassConfig, fmt.Errorf("read-only perf catalog validation failed: %w", err)) + } + } + + drivers, err := e.driversForCatalog(catalog, spec) + if err != nil { + return err + } + defer closeDrivers(drivers) + + perfDir := filepath.Join(e.outputDir, spec.OutputSubdir) + sink, err := perfcore.NewArtifactSink(perfDir) + if err != nil { + return classified(ErrorClassConfig, err) + } + sinkClosed := false + closeSink := func(summary perfcore.RunSummary, metrics string) error { + if sinkClosed { + return nil + } + sinkClosed = true + return sink.Close(summary, metrics) + } + runner := perfcore.NewQueryRunner(perfcore.RunnerConfig{ + RunID: spec.RunID, + Catalog: catalog, + DatasetVersion: spec.DatasetVersion, + Drivers: drivers, + Sink: closingSink{sink: sink, closeFunc: closeSink}, + Now: e.now, + }) + summary, err := runner.Run(ctx) + if err != nil { + _ = closeSink(summary, "") + return classified(ErrorClassPerf, err) + } + result := StepResult{ + StepID: step.ID, + OutputDir: perfDir, + Summary: summary, + } + e.state.StoreResult(result) + if summary.TotalErrors > 0 { + return classified(ErrorClassPerf, fmt.Errorf("perf step %s recorded %d query error(s)", step.ID, summary.TotalErrors)) + } + return nil +} + +type closingSink struct { + sink perfcore.ResultSink + closeFunc func(perfcore.RunSummary, string) error +} + +func (s closingSink) Record(result perfcore.QueryResult) error { + return s.sink.Record(result) +} + +func (s closingSink) Close(summary perfcore.RunSummary, serverMetrics string) error { + return s.closeFunc(summary, serverMetrics) +} + +func (e *Executor) parseStep(step core.Step) (stepSpec, error) { + orgID, err := requiredString(step, "org_id") + if err != nil { + return stepSpec{}, err + } + catalogFile, err := requiredString(step, "catalog_file") + if err != nil { + return stepSpec{}, err + } + runID, err := requiredString(step, "run_id") + if err != nil { + return stepSpec{}, err + } + if e.outputDir == "" { + return stepSpec{}, classified(ErrorClassConfig, fmt.Errorf("perf output dir is required")) + } + + username := stringFromWith(step, "username", "root") + password := stringFromWith(step, "password", "") + if password == "" { + if e.provisionState == nil { + return stepSpec{}, classified(ErrorClassConfig, fmt.Errorf("provision state is required when with.password is omitted")) + } + resp, ok := e.provisionState.ProvisionResponse(orgID) + if !ok { + return stepSpec{}, classified(ErrorClassConfig, fmt.Errorf("no provision response found for org %q", orgID)) + } + if resp.Username != "" { + username = resp.Username + } + password = resp.Password + } + + return stepSpec{ + OrgID: orgID, + Username: username, + Password: password, + CatalogFile: catalogFile, + RunID: runID, + DatasetVersion: stringFromWith(step, "dataset_version", ""), + Database: stringFromWith(step, "catalog", "ducklake"), + OutputSubdir: stringFromWith(step, "output_subdir", "perf"), + ReadOnly: boolFromWith(step, "read_only", true), + FlightAddr: stringFromWith(step, "flight_addr", e.flightAddr), + FlightServerName: stringFromWith(step, "flight_server_name", e.defaultFlightServerName(orgID)), + FlightInsecureSkipVerify: boolFromWith(step, "flight_insecure_skip_verify", e.flightInsecureSkipVerify), + }, nil +} + +func (e *Executor) driversForCatalog(catalog perfcore.Catalog, spec stepSpec) (map[perfcore.Protocol]perfcore.ProtocolDriver, error) { + drivers := make(map[perfcore.Protocol]perfcore.ProtocolDriver, len(catalog.Targets)) + var success bool + defer func() { + if !success { + closeDrivers(drivers) + } + }() + for _, target := range catalog.Targets { + if _, ok := drivers[target]; ok { + continue + } + switch target { + case perfcore.ProtocolPGWire: + dsn, err := e.pgwireDSN(spec) + if err != nil { + return nil, err + } + driver, err := e.driverFactory.NewPGWire(dsn) + if err != nil { + return nil, classified(ErrorClassConfig, fmt.Errorf("create pgwire perf driver: %w", err)) + } + drivers[target] = driver + case perfcore.ProtocolFlight: + if spec.FlightAddr == "" { + return nil, classified(ErrorClassConfig, fmt.Errorf("with.flight_addr or DUCKGRES_SCENARIO_FLIGHT_ADDR is required when perf catalog targets flight")) + } + if spec.FlightServerName == "" { + return nil, classified(ErrorClassConfig, fmt.Errorf("flight server name requires with.flight_server_name or a scenario SNI suffix")) + } + driver, err := e.driverFactory.NewFlight(spec.FlightAddr, spec.FlightServerName, spec.Username, spec.Password, spec.FlightInsecureSkipVerify) + if err != nil { + return nil, classified(ErrorClassConfig, fmt.Errorf("create flight perf driver: %w", err)) + } + drivers[target] = driver + default: + return nil, classified(ErrorClassConfig, fmt.Errorf("unsupported perf target protocol %q", target)) + } + } + success = true + return drivers, nil +} + +func (e *Executor) defaultFlightServerName(orgID string) string { + if e.connection.SNISuffix == "" { + return "" + } + return orgID + e.connection.SNISuffix +} + +func (e *Executor) pgwireDSN(spec stepSpec) (string, error) { + cfg := e.connection + cfg.OrgID = spec.OrgID + cfg.Database = spec.Database + cfg.Username = spec.Username + cfg.Password = spec.Password + dsn, err := cfg.DSN() + if err != nil { + return "", classified(ErrorClassConfig, err) + } + return dsn, nil +} + +func closeDrivers(drivers map[perfcore.Protocol]perfcore.ProtocolDriver) { + for _, driver := range drivers { + _ = driver.Close() + } +} + +func (defaultDriverFactory) NewPGWire(dsn string) (perfcore.ProtocolDriver, error) { + return pgdriver.NewFromDSN(dsn) +} + +func (defaultDriverFactory) NewFlight(addr, serverName, username, password string, insecureSkipVerify bool) (perfcore.ProtocolDriver, error) { + return flightdriver.NewFromConfig(flightdriver.ConnectionConfig{ + Addr: addr, + ServerName: serverName, + Username: username, + Password: password, + InsecureSkipVerify: insecureSkipVerify, + }) +} + +func requiredString(step core.Step, key string) (string, error) { + value, ok := step.With[key].(string) + if !ok || value == "" { + return "", classified(ErrorClassConfig, fmt.Errorf("step %s with.%s must be a non-empty string", step.ID, key)) + } + return value, nil +} + +func stringFromWith(step core.Step, key, fallback string) string { + value, ok := step.With[key].(string) + if !ok || value == "" { + return fallback + } + return value +} + +func boolFromWith(step core.Step, key string, fallback bool) bool { + raw, ok := step.With[key] + if !ok { + return fallback + } + switch value := raw.(type) { + case bool: + return value + case string: + parsed, err := strconv.ParseBool(value) + if err == nil { + return parsed + } + } + return fallback +} diff --git a/tests/scenario/runner_test.go b/tests/scenario/runner_test.go index 012321f4..96ba0de8 100644 --- a/tests/scenario/runner_test.go +++ b/tests/scenario/runner_test.go @@ -12,6 +12,7 @@ import ( "time" "github.com/posthog/duckgres/tests/scenario/core" + scenarioperf "github.com/posthog/duckgres/tests/scenario/perf" "github.com/posthog/duckgres/tests/scenario/provision" scenariosql "github.com/posthog/duckgres/tests/scenario/sql" ) @@ -76,14 +77,29 @@ func TestScenarioRunner(t *testing.T) { ApplicationName: "duckgres-scenario-runner", }, }) + scenarioOutputDir := filepath.Join(*scenarioOutputBase, runID) + perfExecutor := scenarioperf.NewExecutor(scenarioperf.ExecutorConfig{ + ProvisionState: provisionState, + Connection: scenariosql.ConnectionConfig{ + HostAddr: mustEnv(t, "DUCKGRES_SCENARIO_PG_HOST"), + SNISuffix: mustEnv(t, "DUCKGRES_SCENARIO_SNI_SUFFIX"), + Port: intEnv(t, "DUCKGRES_SCENARIO_PG_PORT", 5432), + SSLMode: "require", + ConnectTimeout: intEnv(t, "DUCKGRES_SCENARIO_PG_CONNECT_TIMEOUT", 10), + ApplicationName: "duckgres-scenario-runner", + }, + OutputDir: scenarioOutputDir, + FlightAddr: os.Getenv("DUCKGRES_SCENARIO_FLIGHT_ADDR"), + FlightInsecureSkipVerify: boolEnv(t, "DUCKGRES_SCENARIO_FLIGHT_INSECURE_SKIP_VERIFY", true), + }) ctx, cancel := context.WithTimeout(context.Background(), *scenarioMaxRuntime) defer cancel() runner := core.NewRunner(core.RunnerConfig{ RunID: runID, Scenario: loaded, - Executor: dispatchExecutor{provision: provisionExecutor, sql: sqlExecutor}, - OutputDir: filepath.Join(*scenarioOutputBase, runID), + Executor: dispatchExecutor{provision: provisionExecutor, sql: sqlExecutor, perf: perfExecutor}, + OutputDir: scenarioOutputDir, WriteFiles: true, CleanupTimeout: 15 * time.Minute, }) @@ -195,6 +211,50 @@ func TestLoadScenarioForRunResolvesScenarioRelativeFiles(t *testing.T) { } } +func TestFrozenPerfScenarioUsesSupportedStepsAndRelativeCatalog(t *testing.T) { + t.Setenv("DUCKGRES_SCENARIO_FROZEN_S3_URI", "s3://example-frozen/frozen_v1/") + t.Setenv("DUCKGRES_SCENARIO_FLIGHT_ADDR", "flight.dev.example:443") + + scenario, _, err := loadScenarioForRun(filepath.Join("scenarios", "posthog_frozen_perf.yaml")) + if err != nil { + t.Fatalf("load frozen perf scenario: %v", err) + } + resolved, err := resolveRunTemplates(scenario, "scenario-frozen-perf-20260102t030405z") + if err != nil { + t.Fatalf("resolve templates: %v", err) + } + + foundPerf := false + for _, step := range resolved.Steps { + if !dispatchSupports(step.Type) { + t.Fatalf("step %s has unsupported type %q", step.ID, step.Type) + } + if containsTemplate(step.With) { + t.Fatalf("step %s still contains unresolved template values: %#v", step.ID, step.With) + } + if step.Type != scenarioperf.StepTypePerfQueries { + continue + } + foundPerf = true + catalogFile, ok := step.With["catalog_file"].(string) + if !ok || !filepath.IsAbs(catalogFile) { + t.Fatalf("perf catalog_file = %#v, want absolute path", step.With["catalog_file"]) + } + if _, err := os.Stat(catalogFile); err != nil { + t.Fatalf("perf catalog file %q should exist: %v", catalogFile, err) + } + if runID, _ := step.With["run_id"].(string); runID != "scenario-frozen-perf-20260102t030405z" { + t.Fatalf("perf run_id = %q, want scenario run id", runID) + } + if _, ok := step.With["flight_insecure_skip_verify"]; ok { + t.Fatal("perf scenario should use DUCKGRES_SCENARIO_FLIGHT_INSECURE_SKIP_VERIFY default instead of hardcoding TLS behavior") + } + } + if !foundPerf { + t.Fatal("expected frozen perf scenario to include a perf_queries step") + } +} + func TestResolveRunTemplatesRejectsMissingEnvTemplate(t *testing.T) { t.Setenv("DUCKGRES_SCENARIO_FROZEN_S3_URI", "") @@ -219,6 +279,7 @@ func TestResolveRunTemplatesRejectsMissingEnvTemplate(t *testing.T) { type dispatchExecutor struct { provision *provision.Executor sql *scenariosql.Executor + perf *scenarioperf.Executor } func (e dispatchExecutor) ExecuteStep(ctx context.Context, step core.Step) error { @@ -227,6 +288,8 @@ func (e dispatchExecutor) ExecuteStep(ctx context.Context, step core.Step) error return e.provision.ExecuteStep(ctx, step) case scenariosql.StepTypeSQL, scenariosql.StepTypeSQLCatalog: return e.sql.ExecuteStep(ctx, step) + case scenarioperf.StepTypePerfQueries: + return e.perf.ExecuteStep(ctx, step) default: return fmt.Errorf("unsupported scenario step type %q", step.Type) } @@ -238,6 +301,8 @@ func dispatchSupports(stepType string) bool { return true case scenariosql.StepTypeSQL, scenariosql.StepTypeSQLCatalog: return true + case scenarioperf.StepTypePerfQueries: + return true default: return false } @@ -265,6 +330,19 @@ func intEnv(t *testing.T, key string, fallback int) int { return parsed } +func boolEnv(t *testing.T, key string, fallback bool) bool { + t.Helper() + value := os.Getenv(key) + if value == "" { + return fallback + } + parsed, err := strconv.ParseBool(value) + if err != nil { + t.Fatalf("%s must be a boolean: %v", key, err) + } + return parsed +} + func defaultRunID(s core.Scenario) string { prefix := s.RunIDPrefix if prefix == "" { @@ -295,7 +373,7 @@ func resolveScenarioFilePaths(s core.Scenario, baseDir string) core.Scenario { } with := make(map[string]any, len(step.With)) for k, v := range step.With { - if k == "file" { + if k == "file" || k == "catalog_file" { if file, ok := v.(string); ok && file != "" && !filepath.IsAbs(file) { v = filepath.Clean(filepath.Join(baseDir, file)) } diff --git a/tests/scenario/scenarios/posthog_frozen_perf.yaml b/tests/scenario/scenarios/posthog_frozen_perf.yaml new file mode 100644 index 00000000..c0d7d768 --- /dev/null +++ b/tests/scenario/scenarios/posthog_frozen_perf.yaml @@ -0,0 +1,54 @@ +name: posthog-frozen-perf +run_id_prefix: scenario-frozen-perf +required_env: + - DUCKGRES_SCENARIO_FROZEN_S3_URI + - DUCKGRES_SCENARIO_FLIGHT_ADDR +steps: + - id: provision + type: provision_warehouse + with: + org_id: scenario-frozen-perf-${run_id_compact} + request: + database_name: scenario_frozen_perf_${run_id_compact} + metadata_store: + type: cnpg-shard + ducklake: + enabled: true + data_store: + type: s3bucket + + - id: wait_ready + type: wait_warehouse_ready + with: + org_id: scenario-frozen-perf-${run_id_compact} + timeout: 15m + poll_interval: 10s + + - id: setup_frozen_views + type: sql + with: + org_id: scenario-frozen-perf-${run_id_compact} + catalog: ducklake + file: ../sql/setup_frozen_views.sql + max_attempts: 12 + retry_interval: 10s + + - id: perf_queries + type: perf_queries + with: + org_id: scenario-frozen-perf-${run_id_compact} + catalog: ducklake + catalog_file: ../../perf/queries/ducklake_frozen.yaml + run_id: ${run_id} + dataset_version: posthog-file-views-v1 + flight_addr: ${env:DUCKGRES_SCENARIO_FLIGHT_ADDR} + + - id: deprovision + type: deprovision_warehouse + depends_on: [perf_queries] + always_run: true + with: + org_id: scenario-frozen-perf-${run_id_compact} + verify_deleted: true + cleanup_timeout: 15m + poll_interval: 10s diff --git a/tests/scenario/script_test.go b/tests/scenario/script_test.go index d07e796c..37d02d7a 100644 --- a/tests/scenario/script_test.go +++ b/tests/scenario/script_test.go @@ -29,3 +29,29 @@ func TestScenarioRunScriptValidatesRequiredEnvVars(t *testing.T) { } } } + +func TestScenarioRunScriptCheckEnvIncludesScenarioRequiredEnv(t *testing.T) { + script := filepath.Join("..", "..", "scripts", "scenario_run.sh") + cmd := exec.Command("bash", script, "--check-env", "tests/scenario/scenarios/posthog_frozen_perf.yaml") + cmd.Env = []string{ + "PATH=" + os.Getenv("PATH"), + "DUCKGRES_SCENARIO_API_BASE=http://127.0.0.1", + "DUCKGRES_SCENARIO_INTERNAL_SECRET=test-secret", + "DUCKGRES_SCENARIO_PG_HOST=127.0.0.1", + "DUCKGRES_SCENARIO_SNI_SUFFIX=.dev.example", + } + + out, err := cmd.CombinedOutput() + if err == nil { + t.Fatal("expected script to fail without frozen perf required env vars") + } + text := string(out) + for _, name := range []string{ + "DUCKGRES_SCENARIO_FROZEN_S3_URI", + "DUCKGRES_SCENARIO_FLIGHT_ADDR", + } { + if !strings.Contains(text, name) { + t.Fatalf("script output %q missing %s", text, name) + } + } +}