-
Notifications
You must be signed in to change notification settings - Fork 6
Expand file tree
/
Copy pathocp.py
More file actions
767 lines (608 loc) · 26.9 KB
/
ocp.py
File metadata and controls
767 lines (608 loc) · 26.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
# *****************************************************************************
# Copyright (c) 2024 IBM Corporation and other Contributors.
#
# All rights reserved. This program and the accompanying materials
# are made available under the terms of the Eclipse Public License v1.0
# which accompanies this distribution, and is available at
# http://www.eclipse.org/legal/epl-v10.html
#
# *****************************************************************************
import logging
from time import sleep
from kubeconfig import KubeConfig
from kubeconfig.exceptions import KubectlNotFoundError
from openshift.dynamic import DynamicClient
from openshift.dynamic.exceptions import NotFoundError
from kubernetes import client
from kubernetes.stream import stream
from kubernetes.stream.ws_client import ERROR_CHANNEL
from kubernetes.dynamic.resource import ResourceInstance
import yaml
logger = logging.getLogger(__name__)
def connect(server: str, token: str, skipVerify: bool = False) -> bool:
    """
    Establish a connection to a target OpenShift Container Platform (OCP) cluster.

    Registers credential, cluster, and context entries in the local kubeconfig
    and switches to the newly created context.

    Parameters:
        server (str): The OpenShift cluster API server URL (e.g., "https://api.cluster.example.com:6443")
        token (str): The authentication token for cluster access
        skipVerify (bool, optional): Whether to skip TLS certificate verification. Defaults to False.

    Returns:
        bool: True if the kubeconfig was updated, False if kubectl is not found on the path
    """
    logger.info(f"Connect(server={server}, token=***)")
    try:
        kubeConfig = KubeConfig()
    except KubectlNotFoundError:
        logger.warning("Unable to locate kubectl on the path")
        return False

    kubeConfig.view()
    logger.debug(f"Starting KubeConfig context: {kubeConfig.current_context()}")

    # Register the three kubeconfig entries, then activate the new context
    kubeConfig.set_credentials(name='my-credentials', token=token)
    kubeConfig.set_cluster(name='my-cluster', server=server, insecure_skip_tls_verify=skipVerify)
    kubeConfig.set_context(name='my-context', cluster='my-cluster', user='my-credentials')
    kubeConfig.use_context('my-context')

    kubeConfig.view()
    logger.info(f"KubeConfig context changed to {kubeConfig.current_context()}")
    return True
def getClusterVersion(dynClient: DynamicClient) -> str:
    """
    Get the current OpenShift cluster version.

    Reads the "version" ClusterVersion resource and returns the version of the
    first history entry whose state is "Completed".

    Parameters:
        dynClient (DynamicClient): OpenShift Dynamic Client

    Returns:
        str: The cluster version string (e.g., "4.12.0"), or None if not found
    """
    clusterVersionAPI = dynClient.resources.get(api_version="config.openshift.io/v1", kind="ClusterVersion")
    # Version jsonPath = .status.history[?(@.state=="Completed")].version
    try:
        history = clusterVersionAPI.get(name="version").status.history
        return next((entry.version for entry in history if entry.state == "Completed"), None)
    except NotFoundError:
        logger.debug("Unable to retrieve ClusterVersion")
        return None
def isClusterVersionInRange(version: str, releases: list[str]) -> bool:
    """
    Check if a cluster version matches any of the specified release versions.

    Parameters:
        version (str): The cluster version to check (e.g., "4.12.0").  May be None
            (e.g. when getClusterVersion() could not determine the version), in
            which case no release can match.
        releases (list[str]): List of release version prefixes to match against (e.g., ["4.12", "4.13"])

    Returns:
        bool: True if the version starts with any of the release prefixes, False otherwise
    """
    # Guard against None inputs: the original implementation raised an
    # AttributeError when version was None (a value getClusterVersion can return)
    if version is None or releases is None:
        return False
    return any(version.startswith(f"{release}.") for release in releases)
def getNamespace(dynClient: DynamicClient, namespace: str) -> dict:
    """
    Fetch a Kubernetes namespace by name.

    Parameters:
        dynClient (DynamicClient): OpenShift Dynamic Client
        namespace (str): The name of the namespace to retrieve

    Returns:
        dict: The namespace resource, or an empty dict if it does not exist
    """
    namespaceAPI = dynClient.resources.get(api_version="v1", kind="Namespace")
    try:
        result = namespaceAPI.get(name=namespace)
    except NotFoundError:
        logger.debug(f"Namespace {namespace} does not exist")
        return {}
    logger.debug(f"Namespace {namespace} exists")
    return result
def createNamespace(dynClient: DynamicClient, namespace: str, kyvernoLabel: str = None) -> bool:
    """
    Create a Kubernetes namespace if it does not already exist.

    When the namespace already exists and a Kyverno label value is supplied, the
    namespace is patched so its 'ibm.com/kyverno' label matches the requested value.

    Parameters:
        dynClient (DynamicClient): OpenShift Dynamic Client
        namespace (str): The name of the namespace to create
        kyvernoLabel (str, optional): Value for the 'ibm.com/kyverno' label. Defaults to None.

    Returns:
        bool: Always returns True
    """
    namespaceAPI = dynClient.resources.get(api_version="v1", kind="Namespace")
    try:
        existing = namespaceAPI.get(name=namespace)
        logger.info(f"Namespace {namespace} already exists")
        if kyvernoLabel is not None:
            labels = existing.metadata.labels
            # Patch only when the label is absent or carries a different value
            labelOutOfDate = labels is None or "ibm.com/kyverno" not in labels.keys() or labels["ibm.com/kyverno"] != kyvernoLabel
            if labelOutOfDate:
                logger.info(f"Patching namespace with Kyverno Labels ibm.com/kyverno: {kyvernoLabel}")
                namespaceAPI.patch(
                    name=namespace,
                    body={"metadata": {"labels": {"ibm.com/kyverno": kyvernoLabel}}},
                    content_type="application/merge-patch+json"
                )
    except NotFoundError:
        newNamespace = {
            "apiVersion": "v1",
            "kind": "Namespace",
            "metadata": {
                "name": namespace
            }
        }
        if kyvernoLabel is not None:
            newNamespace["metadata"]["labels"] = {"ibm.com/kyverno": kyvernoLabel}
        namespaceAPI.create(body=newNamespace)
        logger.debug(f"Created namespace {namespace}")
    return True
def deleteNamespace(dynClient: DynamicClient, namespace: str) -> bool:
    """
    Delete a Kubernetes namespace, ignoring the case where it does not exist.

    Parameters:
        dynClient (DynamicClient): OpenShift Dynamic Client
        namespace (str): The name of the namespace to delete

    Returns:
        bool: Always returns True
    """
    namespaceAPI = dynClient.resources.get(api_version="v1", kind="Namespace")
    try:
        namespaceAPI.delete(name=namespace)
    except NotFoundError:
        logger.debug(f"Namespace {namespace} can not be deleted because it does not exist")
    else:
        logger.debug(f"Namespace {namespace} deleted")
    return True
def waitForCRD(dynClient: DynamicClient, crdName: str) -> bool:
    """
    Wait for a Custom Resource Definition (CRD) to be established and ready.

    Polls the CRD status up to 100 times with 5-second intervals (max ~8 minutes).

    Parameters:
        dynClient (DynamicClient): OpenShift Dynamic Client
        crdName (str): The name of the CRD to wait for (e.g., "suites.core.mas.ibm.com")

    Returns:
        bool: True if the CRD becomes established, False if the retry limit is reached
    """
    crdAPI = dynClient.resources.get(api_version="apiextensions.k8s.io/v1", kind="CustomResourceDefinition")
    maxRetries = 100
    foundReadyCRD = False
    retries = 0
    while not foundReadyCRD and retries < maxRetries:
        retries += 1
        try:
            crd = crdAPI.get(name=crdName)
            conditions = crd.status.conditions
            if conditions is None:
                logger.debug(f"Looking for status.conditions to be available to iterate for {crdName}")
                sleep(5)
                continue
            # The CRD is ready once the "Established" condition reports status "True"
            if any(c.type == "Established" and c.status == "True" for c in conditions):
                foundReadyCRD = True
            else:
                # Fix: previously a conditions list without an "Established" entry
                # retried immediately with no delay (busy-looping against the API);
                # always back off before checking again
                logger.debug(f"Waiting 5s for {crdName} CRD to be ready before checking again ...")
                sleep(5)
        except NotFoundError:
            logger.debug(f"Waiting 5s for {crdName} CRD to be installed before checking again ...")
            sleep(5)
    return foundReadyCRD
def waitForDeployment(dynClient: DynamicClient, namespace: str, deploymentName: str) -> bool:
    """
    Wait until a Kubernetes Deployment reports at least one ready replica.

    Polls the deployment status up to 100 times with 5-second intervals (max ~8 minutes).

    Parameters:
        dynClient (DynamicClient): OpenShift Dynamic Client
        namespace (str): The namespace containing the deployment
        deploymentName (str): The name of the deployment to wait for

    Returns:
        bool: True if the deployment becomes ready, False if timeout is reached
    """
    deploymentAPI = dynClient.resources.get(api_version="apps/v1", kind="Deployment")
    maxRetries = 100
    attempt = 0
    while attempt < maxRetries:
        attempt += 1
        try:
            deployment = deploymentAPI.get(name=deploymentName, namespace=namespace)
            readyReplicas = deployment.status.readyReplicas
            # The status subresource may not be initialized yet when we check this
            # early, so readyReplicas can be None; guard before the numeric compare
            # to avoid a NoneType/int TypeError
            if readyReplicas is not None and readyReplicas > 0:
                return True
            logger.debug(f"Waiting 5s for deployment {deploymentName} to be ready before checking again ...")
            sleep(5)
        except NotFoundError:
            logger.debug(f"Waiting 5s for deployment {deploymentName} to be created before checking again ...")
            sleep(5)
    return False
def getConsoleURL(dynClient: DynamicClient) -> str:
    """
    Get the OpenShift web console URL.

    Parameters:
        dynClient (DynamicClient): OpenShift Dynamic Client

    Returns:
        str: The HTTPS URL of the OpenShift console (e.g., "https://console-openshift-console.apps.cluster.example.com")

    Raises:
        NotFoundError: If the console route is not found
    """
    routesAPI = dynClient.resources.get(api_version="route.openshift.io/v1", kind="Route")
    consoleHost = routesAPI.get(name="console", namespace="openshift-console").spec.host
    return f"https://{consoleHost}"
def getNodes(dynClient: DynamicClient) -> list:
    """
    Get all nodes in the cluster.

    Parameters:
        dynClient (DynamicClient): OpenShift Dynamic Client

    Returns:
        list: List of node resources as dictionaries
    """
    # Fix: the return annotation previously said `dict`, contradicting both the
    # docstring and the actual return value (to_dict()['items'] is a list)
    nodesAPI = dynClient.resources.get(api_version="v1", kind="Node")
    return nodesAPI.get().to_dict()['items']
def getStorageClass(dynClient: DynamicClient, name: str) -> dict | None:
    """
    Look up a single StorageClass by name.

    Parameters:
        dynClient (DynamicClient): OpenShift Dynamic Client
        name (str): The name of the StorageClass to retrieve

    Returns:
        StorageClass: The StorageClass resource, or None if it does not exist
    """
    try:
        storageClassAPI = dynClient.resources.get(api_version="storage.k8s.io/v1", kind="StorageClass")
        return storageClassAPI.get(name=name)
    except NotFoundError:
        return None
def getStorageClasses(dynClient: DynamicClient) -> list:
    """
    List every StorageClass in the cluster.

    Parameters:
        dynClient (DynamicClient): OpenShift Dynamic Client

    Returns:
        list: List of StorageClass resources

    Raises:
        NotFoundError: If StorageClasses cannot be retrieved
    """
    storageClassAPI = dynClient.resources.get(api_version="storage.k8s.io/v1", kind="StorageClass")
    return storageClassAPI.get().items
def getClusterIssuers(dynClient: DynamicClient) -> list:
    """
    List every cert-manager ClusterIssuer in the cluster.

    Parameters:
        dynClient (DynamicClient): OpenShift Dynamic Client

    Returns:
        list: List of ClusterIssuer resources

    Raises:
        NotFoundError: If ClusterIssuers cannot be retrieved
    """
    clusterIssuerAPI = dynClient.resources.get(api_version="cert-manager.io/v1", kind="ClusterIssuer")
    return clusterIssuerAPI.get().items
def getClusterIssuer(dynClient: DynamicClient, name: str) -> ResourceInstance | None:
    """
    Get a specific ClusterIssuer by name.

    Parameters:
        dynClient (DynamicClient): OpenShift Dynamic Client
        name (str): The name of the ClusterIssuer to retrieve

    Returns:
        ResourceInstance | None: The ClusterIssuer resource, or None if not found
    """
    # Fix: annotation previously claimed `ResourceInstance` although None is
    # returned when the issuer does not exist (matches getStorageClass's style)
    try:
        clusterIssuerAPI = dynClient.resources.get(api_version="cert-manager.io/v1", kind="ClusterIssuer")
        return clusterIssuerAPI.get(name=name)
    except NotFoundError:
        return None
def getStorageClassVolumeBindingMode(dynClient: DynamicClient, storageClassName: str) -> str:
    """
    Get the volumeBindingMode for a storage class.

    Args:
        dynClient: OpenShift dynamic client
        storageClassName: Name of the storage class

    Returns:
        str: "Immediate" or "WaitForFirstConsumer" (defaults to "Immediate" if not found)
    """
    try:
        storageClass = getStorageClass(dynClient, storageClassName)
        # Fix: dynamic-client resources commonly yield None for absent fields
        # rather than raising AttributeError, so the previous hasattr() check
        # could return None instead of the documented "Immediate" default.
        # Check the value itself instead.
        bindingMode = getattr(storageClass, 'volumeBindingMode', None) if storageClass else None
        if bindingMode:
            return bindingMode
        # Default to Immediate if not specified (Kubernetes default)
        logger.debug(f"Storage class {storageClassName} does not have volumeBindingMode set, defaulting to 'Immediate'")
        return "Immediate"
    except Exception as e:
        logger.warning(f"Unable to determine volumeBindingMode for storage class {storageClassName}: {e}")
        # Default to Immediate to maintain backward compatibility
        return "Immediate"
def isSNO(dynClient: DynamicClient) -> bool:
    """
    Determine whether this cluster is a Single Node OpenShift (SNO) deployment.

    Parameters:
        dynClient (DynamicClient): OpenShift Dynamic Client

    Returns:
        bool: True if the cluster has exactly one node, False otherwise
    """
    nodeCount = len(getNodes(dynClient))
    return nodeCount == 1
def crdExists(dynClient: DynamicClient, crdName: str) -> bool:
    """
    Check whether a Custom Resource Definition (CRD) exists in the cluster.

    Parameters:
        dynClient (DynamicClient): OpenShift Dynamic Client
        crdName (str): The name of the CRD to check (e.g., "suites.core.mas.ibm.com")

    Returns:
        bool: True if the CRD exists, False otherwise
    """
    crdAPI = dynClient.resources.get(api_version="apiextensions.k8s.io/v1", kind="CustomResourceDefinition")
    try:
        crdAPI.get(name=crdName)
    except NotFoundError:
        logger.debug(f"CRD does not exist: {crdName}")
        return False
    logger.debug(f"CRD does exist: {crdName}")
    return True
def listInstances(dynClient: DynamicClient, apiVersion: str, kind: str) -> list:
    """
    List all instances of a custom resource on the cluster.

    Logs a summary line for each instance found (name plus reconciled version).

    Parameters:
        dynClient (DynamicClient): OpenShift Dynamic Client
        apiVersion (str): The API version of the custom resource (e.g., "core.mas.ibm.com/v1")
        kind (str): The kind of custom resource (e.g., "Suite")

    Returns:
        list: List of custom resource instances as dictionaries

    Raises:
        NotFoundError: If the custom resource type is not found
    """
    api = dynClient.resources.get(api_version=apiVersion, kind=kind)
    instances = api.get().to_dict()['items']
    if not instances:
        logger.info(f"There are no {kind} instances installed on this cluster")
    else:
        logger.info(f"There are {len(instances)} {kind} instances installed on this cluster:")
        for instance in instances:
            # status.versions.reconciled may be absent on a freshly created CR
            reconciledVersion = instance.get('status', {}).get('versions', {}).get('reconciled', 'N/A')
            logger.info(f" * {instance['metadata']['name']} v{reconciledVersion}")
    return instances
def waitForPVC(dynClient: DynamicClient, namespace: str, pvcName: str) -> bool:
    """
    Wait for a PersistentVolumeClaim (PVC) to be bound.

    Uses an escalating backoff schedule across up to 20 attempts: 30s delays for
    attempts 1-5, 1 minute for 6-10, 2 minutes for 11-15, and 5 minutes for 16-20.

    Parameters:
        dynClient (DynamicClient): OpenShift Dynamic Client
        namespace (str): The namespace containing the PVC
        pvcName (str): The name of the PVC to wait for

    Returns:
        bool: True if the PVC becomes bound, False if the retry limit is reached
    """
    pvcAPI = dynClient.resources.get(api_version="v1", kind="PersistentVolumeClaim")
    maxRetries = 20
    retries = 0
    while retries < maxRetries:
        retries += 1
        # Escalating backoff: 30s for the first 5 attempts, then 1m, 2m and 5m
        if retries <= 5:
            retryDelaySeconds = 30
        elif retries <= 10:
            retryDelaySeconds = 60
        elif retries <= 15:
            retryDelaySeconds = 120
        else:
            retryDelaySeconds = 300
        try:
            pvc = pvcAPI.get(name=pvcName, namespace=namespace)
            if pvc.status.phase == "Bound":
                return True
            logger.debug(f"Waiting {retryDelaySeconds}s for PVC {pvcName} to be bound before checking again ...")
            sleep(retryDelaySeconds)
        except NotFoundError:
            logger.debug(f"Waiting {retryDelaySeconds}s for PVC {pvcName} to be created before checking again ...")
            sleep(retryDelaySeconds)
    return False
# Assisted by WCA@IBM
# Latest GenAI contribution: ibm/granite-8b-code-instruct
def execInPod(core_v1_api: client.CoreV1Api, pod_name: str, namespace, command: list, timeout: int = 60) -> str:
    """
    Executes a command in a Kubernetes pod and returns the standard output.

    If running this function from inside a pod (i.e. config.load_incluster_config()),
    the ServiceAccount assigned to the pod must have the following access in one of the Roles bound to it:
      rules:
      - apiGroups:
        - ""
        resources:
        - pods/exec
        verbs:
        - create
        - get
        - list

    Args:
        core_v1_api (client.CoreV1Api): The Kubernetes API client.
        pod_name (str): The name of the pod to execute the command in.
        namespace (str): The namespace of the pod.
        command (list): The command to execute in the pod.
        timeout (int, optional): The timeout in seconds for the command execution. Defaults to 60.

    Returns:
        str: The standard output of the command.

    Raises:
        Exception: If the command execution fails or times out.
    """
    logger.debug(f"Executing command {command} on pod {pod_name} in {namespace}")
    req = stream(
        core_v1_api.connect_get_namespaced_pod_exec,
        pod_name,
        namespace,
        command=command,
        stderr=True,
        stdin=False,
        stdout=True,
        tty=False,
        _preload_content=False,
    )
    req.run_forever(timeout)
    stdout = req.read_stdout()
    stderr = req.read_stderr()
    err = yaml.load(req.read_channel(ERROR_CHANNEL), Loader=yaml.FullLoader)
    # Fix: the error channel can be empty, in which case yaml.load returns None
    # and err.get(...) would raise AttributeError; only an explicit Failure
    # status is treated as an error
    if err is not None and err.get("status") == "Failure":
        raise Exception(f"Failed to execute {command} on {pod_name} in namespace {namespace}: {err.get('message')}. stdout: {stdout}, stderr: {stderr}")
    logger.debug(f"stdout: \n----------------------------------------------------------------\n{stdout}\n----------------------------------------------------------------\n")
    return stdout
def updateGlobalPullSecret(dynClient: DynamicClient, registryUrl: str, username: str, password: str) -> dict:
    """
    Update the global pull secret in openshift-config namespace with new registry credentials.

    Args:
        dynClient: OpenShift Dynamic Client
        registryUrl: Registry URL (e.g., "myregistry.com:5000")
        username: Registry username
        password: Registry password

    Returns:
        dict: Updated secret information

    Raises:
        Exception: If the global pull-secret does not exist in openshift-config
    """
    import json
    import base64
    logger.info(f"Updating global pull secret with credentials for {registryUrl}")
    # Get the existing pull secret
    secretsAPI = dynClient.resources.get(api_version="v1", kind="Secret")
    try:
        pullSecret = secretsAPI.get(name="pull-secret", namespace="openshift-config")
    except NotFoundError:
        raise Exception("Global pull-secret not found in openshift-config namespace")
    # Convert to dict to allow modifications
    secretDict = pullSecret.to_dict()
    # Decode the existing dockerconfigjson.
    # Fix: a missing or empty '.dockerconfigjson' key previously fed "" into
    # json.loads and raised JSONDecodeError; start from an empty config instead
    dockerConfigJson = secretDict['data'].get(".dockerconfigjson", "")
    if dockerConfigJson:
        dockerConfig = json.loads(base64.b64decode(dockerConfigJson).decode('utf-8'))
    else:
        dockerConfig = {}
    # Create auth string (username:password base64 encoded)
    authString = base64.b64encode(f"{username}:{password}".encode('utf-8')).decode('utf-8')
    # Add or update the registry credentials
    if "auths" not in dockerConfig:
        dockerConfig["auths"] = {}
    dockerConfig["auths"][registryUrl] = {
        "username": username,
        "password": password,
        "email": username,
        "auth": authString
    }
    # Encode back to base64
    updatedDockerConfig = base64.b64encode(json.dumps(dockerConfig).encode('utf-8')).decode('utf-8')
    # Update the secret dict
    secretDict['data'][".dockerconfigjson"] = updatedDockerConfig
    # Apply the updated secret
    updatedSecret = secretsAPI.apply(body=secretDict, namespace="openshift-config")
    logger.info(f"Successfully updated global pull secret with credentials for {registryUrl}")
    return {
        "name": updatedSecret.metadata.name,
        "namespace": updatedSecret.metadata.namespace,
        "registry": registryUrl,
        "changed": True
    }
def configureIngressForPathBasedRouting(dynClient: DynamicClient, ingressControllerName: str = "default") -> bool:
    """
    Configure an OpenShift IngressController for path-based routing.

    Ensures routeAdmission.namespaceOwnership is set to InterNamespaceAllowed on
    the named IngressController (required for path-based routing mode in MAS)
    and verifies that the setting has been applied.

    Args:
        dynClient: OpenShift Dynamic Client
        ingressControllerName (optional): Name of the IngressController to configure. Defaults to "default".

    Returns:
        bool: True if configuration was successful or already configured, False otherwise
    """
    logger.info(f"Configuring IngressController '{ingressControllerName}' for path-based routing")
    try:
        ingressControllerAPI = dynClient.resources.get(
            api_version="operator.openshift.io/v1",
            kind="IngressController"
        )
        try:
            ingressController = ingressControllerAPI.get(
                name=ingressControllerName,
                namespace="openshift-ingress-operator"
            )
        except NotFoundError:
            logger.error(f"IngressController '{ingressControllerName}' not found in namespace 'openshift-ingress-operator'")
            return False

        # Determine the currently configured namespaceOwnership policy, if any
        currentPolicy = None
        if hasattr(ingressController, 'spec') and hasattr(ingressController.spec, 'routeAdmission') and hasattr(ingressController.spec.routeAdmission, 'namespaceOwnership'):
            currentPolicy = ingressController.spec.routeAdmission.namespaceOwnership
        logger.debug(f"Current namespaceOwnership policy: {currentPolicy if currentPolicy else 'Not set'}")

        if currentPolicy == "InterNamespaceAllowed":
            logger.info(f"IngressController '{ingressControllerName}' is already configured with namespaceOwnership: InterNamespaceAllowed")
            return True

        logger.info(f"Patching IngressController '{ingressControllerName}' to enable InterNamespaceAllowed")
        ingressControllerAPI.patch(
            body={"spec": {"routeAdmission": {"namespaceOwnership": "InterNamespaceAllowed"}}},
            name=ingressControllerName,
            namespace="openshift-ingress-operator",
            content_type="application/merge-patch+json"
        )

        # Poll a few times to confirm the patch has been reconciled
        maxRetries = 5
        retryDelay = 5
        for attempt in range(maxRetries):
            sleep(retryDelay)
            try:
                refreshed = ingressControllerAPI.get(
                    name=ingressControllerName,
                    namespace="openshift-ingress-operator"
                )
                reconciled = (
                    hasattr(refreshed, 'spec')
                    and hasattr(refreshed.spec, 'routeAdmission')
                    and hasattr(refreshed.spec.routeAdmission, 'namespaceOwnership')
                    and refreshed.spec.routeAdmission.namespaceOwnership == "InterNamespaceAllowed"
                )
                if reconciled:
                    logger.info(f"Successfully configured IngressController '{ingressControllerName}' for path-based routing")
                    return True
            except NotFoundError:
                logger.warning(f"IngressController '{ingressControllerName}' not found during verification (attempt {attempt + 1}/{maxRetries})")
            if attempt < maxRetries - 1:
                logger.debug(f"Waiting for IngressController to reconcile (attempt {attempt + 1}/{maxRetries})")
        logger.error(f"Failed to verify IngressController configuration after {maxRetries} attempts")
        return False
    except Exception as e:
        logger.error(f"Failed to configure IngressController '{ingressControllerName}': {str(e)}")
        return False