Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions controlplane/configstore/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -642,6 +642,40 @@ func (cs *ConfigStore) UpdateWarehouseState(orgID string, expectedState ManagedW
return nil
}

// FinalizeWarehouseDeletion purges the config-store rows that belong to a
// managed warehouse once its resources have been torn down.
//
// All work happens inside one transaction. The first statement is a
// compare-and-swap that moves the warehouse row from the deleting state to the
// deleted state; when no row matches, the call returns an error wrapping
// ErrWarehouseStateMismatch and nothing is removed. Otherwise the org's user
// secrets, users, warehouse row and finally the org row (matched by name) are
// deleted, in that order. Gating on the deleting state keeps the cleanup
// retryable if the transaction fails.
func (cs *ConfigStore) FinalizeWarehouseDeletion(orgID string) error {
	finalize := func(tx *gorm.DB) error {
		// State CAS: only a warehouse that is still "deleting" may be finalized.
		marked := tx.Model(&ManagedWarehouse{}).
			Where("org_id = ? AND state = ?", orgID, ManagedWarehouseStateDeleting).
			Updates(map[string]interface{}{
				"state":          ManagedWarehouseStateDeleted,
				"status_message": "Resources deleted",
			})
		switch {
		case marked.Error != nil:
			return fmt.Errorf("finalize warehouse deletion: %w", marked.Error)
		case marked.RowsAffected == 0:
			return fmt.Errorf("warehouse %q expected state %q: %w", orgID, ManagedWarehouseStateDeleting, ErrWarehouseStateMismatch)
		}

		// purge deletes every row of the given model whose key column equals orgID.
		purge := func(model interface{}, query string) error {
			return tx.Where(query, orgID).Delete(model).Error
		}

		if err := purge(&OrgUserSecret{}, "org_id = ?"); err != nil {
			return fmt.Errorf("delete org user secrets: %w", err)
		}
		if err := purge(&OrgUser{}, "org_id = ?"); err != nil {
			return fmt.Errorf("delete org users: %w", err)
		}
		if err := purge(&ManagedWarehouse{}, "org_id = ?"); err != nil {
			return fmt.Errorf("delete managed warehouse: %w", err)
		}
		// The org row is keyed by name rather than org_id.
		if err := purge(&Org{}, "name = ?"); err != nil {
			return fmt.Errorf("delete org: %w", err)
		}
		return nil
	}

	return cs.db.Transaction(finalize)
}

// GetManagedWarehouseIceberg reads the embedded Iceberg config for an
// org. Returns (nil, nil) when the org has no warehouse row so callers
// can distinguish "never provisioned" from a DB error.
Expand Down
8 changes: 3 additions & 5 deletions controlplane/provisioner/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
type WarehouseStore interface {
ListWarehousesByStates(states []configstore.ManagedWarehouseProvisioningState) ([]configstore.ManagedWarehouse, error)
UpdateWarehouseState(orgID string, expectedState configstore.ManagedWarehouseProvisioningState, updates map[string]interface{}) error
FinalizeWarehouseDeletion(orgID string) error
// UpdateIcebergConfig writes per-org Iceberg/Lakekeeper config without
// CAS'ing on the top-level warehouse state. The Lakekeeper provisioner
// uses this because Iceberg provisioning runs in parallel with the
Expand Down Expand Up @@ -515,10 +516,7 @@ func (c *Controller) reconcileDeleting(ctx context.Context, w *configstore.Manag
}
}

if err := c.store.UpdateWarehouseState(w.OrgID, configstore.ManagedWarehouseStateDeleting, map[string]interface{}{
"state": configstore.ManagedWarehouseStateDeleted,
"status_message": "Resources deleted",
}); err != nil {
log.Warn("Failed to update state to deleted.", "error", err)
if err := c.store.FinalizeWarehouseDeletion(w.OrgID); err != nil {
log.Warn("Failed to finalize warehouse deletion.", "error", err)
}
}
4 changes: 2 additions & 2 deletions controlplane/provisioner/controller_lakekeeper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ func TestReconcileDeleting_TearsDownLakekeeper(t *testing.T) {
if _, err := kc.CoreV1().ServiceAccounts(k8sClient.namespace).Get(ctx, LakekeeperServiceAccountName(orgID), metav1.GetOptions{}); !apierrors.IsNotFound(err) {
t.Errorf("Lakekeeper ServiceAccount not torn down: err=%v", err)
}
if fs.warehouses[orgID].State != configstore.ManagedWarehouseStateDeleted {
t.Errorf("warehouse state = %q, want deleted", fs.warehouses[orgID].State)
if _, ok := fs.warehouses[orgID]; ok {
t.Error("warehouse row was not removed")
}
}

Expand Down
37 changes: 30 additions & 7 deletions controlplane/provisioner/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,14 @@ import (
// fakeStore implements WarehouseStore for unit tests. All state lives in
// plain in-memory maps that tests populate and inspect directly.
type fakeStore struct {
// warehouses holds the fake warehouse rows, keyed by org ID.
warehouses map[string]*configstore.ManagedWarehouse
// orgs marks which org rows exist, keyed by org ID. Tests set an entry to
// true; FinalizeWarehouseDeletion deletes it.
orgs map[string]bool
}

// newFakeStore returns an empty fakeStore with both backing maps allocated,
// so tests can insert warehouse and org entries without hitting a nil map.
func newFakeStore() *fakeStore {
	return &fakeStore{
		warehouses: make(map[string]*configstore.ManagedWarehouse),
		orgs:       make(map[string]bool),
	}
}

func (s *fakeStore) ListWarehousesByStates(states []configstore.ManagedWarehouseProvisioningState) ([]configstore.ManagedWarehouse, error) {
Expand Down Expand Up @@ -121,6 +125,19 @@ func (s *fakeStore) UpdateWarehouseState(orgID string, expectedState configstore
return nil
}

// FinalizeWarehouseDeletion is the in-memory counterpart of the config
// store's finalization step. It returns an error wrapping
// configstore.ErrWarehouseStateMismatch when the org has no warehouse entry or
// the entry is not in the deleting state; otherwise it drops both the
// warehouse entry and the org entry for orgID.
func (s *fakeStore) FinalizeWarehouseDeletion(orgID string) error {
	current, found := s.warehouses[orgID]
	switch {
	case !found:
		return fmt.Errorf("warehouse %q: %w", orgID, configstore.ErrWarehouseStateMismatch)
	case current.State != configstore.ManagedWarehouseStateDeleting:
		return fmt.Errorf("warehouse %q expected deleting got %q: %w", orgID, current.State, configstore.ErrWarehouseStateMismatch)
	}

	delete(s.warehouses, orgID)
	delete(s.orgs, orgID)
	return nil
}

// UpdateIcebergConfig writes per-org Iceberg/Lakekeeper fields without a
// top-level state CAS. Mirrors the real configstore method's contract.
func (s *fakeStore) UpdateIcebergConfig(orgID string, updates map[string]interface{}) error {
Expand Down Expand Up @@ -748,6 +765,7 @@ func TestReconcileProvisioningProbesPgBouncerWhenEnabled(t *testing.T) {
func TestReconcileDeletingDeletesCR(t *testing.T) {
dc, fakeK8s := newFakeDucklingClient()
fs := newFakeStore()
fs.orgs["org-c"] = true
fs.warehouses["org-c"] = &configstore.ManagedWarehouse{
OrgID: "org-c",
State: configstore.ManagedWarehouseStateDeleting,
Expand Down Expand Up @@ -780,9 +798,11 @@ func TestReconcileDeletingDeletesCR(t *testing.T) {
return
}

// Verify state transitioned to deleted
if fs.warehouses["org-c"].State != configstore.ManagedWarehouseStateDeleted {
t.Fatalf("expected deleted state, got %q", fs.warehouses["org-c"].State)
if _, ok := fs.warehouses["org-c"]; ok {
t.Fatal("expected warehouse row to be removed")
}
if fs.orgs["org-c"] {
t.Fatal("expected org row to be removed")
}
}

Expand All @@ -791,6 +811,7 @@ func TestReconcileDeletingRetriesOnNonNotFoundError(t *testing.T) {
// When it's a different error, it should NOT transition to deleted.
dc, _ := newFakeDucklingClient()
fs := newFakeStore()
fs.orgs["org-d"] = true
fs.warehouses["org-d"] = &configstore.ManagedWarehouse{
OrgID: "org-d",
State: configstore.ManagedWarehouseStateDeleting,
Expand All @@ -801,9 +822,11 @@ func TestReconcileDeletingRetriesOnNonNotFoundError(t *testing.T) {
ctrl := NewControllerWithClient(fs, dc, time.Second)
ctrl.reconcile(ctx)

// NotFound on delete is fine — should still transition to deleted
if fs.warehouses["org-d"].State != configstore.ManagedWarehouseStateDeleted {
t.Fatalf("expected deleted state on NotFound, got %q", fs.warehouses["org-d"].State)
if _, ok := fs.warehouses["org-d"]; ok {
t.Fatal("expected warehouse row to be removed on NotFound")
}
if fs.orgs["org-d"] {
t.Fatal("expected org row to be removed on NotFound")
}
}

Expand Down
16 changes: 13 additions & 3 deletions docs/runbooks/managed-warehouse-deprovision.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Runbook: Managed Warehouse Deprovision

Use this runbook to delete a managed warehouse and then remove the org from the control-plane config store. This is destructive: confirm the org ID, cluster, AWS account, and data-retention expectations before starting.
Use this runbook to delete a managed warehouse and verify the org has been removed from the control-plane config store. This is destructive: confirm the org ID, cluster, AWS account, and data-retention expectations before starting.

## Setup

Expand Down Expand Up @@ -98,7 +98,16 @@ while true; do
done
```

5. Delete the org row only after the Duckling CR and managed resources are gone.
5. Verify the org row was removed after the Duckling CR and managed resources are gone.

```bash
curl -sS "$API_BASE/api/v1/orgs" \
-H "X-Duckgres-Internal-Secret: $DUCKGRES_INTERNAL_SECRET" \
| jq -r '.[] | [.name, .database_name] | @tsv' \
| grep -F "$ORG" || echo "org row removed"
```

If the org row is still present after resource cleanup is complete, remove it manually:

```bash
curl -sS -X DELETE "$API_BASE/api/v1/orgs/$ORG" \
Expand All @@ -111,7 +120,8 @@ curl -sS -X DELETE "$API_BASE/api/v1/orgs/$ORG" \
```bash
curl -sS "$API_BASE/api/v1/orgs" \
-H "X-Duckgres-Internal-Secret: $DUCKGRES_INTERNAL_SECRET" \
| jq -r '.[] | [.name, .database_name] | @tsv'
| jq -r '.[] | [.name, .database_name] | @tsv' \
| grep -F "$ORG" || echo "org row removed"

kubectl --context "$TARGET_CONTEXT" -n ducklings get ducklings.k8s.posthog.com "$CR_NAME" || true
kubectl --context "$TARGET_CONTEXT" get managed -A | grep "$CR_NAME" || true
Expand Down
73 changes: 73 additions & 0 deletions tests/configstore/managed_warehouse_postgres_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
package configstore_test

import (
"errors"
"testing"

"github.com/posthog/duckgres/controlplane/configstore"
Expand Down Expand Up @@ -107,3 +108,75 @@ func TestManagedWarehouseConfigStorePostgres(t *testing.T) {
t.Fatalf("expected warehouse to be deleted via cascade, count=%d", count)
}
}

// TestFinalizeWarehouseDeletionRemovesOrgConfigRows seeds an org, one user,
// one user secret and a warehouse in the deleting state, finalizes the
// deletion, and then checks that no row for that org is left in any of the
// four tables.
func TestFinalizeWarehouseDeletionRemovesOrgConfigRows(t *testing.T) {
	const orgName = "analytics"

	store := newIsolatedConfigStore(t)
	db := store.DB()

	org := configstore.Org{Name: orgName, DatabaseName: orgName}
	if err := db.Create(&org).Error; err != nil {
		t.Fatalf("create org: %v", err)
	}

	user := configstore.OrgUser{
		OrgID:    orgName,
		Username: "root",
		Password: "hash",
	}
	if err := db.Create(&user).Error; err != nil {
		t.Fatalf("create org user: %v", err)
	}

	secret := configstore.OrgUserSecret{
		OrgID:      orgName,
		Username:   "root",
		SecretName: "persisted_secret",
		Ciphertext: []byte("sealed"),
	}
	if err := db.Create(&secret).Error; err != nil {
		t.Fatalf("create org user secret: %v", err)
	}

	warehouse := configstore.ManagedWarehouse{
		OrgID: orgName,
		State: configstore.ManagedWarehouseStateDeleting,
	}
	if err := db.Create(&warehouse).Error; err != nil {
		t.Fatalf("create warehouse: %v", err)
	}

	if err := store.FinalizeWarehouseDeletion(orgName); err != nil {
		t.Fatalf("finalize deletion: %v", err)
	}

	// Every table that referenced the org must now be empty for it.
	emptied := []struct {
		model any
		query string
	}{
		{&configstore.Org{}, "name = ?"},
		{&configstore.ManagedWarehouse{}, "org_id = ?"},
		{&configstore.OrgUser{}, "org_id = ?"},
		{&configstore.OrgUserSecret{}, "org_id = ?"},
	}
	for _, table := range emptied {
		assertRowCount(t, store, table.model, table.query, orgName, 0)
	}
}

// TestFinalizeWarehouseDeletionRequiresDeletingState checks the state guard:
// finalizing a warehouse that is in the ready state must fail with
// ErrWarehouseStateMismatch and leave both the org row and the warehouse row
// in place.
func TestFinalizeWarehouseDeletionRequiresDeletingState(t *testing.T) {
	const orgName = "analytics"

	store := newIsolatedConfigStore(t)
	db := store.DB()

	org := configstore.Org{Name: orgName, DatabaseName: orgName}
	if err := db.Create(&org).Error; err != nil {
		t.Fatalf("create org: %v", err)
	}

	warehouse := configstore.ManagedWarehouse{
		OrgID: orgName,
		State: configstore.ManagedWarehouseStateReady,
	}
	if err := db.Create(&warehouse).Error; err != nil {
		t.Fatalf("create warehouse: %v", err)
	}

	finalizeErr := store.FinalizeWarehouseDeletion(orgName)
	if !errors.Is(finalizeErr, configstore.ErrWarehouseStateMismatch) {
		t.Fatalf("expected ErrWarehouseStateMismatch, got %v", finalizeErr)
	}

	// The rejected call must not have removed anything.
	assertRowCount(t, store, &configstore.Org{}, "name = ?", orgName, 1)
	assertRowCount(t, store, &configstore.ManagedWarehouse{}, "org_id = ?", orgName, 1)
}

// assertRowCount counts the rows of model that match the single-argument
// query and fails the test immediately if the count query errors or the
// count differs from want.
func assertRowCount(t *testing.T, store *configstore.ConfigStore, model any, query string, arg any, want int64) {
	t.Helper()

	var got int64
	countErr := store.DB().Model(model).Where(query, arg).Count(&got).Error
	switch {
	case countErr != nil:
		t.Fatalf("count %T: %v", model, countErr)
	case got != want:
		t.Fatalf("count %T = %d, want %d", model, got, want)
	}
}
6 changes: 3 additions & 3 deletions tests/e2e-mw-dev/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,9 @@ client-go:
reused hot-idle worker (cnpg lane).
- **isolation** — two tenants (cnpg vs ext) see distinct catalogs; a
cross-tenant read is denied.
- **lifecycle** — deprovision → `warehouse=deleted` → the Crossplane Duckling
CR **fully** deletes (`kubectl wait --for=delete`, asserting the finalizer
cascade that drops the cnpg role+db completed). Same-id **re-provision** is
- **lifecycle** — deprovision → warehouse config row removed → the Crossplane
Duckling CR **fully** deletes (`kubectl wait --for=delete`, asserting the
finalizer cascade that drops the cnpg role+db completed). Same-id **re-provision** is
*not* done in-Job: a clean slate needs DROPping a possibly-stranded cnpg role,
which only `run.sh` (on the runner, with cnpg-shards exec) can do — so the
stranded-cnpg-role regression (#649/#650/#11518/#11522) is covered **across
Expand Down
21 changes: 15 additions & 6 deletions tests/e2e-mw-dev/harness.sh
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,16 @@ k() { "$KUBECTL" -n "$NS" "$@"; }

api_post() { curl -fsS -X POST -H "$H" "$API/api/v1/orgs/$1/$2"; }
api_get() { curl -fsS -H "$H" "$API/api/v1/orgs/$1/$2"; }
state_of() { api_get "$1" warehouse/status 2>/dev/null | jq -r '.state // "missing"'; }
# state_of ORG
# Prints the warehouse state for ORG as reported by the status endpoint:
#   HTTP 200 -> the body's .state field ("missing" when the field is absent/null)
#   HTTP 404 -> "missing"
#   other    -> "http_<code>" (whatever curl reported as the status code)
# The response body is written to /tmp/duckgres-status-ORG.json.
#
# The body is a subshell ( ... ) rather than a brace group so that org, tmp and
# code stay private to this call and cannot clobber same-named variables in the
# caller; this avoids depending on the non-POSIX `local` builtin.
state_of() (
  org="$1"
  tmp="/tmp/duckgres-status-$org.json"
  # `|| true` keeps a curl transport failure from aborting the caller; the
  # status code captured so far is still classified below.
  code="$(curl -sS -o "$tmp" -w '%{http_code}' -H "$H" "$API/api/v1/orgs/$org/warehouse/status" 2>/dev/null || true)"
  case "$code" in
    200) jq -r '.state // "missing"' "$tmp" ;;
    404) echo missing ;;
    *) echo "http_$code" ;;
  esac
)

provision() { # org metadata_json
log "provision $1"
Expand Down Expand Up @@ -1293,10 +1302,10 @@ tenant_isolation() { # orgA pwA orgB pwB
pg "$1" "$2" ducklake "DROP TABLE $t;"
}

# ---- lifecycle: deprovision → warehouse deleted → Duckling CR fully gone ----
# Proves the teardown path works end to end: warehouse marked deleted, the
# Crossplane Duckling CR removed, and its finalizer cascade (which drops the
# cnpg lakekeeper_<org> role+db) completed.
# ---- lifecycle: deprovision → config row removed → Duckling CR fully gone ----
# Proves the teardown path works end to end: warehouse config removed after the
# Crossplane Duckling CR is deleted, and the CR finalizer cascade (which drops
# the cnpg lakekeeper_<org> role+db) completed.
#
# NOTE: same-org-id *re-provision* in the SAME run is intentionally NOT done
# here. It is the regression net for the stranded-cnpg-role bugs
Expand All @@ -1313,7 +1322,7 @@ tenant_isolation() { # orgA pwA orgB pwB
lifecycle_teardown_cnpg() { # org
log "lifecycle: deprovision $1 + assert Duckling CR fully deleted"
api_post "$1" deprovision >/dev/null
wait_state "$1" deleted 600
wait_state "$1" missing 600
if ! "$KUBECTL" -n ducklings wait --for=delete "duckling/$1" --timeout=420s >/dev/null 2>&1; then
fail "Duckling CR $1 did not fully delete within 420s (finalizer/cnpg cascade stuck)"
fi
Expand Down
4 changes: 2 additions & 2 deletions tests/e2e-mw-dev/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -301,9 +301,9 @@ cmd_teardown() {
for _ in $(seq 1 30); do
gone=1
for org in $(ci_orgs "$PR_NUMBER"); do
st="$(curl -fsS -H "X-Duckgres-Internal-Secret: $secret" \
st="$(curl -sS -H "X-Duckgres-Internal-Secret: $secret" \
"http://localhost:18080/api/v1/orgs/$org/warehouse/status" 2>/dev/null \
| sed -n 's/.*"state":"\([^"]*\)".*/\1/p')"
| sed -n 's/.*"state":"\([^"]*\)".*/\1/p' || true)"
[ "$st" = "deleted" ] || [ -z "$st" ] || gone=0
done
[ "$gone" = 1 ] && break
Expand Down
Loading