Support Model in registry in pipeline #26914

Merged
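This PR adds Model to the supported pipeline input types, so a model asset, including one resolved from a registry, can be passed directly as a pipeline input; internally it is converted to an Input binding. A minimal usage sketch (the credential setup, "score.yml" component, and the "testFeed"/"iris_model" names are illustrative, borrowed from the tests below; this is not code from the PR itself):

from azure.ai.ml import MLClient, load_component
from azure.ai.ml.dsl import pipeline
from azure.identity import DefaultAzureCredential

# Registry-scoped client; "testFeed" and "iris_model" mirror the tests below.
registry_client = MLClient(DefaultAzureCredential(), registry_name="testFeed")
registry_model = registry_client.models.get(name="iris_model", version="1")

score_func = load_component("score.yml")  # hypothetical local component

@pipeline()
def score_pipeline(model_input, test_data):
    # With this change, a Model entity is accepted here and converted
    # to an Input bound to the model's id.
    score_func(model_input=model_input, test_data=test_data)

pipeline_job = score_pipeline(model_input=registry_model, test_data="./data/sample1.csv")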
3 changes: 2 additions & 1 deletion sdk/ml/azure-ai-ml/azure/ai/ml/dsl/_pipeline_decorator.py
@@ -12,7 +12,7 @@
 from pathlib import Path
 from typing import Any, Callable, Dict, TypeVar
 
-from azure.ai.ml.entities import Data, PipelineJob, PipelineJobSettings
+from azure.ai.ml.entities import Data, PipelineJob, PipelineJobSettings, Model
 from azure.ai.ml.entities._builders.pipeline import Pipeline
 from azure.ai.ml.entities._inputs_outputs import Input, is_parameter_group
 from azure.ai.ml.entities._job.pipeline._io import NodeOutput, PipelineInput, _GroupAttrDict
@@ -36,6 +36,7 @@
     PipelineInput,
     NodeOutput,
     Input,
+    Model,
     Data,  # For the case use a Data object as an input, we will convert it to Input object
     Pipeline,  # For the case use a pipeline node as the input, we use its only one output as the real input.
     str,
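For context, the tuple above is the allow-list that actual pipeline inputs are validated against before the pipeline job is built; a value of any other type is rejected with a user error. A simplified sketch of that gate (illustrative names, not the SDK's exact code, which raises its own user-error exception type):

# Simplified sketch of how the allow-list is applied to each provided input.
def _validate_pipeline_input(name, value, supported_types):
    if value is not None and not isinstance(value, supported_types):
        raise TypeError(f"Pipeline input {name!r} has unsupported type {type(value).__name__}")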
@@ -12,7 +12,7 @@

 from azure.ai.ml._utils._arm_id_utils import get_resource_name_from_arm_id_safe
 from azure.ai.ml.constants import JobType
-from azure.ai.ml.entities import Data
+from azure.ai.ml.entities import Data, Model
 from azure.ai.ml.entities._component.component import Component
 from azure.ai.ml.entities._inputs_outputs import Input, Output
 from azure.ai.ml.entities._job._input_output_helpers import build_input_output
@@ -189,6 +189,7 @@ def _get_supported_inputs_types(cls):
             NodeOutput,
             Input,
             Data,
+            Model,
             str,
             bool,
             int,
@@ -12,6 +12,7 @@
 from azure.ai.ml.constants import AssetTypes
 from azure.ai.ml.constants._component import IOConstants
 from azure.ai.ml.entities._assets._artifacts.data import Data
+from azure.ai.ml.entities._assets._artifacts.model import Model
 from azure.ai.ml.entities._inputs_outputs import Input, Output
 from azure.ai.ml.entities._job.pipeline._pipeline_expression import PipelineExpressionMixin
 from azure.ai.ml.entities._util import resolve_pipeline_parameter
@@ -241,7 +242,7 @@ def _build_data(self, data, key=None):  # pylint: disable=unused-argument
         # for data binding case, set is_singular=False for case like "${{parent.inputs.job_in_folder}}/sample1.csv"
         if isinstance(data, Input) or is_data_binding_expression(data, is_singular=False):
             return data
-        if isinstance(data, Data):
+        if isinstance(data, (Data, Model)):
             return _data_to_input(data)
         # self._meta.type could be None when sub pipeline has no annotation
         if isinstance(self._meta, Input) and self._meta.type and not self._meta._is_primitive_type:
@@ -452,7 +453,7 @@ def _build_data(self, data, key=None):  # pylint: disable=unused-argument
                 error_category=ErrorCategory.USER_ERROR,
             )
             return data
-        if isinstance(data, Data):
+        if isinstance(data, (Data, Model)):
             # If value is Data or Model, we convert it to a corresponding Input
             return _data_to_input(data)
         return data
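Both _build_data branches funnel registered assets through _data_to_input, which now receives Model as well as Data. The conversion is roughly shaped like this (a sketch based on the asset fields used above, not the SDK's exact code; for a registry model, asset.id is the azureml://registries/... id, so the resulting Input binds straight to the registry asset):

from azure.ai.ml import Input

# Sketch of the asset -> Input conversion.
def _data_to_input(asset):
    if asset.id:
        # Registered asset (workspace or registry): bind by its full id.
        return Input(type=asset.type, path=asset.id)
    # Unregistered asset: fall back to name:version resolution.
    return Input(type=asset.type, path=f"{asset.name}:{asset.version}")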
@@ -1,8 +1,12 @@
 import pytest
 from pathlib import Path
 from test_utilities.utils import _PYTEST_TIMEOUT_METHOD
 
-from azure.ai.ml import MLClient, load_component
-from azure.core.exceptions import HttpResponseError
+from azure.ai.ml import MLClient, load_component, Input, load_model
+from azure.ai.ml.dsl import pipeline
+from azure.ai.ml.constants import AssetTypes
+from azure.core.exceptions import HttpResponseError, ResourceNotFoundError
+from azure.core.polling import LROPoller
 
 from .._util import _DSL_TIMEOUT_SECOND

@@ -12,15 +16,13 @@
 @pytest.mark.usefixtures("enable_pipeline_private_preview_features", "recorded_test")
 @pytest.mark.timeout(timeout=_DSL_TIMEOUT_SECOND, method=_PYTEST_TIMEOUT_METHOD)
 @pytest.mark.e2etest
-@pytest.mark.skip(reason="not able to re-record")
 @pytest.mark.pipeline_test
 class TestDSLPipelineOnRegistry(AzureRecordedTestCase):
+    @pytest.mark.skip(reason="not able to re-record")
     def test_pipeline_job_create_with_registered_component_on_registry(
         self,
         registry_client: MLClient,
     ) -> None:
-        from azure.ai.ml.dsl import pipeline
-
         local_component = load_component("./tests/test_configs/components/basic_component_code_local_path.yml")
         try:
             created_component = registry_client.components.get(local_component.name, version=local_component.version)
@@ -35,3 +37,105 @@ def sample_pipeline():
         pipeline_job = sample_pipeline()
         assert registry_client.jobs.validate(pipeline_job).passed
         # TODO: add test for pipeline job create with registered component on registry after support is ready on canary
+
+    @pytest.mark.skip(reason="request body still exists when re-recording and will raise error "
+                             "'Unable to find a record for the request' in playback mode")
+    def test_pipeline_with_local_component_and_registry_model_as_input(self, registry_client: MLClient, client: MLClient):
+        # get dataset
+        test_data = Input(
+            type=AssetTypes.URI_FILE,
+            path="./tests/test_configs/pipeline_jobs/job_with_registry_model_as_input/data/sample1.csv"
+        )
+
+        # load_component
+        score_func = load_component("./tests/test_configs/pipeline_jobs/job_with_registry_model_as_input/score.yml")
+
+        pipeline_score_model = Input(
+            type='mlflow_model',
+            path='azureml://registries/testFeed/models/iris_model/versions/1'
+        )
+
+        @pipeline()
+        def score_pipeline_with_registry_model(model_input, test_data):
+            score = score_func(model_input=model_input, test_data=test_data)
+            score_duplicate = score_func(model_input=pipeline_score_model, test_data=test_data)
+
+        pipeline_job = score_pipeline_with_registry_model(
+            model_input=pipeline_score_model,
+            test_data=test_data
+        )
+        pipeline_job.settings.default_compute = "cpu-cluster"
+        pipeline_job = client.jobs.create_or_update(pipeline_job)
+        cancel_poller = client.jobs.begin_cancel(pipeline_job.name)
+        assert isinstance(cancel_poller, LROPoller)
+
+    @pytest.mark.skip(reason="request body still exists when re-recording and will raise error "
+                             "'Unable to find a record for the request' in playback mode")
+    def test_pipeline_with_local_component_and_registry_model_as_input_with_model_input(
+        self,
+        registry_client: MLClient,
+        client: MLClient):
+        # get dataset
+        test_data = Input(
+            type=AssetTypes.URI_FILE,
+            path="./tests/test_configs/pipeline_jobs/job_with_registry_model_as_input/data/sample1.csv"
+        )
+
+        # load_component
+        score_func = load_component("./tests/test_configs/pipeline_jobs/job_with_registry_model_as_input/score.yml")
+
+        model_path = Path("./tests/test_configs/model/model_iris.yml")
+        model_entity = load_model(model_path)
+        try:
+            pipeline_score_model = registry_client.models.get(name=model_entity.name, version=model_entity.version)
+        except ResourceNotFoundError:
+            model_entity = registry_client.models.create_or_update(model_entity)
+            pipeline_score_model = registry_client.models.get(name=model_entity.name, version=model_entity.version)
+
+        @pipeline()
+        def score_pipeline_with_registry_model(model_input, test_data):
+            score = score_func(model_input=model_input, test_data=test_data)
+            score_duplicate = score_func(model_input=pipeline_score_model, test_data=test_data)
+
+        pipeline_job = score_pipeline_with_registry_model(
+            model_input=pipeline_score_model, test_data=test_data
+        )
+        pipeline_job.settings.default_compute = "cpu-cluster"
+        pipeline_job = client.jobs.create_or_update(pipeline_job)
+        cancel_poller = client.jobs.begin_cancel(pipeline_job.name)
+        assert isinstance(cancel_poller, LROPoller)
+
+    @pytest.mark.skip(reason="request body still exists when re-recording and will raise error "
+                             "'Unable to find a record for the request' in playback mode")
+    def test_pipeline_with_registry_component_and_model_as_input(self, registry_client: MLClient, client: MLClient):
+        # get dataset
+        test_data = Input(
+            type=AssetTypes.URI_FILE,
+            path="./tests/test_configs/pipeline_jobs/job_with_registry_model_as_input/data/sample1.csv"
+        )
+
+        # load_component
+        score_component_name = "v2_dsl_score_component"
+        component_version = "0.0.8"
+        score_func = registry_client.components.get(
+            name=score_component_name, version=component_version
+        )
+
+        pipeline_score_model = Input(
+            type='mlflow_model',
+            path='azureml://registries/testFeed/models/iris_model/versions/1'
+        )
+
+        @pipeline()
+        def score_pipeline_with_registry_model(model_input, test_data):
+            score = score_func(model_input=model_input, test_data=test_data)
+            score_duplicate = score_func(model_input=pipeline_score_model, test_data=test_data)
+
+        pipeline_job = score_pipeline_with_registry_model(
+            model_input=pipeline_score_model,
+            test_data=test_data
+        )
+        pipeline_job.settings.default_compute = "cpu-cluster"
+        pipeline_job = client.jobs.create_or_update(pipeline_job)
+        cancel_poller = client.jobs.begin_cancel(pipeline_job.name)
+        assert isinstance(cancel_poller, LROPoller)
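Taken together, the new tests cover the two ways to wire a registry model into a pipeline: referencing it by URI through an explicit Input, or fetching the Model entity and passing it as-is (the conversion this PR enables). Side by side, with values from the tests above:

from azure.ai.ml import Input

# Option 1: reference the registry model by URI; the type must match the asset.
by_uri = Input(
    type="mlflow_model",
    path="azureml://registries/testFeed/models/iris_model/versions/1",
)

# Option 2: fetch the Model entity and pass it directly as a pipeline input
# (registry_client is an MLClient scoped to the registry).
by_entity = registry_client.models.get(name="iris_model", version="1")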
@@ -1543,6 +1543,22 @@ def test_remote_pipeline_component_job(self, client: MLClient, randstr: Callable
         # assert pipeline_dict["outputs"] == {"output_path": {"mode": "ReadWriteMount", "job_output_type": "uri_folder"}}
         assert pipeline_dict["settings"] == {"default_compute": "cpu-cluster", "_source": "REMOTE.WORKSPACE.COMPONENT"}
 
+    @pytest.mark.skip(reason="request body still exists when re-recording and will raise error "
+                             "'Unable to find a record for the request' in playback mode")
+    def test_pipeline_job_create_with_registry_model_as_input(
+        self,
+        client: MLClient,
+        registry_client: MLClient,
+        randstr: Callable[[str], str],
+    ) -> None:
+        params_override = [{"name": randstr("name")}]
+        pipeline_job = load_job(
+            source="./tests/test_configs/pipeline_jobs/job_with_registry_model_as_input/pipeline.yml",
+            params_override=params_override,
+        )
+        job = client.jobs.create_or_update(pipeline_job)
+        assert job.name == params_override[0]["name"]
+
     def test_pipeline_node_with_default_component(self, client: MLClient, randstr: Callable[[str], str]):
         params_override = [{"name": randstr("job_name")}]
         pipeline_job = load_job(