Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
145 changes: 145 additions & 0 deletions tests/configstore/runtime_store_postgres_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2580,6 +2580,151 @@ func TestUpsertWorkerRecordStampsAndPreservesHotIdleSince(t *testing.T) {
}
}

// TestListExpiredHotIdleWorkersSurvivesRealCredentialRefresh is a seam
// regression test. A hot-idle worker has to be reaped according to its
// hot_idle_since clock even after the REAL credential-refresh writers
// (BumpWorkerEpoch followed by MarkCredentialsRefreshed) have bumped updated_at.
//
// Before the fix the reaper keyed off updated_at, and because those refresh
// writes fire more often than the TTL and target hot_idle rows, they kept
// resetting the reap clock forever. The sibling test
// TestListExpiredHotIdleWorkersUsesHotIdleSinceNotUpdatedAt fakes updated_at
// with a raw UPDATE; this one goes through the production write paths instead
// and proves they leave the reap clock alone.
func TestListExpiredHotIdleWorkersSurvivesRealCredentialRefresh(t *testing.T) {
	const (
		workerID   = 1
		ownerCP    = "cp:boot"
		startEpoch = int64(1)
		ttl        = 5 * time.Minute
	)

	store := newIsolatedConfigStore(t)
	backdateSQL := fmt.Sprintf(
		`UPDATE %s SET hot_idle_since = ?, updated_at = ? WHERE worker_id = ?`,
		store.RuntimeSchema()+".worker_records",
	)

	now := time.Now()
	// Truncated to microseconds and normalised to UTC so the value written here
	// can be compared with Equal against what is read back further down.
	idleSince := now.Add(-30 * time.Minute).UTC().Truncate(time.Microsecond)

	seed := &configstore.WorkerRecord{
		WorkerID:          workerID,
		PodName:           "duckgres-worker-seam",
		State:             configstore.WorkerStateHotIdle,
		OrgID:             "analytics",
		Image:             "duckgres:v2",
		OwnerCPInstanceID: ownerCP,
		OwnerEpoch:        startEpoch,
		LastHeartbeatAt:   now,
		TTLMinutes:        int(ttl.Minutes()),
	}
	if err := store.UpsertWorkerRecord(seed); err != nil {
		t.Fatalf("UpsertWorkerRecord: %v", err)
	}

	// A test cannot wait for wall-clock time to pass, so the idle moment is
	// pushed 30m into the past by hand. This is the only forged write; the
	// updated_at bump being tested is produced by the real writers below.
	if res := store.DB().Exec(backdateSQL, idleSince, idleSince, workerID); res.Error != nil {
		t.Fatalf("backdate timestamps: %v", res.Error)
	}

	// Run the real credential-refresh writers in scheduler order: bump the
	// epoch first, then mark the refresh using the epoch that bump returned.
	bumpedEpoch, err := store.BumpWorkerEpoch(workerID, ownerCP, startEpoch)
	if err != nil {
		t.Fatalf("BumpWorkerEpoch: %v", err)
	}
	_, err = store.MarkCredentialsRefreshed(workerID, ownerCP, bumpedEpoch, now.Add(time.Hour))
	if err != nil {
		t.Fatalf("MarkCredentialsRefreshed: %v", err)
	}

	rec, err := store.GetWorkerRecord(workerID)
	if err != nil {
		t.Fatalf("GetWorkerRecord: %v", err)
	}
	if got := rec.HotIdleSince; got == nil || !got.Equal(idleSince) {
		t.Fatalf("refresh writes must not move hot_idle_since: got %v, want %v", rec.HotIdleSince, idleSince)
	}

	// Guard the bug precondition: the refresh must have dragged updated_at up
	// to roughly now, so the OLD predicate (updated_at + ttl <= now) would have
	// skipped this worker. If that stops being true the test no longer proves
	// anything.
	if oldDeadline := rec.UpdatedAt.Add(ttl); !oldDeadline.After(now) {
		t.Fatalf("expected refresh to push updated_at near now so the old predicate would hide the worker; updated_at=%v", rec.UpdatedAt)
	}

	// NOTE(review): the duration passed here (10m) differs from the row's own
	// 5m TTL; presumably it is a fallback used when a row has no TTL — confirm
	// against ListExpiredHotIdleWorkers. Either value is below the 30m idle age.
	expired, err := store.ListExpiredHotIdleWorkers(now, 10*time.Minute)
	if err != nil {
		t.Fatalf("ListExpiredHotIdleWorkers: %v", err)
	}

	reaped := false
	for _, candidate := range expired {
		if candidate.WorkerID == workerID {
			reaped = true
			break
		}
	}
	if !reaped {
		t.Error("hot-idle worker idle for 30m (ttl 5m) must be reaped despite a real credential refresh bumping updated_at")
	}
}

// TestCredentialRefreshWritesLeaveHotIdleSinceUnchanged guards an invariant
// against a whole class of bug: when one subsystem reads a column as a semantic
// clock, another subsystem's writes must never move it. Each real
// credential-refresh / lease writer listed below is applied to its own freshly
// seeded hot-idle row and must leave hot_idle_since exactly where it was while
// still (legitimately) advancing updated_at.
func TestCredentialRefreshWritesLeaveHotIdleSinceUnchanged(t *testing.T) {
	const (
		ownerCP    = "cp:boot"
		ownerEpoch = 1
	)

	// writerCase pairs a production write path with the name of its subtest.
	type writerCase struct {
		name  string
		apply func(t *testing.T, id int)
	}

	store := newIsolatedConfigStore(t)
	backdateSQL := fmt.Sprintf(
		`UPDATE %s SET hot_idle_since = ?, updated_at = ? WHERE worker_id = ?`,
		store.RuntimeSchema()+".worker_records",
	)

	cases := []writerCase{
		{
			name: "BumpWorkerEpoch",
			apply: func(t *testing.T, id int) {
				_, err := store.BumpWorkerEpoch(id, ownerCP, ownerEpoch)
				if err != nil {
					t.Fatalf("BumpWorkerEpoch: %v", err)
				}
			},
		},
		{
			name: "MarkCredentialsRefreshed",
			apply: func(t *testing.T, id int) {
				_, err := store.MarkCredentialsRefreshed(id, ownerCP, ownerEpoch, time.Now().Add(time.Hour))
				if err != nil {
					t.Fatalf("MarkCredentialsRefreshed: %v", err)
				}
			},
		},
	}

	for idx := range cases {
		tc := cases[idx]
		// Worker IDs are 1-based and unique per case, so every writer operates
		// on its own row and the subtests cannot disturb one another.
		workerID := idx + 1

		t.Run(tc.name, func(t *testing.T) {
			seed := &configstore.WorkerRecord{
				WorkerID:          workerID,
				PodName:           fmt.Sprintf("duckgres-worker-inv-%d", workerID),
				State:             configstore.WorkerStateHotIdle,
				OrgID:             "analytics",
				Image:             "duckgres:v2",
				OwnerCPInstanceID: ownerCP,
				OwnerEpoch:        ownerEpoch,
				LastHeartbeatAt:   time.Now(),
			}
			if err := store.UpsertWorkerRecord(seed); err != nil {
				t.Fatalf("UpsertWorkerRecord: %v", err)
			}

			// Push both timestamps an hour back so that any updated_at bump by
			// the writer is impossible to miss; hot_idle_since has to remain at
			// this backdated value no matter what.
			idleSince := time.Now().Add(-time.Hour).UTC().Truncate(time.Microsecond)
			if res := store.DB().Exec(backdateSQL, idleSince, idleSince, workerID); res.Error != nil {
				t.Fatalf("backdate timestamps: %v", res.Error)
			}

			tc.apply(t, workerID)

			rec, err := store.GetWorkerRecord(workerID)
			if err != nil {
				t.Fatalf("GetWorkerRecord: %v", err)
			}
			if got := rec.HotIdleSince; got == nil || !got.Equal(idleSince) {
				t.Errorf("%s moved hot_idle_since: got %v, want %v", tc.name, rec.HotIdleSince, idleSince)
			}
			if !rec.UpdatedAt.After(idleSince) {
				t.Errorf("%s should have advanced updated_at past the backdated value; got %v", tc.name, rec.UpdatedAt)
			}
		})
	}
}

// ptrTime returns a pointer to a private copy of t, for use where a
// *time.Time is needed in place of a time.Time value.
func ptrTime(t time.Time) *time.Time {
	p := new(time.Time)
	*p = t
	return p
}
Loading