Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 18 additions & 17 deletions cmd/duckgres-controlplane/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,22 +203,23 @@ func main() {
MinWorkers: resolved.ProcessMinWorkers,
MaxWorkers: resolved.ProcessMaxWorkers,
},
SocketDir: *socketDir,
ConfigPath: *configFile,
WorkerQueueTimeout: resolved.WorkerQueueTimeout,
WorkerIdleTimeout: resolved.WorkerIdleTimeout,
RetireOnSessionEnd: resolved.ProcessRetireOnSessionEnd,
HandoverDrainTimeout: resolved.HandoverDrainTimeout,
MetricsServer: metricsSrv,
WorkerBackend: resolved.WorkerBackend,
ConfigStoreConn: resolved.ConfigStoreConn,
ConfigPollInterval: resolved.ConfigPollInterval,
InternalSecret: resolved.InternalSecret,
InternalSecretFallbacks: resolved.InternalSecretFallbacks,
SNIRoutingMode: resolved.SNIRoutingMode,
ManagedHostnameSuffixes: resolved.ManagedHostnameSuffixes,
DucklingBucketSuffix: resolved.DucklingBucketSuffix,
DuckLakeDefaultSpecVersion: resolved.DuckLakeDefaultSpecVersion,
SocketDir: *socketDir,
ConfigPath: *configFile,
WorkerQueueTimeout: resolved.WorkerQueueTimeout,
WorkerIdleTimeout: resolved.WorkerIdleTimeout,
RetireOnSessionEnd: resolved.ProcessRetireOnSessionEnd,
HandoverDrainTimeout: resolved.HandoverDrainTimeout,
DrainingInstanceExpiryTimeout: resolved.DrainingInstanceExpiryTimeout,
MetricsServer: metricsSrv,
WorkerBackend: resolved.WorkerBackend,
ConfigStoreConn: resolved.ConfigStoreConn,
ConfigPollInterval: resolved.ConfigPollInterval,
InternalSecret: resolved.InternalSecret,
InternalSecretFallbacks: resolved.InternalSecretFallbacks,
SNIRoutingMode: resolved.SNIRoutingMode,
ManagedHostnameSuffixes: resolved.ManagedHostnameSuffixes,
DucklingBucketSuffix: resolved.DucklingBucketSuffix,
DuckLakeDefaultSpecVersion: resolved.DuckLakeDefaultSpecVersion,
K8s: controlplane.K8sConfig{
WorkerImage: resolved.K8sWorkerImage,
WorkerNamespace: resolved.K8sWorkerNamespace,
Expand All @@ -244,7 +245,7 @@ func main() {
WorkerProfileMinMemory: resolved.K8sWorkerProfileMinMemory,
WorkerProfileMaxMemory: resolved.K8sWorkerProfileMaxMemory,
WorkerMaxTTL: resolved.K8sWorkerMaxTTL,
WorkerDefaultTTL: resolved.K8sWorkerDefaultTTL,
WorkerDefaultTTL: resolved.K8sWorkerDefaultTTL,
AWSRegion: resolved.AWSRegion,
},
}
Expand Down
59 changes: 30 additions & 29 deletions configloader/file_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,35 +13,36 @@ package configloader
// mode (e.g., the worker binary reads but ignores ControlPlane fields);
// the cost is one parsed-but-unused struct field per binary.
type FileConfig struct {
Host string `yaml:"host"`
Port int `yaml:"port"`
FlightPort int `yaml:"flight_port"` // Control-plane Flight SQL ingress port (0 disables)
FlightSessionIdleTTL string `yaml:"flight_session_idle_ttl"` // e.g., "10m"
FlightSessionReapInterval string `yaml:"flight_session_reap_interval"` // e.g., "1m"
FlightHandleIdleTTL string `yaml:"flight_handle_idle_ttl"` // e.g., "15m"
FlightSessionTokenTTL string `yaml:"flight_session_token_ttl"` // e.g., "1h"
DataDir string `yaml:"data_dir"`
TLS TLSConfig `yaml:"tls"`
Users map[string]string `yaml:"users"`
RateLimit RateLimitFileConfig `yaml:"rate_limit"`
Extensions []string `yaml:"extensions"`
DuckLake DuckLakeFileConfig `yaml:"ducklake"`
Iceberg IcebergFileConfig `yaml:"iceberg"`
FilePersistence bool `yaml:"file_persistence"`
ProcessIsolation bool `yaml:"process_isolation"`
IdleTimeout string `yaml:"idle_timeout"`
SessionInitTimeout string `yaml:"session_init_timeout"`
MemoryLimit string `yaml:"memory_limit"`
Threads int `yaml:"threads"`
MemoryBudget string `yaml:"memory_budget"`
MemoryRebalance *bool `yaml:"memory_rebalance"`
Process ProcessFileConfig `yaml:"process"`
WorkerQueueTimeout string `yaml:"worker_queue_timeout"`
WorkerIdleTimeout string `yaml:"worker_idle_timeout"`
HandoverDrainTimeout string `yaml:"handover_drain_timeout"`
PassthroughUsers []string `yaml:"passthrough_users"`
LogLevel string `yaml:"log_level"`
QueryLog QueryLogFileConfig `yaml:"query_log"`
Host string `yaml:"host"`
Port int `yaml:"port"`
FlightPort int `yaml:"flight_port"` // Control-plane Flight SQL ingress port (0 disables)
FlightSessionIdleTTL string `yaml:"flight_session_idle_ttl"` // e.g., "10m"
FlightSessionReapInterval string `yaml:"flight_session_reap_interval"` // e.g., "1m"
FlightHandleIdleTTL string `yaml:"flight_handle_idle_ttl"` // e.g., "15m"
FlightSessionTokenTTL string `yaml:"flight_session_token_ttl"` // e.g., "1h"
DataDir string `yaml:"data_dir"`
TLS TLSConfig `yaml:"tls"`
Users map[string]string `yaml:"users"`
RateLimit RateLimitFileConfig `yaml:"rate_limit"`
Extensions []string `yaml:"extensions"`
DuckLake DuckLakeFileConfig `yaml:"ducklake"`
Iceberg IcebergFileConfig `yaml:"iceberg"`
FilePersistence bool `yaml:"file_persistence"`
ProcessIsolation bool `yaml:"process_isolation"`
IdleTimeout string `yaml:"idle_timeout"`
SessionInitTimeout string `yaml:"session_init_timeout"`
MemoryLimit string `yaml:"memory_limit"`
Threads int `yaml:"threads"`
MemoryBudget string `yaml:"memory_budget"`
MemoryRebalance *bool `yaml:"memory_rebalance"`
Process ProcessFileConfig `yaml:"process"`
WorkerQueueTimeout string `yaml:"worker_queue_timeout"`
WorkerIdleTimeout string `yaml:"worker_idle_timeout"`
HandoverDrainTimeout string `yaml:"handover_drain_timeout"`
DrainingInstanceExpiryTimeout string `yaml:"draining_instance_expiry_timeout"`
PassthroughUsers []string `yaml:"passthrough_users"`
LogLevel string `yaml:"log_level"`
QueryLog QueryLogFileConfig `yaml:"query_log"`

// Worker backend configuration
WorkerBackend string `yaml:"worker_backend"` // "process" (default) or "remote"
Expand Down
2 changes: 2 additions & 0 deletions configresolve/cliflags.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ func RegisterCLIInputsFlags(fs *flag.FlagSet) func() CLIInputs {
workerQueueTimeout := fs.String("worker-queue-timeout", "", "How long to wait for an available worker/org connection slot (e.g., '60s') (env: DUCKGRES_WORKER_QUEUE_TIMEOUT)")
workerIdleTimeout := fs.String("worker-idle-timeout", "", "How long to keep an idle worker alive (e.g., '5m') (env: DUCKGRES_WORKER_IDLE_TIMEOUT)")
handoverDrainTimeout := fs.String("handover-drain-timeout", "", "How long to wait for planned shutdowns/upgrades to drain before forcing exit (default: '24h' in process mode, '15m' in remote mode) (env: DUCKGRES_HANDOVER_DRAIN_TIMEOUT)")
drainingInstanceExpiryTimeout := fs.String("draining-instance-expiry-timeout", "", "How long a draining control-plane row may linger before the leader janitor force-expires it so its orphaned workers become reapable. Independent of --handover-drain-timeout. (default: '2h') (env: DUCKGRES_DRAINING_INSTANCE_EXPIRY_TIMEOUT)")
acmeDomain := fs.String("acme-domain", "", "Domain for ACME/Let's Encrypt certificate (env: DUCKGRES_ACME_DOMAIN)")
acmeEmail := fs.String("acme-email", "", "Contact email for Let's Encrypt notifications (env: DUCKGRES_ACME_EMAIL)")
acmeCacheDir := fs.String("acme-cache-dir", "", "Directory for ACME certificate cache (env: DUCKGRES_ACME_CACHE_DIR)")
Expand Down Expand Up @@ -112,6 +113,7 @@ func RegisterCLIInputsFlags(fs *flag.FlagSet) func() CLIInputs {
cli.WorkerQueueTimeout = *workerQueueTimeout
cli.WorkerIdleTimeout = *workerIdleTimeout
cli.HandoverDrainTimeout = *handoverDrainTimeout
cli.DrainingInstanceExpiryTimeout = *drainingInstanceExpiryTimeout
cli.ACMEDomain = *acmeDomain
cli.ACMEEmail = *acmeEmail
cli.ACMECacheDir = *acmeCacheDir
Expand Down
1 change: 1 addition & 0 deletions configresolve/cliflags_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ func fieldNameToFlagName(name string) string {
{"WorkerQueueTimeout", "worker-queue-timeout"},
{"WorkerIdleTimeout", "worker-idle-timeout"},
{"HandoverDrainTimeout", "handover-drain-timeout"},
{"DrainingInstanceExpiryTimeout", "draining-instance-expiry-timeout"},
{"ProcessMinWorkers", "process-min-workers"},
{"ProcessMaxWorkers", "process-max-workers"},
{"ProcessRetireOnSessionEnd", "process-retire-on-session-end"},
Expand Down
143 changes: 84 additions & 59 deletions configresolve/resolve.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,65 +27,66 @@ import (
type CLIInputs struct {
Set map[string]bool

Host string
Port int
FlightPort int
FlightSessionIdleTTL string
FlightSessionReapInterval string
FlightHandleIdleTTL string
FlightSessionTokenTTL string
DataDir string
CertFile string
KeyFile string
FilePersistence bool
ProcessIsolation bool
IdleTimeout string
SessionInitTimeout string
MemoryLimit string
Threads int
MemoryBudget string
MemoryRebalance bool
DuckLakeDeltaCatalogEnabled bool
DuckLakeDeltaCatalogPath string
DuckLakeDefaultSpecVersion string
IcebergEnabled bool
IcebergRegion string
IcebergNamespace string
ProcessMinWorkers int
ProcessMaxWorkers int
ProcessRetireOnSessionEnd bool
WorkerQueueTimeout string
WorkerIdleTimeout string
HandoverDrainTimeout string
ACMEDomain string
ACMEEmail string
ACMECacheDir string
ACMEDNSProvider string
ACMEDNSZoneID string
MaxConnections int
ConfigStoreConn string
ConfigPollInterval string
InternalSecret string
InternalSecretFallbacks string
SNIRoutingMode string
ManagedHostnameSuffixes string
WorkerBackend string
K8sWorkerImage string
K8sWorkerNamespace string
K8sControlPlaneID string
K8sWorkerPort int
K8sWorkerSecret string
K8sWorkerConfigMap string
K8sWorkerImagePullPolicy string
K8sWorkerServiceAccount string
K8sMaxWorkers int
K8sWorkerCPURequest string
K8sWorkerMemoryRequest string
K8sWorkerNodeSelector string
K8sWorkerTolerationKey string
K8sWorkerTolerationValue string
AWSRegion string
QueryLog bool
Host string
Port int
FlightPort int
FlightSessionIdleTTL string
FlightSessionReapInterval string
FlightHandleIdleTTL string
FlightSessionTokenTTL string
DataDir string
CertFile string
KeyFile string
FilePersistence bool
ProcessIsolation bool
IdleTimeout string
SessionInitTimeout string
MemoryLimit string
Threads int
MemoryBudget string
MemoryRebalance bool
DuckLakeDeltaCatalogEnabled bool
DuckLakeDeltaCatalogPath string
DuckLakeDefaultSpecVersion string
IcebergEnabled bool
IcebergRegion string
IcebergNamespace string
ProcessMinWorkers int
ProcessMaxWorkers int
ProcessRetireOnSessionEnd bool
WorkerQueueTimeout string
WorkerIdleTimeout string
HandoverDrainTimeout string
DrainingInstanceExpiryTimeout string
ACMEDomain string
ACMEEmail string
ACMECacheDir string
ACMEDNSProvider string
ACMEDNSZoneID string
MaxConnections int
ConfigStoreConn string
ConfigPollInterval string
InternalSecret string
InternalSecretFallbacks string
SNIRoutingMode string
ManagedHostnameSuffixes string
WorkerBackend string
K8sWorkerImage string
K8sWorkerNamespace string
K8sControlPlaneID string
K8sWorkerPort int
K8sWorkerSecret string
K8sWorkerConfigMap string
K8sWorkerImagePullPolicy string
K8sWorkerServiceAccount string
K8sMaxWorkers int
K8sWorkerCPURequest string
K8sWorkerMemoryRequest string
K8sWorkerNodeSelector string
K8sWorkerTolerationKey string
K8sWorkerTolerationValue string
AWSRegion string
QueryLog bool
}

type Resolved struct {
Expand All @@ -97,6 +98,7 @@ type Resolved struct {
WorkerQueueTimeout time.Duration
WorkerIdleTimeout time.Duration
HandoverDrainTimeout time.Duration
DrainingInstanceExpiryTimeout time.Duration
WorkerBackend string
K8sWorkerImage string
K8sWorkerNamespace string
Expand Down Expand Up @@ -196,6 +198,7 @@ func ResolveEffective(fileCfg *configloader.FileConfig, cli CLIInputs, getenv fu
workerQueueTimeout := 60 * time.Second
var workerIdleTimeout time.Duration
var handoverDrainTimeout time.Duration
var drainingInstanceExpiryTimeout time.Duration
var processMinWorkers, processMaxWorkers int
var processRetireOnSessionEnd bool
var workerBackend string
Expand Down Expand Up @@ -427,6 +430,13 @@ func ResolveEffective(fileCfg *configloader.FileConfig, cli CLIInputs, getenv fu
warn("Invalid handover_drain_timeout duration: " + err.Error())
}
}
if fileCfg.DrainingInstanceExpiryTimeout != "" {
if d, err := time.ParseDuration(fileCfg.DrainingInstanceExpiryTimeout); err == nil {
drainingInstanceExpiryTimeout = d
} else {
warn("Invalid draining_instance_expiry_timeout duration: " + err.Error())
}
}
if len(fileCfg.PassthroughUsers) > 0 {
cfg.PassthroughUsers = make(map[string]bool, len(fileCfg.PassthroughUsers))
for _, u := range fileCfg.PassthroughUsers {
Expand Down Expand Up @@ -760,6 +770,13 @@ func ResolveEffective(fileCfg *configloader.FileConfig, cli CLIInputs, getenv fu
warn("Invalid DUCKGRES_HANDOVER_DRAIN_TIMEOUT duration: " + err.Error())
}
}
if v := getenv("DUCKGRES_DRAINING_INSTANCE_EXPIRY_TIMEOUT"); v != "" {
if d, err := time.ParseDuration(v); err == nil {
drainingInstanceExpiryTimeout = d
} else {
warn("Invalid DUCKGRES_DRAINING_INSTANCE_EXPIRY_TIMEOUT duration: " + err.Error())
}
}
if v := getenv("DUCKGRES_ACME_DOMAIN"); v != "" {
cfg.ACMEDomain = v
}
Expand Down Expand Up @@ -1101,6 +1118,13 @@ func ResolveEffective(fileCfg *configloader.FileConfig, cli CLIInputs, getenv fu
warn("Invalid --handover-drain-timeout duration: " + err.Error())
}
}
if cli.Set["draining-instance-expiry-timeout"] {
if d, err := time.ParseDuration(cli.DrainingInstanceExpiryTimeout); err == nil {
drainingInstanceExpiryTimeout = d
} else {
warn("Invalid --draining-instance-expiry-timeout duration: " + err.Error())
}
}
if cli.Set["acme-domain"] {
cfg.ACMEDomain = cli.ACMEDomain
}
Expand Down Expand Up @@ -1260,6 +1284,7 @@ func ResolveEffective(fileCfg *configloader.FileConfig, cli CLIInputs, getenv fu
WorkerQueueTimeout: workerQueueTimeout,
WorkerIdleTimeout: workerIdleTimeout,
HandoverDrainTimeout: handoverDrainTimeout,
DrainingInstanceExpiryTimeout: drainingInstanceExpiryTimeout,
WorkerBackend: workerBackend,
K8sWorkerImage: k8sWorkerImage,
K8sWorkerNamespace: k8sWorkerNamespace,
Expand Down
11 changes: 10 additions & 1 deletion controlplane/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,16 @@ type ControlPlaneConfig struct {
WorkerIdleTimeout time.Duration // How long to keep an idle worker alive (default: 5m)
RetireOnSessionEnd bool // When true, process workers are retired immediately after their last session ends.
HandoverDrainTimeout time.Duration // How long to wait for connections to drain during upgrade. 0 = unbounded (wait until k8s SIGKILL via terminationGracePeriodSeconds). Default: 0 in remote mode (so a CP rolling out doesn't kill in-flight customer queries at a self-imposed wall — see drainAndShutdown), 24h in process mode.
MetricsServer *http.Server // Optional metrics server to shut down during upgrade
// DrainingInstanceExpiryTimeout bounds how long a *draining* control-plane
// row may linger before the leader janitor force-expires it, so its orphaned
// workers become visible to the orphan sweep (ListOrphanedWorkers only lists
// workers whose owning CP is expired). This is deliberately independent of
// HandoverDrainTimeout: session draining stays unbounded (0) in remote mode so
// a rollout never cuts an in-flight import, while the draining *row* still gets
// a finite life so leaked workers can be reaped. 0 = use the built-in default
// (see effectiveDrainingInstanceExpiry); the reaper is never disabled.
DrainingInstanceExpiryTimeout time.Duration
MetricsServer *http.Server // Optional metrics server to shut down during upgrade

// WorkerBackend selects the worker management backend.
// "process" (default): workers are local child processes communicating over Unix sockets.
Expand Down
Loading
Loading