Run steps defined in notebooks with remote orchestrators #2899


Merged
merged 73 commits into develop from feature/PRD-529-steps-from-notebook
Aug 7, 2024
010855e
WIP
schustmi Jul 29, 2024
756d65e
Merge branch 'develop' into feature/PRD-528-upload-code-to-artifact-s…
schustmi Jul 30, 2024
1710e22
Docstrings
schustmi Jul 30, 2024
2c8e98e
Add DB migration
schustmi Jul 30, 2024
75ab157
Add archivable superclass
schustmi Jul 31, 2024
39ca1f3
Improve build reuse
schustmi Jul 31, 2024
65940a4
Fix gzip for archives
schustmi Jul 31, 2024
0214859
Better error messages
schustmi Jul 31, 2024
dc0f0b5
Docstrings/mypy
schustmi Jul 31, 2024
6a70b07
Remove some unnecessary stuff
schustmi Jul 31, 2024
8ae50bf
Typo
schustmi Jul 31, 2024
961a043
Update build context to inherit from new superclass
schustmi Jul 31, 2024
b3f9409
Fix unit tests
schustmi Jul 31, 2024
64a49cb
Small fixes
schustmi Jul 31, 2024
a4f8d6c
Ignore .zen folder and other small improvements
schustmi Jul 31, 2024
ff1eb0e
Merge branch 'develop' into feature/PRD-528-upload-code-to-artifact-s…
schustmi Jul 31, 2024
190159d
Sort and remove duplicates for better build reuse
schustmi Aug 1, 2024
69bc059
Update docker settings to use booleans
schustmi Aug 1, 2024
a203f71
Add code path to pipeline run for frontend
schustmi Aug 2, 2024
74a3313
Move log
schustmi Aug 2, 2024
ec26a27
Better docstring
schustmi Aug 5, 2024
838f45f
Remove hub tests
schustmi Aug 5, 2024
408c33e
Try manual cleanup
schustmi Aug 6, 2024
8c8f3ba
Docs
schustmi Aug 6, 2024
b96d383
Merge branch 'develop' into feature/PRD-528-upload-code-to-artifact-s…
schustmi Aug 7, 2024
0c2df92
Fix alembic order
schustmi Aug 7, 2024
b05bc70
POC
schustmi Aug 1, 2024
5cdb56b
Better error messages
schustmi Aug 2, 2024
dcc2fd8
Docstrings
schustmi Aug 2, 2024
3d3fafe
Improved error handling
schustmi Aug 2, 2024
335dde7
Small fixes
schustmi Aug 2, 2024
e24216f
Rework the whole thing
schustmi Aug 2, 2024
5d41b1b
Don't write to file
schustmi Aug 2, 2024
42a1e87
write to file again
schustmi Aug 2, 2024
c4bf9f5
Add function to verify active notebook
schustmi Aug 2, 2024
8bdc1df
Verify notebook path
schustmi Aug 2, 2024
9af3204
Rename variable
schustmi Aug 2, 2024
5dabb14
Improve logic and add warning
schustmi Aug 2, 2024
1ad96fe
Rename
schustmi Aug 2, 2024
9d4f07f
Store correct relative path
schustmi Aug 2, 2024
9c2c680
Decorator for custom objects
schustmi Aug 5, 2024
92141ad
Better notebook detection
schustmi Aug 5, 2024
25be2bb
VSCode support
schustmi Aug 5, 2024
63fd6b0
Validation before pipeline run
schustmi Aug 5, 2024
bbbad59
Simplify
schustmi Aug 5, 2024
d194dc2
Rename
schustmi Aug 5, 2024
7809935
Apply PR feedback
schustmi Aug 5, 2024
93e89b9
Move imports
schustmi Aug 5, 2024
fa50225
type + sorting
schustmi Aug 5, 2024
8b6de40
Better detection
schustmi Aug 5, 2024
9abcbd1
Fix pyproject
schustmi Aug 5, 2024
f5c57de
Make jupyter extra optional
schustmi Aug 5, 2024
7df14ed
Additional warning
schustmi Aug 6, 2024
518b0bf
Docs/warnings
schustmi Aug 6, 2024
9a4896c
Fix for colab
schustmi Aug 6, 2024
0524dae
More fixes
schustmi Aug 6, 2024
c6e5459
Upload notebook code to artifact store instead
schustmi Aug 6, 2024
fc88cfa
Remove unnecessary dependencies
schustmi Aug 7, 2024
e8a2db4
Update docs
schustmi Aug 7, 2024
f30aadd
Rename variable
schustmi Aug 7, 2024
77ea271
Update comment
schustmi Aug 7, 2024
617210d
Docstrings
schustmi Aug 7, 2024
38fc80c
Fix private attribute access
schustmi Aug 7, 2024
467457c
Update azureml step operator unused docker args
schustmi Aug 7, 2024
4207484
Minor cleanup
schustmi Aug 7, 2024
bdf993f
Update notebook detection
schustmi Aug 7, 2024
ee09976
Call super method
schustmi Aug 7, 2024
06c9631
API docs ignores
schustmi Aug 7, 2024
fc75a2c
Linting
schustmi Aug 7, 2024
4e64e08
Remove notebook extra to fix e2e template
schustmi Aug 7, 2024
d5daa87
Merge branch 'develop' into feature/PRD-529-steps-from-notebook
schustmi Aug 7, 2024
acf007c
Apply review request
schustmi Aug 7, 2024
c7066ef
Linting
schustmi Aug 7, 2024
14 changes: 14 additions & 0 deletions docs/book/how-to/run-remote-pipelines-from-notebooks/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
---
description: Use Jupyter Notebooks to run remote steps or pipelines
---

# 📔 Run remote pipelines from notebooks

ZenML steps and pipelines can be defined in a Jupyter notebook and executed remotely. To do so, ZenML extracts the code from your notebook cells and runs it as Python modules inside the Docker containers that execute your pipeline steps remotely. For this to work, the notebook cells in which you define your steps need to meet certain conditions.

Learn more about it in the following sections:

<table data-view="cards"><thead><tr><th></th><th></th><th></th><th data-hidden data-card-target data-type="content-ref"></th></tr></thead><tbody><tr><td>Define steps in notebook cells</td><td></td><td></td><td><a href="define-steps-in-notebook-cells.md">define-steps-in-notebook-cells.md</a></td></tr><tr><td>Configure the notebook path</td><td></td><td></td><td></td></tr></tbody></table>

<!-- For scarf -->
<figure><img alt="ZenML Scarf" referrerpolicy="no-referrer-when-downgrade" src="https://static.scarf.sh/a.png?x-pxid=f0b4f458-0a54-4fcd-aa95-d5ee424815bc" /></figure>
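Based on the boolean `DockerSettings` fields referenced elsewhere in this PR (e.g. `allow_download_from_artifact_store` in the step-operator diff), configuring a notebook-defined pipeline for remote execution might look roughly like this — a sketch under the assumption that these are public `DockerSettings` attributes, not an authoritative API reference:

```python
from zenml import pipeline
from zenml.config import DockerSettings

# Assumed field names, taken from the DockerSettings changes in this PR.
docker_settings = DockerSettings(
    allow_including_files_in_images=False,
    allow_download_from_code_repository=False,
    # Let the remote Docker containers download the extracted notebook
    # code from the artifact store at runtime.
    allow_download_from_artifact_store=True,
)


@pipeline(settings={"docker": docker_settings})
def my_pipeline() -> None:
    ...
```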
@@ -0,0 +1,10 @@

# Define steps in notebook cells

If you want to run ZenML steps defined in notebook cells remotely (either with a remote [orchestrator](../../component-guide/orchestrators/orchestrators.md) or [step operator](../../component-guide/step-operators/step-operators.md)), the cells defining your steps must meet the following conditions:
- The cell may only contain Python code: no Jupyter magic commands (starting with `%`) or shell commands (starting with `!`).
- The cell **must not** call code from other notebook cells. Functions or classes imported from Python files are allowed.
- The cell **must not** rely on imports from previous cells. This means your cell must perform all the imports it needs itself, including ZenML imports like `from zenml import step`.
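A cell that satisfies all of these conditions might look like this (a minimal sketch; the step body is purely illustrative):

```python
# This cell is fully self-contained: it performs all of its own imports
# and does not reference names defined in other cells.
from zenml import step


@step
def compute_sum(a: int, b: int) -> int:
    """Illustrative step: adds two numbers."""
    return a + b
```

Because the cell contains only plain Python and imports everything it uses, ZenML can extract it and run it as a standalone Python module inside the remote container.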

<!-- For scarf -->
<figure><img alt="ZenML Scarf" referrerpolicy="no-referrer-when-downgrade" src="https://static.scarf.sh/a.png?x-pxid=f0b4f458-0a54-4fcd-aa95-d5ee424815bc" /></figure>
2 changes: 2 additions & 0 deletions docs/book/toc.md
@@ -177,6 +177,8 @@
* [🔌 Connect to a server](how-to/connecting-to-zenml/README.md)
* [Connect in with your User (interactive)](how-to/connecting-to-zenml/connect-in-with-your-user-interactive.md)
* [Connect with a Service Account](how-to/connecting-to-zenml/connect-with-a-service-account.md)
* [📔 Run remote pipelines from notebooks](how-to/run-remote-pipelines-from-notebooks/README.md)
* [Define steps in notebook cells](how-to/run-remote-pipelines-from-notebooks/define-steps-in-notebook-cells.md)
* [🔐 Interact with secrets](how-to/interact-with-secrets.md)
* [🐞 Debug and solve issues](how-to/debug-and-solve-issues.md)

7 changes: 6 additions & 1 deletion docs/mocked_libs.json
@@ -241,5 +241,10 @@
"databricks.sdk",
"databricks.sdk.service.compute",
"databricks.sdk.service.jobs",
"databricks.sdk.service.serving"
"databricks.sdk.service.serving",
"IPython",
"IPython.core",
"IPython.core.display",
"IPython.core.display_functions",
"ipywidgets"
]
58 changes: 58 additions & 0 deletions src/zenml/config/source.py
@@ -42,6 +42,7 @@ class SourceType(Enum):
INTERNAL = "internal"
DISTRIBUTION_PACKAGE = "distribution_package"
CODE_REPOSITORY = "code_repository"
NOTEBOOK = "notebook"
UNKNOWN = "unknown"


@@ -229,6 +230,63 @@ def _validate_type(cls, value: SourceType) -> SourceType:
return value


class NotebookSource(Source):
"""Source representing an object defined in a notebook.

Attributes:
code_path: Path where the notebook cell code for this source is
uploaded.
replacement_module: Name of the module from which this source should
be loaded in case the code is not running in a notebook.
"""

code_path: Optional[str] = None
replacement_module: Optional[str] = None
type: SourceType = SourceType.NOTEBOOK

# Private attribute that is used to store the code but should not be
# serialized
_cell_code: Optional[str] = None

@field_validator("type")
@classmethod
def _validate_type(cls, value: SourceType) -> SourceType:
"""Validate the source type.

Args:
value: The source type.

Raises:
ValueError: If the source type is not `NOTEBOOK`.

Returns:
The source type.
"""
if value != SourceType.NOTEBOOK:
raise ValueError("Invalid source type.")

return value

@field_validator("module")
@classmethod
def _validate_module(cls, value: str) -> str:
"""Validate the module.

Args:
value: The module.

Raises:
ValueError: If the module is not `__main__`.

Returns:
The module.
"""
if value != "__main__":
raise ValueError("Invalid module for notebook source.")

return value


def convert_source(source: Any) -> Any:
"""Converts an old source string to a source object.

26 changes: 10 additions & 16 deletions src/zenml/entrypoints/base_entrypoint_configuration.py
@@ -15,7 +15,6 @@

import argparse
import os
import shutil
import sys
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING, Any, Dict, List, NoReturn, Set
@@ -27,9 +26,13 @@
ENV_ZENML_REQUIRES_CODE_DOWNLOAD,
handle_bool_env_var,
)
from zenml.io import fileio
from zenml.logger import get_logger
from zenml.utils import code_repository_utils, source_utils, uuid_utils
from zenml.utils import (
code_repository_utils,
code_utils,
source_utils,
uuid_utils,
)

if TYPE_CHECKING:
from zenml.models import CodeReferenceResponse, PipelineDeploymentResponse
@@ -261,30 +264,21 @@ def download_code_from_artifact_store(self, code_path: str) -> None:

Args:
code_path: Path where the code is stored.

Raises:
RuntimeError: If the code is stored in an artifact store which is
not active.
"""
logger.info(
"Downloading code from artifact store path `%s`.", code_path
)

# Do not remove this line, we need to instantiate the artifact store to
# register the filesystem needed for the file download
artifact_store = Client().active_stack.artifact_store

if not code_path.startswith(artifact_store.path):
raise RuntimeError("Code stored in different artifact store.")
_ = Client().active_stack.artifact_store

extract_dir = os.path.abspath("code")
os.makedirs(extract_dir)

download_path = os.path.basename(code_path)
fileio.copy(code_path, download_path)

shutil.unpack_archive(filename=download_path, extract_dir=extract_dir)
os.remove(download_path)
code_utils.download_and_extract_code(
code_path=code_path, extract_dir=extract_dir
)

source_utils.set_custom_source_root(extract_dir)
sys.path.insert(0, extract_dir)
19 changes: 10 additions & 9 deletions src/zenml/environment.py
@@ -15,7 +15,6 @@

import os
import platform
from importlib.util import find_spec
from pathlib import Path
from typing import TYPE_CHECKING, Any, Dict, Optional, Tuple, Type, cast

@@ -259,15 +258,17 @@ def in_notebook() -> bool:
if Environment.in_google_colab():
return True

if find_spec("IPython") is not None:
from IPython import get_ipython
try:
ipython = get_ipython() # type: ignore[name-defined]
except NameError:
return False

if get_ipython().__class__.__name__ in [
"TerminalInteractiveShell",
"ZMQInteractiveShell",
"DatabricksShell",
]:
return True
if ipython.__class__.__name__ in [
"TerminalInteractiveShell",
"ZMQInteractiveShell",
"DatabricksShell",
]:
return True
return False

@staticmethod
2 changes: 1 addition & 1 deletion src/zenml/image_builders/build_context.py
@@ -45,9 +45,9 @@ def __init__(
given, a file called `.dockerignore` in the build context root
directory will be used instead if it exists.
"""
super().__init__()
self._root = root
self._dockerignore_file = dockerignore_file
self._extra_files: Dict[str, str] = {}

@property
def dockerignore_file(self) -> Optional[str]:
@@ -250,6 +250,9 @@ def launch(
"apt_packages",
"user",
"source_files",
"allow_including_files_in_images",
"allow_download_from_code_repository",
"allow_download_from_artifact_store",
]
docker_settings = info.config.docker_settings
ignored_docker_fields = docker_settings.model_fields_set.intersection(
@@ -93,6 +93,15 @@ def is_local(self) -> bool:
"""
return False

@property
def is_remote(self) -> bool:
"""Checks if this stack component is running remotely.

Returns:
True if this config is for a remote component, False otherwise.
"""
return True


class DatabricksOrchestratorFlavor(BaseOrchestratorFlavor):
"""Databricks orchestrator flavor."""
81 changes: 10 additions & 71 deletions src/zenml/new/pipelines/build_utils.py
@@ -14,9 +14,7 @@
"""Pipeline build utilities."""

import hashlib
import os
import platform
import tempfile
from typing import (
TYPE_CHECKING,
Dict,
@@ -29,7 +27,6 @@
import zenml
from zenml.client import Client
from zenml.code_repositories import BaseCodeRepository
from zenml.io import fileio
from zenml.logger import get_logger
from zenml.models import (
BuildItem,
@@ -40,9 +37,8 @@
PipelineDeploymentBase,
StackResponse,
)
from zenml.new.pipelines.code_archive import CodeArchive
from zenml.stack import Stack
from zenml.utils import source_utils, string_utils
from zenml.utils import source_utils
from zenml.utils.pipeline_docker_image_builder import (
PipelineDockerImageBuilder,
)
@@ -487,17 +483,15 @@ def verify_local_repository_context(
raise RuntimeError(
"The `DockerSettings` of the pipeline or one of its "
"steps specify that code should be downloaded from a "
"code repository "
"(`source_files=['download_from_code_repository']`), but "
"code repository, but "
"there is no code repository active at your current source "
f"root `{source_utils.get_source_root()}`."
)
elif local_repo_context.is_dirty:
raise RuntimeError(
"The `DockerSettings` of the pipeline or one of its "
"steps specify that code should be downloaded from a "
"code repository "
"(`source_files=['download_from_code_repository']`), but "
"code repository, but "
"the code repository active at your current source root "
f"`{source_utils.get_source_root()}` has uncommitted "
"changes."
@@ -506,8 +500,7 @@
raise RuntimeError(
"The `DockerSettings` of the pipeline or one of its "
"steps specify that code should be downloaded from a "
"code repository "
"(`source_files=['download_from_code_repository']`), but "
"code repository, but "
"the code repository active at your current source root "
f"`{source_utils.get_source_root()}` has unpushed "
"changes."
@@ -578,7 +571,7 @@ def verify_custom_build(
raise RuntimeError(
"The `DockerSettings` of the pipeline or one of its "
"steps specify that code should be included in the Docker "
"image (`source_files=['include']`), but the build you "
"image, but the build you "
"specified requires code download. Either update your "
"`DockerSettings` or specify a different build and try "
"again."
@@ -591,8 +584,7 @@
raise RuntimeError(
"The `DockerSettings` of the pipeline or one of its "
"steps specify that code should be downloaded from a "
"code repository "
"(`source_files=['download_from_code_repository']`), but "
"code repository, but "
"there is no code repository active at your current source "
f"root `{source_utils.get_source_root()}`."
)
@@ -704,10 +696,10 @@ def should_upload_code(
Whether the current code should be uploaded for the deployment.
"""
if not build:
# No build means all the code is getting executed locally, which means
# we don't need to download any code
# TODO: This does not apply to e.g. Databricks, figure out a solution
# here
# No build means we don't need to download code into a Docker container
# for step execution. Remote orchestrators that don't use Docker
# containers but instead run e.g. Python wheels already include the
# code in those artifacts.
return False

for step in deployment.step_configurations.values():
Expand All @@ -724,56 +716,3 @@ def should_upload_code(
return True

return False


def upload_code_if_necessary() -> str:
"""Upload code to the artifact store if necessary.

This function computes a hash of the code to be uploaded, and if an archive
with the same hash already exists it will not re-upload but instead return
the path to the existing archive.

Returns:
The path where to archived code is uploaded.
"""
logger.info("Archiving code...")

code_archive = CodeArchive(root=source_utils.get_source_root())
artifact_store = Client().active_stack.artifact_store

with tempfile.NamedTemporaryFile(
mode="w+b", delete=False, suffix=".tar.gz"
) as f:
code_archive.write_archive(f)

hash_ = hashlib.sha1() # nosec

while True:
data = f.read(64 * 1024)
if not data:
break
hash_.update(data)

filename = f"{hash_.hexdigest()}.tar.gz"
upload_dir = os.path.join(artifact_store.path, "code_uploads")
fileio.makedirs(upload_dir)
upload_path = os.path.join(upload_dir, filename)

if not fileio.exists(upload_path):
archive_size = string_utils.get_human_readable_filesize(
os.path.getsize(f.name)
)
logger.info(
"Uploading code to `%s` (Size: %s).", upload_path, archive_size
)
fileio.copy(f.name, upload_path)
logger.info("Code upload finished.")
else:
logger.info(
"Code already exists in artifact store, skipping upload."
)

if os.path.exists(f.name):
os.remove(f.name)

return upload_path