
Commit e3321aa

[SPARK-52238][PYTHON] Python client for Declarative Pipelines
### What changes were proposed in this pull request?

Adds the Python client for Declarative Pipelines. This implements the command line interface and Python APIs described in the [Declarative Pipelines SPIP](https://docs.google.com/document/d/1PsSTngFuRVEOvUGzp_25CQL1yfzFHFr02XdMfQ7jOM4/edit?tab=t.0#heading=h.9g6a5f8v6xig).

#### Python API for defining pipeline graph elements

The Python API consists of the following APIs for defining flows and datasets in a pipeline dataflow graph (see their docstrings for more details):

- `create_streaming_table`
- `append_flow`
- `materialized_view`
- `table`
- `temporary_view`

Example file of definitions:

```python
from pyspark.sql import SparkSession
from pyspark import pipelines as sdp

spark = SparkSession.active()

@sdp.materialized_view
def baby_names_raw():
    return (
        spark.read.option("header", "true").csv("babynames.csv")
        .withColumnRenamed("First Name", "First_Name")
    )
```

#### Command line interface

The CLI is implemented as a Spark Connect client. It enables launching runs of declarative pipelines. It accepts a YAML spec, which specifies where on the local filesystem to look for the Python and SQL files that contain the definitions of the flows and datasets that make up the pipeline dataflow graph.

Example usage:

```
bin/spark-pipelines run --remote sc://localhost --spec pipeline.yml
```

Example output:

```
Loading pipeline spec from pipeline.yaml...
Creating Spark session...
Creating dataflow graph...
Registering graph elements...
Loading definitions. Root directory: ..
Found 1 files matching glob 'transformations/**/*.py'
Importing transformations/baby_names_raw.py...
Found 1 files matching glob 'transformations/**/*.sql'
Registering SQL file transformations/baby_names_prepared.sql...
Starting run...
Starting execution...
2025-05-20T15:08:01.395Z: Flow `spark_catalog`.`default`.`baby_names_raw` is QUEUED.
2025-05-20T15:08:01.398Z: Flow `spark_catalog`.`default`.`baby_names_prepared` is QUEUED.
2025-05-20T15:08:01.402Z: Flow 'spark_catalog.default.baby_names_raw' is PLANNING.
2025-05-20T15:08:01.403Z: Flow `spark_catalog`.`default`.`baby_names_raw` is STARTING.
2025-05-20T15:08:01.404Z: Flow `spark_catalog`.`default`.`baby_names_raw` is RUNNING.
2025-05-20T15:08:03.096Z: Flow 'spark_catalog.default.baby_names_raw' has COMPLETED.
2025-05-20T15:08:03.422Z: Flow 'spark_catalog.default.baby_names_prepared' is PLANNING.
2025-05-20T15:08:03.422Z: Flow `spark_catalog`.`default`.`baby_names_prepared` is STARTING.
2025-05-20T15:08:03.422Z: Flow `spark_catalog`.`default`.`baby_names_prepared` is RUNNING.
2025-05-20T15:08:03.875Z: Flow 'spark_catalog.default.baby_names_prepared' has COMPLETED.
2025-05-20T15:08:05.492Z: Run has COMPLETED.
```

#### Architecture diagram

![Architecture diagram](https://github.com/user-attachments/assets/0fa6428f-b506-493b-a788-7f047a2a7946)

### Why are the changes needed?

To implement Declarative Pipelines, as described in the SPIP.

### Does this PR introduce _any_ user-facing change?

No previous behavior is changed, but new behavior is introduced.

### How was this patch tested?
#### Unit testing

Includes unit tests for:

- Python API error cases – test_decorators.py
- Command line functionality – test_cli.py
- The harness for registering graph elements while evaluating pipeline definition Python files – test_graph_element_registry.py
- Code for blocking execution and analysis within decorated query functions – test_block_connect_access.py

Note that, once the backend is wired up, we will submit additional unit tests that cover end-to-end pipeline execution with Python.

#### CLI testing

With the Declarative Pipelines Spark Connect backend (coming in a future PR), I ran the CLI and confirmed that it executed a pipeline as expected.

### Was this patch authored or co-authored using generative AI tooling?

Closes #50963 from sryza/sdp-python.

Lead-authored-by: Sandy Ryza <[email protected]>
Co-authored-by: Sandy Ryza <[email protected]>
Signed-off-by: Sandy Ryza <[email protected]>
1 parent 6a89f65 commit e3321aa

28 files changed: +2227 and -6 lines

.github/workflows/build_and_test.yml

Lines changed: 1 addition & 1 deletion

@@ -511,7 +511,7 @@ jobs:
   - >-
     pyspark-core, pyspark-errors, pyspark-streaming, pyspark-logger
   - >-
-    pyspark-mllib, pyspark-ml, pyspark-ml-connect
+    pyspark-mllib, pyspark-ml, pyspark-ml-connect, pyspark-pipelines
   - >-
     pyspark-connect
   - >-

bin/spark-pipelines

Lines changed: 33 additions & 0 deletions

@@ -0,0 +1,33 @@
+#!/usr/bin/env bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Default to standard python3 interpreter unless told otherwise
+if [[ -z "$PYSPARK_PYTHON" ]]; then
+  PYSPARK_PYTHON=python3
+fi
+
+if [ -z "${SPARK_HOME}" ]; then
+  source "$(dirname "$0")"/find-spark-home
+fi
+
+# Add the PySpark classes to the Python path:
+export PYTHONPATH="${SPARK_HOME}/python/:$PYTHONPATH"
+export PYTHONPATH="${SPARK_HOME}/python/lib/py4j-0.10.9.9-src.zip:$PYTHONPATH"
+
+$PYSPARK_PYTHON "${SPARK_HOME}"/python/pyspark/pipelines/cli.py "$@"

dev/requirements.txt

Lines changed: 1 addition & 0 deletions

@@ -27,6 +27,7 @@ pytest-mypy-plugins==1.9.3
 flake8==3.9.0
 # See SPARK-38680.
 pandas-stubs<1.2.0.54
+types-PyYAML

 # Documentation (SQL)
 mkdocs

dev/sparktestsupport/modules.py

Lines changed: 12 additions & 0 deletions

@@ -1501,6 +1501,18 @@ def __hash__(self):
     ],
 )

+pyspark_pipelines = Module(
+    name="pyspark-pipelines",
+    dependencies=[pyspark_core, pyspark_sql, pyspark_connect],
+    source_file_regexes=["python/pyspark/pipelines"],
+    python_test_goals=[
+        "pyspark.pipelines.tests.test_block_connect_access",
+        "pyspark.pipelines.tests.test_cli",
+        "pyspark.pipelines.tests.test_decorators",
+        "pyspark.pipelines.tests.test_graph_element_registry",
+        "pyspark.pipelines.tests.test_init_cli",
+    ],
+)

 sparkr = Module(
     name="sparkr",

dev/sparktestsupport/utils.py

Lines changed: 7 additions & 5 deletions

@@ -112,25 +112,27 @@ def determine_modules_to_test(changed_modules, deduplicated=True):
     'mllib', 'protobuf', 'pyspark-connect', 'pyspark-ml', 'pyspark-ml-connect', 'pyspark-mllib',
     'pyspark-pandas', 'pyspark-pandas-connect-part0', 'pyspark-pandas-connect-part1',
     'pyspark-pandas-connect-part2', 'pyspark-pandas-connect-part3', 'pyspark-pandas-slow',
-    'pyspark-sql', 'pyspark-testing', 'repl', 'sparkr', 'sql', 'sql-kafka-0-10']
+    'pyspark-pipelines', 'pyspark-sql', 'pyspark-testing', 'repl', 'sparkr', 'sql',
+    'sql-kafka-0-10']
     >>> sorted([x.name for x in determine_modules_to_test(
     ...     [modules.sparkr, modules.sql], deduplicated=False)])
     ...     # doctest: +NORMALIZE_WHITESPACE
     ['avro', 'connect', 'docker-integration-tests', 'examples', 'hive', 'hive-thriftserver',
     'mllib', 'protobuf', 'pyspark-connect', 'pyspark-ml', 'pyspark-ml-connect', 'pyspark-mllib',
     'pyspark-pandas', 'pyspark-pandas-connect-part0', 'pyspark-pandas-connect-part1',
     'pyspark-pandas-connect-part2', 'pyspark-pandas-connect-part3', 'pyspark-pandas-slow',
-    'pyspark-sql', 'pyspark-testing', 'repl', 'sparkr', 'sql', 'sql-kafka-0-10']
+    'pyspark-pipelines', 'pyspark-sql', 'pyspark-testing', 'repl', 'sparkr', 'sql',
+    'sql-kafka-0-10']
     >>> sorted([x.name for x in determine_modules_to_test(
     ...     [modules.sql, modules.core], deduplicated=False)])
     ...     # doctest: +NORMALIZE_WHITESPACE
     ['avro', 'catalyst', 'connect', 'core', 'docker-integration-tests', 'examples', 'graphx',
     'hive', 'hive-thriftserver', 'mllib', 'mllib-local', 'protobuf', 'pyspark-connect',
     'pyspark-core', 'pyspark-ml', 'pyspark-ml-connect', 'pyspark-mllib', 'pyspark-pandas',
     'pyspark-pandas-connect-part0', 'pyspark-pandas-connect-part1', 'pyspark-pandas-connect-part2',
-    'pyspark-pandas-connect-part3', 'pyspark-pandas-slow', 'pyspark-resource', 'pyspark-sql',
-    'pyspark-streaming', 'pyspark-testing', 'repl', 'root', 'sparkr', 'sql', 'sql-kafka-0-10',
-    'streaming', 'streaming-kafka-0-10', 'streaming-kinesis-asl']
+    'pyspark-pandas-connect-part3', 'pyspark-pandas-slow', 'pyspark-pipelines', 'pyspark-resource',
+    'pyspark-sql', 'pyspark-streaming', 'pyspark-testing', 'repl', 'root', 'sparkr', 'sql',
+    'sql-kafka-0-10', 'streaming', 'streaming-kafka-0-10', 'streaming-kinesis-asl']
     """
     modules_to_test = set()
     for module in changed_modules:

python/mypy.ini

Lines changed: 6 additions & 0 deletions

@@ -117,6 +117,9 @@ ignore_errors = True
 [mypy-pyspark.pandas.tests.*]
 ignore_errors = True

+[mypy-pyspark.pipelines.tests.*]
+ignore_errors = True
+
 [mypy-pyspark.tests.*]
 ignore_errors = True

@@ -185,6 +188,9 @@ ignore_missing_imports = True
 [mypy-flameprof.*]
 ignore_missing_imports = True

+[mypy-yaml.*]
+ignore_missing_imports = True
+
 ; Ignore errors for proto generated code
 [mypy-pyspark.sql.connect.proto.*, pyspark.sql.connect.proto, pyspark.sql.streaming.proto]
 ignore_errors = True
python/packaging/client/setup.py

Lines changed: 2 additions & 0 deletions

@@ -111,6 +111,7 @@
     "pyspark.pandas.tests.connect.reshape",
     "pyspark.pandas.tests.connect.series",
     "pyspark.pandas.tests.connect.window",
+    "pyspark.pipelines.tests",
     "pyspark.logger.tests",
     "pyspark.logger.tests.connect",
 ]

@@ -182,6 +183,7 @@
     "pyspark.pandas.spark",
     "pyspark.pandas.typedef",
     "pyspark.pandas.usage_logging",
+    "pyspark.pipelines",
     "pyspark.testing",
     "pyspark.resource",
     "pyspark.errors",

python/pyspark/errors/error-conditions.json

Lines changed: 60 additions & 0 deletions

@@ -14,6 +14,11 @@
       "Arrow legacy IPC format is not supported in PySpark, please unset ARROW_PRE_0_15_IPC_FORMAT."
     ]
   },
+  "ATTEMPT_ANALYSIS_IN_PIPELINE_QUERY_FUNCTION": {
+    "message": [
+      "Operations that trigger DataFrame analysis or execution are not allowed in pipeline query functions. Move code outside of the pipeline query function."
+    ]
+  },
   "ATTRIBUTE_NOT_CALLABLE": {
     "message": [
       "Attribute `<attr_name>` in provided object `<obj_name>` is not callable."

@@ -219,6 +224,11 @@
       "Unexpected filter <name>."
     ]
   },
+  "DECORATOR_ARGUMENT_NOT_CALLABLE": {
+    "message": [
+      "The first positional argument passed to @<decorator_name> must be callable. Either add @<decorator_name> with no parameters to your function, or pass options to @<decorator_name> using keyword arguments (e.g. <example_usage>)."
+    ]
+  },
   "DIFFERENT_PANDAS_DATAFRAME": {
     "message": [
       "DataFrames are not almost equal:",

@@ -336,6 +346,11 @@
       "<field_name>: <obj> is not an instance of type <data_type>."
     ]
   },
+  "GRAPH_ELEMENT_DEFINED_OUTSIDE_OF_DECLARATIVE_PIPELINE": {
+    "message": [
+      "APIs that define elements of a declarative pipeline can only be invoked within the context of defining a pipeline."
+    ]
+  },
   "HIGHER_ORDER_FUNCTION_SHOULD_RETURN_COLUMN": {
     "message": [
       "Function `<func_name>` should return Column, got <return_type>."

@@ -552,6 +567,11 @@
       "Mixed type replacements are not supported."
     ]
   },
+  "MULTIPLE_PIPELINE_SPEC_FILES_FOUND": {
+    "message": [
+      "Multiple pipeline spec files found in the directory `<dir_path>`. Please remove one or choose a particular one with the --spec argument."
+    ]
+  },
   "NEGATIVE_VALUE": {
     "message": [
       "Value for `<arg_name>` must be greater than or equal to 0, got '<arg_value>'."

@@ -839,6 +859,41 @@
       "The Pandas SCALAR_ITER UDF outputs more rows than input rows."
     ]
   },
+  "PIPELINE_SPEC_DICT_KEY_NOT_STRING": {
+    "message": [
+      "For pipeline spec field `<field_name>`, key should be a string, got <key_type>."
+    ]
+  },
+  "PIPELINE_SPEC_DICT_VALUE_NOT_STRING": {
+    "message": [
+      "For pipeline spec field `<field_name>`, value for key `<key_name>` should be a string, got <value_type>."
+    ]
+  },
+  "PIPELINE_SPEC_FIELD_NOT_DICT": {
+    "message": [
+      "Pipeline spec field `<field_name>` should be a dict, got <field_type>."
+    ]
+  },
+  "PIPELINE_SPEC_FILE_DOES_NOT_EXIST": {
+    "message": [
+      "The pipeline spec file `<spec_path>` does not exist."
+    ]
+  },
+  "PIPELINE_SPEC_FILE_NOT_FOUND": {
+    "message": [
+      "No pipeline.yaml or pipeline.yml file provided in arguments or found in directory `<dir_path>` or readable ancestor directories."
+    ]
+  },
+  "PIPELINE_SPEC_UNEXPECTED_FIELD": {
+    "message": [
+      "Pipeline spec field `<field_name>` is unexpected."
+    ]
+  },
+  "PIPELINE_UNSUPPORTED_DEFINITIONS_FILE_EXTENSION": {
+    "message": [
+      "Pipeline definitions file `<file_path>` has an unsupported extension. Supported extensions are `.py` and `.sql`."
+    ]
+  },
   "PIPE_FUNCTION_EXITED": {
     "message": [
       "Pipe function `<func_name>` exited with error code <error_code>."

@@ -1145,6 +1200,11 @@
       "Pie plot requires either a `y` column or `subplots=True`."
     ]
   },
+  "UNSUPPORTED_PIPELINES_DATASET_TYPE": {
+    "message": [
+      "Unsupported pipelines dataset type: <dataset_type>."
+    ]
+  },
   "UNSUPPORTED_PLOT_BACKEND": {
     "message": [
       "`<backend>` is not supported, it should be one of the values from <supported_backends>"
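
Two of the new error conditions above, `ATTEMPT_ANALYSIS_IN_PIPELINE_QUERY_FUNCTION` and `GRAPH_ELEMENT_DEFINED_OUTSIDE_OF_DECLARATIVE_PIPELINE`, encode usage constraints on the definition APIs. The snippet below is a hedged sketch of the first constraint only, not code from this PR: based on the error text, an eager action such as `count()` inside a pipeline query function is the kind of operation that registration is expected to reject. The dataset name and CSV path are made up for illustration.

```python
from pyspark.sql import SparkSession
from pyspark import pipelines as sdp

spark = SparkSession.active()

@sdp.materialized_view
def baby_names_raw():
    df = spark.read.option("header", "true").csv("babynames.csv")
    # Anti-pattern per the error text: count() triggers DataFrame execution
    # inside the query function, so this would be expected to surface
    # ATTEMPT_ANALYSIS_IN_PIPELINE_QUERY_FUNCTION during registration.
    print(f"rows loaded: {df.count()}")
    return df
```

Per the error message, the remedy is to move any such eager operation outside the pipeline query function and keep the function itself a lazy DataFrame definition.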

python/pyspark/pipelines/__init__.py

Lines changed: 31 additions & 0 deletions

@@ -0,0 +1,31 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from pyspark.pipelines.api import (
+    append_flow,
+    create_streaming_table,
+    materialized_view,
+    table,
+    temporary_view,
+)
+
+__all__ = [
+    "append_flow",
+    "create_streaming_table",
+    "materialized_view",
+    "table",
+    "temporary_view",
+]