106 changes: 106 additions & 0 deletions pkg/controllers/disruption/consolidation_test.go
@@ -31,6 +31,7 @@ import (
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
policyv1 "k8s.io/api/policy/v1"
storagev1 "k8s.io/api/storage/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
@@ -4331,6 +4332,111 @@ var _ = Describe("Consolidation", func() {
ExpectExists(ctx, env.Client, nodeClaims[1])
ExpectExists(ctx, env.Client, nodeClaims[2])
})
It("can replace node with ephemeral volumes and zonal topology spread", func() {
labels = map[string]string{
"app": "test-ephemeral-zonal-spread",
}
// create our RS so we can link a pod to it
rs := test.ReplicaSet()
ExpectApplied(ctx, env.Client, rs)

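// One pod per zone: MaxSkew of 1 with DoNotSchedule forces an even zonal spread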
tsc := corev1.TopologySpreadConstraint{
MaxSkew: 1,
TopologyKey: corev1.LabelTopologyZone,
WhenUnsatisfiable: corev1.DoNotSchedule,
LabelSelector: &metav1.LabelSelector{MatchLabels: labels},
}

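// WaitForFirstConsumer delays volume binding until a consuming pod is scheduled,
// so each PV's zone follows its pod's initial placement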
sc := test.StorageClass(test.StorageClassOptions{
ObjectMeta: metav1.ObjectMeta{Name: "ephemeral-sc"},
Provisioner: lo.ToPtr("test.csi.driver"),
VolumeBindingMode: lo.ToPtr(storagev1.VolumeBindingWaitForFirstConsumer),
})

volumeName := "tmp-ephemeral"
pods := test.Pods(4, test.PodOptions{
ResourceRequirements: corev1.ResourceRequirements{Requests: map[corev1.ResourceName]resource.Quantity{corev1.ResourceCPU: resource.MustParse("1")}},
TopologySpreadConstraints: []corev1.TopologySpreadConstraint{tsc},
ObjectMeta: metav1.ObjectMeta{
Labels: labels,
OwnerReferences: []metav1.OwnerReference{
{
APIVersion: "apps/v1",
Kind: "ReplicaSet",
Name: rs.Name,
UID: rs.UID,
Controller: lo.ToPtr(true),
BlockOwnerDeletion: lo.ToPtr(true),
},
}},
})

// Add a generic ephemeral volume to the first three pods
for _, p := range pods[:3] {
p.Spec.Volumes = append(p.Spec.Volumes, corev1.Volume{
Name: volumeName,
VolumeSource: corev1.VolumeSource{
Ephemeral: &corev1.EphemeralVolumeSource{
VolumeClaimTemplate: &corev1.PersistentVolumeClaimTemplate{
Spec: corev1.PersistentVolumeClaimSpec{
StorageClassName: lo.ToPtr(sc.Name),
AccessModes: []corev1.PersistentVolumeAccessMode{corev1.ReadWriteOnce},
Resources: corev1.VolumeResourceRequirements{
Requests: corev1.ResourceList{corev1.ResourceStorage: resource.MustParse("1Gi")},
},
},
},
},
},
})
}

ExpectApplied(ctx, env.Client, rs, sc, pods[0], pods[1], pods[2], nodeClaims[0], nodes[0], nodeClaims[1], nodes[1], nodeClaims[2], nodes[2], nodePool)

// bind pods to nodes
ExpectManualBinding(ctx, env.Client, pods[0], nodes[0])
ExpectManualBinding(ctx, env.Client, pods[1], nodes[1])
ExpectManualBinding(ctx, env.Client, pods[2], nodes[2])

// Create ephemeral PVCs bound to PVs in each zone (simulating what happens when pods are scheduled)
zones := []string{"test-zone-1", "test-zone-2", "test-zone-3"}
for i, p := range pods[:3] {
pvName := fmt.Sprintf("pv-%s", p.Name)
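// generic ephemeral volumes produce PVCs named <pod-name>-<volume-name>;
// the claim below mirrors that convention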
pvc := test.PersistentVolumeClaim(test.PersistentVolumeClaimOptions{
ObjectMeta: metav1.ObjectMeta{
Namespace: p.Namespace,
Name: fmt.Sprintf("%s-%s", p.Name, volumeName),
},
StorageClassName: lo.ToPtr(sc.Name),
VolumeName: pvName,
})
pv := test.PersistentVolume(test.PersistentVolumeOptions{
ObjectMeta: metav1.ObjectMeta{Name: pvName},
Zones: []string{zones[i]},
})
ExpectApplied(ctx, env.Client, pvc, pv)
}

// inform cluster state about nodes and nodeclaims
ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, []*corev1.Node{nodes[0], nodes[1], nodes[2]}, []*v1.NodeClaim{nodeClaims[0], nodeClaims[1], nodeClaims[2]})

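// sanity-check the initial spread: one pod per zone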
ExpectSkew(ctx, env.Client, "default", &tsc).To(ConsistOf(1, 1, 1))
ExpectSingletonReconciled(ctx, disruptionController)

// The most expensive node (zone-2) should be replaced despite ephemeral volumes
cmds := queue.GetCommands()
Expect(cmds).To(HaveLen(1))
ExpectMakeNewNodeClaimsReady(ctx, env.Client, cluster, cloudProvider, cmds[0])
ExpectObjectReconciled(ctx, env.Client, queue, cmds[0].Candidates[0].NodeClaim)

// Cascade any deletion of the nodeclaim to the node
ExpectNodeClaimsCascadeDeletion(ctx, env.Client, nodeClaims[1])

// should have created a cheaper replacement node, keeping the nodeclaim and node counts at three
Expect(ExpectNodeClaims(ctx, env.Client)).To(HaveLen(3))
Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(3))
ExpectNotFound(ctx, env.Client, nodeClaims[1], nodes[1])
})
})
Context("Parallelization", func() {
It("should not schedule an additional node when receiving pending pods while consolidating", func() {
116 changes: 116 additions & 0 deletions pkg/controllers/provisioning/scheduling/suite_test.go
@@ -3775,6 +3775,122 @@ var _ = Context("Scheduling", func() {
})
})

Describe("VolumeTopology", func() {
It("should not return volume topology requirements for ephemeral volumes on bound pods", func() {
volumeName := "tmp-ephemeral"
pod := test.UnschedulablePod()
pod.Spec.NodeName = "fake-node" // simulate a bound pod
pod.Spec.Volumes = append(pod.Spec.Volumes, corev1.Volume{
Name: volumeName,
VolumeSource: corev1.VolumeSource{
Ephemeral: &corev1.EphemeralVolumeSource{
VolumeClaimTemplate: &corev1.PersistentVolumeClaimTemplate{
Spec: corev1.PersistentVolumeClaimSpec{
StorageClassName: lo.ToPtr("my-storage-class"),
AccessModes: []corev1.PersistentVolumeAccessMode{corev1.ReadWriteOnce},
Resources: corev1.VolumeResourceRequirements{
Requests: corev1.ResourceList{corev1.ResourceStorage: resource.MustParse("1Gi")},
},
},
},
},
},
})
pvName := "test-pv"
pvc := test.PersistentVolumeClaim(test.PersistentVolumeClaimOptions{
ObjectMeta: metav1.ObjectMeta{
Namespace: pod.Namespace,
Name: fmt.Sprintf("%s-%s", pod.Name, volumeName),
},
StorageClassName: lo.ToPtr("my-storage-class"),
VolumeName: pvName,
})
pv := test.PersistentVolume(test.PersistentVolumeOptions{
ObjectMeta: metav1.ObjectMeta{Name: pvName},
Zones: []string{"test-zone-1"},
})
ExpectApplied(ctx, env.Client, pvc, pv)

volumeTopology := scheduling.NewVolumeTopology(env.Client)
reqs, err := volumeTopology.GetRequirements(ctx, pod)
Expect(err).ToNot(HaveOccurred())
Expect(reqs).To(BeNil())
})
It("should return volume topology requirements for ephemeral volumes on unbound pods", func() {
volumeName := "tmp-ephemeral"
pod := test.UnschedulablePod()
// pod.Spec.NodeName is empty (unbound pod)
pod.Spec.Volumes = append(pod.Spec.Volumes, corev1.Volume{
Name: volumeName,
VolumeSource: corev1.VolumeSource{
Ephemeral: &corev1.EphemeralVolumeSource{
VolumeClaimTemplate: &corev1.PersistentVolumeClaimTemplate{
Spec: corev1.PersistentVolumeClaimSpec{
StorageClassName: lo.ToPtr("my-storage-class"),
AccessModes: []corev1.PersistentVolumeAccessMode{corev1.ReadWriteOnce},
Resources: corev1.VolumeResourceRequirements{
Requests: corev1.ResourceList{corev1.ResourceStorage: resource.MustParse("1Gi")},
},
},
},
},
},
})
pvName := "test-pv"
pvc := test.PersistentVolumeClaim(test.PersistentVolumeClaimOptions{
ObjectMeta: metav1.ObjectMeta{
Namespace: pod.Namespace,
Name: fmt.Sprintf("%s-%s", pod.Name, volumeName),
},
StorageClassName: lo.ToPtr("my-storage-class"),
VolumeName: pvName,
})
pv := test.PersistentVolume(test.PersistentVolumeOptions{
ObjectMeta: metav1.ObjectMeta{Name: pvName},
Zones: []string{"test-zone-1"},
})
ExpectApplied(ctx, env.Client, pvc, pv)

volumeTopology := scheduling.NewVolumeTopology(env.Client)
reqs, err := volumeTopology.GetRequirements(ctx, pod)
Expect(err).ToNot(HaveOccurred())
Expect(reqs).ToNot(BeNil())
Expect(reqs.Get(corev1.LabelTopologyZone).Values()).To(ConsistOf("test-zone-1"))
})
It("should return volume topology requirements for regular PVCs on bound pods", func() {
pod := test.UnschedulablePod()
pod.Spec.NodeName = "fake-node" // simulate a bound pod
pvName := "test-pv"
pvcName := "test-pvc"
pod.Spec.Volumes = append(pod.Spec.Volumes, corev1.Volume{
Name: "data",
VolumeSource: corev1.VolumeSource{
PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{
ClaimName: pvcName,
},
},
})
pvc := test.PersistentVolumeClaim(test.PersistentVolumeClaimOptions{
ObjectMeta: metav1.ObjectMeta{
Namespace: pod.Namespace,
Name: pvcName,
},
VolumeName: pvName,
})
pv := test.PersistentVolume(test.PersistentVolumeOptions{
ObjectMeta: metav1.ObjectMeta{Name: pvName},
Zones: []string{"test-zone-1"},
})
ExpectApplied(ctx, env.Client, pvc, pv)

volumeTopology := scheduling.NewVolumeTopology(env.Client)
reqs, err := volumeTopology.GetRequirements(ctx, pod)
Expect(err).ToNot(HaveOccurred())
Expect(reqs).ToNot(BeNil())
Expect(reqs.Get(corev1.LabelTopologyZone).Values()).To(ConsistOf("test-zone-1"))
})
})

Describe("Deleting Nodes", func() {
It("should re-schedule pods from a deleting node when pods are active", func() {
ExpectApplied(ctx, env.Client, nodePool)
8 changes: 8 additions & 0 deletions pkg/controllers/provisioning/scheduling/volumetopology.go
@@ -77,6 +77,14 @@ func (v *VolumeTopology) GetRequirements(ctx context.Context, pod *v1.Pod) (sche
}

func (v *VolumeTopology) getRequirements(ctx context.Context, pod *v1.Pod, volume v1.Volume) ([]v1.NodeSelectorRequirement, error) {
// For ephemeral volumes on bound pods, skip topology requirements.
// Ephemeral volume PVCs are owned by the pod and deleted when the pod is deleted.
// When the pod is rescheduled (e.g., during consolidation), a new PVC will be created,
// so the old PVC's zone constraint should not constrain the new scheduling decision.
if volume.Ephemeral != nil && pod.Spec.NodeName != "" {
return nil, nil
}

pvc, err := volumeutil.GetPersistentVolumeClaim(ctx, v.kubeClient, pod, volume)
if err != nil {
return nil, fmt.Errorf("discovering persistent volume claim, %w", err)