Skip to content

feat: Kickoff Transformation implementationtransformation code base #5181

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Mar 23, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 98 additions & 1 deletion sdk/python/feast/batch_feature_view.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,19 @@
import functools
import warnings
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple
from types import FunctionType
from typing import Dict, List, Optional, Tuple, Union

import dill

from feast import flags_helper
from feast.data_source import DataSource
from feast.entity import Entity
from feast.feature_view import FeatureView
from feast.field import Field
from feast.protos.feast.core.DataSource_pb2 import DataSource as DataSourceProto
from feast.transformation.base import Transformation
from feast.transformation.mode import TransformationMode

warnings.simplefilter("once", RuntimeWarning)

Expand Down Expand Up @@ -42,6 +48,7 @@ class BatchFeatureView(FeatureView):
"""

name: str
mode: Union[TransformationMode, str]
entities: List[str]
ttl: Optional[timedelta]
source: DataSource
Expand All @@ -54,11 +61,15 @@ class BatchFeatureView(FeatureView):
owner: str
timestamp_field: str
materialization_intervals: List[Tuple[datetime, datetime]]
udf: Optional[FunctionType]
udf_string: Optional[str]
feature_transformation: Transformation

def __init__(
self,
*,
name: str,
mode: Union[TransformationMode, str] = TransformationMode.PYTHON,
source: DataSource,
entities: Optional[List[Entity]] = None,
ttl: Optional[timedelta] = None,
Expand All @@ -67,6 +78,9 @@ def __init__(
description: str = "",
owner: str = "",
schema: Optional[List[Field]] = None,
udf: Optional[FunctionType] = None,
udf_string: Optional[str] = "",
feature_transformation: Optional[Transformation] = None,
):
if not flags_helper.is_test():
warnings.warn(
Expand All @@ -84,6 +98,13 @@ def __init__(
f"or CUSTOM_SOURCE, got {type(source).__name__}: {source.name} instead "
)

self.mode = mode
self.udf = udf
self.udf_string = udf_string
self.feature_transformation = (
feature_transformation or self.get_feature_transformation()
)

super().__init__(
name=name,
entities=entities,
Expand All @@ -95,3 +116,79 @@ def __init__(
schema=schema,
source=source,
)

def get_feature_transformation(self) -> Transformation:
if not self.udf:
raise ValueError(
"Either a UDF or a feature transformation must be provided for BatchFeatureView"
)
if self.mode in (
TransformationMode.PANDAS,
TransformationMode.PYTHON,
TransformationMode.SQL,
) or self.mode in ("pandas", "python", "sql"):
return Transformation(
mode=self.mode, udf=self.udf, udf_string=self.udf_string or ""
)
else:
raise ValueError(
f"Unsupported transformation mode: {self.mode} for StreamFeatureView"
)


def batch_feature_view(
*,
name: Optional[str] = None,
mode: Union[TransformationMode, str] = TransformationMode.PYTHON,
entities: Optional[List[str]] = None,
ttl: Optional[timedelta] = None,
source: Optional[DataSource] = None,
tags: Optional[Dict[str, str]] = None,
online: bool = True,
description: str = "",
owner: str = "",
schema: Optional[List[Field]] = None,
):
"""
Args:
name:
entities:
ttl:
source:
tags:
online:
description:
owner:
schema:

Returns:

"""

def mainify(obj):
# Needed to allow dill to properly serialize the udf. Otherwise, clients will need to have a file with the same
# name as the original file defining the sfv.
if obj.__module__ != "__main__":
obj.__module__ = "__main__"

def decorator(user_function):
udf_string = dill.source.getsource(user_function)
mainify(user_function)
batch_feature_view_obj = BatchFeatureView(
name=name or user_function.__name__,
mode=mode,
entities=entities,
ttl=ttl,
source=source,
tags=tags,
online=online,
description=description,
owner=owner,
schema=schema,
udf=user_function,
udf_string=udf_string,
)
functools.update_wrapper(wrapper=batch_feature_view_obj, wrapped=user_function)
return batch_feature_view_obj

return decorator
107 changes: 50 additions & 57 deletions sdk/python/feast/on_demand_feature_view.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
import copy
import functools
import inspect
import warnings
from types import FunctionType
from typing import Any, List, Optional, Union, get_type_hints
from typing import Any, List, Optional, Union, cast

import dill
import pandas as pd
import pyarrow
from typeguard import typechecked

Expand All @@ -31,6 +29,8 @@
from feast.protos.feast.core.Transformation_pb2 import (
UserDefinedFunctionV2 as UserDefinedFunctionProto,
)
from feast.transformation.base import Transformation
from feast.transformation.mode import TransformationMode
from feast.transformation.pandas_transformation import PandasTransformation
from feast.transformation.python_transformation import PythonTransformation
from feast.transformation.substrait_transformation import SubstraitTransformation
Expand Down Expand Up @@ -66,15 +66,15 @@ class OnDemandFeatureView(BaseFeatureView):
features: List[Field]
source_feature_view_projections: dict[str, FeatureViewProjection]
source_request_sources: dict[str, RequestSource]
feature_transformation: Union[
PandasTransformation, PythonTransformation, SubstraitTransformation
]
feature_transformation: Transformation
mode: str
description: str
tags: dict[str, str]
owner: str
write_to_online_store: bool
singleton: bool
udf: Optional[FunctionType]
udf_string: Optional[str]

def __init__( # noqa: C901
self,
Expand All @@ -90,10 +90,8 @@ def __init__( # noqa: C901
]
],
udf: Optional[FunctionType] = None,
udf_string: str = "",
feature_transformation: Union[
PandasTransformation, PythonTransformation, SubstraitTransformation
],
udf_string: Optional[str] = "",
feature_transformation: Optional[Transformation] = None,
mode: str = "pandas",
description: str = "",
tags: Optional[dict[str, str]] = None,
Expand All @@ -112,9 +110,9 @@ def __init__( # noqa: C901
sources: A map from input source names to the actual input sources, which may be
feature views, or request data sources. These sources serve as inputs to the udf,
which will refer to them by name.
udf (deprecated): The user defined transformation function, which must take pandas
udf: The user defined transformation function, which must take pandas
dataframes as inputs.
udf_string (deprecated): The source code version of the udf (for diffing and displaying in Web UI)
udf_string: The source code version of the udf (for diffing and displaying in Web UI)
feature_transformation: The user defined transformation.
mode: Mode of execution (e.g., Pandas or Python native)
description (optional): A human-readable description.
Expand All @@ -136,29 +134,10 @@ def __init__( # noqa: C901

schema = schema or []
self.entities = [e.name for e in entities] if entities else [DUMMY_ENTITY_NAME]
self.sources = sources
self.mode = mode.lower()

if self.mode not in {"python", "pandas", "substrait"}:
raise ValueError(
f"Unknown mode {self.mode}. OnDemandFeatureView only supports python or pandas UDFs and substrait."
)

if not feature_transformation:
if udf:
warnings.warn(
"udf and udf_string parameters are deprecated. Please use transformation=PandasTransformation(udf, udf_string) instead.",
DeprecationWarning,
)
# Note inspecting the return signature won't work with isinstance so this is the best alternative
if self.mode == "pandas":
feature_transformation = PandasTransformation(udf, udf_string)
elif self.mode == "python":
feature_transformation = PythonTransformation(udf, udf_string)
else:
raise ValueError(
"OnDemandFeatureView needs to be initialized with either feature_transformation or udf arguments"
)

self.udf = udf
self.udf_string = udf_string
self.source_feature_view_projections: dict[str, FeatureViewProjection] = {}
self.source_request_sources: dict[str, RequestSource] = {}
for odfv_source in sources:
Expand Down Expand Up @@ -206,12 +185,33 @@ def __init__( # noqa: C901
features.append(field)

self.features = features
self.feature_transformation = feature_transformation
self.feature_transformation = (
feature_transformation or self.get_feature_transformation()
)
self.write_to_online_store = write_to_online_store
self.singleton = singleton
if self.singleton and self.mode != "python":
raise ValueError("Singleton is only supported for Python mode.")

def get_feature_transformation(self) -> Transformation:
if not self.udf:
raise ValueError(
"Either udf or feature_transformation must be provided to create an OnDemandFeatureView"
)
if self.mode in (
TransformationMode.PANDAS,
TransformationMode.PYTHON,
) or self.mode in ("pandas", "python"):
return Transformation(
mode=self.mode, udf=self.udf, udf_string=self.udf_string or ""
)
elif self.mode == TransformationMode.SUBSTRAIT or self.mode == "substrait":
return SubstraitTransformation.from_ibis(self.udf, self.sources)
else:
raise ValueError(
f"Unsupported transformation mode: {self.mode} for OnDemandFeatureView"
)

@property
def proto_class(self) -> type[OnDemandFeatureViewProto]:
return OnDemandFeatureViewProto
Expand Down Expand Up @@ -312,16 +312,25 @@ def to_proto(self) -> OnDemandFeatureViewProto:
request_data_source=request_sources.to_proto()
)

feature_transformation = FeatureTransformationProto(
user_defined_function=self.feature_transformation.to_proto()
user_defined_function_proto = cast(
UserDefinedFunctionProto,
self.feature_transformation.to_proto()
if isinstance(
self.feature_transformation,
(PandasTransformation, PythonTransformation),
)
else None,
substrait_transformation=self.feature_transformation.to_proto()
)

substrait_transformation_proto = (
self.feature_transformation.to_proto()
if isinstance(self.feature_transformation, SubstraitTransformation)
else None,
else None
)

feature_transformation = FeatureTransformationProto(
user_defined_function=user_defined_function_proto,
substrait_transformation=substrait_transformation_proto,
)
spec = OnDemandFeatureViewSpec(
name=self.name,
Expand Down Expand Up @@ -786,38 +795,22 @@ def mainify(obj) -> None:
obj.__module__ = "__main__"

def decorator(user_function):
return_annotation = get_type_hints(user_function).get("return", inspect._empty)
udf_string = dill.source.getsource(user_function)
mainify(user_function)
if mode == "pandas":
if return_annotation not in (inspect._empty, pd.DataFrame):
raise TypeError(
f"return signature for {user_function} is {return_annotation} but should be pd.DataFrame"
)
transformation = PandasTransformation(user_function, udf_string)
elif mode == "python":
transformation = PythonTransformation(user_function, udf_string)
elif mode == "substrait":
from ibis.expr.types.relations import Table

if return_annotation not in (inspect._empty, Table):
raise TypeError(
f"return signature for {user_function} is {return_annotation} but should be ibis.expr.types.relations.Table"
)
transformation = SubstraitTransformation.from_ibis(user_function, sources)

on_demand_feature_view_obj = OnDemandFeatureView(
name=name if name is not None else user_function.__name__,
sources=sources,
schema=schema,
feature_transformation=transformation,

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldn't we keep this to be backwards compatible?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is backwards compatible. I put the feature_transformation extraction logic inside the OnDemandFeatureView initialization, since the decorator doesn't pass in the feature_transformation param.
So user can do:

@demand_feature_view(...)
def udf()...

or

odfv = OnDemandFeatureView(feature_transformation=Transformation(...))`

mode=mode,
description=description,
tags=tags,
owner=owner,
write_to_online_store=write_to_online_store,
entities=entities,
singleton=singleton,
udf=user_function,
udf_string=udf_string,
)
functools.update_wrapper(
wrapper=on_demand_feature_view_obj, wrapped=user_function
Expand Down
Loading
Loading