Skip to content

View and delete jobs functionality in the CLI #293

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 13 commits into from
Aug 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 99 additions & 1 deletion src/codeflare_sdk/cli/cli_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,14 @@
from kubernetes import client, config
import pickle
import os
from ray.job_submission import JobSubmissionClient
from torchx.runner import get_runner
from rich.table import Table
from rich import print

from codeflare_sdk.cluster.auth import _create_api_client_config
from codeflare_sdk.cluster.cluster import list_clusters_all_namespaces, get_cluster
from codeflare_sdk.cluster.model import RayCluster
from codeflare_sdk.cluster.auth import _create_api_client_config, config_check
from codeflare_sdk.utils.kube_api_helpers import _kube_api_error_handling
import codeflare_sdk.cluster.auth as sdk_auth

Expand Down Expand Up @@ -73,3 +79,95 @@ def resolve_command(self, ctx, args):
# always return the full command name
_, cmd, args = super().resolve_command(ctx, args)
return cmd.name, cmd, args


def print_jobs(jobs):
    """Print job records as a table on the console.

    Args:
        jobs: Iterable of mapping-like job records. Every record must provide
            the keys "Submission ID", "Job ID", "RayCluster", "Namespace" and
            "Status"; any additional keys are ignored.
    """
    columns = ("Submission ID", "Job ID", "RayCluster", "Namespace", "Status")
    table = Table(show_header=True)
    for column in columns:
        table.add_column(column)
    # One row per job, with cells in the same order as the column headers.
    for record in jobs:
        cells = [record[column] for column in columns]
        table.add_row(*cells)
    print(table)


def list_all_kubernetes_jobs(print_to_console=True):
    """List jobs reported by the TorchX "kubernetes_mcad" scheduler.

    Scheduler entries that correspond to a RayCluster are left out, so only
    jobs that are not RayClusters themselves are returned.

    Args:
        print_to_console: When true, the resulting jobs are also printed as a
            table via ``print_jobs``.

    Returns:
        A list of dicts, one per job, with the keys "Submission ID",
        "Job ID", "RayCluster", "Namespace", "Status" and "App Handle".
        "Job ID" and "RayCluster" are always "N/A" for these jobs.
    """
    k8s_jobs = []
    runner = get_runner()
    jobs = runner.list(scheduler="kubernetes_mcad")
    # Match on (namespace, name) rather than name alone: a job that merely
    # shares its name with a RayCluster living in a different namespace must
    # not be hidden from the listing.
    rayclusters = {
        (raycluster.namespace, raycluster.name)
        for raycluster in list_clusters_all_namespaces(False)
    }
    for job in jobs:
        # app_id is expected in the form "<namespace>:<name>".
        namespace, name = job.app_id.split(":")
        if (namespace, name) in rayclusters:
            continue
        k8s_jobs.append(
            {
                "Submission ID": name,
                "Job ID": "N/A",
                "RayCluster": "N/A",
                "Namespace": namespace,
                "Status": str(job.state),
                "App Handle": job.app_handle,
            }
        )
    if print_to_console:
        print_jobs(k8s_jobs)
    return k8s_jobs


def list_all_jobs(print_to_console=True):
    """Collect every known job: RayCluster jobs first, then Kubernetes jobs.

    Args:
        print_to_console: When true, the combined list is also printed as a
            table via ``print_jobs``.

    Returns:
        A single list with the RayCluster job records followed by the
        Kubernetes job records.
    """
    # Kubernetes jobs are fetched first, but placed last in the result.
    kubernetes_jobs = list_all_kubernetes_jobs(False)
    raycluster_jobs = list_all_raycluster_jobs(False)
    combined = raycluster_jobs + kubernetes_jobs
    if print_to_console:
        print_jobs(combined)
    return combined


def list_raycluster_jobs(cluster: RayCluster, print_to_console=True):
    """List the jobs submitted to a single RayCluster.

    Args:
        cluster: The RayCluster to query. Its ``dashboard`` attribute is used
            as the address for the job submission client, and its ``name`` and
            ``namespace`` are copied into every job record.
        print_to_console: When true, the jobs are also printed as a table via
            ``print_jobs``.

    Returns:
        A list of dicts, one per job, with the keys "Submission ID",
        "Job ID", "RayCluster", "Namespace", "Status" and "App Handle".
    """
    submission_client = JobSubmissionClient(cluster.dashboard)
    rc_jobs = [
        {
            "Submission ID": entry.submission_id,
            "Job ID": entry.job_id,
            "RayCluster": cluster.name,
            "Namespace": cluster.namespace,
            "Status": str(entry.status),
            # Handle format: ray://torchx/<dashboard>-<submission id>
            "App Handle": "ray://torchx/"
            + cluster.dashboard
            + "-"
            + entry.submission_id,
        }
        for entry in submission_client.list_jobs()
    ]
    if print_to_console:
        print_jobs(rc_jobs)
    return rc_jobs


def list_all_raycluster_jobs(print_to_console=True):
    """List the jobs of every RayCluster across all namespaces.

    Args:
        print_to_console: When true, the collected jobs are also printed as a
            table via ``print_jobs``.

    Returns:
        A flat list with the job records of all clusters, in the order the
        clusters were returned.
    """
    collected = []
    for ray_cluster in list_clusters_all_namespaces(False):
        # The dashboard value gets an explicit "http://" prefix before it is
        # handed on as the job submission address.
        ray_cluster.dashboard = "http://" + ray_cluster.dashboard
        collected.extend(list_raycluster_jobs(ray_cluster, False))
    if print_to_console:
        print_jobs(collected)
    return collected


def get_job_app_handle(job_submission):
    """Return the "App Handle" of the job with the given submission ID.

    Args:
        job_submission: Submission ID of the job to look up.

    Raises:
        FileNotFoundError: Propagated from ``get_job_object`` when no job has
            that submission ID.
    """
    return get_job_object(job_submission)["App Handle"]


def get_job_object(job_submission):
    """Find the job record whose "Submission ID" equals ``job_submission``.

    Args:
        job_submission: Submission ID of the job to look up.

    Returns:
        The first matching job record from ``list_all_jobs``.

    Raises:
        FileNotFoundError: If no job has that submission ID.
    """
    candidates = (
        record
        for record in list_all_jobs(False)
        if record["Submission ID"] == job_submission
    )
    # Job records are dicts, so None is a safe "nothing found" marker.
    match = next(candidates, None)
    if match is None:
        raise FileNotFoundError(
            f"Job {job_submission} not found. Try using 'codeflare list --all' to see all jobs"
        )
    return match
27 changes: 27 additions & 0 deletions src/codeflare_sdk/cli/commands/cancel.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import click
from torchx.runner import get_runner


from codeflare_sdk.cli.cli_utils import get_job_app_handle


# Command group that the "cancel" subcommands below attach to.
@click.group()
def cli():
    """Cancel a resource"""


@cli.command()
@click.pass_context
@click.argument("submission-id", type=str)
def job(ctx, submission_id):
    """Cancel a job"""
    torchx_runner = get_runner()
    try:
        # Resolve the submission ID to its app handle, then cancel through
        # the runner.
        handle = get_job_app_handle(submission_id)
        torchx_runner.cancel(app_handle=handle)
        click.echo(f"{submission_id} cancelled successfully")
    except FileNotFoundError:
        # Raised by the lookup when no job has this submission ID.
        click.echo(f"Submission ID {submission_id} not found in Kubernetes Cluster")
    except Exception as err:
        # CLI boundary: report any other failure instead of a traceback.
        click.echo("Error cancelling job: " + str(err))
38 changes: 29 additions & 9 deletions src/codeflare_sdk/cli/commands/list.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
import click
from kubernetes import client, config

from codeflare_sdk.cluster.cluster import (
list_clusters_all_namespaces,
list_all_clusters,
)
from codeflare_sdk.cli.cli_utils import PluralAlias
from codeflare_sdk.cluster.cluster import get_cluster
from codeflare_sdk.cluster.cluster import _copy_to_ray
from codeflare_sdk.cli.cli_utils import list_all_jobs
from codeflare_sdk.cli.cli_utils import list_all_kubernetes_jobs
from codeflare_sdk.cli.cli_utils import list_raycluster_jobs


@click.group(cls=PluralAlias)
Expand All @@ -16,15 +20,31 @@ def cli():

@cli.command()
@click.option("--namespace", type=str)
@click.option("--all", is_flag=True)
@click.pass_context
def raycluster(ctx, namespace, all):
"""List all rayclusters in a specified namespace"""
if all and namespace:
click.echo("--all and --namespace are mutually exclusive")
return
namespace = namespace or ctx.obj.current_namespace
if not all:
def raycluster(ctx, namespace):
"""
List all rayclusters
"""
if namespace:
list_all_clusters(namespace)
return
list_clusters_all_namespaces()


@cli.command()
@click.pass_context
@click.option("--cluster-name", "-c", type=str)
@click.option("--namespace", "-n", type=str)
@click.option("--kube-mcad-scheduler-only", is_flag=True)
def job(ctx, cluster_name, namespace, kube_mcad_scheduler_only):
    """
    List all jobs submitted
    """
    if cluster_name:
        # A named cluster takes precedence over the other options; the
        # namespace falls back to the current one stored on the context.
        target_namespace = namespace or ctx.obj.current_namespace
        selected = get_cluster(cluster_name, target_namespace)
        list_raycluster_jobs(_copy_to_ray(selected), True)
    elif kube_mcad_scheduler_only:
        list_all_kubernetes_jobs(True)
    else:
        list_all_jobs(True)
25 changes: 25 additions & 0 deletions src/codeflare_sdk/cli/commands/logs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import click
from torchx.runner import get_runner

from codeflare_sdk.cli.cli_utils import get_job_app_handle


# Command group that the "logs" subcommands below attach to.
@click.group()
def cli():
    """Get the logs of a specified resource"""


@cli.command()
@click.pass_context
@click.argument("submission-id", type=str)
def job(ctx, submission_id):
    """Get the logs of a specified job"""
    torchx_runner = get_runner()
    try:
        handle = get_job_app_handle(submission_id)
        # Join all log lines returned by the runner into one string for output.
        log_text = "".join(torchx_runner.log_lines(handle, None))
        click.echo(log_text)
    except FileNotFoundError:
        # Raised by the lookup when no job has this submission ID.
        click.echo(f"Submission ID {submission_id} not found in Kubernetes Cluster")
    except Exception as err:
        # CLI boundary: report any other failure instead of a traceback.
        click.echo("Error getting job logs: " + str(err))
17 changes: 17 additions & 0 deletions src/codeflare_sdk/cli/commands/status.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import click
from torchx.runner import get_runner

from codeflare_sdk.cluster.cluster import get_cluster
from codeflare_sdk.cli.cli_utils import get_job_app_handle


@click.group()
Expand All @@ -22,3 +24,18 @@ def raycluster(ctx, name, namespace):
click.echo(f"Cluster {name} not found in {namespace} namespace")
return
cluster.status()


@cli.command()
@click.pass_context
@click.argument("submission-id", type=str)
def job(ctx, submission_id):
    """Get the status of a specified job"""
    torchx_runner = get_runner()
    try:
        handle = get_job_app_handle(submission_id)
        job_status = torchx_runner.status(app_handle=handle)
        click.echo(job_status)
    except FileNotFoundError:
        # Raised by the lookup when no job has this submission ID.
        click.echo(f"Submission ID {submission_id} not found in Kubernetes Cluster")
    except Exception as err:
        # CLI boundary: report any other failure instead of a traceback.
        click.echo("Error getting job status: " + str(err))
Loading