-
Notifications
You must be signed in to change notification settings - Fork 1.2k
Expand file tree
/
Copy pathfeatures.py
More file actions
176 lines (155 loc) · 5.01 KB
/
features.py
File metadata and controls
176 lines (155 loc) · 5.01 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
import json
from typing import List
import click
import pandas as pd
from feast.repo_operations import create_feature_store
# Parent click group for the feature subcommands registered on it below
# ("list" and "describe"). It does no work of its own.
@click.group(name="features")
def features_cmd():
    """Access features"""
@features_cmd.command(name="list")
@click.option(
    "--output",
    type=click.Choice(["table", "json"], case_sensitive=False),
    default="table",
    show_default=True,
    help="Output format",
)
@click.pass_context
def features_list(ctx: click.Context, output: str):
    """
    List all features
    """
    # The docstring above doubles as the click help text, so it is kept short.
    #
    # Collects every feature from the batch, on-demand and stream feature
    # views of the store and prints one row per (feature, feature view) pair,
    # either as a plain table or as a JSON array of objects.
    store = create_feature_store(ctx)
    feature_views = [
        *store.list_batch_feature_views(),
        *store.list_on_demand_feature_views(),
        *store.list_stream_feature_views(),
    ]

    # Row layout is (feature name, feature view name, dtype); both output
    # branches below rely on this order.
    feature_list = [
        [feature.name, fv.name, str(feature.dtype)]
        for fv in feature_views
        for feature in fv.features
    ]

    if output == "json":
        # Unpack in the same order the rows were built. The previous code
        # unpacked (view, name, dtype), which swapped the two JSON fields.
        json_output = [
            {"feature_name": name, "feature_view": view, "dtype": dtype}
            for name, view, dtype in feature_list
        ]
        click.echo(json.dumps(json_output, indent=4))
    else:
        # Imported lazily so the dependency is only needed for table output.
        from tabulate import tabulate

        click.echo(
            tabulate(
                feature_list,
                headers=["Feature", "Feature View", "Data Type"],
                tablefmt="plain",
            )
        )
@features_cmd.command("describe")
@click.argument("feature_name", type=str)
@click.pass_context
def describe_feature(ctx: click.Context, feature_name: str):
    """
    Describe a specific feature by name
    """
    # The docstring above doubles as the click help text, so it is kept short.
    #
    # Searches the batch, on-demand and stream feature views for features
    # called `feature_name` and prints one JSON object per match. A message
    # is printed instead when nothing matches.
    store = create_feature_store(ctx)
    feature_views = [
        *store.list_batch_feature_views(),
        *store.list_on_demand_feature_views(),
        *store.list_stream_feature_views(),
    ]

    feature_details = []
    for fv in feature_views:
        for feature in fv.features:
            if feature.name != feature_name:
                continue

            # Not every view necessarily carries a batch source, and its
            # string form is only presumed to be JSON (TODO confirm for all
            # source types). Previously a missing or None source was fed to
            # json.loads as "N/A" / "None" and crashed the command.
            batch_source = getattr(fv, "batch_source", None)
            if batch_source is None:
                source = "N/A"
            else:
                source_text = str(batch_source)
                try:
                    source = json.loads(source_text)
                except ValueError:
                    # json.JSONDecodeError is a ValueError; fall back to the
                    # raw text rather than failing the whole description.
                    source = source_text

            feature_details.append(
                {
                    "Feature Name": feature.name,
                    "Feature View": fv.name,
                    "Data Type": str(feature.dtype),
                    "Description": getattr(feature, "description", "N/A"),
                    "Online Store": getattr(fv, "online", "N/A"),
                    "Source": source,
                }
            )

    if not feature_details:
        click.echo(f"Feature '{feature_name}' not found in any feature view.")
        return

    click.echo(json.dumps(feature_details, indent=4))
@click.command("get-online-features")
@click.option(
    "--entities",
    "-e",
    type=str,
    multiple=True,
    required=True,
    help="Entity key-value pairs (e.g., driver_id=1001)",
)
@click.option(
    "--features",
    "-f",
    multiple=True,
    required=True,
    help="Features to retrieve. (e.g.,feature-view:feature-name) ex: driver_hourly_stats:conv_rate",
)
@click.pass_context
def get_online_features(ctx: click.Context, entities: List[str], features: List[str]):
    """
    Fetch online feature values for a given entity ID
    """
    # The docstring above doubles as the click help text, so it is kept short.
    #
    # Each --entities value is a "key=value" pair. Repeating a key adds
    # another value for it, and the i-th values of all keys together form the
    # i-th entity row. Values are passed on as strings. The result of the
    # store lookup is printed as JSON.
    store = create_feature_store(ctx)

    entity_dict: dict[str, List[str]] = {}
    for entity in entities:
        # Split on the first "=" only, so values may themselves contain "=".
        key, separator, value = entity.partition("=")
        if not separator or not key:
            click.echo(
                f"Invalid entity format: {entity}. Use key=value format.", err=True
            )
            return
        entity_dict.setdefault(key, []).append(value)

    # zip() below stops at the shortest value list, which would silently drop
    # entities when keys were given a different number of values.
    if len({len(values) for values in entity_dict.values()}) > 1:
        click.echo(
            "Each entity key must be given the same number of values.", err=True
        )
        return

    entity_rows = [
        dict(zip(entity_dict.keys(), values)) for values in zip(*entity_dict.values())
    ]

    feature_vector = store.get_online_features(
        features=list(features),
        entity_rows=entity_rows,
    ).to_dict()

    click.echo(json.dumps(feature_vector, indent=4))
@click.command(name="get-historical-features")
@click.option(
    "--dataframe",
    "-d",
    type=str,
    required=True,
    help='JSON string containing entities and timestamps. Example: \'[{"event_timestamp": "2025-03-29T12:00:00", "driver_id": 1001}]\'',
)
@click.option(
    "--features",
    "-f",
    multiple=True,
    required=True,
    help="Features to retrieve. feature-view:feature-name ex: driver_hourly_stats:conv_rate",
)
@click.pass_context
def get_historical_features(ctx: click.Context, dataframe: str, features: List[str]):
    """
    Fetch historical feature values for a given entity ID
    """
    # The docstring above doubles as the click help text, so it is kept short.
    #
    # `dataframe` is a JSON array of objects, one per entity row, each with an
    # "event_timestamp" field. It is turned into a pandas DataFrame, the
    # timestamp column is parsed, and the retrieved features are printed as
    # JSON records. Invalid input is reported on stderr and nothing is fetched.
    store = create_feature_store(ctx)

    try:
        entity_list = json.loads(dataframe)
        if not isinstance(entity_list, list) or not all(
            isinstance(row, dict) for row in entity_list
        ):
            raise ValueError("Entities must be a list of dictionaries.")
        if not entity_list:
            raise ValueError("Entities list must not be empty.")

        entity_df = pd.DataFrame(entity_list)
        # Checked explicitly: otherwise the user only sees the bare KeyError
        # text "'event_timestamp'" as the error message.
        if "event_timestamp" not in entity_df.columns:
            raise ValueError("Entities must include an 'event_timestamp' field.")
        entity_df["event_timestamp"] = pd.to_datetime(entity_df["event_timestamp"])
    except Exception as e:
        # Broad on purpose: this is the CLI boundary, and any parsing failure
        # should become a readable message rather than a traceback.
        click.echo(f"Error parsing entities JSON: {e}", err=True)
        return

    feature_vector = store.get_historical_features(
        entity_df=entity_df,
        features=list(features),
    ).to_df()

    click.echo(feature_vector.to_json(orient="records", indent=4))