-
Notifications
You must be signed in to change notification settings - Fork 1.2k
Expand file tree
/
Copy pathbatch_feature_view.py
More file actions
206 lines (188 loc) · 7.91 KB
/
batch_feature_view.py
File metadata and controls
206 lines (188 loc) · 7.91 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
import functools
import warnings
from datetime import datetime, timedelta
from typing import Any, Callable, Dict, List, Optional, Tuple, Union
import dill
from feast import flags_helper
from feast.aggregation import Aggregation
from feast.data_source import DataSource
from feast.entity import Entity
from feast.feature_view import FeatureView
from feast.field import Field
from feast.protos.feast.core.DataSource_pb2 import DataSource as DataSourceProto
from feast.transformation.base import Transformation
from feast.transformation.mode import TransformationMode
# Emit each distinct RuntimeWarning (e.g. the alpha-feature notice below) only once
# per location instead of on every BatchFeatureView construction.
warnings.simplefilter("once", RuntimeWarning)

# Names of DataSource subclasses that are valid batch sources for a
# BatchFeatureView; checked by type name in BatchFeatureView.__init__.
SUPPORTED_BATCH_SOURCES = {
    "AthenaSource",
    "BigQuerySource",
    "FileSource",
    "RedshiftSource",
    "SnowflakeSource",
    "SparkSource",
    "TrinoSource",
}
class BatchFeatureView(FeatureView):
    """
    A batch feature view defines a logical group of features that has only a batch data source.

    Attributes:
        name: The unique name of the batch feature view.
        mode: The transformation mode to use for the batch feature view. This can be one of TransformationMode
        entities: List of entities or entity join keys.
        ttl: The amount of time this group of features lives. A ttl of 0 indicates that
            this group of features lives forever. Note that large ttl's or a ttl of 0
            can result in extremely computationally intensive queries.
        schema: The schema of the feature view, including feature, timestamp, and entity
            columns. If not specified, can be inferred from the underlying data source.
        source: The batch source of data where this group of features is stored.
        online: A boolean indicating whether online retrieval and write to online store is enabled for this feature view.
        offline: A boolean indicating whether offline retrieval and write to offline store is enabled for this feature view.
        description: A human-readable description.
        tags: A dictionary of key-value pairs to store arbitrary metadata.
        owner: The owner of the batch feature view, typically the email of the primary maintainer.
        udf: A user-defined function that applies transformations to the data in the batch feature view.
        udf_string: A string representation of the user-defined function.
        feature_transformation: A transformation object that defines how features are transformed.
            Note, feature_transformation has precedence over udf and udf_string.
        batch_engine: A dictionary containing configuration for the batch engine used to process the feature view.
            Note, it will override the repo-level default batch engine config defined in the yaml file.
        aggregations: A list of aggregations to be applied to the features in the batch feature view.
    """

    name: str
    entities: List[str]
    ttl: Optional[timedelta]
    source: DataSource
    sink_source: Optional[DataSource] = None
    schema: List[Field]
    entity_columns: List[Field]
    features: List[Field]
    online: bool
    offline: bool
    description: str
    tags: Dict[str, str]
    owner: str
    timestamp_field: str
    materialization_intervals: List[Tuple[datetime, datetime]]
    udf: Optional[Callable[[Any], Any]]
    udf_string: Optional[str]
    feature_transformation: Optional[Transformation]
    batch_engine: Optional[Dict[str, Any]]
    aggregations: Optional[List[Aggregation]]

    def __init__(
        self,
        *,
        name: str,
        mode: Union[TransformationMode, str] = TransformationMode.PYTHON,
        source: Union[DataSource, "BatchFeatureView", List["BatchFeatureView"]],
        sink_source: Optional[DataSource] = None,
        entities: Optional[List[Entity]] = None,
        ttl: Optional[timedelta] = None,
        tags: Optional[Dict[str, str]] = None,
        online: bool = False,
        offline: bool = False,
        description: str = "",
        owner: str = "",
        schema: Optional[List[Field]] = None,
        udf: Optional[Callable[[Any], Any]] = None,
        udf_string: Optional[str] = "",
        feature_transformation: Optional[Transformation] = None,
        batch_engine: Optional[Dict[str, Any]] = None,
        aggregations: Optional[List[Aggregation]] = None,
    ):
        """Create a batch feature view.

        Raises:
            ValueError: If ``source`` is a plain DataSource whose type is neither
                one of ``SUPPORTED_BATCH_SOURCES`` nor a custom source.
        """
        # Experimental-feature notice; suppressed when running under the test flag
        # (and deduplicated by the module-level simplefilter("once", ...)).
        if not flags_helper.is_test():
            warnings.warn(
                "Batch feature views are experimental features in alpha development. "
                "Some functionality may still be unstable so functionality can change in the future.",
                RuntimeWarning,
            )

        # Only validate plain DataSource inputs: the type must be a known batch
        # source, or serialize to a CUSTOM_SOURCE proto. BatchFeatureView /
        # list-of-view sources skip this check.
        if isinstance(source, DataSource) and (
            type(source).__name__ not in SUPPORTED_BATCH_SOURCES
            and source.to_proto().type != DataSourceProto.SourceType.CUSTOM_SOURCE
        ):
            raise ValueError(
                f"Batch feature views need a batch source, expected one of {SUPPORTED_BATCH_SOURCES} "
                f"or CUSTOM_SOURCE, got {type(source).__name__}: {source.name} instead "
            )

        # mode/udf/udf_string must be set before deriving the transformation below.
        self.mode = mode
        self.udf = udf
        self.udf_string = udf_string
        # An explicitly supplied feature_transformation takes precedence over one
        # derived from udf + mode.
        self.feature_transformation = (
            feature_transformation or self.get_feature_transformation()
        )
        self.batch_engine = batch_engine
        self.aggregations = aggregations or []

        super().__init__(
            name=name,
            entities=entities,
            ttl=ttl,
            tags=tags,
            online=online,
            offline=offline,
            description=description,
            owner=owner,
            schema=schema,
            source=source,  # type: ignore[arg-type]
            sink_source=sink_source,
            mode=mode,
        )

    def get_feature_transformation(self) -> Optional[Transformation]:
        """Build a Transformation from this view's udf and mode.

        Returns:
            None when no udf is set; otherwise a Transformation wrapping the udf.

        Raises:
            ValueError: If the mode is not one of pandas/python/sql/ray.
        """
        if not self.udf:
            return None
        # Accept either the enum member or its lowercase string alias.
        if self.mode in (
            TransformationMode.PANDAS,
            TransformationMode.PYTHON,
            TransformationMode.SQL,
            TransformationMode.RAY,
        ) or self.mode in ("pandas", "python", "sql", "ray"):
            return Transformation(
                mode=self.mode, udf=self.udf, udf_string=self.udf_string or ""
            )
        else:
            # Fixed: the message previously named StreamFeatureView (copy-paste error).
            raise ValueError(
                f"Unsupported transformation mode: {self.mode} for BatchFeatureView"
            )
def batch_feature_view(
    *,
    name: Optional[str] = None,
    mode: Union[TransformationMode, str] = TransformationMode.PYTHON,
    entities: Optional[List[str]] = None,
    ttl: Optional[timedelta] = None,
    source: Optional[DataSource] = None,
    tags: Optional[Dict[str, str]] = None,
    online: bool = True,
    offline: bool = True,
    description: str = "",
    owner: str = "",
    schema: Optional[List[Field]] = None,
    sink_source: Optional[DataSource] = None,
    batch_engine: Optional[Dict[str, Any]] = None,
    aggregations: Optional[List[Aggregation]] = None,
):
    """
    Creates a BatchFeatureView object with the given user-defined function (UDF) as the transformation.

    Please make sure that the udf contains all non-built in imports within the function to ensure that the execution
    of a deserialized function does not miss imports.

    Args:
        name: Name of the feature view; defaults to the decorated function's name.
        mode: Transformation mode for the udf (enum member or its string alias).
        entities: List of entity join keys.
        ttl: How long this group of features lives; None/0 means forever.
        source: The batch data source backing the view.
        tags: Arbitrary key-value metadata.
        online: Whether online retrieval/writes are enabled (defaults to True here).
        offline: Whether offline retrieval/writes are enabled (defaults to True here).
        description: A human-readable description.
        owner: Owner of the view, typically the primary maintainer's email.
        schema: Feature/timestamp/entity columns; inferred from the source if omitted.
        sink_source: Optional sink data source passed through to BatchFeatureView.
        batch_engine: Optional per-view batch engine config, overriding repo defaults.
        aggregations: Optional aggregations to apply to the view's features.

    Returns:
        A decorator that wraps the user function in a BatchFeatureView.
    """

    def mainify(obj) -> None:
        # Needed to allow dill to properly serialize the udf. Otherwise, clients
        # will need to have a file with the same name as the original file
        # defining the batch feature view.
        if obj.__module__ != "__main__":
            obj.__module__ = "__main__"

    def decorator(user_function):
        # Capture the udf's source text before re-homing it to __main__.
        udf_string = dill.source.getsource(user_function)
        mainify(user_function)
        batch_feature_view_obj = BatchFeatureView(
            name=name or user_function.__name__,
            mode=mode,
            entities=entities,
            ttl=ttl,
            source=source,
            sink_source=sink_source,
            tags=tags,
            online=online,
            offline=offline,
            description=description,
            owner=owner,
            schema=schema,
            udf=user_function,
            udf_string=udf_string,
            batch_engine=batch_engine,
            aggregations=aggregations,
        )
        # Preserve the wrapped function's __name__/__doc__ etc. on the view object.
        functools.update_wrapper(wrapper=batch_feature_view_obj, wrapped=user_function)
        return batch_feature_view_obj

    return decorator