Skip to content

Commit ac18d39

Browse files
committed
docs: add airflow example
1 parent 84158cf commit ac18d39

File tree

6 files changed

+148
-0
lines changed

6 files changed

+148
-0
lines changed

.github/workflows/doc-build.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,12 @@ jobs:
2626
- name: Install TorchX
2727
run: |
2828
python setup.py develop
29+
- name: Start Airflow
30+
run: |
31+
# start airflow in background
32+
airflow standalone &
33+
# wait 5 seconds for airflow to start
34+
sleep 5
2935
- name: Doc Test
3036
run: |
3137
cd docs

docs/requirements.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,3 +10,4 @@ jupytext
1010
ipython_genutils
1111
# https://github.com/jupyter/nbconvert/issues/1736
1212
jinja2<=3.0.3
13+
apache-airflow

docs/source/index.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ Works With
6868
:caption: Pipelines
6969

7070
pipelines/kfp
71+
pipelines/airflow.md
7172

7273

7374
Examples

docs/source/pipelines/airflow.md

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
---
2+
jupyter:
3+
jupytext:
4+
text_representation:
5+
extension: .md
6+
format_name: markdown
7+
format_version: '1.3'
8+
jupytext_version: 1.13.7
9+
kernelspec:
10+
display_name: Python 3
11+
language: python
12+
name: python3
13+
---
14+
15+
# Airflow
16+
17+
For pipelines that support Python based execution you can directly use the
18+
TorchX API. TorchX is designed to be easily integrated into other applications
19+
via the programmatic API. No special Airflow integrations are needed.
20+
21+
With TorchX, you can use Airflow for the pipeline orchestration and run your
22+
PyTorch application (i.e. distributed training) on a remote GPU cluster.
23+
24+
```python
25+
import datetime

import pendulum

from airflow.decorators import task
from airflow.models.dag import DAG
from airflow.utils.state import DagRunState, TaskInstanceState
from airflow.utils.types import DagRunType

# Fixed one-day data interval used by the example DAG run below.
DATA_INTERVAL_START = pendulum.datetime(2021, 9, 13, tz="UTC")
DATA_INTERVAL_END = DATA_INTERVAL_START + datetime.timedelta(days=1)
36+
```
37+
38+
To launch a TorchX job from Airflow you can create an Airflow Python task to
39+
import the runner, launch the job and wait for it to complete. If you're running
40+
on a remote cluster you may need to use the virtualenv task to install the
41+
`torchx` package.
42+
43+
```python
44+
@task(task_id="hello_torchx")
def run_torchx(echo):
    """This is a function that will run within the DAG execution"""
    # Import inside the task so the code also works when executed on a
    # remote worker where torchx is installed in the task's environment.
    from torchx.runner import get_runner

    with get_runner() as runner:
        # Run the utils.sh component on the local_cwd scheduler.
        app_id = runner.run_component(
            "utils.sh",
            ["echo", "Hello, TorchX!"],
            scheduler="local_cwd",
        )

        # Wait for the job to complete
        status = runner.wait(app_id, wait_interval=1)

        # raise_for_status will raise an exception if the job didn't succeed
        status.raise_for_status()

        # Finally we can print all of the log lines from the TorchX job so it
        # will show up in the workflow logs.
        for line in runner.log_lines(app_id, "sh", k=0):
            print(line, end="")
66+
```
67+
68+
Once we have the task defined we can put it into an Airflow DAG and run it like
69+
normal.
70+
71+
```python
72+
from torchx.schedulers.ids import make_unique

# Build the example DAG containing our single TorchX task.
with DAG(
    dag_id=make_unique('example_python_operator'),
    schedule_interval=None,
    start_date=DATA_INTERVAL_START,
    catchup=False,
    tags=['example'],
) as dag:
    run_job = run_torchx("foo")


# Manually create a DAG run and execute the task instance so the example
# runs end to end without needing a scheduler.
dagrun = dag.create_dagrun(
    state=DagRunState.RUNNING,
    execution_date=DATA_INTERVAL_START,
    data_interval=(DATA_INTERVAL_START, DATA_INTERVAL_END),
    start_date=DATA_INTERVAL_END,
    run_type=DagRunType.MANUAL,
)
ti = dagrun.get_task_instance(task_id="hello_torchx")
ti.task = dag.get_task(task_id="hello_torchx")
ti.run(ignore_ti_state=True)
assert ti.state == TaskInstanceState.SUCCESS
95+
```
96+
97+
If all goes well you should see `Hello, TorchX!` printed above.
98+
99+
## Next Steps
100+
101+
* Checkout the [runner API documentation](runner.rst) to learn more about
102+
programmatic usage of TorchX

torchx/specs/api.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -477,6 +477,25 @@ def __repr__(self) -> str:
477477
app_status_dict["state"] = repr(app_status_dict["state"])
478478
return yaml.dump({"AppStatus": app_status_dict})
479479

480+
def raise_for_status(self) -> None:
    """
    Raise an ``AppStatusError`` unless the app finished in the
    SUCCEEDED state; a no-op for successful apps.
    """
    if self.state == AppState.SUCCEEDED:
        return
    raise AppStatusError(self, f"job did not succeed: {self}")
486+
487+
488+
class AppStatusError(Exception):
    """
    Raised when the job status is in an exceptional state, i.e. anything
    other than SUCCEEDED.
    """

    def __init__(self, status: AppStatus, *args: object) -> None:
        super().__init__(*args)
        # Keep the offending status around so callers can inspect it.
        self.status = status
498+
480499

481500
# valid run cfg values; only support primitives (str, int, float, bool, List[str])
482501
# TODO(wilsonhong): python 3.9+ supports list[T] in typing, which can be used directly

torchx/specs/test/api_test.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
AppDryRunInfo,
1919
AppState,
2020
AppStatus,
21+
AppStatusError,
2122
CfgVal,
2223
get_type_name,
2324
InvalidRunConfigException,
@@ -86,6 +87,24 @@ def test_serialize_embed_json(self) -> None:
8687
""",
8788
)
8889

90+
def test_raise_on_status(self) -> None:
    # The success state must not raise.
    AppStatus(state=AppState.SUCCEEDED).raise_for_status()

    # Every non-succeeded state raises, with the state name present in
    # the error message.
    cases = (
        (AppState.FAILED, r"(?s)job did not succeed:.*FAILED.*"),
        (AppState.CANCELLED, r"(?s)job did not succeed:.*CANCELLED.*"),
        (AppState.RUNNING, r"(?s)job did not succeed:.*RUNNING.*"),
    )
    for state, pattern in cases:
        with self.assertRaisesRegex(AppStatusError, pattern):
            AppStatus(state=state).raise_for_status()
107+
89108

90109
class ResourceTest(unittest.TestCase):
91110
def test_copy_resource(self) -> None:

0 commit comments

Comments
 (0)