|
9 | 9 | from codeflare_sdk.common.kubernetes_cluster.kube_api_helpers import (
|
10 | 10 | _kube_api_error_handling,
|
11 | 11 | )
|
12 |
| - |
| 12 | +import time |
13 | 13 |
|
14 | 14 | def get_ray_cluster(cluster_name, namespace):
|
15 | 15 | api = client.CustomObjectsApi()
|
@@ -299,31 +299,38 @@ def create_kueue_resources(
|
299 | 299 |
|
300 | 300 |
|
def delete_kueue_resources(self):
    """Best-effort teardown of the Kueue objects created for a test run.

    Deletes every cluster-queue named in ``self.cluster_queues`` and every
    resource-flavor named in ``self.resource_flavors`` (either attribute may
    be absent).  A failure to delete one object is logged and skipped so the
    rest of the cleanup still runs; any unexpected error outside the
    per-item loops is logged and re-raised.
    """
    try:
        # Remove each cluster-queue, if it exists on the cluster.
        for queue_name in getattr(self, "cluster_queues", []):
            try:
                self.custom_api.delete_cluster_custom_object(
                    group="kueue.x-k8s.io",
                    plural="clusterqueues",
                    version="v1beta1",
                    name=queue_name,
                )
                print(f"\n'{queue_name}' cluster-queue deleted")
            except Exception as e:
                print(f"\nError deleting cluster-queue '{queue_name}' : {e}")

        # Remove each resource-flavor, if it exists on the cluster.
        for flavor_name in getattr(self, "resource_flavors", []):
            try:
                self.custom_api.delete_cluster_custom_object(
                    group="kueue.x-k8s.io",
                    plural="resourceflavors",
                    version="v1beta1",
                    name=flavor_name,
                )
                print(f"'{flavor_name}' resource-flavor deleted")
            except Exception as e:
                print(f"\nError deleting resource-flavor '{flavor_name}': {e}")

        # Give the API server a moment to finish removing the objects.
        time.sleep(5)
    except Exception as e:
        print(f"Error during Kueue resource cleanup: {e}")
        raise
327 | 334 |
|
328 | 335 |
|
329 | 336 | def get_pod_node(self, namespace, name):
|
@@ -407,3 +414,113 @@ def assert_get_cluster_and_jobsubmit(
|
407 | 414 | assert job_list[0].submission_id == submission_id
|
408 | 415 |
|
409 | 416 | cluster.down()
|
| 417 | + |
| 418 | + |
def kubectl_get_pod_status(namespace, pod_name):
    """Return a pod's phase via kubectl.

    Args:
        namespace: Namespace the pod lives in.
        pod_name: Name of the pod.

    Returns:
        The pod phase string (e.g. "Running" or "Pending"), "NotFound" if
        the pod does not exist, or "Error" if a kubectl query fails.
    """
    try:
        # First check that the pod exists at all (check=False so a missing
        # pod is a return value, not an exception).
        result = subprocess.run(
            ["kubectl", "get", "pod", pod_name, "-n", namespace],
            capture_output=True,
            text=True,
            check=False,
        )
        if result.returncode != 0:
            print(f"Pod {pod_name} not found in namespace {namespace}")
            return "NotFound"

        # Query the pod phase.  Arguments are passed as a list (no shell),
        # so the jsonpath template must NOT be wrapped in shell quotes —
        # quoting it would make kubectl print the quotes around the value.
        result = subprocess.run(
            [
                "kubectl", "get", "pod", pod_name, "-n", namespace,
                "-o", "jsonpath={.status.phase}",
            ],
            capture_output=True,
            text=True,
            check=True,
        )
        status = result.stdout.strip()

        # Dump the pod conditions to aid debugging of stuck/failed pods.
        conditions = subprocess.run(
            [
                "kubectl", "get", "pod", pod_name, "-n", namespace,
                "-o", "jsonpath={.status.conditions}",
            ],
            capture_output=True,
            text=True,
            check=True,
        )
        print(f"Pod {pod_name} conditions: {conditions.stdout}")

        return status
    except subprocess.CalledProcessError as e:
        print(f"Error getting pod status for {pod_name}: {e.stderr}")
        return "Error"
| 456 | + |
def kubectl_get_pod_ready(namespace, pod_name):
    """Return True when every container in the pod reports ready.

    Args:
        namespace: Namespace the pod lives in.
        pod_name: Name of the pod.

    Returns:
        True only if the pod has at least one container status and all of
        them report ready; False otherwise (including on kubectl errors).
    """
    try:
        # Print the raw container statuses for debugging.  The jsonpath
        # template is unquoted: subprocess gets a list (no shell), so shell
        # quotes would be passed to kubectl literally.
        result = subprocess.run(
            [
                "kubectl", "get", "pod", pod_name, "-n", namespace,
                "-o", "jsonpath={.status.containerStatuses}",
            ],
            capture_output=True,
            text=True,
            check=True,
        )
        print(f"Container statuses for {pod_name}: {result.stdout}")

        # Collect each container's ready flag; jsonpath's [*] expansion
        # emits them space-separated (e.g. "true false").
        result = subprocess.run(
            [
                "kubectl", "get", "pod", pod_name, "-n", namespace,
                "-o", "jsonpath={.status.containerStatuses[*].ready}",
            ],
            capture_output=True,
            text=True,
            check=True,
        )
        ready_flags = result.stdout.split()
        # Guard against the empty case: a pod with no containerStatuses yet
        # (e.g. still scheduling) must not be reported as ready.
        ready = bool(ready_flags) and all(flag == "true" for flag in ready_flags)

        if not ready:
            # Pair each container name with its ready flag for diagnostics.
            names_result = subprocess.run(
                [
                    "kubectl", "get", "pod", pod_name, "-n", namespace,
                    "-o", "jsonpath={.status.containerStatuses[*].name}",
                ],
                capture_output=True,
                text=True,
                check=True,
            )
            container_names = names_result.stdout.split()
            for name, flag in zip(container_names, ready_flags):
                print(f"Container {name} ready status: {flag}")

        return ready
    except subprocess.CalledProcessError as e:
        print(f"Error checking pod readiness for {pod_name}: {e.stderr}")
        return False
| 496 | + |
def kubectl_get_pod_container_status(namespace, pod_name):
    """Return a mapping of container name -> state object for a pod.

    Each value is the decoded ``state`` field of the container status,
    e.g. ``{"running": {...}}`` or ``{"waiting": {"reason": ...}}``.

    Args:
        namespace: Namespace the pod lives in.
        pod_name: Name of the pod.

    Returns:
        dict mapping container name to its state object (empty dict for a
        container with no state reported), or the string "Error" if the
        kubectl query fails.
    """
    import json

    try:
        # Fetch the whole pod as JSON and parse it.  The previous approach
        # of splitting jsonpath output on whitespace is unsafe here: each
        # container state is itself a JSON object that may contain spaces,
        # so .split() fragments it and misaligns names with states.
        result = subprocess.run(
            ["kubectl", "get", "pod", pod_name, "-n", namespace, "-o", "json"],
            capture_output=True,
            text=True,
            check=True,
        )
        pod = json.loads(result.stdout)
        container_statuses = pod.get("status", {}).get("containerStatuses", [])

        return {cs["name"]: cs.get("state", {}) for cs in container_statuses}
    except subprocess.CalledProcessError as e:
        print(f"Error getting container status for {pod_name}: {e.stderr}")
        return "Error"
0 commit comments