Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/duckgres-controlplane/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ func main() {
WorkerTolerationValue: resolved.K8sWorkerTolerationValue,
AllowClientWorkerProfile: resolved.K8sAllowClientWorkerProfile,
WorkerPriorityClassName: resolved.K8sWorkerPriorityClassName,
HeadroomNodes: resolved.K8sHeadroomNodes,
HeadroomPercent: resolved.K8sHeadroomPercent,
PlaceholderImage: resolved.K8sPlaceholderImage,
PlaceholderPriorityClassName: resolved.K8sPlaceholderPriorityClassName,
Expand Down
8 changes: 8 additions & 0 deletions configresolve/resolve.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ type Resolved struct {
K8sWorkerTolerationValue string
K8sAllowClientWorkerProfile bool
K8sWorkerPriorityClassName string
K8sHeadroomNodes int
K8sHeadroomPercent int
K8sPlaceholderImage string
K8sPlaceholderPriorityClassName string
Expand Down Expand Up @@ -204,6 +205,7 @@ func ResolveEffective(fileCfg *configloader.FileConfig, cli CLIInputs, getenv fu
var k8sWorkerProfileMinCPU, k8sWorkerProfileMaxCPU, k8sWorkerProfileMinMemory, k8sWorkerProfileMaxMemory string
var k8sWorkerMaxTTL, k8sWorkerDefaultTTL time.Duration
var k8sWorkerPriorityClassName string
var k8sHeadroomNodes int
var k8sHeadroomPercent int
var k8sPlaceholderImage, k8sPlaceholderPriorityClassName string
var k8sWorkerImage, k8sWorkerNamespace, k8sControlPlaneID string
Expand Down Expand Up @@ -907,6 +909,11 @@ func ResolveEffective(fileCfg *configloader.FileConfig, cli CLIInputs, getenv fu
if v := getenv("DUCKGRES_K8S_WORKER_PRIORITY_CLASS"); v != "" {
k8sWorkerPriorityClassName = v
}
if v := getenv("DUCKGRES_K8S_HEADROOM_NODES"); v != "" {
if n, err := strconv.Atoi(v); err == nil {
k8sHeadroomNodes = n
}
}
if v := getenv("DUCKGRES_K8S_HEADROOM_PERCENT"); v != "" {
if n, err := strconv.Atoi(v); err == nil {
k8sHeadroomPercent = n
Expand Down Expand Up @@ -1277,6 +1284,7 @@ func ResolveEffective(fileCfg *configloader.FileConfig, cli CLIInputs, getenv fu
K8sWorkerTolerationValue: k8sWorkerTolerationValue,
K8sAllowClientWorkerProfile: k8sAllowClientWorkerProfile,
K8sWorkerPriorityClassName: k8sWorkerPriorityClassName,
K8sHeadroomNodes: k8sHeadroomNodes,
K8sHeadroomPercent: k8sHeadroomPercent,
K8sPlaceholderImage: k8sPlaceholderImage,
K8sPlaceholderPriorityClassName: k8sPlaceholderPriorityClassName,
Expand Down
10 changes: 10 additions & 0 deletions controlplane/configstore/lifecycle_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,16 @@ func (cs *ConfigStore) ListExpiredHotIdleSnapshots(now time.Time, defaultTTL tim
return workerSnapshotsFromRecords(records), nil
}

// ListExpiredHotIdleSnapshotsForCP is the snapshot-typed, owner-scoped variant
// of ListExpiredHotIdleWorkers used by the per-CP fallback reaper.
func (cs *ConfigStore) ListExpiredHotIdleSnapshotsForCP(ownerCPInstanceID string, now time.Time, defaultTTL time.Duration) ([]WorkerSnapshot, error) {
records, err := cs.ListExpiredHotIdleWorkersForCP(ownerCPInstanceID, now, defaultTTL)
if err != nil {
return nil, err
}
return workerSnapshotsFromRecords(records), nil
}

// ListStuckWorkerSnapshots is the snapshot-typed variant of
// ListStuckWorkers used by the janitor's stuck-spawn/activate reaper.
func (cs *ConfigStore) ListStuckWorkerSnapshots(spawningBefore, activatingBefore time.Time) ([]WorkerSnapshot, error) {
Expand Down
23 changes: 23 additions & 0 deletions controlplane/configstore/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -993,6 +993,29 @@ func (cs *ConfigStore) ListExpiredHotIdleWorkers(now time.Time, defaultTTL time.
return workers, nil
}

// ListExpiredHotIdleWorkersForCP is the owner-scoped variant of
// ListExpiredHotIdleWorkers: it returns only expired hot-idle workers owned by
// ownerCPInstanceID. Used by the per-CP fallback reaper, which runs on every
// replica independent of the janitor leader lease so hot-idle workers are still
// retired (and their nodes reclaimed) even if leadership is wedged/absent. Each
// replica only reaps the workers it owns; the leader janitor still handles
// cross-CP/orphaned rows.
func (cs *ConfigStore) ListExpiredHotIdleWorkersForCP(ownerCPInstanceID string, now time.Time, defaultTTL time.Duration) ([]WorkerRecord, error) {
defMins := int64(defaultTTL.Minutes())
if defMins < 0 {
defMins = 0
}
var workers []WorkerRecord
err := cs.db.Table(cs.runtimeTable((&WorkerRecord{}).TableName())).
Where("state = ? AND owner_cp_instance_id = ? AND COALESCE(hot_idle_since, updated_at) + (CASE WHEN COALESCE(ttl_minutes, 0) > 0 THEN ttl_minutes ELSE ? END) * interval '1 minute' <= ?",
WorkerStateHotIdle, ownerCPInstanceID, defMins, now).
Find(&workers).Error
if err != nil {
return nil, fmt.Errorf("list expired hot-idle workers for cp: %w", err)
}
return workers, nil
}

// CountHotIdleWorkers returns the number of compatible hot-idle workers for an
// org/profile/image bucket. Empty profile CPU/memory is the default profile.
func (cs *ConfigStore) CountHotIdleWorkers(orgID, image, profileCPU, profileMemory string) (int, error) {
Expand Down
15 changes: 9 additions & 6 deletions controlplane/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,11 +143,14 @@ type K8sConfig struct {
WorkerPriorityClassName string // PriorityClass for worker pods, so they preempt overprovision headroom pause pods (empty = none)
AWSRegion string // AWS region for STS client

// Node-headroom controller: keep HeadroomPercent% of the worker nodepool's
// allocatable CPU+memory free via low-priority placeholder pods, so a worker
// spawn schedules immediately (preempting placeholders) rather than waiting
// on a fresh Karpenter node. 0 = disabled.
HeadroomPercent int // % of worker-nodepool allocatable to hold free (0 = disabled)
// Node-headroom controller holds preemptible low-priority placeholder pods so
// a worker spawn schedules immediately (preempting a placeholder) rather than
// waiting on a fresh Karpenter node. HeadroomNodes>0 selects the constant
// mode (a fixed number of node-sized placeholders, demand-independent — the
// prod default); HeadroomPercent is the legacy demand-proportional fallback.
// Both 0 = disabled.
HeadroomNodes int // CONSTANT node-headroom: number of node-sized placeholder pods (0 = use HeadroomPercent)
HeadroomPercent int // legacy demand-% headroom, used only when HeadroomNodes==0 (0 = disabled)
PlaceholderImage string // Image for placeholder pods (a pause image)
PlaceholderPriorityClassName string // PriorityClass for placeholder pods — MUST rank below WorkerPriorityClassName

Expand All @@ -166,7 +169,7 @@ type K8sConfig struct {
// "default TTL", pairing with WorkerMaxTTL (the clamp). It governs both
// no-ttl paths the same way: default-shape workers (reaped by the janitor)
// and sized-but-no-ttl workers (stamped at profile resolution). Per-request
// precedence: client GUC > org default > this > built-in (20m,
// precedence: client GUC > org default > this > built-in (1m,
// defaultWorkerTTL). Raise it above a tenant's job cadence (e.g. 70m for
// hourly jobs) so scheduled workloads reuse hot-idle workers instead of
// cold-spawning every run — at the cost of idle worker nodes.
Expand Down
114 changes: 80 additions & 34 deletions controlplane/headroom.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,19 @@ import (
// headroomPodLabel marks the low-priority placeholder pods this controller owns.
const headroomPodLabel = "duckgres-headroom"

// Node-sized placeholder shape for CONSTANT node-headroom (HeadroomNodes>0):
// one placeholder ≈ one worker node, so N placeholders reserve N preemptible
// nodes' worth of warm capacity regardless of current demand. Sized to an r6gd
// worker node's schedulable capacity (matches the historical exclusive-worker
// shape). A real worker (any shape) preempts one placeholder to claim a node;
// the next reconcile recreates it, which makes Karpenter add a node in the
// background. Kept as a constant — the count is the only knob — per the
// "keep headroom small and constant" decision.
const (
headroomNodeCPU = "46"
headroomNodeMemory = "360Gi"
)

// headroomPlaceholdersNeeded returns how many placeholder pods of size
// (phCPUMillis, phMemBytes) are required to hold `percent`% of the CURRENT
// WORKER DEMAND (workerCPUMillis/workerMemBytes — the summed resource requests
Expand Down Expand Up @@ -69,39 +82,31 @@ func ceilDiv(a, b int64) int {
return int((a + b - 1) / b)
}

// reconcileHeadroom drives the number of placeholder pods toward the configured
// percentage of CURRENT WORKER DEMAND (summed live worker pod requests).
// Leader-gated (called from the janitor, which runs only on the elected CP).
// A no-op when headroomPercent<=0.
// reconcileHeadroom drives the number of low-priority placeholder pods toward
// the headroom target. Leader-gated (called from the janitor, which runs only
// on the elected CP). Always runs (even when headroom is disabled) so that a
// disabled/just-disabled pool converges to ZERO placeholders instead of
// stranding the ones it created — there is no other path that deletes them.
//
// Placeholders are sized to the pool's DEFAULT WORKER SHAPE (WorkerCPURequest/
// WorkerMemoryRequest, else the built-in defaults) — no separate sizing knob:
// one preempted placeholder always frees exactly one default-worker slot, in
// every environment. They carry a PriorityClass BELOW the worker
// PriorityClass, so the scheduler preempts them to fit a real worker; a worker
// larger than one placeholder preempts AS MANY as it needs (standard k8s
// priority preemption). The reconcile then recreates placeholders back toward
// Two modes:
// - CONSTANT (HeadroomNodes>0, the prod default): keep exactly HeadroomNodes
// node-sized placeholders, INDEPENDENT of worker demand. This is what
// bounds headroom cost: the previous demand-proportional mode counted
// hot_idle (idle-but-alive) workers as demand and provisioned an extra
// headroomPercent% of full-worker-sized placeholders, so it AMPLIFIED any
// idle-worker leak ~1.75x. A fixed node count cannot do that.
// - LEGACY percent (HeadroomPercent>0, HeadroomNodes==0): demand-proportional
// sizing, retained for small/dev pools.
//
// Placeholders carry a PriorityClass BELOW the worker PriorityClass, so the
// scheduler preempts them to fit a real worker; a worker larger than one
// placeholder preempts as many as it needs. The reconcile recreates them toward
// the target on the next tick, which makes Karpenter add node capacity in the
// background. So headroom self-heals after any size of spawn.
// background, so headroom self-heals after any spawn.
func (p *K8sWorkerPool) reconcileHeadroom(ctx context.Context) {
if p.headroomPercent <= 0 {
return
}
phCPU, err := resource.ParseQuantity(firstNonEmpty(p.workerCPURequest, defaultWorkerCPU))
if err != nil {
slog.Warn("Headroom: invalid default worker cpu for placeholder sizing.", "value", p.workerCPURequest, "error", err)
return
}
phMem, err := resource.ParseQuantity(firstNonEmpty(p.workerMemoryRequest, defaultWorkerMemory))
if err != nil {
slog.Warn("Headroom: invalid default worker memory for placeholder sizing.", "value", p.workerMemoryRequest, "error", err)
return
}

workerCPU, workerMem, err := p.activeWorkerDemand(ctx)
if err != nil {
slog.Warn("Headroom: failed to sum worker demand.", "error", err)
return
desired, phCPU, phMem, ok := p.headroomTarget(ctx)
if !ok {
return // sizing/demand error already logged; back off, retry next tick
}

existing, _, err := p.listPlaceholderPods(ctx)
Expand All @@ -110,8 +115,6 @@ func (p *K8sWorkerPool) reconcileHeadroom(ctx context.Context) {
return
}

desired := headroomPlaceholdersNeeded(workerCPU, workerMem, phCPU.MilliValue(), phMem.Value(), p.headroomPercent)

switch {
case len(existing) < desired:
for i := len(existing); i < desired; i++ {
Expand All @@ -120,15 +123,58 @@ func (p *K8sWorkerPool) reconcileHeadroom(ctx context.Context) {
return // back off; next tick retries
}
}
slog.Info("Headroom: scaled placeholder pods up.", "from", len(existing), "to", desired, "percent", p.headroomPercent)
slog.Info("Headroom: scaled placeholder pods up.", "from", len(existing), "to", desired)
case len(existing) > desired:
// Delete the newest placeholders first (stable order by name).
sort.Slice(existing, func(i, j int) bool { return existing[i] > existing[j] })
for i := 0; i < len(existing)-desired; i++ {
_ = p.clientset.CoreV1().Pods(p.namespace).Delete(ctx, existing[i], metav1.DeleteOptions{GracePeriodSeconds: int64Ptr(0)})
}
slog.Info("Headroom: scaled placeholder pods down.", "from", len(existing), "to", desired, "percent", p.headroomPercent)
slog.Info("Headroom: scaled placeholder pods down.", "from", len(existing), "to", desired)
}
}

// headroomTarget resolves the desired placeholder count and per-placeholder
// size for this tick. ok=false means a transient sizing/demand error (already
// logged) and the caller should skip this tick. A desired of 0 with ok=true is
// the disabled state — the caller still lists and deletes any existing
// placeholders so they don't leak.
func (p *K8sWorkerPool) headroomTarget(ctx context.Context) (desired int, phCPU, phMem resource.Quantity, ok bool) {
if p.headroomNodes > 0 {
cpu, err := resource.ParseQuantity(headroomNodeCPU)
if err != nil {
slog.Warn("Headroom: invalid node cpu for placeholder sizing.", "value", headroomNodeCPU, "error", err)
return 0, resource.Quantity{}, resource.Quantity{}, false
}
mem, err := resource.ParseQuantity(headroomNodeMemory)
if err != nil {
slog.Warn("Headroom: invalid node memory for placeholder sizing.", "value", headroomNodeMemory, "error", err)
return 0, resource.Quantity{}, resource.Quantity{}, false
}
return p.headroomNodes, cpu, mem, true
}

if p.headroomPercent > 0 {
cpu, err := resource.ParseQuantity(firstNonEmpty(p.workerCPURequest, defaultWorkerCPU))
if err != nil {
slog.Warn("Headroom: invalid default worker cpu for placeholder sizing.", "value", p.workerCPURequest, "error", err)
return 0, resource.Quantity{}, resource.Quantity{}, false
}
mem, err := resource.ParseQuantity(firstNonEmpty(p.workerMemoryRequest, defaultWorkerMemory))
if err != nil {
slog.Warn("Headroom: invalid default worker memory for placeholder sizing.", "value", p.workerMemoryRequest, "error", err)
return 0, resource.Quantity{}, resource.Quantity{}, false
}
workerCPU, workerMem, err := p.activeWorkerDemand(ctx)
if err != nil {
slog.Warn("Headroom: failed to sum worker demand.", "error", err)
return 0, resource.Quantity{}, resource.Quantity{}, false
}
return headroomPlaceholdersNeeded(workerCPU, workerMem, cpu.MilliValue(), mem.Value(), p.headroomPercent), cpu, mem, true
}

// Disabled: converge to zero placeholders.
return 0, resource.Quantity{}, resource.Quantity{}, true
}

// activeWorkerDemand sums the resource REQUESTS of live (non-terminating)
Expand Down
82 changes: 82 additions & 0 deletions controlplane/headroom_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,32 @@ package controlplane

import (
"context"
"fmt"
"testing"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes/fake"
k8stesting "k8s.io/client-go/testing"
)

// emulateGenerateName makes the fake clientset assign unique names from
// metav1.GenerateName on pod create, as the real API server does (the fake
// otherwise leaves Name empty, so multiple GenerateName creates collide).
func emulateGenerateName(cs *fake.Clientset) {
var n int
cs.PrependReactor("create", "pods", func(action k8stesting.Action) (bool, runtime.Object, error) {
pod, ok := action.(k8stesting.CreateAction).GetObject().(*corev1.Pod)
if ok && pod.Name == "" && pod.GenerateName != "" {
n++
pod.Name = fmt.Sprintf("%s%d", pod.GenerateName, n)
}
return false, nil, nil // fall through to the default tracker with the mutated object
})
}

func TestHeadroomPlaceholdersNeeded(t *testing.T) {
const (
gi = 1 << 30
Expand Down Expand Up @@ -81,6 +99,70 @@ func TestPlaceholderPodGenerateName(t *testing.T) {
}
}

// Constant node-headroom keeps a FIXED number of node-sized placeholders and
// does NOT scale with worker demand — the property that prevents an idle-worker
// leak from being amplified into runaway placeholder capacity.
func TestReconcileHeadroomConstantNodesIgnoresDemand(t *testing.T) {
worker := map[string]string{"app": "duckgres-worker"}
mkWorker := func(name string) *corev1.Pod {
return &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{Name: name, Namespace: "default", Labels: worker},
Spec: corev1.PodSpec{Containers: []corev1.Container{{
Name: "duckdb-worker",
Resources: corev1.ResourceRequirements{Requests: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("16"),
corev1.ResourceMemory: resource.MustParse("64Gi"),
}},
}}},
}
}
// A large worker population (as a hot_idle leak would produce) must NOT
// inflate the placeholder count under constant mode.
objs := []runtime.Object{}
for i := 0; i < 50; i++ {
objs = append(objs, mkWorker(fmt.Sprintf("w%d", i)))
}
cs := fake.NewSimpleClientset(objs...)
emulateGenerateName(cs)
p := &K8sWorkerPool{clientset: cs, namespace: "default", cpID: "duckgres-cp-abcde", headroomNodes: 3}

p.reconcileHeadroom(context.Background())

pods, err := cs.CoreV1().Pods("default").List(context.Background(), metav1.ListOptions{LabelSelector: "app=" + headroomPodLabel})
if err != nil {
t.Fatalf("list placeholders: %v", err)
}
if len(pods.Items) != 3 {
t.Fatalf("constant headroom: expected 3 placeholders regardless of 50 workers, got %d", len(pods.Items))
}
req := pods.Items[0].Spec.Containers[0].Resources.Requests
if req.Cpu().String() != headroomNodeCPU || req.Memory().String() != headroomNodeMemory {
t.Fatalf("expected node-sized placeholder %s/%s, got %s/%s", headroomNodeCPU, headroomNodeMemory, req.Cpu(), req.Memory())
}
}

// Disabling headroom (both knobs 0) must converge existing placeholders to zero,
// not strand them — there is no other path that deletes placeholder pods.
func TestReconcileHeadroomDisabledDeletesPlaceholders(t *testing.T) {
mkPlaceholder := func(name string) *corev1.Pod {
return &corev1.Pod{ObjectMeta: metav1.ObjectMeta{
Name: name, Namespace: "default", Labels: map[string]string{"app": headroomPodLabel},
}}
}
cs := fake.NewSimpleClientset(mkPlaceholder("duckgres-placeholder-x-1"), mkPlaceholder("duckgres-placeholder-x-2"))
p := &K8sWorkerPool{clientset: cs, namespace: "default", headroomNodes: 0, headroomPercent: 0}

p.reconcileHeadroom(context.Background())

pods, err := cs.CoreV1().Pods("default").List(context.Background(), metav1.ListOptions{LabelSelector: "app=" + headroomPodLabel})
if err != nil {
t.Fatalf("list placeholders: %v", err)
}
if len(pods.Items) != 0 {
t.Fatalf("expected disabled headroom to delete all placeholders, got %d", len(pods.Items))
}
}

func TestActiveWorkerDemand(t *testing.T) {
// Sums live worker pod requests; excludes terminating/terminal pods and
// non-worker pods (placeholders must never feed the demand baseline).
Expand Down
Loading
Loading