|
13 | 13 | # permissions and limitations under the License.
|
14 | 14 | """Utility functions for the orchestrator."""
|
15 | 15 |
|
| 16 | +import os |
16 | 17 | import random
|
17 |
| -from typing import TYPE_CHECKING, Dict, List, Optional, Set, Tuple |
| 18 | +from typing import TYPE_CHECKING, Any, Dict, List, Optional, Set, Tuple, cast |
18 | 19 | from uuid import UUID
|
19 | 20 |
|
20 | 21 | from zenml.client import Client
|
|
25 | 26 | from zenml.constants import (
|
26 | 27 | ENV_ZENML_ACTIVE_STACK_ID,
|
27 | 28 | ENV_ZENML_ACTIVE_WORKSPACE_ID,
|
| 29 | + ENV_ZENML_SERVER, |
28 | 30 | ENV_ZENML_STORE_PREFIX,
|
29 | 31 | PIPELINE_API_TOKEN_EXPIRES_MINUTES,
|
30 | 32 | )
|
31 |
| -from zenml.enums import StoreType |
| 33 | +from zenml.enums import StackComponentType, StoreType |
32 | 34 | from zenml.exceptions import StepContextError
|
| 35 | +from zenml.logger import get_logger |
33 | 36 | from zenml.model.utils import link_artifact_config_to_model
|
34 | 37 | from zenml.models.v2.core.step_run import StepRunRequest
|
35 | 38 | from zenml.new.steps.step_context import get_step_context
|
| 39 | +from zenml.stack import StackComponent |
36 | 40 | from zenml.utils.string_utils import format_name_template
|
37 | 41 |
|
38 | 42 | if TYPE_CHECKING:
|
| 43 | + from zenml.artifact_stores.base_artifact_store import BaseArtifactStore |
39 | 44 | from zenml.artifacts.external_artifact_config import (
|
40 | 45 | ExternalArtifactConfiguration,
|
41 | 46 | )
|
@@ -302,3 +307,101 @@ def _get_model_versions_from_artifacts(
|
302 | 307 | else:
|
303 | 308 | break
|
304 | 309 | return models
|
| 310 | + |
| 311 | + |
| 312 | +class register_artifact_store_filesystem: |
| 313 | + """Context manager for the artifact_store/filesystem_registry dependency. |
| 314 | +
|
| 315 | + Even though it is rare, sometimes we bump into cases where we are trying to |
| 316 | + load artifacts that belong to an artifact store which is different from |
| 317 | + the active artifact store. |
| 318 | +
|
| 319 | + In cases like this, we will try to instantiate the target artifact store |
| 320 | + by creating the corresponding artifact store Python object, which ends up |
| 321 | + registering the right filesystem in the filesystem registry. |
| 322 | +
|
| 323 | + The problem is, the keys in the filesystem registry are schemes (such as |
| 324 | + "s3://" or "gcs://"). If we have two artifact stores with the same set of |
| 325 | + supported schemes, we might end up overwriting the filesystem that belongs |
| 326 | + to the active artifact store (and its authentication). That's why we have |
| 327 | + to re-instantiate the active artifact store again, so the correct filesystem |
| 328 | + will be restored. |
| 329 | + """ |
| 330 | + |
| 331 | + def __init__(self, target_artifact_store_id: Optional[UUID]) -> None: |
| 332 | + """Initialization of the context manager. |
| 333 | +
|
| 334 | + Args: |
| 335 | + target_artifact_store_id: the ID of the artifact store to load. |
| 336 | + """ |
| 337 | + self.target_artifact_store_id = target_artifact_store_id |
| 338 | + |
| 339 | + def __enter__(self) -> "BaseArtifactStore": |
| 340 | + """Entering the context manager. |
| 341 | +
|
| 342 | + It creates an instance of the target artifact store to register the |
| 343 | + correct filesystem in the registry. |
| 344 | +
|
| 345 | + Returns: |
| 346 | + The target artifact store object. |
| 347 | +
|
| 348 | + Raises: |
| 349 | + RuntimeError: If the target artifact store can not be fetched or |
| 350 | + initiated due to missing dependencies. |
| 351 | + """ |
| 352 | + try: |
| 353 | + if self.target_artifact_store_id is not None: |
| 354 | + if ( |
| 355 | + Client().active_stack.artifact_store.id |
| 356 | + != self.target_artifact_store_id |
| 357 | + ): |
| 358 | + get_logger(__name__).debug( |
| 359 | + f"Trying to use the artifact store with ID:" |
| 360 | + f"'{self.target_artifact_store_id}'" |
| 361 | + f"which is currently not the active artifact store." |
| 362 | + ) |
| 363 | + |
| 364 | + artifact_store_model_response = Client().get_stack_component( |
| 365 | + component_type=StackComponentType.ARTIFACT_STORE, |
| 366 | + name_id_or_prefix=self.target_artifact_store_id, |
| 367 | + ) |
| 368 | + return cast( |
| 369 | + "BaseArtifactStore", |
| 370 | + StackComponent.from_model(artifact_store_model_response), |
| 371 | + ) |
| 372 | + else: |
| 373 | + return Client().active_stack.artifact_store |
| 374 | + |
| 375 | + except KeyError: |
| 376 | + raise RuntimeError( |
| 377 | + "Unable to fetch the artifact store with id: " |
| 378 | + f"'{self.target_artifact_store_id}'. Check whether the " |
| 379 | + "artifact store still exists and you have the right " |
| 380 | + "permissions to access it." |
| 381 | + ) |
| 382 | + except ImportError: |
| 383 | + raise RuntimeError( |
| 384 | + "Unable to load the implementation of the artifact store with" |
| 385 | + f"id: '{self.target_artifact_store_id}'. Please make sure that " |
| 386 | + "the environment that you are loading this artifact from " |
| 387 | + "has the right dependencies." |
| 388 | + ) |
| 389 | + |
| 390 | + def __exit__( |
| 391 | + self, |
| 392 | + exc_type: Optional[Any], |
| 393 | + exc_value: Optional[Any], |
| 394 | + traceback: Optional[Any], |
| 395 | + ) -> None: |
| 396 | + """Set it back to the original state. |
| 397 | +
|
| 398 | + Args: |
| 399 | + exc_type: The class of the exception |
| 400 | + exc_value: The instance of the exception |
| 401 | + traceback: The traceback of the exception |
| 402 | + """ |
| 403 | + if ENV_ZENML_SERVER not in os.environ: |
| 404 | + # As we exit the handler, we have to re-register the filesystem |
| 405 | + # that belongs to the active artifact store as it may have been |
| 406 | + # overwritten. |
| 407 | + Client().active_stack.artifact_store._register() |
0 commit comments