Skip to content

Commit a551741

Browse files
d4l3kfacebook-github-bot
authored andcommitted
docs: add airflow example (#519)
Summary: This adds an example for how to use Airflow with TorchX. This example requires airflow server to be running on the local machine during the docs build process. This also adds in a `AppStatus.raise_for_status()` method to match the requests/urllib3 behavior which just makes checking for whether or not a job failed easier. Build is working locally -- might need to fiddle with CI to get airflow running there correctly still Pull Request resolved: #519 Test Plan: ``` cd docs && make html ``` ![Screenshot 2022-06-14 at 18-01-37 Airflow — PyTorch_TorchX main documentation](https://user-images.githubusercontent.com/909104/173714270-c3429671-b0e8-4bc1-83ca-9a487e2381b5.png) Reviewed By: priyaramani Differential Revision: D37161851 Pulled By: d4l3k fbshipit-source-id: f7ee74741a25a21f7d4f376e8ab07070ae3b233d
1 parent 939a778 commit a551741

File tree

6 files changed

+150
-0
lines changed

6 files changed

+150
-0
lines changed

.github/workflows/doc-build.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,12 @@ jobs:
2626
- name: Install TorchX
2727
run: |
2828
python setup.py develop
29+
- name: Start Airflow
30+
run: |
31+
# start airflow in background
32+
airflow standalone &
33+
# wait 5 seconds for airflow to start
34+
sleep 5
2935
- name: Doc Test
3036
run: |
3137
cd docs

docs/requirements.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,3 +10,4 @@ jupytext
1010
ipython_genutils
1111
# https://github.com/jupyter/nbconvert/issues/1736
1212
jinja2<=3.0.3
13+
apache-airflow

docs/source/index.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ Works With
6868
:caption: Pipelines
6969

7070
pipelines/kfp
71+
pipelines/airflow.md
7172

7273

7374
Examples

docs/source/pipelines/airflow.md

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
---
2+
jupyter:
3+
jupytext:
4+
text_representation:
5+
extension: .md
6+
format_name: markdown
7+
format_version: '1.3'
8+
jupytext_version: 1.13.7
9+
kernelspec:
10+
display_name: Python 3
11+
language: python
12+
name: python3
13+
---
14+
15+
# Airflow
16+
17+
For pipelines that support Python based execution you can directly use the
18+
TorchX API. TorchX is designed to be easily integrated in to other applications
19+
via the programmatic API. No special Airflow integrations are needed.
20+
21+
With TorchX, you can use Airflow for the pipeline orchestration and run your
22+
PyTorch application (i.e. distributed training) on a remote GPU cluster.
23+
24+
```python
25+
import datetime
26+
import pendulum
27+
28+
from airflow.utils.state import DagRunState, TaskInstanceState
29+
from airflow.utils.types import DagRunType
30+
from airflow.models.dag import DAG
31+
from airflow.decorators import task
32+
33+
34+
DATA_INTERVAL_START = pendulum.datetime(2021, 9, 13, tz="UTC")
35+
DATA_INTERVAL_END = DATA_INTERVAL_START + datetime.timedelta(days=1)
36+
```
37+
38+
To launch a TorchX job from Airflow you can create a Airflow Python task to
39+
import the runner, launch the job and wait for it to complete. If you're running
40+
on a remote cluster you may need to use the virtualenv task to install the
41+
`torchx` package.
42+
43+
```python
44+
@task(task_id=f'hello_torchx')
45+
def run_torchx(message):
46+
"""This is a function that will run within the DAG execution"""
47+
from torchx.runner import get_runner
48+
with get_runner() as runner:
49+
# Run the utils.sh component on the local_cwd scheduler.
50+
app_id = runner.run_component(
51+
"utils.sh",
52+
["echo", message],
53+
scheduler="local_cwd",
54+
)
55+
56+
# Wait for the the job to complete
57+
status = runner.wait(app_id, wait_interval=1)
58+
59+
# Raise_for_status will raise an exception if the job didn't succeed
60+
status.raise_for_status()
61+
62+
# Finally we can print all of the log lines from the TorchX job so it
63+
# will show up in the workflow logs.
64+
for line in runner.log_lines(app_id, "sh", k=0):
65+
print(line, end="")
66+
```
67+
68+
Once we have the task defined we can put it into a Airflow DAG and run it like
69+
normal.
70+
71+
```python
72+
from torchx.schedulers.ids import make_unique
73+
74+
with DAG(
75+
dag_id=make_unique('example_python_operator'),
76+
schedule_interval=None,
77+
start_date=DATA_INTERVAL_START,
78+
catchup=False,
79+
tags=['example'],
80+
) as dag:
81+
run_job = run_torchx("Hello, TorchX!")
82+
83+
84+
dagrun = dag.create_dagrun(
85+
state=DagRunState.RUNNING,
86+
execution_date=DATA_INTERVAL_START,
87+
data_interval=(DATA_INTERVAL_START, DATA_INTERVAL_END),
88+
start_date=DATA_INTERVAL_END,
89+
run_type=DagRunType.MANUAL,
90+
)
91+
ti = dagrun.get_task_instance(task_id="hello_torchx")
92+
ti.task = dag.get_task(task_id="hello_torchx")
93+
ti.run(ignore_ti_state=True)
94+
assert ti.state == TaskInstanceState.SUCCESS
95+
```
96+
97+
If all goes well you should see `Hello, TorchX!` printed above.
98+
99+
## Next Steps
100+
101+
* Checkout the [runner API documentation](../runner.rst) to learn more about
102+
programmatic usage of TorchX
103+
* Browse through the collection of [builtin components](../components/overview.rst)
104+
which can be used in your Airflow pipeline

torchx/specs/api.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -477,6 +477,25 @@ def __repr__(self) -> str:
477477
app_status_dict["state"] = repr(app_status_dict["state"])
478478
return yaml.dump({"AppStatus": app_status_dict})
479479

480+
def raise_for_status(self) -> None:
481+
"""
482+
raise_for_status will raise an AppStatusError if the state is not SUCCEEDED.
483+
"""
484+
if self.state != AppState.SUCCEEDED:
485+
raise AppStatusError(self, f"job did not succeed: {self}")
486+
487+
488+
class AppStatusError(Exception):
489+
"""
490+
AppStatusError is raised when the job status is in an exceptional state i.e.
491+
not SUCCEEDED.
492+
"""
493+
494+
def __init__(self, status: AppStatus, *args: object) -> None:
495+
super().__init__(*args)
496+
497+
self.status = status
498+
480499

481500
# valid run cfg values; only support primitives (str, int, float, bool, List[str])
482501
# TODO(wilsonhong): python 3.9+ supports list[T] in typing, which can be used directly

torchx/specs/test/api_test.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
AppDryRunInfo,
1919
AppState,
2020
AppStatus,
21+
AppStatusError,
2122
CfgVal,
2223
get_type_name,
2324
InvalidRunConfigException,
@@ -86,6 +87,24 @@ def test_serialize_embed_json(self) -> None:
8687
""",
8788
)
8889

90+
def test_raise_on_status(self) -> None:
91+
AppStatus(state=AppState.SUCCEEDED).raise_for_status()
92+
93+
with self.assertRaisesRegex(
94+
AppStatusError, r"(?s)job did not succeed:.*FAILED.*"
95+
):
96+
AppStatus(state=AppState.FAILED).raise_for_status()
97+
98+
with self.assertRaisesRegex(
99+
AppStatusError, r"(?s)job did not succeed:.*CANCELLED.*"
100+
):
101+
AppStatus(state=AppState.CANCELLED).raise_for_status()
102+
103+
with self.assertRaisesRegex(
104+
AppStatusError, r"(?s)job did not succeed:.*RUNNING.*"
105+
):
106+
AppStatus(state=AppState.RUNNING).raise_for_status()
107+
89108

90109
class ResourceTest(unittest.TestCase):
91110
def test_copy_resource(self) -> None:

0 commit comments

Comments
 (0)