[SPARK-52238][PYTHON] Python client for Declarative Pipelines #50963
Conversation
Is Declarative Pipelines supposed to be supported only in connect mode?
```python
from pyspark.sql.pipelines.block_connect_access import block_spark_connect_execution_and_analysis


class BlockSparkConnectAccessTests(ReusedConnectTestCase):
```
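One possible shape for a test in this class, assuming `block_spark_connect_execution_and_analysis` is a context manager that raises when Spark Connect execution or analysis is attempted (inferred from its name, not confirmed here) and that `self.spark` is provided by `ReusedConnectTestCase`:

```python
    def test_execution_is_blocked(self):
        # Building the plan alone is fine; Connect DataFrames are lazy.
        df = self.spark.range(5)
        with block_spark_connect_execution_and_analysis():
            # Triggering execution inside the block should be rejected.
            with self.assertRaises(Exception):
                df.collect()
```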
New tests should be registered in dev/sparktestsupport/modules.py; otherwise they are skipped.
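For context, registering them means adding the new test modules to the relevant `Module`'s `python_test_goals` list in `dev/sparktestsupport/modules.py`. An illustrative sketch of the entries (exact placement and surrounding fields omitted):

```python
# Entries that would be added to the pyspark-sql Module's python_test_goals
# in dev/sparktestsupport/modules.py (illustrative; exact placement may differ).
python_test_goals = [
    # ... existing goals ...
    "pyspark.sql.tests.pipelines.test_block_connect_access",
    "pyspark.sql.tests.pipelines.test_cli",
    "pyspark.sql.tests.pipelines.test_decorators",
    "pyspark.sql.tests.pipelines.test_graph_element_registry",
]
```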
Do we plan to also test this file in classic mode?
This should only be tested in connect mode – do I need to add something to the file to set that up?
```python
class GraphElementRegistryTest(unittest.TestCase):
    def test_graph_element_registry(self):
        spark = SparkSession.builder.getOrCreate()
```
Why not reuse ReusedSQLTestCase (for classic) and ReusedConnectTestCase (for connect)?
Ah – this test actually doesn't need a SparkSession. Updating it to take it out.
@zhengruifeng this initial implementation is just for Connect. Connect is more straightforward to support, because Connect DataFrames are lazier than classic DataFrames. This means we can evaluate the user's decorated query function immediately rather than call back after all upstream datasets have been resolved. However, it's designed in a way that can support classic in the future – by implementing a ...
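To make the laziness point concrete, here is a minimal sketch (not the actual implementation – the registry class and method names below are made up) of how a decorator can evaluate the query function eagerly when DataFrames are lazy logical plans:

```python
from typing import Callable, Dict

from pyspark.sql import DataFrame


class FlowRegistry:
    """Hypothetical registry collecting flow definitions for the dataflow graph."""

    def __init__(self) -> None:
        self.flows: Dict[str, DataFrame] = {}

    def register_flow(self, name: str, df: DataFrame) -> None:
        # A Connect DataFrame is just an unresolved plan, so storing it here
        # triggers no execution or analysis on the server.
        self.flows[name] = df


_registry = FlowRegistry()


def materialized_view(func: Callable[[], DataFrame]) -> Callable[[], DataFrame]:
    # Because Connect DataFrames are lazy, the user's function can be called
    # immediately at decoration time; the resulting plan is registered and only
    # resolved later, once the whole graph is known. A classic-mode
    # implementation would instead have to defer this call.
    _registry.register_flow(func.__name__, func())
    return func
```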
`init` command as described in this design doc: https://docs.google.com/document/d/1LrwYt99MO8Pt2xgQlMoVvoBjlX0EMfF_-NkWGQYi39E/edit?tab=t.0

### How I tested

```
./python/run-tests --modules pyspark-sql --testnames 'pyspark.sql.tests.pipelines.test_cli'
dev/lint-python --compile --black --custom-pyspark-error --flake8
~/oss/bin/spark-pipelines init --name demo2
cd demo2
~/oss/bin/spark-pipelines run --remote sc://localhost
```

`init` output:

```
Pipeline project 'demo3' created successfully. To run your pipeline:
  cd 'demo3'
  spark-pipelines run
```

`run` output:

```
Loading pipeline spec from /Users/sandy.ryza/sdp-test/demo2/pipeline.yml...
Spark session created.
Creating dataflow graph...
Registering graph elements...
Loading definitions. Root directory: /Users/sandy.ryza/sdp-test/demo2.
Found 1 files matching glob 'transformations/**/*.py'
Importing /Users/sandy.ryza/sdp-test/demo2/transformations/example_python_materialized_view.py...
Found 1 files matching glob 'transformations/**/*.sql'
Registering SQL file /Users/sandy.ryza/sdp-test/demo2/transformations/example_sql_materialized_view.sql...
Starting run...
Starting execution...
2025-05-21T17:20:26.155Z: Flow `spark_catalog`.`default`.`example_python_materialized_view` is QUEUED.
2025-05-21T17:20:26.155Z: Flow `spark_catalog`.`default`.`example_sql_materialized_view` is QUEUED.
2025-05-21T17:20:26.156Z: Flow 'spark_catalog.default.example_python_materialized_view' is PLANNING.
2025-05-21T17:20:26.156Z: Flow `spark_catalog`.`default`.`example_python_materialized_view` is STARTING.
2025-05-21T17:20:26.156Z: Flow `spark_catalog`.`default`.`example_python_materialized_view` is RUNNING.
2025-05-21T17:20:26.629Z: Flow 'spark_catalog.default.example_python_materialized_view' has COMPLETED.
2025-05-21T17:20:27.164Z: Flow 'spark_catalog.default.example_sql_materialized_view' is PLANNING.
2025-05-21T17:20:27.165Z: Flow `spark_catalog`.`default`.`example_sql_materialized_view` is STARTING.
2025-05-21T17:20:27.165Z: Flow `spark_catalog`.`default`.`example_sql_materialized_view` is RUNNING.
2025-05-21T17:20:27.462Z: Flow 'spark_catalog.default.example_sql_materialized_view' has COMPLETED.
2025-05-21T17:20:29.216Z: Run has COMPLETED.
```
```python
import argparse
import importlib.util
import os
import yaml
```
Seems like the PySpark tests fail if yaml is not installed (https://github.com/apache/spark/actions/runs/15516895252/job/43685059103). I think we should skip the tests if yaml is not found.
Is this what we do for other dependencies? I'd be worried that, if we accidentally make a change to Spark CI that avoids installing pyyaml when we'd otherwise expect it to be installed, then the tests could get broken and we wouldn't find out.
I could alternatively help track down why it isn't installed in that situation and fix it?
Yeah, we do – the tests basically work without any dependencies. That build is a scheduled build dedicated to testing without any dependencies.
There are a bunch of scheduled jobs that we don't run for PR builders at https://github.com/apache/spark/actions, e.g., JDK 17, 21, Maven, MacOS etc.
Got it – I'll make this change. Any chance you have a reference for how we skip tests for other missing dependencies, so I can add something consistent?
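The usual approach in the PySpark test suite is a module-level import check plus `unittest.skipIf`; a minimal sketch of that pattern (the `have_yaml` flag and message names here are made up, not an existing PySpark helper):

```python
import unittest

try:
    import yaml  # noqa: F401

    have_yaml = True
    yaml_requirement_message = None
except ImportError as e:
    have_yaml = False
    yaml_requirement_message = str(e)


@unittest.skipIf(not have_yaml, yaml_requirement_message)
class CLISuiteRequiringYaml(unittest.TestCase):
    def test_spec_parsing(self) -> None:
        # Only runs when pyyaml is importable in the test environment.
        self.assertTrue(have_yaml)
```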
### What changes were proposed in this pull request?

Adds the Python client for Declarative Pipelines. This implements the command line interface and Python APIs described in the [Declarative Pipelines SPIP](https://docs.google.com/document/d/1PsSTngFuRVEOvUGzp_25CQL1yfzFHFr02XdMfQ7jOM4/edit?tab=t.0#heading=h.9g6a5f8v6xig).

#### Python API for defining pipeline graph elements

The Python API consists of these APIs for defining flows and datasets in a pipeline dataflow graph (see their docstrings for more details):

- `create_streaming_table`
- `append_flow`
- `materialized_view`
- `table`
- `temporary_view`

Example file of definitions:

```python
from pyspark.sql import SparkSession
from pyspark import pipelines as sdp

spark = SparkSession.active()


@sdp.materialized_view
def baby_names_raw():
    return (
        spark.read.option("header", "true").csv("babynames.csv")
        .withColumnRenamed("First Name", "First_Name")
    )
```

#### Command line interface

The CLI is implemented as a Spark Connect client. It enables launching runs of declarative pipelines. It accepts a YAML spec, which specifies where on the local filesystem to look for the Python and SQL files that contain the definitions of the flows and datasets that make up the pipeline dataflow graph.

Example usage:

```
bin/spark-pipelines run --remote sc://localhost --spec pipeline.yml
```

Example output:

```
Loading pipeline spec from pipeline.yaml...
Creating Spark session...
Creating dataflow graph...
Registering graph elements...
Loading definitions. Root directory: ..
Found 1 files matching glob 'transformations/**/*.py'
Importing transformations/baby_names_raw.py...
Found 1 files matching glob 'transformations/**/*.sql'
Registering SQL file transformations/baby_names_prepared.sql...
Starting run...
Starting execution...
2025-05-20T15:08:01.395Z: Flow `spark_catalog`.`default`.`baby_names_raw` is QUEUED.
2025-05-20T15:08:01.398Z: Flow `spark_catalog`.`default`.`baby_names_prepared` is QUEUED.
2025-05-20T15:08:01.402Z: Flow 'spark_catalog.default.baby_names_raw' is PLANNING.
2025-05-20T15:08:01.403Z: Flow `spark_catalog`.`default`.`baby_names_raw` is STARTING.
2025-05-20T15:08:01.404Z: Flow `spark_catalog`.`default`.`baby_names_raw` is RUNNING.
2025-05-20T15:08:03.096Z: Flow 'spark_catalog.default.baby_names_raw' has COMPLETED.
2025-05-20T15:08:03.422Z: Flow 'spark_catalog.default.baby_names_prepared' is PLANNING.
2025-05-20T15:08:03.422Z: Flow `spark_catalog`.`default`.`baby_names_prepared` is STARTING.
2025-05-20T15:08:03.422Z: Flow `spark_catalog`.`default`.`baby_names_prepared` is RUNNING.
2025-05-20T15:08:03.875Z: Flow 'spark_catalog.default.baby_names_prepared' has COMPLETED.
2025-05-20T15:08:05.492Z: Run has COMPLETED.
```

#### Architecture diagram

<img width="1256" alt="image" src="https://github.com/user-attachments/assets/0fa6428f-b506-493b-a788-7f047a2a7946" />

### Why are the changes needed?

In order to implement Declarative Pipelines, as described in the SPIP.

### Does this PR introduce _any_ user-facing change?

No previous behavior is changed, but new behavior is introduced.

### How was this patch tested?

#### Unit testing

Includes unit tests for:

- Python API error cases – test_decorators.py
- Command line functionality – test_cli.py
- The harness for registering graph elements while evaluating pipeline definition Python files – test_graph_element_registry.py
- Code for blocking execution and analysis within decorated query functions – test_block_connect_access.py

Note that, once the backend is wired up, we will submit additional unit tests that cover end-to-end pipeline execution with Python.

#### CLI testing

With the Declarative Pipelines Spark Connect backend (coming in a future PR), I ran the CLI and confirmed that it executed a pipeline as expected.

### Was this patch authored or co-authored using generative AI tooling?

Closes apache#50963 from sryza/sdp-python.

Lead-authored-by: Sandy Ryza <[email protected]>
Co-authored-by: Sandy Ryza <[email protected]>
Signed-off-by: Sandy Ryza <[email protected]>