Skip to content

Commit 0b6ed9a

Browse files
committed
Fixing online features part of this example (adding key columns, removing unimplemented part with stream data pushing).
Signed-off-by: Felix-neko <[email protected]>
1 parent 0ce35f4 commit 0b6ed9a

File tree

1 file changed

+3
-22
lines changed

1 file changed

+3
-22
lines changed

sdk/python/feast/templates/spark/feature_repo/test_workflow.py

Lines changed: 3 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -27,27 +27,6 @@ def run_demo():
2727
print("\n--- Online features retrieved (instead) through a feature service---")
2828
fetch_online_features(store, use_feature_service=True)
2929

30-
print("\n--- Simulate a stream event ingestion of the hourly stats df ---")
31-
event_df = pd.DataFrame.from_dict(
32-
{
33-
"driver_id": [1001],
34-
"event_timestamp": [
35-
datetime(2021, 5, 13, 10, 59, 42),
36-
],
37-
"created": [
38-
datetime(2021, 5, 13, 10, 59, 42),
39-
],
40-
"conv_rate": [1.0],
41-
"acc_rate": [1.0],
42-
"avg_daily_trips": [1000],
43-
}
44-
)
45-
print(event_df)
46-
store.push("driver_stats_push_source", event_df, to=PushMode.ONLINE)
47-
48-
print("\n--- Online features again with updated values from a stream push---")
49-
fetch_online_features(store, use_feature_service=True)
50-
5130
print("\n--- Run feast teardown ---")
5231
subprocess.run(["feast", "teardown"])
5332

@@ -92,17 +71,19 @@ def fetch_online_features(store, use_feature_service: bool):
9271
# {join_key: entity_value}
9372
{
9473
"driver_id": 1001,
74+
"customer_id": 201,
9575
"val_to_add": 1000,
9676
"val_to_add_2": 2000,
9777
},
9878
{
9979
"driver_id": 1002,
80+
"customer_id": 202,
10081
"val_to_add": 1001,
10182
"val_to_add_2": 2002,
10283
},
10384
]
10485
if use_feature_service:
105-
features_to_fetch = store.get_feature_service("driver_activity_v1")
86+
features_to_fetch = store.get_feature_service("driver_activity")
10687
else:
10788
features_to_fetch = [
10889
"driver_hourly_stats:acc_rate",

0 commit comments

Comments
 (0)