
Commit 4c19662

docs: add airflow example
1 parent 84158cf commit 4c19662

File tree (6 files changed: +147, -5 lines)

- .github/workflows/doc-build.yaml
- docs/requirements.txt
- docs/source/index.rst
- docs/source/pipelines/airflow.md
- torchx/specs/api.py
- torchx/specs/test/api_test.py


.github/workflows/doc-build.yaml

Lines changed: 5 additions & 0 deletions

```diff
@@ -37,6 +37,11 @@ jobs:
     - name: Doc Build
       run: |
         cd docs
+        # start airflow
+        airflow standalone&
+        # wait 5 seconds for airflow to start
+        sleep 5
+
        make html
     - name: Papermill
       run: |
```
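
The fixed `sleep 5` assumes the standalone server is ready within five seconds. If a sturdier wait were ever needed, a small Python helper along the lines below could poll for readiness instead. This is illustrative only and not part of the diff; it assumes the standalone webserver listens on `localhost:8080` and serves the `/health` endpoint:

```python
# Hypothetical readiness check: poll the Airflow standalone webserver
# instead of sleeping for a fixed five seconds.
# Assumes the webserver is at http://localhost:8080 and exposes /health.
import time
import urllib.request


def wait_for_airflow(url: str = "http://localhost:8080/health", timeout: float = 60.0) -> None:
    deadline = time.time() + timeout
    while time.time() < deadline:
        try:
            with urllib.request.urlopen(url, timeout=1):
                return  # the webserver answered, treat Airflow as up
        except OSError:
            time.sleep(1)  # not reachable yet, retry
    raise RuntimeError(f"airflow did not become healthy within {timeout}s")


if __name__ == "__main__":
    wait_for_airflow()
```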

docs/requirements.txt

Lines changed: 1 addition & 0 deletions

```diff
@@ -10,3 +10,4 @@ jupytext
 ipython_genutils
 # https://github.com/jupyter/nbconvert/issues/1736
 jinja2<=3.0.3
+apache-airflow
```

docs/source/index.rst

Lines changed: 1 addition & 0 deletions

```diff
@@ -68,6 +68,7 @@ Works With
    :caption: Pipelines
 
    pipelines/kfp
+   pipelines/airflow.md
 
 
 Examples
```

docs/source/pipelines/airflow.md (new file)

Lines changed: 97 additions & 0 deletions

---
jupyter:
  jupytext:
    text_representation:
      extension: .md
      format_name: markdown
      format_version: '1.3'
      jupytext_version: 1.13.7
  kernelspec:
    display_name: Python 3
    language: python
    name: python3
---

# Airflow

For pipelines that support Python-based execution you can directly use the
TorchX API. TorchX is designed to be easily integrated into other applications
via the programmatic API; no special Airflow integrations are needed.

With TorchX, you can use Airflow for the pipeline orchestration and run your
PyTorch application (e.g. distributed training) on a remote GPU cluster.

```python
import datetime
import pendulum

from airflow.utils.state import DagRunState, TaskInstanceState
from airflow.utils.types import DagRunType
from airflow.models.dag import DAG
from airflow.decorators import task


DATA_INTERVAL_START = pendulum.datetime(2021, 9, 13, tz="UTC")
DATA_INTERVAL_END = DATA_INTERVAL_START + datetime.timedelta(days=1)
```

To launch a TorchX job from Airflow you can create an Airflow Python task that
imports the runner, launches the job, and waits for it to complete. If you're
running on a remote cluster you may need to use the virtualenv task to install
the `torchx` package (see the sketch after the example below).

```python
@task(task_id='hello_torchx')
def run_torchx(echo):
    """This is a function that will run within the DAG execution"""
    from torchx.runner import get_runner
    with get_runner() as runner:
        # Run the utils.sh component on the local_cwd scheduler.
        app_id = runner.run_component(
            "utils.sh",
            ["echo", "Hello, TorchX!"],
            scheduler="local_cwd",
        )

        # Wait for the job to complete
        status = runner.wait(app_id, wait_interval=1)

        # raise_for_status will raise an exception if the job didn't succeed
        status.raise_for_status()

        # Finally we can print all of the log lines from the TorchX job so it
        # will show up in the workflow logs.
        for line in runner.log_lines(app_id, "sh", k=0):
            print(line, end="")
```
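
If the Airflow workers don't already have `torchx` installed, the same task body could run under Airflow's virtualenv task decorator so the package is installed at execution time. The following is only a sketch of that variant, not part of the committed example; it assumes `@task.virtualenv` is available in your Airflow version and that `virtualenv` is present on the worker:

```python
@task.virtualenv(
    task_id="hello_torchx_venv",
    requirements=["torchx"],  # installed into the task's throwaway virtualenv
    system_site_packages=False,
)
def run_torchx_venv(echo):
    """Same body as run_torchx, but executed inside an isolated virtualenv."""
    from torchx.runner import get_runner

    with get_runner() as runner:
        app_id = runner.run_component(
            "utils.sh",
            ["echo", "Hello, TorchX!"],
            scheduler="local_cwd",
        )
        runner.wait(app_id, wait_interval=1).raise_for_status()
```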

Once we have the task defined we can put it into an Airflow DAG and run it like
normal.

```python
from torchx.schedulers.ids import make_unique

with DAG(
    dag_id=make_unique('example_python_operator'),
    schedule_interval=None,
    start_date=DATA_INTERVAL_START,
    catchup=False,
    tags=['example'],
) as dag:
    run_job = run_torchx("foo")


dagrun = dag.create_dagrun(
    state=DagRunState.RUNNING,
    execution_date=DATA_INTERVAL_START,
    data_interval=(DATA_INTERVAL_START, DATA_INTERVAL_END),
    start_date=DATA_INTERVAL_END,
    run_type=DagRunType.MANUAL,
)
ti = dagrun.get_task_instance(task_id="hello_torchx")
ti.task = dag.get_task(task_id="hello_torchx")
ti.run(ignore_ti_state=True)
assert ti.state == TaskInstanceState.SUCCESS
```

If all goes well, you should see `Hello, TorchX!` printed above.

torchx/specs/api.py

Lines changed: 19 additions & 0 deletions

```diff
@@ -477,6 +477,25 @@ def __repr__(self) -> str:
         app_status_dict["state"] = repr(app_status_dict["state"])
         return yaml.dump({"AppStatus": app_status_dict})
 
+    def raise_for_status(self) -> None:
+        """
+        raise_for_status will raise an AppStatusError if the state is not SUCCEEDED.
+        """
+        if self.state != AppState.SUCCEEDED:
+            raise AppStatusError(self, f"job did not succeed: {self}")
+
+
+class AppStatusError(Exception):
+    """
+    AppStatusError is raised when the job status is in an exceptional state i.e.
+    not SUCCEEDED.
+    """
+
+    def __init__(self, status: AppStatus, *args: object) -> None:
+        super().__init__(*args)
+
+        self.status = status
+
 
 # valid run cfg values; only support primitives (str, int, float, bool, List[str])
 # TODO(wilsonhong): python 3.9+ supports list[T] in typing, which can be used directly
```
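
For context, the snippet below sketches how the new `raise_for_status` / `AppStatusError` pair might be consumed when you want to handle a failed job rather than let the exception propagate. It reuses only calls shown elsewhere in this commit and is illustrative, not part of the diff:

```python
from torchx.runner import get_runner
from torchx.specs.api import AppStatusError

with get_runner() as runner:
    app_id = runner.run_component(
        "utils.sh",
        ["echo", "Hello, TorchX!"],
        scheduler="local_cwd",
    )
    status = runner.wait(app_id, wait_interval=1)
    try:
        status.raise_for_status()
    except AppStatusError as e:
        # e.status is the AppStatus that failed the SUCCEEDED check
        print(f"job did not succeed, final state: {e.status.state}")
```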

torchx/specs/test/api_test.py

Lines changed: 24 additions & 5 deletions

```diff
@@ -13,22 +13,23 @@
 import torchx.specs.named_resources_aws as named_resources_aws
 from torchx.specs import named_resources, resource
 from torchx.specs.api import (
-    _TERMINAL_STATES,
     AppDef,
     AppDryRunInfo,
     AppState,
     AppStatus,
+    AppStatusError,
     CfgVal,
-    get_type_name,
     InvalidRunConfigException,
-    macros,
-    MalformedAppHandleException,
     MISSING,
+    MalformedAppHandleException,
     NULL_RESOURCE,
-    parse_app_handle,
     Resource,
     RetryPolicy,
     Role,
+    _TERMINAL_STATES,
+    get_type_name,
+    macros,
+    parse_app_handle,
     runopts,
 )
 
@@ -86,6 +87,24 @@ def test_serialize_embed_json(self) -> None:
             """,
         )
 
+    def test_raise_on_status(self) -> None:
+        AppStatus(state=AppState.SUCCEEDED).raise_for_status()
+
+        with self.assertRaisesRegex(
+            AppStatusError, r"(?s)job did not succeed:.*FAILED.*"
+        ):
+            AppStatus(state=AppState.FAILED).raise_for_status()
+
+        with self.assertRaisesRegex(
+            AppStatusError, r"(?s)job did not succeed:.*CANCELLED.*"
+        ):
+            AppStatus(state=AppState.CANCELLED).raise_for_status()
+
+        with self.assertRaisesRegex(
+            AppStatusError, r"(?s)job did not succeed:.*RUNNING.*"
+        ):
+            AppStatus(state=AppState.RUNNING).raise_for_status()
+
 
 class ResourceTest(unittest.TestCase):
     def test_copy_resource(self) -> None:
```
